Created
March 13, 2025 18:23
-
-
Save amstocker/aa28af6555ae992d58eefc68a673dc7b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import wave | |
| from math import sqrt, sin, pi | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| # Y(n) = (1-a) * 0.5 * (x(n) + x(n-1)) + a*y(n-1) | |
| #class LowPassFilter: | |
| # def __init__(self, b=0.99, n=1024): | |
| # # should n be the block size? or a multiple of the block size? | |
| # self.b = b | |
| # #self.n = n | |
| # #self.ir = np.logspace(0, n-1, num=n, base=b) | |
| # #self.buffer = np.zeros(n) | |
| # | |
| # def process_sample(self, sample): | |
| # pass | |
| # How to vectorize this operation using numpy? | |
| class LowPassFilter: | |
| def __init__(self, init=0.0, decay=0.9): | |
| self.b = 1 - decay | |
| self.y = init | |
| self.prev_sample = 0.0 | |
| def process(self, sample): | |
| self.y += self.b * (0.5 * (sample + self.prev_sample) - self.y) | |
| self.prev_sample = sample | |
| return self.y | |
| class SlewLimiter: | |
| def __init__(self, attack=0.9, decay=0.99, init=0.0): | |
| self.rise = 1 - attack | |
| self.fall = 1 - decay | |
| self.y = init | |
| self.prev_sample = 0.0 | |
| def process(self, sample): | |
| if sample > self.y: | |
| self.y += self.rise * (sample - self.y) | |
| else: | |
| self.y += self.fall * (sample - self.y) | |
| return self.y | |
| class DCFilter: | |
| def __init__(self, decay=0.99): | |
| self.r = decay | |
| self.prev_sample = 0.0 | |
| self.y = 0.0 | |
| def process(self, sample): | |
| self.y = sample - self.prev_sample + self.r * self.y | |
| self.prev_sample = sample | |
| return self.y | |
| class Oscillator: | |
| def __init__(self): | |
| self.phase = 0.0 | |
| def process(self, freq): | |
| self.phase += freq | |
| if self.phase > 1.0: | |
| self.phase -= 1.0 | |
| return sin(2 * pi * self.phase) | |
| class PhaseLockedLoop: | |
| def __init__(self): | |
| self.lpf = LowPassFilter(decay=0.9995) | |
| self.osc = Oscillator() | |
| self.cmp = 0.0 | |
| def process(self, sample): | |
| lpf_value = self.lpf.process(self.cmp) | |
| osc_value = self.osc.process(lpf_value) | |
| self.cmp = (sample > 0) ^ (osc_value > 0) | |
| return lpf_value | |
| class AutoGain: | |
| def __init__(self): | |
| self.lpf = LowPassFilter(init=1.0, decay=0.9992) | |
| self.threshold = 1e-6 | |
| def process(self, sample): | |
| lpf_value = self.lpf.process(sample * sample) | |
| if lpf_value > self.threshold: | |
| return sample / sqrt(2 * lpf_value) | |
| else: | |
| return 0.0 | |
| class CycleDetector: | |
| def __init__(self, n): | |
| self.attack = 0.9 | |
| self.decay = 0.995 | |
| self.env_high = SlewLimiter(self.attack, self.decay) | |
| self.env_low = SlewLimiter(self.decay, self.attack) | |
| self.high_state = False | |
| self.low_state = False | |
| self.cycle_state = False | |
| self.last_cycle_start = 0 | |
| self.cycle_length_lpf = LowPassFilter(decay=0.9) | |
| self.cycle_length = 1 | |
| self.env_high_history = np.zeros(n) | |
| self.env_low_history = np.zeros(n) | |
| self.high_state_history = np.zeros(n) | |
| self.low_state_history = np.zeros(n) | |
| self.cycle_state_history = np.zeros(n) | |
| def process(self, sample, sample_index): | |
| env_high_value = self.env_high.process(max(sample, 0)) | |
| env_low_value = self.env_low.process(min(sample, 0)) | |
| high_off_to_on = False | |
| if not self.high_state and sample > env_high_value: | |
| self.high_state = True | |
| high_off_to_on = True | |
| if self.high_state and sample < env_high_value: | |
| self.high_state = False | |
| low_off_to_on = False | |
| if not self.low_state and sample < env_low_value: | |
| low_off_to_on = True | |
| self.low_state = True | |
| if self.low_state and sample > env_low_value: | |
| self.low_state = False | |
| if not self.cycle_state and high_off_to_on: | |
| self.cycle_state = True | |
| self.cycle_length = sample_index - self.last_cycle_start | |
| self.last_cycle_start = sample_index | |
| if self.cycle_state and low_off_to_on: | |
| self.cycle_state = False | |
| self.env_high_history[sample_index] = env_high_value | |
| self.env_low_history[sample_index] = env_low_value | |
| self.high_state_history[sample_index] = self.high_state | |
| self.low_state_history[sample_index] = self.low_state | |
| self.cycle_state_history[sample_index] = self.cycle_state | |
| #return self.cycle_length_lpf.process(self.cycle_length) | |
| return self.cycle_length | |
| def process(samples, samplerate): | |
| lpf = LowPassFilter(decay=0.99) | |
| #acg = AutoGain() | |
| cyc = CycleDetector(samples.size) | |
| processed_history = np.zeros(samples.size) | |
| frq_history = np.zeros(samples.size) | |
| for i in range(samples.size): | |
| #processed_history[i] = acg.process(lpf.process(samples[i])) | |
| processed_history[i] = lpf.process(samples[i]) | |
| cyc_length = cyc.process(processed_history[i], i) | |
| frq_history[i] = samplerate / cyc_length | |
| plt.plot(500 * samples, label="samples") | |
| plt.plot(300 * processed_history, label="processed") | |
| plt.plot(frq_history, label="freq") | |
| plt.ylim(0, 500) | |
| plt.legend() | |
| plt.show() | |
| if __name__ == "__main__": | |
| with open("test.wav", "rb") as wav_file: | |
| wav = wave.open(wav_file) | |
| length = wav.getnframes() | |
| buffer = wav.readframes(length) | |
| samples = np.frombuffer(buffer, dtype='<h').astype(float) / 32767 | |
| process(samples, wav.getframerate()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment