In my case, I downloaded about 80k JPG files from S3 using three different methods:
| method | total download time | avg. files downloaded/sec |
|---|---|---|
| Seq | ~7hr | 3.2 |
| Async | ~40min | 35 |
| Async+MP | ~10min | 135 |
In my case, I downloaded about 80k JPG files from S3 using three different methods:
| method | total download time | avg. files downloaded/sec |
|---|---|---|
| Seq | ~7hr | 3.2 |
| Async | ~40min | 35 |
| Async+MP | ~10min | 135 |
# Standard library
import asyncio
import io
import os

# Third-party
import aiobotocore
from PIL import Image

# Credentials are read once, at import time; a missing variable raises
# KeyError before any download starts.
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
async def getS3Obj(client, s3_bucket, s3_key):
    """Fetch one S3 object and return its body.

    Awaits ``client.get_object`` for ``s3_bucket``/``s3_key``, then awaits
    ``read()`` on the response's ``'Body'`` entry and returns what it yields.
    """
    response = await client.get_object(Bucket=s3_bucket, Key=s3_key)
    body = response['Body']
    return await body.read()
async def downloadImg(client, s3_bucket, s3_key, save_path):
    """Download one S3 object, decode it with Pillow and save it to ``save_path``.

    The object is decoded and re-encoded by ``Image.save``. No ``format`` is
    passed, so Pillow picks the output format from ``save_path``.

    NOTE(review): the decode and save run synchronously on the event-loop
    thread and block other downloads while they run. Consider
    ``loop.run_in_executor`` if this becomes a bottleneck.
    """
    obj = await getS3Obj(client, s3_bucket, s3_key)
    # The context manager releases the image's resources once it has been saved.
    with Image.open(io.BytesIO(obj)) as img:
        img.save(save_path)
async def go(loop, download_list):
    """Download every ``(s3_bucket, s3_key, save_path)`` entry concurrently.

    Opens one S3 client from ``aiobotocore.get_session(loop=loop)`` with the
    module-level credentials and runs ``downloadImg`` for all entries at once.

    If any download raises, the remaining downloads are cancelled and awaited
    before the client is closed, and then the first error is re-raised.
    """
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client(
            's3',
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        tasks = [asyncio.ensure_future(downloadImg(client, *file_))
                 for file_ in download_list]
        try:
            await asyncio.gather(*tasks)
        except BaseException:
            # gather() raises the first error without stopping the other
            # tasks. Cancel and drain them so none still uses ``client``
            # after the ``async with`` block closes it.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
if __name__ == "__main__":
    # Each entry is (s3_bucket, s3_key, save_path), unpacked into downloadImg.
    download_list = [
        ("s3_bucket1", "s3_key1", "save_path1"),
        ("s3_bucket2", "s3_key2", "save_path2"),
        ("s3_bucket3", "s3_key3", "save_path3"),
    ]
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(go(loop, download_list))
    finally:
        # This loop is created here and used only here, so close it here.
        loop.close()
# Standard library
import asyncio
import io
import multiprocessing as mp
import os

# Third-party
import aiobotocore
from PIL import Image

# Credentials are read at module import; a missing variable raises KeyError.
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
async def getS3Obj(client, s3_bucket, s3_key):
    """Return the body of the S3 object at ``s3_bucket``/``s3_key``.

    Awaits ``client.get_object`` and then awaits ``read()`` on the
    response's ``'Body'`` entry.
    """
    response = await client.get_object(Bucket=s3_bucket, Key=s3_key)
    payload = await response['Body'].read()
    return payload
async def downloadImg(client, s3_bucket, s3_key, save_path):
    """Download one S3 object, decode it with Pillow and save it to ``save_path``.

    The object is decoded and re-encoded by ``Image.save``. No ``format`` is
    passed, so Pillow picks the output format from ``save_path``.

    NOTE(review): the decode and save run synchronously on this process's
    event-loop thread and block the other downloads in the same chunk.
    """
    obj = await getS3Obj(client, s3_bucket, s3_key)
    # The context manager releases the image's resources once it has been saved.
    with Image.open(io.BytesIO(obj)) as img:
        img.save(save_path)
async def go(loop, download_list):
    """Download every ``(s3_bucket, s3_key, save_path)`` entry concurrently.

    Opens one S3 client from ``aiobotocore.get_session(loop=loop)`` with the
    module-level credentials and runs ``downloadImg`` for all entries at once.

    If any download raises, the remaining downloads are cancelled and awaited
    before the client is closed, and then the first error is re-raised.
    """
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client(
            's3',
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        tasks = [asyncio.ensure_future(downloadImg(client, *file_))
                 for file_ in download_list]
        try:
            await asyncio.gather(*tasks)
        except BaseException:
            # gather() raises the first error without stopping the other
            # tasks. Cancel and drain them so none still uses ``client``
            # after the ``async with`` block closes it.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
def singleProcess(download_list_):
    """Run ``go`` for one chunk of downloads on a fresh event loop.

    This is the function the process pool maps over the chunks. A pool worker
    can be handed more than one chunk (the ``__main__`` block makes 16 chunks
    for 8 processes), so the loop is closed after each chunk rather than left
    open in the worker.
    """
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(go(loop, download_list_))
    finally:
        loop.close()
if __name__ == "__main__":
    # Each entry is (s3_bucket, s3_key, save_path), unpacked into downloadImg.
    download_list = [
        ("s3_bucket1", "s3_key1", "save_path1"),
        ("s3_bucket2", "s3_key2", "save_path2"),
        ("s3_bucket3", "s3_key3", "save_path3"),
    ]
    # number of worker processes
    no_mp = 8  # or no_mp = mp.cpu_count()
    # number of chunks the work is split into
    no_tasks = 16
    # Round-robin split: chunk i holds items i, i + no_tasks, i + 2*no_tasks, ...
    # Empty chunks (fewer items than no_tasks) are dropped so they don't each
    # start an event loop and an S3 client for nothing.
    download_list_chunk = [
        chunk
        for chunk in (download_list[i::no_tasks] for i in range(no_tasks))
        if chunk
    ]
    with mp.Pool(no_mp) as p:
        p.map(singleProcess, download_list_chunk)
# Standard library
import io
import os

# Third-party
import botocore.session
from PIL import Image

# Credentials are read at import time; a missing variable raises KeyError.
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
def getS3Obj(client, s3_bucket, s3_key):
    """Return the body of one S3 object.

    Calls ``client.get_object`` for ``s3_bucket``/``s3_key`` and returns the
    result of ``read()`` on the response's ``'Body'`` entry.
    """
    body = client.get_object(Bucket=s3_bucket, Key=s3_key)['Body']
    return body.read()
def downloadImg(client, s3_bucket, s3_key, save_path):
    """Download one S3 object, decode it with Pillow and save it to ``save_path``.

    The object is decoded and re-encoded by ``Image.save``. No ``format`` is
    passed, so Pillow picks the output format from ``save_path``.
    """
    obj = getS3Obj(client, s3_bucket, s3_key)
    # The context manager releases the image's resources once it has been saved.
    with Image.open(io.BytesIO(obj)) as img:
        img.save(save_path)
def go(download_list):
    """Download every ``(s3_bucket, s3_key, save_path)`` entry, one after another.

    Creates a single botocore S3 client with the module-level credentials and
    calls ``downloadImg`` for each entry in order.
    """
    session = botocore.session.get_session()
    # botocore clients do not implement the context-manager protocol, so
    # ``with session.create_client(...)`` fails; manage the client explicitly.
    client = session.create_client(
        's3',
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        aws_access_key_id=AWS_ACCESS_KEY_ID)
    try:
        for file_ in download_list:
            downloadImg(client, *file_)
    finally:
        # BaseClient.close() only exists in newer botocore releases; on older
        # ones the lookup raises AttributeError and getattr returns None.
        close = getattr(client, 'close', None)
        if close is not None:
            close()
if __name__ == "__main__":
    # Work items: (s3_bucket, s3_key, save_path), unpacked into downloadImg.
    download_list = [
        ("s3_bucket1", "s3_key1", "save_path1"),
        ("s3_bucket2", "s3_key2", "save_path2"),
        ("s3_bucket3", "s3_key3", "save_path3"),
    ]
    go(download_list)