- Fix get_activities to use get_activities_paginated - Add get_user_assets, delete_asset, count_users, count_user_activities - Add get_user_activities, get_renderer, update_anchor, delete_anchor - Add record_run and get_run functions - Fix create_asset calls to use dict parameter - Fix update_asset call signature Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
100 lines
2.5 KiB
Python
100 lines
2.5 KiB
Python
"""
|
|
Activity routes for L2 server.
|
|
|
|
Handles ActivityPub activities and outbox.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Request, Depends, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from artdag_common import render
|
|
from artdag_common.middleware import wants_html, wants_json
|
|
|
|
from ..config import settings
|
|
from ..dependencies import get_templates, require_auth, get_user_from_cookie
|
|
|
|
router = APIRouter()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@router.get("")
|
|
async def list_activities(
|
|
request: Request,
|
|
offset: int = 0,
|
|
limit: int = 20,
|
|
):
|
|
"""List recent activities."""
|
|
import db
|
|
|
|
username = get_user_from_cookie(request)
|
|
|
|
activities, total = await db.get_activities_paginated(limit=limit, offset=offset)
|
|
has_more = offset + len(activities) < total
|
|
|
|
if wants_json(request):
|
|
return {"activities": activities, "offset": offset, "limit": limit}
|
|
|
|
templates = get_templates(request)
|
|
return render(templates, "activities/list.html", request,
|
|
activities=activities,
|
|
user={"username": username} if username else None,
|
|
offset=offset,
|
|
limit=limit,
|
|
has_more=has_more,
|
|
active_tab="activities",
|
|
)
|
|
|
|
|
|
@router.get("/{activity_id}")
|
|
async def get_activity(
|
|
activity_id: str,
|
|
request: Request,
|
|
):
|
|
"""Get activity details."""
|
|
import db
|
|
|
|
activity = await db.get_activity(activity_id)
|
|
if not activity:
|
|
raise HTTPException(404, "Activity not found")
|
|
|
|
# ActivityPub response
|
|
if "application/activity+json" in request.headers.get("accept", ""):
|
|
return JSONResponse(
|
|
content=activity.get("activity_json", activity),
|
|
media_type="application/activity+json",
|
|
)
|
|
|
|
if wants_json(request):
|
|
return activity
|
|
|
|
username = get_user_from_cookie(request)
|
|
templates = get_templates(request)
|
|
return render(templates, "activities/detail.html", request,
|
|
activity=activity,
|
|
user={"username": username} if username else None,
|
|
active_tab="activities",
|
|
)
|
|
|
|
|
|
@router.post("")
|
|
async def create_activity(
|
|
request: Request,
|
|
user: dict = Depends(require_auth),
|
|
):
|
|
"""Create a new activity (internal use)."""
|
|
import db
|
|
import json
|
|
|
|
body = await request.json()
|
|
|
|
activity_id = await db.create_activity(
|
|
actor=user["actor_id"],
|
|
activity_type=body.get("type", "Create"),
|
|
object_data=body.get("object"),
|
|
)
|
|
|
|
return {"activity_id": activity_id, "created": True}
|