Changed default from /data/cache to ~/.artdag/cache for local runs. Docker sets CACHE_DIR=/data/cache via environment variable. Files updated: - tasks/analyze.py - tasks/orchestrate.py - app/config.py 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
133 lines
3.4 KiB
Python
133 lines
3.4 KiB
Python
"""
|
|
Analysis tasks for extracting features from input media.
|
|
|
|
Phase 1 of the 3-phase execution model.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from celery import current_task
|
|
|
|
# Import from the Celery app
|
|
import sys
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from celery_app import app
|
|
|
|
# Optional artdag dependency: tolerate its absence at import time so the
# worker process can still start; tasks raise ImportError when invoked.
try:
    from artdag.analysis import Analyzer, AnalysisResult
except ImportError:
    Analyzer = AnalysisResult = None
|
|
|
|
# Module-level logger.
logger = logging.getLogger(__name__)

# Cache directory for analysis results. Docker deployments override this via
# the CACHE_DIR environment variable; local runs fall back to ~/.artdag/cache.
_DEFAULT_CACHE = Path.home() / ".artdag" / "cache"
CACHE_DIR = Path(os.environ.get('CACHE_DIR', str(_DEFAULT_CACHE)))
ANALYSIS_CACHE_DIR = CACHE_DIR / 'analysis'
|
|
|
|
|
|
@app.task(bind=True, name='tasks.analyze_input')
def analyze_input(
    self,
    input_hash: str,
    input_path: str,
    features: List[str],
) -> dict:
    """
    Analyze a single input file.

    Args:
        input_hash: Content hash of the input
        input_path: Path to the input file
        features: List of features to extract

    Returns:
        Dict with analysis results. On success: status ("completed"),
        input_hash, cache_id, features, result. On failure: status
        ("failed"), input_hash, error.

    Raises:
        ImportError: If the artdag.analysis module is not installed.
    """
    if Analyzer is None:
        raise ImportError("artdag.analysis not available")

    # Lazy %-args: the message is only rendered if INFO is enabled.
    logger.info("Analyzing %s... for features: %s", input_hash[:16], features)

    # Create analyzer with caching
    ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR)

    try:
        result = analyzer.analyze(
            input_hash=input_hash,
            features=features,
            input_path=Path(input_path),
        )

        return {
            "status": "completed",
            "input_hash": input_hash,
            "cache_id": result.cache_id,
            "features": features,
            "result": result.to_dict(),
        }

    except Exception as e:
        # logger.exception captures the full traceback (logger.error did not),
        # while still returning a structured failure dict to the caller.
        logger.exception("Analysis failed for %s: %s", input_hash, e)
        return {
            "status": "failed",
            "input_hash": input_hash,
            "error": str(e),
        }
|
|
|
|
|
|
@app.task(bind=True, name='tasks.analyze_inputs')
def analyze_inputs(
    self,
    inputs: Dict[str, str],
    features: List[str],
) -> dict:
    """
    Analyze multiple inputs in parallel.

    Args:
        inputs: Dict mapping input_hash to file path
        features: List of features to extract from all inputs

    Returns:
        Dict with all analysis results: status ("completed" when every
        input succeeded, "partial" otherwise), results (hash -> result
        dict), errors (list of {input_hash, error}), total, successful.

    Raises:
        ImportError: If the artdag.analysis module is not installed.
    """
    if Analyzer is None:
        raise ImportError("artdag.analysis not available")

    # Lazy %-args: the message is only rendered if INFO is enabled.
    logger.info("Analyzing %d inputs for features: %s", len(inputs), features)

    ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR)

    results = {}
    errors = []

    # Best-effort per input: one failure is recorded and does not abort
    # the remaining analyses.
    for input_hash, input_path in inputs.items():
        try:
            result = analyzer.analyze(
                input_hash=input_hash,
                features=features,
                input_path=Path(input_path),
            )
            results[input_hash] = result.to_dict()

        except Exception as e:
            # logger.exception captures the full traceback (logger.error
            # did not); the error is also surfaced in the return value.
            logger.exception("Analysis failed for %s: %s", input_hash, e)
            errors.append({"input_hash": input_hash, "error": str(e)})

    return {
        "status": "completed" if not errors else "partial",
        "results": results,
        "errors": errors,
        "total": len(inputs),
        "successful": len(results),
    }
|