Skip to content

Instantly share code, notes, and snippets.

@adiroiban
Created July 17, 2025 10:30
Show Gist options
  • Select an option

  • Save adiroiban/18dcb49ee3ed755c003c74a4eb85d3a4 to your computer and use it in GitHub Desktop.

Select an option

Save adiroiban/18dcb49ee3ed755c003c74a4eb85d3a4 to your computer and use it in GitHub Desktop.
"""
Code for handling ZIP archives.
Inspired by pyzipper. MIT licence
"""
import struct
import zipfile
import zlib
import time
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hmac, hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
# Value of `compress_type` that marks a ZIP member as AES encrypted.
# The real compression method is then stored in the AES extra fragment.
AES_COMPRESSION_TYPE = 99

# Header id of the extra-field fragment that carries the AES parameters.
EXTRA_AES_HEADER_ID = 0x9901

# Vendor id expected inside the AES extra fragment.
AES_VENDOR_ID = b'AE'

# Accepted values for the 2-byte version field of the AES extra fragment.
AES_V1 = b'\x01\x00'
AES_V2 = b'\x02\x00'

# Accepted values for the 1-byte strength field of the AES extra fragment.
AES_128 = b'\x01'
AES_192 = b'\x02'
AES_256 = b'\x03'

# Number of authentication-code bytes stored after the encrypted data.
AES_HMAC_SIZE = 10

# Salt size, in bytes, stored in front of the encrypted data,
# keyed by AES strength.
AES_SALT_LENGTHS = {
    AES_128: 8,
    AES_192: 12,
    AES_256: 16,
}

# AES key size, in bytes, keyed by AES strength.
AES_KEY_LENGTHS = {
    AES_128: 16,
    AES_192: 24,
    AES_256: 32,
}
def _get_encryption_header_length(zinfo):
    """
    Return the size, in bytes, of the AES header preceding the encrypted data.

    The header is the salt (its size depends on `zinfo._aes_strength`)
    followed by the 2-byte password verification value.
    """
    password_verifier_length = 2
    return AES_SALT_LENGTHS[zinfo._aes_strength] + password_verifier_length
class ZipInfoWithAES(zipfile.ZipInfo):
    """
    `ZipInfo` which also records the AES parameters of an encrypted member.

    The extra attributes stay `None` when the member has no AES extra
    fragment.
    """

    __slots__ = (
        # 2-byte AES format version (AES_V1 or AES_V2).
        '_aes_version',
        # 1-byte AES strength (AES_128, AES_192 or AES_256).
        '_aes_strength',
        # Compression method of the data, as stored in the AES fragment.
        '_aes_compression',
        )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._aes_strength = None
        self._aes_version = None
        self._aes_compression = None

    def _decodeExtra(self, *args, **kwargs):
        """
        Decode the extra field and pick up the AES fragment, if present.

        The arguments are forwarded untouched to the upstream method, as
        its signature differs between CPython releases (older releases take
        no argument, newer ones take `filename_crc`).

        Little endian encoding.
        Fragment has minimum 11 bytes:
        * 2 bytes - extra fragment type
        * 2 bytes - extra data length
        * 2 bytes - AES encryption version
        * 2 bytes - Vendor, always b'AE'
        * 1 byte - AES strength
        * 2 bytes - compression type

        Raises `zipfile.BadZipFile` when the AES fragment is too short or
        contains an unknown vendor, version or strength.
        """
        # Do the upstream decoding first.
        # This should validate each fragment.
        super()._decodeExtra(*args, **kwargs)

        # Re-read the extra fragments to check for AES.
        extra = self.extra
        while len(extra) >= 4:
            fragment_type, fragment_length = struct.unpack('<HH', extra[:4])
            if fragment_type != EXTRA_AES_HEADER_ID:
                # Not AES. Skip the fragment header and its payload.
                extra = extra[fragment_length + 4:]
                continue

            if fragment_length < 7 or len(extra) < 11:
                # We validate both the actual data
                # and the advertised length.
                raise zipfile.BadZipFile("Short AES extra data.")

            version, vendor, strength, compression = struct.unpack(
                '<2s2s1sH', extra[4:11])

            # The checks are done in the order vendor, version, strength.
            if vendor != AES_VENDOR_ID:
                raise zipfile.BadZipFile("Unknown AES vendor.")
            if version not in (AES_V1, AES_V2):
                raise zipfile.BadZipFile("Unknown AES version.")
            if strength not in (AES_128, AES_192, AES_256):
                raise zipfile.BadZipFile("Unknown AES strength.")

            self._aes_version = version
            self._aes_strength = strength
            self._aes_compression = compression
            # Don't look for other extension fragments.
            return
class ZipExtFileWithAES(zipfile.ZipExtFile):
    """
    `ZipExtFile` which can also decrypt AES encrypted members.

    NOTE(review): nothing in this class reads the authentication code
    stored after the encrypted data or calls `AESZipDecrypter.check_hmac`,
    so the HMAC does not appear to be verified here - confirm.
    """

    def __init__(self, fileobj, mode, zipinfo, pwd=None,
                 close_fileobj=False):
        """
        Open the member described by `zipinfo` for reading.

        For AES members, `zipinfo.compress_type` is replaced with the
        compression method from the AES fragment only while the upstream
        initialisation runs, so that upstream sets up the right
        decompressor.
        """
        have_aes = zipinfo.compress_type == AES_COMPRESSION_TYPE
        # Needed by `_init_decrypter`, which is called by upstream `__init__`.
        self._zipinfo = zipinfo

        if have_aes:
            zipinfo.compress_type = zipinfo._aes_compression
        try:
            super().__init__(fileobj, mode, zipinfo, pwd, close_fileobj)
        finally:
            # Restore the shared zipinfo even when upstream raises
            # (bad password, unsupported compression method...).
            if have_aes:
                zipinfo.compress_type = AES_COMPRESSION_TYPE

        if getattr(zipinfo, '_aes_version', None) == AES_V2:
            # CRC is not used for v2.
            # Only the HMAC is used.
            self._expected_crc = None

    def _init_decrypter(self):
        """
        This is upstream method called from `__init__`.

        Return the value that upstream compares with its check byte.
        """
        # A plain `ZipInfo` has no AES attributes.
        if getattr(self._zipinfo, '_aes_version', None):
            return self._init_aes_decrypter()
        # Use upstream code.
        return super()._init_decrypter()

    def _init_aes_decrypter(self):
        """
        Read the AES header and set up the AES decrypter.

        Raises `zipfile.BadZipFile` when no password is available.
        A wrong password is reported by `AESZipDecrypter`.
        """
        if not self._pwd:
            raise zipfile.BadZipFile(
                'File is AES encrypted and requires a password.')

        zinfo = self._zipinfo
        header_length = _get_encryption_header_length(zinfo)
        header = self._fileobj.read(header_length)
        # Adjust read size for encrypted files since the start of the file
        # is used for the encryption/password information,
        # and the end of the file is used for the HMAC.
        self._compress_left -= header_length + AES_HMAC_SIZE

        # The password is validated here, using the AES password verifier.
        # The header is not encrypted data, so it must not be passed
        # through the decrypter.
        self._decrypter = AESZipDecrypter(zinfo, self._pwd, header)

        # Upstream `__init__` compares our result with a check byte designed
        # for the legacy ZIP encryption. That check has no meaning for AES,
        # so we return the very value upstream expects.
        # NOTE(review): this mirrors private CPython logic - re-check it
        # when upgrading Python.
        if zinfo.flag_bits & 0x08:
            # Data descriptor in use: the check byte comes from the raw time.
            return (zinfo._raw_time >> 8) & 0xff
        return (zinfo.CRC >> 24) & 0xff
