Created
January 18, 2026 22:30
-
-
Save lastforkbender/a8d221042f442381ee01feded4a4af9d to your computer and use it in GitHub Desktop.
panama sv2026 spectrum net error chained encryption
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # panama_sv2026.py | |
| import os | |
| import time | |
| import secrets | |
| import numpy as np | |
| import multiprocessing as mp | |
| from numba import njit, prange | |
| from typing import List, Tuple | |
| from dataclasses import dataclass | |
| from scipy.spatial.distance import cdist | |
| from cryptography.hazmat.primitives import hashes, hmac | |
| from cryptography.hazmat.primitives.kdf.hkdf import HKDF | |
| try: | |
| import bchlib | |
| except Exception as e: | |
| raise RuntimeError("bchlib required") from e | |
# --- Spectrum / quantizer geometry -----------------------------------------
SPECTRUM_LEN = 256                    # default number of samples per spectrum
BLOCKS = 8                            # default number of blocks per spectrum
BLOCK_SIZE = SPECTRUM_LEN // BLOCKS   # samples per block
CODEBOOK_PER_BLOCK = 16               # default number of centroids per block

# --- Experiment sizing ------------------------------------------------------
PACKETS = 4096                        # default packet count for run_throughput
NOISE_STD = 0.08                      # noise scale applied in worker_task_batch

# --- Error-correction and key-derivation parameters -------------------------
BCH_POLY = 137                        # first argument handed to bchlib.BCH
BCH_T = 5                             # second argument handed to bchlib.BCH
DERIVED_KEY_LEN = 32                  # default HKDF output length in bytes
HMAC_KEY_LEN = 32                     # length in bytes of generated HMAC keys

# --- Parallelism -------------------------------------------------------------
NUM_WORKERS = max(1, mp.cpu_count() - 1)   # never fewer than one worker
BATCH_SIZE = 64                            # default tasks per worker batch

# --- Reproducibility ---------------------------------------------------------
EXPT_SEED = 2026
# Module-level generator shared by codebook training and sample generation.
rng_expt = np.random.default_rng(EXPT_SEED)
def secure_random_bytes(n: int) -> bytes:
    """Return ``n`` random bytes obtained from ``secrets.token_bytes``."""
    token = secrets.token_bytes(n)
    return token
def hkdf(ikm: bytes, salt: bytes, info: bytes, length: int = DERIVED_KEY_LEN) -> bytes:
    """Derive ``length`` bytes from ``ikm`` using HKDF with SHA-256.

    Args:
        ikm: Input keying material.
        salt: HKDF salt.
        info: HKDF context/application info.
        length: Number of output bytes (defaults to ``DERIVED_KEY_LEN``).

    Returns:
        The derived key bytes.
    """
    return HKDF(
        algorithm=hashes.SHA256(),
        length=length,
        salt=salt,
        info=info,
    ).derive(ikm)
def hmac_sign(key: bytes, data: bytes) -> bytes:
    """Return the HMAC-SHA256 tag of ``data`` under ``key``."""
    mac_ctx = hmac.HMAC(key, hashes.SHA256())
    mac_ctx.update(data)
    tag = mac_ctx.finalize()
    return tag
def hmac_verify(key: bytes, data: bytes, tag: bytes) -> bool:
    """Check ``tag`` against the HMAC-SHA256 of ``data`` under ``key``.

    Returns:
        ``True`` when the library's ``verify`` call succeeds, ``False`` when
        it raises for any reason.
    """
    mac_ctx = hmac.HMAC(key, hashes.SHA256())
    mac_ctx.update(data)
    try:
        mac_ctx.verify(tag)
    except Exception:
        # Any failure (mismatch, wrong tag type, ...) counts as "not verified".
        return False
    return True
def make_bch(payload_len: int):
    """Create a ``bchlib.BCH`` codec from ``BCH_POLY`` and ``BCH_T``.

    An all-zero payload of ``payload_len`` bytes is encoded once and the
    result discarded; presumably this is a fail-fast check that the payload
    size is accepted by the codec (confirm against the bchlib version in use).
    """
    codec = bchlib.BCH(BCH_POLY, BCH_T)
    zero_payload = b"\x00" * payload_len
    codec.encode(zero_payload)
    return codec
def generate_spectrum(spectrum_len=SPECTRUM_LEN, peaks=6, rng=None):
    """Synthesize a random spectrum made of Gaussian peaks plus a noise floor.

    Args:
        spectrum_len: Number of samples on the [0, 1] axis.
        peaks: Number of Gaussian peaks to superimpose.
        rng: ``numpy.random.Generator`` to draw from; a fresh unseeded
            generator is created when ``None``.

    Returns:
        ``float32`` array of length ``spectrum_len``, divided by its maximum
        whenever that maximum is positive.
    """
    generator = np.random.default_rng() if rng is None else rng
    axis = np.linspace(0, 1, spectrum_len)
    spectrum = np.zeros_like(axis)
    for _ in range(peaks):
        # Draw order (centre, width, amplitude) determines the RNG stream.
        centre = generator.random()
        width = 0.01 + 0.06 * generator.random()
        amplitude = 0.4 + 0.6 * generator.random()
        spectrum += amplitude * np.exp(-0.5 * ((axis - centre) / width) ** 2)
    # Uniform noise floor in [0, 0.03).
    spectrum += 0.03 * generator.random(spectrum_len)
    peak_value = np.max(spectrum)
    if peak_value > 0:
        spectrum /= peak_value
    return spectrum.astype(np.float32)
def kmeans_pp_init(X, k, rng):
    """Pick ``k`` initial centroids from the rows of ``X`` (k-means++ seeding).

    The first centre is a uniformly random row. Each further centre is a row
    drawn with probability proportional to its squared Euclidean distance to
    the nearest centre chosen so far.

    Args:
        X: 2-D array of samples, one per row.
        k: Number of centres to select.
        rng: ``numpy.random.Generator`` used for all draws.

    Returns:
        Array of shape ``(k, X.shape[1])`` with the dtype of ``X``.
    """
    n = X.shape[0]
    centers = np.empty((k, X.shape[1]), dtype=X.dtype)
    centers[0] = X[rng.integers(0, n)]
    dist_sq = cdist(X, centers[0:1], metric='euclidean').ravel() ** 2
    for i in range(1, k):
        total = np.sum(dist_sq)
        if total > 0:
            idx = rng.choice(n, p=dist_sq / total)
        else:
            # Every sample coincides with an already chosen centre, so the
            # distance weights are all zero (0/0 would yield NaN
            # probabilities and make rng.choice raise). Fall back to a
            # uniform pick.
            idx = rng.integers(0, n)
        centers[i] = X[idx]
        d = cdist(X, centers[i:i + 1], metric='euclidean').ravel() ** 2
        dist_sq = np.minimum(dist_sq, d)
    return centers
def build_codebooks(samples: np.ndarray, blocks=BLOCKS, k=CODEBOOK_PER_BLOCK, iters=10):
    """Train one k-means codebook per contiguous block of the sample vectors.

    Args:
        samples: 2-D array ``(n_samples, dim)``.
        blocks: Number of equal-width blocks to split ``dim`` into.
        k: Centroids per block.
        iters: Number of assignment/update passes per block.

    Returns:
        List of ``blocks`` arrays, each ``(k, dim // blocks)`` in ``float32``.

    Raises:
        ValueError: If ``dim`` is not a multiple of ``blocks``.
    """
    n_samples, dim = samples.shape
    if dim % blocks:
        raise ValueError("dimension must be divisible by blocks")
    width = dim // blocks
    generator = rng_expt  # shared module-level generator
    trained = []
    for block_no in range(blocks):
        lo = block_no * width
        sub = samples[:, lo:lo + width]
        centroids = kmeans_pp_init(sub, k, generator)
        for _ in range(iters):
            nearest = np.argmin(cdist(sub, centroids, metric='euclidean'), axis=1)
            for j in range(k):
                members = sub[nearest == j]
                if len(members) > 0:
                    centroids[j] = members.mean(axis=0)
                else:
                    # Empty cluster: re-seed it from a random sample.
                    centroids[j] = sub[generator.integers(0, n_samples)]
        trained.append(centroids.astype(np.float32))
    return trained
def prepare_flat_codebooks(codebooks: List[np.ndarray]) -> np.ndarray:
    """Stack per-block codebooks into one ``(blocks, k, block_len)`` array.

    Args:
        codebooks: Non-empty list of 2-D arrays that all share the shape
            ``(k, block_len)``.

    Returns:
        A single ``float32`` array of shape ``(blocks, k, block_len)``.

    Raises:
        ValueError: If the list is empty, an entry is not 2-D, or the entries
            do not all share one shape. A shorter codebook would otherwise
            leave uninitialised rows in the output.
    """
    if len(codebooks) == 0:
        raise ValueError("codebooks must not be empty")
    expected = codebooks[0].shape
    if len(expected) != 2:
        raise ValueError("each codebook must be a 2-D array")
    flat = np.empty((len(codebooks),) + tuple(expected), dtype=np.float32)
    for i, cb in enumerate(codebooks):
        if cb.shape != expected:
            raise ValueError(
                f"codebook {i} has shape {cb.shape}, expected {expected}"
            )
        flat[i] = cb
    return flat
@njit(parallel=True, fastmath=True)
def batched_quantize(vectors: np.ndarray, flat_centroids: np.ndarray) -> np.ndarray:
    """Map each block of each vector to the index of its nearest centroid.

    Args:
        vectors: 2-D array, one vector per row.
        flat_centroids: 3-D array ``(blocks, k, block_len)``.

    Returns:
        ``uint8`` array ``(len(vectors), blocks)`` of centroid indices. The
        metric is squared Euclidean distance; on ties the lowest index wins.
    """
    n_vectors, _dim = vectors.shape
    n_blocks, n_codes, width = flat_centroids.shape
    # NOTE(review): indices are stored as uint8, so this presumably expects
    # n_codes <= 256 -- confirm against callers.
    codes = np.empty((n_vectors, n_blocks), dtype=np.uint8)
    for row in prange(n_vectors):
        vec = vectors[row]
        for blk in range(n_blocks):
            offset = blk * width
            winner = 0
            winner_dist = 0.0
            for j in range(n_codes):
                dist = 0.0
                for t in range(width):
                    delta = vec[offset + t] - flat_centroids[blk, j, t]
                    dist += delta * delta
                # The first candidate always seeds the running minimum.
                if j == 0 or dist < winner_dist:
                    winner_dist = dist
                    winner = j
            codes[row, blk] = winner
    return codes
def pack_indices(indices: np.ndarray) -> bytes:
    """Serialize ``indices`` as raw bytes, one ``uint8`` per element."""
    as_u8 = indices.astype(np.uint8)
    return as_u8.tobytes()
def make_helper_data(indices: np.ndarray, bch, hmac_key: bytes) -> Tuple[bytes, bytes]:
    """Produce BCH parity and an HMAC tag for a vector of indices.

    Args:
        indices: Index vector to protect.
        bch: Codec object exposing ``encode(payload)``.
        hmac_key: Key for the HMAC tag.

    Returns:
        ``(ecc, tag)`` where ``tag`` authenticates the packed indices
        followed by ``ecc``.
    """
    packed = pack_indices(indices)
    parity = bch.encode(packed)
    return parity, hmac_sign(hmac_key, packed + parity)
def reconcile(indices_noisy: np.ndarray, ecc: bytes, tag: bytes, bch, hmac_key: bytes) -> Tuple[np.ndarray, bool]:
    """Correct a noisy index vector with BCH parity, then authenticate it.

    The tag covers the *reference* payload, so the noisy payload is decoded
    first and the HMAC is checked on the corrected bytes. Checking the HMAC
    on the uncorrected payload would reject every input that differs from
    the reference, leaving the error correction with nothing to do.

    Args:
        indices_noisy: Noisy index vector.
        ecc: Parity bytes produced at enrolment.
        tag: HMAC tag over ``reference_payload + ecc``.
        bch: Codec object exposing ``decode(packet)``.
        hmac_key: Key the tag was created with.

    Returns:
        ``(corrected_indices, True)`` on success, otherwise
        ``(indices_noisy, False)``.
    """
    payload_noisy = pack_indices(indices_noisy)
    try:
        # assumes bch.decode returns (decoded, status) with a negative status
        # on failure, as the rest of this file does -- TODO confirm against
        # the installed bchlib version.
        decoded, status = bch.decode(bytearray(payload_noisy) + bytearray(ecc))
        if status < 0:
            return indices_noisy, False
        corrected = np.frombuffer(bytes(decoded), dtype=np.uint8)[:indices_noisy.size].copy()
    except Exception:
        # Best effort: any codec failure is reported as "not reconciled".
        return indices_noisy, False
    if not hmac_verify(hmac_key, pack_indices(corrected) + ecc, tag):
        return indices_noisy, False
    return corrected, True
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
@dataclass
class HelperBlob:
    """Length-prefixed container for the helper data of one enrolment.

    Wire layout (all lengths big-endian)::

        version(1) flags(1)
        len(salt)(1)    salt
        len(iv)(1)      iv
        len(payload)(2) payload
        len(ecc)(2)     ecc
        len(mac)(2)     mac
    """

    version: int
    flags: int
    salt: bytes
    iv: bytes
    payload: bytes
    ecc: bytes
    mac: bytes

    def to_bytes(self) -> bytes:
        """Serialize the blob using the layout described on the class.

        Raises:
            ValueError: If ``version`` or ``flags`` is outside 0..255.
            OverflowError: If a field is too long for its length prefix.
        """
        return b"".join((
            bytes([self.version]),
            bytes([self.flags]),
            len(self.salt).to_bytes(1, 'big'),
            self.salt,
            len(self.iv).to_bytes(1, 'big'),
            self.iv,
            len(self.payload).to_bytes(2, 'big'),
            self.payload,
            len(self.ecc).to_bytes(2, 'big'),
            self.ecc,
            len(self.mac).to_bytes(2, 'big'),
            self.mac,
        ))

    @staticmethod
    def from_bytes(b: bytes):
        """Parse a blob produced by :meth:`to_bytes`.

        Bytes after the final field are ignored.

        Raises:
            ValueError: If ``b`` ends before all declared fields are read.
        """
        pos = 0

        def take(count: int) -> bytes:
            # Return the next ``count`` bytes, refusing to read past the end
            # (a plain slice would silently return a short field).
            nonlocal pos
            end = pos + count
            if end > len(b):
                raise ValueError("truncated helper blob")
            chunk = b[pos:end]
            pos = end
            return chunk

        def take_int(width: int) -> int:
            return int.from_bytes(take(width), 'big')

        version = take_int(1)
        flags = take_int(1)
        salt = take(take_int(1))
        iv = take(take_int(1))
        payload = take(take_int(2))
        ecc = take(take_int(2))
        mac = take(take_int(2))
        return HelperBlob(version=version, flags=flags, salt=salt, iv=iv, payload=payload, ecc=ecc, mac=mac)
