""" IPFS-primary analysis tasks. Fetches inputs from IPFS, stores analysis results on IPFS. Uses HybridStateManager for: - Fast local Redis operations - Background IPNS sync with other L1 nodes """ import json import logging import os import shutil import tempfile from pathlib import Path from typing import Dict, List, Optional from celery import current_task import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from celery_app import app import ipfs_client from hybrid_state import get_state_manager # Import artdag analysis module try: from artdag.analysis import Analyzer, AnalysisResult except ImportError: Analyzer = None AnalysisResult = None logger = logging.getLogger(__name__) def get_cached_analysis_cid(input_hash: str, features: List[str]) -> Optional[str]: """Check if analysis is already cached.""" return get_state_manager().get_analysis_cid(input_hash, features) def set_cached_analysis_cid(input_hash: str, features: List[str], cid: str) -> None: """Store analysis CID in cache.""" get_state_manager().set_analysis_cid(input_hash, features, cid) @app.task(bind=True, name='tasks.analyze_input_cid') def analyze_input_cid( self, input_cid: str, input_hash: str, features: List[str], ) -> dict: """ Analyze an input file using IPFS-primary architecture. Args: input_cid: IPFS CID of the input file input_hash: Content hash of the input (for cache key) features: List of features to extract Returns: Dict with 'analysis_cid' and 'result' """ if Analyzer is None: raise ImportError("artdag.analysis not available") logger.info(f"[CID] Analyzing {input_hash[:16]}... for features: {features}") # Check cache first cached_cid = get_cached_analysis_cid(input_hash, features) if cached_cid: logger.info(f"[CID] Analysis cache hit: {cached_cid}") # Fetch the cached analysis from IPFS analysis_bytes = ipfs_client.get_bytes(cached_cid) if analysis_bytes: result_dict = json.loads(analysis_bytes.decode('utf-8')) return { "status": "cached", "input_hash": input_hash, "analysis_cid": cached_cid, "result": result_dict, } # Create temp workspace work_dir = Path(tempfile.mkdtemp(prefix="artdag_analysis_")) try: # Fetch input from IPFS input_path = work_dir / f"input_{input_cid[:16]}.mkv" logger.info(f"[CID] Fetching input: {input_cid}") if not ipfs_client.get_file(input_cid, input_path): raise RuntimeError(f"Failed to fetch input from IPFS: {input_cid}") # Run analysis (no local cache, we'll store on IPFS) analyzer = Analyzer(cache_dir=None) result = analyzer.analyze( input_hash=input_hash, features=features, input_path=input_path, ) result_dict = result.to_dict() # Store analysis result on IPFS analysis_cid = ipfs_client.add_json(result_dict) if not analysis_cid: raise RuntimeError("Failed to store analysis on IPFS") logger.info(f"[CID] Analysis complete: {input_hash[:16]}... → {analysis_cid}") # Cache the mapping set_cached_analysis_cid(input_hash, features, analysis_cid) return { "status": "completed", "input_hash": input_hash, "analysis_cid": analysis_cid, "result": result_dict, } except Exception as e: logger.error(f"[CID] Analysis failed for {input_hash}: {e}") return { "status": "failed", "input_hash": input_hash, "error": str(e), } finally: # Cleanup shutil.rmtree(work_dir, ignore_errors=True) @app.task(bind=True, name='tasks.analyze_inputs_cid') def analyze_inputs_cid( self, input_cids: Dict[str, str], features: List[str], ) -> dict: """ Analyze multiple inputs using IPFS-primary architecture. Args: input_cids: Dict mapping input_hash to IPFS CID features: List of features to extract from all inputs Returns: Dict with analysis CIDs and results for all inputs """ from celery import group logger.info(f"[CID] Analyzing {len(input_cids)} inputs for features: {features}") # Dispatch analysis tasks in parallel tasks = [ analyze_input_cid.s(cid, input_hash, features) for input_hash, cid in input_cids.items() ] if len(tasks) == 1: results = [tasks[0].apply_async().get(timeout=600)] else: job = group(tasks) results = job.apply_async().get(timeout=600) # Collect results analysis_cids = {} analysis_results = {} errors = [] for result in results: input_hash = result.get("input_hash") if result.get("status") in ("completed", "cached"): analysis_cids[input_hash] = result["analysis_cid"] analysis_results[input_hash] = result["result"] else: errors.append({ "input_hash": input_hash, "error": result.get("error", "Unknown error"), }) return { "status": "completed" if not errors else "partial", "analysis_cids": analysis_cids, "results": analysis_results, "errors": errors, "total": len(input_cids), "successful": len(analysis_cids), }