Skip to content

Instantly share code, notes, and snippets.

@alek3y
Last active March 11, 2026 17:00
Show Gist options
  • Select an option

  • Save alek3y/f9f67acfb98f70a5f7585a914fbf307a to your computer and use it in GitHub Desktop.

Select an option

Save alek3y/f9f67acfb98f70a5f7585a914fbf307a to your computer and use it in GitHub Desktop.
Downloads and computes SHA256 hashes for a snapshot of the least populated entries at https://hash.minerva-archive.org
import requests
import csv
import os
import sys
import json
import random
import hashlib
import signal
import string
import queue
import atexit
from threading import Thread
from math import log2
# NOTE: The data is from https://discord.com/channels/1476739093982613686/1476739094603239458/1480711193759973458 by @haruka_ff at 2026-03-09 23:37 and md5 24d1d5e60bb5bbfb670f9fa53ad08943
DB_FILE = "minerva_public_file.tsv" # Input TSV snapshot; columns 1-3 hold path, count and size
CHECKSUMS_FILE = "checksums.sha256" # Output, appended in sha256sum format: "<digest> <path>"
DOWNLOAD_URL = "https://myrient.erista.me/files" # Base URL the TSV paths are joined onto
THREADS = 4 # Number of concurrent download/hash workers
MAX_SIZE = 4*1024*1024*1024 # Exclusive upper bound on file size considered for download
CHUNK_SIZE = 256*1024 # Bytes streamed per hash update
CACHE_FILE = "cache.json" # JSON list of already-processed paths, persisted across runs
PROGRESS_DELAY = 0.2 # Seconds between screen redraws while no result arrives
STEEPNESS_FACTOR = 6 # Decides how much more likely jobs with lower counts should be
SPINNER = "⠋⠙⠹⠼⠴⠦⠧⠏" # See https://web.archive.org/web/20220714023717/https://antofthy.gitlab.io/info/ascii/HeartBeats_howto.txt
def human_size(size):
    """Format a byte count as a short human-readable string.

    Examples: 0 -> "0B", 512 -> "512B", 1536 -> "1.50KB".
    Sub-kilobyte values are printed as plain integers; larger values get
    two decimals in the matching binary unit.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    exponent = int(log2(size) / 10) if size > 0 else 0
    value = round(size / 1024**exponent, 2)
    if exponent > 0:
        text = f"{value:.2f}"
    else:
        text = str(int(value))
    return f"{text}{units[exponent]}"
def worker(job_queue, sums_queue, callback=lambda job, progress: ()):
    """Download loop run by each hashing thread.

    Forever takes (path, size, count) jobs off job_queue, streams the file
    from DOWNLOAD_URL while hashing it, and puts (path, hex_digest) onto
    sums_queue -- or (path, None) when the request fails.
    """
    while True:
        job = job_queue.get()
        # Acknowledge immediately: the generator blocks on join() and should
        # be free to queue the next job while this one is still downloading.
        job_queue.task_done()
        callback(job, 0)
        path = job[0]
        try:
            hasher = hashlib.sha256()
            received = 0
            with requests.get(f"{DOWNLOAD_URL}/{path}", stream=True) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                    hasher.update(chunk)
                    received += len(chunk)
                    try:
                        callback(job, received)
                    except Exception:
                        pass  # Progress reporting is best-effort only
            sums_queue.put((path, hasher.hexdigest()))
        except requests.exceptions.RequestException:
            sums_queue.put((path, None))
def generator(job_queue, files, weights, cache=None):
    """Forever feed job_queue with weighted-random jobs not yet processed.

    Blocks on job_queue.join() until every queued job has been acknowledged,
    then draws one entry from files (weighted by weights) and re-draws until
    it finds a path not in cache before queueing it.

    cache defaults to None (treated as an empty set). The original used a
    mutable default argument (cache=set()), a well-known Python pitfall;
    behavior is unchanged since the default was only ever read.
    """
    if cache is None:
        cache = set()
    while True:
        job_queue.join()
        while True:
            job = random.choices(files, weights=weights, k=1)[0]
            if job[0] not in cache:
                job_queue.put(job)
                break
def show(path, info, prefix=""):
    """Draw one progress line: spinner, job count, downloaded/total, path.

    The path is clipped so the line never wraps; prefix (e.g. strikethrough)
    is re-applied before every field because each color reset clears it.
    A None spinner index renders a blank spinner cell.
    """
    columns = os.get_terminal_size().columns
    color = "36" if info["count"] >= 5 else "33"
    count = str(info["count"]).rjust(2)
    progress = human_size(info["progress"]).rjust(8) # Length of "999.99MB"
    size = human_size(info["size"]).rjust(8)
    spinner = " " if info["spinner"] == None else SPINNER[info["spinner"]]
    # Visible (escape-free) width of everything before the path, for clipping
    visible = f"{spinner} {count}, {progress} / {size} - "
    path = path[:columns-len(visible)]
    sys.stdout.write(
        f"{spinner} "
        f"{prefix}\033[{color}m{count}\033[0m"
        f"{prefix}, \033[35m{progress}\033[0m"
        f"{prefix} / \033[35m{size}\033[0m"
        f"{prefix} - \033[90m{path}\033[0m"
        "\033[0K\n"
    )
    sys.stdout.flush()
if __name__ == "__main__":
    # NOTE(review): this copy lost its leading whitespace; the nesting below is
    # reconstructed from context and should be verified against the original
    # (in particular which statements sit inside each "if success:" branch).

    # Append-only so checksums from previous runs are preserved.
    checksums = open(CHECKSUMS_FILE, "a")

    # Paths already handled (successes and failures) in earlier runs.
    cache = set()
    if os.path.isfile(CACHE_FILE):
        with open(CACHE_FILE, "r") as file:
            cache = set(json.load(file))

    reader = None
    with open(DB_FILE, "r") as file:
        reader = csv.reader(file.readlines(), delimiter="\t")

    # Build (path, size, count) job tuples from TSV columns 1-3.
    files = []
    for entry in reader:
        path = entry[1]
        count = int(entry[2])
        size = int(entry[3])
        files.append((path, size, count))

    files = filter(lambda file: file[0] not in cache, files) # Helps with job selection on generator
    files = filter(lambda file: 0 < file[1] and file[1] < MAX_SIZE, files)
    files = sorted(files, key=lambda file: (file[2], file[1]))

    # Number of entries sharing each count, used to normalize the weights.
    counts = {}
    for [_, _, count] in files:
        counts.setdefault(count, 0)
        counts[count] += 1

    # Weight entries so smaller files with lower counts are far more likely
    # to be picked; STEEPNESS_FACTOR sharpens the bias.
    max_size = max(file[1] for file in files)
    max_count = max(file[2] for file in files)
    weights = [
        1/counts[file[2]] * (
            (max_size-file[1])/max_size * (max_count-file[2])/max_count
        )**STEEPNESS_FACTOR
        for file in files
    ]

    job_queue = queue.Queue()
    sums_queue = queue.Queue()

    # Latest progress snapshot per in-flight path; written by worker threads
    # via this callback, read by the redraw loop below.
    reports = {}
    def report(job, progress):
        (path, size, count) = job
        reports[path] = {
            "count": count,
            "progress": progress,
            "size": size,
            "spinner": reports.get(path, {}).get("spinner", 0)
        }

    threads = []
    # Rebinds the name "generator" from the function to its thread.
    generator = Thread(target=generator, args=(job_queue, files, weights, cache))
    generator.start()
    for _ in range(THREADS):
        thread = Thread(target=worker, args=(job_queue, sums_queue), kwargs={"callback": report})
        thread.start()
        threads.append(thread)

    # Hide the terminal cursor while drawing; restore it on exit.
    sys.stdout.write("\033[?25l")
    atexit.register(lambda: sys.stdout.write("\033[?25h"))

    last_shown = None
    successes = 0
    failures = 0
    while True:
        # Wait for a finished download, but redraw at least every PROGRESS_DELAY.
        try:
            (path, checksum) = sums_queue.get(timeout=PROGRESS_DELAY)
        except queue.Empty:
            (path, checksum) = (None, None)

        # Jump back to the first progress line drawn on the previous pass.
        if last_shown != None and last_shown > 0:
            sys.stdout.write(f"\033[{last_shown}A")
        sys.stdout.write("\r")
        sys.stdout.flush()

        if path != None:
            # Workers report None on failure; accept only a well-formed
            # 64-character hex SHA256 digest.
            success = (
                checksum != None
                and len(checksum) == 64
                and all(symbol in string.hexdigits for symbol in checksum)
            )

            # Ignore Ctrl-C while the checksum and cache files are written so
            # neither can be corrupted mid-write.
            sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
            if success:
                checksums.write(f"{checksum} {path}\n")
                checksums.flush()
            # Failures are cached too, so they are not retried next run.
            cache.add(path)
            with open(CACHE_FILE, "w") as file:
                json.dump(list(cache), file)
            signal.signal(signal.SIGINT, sigint_handler)

            if success:
                successes += 1
            else:
                failures += 1

            # Print the finished entry once (struck through when failed); it
            # scrolls away above the region redrawn each pass.
            info = reports.pop(path)
            info["spinner"] = None
            show(path, info, prefix=("" if success else "\033[9m"))

        shown = list(reports.items())[::-1] # Flipped so oldest are at the bottom
        last_shown = len(shown)
        for (path, info) in shown:
            show(path, info)
            info["spinner"] = (info["spinner"]+1) % len(SPINNER)

        # Status line carries no newline so the next pass overwrites it in place.
        sys.stdout.write(f"\033[90mSuccess: \033[32m{successes} ")
        sys.stdout.write(f"\033[90mFail: \033[31m{failures}\033[0m")
        sys.stdout.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment