Skip to content

Instantly share code, notes, and snippets.

@ajangrahmat
Created January 21, 2026 08:29
Show Gist options
  • Select an option

  • Save ajangrahmat/2782e0b48160fc1cf7dc377587a61cfc to your computer and use it in GitHub Desktop.

Select an option

Save ajangrahmat/2782e0b48160fc1cf7dc377587a61cfc to your computer and use it in GitHub Desktop.
import argparse
import threading
import time
from flask import Flask, render_template, Response, jsonify, request
import cv2
import mediapipe as mp
import math
import numpy as np
import base64
import pygame
import os
import requests
from queue import Queue, Empty
from dataclasses import dataclass
from typing import Tuple
import json
import paho.mqtt.client as mqtt
# ============================================
# PI CAMERA IMPORT
# ============================================
try:
from picamera2 import Picamera2
from libcamera import Transform
PICAMERA_AVAILABLE = True
print("✅ PiCamera2 is available")
except ImportError:
PICAMERA_AVAILABLE = False
print("⚠️ PiCamera2 not available. Using OpenCV camera instead.")
# ============================================
# MQTT CONFIGURATION
# ============================================
MQTT_BROKER = "broker.emqx.io"
MQTT_PORT = 1883
TOPIC_BPM = "healthmonitor/HM2025/bpm"
TOPIC_NGANTUK = "healthmonitor/HM2025/ngantuk"
# Global variables untuk MQTT
mqtt_client = None
current_bpm = 0
bpm_last_update = 0
system_mode = "manual" # "manual" atau "auto"
system_active = False # True jika sistem deteksi aktif
auto_activated = False # Flag untuk auto-activation
index_camera = 0
app = Flask(__name__)
# ============================================
# FUNGSI KIRIM TELEGRAM - DIMODIFIKASI UNTUK SERTAKAN BPM
# ============================================
def send_telegram_image(img, img_name, caption="Gambar dari CCTV"):
"""Kirim gambar ke Telegram dengan informasi BPM"""
BOT_TOKEN = "8398550472:AAFAINimxLn3Vo8Qxp2GDLF4B5t9s8UjmQo"
CHAT_ID = "8583818545"
SAVE_PATH = "img"
try:
os.makedirs(SAVE_PATH, exist_ok=True)
img_path = os.path.join(SAVE_PATH, img_name)
cv2.imwrite(img_path, img)
url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendPhoto"
# Tambahkan informasi BPM ke caption jika ada
global current_bpm
if current_bpm > 0:
caption_with_bpm = f"{caption}\n\n❤️ **DETAK JANTUNG (BPM): {current_bpm}**"
# Tambahkan status BPM
if current_bpm < 62:
caption_with_bpm += "\n⚠️ **Status: BPM RENDAH!**"
elif current_bpm > 100:
caption_with_bpm += "\n⚠️ **Status: BPM TINGGI!**"
else:
caption_with_bpm += "\n✅ **Status: BPM Normal**"
else:
caption_with_bpm = caption
with open(img_path, 'rb') as photo:
r = requests.post(
url,
data={"chat_id": CHAT_ID, "caption": caption_with_bpm, "parse_mode": "Markdown"},
files={"photo": photo}
)
print(f"📸 Telegram status: {r.status_code}")
print(f"📝 Caption sent with BPM: {current_bpm}")
return r.status_code == 200
except Exception as e:
print(f"❌ Error sending to Telegram: {e}")
return False
# ============================================
# MQTT CALLBACKS
# ============================================
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("✅ Connected to MQTT Broker!")
client.subscribe(TOPIC_BPM)
print(f"📡 Subscribed to: {TOPIC_BPM}")
else:
print(f"❌ Failed to connect, return code {rc}")
def on_message(client, userdata, msg):
global current_bpm, bpm_last_update, system_active, auto_activated
try:
if msg.topic == TOPIC_BPM:
current_bpm = int(msg.payload.decode())
bpm_last_update = time.time()
print(f"💓 BPM Received: {current_bpm}")
# AUTO MODE: Aktifkan sistem jika BPM < 62
if system_mode == "auto":
if current_bpm < 62 and current_bpm > 0:
if not system_active:
system_active = True
auto_activated = True
print("🚨 AUTO ACTIVATION: BPM < 62 - Sistem Deteksi AKTIF!")
elif current_bpm >= 62:
if auto_activated:
system_active = False
auto_activated = False
print("✅ AUTO DEACTIVATION: BPM Normal - Sistem Deteksi NONAKTIF")
send_drowsiness_level(0)
except Exception as e:
print(f"❌ Error processing MQTT message: {e}")
def send_drowsiness_level(level):
"""Kirim tingkat ngantuk ke ESP32"""
global mqtt_client
try:
if mqtt_client and mqtt_client.is_connected():
mqtt_client.publish(TOPIC_NGANTUK, str(level))
status_text = ["Normal", "Ringan", "Sedang", "BERAT"][min(level, 3)]
print(f"📤 Sent to ESP32 - Tingkat Ngantuk: {level} ({status_text})")
return True
except Exception as e:
print(f"❌ Error sending drowsiness level: {e}")
return False
def init_mqtt():
"""Initialize MQTT client"""
global mqtt_client
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
try:
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
mqtt_client.loop_start()
print("🚀 MQTT Client initialized")
return True
except Exception as e:
print(f"❌ MQTT connection failed: {e}")
return False
# ============================================
# KONFIGURASI SENSITIVITAS & KALIBRASI
# ============================================
@dataclass
class CameraSensitivity:
"""Kelas untuk mengatur sensitivitas berdasarkan tipe kamera"""
ear_threshold: float = 0.3
mar_threshold: float = 0.45
drowsy_time_threshold: float = 0.8
wake_time_threshold: float = 0.3
ear_decay: float = 0.7
mar_decay: float = 0.7
history_length: int = 20
drowsy_frames_threshold: int = 5
ear_weight: float = 0.6
mar_weight: float = 0.4
time_weight: float = 0.2
# Preset untuk berbagai tipe kamera
CAMERA_PRESETS = {
'normal': CameraSensitivity(),
'wide': CameraSensitivity(
ear_threshold=0.25,
mar_threshold=0.42,
drowsy_time_threshold=1.0,
wake_time_threshold=0.4,
ear_decay=0.65,
mar_decay=0.65
),
'telephoto': CameraSensitivity(
ear_threshold=0.29,
mar_threshold=0.48,
drowsy_time_threshold=0.7,
wake_time_threshold=0.25,
ear_decay=0.75,
mar_decay=0.75
),
}
# Data kalibrasi - PERBAIKAN: Inisialisasi yang lebih robust
calibration_data = {
'awake': {
'collecting': False,
'ear_values': [],
'mar_values': [],
'samples': 0,
'completed': False,
'avg_ear': 0,
'avg_mar': 0
},
'drowsy': {
'collecting': False,
'ear_values': [],
'mar_values': [],
'samples': 0,
'completed': False,
'avg_ear': 0,
'avg_mar': 0
},
'calculated_thresholds': {
'ear': 0.3,
'mar': 0.45
}
}
# Global variables
drowsy_timer = {
'status': 0,
'drowsy_start_time': None,
'wake_start_time': None,
'alarm_playing': False,
'drowsiness_level': 0,
'drowsiness_history': [],
'telegram_sent': False,
}
ear_history = []
mar_history = []
ear_main = 0
mar_main = 0
frame_counter = 0
current_ear = 0
current_mar = 0
# Shared frame queue for web streaming
frame_queue = Queue(maxsize=2)
latest_frame = None
frame_lock = threading.Lock()
# Initialize pygame mixer for alarm
pygame.mixer.init()
# Current sensitivity settings (default normal)
current_sensitivity = CAMERA_PRESETS['normal']
# ============================================
# FUNGSI KALIBRASI - DIPERBAIKI
# ============================================
def start_calibration(mode):
"""Mulai proses kalibrasi untuk mode tertentu"""
global calibration_data
if mode not in ['awake', 'drowsy']:
print(f"❌ Mode kalibrasi tidak valid: {mode}")
return False
# Reset data kalibrasi untuk mode ini
calibration_data[mode]['collecting'] = True
calibration_data[mode]['ear_values'] = []
calibration_data[mode]['mar_values'] = []
calibration_data[mode]['samples'] = 0
calibration_data[mode]['completed'] = False
print(f"🎯 Mulai kalibrasi mode: {mode.upper()}")
print(f" Collecting: {calibration_data[mode]['collecting']}")
return True
def stop_calibration(mode):
"""Stop proses kalibrasi untuk mode tertentu"""
global calibration_data
if mode not in ['awake', 'drowsy']:
print(f"❌ Mode kalibrasi tidak valid: {mode}")
return False
calibration_data[mode]['collecting'] = False
if calibration_data[mode]['samples'] > 0:
calibration_data[mode]['avg_ear'] = np.mean(calibration_data[mode]['ear_values'])
calibration_data[mode]['avg_mar'] = np.mean(calibration_data[mode]['mar_values'])
calibration_data[mode]['completed'] = True
print(f"✅ Kalibrasi {mode.upper()} selesai:")
print(f" Samples: {calibration_data[mode]['samples']}")
print(f" EAR rata-rata: {calibration_data[mode]['avg_ear']:.3f}")
print(f" MAR rata-rata: {calibration_data[mode]['avg_mar']:.3f}")
else:
print(f"⚠️ Kalibrasi {mode.upper()} dihentikan tanpa data")
return True
def calculate_calibrated_thresholds():
"""Hitung threshold berdasarkan data kalibrasi"""
global calibration_data
if not (calibration_data['awake']['completed'] and calibration_data['drowsy']['completed']):
print("❌ Kalibrasi belum lengkap!")
print(f" Awake completed: {calibration_data['awake']['completed']}")
print(f" Drowsy completed: {calibration_data['drowsy']['completed']}")
return False
awake_ear = calibration_data['awake']['avg_ear']
awake_mar = calibration_data['awake']['avg_mar']
drowsy_ear = calibration_data['drowsy']['avg_ear']
drowsy_mar = calibration_data['drowsy']['avg_mar']
# Hitung threshold di tengah-tengah
ear_threshold = (awake_ear + drowsy_ear) / 2
mar_threshold = (awake_mar + drowsy_mar) / 2
# Adjustment untuk keamanan
ear_threshold = ear_threshold * 0.9 # Lebih sensitif untuk EAR
mar_threshold = mar_threshold * 1.1 # Lebih toleran untuk MAR
# Batasi nilai threshold
ear_threshold = max(0.15, min(0.4, ear_threshold))
mar_threshold = max(0.3, min(0.7, mar_threshold))
calibration_data['calculated_thresholds']['ear'] = ear_threshold
calibration_data['calculated_thresholds']['mar'] = mar_threshold
print(f"📊 Threshold terkalibrasi:")
print(f" EAR: {ear_threshold:.3f} (Awake: {awake_ear:.3f}, Drowsy: {drowsy_ear:.3f})")
print(f" MAR: {mar_threshold:.3f} (Awake: {awake_mar:.3f}, Drowsy: {drowsy_mar:.3f})")
save_calibration_to_file()
return True
def apply_calibrated_thresholds():
"""Terapkan threshold yang sudah dihitung"""
global current_sensitivity, calibration_data
ear_threshold = calibration_data['calculated_thresholds']['ear']
mar_threshold = calibration_data['calculated_thresholds']['mar']
custom_params = {
'ear_threshold': ear_threshold,
'mar_threshold': mar_threshold,
'drowsy_time_threshold': 0.8,
'wake_time_threshold': 0.3
}
set_camera_sensitivity('normal', custom_params)
print(f"✅ Threshold diterapkan: EAR={ear_threshold:.3f}, MAR={mar_threshold:.3f}")
return True
def collect_calibration_data():
"""Kumpulkan data kalibrasi saat proses deteksi berjalan"""
global calibration_data, current_ear, current_mar
for mode in ['awake', 'drowsy']:
if calibration_data[mode]['collecting'] and current_ear > 0:
calibration_data[mode]['ear_values'].append(current_ear)
calibration_data[mode]['mar_values'].append(current_mar)
calibration_data[mode]['samples'] += 1
# Auto-stop setelah 50 samples
if calibration_data[mode]['samples'] >= 50:
stop_calibration(mode)
print(f"🎉 Auto-stop kalibrasi {mode.upper()} - 50 samples tercapai")
def reset_calibration():
"""Reset semua data kalibrasi"""
global calibration_data
calibration_data = {
'awake': {
'collecting': False,
'ear_values': [],
'mar_values': [],
'samples': 0,
'completed': False,
'avg_ear': 0,
'avg_mar': 0
},
'drowsy': {
'collecting': False,
'ear_values': [],
'mar_values': [],
'samples': 0,
'completed': False,
'avg_ear': 0,
'avg_mar': 0
},
'calculated_thresholds': {
'ear': 0.3,
'mar': 0.45
}
}
print("🔄 Kalibrasi direset ke nilai awal")
return True
def save_calibration_to_file():
"""Simpan data kalibrasi ke file JSON"""
try:
with open('calibration_data.json', 'w') as f:
json.dump(calibration_data, f, indent=2)
print("💾 Data kalibrasi disimpan ke file")
return True
except Exception as e:
print(f"❌ Gagal menyimpan kalibrasi: {e}")
return False
def load_calibration_from_file():
"""Load data kalibrasi dari file JSON"""
global calibration_data
try:
if os.path.exists('calibration_data.json'):
with open('calibration_data.json', 'r') as f:
loaded_data = json.load(f)
if 'calculated_thresholds' in loaded_data:
calibration_data['calculated_thresholds'] = loaded_data['calculated_thresholds']
apply_calibrated_thresholds()
print("📂 Data kalibrasi dimuat dari file")
return True
except Exception as e:
print(f"❌ Gagal memuat kalibrasi: {e}")
return False
def set_camera_sensitivity(camera_type: str = 'normal', custom_params: dict = None):
"""Set sensitivitas kamera"""
global current_sensitivity
if camera_type in CAMERA_PRESETS:
current_sensitivity = CAMERA_PRESETS[camera_type]
print(f"✅ Set sensitivitas ke preset: {camera_type}")
else:
current_sensitivity = CAMERA_PRESETS['normal']
print(f"⚠️ Tipe kamera tidak dikenal, menggunakan preset: normal")
if custom_params:
for key, value in custom_params.items():
if hasattr(current_sensitivity, key):
setattr(current_sensitivity, key, value)
print(f" ↳ {key}: {value}")
def calculate_drowsiness_level(ear: float, mar: float, drowsy_duration: float = 0) -> float:
"""Hitung tingkat kantuk berdasarkan EAR, MAR, dan durasi"""
if ear <= 0:
return 0
global current_sensitivity
ear_normal = current_sensitivity.ear_threshold * 1.2
ear_closed = current_sensitivity.ear_threshold * 0.5
if ear >= ear_normal:
ear_score = 0
elif ear <= ear_closed:
ear_score = 100
else:
ear_score = ((ear_normal - ear) / (ear_normal - ear_closed)) * 100
mar_normal = current_sensitivity.mar_threshold * 0.8
mar_yawn = current_sensitivity.mar_threshold * 1.5
if mar <= mar_normal:
mar_score = 0
elif mar >= mar_yawn:
mar_score = 100
else:
mar_score = ((mar - mar_normal) / (mar_yawn - mar_normal)) * 100
max_duration = 5.0
time_score = min(100, (drowsy_duration / max_duration) * 100)
drowsiness_level = (
(ear_score * current_sensitivity.ear_weight) +
(mar_score * current_sensitivity.mar_weight) +
(time_score * current_sensitivity.time_weight)
)
drowsiness_level = max(0, min(100, drowsiness_level))
drowsiness_level = round(drowsiness_level, 1)
# Kirim ke MQTT
if system_active:
if drowsiness_level < 30:
tingkat = 0 # Normal
elif drowsiness_level < 50:
tingkat = 1 # Ringan
elif drowsiness_level < 70:
tingkat = 2 # Sedang
else:
tingkat = 3 # Berat
send_drowsiness_level(tingkat)
return drowsiness_level
def update_drowsiness_history(level: float):
"""Update history tingkat kantuk untuk smoothing"""
drowsy_timer['drowsiness_history'].append(level)
if len(drowsy_timer['drowsiness_history']) > 50:
drowsy_timer['drowsiness_history'].pop(0)
if len(drowsy_timer['drowsiness_history']) >= 5:
recent_levels = drowsy_timer['drowsiness_history'][-5:]
smoothed_level = sum(recent_levels) / len(recent_levels)
drowsy_timer['drowsiness_level'] = round(smoothed_level, 1)
else:
drowsy_timer['drowsiness_level'] = level
# ============================================
# ALARM FUNCTIONS
# ============================================
def create_alarm_sound():
"""Create alarm sound file"""
try:
import numpy as np
sample_rate = 44100
duration = 1.0
frequency = 800
t = np.linspace(0, duration, int(sample_rate * duration))
wave = np.sin(2 * np.pi * frequency * t)
modulation = np.sin(2 * np.pi * 5 * t)
wave = wave * (0.5 + 0.5 * modulation)
wave = (wave * 32767).astype(np.int16)
import wave as wav_module
with wav_module.open('alarm.wav', 'w') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate)
wav_file.writeframes(wave.tobytes())
print("✅ Alarm sound created successfully!")
return True
except Exception as e:
print(f"❌ Could not create alarm sound: {e}")
return False
def init_alarm():
"""Initialize alarm system"""
if not os.path.exists('alarm.wav'):
print("Creating alarm sound...")
if not create_alarm_sound():
print("⚠️ Warning: Could not create alarm sound.")
return False
try:
pygame.mixer.music.load('alarm.wav')
print("✅ Alarm system initialized!")
return True
except Exception as e:
print(f"❌ Could not initialize alarm: {e}")
return False
def play_alarm():
"""Play alarm sound"""
try:
if not drowsy_timer['alarm_playing']:
pygame.mixer.music.play(-1)
drowsy_timer['alarm_playing'] = True
print("🚨 ALARM ACTIVATED!")
except Exception as e:
print(f"❌ Could not play alarm: {e}")
def stop_alarm():
"""Stop alarm sound"""
try:
if drowsy_timer['alarm_playing']:
pygame.mixer.music.stop()
drowsy_timer['alarm_playing'] = False
print("✅ Alarm stopped")
except Exception as e:
print(f"❌ Could not stop alarm: {e}")
# ============================================
# MEDIAPIPE FUNCTIONS
# ============================================
def distance(p1, p2):
"""Calculate Euclidean distance"""
return (((p1[:2] - p2[:2])**2).sum())**0.5
def eye_aspect_ratio(landmarks, eye):
"""Calculate Eye Aspect Ratio"""
N1 = distance(landmarks[eye[1][0]], landmarks[eye[1][1]])
N2 = distance(landmarks[eye[2][0]], landmarks[eye[2][1]])
N3 = distance(landmarks[eye[3][0]], landmarks[eye[3][1]])
D = distance(landmarks[eye[0][0]], landmarks[eye[0][1]])
return (N1 + N2 + N3) / (3 * D)
def eye_feature(landmarks):
"""Get average EAR from both eyes"""
return (eye_aspect_ratio(landmarks, left_eye) + \
eye_aspect_ratio(landmarks, right_eye)) / 2
def mouth_feature(landmarks):
"""Calculate Mouth Aspect Ratio"""
N1 = distance(landmarks[mouth[1][0]], landmarks[mouth[1][1]])
N2 = distance(landmarks[mouth[2][0]], landmarks[mouth[2][1]])
N3 = distance(landmarks[mouth[3][0]], landmarks[mouth[3][1]])
D = distance(landmarks[mouth[0][0]], landmarks[mouth[0][1]])
return (N1 + N2 + N3) / (3 * D)
def run_face_mp(image):
"""Run MediaPipe face detection"""
image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
image.flags.writeable = False
results = face_mesh.process(image)
image.flags.writeable = True
image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
if results.multi_face_landmarks:
landmarks_positions = []
for _, data_point in enumerate(results.multi_face_landmarks[0].landmark):
landmarks_positions.append([data_point.x, data_point.y, data_point.z])
landmarks_positions = np.array(landmarks_positions)
landmarks_positions[:, 0] *= image.shape[1]
landmarks_positions[:, 1] *= image.shape[0]
for face_landmarks in results.multi_face_landmarks:
mp_drawing.draw_landmarks(
image=image,
landmark_list=face_landmarks,
connections=face_connections,
landmark_drawing_spec=drawing_spec,
connection_drawing_spec=drawing_spec)
ear = eye_feature(landmarks_positions)
mar = mouth_feature(landmarks_positions)
else:
ear = -1000
mar = -1000
return ear, mar, image
def add_info_to_frame(image, ear_main, mar_main, drowsy_timer):
"""Add information overlay to frame"""
global current_sensitivity, system_active, system_mode, current_bpm
# Mode dan BPM
mode_color = (0, 255, 255) if system_mode == "auto" else (255, 165, 0)
cv2.putText(image, f"Mode: {system_mode.upper()}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, mode_color, 2)
# Tampilkan BPM
bpm_color = (255, 100, 100)
bpm_status = ""
if current_bpm < 62:
bpm_color = (0, 0, 255) # Merah untuk BPM rendah
bpm_status = " (LOW!)"
elif current_bpm > 100:
bpm_color = (0, 255, 255) # Kuning untuk BPM tinggi
bpm_status = " (HIGH!)"
cv2.putText(image, f"BPM: {current_bpm}{bpm_status}", (10, 55),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, bpm_color, 2)
# System status
status_text = "ACTIVE" if system_active else "INACTIVE"
status_color = (0, 255, 0) if system_active else (100, 100, 100)
cv2.putText(image, f"System: {status_text}", (10, 80),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, status_color, 2)
# Info kalibrasi
if calibration_data['awake']['collecting']:
cv2.putText(image, "KALIBRASI: TIDAK NGANTUK", (10, 105),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
cv2.putText(image, f"Samples: {calibration_data['awake']['samples']}/50",
(10, 135), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
elif calibration_data['drowsy']['collecting']:
cv2.putText(image, "KALIBRASI: NGANTUK", (10, 105),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 165, 255), 2)
cv2.putText(image, f"Samples: {calibration_data['drowsy']['samples']}/50",
(10, 135), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 2)
if ear_main > 0 and system_active:
cv2.putText(image, f"EAR: {ear_main:.3f} (Thresh: {current_sensitivity.ear_threshold:.3f})",
(10, 165), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
cv2.putText(image, f"MAR: {mar_main:.3f} (Thresh: {current_sensitivity.mar_threshold:.3f})",
(10, 195), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
if system_active:
drowsiness_level = drowsy_timer['drowsiness_level']
# Progress bar
bar_width = 200
bar_height = 20
bar_x = 10
bar_y = 220
cv2.rectangle(image, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), (50, 50, 50), -1)
fill_width = int(bar_width * (drowsiness_level / 100))
if drowsiness_level < 30:
color = (0, 255, 0)
elif drowsiness_level < 70:
color = (0, 255, 255)
else:
color = (0, 0, 255)
cv2.rectangle(image, (bar_x, bar_y), (bar_x + fill_width, bar_y + bar_height), color, -1)
cv2.rectangle(image, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), (200, 200, 200), 2)
level_text = f"Drowsiness: {drowsiness_level}%"
cv2.putText(image, level_text, (bar_x + bar_width + 10, bar_y + 15),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# Status text
current_status = drowsy_timer['status']
if current_status == 0:
status = "AWAKE"
color = (0, 255, 0)
bg_color = (0, 100, 0)
else:
status = "DROWSY - ALARM!"
color = (0, 0, 255)
bg_color = (0, 0, 150)
if int(time.time() * 4) % 2:
color = (255, 255, 255)
text_size = cv2.getTextSize(status, cv2.FONT_HERSHEY_SIMPLEX, 1.0, 2)[0]
cv2.rectangle(image, (5, 250), (text_size[0] + 15, 280), bg_color, -1)
cv2.putText(image, status, (10, 275),
cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)
return image
def drowsiness_detection_with_timer(ear, mar, ear_history, mar_history, drowsy_timer, current_frame=None):
"""Deteksi kantuk dengan timer - DIMODIFIKASI UNTUK SERTAKAN BPM"""
global current_sensitivity, system_active, current_bpm
if not system_active:
return 0, drowsy_timer
EAR_THRESHOLD = current_sensitivity.ear_threshold
MAR_THRESHOLD = current_sensitivity.mar_threshold
DROWSY_TIME_THRESHOLD = current_sensitivity.drowsy_time_threshold
WAKE_TIME_THRESHOLD = current_sensitivity.wake_time_threshold
current_time = time.time()
currently_drowsy = False
if ear < EAR_THRESHOLD and ear > 0:
currently_drowsy = True
if mar > MAR_THRESHOLD and ear > 0:
currently_drowsy = True
if len(ear_history) >= current_sensitivity.history_length:
recent_history = ear_history[-current_sensitivity.history_length:]
closed_eye_frames = sum(1 for e in recent_history if e < EAR_THRESHOLD)
if closed_eye_frames >= current_sensitivity.drowsy_frames_threshold:
currently_drowsy = True
drowsy_duration = 0
if drowsy_timer['drowsy_start_time'] is not None:
drowsy_duration = current_time - drowsy_timer['drowsy_start_time']
drowsiness_level = calculate_drowsiness_level(ear, mar, drowsy_duration)
update_drowsiness_history(drowsiness_level)
if currently_drowsy:
if drowsy_timer['drowsy_start_time'] is None:
drowsy_timer['drowsy_start_time'] = current_time
drowsy_timer['wake_start_time'] = None
drowsy_duration = current_time - drowsy_timer['drowsy_start_time']
if drowsy_duration >= DROWSY_TIME_THRESHOLD:
if drowsy_timer['status'] != 1:
drowsy_timer['status'] = 1
play_alarm()
if not drowsy_timer['telegram_sent'] and current_frame is not None:
print("📸 Mengirim gambar ke Telegram...")
timestamp = time.strftime("%Y%m%d_%H%M%S")
img_name = f"drowsy_alert_{timestamp}.png"
# Buat caption dengan informasi lengkap termasuk BPM
caption = f"""⚠️ **PERINGATAN KANTUK!**
**📊 DATA DETEKSI:**
👁️ EAR: {ear:.3f}
👄 MAR: {mar:.3f}
😴 Tingkat Kantuk: {drowsiness_level}%
⏰ Durasi: {drowsy_duration:.1f} detik
🕐 Waktu: {time.strftime('%Y-%m-%d %H:%M:%S')}
**📈 DATA KESEHATAN:**
❤️ Detak Jantung (BPM): {current_bpm if current_bpm > 0 else 'Tidak tersedia'}
⚠️ **PERHATIAN:** Pengemudi menunjukkan tanda-tanda kantuk!"""
success = send_telegram_image(
current_frame,
img_name,
caption=caption
)
if success:
drowsy_timer['telegram_sent'] = True
else:
if drowsy_timer['wake_start_time'] is None:
drowsy_timer['wake_start_time'] = current_time
if drowsy_timer['wake_start_time'] is not None:
wake_duration = current_time - drowsy_timer['wake_start_time']
if wake_duration >= WAKE_TIME_THRESHOLD:
if drowsy_timer['status'] != 0:
drowsy_timer['status'] = 0
drowsy_timer['drowsy_start_time'] = None
stop_alarm()
if drowsy_timer['telegram_sent']:
drowsy_timer['telegram_sent'] = False
return drowsy_timer['status'], drowsy_timer
# ============================================
# MEDIAPIPE INITIALIZATION
# ============================================
right_eye = [[33, 133], [160, 144], [159, 145], [158, 153]]
left_eye = [[263, 362], [387, 373], [386, 374], [385, 380]]
mouth = [[61, 291], [39, 181], [0, 17], [269, 405]]
mp_face_mesh = mp.solutions.face_mesh
try:
face_connections = mp_face_mesh.FACEMESH_CONTOURS
except AttributeError:
face_connections = mp_face_mesh.FACE_CONNECTIONS
face_mesh = mp_face_mesh.FaceMesh(
min_detection_confidence=0.3, min_tracking_confidence=0.8)
mp_drawing = mp.solutions.drawing_utils
drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
# ============================================
# DROWSINESS DETECTOR CLASS
# ============================================
class DrowsinessDetector:
def __init__(self, mode='web', camera_type='normal', custom_params=None):
self.mode = mode
self.running = False
self.detection_thread = None
self.cap = None
self.picam2 = None
# Initialize camera availability
self.picamera_available = PICAMERA_AVAILABLE
load_calibration_from_file()
set_camera_sensitivity(camera_type, custom_params)
self.alarm_ready = init_alarm()
def start_detection(self):
self.running = True
if self.mode == 'web':
self.detection_thread = threading.Thread(target=self.shared_camera_loop, daemon=True)
self.detection_thread.start()
def stop_detection(self):
self.running = False
stop_alarm()
# Stop Pi Camera
if self.picam2 is not None:
try:
self.picam2.stop()
print("✅ Pi Camera stopped")
except:
pass
# Stop OpenCV camera
if self.cap:
self.cap.release()
def shared_camera_loop(self):
global drowsy_timer, ear_history, mar_history, ear_main, mar_main
global frame_counter, current_ear, current_mar, latest_frame, frame_lock
decay = current_sensitivity.ear_decay
detection_frequency = 2
print("🚀 Shared camera loop started...")
print(f"📷 Camera type: {'Pi Camera' if self.picamera_available else 'OpenCV Camera'}")
# Initialize camera based on availability
if self.picamera_available:
try:
# Initialize Pi Camera
self.picam2 = Picamera2()
# Configure camera with simpler settings
config = self.picam2.create_preview_configuration(
main={"size": (640, 480), "format": "RGB888"}
)
self.picam2.configure(config)
self.picam2.start()
print("✅ Pi Camera initialized successfully!")
except Exception as e:
print(f"❌ Failed to initialize Pi Camera: {e}")
print("📷 Falling back to OpenCV camera...")
self.picamera_available = False
# Fallback to OpenCV camera if Pi Camera not available
if not self.picamera_available or self.picam2 is None:
self.cap = cv2.VideoCapture(index_camera)
if not self.cap.isOpened():
print(f"❌ Cannot open camera with index {index_camera}")
# Try another index
self.cap = cv2.VideoCapture(0)
if not self.cap.isOpened():
print("❌ Cannot open any camera")
return
print("✅ OpenCV camera initialized")
while self.running:
try:
# Capture frame based on camera type
if self.picamera_available and self.picam2 is not None:
# Capture from Pi Camera
frame = self.picam2.capture_array()
# Ensure frame has correct format
if len(frame.shape) == 2:
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
elif frame.shape[2] == 4:
frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR)
elif frame.shape[2] == 3:
# PiCamera returns RGB, convert to BGR for OpenCV
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
else:
# Capture from OpenCV camera
success, frame = self.cap.read()
if not success:
time.sleep(0.1)
continue
# Process frame
ear, mar, processed_image = run_face_mp(frame)
if ear != -1000:
if ear_main == 0:
ear_main = ear
mar_main = mar
else:
ear_main = ear_main * decay + (1 - decay) * ear
mar_main = mar_main * current_sensitivity.mar_decay + (1 - current_sensitivity.mar_decay) * mar
ear_history.append(ear_main)
mar_history.append(mar_main)
if len(ear_history) > current_sensitivity.history_length:
ear_history.pop(0)
if len(mar_history) > current_sensitivity.history_length:
mar_history.pop(0)
current_ear = ear_main if ear != -1000 else 0
current_mar = mar_main if ear != -1000 else 0
collect_calibration_data()
frame_counter += 1
if frame_counter >= detection_frequency and ear != -1000:
frame_counter = 0
label, drowsy_timer = drowsiness_detection_with_timer(
ear_main, mar_main, ear_history, mar_history, drowsy_timer,
current_frame=processed_image)
display_image = add_info_to_frame(processed_image.copy(), ear_main, mar_main, drowsy_timer)
with frame_lock:
latest_frame = display_image.copy()
time.sleep(0.05)
except Exception as e:
print(f"❌ Error in camera loop: {e}")
time.sleep(0.1)
# Cleanup
if self.picam2 is not None:
try:
self.picam2.stop()
except:
pass
if self.cap is not None:
self.cap.release()
print("🛑 Shared camera loop stopped")
def generate_frames():
"""Generator untuk streaming video"""
global latest_frame, frame_lock
while True:
with frame_lock:
if latest_frame is not None:
frame = latest_frame.copy()
else:
frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(frame, "Waiting for camera...", (50, 240),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
ret, buffer = cv2.imencode('.jpg', frame)
frame_bytes = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
time.sleep(0.05)
# ============================================
# FLASK ROUTES - DIPERBAIKI & LENGKAP
# ============================================
@app.route('/')
def index():
return render_template('index.html')
@app.route('/video_feed')
def video_feed():
return Response(generate_frames(),
mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/api/status')
def get_status():
"""Get detection status"""
current_time = time.time()
drowsy_duration = 0
wake_duration = 0
if drowsy_timer['drowsy_start_time'] is not None:
drowsy_duration = current_time - drowsy_timer['drowsy_start_time']
if drowsy_timer['wake_start_time'] is not None:
wake_duration = current_time - drowsy_timer['wake_start_time']
return jsonify({
'status': 'DROWSY' if drowsy_timer['status'] == 1 else 'WAKE',
'ear': round(current_ear, 3),
'mar': round(current_mar, 3),
'drowsiness_level': drowsy_timer['drowsiness_level'],
'drowsy_duration': round(drowsy_duration, 1),
'wake_duration': round(wake_duration, 1),
'alarm_playing': drowsy_timer['alarm_playing'],
'thresholds': {
'ear': current_sensitivity.ear_threshold,
'mar': current_sensitivity.mar_threshold
},
'current_bpm': current_bpm,
'bpm_last_update': bpm_last_update
})
@app.route('/api/mqtt/status')
def get_mqtt_status():
"""Get MQTT status"""
global current_bpm, bpm_last_update, system_mode, system_active
bpm_age = time.time() - bpm_last_update if bpm_last_update > 0 else 999
bpm_connected = bpm_age < 5
return jsonify({
'mqtt_connected': mqtt_client.is_connected() if mqtt_client else False,
'current_bpm': current_bpm,
'bpm_connected': bpm_connected,
'bpm_age': round(bpm_age, 1),
'system_mode': system_mode,
'system_active': system_active,
'auto_activated': auto_activated
})
@app.route('/api/system/mode/<mode>', methods=['POST'])
def set_system_mode(mode):
"""Set system mode (manual/auto)"""
global system_mode, system_active, auto_activated
if mode not in ['manual', 'auto']:
return jsonify({'status': 'error', 'message': 'Mode tidak valid'}), 400
system_mode = mode
if mode == 'manual':
auto_activated = False
elif mode == 'auto':
if current_bpm < 62 and current_bpm > 0:
system_active = True
auto_activated = True
else:
system_active = False
auto_activated = False
return jsonify({
'status': 'success',
'message': f'Mode changed to {mode}',
'system_mode': system_mode,
'system_active': system_active
})
@app.route('/api/system/toggle', methods=['POST'])
def toggle_system():
"""Toggle system active/inactive (manual mode only)"""
global system_active, system_mode
if system_mode == 'auto':
return jsonify({
'status': 'error',
'message': 'Cannot toggle in AUTO mode'
}), 400
system_active = not system_active
if not system_active:
send_drowsiness_level(0)
return jsonify({
'status': 'success',
'system_active': system_active
})
# ============================================
# CALIBRATION API ROUTES
# ============================================
@app.route('/api/calibration/start/<mode>', methods=['POST'])
def api_start_calibration(mode):
"""API untuk memulai kalibrasi"""
try:
success = start_calibration(mode)
if success:
return jsonify({
'status': 'success',
'message': f'Kalibrasi {mode} dimulai',
'mode': mode
})
else:
return jsonify({
'status': 'error',
'message': f'Gagal memulai kalibrasi {mode}'
}), 400
except Exception as e:
print(f"❌ Error in start_calibration API: {e}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/calibration/stop/<mode>', methods=['POST'])
def api_stop_calibration(mode):
"""API untuk menghentikan kalibrasi"""
try:
success = stop_calibration(mode)
if success:
return jsonify({
'status': 'success',
'message': f'Kalibrasi {mode} dihentikan',
'mode': mode,
'samples': calibration_data[mode]['samples']
})
else:
return jsonify({
'status': 'error',
'message': f'Gagal menghentikan kalibrasi {mode}'
}), 400
except Exception as e:
print(f"❌ Error in stop_calibration API: {e}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/calibration/calculate', methods=['POST'])
def api_calculate_thresholds():
"""API untuk menghitung threshold"""
try:
success = calculate_calibrated_thresholds()
if success:
return jsonify({
'status': 'success',
'message': 'Threshold berhasil dihitung',
'thresholds': calibration_data['calculated_thresholds']
})
else:
return jsonify({
'status': 'error',
'message': 'Kalibrasi belum lengkap. Selesaikan kalibrasi awake dan drowsy terlebih dahulu.'
}), 400
except Exception as e:
print(f"❌ Error in calculate_thresholds API: {e}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/calibration/apply', methods=['POST'])
def api_apply_thresholds():
"""API untuk menerapkan threshold"""
try:
success = apply_calibrated_thresholds()
if success:
return jsonify({
'status': 'success',
'message': 'Threshold berhasil diterapkan',
'thresholds': {
'ear': current_sensitivity.ear_threshold,
'mar': current_sensitivity.mar_threshold
}
})
else:
return jsonify({
'status': 'error',
'message': 'Gagal menerapkan threshold'
}), 400
except Exception as e:
print(f"❌ Error in apply_thresholds API: {e}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/calibration/reset', methods=['POST'])
def api_reset_calibration():
"""API untuk mereset kalibrasi"""
try:
success = reset_calibration()
return jsonify({
'status': 'success',
'message': 'Kalibrasi direset'
})
except Exception as e:
print(f"❌ Error in reset_calibration API: {e}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/calibration/status')
def api_calibration_status():
"""API untuk mendapatkan status kalibrasi"""
try:
return jsonify({
'awake': {
'collecting': calibration_data['awake']['collecting'],
'samples': calibration_data['awake']['samples'],
'completed': calibration_data['awake']['completed'],
'avg_ear': round(calibration_data['awake']['avg_ear'], 3),
'avg_mar': round(calibration_data['awake']['avg_mar'], 3)
},
'drowsy': {
'collecting': calibration_data['drowsy']['collecting'],
'samples': calibration_data['drowsy']['samples'],
'completed': calibration_data['drowsy']['completed'],
'avg_ear': round(calibration_data['drowsy']['avg_ear'], 3),
'avg_mar': round(calibration_data['drowsy']['avg_mar'], 3)
},
'calculated_thresholds': calibration_data['calculated_thresholds']
})
except Exception as e:
print(f"❌ Error in calibration_status API: {e}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/set_sensitivity/<camera_type>', methods=['POST'])
def api_set_sensitivity(camera_type):
"""Set sensitivity preset atau custom"""
try:
custom_params = request.get_json() if request.is_json else None
set_camera_sensitivity(camera_type, custom_params)
return jsonify({
'status': 'success',
'message': f'Sensitivity set to {camera_type}',
'thresholds': {
'ear': current_sensitivity.ear_threshold,
'mar': current_sensitivity.mar_threshold
}
})
except Exception as e:
print(f"❌ Error in set_sensitivity API: {e}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/api/stop_alarm', methods=['POST'])
def api_stop_alarm():
"""Stop alarm manually"""
stop_alarm()
return jsonify({'status': 'success'})
# ============================================
# MAIN FUNCTION
# ============================================
def main():
parser = argparse.ArgumentParser(description='Drowsiness Detection with MQTT')
parser.add_argument('--mode', choices=['web'], default='web')
parser.add_argument('--port', type=int, default=5000)
parser.add_argument('--system-mode', choices=['manual', 'auto'], default='manual')
args = parser.parse_args()
global system_mode
system_mode = args.system_mode
print("=" * 60)
print("DROWSINESS DETECTION SYSTEM WITH MQTT")
print("=" * 60)
print(f"📡 MQTT Broker: {MQTT_BROKER}")
print(f"📨 Subscribe: {TOPIC_BPM}")
print(f"📤 Publish: {TOPIC_NGANTUK}")
print(f"⚙️ System Mode: {system_mode.upper()}")
print("\n🚀 Initializing MQTT...")
if not init_mqtt():
print("❌ MQTT Failed")
return
time.sleep(2)
detector = DrowsinessDetector(mode=args.mode)
try:
print(f"\n🌐 Starting WEB mode on port {args.port}")
detector.start_detection()
app.run(debug=False, host='0.0.0.0', port=args.port, use_reloader=False)
except KeyboardInterrupt:
print("\n🛑 Shutting down...")
if mqtt_client:
mqtt_client.loop_stop()
mqtt_client.disconnect()
detector.stop_detection()
print("✅ Stopped")
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment