Skip to content

Instantly share code, notes, and snippets.

@cahna
Created March 7, 2026 21:03
Show Gist options
  • Select an option

  • Save cahna/e69e61fbb778eb7a3b00eae9257f3bd4 to your computer and use it in GitHub Desktop.

Select an option

Save cahna/e69e61fbb778eb7a3b00eae9257f3bd4 to your computer and use it in GitHub Desktop.
Find and handle duplicate files in a directory
#!/usr/bin/env python3
"""
Find and handle duplicate files in a directory.
Usage:
    python find_duplicates.py [directory] [--dry-run] [--delete] [--pattern PATTERN] [--workers N]
Options:
    --dry-run           Only list duplicates, don't make any changes
    --delete            Delete duplicates instead of renaming them with .duplicate suffix
    --pattern PATTERN   Glob pattern to filter files (default: * for all files)
    --workers N         Number of parallel workers for hashing (default: 4)
"""
import argparse
import hashlib
import os
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
def get_file_hash(filepath: Path, chunk_size: int = 8 * 1024 * 1024) -> str:
    """
    Return the SHA256 hex digest of a file's full contents.

    The file is streamed block by block so that large files never have to
    fit in memory.

    Args:
        filepath: Path to the file
        chunk_size: Number of bytes requested per read (default 8MB)

    Returns:
        Hex digest of the file's SHA256 hash
    """
    digest = hashlib.sha256()
    with open(filepath, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()
def get_partial_hash(filepath: Path, sample_size: int = 64 * 1024) -> str:
    """
    Return a cheap SHA256 fingerprint built from the head and tail of a file.

    Two files with different fingerprints cannot be identical; files with the
    same fingerprint still need a full hash to be confirmed as duplicates.

    Args:
        filepath: Path to the file
        sample_size: Bytes sampled from the start and from the end (default 64KB)

    Returns:
        Hex digest of the sampled bytes
    """
    total_bytes = filepath.stat().st_size
    # The tail is only sampled when the file is strictly larger than two
    # samples, so the head and tail samples never overlap.
    sample_tail = total_bytes > 2 * sample_size

    digest = hashlib.sha256()
    with open(filepath, 'rb') as stream:
        digest.update(stream.read(sample_size))
        if sample_tail:
            stream.seek(-sample_size, os.SEEK_END)
            digest.update(stream.read(sample_size))
    return digest.hexdigest()
def hash_file_with_info(filepath: Path, full: bool = False) -> tuple[Path, str | None]:
    """
    Hash one file and pair the digest with its path.

    Returning the path alongside the digest lets callers that run this in a
    ThreadPoolExecutor match results back to files regardless of completion
    order.

    Args:
        filepath: Path to the file
        full: If True, hash the whole file; otherwise use the partial hash

    Returns:
        Tuple of (filepath, hash), or (filepath, None) if the file could not
        be read
    """
    hasher = get_file_hash if full else get_partial_hash
    try:
        digest = hasher(filepath)
    except OSError:
        # IOError is an alias of OSError in Python 3, so this covers both.
        return (filepath, None)
    return (filepath, digest)
def _group_files_by_hash(
    files: list[Path],
    full: bool,
    workers: int,
    label: str,
    progress_every: int,
) -> dict[str, list[Path]]:
    """Hash files in a thread pool and group the readable ones by digest."""
    groups: dict[str, list[Path]] = defaultdict(list)
    total = len(files)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(hash_file_with_info, f, full) for f in files]
        for completed, future in enumerate(as_completed(futures), 1):
            filepath, digest = future.result()
            if digest is not None:
                groups[digest].append(filepath)
            else:
                print(f" Warning: Could not read {filepath}", file=sys.stderr)
            # Progress update every few files or at the end
            if completed % progress_every == 0 or completed == total:
                print(f" {label}: {completed}/{total}", end='\r')
    # Terminate the carriage-return progress line.
    print()
    return groups


def find_duplicates(
    directory: Path,
    pattern: str = "*",
    workers: int = 4
) -> dict[str, list[Path]]:
    """
    Find duplicate files in the given directory.

    Uses a three-phase approach for efficiency:
    1. Group files by size (duplicates must have same size)
    2. Partial hash to quickly eliminate non-duplicates (parallel)
    3. Full hash only for files matching on partial hash (parallel)

    Symbolic links are skipped: a link and its target always have identical
    content, and treating them as duplicates could remove the real file while
    keeping only a dangling link. Files whose size or content cannot be read
    are reported on stderr and left out of the result.

    Args:
        directory: Directory to search for duplicates
        pattern: Glob pattern to filter files (default: * for all files)
        workers: Number of parallel workers for hashing (default: 4)

    Returns:
        Dictionary mapping hash -> list of duplicate file paths, each list
        sorted by file name (full path as tie-breaker).
        Only includes entries with 2+ files (actual duplicates)
    """
    print("Phase 1: Grouping files by size...")
    size_groups: dict[int, list[Path]] = defaultdict(list)
    matched_files = [
        f for f in directory.glob(pattern)
        if f.is_file() and not f.is_symlink()
    ]
    total_files = len(matched_files)
    print(f"Found {total_files} files matching '{pattern}'")
    for filepath in matched_files:
        try:
            size = filepath.stat().st_size
        except OSError:
            # The file vanished or became unreadable since the glob; skip it
            # instead of aborting the whole scan.
            print(f" Warning: Could not read {filepath}", file=sys.stderr)
            continue
        size_groups[size].append(filepath)

    potential_duplicates = {
        size: files for size, files in size_groups.items() if len(files) > 1
    }
    files_to_check = sum(len(files) for files in potential_duplicates.values())
    print(f"Found {len(potential_duplicates)} size groups with {files_to_check} files to check")
    if files_to_check == 0:
        print("No potential duplicates found based on file size.")
        return {}

    print(f"\nPhase 2: Quick partial hashing ({workers} workers)...")
    all_files = [f for files in potential_duplicates.values() for f in files]
    partial_hash_groups = _group_files_by_hash(
        all_files, False, workers, "Partial hashed", 10
    )
    partial_matches = {
        h: files for h, files in partial_hash_groups.items() if len(files) > 1
    }
    files_to_full_hash = sum(len(files) for files in partial_matches.values())
    print(f"Found {len(partial_matches)} partial hash groups with {files_to_full_hash} files needing full hash")
    if files_to_full_hash == 0:
        print("No duplicates found after partial hash comparison.")
        return {}

    print(f"\nPhase 3: Full hashing to confirm duplicates ({workers} workers)...")
    files_for_full_hash = [f for files in partial_matches.values() for f in files]
    full_hash_groups = _group_files_by_hash(
        files_for_full_hash, True, workers, "Full hashed", 5
    )

    # Hash results arrive in completion order, so the sort needs a total order:
    # the full path breaks ties between equally named files in different
    # directories, which keeps the choice of "first" file stable across runs.
    duplicates = {
        hash_val: sorted(files, key=lambda p: (p.name, str(p)))
        for hash_val, files in full_hash_groups.items()
        if len(files) > 1
    }
    return duplicates
def format_size(size_bytes: int) -> str:
    """Render a byte count as a string with one decimal and a B/KB/MB/GB/TB unit."""
    units = ('B', 'KB', 'MB', 'GB')
    value = size_bytes
    position = 0
    while position < len(units):
        if value < 1024:
            return f"{value:.1f} {units[position]}"
        value /= 1024
        position += 1
    # Anything that did not fit in GB is reported in TB, however large.
    return f"{value:.1f} TB"
def print_duplicates(duplicates: dict[str, list[Path]]) -> None:
    """
    Print a summary banner followed by one listing per duplicate group.

    In each group the first file is labelled as kept and every other file as
    a duplicate. Wasted space counts every file except the first of each
    group, using the first file's size.
    """
    if not duplicates:
        print("\nNo duplicates found.")
        return

    extra_copies = 0
    reclaimable_bytes = 0
    for members in duplicates.values():
        extra_copies += len(members) - 1
    for members in duplicates.values():
        reclaimable_bytes += (len(members) - 1) * members[0].stat().st_size

    rule = "=" * 60
    print(f"\n{rule}")
    print("DUPLICATE SUMMARY")
    print(rule)
    print(f"Duplicate groups found: {len(duplicates)}")
    print(f"Total duplicate files: {extra_copies}")
    print(f"Wasted space: {format_size(reclaimable_bytes)}")
    print(f"{rule}\n")

    for index, (digest, members) in enumerate(duplicates.items(), start=1):
        keeper = members[0]
        size_text = format_size(keeper.stat().st_size)
        print(f"Group {index} ({len(members)} files, {size_text} each, hash: {digest[:12]}...):")
        print(f" [KEEP] {keeper.name}")
        for extra in members[1:]:
            print(f" [DUPLICATE] {extra.name}")
        print()
def handle_duplicates(duplicates: dict[str, list[Path]], delete: bool) -> tuple[int, int]:
    """
    Handle duplicate files by either renaming or deleting them.

    For each group of duplicates, keeps the first file in the list
    and processes the rest.

    When renaming, an existing "<name>.duplicate" entry is never overwritten:
    the duplicate is left in place and counted as an error instead.

    Args:
        duplicates: Dictionary of hash -> list of duplicate files
        delete: If True, delete duplicates; if False, rename with .duplicate suffix

    Returns:
        Tuple of (success_count, error_count)
    """
    success_count = 0
    error_count = 0
    for files in duplicates.values():
        # Keep the first file of the group, process the rest
        for dup_file in files[1:]:
            try:
                if delete:
                    print(f" Deleting: {dup_file.name}")
                    dup_file.unlink()
                else:
                    new_name = dup_file.with_name(dup_file.name + ".duplicate")
                    # Path.rename silently replaces an existing target on
                    # POSIX; lexists also catches dangling symlinks.
                    if os.path.lexists(new_name):
                        raise FileExistsError(
                            f"rename target already exists: {new_name.name}"
                        )
                    print(f" Renaming: {dup_file.name} -> {new_name.name}")
                    dup_file.rename(new_name)
            except OSError as e:
                print(f" Error processing {dup_file}: {e}", file=sys.stderr)
                error_count += 1
            else:
                success_count += 1
    return success_count, error_count
def main():
    """
    Command-line entry point.

    Parses the arguments, searches the directory for duplicates, prints the
    summary, and then either previews (--dry-run), renames (default) or
    deletes (--delete) every duplicate except the first file of each group.

    Exit status: 0 on success or when nothing was found, 1 if the directory
    is invalid or any file could not be processed, 2 on invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description="Find and handle duplicate files in a directory.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                      # Find duplicates in current dir, rename with .duplicate
  %(prog)s --dry-run            # Only list duplicates, don't change anything
  %(prog)s /path/to/videos      # Search in specific directory
  %(prog)s --delete             # Delete duplicates instead of renaming
  %(prog)s --dry-run --delete   # Preview what would be deleted
  %(prog)s --pattern "*.mp4"    # Only check .mp4 files
  %(prog)s --workers 8          # Use 8 parallel workers for faster hashing
"""
    )
    parser.add_argument(
        "directory",
        nargs="?",
        default=".",
        help="Directory to search for duplicates (default: current directory)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only list duplicates without making any changes"
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        help="Delete duplicates instead of renaming them with .duplicate suffix"
    )
    parser.add_argument(
        "--pattern",
        default="*",
        help="Glob pattern to filter files (default: * for all files, e.g., '*.mp4', '*.jpg')"
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=4,
        help="Number of parallel workers for hashing (default: 4)"
    )
    args = parser.parse_args()

    # ThreadPoolExecutor raises ValueError for max_workers <= 0; reject it
    # here with a normal usage error instead of a traceback mid-scan.
    if args.workers < 1:
        parser.error("--workers must be a positive integer")

    directory = Path(args.directory).resolve()
    if not directory.is_dir():
        print(f"Error: '{directory}' is not a valid directory", file=sys.stderr)
        sys.exit(1)

    pattern_desc = f"matching '{args.pattern}'" if args.pattern != "*" else ""
    print(f"Searching for duplicate files {pattern_desc} in: {directory}\n")
    duplicates = find_duplicates(directory, args.pattern, args.workers)
    print_duplicates(duplicates)
    if not duplicates:
        sys.exit(0)

    # Handle duplicates based on flags
    if args.dry_run:
        action = "deleted" if args.delete else "renamed"
        total = sum(len(files) - 1 for files in duplicates.values())
        print(f"[DRY RUN] Would have {action} {total} duplicate file(s).")
        print("Run without --dry-run to apply changes.")
    else:
        action = "Deleting" if args.delete else "Renaming"
        print(f"{action} duplicate files...\n")
        success, errors = handle_duplicates(duplicates, args.delete)
        print(f"\nDone! Successfully processed {success} file(s).")
        if errors:
            print(f"Encountered {errors} error(s).", file=sys.stderr)
            sys.exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment