Skip to content

Instantly share code, notes, and snippets.

@annawoodard
Created November 11, 2025 01:50
Show Gist options
  • Select an option

  • Save annawoodard/633082d7a70197b8ecfa73a61969d007 to your computer and use it in GitHub Desktop.

Select an option

Save annawoodard/633082d7a70197b8ecfa73a61969d007 to your computer and use it in GitHub Desktop.
import os
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Callable, Sequence
# Character width of the ASCII progress bar drawn in progress log lines.
PROGRESS_BAR_WIDTH = 28

# Public API of this module; the leading-underscore helpers are internal.
__all__ = ["parallel_rsync_copy"]
def _format_interval(seconds: float) -> str:
seconds = max(0, int(round(seconds)))
minutes, sec = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours:d}:{minutes:02d}:{sec:02d}"
return f"{minutes:02d}:{sec:02d}"
def _format_progress_bar(fraction: float, width: int = PROGRESS_BAR_WIDTH) -> str:
    """Return an ASCII bar such as ``[####----]`` for a fraction in [0, 1].

    Out-of-range fractions are clamped before rendering.
    """
    clamped = min(1.0, max(0.0, float(fraction)))
    n_filled = min(width, int(round(clamped * width)))
    n_empty = max(0, width - n_filled)
    return "[{}{}]".format("#" * n_filled, "-" * n_empty)
def _format_gb(num_bytes: float | int) -> str:
gb = float(num_bytes) / 1_000_000_000
return f"{gb:.1f} GB"
def _dir_size_bytes(path: Path) -> int:
if not path.exists():
return 0
try:
if path.is_file():
return path.stat().st_size
except OSError:
return 0
total = 0
for dirpath, _, filenames in os.walk(path):
for filename in filenames:
try:
total += os.path.getsize(os.path.join(dirpath, filename))
except OSError:
continue
return total
def _rsync_base_cmd(delete: bool) -> list[str]:
cmd = [
"rsync",
"-a",
"--partial",
"--partial-dir=.rsync-partial",
"--whole-file",
]
if delete:
cmd.append("--delete")
return cmd
def _run_rsync(cmd: list[str]) -> None:
    """Run an rsync command, raising ``CalledProcessError`` on non-zero exit."""
    subprocess.run(cmd, check=True)
def parallel_rsync_copy(
    *,
    src_root: Path,
    dest_root: Path,
    entries: Sequence[Path],
    workers: int = 1,
    log: Callable[[str], None] = print,
    log_prefix: str = "[parallel-rsync]",
    progress_interval: float = 5.0,
    delete_dirs: bool = True,
) -> None:
    """
    Copy multiple relative paths from src_root to dest_root in parallel using rsync.

    Args:
        src_root: Source directory that owns the relative entries.
        dest_root: Target directory to mirror.
        entries: Iterable of relative paths (files or directories) to copy.
        workers: Maximum concurrent rsync processes.
        log: Logging callable (defaults to print).
        log_prefix: Prefix attached to helper log lines.
        progress_interval: Minimum seconds between progress updates.
        delete_dirs: When True, directory copies include `--delete` so the
            destination folder mirrors the source by removing stale files.

    Raises:
        RuntimeError: If any entry failed to copy. All entries are attempted
            first; the error message lists every failure and is chained from
            the first underlying exception.
    """

    # Prefix every log line so interleaved output is attributable to this helper.
    def emit(message: str) -> None:
        log(f"{log_prefix} {message}")

    rel_entries = [Path(entry) for entry in entries]
    if not rel_entries:
        emit("no entries to copy; skipping parallel rsync.")
        return
    # Guard against zero/negative worker counts from callers.
    workers = max(1, workers)
    label = f"{src_root} -> {dest_root}"
    emit(
        f"parallel rsync fan-out: {len(rel_entries)} entries "
        f"with {workers} workers ({label})"
    )
    # Pre-compute per-entry byte sizes up front so progress can be reported
    # in bytes rather than entry counts (falls back to counts if all zero).
    entry_sizes: dict[str, int] = {}
    total_entry_bytes = 0
    for rel in rel_entries:
        abs_entry = src_root / rel
        rel_key = rel.as_posix()
        size_bytes = _dir_size_bytes(abs_entry)
        entry_sizes[rel_key] = size_bytes
        total_entry_bytes += size_bytes
    base_dir_cmd = _rsync_base_cmd(delete_dirs)
    # Single-file copies never use --delete; it only applies to directory mirrors.
    base_file_cmd = _rsync_base_cmd(delete=False)

    def _sync_entry(rel_path: Path) -> None:
        # Copy one entry. Directories use rsync's trailing-slash form
        # ("contents of src into dest"); files are copied into the parent dir.
        abs_src = src_root / rel_path
        abs_dest = dest_root / rel_path
        if abs_src.is_dir():
            abs_dest.mkdir(parents=True, exist_ok=True)
            cmd = list(base_dir_cmd)
            cmd.extend([f"{abs_src}/", f"{abs_dest}/"])
            _run_rsync(cmd)
            return
        abs_dest.parent.mkdir(parents=True, exist_ok=True)
        cmd = list(base_file_cmd)
        cmd.extend([str(abs_src), f"{abs_dest.parent}/"])
        _run_rsync(cmd)

    errors: list[tuple[Path, Exception]] = []
    start = time.monotonic()
    completed_bytes = 0
    completed_entries = 0
    last_progress_log = start
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(_sync_entry, entry): entry for entry in rel_entries}
        for future in as_completed(futures):
            rel = futures[future]
            try:
                future.result()
            except Exception as exc:
                # Collect the failure and keep copying remaining entries;
                # failures are raised together after the fan-out finishes.
                errors.append((rel, exc))
                continue
            completed_entries += 1
            completed_bytes += entry_sizes.get(rel.as_posix(), 0)
            now = time.monotonic()
            # Throttle progress output to progress_interval, but always log
            # once the final entry completes.
            if (
                now - last_progress_log
            ) < progress_interval and completed_entries < len(rel_entries):
                continue
            last_progress_log = now
            elapsed = now - start
            if total_entry_bytes > 0:
                fraction = completed_bytes / max(total_entry_bytes, 1)
                pct = min(100.0, fraction * 100.0)
                bar = _format_progress_bar(fraction)
                # ETA: scale elapsed time by remaining/completed byte ratio.
                eta = _format_interval(
                    elapsed
                    * max(0.0, total_entry_bytes - completed_bytes)
                    / max(completed_bytes, 1)
                )
                emit(
                    f"progress {bar} {pct:5.1f}% "
                    f"{_format_gb(completed_bytes)}/"
                    f"{_format_gb(total_entry_bytes)} "
                    f"elapsed={_format_interval(elapsed)} ETA≈{eta}"
                )
            else:
                # No byte totals available (e.g. every entry measured as 0):
                # report progress by entry count instead.
                fraction = completed_entries / max(len(rel_entries), 1)
                bar = _format_progress_bar(fraction)
                eta = _format_interval(
                    elapsed
                    * max(0.0, len(rel_entries) - completed_entries)
                    / max(completed_entries, 1)
                )
                emit(
                    f"progress {bar} "
                    f"{completed_entries}/{len(rel_entries)} entries "
                    f"elapsed={_format_interval(elapsed)} ETA≈{eta}"
                )
    if errors:
        # Surface every failure in one message; chain from the first cause.
        details = "; ".join(f"{rel}: {err}" for rel, err in errors)
        first_exc = errors[0][1]
        raise RuntimeError(f"parallel rsync copy failed: {details}") from first_exc
    elapsed = time.monotonic() - start
    emit(f"parallel rsync complete in {elapsed:.1f}s ({label})")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment