Files
rose-ash/shared/services/relationships.py
giles a0a0f5ebc2 Implement flexible entity relation system (Phases A–E)
Declarative relation registry via defrelation s-expressions with
cardinality enforcement (one-to-one, one-to-many, many-to-many),
registry-aware relate/unrelate/can-relate API endpoints, generic
container-nav fragment, and relation-driven UI components.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 08:35:17 +00:00

214 lines
7.0 KiB
Python

from __future__ import annotations
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from shared.events import emit_activity
from shared.models.container_relation import ContainerRelation
async def attach_child(
session: AsyncSession,
parent_type: str,
parent_id: int,
child_type: str,
child_id: int,
label: str | None = None,
sort_order: int | None = None,
relation_type: str | None = None,
metadata: dict | None = None,
) -> ContainerRelation:
"""
Create a ContainerRelation and emit container.child_attached event.
Upsert behaviour: if a relation already exists (including soft-deleted),
revive it instead of inserting a duplicate.
"""
# Check for existing (including soft-deleted)
existing = await session.scalar(
select(ContainerRelation).where(
ContainerRelation.parent_type == parent_type,
ContainerRelation.parent_id == parent_id,
ContainerRelation.child_type == child_type,
ContainerRelation.child_id == child_id,
)
)
if existing:
if existing.deleted_at is not None:
# Revive soft-deleted relation
existing.deleted_at = None
if sort_order is not None:
existing.sort_order = sort_order
if label is not None:
existing.label = label
if relation_type is not None:
existing.relation_type = relation_type
if metadata is not None:
existing.metadata_ = metadata
await session.flush()
await emit_activity(
session,
activity_type="Add",
actor_uri="internal:system",
object_type="rose:ContainerRelation",
object_data={
"parent_type": parent_type,
"parent_id": parent_id,
"child_type": child_type,
"child_id": child_id,
**({"relation_type": relation_type} if relation_type else {}),
},
source_type="container_relation",
source_id=existing.id,
)
return existing
# Already attached and active — update mutable fields if provided
changed = False
if relation_type is not None and existing.relation_type != relation_type:
existing.relation_type = relation_type
changed = True
if metadata is not None and existing.metadata_ != metadata:
existing.metadata_ = metadata
changed = True
if label is not None and existing.label != label:
existing.label = label
changed = True
if changed:
await session.flush()
return existing
if sort_order is None:
max_order = await session.scalar(
select(func.max(ContainerRelation.sort_order)).where(
ContainerRelation.parent_type == parent_type,
ContainerRelation.parent_id == parent_id,
ContainerRelation.deleted_at.is_(None),
)
)
sort_order = (max_order or 0) + 1
rel = ContainerRelation(
parent_type=parent_type,
parent_id=parent_id,
child_type=child_type,
child_id=child_id,
label=label,
sort_order=sort_order,
relation_type=relation_type,
metadata_=metadata,
)
session.add(rel)
await session.flush()
await emit_activity(
session,
activity_type="Add",
actor_uri="internal:system",
object_type="rose:ContainerRelation",
object_data={
"parent_type": parent_type,
"parent_id": parent_id,
"child_type": child_type,
"child_id": child_id,
**({"relation_type": relation_type} if relation_type else {}),
},
source_type="container_relation",
source_id=rel.id,
)
return rel
async def get_children(
session: AsyncSession,
parent_type: str,
parent_id: int,
child_type: str | None = None,
relation_type: str | None = None,
) -> list[ContainerRelation]:
"""Query children of a container, optionally filtered by child_type or relation_type."""
stmt = select(ContainerRelation).where(
ContainerRelation.parent_type == parent_type,
ContainerRelation.parent_id == parent_id,
ContainerRelation.deleted_at.is_(None),
)
if child_type is not None:
stmt = stmt.where(ContainerRelation.child_type == child_type)
if relation_type is not None:
stmt = stmt.where(ContainerRelation.relation_type == relation_type)
stmt = stmt.order_by(
ContainerRelation.sort_order.asc(), ContainerRelation.id.asc()
)
result = await session.execute(stmt)
return list(result.scalars().all())
async def get_parents(
session: AsyncSession,
child_type: str,
child_id: int,
parent_type: str | None = None,
relation_type: str | None = None,
) -> list[ContainerRelation]:
"""Query parents of an entity, optionally filtered by parent_type or relation_type."""
stmt = select(ContainerRelation).where(
ContainerRelation.child_type == child_type,
ContainerRelation.child_id == child_id,
ContainerRelation.deleted_at.is_(None),
)
if parent_type is not None:
stmt = stmt.where(ContainerRelation.parent_type == parent_type)
if relation_type is not None:
stmt = stmt.where(ContainerRelation.relation_type == relation_type)
stmt = stmt.order_by(ContainerRelation.id.asc())
result = await session.execute(stmt)
return list(result.scalars().all())
async def detach_child(
session: AsyncSession,
parent_type: str,
parent_id: int,
child_type: str,
child_id: int,
relation_type: str | None = None,
) -> bool:
"""Soft-delete a ContainerRelation and emit container.child_detached event."""
stmt = select(ContainerRelation).where(
ContainerRelation.parent_type == parent_type,
ContainerRelation.parent_id == parent_id,
ContainerRelation.child_type == child_type,
ContainerRelation.child_id == child_id,
ContainerRelation.deleted_at.is_(None),
)
if relation_type is not None:
stmt = stmt.where(ContainerRelation.relation_type == relation_type)
result = await session.execute(stmt)
rel = result.scalar_one_or_none()
if not rel:
return False
rel.deleted_at = func.now()
await session.flush()
await emit_activity(
session,
activity_type="Remove",
actor_uri="internal:system",
object_type="rose:ContainerRelation",
object_data={
"parent_type": parent_type,
"parent_id": parent_id,
"child_type": child_type,
"child_id": child_id,
**({"relation_type": rel.relation_type} if rel.relation_type else {}),
},
source_type="container_relation",
source_id=rel.id,
)
return True