diff --git a/sx/sx/handlers/pub-api.sx b/sx/sx/handlers/pub-api.sx index 11c8a47a..ffea0630 100644 --- a/sx/sx/handlers/pub-api.sx +++ b/sx/sx/handlers/pub-api.sx @@ -215,3 +215,125 @@ (set-response-header "Content-Type" "text/sx; charset=utf-8") (set-response-header "Cache-Control" "public, max-age=31536000, immutable") (get data "content"))))) + + +;; ========================================================================== +;; Phase 3: Federation — outbox, inbox, follow, followers, following +;; ========================================================================== + + +;; -------------------------------------------------------------------------- +;; Outbox +;; -------------------------------------------------------------------------- + +(defhandler pub-outbox + :path "/pub/outbox" + :method :get + :returns "element" + (&key) + (let ((page (helper "request-arg" "page" "")) + (data (helper "pub-outbox-data" page))) + (do + (set-response-header "Content-Type" "text/sx; charset=utf-8") + (let ((items (map (fn (a) + (str "\n (" (get a "type") + " :object-type \"" (get a "object-type") "\"" + " :published \"" (get a "published") "\"" + " :cid \"" (get a "cid") "\")")) + (get data "items")))) + (str + "(SxOutbox" + "\n :total " (get data "total") + "\n :page " (get data "page") + (join "" items) ")"))))) + + +;; -------------------------------------------------------------------------- +;; Inbox +;; -------------------------------------------------------------------------- + +(defhandler pub-inbox + :path "/pub/inbox" + :method :post + :csrf false + :returns "element" + (&key) + (let ((body (helper "pub-request-body"))) + (if (= body "") + (do + (set-response-status 400) + (set-response-header "Content-Type" "text/sx; charset=utf-8") + "(Error :message \"Empty body\")") + (let ((result (helper "pub-process-inbox" body))) + (do + (set-response-status 202) + (set-response-header "Content-Type" "text/sx; charset=utf-8") + (str "(Accepted :result " (str result) ")")))))) + + +;; -------------------------------------------------------------------------- +;; Follow a remote server +;; -------------------------------------------------------------------------- + +(defhandler pub-follow + :path "/pub/follow" + :method :post + :csrf false + :returns "element" + (&key) + (let ((actor-url (helper "request-form" "actor_url" ""))) + (if (= actor-url "") + (do + (set-response-status 400) + (set-response-header "Content-Type" "text/sx; charset=utf-8") + "(Error :message \"Missing actor_url\")") + (let ((result (helper "pub-follow-remote" actor-url))) + (do + (set-response-header "Content-Type" "text/sx; charset=utf-8") + (if (get result "error") + (do + (set-response-status 502) + (str "(Error :message \"" (get result "error") "\")")) + (str + "(FollowSent" + "\n :actor-url \"" (get result "actor-url") "\"" + "\n :status \"" (get result "status") "\")"))))))) + + +;; -------------------------------------------------------------------------- +;; Followers +;; -------------------------------------------------------------------------- + +(defhandler pub-followers + :path "/pub/followers" + :method :get + :returns "element" + (&key) + (let ((data (helper "pub-followers-data"))) + (do + (set-response-header "Content-Type" "text/sx; charset=utf-8") + (let ((items (map (fn (f) + (str "\n (SxFollower" + " :acct \"" (get f "acct") "\"" + " :actor-url \"" (get f "actor-url") "\")")) + data))) + (str "(SxFollowers" (join "" items) ")"))))) + + +;; -------------------------------------------------------------------------- +;; Following +;; -------------------------------------------------------------------------- + +(defhandler pub-following + :path "/pub/following" + :method :get + :returns "element" + (&key) + (let ((data (helper "pub-following-data"))) + (do + (set-response-header "Content-Type" "text/sx; charset=utf-8") + (let ((items (map (fn (f) + (str "\n (SxFollowing" + " :actor-url \"" (get f "actor-url") "\")")) + data))) + (str "(SxFollowing" (join "" items) ")"))))) diff --git a/sx/sxc/pages/helpers.py b/sx/sxc/pages/helpers.py index 9ecbb368..0877c6cb 100644 --- a/sx/sxc/pages/helpers.py +++ b/sx/sxc/pages/helpers.py @@ -45,6 +45,13 @@ def _register_sx_helpers() -> None: "pub-collection-items": _pub_collection_items, "pub-resolve-document": _pub_resolve_document, "pub-fetch-cid": _pub_fetch_cid, + "pub-outbox-data": _pub_outbox_data, + "pub-followers-data": _pub_followers_data, + "pub-following-data": _pub_following_data, + "pub-follow-remote": _pub_follow_remote, + "pub-process-inbox": _pub_process_inbox, + "pub-deliver-to-followers": _pub_deliver_to_followers, + "pub-request-body": _pub_request_body, }) @@ -1764,3 +1771,38 @@ async def _pub_resolve_document(collection_slug, slug): async def _pub_fetch_cid(cid): from .pub_helpers import fetch_cid return await fetch_cid(cid) + + +async def _pub_outbox_data(page=""): + from .pub_helpers import outbox_data + return await outbox_data(page) + + +async def _pub_followers_data(): + from .pub_helpers import followers_data + return await followers_data() + + +async def _pub_following_data(): + from .pub_helpers import following_data + return await following_data() + + +async def _pub_follow_remote(actor_url): + from .pub_helpers import follow_remote + return await follow_remote(actor_url) + + +async def _pub_process_inbox(body_sx): + from .pub_helpers import process_inbox + return await process_inbox(body_sx) + + +async def _pub_deliver_to_followers(activity_sx): + from .pub_helpers import deliver_to_followers + return await deliver_to_followers(activity_sx) + + +async def _pub_request_body(): + from .pub_helpers import get_request_body + return await get_request_body() diff --git a/sx/sxc/pages/pub_helpers.py b/sx/sxc/pages/pub_helpers.py index 48ed7c42..535816f0 100644 --- a/sx/sxc/pages/pub_helpers.py +++ b/sx/sxc/pages/pub_helpers.py @@ -1,12 +1,14 @@ -"""sx-pub Python IO helpers — actor management, IPFS status, collections. +"""sx-pub Python IO helpers — actor, IPFS, collections, publishing, federation. These are called from SX defhandlers via (helper "pub-..." args...). All DB access uses g.s (per-request async session from register_db). """ from __future__ import annotations +import hashlib import logging import os +from datetime import datetime, timezone from typing import Any logger = logging.getLogger("sx.pub") @@ -210,6 +212,23 @@ async def publish_document(collection_slug: str, slug: str, content: str, await g.s.flush() logger.info("Published %s/%s → %s (%d bytes)", collection_slug, slug, cid, len(content_bytes)) + # Record Publish activity in outbox + from shared.models.sx_pub import SxPubActivity + g.s.add(SxPubActivity( + activity_type="Publish", + object_type="SxDocument", + object_data={ + "path": f"/pub/{collection_slug}/{slug}", + "cid": cid, + "collection": collection_slug, + "slug": slug, + "title": title or slug, + "summary": summary, + }, + ipfs_cid=cid, + )) + await g.s.flush() + return { "path": f"/pub/{collection_slug}/{slug}", "cid": cid, @@ -310,3 +329,451 @@ async def fetch_cid(cid: str) -> dict[str, Any]: "content": content_bytes.decode("utf-8", errors="replace"), "size": len(content_bytes), } + + +# --------------------------------------------------------------------------- +# Phase 3: Federation — outbox, inbox, follow, delivery, signatures +# --------------------------------------------------------------------------- + +async def outbox_data(page: str = "") -> dict[str, Any]: + """List published activities (outbox).""" + from quart import g + from sqlalchemy import select, func as sa_func + from shared.models.sx_pub import SxPubActivity + + page_num = int(page) if page else 1 + per_page = 20 + offset = (page_num - 1) * per_page + + total_result = await g.s.execute(select(sa_func.count(SxPubActivity.id))) + total = total_result.scalar() or 0 + + result = await g.s.execute( + select(SxPubActivity) + .order_by(SxPubActivity.published.desc()) + .offset(offset).limit(per_page) + ) + activities = result.scalars().all() + + return { + "total": total, + "page": page_num, + "items": [ + { + "type": a.activity_type, + "object-type": a.object_type or "", + "published": a.published.isoformat() if a.published else "", + "cid": a.ipfs_cid or "", + "data": a.object_data or {}, + } + for a in activities + ], + } + + +async def followers_data() -> list[dict[str, Any]]: + """List followers.""" + from quart import g + from sqlalchemy import select + from shared.models.sx_pub import SxPubFollower + + result = await g.s.execute( + select(SxPubFollower).where(SxPubFollower.state == "accepted") + .order_by(SxPubFollower.created_at.desc()) + ) + return [ + { + "acct": f.follower_acct, + "actor-url": f.follower_actor_url, + "inbox": f.follower_inbox, + } + for f in result.scalars().all() + ] + + +async def following_data() -> list[dict[str, Any]]: + """List who we follow.""" + from quart import g + from sqlalchemy import select + from shared.models.sx_pub import SxPubFollowing + + result = await g.s.execute( + select(SxPubFollowing).where(SxPubFollowing.state == "accepted") + .order_by(SxPubFollowing.created_at.desc()) + ) + return [ + { + "actor-url": f.remote_actor_url, + "inbox": f.remote_inbox, + } + for f in result.scalars().all() + ] + + +def _sign_request(method: str, url: str, body: str, private_key_pem: str, + key_id: str) -> dict[str, str]: + """Generate HTTP Signature headers for an outgoing request.""" + from urllib.parse import urlparse + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import padding + import base64 + + parsed = urlparse(url) + path = parsed.path + host = parsed.netloc + date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT") + + # Build signature string + digest = "SHA-256=" + base64.b64encode( + hashlib.sha256(body.encode("utf-8")).digest() + ).decode("ascii") + + signed_string = ( + f"(request-target): {method.lower()} {path}\n" + f"host: {host}\n" + f"date: {date}\n" + f"digest: {digest}" + ) + + private_key = serialization.load_pem_private_key( + private_key_pem.encode("utf-8"), password=None, + ) + signature = base64.b64encode( + private_key.sign( + signed_string.encode("utf-8"), + padding.PKCS1v15(), + hashes.SHA256(), + ) + ).decode("ascii") + + sig_header = ( + f'keyId="{key_id}",' + f'algorithm="rsa-sha256",' + f'headers="(request-target) host date digest",' + f'signature="{signature}"' + ) + + return { + "Host": host, + "Date": date, + "Digest": digest, + "Signature": sig_header, + "Content-Type": "text/sx; charset=utf-8", + } + + +def _verify_signature(headers: dict, method: str, path: str, + body: str, public_key_pem: str) -> bool: + """Verify HTTP Signature on an incoming request.""" + import base64 + import re + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import padding + + sig_header = headers.get("Signature", "") + if not sig_header: + return False + + # Parse signature header + parts = {} + for match in re.finditer(r'(\w+)="([^"]*)"', sig_header): + parts[match.group(1)] = match.group(2) + + if "signature" not in parts or "headers" not in parts: + return False + + # Reconstruct signed string + signed_headers = parts["headers"].split() + lines = [] + for h in signed_headers: + if h == "(request-target)": + lines.append(f"(request-target): {method.lower()} {path}") + else: + val = headers.get(h.title(), headers.get(h, "")) + lines.append(f"{h}: {val}") + signed_string = "\n".join(lines) + + try: + public_key = serialization.load_pem_public_key( + public_key_pem.encode("utf-8"), + ) + public_key.verify( + base64.b64decode(parts["signature"]), + signed_string.encode("utf-8"), + padding.PKCS1v15(), + hashes.SHA256(), + ) + return True + except Exception as e: + logger.warning("Signature verification failed: %s", e) + return False + + +async def follow_remote(actor_url: str) -> dict[str, Any]: + """Send a Follow activity to a remote sx-pub server.""" + import httpx + from quart import g + from sqlalchemy import select + from shared.models.sx_pub import SxPubFollowing, SxPubActor + + # Check not already following + result = await g.s.execute( + select(SxPubFollowing).where(SxPubFollowing.remote_actor_url == actor_url) + ) + existing = result.scalar_one_or_none() + if existing: + return {"status": existing.state, "actor-url": actor_url} + + # Fetch remote actor document + try: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.get(actor_url, headers={"Accept": "text/sx"}) + resp.raise_for_status() + remote_actor_sx = resp.text + except Exception as e: + return {"error": f"Failed to fetch remote actor: {e}"} + + # Parse inbox URL from the SX actor document + # Simple extraction — look for :inbox "..." + import re + inbox_match = re.search(r':inbox\s+"([^"]+)"', remote_actor_sx) + if not inbox_match: + return {"error": "Could not find inbox in remote actor document"} + + # Build absolute inbox URL + from urllib.parse import urljoin + remote_inbox = urljoin(actor_url, inbox_match.group(1)) + + # Get our actor for signing + actor_result = await g.s.execute( + select(SxPubActor).where(SxPubActor.preferred_username == "sx") + ) + our_actor = actor_result.scalar_one_or_none() + if not our_actor: + return {"error": "Local actor not initialized"} + + our_id = f"https://{our_actor.domain}/pub/actor" + + # Build Follow activity + follow_sx = ( + f'(Follow\n' + f' :actor "{our_id}"\n' + f' :object "{actor_url}")' + ) + + # Sign and send + key_id = f"{our_id}#main-key" + headers = _sign_request("POST", remote_inbox, follow_sx, + our_actor.private_key_pem, key_id) + + try: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.post(remote_inbox, content=follow_sx, headers=headers) + logger.info("Sent Follow to %s → %d", remote_inbox, resp.status_code) + except Exception as e: + logger.error("Follow delivery failed to %s: %s", remote_inbox, e) + + # Store the following record + following = SxPubFollowing( + remote_actor_url=actor_url, + remote_inbox=remote_inbox, + state="pending", + ) + g.s.add(following) + await g.s.flush() + + return {"status": "pending", "actor-url": actor_url, "inbox": remote_inbox} + + +async def process_inbox(body_sx: str) -> dict[str, Any]: + """Process an incoming activity from a remote sx-pub server.""" + import re + from quart import g + from sqlalchemy import select + from shared.models.sx_pub import ( + SxPubFollower, SxPubFollowing, SxPubActivity, SxPubActor, + ) + from shared.utils.ipfs_client import pin_cid + + # Parse activity type and actor + type_match = re.match(r'\((\w+)', body_sx.strip()) + actor_match = re.search(r':actor\s+"([^"]+)"', body_sx) + object_match = re.search(r':object\s+"([^"]+)"', body_sx) + + if not type_match: + return {"error": "Could not parse activity type"} + + activity_type = type_match.group(1) + remote_actor = actor_match.group(1) if actor_match else "" + object_val = object_match.group(1) if object_match else "" + + logger.info("Inbox received: %s from %s", activity_type, remote_actor) + + if activity_type == "Follow": + # Someone wants to follow us — auto-accept + inbox_match = re.search(r':inbox\s+"([^"]+)"', body_sx) + remote_inbox = inbox_match.group(1) if inbox_match else "" + + # If no inbox in activity, try fetching the remote actor + if not remote_inbox and remote_actor: + import httpx + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(remote_actor, headers={"Accept": "text/sx"}) + im = re.search(r':inbox\s+"([^"]+)"', resp.text) + if im: + from urllib.parse import urljoin + remote_inbox = urljoin(remote_actor, im.group(1)) + except Exception: + pass + + # Store follower + result = await g.s.execute( + select(SxPubFollower).where(SxPubFollower.follower_actor_url == remote_actor) + ) + follower = result.scalar_one_or_none() + if follower is None: + follower = SxPubFollower( + follower_acct=remote_actor, + follower_inbox=remote_inbox or remote_actor, + follower_actor_url=remote_actor, + state="accepted", + ) + g.s.add(follower) + await g.s.flush() + logger.info("Accepted follow from %s", remote_actor) + + # Send Accept back + actor_result = await g.s.execute( + select(SxPubActor).where(SxPubActor.preferred_username == "sx") + ) + our_actor = actor_result.scalar_one_or_none() + if our_actor and remote_inbox: + our_id = f"https://{our_actor.domain}/pub/actor" + accept_sx = ( + f'(Accept\n' + f' :actor "{our_id}"\n' + f' :object {body_sx})' + ) + key_id = f"{our_id}#main-key" + headers = _sign_request("POST", remote_inbox, accept_sx, + our_actor.private_key_pem, key_id) + import httpx + try: + async with httpx.AsyncClient(timeout=10) as client: + await client.post(remote_inbox, content=accept_sx, headers=headers) + except Exception as e: + logger.warning("Accept delivery failed: %s", e) + + return {"accepted": remote_actor} + + elif activity_type == "Accept": + # Our follow was accepted — update state + if object_val: + result = await g.s.execute( + select(SxPubFollowing).where( + SxPubFollowing.remote_actor_url == remote_actor + ) + ) + following = result.scalar_one_or_none() + if following: + following.state = "accepted" + following.accepted_at = datetime.now(timezone.utc) + await g.s.flush() + logger.info("Follow accepted by %s", remote_actor) + return {"accepted-by": remote_actor} + + elif activity_type == "Publish": + # Pin the published CID locally + cid_match = re.search(r':cid\s+"([^"]+)"', body_sx) + if cid_match: + cid = cid_match.group(1) + pinned = await pin_cid(cid) + logger.info("Mirrored CID %s from %s (pinned=%s)", cid, remote_actor, pinned) + + # Record the activity + g.s.add(SxPubActivity( + activity_type="Publish", + object_type="SxDocument", + object_data={"remote_actor": remote_actor, "body": body_sx}, + ipfs_cid=cid_match.group(1) if cid_match else None, + )) + await g.s.flush() + return {"mirrored": cid_match.group(1) if cid_match else ""} + + return {"ignored": activity_type} + + +async def deliver_to_followers(activity_sx: str) -> dict[str, Any]: + """Deliver an activity to all follower inboxes.""" + import httpx + from quart import g + from sqlalchemy import select + from shared.models.sx_pub import SxPubFollower, SxPubActor + + actor_result = await g.s.execute( + select(SxPubActor).where(SxPubActor.preferred_username == "sx") + ) + our_actor = actor_result.scalar_one_or_none() + if not our_actor: + return {"error": "Actor not initialized", "delivered": 0} + + our_id = f"https://{our_actor.domain}/pub/actor" + key_id = f"{our_id}#main-key" + + result = await g.s.execute( + select(SxPubFollower).where(SxPubFollower.state == "accepted") + ) + followers = result.scalars().all() + + delivered = 0 + failed = 0 + + for follower in followers: + headers = _sign_request("POST", follower.follower_inbox, activity_sx, + our_actor.private_key_pem, key_id) + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + follower.follower_inbox, + content=activity_sx, + headers=headers, + ) + if resp.status_code < 300: + delivered += 1 + else: + failed += 1 + logger.warning("Delivery to %s returned %d", + follower.follower_inbox, resp.status_code) + except Exception as e: + failed += 1 + logger.error("Delivery to %s failed: %s", follower.follower_inbox, e) + + logger.info("Delivered to %d/%d followers (%d failed)", + delivered, len(followers), failed) + return {"delivered": delivered, "failed": failed, "total": len(followers)} + + +async def get_request_body() -> str: + """Get the raw request body as text.""" + from quart import request + data = await request.get_data(as_text=True) + return data + + +async def get_request_headers() -> dict[str, str]: + """Get request headers as a dict.""" + from quart import request + return dict(request.headers) + + +async def get_request_method() -> str: + """Get the HTTP method.""" + from quart import request + return request.method + + +async def get_request_path() -> str: + """Get the request path.""" + from quart import request + return request.path