Skip to content

Instantly share code, notes, and snippets.

@amstocker
Created March 13, 2025 18:23
Show Gist options
  • Select an option

  • Save amstocker/aa28af6555ae992d58eefc68a673dc7b to your computer and use it in GitHub Desktop.

Select an option

Save amstocker/aa28af6555ae992d58eefc68a673dc7b to your computer and use it in GitHub Desktop.
import wave
from math import sqrt, sin, pi
import numpy as np
import matplotlib.pyplot as plt
# Low-pass recurrence (a = decay): y(n) = (1 - a) * 0.5 * (x(n) + x(n-1)) + a * y(n-1)
#class LowPassFilter:
# def __init__(self, b=0.99, n=1024):
# # should n be the block size? or a multiple of the block size?
# self.b = b
# #self.n = n
# #self.ir = np.logspace(0, n-1, num=n, base=b)
# #self.buffer = np.zeros(n)
#
# def process_sample(self, sample):
# pass
# How to vectorize this operation using numpy?
class LowPassFilter:
    """One-pole low-pass filter fed with the average of adjacent samples.

    On every call the output ``y`` moves toward
    ``0.5 * (sample + previous sample)`` by the fraction ``1 - decay``.
    """

    def __init__(self, init=0.0, decay=0.9):
        """Start the output at *init* and derive the smoothing weight.

        Args:
            init: Initial value of the filter output.
            decay: Share of the previous output retained per sample; the
                input weight stored in ``b`` is ``1 - decay``.
        """
        self.prev_sample = 0.0
        self.y = init
        self.b = 1 - decay

    def process(self, sample):
        """Consume one input sample and return the updated output."""
        # Averaging with the previous input happens before the one-pole step.
        midpoint = 0.5 * (sample + self.prev_sample)
        self.prev_sample = sample
        self.y = self.y + self.b * (midpoint - self.y)
        return self.y
class SlewLimiter:
    """Asymmetric one-pole follower with separate rise and fall rates.

    The output ``y`` approaches the input by the fraction ``1 - attack``
    when the input is above it, and by ``1 - decay`` otherwise.
    """

    def __init__(self, attack=0.9, decay=0.99, init=0.0):
        """Store the per-sample rise/fall weights and the starting output.

        Args:
            attack: Retention factor used while the input is above the output.
            decay: Retention factor used while the input is at or below it.
            init: Initial value of the output.
        """
        self.prev_sample = 0.0  # set here but never read by process()
        self.y = init
        self.rise = 1 - attack
        self.fall = 1 - decay

    def process(self, sample):
        """Move the output toward *sample* and return the new output."""
        rate = self.rise if sample > self.y else self.fall
        self.y += rate * (sample - self.y)
        return self.y
class DCFilter:
    """First-difference filter with a leaky feedback term.

    Implements ``y[n] = x[n] - x[n-1] + r * y[n-1]`` where ``r`` is the
    *decay* passed to the constructor.
    """

    def __init__(self, decay=0.99):
        """Store the feedback coefficient and zero the filter state."""
        self.y = 0.0
        self.prev_sample = 0.0
        self.r = decay

    def process(self, sample):
        """Consume one input sample and return the filtered output."""
        delta = sample - self.prev_sample
        self.prev_sample = sample
        self.y = delta + self.r * self.y
        return self.y
class Oscillator:
    """Sine oscillator driven by a per-sample phase increment.

    ``phase`` is measured in cycles and is wrapped back into a single
    cycle after every step.
    """

    def __init__(self):
        """Start the oscillator at phase zero."""
        self.phase = 0.0

    def process(self, freq):
        """Advance the phase by *freq* cycles and return the sine output.

        Args:
            freq: Phase increment for this sample, in cycles. May be
                negative or larger than one.

        Returns:
            ``sin(2 * pi * phase)`` evaluated after the phase is advanced.
        """
        # A single conditional subtraction only handles 0 <= freq <= 1; the
        # modulo wraps any number of whole cycles in either direction so the
        # phase cannot drift out of range and lose precision.
        self.phase = (self.phase + freq) % 1.0
        return sin(2 * pi * self.phase)
class PhaseLockedLoop:
    """Loop built from a sign comparator, a low-pass filter and an oscillator.

    The comparator output from the previous sample is low-pass filtered and
    the filtered value is passed to the oscillator as its phase increment.
    """

    def __init__(self):
        """Create the loop filter and oscillator with the comparator cleared."""
        self.cmp = 0.0
        self.osc = Oscillator()
        self.lpf = LowPassFilter(decay=0.9995)

    def process(self, sample):
        """Run one loop iteration and return the loop-filter output.

        Args:
            sample: Current input sample; only its sign is used.

        Returns:
            The low-pass filtered comparator value that was fed to the
            oscillator on this step.
        """
        control = self.lpf.process(self.cmp)
        carrier = self.osc.process(control)
        input_positive = sample > 0
        carrier_positive = carrier > 0
        # True when input and oscillator have opposite signs (exclusive or).
        self.cmp = input_positive != carrier_positive
        return control
class AutoGain:
    """Scale samples by a smoothed estimate of the signal's mean square.

    The squared input is low-pass filtered; the sample is then divided by
    ``sqrt(2 * mean_square)``. Output is muted while the estimate is at or
    below ``threshold``.
    """

    def __init__(self):
        """Create the mean-square smoother (starting at 1.0) and mute threshold."""
        self.threshold = 1e-6
        self.lpf = LowPassFilter(init=1.0, decay=0.9992)

    def process(self, sample):
        """Return the gain-normalised sample, or 0.0 when the level is too low."""
        mean_square = self.lpf.process(sample * sample)
        if mean_square > self.threshold:
            return sample / sqrt(2 * mean_square)
        # Avoid dividing by a near-zero level estimate.
        return 0.0
class CycleDetector:
    """Measure the spacing between cycle starts using two envelope followers.

    A positive envelope tracks ``max(sample, 0)`` and a negative envelope
    tracks ``min(sample, 0)``. A cycle starts when the sample rises above
    the positive envelope and ends when it drops below the negative one.
    Per-sample envelope and state values are recorded in ``*_history``
    arrays of length *n*.
    """

    def __init__(self, n):
        """Create the envelope followers and the length-*n* history buffers.

        Args:
            n: Number of samples that will be processed; every history
                array is allocated with this length.
        """
        self.attack = 0.9
        self.decay = 0.995
        # The negative envelope swaps the two rates relative to the positive one.
        self.env_high = SlewLimiter(attack=self.attack, decay=self.decay)
        self.env_low = SlewLimiter(attack=self.decay, decay=self.attack)

        self.high_state = False
        self.low_state = False
        self.cycle_state = False

        self.last_cycle_start = 0
        # Constructed but not applied: process() returns the raw cycle length.
        self.cycle_length_lpf = LowPassFilter(decay=0.9)
        self.cycle_length = 1

        self.env_high_history = np.zeros(n)
        self.env_low_history = np.zeros(n)
        self.high_state_history = np.zeros(n)
        self.low_state_history = np.zeros(n)
        self.cycle_state_history = np.zeros(n)

    def process(self, sample, sample_index):
        """Update the detector with one sample and return the cycle length.

        Args:
            sample: Current input sample.
            sample_index: Position of the sample; used both to measure the
                distance between cycle starts and to index the history
                arrays.

        Returns:
            ``sample_index`` difference between the two most recent cycle
            starts (1 until the first cycle start is seen). The value is 0
            when a cycle starts at the same index as ``last_cycle_start``,
            e.g. at index 0.
        """
        upper = self.env_high.process(max(sample, 0))
        lower = self.env_low.process(min(sample, 0))

        # Positive side: note an off->on transition, then settle the state.
        above_upper = sample > upper
        high_off_to_on = above_upper and not self.high_state
        if above_upper:
            self.high_state = True
        elif sample < upper:
            self.high_state = False

        # Negative side: mirror image of the logic above.
        below_lower = sample < lower
        low_off_to_on = below_lower and not self.low_state
        if below_lower:
            self.low_state = True
        elif sample > lower:
            self.low_state = False

        # A cycle opens on a positive trigger and closes on a negative one.
        if high_off_to_on and not self.cycle_state:
            self.cycle_state = True
            self.cycle_length = sample_index - self.last_cycle_start
            self.last_cycle_start = sample_index
        if low_off_to_on and self.cycle_state:
            self.cycle_state = False

        recordings = (
            (self.env_high_history, upper),
            (self.env_low_history, lower),
            (self.high_state_history, self.high_state),
            (self.low_state_history, self.low_state),
            (self.cycle_state_history, self.cycle_state),
        )
        for history, value in recordings:
            history[sample_index] = value

        return self.cycle_length
def process(samples, samplerate):
    """Estimate the per-sample frequency of *samples* and plot the result.

    Each sample is low-pass filtered and passed to a ``CycleDetector``; the
    frequency estimate is ``samplerate`` divided by the detected cycle
    length. The raw samples, the filtered signal and the frequency estimate
    are then shown in a matplotlib window.

    Args:
        samples: One-dimensional NumPy array of input samples.
        samplerate: Sample rate used to convert cycle lengths to frequency.
    """
    lpf = LowPassFilter(decay=0.99)
    cyc = CycleDetector(samples.size)
    processed_history = np.zeros(samples.size)
    frq_history = np.zeros(samples.size)

    for i, sample in enumerate(samples):
        processed_history[i] = lpf.process(sample)
        cyc_length = cyc.process(processed_history[i], i)
        # The detector reports a length of 0 when a cycle starts at index 0
        # (first sample positive); record "no estimate" instead of raising
        # ZeroDivisionError.
        frq_history[i] = samplerate / cyc_length if cyc_length > 0 else 0.0

    # The two signals are scaled by 500 and 300 before being drawn on the
    # same 0-500 axis as the frequency estimate.
    plt.plot(500 * samples, label="samples")
    plt.plot(300 * processed_history, label="processed")
    plt.plot(frq_history, label="freq")
    plt.ylim(0, 500)
    plt.legend()
    plt.show()
if __name__ == "__main__":
    # wave.open accepts a path directly and its reader is a context manager.
    with wave.open("test.wav", "rb") as wav:
        # The decoding below hard-codes little-endian 16-bit samples; fail
        # loudly rather than misinterpret other sample widths.
        if wav.getsampwidth() != 2:
            raise ValueError("test.wav must contain 16-bit PCM samples")
        channels = wav.getnchannels()
        samplerate = wav.getframerate()
        buffer = wav.readframes(wav.getnframes())

    samples = np.frombuffer(buffer, dtype='<h').astype(float) / 32767
    # Frames interleave one sample per channel; analyse the first channel
    # only so multi-channel files are not read as one signal (no-op for mono).
    samples = samples[::channels]
    process(samples, samplerate)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment