Initial artdag-common shared library

Shared components for L1 and L2 servers:
- Jinja2 template system with base template and components
- Middleware for auth and content negotiation
- Pydantic models for requests/responses
- Utility functions for pagination, media, formatting
- Constants for Tailwind/HTMX/Cytoscape CDNs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-01-11 07:07:59 +00:00
commit fd97812e3d
21 changed files with 1905 additions and 0 deletions

View File

@@ -0,0 +1,19 @@
"""
Utility functions shared across Art-DAG servers.
"""
from .pagination import paginate, get_pagination_params
from .media import detect_media_type, get_media_extension, is_streamable
from .formatting import format_date, format_size, truncate_hash, format_duration
__all__ = [
"paginate",
"get_pagination_params",
"detect_media_type",
"get_media_extension",
"is_streamable",
"format_date",
"format_size",
"truncate_hash",
"format_duration",
]

View File

@@ -0,0 +1,165 @@
"""
Formatting utilities for display.
"""
from datetime import datetime
from typing import Optional, Union
def format_date(
value: Optional[Union[str, datetime]],
length: int = 10,
include_time: bool = False,
) -> str:
"""
Format a date/datetime for display.
Args:
value: Date string or datetime object
length: Length to truncate to (default 10 for YYYY-MM-DD)
include_time: Whether to include time portion
Returns:
Formatted date string
"""
if value is None:
return ""
if isinstance(value, str):
# Parse ISO format string
try:
if "T" in value:
dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
else:
return value[:length]
except ValueError:
return value[:length]
else:
dt = value
if include_time:
return dt.strftime("%Y-%m-%d %H:%M")
return dt.strftime("%Y-%m-%d")
def format_size(size_bytes: Optional[int]) -> str:
"""
Format file size in human-readable form.
Args:
size_bytes: Size in bytes
Returns:
Human-readable size string (e.g., "1.5 MB")
"""
if size_bytes is None:
return "Unknown"
if size_bytes < 0:
return "Unknown"
if size_bytes == 0:
return "0 B"
units = ["B", "KB", "MB", "GB", "TB"]
unit_index = 0
size = float(size_bytes)
while size >= 1024 and unit_index < len(units) - 1:
size /= 1024
unit_index += 1
if unit_index == 0:
return f"{int(size)} {units[unit_index]}"
return f"{size:.1f} {units[unit_index]}"
def truncate_hash(value: str, length: int = 16, suffix: str = "...") -> str:
"""
Truncate a hash or long string with ellipsis.
Args:
value: String to truncate
length: Maximum length before truncation
suffix: Suffix to add when truncated
Returns:
Truncated string
"""
if not value:
return ""
if len(value) <= length:
return value
return f"{value[:length]}{suffix}"
def format_duration(seconds: Optional[float]) -> str:
"""
Format duration in human-readable form.
Args:
seconds: Duration in seconds
Returns:
Human-readable duration string (e.g., "2m 30s")
"""
if seconds is None or seconds < 0:
return "Unknown"
if seconds < 1:
return f"{int(seconds * 1000)}ms"
if seconds < 60:
return f"{seconds:.1f}s"
minutes = int(seconds // 60)
remaining_seconds = int(seconds % 60)
if minutes < 60:
if remaining_seconds:
return f"{minutes}m {remaining_seconds}s"
return f"{minutes}m"
hours = minutes // 60
remaining_minutes = minutes % 60
if remaining_minutes:
return f"{hours}h {remaining_minutes}m"
return f"{hours}h"
def format_count(count: int) -> str:
"""
Format a count with abbreviation for large numbers.
Args:
count: Number to format
Returns:
Formatted string (e.g., "1.2K", "3.5M")
"""
if count < 1000:
return str(count)
if count < 1000000:
return f"{count / 1000:.1f}K"
if count < 1000000000:
return f"{count / 1000000:.1f}M"
return f"{count / 1000000000:.1f}B"
def format_percentage(value: float, decimals: int = 1) -> str:
"""
Format a percentage value.
Args:
value: Percentage value (0-100 or 0-1)
decimals: Number of decimal places
Returns:
Formatted percentage string
"""
# Assume 0-1 if less than 1
if value <= 1:
value *= 100
if decimals == 0:
return f"{int(value)}%"
return f"{value:.{decimals}f}%"

View File

@@ -0,0 +1,166 @@
"""
Media type detection and handling utilities.
"""
from pathlib import Path
from typing import Optional
import mimetypes
# Initialize mimetypes database
mimetypes.init()
# Media type categories
VIDEO_TYPES = {"video/mp4", "video/webm", "video/quicktime", "video/x-msvideo", "video/avi"}
IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp", "image/svg+xml"}
AUDIO_TYPES = {"audio/mpeg", "audio/wav", "audio/ogg", "audio/flac", "audio/aac", "audio/mp3"}
# File extension mappings
EXTENSION_TO_CATEGORY = {
# Video
".mp4": "video",
".webm": "video",
".mov": "video",
".avi": "video",
".mkv": "video",
# Image
".jpg": "image",
".jpeg": "image",
".png": "image",
".gif": "image",
".webp": "image",
".svg": "image",
# Audio
".mp3": "audio",
".wav": "audio",
".ogg": "audio",
".flac": "audio",
".aac": "audio",
".m4a": "audio",
}
def detect_media_type(path: Path) -> str:
"""
Detect the media category for a file.
Args:
path: Path to the file
Returns:
Category string: "video", "image", "audio", or "unknown"
"""
if not path:
return "unknown"
# Try extension first
ext = path.suffix.lower()
if ext in EXTENSION_TO_CATEGORY:
return EXTENSION_TO_CATEGORY[ext]
# Try mimetypes
mime_type, _ = mimetypes.guess_type(str(path))
if mime_type:
if mime_type in VIDEO_TYPES or mime_type.startswith("video/"):
return "video"
if mime_type in IMAGE_TYPES or mime_type.startswith("image/"):
return "image"
if mime_type in AUDIO_TYPES or mime_type.startswith("audio/"):
return "audio"
return "unknown"
def get_mime_type(path: Path) -> str:
"""
Get the MIME type for a file.
Args:
path: Path to the file
Returns:
MIME type string or "application/octet-stream"
"""
mime_type, _ = mimetypes.guess_type(str(path))
return mime_type or "application/octet-stream"
def get_media_extension(media_type: str) -> str:
"""
Get the typical file extension for a media type.
Args:
media_type: Media category or MIME type
Returns:
File extension with dot (e.g., ".mp4")
"""
if media_type == "video":
return ".mp4"
if media_type == "image":
return ".png"
if media_type == "audio":
return ".mp3"
# Try as MIME type
ext = mimetypes.guess_extension(media_type)
return ext or ""
def is_streamable(path: Path) -> bool:
"""
Check if a file type is streamable (video/audio).
Args:
path: Path to the file
Returns:
True if the file can be streamed
"""
media_type = detect_media_type(path)
return media_type in ("video", "audio")
def needs_conversion(path: Path, target_format: str = "mp4") -> bool:
"""
Check if a video file needs format conversion.
Args:
path: Path to the file
target_format: Target format (default mp4)
Returns:
True if conversion is needed
"""
media_type = detect_media_type(path)
if media_type != "video":
return False
ext = path.suffix.lower().lstrip(".")
return ext != target_format
def get_video_src(
content_hash: str,
original_path: Optional[Path] = None,
is_ios: bool = False,
) -> str:
"""
Get the appropriate video source URL.
For iOS devices, prefer MP4 format.
Args:
content_hash: Content hash for the video
original_path: Optional original file path
is_ios: Whether the client is iOS
Returns:
URL path for the video source
"""
if is_ios:
return f"/cache/{content_hash}/mp4"
if original_path and original_path.suffix.lower() in (".mp4", ".webm"):
return f"/cache/{content_hash}/raw"
return f"/cache/{content_hash}/mp4"

View File

@@ -0,0 +1,85 @@
"""
Pagination utilities.
"""
from typing import List, Any, Tuple, Optional
from fastapi import Request
from ..constants import DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE
def get_pagination_params(request: Request) -> Tuple[int, int]:
"""
Extract pagination parameters from request query string.
Args:
request: FastAPI request
Returns:
Tuple of (page, limit)
"""
try:
page = int(request.query_params.get("page", 1))
page = max(1, page)
except ValueError:
page = 1
try:
limit = int(request.query_params.get("limit", DEFAULT_PAGE_SIZE))
limit = max(1, min(limit, MAX_PAGE_SIZE))
except ValueError:
limit = DEFAULT_PAGE_SIZE
return page, limit
def paginate(
items: List[Any],
page: int = 1,
limit: int = DEFAULT_PAGE_SIZE,
) -> Tuple[List[Any], dict]:
"""
Paginate a list of items.
Args:
items: Full list of items
page: Page number (1-indexed)
limit: Items per page
Returns:
Tuple of (paginated items, pagination info dict)
"""
total = len(items)
start = (page - 1) * limit
end = start + limit
paginated = items[start:end]
return paginated, {
"page": page,
"limit": limit,
"total": total,
"has_more": end < total,
"total_pages": (total + limit - 1) // limit if total > 0 else 1,
}
def calculate_offset(page: int, limit: int) -> int:
"""Calculate database offset from page and limit."""
return (page - 1) * limit
def build_pagination_info(
page: int,
limit: int,
total: int,
) -> dict:
"""Build pagination info dictionary."""
return {
"page": page,
"limit": limit,
"total": total,
"has_more": page * limit < total,
"total_pages": (total + limit - 1) // limit if total > 0 else 1,
}