# art-activity-pub/ipfs_client.py """ IPFS client for Art DAG L2 server. Provides functions to fetch and pin content from IPFS. Uses direct HTTP API calls for compatibility with all Kubo versions. """ import logging import os import re from typing import Optional import requests logger = logging.getLogger(__name__) # IPFS API multiaddr - default to local, docker uses /dns/ipfs/tcp/5001 IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001") # Connection timeout in seconds IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "60")) def _multiaddr_to_url(multiaddr: str) -> str: """Convert IPFS multiaddr to HTTP URL.""" # Handle /dns/hostname/tcp/port format dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr) if dns_match: return f"http://{dns_match.group(1)}:{dns_match.group(2)}" # Handle /ip4/address/tcp/port format ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr) if ip4_match: return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}" # Fallback: assume it's already a URL or use default if multiaddr.startswith("http"): return multiaddr return "http://127.0.0.1:5001" # Base URL for IPFS API IPFS_BASE_URL = _multiaddr_to_url(IPFS_API) def get_bytes(cid: str) -> Optional[bytes]: """ Retrieve content from IPFS by CID. Args: cid: IPFS CID to retrieve Returns: Content as bytes or None on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/cat" params = {"arg": cid} response = requests.post(url, params=params, timeout=IPFS_TIMEOUT) response.raise_for_status() data = response.content logger.info(f"Retrieved from IPFS: {cid} ({len(data)} bytes)") return data except Exception as e: logger.error(f"Failed to get from IPFS: {e}") return None def pin(cid: str) -> bool: """ Pin a CID on this node. Args: cid: IPFS CID to pin Returns: True on success, False on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/pin/add" params = {"arg": cid} response = requests.post(url, params=params, timeout=IPFS_TIMEOUT) response.raise_for_status() logger.info(f"Pinned on IPFS: {cid}") return True except Exception as e: logger.error(f"Failed to pin on IPFS: {e}") return False def unpin(cid: str) -> bool: """ Unpin a CID from this node. Args: cid: IPFS CID to unpin Returns: True on success, False on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/pin/rm" params = {"arg": cid} response = requests.post(url, params=params, timeout=IPFS_TIMEOUT) response.raise_for_status() logger.info(f"Unpinned from IPFS: {cid}") return True except Exception as e: logger.error(f"Failed to unpin from IPFS: {e}") return False def is_available() -> bool: """ Check if IPFS daemon is available. Returns: True if IPFS is available, False otherwise """ try: url = f"{IPFS_BASE_URL}/api/v0/id" response = requests.post(url, timeout=5) return response.status_code == 200 except Exception: return False def get_node_id() -> Optional[str]: """ Get this IPFS node's peer ID. Returns: Peer ID string or None on failure """ try: url = f"{IPFS_BASE_URL}/api/v0/id" response = requests.post(url, timeout=IPFS_TIMEOUT) response.raise_for_status() return response.json().get("ID") except Exception as e: logger.error(f"Failed to get node ID: {e}") return None