From 8850a0106a51acb55d5c7b84dd45b0b012b6333e Mon Sep 17 00:00:00 2001 From: giles Date: Sat, 21 Feb 2026 15:10:08 +0000 Subject: [PATCH] Add federation/ActivityPub models, contracts, and services Phase 0+1 of ActivityPub integration: - 6 ORM models (ActorProfile, APActivity, APFollower, APInboxItem, APAnchor, IPFSPin) - FederationService protocol + SqlFederationService implementation + stub - 4 DTOs (ActorProfileDTO, APActivityDTO, APFollowerDTO, APAnchorDTO) - Registry slot for federation service - Alembic migration for federation tables - IPFS async client (httpx-based) - HTTP Signatures (RSA-2048 sign/verify) - login_url() now uses AUTH_APP env var for flexible auth routing Co-Authored-By: Claude Opus 4.6 --- alembic/env.py | 2 +- .../k1i9f5g7h8_add_federation_tables.py | 142 ++++++++ contracts/dtos.py | 53 +++ contracts/protocols.py | 67 ++++ infrastructure/factory.py | 2 +- infrastructure/urls.py | 11 +- models/__init__.py | 3 + models/federation.py | 195 +++++++++++ requirements.txt | 1 + services/federation_impl.py | 304 ++++++++++++++++++ services/registry.py | 13 + services/stubs.py | 47 +++ utils/http_signatures.py | 181 +++++++++++ utils/ipfs_client.py | 141 ++++++++ 14 files changed, 1158 insertions(+), 4 deletions(-) create mode 100644 alembic/versions/k1i9f5g7h8_add_federation_tables.py create mode 100644 models/federation.py create mode 100644 services/federation_impl.py create mode 100644 utils/http_signatures.py create mode 100644 utils/ipfs_client.py diff --git a/alembic/env.py b/alembic/env.py index caef2b1..353163c 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -19,7 +19,7 @@ from shared.db.base import Base # Import ALL models so Base.metadata sees every table import shared.models # noqa: F401 User, KV, MagicLink, MenuItem, Ghost* -for _mod in ("blog.models", "market.models", "cart.models", "events.models", "glue.models"): +for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models", "glue.models"): try: __import__(_mod) except ImportError: diff --git a/alembic/versions/k1i9f5g7h8_add_federation_tables.py b/alembic/versions/k1i9f5g7h8_add_federation_tables.py new file mode 100644 index 0000000..78af7f6 --- /dev/null +++ b/alembic/versions/k1i9f5g7h8_add_federation_tables.py @@ -0,0 +1,142 @@ +"""add federation tables + +Revision ID: k1i9f5g7h8 +Revises: j0h8e4f6g7 +Create Date: 2026-02-21 + +Creates: +- ap_actor_profiles — AP identity per user +- ap_activities — local + remote AP activities +- ap_followers — remote followers +- ap_inbox_items — raw incoming AP activities +- ap_anchors — OpenTimestamps merkle batches +- ipfs_pins — IPFS content tracking (platform-wide) +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +revision = "k1i9f5g7h8" +down_revision = "j0h8e4f6g7" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # -- ap_anchors (referenced by ap_activities) ---------------------------- + op.create_table( + "ap_anchors", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("merkle_root", sa.String(128), nullable=False), + sa.Column("tree_ipfs_cid", sa.String(128), nullable=True), + sa.Column("ots_proof_cid", sa.String(128), nullable=True), + sa.Column("activity_count", sa.Integer(), nullable=False, server_default="0"), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.Column("confirmed_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("bitcoin_txid", sa.String(128), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + + # -- ap_actor_profiles --------------------------------------------------- + op.create_table( + "ap_actor_profiles", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("preferred_username", sa.String(64), nullable=False), + sa.Column("display_name", sa.String(255), nullable=True), + sa.Column("summary", sa.Text(), nullable=True), + sa.Column("public_key_pem", sa.Text(), nullable=False), + sa.Column("private_key_pem", sa.Text(), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("preferred_username"), + sa.UniqueConstraint("user_id"), + ) + op.create_index("ix_ap_actor_user_id", "ap_actor_profiles", ["user_id"], unique=True) + op.create_index("ix_ap_actor_username", "ap_actor_profiles", ["preferred_username"], unique=True) + + # -- ap_activities ------------------------------------------------------- + op.create_table( + "ap_activities", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("activity_id", sa.String(512), nullable=False), + sa.Column("activity_type", sa.String(64), nullable=False), + sa.Column("actor_profile_id", sa.Integer(), nullable=False), + sa.Column("object_type", sa.String(64), nullable=True), + sa.Column("object_data", postgresql.JSONB(), nullable=True), + sa.Column("published", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.Column("signature", postgresql.JSONB(), nullable=True), + sa.Column("is_local", sa.Boolean(), nullable=False, server_default="true"), + sa.Column("source_type", sa.String(64), nullable=True), + sa.Column("source_id", sa.Integer(), nullable=True), + sa.Column("ipfs_cid", sa.String(128), nullable=True), + sa.Column("anchor_id", sa.Integer(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.ForeignKeyConstraint(["actor_profile_id"], ["ap_actor_profiles.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["anchor_id"], ["ap_anchors.id"], ondelete="SET NULL"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("activity_id"), + ) + op.create_index("ix_ap_activity_actor", "ap_activities", ["actor_profile_id"]) + op.create_index("ix_ap_activity_source", "ap_activities", ["source_type", "source_id"]) + op.create_index("ix_ap_activity_published", "ap_activities", ["published"]) + + # -- ap_followers -------------------------------------------------------- + op.create_table( + "ap_followers", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("actor_profile_id", sa.Integer(), nullable=False), + sa.Column("follower_acct", sa.String(512), nullable=False), + sa.Column("follower_inbox", sa.String(512), nullable=False), + sa.Column("follower_actor_url", sa.String(512), nullable=False), + sa.Column("follower_public_key", sa.Text(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.ForeignKeyConstraint(["actor_profile_id"], ["ap_actor_profiles.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("actor_profile_id", "follower_acct", name="uq_follower_acct"), + ) + op.create_index("ix_ap_follower_actor", "ap_followers", ["actor_profile_id"]) + + # -- ap_inbox_items ------------------------------------------------------ + op.create_table( + "ap_inbox_items", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("actor_profile_id", sa.Integer(), nullable=False), + sa.Column("raw_json", postgresql.JSONB(), nullable=False), + sa.Column("activity_type", sa.String(64), nullable=True), + sa.Column("from_actor", sa.String(512), nullable=True), + sa.Column("state", sa.String(20), nullable=False, server_default="pending"), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.Column("processed_at", sa.DateTime(timezone=True), nullable=True), + sa.ForeignKeyConstraint(["actor_profile_id"], ["ap_actor_profiles.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index("ix_ap_inbox_state", "ap_inbox_items", ["state"]) + op.create_index("ix_ap_inbox_actor", "ap_inbox_items", ["actor_profile_id"]) + + # -- ipfs_pins ----------------------------------------------------------- + op.create_table( + "ipfs_pins", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("content_hash", sa.String(128), nullable=False), + sa.Column("ipfs_cid", sa.String(128), nullable=False), + sa.Column("pin_type", sa.String(64), nullable=False), + sa.Column("source_type", sa.String(64), nullable=True), + sa.Column("source_id", sa.Integer(), nullable=True), + sa.Column("size_bytes", sa.BigInteger(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("ipfs_cid"), + ) + op.create_index("ix_ipfs_pin_source", "ipfs_pins", ["source_type", "source_id"]) + op.create_index("ix_ipfs_pin_cid", "ipfs_pins", ["ipfs_cid"], unique=True) + + +def downgrade() -> None: + op.drop_table("ipfs_pins") + op.drop_table("ap_inbox_items") + op.drop_table("ap_followers") + op.drop_table("ap_activities") + op.drop_table("ap_actor_profiles") + op.drop_table("ap_anchors") diff --git a/contracts/dtos.py b/contracts/dtos.py index e78cd95..34223e9 100644 --- a/contracts/dtos.py +++ b/contracts/dtos.py @@ -134,3 +134,56 @@ class CartSummaryDTO: items: list[CartItemDTO] = field(default_factory=list) ticket_count: int = 0 ticket_total: Decimal = Decimal("0") + + +# --------------------------------------------------------------------------- +# Federation / ActivityPub domain +# --------------------------------------------------------------------------- + +@dataclass(frozen=True, slots=True) +class ActorProfileDTO: + id: int + user_id: int + preferred_username: str + public_key_pem: str + display_name: str | None = None + summary: str | None = None + inbox_url: str | None = None + outbox_url: str | None = None + created_at: datetime | None = None + + +@dataclass(frozen=True, slots=True) +class APActivityDTO: + id: int + activity_id: str + activity_type: str + actor_profile_id: int + object_type: str | None = None + object_data: dict | None = None + published: datetime | None = None + is_local: bool = True + source_type: str | None = None + source_id: int | None = None + ipfs_cid: str | None = None + + +@dataclass(frozen=True, slots=True) +class APFollowerDTO: + id: int + actor_profile_id: int + follower_acct: str + follower_inbox: str + follower_actor_url: str + created_at: datetime | None = None + + +@dataclass(frozen=True, slots=True) +class APAnchorDTO: + id: int + merkle_root: str + activity_count: int = 0 + tree_ipfs_cid: str | None = None + ots_proof_cid: str | None = None + confirmed_at: datetime | None = None + bitcoin_txid: str | None = None diff --git a/contracts/protocols.py b/contracts/protocols.py index 63ea90c..360dd3b 100644 --- a/contracts/protocols.py +++ b/contracts/protocols.py @@ -19,6 +19,9 @@ from .dtos import ( ProductDTO, CartItemDTO, CartSummaryDTO, + ActorProfileDTO, + APActivityDTO, + APFollowerDTO, ) @@ -162,3 +165,67 @@ class CartService(Protocol): async def adopt_cart_for_user( self, session: AsyncSession, user_id: int, session_id: str, ) -> None: ... + + +@runtime_checkable +class FederationService(Protocol): + # -- Actor management ----------------------------------------------------- + async def get_actor_by_username( + self, session: AsyncSession, username: str, + ) -> ActorProfileDTO | None: ... + + async def get_actor_by_user_id( + self, session: AsyncSession, user_id: int, + ) -> ActorProfileDTO | None: ... + + async def create_actor( + self, session: AsyncSession, user_id: int, preferred_username: str, + display_name: str | None = None, summary: str | None = None, + ) -> ActorProfileDTO: ... + + async def username_available( + self, session: AsyncSession, username: str, + ) -> bool: ... + + # -- Publishing (core cross-domain API) ----------------------------------- + async def publish_activity( + self, session: AsyncSession, *, + actor_user_id: int, + activity_type: str, + object_type: str, + object_data: dict, + source_type: str | None = None, + source_id: int | None = None, + ) -> APActivityDTO: ... + + # -- Queries -------------------------------------------------------------- + async def get_activity( + self, session: AsyncSession, activity_id: str, + ) -> APActivityDTO | None: ... + + async def get_outbox( + self, session: AsyncSession, username: str, + page: int = 1, per_page: int = 20, + ) -> tuple[list[APActivityDTO], int]: ... + + async def get_activity_for_source( + self, session: AsyncSession, source_type: str, source_id: int, + ) -> APActivityDTO | None: ... + + # -- Followers ------------------------------------------------------------ + async def get_followers( + self, session: AsyncSession, username: str, + ) -> list[APFollowerDTO]: ... + + async def add_follower( + self, session: AsyncSession, username: str, + follower_acct: str, follower_inbox: str, follower_actor_url: str, + follower_public_key: str | None = None, + ) -> APFollowerDTO: ... + + async def remove_follower( + self, session: AsyncSession, username: str, follower_acct: str, + ) -> bool: ... + + # -- Stats ---------------------------------------------------------------- + async def get_stats(self, session: AsyncSession) -> dict: ... diff --git a/infrastructure/factory.py b/infrastructure/factory.py index 5238af6..8eadda9 100644 --- a/infrastructure/factory.py +++ b/infrastructure/factory.py @@ -11,7 +11,7 @@ from shared.config import init_config, config, pretty from shared.models import KV # ensure shared models imported # Register all app model classes with SQLAlchemy so cross-domain # relationship() string references resolve correctly. -for _mod in ("blog.models", "market.models", "cart.models", "events.models"): +for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models"): try: __import__(_mod) except ImportError: diff --git a/infrastructure/urls.py b/infrastructure/urls.py index ef1e85e..cdcd0e8 100644 --- a/infrastructure/urls.py +++ b/infrastructure/urls.py @@ -37,6 +37,10 @@ def events_url(path: str = "/") -> str: return app_url("events", path) +def federation_url(path: str = "/") -> str: + return app_url("federation", path) + + def page_cart_url(page_slug: str, path: str = "/") -> str: if not path.startswith("/"): path = "/" + path @@ -62,6 +66,9 @@ def market_product_url(product_slug: str, suffix: str = "", market_place=None) - def login_url(next_url: str = "") -> str: + # Auth lives in blog (coop) for now. Set AUTH_APP=federation to switch. + auth_app = os.getenv("AUTH_APP", "coop") + base = app_url(auth_app, "/auth/login/") if next_url: - return coop_url(f"/auth/login/?next={quote(next_url, safe='')}") - return coop_url("/auth/login/") + return f"{base}?next={quote(next_url, safe='')}" + return base diff --git a/models/__init__.py b/models/__init__.py index fcb3aa3..e4fcc27 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -27,3 +27,6 @@ from .calendars import ( ) from .container_relation import ContainerRelation from .menu_node import MenuNode +from .federation import ( + ActorProfile, APActivity, APFollower, APInboxItem, APAnchor, IPFSPin, +) diff --git a/models/federation.py b/models/federation.py new file mode 100644 index 0000000..0c96763 --- /dev/null +++ b/models/federation.py @@ -0,0 +1,195 @@ +"""Federation / ActivityPub ORM models. + +These models support AP identity, activities, followers, inbox processing, +IPFS content addressing, and OpenTimestamps anchoring. +""" +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import ( + String, Integer, DateTime, Text, Boolean, BigInteger, + ForeignKey, UniqueConstraint, Index, func, +) +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from shared.db.base import Base + + +class ActorProfile(Base): + """AP identity for a user. Created when user chooses a username.""" + __tablename__ = "ap_actor_profiles" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + user_id: Mapped[int] = mapped_column( + Integer, ForeignKey("users.id", ondelete="CASCADE"), + unique=True, nullable=False, + ) + preferred_username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False) + display_name: Mapped[str | None] = mapped_column(String(255), nullable=True) + summary: Mapped[str | None] = mapped_column(Text, nullable=True) + public_key_pem: Mapped[str] = mapped_column(Text, nullable=False) + private_key_pem: Mapped[str] = mapped_column(Text, nullable=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + + # Relationships + user = relationship("User", backref="actor_profile", uselist=False, lazy="selectin") + activities = relationship("APActivity", back_populates="actor_profile", lazy="dynamic") + followers = relationship("APFollower", back_populates="actor_profile", lazy="dynamic") + + __table_args__ = ( + Index("ix_ap_actor_user_id", "user_id", unique=True), + Index("ix_ap_actor_username", "preferred_username", unique=True), + ) + + def __repr__(self) -> str: + return f"" + + +class APActivity(Base): + """An ActivityPub activity (local or remote).""" + __tablename__ = "ap_activities" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) + activity_type: Mapped[str] = mapped_column(String(64), nullable=False) + actor_profile_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, + ) + object_type: Mapped[str | None] = mapped_column(String(64), nullable=True) + object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + published: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + signature: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + is_local: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True, server_default="true") + + # Link back to originating domain object (e.g. source_type='post', source_id=42) + source_type: Mapped[str | None] = mapped_column(String(64), nullable=True) + source_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + + # IPFS content-addressed copy of the activity + ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True) + + # Anchoring (filled later when batched into a merkle tree) + anchor_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("ap_anchors.id", ondelete="SET NULL"), nullable=True, + ) + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + + # Relationships + actor_profile = relationship("ActorProfile", back_populates="activities") + + __table_args__ = ( + Index("ix_ap_activity_actor", "actor_profile_id"), + Index("ix_ap_activity_source", "source_type", "source_id"), + Index("ix_ap_activity_published", "published"), + ) + + def __repr__(self) -> str: + return f"" + + +class APFollower(Base): + """A remote follower of a local actor.""" + __tablename__ = "ap_followers" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + actor_profile_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, + ) + follower_acct: Mapped[str] = mapped_column(String(512), nullable=False) + follower_inbox: Mapped[str] = mapped_column(String(512), nullable=False) + follower_actor_url: Mapped[str] = mapped_column(String(512), nullable=False) + follower_public_key: Mapped[str | None] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + + # Relationships + actor_profile = relationship("ActorProfile", back_populates="followers") + + __table_args__ = ( + UniqueConstraint("actor_profile_id", "follower_acct", name="uq_follower_acct"), + Index("ix_ap_follower_actor", "actor_profile_id"), + ) + + def __repr__(self) -> str: + return f"" + + +class APInboxItem(Base): + """Raw incoming AP activity, stored for async processing.""" + __tablename__ = "ap_inbox_items" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + actor_profile_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, + ) + raw_json: Mapped[dict] = mapped_column(JSONB, nullable=False) + activity_type: Mapped[str | None] = mapped_column(String(64), nullable=True) + from_actor: Mapped[str | None] = mapped_column(String(512), nullable=True) + state: Mapped[str] = mapped_column( + String(20), nullable=False, default="pending", server_default="pending", + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + + __table_args__ = ( + Index("ix_ap_inbox_state", "state"), + Index("ix_ap_inbox_actor", "actor_profile_id"), + ) + + def __repr__(self) -> str: + return f"" + + +class APAnchor(Base): + """OpenTimestamps anchoring batch — merkle tree of activities.""" + __tablename__ = "ap_anchors" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + merkle_root: Mapped[str] = mapped_column(String(128), nullable=False) + tree_ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True) + ots_proof_cid: Mapped[str | None] = mapped_column(String(128), nullable=True) + activity_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + confirmed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + bitcoin_txid: Mapped[str | None] = mapped_column(String(128), nullable=True) + + def __repr__(self) -> str: + return f"" + + +class IPFSPin(Base): + """Tracks content stored on IPFS — used by all domains.""" + __tablename__ = "ipfs_pins" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + content_hash: Mapped[str] = mapped_column(String(128), nullable=False) + ipfs_cid: Mapped[str] = mapped_column(String(128), nullable=False, unique=True) + pin_type: Mapped[str] = mapped_column(String(64), nullable=False) + source_type: Mapped[str | None] = mapped_column(String(64), nullable=True) + source_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + size_bytes: Mapped[int | None] = mapped_column(BigInteger, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + + __table_args__ = ( + Index("ix_ipfs_pin_source", "source_type", "source_id"), + Index("ix_ipfs_pin_cid", "ipfs_cid", unique=True), + ) + + def __repr__(self) -> str: + return f"" diff --git a/requirements.txt b/requirements.txt index 6672849..8d247d0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ blinker==1.9.0 Brotli==1.1.0 certifi==2025.10.5 click==8.3.0 +cryptography>=41.0 exceptiongroup==1.3.0 Flask==3.1.2 greenlet==3.2.4 diff --git a/services/federation_impl.py b/services/federation_impl.py new file mode 100644 index 0000000..cc197ec --- /dev/null +++ b/services/federation_impl.py @@ -0,0 +1,304 @@ +"""SQL-backed FederationService implementation. + +Queries ``shared.models.federation`` — only this module may read/write +federation-domain tables on behalf of other domains. +""" +from __future__ import annotations + +import os +import uuid +from datetime import datetime, timezone + +from sqlalchemy import select, func, delete +from sqlalchemy.ext.asyncio import AsyncSession + +from shared.models.federation import ActorProfile, APActivity, APFollower +from shared.contracts.dtos import ActorProfileDTO, APActivityDTO, APFollowerDTO + + +def _domain() -> str: + return os.getenv("AP_DOMAIN", "rose-ash.com") + + +def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO: + domain = _domain() + username = actor.preferred_username + return ActorProfileDTO( + id=actor.id, + user_id=actor.user_id, + preferred_username=username, + public_key_pem=actor.public_key_pem, + display_name=actor.display_name, + summary=actor.summary, + inbox_url=f"https://{domain}/users/{username}/inbox", + outbox_url=f"https://{domain}/users/{username}/outbox", + created_at=actor.created_at, + ) + + +def _activity_to_dto(a: APActivity) -> APActivityDTO: + return APActivityDTO( + id=a.id, + activity_id=a.activity_id, + activity_type=a.activity_type, + actor_profile_id=a.actor_profile_id, + object_type=a.object_type, + object_data=a.object_data, + published=a.published, + is_local=a.is_local, + source_type=a.source_type, + source_id=a.source_id, + ipfs_cid=a.ipfs_cid, + ) + + +def _follower_to_dto(f: APFollower) -> APFollowerDTO: + return APFollowerDTO( + id=f.id, + actor_profile_id=f.actor_profile_id, + follower_acct=f.follower_acct, + follower_inbox=f.follower_inbox, + follower_actor_url=f.follower_actor_url, + created_at=f.created_at, + ) + + +class SqlFederationService: + # -- Actor management ----------------------------------------------------- + + async def get_actor_by_username( + self, session: AsyncSession, username: str, + ) -> ActorProfileDTO | None: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == username) + ) + ).scalar_one_or_none() + return _actor_to_dto(actor) if actor else None + + async def get_actor_by_user_id( + self, session: AsyncSession, user_id: int, + ) -> ActorProfileDTO | None: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.user_id == user_id) + ) + ).scalar_one_or_none() + return _actor_to_dto(actor) if actor else None + + async def create_actor( + self, session: AsyncSession, user_id: int, preferred_username: str, + display_name: str | None = None, summary: str | None = None, + ) -> ActorProfileDTO: + from shared.utils.http_signatures import generate_rsa_keypair + + private_pem, public_pem = generate_rsa_keypair() + + actor = ActorProfile( + user_id=user_id, + preferred_username=preferred_username, + display_name=display_name, + summary=summary, + public_key_pem=public_pem, + private_key_pem=private_pem, + ) + session.add(actor) + await session.flush() + return _actor_to_dto(actor) + + async def username_available( + self, session: AsyncSession, username: str, + ) -> bool: + count = ( + await session.execute( + select(func.count(ActorProfile.id)).where( + ActorProfile.preferred_username == username + ) + ) + ).scalar() or 0 + return count == 0 + + # -- Publishing ----------------------------------------------------------- + + async def publish_activity( + self, session: AsyncSession, *, + actor_user_id: int, + activity_type: str, + object_type: str, + object_data: dict, + source_type: str | None = None, + source_id: int | None = None, + ) -> APActivityDTO: + # Look up actor + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.user_id == actor_user_id) + ) + ).scalar_one_or_none() + if actor is None: + raise ValueError(f"No ActorProfile for user_id={actor_user_id}") + + domain = _domain() + username = actor.preferred_username + activity_uri = f"https://{domain}/users/{username}/activities/{uuid.uuid4()}" + + now = datetime.now(timezone.utc) + + activity = APActivity( + activity_id=activity_uri, + activity_type=activity_type, + actor_profile_id=actor.id, + object_type=object_type, + object_data=object_data, + published=now, + is_local=True, + source_type=source_type, + source_id=source_id, + ) + session.add(activity) + await session.flush() + + # Emit domain event for downstream processing (IPFS storage, delivery) + from shared.events import emit_event + await emit_event( + session, + "federation.activity_created", + "APActivity", + activity.id, + { + "activity_id": activity.activity_id, + "activity_type": activity_type, + "actor_username": username, + "object_type": object_type, + }, + ) + + return _activity_to_dto(activity) + + # -- Queries -------------------------------------------------------------- + + async def get_activity( + self, session: AsyncSession, activity_id: str, + ) -> APActivityDTO | None: + a = ( + await session.execute( + select(APActivity).where(APActivity.activity_id == activity_id) + ) + ).scalar_one_or_none() + return _activity_to_dto(a) if a else None + + async def get_outbox( + self, session: AsyncSession, username: str, + page: int = 1, per_page: int = 20, + ) -> tuple[list[APActivityDTO], int]: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == username) + ) + ).scalar_one_or_none() + if actor is None: + return [], 0 + + total = ( + await session.execute( + select(func.count(APActivity.id)).where( + APActivity.actor_profile_id == actor.id, + APActivity.is_local == True, # noqa: E712 + ) + ) + ).scalar() or 0 + + offset = (page - 1) * per_page + result = await session.execute( + select(APActivity) + .where( + APActivity.actor_profile_id == actor.id, + APActivity.is_local == True, # noqa: E712 + ) + .order_by(APActivity.published.desc()) + .limit(per_page) + .offset(offset) + ) + return [_activity_to_dto(a) for a in result.scalars().all()], total + + async def get_activity_for_source( + self, session: AsyncSession, source_type: str, source_id: int, + ) -> APActivityDTO | None: + a = ( + await session.execute( + select(APActivity).where( + APActivity.source_type == source_type, + APActivity.source_id == source_id, + ).order_by(APActivity.created_at.desc()) + ) + ).scalar_one_or_none() + return _activity_to_dto(a) if a else None + + # -- Followers ------------------------------------------------------------ + + async def get_followers( + self, session: AsyncSession, username: str, + ) -> list[APFollowerDTO]: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == username) + ) + ).scalar_one_or_none() + if actor is None: + return [] + + result = await session.execute( + select(APFollower).where(APFollower.actor_profile_id == actor.id) + ) + return [_follower_to_dto(f) for f in result.scalars().all()] + + async def add_follower( + self, session: AsyncSession, username: str, + follower_acct: str, follower_inbox: str, follower_actor_url: str, + follower_public_key: str | None = None, + ) -> APFollowerDTO: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == username) + ) + ).scalar_one_or_none() + if actor is None: + raise ValueError(f"Actor not found: {username}") + + follower = APFollower( + actor_profile_id=actor.id, + follower_acct=follower_acct, + follower_inbox=follower_inbox, + follower_actor_url=follower_actor_url, + follower_public_key=follower_public_key, + ) + session.add(follower) + await session.flush() + return _follower_to_dto(follower) + + async def remove_follower( + self, session: AsyncSession, username: str, follower_acct: str, + ) -> bool: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == username) + ) + ).scalar_one_or_none() + if actor is None: + return False + + result = await session.execute( + delete(APFollower).where( + APFollower.actor_profile_id == actor.id, + APFollower.follower_acct == follower_acct, + ) + ) + return result.rowcount > 0 + + # -- Stats ---------------------------------------------------------------- + + async def get_stats(self, session: AsyncSession) -> dict: + actors = (await session.execute(select(func.count(ActorProfile.id)))).scalar() or 0 + activities = (await session.execute(select(func.count(APActivity.id)))).scalar() or 0 + followers = (await session.execute(select(func.count(APFollower.id)))).scalar() or 0 + return {"actors": actors, "activities": activities, "followers": followers} diff --git a/services/registry.py b/services/registry.py index 83dc0e8..23d559b 100644 --- a/services/registry.py +++ b/services/registry.py @@ -21,6 +21,7 @@ from shared.contracts.protocols import ( CalendarService, MarketService, CartService, + FederationService, ) @@ -37,6 +38,7 @@ class _ServiceRegistry: self._calendar: CalendarService | None = None self._market: MarketService | None = None self._cart: CartService | None = None + self._federation: FederationService | None = None # -- blog ----------------------------------------------------------------- @property @@ -82,6 +84,17 @@ class _ServiceRegistry: def cart(self, impl: CartService) -> None: self._cart = impl + # -- federation ----------------------------------------------------------- + @property + def federation(self) -> FederationService: + if self._federation is None: + raise RuntimeError("FederationService not registered") + return self._federation + + @federation.setter + def federation(self, impl: FederationService) -> None: + self._federation = impl + # -- introspection -------------------------------------------------------- def has(self, name: str) -> bool: """Check whether a domain service is registered.""" diff --git a/services/stubs.py b/services/stubs.py index c37ebb7..4a02fc9 100644 --- a/services/stubs.py +++ b/services/stubs.py @@ -18,6 +18,9 @@ from shared.contracts.dtos import ( ProductDTO, CartItemDTO, CartSummaryDTO, + ActorProfileDTO, + APActivityDTO, + APFollowerDTO, ) @@ -182,3 +185,47 @@ class StubCartService: self, session: AsyncSession, user_id: int, session_id: str, ) -> None: pass + + +class StubFederationService: + """No-op federation stub for apps that don't own federation.""" + + async def get_actor_by_username(self, session, username): + return None + + async def get_actor_by_user_id(self, session, user_id): + return None + + async def create_actor(self, session, user_id, preferred_username, + display_name=None, summary=None): + raise RuntimeError("FederationService not available") + + async def username_available(self, session, username): + return False + + async def publish_activity(self, session, *, actor_user_id, activity_type, + object_type, object_data, source_type=None, + source_id=None): + return None + + async def get_activity(self, session, activity_id): + return None + + async def get_outbox(self, session, username, page=1, per_page=20): + return [], 0 + + async def get_activity_for_source(self, session, source_type, source_id): + return None + + async def get_followers(self, session, username): + return [] + + async def add_follower(self, session, username, follower_acct, follower_inbox, + follower_actor_url, follower_public_key=None): + raise RuntimeError("FederationService not available") + + async def remove_follower(self, session, username, follower_acct): + return False + + async def get_stats(self, session): + return {"actors": 0, "activities": 0, "followers": 0} diff --git a/utils/http_signatures.py b/utils/http_signatures.py new file mode 100644 index 0000000..942fcc1 --- /dev/null +++ b/utils/http_signatures.py @@ -0,0 +1,181 @@ +"""RSA key generation and HTTP Signature signing/verification. + +Keys are stored in DB (ActorProfile), not the filesystem. +Ported from ~/art-dag/activity-pub/keys.py. +""" +from __future__ import annotations + +import base64 +import hashlib +import json +from datetime import datetime, timezone + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa, padding + + +def generate_rsa_keypair() -> tuple[str, str]: + """Generate an RSA-2048 keypair. + + Returns: + (private_pem, public_pem) as UTF-8 strings. + """ + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + ) + + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode() + + public_pem = private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode() + + return private_pem, public_pem + + +def sign_request( + private_key_pem: str, + key_id: str, + method: str, + path: str, + host: str, + body: bytes | None = None, + date: str | None = None, +) -> dict[str, str]: + """Build HTTP Signature headers for an outgoing request. + + Returns a dict of headers to merge into the request: + ``{"Signature": ..., "Date": ..., "Digest": ..., "Host": ...}`` + """ + if date is None: + date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT") + + headers_to_sign = [ + f"(request-target): {method.lower()} {path}", + f"host: {host}", + f"date: {date}", + ] + + out_headers: dict[str, str] = { + "Host": host, + "Date": date, + } + + if body is not None: + digest = base64.b64encode(hashlib.sha256(body).digest()).decode() + digest_header = f"SHA-256={digest}" + headers_to_sign.append(f"digest: {digest_header}") + out_headers["Digest"] = digest_header + + signed_string = "\n".join(headers_to_sign) + header_names = " ".join( + h.split(":")[0] for h in headers_to_sign + ) + + private_key = serialization.load_pem_private_key( + private_key_pem.encode(), password=None, + ) + signature_bytes = private_key.sign( + signed_string.encode(), + padding.PKCS1v15(), + hashes.SHA256(), + ) + signature_b64 = base64.b64encode(signature_bytes).decode() + + out_headers["Signature"] = ( + f'keyId="{key_id}",' + f'headers="{header_names}",' + f'signature="{signature_b64}",' + f'algorithm="rsa-sha256"' + ) + + return out_headers + + +def verify_request_signature( + public_key_pem: str, + signature_header: str, + method: str, + path: str, + headers: dict[str, str], +) -> bool: + """Verify an incoming HTTP Signature. + + Args: + public_key_pem: PEM-encoded public key of the sender. + signature_header: Value of the ``Signature`` header. + method: HTTP method (GET, POST, etc.). + path: Request path (e.g. ``/users/alice/inbox``). + headers: All request headers (case-insensitive keys). + + Returns: + True if the signature is valid. + """ + # Parse Signature header + parts: dict[str, str] = {} + for part in signature_header.split(","): + part = part.strip() + eq = part.index("=") + key = part[:eq] + val = part[eq + 1:].strip('"') + parts[key] = val + + signed_headers = parts.get("headers", "date").split() + signature_b64 = parts.get("signature", "") + + # Reconstruct the signed string + lines: list[str] = [] + # Normalize header lookup to lowercase + lc_headers = {k.lower(): v for k, v in headers.items()} + for h in signed_headers: + if h == "(request-target)": + lines.append(f"(request-target): {method.lower()} {path}") + else: + val = lc_headers.get(h, "") + lines.append(f"{h}: {val}") + + signed_string = "\n".join(lines) + + public_key = serialization.load_pem_public_key(public_key_pem.encode()) + try: + public_key.verify( + base64.b64decode(signature_b64), + signed_string.encode(), + padding.PKCS1v15(), + hashes.SHA256(), + ) + return True + except Exception: + return False + + +def create_ld_signature( + private_key_pem: str, + key_id: str, + activity: dict, +) -> dict: + """Create an RsaSignature2017 Linked Data signature for an activity.""" + canonical = json.dumps(activity, sort_keys=True, separators=(",", ":")) + + private_key = serialization.load_pem_private_key( + private_key_pem.encode(), password=None, + ) + signature_bytes = private_key.sign( + canonical.encode(), + padding.PKCS1v15(), + hashes.SHA256(), + ) + signature_b64 = base64.b64encode(signature_bytes).decode() + + return { + "type": "RsaSignature2017", + "creator": key_id, + "created": datetime.now(timezone.utc).isoformat(), + "signatureValue": signature_b64, + } diff --git a/utils/ipfs_client.py b/utils/ipfs_client.py new file mode 100644 index 0000000..4945bfe --- /dev/null +++ b/utils/ipfs_client.py @@ -0,0 +1,141 @@ +"""Async IPFS client for content-addressed storage. + +All content can be stored on IPFS — blog posts, products, activities, etc. +Ported from ~/art-dag/activity-pub/ipfs_client.py (converted to async httpx). + +Config via environment: + IPFS_API — multiaddr or URL (default: /ip4/127.0.0.1/tcp/5001) + IPFS_TIMEOUT — request timeout in seconds (default: 60) + IPFS_GATEWAY_URL — public gateway for CID links (default: https://ipfs.io) +""" +from __future__ import annotations + +import json +import logging +import os +import re + +import httpx + +logger = logging.getLogger(__name__) + + +class IPFSError(Exception): + """Raised when an IPFS operation fails.""" + + +# -- Config ------------------------------------------------------------------ + +IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001") +IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "60")) +IPFS_GATEWAY_URL = os.getenv("IPFS_GATEWAY_URL", "https://ipfs.io") + + +def _multiaddr_to_url(multiaddr: str) -> str: + """Convert IPFS multiaddr to HTTP URL.""" + dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr) + if dns_match: + return f"http://{dns_match.group(1)}:{dns_match.group(2)}" + + ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr) + if ip4_match: + return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}" + + if multiaddr.startswith("http"): + return multiaddr + return "http://127.0.0.1:5001" + + +IPFS_BASE_URL = _multiaddr_to_url(IPFS_API) + + +# -- Async client functions -------------------------------------------------- + +async def add_bytes(data: bytes, *, pin: bool = True) -> str: + """Add raw bytes to IPFS. + + Returns the CID. + """ + try: + async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client: + resp = await client.post( + f"{IPFS_BASE_URL}/api/v0/add", + params={"pin": str(pin).lower()}, + files={"file": ("data", data)}, + ) + resp.raise_for_status() + cid = resp.json()["Hash"] + logger.info("Added to IPFS: %d bytes -> %s", len(data), cid) + return cid + except Exception as e: + logger.error("Failed to add bytes to IPFS: %s", e) + raise IPFSError(f"Failed to add bytes: {e}") from e + + +async def add_json(data: dict) -> str: + """Serialize dict to sorted JSON and add to IPFS.""" + json_bytes = json.dumps(data, indent=2, sort_keys=True).encode("utf-8") + return await add_bytes(json_bytes, pin=True) + + +async def get_bytes(cid: str) -> bytes | None: + """Fetch content from IPFS by CID.""" + try: + async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client: + resp = await client.post( + f"{IPFS_BASE_URL}/api/v0/cat", + params={"arg": cid}, + ) + resp.raise_for_status() + logger.info("Retrieved from IPFS: %s (%d bytes)", cid, len(resp.content)) + return resp.content + except Exception as e: + logger.error("Failed to get from IPFS: %s", e) + return None + + +async def pin_cid(cid: str) -> bool: + """Pin a CID on this node.""" + try: + async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client: + resp = await client.post( + f"{IPFS_BASE_URL}/api/v0/pin/add", + params={"arg": cid}, + ) + resp.raise_for_status() + logger.info("Pinned on IPFS: %s", cid) + return True + except Exception as e: + logger.error("Failed to pin on IPFS: %s", e) + return False + + +async def unpin_cid(cid: str) -> bool: + """Unpin a CID from this node.""" + try: + async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client: + resp = await client.post( + f"{IPFS_BASE_URL}/api/v0/pin/rm", + params={"arg": cid}, + ) + resp.raise_for_status() + logger.info("Unpinned from IPFS: %s", cid) + return True + except Exception as e: + logger.error("Failed to unpin from IPFS: %s", e) + return False + + +async def is_available() -> bool: + """Check if IPFS daemon is reachable.""" + try: + async with httpx.AsyncClient(timeout=5) as client: + resp = await client.post(f"{IPFS_BASE_URL}/api/v0/id") + return resp.status_code == 200 + except Exception: + return False + + +def gateway_url(cid: str) -> str: + """Return a public gateway URL for a CID.""" + return f"{IPFS_GATEWAY_URL}/ipfs/{cid}"