120 lines
3.4 KiB
Python
120 lines
3.4 KiB
Python
"""
|
|
Key management for ActivityPub signing.
|
|
|
|
Keys are stored in DATA_DIR/keys/:
|
|
- {username}.pem - Private key (chmod 600)
|
|
- {username}.pub - Public key
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
|
|
|
|
|
def get_keys_dir(data_dir: Path) -> Path:
    """Return the ``keys`` subdirectory of *data_dir*, creating it on demand.

    Missing parent directories are created as well; an already existing
    directory is left untouched.
    """
    target = data_dir.joinpath("keys")
    target.mkdir(parents=True, exist_ok=True)
    return target
|
|
|
|
|
|
def generate_keypair(data_dir: Path, username: str) -> tuple[str, str]:
    """Generate a 2048-bit RSA keypair for a user and persist it to disk.

    The private key is written unencrypted (PKCS8, PEM) to
    ``{username}.pem`` and the public key (SubjectPublicKeyInfo, PEM) to
    ``{username}.pub`` inside the keys directory. Existing files with
    those names are overwritten.

    Returns (private_pem, public_pem)
    """
    keys_dir = get_keys_dir(data_dir)
    private_path = keys_dir / f"{username}.pem"
    public_path = keys_dir / f"{username}.pub"

    # Generate key
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )

    # Serialize private key
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()

    # Serialize public key
    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode()

    # Save keys. Lock down the private key file BEFORE any key material is
    # written: writing first and chmod-ing afterwards leaves a window in
    # which the unencrypted key is readable under the default umask.
    # touch() creates the file with mode 0600 if it is missing; chmod()
    # tightens a pre-existing file that may have wider permissions.
    private_path.touch(mode=0o600)
    private_path.chmod(0o600)
    private_path.write_text(private_pem)
    public_path.write_text(public_pem)

    return private_pem, public_pem
|
|
|
|
|
|
def load_private_key(data_dir: Path, username: str):
    """Read a user's PEM private key from the keys directory and deserialize it.

    Raises FileNotFoundError when ``{username}.pem`` does not exist.
    """
    pem_file = get_keys_dir(data_dir) / f"{username}.pem"

    if pem_file.exists():
        pem_bytes = pem_file.read_text().encode()
        # No password is supplied when loading the key.
        return serialization.load_pem_private_key(pem_bytes, password=None)

    raise FileNotFoundError(f"Private key not found: {pem_file}")
|
|
|
|
|
|
def load_public_key_pem(data_dir: Path, username: str) -> str:
    """Return the text of a user's public key PEM file.

    Raises FileNotFoundError when ``{username}.pub`` does not exist in
    the keys directory.
    """
    pub_file = get_keys_dir(data_dir) / f"{username}.pub"

    if pub_file.exists():
        return pub_file.read_text()

    raise FileNotFoundError(f"Public key not found: {pub_file}")
|
|
|
|
|
|
def has_keys(data_dir: Path, username: str) -> bool:
    """Report whether a private key file exists for *username*.

    Only ``{username}.pem`` is checked; the public key file is not
    consulted.
    """
    private_key_file = get_keys_dir(data_dir).joinpath(f"{username}.pem")
    return private_key_file.exists()
|
|
|
|
|
|
def sign_data(private_key, data: str) -> str:
    """Sign *data* with *private_key* and return the signature as base64 text.

    The string is converted to bytes with ``str.encode()`` and signed
    using PKCS#1 v1.5 padding with SHA-256.
    """
    raw_signature = private_key.sign(data.encode(), padding.PKCS1v15(), hashes.SHA256())
    encoded = base64.b64encode(raw_signature)
    return encoded.decode()
|
|
|
|
|
|
def create_signature(data_dir: Path, username: str, domain: str, activity: dict) -> dict:
    """Build an ``RsaSignature2017`` signature dict for *activity*.

    The activity is serialized to compact JSON with sorted keys, signed
    with the user's private key, and the base64 signature is returned
    together with the creator key URL and a UTC creation timestamp.
    """
    signing_key = load_private_key(data_dir, username)

    # Sorted keys and no extra whitespace give a stable byte
    # representation of the activity to sign.
    payload = json.dumps(activity, sort_keys=True, separators=(',', ':'))
    encoded_signature = sign_data(signing_key, payload)

    signature = {"type": "RsaSignature2017"}
    signature["creator"] = f"https://{domain}/users/{username}#main-key"
    signature["created"] = datetime.now(timezone.utc).isoformat()
    signature["signatureValue"] = encoded_signature
    return signature
|