Add federation/ActivityPub models, contracts, and services

Phase 0+1 of ActivityPub integration:
- 6 ORM models (ActorProfile, APActivity, APFollower, APInboxItem, APAnchor, IPFSPin)
- FederationService protocol + SqlFederationService implementation + stub
- 4 DTOs (ActorProfileDTO, APActivityDTO, APFollowerDTO, APAnchorDTO)
- Registry slot for federation service
- Alembic migration for federation tables
- IPFS async client (httpx-based)
- HTTP Signatures (RSA-2048 sign/verify)
- login_url() now uses AUTH_APP env var for flexible auth routing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-21 15:10:08 +00:00
parent 7abef48cf2
commit 8850a0106a
14 changed files with 1158 additions and 4 deletions

View File

@@ -19,7 +19,7 @@ from shared.db.base import Base
# Import ALL models so Base.metadata sees every table
import shared.models # noqa: F401 User, KV, MagicLink, MenuItem, Ghost*
for _mod in ("blog.models", "market.models", "cart.models", "events.models", "glue.models"):
for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models", "glue.models"):
try:
__import__(_mod)
except ImportError:

View File

@@ -0,0 +1,142 @@
"""add federation tables
Revision ID: k1i9f5g7h8
Revises: j0h8e4f6g7
Create Date: 2026-02-21
Creates:
- ap_actor_profiles — AP identity per user
- ap_activities — local + remote AP activities
- ap_followers — remote followers
- ap_inbox_items — raw incoming AP activities
- ap_anchors — OpenTimestamps merkle batches
- ipfs_pins — IPFS content tracking (platform-wide)
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision = "k1i9f5g7h8"
down_revision = "j0h8e4f6g7"
branch_labels = None
depends_on = None
def upgrade() -> None:
# -- ap_anchors (referenced by ap_activities) ----------------------------
op.create_table(
"ap_anchors",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("merkle_root", sa.String(128), nullable=False),
sa.Column("tree_ipfs_cid", sa.String(128), nullable=True),
sa.Column("ots_proof_cid", sa.String(128), nullable=True),
sa.Column("activity_count", sa.Integer(), nullable=False, server_default="0"),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.Column("confirmed_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("bitcoin_txid", sa.String(128), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
# -- ap_actor_profiles ---------------------------------------------------
op.create_table(
"ap_actor_profiles",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("user_id", sa.Integer(), nullable=False),
sa.Column("preferred_username", sa.String(64), nullable=False),
sa.Column("display_name", sa.String(255), nullable=True),
sa.Column("summary", sa.Text(), nullable=True),
sa.Column("public_key_pem", sa.Text(), nullable=False),
sa.Column("private_key_pem", sa.Text(), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("preferred_username"),
sa.UniqueConstraint("user_id"),
)
op.create_index("ix_ap_actor_user_id", "ap_actor_profiles", ["user_id"], unique=True)
op.create_index("ix_ap_actor_username", "ap_actor_profiles", ["preferred_username"], unique=True)
# -- ap_activities -------------------------------------------------------
op.create_table(
"ap_activities",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("activity_id", sa.String(512), nullable=False),
sa.Column("activity_type", sa.String(64), nullable=False),
sa.Column("actor_profile_id", sa.Integer(), nullable=False),
sa.Column("object_type", sa.String(64), nullable=True),
sa.Column("object_data", postgresql.JSONB(), nullable=True),
sa.Column("published", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.Column("signature", postgresql.JSONB(), nullable=True),
sa.Column("is_local", sa.Boolean(), nullable=False, server_default="true"),
sa.Column("source_type", sa.String(64), nullable=True),
sa.Column("source_id", sa.Integer(), nullable=True),
sa.Column("ipfs_cid", sa.String(128), nullable=True),
sa.Column("anchor_id", sa.Integer(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.ForeignKeyConstraint(["actor_profile_id"], ["ap_actor_profiles.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["anchor_id"], ["ap_anchors.id"], ondelete="SET NULL"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("activity_id"),
)
op.create_index("ix_ap_activity_actor", "ap_activities", ["actor_profile_id"])
op.create_index("ix_ap_activity_source", "ap_activities", ["source_type", "source_id"])
op.create_index("ix_ap_activity_published", "ap_activities", ["published"])
# -- ap_followers --------------------------------------------------------
op.create_table(
"ap_followers",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("actor_profile_id", sa.Integer(), nullable=False),
sa.Column("follower_acct", sa.String(512), nullable=False),
sa.Column("follower_inbox", sa.String(512), nullable=False),
sa.Column("follower_actor_url", sa.String(512), nullable=False),
sa.Column("follower_public_key", sa.Text(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.ForeignKeyConstraint(["actor_profile_id"], ["ap_actor_profiles.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("actor_profile_id", "follower_acct", name="uq_follower_acct"),
)
op.create_index("ix_ap_follower_actor", "ap_followers", ["actor_profile_id"])
# -- ap_inbox_items ------------------------------------------------------
op.create_table(
"ap_inbox_items",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("actor_profile_id", sa.Integer(), nullable=False),
sa.Column("raw_json", postgresql.JSONB(), nullable=False),
sa.Column("activity_type", sa.String(64), nullable=True),
sa.Column("from_actor", sa.String(512), nullable=True),
sa.Column("state", sa.String(20), nullable=False, server_default="pending"),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.Column("processed_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(["actor_profile_id"], ["ap_actor_profiles.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("ix_ap_inbox_state", "ap_inbox_items", ["state"])
op.create_index("ix_ap_inbox_actor", "ap_inbox_items", ["actor_profile_id"])
# -- ipfs_pins -----------------------------------------------------------
op.create_table(
"ipfs_pins",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("content_hash", sa.String(128), nullable=False),
sa.Column("ipfs_cid", sa.String(128), nullable=False),
sa.Column("pin_type", sa.String(64), nullable=False),
sa.Column("source_type", sa.String(64), nullable=True),
sa.Column("source_id", sa.Integer(), nullable=True),
sa.Column("size_bytes", sa.BigInteger(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("ipfs_cid"),
)
op.create_index("ix_ipfs_pin_source", "ipfs_pins", ["source_type", "source_id"])
op.create_index("ix_ipfs_pin_cid", "ipfs_pins", ["ipfs_cid"], unique=True)
def downgrade() -> None:
op.drop_table("ipfs_pins")
op.drop_table("ap_inbox_items")
op.drop_table("ap_followers")
op.drop_table("ap_activities")
op.drop_table("ap_actor_profiles")
op.drop_table("ap_anchors")

View File

@@ -134,3 +134,56 @@ class CartSummaryDTO:
items: list[CartItemDTO] = field(default_factory=list)
ticket_count: int = 0
ticket_total: Decimal = Decimal("0")
# ---------------------------------------------------------------------------
# Federation / ActivityPub domain
# ---------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class ActorProfileDTO:
id: int
user_id: int
preferred_username: str
public_key_pem: str
display_name: str | None = None
summary: str | None = None
inbox_url: str | None = None
outbox_url: str | None = None
created_at: datetime | None = None
@dataclass(frozen=True, slots=True)
class APActivityDTO:
id: int
activity_id: str
activity_type: str
actor_profile_id: int
object_type: str | None = None
object_data: dict | None = None
published: datetime | None = None
is_local: bool = True
source_type: str | None = None
source_id: int | None = None
ipfs_cid: str | None = None
@dataclass(frozen=True, slots=True)
class APFollowerDTO:
id: int
actor_profile_id: int
follower_acct: str
follower_inbox: str
follower_actor_url: str
created_at: datetime | None = None
@dataclass(frozen=True, slots=True)
class APAnchorDTO:
id: int
merkle_root: str
activity_count: int = 0
tree_ipfs_cid: str | None = None
ots_proof_cid: str | None = None
confirmed_at: datetime | None = None
bitcoin_txid: str | None = None

View File

@@ -19,6 +19,9 @@ from .dtos import (
ProductDTO,
CartItemDTO,
CartSummaryDTO,
ActorProfileDTO,
APActivityDTO,
APFollowerDTO,
)
@@ -162,3 +165,67 @@ class CartService(Protocol):
async def adopt_cart_for_user(
self, session: AsyncSession, user_id: int, session_id: str,
) -> None: ...
@runtime_checkable
class FederationService(Protocol):
# -- Actor management -----------------------------------------------------
async def get_actor_by_username(
self, session: AsyncSession, username: str,
) -> ActorProfileDTO | None: ...
async def get_actor_by_user_id(
self, session: AsyncSession, user_id: int,
) -> ActorProfileDTO | None: ...
async def create_actor(
self, session: AsyncSession, user_id: int, preferred_username: str,
display_name: str | None = None, summary: str | None = None,
) -> ActorProfileDTO: ...
async def username_available(
self, session: AsyncSession, username: str,
) -> bool: ...
# -- Publishing (core cross-domain API) -----------------------------------
async def publish_activity(
self, session: AsyncSession, *,
actor_user_id: int,
activity_type: str,
object_type: str,
object_data: dict,
source_type: str | None = None,
source_id: int | None = None,
) -> APActivityDTO: ...
# -- Queries --------------------------------------------------------------
async def get_activity(
self, session: AsyncSession, activity_id: str,
) -> APActivityDTO | None: ...
async def get_outbox(
self, session: AsyncSession, username: str,
page: int = 1, per_page: int = 20,
) -> tuple[list[APActivityDTO], int]: ...
async def get_activity_for_source(
self, session: AsyncSession, source_type: str, source_id: int,
) -> APActivityDTO | None: ...
# -- Followers ------------------------------------------------------------
async def get_followers(
self, session: AsyncSession, username: str,
) -> list[APFollowerDTO]: ...
async def add_follower(
self, session: AsyncSession, username: str,
follower_acct: str, follower_inbox: str, follower_actor_url: str,
follower_public_key: str | None = None,
) -> APFollowerDTO: ...
async def remove_follower(
self, session: AsyncSession, username: str, follower_acct: str,
) -> bool: ...
# -- Stats ----------------------------------------------------------------
async def get_stats(self, session: AsyncSession) -> dict: ...

View File

@@ -11,7 +11,7 @@ from shared.config import init_config, config, pretty
from shared.models import KV # ensure shared models imported
# Register all app model classes with SQLAlchemy so cross-domain
# relationship() string references resolve correctly.
for _mod in ("blog.models", "market.models", "cart.models", "events.models"):
for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models"):
try:
__import__(_mod)
except ImportError:

View File

@@ -37,6 +37,10 @@ def events_url(path: str = "/") -> str:
return app_url("events", path)
def federation_url(path: str = "/") -> str:
return app_url("federation", path)
def page_cart_url(page_slug: str, path: str = "/") -> str:
if not path.startswith("/"):
path = "/" + path
@@ -62,6 +66,9 @@ def market_product_url(product_slug: str, suffix: str = "", market_place=None) -
def login_url(next_url: str = "") -> str:
# Auth lives in blog (coop) for now. Set AUTH_APP=federation to switch.
auth_app = os.getenv("AUTH_APP", "coop")
base = app_url(auth_app, "/auth/login/")
if next_url:
return coop_url(f"/auth/login/?next={quote(next_url, safe='')}")
return coop_url("/auth/login/")
return f"{base}?next={quote(next_url, safe='')}"
return base

View File

@@ -27,3 +27,6 @@ from .calendars import (
)
from .container_relation import ContainerRelation
from .menu_node import MenuNode
from .federation import (
ActorProfile, APActivity, APFollower, APInboxItem, APAnchor, IPFSPin,
)

195
models/federation.py Normal file
View File

@@ -0,0 +1,195 @@
"""Federation / ActivityPub ORM models.
These models support AP identity, activities, followers, inbox processing,
IPFS content addressing, and OpenTimestamps anchoring.
"""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import (
String, Integer, DateTime, Text, Boolean, BigInteger,
ForeignKey, UniqueConstraint, Index, func,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from shared.db.base import Base
class ActorProfile(Base):
"""AP identity for a user. Created when user chooses a username."""
__tablename__ = "ap_actor_profiles"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
user_id: Mapped[int] = mapped_column(
Integer, ForeignKey("users.id", ondelete="CASCADE"),
unique=True, nullable=False,
)
preferred_username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
display_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
summary: Mapped[str | None] = mapped_column(Text, nullable=True)
public_key_pem: Mapped[str] = mapped_column(Text, nullable=False)
private_key_pem: Mapped[str] = mapped_column(Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
# Relationships
user = relationship("User", backref="actor_profile", uselist=False, lazy="selectin")
activities = relationship("APActivity", back_populates="actor_profile", lazy="dynamic")
followers = relationship("APFollower", back_populates="actor_profile", lazy="dynamic")
__table_args__ = (
Index("ix_ap_actor_user_id", "user_id", unique=True),
Index("ix_ap_actor_username", "preferred_username", unique=True),
)
def __repr__(self) -> str:
return f"<ActorProfile {self.id} @{self.preferred_username}>"
class APActivity(Base):
"""An ActivityPub activity (local or remote)."""
__tablename__ = "ap_activities"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False)
activity_type: Mapped[str] = mapped_column(String(64), nullable=False)
actor_profile_id: Mapped[int] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False,
)
object_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
published: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
signature: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
is_local: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True, server_default="true")
# Link back to originating domain object (e.g. source_type='post', source_id=42)
source_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
source_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
# IPFS content-addressed copy of the activity
ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True)
# Anchoring (filled later when batched into a merkle tree)
anchor_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("ap_anchors.id", ondelete="SET NULL"), nullable=True,
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
# Relationships
actor_profile = relationship("ActorProfile", back_populates="activities")
__table_args__ = (
Index("ix_ap_activity_actor", "actor_profile_id"),
Index("ix_ap_activity_source", "source_type", "source_id"),
Index("ix_ap_activity_published", "published"),
)
def __repr__(self) -> str:
return f"<APActivity {self.id} {self.activity_type}>"
class APFollower(Base):
"""A remote follower of a local actor."""
__tablename__ = "ap_followers"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
actor_profile_id: Mapped[int] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False,
)
follower_acct: Mapped[str] = mapped_column(String(512), nullable=False)
follower_inbox: Mapped[str] = mapped_column(String(512), nullable=False)
follower_actor_url: Mapped[str] = mapped_column(String(512), nullable=False)
follower_public_key: Mapped[str | None] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
# Relationships
actor_profile = relationship("ActorProfile", back_populates="followers")
__table_args__ = (
UniqueConstraint("actor_profile_id", "follower_acct", name="uq_follower_acct"),
Index("ix_ap_follower_actor", "actor_profile_id"),
)
def __repr__(self) -> str:
return f"<APFollower {self.id} {self.follower_acct}>"
class APInboxItem(Base):
"""Raw incoming AP activity, stored for async processing."""
__tablename__ = "ap_inbox_items"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
actor_profile_id: Mapped[int] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False,
)
raw_json: Mapped[dict] = mapped_column(JSONB, nullable=False)
activity_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
from_actor: Mapped[str | None] = mapped_column(String(512), nullable=True)
state: Mapped[str] = mapped_column(
String(20), nullable=False, default="pending", server_default="pending",
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
__table_args__ = (
Index("ix_ap_inbox_state", "state"),
Index("ix_ap_inbox_actor", "actor_profile_id"),
)
def __repr__(self) -> str:
return f"<APInboxItem {self.id} {self.activity_type} [{self.state}]>"
class APAnchor(Base):
"""OpenTimestamps anchoring batch — merkle tree of activities."""
__tablename__ = "ap_anchors"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
merkle_root: Mapped[str] = mapped_column(String(128), nullable=False)
tree_ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True)
ots_proof_cid: Mapped[str | None] = mapped_column(String(128), nullable=True)
activity_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
confirmed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
bitcoin_txid: Mapped[str | None] = mapped_column(String(128), nullable=True)
def __repr__(self) -> str:
return f"<APAnchor {self.id} activities={self.activity_count}>"
class IPFSPin(Base):
"""Tracks content stored on IPFS — used by all domains."""
__tablename__ = "ipfs_pins"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
content_hash: Mapped[str] = mapped_column(String(128), nullable=False)
ipfs_cid: Mapped[str] = mapped_column(String(128), nullable=False, unique=True)
pin_type: Mapped[str] = mapped_column(String(64), nullable=False)
source_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
source_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
size_bytes: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
__table_args__ = (
Index("ix_ipfs_pin_source", "source_type", "source_id"),
Index("ix_ipfs_pin_cid", "ipfs_cid", unique=True),
)
def __repr__(self) -> str:
return f"<IPFSPin {self.id} {self.ipfs_cid[:16]}...>"

View File

@@ -10,6 +10,7 @@ blinker==1.9.0
Brotli==1.1.0
certifi==2025.10.5
click==8.3.0
cryptography>=41.0
exceptiongroup==1.3.0
Flask==3.1.2
greenlet==3.2.4

304
services/federation_impl.py Normal file
View File

@@ -0,0 +1,304 @@
"""SQL-backed FederationService implementation.
Queries ``shared.models.federation`` — only this module may read/write
federation-domain tables on behalf of other domains.
"""
from __future__ import annotations
import os
import uuid
from datetime import datetime, timezone
from sqlalchemy import select, func, delete
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.federation import ActorProfile, APActivity, APFollower
from shared.contracts.dtos import ActorProfileDTO, APActivityDTO, APFollowerDTO
def _domain() -> str:
return os.getenv("AP_DOMAIN", "rose-ash.com")
def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO:
domain = _domain()
username = actor.preferred_username
return ActorProfileDTO(
id=actor.id,
user_id=actor.user_id,
preferred_username=username,
public_key_pem=actor.public_key_pem,
display_name=actor.display_name,
summary=actor.summary,
inbox_url=f"https://{domain}/users/{username}/inbox",
outbox_url=f"https://{domain}/users/{username}/outbox",
created_at=actor.created_at,
)
def _activity_to_dto(a: APActivity) -> APActivityDTO:
return APActivityDTO(
id=a.id,
activity_id=a.activity_id,
activity_type=a.activity_type,
actor_profile_id=a.actor_profile_id,
object_type=a.object_type,
object_data=a.object_data,
published=a.published,
is_local=a.is_local,
source_type=a.source_type,
source_id=a.source_id,
ipfs_cid=a.ipfs_cid,
)
def _follower_to_dto(f: APFollower) -> APFollowerDTO:
return APFollowerDTO(
id=f.id,
actor_profile_id=f.actor_profile_id,
follower_acct=f.follower_acct,
follower_inbox=f.follower_inbox,
follower_actor_url=f.follower_actor_url,
created_at=f.created_at,
)
class SqlFederationService:
# -- Actor management -----------------------------------------------------
async def get_actor_by_username(
self, session: AsyncSession, username: str,
) -> ActorProfileDTO | None:
actor = (
await session.execute(
select(ActorProfile).where(ActorProfile.preferred_username == username)
)
).scalar_one_or_none()
return _actor_to_dto(actor) if actor else None
async def get_actor_by_user_id(
self, session: AsyncSession, user_id: int,
) -> ActorProfileDTO | None:
actor = (
await session.execute(
select(ActorProfile).where(ActorProfile.user_id == user_id)
)
).scalar_one_or_none()
return _actor_to_dto(actor) if actor else None
async def create_actor(
self, session: AsyncSession, user_id: int, preferred_username: str,
display_name: str | None = None, summary: str | None = None,
) -> ActorProfileDTO:
from shared.utils.http_signatures import generate_rsa_keypair
private_pem, public_pem = generate_rsa_keypair()
actor = ActorProfile(
user_id=user_id,
preferred_username=preferred_username,
display_name=display_name,
summary=summary,
public_key_pem=public_pem,
private_key_pem=private_pem,
)
session.add(actor)
await session.flush()
return _actor_to_dto(actor)
async def username_available(
self, session: AsyncSession, username: str,
) -> bool:
count = (
await session.execute(
select(func.count(ActorProfile.id)).where(
ActorProfile.preferred_username == username
)
)
).scalar() or 0
return count == 0
# -- Publishing -----------------------------------------------------------
async def publish_activity(
self, session: AsyncSession, *,
actor_user_id: int,
activity_type: str,
object_type: str,
object_data: dict,
source_type: str | None = None,
source_id: int | None = None,
) -> APActivityDTO:
# Look up actor
actor = (
await session.execute(
select(ActorProfile).where(ActorProfile.user_id == actor_user_id)
)
).scalar_one_or_none()
if actor is None:
raise ValueError(f"No ActorProfile for user_id={actor_user_id}")
domain = _domain()
username = actor.preferred_username
activity_uri = f"https://{domain}/users/{username}/activities/{uuid.uuid4()}"
now = datetime.now(timezone.utc)
activity = APActivity(
activity_id=activity_uri,
activity_type=activity_type,
actor_profile_id=actor.id,
object_type=object_type,
object_data=object_data,
published=now,
is_local=True,
source_type=source_type,
source_id=source_id,
)
session.add(activity)
await session.flush()
# Emit domain event for downstream processing (IPFS storage, delivery)
from shared.events import emit_event
await emit_event(
session,
"federation.activity_created",
"APActivity",
activity.id,
{
"activity_id": activity.activity_id,
"activity_type": activity_type,
"actor_username": username,
"object_type": object_type,
},
)
return _activity_to_dto(activity)
# -- Queries --------------------------------------------------------------
async def get_activity(
self, session: AsyncSession, activity_id: str,
) -> APActivityDTO | None:
a = (
await session.execute(
select(APActivity).where(APActivity.activity_id == activity_id)
)
).scalar_one_or_none()
return _activity_to_dto(a) if a else None
async def get_outbox(
self, session: AsyncSession, username: str,
page: int = 1, per_page: int = 20,
) -> tuple[list[APActivityDTO], int]:
actor = (
await session.execute(
select(ActorProfile).where(ActorProfile.preferred_username == username)
)
).scalar_one_or_none()
if actor is None:
return [], 0
total = (
await session.execute(
select(func.count(APActivity.id)).where(
APActivity.actor_profile_id == actor.id,
APActivity.is_local == True, # noqa: E712
)
)
).scalar() or 0
offset = (page - 1) * per_page
result = await session.execute(
select(APActivity)
.where(
APActivity.actor_profile_id == actor.id,
APActivity.is_local == True, # noqa: E712
)
.order_by(APActivity.published.desc())
.limit(per_page)
.offset(offset)
)
return [_activity_to_dto(a) for a in result.scalars().all()], total
async def get_activity_for_source(
self, session: AsyncSession, source_type: str, source_id: int,
) -> APActivityDTO | None:
a = (
await session.execute(
select(APActivity).where(
APActivity.source_type == source_type,
APActivity.source_id == source_id,
).order_by(APActivity.created_at.desc())
)
).scalar_one_or_none()
return _activity_to_dto(a) if a else None
# -- Followers ------------------------------------------------------------
async def get_followers(
self, session: AsyncSession, username: str,
) -> list[APFollowerDTO]:
actor = (
await session.execute(
select(ActorProfile).where(ActorProfile.preferred_username == username)
)
).scalar_one_or_none()
if actor is None:
return []
result = await session.execute(
select(APFollower).where(APFollower.actor_profile_id == actor.id)
)
return [_follower_to_dto(f) for f in result.scalars().all()]
async def add_follower(
self, session: AsyncSession, username: str,
follower_acct: str, follower_inbox: str, follower_actor_url: str,
follower_public_key: str | None = None,
) -> APFollowerDTO:
actor = (
await session.execute(
select(ActorProfile).where(ActorProfile.preferred_username == username)
)
).scalar_one_or_none()
if actor is None:
raise ValueError(f"Actor not found: {username}")
follower = APFollower(
actor_profile_id=actor.id,
follower_acct=follower_acct,
follower_inbox=follower_inbox,
follower_actor_url=follower_actor_url,
follower_public_key=follower_public_key,
)
session.add(follower)
await session.flush()
return _follower_to_dto(follower)
async def remove_follower(
self, session: AsyncSession, username: str, follower_acct: str,
) -> bool:
actor = (
await session.execute(
select(ActorProfile).where(ActorProfile.preferred_username == username)
)
).scalar_one_or_none()
if actor is None:
return False
result = await session.execute(
delete(APFollower).where(
APFollower.actor_profile_id == actor.id,
APFollower.follower_acct == follower_acct,
)
)
return result.rowcount > 0
# -- Stats ----------------------------------------------------------------
async def get_stats(self, session: AsyncSession) -> dict:
actors = (await session.execute(select(func.count(ActorProfile.id)))).scalar() or 0
activities = (await session.execute(select(func.count(APActivity.id)))).scalar() or 0
followers = (await session.execute(select(func.count(APFollower.id)))).scalar() or 0
return {"actors": actors, "activities": activities, "followers": followers}

View File

@@ -21,6 +21,7 @@ from shared.contracts.protocols import (
CalendarService,
MarketService,
CartService,
FederationService,
)
@@ -37,6 +38,7 @@ class _ServiceRegistry:
self._calendar: CalendarService | None = None
self._market: MarketService | None = None
self._cart: CartService | None = None
self._federation: FederationService | None = None
# -- blog -----------------------------------------------------------------
@property
@@ -82,6 +84,17 @@ class _ServiceRegistry:
def cart(self, impl: CartService) -> None:
self._cart = impl
# -- federation -----------------------------------------------------------
@property
def federation(self) -> FederationService:
if self._federation is None:
raise RuntimeError("FederationService not registered")
return self._federation
@federation.setter
def federation(self, impl: FederationService) -> None:
self._federation = impl
# -- introspection --------------------------------------------------------
def has(self, name: str) -> bool:
"""Check whether a domain service is registered."""

View File

@@ -18,6 +18,9 @@ from shared.contracts.dtos import (
ProductDTO,
CartItemDTO,
CartSummaryDTO,
ActorProfileDTO,
APActivityDTO,
APFollowerDTO,
)
@@ -182,3 +185,47 @@ class StubCartService:
self, session: AsyncSession, user_id: int, session_id: str,
) -> None:
pass
class StubFederationService:
"""No-op federation stub for apps that don't own federation."""
async def get_actor_by_username(self, session, username):
return None
async def get_actor_by_user_id(self, session, user_id):
return None
async def create_actor(self, session, user_id, preferred_username,
display_name=None, summary=None):
raise RuntimeError("FederationService not available")
async def username_available(self, session, username):
return False
async def publish_activity(self, session, *, actor_user_id, activity_type,
object_type, object_data, source_type=None,
source_id=None):
return None
async def get_activity(self, session, activity_id):
return None
async def get_outbox(self, session, username, page=1, per_page=20):
return [], 0
async def get_activity_for_source(self, session, source_type, source_id):
return None
async def get_followers(self, session, username):
return []
async def add_follower(self, session, username, follower_acct, follower_inbox,
follower_actor_url, follower_public_key=None):
raise RuntimeError("FederationService not available")
async def remove_follower(self, session, username, follower_acct):
return False
async def get_stats(self, session):
return {"actors": 0, "activities": 0, "followers": 0}

181
utils/http_signatures.py Normal file
View File

@@ -0,0 +1,181 @@
"""RSA key generation and HTTP Signature signing/verification.
Keys are stored in DB (ActorProfile), not the filesystem.
Ported from ~/art-dag/activity-pub/keys.py.
"""
from __future__ import annotations
import base64
import hashlib
import json
from datetime import datetime, timezone
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
def generate_rsa_keypair() -> tuple[str, str]:
"""Generate an RSA-2048 keypair.
Returns:
(private_pem, public_pem) as UTF-8 strings.
"""
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
).decode()
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()
return private_pem, public_pem
def sign_request(
private_key_pem: str,
key_id: str,
method: str,
path: str,
host: str,
body: bytes | None = None,
date: str | None = None,
) -> dict[str, str]:
"""Build HTTP Signature headers for an outgoing request.
Returns a dict of headers to merge into the request:
``{"Signature": ..., "Date": ..., "Digest": ..., "Host": ...}``
"""
if date is None:
date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")
headers_to_sign = [
f"(request-target): {method.lower()} {path}",
f"host: {host}",
f"date: {date}",
]
out_headers: dict[str, str] = {
"Host": host,
"Date": date,
}
if body is not None:
digest = base64.b64encode(hashlib.sha256(body).digest()).decode()
digest_header = f"SHA-256={digest}"
headers_to_sign.append(f"digest: {digest_header}")
out_headers["Digest"] = digest_header
signed_string = "\n".join(headers_to_sign)
header_names = " ".join(
h.split(":")[0] for h in headers_to_sign
)
private_key = serialization.load_pem_private_key(
private_key_pem.encode(), password=None,
)
signature_bytes = private_key.sign(
signed_string.encode(),
padding.PKCS1v15(),
hashes.SHA256(),
)
signature_b64 = base64.b64encode(signature_bytes).decode()
out_headers["Signature"] = (
f'keyId="{key_id}",'
f'headers="{header_names}",'
f'signature="{signature_b64}",'
f'algorithm="rsa-sha256"'
)
return out_headers
def verify_request_signature(
public_key_pem: str,
signature_header: str,
method: str,
path: str,
headers: dict[str, str],
) -> bool:
"""Verify an incoming HTTP Signature.
Args:
public_key_pem: PEM-encoded public key of the sender.
signature_header: Value of the ``Signature`` header.
method: HTTP method (GET, POST, etc.).
path: Request path (e.g. ``/users/alice/inbox``).
headers: All request headers (case-insensitive keys).
Returns:
True if the signature is valid.
"""
# Parse Signature header
parts: dict[str, str] = {}
for part in signature_header.split(","):
part = part.strip()
eq = part.index("=")
key = part[:eq]
val = part[eq + 1:].strip('"')
parts[key] = val
signed_headers = parts.get("headers", "date").split()
signature_b64 = parts.get("signature", "")
# Reconstruct the signed string
lines: list[str] = []
# Normalize header lookup to lowercase
lc_headers = {k.lower(): v for k, v in headers.items()}
for h in signed_headers:
if h == "(request-target)":
lines.append(f"(request-target): {method.lower()} {path}")
else:
val = lc_headers.get(h, "")
lines.append(f"{h}: {val}")
signed_string = "\n".join(lines)
public_key = serialization.load_pem_public_key(public_key_pem.encode())
try:
public_key.verify(
base64.b64decode(signature_b64),
signed_string.encode(),
padding.PKCS1v15(),
hashes.SHA256(),
)
return True
except Exception:
return False
def create_ld_signature(
private_key_pem: str,
key_id: str,
activity: dict,
) -> dict:
"""Create an RsaSignature2017 Linked Data signature for an activity."""
canonical = json.dumps(activity, sort_keys=True, separators=(",", ":"))
private_key = serialization.load_pem_private_key(
private_key_pem.encode(), password=None,
)
signature_bytes = private_key.sign(
canonical.encode(),
padding.PKCS1v15(),
hashes.SHA256(),
)
signature_b64 = base64.b64encode(signature_bytes).decode()
return {
"type": "RsaSignature2017",
"creator": key_id,
"created": datetime.now(timezone.utc).isoformat(),
"signatureValue": signature_b64,
}

141
utils/ipfs_client.py Normal file
View File

@@ -0,0 +1,141 @@
"""Async IPFS client for content-addressed storage.
All content can be stored on IPFS — blog posts, products, activities, etc.
Ported from ~/art-dag/activity-pub/ipfs_client.py (converted to async httpx).
Config via environment:
IPFS_API — multiaddr or URL (default: /ip4/127.0.0.1/tcp/5001)
IPFS_TIMEOUT — request timeout in seconds (default: 60)
IPFS_GATEWAY_URL — public gateway for CID links (default: https://ipfs.io)
"""
from __future__ import annotations
import json
import logging
import os
import re
import httpx
logger = logging.getLogger(__name__)
class IPFSError(Exception):
"""Raised when an IPFS operation fails."""
# -- Config ------------------------------------------------------------------
IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001")
IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "60"))
IPFS_GATEWAY_URL = os.getenv("IPFS_GATEWAY_URL", "https://ipfs.io")
def _multiaddr_to_url(multiaddr: str) -> str:
"""Convert IPFS multiaddr to HTTP URL."""
dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr)
if dns_match:
return f"http://{dns_match.group(1)}:{dns_match.group(2)}"
ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr)
if ip4_match:
return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}"
if multiaddr.startswith("http"):
return multiaddr
return "http://127.0.0.1:5001"
IPFS_BASE_URL = _multiaddr_to_url(IPFS_API)
# -- Async client functions --------------------------------------------------
async def add_bytes(data: bytes, *, pin: bool = True) -> str:
"""Add raw bytes to IPFS.
Returns the CID.
"""
try:
async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client:
resp = await client.post(
f"{IPFS_BASE_URL}/api/v0/add",
params={"pin": str(pin).lower()},
files={"file": ("data", data)},
)
resp.raise_for_status()
cid = resp.json()["Hash"]
logger.info("Added to IPFS: %d bytes -> %s", len(data), cid)
return cid
except Exception as e:
logger.error("Failed to add bytes to IPFS: %s", e)
raise IPFSError(f"Failed to add bytes: {e}") from e
async def add_json(data: dict) -> str:
"""Serialize dict to sorted JSON and add to IPFS."""
json_bytes = json.dumps(data, indent=2, sort_keys=True).encode("utf-8")
return await add_bytes(json_bytes, pin=True)
async def get_bytes(cid: str) -> bytes | None:
"""Fetch content from IPFS by CID."""
try:
async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client:
resp = await client.post(
f"{IPFS_BASE_URL}/api/v0/cat",
params={"arg": cid},
)
resp.raise_for_status()
logger.info("Retrieved from IPFS: %s (%d bytes)", cid, len(resp.content))
return resp.content
except Exception as e:
logger.error("Failed to get from IPFS: %s", e)
return None
async def pin_cid(cid: str) -> bool:
"""Pin a CID on this node."""
try:
async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client:
resp = await client.post(
f"{IPFS_BASE_URL}/api/v0/pin/add",
params={"arg": cid},
)
resp.raise_for_status()
logger.info("Pinned on IPFS: %s", cid)
return True
except Exception as e:
logger.error("Failed to pin on IPFS: %s", e)
return False
async def unpin_cid(cid: str) -> bool:
"""Unpin a CID from this node."""
try:
async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client:
resp = await client.post(
f"{IPFS_BASE_URL}/api/v0/pin/rm",
params={"arg": cid},
)
resp.raise_for_status()
logger.info("Unpinned from IPFS: %s", cid)
return True
except Exception as e:
logger.error("Failed to unpin from IPFS: %s", e)
return False
async def is_available() -> bool:
"""Check if IPFS daemon is reachable."""
try:
async with httpx.AsyncClient(timeout=5) as client:
resp = await client.post(f"{IPFS_BASE_URL}/api/v0/id")
return resp.status_code == 200
except Exception:
return False
def gateway_url(cid: str) -> str:
"""Return a public gateway URL for a CID."""
return f"{IPFS_GATEWAY_URL}/ipfs/{cid}"