Phase 0+1 of ActivityPub integration: - 6 ORM models (ActorProfile, APActivity, APFollower, APInboxItem, APAnchor, IPFSPin) - FederationService protocol + SqlFederationService implementation + stub - 4 DTOs (ActorProfileDTO, APActivityDTO, APFollowerDTO, APAnchorDTO) - Registry slot for federation service - Alembic migration for federation tables - IPFS async client (httpx-based) - HTTP Signatures (RSA-2048 sign/verify) - login_url() now uses AUTH_APP env var for flexible auth routing Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
142 lines
4.4 KiB
Python
142 lines
4.4 KiB
Python
"""Async IPFS client for content-addressed storage.
|
|
|
|
All content can be stored on IPFS — blog posts, products, activities, etc.
|
|
Ported from ~/art-dag/activity-pub/ipfs_client.py (converted to async httpx).
|
|
|
|
Config via environment:
|
|
IPFS_API — multiaddr or URL (default: /ip4/127.0.0.1/tcp/5001)
|
|
IPFS_TIMEOUT — request timeout in seconds (default: 60)
|
|
IPFS_GATEWAY_URL — public gateway for CID links (default: https://ipfs.io)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
|
|
import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IPFSError(Exception):
|
|
"""Raised when an IPFS operation fails."""
|
|
|
|
|
|
# -- Config ------------------------------------------------------------------
|
|
|
|
IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001")
|
|
IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "60"))
|
|
IPFS_GATEWAY_URL = os.getenv("IPFS_GATEWAY_URL", "https://ipfs.io")
|
|
|
|
|
|
def _multiaddr_to_url(multiaddr: str) -> str:
|
|
"""Convert IPFS multiaddr to HTTP URL."""
|
|
dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if dns_match:
|
|
return f"http://{dns_match.group(1)}:{dns_match.group(2)}"
|
|
|
|
ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if ip4_match:
|
|
return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}"
|
|
|
|
if multiaddr.startswith("http"):
|
|
return multiaddr
|
|
return "http://127.0.0.1:5001"
|
|
|
|
|
|
IPFS_BASE_URL = _multiaddr_to_url(IPFS_API)
|
|
|
|
|
|
# -- Async client functions --------------------------------------------------
|
|
|
|
async def add_bytes(data: bytes, *, pin: bool = True) -> str:
|
|
"""Add raw bytes to IPFS.
|
|
|
|
Returns the CID.
|
|
"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client:
|
|
resp = await client.post(
|
|
f"{IPFS_BASE_URL}/api/v0/add",
|
|
params={"pin": str(pin).lower()},
|
|
files={"file": ("data", data)},
|
|
)
|
|
resp.raise_for_status()
|
|
cid = resp.json()["Hash"]
|
|
logger.info("Added to IPFS: %d bytes -> %s", len(data), cid)
|
|
return cid
|
|
except Exception as e:
|
|
logger.error("Failed to add bytes to IPFS: %s", e)
|
|
raise IPFSError(f"Failed to add bytes: {e}") from e
|
|
|
|
|
|
async def add_json(data: dict) -> str:
|
|
"""Serialize dict to sorted JSON and add to IPFS."""
|
|
json_bytes = json.dumps(data, indent=2, sort_keys=True).encode("utf-8")
|
|
return await add_bytes(json_bytes, pin=True)
|
|
|
|
|
|
async def get_bytes(cid: str) -> bytes | None:
|
|
"""Fetch content from IPFS by CID."""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client:
|
|
resp = await client.post(
|
|
f"{IPFS_BASE_URL}/api/v0/cat",
|
|
params={"arg": cid},
|
|
)
|
|
resp.raise_for_status()
|
|
logger.info("Retrieved from IPFS: %s (%d bytes)", cid, len(resp.content))
|
|
return resp.content
|
|
except Exception as e:
|
|
logger.error("Failed to get from IPFS: %s", e)
|
|
return None
|
|
|
|
|
|
async def pin_cid(cid: str) -> bool:
|
|
"""Pin a CID on this node."""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client:
|
|
resp = await client.post(
|
|
f"{IPFS_BASE_URL}/api/v0/pin/add",
|
|
params={"arg": cid},
|
|
)
|
|
resp.raise_for_status()
|
|
logger.info("Pinned on IPFS: %s", cid)
|
|
return True
|
|
except Exception as e:
|
|
logger.error("Failed to pin on IPFS: %s", e)
|
|
return False
|
|
|
|
|
|
async def unpin_cid(cid: str) -> bool:
|
|
"""Unpin a CID from this node."""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as client:
|
|
resp = await client.post(
|
|
f"{IPFS_BASE_URL}/api/v0/pin/rm",
|
|
params={"arg": cid},
|
|
)
|
|
resp.raise_for_status()
|
|
logger.info("Unpinned from IPFS: %s", cid)
|
|
return True
|
|
except Exception as e:
|
|
logger.error("Failed to unpin from IPFS: %s", e)
|
|
return False
|
|
|
|
|
|
async def is_available() -> bool:
|
|
"""Check if IPFS daemon is reachable."""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=5) as client:
|
|
resp = await client.post(f"{IPFS_BASE_URL}/api/v0/id")
|
|
return resp.status_code == 200
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def gateway_url(cid: str) -> str:
|
|
"""Return a public gateway URL for a CID."""
|
|
return f"{IPFS_GATEWAY_URL}/ipfs/{cid}"
|