Files
mono/shared/models/federation.py
giles f42042ccb7 Monorepo: consolidate 7 repos into one
Combines shared, blog, market, cart, events, federation, and account
into a single repository. Eliminates submodule sync, sibling model
copying at build time, and per-app CI orchestration.

Changes:
- Remove per-app .git, .gitmodules, .gitea, submodule shared/ dirs
- Remove stale sibling model copies from each app
- Update all 6 Dockerfiles for monorepo build context (root = .)
- Add build directives to docker-compose.yml
- Add single .gitea/workflows/ci.yml with change detection
- Add .dockerignore for monorepo build context
- Create __init__.py for federation and account (cross-app imports)
2026-02-24 19:44:17 +00:00

467 lines
20 KiB
Python

"""Federation / ActivityPub ORM models.
These models support AP identity, activities, followers, inbox processing,
IPFS content addressing, and OpenTimestamps anchoring.
"""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import (
String, Integer, DateTime, Text, Boolean, BigInteger,
ForeignKey, UniqueConstraint, Index, func,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from shared.db.base import Base
class ActorProfile(Base):
    """ActivityPub identity belonging to a local user.

    The row is created once the user has chosen a username. ``user_id`` is
    unique, so a user owns at most one profile.
    """

    __tablename__ = "ap_actor_profiles"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Removing the user row removes the profile too (database-level cascade).
    user_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("users.id", ondelete="CASCADE"),
        unique=True,
        nullable=False,
    )
    preferred_username: Mapped[str] = mapped_column(
        String(64),
        unique=True,
        nullable=False,
    )
    display_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
    summary: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Both halves of the key pair are stored as plain text columns.
    public_key_pem: Mapped[str] = mapped_column(Text, nullable=False)
    private_key_pem: Mapped[str] = mapped_column(Text, nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    # Relationships
    # NOTE(review): ``uselist=False`` configures this (many-to-one) side only;
    # the string backref on ``User`` is presumably still list-valued — confirm
    # how callers read ``user.actor_profile`` before changing it.
    user = relationship(
        "User",
        backref="actor_profile",
        uselist=False,
        lazy="selectin",
    )
    activities = relationship(
        "APActivity",
        back_populates="actor_profile",
        lazy="dynamic",
    )
    followers = relationship(
        "APFollower",
        back_populates="actor_profile",
        lazy="dynamic",
    )

    # NOTE(review): both columns are already declared ``unique=True``; these
    # unique indexes look redundant — confirm (dropping them needs a migration).
    __table_args__ = (
        Index("ix_ap_actor_user_id", "user_id", unique=True),
        Index("ix_ap_actor_username", "preferred_username", unique=True),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the primary key and username."""
        return f"<ActorProfile {self.id} @{self.preferred_username}>"
class APActivity(Base):
    """A single ActivityPub activity, either local or remote.

    The table doubles as the unified event bus: internal domain events and
    public federation activities are stored side by side and told apart by
    ``visibility``. The ``EventProcessor`` polls rows whose
    ``process_state`` is ``'pending'``.
    """

    __tablename__ = "ap_activities"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    activity_id: Mapped[str] = mapped_column(
        String(512),
        unique=True,
        nullable=False,
    )
    activity_type: Mapped[str] = mapped_column(String(64), nullable=False)
    # Nullable: an activity row may exist without a local actor profile.
    actor_profile_id: Mapped[int | None] = mapped_column(
        Integer,
        ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"),
        nullable=True,
    )
    object_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
    object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
    published: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    signature: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
    is_local: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=True,
        server_default="true",
    )

    # Pointer to the domain object that produced the activity
    # (for example source_type='post', source_id=42).
    source_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
    source_id: Mapped[int | None] = mapped_column(Integer, nullable=True)

    # Content-addressed IPFS copy of the activity.
    ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True)

    # Anchoring: populated later, once the activity is batched into a merkle
    # tree. The link is nulled if the anchor row is deleted.
    anchor_id: Mapped[int | None] = mapped_column(
        Integer,
        ForeignKey("ap_anchors.id", ondelete="SET NULL"),
        nullable=True,
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    # --- Unified event-bus columns ---
    actor_uri: Mapped[str | None] = mapped_column(String(512), nullable=True)
    visibility: Mapped[str] = mapped_column(
        String(20),
        nullable=False,
        default="public",
        server_default="public",
    )
    # Rows default to "completed"; only rows set to 'pending' are polled.
    process_state: Mapped[str] = mapped_column(
        String(20),
        nullable=False,
        default="completed",
        server_default="completed",
    )
    process_attempts: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        default=0,
        server_default="0",
    )
    process_max_attempts: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        default=5,
        server_default="5",
    )
    process_error: Mapped[str | None] = mapped_column(Text, nullable=True)
    processed_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )
    origin_app: Mapped[str | None] = mapped_column(String(64), nullable=True)

    # Relationships
    actor_profile = relationship("ActorProfile", back_populates="activities")

    __table_args__ = (
        Index("ix_ap_activity_actor", "actor_profile_id"),
        Index("ix_ap_activity_source", "source_type", "source_id"),
        Index("ix_ap_activity_published", "published"),
        Index("ix_ap_activity_process", "process_state"),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the primary key and activity type."""
        return f"<APActivity {self.id} {self.activity_type}>"
class APFollower(Base):
    """A remote account following a local actor.

    ``app_domain`` narrows the follow to one app (e.g. "blog", "market",
    "events"). The value "federation" denotes the aggregate: the follower
    is subscribed to every activity.
    """

    __tablename__ = "ap_followers"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    actor_profile_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"),
        nullable=False,
    )
    follower_acct: Mapped[str] = mapped_column(String(512), nullable=False)
    follower_inbox: Mapped[str] = mapped_column(String(512), nullable=False)
    follower_actor_url: Mapped[str] = mapped_column(String(512), nullable=False)
    follower_public_key: Mapped[str | None] = mapped_column(Text, nullable=True)
    app_domain: Mapped[str] = mapped_column(
        String(64),
        nullable=False,
        default="federation",
        server_default="federation",
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    # Relationships
    actor_profile = relationship("ActorProfile", back_populates="followers")

    __table_args__ = (
        # One follow per (local actor, remote account, app scope).
        UniqueConstraint(
            "actor_profile_id",
            "follower_acct",
            "app_domain",
            name="uq_follower_acct_app",
        ),
        Index("ix_ap_follower_actor", "actor_profile_id"),
        Index("ix_ap_follower_app_domain", "actor_profile_id", "app_domain"),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the primary key and follower acct."""
        return f"<APFollower {self.id} {self.follower_acct}>"
class APInboxItem(Base):
    """Incoming ActivityPub activity kept in raw form for async processing."""

    __tablename__ = "ap_inbox_items"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    actor_profile_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"),
        nullable=False,
    )
    # The payload as received, stored as JSONB.
    raw_json: Mapped[dict] = mapped_column(JSONB, nullable=False)
    activity_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
    from_actor: Mapped[str | None] = mapped_column(String(512), nullable=True)
    # New rows start out as "pending".
    state: Mapped[str] = mapped_column(
        String(20),
        nullable=False,
        default="pending",
        server_default="pending",
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    processed_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )

    __table_args__ = (
        Index("ix_ap_inbox_state", "state"),
        Index("ix_ap_inbox_actor", "actor_profile_id"),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the key, activity type and state."""
        return f"<APInboxItem {self.id} {self.activity_type} [{self.state}]>"
class APAnchor(Base):
    """One OpenTimestamps anchoring batch: a merkle tree over activities."""

    __tablename__ = "ap_anchors"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    merkle_root: Mapped[str] = mapped_column(String(128), nullable=False)
    tree_ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True)
    ots_proof_cid: Mapped[str | None] = mapped_column(String(128), nullable=True)
    # Python-side default only; there is no server default on this column.
    activity_count: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        default=0,
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    confirmed_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )
    bitcoin_txid: Mapped[str | None] = mapped_column(String(128), nullable=True)

    def __repr__(self) -> str:
        """Return a short debug label with the key and activity count."""
        return f"<APAnchor {self.id} activities={self.activity_count}>"
class IPFSPin(Base):
    """Tracks content stored on IPFS — used by all domains."""

    __tablename__ = "ipfs_pins"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    content_hash: Mapped[str] = mapped_column(String(128), nullable=False)
    ipfs_cid: Mapped[str] = mapped_column(String(128), nullable=False, unique=True)
    pin_type: Mapped[str] = mapped_column(String(64), nullable=False)
    # Optional pointer back to the domain object the pinned content belongs to.
    source_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
    source_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
    size_bytes: Mapped[int | None] = mapped_column(BigInteger, nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=func.now(),
    )

    __table_args__ = (
        Index("ix_ipfs_pin_source", "source_type", "source_id"),
        Index("ix_ipfs_pin_cid", "ipfs_cid", unique=True),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the key and the first 16 CID chars.

        ``ipfs_cid`` is NOT NULL in the database, but an instance that has
        not been populated yet still carries ``None``; slicing that would
        make ``repr()`` raise ``TypeError``, so fall back to an empty string.
        """
        cid = self.ipfs_cid or ""
        return f"<IPFSPin {self.id} {cid[:16]}...>"
class RemoteActor(Base):
    """Locally cached profile of a remote actor we interact with."""

    __tablename__ = "ap_remote_actors"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    actor_url: Mapped[str] = mapped_column(
        String(512),
        unique=True,
        nullable=False,
    )
    inbox_url: Mapped[str] = mapped_column(String(512), nullable=False)
    shared_inbox_url: Mapped[str | None] = mapped_column(String(512), nullable=True)
    preferred_username: Mapped[str] = mapped_column(String(255), nullable=False)
    display_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
    summary: Mapped[str | None] = mapped_column(Text, nullable=True)
    icon_url: Mapped[str | None] = mapped_column(String(512), nullable=True)
    public_key_pem: Mapped[str | None] = mapped_column(Text, nullable=True)
    domain: Mapped[str] = mapped_column(String(255), nullable=False)
    # Both timestamps default to the database clock at insert time.
    fetched_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    __table_args__ = (
        Index("ix_ap_remote_actor_url", "actor_url", unique=True),
        Index("ix_ap_remote_actor_domain", "domain"),
    )

    def __repr__(self) -> str:
        """Return a short debug label of the form ``id username@domain``."""
        return f"<RemoteActor {self.id} {self.preferred_username}@{self.domain}>"
class APFollowing(Base):
    """Outbound follow edge: a local actor following a remote actor."""

    __tablename__ = "ap_following"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    actor_profile_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"),
        nullable=False,
    )
    remote_actor_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("ap_remote_actors.id", ondelete="CASCADE"),
        nullable=False,
    )
    # New rows start out as "pending".
    state: Mapped[str] = mapped_column(
        String(20),
        nullable=False,
        default="pending",
        server_default="pending",
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    accepted_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )

    # Relationships
    actor_profile = relationship("ActorProfile")
    remote_actor = relationship("RemoteActor")

    __table_args__ = (
        # A local actor can follow a given remote actor only once.
        UniqueConstraint(
            "actor_profile_id",
            "remote_actor_id",
            name="uq_following",
        ),
        Index("ix_ap_following_actor", "actor_profile_id"),
        Index("ix_ap_following_remote", "remote_actor_id"),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the primary key and follow state."""
        return f"<APFollowing {self.id} [{self.state}]>"
class APRemotePost(Base):
    """Federated post that was ingested from a remote actor."""

    __tablename__ = "ap_remote_posts"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    remote_actor_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("ap_remote_actors.id", ondelete="CASCADE"),
        nullable=False,
    )
    activity_id: Mapped[str] = mapped_column(
        String(512),
        unique=True,
        nullable=False,
    )
    object_id: Mapped[str] = mapped_column(
        String(512),
        unique=True,
        nullable=False,
    )
    # Python-side default only; there is no server default on this column.
    object_type: Mapped[str] = mapped_column(
        String(64),
        nullable=False,
        default="Note",
    )
    content: Mapped[str | None] = mapped_column(Text, nullable=True)
    summary: Mapped[str | None] = mapped_column(Text, nullable=True)
    url: Mapped[str | None] = mapped_column(String(512), nullable=True)
    attachment_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
    tag_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
    in_reply_to: Mapped[str | None] = mapped_column(String(512), nullable=True)
    conversation: Mapped[str | None] = mapped_column(String(512), nullable=True)
    published: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )
    fetched_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    # Relationships
    remote_actor = relationship("RemoteActor")

    __table_args__ = (
        Index("ix_ap_remote_post_actor", "remote_actor_id"),
        Index("ix_ap_remote_post_published", "published"),
        Index("ix_ap_remote_post_object", "object_id", unique=True),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the primary key and object type."""
        return f"<APRemotePost {self.id} {self.object_type}>"
class APLocalPost(Base):
    """Native post written through the federation UI."""

    __tablename__ = "ap_local_posts"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    actor_profile_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"),
        nullable=False,
    )
    content: Mapped[str] = mapped_column(Text, nullable=False)
    visibility: Mapped[str] = mapped_column(
        String(20),
        nullable=False,
        default="public",
        server_default="public",
    )
    in_reply_to: Mapped[str | None] = mapped_column(String(512), nullable=True)
    published: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    # Initialised by the database on insert; ``onupdate`` applies ``now()``
    # again on ORM-issued UPDATE statements.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        onupdate=func.now(),
    )

    # Relationships
    actor_profile = relationship("ActorProfile")

    __table_args__ = (
        Index("ix_ap_local_post_actor", "actor_profile_id"),
        Index("ix_ap_local_post_published", "published"),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the primary key."""
        return f"<APLocalPost {self.id}>"
class APInteraction(Base):
    """A like or a boost, made by either a local or a remote actor."""

    __tablename__ = "ap_interactions"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Both actor references are nullable foreign keys.
    actor_profile_id: Mapped[int | None] = mapped_column(
        Integer,
        ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"),
        nullable=True,
    )
    remote_actor_id: Mapped[int | None] = mapped_column(
        Integer,
        ForeignKey("ap_remote_actors.id", ondelete="CASCADE"),
        nullable=True,
    )
    # post_type: local/remote
    post_type: Mapped[str] = mapped_column(String(20), nullable=False)
    # No foreign key here: the id is stored alongside post_type.
    post_id: Mapped[int] = mapped_column(Integer, nullable=False)
    # interaction_type: like/boost
    interaction_type: Mapped[str] = mapped_column(String(20), nullable=False)
    activity_id: Mapped[str | None] = mapped_column(String(512), nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    __table_args__ = (
        Index("ix_ap_interaction_post", "post_type", "post_id"),
        Index("ix_ap_interaction_actor", "actor_profile_id"),
        Index("ix_ap_interaction_remote", "remote_actor_id"),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the key and interaction type."""
        return f"<APInteraction {self.id} {self.interaction_type}>"
class APNotification(Base):
    """Notification for a local actor."""

    __tablename__ = "ap_notifications"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Recipient of the notification; removed together with the profile.
    actor_profile_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False,
    )
    notification_type: Mapped[str] = mapped_column(String(20), nullable=False)
    # Sender and target references are all optional and are nulled (not
    # cascaded) when the referenced row is deleted, so the notification
    # itself survives.
    from_remote_actor_id: Mapped[int | None] = mapped_column(
        Integer, ForeignKey("ap_remote_actors.id", ondelete="SET NULL"), nullable=True,
    )
    from_actor_profile_id: Mapped[int | None] = mapped_column(
        Integer, ForeignKey("ap_actor_profiles.id", ondelete="SET NULL"), nullable=True,
    )
    target_activity_id: Mapped[int | None] = mapped_column(
        Integer, ForeignKey("ap_activities.id", ondelete="SET NULL"), nullable=True,
    )
    target_remote_post_id: Mapped[int | None] = mapped_column(
        Integer, ForeignKey("ap_remote_posts.id", ondelete="SET NULL"), nullable=True,
    )
    read: Mapped[bool] = mapped_column(
        Boolean, nullable=False, default=False, server_default="false",
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=func.now(),
    )

    # Relationships
    # Two foreign keys point at ap_actor_profiles, so each relationship names
    # the column it joins on explicitly.
    actor_profile = relationship("ActorProfile", foreign_keys=[actor_profile_id])
    from_remote_actor = relationship("RemoteActor")
    from_actor_profile = relationship("ActorProfile", foreign_keys=[from_actor_profile_id])

    __table_args__ = (
        Index("ix_ap_notification_actor", "actor_profile_id"),
        Index("ix_ap_notification_read", "actor_profile_id", "read"),
        Index("ix_ap_notification_created", "created_at"),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the key and notification type.

        Added so this model prints like the other models in this module
        instead of falling back to the default object repr.
        """
        return f"<APNotification {self.id} {self.notification_type}>"
class APDeliveryLog(Base):
    """Tracks successful deliveries of activities to remote inboxes.

    Used for idempotency: the delivery handler skips inboxes that already
    have a success row, so retries after a crash never send duplicates.
    """

    __tablename__ = "ap_delivery_log"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Log rows are removed together with the activity they belong to.
    activity_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("ap_activities.id", ondelete="CASCADE"), nullable=False,
    )
    inbox_url: Mapped[str] = mapped_column(String(512), nullable=False)
    # Python-side default mirrors the server default, matching how the other
    # literal-defaulted columns in this module are declared; the DDL is
    # unchanged.
    # NOTE(review): this column is String(128) while APFollower.app_domain is
    # String(64) — confirm whether the difference is intentional.
    app_domain: Mapped[str] = mapped_column(
        String(128), nullable=False, default="federation", server_default="federation",
    )
    status_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
    delivered_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=func.now(),
    )

    __table_args__ = (
        # At most one row per (activity, inbox, app scope).
        UniqueConstraint(
            "activity_id", "inbox_url", "app_domain",
            name="uq_delivery_activity_inbox_domain",
        ),
        Index("ix_ap_delivery_activity", "activity_id"),
    )

    def __repr__(self) -> str:
        """Return a short debug label with the key, activity id and inbox URL."""
        return f"<APDeliveryLog {self.id} activity={self.activity_id} {self.inbox_url}>"