"""Federation / ActivityPub ORM models. These models support AP identity, activities, followers, inbox processing, IPFS content addressing, and OpenTimestamps anchoring. """ from __future__ import annotations from datetime import datetime from sqlalchemy import ( String, Integer, DateTime, Text, Boolean, BigInteger, ForeignKey, UniqueConstraint, Index, func, ) from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column, relationship from shared.db.base import Base class ActorProfile(Base): """AP identity for a user. Created when user chooses a username.""" __tablename__ = "ap_actor_profiles" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) user_id: Mapped[int] = mapped_column( Integer, unique=True, nullable=False, ) preferred_username: Mapped[str] = mapped_column(String(64), unique=True, nullable=False) display_name: Mapped[str | None] = mapped_column(String(255), nullable=True) summary: Mapped[str | None] = mapped_column(Text, nullable=True) public_key_pem: Mapped[str] = mapped_column(Text, nullable=False) private_key_pem: Mapped[str] = mapped_column(Text, nullable=False) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) # Relationships activities = relationship("APActivity", back_populates="actor_profile", lazy="dynamic") followers = relationship("APFollower", back_populates="actor_profile", lazy="dynamic") __table_args__ = ( Index("ix_ap_actor_user_id", "user_id", unique=True), Index("ix_ap_actor_username", "preferred_username", unique=True), ) def __repr__(self) -> str: return f"" class APActivity(Base): """An ActivityPub activity (local or remote). Also serves as the unified event bus: internal domain events and public federation activities both live here, distinguished by ``visibility``. The ``EventProcessor`` polls rows with ``process_state='pending'``. """ __tablename__ = "ap_activities" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) activity_type: Mapped[str] = mapped_column(String(64), nullable=False) actor_profile_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True, ) object_type: Mapped[str | None] = mapped_column(String(64), nullable=True) object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) published: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) signature: Mapped[dict | None] = mapped_column(JSONB, nullable=True) is_local: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True, server_default="true") # Link back to originating domain object (e.g. source_type='post', source_id=42) source_type: Mapped[str | None] = mapped_column(String(64), nullable=True) source_id: Mapped[int | None] = mapped_column(Integer, nullable=True) # IPFS content-addressed copy of the activity ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True) # Anchoring (filled later when batched into a merkle tree) anchor_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("ap_anchors.id", ondelete="SET NULL"), nullable=True, ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) # --- Unified event-bus columns --- actor_uri: Mapped[str | None] = mapped_column( String(512), nullable=True, ) visibility: Mapped[str] = mapped_column( String(20), nullable=False, default="public", server_default="public", ) process_state: Mapped[str] = mapped_column( String(20), nullable=False, default="completed", server_default="completed", ) process_attempts: Mapped[int] = mapped_column( Integer, nullable=False, default=0, server_default="0", ) process_max_attempts: Mapped[int] = mapped_column( Integer, nullable=False, default=5, server_default="5", ) process_error: Mapped[str | None] = mapped_column(Text, nullable=True) processed_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True, ) origin_app: Mapped[str | None] = mapped_column( String(64), nullable=True, ) # Relationships actor_profile = relationship("ActorProfile", back_populates="activities") __table_args__ = ( Index("ix_ap_activity_actor", "actor_profile_id"), Index("ix_ap_activity_source", "source_type", "source_id"), Index("ix_ap_activity_published", "published"), Index("ix_ap_activity_process", "process_state"), ) def __repr__(self) -> str: return f"" class APFollower(Base): """A remote follower of a local actor. ``app_domain`` scopes the follow to a specific app (e.g. "blog", "market", "events"). "federation" means the aggregate — the follower subscribes to all activities. """ __tablename__ = "ap_followers" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) actor_profile_id: Mapped[int] = mapped_column( Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, ) follower_acct: Mapped[str] = mapped_column(String(512), nullable=False) follower_inbox: Mapped[str] = mapped_column(String(512), nullable=False) follower_actor_url: Mapped[str] = mapped_column(String(512), nullable=False) follower_public_key: Mapped[str | None] = mapped_column(Text, nullable=True) app_domain: Mapped[str] = mapped_column( String(64), nullable=False, default="federation", server_default="federation", ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) # Relationships actor_profile = relationship("ActorProfile", back_populates="followers") __table_args__ = ( UniqueConstraint( "actor_profile_id", "follower_acct", "app_domain", name="uq_follower_acct_app", ), Index("ix_ap_follower_actor", "actor_profile_id"), Index("ix_ap_follower_app_domain", "actor_profile_id", "app_domain"), ) def __repr__(self) -> str: return f"" class APInboxItem(Base): """Raw incoming AP activity, stored for async processing.""" __tablename__ = "ap_inbox_items" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) actor_profile_id: Mapped[int] = mapped_column( Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, ) raw_json: Mapped[dict] = mapped_column(JSONB, nullable=False) activity_type: Mapped[str | None] = mapped_column(String(64), nullable=True) from_actor: Mapped[str | None] = mapped_column(String(512), nullable=True) state: Mapped[str] = mapped_column( String(20), nullable=False, default="pending", server_default="pending", ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) __table_args__ = ( Index("ix_ap_inbox_state", "state"), Index("ix_ap_inbox_actor", "actor_profile_id"), ) def __repr__(self) -> str: return f"" class APAnchor(Base): """OpenTimestamps anchoring batch — merkle tree of activities.""" __tablename__ = "ap_anchors" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) merkle_root: Mapped[str] = mapped_column(String(128), nullable=False) tree_ipfs_cid: Mapped[str | None] = mapped_column(String(128), nullable=True) ots_proof_cid: Mapped[str | None] = mapped_column(String(128), nullable=True) activity_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) confirmed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) bitcoin_txid: Mapped[str | None] = mapped_column(String(128), nullable=True) def __repr__(self) -> str: return f"" class IPFSPin(Base): """Tracks content stored on IPFS — used by all domains.""" __tablename__ = "ipfs_pins" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) content_hash: Mapped[str] = mapped_column(String(128), nullable=False) ipfs_cid: Mapped[str] = mapped_column(String(128), nullable=False, unique=True) pin_type: Mapped[str] = mapped_column(String(64), nullable=False) source_type: Mapped[str | None] = mapped_column(String(64), nullable=True) source_id: Mapped[int | None] = mapped_column(Integer, nullable=True) size_bytes: Mapped[int | None] = mapped_column(BigInteger, nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) __table_args__ = ( Index("ix_ipfs_pin_source", "source_type", "source_id"), Index("ix_ipfs_pin_cid", "ipfs_cid", unique=True), ) def __repr__(self) -> str: return f"" class RemoteActor(Base): """Cached profile of a remote actor we interact with.""" __tablename__ = "ap_remote_actors" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) actor_url: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) inbox_url: Mapped[str] = mapped_column(String(512), nullable=False) shared_inbox_url: Mapped[str | None] = mapped_column(String(512), nullable=True) preferred_username: Mapped[str] = mapped_column(String(255), nullable=False) display_name: Mapped[str | None] = mapped_column(String(255), nullable=True) summary: Mapped[str | None] = mapped_column(Text, nullable=True) icon_url: Mapped[str | None] = mapped_column(String(512), nullable=True) public_key_pem: Mapped[str | None] = mapped_column(Text, nullable=True) domain: Mapped[str] = mapped_column(String(255), nullable=False) fetched_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) __table_args__ = ( Index("ix_ap_remote_actor_url", "actor_url", unique=True), Index("ix_ap_remote_actor_domain", "domain"), ) def __repr__(self) -> str: return f"" class APFollowing(Base): """Outbound follow: local actor → remote actor.""" __tablename__ = "ap_following" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) actor_profile_id: Mapped[int] = mapped_column( Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, ) remote_actor_id: Mapped[int] = mapped_column( Integer, ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False, ) state: Mapped[str] = mapped_column( String(20), nullable=False, default="pending", server_default="pending", ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) accepted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) # Relationships actor_profile = relationship("ActorProfile") remote_actor = relationship("RemoteActor") __table_args__ = ( UniqueConstraint("actor_profile_id", "remote_actor_id", name="uq_following"), Index("ix_ap_following_actor", "actor_profile_id"), Index("ix_ap_following_remote", "remote_actor_id"), ) def __repr__(self) -> str: return f"" class APRemotePost(Base): """A federated post ingested from a remote actor.""" __tablename__ = "ap_remote_posts" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) remote_actor_id: Mapped[int] = mapped_column( Integer, ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False, ) activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) object_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) object_type: Mapped[str] = mapped_column(String(64), nullable=False, default="Note") content: Mapped[str | None] = mapped_column(Text, nullable=True) summary: Mapped[str | None] = mapped_column(Text, nullable=True) url: Mapped[str | None] = mapped_column(String(512), nullable=True) attachment_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) tag_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) in_reply_to: Mapped[str | None] = mapped_column(String(512), nullable=True) conversation: Mapped[str | None] = mapped_column(String(512), nullable=True) published: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) fetched_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) # Relationships remote_actor = relationship("RemoteActor") __table_args__ = ( Index("ix_ap_remote_post_actor", "remote_actor_id"), Index("ix_ap_remote_post_published", "published"), Index("ix_ap_remote_post_object", "object_id", unique=True), ) def __repr__(self) -> str: return f"" class APLocalPost(Base): """A native post composed in the federation UI.""" __tablename__ = "ap_local_posts" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) actor_profile_id: Mapped[int] = mapped_column( Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, ) content: Mapped[str] = mapped_column(Text, nullable=False) visibility: Mapped[str] = mapped_column( String(20), nullable=False, default="public", server_default="public", ) in_reply_to: Mapped[str | None] = mapped_column(String(512), nullable=True) published: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) updated_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now(), ) # Relationships actor_profile = relationship("ActorProfile") __table_args__ = ( Index("ix_ap_local_post_actor", "actor_profile_id"), Index("ix_ap_local_post_published", "published"), ) def __repr__(self) -> str: return f"" class APInteraction(Base): """Like or boost (local or remote).""" __tablename__ = "ap_interactions" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) actor_profile_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True, ) remote_actor_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=True, ) post_type: Mapped[str] = mapped_column(String(20), nullable=False) # local/remote post_id: Mapped[int] = mapped_column(Integer, nullable=False) interaction_type: Mapped[str] = mapped_column(String(20), nullable=False) # like/boost activity_id: Mapped[str | None] = mapped_column(String(512), nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) __table_args__ = ( Index("ix_ap_interaction_post", "post_type", "post_id"), Index("ix_ap_interaction_actor", "actor_profile_id"), Index("ix_ap_interaction_remote", "remote_actor_id"), ) def __repr__(self) -> str: return f"" class APNotification(Base): """Notification for a local actor.""" __tablename__ = "ap_notifications" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) actor_profile_id: Mapped[int] = mapped_column( Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, ) notification_type: Mapped[str] = mapped_column(String(20), nullable=False) from_remote_actor_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("ap_remote_actors.id", ondelete="SET NULL"), nullable=True, ) from_actor_profile_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("ap_actor_profiles.id", ondelete="SET NULL"), nullable=True, ) target_activity_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("ap_activities.id", ondelete="SET NULL"), nullable=True, ) target_remote_post_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("ap_remote_posts.id", ondelete="SET NULL"), nullable=True, ) app_domain: Mapped[str | None] = mapped_column(String(30), nullable=True) read: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default="false") created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) # Relationships actor_profile = relationship("ActorProfile", foreign_keys=[actor_profile_id]) from_remote_actor = relationship("RemoteActor") from_actor_profile = relationship("ActorProfile", foreign_keys=[from_actor_profile_id]) __table_args__ = ( Index("ix_ap_notification_actor", "actor_profile_id"), Index("ix_ap_notification_read", "actor_profile_id", "read"), Index("ix_ap_notification_created", "created_at"), ) class APDeliveryLog(Base): """Tracks successful deliveries of activities to remote inboxes. Used for idempotency: the delivery handler skips inboxes that already have a success row, so retries after a crash never send duplicates. """ __tablename__ = "ap_delivery_log" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) activity_id: Mapped[int] = mapped_column( Integer, ForeignKey("ap_activities.id", ondelete="CASCADE"), nullable=False, ) inbox_url: Mapped[str] = mapped_column(String(512), nullable=False) app_domain: Mapped[str] = mapped_column(String(128), nullable=False, server_default="federation") status_code: Mapped[int | None] = mapped_column(Integer, nullable=True) delivered_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), ) __table_args__ = ( UniqueConstraint("activity_id", "inbox_url", "app_domain", name="uq_delivery_activity_inbox_domain"), Index("ix_ap_delivery_activity", "activity_id"), )