-
-
Save SpotlightKid/53e1eb408267315de620 to your computer and use it in GitHub Desktop.
| #!/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| """Encrypt/decrypt files with symmetric AES cipher-block chaining (CBC) mode. | |
| Usage: | |
| File Encryption: | |
| aescrypt.py [-f] infile [outfile] | |
| File decryption: | |
| aescrypt.py -d [-f] infile [outfile] | |
| This script is derived from an answer to this StackOverflow question: | |
| http://stackoverflow.com/questions/16761458/ | |
| I changed the key derivation function to use PBKDF2. | |
| """ | |
| from __future__ import print_function, unicode_literals | |
| __all__ = ('encrypt', 'decrypt') | |
| import argparse | |
| import os | |
| import struct | |
| import sys | |
| from getpass import getpass | |
| from os.path import exists, splitext | |
| from Crypto.Cipher import AES | |
| from Crypto.Hash import SHA256 | |
| from pbkdf2 import PBKDF2 | |
| SALT_MARKER = b'$' | |
| ITERATIONS = 1000 | |
| def encrypt(infile, outfile, password, key_size=32, salt_marker=SALT_MARKER, | |
| kdf_iterations=ITERATIONS, hashmod=SHA256): | |
| """Encrypt infile and write it to outfile using password to generate key. | |
| The encryption algorithm used is symmetric AES in cipher-block chaining | |
| (CBC) mode. | |
| ``key_size`` may be 16, 24 or 32 (default). | |
| The key is derived via the PBKDF2 key derivation function (KDF) from the | |
| password and a random salt of 16 bytes (the AES block size) minus the | |
| length of the salt header (see below). | |
| The hash function used by PBKDF2 is SHA256 per default. You can pass a | |
| different hash function module via the ``hashmod`` argument. The module | |
| must adhere to the Python API for Cryptographic Hash Functions (PEP 247). | |
| PBKDF2 uses a number of iterations of the hash function to derive the key, | |
| which can be set via the ``kdf_iterations` keyword argumeent. The default | |
| number is 1000 and the maximum 65535. | |
| The header and the salt are written to the first block of the encrypted | |
| file. The header consist of the number of KDF iterations encoded as a | |
| big-endian word bytes wrapped by ``salt_marker`` on both sides. With the | |
| default value of ``salt_marker = b'$'``, the header size is thus 4 and the | |
| salt 12 bytes. The salt marker must be a byte string of 1-6 bytes length. | |
| The last block of the encrypted file is padded with up to 16 bytes, all | |
| having the value of the length of the padding. | |
| """ | |
| if not 1 <= len(salt_marker) <= 6: | |
| raise ValueError('The salt_marker must be one to six bytes long.') | |
| elif not isinstance(salt_marker, bytes): | |
| raise TypeError('salt_marker must be a bytes instance.') | |
| if kdf_iterations >= 65536: | |
| raise ValueError('kdf_iterations must be <= 65535.') | |
| bs = AES.block_size | |
| header = salt_marker + struct.pack('>H', kdf_iterations) + salt_marker | |
| salt = os.urandom(bs - len(header)) | |
| kdf = PBKDF2(password, salt, min(kdf_iterations, 65535), hashmod) | |
| key = kdf.read(key_size) | |
| iv = os.urandom(bs) | |
| cipher = AES.new(key, AES.MODE_CBC, iv) | |
| outfile.write(header + salt) | |
| outfile.write(iv) | |
| finished = False | |
| while not finished: | |
| chunk = infile.read(1024 * bs) | |
| if len(chunk) == 0 or len(chunk) % bs != 0: | |
| padding_length = (bs - len(chunk) % bs) or bs | |
| chunk += (padding_length * chr(padding_length)).encode() | |
| finished = True | |
| outfile.write(cipher.encrypt(chunk)) | |
| def decrypt(infile, outfile, password, key_size=32, salt_marker=SALT_MARKER, | |
| hashmod=SHA256): | |
| """Decrypt infile and write it to outfile using password to derive key. | |
| See `encrypt` for documentation of the encryption algorithm and parameters. | |
| """ | |
| mlen = len(salt_marker) | |
| hlen = mlen * 2 + 2 | |
| if not 1 <= mlen <= 6: | |
| raise ValueError('The salt_marker must be one to six bytes long.') | |
| elif not isinstance(salt_marker, bytes): | |
| raise TypeError('salt_marker must be a bytes instance.') | |
| bs = AES.block_size | |
| salt = infile.read(bs) | |
| if salt[:mlen] == salt_marker and salt[mlen + 2:hlen] == salt_marker: | |
| kdf_iterations = struct.unpack('>H', salt[mlen:mlen + 2])[0] | |
| salt = salt[hlen:] | |
| else: | |
| kdf_iterations = ITERATIONS | |
| if kdf_iterations >= 65536: | |
| raise ValueError('kdf_iterations must be <= 65535.') | |
| iv = infile.read(bs) | |
| kdf = PBKDF2(password, salt, kdf_iterations, hashmod) | |
| key = kdf.read(key_size) | |
| cipher = AES.new(key, AES.MODE_CBC, iv) | |
| next_chunk = b'' | |
| finished = False | |
| while not finished: | |
| chunk, next_chunk = next_chunk, cipher.decrypt(infile.read(1024 * bs)) | |
| if not next_chunk: | |
| padlen = chunk[-1] | |
| if isinstance(padlen, str): | |
| padlen = ord(padlen) | |
| padding = padlen * chr(padlen) | |
| else: | |
| padding = (padlen * chr(chunk[-1])).encode() | |
| if padlen < 1 or padlen > bs: | |
| raise ValueError("bad decrypt pad (%d)" % padlen) | |
| # all the pad-bytes must be the same | |
| if chunk[-padlen:] != padding: | |
| # this is similar to the bad decrypt:evp_enc.c | |
| # from openssl program | |
| raise ValueError("bad decrypt") | |
| chunk = chunk[:-padlen] | |
| finished = True | |
| outfile.write(chunk) | |
| def main(args=None): | |
| ap = argparse.ArgumentParser(description=__doc__.splitlines()[0]) | |
| ap.add_argument('-d', '--decrypt', action="store_true", | |
| help="Decrypt input file") | |
| ap.add_argument('-f', '--force', action="store_true", | |
| help="Overwrite output file if it exists") | |
| ap.add_argument('infile', help="Input file") | |
| ap.add_argument('outfile', nargs='?', help="Output file") | |
| args = ap.parse_args(args if args is not None else sys.argv[1:]) | |
| if not args.outfile: | |
| if args.decrypt: | |
| args.outfile = splitext(args.infile)[0] | |
| else: | |
| args.outfile = args.infile + '.enc' | |
| if args.outfile == args.infile: | |
| print("Input and output file must not be the same.") | |
| return 1 | |
| if exists(args.outfile) and not args.force: | |
| print("Output file '%s' exists. " | |
| "Use option -f to override." % args.outfile) | |
| return 1 | |
| with open(args.infile, 'rb') as infile, \ | |
| open(args.outfile, 'wb') as outfile: | |
| if args.decrypt: | |
| decrypt(infile, outfile, getpass("Enter decryption password: ")) | |
| else: | |
| try: | |
| while True: | |
| passwd = getpass("Enter encryption password: ") | |
| passwd2 = getpass("Verify password: ") | |
| if passwd != passwd2: | |
| print("Password mismatch!") | |
| else: | |
| break | |
| except (EOFError, KeyboardInterrupt): | |
| return 1 | |
| encrypt(infile, outfile, passwd) | |
| return 0 | |
| if __name__ == '__main__': | |
| sys.exit(main(sys.argv[1:]) or 0) |
| #!/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| """Test suite for aescrypt.py.""" | |
| from __future__ import print_function, unicode_literals | |
| from io import BytesIO | |
| from aescrypt import encrypt, decrypt | |
| from Crypto.Cipher import AES | |
| from nose.tools import raises | |
| password = 'q1w2e3r4' | |
| plaintext = """\ | |
| Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque at euismod | |
| tortor, quis finibus mauris. Suspendisse dui augue, hendrerit at porttitor | |
| viverra, pulvinar ut velit. Quisque facilisis felis sed felis vestibulum, sit | |
| amet varius est vulputate. Curabitur venenatis dapibus risus, a molestie magna | |
| lobortis et. Donec a nulla in ligula sagittis dapibus et quis velit. Curabitur | |
| tincidunt faucibus lorem in viverra. Sed diam diam, suscipit sit amet quam nec, | |
| cursus sollicitudin est. Vestibulum condimentum gravida sem eget tincidunt. | |
| Nulla tincidunt massa in consectetur blandit. Ut sed nunc sed neque posuere | |
| porttitor. Fusce et libero pretium, facilisis ante eget, fermentum enim. Sed | |
| dignissim libero quis ultricies iaculis. Nunc eu lobortis tellus. Nam et cursus | |
| ligula. Sed vitae consequat nisl. Cras tempor nisl non metus commodo, vitae | |
| scelerisque neque congue. | |
| """ | |
| infn = 'test_input.txt' | |
| encfn = 'test_input.txt.enc' | |
| outfn = 'test_output.txt' | |
| def test_roundtrip(): | |
| """AES file encryption/decryption roundtrip produces identical files.""" | |
| with open(infn, 'rb') as infile, open(encfn, 'wb') as outfile: | |
| encrypt(infile, outfile, password) | |
| with open(encfn, 'rb') as infile, open(outfn, 'wb') as outfile: | |
| decrypt(infile, outfile, password) | |
| with open(infn, 'rb') as original, open(outfn, 'rb') as copy: | |
| assert original.read() == copy.read() | |
| @raises(ValueError) | |
| def test_bad_decrypt(): | |
| """Trying to decrypt invalid input raises ValueError.""" | |
| with BytesIO(plaintext[:256].encode()) as infile, BytesIO() as outfile: | |
| decrypt(infile, outfile, password) | |
| def test_key_size(): | |
| """Key sizes of 128, 192 and 256 bit produce valid ciphertexts.""" | |
| infile = BytesIO(plaintext.encode()) | |
| for key_size in AES.key_size: | |
| cipherfile = BytesIO() | |
| encrypt(infile, cipherfile, password, key_size=key_size) | |
| infile.seek(0) | |
| ciphertext = cipherfile.getvalue() | |
| assert len(ciphertext) % 16 == 0 | |
| cipherfile.seek(0) | |
| outfile = BytesIO() | |
| decrypt(cipherfile, outfile, password, key_size=key_size) | |
| decrypted = outfile.getvalue().decode('utf-8') | |
| assert decrypted == plaintext | |
| def test_salt_marker(): | |
| """Setting the salt marker produces valid header.""" | |
| marker = b'test' | |
| infile = BytesIO(plaintext.encode()) | |
| cipherfile = BytesIO() | |
| encrypt(infile, cipherfile, password, salt_marker=marker) | |
| ciphertext = cipherfile.getvalue() | |
| assert ciphertext[:4] == marker and ciphertext[6:10] == marker | |
| @raises(ValueError) | |
| def test_salt_marker_empty(): | |
| """Passing empty salt marker raises ValueError.""" | |
| marker = b'' | |
| infile = BytesIO(plaintext.encode()) | |
| cipherfile = BytesIO() | |
| encrypt(infile, cipherfile, password, salt_marker=marker) | |
| @raises(ValueError) | |
| def test_salt_marker_toolong(): | |
| """Passing too long salt marker raises ValueError.""" | |
| marker = b'iamlong' | |
| infile = BytesIO(plaintext.encode()) | |
| cipherfile = BytesIO() | |
| encrypt(infile, cipherfile, password, salt_marker=marker) | |
| @raises(TypeError) | |
| def test_salt_marker_notbytes(): | |
| """Passing not bytes-type salt marker raises TypeError.""" | |
| marker = '$' | |
| infile = BytesIO(plaintext.encode()) | |
| cipherfile = BytesIO() | |
| encrypt(infile, cipherfile, password, salt_marker=marker) | |
| def test_kdf_iterations(): | |
| """Passed kdf_iterations are set correctly in header.""" | |
| infile = BytesIO(plaintext.encode()) | |
| cipherfile = BytesIO() | |
| encrypt(infile, cipherfile, password, kdf_iterations=1000) | |
| assert cipherfile.getvalue()[1:3] == b'\x03\xe8' | |
| @raises(ValueError) | |
| def test_kdf_iterations_tolow(): | |
| """Setting kdf_iterations too low raises ValueError.""" | |
| infile = BytesIO(plaintext.encode()) | |
| cipherfile = BytesIO() | |
| encrypt(infile, cipherfile, password, kdf_iterations=0) | |
| @raises(ValueError) | |
| def test_kdf_iterations_tohigh(): | |
| """Setting kdf_iterations too high raises ValueError.""" | |
| infile = BytesIO(plaintext.encode()) | |
| cipherfile = BytesIO() | |
| encrypt(infile, cipherfile, password, kdf_iterations=65536) |
You should always add padding not only if its needed.
EDIT:
Oh I see, what happend, when I rewrote while to for loop I made a mistake.
in encrypt function:
pads = False
for chunk in iter(lambda: infile.read(1024 * bs), b''):
if len(chunk) == 0 or len(chunk) % bs != 0:
padding_length = (bs - len(chunk) % bs) or bs
chunk += (padding_length * chr(padding_length)).encode()
pads = True
# write encrypted chunks in file
outfile.write(cipher.encrypt(chunk))
if not pads:
outfile.write(cipher.encrypt((bs * chr(bs)).encode()))
@fumingshih I'm so sorry, I didn't see your request until today. Apparently Github doesn't notify me about comments on my own Gists :( Feel free to use this script however you like under the terms of the MIT License (Copyright Christopher Arndt 2014).
I would have released this a package on PyPI if I was confident enough about my knowledge of cryptography, but, really, this was only a coding exercise for me and I can't give any guarantees that it will not eat all your data! ;)
@Kyslik I'm not sure why you think a for loop is better here. The while loop is pretty clear and elegant IMHO.
The gist and notifications is broken thing :)
I am using for loop because I added progress.bar, but firstly I thought your script was buggy (I guess I modified it already...) and from then I just rewrote a bit + added integrity check.
@SpotlightKid Two questions:
- Why did you choose 65536 as max iterations? That seems pretty low in 2016, but then I don't really know what I'm talking about. http://stackoverflow.com/questions/6054082/recommended-of-iterations-when-using-pbkdf2-sha256
- Why did you use the pbkdf2 package instead of
from Crypto.Protocol.KDF import PBKDF2?
@dmwyatt: Again, sorry for the late response, I don't get notified about comments. For further comments, please considers dropping me a note (contact info via the link on my GH profile).
re 1.) a) This written in 2014. b) I chose to encode the iterations into the header as an unsigned word. If more iterations are desired, This would need to ne changed.
re 2.) Don't know, probably I just didn't know about this sub-module. PyCrypto's documentation isn't exactly user friendly.
Hi , this is not working on python 2.7
$ ./aescrypt.py config.ini config.ini.dat
Traceback (most recent call last):
File "./aescrypt.py", line 42, in <module>
from pbkdf2 import PBKDF2
ImportError: No module named pbkdf2
@ashraful1980: Please try Python 3.5+ instead of Python 2.7.
Alternatively, try removing unicode_literals from line 5.
@SpotlightKid what are the open ssl command to encrypt and decrypt the above generated files?
@dineshdad: Sorry, I don't know the answer to that.
Hi Chris. Found this script for aes encryption through your stackoverflow discussion. Thank you for this nice example! Could I reuse this code for non-commercial purpose? Do you have any license restriction for this piece of code you wrote? If there's any license statement, please let me know, so I can include them.