All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m5s
Combines shared, blog, market, cart, events, federation, and account into a single repository. Eliminates submodule sync, sibling model copying at build time, and per-app CI orchestration. Changes: - Remove per-app .git, .gitmodules, .gitea, submodule shared/ dirs - Remove stale sibling model copies from each app - Update all 6 Dockerfiles for monorepo build context (root = .) - Add build directives to docker-compose.yml - Add single .gitea/workflows/ci.yml with change detection - Add .dockerignore for monorepo build context - Create __init__.py for federation and account (cross-app imports)
142 lines
4.4 KiB
Python
142 lines
4.4 KiB
Python
"""Async IPFS client for content-addressed storage.
|
|
|
|
All content can be stored on IPFS — blog posts, products, activities, etc.
|
|
Ported from ~/art-dag/activity-pub/ipfs_client.py (converted to async httpx).
|
|
|
|
Config via environment:
|
|
IPFS_API — multiaddr or URL (default: /ip4/127.0.0.1/tcp/5001)
|
|
IPFS_TIMEOUT — request timeout in seconds (default: 60)
|
|
IPFS_GATEWAY_URL — public gateway for CID links (default: https://ipfs.io)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
|
|
import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IPFSError(Exception):
    """Error signalling that a request to the IPFS daemon did not succeed."""
|
|
|
|
|
|
# -- Config ------------------------------------------------------------------
|
|
|
|
# Daemon API endpoint; may be given as a multiaddr or as a plain HTTP(S) URL.
IPFS_API = os.environ.get("IPFS_API", "/ip4/127.0.0.1/tcp/5001")
# Timeout, in seconds, handed to every httpx client created by this module.
IPFS_TIMEOUT = int(os.environ.get("IPFS_TIMEOUT", "60"))
# Base URL of the public gateway used when building links to a CID.
IPFS_GATEWAY_URL = os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io")
|
|
|
|
|
|
def _multiaddr_to_url(multiaddr: str) -> str:
|
|
"""Convert IPFS multiaddr to HTTP URL."""
|
|
dns_match = re.match(r"/dns[46]?/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if dns_match:
|
|
return f"http://{dns_match.group(1)}:{dns_match.group(2)}"
|
|
|
|
ip4_match = re.match(r"/ip4/([^/]+)/tcp/(\d+)", multiaddr)
|
|
if ip4_match:
|
|
return f"http://{ip4_match.group(1)}:{ip4_match.group(2)}"
|
|
|
|
if multiaddr.startswith("http"):
|
|
return multiaddr
|
|
return "http://127.0.0.1:5001"
|
|
|
|
|
|
# HTTP base URL of the daemon API, resolved once at import time from IPFS_API.
IPFS_BASE_URL: str = _multiaddr_to_url(IPFS_API)
|
|
|
|
|
|
# -- Async client functions --------------------------------------------------
|
|
|
|
async def add_bytes(data: bytes, *, pin: bool = True) -> str:
    """Upload a raw byte payload to the IPFS node and return its CID.

    Args:
        data: Payload to store; sent as a multipart file part named "data".
        pin: Forwarded to the daemon as the ``pin`` query parameter
            ("true"/"false").

    Returns:
        The ``Hash`` field of the daemon's JSON response.

    Raises:
        IPFSError: On any failure (transport error, error HTTP status, or a
            response without ``Hash``); the original exception is chained.
    """
    endpoint = f"{IPFS_BASE_URL}/api/v0/add"
    try:
        async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as http:
            query = {"pin": str(pin).lower()}
            upload = {"file": ("data", data)}
            response = await http.post(endpoint, params=query, files=upload)
            response.raise_for_status()
            payload = response.json()
            content_id = payload["Hash"]
            logger.info("Added to IPFS: %d bytes -> %s", len(data), content_id)
            return content_id
    except Exception as exc:
        # Log, then surface every failure to callers as a single error type.
        logger.error("Failed to add bytes to IPFS: %s", exc)
        raise IPFSError(f"Failed to add bytes: {exc}") from exc
|
|
|
|
|
|
async def add_json(data: dict) -> str:
    """Store a dict on IPFS as pinned JSON and return its CID.

    The dict is serialised with sorted keys and a 2-space indent, then
    encoded as UTF-8, so equal dicts always produce the same bytes.
    Failures propagate from ``add_bytes``.
    """
    serialized = json.dumps(data, sort_keys=True, indent=2)
    payload = serialized.encode("utf-8")
    return await add_bytes(payload, pin=True)
|
|
|
|
|
|
async def get_bytes(cid: str) -> bytes | None:
    """Download the content stored under a CID.

    Args:
        cid: Content identifier, passed to the daemon's ``cat`` endpoint
            as ``arg``.

    Returns:
        The response body, or ``None`` if the request failed for any reason
        (the failure is logged, not raised).
    """
    endpoint = f"{IPFS_BASE_URL}/api/v0/cat"
    try:
        async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as http:
            response = await http.post(endpoint, params={"arg": cid})
            response.raise_for_status()
            body = response.content
            logger.info("Retrieved from IPFS: %s (%d bytes)", cid, len(body))
            return body
    except Exception as exc:
        # Best-effort read: report the problem and signal "missing" with None.
        logger.error("Failed to get from IPFS: %s", exc)
        return None
|
|
|
|
|
|
async def pin_cid(cid: str) -> bool:
    """Ask the daemon to pin a CID.

    Returns:
        ``True`` when the ``pin/add`` call succeeded, ``False`` on any
        failure (which is logged rather than raised).
    """
    endpoint = f"{IPFS_BASE_URL}/api/v0/pin/add"
    try:
        async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as http:
            response = await http.post(endpoint, params={"arg": cid})
            response.raise_for_status()
            logger.info("Pinned on IPFS: %s", cid)
            return True
    except Exception as exc:
        logger.error("Failed to pin on IPFS: %s", exc)
        return False
|
|
|
|
|
|
async def unpin_cid(cid: str) -> bool:
    """Ask the daemon to remove the pin for a CID.

    Returns:
        ``True`` when the ``pin/rm`` call succeeded, ``False`` on any
        failure (which is logged rather than raised).
    """
    endpoint = f"{IPFS_BASE_URL}/api/v0/pin/rm"
    try:
        async with httpx.AsyncClient(timeout=IPFS_TIMEOUT) as http:
            response = await http.post(endpoint, params={"arg": cid})
            response.raise_for_status()
            logger.info("Unpinned from IPFS: %s", cid)
            return True
    except Exception as exc:
        logger.error("Failed to unpin from IPFS: %s", exc)
        return False
|
|
|
|
|
|
async def is_available() -> bool:
    """Report whether the IPFS daemon answers its ``id`` endpoint.

    Uses a fixed 5-second timeout instead of ``IPFS_TIMEOUT`` so the probe
    returns quickly. Any exception counts as "not available".
    """
    try:
        async with httpx.AsyncClient(timeout=5) as http:
            probe = await http.post(f"{IPFS_BASE_URL}/api/v0/id")
    except Exception:
        return False
    return probe.status_code == 200
|
|
|
|
|
|
def gateway_url(cid: str) -> str:
    """Build the public gateway link for a CID.

    Args:
        cid: Content identifier to link to.

    Returns:
        ``<IPFS_GATEWAY_URL>/ipfs/<cid>``. Trailing slashes on the configured
        gateway are dropped first, so a setting such as
        ``https://gw.example/`` does not yield ``https://gw.example//ipfs/...``.
    """
    base = IPFS_GATEWAY_URL.rstrip("/")
    return f"{base}/ipfs/{cid}"
|