diff --git a/cache_manager.py b/cache_manager.py index 04b3942..a0f0d3c 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -277,31 +277,25 @@ class L1CacheManager: def get_by_content_hash(self, content_hash: str) -> Optional[Path]: """Get cached file path by content_hash.""" - logger.info(f"get_by_content_hash({content_hash[:16]}...) - cache_dir={self.cache_dir}, nodes_dir={self.cache.cache_dir}") # Check index first (new cache structure) node_id = self._content_index.get(content_hash) if node_id: - logger.info(f" Found in content_index: node_id={node_id[:16]}...") path = self.cache.get(node_id) if path and path.exists(): logger.info(f" Found via index: {path}") return path - logger.info(f" Index entry but path not found: {path}") # For uploads, node_id == content_hash, so try direct lookup # This works even if cache index hasn't been reloaded - logger.info(f" Trying direct lookup with content_hash as node_id...") path = self.cache.get(content_hash) logger.info(f" cache.get({content_hash[:16]}...) 
returned: {path}") if path and path.exists(): - logger.info(f" Found via direct lookup: {path}") self._content_index[content_hash] = content_hash self._save_content_index() return path # Scan cache entries (fallback for new structure) - logger.info(f" Trying find_by_content_hash...") entry = self.cache.find_by_content_hash(content_hash) if entry and entry.output_path.exists(): logger.info(f" Found via scan: {entry.output_path}") @@ -311,12 +305,9 @@ class L1CacheManager: # Check legacy location (files stored directly as CACHE_DIR/{content_hash}) legacy_path = self.cache_dir / content_hash - logger.info(f" Checking legacy path: {legacy_path}, exists={legacy_path.exists()}") if legacy_path.exists() and legacy_path.is_file(): - logger.info(f" Found at legacy location: {legacy_path}") return legacy_path - logger.info(f" NOT FOUND anywhere") return None def has_content(self, content_hash: str) -> bool: diff --git a/requirements.txt b/requirements.txt index 880bacf..278ea5a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,5 +4,6 @@ requests>=2.31.0 fastapi>=0.109.0 uvicorn>=0.27.0 python-multipart>=0.0.6 +PyYAML>=6.0 # Core artdag from GitHub git+https://github.com/gilesbradshaw/art-dag.git diff --git a/server.py b/server.py index 78b54d3..ef9909c 100644 --- a/server.py +++ b/server.py @@ -24,6 +24,7 @@ from pydantic import BaseModel import redis import requests as http_requests from urllib.parse import urlparse +import yaml from celery_app import app as celery_app from tasks import render_effect, execute_dag, build_effect_dag @@ -50,6 +51,7 @@ redis_client = redis.Redis( db=int(parsed.path.lstrip('/') or 0) ) RUNS_KEY_PREFIX = "artdag:run:" +CONFIGS_KEY_PREFIX = "artdag:config:" def save_run(run: "RunStatus"): @@ -123,6 +125,130 @@ class RunStatus(BaseModel): infrastructure: Optional[dict] = None # Hardware/software used for rendering +# ============ Config Models ============ + +class VariableInput(BaseModel): + """A variable input that must be filled at run 
class VariableInput(BaseModel):
    """A variable input that must be filled at run time."""
    node_id: str
    name: str
    description: Optional[str] = None
    required: bool = True


class FixedInput(BaseModel):
    """A fixed input resolved from the registry."""
    node_id: str
    asset: str
    content_hash: str  # "" when the registry has no hash for the asset


class ConfigStatus(BaseModel):
    """Status/metadata of a config."""
    config_id: str  # Content hash of the YAML file
    name: str
    version: str
    description: Optional[str] = None
    variable_inputs: list[VariableInput]
    fixed_inputs: list[FixedInput]
    output_node: str
    owner: Optional[str] = None
    uploaded_at: str
    uploader: Optional[str] = None


class ConfigRunRequest(BaseModel):
    """Request to run a config with variable inputs."""
    inputs: dict[str, str]  # node_id -> content_hash


def save_config(config: ConfigStatus):
    """Save config to Redis under CONFIGS_KEY_PREFIX + config_id, as JSON."""
    redis_client.set(f"{CONFIGS_KEY_PREFIX}{config.config_id}", config.model_dump_json())


