From 3116a70c3ee13b5a1b68f206e33c4e1c40d08c36 Mon Sep 17 00:00:00 2001 From: giles Date: Tue, 3 Feb 2026 21:17:22 +0000 Subject: [PATCH] Fix IPFS upload: sync instead of background task The background IPFS upload task was running on workers that don't have the file locally, causing uploads to fail silently. Now uploads go to IPFS synchronously so the IPFS CID is available immediately. Co-Authored-By: Claude Opus 4.5 --- app/services/cache_service.py | 21 +++++++++------------ sexp_effects/primitive_libs/streaming.py | 17 +++++++++++++++++ streaming/stream_sexp_generic.py | 1 - 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/app/services/cache_service.py b/app/services/cache_service.py index ddc50a4..9b7bcd8 100644 --- a/app/services/cache_service.py +++ b/app/services/cache_service.py @@ -554,13 +554,13 @@ class CacheService: # Detect media type (video/image/audio) before moving file media_type = detect_media_type(tmp_path) - # Store locally first (skip_ipfs=True for fast response) - # IPFS upload happens in background - cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True, skip_ipfs=True) - cid = cached.cid # Use local hash since we skipped IPFS + # Store locally AND upload to IPFS synchronously + # This ensures the IPFS CID is available immediately for distributed access + cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True, skip_ipfs=False) + cid = ipfs_cid or cached.cid # Prefer IPFS CID, fall back to local hash # Save to database with media category type - await self.db.create_cache_item(cid, ipfs_cid) # ipfs_cid is None initially + await self.db.create_cache_item(cached.cid, ipfs_cid) await self.db.save_item_metadata( cid=cid, actor_id=actor_id, @@ -568,13 +568,10 @@ class CacheService: filename=filename ) - # Queue background IPFS upload - try: - from tasks.ipfs_upload import upload_to_ipfs - upload_to_ipfs.delay(cid, actor_id) - logger.info(f"Queued background IPFS upload for {cid[:16]}...") - except Exception as e: - logger.warning(f"Failed to queue IPFS upload (will retry manually): {e}") + if ipfs_cid: + logger.info(f"Uploaded to IPFS: {ipfs_cid[:16]}...") + else: + logger.warning(f"IPFS upload failed, using local hash: {cid[:16]}...") return cid, ipfs_cid, None except Exception as e: diff --git a/sexp_effects/primitive_libs/streaming.py b/sexp_effects/primitive_libs/streaming.py index 34c1f07..7ad2793 100644 --- a/sexp_effects/primitive_libs/streaming.py +++ b/sexp_effects/primitive_libs/streaming.py @@ -422,3 +422,20 @@ def prim_audio_beat_count(analyzer: AudioAnalyzer, t: float) -> int: def prim_audio_duration(analyzer: AudioAnalyzer) -> float: """Get audio duration in seconds.""" return analyzer.duration + + +# Export primitives +PRIMITIVES = { + # Video source + 'make-video-source': prim_make_video_source, + 'source-read': prim_source_read, + 'source-skip': prim_source_skip, + 'source-size': prim_source_size, + + # Audio analyzer + 'make-audio-analyzer': prim_make_audio_analyzer, + 'audio-energy': prim_audio_energy, + 'audio-beat': prim_audio_beat, + 'audio-beat-count': prim_audio_beat_count, + 'audio-duration': prim_audio_duration, +} diff --git a/streaming/stream_sexp_generic.py b/streaming/stream_sexp_generic.py index 4d7679c..c311de6 100644 --- a/streaming/stream_sexp_generic.py +++ b/streaming/stream_sexp_generic.py @@ -414,7 +414,6 @@ class StreamInterpreter: self._load_effect(resolved) else: raise RuntimeError(f"Could not resolve include name '{fname}' - make sure it's uploaded and you're logged in") - raise RuntimeError(f"Could not resolve include name '{fname}' - make sure it's uploaded and you're logged in") i += 2 else: i += 1