"""Service helpers for attaching, listing, and detaching ContainerRelation rows."""

from __future__ import annotations

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from shared.events import emit_event
from shared.models.container_relation import ContainerRelation


def _relation_key_filters(
    parent_type: str,
    parent_id: int,
    child_type: str,
    child_id: int,
) -> tuple:
    """Return the WHERE criteria that identify one parent/child relation.

    The criteria deliberately do not constrain ``deleted_at``; callers that
    only want active rows add that condition themselves.
    """
    return (
        ContainerRelation.parent_type == parent_type,
        ContainerRelation.parent_id == parent_id,
        ContainerRelation.child_type == child_type,
        ContainerRelation.child_id == child_id,
    )


async def _emit_relation_event(
    session: AsyncSession,
    event_type: str,
    relation: ContainerRelation,
    *,
    parent_type: str,
    parent_id: int,
    child_type: str,
    child_id: int,
) -> None:
    """Emit a ``container_relation`` event for *relation*.

    The payload carries the identifiers exactly as the caller supplied them;
    the aggregate id is read from ``relation.id`` at call time, so callers
    flush before invoking this helper.
    """
    await emit_event(
        session,
        event_type=event_type,
        aggregate_type="container_relation",
        aggregate_id=relation.id,
        payload={
            "parent_type": parent_type,
            "parent_id": parent_id,
            "child_type": child_type,
            "child_id": child_id,
        },
    )


async def attach_child(
    session: AsyncSession,
    parent_type: str,
    parent_id: int,
    child_type: str,
    child_id: int,
    label: str | None = None,
    sort_order: int | None = None,
) -> ContainerRelation:
    """Create a ContainerRelation and emit a ``container.child_attached`` event.

    Upsert behaviour:

    * If a matching relation exists and is active, it is returned unchanged
      and no event is emitted (``label`` and ``sort_order`` are ignored).
    * If a matching relation exists but is soft-deleted, it is revived
      (``deleted_at`` reset to ``None``) instead of inserting a duplicate.
      ``sort_order`` and ``label`` overwrite the stored values only when
      they are not ``None``. The event is emitted.
    * Otherwise a new relation is inserted and the event is emitted. When
      ``sort_order`` is ``None`` it defaults to one more than the highest
      ``sort_order`` among the parent's active relations (``1`` if there is
      none).

    The session is flushed, not committed; committing is left to the caller.

    Returns:
        The existing, revived, or newly created relation.
    """
    # Look up any matching row, soft-deleted ones included.
    existing = await session.scalar(
        select(ContainerRelation).where(
            *_relation_key_filters(parent_type, parent_id, child_type, child_id)
        )
    )

    if existing is not None:
        if existing.deleted_at is None:
            # Already attached and active: no-op.
            return existing

        # Revive the soft-deleted relation rather than inserting a duplicate.
        existing.deleted_at = None
        if sort_order is not None:
            existing.sort_order = sort_order
        if label is not None:
            existing.label = label
        await session.flush()
        await _emit_relation_event(
            session,
            "container.child_attached",
            existing,
            parent_type=parent_type,
            parent_id=parent_id,
            child_type=child_type,
            child_id=child_id,
        )
        return existing

    if sort_order is None:
        # Append after the last active sibling under the same parent.
        max_order = await session.scalar(
            select(func.max(ContainerRelation.sort_order)).where(
                ContainerRelation.parent_type == parent_type,
                ContainerRelation.parent_id == parent_id,
                ContainerRelation.deleted_at.is_(None),
            )
        )
        sort_order = (max_order or 0) + 1

    rel = ContainerRelation(
        parent_type=parent_type,
        parent_id=parent_id,
        child_type=child_type,
        child_id=child_id,
        label=label,
        sort_order=sort_order,
    )
    session.add(rel)
    # Flush before emitting so the event can reference rel.id.
    await session.flush()
    await _emit_relation_event(
        session,
        "container.child_attached",
        rel,
        parent_type=parent_type,
        parent_id=parent_id,
        child_type=child_type,
        child_id=child_id,
    )
    return rel


async def get_children(
    session: AsyncSession,
    parent_type: str,
    parent_id: int,
    child_type: str | None = None,
) -> list[ContainerRelation]:
    """Query children of a container, optionally filtered by child_type.

    Only relations whose ``deleted_at`` is NULL are returned, ordered by
    ``sort_order`` ascending and then ``id`` ascending.
    """
    stmt = select(ContainerRelation).where(
        ContainerRelation.parent_type == parent_type,
        ContainerRelation.parent_id == parent_id,
        ContainerRelation.deleted_at.is_(None),
    )
    if child_type is not None:
        stmt = stmt.where(ContainerRelation.child_type == child_type)
    stmt = stmt.order_by(
        ContainerRelation.sort_order.asc(),
        ContainerRelation.id.asc(),
    )
    result = await session.execute(stmt)
    return list(result.scalars().all())


async def detach_child(
    session: AsyncSession,
    parent_type: str,
    parent_id: int,
    child_type: str,
    child_id: int,
) -> bool:
    """Soft-delete a ContainerRelation and emit a ``container.child_detached`` event.

    Only an active relation (``deleted_at`` is NULL) is considered. The
    session is flushed, not committed.

    Returns:
        ``True`` if an active relation was found and soft-deleted, ``False``
        if there was none (in which case no event is emitted).
    """
    result = await session.execute(
        select(ContainerRelation).where(
            *_relation_key_filters(parent_type, parent_id, child_type, child_id),
            ContainerRelation.deleted_at.is_(None),
        )
    )
    rel = result.scalar_one_or_none()
    if rel is None:
        return False

    # func.now() is a SQL expression, so the timestamp is produced by the
    # database when the UPDATE is flushed rather than by this process.
    rel.deleted_at = func.now()
    await session.flush()
    await _emit_relation_event(
        session,
        "container.child_detached",
        rel,
        parent_type=parent_type,
        parent_id=parent_id,
        child_type=child_type,
        child_id=child_id,
    )
    return True