def load_config(config_id: str) -> Optional[ConfigStatus]:
    """Load config from Redis; return None when no entry exists for config_id."""
    data = redis_client.get(f"{CONFIGS_KEY_PREFIX}{config_id}")
    if data:
        return ConfigStatus.model_validate_json(data)
    return None


def list_all_configs() -> list[ConfigStatus]:
    """List all configs stored in Redis, most recently uploaded first."""
    configs = []
    for key in redis_client.scan_iter(f"{CONFIGS_KEY_PREFIX}*"):
        data = redis_client.get(key)
        # A key can disappear between the scan and the get; skip it.
        if data:
            configs.append(ConfigStatus.model_validate_json(data))
    return sorted(configs, key=lambda c: c.uploaded_at, reverse=True)


def delete_config_from_redis(config_id: str) -> bool:
    """Delete config from Redis. Returns True if an entry was removed."""
    return redis_client.delete(f"{CONFIGS_KEY_PREFIX}{config_id}") > 0


def parse_config_yaml(yaml_content: str, config_hash: str, uploader: str) -> ConfigStatus:
    """Parse a config YAML document and extract its metadata.

    SOURCE nodes whose config has a truthy ``input`` become variable inputs;
    SOURCE nodes whose config names an ``asset`` become fixed inputs, with the
    content hash looked up in ``registry.assets``. Other nodes are ignored.

    Args:
        yaml_content: The YAML text of the config.
        config_hash: Value stored as ``config_id`` on the result.
        uploader: Value stored as ``uploader`` on the result.

    Returns:
        A ConfigStatus with ``uploaded_at`` set to the current UTC time.

    Raises:
        yaml.YAMLError: If ``yaml_content`` is not valid YAML.
        ValueError: If the top level of the document is not a mapping, or a
            SOURCE node has no ``id``.
    """
    config = yaml.safe_load(yaml_content)
    if not isinstance(config, dict):
        # Empty documents, lists and scalars all load fine but have no .get().
        raise ValueError("Config YAML must be a mapping at the top level")

    # Extract basic info. YAML types unquoted scalars, so `version: 1.0`
    # arrives as a float; the model field is `str`, so coerce explicitly.
    name = config.get("name")
    version = config.get("version")
    description = config.get("description")
    owner = config.get("owner")

    # `or {}` / `or []`: a key written with no value loads as None, not as
    # the .get() default.
    registry = config.get("registry") or {}
    assets = registry.get("assets") or {}

    # Parse DAG nodes
    dag = config.get("dag") or {}
    nodes = dag.get("nodes") or []
    output_node = dag.get("output")

    variable_inputs = []
    fixed_inputs = []

    for node in nodes:
        if node.get("type") != "SOURCE":
            continue

        node_id = node.get("id")
        if node_id is None:
            raise ValueError("SOURCE node is missing an id")
        node_config = node.get("config") or {}

        if node_config.get("input"):
            # Variable input
            variable_inputs.append(VariableInput(
                node_id=node_id,
                name=node_config.get("name", node_id),
                description=node_config.get("description"),
                required=node_config.get("required", True)
            ))
        elif "asset" in node_config:
            # Fixed input - resolve from registry
            asset_name = node_config["asset"]
            asset_info = assets.get(asset_name) or {}
            fixed_inputs.append(FixedInput(
                node_id=node_id,
                asset=asset_name,
                content_hash=asset_info.get("hash") or ""
            ))

    return ConfigStatus(
        config_id=config_hash,
        name="unnamed" if name is None else str(name),
        version="1.0" if version is None else str(version),
        description=description,
        variable_inputs=variable_inputs,
        fixed_inputs=fixed_inputs,
        output_node=output_node or "",
        owner=owner,
        uploaded_at=datetime.now(timezone.utc).isoformat(),
        uploader=uploader
    )
# ============ Config Endpoints ============

@app.post("/configs/upload")
async def upload_config(file: UploadFile = File(...), username: str = Depends(get_required_user)):
    """Upload a config YAML file. Requires authentication.

    The upload is decoded, validated and parsed before it is written to the
    cache, so a rejected file leaves nothing behind.

    Raises:
        HTTPException(400): The file is not UTF-8, not valid YAML, or cannot
            be parsed as a config.
    """
    import tempfile

    # Read file content
    content = await file.read()
    try:
        yaml_content = content.decode('utf-8')
    except UnicodeDecodeError:
        raise HTTPException(400, "Config file must be valid UTF-8 text")

    # Validate YAML
    try:
        yaml.safe_load(yaml_content)
    except yaml.YAMLError as e:
        raise HTTPException(400, f"Invalid YAML: {e}")

    # Parse metadata first. The content hash is only known once the file is
    # in the cache, so it is filled in below.
    actor_id = f"@{username}@{L2_DOMAIN}"
    try:
        config_status = parse_config_yaml(yaml_content, "", actor_id)
    except Exception as e:
        raise HTTPException(400, f"Failed to parse config: {e}")

    # Store YAML file in cache
    with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp:
        tmp.write(content)
        tmp_path = Path(tmp.name)

    try:
        cached = cache_manager.put(tmp_path, node_type="config", move=True)
    except Exception:
        # The temp file was created with delete=False; don't leak it when
        # the cache rejects it.
        tmp_path.unlink(missing_ok=True)
        raise
    config_hash = cached.content_hash

    config_status.config_id = config_hash
    save_config(config_status)

    # Save cache metadata
    save_cache_meta(config_hash, actor_id, file.filename, type="config", config_name=config_status.name)

    return {
        "config_id": config_hash,
        "name": config_status.name,
        "version": config_status.version,
        "variable_inputs": len(config_status.variable_inputs),
        "fixed_inputs": len(config_status.fixed_inputs)
    }


@app.get("/configs")
async def list_configs_api(request: Request, page: int = 1, limit: int = 20):
    """List configs. HTML for browsers, JSON for APIs."""
    if wants_html(request):
        # This handler and configs_page are both registered on GET /configs
        # and this one is matched first, so redirecting to /configs would
        # land here again and loop. Render the HTML page directly instead.
        return await configs_page(request, page)

    # Values below 1 would turn into negative slice indices.
    page = max(page, 1)
    limit = max(limit, 1)

    all_configs = list_all_configs()
    total = len(all_configs)

    # Pagination
    start = (page - 1) * limit
    end = start + limit

    # JSON response for APIs
    return {
        "configs": [c.model_dump() for c in all_configs[start:end]],
        "pagination": {
            "page": page,
            "limit": limit,
            "total": total,
            "has_more": end < total
        }
    }


@app.get("/configs/{config_id}")
async def get_config_api(config_id: str):
    """Get config details. Responds 404 when the config is unknown."""
    config = load_config(config_id)
    if not config:
        raise HTTPException(404, f"Config {config_id} not found")
    return config


@app.delete("/configs/{config_id}")
async def remove_config(config_id: str, username: str = Depends(get_required_user)):
    """Delete a config. Requires authentication.

    Raises:
        HTTPException(404): Unknown config.
        HTTPException(403): The caller is not the uploader.
        HTTPException(400): The config is pinned.
    """
    config = load_config(config_id)
    if not config:
        raise HTTPException(404, f"Config {config_id} not found")

    # Check ownership (uploader may be stored as bare username or actor id)
    actor_id = f"@{username}@{L2_DOMAIN}"
    if config.uploader not in (username, actor_id):
        raise HTTPException(403, "Access denied")

    # Check if pinned
    pinned, reason = cache_manager.is_pinned(config_id)
    if pinned:
        raise HTTPException(400, f"Cannot delete pinned config: {reason}")

    # Delete from Redis and cache
    delete_config_from_redis(config_id)
    cache_manager.delete_by_content_hash(config_id)

    return {"deleted": True, "config_id": config_id}


@app.post("/configs/{config_id}/run")
async def run_config(config_id: str, request: ConfigRunRequest, username: str = Depends(get_required_user)):
    """Run a config with provided variable inputs. Requires authentication.

    Builds a DAG from the cached YAML, submits it to Celery via
    ``execute_dag`` and saves a RunStatus with status "running".

    Raises:
        HTTPException(404): Unknown config.
        HTTPException(400): A required variable input is missing.
        HTTPException(500): The config's YAML file is not in the cache.
    """
    config = load_config(config_id)
    if not config:
        raise HTTPException(404, f"Config {config_id} not found")

    # Validate all required inputs are provided
    for var_input in config.variable_inputs:
        if var_input.required and var_input.node_id not in request.inputs:
            raise HTTPException(400, f"Missing required input: {var_input.name}")

    # Load config YAML
    config_path = cache_manager.get_by_content_hash(config_id)
    if not config_path:
        raise HTTPException(500, "Config YAML not found in cache")

    # Uploads are checked to be UTF-8; don't rely on the locale default here.
    with open(config_path, encoding="utf-8") as f:
        yaml_config = yaml.safe_load(f)

    # Build DAG from config
    dag = build_dag_from_config(yaml_config, request.inputs, config)

    # Create run
    run_id = str(uuid.uuid4())
    actor_id = f"@{username}@{L2_DOMAIN}"

    # Collect all input hashes
    all_inputs = list(request.inputs.values())
    all_inputs.extend(fixed.content_hash for fixed in config.fixed_inputs if fixed.content_hash)

    run = RunStatus(
        run_id=run_id,
        status="pending",
        recipe=f"config:{config.name}",
        inputs=all_inputs,
        output_name=f"{config.name}-{run_id[:8]}",
        created_at=datetime.now(timezone.utc).isoformat(),
        username=actor_id
    )

    # Submit to Celery
    dag_json = dag.to_json()
    task = execute_dag.delay(dag_json, run.run_id)
    run.celery_task_id = task.id
    run.status = "running"

    save_run(run)
    return run


def build_dag_from_config(yaml_config: dict, user_inputs: dict[str, str], config: ConfigStatus):
    """Build a DAG from config YAML with user-provided inputs.

    Args:
        yaml_config: The parsed config YAML.
        user_inputs: node_id -> content_hash for variable SOURCE nodes.
        config: Metadata of the config. Currently not consulted.

    Returns:
        An ``artdag.DAG`` with one node per entry in ``dag.nodes`` and one
        edge per listed input.

    Raises:
        HTTPException(400): A variable input has no hash in ``user_inputs``,
            a fixed asset has no hash in the registry, or a node lists an
            input id that is not declared in ``dag.nodes``.
    """
    from artdag import DAG, Node

    dag = DAG()
    known_ids = set()

    # `or {}` / `or []`: a key written with no value loads as None.
    registry = yaml_config.get("registry") or {}
    assets = registry.get("assets") or {}
    effects = registry.get("effects") or {}
    dag_config = yaml_config.get("dag") or {}
    nodes = dag_config.get("nodes") or []

    for node_def in nodes:
        node_id = node_def.get("id")
        node_type = node_def.get("type")
        node_config = node_def.get("config") or {}

        if node_type == "SOURCE":
            if node_config.get("input"):
                # Variable input - use user-provided hash.
                # NOTE(review): this rejects a missing hash even for inputs
                # declared required=False; confirm how optional inputs are
                # meant to be represented in the DAG.
                content_hash = user_inputs.get(node_id)
                if not content_hash:
                    raise HTTPException(400, f"Missing input for node {node_id}")
            else:
                # Fixed input - use registry hash
                asset_name = node_config.get("asset")
                asset_info = assets.get(asset_name) or {}
                content_hash = asset_info.get("hash")
                if not content_hash:
                    raise HTTPException(400, f"Asset {asset_name} not found in registry")
            node = Node(node_id, "SOURCE", {"content_hash": content_hash})
        elif node_type == "EFFECT":
            effect_name = node_config.get("effect")
            effect_info = effects.get(effect_name) or {}
            effect_hash = effect_info.get("hash")
            node = Node(node_id, "EFFECT", {"effect": effect_name, "effect_hash": effect_hash})
        else:
            node = Node(node_id, node_type, node_config)

        known_ids.add(node_id)
        dag.add_node(node)

    # Connect edges once every node exists, so inputs may be declared in any
    # order in the YAML.
    for node_def in nodes:
        node_id = node_def.get("id")
        for input_id in node_def.get("inputs") or []:
            if input_id not in known_ids:
                raise HTTPException(400, f"Node {node_id} references unknown input {input_id}")
            dag.add_edge(input_id, node_id)

    return dag

Login to see configs.

', + None, + active_tab="configs" + )) + + all_configs = list_all_configs() + + # Filter to user's configs + actor_id = f"@{current_user}@{L2_DOMAIN}" + user_configs = [c for c in all_configs if c.uploader in (current_user, actor_id)] + total = len(user_configs) + + if not user_configs: + content = ''' +

Configs (0)

+

No configs yet. Upload a config YAML file to get started.

+ ''' + return HTMLResponse(render_page("Configs", content, current_user, active_tab="configs")) + + html_parts = [] + for config in user_configs: + var_count = len(config.variable_inputs) + fixed_count = len(config.fixed_inputs) + input_info = [] + if var_count: + input_info.append(f"{var_count} variable") + if fixed_count: + input_info.append(f"{fixed_count} fixed") + inputs_str = ", ".join(input_info) if input_info else "no inputs" + + html_parts.append(f''' + +
+
+
+ {config.name} + v{config.version} +
+ {inputs_str} +
+
+ {config.description or "No description"} +
+
+ {config.config_id[:24]}... +
+
+
+ ''') + + content = f''' +

Configs ({total})

+
+ {''.join(html_parts)} +
+ ''' + + return HTMLResponse(render_page("Configs", content, current_user, active_tab="configs")) + + +@app.get("/config/{config_id}", response_class=HTMLResponse) +async def config_detail_page(config_id: str, request: Request): + """Config detail page with run form.""" + current_user = get_user_from_cookie(request) + config = load_config(config_id) + + if not config: + return HTMLResponse(render_page( + "Config Not Found", + f'

Config {config_id} not found.

', + current_user, + active_tab="configs" + ), status_code=404) + + # Build variable inputs form + var_inputs_html = "" + if config.variable_inputs: + var_inputs_html = '
' + for var_input in config.variable_inputs: + required = "required" if var_input.required else "" + var_inputs_html += f''' +
+ + +

{var_input.description or 'Enter a content hash from your cache'}

+
+ ''' + var_inputs_html += '
' + else: + var_inputs_html = '

This config has no variable inputs - it uses fixed assets only.

' + + # Build fixed inputs display + fixed_inputs_html = "" + if config.fixed_inputs: + fixed_inputs_html = '

Fixed Inputs

' + + # Check if pinned + pinned, pin_reason = cache_manager.is_pinned(config_id) + pinned_badge = "" + if pinned: + pinned_badge = f'Pinned: {pin_reason}' + + content = f''' +
+ ← Back to configs +
+ +
+
+

{config.name}

+ v{config.version} + {pinned_badge} +
+

{config.description or 'No description'}

+
{config.config_id}
+ {fixed_inputs_html} +
+ +
+

Run this Config

+
+ {var_inputs_html} +
+ +
+
+ ''' + + return HTMLResponse(render_page(f"Config: {config.name}", content, current_user, active_tab="configs")) + + +@app.post("/ui/configs/{config_id}/run", response_class=HTMLResponse) +async def ui_run_config(config_id: str, request: Request): + """HTMX handler: run a config with form inputs.""" + current_user = get_user_from_cookie(request) + if not current_user: + return '
Login required
' + + config = load_config(config_id) + if not config: + return '
Config not found
' + + # Parse form data + form_data = await request.form() + inputs = {} + for var_input in config.variable_inputs: + value = form_data.get(var_input.node_id, "").strip() + if var_input.required and not value: + return f'
Missing required input: {var_input.name}
' + if value: + inputs[var_input.node_id] = value + + # Load config YAML + config_path = cache_manager.get_by_content_hash(config_id) + if not config_path: + return '
Config YAML not found in cache
' + + try: + with open(config_path) as f: + yaml_config = yaml.safe_load(f) + + # Build DAG from config + dag = build_dag_from_config(yaml_config, inputs, config) + + # Create run + run_id = str(uuid.uuid4()) + actor_id = f"@{current_user}@{L2_DOMAIN}" + + # Collect all input hashes + all_inputs = list(inputs.values()) + for fixed in config.fixed_inputs: + if fixed.content_hash: + all_inputs.append(fixed.content_hash) + + run = RunStatus( + run_id=run_id, + status="pending", + recipe=f"config:{config.name}", + inputs=all_inputs, + output_name=f"{config.name}-{run_id[:8]}", + created_at=datetime.now(timezone.utc).isoformat(), + username=actor_id + ) + + # Submit to Celery + dag_json = dag.to_json() + task = execute_dag.delay(dag_json, run.run_id) + run.celery_task_id = task.id + run.status = "running" + + save_run(run) + + return f''' +
+ Run started! View run +
+ ''' + except Exception as e: + return f'
Error: {str(e)}
' + + +@app.get("/ui/configs-list", response_class=HTMLResponse) +async def ui_configs_list(request: Request): + """HTMX partial: list of configs.""" + current_user = get_user_from_cookie(request) + + if not current_user: + return '

Login to see configs.

' + + all_configs = list_all_configs() + + # Filter to user's configs + actor_id = f"@{current_user}@{L2_DOMAIN}" + user_configs = [c for c in all_configs if c.uploader in (current_user, actor_id)] + + if not user_configs: + return '

No configs yet. Upload a config YAML file to get started.

' + + html_parts = ['
'] + for config in user_configs: + var_count = len(config.variable_inputs) + fixed_count = len(config.fixed_inputs) + input_info = [] + if var_count: + input_info.append(f"{var_count} variable") + if fixed_count: + input_info.append(f"{fixed_count} fixed") + inputs_str = ", ".join(input_info) if input_info else "no inputs" + + html_parts.append(f''' + +
+
+
+ {config.name} + v{config.version} +
+ {inputs_str} +
+
+ {config.description or "No description"} +
+
+ {config.config_id[:24]}... +
+
+
+ ''') + + html_parts.append('
') + return '\n'.join(html_parts) + + +@app.delete("/ui/configs/{config_id}/discard", response_class=HTMLResponse) +async def ui_discard_config(config_id: str, request: Request): + """HTMX handler: discard a config.""" + current_user = get_user_from_cookie(request) + if not current_user: + return '
Login required
' + + config = load_config(config_id) + if not config: + return '
Config not found
' + + # Check ownership + actor_id = f"@{current_user}@{L2_DOMAIN}" + if config.uploader not in (current_user, actor_id): + return '
Access denied
' + + # Check if pinned + pinned, reason = cache_manager.is_pinned(config_id) + if pinned: + return f'
Cannot delete: config is pinned ({reason})
' + + # Delete from Redis and cache + delete_config_from_redis(config_id) + cache_manager.delete_by_content_hash(config_id) + + return ''' +
+ Config deleted. Back to configs +
+ ''' + + @app.get("/cache/{content_hash}") async def get_cached(content_hash: str): """Get cached content by hash.""" @@ -2317,6 +2932,7 @@ def render_page(title: str, content: str, username: Optional[str] = None, active ''' runs_active = "border-b-2 border-blue-500 text-white" if active_tab == "runs" else "text-gray-400 hover:text-white" + configs_active = "border-b-2 border-blue-500 text-white" if active_tab == "configs" else "text-gray-400 hover:text-white" cache_active = "border-b-2 border-blue-500 text-white" if active_tab == "cache" else "text-gray-400 hover:text-white" return f""" @@ -2339,6 +2955,7 @@ def render_page(title: str, content: str, username: Optional[str] = None, active @@ -2369,9 +2986,15 @@ def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str: ''' runs_active = "border-b-2 border-blue-500 text-white" if tab == "runs" else "text-gray-400 hover:text-white" + configs_active = "border-b-2 border-blue-500 text-white" if tab == "configs" else "text-gray-400 hover:text-white" cache_active = "border-b-2 border-blue-500 text-white" if tab == "cache" else "text-gray-400 hover:text-white" - content_url = "/ui/runs" if tab == "runs" else "/ui/cache-list" + if tab == "runs": + content_url = "/ui/runs" + elif tab == "configs": + content_url = "/ui/configs-list" + else: + content_url = "/ui/cache-list" return f""" @@ -2393,6 +3016,7 @@ def render_ui_html(username: Optional[str] = None, tab: str = "runs") -> str: @@ -2621,6 +3245,19 @@ async def ui_publish_run(run_id: str, request: Request, output_name: str = Form( for input_hash in run.inputs: save_cache_meta(input_hash, pinned=True, pin_reason="input_to_published") + # If this was a config-based run, pin the config and its fixed inputs + if run.recipe.startswith("config:"): + config_name = run.recipe.replace("config:", "") + for config in list_all_configs(): + if config.name == config_name: + # Pin the config YAML + cache_manager.pin(config.config_id, reason="config_for_published") + 
# Pin all fixed inputs referenced by the config + for fixed in config.fixed_inputs: + if fixed.content_hash: + cache_manager.pin(fixed.content_hash, reason="fixed_input_in_published_config") + break + return HTMLResponse(f'''
Published to L2 as {result["asset"]["name"]}