"""sx-pub Python IO helpers — actor, IPFS, collections, publishing, federation. These are called from SX defhandlers via (helper "pub-..." args...). All DB access uses g.s (per-request async session from register_db). """ from __future__ import annotations import hashlib import logging import os from datetime import datetime, timezone from typing import Any logger = logging.getLogger("sx.pub") SX_PUB_DOMAIN = os.getenv("SX_PUB_DOMAIN", "pub.sx-web.org") async def get_or_create_actor() -> dict[str, Any]: """Get or create the singleton sx-pub actor. Auto-generates RSA keypair.""" from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubActor result = await g.s.execute( select(SxPubActor).where(SxPubActor.preferred_username == "sx") ) actor = result.scalar_one_or_none() if actor is None: from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives import serialization private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, ) private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ).decode("utf-8") public_pem = private_key.public_key().public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo, ).decode("utf-8") actor = SxPubActor( preferred_username="sx", display_name="SX Language", summary="Federated SX specification publisher", public_key_pem=public_pem, private_key_pem=private_pem, domain=SX_PUB_DOMAIN, ) g.s.add(actor) await g.s.flush() logger.info("Created sx-pub actor id=%d domain=%s", actor.id, SX_PUB_DOMAIN) # Seed default collections on first run await seed_default_collections() return { "preferred-username": actor.preferred_username, "display-name": actor.display_name or actor.preferred_username, "summary": actor.summary or "", "public-key-pem": actor.public_key_pem, "domain": actor.domain, } async def seed_default_collections() -> None: """Create default collections if they don't exist.""" from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubCollection defaults = [ ("core-specs", "Core Specifications", "Language spec files — evaluator, parser, primitives, render", 0), ("platforms", "Platforms", "Host platform implementations — JavaScript, Python, OCaml", 1), ("components", "Components", "Reusable UI components published as content-addressed SX", 2), ("libraries", "Libraries", "SX library modules — stdlib, signals, freeze, web forms", 3), ] for slug, name, description, order in defaults: exists = await g.s.execute( select(SxPubCollection).where(SxPubCollection.slug == slug) ) if exists.scalar_one_or_none() is None: g.s.add(SxPubCollection( slug=slug, name=name, description=description, sort_order=order, )) await g.s.flush() logger.info("Seeded %d default collections", len(defaults)) async def list_collections() -> list[dict[str, Any]]: """List all pub collections.""" from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubCollection result = await g.s.execute( select(SxPubCollection).order_by(SxPubCollection.sort_order) ) return [ { "slug": c.slug, "name": c.name, "description": c.description or "", } for c in result.scalars().all() ] async def check_status() -> dict[str, Any]: """Health check — DB, IPFS, actor.""" status: dict[str, Any] = {"healthy": "true"} # DB try: from quart import g from sqlalchemy import text await g.s.execute(text("SELECT 1")) status["db"] = "connected" except Exception as e: status["db"] = f"error: {e}" status["healthy"] = "false" # IPFS try: from shared.utils.ipfs_client import is_available ok = await is_available() status["ipfs"] = "available" if ok else "unavailable" except Exception as e: status["ipfs"] = f"error: {e}" # Actor try: actor = await get_or_create_actor() status["actor"] = actor["preferred-username"] status["domain"] = actor["domain"] except Exception as e: status["actor"] = f"error: {e}" status["healthy"] = "false" return status # --------------------------------------------------------------------------- # Phase 2: Publishing + Browsing # --------------------------------------------------------------------------- async def publish_document(collection_slug: str, slug: str, content: str, title: str = "", summary: str = "") -> dict[str, Any]: """Pin SX content to IPFS and store in DB. Returns doc info dict.""" import hashlib from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubCollection, SxPubDocument from shared.utils.ipfs_client import add_bytes # Resolve collection result = await g.s.execute( select(SxPubCollection).where(SxPubCollection.slug == collection_slug) ) collection = result.scalar_one_or_none() if collection is None: return {"error": f"Collection not found: {collection_slug}"} # Hash content content_bytes = content.encode("utf-8") content_hash = hashlib.sha3_256(content_bytes).hexdigest() # Pin to IPFS try: cid = await add_bytes(content_bytes, pin=True) except Exception as e: logger.error("IPFS pin failed for %s/%s: %s", collection_slug, slug, e) return {"error": f"IPFS pin failed: {e}"} # Upsert document result = await g.s.execute( select(SxPubDocument).where( SxPubDocument.collection_id == collection.id, SxPubDocument.slug == slug, ) ) doc = result.scalar_one_or_none() if doc is None: doc = SxPubDocument( collection_id=collection.id, slug=slug, title=title or slug, summary=summary, content_hash=content_hash, ipfs_cid=cid, size_bytes=len(content_bytes), status="published", ) g.s.add(doc) else: doc.content_hash = content_hash doc.ipfs_cid = cid doc.size_bytes = len(content_bytes) doc.status = "published" if title: doc.title = title if summary: doc.summary = summary await g.s.flush() logger.info("Published %s/%s → %s (%d bytes)", collection_slug, slug, cid, len(content_bytes)) # Record Publish activity in outbox from shared.models.sx_pub import SxPubActivity g.s.add(SxPubActivity( activity_type="Publish", object_type="SxDocument", object_data={ "path": f"/pub/{collection_slug}/{slug}", "cid": cid, "collection": collection_slug, "slug": slug, "title": title or slug, "summary": summary, }, ipfs_cid=cid, )) await g.s.flush() return { "path": f"/pub/{collection_slug}/{slug}", "cid": cid, "hash": content_hash, "size": len(content_bytes), "collection": collection_slug, "slug": slug, "title": doc.title or slug, } async def collection_items(collection_slug: str) -> dict[str, Any]: """List published documents in a collection.""" from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubCollection, SxPubDocument result = await g.s.execute( select(SxPubCollection).where(SxPubCollection.slug == collection_slug) ) collection = result.scalar_one_or_none() if collection is None: return {"error": f"Collection not found: {collection_slug}"} result = await g.s.execute( select(SxPubDocument).where( SxPubDocument.collection_id == collection.id, SxPubDocument.status == "published", ).order_by(SxPubDocument.slug) ) docs = result.scalars().all() return { "collection": collection_slug, "name": collection.name, "description": collection.description or "", "items": [ { "slug": d.slug, "title": d.title or d.slug, "summary": d.summary or "", "cid": d.ipfs_cid or "", "size": d.size_bytes or 0, } for d in docs ], } async def resolve_document(collection_slug: str, slug: str) -> dict[str, Any]: """Resolve a document path to its content via IPFS.""" from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubCollection, SxPubDocument from shared.utils.ipfs_client import get_bytes result = await g.s.execute( select(SxPubCollection).where(SxPubCollection.slug == collection_slug) ) collection = result.scalar_one_or_none() if collection is None: return {"error": "not-found"} result = await g.s.execute( select(SxPubDocument).where( SxPubDocument.collection_id == collection.id, SxPubDocument.slug == slug, ) ) doc = result.scalar_one_or_none() if doc is None or not doc.ipfs_cid: return {"error": "not-found"} content_bytes = await get_bytes(doc.ipfs_cid) if content_bytes is None: return {"error": "ipfs-unavailable"} return { "slug": doc.slug, "title": doc.title or doc.slug, "summary": doc.summary or "", "cid": doc.ipfs_cid, "collection": collection_slug, "content": content_bytes.decode("utf-8", errors="replace"), } async def fetch_cid(cid: str) -> dict[str, Any]: """Fetch raw content from IPFS by CID.""" from shared.utils.ipfs_client import get_bytes content_bytes = await get_bytes(cid) if content_bytes is None: return {"error": "not-found"} return { "cid": cid, "content": content_bytes.decode("utf-8", errors="replace"), "size": len(content_bytes), } # --------------------------------------------------------------------------- # Phase 3: Federation — outbox, inbox, follow, delivery, signatures # --------------------------------------------------------------------------- async def outbox_data(page: str = "") -> dict[str, Any]: """List published activities (outbox).""" from quart import g from sqlalchemy import select, func as sa_func from shared.models.sx_pub import SxPubActivity page_num = int(page) if page else 1 per_page = 20 offset = (page_num - 1) * per_page total_result = await g.s.execute(select(sa_func.count(SxPubActivity.id))) total = total_result.scalar() or 0 result = await g.s.execute( select(SxPubActivity) .order_by(SxPubActivity.published.desc()) .offset(offset).limit(per_page) ) activities = result.scalars().all() return { "total": total, "page": page_num, "items": [ { "type": a.activity_type, "object-type": a.object_type or "", "published": a.published.isoformat() if a.published else "", "cid": a.ipfs_cid or "", "data": a.object_data or {}, } for a in activities ], } async def followers_data() -> list[dict[str, Any]]: """List followers.""" from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubFollower result = await g.s.execute( select(SxPubFollower).where(SxPubFollower.state == "accepted") .order_by(SxPubFollower.created_at.desc()) ) return [ { "acct": f.follower_acct, "actor-url": f.follower_actor_url, "inbox": f.follower_inbox, } for f in result.scalars().all() ] async def following_data() -> list[dict[str, Any]]: """List who we follow.""" from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubFollowing result = await g.s.execute( select(SxPubFollowing).where(SxPubFollowing.state == "accepted") .order_by(SxPubFollowing.created_at.desc()) ) return [ { "actor-url": f.remote_actor_url, "inbox": f.remote_inbox, } for f in result.scalars().all() ] def _sign_request(method: str, url: str, body: str, private_key_pem: str, key_id: str) -> dict[str, str]: """Generate HTTP Signature headers for an outgoing request.""" from urllib.parse import urlparse from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding import base64 parsed = urlparse(url) path = parsed.path host = parsed.netloc date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT") # Build signature string digest = "SHA-256=" + base64.b64encode( hashlib.sha256(body.encode("utf-8")).digest() ).decode("ascii") signed_string = ( f"(request-target): {method.lower()} {path}\n" f"host: {host}\n" f"date: {date}\n" f"digest: {digest}" ) private_key = serialization.load_pem_private_key( private_key_pem.encode("utf-8"), password=None, ) signature = base64.b64encode( private_key.sign( signed_string.encode("utf-8"), padding.PKCS1v15(), hashes.SHA256(), ) ).decode("ascii") sig_header = ( f'keyId="{key_id}",' f'algorithm="rsa-sha256",' f'headers="(request-target) host date digest",' f'signature="{signature}"' ) return { "Host": host, "Date": date, "Digest": digest, "Signature": sig_header, "Content-Type": "text/sx; charset=utf-8", } def _verify_signature(headers: dict, method: str, path: str, body: str, public_key_pem: str) -> bool: """Verify HTTP Signature on an incoming request.""" import base64 import re from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding sig_header = headers.get("Signature", "") if not sig_header: return False # Parse signature header parts = {} for match in re.finditer(r'(\w+)="([^"]*)"', sig_header): parts[match.group(1)] = match.group(2) if "signature" not in parts or "headers" not in parts: return False # Reconstruct signed string signed_headers = parts["headers"].split() lines = [] for h in signed_headers: if h == "(request-target)": lines.append(f"(request-target): {method.lower()} {path}") else: val = headers.get(h.title(), headers.get(h, "")) lines.append(f"{h}: {val}") signed_string = "\n".join(lines) try: public_key = serialization.load_pem_public_key( public_key_pem.encode("utf-8"), ) public_key.verify( base64.b64decode(parts["signature"]), signed_string.encode("utf-8"), padding.PKCS1v15(), hashes.SHA256(), ) return True except Exception as e: logger.warning("Signature verification failed: %s", e) return False async def follow_remote(actor_url: str) -> dict[str, Any]: """Send a Follow activity to a remote sx-pub server.""" import httpx from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubFollowing, SxPubActor # Check not already following result = await g.s.execute( select(SxPubFollowing).where(SxPubFollowing.remote_actor_url == actor_url) ) existing = result.scalar_one_or_none() if existing: return {"status": existing.state, "actor-url": actor_url} # Fetch remote actor document try: async with httpx.AsyncClient(timeout=15) as client: resp = await client.get(actor_url, headers={"Accept": "text/sx"}) resp.raise_for_status() remote_actor_sx = resp.text except Exception as e: return {"error": f"Failed to fetch remote actor: {e}"} # Parse inbox URL from the SX actor document # Simple extraction — look for :inbox "..." import re inbox_match = re.search(r':inbox\s+"([^"]+)"', remote_actor_sx) if not inbox_match: return {"error": "Could not find inbox in remote actor document"} # Build absolute inbox URL from urllib.parse import urljoin remote_inbox = urljoin(actor_url, inbox_match.group(1)) # Get our actor for signing actor_result = await g.s.execute( select(SxPubActor).where(SxPubActor.preferred_username == "sx") ) our_actor = actor_result.scalar_one_or_none() if not our_actor: return {"error": "Local actor not initialized"} our_id = f"https://{our_actor.domain}/pub/actor" # Build Follow activity follow_sx = ( f'(Follow\n' f' :actor "{our_id}"\n' f' :object "{actor_url}")' ) # Sign and send key_id = f"{our_id}#main-key" headers = _sign_request("POST", remote_inbox, follow_sx, our_actor.private_key_pem, key_id) try: async with httpx.AsyncClient(timeout=15) as client: resp = await client.post(remote_inbox, content=follow_sx, headers=headers) logger.info("Sent Follow to %s → %d", remote_inbox, resp.status_code) except Exception as e: logger.error("Follow delivery failed to %s: %s", remote_inbox, e) # Store the following record following = SxPubFollowing( remote_actor_url=actor_url, remote_inbox=remote_inbox, state="pending", ) g.s.add(following) await g.s.flush() return {"status": "pending", "actor-url": actor_url, "inbox": remote_inbox} async def process_inbox(body_sx: str) -> dict[str, Any]: """Process an incoming activity from a remote sx-pub server.""" import re from quart import g from sqlalchemy import select from shared.models.sx_pub import ( SxPubFollower, SxPubFollowing, SxPubActivity, SxPubActor, ) from shared.utils.ipfs_client import pin_cid # Parse activity type and actor type_match = re.match(r'\((\w+)', body_sx.strip()) actor_match = re.search(r':actor\s+"([^"]+)"', body_sx) object_match = re.search(r':object\s+"([^"]+)"', body_sx) if not type_match: return {"error": "Could not parse activity type"} activity_type = type_match.group(1) remote_actor = actor_match.group(1) if actor_match else "" object_val = object_match.group(1) if object_match else "" logger.info("Inbox received: %s from %s", activity_type, remote_actor) if activity_type == "Follow": # Someone wants to follow us — auto-accept inbox_match = re.search(r':inbox\s+"([^"]+)"', body_sx) remote_inbox = inbox_match.group(1) if inbox_match else "" # If no inbox in activity, try fetching the remote actor if not remote_inbox and remote_actor: import httpx try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get(remote_actor, headers={"Accept": "text/sx"}) im = re.search(r':inbox\s+"([^"]+)"', resp.text) if im: from urllib.parse import urljoin remote_inbox = urljoin(remote_actor, im.group(1)) except Exception: pass # Store follower result = await g.s.execute( select(SxPubFollower).where(SxPubFollower.follower_actor_url == remote_actor) ) follower = result.scalar_one_or_none() if follower is None: follower = SxPubFollower( follower_acct=remote_actor, follower_inbox=remote_inbox or remote_actor, follower_actor_url=remote_actor, state="accepted", ) g.s.add(follower) await g.s.flush() logger.info("Accepted follow from %s", remote_actor) # Send Accept back actor_result = await g.s.execute( select(SxPubActor).where(SxPubActor.preferred_username == "sx") ) our_actor = actor_result.scalar_one_or_none() if our_actor and remote_inbox: our_id = f"https://{our_actor.domain}/pub/actor" accept_sx = ( f'(Accept\n' f' :actor "{our_id}"\n' f' :object {body_sx})' ) key_id = f"{our_id}#main-key" headers = _sign_request("POST", remote_inbox, accept_sx, our_actor.private_key_pem, key_id) import httpx try: async with httpx.AsyncClient(timeout=10) as client: await client.post(remote_inbox, content=accept_sx, headers=headers) except Exception as e: logger.warning("Accept delivery failed: %s", e) return {"accepted": remote_actor} elif activity_type == "Accept": # Our follow was accepted — update state if object_val: result = await g.s.execute( select(SxPubFollowing).where( SxPubFollowing.remote_actor_url == remote_actor ) ) following = result.scalar_one_or_none() if following: following.state = "accepted" following.accepted_at = datetime.now(timezone.utc) await g.s.flush() logger.info("Follow accepted by %s", remote_actor) return {"accepted-by": remote_actor} elif activity_type == "Publish": # Pin the published CID locally cid_match = re.search(r':cid\s+"([^"]+)"', body_sx) if cid_match: cid = cid_match.group(1) pinned = await pin_cid(cid) logger.info("Mirrored CID %s from %s (pinned=%s)", cid, remote_actor, pinned) # Record the activity g.s.add(SxPubActivity( activity_type="Publish", object_type="SxDocument", object_data={"remote_actor": remote_actor, "body": body_sx}, ipfs_cid=cid_match.group(1) if cid_match else None, )) await g.s.flush() return {"mirrored": cid_match.group(1) if cid_match else ""} return {"ignored": activity_type} async def deliver_to_followers(activity_sx: str) -> dict[str, Any]: """Deliver an activity to all follower inboxes.""" import httpx from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubFollower, SxPubActor actor_result = await g.s.execute( select(SxPubActor).where(SxPubActor.preferred_username == "sx") ) our_actor = actor_result.scalar_one_or_none() if not our_actor: return {"error": "Actor not initialized", "delivered": 0} our_id = f"https://{our_actor.domain}/pub/actor" key_id = f"{our_id}#main-key" result = await g.s.execute( select(SxPubFollower).where(SxPubFollower.state == "accepted") ) followers = result.scalars().all() delivered = 0 failed = 0 for follower in followers: headers = _sign_request("POST", follower.follower_inbox, activity_sx, our_actor.private_key_pem, key_id) try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.post( follower.follower_inbox, content=activity_sx, headers=headers, ) if resp.status_code < 300: delivered += 1 else: failed += 1 logger.warning("Delivery to %s returned %d", follower.follower_inbox, resp.status_code) except Exception as e: failed += 1 logger.error("Delivery to %s failed: %s", follower.follower_inbox, e) logger.info("Delivered to %d/%d followers (%d failed)", delivered, len(followers), failed) return {"delivered": delivered, "failed": failed, "total": len(followers)} async def get_request_body() -> str: """Get the raw request body as text.""" from quart import request data = await request.get_data(as_text=True) return data async def get_request_headers() -> dict[str, str]: """Get request headers as a dict.""" from quart import request return dict(request.headers) async def get_request_method() -> str: """Get the HTTP method.""" from quart import request return request.method async def get_request_path() -> str: """Get the request path.""" from quart import request return request.path # --------------------------------------------------------------------------- # Phase 4: Anchoring — Merkle trees, OTS, verification # --------------------------------------------------------------------------- async def anchor_pending() -> dict[str, Any]: """Anchor all unanchored Publish activities into a Merkle tree. 1. Collect unanchored activities (by CID) 2. Build Merkle tree 3. Pin tree to IPFS 4. Submit root to OpenTimestamps 5. Store proof on IPFS 6. Record anchor in DB """ from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubActivity from shared.utils.anchoring import ( build_merkle_tree, submit_to_opentimestamps, ) from shared.utils.ipfs_client import add_json, add_bytes, is_available # Find unanchored Publish activities with CIDs result = await g.s.execute( select(SxPubActivity).where( SxPubActivity.activity_type == "Publish", SxPubActivity.ipfs_cid.isnot(None), SxPubActivity.object_data["anchor_tree_cid"].is_(None), ).order_by(SxPubActivity.created_at.asc()) .limit(100) ) activities = result.scalars().all() if not activities: return {"status": "nothing-to-anchor", "count": 0} # Build Merkle tree from CIDs cids = [a.ipfs_cid for a in activities if a.ipfs_cid] if not cids: return {"status": "no-cids", "count": 0} tree = build_merkle_tree(cids) merkle_root = tree["root"] # Pin tree to IPFS tree_cid = None ots_proof_cid = None if await is_available(): try: tree_data = { "root": merkle_root, "leaves": tree["leaves"], "cids": cids, "created_at": datetime.now(timezone.utc).isoformat(), } tree_cid = await add_json(tree_data) logger.info("Merkle tree pinned: %s (%d leaves)", tree_cid, len(cids)) except Exception as e: logger.error("IPFS tree storage failed: %s", e) # Submit to OpenTimestamps ots_proof = await submit_to_opentimestamps(merkle_root) if ots_proof and await is_available(): try: ots_proof_cid = await add_bytes(ots_proof) logger.info("OTS proof pinned: %s", ots_proof_cid) except Exception as e: logger.error("IPFS OTS proof storage failed: %s", e) # Record anchor in activities (store in object_data) anchor_info = { "merkle_root": merkle_root, "tree_cid": tree_cid, "ots_proof_cid": ots_proof_cid, "activity_count": len(activities), "anchored_at": datetime.now(timezone.utc).isoformat(), } for a in activities: data = dict(a.object_data or {}) data["anchor_tree_cid"] = tree_cid data["anchor_merkle_root"] = merkle_root data["anchor_ots_cid"] = ots_proof_cid a.object_data = data # Also record anchor as its own activity from shared.models.sx_pub import SxPubActivity as SPA g.s.add(SPA( activity_type="Anchor", object_type="MerkleTree", object_data=anchor_info, ipfs_cid=tree_cid, )) await g.s.flush() logger.info("Anchored %d activities, root=%s, tree=%s, ots=%s", len(activities), merkle_root, tree_cid, ots_proof_cid) return { "status": "anchored", "count": len(activities), "merkle-root": merkle_root, "tree-cid": tree_cid or "", "ots-proof-cid": ots_proof_cid or "", } async def verify_cid_anchor(cid: str) -> dict[str, Any]: """Verify the anchor proof for a specific CID.""" from quart import g from sqlalchemy import select from shared.models.sx_pub import SxPubActivity from shared.utils.anchoring import build_merkle_tree, verify_merkle_proof from shared.utils.ipfs_client import get_bytes # Find the Publish activity for this CID result = await g.s.execute( select(SxPubActivity).where( SxPubActivity.activity_type == "Publish", SxPubActivity.ipfs_cid == cid, ) ) activity = result.scalar_one_or_none() if not activity: return {"error": "not-found", "cid": cid} data = activity.object_data or {} tree_cid = data.get("anchor_tree_cid") merkle_root = data.get("anchor_merkle_root") ots_cid = data.get("anchor_ots_cid") if not tree_cid: return {"status": "not-anchored", "cid": cid} # Fetch the Merkle tree from IPFS to verify verified = False if tree_cid: tree_bytes = await get_bytes(tree_cid) if tree_bytes: import json try: tree_data = json.loads(tree_bytes) tree = build_merkle_tree(tree_data["cids"]) from shared.utils.anchoring import get_merkle_proof proof = get_merkle_proof(tree, cid) if proof is not None: verified = verify_merkle_proof(cid, proof, tree["root"]) except Exception as e: logger.warning("Merkle verification failed: %s", e) return { "status": "anchored" if verified else "unverified", "cid": cid, "merkle-root": merkle_root or "", "tree-cid": tree_cid or "", "ots-proof-cid": ots_cid or "", "verified": "true" if verified else "false", "published": activity.published.isoformat() if activity.published else "", }