235 lines
6.4 KiB
Python
235 lines
6.4 KiB
Python
"""
|
|
Naming service for friendly names.
|
|
|
|
Handles:
|
|
- Name normalization (My Cool Effect -> my-cool-effect)
|
|
- Version ID generation (server-signed timestamps)
|
|
- Friendly name assignment and resolution
|
|
"""
|
|
|
|
import hmac
|
|
import os
|
|
import re
|
|
import time
|
|
from typing import Optional, Tuple
|
|
|
|
import database
|
|
|
|
|
|
# Lowercase Crockford base32 digit set: the 32 symbols are 0-9 followed by
# a-z with i, l, o and u left out, so look-alike characters cannot be confused.
CROCKFORD_ALPHABET: str = "0123456789abcdefghjkmnpqrstvwxyz"
|
|
|
|
|
|
def _get_server_secret() -> bytes:
    """
    Return the key used to sign version IDs, UTF-8 encoded.

    Lookup order:
    1. ``SERVER_SECRET`` environment variable
    2. ``SECRET_KEY`` environment variable
    3. The hard-coded development default

    A variable that is unset *or empty* is skipped, so the returned key is
    never zero-length.

    NOTE: the development default is a fixed string in this source file.
    In production, SERVER_SECRET should be set explicitly.

    Returns:
        The secret as bytes
    """
    for env_var in ("SERVER_SECRET", "SECRET_KEY"):
        value = os.environ.get(env_var)
        # Treat "" the same as "not set" for both variables.
        if value:
            return value.encode("utf-8")

    return "default-dev-secret".encode("utf-8")
|
|
|
|
|
|
def _base32_crockford_encode(data: bytes) -> str:
    """
    Encode bytes as lowercase Crockford base32.

    The input is read as a single big-endian unsigned integer and written
    out in base 32, most significant digit first. Leading zero bytes do not
    produce digits; empty or all-zero input encodes as a single zero digit.

    Args:
        data: Raw bytes to encode

    Returns:
        Base32 string built from CROCKFORD_ALPHABET
    """
    value = int.from_bytes(data, "big")

    # Peel off base-32 digits, least significant first. Running the body
    # once before checking for zero makes a zero value yield one digit.
    digits = []
    while True:
        value, remainder = divmod(value, 32)
        digits.append(CROCKFORD_ALPHABET[remainder])
        if value == 0:
            break

    return "".join(digits[::-1])
|
|
|
|
|
|
def generate_version_id() -> str:
    """
    Build a version ID from the current time plus a server signature.

    Layout before encoding (8 bytes, big-endian):
    - 6 bytes: current Unix time in milliseconds
    - 2 bytes: first two bytes of HMAC-SHA256(server secret, timestamp bytes)

    The 8 bytes are encoded as lowercase Crockford base32, which gives a
    short, URL-safe string of at most 13 characters (the encoder does not
    emit leading zero digits, so the result can be shorter).

    The ID depends only on the millisecond timestamp and the server secret,
    so two calls within the same millisecond return the same value.

    Returns:
        The encoded version ID
    """
    now_ms = int(time.time() * 1000)
    stamp = now_ms.to_bytes(6, "big")

    # Sign the timestamp so an ID can be checked against the server secret.
    digest = hmac.new(_get_server_secret(), stamp, "sha256").digest()

    # Keep only the first two signature bytes to keep the ID short.
    return _base32_crockford_encode(stamp + digest[:2])
|
|
|
|
|
|
def normalize_name(name: str) -> str:
    """
    Turn a display name into a lowercase, dash-separated base name.

    Steps, applied in order:
    - Lowercase the input
    - Turn each run of whitespace/underscores into a single dash
    - Drop every character that is not a-z, 0-9 or a dash
    - Collapse runs of dashes into one
    - Strip leading/trailing dashes

    If nothing is left after these steps, "unnamed" is returned.

    Examples:
        "My Cool Effect" -> "my-cool-effect"
        "Brightness_V2" -> "brightness-v2"
        "Test!!!Effect" -> "testeffect"
    """
    # (pattern, replacement) pairs; the order matters.
    substitutions = (
        (r"[\s_]+", "-"),
        (r"[^a-z0-9-]", ""),
        (r"-+", "-"),
    )

    slug = name.lower()
    for pattern, replacement in substitutions:
        slug = re.sub(pattern, replacement, slug)
    slug = slug.strip("-")

    if not slug:
        return "unnamed"
    return slug
|
|
|
|
|
|
def parse_friendly_name(friendly_name: str) -> Tuple[str, Optional[str]]:
    """
    Parse a friendly name into base name and optional version.

    The name is split at the first run of whitespace, so extra spaces (or a
    tab) between the two parts never end up inside the version ID.
    Surrounding whitespace is ignored.

    Args:
        friendly_name: Name like "my-effect" or "my-effect 01hw3x9k"

    Returns:
        Tuple of (base_name, version_id or None). Empty or whitespace-only
        input gives ("", None).
    """
    # sep=None splits on any whitespace run; strip first so trailing
    # whitespace is not kept in the remainder.
    parts = friendly_name.strip().split(None, 1)

    if not parts:
        return "", None
    if len(parts) == 1:
        return parts[0], None
    return parts[0], parts[1]
|
|
|
|
|
|
def format_friendly_name(base_name: str, version_id: str) -> str:
    """
    Join a base name and a version ID into a full friendly name.

    Format: base-name version-id (separated by a single space)
    """
    full_name = f"{base_name} {version_id}"
    return full_name
|
|
|
|
|
|
def format_l2_name(actor_id: str, base_name: str, version_id: str) -> str:
    """
    Build the friendly name used for L2 sharing.

    The three parts are joined with single spaces, actor first.

    Format: @user@domain base-name version-id
    """
    l2_name = f"{actor_id} {base_name} {version_id}"
    return l2_name
|
|
|
|
|
|
class NamingService:
    """
    Friendly-name operations backed by the ``database`` module.

    The class keeps no state of its own; every method forwards to the
    matching ``database`` coroutine.
    """

    async def assign_name(
        self,
        cid: str,
        actor_id: str,
        item_type: str,
        display_name: Optional[str] = None,
        filename: Optional[str] = None,
    ) -> dict:
        """
        Create a friendly-name entry for a piece of content.

        The display name is chosen in this order: the given display_name,
        the filename without its extension, then "unnamed-<item_type>".
        The chosen name is normalized into the base name and a freshly
        generated version ID is attached.

        Args:
            cid: Content ID
            actor_id: User ID
            item_type: Type (recipe, effect, media)
            display_name: Human-readable name (optional)
            filename: Original filename (fallback when no display name)

        Returns:
            Friendly name entry dict as returned by the database layer
        """
        # Pick the label to store: explicit name > filename stem > placeholder.
        if display_name:
            label = display_name
        elif filename:
            label, _extension = os.path.splitext(filename)
        else:
            label = f"unnamed-{item_type}"

        base_name = normalize_name(label)
        version_id = generate_version_id()

        return await database.create_friendly_name(
            actor_id=actor_id,
            base_name=base_name,
            version_id=version_id,
            cid=cid,
            item_type=item_type,
            display_name=label,
        )

    async def get_by_cid(self, actor_id: str, cid: str) -> Optional[dict]:
        """Look up the friendly name entry for a CID owned by actor_id."""
        entry = await database.get_friendly_name_by_cid(actor_id, cid)
        return entry

    async def resolve(
        self,
        actor_id: str,
        name: str,
        item_type: Optional[str] = None,
    ) -> Optional[str]:
        """
        Look up the CID behind a friendly name.

        The name is passed to the database layer unchanged.

        Args:
            actor_id: User ID
            name: Friendly name ("base-name" or "base-name version")
            item_type: Optional type filter

        Returns:
            CID or None if not found
        """
        cid = await database.resolve_friendly_name(actor_id, name, item_type)
        return cid

    async def list_names(
        self,
        actor_id: str,
        item_type: Optional[str] = None,
        latest_only: bool = False,
    ) -> list:
        """
        List friendly names for a user.

        Args:
            actor_id: User ID
            item_type: Optional type filter
            latest_only: Forwarded to the database query as-is

        Returns:
            List of entries as returned by the database layer
        """
        names = await database.list_friendly_names(
            actor_id=actor_id,
            item_type=item_type,
            latest_only=latest_only,
        )
        return names

    async def delete(self, actor_id: str, cid: str) -> bool:
        """Delete the friendly name entry for a CID owned by actor_id."""
        deleted = await database.delete_friendly_name(actor_id, cid)
        return deleted
|
|
|
|
|
|
# Shared NamingService instance; stays None until get_naming_service()
# is called for the first time.
_naming_service: Optional[NamingService] = None


def get_naming_service() -> NamingService:
    """
    Return the shared NamingService, creating it on first use.

    Later calls return the same instance.
    """
    global _naming_service

    if _naming_service is not None:
        return _naming_service

    _naming_service = NamingService()
    return _naming_service
|