"""Async IPFS client for content-addressed storage. All content can be stored on IPFS — blog posts, products, activities, etc. Ported from ~/art-dag/activity-pub/ipfs_client.py (converted to async httpx). Config via environment: IPFS_API — multiaddr or URL (default: /ip4/127.0.0.1/tcp/5001) IPFS_TIMEOUT — request timeout in seconds (default: 60) IPFS_GATEWAY_URL — public gateway for CID links (default: https://ipfs.io) """ from __future__ import annotations import json import logging import os import re import httpx logger = logging.getLogger(__name__) class IPFSError(Exception): """Raised when an IPFS operation fails.""" # -- Config ------------------------------------------------------------------ IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001") IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "60")) IPFS_GATEWAY_URL = os.getenv("IPFS_GATEWAY_URL", "https://ipfs.io") def _multiaddr_to_url(multiaddr: str) -> str: """Convert IPFS multiaddr to HTTP URL.""" dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr) if dns_match: return f"http://{dns_match.group(1)}:{dns_match.group(2)}" ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr) if ip4_match: return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}" if multiaddr.startswith("http"): return multiaddr return "http://127.0.0.1:5001" IPFS_BASE_URL = _multiaddr_to_url(IPFS_API) # -- Async client functions -------------------------------------------------- async def add_bytes(data: bytes, *, pin: bool = True) -> str: """Add raw bytes to IPFS. Returns the CID. """ try: async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client: resp = await client.post( f"{IPFS_BASE_URL}/api/v0/add", params={"pin": str(pin).lower()}, files={"file": ("data", data)}, ) resp.raise_for_status() cid = resp.json()["Hash"] logger.info("Added to IPFS: %d bytes -> %s", len(data), cid) return cid except Exception as e: logger.error("Failed to add bytes to IPFS: %s", e) raise IPFSError(f"Failed to add bytes: {e}") from e async def add_json(data: dict) -> str: """Serialize dict to sorted JSON and add to IPFS.""" json_bytes = json.dumps(data, indent=2, sort_keys=True).encode("utf-8") return await add_bytes(json_bytes, pin=True) async def get_bytes(cid: str) -> bytes | None: """Fetch content from IPFS by CID.""" try: async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client: resp = await client.post( f"{IPFS_BASE_URL}/api/v0/cat", params={"arg": cid}, ) resp.raise_for_status() logger.info("Retrieved from IPFS: %s (%d bytes)", cid, len(resp.content)) return resp.content except Exception as e: logger.error("Failed to get from IPFS: %s", e) return None async def pin_cid(cid: str) -> bool: """Pin a CID on this node.""" try: async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client: resp = await client.post( f"{IPFS_BASE_URL}/api/v0/pin/add", params={"arg": cid}, ) resp.raise_for_status() logger.info("Pinned on IPFS: %s", cid) return True except Exception as e: logger.error("Failed to pin on IPFS: %s", e) return False async def unpin_cid(cid: str) -> bool: """Unpin a CID from this node.""" try: async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client: resp = await client.post( f"{IPFS_BASE_URL}/api/v0/pin/rm", params={"arg": cid}, ) resp.raise_for_status() logger.info("Unpinned from IPFS: %s", cid) return True except Exception as e: logger.error("Failed to unpin from IPFS: %s", e) return False async def is_available() -> bool: """Check if IPFS daemon is reachable.""" try: async with httpx.AsyncClient(timeout=5) as client: resp = await client.post(f"{IPFS_BASE_URL}/api/v0/id") return resp.status_code == 200 except Exception: return False def gateway_url(cid: str) -> str: """Return a public gateway URL for a CID.""" return f"{IPFS_GATEWAY_URL}/ipfs/{cid}"