- Add IPFSHLSOutput class that uploads segments to IPFS as they're created
- Update streaming task to use IPFS HLS output for distributed streaming
- Add /ipfs-stream endpoint to get IPFS playlist URL
- Update /stream endpoint to redirect to IPFS when available
- Add GPU persistence mode (STREAMING_GPU_PERSIST=1) to keep frames on GPU
- Add hardware video decoding (NVDEC) support for faster video processing
- Add GPU-accelerated primitive libraries: blending_gpu, color_ops_gpu, geometry_gpu
- Add streaming_gpu module with GPUFrame class for tracking CPU/GPU data location
- Add Dockerfile.gpu for building GPU-enabled worker image

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
346 lines
9.4 KiB
Python
346 lines
9.4 KiB
Python
# art-celery/ipfs_client.py
|
|
"""
|
|
IPFS client for Art DAG L1 server.
|
|
|
|
Provides functions to add, retrieve, and pin files on IPFS.
|
|
Uses direct HTTP API calls for compatibility with all Kubo versions.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Optional, Union
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)

# Multiaddr of the Kubo HTTP API. Defaults to a node on localhost; under
# docker this is typically set to /dns/ipfs/tcp/5001.
IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001")

# Timeout in seconds for calls to the local IPFS API. It is kept generous so
# large uploads/downloads have time to finish.
IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "120"))

# Public gateway base URLs (no trailing /ipfs/), tried in order when the local
# node cannot serve a CID. Read from a comma-separated env var; entries are
# whitespace-stripped and blank entries are dropped.
# NOTE(review): Cloudflare has announced the retirement of its public IPFS
# gateway; consider removing cloudflare-ipfs.com from the default — confirm.
IPFS_GATEWAYS = list(
    filter(
        None,
        map(
            str.strip,
            os.getenv(
                "IPFS_GATEWAYS",
                "https://ipfs.io,https://cloudflare-ipfs.com,https://dweb.link",
            ).split(","),
        ),
    )
)

# Per-gateway timeout in seconds. It is shorter than IPFS_TIMEOUT so a slow
# gateway is abandoned quickly in favour of the next one.
GATEWAY_TIMEOUT = int(os.getenv("GATEWAY_TIMEOUT", "30"))
|
|
|
|
|
|
def _multiaddr_to_url(multiaddr: str) -> str:
|
|
"""Convert IPFS multiaddr to HTTP URL."""
|
|
# Handle /dns/hostname/tcp/port format
|
|
dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if dns_match:
|
|
return f"http://{dns_match.group(1)}:{dns_match.group(2)}"
|
|
|
|
# Handle /ip4/address/tcp/port format
|
|
ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if ip4_match:
|
|
return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}"
|
|
|
|
# Fallback: assume it's already a URL or use default
|
|
if multiaddr.startswith("http"):
|
|
return multiaddr
|
|
return "http://127.0.0.1:5001"
|
|
|
|
|
|
# HTTP base URL of the IPFS API, computed once at import time from IPFS_API.
IPFS_BASE_URL: str = _multiaddr_to_url(IPFS_API)
|
|
|
|
|
|
def add_file(file_path: Union[Path, str], pin: bool = True) -> Optional[str]:
    """
    Upload a file to the IPFS node through /api/v0/add.

    Args:
        file_path: File to upload; a str is converted to a Path.
        pin: Sent as the "pin" query parameter, lower-cased (default: True).

    Returns:
        The CID taken from the response's "Hash" field, or None if anything
        fails (the error is logged, not raised).
    """
    try:
        path = Path(file_path) if isinstance(file_path, str) else file_path
        endpoint = f"{IPFS_BASE_URL}/api/v0/add"
        query = {"pin": str(pin).lower()}

        # The upload must happen while the file handle is still open.
        with open(path, "rb") as handle:
            response = requests.post(
                endpoint,
                params=query,
                files={"file": (path.name, handle)},
                timeout=IPFS_TIMEOUT,
            )

        response.raise_for_status()
        cid = response.json()["Hash"]
        logger.info(f"Added to IPFS: {path.name} -> {cid}")
        return cid
    except Exception as e:
        logger.error(f"Failed to add to IPFS: {e}")
        return None
|
|
|
|
|
|
def add_bytes(data: bytes, pin: bool = True) -> Optional[str]:
    """
    Upload raw bytes to the IPFS node through /api/v0/add.

    The payload is sent as a multipart file named "data".

    Args:
        data: Bytes to store.
        pin: Sent as the "pin" query parameter, lower-cased (default: True).

    Returns:
        The CID taken from the response's "Hash" field, or None on any
        failure (the error is logged, not raised).
    """
    try:
        response = requests.post(
            f"{IPFS_BASE_URL}/api/v0/add",
            params={"pin": str(pin).lower()},
            files={"file": ("data", data)},
            timeout=IPFS_TIMEOUT,
        )
        response.raise_for_status()
        cid = response.json()["Hash"]
        logger.info(f"Added bytes to IPFS: {len(data)} bytes -> {cid}")
        return cid
    except Exception as e:
        logger.error(f"Failed to add bytes to IPFS: {e}")
        return None
|
|
|
|
|
|
def add_json(data: dict, pin: bool = True) -> Optional[str]:
    """
    Store a dict on IPFS as UTF-8 JSON (indent=2, keys sorted).

    Sorting keys and fixing the indent makes the serialization deterministic,
    so equal dicts produce identical bytes.

    Args:
        data: JSON-serializable dictionary.
        pin: Whether to pin the stored object (default: True).

    Returns:
        CID of the stored JSON, or None if the upload fails.

    Raises:
        TypeError: If data is not JSON-serializable. Serialization happens
            before add_bytes, outside its error handling.
    """
    import json

    serialized = json.dumps(data, indent=2, sort_keys=True)
    return add_bytes(serialized.encode('utf-8'), pin=pin)
|
|
|
|
|
|
def add_string(content: str, pin: bool = True) -> Optional[str]:
    """
    Store a text string on IPFS, encoded as UTF-8.

    Args:
        content: Text to store (for example an S-expression).
        pin: Whether to pin the stored object (default: True).

    Returns:
        CID of the stored text, or None if the upload fails.
    """
    encoded = content.encode('utf-8')
    return add_bytes(encoded, pin=pin)
|
|
|
|
|
|
def get_file(cid: str, dest_path: Union[Path, str]) -> bool:
    """
    Fetch the content of a CID and write it to a local file.

    Content is obtained with get_bytes (local node first, then the gateway
    fallback). Missing parent directories of the destination are created.

    Args:
        cid: CID to fetch.
        dest_path: Destination file; a str is converted to a Path.

    Returns:
        True if the file was written, False if fetching or writing failed
        (write errors are logged).
    """
    try:
        content = get_bytes(cid)
        if content is None:
            return False

        target = Path(dest_path) if isinstance(dest_path, str) else dest_path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(content)
        logger.info(f"Retrieved from IPFS: {cid} -> {target}")
        return True
    except Exception as e:
        logger.error(f"Failed to get from IPFS: {e}")
        return False
|
|
|
|
|
|
def get_bytes_from_gateway(cid: str) -> Optional[bytes]:
    """
    Fetch a CID over HTTP from the configured public gateways.

    Each entry of IPFS_GATEWAYS is tried in order as "<gateway>/ipfs/<cid>".
    The body of the first response without an HTTP error status is returned.

    Args:
        cid: CID to fetch.

    Returns:
        Response body bytes, or None if every gateway fails.
    """
    # NOTE(review): gateway responses are not checked against the CID, so a
    # misbehaving gateway could return arbitrary bytes — consider verifying.
    for gateway in IPFS_GATEWAYS:
        try:
            url = f"{gateway}/ipfs/{cid}"
            logger.info(f"Trying gateway: {url}")
            response = requests.get(url, timeout=GATEWAY_TIMEOUT)
            response.raise_for_status()
            body = response.content
            logger.info(f"Retrieved from gateway {gateway}: {cid} ({len(body)} bytes)")
            return body
        except Exception as e:
            # Log and move on to the next gateway in the list.
            logger.warning(f"Gateway {gateway} failed for {cid}: {e}")

    logger.error(f"All gateways failed for {cid}")
    return None
|
|
|
|
|
|
def get_bytes(cid: str, use_gateway_fallback: bool = True) -> Optional[bytes]:
    """
    Retrieve the content of a CID as bytes.

    The IPFS node (/api/v0/cat) is tried first. If that fails, and
    use_gateway_fallback is True and at least one gateway is configured,
    the result of get_bytes_from_gateway is returned instead.

    Args:
        cid: CID to retrieve.
        use_gateway_fallback: Whether to try the public gateways when the
            node request fails (default: True).

    Returns:
        Content bytes, or None if every source fails.
    """
    local_error: Optional[Exception] = None

    # Try local IPFS node first
    try:
        response = requests.post(
            f"{IPFS_BASE_URL}/api/v0/cat",
            params={"arg": cid},
            timeout=IPFS_TIMEOUT,
        )
        response.raise_for_status()
        data = response.content
        logger.info(f"Retrieved from IPFS: {cid} ({len(data)} bytes)")
        return data
    except Exception as e:
        logger.warning(f"Local IPFS failed for {cid}: {e}")
        # Python unbinds `e` when the except clause ends. Referencing it later
        # raised UnboundLocalError whenever the gateway fallback was skipped,
        # so keep our own reference for the final error log.
        local_error = e

    # Try gateway fallback
    if use_gateway_fallback and IPFS_GATEWAYS:
        logger.info(f"Trying gateway fallback for {cid}")
        return get_bytes_from_gateway(cid)

    logger.error(f"Failed to get bytes from IPFS: {local_error}")
    return None
|
|
|
|
|
|
def pin(cid: str) -> bool:
    """
    Ask the IPFS node to pin a CID via /api/v0/pin/add.

    Args:
        cid: CID to pin.

    Returns:
        True if the request succeeded, False otherwise (the error is logged).
    """
    try:
        reply = requests.post(
            f"{IPFS_BASE_URL}/api/v0/pin/add",
            params={"arg": cid},
            timeout=IPFS_TIMEOUT,
        )
        reply.raise_for_status()
        logger.info(f"Pinned on IPFS: {cid}")
        return True
    except Exception as e:
        logger.error(f"Failed to pin on IPFS: {e}")
        return False
|
|
|
|
|
|
def unpin(cid: str) -> bool:
    """
    Ask the IPFS node to remove the pin on a CID via /api/v0/pin/rm.

    Args:
        cid: CID to unpin.

    Returns:
        True if the request succeeded, False otherwise (the error is logged).
    """
    try:
        reply = requests.post(
            f"{IPFS_BASE_URL}/api/v0/pin/rm",
            params={"arg": cid},
            timeout=IPFS_TIMEOUT,
        )
        reply.raise_for_status()
        logger.info(f"Unpinned on IPFS: {cid}")
        return True
    except Exception as e:
        logger.error(f"Failed to unpin on IPFS: {e}")
        return False
|
|
|
|
|
|
def is_pinned(cid: str) -> bool:
    """
    Report whether a CID appears in the node's recursive pin list.

    Queries /api/v0/pin/ls with type=recursive. Any status other than 200 is
    treated as "not pinned".

    Args:
        cid: CID to check.

    Returns:
        True if the CID is a key of the response's "Keys" object, False
        otherwise or on error (errors are logged).
    """
    try:
        response = requests.post(
            f"{IPFS_BASE_URL}/api/v0/pin/ls",
            params={"arg": cid, "type": "recursive"},
            timeout=IPFS_TIMEOUT,
        )
        if response.status_code != 200:
            return False
        pinned_keys = response.json().get("Keys", {})
        return cid in pinned_keys
    except Exception as e:
        logger.error(f"Failed to check pin status: {e}")
        return False
|
|
|
|
|
|
def is_available() -> bool:
    """
    Probe the IPFS daemon with a request to /api/v0/id.

    A fixed 5-second timeout keeps the health check fast.

    Returns:
        True if the daemon answered with HTTP 200. False for any other status
        or any error; errors are not logged.
    """
    try:
        probe = requests.post(f"{IPFS_BASE_URL}/api/v0/id", timeout=5)
        return probe.status_code == 200
    except Exception:
        return False
|
|
|
|
|
|
def get_node_id() -> Optional[str]:
    """
    Return the peer ID reported by the IPFS node.

    Returns:
        The "ID" field of the /api/v0/id response (None if the field is
        absent), or None if the request fails (the error is logged).
    """
    try:
        response = requests.post(f"{IPFS_BASE_URL}/api/v0/id", timeout=IPFS_TIMEOUT)
        response.raise_for_status()
        identity = response.json()
        return identity.get("ID")
    except Exception as e:
        logger.error(f"Failed to get node ID: {e}")
        return None
|