#!/usr/bin/env python3
"""
Room Acoustics Testing Tool

Test different acoustic foam placements by playing test signals
and analyzing the room's response. Compare multiple sessions to
find the optimal treatment configuration.

Usage:
    python room_acoustics_test.py devices
    python room_acoustics_test.py test-devices
    python room_acoustics_test.py record --name "no_foam"
    python room_acoustics_test.py record --name "foam_left" --type sweep
    python room_acoustics_test.py compare --sessions "no_foam,foam_left"
    python room_acoustics_test.py list
"""

import argparse
import json
import os
import sys
from datetime import datetime
from pathlib import Path

import numpy as np
import sounddevice as sd
from scipy import signal
from scipy.io import wavfile
import matplotlib.pyplot as plt

# Configuration
SAMPLE_RATE = 48000
SESSIONS_DIR = Path("sessions")
TEST_TYPES = ["sweep", "impulse", "noise", "bursts", "ambient", "voice"]

# Octave band center frequencies for RT60 analysis
OCTAVE_BANDS = [125, 250, 500, 1000, 2000, 4000, 8000]
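
# Note on the octave bands above: each band in the RT60 analysis spans one
# octave around its center, with edges at center/sqrt(2) and center*sqrt(2)
# (the formula used in calculate_rt60 below). Worked examples: the 1000 Hz
# band covers roughly 707-1414 Hz, and the 125 Hz band roughly 88-177 Hz.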

# =============================================================================
# Signal Generation
# =============================================================================

def generate_sweep(duration=3.0, f_start=20, f_end=20000, sample_rate=SAMPLE_RATE):
    """Generate a logarithmic sine sweep from f_start to f_end Hz."""
    t = np.linspace(0, duration, int(sample_rate * duration), dtype=np.float32)
    sweep = signal.chirp(t, f_start, duration, f_end, method='logarithmic')
    # Apply fade in/out to avoid clicks
    fade_samples = int(0.01 * sample_rate)
    sweep[:fade_samples] *= np.linspace(0, 1, fade_samples)
    sweep[-fade_samples:] *= np.linspace(1, 0, fade_samples)
    return sweep.astype(np.float32)
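
# Quick offline sanity check for the sweep (a sketch; no audio hardware needed):
#
#     sweep = generate_sweep(duration=1.0)
#     assert len(sweep) == SAMPLE_RATE       # 1.0 s at 48 kHz
#     assert abs(float(sweep[0])) < 1e-6     # fade-in starts from silence
#     assert abs(float(sweep[-1])) < 1e-6    # fade-out ends at silence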

def generate_impulse(duration=1.0, sample_rate=SAMPLE_RATE):
    """Generate an impulse (click) signal for RT60 measurement."""
    samples = int(sample_rate * duration)
    impulse = np.zeros(samples, dtype=np.float32)
    # Create a short click (1ms) at the start with a smooth envelope
    click_samples = int(0.001 * sample_rate)
    t = np.linspace(0, np.pi, click_samples)
    impulse[:click_samples] = np.sin(t) * np.sin(2 * np.pi * 1000 * np.linspace(0, 0.001, click_samples))
    return impulse.astype(np.float32)

def generate_tone_burst(frequency, burst_duration=0.1, tail_duration=1.0, sample_rate=SAMPLE_RATE):
    """
    Generate a tone burst at a specific frequency.

    A tone burst is a short sine wave that stops abruptly, allowing
    measurement of how long that specific frequency reverberates.
    """
    burst_samples = int(burst_duration * sample_rate)
    tail_samples = int(tail_duration * sample_rate)
    total_samples = burst_samples + tail_samples
    tone = np.zeros(total_samples, dtype=np.float32)
    # Generate the tone burst
    t = np.linspace(0, burst_duration, burst_samples)
    burst = np.sin(2 * np.pi * frequency * t)
    # Apply smooth envelope to avoid clicks
    envelope = np.ones(burst_samples)
    fade_samples = min(int(0.01 * sample_rate), burst_samples // 4)
    envelope[:fade_samples] = np.linspace(0, 1, fade_samples)
    envelope[-fade_samples:] = np.linspace(1, 0, fade_samples)
    tone[:burst_samples] = burst * envelope
    return tone.astype(np.float32)

def generate_multi_frequency_bursts(frequencies=None, gap_duration=2.0, sample_rate=SAMPLE_RATE):
    """
    Generate a sequence of tone bursts at different frequencies.

    Each burst is followed by silence to capture the decay at that frequency.
    This provides more accurate per-band RT60 measurements than a single impulse.
    Low frequencies get longer bursts to build up enough energy.

    Returns:
        Tuple of (signal, frequency_times, burst_durations) where:
        - frequency_times maps each frequency to its start time in seconds
        - burst_durations maps each frequency to its burst duration
    """
    if frequencies is None:
        frequencies = OCTAVE_BANDS
    signals = []
    frequency_times = {}
    burst_durations = {}
    current_time = 0.0
    for freq in frequencies:
        # Low frequencies need longer bursts to build energy.
        # At 125Hz, one cycle is 8ms, so 100ms is only ~12 cycles;
        # we want at least 20-30 cycles for a stable measurement.
        if freq <= 125:
            burst_duration = 0.5  # 500ms for very low frequencies
        elif freq <= 250:
            burst_duration = 0.3  # 300ms
        elif freq <= 500:
            burst_duration = 0.2  # 200ms
        else:
            burst_duration = 0.1  # 100ms for higher frequencies
        burst = generate_tone_burst(freq, burst_duration, gap_duration, sample_rate)
        signals.append(burst)
        frequency_times[freq] = current_time
        burst_durations[freq] = burst_duration
        current_time += len(burst) / sample_rate
    combined = np.concatenate(signals)
    return combined.astype(np.float32), frequency_times, burst_durations
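
# With the default OCTAVE_BANDS and gap_duration=2.0, the timeline works out
# as follows (derived from the duration rules above; shown as a sketch):
#
#     sig, times, durations = generate_multi_frequency_bursts()
#     # times -> {125: 0.0, 250: 2.5, 500: 4.8, 1000: 7.0, 2000: 9.1,
#     #           4000: 11.2, 8000: 13.3}
#     # total length 15.4 s (each slot is burst_duration + 2.0 s of gap)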

def generate_pink_noise(duration=2.0, sample_rate=SAMPLE_RATE):
    """Generate approximate pink noise (1/f-ish spectrum) for broadband testing."""
    samples = int(sample_rate * duration)
    # Generate white noise
    white = np.random.randn(samples)
    # Approximate a 1/f tilt by low-pass filtering the white noise
    # (a crude shortcut; not the Voss-McCartney algorithm)
    b, a = signal.butter(1, 0.002, btype='low')
    pink = signal.lfilter(b, a, white)
    # Add some higher frequency content back
    pink = 0.7 * pink + 0.3 * white
    # Normalize
    pink = pink / np.max(np.abs(pink)) * 0.8
    # Fade in/out
    fade_samples = int(0.05 * sample_rate)
    pink[:fade_samples] *= np.linspace(0, 1, fade_samples)
    pink[-fade_samples:] *= np.linspace(1, 0, fade_samples)
    return pink.astype(np.float32)
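
# Note: the blend above only approximates pink noise (true pink noise falls
# off at -3 dB/octave in spectral density). Also, because the generator is
# unseeded, every call returns a different noise realization; if exactly
# repeatable test signals matter for A/B comparisons, one option (an
# assumption, not part of the original design) is to seed the generator:
#
#     rng = np.random.default_rng(0)
#     white = rng.standard_normal(samples)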

# =============================================================================
# Audio I/O
# =============================================================================

def list_devices():
    """List available audio devices."""
    print("\n=== Audio Devices ===\n")
    devices = sd.query_devices()
    for i, dev in enumerate(devices):
        direction = []
        if dev['max_input_channels'] > 0:
            direction.append(f"IN:{dev['max_input_channels']}ch")
        if dev['max_output_channels'] > 0:
            direction.append(f"OUT:{dev['max_output_channels']}ch")
        default_markers = []
        if i == sd.default.device[0]:
            default_markers.append("*DEFAULT INPUT*")
        if i == sd.default.device[1]:
            default_markers.append("*DEFAULT OUTPUT*")
        print(f"[{i}] {dev['name']}")
        print(f"    {', '.join(direction)} @ {int(dev['default_samplerate'])}Hz {' '.join(default_markers)}")
    print("\nTo use specific devices, pass them to the record command:")
    print("  python room_acoustics_test.py record --name NAME --input <id> --output <id>")

def play_and_record(test_signal, sample_rate=SAMPLE_RATE, pre_silence=0.5, post_silence=1.5):
    """
    Play a test signal while simultaneously recording.

    Args:
        test_signal: The signal to play
        sample_rate: Sample rate in Hz
        pre_silence: Silence before playback (seconds)
        post_silence: Silence after playback for capturing reverb tail

    Returns:
        Tuple of (recorded audio, padded playback signal) as numpy arrays
    """
    pre_samples = int(pre_silence * sample_rate)
    post_samples = int(post_silence * sample_rate)
    # Create padded playback signal
    playback = np.concatenate([
        np.zeros(pre_samples, dtype=np.float32),
        test_signal,
        np.zeros(post_samples, dtype=np.float32)
    ])
    total_samples = len(playback)
    print(f"  Recording {total_samples / sample_rate:.1f}s (including {post_silence}s reverb tail)...")
    # Simultaneous play and record; requesting 'low' latency minimizes the
    # buffering (and hence delay) the host adds to the stream
    recording = sd.playrec(
        playback,
        samplerate=sample_rate,
        channels=1,
        dtype=np.float32,
        latency='low'
    )
    sd.wait()
    return recording.flatten(), playback

def extract_impulse_response(recording, original_signal, sample_rate=SAMPLE_RATE):
    """
    Extract the room's impulse response by deconvolving the recording.

    For sweep signals, this gives a clean impulse response. For other
    signals the same regularized deconvolution applies, just with less
    frequency coverage and a noisier result.

    Returns:
        Impulse response as numpy array
    """
    # Ensure same length
    min_len = min(len(recording), len(original_signal))
    recording = recording[:min_len]
    original = original_signal[:min_len]
    # Find the playback delay (cross-correlation peak against the signal start)
    correlation = np.correlate(recording, original[:len(original)//4], mode='valid')
    delay = np.argmax(np.abs(correlation))
    # Align signals
    if delay > 0 and delay < len(recording):
        aligned_recording = recording[delay:]
        aligned_original = original[:len(aligned_recording)]
    else:
        aligned_recording = recording
        aligned_original = original[:len(recording)]
    # Deconvolve via frequency-domain division, with a small regularization
    # term to avoid dividing by (near-)zero bins
    rec_fft = np.fft.rfft(aligned_recording)
    orig_fft = np.fft.rfft(aligned_original)
    # Wiener deconvolution with regularization
    epsilon = 0.01 * np.max(np.abs(orig_fft))
    ir_fft = rec_fft * np.conj(orig_fft) / (np.abs(orig_fft)**2 + epsilon)
    impulse_response = np.fft.irfft(ir_fft)
    # Normalize
    impulse_response = impulse_response / (np.max(np.abs(impulse_response)) + 1e-10)
    return impulse_response.astype(np.float32)
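
# Minimal end-to-end check of the deconvolution (a sketch, synthetic data only;
# the "room" here is a made-up direct path plus one 25 ms reflection):
#
#     x = generate_sweep(duration=0.5)
#     h = np.zeros(2400, dtype=np.float32)
#     h[0], h[1200] = 1.0, 0.5           # direct sound + reflection at 25 ms
#     y = np.convolve(x, h)[:len(x)]     # simulated "recording"
#     ir = extract_impulse_response(y, x)
#     # Expect the main peak of `ir` near sample 0 and a secondary peak
#     # near sample 1200 at roughly half the amplitude.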

# =============================================================================
# Analysis Functions
# =============================================================================

def bandpass_filter(data, lowcut, highcut, sample_rate, order=4):
    """Apply a bandpass filter to the data."""
    nyq = 0.5 * sample_rate
    low = lowcut / nyq
    high = min(highcut / nyq, 0.99)  # Stay below Nyquist
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data)

def calculate_rt60(recording, sample_rate=SAMPLE_RATE):
    """
    Calculate RT60 (reverberation time) for each octave band.

    RT60 is the time for sound to decay by 60dB. We measure RT20
    and extrapolate, as getting a full 60dB decay is often not possible.

    Returns:
        Dictionary mapping frequency bands to RT60 values in seconds
    """
    rt60_values = {}
    for center_freq in OCTAVE_BANDS:
        # Calculate octave band edges
        low = center_freq / np.sqrt(2)
        high = center_freq * np.sqrt(2)
        # Skip if outside valid range
        if high >= sample_rate / 2:
            continue
        # Filter to octave band
        filtered = bandpass_filter(recording, low, high, sample_rate)
        # Calculate envelope using Hilbert transform
        envelope = np.abs(signal.hilbert(filtered))
        # Smooth the envelope
        window_size = int(0.01 * sample_rate)
        envelope = np.convolve(envelope, np.ones(window_size)/window_size, mode='same')
        # Convert to dB
        envelope_db = 20 * np.log10(envelope + 1e-10)
        # Find the peak (start of decay)
        peak_idx = np.argmax(envelope_db)
        decay = envelope_db[peak_idx:]
        if len(decay) < 100:
            rt60_values[center_freq] = 0
            continue
        # Find -5dB and -25dB points for RT20 measurement
        peak_db = decay[0]
        try:
            idx_5db = np.where(decay <= peak_db - 5)[0][0]
            idx_25db = np.where(decay <= peak_db - 25)[0][0]
            # Calculate RT20 and extrapolate to RT60
            rt20 = (idx_25db - idx_5db) / sample_rate
            rt60 = rt20 * 3  # Extrapolate from 20dB to 60dB decay
            rt60_values[center_freq] = round(rt60, 3)
        except (IndexError, ValueError):
            rt60_values[center_freq] = 0
    return rt60_values
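
# Worked example of the RT20 -> RT60 extrapolation above (made-up numbers):
# if the envelope passes -5 dB at sample 4,800 and -25 dB at sample 33,600,
# then RT20 = (33600 - 4800) / 48000 = 0.6 s. Assuming the decay is linear
# in dB (the usual assumption behind RT20/RT30 measurements), the full
# 60 dB decay takes three times as long: RT60 = 3 * 0.6 = 1.8 s.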

def calculate_rt60_from_bursts(recording, frequency_times, burst_durations, gap_duration=2.0, sample_rate=SAMPLE_RATE):
    """
    Calculate RT60 from a tone burst recording with known frequency timing.

    This is more accurate than broadband analysis because each frequency
    has dedicated measurement time with strong signal energy.

    Args:
        recording: The recorded audio
        frequency_times: Dict mapping frequency to start time in seconds
        burst_durations: Dict mapping frequency to burst duration in seconds
        gap_duration: Silence after each burst for decay measurement

    Returns:
        Dictionary mapping frequency bands to RT60 values in seconds
    """
    rt60_values = {}
    for freq, start_time in frequency_times.items():
        burst_duration = burst_durations.get(freq, 0.1)
        # Extract the section for this frequency
        start_sample = int(start_time * sample_rate)
        end_sample = int((start_time + burst_duration + gap_duration) * sample_rate)
        if end_sample > len(recording):
            continue
        section = recording[start_sample:end_sample]
        # Bandpass filter around the burst frequency (narrow band for accuracy)
        low = freq * 0.8
        high = min(freq * 1.2, sample_rate / 2 - 100)
        try:
            filtered = bandpass_filter(section, low, high, sample_rate)
        except ValueError:
            rt60_values[freq] = 0
            continue
        # Calculate envelope
        envelope = np.abs(signal.hilbert(filtered))
        # Smooth - use a longer window for low frequencies
        if freq <= 250:
            window_size = int(0.02 * sample_rate)   # 20ms for low freq
        else:
            window_size = int(0.005 * sample_rate)  # 5ms for high freq
        envelope = np.convolve(envelope, np.ones(window_size)/window_size, mode='same')
        # Convert to dB
        envelope_db = 20 * np.log10(envelope + 1e-10)
        # Find the end of the burst (peak) - should be around burst_duration
        burst_end_sample = int(burst_duration * sample_rate)
        search_start = max(0, burst_end_sample - int(0.05 * sample_rate))
        search_end = min(len(envelope_db), burst_end_sample + int(0.1 * sample_rate))
        peak_idx = search_start + np.argmax(envelope_db[search_start:search_end])
        decay = envelope_db[peak_idx:]
        if len(decay) < 100:
            rt60_values[freq] = 0
            continue
        # Measure RT20 (from -5dB to -25dB) and extrapolate
        peak_db = decay[0]
        try:
            idx_5db = np.where(decay <= peak_db - 5)[0][0]
            idx_25db = np.where(decay <= peak_db - 25)[0][0]
            rt20 = (idx_25db - idx_5db) / sample_rate
            rt60 = rt20 * 3
            # Sanity check: RT60 should be reasonable (0.1s to 3s for most rooms)
            if rt60 > 3.0:
                rt60_values[freq] = 0  # Likely measurement error
            else:
                rt60_values[freq] = round(rt60, 3)
        except (IndexError, ValueError):
            rt60_values[freq] = 0
    return rt60_values

def calculate_frequency_response(recording, original_sweep, sample_rate=SAMPLE_RATE):
    """
    Calculate frequency response by comparing recorded sweep to original.

    Returns:
        Tuple of (frequencies, magnitude_db)
    """
    # Use the portion of recording that contains the sweep response;
    # trim to match lengths, accounting for latency
    min_len = min(len(recording), len(original_sweep))
    # Calculate transfer function using FFT
    rec_fft = np.fft.rfft(recording[:min_len])
    orig_fft = np.fft.rfft(original_sweep[:min_len])
    # Avoid division by zero
    orig_fft[np.abs(orig_fft) < 1e-10] = 1e-10
    # Transfer function
    h = rec_fft / orig_fft
    magnitude_db = 20 * np.log10(np.abs(h) + 1e-10)
    # Frequency axis
    frequencies = np.fft.rfftfreq(min_len, 1/sample_rate)
    # Smooth the response
    window_size = max(1, len(magnitude_db) // 500)
    magnitude_db = np.convolve(magnitude_db, np.ones(window_size)/window_size, mode='same')
    return frequencies, magnitude_db
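
# Typical offline use against a saved session (a sketch; assumes the session
# was recorded with the default sweep parameters, since the sweep is
# regenerated rather than stored alongside the recording):
#
#     recordings, _ = load_session("no_foam")
#     freqs, mag_db = calculate_frequency_response(recordings['sweep'],
#                                                  generate_sweep())
#     # mag_db is relative, not calibrated SPL; compare curves between
#     # sessions rather than reading absolute values.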

def analyze_impulse_response(recording, sample_rate=SAMPLE_RATE):
    """
    Analyze impulse response for early reflections.

    Returns:
        Dictionary with analysis results (times in milliseconds)
    """
    # Find the direct sound (main peak)
    peak_idx = np.argmax(np.abs(recording))
    peak_time = peak_idx / sample_rate
    # Analyze first 100ms for early reflections
    early_window = int(0.1 * sample_rate)
    early_section = recording[peak_idx:peak_idx + early_window]
    # Find significant reflections (peaks above threshold)
    threshold = 0.1 * np.max(np.abs(recording))
    peaks, properties = signal.find_peaks(np.abs(early_section), height=threshold, distance=int(0.005 * sample_rate))
    reflections = []
    for peak in peaks[1:10]:  # Skip direct sound, get up to 9 reflections
        time_ms = (peak / sample_rate) * 1000
        level_db = 20 * np.log10(np.abs(early_section[peak]) / np.abs(early_section[0]) + 1e-10)
        reflections.append({
            'time_ms': round(time_ms, 1),
            'level_db': round(level_db, 1)
        })
    return {
        'direct_sound_time': round(peak_time * 1000, 1),  # milliseconds
        'early_reflections': reflections
    }

def analyze_voice_recording(recording, sample_rate=SAMPLE_RATE):
    """
    Analyze a voice recording to understand how the room affects the user's voice.

    Returns:
        Dictionary with voice analysis including frequency distribution,
        estimated fundamental frequency, and energy in voice-relevant bands.
    """
    # Find voice activity (non-silent portions)
    frame_size = int(0.02 * sample_rate)  # 20ms frames
    hop_size = frame_size // 2
    frames = []
    for i in range(0, len(recording) - frame_size, hop_size):
        frame = recording[i:i + frame_size]
        rms = np.sqrt(np.mean(frame ** 2))
        frames.append(rms)
    frames = np.array(frames)
    threshold = np.max(frames) * 0.1
    voice_frames = np.where(frames > threshold)[0]
    if len(voice_frames) == 0:
        return {'error': 'No voice detected in recording'}
    # Extract voiced portion
    start_frame = voice_frames[0]
    end_frame = voice_frames[-1]
    start_sample = start_frame * hop_size
    end_sample = min(end_frame * hop_size + frame_size, len(recording))
    voiced_section = recording[start_sample:end_sample]
    # Analyze frequency content
    fft = np.fft.rfft(voiced_section)
    freqs = np.fft.rfftfreq(len(voiced_section), 1/sample_rate)
    magnitude = np.abs(fft)
    # Find fundamental frequency (F0) - search 80-400Hz to cover most voices
    f0_range = (freqs >= 80) & (freqs <= 400)
    if np.any(f0_range):
        f0_freqs = freqs[f0_range]
        f0_mags = magnitude[f0_range]
        peaks, _ = signal.find_peaks(f0_mags, height=np.max(f0_mags) * 0.3)
        if len(peaks) > 0:
            f0_estimate = f0_freqs[peaks[0]]
        else:
            f0_estimate = f0_freqs[np.argmax(f0_mags)]
    else:
        f0_estimate = None
    # Energy in voice-relevant frequency bands
    voice_bands = {
        'low_voice': (80, 250),      # Chest resonance, male fundamental
        'mid_voice': (250, 2000),    # Main speech intelligibility
        'presence': (2000, 4000),    # Clarity, consonants
        'brilliance': (4000, 8000),  # Air, sibilance
    }
    band_energy = {}
    for band_name, (low, high) in voice_bands.items():
        mask = (freqs >= low) & (freqs <= high)
        if np.any(mask):
            energy = np.sum(magnitude[mask] ** 2)
            energy_db = 10 * np.log10(energy + 1e-10)
            band_energy[band_name] = round(energy_db, 1)
    # Calculate crest factor (peak to RMS ratio) - indicates dynamic range
    peak = np.max(np.abs(voiced_section))
    rms = np.sqrt(np.mean(voiced_section ** 2))
    crest_factor_db = 20 * np.log10(peak / (rms + 1e-10))
    # Voice activity duration
    voice_duration = len(voiced_section) / sample_rate
    return {
        'fundamental_frequency_hz': round(f0_estimate, 1) if f0_estimate else None,
        'voice_duration_seconds': round(voice_duration, 2),
        'energy_by_band_db': band_energy,
        'crest_factor_db': round(crest_factor_db, 1),
        'peak_level_db': round(20 * np.log10(peak + 1e-10), 1),
        'rms_level_db': round(20 * np.log10(rms + 1e-10), 1)
    }
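
# Reference points for the crest factor above: a pure sine wave has a
# peak/RMS ratio of sqrt(2), i.e. 20*log10(sqrt(2)) ≈ 3 dB, while normal
# speech typically lands around 12-18 dB. A quick synthetic check (a sketch):
#
#     t = np.linspace(0, 1, SAMPLE_RATE)
#     sine = np.sin(2 * np.pi * 220 * t)
#     # peak = 1.0, rms ≈ 0.707  ->  crest ≈ 3.0 dB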

def analyze_ambient_noise(recording, sample_rate=SAMPLE_RATE):
    """
    Analyze ambient noise recording for noise floor characteristics.

    Returns:
        Dictionary with noise analysis including:
        - Overall noise level (relative dB, uncalibrated)
        - Per-octave-band noise levels
        - Estimated NC (Noise Criteria) rating
    """
    # Calculate overall RMS level
    rms = np.sqrt(np.mean(recording ** 2))
    overall_db = 20 * np.log10(rms + 1e-10)
    # Analyze noise per octave band
    noise_by_band = {}
    for freq in OCTAVE_BANDS:
        low = freq / np.sqrt(2)
        high = min(freq * np.sqrt(2), sample_rate / 2 - 100)
        try:
            filtered = bandpass_filter(recording, low, high, sample_rate)
            band_rms = np.sqrt(np.mean(filtered ** 2))
            band_db = 20 * np.log10(band_rms + 1e-10)
            noise_by_band[freq] = round(band_db, 1)
        except ValueError:
            noise_by_band[freq] = None
    # Estimate NC rating (simplified - based on mid-frequency noise).
    # NC curves are defined at specific frequencies; we approximate
    mid_freq_noise = []
    for freq in [500, 1000, 2000]:
        if freq in noise_by_band and noise_by_band[freq] is not None:
            mid_freq_noise.append(noise_by_band[freq])
    if mid_freq_noise:
        # NC rating roughly corresponds to noise level at 1kHz.
        # Typical: NC-25 is very quiet, NC-40 is office, NC-50+ is loud
        avg_mid = np.mean(mid_freq_noise)
        # Map dB to NC (very approximate, assumes typical mic sensitivity).
        # In practice this needs calibration, but relative comparison is useful
        nc_estimate = int(avg_mid + 90)  # Rough offset for typical mic sensitivity
        nc_estimate = max(15, min(70, nc_estimate))  # Clamp to reasonable range
    else:
        nc_estimate = None
    # Detect dominant frequencies (potential noise sources)
    fft = np.fft.rfft(recording)
    freqs = np.fft.rfftfreq(len(recording), 1/sample_rate)
    magnitude = np.abs(fft)
    # Find peaks in spectrum
    peaks, _ = signal.find_peaks(magnitude, height=np.max(magnitude) * 0.1, distance=10)
    dominant_freqs = []
    for peak in peaks[:5]:  # Top 5 peaks
        if freqs[peak] > 20 and freqs[peak] < 10000:
            dominant_freqs.append({
                'frequency_hz': round(freqs[peak], 1),
                'relative_level_db': round(20 * np.log10(magnitude[peak] / np.max(magnitude) + 1e-10), 1)
            })
    return {
        'overall_level_db': round(overall_db, 1),
        'noise_by_octave_band_db': noise_by_band,
        'nc_rating_estimate': nc_estimate,
        'dominant_frequencies': dominant_freqs,
        'note': 'Levels are relative (uncalibrated). Compare between sessions for meaningful differences.'
    }

# =============================================================================
# Session Management
# =============================================================================

def get_session_dir(name):
    """Get the directory path for a session."""
    return SESSIONS_DIR / name

def save_session(name, recordings, analysis, impulse_responses=None, sample_rate=SAMPLE_RATE,
                 input_device=None, output_device=None):
    """Save session recordings and analysis to disk."""
    session_dir = get_session_dir(name)
    session_dir.mkdir(parents=True, exist_ok=True)
    # Save each recording as WAV
    for test_type, recording in recordings.items():
        wav_path = session_dir / f"recording_{test_type}.wav"
        # Clip to [-1, 1] and convert to 16-bit integer for WAV
        recording_int = (np.clip(recording, -1.0, 1.0) * 32767).astype(np.int16)
        wavfile.write(wav_path, sample_rate, recording_int)
        print(f"  Saved: {wav_path}")
    # Save impulse responses (room response isolated from input)
    if impulse_responses:
        for test_type, ir in impulse_responses.items():
            ir_path = session_dir / f"impulse_response_{test_type}.wav"
            ir_int = (np.clip(ir, -1.0, 1.0) * 32767).astype(np.int16)
            wavfile.write(ir_path, sample_rate, ir_int)
            print(f"  Saved: {ir_path} (room impulse response)")
    # Build professional analysis JSON with units and method info
    professional_analysis = {
        'measurement_info': {
            'description': 'Room acoustics analysis for acoustic treatment evaluation',
            'method': 'Tone burst decay analysis with RT20 extrapolation to RT60',
            'test_signals': list(recordings.keys()),
            'standards_reference': 'Based on ISO 3382-2 (room acoustic parameters)'
        },
        'rt60': {
            'description': 'Reverberation Time (RT60) - time for sound to decay by 60dB',
            'unit': 'seconds',
            'method': 'Measured via RT20 (20dB decay) and extrapolated to 60dB',
            'values_by_frequency_hz': analysis.get('rt60', {})
        },
        'impulse_response': {},
        'metadata': {
            'session_name': name,
            'timestamp': datetime.now().isoformat(),
            'sample_rate_hz': sample_rate,
            'equipment': {
                'input_device': input_device or 'not specified',
                'output_device': output_device or 'not specified'
            }
        }
    }
    # Add impulse response analysis if present
    if 'impulse_response' in analysis:
        ir_data = analysis['impulse_response']
        professional_analysis['impulse_response'] = {
            'direct_sound': {
                'description': 'Time from test signal start to direct sound arrival',
                # analyze_impulse_response already reports this in milliseconds
                'value': ir_data.get('direct_sound_time'),
                'unit': 'milliseconds'
            },
            'early_reflections': {
                'description': 'Discrete reflections within first 100ms after direct sound',
                'note': 'Strong reflections (above -20dB) may cause comb filtering and coloration',
                'values': [
                    {
                        'time_after_direct_ms': ref.get('time_ms'),
                        'relative_level_db': ref.get('level_db'),
                        'unit_time': 'milliseconds',
                        'unit_level': 'dB relative to direct sound'
                    }
                    for ref in ir_data.get('early_reflections', [])
                ]
            }
        }
    # Add ambient noise analysis if present
    if 'ambient_noise' in analysis:
        ambient = analysis['ambient_noise']
        professional_analysis['ambient_noise'] = {
            'description': 'Background noise floor measurement (no test signal played)',
            'method': 'RMS level analysis per octave band',
            'note': ambient.get('note', 'Levels are relative unless microphone is calibrated'),
            'overall_level': {
                'value': ambient.get('overall_level_db'),
                'unit': 'dB (relative, uncalibrated)'
            },
            'nc_rating': {
                'description': 'Noise Criteria rating estimate (lower is quieter)',
                'value': ambient.get('nc_rating_estimate'),
                'reference': 'NC-25: quiet studio, NC-35: private office, NC-45: open office'
            },
            'levels_by_octave_band': {
                'description': 'Noise level per frequency band',
                'unit': 'dB (relative)',
                'values_by_frequency_hz': ambient.get('noise_by_octave_band_db', {})
            },
            'dominant_frequencies': {
                'description': 'Prominent noise sources detected in spectrum',
                'values': ambient.get('dominant_frequencies', [])
            }
        }
    # Add voice analysis if present
    if 'voice' in analysis:
        voice = analysis['voice']
        professional_analysis['voice_test'] = {
            'description': 'Analysis of user voice recording to assess room impact on vocals',
            'method': 'Spectral analysis of voiced segments',
            'fundamental_frequency': {
                'description': 'Estimated pitch of voice (F0)',
                'value': voice.get('fundamental_frequency_hz'),
                'unit': 'Hz'
            },
            'voice_duration': {
                'value': voice.get('voice_duration_seconds'),
                'unit': 'seconds'
            },
            'energy_distribution': {
                'description': 'Energy in voice-relevant frequency bands',
                'unit': 'dB (relative)',
                'bands': {
                    'low_voice_80_250hz': voice.get('energy_by_band_db', {}).get('low_voice'),
                    'mid_voice_250_2000hz': voice.get('energy_by_band_db', {}).get('mid_voice'),
                    'presence_2000_4000hz': voice.get('energy_by_band_db', {}).get('presence'),
                    'brilliance_4000_8000hz': voice.get('energy_by_band_db', {}).get('brilliance')
                },
                'note': 'Compare between sessions - if presence/brilliance drops relative to mid_voice, room may be absorbing highs'
            },
            'dynamics': {
                'crest_factor_db': voice.get('crest_factor_db'),
                'peak_level_db': voice.get('peak_level_db'),
                'rms_level_db': voice.get('rms_level_db'),
                'note': 'Crest factor indicates dynamic range (typical speech: 12-18dB)'
            }
        }
    json_path = session_dir / "analysis.json"
    with open(json_path, 'w') as f:
        json.dump(professional_analysis, f, indent=2)
    print(f"  Saved: {json_path}")

def load_session(name):
    """Load a saved session."""
    session_dir = get_session_dir(name)
    if not session_dir.exists():
        raise FileNotFoundError(f"Session '{name}' not found")
    # Load analysis
    json_path = session_dir / "analysis.json"
    with open(json_path, 'r') as f:
        analysis = json.load(f)
    # The saved JSON nests RT60 under 'values_by_frequency_hz'; flatten it
    # back to {frequency: seconds} so plotting and comparison code can use it
    rt60 = analysis.get('rt60', {})
    if isinstance(rt60, dict) and 'values_by_frequency_hz' in rt60:
        analysis['rt60'] = rt60['values_by_frequency_hz']
    # Load recordings
    recordings = {}
    for test_type in TEST_TYPES:
        wav_path = session_dir / f"recording_{test_type}.wav"
        if wav_path.exists():
            rate, data = wavfile.read(wav_path)
            recordings[test_type] = data.astype(np.float32) / 32767
    return recordings, analysis

def list_sessions():
    """List all saved sessions."""
    if not SESSIONS_DIR.exists():
        print("\nNo sessions found.")
        return
    # Exclude the "comparisons" output directory created by cmd_compare
    sessions = [d.name for d in SESSIONS_DIR.iterdir()
                if d.is_dir() and d.name != "comparisons"]
    if not sessions:
        print("\nNo sessions found.")
        return
    print("\n=== Saved Sessions ===\n")
    for name in sorted(sessions):
        json_path = SESSIONS_DIR / name / "analysis.json"
        if json_path.exists():
            with open(json_path, 'r') as f:
                analysis = json.load(f)
            timestamp = analysis.get('metadata', {}).get('timestamp', 'Unknown')
            print(f"  {name}")
            print(f"    Recorded: {timestamp}")
            # RT60 values are nested in the saved JSON; unwrap before averaging
            rt60 = analysis.get('rt60', {})
            if isinstance(rt60, dict) and 'values_by_frequency_hz' in rt60:
                rt60 = rt60['values_by_frequency_hz']
            values = [v for v in rt60.values() if isinstance(v, (int, float)) and v > 0]
            if values:
                print(f"    Avg RT60: {np.mean(values):.2f}s")
        else:
            print(f"  {name} (no analysis)")
    print()

# =============================================================================
# Visualization
# =============================================================================

def plot_rt60_comparison(sessions_data, save_path=None):
    """Plot RT60 comparison across sessions."""
    fig, ax = plt.subplots(figsize=(12, 6))
    n_sessions = len(sessions_data)
    bar_width = 0.8 / n_sessions
    x = np.arange(len(OCTAVE_BANDS))
    colors = plt.cm.Set2(np.linspace(0, 1, n_sessions))
    for i, (name, analysis) in enumerate(sessions_data.items()):
        rt60 = analysis.get('rt60', {})
        # JSON keys are strings, in-memory keys are ints - check both
        values = [rt60.get(str(f), rt60.get(f, 0)) for f in OCTAVE_BANDS]
        offset = (i - n_sessions/2 + 0.5) * bar_width
        ax.bar(x + offset, values, bar_width, label=name, color=colors[i])
    ax.set_xlabel('Frequency (Hz)')
    ax.set_ylabel('RT60 (seconds)')
    ax.set_title('Reverberation Time (RT60) by Frequency Band')
    ax.set_xticks(x)
    ax.set_xticklabels([str(f) for f in OCTAVE_BANDS])
    ax.legend()
    ax.grid(axis='y', alpha=0.3)
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=150)
        print(f"  Saved: {save_path}")
    return fig

def plot_frequency_response_comparison(sessions_data, save_path=None):
    """Plot frequency response comparison across sessions."""
    fig, ax = plt.subplots(figsize=(12, 6))
    colors = plt.cm.Set2(np.linspace(0, 1, len(sessions_data)))
    for i, (name, (recordings, analysis)) in enumerate(sessions_data.items()):
        if 'sweep' in recordings:
            # Regenerate sweep for comparison
            sweep = generate_sweep()
            freqs, magnitude = calculate_frequency_response(recordings['sweep'], sweep)
            # Only plot audible range
            mask = (freqs >= 20) & (freqs <= 20000)
            ax.semilogx(freqs[mask], magnitude[mask], label=name, color=colors[i], alpha=0.8)
    ax.set_xlabel('Frequency (Hz)')
    ax.set_ylabel('Magnitude (dB)')
    ax.set_title('Room Frequency Response')
    ax.set_xlim(20, 20000)
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=150)
        print(f"  Saved: {save_path}")
    return fig

def plot_impulse_response(recording, name, sample_rate=SAMPLE_RATE, save_path=None):
    """Plot impulse response waveform."""
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    # Time axis
    t = np.arange(len(recording)) / sample_rate * 1000  # ms
    # Full response
    ax1.plot(t, recording, color='steelblue', linewidth=0.5)
    ax1.set_xlabel('Time (ms)')
    ax1.set_ylabel('Amplitude')
    ax1.set_title(f'Impulse Response - {name}')
    ax1.grid(True, alpha=0.3)
    # Early reflections (first 100ms)
    mask = t <= 100
    ax2.plot(t[mask], recording[mask], color='steelblue', linewidth=0.8)
    ax2.set_xlabel('Time (ms)')
    ax2.set_ylabel('Amplitude')
    ax2.set_title(f'Early Reflections (first 100ms) - {name}')
    ax2.grid(True, alpha=0.3)
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=150)
        print(f"  Saved: {save_path}")
    return fig

def plot_session_summary(name, recordings, analysis, save_path=None):
    """Plot a summary of a single session."""
    fig = plt.figure(figsize=(14, 10))
    # RT60 bar chart
    ax1 = fig.add_subplot(2, 2, 1)
    rt60 = analysis.get('rt60', {})
    freqs = [str(f) for f in OCTAVE_BANDS if str(f) in rt60 or f in rt60]
    values = [rt60.get(str(f), rt60.get(f, 0)) for f in OCTAVE_BANDS if str(f) in rt60 or f in rt60]
    ax1.bar(freqs, values, color='steelblue')
    ax1.set_xlabel('Frequency (Hz)')
    ax1.set_ylabel('RT60 (seconds)')
    ax1.set_title('Reverberation Time (RT60)')
    ax1.grid(axis='y', alpha=0.3)
    # Frequency response (if sweep available)
    ax2 = fig.add_subplot(2, 2, 2)
    if 'sweep' in recordings:
        sweep = generate_sweep()
        freqs_resp, magnitude = calculate_frequency_response(recordings['sweep'], sweep)
        mask = (freqs_resp >= 20) & (freqs_resp <= 20000)
        ax2.semilogx(freqs_resp[mask], magnitude[mask], color='steelblue')
        ax2.set_xlabel('Frequency (Hz)')
        ax2.set_ylabel('Magnitude (dB)')
        ax2.set_title('Frequency Response')
        ax2.set_xlim(20, 20000)
        ax2.grid(True, alpha=0.3)
    # Impulse response (if available)
    ax3 = fig.add_subplot(2, 2, 3)
    if 'impulse' in recordings:
        t = np.arange(len(recordings['impulse'])) / SAMPLE_RATE * 1000
        ax3.plot(t, recordings['impulse'], color='steelblue', linewidth=0.5)
        ax3.set_xlabel('Time (ms)')
        ax3.set_ylabel('Amplitude')
        ax3.set_title('Impulse Response')
        ax3.set_xlim(0, min(500, t[-1]))
        ax3.grid(True, alpha=0.3)
    # Info text
    ax4 = fig.add_subplot(2, 2, 4)
    ax4.axis('off')
    info_text = f"Session: {name}\n\n"
    if 'metadata' in analysis:
        info_text += f"Recorded: {analysis['metadata'].get('timestamp', 'N/A')}\n"
        info_text += f"Sample Rate: {analysis['metadata'].get('sample_rate_hz', SAMPLE_RATE)} Hz\n\n"
    if rt60:
        avg_rt60 = np.mean([v for v in rt60.values() if isinstance(v, (int, float)) and v > 0])
        info_text += f"Average RT60: {avg_rt60:.3f}s\n\n"
        info_text += "RT60 by band:\n"
        for freq in OCTAVE_BANDS:
            val = rt60.get(str(freq), rt60.get(freq, 0))
            if val > 0:
                info_text += f"  {freq:>5} Hz: {val:.3f}s\n"
    ax4.text(0.1, 0.9, info_text, transform=ax4.transAxes, fontsize=10,
             verticalalignment='top', fontfamily='monospace')
    plt.suptitle(f'Room Acoustics Analysis - {name}', fontsize=14, fontweight='bold')
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=150)
        print(f"  Saved: {save_path}")
    return fig

# =============================================================================
# CLI Commands
# =============================================================================

def cmd_record(args):
    """Run acoustic tests and save session."""
    name = args.name
    test_types = [args.type] if args.type else TEST_TYPES
    # Set audio devices if specified
    if args.input is not None:
        sd.default.device[0] = args.input
    if args.output is not None:
        sd.default.device[1] = args.output
    print(f"\n=== Recording Session: {name} ===\n")
    print(f"Input device: {sd.query_devices(sd.default.device[0])['name']}")
    print(f"Output device: {sd.query_devices(sd.default.device[1])['name']}")
    # Tip for raw recording
    input_dev_name = sd.query_devices(sd.default.device[0])['name']
    if 'pulse' in input_dev_name.lower() or 'default' in input_dev_name.lower():
        print("\n  TIP: Using PulseAudio, which may apply echo cancellation.")
        print("  For raw recording, use a hw: device (e.g., --input 9 for RØDE NT1)")
    # Check for existing session
    if get_session_dir(name).exists() and not args.force:
        print(f"Session '{name}' already exists. Use --force to overwrite.")
        return 1
    recordings = {}
    impulse_responses = {}
    analysis = {'rt60': {}}
    for test_type in test_types:
        print(f"\n[{test_type.upper()}]")
        # Handle ambient noise recording separately (no playback)
        if test_type == 'ambient':
            print("  Recording ambient noise (5 seconds of silence)...")
            print("  Please remain quiet...")
            sd.sleep(1000)  # Give the user a moment
            # Record 5 seconds of ambient noise
            ambient_duration = 5.0
            recording = sd.rec(
                int(ambient_duration * SAMPLE_RATE),
                samplerate=SAMPLE_RATE,
                channels=1,
                dtype=np.float32
            )
            sd.wait()
            recording = recording.flatten()
            recordings[test_type] = recording
            # Analyze ambient noise
            print("  Analyzing noise floor...")
            ambient_analysis = analyze_ambient_noise(recording)
            analysis['ambient_noise'] = ambient_analysis
            # Print summary
            print(f"  Overall level: {ambient_analysis['overall_level_db']:.1f} dB (relative)")
            if ambient_analysis['nc_rating_estimate']:
                print(f"  Estimated NC rating: NC-{ambient_analysis['nc_rating_estimate']}")
            if ambient_analysis['dominant_frequencies']:
                print(f"  Dominant frequencies: {', '.join(str(int(f['frequency_hz'])) + 'Hz' for f in ambient_analysis['dominant_frequencies'][:3])}")
            continue  # Skip the rest of the loop for ambient
        # Handle voice recording separately (user sings/speaks)
        if test_type == 'voice':
            print("  Voice test - Record yourself singing or speaking")
            print("")
            print("  Suggested text to read aloud:")
            print("  ┌─────────────────────────────────────────────────────┐")
            print("  │ \"The quick brown fox jumps over the lazy dog.       │")
            print("  │  She sells seashells by the seashore.               │")
            print("  │  Peter Piper picked a peck of pickled peppers.\"     │")
            print("  └─────────────────────────────────────────────────────┘")
            print("")
            print("  Or sing a few bars of your favorite song!")
            print("")
            print("  Recording starts in...")
            for i in [3, 2, 1]:
                print(f"  {i}...", flush=True)
                sd.sleep(1000)
            print("  GO! (recording for 8 seconds)")
            # Record 8 seconds of voice
            voice_duration = 8.0
            recording = sd.rec(
                int(voice_duration * SAMPLE_RATE),
                samplerate=SAMPLE_RATE,
                channels=1,
                dtype=np.float32
            )
            sd.wait()
            recording = recording.flatten()
            recordings[test_type] = recording
            print("  Recording complete!")
            # Analyze voice
            print("  Analyzing voice recording...")
            voice_analysis = analyze_voice_recording(recording)
            analysis['voice'] = voice_analysis
            # Print summary
            if 'error' not in voice_analysis:
                print(f"  Detected voice duration: {voice_analysis['voice_duration_seconds']:.1f}s")
                if voice_analysis['fundamental_frequency_hz']:
                    print(f"  Estimated pitch (F0): {voice_analysis['fundamental_frequency_hz']:.0f} Hz")
                print(f"  Peak level: {voice_analysis['peak_level_db']:.1f} dB")
                print(f"  Dynamic range (crest): {voice_analysis['crest_factor_db']:.1f} dB")
            else:
                print(f"  Warning: {voice_analysis['error']}")
            continue  # Skip the rest of the loop for voice
        # Generate test signal
        frequency_times = None  # For bursts test
        burst_durations = None
        if test_type == 'sweep':
            print("  Generating sine sweep (20Hz - 20kHz)...")
            test_signal = generate_sweep()
        elif test_type == 'impulse':
            print("  Generating impulse signal...")
            test_signal = generate_impulse()
        elif test_type == 'noise':
            print("  Generating pink noise...")
            test_signal = generate_pink_noise()
        elif test_type == 'bursts':
            print(f"  Generating tone bursts at {len(OCTAVE_BANDS)} frequencies...")
            print(f"  Frequencies: {', '.join(str(f) + 'Hz' for f in OCTAVE_BANDS)}")
            print("  (longer bursts for low frequencies)")
            test_signal, frequency_times, burst_durations = generate_multi_frequency_bursts()
        # Play and record
        print("  Playing test signal and recording room response...")
        recording, playback = play_and_record(test_signal)
        recordings[test_type] = recording
        # Extract impulse response (what the room added)
        print("  Extracting room impulse response...")
        ir = extract_impulse_response(recording, playback)
        impulse_responses[test_type] = ir
        # Analyze
        print("  Analyzing...")
        if test_type in ['impulse', 'sweep']:
            rt60 = calculate_rt60(recording)
            # Merge RT60 values (impulse runs after sweep, so its values win)
            for freq, val in rt60.items():
                if val > 0:
                    analysis['rt60'][freq] = val
        if test_type == 'bursts' and frequency_times and burst_durations:
            print("  Calculating RT60 from tone bursts (more accurate per-band)...")
            rt60 = calculate_rt60_from_bursts(recording, frequency_times, burst_durations)
            # Bursts are most accurate, so they take priority
            for freq, val in rt60.items():
                if val > 0:
                    analysis['rt60'][freq] = val
        if test_type == 'impulse':
            ir_analysis = analyze_impulse_response(recording)
            analysis['impulse_response'] = ir_analysis
    # Save session with device info
    print(f"\nSaving session '{name}'...")
    input_dev_name = sd.query_devices(sd.default.device[0])['name']
    output_dev_name = sd.query_devices(sd.default.device[1])['name']
    save_session(name, recordings, analysis, impulse_responses,
                 input_device=input_dev_name, output_device=output_dev_name)
    # Show summary plot
    print("\nGenerating analysis plots...")
    fig = plot_session_summary(name, recordings, analysis,
                               save_path=get_session_dir(name) / "summary.png")
    plt.show()
    print(f"\n=== Session '{name}' complete ===")
    return 0

def cmd_compare(args):
    """Compare multiple sessions."""
    session_names = [s.strip() for s in args.sessions.split(',')]
    if len(session_names) < 2:
        print("Please specify at least 2 sessions to compare (comma-separated)")
        return 1
    print(f"\n=== Comparing Sessions: {', '.join(session_names)} ===\n")
    sessions_data = {}
    sessions_full = {}
    for name in session_names:
        try:
            recordings, analysis = load_session(name)
            sessions_data[name] = analysis
            sessions_full[name] = (recordings, analysis)
            print(f"  Loaded: {name}")
        except FileNotFoundError:
            print(f"  Warning: Session '{name}' not found, skipping")
    if len(sessions_data) < 2:
        print("\nNot enough sessions to compare")
        return 1
    # Create comparison directory
    compare_dir = SESSIONS_DIR / "comparisons"
    compare_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    print("\nGenerating comparison plots...")
    # RT60 comparison
    fig1 = plot_rt60_comparison(sessions_data,
                                save_path=compare_dir / f"rt60_compare_{timestamp}.png")
    # Frequency response comparison
    fig2 = plot_frequency_response_comparison(sessions_full,
                                              save_path=compare_dir / f"freq_compare_{timestamp}.png")
    # Print summary table
    print("\n=== RT60 Comparison Summary ===\n")
    print(f"{'Frequency':<12}", end='')
    for name in sessions_data:
        print(f"{name:<15}", end='')
    print()
    print("-" * (12 + 15 * len(sessions_data)))
    for freq in OCTAVE_BANDS:
        print(f"{freq:<12}", end='')
        for name, analysis in sessions_data.items():
            rt60 = analysis.get('rt60', {})
            val = rt60.get(str(freq), rt60.get(freq, 0))
            print(f"{val:<15.3f}" if val > 0 else f"{'N/A':<15}", end='')
        print()
    plt.show()
    return 0

def cmd_list(args):
    """List saved sessions."""
    list_sessions()
    return 0


def cmd_devices(args):
    """List audio devices."""
    list_devices()
    return 0

def generate_spoken_number(number, sample_rate=SAMPLE_RATE):
    """Generate a distinct tone pattern for each number (1-9)."""
    duration = 0.8
    t = np.linspace(0, duration, int(sample_rate * duration), dtype=np.float32)
    # Each number gets a unique base frequency
    base_freq = 300 + (number * 100)  # 400Hz for 1, 500Hz for 2, etc.
    # Create a pattern: number of beeps = the number
    samples_per_beep = len(t) // (number * 2)
    tone = np.zeros_like(t)
    for i in range(number):
        start = i * samples_per_beep * 2
        end = start + samples_per_beep
        if end <= len(t):
            beep_t = np.linspace(0, samples_per_beep / sample_rate, samples_per_beep)
            beep = np.sin(2 * np.pi * base_freq * beep_t)
            # Half-sine envelope to avoid clicks
            env = np.sin(np.linspace(0, np.pi, samples_per_beep))
            tone[start:end] = beep * env
    return (tone * 0.7).astype(np.float32)

def detect_silence(audio, threshold_db=-40):
    """Check if audio is mostly silence."""
    rms = np.sqrt(np.mean(audio ** 2))
    db = 20 * np.log10(rms + 1e-10)
    return db < threshold_db
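
# Examples of the threshold behaviour (a sketch):
#
#     detect_silence(np.zeros(48000))                 # True  (~ -200 dB floor)
#     detect_silence(0.1 * np.random.randn(48000))    # False (RMS 0.1 -> -20 dB)
#     detect_silence(0.005 * np.random.randn(48000))  # True  (-46 dB < -40 dB)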

def cmd_test_devices(args):
    """Interactive device testing - identify speakers and microphones."""
    devices = sd.query_devices()
    # Get output devices
    output_devices = [(i, d) for i, d in enumerate(devices) if d['max_output_channels'] > 0]
    # Get input devices
    input_devices = [(i, d) for i, d in enumerate(devices) if d['max_input_channels'] > 0]
    print("\n" + "=" * 60)
    print("  SPEAKER TEST")
    print("=" * 60)
    print("\nI'll play a numbered beep pattern on each speaker.")
    print("Listen and note which speaker plays which number.\n")
    speaker_names = {}
    for idx, (dev_id, dev) in enumerate(output_devices):
        # Use device ID as the number of beeps (cap at 9 for sanity)
        beep_count = min(dev_id, 9) if dev_id > 0 else 1
        print(f"  [{dev_id}] {dev['name']}")
        print(f"      Playing {beep_count} beep(s)...", end=' ', flush=True)
        try:
            tone = generate_spoken_number(beep_count)
            sd.play(tone, samplerate=SAMPLE_RATE, device=dev_id)
            sd.wait()
            print("done")
            speaker_names[dev_id] = (beep_count, dev['name'])
        except Exception as e:
            print(f"FAILED ({e})")
        # Small pause between devices
        sd.sleep(300)
    # Let user select default output
    print("\n" + "-" * 40)
    while True:
        choice = input("\nEnter the device ID for your preferred speaker (or 'skip'): ").strip()
        if choice.lower() == 'skip':
            selected_output = None
            break
        try:
            selected_output = int(choice)
            if selected_output in [d[0] for d in output_devices]:
                print(f"  Selected: {devices[selected_output]['name']}")
                break
            else:
                print("  Invalid device ID. Try again.")
        except ValueError:
            print("  Please enter a number or 'skip'.")
    print("\n" + "=" * 60)
    print("  MICROPHONE TEST")
    print("=" * 60)
    print("\nI'll record from each microphone for 2 seconds.")
    print("Say the device number shown when prompted, then I'll play it back.")
    print("\nGet ready! Recording starts in...")
    # Single countdown before all mic tests
    for i in [3, 2, 1]:
        print(f"  {i}...", flush=True)
        sd.sleep(1000)
    print("  GO! Respond quickly to each prompt!\n")
    # Use selected output or default for playback
    playback_device = selected_output if selected_output is not None else sd.default.device[1]
    mic_results = {}
    for idx, (dev_id, dev) in enumerate(input_devices):
        print(f"  [{dev_id}] {dev['name']}")
        print(f"      Say '{dev_id}' NOW! ", end='', flush=True)
        try:
            # Use device's native sample rate to avoid errors
            dev_sr = int(dev['default_samplerate'])
            # Record for 2 seconds
            recording = sd.rec(
                int(2 * dev_sr),
                samplerate=dev_sr,
                channels=1,
                device=dev_id,
                dtype=np.float32
            )
            sd.wait()
            recording = recording.flatten()
            is_silent = detect_silence(recording)
            if is_silent:
                print("(silence detected)")
                mic_results[dev_id] = ('silent', dev['name'])
            else:
                print("playing back... ", end='', flush=True)
                # Normalize and play back at device's sample rate
                recording = recording / (np.max(np.abs(recording)) + 1e-10) * 0.8
                sd.play(recording, samplerate=dev_sr, device=playback_device)
                sd.wait()
                print("done")
                mic_results[dev_id] = ('audio', dev['name'])
        except Exception as e:
            print(f"FAILED ({e})")
            mic_results[dev_id] = ('error', dev['name'])
        sd.sleep(500)
    # Let user select default input
    print("\n" + "-" * 40)
    working_mics = [dev_id for dev_id, (status, _) in mic_results.items() if status == 'audio']
    selected_input = None  # Stays None if no working mic is found or user skips
    if working_mics:
        print("\nMicrophones with audio detected:")
        for dev_id in working_mics:
            print(f"  [{dev_id}] {mic_results[dev_id][1]}")
        while True:
            choice = input("\nEnter the device ID for your preferred microphone (or 'skip'): ").strip()
            if choice.lower() == 'skip':
                selected_input = None
                break
            try:
                selected_input = int(choice)
                if selected_input in [d[0] for d in input_devices]:
                    print(f"  Selected: {devices[selected_input]['name']}")
                    break
                else:
                    print("  Invalid device ID. Try again.")
            except ValueError:
                print("  Please enter a number or 'skip'.")
    # Summary
    print("\n" + "=" * 60)
    print("  SUMMARY")
    print("=" * 60)
    if selected_output is not None:
        print(f"\n  Output: [{selected_output}] {devices[selected_output]['name']}")
    else:
        print("\n  Output: (not selected, using system default)")
    if selected_input is not None:
        print(f"  Input:  [{selected_input}] {devices[selected_input]['name']}")
    else:
        print("  Input:  (not selected, using system default)")
    print("\n  Use these with the record command:")
    cmd_parts = ["python room_acoustics_test.py record --name SESSION_NAME"]
    if selected_input is not None:
        cmd_parts.append(f"--input {selected_input}")
    if selected_output is not None:
        cmd_parts.append(f"--output {selected_output}")
    print(f"  {' '.join(cmd_parts)}")
    print()
    return 0

# =============================================================================
# Main
# =============================================================================

def main():
    parser = argparse.ArgumentParser(
        description='Room Acoustics Testing Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    subparsers = parser.add_subparsers(dest='command', help='Commands')
    # devices command
    parser_devices = subparsers.add_parser('devices', help='List audio devices')
    parser_devices.set_defaults(func=cmd_devices)
    # test-devices command
    parser_test = subparsers.add_parser('test-devices',
                                        help='Interactive speaker/microphone identification')
    parser_test.set_defaults(func=cmd_test_devices)
    # record command
    parser_record = subparsers.add_parser('record', help='Record a test session')
    parser_record.add_argument('--name', '-n', required=True, help='Session name')
    parser_record.add_argument('--type', '-t', choices=TEST_TYPES,
                               help='Test type (default: all)')
    parser_record.add_argument('--input', '-i', type=int, metavar='ID',
                               help='Input device ID (from "devices" command)')
    parser_record.add_argument('--output', '-o', type=int, metavar='ID',
                               help='Output device ID (from "devices" command)')
    parser_record.add_argument('--force', '-f', action='store_true',
                               help='Overwrite existing session')
    parser_record.set_defaults(func=cmd_record)
    # compare command
    parser_compare = subparsers.add_parser('compare', help='Compare sessions')
    parser_compare.add_argument('--sessions', '-s', required=True,
                                help='Comma-separated session names')
    parser_compare.set_defaults(func=cmd_compare)
    # list command
    parser_list = subparsers.add_parser('list', help='List saved sessions')
    parser_list.set_defaults(func=cmd_list)
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return 1
    try:
        return args.func(args)
    except KeyboardInterrupt:
        print("\nAborted.")
        return 1
    except Exception as e:
        print(f"\nError: {e}")
        return 1


if __name__ == '__main__':
    sys.exit(main())