"""RSA key generation and HTTP Signature signing/verification. Keys are stored in DB (ActorProfile), not the filesystem. Ported from ~/art-dag/activity-pub/keys.py. """ from __future__ import annotations import base64 import hashlib import json from datetime import datetime, timezone from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa, padding def generate_rsa_keypair() -> tuple[str, str]: """Generate an RSA-2048 keypair. Returns: (private_pem, public_pem) as UTF-8 strings. """ private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, ) private_pem = private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ).decode() public_pem = private_key.public_key().public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo, ).decode() return private_pem, public_pem def sign_request( private_key_pem: str, key_id: str, method: str, path: str, host: str, body: bytes | None = None, date: str | None = None, ) -> dict[str, str]: """Build HTTP Signature headers for an outgoing request. Returns a dict of headers to merge into the request: ``{"Signature": ..., "Date": ..., "Digest": ..., "Host": ...}`` """ if date is None: date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT") headers_to_sign = [ f"(request-target): {method.lower()} {path}", f"host: {host}", f"date: {date}", ] out_headers: dict[str, str] = { "Host": host, "Date": date, } if body is not None: digest = base64.b64encode(hashlib.sha256(body).digest()).decode() digest_header = f"SHA-256={digest}" headers_to_sign.append(f"digest: {digest_header}") out_headers["Digest"] = digest_header signed_string = "\n".join(headers_to_sign) header_names = " ".join( h.split(":")[0] for h in headers_to_sign ) private_key = serialization.load_pem_private_key( private_key_pem.encode(), password=None, ) signature_bytes = private_key.sign( signed_string.encode(), padding.PKCS1v15(), hashes.SHA256(), ) signature_b64 = base64.b64encode(signature_bytes).decode() out_headers["Signature"] = ( f'keyId="{key_id}",' f'headers="{header_names}",' f'signature="{signature_b64}",' f'algorithm="rsa-sha256"' ) return out_headers def verify_request_signature( public_key_pem: str, signature_header: str, method: str, path: str, headers: dict[str, str], ) -> bool: """Verify an incoming HTTP Signature. Args: public_key_pem: PEM-encoded public key of the sender. signature_header: Value of the ``Signature`` header. method: HTTP method (GET, POST, etc.). path: Request path (e.g. ``/users/alice/inbox``). headers: All request headers (case-insensitive keys). Returns: True if the signature is valid. """ # Parse Signature header parts: dict[str, str] = {} for part in signature_header.split(","): part = part.strip() eq = part.index("=") key = part[:eq] val = part[eq + 1:].strip('"') parts[key] = val signed_headers = parts.get("headers", "date").split() signature_b64 = parts.get("signature", "") # Reconstruct the signed string lines: list[str] = [] # Normalize header lookup to lowercase lc_headers = {k.lower(): v for k, v in headers.items()} for h in signed_headers: if h == "(request-target)": lines.append(f"(request-target): {method.lower()} {path}") else: val = lc_headers.get(h, "") lines.append(f"{h}: {val}") signed_string = "\n".join(lines) public_key = serialization.load_pem_public_key(public_key_pem.encode()) try: public_key.verify( base64.b64decode(signature_b64), signed_string.encode(), padding.PKCS1v15(), hashes.SHA256(), ) return True except Exception: return False def create_ld_signature( private_key_pem: str, key_id: str, activity: dict, ) -> dict: """Create an RsaSignature2017 Linked Data signature for an activity.""" canonical = json.dumps(activity, sort_keys=True, separators=(",", ":")) private_key = serialization.load_pem_private_key( private_key_pem.encode(), password=None, ) signature_bytes = private_key.sign( canonical.encode(), padding.PKCS1v15(), hashes.SHA256(), ) signature_b64 = base64.b64encode(signature_bytes).decode() return { "type": "RsaSignature2017", "creator": key_id, "created": datetime.now(timezone.utc).isoformat(), "signatureValue": signature_b64, }