class SecureSketch:
    """Enrol an index vector into a helper blob and verify noisy probes.

    Enrolment stores the packed indices together with BCH parity, optionally
    AES-GCM encrypted, and authenticates the stored fields with HMAC-SHA256.
    Verification corrects the *probe* with the stored parity, requires the
    corrected bytes to equal the enrolled reference, and derives a key from
    them with HKDF.
    """

    def __init__(self, bch, mac_key: bytes, encrypt_key: bytes = None, hkdf_salt_len: int = 16, derive_len: int = DERIVED_KEY_LEN, version: int = 1):
        """Store the codec, keys and format parameters.

        Args:
            bch: Codec object exposing ``encode`` and ``decode``.
            mac_key: HMAC key, at least 16 bytes.
            encrypt_key: AES key; ``None`` disables encryption.
            hkdf_salt_len: Length of the random per-enrolment salt.
            derive_len: Length of the key returned by :meth:`verify`.
            version: Format version written into each blob.

        Raises:
            ValueError: If ``mac_key`` is missing or shorter than 16 bytes.
        """
        if mac_key is None or len(mac_key) < 16:
            raise ValueError("mac_key must be provided and greater or equal to 16 bytes")
        self.bch = bch
        self.mac_key = mac_key
        self.encrypt_key = encrypt_key
        self.hkdf_salt_len = hkdf_salt_len
        self.derive_len = derive_len
        self.version = version

    @staticmethod
    def _mac_input(payload: bytes, ecc: bytes, salt: bytes, flags: int, version: int) -> bytes:
        """Build the byte string that the blob MAC is computed over."""
        # NOTE(review): fields are concatenated without length prefixes and
        # HelperBlob.iv is not included -- confirm whether field boundaries
        # need to be authenticated as well.
        return payload + ecc + salt + bytes([flags, version])

    def _encrypt(self, plaintext: bytes, aad: bytes = b'') -> Tuple[bytes, bytes]:
        """AES-GCM encrypt; returns ``(iv + ciphertext, tag)``.

        Without an encryption key the plaintext is returned unchanged with an
        empty tag.
        """
        if self.encrypt_key is None:
            return plaintext, b''
        iv = secure_random_bytes(12)
        cipher = Cipher(algorithms.AES(self.encrypt_key), modes.GCM(iv))
        enc = cipher.encryptor()
        if aad:
            enc.authenticate_additional_data(aad)
        ct = enc.update(plaintext) + enc.finalize()
        return iv + ct, enc.tag

    def _decrypt(self, iv_ct: bytes, tag: bytes, aad: bytes = b'') -> bytes:
        """Inverse of :meth:`_encrypt`; raises if GCM authentication fails."""
        if self.encrypt_key is None:
            return iv_ct
        iv, ct = iv_ct[:12], iv_ct[12:]
        cipher = Cipher(algorithms.AES(self.encrypt_key), modes.GCM(iv, tag))
        dec = cipher.decryptor()
        if aad:
            dec.authenticate_additional_data(aad)
        return dec.update(ct) + dec.finalize()

    def enroll(self, indices: np.ndarray, info: bytes = b'') -> bytes:
        """Create a serialized helper blob for a 1-D index vector.

        Args:
            indices: 1-D index vector to enrol.
            info: Accepted for signature symmetry with :meth:`verify`; not
                used during enrolment.

        Returns:
            The serialized :class:`HelperBlob`.

        Raises:
            ValueError: If ``indices`` is not one-dimensional.
        """
        if indices.ndim != 1:
            raise ValueError("enroll must expect a single index vector")
        payload = pack_indices(indices)
        salt = secure_random_bytes(self.hkdf_salt_len)
        ecc = self.bch.encode(payload)
        if self.encrypt_key is None:
            flags = 0
            iv = b''
            stored_payload, stored_ecc = payload, ecc
        else:
            flags = 0x1
            # With encryption, "payload" holds iv+ciphertext of payload+ecc
            # and the "ecc" slot carries the GCM tag.
            stored_payload, stored_ecc = self._encrypt(payload + ecc, aad=b"SSv1" + salt)
            iv = stored_payload[:12]
        mac = hmac_sign(
            self.mac_key,
            self._mac_input(stored_payload, stored_ecc, salt, flags, self.version),
        )
        blob = HelperBlob(version=self.version, flags=flags, salt=salt, iv=iv, payload=stored_payload, ecc=stored_ecc, mac=mac)
        return blob.to_bytes()

    def verify(self, indices_noisy: np.ndarray, helper_blob_bytes: bytes, info: bytes = b'') -> Tuple[bool, bytes]:
        """Check a noisy probe against a helper blob and derive a key.

        The probe is corrected with the stored parity and must then equal the
        enrolled reference. The check therefore depends on the probe's
        contents, not just its length.

        Args:
            indices_noisy: Probe index vector.
            helper_blob_bytes: Blob returned by :meth:`enroll`.
            info: HKDF ``info`` parameter for the derived key.

        Returns:
            ``(True, key)`` on success, otherwise ``(False, b'')``.
        """
        try:
            blob = HelperBlob.from_bytes(helper_blob_bytes)
        except (IndexError, ValueError):
            # Malformed or truncated blob: report failure instead of raising.
            return False, b''
        mac_input = self._mac_input(blob.payload, blob.ecc, blob.salt, blob.flags, blob.version)
        if not hmac_verify(self.mac_key, mac_input, blob.mac):
            return False, b''
        payload_len = indices_noisy.size
        if blob.flags & 0x1:
            if self.encrypt_key is None:
                return False, b''
            try:
                pt = self._decrypt(blob.payload, blob.ecc, aad=b"SSv1" + blob.salt)
            except Exception:
                return False, b''
            ref_payload, ecc = pt[:payload_len], pt[payload_len:]
        else:
            ref_payload, ecc = blob.payload, blob.ecc
        try:
            # Decode the PROBE against the enrolled parity.
            # assumes bch.decode returns (decoded, status) with a negative
            # status on failure -- TODO confirm against the installed bchlib.
            pkt = bytearray(pack_indices(indices_noisy)) + bytearray(ecc)
            decoded, status = self.bch.decode(pkt)
            if status < 0:
                return False, b''
            corrected = bytes(decoded)[:payload_len]
            # Constant-time comparison with the enrolled reference.
            if not secrets.compare_digest(corrected, bytes(ref_payload)):
                return False, b''
            prk = hkdf(corrected, blob.salt, info, length=self.derive_len)
            return True, prk
        except Exception:
            # Best effort: codec or KDF failures count as a rejected probe.
            return False, b''

    def enroll_blob_from_indices(self, indices: np.ndarray) -> bytes:
        """Convenience wrapper around :meth:`enroll` with default ``info``."""
        return self.enroll(indices)

    def verify_blob_with_noisy(self, indices_noisy: np.ndarray, helper_blob_bytes: bytes) -> Tuple[bool, bytes]:
        """Convenience wrapper around :meth:`verify` with default ``info``."""
        return self.verify(indices_noisy, helper_blob_bytes)
