Add format_date helper to handle datetime objects in templates
The db now returns datetime objects instead of strings in some cases. Added format_date() helper function that handles both datetime and string values, and replaced all [:10] date slicing with calls to this helper. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
155
server.py
155
server.py
@@ -297,6 +297,17 @@ def wants_html(request: Request) -> bool:
|
||||
return "text/html" in accept and "application/json" not in accept and "application/activity+json" not in accept
|
||||
|
||||
|
||||
def format_date(value, length: int = 10) -> str:
|
||||
"""Format a date value (datetime or string) to a string, sliced to length."""
|
||||
if value is None:
|
||||
return ""
|
||||
if hasattr(value, 'isoformat'):
|
||||
return value.isoformat()[:length]
|
||||
if isinstance(value, str):
|
||||
return value[:length]
|
||||
return ""
|
||||
|
||||
|
||||
# ============ Auth UI Endpoints ============
|
||||
|
||||
@app.get("/login", response_class=HTMLResponse)
|
||||
@@ -483,7 +494,7 @@ async def ui_activity_detail(activity_index: int, request: Request):
|
||||
activity_id = activity.get("activity_id", "")
|
||||
actor_id = activity.get("actor_id", "")
|
||||
actor_name = actor_id.split("/")[-1] if actor_id else "unknown"
|
||||
published = activity.get("published", "")[:10]
|
||||
published = format_date(activity.get("published"))
|
||||
obj = activity.get("object_data", {})
|
||||
|
||||
# Object details
|
||||
@@ -576,7 +587,7 @@ async def ui_activity_detail(activity_index: int, request: Request):
|
||||
recipe = provenance.get("recipe", "")
|
||||
inputs = provenance.get("inputs", [])
|
||||
l1_run_id = provenance.get("l1_run_id", "")
|
||||
rendered_at = provenance.get("rendered_at", "")[:10] if provenance.get("rendered_at") else ""
|
||||
rendered_at = format_date(provenance.get("rendered_at"))
|
||||
effects_commit = provenance.get("effects_commit", "")
|
||||
effect_url = provenance.get("effect_url")
|
||||
infrastructure = provenance.get("infrastructure", {})
|
||||
@@ -747,7 +758,7 @@ async def ui_asset_detail(name: str, request: Request):
|
||||
origin = asset.get("origin") or {}
|
||||
provenance = asset.get("provenance") or {}
|
||||
metadata = asset.get("metadata") or {}
|
||||
created_at = asset.get("created_at", "")[:10]
|
||||
created_at = format_date(asset.get("created_at"))
|
||||
|
||||
type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600"
|
||||
|
||||
@@ -829,7 +840,7 @@ async def ui_asset_detail(name: str, request: Request):
|
||||
recipe = provenance.get("recipe", "")
|
||||
inputs = provenance.get("inputs", [])
|
||||
l1_run_id = provenance.get("l1_run_id", "")
|
||||
rendered_at = provenance.get("rendered_at", "")[:10] if provenance.get("rendered_at") else ""
|
||||
rendered_at = format_date(provenance.get("rendered_at"))
|
||||
effects_commit = provenance.get("effects_commit", "")
|
||||
infrastructure = provenance.get("infrastructure", {})
|
||||
|
||||
@@ -1273,13 +1284,14 @@ async def get_users_list(request: Request, page: int = 1, limit: int = 20):
|
||||
rows = ""
|
||||
for uname, user_data in users_page:
|
||||
webfinger = f"@{uname}@{DOMAIN}"
|
||||
created_at = format_date(user_data.get("created_at"))
|
||||
rows += f'''
|
||||
<tr class="border-b border-dark-500 hover:bg-dark-600 transition-colors">
|
||||
<td class="py-3 px-4">
|
||||
<a href="/users/{uname}" class="text-blue-400 hover:text-blue-300 font-medium">{uname}</a>
|
||||
</td>
|
||||
<td class="py-3 px-4"><code class="text-xs bg-dark-600 px-2 py-1 rounded text-gray-300">{webfinger}</code></td>
|
||||
<td class="py-3 px-4 text-gray-400">{user_data.get("created_at", "")[:10]}</td>
|
||||
<td class="py-3 px-4 text-gray-400">{created_at}</td>
|
||||
</tr>
|
||||
'''
|
||||
|
||||
@@ -2104,7 +2116,7 @@ async def get_activities(request: Request, page: int = 1, limit: int = 20):
|
||||
<td class="py-3 px-4">
|
||||
<a href="/users/{actor_name}" class="text-gray-400 hover:text-blue-300">{actor_name}</a>
|
||||
</td>
|
||||
<td class="py-3 px-4 text-gray-400">{activity.get("published", "")[:10]}</td>
|
||||
<td class="py-3 px-4 text-gray-400">{format_date(activity.get("published"))}</td>
|
||||
<td class="py-3 px-4">
|
||||
<a href="/activities/{activity_index}" class="px-3 py-1 bg-blue-600 hover:bg-blue-700 text-white text-xs font-medium rounded transition-colors">View</a>
|
||||
</td>
|
||||
@@ -2231,6 +2243,137 @@ async def get_object(content_hash: str, request: Request):
|
||||
raise HTTPException(404, f"Object not found: {content_hash}")
|
||||
|
||||
|
||||
# ============ Anchoring (Bitcoin timestamps) ============
|
||||
|
||||
@app.post("/anchors/create")
|
||||
async def create_anchor_endpoint(user: User = Depends(get_required_user)):
|
||||
"""
|
||||
Create a new anchor for all unanchored activities.
|
||||
|
||||
Builds a merkle tree, stores it on IPFS, and submits to OpenTimestamps
|
||||
for Bitcoin anchoring. The anchor proof is backed up to persistent storage.
|
||||
"""
|
||||
import anchoring
|
||||
import ipfs_client
|
||||
|
||||
# Get unanchored activities
|
||||
unanchored = await db.get_unanchored_activities()
|
||||
if not unanchored:
|
||||
return {"message": "No unanchored activities", "anchored": 0}
|
||||
|
||||
activity_ids = [a["activity_id"] for a in unanchored]
|
||||
|
||||
# Create anchor
|
||||
anchor = await anchoring.create_anchor(activity_ids, db, ipfs_client)
|
||||
|
||||
if anchor:
|
||||
return {
|
||||
"message": f"Anchored {len(activity_ids)} activities",
|
||||
"merkle_root": anchor["merkle_root"],
|
||||
"tree_ipfs_cid": anchor.get("tree_ipfs_cid"),
|
||||
"activity_count": anchor["activity_count"]
|
||||
}
|
||||
else:
|
||||
raise HTTPException(500, "Failed to create anchor")
|
||||
|
||||
|
||||
@app.get("/anchors")
|
||||
async def list_anchors():
|
||||
"""List all anchors."""
|
||||
anchors = await db.get_all_anchors()
|
||||
stats = await db.get_anchor_stats()
|
||||
return {
|
||||
"anchors": anchors,
|
||||
"stats": stats
|
||||
}
|
||||
|
||||
|
||||
@app.get("/anchors/{merkle_root}")
|
||||
async def get_anchor_endpoint(merkle_root: str):
|
||||
"""Get anchor details by merkle root."""
|
||||
anchor = await db.get_anchor(merkle_root)
|
||||
if not anchor:
|
||||
raise HTTPException(404, f"Anchor not found: {merkle_root}")
|
||||
return anchor
|
||||
|
||||
|
||||
@app.get("/anchors/{merkle_root}/tree")
|
||||
async def get_anchor_tree(merkle_root: str):
|
||||
"""Get the full merkle tree from IPFS."""
|
||||
anchor = await db.get_anchor(merkle_root)
|
||||
if not anchor:
|
||||
raise HTTPException(404, f"Anchor not found: {merkle_root}")
|
||||
|
||||
tree_cid = anchor.get("tree_ipfs_cid")
|
||||
if not tree_cid:
|
||||
raise HTTPException(404, "Anchor has no tree on IPFS")
|
||||
|
||||
import ipfs_client
|
||||
try:
|
||||
tree_bytes = ipfs_client.get_bytes(tree_cid)
|
||||
if tree_bytes:
|
||||
return json.loads(tree_bytes)
|
||||
except Exception as e:
|
||||
raise HTTPException(500, f"Failed to fetch tree from IPFS: {e}")
|
||||
|
||||
|
||||
@app.get("/anchors/verify/{activity_id}")
|
||||
async def verify_activity_anchor(activity_id: str):
|
||||
"""
|
||||
Verify an activity's anchor proof.
|
||||
|
||||
Returns the merkle proof showing this activity is included in an anchored batch.
|
||||
"""
|
||||
import anchoring
|
||||
import ipfs_client
|
||||
|
||||
# Get activity
|
||||
activity = await db.get_activity(activity_id)
|
||||
if not activity:
|
||||
raise HTTPException(404, f"Activity not found: {activity_id}")
|
||||
|
||||
anchor_root = activity.get("anchor_root")
|
||||
if not anchor_root:
|
||||
return {"verified": False, "reason": "Activity not yet anchored"}
|
||||
|
||||
# Get anchor
|
||||
anchor = await db.get_anchor(anchor_root)
|
||||
if not anchor:
|
||||
return {"verified": False, "reason": "Anchor record not found"}
|
||||
|
||||
# Get tree from IPFS
|
||||
tree_cid = anchor.get("tree_ipfs_cid")
|
||||
if not tree_cid:
|
||||
return {"verified": False, "reason": "Merkle tree not on IPFS"}
|
||||
|
||||
try:
|
||||
tree_bytes = ipfs_client.get_bytes(tree_cid)
|
||||
tree = json.loads(tree_bytes) if tree_bytes else None
|
||||
except Exception:
|
||||
return {"verified": False, "reason": "Failed to fetch tree from IPFS"}
|
||||
|
||||
if not tree:
|
||||
return {"verified": False, "reason": "Could not load merkle tree"}
|
||||
|
||||
# Get proof
|
||||
proof = anchoring.get_merkle_proof(tree, activity_id)
|
||||
if not proof:
|
||||
return {"verified": False, "reason": "Activity not in merkle tree"}
|
||||
|
||||
# Verify proof
|
||||
valid = anchoring.verify_merkle_proof(activity_id, proof, anchor_root)
|
||||
|
||||
return {
|
||||
"verified": valid,
|
||||
"activity_id": activity_id,
|
||||
"merkle_root": anchor_root,
|
||||
"tree_ipfs_cid": tree_cid,
|
||||
"proof": proof,
|
||||
"bitcoin_txid": anchor.get("bitcoin_txid"),
|
||||
"confirmed_at": anchor.get("confirmed_at")
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8200)
|
||||
|
||||
Reference in New Issue
Block a user