sx-pub Phase 3: federation — outbox, inbox, follow, delivery, HTTP signatures

New endpoints:
- GET /pub/outbox — paginated activity feed
- POST /pub/inbox — receive Follow/Accept/Publish from remote servers
- POST /pub/follow — follow a remote sx-pub server
- GET /pub/followers — list accepted followers
- GET /pub/following — list who we follow

Federation mechanics:
- HTTP Signature generation (RSA-SHA256) for signed outgoing requests
- HTTP Signature verification for incoming requests
- Auto-accept Follow → store follower → send Accept back
- Accept handling → update following state
- Publish mirroring → pin remote CID to local IPFS
- deliver_to_followers → fan out signed activities to all follower inboxes
- Publish now records activity in outbox for federation delivery

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-25 01:38:36 +00:00
parent cf130c4174
commit aa1d4d7a67
3 changed files with 632 additions and 1 deletions

View File

@@ -215,3 +215,125 @@
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(set-response-header "Cache-Control" "public, max-age=31536000, immutable")
(get data "content")))))
;; ==========================================================================
;; Phase 3: Federation — outbox, inbox, follow, followers, following
;; ==========================================================================
;; --------------------------------------------------------------------------
;; Outbox
;; --------------------------------------------------------------------------
(defhandler pub-outbox
:path "/pub/outbox"
:method :get
:returns "element"
(&key)
(let ((page (helper "request-arg" "page" ""))
(data (helper "pub-outbox-data" page)))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(let ((items (map (fn (a)
(str "\n (" (get a "type")
" :object-type \"" (get a "object-type") "\""
" :published \"" (get a "published") "\""
" :cid \"" (get a "cid") "\")"))
(get data "items"))))
(str
"(SxOutbox"
"\n :total " (get data "total")
"\n :page " (get data "page")
(join "" items) ")")))))
;; --------------------------------------------------------------------------
;; Inbox
;; --------------------------------------------------------------------------
(defhandler pub-inbox
:path "/pub/inbox"
:method :post
:csrf false
:returns "element"
(&key)
(let ((body (helper "pub-request-body")))
(if (= body "")
(do
(set-response-status 400)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
"(Error :message \"Empty body\")")
(let ((result (helper "pub-process-inbox" body)))
(do
(set-response-status 202)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str "(Accepted :result " (str result) ")"))))))
;; --------------------------------------------------------------------------
;; Follow a remote server
;; --------------------------------------------------------------------------
(defhandler pub-follow
:path "/pub/follow"
:method :post
:csrf false
:returns "element"
(&key)
(let ((actor-url (helper "request-form" "actor_url" "")))
(if (= actor-url "")
(do
(set-response-status 400)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
"(Error :message \"Missing actor_url\")")
(let ((result (helper "pub-follow-remote" actor-url)))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(if (get result "error")
(do
(set-response-status 502)
(str "(Error :message \"" (get result "error") "\")"))
(str
"(FollowSent"
"\n :actor-url \"" (get result "actor-url") "\""
"\n :status \"" (get result "status") "\")")))))))
;; --------------------------------------------------------------------------
;; Followers
;; --------------------------------------------------------------------------
(defhandler pub-followers
:path "/pub/followers"
:method :get
:returns "element"
(&key)
(let ((data (helper "pub-followers-data")))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(let ((items (map (fn (f)
(str "\n (SxFollower"
" :acct \"" (get f "acct") "\""
" :actor-url \"" (get f "actor-url") "\")"))
data)))
(str "(SxFollowers" (join "" items) ")")))))
;; --------------------------------------------------------------------------
;; Following
;; --------------------------------------------------------------------------
(defhandler pub-following
:path "/pub/following"
:method :get
:returns "element"
(&key)
(let ((data (helper "pub-following-data")))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(let ((items (map (fn (f)
(str "\n (SxFollowing"
" :actor-url \"" (get f "actor-url") "\")"))
data)))
(str "(SxFollowing" (join "" items) ")")))))

View File

@@ -45,6 +45,13 @@ def _register_sx_helpers() -> None:
"pub-collection-items": _pub_collection_items,
"pub-resolve-document": _pub_resolve_document,
"pub-fetch-cid": _pub_fetch_cid,
"pub-outbox-data": _pub_outbox_data,
"pub-followers-data": _pub_followers_data,
"pub-following-data": _pub_following_data,
"pub-follow-remote": _pub_follow_remote,
"pub-process-inbox": _pub_process_inbox,
"pub-deliver-to-followers": _pub_deliver_to_followers,
"pub-request-body": _pub_request_body,
})
@@ -1764,3 +1771,38 @@ async def _pub_resolve_document(collection_slug, slug):
async def _pub_fetch_cid(cid):
from .pub_helpers import fetch_cid
return await fetch_cid(cid)
async def _pub_outbox_data(page=""):
from .pub_helpers import outbox_data
return await outbox_data(page)
async def _pub_followers_data():
from .pub_helpers import followers_data
return await followers_data()
async def _pub_following_data():
from .pub_helpers import following_data
return await following_data()
async def _pub_follow_remote(actor_url):
from .pub_helpers import follow_remote
return await follow_remote(actor_url)
async def _pub_process_inbox(body_sx):
from .pub_helpers import process_inbox
return await process_inbox(body_sx)
async def _pub_deliver_to_followers(activity_sx):
from .pub_helpers import deliver_to_followers
return await deliver_to_followers(activity_sx)
async def _pub_request_body():
from .pub_helpers import get_request_body
return await get_request_body()

View File

@@ -1,12 +1,14 @@
"""sx-pub Python IO helpers — actor management, IPFS status, collections.
"""sx-pub Python IO helpers — actor, IPFS, collections, publishing, federation.
These are called from SX defhandlers via (helper "pub-..." args...).
All DB access uses g.s (per-request async session from register_db).
"""
from __future__ import annotations
import hashlib
import logging
import os
from datetime import datetime, timezone
from typing import Any
logger = logging.getLogger("sx.pub")
@@ -210,6 +212,23 @@ async def publish_document(collection_slug: str, slug: str, content: str,
await g.s.flush()
logger.info("Published %s/%s%s (%d bytes)", collection_slug, slug, cid, len(content_bytes))
# Record Publish activity in outbox
from shared.models.sx_pub import SxPubActivity
g.s.add(SxPubActivity(
activity_type="Publish",
object_type="SxDocument",
object_data={
"path": f"/pub/{collection_slug}/{slug}",
"cid": cid,
"collection": collection_slug,
"slug": slug,
"title": title or slug,
"summary": summary,
},
ipfs_cid=cid,
))
await g.s.flush()
return {
"path": f"/pub/{collection_slug}/{slug}",
"cid": cid,
@@ -310,3 +329,451 @@ async def fetch_cid(cid: str) -> dict[str, Any]:
"content": content_bytes.decode("utf-8", errors="replace"),
"size": len(content_bytes),
}
# ---------------------------------------------------------------------------
# Phase 3: Federation — outbox, inbox, follow, delivery, signatures
# ---------------------------------------------------------------------------
async def outbox_data(page: str = "") -> dict[str, Any]:
"""List published activities (outbox)."""
from quart import g
from sqlalchemy import select, func as sa_func
from shared.models.sx_pub import SxPubActivity
page_num = int(page) if page else 1
per_page = 20
offset = (page_num - 1) * per_page
total_result = await g.s.execute(select(sa_func.count(SxPubActivity.id)))
total = total_result.scalar() or 0
result = await g.s.execute(
select(SxPubActivity)
.order_by(SxPubActivity.published.desc())
.offset(offset).limit(per_page)
)
activities = result.scalars().all()
return {
"total": total,
"page": page_num,
"items": [
{
"type": a.activity_type,
"object-type": a.object_type or "",
"published": a.published.isoformat() if a.published else "",
"cid": a.ipfs_cid or "",
"data": a.object_data or {},
}
for a in activities
],
}
async def followers_data() -> list[dict[str, Any]]:
"""List followers."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollower
result = await g.s.execute(
select(SxPubFollower).where(SxPubFollower.state == "accepted")
.order_by(SxPubFollower.created_at.desc())
)
return [
{
"acct": f.follower_acct,
"actor-url": f.follower_actor_url,
"inbox": f.follower_inbox,
}
for f in result.scalars().all()
]
async def following_data() -> list[dict[str, Any]]:
"""List who we follow."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollowing
result = await g.s.execute(
select(SxPubFollowing).where(SxPubFollowing.state == "accepted")
.order_by(SxPubFollowing.created_at.desc())
)
return [
{
"actor-url": f.remote_actor_url,
"inbox": f.remote_inbox,
}
for f in result.scalars().all()
]
def _sign_request(method: str, url: str, body: str, private_key_pem: str,
key_id: str) -> dict[str, str]:
"""Generate HTTP Signature headers for an outgoing request."""
from urllib.parse import urlparse
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
import base64
parsed = urlparse(url)
path = parsed.path
host = parsed.netloc
date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")
# Build signature string
digest = "SHA-256=" + base64.b64encode(
hashlib.sha256(body.encode("utf-8")).digest()
).decode("ascii")
signed_string = (
f"(request-target): {method.lower()} {path}\n"
f"host: {host}\n"
f"date: {date}\n"
f"digest: {digest}"
)
private_key = serialization.load_pem_private_key(
private_key_pem.encode("utf-8"), password=None,
)
signature = base64.b64encode(
private_key.sign(
signed_string.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
).decode("ascii")
sig_header = (
f'keyId="{key_id}",'
f'algorithm="rsa-sha256",'
f'headers="(request-target) host date digest",'
f'signature="{signature}"'
)
return {
"Host": host,
"Date": date,
"Digest": digest,
"Signature": sig_header,
"Content-Type": "text/sx; charset=utf-8",
}
def _verify_signature(headers: dict, method: str, path: str,
body: str, public_key_pem: str) -> bool:
"""Verify HTTP Signature on an incoming request."""
import base64
import re
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
sig_header = headers.get("Signature", "")
if not sig_header:
return False
# Parse signature header
parts = {}
for match in re.finditer(r'(\w+)="([^"]*)"', sig_header):
parts[match.group(1)] = match.group(2)
if "signature" not in parts or "headers" not in parts:
return False
# Reconstruct signed string
signed_headers = parts["headers"].split()
lines = []
for h in signed_headers:
if h == "(request-target)":
lines.append(f"(request-target): {method.lower()} {path}")
else:
val = headers.get(h.title(), headers.get(h, ""))
lines.append(f"{h}: {val}")
signed_string = "\n".join(lines)
try:
public_key = serialization.load_pem_public_key(
public_key_pem.encode("utf-8"),
)
public_key.verify(
base64.b64decode(parts["signature"]),
signed_string.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
return True
except Exception as e:
logger.warning("Signature verification failed: %s", e)
return False
async def follow_remote(actor_url: str) -> dict[str, Any]:
"""Send a Follow activity to a remote sx-pub server."""
import httpx
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollowing, SxPubActor
# Check not already following
result = await g.s.execute(
select(SxPubFollowing).where(SxPubFollowing.remote_actor_url == actor_url)
)
existing = result.scalar_one_or_none()
if existing:
return {"status": existing.state, "actor-url": actor_url}
# Fetch remote actor document
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(actor_url, headers={"Accept": "text/sx"})
resp.raise_for_status()
remote_actor_sx = resp.text
except Exception as e:
return {"error": f"Failed to fetch remote actor: {e}"}
# Parse inbox URL from the SX actor document
# Simple extraction — look for :inbox "..."
import re
inbox_match = re.search(r':inbox\s+"([^"]+)"', remote_actor_sx)
if not inbox_match:
return {"error": "Could not find inbox in remote actor document"}
# Build absolute inbox URL
from urllib.parse import urljoin
remote_inbox = urljoin(actor_url, inbox_match.group(1))
# Get our actor for signing
actor_result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
our_actor = actor_result.scalar_one_or_none()
if not our_actor:
return {"error": "Local actor not initialized"}
our_id = f"https://{our_actor.domain}/pub/actor"
# Build Follow activity
follow_sx = (
f'(Follow\n'
f' :actor "{our_id}"\n'
f' :object "{actor_url}")'
)
# Sign and send
key_id = f"{our_id}#main-key"
headers = _sign_request("POST", remote_inbox, follow_sx,
our_actor.private_key_pem, key_id)
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(remote_inbox, content=follow_sx, headers=headers)
logger.info("Sent Follow to %s%d", remote_inbox, resp.status_code)
except Exception as e:
logger.error("Follow delivery failed to %s: %s", remote_inbox, e)
# Store the following record
following = SxPubFollowing(
remote_actor_url=actor_url,
remote_inbox=remote_inbox,
state="pending",
)
g.s.add(following)
await g.s.flush()
return {"status": "pending", "actor-url": actor_url, "inbox": remote_inbox}
async def process_inbox(body_sx: str) -> dict[str, Any]:
"""Process an incoming activity from a remote sx-pub server."""
import re
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import (
SxPubFollower, SxPubFollowing, SxPubActivity, SxPubActor,
)
from shared.utils.ipfs_client import pin_cid
# Parse activity type and actor
type_match = re.match(r'\((\w+)', body_sx.strip())
actor_match = re.search(r':actor\s+"([^"]+)"', body_sx)
object_match = re.search(r':object\s+"([^"]+)"', body_sx)
if not type_match:
return {"error": "Could not parse activity type"}
activity_type = type_match.group(1)
remote_actor = actor_match.group(1) if actor_match else ""
object_val = object_match.group(1) if object_match else ""
logger.info("Inbox received: %s from %s", activity_type, remote_actor)
if activity_type == "Follow":
# Someone wants to follow us — auto-accept
inbox_match = re.search(r':inbox\s+"([^"]+)"', body_sx)
remote_inbox = inbox_match.group(1) if inbox_match else ""
# If no inbox in activity, try fetching the remote actor
if not remote_inbox and remote_actor:
import httpx
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(remote_actor, headers={"Accept": "text/sx"})
im = re.search(r':inbox\s+"([^"]+)"', resp.text)
if im:
from urllib.parse import urljoin
remote_inbox = urljoin(remote_actor, im.group(1))
except Exception:
pass
# Store follower
result = await g.s.execute(
select(SxPubFollower).where(SxPubFollower.follower_actor_url == remote_actor)
)
follower = result.scalar_one_or_none()
if follower is None:
follower = SxPubFollower(
follower_acct=remote_actor,
follower_inbox=remote_inbox or remote_actor,
follower_actor_url=remote_actor,
state="accepted",
)
g.s.add(follower)
await g.s.flush()
logger.info("Accepted follow from %s", remote_actor)
# Send Accept back
actor_result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
our_actor = actor_result.scalar_one_or_none()
if our_actor and remote_inbox:
our_id = f"https://{our_actor.domain}/pub/actor"
accept_sx = (
f'(Accept\n'
f' :actor "{our_id}"\n'
f' :object {body_sx})'
)
key_id = f"{our_id}#main-key"
headers = _sign_request("POST", remote_inbox, accept_sx,
our_actor.private_key_pem, key_id)
import httpx
try:
async with httpx.AsyncClient(timeout=10) as client:
await client.post(remote_inbox, content=accept_sx, headers=headers)
except Exception as e:
logger.warning("Accept delivery failed: %s", e)
return {"accepted": remote_actor}
elif activity_type == "Accept":
# Our follow was accepted — update state
if object_val:
result = await g.s.execute(
select(SxPubFollowing).where(
SxPubFollowing.remote_actor_url == remote_actor
)
)
following = result.scalar_one_or_none()
if following:
following.state = "accepted"
following.accepted_at = datetime.now(timezone.utc)
await g.s.flush()
logger.info("Follow accepted by %s", remote_actor)
return {"accepted-by": remote_actor}
elif activity_type == "Publish":
# Pin the published CID locally
cid_match = re.search(r':cid\s+"([^"]+)"', body_sx)
if cid_match:
cid = cid_match.group(1)
pinned = await pin_cid(cid)
logger.info("Mirrored CID %s from %s (pinned=%s)", cid, remote_actor, pinned)
# Record the activity
g.s.add(SxPubActivity(
activity_type="Publish",
object_type="SxDocument",
object_data={"remote_actor": remote_actor, "body": body_sx},
ipfs_cid=cid_match.group(1) if cid_match else None,
))
await g.s.flush()
return {"mirrored": cid_match.group(1) if cid_match else ""}
return {"ignored": activity_type}
async def deliver_to_followers(activity_sx: str) -> dict[str, Any]:
"""Deliver an activity to all follower inboxes."""
import httpx
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollower, SxPubActor
actor_result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
our_actor = actor_result.scalar_one_or_none()
if not our_actor:
return {"error": "Actor not initialized", "delivered": 0}
our_id = f"https://{our_actor.domain}/pub/actor"
key_id = f"{our_id}#main-key"
result = await g.s.execute(
select(SxPubFollower).where(SxPubFollower.state == "accepted")
)
followers = result.scalars().all()
delivered = 0
failed = 0
for follower in followers:
headers = _sign_request("POST", follower.follower_inbox, activity_sx,
our_actor.private_key_pem, key_id)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
follower.follower_inbox,
content=activity_sx,
headers=headers,
)
if resp.status_code < 300:
delivered += 1
else:
failed += 1
logger.warning("Delivery to %s returned %d",
follower.follower_inbox, resp.status_code)
except Exception as e:
failed += 1
logger.error("Delivery to %s failed: %s", follower.follower_inbox, e)
logger.info("Delivered to %d/%d followers (%d failed)",
delivered, len(followers), failed)
return {"delivered": delivered, "failed": failed, "total": len(followers)}
async def get_request_body() -> str:
"""Get the raw request body as text."""
from quart import request
data = await request.get_data(as_text=True)
return data
async def get_request_headers() -> dict[str, str]:
"""Get request headers as a dict."""
from quart import request
return dict(request.headers)
async def get_request_method() -> str:
"""Get the HTTP method."""
from quart import request
return request.method
async def get_request_path() -> str:
"""Get the request path."""
from quart import request
return request.path