@dataclass
class PacketResult:
    """Outcome of enrolling and verifying one simulated packet."""

    # Identifier of the packet within the task list.
    pkt_id: int
    # Result flag returned by the sketch verification.
    success: bool
    # Number of blocks whose noisy index equals the reference index.
    success_blocks: int
    # Number of blocks per packet.
    total_blocks: int
# Per-process state populated by worker_init in each pool worker.
GLOBAL = {}


def worker_init(state):
    """Pool initializer: copy shared state into ``GLOBAL`` and build a sketch.

    Args:
        state: Mapping with keys ``'flat_centroids'``, ``'bch'``,
            ``'hmac_key'``, ``'session_salt'`` and optionally
            ``'encrypt_key'``.
    """
    for name in ('flat_centroids', 'bch', 'hmac_key', 'session_salt'):
        GLOBAL[name] = state[name]
    GLOBAL['secsketch'] = SecureSketch(
        state['bch'],
        state['hmac_key'],
        encrypt_key=state.get('encrypt_key', None),
    )
def worker_task_batch(args):
    """Simulate, quantize, enrol and verify one batch of packets.

    Args:
        args: Sequence of ``(pkt_id, seed)`` tuples; each seed drives an
            independent generator for that packet's spectrum and noise.

    Returns:
        List of :class:`PacketResult`, one per task, in task order.
    """
    tasks = args
    n = len(tasks)
    bases = np.empty((n, SPECTRUM_LEN), dtype=np.float32)
    noised = np.empty((n, SPECTRUM_LEN), dtype=np.float32)
    for i, (_, seed) in enumerate(tasks):
        rng = np.random.default_rng(seed)
        base = generate_spectrum(rng=rng)
        noisy = base + NOISE_STD * rng.standard_normal(base.shape).astype(np.float32)
        bases[i] = base
        noised[i] = np.clip(noisy, 0.0, 1.0)

    flat = GLOBAL['flat_centroids']
    idx_ref = batched_quantize(bases, flat)
    idx_noise = batched_quantize(noised, flat)
    secsketch = GLOBAL['secsketch']
    total_blocks = idx_ref.shape[1]

    results = []
    for i, (pkt_id, _) in enumerate(tasks):
        helper_blob = secsketch.enroll_blob_from_indices(idx_ref[i])
        ok, _derived_key = secsketch.verify_blob_with_noisy(idx_noise[i], helper_blob)
        # The blob was enrolled from idx_ref[i] just above, so its payload
        # equals idx_ref[i]; comparing directly avoids re-parsing and
        # re-decrypting the blob for every packet.
        success_blocks = int(np.sum(idx_ref[i] == idx_noise[i]))
        results.append(PacketResult(pkt_id=pkt_id, success=ok, success_blocks=success_blocks, total_blocks=total_blocks))
    return results
def run_throughput(packets=PACKETS, batch_size=BATCH_SIZE, workers=NUM_WORKERS, use_encryption=False):
    """Benchmark enrol/verify throughput across a process pool.

    Trains codebooks on 2000 synthetic spectra, splits ``packets`` tasks into
    batches of ``batch_size``, runs them on ``workers`` spawned processes and
    prints one summary line.

    Args:
        packets: Number of packets to simulate.
        batch_size: Tasks handed to a worker per call.
        workers: Number of pool processes.
        use_encryption: When true, a random 32-byte AES key is generated and
            passed to the workers.
    """
    samples = np.stack([generate_spectrum(rng=rng_expt) for _ in range(2000)], axis=0)
    codebooks = build_codebooks(samples, blocks=BLOCKS, k=CODEBOOK_PER_BLOCK, iters=12)
    flat_centroids = prepare_flat_codebooks(codebooks)
    bch = make_bch(BLOCKS)
    state = {
        'flat_centroids': flat_centroids,
        'bch': bch,
        'hmac_key': secure_random_bytes(HMAC_KEY_LEN),
        'session_salt': secure_random_bytes(16),
        'encrypt_key': secure_random_bytes(32) if use_encryption else None,
    }
    tasks = [(i, EXPT_SEED + i * 31) for i in range(packets)]
    batched = [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]
    ctx = mp.get_context('spawn')
    t0 = time.perf_counter()
    results_all = []
    with ctx.Pool(processes=workers, initializer=worker_init, initargs=(state,)) as pool:
        # imap dispatches batches to all workers and yields results in
        # submission order; pool.apply would block on each batch in turn and
        # leave all but one worker idle.
        for res in pool.imap(worker_task_batch, batched):
            results_all.extend(res)
    dt = time.perf_counter() - t0
    total = len(results_all)
    # Guard the ratios so an empty run or a zero elapsed time cannot raise.
    avg_blocks = sum(r.success_blocks for r in results_all) / total if total else 0.0
    full_success = (
        sum(1 for r in results_all if r.success and r.success_blocks == BLOCKS) / total
        if total else 0.0
    )
    throughput = total / dt if dt > 0 else 0.0
    print(f"Packets={total} Time={dt:.2f}s Throughput={throughput:.1f} pkt/s AvgBlocks={avg_blocks:.2f} FullSuccess={full_success:.3f}")
def frr_far_sweep(noise_levels=(0.02, 0.05, 0.08, 0.12), trials=512, use_encryption=False):
    """Print false-reject and false-accept rates for several noise levels.

    Args:
        noise_levels: Iterable of noise standard deviations to test. The
            default is an immutable tuple so it cannot be shared and mutated
            across calls.
        trials: Genuine and impostor attempts per noise level.
        use_encryption: When true, blobs are encrypted with a random 32-byte
            AES key.
    """
    samples = np.stack([generate_spectrum(rng=rng_expt) for _ in range(3000)], axis=0)
    codebooks = build_codebooks(samples, blocks=BLOCKS, k=CODEBOOK_PER_BLOCK, iters=12)
    flat_centroids = prepare_flat_codebooks(codebooks)
    bch = make_bch(BLOCKS)
    hmac_key = secure_random_bytes(HMAC_KEY_LEN)
    encrypt_key = secure_random_bytes(32) if use_encryption else None
    secsketch = SecureSketch(bch, hmac_key, encrypt_key=encrypt_key)

    def quantize(vec):
        # Quantize a single spectrum to its per-block centroid indices.
        return batched_quantize(vec.reshape(1, -1), flat_centroids)[0]

    def quant_and_recon_pair(base, noise_std, rng):
        # Enrol the clean spectrum, verify a noisy copy, and report the
        # verification flag plus the number of matching blocks.
        noisy = base + noise_std * rng.standard_normal(base.shape).astype(np.float32)
        noisy = np.clip(noisy, 0.0, 1.0)
        idx_ref = quantize(base)
        idx_noisy = quantize(noisy)
        helper = secsketch.enroll_blob_from_indices(idx_ref)
        ok, _ = secsketch.verify_blob_with_noisy(idx_noisy, helper)
        return ok, int(np.sum(idx_ref == idx_noisy))

    rng = np.random.default_rng(EXPT_SEED)
    for ns in noise_levels:
        # Genuine attempts.
        # NOTE(review): a genuine attempt only counts as accepted when every
        # block already matches, so corrected probes are scored as rejects
        # -- confirm this is the intended FRR definition.
        succ = 0
        for _ in range(trials):
            base = generate_spectrum(rng=rng)
            ok, sb = quant_and_recon_pair(base, ns, rng)
            if ok and sb == BLOCKS:
                succ += 1
        frr = 1.0 - (succ / trials)

        # Impostor attempts (independent of ``ns``, but drawn from the same
        # generator, so they are regenerated for every noise level).
        # NOTE(review): an impostor only counts as accepted when its indices
        # equal the reference exactly; ``ok`` alone is the usual FAR
        # criterion -- confirm intent.
        false_accepts = 0
        for _ in range(trials):
            ref = generate_spectrum(rng=rng)
            other = generate_spectrum(rng=rng)
            idx_ref = quantize(ref)
            helper = secsketch.enroll_blob_from_indices(idx_ref)
            idx_other = quantize(other)
            ok, _ = secsketch.verify_blob_with_noisy(idx_other, helper)
            if ok and np.sum(idx_other == idx_ref) == BLOCKS:
                false_accepts += 1
        far = false_accepts / trials
        print(f"noise={ns:.3f} FRR={frr:.3f} FAR={far:.6f}")
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
psv2026C: includes the AES slice through error-chain char graphing; the specific BCH builds are not shown.