from __future__ import annotations

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from shared.events import emit_activity
from shared.models.container_relation import ContainerRelation


async def _emit_relation_activity(
    session: AsyncSession,
    activity_type: str,
    *,
    parent_type: str,
    parent_id: int,
    child_type: str,
    child_id: int,
    relation_type: str | None,
    source_id: int,
) -> None:
    """Emit an activity describing a change to a ContainerRelation.

    The ``relation_type`` key is only included in the payload when it is
    truthy, matching the payload shape used for every relation event.
    """
    object_data = {
        "parent_type": parent_type,
        "parent_id": parent_id,
        "child_type": child_type,
        "child_id": child_id,
    }
    if relation_type:
        object_data["relation_type"] = relation_type

    await emit_activity(
        session,
        activity_type=activity_type,
        actor_uri="internal:system",
        object_type="rose:ContainerRelation",
        object_data=object_data,
        source_type="container_relation",
        source_id=source_id,
    )


def _apply_provided_fields(
    relation: ContainerRelation,
    *,
    label: str | None,
    sort_order: int | None,
    relation_type: str | None,
    metadata: dict | None,
) -> bool:
    """Copy every non-None field onto ``relation``; return True if any changed.

    ``None`` means "not provided", so it never overwrites a stored value.
    """
    provided = {
        "relation_type": relation_type,
        "metadata_": metadata,
        "label": label,
        "sort_order": sort_order,
    }
    changed = False
    for attr, value in provided.items():
        if value is not None and getattr(relation, attr) != value:
            setattr(relation, attr, value)
            changed = True
    return changed


async def attach_child(
    session: AsyncSession,
    parent_type: str,
    parent_id: int,
    child_type: str,
    child_id: int,
    label: str | None = None,
    sort_order: int | None = None,
    relation_type: str | None = None,
    metadata: dict | None = None,
) -> ContainerRelation:
    """Attach a child to a parent container, creating or reusing a relation.

    Upsert behaviour, keyed on (parent_type, parent_id, child_type, child_id):

    * Soft-deleted relation exists: it is revived (``deleted_at`` cleared),
      provided fields are applied, and an "Add" activity is emitted.
    * Active relation exists: provided fields that differ are applied
      (including ``sort_order``); no activity is emitted.
    * No relation exists: a new one is inserted and an "Add" activity is
      emitted. When ``sort_order`` is omitted the child is placed after the
      parent's current highest-ordered active child.

    Args:
        session: Async session used for queries and flushes. This function
            flushes but does not commit.
        parent_type: Type discriminator of the parent.
        parent_id: Identifier of the parent.
        child_type: Type discriminator of the child.
        child_id: Identifier of the child.
        label: Optional label; ``None`` leaves an existing label untouched.
        sort_order: Optional explicit position; ``None`` leaves an existing
            position untouched or appends for a new relation.
        relation_type: Optional relation type; ``None`` leaves an existing
            value untouched.
        metadata: Optional metadata dict; ``None`` leaves existing metadata
            untouched.

    Returns:
        The created, revived, or already-existing ContainerRelation.
    """
    # Look up any existing relation, deliberately including soft-deleted rows
    # so they can be revived instead of duplicated.
    existing = await session.scalar(
        select(ContainerRelation).where(
            ContainerRelation.parent_type == parent_type,
            ContainerRelation.parent_id == parent_id,
            ContainerRelation.child_type == child_type,
            ContainerRelation.child_id == child_id,
        )
    )

    if existing is not None:
        if existing.deleted_at is not None:
            # Revive the soft-deleted relation.
            existing.deleted_at = None
            _apply_provided_fields(
                existing,
                label=label,
                sort_order=sort_order,
                relation_type=relation_type,
                metadata=metadata,
            )
            # Report the relation's effective type, not just the argument:
            # a revived relation may carry a type the caller did not re-pass.
            effective_relation_type = existing.relation_type
            await session.flush()
            await _emit_relation_activity(
                session,
                "Add",
                parent_type=parent_type,
                parent_id=parent_id,
                child_type=child_type,
                child_id=child_id,
                relation_type=effective_relation_type,
                source_id=existing.id,
            )
            return existing

        # Already attached and active: update mutable fields if provided.
        changed = _apply_provided_fields(
            existing,
            label=label,
            sort_order=sort_order,
            relation_type=relation_type,
            metadata=metadata,
        )
        if changed:
            await session.flush()
        return existing

    if sort_order is None:
        # Append after the highest sort_order among the parent's active children.
        max_order = await session.scalar(
            select(func.max(ContainerRelation.sort_order)).where(
                ContainerRelation.parent_type == parent_type,
                ContainerRelation.parent_id == parent_id,
                ContainerRelation.deleted_at.is_(None),
            )
        )
        sort_order = (max_order or 0) + 1

    rel = ContainerRelation(
        parent_type=parent_type,
        parent_id=parent_id,
        child_type=child_type,
        child_id=child_id,
        label=label,
        sort_order=sort_order,
        relation_type=relation_type,
        metadata_=metadata,
    )
    session.add(rel)
    # Flush so rel.id is populated before it is referenced by the activity.
    await session.flush()

    await _emit_relation_activity(
        session,
        "Add",
        parent_type=parent_type,
        parent_id=parent_id,
        child_type=child_type,
        child_id=child_id,
        relation_type=relation_type,
        source_id=rel.id,
    )
    return rel


