All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 49s
Replace single WebFinger lookup with paginated search across cached remote actors and local profiles. New _search_results.html partial with htmx infinite scroll sentinel. Form submits via hx-get for seamless pagination. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
500 lines
18 KiB
Python
500 lines
18 KiB
Python
"""Social fediverse routes: timeline, compose, search, follow, interactions, notifications."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
from quart import Blueprint, request, g, redirect, url_for, abort, render_template, Response
|
|
|
|
from shared.services.registry import services
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def _require_actor():
    """Resolve the acting profile for the current request, or abort with 403.

    The ``actor`` entry of the ``ctx`` mapping stored on ``g`` is preferred;
    when that is missing or falsy, ``g._social_actor`` is used instead.
    """
    # ``g.get`` already yields the empty-dict default when ``ctx`` was never
    # set, so no separate attribute probe is required.
    request_ctx = g.get("ctx", {})
    profile = request_ctx.get("actor") or getattr(g, "_social_actor", None)
    if not profile:
        abort(403, "You need to choose a federation username first")
    return profile
|
|
|
|
|
|
def register(url_prefix="/social"):
    """Build the ``social`` blueprint with all fediverse routes attached.

    The blueprint covers timelines, composing, actor search, follow/unfollow,
    like/boost interactions, following/followers lists, remote actor
    timelines and notifications.

    Args:
        url_prefix: URL prefix the blueprint is mounted under.

    Returns:
        The configured ``Blueprint`` named ``"social"``.
    """
    bp = Blueprint("social", __name__, url_prefix=url_prefix)

    # Maximum number of followed actors loaded when working out whether a
    # listed actor should show "Follow" or "Unfollow". Follows beyond this
    # bound are not reflected in the button state.
    followed_lookup_limit = 1000

    # -- Request helpers ------------------------------------------------------

    def _current_actor():
        """Return the actor stored by ``load_actor`` or ``None``."""
        return getattr(g, "_social_actor", None)

    def _before_arg():
        """Parse the ``before`` query parameter into a ``datetime`` or ``None``.

        Missing, empty and unparseable values all yield ``None`` so that a bad
        cursor falls back to the newest items instead of failing the request.
        """
        raw = request.args.get("before")
        if not raw:
            return None
        # A "+HH:MM" offset that was not percent-encoded reaches us with the
        # "+" decoded as a space, so retry once with the last space restored.
        for candidate in (raw, "+".join(raw.rsplit(" ", 1))):
            try:
                return datetime.fromisoformat(candidate)
            except ValueError:
                continue
        log.debug("Ignoring unparseable 'before' cursor: %r", raw)
        return None

    def _page_arg():
        """Return the ``page`` query parameter as an int, never below 1."""
        # ``type=int`` falls back to the default on non-numeric input; the
        # clamp additionally rejects zero and negative page numbers.
        return max(1, request.args.get("page", 1, type=int))

    async def _followed_urls(actor):
        """Return the set of actor URLs that ``actor`` follows (empty if none)."""
        if not actor:
            return set()
        following, _ = await services.federation.get_following(
            g.s, actor.preferred_username, page=1, per_page=followed_lookup_limit,
        )
        return {followed.actor_url for followed in following}

    @bp.before_request
    async def load_actor():
        """Load actor profile for authenticated users."""
        if g.get("user"):
            actor = await services.federation.get_actor_by_user_id(g.s, g.user.id)
            g._social_actor = actor

    # -- Timeline -------------------------------------------------------------

    @bp.get("/")
    async def home_timeline():
        """Render the home timeline; visitors without a user go to the login form."""
        if not g.get("user"):
            return redirect(url_for("auth.login_form"))
        actor = _require_actor()
        items = await services.federation.get_home_timeline(g.s, actor.id)
        return await render_template(
            "federation/timeline.html",
            items=items,
            timeline_type="home",
            actor=actor,
        )

    @bp.get("/timeline")
    async def home_timeline_page():
        """Render a further batch of home timeline items older than ``before``."""
        actor = _require_actor()
        items = await services.federation.get_home_timeline(
            g.s, actor.id, before=_before_arg(),
        )
        return await render_template(
            "federation/_timeline_items.html",
            items=items,
            timeline_type="home",
            actor=actor,
        )

    @bp.get("/public")
    async def public_timeline():
        """Render the public timeline; no actor is required."""
        items = await services.federation.get_public_timeline(g.s)
        return await render_template(
            "federation/timeline.html",
            items=items,
            timeline_type="public",
            actor=_current_actor(),
        )

    @bp.get("/public/timeline")
    async def public_timeline_page():
        """Render a further batch of public timeline items older than ``before``."""
        items = await services.federation.get_public_timeline(
            g.s, before=_before_arg(),
        )
        return await render_template(
            "federation/_timeline_items.html",
            items=items,
            timeline_type="public",
            actor=_current_actor(),
        )

    # -- Compose --------------------------------------------------------------

    @bp.get("/compose")
    async def compose_form():
        """Render the compose form, optionally pre-filled with ``reply_to``."""
        actor = _require_actor()
        reply_to = request.args.get("reply_to")
        return await render_template(
            "federation/compose.html",
            actor=actor,
            reply_to=reply_to,
        )

    @bp.post("/compose")
    async def compose_submit():
        """Create a local post from the submitted form, then show the home timeline.

        Blank content is not posted; the user is sent back to the compose form.
        """
        actor = _require_actor()
        form = await request.form
        content = form.get("content", "").strip()
        if not content:
            return redirect(url_for("social.compose_form"))

        visibility = form.get("visibility", "public")
        # An empty ``in_reply_to`` field means "not a reply".
        in_reply_to = form.get("in_reply_to") or None

        await services.federation.create_local_post(
            g.s, actor.id,
            content=content,
            visibility=visibility,
            in_reply_to=in_reply_to,
        )
        return redirect(url_for("social.home_timeline"))

    @bp.post("/delete/<int:post_id>")
    async def delete_post(post_id: int):
        """Delete one of the current actor's local posts and return to the timeline."""
        actor = _require_actor()
        await services.federation.delete_local_post(g.s, actor.id, post_id)
        return redirect(url_for("social.home_timeline"))

    # -- Search + Follow ------------------------------------------------------

    @bp.get("/search")
    async def search():
        """Render the search page with the first page of results for ``q``."""
        actor = _current_actor()
        query = request.args.get("q", "").strip()
        actors = []
        total = 0
        followed_urls: set[str] = set()
        # An empty query performs no lookup at all.
        if query:
            actors, total = await services.federation.search_actors(g.s, query)
            followed_urls = await _followed_urls(actor)
        return await render_template(
            "federation/search.html",
            query=query,
            actors=actors,
            total=total,
            page=1,
            followed_urls=followed_urls,
            actor=actor,
        )

    @bp.get("/search/page")
    async def search_page():
        """Render one page of search results as a partial."""
        actor = _current_actor()
        query = request.args.get("q", "").strip()
        page = _page_arg()
        actors = []
        total = 0
        followed_urls: set[str] = set()
        if query:
            actors, total = await services.federation.search_actors(
                g.s, query, page=page,
            )
            followed_urls = await _followed_urls(actor)
        return await render_template(
            "federation/_search_results.html",
            actors=actors,
            total=total,
            page=page,
            query=query,
            followed_urls=followed_urls,
            actor=actor,
        )

    async def _toggle_follow(service_call, *, is_followed):
        """Shared body of ``follow`` / ``unfollow``.

        Calls ``service_call`` with the submitted ``actor_url`` (if any), then
        answers HTMX requests with a re-rendered actor card and everything else
        with a redirect back to the referring page (or the search page).
        """
        actor = _require_actor()
        form = await request.form
        remote_actor_url = form.get("actor_url", "").strip()
        if remote_actor_url:
            await service_call(g.s, actor.preferred_username, remote_actor_url)
        if request.headers.get("HX-Request"):
            return await _actor_card_response(
                actor, remote_actor_url, is_followed=is_followed,
            )
        return redirect(request.referrer or url_for("social.search"))

    @bp.post("/follow")
    async def follow():
        """Follow the actor given by the ``actor_url`` form field."""
        return await _toggle_follow(
            services.federation.send_follow, is_followed=True,
        )

    @bp.post("/unfollow")
    async def unfollow():
        """Unfollow the actor given by the ``actor_url`` form field."""
        return await _toggle_follow(
            services.federation.unfollow, is_followed=False,
        )

    async def _actor_card_response(actor, remote_actor_url, is_followed):
        """Re-render a single actor card after follow/unfollow via HTMX."""
        # Nothing to look up (and nothing to render) without a URL.
        if not remote_actor_url:
            return Response("", status=200)
        remote_dto = await services.federation.get_or_fetch_remote_actor(
            g.s, remote_actor_url,
        )
        if not remote_dto:
            return Response("", status=200)
        followed_urls = {remote_actor_url} if is_followed else set()
        # Detect list context from referer
        referer = request.referrer or ""
        list_type = "followers" if "/followers" in referer else "following"
        return await render_template(
            "federation/_actor_list_items.html",
            actors=[remote_dto],
            total=0,
            page=1,
            list_type=list_type,
            followed_urls=followed_urls,
            actor=actor,
        )

    # -- Interactions ---------------------------------------------------------

    async def _interact(service_call):
        """Shared body of like / unlike / boost / unboost.

        Reads ``object_id`` and ``author_inbox`` from the form, applies
        ``service_call`` for the current actor and returns the refreshed
        interaction buttons. Aborts with 400 when ``object_id`` is missing.
        """
        actor = _require_actor()
        form = await request.form
        object_id = form.get("object_id", "")
        author_inbox = form.get("author_inbox", "")
        if not object_id:
            abort(400, "Missing object_id")
        await service_call(g.s, actor.id, object_id, author_inbox)
        # Return updated buttons for HTMX
        return await _interaction_buttons_response(actor, object_id, author_inbox)

    @bp.post("/like")
    async def like():
        """Like the post identified by the ``object_id`` form field."""
        return await _interact(services.federation.like_post)

    @bp.post("/unlike")
    async def unlike():
        """Remove the current actor's like from a post."""
        return await _interact(services.federation.unlike_post)

    @bp.post("/boost")
    async def boost():
        """Boost the post identified by the ``object_id`` form field."""
        return await _interact(services.federation.boost_post)

    @bp.post("/unboost")
    async def unboost():
        """Remove the current actor's boost from a post."""
        return await _interact(services.federation.unboost_post)

    async def _interaction_buttons_response(actor, object_id, author_inbox):
        """Re-render interaction buttons after a like/boost action.

        Counts and "by me" flags stay at zero / ``False`` when the object
        cannot be resolved to a stored post.
        """
        from shared.models.federation import APInteraction
        from sqlalchemy import func as sa_func, select

        post_type, post_id = await services.federation._resolve_post(g.s, object_id)

        like_count = 0
        boost_count = 0
        liked_by_me = False
        boosted_by_me = False

        if post_type:
            # Filter shared by every query below: interactions on this post.
            on_this_post = (
                APInteraction.post_type == post_type,
                APInteraction.post_id == post_id,
            )

            async def _count(kind):
                """Number of interactions of ``kind`` on the post."""
                stmt = select(sa_func.count(APInteraction.id)).where(
                    *on_this_post,
                    APInteraction.interaction_type == kind,
                )
                return (await g.s.execute(stmt)).scalar() or 0

            async def _done_by_actor(kind):
                """Whether ``actor`` has an interaction of ``kind`` on the post."""
                stmt = select(APInteraction.id).where(
                    APInteraction.actor_profile_id == actor.id,
                    *on_this_post,
                    APInteraction.interaction_type == kind,
                ).limit(1)
                return (await g.s.execute(stmt)).scalar() is not None

            like_count = await _count("like")
            boost_count = await _count("boost")
            liked_by_me = await _done_by_actor("like")
            boosted_by_me = await _done_by_actor("boost")

        return await render_template(
            "federation/_interaction_buttons.html",
            item_object_id=object_id,
            item_author_inbox=author_inbox,
            like_count=like_count,
            boost_count=boost_count,
            liked_by_me=liked_by_me,
            boosted_by_me=boosted_by_me,
            actor=actor,
        )

    # -- Following / Followers ------------------------------------------------

    @bp.get("/following")
    async def following_list():
        """Render the first page of actors the current actor follows."""
        actor = _require_actor()
        actors, total = await services.federation.get_following(
            g.s, actor.preferred_username,
        )
        return await render_template(
            "federation/following.html",
            actors=actors,
            total=total,
            page=1,
            actor=actor,
        )

    @bp.get("/following/page")
    async def following_list_page():
        """Render one page of the following list as a partial."""
        actor = _require_actor()
        page = _page_arg()
        actors, total = await services.federation.get_following(
            g.s, actor.preferred_username, page=page,
        )
        return await render_template(
            "federation/_actor_list_items.html",
            actors=actors,
            total=total,
            page=page,
            list_type="following",
            followed_urls=set(),
            actor=actor,
        )

    @bp.get("/followers")
    async def followers_list():
        """Render the first page of the current actor's followers."""
        actor = _require_actor()
        actors, total = await services.federation.get_followers_paginated(
            g.s, actor.preferred_username,
        )
        # Build set of followed actor URLs to show Follow Back vs Unfollow
        followed_urls = await _followed_urls(actor)
        return await render_template(
            "federation/followers.html",
            actors=actors,
            total=total,
            page=1,
            followed_urls=followed_urls,
            actor=actor,
        )

    @bp.get("/followers/page")
    async def followers_list_page():
        """Render one page of the followers list as a partial."""
        actor = _require_actor()
        page = _page_arg()
        actors, total = await services.federation.get_followers_paginated(
            g.s, actor.preferred_username, page=page,
        )
        followed_urls = await _followed_urls(actor)
        return await render_template(
            "federation/_actor_list_items.html",
            actors=actors,
            total=total,
            page=page,
            list_type="followers",
            followed_urls=followed_urls,
            actor=actor,
        )

    # NOTE: the view argument is called ``id`` because it is the name of the
    # URL variable in the route; it is kept despite shadowing the builtin so
    # that existing URL building with ``id=...`` keeps working.
    @bp.get("/actor/<int:id>")
    async def actor_timeline(id: int):
        """Render a remote actor's timeline page, or 404 if the actor is unknown."""
        actor = _current_actor()
        # Get remote actor info
        from shared.models.federation import RemoteActor
        from sqlalchemy import select as sa_select
        remote = (
            await g.s.execute(
                sa_select(RemoteActor).where(RemoteActor.id == id)
            )
        ).scalar_one_or_none()
        if not remote:
            abort(404)
        from shared.services.federation_impl import _remote_actor_to_dto
        remote_dto = _remote_actor_to_dto(remote)
        items = await services.federation.get_actor_timeline(g.s, id)
        # Check if we follow this actor
        is_following = False
        if actor:
            from shared.models.federation import APFollowing
            existing = (
                await g.s.execute(
                    sa_select(APFollowing).where(
                        APFollowing.actor_profile_id == actor.id,
                        APFollowing.remote_actor_id == id,
                    # Only existence matters; the limit keeps
                    # ``scalar_one_or_none`` from raising on duplicate rows.
                    ).limit(1)
                )
            ).scalar_one_or_none()
            is_following = existing is not None
        return await render_template(
            "federation/actor_timeline.html",
            remote_actor=remote_dto,
            items=items,
            is_following=is_following,
            actor=actor,
        )

    @bp.get("/actor/<int:id>/timeline")
    async def actor_timeline_page(id: int):
        """Render a further batch of a remote actor's items older than ``before``."""
        actor = _current_actor()
        items = await services.federation.get_actor_timeline(
            g.s, id, before=_before_arg(),
        )
        return await render_template(
            "federation/_timeline_items.html",
            items=items,
            timeline_type="actor",
            actor_id=id,
            actor=actor,
        )

    # -- Notifications --------------------------------------------------------

    @bp.get("/notifications")
    async def notifications():
        """Render the notification list and mark the notifications as read."""
        actor = _require_actor()
        # Fetch first so the rendered items are the ones loaded before the
        # mark-as-read call runs.
        items = await services.federation.get_notifications(g.s, actor.id)
        await services.federation.mark_notifications_read(g.s, actor.id)
        return await render_template(
            "federation/notifications.html",
            notifications=items,
            actor=actor,
        )

    @bp.get("/notifications/count")
    async def notification_count():
        """Return the unread-notification badge.

        Without an actor the body is the plain text ``0``; with an actor the
        body is an HTML badge, or an empty HTML body when nothing is unread.
        """
        actor = _current_actor()
        if not actor:
            return Response("0", content_type="text/plain")
        count = await services.federation.unread_notification_count(g.s, actor.id)
        if count > 0:
            return Response(
                f'<span class="bg-red-500 text-white text-xs rounded-full px-1.5 py-0.5">{count}</span>',
                content_type="text/html",
            )
        return Response("", content_type="text/html")

    @bp.post("/notifications/read")
    async def mark_read():
        """Mark the current actor's notifications as read, then show them."""
        actor = _require_actor()
        await services.federation.mark_notifications_read(g.s, actor.id)
        return redirect(url_for("social.notifications"))

    return bp
|