|
#!/usr/bin/env python3 |
|
""" |
|
Frigate Data Cleanup Script |
|
|
|
Identifies and optionally removes orphaned files in Frigate's data |
|
directory that are not referenced in the database. |
|
|
|
Usage: |
|
frigate_cleanup.py <data_dir> [options] |
|
|
|
Options: |
|
-v, --verbose Show detailed file listings |
|
--unowned-only Show only unowned files |
|
--delete Actually delete unowned files |
|
--db-path PATH Path to frigate.db (default: <data_dir>/../config/frigate.db) # noqa: E501 |
|
""" |
|
|
|
import argparse |
|
import os |
|
import sqlite3 |
|
import sys |
|
from collections import defaultdict |
|
from dataclasses import dataclass |
|
from datetime import datetime |
|
from pathlib import Path |
|
from typing import Dict, List, Set, Tuple |
|
|
|
|
|
@dataclass |
|
class FileInfo: |
|
"""Information about a file on disk.""" |
|
|
|
path: Path |
|
size: int |
|
mtime: datetime |
|
camera: str | None |
|
|
|
|
|
def format_size(size_bytes: int) -> str:
    """Render a byte count with two decimals and a 1024-based unit suffix.

    The value is divided by 1024 until it is below 1024 or the largest
    suffix (PB) has been reached.
    """
    suffixes = ("B", "KB", "MB", "GB", "TB", "PB")
    value = size_bytes
    position = 0
    # Stop on the final suffix so oversized values are still reported in PB.
    while position < len(suffixes) - 1 and not value < 1024.0:
        value = value / 1024.0
        position += 1
    return f"{value:.2f} {suffixes[position]}"
|
|
|
|
|
def get_db_recordings(db_path: Path) -> Set[str]:
    """Return every ``path`` value stored in the ``recordings`` table.

    The database is opened read-only so a running Frigate instance is never
    modified by this script.

    Args:
        db_path: Location of the Frigate SQLite database file.

    Returns:
        The set of path strings found in ``recordings.path``.

    Exits:
        Terminates the process with status 1 (after printing to stderr) when
        the database file is missing, cannot be opened, or cannot be queried.
    """
    if not db_path.exists():
        print(f"Error: Database not found at {db_path}", file=sys.stderr)
        sys.exit(1)

    # Build the URI with as_uri() so characters that are special in a URI
    # ("#", "?", "%", spaces) are percent-encoded instead of truncating or
    # corrupting the file name SQLite sees.
    db_uri = f"{db_path.resolve().as_uri()}?mode=ro"

    try:
        conn = sqlite3.connect(db_uri, uri=True)
    except sqlite3.Error as e:
        print(f"Error opening database: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        rows = conn.execute("SELECT path FROM recordings").fetchall()
    except sqlite3.Error as e:
        # sqlite3.Error also covers "file is not a database", which is a
        # DatabaseError rather than an OperationalError.
        print(f"Error reading database: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        conn.close()

    return {row[0] for row in rows}
|
|
|
|
|
def get_db_snapshots(db_path: Path) -> Set[str]:
    """Collect the clip-directory paths that the ``events`` table accounts for.

    Two independent, best-effort queries are run against a read-only
    connection:

    * every non-empty ``events.thumbnail`` value is added verbatim;
    * for every event with ``has_snapshot = 1`` the expected snapshot file
      names are added, relative to the data directory.

    Snapshot names are generated in two layouts: the nested
    ``clips/<camera>/<event_id>`` form this script has always produced, and
    the flat ``clips/<camera>-<event_id>`` form described in Frigate's
    snapshot documentation. Adding both can only make more files count as
    owned, never fewer.

    Args:
        db_path: Location of the Frigate SQLite database file.

    Returns:
        Set of path strings considered referenced by the database. A query
        that fails with ``sqlite3.OperationalError`` (for example a missing
        table or column) contributes nothing and prints a warning to stderr.
    """
    paths: Set[str] = set()

    # as_uri() percent-encodes characters that are special inside a URI.
    db_uri = f"{db_path.resolve().as_uri()}?mode=ro"
    conn = sqlite3.connect(db_uri, uri=True)

    try:
        # Each query has its own handler so that a schema without a
        # `thumbnail` column does not also skip snapshot collection.
        try:
            thumb_rows = conn.execute(
                "SELECT thumbnail FROM events WHERE thumbnail IS NOT NULL"
            ).fetchall()
        except sqlite3.OperationalError as e:
            print(
                f"Warning: could not read event thumbnails: {e}",
                file=sys.stderr,
            )
        else:
            paths.update(row[0] for row in thumb_rows if row[0])

        try:
            snapshot_rows = conn.execute(
                "SELECT camera, id FROM events WHERE has_snapshot = 1"
            ).fetchall()
        except sqlite3.OperationalError as e:
            # Not fatal, but the caller will then see snapshot files as
            # unowned, so make the failure visible instead of hiding it.
            print(
                f"Warning: could not read event snapshots: {e} "
                "(snapshot files may be reported as unowned)",
                file=sys.stderr,
            )
        else:
            for camera, event_id in snapshot_rows:
                for stem in (
                    f"clips/{camera}/{event_id}",
                    f"clips/{camera}-{event_id}",
                ):
                    paths.add(f"{stem}.jpg")
                    # Clean (un-annotated) copy of the snapshot.
                    paths.add(f"{stem}-clean.png")
    finally:
        conn.close()

    return paths
|
|
|
|
|
def _path_keys(db_paths: Set[str], top_dir: str) -> Set[str]:
    """Expand database paths into every form a scanned file may match.

    Each string is kept as-is, in forward-slash form, and additionally as
    every suffix that begins at a ``top_dir`` path component. That way an
    absolute entry such as ``/media/frigate/recordings/x.mp4`` also matches
    the data-dir-relative ``recordings/x.mp4``.
    """
    keys: Set[str] = set()
    for raw in db_paths:
        if not isinstance(raw, str):
            continue  # NULL or BLOB values cannot name a file
        keys.add(raw)
        posix = raw.replace("\\", "/")
        keys.add(posix)
        parts = posix.split("/")
        for index in range(1, len(parts)):
            if parts[index] == top_dir:
                keys.add("/".join(parts[index:]))
    return keys


def _scan_tree(
    data_dir: Path,
    top_dir: str,
    camera_index: int | None,
    db_paths: Set[str] | None,
) -> Tuple[List[FileInfo], List[FileInfo]]:
    """Walk ``data_dir/top_dir`` and split its files into (owned, unowned).

    ``camera_index`` is the position of the camera name in the path relative
    to ``data_dir`` (None when files carry no camera). ``db_paths`` of None
    means every file in the tree is treated as owned.
    """
    owned: List[FileInfo] = []
    unowned: List[FileInfo] = []

    base_dir = data_dir / top_dir
    if not base_dir.is_dir():
        return owned, unowned

    keys = None if db_paths is None else _path_keys(db_paths, top_dir)

    for root, _, files in os.walk(base_dir):
        for filename in files:
            filepath = Path(root) / filename
            rel_path = filepath.relative_to(data_dir)
            parts = rel_path.parts

            # The camera component only exists when the path is deep enough
            # to hold it plus a file name after it.
            camera = None
            if camera_index is not None and len(parts) >= camera_index + 2:
                camera = parts[camera_index]

            try:
                stat = filepath.stat()
            except OSError as e:
                # Files can vanish between the walk and the stat (the data
                # directory may be in active use); skip rather than abort.
                print(f"Warning: skipping {filepath}: {e}", file=sys.stderr)
                continue

            file_info = FileInfo(
                path=filepath,
                size=stat.st_size,
                mtime=datetime.fromtimestamp(stat.st_mtime),
                camera=camera,
            )

            if (
                keys is None
                or rel_path.as_posix() in keys
                or str(rel_path) in keys
            ):
                owned.append(file_info)
            else:
                unowned.append(file_info)

    return owned, unowned


def scan_directory(
    data_dir: Path, db_recordings: Set[str], db_snapshots: Set[str]
) -> Tuple[List[FileInfo], List[FileInfo]]:
    """Scan the data directory and classify files as owned or unowned.

    Three subtrees are walked, in this order:

    * ``recordings/YYYY-MM-DD/HH/<camera>/MM.SS.mp4`` - owned when the path
      is present in ``db_recordings``;
    * ``clips/<camera>/<file>`` - owned when present in ``db_snapshots``;
    * ``exports/`` - always owned, never attributed to a camera.

    Database entries may be relative to ``data_dir`` or absolute; both forms
    are matched.

    NOTE(review): anything under ``clips/`` that is not listed in
    ``db_snapshots`` is reported as unowned - confirm no other
    application-managed files live there before deleting.

    Args:
        data_dir: Root of the Frigate data directory.
        db_recordings: Recording paths read from the database.
        db_snapshots: Snapshot/thumbnail paths read from the database.

    Returns:
        ``(owned_files, unowned_files)``. Files whose ``stat()`` fails are
        skipped with a warning on stderr and appear in neither list.
    """
    owned_files: List[FileInfo] = []
    unowned_files: List[FileInfo] = []

    subtrees = (
        ("recordings", 3, db_recordings),
        ("clips", 1, db_snapshots),
        ("exports", None, None),
    )
    for top_dir, camera_index, db_paths in subtrees:
        owned, unowned = _scan_tree(data_dir, top_dir, camera_index, db_paths)
        owned_files.extend(owned)
        unowned_files.extend(unowned)

    return owned_files, unowned_files
|
|
|
|
|
def group_by_camera(files: List[FileInfo]) -> Dict[str, List[FileInfo]]:
    """Group files by camera name, preserving input order within each group.

    Files without a camera (``camera`` is None or empty) are collected under
    the key ``"(no camera)"``. The label is deliberately neutral: this
    function is used for both owned and unowned listings, so the bucket must
    not imply an ownership state.

    Args:
        files: Files to group.

    Returns:
        A plain ``dict`` mapping camera name to the files attributed to it.
    """
    grouped: Dict[str, List[FileInfo]] = defaultdict(list)
    for file_info in files:
        grouped[file_info.camera or "(no camera)"].append(file_info)
    return dict(grouped)
|
|
|
|
|
def print_summary(
    owned_files: List[FileInfo],
    unowned_files: List[FileInfo],
    unowned_only: bool,
) -> None:
    """Print per-camera file counts and sizes.

    The owned section (followed by a blank line) is emitted only when
    ``unowned_only`` is false; the unowned section is always emitted.
    """

    def emit_section(heading: str, section_files: List[FileInfo]) -> None:
        # One section: heading, rule, a line per camera, then the total.
        per_camera = group_by_camera(section_files)
        print(heading)
        print("-" * 70)

        running_total = 0
        for name in sorted(per_camera):
            members = per_camera[name]
            subtotal = sum(entry.size for entry in members)
            running_total += subtotal
            print(
                f" {name:20s}: {len(members):6d} files, "
                f"{format_size(subtotal)}"
            )

        print(
            f"\n {'Total':20s}: {len(section_files):6d} files, "
            f"{format_size(running_total)}"
        )

    if not unowned_only:
        emit_section("Owned Files (in database):", owned_files)
        print()

    emit_section(
        "Unowned Files (NOT in database - candidates for deletion):",
        unowned_files,
    )
|
|
|
|
|
def print_verbose(files: List[FileInfo], title: str) -> None:
    """Print every file, grouped by camera and ordered oldest-first.

    Each camera gets a header with its file count and total size, followed
    by one line per file showing modification time, size and path.
    """
    heavy_rule = "=" * 90
    light_rule = "-" * 90

    print(f"\n{title}")
    print(heavy_rule)

    by_camera = group_by_camera(files)

    for name in sorted(by_camera):
        # Oldest modification time first.
        ordered = sorted(by_camera[name], key=lambda entry: entry.mtime)
        subtotal = sum(entry.size for entry in ordered)

        print(f"\n{name} ({len(ordered)} files, {format_size(subtotal)}):")
        print(light_rule)

        for entry in ordered:
            stamp = entry.mtime.strftime("%Y-%m-%d %H:%M:%S")
            size_text = format_size(entry.size)
            print(f" {stamp} {size_text:>12s} {entry.path}")
|
|
|
|
|
def delete_files(files: List[FileInfo]) -> None:
    """Unlink each given file and report how many were removed.

    Failures to unlink (``OSError``) are reported on stderr and counted;
    they do not stop the remaining deletions.
    """
    if not files:
        print("No files to delete.")
        return

    planned_bytes = sum(entry.size for entry in files)
    print(f"\nDeleting {len(files)} files ({format_size(planned_bytes)})...")

    removed_sizes = []
    failures = 0

    for entry in files:
        try:
            entry.path.unlink()
        except OSError as exc:
            print(f"Error deleting {entry.path}: {exc}", file=sys.stderr)
            failures += 1
        else:
            removed_sizes.append(entry.size)

    removed_bytes = sum(removed_sizes)
    print(f"\nDeleted {len(removed_sizes)} files ({format_size(removed_bytes)})")

    if failures > 0:
        print(f"Failed to delete {failures} files", file=sys.stderr)
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the cleanup script."""
    parser = argparse.ArgumentParser(
        description="Clean up orphaned Frigate data files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "data_dir", type=Path, help="Path to Frigate data directory"
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Show detailed file listings",
    )
    parser.add_argument(
        "--unowned-only",
        action="store_true",
        help="Show only unowned files",
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        help="Actually delete unowned files (USE WITH CAUTION)",
    )
    parser.add_argument(
        "--db-path",
        type=Path,
        help="Path to frigate.db (default: <data_dir>/../config/frigate.db)",
    )
    return parser


def main() -> None:
    """Parse arguments, report owned/unowned files and optionally delete.

    Steps: validate the data directory, locate the database, load the
    referenced paths, scan the filesystem, print the summary (and detailed
    listings with ``--verbose``), then - only with ``--delete`` and an
    explicit "yes" at the prompt - remove the unowned files.

    Exits with status 1 when the data directory is missing or is not a
    directory.
    """
    args = _build_parser().parse_args()

    # A regular file would pass exists() but can never be a data directory.
    if not args.data_dir.is_dir():
        print(
            f"Error: Data directory not found: {args.data_dir}",
            file=sys.stderr,
        )
        sys.exit(1)

    if args.db_path:
        db_path = args.db_path
    else:
        # Default is <data_dir>/../config/frigate.db. Resolve first: the
        # parent of an unresolved relative path such as "." is "." itself,
        # which would point inside the data directory instead of beside it.
        db_path = args.data_dir.resolve().parent / "config" / "frigate.db"

    print(f"Data directory: {args.data_dir}")
    print(f"Database path: {db_path}")
    print()

    print("Reading database...")
    db_recordings = get_db_recordings(db_path)
    db_snapshots = get_db_snapshots(db_path)
    print(
        f"Found {len(db_recordings)} recordings and {len(db_snapshots)} snapshots in database"  # noqa: E501
    )
    print()

    print("Scanning filesystem...")
    owned_files, unowned_files = scan_directory(
        args.data_dir, db_recordings, db_snapshots
    )
    print()

    print_summary(owned_files, unowned_files, args.unowned_only)

    if args.verbose:
        if not args.unowned_only:
            print_verbose(owned_files, "Owned Files (Detailed)")
        print_verbose(
            unowned_files, "Unowned Files (Detailed - Candidates for Deletion)"
        )

    if args.delete:
        print()
        if not unowned_files:
            # Nothing to confirm; delete_files reports the empty case.
            delete_files(unowned_files)
            return
        try:
            response = input(
                f"Are you sure you want to delete {len(unowned_files)} unowned files? (yes/no): "  # noqa: E501
            )
        except (EOFError, KeyboardInterrupt):
            # No interactive answer available (closed stdin, Ctrl-C):
            # treat it as "no" rather than crashing with a traceback.
            print()
            response = ""
        if response.strip().lower() == "yes":
            delete_files(unowned_files)
        else:
            print("Deletion cancelled.")
    elif unowned_files:
        print(
            f"\nRun with --delete to remove {len(unowned_files)} unowned files"
        )
|
|
|
|
|
# Run the command-line interface only when executed as a script, so the
# module can be imported without side effects.
if __name__ == "__main__":
    main()