Lightweight social pages (search, follow/unfollow, followers, following, actor timeline) auto-registered for AP-enabled apps via shared blueprint. Federation keeps the full social hub. Followers scoped per app_domain; post cards show "View on Hub" link instead of interaction buttons. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1658 lines
57 KiB
Python
1658 lines
57 KiB
Python
"""SQL-backed FederationService implementation.

Queries ``shared.models.federation`` — only this module may read/write
federation-domain tables on behalf of other domains.
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from sqlalchemy import select, func, delete
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.models.federation import (
|
|
ActorProfile, APActivity, APFollower,
|
|
RemoteActor, APFollowing, APRemotePost, APLocalPost,
|
|
APInteraction, APNotification,
|
|
)
|
|
from shared.contracts.dtos import (
|
|
ActorProfileDTO, APActivityDTO, APFollowerDTO,
|
|
RemoteActorDTO, RemotePostDTO, TimelineItemDTO, NotificationDTO,
|
|
)
|
|
|
|
|
|
def _domain() -> str:
|
|
return os.getenv("AP_DOMAIN", "federation.rose-ash.com")
|
|
|
|
|
|
def _get_origin_app() -> str | None:
|
|
try:
|
|
from quart import current_app
|
|
return current_app.name
|
|
except (ImportError, RuntimeError):
|
|
return None
|
|
|
|
|
|
def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO:
    """Convert an ``ActorProfile`` row into an ``ActorProfileDTO``.

    The inbox and outbox URLs are not read from the row: they are built
    from the configured domain and the actor's preferred username.
    """
    handle = actor.preferred_username
    actor_base = f"https://{_domain()}/users/{handle}"
    return ActorProfileDTO(
        id=actor.id,
        user_id=actor.user_id,
        preferred_username=handle,
        public_key_pem=actor.public_key_pem,
        display_name=actor.display_name,
        summary=actor.summary,
        inbox_url=f"{actor_base}/inbox",
        outbox_url=f"{actor_base}/outbox",
        created_at=actor.created_at,
    )
|
|
|
|
|
|
def _activity_to_dto(a: APActivity) -> APActivityDTO:
    """Convert an ``APActivity`` row into an ``APActivityDTO``.

    Each DTO field is copied from the row attribute of the same name.
    """
    copied_fields = (
        "id",
        "activity_id",
        "activity_type",
        "actor_profile_id",
        "object_type",
        "object_data",
        "published",
        "is_local",
        "source_type",
        "source_id",
        "ipfs_cid",
    )
    return APActivityDTO(**{name: getattr(a, name) for name in copied_fields})
|
|
|
|
|
|
def _follower_to_dto(f: APFollower) -> APFollowerDTO:
    """Convert an ``APFollower`` row into an ``APFollowerDTO``.

    Each DTO field is copied from the row attribute of the same name.
    """
    copied_fields = (
        "id",
        "actor_profile_id",
        "follower_acct",
        "follower_inbox",
        "follower_actor_url",
        "created_at",
        "app_domain",
    )
    return APFollowerDTO(**{name: getattr(f, name) for name in copied_fields})
|
|
|
|
|
|
def _remote_actor_to_dto(r: RemoteActor) -> RemoteActorDTO:
    """Convert a cached ``RemoteActor`` row into a ``RemoteActorDTO``.

    Each DTO field is copied from the row attribute of the same name.
    """
    copied_fields = (
        "id",
        "actor_url",
        "inbox_url",
        "preferred_username",
        "domain",
        "display_name",
        "summary",
        "icon_url",
        "shared_inbox_url",
        "public_key_pem",
    )
    return RemoteActorDTO(**{name: getattr(r, name) for name in copied_fields})
|
|
|
|
|
|
def _remote_post_to_dto(
    p: APRemotePost, actor: RemoteActor | None = None,
) -> RemotePostDTO:
    """Convert an ``APRemotePost`` row into a ``RemotePostDTO``.

    Missing content becomes ``""`` and missing attachment/tag data become
    empty lists. When ``actor`` is supplied it is embedded as a
    ``RemoteActorDTO``; otherwise the DTO's ``actor`` is ``None``.
    """
    author = _remote_actor_to_dto(actor) if actor else None
    body = p.content or ""
    attachments = p.attachment_data or []
    tags = p.tag_data or []
    return RemotePostDTO(
        id=p.id,
        remote_actor_id=p.remote_actor_id,
        object_id=p.object_id,
        content=body,
        summary=p.summary,
        url=p.url,
        attachments=attachments,
        tags=tags,
        published=p.published,
        actor=author,
    )
|
|
|
|
|
|
class SqlFederationService:
|
|
# -- Actor management -----------------------------------------------------
|
|
|
|
async def get_actor_by_username(
    self, session: AsyncSession, username: str,
) -> ActorProfileDTO | None:
    """Look up a local actor by preferred username.

    Returns the actor as a DTO, or ``None`` when no row matches.
    """
    stmt = select(ActorProfile).where(ActorProfile.preferred_username == username)
    profile = (await session.execute(stmt)).scalar_one_or_none()
    if not profile:
        return None
    return _actor_to_dto(profile)
|
|
|
|
async def get_actor_by_user_id(
    self, session: AsyncSession, user_id: int,
) -> ActorProfileDTO | None:
    """Look up a local actor by the owning user's id.

    Returns the actor as a DTO, or ``None`` when no row matches.
    """
    stmt = select(ActorProfile).where(ActorProfile.user_id == user_id)
    profile = (await session.execute(stmt)).scalar_one_or_none()
    if not profile:
        return None
    return _actor_to_dto(profile)
|
|
|
|
async def create_actor(
    self, session: AsyncSession, user_id: int, preferred_username: str,
    display_name: str | None = None, summary: str | None = None,
) -> ActorProfileDTO:
    """Create a local actor with a newly generated RSA key pair.

    The new ``ActorProfile`` is added to the session and flushed (not
    committed) before being returned as a DTO.
    """
    from shared.utils.http_signatures import generate_rsa_keypair

    # generate_rsa_keypair() yields (private, public), in that order.
    priv_key, pub_key = generate_rsa_keypair()

    attributes = {
        "user_id": user_id,
        "preferred_username": preferred_username,
        "display_name": display_name,
        "summary": summary,
        "public_key_pem": pub_key,
        "private_key_pem": priv_key,
    }
    profile = ActorProfile(**attributes)
    session.add(profile)
    await session.flush()
    return _actor_to_dto(profile)
|
|
|
|
async def username_available(
    self, session: AsyncSession, username: str,
) -> bool:
    """Return ``True`` when no local actor uses ``username``."""
    stmt = select(func.count(ActorProfile.id)).where(
        ActorProfile.preferred_username == username
    )
    taken = (await session.execute(stmt)).scalar() or 0
    return taken == 0
|
|
|
|
# -- Publishing -----------------------------------------------------------
|
|
|
|
async def publish_activity(
    self, session: AsyncSession, *,
    actor_user_id: int,
    activity_type: str,
    object_type: str,
    object_data: dict,
    source_type: str | None = None,
    source_id: int | None = None,
) -> APActivityDTO:
    """Record a new local activity for the actor owned by ``actor_user_id``.

    The activity is stored with ``visibility="public"`` and
    ``process_state="pending"`` and flushed to the session. Afterwards the
    activity JSON is stored on IPFS on a best-effort basis: an IPFS failure
    is logged and the activity is still returned, just without a CID.

    Raises:
        ValueError: if no ``ActorProfile`` exists for ``actor_user_id``.
    """
    import logging

    actor = (
        await session.execute(
            select(ActorProfile).where(ActorProfile.user_id == actor_user_id)
        )
    ).scalar_one_or_none()
    if actor is None:
        raise ValueError(f"No ActorProfile for user_id={actor_user_id}")

    domain = _domain()
    username = actor.preferred_username
    actor_url = f"https://{domain}/users/{username}"
    activity_uri = f"https://{domain}/users/{username}/activities/{uuid.uuid4()}"
    now = datetime.now(timezone.utc)

    activity = APActivity(
        activity_id=activity_uri,
        activity_type=activity_type,
        actor_profile_id=actor.id,
        actor_uri=actor_url,
        object_type=object_type,
        object_data=object_data,
        published=now,
        is_local=True,
        source_type=source_type,
        source_id=source_id,
        visibility="public",
        process_state="pending",
        origin_app=_get_origin_app(),
    )
    session.add(activity)
    await session.flush()

    # Store activity JSON on IPFS (best-effort — don't fail publish if IPFS
    # is down). Only the IPFS calls are guarded, so a database error from
    # the follow-up flush is not silently swallowed.
    cid = None
    try:
        from shared.utils.ipfs_client import add_json, is_available
        if await is_available():
            activity_json = {
                "@context": [
                    "https://www.w3.org/ns/activitystreams",
                    "https://w3id.org/security/v1",
                ],
                "id": activity_uri,
                "type": activity_type,
                "actor": actor_url,
                "published": now.isoformat(),
                "object": {
                    "type": object_type,
                    # Keys in object_data (including "type") take precedence.
                    **object_data,
                },
            }
            cid = await add_json(activity_json)
    except Exception:
        logging.getLogger(__name__).warning(
            "IPFS storage failed for activity %s", activity_uri, exc_info=True,
        )

    if cid is not None:
        activity.ipfs_cid = cid
        await session.flush()

    return _activity_to_dto(activity)
|
|
|
|
# -- Queries --------------------------------------------------------------
|
|
|
|
async def get_activity(
    self, session: AsyncSession, activity_id: str,
) -> APActivityDTO | None:
    """Fetch one activity by its ``activity_id``.

    Returns the activity as a DTO, or ``None`` when no row matches.
    """
    stmt = select(APActivity).where(APActivity.activity_id == activity_id)
    found = (await session.execute(stmt)).scalar_one_or_none()
    if not found:
        return None
    return _activity_to_dto(found)
|
|
|
|
async def get_outbox(
    self, session: AsyncSession, username: str,
    page: int = 1, per_page: int = 20,
    origin_app: str | None = None,
) -> tuple[list[APActivityDTO], int]:
    """Return one page of an actor's local activities, newest first.

    Args:
        username: preferred username of the local actor.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: page size.
        origin_app: when given, only activities recorded with this
            ``origin_app`` are included.

    Returns:
        ``(activities, total)`` where ``total`` counts every matching
        activity, not just this page. ``([], 0)`` for an unknown actor.
    """
    actor = (
        await session.execute(
            select(ActorProfile).where(ActorProfile.preferred_username == username)
        )
    ).scalar_one_or_none()
    if actor is None:
        return [], 0

    filters = [
        APActivity.actor_profile_id == actor.id,
        APActivity.is_local == True,  # noqa: E712
    ]
    if origin_app is not None:
        filters.append(APActivity.origin_app == origin_app)

    total = (
        await session.execute(
            select(func.count(APActivity.id)).where(*filters)
        )
    ).scalar() or 0

    # Clamp so page <= 0 cannot produce a negative OFFSET.
    offset = (max(page, 1) - 1) * per_page
    result = await session.execute(
        select(APActivity)
        .where(*filters)
        .order_by(APActivity.published.desc())
        .limit(per_page)
        .offset(offset)
    )
    return [_activity_to_dto(a) for a in result.scalars().all()], total
|
|
|
|
async def get_activity_for_source(
    self, session: AsyncSession, source_type: str, source_id: int,
) -> APActivityDTO | None:
    """Return the most recently created activity for a source object.

    Activities are matched on ``source_type`` and ``source_id`` and ordered
    by ``created_at`` descending. Returns ``None`` when none exist.
    """
    stmt = (
        select(APActivity)
        .where(
            APActivity.source_type == source_type,
            APActivity.source_id == source_id,
        )
        .order_by(APActivity.created_at.desc())
        .limit(1)
    )
    latest = (await session.execute(stmt)).scalars().first()
    if not latest:
        return None
    return _activity_to_dto(latest)
|
|
|
|
async def count_activities_for_source(
    self, session: AsyncSession, source_type: str, source_id: int,
    *, activity_type: str,
) -> int:
    """Count activities of ``activity_type`` recorded for a source object.

    Activities are matched on ``source_type``, ``source_id`` and
    ``activity_type``.
    """
    # ``func`` is already imported at module level; no local import needed.
    stmt = select(func.count()).select_from(APActivity).where(
        APActivity.source_type == source_type,
        APActivity.source_id == source_id,
        APActivity.activity_type == activity_type,
    )
    result = await session.execute(stmt)
    return result.scalar_one()
|
|
|
|
# -- Followers ------------------------------------------------------------
|
|
|
|
async def get_followers(
    self, session: AsyncSession, username: str,
    app_domain: str | None = None,
) -> list[APFollowerDTO]:
    """List every follower of a local actor.

    When ``app_domain`` is given, only followers recorded for that app
    domain are returned. An unknown ``username`` yields an empty list.
    """
    actor_stmt = select(ActorProfile).where(ActorProfile.preferred_username == username)
    actor = (await session.execute(actor_stmt)).scalar_one_or_none()
    if actor is None:
        return []

    conditions = [APFollower.actor_profile_id == actor.id]
    if app_domain is not None:
        conditions.append(APFollower.app_domain == app_domain)

    rows = (await session.execute(select(APFollower).where(*conditions))).scalars().all()
    return [_follower_to_dto(row) for row in rows]
|
|
|
|
async def add_follower(
    self, session: AsyncSession, username: str,
    follower_acct: str, follower_inbox: str, follower_actor_url: str,
    follower_public_key: str | None = None,
    app_domain: str = "federation",
) -> APFollowerDTO:
    """Record (or refresh) a follower of a local actor.

    A follower is identified by ``(actor, follower_acct, app_domain)``. If
    that row already exists, its inbox, actor URL and public key are
    overwritten; otherwise a new row is inserted. The session is flushed
    in both cases.

    Raises:
        ValueError: if no local actor has the given ``username``.
    """
    actor_stmt = select(ActorProfile).where(ActorProfile.preferred_username == username)
    actor = (await session.execute(actor_stmt)).scalar_one_or_none()
    if actor is None:
        raise ValueError(f"Actor not found: {username}")

    # Fields that are refreshed on every call, for both update and insert.
    contact = {
        "follower_inbox": follower_inbox,
        "follower_actor_url": follower_actor_url,
        "follower_public_key": follower_public_key,
    }

    lookup = select(APFollower).where(
        APFollower.actor_profile_id == actor.id,
        APFollower.follower_acct == follower_acct,
        APFollower.app_domain == app_domain,
    )
    current = (await session.execute(lookup)).scalar_one_or_none()

    if current:
        for attr, value in contact.items():
            setattr(current, attr, value)
        await session.flush()
        return _follower_to_dto(current)

    created = APFollower(
        actor_profile_id=actor.id,
        follower_acct=follower_acct,
        app_domain=app_domain,
        **contact,
    )
    session.add(created)
    await session.flush()
    return _follower_to_dto(created)
|
|
|
|
async def remove_follower(
    self, session: AsyncSession, username: str, follower_acct: str,
    app_domain: str = "federation",
) -> bool:
    """Delete a follower row for a local actor.

    Returns ``True`` when at least one row was deleted, ``False`` when the
    actor is unknown or nothing matched.
    """
    actor_stmt = select(ActorProfile).where(ActorProfile.preferred_username == username)
    actor = (await session.execute(actor_stmt)).scalar_one_or_none()
    if actor is None:
        return False

    removal = delete(APFollower).where(
        APFollower.actor_profile_id == actor.id,
        APFollower.follower_acct == follower_acct,
        APFollower.app_domain == app_domain,
    )
    outcome = await session.execute(removal)
    return outcome.rowcount > 0
|
|
|
|
async def get_followers_paginated(
    self, session: AsyncSession, username: str,
    page: int = 1, per_page: int = 20,
    app_domain: str | None = None,
) -> tuple[list[RemoteActorDTO], int]:
    """Return one page of a local actor's followers as remote-actor DTOs.

    Followers are ordered by ``created_at`` descending. Each follower is
    resolved against the cached ``RemoteActor`` rows by actor URL; when no
    cached row exists, a minimal DTO with ``id=0`` is synthesised from the
    follower record.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        app_domain: when given, only followers for that app domain.

    Returns:
        ``(followers, total)``; ``([], 0)`` for an unknown actor.
    """
    from urllib.parse import urlparse

    actor = (
        await session.execute(
            select(ActorProfile).where(ActorProfile.preferred_username == username)
        )
    ).scalar_one_or_none()
    if actor is None:
        return [], 0

    filters = [APFollower.actor_profile_id == actor.id]
    if app_domain is not None:
        filters.append(APFollower.app_domain == app_domain)

    total = (
        await session.execute(
            select(func.count(APFollower.id)).where(*filters)
        )
    ).scalar() or 0

    # Clamp so page <= 0 cannot produce a negative OFFSET.
    offset = (max(page, 1) - 1) * per_page
    followers = (
        await session.execute(
            select(APFollower)
            .where(*filters)
            .order_by(APFollower.created_at.desc())
            .limit(per_page)
            .offset(offset)
        )
    ).scalars().all()

    # Resolve all cached remote actors for this page in a single query
    # instead of issuing one query per follower.
    actor_urls = list({f.follower_actor_url for f in followers})
    cached: dict = {}
    if actor_urls:
        cached_rows = (
            await session.execute(
                select(RemoteActor).where(RemoteActor.actor_url.in_(actor_urls))
            )
        ).scalars().all()
        cached = {row.actor_url: row for row in cached_rows}

    results: list[RemoteActorDTO] = []
    for f in followers:
        remote = cached.get(f.follower_actor_url)
        if remote:
            results.append(_remote_actor_to_dto(remote))
            continue
        # Synthesise a minimal DTO from follower data. A leading "@"
        # (as in "@user@host") is dropped so the username is not empty.
        results.append(RemoteActorDTO(
            id=0,
            actor_url=f.follower_actor_url,
            inbox_url=f.follower_inbox,
            preferred_username=f.follower_acct.lstrip("@").split("@", 1)[0],
            domain=urlparse(f.follower_actor_url).netloc,
            display_name=None,
            summary=None,
            icon_url=None,
        ))
    return results, total
|
|
|
|
# -- Remote actors --------------------------------------------------------
|
|
|
|
async def get_or_fetch_remote_actor(
    self, session: AsyncSession, actor_url: str,
) -> RemoteActorDTO | None:
    """Return a remote actor, fetching and caching it when not yet cached.

    A cached ``RemoteActor`` row is returned as-is, without refreshing.
    Otherwise ``actor_url`` is fetched with an ``application/activity+json``
    Accept header and the result is stored via ``_upsert_remote_actor``.

    Returns:
        The actor DTO, or ``None`` when the fetch fails, the response is
        not HTTP 200, or the body is not a JSON object.
    """
    import logging

    # Check cache first
    row = (
        await session.execute(
            select(RemoteActor).where(RemoteActor.actor_url == actor_url)
        )
    ).scalar_one_or_none()
    if row:
        return _remote_actor_to_dto(row)

    # Fetch from remote
    import httpx
    try:
        async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
            resp = await client.get(
                actor_url,
                headers={"Accept": "application/activity+json"},
            )
        if resp.status_code != 200:
            return None
        data = resp.json()
    except Exception:
        # Network and decoding failures are treated as "actor not found".
        logging.getLogger(__name__).debug(
            "Failed to fetch remote actor %s", actor_url, exc_info=True,
        )
        return None

    # A valid actor document is a JSON object; anything else (list, string,
    # number) would break the ``.get`` calls in _upsert_remote_actor.
    if not isinstance(data, dict):
        return None

    return await self._upsert_remote_actor(session, actor_url, data)
|
|
|
|
async def _upsert_remote_actor(
    self, session: AsyncSession, actor_url: str, data: dict,
) -> RemoteActorDTO | None:
    """Insert or refresh the cached ``RemoteActor`` row for ``actor_url``.

    Args:
        actor_url: the actor's id URL; its netloc becomes the row's domain
            on insert.
        data: the actor document as a dict. Reads ``inbox``,
            ``endpoints.sharedInbox``, ``preferredUsername``, ``name``,
            ``summary``, ``icon.url`` and ``publicKey.publicKeyPem``.

    On update, a missing or null ``inbox`` / ``preferredUsername`` keeps the
    stored value; the other fields are overwritten even when absent. The
    session is flushed and the row returned as a DTO.
    """
    from urllib.parse import urlparse

    domain = urlparse(actor_url).netloc

    # Remote documents are untrusted: only descend into nested values that
    # really are objects, otherwise ``.get`` would raise AttributeError.
    icon = data.get("icon")
    if isinstance(icon, list):
        # Multi-valued icon: use the first object entry, if any.
        icon = next((entry for entry in icon if isinstance(entry, dict)), None)
    icon_url = icon.get("url") if isinstance(icon, dict) else None

    public_key = data.get("publicKey")
    pub_key = public_key.get("publicKeyPem") if isinstance(public_key, dict) else None

    endpoints = data.get("endpoints")
    shared_inbox = endpoints.get("sharedInbox") if isinstance(endpoints, dict) else None

    # Upsert
    existing = (
        await session.execute(
            select(RemoteActor).where(RemoteActor.actor_url == actor_url)
        )
    ).scalar_one_or_none()

    now = datetime.now(timezone.utc)
    if existing:
        # ``or`` (not a .get default) so an explicit null does not wipe
        # the stored inbox or username.
        existing.inbox_url = data.get("inbox") or existing.inbox_url
        existing.shared_inbox_url = shared_inbox
        existing.preferred_username = (
            data.get("preferredUsername") or existing.preferred_username
        )
        existing.display_name = data.get("name")
        existing.summary = data.get("summary")
        existing.icon_url = icon_url
        existing.public_key_pem = pub_key
        existing.fetched_at = now
        await session.flush()
        return _remote_actor_to_dto(existing)

    row = RemoteActor(
        actor_url=actor_url,
        inbox_url=data.get("inbox") or "",
        shared_inbox_url=shared_inbox,
        preferred_username=data.get("preferredUsername") or "",
        display_name=data.get("name"),
        summary=data.get("summary"),
        icon_url=icon_url,
        public_key_pem=pub_key,
        domain=domain,
        fetched_at=now,
    )
    session.add(row)
    await session.flush()
    return _remote_actor_to_dto(row)
|
|
|
|
async def search_remote_actor(
    self, session: AsyncSession, acct: str,
) -> RemoteActorDTO | None:
    """Resolve ``acct`` through ``resolve_actor`` and cache the result.

    Returns ``None`` when resolution yields nothing or the resolved
    document has no ``id``; otherwise the actor is upserted and returned.
    """
    from shared.utils.webfinger import resolve_actor

    resolved = await resolve_actor(acct)
    actor_url = resolved.get("id") if resolved else None
    if not actor_url:
        return None
    return await self._upsert_remote_actor(session, actor_url, resolved)
|
|
|
|
async def search_actors(
    self, session: AsyncSession, query: str, page: int = 1, limit: int = 20,
) -> tuple[list[RemoteActorDTO], int]:
    """Search cached remote actors and local actors by substring.

    Remote actors are matched case-insensitively on username, display name
    and domain; local actors on username and display name. Remote matches
    are listed first, and local matches fill the remaining slots of the
    page. Local actors are returned as ``RemoteActorDTO`` with ``id=0``.

    On the first page, a query containing ``@`` is also resolved through
    ``search_remote_actor``; a result not already on the page is prepended,
    so the page may hold ``limit + 1`` items.

    Args:
        query: search text, matched literally (``%`` and ``_`` are not
            treated as wildcards).
        page: 1-based page number; values below 1 are treated as 1.
        limit: page size.

    Returns:
        ``(results, total)`` where ``total`` is the combined match count.
    """
    from sqlalchemy import or_

    page = max(page, 1)  # avoid a negative OFFSET for page <= 0

    # Escape LIKE metacharacters so user input cannot act as a wildcard.
    escaped = (
        query.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    pattern = f"%{escaped}%"
    offset = (page - 1) * limit

    # WebFinger resolve for @user@domain queries (first page only)
    webfinger_result: RemoteActorDTO | None = None
    if page == 1 and "@" in query:
        webfinger_result = await self.search_remote_actor(session, query)

    # Search cached remote actors
    remote_filter = or_(
        RemoteActor.preferred_username.ilike(pattern, escape="\\"),
        RemoteActor.display_name.ilike(pattern, escape="\\"),
        RemoteActor.domain.ilike(pattern, escape="\\"),
    )
    remote_total = (
        await session.execute(
            select(func.count(RemoteActor.id)).where(remote_filter)
        )
    ).scalar() or 0

    # Search local actor profiles
    local_filter = or_(
        ActorProfile.preferred_username.ilike(pattern, escape="\\"),
        ActorProfile.display_name.ilike(pattern, escape="\\"),
    )
    local_total = (
        await session.execute(
            select(func.count(ActorProfile.id)).where(local_filter)
        )
    ).scalar() or 0

    total = remote_total + local_total

    # Fetch remote actors page
    remote_rows = (
        await session.execute(
            select(RemoteActor)
            .where(remote_filter)
            .order_by(RemoteActor.preferred_username)
            .limit(limit)
            .offset(offset)
        )
    ).scalars().all()

    results: list[RemoteActorDTO] = [_remote_actor_to_dto(r) for r in remote_rows]

    # Fill remaining slots with local actors. Local results start only once
    # the remote results are exhausted, so their offset is relative to the
    # end of the remote list.
    remaining = limit - len(results)
    local_offset = max(0, offset - remote_total)
    if remaining > 0 and offset + len(results) >= remote_total:
        domain = _domain()
        local_rows = (
            await session.execute(
                select(ActorProfile)
                .where(local_filter)
                .order_by(ActorProfile.preferred_username)
                .limit(remaining)
                .offset(local_offset)
            )
        ).scalars().all()
        for lp in local_rows:
            results.append(RemoteActorDTO(
                id=0,
                actor_url=f"https://{domain}/users/{lp.preferred_username}",
                inbox_url=f"https://{domain}/users/{lp.preferred_username}/inbox",
                preferred_username=lp.preferred_username,
                domain=domain,
                display_name=lp.display_name,
                summary=lp.summary,
                icon_url=None,
            ))

    # Prepend WebFinger result (deduped)
    if webfinger_result:
        existing_urls = {r.actor_url for r in results}
        if webfinger_result.actor_url not in existing_urls:
            results.insert(0, webfinger_result)
            total += 1

    return results, total
|
|
|
|
# -- Following (outbound) -------------------------------------------------
|
|
|
|
async def send_follow(
    self, session: AsyncSession, local_username: str, remote_actor_url: str,
) -> None:
    """Start following a remote actor on behalf of a local actor.

    A ``pending`` ``APFollowing`` row is created and flushed, then a signed
    ``Follow`` activity is POSTed to the remote actor's inbox. If a follow
    row already exists (pending or accepted), nothing is sent. Delivery
    problems (exceptions or HTTP error responses) are logged and do not
    raise; the pending row is kept.

    Raises:
        ValueError: if the local actor does not exist or the remote actor
            cannot be resolved.
    """
    import json
    import logging
    from urllib.parse import urlparse

    import httpx
    from shared.utils.http_signatures import sign_request

    log = logging.getLogger(__name__)

    actor = (
        await session.execute(
            select(ActorProfile).where(ActorProfile.preferred_username == local_username)
        )
    ).scalar_one_or_none()
    if not actor:
        raise ValueError(f"Actor not found: {local_username}")

    # Get or fetch remote actor (this also makes sure the row is cached)
    remote_dto = await self.get_or_fetch_remote_actor(session, remote_actor_url)
    if not remote_dto:
        raise ValueError(f"Could not resolve remote actor: {remote_actor_url}")

    remote = (
        await session.execute(
            select(RemoteActor).where(RemoteActor.actor_url == remote_actor_url)
        )
    ).scalar_one()

    # Check for existing follow
    existing = (
        await session.execute(
            select(APFollowing).where(
                APFollowing.actor_profile_id == actor.id,
                APFollowing.remote_actor_id == remote.id,
            )
        )
    ).scalar_one_or_none()
    if existing:
        return  # already following or pending

    follow = APFollowing(
        actor_profile_id=actor.id,
        remote_actor_id=remote.id,
        state="pending",
    )
    session.add(follow)
    await session.flush()

    # Send Follow activity
    domain = _domain()
    actor_url = f"https://{domain}/users/{local_username}"
    follow_id = f"{actor_url}/activities/{uuid.uuid4()}"

    follow_activity = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": follow_id,
        "type": "Follow",
        "actor": actor_url,
        "object": remote_actor_url,
    }

    body_bytes = json.dumps(follow_activity).encode()
    parsed = urlparse(remote.inbox_url)
    headers = sign_request(
        private_key_pem=actor.private_key_pem,
        key_id=f"{actor_url}#main-key",
        method="POST",
        # An inbox URL without a path is requested as "/", so sign that.
        path=parsed.path or "/",
        host=parsed.netloc,
        body=body_bytes,
    )
    headers["Content-Type"] = "application/activity+json"

    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(remote.inbox_url, content=body_bytes, headers=headers)
        if resp.status_code >= 400:
            # The remote refused the request; record it instead of
            # silently treating the delivery as successful.
            log.warning(
                "Follow to %s rejected with HTTP %s",
                remote.inbox_url, resp.status_code,
            )
    except Exception:
        log.exception("Failed to send Follow to %s", remote.inbox_url)
|
|
|
|
async def get_following(
    self, session: AsyncSession, username: str,
    page: int = 1, per_page: int = 20,
) -> tuple[list[RemoteActorDTO], int]:
    """Return one page of remote actors that a local actor follows.

    Only follows in state ``accepted`` are included, ordered by
    ``accepted_at`` descending.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: page size.

    Returns:
        ``(actors, total)``; ``([], 0)`` for an unknown actor.
    """
    actor = (
        await session.execute(
            select(ActorProfile).where(ActorProfile.preferred_username == username)
        )
    ).scalar_one_or_none()
    if not actor:
        return [], 0

    accepted = (
        APFollowing.actor_profile_id == actor.id,
        APFollowing.state == "accepted",
    )

    total = (
        await session.execute(
            select(func.count(APFollowing.id)).where(*accepted)
        )
    ).scalar() or 0

    # Clamp so page <= 0 cannot produce a negative OFFSET.
    offset = (max(page, 1) - 1) * per_page
    result = await session.execute(
        select(RemoteActor)
        .join(APFollowing, APFollowing.remote_actor_id == RemoteActor.id)
        .where(*accepted)
        .order_by(APFollowing.accepted_at.desc())
        .limit(per_page)
        .offset(offset)
    )
    return [_remote_actor_to_dto(r) for r in result.scalars().all()], total
|
|
|
|
async def accept_follow_response(
    self, session: AsyncSession, local_username: str, remote_actor_url: str,
) -> None:
    """Mark a pending outbound follow as accepted.

    Does nothing when the local actor, the cached remote actor, or a
    ``pending`` follow between them cannot be found. Otherwise the follow
    is set to ``accepted``, ``accepted_at`` is stamped with the current UTC
    time, and the session is flushed.
    """
    local_stmt = select(ActorProfile).where(
        ActorProfile.preferred_username == local_username
    )
    local_actor = (await session.execute(local_stmt)).scalar_one_or_none()
    if not local_actor:
        return

    remote_stmt = select(RemoteActor).where(RemoteActor.actor_url == remote_actor_url)
    remote_actor = (await session.execute(remote_stmt)).scalar_one_or_none()
    if not remote_actor:
        return

    pending_stmt = select(APFollowing).where(
        APFollowing.actor_profile_id == local_actor.id,
        APFollowing.remote_actor_id == remote_actor.id,
        APFollowing.state == "pending",
    )
    pending = (await session.execute(pending_stmt)).scalar_one_or_none()
    if not pending:
        return

    pending.state = "accepted"
    pending.accepted_at = datetime.now(timezone.utc)
    await session.flush()
|
|
|
|
async def unfollow(
    self, session: AsyncSession, local_username: str, remote_actor_url: str,
) -> None:
    """Stop following a remote actor on behalf of a local actor.

    Does nothing when the local actor, the cached remote actor, or the
    follow row between them cannot be found. Otherwise the follow row is
    deleted and flushed, then a signed ``Undo`` of the ``Follow`` is POSTed
    to the remote inbox. Delivery problems (exceptions or HTTP error
    responses) are logged and do not raise; the local row stays deleted.
    """
    import json
    import logging
    from urllib.parse import urlparse

    import httpx
    from shared.utils.http_signatures import sign_request

    log = logging.getLogger(__name__)

    actor = (
        await session.execute(
            select(ActorProfile).where(ActorProfile.preferred_username == local_username)
        )
    ).scalar_one_or_none()
    if not actor:
        return

    remote = (
        await session.execute(
            select(RemoteActor).where(RemoteActor.actor_url == remote_actor_url)
        )
    ).scalar_one_or_none()
    if not remote:
        return

    follow = (
        await session.execute(
            select(APFollowing).where(
                APFollowing.actor_profile_id == actor.id,
                APFollowing.remote_actor_id == remote.id,
            )
        )
    ).scalar_one_or_none()
    if not follow:
        return

    await session.delete(follow)
    await session.flush()

    # Send Undo(Follow) to remote
    domain = _domain()
    actor_url = f"https://{domain}/users/{local_username}"
    undo_id = f"{actor_url}/activities/{uuid.uuid4()}"

    undo_activity = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": undo_id,
        "type": "Undo",
        "actor": actor_url,
        "object": {
            "type": "Follow",
            "actor": actor_url,
            "object": remote_actor_url,
        },
    }

    body_bytes = json.dumps(undo_activity).encode()
    parsed = urlparse(remote.inbox_url)
    headers = sign_request(
        private_key_pem=actor.private_key_pem,
        key_id=f"{actor_url}#main-key",
        method="POST",
        # An inbox URL without a path is requested as "/", so sign that.
        path=parsed.path or "/",
        host=parsed.netloc,
        body=body_bytes,
    )
    headers["Content-Type"] = "application/activity+json"

    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(remote.inbox_url, content=body_bytes, headers=headers)
        if resp.status_code >= 400:
            # The remote refused the request; record it instead of
            # silently treating the delivery as successful.
            log.warning(
                "Undo Follow to %s rejected with HTTP %s",
                remote.inbox_url, resp.status_code,
            )
    except Exception:
        log.exception("Failed to send Undo Follow to %s", remote.inbox_url)
|
|
|
|
# -- Remote posts ---------------------------------------------------------
|
|
|
|
async def ingest_remote_post(
    self, session: AsyncSession, remote_actor_id: int,
    activity_json: dict, object_json: dict,
) -> None:
    """Store or refresh a remote post from an incoming activity.

    Posts are keyed by the object's ``id``; an object without an id is
    ignored. An existing row has its content fields overwritten and
    ``fetched_at`` refreshed, keeping its previous ``published`` value when
    the new one is missing or unparsable. Otherwise a new row is inserted.
    The session is flushed in both cases.
    """
    activity_id_str = activity_json.get("id", "")
    object_id_str = object_json.get("id", "")
    if not object_id_str:
        return

    # Upsert: look for a row with the same object id.
    lookup = select(APRemotePost).where(APRemotePost.object_id == object_id_str)
    current = (await session.execute(lookup)).scalar_one_or_none()

    # Parse the publication time; a trailing "Z" is rewritten to an explicit
    # UTC offset before parsing. Bad values leave ``published`` as None.
    published = None
    raw_published = object_json.get("published")
    if raw_published:
        try:
            published = datetime.fromisoformat(raw_published.replace("Z", "+00:00"))
        except (ValueError, AttributeError):
            published = None

    # NOTE(review): content is stored exactly as received — no HTML
    # sanitising happens in this method; confirm it is done elsewhere.
    payload = {
        "content": object_json.get("content", ""),
        "summary": object_json.get("summary"),
        "url": object_json.get("url"),
        "attachment_data": object_json.get("attachment"),
        "tag_data": object_json.get("tag"),
        "in_reply_to": object_json.get("inReplyTo"),
        "conversation": object_json.get("conversation"),
    }

    if current:
        for attr, value in payload.items():
            setattr(current, attr, value)
        current.published = published or current.published
        current.fetched_at = datetime.now(timezone.utc)
        await session.flush()
        return

    session.add(APRemotePost(
        remote_actor_id=remote_actor_id,
        activity_id=activity_id_str,
        object_id=object_id_str,
        object_type=object_json.get("type", "Note"),
        published=published,
        **payload,
    ))
    await session.flush()
|
|
|
|
async def delete_remote_post(
    self, session: AsyncSession, object_id: str,
) -> None:
    """Delete the stored remote post(s) whose ``object_id`` matches."""
    removal = delete(APRemotePost).where(APRemotePost.object_id == object_id)
    await session.execute(removal)
|
|
|
|
async def get_remote_post(
    self, session: AsyncSession, object_id: str,
) -> RemotePostDTO | None:
    """Fetch a stored remote post by ``object_id``, with its author.

    Returns ``None`` when the post does not exist. The author is looked up
    by the post's ``remote_actor_id`` and may be absent from the DTO.
    """
    post_stmt = select(APRemotePost).where(APRemotePost.object_id == object_id)
    found = (await session.execute(post_stmt)).scalar_one_or_none()
    if not found:
        return None

    author_stmt = select(RemoteActor).where(RemoteActor.id == found.remote_actor_id)
    author = (await session.execute(author_stmt)).scalar_one_or_none()
    return _remote_post_to_dto(found, author)
|
|
|
|
# -- Timelines ------------------------------------------------------------
|
|
|
|
async def get_home_timeline(
    self, session: AsyncSession, actor_profile_id: int,
    before: datetime | None = None, limit: int = 20,
) -> list[TimelineItemDTO]:
    """Build the home timeline for a local actor, newest first.

    The timeline is the union of (a) remote posts by actors this actor
    follows with state ``accepted`` and (b) this actor's own local
    ``Create`` activities. Each item carries like/boost counts and whether
    this actor liked/boosted it.

    Args:
        actor_profile_id: id of the local actor viewing the timeline.
        before: when given, only items published strictly before it.
        limit: maximum number of items.
    """
    from sqlalchemy import union_all, literal_column

    # Query 1: Remote posts from followed actors
    following_subq = (
        select(APFollowing.remote_actor_id)
        .where(
            APFollowing.actor_profile_id == actor_profile_id,
            APFollowing.state == "accepted",
        )
        .subquery()
    )

    remote_q = (
        select(
            APRemotePost.id.label("post_id"),
            literal_column("'remote'").label("post_type"),
            APRemotePost.content.label("content"),
            APRemotePost.summary.label("summary"),
            APRemotePost.url.label("url"),
            APRemotePost.published.label("published"),
            APRemotePost.object_id.label("object_id"),
            RemoteActor.display_name.label("actor_name"),
            RemoteActor.preferred_username.label("actor_username"),
            RemoteActor.domain.label("actor_domain"),
            RemoteActor.icon_url.label("actor_icon"),
            RemoteActor.actor_url.label("actor_url"),
            RemoteActor.inbox_url.label("author_inbox"),
        )
        .join(RemoteActor, RemoteActor.id == APRemotePost.remote_actor_id)
        .where(APRemotePost.remote_actor_id.in_(following_subq))
    )
    if before:
        remote_q = remote_q.where(APRemotePost.published < before)

    # Query 2: Local activities (Create) by this actor. Content, summary and
    # url are pulled out of the JSON ``object_data`` column.
    local_q = (
        select(
            APActivity.id.label("post_id"),
            literal_column("'local'").label("post_type"),
            func.coalesce(
                APActivity.object_data.op("->>")("content"),
                literal_column("''"),
            ).label("content"),
            APActivity.object_data.op("->>")("summary").label("summary"),
            APActivity.object_data.op("->>")("url").label("url"),
            APActivity.published.label("published"),
            APActivity.activity_id.label("object_id"),
            func.coalesce(
                ActorProfile.display_name,
                ActorProfile.preferred_username,
            ).label("actor_name"),
            ActorProfile.preferred_username.label("actor_username"),
            literal_column("NULL").label("actor_domain"),
            literal_column("NULL").label("actor_icon"),
            literal_column("NULL").label("actor_url"),
            literal_column("NULL").label("author_inbox"),
        )
        .join(ActorProfile, ActorProfile.id == APActivity.actor_profile_id)
        .where(
            APActivity.actor_profile_id == actor_profile_id,
            APActivity.is_local == True,  # noqa: E712
            APActivity.activity_type == "Create",
        )
    )
    if before:
        local_q = local_q.where(APActivity.published < before)

    # Union and sort
    combined = union_all(remote_q, local_q).subquery()
    rows = (
        await session.execute(
            select(combined)
            .order_by(combined.c.published.desc())
            .limit(limit)
        )
    ).mappings().all()

    # Look up interaction counts + user state for the whole page in two
    # queries instead of four queries per row. Results are keyed by
    # (post_type, post_id, interaction_type); filtering on post_id alone may
    # fetch rows of the other post type, which are simply never looked up.
    kinds = ("like", "boost")
    post_ids = list({row["post_id"] for row in rows if row["object_id"]})
    counts: dict = {}
    mine: set = set()
    if post_ids:
        count_rows = await session.execute(
            select(
                APInteraction.post_type,
                APInteraction.post_id,
                APInteraction.interaction_type,
                func.count(APInteraction.id),
            )
            .where(
                APInteraction.post_id.in_(post_ids),
                APInteraction.interaction_type.in_(kinds),
            )
            .group_by(
                APInteraction.post_type,
                APInteraction.post_id,
                APInteraction.interaction_type,
            )
        )
        counts = {
            (post_type, post_id, kind): n
            for post_type, post_id, kind, n in count_rows.all()
        }
        mine_rows = await session.execute(
            select(
                APInteraction.post_type,
                APInteraction.post_id,
                APInteraction.interaction_type,
            )
            .where(
                APInteraction.actor_profile_id == actor_profile_id,
                APInteraction.post_id.in_(post_ids),
                APInteraction.interaction_type.in_(kinds),
            )
        )
        mine = {tuple(r) for r in mine_rows.all()}

    items = []
    for row in rows:
        object_id = row["object_id"]
        key = (row["post_type"], row["post_id"])
        # Rows without an object id never carry interaction data.
        has_object = bool(object_id)

        items.append(TimelineItemDTO(
            id=f"{row['post_type']}:{row['post_id']}",
            post_type=row["post_type"],
            content=row["content"] or "",
            published=row["published"],
            actor_name=row["actor_name"] or row["actor_username"] or "",
            actor_username=row["actor_username"] or "",
            object_id=object_id,
            summary=row["summary"],
            url=row["url"],
            actor_domain=row["actor_domain"],
            actor_icon=row["actor_icon"],
            actor_url=row["actor_url"],
            like_count=counts.get((*key, "like"), 0) if has_object else 0,
            boost_count=counts.get((*key, "boost"), 0) if has_object else 0,
            liked_by_me=has_object and (*key, "like") in mine,
            boosted_by_me=has_object and (*key, "boost") in mine,
            author_inbox=row["author_inbox"],
        ))
    return items
|
|
|
|
async def get_public_timeline(
    self, session: AsyncSession,
    before: datetime | None = None, limit: int = 20,
) -> list[TimelineItemDTO]:
    """Return local ``Create`` activities, newest first, as timeline items.

    Args:
        session: Async database session used for the query.
        before: When given, only activities published strictly earlier than
            this timestamp are returned (used for paging).
        limit: Maximum number of items to return.

    Returns:
        One ``TimelineItemDTO`` per matching activity, with ``content``,
        ``summary`` and ``url`` read from the activity's ``object_data``.
    """
    conditions = [
        APActivity.is_local == True,  # noqa: E712
        APActivity.activity_type == "Create",
    ]
    if before:
        conditions.append(APActivity.published < before)

    stmt = (
        select(APActivity, ActorProfile)
        .join(ActorProfile, ActorProfile.id == APActivity.actor_profile_id)
        .where(*conditions)
        .order_by(APActivity.published.desc())
        .limit(limit)
    )
    rows = (await session.execute(stmt)).all()

    timeline = []
    for activity, actor in rows:
        # A missing/empty object_data yields the same defaults as absent keys.
        payload = activity.object_data or {}
        timeline.append(TimelineItemDTO(
            id=f"local:{activity.id}",
            post_type="local",
            content=payload.get("content", ""),
            published=activity.published,
            actor_name=actor.display_name or actor.preferred_username,
            actor_username=actor.preferred_username,
            object_id=activity.activity_id,
            summary=payload.get("summary"),
            url=payload.get("url"),
        ))
    return timeline
|
|
|
|
async def get_actor_timeline(
    self, session: AsyncSession, remote_actor_id: int,
    before: datetime | None = None, limit: int = 20,
) -> list[TimelineItemDTO]:
    """Return the stored posts of one remote actor, newest first.

    Args:
        session: Async database session used for the queries.
        remote_actor_id: Primary key of the ``RemoteActor`` row.
        before: When given, only posts published strictly earlier than this
            timestamp are returned.
        limit: Maximum number of posts to return.

    Returns:
        A list of ``TimelineItemDTO``; empty when the actor does not exist.
    """
    author_stmt = select(RemoteActor).where(RemoteActor.id == remote_actor_id)
    author = (await session.execute(author_stmt)).scalar_one_or_none()
    if not author:
        return []

    posts_stmt = select(APRemotePost).where(
        APRemotePost.remote_actor_id == remote_actor_id
    )
    if before:
        posts_stmt = posts_stmt.where(APRemotePost.published < before)
    posts_stmt = posts_stmt.order_by(APRemotePost.published.desc()).limit(limit)
    fetched = (await session.execute(posts_stmt)).scalars().all()

    timeline = []
    for post in fetched:
        timeline.append(TimelineItemDTO(
            id=f"remote:{post.id}",
            post_type="remote",
            content=post.content or "",
            published=post.published,
            actor_name=author.display_name or author.preferred_username,
            actor_username=author.preferred_username,
            object_id=post.object_id,
            summary=post.summary,
            url=post.url,
            actor_domain=author.domain,
            actor_icon=author.icon_url,
            actor_url=author.actor_url,
            author_inbox=author.inbox_url,
        ))
    return timeline
|
|
|
|
# -- Local posts ----------------------------------------------------------
|
|
|
|
async def create_local_post(
    self, session: AsyncSession, actor_profile_id: int,
    content: str, visibility: str = "public",
    in_reply_to: str | None = None,
) -> int:
    """Store a local post and publish it as a ``Create`` of a ``Note``.

    Args:
        session: Async database session; the new row is added and flushed.
        actor_profile_id: Primary key of the authoring ``ActorProfile``.
        content: Plain-text body.  Each non-blank line becomes one
            HTML-escaped ``<p>`` element in the published Note.
        visibility: Stored on the ``APLocalPost`` row.
        in_reply_to: Optional object id this post replies to; copied to the
            row and to the Note's ``inReplyTo``.

    Returns:
        The id of the newly created ``APLocalPost``.
    """
    import html as html_mod

    now = datetime.now(timezone.utc)
    post = APLocalPost(
        actor_profile_id=actor_profile_id,
        content=content,
        visibility=visibility,
        in_reply_to=in_reply_to,
        published=now,
    )
    session.add(post)
    # Flush before reading post.id below: the id is part of the Note URL.
    await session.flush()

    # Get actor for publishing
    actor = (
        await session.execute(
            select(ActorProfile).where(ActorProfile.id == actor_profile_id)
        )
    ).scalar_one()

    domain = _domain()
    username = actor.preferred_username
    actor_url = f"https://{domain}/users/{username}"

    # Convert content to simple HTML.  splitlines() is used instead of
    # split("\n") so CRLF input (what HTML form textareas submit) does not
    # leave a stray "\r" inside every paragraph.
    html_content = "".join(
        f"<p>{html_mod.escape(line)}</p>"
        for line in content.splitlines()
        if line.strip()
    )

    object_id = f"{actor_url}/posts/{post.id}"
    # NOTE(review): ``visibility`` is stored on the row, but the Note below is
    # always addressed to Public + followers — confirm this is intended.
    object_data = {
        "id": object_id,
        "type": "Note",
        "content": html_content,
        "url": object_id,
        "attributedTo": actor_url,
        "to": ["https://www.w3.org/ns/activitystreams#Public"],
        "cc": [f"{actor_url}/followers"],
        "published": now.isoformat(),
    }
    if in_reply_to:
        object_data["inReplyTo"] = in_reply_to

    # Publish via existing activity system
    await self.publish_activity(
        session,
        actor_user_id=actor.user_id,
        activity_type="Create",
        object_type="Note",
        object_data=object_data,
        source_type="local_post",
        source_id=post.id,
    )

    return post.id
|
|
|
|
async def delete_local_post(
    self, session: AsyncSession, actor_profile_id: int, post_id: int,
) -> None:
    """Delete one of the actor's local posts and publish a ``Delete`` for it.

    The post is looked up by both ``post_id`` and ``actor_profile_id``; when
    no such row exists the call returns without doing anything.  Otherwise a
    ``Delete`` activity for the post's Note URL is published through
    ``publish_activity`` before the row is deleted and the session flushed.
    """
    owned_post_stmt = select(APLocalPost).where(
        APLocalPost.id == post_id,
        APLocalPost.actor_profile_id == actor_profile_id,
    )
    target = (await session.execute(owned_post_stmt)).scalar_one_or_none()
    if not target:
        return

    owner_stmt = select(ActorProfile).where(ActorProfile.id == actor_profile_id)
    owner = (await session.execute(owner_stmt)).scalar_one()

    note_url = (
        f"https://{_domain()}/users/{owner.preferred_username}/posts/{target.id}"
    )

    # Announce the removal first, then drop the row.
    await self.publish_activity(
        session,
        actor_user_id=owner.user_id,
        activity_type="Delete",
        object_type="Note",
        object_data={"id": note_url},
        source_type="local_post",
        source_id=target.id,
    )

    await session.delete(target)
    await session.flush()
|
|
|
|
# -- Interactions ---------------------------------------------------------
|
|
|
|
async def like_post(
    self, session: AsyncSession, actor_profile_id: int,
    object_id: str, author_inbox: str,
) -> None:
    """Record a like by the actor and send a ``Like`` to the post's author.

    Does nothing when ``object_id`` cannot be resolved to a stored post or
    when the actor already has a "like" interaction for that post.  The
    ``Like`` activity is only sent when ``author_inbox`` is non-empty.
    """
    kind, target_id = await self._resolve_post(session, object_id)
    if not kind:
        return

    already_liked_stmt = select(APInteraction).where(
        APInteraction.actor_profile_id == actor_profile_id,
        APInteraction.post_type == kind,
        APInteraction.post_id == target_id,
        APInteraction.interaction_type == "like",
    )
    if (await session.execute(already_liked_stmt)).scalar_one_or_none():
        return

    liker_stmt = select(ActorProfile).where(ActorProfile.id == actor_profile_id)
    liker = (await session.execute(liker_stmt)).scalar_one()

    liker_url = f"https://{_domain()}/users/{liker.preferred_username}"
    like_activity_id = f"{liker_url}/activities/{uuid.uuid4()}"

    # The activity id is stored so unlike_post can reference it in its Undo.
    session.add(APInteraction(
        actor_profile_id=actor_profile_id,
        post_type=kind,
        post_id=target_id,
        interaction_type="like",
        activity_id=like_activity_id,
    ))
    await session.flush()

    if not author_inbox:
        return

    like_activity = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": like_activity_id,
        "type": "Like",
        "actor": liker_url,
        "object": object_id,
    }
    await self._send_activity_to_inbox(liker, like_activity, author_inbox)
|
|
|
|
async def unlike_post(
    self, session: AsyncSession, actor_profile_id: int,
    object_id: str, author_inbox: str,
) -> None:
    """Remove the actor's like and send an ``Undo`` of the original ``Like``.

    Does nothing when ``object_id`` cannot be resolved or when no "like"
    interaction exists for this actor and post.  The ``Undo`` is only sent
    when both ``author_inbox`` and the stored activity id are non-empty; the
    interaction row is deleted either way.
    """
    kind, target_id = await self._resolve_post(session, object_id)
    if not kind:
        return

    like_stmt = select(APInteraction).where(
        APInteraction.actor_profile_id == actor_profile_id,
        APInteraction.post_type == kind,
        APInteraction.post_id == target_id,
        APInteraction.interaction_type == "like",
    )
    stored_like = (await session.execute(like_stmt)).scalar_one_or_none()
    if not stored_like:
        return

    liker_stmt = select(ActorProfile).where(ActorProfile.id == actor_profile_id)
    liker = (await session.execute(liker_stmt)).scalar_one()
    liker_url = f"https://{_domain()}/users/{liker.preferred_username}"

    if author_inbox and stored_like.activity_id:
        undo_activity = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": f"{liker_url}/activities/{uuid.uuid4()}",
            "type": "Undo",
            "actor": liker_url,
            # Embed the original Like, identified by its stored activity id.
            "object": {
                "id": stored_like.activity_id,
                "type": "Like",
                "actor": liker_url,
                "object": object_id,
            },
        }
        await self._send_activity_to_inbox(liker, undo_activity, author_inbox)

    await session.delete(stored_like)
    await session.flush()
|
|
|
|
async def boost_post(
    self, session: AsyncSession, actor_profile_id: int,
    object_id: str, author_inbox: str,
) -> None:
    """Record a boost by the actor and distribute an ``Announce`` for it.

    Does nothing when ``object_id`` cannot be resolved to a stored post or
    when the actor already has a "boost" interaction for that post.

    Args:
        session: Async database session.
        actor_profile_id: Primary key of the boosting ``ActorProfile``.
        object_id: ActivityPub id of the post being boosted.
        author_inbox: Inbox URL of the post's author; when empty, no direct
            ``Announce`` is sent to the author.
    """
    post_type, post_id = await self._resolve_post(session, object_id)
    if not post_type:
        return

    existing = (
        await session.execute(
            select(APInteraction).where(
                APInteraction.actor_profile_id == actor_profile_id,
                APInteraction.post_type == post_type,
                APInteraction.post_id == post_id,
                APInteraction.interaction_type == "boost",
            )
        )
    ).scalar_one_or_none()
    if existing:
        return

    actor = (
        await session.execute(
            select(ActorProfile).where(ActorProfile.id == actor_profile_id)
        )
    ).scalar_one()

    domain = _domain()
    actor_url = f"https://{domain}/users/{actor.preferred_username}"
    announce_id = f"{actor_url}/activities/{uuid.uuid4()}"

    # The activity id is stored so unboost_post can reference it in its Undo.
    interaction = APInteraction(
        actor_profile_id=actor_profile_id,
        post_type=post_type,
        post_id=post_id,
        interaction_type="boost",
        activity_id=announce_id,
    )
    session.add(interaction)
    await session.flush()

    # Send Announce directly to the author, when we know their inbox.
    if author_inbox:
        announce_activity = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": announce_id,
            "type": "Announce",
            "actor": actor_url,
            "object": object_id,
            "to": ["https://www.w3.org/ns/activitystreams#Public"],
            "cc": [f"{actor_url}/followers"],
        }
        await self._send_activity_to_inbox(actor, announce_activity, author_inbox)

    # Also publish as our own activity for delivery to our followers.
    # NOTE(review): the extracted source lost its indentation, so whether this
    # call originally sat inside the ``if author_inbox:`` guard cannot be
    # determined.  It is kept outside so follower delivery does not depend on
    # the author having an inbox — confirm against the original file.
    await self.publish_activity(
        session,
        actor_user_id=actor.user_id,
        activity_type="Announce",
        object_type="Note",
        object_data={"id": object_id},
    )
|
|
|
|
async def unboost_post(
    self, session: AsyncSession, actor_profile_id: int,
    object_id: str, author_inbox: str,
) -> None:
    """Remove the actor's boost and send an ``Undo`` of the ``Announce``.

    Does nothing when ``object_id`` cannot be resolved or when no "boost"
    interaction exists for this actor and post.  The ``Undo`` is only sent
    when both ``author_inbox`` and the stored activity id are non-empty; the
    interaction row is deleted either way.
    """
    kind, target_id = await self._resolve_post(session, object_id)
    if not kind:
        return

    boost_stmt = select(APInteraction).where(
        APInteraction.actor_profile_id == actor_profile_id,
        APInteraction.post_type == kind,
        APInteraction.post_id == target_id,
        APInteraction.interaction_type == "boost",
    )
    stored_boost = (await session.execute(boost_stmt)).scalar_one_or_none()
    if not stored_boost:
        return

    booster_stmt = select(ActorProfile).where(ActorProfile.id == actor_profile_id)
    booster = (await session.execute(booster_stmt)).scalar_one()
    booster_url = f"https://{_domain()}/users/{booster.preferred_username}"

    if author_inbox and stored_boost.activity_id:
        undo_activity = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": f"{booster_url}/activities/{uuid.uuid4()}",
            "type": "Undo",
            "actor": booster_url,
            # Embed the original Announce, identified by its stored activity id.
            "object": {
                "id": stored_boost.activity_id,
                "type": "Announce",
                "actor": booster_url,
                "object": object_id,
            },
        }
        await self._send_activity_to_inbox(booster, undo_activity, author_inbox)

    await session.delete(stored_boost)
    await session.flush()
|
|
|
|
async def _resolve_post(
    self, session: AsyncSession, object_id: str,
) -> tuple[str | None, int | None]:
    """Map an AP object id to ``(post_type, post_id)``.

    Remote posts are checked first (by ``APRemotePost.object_id``), then
    local activities (by ``APActivity.activity_id``).  Returns
    ``(None, None)`` when neither table has a match.
    """
    # (post_type label, id column to return, column compared to object_id)
    lookups = (
        ("remote", APRemotePost.id, APRemotePost.object_id),
        ("local", APActivity.id, APActivity.activity_id),
    )
    for label, id_column, key_column in lookups:
        stmt = select(id_column).where(key_column == object_id).limit(1)
        found = (await session.execute(stmt)).scalar()
        if found:
            return label, found

    return None, None
|
|
|
|
async def _send_activity_to_inbox(
    self, actor: ActorProfile, activity: dict, inbox_url: str,
) -> None:
    """POST a signed activity to a single inbox, best-effort.

    The activity is serialised to JSON, signed with the actor's private key
    via ``sign_request`` and posted with a 15 second timeout.  Delivery
    failures never propagate to the caller: exceptions raised while posting
    are logged, and so are HTTP error responses (status >= 400), which were
    previously discarded without notice.

    Args:
        actor: Sending actor; supplies the private key and the key id.
        activity: JSON-serialisable activity document.
        inbox_url: Absolute URL of the recipient inbox.
    """
    import json
    import logging
    import httpx
    from shared.utils.http_signatures import sign_request
    from urllib.parse import urlparse

    log = logging.getLogger(__name__)

    domain = _domain()
    actor_url = f"https://{domain}/users/{actor.preferred_username}"

    # The exact bytes that are signed are the bytes that are sent.
    body_bytes = json.dumps(activity).encode()
    parsed = urlparse(inbox_url)
    headers = sign_request(
        private_key_pem=actor.private_key_pem,
        key_id=f"{actor_url}#main-key",
        method="POST",
        path=parsed.path,
        host=parsed.netloc,
        body=body_bytes,
    )
    headers["Content-Type"] = "application/activity+json"

    try:
        async with httpx.AsyncClient(timeout=15) as client:
            response = await client.post(
                inbox_url, content=body_bytes, headers=headers,
            )
    except Exception:
        # Deliberately broad: delivery is best-effort and must not break the
        # caller's flow.
        log.exception(
            "Failed to deliver activity to %s", inbox_url,
        )
        return

    if response.status_code >= 400:
        log.warning(
            "Inbox %s rejected activity with HTTP %s",
            inbox_url, response.status_code,
        )
|
|
|
|
# -- Notifications --------------------------------------------------------
|
|
|
|
async def get_notifications(
    self, session: AsyncSession, actor_profile_id: int,
    before: datetime | None = None, limit: int = 20,
) -> list[NotificationDTO]:
    """Return the actor's notifications, newest first.

    Each notification carries the sender's name, username, domain and icon
    (from the joined ``RemoteActor``, else the joined ``ActorProfile``, else
    the placeholders "Unknown"/"unknown") and an optional plain-text preview
    of the target (HTML tags stripped, at most 100 characters).

    Preview targets are loaded with at most two batched queries rather than
    one query per notification.

    Args:
        session: Async database session.
        actor_profile_id: Recipient whose notifications are listed.
        before: When given, only notifications created strictly earlier than
            this timestamp are returned.
        limit: Maximum number of notifications to return.
    """
    import re

    tag_pattern = re.compile(r"<[^>]+>")

    def _plain_preview(markup) -> str:
        # ``or ""`` keeps a stored null content from crashing the regex.
        return tag_pattern.sub("", markup or "")[:100]

    q = (
        select(APNotification, RemoteActor, ActorProfile)
        .outerjoin(RemoteActor, RemoteActor.id == APNotification.from_remote_actor_id)
        .outerjoin(
            ActorProfile,
            ActorProfile.id == APNotification.from_actor_profile_id,
        )
        .where(APNotification.actor_profile_id == actor_profile_id)
    )
    if before:
        q = q.where(APNotification.created_at < before)
    q = q.order_by(APNotification.created_at.desc()).limit(limit)

    rows = (await session.execute(q)).all()

    # Collect preview targets.  A remote post is only consulted when the
    # notification has no target activity, so only those ids are gathered.
    activity_ids = set()
    remote_post_ids = set()
    for notif, _remote, _local in rows:
        if notif.target_activity_id:
            activity_ids.add(notif.target_activity_id)
        elif notif.target_remote_post_id:
            remote_post_ids.add(notif.target_remote_post_id)

    activities_by_id = {}
    if activity_ids:
        found_activities = (await session.execute(
            select(APActivity).where(APActivity.id.in_(sorted(activity_ids)))
        )).scalars().all()
        activities_by_id = {act.id: act for act in found_activities}

    remote_posts_by_id = {}
    if remote_post_ids:
        found_posts = (await session.execute(
            select(APRemotePost).where(APRemotePost.id.in_(sorted(remote_post_ids)))
        )).scalars().all()
        remote_posts_by_id = {rp.id: rp for rp in found_posts}

    items = []
    for notif, remote_actor, from_actor_profile in rows:
        if remote_actor:
            name = remote_actor.display_name or remote_actor.preferred_username
            username = remote_actor.preferred_username
            domain = remote_actor.domain
            icon = remote_actor.icon_url
        elif from_actor_profile:
            name = from_actor_profile.display_name or from_actor_profile.preferred_username
            username = from_actor_profile.preferred_username
            domain = None
            icon = None
        else:
            name = "Unknown"
            username = "unknown"
            domain = None
            icon = None

        # Get preview if target exists
        preview = None
        if notif.target_activity_id:
            act = activities_by_id.get(notif.target_activity_id)
            if act and act.object_data:
                preview = _plain_preview(act.object_data.get("content", ""))
        elif notif.target_remote_post_id:
            rp = remote_posts_by_id.get(notif.target_remote_post_id)
            if rp and rp.content:
                preview = _plain_preview(rp.content)

        items.append(NotificationDTO(
            id=notif.id,
            notification_type=notif.notification_type,
            from_actor_name=name,
            from_actor_username=username,
            from_actor_domain=domain,
            from_actor_icon=icon,
            target_content_preview=preview,
            created_at=notif.created_at,
            read=notif.read,
        ))
    return items
|
|
|
|
async def unread_notification_count(
    self, session: AsyncSession, actor_profile_id: int,
) -> int:
    """Return how many of the actor's notifications have ``read`` false."""
    unread_stmt = select(func.count(APNotification.id)).where(
        APNotification.actor_profile_id == actor_profile_id,
        APNotification.read == False,  # noqa: E712
    )
    unread = (await session.execute(unread_stmt)).scalar()
    # Fall back to 0 when the query yields no value.
    return unread or 0
|
|
|
|
async def mark_notifications_read(
    self, session: AsyncSession, actor_profile_id: int,
) -> None:
    """Set ``read=True`` on every unread notification of the actor.

    Issues a single bulk ``UPDATE``; this method does not flush or commit.
    """
    from sqlalchemy import update

    unread_for_actor = (
        APNotification.actor_profile_id == actor_profile_id,
        APNotification.read == False,  # noqa: E712
    )
    stmt = update(APNotification).where(*unread_for_actor).values(read=True)
    await session.execute(stmt)
|
|
|
|
# -- Stats ----------------------------------------------------------------
|
|
|
|
async def get_stats(self, session: AsyncSession) -> dict:
    """Return row counts for actor profiles, activities and followers.

    The result has the keys ``"actors"``, ``"activities"`` and
    ``"followers"``, each mapped to an integer count (0 when the count query
    yields no value).
    """
    counted_columns = (
        ("actors", ActorProfile.id),
        ("activities", APActivity.id),
        ("followers", APFollower.id),
    )
    stats = {}
    for label, id_column in counted_columns:
        total = (await session.execute(select(func.count(id_column)))).scalar()
        stats[label] = total or 0
    return stats
|