# art-celery/ipfs_client.py """ IPFS client for Art DAG L1 server. Provides functions to add, retrieve, and pin files on IPFS. Uses direct HTTP API calls for compatibility with all Kubo versions. """ import logging import os import re from pathlib import Path from typing import Optional import requests logger = logging.getLogger(__name__) # IPFS API multiaddr - default to local, docker uses /dns/ipfs/tcp/5001 IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001") # Connection timeout in seconds IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "30")) def _multiaddr_to_url(multiaddr: str) -> str: """Convert IPFS multiaddr to HTTP URL.""" # Handle /dns/hostname/tcp/port format dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr) if dns_match: return f"http://{dns_match.group(1)}:{dns_match.group(2)}" # Handle /ip4/address/tcp/port format ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr) if ip4_match: return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}" # Fallback: assume it's already a URL or use default if multiaddr.startswith("http"): return multiaddr return "http://127.0.0.1:5001" # Base URL for IPFS API IPFS_BASE_URL = _multiaddr_to_url(IPFS_API) def add_file(file_path: Path, pin: bool = True) -> Optional[str]: """ Add a file to IPFS and optionally pin it. Args: file_path: Path to the file to add pin: Whether to pin the file (default: True) Returns: IPFS CID (content identifier) or None on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/add" params = {"pin": str(pin).lower()} with open(file_path, "rb") as f: files = {"file": (file_path.name, f)} response = requests.post(url, params=params, files=files, timeout=IPFS_TIMEOUT) response.raise_for_status() result = response.json() cid = result["Hash"] logger.info(f"Added to IPFS: {file_path.name} -> {cid}") return cid except Exception as e: logger.error(f"Failed to add to IPFS: {e}") return None def add_bytes(data: bytes, pin: bool = True) -> Optional[str]: """ Add bytes data to IPFS and optionally pin it. Args: data: Bytes to add pin: Whether to pin the data (default: True) Returns: IPFS CID or None on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/add" params = {"pin": str(pin).lower()} files = {"file": ("data", data)} response = requests.post(url, params=params, files=files, timeout=IPFS_TIMEOUT) response.raise_for_status() result = response.json() cid = result["Hash"] logger.info(f"Added bytes to IPFS: {len(data)} bytes -> {cid}") return cid except Exception as e: logger.error(f"Failed to add bytes to IPFS: {e}") return None def get_file(cid: str, dest_path: Path) -> bool: """ Retrieve a file from IPFS and save to destination. Args: cid: IPFS CID to retrieve dest_path: Path to save the file Returns: True on success, False on failure """ try: data = get_bytes(cid) if data is None: return False dest_path.parent.mkdir(parents=True, exist_ok=True) dest_path.write_bytes(data) logger.info(f"Retrieved from IPFS: {cid} -> {dest_path}") return True except Exception as e: logger.error(f"Failed to get from IPFS: {e}") return False def get_bytes(cid: str) -> Optional[bytes]: """ Retrieve bytes data from IPFS. Args: cid: IPFS CID to retrieve Returns: File content as bytes or None on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/cat" params = {"arg": cid} response = requests.post(url, params=params, timeout=IPFS_TIMEOUT) response.raise_for_status() data = response.content logger.info(f"Retrieved from IPFS: {cid} ({len(data)} bytes)") return data except Exception as e: logger.error(f"Failed to get bytes from IPFS: {e}") return None def pin(cid: str) -> bool: """ Pin a CID on IPFS. Args: cid: IPFS CID to pin Returns: True on success, False on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/pin/add" params = {"arg": cid} response = requests.post(url, params=params, timeout=IPFS_TIMEOUT) response.raise_for_status() logger.info(f"Pinned on IPFS: {cid}") return True except Exception as e: logger.error(f"Failed to pin on IPFS: {e}") return False def unpin(cid: str) -> bool: """ Unpin a CID on IPFS. Args: cid: IPFS CID to unpin Returns: True on success, False on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/pin/rm" params = {"arg": cid} response = requests.post(url, params=params, timeout=IPFS_TIMEOUT) response.raise_for_status() logger.info(f"Unpinned on IPFS: {cid}") return True except Exception as e: logger.error(f"Failed to unpin on IPFS: {e}") return False def is_pinned(cid: str) -> bool: """ Check if a CID is pinned on IPFS. Args: cid: IPFS CID to check Returns: True if pinned, False otherwise """ try: url = f"{IPFS_BASE_URL}/api/v0/pin/ls" params = {"arg": cid, "type": "recursive"} response = requests.post(url, params=params, timeout=IPFS_TIMEOUT) if response.status_code == 200: result = response.json() return cid in result.get("Keys", {}) return False except Exception as e: logger.error(f"Failed to check pin status: {e}") return False def is_available() -> bool: """ Check if IPFS daemon is available. Returns: True if IPFS is available, False otherwise """ try: url = f"{IPFS_BASE_URL}/api/v0/id" response = requests.post(url, timeout=5) return response.status_code == 200 except Exception: return False def get_node_id() -> Optional[str]: """ Get this IPFS node's peer ID. Returns: Peer ID string or None on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/id" response = requests.post(url, timeout=IPFS_TIMEOUT) response.raise_for_status() return response.json().get("ID") except Exception as e: logger.error(f"Failed to get node ID: {e}") return None