"""
IPFS-primary analysis tasks.

Fetches inputs from IPFS, stores analysis results on IPFS.
"""

import json
import logging
import os
import shutil
import sys
import tempfile
from pathlib import Path
from typing import Dict, List, Optional

from celery import current_task

# Make the project root importable before pulling in sibling modules
# (this file lives in tasks/, one level below celery_app / ipfs_client).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
import ipfs_client

# Redis for caching analysis CIDs
import redis

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/5")
_redis: Optional[redis.Redis] = None


def get_redis() -> redis.Redis:
    """Return a lazily-created, module-wide Redis client (decode_responses=True)."""
    global _redis
    if _redis is None:
        _redis = redis.from_url(REDIS_URL, decode_responses=True)
    return _redis


# Import artdag analysis module; optional at import time — tasks raise
# ImportError at call time when it is missing.
try:
    from artdag.analysis import Analyzer, AnalysisResult
except ImportError:
    Analyzer = None
    AnalysisResult = None

logger = logging.getLogger(__name__)

# Redis key for analysis cache
ANALYSIS_CACHE_KEY = "artdag:analysis_cid"  # hash: input_hash:features → analysis CID


def get_analysis_cache_key(input_hash: str, features: List[str]) -> str:
    """Generate cache key for analysis results.

    Features are sorted so the key is independent of the order requested.
    """
    features_str = ",".join(sorted(features))
    return f"{input_hash}:{features_str}"


def get_cached_analysis_cid(input_hash: str, features: List[str]) -> Optional[str]:
    """Return the cached analysis CID for (input_hash, features), or None."""
    key = get_analysis_cache_key(input_hash, features)
    return get_redis().hget(ANALYSIS_CACHE_KEY, key)


def set_cached_analysis_cid(input_hash: str, features: List[str], cid: str) -> None:
    """Store analysis CID in cache."""
    key = get_analysis_cache_key(input_hash, features)
    get_redis().hset(ANALYSIS_CACHE_KEY, key, cid)


@app.task(bind=True, name='tasks.analyze_input_cid')
def analyze_input_cid(
    self,
    input_cid: str,
    input_hash: str,
    features: List[str],
) -> dict:
    """
    Analyze an input file using IPFS-primary architecture.

    Args:
        input_cid: IPFS CID of the input file
        input_hash: Content hash of the input (for cache key)
        features: List of features to extract

    Returns:
        Dict with 'analysis_cid' and 'result' (status "completed" or
        "cached"), or 'error' (status "failed").

    Raises:
        ImportError: if the optional artdag.analysis dependency is missing.
    """
    if Analyzer is None:
        raise ImportError("artdag.analysis not available")

    logger.info(f"[CID] Analyzing {input_hash[:16]}... for features: {features}")

    # Check cache first
    cached_cid = get_cached_analysis_cid(input_hash, features)
    if cached_cid:
        logger.info(f"[CID] Analysis cache hit: {cached_cid}")
        # Fetch the cached analysis from IPFS. A missing or corrupt cached
        # blob must not fail the task — fall through and recompute instead.
        analysis_bytes = ipfs_client.get_bytes(cached_cid)
        if analysis_bytes:
            try:
                result_dict = json.loads(analysis_bytes.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError):
                logger.warning(
                    f"[CID] Cached analysis {cached_cid} is unreadable; recomputing"
                )
            else:
                return {
                    "status": "cached",
                    "input_hash": input_hash,
                    "analysis_cid": cached_cid,
                    "result": result_dict,
                }

    # Create temp workspace
    work_dir = Path(tempfile.mkdtemp(prefix="artdag_analysis_"))

    try:
        # Fetch input from IPFS
        input_path = work_dir / f"input_{input_cid[:16]}.mkv"
        logger.info(f"[CID] Fetching input: {input_cid}")

        if not ipfs_client.get_file(input_cid, input_path):
            raise RuntimeError(f"Failed to fetch input from IPFS: {input_cid}")

        # Run analysis (no local cache, we'll store on IPFS)
        analyzer = Analyzer(cache_dir=None)

        result = analyzer.analyze(
            input_hash=input_hash,
            features=features,
            input_path=input_path,
        )

        result_dict = result.to_dict()

        # Store analysis result on IPFS
        analysis_cid = ipfs_client.add_json(result_dict)
        if not analysis_cid:
            raise RuntimeError("Failed to store analysis on IPFS")

        logger.info(f"[CID] Analysis complete: {input_hash[:16]}... → {analysis_cid}")

        # Cache the mapping only after the result is safely stored
        set_cached_analysis_cid(input_hash, features, analysis_cid)

        return {
            "status": "completed",
            "input_hash": input_hash,
            "analysis_cid": analysis_cid,
            "result": result_dict,
        }

    except Exception as e:
        # logger.exception (not .error) records the traceback for debugging;
        # failures are reported as a result dict so the fan-out caller can
        # aggregate partial successes.
        logger.exception(f"[CID] Analysis failed for {input_hash}: {e}")
        return {
            "status": "failed",
            "input_hash": input_hash,
            "error": str(e),
        }

    finally:
        # Cleanup the temp workspace regardless of outcome
        shutil.rmtree(work_dir, ignore_errors=True)


@app.task(bind=True, name='tasks.analyze_inputs_cid')
def analyze_inputs_cid(
    self,
    input_cids: Dict[str, str],
    features: List[str],
    timeout: int = 600,
) -> dict:
    """
    Analyze multiple inputs using IPFS-primary architecture.

    Args:
        input_cids: Dict mapping input_hash to IPFS CID
        features: List of features to extract from all inputs
        timeout: Seconds to wait for the fan-out to finish (default 600)

    Returns:
        Dict with analysis CIDs and results for all inputs; status is
        "completed" when every input succeeded, "partial" otherwise.
    """
    from celery import group

    logger.info(f"[CID] Analyzing {len(input_cids)} inputs for features: {features}")

    # Dispatch analysis tasks in parallel
    tasks = [
        analyze_input_cid.s(cid, input_hash, features)
        for input_hash, cid in input_cids.items()
    ]

    # NOTE(review): Celery forbids .get() inside a task by default (it can
    # deadlock a worker pool); disable_sync_subtasks=False opts in
    # deliberately. The subtasks must run on a different worker/queue than
    # this one — otherwise restructure this as a chord.
    if not tasks:
        results = []
    elif len(tasks) == 1:
        results = [
            tasks[0].apply_async().get(timeout=timeout, disable_sync_subtasks=False)
        ]
    else:
        job = group(tasks)
        results = job.apply_async().get(timeout=timeout, disable_sync_subtasks=False)

    # Collect results
    analysis_cids = {}
    analysis_results = {}
    errors = []

    for result in results:
        input_hash = result.get("input_hash")
        if result.get("status") in ("completed", "cached"):
            analysis_cids[input_hash] = result["analysis_cid"]
            analysis_results[input_hash] = result["result"]
        else:
            errors.append({
                "input_hash": input_hash,
                "error": result.get("error", "Unknown error"),
            })

    return {
        "status": "completed" if not errors else "partial",
        "analysis_cids": analysis_cids,
        "results": analysis_results,
        "errors": errors,
        "total": len(input_cids),
        "successful": len(analysis_cids),
    }