-
-
Save drewclauson/74660ca678df122c0c8ab4aa07e511b6 to your computer and use it in GitHub Desktop.
Serial Sump Pump Inspecter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| from collections.abc import ( | |
| Callable, | |
| ) | |
| import datetime | |
| import enum | |
| import json | |
| import os | |
| import pathlib | |
| import sys | |
| import threading | |
| import time | |
| import traceback | |
| from types import ( | |
| TracebackType, | |
| ) | |
| from typing import ( | |
| Dict, | |
| List, | |
| Optional, | |
| Tuple, | |
| ) | |
| import serial | |
| ESC = "\033" | |
| CSI = f"{ESC}[" | |
| BLACK = 30 | |
| RED = 31 | |
| GREEN = 32 | |
| YELLOW = 33 | |
| LGRAY = 37 | |
| GRAY = 90 | |
| WHITE = 97 | |
| RESET = f"{CSI}0m" | |
| class IllegalStateException(ValueError): | |
| pass | |
| class Event(enum.Enum): | |
| SILENCE = (1, "silence pressed") | |
| RESET = (2, "reset pressed") | |
| TEST = (3, "test pressed") | |
| AC_ON = (4, "AC power on") | |
| AC_OFF = (5, "AC power off") | |
| BAT_ON = (6, "battery power on") | |
| BAT_OFF = (7, "battery power off") | |
| PUMP_ON = (8, "pump turned on") | |
| PUMP_OFF = (9, "pump turned off") | |
| FUSE_IN = (10, "fuse removed") | |
| FUSE_OUT = (11, "fuse replaced") | |
| EXIT = (99, "exit") | |
| @staticmethod | |
| def get_by_id(id: int) -> Event: | |
| if not hasattr(Event, "_by_id"): | |
| setattr( | |
| Event, | |
| "_by_id", | |
| {e.value[0]: e for e in Event}, | |
| ) | |
| return getattr(Event, "_by_id")[id] | |
| class LogType(enum.Enum): | |
| STATE = 1 | |
| EVENT = 2 | |
| INFO = 3 | |
| class Logger(object): | |
| def __init__(self) -> None: | |
| self._lock = threading.Lock() | |
| self._log: List[Tuple[float, LogType, str]] = [] | |
| self._data: List[Tuple[float, str, str, Optional[List[int]]]] = [] | |
| def log(self, log_type: LogType, text: str, data: List[int] = None) -> None: | |
| self._lock.acquire() | |
| t = time.time() | |
| self._log.append((t, log_type, text)) | |
| if data is not None: | |
| self._data.append((t, f"{log_type.name}", text, data)) | |
| else: | |
| self._data.append((t, f"{log_type.name}", text)) | |
| self._lock.release() | |
| def log_state(self, state: str, data: List[int]) -> None: | |
| self.log(LogType.STATE, state, data) | |
| def log_event(self, event: Event) -> None: | |
| self.log(LogType.EVENT, f"{event.value[1]}") | |
| def get_entries(self) -> List[Tuple[int, LogType, str]]: | |
| self._lock.acquire(); | |
| try: | |
| return list(self._log) | |
| finally: | |
| self._lock.release() | |
| def __enter__(self) -> Logger: | |
| return self | |
| def __exit__(self, *_, **__) -> Literal(False): | |
| data_file = pathlib.Path(f"dump-{datetime.datetime.now().isoformat()}.json").resolve() | |
| with data_file.open("w", encoding="utf-8") as target: | |
| json.dump(self._data, target) | |
| self.log(LogType.INFO, f"Data file: {data_file}") | |
| return False | |
| class Console(object): | |
| def __init__(self) -> None: | |
| self._lock = threading.Lock() | |
| def print_at(self, row: int, column: int, text: str) -> None: | |
| self._lock.acquire() | |
| sys.stdout.write(f"{CSI}{row};{column}H{text}") | |
| sys.stdout.flush() | |
| self._lock.release() | |
| def print_at_and_clear_line(self, row: int, column: int, text: str) -> None: | |
| self.print_at(row, column, f"{CSI}0K\r{text}") | |
| def go_to(self, row: int, column: int) -> None: | |
| self.print_at(row, column, "") | |
| def go_to_and_clear(self, row: int, column: int) -> None: | |
| self.print_at(row, column, f"{CSI}0K\r") | |
| class Watcher(object): | |
| def __init__( | |
| self, | |
| logger: Logger, | |
| console: Console, | |
| return_cursor: Callable[[], None], | |
| ) -> None: | |
| self._console = console | |
| self._logger = logger | |
| self._return_cursor = return_cursor | |
| self._thread: threading.Thread = None | |
| self._started: bool = False | |
| self._running: bool = False | |
| self._stop: bool = False | |
| self._lock = threading.RLock() | |
| self._running_cv: threading.Condition = None | |
| self._exc_info: Tuple[type[Exception], Exception, TracebackType] = None | |
| def start(self) -> Watcher: | |
| self._lock.acquire() | |
| if self._started or self._running: | |
| raise IllegalStateException("Watcher already running") | |
| self._started = True | |
| self._running = False | |
| self._stop = False | |
| self._lock.release() | |
| self._running_cv = threading.Condition() | |
| self._running_cv.acquire() | |
| self._thread = threading.Thread(target=self, name="Serial-Inspector") | |
| self._thread.start() | |
| self._running_cv.wait_for(lambda: self._running is True or self._exc_info is not None) | |
| self._running_cv.release() | |
| if self._exc_info is not None: | |
| raise self._exc_info[1].with_traceback(self._exc_info[2]) | |
| return self | |
| def stop(self) -> Watcher: | |
| self._lock.acquire() | |
| if not self._running: | |
| raise IllegalStateException("Watcher not running") | |
| self._stop = True | |
| self._lock.release() | |
| self._thread.join() | |
| return self | |
| def _stop_ordered(self) -> bool: | |
| self._lock.acquire() | |
| try: | |
| return self._stop | |
| finally: | |
| self._lock.release() | |
| def __call__(self) -> None: | |
| try: | |
| self._console.print_at(1, 1, "(no state)") | |
| self._return_cursor() | |
| with serial.Serial( | |
| port="/dev/tty.PL2303G-USBtoUART1410", | |
| baudrate=1200, | |
| bytesize=serial.EIGHTBITS, | |
| parity=serial.PARITY_NONE, | |
| stopbits=serial.STOPBITS_ONE, | |
| timeout=2, | |
| ) as port: | |
| self._lock.acquire() | |
| self._running = True | |
| self._lock.release() | |
| self._running_cv.acquire() | |
| self._running_cv.notify_all() | |
| self._running_cv.release() | |
| i = 1 | |
| while not self._stop_ordered(): | |
| data = port.read(12) # read 12 bytes | |
| self._logger.log_state( | |
| f"{' '.join(f'{b:02x}' for b in data)} " | |
| f"({' '.join(f'{b:08b}' for b in data)})" | |
| #f"({' '.join(f'{b:03}' for b in data)}) " | |
| #f"({''.join(chr(b) if b > 32 and b < 127 else chr(254) for b in data)})" | |
| , | |
| [b for b in data], | |
| ) | |
| self._console.print_at(1, 1, f"{i:04}: {' '.join(f'{b:02x}' for b in data)}") | |
| self._return_cursor() | |
| i += 1 | |
| except Exception: | |
| traceback.print_exc() | |
| self._exc_info = sys.exc_info() | |
| self._running_cv.acquire() | |
| self._running_cv.notify_all() | |
| self._running_cv.release() | |
| finally: | |
| self._lock.acquire() | |
| self._running = False | |
| self._lock.release() | |
| def __enter__(self) -> Watcher: | |
| return self.start() | |
| def __exit__(self, *_, **__) -> Literal[False]: | |
| self.stop() | |
| return False | |
| def main() -> int: | |
| columns, rows = os.get_terminal_size() | |
| clear = "\n" * rows | |
| print(f"{clear}") | |
| logger = Logger() | |
| console = Console() | |
| console.print_at_and_clear_line(2, 1, "-" * columns) | |
| console.print_at_and_clear_line(3, 1, "Event Options:") | |
| i = 4 | |
| for event in Event: | |
| id, label = event.value | |
| console.print_at_and_clear_line(i, 1, f"{id}) {label}") | |
| i += 1 | |
| prompt = "Enter Event >> " | |
| console.print_at_and_clear_line(i, 1, prompt) | |
| return_args = (i, len(prompt) + 1) | |
| i += 1 | |
| def return_to_cursor(): | |
| console.go_to(*return_args) | |
| start_time = time.time() | |
| with logger, Watcher(logger, console, return_to_cursor): | |
| while True: | |
| command = sys.stdin.readline().strip() | |
| console.print_at_and_clear_line(i + 1, 1, f"Debug: {command}") | |
| event: Event | |
| try: | |
| event = Event.get_by_id(int(command)) | |
| except (ValueError, KeyError): | |
| event = None | |
| if event: | |
| logger.log_event(event) | |
| console.print_at_and_clear_line(i, 1, f"Last event: {event.value[1]}") | |
| console.go_to_and_clear(*return_args) | |
| if event == Event.EXIT: | |
| break | |
| console.go_to(i + 2, 1) | |
| print("\nLog:") | |
| for timestamp, log_type, text in logger.get_entries(): | |
| print(f"{round(timestamp - start_time, 3):8.3f} {log_type.name} {text}") | |
| return 0 | |
| if __name__ == '__main__': | |
| sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment