"""Social fediverse routes: timeline, compose, search, follow, interactions, notifications.""" from __future__ import annotations import logging from datetime import datetime from quart import Blueprint, request, g, redirect, url_for, abort, Response from shared.services.registry import services from shared.sx.helpers import sx_response, sx_call log = logging.getLogger(__name__) def _require_actor(): """Return actor context or abort 403.""" actor = g.get("ctx", {}).get("actor") if hasattr(g, "ctx") else None if not actor: actor = getattr(g, "_social_actor", None) if not actor: abort(403, "You need to choose a federation username first") return actor def register(url_prefix="/social"): bp = Blueprint("social", __name__, url_prefix=url_prefix) @bp.before_request async def load_actor(): """Load actor profile for authenticated users.""" if g.get("user"): actor = await services.federation.get_actor_by_user_id(g.s, g.user.id) g._social_actor = actor # -- Timeline pagination --------------------------------------------------- @bp.get("/timeline") async def home_timeline_page(): actor = _require_actor() before_str = request.args.get("before") before = None if before_str: try: before = datetime.fromisoformat(before_str) except ValueError: pass items = await services.federation.get_home_timeline( g.s, actor.id, before=before, ) sx_src = _render_timeline_items(items, "home", actor) return sx_response(sx_src) @bp.get("/public/timeline") async def public_timeline_page(): before_str = request.args.get("before") before = None if before_str: try: before = datetime.fromisoformat(before_str) except ValueError: pass items = await services.federation.get_public_timeline(g.s, before=before) actor = getattr(g, "_social_actor", None) sx_src = _render_timeline_items(items, "public", actor) return sx_response(sx_src) # -- Compose --------------------------------------------------------------- @bp.post("/compose") async def compose_submit(): actor = _require_actor() form = await request.form content = form.get("content", "").strip() if not content: return redirect(url_for("defpage_compose_form")) visibility = form.get("visibility", "public") in_reply_to = form.get("in_reply_to") or None await services.federation.create_local_post( g.s, actor.id, content=content, visibility=visibility, in_reply_to=in_reply_to, ) return redirect(url_for("defpage_home_timeline")) @bp.post("/delete/") async def delete_post(post_id: int): actor = _require_actor() await services.federation.delete_local_post(g.s, actor.id, post_id) return redirect(url_for("defpage_home_timeline")) # -- Search + Follow ------------------------------------------------------- @bp.get("/search/page") async def search_page(): from sxc.pages.utils import _serialize_remote_actor, _serialize_actor actor = getattr(g, "_social_actor", None) query = request.args.get("q", "").strip() page = request.args.get("page", 1, type=int) actors_list = [] total = 0 followed_urls: set[str] = set() if query: actors_list, total = await services.federation.search_actors( g.s, query, page=page, ) if actor: following, _ = await services.federation.get_following( g.s, actor.preferred_username, page=1, per_page=1000, ) followed_urls = {a.actor_url for a in following} actor_dicts = [_serialize_remote_actor(a) for a in actors_list] actor_data = _serialize_actor(actor) parts = [] for ad in actor_dicts: parts.append(sx_call("federation-actor-card-from-data", a=ad, actor=actor_data, followed_urls=list(followed_urls), list_type="search")) if len(actors_list) >= 20: next_url = url_for("social.search_page", q=query, page=page + 1) parts.append(sx_call("federation-scroll-sentinel", url=next_url)) sx_src = "(<> " + " ".join(parts) + ")" if parts else "" return sx_response(sx_src) @bp.post("/follow") async def follow(): actor = _require_actor() form = await request.form remote_actor_url = form.get("actor_url", "") if remote_actor_url: await services.federation.send_follow( g.s, actor.preferred_username, remote_actor_url, ) if request.headers.get("SX-Request") or request.headers.get("HX-Request"): return await _actor_card_response(actor, remote_actor_url, is_followed=True) return redirect(request.referrer or url_for("defpage_search")) @bp.post("/unfollow") async def unfollow(): actor = _require_actor() form = await request.form remote_actor_url = form.get("actor_url", "") if remote_actor_url: await services.federation.unfollow( g.s, actor.preferred_username, remote_actor_url, ) if request.headers.get("SX-Request") or request.headers.get("HX-Request"): return await _actor_card_response(actor, remote_actor_url, is_followed=False) return redirect(request.referrer or url_for("defpage_search")) async def _actor_card_response(actor, remote_actor_url, is_followed): """Re-render a single actor card after follow/unfollow via HTMX.""" from sxc.pages.utils import _serialize_remote_actor, _serialize_actor remote_dto = await services.federation.get_or_fetch_remote_actor( g.s, remote_actor_url, ) if not remote_dto: return Response("", status=200) followed_urls = {remote_actor_url} if is_followed else set() referer = request.referrer or "" list_type = "followers" if "/followers" in referer else "following" actor_data = _serialize_actor(actor) ad = _serialize_remote_actor(remote_dto) return sx_response(sx_call("federation-actor-card-from-data", a=ad, actor=actor_data, followed_urls=list(followed_urls), list_type=list_type)) # -- Interactions ---------------------------------------------------------- @bp.post("/like") async def like(): actor = _require_actor() form = await request.form object_id = form.get("object_id", "") author_inbox = form.get("author_inbox", "") await services.federation.like_post(g.s, actor.id, object_id, author_inbox) return await _interaction_buttons_response(actor, object_id, author_inbox) @bp.post("/unlike") async def unlike(): actor = _require_actor() form = await request.form object_id = form.get("object_id", "") author_inbox = form.get("author_inbox", "") await services.federation.unlike_post(g.s, actor.id, object_id, author_inbox) return await _interaction_buttons_response(actor, object_id, author_inbox) @bp.post("/boost") async def boost(): actor = _require_actor() form = await request.form object_id = form.get("object_id", "") author_inbox = form.get("author_inbox", "") await services.federation.boost_post(g.s, actor.id, object_id, author_inbox) return await _interaction_buttons_response(actor, object_id, author_inbox) @bp.post("/unboost") async def unboost(): actor = _require_actor() form = await request.form object_id = form.get("object_id", "") author_inbox = form.get("author_inbox", "") await services.federation.unboost_post(g.s, actor.id, object_id, author_inbox) return await _interaction_buttons_response(actor, object_id, author_inbox) async def _interaction_buttons_response(actor, object_id, author_inbox): """Re-render interaction buttons after a like/boost action.""" from shared.models.federation import APInteraction from shared.browser.app.csrf import generate_csrf_token from sqlalchemy import select svc = services.federation post_type, post_id = await svc._resolve_post(g.s, object_id) like_count = 0 boost_count = 0 liked_by_me = False boosted_by_me = False if post_type: from sqlalchemy import func as sa_func like_count = (await g.s.execute( select(sa_func.count(APInteraction.id)).where( APInteraction.post_type == post_type, APInteraction.post_id == post_id, APInteraction.interaction_type == "like", ) )).scalar() or 0 boost_count = (await g.s.execute( select(sa_func.count(APInteraction.id)).where( APInteraction.post_type == post_type, APInteraction.post_id == post_id, APInteraction.interaction_type == "boost", ) )).scalar() or 0 liked_by_me = bool((await g.s.execute( select(APInteraction.id).where( APInteraction.actor_profile_id == actor.id, APInteraction.post_type == post_type, APInteraction.post_id == post_id, APInteraction.interaction_type == "like", ).limit(1) )).scalar()) boosted_by_me = bool((await g.s.execute( select(APInteraction.id).where( APInteraction.actor_profile_id == actor.id, APInteraction.post_type == post_type, APInteraction.post_id == post_id, APInteraction.interaction_type == "boost", ).limit(1) )).scalar()) csrf = generate_csrf_token() safe_id = object_id.replace("/", "_").replace(":", "_") target = f"#interactions-{safe_id}" if liked_by_me: like_action = url_for("social.unlike") like_cls = "text-red-500 hover:text-red-600" like_icon = "\u2665" else: like_action = url_for("social.like") like_cls = "hover:text-red-500" like_icon = "\u2661" if boosted_by_me: boost_action = url_for("social.unboost") boost_cls = "text-green-600 hover:text-green-700" else: boost_action = url_for("social.boost") boost_cls = "hover:text-green-600" reply_url = url_for("social.defpage_compose_form", reply_to=object_id) if object_id else "" reply_sx = sx_call("federation-reply-link", url=reply_url) if reply_url else "" like_form = sx_call("federation-like-form", action=like_action, target=target, oid=object_id, ainbox=author_inbox, csrf=csrf, cls=f"flex items-center gap-1 {like_cls}", icon=like_icon, count=str(like_count)) boost_form = sx_call("federation-boost-form", action=boost_action, target=target, oid=object_id, ainbox=author_inbox, csrf=csrf, cls=f"flex items-center gap-1 {boost_cls}", count=str(boost_count)) return sx_response(sx_call("federation-interaction-buttons", like=like_form, boost=boost_form, reply=reply_sx or None)) # -- Following / Followers pagination -------------------------------------- @bp.get("/following/page") async def following_list_page(): from sxc.pages.utils import _serialize_remote_actor, _serialize_actor actor = _require_actor() page = request.args.get("page", 1, type=int) actors_list, total = await services.federation.get_following( g.s, actor.preferred_username, page=page, ) actor_dicts = [_serialize_remote_actor(a) for a in actors_list] actor_data = _serialize_actor(actor) parts = [] for ad in actor_dicts: parts.append(sx_call("federation-actor-card-from-data", a=ad, actor=actor_data, followed_urls=[], list_type="following")) if len(actors_list) >= 20: next_url = url_for("social.following_list_page", page=page + 1) parts.append(sx_call("federation-scroll-sentinel", url=next_url)) sx_src = "(<> " + " ".join(parts) + ")" if parts else "" return sx_response(sx_src) @bp.get("/followers/page") async def followers_list_page(): from sxc.pages.utils import _serialize_remote_actor, _serialize_actor actor = _require_actor() page = request.args.get("page", 1, type=int) actors_list, total = await services.federation.get_followers_paginated( g.s, actor.preferred_username, page=page, ) following, _ = await services.federation.get_following( g.s, actor.preferred_username, page=1, per_page=1000, ) followed_urls = {a.actor_url for a in following} actor_dicts = [_serialize_remote_actor(a) for a in actors_list] actor_data = _serialize_actor(actor) parts = [] for ad in actor_dicts: parts.append(sx_call("federation-actor-card-from-data", a=ad, actor=actor_data, followed_urls=list(followed_urls), list_type="followers")) if len(actors_list) >= 20: next_url = url_for("social.followers_list_page", page=page + 1) parts.append(sx_call("federation-scroll-sentinel", url=next_url)) sx_src = "(<> " + " ".join(parts) + ")" if parts else "" return sx_response(sx_src) @bp.get("/actor//timeline") async def actor_timeline_page(id: int): actor = getattr(g, "_social_actor", None) before_str = request.args.get("before") before = None if before_str: try: before = datetime.fromisoformat(before_str) except ValueError: pass items = await services.federation.get_actor_timeline( g.s, id, before=before, ) sx_src = _render_timeline_items(items, "actor", actor, id) return sx_response(sx_src) # -- Notifications --------------------------------------------------------- @bp.get("/notifications/count") async def notification_count(): actor = getattr(g, "_social_actor", None) if not actor: return Response("0", content_type="text/plain") count = await services.federation.unread_notification_count(g.s, actor.id) if count > 0: from shared.sx.jinja_bridge import render as render_comp return Response( render_comp("notification-badge", count=str(count)), content_type="text/html", ) return Response("", content_type="text/html") @bp.post("/notifications/read") async def mark_read(): actor = _require_actor() await services.federation.mark_notifications_read(g.s, actor.id) return redirect(url_for("defpage_notifications")) return bp def _render_timeline_items(items, timeline_type, actor, actor_id=None): """Render timeline pagination items as SX fragment.""" from sxc.pages.utils import _serialize_timeline_item, _serialize_actor item_dicts = [_serialize_timeline_item(i) for i in items] actor_data = _serialize_actor(actor) next_url = None if items: last = items[-1] before = last.published.isoformat() if last.published else "" if timeline_type == "actor" and actor_id is not None: next_url = url_for("social.actor_timeline_page", id=actor_id, before=before) else: next_url = url_for(f"social.{timeline_type}_timeline_page", before=before) return sx_call("federation-timeline-items", items=item_dicts, timeline_type=timeline_type, actor=actor_data, next_url=next_url)