Last active
July 14, 2025 12:16
-
-
Save marcusmueller/005320c24fddf9cb750e40cb7e75d69b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| options: | |
| parameters: | |
| alias: '' | |
| author: "Marcus M\xFCller" | |
| catch_exceptions: 'True' | |
| comment: '' | |
| copyright: '2025' | |
| description: '' | |
| gen_linking: dynamic | |
| generate_options: no_gui | |
| generator_class_name: PythonNoGuiGenerator | |
| generator_module: gnuradio.grc.workflows.python_nogui | |
| hier_block_src_path: '.:' | |
| id: switchamp | |
| max_nouts: '0' | |
| output_language: python | |
| realtime_scheduling: '1' | |
| run_command: '{python} -u {filename}' | |
| run_options: run | |
| thread_safe_setters: '' | |
| title: Volume-based external amplifier switching | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [16, 12.0] | |
| rotation: 0 | |
| state: enabled | |
| blocks: | |
| - name: audio_source_0 | |
| id: audio_source | |
| parameters: | |
| affinity: '' | |
| alias: '' | |
| comment: '"Device Name": audio_src_name' | |
| device_name: audio_src_name | |
| maxoutbuf: '0' | |
| minoutbuf: '0' | |
| num_outputs: '2' | |
| ok_to_block: 'True' | |
| samp_rate: samp_rate | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [24, 360.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: audio_src_name | |
| id: parameter | |
| parameters: | |
| alias: '' | |
| comment: 'Find device names by running | |
| aplay -L' | |
| hide: none | |
| label: Audio Source Name | |
| short_id: s | |
| type: str | |
| value: '""' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [360, 12.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: avg_time | |
| id: parameter | |
| parameters: | |
| alias: '' | |
| comment: 'Time (seconds) for which | |
| the volume gets integrated' | |
| hide: none | |
| label: Averaging time (s) | |
| short_id: a | |
| type: eng_float | |
| value: '0.02' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [544, 12.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: blocks_abs_xx_0 | |
| id: blocks_abs_xx | |
| parameters: | |
| affinity: '' | |
| alias: '' | |
| comment: '' | |
| maxoutbuf: '0' | |
| minoutbuf: '0' | |
| type: float | |
| vlen: '1' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [312, 268.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: blocks_abs_xx_1 | |
| id: blocks_abs_xx | |
| parameters: | |
| affinity: '' | |
| alias: '' | |
| comment: '' | |
| maxoutbuf: '0' | |
| minoutbuf: '0' | |
| type: float | |
| vlen: '1' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [320, 476.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: blocks_max_xx_0 | |
| id: blocks_max_xx | |
| parameters: | |
| affinity: '' | |
| alias: '' | |
| comment: '' | |
| maxoutbuf: '0' | |
| minoutbuf: '0' | |
| num_inputs: '2' | |
| type: float | |
| vlen: '1' | |
| vlen_out: '1' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [1048, 352.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: blocks_moving_average_xx_0 | |
| id: blocks_moving_average_xx | |
| parameters: | |
| affinity: '' | |
| alias: '' | |
| comment: '' | |
| length: int(avg_time*samp_rate) | |
| max_iter: '4000' | |
| maxoutbuf: '0' | |
| minoutbuf: '0' | |
| scale: '1' | |
| type: float | |
| vlen: '1' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [504, 236.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: blocks_moving_average_xx_0_0 | |
| id: blocks_moving_average_xx | |
| parameters: | |
| affinity: '' | |
| alias: '' | |
| comment: '' | |
| length: int(avg_time*samp_rate) | |
| max_iter: '4000' | |
| maxoutbuf: '0' | |
| minoutbuf: '0' | |
| scale: '1' | |
| type: float | |
| vlen: '1' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [504, 444.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: blocks_threshold_ff_0 | |
| id: blocks_threshold_ff | |
| parameters: | |
| affinity: '' | |
| alias: '' | |
| comment: 'We use 0.9*threshold as | |
| low-going threshold, to | |
| give us some _hysteresis_ | |
| (i.e., don''t toggle on/off in | |
| rapid succession if signal | |
| is around threshold)' | |
| high: threshold | |
| init: '0' | |
| low: 0.9*threshold | |
| maxoutbuf: '0' | |
| minoutbuf: '0' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [768, 244.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: blocks_threshold_ff_0_0 | |
| id: blocks_threshold_ff | |
| parameters: | |
| affinity: '' | |
| alias: '' | |
| comment: 'We use 0.9*threshold as | |
| low-going threshold, to | |
| give us some _hysteresis_ | |
| (i.e., don''t toggle on/off in | |
| rapid succession if signal | |
| is around threshold)' | |
| high: threshold | |
| init: '0' | |
| low: 0.9*threshold | |
| maxoutbuf: '0' | |
| minoutbuf: '0' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [760, 452.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: custom_block | |
| id: epy_block | |
| parameters: | |
| _source_code: "import numpy as np\nfrom gnuradio import gr\ntry: \n import\ | |
| \ RPi.GPIO as GPIO\nexcept:\n # don't have the RPi module \u2013 are we on\ | |
| \ the development PC?\n # just a dummy, so that we can mock up the example\n\ | |
| \ class GPIO:\n def setmode(self, *args):\n pass\n \ | |
| \ def setup(self, *args):\n pass\n def output(self, *args):\n\ | |
| \ pass\n BCM=12\n OUT=12\n\nclass blk(gr.sync_block):\ | |
| \ # other base classes are basic_block, decim_block, interp_block\n \"\"\ | |
| \"Toggle GPIO according to input values\"\"\"\n\n def __init__(self, output_pin=17):\ | |
| \ # only default arguments here\n \"\"\"arguments to this function show\ | |
| \ up as parameters in GRC\"\"\"\n gr.sync_block.__init__(\n \ | |
| \ self,\n name='Control GPIO', # will show up in GRC\n \ | |
| \ in_sig=[np.float32],\n out_sig=[]\n )\n self.output_pin\ | |
| \ = output_pin\n # Set up GPIO\n GPIO.setmode(GPIO.BCM)\n \ | |
| \ GPIO.setup(self.output_pin, GPIO.OUT)\n\n\n def work(self, input_items,\ | |
| \ output_items):\n # limit ourself to 400 samples at a time, to control\n\ | |
| \ # buffer fillage\n to_do = min(len(input_items[0]), 400)\n \ | |
| \ value = input_items[0][to_do-1]\n GPIO.output(self.output_pin,\ | |
| \ bool(value))\n # How many samples did we consume?\n return to_do\n" | |
| affinity: '' | |
| alias: '' | |
| comment: '' | |
| maxoutbuf: '0' | |
| minoutbuf: '0' | |
| output_pin: '17' | |
| states: | |
| _io_cache: ('Control GPIO', 'blk', [('output_pin', '17')], [('0', 'float', 1)], | |
| [], 'Toggle GPIO according to input values', ['output_pin']) | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [1232, 356.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: samp_rate | |
| id: parameter | |
| parameters: | |
| alias: '' | |
| comment: '' | |
| hide: none | |
| label: Sample Rate | |
| short_id: r | |
| type: intx | |
| value: '48000' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [216, 12.0] | |
| rotation: 0 | |
| state: enabled | |
| - name: threshold | |
| id: parameter | |
| parameters: | |
| alias: '' | |
| comment: 'Threshold above | |
| which to turn on | |
| the amplifier' | |
| hide: none | |
| label: Threshold | |
| short_id: t | |
| type: eng_float | |
| value: '0.05' | |
| states: | |
| bus_sink: false | |
| bus_source: false | |
| bus_structure: null | |
| coordinate: [728, 12.0] | |
| rotation: 0 | |
| state: enabled | |
| connections: | |
| - [audio_source_0, '0', blocks_abs_xx_0, '0'] | |
| - [audio_source_0, '1', blocks_abs_xx_1, '0'] | |
| - [blocks_abs_xx_0, '0', blocks_moving_average_xx_0, '0'] | |
| - [blocks_abs_xx_1, '0', blocks_moving_average_xx_0_0, '0'] | |
| - [blocks_max_xx_0, '0', custom_block, '0'] | |
| - [blocks_moving_average_xx_0, '0', blocks_threshold_ff_0, '0'] | |
| - [blocks_moving_average_xx_0_0, '0', blocks_threshold_ff_0_0, '0'] | |
| - [blocks_threshold_ff_0, '0', blocks_max_xx_0, '0'] | |
| - [blocks_threshold_ff_0_0, '0', blocks_max_xx_0, '1'] | |
| metadata: | |
| file_format: 1 | |
| grc_version: v3.11.0.0git-980-g25047da6 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| # | |
| # SPDX-License-Identifier: GPL-3.0 | |
| # | |
| # GNU Radio Python Flow Graph | |
| # Title: Volume-based external amplifier switching | |
| # Author: Marcus Müller | |
| # Copyright: 2025 | |
| # GNU Radio version: v3.11.0.0git-980-g25047da6 | |
| from gnuradio import audio | |
| from gnuradio import blocks | |
| import switchamp_custom_block as custom_block # embedded python block | |
| import threading | |
| from gnuradio import gr | |
| from gnuradio.filter import firdes | |
| from gnuradio.fft import window | |
| import sys | |
| import signal | |
| from argparse import ArgumentParser | |
| from gnuradio.eng_arg import eng_float, intx | |
| from gnuradio import eng_notation | |
class switchamp(gr.top_block):
    """Top block for "Volume-based external amplifier switching".

    Signal path, per audio channel (two channels are read from the source):
    absolute value -> moving-average block (scale 1) -> threshold block with
    ``0.9 * threshold`` as the low level and ``threshold`` as the high level.
    The maximum of both threshold outputs feeds the embedded Python block
    ``custom_block.blk``.
    """

    def __init__(self, audio_src_name="", avg_time=0.02, samp_rate=48000, threshold=0.05):
        """Create all blocks and wire them together.

        Args:
            audio_src_name: device name handed to the audio source.
            avg_time: averaging time; multiplied by ``samp_rate`` and
                truncated to an int to obtain the moving-average length.
            samp_rate: sample rate handed to the audio source.
            threshold: high level of both threshold blocks; the low level is
                0.9 times this value.
        """
        gr.top_block.__init__(self, "Volume-based external amplifier switching", catch_exceptions=True)

        ##################################################
        # Parameters
        ##################################################
        self.audio_src_name = audio_src_name
        self.avg_time = avg_time
        self.samp_rate = samp_rate
        self.threshold = threshold

        ##################################################
        # Blocks
        ##################################################
        # Both channels share the same window length and low-going threshold.
        window_len = int(avg_time * samp_rate)
        low_level = 0.9 * threshold

        self.custom_block = custom_block.blk(output_pin=17)
        self.blocks_threshold_ff_0_0 = blocks.threshold_ff(low_level, threshold, 0)
        self.blocks_threshold_ff_0 = blocks.threshold_ff(low_level, threshold, 0)
        self.blocks_moving_average_xx_0_0 = blocks.moving_average_ff(window_len, 1, 4000, 1)
        self.blocks_moving_average_xx_0 = blocks.moving_average_ff(window_len, 1, 4000, 1)
        self.blocks_max_xx_0 = blocks.max_ff(1, 1)
        self.blocks_abs_xx_1 = blocks.abs_ff(1)
        self.blocks_abs_xx_0 = blocks.abs_ff(1)
        self.audio_source_0 = audio.source(samp_rate, audio_src_name, True)

        ##################################################
        # Connections
        ##################################################
        # (source block, source port, sink block, sink port)
        wiring = (
            (self.audio_source_0, 0, self.blocks_abs_xx_0, 0),
            (self.audio_source_0, 1, self.blocks_abs_xx_1, 0),
            (self.blocks_abs_xx_0, 0, self.blocks_moving_average_xx_0, 0),
            (self.blocks_abs_xx_1, 0, self.blocks_moving_average_xx_0_0, 0),
            (self.blocks_max_xx_0, 0, self.custom_block, 0),
            (self.blocks_moving_average_xx_0, 0, self.blocks_threshold_ff_0, 0),
            (self.blocks_moving_average_xx_0_0, 0, self.blocks_threshold_ff_0_0, 0),
            (self.blocks_threshold_ff_0, 0, self.blocks_max_xx_0, 0),
            (self.blocks_threshold_ff_0_0, 0, self.blocks_max_xx_0, 1),
        )
        for src, src_port, dst, dst_port in wiring:
            self.connect((src, src_port), (dst, dst_port))

    def _refresh_moving_average_length(self):
        """Push the length derived from avg_time and samp_rate to both averagers."""
        for averager in (self.blocks_moving_average_xx_0, self.blocks_moving_average_xx_0_0):
            averager.set_length_and_scale(int(self.avg_time * self.samp_rate), 1)

    def get_audio_src_name(self):
        """Return the stored audio source device name."""
        return self.audio_src_name

    def set_audio_src_name(self, audio_src_name):
        """Store a new device name; no block is updated by this setter."""
        self.audio_src_name = audio_src_name

    def get_avg_time(self):
        """Return the stored averaging time."""
        return self.avg_time

    def set_avg_time(self, avg_time):
        """Store a new averaging time and update both moving-average lengths."""
        self.avg_time = avg_time
        self._refresh_moving_average_length()

    def get_samp_rate(self):
        """Return the stored sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new sample rate and update both moving-average lengths."""
        self.samp_rate = samp_rate
        self._refresh_moving_average_length()

    def get_threshold(self):
        """Return the stored threshold."""
        return self.threshold

    def set_threshold(self, threshold):
        """Store a new threshold and update high/low levels of both threshold blocks."""
        self.threshold = threshold
        for detector in (self.blocks_threshold_ff_0, self.blocks_threshold_ff_0_0):
            detector.set_hi(self.threshold)
            detector.set_lo(0.9 * self.threshold)
def argument_parser():
    """Build the command-line parser for the flow graph parameters.

    Returns:
        An ``ArgumentParser`` with the options ``-s/--audio-src-name``,
        ``-a/--avg-time``, ``-r/--samp-rate`` and ``-t/--threshold``.
    """
    # One entry per option: (short flag, long flag, add_argument keywords).
    option_specs = (
        ("-s", "--audio-src-name", {
            "dest": "audio_src_name",
            "type": str,
            "default": "",
            "help": "Set Audio Source Name [default=%(default)r]",
        }),
        ("-a", "--avg-time", {
            "dest": "avg_time",
            "type": eng_float,
            "default": eng_notation.num_to_str(float(0.02)),
            "help": "Set Averaging time (s) [default=%(default)r]",
        }),
        ("-r", "--samp-rate", {
            "dest": "samp_rate",
            "type": intx,
            "default": 48000,
            "help": "Set Sample Rate [default=%(default)r]",
        }),
        ("-t", "--threshold", {
            "dest": "threshold",
            "type": eng_float,
            "default": eng_notation.num_to_str(float(0.05)),
            "help": "Set Threshold [default=%(default)r]",
        }),
    )

    cli = ArgumentParser()
    for short_flag, long_flag, settings in option_specs:
        cli.add_argument(short_flag, long_flag, **settings)
    return cli
def main(top_block_cls=switchamp, options=None):
    """Parse options if needed, build the flow graph and run it until it ends.

    Args:
        top_block_cls: class to instantiate as the flow graph.
        options: pre-parsed options object; when ``None`` the command line is
            parsed with ``argument_parser()``.
    """
    if options is None:
        options = argument_parser().parse_args()

    # A failure to get real-time scheduling is only logged, not fatal.
    rt_status = gr.enable_realtime_scheduling()
    if rt_status != gr.RT_OK:
        gr.logger("realtime").warn("Error: failed to enable real-time scheduling.")

    flowgraph = top_block_cls(
        audio_src_name=options.audio_src_name,
        avg_time=options.avg_time,
        samp_rate=options.samp_rate,
        threshold=options.threshold,
    )

    def _shutdown(sig=None, frame=None):
        # Stop the flow graph, wait for it to finish, then exit with status 0.
        flowgraph.stop()
        flowgraph.wait()
        sys.exit(0)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _shutdown)

    flowgraph.start()
    flowgraph.wait()
# Run the flow graph only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
