feat: RSA key management for ActivityPub signing
- keys.py: Generate/load RSA-2048 keypairs, sign activities - setup_keys.py: CLI to generate keys - Real RsaSignature2017 signing (falls back to placeholder if no keys) - Public key included in actor profile - Private keys gitignored 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
4
.gitignore
vendored
4
.gitignore
vendored
@@ -2,3 +2,7 @@ __pycache__/
|
||||
*.py[cod]
|
||||
.venv/
|
||||
venv/
|
||||
|
||||
# Private keys - NEVER commit these
|
||||
*.pem
|
||||
keys/
|
||||
|
||||
24
README.md
24
README.md
@@ -20,10 +20,34 @@ export ARTDAG_USER=giles
|
||||
export ARTDAG_DATA=~/.artdag/l2
|
||||
export ARTDAG_L1=http://localhost:8100
|
||||
|
||||
# Generate signing keys (required for federation)
|
||||
python setup_keys.py
|
||||
|
||||
# Start server
|
||||
python server.py
|
||||
```
|
||||
|
||||
## Key Setup
|
||||
|
||||
ActivityPub requires RSA keys for signing activities. Generate them:
|
||||
|
||||
```bash
|
||||
# Local
|
||||
python setup_keys.py
|
||||
|
||||
# Or with custom paths
|
||||
python setup_keys.py --data-dir /data/l2 --user giles
|
||||
|
||||
# In Docker, exec into container or mount volume
|
||||
docker exec -it <container> python setup_keys.py
|
||||
```
|
||||
|
||||
Keys are stored in `$ARTDAG_DATA/keys/`:
|
||||
- `{username}.pem` - Private key (chmod 600, NEVER share)
|
||||
- `{username}.pub` - Public key (included in actor profile)
|
||||
|
||||
**Important**: Private keys are gitignored. Back them up securely. Losing them invalidates all your signatures.
|
||||
|
||||
## API Endpoints
|
||||
|
||||
### Server Info
|
||||
|
||||
119
keys.py
Normal file
119
keys.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""
|
||||
Key management for ActivityPub signing.
|
||||
|
||||
Keys are stored in DATA_DIR/keys/:
|
||||
- {username}.pem - Private key (chmod 600)
|
||||
- {username}.pub - Public key
|
||||
"""
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
||||
|
||||
|
||||
def get_keys_dir(data_dir: Path) -> Path:
|
||||
"""Get keys directory, create if needed."""
|
||||
keys_dir = data_dir / "keys"
|
||||
keys_dir.mkdir(parents=True, exist_ok=True)
|
||||
return keys_dir
|
||||
|
||||
|
||||
def generate_keypair(data_dir: Path, username: str) -> tuple[str, str]:
|
||||
"""Generate RSA keypair for a user.
|
||||
|
||||
Returns (private_pem, public_pem)
|
||||
"""
|
||||
keys_dir = get_keys_dir(data_dir)
|
||||
private_path = keys_dir / f"{username}.pem"
|
||||
public_path = keys_dir / f"{username}.pub"
|
||||
|
||||
# Generate key
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048,
|
||||
)
|
||||
|
||||
# Serialize private key
|
||||
private_pem = private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
).decode()
|
||||
|
||||
# Serialize public key
|
||||
public_pem = private_key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
).decode()
|
||||
|
||||
# Save keys
|
||||
private_path.write_text(private_pem)
|
||||
private_path.chmod(0o600)
|
||||
public_path.write_text(public_pem)
|
||||
|
||||
return private_pem, public_pem
|
||||
|
||||
|
||||
def load_private_key(data_dir: Path, username: str):
|
||||
"""Load private key for signing."""
|
||||
keys_dir = get_keys_dir(data_dir)
|
||||
private_path = keys_dir / f"{username}.pem"
|
||||
|
||||
if not private_path.exists():
|
||||
raise FileNotFoundError(f"Private key not found: {private_path}")
|
||||
|
||||
private_pem = private_path.read_text()
|
||||
return serialization.load_pem_private_key(
|
||||
private_pem.encode(),
|
||||
password=None
|
||||
)
|
||||
|
||||
|
||||
def load_public_key_pem(data_dir: Path, username: str) -> str:
|
||||
"""Load public key PEM for actor profile."""
|
||||
keys_dir = get_keys_dir(data_dir)
|
||||
public_path = keys_dir / f"{username}.pub"
|
||||
|
||||
if not public_path.exists():
|
||||
raise FileNotFoundError(f"Public key not found: {public_path}")
|
||||
|
||||
return public_path.read_text()
|
||||
|
||||
|
||||
def has_keys(data_dir: Path, username: str) -> bool:
|
||||
"""Check if keys exist for user."""
|
||||
keys_dir = get_keys_dir(data_dir)
|
||||
return (keys_dir / f"{username}.pem").exists()
|
||||
|
||||
|
||||
def sign_data(private_key, data: str) -> str:
|
||||
"""Sign data with private key, return base64 signature."""
|
||||
signature = private_key.sign(
|
||||
data.encode(),
|
||||
padding.PKCS1v15(),
|
||||
hashes.SHA256()
|
||||
)
|
||||
return base64.b64encode(signature).decode()
|
||||
|
||||
|
||||
def create_signature(data_dir: Path, username: str, domain: str, activity: dict) -> dict:
|
||||
"""Create RsaSignature2017 for an activity."""
|
||||
private_key = load_private_key(data_dir, username)
|
||||
|
||||
# Create canonical JSON for signing
|
||||
canonical = json.dumps(activity, sort_keys=True, separators=(',', ':'))
|
||||
|
||||
# Sign
|
||||
signature_value = sign_data(private_key, canonical)
|
||||
|
||||
return {
|
||||
"type": "RsaSignature2017",
|
||||
"creator": f"https://{domain}/users/{username}#main-key",
|
||||
"created": datetime.now(timezone.utc).isoformat(),
|
||||
"signatureValue": signature_value
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
fastapi>=0.109.0
|
||||
uvicorn>=0.27.0
|
||||
requests>=2.31.0
|
||||
cryptography>=42.0.0
|
||||
|
||||
32
server.py
32
server.py
@@ -117,13 +117,8 @@ def save_activities(activities: list):
|
||||
|
||||
|
||||
def load_actor() -> dict:
|
||||
"""Load actor data."""
|
||||
path = DATA_DIR / "actor.json"
|
||||
if path.exists():
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
# Return default actor
|
||||
return {
|
||||
"""Load actor data with public key if available."""
|
||||
actor = {
|
||||
"id": f"https://{DOMAIN}/users/{USERNAME}",
|
||||
"type": "Person",
|
||||
"preferredUsername": USERNAME,
|
||||
@@ -134,6 +129,17 @@ def load_actor() -> dict:
|
||||
"following": f"https://{DOMAIN}/users/{USERNAME}/following",
|
||||
}
|
||||
|
||||
# Add public key if available
|
||||
from keys import has_keys, load_public_key_pem
|
||||
if has_keys(DATA_DIR, USERNAME):
|
||||
actor["publicKey"] = {
|
||||
"id": f"https://{DOMAIN}/users/{USERNAME}#main-key",
|
||||
"owner": f"https://{DOMAIN}/users/{USERNAME}",
|
||||
"publicKeyPem": load_public_key_pem(DATA_DIR, USERNAME)
|
||||
}
|
||||
|
||||
return actor
|
||||
|
||||
|
||||
def load_followers() -> list:
|
||||
"""Load followers list."""
|
||||
@@ -153,15 +159,21 @@ def save_followers(followers: list):
|
||||
|
||||
# ============ Signing ============
|
||||
|
||||
from keys import has_keys, load_public_key_pem, create_signature
|
||||
|
||||
|
||||
def sign_activity(activity: dict) -> dict:
|
||||
"""Sign an activity (placeholder - real impl uses RSA)."""
|
||||
# In production, use artdag.activitypub.signatures
|
||||
"""Sign an activity with RSA private key."""
|
||||
if not has_keys(DATA_DIR, USERNAME):
|
||||
# No keys - use placeholder (for testing)
|
||||
activity["signature"] = {
|
||||
"type": "RsaSignature2017",
|
||||
"creator": f"https://{DOMAIN}/users/{USERNAME}#main-key",
|
||||
"created": datetime.now(timezone.utc).isoformat(),
|
||||
"signatureValue": "placeholder-implement-real-signing"
|
||||
"signatureValue": "NO_KEYS_CONFIGURED"
|
||||
}
|
||||
else:
|
||||
activity["signature"] = create_signature(DATA_DIR, USERNAME, DOMAIN, activity)
|
||||
return activity
|
||||
|
||||
|
||||
|
||||
51
setup_keys.py
Executable file
51
setup_keys.py
Executable file
@@ -0,0 +1,51 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Generate RSA keypair for ActivityPub signing.
|
||||
|
||||
Usage:
|
||||
python setup_keys.py [--data-dir /path/to/data] [--user username]
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from keys import generate_keypair, has_keys, get_keys_dir
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Generate RSA keypair for L2 server")
|
||||
parser.add_argument("--data-dir", default=os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2")),
|
||||
help="Data directory")
|
||||
parser.add_argument("--user", default=os.environ.get("ARTDAG_USER", "giles"),
|
||||
help="Username")
|
||||
parser.add_argument("--force", action="store_true",
|
||||
help="Overwrite existing keys")
|
||||
|
||||
args = parser.parse_args()
|
||||
data_dir = Path(args.data_dir)
|
||||
username = args.user
|
||||
|
||||
print(f"Data directory: {data_dir}")
|
||||
print(f"Username: {username}")
|
||||
|
||||
if has_keys(data_dir, username) and not args.force:
|
||||
print(f"\nKeys already exist for {username}!")
|
||||
print(f" Private: {get_keys_dir(data_dir) / f'{username}.pem'}")
|
||||
print(f" Public: {get_keys_dir(data_dir) / f'{username}.pub'}")
|
||||
print("\nUse --force to regenerate (will invalidate existing signatures)")
|
||||
return
|
||||
|
||||
print("\nGenerating RSA-2048 keypair...")
|
||||
private_pem, public_pem = generate_keypair(data_dir, username)
|
||||
|
||||
keys_dir = get_keys_dir(data_dir)
|
||||
print(f"\nKeys generated:")
|
||||
print(f" Private: {keys_dir / f'{username}.pem'} (chmod 600)")
|
||||
print(f" Public: {keys_dir / f'{username}.pub'}")
|
||||
print(f"\nPublic key (for verification):")
|
||||
print(public_pem)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user