Files
rose-ash/artdag/l2/ipfs_client.py
giles 1a74d811f7
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m33s
Incorporate art-dag-mono repo into artdag/ subfolder
Merges full history from art-dag/mono.git into the monorepo
under the artdag/ directory. Contains: core (DAG engine),
l1 (Celery rendering server), l2 (ActivityPub registry),
common (shared templates/middleware), client (CLI), test (e2e).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

git-subtree-dir: artdag
git-subtree-mainline: 1a179de547
git-subtree-split: 4c2e716558
2026-02-27 09:07:23 +00:00

227 lines
5.5 KiB
Python

# artdag/l2/ipfs_client.py
"""
IPFS client for Art DAG L2 server.
Provides functions to fetch, pin, and add content to IPFS.
Uses direct HTTP API calls for compatibility with all Kubo versions.
"""
import json
import logging
import os
import re
from typing import Optional
import requests
class IPFSError(Exception):
    """Error signalling that a request to the IPFS node did not succeed."""
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Multiaddr of the IPFS API, taken from the IPFS_API environment variable.
# Defaults to a local node; docker uses /dns/ipfs/tcp/5001.
IPFS_API = os.environ.get("IPFS_API", "/ip4/127.0.0.1/tcp/5001")

# Timeout in seconds for IPFS requests, taken from IPFS_TIMEOUT (default 60)
IPFS_TIMEOUT = int(os.environ.get("IPFS_TIMEOUT", "60"))
def _multiaddr_to_url(multiaddr: str) -> str:
"""Convert IPFS multiaddr to HTTP URL."""
# Handle /dns/hostname/tcp/port format
dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr)
if dns_match:
return f"http://{dns_match.group(1)}:{dns_match.group(2)}"
# Handle /ip4/address/tcp/port format
ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr)
if ip4_match:
return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}"
# Fallback: assume it's already a URL or use default
if multiaddr.startswith("http"):
return multiaddr
return "http://127.0.0.1:5001"
# Base URL for IPFS API, computed once at import time from IPFS_API
IPFS_BASE_URL = _multiaddr_to_url(IPFS_API)
def get_bytes(cid: str) -> Optional[bytes]:
    """
    Fetch the content stored under a CID from the IPFS node.

    Sends a POST to the node's /api/v0/cat endpoint. Any exception raised
    along the way (including a non-success HTTP status) is logged and
    swallowed.

    Args:
        cid: IPFS CID to retrieve

    Returns:
        The response body as bytes, or None if the request failed
    """
    try:
        resp = requests.post(
            f"{IPFS_BASE_URL}/api/v0/cat",
            params={"arg": cid},
            timeout=IPFS_TIMEOUT,
        )
        resp.raise_for_status()
        payload = resp.content
        logger.info(f"Retrieved from IPFS: {cid} ({len(payload)} bytes)")
    except Exception as exc:
        logger.error(f"Failed to get from IPFS: {exc}")
        return None
    return payload
def pin(cid: str) -> bool:
    """
    Ask the IPFS node to pin a CID, reporting the outcome as a boolean.

    Sends a POST to /api/v0/pin/add. Errors are logged, never raised.

    Args:
        cid: IPFS CID to pin

    Returns:
        True if the request succeeded, False if anything went wrong
    """
    try:
        resp = requests.post(
            f"{IPFS_BASE_URL}/api/v0/pin/add",
            params={"arg": cid},
            timeout=IPFS_TIMEOUT,
        )
        resp.raise_for_status()
        logger.info(f"Pinned on IPFS: {cid}")
    except Exception as exc:
        logger.error(f"Failed to pin on IPFS: {exc}")
        return False
    return True
def unpin(cid: str) -> bool:
    """
    Ask the IPFS node to remove the pin for a CID.

    Sends a POST to /api/v0/pin/rm. Errors are logged, never raised.

    Args:
        cid: IPFS CID to unpin

    Returns:
        True if the request succeeded, False if anything went wrong
    """
    try:
        resp = requests.post(
            f"{IPFS_BASE_URL}/api/v0/pin/rm",
            params={"arg": cid},
            timeout=IPFS_TIMEOUT,
        )
        resp.raise_for_status()
        logger.info(f"Unpinned from IPFS: {cid}")
    except Exception as exc:
        logger.error(f"Failed to unpin from IPFS: {exc}")
        return False
    return True
def is_available() -> bool:
    """
    Probe the IPFS daemon with a short request.

    Sends a POST to /api/v0/id using a fixed 5 second timeout.

    Returns:
        True when the daemon answers with HTTP 200; False for any other
        status or if the request raises
    """
    try:
        status = requests.post(f"{IPFS_BASE_URL}/api/v0/id", timeout=5).status_code
    except Exception:
        return False
    return status == 200
def get_node_id() -> Optional[str]:
    """
    Look up the peer ID reported by the IPFS node.

    Sends a POST to /api/v0/id and reads the "ID" field of the JSON reply.
    Errors are logged, never raised.

    Returns:
        The value of the "ID" field, or None if the request failed
    """
    try:
        resp = requests.post(f"{IPFS_BASE_URL}/api/v0/id", timeout=IPFS_TIMEOUT)
        resp.raise_for_status()
        identity = resp.json()
        return identity.get("ID")
    except Exception as exc:
        logger.error(f"Failed to get node ID: {exc}")
        return None
def add_bytes(data: bytes, pin: bool = True) -> str:
    """
    Upload raw bytes to IPFS, optionally pinning them.

    Sends a multipart POST to /api/v0/add and reads the "Hash" field of
    the JSON reply.

    Args:
        data: Bytes to add
        pin: Whether the node should pin the added data (default: True)

    Returns:
        CID of the added content

    Raises:
        IPFSError: If the request or the parsing of its reply fails
    """
    try:
        resp = requests.post(
            f"{IPFS_BASE_URL}/api/v0/add",
            # The flag is sent as the lowercase text "true"/"false".
            params={"pin": str(pin).lower()},
            files={"file": ("data", data)},
            timeout=IPFS_TIMEOUT,
        )
        resp.raise_for_status()
        cid = resp.json()["Hash"]
        logger.info(f"Added to IPFS: {len(data)} bytes -> {cid}")
        return cid
    except Exception as exc:
        logger.error(f"Failed to add bytes to IPFS: {exc}")
        raise IPFSError(f"Failed to add bytes to IPFS: {exc}") from exc
def add_json(data: dict) -> str:
    """
    Store a dictionary in IPFS as pinned JSON.

    The dict is rendered with sorted keys and two-space indentation,
    encoded as UTF-8 and handed to add_bytes with pinning enabled.

    Args:
        data: Dictionary to serialize and store

    Returns:
        CID of the added content

    Raises:
        IPFSError: If adding fails
    """
    serialized = json.dumps(data, sort_keys=True, indent=2)
    return add_bytes(serialized.encode("utf-8"), pin=True)
def pin_or_raise(cid: str) -> None:
    """
    Pin a CID on the IPFS node, raising instead of returning a status.

    Sends a POST to /api/v0/pin/add. Any failure is logged and re-raised
    as IPFSError with the original exception chained.

    Args:
        cid: IPFS CID to pin

    Raises:
        IPFSError: If pinning fails
    """
    try:
        resp = requests.post(
            f"{IPFS_BASE_URL}/api/v0/pin/add",
            params={"arg": cid},
            timeout=IPFS_TIMEOUT,
        )
        resp.raise_for_status()
        logger.info(f"Pinned on IPFS: {cid}")
    except Exception as exc:
        logger.error(f"Failed to pin on IPFS: {exc}")
        raise IPFSError(f"Failed to pin {cid}: {exc}") from exc