Files
rose-ash/shared/utils/http_signatures.py
giles f42042ccb7
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m5s
Monorepo: consolidate 7 repos into one
Combines shared, blog, market, cart, events, federation, and account
into a single repository. Eliminates submodule sync, sibling model
copying at build time, and per-app CI orchestration.

Changes:
- Remove per-app .git, .gitmodules, .gitea, submodule shared/ dirs
- Remove stale sibling model copies from each app
- Update all 6 Dockerfiles for monorepo build context (root = .)
- Add build directives to docker-compose.yml
- Add single .gitea/workflows/ci.yml with change detection
- Add .dockerignore for monorepo build context
- Create __init__.py for federation and account (cross-app imports)
2026-02-24 19:44:17 +00:00

182 lines
5.0 KiB
Python

"""RSA key generation and HTTP Signature signing/verification.
Keys are stored in DB (ActorProfile), not the filesystem.
Ported from ~/art-dag/activity-pub/keys.py.
"""
from __future__ import annotations
import base64
import hashlib
import json
from datetime import datetime, timezone
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
def generate_rsa_keypair() -> tuple[str, str]:
    """Create a fresh 2048-bit RSA keypair and return both halves as PEM text.

    Returns:
        A ``(private_pem, public_pem)`` tuple of strings. The private key is
        serialized as unencrypted PKCS8 PEM; the public key is serialized as
        SubjectPublicKeyInfo PEM.
    """
    key = rsa.generate_private_key(key_size=2048, public_exponent=65537)

    # Serialize each half to PEM bytes first, then decode both on return.
    private_bytes = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_bytes = key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return private_bytes.decode(), public_bytes.decode()
def sign_request(
    private_key_pem: str,
    key_id: str,
    method: str,
    path: str,
    host: str,
    body: bytes | None = None,
    date: str | None = None,
) -> dict[str, str]:
    """Build HTTP Signature headers for an outgoing request.

    The signed string always covers ``(request-target)``, ``host`` and
    ``date``. When ``body`` is not ``None`` (an empty ``b""`` counts), a
    SHA-256 ``Digest`` header is computed and covered by the signature too.

    Args:
        private_key_pem: PEM-encoded, unencrypted private key used to sign.
        key_id: Value placed in the ``keyId`` parameter. It is interpolated
            as-is, so it must not contain a double quote.
        method: HTTP method; lowercased for the ``(request-target)`` line.
        path: Request path used in the ``(request-target)`` line.
        host: Value of the ``Host`` header.
        body: Request body bytes, or ``None`` to omit the ``Digest`` header.
        date: Pre-formatted ``Date`` header value. Defaults to the current
            time in GMT.

    Returns:
        A dict of headers to merge into the request: ``Host``, ``Date``,
        ``Signature``, and ``Digest`` when a body was supplied.
    """
    # Function-scope import keeps this block self-contained.
    from email.utils import formatdate

    if date is None:
        # formatdate always emits English day/month names; strftime("%a"/"%b")
        # follows the process locale and could produce an invalid HTTP date.
        date = formatdate(usegmt=True)

    # (name, value) pairs in signing order; the names feed the ``headers=``
    # parameter and the pairs feed the signed string.
    signed_fields: list[tuple[str, str]] = [
        ("(request-target)", f"{method.lower()} {path}"),
        ("host", host),
        ("date", date),
    ]
    out_headers: dict[str, str] = {
        "Host": host,
        "Date": date,
    }

    if body is not None:
        digest = base64.b64encode(hashlib.sha256(body).digest()).decode()
        digest_header = f"SHA-256={digest}"
        signed_fields.append(("digest", digest_header))
        out_headers["Digest"] = digest_header

    signed_string = "\n".join(f"{name}: {value}" for name, value in signed_fields)
    header_names = " ".join(name for name, _ in signed_fields)

    private_key = serialization.load_pem_private_key(
        private_key_pem.encode(), password=None,
    )
    signature_bytes = private_key.sign(
        signed_string.encode(),
        padding.PKCS1v15(),
        hashes.SHA256(),
    )
    signature_b64 = base64.b64encode(signature_bytes).decode()

    out_headers["Signature"] = (
        f'keyId="{key_id}",'
        f'headers="{header_names}",'
        f'signature="{signature_b64}",'
        f'algorithm="rsa-sha256"'
    )
    return out_headers
def verify_request_signature(
    public_key_pem: str,
    signature_header: str,
    method: str,
    path: str,
    headers: dict[str, str],
) -> bool:
    """Verify an incoming HTTP Signature.

    The ``Signature`` header is supplied by the remote peer, so anything
    malformed in it is reported as a failed verification (``False``) rather
    than raised.

    This function only checks the signature over the listed headers. It takes
    no request body, so it does not check that a ``Digest`` header matches
    the body; that is left to the caller.

    Args:
        public_key_pem: PEM-encoded public key of the sender.
        signature_header: Value of the ``Signature`` header.
        method: HTTP method (GET, POST, etc.).
        path: Request path (e.g. ``/users/alice/inbox``).
        headers: All request headers (case-insensitive keys).

    Returns:
        True if the signature is valid. False if the header is malformed,
        has no ``signature`` parameter, covers no headers, names a header
        that is absent from ``headers``, or fails cryptographic verification.

    Raises:
        Errors from ``serialization.load_pem_public_key`` are not caught
        here and propagate when ``public_key_pem`` cannot be loaded.
    """
    # Parse the comma-separated key="value" parameters of the header.
    parts: dict[str, str] = {}
    for raw_part in signature_header.split(","):
        part = raw_part.strip()
        if not part:
            # Tolerate stray/trailing commas.
            continue
        key, sep, val = part.partition("=")
        if not sep:
            # A parameter without "=" is malformed input, not a crash.
            return False
        parts[key] = val.strip('"')

    signature_b64 = parts.get("signature", "")
    if not signature_b64:
        return False

    signed_headers = parts.get("headers", "date").split()
    if not signed_headers:
        # A signature that covers nothing says nothing about this request.
        return False

    # Normalize header lookup to lowercase.
    lc_headers = {k.lower(): v for k, v in headers.items()}

    # Reconstruct the signed string, one line per covered header.
    lines: list[str] = []
    for listed_name in signed_headers:
        name = listed_name.lower()
        if name == "(request-target)":
            lines.append(f"(request-target): {method.lower()} {path}")
            continue
        if name not in lc_headers:
            # The sender claims to have signed a header we did not receive;
            # substituting an empty value would hide that mismatch.
            return False
        lines.append(f"{name}: {lc_headers[name]}")
    signed_string = "\n".join(lines)

    public_key = serialization.load_pem_public_key(public_key_pem.encode())
    try:
        public_key.verify(
            base64.b64decode(signature_b64),
            signed_string.encode(),
            padding.PKCS1v15(),
            hashes.SHA256(),
        )
    except Exception:
        # Deliberately broad: bad base64, a wrong key type, or an invalid
        # signature all mean the request is not verified.
        return False
    return True
def create_ld_signature(
    private_key_pem: str,
    key_id: str,
    activity: dict,
) -> dict:
    """Produce an ``RsaSignature2017``-typed signature object for an activity.

    The activity is serialized to compact JSON with sorted keys, and those
    bytes are signed with the given key using PKCS1v15 padding and SHA-256.

    NOTE(review): canonicalisation here is plain sorted-key JSON; confirm
    that whoever verifies these signatures canonicalises the same way.

    Args:
        private_key_pem: PEM-encoded, unencrypted private key used to sign.
        key_id: Identifier stored in the ``creator`` field.
        activity: The activity to sign; must be JSON-serializable.

    Returns:
        A dict with ``type``, ``creator``, ``created`` (current UTC time in
        ISO format) and ``signatureValue`` (base64 of the signature).
    """
    payload = json.dumps(activity, separators=(",", ":"), sort_keys=True).encode()

    signing_key = serialization.load_pem_private_key(
        private_key_pem.encode(), password=None,
    )
    raw_signature = signing_key.sign(payload, padding.PKCS1v15(), hashes.SHA256())

    # Assemble the result field by field, in the same key order as before.
    proof: dict = {"type": "RsaSignature2017", "creator": key_id}
    proof["created"] = datetime.now(timezone.utc).isoformat()
    proof["signatureValue"] = base64.b64encode(raw_signature).decode()
    return proof