From dd7a99e8b73264120912bf5800eea38ce064868d Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 21 Feb 2026 15:57:31 +0000 Subject: [PATCH] Add federation event handlers, AP delivery, and anchoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3-5 of ActivityPub integration: - Federation handlers: post.published, calendar_entry.created, product.listed → publish_activity() for AP outbox - AP delivery handler: federation.activity_created → sign + POST to follower inboxes with HTTP Signatures - IPFS storage wired into publish_activity() (best-effort) - Anchoring utility: merkle trees + OpenTimestamps Bitcoin timestamping Co-Authored-By: Claude Opus 4.6 --- events/handlers/__init__.py | 2 + events/handlers/ap_delivery_handler.py | 150 ++++++++++++++++ events/handlers/federation_handlers.py | 167 +++++++++++++++++ services/federation_impl.py | 23 ++- utils/anchoring.py | 236 +++++++++++++++++++++++++ 5 files changed, 577 insertions(+), 1 deletion(-) create mode 100644 events/handlers/ap_delivery_handler.py create mode 100644 events/handlers/federation_handlers.py create mode 100644 utils/anchoring.py diff --git a/events/handlers/__init__.py b/events/handlers/__init__.py index 380dc5a..76e2c9e 100644 --- a/events/handlers/__init__.py +++ b/events/handlers/__init__.py @@ -6,3 +6,5 @@ def register_shared_handlers(): import shared.events.handlers.container_handlers # noqa: F401 import shared.events.handlers.login_handlers # noqa: F401 import shared.events.handlers.order_handlers # noqa: F401 + import shared.events.handlers.federation_handlers # noqa: F401 + import shared.events.handlers.ap_delivery_handler # noqa: F401 diff --git a/events/handlers/ap_delivery_handler.py b/events/handlers/ap_delivery_handler.py new file mode 100644 index 0000000..cc8a69a --- /dev/null +++ b/events/handlers/ap_delivery_handler.py @@ -0,0 +1,150 @@ +"""Deliver AP activities to remote followers. + +On ``federation.activity_created`` → load activity + actor + followers → +sign with HTTP Signatures → POST to each follower inbox. +""" +from __future__ import annotations + +import logging + +import httpx +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from shared.events.bus import register_handler, DomainEvent +from shared.models.federation import ActorProfile, APActivity, APFollower +from shared.services.registry import services + +log = logging.getLogger(__name__) + +AP_CONTENT_TYPE = "application/activity+json" +DELIVERY_TIMEOUT = 15 # seconds per request + + +def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) -> dict: + """Build the full AP activity JSON-LD for delivery.""" + username = actor.preferred_username + actor_url = f"https://{domain}/users/{username}" + + obj = dict(activity.object_data or {}) + obj.setdefault("id", activity.activity_id + "/object") + obj.setdefault("type", activity.object_type) + obj.setdefault("attributedTo", actor_url) + obj.setdefault("published", activity.published.isoformat() if activity.published else None) + + return { + "@context": "https://www.w3.org/ns/activitystreams", + "id": activity.activity_id, + "type": activity.activity_type, + "actor": actor_url, + "published": activity.published.isoformat() if activity.published else None, + "to": ["https://www.w3.org/ns/activitystreams#Public"], + "cc": [f"{actor_url}/followers"], + "object": obj, + } + + +async def _deliver_to_inbox( + client: httpx.AsyncClient, + inbox_url: str, + body: dict, + actor: ActorProfile, + domain: str, +) -> bool: + """POST signed activity to a single inbox. Returns True on success.""" + from shared.utils.http_signatures import sign_request + import json + + body_bytes = json.dumps(body).encode() + key_id = f"https://{domain}/users/{actor.preferred_username}#main-key" + + headers = sign_request( + method="POST", + url=inbox_url, + body=body_bytes, + private_key_pem=actor.private_key_pem, + key_id=key_id, + ) + headers["Content-Type"] = AP_CONTENT_TYPE + + try: + resp = await client.post( + inbox_url, + content=body_bytes, + headers=headers, + timeout=DELIVERY_TIMEOUT, + ) + if resp.status_code < 300: + log.info("Delivered to %s → %d", inbox_url, resp.status_code) + return True + else: + log.warning("Delivery to %s → %d: %s", inbox_url, resp.status_code, resp.text[:200]) + return False + except Exception: + log.exception("Delivery failed for %s", inbox_url) + return False + + +async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None: + """Deliver a newly created activity to all followers.""" + import os + + if not services.has("federation"): + return + + payload = event.payload + activity_id_uri = payload.get("activity_id") + if not activity_id_uri: + return + + domain = os.getenv("AP_DOMAIN", "rose-ash.com") + + # Load the activity + activity = ( + await session.execute( + select(APActivity).where(APActivity.activity_id == activity_id_uri) + ) + ).scalar_one_or_none() + if not activity: + log.warning("Activity not found: %s", activity_id_uri) + return + + # Load actor with private key + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.id == activity.actor_profile_id) + ) + ).scalar_one_or_none() + if not actor or not actor.private_key_pem: + log.warning("Actor not found or missing key for activity %s", activity_id_uri) + return + + # Load followers + followers = ( + await session.execute( + select(APFollower).where(APFollower.actor_profile_id == actor.id) + ) + ).scalars().all() + + if not followers: + log.debug("No followers to deliver to for %s", activity_id_uri) + return + + # Build activity JSON + activity_json = _build_activity_json(activity, actor, domain) + + # Deliver to each follower inbox + # Deduplicate inboxes (multiple followers might share a shared inbox) + inboxes = {f.follower_inbox for f in followers if f.follower_inbox} + + log.info( + "Delivering %s to %d inbox(es) for @%s", + activity.activity_type, len(inboxes), actor.preferred_username, + ) + + async with httpx.AsyncClient() as client: + for inbox_url in inboxes: + await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain) + + +register_handler("federation.activity_created", on_activity_created) diff --git a/events/handlers/federation_handlers.py b/events/handlers/federation_handlers.py new file mode 100644 index 0000000..152434f --- /dev/null +++ b/events/handlers/federation_handlers.py @@ -0,0 +1,167 @@ +"""Federation event handlers — publish domain content as AP activities. + +Listens for content events and calls services.federation.publish_activity() +to create AP activities. Each handler checks: + 1. services.has("federation") — skip if federation not wired + 2. The content has a user_id — skip anonymous/system content + 3. The user has an ActorProfile — skip users who haven't chosen a username +""" +from __future__ import annotations + +import logging + +from sqlalchemy.ext.asyncio import AsyncSession + +from shared.events.bus import register_handler, DomainEvent +from shared.services.registry import services + +log = logging.getLogger(__name__) + + +async def _try_publish( + session: AsyncSession, + *, + user_id: int | None, + activity_type: str, + object_type: str, + object_data: dict, + source_type: str, + source_id: int, +) -> None: + """Publish an AP activity if federation is available and user has a profile.""" + if not services.has("federation"): + return + + if not user_id: + return + + # Check user has an ActorProfile (chose a username) + actor = await services.federation.get_actor_by_user_id(session, user_id) + if not actor: + return + + # Don't re-publish if we already have an activity for this source + existing = await services.federation.get_activity_for_source( + session, source_type, source_id, + ) + if existing and activity_type == "Create": + return # Already published + + try: + await services.federation.publish_activity( + session, + actor_user_id=user_id, + activity_type=activity_type, + object_type=object_type, + object_data=object_data, + source_type=source_type, + source_id=source_id, + ) + log.info( + "Published %s/%s for %s#%d by user %d", + activity_type, object_type, source_type, source_id, user_id, + ) + except Exception: + log.exception("Failed to publish activity for %s#%d", source_type, source_id) + + +# -- Post published/updated (from Ghost webhook sync) ------------------------- + +async def on_post_published(event: DomainEvent, session: AsyncSession) -> None: + p = event.payload + await _try_publish( + session, + user_id=p.get("user_id"), + activity_type="Create", + object_type="Article", + object_data={ + "name": p.get("title", ""), + "content": p.get("excerpt", ""), + "url": p.get("url", ""), + }, + source_type="Post", + source_id=event.aggregate_id, + ) + + +async def on_post_updated(event: DomainEvent, session: AsyncSession) -> None: + p = event.payload + await _try_publish( + session, + user_id=p.get("user_id"), + activity_type="Update", + object_type="Article", + object_data={ + "name": p.get("title", ""), + "content": p.get("excerpt", ""), + "url": p.get("url", ""), + }, + source_type="Post", + source_id=event.aggregate_id, + ) + + +# -- Calendar entry created/updated ------------------------------------------- + +async def on_calendar_entry_created(event: DomainEvent, session: AsyncSession) -> None: + p = event.payload + await _try_publish( + session, + user_id=p.get("user_id"), + activity_type="Create", + object_type="Event", + object_data={ + "name": p.get("title", ""), + "startTime": p.get("start_time", ""), + "endTime": p.get("end_time", ""), + "url": p.get("url", ""), + }, + source_type="CalendarEntry", + source_id=event.aggregate_id, + ) + + +async def on_calendar_entry_updated(event: DomainEvent, session: AsyncSession) -> None: + p = event.payload + await _try_publish( + session, + user_id=p.get("user_id"), + activity_type="Update", + object_type="Event", + object_data={ + "name": p.get("title", ""), + "startTime": p.get("start_time", ""), + "endTime": p.get("end_time", ""), + "url": p.get("url", ""), + }, + source_type="CalendarEntry", + source_id=event.aggregate_id, + ) + + +# -- Product listed/updated --------------------------------------------------- + +async def on_product_listed(event: DomainEvent, session: AsyncSession) -> None: + p = event.payload + await _try_publish( + session, + user_id=p.get("user_id"), + activity_type="Create", + object_type="Object", + object_data={ + "name": p.get("title", ""), + "summary": p.get("description", ""), + "url": p.get("url", ""), + }, + source_type="Product", + source_id=event.aggregate_id, + ) + + +# -- Registration -------------------------------------------------------------- + +register_handler("post.published", on_post_published) +register_handler("post.updated", on_post_updated) +register_handler("calendar_entry.created", on_calendar_entry_created) +register_handler("calendar_entry.updated", on_calendar_entry_updated) +register_handler("product.listed", on_product_listed) diff --git a/services/federation_impl.py b/services/federation_impl.py index cc197ec..9ae761a 100644 --- a/services/federation_impl.py +++ b/services/federation_impl.py @@ -158,7 +158,28 @@ class SqlFederationService: session.add(activity) await session.flush() - # Emit domain event for downstream processing (IPFS storage, delivery) + # Store activity JSON on IPFS (best-effort — don't fail publish if IPFS down) + try: + from shared.utils.ipfs_client import add_json, is_available + if await is_available(): + activity_json = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": activity_uri, + "type": activity_type, + "actor": f"https://{domain}/users/{username}", + "published": now.isoformat(), + "object": { + "type": object_type, + **object_data, + }, + } + cid = await add_json(activity_json) + activity.ipfs_cid = cid + await session.flush() + except Exception: + pass # IPFS failure is non-fatal + + # Emit domain event for downstream processing (delivery) from shared.events import emit_event await emit_event( session, diff --git a/utils/anchoring.py b/utils/anchoring.py new file mode 100644 index 0000000..8e81580 --- /dev/null +++ b/utils/anchoring.py @@ -0,0 +1,236 @@ +"""Merkle tree construction and OpenTimestamps anchoring. + +Ported from ~/art-dag/activity-pub/anchoring.py. +Builds a SHA256 merkle tree from activity IDs, submits the root to +OpenTimestamps for Bitcoin timestamping, and stores the tree + proof on IPFS. +""" +from __future__ import annotations + +import hashlib +import logging +from datetime import datetime, timezone + +import httpx +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from shared.models.federation import APActivity, APAnchor + +log = logging.getLogger(__name__) + +OTS_SERVERS = [ + "https://a.pool.opentimestamps.org", + "https://b.pool.opentimestamps.org", + "https://a.pool.eternitywall.com", +] + + +def _sha256(data: str | bytes) -> str: + """SHA256 hex digest.""" + if isinstance(data, str): + data = data.encode() + return hashlib.sha256(data).hexdigest() + + +def build_merkle_tree(items: list[str]) -> dict: + """Build a SHA256 merkle tree from a list of strings (activity IDs). + + Returns: + { + "root": hex_str, + "leaves": [hex_str, ...], + "levels": [[hex_str, ...], ...], # bottom-up + } + """ + if not items: + raise ValueError("Cannot build merkle tree from empty list") + + # Sort for deterministic ordering + items = sorted(items) + + # Leaf hashes + leaves = [_sha256(item) for item in items] + levels = [leaves[:]] + + current = leaves[:] + while len(current) > 1: + next_level = [] + for i in range(0, len(current), 2): + left = current[i] + right = current[i + 1] if i + 1 < len(current) else left + combined = _sha256(left + right) + next_level.append(combined) + levels.append(next_level) + current = next_level + + return { + "root": current[0], + "leaves": leaves, + "levels": levels, + } + + +def get_merkle_proof(tree: dict, item: str) -> list[dict] | None: + """Generate a proof-of-membership for an item. + + Returns a list of {sibling: hex, position: "left"|"right"} dicts, + or None if the item is not in the tree. + """ + item_hash = _sha256(item) + leaves = tree["leaves"] + + try: + idx = leaves.index(item_hash) + except ValueError: + return None + + proof = [] + for level in tree["levels"][:-1]: # skip root level + if idx % 2 == 0: + sibling_idx = idx + 1 + position = "right" + else: + sibling_idx = idx - 1 + position = "left" + + if sibling_idx < len(level): + proof.append({"sibling": level[sibling_idx], "position": position}) + else: + proof.append({"sibling": level[idx], "position": position}) + + idx = idx // 2 + + return proof + + +def verify_merkle_proof(item: str, proof: list[dict], root: str) -> bool: + """Verify a merkle proof against a root hash.""" + current = _sha256(item) + for step in proof: + sibling = step["sibling"] + if step["position"] == "right": + current = _sha256(current + sibling) + else: + current = _sha256(sibling + current) + return current == root + + +async def submit_to_opentimestamps(merkle_root: str) -> bytes | None: + """Submit a hash to OpenTimestamps. Returns the (incomplete) OTS proof bytes.""" + root_bytes = bytes.fromhex(merkle_root) + + for server in OTS_SERVERS: + try: + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.post( + f"{server}/digest", + content=root_bytes, + headers={"Content-Type": "application/x-opentimestamps"}, + ) + if resp.status_code == 200: + log.info("OTS proof obtained from %s", server) + return resp.content + except Exception: + log.debug("OTS server %s failed", server, exc_info=True) + continue + + log.warning("All OTS servers failed for root %s", merkle_root) + return None + + +async def upgrade_ots_proof(proof_bytes: bytes) -> tuple[bytes, bool]: + """Try to upgrade an incomplete OTS proof to a Bitcoin-confirmed one. + + Returns (proof_bytes, is_confirmed). The proof_bytes may be updated. + """ + # OpenTimestamps upgrade is done via the `ots` CLI or their calendar API. + # For now, return the proof as-is with is_confirmed=False. + # TODO: Implement calendar-based upgrade polling. + return proof_bytes, False + + +async def create_anchor( + session: AsyncSession, + batch_size: int = 100, +) -> APAnchor | None: + """Anchor a batch of un-anchored activities. + + 1. Select activities without an anchor_id + 2. Build merkle tree from their activity_ids + 3. Store tree on IPFS + 4. Submit root to OpenTimestamps + 5. Store OTS proof on IPFS + 6. Create APAnchor record + 7. Link activities to anchor + """ + # Find un-anchored activities + result = await session.execute( + select(APActivity) + .where( + APActivity.anchor_id.is_(None), + APActivity.is_local == True, # noqa: E712 + ) + .order_by(APActivity.created_at.asc()) + .limit(batch_size) + ) + activities = result.scalars().all() + + if not activities: + log.debug("No un-anchored activities to process") + return None + + activity_ids = [a.activity_id for a in activities] + log.info("Anchoring %d activities", len(activity_ids)) + + # Build merkle tree + tree = build_merkle_tree(activity_ids) + merkle_root = tree["root"] + + # Store tree on IPFS + tree_cid = None + ots_proof_cid = None + try: + from shared.utils.ipfs_client import add_json, add_bytes, is_available + if await is_available(): + tree_cid = await add_json({ + "root": merkle_root, + "leaves": tree["leaves"], + "activity_ids": activity_ids, + "created_at": datetime.now(timezone.utc).isoformat(), + }) + log.info("Merkle tree stored on IPFS: %s", tree_cid) + except Exception: + log.exception("IPFS tree storage failed") + + # Submit to OpenTimestamps + ots_proof = await submit_to_opentimestamps(merkle_root) + if ots_proof: + try: + from shared.utils.ipfs_client import add_bytes, is_available + if await is_available(): + ots_proof_cid = await add_bytes(ots_proof) + log.info("OTS proof stored on IPFS: %s", ots_proof_cid) + except Exception: + log.exception("IPFS OTS proof storage failed") + + # Create anchor record + anchor = APAnchor( + merkle_root=merkle_root, + tree_ipfs_cid=tree_cid, + ots_proof_cid=ots_proof_cid, + activity_count=len(activities), + ) + session.add(anchor) + await session.flush() + + # Link activities to anchor + for a in activities: + a.anchor_id = anchor.id + await session.flush() + + log.info( + "Anchor created: root=%s, activities=%d, tree_cid=%s", + merkle_root, len(activities), tree_cid, + ) + + return anchor