Add PostgreSQL + IPFS backend, rename configs to recipes

- Add PostgreSQL database for cache metadata storage with schema for
  cache_items, item_types, pin_reasons, and l2_shares tables
- Add IPFS integration as durable backing store (local cache as hot storage)
- Add postgres and ipfs services to docker-compose.yml
- Update cache_manager to upload to IPFS and track CIDs
- Rename all config references to recipe throughout server.py
- Update API endpoints: /configs/* -> /recipes/*
- Update models: ConfigStatus -> RecipeStatus, ConfigRunRequest -> RecipeRunRequest
- Update UI tabs and pages to show Recipes instead of Configs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-08 14:58:29 +00:00
parent 4639a98231
commit ba244b9ebc
6 changed files with 938 additions and 212 deletions

View File

@@ -7,6 +7,7 @@ Integrates artdag's Cache, ActivityStore, and ActivityManager to provide:
- Activity tracking for runs (input/output/intermediate relationships)
- Deletion rules enforcement (shared items protected)
- L2 ActivityPub integration for "shared" status checks
- IPFS as durable backing store (local cache as hot storage)
"""
import hashlib
@@ -24,6 +25,8 @@ import requests
from artdag import Cache, CacheEntry, DAG, Node, NodeType
from artdag.activities import Activity, ActivityStore, ActivityManager, make_is_shared_fn
import ipfs_client
logger = logging.getLogger(__name__)
@@ -154,6 +157,10 @@ class L1CacheManager:
self._content_index: Dict[str, str] = {}
self._load_content_index()
# IPFS CID index: content_hash -> ipfs_cid
self._ipfs_cids: Dict[str, str] = {}
self._load_ipfs_index()
# Legacy files directory (for files uploaded directly by content_hash)
self.legacy_dir = self.cache_dir / "legacy"
self.legacy_dir.mkdir(parents=True, exist_ok=True)
@@ -181,6 +188,28 @@ class L1CacheManager:
with open(self._index_path(), "w") as f:
json.dump(self._content_index, f, indent=2)
def _ipfs_index_path(self) -> Path:
return self.cache_dir / "ipfs_index.json"
def _load_ipfs_index(self):
"""Load content_hash -> ipfs_cid index."""
if self._ipfs_index_path().exists():
try:
with open(self._ipfs_index_path()) as f:
self._ipfs_cids = json.load(f)
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"Failed to load IPFS index: {e}")
self._ipfs_cids = {}
def _save_ipfs_index(self):
"""Save content_hash -> ipfs_cid index."""
with open(self._ipfs_index_path(), "w") as f:
json.dump(self._ipfs_cids, f, indent=2)
def get_ipfs_cid(self, content_hash: str) -> Optional[str]:
"""Get IPFS CID for a content hash."""
return self._ipfs_cids.get(content_hash)
def _is_shared_by_node_id(self, content_hash: str) -> bool:
"""Check if a content_hash is shared via L2."""
return self.l2_checker.is_shared(content_hash)
@@ -227,9 +256,9 @@ class L1CacheManager:
node_id: str = None,
execution_time: float = 0.0,
move: bool = False,
) -> CachedFile:
) -> tuple[CachedFile, Optional[str]]:
"""
Store a file in the cache.
Store a file in the cache and upload to IPFS.
Args:
source_path: Path to file to cache
@@ -239,7 +268,7 @@ class L1CacheManager:
move: If True, move instead of copy
Returns:
CachedFile with both node_id and content_hash
Tuple of (CachedFile with both node_id and content_hash, IPFS CID or None)
"""
# Compute content hash first
content_hash = file_hash(source_path)
@@ -252,9 +281,16 @@ class L1CacheManager:
# Check if already cached (by node_id)
existing = self.cache.get_entry(node_id)
if existing and existing.output_path.exists():
return CachedFile.from_cache_entry(existing)
# Already cached - still try to get IPFS CID if we don't have it
ipfs_cid = self._ipfs_cids.get(content_hash)
if not ipfs_cid:
ipfs_cid = ipfs_client.add_file(existing.output_path)
if ipfs_cid:
self._ipfs_cids[content_hash] = ipfs_cid
self._save_ipfs_index()
return CachedFile.from_cache_entry(existing), ipfs_cid
# Store in cache
# Store in local cache
self.cache.put(
node_id=node_id,
source_path=source_path,
@@ -269,27 +305,34 @@ class L1CacheManager:
self._content_index[entry.content_hash] = node_id
self._save_content_index()
return CachedFile.from_cache_entry(entry)
# Upload to IPFS (async in background would be better, but sync for now)
ipfs_cid = ipfs_client.add_file(entry.output_path)
if ipfs_cid:
self._ipfs_cids[entry.content_hash] = ipfs_cid
self._save_ipfs_index()
logger.info(f"Uploaded to IPFS: {entry.content_hash[:16]}... -> {ipfs_cid}")
return CachedFile.from_cache_entry(entry), ipfs_cid
def get_by_node_id(self, node_id: str) -> Optional[Path]:
"""Get cached file path by node_id."""
return self.cache.get(node_id)
def get_by_content_hash(self, content_hash: str) -> Optional[Path]:
"""Get cached file path by content_hash."""
"""Get cached file path by content_hash. Falls back to IPFS if not in local cache."""
# Check index first (new cache structure)
node_id = self._content_index.get(content_hash)
if node_id:
path = self.cache.get(node_id)
if path and path.exists():
logger.info(f" Found via index: {path}")
logger.debug(f" Found via index: {path}")
return path
# For uploads, node_id == content_hash, so try direct lookup
# This works even if cache index hasn't been reloaded
path = self.cache.get(content_hash)
logger.info(f" cache.get({content_hash[:16]}...) returned: {path}")
logger.debug(f" cache.get({content_hash[:16]}...) returned: {path}")
if path and path.exists():
self._content_index[content_hash] = content_hash
self._save_content_index()
@@ -298,7 +341,7 @@ class L1CacheManager:
# Scan cache entries (fallback for new structure)
entry = self.cache.find_by_content_hash(content_hash)
if entry and entry.output_path.exists():
logger.info(f" Found via scan: {entry.output_path}")
logger.debug(f" Found via scan: {entry.output_path}")
self._content_index[content_hash] = entry.node_id
self._save_content_index()
return entry.output_path
@@ -308,6 +351,15 @@ class L1CacheManager:
if legacy_path.exists() and legacy_path.is_file():
return legacy_path
# Try to recover from IPFS if we have a CID
ipfs_cid = self._ipfs_cids.get(content_hash)
if ipfs_cid:
logger.info(f"Recovering from IPFS: {content_hash[:16]}... ({ipfs_cid})")
recovery_path = self.legacy_dir / content_hash
if ipfs_client.get_file(ipfs_cid, recovery_path):
logger.info(f"Recovered from IPFS: {recovery_path}")
return recovery_path
return None
def has_content(self, content_hash: str) -> bool:

430
database.py Normal file
View File

@@ -0,0 +1,430 @@
# art-celery/database.py
"""
PostgreSQL database module for Art DAG L1 server.
Provides connection pooling and CRUD operations for cache metadata.
"""
import os
from datetime import datetime, timezone
from typing import List, Optional
import asyncpg
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://artdag:artdag@localhost:5432/artdag")
pool: Optional[asyncpg.Pool] = None
SCHEMA_SQL = """
-- Core cache: just content hash and IPFS CID
CREATE TABLE IF NOT EXISTS cache_items (
content_hash VARCHAR(64) PRIMARY KEY,
ipfs_cid VARCHAR(128),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Item types: metadata lives here (same item can be recipe AND media)
CREATE TABLE IF NOT EXISTS item_types (
id SERIAL PRIMARY KEY,
content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
type VARCHAR(50) NOT NULL,
path VARCHAR(255),
description TEXT,
source_type VARCHAR(20),
source_url TEXT,
source_note TEXT,
pinned BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(content_hash, type, path)
);
-- Pin reasons: one-to-many from item_types
CREATE TABLE IF NOT EXISTS pin_reasons (
id SERIAL PRIMARY KEY,
item_type_id INTEGER REFERENCES item_types(id) ON DELETE CASCADE,
reason VARCHAR(100) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- L2 shares: includes content_type for role when shared
CREATE TABLE IF NOT EXISTS l2_shares (
id SERIAL PRIMARY KEY,
content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
l2_server VARCHAR(255) NOT NULL,
asset_name VARCHAR(255) NOT NULL,
content_type VARCHAR(50) NOT NULL,
published_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_synced_at TIMESTAMP WITH TIME ZONE,
UNIQUE(content_hash, l2_server, content_type)
);
-- Indexes
CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash);
CREATE INDEX IF NOT EXISTS idx_item_types_type ON item_types(type);
CREATE INDEX IF NOT EXISTS idx_item_types_path ON item_types(path);
CREATE INDEX IF NOT EXISTS idx_pin_reasons_item_type ON pin_reasons(item_type_id);
CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash);
"""
async def init_db():
"""Initialize database connection pool and create schema."""
global pool
pool = await asyncpg.create_pool(DATABASE_URL)
async with pool.acquire() as conn:
await conn.execute(SCHEMA_SQL)
async def close_db():
"""Close database connection pool."""
global pool
if pool:
await pool.close()
pool = None
# ============ Cache Items ============
async def create_cache_item(content_hash: str, ipfs_cid: Optional[str] = None) -> dict:
"""Create a cache item. Returns the created item."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO cache_items (content_hash, ipfs_cid)
VALUES ($1, $2)
ON CONFLICT (content_hash) DO UPDATE SET ipfs_cid = COALESCE($2, cache_items.ipfs_cid)
RETURNING content_hash, ipfs_cid, created_at
""",
content_hash, ipfs_cid
)
return dict(row)
async def get_cache_item(content_hash: str) -> Optional[dict]:
"""Get a cache item by content hash."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1",
content_hash
)
return dict(row) if row else None
async def update_cache_item_ipfs_cid(content_hash: str, ipfs_cid: str) -> bool:
"""Update the IPFS CID for a cache item."""
async with pool.acquire() as conn:
result = await conn.execute(
"UPDATE cache_items SET ipfs_cid = $2 WHERE content_hash = $1",
content_hash, ipfs_cid
)
return result == "UPDATE 1"
async def delete_cache_item(content_hash: str) -> bool:
"""Delete a cache item and all associated data (cascades)."""
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM cache_items WHERE content_hash = $1",
content_hash
)
return result == "DELETE 1"
async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]:
"""List cache items with pagination."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT content_hash, ipfs_cid, created_at
FROM cache_items
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
""",
limit, offset
)
return [dict(row) for row in rows]
# ============ Item Types ============
async def add_item_type(
content_hash: str,
item_type: str,
path: Optional[str] = None,
description: Optional[str] = None,
source_type: Optional[str] = None,
source_url: Optional[str] = None,
source_note: Optional[str] = None,
) -> dict:
"""Add a type to a cache item. Creates cache_item if needed."""
async with pool.acquire() as conn:
# Ensure cache_item exists
await conn.execute(
"INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING",
content_hash
)
# Insert or update item_type
row = await conn.fetchrow(
"""
INSERT INTO item_types (content_hash, type, path, description, source_type, source_url, source_note)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (content_hash, type, path) DO UPDATE SET
description = COALESCE($4, item_types.description),
source_type = COALESCE($5, item_types.source_type),
source_url = COALESCE($6, item_types.source_url),
source_note = COALESCE($7, item_types.source_note)
RETURNING id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
""",
content_hash, item_type, path, description, source_type, source_url, source_note
)
return dict(row)
async def get_item_types(content_hash: str) -> List[dict]:
"""Get all types for a cache item."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1
ORDER BY created_at
""",
content_hash
)
return [dict(row) for row in rows]
async def get_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
"""Get a specific type for a cache item."""
async with pool.acquire() as conn:
if path is None:
row = await conn.fetchrow(
"""
SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1 AND type = $2 AND path IS NULL
""",
content_hash, item_type
)
else:
row = await conn.fetchrow(
"""
SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1 AND type = $2 AND path = $3
""",
content_hash, item_type, path
)
return dict(row) if row else None
async def update_item_type(
item_type_id: int,
description: Optional[str] = None,
source_type: Optional[str] = None,
source_url: Optional[str] = None,
source_note: Optional[str] = None,
) -> bool:
"""Update an item type's metadata."""
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE item_types SET
description = COALESCE($2, description),
source_type = COALESCE($3, source_type),
source_url = COALESCE($4, source_url),
source_note = COALESCE($5, source_note)
WHERE id = $1
""",
item_type_id, description, source_type, source_url, source_note
)
return result == "UPDATE 1"
async def delete_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> bool:
"""Delete a specific type from a cache item."""
async with pool.acquire() as conn:
if path is None:
result = await conn.execute(
"DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path IS NULL",
content_hash, item_type
)
else:
result = await conn.execute(
"DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path = $3",
content_hash, item_type, path
)
return result == "DELETE 1"
async def list_items_by_type(item_type: str, limit: int = 100, offset: int = 0) -> List[dict]:
"""List all items of a specific type."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT it.id, it.content_hash, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
WHERE it.type = $1
ORDER BY it.created_at DESC
LIMIT $2 OFFSET $3
""",
item_type, limit, offset
)
return [dict(row) for row in rows]
async def get_item_by_path(item_type: str, path: str) -> Optional[dict]:
"""Get an item by its type and path (e.g., recipe:/effects/dog)."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT it.id, it.content_hash, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
WHERE it.type = $1 AND it.path = $2
""",
item_type, path
)
return dict(row) if row else None
# ============ Pinning ============
async def pin_item_type(item_type_id: int, reason: str) -> bool:
"""Pin an item type with a reason."""
async with pool.acquire() as conn:
async with conn.transaction():
# Set pinned flag
await conn.execute(
"UPDATE item_types SET pinned = TRUE WHERE id = $1",
item_type_id
)
# Add pin reason
await conn.execute(
"INSERT INTO pin_reasons (item_type_id, reason) VALUES ($1, $2)",
item_type_id, reason
)
return True
async def unpin_item_type(item_type_id: int, reason: Optional[str] = None) -> bool:
"""Remove a pin reason from an item type. If no reasons left, unpins the item."""
async with pool.acquire() as conn:
async with conn.transaction():
if reason:
# Remove specific reason
await conn.execute(
"DELETE FROM pin_reasons WHERE item_type_id = $1 AND reason = $2",
item_type_id, reason
)
else:
# Remove all reasons
await conn.execute(
"DELETE FROM pin_reasons WHERE item_type_id = $1",
item_type_id
)
# Check if any reasons remain
count = await conn.fetchval(
"SELECT COUNT(*) FROM pin_reasons WHERE item_type_id = $1",
item_type_id
)
if count == 0:
await conn.execute(
"UPDATE item_types SET pinned = FALSE WHERE id = $1",
item_type_id
)
return True
async def get_pin_reasons(item_type_id: int) -> List[dict]:
"""Get all pin reasons for an item type."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, reason, created_at FROM pin_reasons WHERE item_type_id = $1 ORDER BY created_at",
item_type_id
)
return [dict(row) for row in rows]
async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) -> tuple[bool, List[str]]:
"""Check if any type of a cache item is pinned. Returns (is_pinned, reasons)."""
async with pool.acquire() as conn:
if item_type:
rows = await conn.fetch(
"""
SELECT pr.reason
FROM pin_reasons pr
JOIN item_types it ON pr.item_type_id = it.id
WHERE it.content_hash = $1 AND it.type = $2 AND it.pinned = TRUE
""",
content_hash, item_type
)
else:
rows = await conn.fetch(
"""
SELECT pr.reason
FROM pin_reasons pr
JOIN item_types it ON pr.item_type_id = it.id
WHERE it.content_hash = $1 AND it.pinned = TRUE
""",
content_hash
)
reasons = [row["reason"] for row in rows]
return len(reasons) > 0, reasons
# ============ L2 Shares ============
async def add_l2_share(
content_hash: str,
l2_server: str,
asset_name: str,
content_type: str,
) -> dict:
"""Add or update an L2 share."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO l2_shares (content_hash, l2_server, asset_name, content_type, last_synced_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (content_hash, l2_server, content_type) DO UPDATE SET
asset_name = $3,
last_synced_at = NOW()
RETURNING id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at
""",
content_hash, l2_server, asset_name, content_type
)
return dict(row)
async def get_l2_shares(content_hash: str) -> List[dict]:
"""Get all L2 shares for a cache item."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at
FROM l2_shares
WHERE content_hash = $1
ORDER BY published_at
""",
content_hash
)
return [dict(row) for row in rows]
async def delete_l2_share(content_hash: str, l2_server: str, content_type: str) -> bool:
"""Delete an L2 share."""
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM l2_shares WHERE content_hash = $1 AND l2_server = $2 AND content_type = $3",
content_hash, l2_server, content_type
)
return result == "DELETE 1"

View File

@@ -12,18 +12,49 @@ services:
restart_policy:
condition: on-failure
postgres:
image: postgres:16-alpine
environment:
- POSTGRES_USER=artdag
- POSTGRES_PASSWORD=artdag
- POSTGRES_DB=artdag
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- celery
deploy:
replicas: 1
restart_policy:
condition: on-failure
ipfs:
image: ipfs/kubo:latest
volumes:
- ipfs_data:/data/ipfs
- l1_cache:/data/cache:ro # Read-only access to cache for adding files
networks:
- celery
deploy:
replicas: 1
restart_policy:
condition: on-failure
l1-server:
image: git.rose-ash.com/art-dag/l1-server:latest
env_file:
- .env
environment:
- REDIS_URL=redis://redis:6379/5
- DATABASE_URL=postgresql://artdag:artdag@postgres:5432/artdag
- IPFS_API=/dns/ipfs/tcp/5001
- CACHE_DIR=/data/cache
# L2_SERVER and L2_DOMAIN from .env file
volumes:
- l1_cache:/data/cache
depends_on:
- redis
- postgres
- ipfs
networks:
- celery
- externalnet
@@ -37,12 +68,16 @@ services:
command: celery -A celery_app worker --loglevel=info
environment:
- REDIS_URL=redis://redis:6379/5
- DATABASE_URL=postgresql://artdag:artdag@postgres:5432/artdag
- IPFS_API=/dns/ipfs/tcp/5001
- CACHE_DIR=/data/cache
- C_FORCE_ROOT=true
volumes:
- l1_cache:/data/cache
depends_on:
- redis
- postgres
- ipfs
networks:
- celery
deploy:
@@ -52,6 +87,8 @@ services:
volumes:
redis_data:
postgres_data:
ipfs_data:
l1_cache:
networks:

192
ipfs_client.py Normal file
View File

@@ -0,0 +1,192 @@
# art-celery/ipfs_client.py
"""
IPFS client for Art DAG L1 server.
Provides functions to add, retrieve, and pin files on IPFS.
Uses local cache as hot storage with IPFS as durable backing store.
"""
import logging
import os
from pathlib import Path
from typing import Optional
import ipfshttpclient
logger = logging.getLogger(__name__)
# IPFS API multiaddr - default to local, docker uses /dns/ipfs/tcp/5001
IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001")
# Connection timeout in seconds
IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "30"))
def get_client():
"""Get an IPFS client connection."""
return ipfshttpclient.connect(IPFS_API, timeout=IPFS_TIMEOUT)
def add_file(file_path: Path, pin: bool = True) -> Optional[str]:
"""
Add a file to IPFS and optionally pin it.
Args:
file_path: Path to the file to add
pin: Whether to pin the file (default: True)
Returns:
IPFS CID (content identifier) or None on failure
"""
try:
with get_client() as client:
result = client.add(str(file_path), pin=pin)
cid = result["Hash"]
logger.info(f"Added to IPFS: {file_path.name} -> {cid}")
return cid
except Exception as e:
logger.error(f"Failed to add to IPFS: {e}")
return None
def add_bytes(data: bytes, pin: bool = True) -> Optional[str]:
"""
Add bytes data to IPFS and optionally pin it.
Args:
data: Bytes to add
pin: Whether to pin the data (default: True)
Returns:
IPFS CID or None on failure
"""
try:
with get_client() as client:
result = client.add_bytes(data)
cid = result
if pin:
client.pin.add(cid)
logger.info(f"Added bytes to IPFS: {len(data)} bytes -> {cid}")
return cid
except Exception as e:
logger.error(f"Failed to add bytes to IPFS: {e}")
return None
def get_file(cid: str, dest_path: Path) -> bool:
"""
Retrieve a file from IPFS and save to destination.
Args:
cid: IPFS CID to retrieve
dest_path: Path to save the file
Returns:
True on success, False on failure
"""
try:
with get_client() as client:
# Get file content
data = client.cat(cid)
# Write to destination
dest_path.parent.mkdir(parents=True, exist_ok=True)
dest_path.write_bytes(data)
logger.info(f"Retrieved from IPFS: {cid} -> {dest_path}")
return True
except Exception as e:
logger.error(f"Failed to get from IPFS: {e}")
return False
def get_bytes(cid: str) -> Optional[bytes]:
"""
Retrieve bytes data from IPFS.
Args:
cid: IPFS CID to retrieve
Returns:
File content as bytes or None on failure
"""
try:
with get_client() as client:
data = client.cat(cid)
logger.info(f"Retrieved from IPFS: {cid} ({len(data)} bytes)")
return data
except Exception as e:
logger.error(f"Failed to get bytes from IPFS: {e}")
return None
def pin(cid: str) -> bool:
"""
Pin a CID on IPFS.
Args:
cid: IPFS CID to pin
Returns:
True on success, False on failure
"""
try:
with get_client() as client:
client.pin.add(cid)
logger.info(f"Pinned on IPFS: {cid}")
return True
except Exception as e:
logger.error(f"Failed to pin on IPFS: {e}")
return False
def unpin(cid: str) -> bool:
"""
Unpin a CID on IPFS.
Args:
cid: IPFS CID to unpin
Returns:
True on success, False on failure
"""
try:
with get_client() as client:
client.pin.rm(cid)
logger.info(f"Unpinned on IPFS: {cid}")
return True
except Exception as e:
logger.error(f"Failed to unpin on IPFS: {e}")
return False
def is_pinned(cid: str) -> bool:
"""
Check if a CID is pinned on IPFS.
Args:
cid: IPFS CID to check
Returns:
True if pinned, False otherwise
"""
try:
with get_client() as client:
pins = client.pin.ls(type="recursive")
return cid in pins.get("Keys", {})
except Exception as e:
logger.error(f"Failed to check pin status: {e}")
return False
def is_available() -> bool:
"""
Check if IPFS daemon is available.
Returns:
True if IPFS is available, False otherwise
"""
try:
with get_client() as client:
client.id()
return True
except Exception:
return False

View File

@@ -5,5 +5,7 @@ fastapi>=0.109.0
uvicorn>=0.27.0
python-multipart>=0.0.6
PyYAML>=6.0
asyncpg>=0.29.0
ipfshttpclient>=0.8.0
# Core artdag from GitHub
git+https://github.com/gilesbradshaw/art-dag.git

417
server.py
View File

@@ -28,7 +28,9 @@ import yaml
from celery_app import app as celery_app
from tasks import render_effect, execute_dag, build_effect_dag
from contextlib import asynccontextmanager
from cache_manager import L1CacheManager, get_cache_manager
import database
# L2 server for auth verification
L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200")
@@ -51,7 +53,7 @@ redis_client = redis.Redis(
db=int(parsed.path.lstrip('/') or 0)
)
RUNS_KEY_PREFIX = "artdag:run:"
CONFIGS_KEY_PREFIX = "artdag:config:"
RECIPES_KEY_PREFIX = "artdag:recipe:"
def save_run(run: "RunStatus"):
@@ -91,10 +93,21 @@ def find_runs_using_content(content_hash: str) -> list[tuple["RunStatus", str]]:
return results
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize and cleanup resources."""
# Startup: initialize database
await database.init_db()
yield
# Shutdown: close database
await database.close_db()
app = FastAPI(
title="Art DAG L1 Server",
description="Distributed rendering server for Art DAG",
version="0.1.0"
version="0.1.0",
lifespan=lifespan
)
@@ -125,7 +138,7 @@ class RunStatus(BaseModel):
infrastructure: Optional[dict] = None # Hardware/software used for rendering
# ============ Config Models ============
# ============ Recipe Models ============
class VariableInput(BaseModel):
"""A variable input that must be filled at run time."""
@@ -142,9 +155,9 @@ class FixedInput(BaseModel):
content_hash: str
class ConfigStatus(BaseModel):
"""Status/metadata of a config."""
config_id: str # Content hash of the YAML file
class RecipeStatus(BaseModel):
"""Status/metadata of a recipe."""
recipe_id: str # Content hash of the YAML file
name: str
version: str
description: Optional[str] = None
@@ -156,41 +169,41 @@ class ConfigStatus(BaseModel):
uploader: Optional[str] = None
class ConfigRunRequest(BaseModel):
"""Request to run a config with variable inputs."""
class RecipeRunRequest(BaseModel):
"""Request to run a recipe with variable inputs."""
inputs: dict[str, str] # node_id -> content_hash
def save_config(config: ConfigStatus):
"""Save config to Redis."""
redis_client.set(f"{CONFIGS_KEY_PREFIX}{config.config_id}", config.model_dump_json())
def save_recipe(recipe: RecipeStatus):
"""Save recipe to Redis."""
redis_client.set(f"{RECIPES_KEY_PREFIX}{recipe.recipe_id}", recipe.model_dump_json())
def load_config(config_id: str) -> Optional[ConfigStatus]:
"""Load config from Redis."""
data = redis_client.get(f"{CONFIGS_KEY_PREFIX}{config_id}")
def load_recipe(recipe_id: str) -> Optional[RecipeStatus]:
"""Load recipe from Redis."""
data = redis_client.get(f"{RECIPES_KEY_PREFIX}{recipe_id}")
if data:
return ConfigStatus.model_validate_json(data)
return RecipeStatus.model_validate_json(data)
return None
def list_all_configs() -> list[ConfigStatus]:
"""List all configs from Redis."""
configs = []
for key in redis_client.scan_iter(f"{CONFIGS_KEY_PREFIX}*"):
def list_all_recipes() -> list[RecipeStatus]:
"""List all recipes from Redis."""
recipes = []
for key in redis_client.scan_iter(f"{RECIPES_KEY_PREFIX}*"):
data = redis_client.get(key)
if data:
configs.append(ConfigStatus.model_validate_json(data))
return sorted(configs, key=lambda c: c.uploaded_at, reverse=True)
recipes.append(RecipeStatus.model_validate_json(data))
return sorted(recipes, key=lambda c: c.uploaded_at, reverse=True)
def delete_config_from_redis(config_id: str) -> bool:
"""Delete config from Redis."""
return redis_client.delete(f"{CONFIGS_KEY_PREFIX}{config_id}") > 0
def delete_recipe_from_redis(recipe_id: str) -> bool:
"""Delete recipe from Redis."""
return redis_client.delete(f"{RECIPES_KEY_PREFIX}{recipe_id}") > 0
def parse_config_yaml(yaml_content: str, config_hash: str, uploader: str) -> ConfigStatus:
"""Parse a config YAML file and extract metadata."""
def parse_recipe_yaml(yaml_content: str, recipe_hash: str, uploader: str) -> RecipeStatus:
"""Parse a recipe YAML file and extract metadata."""
config = yaml.safe_load(yaml_content)
# Extract basic info
@@ -235,8 +248,8 @@ def parse_config_yaml(yaml_content: str, config_hash: str, uploader: str) -> Con
content_hash=asset_info.get("hash", "")
))
return ConfigStatus(
config_id=config_hash,
return RecipeStatus(
recipe_id=recipe_hash,
name=name,
version=version,
description=description,
@@ -305,7 +318,7 @@ def cache_file(source: Path, node_type: str = "output") -> str:
Uses artdag's Cache internally for proper tracking.
"""
cached = cache_manager.put(source, node_type=node_type)
cached, ipfs_cid = cache_manager.put(source, node_type=node_type)
return cached.content_hash
@@ -520,7 +533,7 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)):
Enforces deletion rules:
- Cannot discard if output is published to L2 (pinned)
- Deletes outputs and intermediate cache entries
- Preserves inputs (cache items and configs are NOT deleted)
- Preserves inputs (cache items and recipes are NOT deleted)
"""
run = load_run(run_id)
if not run:
@@ -987,11 +1000,11 @@ async def list_runs(request: Request, page: int = 1, limit: int = 20):
}
# ============ Config Endpoints ============
# ============ Recipe Endpoints ============
@app.post("/configs/upload")
async def upload_config(file: UploadFile = File(...), username: str = Depends(get_required_user)):
"""Upload a config YAML file. Requires authentication."""
@app.post("/recipes/upload")
async def upload_recipe(file: UploadFile = File(...), username: str = Depends(get_required_user)):
"""Upload a recipe YAML file. Requires authentication."""
import tempfile
# Read file content
@@ -999,7 +1012,7 @@ async def upload_config(file: UploadFile = File(...), username: str = Depends(ge
try:
yaml_content = content.decode('utf-8')
except UnicodeDecodeError:
raise HTTPException(400, "Config file must be valid UTF-8 text")
raise HTTPException(400, "Recipe file must be valid UTF-8 text")
# Validate YAML
try:
@@ -1012,51 +1025,51 @@ async def upload_config(file: UploadFile = File(...), username: str = Depends(ge
tmp.write(content)
tmp_path = Path(tmp.name)
cached = cache_manager.put(tmp_path, node_type="config", move=True)
config_hash = cached.content_hash
cached, ipfs_cid = cache_manager.put(tmp_path, node_type="recipe", move=True)
recipe_hash = cached.content_hash
# Parse and save metadata
actor_id = f"@{username}@{L2_DOMAIN}"
try:
config_status = parse_config_yaml(yaml_content, config_hash, actor_id)
recipe_status = parse_recipe_yaml(yaml_content, recipe_hash, actor_id)
except Exception as e:
raise HTTPException(400, f"Failed to parse config: {e}")
raise HTTPException(400, f"Failed to parse recipe: {e}")
save_config(config_status)
save_recipe(recipe_status)
# Save cache metadata
save_cache_meta(config_hash, actor_id, file.filename, type="config", config_name=config_status.name)
save_cache_meta(recipe_hash, actor_id, file.filename, type="recipe", recipe_name=recipe_status.name)
return {
"config_id": config_hash,
"name": config_status.name,
"version": config_status.version,
"variable_inputs": len(config_status.variable_inputs),
"fixed_inputs": len(config_status.fixed_inputs)
"recipe_id": recipe_hash,
"name": recipe_status.name,
"version": recipe_status.version,
"variable_inputs": len(recipe_status.variable_inputs),
"fixed_inputs": len(recipe_status.fixed_inputs)
}
@app.get("/configs")
async def list_configs_api(request: Request, page: int = 1, limit: int = 20):
"""List configs. HTML for browsers, JSON for APIs."""
@app.get("/recipes")
async def list_recipes_api(request: Request, page: int = 1, limit: int = 20):
"""List recipes. HTML for browsers, JSON for APIs."""
current_user = get_user_from_cookie(request)
all_configs = list_all_configs()
total = len(all_configs)
all_recipes = list_all_recipes()
total = len(all_recipes)
# Pagination
start = (page - 1) * limit
end = start + limit
configs_page = all_configs[start:end]
recipes_page = all_recipes[start:end]
has_more = end < total
if wants_html(request):
# HTML response - redirect to /configs page with proper UI
return RedirectResponse(f"/configs?page={page}")
# HTML response - redirect to /recipes page with proper UI
return RedirectResponse(f"/recipes?page={page}")
# JSON response for APIs
return {
"configs": [c.model_dump() for c in configs_page],
"recipes": [c.model_dump() for c in recipes_page],
"pagination": {
"page": page,
"limit": limit,
@@ -1066,61 +1079,61 @@ async def list_configs_api(request: Request, page: int = 1, limit: int = 20):
}
@app.get("/configs/{config_id}")
async def get_config_api(config_id: str):
"""Get config details."""
config = load_config(config_id)
if not config:
raise HTTPException(404, f"Config {config_id} not found")
return config
@app.get("/recipes/{recipe_id}")
async def get_recipe_api(recipe_id: str):
"""Get recipe details."""
recipe = load_recipe(recipe_id)
if not recipe:
raise HTTPException(404, f"Recipe {recipe_id} not found")
return recipe
@app.delete("/configs/{config_id}")
async def remove_config(config_id: str, username: str = Depends(get_required_user)):
"""Delete a config. Requires authentication."""
config = load_config(config_id)
if not config:
raise HTTPException(404, f"Config {config_id} not found")
@app.delete("/recipes/{recipe_id}")
async def remove_recipe(recipe_id: str, username: str = Depends(get_required_user)):
"""Delete a recipe. Requires authentication."""
recipe = load_recipe(recipe_id)
if not recipe:
raise HTTPException(404, f"Recipe {recipe_id} not found")
# Check ownership
actor_id = f"@{username}@{L2_DOMAIN}"
if config.uploader not in (username, actor_id):
if recipe.uploader not in (username, actor_id):
raise HTTPException(403, "Access denied")
# Check if pinned
pinned, reason = cache_manager.is_pinned(config_id)
pinned, reason = cache_manager.is_pinned(recipe_id)
if pinned:
raise HTTPException(400, f"Cannot delete pinned config: {reason}")
raise HTTPException(400, f"Cannot delete pinned recipe: {reason}")
# Delete from Redis and cache
delete_config_from_redis(config_id)
cache_manager.delete_by_content_hash(config_id)
delete_recipe_from_redis(recipe_id)
cache_manager.delete_by_content_hash(recipe_id)
return {"deleted": True, "config_id": config_id}
return {"deleted": True, "recipe_id": recipe_id}
@app.post("/configs/{config_id}/run")
async def run_config(config_id: str, request: ConfigRunRequest, username: str = Depends(get_required_user)):
"""Run a config with provided variable inputs. Requires authentication."""
config = load_config(config_id)
if not config:
raise HTTPException(404, f"Config {config_id} not found")
@app.post("/recipes/{recipe_id}/run")
async def run_recipe(recipe_id: str, request: RecipeRunRequest, username: str = Depends(get_required_user)):
"""Run a recipe with provided variable inputs. Requires authentication."""
recipe = load_recipe(recipe_id)
if not recipe:
raise HTTPException(404, f"Recipe {recipe_id} not found")
# Validate all required inputs are provided
for var_input in config.variable_inputs:
for var_input in recipe.variable_inputs:
if var_input.required and var_input.node_id not in request.inputs:
raise HTTPException(400, f"Missing required input: {var_input.name}")
# Load config YAML
config_path = cache_manager.get_by_content_hash(config_id)
if not config_path:
raise HTTPException(500, "Config YAML not found in cache")
# Load recipe YAML
recipe_path = cache_manager.get_by_content_hash(recipe_id)
if not recipe_path:
raise HTTPException(500, "Recipe YAML not found in cache")
with open(config_path) as f:
with open(recipe_path) as f:
yaml_config = yaml.safe_load(f)
# Build DAG from config
dag = build_dag_from_config(yaml_config, request.inputs, config)
# Build DAG from recipe
dag = build_dag_from_recipe(yaml_config, request.inputs, recipe)
# Create run
run_id = str(uuid.uuid4())
@@ -1128,16 +1141,16 @@ async def run_config(config_id: str, request: ConfigRunRequest, username: str =
# Collect all input hashes
all_inputs = list(request.inputs.values())
for fixed in config.fixed_inputs:
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
all_inputs.append(fixed.content_hash)
run = RunStatus(
run_id=run_id,
status="pending",
recipe=f"config:{config.name}",
recipe=f"recipe:{recipe.name}",
inputs=all_inputs,
output_name=f"{config.name}-{run_id[:8]}",
output_name=f"{recipe.name}-{run_id[:8]}",
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
@@ -1152,8 +1165,8 @@ async def run_config(config_id: str, request: ConfigRunRequest, username: str =
return run
def build_dag_from_config(yaml_config: dict, user_inputs: dict[str, str], config: ConfigStatus):
"""Build a DAG from config YAML with user-provided inputs."""
def build_dag_from_recipe(yaml_config: dict, user_inputs: dict[str, str], recipe: RecipeStatus):
"""Build a DAG from recipe YAML with user-provided inputs."""
from artdag import DAG, Node
dag = DAG()
@@ -1207,39 +1220,39 @@ def build_dag_from_config(yaml_config: dict, user_inputs: dict[str, str], config
return dag
# ============ Config UI Pages ============
# ============ Recipe UI Pages ============
@app.get("/configs", response_class=HTMLResponse)
async def configs_page(request: Request, page: int = 1):
"""Configs list page (HTML)."""
@app.get("/recipes", response_class=HTMLResponse)
async def recipes_page(request: Request, page: int = 1):
"""Recipes list page (HTML)."""
current_user = get_user_from_cookie(request)
if not current_user:
return HTMLResponse(render_page(
"Configs",
'<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login</a> to see configs.</p>',
"Recipes",
'<p class="text-gray-400 py-8 text-center"><a href="/login" class="text-blue-400 hover:text-blue-300">Login</a> to see recipes.</p>',
None,
active_tab="configs"
active_tab="recipes"
))
all_configs = list_all_configs()
all_recipes = list_all_recipes()
# Filter to user's configs
actor_id = f"@{current_user}@{L2_DOMAIN}"
user_configs = [c for c in all_configs if c.uploader in (current_user, actor_id)]
total = len(user_configs)
user_recipes = [c for c in all_recipes if c.uploader in (current_user, actor_id)]
total = len(user_recipes)
if not user_configs:
if not user_recipes:
content = '''
<h2 class="text-xl font-semibold text-white mb-6">Configs (0)</h2>
<p class="text-gray-400 py-8 text-center">No configs yet. Upload a config YAML file to get started.</p>
<h2 class="text-xl font-semibold text-white mb-6">Recipes (0)</h2>
<p class="text-gray-400 py-8 text-center">No recipes yet. Upload a recipe YAML file to get started.</p>
'''
return HTMLResponse(render_page("Configs", content, current_user, active_tab="configs"))
return HTMLResponse(render_page("Recipes", content, current_user, active_tab="recipes"))
html_parts = []
for config in user_configs:
var_count = len(config.variable_inputs)
fixed_count = len(config.fixed_inputs)
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
@@ -1248,54 +1261,54 @@ async def configs_page(request: Request, page: int = 1):
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
<a href="/config/{config.config_id}" class="block">
<a href="/recipe/{recipe.recipe_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-purple-600 text-white text-sm font-medium rounded-full">{config.name}</span>
<span class="text-gray-400 text-xs">v{config.version}</span>
<span class="px-3 py-1 bg-purple-600 text-white text-sm font-medium rounded-full">{recipe.name}</span>
<span class="text-gray-400 text-xs">v{recipe.version}</span>
</div>
<span class="text-xs text-gray-400">{inputs_str}</span>
</div>
<div class="text-sm text-gray-400 mb-2">
{config.description or "No description"}
{recipe.description or "No description"}
</div>
<div class="text-xs text-gray-500 font-mono truncate">
{config.config_id[:24]}...
{recipe.recipe_id[:24]}...
</div>
</div>
</a>
''')
content = f'''
<h2 class="text-xl font-semibold text-white mb-6">Configs ({total})</h2>
<h2 class="text-xl font-semibold text-white mb-6">Recipes ({total})</h2>
<div class="space-y-4">
{''.join(html_parts)}
</div>
'''
return HTMLResponse(render_page("Configs", content, current_user, active_tab="configs"))
return HTMLResponse(render_page("Recipes", content, current_user, active_tab="recipes"))
@app.get("/config/{config_id}", response_class=HTMLResponse)
async def config_detail_page(config_id: str, request: Request):
"""Config detail page with run form."""
@app.get("/recipe/{recipe_id}", response_class=HTMLResponse)
async def recipe_detail_page(recipe_id: str, request: Request):
"""Recipe detail page with run form."""
current_user = get_user_from_cookie(request)
config = load_config(config_id)
recipe = load_recipe(recipe_id)
if not config:
if not recipe:
return HTMLResponse(render_page(
"Config Not Found",
f'<p class="text-red-400">Config {config_id} not found.</p>',
"Recipe Not Found",
f'<p class="text-red-400">Recipe {recipe_id} not found.</p>',
current_user,
active_tab="configs"
active_tab="recipes"
), status_code=404)
# Build variable inputs form
var_inputs_html = ""
if config.variable_inputs:
if recipe.variable_inputs:
var_inputs_html = '<div class="space-y-4 mb-6">'
for var_input in config.variable_inputs:
for var_input in recipe.variable_inputs:
required = "required" if var_input.required else ""
var_inputs_html += f'''
<div>
@@ -1310,86 +1323,86 @@ async def config_detail_page(config_id: str, request: Request):
'''
var_inputs_html += '</div>'
else:
var_inputs_html = '<p class="text-gray-400 mb-4">This config has no variable inputs - it uses fixed assets only.</p>'
var_inputs_html = '<p class="text-gray-400 mb-4">This recipe has no variable inputs - it uses fixed assets only.</p>'
# Build fixed inputs display
fixed_inputs_html = ""
if config.fixed_inputs:
if recipe.fixed_inputs:
fixed_inputs_html = '<div class="mt-4"><h4 class="text-sm font-medium text-gray-300 mb-2">Fixed Inputs</h4><ul class="text-sm text-gray-400 space-y-1">'
for fixed in config.fixed_inputs:
for fixed in recipe.fixed_inputs:
fixed_inputs_html += f'<li><span class="text-gray-500">{fixed.asset}:</span> <span class="font-mono text-xs">{fixed.content_hash[:16]}...</span></li>'
fixed_inputs_html += '</ul></div>'
# Check if pinned
pinned, pin_reason = cache_manager.is_pinned(config_id)
pinned, pin_reason = cache_manager.is_pinned(recipe_id)
pinned_badge = ""
if pinned:
pinned_badge = f'<span class="px-2 py-1 bg-yellow-600 text-white text-xs rounded-full ml-2">Pinned: {pin_reason}</span>'
content = f'''
<div class="mb-6">
<a href="/configs" class="text-blue-400 hover:text-blue-300 text-sm">&larr; Back to configs</a>
<a href="/recipes" class="text-blue-400 hover:text-blue-300 text-sm">&larr; Back to recipes</a>
</div>
<div class="bg-dark-700 rounded-lg p-6 mb-6">
<div class="flex items-center gap-3 mb-4">
<h2 class="text-2xl font-bold text-white">{config.name}</h2>
<span class="px-2 py-1 bg-gray-600 text-white text-xs rounded-full">v{config.version}</span>
<h2 class="text-2xl font-bold text-white">{recipe.name}</h2>
<span class="px-2 py-1 bg-gray-600 text-white text-xs rounded-full">v{recipe.version}</span>
{pinned_badge}
</div>
<p class="text-gray-400 mb-4">{config.description or 'No description'}</p>
<div class="text-xs text-gray-500 font-mono">{config.config_id}</div>
<p class="text-gray-400 mb-4">{recipe.description or 'No description'}</p>
<div class="text-xs text-gray-500 font-mono">{recipe.recipe_id}</div>
{fixed_inputs_html}
</div>
<div class="bg-dark-700 rounded-lg p-6">
<h3 class="text-lg font-semibold text-white mb-4">Run this Config</h3>
<form hx-post="/ui/configs/{config_id}/run" hx-target="#run-result" hx-swap="innerHTML">
<h3 class="text-lg font-semibold text-white mb-4">Run this Recipe</h3>
<form hx-post="/ui/recipes/{recipe_id}/run" hx-target="#run-result" hx-swap="innerHTML">
{var_inputs_html}
<div id="run-result"></div>
<button type="submit"
class="px-6 py-2 bg-green-600 hover:bg-green-700 text-white font-medium rounded-lg transition-colors">
Run Config
Run Recipe
</button>
</form>
</div>
'''
return HTMLResponse(render_page(f"Config: {config.name}", content, current_user, active_tab="configs"))
return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, current_user, active_tab="recipes"))
@app.post("/ui/configs/{config_id}/run", response_class=HTMLResponse)
async def ui_run_config(config_id: str, request: Request):
"""HTMX handler: run a config with form inputs."""
@app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse)
async def ui_run_recipe(recipe_id: str, request: Request):
"""HTMX handler: run a recipe with form inputs."""
current_user = get_user_from_cookie(request)
if not current_user:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
config = load_config(config_id)
if not config:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Config not found</div>'
recipe = load_recipe(recipe_id)
if not recipe:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Recipe not found</div>'
# Parse form data
form_data = await request.form()
inputs = {}
for var_input in config.variable_inputs:
for var_input in recipe.variable_inputs:
value = form_data.get(var_input.node_id, "").strip()
if var_input.required and not value:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Missing required input: {var_input.name}</div>'
if value:
inputs[var_input.node_id] = value
# Load config YAML
config_path = cache_manager.get_by_content_hash(config_id)
if not config_path:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Config YAML not found in cache</div>'
# Load recipe YAML
recipe_path = cache_manager.get_by_content_hash(recipe_id)
if not recipe_path:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Recipe YAML not found in cache</div>'
try:
with open(config_path) as f:
with open(recipe_path) as f:
yaml_config = yaml.safe_load(f)
# Build DAG from config
dag = build_dag_from_config(yaml_config, inputs, config)
# Build DAG from recipe
dag = build_dag_from_recipe(yaml_config, inputs, recipe)
# Create run
run_id = str(uuid.uuid4())
@@ -1397,16 +1410,16 @@ async def ui_run_config(config_id: str, request: Request):
# Collect all input hashes
all_inputs = list(inputs.values())
for fixed in config.fixed_inputs:
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
all_inputs.append(fixed.content_hash)
run = RunStatus(
run_id=run_id,
status="pending",
recipe=f"config:{config.name}",
recipe=f"recipe:{recipe.name}",
inputs=all_inputs,
output_name=f"{config.name}-{run_id[:8]}",
output_name=f"{recipe.name}-{run_id[:8]}",
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
)
@@ -1428,27 +1441,27 @@ async def ui_run_config(config_id: str, request: Request):
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {str(e)}</div>'
@app.get("/ui/configs-list", response_class=HTMLResponse)
async def ui_configs_list(request: Request):
"""HTMX partial: list of configs."""
@app.get("/ui/recipes-list", response_class=HTMLResponse)
async def ui_recipes_list(request: Request):
"""HTMX partial: list of recipes."""
current_user = get_user_from_cookie(request)
if not current_user:
return '<p class="text-gray-400 py-8 text-center"><a href="/ui/login" class="text-blue-400 hover:text-blue-300">Login</a> to see configs.</p>'
return '<p class="text-gray-400 py-8 text-center"><a href="/ui/login" class="text-blue-400 hover:text-blue-300">Login</a> to see recipes.</p>'
all_configs = list_all_configs()
all_recipes = list_all_recipes()
# Filter to user's configs
actor_id = f"@{current_user}@{L2_DOMAIN}"
user_configs = [c for c in all_configs if c.uploader in (current_user, actor_id)]
user_recipes = [c for c in all_recipes if c.uploader in (current_user, actor_id)]
if not user_configs:
return '<p class="text-gray-400 py-8 text-center">No configs yet. Upload a config YAML file to get started.</p>'
if not user_recipes:
return '<p class="text-gray-400 py-8 text-center">No recipes yet. Upload a recipe YAML file to get started.</p>'
html_parts = ['<div class="space-y-4">']
for config in user_configs:
var_count = len(config.variable_inputs)
fixed_count = len(config.fixed_inputs)
for recipe in user_recipes:
var_count = len(recipe.variable_inputs)
fixed_count = len(recipe.fixed_inputs)
input_info = []
if var_count:
input_info.append(f"{var_count} variable")
@@ -1457,20 +1470,20 @@ async def ui_configs_list(request: Request):
inputs_str = ", ".join(input_info) if input_info else "no inputs"
html_parts.append(f'''
<a href="/config/{config.config_id}" class="block">
<a href="/recipe/{recipe.recipe_id}" class="block">
<div class="bg-dark-700 rounded-lg p-4 hover:bg-dark-600 transition-colors">
<div class="flex flex-wrap items-center justify-between gap-3 mb-3">
<div class="flex items-center gap-3">
<span class="px-3 py-1 bg-purple-600 text-white text-sm font-medium rounded-full">{config.name}</span>
<span class="text-gray-400 text-xs">v{config.version}</span>
<span class="px-3 py-1 bg-purple-600 text-white text-sm font-medium rounded-full">{recipe.name}</span>
<span class="text-gray-400 text-xs">v{recipe.version}</span>
</div>
<span class="text-xs text-gray-400">{inputs_str}</span>
</div>
<div class="text-sm text-gray-400 mb-2">
{config.description or "No description"}
{recipe.description or "No description"}
</div>
<div class="text-xs text-gray-500 font-mono truncate">
{config.config_id[:24]}...
{recipe.recipe_id[:24]}...
</div>
</div>
</a>
@@ -1480,34 +1493,34 @@ async def ui_configs_list(request: Request):
return '\n'.join(html_parts)
@app.delete("/ui/configs/{config_id}/discard", response_class=HTMLResponse)
async def ui_discard_config(config_id: str, request: Request):
"""HTMX handler: discard a config."""
@app.delete("/ui/recipes/{recipe_id}/discard", response_class=HTMLResponse)
async def ui_discard_recipe(recipe_id: str, request: Request):
"""HTMX handler: discard a recipe."""
current_user = get_user_from_cookie(request)
if not current_user:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
config = load_config(config_id)
if not config:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Config not found</div>'
recipe = load_recipe(recipe_id)
if not recipe:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Recipe not found</div>'
# Check ownership
actor_id = f"@{current_user}@{L2_DOMAIN}"
if config.uploader not in (current_user, actor_id):
if recipe.uploader not in (current_user, actor_id):
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Access denied</div>'
# Check if pinned
pinned, reason = cache_manager.is_pinned(config_id)
pinned, reason = cache_manager.is_pinned(recipe_id)
if pinned:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot delete: config is pinned ({reason})</div>'
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Cannot delete: recipe is pinned ({reason})</div>'
# Delete from Redis and cache
delete_config_from_redis(config_id)
cache_manager.delete_by_content_hash(config_id)
delete_recipe_from_redis(recipe_id)
cache_manager.delete_by_content_hash(recipe_id)
return '''
<div class="bg-green-900/50 border border-green-700 text-green-300 px-4 py-3 rounded-lg mb-4">
Config deleted. <a href="/configs" class="underline">Back to configs</a>
Recipe deleted. <a href="/recipes" class="underline">Back to recipes</a>
</div>
'''
@@ -2493,7 +2506,7 @@ async def upload_to_cache(file: UploadFile = File(...), username: str = Depends(
tmp_path = Path(tmp.name)
# Store in cache via cache_manager
cached = cache_manager.put(tmp_path, node_type="upload", move=True)
cached, ipfs_cid = cache_manager.put(tmp_path, node_type="upload", move=True)
content_hash = cached.content_hash
# Save uploader metadata
@@ -2949,7 +2962,7 @@ def render_page(title: str, content: str, username: Optional[str] = None, active
'''
runs_active = "border-b-2 border-blue-500 text-white" if active_tab == "runs" else "text-gray-400 hover:text-white"
configs_active = "border-b-2 border-blue-500 text-white" if active_tab == "configs" else "text-gray-400 hover:text-white"
recipes_active = "border-b-2 border-blue-500 text-white" if active_tab == "recipes" else "text-gray-400 hover:text-white"
cache_active = "border-b-2 border-blue-500 text-white" if active_tab == "cache" else "text-gray-400 hover:text-white"
return f"""
@@ -2972,7 +2985,7 @@ def render_page(title: str, content: str, username: Optional[str] = None, active
<nav class="flex gap-6 mb-6 border-b border-dark-500 pb-0">
<a href="/runs" class="pb-3 px-1 font-medium transition-colors {runs_active}">Runs</a>
<a href="/configs" class="pb-3 px-1 font-medium transition-colors {configs_active}">Configs</a>
<a href="/recipes" class="pb-3 px-1 font-medium transition-colors {recipes_active}">Recipes</a>
<a href="/cache" class="pb-3 px-1 font-medium transition-colors {cache_active}">Cache</a>
</nav>
@@ -3003,13 +3016,13 @@ def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str:
'''
runs_active = "border-b-2 border-blue-500 text-white" if tab == "runs" else "text-gray-400 hover:text-white"
configs_active = "border-b-2 border-blue-500 text-white" if tab == "configs" else "text-gray-400 hover:text-white"
recipes_active = "border-b-2 border-blue-500 text-white" if tab == "recipes" else "text-gray-400 hover:text-white"
cache_active = "border-b-2 border-blue-500 text-white" if tab == "cache" else "text-gray-400 hover:text-white"
if tab == "runs":
content_url = "/ui/runs"
elif tab == "configs":
content_url = "/ui/configs-list"
elif tab == "recipes":
content_url = "/ui/recipes-list"
else:
content_url = "/ui/cache-list"
@@ -3033,7 +3046,7 @@ def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str:
<nav class="flex gap-6 mb-6 border-b border-dark-500 pb-0">
<a href="/ui" class="pb-3 px-1 font-medium transition-colors {runs_active}">Runs</a>
<a href="/ui?tab=configs" class="pb-3 px-1 font-medium transition-colors {configs_active}">Configs</a>
<a href="/ui?tab=recipes" class="pb-3 px-1 font-medium transition-colors {recipes_active}">Recipes</a>
<a href="/ui?tab=cache" class="pb-3 px-1 font-medium transition-colors {cache_active}">Cache</a>
</nav>
@@ -3262,17 +3275,17 @@ async def ui_publish_run(run_id: str, request: Request, output_name: str = Form(
for input_hash in run.inputs:
save_cache_meta(input_hash, pinned=True, pin_reason="input_to_published")
# If this was a config-based run, pin the config and its fixed inputs
if run.recipe.startswith("config:"):
config_name = run.recipe.replace("config:", "")
for config in list_all_configs():
if config.name == config_name:
# Pin the config YAML
cache_manager.pin(config.config_id, reason="config_for_published")
# Pin all fixed inputs referenced by the config
for fixed in config.fixed_inputs:
# If this was a recipe-based run, pin the recipe and its fixed inputs
if run.recipe.startswith("recipe:"):
config_name = run.recipe.replace("recipe:", "")
for recipe in list_all_recipes():
if recipe.name == config_name:
# Pin the recipe YAML
cache_manager.pin(recipe.recipe_id, reason="recipe_for_published")
# Pin all fixed inputs referenced by the recipe
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_config")
cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_recipe")
break
return HTMLResponse(f'''