Last active
February 10, 2025 18:39
-
-
Save samvv/8c4724b6f585a299562536c5bcde83c3 to your computer and use it in GitHub Desktop.
Download a file in Python 3 with progress reporting
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from tarfile import TarFile | |
| from typing import Callable, TypeVar, cast, Any | |
| import math | |
| import sys, os | |
| import argparse | |
| from urllib.parse import urlparse, urlunparse, ParseResult | |
| import errno | |
| from pathlib import Path | |
| from hashlib import sha512 | |
| from tempfile import TemporaryDirectory | |
| import shutil | |
| import urllib3 | |
# Name of the app that is downloading things
# Used for e.g. creating a directory in the home directory of the user.
APP_NAME = 'myapp'

# Shared urllib3 connection pool; download() issues its HTTP request through it.
http = urllib3.PoolManager()

# Per-user cache directory (~/.cache/<APP_NAME>). download() stores files in
# its 'downloads' subdirectory when no explicit destination is given.
_homedir = Path.home() / '.cache' / APP_NAME
def download(url, dest: Path | None = None, chunk_size=1024 * 50) -> Path:
    """Download *url* to disk, printing progress and resuming partial files.

    Data is first written to a temporary sibling of the final path and only
    renamed to the final path once the transfer finished, so an existing
    final path always denotes a complete download.

    Args:
        url: The resource to fetch, either as a string (or anything
            convertible with ``str``) or as a ``urllib.parse.ParseResult``.
        dest: Where to store the file. When ``None`` the file is cached under
            ``_homedir / 'downloads'`` using the SHA-512 of the URL as name,
            keeping the extensions of the last URL path segment.
        chunk_size: Number of bytes requested from the response per read.

    Returns:
        The path of the completely downloaded file. If that path already
        exists nothing is downloaded.

    Raises:
        urllib3.exceptions.HTTPError: If the server answers with an error
            status (>= 400).
    """
    if isinstance(url, ParseResult):
        parsed = url
        url = urlunparse(url)
    else:
        url = str(url)
        parsed = urlparse(url)

    if dest is None:
        extnames = ''.join(f'.{ext}' for ext in parsed.path.split('/')[-1].split('.')[1:])
        filename = sha512(url.encode('utf8')).hexdigest()
        out_file_path = _homedir / 'downloads' / (filename + extnames)
        out_file_final_path = _homedir / 'downloads' / (filename + '-complete' + extnames)
    else:
        out_file_path = dest.parent / (dest.name + '.downloading')
        out_file_final_path = dest

    if out_file_final_path.exists():
        return out_file_final_path

    mkdirp(out_file_path.parent)

    # Size of a previous partial download, or 0 when there is none.
    try:
        start_byte = out_file_path.stat().st_size
    except FileNotFoundError:
        start_byte = 0

    headers: dict[str, str] = {}
    if start_byte > 0:
        headers['Range'] = f'bytes={start_byte}-'

    req = http.request('GET', url, preload_content=False, headers=headers)
    if req.status == 416 and start_byte > 0:
        # The partial file is not a valid prefix of the resource (for example
        # it is already complete or the remote file changed): start over.
        req.read()
        req.release_conn()
        start_byte = 0
        req = http.request('GET', url, preload_content=False)

    try:
        if req.status >= 400:
            # Never store an error page as if it were the requested file.
            raise urllib3.exceptions.HTTPError(
                f'failed to download {url}: HTTP status {req.status}'
            )

        # Only append when the server actually honoured the Range request;
        # otherwise the body is the whole resource and the partial file must
        # be overwritten.
        resuming = start_byte > 0 and req.status == 206 and 'Content-Range' in req.headers
        if not resuming:
            start_byte = 0

        # For a ranged response Content-Length only covers the remaining
        # bytes, so add what is already on disk to get the full size.
        content_length = req.headers.get('Content-Length')
        total_bytes = None if content_length is None else start_byte + int(content_length)

        bytes_read = start_byte
        with open(out_file_path, 'ab' if resuming else 'wb') as f:
            while True:
                out = ''
                if total_bytes:  # unknown or zero size: no percentage
                    out += f'[{math.floor((bytes_read / total_bytes) * 100)}%] '
                out += f'Downloading {url} ({humanbytes(bytes_read)})'
                print(out, end='\r')
                buf = req.read(chunk_size)
                if not buf:
                    break
                bytes_read += len(buf)
                f.write(buf)
        sys.stdout.write('\n')
    finally:
        req.release_conn()

    os.replace(out_file_path, out_file_final_path)
    return out_file_final_path
# Signature of a path-rewriting callback: takes a path string and returns a
# path string or None.
# NOTE(review): presumably the intended type of extract_tar()'s update_path
# argument (where None means "skip this entry") -- the alias is not referenced
# anywhere in the visible code, confirm before relying on it.
type PathFilter = Callable[[str], str | None]
def _tar_target_is_inside(root: str, out_path: Path) -> bool:
    """Return True when *out_path* stays inside the real directory *root*."""
    # Only the parent is resolved: a symlink that already exists at out_path
    # itself is going to be replaced, not followed.
    parent = os.path.realpath(out_path.parent)
    candidate = os.path.normpath(os.path.join(parent, out_path.name))
    try:
        return os.path.commonpath([root, candidate]) == root
    except ValueError:
        # Raised e.g. for paths on different drives.
        return False


def extract_tar(tar: TarFile, dest, update_path=None, chunk_size=1024 * 50):
    """Extract the remaining members of *tar* into the directory *dest*.

    Regular files are only written when they are missing or older than the
    archive member; their mtime and mode are taken from the archive.
    Directories and symbolic links are recreated as well.

    Args:
        tar: An opened tar archive; members are consumed with ``tar.next()``.
        dest: Directory that receives the extracted entries.
        update_path: Optional callable mapping a member name to the relative
            path it should be extracted to, or to ``None`` to skip the
            member. Defaults to using the member name unchanged.
        chunk_size: Number of bytes copied per read when writing files.

    Raises:
        ValueError: If a member is neither a file, directory nor symlink, if
            its data cannot be read, or if it would be written outside *dest*.
    """
    dest_path = Path(dest)
    dest_root = os.path.realpath(dest_path)
    # Directory metadata is applied after everything was written: creating
    # children bumps the directory mtime, and a read-only mode would prevent
    # children from being created at all.
    pending_dirs = []

    for member in iter(tar.next, None):
        updated_path = member.name if update_path is None else update_path(member.name)
        if updated_path is None:
            continue
        out_path = dest_path / updated_path

        # Archives are untrusted input: refuse '..' components, absolute
        # names and symlinked parents that lead outside the destination.
        if out_path != dest_path and not _tar_target_is_inside(dest_root, out_path):
            raise ValueError(f'refusing to extract {member.name}: path escapes {dest_path}')

        if member.isfile():
            try:
                existing_mtime = out_path.stat().st_mtime
            except FileNotFoundError:
                existing_mtime = None
            if existing_mtime is not None and member.mtime <= existing_mtime:
                print(f"Skipping {member.name}", end='\r')
                continue

            print(f"Extracting {member.name}", end='\r')
            out_path.parent.mkdir(parents=True, exist_ok=True)
            if out_path.is_symlink():
                # Replace the link instead of writing through it.
                out_path.unlink()
            source = tar.extractfile(member)
            if source is None:
                raise ValueError(f'cannot read data of tar entry {member.name}')
            with source, open(out_path, 'wb') as out:
                bytes_written = 0
                show_progress = member.size > 1024 * 1024 * 5
                while True:
                    if show_progress:
                        progress = bytes_written / member.size
                        print(f"[{progress:.2f}] Extracting {member.name}", end='\r')
                    buf = source.read(chunk_size)
                    if not buf:
                        break
                    bytes_written += len(buf)
                    out.write(buf)
            os.utime(out_path, (member.mtime, member.mtime))
            os.chmod(out_path, member.mode)
        elif member.isdir():
            out_path.mkdir(parents=True, exist_ok=True)
            pending_dirs.append((out_path, member))
        elif member.issym():
            out_path.parent.mkdir(parents=True, exist_ok=True)
            # os.symlink() fails when something already exists at the target.
            if out_path.is_symlink() or out_path.is_file():
                out_path.unlink()
            else:
                rimraf(out_path)
            os.symlink(member.linkname, out_path)
        else:
            raise ValueError(f'unsupported tar entry for {member.name}')

    # Deepest directories were appended last; handle them first.
    for dir_path, member in reversed(pending_dirs):
        os.utime(dir_path, (member.mtime, member.mtime))
        os.chmod(dir_path, member.mode)
# Generic type variable shared by the small typing helpers below.
_T = TypeVar('_T')


def identity(x: _T) -> _T:
    """Return the argument unchanged (a no-op transformation)."""
    unchanged = x
    return unchanged
def nonnull(value: _T | None) -> _T:
    """Return *value*, guaranteeing to the caller that it is not ``None``.

    Raises:
        AssertionError: If *value* is ``None``. The error is raised
            explicitly rather than through an ``assert`` statement so the
            check also runs when Python is started with ``-O``.
    """
    if value is None:
        raise AssertionError('expected a value but got None')
    return value
| def extract(filepath: str | Path, dest: str | Path, strip_path=0) -> None: | |
| filepath = Path(filepath) | |
| dest = Path(dest) | |
| compression = [] | |
| def update_path(p: str) -> str | None: | |
| chunks = p.split('/') | |
| if len(chunks) > strip_path: | |
| return os.path.sep.join(chunks[strip_path:]) | |
| with TemporaryDirectory('-extracted', prefix=filepath.stem) as d: | |
| for ext in reversed(filepath.suffixes): | |
| if ext == '.zip': | |
| from zipfile import ZipFile | |
| with ZipFile(filepath) as f: | |
| f.extractall(d) | |
| print(d) | |
| break | |
| elif ext == '.gz': | |
| compression.append('gz') | |
| elif ext == '.tar': | |
| import tarfile | |
| with tarfile.open(filepath, 'r') as tar: | |
| extract_tar(tar, dest, update_path=update_path) | |
def is_extractable(filepath: Path) -> bool:
    """Tell whether the name of *filepath* denotes a zip or tar archive.

    The extensions are inspected from the end of the file name: trailing
    compression extensions (``xz``, ``gz``, ``bz``, ``bz2``, ``lz``) are
    skipped and the first other extension decides. Scanning from the end
    keeps dots earlier in the name (``tool-1.2.3.tar.gz``) from hiding the
    archive extension.

    Args:
        filepath: Path whose file name is inspected; the file is not opened.

    Returns:
        ``True`` for names such as ``a.zip``, ``a.tar`` or ``a.tar.gz``.
    """
    compression_exts = {'xz', 'gz', 'bz', 'bz2', 'lz'}
    for ext in reversed(filepath.name.split('.')[1:]):
        if ext in compression_exts:
            continue
        return ext in ('zip', 'tar')
    return False
def mkdirp(filepath: Path) -> None:
    """Create the directory *filepath* together with any missing parents.

    Nothing happens when the directory already exists (like ``mkdir -p``).
    """
    directory = Path(filepath)
    directory.mkdir(exist_ok=True, parents=True)
# Binary (1024-based) size units, smallest first.
_UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']


def humanbytes(byte_count: int) -> str:
    """Format *byte_count* as a human-readable size with two decimals.

    The value is scaled by powers of 1024 to the largest unit in ``_UNITS``
    that keeps it at or above 1; values beyond the last unit stay in that
    unit. Negative counts keep their sign.

    Args:
        byte_count: Number of bytes.

    Returns:
        A string such as ``'1.50 KiB'``; exactly ``'0 B'`` for zero.
    """
    if byte_count == 0:
        return '0 B'
    sign = '-' if byte_count < 0 else ''
    size = abs(byte_count)
    unit_index = 0
    last_index = len(_UNITS) - 1
    # Repeated division avoids the rounding errors of a floating point
    # logarithm and can never index past the end of _UNITS.
    while size >= 1024 and unit_index < last_index:
        size /= 1024
        unit_index += 1
    return f'{sign}{size:.2f} {_UNITS[unit_index]}'
def rimraf(filepath: Path):
    """Remove *filepath* like ``rm -rf``: a file, a symlink or a whole tree.

    A path that does not exist is silently ignored, and errors while
    deleting a directory tree are ignored as well (best effort).

    Args:
        filepath: The file, symbolic link or directory to remove.

    Raises:
        RuntimeError: If *filepath* denotes the current working directory,
            the root of a drive or the user's home directory.
    """
    target = Path(filepath)
    # Compare absolute, normalised paths so that relative spellings such as
    # '.' are caught too. Symlinks are deliberately not resolved here: a
    # link pointing at a protected directory may itself be deleted.
    absolute = Path(os.path.abspath(target))
    if absolute == Path.cwd():
        raise RuntimeError(f'refusing to remove {filepath}: path is the current working directory')
    if absolute == Path(absolute.anchor):
        raise RuntimeError(f'refusing to remove {filepath}: path points to an entire drive')
    if absolute == Path.home():
        raise RuntimeError(f'refusing to remove {filepath}: path points to a home directory')

    if target.is_dir() and not target.is_symlink():
        shutil.rmtree(target, ignore_errors=True)
    else:
        # shutil.rmtree() refuses plain files and symlinks, so unlink them.
        target.unlink(missing_ok=True)
| def main(argv: list[str] | None = None) -> int: | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('url', metavar='URL', help="The resource to download") | |
| parser.add_argument('dest', nargs='?', metavar='DEST', default='.', help="Target directory or file name") | |
| parsed = parser.parse_args(argv) | |
| url = urlparse(parsed.url) | |
| basename = url.path.split('/')[-1] | |
| dest = Path(basename if parsed.dest is None else parsed.dest) | |
| if dest.is_dir(): | |
| dest /= basename | |
| if dest.exists(): | |
| print(f'Error: file {dest} already exists') | |
| return 1 | |
| filepath = download(url) | |
| if is_extractable(filepath): | |
| extract(filepath, strip_path=1, dest=dest) | |
| else: | |
| shutil.copy(filepath, dest) | |
| return 0 | |
| if __name__ == '__main__': | |
| sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment