Files
common/artdag_common/models/responses.py
giles fd97812e3d Initial artdag-common shared library
Shared components for L1 and L2 servers:
- Jinja2 template system with base template and components
- Middleware for auth and content negotiation
- Pydantic models for requests/responses
- Utility functions for pagination, media, formatting
- Constants for Tailwind/HTMX/Cytoscape CDNs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 07:07:59 +00:00

97 lines
2.7 KiB
Python

"""
Response models shared across L1 and L2 servers.
"""
from typing import Optional, List, Dict, Any, Generic, TypeVar
from pydantic import BaseModel, Field
T = TypeVar("T")
class PaginatedResponse(BaseModel, Generic[T]):
"""Generic paginated response."""
data: List[Any] = Field(default_factory=list)
pagination: Dict[str, Any] = Field(default_factory=dict)
@classmethod
def create(
cls,
items: List[Any],
page: int,
limit: int,
total: int,
) -> "PaginatedResponse":
"""Create a paginated response."""
return cls(
data=items,
pagination={
"page": page,
"limit": limit,
"total": total,
"has_more": page * limit < total,
"total_pages": (total + limit - 1) // limit,
}
)
class ErrorResponse(BaseModel):
"""Standard error response."""
error: str = Field(..., description="Error message")
detail: Optional[str] = Field(default=None, description="Detailed error info")
code: Optional[str] = Field(default=None, description="Error code")
class SuccessResponse(BaseModel):
"""Standard success response."""
success: bool = Field(default=True)
message: Optional[str] = Field(default=None)
data: Optional[Dict[str, Any]] = Field(default=None)
class RunStatus(BaseModel):
"""Run execution status."""
run_id: str
status: str = Field(..., description="pending, running, completed, failed")
recipe: Optional[str] = None
plan_id: Optional[str] = None
output_hash: Optional[str] = None
output_ipfs_cid: Optional[str] = None
total_steps: int = 0
cached_steps: int = 0
completed_steps: int = 0
error: Optional[str] = None
class CacheItemResponse(BaseModel):
"""Cached content item response."""
content_hash: str
media_type: Optional[str] = None
size: Optional[int] = None
name: Optional[str] = None
description: Optional[str] = None
tags: List[str] = Field(default_factory=list)
ipfs_cid: Optional[str] = None
created_at: Optional[str] = None
class RecipeResponse(BaseModel):
"""Recipe response."""
recipe_id: str
name: str
description: Optional[str] = None
inputs: List[Dict[str, Any]] = Field(default_factory=list)
outputs: List[str] = Field(default_factory=list)
node_count: int = 0
created_at: Optional[str] = None
class StorageProviderResponse(BaseModel):
"""Storage provider configuration response."""
storage_id: str
provider_type: str
name: str
is_default: bool = False
is_connected: bool = False
usage_bytes: Optional[int] = None
pin_count: int = 0