Skip to content

Instantly share code, notes, and snippets.

@samvv
Last active February 10, 2025 18:39
Show Gist options
  • Select an option

  • Save samvv/8c4724b6f585a299562536c5bcde83c3 to your computer and use it in GitHub Desktop.

Select an option

Save samvv/8c4724b6f585a299562536c5bcde83c3 to your computer and use it in GitHub Desktop.
Download a file in Python 3 with progress reporting
#!/usr/bin/env python3
from tarfile import TarFile
from typing import Callable, TypeVar, cast, Any
import math
import sys, os
import argparse
from urllib.parse import urlparse, urlunparse, ParseResult
import errno
from pathlib import Path
from hashlib import sha512
from tempfile import TemporaryDirectory
import shutil
import urllib3
# Name of the app that is downloading things
# Used for e.g. creating a directory in the home directory of the user.
APP_NAME = 'myapp'

# Connection pool through which this module issues its HTTP requests.
http = urllib3.PoolManager()

# Per-application cache directory, ~/.cache/<APP_NAME>. download() stores its
# files in a 'downloads' subdirectory of it when no destination is given.
# NOTE(review): despite the name this is a cache directory, not the home dir.
_homedir = Path.home() / '.cache' / APP_NAME
def download(url, dest: Path | None = None, chunk_size=1024 * 50) -> Path:
    """Download *url* to disk with progress reporting, resuming partial files.

    The body is streamed into a temporary ``.downloading``/hash-named file and
    only moved to its final name once the transfer finished, so an existing
    final file always denotes a complete download.

    Args:
        url: The resource to fetch, either a string or a ``ParseResult``.
        dest: Final location of the file. When ``None`` the file is stored in
            the per-application cache under a name derived from the SHA-512
            of the URL (keeping the original extensions).
        chunk_size: Number of bytes requested from the response per read.

    Returns:
        The path of the completely downloaded file.

    Raises:
        RuntimeError: If the server answers with an HTTP error status or the
            connection ends before the announced number of bytes arrived. The
            partial file is kept so a later call can resume.
    """
    if isinstance(url, ParseResult):
        parsed = url
        url = urlunparse(url)
    else:
        url = str(url)
        parsed = urlparse(url)

    if dest is None:
        extnames = ''.join(f'.{ext}' for ext in parsed.path.split('/')[-1].split('.')[1:])
        filename = sha512(url.encode('utf8')).hexdigest()
        out_file_path = _homedir / 'downloads' / (filename + extnames)
        out_file_final_path = _homedir / 'downloads' / (filename + '-complete' + extnames)
    else:
        out_file_path = dest.parent / (dest.name + '.downloading')
        out_file_final_path = dest

    if out_file_final_path.exists():
        return out_file_final_path

    mkdirp(out_file_path.parent)

    try:
        start_byte = out_file_path.stat().st_size
    except FileNotFoundError:
        start_byte = 0

    # Only ask for a range when there is something to resume from.
    headers: dict[str, str] = {}
    if start_byte > 0:
        headers['Range'] = f'bytes={start_byte}-'
    req = http.request('GET', url, preload_content=False, headers=headers)

    if req.status == 416 and start_byte > 0:
        # The server rejected the resume offset (for example because the
        # partial file is not shorter than the resource): start from scratch.
        req.drain_conn()
        req.release_conn()
        start_byte = 0
        req = http.request('GET', url, preload_content=False)

    try:
        if req.status >= 400:
            raise RuntimeError(f'failed to download {url}: HTTP {req.status}')

        # Only a 206 response actually honours the Range header. Any other
        # success status carries the whole body, so the partial file must be
        # overwritten instead of appended to.
        resuming = start_byte > 0 and req.status == 206
        if not resuming:
            start_byte = 0

        # Content-Length covers only the bytes of *this* response, so the
        # already-downloaded prefix has to be added to get the full size.
        content_length = req.headers.get('Content-Length')
        total_bytes = (start_byte + int(content_length)) if content_length is not None else None

        bytes_read = start_byte
        with open(out_file_path, 'ab' if resuming else 'wb') as f:
            while True:
                out = ''
                if total_bytes:  # also skips the percentage for empty bodies
                    out += f'[{math.floor((bytes_read / total_bytes) * 100)}%] '
                out += f'Downloading {url} ({humanbytes(bytes_read)})'
                print(out, end='\r')
                buf = req.read(chunk_size)
                if not buf:
                    break
                bytes_read += len(buf)
                f.write(buf)
        sys.stdout.write('\n')

        if total_bytes is not None and bytes_read < total_bytes:
            raise RuntimeError(
                f'failed to download {url}: connection closed after '
                f'{bytes_read} of {total_bytes} bytes'
            )
    finally:
        req.release_conn()

    os.replace(out_file_path, out_file_final_path)
    return out_file_final_path
# Signature of a callback that takes a path string and returns a path string
# or None.
# NOTE(review): not referenced elsewhere in the visible code; presumably meant
# as the type of extract_tar()'s update_path argument -- confirm.
type PathFilter = Callable[[str], str | None]
def extract_tar(tar: TarFile, dest, update_path=None, chunk_size=1024 * 50):
    """Extract the members of an open tar archive into the directory *dest*.

    Regular files are only (re)written when they are missing or older than
    the archive member; directories and symbolic links are always recreated.

    Args:
        tar: Archive opened for reading.
        dest: Directory that receives the extracted entries.
        update_path: Optional callable mapping a member name to the relative
            path it should be extracted to. Returning ``None`` skips the
            member. When omitted, member names are used unchanged.
        chunk_size: Number of bytes copied per read when writing files.

    Raises:
        ValueError: If a member would be written outside *dest* (absolute
            path, ``..`` components, or a path leading through a symbolic
            link that points elsewhere), or if the member is neither a
            regular file, a directory nor a symbolic link.
    """
    dest_path = Path(dest)
    root = dest_path.resolve()
    # Directory attributes are applied last: creating children changes the
    # directory's mtime, and a read-only mode would block writing into it.
    pending_dirs = []

    for f in tar:
        updated_path = f.name if update_path is None else update_path(f.name)
        if updated_path is None:
            continue

        out_path = Path(os.path.normpath(root / updated_path))
        if out_path == root:
            # Only a directory entry may map onto the destination itself.
            contained = f.isdir()
        else:
            # Resolve the parent rather than the entry: the entry itself may
            # be a symbolic link that is about to be replaced.
            contained = out_path.parent.resolve().is_relative_to(root)
        if not contained:
            raise ValueError(f'refusing to extract {f.name}: path escapes {dest_path}')

        if f.isfile():
            try:
                mtime = out_path.stat().st_mtime
            except FileNotFoundError:
                mtime = None
            if mtime is not None and f.mtime <= mtime:
                print(f"Skipping {f.name}", end='\r')
                continue
            print(f"Extracting {f.name}", end='\r')
            out_path.parent.mkdir(parents=True, exist_ok=True)
            source = tar.extractfile(f)
            if source is None:  # not expected for a regular file
                raise ValueError(f'unsupported tar entry for {f.name}')
            report_progress = f.size > 1024 * 1024 * 5
            with source, open(out_path, 'wb') as out:
                bytes_written = 0
                while True:
                    if report_progress:
                        progress = bytes_written / f.size
                        print(f"[{progress:.2f}] Extracting {f.name}", end='\r')
                    buf = source.read(chunk_size)
                    if not buf:
                        break
                    bytes_written += len(buf)
                    out.write(buf)
            os.utime(out_path, (f.mtime, f.mtime))
            os.chmod(out_path, f.mode)
        elif f.isdir():
            out_path.mkdir(parents=True, exist_ok=True)
            pending_dirs.append((out_path, f))
        elif f.issym():
            out_path.parent.mkdir(parents=True, exist_ok=True)
            # os.symlink() fails if anything already exists at the target.
            if out_path.is_symlink() or out_path.is_file():
                out_path.unlink()
            elif out_path.is_dir():
                shutil.rmtree(out_path)
            os.symlink(f.linkname, out_path)
        else:
            raise ValueError(f'unsupported tar entry for {f.name}')

    # Deepest directories first, so fixing up a child never disturbs the
    # timestamps already applied to its parent.
    for dir_path, member in sorted(pending_dirs, key=lambda entry: entry[0], reverse=True):
        os.utime(dir_path, (member.mtime, member.mtime))
        os.chmod(dir_path, member.mode)
# Type variable used by the generic helper functions of this module.
_T = TypeVar('_T')


def identity(x: _T) -> _T:
    """Return *x* unchanged."""
    return x
def nonnull(value: _T | None) -> _T:
    """Return *value* after checking that it is not ``None``.

    Raises:
        AssertionError: If *value* is ``None``. The exception is raised
            explicitly instead of through an ``assert`` statement so the
            check is not removed when Python runs with ``-O``.
    """
    if value is None:
        raise AssertionError('expected a value but got None')
    return value
def extract(filepath: str | Path, dest: str | Path, strip_path=0) -> None:
filepath = Path(filepath)
dest = Path(dest)
compression = []
def update_path(p: str) -> str | None:
chunks = p.split('/')
if len(chunks) > strip_path:
return os.path.sep.join(chunks[strip_path:])
with TemporaryDirectory('-extracted', prefix=filepath.stem) as d:
for ext in reversed(filepath.suffixes):
if ext == '.zip':
from zipfile import ZipFile
with ZipFile(filepath) as f:
f.extractall(d)
print(d)
break
elif ext == '.gz':
compression.append('gz')
elif ext == '.tar':
import tarfile
with tarfile.open(filepath, 'r') as tar:
extract_tar(tar, dest, update_path=update_path)
def is_extractable(filepath: Path) -> bool:
    """Tell whether the name of *filepath* denotes a zip or tar archive.

    The dot-separated extensions are examined from the end of the name:
    compression extensions (``xz``, ``gz``, ``bz``, ``bz2``, ``lz``) are
    skipped and the first other extension must be ``zip`` or ``tar``.
    Scanning from the end keeps dots earlier in the name, such as the version
    in ``tool-1.2.tar.gz``, from hiding the archive extension.
    """
    exts = filepath.name.split('.')[1:]
    for ext in reversed(exts):
        if ext in ('xz', 'gz', 'bz', 'bz2', 'lz'):
            continue
        return ext in ('zip', 'tar')
    return False
def mkdirp(filepath: Path) -> None:
    """Create the directory *filepath* together with any missing parents.

    Nothing happens when the directory already exists.
    """
    directory = Path(filepath)
    directory.mkdir(parents=True, exist_ok=True)
# Binary (power-of-1024) unit names, indexed by the exponent of 1024.
_UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']


def humanbytes(byte_count: int) -> str:
    """Format a byte count with a binary unit, e.g. ``'1.50 KiB'``.

    Zero is rendered as ``'0 B'``; every other value gets two decimals.
    Values beyond the largest known unit are expressed in that unit, and
    negative values keep their sign.
    """
    if byte_count == 0:
        return '0 B'
    # Integer arithmetic gives the exact power of 1024: a floating-point
    # logarithm can land just below a whole number at unit boundaries.
    exponent = (abs(int(byte_count)).bit_length() - 1) // 10
    i = max(0, min(exponent, len(_UNITS) - 1))
    return f'{(byte_count / pow(1024, i)):.2f} {_UNITS[i]}'
def rimraf(filepath: Path):
    """Remove *filepath*, whether it is a file, a symbolic link or a directory.

    Removal is best-effort: a missing path or a failure to delete is ignored.
    Symbolic links are removed themselves; their targets are left alone.

    Raises:
        RuntimeError: If *filepath* is the current working directory, the
            root of a drive, or the user's home directory.
    """
    # Compare absolute paths so relative spellings such as '.' cannot slip
    # past the guards. abspath() does not follow symbolic links.
    target = Path(os.path.abspath(filepath))
    if target == Path.cwd():
        raise RuntimeError(f'refusing to remove {filepath}: path is the current working directory')
    if target == Path(target.anchor):
        raise RuntimeError(f'refusing to remove {filepath}: path points to an entire drive')
    if target == Path.home():
        raise RuntimeError(f'refusing to remove {filepath}: path points to a home directory')

    if target.is_dir() and not target.is_symlink():
        shutil.rmtree(target, ignore_errors=True)
        return
    # shutil.rmtree() only handles directories, so files and links are
    # unlinked directly, ignoring errors just like ignore_errors=True above.
    try:
        target.unlink()
    except OSError:
        pass
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser()
parser.add_argument('url', metavar='URL', help="The resource to download")
parser.add_argument('dest', nargs='?', metavar='DEST', default='.', help="Target directory or file name")
parsed = parser.parse_args(argv)
url = urlparse(parsed.url)
basename = url.path.split('/')[-1]
dest = Path(basename if parsed.dest is None else parsed.dest)
if dest.is_dir():
dest /= basename
if dest.exists():
print(f'Error: file {dest} already exists')
return 1
filepath = download(url)
if is_extractable(filepath):
extract(filepath, strip_path=1, dest=dest)
else:
shutil.copy(filepath, dest)
return 0
# When executed as a script, run the command-line interface and use the
# value returned by main() as the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment