import argparse
import threading
import time
from flask import Flask, render_template, Response, jsonify, request
import cv2
import mediapipe as mp
import math
import numpy as np
import base64
import pygame
import os
import requests
from queue import Queue, Empty
from dataclasses import dataclass
from typing import Tuple
import json
import paho.mqtt.client as mqtt
# ============================================
# PI CAMERA IMPORT
# ============================================
try:
    from picamera2 import Picamera2
    from libcamera import Transform
    PICAMERA_AVAILABLE = True
    print("✅ PiCamera2 is available")
except ImportError:
    PICAMERA_AVAILABLE = False
    print("⚠️ PiCamera2 not available. Using OpenCV camera instead.")

# ============================================
# MQTT CONFIGURATION
# ============================================
MQTT_BROKER = "broker.emqx.io"
MQTT_PORT = 1883
TOPIC_BPM = "healthmonitor/HM2025/bpm"
TOPIC_NGANTUK = "healthmonitor/HM2025/ngantuk"
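# The two topics above can be exercised without the ESP32 hardware. A minimal
# test sketch (assuming the mosquitto command-line clients are installed and
# the public broker.emqx.io broker is reachable) for publishing a BPM reading
# that on_message() below will pick up:
#
#   mosquitto_pub -h broker.emqx.io -t healthmonitor/HM2025/bpm -m "58"
#
# A value below 62 triggers auto-activation when the script runs with
# --system-mode auto.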
# Global variables for MQTT
mqtt_client = None
current_bpm = 0
bpm_last_update = 0
system_mode = "manual"   # "manual" or "auto"
system_active = False    # True while the detection system is active
auto_activated = False   # Flag for auto-activation
index_camera = 0

app = Flask(__name__)
# ============================================
# TELEGRAM SEND FUNCTION - MODIFIED TO INCLUDE BPM
# ============================================
def send_telegram_image(img, img_name, caption="Gambar dari CCTV"):
    """Send an image to Telegram together with the current BPM reading."""
    BOT_TOKEN = "8398550472:AAFAINimxLn3Vo8Qxp2GDLF4B5t9s8UjmQo"
    CHAT_ID = "8583818545"
    SAVE_PATH = "img"
    try:
        os.makedirs(SAVE_PATH, exist_ok=True)
        img_path = os.path.join(SAVE_PATH, img_name)
        cv2.imwrite(img_path, img)
        url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendPhoto"
        # Append BPM information to the caption when a reading is available
        global current_bpm
        if current_bpm > 0:
            caption_with_bpm = f"{caption}\n\n❤️ **DETAK JANTUNG (BPM): {current_bpm}**"
            # Append the BPM status
            if current_bpm < 62:
                caption_with_bpm += "\n⚠️ **Status: BPM RENDAH!**"
            elif current_bpm > 100:
                caption_with_bpm += "\n⚠️ **Status: BPM TINGGI!**"
            else:
                caption_with_bpm += "\n✅ **Status: BPM Normal**"
        else:
            caption_with_bpm = caption
        with open(img_path, 'rb') as photo:
            r = requests.post(
                url,
                data={"chat_id": CHAT_ID, "caption": caption_with_bpm, "parse_mode": "Markdown"},
                files={"photo": photo}
            )
        print(f"📸 Telegram status: {r.status_code}")
        print(f"📝 Caption sent with BPM: {current_bpm}")
        return r.status_code == 200
    except Exception as e:
        print(f"❌ Error sending to Telegram: {e}")
        return False
# ============================================
# MQTT CALLBACKS
# ============================================
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("✅ Connected to MQTT Broker!")
        client.subscribe(TOPIC_BPM)
        print(f"📡 Subscribed to: {TOPIC_BPM}")
    else:
        print(f"❌ Failed to connect, return code {rc}")

def on_message(client, userdata, msg):
    global current_bpm, bpm_last_update, system_active, auto_activated
    try:
        if msg.topic == TOPIC_BPM:
            current_bpm = int(msg.payload.decode())
            bpm_last_update = time.time()
            print(f"💓 BPM Received: {current_bpm}")
            # AUTO MODE: activate the detection system when BPM drops below 62
            if system_mode == "auto":
                if current_bpm < 62 and current_bpm > 0:
                    if not system_active:
                        system_active = True
                        auto_activated = True
                        print("🚨 AUTO ACTIVATION: BPM < 62 - Sistem Deteksi AKTIF!")
                elif current_bpm >= 62:
                    if auto_activated:
                        system_active = False
                        auto_activated = False
                        print("✅ AUTO DEACTIVATION: BPM Normal - Sistem Deteksi NONAKTIF")
                        send_drowsiness_level(0)
    except Exception as e:
        print(f"❌ Error processing MQTT message: {e}")

def send_drowsiness_level(level):
    """Publish the drowsiness level to the ESP32."""
    global mqtt_client
    try:
        if mqtt_client and mqtt_client.is_connected():
            mqtt_client.publish(TOPIC_NGANTUK, str(level))
            status_text = ["Normal", "Ringan", "Sedang", "BERAT"][min(level, 3)]
            print(f"📤 Sent to ESP32 - Tingkat Ngantuk: {level} ({status_text})")
            return True
    except Exception as e:
        print(f"❌ Error sending drowsiness level: {e}")
    return False

def init_mqtt():
    """Initialize the MQTT client."""
    global mqtt_client
    mqtt_client = mqtt.Client()
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
    try:
        mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
        mqtt_client.loop_start()
        print("🚀 MQTT Client initialized")
        return True
    except Exception as e:
        print(f"❌ MQTT connection failed: {e}")
        return False
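# To watch what this script publishes towards the ESP32, a simple check
# (again assuming the mosquitto clients) is to subscribe to the output topic:
#
#   mosquitto_sub -h broker.emqx.io -t healthmonitor/HM2025/ngantuk -v
#
# Levels 0-3 correspond to Normal/Ringan/Sedang/BERAT, as printed in
# send_drowsiness_level() above.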
# ============================================
# SENSITIVITY & CALIBRATION CONFIGURATION
# ============================================
@dataclass
class CameraSensitivity:
    """Sensitivity settings, tuned per camera type."""
    ear_threshold: float = 0.3
    mar_threshold: float = 0.45
    drowsy_time_threshold: float = 0.8
    wake_time_threshold: float = 0.3
    ear_decay: float = 0.7
    mar_decay: float = 0.7
    history_length: int = 20
    drowsy_frames_threshold: int = 5
    ear_weight: float = 0.6
    mar_weight: float = 0.4
    time_weight: float = 0.2

# Presets for different camera types
CAMERA_PRESETS = {
    'normal': CameraSensitivity(),
    'wide': CameraSensitivity(
        ear_threshold=0.25,
        mar_threshold=0.42,
        drowsy_time_threshold=1.0,
        wake_time_threshold=0.4,
        ear_decay=0.65,
        mar_decay=0.65
    ),
    'telephoto': CameraSensitivity(
        ear_threshold=0.29,
        mar_threshold=0.48,
        drowsy_time_threshold=0.7,
        wake_time_threshold=0.25,
        ear_decay=0.75,
        mar_decay=0.75
    ),
}
# Calibration data - FIX: more robust initialization
calibration_data = {
    'awake': {
        'collecting': False,
        'ear_values': [],
        'mar_values': [],
        'samples': 0,
        'completed': False,
        'avg_ear': 0,
        'avg_mar': 0
    },
    'drowsy': {
        'collecting': False,
        'ear_values': [],
        'mar_values': [],
        'samples': 0,
        'completed': False,
        'avg_ear': 0,
        'avg_mar': 0
    },
    'calculated_thresholds': {
        'ear': 0.3,
        'mar': 0.45
    }
}

# Global variables
drowsy_timer = {
    'status': 0,
    'drowsy_start_time': None,
    'wake_start_time': None,
    'alarm_playing': False,
    'drowsiness_level': 0,
    'drowsiness_history': [],
    'telegram_sent': False,
}
ear_history = []
mar_history = []
ear_main = 0
mar_main = 0
frame_counter = 0
current_ear = 0
current_mar = 0

# Shared frame queue for web streaming
frame_queue = Queue(maxsize=2)
latest_frame = None
frame_lock = threading.Lock()

# Initialize pygame mixer for alarm
pygame.mixer.init()

# Current sensitivity settings (default: normal)
current_sensitivity = CAMERA_PRESETS['normal']
# ============================================
# CALIBRATION FUNCTIONS - FIXED
# ============================================
def start_calibration(mode):
    """Start calibration for the given mode."""
    global calibration_data
    if mode not in ['awake', 'drowsy']:
        print(f"❌ Mode kalibrasi tidak valid: {mode}")
        return False
    # Reset the calibration data for this mode
    calibration_data[mode]['collecting'] = True
    calibration_data[mode]['ear_values'] = []
    calibration_data[mode]['mar_values'] = []
    calibration_data[mode]['samples'] = 0
    calibration_data[mode]['completed'] = False
    print(f"🎯 Mulai kalibrasi mode: {mode.upper()}")
    print(f" Collecting: {calibration_data[mode]['collecting']}")
    return True

def stop_calibration(mode):
    """Stop calibration for the given mode."""
    global calibration_data
    if mode not in ['awake', 'drowsy']:
        print(f"❌ Mode kalibrasi tidak valid: {mode}")
        return False
    calibration_data[mode]['collecting'] = False
    if calibration_data[mode]['samples'] > 0:
        calibration_data[mode]['avg_ear'] = np.mean(calibration_data[mode]['ear_values'])
        calibration_data[mode]['avg_mar'] = np.mean(calibration_data[mode]['mar_values'])
        calibration_data[mode]['completed'] = True
        print(f"✅ Kalibrasi {mode.upper()} selesai:")
        print(f" Samples: {calibration_data[mode]['samples']}")
        print(f" EAR rata-rata: {calibration_data[mode]['avg_ear']:.3f}")
        print(f" MAR rata-rata: {calibration_data[mode]['avg_mar']:.3f}")
    else:
        print(f"⚠️ Kalibrasi {mode.upper()} dihentikan tanpa data")
    return True

def calculate_calibrated_thresholds():
    """Calculate thresholds from the collected calibration data."""
    global calibration_data
    if not (calibration_data['awake']['completed'] and calibration_data['drowsy']['completed']):
        print("❌ Kalibrasi belum lengkap!")
        print(f" Awake completed: {calibration_data['awake']['completed']}")
        print(f" Drowsy completed: {calibration_data['drowsy']['completed']}")
        return False
    awake_ear = calibration_data['awake']['avg_ear']
    awake_mar = calibration_data['awake']['avg_mar']
    drowsy_ear = calibration_data['drowsy']['avg_ear']
    drowsy_mar = calibration_data['drowsy']['avg_mar']
    # Put the threshold halfway between the awake and drowsy averages
    ear_threshold = (awake_ear + drowsy_ear) / 2
    mar_threshold = (awake_mar + drowsy_mar) / 2
    # Safety adjustment
    ear_threshold = ear_threshold * 0.9  # more sensitive for EAR
    mar_threshold = mar_threshold * 1.1  # more tolerant for MAR
    # Clamp the threshold values
    ear_threshold = max(0.15, min(0.4, ear_threshold))
    mar_threshold = max(0.3, min(0.7, mar_threshold))
    calibration_data['calculated_thresholds']['ear'] = ear_threshold
    calibration_data['calculated_thresholds']['mar'] = mar_threshold
    print(f"📊 Threshold terkalibrasi:")
    print(f" EAR: {ear_threshold:.3f} (Awake: {awake_ear:.3f}, Drowsy: {drowsy_ear:.3f})")
    print(f" MAR: {mar_threshold:.3f} (Awake: {awake_mar:.3f}, Drowsy: {drowsy_mar:.3f})")
    save_calibration_to_file()
    return True
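# Worked example of the threshold arithmetic above (illustrative numbers, not
# measured data): with an average awake EAR of 0.32 and drowsy EAR of 0.22,
# the midpoint is 0.27; multiplied by the 0.9 safety factor this gives an EAR
# threshold of 0.243, which already lies inside the [0.15, 0.4] clamp. The MAR
# threshold is computed the same way but scaled by 1.1 and clamped to [0.3, 0.7].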
def apply_calibrated_thresholds():
    """Apply the calculated thresholds."""
    global current_sensitivity, calibration_data
    ear_threshold = calibration_data['calculated_thresholds']['ear']
    mar_threshold = calibration_data['calculated_thresholds']['mar']
    custom_params = {
        'ear_threshold': ear_threshold,
        'mar_threshold': mar_threshold,
        'drowsy_time_threshold': 0.8,
        'wake_time_threshold': 0.3
    }
    set_camera_sensitivity('normal', custom_params)
    print(f"✅ Threshold diterapkan: EAR={ear_threshold:.3f}, MAR={mar_threshold:.3f}")
    return True

def collect_calibration_data():
    """Collect calibration samples while detection is running."""
    global calibration_data, current_ear, current_mar
    for mode in ['awake', 'drowsy']:
        if calibration_data[mode]['collecting'] and current_ear > 0:
            calibration_data[mode]['ear_values'].append(current_ear)
            calibration_data[mode]['mar_values'].append(current_mar)
            calibration_data[mode]['samples'] += 1
            # Auto-stop after 50 samples
            if calibration_data[mode]['samples'] >= 50:
                stop_calibration(mode)
                print(f"🎉 Auto-stop kalibrasi {mode.upper()} - 50 samples tercapai")

def reset_calibration():
    """Reset all calibration data."""
    global calibration_data
    calibration_data = {
        'awake': {
            'collecting': False,
            'ear_values': [],
            'mar_values': [],
            'samples': 0,
            'completed': False,
            'avg_ear': 0,
            'avg_mar': 0
        },
        'drowsy': {
            'collecting': False,
            'ear_values': [],
            'mar_values': [],
            'samples': 0,
            'completed': False,
            'avg_ear': 0,
            'avg_mar': 0
        },
        'calculated_thresholds': {
            'ear': 0.3,
            'mar': 0.45
        }
    }
    print("🔄 Kalibrasi direset ke nilai awal")
    return True

def save_calibration_to_file():
    """Save the calibration data to a JSON file."""
    try:
        with open('calibration_data.json', 'w') as f:
            json.dump(calibration_data, f, indent=2)
        print("💾 Data kalibrasi disimpan ke file")
        return True
    except Exception as e:
        print(f"❌ Gagal menyimpan kalibrasi: {e}")
        return False

def load_calibration_from_file():
    """Load calibration data from the JSON file."""
    global calibration_data
    try:
        if os.path.exists('calibration_data.json'):
            with open('calibration_data.json', 'r') as f:
                loaded_data = json.load(f)
            if 'calculated_thresholds' in loaded_data:
                calibration_data['calculated_thresholds'] = loaded_data['calculated_thresholds']
                apply_calibrated_thresholds()
            print("📂 Data kalibrasi dimuat dari file")
            return True
    except Exception as e:
        print(f"❌ Gagal memuat kalibrasi: {e}")
    return False

def set_camera_sensitivity(camera_type: str = 'normal', custom_params: dict = None):
    """Set the camera sensitivity preset, optionally overriding individual parameters."""
    global current_sensitivity
    if camera_type in CAMERA_PRESETS:
        current_sensitivity = CAMERA_PRESETS[camera_type]
        print(f"✅ Set sensitivitas ke preset: {camera_type}")
    else:
        current_sensitivity = CAMERA_PRESETS['normal']
        print(f"⚠️ Tipe kamera tidak dikenal, menggunakan preset: normal")
    if custom_params:
        # Note: this mutates the shared preset instance, so overrides persist
        # for that preset until the process restarts.
        for key, value in custom_params.items():
            if hasattr(current_sensitivity, key):
                setattr(current_sensitivity, key, value)
                print(f" ↳ {key}: {value}")
def calculate_drowsiness_level(ear: float, mar: float, drowsy_duration: float = 0) -> float:
    """Compute the drowsiness level from EAR, MAR, and how long drowsiness has lasted."""
    if ear <= 0:
        return 0
    global current_sensitivity
    ear_normal = current_sensitivity.ear_threshold * 1.2
    ear_closed = current_sensitivity.ear_threshold * 0.5
    if ear >= ear_normal:
        ear_score = 0
    elif ear <= ear_closed:
        ear_score = 100
    else:
        ear_score = ((ear_normal - ear) / (ear_normal - ear_closed)) * 100
    mar_normal = current_sensitivity.mar_threshold * 0.8
    mar_yawn = current_sensitivity.mar_threshold * 1.5
    if mar <= mar_normal:
        mar_score = 0
    elif mar >= mar_yawn:
        mar_score = 100
    else:
        mar_score = ((mar - mar_normal) / (mar_yawn - mar_normal)) * 100
    max_duration = 5.0
    time_score = min(100, (drowsy_duration / max_duration) * 100)
    drowsiness_level = (
        (ear_score * current_sensitivity.ear_weight) +
        (mar_score * current_sensitivity.mar_weight) +
        (time_score * current_sensitivity.time_weight)
    )
    drowsiness_level = max(0, min(100, drowsiness_level))
    drowsiness_level = round(drowsiness_level, 1)
    # Publish the level to MQTT
    if system_active:
        if drowsiness_level < 30:
            tingkat = 0  # Normal
        elif drowsiness_level < 50:
            tingkat = 1  # Mild
        elif drowsiness_level < 70:
            tingkat = 2  # Moderate
        else:
            tingkat = 3  # Severe
        send_drowsiness_level(tingkat)
    return drowsiness_level
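# Worked example with the default 'normal' preset (ear_threshold=0.3,
# mar_threshold=0.45): ear_normal = 0.36, ear_closed = 0.15. An EAR of 0.25
# scores (0.36 - 0.25) / (0.36 - 0.15) * 100 ≈ 52.4; a MAR of 0.30 is below
# mar_normal = 0.36 and scores 0; one second of drowsiness scores 20.
# Weighted sum: 52.4 * 0.6 + 0 * 0.4 + 20 * 0.2 ≈ 35.4. Note that the three
# weights sum to 1.2, so the weighted sum can exceed 100 and is clamped.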
def update_drowsiness_history(level: float):
    """Update the drowsiness-level history used for smoothing."""
    drowsy_timer['drowsiness_history'].append(level)
    if len(drowsy_timer['drowsiness_history']) > 50:
        drowsy_timer['drowsiness_history'].pop(0)
    if len(drowsy_timer['drowsiness_history']) >= 5:
        recent_levels = drowsy_timer['drowsiness_history'][-5:]
        smoothed_level = sum(recent_levels) / len(recent_levels)
        drowsy_timer['drowsiness_level'] = round(smoothed_level, 1)
    else:
        drowsy_timer['drowsiness_level'] = level
# ============================================
# ALARM FUNCTIONS
# ============================================
def create_alarm_sound():
    """Create alarm sound file"""
    try:
        import numpy as np
        sample_rate = 44100
        duration = 1.0
        frequency = 800
        t = np.linspace(0, duration, int(sample_rate * duration))
        wave = np.sin(2 * np.pi * frequency * t)
        modulation = np.sin(2 * np.pi * 5 * t)
        wave = wave * (0.5 + 0.5 * modulation)
        wave = (wave * 32767).astype(np.int16)
        import wave as wav_module
        with wav_module.open('alarm.wav', 'w') as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)
            wav_file.setframerate(sample_rate)
            wav_file.writeframes(wave.tobytes())
        print("✅ Alarm sound created successfully!")
        return True
    except Exception as e:
        print(f"❌ Could not create alarm sound: {e}")
        return False

def init_alarm():
    """Initialize alarm system"""
    if not os.path.exists('alarm.wav'):
        print("Creating alarm sound...")
        if not create_alarm_sound():
            print("⚠️ Warning: Could not create alarm sound.")
            return False
    try:
        pygame.mixer.music.load('alarm.wav')
        print("✅ Alarm system initialized!")
        return True
    except Exception as e:
        print(f"❌ Could not initialize alarm: {e}")
        return False

def play_alarm():
    """Play alarm sound"""
    try:
        if not drowsy_timer['alarm_playing']:
            pygame.mixer.music.play(-1)
            drowsy_timer['alarm_playing'] = True
            print("🚨 ALARM ACTIVATED!")
    except Exception as e:
        print(f"❌ Could not play alarm: {e}")

def stop_alarm():
    """Stop alarm sound"""
    try:
        if drowsy_timer['alarm_playing']:
            pygame.mixer.music.stop()
            drowsy_timer['alarm_playing'] = False
            print("✅ Alarm stopped")
    except Exception as e:
        print(f"❌ Could not stop alarm: {e}")
# ============================================
# MEDIAPIPE FUNCTIONS
# ============================================
def distance(p1, p2):
    """Calculate Euclidean distance"""
    return (((p1[:2] - p2[:2])**2).sum())**0.5

def eye_aspect_ratio(landmarks, eye):
    """Calculate Eye Aspect Ratio"""
    N1 = distance(landmarks[eye[1][0]], landmarks[eye[1][1]])
    N2 = distance(landmarks[eye[2][0]], landmarks[eye[2][1]])
    N3 = distance(landmarks[eye[3][0]], landmarks[eye[3][1]])
    D = distance(landmarks[eye[0][0]], landmarks[eye[0][1]])
    return (N1 + N2 + N3) / (3 * D)

def eye_feature(landmarks):
    """Get average EAR from both eyes"""
    return (eye_aspect_ratio(landmarks, left_eye) +
            eye_aspect_ratio(landmarks, right_eye)) / 2

def mouth_feature(landmarks):
    """Calculate Mouth Aspect Ratio"""
    N1 = distance(landmarks[mouth[1][0]], landmarks[mouth[1][1]])
    N2 = distance(landmarks[mouth[2][0]], landmarks[mouth[2][1]])
    N3 = distance(landmarks[mouth[3][0]], landmarks[mouth[3][1]])
    D = distance(landmarks[mouth[0][0]], landmarks[mouth[0][1]])
    return (N1 + N2 + N3) / (3 * D)

def run_face_mp(image):
    """Run MediaPipe face detection"""
    image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = face_mesh.process(image)
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    if results.multi_face_landmarks:
        landmarks_positions = []
        for _, data_point in enumerate(results.multi_face_landmarks[0].landmark):
            landmarks_positions.append([data_point.x, data_point.y, data_point.z])
        landmarks_positions = np.array(landmarks_positions)
        landmarks_positions[:, 0] *= image.shape[1]
        landmarks_positions[:, 1] *= image.shape[0]
        for face_landmarks in results.multi_face_landmarks:
            mp_drawing.draw_landmarks(
                image=image,
                landmark_list=face_landmarks,
                connections=face_connections,
                landmark_drawing_spec=drawing_spec,
                connection_drawing_spec=drawing_spec)
        ear = eye_feature(landmarks_positions)
        mar = mouth_feature(landmarks_positions)
    else:
        ear = -1000
        mar = -1000
    return ear, mar, image
def add_info_to_frame(image, ear_main, mar_main, drowsy_timer):
    """Add information overlay to frame"""
    global current_sensitivity, system_active, system_mode, current_bpm
    # Mode and BPM
    mode_color = (0, 255, 255) if system_mode == "auto" else (255, 165, 0)
    cv2.putText(image, f"Mode: {system_mode.upper()}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, mode_color, 2)
    # Show the BPM reading
    bpm_color = (255, 100, 100)
    bpm_status = ""
    if current_bpm < 62:
        bpm_color = (0, 0, 255)    # red for low BPM
        bpm_status = " (LOW!)"
    elif current_bpm > 100:
        bpm_color = (0, 255, 255)  # yellow for high BPM
        bpm_status = " (HIGH!)"
    cv2.putText(image, f"BPM: {current_bpm}{bpm_status}", (10, 55),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, bpm_color, 2)
    # System status
    status_text = "ACTIVE" if system_active else "INACTIVE"
    status_color = (0, 255, 0) if system_active else (100, 100, 100)
    cv2.putText(image, f"System: {status_text}", (10, 80),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, status_color, 2)
    # Calibration info
    if calibration_data['awake']['collecting']:
        cv2.putText(image, "KALIBRASI: TIDAK NGANTUK", (10, 105),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
        cv2.putText(image, f"Samples: {calibration_data['awake']['samples']}/50",
                    (10, 135), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
    elif calibration_data['drowsy']['collecting']:
        cv2.putText(image, "KALIBRASI: NGANTUK", (10, 105),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 165, 255), 2)
        cv2.putText(image, f"Samples: {calibration_data['drowsy']['samples']}/50",
                    (10, 135), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 2)
    if ear_main > 0 and system_active:
        cv2.putText(image, f"EAR: {ear_main:.3f} (Thresh: {current_sensitivity.ear_threshold:.3f})",
                    (10, 165), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        cv2.putText(image, f"MAR: {mar_main:.3f} (Thresh: {current_sensitivity.mar_threshold:.3f})",
                    (10, 195), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
    if system_active:
        drowsiness_level = drowsy_timer['drowsiness_level']
        # Progress bar
        bar_width = 200
        bar_height = 20
        bar_x = 10
        bar_y = 220
        cv2.rectangle(image, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), (50, 50, 50), -1)
        fill_width = int(bar_width * (drowsiness_level / 100))
        if drowsiness_level < 30:
            color = (0, 255, 0)
        elif drowsiness_level < 70:
            color = (0, 255, 255)
        else:
            color = (0, 0, 255)
        cv2.rectangle(image, (bar_x, bar_y), (bar_x + fill_width, bar_y + bar_height), color, -1)
        cv2.rectangle(image, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), (200, 200, 200), 2)
        level_text = f"Drowsiness: {drowsiness_level}%"
        cv2.putText(image, level_text, (bar_x + bar_width + 10, bar_y + 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        # Status text
        current_status = drowsy_timer['status']
        if current_status == 0:
            status = "AWAKE"
            color = (0, 255, 0)
            bg_color = (0, 100, 0)
        else:
            status = "DROWSY - ALARM!"
            color = (0, 0, 255)
            bg_color = (0, 0, 150)
            if int(time.time() * 4) % 2:
                color = (255, 255, 255)
        text_size = cv2.getTextSize(status, cv2.FONT_HERSHEY_SIMPLEX, 1.0, 2)[0]
        cv2.rectangle(image, (5, 250), (text_size[0] + 15, 280), bg_color, -1)
        cv2.putText(image, status, (10, 275),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)
    return image
def drowsiness_detection_with_timer(ear, mar, ear_history, mar_history, drowsy_timer, current_frame=None):
    """Drowsiness detection with a timer - MODIFIED TO INCLUDE BPM."""
    global current_sensitivity, system_active, current_bpm
    if not system_active:
        return 0, drowsy_timer
    EAR_THRESHOLD = current_sensitivity.ear_threshold
    MAR_THRESHOLD = current_sensitivity.mar_threshold
    DROWSY_TIME_THRESHOLD = current_sensitivity.drowsy_time_threshold
    WAKE_TIME_THRESHOLD = current_sensitivity.wake_time_threshold
    current_time = time.time()
    currently_drowsy = False
    if ear < EAR_THRESHOLD and ear > 0:
        currently_drowsy = True
    if mar > MAR_THRESHOLD and ear > 0:
        currently_drowsy = True
    if len(ear_history) >= current_sensitivity.history_length:
        recent_history = ear_history[-current_sensitivity.history_length:]
        closed_eye_frames = sum(1 for e in recent_history if e < EAR_THRESHOLD)
        if closed_eye_frames >= current_sensitivity.drowsy_frames_threshold:
            currently_drowsy = True
    drowsy_duration = 0
    if drowsy_timer['drowsy_start_time'] is not None:
        drowsy_duration = current_time - drowsy_timer['drowsy_start_time']
    drowsiness_level = calculate_drowsiness_level(ear, mar, drowsy_duration)
    update_drowsiness_history(drowsiness_level)
    if currently_drowsy:
        if drowsy_timer['drowsy_start_time'] is None:
            drowsy_timer['drowsy_start_time'] = current_time
            drowsy_timer['wake_start_time'] = None
        drowsy_duration = current_time - drowsy_timer['drowsy_start_time']
        if drowsy_duration >= DROWSY_TIME_THRESHOLD:
            if drowsy_timer['status'] != 1:
                drowsy_timer['status'] = 1
                play_alarm()
            if not drowsy_timer['telegram_sent'] and current_frame is not None:
                print("📸 Mengirim gambar ke Telegram...")
                timestamp = time.strftime("%Y%m%d_%H%M%S")
                img_name = f"drowsy_alert_{timestamp}.png"
                # Build the caption with the full detection data, including BPM
                caption = f"""⚠️ **PERINGATAN KANTUK!**
**📊 DATA DETEKSI:**
👁️ EAR: {ear:.3f}
👄 MAR: {mar:.3f}
😴 Tingkat Kantuk: {drowsiness_level}%
⏰ Durasi: {drowsy_duration:.1f} detik
🕐 Waktu: {time.strftime('%Y-%m-%d %H:%M:%S')}
**📈 DATA KESEHATAN:**
❤️ Detak Jantung (BPM): {current_bpm if current_bpm > 0 else 'Tidak tersedia'}
⚠️ **PERHATIAN:** Pengemudi menunjukkan tanda-tanda kantuk!"""
                success = send_telegram_image(
                    current_frame,
                    img_name,
                    caption=caption
                )
                if success:
                    drowsy_timer['telegram_sent'] = True
    else:
        if drowsy_timer['wake_start_time'] is None:
            drowsy_timer['wake_start_time'] = current_time
        if drowsy_timer['wake_start_time'] is not None:
            wake_duration = current_time - drowsy_timer['wake_start_time']
            if wake_duration >= WAKE_TIME_THRESHOLD:
                if drowsy_timer['status'] != 0:
                    drowsy_timer['status'] = 0
                    drowsy_timer['drowsy_start_time'] = None
                    stop_alarm()
                    if drowsy_timer['telegram_sent']:
                        drowsy_timer['telegram_sent'] = False
    return drowsy_timer['status'], drowsy_timer
# ============================================
# MEDIAPIPE INITIALIZATION
# ============================================
right_eye = [[33, 133], [160, 144], [159, 145], [158, 153]]
left_eye = [[263, 362], [387, 373], [386, 374], [385, 380]]
mouth = [[61, 291], [39, 181], [0, 17], [269, 405]]
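# In each list above, the first pair holds the horizontal landmark indices
# (used as the denominator D in eye_aspect_ratio()/mouth_feature()) and the
# remaining three pairs are vertical landmark pairs (the numerators N1-N3),
# following MediaPipe Face Mesh index numbering.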
mp_face_mesh = mp.solutions.face_mesh
try:
    face_connections = mp_face_mesh.FACEMESH_CONTOURS
except AttributeError:
    face_connections = mp_face_mesh.FACE_CONNECTIONS
face_mesh = mp_face_mesh.FaceMesh(
    min_detection_confidence=0.3, min_tracking_confidence=0.8)
mp_drawing = mp.solutions.drawing_utils
drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
# ============================================
# DROWSINESS DETECTOR CLASS
# ============================================
class DrowsinessDetector:
    def __init__(self, mode='web', camera_type='normal', custom_params=None):
        self.mode = mode
        self.running = False
        self.detection_thread = None
        self.cap = None
        self.picam2 = None
        # Initialize camera availability
        self.picamera_available = PICAMERA_AVAILABLE
        load_calibration_from_file()
        set_camera_sensitivity(camera_type, custom_params)
        self.alarm_ready = init_alarm()

    def start_detection(self):
        self.running = True
        if self.mode == 'web':
            self.detection_thread = threading.Thread(target=self.shared_camera_loop, daemon=True)
            self.detection_thread.start()

    def stop_detection(self):
        self.running = False
        stop_alarm()
        # Stop Pi Camera
        if self.picam2 is not None:
            try:
                self.picam2.stop()
                print("✅ Pi Camera stopped")
            except:
                pass
        # Stop OpenCV camera
        if self.cap:
            self.cap.release()

    def shared_camera_loop(self):
        global drowsy_timer, ear_history, mar_history, ear_main, mar_main
        global frame_counter, current_ear, current_mar, latest_frame, frame_lock
        decay = current_sensitivity.ear_decay
        detection_frequency = 2
        print("🚀 Shared camera loop started...")
        print(f"📷 Camera type: {'Pi Camera' if self.picamera_available else 'OpenCV Camera'}")
        # Initialize camera based on availability
        if self.picamera_available:
            try:
                # Initialize Pi Camera
                self.picam2 = Picamera2()
                # Configure camera with simpler settings
                config = self.picam2.create_preview_configuration(
                    main={"size": (640, 480), "format": "RGB888"}
                )
                self.picam2.configure(config)
                self.picam2.start()
                print("✅ Pi Camera initialized successfully!")
            except Exception as e:
                print(f"❌ Failed to initialize Pi Camera: {e}")
                print("📷 Falling back to OpenCV camera...")
                self.picamera_available = False
        # Fallback to OpenCV camera if Pi Camera not available
        if not self.picamera_available or self.picam2 is None:
            self.cap = cv2.VideoCapture(index_camera)
            if not self.cap.isOpened():
                print(f"❌ Cannot open camera with index {index_camera}")
                # Try another index
                self.cap = cv2.VideoCapture(0)
                if not self.cap.isOpened():
                    print("❌ Cannot open any camera")
                    return
            print("✅ OpenCV camera initialized")
        while self.running:
            try:
                # Capture frame based on camera type
                if self.picamera_available and self.picam2 is not None:
                    # Capture from Pi Camera
                    frame = self.picam2.capture_array()
                    # Ensure frame has correct format
                    if len(frame.shape) == 2:
                        frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
                    elif frame.shape[2] == 4:
                        frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR)
                    elif frame.shape[2] == 3:
                        # PiCamera returns RGB, convert to BGR for OpenCV
                        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                else:
                    # Capture from OpenCV camera
                    success, frame = self.cap.read()
                    if not success:
                        time.sleep(0.1)
                        continue
                # Process frame
                ear, mar, processed_image = run_face_mp(frame)
                if ear != -1000:
                    if ear_main == 0:
                        ear_main = ear
                        mar_main = mar
                    else:
                        ear_main = ear_main * decay + (1 - decay) * ear
                        mar_main = mar_main * current_sensitivity.mar_decay + (1 - current_sensitivity.mar_decay) * mar
                    ear_history.append(ear_main)
                    mar_history.append(mar_main)
                    if len(ear_history) > current_sensitivity.history_length:
                        ear_history.pop(0)
                    if len(mar_history) > current_sensitivity.history_length:
                        mar_history.pop(0)
                current_ear = ear_main if ear != -1000 else 0
                current_mar = mar_main if ear != -1000 else 0
                collect_calibration_data()
                frame_counter += 1
                if frame_counter >= detection_frequency and ear != -1000:
                    frame_counter = 0
                    label, drowsy_timer = drowsiness_detection_with_timer(
                        ear_main, mar_main, ear_history, mar_history, drowsy_timer,
                        current_frame=processed_image)
                display_image = add_info_to_frame(processed_image.copy(), ear_main, mar_main, drowsy_timer)
                with frame_lock:
                    latest_frame = display_image.copy()
                time.sleep(0.05)
            except Exception as e:
                print(f"❌ Error in camera loop: {e}")
                time.sleep(0.1)
        # Cleanup
        if self.picam2 is not None:
            try:
                self.picam2.stop()
            except:
                pass
        if self.cap is not None:
            self.cap.release()
        print("🛑 Shared camera loop stopped")
def generate_frames():
    """Generator that yields JPEG frames for the MJPEG video stream."""
    global latest_frame, frame_lock
    while True:
        with frame_lock:
            if latest_frame is not None:
                frame = latest_frame.copy()
            else:
                frame = np.zeros((480, 640, 3), dtype=np.uint8)
                cv2.putText(frame, "Waiting for camera...", (50, 240),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        ret, buffer = cv2.imencode('.jpg', frame)
        frame_bytes = buffer.tobytes()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
        time.sleep(0.05)
# ============================================
# FLASK ROUTES - FIXED & COMPLETE
# ============================================
@app.route('/')
def index():
    return render_template('index.html')

@app.route('/video_feed')
def video_feed():
    return Response(generate_frames(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')
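# The MJPEG stream above is typically consumed straight from an <img> tag in
# templates/index.html (the template itself is not included in this gist), e.g.:
#
#   <img src="/video_feed" alt="camera stream">
#
# Browsers render multipart/x-mixed-replace responses as a continuously
# updating image, so no JavaScript is needed for the live view.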
@app.route('/api/status')
def get_status():
    """Get detection status"""
    current_time = time.time()
    drowsy_duration = 0
    wake_duration = 0
    if drowsy_timer['drowsy_start_time'] is not None:
        drowsy_duration = current_time - drowsy_timer['drowsy_start_time']
    if drowsy_timer['wake_start_time'] is not None:
        wake_duration = current_time - drowsy_timer['wake_start_time']
    return jsonify({
        'status': 'DROWSY' if drowsy_timer['status'] == 1 else 'WAKE',
        'ear': round(current_ear, 3),
        'mar': round(current_mar, 3),
        'drowsiness_level': drowsy_timer['drowsiness_level'],
        'drowsy_duration': round(drowsy_duration, 1),
        'wake_duration': round(wake_duration, 1),
        'alarm_playing': drowsy_timer['alarm_playing'],
        'thresholds': {
            'ear': current_sensitivity.ear_threshold,
            'mar': current_sensitivity.mar_threshold
        },
        'current_bpm': current_bpm,
        'bpm_last_update': bpm_last_update
    })

@app.route('/api/mqtt/status')
def get_mqtt_status():
    """Get MQTT status"""
    global current_bpm, bpm_last_update, system_mode, system_active
    bpm_age = time.time() - bpm_last_update if bpm_last_update > 0 else 999
    bpm_connected = bpm_age < 5
    return jsonify({
        'mqtt_connected': mqtt_client.is_connected() if mqtt_client else False,
        'current_bpm': current_bpm,
        'bpm_connected': bpm_connected,
        'bpm_age': round(bpm_age, 1),
        'system_mode': system_mode,
        'system_active': system_active,
        'auto_activated': auto_activated
    })

@app.route('/api/system/mode/<mode>', methods=['POST'])
def set_system_mode(mode):
    """Set system mode (manual/auto)"""
    global system_mode, system_active, auto_activated
    if mode not in ['manual', 'auto']:
        return jsonify({'status': 'error', 'message': 'Mode tidak valid'}), 400
    system_mode = mode
    if mode == 'manual':
        auto_activated = False
    elif mode == 'auto':
        if current_bpm < 62 and current_bpm > 0:
            system_active = True
            auto_activated = True
        else:
            system_active = False
            auto_activated = False
    return jsonify({
        'status': 'success',
        'message': f'Mode changed to {mode}',
        'system_mode': system_mode,
        'system_active': system_active
    })

@app.route('/api/system/toggle', methods=['POST'])
def toggle_system():
    """Toggle system active/inactive (manual mode only)"""
    global system_active, system_mode
    if system_mode == 'auto':
        return jsonify({
            'status': 'error',
            'message': 'Cannot toggle in AUTO mode'
        }), 400
    system_active = not system_active
    if not system_active:
        send_drowsiness_level(0)
    return jsonify({
        'status': 'success',
        'system_active': system_active
    })
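# The mode/toggle endpoints above are plain POST routes, so they can be
# exercised with curl while the server is running (host/port are whatever
# app.run() was started with; 0.0.0.0:5000 by default):
#
#   curl -X POST http://localhost:5000/api/system/mode/auto
#   curl -X POST http://localhost:5000/api/system/toggle
#   curl http://localhost:5000/api/status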
# ============================================
# CALIBRATION API ROUTES
# ============================================
@app.route('/api/calibration/start/<mode>', methods=['POST'])
def api_start_calibration(mode):
    """API endpoint to start calibration."""
    try:
        success = start_calibration(mode)
        if success:
            return jsonify({
                'status': 'success',
                'message': f'Kalibrasi {mode} dimulai',
                'mode': mode
            })
        else:
            return jsonify({
                'status': 'error',
                'message': f'Gagal memulai kalibrasi {mode}'
            }), 400
    except Exception as e:
        print(f"❌ Error in start_calibration API: {e}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

@app.route('/api/calibration/stop/<mode>', methods=['POST'])
def api_stop_calibration(mode):
    """API endpoint to stop calibration."""
    try:
        success = stop_calibration(mode)
        if success:
            return jsonify({
                'status': 'success',
                'message': f'Kalibrasi {mode} dihentikan',
                'mode': mode,
                'samples': calibration_data[mode]['samples']
            })
        else:
            return jsonify({
                'status': 'error',
                'message': f'Gagal menghentikan kalibrasi {mode}'
            }), 400
    except Exception as e:
        print(f"❌ Error in stop_calibration API: {e}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

@app.route('/api/calibration/calculate', methods=['POST'])
def api_calculate_thresholds():
    """API endpoint to calculate the calibrated thresholds."""
    try:
        success = calculate_calibrated_thresholds()
        if success:
            return jsonify({
                'status': 'success',
                'message': 'Threshold berhasil dihitung',
                'thresholds': calibration_data['calculated_thresholds']
            })
        else:
            return jsonify({
                'status': 'error',
                'message': 'Kalibrasi belum lengkap. Selesaikan kalibrasi awake dan drowsy terlebih dahulu.'
            }), 400
    except Exception as e:
        print(f"❌ Error in calculate_thresholds API: {e}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

@app.route('/api/calibration/apply', methods=['POST'])
def api_apply_thresholds():
    """API endpoint to apply the calibrated thresholds."""
    try:
        success = apply_calibrated_thresholds()
        if success:
            return jsonify({
                'status': 'success',
                'message': 'Threshold berhasil diterapkan',
                'thresholds': {
                    'ear': current_sensitivity.ear_threshold,
                    'mar': current_sensitivity.mar_threshold
                }
            })
        else:
            return jsonify({
                'status': 'error',
                'message': 'Gagal menerapkan threshold'
            }), 400
    except Exception as e:
        print(f"❌ Error in apply_thresholds API: {e}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

@app.route('/api/calibration/reset', methods=['POST'])
def api_reset_calibration():
    """API endpoint to reset calibration."""
    try:
        success = reset_calibration()
        return jsonify({
            'status': 'success',
            'message': 'Kalibrasi direset'
        })
    except Exception as e:
        print(f"❌ Error in reset_calibration API: {e}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

@app.route('/api/calibration/status')
def api_calibration_status():
    """API endpoint that returns the current calibration status."""
    try:
        return jsonify({
            'awake': {
                'collecting': calibration_data['awake']['collecting'],
                'samples': calibration_data['awake']['samples'],
                'completed': calibration_data['awake']['completed'],
                'avg_ear': round(calibration_data['awake']['avg_ear'], 3),
                'avg_mar': round(calibration_data['awake']['avg_mar'], 3)
            },
            'drowsy': {
                'collecting': calibration_data['drowsy']['collecting'],
                'samples': calibration_data['drowsy']['samples'],
                'completed': calibration_data['drowsy']['completed'],
                'avg_ear': round(calibration_data['drowsy']['avg_ear'], 3),
                'avg_mar': round(calibration_data['drowsy']['avg_mar'], 3)
            },
            'calculated_thresholds': calibration_data['calculated_thresholds']
        })
    except Exception as e:
        print(f"❌ Error in calibration_status API: {e}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

@app.route('/api/set_sensitivity/<camera_type>', methods=['POST'])
def api_set_sensitivity(camera_type):
    """Set a sensitivity preset or custom parameters."""
    try:
        custom_params = request.get_json() if request.is_json else None
        set_camera_sensitivity(camera_type, custom_params)
        return jsonify({
            'status': 'success',
            'message': f'Sensitivity set to {camera_type}',
            'thresholds': {
                'ear': current_sensitivity.ear_threshold,
                'mar': current_sensitivity.mar_threshold
            }
        })
    except Exception as e:
        print(f"❌ Error in set_sensitivity API: {e}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

@app.route('/api/stop_alarm', methods=['POST'])
def api_stop_alarm():
    """Stop alarm manually"""
    stop_alarm()
    return jsonify({'status': 'success'})
# ============================================
# MAIN FUNCTION
# ============================================
def main():
    parser = argparse.ArgumentParser(description='Drowsiness Detection with MQTT')
    parser.add_argument('--mode', choices=['web'], default='web')
    parser.add_argument('--port', type=int, default=5000)
    parser.add_argument('--system-mode', choices=['manual', 'auto'], default='manual')
    args = parser.parse_args()
    global system_mode
    system_mode = args.system_mode
    print("=" * 60)
    print("DROWSINESS DETECTION SYSTEM WITH MQTT")
    print("=" * 60)
    print(f"📡 MQTT Broker: {MQTT_BROKER}")
    print(f"📨 Subscribe: {TOPIC_BPM}")
    print(f"📤 Publish: {TOPIC_NGANTUK}")
    print(f"⚙️ System Mode: {system_mode.upper()}")
    print("\n🚀 Initializing MQTT...")
    if not init_mqtt():
        print("❌ MQTT Failed")
        return
    time.sleep(2)
    detector = DrowsinessDetector(mode=args.mode)
    try:
        print(f"\n🌐 Starting WEB mode on port {args.port}")
        detector.start_detection()
        app.run(debug=False, host='0.0.0.0', port=args.port, use_reloader=False)
    except KeyboardInterrupt:
        print("\n🛑 Shutting down...")
        if mqtt_client:
            mqtt_client.loop_stop()
            mqtt_client.disconnect()
        detector.stop_detection()
        print("✅ Stopped")

if __name__ == '__main__':
    main()
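# Example invocation (the filename is whatever this gist was saved as,
# e.g. drowsiness_mqtt.py):
#
#   python3 drowsiness_mqtt.py --system-mode auto --port 5000
#
# The dashboard is then served at http://<host>:5000/ and the live stream
# at /video_feed.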