"""
Analysis tasks for extracting features from input media.

Phase 1 of the 3-phase execution model.
"""

import json
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional

from celery import current_task

# Import from the Celery app
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app

# Import artdag analysis module
try:
    from artdag.analysis import Analyzer, AnalysisResult
except ImportError:
    # artdag not installed; the tasks below raise ImportError when invoked.
    Analyzer = None
    AnalysisResult = None

logger = logging.getLogger(__name__)

# Root cache directory (overridable via the CACHE_DIR environment variable);
# analysis results are stored in its 'analysis' subdirectory.
CACHE_DIR = Path(os.environ.get('CACHE_DIR', str(Path.home() / ".artdag" / "cache")))
ANALYSIS_CACHE_DIR = CACHE_DIR / 'analysis'


def _make_analyzer():
    """
    Build an Analyzer that caches into ANALYSIS_CACHE_DIR.

    Creates ANALYSIS_CACHE_DIR (and parents) if it does not exist yet.

    Raises:
        ImportError: If artdag.analysis could not be imported.
    """
    if Analyzer is None:
        raise ImportError("artdag.analysis not available")
    ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    return Analyzer(cache_dir=ANALYSIS_CACHE_DIR)


@app.task(bind=True, name='tasks.analyze_input')
def analyze_input(
    self,
    input_hash: str,
    input_path: str,
    features: List[str],
) -> dict:
    """
    Analyze a single input file.

    Errors raised by the analysis itself are caught and reported in the
    returned dict rather than propagated, so the task completes either way.

    Args:
        self: Bound Celery task instance (unused).
        input_hash: Content hash of the input.
        input_path: Path to the input file.
        features: List of features to extract.

    Returns:
        On success: {"status": "completed", "input_hash", "cache_id",
        "features", "result"}, where "result" is the analyzer result's
        ``to_dict()``.
        On failure: {"status": "failed", "input_hash", "error"}, where
        "error" is the exception message.

    Raises:
        ImportError: If artdag.analysis is not installed.
    """
    analyzer = _make_analyzer()
    logger.info("Analyzing %s... for features: %s", input_hash[:16], features)

    try:
        result = analyzer.analyze(
            input_hash=input_hash,
            features=features,
            input_path=Path(input_path),
        )
        return {
            "status": "completed",
            "input_hash": input_hash,
            "cache_id": result.cache_id,
            "features": features,
            "result": result.to_dict(),
        }
    except Exception as e:
        # Deliberate best-effort: report the failure in the payload, but log
        # the full traceback so the root cause is not lost.
        logger.exception("Analysis failed for %s: %s", input_hash, e)
        return {
            "status": "failed",
            "input_hash": input_hash,
            "error": str(e),
        }


@app.task(bind=True, name='tasks.analyze_inputs')
def analyze_inputs(
    self,
    inputs: Dict[str, str],
    features: List[str],
) -> dict:
    """
    Analyze multiple inputs sequentially within a single task.

    All inputs share one Analyzer instance. A failure on one input is
    recorded in "errors" and does not stop the remaining inputs from
    being analyzed.

    Args:
        self: Bound Celery task instance (unused).
        inputs: Dict mapping input_hash to file path.
        features: List of features to extract from all inputs.

    Returns:
        Dict with keys:
            "status": "completed" if no input failed, otherwise "partial"
                (this includes the case where every input failed).
            "results": input_hash -> analyzer result ``to_dict()`` for each
                successful input.
            "errors": list of {"input_hash", "error"} for each failed input.
            "total": number of inputs given.
            "successful": number of inputs analyzed without error.

    Raises:
        ImportError: If artdag.analysis is not installed.
    """
    analyzer = _make_analyzer()
    logger.info("Analyzing %d inputs for features: %s", len(inputs), features)

    results = {}
    errors = []

    for input_hash, input_path in inputs.items():
        try:
            result = analyzer.analyze(
                input_hash=input_hash,
                features=features,
                input_path=Path(input_path),
            )
            results[input_hash] = result.to_dict()
        except Exception as e:
            # Keep going with the other inputs; preserve the traceback in logs.
            logger.exception("Analysis failed for %s: %s", input_hash, e)
            errors.append({"input_hash": input_hash, "error": str(e)})

    return {
        "status": "completed" if not errors else "partial",
        "results": results,
        "errors": errors,
        "total": len(inputs),
        "successful": len(results),
    }