Initial federation app — ActivityPub server for Rose-Ash
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 41s

Phase 0+1 of AP integration. New 5th Quart microservice:

Blueprints:
- wellknown: WebFinger, NodeInfo 2.0, host-meta
- actors: AP actor profiles (JSON-LD + HTML), outbox, inbox, followers
- identity: username selection flow (creates ActorProfile + RSA keypair)
- auth: magic link login/logout (ported from blog, self-contained)

Services:
- Registers SqlFederationService (real impl) for federation domain
- Registers real impls for blog, calendar, market, cart
- All cross-domain via shared service contracts

Templates:
- Actor profiles, username selection, platform home
- Auth login/check-email (ported from blog)

Infrastructure:
- Dockerfile + entrypoint.sh (matches other apps)
- CI/CD via Gitea Actions
- shared/ as git submodule

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-21 15:11:52 +00:00
commit 41e9670975
32 changed files with 1450 additions and 0 deletions

0
bp/auth/__init__.py Normal file
View File

168
bp/auth/routes.py Normal file
View File

@@ -0,0 +1,168 @@
"""Authentication routes for the federation app.
Ported from blog/bp/auth/routes.py — owns magic link login/logout.
Simplified: no Ghost sync, no newsletter management (those stay in blog).
"""
from __future__ import annotations
from datetime import datetime, timezone
from quart import (
Blueprint,
request,
render_template,
make_response,
redirect,
url_for,
session as qsession,
g,
current_app,
)
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from shared.db.session import get_session
from shared.models import User
from shared.config import config
from shared.utils import host_url
from shared.infrastructure.urls import federation_url
from shared.infrastructure.cart_identity import current_cart_identity
from shared.events import emit_event
from shared.services.registry import services
from .services import (
pop_login_redirect_target,
store_login_redirect_target,
send_magic_email,
find_or_create_user,
create_magic_link,
validate_magic_link,
validate_email,
)
SESSION_USER_KEY = "uid"
def register(url_prefix="/auth"):
auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix)
@auth_bp.get("/login/")
async def login_form():
store_login_redirect_target()
if g.get("user"):
return redirect(federation_url("/"))
return await render_template("auth/login.html")
@auth_bp.get("/account/")
async def account():
if not g.get("user"):
return redirect(host_url(url_for("auth.login_form")))
# Check if user has an ActorProfile
actor = await services.federation.get_actor_by_user_id(g.s, g.user.id)
return await render_template(
"federation/account.html",
actor=actor,
)
@auth_bp.post("/start/")
async def start_login():
form = await request.form
email_input = form.get("email") or ""
is_valid, email = validate_email(email_input)
if not is_valid:
return (
await render_template(
"auth/login.html",
error="Please enter a valid email address.",
email=email_input,
),
400,
)
user = await find_or_create_user(g.s, email)
token, expires = await create_magic_link(g.s, user.id)
magic_url = host_url(url_for("auth.magic", token=token))
email_error = None
try:
await send_magic_email(email, magic_url)
except Exception as e:
current_app.logger.error("EMAIL SEND FAILED: %r", e)
email_error = (
"We couldn't send the email automatically. "
"Please try again in a moment."
)
return await render_template(
"auth/check_email.html",
email=email,
email_error=email_error,
)
@auth_bp.get("/magic/<token>/")
async def magic(token: str):
now = datetime.now(timezone.utc)
user_id: int | None = None
try:
async with get_session() as s:
async with s.begin():
user, error = await validate_magic_link(s, token)
if error:
return (
await render_template("auth/login.html", error=error),
400,
)
user_id = user.id
except Exception:
return (
await render_template(
"auth/login.html",
error="Could not sign you in right now. Please try again.",
),
502,
)
assert user_id is not None
ident = current_cart_identity()
anon_session_id = ident.get("session_id")
try:
async with get_session() as s:
async with s.begin():
u2 = await s.get(User, user_id)
if u2:
u2.last_login_at = now
if anon_session_id:
await emit_event(
s,
"user.logged_in",
"user",
user_id,
{
"user_id": user_id,
"session_id": anon_session_id,
},
)
except SQLAlchemyError:
current_app.logger.exception(
"[auth] non-fatal DB update for user_id=%s", user_id
)
qsession[SESSION_USER_KEY] = user_id
redirect_url = pop_login_redirect_target()
return redirect(redirect_url, 303)
@auth_bp.post("/logout/")
async def logout():
qsession.pop(SESSION_USER_KEY, None)
return redirect(federation_url("/"))
return auth_bp

View File

@@ -0,0 +1,24 @@
from .login_redirect import pop_login_redirect_target, store_login_redirect_target
from .auth_operations import (
get_app_host,
get_app_root,
send_magic_email,
load_user_by_id,
find_or_create_user,
create_magic_link,
validate_magic_link,
validate_email,
)
__all__ = [
"pop_login_redirect_target",
"store_login_redirect_target",
"get_app_host",
"get_app_root",
"send_magic_email",
"load_user_by_id",
"find_or_create_user",
"create_magic_link",
"validate_magic_link",
"validate_email",
]

View File

@@ -0,0 +1,157 @@
"""Auth operations for the federation app.
Copied from blog/bp/auth/services/auth_operations.py to avoid cross-app
import chains. The logic is identical — shared models, shared config.
"""
from __future__ import annotations
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple
from quart import current_app, render_template, request, g
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from shared.models import User, MagicLink
from shared.config import config
def get_app_host() -> str:
host = (
config().get("host") or os.getenv("APP_HOST") or "http://localhost:8000"
).rstrip("/")
return host
def get_app_root() -> str:
root = (g.root).rstrip("/")
return root
async def send_magic_email(to_email: str, link_url: str) -> None:
host = os.getenv("SMTP_HOST")
port = int(os.getenv("SMTP_PORT") or "587")
username = os.getenv("SMTP_USER")
password = os.getenv("SMTP_PASS")
mail_from = os.getenv("MAIL_FROM") or "no-reply@example.com"
site_name = config().get("title", "Rose Ash")
subject = f"Your sign-in link \u2014 {site_name}"
tpl_vars = dict(site_name=site_name, link_url=link_url)
text_body = await render_template("_email/magic_link.txt", **tpl_vars)
html_body = await render_template("_email/magic_link.html", **tpl_vars)
if not host or not username or not password:
current_app.logger.warning(
"SMTP not configured. Printing magic link to console for %s: %s",
to_email,
link_url,
)
print(f"[DEV] Magic link for {to_email}: {link_url}")
return
import aiosmtplib
from email.message import EmailMessage
msg = EmailMessage()
msg["From"] = mail_from
msg["To"] = to_email
msg["Subject"] = subject
msg.set_content(text_body)
msg.add_alternative(html_body, subtype="html")
is_secure = port == 465
if is_secure:
smtp = aiosmtplib.SMTP(
hostname=host, port=port, use_tls=True,
username=username, password=password,
)
else:
smtp = aiosmtplib.SMTP(
hostname=host, port=port, start_tls=True,
username=username, password=password,
)
async with smtp:
await smtp.send_message(msg)
async def load_user_by_id(session: AsyncSession, user_id: int) -> Optional[User]:
stmt = (
select(User)
.options(selectinload(User.labels))
.where(User.id == user_id)
)
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def find_or_create_user(session: AsyncSession, email: str) -> User:
result = await session.execute(select(User).where(User.email == email))
user = result.scalar_one_or_none()
if user is None:
user = User(email=email)
session.add(user)
await session.flush()
return user
async def create_magic_link(
session: AsyncSession,
user_id: int,
purpose: str = "signin",
expires_minutes: int = 15,
) -> Tuple[str, datetime]:
token = secrets.token_urlsafe(32)
expires = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
ml = MagicLink(
token=token,
user_id=user_id,
purpose=purpose,
expires_at=expires,
ip=request.headers.get("x-forwarded-for", request.remote_addr),
user_agent=request.headers.get("user-agent"),
)
session.add(ml)
return token, expires
async def validate_magic_link(
session: AsyncSession,
token: str,
) -> Tuple[Optional[User], Optional[str]]:
now = datetime.now(timezone.utc)
ml = await session.scalar(
select(MagicLink)
.where(MagicLink.token == token)
.with_for_update()
)
if not ml or ml.purpose != "signin":
return None, "Invalid or expired link."
if ml.used_at or ml.expires_at < now:
return None, "This link has expired. Please request a new one."
user = await session.get(User, ml.user_id)
if not user:
return None, "User not found."
ml.used_at = now
return user, None
def validate_email(email: str) -> Tuple[bool, str]:
email = email.strip().lower()
if not email or "@" not in email:
return False, email
return True, email

View File

@@ -0,0 +1,45 @@
from urllib.parse import urlparse
from quart import session
from shared.infrastructure.urls import federation_url
LOGIN_REDIRECT_SESSION_KEY = "login_redirect_to"
def store_login_redirect_target() -> None:
from quart import request
target = request.args.get("next")
if not target:
ref = request.referrer or ""
try:
parsed = urlparse(ref)
target = parsed.path or ""
except Exception:
target = ""
if not target:
return
# Accept both relative paths and absolute URLs (cross-app redirects)
if target.startswith("http://") or target.startswith("https://"):
session[LOGIN_REDIRECT_SESSION_KEY] = target
elif target.startswith("/") and not target.startswith("//"):
session[LOGIN_REDIRECT_SESSION_KEY] = target
def pop_login_redirect_target() -> str:
path = session.pop(LOGIN_REDIRECT_SESSION_KEY, None)
if not path or not isinstance(path, str):
return federation_url("/auth/")
# Absolute URL: return as-is (cross-app redirect)
if path.startswith("http://") or path.startswith("https://"):
return path
# Relative path: must start with / and not //
if path.startswith("/") and not path.startswith("//"):
return federation_url(path)
return federation_url("/auth/")