Compare commits

..

4 Commits

Author SHA1 Message Date
giles
bccfff0c69 Add fediverse social tables, protocols, and implementations
6 new ORM models (remote actors, following, remote posts, local posts,
interactions, notifications), 20 new FederationService methods with
SQL implementations and stubs, WebFinger client, and Alembic migration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 11:56:33 +00:00
giles
9a8b556c13 Fix duplicate AP posts + stable object IDs
- Stable object ID per source (Post#123 always gets the same id)
  instead of deriving from activity UUID
- Dedup Update activities (Ghost fires duplicate webhooks)
- Use setdefault for object id in delivery handler

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 10:14:40 +00:00
giles
a626dd849d Fix AP Delete: Tombstone id must match original Create object id
Mastodon ignored Delete activities because the Tombstone id was the
post URL, not the object id from the original Create activity. Now
looks up the existing Create activity and uses its object id.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 09:25:30 +00:00
giles
d0b1edea7a Add container_nav widget rendering to day and entry nav templates
Events app day view and entry detail nav now render registered
container_nav widgets (e.g. market links) alongside existing entries/posts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 09:13:44 +00:00
55 changed files with 490 additions and 933 deletions

View File

@@ -1,6 +1,6 @@
# Shared # Shared
Shared infrastructure, models, contracts, services, and templates used by all five Rose Ash microservices (blog, market, cart, events, federation). Included as a git submodule in each app. Shared infrastructure, models, templates, and configuration used by all four Rose Ash microservices (blog, market, cart, events). Included as a git submodule in each app.
## Structure ## Structure
@@ -8,78 +8,53 @@ Shared infrastructure, models, contracts, services, and templates used by all fi
shared/ shared/
db/ db/
base.py # SQLAlchemy declarative Base base.py # SQLAlchemy declarative Base
session.py # Async session factory (get_session, register_db) session.py # Async session factory (get_session)
models/ # Canonical domain models models/ # Shared domain models
user.py # User user.py # User
magic_link.py # MagicLink (auth tokens) magic_link.py # MagicLink (auth tokens)
(domain_event.py removed — table dropped, see migration n4l2i8j0k1) domain_event.py # DomainEvent (transactional outbox)
kv.py # KeyValue (key-value store) kv.py # KeyValue (key-value store)
menu_item.py # MenuItem (deprecated — use MenuNode) menu_item.py # MenuItem
menu_node.py # MenuNode (navigation tree)
container_relation.py # ContainerRelation (parent-child content)
ghost_membership_entities.py # GhostNewsletter, UserNewsletter ghost_membership_entities.py # GhostNewsletter, UserNewsletter
federation.py # ActorProfile, APActivity, APFollower, APFollowing,
# RemoteActor, APRemotePost, APLocalPost,
# APInteraction, APNotification, APAnchor, IPFSPin
contracts/
dtos.py # Frozen dataclasses for cross-domain data transfer
protocols.py # Service protocols (Blog, Calendar, Market, Cart, Federation)
widgets.py # Widget types (NavWidget, CardWidget, AccountPageWidget)
services/
registry.py # Typed singleton: services.blog, .calendar, .market, .cart, .federation
blog_impl.py # SqlBlogService
calendar_impl.py # SqlCalendarService
market_impl.py # SqlMarketService
cart_impl.py # SqlCartService
federation_impl.py # SqlFederationService
federation_publish.py # try_publish() — inline AP publication helper
stubs.py # No-op stubs for absent domains
navigation.py # get_navigation_tree()
relationships.py # attach_child, get_children, detach_child
widget_registry.py # Widget registry singleton
widgets/ # Per-domain widget registration
infrastructure/ infrastructure/
factory.py # create_base_app() — Quart app factory factory.py # create_base_app() — Quart app factory
cart_identity.py # current_cart_identity() (user_id or session_id) cart_identity.py # current_cart_identity() (user_id or session_id)
cart_loader.py # Cart data loader for context processors cart_loader.py # Cart data loader for context processors
context.py # Jinja2 context processors context.py # Jinja2 context processors
internal_api.py # Inter-app HTTP client (get/post via httpx)
jinja_setup.py # Jinja2 template environment setup jinja_setup.py # Jinja2 template environment setup
urls.py # URL helpers (blog_url, market_url, etc.) urls.py # URL helpers (coop_url, market_url, etc.)
user_loader.py # Load current user from session user_loader.py # Load current user from session
http_utils.py # HTTP utility functions http_utils.py # HTTP utility functions
events/ events/
bus.py # emit_activity(), register_activity_handler() bus.py # emit_event(), register_handler()
processor.py # EventProcessor (polls ap_activities, runs handlers) processor.py # EventProcessor (polls domain_events, runs handlers)
handlers/ # Shared activity handlers browser/app/
container_handlers.py # Navigation rebuild on attach/detach csrf.py # CSRF protection
login_handlers.py # Cart/entry adoption on login errors.py # Error handlers
order_handlers.py # Order lifecycle events middleware.py # Request/response middleware
ap_delivery_handler.py # AP activity delivery to follower inboxes (wildcard) redis_cacher.py # Tag-based Redis page caching
utils/ authz.py # Authorization helpers
__init__.py filters/ # Jinja2 template filters (currency, truncate, etc.)
calendar_helpers.py # Calendar period/entry utilities utils/ # HTMX helpers, UTC time, parsing
http_signatures.py # RSA keypair generation, HTTP signature signing/verification payments/sumup.py # SumUp checkout API integration
ipfs_client.py # Async IPFS client (add_bytes, add_json, pin_cid) browser/templates/ # ~300 Jinja2 templates shared across all apps
anchoring.py # Merkle trees + OpenTimestamps Bitcoin anchoring
webfinger.py # WebFinger actor resolution
browser/
app/ # Middleware, CSRF, errors, Redis caching, authz, filters
templates/ # ~300 Jinja2 templates shared across all apps
containers.py # ContainerType, container_filter, content_filter helpers
config.py # YAML config loader config.py # YAML config loader
containers.py # ContainerType, container_filter, content_filter helpers
log_config/setup.py # Logging configuration (JSON formatter) log_config/setup.py # Logging configuration (JSON formatter)
utils.py # host_url and other shared utilities
static/ # Shared static assets (CSS, JS, images, FontAwesome) static/ # Shared static assets (CSS, JS, images, FontAwesome)
editor/ # Koenig (Ghost) rich text editor build editor/ # Koenig (Ghost) rich text editor build
alembic/ # Database migrations alembic/ # Database migrations (25 versions)
env.py # Imports models from all apps (with try/except guards)
versions/ # Migration files — single head: j0h8e4f6g7
``` ```
## Key Patterns ## Key Patterns
- **App factory:** All apps call `create_base_app()` which sets up DB sessions, CSRF, error handling, event processing, logging, widget registration, and domain service wiring. - **App factory:** All apps call `create_base_app()` which sets up DB sessions, CSRF, error handling, event processing, logging, and the glue handler registry.
- **Service contracts:** Cross-domain communication via typed Protocols + frozen DTO dataclasses. Apps call `services.calendar.method()`, never import models from other domains. - **Event bus:** `emit_event()` writes to `domain_events` table in the caller's transaction. `EventProcessor` polls and dispatches to registered handlers.
- **Service registry:** Typed singleton (`services.blog`, `.calendar`, `.market`, `.cart`, `.federation`). Apps wire their own domain + stubs for others via `register_domain_services()`. - **Inter-app HTTP:** `internal_api.get/post("cart", "/internal/cart/summary")` for cross-app reads. URLs resolved from `app-config.yaml`.
- **Activity bus:** `emit_activity()` writes to `ap_activities` table in the caller's transaction. `EventProcessor` polls pending activities and dispatches to registered handlers. Internal events use `visibility="internal"`; federation activities use `visibility="public"` and are delivered to follower inboxes by the wildcard delivery handler.
- **Widget registry:** Domain services register widgets (nav, card, account); templates consume via `widgets.container_nav`, `widgets.container_cards`.
- **Cart identity:** `current_cart_identity()` returns `{"user_id": int|None, "session_id": str|None}` from the request session. - **Cart identity:** `current_cart_identity()` returns `{"user_id": int|None, "session_id": str|None}` from the request session.
## Alembic Migrations ## Alembic Migrations
@@ -87,5 +62,8 @@ shared/
All apps share one PostgreSQL database. Migrations are managed here and run from the blog app's entrypoint (other apps skip migrations on startup). All apps share one PostgreSQL database. Migrations are managed here and run from the blog app's entrypoint (other apps skip migrations on startup).
```bash ```bash
# From any app directory (shared/ must be on sys.path)
alembic -c shared/alembic.ini upgrade head alembic -c shared/alembic.ini upgrade head
``` ```
Current head: `j0h8e4f6g7` (drop cross-domain FK constraints).

View File

@@ -1 +1 @@
# shared package — infrastructure, models, contracts, and services # shared package — extracted from blog/shared_lib/

View File

@@ -19,7 +19,7 @@ from shared.db.base import Base
# Import ALL models so Base.metadata sees every table # Import ALL models so Base.metadata sees every table
import shared.models # noqa: F401 User, KV, MagicLink, MenuItem, Ghost* import shared.models # noqa: F401 User, KV, MagicLink, MenuItem, Ghost*
for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models"): for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models", "glue.models"):
try: try:
__import__(_mod) __import__(_mod)
except ImportError: except ImportError:

View File

@@ -1,113 +0,0 @@
"""add unified event bus columns to ap_activities
Revision ID: m3k1h7i9j0
Revises: l2j0g6h8i9
Create Date: 2026-02-22
Adds processing and visibility columns so ap_activities can serve as the
unified event bus for both internal domain events and federation delivery.
"""
revision = "m3k1h7i9j0"
down_revision = "l2j0g6h8i9"
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade() -> None:
# Add new columns with defaults so existing rows stay valid
op.add_column(
"ap_activities",
sa.Column("actor_uri", sa.String(512), nullable=True),
)
op.add_column(
"ap_activities",
sa.Column(
"visibility", sa.String(20),
nullable=False, server_default="public",
),
)
op.add_column(
"ap_activities",
sa.Column(
"process_state", sa.String(20),
nullable=False, server_default="completed",
),
)
op.add_column(
"ap_activities",
sa.Column(
"process_attempts", sa.Integer(),
nullable=False, server_default="0",
),
)
op.add_column(
"ap_activities",
sa.Column(
"process_max_attempts", sa.Integer(),
nullable=False, server_default="5",
),
)
op.add_column(
"ap_activities",
sa.Column("process_error", sa.Text(), nullable=True),
)
op.add_column(
"ap_activities",
sa.Column(
"processed_at", sa.DateTime(timezone=True), nullable=True,
),
)
# Backfill actor_uri from the related actor_profile
op.execute(
"""
UPDATE ap_activities a
SET actor_uri = CONCAT(
'https://',
COALESCE(current_setting('app.ap_domain', true), 'rose-ash.com'),
'/users/',
p.preferred_username
)
FROM ap_actor_profiles p
WHERE a.actor_profile_id = p.id
AND a.actor_uri IS NULL
"""
)
# Make actor_profile_id nullable (internal events have no actor profile)
op.alter_column(
"ap_activities", "actor_profile_id",
existing_type=sa.Integer(),
nullable=True,
)
# Index for processor polling
op.create_index(
"ix_ap_activity_process", "ap_activities", ["process_state"],
)
def downgrade() -> None:
op.drop_index("ix_ap_activity_process", table_name="ap_activities")
# Restore actor_profile_id NOT NULL (remove any rows without it first)
op.execute(
"DELETE FROM ap_activities WHERE actor_profile_id IS NULL"
)
op.alter_column(
"ap_activities", "actor_profile_id",
existing_type=sa.Integer(),
nullable=False,
)
op.drop_column("ap_activities", "processed_at")
op.drop_column("ap_activities", "process_error")
op.drop_column("ap_activities", "process_max_attempts")
op.drop_column("ap_activities", "process_attempts")
op.drop_column("ap_activities", "process_state")
op.drop_column("ap_activities", "visibility")
op.drop_column("ap_activities", "actor_uri")

View File

@@ -1,46 +0,0 @@
"""drop domain_events table
Revision ID: n4l2i8j0k1
Revises: m3k1h7i9j0
Create Date: 2026-02-22
The domain_events table is no longer used — all events now flow through
ap_activities with the unified activity bus.
"""
revision = "n4l2i8j0k1"
down_revision = "m3k1h7i9j0"
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
def upgrade() -> None:
op.drop_index("ix_domain_events_state", table_name="domain_events")
op.drop_index("ix_domain_events_event_type", table_name="domain_events")
op.drop_table("domain_events")
def downgrade() -> None:
op.create_table(
"domain_events",
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
sa.Column("event_type", sa.String(128), nullable=False),
sa.Column("aggregate_type", sa.String(64), nullable=False),
sa.Column("aggregate_id", sa.Integer(), nullable=False),
sa.Column("payload", JSONB(), nullable=True),
sa.Column("state", sa.String(20), nullable=False, server_default="pending"),
sa.Column("attempts", sa.Integer(), nullable=False, server_default="0"),
sa.Column("max_attempts", sa.Integer(), nullable=False, server_default="5"),
sa.Column("last_error", sa.Text(), nullable=True),
sa.Column(
"created_at", sa.DateTime(timezone=True),
nullable=False, server_default=sa.func.now(),
),
sa.Column("processed_at", sa.DateTime(timezone=True), nullable=True),
)
op.create_index("ix_domain_events_event_type", "domain_events", ["event_type"])
op.create_index("ix_domain_events_state", "domain_events", ["state"])

View File

@@ -1,35 +0,0 @@
"""Add origin_app column to ap_activities
Revision ID: o5m3j9k1l2
Revises: n4l2i8j0k1
Create Date: 2026-02-22
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect as sa_inspect
revision = "o5m3j9k1l2"
down_revision = "n4l2i8j0k1"
branch_labels = None
depends_on = None
def upgrade() -> None:
conn = op.get_bind()
inspector = sa_inspect(conn)
columns = [c["name"] for c in inspector.get_columns("ap_activities")]
if "origin_app" not in columns:
op.add_column(
"ap_activities",
sa.Column("origin_app", sa.String(64), nullable=True),
)
# Index is idempotent with if_not_exists
op.create_index(
"ix_ap_activity_origin_app", "ap_activities", ["origin_app"],
if_not_exists=True,
)
def downgrade() -> None:
op.drop_index("ix_ap_activity_origin_app", table_name="ap_activities")
op.drop_column("ap_activities", "origin_app")

View File

@@ -1,9 +1,9 @@
# The monolith has been split into three apps (apps/blog, apps/market, apps/cart). # The monolith has been split into three apps (apps/coop, apps/market, apps/cart).
# This package remains for shared infrastructure modules (middleware, redis_cacher, # This package remains for shared infrastructure modules (middleware, redis_cacher,
# csrf, errors, authz, filters, utils, bp/*). # csrf, errors, authz, filters, utils, bp/*).
# #
# To run individual apps: # To run individual apps:
# hypercorn apps.blog.app:app --bind 0.0.0.0:8000 # hypercorn apps.coop.app:app --bind 0.0.0.0:8000
# hypercorn apps.market.app:app --bind 0.0.0.0:8001 # hypercorn apps.market.app:app --bind 0.0.0.0:8001
# hypercorn apps.cart.app:app --bind 0.0.0.0:8002 # hypercorn apps.cart.app:app --bind 0.0.0.0:8002
# #

View File

@@ -18,7 +18,7 @@
{% endif %} {% endif %}
{% endif %} {% endif %}
</div> </div>
<form action="{{ federation_url('/auth/logout/') }}" method="post"> <form action="{{ coop_url('/auth/logout/') }}" method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"> <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button <button
type="submit" type="submit"

View File

@@ -1,5 +1,5 @@
{% import 'macros/links.html' as links %} {% import 'macros/links.html' as links %}
{% call links.link(blog_url('/auth/newsletters/'), hx_select_search, select_colours, True, aclass=styles.nav_button) %} {% call links.link(coop_url('/auth/newsletters/'), hx_select_search, select_colours, True, aclass=styles.nav_button) %}
newsletters newsletters
{% endcall %} {% endcall %}
{% for link in account_nav_links %} {% for link in account_nav_links %}

View File

@@ -1,6 +1,6 @@
<div id="nl-{{ un.newsletter_id }}" class="flex items-center"> <div id="nl-{{ un.newsletter_id }}" class="flex items-center">
<button <button
hx-post="{{ blog_url('/auth/newsletter/' ~ un.newsletter_id ~ '/toggle/') }}" hx-post="{{ coop_url('/auth/newsletter/' ~ un.newsletter_id ~ '/toggle/') }}"
hx-headers='{"X-CSRFToken": "{{ csrf_token() }}"}' hx-headers='{"X-CSRFToken": "{{ csrf_token() }}"}'
hx-target="#nl-{{ un.newsletter_id }}" hx-target="#nl-{{ un.newsletter_id }}"
hx-swap="outerHTML" hx-swap="outerHTML"

View File

@@ -22,7 +22,7 @@
{# No subscription row yet — show an off toggle that will create one #} {# No subscription row yet — show an off toggle that will create one #}
<div id="nl-{{ item.newsletter.id }}" class="flex items-center"> <div id="nl-{{ item.newsletter.id }}" class="flex items-center">
<button <button
hx-post="{{ blog_url('/auth/newsletter/' ~ item.newsletter.id ~ '/toggle/') }}" hx-post="{{ coop_url('/auth/newsletter/' ~ item.newsletter.id ~ '/toggle/') }}"
hx-headers='{"X-CSRFToken": "{{ csrf_token() }}"}' hx-headers='{"X-CSRFToken": "{{ csrf_token() }}"}'
hx-target="#nl-{{ item.newsletter.id }}" hx-target="#nl-{{ item.newsletter.id }}"
hx-swap="outerHTML" hx-swap="outerHTML"

View File

@@ -22,7 +22,7 @@
<p class="mt-6 text-sm"> <p class="mt-6 text-sm">
<a <a
href="{{ blog_url('/auth/login/') }}" href="{{ coop_url('/auth/login/') }}"
class="text-stone-600 dark:text-stone-300 hover:underline" class="text-stone-600 dark:text-stone-300 hover:underline"
> >
← Back ← Back

View File

@@ -1,7 +1,7 @@
{% import 'macros/links.html' as links %} {% import 'macros/links.html' as links %}
{% macro header_row(oob=False) %} {% macro header_row(oob=False) %}
{% call links.menu_row(id='auth-row', oob=oob) %} {% call links.menu_row(id='auth-row', oob=oob) %}
{% call links.link(blog_url('/auth/account/'), hx_select_search ) %} {% call links.link(coop_url('/auth/account/'), hx_select_search ) %}
<i class="fa-solid fa-user"></i> <i class="fa-solid fa-user"></i>
<div>account</div> <div>account</div>
{% endcall %} {% endcall %}

View File

@@ -14,7 +14,7 @@
{% endif %} {% endif %}
<form <form
method="post" action="{{ blog_url('/auth/start/') }}" method="post" action="{{ coop_url('/auth/start/') }}"
class="mt-6 space-y-5" class="mt-6 space-y-5"
> >
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"> <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">

View File

@@ -0,0 +1,169 @@
{% macro show_cart(oob=False) %}
<div id="cart" {% if oob %} hx-swap-oob="{{oob}}" {% endif%}>
{# Empty cart #}
{% if not cart and not calendar_cart_entries %}
<div class="rounded-2xl border border-dashed border-stone-300 bg-white/80 p-6 sm:p-8 text-center">
<div class="inline-flex h-10 w-10 sm:h-12 sm:w-12 items-center justify-center rounded-full bg-stone-100 mb-3">
<i class="fa fa-shopping-cart text-stone-500 text-sm sm:text-base" aria-hidden="true"></i>
</div>
<p class="text-base sm:text-lg font-medium text-stone-800">
Your cart is empty
</p>
{#
<p class="mt-1 text-xs sm:text-sm text-stone-600">
Add some items from the shop to see them here.
</p>
<div class="mt-4">
<a
href="{{ market_url('/') }}"
class="inline-flex items-center px-4 py-2 text-sm font-semibold rounded-full bg-emerald-600 text-white hover:bg-emerald-700"
>
Browse products
</a>
</div> #}
</div>
{% else %}
<div _class="grid gap-y-6 lg:gap-8 lg:grid-cols-[minmax(0,2fr),minmax(0,1fr)]">
{# Items list #}
<section class="space-y-3 sm:space-y-4">
{% for item in cart %}
{% from '_types/product/_cart.html' import cart_item with context %}
{{ cart_item()}}
{% endfor %}
{% if calendar_cart_entries %}
<div class="mt-6 border-t border-stone-200 pt-4">
<h2 class="text-base font-semibold mb-2">
Calendar bookings
</h2>
<ul class="space-y-2">
{% for entry in calendar_cart_entries %}
<li class="flex items-start justify-between text-sm">
<div>
<div class="font-medium">
{{ entry.name or entry.calendar_name }}
</div>
<div class="text-xs text-stone-500">
{{ entry.start_at }}
{% if entry.end_at %}
{{ entry.end_at }}
{% endif %}
</div>
</div>
<div class="ml-4 font-medium">
£{{ "%.2f"|format(entry.cost or 0) }}
</div>
</li>
{% endfor %}
</ul>
</div>
{% endif %}
</section>
{{summary(cart, total, calendar_total, calendar_cart_entries,)}}
</div>
{% endif %}
</div>
{% endmacro %}
{% macro summary(cart, total, calendar_total, calendar_cart_entries, oob=False) %}
<aside id="cart-summary" class="lg:pl-2" {% if oob %} hx-swap-oob="{{oob}}" {% endif %}>
<div class="rounded-2xl bg-white shadow-sm border border-stone-200 p-4 sm:p-5">
<h2 class="text-sm sm:text-base font-semibold text-stone-900 mb-3 sm:mb-4">
Order summary
</h2>
<dl class="space-y-2 text-xs sm:text-sm">
<div class="flex items-center justify-between">
<dt class="text-stone-600">Items</dt>
<dd class="text-stone-900">
{{ cart | sum(attribute="quantity") }}
</dd>
</div>
<div class="flex items-center justify-between">
<dt class="text-stone-600">Subtotal</dt>
<dd class="text-stone-900">
{{ cart_grand_total(cart, total, calendar_total, calendar_cart_entries ) }}
</dd>
</div>
</dl>
<div class="flex flex-col items-center w-full">
<h1 class="text-5xl mt-2">
This is a test - it will not take actual money
</h1>
<div>
use dummy card number: 5555 5555 5555 4444
</div>
</div>
<div class="mt-4 sm:mt-5">
{% if g.user %}
<form
method="post"
action="{{ page_cart_url(page_post.slug, '/checkout/') if page_post is defined and page_post else cart_url('/checkout/') }}"
class="w-full"
>
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button
type="submit"
class="w-full inline-flex items-center justify-center px-4 py-2 text-xs sm:text-sm rounded-full border border-emerald-600 bg-emerald-600 text-white hover:bg-emerald-700 transition"
>
<i class="fa-solid fa-credit-card mr-2" aria-hidden="true"></i>
Checkout as {{g.user.email}}
</button>
</form>
{% else %}
{% set href=login_url(request.url) %}
<div
class="w-full flex"
>
<a
href="{{ href }}"
hx-get="{{ href }}"
hx-target="#main-panel"
hx-select ="{{hx_select_search}}"
hx-swap="outerHTML"
hx-push-url="true"
aria-selected="{{ 'true' if local_href == request.path else 'false' }}"
class="w-full cursor-pointer flex flex-row items-center justify-center p-3 gap-2 rounded bg-stone-200 text-black {{select_colours}}"
data-close-details
>
<i class="fa-solid fa-key"></i>
<span>sign in or register to checkout</span>
</a>
</div>
{% endif %}
</div>
</div>
</aside>
{% endmacro %}
{% macro cart_total(cart, total) %}
{% set cart_total = total(cart) %}
{% if cart_total %}
{% set symbol = "£" if cart[0].product.regular_price_currency == "GBP" else cart[0].product.regular_price_currency %}
{{ symbol }}{{ "%.2f"|format(cart_total) }}
{% else %}
{% endif %}
{% endmacro %}
{% macro cart_grand_total(cart, total, calendar_total, calendar_cart_entries) %}
{% set product_total = total(cart) or 0 %}
{% set cal_total = calendar_total(calendar_cart_entries) or 0 %}
{% set grand = product_total + cal_total %}
{% if cart and cart[0].product.regular_price_currency %}
{% set symbol = "£" if cart[0].product.regular_price_currency == "GBP" else cart[0].product.regular_price_currency %}
{% else %}
{% set symbol = "£" %}
{% endif %}
{{ symbol }}{{ "%.2f"|format(grand) }}
{% endmacro %}

View File

@@ -17,7 +17,7 @@
{% if _count == 0 %} {% if _count == 0 %}
<div class="h-12 w-12 rounded-full overflow-hidden border border-stone-300 flex-shrink-0"> <div class="h-12 w-12 rounded-full overflow-hidden border border-stone-300 flex-shrink-0">
<a <a
href="{{ blog_url('/') }}" href="{{ coop_url('/') }}"
class="h-full w-full font-bold text-5xl flex-shrink-0 flex flex-row items-center gap-1" class="h-full w-full font-bold text-5xl flex-shrink-0 flex flex-row items-center gap-1"
> >
<img <img

View File

@@ -6,7 +6,7 @@
{% from 'macros/scrolling_menu.html' import scrolling_menu with context %} {% from 'macros/scrolling_menu.html' import scrolling_menu with context %}
{% call(entry_post) scrolling_menu('entry-posts-container', entry_posts) %} {% call(entry_post) scrolling_menu('entry-posts-container', entry_posts) %}
<a <a
href="{{ blog_url('/' + entry_post.slug + '/') }}" href="{{ coop_url('/' + entry_post.slug + '/') }}"
class="flex items-center gap-2 px-3 py-2 hover:bg-stone-100 rounded transition text-sm border sm:whitespace-nowrap sm:flex-shrink-0"> class="flex items-center gap-2 px-3 py-2 hover:bg-stone-100 rounded transition text-sm border sm:whitespace-nowrap sm:flex-shrink-0">
{% if entry_post.feature_image %} {% if entry_post.feature_image %}
<img src="{{ entry_post.feature_image }}" <img src="{{ entry_post.feature_image }}"

View File

@@ -9,7 +9,7 @@
{% from 'macros/scrolling_menu.html' import scrolling_menu with context %} {% from 'macros/scrolling_menu.html' import scrolling_menu with context %}
{% call(entry_post) scrolling_menu('entry-posts-container', entry_posts) %} {% call(entry_post) scrolling_menu('entry-posts-container', entry_posts) %}
<a <a
href="{{ blog_url('/' + entry_post.slug + '/') }}" href="{{ coop_url('/' + entry_post.slug + '/') }}"
class="{{styles.nav_button}}" class="{{styles.nav_button}}"
> >
{% if entry_post.feature_image %} {% if entry_post.feature_image %}

View File

@@ -2,7 +2,7 @@
class="font-bold text-xl flex-shrink-0 flex gap-2 items-center"> class="font-bold text-xl flex-shrink-0 flex gap-2 items-center">
<div> <div>
<i class="fa fa-shop"></i> <i class="fa fa-shop"></i>
{{ market_title }} {{ coop_title }}
</div> </div>
<div class="flex flex-col md:flex-row md:gap-2 text-xs"> <div class="flex flex-col md:flex-row md:gap-2 text-xs">
<div> <div>

View File

@@ -5,7 +5,7 @@
hx-swap-oob="outerHTML"> hx-swap-oob="outerHTML">
{% from 'macros/scrolling_menu.html' import scrolling_menu with context %} {% from 'macros/scrolling_menu.html' import scrolling_menu with context %}
{% call(item) scrolling_menu('menu-items-container', menu_items) %} {% call(item) scrolling_menu('menu-items-container', menu_items) %}
{% set _href = _app_slugs.get(item.slug, blog_url('/' + item.slug + '/')) %} {% set _href = _app_slugs.get(item.slug, coop_url('/' + item.slug + '/')) %}
<a <a
href="{{ _href }}" href="{{ _href }}"
{% if item.slug not in _app_slugs %} {% if item.slug not in _app_slugs %}

View File

@@ -1,7 +1,7 @@
{% import 'macros/links.html' as links %} {% import 'macros/links.html' as links %}
{% macro header_row(oob=False) %} {% macro header_row(oob=False) %}
{% call links.menu_row(id='post-row', oob=oob) %} {% call links.menu_row(id='post-row', oob=oob) %}
<a href="{{ blog_url('/' + post.slug + '/') }}" class="flex items-center gap-2 px-3 py-2 rounded whitespace-normal text-center break-words leading-snug"> <a href="{{ coop_url('/' + post.slug + '/') }}" class="flex items-center gap-2 px-3 py-2 rounded whitespace-normal text-center break-words leading-snug">
{% if post.feature_image %} {% if post.feature_image %}
<img <img
src="{{ post.feature_image }}" src="{{ post.feature_image }}"

View File

@@ -30,7 +30,7 @@
{% block filter %} {% block filter %}
{% call layout.details() %} {% call layout.details() %}
{% call layout.summary('blog-child-header') %} {% call layout.summary('coop-child-header') %}
{% endcall %} {% endcall %}
{% call layout.menu('blog-child-menu') %} {% call layout.menu('blog-child-menu') %}
{% endcall %} {% endcall %}

View File

@@ -30,8 +30,8 @@
{% block filter %} {% block filter %}
{% call layout.details() %} {% call layout.details() %}
{% call layout.summary('blog-child-header') %} {% call layout.summary('coop-child-header') %}
{% block blog_child_summary %} {% block coop_child_summary %}
{% endblock %} {% endblock %}
{% endcall %} {% endcall %}
{% call layout.menu('blog-child-menu') %} {% call layout.menu('blog-child-menu') %}

View File

@@ -1,5 +1,5 @@
{% set href=blog_url('/auth/account/') %} {% set href=coop_url('/auth/account/') %}
<a <a
href="{{ href }}" href="{{ href }}"
class="justify-center cursor-pointer flex flex-row items-center p-3 gap-2 rounded bg-stone-200 text-black {{select_colours}}" class="justify-center cursor-pointer flex flex-row items-center p-3 gap-2 rounded bg-stone-200 text-black {{select_colours}}"

View File

@@ -4,7 +4,7 @@
id="menu-items-nav-wrapper"> id="menu-items-nav-wrapper">
{% from 'macros/scrolling_menu.html' import scrolling_menu with context %} {% from 'macros/scrolling_menu.html' import scrolling_menu with context %}
{% call(item) scrolling_menu('menu-items-container', menu_items) %} {% call(item) scrolling_menu('menu-items-container', menu_items) %}
{% set _href = _app_slugs.get(item.slug, blog_url('/' + item.slug + '/')) %} {% set _href = _app_slugs.get(item.slug, coop_url('/' + item.slug + '/')) %}
<a <a
href="{{ _href }}" href="{{ _href }}"
aria-selected="{{ 'true' if (item.slug == _first_seg or item.slug == app_name) else 'false' }}" aria-selected="{{ 'true' if (item.slug == _first_seg or item.slug == app_name) else 'false' }}"

View File

@@ -1,6 +1,6 @@
{% import 'macros/links.html' as links %} {% import 'macros/links.html' as links %}
{% if g.rights.admin %} {% if g.rights.admin %}
<a href="{{ blog_url('/settings/') }}" class="{{styles.nav_button}}"> <a href="{{ coop_url('/settings/') }}" class="{{styles.nav_button}}">
<i class="fa fa-cog" aria-hidden="true"></i> <i class="fa fa-cog" aria-hidden="true"></i>
</a> </a>
{% endif %} {% endif %}

View File

@@ -31,7 +31,7 @@
← Go Back ← Go Back
</button> </button>
<a <a
href="{{ blog_url('/') }}" href="{{ coop_url('/') }}"
class="px-4 py-2 bg-stone-800 text-white rounded hover:bg-stone-700 transition-colors text-center" class="px-4 py-2 bg-stone-800 text-white rounded hover:bg-stone-700 transition-colors text-center"
> >
Home Home

View File

@@ -1,5 +1,5 @@
{% set href=blog_url('/auth/account/') %} {% set href=coop_url('/auth/account/') %}
<a <a
href="{{ href }}" href="{{ href }}"
data-close-details data-close-details

View File

@@ -1,6 +1,6 @@
{% macro title(_class='') %} {% macro title(_class='') %}
<a <a
href="{{ blog_url('/') }}" href="{{ coop_url('/') }}"
class="{{_class}}" class="{{_class}}"
> >
<h1> <h1>

View File

@@ -129,12 +129,6 @@ class CalendarService(Protocol):
self, session: AsyncSession, content_type: str, content_id: int, self, session: AsyncSession, content_type: str, content_id: int,
) -> set[int]: ... ) -> set[int]: ...
async def upcoming_entries_for_container(
self, session: AsyncSession,
container_type: str | None = None, container_id: int | None = None,
*, page: int = 1, per_page: int = 20,
) -> tuple[list[CalendarEntryDTO], bool]: ...
async def visible_entries_for_period( async def visible_entries_for_period(
self, session: AsyncSession, calendar_id: int, self, session: AsyncSession, calendar_id: int,
period_start: datetime, period_end: datetime, period_start: datetime, period_end: datetime,
@@ -155,12 +149,6 @@ class MarketService(Protocol):
name: str, slug: str, name: str, slug: str,
) -> MarketPlaceDTO: ... ) -> MarketPlaceDTO: ...
async def list_marketplaces(
self, session: AsyncSession,
container_type: str | None = None, container_id: int | None = None,
*, page: int = 1, per_page: int = 20,
) -> tuple[list[MarketPlaceDTO], bool]: ...
async def soft_delete_marketplace( async def soft_delete_marketplace(
self, session: AsyncSession, container_type: str, container_id: int, self, session: AsyncSession, container_type: str, container_id: int,
slug: str, slug: str,
@@ -228,21 +216,11 @@ class FederationService(Protocol):
self, session: AsyncSession, source_type: str, source_id: int, self, session: AsyncSession, source_type: str, source_id: int,
) -> APActivityDTO | None: ... ) -> APActivityDTO | None: ...
async def count_activities_for_source(
self, session: AsyncSession, source_type: str, source_id: int,
*, activity_type: str,
) -> int: ...
# -- Followers ------------------------------------------------------------ # -- Followers ------------------------------------------------------------
async def get_followers( async def get_followers(
self, session: AsyncSession, username: str, self, session: AsyncSession, username: str,
) -> list[APFollowerDTO]: ... ) -> list[APFollowerDTO]: ...
async def get_followers_paginated(
self, session: AsyncSession, username: str,
page: int = 1, per_page: int = 20,
) -> tuple[list[RemoteActorDTO], int]: ...
async def add_follower( async def add_follower(
self, session: AsyncSession, username: str, self, session: AsyncSession, username: str,
follower_acct: str, follower_inbox: str, follower_actor_url: str, follower_acct: str, follower_inbox: str, follower_actor_url: str,
@@ -262,10 +240,6 @@ class FederationService(Protocol):
self, session: AsyncSession, acct: str, self, session: AsyncSession, acct: str,
) -> RemoteActorDTO | None: ... ) -> RemoteActorDTO | None: ...
async def search_actors(
self, session: AsyncSession, query: str, page: int = 1, limit: int = 20,
) -> tuple[list[RemoteActorDTO], int]: ...
# -- Following (outbound) ------------------------------------------------- # -- Following (outbound) -------------------------------------------------
async def send_follow( async def send_follow(
self, session: AsyncSession, local_username: str, remote_actor_url: str, self, session: AsyncSession, local_username: str, remote_actor_url: str,
@@ -309,11 +283,6 @@ class FederationService(Protocol):
before: datetime | None = None, limit: int = 20, before: datetime | None = None, limit: int = 20,
) -> list[TimelineItemDTO]: ... ) -> list[TimelineItemDTO]: ...
async def get_actor_timeline(
self, session: AsyncSession, remote_actor_id: int,
before: datetime | None = None, limit: int = 20,
) -> list[TimelineItemDTO]: ...
# -- Local posts ---------------------------------------------------------- # -- Local posts ----------------------------------------------------------
async def create_local_post( async def create_local_post(
self, session: AsyncSession, actor_profile_id: int, self, session: AsyncSession, actor_profile_id: int,

View File

@@ -7,7 +7,7 @@ from quart import Quart, g
DATABASE_URL = ( DATABASE_URL = (
os.getenv("DATABASE_URL_ASYNC") os.getenv("DATABASE_URL_ASYNC")
or os.getenv("DATABASE_URL") or os.getenv("DATABASE_URL")
or "postgresql+asyncpg://localhost/blog" or "postgresql+asyncpg://localhost/coop"
) )
_engine = create_async_engine( _engine = create_async_engine(
@@ -15,7 +15,7 @@ _engine = create_async_engine(
future=True, future=True,
echo=False, echo=False,
pool_pre_ping=True, pool_pre_ping=True,
pool_size=0, # 0 = unlimited (NullPool equivalent for asyncpg) pool_size=-1 # ned to look at this!!!
) )
_Session = async_sessionmaker( _Session = async_sessionmaker(
@@ -34,42 +34,43 @@ async def get_session():
await sess.close() await sess.close()
def register_db(app: Quart): def register_db(app: Quart):
@app.before_request @app.before_request
async def open_session(): async def open_session():
g.s = _Session() g.s = _Session()
g.tx = await g.s.begin() g.tx = await g.s.begin()
g.had_error = False g.had_error = False
@app.after_request @app.after_request
async def maybe_commit(response): async def maybe_commit(response):
# Runs BEFORE bytes are sent. # Runs BEFORE bytes are sent.
if not g.had_error and 200 <= response.status_code < 400: if not g.had_error and 200 <= response.status_code < 400:
try: try:
if hasattr(g, "tx"): if hasattr(g, "tx"):
await g.tx.commit() await g.tx.commit()
except Exception as e: except Exception as e:
print(f'commit failed {e}') print(f'commit failed {e}')
if hasattr(g, "tx"): if hasattr(g, "tx"):
await g.tx.rollback() await g.tx.rollback()
from quart import make_response from quart import make_response
return await make_response("Commit failed", 500) return await make_response("Commit failed", 500)
return response return response
@app.teardown_request @app.teardown_request
async def finish(exc): async def finish(exc):
try: try:
# If an exception occurred OR we didn't commit (still in txn), roll back. # If an exception occurred OR we didn't commit (still in txn), roll back.
if hasattr(g, "s"): if hasattr(g, "s"):
if exc is not None or g.s.in_transaction(): if exc is not None or g.s.in_transaction():
if hasattr(g, "tx"): if hasattr(g, "tx"):
await g.tx.rollback() await g.tx.rollback()
finally: finally:
if hasattr(g, "s"): if hasattr(g, "s"):
await g.s.close() await g.s.close()
@app.errorhandler(Exception) @app.errorhandler(Exception)
async def mark_error(e): async def mark_error(e):
g.had_error = True g.had_error = True
raise raise

View File

@@ -1,9 +1,4 @@
from .bus import emit_activity, register_activity_handler, get_activity_handlers from .bus import emit_event, register_handler
from .processor import EventProcessor from .processor import EventProcessor
__all__ = [ __all__ = ["emit_event", "register_handler", "EventProcessor"]
"emit_activity",
"register_activity_handler",
"get_activity_handlers",
"EventProcessor",
]

View File

@@ -1,122 +1,56 @@
""" """
Unified activity bus. Transactional outbox event bus.
emit_activity() writes an APActivity row with process_state='pending' within emit_event() writes to the domain_events table within the caller's existing
the caller's existing DB transaction — atomic with the domain change. DB transaction — atomic with whatever domain change triggered the event.
register_activity_handler() registers async handler functions that the register_handler() registers async handler functions that the EventProcessor
EventProcessor dispatches when processing pending activities. will call when processing events of a given type.
""" """
from __future__ import annotations from __future__ import annotations
import logging
import uuid
from collections import defaultdict from collections import defaultdict
from typing import Awaitable, Callable, Dict, List, Tuple from typing import Any, Awaitable, Callable, Dict, List
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.federation import APActivity from shared.models.domain_event import DomainEvent
log = logging.getLogger(__name__) # handler signature: async def handler(event: DomainEvent, session: AsyncSession) -> None
HandlerFn = Callable[[DomainEvent, AsyncSession], Awaitable[None]]
# --------------------------------------------------------------------------- _handlers: Dict[str, List[HandlerFn]] = defaultdict(list)
# Activity-handler registry
# ---------------------------------------------------------------------------
# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]
# Keyed by (activity_type, object_type). object_type="*" is wildcard.
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)
def register_activity_handler( async def emit_event(
activity_type: str,
fn: ActivityHandlerFn,
*,
object_type: str | None = None,
) -> None:
"""Register an async handler for an activity type + optional object type.
Use ``activity_type="*"`` as a wildcard that fires for every activity
(e.g. federation delivery handler).
"""
key = (activity_type, object_type or "*")
_activity_handlers[key].append(fn)
log.info("Registered activity handler %s.%s for key %s", fn.__module__, fn.__qualname__, key)
def get_activity_handlers(
activity_type: str,
object_type: str | None = None,
) -> List[ActivityHandlerFn]:
"""Return all matching handlers for an activity.
Matches in order:
1. Exact (activity_type, object_type)
2. (activity_type, "*") — type-level wildcard
3. ("*", "*") — global wildcard (e.g. delivery)
"""
handlers: List[ActivityHandlerFn] = []
ot = object_type or "*"
# Exact match
if ot != "*":
handlers.extend(_activity_handlers.get((activity_type, ot), []))
# Type-level wildcard
handlers.extend(_activity_handlers.get((activity_type, "*"), []))
# Global wildcard
if activity_type != "*":
handlers.extend(_activity_handlers.get(("*", "*"), []))
return handlers
# ---------------------------------------------------------------------------
# emit_activity — the primary way to emit events
# ---------------------------------------------------------------------------
async def emit_activity(
session: AsyncSession, session: AsyncSession,
*, event_type: str,
activity_type: str, aggregate_type: str,
actor_uri: str, aggregate_id: int,
object_type: str, payload: Dict[str, Any] | None = None,
object_data: dict | None = None, ) -> DomainEvent:
source_type: str | None = None,
source_id: int | None = None,
visibility: str = "internal",
actor_profile_id: int | None = None,
origin_app: str | None = None,
) -> APActivity:
""" """
Write an AP-shaped activity to ap_activities with process_state='pending'. Write a domain event to the outbox table in the current transaction.
Called inside a service function using the same session that performs the Call this inside your service function, using the same session that
domain change. The activity and the change commit together. performs the domain change. The event and the change commit together.
""" """
if not origin_app: event = DomainEvent(
try: event_type=event_type,
from quart import current_app aggregate_type=aggregate_type,
origin_app = current_app.name aggregate_id=aggregate_id,
except (ImportError, RuntimeError): payload=payload or {},
pass
activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}"
activity = APActivity(
activity_id=activity_uri,
activity_type=activity_type,
actor_profile_id=actor_profile_id,
actor_uri=actor_uri,
object_type=object_type,
object_data=object_data or {},
is_local=True,
source_type=source_type,
source_id=source_id,
visibility=visibility,
process_state="pending",
origin_app=origin_app,
) )
session.add(activity) session.add(event)
await session.flush() await session.flush() # assign event.id
return activity return event
def register_handler(event_type: str, fn: HandlerFn) -> None:
"""Register an async handler for a given event type."""
_handlers[event_type].append(fn)
def get_handlers(event_type: str) -> List[HandlerFn]:
"""Return all registered handlers for an event type."""
return _handlers.get(event_type, [])

View File

@@ -1,4 +1,4 @@
"""Shared event handlers.""" """Shared event handlers (replaces glue.setup.register_glue_handlers)."""
def register_shared_handlers(): def register_shared_handlers():
@@ -6,4 +6,5 @@ def register_shared_handlers():
import shared.events.handlers.container_handlers # noqa: F401 import shared.events.handlers.container_handlers # noqa: F401
import shared.events.handlers.login_handlers # noqa: F401 import shared.events.handlers.login_handlers # noqa: F401
import shared.events.handlers.order_handlers # noqa: F401 import shared.events.handlers.order_handlers # noqa: F401
# federation_handlers removed — publication is now inline at write sites
import shared.events.handlers.ap_delivery_handler # noqa: F401 import shared.events.handlers.ap_delivery_handler # noqa: F401

View File

@@ -1,7 +1,7 @@
"""Deliver AP activities to remote followers. """Deliver AP activities to remote followers.
Registered as a wildcard handler — fires for every activity. Skips On ``federation.activity_created`` → load activity + actor + followers →
non-public activities and those without an actor profile. sign with HTTP Signatures → POST to each follower inbox.
""" """
from __future__ import annotations from __future__ import annotations
@@ -11,7 +11,7 @@ import httpx
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events.bus import register_activity_handler from shared.events.bus import register_handler, DomainEvent
from shared.models.federation import ActorProfile, APActivity, APFollower from shared.models.federation import ActorProfile, APActivity, APFollower
from shared.services.registry import services from shared.services.registry import services
@@ -29,22 +29,22 @@ def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str)
obj = dict(activity.object_data or {}) obj = dict(activity.object_data or {})
# Object id MUST be on the actor's domain (Mastodon origin check). # Object id MUST be on the actor's domain (Mastodon origin check).
# The post URL (e.g. blog.rose-ash.com/slug/) goes in "url" only. # The post URL (e.g. coop.rose-ash.com/slug/) goes in "url" only.
object_id = activity.activity_id + "/object" object_id = activity.activity_id + "/object"
if activity.activity_type == "Delete": if activity.activity_type == "Delete":
# Delete: object is a Tombstone with just id + type
obj.setdefault("id", object_id) obj.setdefault("id", object_id)
obj.setdefault("type", "Tombstone") obj.setdefault("type", "Tombstone")
else: else:
# Create/Update: full object with attribution
# Prefer stable id from object_data (set by try_publish), fall back to activity-derived
obj.setdefault("id", object_id) obj.setdefault("id", object_id)
obj.setdefault("type", activity.object_type) obj.setdefault("type", activity.object_type)
obj.setdefault("attributedTo", actor_url) obj.setdefault("attributedTo", actor_url)
obj.setdefault("published", activity.published.isoformat() if activity.published else None) obj.setdefault("published", activity.published.isoformat() if activity.published else None)
obj.setdefault("to", ["https://www.w3.org/ns/activitystreams#Public"]) obj.setdefault("to", ["https://www.w3.org/ns/activitystreams#Public"])
obj.setdefault("cc", [f"{actor_url}/followers"]) obj.setdefault("cc", [f"{actor_url}/followers"])
if activity.activity_type == "Update":
from datetime import datetime, timezone
obj["updated"] = datetime.now(timezone.utc).isoformat()
return { return {
"@context": [ "@context": [
@@ -105,19 +105,29 @@ async def _deliver_to_inbox(
return False return False
async def on_any_activity(activity: APActivity, session: AsyncSession) -> None: async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None:
"""Deliver a public activity to all followers of its actor.""" """Deliver a newly created activity to all followers."""
import os import os
# Only deliver public activities that have an actor profile
if activity.visibility != "public":
return
if activity.actor_profile_id is None:
return
if not services.has("federation"): if not services.has("federation"):
return return
domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com") payload = event.payload
activity_id_uri = payload.get("activity_id")
if not activity_id_uri:
return
domain = os.getenv("AP_DOMAIN", "rose-ash.com")
# Load the activity
activity = (
await session.execute(
select(APActivity).where(APActivity.activity_id == activity_id_uri)
)
).scalar_one_or_none()
if not activity:
log.warning("Activity not found: %s", activity_id_uri)
return
# Load actor with private key # Load actor with private key
actor = ( actor = (
@@ -126,7 +136,7 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
) )
).scalar_one_or_none() ).scalar_one_or_none()
if not actor or not actor.private_key_pem: if not actor or not actor.private_key_pem:
log.warning("Actor not found or missing key for activity %s", activity.activity_id) log.warning("Actor not found or missing key for activity %s", activity_id_uri)
return return
# Load followers # Load followers
@@ -137,13 +147,14 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
).scalars().all() ).scalars().all()
if not followers: if not followers:
log.debug("No followers to deliver to for %s", activity.activity_id) log.debug("No followers to deliver to for %s", activity_id_uri)
return return
# Build activity JSON # Build activity JSON
activity_json = _build_activity_json(activity, actor, domain) activity_json = _build_activity_json(activity, actor, domain)
# Deduplicate inboxes # Deliver to each follower inbox
# Deduplicate inboxes (multiple followers might share a shared inbox)
inboxes = {f.follower_inbox for f in followers if f.follower_inbox} inboxes = {f.follower_inbox for f in followers if f.follower_inbox}
log.info( log.info(
@@ -156,5 +167,4 @@ async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain) await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain)
# Wildcard: fires for every activity register_handler("federation.activity_created", on_activity_created)
register_activity_handler("*", on_any_activity)

View File

@@ -2,18 +2,18 @@ from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import register_activity_handler from shared.events import register_handler
from shared.models.federation import APActivity from shared.models.domain_event import DomainEvent
from shared.services.navigation import rebuild_navigation from shared.services.navigation import rebuild_navigation
async def on_child_attached(activity: APActivity, session: AsyncSession) -> None: async def on_child_attached(event: DomainEvent, session: AsyncSession) -> None:
await rebuild_navigation(session) await rebuild_navigation(session)
async def on_child_detached(activity: APActivity, session: AsyncSession) -> None: async def on_child_detached(event: DomainEvent, session: AsyncSession) -> None:
await rebuild_navigation(session) await rebuild_navigation(session)
register_activity_handler("Add", on_child_attached, object_type="rose:ContainerRelation") register_handler("container.child_attached", on_child_attached)
register_activity_handler("Remove", on_child_detached, object_type="rose:ContainerRelation") register_handler("container.child_detached", on_child_detached)

View File

@@ -0,0 +1,8 @@
"""Federation event handlers — REMOVED.
Federation publication is now inline at the write site (ghost_sync, entries,
market routes) via shared.services.federation_publish.try_publish().
AP delivery (federation.activity_created → inbox POST) remains async via
ap_delivery_handler.
"""

View File

@@ -2,22 +2,24 @@ from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import register_activity_handler from shared.events import register_handler
from shared.models.federation import APActivity from shared.models.domain_event import DomainEvent
from shared.services.registry import services from shared.services.registry import services
async def on_user_logged_in(activity: APActivity, session: AsyncSession) -> None: async def on_user_logged_in(event: DomainEvent, session: AsyncSession) -> None:
data = activity.object_data payload = event.payload
user_id = data["user_id"] user_id = payload["user_id"]
session_id = data["session_id"] session_id = payload["session_id"]
# Adopt cart items (if cart service is registered)
if services.has("cart"): if services.has("cart"):
await services.cart.adopt_cart_for_user(session, user_id, session_id) await services.cart.adopt_cart_for_user(session, user_id, session_id)
# Adopt calendar entries and tickets (if calendar service is registered)
if services.has("calendar"): if services.has("calendar"):
await services.calendar.adopt_entries_for_user(session, user_id, session_id) await services.calendar.adopt_entries_for_user(session, user_id, session_id)
await services.calendar.adopt_tickets_for_user(session, user_id, session_id) await services.calendar.adopt_tickets_for_user(session, user_id, session_id)
register_activity_handler("rose:Login", on_user_logged_in) register_handler("user.logged_in", on_user_logged_in)

View File

@@ -4,19 +4,19 @@ import logging
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import register_activity_handler from shared.events import register_handler
from shared.models.federation import APActivity from shared.models.domain_event import DomainEvent
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
async def on_order_created(activity: APActivity, session: AsyncSession) -> None: async def on_order_created(event: DomainEvent, session: AsyncSession) -> None:
log.info("order.created: order_id=%s", activity.object_data.get("order_id")) log.info("order.created: order_id=%s", event.payload.get("order_id"))
async def on_order_paid(activity: APActivity, session: AsyncSession) -> None: async def on_order_paid(event: DomainEvent, session: AsyncSession) -> None:
log.info("order.paid: order_id=%s", activity.object_data.get("order_id")) log.info("order.paid: order_id=%s", event.payload.get("order_id"))
register_activity_handler("Create", on_order_created, object_type="rose:Order") register_handler("order.created", on_order_created)
register_activity_handler("rose:OrderPaid", on_order_paid) register_handler("order.paid", on_order_paid)

View File

@@ -1,6 +1,6 @@
""" """
Event processor — polls the ap_activities table and dispatches to registered Event processor — polls the domain_events outbox table and dispatches
activity handlers. to registered handlers.
Runs as an asyncio background task within each app process. Runs as an asyncio background task within each app process.
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing. Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
@@ -8,31 +8,26 @@ Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
import traceback import traceback
from datetime import datetime, timezone from datetime import datetime, timezone
from sqlalchemy import select from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.db.session import get_session from shared.db.session import get_session
from shared.models.federation import APActivity from shared.models.domain_event import DomainEvent
from .bus import get_activity_handlers from .bus import get_handlers
log = logging.getLogger(__name__)
class EventProcessor: class EventProcessor:
"""Background event processor that polls the ap_activities table.""" """Background event processor that polls the outbox table."""
def __init__( def __init__(
self, self,
*, *,
app_name: str | None = None,
poll_interval: float = 2.0, poll_interval: float = 2.0,
batch_size: int = 10, batch_size: int = 10,
): ):
self._app_name = app_name
self._poll_interval = poll_interval self._poll_interval = poll_interval
self._batch_size = batch_size self._batch_size = batch_size
self._task: asyncio.Task | None = None self._task: asyncio.Task | None = None
@@ -69,66 +64,54 @@ class EventProcessor:
await asyncio.sleep(self._poll_interval) await asyncio.sleep(self._poll_interval)
async def _process_batch(self) -> int: async def _process_batch(self) -> int:
"""Fetch and process a batch of pending activities. Returns count processed.""" """Fetch and process a batch of pending events. Returns count processed."""
processed = 0 processed = 0
async with get_session() as session: async with get_session() as session:
filters = [ # FOR UPDATE SKIP LOCKED: safe for concurrent processors
APActivity.process_state == "pending",
APActivity.process_attempts < APActivity.process_max_attempts,
]
if self._app_name:
filters.append(APActivity.origin_app == self._app_name)
stmt = ( stmt = (
select(APActivity) select(DomainEvent)
.where(*filters) .where(
.order_by(APActivity.created_at) DomainEvent.state == "pending",
DomainEvent.attempts < DomainEvent.max_attempts,
)
.order_by(DomainEvent.created_at)
.limit(self._batch_size) .limit(self._batch_size)
.with_for_update(skip_locked=True) .with_for_update(skip_locked=True)
) )
result = await session.execute(stmt) result = await session.execute(stmt)
activities = result.scalars().all() events = result.scalars().all()
for activity in activities: for event in events:
await self._process_one(session, activity) await self._process_one(session, event)
processed += 1 processed += 1
await session.commit() await session.commit()
return processed return processed
async def _process_one(self, session: AsyncSession, activity: APActivity) -> None: async def _process_one(self, session: AsyncSession, event: DomainEvent) -> None:
"""Run all handlers for a single activity.""" """Run all handlers for a single event."""
handlers = get_activity_handlers(activity.activity_type, activity.object_type) handlers = get_handlers(event.event_type)
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
log.info( event.state = "processing"
"Processing activity %s: type=%s object_type=%s visibility=%s actor_profile_id=%s%d handler(s) found", event.attempts += 1
activity.id, activity.activity_type, activity.object_type,
activity.visibility, activity.actor_profile_id, len(handlers),
)
for h in handlers:
log.info(" handler: %s.%s", h.__module__, h.__qualname__)
activity.process_state = "processing"
activity.process_attempts += 1
await session.flush() await session.flush()
if not handlers: if not handlers:
activity.process_state = "completed" # No handlers registered — mark completed (nothing to do)
activity.processed_at = now event.state = "completed"
event.processed_at = now
return return
try: try:
for handler in handlers: for handler in handlers:
log.info(" calling %s.%s", handler.__module__, handler.__qualname__) await handler(event, session)
await handler(activity, session) event.state = "completed"
log.info(" done %s.%s", handler.__module__, handler.__qualname__) event.processed_at = now
activity.process_state = "completed"
activity.processed_at = now
except Exception as exc: except Exception as exc:
log.exception("Handler failed for activity %s", activity.id) event.last_error = f"{exc.__class__.__name__}: {exc}"
activity.process_error = f"{exc.__class__.__name__}: {exc}" if event.attempts >= event.max_attempts:
if activity.process_attempts >= activity.process_max_attempts: event.state = "failed"
activity.process_state = "failed" event.processed_at = now
activity.processed_at = now
else: else:
activity.process_state = "pending" # retry event.state = "pending" # retry

View File

@@ -54,7 +54,7 @@ def create_base_app(
context_fn: context_fn:
Async function returning a dict for template context. Async function returning a dict for template context.
Each app provides its own — the cart app queries locally, Each app provides its own — the cart app queries locally,
while blog/market apps fetch via internal API. while coop/market apps fetch via internal API.
If not provided, a minimal default context is used. If not provided, a minimal default context is used.
before_request_fns: before_request_fns:
Extra before-request hooks (e.g. cart_loader for the cart app). Extra before-request hooks (e.g. cart_loader for the cart app).
@@ -84,7 +84,7 @@ def create_base_app(
cookie_domain = os.getenv("SESSION_COOKIE_DOMAIN") # e.g. ".rose-ash.com" cookie_domain = os.getenv("SESSION_COOKIE_DOMAIN") # e.g. ".rose-ash.com"
if cookie_domain: if cookie_domain:
app.config["SESSION_COOKIE_DOMAIN"] = cookie_domain app.config["SESSION_COOKIE_DOMAIN"] = cookie_domain
app.config["SESSION_COOKIE_NAME"] = "blog_session" app.config["SESSION_COOKIE_NAME"] = "coop_session"
# Ghost / Redis config # Ghost / Redis config
app.config["GHOST_API_URL"] = os.getenv("GHOST_API_URL") app.config["GHOST_API_URL"] = os.getenv("GHOST_API_URL")
@@ -144,7 +144,7 @@ def create_base_app(
return await base_context() return await base_context()
# --- event processor --- # --- event processor ---
_event_processor = EventProcessor(app_name=name) _event_processor = EventProcessor()
# --- startup --- # --- startup ---
@app.before_serving @app.before_serving

View File

@@ -13,7 +13,7 @@ from shared.browser.app.csrf import generate_csrf_token
from shared.browser.app.authz import has_access from shared.browser.app.authz import has_access
from shared.browser.app.filters import register as register_filters from shared.browser.app.filters import register as register_filters
from .urls import blog_url, market_url, cart_url, events_url, federation_url, login_url, page_cart_url, market_product_url from .urls import coop_url, market_url, cart_url, events_url, login_url, page_cart_url, market_product_url
def setup_jinja(app: Quart) -> None: def setup_jinja(app: Quart) -> None:
@@ -93,11 +93,10 @@ def setup_jinja(app: Quart) -> None:
app.jinja_env.globals["site"] = site app.jinja_env.globals["site"] = site
# cross-app URL helpers available in all templates # cross-app URL helpers available in all templates
app.jinja_env.globals["blog_url"] = blog_url app.jinja_env.globals["coop_url"] = coop_url
app.jinja_env.globals["market_url"] = market_url app.jinja_env.globals["market_url"] = market_url
app.jinja_env.globals["cart_url"] = cart_url app.jinja_env.globals["cart_url"] = cart_url
app.jinja_env.globals["events_url"] = events_url app.jinja_env.globals["events_url"] = events_url
app.jinja_env.globals["federation_url"] = federation_url
app.jinja_env.globals["login_url"] = login_url app.jinja_env.globals["login_url"] = login_url
app.jinja_env.globals["page_cart_url"] = page_cart_url app.jinja_env.globals["page_cart_url"] = page_cart_url
app.jinja_env.globals["market_product_url"] = market_product_url app.jinja_env.globals["market_product_url"] = market_product_url

View File

@@ -21,8 +21,8 @@ def app_url(app_name: str, path: str = "/") -> str:
return base + path return base + path
def blog_url(path: str = "/") -> str: def coop_url(path: str = "/") -> str:
return app_url("blog", path) return app_url("coop", path)
def market_url(path: str = "/") -> str: def market_url(path: str = "/") -> str:
@@ -66,17 +66,9 @@ def market_product_url(product_slug: str, suffix: str = "", market_place=None) -
def login_url(next_url: str = "") -> str: def login_url(next_url: str = "") -> str:
# Auth lives in federation. Set AUTH_APP to override. # Auth lives in blog (coop) for now. Set AUTH_APP=federation to switch.
from quart import session as qsession auth_app = os.getenv("AUTH_APP", "coop")
auth_app = os.getenv("AUTH_APP", "federation")
base = app_url(auth_app, "/auth/login/") base = app_url(auth_app, "/auth/login/")
params: list[str] = []
if next_url: if next_url:
params.append(f"next={quote(next_url, safe='')}") return f"{base}?next={quote(next_url, safe='')}"
# Pass anonymous cart session so the auth app can adopt it on login
cart_sid = qsession.get("cart_sid")
if cart_sid:
params.append(f"cart_sid={quote(cart_sid, safe='')}")
if params:
return f"{base}?{'&'.join(params)}"
return base return base

View File

@@ -8,6 +8,8 @@ from .ghost_membership_entities import (
GhostNewsletter, UserNewsletter, GhostNewsletter, UserNewsletter,
GhostTier, GhostSubscription, GhostTier, GhostSubscription,
) )
from .domain_event import DomainEvent
from .ghost_content import Tag, Post, Author, PostAuthor, PostTag, PostLike from .ghost_content import Tag, Post, Author, PostAuthor, PostTag, PostLike
from .page_config import PageConfig from .page_config import PageConfig
from .order import Order, OrderItem from .order import Order, OrderItem

30
models/domain_event.py Normal file
View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from datetime import datetime
from sqlalchemy import String, Integer, DateTime, Text, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from shared.db.base import Base
class DomainEvent(Base):
__tablename__ = "domain_events"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
event_type: Mapped[str] = mapped_column(String(128), nullable=False, index=True)
aggregate_type: Mapped[str] = mapped_column(String(64), nullable=False)
aggregate_id: Mapped[int] = mapped_column(Integer, nullable=False)
payload: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
state: Mapped[str] = mapped_column(
String(20), nullable=False, default="pending", server_default="pending", index=True
)
attempts: Mapped[int] = mapped_column(Integer, nullable=False, default=0, server_default="0")
max_attempts: Mapped[int] = mapped_column(Integer, nullable=False, default=5, server_default="5")
last_error: Mapped[str | None] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
def __repr__(self) -> str:
return f"<DomainEvent {self.id} {self.event_type} [{self.state}]>"

View File

@@ -50,19 +50,14 @@ class ActorProfile(Base):
class APActivity(Base): class APActivity(Base):
"""An ActivityPub activity (local or remote). """An ActivityPub activity (local or remote)."""
Also serves as the unified event bus: internal domain events and public
federation activities both live here, distinguished by ``visibility``.
The ``EventProcessor`` polls rows with ``process_state='pending'``.
"""
__tablename__ = "ap_activities" __tablename__ = "ap_activities"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False)
activity_type: Mapped[str] = mapped_column(String(64), nullable=False) activity_type: Mapped[str] = mapped_column(String(64), nullable=False)
actor_profile_id: Mapped[int | None] = mapped_column( actor_profile_id: Mapped[int] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True, Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False,
) )
object_type: Mapped[str | None] = mapped_column(String(64), nullable=True) object_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
@@ -88,30 +83,6 @@ class APActivity(Base):
DateTime(timezone=True), nullable=False, server_default=func.now(), DateTime(timezone=True), nullable=False, server_default=func.now(),
) )
# --- Unified event-bus columns ---
actor_uri: Mapped[str | None] = mapped_column(
String(512), nullable=True,
)
visibility: Mapped[str] = mapped_column(
String(20), nullable=False, default="public", server_default="public",
)
process_state: Mapped[str] = mapped_column(
String(20), nullable=False, default="completed", server_default="completed",
)
process_attempts: Mapped[int] = mapped_column(
Integer, nullable=False, default=0, server_default="0",
)
process_max_attempts: Mapped[int] = mapped_column(
Integer, nullable=False, default=5, server_default="5",
)
process_error: Mapped[str | None] = mapped_column(Text, nullable=True)
processed_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True,
)
origin_app: Mapped[str | None] = mapped_column(
String(64), nullable=True,
)
# Relationships # Relationships
actor_profile = relationship("ActorProfile", back_populates="activities") actor_profile = relationship("ActorProfile", back_populates="activities")
@@ -119,7 +90,6 @@ class APActivity(Base):
Index("ix_ap_activity_actor", "actor_profile_id"), Index("ix_ap_activity_actor", "actor_profile_id"),
Index("ix_ap_activity_source", "source_type", "source_id"), Index("ix_ap_activity_source", "source_type", "source_id"),
Index("ix_ap_activity_published", "published"), Index("ix_ap_activity_published", "published"),
Index("ix_ap_activity_process", "process_state"),
) )
def __repr__(self) -> str: def __repr__(self) -> str:

View File

@@ -6,7 +6,7 @@ from shared.db.base import Base
class MenuItem(Base): class MenuItem(Base):
"""Deprecated — kept so the table isn't dropped. Use shared.models.menu_node.MenuNode.""" """Deprecated — kept so the table isn't dropped. Use glue.models.MenuNode."""
__tablename__ = "menu_items" __tablename__ = "menu_items"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)

View File

@@ -239,45 +239,6 @@ class SqlCalendarService:
merged = sorted(entries_by_id.values(), key=lambda e: e.start_at or period_start) merged = sorted(entries_by_id.values(), key=lambda e: e.start_at or period_start)
return [_entry_to_dto(e) for e in merged] return [_entry_to_dto(e) for e in merged]
async def upcoming_entries_for_container(
self, session: AsyncSession,
container_type: str | None = None, container_id: int | None = None,
*, page: int = 1, per_page: int = 20,
) -> tuple[list[CalendarEntryDTO], bool]:
"""Upcoming confirmed entries. Optionally scoped to a container."""
filters = [
CalendarEntry.state == "confirmed",
CalendarEntry.deleted_at.is_(None),
CalendarEntry.start_at >= func.now(),
]
if container_type is not None and container_id is not None:
cal_ids = select(Calendar.id).where(
Calendar.container_type == container_type,
Calendar.container_id == container_id,
Calendar.deleted_at.is_(None),
).scalar_subquery()
filters.append(CalendarEntry.calendar_id.in_(cal_ids))
else:
# Still exclude entries from deleted calendars
cal_ids = select(Calendar.id).where(
Calendar.deleted_at.is_(None),
).scalar_subquery()
filters.append(CalendarEntry.calendar_id.in_(cal_ids))
offset = (page - 1) * per_page
result = await session.execute(
select(CalendarEntry)
.where(*filters)
.order_by(CalendarEntry.start_at.asc())
.limit(per_page)
.offset(offset)
.options(selectinload(CalendarEntry.calendar))
)
entries = result.scalars().all()
has_more = len(entries) == per_page
return [_entry_to_dto(e) for e in entries], has_more
async def associated_entries( async def associated_entries(
self, session: AsyncSession, content_type: str, content_id: int, page: int, self, session: AsyncSession, content_type: str, content_id: int, page: int,
) -> tuple[list[CalendarEntryDTO], bool]: ) -> tuple[list[CalendarEntryDTO], bool]:
@@ -410,7 +371,7 @@ class SqlCalendarService:
entries_by_post.setdefault(post_id, []).append(_entry_to_dto(entry)) entries_by_post.setdefault(post_id, []).append(_entry_to_dto(entry))
return entries_by_post return entries_by_post
# -- writes --------------------------------------------------------------- # -- writes (absorb glue lifecycle) ---------------------------------------
async def adopt_entries_for_user( async def adopt_entries_for_user(
self, session: AsyncSession, user_id: int, session_id: str, self, session: AsyncSession, user_id: int, session_id: str,

View File

@@ -24,15 +24,7 @@ from shared.contracts.dtos import (
def _domain() -> str: def _domain() -> str:
return os.getenv("AP_DOMAIN", "federation.rose-ash.com") return os.getenv("AP_DOMAIN", "rose-ash.com")
def _get_origin_app() -> str | None:
try:
from quart import current_app
return current_app.name
except (ImportError, RuntimeError):
return None
def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO: def _actor_to_dto(actor: ActorProfile) -> ActorProfileDTO:
@@ -191,22 +183,16 @@ class SqlFederationService:
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
actor_url = f"https://{domain}/users/{username}"
activity = APActivity( activity = APActivity(
activity_id=activity_uri, activity_id=activity_uri,
activity_type=activity_type, activity_type=activity_type,
actor_profile_id=actor.id, actor_profile_id=actor.id,
actor_uri=actor_url,
object_type=object_type, object_type=object_type,
object_data=object_data, object_data=object_data,
published=now, published=now,
is_local=True, is_local=True,
source_type=source_type, source_type=source_type,
source_id=source_id, source_id=source_id,
visibility="public",
process_state="pending",
origin_app=_get_origin_app(),
) )
session.add(activity) session.add(activity)
await session.flush() await session.flush()
@@ -222,7 +208,7 @@ class SqlFederationService:
], ],
"id": activity_uri, "id": activity_uri,
"type": activity_type, "type": activity_type,
"actor": actor_url, "actor": f"https://{domain}/users/{username}",
"published": now.isoformat(), "published": now.isoformat(),
"object": { "object": {
"type": object_type, "type": object_type,
@@ -235,6 +221,21 @@ class SqlFederationService:
except Exception: except Exception:
pass # IPFS failure is non-fatal pass # IPFS failure is non-fatal
# Emit domain event for downstream processing (delivery)
from shared.events import emit_event
await emit_event(
session,
"federation.activity_created",
"APActivity",
activity.id,
{
"activity_id": activity.activity_id,
"activity_type": activity_type,
"actor_username": username,
"object_type": object_type,
},
)
return _activity_to_dto(activity) return _activity_to_dto(activity)
# -- Queries -------------------------------------------------------------- # -- Queries --------------------------------------------------------------
@@ -297,20 +298,6 @@ class SqlFederationService:
).scalars().first() ).scalars().first()
return _activity_to_dto(a) if a else None return _activity_to_dto(a) if a else None
async def count_activities_for_source(
self, session: AsyncSession, source_type: str, source_id: int,
*, activity_type: str,
) -> int:
from sqlalchemy import func
result = await session.execute(
select(func.count()).select_from(APActivity).where(
APActivity.source_type == source_type,
APActivity.source_id == source_id,
APActivity.activity_type == activity_type,
)
)
return result.scalar_one()
# -- Followers ------------------------------------------------------------ # -- Followers ------------------------------------------------------------
async def get_followers( async def get_followers(
@@ -389,65 +376,6 @@ class SqlFederationService:
) )
return result.rowcount > 0 return result.rowcount > 0
async def get_followers_paginated(
self, session: AsyncSession, username: str,
page: int = 1, per_page: int = 20,
) -> tuple[list[RemoteActorDTO], int]:
actor = (
await session.execute(
select(ActorProfile).where(ActorProfile.preferred_username == username)
)
).scalar_one_or_none()
if actor is None:
return [], 0
total = (
await session.execute(
select(func.count(APFollower.id)).where(
APFollower.actor_profile_id == actor.id,
)
)
).scalar() or 0
offset = (page - 1) * per_page
followers = (
await session.execute(
select(APFollower)
.where(APFollower.actor_profile_id == actor.id)
.order_by(APFollower.created_at.desc())
.limit(per_page)
.offset(offset)
)
).scalars().all()
results: list[RemoteActorDTO] = []
for f in followers:
# Try to resolve from cached remote actors first
remote = (
await session.execute(
select(RemoteActor).where(
RemoteActor.actor_url == f.follower_actor_url,
)
)
).scalar_one_or_none()
if remote:
results.append(_remote_actor_to_dto(remote))
else:
# Synthesise a minimal DTO from follower data
from urllib.parse import urlparse
domain = urlparse(f.follower_actor_url).netloc
results.append(RemoteActorDTO(
id=0,
actor_url=f.follower_actor_url,
inbox_url=f.follower_inbox,
preferred_username=f.follower_acct.split("@")[0] if "@" in f.follower_acct else f.follower_acct,
domain=domain,
display_name=None,
summary=None,
icon_url=None,
))
return results, total
# -- Remote actors -------------------------------------------------------- # -- Remote actors --------------------------------------------------------
async def get_or_fetch_remote_actor( async def get_or_fetch_remote_actor(
@@ -541,92 +469,6 @@ class SqlFederationService:
return await self._upsert_remote_actor(session, actor_url, data) return await self._upsert_remote_actor(session, actor_url, data)
async def search_actors(
self, session: AsyncSession, query: str, page: int = 1, limit: int = 20,
) -> tuple[list[RemoteActorDTO], int]:
from sqlalchemy import or_
pattern = f"%{query}%"
offset = (page - 1) * limit
# WebFinger resolve for @user@domain queries (first page only)
webfinger_result: RemoteActorDTO | None = None
if page == 1 and "@" in query:
webfinger_result = await self.search_remote_actor(session, query)
# Search cached remote actors
remote_filter = or_(
RemoteActor.preferred_username.ilike(pattern),
RemoteActor.display_name.ilike(pattern),
RemoteActor.domain.ilike(pattern),
)
remote_total = (
await session.execute(
select(func.count(RemoteActor.id)).where(remote_filter)
)
).scalar() or 0
# Search local actor profiles
local_filter = or_(
ActorProfile.preferred_username.ilike(pattern),
ActorProfile.display_name.ilike(pattern),
)
local_total = (
await session.execute(
select(func.count(ActorProfile.id)).where(local_filter)
)
).scalar() or 0
total = remote_total + local_total
# Fetch remote actors page
remote_rows = (
await session.execute(
select(RemoteActor)
.where(remote_filter)
.order_by(RemoteActor.preferred_username)
.limit(limit)
.offset(offset)
)
).scalars().all()
results: list[RemoteActorDTO] = [_remote_actor_to_dto(r) for r in remote_rows]
# Fill remaining slots with local actors
remaining = limit - len(results)
local_offset = max(0, offset - remote_total)
if remaining > 0 and offset + len(results) >= remote_total:
domain = _domain()
local_rows = (
await session.execute(
select(ActorProfile)
.where(local_filter)
.order_by(ActorProfile.preferred_username)
.limit(remaining)
.offset(local_offset)
)
).scalars().all()
for lp in local_rows:
results.append(RemoteActorDTO(
id=0,
actor_url=f"https://{domain}/users/{lp.preferred_username}",
inbox_url=f"https://{domain}/users/{lp.preferred_username}/inbox",
preferred_username=lp.preferred_username,
domain=domain,
display_name=lp.display_name,
summary=lp.summary,
icon_url=None,
))
# Prepend WebFinger result (deduped)
if webfinger_result:
existing_urls = {r.actor_url for r in results}
if webfinger_result.actor_url not in existing_urls:
results.insert(0, webfinger_result)
total += 1
return results, total
# -- Following (outbound) ------------------------------------------------- # -- Following (outbound) -------------------------------------------------
async def send_follow( async def send_follow(
@@ -1124,46 +966,6 @@ class SqlFederationService:
)) ))
return items return items
async def get_actor_timeline(
self, session: AsyncSession, remote_actor_id: int,
before: datetime | None = None, limit: int = 20,
) -> list[TimelineItemDTO]:
remote_actor = (
await session.execute(
select(RemoteActor).where(RemoteActor.id == remote_actor_id)
)
).scalar_one_or_none()
if not remote_actor:
return []
q = (
select(APRemotePost)
.where(APRemotePost.remote_actor_id == remote_actor_id)
)
if before:
q = q.where(APRemotePost.published < before)
q = q.order_by(APRemotePost.published.desc()).limit(limit)
posts = (await session.execute(q)).scalars().all()
return [
TimelineItemDTO(
id=f"remote:{p.id}",
post_type="remote",
content=p.content or "",
published=p.published,
actor_name=remote_actor.display_name or remote_actor.preferred_username,
actor_username=remote_actor.preferred_username,
object_id=p.object_id,
summary=p.summary,
url=p.url,
actor_domain=remote_actor.domain,
actor_icon=remote_actor.icon_url,
actor_url=remote_actor.actor_url,
author_inbox=remote_actor.inbox_url,
)
for p in posts
]
# -- Local posts ---------------------------------------------------------- # -- Local posts ----------------------------------------------------------
async def create_local_post( async def create_local_post(

View File

@@ -1,9 +1,9 @@
"""Inline federation publication — called at write time, not via async handler. """Inline federation publication — called at write time, not via async handler.
The originating service calls try_publish() directly, which creates the Replaces the old pattern where emit_event("post.published") → async handler →
APActivity (with process_state='pending') in the same DB transaction. publish_activity(). Now the originating service calls try_publish() directly,
The EventProcessor picks it up and the delivery wildcard handler POSTs which creates the APActivity in the same DB transaction. AP delivery
to follower inboxes. (federation.activity_created → inbox POST) stays async.
""" """
from __future__ import annotations from __future__ import annotations
@@ -49,30 +49,20 @@ async def try_publish(
if existing: if existing:
if activity_type == "Create" and existing.activity_type != "Delete": if activity_type == "Create" and existing.activity_type != "Delete":
return # already published (allow re-Create after Delete/unpublish) return # already published (allow re-Create after Delete/unpublish)
if activity_type == "Update" and existing.activity_type == "Update":
return # already updated (Ghost fires duplicate webhooks)
if activity_type == "Delete" and existing.activity_type == "Delete": if activity_type == "Delete" and existing.activity_type == "Delete":
return # already deleted return # already deleted
elif activity_type in ("Delete", "Update"): elif activity_type in ("Delete", "Update"):
return # never published, nothing to delete/update return # never published, nothing to delete/update
# Stable object ID within a publish cycle. After Delete + re-Create # Stable object ID: same source always gets the same object id so
# we append a version suffix so remote servers (Mastodon) treat it as # Mastodon treats Create/Update/Delete as the same post.
# a brand-new post rather than ignoring the tombstoned ID. domain = os.getenv("AP_DOMAIN", "rose-ash.com")
domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com") object_data["id"] = (
base_object_id = (
f"https://{domain}/users/{actor.preferred_username}" f"https://{domain}/users/{actor.preferred_username}"
f"/objects/{source_type.lower()}/{source_id}" f"/objects/{source_type.lower()}/{source_id}"
) )
if activity_type == "Create" and existing and existing.activity_type == "Delete":
# Count prior Creates to derive a version number
create_count = await services.federation.count_activities_for_source(
session, source_type, source_id, activity_type="Create",
)
object_data["id"] = f"{base_object_id}/v{create_count + 1}"
elif activity_type in ("Update", "Delete") and existing and existing.object_data:
# Use the same object ID as the most recent activity
object_data["id"] = existing.object_data.get("id", base_object_id)
else:
object_data["id"] = base_object_id
try: try:
await services.federation.publish_activity( await services.federation.publish_activity(

View File

@@ -52,23 +52,6 @@ class SqlMarketService:
) )
return [_mp_to_dto(mp) for mp in result.scalars().all()] return [_mp_to_dto(mp) for mp in result.scalars().all()]
async def list_marketplaces(
self, session: AsyncSession,
container_type: str | None = None, container_id: int | None = None,
*, page: int = 1, per_page: int = 20,
) -> tuple[list[MarketPlaceDTO], bool]:
stmt = select(MarketPlace).where(MarketPlace.deleted_at.is_(None))
if container_type is not None and container_id is not None:
stmt = stmt.where(
MarketPlace.container_type == container_type,
MarketPlace.container_id == container_id,
)
stmt = stmt.order_by(MarketPlace.name.asc())
stmt = stmt.offset((page - 1) * per_page).limit(per_page + 1)
rows = (await session.execute(stmt)).scalars().all()
has_more = len(rows) > per_page
return [_mp_to_dto(mp) for mp in rows[:per_page]], has_more
async def product_by_id(self, session: AsyncSession, product_id: int) -> ProductDTO | None: async def product_by_id(self, session: AsyncSession, product_id: int) -> ProductDTO | None:
product = ( product = (
await session.execute(select(Product).where(Product.id == product_id)) await session.execute(select(Product).where(Product.id == product_id))

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
from sqlalchemy import select, func from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import emit_activity from shared.events import emit_event
from shared.models.container_relation import ContainerRelation from shared.models.container_relation import ContainerRelation
@@ -40,19 +40,17 @@ async def attach_child(
if label is not None: if label is not None:
existing.label = label existing.label = label
await session.flush() await session.flush()
await emit_activity( await emit_event(
session, session,
activity_type="Add", event_type="container.child_attached",
actor_uri="internal:system", aggregate_type="container_relation",
object_type="rose:ContainerRelation", aggregate_id=existing.id,
object_data={ payload={
"parent_type": parent_type, "parent_type": parent_type,
"parent_id": parent_id, "parent_id": parent_id,
"child_type": child_type, "child_type": child_type,
"child_id": child_id, "child_id": child_id,
}, },
source_type="container_relation",
source_id=existing.id,
) )
return existing return existing
# Already attached and active — no-op # Already attached and active — no-op
@@ -79,19 +77,17 @@ async def attach_child(
session.add(rel) session.add(rel)
await session.flush() await session.flush()
await emit_activity( await emit_event(
session, session,
activity_type="Add", event_type="container.child_attached",
actor_uri="internal:system", aggregate_type="container_relation",
object_type="rose:ContainerRelation", aggregate_id=rel.id,
object_data={ payload={
"parent_type": parent_type, "parent_type": parent_type,
"parent_id": parent_id, "parent_id": parent_id,
"child_type": child_type, "child_type": child_type,
"child_id": child_id, "child_id": child_id,
}, },
source_type="container_relation",
source_id=rel.id,
) )
return rel return rel
@@ -143,19 +139,17 @@ async def detach_child(
rel.deleted_at = func.now() rel.deleted_at = func.now()
await session.flush() await session.flush()
await emit_activity( await emit_event(
session, session,
activity_type="Remove", event_type="container.child_detached",
actor_uri="internal:system", aggregate_type="container_relation",
object_type="rose:ContainerRelation", aggregate_id=rel.id,
object_data={ payload={
"parent_type": parent_type, "parent_type": parent_type,
"parent_id": parent_id, "parent_id": parent_id,
"child_type": child_type, "child_type": child_type,
"child_id": child_id, "child_id": child_id,
}, },
source_type="container_relation",
source_id=rel.id,
) )
return True return True

View File

@@ -140,9 +140,6 @@ class StubCalendarService:
) -> int: ) -> int:
return 0 return 0
async def upcoming_entries_for_container(self, session, container_type, container_id, *, page=1, per_page=20):
return [], False
async def entry_ids_for_content(self, session, content_type, content_id): async def entry_ids_for_content(self, session, content_type, content_id):
return set() return set()
@@ -156,13 +153,6 @@ class StubMarketService:
) -> list[MarketPlaceDTO]: ) -> list[MarketPlaceDTO]:
return [] return []
async def list_marketplaces(
self, session: AsyncSession,
container_type: str | None = None, container_id: int | None = None,
*, page: int = 1, per_page: int = 20,
) -> tuple[list[MarketPlaceDTO], bool]:
return [], False
async def product_by_id(self, session: AsyncSession, product_id: int) -> ProductDTO | None: async def product_by_id(self, session: AsyncSession, product_id: int) -> ProductDTO | None:
return None return None
@@ -227,9 +217,6 @@ class StubFederationService:
async def get_activity_for_source(self, session, source_type, source_id): async def get_activity_for_source(self, session, source_type, source_id):
return None return None
async def count_activities_for_source(self, session, source_type, source_id, *, activity_type):
return 0
async def get_followers(self, session, username): async def get_followers(self, session, username):
return [] return []
@@ -246,18 +233,12 @@ class StubFederationService:
async def search_remote_actor(self, session, acct): async def search_remote_actor(self, session, acct):
return None return None
async def search_actors(self, session, query, page=1, limit=20):
return [], 0
async def send_follow(self, session, local_username, remote_actor_url): async def send_follow(self, session, local_username, remote_actor_url):
raise RuntimeError("FederationService not available") raise RuntimeError("FederationService not available")
async def get_following(self, session, username, page=1, per_page=20): async def get_following(self, session, username, page=1, per_page=20):
return [], 0 return [], 0
async def get_followers_paginated(self, session, username, page=1, per_page=20):
return [], 0
async def accept_follow_response(self, session, local_username, remote_actor_url): async def accept_follow_response(self, session, local_username, remote_actor_url):
pass pass
@@ -279,9 +260,6 @@ class StubFederationService:
async def get_public_timeline(self, session, before=None, limit=20): async def get_public_timeline(self, session, before=None, limit=20):
return [] return []
async def get_actor_timeline(self, session, remote_actor_id, before=None, limit=20):
return []
async def create_local_post(self, session, actor_profile_id, content, visibility="public", in_reply_to=None): async def create_local_post(self, session, actor_profile_id, content, visibility="public", in_reply_to=None):
raise RuntimeError("FederationService not available") raise RuntimeError("FederationService not available")

View File

@@ -48,8 +48,8 @@ class _WidgetRegistry:
slug = w.slug slug = w.slug
def _href(s=slug): def _href(s=slug):
from shared.infrastructure.urls import blog_url from shared.infrastructure.urls import coop_url
return blog_url(f"/auth/{s}/") return coop_url(f"/auth/{s}/")
self._account_nav.append(AccountNavLink( self._account_nav.append(AccountNavLink(
label=w.label, label=w.label,

View File

@@ -145,7 +145,7 @@ async def upgrade_ots_proof(proof_bytes: bytes) -> tuple[bytes, bool]:
""" """
# OpenTimestamps upgrade is done via the `ots` CLI or their calendar API. # OpenTimestamps upgrade is done via the `ots` CLI or their calendar API.
# For now, return the proof as-is with is_confirmed=False. # For now, return the proof as-is with is_confirmed=False.
# Calendar-based upgrade polling not yet implemented. # TODO: Implement calendar-based upgrade polling.
return proof_bytes, False return proof_bytes, False