Files
celery/ipfs_client.py
gilesb ba244b9ebc Add PostgreSQL + IPFS backend, rename configs to recipes
- Add PostgreSQL database for cache metadata storage with schema for
  cache_items, item_types, pin_reasons, and l2_shares tables
- Add IPFS integration as durable backing store (local cache as hot storage)
- Add postgres and ipfs services to docker-compose.yml
- Update cache_manager to upload to IPFS and track CIDs
- Rename all config references to recipe throughout server.py
- Update API endpoints: /configs/* -> /recipes/*
- Update models: ConfigStatus -> RecipeStatus, ConfigRunRequest -> RecipeRunRequest
- Update UI tabs and pages to show Recipes instead of Configs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-08 14:58:29 +00:00

193 lines
4.6 KiB
Python

# art-celery/ipfs_client.py
"""
IPFS client for Art DAG L1 server.
Provides functions to add, retrieve, and pin files on IPFS.
Uses local cache as hot storage with IPFS as durable backing store.
"""
import logging
import os
from pathlib import Path
from typing import Optional
import ipfshttpclient
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# IPFS API multiaddr, read from the IPFS_API environment variable.
# Defaults to a local daemon; docker uses /dns/ipfs/tcp/5001.
IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001")

# Connection timeout in seconds, read from IPFS_TIMEOUT (default "30").
# The value is parsed with int() at import time, so a non-numeric setting
# raises ValueError as soon as this module is imported.
IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "30"))
def get_client():
    """
    Open a connection to the configured IPFS API.

    Returns:
        The object returned by ipfshttpclient.connect for the IPFS_API
        address, created with IPFS_TIMEOUT as its timeout.
    """
    connection = ipfshttpclient.connect(IPFS_API, timeout=IPFS_TIMEOUT)
    return connection
def add_file(file_path: Path, pin: bool = True) -> Optional[str]:
    """
    Add a file to IPFS and optionally pin it.

    Any exception raised while talking to IPFS is logged and converted
    into a None return value; this function does not raise.

    Args:
        file_path: Path to the file to add. A plain string path is also
            accepted.
        pin: Whether to pin the file (default: True)

    Returns:
        IPFS CID (content identifier) or None on failure
    """
    try:
        # Only the success log needs Path semantics (``.name``). Normalising
        # here means a str path is no longer uploaded and then reported as a
        # failure because of an AttributeError raised while logging.
        display_name = Path(file_path).name
        with get_client() as client:
            result = client.add(str(file_path), pin=pin)
            cid = result["Hash"]
            logger.info(f"Added to IPFS: {display_name} -> {cid}")
            return cid
    except Exception as e:
        logger.error(f"Failed to add to IPFS: {e}")
        return None
def add_bytes(data: bytes, pin: bool = True) -> Optional[str]:
    """
    Add bytes data to IPFS and optionally pin it.

    Any exception raised while talking to IPFS is logged and converted
    into a None return value; this function does not raise.

    Args:
        data: Bytes to add
        pin: Whether to pin the data (default: True)

    Returns:
        IPFS CID or None on failure
    """
    try:
        with get_client() as client:
            # The daemon's add endpoint pins new content unless told
            # otherwise, so the flag must be sent with the request itself:
            # previously pin=False still left the data pinned. Sending it
            # here also makes a separate pin.add round-trip unnecessary.
            cid = client.add_bytes(data, opts={"pin": pin})
            logger.info(f"Added bytes to IPFS: {len(data)} bytes -> {cid}")
            return cid
    except Exception as e:
        logger.error(f"Failed to add bytes to IPFS: {e}")
        return None
def get_file(cid: str, dest_path: Path) -> bool:
    """
    Retrieve a file from IPFS and save to destination.

    The content is read fully into memory, written to a temporary sibling
    of the destination, and then moved into place with os.replace, so a
    failed write does not leave a truncated file at ``dest_path``. Any
    exception is logged and converted into a False return value.

    Args:
        cid: IPFS CID to retrieve
        dest_path: Path to save the file. A plain string path is also
            accepted. Parent directories are created if missing.

    Returns:
        True on success, False on failure
    """
    staging_path = None
    try:
        target = Path(dest_path)
        with get_client() as client:
            # Get file content
            data = client.cat(cid)
        target.parent.mkdir(parents=True, exist_ok=True)
        # Stage in the destination directory so the final rename stays on
        # one filesystem; the pid keeps concurrent worker processes from
        # sharing a staging file.
        staging_path = target.with_name(f".{target.name}.{os.getpid()}.tmp")
        staging_path.write_bytes(data)
        os.replace(staging_path, target)
        logger.info(f"Retrieved from IPFS: {cid} -> {dest_path}")
        return True
    except Exception as e:
        logger.error(f"Failed to get from IPFS: {e}")
        if staging_path is not None:
            # Best-effort cleanup; the staging file may never have been
            # created, or may already have been moved into place.
            try:
                staging_path.unlink()
            except OSError:
                pass
        return False
def get_bytes(cid: str) -> Optional[bytes]:
    """
    Fetch the content stored under a CID and return it in memory.

    Failures are logged and reported through the return value rather than
    raised.

    Args:
        cid: IPFS CID to retrieve

    Returns:
        The content returned by the client's cat call, or None on failure
    """
    try:
        with get_client() as ipfs:
            data = ipfs.cat(cid)
            logger.info(f"Retrieved from IPFS: {cid} ({len(data)} bytes)")
    except Exception as e:
        logger.error(f"Failed to get bytes from IPFS: {e}")
        return None
    return data
def pin(cid: str) -> bool:
    """
    Ask the IPFS node to pin a CID.

    Failures are logged and reported through the return value rather than
    raised.

    Args:
        cid: IPFS CID to pin

    Returns:
        True on success, False on failure
    """
    try:
        with get_client() as ipfs:
            ipfs.pin.add(cid)
            logger.info(f"Pinned on IPFS: {cid}")
    except Exception as e:
        logger.error(f"Failed to pin on IPFS: {e}")
        return False
    return True
def unpin(cid: str) -> bool:
    """
    Ask the IPFS node to remove the pin for a CID.

    Failures are logged and reported through the return value rather than
    raised.

    Args:
        cid: IPFS CID to unpin

    Returns:
        True on success, False on failure
    """
    try:
        with get_client() as ipfs:
            ipfs.pin.rm(cid)
            logger.info(f"Unpinned on IPFS: {cid}")
    except Exception as e:
        logger.error(f"Failed to unpin on IPFS: {e}")
        return False
    return True
def is_pinned(cid: str) -> bool:
    """
    Report whether a CID appears among the node's recursive pins.

    The full list of pins of type "recursive" is requested and the CID is
    looked up among its "Keys"; pins of other types are not consulted.
    Failures are logged and reported as False.

    Args:
        cid: IPFS CID to check

    Returns:
        True if pinned, False otherwise (including when the check fails)
    """
    try:
        with get_client() as ipfs:
            listing = ipfs.pin.ls(type="recursive")
            pinned_keys = listing.get("Keys", {})
            found = cid in pinned_keys
    except Exception as e:
        logger.error(f"Failed to check pin status: {e}")
        return False
    return found
def is_available() -> bool:
    """
    Probe whether the IPFS daemon can be reached.

    Opens a client and issues an id call; any exception is treated as
    "not available" and is not logged.

    Returns:
        True if IPFS is available, False otherwise
    """
    try:
        with get_client() as ipfs:
            ipfs.id()
    except Exception:
        return False
    return True