# ---------------------------------------------------------------------------
# Run cache layer for database.py (reconstructed from a newline-mangled diff
# hunk; the surrounding schema string and module preamble are outside this
# chunk).  Maps a content-addressable run_id to the run's output artifacts.
# ---------------------------------------------------------------------------

# DDL appended to the main CREATE-TABLE script.
# run_id is a hash of (sorted inputs + recipe), making runs deterministic.
RUN_CACHE_SCHEMA = """
-- Run cache: maps content-addressable run_id to output
-- run_id is a hash of (sorted inputs + recipe), making runs deterministic
CREATE TABLE IF NOT EXISTS run_cache (
    run_id VARCHAR(64) PRIMARY KEY,
    output_hash VARCHAR(64) NOT NULL,
    ipfs_cid VARCHAR(128),
    provenance_cid VARCHAR(128),
    recipe VARCHAR(255) NOT NULL,
    inputs JSONB NOT NULL,
    actor_id VARCHAR(255),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_run_cache_output ON run_cache(output_hash);
"""


# ============ Run Cache ============

# Single source of truth for the column list; keeps SELECT/RETURNING clauses
# and the row converter below in lockstep.
_RUN_CACHE_COLUMNS = (
    "run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at"
)


def run_cache_row_to_dict(row) -> dict:
    """Convert a run_cache row (any mapping-style record) to a JSON-safe dict.

    created_at is serialized with isoformat(); a falsy created_at becomes None.
    Extracted because the identical literal dict appeared three times in the
    original (get_run_cache / save_run_cache / get_run_by_output).
    """
    return {
        "run_id": row["run_id"],
        "output_hash": row["output_hash"],
        "ipfs_cid": row["ipfs_cid"],
        "provenance_cid": row["provenance_cid"],
        "recipe": row["recipe"],
        "inputs": row["inputs"],
        "actor_id": row["actor_id"],
        "created_at": row["created_at"].isoformat() if row["created_at"] else None,
    }


async def get_run_cache(run_id: str) -> Optional[dict]:
    """Get cached run result by content-addressable run_id.

    Returns the run-cache entry as a dict, or None when no row matches.
    """
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"SELECT {_RUN_CACHE_COLUMNS} FROM run_cache WHERE run_id = $1",
            run_id,
        )
    return run_cache_row_to_dict(row) if row else None


async def save_run_cache(
    run_id: str,
    output_hash: str,
    recipe: str,
    inputs: List[str],
    ipfs_cid: Optional[str] = None,
    provenance_cid: Optional[str] = None,
    actor_id: Optional[str] = None,
) -> dict:
    """Save run result to cache. Updates if run_id already exists.

    On conflict, output_hash is always overwritten, while the CID columns keep
    their existing values unless the new call supplies non-NULL ones
    (COALESCE); recipe/inputs/actor_id are intentionally not updated.
    Returns the stored row as a dict.
    """
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"""
            INSERT INTO run_cache (run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT (run_id) DO UPDATE SET
                output_hash = EXCLUDED.output_hash,
                ipfs_cid = COALESCE(EXCLUDED.ipfs_cid, run_cache.ipfs_cid),
                provenance_cid = COALESCE(EXCLUDED.provenance_cid, run_cache.provenance_cid)
            RETURNING {_RUN_CACHE_COLUMNS}
            """,
            run_id, output_hash, ipfs_cid, provenance_cid, recipe, _json.dumps(inputs), actor_id,
        )
    return run_cache_row_to_dict(row)


async def get_run_by_output(output_hash: str) -> Optional[dict]:
    """Get run cache entry by output hash.

    NOTE: output_hash is NOT NULL but not unique in the schema, so if several
    runs produced the same output this returns an arbitrary matching row.
    """
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"SELECT {_RUN_CACHE_COLUMNS} FROM run_cache WHERE output_hash = $1",
            output_hash,
        )
    return run_cache_row_to_dict(row) if row else None
# NOTE(review): this span of the newline-mangled chunk also carried the tail
# of database.py's get_run_by_output() and the decorator/head of server.py's
# create_run endpoint. Both definitions are truncated here and are not
# reproduced; see the full functions in their respective files.

# L1 public URL for redirects
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")


def compute_run_id(
    input_hashes: list[str],
    recipe: str,
    # Fix: the original annotated this `str` despite defaulting to None.
    # String annotation keeps the file runnable on any Python 3.x.
    recipe_hash: "str | None" = None,
) -> str:
    """
    Compute a deterministic run_id from inputs and recipe.

    The run_id is a SHA3-256 hex digest of:
      - Sorted input content hashes (so input order does not matter)
      - Recipe identifier (recipe_hash if provided, else "effect:{recipe}")

    This makes runs content-addressable: same inputs + recipe = same run_id.

    Returns a 64-character lowercase hex string.
    """
    payload = {
        "inputs": sorted(input_hashes),
        "recipe": recipe_hash or f"effect:{recipe}",
        "version": "1",  # bump when the hashing schema changes
    }
    # Canonical JSON (sorted keys, no whitespace) -> stable digest across
    # processes and Python versions.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha3_256(canonical.encode()).hexdigest()


# Default L2 for login redirect when not logged in (user can login to any L2)
DEFAULT_L2_SERVER = os.environ.get("DEFAULT_L2_SERVER", "http://localhost:8200")
# NOTE(review): newline-mangled unified-diff fragment — the interior of the
# async create_run endpoint (truncated at both ends of this chunk), followed
# by the get_run cache-save hunk and the head of the run_detail hunk.
# Preserved verbatim; findings are recorded as comments only:
#
# 1. `http_requests.get(...)` is a synchronous, blocking HTTP call inside an
#    `async def` endpoint — it stalls the event loop for up to the 10 s
#    timeout. Should run via a thread executor or an async HTTP client.
# 2. `cached_run.get("created_at", <default>)` never uses the default when the
#    key is present with value None (get_run_cache always includes the key),
#    so RunStatus.created_at can be None here — presumably unintended; verify.
# 3. `logger.info(f"create_run: Cache entry exists but output missing, will
#    re-run")` is an f-string with no placeholders (cosmetic).
# 4. The fragment reaches into private cache_manager APIs
#    (`_set_content_index`, `_set_ipfs_index`) — tight coupling; a public
#    "register recovered content" method would be safer.
# 5. The broad `except Exception` around the L2 lookup is a deliberate
#    best-effort fallback: it logs a warning and falls through to the Celery
#    path, so it should not be narrowed blindly.
Checks cache before executing.""" + # Compute content-addressable run_id + run_id = compute_run_id(request.inputs, request.recipe) # Generate output name if not provided output_name = request.output_name or f"{request.recipe}-{run_id[:8]}" @@ -606,7 +626,78 @@ async def create_run(request: RunRequest, ctx: UserContext = Depends(get_require # Use actor_id from user context actor_id = ctx.actor_id - # Create run record + # Check L1 cache first + cached_run = await database.get_run_cache(run_id) + if cached_run: + output_hash = cached_run["output_hash"] + # Verify the output file still exists in cache + if cache_manager.has_content(output_hash): + logger.info(f"create_run: Cache hit for run_id={run_id[:16]}... output={output_hash[:16]}...") + return RunStatus( + run_id=run_id, + status="completed", + recipe=request.recipe, + inputs=request.inputs, + output_name=output_name, + created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()), + completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()), + output_hash=output_hash, + username=actor_id, + provenance_cid=cached_run.get("provenance_cid"), + ) + else: + logger.info(f"create_run: Cache entry exists but output missing, will re-run") + + # Check L2 if not in L1 + l2_server = ctx.l2_server + try: + l2_resp = http_requests.get( + f"{l2_server}/assets/by-run-id/{run_id}", + timeout=10 + ) + if l2_resp.status_code == 200: + l2_data = l2_resp.json() + output_hash = l2_data.get("output_hash") + ipfs_cid = l2_data.get("ipfs_cid") + if output_hash and ipfs_cid: + logger.info(f"create_run: Found on L2, pulling from IPFS: {ipfs_cid}") + # Pull from IPFS to L1 cache + import ipfs_client + legacy_dir = CACHE_DIR / "legacy" + legacy_dir.mkdir(parents=True, exist_ok=True) + recovery_path = legacy_dir / output_hash + if ipfs_client.get_file(ipfs_cid, str(recovery_path)): + # File retrieved - put() updates indexes, but file is already in legacy location + # Just update the content and IPFS 
# NOTE(review): continuation of the same mangled fragment. The get_run hunk
# below saves completed runs to the run cache — the same ~10 lines reappear in
# the run_detail hunk further on; extract a shared `_record_run_cache(run)`
# helper so the two code paths cannot drift apart.
indexes manually + cache_manager._set_content_index(output_hash, output_hash) + cache_manager._set_ipfs_index(output_hash, ipfs_cid) + # Save to run cache + await database.save_run_cache( + run_id=run_id, + output_hash=output_hash, + recipe=request.recipe, + inputs=request.inputs, + ipfs_cid=ipfs_cid, + provenance_cid=l2_data.get("provenance_cid"), + actor_id=actor_id, + ) + logger.info(f"create_run: Recovered from L2/IPFS: {output_hash[:16]}...") + return RunStatus( + run_id=run_id, + status="completed", + recipe=request.recipe, + inputs=request.inputs, + output_name=output_name, + created_at=datetime.now(timezone.utc).isoformat(), + completed_at=datetime.now(timezone.utc).isoformat(), + output_hash=output_hash, + username=actor_id, + provenance_cid=l2_data.get("provenance_cid"), + ) + except Exception as e: + logger.warning(f"create_run: L2 lookup failed (will run Celery): {e}") + + # Not cached anywhere - create run record and submit to Celery run = RunStatus( run_id=run_id, status="pending", @@ -716,6 +807,20 @@ async def get_run(run_id: str): output_hash=run.output_hash, run_id=run.run_id, ) + + # Save to run cache for content-addressable lookup + if run.output_hash: + ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash) + await database.save_run_cache( + run_id=run.run_id, + output_hash=run.output_hash, + recipe=run.recipe, + inputs=run.inputs, + ipfs_cid=ipfs_cid, + provenance_cid=run.provenance_cid, + actor_id=run.username, + ) + logger.info(f"get_run: Saved run cache for {run.run_id[:16]}...") else: run.status = "failed" run.error = error @@ -842,6 +947,18 @@ async def run_detail(run_id: str, request: Request): output_path = Path(result.get("output", {}).get("local_path", "")) if output_path.exists(): await cache_file(output_path) + # Save to run cache for content-addressable lookup + if run.output_hash: + ipfs_cid = cache_manager._get_ipfs_cid_from_index(run.output_hash) + await database.save_run_cache( + run_id=run.run_id, + 
# NOTE(review): newline-mangled unified-diff fragment — tail of the
# run_detail cache-save hunk (its head lies earlier in the chunk), then a
# complete download_client endpoint and the __main__ guard. Preserved
# verbatim; findings as comments only:
#
# 1. The run_detail save_run_cache call duplicates get_run's — unlike get_run
#    it does not log on success; factor both into one helper to keep the two
#    completion paths identical.
# 2. download_client: CLIENT_TARBALL is resolved at import time relative to
#    this file; the 404 on a missing tarball is appropriate. media_type
#    "application/gzip" matches the .tar.gz filename.
# 3. `import uvicorn` under the __main__ guard is a deliberate local import so
#    the module can be imported by workers without uvicorn installed —
#    presumably; confirm before hoisting it to the top of the file.
output_hash=run.output_hash, + recipe=run.recipe, + inputs=run.inputs, + ipfs_cid=ipfs_cid, + provenance_cid=run.provenance_cid, + actor_id=run.username, + ) else: run.status = "failed" run.error = error @@ -4073,6 +4190,22 @@ async def ui_run_partial(run_id: str, request: Request): return html +# ============ Client Download ============ + +CLIENT_TARBALL = Path(__file__).parent / "artdag-client.tar.gz" + +@app.get("/download/client") +async def download_client(): + """Download the Art DAG CLI client.""" + if not CLIENT_TARBALL.exists(): + raise HTTPException(404, "Client package not found") + return FileResponse( + CLIENT_TARBALL, + media_type="application/gzip", + filename="artdag-client.tar.gz" + ) + + if __name__ == "__main__": import uvicorn # Workers enabled - cache indexes shared via Redis