Created
March 6, 2026 05:06
-
-
Save motebaya/8c01feac16681512f69bf6aeff85f977 to your computer and use it in GitHub Desktop.
python CLI logic for github.com/motebaya/vaulten-mobile encryption.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # python logic to decrypt/encrypt vaulten containers and credentials | |
| # resources: github.com/motebaya/vaulten-mobile | |
| # date: 2026-03-06 11:42 AM | |
| import argparse | |
| import base64 | |
| import json | |
| import os | |
| import zipfile | |
| import struct | |
| from typing import Dict, Any, List | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | |
| from cryptography.hazmat.primitives import hashes | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
| MAGIC: bytes = b"VLT1" | |
| VERSION: int = 1 | |
| KDF_TYPE: int = 1 | |
| SALT_SIZE: int = 32 | |
| NONCE_SIZE: int = 12 | |
| KEY_SIZE: int = 32 | |
| ITERATIONS: int = 310000 | |
| ENCRYPTED_FIELDS: List[str] = [ | |
| "encryptedPassword", | |
| "encryptedNotes", | |
| "encryptedBackupEmail", | |
| "encryptedPhoneNumber", | |
| "encryptedRecoveryCodes", | |
| "encryptedPrivateKey", | |
| "encryptedSeedPhrase" | |
| ] | |
| def derive_key(passphrase: str, salt: bytes) -> bytes: | |
| """ | |
| Derive a KEK (Key Encryption Key) from a passphrase. | |
| Uses PBKDF2-HMAC-SHA256 with 310,000 iterations to match | |
| the Vaulten Android implementation. | |
| :param passphrase: User master passphrase | |
| :type passphrase: str | |
| :param salt: 32 byte random salt | |
| :type salt: bytes | |
| :return: Derived 32 byte encryption key | |
| :rtype: bytes | |
| """ | |
| kdf: PBKDF2HMAC = PBKDF2HMAC( | |
| algorithm=hashes.SHA256(), | |
| length=KEY_SIZE, | |
| salt=salt, | |
| iterations=ITERATIONS | |
| ) | |
| return kdf.derive(passphrase.encode()) | |
| def decrypt_container(data: bytes, passphrase: str) -> bytes: | |
| """ | |
| Decrypt a Vaulten container (vault.enc). | |
| Container layout:: | |
| magic (4) | |
| version (1) | |
| kdf type (1) | |
| salt (32) | |
| nonce (12) | |
| ciphertext length (4) | |
| ciphertext + tag (N) | |
| :param data: Raw container bytes | |
| :type data: bytes | |
| :param passphrase: User passphrase | |
| :type passphrase: str | |
| :return: Decrypted plaintext JSON | |
| :rtype: bytes | |
| """ | |
| pos: int = 0 | |
| if data[:4] != MAGIC: | |
| raise ValueError("Invalid container magic") | |
| pos += 4 | |
| version: int = data[pos] | |
| pos += 1 | |
| kdf: int = data[pos] | |
| pos += 1 | |
| if version != VERSION or kdf != KDF_TYPE: | |
| raise ValueError("Unsupported vault format") | |
| salt: bytes = data[pos:pos + SALT_SIZE] | |
| pos += SALT_SIZE | |
| nonce: bytes = data[pos:pos + NONCE_SIZE] | |
| pos += NONCE_SIZE | |
| clen: int = struct.unpack(">I", data[pos:pos + 4])[0] | |
| pos += 4 | |
| ciphertext: bytes = data[pos:pos + clen] | |
| key: bytes = derive_key(passphrase, salt) | |
| aes: AESGCM = AESGCM(key) | |
| return aes.decrypt(nonce, ciphertext, None) | |
| def encrypt_container(data: bytes, passphrase: str) -> bytes: | |
| """ | |
| Encrypt plaintext JSON into Vaulten container format. | |
| :param data: Plaintext JSON bytes | |
| :type data: bytes | |
| :param passphrase: User passphrase | |
| :type passphrase: str | |
| :return: vault.enc container bytes | |
| :rtype: bytes | |
| """ | |
| salt: bytes = os.urandom(SALT_SIZE) | |
| nonce: bytes = os.urandom(NONCE_SIZE) | |
| key: bytes = derive_key(passphrase, salt) | |
| aes: AESGCM = AESGCM(key) | |
| ciphertext: bytes = aes.encrypt(nonce, data, None) | |
| header: bytearray = bytearray() | |
| header += MAGIC | |
| header += bytes([VERSION]) | |
| header += bytes([KDF_TYPE]) | |
| header += salt | |
| header += nonce | |
| header += struct.pack(">I", len(ciphertext)) | |
| return bytes(header) + ciphertext | |
| def decrypt_field(value: str, dek: bytes) -> str: | |
| """ | |
| Decrypt a credential field encrypted with DEK. | |
| Encrypted format:: | |
| base64(nonce + ciphertext + tag) | |
| :param value: Base64 encoded encrypted value | |
| :type value: str | |
| :param dek: Data Encryption Key | |
| :type dek: bytes | |
| :return: Decrypted plaintext string | |
| :rtype: str | |
| """ | |
| raw: bytes = base64.b64decode(value) | |
| nonce: bytes = raw[:NONCE_SIZE] | |
| ciphertext: bytes = raw[NONCE_SIZE:] | |
| aes: AESGCM = AESGCM(dek) | |
| plaintext: bytes = aes.decrypt(nonce, ciphertext, None) | |
| return plaintext.decode() | |
| def encrypt_field(value: str, dek: bytes) -> str: | |
| """ | |
| Encrypt a credential field using DEK. | |
| :param value: Plaintext value | |
| :type value: str | |
| :param dek: Data Encryption Key | |
| :type dek: bytes | |
| :return: Base64 encoded encrypted value | |
| :rtype: str | |
| """ | |
| nonce: bytes = os.urandom(NONCE_SIZE) | |
| aes: AESGCM = AESGCM(dek) | |
| ciphertext: bytes = aes.encrypt(nonce, value.encode(), None) | |
| return base64.b64encode(nonce + ciphertext).decode() | |
| def decrypt_credentials(json_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Decrypt all credential fields using DEK. | |
| :param json_data: Parsed vault JSON | |
| :type json_data: dict | |
| :return: Updated JSON with decrypted fields | |
| :rtype: dict | |
| """ | |
| dek: bytes = base64.b64decode(json_data["dek"]) | |
| for cred in json_data.get("credentials", []): | |
| for field in ENCRYPTED_FIELDS: | |
| if field in cred and cred[field]: | |
| try: | |
| cred[field] = decrypt_field(cred[field], dek) | |
| except Exception: | |
| pass | |
| return json_data | |
| def encrypt_credentials(json_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Encrypt credential fields using DEK. | |
| :param json_data: Parsed vault JSON | |
| :type json_data: dict | |
| :return: JSON with encrypted fields | |
| :rtype: dict | |
| """ | |
| dek: bytes = base64.b64decode(json_data["dek"]) | |
| for cred in json_data.get("credentials", []): | |
| for field in ENCRYPTED_FIELDS: | |
| if field in cred and cred[field]: | |
| try: | |
| cred[field] = encrypt_field(cred[field], dek) | |
| except Exception: | |
| pass | |
| return json_data | |
| def decrypt_file(inp: str, out: str, passphrase: str) -> None: | |
| """ | |
| Decrypt vault.enc file into JSON. | |
| :param inp: Input vault.enc file | |
| :type inp: str | |
| :param out: Output JSON file | |
| :type out: str | |
| :param passphrase: Master passphrase | |
| :type passphrase: str | |
| """ | |
| data: bytes = open(inp, "rb").read() | |
| plaintext: bytes = decrypt_container(data, passphrase) | |
| obj: Dict[str, Any] = json.loads(plaintext) | |
| with open(out, "w") as f: | |
| json.dump(obj, f, indent=2) | |
| print("container decrypted ->", out) | |
| def encrypt_file(inp: str, out: str, passphrase: str) -> None: | |
| """ | |
| Encrypt JSON file into vault.enc container. | |
| :param inp: Input JSON file | |
| :type inp: str | |
| :param out: Output vault.enc file | |
| :type out: str | |
| :param passphrase: Master passphrase | |
| :type passphrase: str | |
| """ | |
| data: bytes = open(inp, "rb").read() | |
| container: bytes = encrypt_container(data, passphrase) | |
| with open(out, "wb") as f: | |
| f.write(container) | |
| print("container encrypted ->", out) | |
| def cred_decrypt(file: str) -> None: | |
| """ | |
| Decrypt credential fields inside JSON. | |
| :param file: JSON vault file | |
| :type file: str | |
| """ | |
| obj: Dict[str, Any] = json.load(open(file)) | |
| obj = decrypt_credentials(obj) | |
| with open(file, "w") as f: | |
| json.dump(obj, f, indent=2) | |
| print("credentials decrypted") | |
| def cred_encrypt(file: str) -> None: | |
| """ | |
| Encrypt credential fields inside JSON. | |
| :param file: JSON vault file | |
| :type file: str | |
| """ | |
| obj: Dict[str, Any] = json.load(open(file)) | |
| obj = encrypt_credentials(obj) | |
| with open(file, "w") as f: | |
| json.dump(obj, f, indent=2) | |
| print("credentials encrypted") | |
| def pack_backup() -> None: | |
| """ | |
| Pack vault.enc and metadata.json into a backup zip archive. | |
| The archive name follows format:: | |
| vault-backup_YYYY-MM-DD_HH-MM-SS.zip | |
| If metadata.json does not exist, it will be generated automatically | |
| by inspecting the decrypted vault JSON. | |
| :return: None | |
| """ | |
| vault_path: Path = Path("vault.enc") | |
| metadata_path: Path = Path("metadata.json") | |
| if not vault_path.exists(): | |
| raise FileNotFoundError("vault.enc not found") | |
| if not metadata_path.exists(): | |
| print("metadata.json not found, generating...") | |
| json_candidates = list(Path(".").glob("*.json")) | |
| vault_json: Dict[str, Any] | None = None | |
| for file in json_candidates: | |
| if file.name == "metadata.json": | |
| continue | |
| try: | |
| vault_json = json.load(open(file)) | |
| if "credentials" in vault_json: | |
| break | |
| except Exception: | |
| continue | |
| if vault_json is None: | |
| raise RuntimeError( | |
| "Cannot generate metadata.json (vault JSON not found)" | |
| ) | |
| credential_count: int = len(vault_json.get("credentials", [])) | |
| platform_count: int = len(vault_json.get("platforms", [])) | |
| metadata: Dict[str, Any] = { | |
| "version": 1, | |
| "exportedAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), | |
| "format": "VLT1", | |
| "stats": { | |
| "credentialCount": credential_count, | |
| "platformCount": platform_count | |
| } | |
| } | |
| with open(metadata_path, "w") as f: | |
| json.dump(metadata, f, indent=2) | |
| print("metadata.json generated") | |
| # create backup filename | |
| now: datetime = datetime.now() | |
| name: str = now.strftime("vault-backup_%Y-%m-%d_%H-%M-%S.zip") | |
| print("creating backup:", name) | |
| with zipfile.ZipFile(name, "w", compression=zipfile.ZIP_DEFLATED) as z: | |
| z.write(vault_path, "vault.enc") | |
| z.write(metadata_path, "metadata.json") | |
| print("backup created ->", name) | |
| def main() -> None: | |
| """ | |
| CLI entrypoint for Vaulten tool. | |
| """ | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--dec", help="Decrypt vault.enc to JSON", metavar="") | |
| parser.add_argument("--enc", help="Encrypt JSON to vault.enc", metavar="") | |
| parser.add_argument("--cred-dec", help="Decrypt credential fields in JSON", metavar="") | |
| parser.add_argument("--cred-enc", help="Encrypt credential fields in JSON", metavar="") | |
| parser.add_argument("output", nargs="?", help="Output file") | |
| parser.add_argument("-p", "--passphrase", help="Master passphrase for container encryption/decryption", metavar="") | |
| parser.add_argument("--pack", action="store_true", help="pack vault.enc and metadata into zip") | |
| args = parser.parse_args() | |
| if args.dec and args.passphrase: | |
| decrypt_file(args.dec, args.output, args.passphrase) | |
| elif args.enc and args.passphrase: | |
| encrypt_file(args.enc, args.output, args.passphrase) | |
| elif args.cred_dec: | |
| cred_decrypt(args.cred_dec) | |
| elif args.cred_enc: | |
| cred_encrypt(args.cred_enc) | |
| elif args.pack: | |
| pack_backup() | |
| else: | |
| parser.print_help() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment