"""SQL-backed FederationService implementation. Queries ``shared.models.federation`` — only this module may read/write federation-domain tables on behalf of other domains. """ from __future__ import annotations import os import uuid from datetime import datetime, timezone from sqlalchemy import select, func, delete from sqlalchemy.ext.asyncio import AsyncSession from shared.models.federation import ActorProfile, APActivity, APFollower from shared.contracts.dtos import ActorProfileDTO, APActivityDTO, APFollowerDTO def _domain() -> str: return os.getenv("AP_DOMAIN", "rose-ash.com") def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO: domain = _domain() username = actor.preferred_username return ActorProfileDTO( id=actor.id, user_id=actor.user_id, preferred_username=username, public_key_pem=actor.public_key_pem, display_name=actor.display_name, summary=actor.summary, inbox_url=f"https://{domain}/users/{username}/inbox", outbox_url=f"https://{domain}/users/{username}/outbox", created_at=actor.created_at, ) def _activity_to_dto(a: APActivity) -> APActivityDTO: return APActivityDTO( id=a.id, activity_id=a.activity_id, activity_type=a.activity_type, actor_profile_id=a.actor_profile_id, object_type=a.object_type, object_data=a.object_data, published=a.published, is_local=a.is_local, source_type=a.source_type, source_id=a.source_id, ipfs_cid=a.ipfs_cid, ) def _follower_to_dto(f: APFollower) -> APFollowerDTO: return APFollowerDTO( id=f.id, actor_profile_id=f.actor_profile_id, follower_acct=f.follower_acct, follower_inbox=f.follower_inbox, follower_actor_url=f.follower_actor_url, created_at=f.created_at, ) class SqlFederationService: # -- Actor management ----------------------------------------------------- async def get_actor_by_username( self, session: AsyncSession, username: str, ) -> ActorProfileDTO | None: actor = ( await session.execute( select(ActorProfile).where(ActorProfile.preferred_username == username) ) ).scalar_one_or_none() return _actor_to_dto(actor) if actor else None async def get_actor_by_user_id( self, session: AsyncSession, user_id: int, ) -> ActorProfileDTO | None: actor = ( await session.execute( select(ActorProfile).where(ActorProfile.user_id == user_id) ) ).scalar_one_or_none() return _actor_to_dto(actor) if actor else None async def create_actor( self, session: AsyncSession, user_id: int, preferred_username: str, display_name: str | None = None, summary: str | None = None, ) -> ActorProfileDTO: from shared.utils.http_signatures import generate_rsa_keypair private_pem, public_pem = generate_rsa_keypair() actor = ActorProfile( user_id=user_id, preferred_username=preferred_username, display_name=display_name, summary=summary, public_key_pem=public_pem, private_key_pem=private_pem, ) session.add(actor) await session.flush() return _actor_to_dto(actor) async def username_available( self, session: AsyncSession, username: str, ) -> bool: count = ( await session.execute( select(func.count(ActorProfile.id)).where( ActorProfile.preferred_username == username ) ) ).scalar() or 0 return count == 0 # -- Publishing ----------------------------------------------------------- async def publish_activity( self, session: AsyncSession, *, actor_user_id: int, activity_type: str, object_type: str, object_data: dict, source_type: str | None = None, source_id: int | None = None, ) -> APActivityDTO: # Look up actor actor = ( await session.execute( select(ActorProfile).where(ActorProfile.user_id == actor_user_id) ) ).scalar_one_or_none() if actor is None: raise ValueError(f"No ActorProfile for user_id={actor_user_id}") domain = _domain() username = actor.preferred_username activity_uri = f"https://{domain}/users/{username}/activities/{uuid.uuid4()}" now = datetime.now(timezone.utc) activity = APActivity( activity_id=activity_uri, activity_type=activity_type, actor_profile_id=actor.id, object_type=object_type, object_data=object_data, published=now, is_local=True, source_type=source_type, source_id=source_id, ) session.add(activity) await session.flush() # Store activity JSON on IPFS (best-effort — don't fail publish if IPFS down) try: from shared.utils.ipfs_client import add_json, is_available if await is_available(): activity_json = { "@context": "https://www.w3.org/ns/activitystreams", "id": activity_uri, "type": activity_type, "actor": f"https://{domain}/users/{username}", "published": now.isoformat(), "object": { "type": object_type, **object_data, }, } cid = await add_json(activity_json) activity.ipfs_cid = cid await session.flush() except Exception: pass # IPFS failure is non-fatal # Emit domain event for downstream processing (delivery) from shared.events import emit_event await emit_event( session, "federation.activity_created", "APActivity", activity.id, { "activity_id": activity.activity_id, "activity_type": activity_type, "actor_username": username, "object_type": object_type, }, ) return _activity_to_dto(activity) # -- Queries -------------------------------------------------------------- async def get_activity( self, session: AsyncSession, activity_id: str, ) -> APActivityDTO | None: a = ( await session.execute( select(APActivity).where(APActivity.activity_id == activity_id) ) ).scalar_one_or_none() return _activity_to_dto(a) if a else None async def get_outbox( self, session: AsyncSession, username: str, page: int = 1, per_page: int = 20, ) -> tuple[list[APActivityDTO], int]: actor = ( await session.execute( select(ActorProfile).where(ActorProfile.preferred_username == username) ) ).scalar_one_or_none() if actor is None: return [], 0 total = ( await session.execute( select(func.count(APActivity.id)).where( APActivity.actor_profile_id == actor.id, APActivity.is_local == True, # noqa: E712 ) ) ).scalar() or 0 offset = (page - 1) * per_page result = await session.execute( select(APActivity) .where( APActivity.actor_profile_id == actor.id, APActivity.is_local == True, # noqa: E712 ) .order_by(APActivity.published.desc()) .limit(per_page) .offset(offset) ) return [_activity_to_dto(a) for a in result.scalars().all()], total async def get_activity_for_source( self, session: AsyncSession, source_type: str, source_id: int, ) -> APActivityDTO | None: a = ( await session.execute( select(APActivity).where( APActivity.source_type == source_type, APActivity.source_id == source_id, ).order_by(APActivity.created_at.desc()) ) ).scalar_one_or_none() return _activity_to_dto(a) if a else None # -- Followers ------------------------------------------------------------ async def get_followers( self, session: AsyncSession, username: str, ) -> list[APFollowerDTO]: actor = ( await session.execute( select(ActorProfile).where(ActorProfile.preferred_username == username) ) ).scalar_one_or_none() if actor is None: return [] result = await session.execute( select(APFollower).where(APFollower.actor_profile_id == actor.id) ) return [_follower_to_dto(f) for f in result.scalars().all()] async def add_follower( self, session: AsyncSession, username: str, follower_acct: str, follower_inbox: str, follower_actor_url: str, follower_public_key: str | None = None, ) -> APFollowerDTO: actor = ( await session.execute( select(ActorProfile).where(ActorProfile.preferred_username == username) ) ).scalar_one_or_none() if actor is None: raise ValueError(f"Actor not found: {username}") # Upsert: update if already following, insert if new existing = ( await session.execute( select(APFollower).where( APFollower.actor_profile_id == actor.id, APFollower.follower_acct == follower_acct, ) ) ).scalar_one_or_none() if existing: existing.follower_inbox = follower_inbox existing.follower_actor_url = follower_actor_url existing.follower_public_key = follower_public_key await session.flush() return _follower_to_dto(existing) follower = APFollower( actor_profile_id=actor.id, follower_acct=follower_acct, follower_inbox=follower_inbox, follower_actor_url=follower_actor_url, follower_public_key=follower_public_key, ) session.add(follower) await session.flush() return _follower_to_dto(follower) async def remove_follower( self, session: AsyncSession, username: str, follower_acct: str, ) -> bool: actor = ( await session.execute( select(ActorProfile).where(ActorProfile.preferred_username == username) ) ).scalar_one_or_none() if actor is None: return False result = await session.execute( delete(APFollower).where( APFollower.actor_profile_id == actor.id, APFollower.follower_acct == follower_acct, ) ) return result.rowcount > 0 # -- Stats ---------------------------------------------------------------- async def get_stats(self, session: AsyncSession) -> dict: actors = (await session.execute(select(func.count(ActorProfile.id)))).scalar() or 0 activities = (await session.execute(select(func.count(APActivity.id)))).scalar() or 0 followers = (await session.execute(select(func.count(APFollower.id)))).scalar() or 0 return {"actors": actors, "activities": activities, "followers": followers}