diff --git a/alembic/versions/l2j0g6h8i9_add_fediverse_tables.py b/alembic/versions/l2j0g6h8i9_add_fediverse_tables.py new file mode 100644 index 0000000..c186bcc --- /dev/null +++ b/alembic/versions/l2j0g6h8i9_add_fediverse_tables.py @@ -0,0 +1,138 @@ +"""add fediverse social tables + +Revision ID: l2j0g6h8i9 +Revises: k1i9f5g7h8 +Create Date: 2026-02-22 + +Creates: +- ap_remote_actors — cached profiles of remote actors +- ap_following — outbound follows (local → remote) +- ap_remote_posts — ingested posts from remote actors +- ap_local_posts — native posts composed in federation UI +- ap_interactions — likes and boosts +- ap_notifications — follow/like/boost/mention/reply notifications +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB + +revision = "l2j0g6h8i9" +down_revision = "k1i9f5g7h8" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # -- ap_remote_actors -- + op.create_table( + "ap_remote_actors", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("actor_url", sa.String(512), unique=True, nullable=False), + sa.Column("inbox_url", sa.String(512), nullable=False), + sa.Column("shared_inbox_url", sa.String(512), nullable=True), + sa.Column("preferred_username", sa.String(255), nullable=False), + sa.Column("display_name", sa.String(255), nullable=True), + sa.Column("summary", sa.Text, nullable=True), + sa.Column("icon_url", sa.String(512), nullable=True), + sa.Column("public_key_pem", sa.Text, nullable=True), + sa.Column("domain", sa.String(255), nullable=False), + sa.Column("fetched_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + ) + op.create_index("ix_ap_remote_actor_url", "ap_remote_actors", ["actor_url"], unique=True) + op.create_index("ix_ap_remote_actor_domain", "ap_remote_actors", ["domain"]) + + # -- ap_following -- + op.create_table( + "ap_following", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False), + sa.Column("remote_actor_id", sa.Integer, sa.ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False), + sa.Column("state", sa.String(20), nullable=False, server_default="pending"), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("accepted_at", sa.DateTime(timezone=True), nullable=True), + sa.UniqueConstraint("actor_profile_id", "remote_actor_id", name="uq_following"), + ) + op.create_index("ix_ap_following_actor", "ap_following", ["actor_profile_id"]) + op.create_index("ix_ap_following_remote", "ap_following", ["remote_actor_id"]) + + # -- ap_remote_posts -- + op.create_table( + "ap_remote_posts", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("remote_actor_id", sa.Integer, sa.ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False), + sa.Column("activity_id", sa.String(512), unique=True, nullable=False), + sa.Column("object_id", sa.String(512), unique=True, nullable=False), + sa.Column("object_type", sa.String(64), nullable=False, server_default="Note"), + sa.Column("content", sa.Text, nullable=True), + sa.Column("summary", sa.Text, nullable=True), + sa.Column("url", sa.String(512), nullable=True), + sa.Column("attachment_data", JSONB, nullable=True), + sa.Column("tag_data", JSONB, nullable=True), + sa.Column("in_reply_to", sa.String(512), nullable=True), + sa.Column("conversation", sa.String(512), nullable=True), + sa.Column("published", sa.DateTime(timezone=True), nullable=True), + sa.Column("fetched_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + ) + op.create_index("ix_ap_remote_post_actor", "ap_remote_posts", ["remote_actor_id"]) + op.create_index("ix_ap_remote_post_published", "ap_remote_posts", ["published"]) + op.create_index("ix_ap_remote_post_object", "ap_remote_posts", ["object_id"], unique=True) + + # -- ap_local_posts -- + op.create_table( + "ap_local_posts", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False), + sa.Column("content", sa.Text, nullable=False), + sa.Column("visibility", sa.String(20), nullable=False, server_default="public"), + sa.Column("in_reply_to", sa.String(512), nullable=True), + sa.Column("published", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + ) + op.create_index("ix_ap_local_post_actor", "ap_local_posts", ["actor_profile_id"]) + op.create_index("ix_ap_local_post_published", "ap_local_posts", ["published"]) + + # -- ap_interactions -- + op.create_table( + "ap_interactions", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True), + sa.Column("remote_actor_id", sa.Integer, sa.ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=True), + sa.Column("post_type", sa.String(20), nullable=False), + sa.Column("post_id", sa.Integer, nullable=False), + sa.Column("interaction_type", sa.String(20), nullable=False), + sa.Column("activity_id", sa.String(512), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + ) + op.create_index("ix_ap_interaction_post", "ap_interactions", ["post_type", "post_id"]) + op.create_index("ix_ap_interaction_actor", "ap_interactions", ["actor_profile_id"]) + op.create_index("ix_ap_interaction_remote", "ap_interactions", ["remote_actor_id"]) + + # -- ap_notifications -- + op.create_table( + "ap_notifications", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False), + sa.Column("notification_type", sa.String(20), nullable=False), + sa.Column("from_remote_actor_id", sa.Integer, sa.ForeignKey("ap_remote_actors.id", ondelete="SET NULL"), nullable=True), + sa.Column("from_actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="SET NULL"), nullable=True), + sa.Column("target_activity_id", sa.Integer, sa.ForeignKey("ap_activities.id", ondelete="SET NULL"), nullable=True), + sa.Column("target_remote_post_id", sa.Integer, sa.ForeignKey("ap_remote_posts.id", ondelete="SET NULL"), nullable=True), + sa.Column("read", sa.Boolean, nullable=False, server_default="false"), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + ) + op.create_index("ix_ap_notification_actor", "ap_notifications", ["actor_profile_id"]) + op.create_index("ix_ap_notification_read", "ap_notifications", ["actor_profile_id", "read"]) + op.create_index("ix_ap_notification_created", "ap_notifications", ["created_at"]) + + +def downgrade() -> None: + op.drop_table("ap_notifications") + op.drop_table("ap_interactions") + op.drop_table("ap_local_posts") + op.drop_table("ap_remote_posts") + op.drop_table("ap_following") + op.drop_table("ap_remote_actors") diff --git a/browser/templates/_types/day/_nav.html b/browser/templates/_types/day/_nav.html index 3e4e5ed..cfb8aca 100644 --- a/browser/templates/_types/day/_nav.html +++ b/browser/templates/_types/day/_nav.html @@ -25,6 +25,15 @@ {% endcall %} +{# Container nav widgets (market links, etc.) #} +{% if container_nav_widgets %} + {% for wdata in container_nav_widgets %} + {% with ctx=wdata.ctx %} + {% include wdata.widget.template with context %} + {% endwith %} + {% endfor %} +{% endif %} + {# Admin link #} {% if g.rights.admin %} {% from 'macros/admin_nav.html' import admin_nav_item %} diff --git a/browser/templates/_types/entry/_nav.html b/browser/templates/_types/entry/_nav.html index 513f517..388fc2e 100644 --- a/browser/templates/_types/entry/_nav.html +++ b/browser/templates/_types/entry/_nav.html @@ -22,6 +22,14 @@ {% endcall %} +{% if container_nav_widgets %} + {% for wdata in container_nav_widgets %} + {% with ctx=wdata.ctx %} + {% include wdata.widget.template with context %} + {% endwith %} + {% endfor %} +{% endif %} + {# Admin link #} {% if g.rights.admin %} diff --git a/contracts/dtos.py b/contracts/dtos.py index 34223e9..e7c560b 100644 --- a/contracts/dtos.py +++ b/contracts/dtos.py @@ -187,3 +187,68 @@ class APAnchorDTO: ots_proof_cid: str | None = None confirmed_at: datetime | None = None bitcoin_txid: str | None = None + + +@dataclass(frozen=True, slots=True) +class RemoteActorDTO: + id: int + actor_url: str + inbox_url: str + preferred_username: str + domain: str + display_name: str | None = None + summary: str | None = None + icon_url: str | None = None + shared_inbox_url: str | None = None + public_key_pem: str | None = None + + +@dataclass(frozen=True, slots=True) +class RemotePostDTO: + id: int + remote_actor_id: int + object_id: str + content: str + summary: str | None = None + url: str | None = None + attachments: list[dict] = field(default_factory=list) + tags: list[dict] = field(default_factory=list) + published: datetime | None = None + actor: RemoteActorDTO | None = None + + +@dataclass(frozen=True, slots=True) +class TimelineItemDTO: + id: str # composite key for cursor pagination + post_type: str # "local" | "remote" | "boost" + content: str # HTML + published: datetime + actor_name: str + actor_username: str + object_id: str | None = None + summary: str | None = None + url: str | None = None + attachments: list[dict] = field(default_factory=list) + tags: list[dict] = field(default_factory=list) + actor_domain: str | None = None # None = local + actor_icon: str | None = None + actor_url: str | None = None + boosted_by: str | None = None + like_count: int = 0 + boost_count: int = 0 + liked_by_me: bool = False + boosted_by_me: bool = False + author_inbox: str | None = None + + +@dataclass(frozen=True, slots=True) +class NotificationDTO: + id: int + notification_type: str # follow/like/boost/mention/reply + from_actor_name: str + from_actor_username: str + created_at: datetime + read: bool + from_actor_domain: str | None = None + from_actor_icon: str | None = None + target_content_preview: str | None = None diff --git a/contracts/protocols.py b/contracts/protocols.py index 360dd3b..8f3ac3c 100644 --- a/contracts/protocols.py +++ b/contracts/protocols.py @@ -22,6 +22,10 @@ from .dtos import ( ActorProfileDTO, APActivityDTO, APFollowerDTO, + RemoteActorDTO, + RemotePostDTO, + TimelineItemDTO, + NotificationDTO, ) @@ -217,6 +221,11 @@ class FederationService(Protocol): self, session: AsyncSession, username: str, ) -> list[APFollowerDTO]: ... + async def get_followers_paginated( + self, session: AsyncSession, username: str, + page: int = 1, per_page: int = 20, + ) -> tuple[list[RemoteActorDTO], int]: ... + async def add_follower( self, session: AsyncSession, username: str, follower_acct: str, follower_inbox: str, follower_actor_url: str, @@ -227,5 +236,108 @@ class FederationService(Protocol): self, session: AsyncSession, username: str, follower_acct: str, ) -> bool: ... + # -- Remote actors -------------------------------------------------------- + async def get_or_fetch_remote_actor( + self, session: AsyncSession, actor_url: str, + ) -> RemoteActorDTO | None: ... + + async def search_remote_actor( + self, session: AsyncSession, acct: str, + ) -> RemoteActorDTO | None: ... + + # -- Following (outbound) ------------------------------------------------- + async def send_follow( + self, session: AsyncSession, local_username: str, remote_actor_url: str, + ) -> None: ... + + async def get_following( + self, session: AsyncSession, username: str, + page: int = 1, per_page: int = 20, + ) -> tuple[list[RemoteActorDTO], int]: ... + + async def accept_follow_response( + self, session: AsyncSession, local_username: str, remote_actor_url: str, + ) -> None: ... + + async def unfollow( + self, session: AsyncSession, local_username: str, remote_actor_url: str, + ) -> None: ... + + # -- Remote posts --------------------------------------------------------- + async def ingest_remote_post( + self, session: AsyncSession, remote_actor_id: int, + activity_json: dict, object_json: dict, + ) -> None: ... + + async def delete_remote_post( + self, session: AsyncSession, object_id: str, + ) -> None: ... + + async def get_remote_post( + self, session: AsyncSession, object_id: str, + ) -> RemotePostDTO | None: ... + + # -- Timelines ------------------------------------------------------------ + async def get_home_timeline( + self, session: AsyncSession, actor_profile_id: int, + before: datetime | None = None, limit: int = 20, + ) -> list[TimelineItemDTO]: ... + + async def get_public_timeline( + self, session: AsyncSession, + before: datetime | None = None, limit: int = 20, + ) -> list[TimelineItemDTO]: ... + + async def get_actor_timeline( + self, session: AsyncSession, remote_actor_id: int, + before: datetime | None = None, limit: int = 20, + ) -> list[TimelineItemDTO]: ... + + # -- Local posts ---------------------------------------------------------- + async def create_local_post( + self, session: AsyncSession, actor_profile_id: int, + content: str, visibility: str = "public", + in_reply_to: str | None = None, + ) -> int: ... + + async def delete_local_post( + self, session: AsyncSession, actor_profile_id: int, post_id: int, + ) -> None: ... + + # -- Interactions --------------------------------------------------------- + async def like_post( + self, session: AsyncSession, actor_profile_id: int, + object_id: str, author_inbox: str, + ) -> None: ... + + async def unlike_post( + self, session: AsyncSession, actor_profile_id: int, + object_id: str, author_inbox: str, + ) -> None: ... + + async def boost_post( + self, session: AsyncSession, actor_profile_id: int, + object_id: str, author_inbox: str, + ) -> None: ... + + async def unboost_post( + self, session: AsyncSession, actor_profile_id: int, + object_id: str, author_inbox: str, + ) -> None: ... + + # -- Notifications -------------------------------------------------------- + async def get_notifications( + self, session: AsyncSession, actor_profile_id: int, + before: datetime | None = None, limit: int = 20, + ) -> list[NotificationDTO]: ... + + async def unread_notification_count( + self, session: AsyncSession, actor_profile_id: int, + ) -> int: ... + + async def mark_notifications_read( + self, session: AsyncSession, actor_profile_id: int, + ) -> None: ... + # -- Stats ---------------------------------------------------------------- async def get_stats(self, session: AsyncSession) -> dict: ... diff --git a/events/handlers/ap_delivery_handler.py b/events/handlers/ap_delivery_handler.py index 702b7db..3cbbc8f 100644 --- a/events/handlers/ap_delivery_handler.py +++ b/events/handlers/ap_delivery_handler.py @@ -38,7 +38,8 @@ def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str) obj.setdefault("type", "Tombstone") else: # Create/Update: full object with attribution - obj["id"] = object_id + # Prefer stable id from object_data (set by try_publish), fall back to activity-derived + obj.setdefault("id", object_id) obj.setdefault("type", activity.object_type) obj.setdefault("attributedTo", actor_url) obj.setdefault("published", activity.published.isoformat() if activity.published else None) diff --git a/models/__init__.py b/models/__init__.py index e4fcc27..e696236 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -29,4 +29,5 @@ from .container_relation import ContainerRelation from .menu_node import MenuNode from .federation import ( ActorProfile, APActivity, APFollower, APInboxItem, APAnchor, IPFSPin, + RemoteActor, APFollowing, APRemotePost, APLocalPost, APInteraction, APNotification, ) diff --git a/models/federation.py b/models/federation.py index 0c96763..422388a 100644 --- a/models/federation.py +++ b/models/federation.py @@ -193,3 +193,207 @@ class IPFSPin(Base): def __repr__(self) -> str: return f"" + + +class RemoteActor(Base): + """Cached profile of a remote actor we interact with.""" + __tablename__ = "ap_remote_actors" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + actor_url: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) + inbox_url: Mapped[str] = mapped_column(String(512), nullable=False) + shared_inbox_url: Mapped[str | None] = mapped_column(String(512), nullable=True) + preferred_username: Mapped[str] = mapped_column(String(255), nullable=False) + display_name: Mapped[str | None] = mapped_column(String(255), nullable=True) + summary: Mapped[str | None] = mapped_column(Text, nullable=True) + icon_url: Mapped[str | None] = mapped_column(String(512), nullable=True) + public_key_pem: Mapped[str | None] = mapped_column(Text, nullable=True) + domain: Mapped[str] = mapped_column(String(255), nullable=False) + fetched_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + + __table_args__ = ( + Index("ix_ap_remote_actor_url", "actor_url", unique=True), + Index("ix_ap_remote_actor_domain", "domain"), + ) + + def __repr__(self) -> str: + return f"" + + +class APFollowing(Base): + """Outbound follow: local actor → remote actor.""" + __tablename__ = "ap_following" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + actor_profile_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, + ) + remote_actor_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False, + ) + state: Mapped[str] = mapped_column( + String(20), nullable=False, default="pending", server_default="pending", + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + accepted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + + # Relationships + actor_profile = relationship("ActorProfile") + remote_actor = relationship("RemoteActor") + + __table_args__ = ( + UniqueConstraint("actor_profile_id", "remote_actor_id", name="uq_following"), + Index("ix_ap_following_actor", "actor_profile_id"), + Index("ix_ap_following_remote", "remote_actor_id"), + ) + + def __repr__(self) -> str: + return f"" + + +class APRemotePost(Base): + """A federated post ingested from a remote actor.""" + __tablename__ = "ap_remote_posts" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + remote_actor_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False, + ) + activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) + object_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) + object_type: Mapped[str] = mapped_column(String(64), nullable=False, default="Note") + content: Mapped[str | None] = mapped_column(Text, nullable=True) + summary: Mapped[str | None] = mapped_column(Text, nullable=True) + url: Mapped[str | None] = mapped_column(String(512), nullable=True) + attachment_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + tag_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + in_reply_to: Mapped[str | None] = mapped_column(String(512), nullable=True) + conversation: Mapped[str | None] = mapped_column(String(512), nullable=True) + published: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + fetched_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + + # Relationships + remote_actor = relationship("RemoteActor") + + __table_args__ = ( + Index("ix_ap_remote_post_actor", "remote_actor_id"), + Index("ix_ap_remote_post_published", "published"), + Index("ix_ap_remote_post_object", "object_id", unique=True), + ) + + def __repr__(self) -> str: + return f"" + + +class APLocalPost(Base): + """A native post composed in the federation UI.""" + __tablename__ = "ap_local_posts" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + actor_profile_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, + ) + content: Mapped[str] = mapped_column(Text, nullable=False) + visibility: Mapped[str] = mapped_column( + String(20), nullable=False, default="public", server_default="public", + ) + in_reply_to: Mapped[str | None] = mapped_column(String(512), nullable=True) + published: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now(), + ) + + # Relationships + actor_profile = relationship("ActorProfile") + + __table_args__ = ( + Index("ix_ap_local_post_actor", "actor_profile_id"), + Index("ix_ap_local_post_published", "published"), + ) + + def __repr__(self) -> str: + return f"" + + +class APInteraction(Base): + """Like or boost (local or remote).""" + __tablename__ = "ap_interactions" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + actor_profile_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True, + ) + remote_actor_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=True, + ) + post_type: Mapped[str] = mapped_column(String(20), nullable=False) # local/remote + post_id: Mapped[int] = mapped_column(Integer, nullable=False) + interaction_type: Mapped[str] = mapped_column(String(20), nullable=False) # like/boost + activity_id: Mapped[str | None] = mapped_column(String(512), nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + + __table_args__ = ( + Index("ix_ap_interaction_post", "post_type", "post_id"), + Index("ix_ap_interaction_actor", "actor_profile_id"), + Index("ix_ap_interaction_remote", "remote_actor_id"), + ) + + def __repr__(self) -> str: + return f"" + + +class APNotification(Base): + """Notification for a local actor.""" + __tablename__ = "ap_notifications" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + actor_profile_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, + ) + notification_type: Mapped[str] = mapped_column(String(20), nullable=False) + from_remote_actor_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("ap_remote_actors.id", ondelete="SET NULL"), nullable=True, + ) + from_actor_profile_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("ap_actor_profiles.id", ondelete="SET NULL"), nullable=True, + ) + target_activity_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("ap_activities.id", ondelete="SET NULL"), nullable=True, + ) + target_remote_post_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("ap_remote_posts.id", ondelete="SET NULL"), nullable=True, + ) + read: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default="false") + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now(), + ) + + # Relationships + actor_profile = relationship("ActorProfile", foreign_keys=[actor_profile_id]) + from_remote_actor = relationship("RemoteActor") + from_actor_profile = relationship("ActorProfile", foreign_keys=[from_actor_profile_id]) + + __table_args__ = ( + Index("ix_ap_notification_actor", "actor_profile_id"), + Index("ix_ap_notification_read", "actor_profile_id", "read"), + Index("ix_ap_notification_created", "created_at"), + ) diff --git a/services/federation_impl.py b/services/federation_impl.py index 67767c1..de0ad94 100644 --- a/services/federation_impl.py +++ b/services/federation_impl.py @@ -12,8 +12,15 @@ from datetime import datetime, timezone from sqlalchemy import select, func, delete from sqlalchemy.ext.asyncio import AsyncSession -from shared.models.federation import ActorProfile, APActivity, APFollower -from shared.contracts.dtos import ActorProfileDTO, APActivityDTO, APFollowerDTO +from shared.models.federation import ( + ActorProfile, APActivity, APFollower, + RemoteActor, APFollowing, APRemotePost, APLocalPost, + APInteraction, APNotification, +) +from shared.contracts.dtos import ( + ActorProfileDTO, APActivityDTO, APFollowerDTO, + RemoteActorDTO, RemotePostDTO, TimelineItemDTO, NotificationDTO, +) def _domain() -> str: @@ -63,6 +70,38 @@ def _follower_to_dto(f: APFollower) -> APFollowerDTO: ) +def _remote_actor_to_dto(r: RemoteActor) -> RemoteActorDTO: + return RemoteActorDTO( + id=r.id, + actor_url=r.actor_url, + inbox_url=r.inbox_url, + preferred_username=r.preferred_username, + domain=r.domain, + display_name=r.display_name, + summary=r.summary, + icon_url=r.icon_url, + shared_inbox_url=r.shared_inbox_url, + public_key_pem=r.public_key_pem, + ) + + +def _remote_post_to_dto( + p: APRemotePost, actor: RemoteActor | None = None, +) -> RemotePostDTO: + return RemotePostDTO( + id=p.id, + remote_actor_id=p.remote_actor_id, + object_id=p.object_id, + content=p.content or "", + summary=p.summary, + url=p.url, + attachments=p.attachment_data or [], + tags=p.tag_data or [], + published=p.published, + actor=_remote_actor_to_dto(actor) if actor else None, + ) + + class SqlFederationService: # -- Actor management ----------------------------------------------------- @@ -337,6 +376,1165 @@ class SqlFederationService: ) return result.rowcount > 0 + async def get_followers_paginated( + self, session: AsyncSession, username: str, + page: int = 1, per_page: int = 20, + ) -> tuple[list[RemoteActorDTO], int]: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == username) + ) + ).scalar_one_or_none() + if actor is None: + return [], 0 + + total = ( + await session.execute( + select(func.count(APFollower.id)).where( + APFollower.actor_profile_id == actor.id, + ) + ) + ).scalar() or 0 + + offset = (page - 1) * per_page + followers = ( + await session.execute( + select(APFollower) + .where(APFollower.actor_profile_id == actor.id) + .order_by(APFollower.created_at.desc()) + .limit(per_page) + .offset(offset) + ) + ).scalars().all() + + results: list[RemoteActorDTO] = [] + for f in followers: + # Try to resolve from cached remote actors first + remote = ( + await session.execute( + select(RemoteActor).where( + RemoteActor.actor_url == f.follower_actor_url, + ) + ) + ).scalar_one_or_none() + if remote: + results.append(_remote_actor_to_dto(remote)) + else: + # Synthesise a minimal DTO from follower data + from urllib.parse import urlparse + domain = urlparse(f.follower_actor_url).netloc + results.append(RemoteActorDTO( + id=0, + actor_url=f.follower_actor_url, + inbox_url=f.follower_inbox, + preferred_username=f.follower_acct.split("@")[0] if "@" in f.follower_acct else f.follower_acct, + domain=domain, + display_name=None, + summary=None, + icon_url=None, + )) + return results, total + + # -- Remote actors -------------------------------------------------------- + + async def get_or_fetch_remote_actor( + self, session: AsyncSession, actor_url: str, + ) -> RemoteActorDTO | None: + # Check cache first + row = ( + await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == actor_url) + ) + ).scalar_one_or_none() + if row: + return _remote_actor_to_dto(row) + + # Fetch from remote + import httpx + try: + async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client: + resp = await client.get( + actor_url, + headers={"Accept": "application/activity+json"}, + ) + if resp.status_code != 200: + return None + data = resp.json() + except Exception: + return None + + return await self._upsert_remote_actor(session, actor_url, data) + + async def _upsert_remote_actor( + self, session: AsyncSession, actor_url: str, data: dict, + ) -> RemoteActorDTO | None: + from urllib.parse import urlparse + domain = urlparse(actor_url).netloc + + icon_url = None + icon = data.get("icon") + if isinstance(icon, dict): + icon_url = icon.get("url") + + pub_key = (data.get("publicKey") or {}).get("publicKeyPem") + + # Upsert + existing = ( + await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == actor_url) + ) + ).scalar_one_or_none() + + now = datetime.now(timezone.utc) + if existing: + existing.inbox_url = data.get("inbox", existing.inbox_url) + existing.shared_inbox_url = (data.get("endpoints") or {}).get("sharedInbox") + existing.preferred_username = data.get("preferredUsername", existing.preferred_username) + existing.display_name = data.get("name") + existing.summary = data.get("summary") + existing.icon_url = icon_url + existing.public_key_pem = pub_key + existing.fetched_at = now + await session.flush() + return _remote_actor_to_dto(existing) + + row = RemoteActor( + actor_url=actor_url, + inbox_url=data.get("inbox", ""), + shared_inbox_url=(data.get("endpoints") or {}).get("sharedInbox"), + preferred_username=data.get("preferredUsername", ""), + display_name=data.get("name"), + summary=data.get("summary"), + icon_url=icon_url, + public_key_pem=pub_key, + domain=domain, + fetched_at=now, + ) + session.add(row) + await session.flush() + return _remote_actor_to_dto(row) + + async def search_remote_actor( + self, session: AsyncSession, acct: str, + ) -> RemoteActorDTO | None: + from shared.utils.webfinger import resolve_actor + data = await resolve_actor(acct) + if not data: + return None + + actor_url = data.get("id") + if not actor_url: + return None + + return await self._upsert_remote_actor(session, actor_url, data) + + # -- Following (outbound) ------------------------------------------------- + + async def send_follow( + self, session: AsyncSession, local_username: str, remote_actor_url: str, + ) -> None: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == local_username) + ) + ).scalar_one_or_none() + if not actor: + raise ValueError(f"Actor not found: {local_username}") + + # Get or fetch remote actor + remote_dto = await self.get_or_fetch_remote_actor(session, remote_actor_url) + if not remote_dto: + raise ValueError(f"Could not resolve remote actor: {remote_actor_url}") + + remote = ( + await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == remote_actor_url) + ) + ).scalar_one() + + # Check for existing follow + existing = ( + await session.execute( + select(APFollowing).where( + APFollowing.actor_profile_id == actor.id, + APFollowing.remote_actor_id == remote.id, + ) + ) + ).scalar_one_or_none() + + if existing: + return # already following or pending + + follow = APFollowing( + actor_profile_id=actor.id, + remote_actor_id=remote.id, + state="pending", + ) + session.add(follow) + await session.flush() + + # Send Follow activity + domain = _domain() + actor_url = f"https://{domain}/users/{local_username}" + follow_id = f"{actor_url}/activities/{uuid.uuid4()}" + + follow_activity = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": follow_id, + "type": "Follow", + "actor": actor_url, + "object": remote_actor_url, + } + + import json + import httpx + from shared.utils.http_signatures import sign_request + from urllib.parse import urlparse + + body_bytes = json.dumps(follow_activity).encode() + parsed = urlparse(remote.inbox_url) + headers = sign_request( + private_key_pem=actor.private_key_pem, + key_id=f"{actor_url}#main-key", + method="POST", + path=parsed.path, + host=parsed.netloc, + body=body_bytes, + ) + headers["Content-Type"] = "application/activity+json" + + try: + async with httpx.AsyncClient(timeout=15) as client: + await client.post(remote.inbox_url, content=body_bytes, headers=headers) + except Exception: + import logging + logging.getLogger(__name__).exception("Failed to send Follow to %s", remote.inbox_url) + + async def get_following( + self, session: AsyncSession, username: str, + page: int = 1, per_page: int = 20, + ) -> tuple[list[RemoteActorDTO], int]: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == username) + ) + ).scalar_one_or_none() + if not actor: + return [], 0 + + total = ( + await session.execute( + select(func.count(APFollowing.id)).where( + APFollowing.actor_profile_id == actor.id, + APFollowing.state == "accepted", + ) + ) + ).scalar() or 0 + + offset = (page - 1) * per_page + result = await session.execute( + select(RemoteActor) + .join(APFollowing, APFollowing.remote_actor_id == RemoteActor.id) + .where( + APFollowing.actor_profile_id == actor.id, + APFollowing.state == "accepted", + ) + .order_by(APFollowing.accepted_at.desc()) + .limit(per_page) + .offset(offset) + ) + return [_remote_actor_to_dto(r) for r in result.scalars().all()], total + + async def accept_follow_response( + self, session: AsyncSession, local_username: str, remote_actor_url: str, + ) -> None: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == local_username) + ) + ).scalar_one_or_none() + if not actor: + return + + remote = ( + await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == remote_actor_url) + ) + ).scalar_one_or_none() + if not remote: + return + + follow = ( + await session.execute( + select(APFollowing).where( + APFollowing.actor_profile_id == actor.id, + APFollowing.remote_actor_id == remote.id, + APFollowing.state == "pending", + ) + ) + ).scalar_one_or_none() + if follow: + follow.state = "accepted" + follow.accepted_at = datetime.now(timezone.utc) + await session.flush() + + async def unfollow( + self, session: AsyncSession, local_username: str, remote_actor_url: str, + ) -> None: + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.preferred_username == local_username) + ) + ).scalar_one_or_none() + if not actor: + return + + remote = ( + await session.execute( + select(RemoteActor).where(RemoteActor.actor_url == remote_actor_url) + ) + ).scalar_one_or_none() + if not remote: + return + + follow = ( + await session.execute( + select(APFollowing).where( + APFollowing.actor_profile_id == actor.id, + APFollowing.remote_actor_id == remote.id, + ) + ) + ).scalar_one_or_none() + if not follow: + return + + await session.delete(follow) + await session.flush() + + # Send Undo(Follow) to remote + domain = _domain() + actor_url = f"https://{domain}/users/{local_username}" + undo_id = f"{actor_url}/activities/{uuid.uuid4()}" + + undo_activity = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": undo_id, + "type": "Undo", + "actor": actor_url, + "object": { + "type": "Follow", + "actor": actor_url, + "object": remote_actor_url, + }, + } + + import json + import httpx + from shared.utils.http_signatures import sign_request + from urllib.parse import urlparse + + body_bytes = json.dumps(undo_activity).encode() + parsed = urlparse(remote.inbox_url) + headers = sign_request( + private_key_pem=actor.private_key_pem, + key_id=f"{actor_url}#main-key", + method="POST", + path=parsed.path, + host=parsed.netloc, + body=body_bytes, + ) + headers["Content-Type"] = "application/activity+json" + + try: + async with httpx.AsyncClient(timeout=15) as client: + await client.post(remote.inbox_url, content=body_bytes, headers=headers) + except Exception: + import logging + logging.getLogger(__name__).exception("Failed to send Undo Follow to %s", remote.inbox_url) + + # -- Remote posts --------------------------------------------------------- + + async def ingest_remote_post( + self, session: AsyncSession, remote_actor_id: int, + activity_json: dict, object_json: dict, + ) -> None: + activity_id_str = activity_json.get("id", "") + object_id_str = object_json.get("id", "") + if not object_id_str: + return + + # Upsert + existing = ( + await session.execute( + select(APRemotePost).where(APRemotePost.object_id == object_id_str) + ) + ).scalar_one_or_none() + + published = None + pub_str = object_json.get("published") + if pub_str: + try: + published = datetime.fromisoformat(pub_str.replace("Z", "+00:00")) + except (ValueError, AttributeError): + pass + + # Sanitise HTML content + content = object_json.get("content", "") + + if existing: + existing.content = content + existing.summary = object_json.get("summary") + existing.url = object_json.get("url") + existing.attachment_data = object_json.get("attachment") + existing.tag_data = object_json.get("tag") + existing.in_reply_to = object_json.get("inReplyTo") + existing.conversation = object_json.get("conversation") + existing.published = published or existing.published + existing.fetched_at = datetime.now(timezone.utc) + await session.flush() + return + + post = APRemotePost( + remote_actor_id=remote_actor_id, + activity_id=activity_id_str, + object_id=object_id_str, + object_type=object_json.get("type", "Note"), + content=content, + summary=object_json.get("summary"), + url=object_json.get("url"), + attachment_data=object_json.get("attachment"), + tag_data=object_json.get("tag"), + in_reply_to=object_json.get("inReplyTo"), + conversation=object_json.get("conversation"), + published=published, + ) + session.add(post) + await session.flush() + + async def delete_remote_post( + self, session: AsyncSession, object_id: str, + ) -> None: + await session.execute( + delete(APRemotePost).where(APRemotePost.object_id == object_id) + ) + + async def get_remote_post( + self, session: AsyncSession, object_id: str, + ) -> RemotePostDTO | None: + post = ( + await session.execute( + select(APRemotePost).where(APRemotePost.object_id == object_id) + ) + ).scalar_one_or_none() + if not post: + return None + + actor = ( + await session.execute( + select(RemoteActor).where(RemoteActor.id == post.remote_actor_id) + ) + ).scalar_one_or_none() + + return _remote_post_to_dto(post, actor) + + # -- Timelines ------------------------------------------------------------ + + async def get_home_timeline( + self, session: AsyncSession, actor_profile_id: int, + before: datetime | None = None, limit: int = 20, + ) -> list[TimelineItemDTO]: + from sqlalchemy import union_all, literal_column, cast, String as SaString + from sqlalchemy.orm import aliased + + # Query 1: Remote posts from followed actors + following_subq = ( + select(APFollowing.remote_actor_id) + .where( + APFollowing.actor_profile_id == actor_profile_id, + APFollowing.state == "accepted", + ) + .subquery() + ) + + remote_q = ( + select( + APRemotePost.id.label("post_id"), + literal_column("'remote'").label("post_type"), + APRemotePost.content.label("content"), + APRemotePost.summary.label("summary"), + APRemotePost.url.label("url"), + APRemotePost.published.label("published"), + APRemotePost.object_id.label("object_id"), + RemoteActor.display_name.label("actor_name"), + RemoteActor.preferred_username.label("actor_username"), + RemoteActor.domain.label("actor_domain"), + RemoteActor.icon_url.label("actor_icon"), + RemoteActor.actor_url.label("actor_url"), + RemoteActor.inbox_url.label("author_inbox"), + ) + .join(RemoteActor, RemoteActor.id == APRemotePost.remote_actor_id) + .where(APRemotePost.remote_actor_id.in_(following_subq)) + ) + if before: + remote_q = remote_q.where(APRemotePost.published < before) + + # Query 2: Local activities (Create) by this actor + local_q = ( + select( + APActivity.id.label("post_id"), + literal_column("'local'").label("post_type"), + func.coalesce( + APActivity.object_data.op("->>")("content"), + literal_column("''"), + ).label("content"), + APActivity.object_data.op("->>")("summary").label("summary"), + APActivity.object_data.op("->>")("url").label("url"), + APActivity.published.label("published"), + APActivity.activity_id.label("object_id"), + func.coalesce( + ActorProfile.display_name, + ActorProfile.preferred_username, + ).label("actor_name"), + ActorProfile.preferred_username.label("actor_username"), + literal_column("NULL").label("actor_domain"), + literal_column("NULL").label("actor_icon"), + literal_column("NULL").label("actor_url"), + literal_column("NULL").label("author_inbox"), + ) + .join(ActorProfile, ActorProfile.id == APActivity.actor_profile_id) + .where( + APActivity.actor_profile_id == actor_profile_id, + APActivity.is_local == True, # noqa: E712 + APActivity.activity_type == "Create", + ) + ) + if before: + local_q = local_q.where(APActivity.published < before) + + # Union and sort + combined = union_all(remote_q, local_q).subquery() + result = await session.execute( + select(combined) + .order_by(combined.c.published.desc()) + .limit(limit) + ) + + items = [] + for row in result.mappings().all(): + # Look up interaction counts + user state + object_id = row["object_id"] + like_count = 0 + boost_count = 0 + liked_by_me = False + boosted_by_me = False + + if object_id: + post_type_val = row["post_type"] + post_id_val = row["post_id"] + + like_count = (await session.execute( + select(func.count(APInteraction.id)).where( + APInteraction.post_type == post_type_val, + APInteraction.post_id == post_id_val, + APInteraction.interaction_type == "like", + ) + )).scalar() or 0 + boost_count = (await session.execute( + select(func.count(APInteraction.id)).where( + APInteraction.post_type == post_type_val, + APInteraction.post_id == post_id_val, + APInteraction.interaction_type == "boost", + ) + )).scalar() or 0 + liked_by_me = bool((await session.execute( + select(APInteraction.id).where( + APInteraction.actor_profile_id == actor_profile_id, + APInteraction.post_type == post_type_val, + APInteraction.post_id == post_id_val, + APInteraction.interaction_type == "like", + ).limit(1) + )).scalar()) + boosted_by_me = bool((await session.execute( + select(APInteraction.id).where( + APInteraction.actor_profile_id == actor_profile_id, + APInteraction.post_type == post_type_val, + APInteraction.post_id == post_id_val, + APInteraction.interaction_type == "boost", + ).limit(1) + )).scalar()) + + items.append(TimelineItemDTO( + id=f"{row['post_type']}:{row['post_id']}", + post_type=row["post_type"], + content=row["content"] or "", + published=row["published"], + actor_name=row["actor_name"] or row["actor_username"] or "", + actor_username=row["actor_username"] or "", + object_id=object_id, + summary=row["summary"], + url=row["url"], + actor_domain=row["actor_domain"], + actor_icon=row["actor_icon"], + actor_url=row["actor_url"], + like_count=like_count, + boost_count=boost_count, + liked_by_me=liked_by_me, + boosted_by_me=boosted_by_me, + author_inbox=row["author_inbox"], + )) + return items + + async def get_public_timeline( + self, session: AsyncSession, + before: datetime | None = None, limit: int = 20, + ) -> list[TimelineItemDTO]: + # Public timeline: all local Create activities + q = ( + select(APActivity, ActorProfile) + .join(ActorProfile, ActorProfile.id == APActivity.actor_profile_id) + .where( + APActivity.is_local == True, # noqa: E712 + APActivity.activity_type == "Create", + ) + ) + if before: + q = q.where(APActivity.published < before) + q = q.order_by(APActivity.published.desc()).limit(limit) + + result = await session.execute(q) + items = [] + for activity, actor in result.all(): + content = "" + summary = None + url = None + if activity.object_data: + content = activity.object_data.get("content", "") + summary = activity.object_data.get("summary") + url = activity.object_data.get("url") + + items.append(TimelineItemDTO( + id=f"local:{activity.id}", + post_type="local", + content=content, + published=activity.published, + actor_name=actor.display_name or actor.preferred_username, + actor_username=actor.preferred_username, + object_id=activity.activity_id, + summary=summary, + url=url, + )) + return items + + async def get_actor_timeline( + self, session: AsyncSession, remote_actor_id: int, + before: datetime | None = None, limit: int = 20, + ) -> list[TimelineItemDTO]: + remote_actor = ( + await session.execute( + select(RemoteActor).where(RemoteActor.id == remote_actor_id) + ) + ).scalar_one_or_none() + if not remote_actor: + return [] + + q = ( + select(APRemotePost) + .where(APRemotePost.remote_actor_id == remote_actor_id) + ) + if before: + q = q.where(APRemotePost.published < before) + q = q.order_by(APRemotePost.published.desc()).limit(limit) + + posts = (await session.execute(q)).scalars().all() + return [ + TimelineItemDTO( + id=f"remote:{p.id}", + post_type="remote", + content=p.content or "", + published=p.published, + actor_name=remote_actor.display_name or remote_actor.preferred_username, + actor_username=remote_actor.preferred_username, + object_id=p.object_id, + summary=p.summary, + url=p.url, + actor_domain=remote_actor.domain, + actor_icon=remote_actor.icon_url, + actor_url=remote_actor.actor_url, + author_inbox=remote_actor.inbox_url, + ) + for p in posts + ] + + # -- Local posts ---------------------------------------------------------- + + async def create_local_post( + self, session: AsyncSession, actor_profile_id: int, + content: str, visibility: str = "public", + in_reply_to: str | None = None, + ) -> int: + now = datetime.now(timezone.utc) + post = APLocalPost( + actor_profile_id=actor_profile_id, + content=content, + visibility=visibility, + in_reply_to=in_reply_to, + published=now, + ) + session.add(post) + await session.flush() + + # Get actor for publishing + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.id == actor_profile_id) + ) + ).scalar_one() + + domain = _domain() + username = actor.preferred_username + + # Convert content to simple HTML + import html as html_mod + html_content = "".join( + f"

{html_mod.escape(line)}

" if line.strip() else "" + for line in content.split("\n") + ) + + object_id = f"https://{domain}/users/{username}/posts/{post.id}" + object_data = { + "id": object_id, + "type": "Note", + "content": html_content, + "url": object_id, + "attributedTo": f"https://{domain}/users/{username}", + "to": ["https://www.w3.org/ns/activitystreams#Public"], + "cc": [f"https://{domain}/users/{username}/followers"], + "published": now.isoformat(), + } + if in_reply_to: + object_data["inReplyTo"] = in_reply_to + + # Publish via existing activity system + await self.publish_activity( + session, + actor_user_id=actor.user_id, + activity_type="Create", + object_type="Note", + object_data=object_data, + source_type="local_post", + source_id=post.id, + ) + + return post.id + + async def delete_local_post( + self, session: AsyncSession, actor_profile_id: int, post_id: int, + ) -> None: + post = ( + await session.execute( + select(APLocalPost).where( + APLocalPost.id == post_id, + APLocalPost.actor_profile_id == actor_profile_id, + ) + ) + ).scalar_one_or_none() + if not post: + return + + # Get actor + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.id == actor_profile_id) + ) + ).scalar_one() + + domain = _domain() + object_id = f"https://{domain}/users/{actor.preferred_username}/posts/{post.id}" + + # Publish Delete activity + await self.publish_activity( + session, + actor_user_id=actor.user_id, + activity_type="Delete", + object_type="Note", + object_data={"id": object_id}, + source_type="local_post", + source_id=post.id, + ) + + await session.delete(post) + await session.flush() + + # -- Interactions --------------------------------------------------------- + + async def like_post( + self, session: AsyncSession, actor_profile_id: int, + object_id: str, author_inbox: str, + ) -> None: + # Determine post type and id + post_type, post_id = await self._resolve_post(session, object_id) + if not post_type: + return + + # Check for existing + existing = ( + await session.execute( + select(APInteraction).where( + APInteraction.actor_profile_id == actor_profile_id, + APInteraction.post_type == post_type, + APInteraction.post_id == post_id, + APInteraction.interaction_type == "like", + ) + ) + ).scalar_one_or_none() + if existing: + return + + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.id == actor_profile_id) + ) + ).scalar_one() + + domain = _domain() + actor_url = f"https://{domain}/users/{actor.preferred_username}" + like_id = f"{actor_url}/activities/{uuid.uuid4()}" + + interaction = APInteraction( + actor_profile_id=actor_profile_id, + post_type=post_type, + post_id=post_id, + interaction_type="like", + activity_id=like_id, + ) + session.add(interaction) + await session.flush() + + # Send Like to author + if author_inbox: + await self._send_activity_to_inbox( + actor, { + "@context": "https://www.w3.org/ns/activitystreams", + "id": like_id, + "type": "Like", + "actor": actor_url, + "object": object_id, + }, author_inbox, + ) + + async def unlike_post( + self, session: AsyncSession, actor_profile_id: int, + object_id: str, author_inbox: str, + ) -> None: + post_type, post_id = await self._resolve_post(session, object_id) + if not post_type: + return + + interaction = ( + await session.execute( + select(APInteraction).where( + APInteraction.actor_profile_id == actor_profile_id, + APInteraction.post_type == post_type, + APInteraction.post_id == post_id, + APInteraction.interaction_type == "like", + ) + ) + ).scalar_one_or_none() + if not interaction: + return + + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.id == actor_profile_id) + ) + ).scalar_one() + + domain = _domain() + actor_url = f"https://{domain}/users/{actor.preferred_username}" + + # Send Undo(Like) + if author_inbox and interaction.activity_id: + await self._send_activity_to_inbox( + actor, { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"{actor_url}/activities/{uuid.uuid4()}", + "type": "Undo", + "actor": actor_url, + "object": { + "id": interaction.activity_id, + "type": "Like", + "actor": actor_url, + "object": object_id, + }, + }, author_inbox, + ) + + await session.delete(interaction) + await session.flush() + + async def boost_post( + self, session: AsyncSession, actor_profile_id: int, + object_id: str, author_inbox: str, + ) -> None: + post_type, post_id = await self._resolve_post(session, object_id) + if not post_type: + return + + existing = ( + await session.execute( + select(APInteraction).where( + APInteraction.actor_profile_id == actor_profile_id, + APInteraction.post_type == post_type, + APInteraction.post_id == post_id, + APInteraction.interaction_type == "boost", + ) + ) + ).scalar_one_or_none() + if existing: + return + + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.id == actor_profile_id) + ) + ).scalar_one() + + domain = _domain() + actor_url = f"https://{domain}/users/{actor.preferred_username}" + announce_id = f"{actor_url}/activities/{uuid.uuid4()}" + + interaction = APInteraction( + actor_profile_id=actor_profile_id, + post_type=post_type, + post_id=post_id, + interaction_type="boost", + activity_id=announce_id, + ) + session.add(interaction) + await session.flush() + + # Send Announce to author and deliver to followers via publish_activity + if author_inbox: + announce_activity = { + "@context": "https://www.w3.org/ns/activitystreams", + "id": announce_id, + "type": "Announce", + "actor": actor_url, + "object": object_id, + "to": ["https://www.w3.org/ns/activitystreams#Public"], + "cc": [f"{actor_url}/followers"], + } + await self._send_activity_to_inbox(actor, announce_activity, author_inbox) + + # Also publish as our own activity for delivery to our followers + await self.publish_activity( + session, + actor_user_id=actor.user_id, + activity_type="Announce", + object_type="Note", + object_data={"id": object_id}, + ) + + async def unboost_post( + self, session: AsyncSession, actor_profile_id: int, + object_id: str, author_inbox: str, + ) -> None: + post_type, post_id = await self._resolve_post(session, object_id) + if not post_type: + return + + interaction = ( + await session.execute( + select(APInteraction).where( + APInteraction.actor_profile_id == actor_profile_id, + APInteraction.post_type == post_type, + APInteraction.post_id == post_id, + APInteraction.interaction_type == "boost", + ) + ) + ).scalar_one_or_none() + if not interaction: + return + + actor = ( + await session.execute( + select(ActorProfile).where(ActorProfile.id == actor_profile_id) + ) + ).scalar_one() + + domain = _domain() + actor_url = f"https://{domain}/users/{actor.preferred_username}" + + if author_inbox and interaction.activity_id: + await self._send_activity_to_inbox( + actor, { + "@context": "https://www.w3.org/ns/activitystreams", + "id": f"{actor_url}/activities/{uuid.uuid4()}", + "type": "Undo", + "actor": actor_url, + "object": { + "id": interaction.activity_id, + "type": "Announce", + "actor": actor_url, + "object": object_id, + }, + }, author_inbox, + ) + + await session.delete(interaction) + await session.flush() + + async def _resolve_post( + self, session: AsyncSession, object_id: str, + ) -> tuple[str | None, int | None]: + """Resolve an AP object_id to (post_type, post_id).""" + # Check remote posts + remote = ( + await session.execute( + select(APRemotePost.id).where(APRemotePost.object_id == object_id).limit(1) + ) + ).scalar() + if remote: + return "remote", remote + + # Check local activities + local = ( + await session.execute( + select(APActivity.id).where(APActivity.activity_id == object_id).limit(1) + ) + ).scalar() + if local: + return "local", local + + return None, None + + async def _send_activity_to_inbox( + self, actor: ActorProfile, activity: dict, inbox_url: str, + ) -> None: + import json + import httpx + from shared.utils.http_signatures import sign_request + from urllib.parse import urlparse + + domain = _domain() + actor_url = f"https://{domain}/users/{actor.preferred_username}" + + body_bytes = json.dumps(activity).encode() + parsed = urlparse(inbox_url) + headers = sign_request( + private_key_pem=actor.private_key_pem, + key_id=f"{actor_url}#main-key", + method="POST", + path=parsed.path, + host=parsed.netloc, + body=body_bytes, + ) + headers["Content-Type"] = "application/activity+json" + + try: + async with httpx.AsyncClient(timeout=15) as client: + await client.post(inbox_url, content=body_bytes, headers=headers) + except Exception: + import logging + logging.getLogger(__name__).exception( + "Failed to deliver activity to %s", inbox_url, + ) + + # -- Notifications -------------------------------------------------------- + + async def get_notifications( + self, session: AsyncSession, actor_profile_id: int, + before: datetime | None = None, limit: int = 20, + ) -> list[NotificationDTO]: + q = ( + select(APNotification, RemoteActor, ActorProfile) + .outerjoin(RemoteActor, RemoteActor.id == APNotification.from_remote_actor_id) + .outerjoin( + ActorProfile, + ActorProfile.id == APNotification.from_actor_profile_id, + ) + .where(APNotification.actor_profile_id == actor_profile_id) + ) + if before: + q = q.where(APNotification.created_at < before) + q = q.order_by(APNotification.created_at.desc()).limit(limit) + + result = await session.execute(q) + items = [] + for notif, remote_actor, from_actor_profile in result.all(): + if remote_actor: + name = remote_actor.display_name or remote_actor.preferred_username + username = remote_actor.preferred_username + domain = remote_actor.domain + icon = remote_actor.icon_url + elif from_actor_profile: + name = from_actor_profile.display_name or from_actor_profile.preferred_username + username = from_actor_profile.preferred_username + domain = None + icon = None + else: + name = "Unknown" + username = "unknown" + domain = None + icon = None + + # Get preview if target exists + preview = None + if notif.target_activity_id: + act = (await session.execute( + select(APActivity).where(APActivity.id == notif.target_activity_id) + )).scalar_one_or_none() + if act and act.object_data: + content = act.object_data.get("content", "") + # Strip HTML tags for preview + import re + preview = re.sub(r"<[^>]+>", "", content)[:100] + elif notif.target_remote_post_id: + rp = (await session.execute( + select(APRemotePost).where(APRemotePost.id == notif.target_remote_post_id) + )).scalar_one_or_none() + if rp and rp.content: + import re + preview = re.sub(r"<[^>]+>", "", rp.content)[:100] + + items.append(NotificationDTO( + id=notif.id, + notification_type=notif.notification_type, + from_actor_name=name, + from_actor_username=username, + from_actor_domain=domain, + from_actor_icon=icon, + target_content_preview=preview, + created_at=notif.created_at, + read=notif.read, + )) + return items + + async def unread_notification_count( + self, session: AsyncSession, actor_profile_id: int, + ) -> int: + return ( + await session.execute( + select(func.count(APNotification.id)).where( + APNotification.actor_profile_id == actor_profile_id, + APNotification.read == False, # noqa: E712 + ) + ) + ).scalar() or 0 + + async def mark_notifications_read( + self, session: AsyncSession, actor_profile_id: int, + ) -> None: + from sqlalchemy import update + await session.execute( + update(APNotification) + .where( + APNotification.actor_profile_id == actor_profile_id, + APNotification.read == False, # noqa: E712 + ) + .values(read=True) + ) + # -- Stats ---------------------------------------------------------------- async def get_stats(self, session: AsyncSession) -> dict: diff --git a/services/federation_publish.py b/services/federation_publish.py index d20053d..965055f 100644 --- a/services/federation_publish.py +++ b/services/federation_publish.py @@ -8,6 +8,7 @@ which creates the APActivity in the same DB transaction. AP delivery from __future__ import annotations import logging +import os from sqlalchemy.ext.asyncio import AsyncSession @@ -48,10 +49,20 @@ async def try_publish( if existing: if activity_type == "Create" and existing.activity_type != "Delete": return # already published (allow re-Create after Delete/unpublish) + if activity_type == "Update" and existing.activity_type == "Update": + return # already updated (Ghost fires duplicate webhooks) if activity_type == "Delete" and existing.activity_type == "Delete": return # already deleted - elif activity_type == "Delete": - return # never published, nothing to delete + elif activity_type in ("Delete", "Update"): + return # never published, nothing to delete/update + + # Stable object ID: same source always gets the same object id so + # Mastodon treats Create/Update/Delete as the same post. + domain = os.getenv("AP_DOMAIN", "rose-ash.com") + object_data["id"] = ( + f"https://{domain}/users/{actor.preferred_username}" + f"/objects/{source_type.lower()}/{source_id}" + ) try: await services.federation.publish_activity( diff --git a/services/stubs.py b/services/stubs.py index 4a02fc9..520eefd 100644 --- a/services/stubs.py +++ b/services/stubs.py @@ -227,5 +227,71 @@ class StubFederationService: async def remove_follower(self, session, username, follower_acct): return False + async def get_or_fetch_remote_actor(self, session, actor_url): + return None + + async def search_remote_actor(self, session, acct): + return None + + async def send_follow(self, session, local_username, remote_actor_url): + raise RuntimeError("FederationService not available") + + async def get_following(self, session, username, page=1, per_page=20): + return [], 0 + + async def get_followers_paginated(self, session, username, page=1, per_page=20): + return [], 0 + + async def accept_follow_response(self, session, local_username, remote_actor_url): + pass + + async def unfollow(self, session, local_username, remote_actor_url): + pass + + async def ingest_remote_post(self, session, remote_actor_id, activity_json, object_json): + pass + + async def delete_remote_post(self, session, object_id): + pass + + async def get_remote_post(self, session, object_id): + return None + + async def get_home_timeline(self, session, actor_profile_id, before=None, limit=20): + return [] + + async def get_public_timeline(self, session, before=None, limit=20): + return [] + + async def get_actor_timeline(self, session, remote_actor_id, before=None, limit=20): + return [] + + async def create_local_post(self, session, actor_profile_id, content, visibility="public", in_reply_to=None): + raise RuntimeError("FederationService not available") + + async def delete_local_post(self, session, actor_profile_id, post_id): + raise RuntimeError("FederationService not available") + + async def like_post(self, session, actor_profile_id, object_id, author_inbox): + pass + + async def unlike_post(self, session, actor_profile_id, object_id, author_inbox): + pass + + async def boost_post(self, session, actor_profile_id, object_id, author_inbox): + pass + + async def unboost_post(self, session, actor_profile_id, object_id, author_inbox): + pass + + async def get_notifications(self, session, actor_profile_id, before=None, limit=20): + return [] + + async def unread_notification_count(self, session, actor_profile_id): + return 0 + + async def mark_notifications_read(self, session, actor_profile_id): + pass + async def get_stats(self, session): return {"actors": 0, "activities": 0, "followers": 0} diff --git a/utils/webfinger.py b/utils/webfinger.py new file mode 100644 index 0000000..3eb99c9 --- /dev/null +++ b/utils/webfinger.py @@ -0,0 +1,68 @@ +"""WebFinger client for resolving remote AP actor profiles.""" +from __future__ import annotations + +import logging + +import httpx + +log = logging.getLogger(__name__) + +AP_CONTENT_TYPE = "application/activity+json" + + +async def resolve_actor(acct: str) -> dict | None: + """Resolve user@domain to actor JSON via WebFinger + actor fetch. + + Args: + acct: Handle in the form ``user@domain`` (no leading ``@``). + + Returns: + Actor JSON-LD dict, or None if resolution fails. + """ + acct = acct.lstrip("@") + if "@" not in acct: + return None + + _, domain = acct.rsplit("@", 1) + webfinger_url = f"https://{domain}/.well-known/webfinger" + + try: + async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client: + # Step 1: WebFinger lookup + resp = await client.get( + webfinger_url, + params={"resource": f"acct:{acct}"}, + headers={"Accept": "application/jrd+json, application/json"}, + ) + if resp.status_code != 200: + log.debug("WebFinger %s returned %d", webfinger_url, resp.status_code) + return None + + data = resp.json() + # Find self link with AP content type + actor_url = None + for link in data.get("links", []): + if link.get("rel") == "self" and link.get("type") in ( + AP_CONTENT_TYPE, + "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"", + ): + actor_url = link.get("href") + break + + if not actor_url: + log.debug("No AP self link in WebFinger response for %s", acct) + return None + + # Step 2: Fetch actor JSON + resp = await client.get( + actor_url, + headers={"Accept": AP_CONTENT_TYPE}, + ) + if resp.status_code == 200: + return resp.json() + + log.debug("Actor fetch %s returned %d", actor_url, resp.status_code) + except Exception: + log.exception("WebFinger resolution failed for %s", acct) + + return None