Created
March 7, 2026 21:03
-
-
Save cahna/e69e61fbb778eb7a3b00eae9257f3bd4 to your computer and use it in GitHub Desktop.
Find and handle duplicate files in a directory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Find and handle duplicate files in a directory. | |
| Usage: | |
| python find_duplicates.py [directory] [--dry-run] [--delete] [--pattern PATTERN] | |
| Options: | |
| --dry-run Only list duplicates, don't make any changes | |
| --delete Delete duplicates instead of renaming them with .duplicate suffix | |
| --pattern PATTERN Glob pattern to filter files (default: * for all files) | |
| """ | |
| import argparse | |
| import hashlib | |
| import os | |
| import sys | |
| from collections import defaultdict | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
def get_file_hash(filepath: Path, chunk_size: int = 8 * 1024 * 1024) -> str:
    """
    Return the SHA256 hex digest of a file's full contents.

    Reads in fixed-size chunks so arbitrarily large files (e.g. videos)
    never have to fit in memory at once.

    Args:
        filepath: Path to the file to hash
        chunk_size: Bytes to read per iteration (default 8MB, good for video files)

    Returns:
        Hex digest of the file's SHA256 hash
    """
    digest = hashlib.sha256()
    with open(filepath, 'rb') as handle:
        while True:
            block = handle.read(chunk_size)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
def get_partial_hash(filepath: Path, sample_size: int = 64 * 1024) -> str:
    """
    Return a quick fingerprint hash built from a file's first and last chunks.

    Much faster than hashing the whole file and rules out most
    non-duplicates early; files that collide on this partial hash still
    need a full hash to confirm they really match.

    Args:
        filepath: Path to the file to sample
        sample_size: Bytes to read from each end (default 64KB)

    Returns:
        Hex digest of the partial SHA256 hash
    """
    total_bytes = filepath.stat().st_size
    digest = hashlib.sha256()
    with open(filepath, 'rb') as handle:
        # Always hash the leading chunk.
        digest.update(handle.read(sample_size))
        # Hash the trailing chunk too, but only when it cannot overlap
        # the leading one.
        if total_bytes > sample_size * 2:
            handle.seek(-sample_size, os.SEEK_END)
            digest.update(handle.read(sample_size))
    return digest.hexdigest()
def hash_file_with_info(filepath: Path, full: bool = False) -> tuple[Path, str | None]:
    """
    Hash one file and return the result paired with its path.

    Shaped for ThreadPoolExecutor fan-out: the caller gets the filepath
    back alongside the digest, and read failures are reported as None
    rather than raised.

    Args:
        filepath: Path to the file
        full: If True, compute the full hash; otherwise the quick partial hash

    Returns:
        Tuple of (filepath, hash) or (filepath, None) if the file could not be read
    """
    hasher = get_file_hash if full else get_partial_hash
    try:
        return (filepath, hasher(filepath))
    except (IOError, OSError):
        return (filepath, None)
def _hash_files_parallel(
    files: list[Path],
    full: bool,
    workers: int,
    label: str,
    report_every: int,
) -> dict[str, list[Path]]:
    """
    Hash files concurrently and group their paths by digest.

    Shared by phases 2 and 3 of find_duplicates, which previously
    duplicated this stanza and differed only in hash type, progress
    label, and reporting interval.

    Args:
        files: Files to hash
        full: If True compute the full SHA256; otherwise the quick partial hash
        workers: Thread-pool size
        label: Prefix for the progress line (e.g. "Partial hashed")
        report_every: Print progress every N completed files (and at the end)

    Returns:
        Mapping of digest -> list of files with that digest. Unreadable
        files are warned about on stderr and omitted.
    """
    groups: dict[str, list[Path]] = defaultdict(list)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(hash_file_with_info, f, full): f
            for f in files
        }
        completed = 0
        for future in as_completed(futures):
            completed += 1
            filepath, digest = future.result()
            if digest:
                groups[digest].append(filepath)
            else:
                print(f" Warning: Could not read {filepath}", file=sys.stderr)
            # Progress update every report_every files or at the end
            if completed % report_every == 0 or completed == len(futures):
                print(f" {label}: {completed}/{len(futures)}", end='\r')
    print()
    return groups


def find_duplicates(
    directory: Path,
    pattern: str = "*",
    workers: int = 4
) -> dict[str, list[Path]]:
    """
    Find duplicate files in the given directory.

    Uses a three-phase approach for efficiency:
    1. Group files by size (duplicates must have same size)
    2. Partial hash to quickly eliminate non-duplicates (parallel)
    3. Full hash only for files matching on partial hash (parallel)

    Note: the glob is non-recursive, so only direct children of
    *directory* are examined.

    Args:
        directory: Directory to search for duplicates
        pattern: Glob pattern to filter files (default: * for all files)
        workers: Number of parallel workers for hashing (default: 4)

    Returns:
        Dictionary mapping hash -> list of duplicate file paths, each list
        sorted by file name. Only includes entries with 2+ files
        (actual duplicates).
    """
    print("Phase 1: Grouping files by size...")
    size_groups: dict[int, list[Path]] = defaultdict(list)
    matched_files = [f for f in directory.glob(pattern) if f.is_file()]
    print(f"Found {len(matched_files)} files matching '{pattern}'")
    for filepath in matched_files:
        size_groups[filepath.stat().st_size].append(filepath)

    # Only same-size files can possibly be duplicates of each other.
    potential_duplicates = {
        size: files for size, files in size_groups.items() if len(files) > 1
    }
    files_to_check = sum(len(files) for files in potential_duplicates.values())
    print(f"Found {len(potential_duplicates)} size groups with {files_to_check} files to check")
    if files_to_check == 0:
        print("No potential duplicates found based on file size.")
        return {}

    print(f"\nPhase 2: Quick partial hashing ({workers} workers)...")
    all_files = [f for files in potential_duplicates.values() for f in files]
    partial_hash_groups = _hash_files_parallel(
        all_files, full=False, workers=workers,
        label="Partial hashed", report_every=10,
    )
    partial_matches = {
        h: files for h, files in partial_hash_groups.items() if len(files) > 1
    }
    files_to_full_hash = sum(len(files) for files in partial_matches.values())
    print(f"Found {len(partial_matches)} partial hash groups with {files_to_full_hash} files needing full hash")
    if files_to_full_hash == 0:
        print("No duplicates found after partial hash comparison.")
        return {}

    print(f"\nPhase 3: Full hashing to confirm duplicates ({workers} workers)...")
    files_for_full_hash = [f for files in partial_matches.values() for f in files]
    full_hash_groups = _hash_files_parallel(
        files_for_full_hash, full=True, workers=workers,
        label="Full hashed", report_every=5,
    )
    return {
        hash_val: sorted(files, key=lambda p: p.name)
        for hash_val, files in full_hash_groups.items()
        if len(files) > 1
    }
def format_size(size_bytes: int) -> str:
    """Format file size in human-readable units (B, KB, MB, GB, or TB)."""
    amount = size_bytes
    for unit in ('B', 'KB', 'MB', 'GB'):
        if amount < 1024:
            return f"{amount:.1f} {unit}"
        amount = amount / 1024
    # Anything still >= 1024 GB is reported in terabytes.
    return f"{amount:.1f} TB"
def print_duplicates(duplicates: dict[str, list[Path]]) -> None:
    """
    Print a summary of found duplicates to stdout.

    Shows aggregate counts and wasted space, then one section per group
    listing the file to keep (the first, alphabetically) and the files
    considered duplicates.

    Args:
        duplicates: Mapping of content hash -> sorted list of duplicate paths
    """
    if not duplicates:
        print("\nNo duplicates found.")
        return
    total_duplicate_files = sum(len(files) - 1 for files in duplicates.values())
    # Wasted space counts every copy beyond the first in each group.
    total_wasted_space = sum(
        (len(files) - 1) * files[0].stat().st_size
        for files in duplicates.values()
    )
    separator = '=' * 60  # hoisted: was rebuilt inline for each banner line
    print(f"\n{separator}")
    print("DUPLICATE SUMMARY")  # was an f-string with no placeholders (F541)
    print(separator)
    print(f"Duplicate groups found: {len(duplicates)}")
    print(f"Total duplicate files: {total_duplicate_files}")
    print(f"Wasted space: {format_size(total_wasted_space)}")
    print(f"{separator}\n")
    for i, (hash_val, files) in enumerate(duplicates.items(), 1):
        file_size = format_size(files[0].stat().st_size)
        print(f"Group {i} ({len(files)} files, {file_size} each, hash: {hash_val[:12]}...):")
        print(f" [KEEP] {files[0].name}")
        for dup_file in files[1:]:
            print(f" [DUPLICATE] {dup_file.name}")
        print()
def handle_duplicates(duplicates: dict[str, list[Path]], delete: bool) -> tuple[int, int]:
    """
    Handle duplicate files by either renaming or deleting them.

    For each group of duplicates, keeps the first file (the lists are
    already sorted alphabetically by find_duplicates) and processes the
    rest.

    Args:
        duplicates: Dictionary of hash -> list of duplicate files
        delete: If True, delete duplicates; if False, rename with .duplicate suffix

    Returns:
        Tuple of (success_count, error_count)
    """
    # Removed an unused `action` local that was assigned but never read.
    success_count = 0
    error_count = 0
    # Hash keys are not needed here, only the file groups.
    for files in duplicates.values():
        # Keep the first file (sorted alphabetically), process the rest
        for dup_file in files[1:]:
            try:
                if delete:
                    print(f" Deleting: {dup_file.name}")
                    dup_file.unlink()
                else:
                    # "video.mp4" -> "video.mp4.duplicate" (original suffix kept visible)
                    new_name = dup_file.with_suffix(dup_file.suffix + ".duplicate")
                    print(f" Renaming: {dup_file.name} -> {new_name.name}")
                    # NOTE(review): rename silently overwrites an existing
                    # target on POSIX — confirm that is acceptable here.
                    dup_file.rename(new_name)
                success_count += 1
            except OSError as e:
                print(f" Error processing {dup_file}: {e}", file=sys.stderr)
                error_count += 1
    return success_count, error_count
def main():
    """
    Command-line entry point.

    Parses arguments, runs duplicate detection, prints the report, and
    then either previews (--dry-run) or applies the rename/delete action.
    Exits 1 on an invalid directory or if any duplicate could not be
    processed; exits 0 otherwise (including when no duplicates exist).
    """
    parser = argparse.ArgumentParser(
        description="Find and handle duplicate files in a directory.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                      # Find duplicates in current dir, rename with .duplicate
  %(prog)s --dry-run            # Only list duplicates, don't change anything
  %(prog)s /path/to/videos      # Search in specific directory
  %(prog)s --delete             # Delete duplicates instead of renaming
  %(prog)s --dry-run --delete   # Preview what would be deleted
  %(prog)s --pattern "*.mp4"    # Only check .mp4 files
  %(prog)s --workers 8          # Use 8 parallel workers for faster hashing
"""
    )
    parser.add_argument(
        "directory",
        nargs="?",
        default=".",
        help="Directory to search for duplicates (default: current directory)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only list duplicates without making any changes"
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        help="Delete duplicates instead of renaming them with .duplicate suffix"
    )
    parser.add_argument(
        "--pattern",
        default="*",
        help="Glob pattern to filter files (default: * for all files, e.g., '*.mp4', '*.jpg')"
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=4,
        help="Number of parallel workers for hashing (default: 4)"
    )
    args = parser.parse_args()

    directory = Path(args.directory).resolve()
    if not directory.is_dir():
        print(f"Error: '{directory}' is not a valid directory", file=sys.stderr)
        sys.exit(1)

    # Build the banner in two branches: the original interpolated an empty
    # pattern_desc between two spaces, printing a stray double space when
    # no pattern filter was active.
    if args.pattern != "*":
        print(f"Searching for duplicate files matching '{args.pattern}' in: {directory}\n")
    else:
        print(f"Searching for duplicate files in: {directory}\n")

    duplicates = find_duplicates(directory, args.pattern, args.workers)
    print_duplicates(duplicates)
    if not duplicates:
        sys.exit(0)

    # Handle duplicates based on flags
    if args.dry_run:
        action = "deleted" if args.delete else "renamed"
        total = sum(len(files) - 1 for files in duplicates.values())
        print(f"[DRY RUN] Would have {action} {total} duplicate file(s).")
        print("Run without --dry-run to apply changes.")
    else:
        action = "Deleting" if args.delete else "Renaming"
        print(f"{action} duplicate files...\n")
        success, errors = handle_duplicates(duplicates, args.delete)
        print(f"\nDone! Successfully processed {success} file(s).")
        if errors:
            print(f"Encountered {errors} error(s).", file=sys.stderr)
            sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment