Compare commits

...

8 Commits

Author SHA1 Message Date
giles
9cdd2195df Restore all 33 deleted shared templates
Templates were incorrectly identified as dead code because individual
apps override them, but other apps still depend on the shared versions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 19:26:48 +00:00
giles
46f6ca4a0f Remove dead code: domain_event.py + 39 overridden templates
- Delete shared/models/domain_event.py (table dropped, model orphaned)
- Delete 39 shared templates that are overridden by app-local copies:
  - 8 blog overrides (blog/_action_buttons, post/_meta, etc.)
  - 27 events overrides (calendar/*, day/*, entry/*, post_entries/*)
  - 4 market overrides (market/index, browse/_oob_elements, etc.)

These shared copies were never served — Quart loads app-level
templates first, so the app-local versions always win.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 18:09:02 +00:00
giles
7de4a2e40e Remove dead shared _cart.html template
The cart app has its own override with ticket support. The shared
copy was never used and would only cause confusion.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 18:05:17 +00:00
giles
1c1ab3576f Pass cart_sid through login URL for cross-app cart adoption
When login_url() is called from a different app (e.g. cart), the
anonymous cart_sid is in that app's session cookie. Pass it as a
query parameter so the auth app can store it and use it for adoption.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 17:46:08 +00:00
giles
07aa2e2be9 Fix cart sign-in button: use plain link instead of HTMX
The login URL is cross-origin (blog app), so hx-get can't load it
into the current page. Use a regular <a href> for cross-app navigation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 17:37:16 +00:00
giles
2e9db11925 Unify domain_events + ap_activities into AP-shaped event bus
All cross-service events now flow through ap_activities with a unified
EventProcessor. Internal events use visibility="internal"; federation
activities use visibility="public" and get delivered by a wildcard handler.

- Add processing columns to APActivity (process_state, actor_uri, etc.)
- New emit_activity() / register_activity_handler() API
- EventProcessor polls ap_activities instead of domain_events
- Rewrite all handlers to accept APActivity
- Migrate all 7 emit_event call sites to emit_activity
- publish_activity() sets process_state=pending directly (no emit_event bridge)
- Migration to drop domain_events table

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 16:19:29 +00:00
giles
d697709f60 Tech debt cleanup: fix session.py, remove stale references, update docs
- db/session.py: fix indentation (2→4 space), pool_size=0 (unlimited),
  remove "ned to look at this" typo
- Remove glue.models from alembic env.py import list
- Update shared __init__.py, menu_item.py docstring, calendar_impl.py,
  handlers/__init__.py to remove glue terminology
- Remove federation_handlers.py tombstone file
- Remove TODO comments (replace with explanatory comments)
- Rewrite README.md to reflect current architecture
- Update anchoring.py TODO to plain comment

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 15:11:31 +00:00
giles
04f7c5e85c Add fediverse social features: followers/following lists, actor timelines
Adds get_followers_paginated and get_actor_timeline to FederationService
protocol + SQL implementation + stubs. Includes accumulated federation
changes: models, DTOs, delivery handler, webfinger, inline publishing,
widget nav templates, and migration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-22 13:41:58 +00:00
33 changed files with 2369 additions and 446 deletions

View File

@@ -1,6 +1,6 @@
# Shared # Shared
Shared infrastructure, models, templates, and configuration used by all four Rose Ash microservices (blog, market, cart, events). Included as a git submodule in each app. Shared infrastructure, models, contracts, services, and templates used by all five Rose Ash microservices (blog, market, cart, events, federation). Included as a git submodule in each app.
## Structure ## Structure
@@ -8,53 +8,78 @@ Shared infrastructure, models, templates, and configuration used by all four Ros
shared/ shared/
db/ db/
base.py # SQLAlchemy declarative Base base.py # SQLAlchemy declarative Base
session.py # Async session factory (get_session) session.py # Async session factory (get_session, register_db)
models/ # Shared domain models models/ # Canonical domain models
user.py # User user.py # User
magic_link.py # MagicLink (auth tokens) magic_link.py # MagicLink (auth tokens)
domain_event.py # DomainEvent (transactional outbox) (domain_event.py removed — table dropped, see migration n4l2i8j0k1)
kv.py # KeyValue (key-value store) kv.py # KeyValue (key-value store)
menu_item.py # MenuItem menu_item.py # MenuItem (deprecated — use MenuNode)
menu_node.py # MenuNode (navigation tree)
container_relation.py # ContainerRelation (parent-child content)
ghost_membership_entities.py # GhostNewsletter, UserNewsletter ghost_membership_entities.py # GhostNewsletter, UserNewsletter
federation.py # ActorProfile, APActivity, APFollower, APFollowing,
# RemoteActor, APRemotePost, APLocalPost,
# APInteraction, APNotification, APAnchor, IPFSPin
contracts/
dtos.py # Frozen dataclasses for cross-domain data transfer
protocols.py # Service protocols (Blog, Calendar, Market, Cart, Federation)
widgets.py # Widget types (NavWidget, CardWidget, AccountPageWidget)
services/
registry.py # Typed singleton: services.blog, .calendar, .market, .cart, .federation
blog_impl.py # SqlBlogService
calendar_impl.py # SqlCalendarService
market_impl.py # SqlMarketService
cart_impl.py # SqlCartService
federation_impl.py # SqlFederationService
federation_publish.py # try_publish() — inline AP publication helper
stubs.py # No-op stubs for absent domains
navigation.py # get_navigation_tree()
relationships.py # attach_child, get_children, detach_child
widget_registry.py # Widget registry singleton
widgets/ # Per-domain widget registration
infrastructure/ infrastructure/
factory.py # create_base_app() — Quart app factory factory.py # create_base_app() — Quart app factory
cart_identity.py # current_cart_identity() (user_id or session_id) cart_identity.py # current_cart_identity() (user_id or session_id)
cart_loader.py # Cart data loader for context processors cart_loader.py # Cart data loader for context processors
context.py # Jinja2 context processors context.py # Jinja2 context processors
internal_api.py # Inter-app HTTP client (get/post via httpx)
jinja_setup.py # Jinja2 template environment setup jinja_setup.py # Jinja2 template environment setup
urls.py # URL helpers (coop_url, market_url, etc.) urls.py # URL helpers (coop_url, market_url, etc.)
user_loader.py # Load current user from session user_loader.py # Load current user from session
http_utils.py # HTTP utility functions http_utils.py # HTTP utility functions
events/ events/
bus.py # emit_event(), register_handler() bus.py # emit_activity(), register_activity_handler()
processor.py # EventProcessor (polls domain_events, runs handlers) processor.py # EventProcessor (polls ap_activities, runs handlers)
browser/app/ handlers/ # Shared activity handlers
csrf.py # CSRF protection container_handlers.py # Navigation rebuild on attach/detach
errors.py # Error handlers login_handlers.py # Cart/entry adoption on login
middleware.py # Request/response middleware order_handlers.py # Order lifecycle events
redis_cacher.py # Tag-based Redis page caching ap_delivery_handler.py # AP activity delivery to follower inboxes (wildcard)
authz.py # Authorization helpers utils/
filters/ # Jinja2 template filters (currency, truncate, etc.) __init__.py
utils/ # HTMX helpers, UTC time, parsing calendar_helpers.py # Calendar period/entry utilities
payments/sumup.py # SumUp checkout API integration http_signatures.py # RSA keypair generation, HTTP signature signing/verification
browser/templates/ # ~300 Jinja2 templates shared across all apps ipfs_client.py # Async IPFS client (add_bytes, add_json, pin_cid)
config.py # YAML config loader anchoring.py # Merkle trees + OpenTimestamps Bitcoin anchoring
webfinger.py # WebFinger actor resolution
browser/
app/ # Middleware, CSRF, errors, Redis caching, authz, filters
templates/ # ~300 Jinja2 templates shared across all apps
containers.py # ContainerType, container_filter, content_filter helpers containers.py # ContainerType, container_filter, content_filter helpers
config.py # YAML config loader
log_config/setup.py # Logging configuration (JSON formatter) log_config/setup.py # Logging configuration (JSON formatter)
utils.py # host_url and other shared utilities
static/ # Shared static assets (CSS, JS, images, FontAwesome) static/ # Shared static assets (CSS, JS, images, FontAwesome)
editor/ # Koenig (Ghost) rich text editor build editor/ # Koenig (Ghost) rich text editor build
alembic/ # Database migrations (25 versions) alembic/ # Database migrations
env.py # Imports models from all apps (with try/except guards)
versions/ # Migration files — single head: j0h8e4f6g7
``` ```
## Key Patterns ## Key Patterns
- **App factory:** All apps call `create_base_app()` which sets up DB sessions, CSRF, error handling, event processing, logging, and the glue handler registry. - **App factory:** All apps call `create_base_app()` which sets up DB sessions, CSRF, error handling, event processing, logging, widget registration, and domain service wiring.
- **Event bus:** `emit_event()` writes to `domain_events` table in the caller's transaction. `EventProcessor` polls and dispatches to registered handlers. - **Service contracts:** Cross-domain communication via typed Protocols + frozen DTO dataclasses. Apps call `services.calendar.method()`, never import models from other domains.
- **Inter-app HTTP:** `internal_api.get/post("cart", "/internal/cart/summary")` for cross-app reads. URLs resolved from `app-config.yaml`. - **Service registry:** Typed singleton (`services.blog`, `.calendar`, `.market`, `.cart`, `.federation`). Apps wire their own domain + stubs for others via `register_domain_services()`.
- **Activity bus:** `emit_activity()` writes to `ap_activities` table in the caller's transaction. `EventProcessor` polls pending activities and dispatches to registered handlers. Internal events use `visibility="internal"`; federation activities use `visibility="public"` and are delivered to follower inboxes by the wildcard delivery handler.
- **Widget registry:** Domain services register widgets (nav, card, account); templates consume via `widgets.container_nav`, `widgets.container_cards`.
- **Cart identity:** `current_cart_identity()` returns `{"user_id": int|None, "session_id": str|None}` from the request session. - **Cart identity:** `current_cart_identity()` returns `{"user_id": int|None, "session_id": str|None}` from the request session.
## Alembic Migrations ## Alembic Migrations
@@ -62,8 +87,5 @@ shared/
All apps share one PostgreSQL database. Migrations are managed here and run from the blog app's entrypoint (other apps skip migrations on startup). All apps share one PostgreSQL database. Migrations are managed here and run from the blog app's entrypoint (other apps skip migrations on startup).
```bash ```bash
# From any app directory (shared/ must be on sys.path)
alembic -c shared/alembic.ini upgrade head alembic -c shared/alembic.ini upgrade head
``` ```
Current head: `j0h8e4f6g7` (drop cross-domain FK constraints).

View File

@@ -1 +1 @@
# shared package — extracted from blog/shared_lib/ # shared package — infrastructure, models, contracts, and services

View File

@@ -19,7 +19,7 @@ from shared.db.base import Base
# Import ALL models so Base.metadata sees every table # Import ALL models so Base.metadata sees every table
import shared.models # noqa: F401 User, KV, MagicLink, MenuItem, Ghost* import shared.models # noqa: F401 User, KV, MagicLink, MenuItem, Ghost*
for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models", "glue.models"): for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models"):
try: try:
__import__(_mod) __import__(_mod)
except ImportError: except ImportError:

View File

@@ -0,0 +1,138 @@
"""add fediverse social tables
Revision ID: l2j0g6h8i9
Revises: k1i9f5g7h8
Create Date: 2026-02-22
Creates:
- ap_remote_actors — cached profiles of remote actors
- ap_following — outbound follows (local → remote)
- ap_remote_posts — ingested posts from remote actors
- ap_local_posts — native posts composed in federation UI
- ap_interactions — likes and boosts
- ap_notifications — follow/like/boost/mention/reply notifications
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
revision = "l2j0g6h8i9"
down_revision = "k1i9f5g7h8"
branch_labels = None
depends_on = None
def upgrade() -> None:
# -- ap_remote_actors --
op.create_table(
"ap_remote_actors",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("actor_url", sa.String(512), unique=True, nullable=False),
sa.Column("inbox_url", sa.String(512), nullable=False),
sa.Column("shared_inbox_url", sa.String(512), nullable=True),
sa.Column("preferred_username", sa.String(255), nullable=False),
sa.Column("display_name", sa.String(255), nullable=True),
sa.Column("summary", sa.Text, nullable=True),
sa.Column("icon_url", sa.String(512), nullable=True),
sa.Column("public_key_pem", sa.Text, nullable=True),
sa.Column("domain", sa.String(255), nullable=False),
sa.Column("fetched_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_index("ix_ap_remote_actor_url", "ap_remote_actors", ["actor_url"], unique=True)
op.create_index("ix_ap_remote_actor_domain", "ap_remote_actors", ["domain"])
# -- ap_following --
op.create_table(
"ap_following",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False),
sa.Column("remote_actor_id", sa.Integer, sa.ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False),
sa.Column("state", sa.String(20), nullable=False, server_default="pending"),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("accepted_at", sa.DateTime(timezone=True), nullable=True),
sa.UniqueConstraint("actor_profile_id", "remote_actor_id", name="uq_following"),
)
op.create_index("ix_ap_following_actor", "ap_following", ["actor_profile_id"])
op.create_index("ix_ap_following_remote", "ap_following", ["remote_actor_id"])
# -- ap_remote_posts --
op.create_table(
"ap_remote_posts",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("remote_actor_id", sa.Integer, sa.ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False),
sa.Column("activity_id", sa.String(512), unique=True, nullable=False),
sa.Column("object_id", sa.String(512), unique=True, nullable=False),
sa.Column("object_type", sa.String(64), nullable=False, server_default="Note"),
sa.Column("content", sa.Text, nullable=True),
sa.Column("summary", sa.Text, nullable=True),
sa.Column("url", sa.String(512), nullable=True),
sa.Column("attachment_data", JSONB, nullable=True),
sa.Column("tag_data", JSONB, nullable=True),
sa.Column("in_reply_to", sa.String(512), nullable=True),
sa.Column("conversation", sa.String(512), nullable=True),
sa.Column("published", sa.DateTime(timezone=True), nullable=True),
sa.Column("fetched_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_index("ix_ap_remote_post_actor", "ap_remote_posts", ["remote_actor_id"])
op.create_index("ix_ap_remote_post_published", "ap_remote_posts", ["published"])
op.create_index("ix_ap_remote_post_object", "ap_remote_posts", ["object_id"], unique=True)
# -- ap_local_posts --
op.create_table(
"ap_local_posts",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False),
sa.Column("content", sa.Text, nullable=False),
sa.Column("visibility", sa.String(20), nullable=False, server_default="public"),
sa.Column("in_reply_to", sa.String(512), nullable=True),
sa.Column("published", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_index("ix_ap_local_post_actor", "ap_local_posts", ["actor_profile_id"])
op.create_index("ix_ap_local_post_published", "ap_local_posts", ["published"])
# -- ap_interactions --
op.create_table(
"ap_interactions",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True),
sa.Column("remote_actor_id", sa.Integer, sa.ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=True),
sa.Column("post_type", sa.String(20), nullable=False),
sa.Column("post_id", sa.Integer, nullable=False),
sa.Column("interaction_type", sa.String(20), nullable=False),
sa.Column("activity_id", sa.String(512), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_index("ix_ap_interaction_post", "ap_interactions", ["post_type", "post_id"])
op.create_index("ix_ap_interaction_actor", "ap_interactions", ["actor_profile_id"])
op.create_index("ix_ap_interaction_remote", "ap_interactions", ["remote_actor_id"])
# -- ap_notifications --
op.create_table(
"ap_notifications",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False),
sa.Column("notification_type", sa.String(20), nullable=False),
sa.Column("from_remote_actor_id", sa.Integer, sa.ForeignKey("ap_remote_actors.id", ondelete="SET NULL"), nullable=True),
sa.Column("from_actor_profile_id", sa.Integer, sa.ForeignKey("ap_actor_profiles.id", ondelete="SET NULL"), nullable=True),
sa.Column("target_activity_id", sa.Integer, sa.ForeignKey("ap_activities.id", ondelete="SET NULL"), nullable=True),
sa.Column("target_remote_post_id", sa.Integer, sa.ForeignKey("ap_remote_posts.id", ondelete="SET NULL"), nullable=True),
sa.Column("read", sa.Boolean, nullable=False, server_default="false"),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False),
)
op.create_index("ix_ap_notification_actor", "ap_notifications", ["actor_profile_id"])
op.create_index("ix_ap_notification_read", "ap_notifications", ["actor_profile_id", "read"])
op.create_index("ix_ap_notification_created", "ap_notifications", ["created_at"])
def downgrade() -> None:
op.drop_table("ap_notifications")
op.drop_table("ap_interactions")
op.drop_table("ap_local_posts")
op.drop_table("ap_remote_posts")
op.drop_table("ap_following")
op.drop_table("ap_remote_actors")

View File

@@ -0,0 +1,113 @@
"""add unified event bus columns to ap_activities
Revision ID: m3k1h7i9j0
Revises: l2j0g6h8i9
Create Date: 2026-02-22
Adds processing and visibility columns so ap_activities can serve as the
unified event bus for both internal domain events and federation delivery.
"""
revision = "m3k1h7i9j0"
down_revision = "l2j0g6h8i9"
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade() -> None:
# Add new columns with defaults so existing rows stay valid
op.add_column(
"ap_activities",
sa.Column("actor_uri", sa.String(512), nullable=True),
)
op.add_column(
"ap_activities",
sa.Column(
"visibility", sa.String(20),
nullable=False, server_default="public",
),
)
op.add_column(
"ap_activities",
sa.Column(
"process_state", sa.String(20),
nullable=False, server_default="completed",
),
)
op.add_column(
"ap_activities",
sa.Column(
"process_attempts", sa.Integer(),
nullable=False, server_default="0",
),
)
op.add_column(
"ap_activities",
sa.Column(
"process_max_attempts", sa.Integer(),
nullable=False, server_default="5",
),
)
op.add_column(
"ap_activities",
sa.Column("process_error", sa.Text(), nullable=True),
)
op.add_column(
"ap_activities",
sa.Column(
"processed_at", sa.DateTime(timezone=True), nullable=True,
),
)
# Backfill actor_uri from the related actor_profile
op.execute(
"""
UPDATE ap_activities a
SET actor_uri = CONCAT(
'https://',
COALESCE(current_setting('app.ap_domain', true), 'rose-ash.com'),
'/users/',
p.preferred_username
)
FROM ap_actor_profiles p
WHERE a.actor_profile_id = p.id
AND a.actor_uri IS NULL
"""
)
# Make actor_profile_id nullable (internal events have no actor profile)
op.alter_column(
"ap_activities", "actor_profile_id",
existing_type=sa.Integer(),
nullable=True,
)
# Index for processor polling
op.create_index(
"ix_ap_activity_process", "ap_activities", ["process_state"],
)
def downgrade() -> None:
op.drop_index("ix_ap_activity_process", table_name="ap_activities")
# Restore actor_profile_id NOT NULL (remove any rows without it first)
op.execute(
"DELETE FROM ap_activities WHERE actor_profile_id IS NULL"
)
op.alter_column(
"ap_activities", "actor_profile_id",
existing_type=sa.Integer(),
nullable=False,
)
op.drop_column("ap_activities", "processed_at")
op.drop_column("ap_activities", "process_error")
op.drop_column("ap_activities", "process_max_attempts")
op.drop_column("ap_activities", "process_attempts")
op.drop_column("ap_activities", "process_state")
op.drop_column("ap_activities", "visibility")
op.drop_column("ap_activities", "actor_uri")

View File

@@ -0,0 +1,46 @@
"""drop domain_events table
Revision ID: n4l2i8j0k1
Revises: m3k1h7i9j0
Create Date: 2026-02-22
The domain_events table is no longer used — all events now flow through
ap_activities with the unified activity bus.
"""
revision = "n4l2i8j0k1"
down_revision = "m3k1h7i9j0"
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
def upgrade() -> None:
op.drop_index("ix_domain_events_state", table_name="domain_events")
op.drop_index("ix_domain_events_event_type", table_name="domain_events")
op.drop_table("domain_events")
def downgrade() -> None:
op.create_table(
"domain_events",
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
sa.Column("event_type", sa.String(128), nullable=False),
sa.Column("aggregate_type", sa.String(64), nullable=False),
sa.Column("aggregate_id", sa.Integer(), nullable=False),
sa.Column("payload", JSONB(), nullable=True),
sa.Column("state", sa.String(20), nullable=False, server_default="pending"),
sa.Column("attempts", sa.Integer(), nullable=False, server_default="0"),
sa.Column("max_attempts", sa.Integer(), nullable=False, server_default="5"),
sa.Column("last_error", sa.Text(), nullable=True),
sa.Column(
"created_at", sa.DateTime(timezone=True),
nullable=False, server_default=sa.func.now(),
),
sa.Column("processed_at", sa.DateTime(timezone=True), nullable=True),
)
op.create_index("ix_domain_events_event_type", "domain_events", ["event_type"])
op.create_index("ix_domain_events_state", "domain_events", ["state"])

View File

@@ -1,169 +0,0 @@
{% macro show_cart(oob=False) %}
<div id="cart" {% if oob %} hx-swap-oob="{{oob}}" {% endif%}>
{# Empty cart #}
{% if not cart and not calendar_cart_entries %}
<div class="rounded-2xl border border-dashed border-stone-300 bg-white/80 p-6 sm:p-8 text-center">
<div class="inline-flex h-10 w-10 sm:h-12 sm:w-12 items-center justify-center rounded-full bg-stone-100 mb-3">
<i class="fa fa-shopping-cart text-stone-500 text-sm sm:text-base" aria-hidden="true"></i>
</div>
<p class="text-base sm:text-lg font-medium text-stone-800">
Your cart is empty
</p>
{#
<p class="mt-1 text-xs sm:text-sm text-stone-600">
Add some items from the shop to see them here.
</p>
<div class="mt-4">
<a
href="{{ market_url('/') }}"
class="inline-flex items-center px-4 py-2 text-sm font-semibold rounded-full bg-emerald-600 text-white hover:bg-emerald-700"
>
Browse products
</a>
</div> #}
</div>
{% else %}
<div _class="grid gap-y-6 lg:gap-8 lg:grid-cols-[minmax(0,2fr),minmax(0,1fr)]">
{# Items list #}
<section class="space-y-3 sm:space-y-4">
{% for item in cart %}
{% from '_types/product/_cart.html' import cart_item with context %}
{{ cart_item()}}
{% endfor %}
{% if calendar_cart_entries %}
<div class="mt-6 border-t border-stone-200 pt-4">
<h2 class="text-base font-semibold mb-2">
Calendar bookings
</h2>
<ul class="space-y-2">
{% for entry in calendar_cart_entries %}
<li class="flex items-start justify-between text-sm">
<div>
<div class="font-medium">
{{ entry.name or entry.calendar_name }}
</div>
<div class="text-xs text-stone-500">
{{ entry.start_at }}
{% if entry.end_at %}
{{ entry.end_at }}
{% endif %}
</div>
</div>
<div class="ml-4 font-medium">
£{{ "%.2f"|format(entry.cost or 0) }}
</div>
</li>
{% endfor %}
</ul>
</div>
{% endif %}
</section>
{{summary(cart, total, calendar_total, calendar_cart_entries,)}}
</div>
{% endif %}
</div>
{% endmacro %}
{% macro summary(cart, total, calendar_total, calendar_cart_entries, oob=False) %}
<aside id="cart-summary" class="lg:pl-2" {% if oob %} hx-swap-oob="{{oob}}" {% endif %}>
<div class="rounded-2xl bg-white shadow-sm border border-stone-200 p-4 sm:p-5">
<h2 class="text-sm sm:text-base font-semibold text-stone-900 mb-3 sm:mb-4">
Order summary
</h2>
<dl class="space-y-2 text-xs sm:text-sm">
<div class="flex items-center justify-between">
<dt class="text-stone-600">Items</dt>
<dd class="text-stone-900">
{{ cart | sum(attribute="quantity") }}
</dd>
</div>
<div class="flex items-center justify-between">
<dt class="text-stone-600">Subtotal</dt>
<dd class="text-stone-900">
{{ cart_grand_total(cart, total, calendar_total, calendar_cart_entries ) }}
</dd>
</div>
</dl>
<div class="flex flex-col items-center w-full">
<h1 class="text-5xl mt-2">
This is a test - it will not take actual money
</h1>
<div>
use dummy card number: 5555 5555 5555 4444
</div>
</div>
<div class="mt-4 sm:mt-5">
{% if g.user %}
<form
method="post"
action="{{ page_cart_url(page_post.slug, '/checkout/') if page_post is defined and page_post else cart_url('/checkout/') }}"
class="w-full"
>
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button
type="submit"
class="w-full inline-flex items-center justify-center px-4 py-2 text-xs sm:text-sm rounded-full border border-emerald-600 bg-emerald-600 text-white hover:bg-emerald-700 transition"
>
<i class="fa-solid fa-credit-card mr-2" aria-hidden="true"></i>
Checkout as {{g.user.email}}
</button>
</form>
{% else %}
{% set href=login_url(request.url) %}
<div
class="w-full flex"
>
<a
href="{{ href }}"
hx-get="{{ href }}"
hx-target="#main-panel"
hx-select ="{{hx_select_search}}"
hx-swap="outerHTML"
hx-push-url="true"
aria-selected="{{ 'true' if local_href == request.path else 'false' }}"
class="w-full cursor-pointer flex flex-row items-center justify-center p-3 gap-2 rounded bg-stone-200 text-black {{select_colours}}"
data-close-details
>
<i class="fa-solid fa-key"></i>
<span>sign in or register to checkout</span>
</a>
</div>
{% endif %}
</div>
</div>
</aside>
{% endmacro %}
{% macro cart_total(cart, total) %}
{% set cart_total = total(cart) %}
{% if cart_total %}
{% set symbol = "£" if cart[0].product.regular_price_currency == "GBP" else cart[0].product.regular_price_currency %}
{{ symbol }}{{ "%.2f"|format(cart_total) }}
{% else %}
{% endif %}
{% endmacro %}
{% macro cart_grand_total(cart, total, calendar_total, calendar_cart_entries) %}
{% set product_total = total(cart) or 0 %}
{% set cal_total = calendar_total(calendar_cart_entries) or 0 %}
{% set grand = product_total + cal_total %}
{% if cart and cart[0].product.regular_price_currency %}
{% set symbol = "£" if cart[0].product.regular_price_currency == "GBP" else cart[0].product.regular_price_currency %}
{% else %}
{% set symbol = "£" %}
{% endif %}
{{ symbol }}{{ "%.2f"|format(grand) }}
{% endmacro %}

View File

@@ -25,6 +25,15 @@
{% endcall %} {% endcall %}
</div> </div>
{# Container nav widgets (market links, etc.) #}
{% if container_nav_widgets %}
{% for wdata in container_nav_widgets %}
{% with ctx=wdata.ctx %}
{% include wdata.widget.template with context %}
{% endwith %}
{% endfor %}
{% endif %}
{# Admin link #} {# Admin link #}
{% if g.rights.admin %} {% if g.rights.admin %}
{% from 'macros/admin_nav.html' import admin_nav_item %} {% from 'macros/admin_nav.html' import admin_nav_item %}

View File

@@ -22,6 +22,14 @@
{% endcall %} {% endcall %}
</div> </div>
{% if container_nav_widgets %}
{% for wdata in container_nav_widgets %}
{% with ctx=wdata.ctx %}
{% include wdata.widget.template with context %}
{% endwith %}
{% endfor %}
{% endif %}
{# Admin link #} {# Admin link #}
{% if g.rights.admin %} {% if g.rights.admin %}

View File

@@ -187,3 +187,68 @@ class APAnchorDTO:
ots_proof_cid: str | None = None ots_proof_cid: str | None = None
confirmed_at: datetime | None = None confirmed_at: datetime | None = None
bitcoin_txid: str | None = None bitcoin_txid: str | None = None
@dataclass(frozen=True, slots=True)
class RemoteActorDTO:
id: int
actor_url: str
inbox_url: str
preferred_username: str
domain: str
display_name: str | None = None
summary: str | None = None
icon_url: str | None = None
shared_inbox_url: str | None = None
public_key_pem: str | None = None
@dataclass(frozen=True, slots=True)
class RemotePostDTO:
id: int
remote_actor_id: int
object_id: str
content: str
summary: str | None = None
url: str | None = None
attachments: list[dict] = field(default_factory=list)
tags: list[dict] = field(default_factory=list)
published: datetime | None = None
actor: RemoteActorDTO | None = None
@dataclass(frozen=True, slots=True)
class TimelineItemDTO:
id: str # composite key for cursor pagination
post_type: str # "local" | "remote" | "boost"
content: str # HTML
published: datetime
actor_name: str
actor_username: str
object_id: str | None = None
summary: str | None = None
url: str | None = None
attachments: list[dict] = field(default_factory=list)
tags: list[dict] = field(default_factory=list)
actor_domain: str | None = None # None = local
actor_icon: str | None = None
actor_url: str | None = None
boosted_by: str | None = None
like_count: int = 0
boost_count: int = 0
liked_by_me: bool = False
boosted_by_me: bool = False
author_inbox: str | None = None
@dataclass(frozen=True, slots=True)
class NotificationDTO:
id: int
notification_type: str # follow/like/boost/mention/reply
from_actor_name: str
from_actor_username: str
created_at: datetime
read: bool
from_actor_domain: str | None = None
from_actor_icon: str | None = None
target_content_preview: str | None = None

View File

@@ -22,6 +22,10 @@ from .dtos import (
ActorProfileDTO, ActorProfileDTO,
APActivityDTO, APActivityDTO,
APFollowerDTO, APFollowerDTO,
RemoteActorDTO,
RemotePostDTO,
TimelineItemDTO,
NotificationDTO,
) )
@@ -217,6 +221,11 @@ class FederationService(Protocol):
self, session: AsyncSession, username: str, self, session: AsyncSession, username: str,
) -> list[APFollowerDTO]: ... ) -> list[APFollowerDTO]: ...
async def get_followers_paginated(
self, session: AsyncSession, username: str,
page: int = 1, per_page: int = 20,
) -> tuple[list[RemoteActorDTO], int]: ...
async def add_follower( async def add_follower(
self, session: AsyncSession, username: str, self, session: AsyncSession, username: str,
follower_acct: str, follower_inbox: str, follower_actor_url: str, follower_acct: str, follower_inbox: str, follower_actor_url: str,
@@ -227,5 +236,108 @@ class FederationService(Protocol):
self, session: AsyncSession, username: str, follower_acct: str, self, session: AsyncSession, username: str, follower_acct: str,
) -> bool: ... ) -> bool: ...
# -- Remote actors --------------------------------------------------------
async def get_or_fetch_remote_actor(
self, session: AsyncSession, actor_url: str,
) -> RemoteActorDTO | None: ...
async def search_remote_actor(
self, session: AsyncSession, acct: str,
) -> RemoteActorDTO | None: ...
# -- Following (outbound) -------------------------------------------------
async def send_follow(
self, session: AsyncSession, local_username: str, remote_actor_url: str,
) -> None: ...
async def get_following(
self, session: AsyncSession, username: str,
page: int = 1, per_page: int = 20,
) -> tuple[list[RemoteActorDTO], int]: ...
async def accept_follow_response(
self, session: AsyncSession, local_username: str, remote_actor_url: str,
) -> None: ...
async def unfollow(
self, session: AsyncSession, local_username: str, remote_actor_url: str,
) -> None: ...
# -- Remote posts ---------------------------------------------------------
async def ingest_remote_post(
self, session: AsyncSession, remote_actor_id: int,
activity_json: dict, object_json: dict,
) -> None: ...
async def delete_remote_post(
self, session: AsyncSession, object_id: str,
) -> None: ...
async def get_remote_post(
self, session: AsyncSession, object_id: str,
) -> RemotePostDTO | None: ...
# -- Timelines ------------------------------------------------------------
async def get_home_timeline(
self, session: AsyncSession, actor_profile_id: int,
before: datetime | None = None, limit: int = 20,
) -> list[TimelineItemDTO]: ...
async def get_public_timeline(
self, session: AsyncSession,
before: datetime | None = None, limit: int = 20,
) -> list[TimelineItemDTO]: ...
async def get_actor_timeline(
self, session: AsyncSession, remote_actor_id: int,
before: datetime | None = None, limit: int = 20,
) -> list[TimelineItemDTO]: ...
# -- Local posts ----------------------------------------------------------
async def create_local_post(
self, session: AsyncSession, actor_profile_id: int,
content: str, visibility: str = "public",
in_reply_to: str | None = None,
) -> int: ...
async def delete_local_post(
self, session: AsyncSession, actor_profile_id: int, post_id: int,
) -> None: ...
# -- Interactions ---------------------------------------------------------
async def like_post(
self, session: AsyncSession, actor_profile_id: int,
object_id: str, author_inbox: str,
) -> None: ...
async def unlike_post(
self, session: AsyncSession, actor_profile_id: int,
object_id: str, author_inbox: str,
) -> None: ...
async def boost_post(
self, session: AsyncSession, actor_profile_id: int,
object_id: str, author_inbox: str,
) -> None: ...
async def unboost_post(
self, session: AsyncSession, actor_profile_id: int,
object_id: str, author_inbox: str,
) -> None: ...
# -- Notifications --------------------------------------------------------
async def get_notifications(
self, session: AsyncSession, actor_profile_id: int,
before: datetime | None = None, limit: int = 20,
) -> list[NotificationDTO]: ...
async def unread_notification_count(
self, session: AsyncSession, actor_profile_id: int,
) -> int: ...
async def mark_notifications_read(
self, session: AsyncSession, actor_profile_id: int,
) -> None: ...
# -- Stats ---------------------------------------------------------------- # -- Stats ----------------------------------------------------------------
async def get_stats(self, session: AsyncSession) -> dict: ... async def get_stats(self, session: AsyncSession) -> dict: ...

View File

@@ -15,7 +15,7 @@ _engine = create_async_engine(
future=True, future=True,
echo=False, echo=False,
pool_pre_ping=True, pool_pre_ping=True,
pool_size=-1 # ned to look at this!!! pool_size=0, # 0 = unlimited (NullPool equivalent for asyncpg)
) )
_Session = async_sessionmaker( _Session = async_sessionmaker(
@@ -34,43 +34,42 @@ async def get_session():
await sess.close() await sess.close()
def register_db(app: Quart): def register_db(app: Quart):
@app.before_request @app.before_request
async def open_session(): async def open_session():
g.s = _Session() g.s = _Session()
g.tx = await g.s.begin() g.tx = await g.s.begin()
g.had_error = False g.had_error = False
@app.after_request @app.after_request
async def maybe_commit(response): async def maybe_commit(response):
# Runs BEFORE bytes are sent. # Runs BEFORE bytes are sent.
if not g.had_error and 200 <= response.status_code < 400: if not g.had_error and 200 <= response.status_code < 400:
try: try:
if hasattr(g, "tx"): if hasattr(g, "tx"):
await g.tx.commit() await g.tx.commit()
except Exception as e: except Exception as e:
print(f'commit failed {e}') print(f'commit failed {e}')
if hasattr(g, "tx"): if hasattr(g, "tx"):
await g.tx.rollback() await g.tx.rollback()
from quart import make_response from quart import make_response
return await make_response("Commit failed", 500) return await make_response("Commit failed", 500)
return response return response
@app.teardown_request @app.teardown_request
async def finish(exc): async def finish(exc):
try: try:
# If an exception occurred OR we didn't commit (still in txn), roll back. # If an exception occurred OR we didn't commit (still in txn), roll back.
if hasattr(g, "s"): if hasattr(g, "s"):
if exc is not None or g.s.in_transaction(): if exc is not None or g.s.in_transaction():
if hasattr(g, "tx"): if hasattr(g, "tx"):
await g.tx.rollback() await g.tx.rollback()
finally: finally:
if hasattr(g, "s"): if hasattr(g, "s"):
await g.s.close() await g.s.close()
@app.errorhandler(Exception) @app.errorhandler(Exception)
async def mark_error(e): async def mark_error(e):
g.had_error = True g.had_error = True
raise raise

View File

@@ -1,4 +1,9 @@
from .bus import emit_event, register_handler from .bus import emit_activity, register_activity_handler, get_activity_handlers
from .processor import EventProcessor from .processor import EventProcessor
__all__ = ["emit_event", "register_handler", "EventProcessor"] __all__ = [
"emit_activity",
"register_activity_handler",
"get_activity_handlers",
"EventProcessor",
]

View File

@@ -1,56 +1,109 @@
""" """
Transactional outbox event bus. Unified activity bus.
emit_event() writes to the domain_events table within the caller's existing emit_activity() writes an APActivity row with process_state='pending' within
DB transaction — atomic with whatever domain change triggered the event. the caller's existing DB transaction — atomic with the domain change.
register_handler() registers async handler functions that the EventProcessor register_activity_handler() registers async handler functions that the
will call when processing events of a given type. EventProcessor dispatches when processing pending activities.
""" """
from __future__ import annotations from __future__ import annotations
import uuid
from collections import defaultdict from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List from typing import Awaitable, Callable, Dict, List, Tuple
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.domain_event import DomainEvent from shared.models.federation import APActivity
# handler signature: async def handler(event: DomainEvent, session: AsyncSession) -> None # ---------------------------------------------------------------------------
HandlerFn = Callable[[DomainEvent, AsyncSession], Awaitable[None]] # Activity-handler registry
# ---------------------------------------------------------------------------
# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]
_handlers: Dict[str, List[HandlerFn]] = defaultdict(list) # Keyed by (activity_type, object_type). object_type="*" is wildcard.
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)
async def emit_event( def register_activity_handler(
activity_type: str,
fn: ActivityHandlerFn,
*,
object_type: str | None = None,
) -> None:
"""Register an async handler for an activity type + optional object type.
Use ``activity_type="*"`` as a wildcard that fires for every activity
(e.g. federation delivery handler).
"""
key = (activity_type, object_type or "*")
_activity_handlers[key].append(fn)
def get_activity_handlers(
activity_type: str,
object_type: str | None = None,
) -> List[ActivityHandlerFn]:
"""Return all matching handlers for an activity.
Matches in order:
1. Exact (activity_type, object_type)
2. (activity_type, "*") — type-level wildcard
3. ("*", "*") — global wildcard (e.g. delivery)
"""
handlers: List[ActivityHandlerFn] = []
ot = object_type or "*"
# Exact match
if ot != "*":
handlers.extend(_activity_handlers.get((activity_type, ot), []))
# Type-level wildcard
handlers.extend(_activity_handlers.get((activity_type, "*"), []))
# Global wildcard
if activity_type != "*":
handlers.extend(_activity_handlers.get(("*", "*"), []))
return handlers
# ---------------------------------------------------------------------------
# emit_activity — the primary way to emit events
# ---------------------------------------------------------------------------
async def emit_activity(
session: AsyncSession, session: AsyncSession,
event_type: str, *,
aggregate_type: str, activity_type: str,
aggregate_id: int, actor_uri: str,
payload: Dict[str, Any] | None = None, object_type: str,
) -> DomainEvent: object_data: dict | None = None,
source_type: str | None = None,
source_id: int | None = None,
visibility: str = "internal",
actor_profile_id: int | None = None,
) -> APActivity:
""" """
Write a domain event to the outbox table in the current transaction. Write an AP-shaped activity to ap_activities with process_state='pending'.
Call this inside your service function, using the same session that Called inside a service function using the same session that performs the
performs the domain change. The event and the change commit together. domain change. The activity and the change commit together.
""" """
event = DomainEvent( activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}"
event_type=event_type,
aggregate_type=aggregate_type, activity = APActivity(
aggregate_id=aggregate_id, activity_id=activity_uri,
payload=payload or {}, activity_type=activity_type,
actor_profile_id=actor_profile_id,
actor_uri=actor_uri,
object_type=object_type,
object_data=object_data or {},
is_local=True,
source_type=source_type,
source_id=source_id,
visibility=visibility,
process_state="pending",
) )
session.add(event) session.add(activity)
await session.flush() # assign event.id await session.flush()
return event return activity
def register_handler(event_type: str, fn: HandlerFn) -> None:
"""Register an async handler for a given event type."""
_handlers[event_type].append(fn)
def get_handlers(event_type: str) -> List[HandlerFn]:
"""Return all registered handlers for an event type."""
return _handlers.get(event_type, [])

View File

@@ -1,4 +1,4 @@
"""Shared event handlers (replaces glue.setup.register_glue_handlers).""" """Shared event handlers."""
def register_shared_handlers(): def register_shared_handlers():
@@ -6,5 +6,4 @@ def register_shared_handlers():
import shared.events.handlers.container_handlers # noqa: F401 import shared.events.handlers.container_handlers # noqa: F401
import shared.events.handlers.login_handlers # noqa: F401 import shared.events.handlers.login_handlers # noqa: F401
import shared.events.handlers.order_handlers # noqa: F401 import shared.events.handlers.order_handlers # noqa: F401
# federation_handlers removed — publication is now inline at write sites
import shared.events.handlers.ap_delivery_handler # noqa: F401 import shared.events.handlers.ap_delivery_handler # noqa: F401

View File

@@ -1,7 +1,7 @@
"""Deliver AP activities to remote followers. """Deliver AP activities to remote followers.
On ``federation.activity_created`` → load activity + actor + followers → Registered as a wildcard handler — fires for every activity. Skips
sign with HTTP Signatures → POST to each follower inbox. non-public activities and those without an actor profile.
""" """
from __future__ import annotations from __future__ import annotations
@@ -11,7 +11,7 @@ import httpx
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events.bus import register_handler, DomainEvent from shared.events.bus import register_activity_handler
from shared.models.federation import ActorProfile, APActivity, APFollower from shared.models.federation import ActorProfile, APActivity, APFollower
from shared.services.registry import services from shared.services.registry import services
@@ -33,12 +33,10 @@ def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str)
object_id = activity.activity_id + "/object" object_id = activity.activity_id + "/object"
if activity.activity_type == "Delete": if activity.activity_type == "Delete":
# Delete: object is a Tombstone with just id + type
obj.setdefault("id", object_id) obj.setdefault("id", object_id)
obj.setdefault("type", "Tombstone") obj.setdefault("type", "Tombstone")
else: else:
# Create/Update: full object with attribution obj.setdefault("id", object_id)
obj["id"] = object_id
obj.setdefault("type", activity.object_type) obj.setdefault("type", activity.object_type)
obj.setdefault("attributedTo", actor_url) obj.setdefault("attributedTo", actor_url)
obj.setdefault("published", activity.published.isoformat() if activity.published else None) obj.setdefault("published", activity.published.isoformat() if activity.published else None)
@@ -104,30 +102,20 @@ async def _deliver_to_inbox(
return False return False
async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None: async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
"""Deliver a newly created activity to all followers.""" """Deliver a public activity to all followers of its actor."""
import os import os
# Only deliver public activities that have an actor profile
if activity.visibility != "public":
return
if activity.actor_profile_id is None:
return
if not services.has("federation"): if not services.has("federation"):
return return
payload = event.payload
activity_id_uri = payload.get("activity_id")
if not activity_id_uri:
return
domain = os.getenv("AP_DOMAIN", "rose-ash.com") domain = os.getenv("AP_DOMAIN", "rose-ash.com")
# Load the activity
activity = (
await session.execute(
select(APActivity).where(APActivity.activity_id == activity_id_uri)
)
).scalar_one_or_none()
if not activity:
log.warning("Activity not found: %s", activity_id_uri)
return
# Load actor with private key # Load actor with private key
actor = ( actor = (
await session.execute( await session.execute(
@@ -135,7 +123,7 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None
) )
).scalar_one_or_none() ).scalar_one_or_none()
if not actor or not actor.private_key_pem: if not actor or not actor.private_key_pem:
log.warning("Actor not found or missing key for activity %s", activity_id_uri) log.warning("Actor not found or missing key for activity %s", activity.activity_id)
return return
# Load followers # Load followers
@@ -146,14 +134,13 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None
).scalars().all() ).scalars().all()
if not followers: if not followers:
log.debug("No followers to deliver to for %s", activity_id_uri) log.debug("No followers to deliver to for %s", activity.activity_id)
return return
# Build activity JSON # Build activity JSON
activity_json = _build_activity_json(activity, actor, domain) activity_json = _build_activity_json(activity, actor, domain)
# Deliver to each follower inbox # Deduplicate inboxes
# Deduplicate inboxes (multiple followers might share a shared inbox)
inboxes = {f.follower_inbox for f in followers if f.follower_inbox} inboxes = {f.follower_inbox for f in followers if f.follower_inbox}
log.info( log.info(
@@ -166,4 +153,5 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None
await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain) await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain)
register_handler("federation.activity_created", on_activity_created) # Wildcard: fires for every activity
register_activity_handler("*", on_any_activity)

View File

@@ -2,18 +2,18 @@ from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import register_handler from shared.events import register_activity_handler
from shared.models.domain_event import DomainEvent from shared.models.federation import APActivity
from shared.services.navigation import rebuild_navigation from shared.services.navigation import rebuild_navigation
async def on_child_attached(event: DomainEvent, session: AsyncSession) -> None: async def on_child_attached(activity: APActivity, session: AsyncSession) -> None:
await rebuild_navigation(session) await rebuild_navigation(session)
async def on_child_detached(event: DomainEvent, session: AsyncSession) -> None: async def on_child_detached(activity: APActivity, session: AsyncSession) -> None:
await rebuild_navigation(session) await rebuild_navigation(session)
register_handler("container.child_attached", on_child_attached) register_activity_handler("Add", on_child_attached, object_type="rose:ContainerRelation")
register_handler("container.child_detached", on_child_detached) register_activity_handler("Remove", on_child_detached, object_type="rose:ContainerRelation")

View File

@@ -1,8 +0,0 @@
"""Federation event handlers — REMOVED.
Federation publication is now inline at the write site (ghost_sync, entries,
market routes) via shared.services.federation_publish.try_publish().
AP delivery (federation.activity_created → inbox POST) remains async via
ap_delivery_handler.
"""

View File

@@ -2,24 +2,22 @@ from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import register_handler from shared.events import register_activity_handler
from shared.models.domain_event import DomainEvent from shared.models.federation import APActivity
from shared.services.registry import services from shared.services.registry import services
async def on_user_logged_in(event: DomainEvent, session: AsyncSession) -> None: async def on_user_logged_in(activity: APActivity, session: AsyncSession) -> None:
payload = event.payload data = activity.object_data
user_id = payload["user_id"] user_id = data["user_id"]
session_id = payload["session_id"] session_id = data["session_id"]
# Adopt cart items (if cart service is registered)
if services.has("cart"): if services.has("cart"):
await services.cart.adopt_cart_for_user(session, user_id, session_id) await services.cart.adopt_cart_for_user(session, user_id, session_id)
# Adopt calendar entries and tickets (if calendar service is registered)
if services.has("calendar"): if services.has("calendar"):
await services.calendar.adopt_entries_for_user(session, user_id, session_id) await services.calendar.adopt_entries_for_user(session, user_id, session_id)
await services.calendar.adopt_tickets_for_user(session, user_id, session_id) await services.calendar.adopt_tickets_for_user(session, user_id, session_id)
register_handler("user.logged_in", on_user_logged_in) register_activity_handler("rose:Login", on_user_logged_in)

View File

@@ -4,19 +4,19 @@ import logging
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import register_handler from shared.events import register_activity_handler
from shared.models.domain_event import DomainEvent from shared.models.federation import APActivity
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
async def on_order_created(event: DomainEvent, session: AsyncSession) -> None: async def on_order_created(activity: APActivity, session: AsyncSession) -> None:
log.info("order.created: order_id=%s", event.payload.get("order_id")) log.info("order.created: order_id=%s", activity.object_data.get("order_id"))
async def on_order_paid(event: DomainEvent, session: AsyncSession) -> None: async def on_order_paid(activity: APActivity, session: AsyncSession) -> None:
log.info("order.paid: order_id=%s", event.payload.get("order_id")) log.info("order.paid: order_id=%s", activity.object_data.get("order_id"))
register_handler("order.created", on_order_created) register_activity_handler("Create", on_order_created, object_type="rose:Order")
register_handler("order.paid", on_order_paid) register_activity_handler("rose:OrderPaid", on_order_paid)

View File

@@ -1,6 +1,6 @@
""" """
Event processor — polls the domain_events outbox table and dispatches Event processor — polls the ap_activities table and dispatches to registered
to registered handlers. activity handlers.
Runs as an asyncio background task within each app process. Runs as an asyncio background task within each app process.
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing. Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
@@ -11,16 +11,16 @@ import asyncio
import traceback import traceback
from datetime import datetime, timezone from datetime import datetime, timezone
from sqlalchemy import select, update from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.db.session import get_session from shared.db.session import get_session
from shared.models.domain_event import DomainEvent from shared.models.federation import APActivity
from .bus import get_handlers from .bus import get_activity_handlers
class EventProcessor: class EventProcessor:
"""Background event processor that polls the outbox table.""" """Background event processor that polls the ap_activities table."""
def __init__( def __init__(
self, self,
@@ -64,54 +64,52 @@ class EventProcessor:
await asyncio.sleep(self._poll_interval) await asyncio.sleep(self._poll_interval)
async def _process_batch(self) -> int: async def _process_batch(self) -> int:
"""Fetch and process a batch of pending events. Returns count processed.""" """Fetch and process a batch of pending activities. Returns count processed."""
processed = 0 processed = 0
async with get_session() as session: async with get_session() as session:
# FOR UPDATE SKIP LOCKED: safe for concurrent processors
stmt = ( stmt = (
select(DomainEvent) select(APActivity)
.where( .where(
DomainEvent.state == "pending", APActivity.process_state == "pending",
DomainEvent.attempts < DomainEvent.max_attempts, APActivity.process_attempts < APActivity.process_max_attempts,
) )
.order_by(DomainEvent.created_at) .order_by(APActivity.created_at)
.limit(self._batch_size) .limit(self._batch_size)
.with_for_update(skip_locked=True) .with_for_update(skip_locked=True)
) )
result = await session.execute(stmt) result = await session.execute(stmt)
events = result.scalars().all() activities = result.scalars().all()
for event in events: for activity in activities:
await self._process_one(session, event) await self._process_one(session, activity)
processed += 1 processed += 1
await session.commit() await session.commit()
return processed return processed
async def _process_one(self, session: AsyncSession, event: DomainEvent) -> None: async def _process_one(self, session: AsyncSession, activity: APActivity) -> None:
"""Run all handlers for a single event.""" """Run all handlers for a single activity."""
handlers = get_handlers(event.event_type) handlers = get_activity_handlers(activity.activity_type, activity.object_type)
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
event.state = "processing" activity.process_state = "processing"
event.attempts += 1 activity.process_attempts += 1
await session.flush() await session.flush()
if not handlers: if not handlers:
# No handlers registered — mark completed (nothing to do) activity.process_state = "completed"
event.state = "completed" activity.processed_at = now
event.processed_at = now
return return
try: try:
for handler in handlers: for handler in handlers:
await handler(event, session) await handler(activity, session)
event.state = "completed" activity.process_state = "completed"
event.processed_at = now activity.processed_at = now
except Exception as exc: except Exception as exc:
event.last_error = f"{exc.__class__.__name__}: {exc}" activity.process_error = f"{exc.__class__.__name__}: {exc}"
if event.attempts >= event.max_attempts: if activity.process_attempts >= activity.process_max_attempts:
event.state = "failed" activity.process_state = "failed"
event.processed_at = now activity.processed_at = now
else: else:
event.state = "pending" # retry activity.process_state = "pending" # retry

View File

@@ -67,8 +67,16 @@ def market_product_url(product_slug: str, suffix: str = "", market_place=None) -
def login_url(next_url: str = "") -> str: def login_url(next_url: str = "") -> str:
# Auth lives in blog (coop) for now. Set AUTH_APP=federation to switch. # Auth lives in blog (coop) for now. Set AUTH_APP=federation to switch.
from quart import session as qsession
auth_app = os.getenv("AUTH_APP", "coop") auth_app = os.getenv("AUTH_APP", "coop")
base = app_url(auth_app, "/auth/login/") base = app_url(auth_app, "/auth/login/")
params: list[str] = []
if next_url: if next_url:
return f"{base}?next={quote(next_url, safe='')}" params.append(f"next={quote(next_url, safe='')}")
# Pass anonymous cart session so the auth app can adopt it on login
cart_sid = qsession.get("cart_sid")
if cart_sid:
params.append(f"cart_sid={quote(cart_sid, safe='')}")
if params:
return f"{base}?{'&'.join(params)}"
return base return base

View File

@@ -8,8 +8,6 @@ from .ghost_membership_entities import (
GhostNewsletter, UserNewsletter, GhostNewsletter, UserNewsletter,
GhostTier, GhostSubscription, GhostTier, GhostSubscription,
) )
from .domain_event import DomainEvent
from .ghost_content import Tag, Post, Author, PostAuthor, PostTag, PostLike from .ghost_content import Tag, Post, Author, PostAuthor, PostTag, PostLike
from .page_config import PageConfig from .page_config import PageConfig
from .order import Order, OrderItem from .order import Order, OrderItem
@@ -29,4 +27,5 @@ from .container_relation import ContainerRelation
from .menu_node import MenuNode from .menu_node import MenuNode
from .federation import ( from .federation import (
ActorProfile, APActivity, APFollower, APInboxItem, APAnchor, IPFSPin, ActorProfile, APActivity, APFollower, APInboxItem, APAnchor, IPFSPin,
RemoteActor, APFollowing, APRemotePost, APLocalPost, APInteraction, APNotification,
) )

View File

@@ -1,30 +0,0 @@
from __future__ import annotations
from datetime import datetime
from sqlalchemy import String, Integer, DateTime, Text, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from shared.db.base import Base
class DomainEvent(Base):
__tablename__ = "domain_events"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
event_type: Mapped[str] = mapped_column(String(128), nullable=False, index=True)
aggregate_type: Mapped[str] = mapped_column(String(64), nullable=False)
aggregate_id: Mapped[int] = mapped_column(Integer, nullable=False)
payload: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
state: Mapped[str] = mapped_column(
String(20), nullable=False, default="pending", server_default="pending", index=True
)
attempts: Mapped[int] = mapped_column(Integer, nullable=False, default=0, server_default="0")
max_attempts: Mapped[int] = mapped_column(Integer, nullable=False, default=5, server_default="5")
last_error: Mapped[str | None] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
def __repr__(self) -> str:
return f"<DomainEvent {self.id} {self.event_type} [{self.state}]>"

View File

@@ -50,14 +50,19 @@ class ActorProfile(Base):
class APActivity(Base): class APActivity(Base):
"""An ActivityPub activity (local or remote).""" """An ActivityPub activity (local or remote).
Also serves as the unified event bus: internal domain events and public
federation activities both live here, distinguished by ``visibility``.
The ``EventProcessor`` polls rows with ``process_state='pending'``.
"""
__tablename__ = "ap_activities" __tablename__ = "ap_activities"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False) activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False)
activity_type: Mapped[str] = mapped_column(String(64), nullable=False) activity_type: Mapped[str] = mapped_column(String(64), nullable=False)
actor_profile_id: Mapped[int] = mapped_column( actor_profile_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False, Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True,
) )
object_type: Mapped[str | None] = mapped_column(String(64), nullable=True) object_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True) object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
@@ -83,6 +88,27 @@ class APActivity(Base):
DateTime(timezone=True), nullable=False, server_default=func.now(), DateTime(timezone=True), nullable=False, server_default=func.now(),
) )
# --- Unified event-bus columns ---
actor_uri: Mapped[str | None] = mapped_column(
String(512), nullable=True,
)
visibility: Mapped[str] = mapped_column(
String(20), nullable=False, default="public", server_default="public",
)
process_state: Mapped[str] = mapped_column(
String(20), nullable=False, default="completed", server_default="completed",
)
process_attempts: Mapped[int] = mapped_column(
Integer, nullable=False, default=0, server_default="0",
)
process_max_attempts: Mapped[int] = mapped_column(
Integer, nullable=False, default=5, server_default="5",
)
process_error: Mapped[str | None] = mapped_column(Text, nullable=True)
processed_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True,
)
# Relationships # Relationships
actor_profile = relationship("ActorProfile", back_populates="activities") actor_profile = relationship("ActorProfile", back_populates="activities")
@@ -90,6 +116,7 @@ class APActivity(Base):
Index("ix_ap_activity_actor", "actor_profile_id"), Index("ix_ap_activity_actor", "actor_profile_id"),
Index("ix_ap_activity_source", "source_type", "source_id"), Index("ix_ap_activity_source", "source_type", "source_id"),
Index("ix_ap_activity_published", "published"), Index("ix_ap_activity_published", "published"),
Index("ix_ap_activity_process", "process_state"),
) )
def __repr__(self) -> str: def __repr__(self) -> str:
@@ -193,3 +220,207 @@ class IPFSPin(Base):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<IPFSPin {self.id} {self.ipfs_cid[:16]}...>" return f"<IPFSPin {self.id} {self.ipfs_cid[:16]}...>"
class RemoteActor(Base):
"""Cached profile of a remote actor we interact with."""
__tablename__ = "ap_remote_actors"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
actor_url: Mapped[str] = mapped_column(String(512), unique=True, nullable=False)
inbox_url: Mapped[str] = mapped_column(String(512), nullable=False)
shared_inbox_url: Mapped[str | None] = mapped_column(String(512), nullable=True)
preferred_username: Mapped[str] = mapped_column(String(255), nullable=False)
display_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
summary: Mapped[str | None] = mapped_column(Text, nullable=True)
icon_url: Mapped[str | None] = mapped_column(String(512), nullable=True)
public_key_pem: Mapped[str | None] = mapped_column(Text, nullable=True)
domain: Mapped[str] = mapped_column(String(255), nullable=False)
fetched_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
__table_args__ = (
Index("ix_ap_remote_actor_url", "actor_url", unique=True),
Index("ix_ap_remote_actor_domain", "domain"),
)
def __repr__(self) -> str:
return f"<RemoteActor {self.id} {self.preferred_username}@{self.domain}>"
class APFollowing(Base):
"""Outbound follow: local actor → remote actor."""
__tablename__ = "ap_following"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
actor_profile_id: Mapped[int] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False,
)
remote_actor_id: Mapped[int] = mapped_column(
Integer, ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False,
)
state: Mapped[str] = mapped_column(
String(20), nullable=False, default="pending", server_default="pending",
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
accepted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
# Relationships
actor_profile = relationship("ActorProfile")
remote_actor = relationship("RemoteActor")
__table_args__ = (
UniqueConstraint("actor_profile_id", "remote_actor_id", name="uq_following"),
Index("ix_ap_following_actor", "actor_profile_id"),
Index("ix_ap_following_remote", "remote_actor_id"),
)
def __repr__(self) -> str:
return f"<APFollowing {self.id} [{self.state}]>"
class APRemotePost(Base):
"""A federated post ingested from a remote actor."""
__tablename__ = "ap_remote_posts"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
remote_actor_id: Mapped[int] = mapped_column(
Integer, ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=False,
)
activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False)
object_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False)
object_type: Mapped[str] = mapped_column(String(64), nullable=False, default="Note")
content: Mapped[str | None] = mapped_column(Text, nullable=True)
summary: Mapped[str | None] = mapped_column(Text, nullable=True)
url: Mapped[str | None] = mapped_column(String(512), nullable=True)
attachment_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
tag_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
in_reply_to: Mapped[str | None] = mapped_column(String(512), nullable=True)
conversation: Mapped[str | None] = mapped_column(String(512), nullable=True)
published: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
fetched_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
# Relationships
remote_actor = relationship("RemoteActor")
__table_args__ = (
Index("ix_ap_remote_post_actor", "remote_actor_id"),
Index("ix_ap_remote_post_published", "published"),
Index("ix_ap_remote_post_object", "object_id", unique=True),
)
def __repr__(self) -> str:
return f"<APRemotePost {self.id} {self.object_type}>"
class APLocalPost(Base):
"""A native post composed in the federation UI."""
__tablename__ = "ap_local_posts"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
actor_profile_id: Mapped[int] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False,
)
content: Mapped[str] = mapped_column(Text, nullable=False)
visibility: Mapped[str] = mapped_column(
String(20), nullable=False, default="public", server_default="public",
)
in_reply_to: Mapped[str | None] = mapped_column(String(512), nullable=True)
published: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now(),
)
# Relationships
actor_profile = relationship("ActorProfile")
__table_args__ = (
Index("ix_ap_local_post_actor", "actor_profile_id"),
Index("ix_ap_local_post_published", "published"),
)
def __repr__(self) -> str:
return f"<APLocalPost {self.id}>"
class APInteraction(Base):
"""Like or boost (local or remote)."""
__tablename__ = "ap_interactions"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
actor_profile_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True,
)
remote_actor_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("ap_remote_actors.id", ondelete="CASCADE"), nullable=True,
)
post_type: Mapped[str] = mapped_column(String(20), nullable=False) # local/remote
post_id: Mapped[int] = mapped_column(Integer, nullable=False)
interaction_type: Mapped[str] = mapped_column(String(20), nullable=False) # like/boost
activity_id: Mapped[str | None] = mapped_column(String(512), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
__table_args__ = (
Index("ix_ap_interaction_post", "post_type", "post_id"),
Index("ix_ap_interaction_actor", "actor_profile_id"),
Index("ix_ap_interaction_remote", "remote_actor_id"),
)
def __repr__(self) -> str:
return f"<APInteraction {self.id} {self.interaction_type}>"
class APNotification(Base):
"""Notification for a local actor."""
__tablename__ = "ap_notifications"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
actor_profile_id: Mapped[int] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False,
)
notification_type: Mapped[str] = mapped_column(String(20), nullable=False)
from_remote_actor_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("ap_remote_actors.id", ondelete="SET NULL"), nullable=True,
)
from_actor_profile_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("ap_actor_profiles.id", ondelete="SET NULL"), nullable=True,
)
target_activity_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("ap_activities.id", ondelete="SET NULL"), nullable=True,
)
target_remote_post_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("ap_remote_posts.id", ondelete="SET NULL"), nullable=True,
)
read: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default="false")
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=func.now(),
)
# Relationships
actor_profile = relationship("ActorProfile", foreign_keys=[actor_profile_id])
from_remote_actor = relationship("RemoteActor")
from_actor_profile = relationship("ActorProfile", foreign_keys=[from_actor_profile_id])
__table_args__ = (
Index("ix_ap_notification_actor", "actor_profile_id"),
Index("ix_ap_notification_read", "actor_profile_id", "read"),
Index("ix_ap_notification_created", "created_at"),
)

View File

@@ -6,7 +6,7 @@ from shared.db.base import Base
class MenuItem(Base): class MenuItem(Base):
"""Deprecated — kept so the table isn't dropped. Use glue.models.MenuNode.""" """Deprecated — kept so the table isn't dropped. Use shared.models.menu_node.MenuNode."""
__tablename__ = "menu_items" __tablename__ = "menu_items"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)

View File

@@ -371,7 +371,7 @@ class SqlCalendarService:
entries_by_post.setdefault(post_id, []).append(_entry_to_dto(entry)) entries_by_post.setdefault(post_id, []).append(_entry_to_dto(entry))
return entries_by_post return entries_by_post
# -- writes (absorb glue lifecycle) --------------------------------------- # -- writes ---------------------------------------------------------------
async def adopt_entries_for_user( async def adopt_entries_for_user(
self, session: AsyncSession, user_id: int, session_id: str, self, session: AsyncSession, user_id: int, session_id: str,

File diff suppressed because it is too large Load Diff

View File

@@ -1,13 +1,14 @@
"""Inline federation publication — called at write time, not via async handler. """Inline federation publication — called at write time, not via async handler.
Replaces the old pattern where emit_event("post.published") → async handler → The originating service calls try_publish() directly, which creates the
publish_activity(). Now the originating service calls try_publish() directly, APActivity (with process_state='pending') in the same DB transaction.
which creates the APActivity in the same DB transaction. AP delivery The EventProcessor picks it up and the delivery wildcard handler POSTs
(federation.activity_created → inbox POST) stays async. to follower inboxes.
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
import os
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@@ -48,10 +49,20 @@ async def try_publish(
if existing: if existing:
if activity_type == "Create" and existing.activity_type != "Delete": if activity_type == "Create" and existing.activity_type != "Delete":
return # already published (allow re-Create after Delete/unpublish) return # already published (allow re-Create after Delete/unpublish)
if activity_type == "Update" and existing.activity_type == "Update":
return # already updated (Ghost fires duplicate webhooks)
if activity_type == "Delete" and existing.activity_type == "Delete": if activity_type == "Delete" and existing.activity_type == "Delete":
return # already deleted return # already deleted
elif activity_type == "Delete": elif activity_type in ("Delete", "Update"):
return # never published, nothing to delete return # never published, nothing to delete/update
# Stable object ID: same source always gets the same object id so
# Mastodon treats Create/Update/Delete as the same post.
domain = os.getenv("AP_DOMAIN", "rose-ash.com")
object_data["id"] = (
f"https://{domain}/users/{actor.preferred_username}"
f"/objects/{source_type.lower()}/{source_id}"
)
try: try:
await services.federation.publish_activity( await services.federation.publish_activity(

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
from sqlalchemy import select, func from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import emit_event from shared.events import emit_activity
from shared.models.container_relation import ContainerRelation from shared.models.container_relation import ContainerRelation
@@ -40,17 +40,19 @@ async def attach_child(
if label is not None: if label is not None:
existing.label = label existing.label = label
await session.flush() await session.flush()
await emit_event( await emit_activity(
session, session,
event_type="container.child_attached", activity_type="Add",
aggregate_type="container_relation", actor_uri="internal:system",
aggregate_id=existing.id, object_type="rose:ContainerRelation",
payload={ object_data={
"parent_type": parent_type, "parent_type": parent_type,
"parent_id": parent_id, "parent_id": parent_id,
"child_type": child_type, "child_type": child_type,
"child_id": child_id, "child_id": child_id,
}, },
source_type="container_relation",
source_id=existing.id,
) )
return existing return existing
# Already attached and active — no-op # Already attached and active — no-op
@@ -77,17 +79,19 @@ async def attach_child(
session.add(rel) session.add(rel)
await session.flush() await session.flush()
await emit_event( await emit_activity(
session, session,
event_type="container.child_attached", activity_type="Add",
aggregate_type="container_relation", actor_uri="internal:system",
aggregate_id=rel.id, object_type="rose:ContainerRelation",
payload={ object_data={
"parent_type": parent_type, "parent_type": parent_type,
"parent_id": parent_id, "parent_id": parent_id,
"child_type": child_type, "child_type": child_type,
"child_id": child_id, "child_id": child_id,
}, },
source_type="container_relation",
source_id=rel.id,
) )
return rel return rel
@@ -139,17 +143,19 @@ async def detach_child(
rel.deleted_at = func.now() rel.deleted_at = func.now()
await session.flush() await session.flush()
await emit_event( await emit_activity(
session, session,
event_type="container.child_detached", activity_type="Remove",
aggregate_type="container_relation", actor_uri="internal:system",
aggregate_id=rel.id, object_type="rose:ContainerRelation",
payload={ object_data={
"parent_type": parent_type, "parent_type": parent_type,
"parent_id": parent_id, "parent_id": parent_id,
"child_type": child_type, "child_type": child_type,
"child_id": child_id, "child_id": child_id,
}, },
source_type="container_relation",
source_id=rel.id,
) )
return True return True

View File

@@ -227,5 +227,71 @@ class StubFederationService:
async def remove_follower(self, session, username, follower_acct): async def remove_follower(self, session, username, follower_acct):
return False return False
async def get_or_fetch_remote_actor(self, session, actor_url):
return None
async def search_remote_actor(self, session, acct):
return None
async def send_follow(self, session, local_username, remote_actor_url):
raise RuntimeError("FederationService not available")
async def get_following(self, session, username, page=1, per_page=20):
return [], 0
async def get_followers_paginated(self, session, username, page=1, per_page=20):
return [], 0
async def accept_follow_response(self, session, local_username, remote_actor_url):
pass
async def unfollow(self, session, local_username, remote_actor_url):
pass
async def ingest_remote_post(self, session, remote_actor_id, activity_json, object_json):
pass
async def delete_remote_post(self, session, object_id):
pass
async def get_remote_post(self, session, object_id):
return None
async def get_home_timeline(self, session, actor_profile_id, before=None, limit=20):
return []
async def get_public_timeline(self, session, before=None, limit=20):
return []
async def get_actor_timeline(self, session, remote_actor_id, before=None, limit=20):
return []
async def create_local_post(self, session, actor_profile_id, content, visibility="public", in_reply_to=None):
raise RuntimeError("FederationService not available")
async def delete_local_post(self, session, actor_profile_id, post_id):
raise RuntimeError("FederationService not available")
async def like_post(self, session, actor_profile_id, object_id, author_inbox):
pass
async def unlike_post(self, session, actor_profile_id, object_id, author_inbox):
pass
async def boost_post(self, session, actor_profile_id, object_id, author_inbox):
pass
async def unboost_post(self, session, actor_profile_id, object_id, author_inbox):
pass
async def get_notifications(self, session, actor_profile_id, before=None, limit=20):
return []
async def unread_notification_count(self, session, actor_profile_id):
return 0
async def mark_notifications_read(self, session, actor_profile_id):
pass
async def get_stats(self, session): async def get_stats(self, session):
return {"actors": 0, "activities": 0, "followers": 0} return {"actors": 0, "activities": 0, "followers": 0}

View File

@@ -145,7 +145,7 @@ async def upgrade_ots_proof(proof_bytes: bytes) -> tuple[bytes, bool]:
""" """
# OpenTimestamps upgrade is done via the `ots` CLI or their calendar API. # OpenTimestamps upgrade is done via the `ots` CLI or their calendar API.
# For now, return the proof as-is with is_confirmed=False. # For now, return the proof as-is with is_confirmed=False.
# TODO: Implement calendar-based upgrade polling. # Calendar-based upgrade polling not yet implemented.
return proof_bytes, False return proof_bytes, False

68
utils/webfinger.py Normal file
View File

@@ -0,0 +1,68 @@
"""WebFinger client for resolving remote AP actor profiles."""
from __future__ import annotations
import logging
import httpx
log = logging.getLogger(__name__)
AP_CONTENT_TYPE = "application/activity+json"
async def resolve_actor(acct: str) -> dict | None:
"""Resolve user@domain to actor JSON via WebFinger + actor fetch.
Args:
acct: Handle in the form ``user@domain`` (no leading ``@``).
Returns:
Actor JSON-LD dict, or None if resolution fails.
"""
acct = acct.lstrip("@")
if "@" not in acct:
return None
_, domain = acct.rsplit("@", 1)
webfinger_url = f"https://{domain}/.well-known/webfinger"
try:
async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
# Step 1: WebFinger lookup
resp = await client.get(
webfinger_url,
params={"resource": f"acct:{acct}"},
headers={"Accept": "application/jrd+json, application/json"},
)
if resp.status_code != 200:
log.debug("WebFinger %s returned %d", webfinger_url, resp.status_code)
return None
data = resp.json()
# Find self link with AP content type
actor_url = None
for link in data.get("links", []):
if link.get("rel") == "self" and link.get("type") in (
AP_CONTENT_TYPE,
"application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\"",
):
actor_url = link.get("href")
break
if not actor_url:
log.debug("No AP self link in WebFinger response for %s", acct)
return None
# Step 2: Fetch actor JSON
resp = await client.get(
actor_url,
headers={"Accept": AP_CONTENT_TYPE},
)
if resp.status_code == 200:
return resp.json()
log.debug("Actor fetch %s returned %d", actor_url, resp.status_code)
except Exception:
log.exception("WebFinger resolution failed for %s", acct)
return None