Skip to content

Instantly share code, notes, and snippets.

@parkerlreed
Last active October 27, 2025 18:35
Show Gist options
  • Select an option

  • Save parkerlreed/0ce45e907ce536a0541afb90b5b49350 to your computer and use it in GitHub Desktop.

Select an option

Save parkerlreed/0ce45e907ce536a0541afb90b5b49350 to your computer and use it in GitHub Desktop.
FNIRSI FNB58 Bluetoth data logger - Requires bleak - Currently prints voltage, amperage, and wattage: more may be possible
import asyncio
import struct
import sys
from bleak import BleakClient
# Bluetooth device address and attributes
DEVICE_ADDRESS = "98:DA:B0:08:A1:82" # Replace with your device's Bluetooth MAC address
WRITE_CHARACTERISTIC = "0000ffe9-0000-1000-8000-00805f9b34fb"
NOTIFY_CHARACTERISTIC = "0000ffe4-0000-1000-8000-00805f9b34fb"
# Constants for offset and scaling
offset = 21
scale = 10000 # The known scaling factor
# Values to write
WRITE_VALUES = [
bytes([0xaa, 0x81, 0x00, 0xf4]),
bytes([0xaa, 0x82, 0x00, 0xa7])
]
def parse_data(data):
# Define a reasonable voltage range for filtering
min_voltage = 0.0 # Adjust this lower bound as needed
max_voltage = 150.0 # Adjust this upper bound as needed
"""Parses data from notification and extracts values at offsets."""
if len(data) < offset + 4:
# Insufficient data for parsing
return
# Unpack the 4-byte values directly from data at each offset
voltage, amperage, wattage = (value / scale for value in struct.unpack_from('<iii', data, offset))
# Print only if the value is within the specified range
if min_voltage <= voltage <= max_voltage:
print(f"Voltage: {voltage} Amperage: {amperage} Wattage: {wattage}")
sys.stdout.flush() # Ensure output is immediately flushed
async def enable_notifications(client):
"""Enable notifications and handle incoming data."""
async def notification_handler(sender, data):
parse_data(data)
# Enable notifications
await client.start_notify(NOTIFY_CHARACTERISTIC, notification_handler)
print("Notifications enabled.")
sys.stdout.flush() # Ensure output is immediately flushed
async def main():
async with BleakClient(DEVICE_ADDRESS) as client:
print("Connected to Bluetooth device.")
sys.stdout.flush()
# Write values to the specified characteristic
for value in WRITE_VALUES:
await client.write_gatt_char(WRITE_CHARACTERISTIC, value)
print(f"Wrote value: {value.hex()}")
sys.stdout.flush()
# Enable notifications on the notify characteristic
await enable_notifications(client)
# Keep the script running to continue receiving notifications
await asyncio.sleep(float('inf'))
# Run the async main function
asyncio.run(main())
import sys
import re
import select
from PyQt6.QtWidgets import QApplication, QLabel, QVBoxLayout, QWidget, QMainWindow
from PyQt6.QtCore import QTimer, Qt
from PyQt6.QtGui import QFont
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas # <-- PyQt6-friendly
from matplotlib.figure import Figure
from matplotlib import pyplot as plt
import numpy as np
from collections import deque
# Accepts 123, 123.456, or 1.23e+02
NUM = r"([0-9]*\.?[0-9]+(?:[eE][+-]?\d+)?)"
PATTERN = re.compile(
rf"Voltage:\s*{NUM}\s*(?:V)?\s+Amperage:\s*{NUM}\s*(?:A)?\s+Wattage:\s*{NUM}\s*(?:W)?"
)
class PowerMonitorGUI(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("Power Monitor")
self.setGeometry(100, 100, 300, 200)
self.layout = QVBoxLayout()
self.voltage_label = QLabel("Voltage: 0.0000 V")
self.setup_label(self.voltage_label, 'yellow')
self.voltage_label.mousePressEvent = lambda event: self.open_graph_window("Voltage", 'yellow')
self.amperage_label = QLabel("Amperage: 0.0000 A")
self.setup_label(self.amperage_label, 'cyan')
self.amperage_label.mousePressEvent = lambda event: self.open_graph_window("Amperage", 'cyan')
self.wattage_label = QLabel("Wattage: 0.0000 W")
self.setup_label(self.wattage_label, 'purple')
self.wattage_label.mousePressEvent = lambda event: self.open_graph_window("Wattage", 'purple')
self.layout.addWidget(self.voltage_label)
self.layout.addWidget(self.amperage_label)
self.layout.addWidget(self.wattage_label)
self.setLayout(self.layout)
self.timer = QTimer()
self.timer.timeout.connect(self.update_display_from_stdin)
self.timer.start(100)
self.graph_windows = {}
def setup_label(self, label, color):
font = QFont("Arial", 20)
label.setFont(font)
label.setStyleSheet(f"color: {color}; background-color: black;")
label.setAlignment(Qt.AlignmentFlag.AlignCenter)
def update_display_from_stdin(self):
# Drain all available lines without blocking
while True:
r, _, _ = select.select([sys.stdin], [], [], 0)
if not r:
break
line = sys.stdin.readline()
if not line: # EOF
break
line = line.strip()
m = PATTERN.search(line)
if not m:
# If your upstream sometimes prints only one metric per line,
# you can accumulate and parse across lines here.
continue
voltage, amperage, wattage = map(float, m.groups())
self.voltage_label.setText(f"Voltage: {voltage:.4f} V")
self.amperage_label.setText(f"Amperage: {amperage:.4f} A")
self.wattage_label.setText(f"Wattage: {wattage:.4f} W")
for key, window in self.graph_windows.items():
if key == "Voltage":
window.update_plot(voltage)
elif key == "Amperage":
window.update_plot(amperage)
elif key == "Wattage":
window.update_plot(wattage)
def open_graph_window(self, label, color):
if label not in self.graph_windows:
self.graph_windows[label] = GraphWindow(label, color, self)
self.graph_windows[label].show()
class GraphWindow(QMainWindow):
def __init__(self, label, color, parent):
super().__init__()
self.label = label
self.color = color
self.parent = parent
self.setWindowTitle(f"{label} Graph")
self.canvas = PlotCanvas(self, label, color)
self.setCentralWidget(self.canvas)
def update_plot(self, value):
self.canvas.update_data(value)
def closeEvent(self, event):
if self.label in self.parent.graph_windows:
del self.parent.graph_windows[self.label]
event.accept()
class PlotCanvas(FigureCanvas):
def __init__(self, parent, label, color):
plt.style.use('dark_background')
fig = Figure()
self.axes = fig.add_subplot(111)
super().__init__(fig)
self.setParent(parent)
self.label = label
self.color = color
self.data = deque([0] * 50, maxlen=50)
self.x = np.linspace(-5, 0, 50)
(self.plot_ref,) = self.axes.plot(self.x, self.data, label=label, color=self.color)
self.axes.set_facecolor("#2e2e2e")
self.axes.set_title(f"{label} Over Time", color="white")
self.axes.set_xlabel("Time (s)", color="white")
self.axes.set_ylabel(label, color="white")
self.axes.legend(loc="upper right", facecolor="#2e2e2e", edgecolor="white")
self.axes.grid(color="gray", linestyle="--", linewidth=0.5)
self.axes.tick_params(colors="white")
if label == "Voltage":
self.axes.set_ylim(0, 25) # leave headroom
if label == "Amperage":
self.axes.set_ylim(0, 10)
if label == "Wattage":
self.axes.set_ylim(0, 150) # avoid clipping ~80W
def update_data(self, value):
self.data.append(value)
self.plot_ref.set_ydata(self.data)
if self.label not in ("Voltage", "Wattage"):
self.axes.relim()
self.axes.autoscale_view()
self.draw()
if __name__ == "__main__":
app = QApplication(sys.argv)
window = PowerMonitorGUI()
window.show()
sys.exit(app.exec())
@parkerlreed
Copy link
Author

parkerlreed commented Nov 13, 2024

GUI with matplotlib. Click on labels to open.

python fnirsi-fnb58.py | python gui.py

@parkerlreed
Copy link
Author

You can adjust the axis bounds at the bottom of the file.

@parkerlreed
Copy link
Author

image

@jv-k
Copy link

jv-k commented Aug 17, 2025

Great work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment