Asset fetcher script

# Standard library imports
import hashlib
import os
import platform
import re
import shutil
import sys
import tempfile
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from pathlib import Path

# Third-party imports
import requests

# Host configuration for the GitHub API
API_HOST = "https://api.github.com"

# DL_CHUNK_SIZE: 2 MiB balances network latency, TCP window scaling, and local
# storage write-throughput constraints for modern high-speed infrastructure.
DL_CHUNK_SIZE = 2 * 1024 * 1024  # 2 MiB

# HASH_CHUNK_SIZE: 256 KiB matches the private L2 cache of the Ivy Bridge Xeon
# processor family.
# NOTE: While 256 KiB fits in L2, 2 MiB is the recommended limit for staying within
# the L3 cache slice requirements for most legacy and modern Xeon processors.
HASH_CHUNK_SIZE = 256 * 1024  # 256 KiB

def format_size(bytes_size):
    """Converts a byte count into a human-readable string (MiB or KiB)."""
    if bytes_size >= (1024 * 1024):
        return f"{bytes_size / (1024 * 1024):.0f}MiB"
    return f"{bytes_size / 1024:.0f}KiB"

def get_supported_algorithm(prefix):
    """
    Accepts an algorithm prefix and returns the standard hashlib name if supported.
    This allows the script to map non-standard forms (like 'SHA-256' or 'sha_256')
    to the specific format required by hashlib ('sha256').
    """
    def normalize(s):
        return s.lower().replace("-", "").replace("_", "")
    # 1. Normalize the input prefix
    search_prefix = normalize(prefix)
    # 2. Build map of available hashlib algorithms
    algo_map = {normalize(a): a for a in hashlib.algorithms_available}
    # 3. Return the standard hashlib name or None
    return algo_map.get(search_prefix)
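
# Illustrative mappings (comments only): vendor-style labels are folded onto
# whatever names this interpreter's hashlib actually exposes, so exact results
# can vary by build.
#   get_supported_algorithm("SHA-256")  -> typically "sha256"
#   get_supported_algorithm("sha_512")  -> typically "sha512"
#   get_supported_algorithm("crc32")    -> None (not a hashlib algorithm)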

def parse_digest(digest_raw):
    """
    Helper to extract algorithm and hash from raw digest strings.
    Returns a tuple of (actual_algo, expected_hash, original_prefix).
    Defaults to 'sha256' if no colon separator is found.
    """
    prefix, expected_hash = digest_raw.split(":", 1) if ":" in digest_raw else ("sha256", digest_raw)
    actual_algo = get_supported_algorithm(prefix)
    return actual_algo, expected_hash, prefix
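
# Illustrative parses (comments only; the hash values are made up for documentation):
#   parse_digest("sha256:deadbeef")   -> ("sha256", "deadbeef", "sha256")
#   parse_digest("SHA-512:cafef00d")  -> ("sha512", "cafef00d", "SHA-512")
#   parse_digest("deadbeef")          -> ("sha256", "deadbeef", "sha256")  # no prefix, default applied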

def get_file_hash(file_path, algorithm_name):
    """Calculate file checksum using the specified algorithm and cache-aligned chunks."""
    hash_obj = hashlib.new(algorithm_name)
    with open(file_path, "rb") as f:
        # iter(lambda: ...) reads the file in chunks until EOF (represented by b"").
        # The lambda creates an anonymous function called by iter() repeatedly.
        # The sentinel b"" tells iter() to stop when f.read() returns an empty byte string at EOF.
        for byte_block in iter(lambda: f.read(HASH_CHUNK_SIZE), b""):
            hash_obj.update(byte_block)
    return hash_obj.hexdigest()
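
# Illustrative usage (comments only; the path is hypothetical):
#   get_file_hash(Path("some-asset.bin"), "sha256")  -> 64-character hex digest string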

def version_key(version_str):
    """Converts a version string into a sortable tuple of integers."""
    # Strip leading 'v' and split by non-digit characters to handle 1.2.3-alpha
    parts = re.findall(r'\d+', version_str)
    return tuple(map(int, parts))
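
# Illustrative keys (comments only): numeric tuples sort the way humans expect,
# unlike plain string comparison where "1.9.9" would sort above "1.10.5".
#   version_key("v1.10.5")     -> (1, 10, 5)
#   version_key("1.2.3-alpha") -> (1, 2, 3)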

def resolve_version(repo, version):
    """
    Finds the semantically latest matching tag (e.g., '1' -> '1.10.5')
    within the June 2025 digest support window.
    """
    if version.lower() == "latest":
        return "latest", f"{API_HOST}/repos/{repo}/releases/latest"
    base_v = version[1:] if version.lower().startswith('v') else version
    prefixes = (f"{base_v}.", f"v{base_v}.")
    # GitHub added native asset digests on June 3, 2025.
    # Releases before this date lack the 'digest' field.
    # The cutoff is timezone-aware so it can be compared with the parsed 'published_at' timestamps.
    digest_cutoff = datetime(2025, 6, 3, tzinfo=timezone.utc)
    url = f"{API_HOST}/repos/{repo}/releases?per_page=100"
    matches = []
    while url:
        response = requests.get(url)
        response.raise_for_status()
        releases = response.json()
        if not releases:
            break
        for rel in releases:
            tag = rel['tag_name']
            pub_date_str = rel.get("published_at")
            # Stop if the release predates digest support
            if pub_date_str:
                pub_date = datetime.fromisoformat(pub_date_str.replace("Z", "+00:00"))
                if pub_date < digest_cutoff:
                    url = None  # Break outer loop
                    break
            # Collect all matches in the allowed time window
            if tag == version or tag.startswith(prefixes):
                matches.append(rel)
        # When all releases are after the cutoff, proceed with the next page
        if url:
            url = response.links.get('next', {}).get('url')
    if matches:
        # Sort semantically using the version_key helper
        matches.sort(key=lambda r: version_key(r['tag_name']), reverse=True)
        best_match = matches[0]
        if best_match['tag_name'] != version:
            print(f"Notice: Resolved '{version}' to semantically latest '{best_match['tag_name']}'.")
        return best_match['tag_name'], best_match['url']
    # Fall back to a direct tag lookup if no matches were found in the window
    return version, f"{API_HOST}/repos/{repo}/releases/tags/{version}"
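
# Illustrative resolutions (comments only; concrete tags depend on the live
# repository state, so the values below are hypothetical):
#   resolve_version(repo, "latest") -> ("latest", ".../releases/latest") without scanning pages
#   resolve_version(repo, "1")      -> e.g. ("1.10.5", "<release API URL>") after scanning paginated releases
#   resolve_version(repo, "9.9.9")  -> falls back to the .../releases/tags/9.9.9 endpoint if nothing matches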

def replace_with_capped_backoff(staging_path: Path, final_path: Path, retries: int = 9):
    """
    Performs atomic replacement with a base-N exponential backoff.
    Sleep schedule between attempts (retries=9 gives at most 8 sleeps; the final
    failed attempt re-raises instead of sleeping):
    - Base 2: 1, 2, 4, 8, 16, 32, 64, 128s. (Never hits the 1200s cap.)
    - Base 3: 1, 3, 9, 27, 81, 243, 729, 1200s. (Hits cap at i=7.)
    - Base 4: 1, 4, 16, 64, 256, 1024, 1200, 1200s. (Hits cap at i=6.)
    """
    MAX_SLEEP = 60 * 20  # 20 minutes, expressed in seconds
    base = 2 + (time.monotonic_ns() % 3)
    for i in range(retries):
        try:
            # Atomic replacement (metadata-only on same filesystem)
            staging_path.replace(final_path)
            return True
        except PermissionError:
            if (1 + i) >= retries:
                raise
            time.sleep(min(base ** i, MAX_SLEEP))
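
# Illustrative usage (comments only; the paths are hypothetical): the retries cover
# the case where the destination is briefly locked, e.g. by an antivirus scanner or
# indexer on Windows, which surfaces as PermissionError from Path.replace().
#   replace_with_capped_backoff(Path("asset.bin.tmp.x1y2z3"), Path("asset.bin"))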

def download_and_verify(version):
    repo = "Brainicism/bgutil-ytdlp-pot-provider"
    # 1. Detect OS (mapped to repo asset naming conventions)
    os_name = platform.system().lower()
    if "darwin" == os_name:
        os_name = "macos"
    # Resolve the version using fuzzy patterns or use the latest endpoint
    resolved_v, api_url = resolve_version(repo, version)
    # Fetch the release metadata
    response = requests.get(api_url)
    # raise_for_status() checks the HTTP response code. If the request was
    # unsuccessful (e.g., 404 Not Found or 500 Server Error), it raises a
    # requests.exceptions.HTTPError, halting execution to prevent processing invalid data.
    response.raise_for_status()
    release_data = response.json()
    if "latest" == resolved_v:
        print(f"Found latest version: {release_data.get('tag_name')}")
    # 2. Identify the correct asset for the current OS
    asset = next((a for a in release_data.get("assets", [])
                  if os_name in a["name"].lower()), None)
    if not asset or "digest" not in asset:
        print(f"Error: Asset or digest missing for {os_name} in release {resolved_v}.")
        return
    # 3. Fuzzy-match the algorithm prefix via the helper function
    actual_algo, expected_hash, prefix = parse_digest(asset["digest"])
    if not actual_algo:
        print(f"Error: Algorithm '{prefix}' is not supported by this system's hashlib.")
        return
    # Case-sensitive conditional output for translation info
    trans_info = f" (translated from '{prefix}')" if actual_algo != prefix else ""
    print(f"Algorithm: {actual_algo.upper()}{trans_info}")
    print(f"Expected Hash: {expected_hash}")
    # 4. Download to a temporary file
    tmp_path = None
    dl_buffer_label = format_size(DL_CHUNK_SIZE)
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_path = Path(tmp_file.name)
        print(f"Downloading {asset['name']} (Buffer: {dl_buffer_label})...")
        with requests.get(asset["browser_download_url"], stream=True) as r:
            # Re-verify status for the binary download stream
            r.raise_for_status()
            last_modified = r.headers.get('Last-Modified')
            for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                tmp_file.write(chunk)
    if last_modified:
        # 1. Parse the remote date to a Unix timestamp
        remote_mtime = parsedate_to_datetime(last_modified).timestamp()
        # 2. Update the mtime after the file is closed. Path.touch() cannot set a
        #    specific timestamp, so os.utime() is used with (atime, mtime).
        os.utime(tmp_path, (remote_mtime, remote_mtime))
    staging_path = None
    try:
        # 5. Verify hash integrity
        hash_buffer_label = format_size(HASH_CHUNK_SIZE)
        print(f"Verifying {actual_algo.upper()} (Buffer: {hash_buffer_label})...")
        actual_hash = get_file_hash(tmp_path, actual_algo)
        if actual_hash != expected_hash:
            print(f"CRITICAL: {actual_algo.upper()} mismatch! Purging temporary file.")
            tmp_path.unlink(missing_ok=True)
            return
        # 6. Finalize: atomic replacement via a staging file in the target directory
        final_path = Path(asset["name"])
        target_dir = final_path.absolute().parent.resolve(strict=True)
        # Using NamedTemporaryFile guarantees a unique non-existing path
        # on the same partition, ensuring the subsequent replace is atomic.
        with tempfile.NamedTemporaryFile(dir=target_dir, prefix=f"{final_path}.tmp.", delete=False) as staging_file:
            staging_path = Path(staging_file.name)
        # Cross-device copy from system tmp to the destination-specific staging file
        shutil.copy2(tmp_path, staging_path)
        # Unlink (remove) the first temporary file immediately after a successful copy
        tmp_path.unlink(missing_ok=True)
        tmp_path = None  # Mark as finished to prevent cleanup in 'finally'
        # Set permissions (0o755 = rwxr-xr-x) on the staging file
        staging_path.chmod(0o755)
        # Atomic rename ensures the target file is never in a half-written state
        replace_with_capped_backoff(staging_path, final_path)
        staging_path = None  # Mark as finished to prevent cleanup in 'finally'
        print(f"Successfully verified and saved: {final_path}")
    finally:
        # Clean up any remaining temporary files on error
        if 'tmp_path' in locals() and tmp_path:
            tmp_path.unlink(missing_ok=True)
        if staging_path:
            staging_path.unlink(missing_ok=True)

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python3 install_asset.py <latest | version_tag>")
        sys.exit(1)
    else:
        # Standard error handling for network/API issues
        try:
            download_and_verify(sys.argv[1])
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print(f"Error 404: The version '{sys.argv[1]}' could not be resolved.")
            else:
                print(f"Network Error: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")