async def get_children(
    session: AsyncSession,
    parent_type: str,
    parent_id: int,
    child_type: str | None = None,
    relation_type: str | None = None,
) -> list[ContainerRelation]:
    """Return the active child relations of a container.

    Soft-deleted relations are excluded. Results are ordered by
    ``sort_order`` then ``id``, both ascending.

    Args:
        session: Async session used to run the query.
        parent_type: Type discriminator of the parent.
        parent_id: Identifier of the parent.
        child_type: If given, only relations with this child type.
        relation_type: If given, only relations with this relation type.

    Returns:
        A list of matching ContainerRelation rows (possibly empty).
    """
    stmt = select(ContainerRelation).where(
        ContainerRelation.parent_type == parent_type,
        ContainerRelation.parent_id == parent_id,
        ContainerRelation.deleted_at.is_(None),
    )
    if child_type is not None:
        stmt = stmt.where(ContainerRelation.child_type == child_type)
    if relation_type is not None:
        stmt = stmt.where(ContainerRelation.relation_type == relation_type)
    stmt = stmt.order_by(
        ContainerRelation.sort_order.asc(),
        ContainerRelation.id.asc(),
    )
    result = await session.execute(stmt)
    return list(result.scalars().all())


async def get_parents(
    session: AsyncSession,
    child_type: str,
    child_id: int,
    parent_type: str | None = None,
    relation_type: str | None = None,
) -> list[ContainerRelation]:
    """Return the active parent relations of an entity.

    Soft-deleted relations are excluded. Results are ordered by ``id``
    ascending.

    Args:
        session: Async session used to run the query.
        child_type: Type discriminator of the child.
        child_id: Identifier of the child.
        parent_type: If given, only relations with this parent type.
        relation_type: If given, only relations with this relation type.

    Returns:
        A list of matching ContainerRelation rows (possibly empty).
    """
    stmt = select(ContainerRelation).where(
        ContainerRelation.child_type == child_type,
        ContainerRelation.child_id == child_id,
        ContainerRelation.deleted_at.is_(None),
    )
    if parent_type is not None:
        stmt = stmt.where(ContainerRelation.parent_type == parent_type)
    if relation_type is not None:
        stmt = stmt.where(ContainerRelation.relation_type == relation_type)
    stmt = stmt.order_by(ContainerRelation.id.asc())
    result = await session.execute(stmt)
    return list(result.scalars().all())


async def detach_child(
    session: AsyncSession,
    parent_type: str,
    parent_id: int,
    child_type: str,
    child_id: int,
    relation_type: str | None = None,
) -> bool:
    """Soft-delete an active relation and emit a "Remove" activity.

    Args:
        session: Async session used for the query and flush. This function
            flushes but does not commit.
        parent_type: Type discriminator of the parent.
        parent_id: Identifier of the parent.
        child_type: Type discriminator of the child.
        child_id: Identifier of the child.
        relation_type: If given, only a relation with this type is detached.

    Returns:
        True if an active relation was found and soft-deleted, False if no
        active relation matched.
    """
    stmt = select(ContainerRelation).where(
        ContainerRelation.parent_type == parent_type,
        ContainerRelation.parent_id == parent_id,
        ContainerRelation.child_type == child_type,
        ContainerRelation.child_id == child_id,
        ContainerRelation.deleted_at.is_(None),
    )
    if relation_type is not None:
        stmt = stmt.where(ContainerRelation.relation_type == relation_type)

    result = await session.execute(stmt)
    rel = result.scalar_one_or_none()
    if not rel:
        return False

    # The timestamp is a SQL expression, so it is evaluated by the database
    # when the UPDATE is flushed.
    rel.deleted_at = func.now()
    await session.flush()

    await _emit_relation_activity(
        session,
        "Remove",
        parent_type=parent_type,
        parent_id=parent_id,
        child_type=child_type,
        child_id=child_id,
        relation_type=rel.relation_type,
        source_id=rel.id,
    )
    return True