Created
November 11, 2025 01:50
-
-
Save annawoodard/633082d7a70197b8ecfa73a61969d007 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import subprocess | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| from typing import Callable, Sequence | |
# Width (in characters) of the ASCII progress bar rendered in log lines.
PROGRESS_BAR_WIDTH = 28

# Public API of this module: everything else is an internal helper.
__all__ = ["parallel_rsync_copy"]
| def _format_interval(seconds: float) -> str: | |
| seconds = max(0, int(round(seconds))) | |
| minutes, sec = divmod(seconds, 60) | |
| hours, minutes = divmod(minutes, 60) | |
| if hours: | |
| return f"{hours:d}:{minutes:02d}:{sec:02d}" | |
| return f"{minutes:02d}:{sec:02d}" | |
def _format_progress_bar(fraction: float, width: int = PROGRESS_BAR_WIDTH) -> str:
    """Build a fixed-width ASCII bar like ``[####----]`` for *fraction* in [0, 1].

    Out-of-range fractions are clamped before rendering.
    """
    clamped = min(max(float(fraction), 0.0), 1.0)
    n_filled = min(int(round(clamped * width)), width)
    n_empty = max(width - n_filled, 0)
    return "[{}{}]".format("#" * n_filled, "-" * n_empty)
| def _format_gb(num_bytes: float | int) -> str: | |
| gb = float(num_bytes) / 1_000_000_000 | |
| return f"{gb:.1f} GB" | |
| def _dir_size_bytes(path: Path) -> int: | |
| if not path.exists(): | |
| return 0 | |
| try: | |
| if path.is_file(): | |
| return path.stat().st_size | |
| except OSError: | |
| return 0 | |
| total = 0 | |
| for dirpath, _, filenames in os.walk(path): | |
| for filename in filenames: | |
| try: | |
| total += os.path.getsize(os.path.join(dirpath, filename)) | |
| except OSError: | |
| continue | |
| return total | |
| def _rsync_base_cmd(delete: bool) -> list[str]: | |
| cmd = [ | |
| "rsync", | |
| "-a", | |
| "--partial", | |
| "--partial-dir=.rsync-partial", | |
| "--whole-file", | |
| ] | |
| if delete: | |
| cmd.append("--delete") | |
| return cmd | |
| def _run_rsync(cmd: list[str]) -> None: | |
| subprocess.run(cmd, check=True) | |
def parallel_rsync_copy(
    *,
    src_root: Path,
    dest_root: Path,
    entries: Sequence[Path],
    workers: int = 1,
    log: Callable[[str], None] = print,
    log_prefix: str = "[parallel-rsync]",
    progress_interval: float = 5.0,
    delete_dirs: bool = True,
) -> None:
    """
    Copy multiple relative paths from src_root to dest_root in parallel using rsync.

    One rsync process is launched per entry, fanned out over a thread pool.
    Progress (by bytes when sizes are known, otherwise by entry count) is
    logged at most once per `progress_interval` seconds. Failures are
    collected and reported together as a single RuntimeError after all
    entries have been attempted.

    Args:
        src_root: Source directory that owns the relative entries.
        dest_root: Target directory to mirror.
        entries: Iterable of relative paths (files or directories) to copy.
        workers: Maximum concurrent rsync processes.
        log: Logging callable (defaults to print).
        log_prefix: Prefix attached to helper log lines.
        progress_interval: Minimum seconds between progress updates.
        delete_dirs: When True, directory copies include `--delete` so the
            destination folder mirrors the source by removing stale files.

    Raises:
        RuntimeError: If any entry failed to copy; chained from the first
            underlying exception, with all failures listed in the message.
    """

    def emit(message: str) -> None:
        # All helper output goes through the caller-supplied logger with a
        # uniform prefix so lines are attributable to this routine.
        log(f"{log_prefix} {message}")

    rel_entries = [Path(entry) for entry in entries]
    if not rel_entries:
        emit("no entries to copy; skipping parallel rsync.")
        return

    workers = max(1, workers)  # guard against zero/negative worker counts
    label = f"{src_root} -> {dest_root}"
    emit(
        f"parallel rsync fan-out: {len(rel_entries)} entries "
        f"with {workers} workers ({label})"
    )

    # Pre-scan source sizes so progress can be reported in bytes. Entries
    # that cannot be sized contribute 0 (see _dir_size_bytes); if everything
    # sizes to 0 we fall back to entry-count progress below.
    entry_sizes: dict[str, int] = {}
    total_entry_bytes = 0
    for rel in rel_entries:
        abs_entry = src_root / rel
        rel_key = rel.as_posix()
        size_bytes = _dir_size_bytes(abs_entry)
        entry_sizes[rel_key] = size_bytes
        total_entry_bytes += size_bytes

    # Directories may mirror (--delete); single files never delete siblings
    # in the shared destination parent directory.
    base_dir_cmd = _rsync_base_cmd(delete_dirs)
    base_file_cmd = _rsync_base_cmd(delete=False)

    def _sync_entry(rel_path: Path) -> None:
        # Copy one entry. Runs on a worker thread; raises on rsync failure.
        abs_src = src_root / rel_path
        abs_dest = dest_root / rel_path
        if abs_src.is_dir():
            abs_dest.mkdir(parents=True, exist_ok=True)
            cmd = list(base_dir_cmd)
            # Trailing slashes: sync directory *contents* into the
            # destination directory rather than nesting a new folder.
            cmd.extend([f"{abs_src}/", f"{abs_dest}/"])
            _run_rsync(cmd)
            return
        abs_dest.parent.mkdir(parents=True, exist_ok=True)
        cmd = list(base_file_cmd)
        cmd.extend([str(abs_src), f"{abs_dest.parent}/"])
        _run_rsync(cmd)

    errors: list[tuple[Path, Exception]] = []
    start = time.monotonic()
    completed_bytes = 0
    completed_entries = 0
    last_progress_log = start
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(_sync_entry, entry): entry for entry in rel_entries}
        for future in as_completed(futures):
            rel = futures[future]
            try:
                future.result()
            except Exception as exc:
                # Record and keep going: remaining entries still get copied;
                # the aggregate failure is raised after the pool drains.
                errors.append((rel, exc))
                continue
            completed_entries += 1
            completed_bytes += entry_sizes.get(rel.as_posix(), 0)
            now = time.monotonic()
            # Throttle progress output, but always log when the final entry
            # completes successfully.
            if (
                now - last_progress_log
            ) < progress_interval and completed_entries < len(rel_entries):
                continue
            last_progress_log = now
            elapsed = now - start
            if total_entry_bytes > 0:
                # Byte-based progress: ETA extrapolates linearly from the
                # observed bytes/second so far.
                fraction = completed_bytes / max(total_entry_bytes, 1)
                pct = min(100.0, fraction * 100.0)
                bar = _format_progress_bar(fraction)
                eta = _format_interval(
                    elapsed
                    * max(0.0, total_entry_bytes - completed_bytes)
                    / max(completed_bytes, 1)
                )
                emit(
                    f"progress {bar} {pct:5.1f}% "
                    f"{_format_gb(completed_bytes)}/"
                    f"{_format_gb(total_entry_bytes)} "
                    f"elapsed={_format_interval(elapsed)} ETA≈{eta}"
                )
            else:
                # Sizes unavailable: fall back to entry-count progress.
                fraction = completed_entries / max(len(rel_entries), 1)
                bar = _format_progress_bar(fraction)
                eta = _format_interval(
                    elapsed
                    * max(0.0, len(rel_entries) - completed_entries)
                    / max(completed_entries, 1)
                )
                emit(
                    f"progress {bar} "
                    f"{completed_entries}/{len(rel_entries)} entries "
                    f"elapsed={_format_interval(elapsed)} ETA≈{eta}"
                )
    if errors:
        details = "; ".join(f"{rel}: {err}" for rel, err in errors)
        # Chain from the first failure so the original traceback survives.
        first_exc = errors[0][1]
        raise RuntimeError(f"parallel rsync copy failed: {details}") from first_exc
    elapsed = time.monotonic() - start
    emit(f"parallel rsync complete in {elapsed:.1f}s ({label})")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment