Skip to content

Instantly share code, notes, and snippets.

@luigifcruz
Last active November 6, 2021 21:53
Show Gist options
  • Select an option

  • Save luigifcruz/ddae07bdee32599be64fbd772437588d to your computer and use it in GitHub Desktop.

Select an option

Save luigifcruz/ddae07bdee32599be64fbd772437588d to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "saved-condition",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import lzma\n",
"import pyflac\n",
"import numpy as np\n",
"from radiocore import Chopper\n",
"from SoapySDR import Device, SOAPY_SDR_RX, SOAPY_SDR_CF32"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "crazy-motorcycle",
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"import matplotlib.pyplot as plt"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "chicken-payday",
"metadata": {},
"outputs": [],
"source": [
"input_rate = 1.5e6\n",
"device = \"rtlsdr\"\n",
"freq_start = 70e6\n",
"freq_end = 1.8e9"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cultural-passing",
"metadata": {},
"outputs": [],
"source": [
"chopper = Chopper(chunk_size=2048, multiplier=64)\n",
"buffer = np.zeros(chopper.size, dtype=np.complex64)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "interstate-reasoning",
"metadata": {},
"outputs": [],
"source": [
"sdr = Device({\"driver\": device})\n",
"sdr.setGainMode(SOAPY_SDR_RX, 0, False)\n",
"sdr.setFrequency(SOAPY_SDR_RX, 0, 96.9e6)\n",
"sdr.setSampleRate(SOAPY_SDR_RX, 0, input_rate)\n",
"sdr.setBandwidth(SOAPY_SDR_RX, 0, input_rate)\n",
"\n",
"rx = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32)\n",
"sdr.activateStream(rx)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "false-paris",
"metadata": {},
"outputs": [],
"source": [
"def encoder_callback(buffer: bytes, num_bytes: int, num_samples: int, current_frame: int): \n",
" _flac_file.write(buffer)\n",
" _flac_file.flush()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "broken-willow",
"metadata": {},
"outputs": [],
"source": [
"_frequency = []\n",
"_lzma_ratio = []\n",
"_flac_ratio = []\n",
"_pwr_ratio = []"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "frozen-skiing",
"metadata": {},
"outputs": [],
"source": [
"for freq in range(int(freq_start), int(freq_end), int(input_rate)): \n",
" _flac_file = open(\"SDR_IQ.flac\", \"bw\")\n",
" _lzma_file = open(\"SDR_IQ.lzma\", \"bw\")\n",
" _int16_file = open(\"SDR_IQ.ii16\", \"bw\")\n",
"\n",
" _flac_encoder = pyflac.StreamEncoder(write_callback=encoder_callback,\n",
" sample_rate=192000, # bullshit, but it doesn't need to be accurate\n",
" compression_level=8) # 5 is the default, 8 is the highest compression\n",
"\n",
" sdr.setFrequency(SOAPY_SDR_RX, 0, freq)\n",
" \n",
" for _ in range(16): \n",
" for chunk in chopper.chop(buffer):\n",
" sr = sdr.readStream(rx, [chunk], len(chunk), timeoutUs=int(1e9))\n",
" \n",
" _float_data = buffer.view(np.float32)\n",
" _int16_data = (_float_data * 32767).astype(np.int16)\n",
"\n",
" _int16_file.write(_int16_data.tobytes())\n",
" _lzma_file.write(lzma.compress(_int16_data.tobytes()))\n",
" _flac_encoder.process(_int16_data.reshape((len(_int16_data) // 2, 2)))\n",
"\n",
" _flac_encoder.finish()\n",
" \n",
" _flac_file.close()\n",
" _lzma_file.close()\n",
" _int16_file.close()\n",
" \n",
" print(np.min(_float_data), np.max(_float_data))\n",
" plt.psd(_float_data, NFFT=2048)\n",
" plt.show()\n",
" \n",
" _int16_size = os.path.getsize(_int16_file.name)\n",
" _lzma_size = os.path.getsize(_lzma_file.name)\n",
" _flac_size = os.path.getsize(_flac_file.name)\n",
" \n",
" os.remove(_int16_file.name)\n",
" os.remove(_lzma_file.name)\n",
" os.remove(_flac_file.name)\n",
" \n",
" _frequency.append(freq)\n",
" _lzma_ratio.append((_lzma_size*100)/_int16_size)\n",
" _flac_ratio.append((_flac_size*100)/_int16_size)\n",
" _pwr_ratio.append(np.average(10 * np.log10(_float_data[0::2]**2 + _float_data[1::2]**2)))\n",
"\n",
" print(f\"--------------------------------------------\")\n",
" print(f\"Frequency: {_frequency[-1]//int(1e6)} MHz\")\n",
" print(f\"Power: {_pwr_ratio[-1]} dBFS\")\n",
" print(f\"Uncompressed file: {_int16_size} bytes\")\n",
" print(f\"LZMA compressed file: {_lzma_size} bytes ({_lzma_ratio[-1]:.0f}%)\")\n",
" print(f\"FLAC compressed file: {_flac_size} bytes ({_flac_ratio[-1]:.0f}%)\")\n",
"\n",
"sdr.deactivateStream(rx)\n",
"sdr.closeStream(rx)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "overhead-atlantic",
"metadata": {},
"outputs": [],
"source": [
"_frequency = np.array(_frequency)\n",
"_lzma_ratio = np.array(_lzma_ratio)\n",
"_flac_ratio = np.array(_flac_ratio)\n",
"_pwr_ratio = np.array(_pwr_ratio)\n",
"_diff_ratio = _lzma_ratio - _flac_ratio\n",
"\n",
"_lzma_avg = np.average(_lzma_ratio)\n",
"_flac_avg = np.average(_flac_ratio)\n",
"_diff_min = np.min(_diff_ratio)\n",
"_diff_max = np.max(_diff_ratio)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "comparative-photography",
"metadata": {},
"outputs": [],
"source": [
"plt.style.use('dracula')\n",
"\n",
"fig1 = plt.figure(1, figsize=(8, 6), dpi=200)\n",
"frame1 = fig1.add_axes((.1,.5,.8,.4))\n",
"frame1.set_ylabel('Relative File Size (%)')\n",
"plt.title(f\"Real-world I/Q compression - FLAC vs. LZMA\\n In relation to uncompressed 16-bit integer samples ({device}).\")\n",
"plt.plot(_frequency, _lzma_ratio, color=\"lightgreen\", linewidth=1, label=f\"LZMA (average {_lzma_avg:.1f}%)\")\n",
"plt.plot(_frequency, _flac_ratio, color=\"fuchsia\", linewidth=1, label=f\"FLAC (average {_flac_avg:.1f}%)\")\n",
"plt.legend()\n",
"frame1.set_xticklabels([])\n",
"\n",
"frame2 = fig1.add_axes((.1,.3,.8,.2))\n",
"frame2.set_ylabel('Difference (%)')\n",
"plt.plot(_frequency, _diff_ratio, color=\"c\", linewidth=1,\n",
" label=f\"LZMA vs. FLAC ({_diff_min:.1f}% to {_diff_max:.1f}%)\")\n",
"plt.legend()\n",
"frame2.set_xticklabels([])\n",
"\n",
"frame3 = fig1.add_axes((.1,.1,.8,.2))\n",
"frame3.set_ylabel('Power (dBFS)')\n",
"frame3.set_xlabel('Frequency (Hz)')\n",
"plt.plot(_frequency, _pwr_ratio, color=\"salmon\", linewidth=1)\n",
"\n",
"fig1.savefig(f\"{device}.svg\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "extreme-branch",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "historical-light",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment