Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save JyotinderSingh/ee5aaa6a3b23b03fcaccbb831ea03006 to your computer and use it in GitHub Desktop.

Select an option

Save JyotinderSingh/ee5aaa6a3b23b03fcaccbb831ea03006 to your computer and use it in GitHub Desktop.
`Data()` Benchmarking
#!/usr/bin/env python3
"""One-off benchmark for Data.content_hash().
Creates temporary datasets on disk, then times content_hash().
Usage:
python bench_content_hash.py
python bench_content_hash.py --files 50000 --total-mb 500
"""
import argparse
import os
import shutil
import tempfile
import time
from keras_remote.data import Data
def create_dataset(base_dir: str, num_files: int, total_bytes: int) -> str:
    """Create a flat directory with num_files files totaling ~total_bytes.

    Every file has the same size (total_bytes // num_files, minimum 1 byte),
    so the real total can differ slightly from total_bytes due to integer
    division.

    Args:
        base_dir: Existing directory under which the dataset is created.
        num_files: Number of files to write.
        total_bytes: Target combined size of all files, in bytes.

    Returns:
        Absolute/relative path of the created "dataset" directory.
    """
    dataset_dir = os.path.join(base_dir, "dataset")
    os.makedirs(dataset_dir)
    file_size = max(1, total_bytes // num_files)
    # Pre-generate one block of random data and reuse it (we're benchmarking
    # hashing speed, not randomness). Cap at 64 KiB to bound memory.
    block = os.urandom(min(file_size, 65536))
    for i in range(num_files):
        fpath = os.path.join(dataset_dir, f"file_{i:07d}.bin")
        with open(fpath, "wb") as f:
            remaining = file_size
            while remaining > 0:
                # block[:remaining] yields min(len(block), remaining) bytes,
                # so the last chunk is truncated to the exact size needed.
                chunk = block[:remaining]
                f.write(chunk)
                remaining -= len(chunk)
    return dataset_dir
def time_it(fn, *args, warmup: int = 1, runs: int = 3):
    """Time fn(*args) over multiple runs and return (median_seconds, result).

    Args:
        fn: Callable to benchmark.
        *args: Positional arguments forwarded to fn on every call.
        warmup: Number of untimed warm-up calls (populates caches). Note that
            with warmup=0 and runs=0 there is no call at all and `result`
            would be unbound; callers are expected to use at least one run.
        runs: Number of timed calls; the median of their durations is
            reported.

    Returns:
        Tuple of (median_seconds, result) where result comes from the last
        invocation of fn.
    """
    for _ in range(warmup):
        result = fn(*args)
    times = []
    for _ in range(runs):
        t0 = time.perf_counter()
        result = fn(*args)
        times.append(time.perf_counter() - t0)
    times.sort()
    # Middle element of the sorted durations (upper-middle for even counts).
    median = times[len(times) // 2]
    return median, result
def format_size(nbytes: int) -> str:
    """Format a byte count as a human-readable binary-unit string.

    Uses binary thresholds (1 KB = 1024 B) with one decimal place for
    KB/MB/GB and the raw integer for anything under 1 KB.

    Args:
        nbytes: Non-negative byte count.

    Returns:
        String such as "512 B", "1.5 KB", "500.0 MB", or "2.0 GB".
    """
    if nbytes >= 1 << 30:
        return f"{nbytes / (1 << 30):.1f} GB"
    if nbytes >= 1 << 20:
        return f"{nbytes / (1 << 20):.1f} MB"
    if nbytes >= 1 << 10:
        return f"{nbytes / (1 << 10):.1f} KB"
    return f"{nbytes} B"
def run_benchmark(num_files: int, total_mb: int, runs: int):
    """Create a throwaway dataset, time Data.content_hash() on it, and print.

    Prints a banner with the scenario parameters, the dataset-creation time,
    and the median hashing time plus throughput in MB/s. The temporary
    dataset directory is always removed, even if hashing fails.

    Args:
        num_files: Number of files in the generated dataset.
        total_mb: Total dataset size in MB.
        runs: Number of timed runs handed to time_it (median reported).
    """
    total_bytes = total_mb * 1024 * 1024
    file_size = total_bytes // num_files
    print(f"\n{'=' * 60}")
    print(f" {num_files:,} files x {format_size(file_size)} each "
          f"= {format_size(total_bytes)} total")
    print(f" CPUs: {os.cpu_count()}, "
          f"runs: {runs} (median reported)")
    print(f"{'=' * 60}")
    tmpdir = tempfile.mkdtemp(prefix="bench_hash_")
    try:
        print(" Creating dataset...", end=" ", flush=True)
        t0 = time.perf_counter()
        dataset_dir = create_dataset(tmpdir, num_files, total_bytes)
        print(f"done ({time.perf_counter() - t0:.1f}s)")
        d = Data(dataset_dir)
        print(" Hashing...", end=" ", flush=True)
        # content_hash() is the method under benchmark; h is assumed to be a
        # hex digest string (first 12 chars shown) — confirm against Data.
        elapsed, h = time_it(d.content_hash, warmup=1, runs=runs)
        throughput = total_bytes / elapsed / (1 << 20)
        print(f"{elapsed:.3f}s ({throughput:.0f} MB/s) hash={h[:12]}")
    finally:
        # Best-effort cleanup; ignore_errors avoids masking benchmark errors.
        shutil.rmtree(tmpdir, ignore_errors=True)
def main():
    """Parse CLI flags and run either one custom scenario or the presets.

    With --files, a single benchmark runs using --total-mb and --runs.
    Without it, the hard-coded preset scenarios below are executed
    (smaller presets are kept commented out for quick re-enabling).
    """
    parser = argparse.ArgumentParser(
        description="Benchmark Data.content_hash() performance"
    )
    parser.add_argument(
        "--files", type=int, default=None,
        help="Number of files (overrides preset scenarios)",
    )
    parser.add_argument(
        "--total-mb", type=int, default=500,
        help="Total dataset size in MB (default: 500)",
    )
    parser.add_argument(
        "--runs", type=int, default=3,
        help="Number of timed runs per method (default: 3)",
    )
    args = parser.parse_args()
    print(f"System: {os.cpu_count()} CPUs")
    if args.files is not None:
        run_benchmark(args.files, args.total_mb, args.runs)
    else:
        scenarios = [
            # (num_files, total_mb)
            # (10, 500),          # Few large files
            # (1_000, 500),       # Medium count
            # (50_000, 500),      # Many small files
            # (200_000, 500),     # Lots of small files
            (1_000_000, 20000),   # Extreme case
        ]
        for num_files, total_mb in scenarios:
            run_benchmark(num_files, total_mb, args.runs)
    print()
# Script entry point: only run the benchmark when executed directly,
# not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment