diff --git a/cache_manager.py b/cache_manager.py index 82fd32a..a1b01c4 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -7,6 +7,7 @@ Integrates artdag's Cache, ActivityStore, and ActivityManager to provide: - Activity tracking for runs (input/output/intermediate relationships) - Deletion rules enforcement (shared items protected) - L2 ActivityPub integration for "shared" status checks +- IPFS as durable backing store (local cache as hot storage) """ import hashlib @@ -24,6 +25,8 @@ import requests from artdag import Cache, CacheEntry, DAG, Node, NodeType from artdag.activities import Activity, ActivityStore, ActivityManager, make_is_shared_fn +import ipfs_client + logger = logging.getLogger(__name__) @@ -154,6 +157,10 @@ class L1CacheManager: self._content_index: Dict[str, str] = {} self._load_content_index() + # IPFS CID index: content_hash -> ipfs_cid + self._ipfs_cids: Dict[str, str] = {} + self._load_ipfs_index() + # Legacy files directory (for files uploaded directly by content_hash) self.legacy_dir = self.cache_dir / "legacy" self.legacy_dir.mkdir(parents=True, exist_ok=True) @@ -181,6 +188,28 @@ class L1CacheManager: with open(self._index_path(), "w") as f: json.dump(self._content_index, f, indent=2) + def _ipfs_index_path(self) -> Path: + return self.cache_dir / "ipfs_index.json" + + def _load_ipfs_index(self): + """Load content_hash -> ipfs_cid index.""" + if self._ipfs_index_path().exists(): + try: + with open(self._ipfs_index_path()) as f: + self._ipfs_cids = json.load(f) + except (json.JSONDecodeError, IOError) as e: + logger.warning(f"Failed to load IPFS index: {e}") + self._ipfs_cids = {} + + def _save_ipfs_index(self): + """Save content_hash -> ipfs_cid index.""" + with open(self._ipfs_index_path(), "w") as f: + json.dump(self._ipfs_cids, f, indent=2) + + def get_ipfs_cid(self, content_hash: str) -> Optional[str]: + """Get IPFS CID for a content hash.""" + return self._ipfs_cids.get(content_hash) + def _is_shared_by_node_id(self, content_hash: str) -> bool: """Check if a content_hash is shared via L2.""" return self.l2_checker.is_shared(content_hash) @@ -227,9 +256,9 @@ class L1CacheManager: node_id: str = None, execution_time: float = 0.0, move: bool = False, - ) -> CachedFile: + ) -> tuple[CachedFile, Optional[str]]: """ - Store a file in the cache. + Store a file in the cache and upload to IPFS. Args: source_path: Path to file to cache @@ -239,7 +268,7 @@ class L1CacheManager: move: If True, move instead of copy Returns: - CachedFile with both node_id and content_hash + Tuple of (CachedFile with both node_id and content_hash, IPFS CID or None) """ # Compute content hash first content_hash = file_hash(source_path) @@ -252,9 +281,16 @@ class L1CacheManager: # Check if already cached (by node_id) existing = self.cache.get_entry(node_id) if existing and existing.output_path.exists(): - return CachedFile.from_cache_entry(existing) + # Already cached - still try to get IPFS CID if we don't have it + ipfs_cid = self._ipfs_cids.get(content_hash) + if not ipfs_cid: + ipfs_cid = ipfs_client.add_file(existing.output_path) + if ipfs_cid: + self._ipfs_cids[content_hash] = ipfs_cid + self._save_ipfs_index() + return CachedFile.from_cache_entry(existing), ipfs_cid - # Store in cache + # Store in local cache self.cache.put( node_id=node_id, source_path=source_path, @@ -269,27 +305,34 @@ class L1CacheManager: self._content_index[entry.content_hash] = node_id self._save_content_index() - return CachedFile.from_cache_entry(entry) + # Upload to IPFS (async in background would be better, but sync for now) + ipfs_cid = ipfs_client.add_file(entry.output_path) + if ipfs_cid: + self._ipfs_cids[entry.content_hash] = ipfs_cid + self._save_ipfs_index() + logger.info(f"Uploaded to IPFS: {entry.content_hash[:16]}... -> {ipfs_cid}") + + return CachedFile.from_cache_entry(entry), ipfs_cid def get_by_node_id(self, node_id: str) -> Optional[Path]: """Get cached file path by node_id.""" return self.cache.get(node_id) def get_by_content_hash(self, content_hash: str) -> Optional[Path]: - """Get cached file path by content_hash.""" + """Get cached file path by content_hash. Falls back to IPFS if not in local cache.""" # Check index first (new cache structure) node_id = self._content_index.get(content_hash) if node_id: path = self.cache.get(node_id) if path and path.exists(): - logger.info(f" Found via index: {path}") + logger.debug(f" Found via index: {path}") return path # For uploads, node_id == content_hash, so try direct lookup # This works even if cache index hasn't been reloaded path = self.cache.get(content_hash) - logger.info(f" cache.get({content_hash[:16]}...) returned: {path}") + logger.debug(f" cache.get({content_hash[:16]}...) returned: {path}") if path and path.exists(): self._content_index[content_hash] = content_hash self._save_content_index() @@ -298,7 +341,7 @@ class L1CacheManager: # Scan cache entries (fallback for new structure) entry = self.cache.find_by_content_hash(content_hash) if entry and entry.output_path.exists(): - logger.info(f" Found via scan: {entry.output_path}") + logger.debug(f" Found via scan: {entry.output_path}") self._content_index[content_hash] = entry.node_id self._save_content_index() return entry.output_path @@ -308,6 +351,15 @@ class L1CacheManager: if legacy_path.exists() and legacy_path.is_file(): return legacy_path + # Try to recover from IPFS if we have a CID + ipfs_cid = self._ipfs_cids.get(content_hash) + if ipfs_cid: + logger.info(f"Recovering from IPFS: {content_hash[:16]}... ({ipfs_cid})") + recovery_path = self.legacy_dir / content_hash + if ipfs_client.get_file(ipfs_cid, recovery_path): + logger.info(f"Recovered from IPFS: {recovery_path}") + return recovery_path + return None def has_content(self, content_hash: str) -> bool: diff --git a/database.py b/database.py new file mode 100644 index 0000000..5829d1e --- /dev/null +++ b/database.py @@ -0,0 +1,430 @@ +# art-celery/database.py +""" +PostgreSQL database module for Art DAG L1 server. + +Provides connection pooling and CRUD operations for cache metadata. +""" + +import os +from datetime import datetime, timezone +from typing import List, Optional + +import asyncpg + +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://artdag:artdag@localhost:5432/artdag") + +pool: Optional[asyncpg.Pool] = None + +SCHEMA_SQL = """ +-- Core cache: just content hash and IPFS CID +CREATE TABLE IF NOT EXISTS cache_items ( + content_hash VARCHAR(64) PRIMARY KEY, + ipfs_cid VARCHAR(128), + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +-- Item types: metadata lives here (same item can be recipe AND media) +CREATE TABLE IF NOT EXISTS item_types ( + id SERIAL PRIMARY KEY, + content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE, + type VARCHAR(50) NOT NULL, + path VARCHAR(255), + description TEXT, + source_type VARCHAR(20), + source_url TEXT, + source_note TEXT, + pinned BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(content_hash, type, path) +); + +-- Pin reasons: one-to-many from item_types +CREATE TABLE IF NOT EXISTS pin_reasons ( + id SERIAL PRIMARY KEY, + item_type_id INTEGER REFERENCES item_types(id) ON DELETE CASCADE, + reason VARCHAR(100) NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +-- L2 shares: includes content_type for role when shared +CREATE TABLE IF NOT EXISTS l2_shares ( + id SERIAL PRIMARY KEY, + content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE, + l2_server VARCHAR(255) NOT NULL, + asset_name VARCHAR(255) NOT NULL, + content_type VARCHAR(50) NOT NULL, + published_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + last_synced_at TIMESTAMP WITH TIME ZONE, + UNIQUE(content_hash, l2_server, content_type) +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash); +CREATE INDEX IF NOT EXISTS idx_item_types_type ON item_types(type); +CREATE INDEX IF NOT EXISTS idx_item_types_path ON item_types(path); +CREATE INDEX IF NOT EXISTS idx_pin_reasons_item_type ON pin_reasons(item_type_id); +CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash); +""" + + +async def init_db(): + """Initialize database connection pool and create schema.""" + global pool + pool = await asyncpg.create_pool(DATABASE_URL) + async with pool.acquire() as conn: + await conn.execute(SCHEMA_SQL) + + +async def close_db(): + """Close database connection pool.""" + global pool + if pool: + await pool.close() + pool = None + + +# ============ Cache Items ============ + +async def create_cache_item(content_hash: str, ipfs_cid: Optional[str] = None) -> dict: + """Create a cache item. Returns the created item.""" + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + INSERT INTO cache_items (content_hash, ipfs_cid) + VALUES ($1, $2) + ON CONFLICT (content_hash) DO UPDATE SET ipfs_cid = COALESCE($2, cache_items.ipfs_cid) + RETURNING content_hash, ipfs_cid, created_at + """, + content_hash, ipfs_cid + ) + return dict(row) + + +async def get_cache_item(content_hash: str) -> Optional[dict]: + """Get a cache item by content hash.""" + async with pool.acquire() as conn: + row = await conn.fetchrow( + "SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1", + content_hash + ) + return dict(row) if row else None + + +async def update_cache_item_ipfs_cid(content_hash: str, ipfs_cid: str) -> bool: + """Update the IPFS CID for a cache item.""" + async with pool.acquire() as conn: + result = await conn.execute( + "UPDATE cache_items SET ipfs_cid = $2 WHERE content_hash = $1", + content_hash, ipfs_cid + ) + return result == "UPDATE 1" + + +async def delete_cache_item(content_hash: str) -> bool: + """Delete a cache item and all associated data (cascades).""" + async with pool.acquire() as conn: + result = await conn.execute( + "DELETE FROM cache_items WHERE content_hash = $1", + content_hash + ) + return result == "DELETE 1" + + +async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]: + """List cache items with pagination.""" + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT content_hash, ipfs_cid, created_at + FROM cache_items + ORDER BY created_at DESC + LIMIT $1 OFFSET $2 + """, + limit, offset + ) + return [dict(row) for row in rows] + + +# ============ Item Types ============ + +async def add_item_type( + content_hash: str, + item_type: str, + path: Optional[str] = None, + description: Optional[str] = None, + source_type: Optional[str] = None, + source_url: Optional[str] = None, + source_note: Optional[str] = None, +) -> dict: + """Add a type to a cache item. Creates cache_item if needed.""" + async with pool.acquire() as conn: + # Ensure cache_item exists + await conn.execute( + "INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING", + content_hash + ) + # Insert or update item_type + row = await conn.fetchrow( + """ + INSERT INTO item_types (content_hash, type, path, description, source_type, source_url, source_note) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (content_hash, type, path) DO UPDATE SET + description = COALESCE($4, item_types.description), + source_type = COALESCE($5, item_types.source_type), + source_url = COALESCE($6, item_types.source_url), + source_note = COALESCE($7, item_types.source_note) + RETURNING id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at + """, + content_hash, item_type, path, description, source_type, source_url, source_note + ) + return dict(row) + + +async def get_item_types(content_hash: str) -> List[dict]: + """Get all types for a cache item.""" + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at + FROM item_types + WHERE content_hash = $1 + ORDER BY created_at + """, + content_hash + ) + return [dict(row) for row in rows] + + +async def get_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> Optional[dict]: + """Get a specific type for a cache item.""" + async with pool.acquire() as conn: + if path is None: + row = await conn.fetchrow( + """ + SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at + FROM item_types + WHERE content_hash = $1 AND type = $2 AND path IS NULL + """, + content_hash, item_type + ) + else: + row = await conn.fetchrow( + """ + SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at + FROM item_types + WHERE content_hash = $1 AND type = $2 AND path = $3 + """, + content_hash, item_type, path + ) + return dict(row) if row else None + + +async def update_item_type( + item_type_id: int, + description: Optional[str] = None, + source_type: Optional[str] = None, + source_url: Optional[str] = None, + source_note: Optional[str] = None, +) -> bool: + """Update an item type's metadata.""" + async with pool.acquire() as conn: + result = await conn.execute( + """ + UPDATE item_types SET + description = COALESCE($2, description), + source_type = COALESCE($3, source_type), + source_url = COALESCE($4, source_url), + source_note = COALESCE($5, source_note) + WHERE id = $1 + """, + item_type_id, description, source_type, source_url, source_note + ) + return result == "UPDATE 1" + + +async def delete_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> bool: + """Delete a specific type from a cache item.""" + async with pool.acquire() as conn: + if path is None: + result = await conn.execute( + "DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path IS NULL", + content_hash, item_type + ) + else: + result = await conn.execute( + "DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path = $3", + content_hash, item_type, path + ) + return result == "DELETE 1" + + +async def list_items_by_type(item_type: str, limit: int = 100, offset: int = 0) -> List[dict]: + """List all items of a specific type.""" + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT it.id, it.content_hash, it.type, it.path, it.description, + it.source_type, it.source_url, it.source_note, it.pinned, it.created_at, + ci.ipfs_cid + FROM item_types it + JOIN cache_items ci ON it.content_hash = ci.content_hash + WHERE it.type = $1 + ORDER BY it.created_at DESC + LIMIT $2 OFFSET $3 + """, + item_type, limit, offset + ) + return [dict(row) for row in rows] + + +async def get_item_by_path(item_type: str, path: str) -> Optional[dict]: + """Get an item by its type and path (e.g., recipe:/effects/dog).""" + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT it.id, it.content_hash, it.type, it.path, it.description, + it.source_type, it.source_url, it.source_note, it.pinned, it.created_at, + ci.ipfs_cid + FROM item_types it + JOIN cache_items ci ON it.content_hash = ci.content_hash + WHERE it.type = $1 AND it.path = $2 + """, + item_type, path + ) + return dict(row) if row else None + + +# ============ Pinning ============ + +async def pin_item_type(item_type_id: int, reason: str) -> bool: + """Pin an item type with a reason.""" + async with pool.acquire() as conn: + async with conn.transaction(): + # Set pinned flag + await conn.execute( + "UPDATE item_types SET pinned = TRUE WHERE id = $1", + item_type_id + ) + # Add pin reason + await conn.execute( + "INSERT INTO pin_reasons (item_type_id, reason) VALUES ($1, $2)", + item_type_id, reason + ) + return True + + +async def unpin_item_type(item_type_id: int, reason: Optional[str] = None) -> bool: + """Remove a pin reason from an item type. If no reasons left, unpins the item.""" + async with pool.acquire() as conn: + async with conn.transaction(): + if reason: + # Remove specific reason + await conn.execute( + "DELETE FROM pin_reasons WHERE item_type_id = $1 AND reason = $2", + item_type_id, reason + ) + else: + # Remove all reasons + await conn.execute( + "DELETE FROM pin_reasons WHERE item_type_id = $1", + item_type_id + ) + + # Check if any reasons remain + count = await conn.fetchval( + "SELECT COUNT(*) FROM pin_reasons WHERE item_type_id = $1", + item_type_id + ) + + if count == 0: + await conn.execute( + "UPDATE item_types SET pinned = FALSE WHERE id = $1", + item_type_id + ) + return True + + +async def get_pin_reasons(item_type_id: int) -> List[dict]: + """Get all pin reasons for an item type.""" + async with pool.acquire() as conn: + rows = await conn.fetch( + "SELECT id, reason, created_at FROM pin_reasons WHERE item_type_id = $1 ORDER BY created_at", + item_type_id + ) + return [dict(row) for row in rows] + + +async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) -> tuple[bool, List[str]]: + """Check if any type of a cache item is pinned. Returns (is_pinned, reasons).""" + async with pool.acquire() as conn: + if item_type: + rows = await conn.fetch( + """ + SELECT pr.reason + FROM pin_reasons pr + JOIN item_types it ON pr.item_type_id = it.id + WHERE it.content_hash = $1 AND it.type = $2 AND it.pinned = TRUE + """, + content_hash, item_type + ) + else: + rows = await conn.fetch( + """ + SELECT pr.reason + FROM pin_reasons pr + JOIN item_types it ON pr.item_type_id = it.id + WHERE it.content_hash = $1 AND it.pinned = TRUE + """, + content_hash + ) + reasons = [row["reason"] for row in rows] + return len(reasons) > 0, reasons + + +# ============ L2 Shares ============ + +async def add_l2_share( + content_hash: str, + l2_server: str, + asset_name: str, + content_type: str, +) -> dict: + """Add or update an L2 share.""" + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + INSERT INTO l2_shares (content_hash, l2_server, asset_name, content_type, last_synced_at) + VALUES ($1, $2, $3, $4, NOW()) + ON CONFLICT (content_hash, l2_server, content_type) DO UPDATE SET + asset_name = $3, + last_synced_at = NOW() + RETURNING id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at + """, + content_hash, l2_server, asset_name, content_type + ) + return dict(row) + + +async def get_l2_shares(content_hash: str) -> List[dict]: + """Get all L2 shares for a cache item.""" + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at + FROM l2_shares + WHERE content_hash = $1 + ORDER BY published_at + """, + content_hash + ) + return [dict(row) for row in rows] + + +async def delete_l2_share(content_hash: str, l2_server: str, content_type: str) -> bool: + """Delete an L2 share.""" + async with pool.acquire() as conn: + result = await conn.execute( + "DELETE FROM l2_shares WHERE content_hash = $1 AND l2_server = $2 AND content_type = $3", + content_hash, l2_server, content_type + ) + return result == "DELETE 1" diff --git a/docker-compose.yml b/docker-compose.yml index a2efc05..e6e2e35 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,18 +12,49 @@ services: restart_policy: condition: on-failure + postgres: + image: postgres:16-alpine + environment: + - POSTGRES_USER=artdag + - POSTGRES_PASSWORD=artdag + - POSTGRES_DB=artdag + volumes: + - postgres_data:/var/lib/postgresql/data + networks: + - celery + deploy: + replicas: 1 + restart_policy: + condition: on-failure + + ipfs: + image: ipfs/kubo:latest + volumes: + - ipfs_data:/data/ipfs + - l1_cache:/data/cache:ro # Read-only access to cache for adding files + networks: + - celery + deploy: + replicas: 1 + restart_policy: + condition: on-failure + l1-server: image: git.rose-ash.com/art-dag/l1-server:latest env_file: - .env environment: - REDIS_URL=redis://redis:6379/5 + - DATABASE_URL=postgresql://artdag:artdag@postgres:5432/artdag + - IPFS_API=/dns/ipfs/tcp/5001 - CACHE_DIR=/data/cache # L2_SERVER and L2_DOMAIN from .env file volumes: - l1_cache:/data/cache depends_on: - redis + - postgres + - ipfs networks: - celery - externalnet @@ -37,12 +68,16 @@ services: command: celery -A celery_app worker --loglevel=info environment: - REDIS_URL=redis://redis:6379/5 + - DATABASE_URL=postgresql://artdag:artdag@postgres:5432/artdag + - IPFS_API=/dns/ipfs/tcp/5001 - CACHE_DIR=/data/cache - C_FORCE_ROOT=true volumes: - l1_cache:/data/cache depends_on: - redis + - postgres + - ipfs networks: - celery deploy: @@ -52,6 +87,8 @@ services: volumes: redis_data: + postgres_data: + ipfs_data: l1_cache: networks: diff --git a/ipfs_client.py b/ipfs_client.py new file mode 100644 index 0000000..e7d1bc3 --- /dev/null +++ b/ipfs_client.py @@ -0,0 +1,192 @@ +# art-celery/ipfs_client.py +""" +IPFS client for Art DAG L1 server. + +Provides functions to add, retrieve, and pin files on IPFS. +Uses local cache as hot storage with IPFS as durable backing store. +""" + +import logging +import os +from pathlib import Path +from typing import Optional + +import ipfshttpclient + +logger = logging.getLogger(__name__) + +# IPFS API multiaddr - default to local, docker uses /dns/ipfs/tcp/5001 +IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001") + +# Connection timeout in seconds +IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "30")) + + +def get_client(): + """Get an IPFS client connection.""" + return ipfshttpclient.connect(IPFS_API, timeout=IPFS_TIMEOUT) + + +def add_file(file_path: Path, pin: bool = True) -> Optional[str]: + """ + Add a file to IPFS and optionally pin it. + + Args: + file_path: Path to the file to add + pin: Whether to pin the file (default: True) + + Returns: + IPFS CID (content identifier) or None on failure + """ + try: + with get_client() as client: + result = client.add(str(file_path), pin=pin) + cid = result["Hash"] + logger.info(f"Added to IPFS: {file_path.name} -> {cid}") + return cid + except Exception as e: + logger.error(f"Failed to add to IPFS: {e}") + return None + + +def add_bytes(data: bytes, pin: bool = True) -> Optional[str]: + """ + Add bytes data to IPFS and optionally pin it. + + Args: + data: Bytes to add + pin: Whether to pin the data (default: True) + + Returns: + IPFS CID or None on failure + """ + try: + with get_client() as client: + result = client.add_bytes(data) + cid = result + if pin: + client.pin.add(cid) + logger.info(f"Added bytes to IPFS: {len(data)} bytes -> {cid}") + return cid + except Exception as e: + logger.error(f"Failed to add bytes to IPFS: {e}") + return None + + +def get_file(cid: str, dest_path: Path) -> bool: + """ + Retrieve a file from IPFS and save to destination. + + Args: + cid: IPFS CID to retrieve + dest_path: Path to save the file + + Returns: + True on success, False on failure + """ + try: + with get_client() as client: + # Get file content + data = client.cat(cid) + # Write to destination + dest_path.parent.mkdir(parents=True, exist_ok=True) + dest_path.write_bytes(data) + logger.info(f"Retrieved from IPFS: {cid} -> {dest_path}") + return True + except Exception as e: + logger.error(f"Failed to get from IPFS: {e}") + return False + + +def get_bytes(cid: str) -> Optional[bytes]: + """ + Retrieve bytes data from IPFS. + + Args: + cid: IPFS CID to retrieve + + Returns: + File content as bytes or None on failure + """ + try: + with get_client() as client: + data = client.cat(cid) + logger.info(f"Retrieved from IPFS: {cid} ({len(data)} bytes)") + return data + except Exception as e: + logger.error(f"Failed to get bytes from IPFS: {e}") + return None + + +def pin(cid: str) -> bool: + """ + Pin a CID on IPFS. + + Args: + cid: IPFS CID to pin + + Returns: + True on success, False on failure + """ + try: + with get_client() as client: + client.pin.add(cid) + logger.info(f"Pinned on IPFS: {cid}") + return True + except Exception as e: + logger.error(f"Failed to pin on IPFS: {e}") + return False + + +def unpin(cid: str) -> bool: + """ + Unpin a CID on IPFS. + + Args: + cid: IPFS CID to unpin + + Returns: + True on success, False on failure + """ + try: + with get_client() as client: + client.pin.rm(cid) + logger.info(f"Unpinned on IPFS: {cid}") + return True + except Exception as e: + logger.error(f"Failed to unpin on IPFS: {e}") + return False + + +def is_pinned(cid: str) -> bool: + """ + Check if a CID is pinned on IPFS. + + Args: + cid: IPFS CID to check + + Returns: + True if pinned, False otherwise + """ + try: + with get_client() as client: + pins = client.pin.ls(type="recursive") + return cid in pins.get("Keys", {}) + except Exception as e: + logger.error(f"Failed to check pin status: {e}") + return False + + +def is_available() -> bool: + """ + Check if IPFS daemon is available. + + Returns: + True if IPFS is available, False otherwise + """ + try: + with get_client() as client: + client.id() + return True + except Exception: + return False diff --git a/requirements.txt b/requirements.txt index 278ea5a..067482e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,5 +5,7 @@ fastapi>=0.109.0 uvicorn>=0.27.0 python-multipart>=0.0.6 PyYAML>=6.0 +asyncpg>=0.29.0 +ipfshttpclient>=0.8.0 # Core artdag from GitHub git+https://github.com/gilesbradshaw/art-dag.git diff --git a/server.py b/server.py index 2fc3a84..c949a3e 100644 --- a/server.py +++ b/server.py @@ -28,7 +28,9 @@ import yaml from celery_app import app as celery_app from tasks import render_effect, execute_dag, build_effect_dag +from contextlib import asynccontextmanager from cache_manager import L1CacheManager, get_cache_manager +import database # L2 server for auth verification L2_SERVER = os.environ.get("L2_SERVER", "http://localhost:8200") @@ -51,7 +53,7 @@ redis_client = redis.Redis( db=int(parsed.path.lstrip('/') or 0) ) RUNS_KEY_PREFIX = "artdag:run:" -CONFIGS_KEY_PREFIX = "artdag:config:" +RECIPES_KEY_PREFIX = "artdag:recipe:" def save_run(run: "RunStatus"): @@ -91,10 +93,21 @@ def find_runs_using_content(content_hash: str) -> list[tuple["RunStatus", str]]: return results +@asynccontextmanager +async def lifespan(app: FastAPI): + """Initialize and cleanup resources.""" + # Startup: initialize database + await database.init_db() + yield + # Shutdown: close database + await database.close_db() + + app = FastAPI( title="Art DAG L1 Server", description="Distributed rendering server for Art DAG", - version="0.1.0" + version="0.1.0", + lifespan=lifespan ) @@ -125,7 +138,7 @@ class RunStatus(BaseModel): infrastructure: Optional[dict] = None # Hardware/software used for rendering -# ============ Config Models ============ +# ============ Recipe Models ============ class VariableInput(BaseModel): """A variable input that must be filled at run time.""" @@ -142,9 +155,9 @@ class FixedInput(BaseModel): content_hash: str -class ConfigStatus(BaseModel): - """Status/metadata of a config.""" - config_id: str # Content hash of the YAML file +class RecipeStatus(BaseModel): + """Status/metadata of a recipe.""" + recipe_id: str # Content hash of the YAML file name: str version: str description: Optional[str] = None @@ -156,41 +169,41 @@ class ConfigStatus(BaseModel): uploader: Optional[str] = None -class ConfigRunRequest(BaseModel): - """Request to run a config with variable inputs.""" +class RecipeRunRequest(BaseModel): + """Request to run a recipe with variable inputs.""" inputs: dict[str, str] # node_id -> content_hash -def save_config(config: ConfigStatus): - """Save config to Redis.""" - redis_client.set(f"{CONFIGS_KEY_PREFIX}{config.config_id}", config.model_dump_json()) +def save_recipe(recipe: RecipeStatus): + """Save recipe to Redis.""" + redis_client.set(f"{RECIPES_KEY_PREFIX}{recipe.recipe_id}", recipe.model_dump_json()) -def load_config(config_id: str) -> Optional[ConfigStatus]: - """Load config from Redis.""" - data = redis_client.get(f"{CONFIGS_KEY_PREFIX}{config_id}") +def load_recipe(recipe_id: str) -> Optional[RecipeStatus]: + """Load recipe from Redis.""" + data = redis_client.get(f"{RECIPES_KEY_PREFIX}{recipe_id}") if data: - return ConfigStatus.model_validate_json(data) + return RecipeStatus.model_validate_json(data) return None -def list_all_configs() -> list[ConfigStatus]: - """List all configs from Redis.""" - configs = [] - for key in redis_client.scan_iter(f"{CONFIGS_KEY_PREFIX}*"): +def list_all_recipes() -> list[RecipeStatus]: + """List all recipes from Redis.""" + recipes = [] + for key in redis_client.scan_iter(f"{RECIPES_KEY_PREFIX}*"): data = redis_client.get(key) if data: - configs.append(ConfigStatus.model_validate_json(data)) - return sorted(configs, key=lambda c: c.uploaded_at, reverse=True) + recipes.append(RecipeStatus.model_validate_json(data)) + return sorted(recipes, key=lambda c: c.uploaded_at, reverse=True) -def delete_config_from_redis(config_id: str) -> bool: - """Delete config from Redis.""" - return redis_client.delete(f"{CONFIGS_KEY_PREFIX}{config_id}") > 0 +def delete_recipe_from_redis(recipe_id: str) -> bool: + """Delete recipe from Redis.""" + return redis_client.delete(f"{RECIPES_KEY_PREFIX}{recipe_id}") > 0 -def parse_config_yaml(yaml_content: str, config_hash: str, uploader: str) -> ConfigStatus: - """Parse a config YAML file and extract metadata.""" +def parse_recipe_yaml(yaml_content: str, recipe_hash: str, uploader: str) -> RecipeStatus: + """Parse a recipe YAML file and extract metadata.""" config = yaml.safe_load(yaml_content) # Extract basic info @@ -235,8 +248,8 @@ def parse_config_yaml(yaml_content: str, config_hash: str, uploader: str) -> Con content_hash=asset_info.get("hash", "") )) - return ConfigStatus( - config_id=config_hash, + return RecipeStatus( + recipe_id=recipe_hash, name=name, version=version, description=description, @@ -305,7 +318,7 @@ def cache_file(source: Path, node_type: str = "output") -> str: Uses artdag's Cache internally for proper tracking. """ - cached = cache_manager.put(source, node_type=node_type) + cached, ipfs_cid = cache_manager.put(source, node_type=node_type) return cached.content_hash @@ -520,7 +533,7 @@ async def discard_run(run_id: str, username: str = Depends(get_required_user)): Enforces deletion rules: - Cannot discard if output is published to L2 (pinned) - Deletes outputs and intermediate cache entries - - Preserves inputs (cache items and configs are NOT deleted) + - Preserves inputs (cache items and recipes are NOT deleted) """ run = load_run(run_id) if not run: @@ -987,11 +1000,11 @@ async def list_runs(request: Request, page: int = 1, limit: int = 20): } -# ============ Config Endpoints ============ +# ============ Recipe Endpoints ============ -@app.post("/configs/upload") -async def upload_config(file: UploadFile = File(...), username: str = Depends(get_required_user)): - """Upload a config YAML file. Requires authentication.""" +@app.post("/recipes/upload") +async def upload_recipe(file: UploadFile = File(...), username: str = Depends(get_required_user)): + """Upload a recipe YAML file. Requires authentication.""" import tempfile # Read file content @@ -999,7 +1012,7 @@ async def upload_config(file: UploadFile = File(...), username: str = Depends(ge try: yaml_content = content.decode('utf-8') except UnicodeDecodeError: - raise HTTPException(400, "Config file must be valid UTF-8 text") + raise HTTPException(400, "Recipe file must be valid UTF-8 text") # Validate YAML try: @@ -1012,51 +1025,51 @@ async def upload_config(file: UploadFile = File(...), username: str = Depends(ge tmp.write(content) tmp_path = Path(tmp.name) - cached = cache_manager.put(tmp_path, node_type="config", move=True) - config_hash = cached.content_hash + cached, ipfs_cid = cache_manager.put(tmp_path, node_type="recipe", move=True) + recipe_hash = cached.content_hash # Parse and save metadata actor_id = f"@{username}@{L2_DOMAIN}" try: - config_status = parse_config_yaml(yaml_content, config_hash, actor_id) + recipe_status = parse_recipe_yaml(yaml_content, recipe_hash, actor_id) except Exception as e: - raise HTTPException(400, f"Failed to parse config: {e}") + raise HTTPException(400, f"Failed to parse recipe: {e}") - save_config(config_status) + save_recipe(recipe_status) # Save cache metadata - save_cache_meta(config_hash, actor_id, file.filename, type="config", config_name=config_status.name) + save_cache_meta(recipe_hash, actor_id, file.filename, type="recipe", recipe_name=recipe_status.name) return { - "config_id": config_hash, - "name": config_status.name, - "version": config_status.version, - "variable_inputs": len(config_status.variable_inputs), - "fixed_inputs": len(config_status.fixed_inputs) + "recipe_id": recipe_hash, + "name": recipe_status.name, + "version": recipe_status.version, + "variable_inputs": len(recipe_status.variable_inputs), + "fixed_inputs": len(recipe_status.fixed_inputs) } -@app.get("/configs") -async def list_configs_api(request: Request, page: int = 1, limit: int = 20): - """List configs. HTML for browsers, JSON for APIs.""" +@app.get("/recipes") +async def list_recipes_api(request: Request, page: int = 1, limit: int = 20): + """List recipes. HTML for browsers, JSON for APIs.""" current_user = get_user_from_cookie(request) - all_configs = list_all_configs() - total = len(all_configs) + all_recipes = list_all_recipes() + total = len(all_recipes) # Pagination start = (page - 1) * limit end = start + limit - configs_page = all_configs[start:end] + recipes_page = all_recipes[start:end] has_more = end < total if wants_html(request): - # HTML response - redirect to /configs page with proper UI - return RedirectResponse(f"/configs?page={page}") + # HTML response - redirect to /recipes page with proper UI + return RedirectResponse(f"/recipes?page={page}") # JSON response for APIs return { - "configs": [c.model_dump() for c in configs_page], + "recipes": [c.model_dump() for c in recipes_page], "pagination": { "page": page, "limit": limit, @@ -1066,61 +1079,61 @@ async def list_configs_api(request: Request, page: int = 1, limit: int = 20): } -@app.get("/configs/{config_id}") -async def get_config_api(config_id: str): - """Get config details.""" - config = load_config(config_id) - if not config: - raise HTTPException(404, f"Config {config_id} not found") - return config +@app.get("/recipes/{recipe_id}") +async def get_recipe_api(recipe_id: str): + """Get recipe details.""" + recipe = load_recipe(recipe_id) + if not recipe: + raise HTTPException(404, f"Recipe {recipe_id} not found") + return recipe -@app.delete("/configs/{config_id}") -async def remove_config(config_id: str, username: str = Depends(get_required_user)): - """Delete a config. Requires authentication.""" - config = load_config(config_id) - if not config: - raise HTTPException(404, f"Config {config_id} not found") +@app.delete("/recipes/{recipe_id}") +async def remove_recipe(recipe_id: str, username: str = Depends(get_required_user)): + """Delete a recipe. Requires authentication.""" + recipe = load_recipe(recipe_id) + if not recipe: + raise HTTPException(404, f"Recipe {recipe_id} not found") # Check ownership actor_id = f"@{username}@{L2_DOMAIN}" - if config.uploader not in (username, actor_id): + if recipe.uploader not in (username, actor_id): raise HTTPException(403, "Access denied") # Check if pinned - pinned, reason = cache_manager.is_pinned(config_id) + pinned, reason = cache_manager.is_pinned(recipe_id) if pinned: - raise HTTPException(400, f"Cannot delete pinned config: {reason}") + raise HTTPException(400, f"Cannot delete pinned recipe: {reason}") # Delete from Redis and cache - delete_config_from_redis(config_id) - cache_manager.delete_by_content_hash(config_id) + delete_recipe_from_redis(recipe_id) + cache_manager.delete_by_content_hash(recipe_id) - return {"deleted": True, "config_id": config_id} + return {"deleted": True, "recipe_id": recipe_id} -@app.post("/configs/{config_id}/run") -async def run_config(config_id: str, request: ConfigRunRequest, username: str = Depends(get_required_user)): - """Run a config with provided variable inputs. Requires authentication.""" - config = load_config(config_id) - if not config: - raise HTTPException(404, f"Config {config_id} not found") +@app.post("/recipes/{recipe_id}/run") +async def run_recipe(recipe_id: str, request: RecipeRunRequest, username: str = Depends(get_required_user)): + """Run a recipe with provided variable inputs. Requires authentication.""" + recipe = load_recipe(recipe_id) + if not recipe: + raise HTTPException(404, f"Recipe {recipe_id} not found") # Validate all required inputs are provided - for var_input in config.variable_inputs: + for var_input in recipe.variable_inputs: if var_input.required and var_input.node_id not in request.inputs: raise HTTPException(400, f"Missing required input: {var_input.name}") - # Load config YAML - config_path = cache_manager.get_by_content_hash(config_id) - if not config_path: - raise HTTPException(500, "Config YAML not found in cache") + # Load recipe YAML + recipe_path = cache_manager.get_by_content_hash(recipe_id) + if not recipe_path: + raise HTTPException(500, "Recipe YAML not found in cache") - with open(config_path) as f: + with open(recipe_path) as f: yaml_config = yaml.safe_load(f) - # Build DAG from config - dag = build_dag_from_config(yaml_config, request.inputs, config) + # Build DAG from recipe + dag = build_dag_from_recipe(yaml_config, request.inputs, recipe) # Create run run_id = str(uuid.uuid4()) @@ -1128,16 +1141,16 @@ async def run_config(config_id: str, request: ConfigRunRequest, username: str = # Collect all input hashes all_inputs = list(request.inputs.values()) - for fixed in config.fixed_inputs: + for fixed in recipe.fixed_inputs: if fixed.content_hash: all_inputs.append(fixed.content_hash) run = RunStatus( run_id=run_id, status="pending", - recipe=f"config:{config.name}", + recipe=f"recipe:{recipe.name}", inputs=all_inputs, - output_name=f"{config.name}-{run_id[:8]}", + output_name=f"{recipe.name}-{run_id[:8]}", created_at=datetime.now(timezone.utc).isoformat(), username=actor_id ) @@ -1152,8 +1165,8 @@ async def run_config(config_id: str, request: ConfigRunRequest, username: str = return run -def build_dag_from_config(yaml_config: dict, user_inputs: dict[str, str], config: ConfigStatus): - """Build a DAG from config YAML with user-provided inputs.""" +def build_dag_from_recipe(yaml_config: dict, user_inputs: dict[str, str], recipe: RecipeStatus): + """Build a DAG from recipe YAML with user-provided inputs.""" from artdag import DAG, Node dag = DAG() @@ -1207,39 +1220,39 @@ def build_dag_from_config(yaml_config: dict, user_inputs: dict[str, str], config return dag -# ============ Config UI Pages ============ +# ============ Recipe UI Pages ============ -@app.get("/configs", response_class=HTMLResponse) -async def configs_page(request: Request, page: int = 1): - """Configs list page (HTML).""" +@app.get("/recipes", response_class=HTMLResponse) +async def recipes_page(request: Request, page: int = 1): + """Recipes list page (HTML).""" current_user = get_user_from_cookie(request) if not current_user: return HTMLResponse(render_page( - "Configs", - '

Login to see configs.

', + "Recipes", + '

Login to see recipes.

', None, - active_tab="configs" + active_tab="recipes" )) - all_configs = list_all_configs() + all_recipes = list_all_recipes() # Filter to user's configs actor_id = f"@{current_user}@{L2_DOMAIN}" - user_configs = [c for c in all_configs if c.uploader in (current_user, actor_id)] - total = len(user_configs) + user_recipes = [c for c in all_recipes if c.uploader in (current_user, actor_id)] + total = len(user_recipes) - if not user_configs: + if not user_recipes: content = ''' -

Configs (0)

-

No configs yet. Upload a config YAML file to get started.

+

Recipes (0)

+

No recipes yet. Upload a recipe YAML file to get started.

''' - return HTMLResponse(render_page("Configs", content, current_user, active_tab="configs")) + return HTMLResponse(render_page("Recipes", content, current_user, active_tab="recipes")) html_parts = [] - for config in user_configs: - var_count = len(config.variable_inputs) - fixed_count = len(config.fixed_inputs) + for recipe in user_recipes: + var_count = len(recipe.variable_inputs) + fixed_count = len(recipe.fixed_inputs) input_info = [] if var_count: input_info.append(f"{var_count} variable") @@ -1248,54 +1261,54 @@ async def configs_page(request: Request, page: int = 1): inputs_str = ", ".join(input_info) if input_info else "no inputs" html_parts.append(f''' - +
- {config.name} - v{config.version} + {recipe.name} + v{recipe.version}
{inputs_str}
- {config.description or "No description"} + {recipe.description or "No description"}
- {config.config_id[:24]}... + {recipe.recipe_id[:24]}...
''') content = f''' -

Configs ({total})

+

Recipes ({total})

{''.join(html_parts)}
''' - return HTMLResponse(render_page("Configs", content, current_user, active_tab="configs")) + return HTMLResponse(render_page("Recipes", content, current_user, active_tab="recipes")) -@app.get("/config/{config_id}", response_class=HTMLResponse) -async def config_detail_page(config_id: str, request: Request): - """Config detail page with run form.""" +@app.get("/recipe/{recipe_id}", response_class=HTMLResponse) +async def recipe_detail_page(recipe_id: str, request: Request): + """Recipe detail page with run form.""" current_user = get_user_from_cookie(request) - config = load_config(config_id) + recipe = load_recipe(recipe_id) - if not config: + if not recipe: return HTMLResponse(render_page( - "Config Not Found", - f'

Config {config_id} not found.

', + "Recipe Not Found", + f'

Recipe {recipe_id} not found.

', current_user, - active_tab="configs" + active_tab="recipes" ), status_code=404) # Build variable inputs form var_inputs_html = "" - if config.variable_inputs: + if recipe.variable_inputs: var_inputs_html = '
' - for var_input in config.variable_inputs: + for var_input in recipe.variable_inputs: required = "required" if var_input.required else "" var_inputs_html += f'''
@@ -1310,86 +1323,86 @@ async def config_detail_page(config_id: str, request: Request): ''' var_inputs_html += '
' else: - var_inputs_html = '

This config has no variable inputs - it uses fixed assets only.

' + var_inputs_html = '

This recipe has no variable inputs - it uses fixed assets only.

' # Build fixed inputs display fixed_inputs_html = "" - if config.fixed_inputs: + if recipe.fixed_inputs: fixed_inputs_html = '

Fixed Inputs

' # Check if pinned - pinned, pin_reason = cache_manager.is_pinned(config_id) + pinned, pin_reason = cache_manager.is_pinned(recipe_id) pinned_badge = "" if pinned: pinned_badge = f'Pinned: {pin_reason}' content = f'''
- ← Back to configs + ← Back to recipes
-

{config.name}

- v{config.version} +

{recipe.name}

+ v{recipe.version} {pinned_badge}
-

{config.description or 'No description'}

-
{config.config_id}
+

{recipe.description or 'No description'}

+
{recipe.recipe_id}
{fixed_inputs_html}
-

Run this Config

-
+

Run this Recipe

+ {var_inputs_html}
''' - return HTMLResponse(render_page(f"Config: {config.name}", content, current_user, active_tab="configs")) + return HTMLResponse(render_page(f"Recipe: {recipe.name}", content, current_user, active_tab="recipes")) -@app.post("/ui/configs/{config_id}/run", response_class=HTMLResponse) -async def ui_run_config(config_id: str, request: Request): - """HTMX handler: run a config with form inputs.""" +@app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse) +async def ui_run_recipe(recipe_id: str, request: Request): + """HTMX handler: run a recipe with form inputs.""" current_user = get_user_from_cookie(request) if not current_user: return '
Login required
' - config = load_config(config_id) - if not config: - return '
Config not found
' + recipe = load_recipe(recipe_id) + if not recipe: + return '
Recipe not found
' # Parse form data form_data = await request.form() inputs = {} - for var_input in config.variable_inputs: + for var_input in recipe.variable_inputs: value = form_data.get(var_input.node_id, "").strip() if var_input.required and not value: return f'
Missing required input: {var_input.name}
' if value: inputs[var_input.node_id] = value - # Load config YAML - config_path = cache_manager.get_by_content_hash(config_id) - if not config_path: - return '
Config YAML not found in cache
' + # Load recipe YAML + recipe_path = cache_manager.get_by_content_hash(recipe_id) + if not recipe_path: + return '
Recipe YAML not found in cache
' try: - with open(config_path) as f: + with open(recipe_path) as f: yaml_config = yaml.safe_load(f) - # Build DAG from config - dag = build_dag_from_config(yaml_config, inputs, config) + # Build DAG from recipe + dag = build_dag_from_recipe(yaml_config, inputs, recipe) # Create run run_id = str(uuid.uuid4()) @@ -1397,16 +1410,16 @@ async def ui_run_config(config_id: str, request: Request): # Collect all input hashes all_inputs = list(inputs.values()) - for fixed in config.fixed_inputs: + for fixed in recipe.fixed_inputs: if fixed.content_hash: all_inputs.append(fixed.content_hash) run = RunStatus( run_id=run_id, status="pending", - recipe=f"config:{config.name}", + recipe=f"recipe:{recipe.name}", inputs=all_inputs, - output_name=f"{config.name}-{run_id[:8]}", + output_name=f"{recipe.name}-{run_id[:8]}", created_at=datetime.now(timezone.utc).isoformat(), username=actor_id ) @@ -1428,27 +1441,27 @@ async def ui_run_config(config_id: str, request: Request): return f'
Error: {str(e)}
' -@app.get("/ui/configs-list", response_class=HTMLResponse) -async def ui_configs_list(request: Request): - """HTMX partial: list of configs.""" +@app.get("/ui/recipes-list", response_class=HTMLResponse) +async def ui_recipes_list(request: Request): + """HTMX partial: list of recipes.""" current_user = get_user_from_cookie(request) if not current_user: - return '

Login to see configs.

' + return '

Login to see recipes.

' - all_configs = list_all_configs() + all_recipes = list_all_recipes() # Filter to user's configs actor_id = f"@{current_user}@{L2_DOMAIN}" - user_configs = [c for c in all_configs if c.uploader in (current_user, actor_id)] + user_recipes = [c for c in all_recipes if c.uploader in (current_user, actor_id)] - if not user_configs: - return '

No configs yet. Upload a config YAML file to get started.

' + if not user_recipes: + return '

No recipes yet. Upload a recipe YAML file to get started.

' html_parts = ['
'] - for config in user_configs: - var_count = len(config.variable_inputs) - fixed_count = len(config.fixed_inputs) + for recipe in user_recipes: + var_count = len(recipe.variable_inputs) + fixed_count = len(recipe.fixed_inputs) input_info = [] if var_count: input_info.append(f"{var_count} variable") @@ -1457,20 +1470,20 @@ async def ui_configs_list(request: Request): inputs_str = ", ".join(input_info) if input_info else "no inputs" html_parts.append(f''' - +
- {config.name} - v{config.version} + {recipe.name} + v{recipe.version}
{inputs_str}
- {config.description or "No description"} + {recipe.description or "No description"}
- {config.config_id[:24]}... + {recipe.recipe_id[:24]}...
@@ -1480,34 +1493,34 @@ async def ui_configs_list(request: Request): return '\n'.join(html_parts) -@app.delete("/ui/configs/{config_id}/discard", response_class=HTMLResponse) -async def ui_discard_config(config_id: str, request: Request): - """HTMX handler: discard a config.""" +@app.delete("/ui/recipes/{recipe_id}/discard", response_class=HTMLResponse) +async def ui_discard_recipe(recipe_id: str, request: Request): + """HTMX handler: discard a recipe.""" current_user = get_user_from_cookie(request) if not current_user: return '
Login required
' - config = load_config(config_id) - if not config: - return '
Config not found
' + recipe = load_recipe(recipe_id) + if not recipe: + return '
Recipe not found
' # Check ownership actor_id = f"@{current_user}@{L2_DOMAIN}" - if config.uploader not in (current_user, actor_id): + if recipe.uploader not in (current_user, actor_id): return '
Access denied
' # Check if pinned - pinned, reason = cache_manager.is_pinned(config_id) + pinned, reason = cache_manager.is_pinned(recipe_id) if pinned: - return f'
Cannot delete: config is pinned ({reason})
' + return f'
Cannot delete: recipe is pinned ({reason})
' # Delete from Redis and cache - delete_config_from_redis(config_id) - cache_manager.delete_by_content_hash(config_id) + delete_recipe_from_redis(recipe_id) + cache_manager.delete_by_content_hash(recipe_id) return '''
- Config deleted. Back to configs + Recipe deleted. Back to recipes
''' @@ -2493,7 +2506,7 @@ async def upload_to_cache(file: UploadFile = File(...), username: str = Depends( tmp_path = Path(tmp.name) # Store in cache via cache_manager - cached = cache_manager.put(tmp_path, node_type="upload", move=True) + cached, ipfs_cid = cache_manager.put(tmp_path, node_type="upload", move=True) content_hash = cached.content_hash # Save uploader metadata @@ -2949,7 +2962,7 @@ def render_page(title: str, content: str, username: Optional[str] = None, active ''' runs_active = "border-b-2 border-blue-500 text-white" if active_tab == "runs" else "text-gray-400 hover:text-white" - configs_active = "border-b-2 border-blue-500 text-white" if active_tab == "configs" else "text-gray-400 hover:text-white" + recipes_active = "border-b-2 border-blue-500 text-white" if active_tab == "recipes" else "text-gray-400 hover:text-white" cache_active = "border-b-2 border-blue-500 text-white" if active_tab == "cache" else "text-gray-400 hover:text-white" return f""" @@ -2972,7 +2985,7 @@ def render_page(title: str, content: str, username: Optional[str] = None, active @@ -3003,13 +3016,13 @@ def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str: ''' runs_active = "border-b-2 border-blue-500 text-white" if tab == "runs" else "text-gray-400 hover:text-white" - configs_active = "border-b-2 border-blue-500 text-white" if tab == "configs" else "text-gray-400 hover:text-white" + recipes_active = "border-b-2 border-blue-500 text-white" if tab == "recipes" else "text-gray-400 hover:text-white" cache_active = "border-b-2 border-blue-500 text-white" if tab == "cache" else "text-gray-400 hover:text-white" if tab == "runs": content_url = "/ui/runs" - elif tab == "configs": - content_url = "/ui/configs-list" + elif tab == "recipes": + content_url = "/ui/recipes-list" else: content_url = "/ui/cache-list" @@ -3033,7 +3046,7 @@ def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str: @@ -3262,17 +3275,17 @@ async def ui_publish_run(run_id: str, request: Request, output_name: str = Form( for input_hash in run.inputs: save_cache_meta(input_hash, pinned=True, pin_reason="input_to_published") - # If this was a config-based run, pin the config and its fixed inputs - if run.recipe.startswith("config:"): - config_name = run.recipe.replace("config:", "") - for config in list_all_configs(): - if config.name == config_name: - # Pin the config YAML - cache_manager.pin(config.config_id, reason="config_for_published") - # Pin all fixed inputs referenced by the config - for fixed in config.fixed_inputs: + # If this was a recipe-based run, pin the recipe and its fixed inputs + if run.recipe.startswith("recipe:"): + config_name = run.recipe.replace("recipe:", "") + for recipe in list_all_recipes(): + if recipe.name == config_name: + # Pin the recipe YAML + cache_manager.pin(recipe.recipe_id, reason="recipe_for_published") + # Pin all fixed inputs referenced by the recipe + for fixed in recipe.fixed_inputs: if fixed.content_hash: - cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_config") + cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_recipe") break return HTMLResponse(f'''