class AESZipDecrypter:
    """
    Decrypter for the data of a single AES encrypted ZIP member.

    The instance is stateful: each call decrypts the next chunk of the
    encrypted stream and feeds the encrypted bytes to the HMAC.
    Chunks can have any size.
    """

    hmac_size = 10
    # AES block size, in bytes.
    _BLOCK_SIZE = 16

    def __init__(self, zinfo, pwd, encryption_header):
        """
        Derive the keys from `pwd` and validate the password.

        `encryption_header` is the salt followed by the 2-byte password
        verification value, as stored in front of the encrypted data.

        Raises `zipfile.BadZipFile` when the header is too short to hold
        the salt and `RuntimeError` when the password doesn't match.
        """
        self.filename = zinfo.filename

        key_length = AES_KEY_LENGTHS[zinfo._aes_strength]
        salt_length = AES_SALT_LENGTHS[zinfo._aes_strength]

        salt = encryption_header[:salt_length]
        if len(salt) != salt_length:
            raise zipfile.BadZipFile(
                "Truncated AES header for file %r" % zinfo.filename)
        pwd_verify_length = 2
        pwd_verify = encryption_header[salt_length:]

        # The derived material is: encryption key, HMAC key, verifier.
        dkLen = 2 * key_length + pwd_verify_length
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA1(),
            length=dkLen,
            salt=salt,
            iterations=1000,
            )
        keymaterial = kdf.derive(pwd)

        encpwdverify = keymaterial[2 * key_length:]
        if encpwdverify != pwd_verify:
            raise RuntimeError("Bad password for file %r" % zinfo.filename)

        self._enckey = keymaterial[:key_length]
        # Incremented before each block, so the first block uses 1.
        self._counter = 0
        # Decryptor of the block in progress and the number of bytes
        # still to be consumed from that block.
        self._block_decryptor = None
        self._block_remaining = 0

        encmac_key = keymaterial[key_length:2 * key_length]
        self.hmac = hmac.HMAC(encmac_key, hashes.SHA1())

    def __call__(self, data):
        """
        This is the main public API.

        Return the decrypted version of `data`, the next chunk of the
        encrypted stream.
        """
        return b''.join(self._decrypt(self._getBlocks(data)))

    def check_hmac(self, hmac_check):
        """
        Raise `zipfile.BadZipFile` if `hmac_check` is not the truncated
        HMAC of the encrypted data processed so far.

        The HMAC is finalized here.
        """
        # Standard library `hmac`, not the `cryptography` module of the
        # same name. Used for a constant-time comparison.
        from hmac import compare_digest

        computed = self.hmac.finalize()[:self.hmac_size]
        if not compare_digest(computed, hmac_check):
            raise zipfile.BadZipFile(
                f"Bad HMAC check for file {self.filename}")

    def _decrypt(self, blocks):
        """
        Yield the decrypted version of each item from `blocks`.

        The counter is encoded as little endian, so a new CTR cipher is
        created for each AES block with the counter as its initial value.
        A block which was only partially received is continued with the
        same decryptor, to keep the key stream aligned between calls.
        """
        for block in blocks:
            if not self._block_remaining:
                # Start a new AES block.
                self._counter += 1
                nonce = self._counter.to_bytes(
                    self._BLOCK_SIZE, byteorder='little')
                cipher = Cipher(
                    algorithms.AES(self._enckey),
                    modes.CTR(nonce),
                    )
                self._block_decryptor = cipher.decryptor()
                self._block_remaining = self._BLOCK_SIZE

            # The HMAC is computed over the encrypted data.
            self.hmac.update(block)
            self._block_remaining -= len(block)
            yield self._block_decryptor.update(block)

    def _getBlocks(self, original):
        """
        Return AES blocks.

        The first item completes the block left unfinished by the previous
        call, if any. The last item can be shorter than a block.
        """
        head = min(self._block_remaining, len(original))
        if head:
            yield original[:head]
        for start in range(head, len(original), self._BLOCK_SIZE):
            yield original[start:start + self._BLOCK_SIZE]
class ZipFileWithAES(zipfile.ZipFile):
    """
    `zipfile.ZipFile` flavour pointing at the AES-aware helper classes.
    """

    # NOTE(review): presumably looked up by the ZipFile implementation in
    # use to create the file-like and info objects - confirm that the
    # targeted `zipfile` version consults these attributes.
    _ZipExtFile = ZipExtFileWithAES
    _ZipInfo = ZipInfoWithAES
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment