Fix media type detection using magic bytes
Use detect_media_type() with magic bytes instead of mimetypes.guess_type() which requires file extensions. Cache files are stored by content hash without extensions, so magic byte detection is needed. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -157,17 +157,28 @@ async def get_run(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to load recipe for plan: {e}")
|
logger.warning(f"Failed to load recipe for plan: {e}")
|
||||||
|
|
||||||
|
# Helper to convert simple type to MIME type prefix for template
|
||||||
|
def type_to_mime(simple_type: str) -> str:
|
||||||
|
if simple_type == "video":
|
||||||
|
return "video/mp4"
|
||||||
|
elif simple_type == "image":
|
||||||
|
return "image/jpeg"
|
||||||
|
elif simple_type == "audio":
|
||||||
|
return "audio/mpeg"
|
||||||
|
return None
|
||||||
|
|
||||||
# Build artifacts list from output and inputs
|
# Build artifacts list from output and inputs
|
||||||
artifacts = []
|
artifacts = []
|
||||||
if run.get("output_hash"):
|
if run.get("output_hash"):
|
||||||
# Detect media type
|
# Detect media type using magic bytes
|
||||||
output_hash = run["output_hash"]
|
output_hash = run["output_hash"]
|
||||||
media_type = None
|
media_type = None
|
||||||
try:
|
try:
|
||||||
cache_path = get_cache_manager().get_path(output_hash)
|
from ..services.run_service import detect_media_type
|
||||||
|
cache_path = get_cache_manager().get_by_content_hash(output_hash)
|
||||||
if cache_path and cache_path.exists():
|
if cache_path and cache_path.exists():
|
||||||
import mimetypes
|
simple_type = detect_media_type(cache_path)
|
||||||
media_type, _ = mimetypes.guess_type(str(cache_path))
|
media_type = type_to_mime(simple_type)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
artifacts.append({
|
artifacts.append({
|
||||||
@@ -179,14 +190,15 @@ async def get_run(
|
|||||||
# Build inputs list with media types
|
# Build inputs list with media types
|
||||||
run_inputs = []
|
run_inputs = []
|
||||||
if run.get("inputs"):
|
if run.get("inputs"):
|
||||||
import mimetypes
|
from ..services.run_service import detect_media_type
|
||||||
cache_manager = get_cache_manager()
|
cache_manager = get_cache_manager()
|
||||||
for i, input_hash in enumerate(run["inputs"]):
|
for i, input_hash in enumerate(run["inputs"]):
|
||||||
media_type = None
|
media_type = None
|
||||||
try:
|
try:
|
||||||
cache_path = cache_manager.get_path(input_hash)
|
cache_path = cache_manager.get_by_content_hash(input_hash)
|
||||||
if cache_path and cache_path.exists():
|
if cache_path and cache_path.exists():
|
||||||
media_type, _ = mimetypes.guess_type(str(cache_path))
|
simple_type = detect_media_type(cache_path)
|
||||||
|
media_type = type_to_mime(simple_type)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
run_inputs.append({
|
run_inputs.append({
|
||||||
|
|||||||
Reference in New Issue
Block a user