@tcely
Last active January 21, 2026 18:06
Asset fetcher script
# Standard library imports
import hashlib
import os
import platform
import re
import shutil
import sys
import tempfile
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path

# Third-party imports
import requests

# Host configuration for the GitHub API
API_HOST = "https://api.github.com"

# DL_CHUNK_SIZE: 2 MiB balances network latency, TCP window scaling, and local storage
# write-throughput constraints for modern high-speed infrastructure.
DL_CHUNK_SIZE = (1024 * 1024) * 2  # 2 MiB, in bytes

# HASH_CHUNK_SIZE: 256 KiB matches the private L2 cache of the Ivy Bridge Xeon processor family.
# NOTE: While 256 KiB fits in L2, 2 MiB is the recommended limit for staying within
# the L3 cache slice requirements for most legacy and modern Xeon processors.
HASH_CHUNK_SIZE = 1024 * 256  # 256 KiB, in bytes

def format_size(bytes_size):
    """Converts a byte count into a human-readable string (MiB or KiB)."""
    if bytes_size >= (1024 * 1024):
        return f"{bytes_size / (1024 * 1024):.0f}MiB"
    return f"{bytes_size / 1024:.0f}KiB"

def get_supported_algorithm(prefix):
    """
    Accepts an algorithm prefix and returns the standard hashlib name if supported.
    This allows the script to map non-standard forms (like 'SHA-256' or 'sha_256')
    to the specific format required by hashlib ('sha256').
    """
    def normalize(s):
        return s.lower().replace("-", "").replace("_", "")

    # 1. Normalize the input prefix
    search_prefix = normalize(prefix)
    # 2. Build map of available hashlib algorithms
    algo_map = {normalize(a): a for a in hashlib.algorithms_available}
    # 3. Return the standard hashlib name or None
    return algo_map.get(search_prefix)
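
# Illustrative examples (inputs assumed; exact results depend on the local hashlib build):
#   get_supported_algorithm("SHA-256") -> "sha256"
#   get_supported_algorithm("sha_512") -> "sha512"
#   get_supported_algorithm("unknown") -> None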

def parse_digest(digest_raw):
    """
    Helper to extract algorithm and hash from raw digest strings.
    Returns a tuple of (actual_algo, expected_hash, original_prefix).
    Defaults to 'sha256' if no colon separator is found.
    """
    prefix, expected_hash = digest_raw.split(":", 1) if ":" in digest_raw else ("sha256", digest_raw)
    actual_algo = get_supported_algorithm(prefix)
    return actual_algo, expected_hash, prefix
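
# Illustrative examples (hash values shortened for display):
#   parse_digest("sha256:9f86d08...") -> ("sha256", "9f86d08...", "sha256")
#   parse_digest("9f86d08...")        -> ("sha256", "9f86d08...", "sha256")  # no prefix, default applies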

def get_file_hash(file_path, algorithm_name):
    """Calculate file checksum using the specified algorithm and cache-aligned chunks."""
    hash_obj = hashlib.new(algorithm_name)
    with open(file_path, "rb") as f:
        # iter(lambda: ...) reads the file in chunks until EOF (represented by b"").
        # The lambda creates an anonymous function called by iter() repeatedly.
        # The sentinel b"" tells iter() to stop when f.read() returns an empty byte string at EOF.
        for byte_block in iter(lambda: f.read(HASH_CHUNK_SIZE), b""):
            hash_obj.update(byte_block)
    return hash_obj.hexdigest()
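
# The iter()/sentinel pattern above is equivalent to this more explicit loop (illustrative only):
#   while True:
#       byte_block = f.read(HASH_CHUNK_SIZE)
#       if byte_block == b"":
#           break
#       hash_obj.update(byte_block)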

def version_key(version_str):
    """Converts a version string into a sortable tuple of integers."""
    # Extract the digit runs, which drops a leading 'v' and pre-release
    # suffixes such as '-alpha' (e.g. '1.2.3-alpha' -> 1, 2, 3).
    parts = re.findall(r'\d+', version_str)
    return tuple(map(int, parts))
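
# Illustrative examples:
#   version_key("v1.10.5")     -> (1, 10, 5)
#   version_key("1.2.3-alpha") -> (1, 2, 3)
# Note: (1, 10, 5) sorts above (1, 2, 3), unlike a plain string comparison
# where "1.10.5" would sort below "1.2.3".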

def resolve_version(repo, version):
    """
    Finds the semantically latest matching tag (e.g., '1' -> '1.10.5')
    within the June 2025 digest support window.
    """
    if version.lower() == "latest":
        return "latest", f"{API_HOST}/repos/{repo}/releases/latest"

    base_v = version[1:] if version.lower().startswith('v') else version
    prefixes = (f"{base_v}.", f"v{base_v}.")

    # GitHub added native asset digests on June 3, 2025.
    # Releases before this date lack the 'digest' field.
    # The cutoff is timezone-aware (UTC) so it can be compared against the
    # aware datetimes parsed from the API's 'published_at' timestamps.
    digest_cutoff = datetime(2025, 6, 3, tzinfo=timezone.utc)

    url = f"{API_HOST}/repos/{repo}/releases?per_page=100"
    matches = []
    while url:
        response = requests.get(url)
        response.raise_for_status()
        releases = response.json()
        if not releases:
            break
        for rel in releases:
            tag = rel['tag_name']
            pub_date_str = rel.get("published_at")
            # Stop if the release predates digest support
            if pub_date_str:
                pub_date = datetime.fromisoformat(pub_date_str.replace("Z", "+00:00"))
                if pub_date < digest_cutoff:
                    url = None  # Break outer loop
                    break
            # Collect all matches in the allowed time window
            if tag == version or tag.startswith(prefixes):
                matches.append(rel)
        # When all releases are after the cutoff, proceed with the next page
        if url:
            url = response.links.get('next', {}).get('url')

    if matches:
        # Sort semantically using the version_key helper
        matches.sort(key=lambda r: version_key(r['tag_name']), reverse=True)
        best_match = matches[0]
        if best_match['tag_name'] != version:
            print(f"Notice: Resolved '{version}' to semantically latest '{best_match['tag_name']}'.")
        return best_match['tag_name'], best_match['url']

    # Fall back to a direct tag lookup if no matches were found in the window
    return version, f"{API_HOST}/repos/{repo}/releases/tags/{version}"

def replace_with_capped_backoff(staging_path: Path, final_path: Path, retries: int = 9):
    """
    Performs atomic replacement with a base-N exponential backoff.
    Expected outcomes (retries=9):
      - Base 2: 1, 2, 4, 8, 16, 32, 64, 128, 256s. (Never hits 1200s cap.)
      - Base 3: 1, 3, 9, 27, 81, 243, 729, 1200s, 1200s. (Hits cap at i=7.)
      - Base 4: 1, 4, 16, 64, 256, 1024, 1200s, 1200s, 1200s. (Hits cap at i=6.)
    The final failed attempt re-raises instead of sleeping, so at most
    retries - 1 waits actually occur.
    """
    MAX_SLEEP = 60 * 20  # 20 minutes, in seconds
    base = 2 + (time.monotonic_ns() % 3)
    for i in range(retries):
        try:
            # Atomic replacement (metadata-only on the same filesystem)
            staging_path.replace(final_path)
            return True
        except PermissionError:
            if (1 + i) >= retries:
                raise
            time.sleep(min(base ** i, MAX_SLEEP))
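
# Illustrative timing: with base 3 and a persistent PermissionError (for example,
# another process holding the target file open on Windows), the call sleeps
# 1, 3, 9, 27, 81, 243, 729, then 1200 seconds (capped) across 8 waits before
# the 9th attempt re-raises the error.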

def download_and_verify(version):
    repo = "Brainicism/bgutil-ytdlp-pot-provider"

    # 1. Detect OS (mapped to repo asset naming conventions)
    os_name = platform.system().lower()
    if "darwin" == os_name:
        os_name = "macos"

    # Resolve the version using fuzzy patterns or use the latest endpoint
    resolved_v, api_url = resolve_version(repo, version)

    # Fetch the release metadata
    response = requests.get(api_url)
    # raise_for_status() checks the HTTP response code. If the request was
    # unsuccessful (e.g., 404 Not Found or 500 Server Error), it raises a
    # requests.exceptions.HTTPError, halting execution to prevent processing invalid data.
    response.raise_for_status()
    release_data = response.json()
    if "latest" == resolved_v:
        print(f"Found latest version: {release_data.get('tag_name')}")

    # 2. Identify the correct asset for the current OS
    asset = next((a for a in release_data.get("assets", [])
                  if os_name in a["name"].lower()), None)
    if not asset or "digest" not in asset:
        print(f"Error: Asset or digest missing for {os_name} in release {resolved_v}.")
        return

    # 3. Fuzzy-match the algorithm prefix via the helper function
    actual_algo, expected_hash, prefix = parse_digest(asset["digest"])
    if not actual_algo:
        print(f"Error: Algorithm '{prefix}' is not supported by this system's hashlib.")
        return

    # Case-sensitive comparison: report a translation whenever the hashlib name
    # differs from the original digest prefix
    trans_info = f" (translated from '{prefix}')" if actual_algo != prefix else ""
    print(f"Algorithm: {actual_algo.upper()}{trans_info}")
    print(f"Expected Hash: {expected_hash}")

    # 4. Download to a temporary file
    tmp_path = None
    dl_buffer_label = format_size(DL_CHUNK_SIZE)
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_path = Path(tmp_file.name)
        print(f"Downloading {asset['name']} (Buffer: {dl_buffer_label})...")
        with requests.get(asset["browser_download_url"], stream=True) as r:
            # Re-verify status for the binary download stream
            r.raise_for_status()
            last_modified = r.headers.get('Last-Modified')
            for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                tmp_file.write(chunk)

    if last_modified:
        # Parse the remote Last-Modified date to a Unix timestamp and apply it
        # to the temporary file. Path.touch() has no 'mtime' parameter, so
        # os.utime is used here to set both atime and mtime.
        remote_mtime = parsedate_to_datetime(last_modified).timestamp()
        os.utime(tmp_path, (remote_mtime, remote_mtime))

    staging_path = None
    try:
        # 5. Verify hash integrity
        hash_buffer_label = format_size(HASH_CHUNK_SIZE)
        print(f"Verifying {actual_algo.upper()} (Buffer: {hash_buffer_label})...")
        actual_hash = get_file_hash(tmp_path, actual_algo)
        if actual_hash != expected_hash:
            print(f"CRITICAL: {actual_algo.upper()} mismatch! Purging temporary file.")
            tmp_path.unlink(missing_ok=True)
            return

        # 6. Finalize: atomic replacement via a staging file in the target directory
        final_path = Path(asset["name"])
        target_dir = final_path.absolute().parent.resolve(strict=True)
        # Using NamedTemporaryFile guarantees a unique non-existing path
        # on the same partition, ensuring the subsequent replace is atomic.
        with tempfile.NamedTemporaryFile(dir=target_dir, prefix=f"{final_path}.tmp.", delete=False) as staging_file:
            staging_path = Path(staging_file.name)

        # Cross-device copy from system tmp to the destination-specific staging file
        shutil.copy2(tmp_path, staging_path)
        # Unlink (remove) the first temporary file immediately after a successful copy
        tmp_path.unlink(missing_ok=True)
        tmp_path = None  # Mark as finished to prevent cleanup in 'finally'

        # Set permissions (0o755 = rwxr-xr-x) on the staging file
        staging_path.chmod(0o755)
        # Atomic rename ensures the target file is never in a half-written state
        replace_with_capped_backoff(staging_path, final_path)
        staging_path = None  # Mark as finished to prevent cleanup in 'finally'
        print(f"Successfully verified and saved: {final_path}")
    finally:
        # Cleanup any remaining temporary files on error
        if tmp_path:
            tmp_path.unlink(missing_ok=True)
        if staging_path:
            staging_path.unlink(missing_ok=True)

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python3 install_asset.py <latest | version_tag>")
        sys.exit(1)
    else:
        # Standard error handling for network/API issues
        try:
            download_and_verify(sys.argv[1])
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print(f"Error 404: The version '{sys.argv[1]}' could not be resolved.")
            else:
                print(f"Network Error: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")