Compare commits
6 Commits
widget-pha
...
7de4a2e40e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7de4a2e40e | ||
|
|
1c1ab3576f | ||
|
|
07aa2e2be9 | ||
|
|
2e9db11925 | ||
|
|
d697709f60 | ||
|
|
04f7c5e85c |
80
README.md
80
README.md
@@ -1,6 +1,6 @@
|
||||
# Shared
|
||||
|
||||
Shared infrastructure, models, templates, and configuration used by all four Rose Ash microservices (blog, market, cart, events). Included as a git submodule in each app.
|
||||
Shared infrastructure, models, contracts, services, and templates used by all five Rose Ash microservices (blog, market, cart, events, federation). Included as a git submodule in each app.
|
||||
|
||||
## Structure
|
||||
|
||||
@@ -8,53 +8,78 @@ Shared infrastructure, models, templates, and configuration used by all four Ros
|
||||
shared/
|
||||
db/
|
||||
base.py # SQLAlchemy declarative Base
|
||||
session.py # Async session factory (get_session)
|
||||
models/ # Shared domain models
|
||||
session.py # Async session factory (get_session, register_db)
|
||||
models/ # Canonical domain models
|
||||
user.py # User
|
||||
magic_link.py # MagicLink (auth tokens)
|
||||
domain_event.py # DomainEvent (transactional outbox)
|
||||
domain_event.py # DomainEvent (legacy — being removed)
|
||||
kv.py # KeyValue (key-value store)
|
||||
menu_item.py # MenuItem
|
||||
menu_item.py # MenuItem (deprecated — use MenuNode)
|
||||
menu_node.py # MenuNode (navigation tree)
|
||||
container_relation.py # ContainerRelation (parent-child content)
|
||||
ghost_membership_entities.py # GhostNewsletter, UserNewsletter
|
||||
federation.py # ActorProfile, APActivity, APFollower, APFollowing,
|
||||
# RemoteActor, APRemotePost, APLocalPost,
|
||||
# APInteraction, APNotification, APAnchor, IPFSPin
|
||||
contracts/
|
||||
dtos.py # Frozen dataclasses for cross-domain data transfer
|
||||
protocols.py # Service protocols (Blog, Calendar, Market, Cart, Federation)
|
||||
widgets.py # Widget types (NavWidget, CardWidget, AccountPageWidget)
|
||||
services/
|
||||
registry.py # Typed singleton: services.blog, .calendar, .market, .cart, .federation
|
||||
blog_impl.py # SqlBlogService
|
||||
calendar_impl.py # SqlCalendarService
|
||||
market_impl.py # SqlMarketService
|
||||
cart_impl.py # SqlCartService
|
||||
federation_impl.py # SqlFederationService
|
||||
federation_publish.py # try_publish() — inline AP publication helper
|
||||
stubs.py # No-op stubs for absent domains
|
||||
navigation.py # get_navigation_tree()
|
||||
relationships.py # attach_child, get_children, detach_child
|
||||
widget_registry.py # Widget registry singleton
|
||||
widgets/ # Per-domain widget registration
|
||||
infrastructure/
|
||||
factory.py # create_base_app() — Quart app factory
|
||||
cart_identity.py # current_cart_identity() (user_id or session_id)
|
||||
cart_loader.py # Cart data loader for context processors
|
||||
context.py # Jinja2 context processors
|
||||
internal_api.py # Inter-app HTTP client (get/post via httpx)
|
||||
jinja_setup.py # Jinja2 template environment setup
|
||||
urls.py # URL helpers (coop_url, market_url, etc.)
|
||||
user_loader.py # Load current user from session
|
||||
http_utils.py # HTTP utility functions
|
||||
events/
|
||||
bus.py # emit_event(), register_handler()
|
||||
processor.py # EventProcessor (polls domain_events, runs handlers)
|
||||
browser/app/
|
||||
csrf.py # CSRF protection
|
||||
errors.py # Error handlers
|
||||
middleware.py # Request/response middleware
|
||||
redis_cacher.py # Tag-based Redis page caching
|
||||
authz.py # Authorization helpers
|
||||
filters/ # Jinja2 template filters (currency, truncate, etc.)
|
||||
utils/ # HTMX helpers, UTC time, parsing
|
||||
payments/sumup.py # SumUp checkout API integration
|
||||
browser/templates/ # ~300 Jinja2 templates shared across all apps
|
||||
config.py # YAML config loader
|
||||
bus.py # emit_activity(), register_activity_handler()
|
||||
processor.py # EventProcessor (polls ap_activities, runs handlers)
|
||||
handlers/ # Shared activity handlers
|
||||
container_handlers.py # Navigation rebuild on attach/detach
|
||||
login_handlers.py # Cart/entry adoption on login
|
||||
order_handlers.py # Order lifecycle events
|
||||
ap_delivery_handler.py # AP activity delivery to follower inboxes (wildcard)
|
||||
utils/
|
||||
__init__.py
|
||||
calendar_helpers.py # Calendar period/entry utilities
|
||||
http_signatures.py # RSA keypair generation, HTTP signature signing/verification
|
||||
ipfs_client.py # Async IPFS client (add_bytes, add_json, pin_cid)
|
||||
anchoring.py # Merkle trees + OpenTimestamps Bitcoin anchoring
|
||||
webfinger.py # WebFinger actor resolution
|
||||
browser/
|
||||
app/ # Middleware, CSRF, errors, Redis caching, authz, filters
|
||||
templates/ # ~300 Jinja2 templates shared across all apps
|
||||
containers.py # ContainerType, container_filter, content_filter helpers
|
||||
config.py # YAML config loader
|
||||
log_config/setup.py # Logging configuration (JSON formatter)
|
||||
utils.py # host_url and other shared utilities
|
||||
static/ # Shared static assets (CSS, JS, images, FontAwesome)
|
||||
editor/ # Koenig (Ghost) rich text editor build
|
||||
alembic/ # Database migrations (25 versions)
|
||||
env.py # Imports models from all apps (with try/except guards)
|
||||
versions/ # Migration files — single head: j0h8e4f6g7
|
||||
alembic/ # Database migrations
|
||||
```
|
||||
|
||||
## Key Patterns
|
||||
|
||||
- **App factory:** All apps call `create_base_app()` which sets up DB sessions, CSRF, error handling, event processing, logging, and the glue handler registry.
|
||||
- **Event bus:** `emit_event()` writes to `domain_events` table in the caller's transaction. `EventProcessor` polls and dispatches to registered handlers.
|
||||
- **Inter-app HTTP:** `internal_api.get/post("cart", "/internal/cart/summary")` for cross-app reads. URLs resolved from `app-config.yaml`.
|
||||
- **App factory:** All apps call `create_base_app()` which sets up DB sessions, CSRF, error handling, event processing, logging, widget registration, and domain service wiring.
|
||||
- **Service contracts:** Cross-domain communication via typed Protocols + frozen DTO dataclasses. Apps call `services.calendar.method()`, never import models from other domains.
|
||||
- **Service registry:** Typed singleton (`services.blog`, `.calendar`, `.market`, `.cart`, `.federation`). Apps wire their own domain + stubs for others via `register_domain_services()`.
|
||||
- **Activity bus:** `emit_activity()` writes to `ap_activities` table in the caller's transaction. `EventProcessor` polls pending activities and dispatches to registered handlers. Internal events use `visibility="internal"`; federation activities use `visibility="public"` and are delivered to follower inboxes by the wildcard delivery handler.
|
||||
- **Widget registry:** Domain services register widgets (nav, card, account); templates consume via `widgets.container_nav`, `widgets.container_cards`.
|
||||
- **Cart identity:** `current_cart_identity()` returns `{"user_id": int|None, "session_id": str|None}` from the request session.
|
||||
|
||||
## Alembic Migrations
|
||||
@@ -62,8 +87,5 @@ shared/
|
||||
All apps share one PostgreSQL database. Migrations are managed here and run from the blog app's entrypoint (other apps skip migrations on startup).
|
||||
|
||||
```bash
|
||||
# From any app directory (shared/ must be on sys.path)
|
||||
alembic -c shared/alembic.ini upgrade head
|
||||
```
|
||||
|
||||
Current head: `j0h8e4f6g7` (drop cross-domain FK constraints).
|
||||
|
||||
@@ -1 +1 @@
|
||||
# shared package — extracted from blog/shared_lib/
|
||||
# shared package — infrastructure, models, contracts, and services
|
||||
|
||||
@@ -19,7 +19,7 @@ from shared.db.base import Base
|
||||
|
||||
# Import ALL models so Base.metadata sees every table
|
||||
import shared.models # noqa: F401 User, KV, MagicLink, MenuItem, Ghost*
|
||||
for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models", "glue.models"):
|
||||
for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models"):
|
||||
try:
|
||||
__import__(_mod)
|
||||
except ImportError:
|
||||
|
||||
113
alembic/versions/m3k1h7i9j0_add_activity_bus_columns.py
Normal file
113
alembic/versions/m3k1h7i9j0_add_activity_bus_columns.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""add unified event bus columns to ap_activities
|
||||
|
||||
Revision ID: m3k1h7i9j0
|
||||
Revises: l2j0g6h8i9
|
||||
Create Date: 2026-02-22
|
||||
|
||||
Adds processing and visibility columns so ap_activities can serve as the
|
||||
unified event bus for both internal domain events and federation delivery.
|
||||
"""
|
||||
|
||||
revision = "m3k1h7i9j0"
|
||||
down_revision = "l2j0g6h8i9"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# Add new columns with defaults so existing rows stay valid
|
||||
op.add_column(
|
||||
"ap_activities",
|
||||
sa.Column("actor_uri", sa.String(512), nullable=True),
|
||||
)
|
||||
op.add_column(
|
||||
"ap_activities",
|
||||
sa.Column(
|
||||
"visibility", sa.String(20),
|
||||
nullable=False, server_default="public",
|
||||
),
|
||||
)
|
||||
op.add_column(
|
||||
"ap_activities",
|
||||
sa.Column(
|
||||
"process_state", sa.String(20),
|
||||
nullable=False, server_default="completed",
|
||||
),
|
||||
)
|
||||
op.add_column(
|
||||
"ap_activities",
|
||||
sa.Column(
|
||||
"process_attempts", sa.Integer(),
|
||||
nullable=False, server_default="0",
|
||||
),
|
||||
)
|
||||
op.add_column(
|
||||
"ap_activities",
|
||||
sa.Column(
|
||||
"process_max_attempts", sa.Integer(),
|
||||
nullable=False, server_default="5",
|
||||
),
|
||||
)
|
||||
op.add_column(
|
||||
"ap_activities",
|
||||
sa.Column("process_error", sa.Text(), nullable=True),
|
||||
)
|
||||
op.add_column(
|
||||
"ap_activities",
|
||||
sa.Column(
|
||||
"processed_at", sa.DateTime(timezone=True), nullable=True,
|
||||
),
|
||||
)
|
||||
|
||||
# Backfill actor_uri from the related actor_profile
|
||||
op.execute(
|
||||
"""
|
||||
UPDATE ap_activities a
|
||||
SET actor_uri = CONCAT(
|
||||
'https://',
|
||||
COALESCE(current_setting('app.ap_domain', true), 'rose-ash.com'),
|
||||
'/users/',
|
||||
p.preferred_username
|
||||
)
|
||||
FROM ap_actor_profiles p
|
||||
WHERE a.actor_profile_id = p.id
|
||||
AND a.actor_uri IS NULL
|
||||
"""
|
||||
)
|
||||
|
||||
# Make actor_profile_id nullable (internal events have no actor profile)
|
||||
op.alter_column(
|
||||
"ap_activities", "actor_profile_id",
|
||||
existing_type=sa.Integer(),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
# Index for processor polling
|
||||
op.create_index(
|
||||
"ix_ap_activity_process", "ap_activities", ["process_state"],
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_index("ix_ap_activity_process", table_name="ap_activities")
|
||||
|
||||
# Restore actor_profile_id NOT NULL (remove any rows without it first)
|
||||
op.execute(
|
||||
"DELETE FROM ap_activities WHERE actor_profile_id IS NULL"
|
||||
)
|
||||
op.alter_column(
|
||||
"ap_activities", "actor_profile_id",
|
||||
existing_type=sa.Integer(),
|
||||
nullable=False,
|
||||
)
|
||||
|
||||
op.drop_column("ap_activities", "processed_at")
|
||||
op.drop_column("ap_activities", "process_error")
|
||||
op.drop_column("ap_activities", "process_max_attempts")
|
||||
op.drop_column("ap_activities", "process_attempts")
|
||||
op.drop_column("ap_activities", "process_state")
|
||||
op.drop_column("ap_activities", "visibility")
|
||||
op.drop_column("ap_activities", "actor_uri")
|
||||
46
alembic/versions/n4l2i8j0k1_drop_domain_events_table.py
Normal file
46
alembic/versions/n4l2i8j0k1_drop_domain_events_table.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""drop domain_events table
|
||||
|
||||
Revision ID: n4l2i8j0k1
|
||||
Revises: m3k1h7i9j0
|
||||
Create Date: 2026-02-22
|
||||
|
||||
The domain_events table is no longer used — all events now flow through
|
||||
ap_activities with the unified activity bus.
|
||||
"""
|
||||
|
||||
revision = "n4l2i8j0k1"
|
||||
down_revision = "m3k1h7i9j0"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.drop_index("ix_domain_events_state", table_name="domain_events")
|
||||
op.drop_index("ix_domain_events_event_type", table_name="domain_events")
|
||||
op.drop_table("domain_events")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.create_table(
|
||||
"domain_events",
|
||||
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
|
||||
sa.Column("event_type", sa.String(128), nullable=False),
|
||||
sa.Column("aggregate_type", sa.String(64), nullable=False),
|
||||
sa.Column("aggregate_id", sa.Integer(), nullable=False),
|
||||
sa.Column("payload", JSONB(), nullable=True),
|
||||
sa.Column("state", sa.String(20), nullable=False, server_default="pending"),
|
||||
sa.Column("attempts", sa.Integer(), nullable=False, server_default="0"),
|
||||
sa.Column("max_attempts", sa.Integer(), nullable=False, server_default="5"),
|
||||
sa.Column("last_error", sa.Text(), nullable=True),
|
||||
sa.Column(
|
||||
"created_at", sa.DateTime(timezone=True),
|
||||
nullable=False, server_default=sa.func.now(),
|
||||
),
|
||||
sa.Column("processed_at", sa.DateTime(timezone=True), nullable=True),
|
||||
)
|
||||
op.create_index("ix_domain_events_event_type", "domain_events", ["event_type"])
|
||||
op.create_index("ix_domain_events_state", "domain_events", ["state"])
|
||||
@@ -1,169 +0,0 @@
|
||||
{% macro show_cart(oob=False) %}
|
||||
<div id="cart" {% if oob %} hx-swap-oob="{{oob}}" {% endif%}>
|
||||
{# Empty cart #}
|
||||
{% if not cart and not calendar_cart_entries %}
|
||||
<div class="rounded-2xl border border-dashed border-stone-300 bg-white/80 p-6 sm:p-8 text-center">
|
||||
<div class="inline-flex h-10 w-10 sm:h-12 sm:w-12 items-center justify-center rounded-full bg-stone-100 mb-3">
|
||||
<i class="fa fa-shopping-cart text-stone-500 text-sm sm:text-base" aria-hidden="true"></i>
|
||||
</div>
|
||||
<p class="text-base sm:text-lg font-medium text-stone-800">
|
||||
Your cart is empty
|
||||
</p>
|
||||
{#
|
||||
<p class="mt-1 text-xs sm:text-sm text-stone-600">
|
||||
Add some items from the shop to see them here.
|
||||
</p>
|
||||
<div class="mt-4">
|
||||
<a
|
||||
href="{{ market_url('/') }}"
|
||||
class="inline-flex items-center px-4 py-2 text-sm font-semibold rounded-full bg-emerald-600 text-white hover:bg-emerald-700"
|
||||
>
|
||||
Browse products
|
||||
</a>
|
||||
</div> #}
|
||||
</div>
|
||||
|
||||
{% else %}
|
||||
|
||||
<div _class="grid gap-y-6 lg:gap-8 lg:grid-cols-[minmax(0,2fr),minmax(0,1fr)]">
|
||||
{# Items list #}
|
||||
<section class="space-y-3 sm:space-y-4">
|
||||
{% for item in cart %}
|
||||
{% from '_types/product/_cart.html' import cart_item with context %}
|
||||
{{ cart_item()}}
|
||||
{% endfor %}
|
||||
{% if calendar_cart_entries %}
|
||||
<div class="mt-6 border-t border-stone-200 pt-4">
|
||||
<h2 class="text-base font-semibold mb-2">
|
||||
Calendar bookings
|
||||
</h2>
|
||||
|
||||
<ul class="space-y-2">
|
||||
{% for entry in calendar_cart_entries %}
|
||||
<li class="flex items-start justify-between text-sm">
|
||||
<div>
|
||||
<div class="font-medium">
|
||||
{{ entry.name or entry.calendar_name }}
|
||||
</div>
|
||||
<div class="text-xs text-stone-500">
|
||||
{{ entry.start_at }}
|
||||
{% if entry.end_at %}
|
||||
– {{ entry.end_at }}
|
||||
{% endif %}
|
||||
</div>
|
||||
</div>
|
||||
<div class="ml-4 font-medium">
|
||||
£{{ "%.2f"|format(entry.cost or 0) }}
|
||||
</div>
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endif %}
|
||||
</section>
|
||||
{{summary(cart, total, calendar_total, calendar_cart_entries,)}}
|
||||
|
||||
</div>
|
||||
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
{% macro summary(cart, total, calendar_total, calendar_cart_entries, oob=False) %}
|
||||
<aside id="cart-summary" class="lg:pl-2" {% if oob %} hx-swap-oob="{{oob}}" {% endif %}>
|
||||
<div class="rounded-2xl bg-white shadow-sm border border-stone-200 p-4 sm:p-5">
|
||||
<h2 class="text-sm sm:text-base font-semibold text-stone-900 mb-3 sm:mb-4">
|
||||
Order summary
|
||||
</h2>
|
||||
|
||||
<dl class="space-y-2 text-xs sm:text-sm">
|
||||
<div class="flex items-center justify-between">
|
||||
<dt class="text-stone-600">Items</dt>
|
||||
<dd class="text-stone-900">
|
||||
{{ cart | sum(attribute="quantity") }}
|
||||
</dd>
|
||||
</div>
|
||||
<div class="flex items-center justify-between">
|
||||
<dt class="text-stone-600">Subtotal</dt>
|
||||
<dd class="text-stone-900">
|
||||
{{ cart_grand_total(cart, total, calendar_total, calendar_cart_entries ) }}
|
||||
</dd>
|
||||
</div>
|
||||
</dl>
|
||||
<div class="flex flex-col items-center w-full">
|
||||
<h1 class="text-5xl mt-2">
|
||||
This is a test - it will not take actual money
|
||||
</h1>
|
||||
<div>
|
||||
use dummy card number: 5555 5555 5555 4444
|
||||
</div>
|
||||
</div>
|
||||
<div class="mt-4 sm:mt-5">
|
||||
{% if g.user %}
|
||||
<form
|
||||
method="post"
|
||||
action="{{ page_cart_url(page_post.slug, '/checkout/') if page_post is defined and page_post else cart_url('/checkout/') }}"
|
||||
class="w-full"
|
||||
>
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
|
||||
<button
|
||||
type="submit"
|
||||
class="w-full inline-flex items-center justify-center px-4 py-2 text-xs sm:text-sm rounded-full border border-emerald-600 bg-emerald-600 text-white hover:bg-emerald-700 transition"
|
||||
>
|
||||
<i class="fa-solid fa-credit-card mr-2" aria-hidden="true"></i>
|
||||
Checkout as {{g.user.email}}
|
||||
</button>
|
||||
</form>
|
||||
{% else %}
|
||||
{% set href=login_url(request.url) %}
|
||||
<div
|
||||
class="w-full flex"
|
||||
>
|
||||
<a
|
||||
href="{{ href }}"
|
||||
hx-get="{{ href }}"
|
||||
hx-target="#main-panel"
|
||||
hx-select ="{{hx_select_search}}"
|
||||
hx-swap="outerHTML"
|
||||
hx-push-url="true"
|
||||
aria-selected="{{ 'true' if local_href == request.path else 'false' }}"
|
||||
class="w-full cursor-pointer flex flex-row items-center justify-center p-3 gap-2 rounded bg-stone-200 text-black {{select_colours}}"
|
||||
data-close-details
|
||||
>
|
||||
<i class="fa-solid fa-key"></i>
|
||||
<span>sign in or register to checkout</span>
|
||||
|
||||
</a>
|
||||
</div>
|
||||
|
||||
{% endif %}
|
||||
</div>
|
||||
</div>
|
||||
</aside>
|
||||
{% endmacro %}
|
||||
|
||||
{% macro cart_total(cart, total) %}
|
||||
{% set cart_total = total(cart) %}
|
||||
{% if cart_total %}
|
||||
{% set symbol = "£" if cart[0].product.regular_price_currency == "GBP" else cart[0].product.regular_price_currency %}
|
||||
{{ symbol }}{{ "%.2f"|format(cart_total) }}
|
||||
{% else %}
|
||||
–
|
||||
{% endif %}
|
||||
{% endmacro %}
|
||||
|
||||
|
||||
{% macro cart_grand_total(cart, total, calendar_total, calendar_cart_entries) %}
|
||||
{% set product_total = total(cart) or 0 %}
|
||||
{% set cal_total = calendar_total(calendar_cart_entries) or 0 %}
|
||||
{% set grand = product_total + cal_total %}
|
||||
|
||||
{% if cart and cart[0].product.regular_price_currency %}
|
||||
{% set symbol = "£" if cart[0].product.regular_price_currency == "GBP" else cart[0].product.regular_price_currency %}
|
||||
{% else %}
|
||||
{% set symbol = "£" %}
|
||||
{% endif %}
|
||||
|
||||
{{ symbol }}{{ "%.2f"|format(grand) }}
|
||||
{% endmacro %}
|
||||
@@ -221,6 +221,11 @@ class FederationService(Protocol):
|
||||
self, session: AsyncSession, username: str,
|
||||
) -> list[APFollowerDTO]: ...
|
||||
|
||||
async def get_followers_paginated(
|
||||
self, session: AsyncSession, username: str,
|
||||
page: int = 1, per_page: int = 20,
|
||||
) -> tuple[list[RemoteActorDTO], int]: ...
|
||||
|
||||
async def add_follower(
|
||||
self, session: AsyncSession, username: str,
|
||||
follower_acct: str, follower_inbox: str, follower_actor_url: str,
|
||||
@@ -283,6 +288,11 @@ class FederationService(Protocol):
|
||||
before: datetime | None = None, limit: int = 20,
|
||||
) -> list[TimelineItemDTO]: ...
|
||||
|
||||
async def get_actor_timeline(
|
||||
self, session: AsyncSession, remote_actor_id: int,
|
||||
before: datetime | None = None, limit: int = 20,
|
||||
) -> list[TimelineItemDTO]: ...
|
||||
|
||||
# -- Local posts ----------------------------------------------------------
|
||||
async def create_local_post(
|
||||
self, session: AsyncSession, actor_profile_id: int,
|
||||
|
||||
@@ -15,7 +15,7 @@ _engine = create_async_engine(
|
||||
future=True,
|
||||
echo=False,
|
||||
pool_pre_ping=True,
|
||||
pool_size=-1 # ned to look at this!!!
|
||||
pool_size=0, # 0 = unlimited (NullPool equivalent for asyncpg)
|
||||
)
|
||||
|
||||
_Session = async_sessionmaker(
|
||||
@@ -34,43 +34,42 @@ async def get_session():
|
||||
await sess.close()
|
||||
|
||||
|
||||
|
||||
def register_db(app: Quart):
|
||||
|
||||
@app.before_request
|
||||
async def open_session():
|
||||
g.s = _Session()
|
||||
g.tx = await g.s.begin()
|
||||
g.had_error = False
|
||||
@app.before_request
|
||||
async def open_session():
|
||||
g.s = _Session()
|
||||
g.tx = await g.s.begin()
|
||||
g.had_error = False
|
||||
|
||||
@app.after_request
|
||||
async def maybe_commit(response):
|
||||
# Runs BEFORE bytes are sent.
|
||||
if not g.had_error and 200 <= response.status_code < 400:
|
||||
try:
|
||||
if hasattr(g, "tx"):
|
||||
await g.tx.commit()
|
||||
except Exception as e:
|
||||
print(f'commit failed {e}')
|
||||
if hasattr(g, "tx"):
|
||||
await g.tx.rollback()
|
||||
from quart import make_response
|
||||
return await make_response("Commit failed", 500)
|
||||
return response
|
||||
@app.after_request
|
||||
async def maybe_commit(response):
|
||||
# Runs BEFORE bytes are sent.
|
||||
if not g.had_error and 200 <= response.status_code < 400:
|
||||
try:
|
||||
if hasattr(g, "tx"):
|
||||
await g.tx.commit()
|
||||
except Exception as e:
|
||||
print(f'commit failed {e}')
|
||||
if hasattr(g, "tx"):
|
||||
await g.tx.rollback()
|
||||
from quart import make_response
|
||||
return await make_response("Commit failed", 500)
|
||||
return response
|
||||
|
||||
@app.teardown_request
|
||||
async def finish(exc):
|
||||
try:
|
||||
# If an exception occurred OR we didn't commit (still in txn), roll back.
|
||||
if hasattr(g, "s"):
|
||||
if exc is not None or g.s.in_transaction():
|
||||
if hasattr(g, "tx"):
|
||||
await g.tx.rollback()
|
||||
finally:
|
||||
if hasattr(g, "s"):
|
||||
await g.s.close()
|
||||
@app.teardown_request
|
||||
async def finish(exc):
|
||||
try:
|
||||
# If an exception occurred OR we didn't commit (still in txn), roll back.
|
||||
if hasattr(g, "s"):
|
||||
if exc is not None or g.s.in_transaction():
|
||||
if hasattr(g, "tx"):
|
||||
await g.tx.rollback()
|
||||
finally:
|
||||
if hasattr(g, "s"):
|
||||
await g.s.close()
|
||||
|
||||
@app.errorhandler(Exception)
|
||||
async def mark_error(e):
|
||||
g.had_error = True
|
||||
raise
|
||||
@app.errorhandler(Exception)
|
||||
async def mark_error(e):
|
||||
g.had_error = True
|
||||
raise
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
from .bus import emit_event, register_handler
|
||||
from .bus import emit_activity, register_activity_handler, get_activity_handlers
|
||||
from .processor import EventProcessor
|
||||
|
||||
__all__ = ["emit_event", "register_handler", "EventProcessor"]
|
||||
__all__ = [
|
||||
"emit_activity",
|
||||
"register_activity_handler",
|
||||
"get_activity_handlers",
|
||||
"EventProcessor",
|
||||
]
|
||||
|
||||
127
events/bus.py
127
events/bus.py
@@ -1,56 +1,109 @@
|
||||
"""
|
||||
Transactional outbox event bus.
|
||||
Unified activity bus.
|
||||
|
||||
emit_event() writes to the domain_events table within the caller's existing
|
||||
DB transaction — atomic with whatever domain change triggered the event.
|
||||
emit_activity() writes an APActivity row with process_state='pending' within
|
||||
the caller's existing DB transaction — atomic with the domain change.
|
||||
|
||||
register_handler() registers async handler functions that the EventProcessor
|
||||
will call when processing events of a given type.
|
||||
register_activity_handler() registers async handler functions that the
|
||||
EventProcessor dispatches when processing pending activities.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from typing import Any, Awaitable, Callable, Dict, List
|
||||
from typing import Awaitable, Callable, Dict, List, Tuple
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from shared.models.federation import APActivity
|
||||
|
||||
# handler signature: async def handler(event: DomainEvent, session: AsyncSession) -> None
|
||||
HandlerFn = Callable[[DomainEvent, AsyncSession], Awaitable[None]]
|
||||
# ---------------------------------------------------------------------------
|
||||
# Activity-handler registry
|
||||
# ---------------------------------------------------------------------------
|
||||
# Handler signature: async def handler(activity: APActivity, session: AsyncSession) -> None
|
||||
ActivityHandlerFn = Callable[[APActivity, AsyncSession], Awaitable[None]]
|
||||
|
||||
_handlers: Dict[str, List[HandlerFn]] = defaultdict(list)
|
||||
# Keyed by (activity_type, object_type). object_type="*" is wildcard.
|
||||
_activity_handlers: Dict[Tuple[str, str], List[ActivityHandlerFn]] = defaultdict(list)
|
||||
|
||||
|
||||
async def emit_event(
|
||||
def register_activity_handler(
|
||||
activity_type: str,
|
||||
fn: ActivityHandlerFn,
|
||||
*,
|
||||
object_type: str | None = None,
|
||||
) -> None:
|
||||
"""Register an async handler for an activity type + optional object type.
|
||||
|
||||
Use ``activity_type="*"`` as a wildcard that fires for every activity
|
||||
(e.g. federation delivery handler).
|
||||
"""
|
||||
key = (activity_type, object_type or "*")
|
||||
_activity_handlers[key].append(fn)
|
||||
|
||||
|
||||
def get_activity_handlers(
|
||||
activity_type: str,
|
||||
object_type: str | None = None,
|
||||
) -> List[ActivityHandlerFn]:
|
||||
"""Return all matching handlers for an activity.
|
||||
|
||||
Matches in order:
|
||||
1. Exact (activity_type, object_type)
|
||||
2. (activity_type, "*") — type-level wildcard
|
||||
3. ("*", "*") — global wildcard (e.g. delivery)
|
||||
"""
|
||||
handlers: List[ActivityHandlerFn] = []
|
||||
ot = object_type or "*"
|
||||
|
||||
# Exact match
|
||||
if ot != "*":
|
||||
handlers.extend(_activity_handlers.get((activity_type, ot), []))
|
||||
# Type-level wildcard
|
||||
handlers.extend(_activity_handlers.get((activity_type, "*"), []))
|
||||
# Global wildcard
|
||||
if activity_type != "*":
|
||||
handlers.extend(_activity_handlers.get(("*", "*"), []))
|
||||
|
||||
return handlers
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# emit_activity — the primary way to emit events
|
||||
# ---------------------------------------------------------------------------
|
||||
async def emit_activity(
|
||||
session: AsyncSession,
|
||||
event_type: str,
|
||||
aggregate_type: str,
|
||||
aggregate_id: int,
|
||||
payload: Dict[str, Any] | None = None,
|
||||
) -> DomainEvent:
|
||||
*,
|
||||
activity_type: str,
|
||||
actor_uri: str,
|
||||
object_type: str,
|
||||
object_data: dict | None = None,
|
||||
source_type: str | None = None,
|
||||
source_id: int | None = None,
|
||||
visibility: str = "internal",
|
||||
actor_profile_id: int | None = None,
|
||||
) -> APActivity:
|
||||
"""
|
||||
Write a domain event to the outbox table in the current transaction.
|
||||
Write an AP-shaped activity to ap_activities with process_state='pending'.
|
||||
|
||||
Call this inside your service function, using the same session that
|
||||
performs the domain change. The event and the change commit together.
|
||||
Called inside a service function using the same session that performs the
|
||||
domain change. The activity and the change commit together.
|
||||
"""
|
||||
event = DomainEvent(
|
||||
event_type=event_type,
|
||||
aggregate_type=aggregate_type,
|
||||
aggregate_id=aggregate_id,
|
||||
payload=payload or {},
|
||||
activity_uri = f"internal:{uuid.uuid4()}" if visibility == "internal" else f"urn:uuid:{uuid.uuid4()}"
|
||||
|
||||
activity = APActivity(
|
||||
activity_id=activity_uri,
|
||||
activity_type=activity_type,
|
||||
actor_profile_id=actor_profile_id,
|
||||
actor_uri=actor_uri,
|
||||
object_type=object_type,
|
||||
object_data=object_data or {},
|
||||
is_local=True,
|
||||
source_type=source_type,
|
||||
source_id=source_id,
|
||||
visibility=visibility,
|
||||
process_state="pending",
|
||||
)
|
||||
session.add(event)
|
||||
await session.flush() # assign event.id
|
||||
return event
|
||||
|
||||
|
||||
def register_handler(event_type: str, fn: HandlerFn) -> None:
|
||||
"""Register an async handler for a given event type."""
|
||||
_handlers[event_type].append(fn)
|
||||
|
||||
|
||||
def get_handlers(event_type: str) -> List[HandlerFn]:
|
||||
"""Return all registered handlers for an event type."""
|
||||
return _handlers.get(event_type, [])
|
||||
session.add(activity)
|
||||
await session.flush()
|
||||
return activity
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Shared event handlers (replaces glue.setup.register_glue_handlers)."""
|
||||
"""Shared event handlers."""
|
||||
|
||||
|
||||
def register_shared_handlers():
|
||||
@@ -6,5 +6,4 @@ def register_shared_handlers():
|
||||
import shared.events.handlers.container_handlers # noqa: F401
|
||||
import shared.events.handlers.login_handlers # noqa: F401
|
||||
import shared.events.handlers.order_handlers # noqa: F401
|
||||
# federation_handlers removed — publication is now inline at write sites
|
||||
import shared.events.handlers.ap_delivery_handler # noqa: F401
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Deliver AP activities to remote followers.
|
||||
|
||||
On ``federation.activity_created`` → load activity + actor + followers →
|
||||
sign with HTTP Signatures → POST to each follower inbox.
|
||||
Registered as a wildcard handler — fires for every activity. Skips
|
||||
non-public activities and those without an actor profile.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -11,7 +11,7 @@ import httpx
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events.bus import register_handler, DomainEvent
|
||||
from shared.events.bus import register_activity_handler
|
||||
from shared.models.federation import ActorProfile, APActivity, APFollower
|
||||
from shared.services.registry import services
|
||||
|
||||
@@ -33,12 +33,9 @@ def _build_activity_json(activity: APActivity, actor: ActorProfile, domain: str)
|
||||
object_id = activity.activity_id + "/object"
|
||||
|
||||
if activity.activity_type == "Delete":
|
||||
# Delete: object is a Tombstone with just id + type
|
||||
obj.setdefault("id", object_id)
|
||||
obj.setdefault("type", "Tombstone")
|
||||
else:
|
||||
# Create/Update: full object with attribution
|
||||
# Prefer stable id from object_data (set by try_publish), fall back to activity-derived
|
||||
obj.setdefault("id", object_id)
|
||||
obj.setdefault("type", activity.object_type)
|
||||
obj.setdefault("attributedTo", actor_url)
|
||||
@@ -105,30 +102,20 @@ async def _deliver_to_inbox(
|
||||
return False
|
||||
|
||||
|
||||
async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None:
|
||||
"""Deliver a newly created activity to all followers."""
|
||||
async def on_any_activity(activity: APActivity, session: AsyncSession) -> None:
|
||||
"""Deliver a public activity to all followers of its actor."""
|
||||
import os
|
||||
|
||||
# Only deliver public activities that have an actor profile
|
||||
if activity.visibility != "public":
|
||||
return
|
||||
if activity.actor_profile_id is None:
|
||||
return
|
||||
if not services.has("federation"):
|
||||
return
|
||||
|
||||
payload = event.payload
|
||||
activity_id_uri = payload.get("activity_id")
|
||||
if not activity_id_uri:
|
||||
return
|
||||
|
||||
domain = os.getenv("AP_DOMAIN", "rose-ash.com")
|
||||
|
||||
# Load the activity
|
||||
activity = (
|
||||
await session.execute(
|
||||
select(APActivity).where(APActivity.activity_id == activity_id_uri)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if not activity:
|
||||
log.warning("Activity not found: %s", activity_id_uri)
|
||||
return
|
||||
|
||||
# Load actor with private key
|
||||
actor = (
|
||||
await session.execute(
|
||||
@@ -136,7 +123,7 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if not actor or not actor.private_key_pem:
|
||||
log.warning("Actor not found or missing key for activity %s", activity_id_uri)
|
||||
log.warning("Actor not found or missing key for activity %s", activity.activity_id)
|
||||
return
|
||||
|
||||
# Load followers
|
||||
@@ -147,14 +134,13 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None
|
||||
).scalars().all()
|
||||
|
||||
if not followers:
|
||||
log.debug("No followers to deliver to for %s", activity_id_uri)
|
||||
log.debug("No followers to deliver to for %s", activity.activity_id)
|
||||
return
|
||||
|
||||
# Build activity JSON
|
||||
activity_json = _build_activity_json(activity, actor, domain)
|
||||
|
||||
# Deliver to each follower inbox
|
||||
# Deduplicate inboxes (multiple followers might share a shared inbox)
|
||||
# Deduplicate inboxes
|
||||
inboxes = {f.follower_inbox for f in followers if f.follower_inbox}
|
||||
|
||||
log.info(
|
||||
@@ -167,4 +153,5 @@ async def on_activity_created(event: DomainEvent, session: AsyncSession) -> None
|
||||
await _deliver_to_inbox(client, inbox_url, activity_json, actor, domain)
|
||||
|
||||
|
||||
register_handler("federation.activity_created", on_activity_created)
|
||||
# Wildcard: fires for every activity
|
||||
register_activity_handler("*", on_any_activity)
|
||||
|
||||
@@ -2,18 +2,18 @@ from __future__ import annotations
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events import register_handler
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from shared.events import register_activity_handler
|
||||
from shared.models.federation import APActivity
|
||||
from shared.services.navigation import rebuild_navigation
|
||||
|
||||
|
||||
async def on_child_attached(event: DomainEvent, session: AsyncSession) -> None:
|
||||
async def on_child_attached(activity: APActivity, session: AsyncSession) -> None:
|
||||
await rebuild_navigation(session)
|
||||
|
||||
|
||||
async def on_child_detached(event: DomainEvent, session: AsyncSession) -> None:
|
||||
async def on_child_detached(activity: APActivity, session: AsyncSession) -> None:
|
||||
await rebuild_navigation(session)
|
||||
|
||||
|
||||
register_handler("container.child_attached", on_child_attached)
|
||||
register_handler("container.child_detached", on_child_detached)
|
||||
register_activity_handler("Add", on_child_attached, object_type="rose:ContainerRelation")
|
||||
register_activity_handler("Remove", on_child_detached, object_type="rose:ContainerRelation")
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
"""Federation event handlers — REMOVED.
|
||||
|
||||
Federation publication is now inline at the write site (ghost_sync, entries,
|
||||
market routes) via shared.services.federation_publish.try_publish().
|
||||
|
||||
AP delivery (federation.activity_created → inbox POST) remains async via
|
||||
ap_delivery_handler.
|
||||
"""
|
||||
@@ -2,24 +2,22 @@ from __future__ import annotations
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events import register_handler
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from shared.events import register_activity_handler
|
||||
from shared.models.federation import APActivity
|
||||
from shared.services.registry import services
|
||||
|
||||
|
||||
async def on_user_logged_in(event: DomainEvent, session: AsyncSession) -> None:
|
||||
payload = event.payload
|
||||
user_id = payload["user_id"]
|
||||
session_id = payload["session_id"]
|
||||
async def on_user_logged_in(activity: APActivity, session: AsyncSession) -> None:
|
||||
data = activity.object_data
|
||||
user_id = data["user_id"]
|
||||
session_id = data["session_id"]
|
||||
|
||||
# Adopt cart items (if cart service is registered)
|
||||
if services.has("cart"):
|
||||
await services.cart.adopt_cart_for_user(session, user_id, session_id)
|
||||
|
||||
# Adopt calendar entries and tickets (if calendar service is registered)
|
||||
if services.has("calendar"):
|
||||
await services.calendar.adopt_entries_for_user(session, user_id, session_id)
|
||||
await services.calendar.adopt_tickets_for_user(session, user_id, session_id)
|
||||
|
||||
|
||||
register_handler("user.logged_in", on_user_logged_in)
|
||||
register_activity_handler("rose:Login", on_user_logged_in)
|
||||
|
||||
@@ -4,19 +4,19 @@ import logging
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events import register_handler
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from shared.events import register_activity_handler
|
||||
from shared.models.federation import APActivity
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def on_order_created(event: DomainEvent, session: AsyncSession) -> None:
|
||||
log.info("order.created: order_id=%s", event.payload.get("order_id"))
|
||||
async def on_order_created(activity: APActivity, session: AsyncSession) -> None:
|
||||
log.info("order.created: order_id=%s", activity.object_data.get("order_id"))
|
||||
|
||||
|
||||
async def on_order_paid(event: DomainEvent, session: AsyncSession) -> None:
|
||||
log.info("order.paid: order_id=%s", event.payload.get("order_id"))
|
||||
async def on_order_paid(activity: APActivity, session: AsyncSession) -> None:
|
||||
log.info("order.paid: order_id=%s", activity.object_data.get("order_id"))
|
||||
|
||||
|
||||
register_handler("order.created", on_order_created)
|
||||
register_handler("order.paid", on_order_paid)
|
||||
register_activity_handler("Create", on_order_created, object_type="rose:Order")
|
||||
register_activity_handler("rose:OrderPaid", on_order_paid)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
Event processor — polls the domain_events outbox table and dispatches
|
||||
to registered handlers.
|
||||
Event processor — polls the ap_activities table and dispatches to registered
|
||||
activity handlers.
|
||||
|
||||
Runs as an asyncio background task within each app process.
|
||||
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing.
|
||||
@@ -11,16 +11,16 @@ import asyncio
|
||||
import traceback
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.db.session import get_session
|
||||
from shared.models.domain_event import DomainEvent
|
||||
from .bus import get_handlers
|
||||
from shared.models.federation import APActivity
|
||||
from .bus import get_activity_handlers
|
||||
|
||||
|
||||
class EventProcessor:
|
||||
"""Background event processor that polls the outbox table."""
|
||||
"""Background event processor that polls the ap_activities table."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -64,54 +64,52 @@ class EventProcessor:
|
||||
await asyncio.sleep(self._poll_interval)
|
||||
|
||||
async def _process_batch(self) -> int:
|
||||
"""Fetch and process a batch of pending events. Returns count processed."""
|
||||
"""Fetch and process a batch of pending activities. Returns count processed."""
|
||||
processed = 0
|
||||
async with get_session() as session:
|
||||
# FOR UPDATE SKIP LOCKED: safe for concurrent processors
|
||||
stmt = (
|
||||
select(DomainEvent)
|
||||
select(APActivity)
|
||||
.where(
|
||||
DomainEvent.state == "pending",
|
||||
DomainEvent.attempts < DomainEvent.max_attempts,
|
||||
APActivity.process_state == "pending",
|
||||
APActivity.process_attempts < APActivity.process_max_attempts,
|
||||
)
|
||||
.order_by(DomainEvent.created_at)
|
||||
.order_by(APActivity.created_at)
|
||||
.limit(self._batch_size)
|
||||
.with_for_update(skip_locked=True)
|
||||
)
|
||||
result = await session.execute(stmt)
|
||||
events = result.scalars().all()
|
||||
activities = result.scalars().all()
|
||||
|
||||
for event in events:
|
||||
await self._process_one(session, event)
|
||||
for activity in activities:
|
||||
await self._process_one(session, activity)
|
||||
processed += 1
|
||||
|
||||
await session.commit()
|
||||
return processed
|
||||
|
||||
async def _process_one(self, session: AsyncSession, event: DomainEvent) -> None:
|
||||
"""Run all handlers for a single event."""
|
||||
handlers = get_handlers(event.event_type)
|
||||
async def _process_one(self, session: AsyncSession, activity: APActivity) -> None:
|
||||
"""Run all handlers for a single activity."""
|
||||
handlers = get_activity_handlers(activity.activity_type, activity.object_type)
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
event.state = "processing"
|
||||
event.attempts += 1
|
||||
activity.process_state = "processing"
|
||||
activity.process_attempts += 1
|
||||
await session.flush()
|
||||
|
||||
if not handlers:
|
||||
# No handlers registered — mark completed (nothing to do)
|
||||
event.state = "completed"
|
||||
event.processed_at = now
|
||||
activity.process_state = "completed"
|
||||
activity.processed_at = now
|
||||
return
|
||||
|
||||
try:
|
||||
for handler in handlers:
|
||||
await handler(event, session)
|
||||
event.state = "completed"
|
||||
event.processed_at = now
|
||||
await handler(activity, session)
|
||||
activity.process_state = "completed"
|
||||
activity.processed_at = now
|
||||
except Exception as exc:
|
||||
event.last_error = f"{exc.__class__.__name__}: {exc}"
|
||||
if event.attempts >= event.max_attempts:
|
||||
event.state = "failed"
|
||||
event.processed_at = now
|
||||
activity.process_error = f"{exc.__class__.__name__}: {exc}"
|
||||
if activity.process_attempts >= activity.process_max_attempts:
|
||||
activity.process_state = "failed"
|
||||
activity.processed_at = now
|
||||
else:
|
||||
event.state = "pending" # retry
|
||||
activity.process_state = "pending" # retry
|
||||
|
||||
@@ -67,8 +67,16 @@ def market_product_url(product_slug: str, suffix: str = "", market_place=None) -
|
||||
|
||||
def login_url(next_url: str = "") -> str:
|
||||
# Auth lives in blog (coop) for now. Set AUTH_APP=federation to switch.
|
||||
from quart import session as qsession
|
||||
auth_app = os.getenv("AUTH_APP", "coop")
|
||||
base = app_url(auth_app, "/auth/login/")
|
||||
params: list[str] = []
|
||||
if next_url:
|
||||
return f"{base}?next={quote(next_url, safe='')}"
|
||||
params.append(f"next={quote(next_url, safe='')}")
|
||||
# Pass anonymous cart session so the auth app can adopt it on login
|
||||
cart_sid = qsession.get("cart_sid")
|
||||
if cart_sid:
|
||||
params.append(f"cart_sid={quote(cart_sid, safe='')}")
|
||||
if params:
|
||||
return f"{base}?{'&'.join(params)}"
|
||||
return base
|
||||
|
||||
@@ -8,8 +8,6 @@ from .ghost_membership_entities import (
|
||||
GhostNewsletter, UserNewsletter,
|
||||
GhostTier, GhostSubscription,
|
||||
)
|
||||
from .domain_event import DomainEvent
|
||||
|
||||
from .ghost_content import Tag, Post, Author, PostAuthor, PostTag, PostLike
|
||||
from .page_config import PageConfig
|
||||
from .order import Order, OrderItem
|
||||
|
||||
@@ -50,14 +50,19 @@ class ActorProfile(Base):
|
||||
|
||||
|
||||
class APActivity(Base):
|
||||
"""An ActivityPub activity (local or remote)."""
|
||||
"""An ActivityPub activity (local or remote).
|
||||
|
||||
Also serves as the unified event bus: internal domain events and public
|
||||
federation activities both live here, distinguished by ``visibility``.
|
||||
The ``EventProcessor`` polls rows with ``process_state='pending'``.
|
||||
"""
|
||||
__tablename__ = "ap_activities"
|
||||
|
||||
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
|
||||
activity_id: Mapped[str] = mapped_column(String(512), unique=True, nullable=False)
|
||||
activity_type: Mapped[str] = mapped_column(String(64), nullable=False)
|
||||
actor_profile_id: Mapped[int] = mapped_column(
|
||||
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=False,
|
||||
actor_profile_id: Mapped[int | None] = mapped_column(
|
||||
Integer, ForeignKey("ap_actor_profiles.id", ondelete="CASCADE"), nullable=True,
|
||||
)
|
||||
object_type: Mapped[str | None] = mapped_column(String(64), nullable=True)
|
||||
object_data: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
|
||||
@@ -83,6 +88,27 @@ class APActivity(Base):
|
||||
DateTime(timezone=True), nullable=False, server_default=func.now(),
|
||||
)
|
||||
|
||||
# --- Unified event-bus columns ---
|
||||
actor_uri: Mapped[str | None] = mapped_column(
|
||||
String(512), nullable=True,
|
||||
)
|
||||
visibility: Mapped[str] = mapped_column(
|
||||
String(20), nullable=False, default="public", server_default="public",
|
||||
)
|
||||
process_state: Mapped[str] = mapped_column(
|
||||
String(20), nullable=False, default="completed", server_default="completed",
|
||||
)
|
||||
process_attempts: Mapped[int] = mapped_column(
|
||||
Integer, nullable=False, default=0, server_default="0",
|
||||
)
|
||||
process_max_attempts: Mapped[int] = mapped_column(
|
||||
Integer, nullable=False, default=5, server_default="5",
|
||||
)
|
||||
process_error: Mapped[str | None] = mapped_column(Text, nullable=True)
|
||||
processed_at: Mapped[datetime | None] = mapped_column(
|
||||
DateTime(timezone=True), nullable=True,
|
||||
)
|
||||
|
||||
# Relationships
|
||||
actor_profile = relationship("ActorProfile", back_populates="activities")
|
||||
|
||||
@@ -90,6 +116,7 @@ class APActivity(Base):
|
||||
Index("ix_ap_activity_actor", "actor_profile_id"),
|
||||
Index("ix_ap_activity_source", "source_type", "source_id"),
|
||||
Index("ix_ap_activity_published", "published"),
|
||||
Index("ix_ap_activity_process", "process_state"),
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
|
||||
@@ -6,7 +6,7 @@ from shared.db.base import Base
|
||||
|
||||
|
||||
class MenuItem(Base):
|
||||
"""Deprecated — kept so the table isn't dropped. Use glue.models.MenuNode."""
|
||||
"""Deprecated — kept so the table isn't dropped. Use shared.models.menu_node.MenuNode."""
|
||||
__tablename__ = "menu_items"
|
||||
|
||||
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
|
||||
|
||||
@@ -371,7 +371,7 @@ class SqlCalendarService:
|
||||
entries_by_post.setdefault(post_id, []).append(_entry_to_dto(entry))
|
||||
return entries_by_post
|
||||
|
||||
# -- writes (absorb glue lifecycle) ---------------------------------------
|
||||
# -- writes ---------------------------------------------------------------
|
||||
|
||||
async def adopt_entries_for_user(
|
||||
self, session: AsyncSession, user_id: int, session_id: str,
|
||||
|
||||
@@ -183,16 +183,21 @@ class SqlFederationService:
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
actor_url = f"https://{domain}/users/{username}"
|
||||
|
||||
activity = APActivity(
|
||||
activity_id=activity_uri,
|
||||
activity_type=activity_type,
|
||||
actor_profile_id=actor.id,
|
||||
actor_uri=actor_url,
|
||||
object_type=object_type,
|
||||
object_data=object_data,
|
||||
published=now,
|
||||
is_local=True,
|
||||
source_type=source_type,
|
||||
source_id=source_id,
|
||||
visibility="public",
|
||||
process_state="pending",
|
||||
)
|
||||
session.add(activity)
|
||||
await session.flush()
|
||||
@@ -208,7 +213,7 @@ class SqlFederationService:
|
||||
],
|
||||
"id": activity_uri,
|
||||
"type": activity_type,
|
||||
"actor": f"https://{domain}/users/{username}",
|
||||
"actor": actor_url,
|
||||
"published": now.isoformat(),
|
||||
"object": {
|
||||
"type": object_type,
|
||||
@@ -221,21 +226,6 @@ class SqlFederationService:
|
||||
except Exception:
|
||||
pass # IPFS failure is non-fatal
|
||||
|
||||
# Emit domain event for downstream processing (delivery)
|
||||
from shared.events import emit_event
|
||||
await emit_event(
|
||||
session,
|
||||
"federation.activity_created",
|
||||
"APActivity",
|
||||
activity.id,
|
||||
{
|
||||
"activity_id": activity.activity_id,
|
||||
"activity_type": activity_type,
|
||||
"actor_username": username,
|
||||
"object_type": object_type,
|
||||
},
|
||||
)
|
||||
|
||||
return _activity_to_dto(activity)
|
||||
|
||||
# -- Queries --------------------------------------------------------------
|
||||
@@ -376,6 +366,65 @@ class SqlFederationService:
|
||||
)
|
||||
return result.rowcount > 0
|
||||
|
||||
async def get_followers_paginated(
|
||||
self, session: AsyncSession, username: str,
|
||||
page: int = 1, per_page: int = 20,
|
||||
) -> tuple[list[RemoteActorDTO], int]:
|
||||
actor = (
|
||||
await session.execute(
|
||||
select(ActorProfile).where(ActorProfile.preferred_username == username)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if actor is None:
|
||||
return [], 0
|
||||
|
||||
total = (
|
||||
await session.execute(
|
||||
select(func.count(APFollower.id)).where(
|
||||
APFollower.actor_profile_id == actor.id,
|
||||
)
|
||||
)
|
||||
).scalar() or 0
|
||||
|
||||
offset = (page - 1) * per_page
|
||||
followers = (
|
||||
await session.execute(
|
||||
select(APFollower)
|
||||
.where(APFollower.actor_profile_id == actor.id)
|
||||
.order_by(APFollower.created_at.desc())
|
||||
.limit(per_page)
|
||||
.offset(offset)
|
||||
)
|
||||
).scalars().all()
|
||||
|
||||
results: list[RemoteActorDTO] = []
|
||||
for f in followers:
|
||||
# Try to resolve from cached remote actors first
|
||||
remote = (
|
||||
await session.execute(
|
||||
select(RemoteActor).where(
|
||||
RemoteActor.actor_url == f.follower_actor_url,
|
||||
)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if remote:
|
||||
results.append(_remote_actor_to_dto(remote))
|
||||
else:
|
||||
# Synthesise a minimal DTO from follower data
|
||||
from urllib.parse import urlparse
|
||||
domain = urlparse(f.follower_actor_url).netloc
|
||||
results.append(RemoteActorDTO(
|
||||
id=0,
|
||||
actor_url=f.follower_actor_url,
|
||||
inbox_url=f.follower_inbox,
|
||||
preferred_username=f.follower_acct.split("@")[0] if "@" in f.follower_acct else f.follower_acct,
|
||||
domain=domain,
|
||||
display_name=None,
|
||||
summary=None,
|
||||
icon_url=None,
|
||||
))
|
||||
return results, total
|
||||
|
||||
# -- Remote actors --------------------------------------------------------
|
||||
|
||||
async def get_or_fetch_remote_actor(
|
||||
@@ -966,6 +1015,46 @@ class SqlFederationService:
|
||||
))
|
||||
return items
|
||||
|
||||
async def get_actor_timeline(
|
||||
self, session: AsyncSession, remote_actor_id: int,
|
||||
before: datetime | None = None, limit: int = 20,
|
||||
) -> list[TimelineItemDTO]:
|
||||
remote_actor = (
|
||||
await session.execute(
|
||||
select(RemoteActor).where(RemoteActor.id == remote_actor_id)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if not remote_actor:
|
||||
return []
|
||||
|
||||
q = (
|
||||
select(APRemotePost)
|
||||
.where(APRemotePost.remote_actor_id == remote_actor_id)
|
||||
)
|
||||
if before:
|
||||
q = q.where(APRemotePost.published < before)
|
||||
q = q.order_by(APRemotePost.published.desc()).limit(limit)
|
||||
|
||||
posts = (await session.execute(q)).scalars().all()
|
||||
return [
|
||||
TimelineItemDTO(
|
||||
id=f"remote:{p.id}",
|
||||
post_type="remote",
|
||||
content=p.content or "",
|
||||
published=p.published,
|
||||
actor_name=remote_actor.display_name or remote_actor.preferred_username,
|
||||
actor_username=remote_actor.preferred_username,
|
||||
object_id=p.object_id,
|
||||
summary=p.summary,
|
||||
url=p.url,
|
||||
actor_domain=remote_actor.domain,
|
||||
actor_icon=remote_actor.icon_url,
|
||||
actor_url=remote_actor.actor_url,
|
||||
author_inbox=remote_actor.inbox_url,
|
||||
)
|
||||
for p in posts
|
||||
]
|
||||
|
||||
# -- Local posts ----------------------------------------------------------
|
||||
|
||||
async def create_local_post(
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
"""Inline federation publication — called at write time, not via async handler.
|
||||
|
||||
Replaces the old pattern where emit_event("post.published") → async handler →
|
||||
publish_activity(). Now the originating service calls try_publish() directly,
|
||||
which creates the APActivity in the same DB transaction. AP delivery
|
||||
(federation.activity_created → inbox POST) stays async.
|
||||
The originating service calls try_publish() directly, which creates the
|
||||
APActivity (with process_state='pending') in the same DB transaction.
|
||||
The EventProcessor picks it up and the delivery wildcard handler POSTs
|
||||
to follower inboxes.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
from sqlalchemy import select, func
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from shared.events import emit_event
|
||||
from shared.events import emit_activity
|
||||
from shared.models.container_relation import ContainerRelation
|
||||
|
||||
|
||||
@@ -40,17 +40,19 @@ async def attach_child(
|
||||
if label is not None:
|
||||
existing.label = label
|
||||
await session.flush()
|
||||
await emit_event(
|
||||
await emit_activity(
|
||||
session,
|
||||
event_type="container.child_attached",
|
||||
aggregate_type="container_relation",
|
||||
aggregate_id=existing.id,
|
||||
payload={
|
||||
activity_type="Add",
|
||||
actor_uri="internal:system",
|
||||
object_type="rose:ContainerRelation",
|
||||
object_data={
|
||||
"parent_type": parent_type,
|
||||
"parent_id": parent_id,
|
||||
"child_type": child_type,
|
||||
"child_id": child_id,
|
||||
},
|
||||
source_type="container_relation",
|
||||
source_id=existing.id,
|
||||
)
|
||||
return existing
|
||||
# Already attached and active — no-op
|
||||
@@ -77,17 +79,19 @@ async def attach_child(
|
||||
session.add(rel)
|
||||
await session.flush()
|
||||
|
||||
await emit_event(
|
||||
await emit_activity(
|
||||
session,
|
||||
event_type="container.child_attached",
|
||||
aggregate_type="container_relation",
|
||||
aggregate_id=rel.id,
|
||||
payload={
|
||||
activity_type="Add",
|
||||
actor_uri="internal:system",
|
||||
object_type="rose:ContainerRelation",
|
||||
object_data={
|
||||
"parent_type": parent_type,
|
||||
"parent_id": parent_id,
|
||||
"child_type": child_type,
|
||||
"child_id": child_id,
|
||||
},
|
||||
source_type="container_relation",
|
||||
source_id=rel.id,
|
||||
)
|
||||
|
||||
return rel
|
||||
@@ -139,17 +143,19 @@ async def detach_child(
|
||||
rel.deleted_at = func.now()
|
||||
await session.flush()
|
||||
|
||||
await emit_event(
|
||||
await emit_activity(
|
||||
session,
|
||||
event_type="container.child_detached",
|
||||
aggregate_type="container_relation",
|
||||
aggregate_id=rel.id,
|
||||
payload={
|
||||
activity_type="Remove",
|
||||
actor_uri="internal:system",
|
||||
object_type="rose:ContainerRelation",
|
||||
object_data={
|
||||
"parent_type": parent_type,
|
||||
"parent_id": parent_id,
|
||||
"child_type": child_type,
|
||||
"child_id": child_id,
|
||||
},
|
||||
source_type="container_relation",
|
||||
source_id=rel.id,
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
@@ -239,6 +239,9 @@ class StubFederationService:
|
||||
async def get_following(self, session, username, page=1, per_page=20):
|
||||
return [], 0
|
||||
|
||||
async def get_followers_paginated(self, session, username, page=1, per_page=20):
|
||||
return [], 0
|
||||
|
||||
async def accept_follow_response(self, session, local_username, remote_actor_url):
|
||||
pass
|
||||
|
||||
@@ -260,6 +263,9 @@ class StubFederationService:
|
||||
async def get_public_timeline(self, session, before=None, limit=20):
|
||||
return []
|
||||
|
||||
async def get_actor_timeline(self, session, remote_actor_id, before=None, limit=20):
|
||||
return []
|
||||
|
||||
async def create_local_post(self, session, actor_profile_id, content, visibility="public", in_reply_to=None):
|
||||
raise RuntimeError("FederationService not available")
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ async def upgrade_ots_proof(proof_bytes: bytes) -> tuple[bytes, bool]:
|
||||
"""
|
||||
# OpenTimestamps upgrade is done via the `ots` CLI or their calendar API.
|
||||
# For now, return the proof as-is with is_confirmed=False.
|
||||
# TODO: Implement calendar-based upgrade polling.
|
||||
# Calendar-based upgrade polling not yet implemented.
|
||||
return proof_bytes, False
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user