diff --git a/db.py b/db.py index 64e9a73..886cbc7 100644 --- a/db.py +++ b/db.py @@ -52,6 +52,7 @@ CREATE TABLE IF NOT EXISTS users ( CREATE TABLE IF NOT EXISTS assets ( name VARCHAR(255) PRIMARY KEY, content_hash VARCHAR(128) NOT NULL, + ipfs_cid VARCHAR(128), asset_type VARCHAR(50) NOT NULL, tags JSONB DEFAULT '[]'::jsonb, metadata JSONB DEFAULT '{}'::jsonb, @@ -248,12 +249,13 @@ async def create_asset(asset: dict) -> dict: """Create a new asset.""" async with get_connection() as conn: row = await conn.fetchrow( - """INSERT INTO assets (name, content_hash, asset_type, tags, metadata, + """INSERT INTO assets (name, content_hash, ipfs_cid, asset_type, tags, metadata, url, provenance, description, origin, owner, created_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING *""", asset["name"], asset["content_hash"], + asset.get("ipfs_cid"), asset["asset_type"], json.dumps(asset.get("tags", [])), json.dumps(asset.get("metadata", {})), diff --git a/docker-compose.yml b/docker-compose.yml index f9863ed..3af252c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,6 +17,17 @@ services: timeout: 5s retries: 5 + ipfs: + image: ipfs/kubo:latest + volumes: + - ipfs_data:/data/ipfs + networks: + - internal + deploy: + replicas: 1 + restart_policy: + condition: on-failure + l2-server: image: git.rose-ash.com/art-dag/l2-server:latest env_file: @@ -24,6 +35,7 @@ services: environment: - ARTDAG_DATA=/data/l2 - DATABASE_URL=postgresql://artdag:${POSTGRES_PASSWORD:-artdag}@postgres:5432/artdag + - IPFS_API=/dns/ipfs/tcp/5001 # ARTDAG_DOMAIN, ARTDAG_USER, JWT_SECRET from .env file volumes: - l2_data:/data/l2 # Still needed for RSA keys @@ -32,6 +44,7 @@ services: - externalnet depends_on: - postgres + - ipfs deploy: replicas: 1 restart_policy: @@ -40,6 +53,7 @@ services: volumes: l2_data: postgres_data: + ipfs_data: networks: internal: diff --git a/ipfs_client.py b/ipfs_client.py new file mode 100644 index 0000000..fd46890 --- /dev/null +++ b/ipfs_client.py @@ -0,0 +1,117 @@ +# art-activity-pub/ipfs_client.py +""" +IPFS client for Art DAG L2 server. + +Provides functions to fetch and pin content from IPFS. +L2 uses IPFS to retrieve content from the federated network. +""" + +import logging +import os +from typing import Optional + +import ipfshttpclient + +logger = logging.getLogger(__name__) + +# IPFS API multiaddr - default to local, docker uses /dns/ipfs/tcp/5001 +IPFS_API = os.getenv("IPFS_API", "/ip4/127.0.0.1/tcp/5001") + +# Connection timeout in seconds +IPFS_TIMEOUT = int(os.getenv("IPFS_TIMEOUT", "60")) + + +def get_client(): + """Get an IPFS client connection.""" + return ipfshttpclient.connect(IPFS_API, timeout=IPFS_TIMEOUT) + + +def get_bytes(cid: str) -> Optional[bytes]: + """ + Retrieve content from IPFS by CID. + + Args: + cid: IPFS CID to retrieve + + Returns: + Content as bytes or None on failure + """ + try: + with get_client() as client: + data = client.cat(cid) + logger.info(f"Retrieved from IPFS: {cid} ({len(data)} bytes)") + return data + except Exception as e: + logger.error(f"Failed to get from IPFS: {e}") + return None + + +def pin(cid: str) -> bool: + """ + Pin a CID on this node. + + Args: + cid: IPFS CID to pin + + Returns: + True on success, False on failure + """ + try: + with get_client() as client: + client.pin.add(cid) + logger.info(f"Pinned on IPFS: {cid}") + return True + except Exception as e: + logger.error(f"Failed to pin on IPFS: {e}") + return False + + +def unpin(cid: str) -> bool: + """ + Unpin a CID from this node. + + Args: + cid: IPFS CID to unpin + + Returns: + True on success, False on failure + """ + try: + with get_client() as client: + client.pin.rm(cid) + logger.info(f"Unpinned from IPFS: {cid}") + return True + except Exception as e: + logger.error(f"Failed to unpin from IPFS: {e}") + return False + + +def is_available() -> bool: + """ + Check if IPFS daemon is available. + + Returns: + True if IPFS is available, False otherwise + """ + try: + with get_client() as client: + client.id() + return True + except Exception: + return False + + +def get_node_id() -> Optional[str]: + """ + Get this IPFS node's peer ID. + + Returns: + Peer ID string or None on failure + """ + try: + with get_client() as client: + info = client.id() + return info.get("ID") + except Exception as e: + logger.error(f"Failed to get node ID: {e}") + return None diff --git a/requirements.txt b/requirements.txt index faa8463..16e3a47 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ python-jose[cryptography]>=3.3.0 markdown>=3.5.0 python-multipart>=0.0.6 asyncpg>=0.29.0 +ipfshttpclient>=0.7.0 diff --git a/server.py b/server.py index 778a893..b31fd47 100644 --- a/server.py +++ b/server.py @@ -86,6 +86,7 @@ class Asset(BaseModel): """An owned asset.""" name: str content_hash: str + ipfs_cid: Optional[str] = None # IPFS content identifier asset_type: str # image, video, effect, recipe, infrastructure tags: list[str] = [] metadata: dict = {} @@ -108,6 +109,7 @@ class RegisterRequest(BaseModel): """Request to register an asset.""" name: str content_hash: str + ipfs_cid: Optional[str] = None # IPFS content identifier asset_type: str tags: list[str] = [] metadata: dict = {} @@ -125,6 +127,7 @@ class RecordRunRequest(BaseModel): class PublishCacheRequest(BaseModel): """Request to publish a cache item from L1.""" content_hash: str + ipfs_cid: Optional[str] = None # IPFS content identifier asset_name: str asset_type: str = "image" origin: dict # {type: "self"|"external", url?: str, note?: str} @@ -1613,11 +1616,21 @@ async def _register_asset_impl(req: RegisterRequest, owner: str): if await db.asset_exists(req.name): raise HTTPException(400, f"Asset already exists: {req.name}") + # Pin content on IPFS if CID provided + if req.ipfs_cid: + try: + import ipfs_client + if ipfs_client.is_available(): + ipfs_client.pin(req.ipfs_cid) + except Exception as e: + logger.warning(f"Failed to pin IPFS content {req.ipfs_cid}: {e}") + # Create asset now = datetime.now(timezone.utc).isoformat() asset = { "name": req.name, "content_hash": req.content_hash, + "ipfs_cid": req.ipfs_cid, "asset_type": req.asset_type, "tags": req.tags, "metadata": req.metadata, @@ -1735,11 +1748,21 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi if await db.asset_exists(req.asset_name): raise HTTPException(400, f"Asset name already exists: {req.asset_name}") + # Pin content on IPFS if CID provided + if req.ipfs_cid: + try: + import ipfs_client + if ipfs_client.is_available(): + ipfs_client.pin(req.ipfs_cid) + except Exception as e: + logger.warning(f"Failed to pin IPFS content {req.ipfs_cid}: {e}") + # Create asset now = datetime.now(timezone.utc).isoformat() asset = { "name": req.asset_name, "content_hash": req.content_hash, + "ipfs_cid": req.ipfs_cid, "asset_type": req.asset_type, "tags": req.tags, "description": req.description,