"""add unified event bus columns to ap_activities Revision ID: m3k1h7i9j0 Revises: l2j0g6h8i9 Create Date: 2026-02-22 Adds processing and visibility columns so ap_activities can serve as the unified event bus for both internal domain events and federation delivery. """ revision = "m3k1h7i9j0" down_revision = "l2j0g6h8i9" branch_labels = None depends_on = None from alembic import op import sqlalchemy as sa def upgrade() -> None: # Add new columns with defaults so existing rows stay valid op.add_column( "ap_activities", sa.Column("actor_uri", sa.String(512), nullable=True), ) op.add_column( "ap_activities", sa.Column( "visibility", sa.String(20), nullable=False, server_default="public", ), ) op.add_column( "ap_activities", sa.Column( "process_state", sa.String(20), nullable=False, server_default="completed", ), ) op.add_column( "ap_activities", sa.Column( "process_attempts", sa.Integer(), nullable=False, server_default="0", ), ) op.add_column( "ap_activities", sa.Column( "process_max_attempts", sa.Integer(), nullable=False, server_default="5", ), ) op.add_column( "ap_activities", sa.Column("process_error", sa.Text(), nullable=True), ) op.add_column( "ap_activities", sa.Column( "processed_at", sa.DateTime(timezone=True), nullable=True, ), ) # Backfill actor_uri from the related actor_profile op.execute( """ UPDATE ap_activities a SET actor_uri = CONCAT( 'https://', COALESCE(current_setting('app.ap_domain', true), 'rose-ash.com'), '/users/', p.preferred_username ) FROM ap_actor_profiles p WHERE a.actor_profile_id = p.id AND a.actor_uri IS NULL """ ) # Make actor_profile_id nullable (internal events have no actor profile) op.alter_column( "ap_activities", "actor_profile_id", existing_type=sa.Integer(), nullable=True, ) # Index for processor polling op.create_index( "ix_ap_activity_process", "ap_activities", ["process_state"], ) def downgrade() -> None: op.drop_index("ix_ap_activity_process", table_name="ap_activities") # Restore actor_profile_id NOT NULL (remove any rows without it first) op.execute( "DELETE FROM ap_activities WHERE actor_profile_id IS NULL" ) op.alter_column( "ap_activities", "actor_profile_id", existing_type=sa.Integer(), nullable=False, ) op.drop_column("ap_activities", "processed_at") op.drop_column("ap_activities", "process_error") op.drop_column("ap_activities", "process_max_attempts") op.drop_column("ap_activities", "process_attempts") op.drop_column("ap_activities", "process_state") op.drop_column("ap_activities", "visibility") op.drop_column("ap_activities", "actor_uri")