Integration
Sample client code for sending faxes and receiving webhooks. Each example takes the keys.zip you downloaded after redeeming your invite — load it once, then send or receive as many messages as you want.
Sending a fax
Top-level function: takes the keys.zip path, the recipient fax number, the file to send, and your caller-ID number (from_number). Looks up the recipient's key bundle, encrypts and signs locally, then uploads the opaque ciphertext to the relay. Raises CoverageError if the recipient isn't on fxe — fall back to PSTN in that case.
"""Send a fax over fxe.
Top-level entry point: ``send_fax(keys_zip, recipient_fax, file_path, from_number)``.
Takes the keys.zip you downloaded after redeeming your invite plus the
recipient fax number + the file to send. Returns the relay's message_id.
Public PyPI deps only:
pip install httpx cryptography liboqs-python
(liboqs-python also requires the liboqs native library — see
https://github.com/open-quantum-safe/liboqs.)
"""
from __future__ import annotations
import base64
import hashlib
import io
import json
import os
import secrets
import time
import uuid
import zipfile
from pathlib import Path
import httpx
import oqs # liboqs-python
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
# Cipher-suite identifier; it is signed into every envelope and sent as the
# envelope's "suite" field.
# NOTE(review): the name says "xchacha20", while _encrypt_for calls
# ChaCha20Poly1305 with only the first 12 bytes of the stored 24-byte nonce —
# confirm against the wire spec.
SUITE = "fxe-v1-mlkem768-x25519-xchacha20"
# Domain-separation tags. Each is length-prefixed so signatures from one
# domain can't be replayed as another. Format: byte(len) || tag || payload.
DOMAIN_REQUEST = b"\x0e" + b"fxe-v1-request" # 14 chars
DOMAIN_ENVELOPE = b"\x0f" + b"fxe-v1-envelope" # 15 chars
# Not referenced by the code in this sample — presumably used when signing
# key bundles; TODO confirm.
DOMAIN_BUNDLE = b"\x0d" + b"fxe-v1-bundle" # 13 chars
# First field of the AEAD associated data built by _aad().
AAD_DOMAIN = b"fxe-v1-aad"
# Leading component of the HKDF `info` string used to derive the CEK wrap key.
HKDF_INFO_WRAP = b"fxe-v1-wrap"
class CoverageError(Exception):
    """The recipient fax isn't on fxe. Fall back to PSTN.

    Raised by send_fax() when the relay's lookup for the recipient number
    answers 404.
    """
def send_fax(
    keys_zip: Path,
    recipient_fax: str,
    file_path: Path,
    from_number: str,
    from_name: str | None = None,
    metadata: dict | None = None,
) -> str:
    """Send `file_path` to `recipient_fax` and return the relay's message_id.

    `from_number` is your caller-ID. It MUST be one of your org's registered
    fax numbers; the relay rejects mismatches with 403.

    Args:
        keys_zip: Path to the keys.zip downloaded after redeeming your invite.
        recipient_fax: Destination fax number (e.g. "+15559876543").
        file_path: File whose bytes are encrypted and sent.
        from_number: Caller-ID number placed in the signed envelope.
        from_name: Optional display name placed in the signed envelope.
        metadata: Optional metadata dict; when omitted (or empty) it defaults
            to the file name plus content type "application/pdf".

    Raises:
        CoverageError: The relay answered 404 for the recipient lookup.
        httpx.HTTPStatusError: Any other non-success response from the relay.
    """
    # Local import keeps this block self-contained; urllib.parse is stdlib.
    from urllib.parse import quote

    creds, signing_sk, bundle_sk = _load_keys(keys_zip)
    relay = creds["relay"]

    # 1. Ask the relay which org currently owns this fax number.
    # The number is percent-encoded before it is signed and sent: a literal
    # "+" in a query string is decoded as a space by standard form decoding,
    # which would turn "+1555..." into " 1555...".
    encoded_fax = quote(recipient_fax, safe="")
    lookup = _signed(
        relay, signing_sk, creds["key_id"], "GET",
        f"/v1/lookup?fax_number={encoded_fax}",
    )
    if lookup.status_code == 404:
        raise CoverageError(f"{recipient_fax} not covered by fxe")
    lookup.raise_for_status()
    recipient = lookup.json()

    # 2. Encrypt + sign locally. No plaintext or CEK leaves this machine.
    meta = metadata or {"filename": file_path.name, "content_type": "application/pdf"}
    envelope_json, ciphertext = _encrypt_for(
        plaintext=file_path.read_bytes(),
        sender_org_id=creds["org_id"],
        sender_key_bundle_id=creds["bundle_id"],
        sender_bundle_sk=bundle_sk,
        recipient_org_id=recipient["org_id"],
        recipient_bundles=recipient["bundles"],
        from_number=from_number,
        from_name=from_name,
        to_number=recipient_fax,
        metadata=meta,
    )

    # 3. Upload as multipart: signed envelope (JSON) + opaque ciphertext.
    body, content_type = _multipart(envelope_json, ciphertext)
    upload = _signed(
        relay, signing_sk, creds["key_id"], "POST", "/v1/messages",
        body=body, content_type=content_type,
    )
    upload.raise_for_status()
    return upload.json()["message_id"]
# --- crypto ----------------------------------------------------------------
def _encrypt_for(
    *,
    plaintext: bytes,
    sender_org_id: str,
    sender_key_bundle_id: str,
    sender_bundle_sk: Ed25519PrivateKey,
    recipient_org_id: str,
    recipient_bundles: list[dict],
    from_number: str,
    from_name: str | None,
    to_number: str,
    metadata: dict,
) -> tuple[dict, bytes]:
    """Encrypt `plaintext` once and wrap the content key per recipient bundle.

    Returns `(envelope, ciphertext)`: the envelope is a dict carrying the
    sender's signature and one wrapped-key entry per element of
    `recipient_bundles` (several entries = the sender's view of a rotation
    overlap); the ciphertext is the 24-byte nonce followed by the AEAD output.
    """
    message_id = str(uuid.uuid4())
    content_key = os.urandom(32)
    # All 24 nonce bytes are stored; only the first 12 feed the AEAD today.
    body_nonce = os.urandom(24)
    aad = _aad(message_id, sender_org_id, recipient_org_id, metadata)
    sealed = ChaCha20Poly1305(content_key).encrypt(body_nonce[:12], plaintext, aad)
    ciphertext = body_nonce + sealed
    ciphertext_hex = hashlib.sha256(ciphertext).hexdigest()

    wrapped_keys = [
        _wrap_cek(bundle, content_key, aad) for bundle in recipient_bundles
    ]

    created_at = int(time.time())
    metadata_hex = hashlib.sha256(
        json.dumps(metadata, sort_keys=True).encode()
    ).hexdigest()

    # Canonical signing payload: keys sorted and no whitespace, so independent
    # clients serialise the same envelope to the same bytes.
    signed_fields = {
        "message_id": message_id,
        "suite": SUITE,
        "sender_org_id": sender_org_id,
        "sender_key_bundle_id": sender_key_bundle_id,
        "recipient_org_id": recipient_org_id,
        "from_number": from_number,
        "from_name": from_name,
        "to_number": to_number,
        "ciphertext_sha256": ciphertext_hex,
        "metadata_sha256": metadata_hex,
        "created_at": created_at,
    }
    canonical = json.dumps(
        signed_fields, sort_keys=True, separators=(",", ":")
    ).encode()
    signature = sender_bundle_sk.sign(DOMAIN_ENVELOPE + canonical)

    envelope = {
        "message_id": message_id,
        "suite": SUITE,
        "sender_org_id": sender_org_id,
        "sender_key_bundle_id": sender_key_bundle_id,
        "recipient_org_id": recipient_org_id,
        "from_number": from_number,
        "from_name": from_name,
        "to_number": to_number,
        "size": len(ciphertext),
        "ciphertext_sha256": ciphertext_hex,
        "metadata": metadata,
        "created_at": created_at,
        "sender_sig": _b64u(signature),
        "wrapped_keys": wrapped_keys,
    }
    return envelope, ciphertext


def _wrap_cek(bundle: dict, cek: bytes, aad: bytes) -> dict:
    """Wrap `cek` for one recipient bundle using ML-KEM-768 plus X25519."""
    recipient_kem_pub = _b64d(bundle["ml_kem_pub"])
    recipient_x_pub = _b64d(bundle["x25519_pub"])

    # Post-quantum half: encapsulate against the recipient's ML-KEM-768 key.
    with oqs.KeyEncapsulation("ML-KEM-768") as kem:
        kem_ct, pq_secret = kem.encap_secret(recipient_kem_pub)

    # Classical half: a fresh ephemeral X25519 key for every wrap.
    ephemeral = X25519PrivateKey.generate()
    ephemeral_pub = ephemeral.public_key().public_bytes_raw()
    classical_secret = ephemeral.exchange(
        X25519PublicKey.from_public_bytes(recipient_x_pub)
    )

    # The HKDF info ties both shared secrets to the ephemeral key and the
    # recipient's public keys, yielding a single wrap key.
    info = b"|".join(
        (HKDF_INFO_WRAP, ephemeral_pub, recipient_x_pub, recipient_kem_pub)
    )
    kdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=None, info=info)
    wrap_key = kdf.derive(pq_secret + classical_secret)

    wrap_nonce = os.urandom(12)
    wrapped = AESGCM(wrap_key).encrypt(wrap_nonce, cek, aad)
    return {
        "recipient_key_bundle_id": bundle["bundle_id"],
        "kem_ct": _b64u(kem_ct),
        "x25519_ephemeral_pub": _b64u(ephemeral_pub),
        "wrap_nonce": _b64u(wrap_nonce),
        "wrapped_cek": _b64u(wrapped),
    }
def _aad(message_id: str, sender: str, recipient: str, metadata: dict) -> bytes:
    """Build the AEAD associated data shared by the body and the wrapped CEK.

    The result is the AAD domain tag, message id, sender id, recipient id and
    the raw SHA-256 digest of the key-sorted metadata JSON, joined with b"|".
    """
    fields = (
        AAD_DOMAIN,
        message_id.encode(),
        sender.encode(),
        recipient.encode(),
        hashlib.sha256(json.dumps(metadata, sort_keys=True).encode()).digest(),
    )
    return b"|".join(fields)
# --- HTTP signing ----------------------------------------------------------
def _signed(relay, signing_sk, key_id, method, path, *, body=b"", content_type=None):
    """Send one FXE1-signed HTTP request to the relay and return the response.

    The signature covers the request domain tag followed by five
    newline-separated fields: method, path, hex SHA-256 of the body, a
    Unix timestamp, and a random nonce.
    """
    ts = str(int(time.time()))
    nonce = _b64u(secrets.token_bytes(16))
    body_hash = hashlib.sha256(body).hexdigest()

    canonical = f"{method}\n{path}\n{body_hash}\n{ts}\n{nonce}".encode()
    signature = _b64u(signing_sk.sign(DOMAIN_REQUEST + canonical))

    authorization = f"FXE1 key_id={key_id},ts={ts},nonce={nonce},sig={signature}"
    # Content-Type is only sent when the caller supplied one.
    extra = {"Content-Type": content_type} if content_type else {}
    headers = {"Authorization": authorization, **extra}

    url = relay + path
    return httpx.request(method, url, headers=headers, content=body, timeout=60)
# --- helpers ---------------------------------------------------------------
def _load_keys(keys_zip: Path) -> tuple[dict, Ed25519PrivateKey, Ed25519PrivateKey]:
    """Read credentials and both Ed25519 private keys out of keys.zip.

    Returns `(creds, signing_key, bundle_key)`: the parsed credentials.json,
    the key used to sign requests, and the key used to sign envelopes.
    """
    seed_members = ("keys/signing.ed25519.key", "keys/bundle.ed25519.key")
    with zipfile.ZipFile(keys_zip) as archive:
        creds = json.loads(archive.read("credentials.json"))
        seeds = [archive.read(member) for member in seed_members]
    signing_sk, bundle_sk = (
        Ed25519PrivateKey.from_private_bytes(seed) for seed in seeds
    )
    return creds, signing_sk, bundle_sk
def _b64u(b: bytes) -> str:
return base64.urlsafe_b64encode(b).rstrip(b"=").decode()
def _b64d(s: str) -> bytes:
return base64.urlsafe_b64decode(s + "=" * (-len(s) % 4))
def _multipart(envelope: dict, ciphertext: bytes) -> tuple[bytes, str]:
boundary = "----fxe" + secrets.token_hex(8)
body = io.BytesIO()
body.write(f"--{boundary}\r\n".encode())
body.write(b'Content-Disposition: form-data; name="envelope"\r\n')
body.write(b"Content-Type: application/json\r\n\r\n")
body.write(json.dumps(envelope).encode() + b"\r\n")
body.write(f"--{boundary}\r\n".encode())
body.write(b'Content-Disposition: form-data; name="ciphertext"; filename="ct.bin"\r\n')
body.write(b"Content-Type: application/octet-stream\r\n\r\n")
body.write(ciphertext + b"\r\n")
body.write(f"--{boundary}--\r\n".encode())
return body.getvalue(), f"multipart/form-data; boundary={boundary}"
if __name__ == "__main__":
    import sys

    # usage: send.py keys.zip +15559876543 fax.pdf +15551110001 ["Acme Fax"]
    argv = sys.argv
    # The fifth argument (display name) is optional.
    display_name = argv[5] if len(argv) > 5 else None
    message_id = send_fax(
        keys_zip=Path(argv[1]),
        recipient_fax=argv[2],
        file_path=Path(argv[3]),
        from_number=argv[4],
        from_name=display_name,
    )
    print(f"sent: {message_id}")
