Files
rose-ash/app/routers/activities.py
giles f54b0fb5da Squashed 'l2/' content from commit 79caa24
git-subtree-dir: l2
git-subtree-split: 79caa24e2129bf6e2cee819327d5622425306b67
2026-02-24 23:07:31 +00:00

100 lines
2.5 KiB
Python

"""
Activity routes for L2 server.
Handles ActivityPub activities and outbox.
"""
import logging
from typing import Optional
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import JSONResponse
from artdag_common import render
from artdag_common.middleware import wants_html, wants_json
from ..config import settings
from ..dependencies import get_templates, require_auth, get_user_from_cookie
router = APIRouter()
logger = logging.getLogger(__name__)
@router.get("")
async def list_activities(
    request: Request,
    offset: int = 0,
    limit: int = 20,
):
    """List recent activities, one page at a time.

    Args:
        request: Incoming request; used for content negotiation, the auth
            cookie and template lookup.
        offset: Number of activities to skip. Negative values are treated
            as 0.
        limit: Maximum number of activities to return. Values below 1 are
            raised to 1.

    Returns:
        When the client asks for JSON, a dict with ``activities``,
        ``offset``, ``limit``, ``total`` and ``has_more``. Otherwise the
        rendered ``activities/list.html`` template.
    """
    import db

    # Clamp rather than reject: a negative offset or non-positive limit is
    # never meaningful, and limit=0 would produce an empty page that still
    # reports has_more=True, so a paginating client would never advance.
    offset = max(offset, 0)
    limit = max(limit, 1)

    activities, total = await db.get_activities_paginated(limit=limit, offset=offset)
    has_more = offset + len(activities) < total

    if wants_json(request):
        # Expose the pagination state so JSON clients can page through the
        # list just like the HTML view does.
        return {
            "activities": activities,
            "offset": offset,
            "limit": limit,
            "total": total,
            "has_more": has_more,
        }

    # The logged-in user is only needed to render the HTML page.
    username = get_user_from_cookie(request)
    templates = get_templates(request)
    return render(
        templates,
        "activities/list.html",
        request,
        activities=activities,
        user={"username": username} if username else None,
        offset=offset,
        limit=limit,
        has_more=has_more,
        active_tab="activities",
    )
@router.get("/{activity_id}")
async def get_activity(
    activity_id: str,
    request: Request,
):
    """Get activity details.

    The response format is chosen from the request, in this order:

    1. ActivityPub clients get the value stored under ``activity_json``
       (or the whole record when that key is absent), served as
       ``application/activity+json``.
    2. Other JSON clients (as decided by ``wants_json``) get the record as
       returned by the database layer.
    3. Everyone else gets the rendered ``activities/detail.html`` template.

    Args:
        activity_id: Identifier taken from the URL path.
        request: Incoming request; used for content negotiation, the auth
            cookie and template lookup.

    Raises:
        HTTPException: 404 when no activity exists for ``activity_id``.
    """
    import db

    activity = await db.get_activity(activity_id)
    if not activity:
        raise HTTPException(404, "Activity not found")

    # Media types are case-insensitive, so normalise before matching.
    accept = request.headers.get("accept", "").lower()
    # ActivityPub clients may ask for either the short activity+json type or
    # JSON-LD qualified with the ActivityStreams profile; accept both.
    wants_activitypub = "application/activity+json" in accept or (
        "application/ld+json" in accept
        and "https://www.w3.org/ns/activitystreams" in accept
    )
    if wants_activitypub:
        return JSONResponse(
            content=activity.get("activity_json", activity),
            media_type="application/activity+json",
        )

    if wants_json(request):
        return activity

    # The logged-in user is only needed to render the HTML page.
    username = get_user_from_cookie(request)
    templates = get_templates(request)
    return render(
        templates,
        "activities/detail.html",
        request,
        activity=activity,
        user={"username": username} if username else None,
        active_tab="activities",
    )
@router.post("")
async def create_activity(
    request: Request,
    user: dict = Depends(require_auth),
):
    """Create a new activity (internal use).

    The request body must be a JSON object. Its ``type`` field selects the
    activity type (defaulting to ``"Create"``) and its ``object`` field is
    passed through to the database layer as the activity's object data.

    Args:
        request: Incoming request carrying the JSON body.
        user: Value supplied by the ``require_auth`` dependency; its
            ``actor_id`` entry is recorded as the activity's actor.

    Returns:
        A dict with the new ``activity_id`` and ``created: True``.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is not a JSON
            object.
    """
    import db

    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report a client error instead of a 500.
        raise HTTPException(400, "Invalid JSON body") from None

    # A JSON array, string or number parses fine but has no .get().
    if not isinstance(body, dict):
        raise HTTPException(400, "Request body must be a JSON object")

    activity_id = await db.create_activity(
        actor=user["actor_id"],
        activity_type=body.get("type", "Create"),
        object_data=body.get("object"),
    )
    return {"activity_id": activity_id, "created": True}