Implement flexible entity relation system (Phases A–E)

Declarative relation registry via defrelation s-expressions with
cardinality enforcement (one-to-one, one-to-many, many-to-many),
registry-aware relate/unrelate/can-relate API endpoints, generic
container-nav fragment, and relation-driven UI components.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 08:35:17 +00:00
parent 6f1d5bac3c
commit a0a0f5ebc2
17 changed files with 928 additions and 28 deletions

View File

@@ -0,0 +1,52 @@
"""Add relation_type and metadata columns to container_relations
Revision ID: relations_0002
Revises: relations_0001
Create Date: 2026-02-28
"""
import sqlalchemy as sa
from alembic import op
revision = "relations_0002"
down_revision = "relations_0001"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"container_relations",
sa.Column("relation_type", sa.String(64), nullable=True),
)
op.add_column(
"container_relations",
sa.Column("metadata", sa.JSON(), nullable=True),
)
op.create_index(
"ix_container_relations_relation_type",
"container_relations",
["relation_type", "parent_type", "parent_id"],
)
# Backfill relation_type for existing rows based on parent/child type pairs
op.execute("""
UPDATE container_relations
SET relation_type = CASE
WHEN parent_type = 'page' AND child_type = 'market'
THEN 'page->market'
WHEN parent_type = 'page' AND child_type = 'calendar'
THEN 'page->calendar'
WHEN parent_type = 'page' AND child_type = 'menu_node'
THEN 'page->menu_node'
END
WHERE relation_type IS NULL
AND parent_type = 'page'
AND child_type IN ('market', 'calendar', 'menu_node')
""")
def downgrade() -> None:
op.drop_index("ix_container_relations_relation_type", "container_relations")
op.drop_column("container_relations", "metadata")
op.drop_column("container_relations", "relation_type")

View File

@@ -3,7 +3,7 @@ import path_setup # noqa: F401
from shared.infrastructure.factory import create_base_app
from bp import register_actions, register_data
from bp import register_actions, register_data, register_fragments
from services import register_domain_services
@@ -15,6 +15,7 @@ def create_app() -> "Quart":
app.register_blueprint(register_actions())
app.register_blueprint(register_data())
app.register_blueprint(register_fragments())
return app

View File

@@ -1,2 +1,3 @@
from .data.routes import register as register_data
from .actions.routes import register as register_actions
from .fragments.routes import register as register_fragments

View File

@@ -46,6 +46,8 @@ def register() -> Blueprint:
child_id=data["child_id"],
label=data.get("label"),
sort_order=data.get("sort_order"),
relation_type=data.get("relation_type"),
metadata=data.get("metadata"),
)
return {
"id": rel.id,
@@ -54,6 +56,7 @@ def register() -> Blueprint:
"child_type": rel.child_type,
"child_id": rel.child_id,
"sort_order": rel.sort_order,
"relation_type": rel.relation_type,
}
_handlers["attach-child"] = _attach_child
@@ -70,9 +73,122 @@ def register() -> Blueprint:
parent_id=data["parent_id"],
child_type=data["child_type"],
child_id=data["child_id"],
relation_type=data.get("relation_type"),
)
return {"deleted": deleted}
_handlers["detach-child"] = _detach_child
# --- relate (registry-aware) ---
async def _relate():
"""Create a typed relation with registry validation and cardinality enforcement."""
from shared.services.relationships import attach_child, get_children
from shared.sexp.relations import get_relation
data = await request.get_json(force=True)
rel_type = data.get("relation_type")
if not rel_type:
return {"error": "relation_type is required"}, 400
defn = get_relation(rel_type)
if defn is None:
return {"error": f"unknown relation_type: {rel_type}"}, 400
from_id = data["from_id"]
to_id = data["to_id"]
# Cardinality enforcement
if defn.cardinality == "one-to-one":
existing = await get_children(
g.s,
parent_type=defn.from_type,
parent_id=from_id,
child_type=defn.to_type,
relation_type=rel_type,
)
if existing:
return {"error": "one-to-one relation already exists", "existing_id": existing[0].child_id}, 409
rel = await attach_child(
g.s,
parent_type=defn.from_type,
parent_id=from_id,
child_type=defn.to_type,
child_id=to_id,
label=data.get("label"),
sort_order=data.get("sort_order"),
relation_type=rel_type,
metadata=data.get("metadata"),
)
return {
"id": rel.id,
"relation_type": rel.relation_type,
"parent_type": rel.parent_type,
"parent_id": rel.parent_id,
"child_type": rel.child_type,
"child_id": rel.child_id,
"sort_order": rel.sort_order,
}
_handlers["relate"] = _relate
# --- unrelate (registry-aware) ---
async def _unrelate():
"""Remove a typed relation with registry validation."""
from shared.services.relationships import detach_child
from shared.sexp.relations import get_relation
data = await request.get_json(force=True)
rel_type = data.get("relation_type")
if not rel_type:
return {"error": "relation_type is required"}, 400
defn = get_relation(rel_type)
if defn is None:
return {"error": f"unknown relation_type: {rel_type}"}, 400
deleted = await detach_child(
g.s,
parent_type=defn.from_type,
parent_id=data["from_id"],
child_type=defn.to_type,
child_id=data["to_id"],
relation_type=rel_type,
)
return {"deleted": deleted}
_handlers["unrelate"] = _unrelate
# --- can-relate (pre-flight check) ---
async def _can_relate():
"""Check if a relation can be created (cardinality, registry validation)."""
from shared.services.relationships import get_children
from shared.sexp.relations import get_relation
data = await request.get_json(force=True)
rel_type = data.get("relation_type")
if not rel_type:
return {"error": "relation_type is required"}, 400
defn = get_relation(rel_type)
if defn is None:
return {"allowed": False, "reason": f"unknown relation_type: {rel_type}"}
from_id = data["from_id"]
if defn.cardinality == "one-to-one":
existing = await get_children(
g.s,
parent_type=defn.from_type,
parent_id=from_id,
child_type=defn.to_type,
relation_type=rel_type,
)
if existing:
return {"allowed": False, "reason": "one-to-one relation already exists"}
return {"allowed": True}
_handlers["can-relate"] = _can_relate
return bp

View File

@@ -35,22 +35,43 @@ def register() -> Blueprint:
parent_type = request.args.get("parent_type", "")
parent_id = request.args.get("parent_id", type=int)
child_type = request.args.get("child_type")
relation_type = request.args.get("relation_type")
if not parent_type or parent_id is None:
return []
rels = await get_children(g.s, parent_type, parent_id, child_type)
return [
{
"id": r.id,
"parent_type": r.parent_type,
"parent_id": r.parent_id,
"child_type": r.child_type,
"child_id": r.child_id,
"sort_order": r.sort_order,
"label": r.label,
}
for r in rels
]
rels = await get_children(g.s, parent_type, parent_id, child_type, relation_type=relation_type)
return [_serialize_rel(r) for r in rels]
_handlers["get-children"] = _get_children
# --- get-parents ---
async def _get_parents():
"""Return ContainerRelation parents for a child."""
from shared.services.relationships import get_parents
child_type = request.args.get("child_type", "")
child_id = request.args.get("child_id", type=int)
parent_type = request.args.get("parent_type")
relation_type = request.args.get("relation_type")
if not child_type or child_id is None:
return []
rels = await get_parents(g.s, child_type, child_id, parent_type, relation_type=relation_type)
return [_serialize_rel(r) for r in rels]
_handlers["get-parents"] = _get_parents
return bp
def _serialize_rel(r):
"""Serialize a ContainerRelation to a dict."""
return {
"id": r.id,
"parent_type": r.parent_type,
"parent_id": r.parent_id,
"child_type": r.child_type,
"child_id": r.child_id,
"sort_order": r.sort_order,
"label": r.label,
"relation_type": r.relation_type,
"metadata": r.metadata_,
}

View File

View File

@@ -0,0 +1,88 @@
"""Relations app fragment endpoints.
Generic container-nav fragment that renders navigation items for all
related entities, driven by the relation registry.
"""
from __future__ import annotations
from quart import Blueprint, Response, g, request
from shared.infrastructure.fragments import FRAGMENT_HEADER
def register():
bp = Blueprint("fragments", __name__, url_prefix="/internal/fragments")
_handlers: dict[str, object] = {}
@bp.before_request
async def _require_fragment_header():
if not request.headers.get(FRAGMENT_HEADER):
return Response("", status=403)
@bp.get("/<fragment_type>")
async def get_fragment(fragment_type: str):
handler = _handlers.get(fragment_type)
if handler is None:
return Response("", status=200, content_type="text/html")
html = await handler()
return Response(html, status=200, content_type="text/html")
# --- generic container-nav fragment ----------------------------------------
async def _container_nav_handler():
"""Render nav items for all visible relations of a container entity.
Query params:
container_type: entity type (e.g. "page")
container_id: entity id
post_slug: used for URL construction
exclude: comma-separated relation types to skip
"""
from shared.sexp.jinja_bridge import sexp as render_sexp
from shared.sexp.relations import relations_from
from shared.services.relationships import get_children
container_type = request.args.get("container_type", "page")
container_id = int(request.args.get("container_id", 0))
post_slug = request.args.get("post_slug", "")
exclude_raw = request.args.get("exclude", "")
exclude = set(exclude_raw.split(",")) if exclude_raw else set()
nav_defs = [
d for d in relations_from(container_type)
if d.nav != "hidden" and d.name not in exclude
]
if not nav_defs:
return ""
parts = []
for defn in nav_defs:
children = await get_children(
g.s,
parent_type=container_type,
parent_id=container_id,
child_type=defn.to_type,
relation_type=defn.name,
)
for child in children:
slug = (child.metadata_ or {}).get("slug", "")
href = f"/{post_slug}/{slug}/" if post_slug else f"/{slug}/"
parts.append(render_sexp(
'(~relation-nav :href href :name name :icon icon :nav-class nav-class :relation-type relation-type)',
href=href,
name=child.label or "",
icon=defn.nav_icon or "",
**{
"nav-class": "",
"relation-type": defn.name,
},
))
return "\n".join(parts)
_handlers["container-nav"] = _container_nav_handler
return bp