""" Key management for ActivityPub signing. Keys are stored in DATA_DIR/keys/: - {username}.pem - Private key (chmod 600) - {username}.pub - Public key """ import base64 import hashlib import json from datetime import datetime, timezone from pathlib import Path from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa, padding def get_keys_dir(data_dir: Path) -> Path: """Get keys directory, create if needed.""" keys_dir = data_dir / "keys" keys_dir.mkdir(parents=True, exist_ok=True) return keys_dir def generate_keypair(data_dir: Path, username: str) -> tuple[str, str]: """Generate RSA keypair for a user. Returns (private_pem, public_pem) """ keys_dir = get_keys_dir(data_dir) private_path = keys_dir / f"{username}.pem" public_path = keys_dir / f"{username}.pub" # Generate key private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, ) # Serialize private key private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ).decode() # Serialize public key public_pem = private_key.public_key().public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ).decode() # Save keys private_path.write_text(private_pem) private_path.chmod(0o600) public_path.write_text(public_pem) return private_pem, public_pem def load_private_key(data_dir: Path, username: str): """Load private key for signing.""" keys_dir = get_keys_dir(data_dir) private_path = keys_dir / f"{username}.pem" if not private_path.exists(): raise FileNotFoundError(f"Private key not found: {private_path}") private_pem = private_path.read_text() return serialization.load_pem_private_key( private_pem.encode(), password=None ) def load_public_key_pem(data_dir: Path, username: str) -> str: """Load public key PEM for actor profile.""" keys_dir = get_keys_dir(data_dir) public_path = keys_dir / f"{username}.pub" if not public_path.exists(): raise FileNotFoundError(f"Public key not found: {public_path}") return public_path.read_text() def has_keys(data_dir: Path, username: str) -> bool: """Check if keys exist for user.""" keys_dir = get_keys_dir(data_dir) return (keys_dir / f"{username}.pem").exists() def sign_data(private_key, data: str) -> str: """Sign data with private key, return base64 signature.""" signature = private_key.sign( data.encode(), padding.PKCS1v15(), hashes.SHA256() ) return base64.b64encode(signature).decode() def create_signature(data_dir: Path, username: str, domain: str, activity: dict) -> dict: """Create RsaSignature2017 for an activity.""" private_key = load_private_key(data_dir, username) # Create canonical JSON for signing canonical = json.dumps(activity, sort_keys=True, separators=(',', ':')) # Sign signature_value = sign_data(private_key, canonical) return { "type": "RsaSignature2017", "creator": f"https://{domain}/users/{username}#main-key", "created": datetime.now(timezone.utc).isoformat(), "signatureValue": signature_value }