Add IPFS-primary analysis task
- Fetches input from IPFS by CID
- Stores analysis JSON on IPFS
- Returns analysis_cid
- Redis cache: input_hash:features → analysis CID

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
209
tasks/analyze_cid.py
Normal file
209
tasks/analyze_cid.py
Normal file
@@ -0,0 +1,209 @@
|
||||
"""
|
||||
IPFS-primary analysis tasks.
|
||||
|
||||
Fetches inputs from IPFS, stores analysis results on IPFS.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from celery import current_task
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from celery_app import app
|
||||
import ipfs_client
|
||||
|
||||
# Redis for caching analysis CIDs
import redis

# Connection URL; defaults to DB 5 locally, overridable via REDIS_URL.
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/5")
# Lazily-created module-level client; see get_redis().
_redis: Optional[redis.Redis] = None
|
||||
|
||||
def get_redis() -> redis.Redis:
    """Return the process-wide Redis client, creating it on first use."""
    global _redis
    if _redis is not None:
        return _redis
    _redis = redis.from_url(REDIS_URL, decode_responses=True)
    return _redis
|
||||
|
||||
# Import artdag analysis module
|
||||
# Import artdag analysis module.  The import is optional so the worker
# can start without it; analyze_input_cid raises ImportError at call
# time when Analyzer is None.
try:
    from artdag.analysis import Analyzer, AnalysisResult
except ImportError:
    Analyzer = None
    AnalysisResult = None

logger = logging.getLogger(__name__)

# Redis hash holding the analysis cache.
ANALYSIS_CACHE_KEY = "artdag:analysis_cid"  # hash: input_hash:features → analysis CID
|
||||
|
||||
|
||||
def get_analysis_cache_key(input_hash: str, features: List[str]) -> str:
    """Build the Redis hash field for an (input, feature-set) pair.

    Features are sorted so the key does not depend on request order.
    """
    joined_features = ",".join(sorted(features))
    return "{}:{}".format(input_hash, joined_features)
|
||||
|
||||
|
||||
def get_cached_analysis_cid(input_hash: str, features: List[str]) -> Optional[str]:
    """Return the cached analysis CID for this input/feature set, or None."""
    field = get_analysis_cache_key(input_hash, features)
    return get_redis().hget(ANALYSIS_CACHE_KEY, field)
|
||||
|
||||
|
||||
def set_cached_analysis_cid(input_hash: str, features: List[str], cid: str) -> None:
    """Record the analysis CID for this input/feature set in Redis."""
    field = get_analysis_cache_key(input_hash, features)
    get_redis().hset(ANALYSIS_CACHE_KEY, field, cid)
|
||||
|
||||
|
||||
@app.task(bind=True, name='tasks.analyze_input_cid')
def analyze_input_cid(
    self,
    input_cid: str,
    input_hash: str,
    features: List[str],
) -> dict:
    """
    Analyze an input file using IPFS-primary architecture.

    The input is fetched from IPFS into a temporary workspace, analyzed,
    and the resulting JSON is stored back on IPFS.  The
    input_hash/features → analysis-CID mapping is memoized in Redis so
    repeated requests are served from IPFS without re-running the
    analyzer.

    Args:
        input_cid: IPFS CID of the input file
        input_hash: Content hash of the input (for cache key)
        features: List of features to extract

    Returns:
        Dict with 'analysis_cid' and 'result' on success ('status' is
        'completed' or 'cached'), or {'status': 'failed', 'error': ...}
        on failure.

    Raises:
        ImportError: if artdag.analysis could not be imported at startup.
    """
    if Analyzer is None:
        raise ImportError("artdag.analysis not available")

    # Lazy %-args: formatting is skipped when the level is disabled.
    logger.info("[CID] Analyzing %s... for features: %s", input_hash[:16], features)

    # Check cache first — a hit means the analysis JSON already lives on IPFS.
    cached_cid = get_cached_analysis_cid(input_hash, features)
    if cached_cid:
        logger.info("[CID] Analysis cache hit: %s", cached_cid)
        # Fetch the cached analysis from IPFS; if the fetch fails we fall
        # through and recompute instead of erroring out.
        analysis_bytes = ipfs_client.get_bytes(cached_cid)
        if analysis_bytes:
            result_dict = json.loads(analysis_bytes.decode('utf-8'))
            return {
                "status": "cached",
                "input_hash": input_hash,
                "analysis_cid": cached_cid,
                "result": result_dict,
            }

    # Create temp workspace (always removed in the finally block).
    work_dir = Path(tempfile.mkdtemp(prefix="artdag_analysis_"))

    try:
        # Fetch input from IPFS
        input_path = work_dir / f"input_{input_cid[:16]}.mkv"
        logger.info("[CID] Fetching input: %s", input_cid)

        if not ipfs_client.get_file(input_cid, input_path):
            raise RuntimeError(f"Failed to fetch input from IPFS: {input_cid}")

        # Run analysis (no local cache, we'll store on IPFS)
        analyzer = Analyzer(cache_dir=None)

        result = analyzer.analyze(
            input_hash=input_hash,
            features=features,
            input_path=input_path,
        )

        result_dict = result.to_dict()

        # Store analysis result on IPFS
        analysis_cid = ipfs_client.add_json(result_dict)
        if not analysis_cid:
            raise RuntimeError("Failed to store analysis on IPFS")

        logger.info("[CID] Analysis complete: %s... → %s", input_hash[:16], analysis_cid)

        # Cache the mapping so identical future requests skip the analyzer.
        set_cached_analysis_cid(input_hash, features, analysis_cid)

        return {
            "status": "completed",
            "input_hash": input_hash,
            "analysis_cid": analysis_cid,
            "result": result_dict,
        }

    except Exception as e:
        # logger.exception records the traceback that a plain
        # logger.error would silently discard.
        logger.exception("[CID] Analysis failed for %s: %s", input_hash, e)
        return {
            "status": "failed",
            "input_hash": input_hash,
            "error": str(e),
        }

    finally:
        # Cleanup the workspace on both success and failure.
        shutil.rmtree(work_dir, ignore_errors=True)
|
||||
|
||||
|
||||
@app.task(bind=True, name='tasks.analyze_inputs_cid')
def analyze_inputs_cid(
    self,
    input_cids: Dict[str, str],
    features: List[str],
) -> dict:
    """
    Analyze multiple inputs using IPFS-primary architecture.

    Fans out one analyze_input_cid subtask per input, waits for all of
    them, then aggregates per-input analysis CIDs/results and errors.

    Args:
        input_cids: Dict mapping input_hash to IPFS CID
        features: List of features to extract from all inputs

    Returns:
        Dict with 'analysis_cids' and 'results' keyed by input_hash, an
        'errors' list, counts, and 'status' of 'completed' or 'partial'
        (partial when at least one input failed).
    """
    from celery import group

    logger.info(f"[CID] Analyzing {len(input_cids)} inputs for features: {features}")

    # Dispatch analysis tasks in parallel — one signature per input.
    tasks = [
        analyze_input_cid.s(cid, input_hash, features)
        for input_hash, cid in input_cids.items()
    ]

    # NOTE(review): calling .get() on subtask results from inside a task
    # is a synchronous subtask wait, which Celery documents as a
    # deadlock risk when the worker pool is saturated (and it raises
    # unless sync subtasks are explicitly allowed).  A chord/callback
    # would avoid blocking — confirm worker configuration before
    # relying on this.
    if len(tasks) == 1:
        results = [tasks[0].apply_async().get(timeout=600)]
    else:
        job = group(tasks)
        results = job.apply_async().get(timeout=600)

    # Collect results keyed by input hash; failures accumulate in errors.
    analysis_cids = {}
    analysis_results = {}
    errors = []

    for result in results:
        input_hash = result.get("input_hash")
        # Both freshly-computed and cache-hit results count as success.
        if result.get("status") in ("completed", "cached"):
            analysis_cids[input_hash] = result["analysis_cid"]
            analysis_results[input_hash] = result["result"]
        else:
            errors.append({
                "input_hash": input_hash,
                "error": result.get("error", "Unknown error"),
            })

    return {
        "status": "completed" if not errors else "partial",
        "analysis_cids": analysis_cids,
        "results": analysis_results,
        "errors": errors,
        "total": len(input_cids),
        "successful": len(analysis_cids),
    }
|
||||
Reference in New Issue
Block a user