Skip to content

Instantly share code, notes, and snippets.

@witoldsz
Last active January 4, 2026 20:01
Show Gist options
  • Select an option

  • Save witoldsz/532995cc63a280c1febab3e8151f68b3 to your computer and use it in GitHub Desktop.

Select an option

Save witoldsz/532995cc63a280c1febab3e8151f68b3 to your computer and use it in GitHub Desktop.
Application for my home surveillance camera.
import os
import shutil
import time
import logging
import subprocess
import threading
import re
from flask import Flask, render_template_string, send_from_directory, request
from itertools import groupby
# Folder that is scanned for incoming .mp4 recordings.
SOURCE_DIR: str = "/data/source"

# Folder that stable recordings are moved into and served from.
DEST_DIR: str = "/data/destination"

# Pause between two scans of SOURCE_DIR, in seconds.
CHECK_INTERVAL: int = 3

# Seconds a file's size must stay unchanged before the file is moved.
STABLE_TIME: int = 7
# Configure logging: every record at INFO level or above is written both to
# the log file and to a stream handler (standard error by default).
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
_log_handlers = [
    logging.FileHandler("/data/logs/monitor.log"),
    logging.StreamHandler(),
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_log_handlers)
def get_files_with_size(path):
    """Return a ``{filename: size_in_bytes}`` mapping of the ``.mp4`` files in *path*.

    Only regular files directly inside *path* are considered (no recursion);
    the extension check is case-insensitive.  A file that disappears or
    becomes unreadable between being listed and being stat'ed is skipped, so
    a single vanished file does not abort the whole scan.

    Args:
        path: directory to list.

    Returns:
        dict mapping each matching file name (not the full path) to its size.

    Raises:
        OSError: if *path* itself cannot be listed.
    """
    sizes = {}
    with os.scandir(path) as entries:
        for entry in entries:
            if not entry.name.lower().endswith(".mp4"):
                continue
            try:
                if entry.is_file():
                    sizes[entry.name] = entry.stat().st_size
            except OSError:
                # The file vanished (or became unreadable) mid-scan; a later
                # scan will report it again if it still exists.
                continue
    return sizes
def make_thumbnail(in_file, out_file):
    """Render a small, sped-up, looping preview of a video with ffmpeg.

    The input is sampled at 2 fps, scaled to 240 px wide and played back at
    10x speed.  An existing *out_file* is overwritten.  The output container
    is chosen by ffmpeg from *out_file*'s extension.

    Thumbnail creation is best-effort: a non-zero ffmpeg exit status is
    logged as a warning and otherwise ignored.

    Args:
        in_file: path of the source video.
        out_file: path of the preview to write.

    Raises:
        OSError: if the ``ffmpeg`` executable cannot be started.
    """
    cmd = [
        "ffmpeg",
        "-y",  # overwrite
        "-i", in_file,
        "-vf", "fps=2,scale=240:-1:flags=lanczos,setpts=0.1*PTS",  # 2fps 10x speed
        "-crf", "62",  # quality
        "-cpu-used", "8",  # speed
        "-loop", "0",  # loop forever
        out_file,
    ]
    result = subprocess.run(
        cmd,
        # ffmpeg reads interactive commands from stdin by default; detach it
        # so it cannot consume or block on this process's standard input.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    if result.returncode != 0:
        logging.warning(
            f"ffmpeg exited with code {result.returncode} while creating thumbnail {out_file}"
        )
def monitor_loop():
    """Move finished recordings from SOURCE_DIR to DEST_DIR, forever.

    Every CHECK_INTERVAL seconds SOURCE_DIR is scanned for ``.mp4`` files.
    A file whose size has not changed for at least STABLE_TIME seconds is
    treated as completely written: a thumbnail (same base name, ``.avif``
    extension) is rendered into DEST_DIR and the file is then moved there.
    A file whose name already exists in DEST_DIR is skipped with a warning
    on every scan.

    A failed directory scan is logged and retried on the next cycle, and
    bookkeeping for files that disappeared from SOURCE_DIR is discarded.

    This function never returns.
    """
    logging.info("Monitoring v3 started...")
    last_sizes = {}  # filename -> size recorded at the last observed change
    last_seen = {}   # filename -> monotonic timestamp of that change

    while True:
        try:
            current_files = get_files_with_size(SOURCE_DIR)
        except OSError as e:
            # Keep the loop alive: a transient listing error must not stop
            # monitoring for good.
            logging.error(f"Failed to scan {SOURCE_DIR}: {e}")
            time.sleep(CHECK_INTERVAL)
            continue

        # Forget files that are no longer in the source directory so the
        # tracking dicts cannot grow without bound.
        for stale in last_sizes.keys() - current_files.keys():
            last_sizes.pop(stale, None)
            last_seen.pop(stale, None)

        for filename, size in current_files.items():
            src_path = os.path.join(SOURCE_DIR, filename)
            dst_path_mp4 = os.path.join(DEST_DIR, filename)
            # splitext keeps this correct for any extension casing (".MP4")
            # and for names that contain ".mp4" more than once.
            stem, _ext = os.path.splitext(filename)
            dst_path_avif = os.path.join(DEST_DIR, stem + ".avif")

            # If file already exists in destination – log warning
            if os.path.exists(dst_path_mp4):
                logging.warning(f"File already exists in destination: {filename}. Skipping.")
                continue

            # Monotonic clock: elapsed time is unaffected by wall-clock jumps.
            now = time.monotonic()

            # First sighting or size changed – (re)start the stability timer
            if last_sizes.get(filename) != size:
                last_sizes[filename] = size
                last_seen[filename] = now
                continue

            # Unchanged, but not for long enough yet
            if now - last_seen[filename] < STABLE_TIME:
                continue

            # File hasn't changed for STABLE_TIME – move it
            try:
                make_thumbnail(src_path, dst_path_avif)
                shutil.move(src_path, dst_path_mp4)
                logging.info(f"Moved file: {filename} size: {size}")
            except Exception as e:
                logging.error(f"Failed to move {filename}: {e}")
            # Drop the bookkeeping either way; after a failure the file is
            # tracked from scratch and retried once it is stable again.
            last_sizes.pop(filename, None)
            last_seen.pop(filename, None)

        time.sleep(CHECK_INTERVAL)
# Flask application object; the @app.route decorators in this file register
# the HTTP endpoints on it, and the __main__ guard starts it with app.run().
app = Flask(__name__)
# Recording file names are expected to look like
# ``k1a-front_<kam>_<YYYYMMDDHHMMSS>.mp4``; <kam> is a two-character field
# (presumably a camera/lens id - "00" is what the page renders as "wide").
_RECORDING_NAME = re.compile(
    r'k1a-front_(?P<kam>..)_(?P<y>\d{4})(?P<m>\d{2})(?P<d>\d{2})'
    r'(?P<H>\d{2})(?P<M>\d{2})(?P<S>\d{2})\.mp4'
)


def _describe_recording(filename):
    """Build the template entry (video, thumb, wide, date, time) for one file."""
    # splitext keeps the thumbnail name correct for any extension casing.
    stem, _ext = os.path.splitext(filename)
    entry = {"video": filename, "thumb": stem + ".avif"}
    parsed = _RECORDING_NAME.fullmatch(filename)
    if parsed is None:
        # Unrecognised name: still list it, using the raw name as both its
        # date and its time so that it ends up in a group of its own.
        entry.update(wide=False, date=filename, time=filename)
    else:
        parts = parsed.groupdict()
        entry.update(
            wide=parts["kam"] == "00",
            date="{y}-{m}-{d}".format(**parts),
            time="{H}:{M}:{S}".format(**parts),
        )
    return entry


@app.route("/")
def index():
    """Render the gallery page: disk usage plus all recordings grouped by day.

    Every ``.mp4`` file (case-insensitive) in DEST_DIR is listed, newest
    first, grouped by the date parsed from its file name.  The page lets the
    user play a recording in a modal, or switch to "delete mode", select
    recordings and send their names to ``DELETE /video``.

    Returns:
        The rendered HTML page.
    """
    disk_usage = shutil.disk_usage(DEST_DIR)
    disk = {
        "free_gb": disk_usage.free / (1024**3),
        "total_gb": disk_usage.total / (1024**3),
        "used_percent": (disk_usage.used / disk_usage.total) * 100
    }

    files = [
        _describe_recording(f)
        for f in os.listdir(DEST_DIR)
        if f.lower().endswith(".mp4")
    ]
    # Newest first; groupby below relies on equal dates being adjacent.
    files.sort(key=lambda f: f["date"] + f["time"] + str(f["wide"]), reverse=True)
    groups = [
        {"date": day, "files": list(items)}
        for day, items in groupby(files, key=lambda f: f["date"])
    ]

    return render_template_string("""
<!DOCTYPE html>
<html lang="pl">
<head>
<meta charset="UTF-8">
<title>Moje nagrania</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-gray-100 p-6">
<span class="text-2xl font-bold mb-6">📷 k1a</span>
<button id="deleteModeBtn" onclick="toggleDeleteMode()" class="mb-4 px-4 py-2 text-white rounded">...</button>
Disk space: {{ "%.2f"|format(disk.free_gb) }} GB free
/ {{ "%.2f"|format(disk.total_gb) }} GB total ({{ "%.1f"|format(disk.used_percent) }}% used)
<br>
{{ groups | map(attribute='files') | map('length') | sum }} videos
{% for group in groups %}
<h2 class="text-xl font-semibold my-4 sticky top-0 bg-gray-100 py-2 z-10"
onclick="clickGroup('{{ group.date }}')">
{{ group.date }} ({{ group.files | length }} recordings)
</h2>
<div class="grid grid-cols-4 lg:grid-cols-6 gap-4">
{% for f in group.files %}
<div class="date-{{ group.date }} bg-white shadow rounded overflow-hidden cursor-pointer"
data-video="{{ f.video }}"
onclick="clickVideo(this)">
<img src="/video/{{ f.thumb }}" loading="lazy" class="w-full h-auto object-contain">
<div class="p-2 text-center text-sm {{ "font-bold" if f["wide"] else "" }}">{{ f.time }}</div>
</div>
{% endfor %}
</div>
{% endfor %}
<div id="modal" class="hidden fixed inset-0 z-[99999] flex" role="dialog" aria-modal="true">
<div class="mx-auto h-full flex items-center">
<div class="w-full max-h-full rounded-3xl shadow-2xl aspect-video bg-black overflow-hidden relative">
<video id="player" controls autoplay class="w-full rounded"></video>
<button onclick="closeVideo()" class="absolute top-4 right-4 px-3 py-1 bg-red-600 text-white rounded">X</button>
</div>
</div>
</div>
<script>
const playerCls = ["ring", "ring-4", "ring-blue-500"]
const deleteCls = ["ring", "ring-4", "ring-red-500"]
const deleteSet = new Set() // list of files selected for deletion
let oldEl = null
let deleteMode = null
function setDeleteMode(value) {
deleteMode = value ?? deleteMode
const btn = document.getElementById("deleteModeBtn")
if (deleteMode) {
btn.classList.add("bg-red-600")
btn.classList.remove("bg-blue-600")
btn.textContent = `Confirm delete ${deleteSet.size} recordings`
} else {
btn.classList.add("bg-blue-600")
btn.classList.remove("bg-red-600")
btn.textContent = 'Delete recordings'
}
}
setDeleteMode(false) // initial state + button setup
function clickVideo(el) {
const src = el.getAttribute("data-video")
if (deleteMode) toggleDelete(el, src)
else openVideo(el, src)
}
function clickGroup(date) {
if (!deleteMode) return // only works in delete mode
const items = document.querySelectorAll(`.date-${date}`)
for (const el of items) clickVideo(el)
}
function toggleDelete(el, src) {
if (deleteSet.has(src)) {
deleteSet.delete(src)
el.classList.remove(...deleteCls)
} else {
deleteSet.add(src)
el.classList.add(...deleteCls)
}
setDeleteMode() // update button text
}
async function toggleDeleteMode() {
if (!deleteMode) {
setDeleteMode(true)
return
}
try {
if (deleteSet.size === 0) return // nothing to delete
if (!confirm(`Are you sure you want to delete ${deleteSet.size} recordings?`)) return
// Send delete request to server
const response = await fetch('/video', {
method: 'DELETE',
headers: { 'Content-Type': 'text/plain' },
body: Array.from(deleteSet).join('\\n')
})
if (response.ok) {
alert(`Selected recordings deleted.\n${await response.text()}`)
location.reload()
} else {
alert("Error deleting recordings.")
}
} finally {
setDeleteMode(false)
}
}
function openVideo(el, src) {
if (oldEl) oldEl.classList.remove(...playerCls)
el.classList.add(...playerCls)
oldEl = el
document.getElementById("player").src = `/video/${src}`
document.getElementById("modal").classList.remove("hidden")
// document.getElementById("modal").classList.add("flex")
}
function closeVideo() {
document.getElementById("player").pause()
document.getElementById("player").src = ""
document.getElementById("modal").classList.add("hidden")
}
document.addEventListener('keydown', function(event) {
if (event.key === 'Escape') closeVideo()
})
// Close modal when clicking outside video
document.getElementById('modal').addEventListener('click', function(event) {
if (event.target === this) closeVideo()
})
</script>
</body>
</html>
""", groups=groups, disk=disk)
@app.route("/video/<path:filename>")
def video(filename):
    """Serve the file called *filename* from DEST_DIR.

    The gallery page requests both recordings and their thumbnails through
    this endpoint.
    """
    served_file = send_from_directory(DEST_DIR, filename)
    return served_file
@app.route("/video", methods=["DELETE"])
def deleteVideos():
    """Delete the recordings named in the request body, plus their thumbnails.

    The body is plain text with one ``.mp4`` file name per line.  Blank lines
    are ignored and a trailing carriage return is stripped.  Because the
    names come from the client, only bare file names are accepted: anything
    containing a directory component (``../x.mp4``, ``/etc/x.mp4``) is
    reported as invalid and nothing is deleted for it.

    Returns:
        A ``(text, status, headers)`` tuple.  Status is 400 when no file name
        was supplied; otherwise 200 with one report line per attempted
        removal ("Removed", "File not found", "Failed to remove") or
        rejected name ("Invalid file").
    """
    headers = {'Content-Type': 'text/plain; charset=utf-8'}
    body = request.get_data(as_text=True)
    filenames = [line.rstrip('\r') for line in body.split('\n')]
    filenames = [name for name in filenames if name]
    if not filenames:
        return "No files specified", 400, headers

    response = []
    for filename in filenames:
        is_bare_name = os.path.basename(filename) == filename
        if not is_bare_name or not filename.lower().endswith(".mp4"):
            response.append(f"Invalid file: {filename}")
            continue

        # splitext keeps the thumbnail path distinct from the video path for
        # any extension casing (".MP4").
        stem, _ext = os.path.splitext(filename)
        video_path = os.path.join(DEST_DIR, filename)
        thumb_path = os.path.join(DEST_DIR, stem + ".avif")

        for f in (video_path, thumb_path):
            name = os.path.basename(f)
            try:
                os.remove(f)
            except FileNotFoundError:
                logging.error(f"File not found: {f}")
                response.append(f"File not found: {name}")
            except OSError as e:
                response.append(f"Failed to remove {name}: {e}")
                logging.error(f"Delete error: {e}")
            else:
                logging.info(f"Deleted: {f}")
                response.append(f"Removed: {name}")

    return "\n".join(response), 200, headers
if __name__ == "__main__":
    # Keep werkzeug quiet: only its errors are logged, not every request.
    logging.getLogger("werkzeug").setLevel(logging.ERROR)

    # Run the file mover in a daemon thread alongside the web server.
    monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
    monitor_thread.start()

    app.run(host="0.0.0.0", port=5000, debug=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment