Skip to content

Instantly share code, notes, and snippets.

@drewclauson
Forked from beamerblvd/inspector.py
Created July 11, 2025 14:31
Show Gist options
  • Select an option

  • Save drewclauson/74660ca678df122c0c8ab4aa07e511b6 to your computer and use it in GitHub Desktop.

Select an option

Save drewclauson/74660ca678df122c0c8ab4aa07e511b6 to your computer and use it in GitHub Desktop.
Serial Sump Pump Inspector
from __future__ import annotations

from collections.abc import (
    Callable,
)
import datetime
import enum
import json
import os
import pathlib
import sys
import threading
import time
import traceback
from types import (
    TracebackType,
)
from typing import (
    Dict,
    List,
    Literal,
    Optional,
    Tuple,
)

import serial
# Building blocks for terminal escape sequences: the escape character, the
# "control sequence introducer" prefix built from it, and the sequence that
# resets text attributes.
ESC = "\x1b"
CSI = ESC + "["
RESET = CSI + "0m"

# Numeric colour codes (the values match the standard ANSI SGR foreground
# colours of the same names).
BLACK, RED, GREEN, YELLOW = 30, 31, 32, 33
LGRAY = 37
GRAY = 90
WHITE = 97
class IllegalStateException(ValueError):
    """Raised when an operation is requested in a state that does not allow it."""
class Event(enum.Enum):
    """Events an operator can record, each stored as ``(numeric id, label)``.

    The numeric id is what the operator types at the prompt; the label is the
    human-readable text shown in the menu and written to the log.
    """

    SILENCE = (1, "silence pressed")
    RESET = (2, "reset pressed")
    TEST = (3, "test pressed")
    AC_ON = (4, "AC power on")
    AC_OFF = (5, "AC power off")
    BAT_ON = (6, "battery power on")
    BAT_OFF = (7, "battery power off")
    PUMP_ON = (8, "pump turned on")
    PUMP_OFF = (9, "pump turned off")
    # NOTE(review): the labels on the two fuse members look swapped relative
    # to the member names (FUSE_IN -> "removed") -- confirm which is intended.
    FUSE_IN = (10, "fuse removed")
    FUSE_OUT = (11, "fuse replaced")
    EXIT = (99, "exit")

    @staticmethod
    def get_by_id(id: int) -> Event:
        """Return the member whose numeric id equals *id*.

        The id-to-member table is built on first use and cached on the class
        under the ``_by_id`` attribute.

        Raises:
            KeyError: if no member has the given id.
        """
        try:
            lookup = getattr(Event, "_by_id")
        except AttributeError:
            lookup = {member.value[0]: member for member in Event}
            setattr(Event, "_by_id", lookup)
        return lookup[id]
class LogType(enum.Enum):
    """Category tag attached to every log entry."""

    STATE = 1  # a block of bytes read from the serial port
    EVENT = 2  # an operator-entered event
    INFO = 3   # informational messages (e.g. where the dump file was written)
class Logger(object):
    """Thread-safe in-memory collector of timestamped log entries.

    Two parallel lists are maintained under one lock:

    * ``_log`` holds ``(timestamp, LogType, text)`` tuples, returned by
      :meth:`get_entries`.
    * ``_data`` holds JSON-serialisable records: ``(timestamp, type name,
      text)`` or, when a payload was supplied, ``(timestamp, type name, text,
      data)``.

    Used as a context manager, the logger writes ``_data`` to a
    ``dump-<ISO timestamp>.json`` file in the current directory on exit.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._log: List[Tuple[float, LogType, str]] = []
        # Records have three or four elements; see the class docstring.
        self._data: List[Tuple] = []

    def log(self, log_type: LogType, text: str, data: Optional[List[int]] = None) -> None:
        """Append one entry, stamped with the current ``time.time()``.

        Args:
            log_type: Category of the entry; its ``name`` is stored in the
                JSON record.
            text: Human-readable message.
            data: Optional payload stored only in the JSON record. When
                ``None`` the record is written without a fourth element.
        """
        # ``with`` guarantees the lock is released even if building the
        # record raises; a leaked lock would deadlock every later call.
        with self._lock:
            stamp = time.time()
            self._log.append((stamp, log_type, text))
            if data is None:
                self._data.append((stamp, f"{log_type.name}", text))
            else:
                self._data.append((stamp, f"{log_type.name}", text, data))

    def log_state(self, state: str, data: List[int]) -> None:
        """Record a ``LogType.STATE`` entry with its raw byte values."""
        self.log(LogType.STATE, state, data)

    def log_event(self, event: Event) -> None:
        """Record a ``LogType.EVENT`` entry using the event's label."""
        self.log(LogType.EVENT, f"{event.value[1]}")

    def get_entries(self) -> List[Tuple[float, LogType, str]]:
        """Return a snapshot copy of all ``(timestamp, type, text)`` entries."""
        with self._lock:
            return list(self._log)

    def __enter__(self) -> Logger:
        return self

    def __exit__(self, *_, **__) -> Literal[False]:
        """Dump the collected records to a JSON file; never suppress exceptions.

        The path of the written file is itself logged as an INFO entry after
        the dump, so that entry is not part of the file.
        """
        data_file = pathlib.Path(f"dump-{datetime.datetime.now().isoformat()}.json").resolve()
        with data_file.open("w", encoding="utf-8") as target:
            json.dump(self._data, target)
        self.log(LogType.INFO, f"Data file: {data_file}")
        return False
class Console(object):
    """Serialises cursor-addressed writes to ``sys.stdout`` across threads.

    Every method funnels through :meth:`print_at`, which holds a lock while it
    writes and flushes, so output from different threads is not interleaved.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()

    def print_at(self, row: int, column: int, text: str) -> None:
        """Move the cursor to (*row*, *column*) and write *text* there."""
        # ``with`` releases the lock even if the write or flush raises (for
        # example on a closed stdout); otherwise every later call would block.
        with self._lock:
            sys.stdout.write(f"{CSI}{row};{column}H{text}")
            sys.stdout.flush()

    def print_at_and_clear_line(self, row: int, column: int, text: str) -> None:
        """Clear from (*row*, *column*) to end of line, then write *text*.

        A carriage return is emitted after the clear, so *text* starts at the
        first column of the row regardless of *column*.
        """
        self.print_at(row, column, f"{CSI}0K\r{text}")

    def go_to(self, row: int, column: int) -> None:
        """Move the cursor to (*row*, *column*) without writing anything."""
        self.print_at(row, column, "")

    def go_to_and_clear(self, row: int, column: int) -> None:
        """Clear from (*row*, *column*) to end of line.

        NOTE(review): the trailing carriage return leaves the cursor at the
        first column of the row rather than at *column* -- confirm intended.
        """
        self.print_at(row, column, f"{CSI}0K\r")
class Watcher(object):
    """Background thread that reads the serial port and records what it sees.

    The instance itself is the thread target. Once started it opens the serial
    port (1200 baud, 8 data bits, no parity, 1 stop bit, 2 s read timeout),
    then repeatedly reads up to 12 bytes, logs them through the ``Logger`` and
    shows them on the first console row, until :meth:`stop` is called.

    ``start()`` blocks until the port is open or the worker has failed; in the
    latter case the worker's exception is re-raised in the calling thread.

    A ``Watcher`` is single-use: ``_started`` is never cleared, so a second
    ``start()`` always raises ``IllegalStateException``.
    """

    # Device opened when no explicit ``port`` is given.
    DEFAULT_PORT = "/dev/tty.PL2303G-USBtoUART1410"

    def __init__(
        self,
        logger: Logger,
        console: Console,
        return_cursor: Callable[[], None],
        port: str = DEFAULT_PORT,
    ) -> None:
        """
        Args:
            logger: Receives one STATE entry per read.
            console: Used to display the most recent read on row 1.
            return_cursor: Called after every console write to put the cursor
                back where the operator is typing.
            port: Serial device to open.
        """
        self._console = console
        self._logger = logger
        self._return_cursor = return_cursor
        self._port = port
        self._thread: Optional[threading.Thread] = None
        self._started: bool = False
        self._running: bool = False
        self._stop: bool = False
        self._lock = threading.RLock()
        self._running_cv: Optional[threading.Condition] = None
        self._exc_info: Optional[Tuple[type[Exception], Exception, TracebackType]] = None

    def start(self) -> Watcher:
        """Start the worker thread and wait until it is reading (or has failed).

        Raises:
            IllegalStateException: if the watcher was already started.
            Exception: whatever the worker raised before it began reading.
        """
        # The state check runs under ``with`` so the lock is released even
        # when the check raises; a leaked lock would block the worker thread
        # in ``_stop_ordered`` forever.
        with self._lock:
            if self._started or self._running:
                raise IllegalStateException("Watcher already running")
            self._started = True
            self._running = False
            self._stop = False

        self._running_cv = threading.Condition()
        # Hold the condition before starting the thread so the worker cannot
        # notify before this thread is waiting.
        with self._running_cv:
            self._thread = threading.Thread(target=self, name="Serial-Inspector")
            self._thread.start()
            self._running_cv.wait_for(lambda: self._running is True or self._exc_info is not None)

        if self._exc_info is not None:
            raise self._exc_info[1].with_traceback(self._exc_info[2])
        return self

    def stop(self) -> Watcher:
        """Ask the worker to finish and wait for it to exit.

        The worker notices the request after its current read returns.

        Raises:
            IllegalStateException: if the worker is not currently running.
        """
        with self._lock:
            if not self._running:
                raise IllegalStateException("Watcher not running")
            self._stop = True
        self._thread.join()
        return self

    def _stop_ordered(self) -> bool:
        """Return whether ``stop()`` has been requested."""
        with self._lock:
            return self._stop

    def _notify_starter(self) -> None:
        """Wake the thread blocked in ``start()``."""
        with self._running_cv:
            self._running_cv.notify_all()

    def __call__(self) -> None:
        """Thread body: open the port and log reads until told to stop."""
        try:
            self._console.print_at(1, 1, "(no state)")
            self._return_cursor()
            with serial.Serial(
                port=self._port,
                baudrate=1200,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=2,
            ) as connection:
                with self._lock:
                    self._running = True
                self._notify_starter()

                count = 1
                while not self._stop_ordered():
                    data = connection.read(12)  # read 12 bytes
                    hex_text = " ".join(f"{b:02x}" for b in data)
                    bin_text = " ".join(f"{b:08b}" for b in data)
                    self._logger.log_state(f"{hex_text} ({bin_text})", list(data))
                    self._console.print_at(1, 1, f"{count:04}: {hex_text}")
                    self._return_cursor()
                    count += 1
        except Exception:
            traceback.print_exc()
            # Publish the failure before notifying so a pending ``start()``
            # sees it and re-raises in the caller's thread.
            self._exc_info = sys.exc_info()
            self._notify_starter()
        finally:
            with self._lock:
                self._running = False

    def __enter__(self) -> Watcher:
        return self.start()

    def __exit__(self, *_, **__) -> Literal[False]:
        """Stop the worker; never suppress exceptions from the ``with`` body."""
        self.stop()
        return False
def main() -> int:
    """Run the interactive inspector and return the process exit status.

    Draws a menu of the available events, starts a ``Watcher`` on the serial
    port, then reads event ids from standard input and logs each recognised
    one until the EXIT event is entered (or standard input reaches EOF).
    Afterwards the collected log is printed with timestamps relative to the
    moment the session started.
    """
    columns, rows = os.get_terminal_size()
    # Scroll any existing content away so the layout starts on a blank screen.
    print("\n" * rows)

    logger = Logger()
    console = Console()

    # Row 1 is written by the Watcher; the menu occupies row 2 onwards.
    console.print_at_and_clear_line(2, 1, "-" * columns)
    console.print_at_and_clear_line(3, 1, "Event Options:")
    first_option_row = 4
    for row, option in enumerate(Event, start=first_option_row):
        event_id, label = option.value
        console.print_at_and_clear_line(row, 1, f"{event_id}) {label}")

    prompt = "Enter Event >> "
    prompt_row = first_option_row + len(Event)
    status_row = prompt_row + 1
    debug_row = prompt_row + 2
    console.print_at_and_clear_line(prompt_row, 1, prompt)
    return_args = (prompt_row, len(prompt) + 1)

    def return_to_cursor():
        console.go_to(*return_args)

    start_time = time.time()
    with logger, Watcher(logger, console, return_to_cursor):
        while True:
            line = sys.stdin.readline()
            # readline() returns "" only at EOF (a blank line is "\n"). Treat
            # EOF as EXIT; otherwise every later read would also return ""
            # and this loop would spin forever.
            command = line.strip() if line else str(Event.EXIT.value[0])
            console.print_at_and_clear_line(debug_row, 1, f"Debug: {command}")

            event: Optional[Event]
            try:
                event = Event.get_by_id(int(command))
            except (ValueError, KeyError):
                # Not a number, or not a known event id: ignore the input.
                event = None

            if event is not None:
                logger.log_event(event)
                console.print_at_and_clear_line(status_row, 1, f"Last event: {event.value[1]}")
            console.go_to_and_clear(*return_args)

            if event == Event.EXIT:
                break

    console.go_to(prompt_row + 3, 1)
    print("\nLog:")
    for timestamp, log_type, text in logger.get_entries():
        print(f"{round(timestamp - start_time, 3):8.3f} {log_type.name} {text}")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment