Created
February 23, 2026 07:47
-
-
Save ducnh1022/2481f4acbf53d737cffc09298dbec338 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pyaudio | |
| import numpy as np | |
| import opensmile | |
| import time | |
| import datetime | |
| import collections | |
| import soundfile as sf # pip install soundfile | |
# ================= Configuration =================
# Capture / analysis geometry. The chunk is the unit of both feature
# extraction and the pre-roll ring buffer, so the buffer length is derived
# from it below.
SAMPLE_RATE = 16000
CHUNK_DURATION_SEC = 0.5  # Analyze every 0.5 s
CHUNK_SIZE = int(SAMPLE_RATE * CHUNK_DURATION_SEC)  # samples per analysis chunk
PRE_BUFFER_SEC = 3.0  # Keep last 3 seconds as pre-roll
PRE_BUFFER_CHUNKS = int(PRE_BUFFER_SEC / CHUNK_DURATION_SEC)  # ring-buffer capacity in chunks
SILENCE_TIMEOUT_SEC = 4.0  # Stop recording after this much silence
# Detection thresholds (tune these after testing!)
# NOTE(review): "dB" here is whatever scale the openSMILE loudness functional
# reports — confirm against real input before trusting the absolute values.
LOUDNESS_SPEECH_THRESHOLD = 35.0  # dB – normal talking range
LOUDNESS_DANGER_THRESHOLD = 70.0  # dB – scream / impact / very loud
VOICING_PROB_SPEECH = 0.40  # > this → likely voiced speech
# ================= openSMILE setup =================
# Functional-level extractor: one summary row of statistics per processed signal.
smile = opensmile.Smile(
    feature_level=opensmile.FeatureLevel.Functionals,
    feature_set=opensmile.FeatureSet.eGeMAPSv02,
)

# List every feature name the extractor will produce
# (usually ~80–90 in eGeMAPSv02 functionals).
print("\nAll available feature names:")
for index, feature_name in enumerate(smile.feature_names):
    print(f"{index + 1:2d}. {feature_name}")
# ================= Audio setup =================
# Open a mono float32 input stream at the analysis sample rate.
p = pyaudio.PyAudio()
stream = p.open(
    format=pyaudio.paFloat32,
    channels=1,
    rate=SAMPLE_RATE,
    input=True,
    # NOTE(review): buffer is 2x the read size (stream.read uses CHUNK_SIZE);
    # presumably headroom against overflow — confirm intent.
    frames_per_buffer=CHUNK_SIZE * 2
)
# Ring buffer for pre-trigger audio (last few seconds); oldest chunks are
# evicted automatically once maxlen is reached.
pre_buffer = collections.deque(maxlen=PRE_BUFFER_CHUNKS)
# Recording state shared with the main loop below.
is_recording = False            # True while an event recording is in progress
recorded_chunks = []            # float32 chunks accumulated for the current recording
recording_start_time = None     # wall-clock time the current recording began
last_trigger_time = time.time() # last time a speech/danger trigger fired
print("Monitoring microphone... (Ctrl+C to stop)")
print("Will record & save only on speech or loud danger events")
print(f"Speech trigger: loudness > {LOUDNESS_SPEECH_THRESHOLD} dB + voicing > {VOICING_PROB_SPEECH}")
print(f"Danger trigger: loudness > {LOUDNESS_DANGER_THRESHOLD} dB")
try:
    while True:
        # Read one analysis window from the microphone as float32 samples.
        data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
        audio_chunk = np.frombuffer(data, dtype=np.float32)

        # Always keep recent audio so a trigger can include pre-roll context.
        pre_buffer.append(audio_chunk.copy())

        # Extract eGeMAPS functionals for this chunk (one-row DataFrame).
        features = smile.process_signal(audio_chunk, sampling_rate=SAMPLE_RATE)
        loudness = features['loudness_sma3_amean'].values[0]
        # FIX: the old `features.get(name, np.array([0])).values[0]` raised
        # AttributeError when the column was missing (ndarray has no .values).
        if 'voicingProb_sma3nz_amean' in features.columns:
            voicing = features['voicingProb_sma3nz_amean'].values[0]
        else:
            voicing = 0.0

        now = time.time()

        # Detect interesting event: voiced speech above the speech threshold,
        # or any very loud sound regardless of voicing.
        is_speech = (loudness > LOUDNESS_SPEECH_THRESHOLD) and (voicing > VOICING_PROB_SPEECH)
        is_danger = loudness > LOUDNESS_DANGER_THRESHOLD
        triggered = is_speech or is_danger

        if triggered:
            last_trigger_time = now
            if not is_recording:
                print(f"\n[{time.strftime('%H:%M:%S')}] TRIGGER! "
                      f"Loudness: {loudness:5.1f} dB | Voicing: {voicing:4.2f} "
                      f"{'DANGER' if is_danger else 'SPEECH'} → Starting recording")
                is_recording = True
                recording_start_time = now
                # Start with pre-buffer (leading audio captured before trigger).
                recorded_chunks = list(pre_buffer)
            # Add current chunk
            recorded_chunks.append(audio_chunk)
        elif is_recording:
            # FIX: keep capturing quiet chunks too; previously non-triggered
            # chunks were dropped, leaving gaps in the saved recording.
            recorded_chunks.append(audio_chunk)
            if (now - last_trigger_time) > SILENCE_TIMEOUT_SEC:
                print(f"[{time.strftime('%H:%M:%S')}] Silence detected → Stopping & saving")
                # Concatenate all chunks into one array.
                full_audio = np.concatenate(recorded_chunks)
                # Peak-normalize into int16 range; the epsilon guards against
                # divide-by-zero on all-silent audio (consistent with the
                # emergency-save path in the finally block).
                peak = np.max(np.abs(full_audio)) + 1e-9
                full_audio = np.int16(full_audio / peak * 32767)
                # Timestamped filename, e.g. recording_20260223_074700.wav
                timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"recording_{timestamp}.wav"
                sf.write(filename, full_audio, SAMPLE_RATE, subtype='PCM_16')
                # FIX: report the actual filename (was a broken placeholder).
                print(f"Saved: {filename} ({len(full_audio)/SAMPLE_RATE:.1f} seconds)")
                # Reset state for the next event.
                is_recording = False
                recorded_chunks = []
                recording_start_time = None

        # Optional: print status every few seconds even when idle.
        if not is_recording and int(now) % 5 == 0:
            print(f"[{time.strftime('%H:%M:%S')}] Idle | Loudness: {loudness:5.1f} dB | Voicing: {voicing:4.2f}", end='\r')
except KeyboardInterrupt:
    print("\nStopped by user.")
finally:
    if is_recording and recorded_chunks:
        # Emergency save if interrupted during recording; same normalization
        # (adding the epsilon before max is equivalent to adding it after).
        full_audio = np.concatenate(recorded_chunks)
        full_audio = np.int16(full_audio / (np.max(np.abs(full_audio)) + 1e-9) * 32767)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        sf.write(f"recording_{timestamp}_partial.wav", full_audio, SAMPLE_RATE, subtype='PCM_16')
        print("Partial recording saved (interrupted).")
    # Always release the audio device cleanly.
    stream.stop_stream()
    stream.close()
    p.terminate()
    print("Microphone closed.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment