Files
rose-ash/sx/sxc/pages/pub_helpers.py
giles cf130c4174 sx-pub Phase 2: publish to IPFS, browse collections, resolve by path + CID
New endpoints:
- POST /pub/publish — pin SX content to IPFS, store path→CID in DB
- GET /pub/browse/<collection> — list published documents
- GET /pub/doc/<collection>/<slug> — resolve path to CID, fetch from IPFS
- GET /pub/cid/<cid> — direct CID fetch (immutable, cache forever)

New helpers: pub-publish, pub-collection-items, pub-resolve-document, pub-fetch-cid

Tested: published stdlib.sx (6.9KB) → QmQQyR3Ltqi5sFiwZh5dutPbAM4QsEBnw419RyNnTj4fFM

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 01:30:05 +00:00

313 lines
9.9 KiB
Python

"""sx-pub Python IO helpers — actor management, IPFS status, collections.
These are called from SX defhandlers via (helper "pub-..." args...).
All DB access uses g.s (per-request async session from register_db).
"""
from __future__ import annotations
import logging
import os
from typing import Any
# Logger used by the helpers in this module.
logger = logging.getLogger("sx.pub")
# Domain recorded on the actor row when it is first created; read from the
# SX_PUB_DOMAIN environment variable, falling back to the value below.
SX_PUB_DOMAIN = os.getenv("SX_PUB_DOMAIN", "pub.sx-web.org")
async def get_or_create_actor() -> dict[str, Any]:
    """Return the singleton sx-pub actor, creating it when it is missing.

    The actor is looked up by ``preferred_username == "sx"`` on the
    per-request session ``g.s``. If no row exists, a 2048-bit RSA keypair is
    generated, a new ``SxPubActor`` is added and flushed, and the default
    collections are seeded.

    Returns:
        A dict with the keys ``preferred-username``, ``display-name``,
        ``summary``, ``public-key-pem`` and ``domain``. The private key is
        stored on the row but is not part of the returned dict.
    """
    from quart import g
    from sqlalchemy import select
    from shared.models.sx_pub import SxPubActor

    lookup = select(SxPubActor).where(SxPubActor.preferred_username == "sx")
    actor = (await g.s.execute(lookup)).scalar_one_or_none()

    if actor is None:
        # Imported lazily: only needed the first time the actor is created.
        from cryptography.hazmat.primitives.asymmetric import rsa
        from cryptography.hazmat.primitives import serialization

        keypair = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
        )
        private_pem_bytes = keypair.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        public_pem_bytes = keypair.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        actor = SxPubActor(
            preferred_username="sx",
            display_name="SX Language",
            summary="Federated SX specification publisher",
            public_key_pem=public_pem_bytes.decode("utf-8"),
            private_key_pem=private_pem_bytes.decode("utf-8"),
            domain=SX_PUB_DOMAIN,
        )
        g.s.add(actor)
        # Flush the pending row before actor.id is read for the log line.
        await g.s.flush()
        logger.info("Created sx-pub actor id=%d domain=%s", actor.id, SX_PUB_DOMAIN)

        # First run only: create the default collections next to the new actor.
        await seed_default_collections()

    username = actor.preferred_username
    return {
        "preferred-username": username,
        "display-name": actor.display_name or actor.preferred_username,
        "summary": actor.summary or "",
        "public-key-pem": actor.public_key_pem,
        "domain": actor.domain,
    }
async def seed_default_collections() -> None:
    """Create the default collections that do not exist yet.

    Slugs that are already present are skipped, so only missing collections
    are added. New rows are added to the per-request session ``g.s`` and
    flushed; this function does not commit.
    """
    from quart import g
    from sqlalchemy import select
    from shared.models.sx_pub import SxPubCollection

    # (slug, name, description, sort_order)
    defaults = [
        ("core-specs", "Core Specifications", "Language spec files — evaluator, parser, primitives, render", 0),
        ("platforms", "Platforms", "Host platform implementations — JavaScript, Python, OCaml", 1),
        ("components", "Components", "Reusable UI components published as content-addressed SX", 2),
        ("libraries", "Libraries", "SX library modules — stdlib, signals, freeze, web forms", 3),
    ]

    # One lookup for all default slugs instead of one round trip per slug.
    wanted_slugs = [slug for slug, *_ in defaults]
    result = await g.s.execute(
        select(SxPubCollection.slug).where(SxPubCollection.slug.in_(wanted_slugs))
    )
    existing_slugs = set(result.scalars().all())

    created = 0
    for slug, name, description, order in defaults:
        if slug in existing_slugs:
            continue
        g.s.add(SxPubCollection(
            slug=slug, name=name, description=description, sort_order=order,
        ))
        created += 1

    await g.s.flush()
    # Log the number of rows actually inserted, not the size of the defaults
    # table, so a no-op call does not claim to have seeded anything.
    logger.info("Seeded %d default collections", created)
async def list_collections() -> list[dict[str, Any]]:
    """Return all pub collections, ordered by ``sort_order``.

    Returns:
        One dict per collection with the keys ``slug``, ``name`` and
        ``description`` (an empty string when the stored description is
        falsy).
    """
    from quart import g
    from sqlalchemy import select
    from shared.models.sx_pub import SxPubCollection

    query = select(SxPubCollection).order_by(SxPubCollection.sort_order)
    rows = (await g.s.execute(query)).scalars().all()

    listing: list[dict[str, Any]] = []
    for row in rows:
        listing.append({
            "slug": row.slug,
            "name": row.name,
            "description": row.description or "",
        })
    return listing
async def check_status() -> dict[str, Any]:
    """Probe the database, IPFS and the actor and report the outcome.

    Every probe is wrapped so that a failure is recorded in the result
    instead of propagating to the caller.

    Returns:
        A dict with ``healthy`` (the string ``"true"`` or ``"false"``),
        ``db``, ``ipfs`` and ``actor``; ``domain`` is added when the actor
        lookup succeeds. A database or actor failure sets ``healthy`` to
        ``"false"``; an IPFS failure is reported but leaves ``healthy`` as is.
    """
    report: dict[str, Any] = {"healthy": "true"}

    # Database: a trivial round trip on the per-request session.
    try:
        from quart import g
        from sqlalchemy import text
        await g.s.execute(text("SELECT 1"))
    except Exception as exc:
        report["db"] = f"error: {exc}"
        report["healthy"] = "false"
    else:
        report["db"] = "connected"

    # IPFS: reported only; it does not influence the "healthy" flag.
    try:
        from shared.utils.ipfs_client import is_available
        if await is_available():
            report["ipfs"] = "available"
        else:
            report["ipfs"] = "unavailable"
    except Exception as exc:
        report["ipfs"] = f"error: {exc}"

    # Actor: read both fields before publishing either into the report.
    try:
        actor_info = await get_or_create_actor()
        username = actor_info["preferred-username"]
        domain = actor_info["domain"]
    except Exception as exc:
        report["actor"] = f"error: {exc}"
        report["healthy"] = "false"
    else:
        report["actor"] = username
        report["domain"] = domain

    return report
# ---------------------------------------------------------------------------
# Phase 2: Publishing + Browsing
# ---------------------------------------------------------------------------
async def publish_document(collection_slug: str, slug: str, content: str,
                           title: str = "", summary: str = "") -> dict[str, Any]:
    """Pin SX content to IPFS and upsert its document row.

    Args:
        collection_slug: Slug of an existing collection to publish into.
        slug: Document slug, looked up within that collection.
        content: Document text; it is encoded as UTF-8 before hashing and
            pinning.
        title: Optional title. A new document falls back to ``slug``; an
            existing document keeps its title when this is empty.
        summary: Optional summary. An existing document keeps its summary
            when this is empty.

    Returns:
        On success, a dict with ``path``, ``cid``, ``hash`` (SHA3-256 hex
        digest of the encoded content), ``size`` (byte length),
        ``collection``, ``slug`` and ``title``. When the collection does not
        exist or the IPFS pin raises, a dict with a single ``error`` key.

    The row is added or updated on the per-request session ``g.s`` and
    flushed; this function does not commit.
    """
    import hashlib
    from quart import g
    from sqlalchemy import select
    from shared.models.sx_pub import SxPubCollection, SxPubDocument
    from shared.utils.ipfs_client import add_bytes

    # Resolve collection
    result = await g.s.execute(
        select(SxPubCollection).where(SxPubCollection.slug == collection_slug)
    )
    collection = result.scalar_one_or_none()
    if collection is None:
        return {"error": f"Collection not found: {collection_slug}"}

    # Hash content
    content_bytes = content.encode("utf-8")
    content_hash = hashlib.sha3_256(content_bytes).hexdigest()
    size_bytes = len(content_bytes)

    # Pin to IPFS before touching the document row, so a failed pin leaves
    # the database unchanged.
    try:
        cid = await add_bytes(content_bytes, pin=True)
    except Exception as e:
        logger.error("IPFS pin failed for %s/%s: %s", collection_slug, slug, e)
        return {"error": f"IPFS pin failed: {e}"}

    # Upsert document
    result = await g.s.execute(
        select(SxPubDocument).where(
            SxPubDocument.collection_id == collection.id,
            SxPubDocument.slug == slug,
        )
    )
    doc = result.scalar_one_or_none()
    if doc is None:
        doc = SxPubDocument(
            collection_id=collection.id,
            slug=slug,
            title=title or slug,
            summary=summary,
            content_hash=content_hash,
            ipfs_cid=cid,
            size_bytes=size_bytes,
            status="published",
        )
        g.s.add(doc)
    else:
        doc.content_hash = content_hash
        doc.ipfs_cid = cid
        doc.size_bytes = size_bytes
        doc.status = "published"
        # Only overwrite metadata the caller actually supplied.
        if title:
            doc.title = title
        if summary:
            doc.summary = summary
    await g.s.flush()

    # The separator keeps the slug and the CID from running together in the log.
    logger.info("Published %s/%s → %s (%d bytes)", collection_slug, slug, cid, size_bytes)
    return {
        "path": f"/pub/{collection_slug}/{slug}",
        "cid": cid,
        "hash": content_hash,
        "size": size_bytes,
        "collection": collection_slug,
        "slug": slug,
        "title": doc.title or slug,
    }
async def collection_items(collection_slug: str) -> dict[str, Any]:
    """Describe a collection and list its published documents.

    Args:
        collection_slug: Slug of the collection to list.

    Returns:
        A dict with ``collection``, ``name``, ``description`` and ``items``.
        Each item carries ``slug``, ``title``, ``summary``, ``cid`` and
        ``size``; only documents whose status is ``"published"`` are
        included, ordered by slug. If the collection does not exist, a dict
        with a single ``error`` key is returned instead.
    """
    from quart import g
    from sqlalchemy import select
    from shared.models.sx_pub import SxPubCollection, SxPubDocument

    collection_query = select(SxPubCollection).where(
        SxPubCollection.slug == collection_slug
    )
    collection = (await g.s.execute(collection_query)).scalar_one_or_none()
    if collection is None:
        return {"error": f"Collection not found: {collection_slug}"}

    documents_query = (
        select(SxPubDocument)
        .where(
            SxPubDocument.collection_id == collection.id,
            SxPubDocument.status == "published",
        )
        .order_by(SxPubDocument.slug)
    )
    documents = (await g.s.execute(documents_query)).scalars().all()

    items = []
    for document in documents:
        items.append({
            "slug": document.slug,
            "title": document.title or document.slug,
            "summary": document.summary or "",
            "cid": document.ipfs_cid or "",
            "size": document.size_bytes or 0,
        })

    return {
        "collection": collection_slug,
        "name": collection.name,
        "description": collection.description or "",
        "items": items,
    }
async def resolve_document(collection_slug: str, slug: str) -> dict[str, Any]:
    """Look up a document by collection and slug and fetch its content from IPFS.

    Args:
        collection_slug: Slug of the collection that holds the document.
        slug: Slug of the document within that collection.

    Returns:
        On success, a dict with ``slug``, ``title``, ``summary``, ``cid``,
        ``collection`` and ``content`` (decoded as UTF-8, with undecodable
        bytes replaced). ``{"error": "not-found"}`` when the collection or
        document is missing or the document has no CID;
        ``{"error": "ipfs-unavailable"}`` when the IPFS fetch yields ``None``.
    """
    from quart import g
    from sqlalchemy import select
    from shared.models.sx_pub import SxPubCollection, SxPubDocument
    from shared.utils.ipfs_client import get_bytes

    not_found = {"error": "not-found"}

    collection_query = select(SxPubCollection).where(
        SxPubCollection.slug == collection_slug
    )
    collection = (await g.s.execute(collection_query)).scalar_one_or_none()
    if collection is None:
        return not_found

    # NOTE(review): unlike collection_items, no status filter is applied
    # here — confirm that resolving non-"published" documents is intended.
    document_query = select(SxPubDocument).where(
        SxPubDocument.collection_id == collection.id,
        SxPubDocument.slug == slug,
    )
    document = (await g.s.execute(document_query)).scalar_one_or_none()
    if document is None:
        return not_found
    if not document.ipfs_cid:
        return not_found

    raw = await get_bytes(document.ipfs_cid)
    if raw is None:
        return {"error": "ipfs-unavailable"}

    text_content = raw.decode("utf-8", errors="replace")
    return {
        "slug": document.slug,
        "title": document.title or document.slug,
        "summary": document.summary or "",
        "cid": document.ipfs_cid,
        "collection": collection_slug,
        "content": text_content,
    }
async def fetch_cid(cid: str) -> dict[str, Any]:
    """Fetch the content stored under a CID from IPFS.

    Args:
        cid: Content identifier passed unchanged to the IPFS client.

    Returns:
        A dict with ``cid``, ``content`` (decoded as UTF-8, with undecodable
        bytes replaced) and ``size`` (length of the raw bytes), or
        ``{"error": "not-found"}`` when the client yields ``None``.
    """
    from shared.utils.ipfs_client import get_bytes

    raw = await get_bytes(cid)
    if raw is None:
        return {"error": "not-found"}

    decoded = raw.decode("utf-8", errors="replace")
    byte_count = len(raw)
    return {"cid": cid, "content": decoded, "size": byte_count}