Receiving webhooks
Verify the per-endpoint HMAC, dedupe on the event ID, and on message.received fetch + decrypt the envelope, then POST a signed delivery receipt so the relay can delete the ciphertext.
"""Receive fxe webhooks (FastAPI).
When a `message.received` event arrives, fetch the envelope, decrypt locally,
then POST a signed receipt back to /v1/messages/{id}/receipt so the relay can
delete the ciphertext.
Public PyPI deps only:
pip install httpx fastapi uvicorn cryptography liboqs-python
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import time
import zipfile
from pathlib import Path
import oqs
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from fastapi import FastAPI, Header, HTTPException, Request
# Reuse the helpers from send.py — sibling file, same dir.
from send import _load_keys, _signed, _b64u, _b64d # noqa: E402
# Domain tags must match those in send.py / the fxe wire protocol.
# Each tag is one length byte followed by the tag text; it is prepended to
# the payload before signing or verifying.
DOMAIN_RECEIPT = b"\x0e" + b"fxe-v1-receipt" # 14 chars
DOMAIN_ENVELOPE = b"\x0f" + b"fxe-v1-envelope" # 15 chars
# First field of the AEAD associated data built by _aad().
AAD_DOMAIN = b"fxe-v1-aad"
# Leading component of the HKDF `info` string used to derive the CEK wrap key.
HKDF_INFO_WRAP = b"fxe-v1-wrap"
app = FastAPI()
# keys.zip read by handle_received() and _read_secret().
KEYS_ZIP = Path("/etc/fxe/keys.zip")
# HMAC key used by _verify(); replace the placeholder with your real secret.
WEBHOOK_SECRET = b"<the secret you got when you registered the webhook>"
# Largest accepted gap between the webhook timestamp header and local time.
MAX_SKEW = 300 # seconds
@app.post("/fxe-webhook")
async def webhook(
    request: Request,
    x_fxe_timestamp: str = Header(...),
    x_fxe_signature: str = Header(...),
    x_fxe_event_id: str = Header(...),
    x_fxe_event_type: str = Header(...),
):
    """Authenticate an fxe webhook call, dedupe it, and dispatch by event type.

    Answers 401 when the HMAC check fails. Otherwise returns {"ok": True},
    both for events handled now and for event ids already seen.
    """
    raw = await request.body()
    authentic = _verify(raw, x_fxe_timestamp, x_fxe_signature)
    if not authentic:
        raise HTTPException(401, "bad signature")

    # Idempotency: an event id that was processed before is acknowledged
    # without being handled a second time.
    if already_handled(x_fxe_event_id):
        return {"ok": True}

    payload = json.loads(raw)
    is_inbound_message = x_fxe_event_type == "message.received"
    if is_inbound_message:
        await handle_received(payload["message_id"])

    mark_handled(x_fxe_event_id)
    return {"ok": True}
def _verify(body: bytes, ts: str, sig_header: str) -> bool:
if abs(time.time() - int(ts)) > MAX_SKEW:
return False
parts = dict(p.split("=", 1) for p in sig_header.split(","))
expected = hmac.new(WEBHOOK_SECRET, f"{ts}.".encode() + body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, parts.get("v1", ""))
async def handle_received(message_id: str) -> None:
    """Fetch, decrypt and store one inbound message, then send a signed receipt.

    Args:
        message_id: Id of the message named in the `message.received` event.

    Raises:
        httpx.HTTPStatusError: The relay answered with an error status to the
            message fetch, the sender key-bundle fetch, or the receipt POST.
        ValueError, KeyError: Propagated from _decrypt() when the envelope
            fails verification or holds no key for our bundle.
    """
    creds, signing_sk, bundle_sk = _load_keys(KEYS_ZIP)
    relay = creds["relay"]
    key_id = creds["key_id"]

    # 1. Pull the envelope + ciphertext (signed GET).
    res = _signed(relay, signing_sk, key_id, "GET", f"/v1/messages/{message_id}")
    res.raise_for_status()
    detail = res.json()
    envelope = detail["envelope"]
    ciphertext = _b64d(detail["ciphertext"])  # relay returns base64url

    # 2. Decrypt locally with your bundle's KEM private keys (loaded from keys.zip).
    sender_signing_pub = _fetch_sender_signing_pub(
        relay, signing_sk, key_id, envelope["sender_key_bundle_id"],
    )
    plaintext = _decrypt(envelope, ciphertext, creds, sender_signing_pub)
    persist_to_inbox(message_id, plaintext, envelope.get("metadata", {}))

    # 3. Sign a delivery receipt so the relay deletes the ciphertext.
    receipt = _sign_receipt(envelope, creds, bundle_sk)
    ack = _signed(
        relay, signing_sk, key_id, "POST",
        f"/v1/messages/{message_id}/receipt",
        body=json.dumps(receipt).encode(),
        content_type="application/json",
    )
    # Surface a rejected receipt instead of dropping it silently, matching
    # every other relay call in this file.
    # NOTE(review): this error reaches the webhook handler before the event is
    # marked handled, so a redelivery would store the message again unless
    # persist_to_inbox is idempotent — confirm.
    ack.raise_for_status()
# --- crypto ----------------------------------------------------------------
def _decrypt(
    envelope: dict, ciphertext: bytes, creds: dict, sender_signing_pub: bytes,
) -> bytes:
    """Authenticate the envelope, recover our CEK, and decrypt the message body.

    Steps: verify the sender's Ed25519 signature over the canonical envelope
    fields, check the ciphertext hash, unwrap the CEK addressed to our bundle
    (ML-KEM-768 + X25519 -> HKDF -> AES-GCM), then AEAD-decrypt the body.

    Raises ValueError for a bad signature or hash mismatch, and KeyError when
    no wrapped key targets our bundle; AEAD failures propagate from the
    cryptography library.
    """
    meta = envelope.get("metadata", {})

    # 1. Rebuild the canonical signing payload and check the signature.
    signed_fields = {
        "message_id": envelope["message_id"],
        "suite": envelope["suite"],
        "sender_org_id": envelope["sender_org_id"],
        "sender_key_bundle_id": envelope["sender_key_bundle_id"],
        "recipient_org_id": envelope["recipient_org_id"],
        "from_number": envelope["from_number"],
        "from_name": envelope.get("from_name"),
        "to_number": envelope["to_number"],
        "ciphertext_sha256": envelope["ciphertext_sha256"],
        "metadata_sha256": hashlib.sha256(
            json.dumps(meta, sort_keys=True).encode()
        ).hexdigest(),
        "created_at": envelope["created_at"],
    }
    canonical = json.dumps(
        signed_fields, sort_keys=True, separators=(",", ":")
    ).encode()
    verifier = Ed25519PublicKey.from_public_bytes(sender_signing_pub)
    try:
        verifier.verify(_b64d(envelope["sender_sig"]), DOMAIN_ENVELOPE + canonical)
    except InvalidSignature:
        raise ValueError("envelope signature invalid")

    # 2. The ciphertext must hash to the value the sender signed.
    if hashlib.sha256(ciphertext).hexdigest() != envelope["ciphertext_sha256"]:
        raise ValueError("ciphertext hash mismatch")

    # 3. Pick the wrapped CEK addressed to *our* bundle.
    our_bundle = creds["bundle_id"]
    for candidate in envelope["wrapped_keys"]:
        if candidate["recipient_key_bundle_id"] == our_bundle:
            wk = candidate
            break
    else:
        raise KeyError(f"no wrapped key for our bundle {our_bundle}")

    ml_kem_sk = _read_secret("keys/kem.mlkem768.key")
    ml_kem_pub = _read_secret("keys/kem.mlkem768.pub")
    x25519_sk = _read_secret("keys/kem.x25519.key")
    x25519_pub = _read_secret("keys/kem.x25519.pub")

    with oqs.KeyEncapsulation("ML-KEM-768", secret_key=ml_kem_sk) as kem:
        pq_secret = kem.decap_secret(_b64d(wk["kem_ct"]))

    eph_pub = _b64d(wk["x25519_ephemeral_pub"])
    our_x_key = X25519PrivateKey.from_private_bytes(x25519_sk)
    classical_secret = our_x_key.exchange(X25519PublicKey.from_public_bytes(eph_pub))

    # Same HKDF info layout as the sender: tag, ephemeral key, then our keys.
    info = b"|".join((HKDF_INFO_WRAP, eph_pub, x25519_pub, ml_kem_pub))
    kdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=None, info=info)
    wrap_key = kdf.derive(pq_secret + classical_secret)

    aad = _aad(
        envelope["message_id"],
        envelope["sender_org_id"],
        envelope["recipient_org_id"],
        meta,
    )
    cek = AESGCM(wrap_key).decrypt(
        _b64d(wk["wrap_nonce"]), _b64d(wk["wrapped_cek"]), aad
    )

    # 4. The first 24 bytes are the stored nonce (12 of them used); the rest
    # is the AEAD body.
    body_nonce, sealed = ciphertext[:24], ciphertext[24:]
    return ChaCha20Poly1305(cek).decrypt(body_nonce[:12], sealed, aad)
def _sign_receipt(envelope: dict, creds: dict, bundle_sk: Ed25519PrivateKey) -> dict:
    """Build the signed delivery receipt posted back to the relay.

    The signature covers the receipt fields plus our org id, serialised as
    canonical JSON (sorted keys, no whitespace) behind the receipt domain tag.
    The returned dict carries the receipt fields and the signature, but not
    the org id.
    """
    receipt = {
        "message_id": envelope["message_id"],
        "ciphertext_sha256": envelope["ciphertext_sha256"],
        "recipient_bundle_id": creds["bundle_id"],
        "signed_at": int(time.time()),
    }
    # recipient_org_id is part of what gets signed but is not sent back.
    to_sign = {**receipt, "recipient_org_id": creds["org_id"]}
    canonical = json.dumps(to_sign, sort_keys=True, separators=(",", ":")).encode()
    receipt["recipient_sig"] = _b64u(bundle_sk.sign(DOMAIN_RECEIPT + canonical))
    return receipt
def _aad(message_id: str, sender: str, recipient: str, metadata: dict) -> bytes:
    """Rebuild the AEAD associated data for a message.

    Joins, with b"|": the AAD domain tag, the message id, the sender id, the
    recipient id, and the raw SHA-256 digest of the key-sorted metadata JSON.
    """
    pieces = [AAD_DOMAIN]
    pieces.extend(text.encode() for text in (message_id, sender, recipient))
    metadata_json = json.dumps(metadata, sort_keys=True)
    pieces.append(hashlib.sha256(metadata_json.encode()).digest())
    return b"|".join(pieces)
def _fetch_sender_signing_pub(relay, signing_sk, key_id, sender_bundle_id) -> bytes:
    """Fetch the sender bundle's Ed25519 public key from the relay.

    Performs a signed GET on /v1/key-bundles/{id} and returns the decoded
    `ed25519_pub` field, which is needed to verify the envelope signature.
    """
    bundle_path = f"/v1/key-bundles/{sender_bundle_id}"
    response = _signed(relay, signing_sk, key_id, "GET", bundle_path)
    response.raise_for_status()
    bundle = response.json()
    return _b64d(bundle["ed25519_pub"])
def _read_secret(name: str) -> bytes:
    """Return the raw bytes of one member of KEYS_ZIP, read without extracting the archive."""
    with zipfile.ZipFile(KEYS_ZIP) as archive:
        secret = archive.read(name)
    return secret
# --- placeholders you implement -------------------------------------------
# Dedupe check: the webhook handler calls this first and skips the event when
# it returns True.
def already_handled(event_id: str) -> bool: ...
# Called by the webhook handler once an event has been processed.
def mark_handled(event_id: str) -> None: ...
# Receives the decrypted message bytes plus the envelope's metadata dict
# ({} when the envelope has none).
def persist_to_inbox(message_id: str, plaintext: bytes, metadata: dict) -> None: ...
Notes
- All three samples are self-contained — they depend only on public packages (`cryptography` + `liboqs-python` + `httpx` for Python, Go 1.24 stdlib + `x/crypto`, `@noble/*` for Node). Copy them, adjust the package layout, and they're ready to wire into your stack.
- Every API call is signed (per-org Ed25519 over a canonical `METHOD\nPATH\nsha256(body)\nts\nnonce` payload). The relay rejects any request whose timestamp is more than 300 seconds away from its own clock; nonces are remembered for 600 seconds to prevent replay.
- Maximum ciphertext size: 100 MiB per message. Larger files are rejected by the relay before they hit storage.