Skip to content

Instantly share code, notes, and snippets.

@marcusmueller
Last active July 14, 2025 12:16
Show Gist options
  • Select an option

  • Save marcusmueller/005320c24fddf9cb750e40cb7e75d69b to your computer and use it in GitHub Desktop.

Select an option

Save marcusmueller/005320c24fddf9cb750e40cb7e75d69b to your computer and use it in GitHub Desktop.
options:
parameters:
alias: ''
author: "Marcus M\xFCller"
catch_exceptions: 'True'
comment: ''
copyright: '20256'
description: ''
gen_linking: dynamic
generate_options: no_gui
generator_class_name: PythonNoGuiGenerator
generator_module: gnuradio.grc.workflows.python_nogui
hier_block_src_path: '.:'
id: switchamp
max_nouts: '0'
output_language: python
realtime_scheduling: '1'
run_command: '{python} -u {filename}'
run_options: run
thread_safe_setters: ''
title: Volume-based external amplifier switching
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [16, 12.0]
rotation: 0
state: enabled
blocks:
- name: audio_source_0
id: audio_source
parameters:
affinity: ''
alias: ''
comment: '"Device Name": audio_src_name'
device_name: audio_src_name
maxoutbuf: '0'
minoutbuf: '0'
num_outputs: '2'
ok_to_block: 'True'
samp_rate: samp_rate
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [24, 360.0]
rotation: 0
state: enabled
- name: audio_src_name
id: parameter
parameters:
alias: ''
comment: 'Find device names by running
aplay -L'
hide: none
label: Audio Source Name
short_id: s
type: str
value: '""'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [360, 12.0]
rotation: 0
state: enabled
- name: avg_time
id: parameter
parameters:
alias: ''
comment: 'Time (seconds) for which
the volume gets integrated'
hide: none
label: Averaging time (s)
short_id: a
type: eng_float
value: '0.02'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [544, 12.0]
rotation: 0
state: enabled
- name: blocks_abs_xx_0
id: blocks_abs_xx
parameters:
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
type: float
vlen: '1'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [312, 268.0]
rotation: 0
state: enabled
- name: blocks_abs_xx_1
id: blocks_abs_xx
parameters:
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
type: float
vlen: '1'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [320, 476.0]
rotation: 0
state: enabled
- name: blocks_max_xx_0
id: blocks_max_xx
parameters:
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
num_inputs: '2'
type: float
vlen: '1'
vlen_out: '1'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [1048, 352.0]
rotation: 0
state: enabled
- name: blocks_moving_average_xx_0
id: blocks_moving_average_xx
parameters:
affinity: ''
alias: ''
comment: ''
length: int(avg_time*samp_rate)
max_iter: '4000'
maxoutbuf: '0'
minoutbuf: '0'
scale: '1'
type: float
vlen: '1'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [504, 236.0]
rotation: 0
state: enabled
- name: blocks_moving_average_xx_0_0
id: blocks_moving_average_xx
parameters:
affinity: ''
alias: ''
comment: ''
length: int(avg_time*samp_rate)
max_iter: '4000'
maxoutbuf: '0'
minoutbuf: '0'
scale: '1'
type: float
vlen: '1'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [504, 444.0]
rotation: 0
state: enabled
- name: blocks_threshold_ff_0
id: blocks_threshold_ff
parameters:
affinity: ''
alias: ''
comment: 'We use 0.9*threshold as
low-going threshold, to
give us some _hysteresis_
(i.e., don''t toggle on/off in
rapid succession if signal
is around threshold)'
high: threshold
init: '0'
low: 0.9*threshold
maxoutbuf: '0'
minoutbuf: '0'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [768, 244.0]
rotation: 0
state: enabled
- name: blocks_threshold_ff_0_0
id: blocks_threshold_ff
parameters:
affinity: ''
alias: ''
comment: 'We use 0.9*threshold as
low-going threshold, to
give us some _hysteresis_
(i.e., don''t toggle on/off in
rapid succession if signal
is around threshold)'
high: threshold
init: '0'
low: 0.9*threshold
maxoutbuf: '0'
minoutbuf: '0'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [760, 452.0]
rotation: 0
state: enabled
- name: custom_block
id: epy_block
parameters:
_source_code: "import numpy as np\nfrom gnuradio import gr\ntry: \n import\
\ RPi.GPIO as GPIO\nexcept:\n # don't have the RPi module \u2013 are we on\
\ the development PC?\n # just a dummy, so that we can mock up the example\n\
\ class GPIO:\n def setmode(self, *args):\n pass\n \
\ def setup(self, *args):\n pass\n def output(self, *args):\n\
\ pass\n BCM=12\n OUT=12\n\nclass blk(gr.sync_block):\
\ # other base classes are basic_block, decim_block, interp_block\n \"\"\
\"Toggle GPIO according to input values\"\"\"\n\n def __init__(self, output_pin=17):\
\ # only default arguments here\n \"\"\"arguments to this function show\
\ up as parameters in GRC\"\"\"\n gr.sync_block.__init__(\n \
\ self,\n name='Control GPIO', # will show up in GRC\n \
\ in_sig=[np.float32],\n out_sig=[]\n )\n self.output_pin\
\ = output_pin\n # Set up GPIO\n GPIO.setmode(GPIO.BCM)\n \
\ GPIO.setup(self.output_pin, GPIO.OUT)\n\n\n def work(self, input_items,\
\ output_items):\n # limit ourself to 400 samples at a time, to control\n\
\ # buffer fillage\n to_do = min(len(input_items[0]), 400)\n \
\ value = input_items[0][to_do-1]\n GPIO.output(self.output_pin,\
\ bool(value))\n # How many samples did we consume?\n return to_do\n"
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
output_pin: '17'
states:
_io_cache: ('Control GPIO', 'blk', [('output_pin', '17')], [('0', 'float', 1)],
[], 'Toggle GPIO according to input values', ['output_pin'])
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [1232, 356.0]
rotation: 0
state: enabled
- name: samp_rate
id: parameter
parameters:
alias: ''
comment: ''
hide: none
label: Sample Rate
short_id: r
type: intx
value: '48000'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [216, 12.0]
rotation: 0
state: enabled
- name: threshold
id: parameter
parameters:
alias: ''
comment: 'Threshold above
which to turn on
the amplifier'
hide: none
label: Threshold
short_id: t
type: eng_float
value: '0.05'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [728, 12.0]
rotation: 0
state: enabled
connections:
- [audio_source_0, '0', blocks_abs_xx_0, '0']
- [audio_source_0, '1', blocks_abs_xx_1, '0']
- [blocks_abs_xx_0, '0', blocks_moving_average_xx_0, '0']
- [blocks_abs_xx_1, '0', blocks_moving_average_xx_0_0, '0']
- [blocks_max_xx_0, '0', custom_block, '0']
- [blocks_moving_average_xx_0, '0', blocks_threshold_ff_0, '0']
- [blocks_moving_average_xx_0_0, '0', blocks_threshold_ff_0_0, '0']
- [blocks_threshold_ff_0, '0', blocks_max_xx_0, '0']
- [blocks_threshold_ff_0_0, '0', blocks_max_xx_0, '1']
metadata:
file_format: 1
grc_version: v3.11.0.0git-980-g25047da6
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: Volume-based external amplifier switching
# Author: Marcus Müller
# Copyright: 20256
# GNU Radio version: v3.11.0.0git-980-g25047da6
from gnuradio import audio
from gnuradio import blocks
import switchamp_custom_block as custom_block # embedded python block
import threading
from gnuradio import gr
from gnuradio.filter import firdes
from gnuradio.fft import window
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
class switchamp(gr.top_block):
def __init__(self, audio_src_name="", avg_time=0.02, samp_rate=48000, threshold=0.05):
gr.top_block.__init__(self, "Volume-based external amplifier switching", catch_exceptions=True)
##################################################
# Parameters
##################################################
self.audio_src_name = audio_src_name
self.avg_time = avg_time
self.samp_rate = samp_rate
self.threshold = threshold
##################################################
# Blocks
##################################################
self.custom_block = custom_block.blk(output_pin=17)
self.blocks_threshold_ff_0_0 = blocks.threshold_ff((0.9*threshold), threshold, 0)
self.blocks_threshold_ff_0 = blocks.threshold_ff((0.9*threshold), threshold, 0)
self.blocks_moving_average_xx_0_0 = blocks.moving_average_ff((int(avg_time*samp_rate)), 1, 4000, 1)
self.blocks_moving_average_xx_0 = blocks.moving_average_ff((int(avg_time*samp_rate)), 1, 4000, 1)
self.blocks_max_xx_0 = blocks.max_ff(1, 1)
self.blocks_abs_xx_1 = blocks.abs_ff(1)
self.blocks_abs_xx_0 = blocks.abs_ff(1)
self.audio_source_0 = audio.source(samp_rate, audio_src_name, True)
##################################################
# Connections
##################################################
self.connect((self.audio_source_0, 0), (self.blocks_abs_xx_0, 0))
self.connect((self.audio_source_0, 1), (self.blocks_abs_xx_1, 0))
self.connect((self.blocks_abs_xx_0, 0), (self.blocks_moving_average_xx_0, 0))
self.connect((self.blocks_abs_xx_1, 0), (self.blocks_moving_average_xx_0_0, 0))
self.connect((self.blocks_max_xx_0, 0), (self.custom_block, 0))
self.connect((self.blocks_moving_average_xx_0, 0), (self.blocks_threshold_ff_0, 0))
self.connect((self.blocks_moving_average_xx_0_0, 0), (self.blocks_threshold_ff_0_0, 0))
self.connect((self.blocks_threshold_ff_0, 0), (self.blocks_max_xx_0, 0))
self.connect((self.blocks_threshold_ff_0_0, 0), (self.blocks_max_xx_0, 1))
def get_audio_src_name(self):
return self.audio_src_name
def set_audio_src_name(self, audio_src_name):
self.audio_src_name = audio_src_name
def get_avg_time(self):
return self.avg_time
def set_avg_time(self, avg_time):
self.avg_time = avg_time
self.blocks_moving_average_xx_0.set_length_and_scale((int(self.avg_time*self.samp_rate)), 1)
self.blocks_moving_average_xx_0_0.set_length_and_scale((int(self.avg_time*self.samp_rate)), 1)
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.blocks_moving_average_xx_0.set_length_and_scale((int(self.avg_time*self.samp_rate)), 1)
self.blocks_moving_average_xx_0_0.set_length_and_scale((int(self.avg_time*self.samp_rate)), 1)
def get_threshold(self):
return self.threshold
def set_threshold(self, threshold):
self.threshold = threshold
self.blocks_threshold_ff_0.set_hi(self.threshold)
self.blocks_threshold_ff_0.set_lo((0.9*self.threshold))
self.blocks_threshold_ff_0_0.set_hi(self.threshold)
self.blocks_threshold_ff_0_0.set_lo((0.9*self.threshold))
def argument_parser():
parser = ArgumentParser()
parser.add_argument(
"-s", "--audio-src-name", dest="audio_src_name", type=str, default="",
help="Set Audio Source Name [default=%(default)r]")
parser.add_argument(
"-a", "--avg-time", dest="avg_time", type=eng_float, default=eng_notation.num_to_str(float(0.02)),
help="Set Averaging time (s) [default=%(default)r]")
parser.add_argument(
"-r", "--samp-rate", dest="samp_rate", type=intx, default=48000,
help="Set Sample Rate [default=%(default)r]")
parser.add_argument(
"-t", "--threshold", dest="threshold", type=eng_float, default=eng_notation.num_to_str(float(0.05)),
help="Set Threshold [default=%(default)r]")
return parser
def main(top_block_cls=switchamp, options=None):
if options is None:
options = argument_parser().parse_args()
if gr.enable_realtime_scheduling() != gr.RT_OK:
gr.logger("realtime").warn("Error: failed to enable real-time scheduling.")
tb = top_block_cls(audio_src_name=options.audio_src_name, avg_time=options.avg_time, samp_rate=options.samp_rate, threshold=options.threshold)
def sig_handler(sig=None, frame=None):
tb.stop()
tb.wait()
sys.exit(0)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
tb.start()
tb.wait()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment