Merge sx-pub branch into ocaml-vm

Brings in sx-pub federated publishing protocol (Phases 1-4):
- DB models, IPFS publishing, federation, anchoring
- Live dashboard UI on the plan page
- Dev infrastructure (docker-compose, dev-pub.sh)

Conflicts: sx-browser.js (kept ocaml-vm rebuild), sx-pub.sx (kept sx-pub version with dashboard)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-25 16:37:04 +00:00
10 changed files with 1922 additions and 6 deletions

30
dev-pub.sh Executable file
View File

@@ -0,0 +1,30 @@
#!/usr/bin/env bash
set -euo pipefail
# Dev mode for sx-pub (SX-based ActivityPub)
# Bind-mounted source + auto-reload on externalnet
# Browse to pub.sx-web.org
#
# Usage:
# ./dev-pub.sh # Start sx-pub dev
# ./dev-pub.sh down # Stop
# ./dev-pub.sh logs # Tail logs
# ./dev-pub.sh --build # Rebuild image then start
COMPOSE="docker compose -p sx-pub -f docker-compose.dev-pub.yml"
case "${1:-up}" in
down)
$COMPOSE down
;;
logs)
$COMPOSE logs -f sx_pub
;;
*)
BUILD_FLAG=""
if [[ "${1:-}" == "--build" ]]; then
BUILD_FLAG="--build"
fi
$COMPOSE up $BUILD_FLAG
;;
esac

114
docker-compose.dev-pub.yml Normal file
View File

@@ -0,0 +1,114 @@
# Dev mode for sx-pub (SX-based ActivityPub)
# Starts as sx_docs clone — AP protocol built in SX from scratch
# Accessible at pub.sx-web.org via Caddy on externalnet
# Own DB + pgbouncer + IPFS node
services:
sx_pub:
image: registry.rose-ash.com:5000/sx_docs:latest
environment:
SX_STANDALONE: "true"
SECRET_KEY: "${SECRET_KEY:-pub-dev-secret}"
REDIS_URL: redis://redis:6379/0
DATABASE_URL: postgresql+asyncpg://postgres:change-me@pgbouncer:5432/sx_pub
ALEMBIC_DATABASE_URL: postgresql+psycopg://postgres:change-me@db:5432/sx_pub
SX_PUB_DOMAIN: pub.sx-web.org
WORKERS: "1"
ENVIRONMENT: development
RELOAD: "true"
SX_USE_REF: "1"
SX_USE_OCAML: "1"
SX_OCAML_BIN: "/app/bin/sx_server"
SX_BOUNDARY_STRICT: "1"
SX_DEV: "1"
OCAMLRUNPARAM: "b"
IPFS_API: http://ipfs:5001
ports:
- "8014:8000"
volumes:
- /root/sx-pub/_config/dev-sh-config.yaml:/app/config/app-config.yaml:ro
- ./shared:/app/shared
- ./sx/app.py:/app/app.py
- ./sx/sxc:/app/sxc
- ./sx/bp:/app/bp
- ./sx/services:/app/services
- ./sx/content:/app/content
- ./sx/sx:/app/sx
- ./sx/path_setup.py:/app/path_setup.py
- ./sx/entrypoint.sh:/usr/local/bin/entrypoint.sh
- ./sx/__init__.py:/app/__init__.py:ro
# Spec + web SX files
- ./spec:/app/spec:ro
- ./web:/app/web:ro
# OCaml SX kernel binary
- ./hosts/ocaml/_build/default/bin/sx_server.exe:/app/bin/sx_server:ro
# sibling models for cross-domain SQLAlchemy imports
- ./blog/__init__.py:/app/blog/__init__.py:ro
- ./blog/models:/app/blog/models:ro
- ./market/__init__.py:/app/market/__init__.py:ro
- ./market/models:/app/market/models:ro
- ./cart/__init__.py:/app/cart/__init__.py:ro
- ./cart/models:/app/cart/models:ro
- ./events/__init__.py:/app/events/__init__.py:ro
- ./events/models:/app/events/models:ro
- ./federation/__init__.py:/app/federation/__init__.py:ro
- ./federation/models:/app/federation/models:ro
- ./account/__init__.py:/app/account/__init__.py:ro
- ./account/models:/app/account/models:ro
- ./relations/__init__.py:/app/relations/__init__.py:ro
- ./relations/models:/app/relations/models:ro
- ./likes/__init__.py:/app/likes/__init__.py:ro
- ./likes/models:/app/likes/models:ro
- ./orders/__init__.py:/app/orders/__init__.py:ro
- ./orders/models:/app/orders/models:ro
depends_on:
- pgbouncer
- redis
- ipfs
networks:
- externalnet
- default
restart: unless-stopped
db:
image: postgres:16
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: change-me
POSTGRES_DB: sx_pub
volumes:
- db_data:/var/lib/postgresql/data
restart: unless-stopped
pgbouncer:
image: edoburu/pgbouncer:latest
environment:
DB_HOST: db
DB_PORT: "5432"
DB_USER: postgres
DB_PASSWORD: change-me
POOL_MODE: transaction
DEFAULT_POOL_SIZE: "10"
MAX_CLIENT_CONN: "100"
AUTH_TYPE: plain
depends_on:
- db
restart: unless-stopped
ipfs:
image: ipfs/kubo:latest
volumes:
- ipfs_data:/data/ipfs
restart: unless-stopped
redis:
image: redis:7-alpine
restart: unless-stopped
volumes:
db_data:
ipfs_data:
networks:
externalnet:
external: true

164
shared/models/sx_pub.py Normal file
View File

@@ -0,0 +1,164 @@
"""sx-pub ORM models — federated SX publishing protocol.
Tables for the sx-pub actor, content collections, published documents,
outbox activities, and federation relationships (followers/following).
"""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import (
String, Integer, DateTime, Text,
ForeignKey, UniqueConstraint, Index, func,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from shared.db.base import Base
class SxPubActor(Base):
"""Singleton actor for this sx-pub instance."""
__tablename__ = "sx_pub_actor"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
preferred_username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
display_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
summary: Mapped[str | None] = mapped_column(Text, nullable=True)
public_key_pem: Mapped[str] = mapped_column(Text, nullable=False)
private_key_pem: Mapped[str] = mapped_column(Text, nullable=False)
domain: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
def __repr__(self) -> str:
return f"<SxPubActor @{self.preferred_username}>"
class SxPubCollection(Base):
"""Named grouping of published documents (e.g. core-specs, platforms)."""
__tablename__ = "sx_pub_collections"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
slug: Mapped[str] = mapped_column(String(128), unique=True, nullable=False)
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
sort_order: Mapped[int] = mapped_column(Integer, nullable=False, default=0, server_default="0")
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
documents = relationship("SxPubDocument", back_populates="collection", lazy="selectin")
def __repr__(self) -> str:
return f"<SxPubCollection {self.slug}>"
class SxPubDocument(Base):
"""Published content — path→CID index entry."""
__tablename__ = "sx_pub_documents"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
collection_id: Mapped[int] = mapped_column(
Integer, ForeignKey("sx_pub_collections.id", ondelete="CASCADE"), nullable=False,
)
slug: Mapped[str] = mapped_column(String(255), nullable=False)
title: Mapped[str | None] = mapped_column(String(512), nullable=True)
summary: Mapped[str | None] = mapped_column(Text, nullable=True)
content_hash: Mapped[str] = mapped_column(String(128), nullable=False)
ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True)
size_bytes: Mapped[int | None] = mapped_column(Integer, nullable=True)
requires: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
status: Mapped[str] = mapped_column(
String(20), nullable=False, default="draft", server_default="draft",
)
published_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
collection = relationship("SxPubCollection", back_populates="documents")
__table_args__ = (
UniqueConstraint("collection_id", "slug", name="uq_pub_doc_collection_slug"),
Index("ix_pub_doc_collection", "collection_id"),
Index("ix_pub_doc_status", "status"),
Index("ix_pub_doc_cid", "ipfs_cid"),
)
def __repr__(self) -> str:
return f"<SxPubDocument {self.slug} cid={self.ipfs_cid}>"
class SxPubActivity(Base):
"""Outbox activity (Publish, Follow, Announce, Anchor)."""
__tablename__ = "sx_pub_activities"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
activity_type: Mapped[str] = mapped_column(String(64), nullable=False)
object_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True)
published: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
__table_args__ = (
Index("ix_pub_activity_type", "activity_type"),
Index("ix_pub_activity_published", "published"),
)
def __repr__(self) -> str:
return f"<SxPubActivity {self.activity_type}>"
class SxPubFollower(Base):
"""Remote server that follows us."""
__tablename__ = "sx_pub_followers"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
follower_acct: Mapped[str] = mapped_column(String(512), nullable=False)
follower_inbox: Mapped[str] = mapped_column(String(512), nullable=False)
follower_actor_url: Mapped[str] = mapped_column(String(512), nullable=False)
follower_public_key: Mapped[str | None] = mapped_column(Text, nullable=True)
state: Mapped[str] = mapped_column(
String(20), nullable=False, default="accepted", server_default="accepted",
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
__table_args__ = (
UniqueConstraint("follower_acct", name="uq_pub_follower_acct"),
Index("ix_pub_follower_state", "state"),
)
def __repr__(self) -> str:
return f"<SxPubFollower {self.follower_acct}>"
class SxPubFollowing(Base):
"""Remote server we follow."""
__tablename__ = "sx_pub_following"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
remote_actor_url: Mapped[str] = mapped_column(String(512), unique=True, nullable=False)
remote_inbox: Mapped[str] = mapped_column(String(512), nullable=False)
state: Mapped[str] = mapped_column(
String(20), nullable=False, default="pending", server_default="pending",
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
accepted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
__table_args__ = (
Index("ix_pub_following_state", "state"),
)
def __repr__(self) -> str:
return f"<SxPubFollowing {self.remote_actor_url}>"

35
sx/alembic.ini Normal file
View File

@@ -0,0 +1,35 @@
[alembic]
script_location = alembic
sqlalchemy.url =
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s

17
sx/alembic/env.py Normal file
View File

@@ -0,0 +1,17 @@
from alembic import context
from shared.db.alembic_env import run_alembic
MODELS = [
"shared.models.sx_pub",
]
TABLES = frozenset({
"sx_pub_actor",
"sx_pub_collections",
"sx_pub_documents",
"sx_pub_activities",
"sx_pub_followers",
"sx_pub_following",
})
run_alembic(context.config, MODELS, TABLES)

View File

@@ -62,8 +62,10 @@ def create_app() -> "Quart":
extra_kw = {}
if SX_STANDALONE:
extra_kw["no_db"] = True
extra_kw["no_oauth"] = True
# Enable DB if DATABASE_URL is set (needed for sx-pub)
if not os.getenv("DATABASE_URL"):
extra_kw["no_db"] = True
app = create_base_app(
"sx",

395
sx/sx/handlers/pub-api.sx Normal file
View File

@@ -0,0 +1,395 @@
;; ==========================================================================
;; sx-pub API endpoints — actor, webfinger, collections, publishing, browsing
;;
;; All responses are text/sx. No JSON.
;; ==========================================================================
;; --------------------------------------------------------------------------
;; Actor
;; --------------------------------------------------------------------------
(defhandler pub-actor
:path "/pub/actor"
:method :get
:returns "element"
(&key)
(let ((actor (helper "pub-actor-data")))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str
"(SxActor"
"\n :id \"https://" (get actor "domain") "/pub/actor\""
"\n :type \"SxPublisher\""
"\n :name \"" (get actor "display-name") "\""
"\n :summary \"" (get actor "summary") "\""
"\n :inbox \"/pub/inbox\""
"\n :outbox \"/pub/outbox\""
"\n :followers \"/pub/followers\""
"\n :following \"/pub/following\""
"\n :public-key-pem \"" (get actor "public-key-pem") "\")"))))
;; --------------------------------------------------------------------------
;; Webfinger
;; --------------------------------------------------------------------------
(defhandler pub-webfinger
:path "/pub/webfinger"
:method :get
:returns "element"
(&key)
(let ((resource (helper "request-arg" "resource" ""))
(actor (helper "pub-actor-data")))
(let ((expected (str "acct:" (get actor "preferred-username") "@" (get actor "domain"))))
(if (!= resource expected)
(do
(set-response-status 404)
(str "(Error :message \"Resource not found\")"))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str
"(SxWebfinger"
"\n :subject \"" expected "\""
"\n :actor \"https://" (get actor "domain") "/pub/actor\""
"\n :type \"SxPublisher\")"))))))
;; --------------------------------------------------------------------------
;; Collections
;; --------------------------------------------------------------------------
(defhandler pub-collections
:path "/pub/collections"
:method :get
:returns "element"
(&key)
(let ((collections (helper "pub-collections-data")))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(let ((items (map (fn (c)
(str "\n (SxCollection"
" :slug \"" (get c "slug") "\""
" :name \"" (get c "name") "\""
" :description \"" (get c "description") "\""
" :href \"/pub/" (get c "slug") "\")"))
collections)))
(str "(SxCollections" (join "" items) ")")))))
;; --------------------------------------------------------------------------
;; Status
;; --------------------------------------------------------------------------
(defhandler pub-status
:path "/pub/status"
:method :get
:returns "element"
(&key)
(let ((status (helper "pub-status-data")))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str
"(SxPubStatus"
"\n :healthy " (get status "healthy")
"\n :db \"" (get status "db") "\""
"\n :ipfs \"" (get status "ipfs") "\""
"\n :actor \"" (get status "actor") "\""
"\n :domain \"" (or (get status "domain") "unknown") "\")"))))
;; ==========================================================================
;; Phase 2: Publishing + Browsing
;; ==========================================================================
;; --------------------------------------------------------------------------
;; Publish
;; --------------------------------------------------------------------------
(defhandler pub-publish
:path "/pub/publish"
:method :post
:csrf false
:returns "element"
(&key)
(let ((collection (helper "request-form" "collection" ""))
(slug (helper "request-form" "slug" ""))
(content (helper "request-form" "content" ""))
(title (helper "request-form" "title" ""))
(summary (helper "request-form" "summary" "")))
(if (or (= collection "") (= slug "") (= content ""))
(do
(set-response-status 400)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
"(Error :message \"Missing collection, slug, or content\")")
(let ((result (helper "pub-publish" collection slug content title summary)))
(if (get result "error")
(do
(set-response-status 500)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str "(Error :message \"" (get result "error") "\")"))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str
"(Published"
"\n :path \"" (get result "path") "\""
"\n :cid \"" (get result "cid") "\""
"\n :hash \"" (get result "hash") "\""
"\n :size " (get result "size")
"\n :collection \"" (get result "collection") "\""
"\n :slug \"" (get result "slug") "\""
"\n :title \"" (get result "title") "\")")))))))
;; --------------------------------------------------------------------------
;; Browse collection
;; --------------------------------------------------------------------------
(defhandler pub-browse-collection
:path "/pub/browse/<collection_slug>"
:method :get
:returns "element"
(&key collection_slug)
(let ((data (helper "pub-collection-items" collection_slug)))
(if (get data "error")
(do
(set-response-status 404)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str "(Error :message \"" (get data "error") "\")"))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(let ((items (map (fn (d)
(str "\n (SxDocument"
" :slug \"" (get d "slug") "\""
" :title \"" (get d "title") "\""
" :summary \"" (get d "summary") "\""
" :cid \"" (get d "cid") "\""
" :size " (get d "size") ")"))
(get data "items"))))
(str
"(SxCollection"
"\n :slug \"" (get data "collection") "\""
"\n :name \"" (get data "name") "\""
"\n :description \"" (get data "description") "\""
(join "" items) ")"))))))
;; --------------------------------------------------------------------------
;; Resolve document by path
;; --------------------------------------------------------------------------
(defhandler pub-document
:path "/pub/doc/<collection_slug>/<slug>"
:method :get
:returns "element"
(&key collection_slug slug)
(let ((data (helper "pub-resolve-document" collection_slug slug)))
(if (get data "error")
(do
(set-response-status 404)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str "(Error :message \"" (get data "error") "\")"))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(set-response-header "X-IPFS-CID" (get data "cid"))
(get data "content")))))
;; --------------------------------------------------------------------------
;; Direct CID fetch
;; --------------------------------------------------------------------------
(defhandler pub-cid
:path "/pub/cid/<cid>"
:method :get
:returns "element"
(&key cid)
(let ((data (helper "pub-fetch-cid" cid)))
(if (get data "error")
(do
(set-response-status 404)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str "(Error :message \"" (get data "error") "\")"))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(set-response-header "Cache-Control" "public, max-age=31536000, immutable")
(get data "content")))))
;; ==========================================================================
;; Phase 3: Federation — outbox, inbox, follow, followers, following
;; ==========================================================================
;; --------------------------------------------------------------------------
;; Outbox
;; --------------------------------------------------------------------------
(defhandler pub-outbox
:path "/pub/outbox"
:method :get
:returns "element"
(&key)
(let ((page (helper "request-arg" "page" ""))
(data (helper "pub-outbox-data" page)))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(let ((items (map (fn (a)
(str "\n (" (get a "type")
" :object-type \"" (get a "object-type") "\""
" :published \"" (get a "published") "\""
" :cid \"" (get a "cid") "\")"))
(get data "items"))))
(str
"(SxOutbox"
"\n :total " (get data "total")
"\n :page " (get data "page")
(join "" items) ")")))))
;; --------------------------------------------------------------------------
;; Inbox
;; --------------------------------------------------------------------------
(defhandler pub-inbox
:path "/pub/inbox"
:method :post
:csrf false
:returns "element"
(&key)
(let ((body (helper "pub-request-body")))
(if (= body "")
(do
(set-response-status 400)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
"(Error :message \"Empty body\")")
(let ((result (helper "pub-process-inbox" body)))
(do
(set-response-status 202)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(str "(Accepted :result " (str result) ")"))))))
;; --------------------------------------------------------------------------
;; Follow a remote server
;; --------------------------------------------------------------------------
(defhandler pub-follow
:path "/pub/follow"
:method :post
:csrf false
:returns "element"
(&key)
(let ((actor-url (helper "request-form" "actor_url" "")))
(if (= actor-url "")
(do
(set-response-status 400)
(set-response-header "Content-Type" "text/sx; charset=utf-8")
"(Error :message \"Missing actor_url\")")
(let ((result (helper "pub-follow-remote" actor-url)))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(if (get result "error")
(do
(set-response-status 502)
(str "(Error :message \"" (get result "error") "\")"))
(str
"(FollowSent"
"\n :actor-url \"" (get result "actor-url") "\""
"\n :status \"" (get result "status") "\")")))))))
;; --------------------------------------------------------------------------
;; Followers
;; --------------------------------------------------------------------------
(defhandler pub-followers
:path "/pub/followers"
:method :get
:returns "element"
(&key)
(let ((data (helper "pub-followers-data")))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(let ((items (map (fn (f)
(str "\n (SxFollower"
" :acct \"" (get f "acct") "\""
" :actor-url \"" (get f "actor-url") "\")"))
data)))
(str "(SxFollowers" (join "" items) ")")))))
;; --------------------------------------------------------------------------
;; Following
;; --------------------------------------------------------------------------
(defhandler pub-following
:path "/pub/following"
:method :get
:returns "element"
(&key)
(let ((data (helper "pub-following-data")))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(let ((items (map (fn (f)
(str "\n (SxFollowing"
" :actor-url \"" (get f "actor-url") "\")"))
data)))
(str "(SxFollowing" (join "" items) ")")))))
;; ==========================================================================
;; Phase 4: Anchoring — Merkle trees, OTS, verification
;; ==========================================================================
;; --------------------------------------------------------------------------
;; Anchor pending activities
;; --------------------------------------------------------------------------
(defhandler pub-anchor
:path "/pub/anchor"
:method :post
:csrf false
:returns "element"
(&key)
(let ((result (helper "pub-anchor-pending")))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(if (= (get result "status") "nothing-to-anchor")
"(Anchor :status \"nothing-to-anchor\" :count 0)"
(str
"(Anchor"
"\n :status \"" (get result "status") "\""
"\n :count " (get result "count")
"\n :merkle-root \"" (get result "merkle-root") "\""
"\n :tree-cid \"" (get result "tree-cid") "\""
"\n :ots-proof-cid \"" (get result "ots-proof-cid") "\")")))))
;; --------------------------------------------------------------------------
;; Verify a CID's anchor
;; --------------------------------------------------------------------------
(defhandler pub-verify
:path "/pub/verify/<cid>"
:method :get
:returns "element"
(&key cid)
(let ((data (helper "pub-verify-anchor" cid)))
(do
(set-response-header "Content-Type" "text/sx; charset=utf-8")
(if (get data "error")
(do
(set-response-status 404)
(str "(Error :message \"" (get data "error") "\")"))
(str
"(AnchorVerification"
"\n :cid \"" (get data "cid") "\""
"\n :status \"" (get data "status") "\""
"\n :verified " (get data "verified")
"\n :merkle-root \"" (get data "merkle-root") "\""
"\n :tree-cid \"" (get data "tree-cid") "\""
"\n :ots-proof-cid \"" (get data "ots-proof-cid") "\""
"\n :published \"" (get data "published") "\")")))))

View File

@@ -244,14 +244,131 @@
(~docs/section :title "Implementation Phases" :id "phases"
(div :class "space-y-4"
(div :class "rounded border border-emerald-200 bg-emerald-50 p-4"
(h4 :class "font-semibold text-emerald-800" "Phase 1: Foundation")
(h4 :class "font-semibold text-emerald-800" "Phase 1: Foundation")
(p :class "text-emerald-700 text-sm" "DB schema, async IPFS client, actor endpoint, webfinger, " (code "/pub/actor") " returns SX actor document."))
(div :class "rounded border border-sky-200 bg-sky-50 p-4"
(h4 :class "font-semibold text-sky-800" "Phase 2: Publishing")
(h4 :class "font-semibold text-sky-800" "Phase 2: Publishing")
(p :class "text-sky-700 text-sm" "Pin to IPFS, path→CID index, collection browsing. Publish the actual SX spec files as the first content."))
(div :class "rounded border border-violet-200 bg-violet-50 p-4"
(h4 :class "font-semibold text-violet-800" "Phase 3: Federation")
(h4 :class "font-semibold text-violet-800" "Phase 3: Federation")
(p :class "text-violet-700 text-sm" "Inbox/outbox, follow/accept, HTTP signature verification, activity delivery, content mirroring."))
(div :class "rounded border border-amber-200 bg-amber-50 p-4"
(h4 :class "font-semibold text-amber-800" "Phase 4: Anchoring")
(p :class "text-amber-700 text-sm" "Merkle trees, OpenTimestamps, Bitcoin proof, provenance UI."))))))
(h4 :class "font-semibold text-amber-800" "Phase 4: Anchoring")
(p :class "text-amber-700 text-sm" "Merkle trees, OpenTimestamps, Bitcoin proof, provenance verification."))))
;; -----------------------------------------------------------------------
;; Live Dashboard
;; -----------------------------------------------------------------------
(~docs/section :title "Live Dashboard" :id "dashboard"
(p "Live data from the sx-pub API — server-rendered from the same endpoints.")
;; --- Status ---
(~docs/subsection :title "Server Status"
(let ((status (helper "pub-status-data")))
(div :class "grid grid-cols-2 sm:grid-cols-4 gap-3"
(div :class "rounded border border-stone-200 p-3 text-center"
(p :class "text-xs text-stone-400 uppercase" "DB")
(p :class "font-semibold text-sm" (get status "db")))
(div :class "rounded border border-stone-200 p-3 text-center"
(p :class "text-xs text-stone-400 uppercase" "IPFS")
(p :class "font-semibold text-sm" (get status "ipfs")))
(div :class "rounded border border-stone-200 p-3 text-center"
(p :class "text-xs text-stone-400 uppercase" "Actor")
(p :class "font-semibold text-sm" (get status "actor")))
(div :class "rounded border border-stone-200 p-3 text-center"
(p :class "text-xs text-stone-400 uppercase" "Domain")
(p :class "font-semibold text-sm" (or (get status "domain") "—"))))))
;; --- Actor ---
(~docs/subsection :title "Actor Identity"
(let ((actor (helper "pub-actor-data")))
(div :class "rounded border border-stone-200 bg-stone-50 p-4 space-y-2"
(div :class "flex items-center gap-3"
(div :class "w-10 h-10 rounded-full bg-violet-100 flex items-center justify-center text-violet-700 font-bold" "sx")
(div
(p :class "font-semibold" (get actor "display-name"))
(p :class "text-sm text-stone-500" (str "@" (get actor "preferred-username") "@" (get actor "domain")))))
(p :class "text-sm text-stone-600" (get actor "summary"))
(details :class "text-xs"
(summary :class "text-stone-400 cursor-pointer" "Public key")
(pre :class "mt-2 bg-stone-100 rounded p-2 text-xs overflow-x-auto" (get actor "public-key-pem"))))))
;; --- Collections ---
(~docs/subsection :title "Collections"
(let ((collections (helper "pub-collections-data")))
(div :class "grid gap-3"
(map (fn (c)
(div :class "rounded border border-stone-200 p-4 hover:border-violet-300 transition-colors"
(div :class "flex items-center justify-between"
(div
(h4 :class "font-semibold text-stone-800" (get c "name"))
(p :class "text-sm text-stone-500" (get c "description")))
(span :class "text-xs font-mono text-violet-600 bg-violet-50 px-2 py-1 rounded"
(str "/pub/" (get c "slug"))))))
collections))))
;; --- Published Documents ---
(~docs/subsection :title "Published Documents"
(let ((specs (helper "pub-collection-items" "core-specs")))
(when (not (get specs "error"))
(div :class "space-y-2"
(h4 :class "text-sm font-semibold text-stone-500 uppercase tracking-wide"
(get specs "name"))
(map (fn (d)
(when (!= (get d "slug") "")
(div :class "rounded border border-stone-200 p-3 flex items-center justify-between"
(div
(p :class "font-medium text-stone-800" (get d "title"))
(p :class "text-xs text-stone-400" (get d "summary")))
(div :class "text-right"
(p :class "text-xs font-mono text-emerald-600 truncate max-w-48" (get d "cid"))
(p :class "text-xs text-stone-400" (str (get d "size") " bytes"))))))
(get specs "items"))))))
;; --- Outbox ---
(~docs/subsection :title "Recent Activity"
(let ((outbox (helper "pub-outbox-data" "")))
(if (= (get outbox "total") 0)
(p :class "text-sm text-stone-400 italic" "No activities yet.")
(div :class "space-y-2"
(p :class "text-xs text-stone-400" (str (get outbox "total") " total activities"))
(map (fn (a)
(when (!= (get a "type") "")
(div :class "rounded border border-stone-200 p-3 flex items-center gap-3"
(span :class "text-xs font-semibold text-white bg-violet-500 px-2 py-0.5 rounded"
(get a "type"))
(span :class "text-xs text-stone-500" (get a "published"))
(when (!= (get a "cid") "")
(span :class "text-xs font-mono text-emerald-600 truncate max-w-48"
(get a "cid"))))))
(get outbox "items"))))))
;; --- Followers ---
(~docs/subsection :title "Followers"
(let ((followers (helper "pub-followers-data")))
(if (empty? followers)
(p :class "text-sm text-stone-400 italic" "No followers yet.")
(div :class "space-y-2"
(map (fn (f)
(when (!= (get f "acct") "")
(div :class "rounded border border-stone-200 p-3 flex items-center gap-2"
(div :class "w-8 h-8 rounded-full bg-sky-100 flex items-center justify-center text-sky-700 text-xs font-bold" "F")
(p :class "text-sm font-mono text-stone-600 truncate" (get f "acct")))))
followers))))
;; --- API Endpoints ---
(~docs/subsection :title "Try the API"
(p :class "text-sm text-stone-600 mb-2" "All endpoints return " (code "text/sx") ". Try them directly:")
(div :class "grid grid-cols-2 sm:grid-cols-3 gap-2"
(map (fn (endpoint)
(a :href (get endpoint "href")
:class "block rounded border border-stone-200 p-2 text-center hover:border-violet-300 hover:bg-violet-50 transition-colors text-xs font-mono text-stone-600"
(get endpoint "label")))
(list
{"label" "GET /pub/actor" "href" "/pub/actor"}
{"label" "GET /pub/status" "href" "/pub/status"}
{"label" "GET /pub/collections" "href" "/pub/collections"}
{"label" "GET /pub/outbox" "href" "/pub/outbox"}
{"label" "GET /pub/followers" "href" "/pub/followers"}
{"label" "GET /pub/following" "href" "/pub/following"}))))))))

View File

@@ -37,6 +37,23 @@ def _register_sx_helpers() -> None:
"spec-explorer-data": _spec_explorer_data,
"spec-explorer-data-by-slug": _spec_explorer_data_by_slug,
"handler-source": _handler_source,
# sx-pub helpers (only functional when DATABASE_URL is set)
"pub-actor-data": _pub_actor_data,
"pub-collections-data": _pub_collections_data,
"pub-status-data": _pub_status_data,
"pub-publish": _pub_publish,
"pub-collection-items": _pub_collection_items,
"pub-resolve-document": _pub_resolve_document,
"pub-fetch-cid": _pub_fetch_cid,
"pub-outbox-data": _pub_outbox_data,
"pub-followers-data": _pub_followers_data,
"pub-following-data": _pub_following_data,
"pub-follow-remote": _pub_follow_remote,
"pub-process-inbox": _pub_process_inbox,
"pub-deliver-to-followers": _pub_deliver_to_followers,
"pub-request-body": _pub_request_body,
"pub-anchor-pending": _pub_anchor_pending,
"pub-verify-anchor": _pub_verify_anchor,
})
@@ -1717,3 +1734,87 @@ def _page_helpers_demo_data() -> dict:
results["attr-keys"] = list(ATTR_DETAILS.keys())
return results
# ---------------------------------------------------------------------------
# sx-pub helpers — thin wrappers for SX access
# ---------------------------------------------------------------------------
async def _pub_actor_data():
from .pub_helpers import get_or_create_actor
return await get_or_create_actor()
async def _pub_collections_data():
from .pub_helpers import list_collections
return await list_collections()
async def _pub_status_data():
from .pub_helpers import check_status
return await check_status()
async def _pub_publish(collection, slug, content, title="", summary=""):
from .pub_helpers import publish_document
return await publish_document(collection, slug, content, title, summary)
async def _pub_collection_items(collection_slug):
from .pub_helpers import collection_items
return await collection_items(collection_slug)
async def _pub_resolve_document(collection_slug, slug):
from .pub_helpers import resolve_document
return await resolve_document(collection_slug, slug)
async def _pub_fetch_cid(cid):
from .pub_helpers import fetch_cid
return await fetch_cid(cid)
async def _pub_outbox_data(page=""):
from .pub_helpers import outbox_data
return await outbox_data(page)
async def _pub_followers_data():
from .pub_helpers import followers_data
return await followers_data()
async def _pub_following_data():
from .pub_helpers import following_data
return await following_data()
async def _pub_follow_remote(actor_url):
from .pub_helpers import follow_remote
return await follow_remote(actor_url)
async def _pub_process_inbox(body_sx):
from .pub_helpers import process_inbox
return await process_inbox(body_sx)
async def _pub_deliver_to_followers(activity_sx):
from .pub_helpers import deliver_to_followers
return await deliver_to_followers(activity_sx)
async def _pub_request_body():
from .pub_helpers import get_request_body
return await get_request_body()
async def _pub_anchor_pending():
from .pub_helpers import anchor_pending
return await anchor_pending()
async def _pub_verify_anchor(cid):
from .pub_helpers import verify_cid_anchor
return await verify_cid_anchor(cid)

941
sx/sxc/pages/pub_helpers.py Normal file
View File

@@ -0,0 +1,941 @@
"""sx-pub Python IO helpers — actor, IPFS, collections, publishing, federation.
These are called from SX defhandlers via (helper "pub-..." args...).
All DB access uses g.s (per-request async session from register_db).
"""
from __future__ import annotations
import hashlib
import logging
import os
from datetime import datetime, timezone
from typing import Any
logger = logging.getLogger("sx.pub")
SX_PUB_DOMAIN = os.getenv("SX_PUB_DOMAIN", "pub.sx-web.org")
async def get_or_create_actor() -> dict[str, Any]:
"""Get or create the singleton sx-pub actor. Auto-generates RSA keypair."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubActor
result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
actor = result.scalar_one_or_none()
if actor is None:
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048,
)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("utf-8")
actor = SxPubActor(
preferred_username="sx",
display_name="SX Language",
summary="Federated SX specification publisher",
public_key_pem=public_pem,
private_key_pem=private_pem,
domain=SX_PUB_DOMAIN,
)
g.s.add(actor)
await g.s.flush()
logger.info("Created sx-pub actor id=%d domain=%s", actor.id, SX_PUB_DOMAIN)
# Seed default collections on first run
await seed_default_collections()
return {
"preferred-username": actor.preferred_username,
"display-name": actor.display_name or actor.preferred_username,
"summary": actor.summary or "",
"public-key-pem": actor.public_key_pem,
"domain": actor.domain,
}
async def seed_default_collections() -> None:
"""Create default collections if they don't exist."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection
defaults = [
("core-specs", "Core Specifications", "Language spec files — evaluator, parser, primitives, render", 0),
("platforms", "Platforms", "Host platform implementations — JavaScript, Python, OCaml", 1),
("components", "Components", "Reusable UI components published as content-addressed SX", 2),
("libraries", "Libraries", "SX library modules — stdlib, signals, freeze, web forms", 3),
]
for slug, name, description, order in defaults:
exists = await g.s.execute(
select(SxPubCollection).where(SxPubCollection.slug == slug)
)
if exists.scalar_one_or_none() is None:
g.s.add(SxPubCollection(
slug=slug, name=name, description=description, sort_order=order,
))
await g.s.flush()
logger.info("Seeded %d default collections", len(defaults))
async def list_collections() -> list[dict[str, Any]]:
"""List all pub collections."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection
result = await g.s.execute(
select(SxPubCollection).order_by(SxPubCollection.sort_order)
)
return [
{
"slug": c.slug,
"name": c.name,
"description": c.description or "",
}
for c in result.scalars().all()
]
async def check_status() -> dict[str, Any]:
"""Health check — DB, IPFS, actor."""
status: dict[str, Any] = {"healthy": "true"}
# DB
try:
from quart import g
from sqlalchemy import text
await g.s.execute(text("SELECT 1"))
status["db"] = "connected"
except Exception as e:
status["db"] = f"error: {e}"
status["healthy"] = "false"
# IPFS
try:
from shared.utils.ipfs_client import is_available
ok = await is_available()
status["ipfs"] = "available" if ok else "unavailable"
except Exception as e:
status["ipfs"] = f"error: {e}"
# Actor
try:
actor = await get_or_create_actor()
status["actor"] = actor["preferred-username"]
status["domain"] = actor["domain"]
except Exception as e:
status["actor"] = f"error: {e}"
status["healthy"] = "false"
return status
# ---------------------------------------------------------------------------
# Phase 2: Publishing + Browsing
# ---------------------------------------------------------------------------
async def publish_document(collection_slug: str, slug: str, content: str,
title: str = "", summary: str = "") -> dict[str, Any]:
"""Pin SX content to IPFS and store in DB. Returns doc info dict."""
import hashlib
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection, SxPubDocument
from shared.utils.ipfs_client import add_bytes
# Resolve collection
result = await g.s.execute(
select(SxPubCollection).where(SxPubCollection.slug == collection_slug)
)
collection = result.scalar_one_or_none()
if collection is None:
return {"error": f"Collection not found: {collection_slug}"}
# Hash content
content_bytes = content.encode("utf-8")
content_hash = hashlib.sha3_256(content_bytes).hexdigest()
# Pin to IPFS
try:
cid = await add_bytes(content_bytes, pin=True)
except Exception as e:
logger.error("IPFS pin failed for %s/%s: %s", collection_slug, slug, e)
return {"error": f"IPFS pin failed: {e}"}
# Upsert document
result = await g.s.execute(
select(SxPubDocument).where(
SxPubDocument.collection_id == collection.id,
SxPubDocument.slug == slug,
)
)
doc = result.scalar_one_or_none()
if doc is None:
doc = SxPubDocument(
collection_id=collection.id,
slug=slug,
title=title or slug,
summary=summary,
content_hash=content_hash,
ipfs_cid=cid,
size_bytes=len(content_bytes),
status="published",
)
g.s.add(doc)
else:
doc.content_hash = content_hash
doc.ipfs_cid = cid
doc.size_bytes = len(content_bytes)
doc.status = "published"
if title:
doc.title = title
if summary:
doc.summary = summary
await g.s.flush()
logger.info("Published %s/%s%s (%d bytes)", collection_slug, slug, cid, len(content_bytes))
# Record Publish activity in outbox
from shared.models.sx_pub import SxPubActivity
g.s.add(SxPubActivity(
activity_type="Publish",
object_type="SxDocument",
object_data={
"path": f"/pub/{collection_slug}/{slug}",
"cid": cid,
"collection": collection_slug,
"slug": slug,
"title": title or slug,
"summary": summary,
},
ipfs_cid=cid,
))
await g.s.flush()
return {
"path": f"/pub/{collection_slug}/{slug}",
"cid": cid,
"hash": content_hash,
"size": len(content_bytes),
"collection": collection_slug,
"slug": slug,
"title": doc.title or slug,
}
async def collection_items(collection_slug: str) -> dict[str, Any]:
"""List published documents in a collection."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection, SxPubDocument
result = await g.s.execute(
select(SxPubCollection).where(SxPubCollection.slug == collection_slug)
)
collection = result.scalar_one_or_none()
if collection is None:
return {"error": f"Collection not found: {collection_slug}"}
result = await g.s.execute(
select(SxPubDocument).where(
SxPubDocument.collection_id == collection.id,
SxPubDocument.status == "published",
).order_by(SxPubDocument.slug)
)
docs = result.scalars().all()
return {
"collection": collection_slug,
"name": collection.name,
"description": collection.description or "",
"items": [
{
"slug": d.slug,
"title": d.title or d.slug,
"summary": d.summary or "",
"cid": d.ipfs_cid or "",
"size": d.size_bytes or 0,
}
for d in docs
],
}
async def resolve_document(collection_slug: str, slug: str) -> dict[str, Any]:
"""Resolve a document path to its content via IPFS."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubCollection, SxPubDocument
from shared.utils.ipfs_client import get_bytes
result = await g.s.execute(
select(SxPubCollection).where(SxPubCollection.slug == collection_slug)
)
collection = result.scalar_one_or_none()
if collection is None:
return {"error": "not-found"}
result = await g.s.execute(
select(SxPubDocument).where(
SxPubDocument.collection_id == collection.id,
SxPubDocument.slug == slug,
)
)
doc = result.scalar_one_or_none()
if doc is None or not doc.ipfs_cid:
return {"error": "not-found"}
content_bytes = await get_bytes(doc.ipfs_cid)
if content_bytes is None:
return {"error": "ipfs-unavailable"}
return {
"slug": doc.slug,
"title": doc.title or doc.slug,
"summary": doc.summary or "",
"cid": doc.ipfs_cid,
"collection": collection_slug,
"content": content_bytes.decode("utf-8", errors="replace"),
}
async def fetch_cid(cid: str) -> dict[str, Any]:
"""Fetch raw content from IPFS by CID."""
from shared.utils.ipfs_client import get_bytes
content_bytes = await get_bytes(cid)
if content_bytes is None:
return {"error": "not-found"}
return {
"cid": cid,
"content": content_bytes.decode("utf-8", errors="replace"),
"size": len(content_bytes),
}
# ---------------------------------------------------------------------------
# Phase 3: Federation — outbox, inbox, follow, delivery, signatures
# ---------------------------------------------------------------------------
async def outbox_data(page: str = "") -> dict[str, Any]:
"""List published activities (outbox)."""
from quart import g
from sqlalchemy import select, func as sa_func
from shared.models.sx_pub import SxPubActivity
page_num = int(page) if page else 1
per_page = 20
offset = (page_num - 1) * per_page
total_result = await g.s.execute(select(sa_func.count(SxPubActivity.id)))
total = total_result.scalar() or 0
result = await g.s.execute(
select(SxPubActivity)
.order_by(SxPubActivity.published.desc())
.offset(offset).limit(per_page)
)
activities = result.scalars().all()
return {
"total": total,
"page": page_num,
"items": [
{
"type": a.activity_type,
"object-type": a.object_type or "",
"published": a.published.isoformat() if a.published else "",
"cid": a.ipfs_cid or "",
"data": a.object_data or {},
}
for a in activities
],
}
async def followers_data() -> list[dict[str, Any]]:
"""List followers."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollower
result = await g.s.execute(
select(SxPubFollower).where(SxPubFollower.state == "accepted")
.order_by(SxPubFollower.created_at.desc())
)
return [
{
"acct": f.follower_acct,
"actor-url": f.follower_actor_url,
"inbox": f.follower_inbox,
}
for f in result.scalars().all()
]
async def following_data() -> list[dict[str, Any]]:
"""List who we follow."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollowing
result = await g.s.execute(
select(SxPubFollowing).where(SxPubFollowing.state == "accepted")
.order_by(SxPubFollowing.created_at.desc())
)
return [
{
"actor-url": f.remote_actor_url,
"inbox": f.remote_inbox,
}
for f in result.scalars().all()
]
def _sign_request(method: str, url: str, body: str, private_key_pem: str,
key_id: str) -> dict[str, str]:
"""Generate HTTP Signature headers for an outgoing request."""
from urllib.parse import urlparse
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
import base64
parsed = urlparse(url)
path = parsed.path
host = parsed.netloc
date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")
# Build signature string
digest = "SHA-256=" + base64.b64encode(
hashlib.sha256(body.encode("utf-8")).digest()
).decode("ascii")
signed_string = (
f"(request-target): {method.lower()} {path}\n"
f"host: {host}\n"
f"date: {date}\n"
f"digest: {digest}"
)
private_key = serialization.load_pem_private_key(
private_key_pem.encode("utf-8"), password=None,
)
signature = base64.b64encode(
private_key.sign(
signed_string.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
).decode("ascii")
sig_header = (
f'keyId="{key_id}",'
f'algorithm="rsa-sha256",'
f'headers="(request-target) host date digest",'
f'signature="{signature}"'
)
return {
"Host": host,
"Date": date,
"Digest": digest,
"Signature": sig_header,
"Content-Type": "text/sx; charset=utf-8",
}
def _verify_signature(headers: dict, method: str, path: str,
body: str, public_key_pem: str) -> bool:
"""Verify HTTP Signature on an incoming request."""
import base64
import re
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
sig_header = headers.get("Signature", "")
if not sig_header:
return False
# Parse signature header
parts = {}
for match in re.finditer(r'(\w+)="([^"]*)"', sig_header):
parts[match.group(1)] = match.group(2)
if "signature" not in parts or "headers" not in parts:
return False
# Reconstruct signed string
signed_headers = parts["headers"].split()
lines = []
for h in signed_headers:
if h == "(request-target)":
lines.append(f"(request-target): {method.lower()} {path}")
else:
val = headers.get(h.title(), headers.get(h, ""))
lines.append(f"{h}: {val}")
signed_string = "\n".join(lines)
try:
public_key = serialization.load_pem_public_key(
public_key_pem.encode("utf-8"),
)
public_key.verify(
base64.b64decode(parts["signature"]),
signed_string.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
return True
except Exception as e:
logger.warning("Signature verification failed: %s", e)
return False
async def follow_remote(actor_url: str) -> dict[str, Any]:
"""Send a Follow activity to a remote sx-pub server."""
import httpx
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollowing, SxPubActor
# Check not already following
result = await g.s.execute(
select(SxPubFollowing).where(SxPubFollowing.remote_actor_url == actor_url)
)
existing = result.scalar_one_or_none()
if existing:
return {"status": existing.state, "actor-url": actor_url}
# Fetch remote actor document
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(actor_url, headers={"Accept": "text/sx"})
resp.raise_for_status()
remote_actor_sx = resp.text
except Exception as e:
return {"error": f"Failed to fetch remote actor: {e}"}
# Parse inbox URL from the SX actor document
# Simple extraction — look for :inbox "..."
import re
inbox_match = re.search(r':inbox\s+"([^"]+)"', remote_actor_sx)
if not inbox_match:
return {"error": "Could not find inbox in remote actor document"}
# Build absolute inbox URL
from urllib.parse import urljoin
remote_inbox = urljoin(actor_url, inbox_match.group(1))
# Get our actor for signing
actor_result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
our_actor = actor_result.scalar_one_or_none()
if not our_actor:
return {"error": "Local actor not initialized"}
our_id = f"https://{our_actor.domain}/pub/actor"
# Build Follow activity
follow_sx = (
f'(Follow\n'
f' :actor "{our_id}"\n'
f' :object "{actor_url}")'
)
# Sign and send
key_id = f"{our_id}#main-key"
headers = _sign_request("POST", remote_inbox, follow_sx,
our_actor.private_key_pem, key_id)
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(remote_inbox, content=follow_sx, headers=headers)
logger.info("Sent Follow to %s%d", remote_inbox, resp.status_code)
except Exception as e:
logger.error("Follow delivery failed to %s: %s", remote_inbox, e)
# Store the following record
following = SxPubFollowing(
remote_actor_url=actor_url,
remote_inbox=remote_inbox,
state="pending",
)
g.s.add(following)
await g.s.flush()
return {"status": "pending", "actor-url": actor_url, "inbox": remote_inbox}
async def process_inbox(body_sx: str) -> dict[str, Any]:
"""Process an incoming activity from a remote sx-pub server."""
import re
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import (
SxPubFollower, SxPubFollowing, SxPubActivity, SxPubActor,
)
from shared.utils.ipfs_client import pin_cid
# Parse activity type and actor
type_match = re.match(r'\((\w+)', body_sx.strip())
actor_match = re.search(r':actor\s+"([^"]+)"', body_sx)
object_match = re.search(r':object\s+"([^"]+)"', body_sx)
if not type_match:
return {"error": "Could not parse activity type"}
activity_type = type_match.group(1)
remote_actor = actor_match.group(1) if actor_match else ""
object_val = object_match.group(1) if object_match else ""
logger.info("Inbox received: %s from %s", activity_type, remote_actor)
if activity_type == "Follow":
# Someone wants to follow us — auto-accept
inbox_match = re.search(r':inbox\s+"([^"]+)"', body_sx)
remote_inbox = inbox_match.group(1) if inbox_match else ""
# If no inbox in activity, try fetching the remote actor
if not remote_inbox and remote_actor:
import httpx
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(remote_actor, headers={"Accept": "text/sx"})
im = re.search(r':inbox\s+"([^"]+)"', resp.text)
if im:
from urllib.parse import urljoin
remote_inbox = urljoin(remote_actor, im.group(1))
except Exception:
pass
# Store follower
result = await g.s.execute(
select(SxPubFollower).where(SxPubFollower.follower_actor_url == remote_actor)
)
follower = result.scalar_one_or_none()
if follower is None:
follower = SxPubFollower(
follower_acct=remote_actor,
follower_inbox=remote_inbox or remote_actor,
follower_actor_url=remote_actor,
state="accepted",
)
g.s.add(follower)
await g.s.flush()
logger.info("Accepted follow from %s", remote_actor)
# Send Accept back
actor_result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
our_actor = actor_result.scalar_one_or_none()
if our_actor and remote_inbox:
our_id = f"https://{our_actor.domain}/pub/actor"
accept_sx = (
f'(Accept\n'
f' :actor "{our_id}"\n'
f' :object {body_sx})'
)
key_id = f"{our_id}#main-key"
headers = _sign_request("POST", remote_inbox, accept_sx,
our_actor.private_key_pem, key_id)
import httpx
try:
async with httpx.AsyncClient(timeout=10) as client:
await client.post(remote_inbox, content=accept_sx, headers=headers)
except Exception as e:
logger.warning("Accept delivery failed: %s", e)
return {"accepted": remote_actor}
elif activity_type == "Accept":
# Our follow was accepted — update state
if object_val:
result = await g.s.execute(
select(SxPubFollowing).where(
SxPubFollowing.remote_actor_url == remote_actor
)
)
following = result.scalar_one_or_none()
if following:
following.state = "accepted"
following.accepted_at = datetime.now(timezone.utc)
await g.s.flush()
logger.info("Follow accepted by %s", remote_actor)
return {"accepted-by": remote_actor}
elif activity_type == "Publish":
# Pin the published CID locally
cid_match = re.search(r':cid\s+"([^"]+)"', body_sx)
if cid_match:
cid = cid_match.group(1)
pinned = await pin_cid(cid)
logger.info("Mirrored CID %s from %s (pinned=%s)", cid, remote_actor, pinned)
# Record the activity
g.s.add(SxPubActivity(
activity_type="Publish",
object_type="SxDocument",
object_data={"remote_actor": remote_actor, "body": body_sx},
ipfs_cid=cid_match.group(1) if cid_match else None,
))
await g.s.flush()
return {"mirrored": cid_match.group(1) if cid_match else ""}
return {"ignored": activity_type}
async def deliver_to_followers(activity_sx: str) -> dict[str, Any]:
"""Deliver an activity to all follower inboxes."""
import httpx
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubFollower, SxPubActor
actor_result = await g.s.execute(
select(SxPubActor).where(SxPubActor.preferred_username == "sx")
)
our_actor = actor_result.scalar_one_or_none()
if not our_actor:
return {"error": "Actor not initialized", "delivered": 0}
our_id = f"https://{our_actor.domain}/pub/actor"
key_id = f"{our_id}#main-key"
result = await g.s.execute(
select(SxPubFollower).where(SxPubFollower.state == "accepted")
)
followers = result.scalars().all()
delivered = 0
failed = 0
for follower in followers:
headers = _sign_request("POST", follower.follower_inbox, activity_sx,
our_actor.private_key_pem, key_id)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
follower.follower_inbox,
content=activity_sx,
headers=headers,
)
if resp.status_code < 300:
delivered += 1
else:
failed += 1
logger.warning("Delivery to %s returned %d",
follower.follower_inbox, resp.status_code)
except Exception as e:
failed += 1
logger.error("Delivery to %s failed: %s", follower.follower_inbox, e)
logger.info("Delivered to %d/%d followers (%d failed)",
delivered, len(followers), failed)
return {"delivered": delivered, "failed": failed, "total": len(followers)}
async def get_request_body() -> str:
"""Get the raw request body as text."""
from quart import request
data = await request.get_data(as_text=True)
return data
async def get_request_headers() -> dict[str, str]:
"""Get request headers as a dict."""
from quart import request
return dict(request.headers)
async def get_request_method() -> str:
"""Get the HTTP method."""
from quart import request
return request.method
async def get_request_path() -> str:
"""Get the request path."""
from quart import request
return request.path
# ---------------------------------------------------------------------------
# Phase 4: Anchoring — Merkle trees, OTS, verification
# ---------------------------------------------------------------------------
async def anchor_pending() -> dict[str, Any]:
"""Anchor all unanchored Publish activities into a Merkle tree.
1. Collect unanchored activities (by CID)
2. Build Merkle tree
3. Pin tree to IPFS
4. Submit root to OpenTimestamps
5. Store proof on IPFS
6. Record anchor in DB
"""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubActivity
from shared.utils.anchoring import (
build_merkle_tree, submit_to_opentimestamps,
)
from shared.utils.ipfs_client import add_json, add_bytes, is_available
# Find unanchored Publish activities with CIDs
result = await g.s.execute(
select(SxPubActivity).where(
SxPubActivity.activity_type == "Publish",
SxPubActivity.ipfs_cid.isnot(None),
SxPubActivity.object_data["anchor_tree_cid"].is_(None),
).order_by(SxPubActivity.created_at.asc())
.limit(100)
)
activities = result.scalars().all()
if not activities:
return {"status": "nothing-to-anchor", "count": 0}
# Build Merkle tree from CIDs
cids = [a.ipfs_cid for a in activities if a.ipfs_cid]
if not cids:
return {"status": "no-cids", "count": 0}
tree = build_merkle_tree(cids)
merkle_root = tree["root"]
# Pin tree to IPFS
tree_cid = None
ots_proof_cid = None
if await is_available():
try:
tree_data = {
"root": merkle_root,
"leaves": tree["leaves"],
"cids": cids,
"created_at": datetime.now(timezone.utc).isoformat(),
}
tree_cid = await add_json(tree_data)
logger.info("Merkle tree pinned: %s (%d leaves)", tree_cid, len(cids))
except Exception as e:
logger.error("IPFS tree storage failed: %s", e)
# Submit to OpenTimestamps
ots_proof = await submit_to_opentimestamps(merkle_root)
if ots_proof and await is_available():
try:
ots_proof_cid = await add_bytes(ots_proof)
logger.info("OTS proof pinned: %s", ots_proof_cid)
except Exception as e:
logger.error("IPFS OTS proof storage failed: %s", e)
# Record anchor in activities (store in object_data)
anchor_info = {
"merkle_root": merkle_root,
"tree_cid": tree_cid,
"ots_proof_cid": ots_proof_cid,
"activity_count": len(activities),
"anchored_at": datetime.now(timezone.utc).isoformat(),
}
for a in activities:
data = dict(a.object_data or {})
data["anchor_tree_cid"] = tree_cid
data["anchor_merkle_root"] = merkle_root
data["anchor_ots_cid"] = ots_proof_cid
a.object_data = data
# Also record anchor as its own activity
from shared.models.sx_pub import SxPubActivity as SPA
g.s.add(SPA(
activity_type="Anchor",
object_type="MerkleTree",
object_data=anchor_info,
ipfs_cid=tree_cid,
))
await g.s.flush()
logger.info("Anchored %d activities, root=%s, tree=%s, ots=%s",
len(activities), merkle_root, tree_cid, ots_proof_cid)
return {
"status": "anchored",
"count": len(activities),
"merkle-root": merkle_root,
"tree-cid": tree_cid or "",
"ots-proof-cid": ots_proof_cid or "",
}
async def verify_cid_anchor(cid: str) -> dict[str, Any]:
"""Verify the anchor proof for a specific CID."""
from quart import g
from sqlalchemy import select
from shared.models.sx_pub import SxPubActivity
from shared.utils.anchoring import build_merkle_tree, verify_merkle_proof
from shared.utils.ipfs_client import get_bytes
# Find the Publish activity for this CID
result = await g.s.execute(
select(SxPubActivity).where(
SxPubActivity.activity_type == "Publish",
SxPubActivity.ipfs_cid == cid,
)
)
activity = result.scalar_one_or_none()
if not activity:
return {"error": "not-found", "cid": cid}
data = activity.object_data or {}
tree_cid = data.get("anchor_tree_cid")
merkle_root = data.get("anchor_merkle_root")
ots_cid = data.get("anchor_ots_cid")
if not tree_cid:
return {"status": "not-anchored", "cid": cid}
# Fetch the Merkle tree from IPFS to verify
verified = False
if tree_cid:
tree_bytes = await get_bytes(tree_cid)
if tree_bytes:
import json
try:
tree_data = json.loads(tree_bytes)
tree = build_merkle_tree(tree_data["cids"])
from shared.utils.anchoring import get_merkle_proof
proof = get_merkle_proof(tree, cid)
if proof is not None:
verified = verify_merkle_proof(cid, proof, tree["root"])
except Exception as e:
logger.warning("Merkle verification failed: %s", e)
return {
"status": "anchored" if verified else "unverified",
"cid": cid,
"merkle-root": merkle_root or "",
"tree-cid": tree_cid or "",
"ots-proof-cid": ots_cid or "",
"verified": "true" if verified else "false",
"published": activity.published.isoformat() if activity.published else "",
}