""" Naming service for friendly names. Handles: - Name normalization (My Cool Effect -> my-cool-effect) - Version ID generation (server-signed timestamps) - Friendly name assignment and resolution """ import hmac import os import re import time from typing import Optional, Tuple import database # Base32 Crockford alphabet (excludes I, L, O, U to avoid confusion) CROCKFORD_ALPHABET = "0123456789abcdefghjkmnpqrstvwxyz" def _get_server_secret() -> bytes: """Get server secret for signing version IDs.""" secret = os.environ.get("SERVER_SECRET", "") if not secret: # Fall back to a derived secret from other env vars # In production, SERVER_SECRET should be set explicitly secret = os.environ.get("SECRET_KEY", "default-dev-secret") return secret.encode("utf-8") def _base32_crockford_encode(data: bytes) -> str: """Encode bytes as base32-crockford (lowercase).""" # Convert bytes to integer num = int.from_bytes(data, "big") if num == 0: return CROCKFORD_ALPHABET[0] result = [] while num > 0: result.append(CROCKFORD_ALPHABET[num % 32]) num //= 32 return "".join(reversed(result)) def generate_version_id() -> str: """ Generate a version ID that is: - Always increasing (timestamp-based prefix) - Verifiable as originating from this server (HMAC suffix) - Short and URL-safe (13 chars) Format: 6 bytes timestamp (ms) + 2 bytes HMAC = 8 bytes = 13 base32 chars """ timestamp_ms = int(time.time() * 1000) timestamp_bytes = timestamp_ms.to_bytes(6, "big") # HMAC the timestamp with server secret secret = _get_server_secret() sig = hmac.new(secret, timestamp_bytes, "sha256").digest() # Combine: 6 bytes timestamp + 2 bytes HMAC signature combined = timestamp_bytes + sig[:2] # Encode as base32-crockford return _base32_crockford_encode(combined) def normalize_name(name: str) -> str: """ Normalize a display name to a base name. - Lowercase - Replace spaces and underscores with dashes - Remove special characters (keep alphanumeric and dashes) - Collapse multiple dashes - Strip leading/trailing dashes Examples: "My Cool Effect" -> "my-cool-effect" "Brightness_V2" -> "brightness-v2" "Test!!!Effect" -> "test-effect" """ # Lowercase name = name.lower() # Replace spaces and underscores with dashes name = re.sub(r"[\s_]+", "-", name) # Remove anything that's not alphanumeric or dash name = re.sub(r"[^a-z0-9-]", "", name) # Collapse multiple dashes name = re.sub(r"-+", "-", name) # Strip leading/trailing dashes name = name.strip("-") return name or "unnamed" def parse_friendly_name(friendly_name: str) -> Tuple[str, Optional[str]]: """ Parse a friendly name into base name and optional version. Args: friendly_name: Name like "my-effect" or "my-effect 01hw3x9k" Returns: Tuple of (base_name, version_id or None) """ parts = friendly_name.strip().split(" ", 1) base_name = parts[0] version_id = parts[1] if len(parts) > 1 else None return base_name, version_id def format_friendly_name(base_name: str, version_id: str) -> str: """Format a base name and version into a full friendly name.""" return f"{base_name} {version_id}" def format_l2_name(actor_id: str, base_name: str, version_id: str) -> str: """ Format a friendly name for L2 sharing. Format: @user@domain base-name version-id """ return f"{actor_id} {base_name} {version_id}" class NamingService: """Service for managing friendly names.""" async def assign_name( self, cid: str, actor_id: str, item_type: str, display_name: Optional[str] = None, filename: Optional[str] = None, ) -> dict: """ Assign a friendly name to content. Args: cid: Content ID actor_id: User ID item_type: Type (recipe, effect, media) display_name: Human-readable name (optional) filename: Original filename (used as fallback for media) Returns: Friendly name entry dict """ # Determine display name if not display_name: if filename: # Use filename without extension display_name = os.path.splitext(filename)[0] else: display_name = f"unnamed-{item_type}" # Normalize to base name base_name = normalize_name(display_name) # Generate version ID version_id = generate_version_id() # Create database entry entry = await database.create_friendly_name( actor_id=actor_id, base_name=base_name, version_id=version_id, cid=cid, item_type=item_type, display_name=display_name, ) return entry async def get_by_cid(self, actor_id: str, cid: str) -> Optional[dict]: """Get friendly name entry by CID.""" return await database.get_friendly_name_by_cid(actor_id, cid) async def resolve( self, actor_id: str, name: str, item_type: Optional[str] = None, ) -> Optional[str]: """ Resolve a friendly name to a CID. Args: actor_id: User ID name: Friendly name ("base-name" or "base-name version") item_type: Optional type filter Returns: CID or None if not found """ return await database.resolve_friendly_name(actor_id, name, item_type) async def list_names( self, actor_id: str, item_type: Optional[str] = None, latest_only: bool = False, ) -> list: """List friendly names for a user.""" return await database.list_friendly_names( actor_id=actor_id, item_type=item_type, latest_only=latest_only, ) async def delete(self, actor_id: str, cid: str) -> bool: """Delete a friendly name entry.""" return await database.delete_friendly_name(actor_id, cid) # Module-level instance _naming_service: Optional[NamingService] = None def get_naming_service() -> NamingService: """Get the naming service singleton.""" global _naming_service if _naming_service is None: _naming_service = NamingService() return _naming_service