Created
December 6, 2025 10:54
-
-
Save KaushikShresth07/8413b5ec28a5e83b3f1bd5531acc7471 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # HuskyLens Python Library - Advanced Enhanced Version | |
| # Author: Robert Prast (robert@dfrobot.com) | |
| # Enhanced with: Advanced error handling, retry mechanisms, input validation, health checks | |
| # 08/03/2020 (Original) | Advanced Enhanced 2024 | |
| # Dependencies: | |
| # pyserial | |
| # smbus or smbus2 | |
| # pypng | |
| # | |
| # How to use: | |
| # 1) First import the library into your project and connect your HuskyLens | |
| # 2) Init huskylens | |
| # A) Serial | |
| # huskyLens = HuskyLensLibrary("SERIAL","COM_PORT", speed, debug=True) | |
| # B) I2C | |
| # huskyLens = HuskyLensLibrary("I2C","", address=0xADDR, debug=True) | |
| # 3) Call your desired functions on the huskyLens object! | |
| import time | |
| import serial | |
| import png | |
| import json | |
| import sys | |
| import threading | |
| from datetime import datetime | |
| from typing import Optional, List, Union, Tuple | |
| from functools import wraps | |
| # Custom Exceptions | |
| class HuskyLensError(Exception): | |
| """Base exception for HuskyLens errors""" | |
| pass | |
| class HuskyLensConnectionError(HuskyLensError): | |
| """Connection-related errors""" | |
| pass | |
| class HuskyLensCommunicationError(HuskyLensError): | |
| """Communication errors""" | |
| pass | |
| class HuskyLensTimeoutError(HuskyLensError): | |
| """Timeout errors""" | |
| pass | |
| class HuskyLensValidationError(HuskyLensError): | |
| """Input validation errors""" | |
| pass | |
| commandHeaderAndAddress = "55AA11" | |
| algorthimsByteID = { | |
| "ALGORITHM_OBJECT_TRACKING": "0100", | |
| "ALGORITHM_FACE_RECOGNITION": "0000", | |
| "ALGORITHM_OBJECT_RECOGNITION": "0200", | |
| "ALGORITHM_LINE_TRACKING": "0300", | |
| "ALGORITHM_COLOR_RECOGNITION": "0400", | |
| "ALGORITHM_TAG_RECOGNITION": "0500", | |
| "ALGORITHM_OBJECT_CLASSIFICATION": "0600", | |
| "ALGORITHM_QR_CODE_RECOGNTITION": "0700", | |
| "ALGORITHM_BARCODE_RECOGNTITION": "0800", | |
| } | |
| def retry_on_error(max_retries=3, delay=0.1, backoff=2, exceptions=(Exception,)): | |
| """Decorator for retrying operations with exponential backoff""" | |
| def decorator(func): | |
| @wraps(func) | |
| def wrapper(self, *args, **kwargs): | |
| last_exception = None | |
| current_delay = delay | |
| for attempt in range(max_retries): | |
| try: | |
| return func(self, *args, **kwargs) | |
| except exceptions as e: | |
| last_exception = e | |
| if attempt < max_retries - 1: | |
| self._log("WARNING", f"{func.__name__} failed (attempt {attempt+1}/{max_retries}): {e}. Retrying in {current_delay:.3f}s...") | |
| time.sleep(current_delay) | |
| current_delay *= backoff | |
| else: | |
| self._log("ERROR", f"{func.__name__} failed after {max_retries} attempts: {e}") | |
| raise last_exception | |
| return wrapper | |
| return decorator | |
| def validate_input(func): | |
| """Decorator for input validation""" | |
| @wraps(func) | |
| def wrapper(self, *args, **kwargs): | |
| # Validate protocol-specific inputs | |
| if hasattr(self, 'proto') and self.proto not in ["SERIAL", "I2C"]: | |
| raise HuskyLensValidationError(f"Invalid protocol: {self.proto}") | |
| return func(self, *args, **kwargs) | |
| return wrapper | |
| class Arrow: | |
| def __init__(self, xTail, yTail, xHead, yHead, ID): | |
| # Input validation | |
| if not all(isinstance(x, (int, float)) for x in [xTail, yTail, xHead, yHead, ID]): | |
| raise HuskyLensValidationError("Arrow coordinates and ID must be numeric") | |
| if not (0 <= xTail <= 320 and 0 <= yTail <= 240): | |
| raise HuskyLensValidationError(f"Arrow tail coordinates out of range: ({xTail}, {yTail})") | |
| if not (0 <= xHead <= 320 and 0 <= yHead <= 240): | |
| raise HuskyLensValidationError(f"Arrow head coordinates out of range: ({xHead}, {yHead})") | |
| self.xTail = int(xTail) | |
| self.yTail = int(yTail) | |
| self.xHead = int(xHead) | |
| self.yHead = int(yHead) | |
| self.ID = int(ID) | |
| self.learned = True if ID > 0 else False | |
| self.type = "ARROW" | |
| def __repr__(self): | |
| return f"Arrow(ID={self.ID}, Tail=({self.xTail},{self.yTail}), Head=({self.xHead},{self.yHead}), Learned={self.learned})" | |
| def __str__(self): | |
| return self.__repr__() | |
| def length(self): | |
| """Calculate arrow length""" | |
| import math | |
| return math.sqrt((self.xHead - self.xTail)**2 + (self.yHead - self.yTail)**2) | |
| def angle(self): | |
| """Calculate arrow angle in degrees""" | |
| import math | |
| dx = self.xHead - self.xTail | |
| dy = self.yHead - self.yTail | |
| return math.degrees(math.atan2(dy, dx)) | |
| class Block: | |
| def __init__(self, x, y, width, height, ID): | |
| # Input validation | |
| if not all(isinstance(x, (int, float)) for x in [x, y, width, height, ID]): | |
| raise HuskyLensValidationError("Block coordinates, dimensions and ID must be numeric") | |
| if not (0 <= x <= 320 and 0 <= y <= 240): | |
| raise HuskyLensValidationError(f"Block position out of range: ({x}, {y})") | |
| if width <= 0 or height <= 0: | |
| raise HuskyLensValidationError(f"Block dimensions must be positive: {width}x{height}") | |
| if width > 320 or height > 240: | |
| raise HuskyLensValidationError(f"Block dimensions too large: {width}x{height}") | |
| self.x = int(x) | |
| self.y = int(y) | |
| self.width = int(width) | |
| self.height = int(height) | |
| self.ID = int(ID) | |
| self.learned = True if ID > 0 else False | |
| self.type = "BLOCK" | |
| def __repr__(self): | |
| return f"Block(ID={self.ID}, X={self.x}, Y={self.y}, W={self.width}, H={self.height}, Learned={self.learned})" | |
| def __str__(self): | |
| return self.__repr__() | |
| def center(self): | |
| """Calculate center point of the block""" | |
| return (self.x + self.width // 2, self.y + self.height // 2) | |
| def area(self): | |
| """Calculate area of the block""" | |
| return self.width * self.height | |
| def contains_point(self, x, y): | |
| """Check if point is inside block""" | |
| return self.x <= x <= self.x + self.width and self.y <= y <= self.y + self.height | |
| def overlaps(self, other): | |
| """Check if this block overlaps with another block""" | |
| if not isinstance(other, Block): | |
| return False | |
| return not (self.x + self.width < other.x or | |
| other.x + other.width < self.x or | |
| self.y + self.height < other.y or | |
| other.y + other.height < self.y) | |
| class HuskyLensLibrary: | |
| def __init__(self, proto, comPort="", speed=3000000, channel=1, address=0x32, | |
| debug=False, verbose=False, max_retries=3, timeout=5.0, | |
| health_check_interval=30.0): | |
| """ | |
| Initialize HuskyLens Library with advanced features | |
| Args: | |
| proto: "SERIAL" or "I2C" | |
| comPort: Serial port (e.g., "/dev/ttyUSB1") for SERIAL mode | |
| speed: Baud rate for SERIAL mode (default: 3000000) | |
| channel: I2C channel (default: 1 for Raspberry Pi) | |
| address: I2C address (default: 0x32) | |
| debug: Enable debug logging (default: False) | |
| verbose: Enable verbose output (default: False) | |
| max_retries: Maximum retry attempts for failed operations (default: 3) | |
| timeout: Operation timeout in seconds (default: 5.0) | |
| health_check_interval: Health check interval in seconds (default: 30.0) | |
| """ | |
| # Input validation | |
| if proto not in ["SERIAL", "I2C"]: | |
| raise HuskyLensValidationError(f"Invalid protocol: {proto}. Must be 'SERIAL' or 'I2C'") | |
| if proto == "SERIAL" and not comPort: | |
| raise HuskyLensValidationError("Serial port must be specified for SERIAL protocol") | |
| if proto == "I2C" and not (0x08 <= address <= 0x77): | |
| raise HuskyLensValidationError(f"I2C address out of valid range: 0x{address:02X}") | |
| if speed <= 0 or speed > 10000000: | |
| raise HuskyLensValidationError(f"Invalid baud rate: {speed}") | |
| if timeout <= 0: | |
| raise HuskyLensValidationError(f"Timeout must be positive: {timeout}") | |
| self.proto = proto | |
| self.address = address | |
| self.checkOnceAgain = True | |
| self.debug = debug | |
| self.verbose = verbose | |
| self.max_retries = max_retries | |
| self.timeout = timeout | |
| self.health_check_interval = health_check_interval | |
| self.command_count = 0 | |
| self.error_count = 0 | |
| self.start_time = time.time() | |
| self.last_health_check = time.time() | |
| self.connection_healthy = False | |
| self._lock = threading.Lock() # Thread safety | |
| self._cache = {} # Simple cache for frequently accessed data | |
| self._cache_timeout = 0.1 # Cache timeout in seconds | |
| self._log("INFO", f"Initializing HuskyLens with protocol: {proto}") | |
| try: | |
| if proto == "SERIAL": | |
| self._init_serial(comPort, speed) | |
| elif proto == "I2C": | |
| self._init_i2c(channel, address) | |
| # Perform initial health check | |
| self._health_check() | |
| self._log("INFO", "HuskyLens library initialized successfully") | |
| except Exception as e: | |
| self._log("ERROR", f"Initialization failed: {e}") | |
| raise HuskyLensConnectionError(f"Failed to initialize HuskyLens: {e}") from e | |
| def _init_serial(self, comPort, speed): | |
| """Initialize SERIAL connection""" | |
| self._log("DEBUG", f"Setting up SERIAL connection on {comPort} at {speed} baud") | |
| try: | |
| self.huskylensSer = serial.Serial( | |
| baudrate=speed, | |
| parity=serial.PARITY_NONE, | |
| stopbits=serial.STOPBITS_ONE, | |
| bytesize=serial.EIGHTBITS, | |
| timeout=self.timeout | |
| ) | |
| self.huskylensSer.dtr = False | |
| self.huskylensSer.rts = False | |
| time.sleep(0.1) | |
| self.huskylensSer.port = comPort | |
| self.huskylensSer.open() | |
| self._log("INFO", f"SERIAL port {comPort} opened successfully") | |
| time.sleep(2) | |
| # Send knock commands to initialize | |
| for i in range(3): | |
| self._log("DEBUG", f"Sending knock command {i+1}/3") | |
| try: | |
| self._knock_internal() | |
| except Exception as e: | |
| self._log("WARNING", f"Knock {i+1} failed: {e}") | |
| time.sleep(0.5) | |
| self.huskylensSer.flushInput() | |
| self.huskylensSer.flushOutput() | |
| self.huskylensSer.flush() | |
| self._log("INFO", "SERIAL connection initialized successfully") | |
| except serial.SerialException as e: | |
| raise HuskyLensConnectionError(f"SERIAL connection failed: {e}") from e | |
| except Exception as e: | |
| raise HuskyLensConnectionError(f"SERIAL initialization error: {e}") from e | |
| def _init_i2c(self, channel, address): | |
| """Initialize I2C connection""" | |
| self._log("DEBUG", f"Setting up I2C connection on channel {channel}, address 0x{address:02X}") | |
| try: | |
| try: | |
| import smbus | |
| self.huskylensSer = smbus.SMBus(channel) | |
| self._log("INFO", "Using smbus module") | |
| except ImportError: | |
| try: | |
| import smbus2 as smbus | |
| self.huskylensSer = smbus.SMBus(channel) | |
| self._log("INFO", "Using smbus2 module") | |
| except ImportError: | |
| raise ImportError("Neither smbus nor smbus2 is installed. Install with: sudo apt install python3-smbus") | |
| # Test I2C connection with retry | |
| for attempt in range(3): | |
| try: | |
| test_byte = self.huskylensSer.read_byte(self.address) | |
| self._log("INFO", f"I2C connection established. Test read: 0x{test_byte:02X}") | |
| break | |
| except Exception as e: | |
| if attempt < 2: | |
| self._log("WARNING", f"I2C test read failed (attempt {attempt+1}/3): {e}. Retrying...") | |
| time.sleep(0.1) | |
| else: | |
| self._log("WARNING", f"I2C test read failed (may be normal): {e}") | |
| self._log("INFO", "I2C connection initialized successfully") | |
| except ImportError as e: | |
| raise HuskyLensConnectionError(f"I2C module not available: {e}") from e | |
| except Exception as e: | |
| raise HuskyLensConnectionError(f"I2C initialization error: {e}") from e | |
| def _health_check(self): | |
| """Perform health check on connection""" | |
| try: | |
| start = time.time() | |
| result = self._knock_internal() | |
| duration = time.time() - start | |
| self.connection_healthy = True | |
| self.last_health_check = time.time() | |
| if self.verbose: | |
| self._log("DEBUG", f"Health check passed in {duration:.3f}s: {result}") | |
| return True | |
| except Exception as e: | |
| self.connection_healthy = False | |
| self._log("WARNING", f"Health check failed: {e}") | |
| return False | |
| def _check_health(self): | |
| """Check if health check is needed and perform it""" | |
| if time.time() - self.last_health_check > self.health_check_interval: | |
| self._health_check() | |
| return self.connection_healthy | |
| def _log(self, level, message): | |
| """Internal logging function with thread safety""" | |
| if self.debug or self.verbose or level in ["ERROR", "WARNING"]: | |
| timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] | |
| with self._lock: | |
| print(f"[{timestamp}] [{level}] {message}", | |
| file=sys.stderr if level == "ERROR" else sys.stdout, flush=True) | |
| @retry_on_error(max_retries=3, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def writeToHuskyLens(self, cmd): | |
| """Write command to HuskyLens with retry mechanism""" | |
| with self._lock: | |
| self.command_count += 1 | |
| self.lastCmdSent = cmd.hex() if isinstance(cmd, bytes) else str(cmd) | |
| if self.verbose: | |
| cmd_hex = cmd.hex() if isinstance(cmd, bytes) else str(cmd) | |
| self._log("DEBUG", f"Writing command #{self.command_count}: {cmd_hex[:50]}...") | |
| try: | |
| if self.proto == "SERIAL": | |
| if not self.huskylensSer.is_open: | |
| raise HuskyLensConnectionError("SERIAL port is not open") | |
| self.huskylensSer.flush() | |
| self.huskylensSer.flushInput() | |
| bytes_written = self.huskylensSer.write(cmd) | |
| if bytes_written != len(cmd): | |
| raise HuskyLensCommunicationError(f"Partial write: {bytes_written}/{len(cmd)} bytes") | |
| if self.verbose: | |
| self._log("DEBUG", f"Command written to SERIAL port: {bytes_written} bytes") | |
| else: | |
| # I2C write | |
| cmd_list = list(cmd) if isinstance(cmd, bytes) else cmd | |
| if len(cmd_list) > 32: # I2C block write limit | |
| raise HuskyLensValidationError(f"Command too long for I2C: {len(cmd_list)} bytes") | |
| self.huskylensSer.write_i2c_block_data(self.address, 12, cmd_list) | |
| if self.verbose: | |
| self._log("DEBUG", f"Command written to I2C address 0x{self.address:02X}, register 12") | |
| except serial.SerialException as e: | |
| self.error_count += 1 | |
| raise HuskyLensCommunicationError(f"SERIAL write failed: {e}") from e | |
| except OSError as e: | |
| self.error_count += 1 | |
| raise HuskyLensCommunicationError(f"I2C write failed: {e}") from e | |
| except Exception as e: | |
| self.error_count += 1 | |
| raise HuskyLensCommunicationError(f"Write failed: {e}") from e | |
| def calculateChecksum(self, hexStr): | |
| """Calculate checksum for command with validation""" | |
| if not hexStr or len(hexStr) % 2 != 0: | |
| raise HuskyLensValidationError(f"Invalid hex string length: {len(hexStr)}") | |
| try: | |
| total = 0 | |
| for i in range(0, len(hexStr), 2): | |
| total += int(hexStr[i:i+2], 16) | |
| hexStr = hex(total)[-2:] | |
| if self.verbose: | |
| self._log("DEBUG", f"Checksum calculated: {hexStr}") | |
| return hexStr | |
| except ValueError as e: | |
| raise HuskyLensValidationError(f"Invalid hex string: {hexStr}") from e | |
| def cmdToBytes(self, cmd): | |
| """Convert hex string to bytes with validation""" | |
| if not isinstance(cmd, str): | |
| raise HuskyLensValidationError(f"Command must be string, got {type(cmd)}") | |
| if len(cmd) % 2 != 0: | |
| raise HuskyLensValidationError(f"Hex string must have even length: {len(cmd)}") | |
| try: | |
| return bytes.fromhex(cmd) | |
| except ValueError as e: | |
| raise HuskyLensValidationError(f"Invalid hex string: {cmd}") from e | |
| def splitCommandToParts(self, hex_str): | |
| """Split command into parts with validation""" | |
| if not hex_str or len(hex_str) < 10: | |
| raise HuskyLensValidationError(f"Command too short: {len(hex_str)}") | |
| try: | |
| headers = hex_str[0:4] | |
| address = hex_str[4:6] | |
| data_length = int(hex_str[6:8], 16) | |
| command = hex_str[8:10] | |
| if data_length > 0: | |
| if len(hex_str) < 10 + data_length * 2: | |
| raise HuskyLensValidationError(f"Command incomplete: expected {10 + data_length * 2} chars, got {len(hex_str)}") | |
| data = hex_str[10:10+data_length*2] | |
| else: | |
| data = "" | |
| expected_len = 2 * (6 + data_length - 1) + 2 | |
| if len(hex_str) < expected_len: | |
| raise HuskyLensValidationError(f"Command incomplete: expected {expected_len} chars, got {len(hex_str)}") | |
| checkSum = hex_str[expected_len-2:expected_len] | |
| if self.verbose: | |
| self._log("DEBUG", f"Command parsed: Header={headers}, Addr={address}, Len={data_length}, Cmd={command}, DataLen={len(data)//2}, Checksum={checkSum}") | |
| return [headers, address, data_length, command, data, checkSum] | |
| except ValueError as e: | |
| raise HuskyLensValidationError(f"Invalid command format: {hex_str}") from e | |
| @retry_on_error(max_retries=2, delay=0.05, exceptions=(HuskyLensCommunicationError,)) | |
| def getBlockOrArrowCommand(self): | |
| """Get block or arrow command from HuskyLens with retry""" | |
| try: | |
| if self.proto == "SERIAL": | |
| if not self.huskylensSer.is_open: | |
| raise HuskyLensConnectionError("SERIAL port is not open") | |
| byteString = self.huskylensSer.read(5) | |
| if len(byteString) < 5: | |
| raise HuskyLensCommunicationError("Incomplete header read from SERIAL") | |
| byteString += self.huskylensSer.read(int(byteString[3])) | |
| byteString += self.huskylensSer.read(1) | |
| else: | |
| byteString = b'' | |
| for i in range(5): | |
| try: | |
| byte = self.huskylensSer.read_byte(self.address) | |
| byteString += bytes([byte]) | |
| time.sleep(0.001) | |
| except OSError as e: | |
| raise HuskyLensCommunicationError(f"I2C read failed at byte {i}: {e}") from e | |
| if len(byteString) < 5: | |
| raise HuskyLensCommunicationError("Incomplete header read from I2C") | |
| data_len = int(byteString[3]) | |
| if data_len > 100: # Sanity check | |
| raise HuskyLensValidationError(f"Suspicious data length: {data_len}") | |
| for i in range(data_len + 1): | |
| try: | |
| byte = self.huskylensSer.read_byte(self.address) | |
| byteString += bytes([byte]) | |
| time.sleep(0.001) | |
| except OSError as e: | |
| raise HuskyLensCommunicationError(f"I2C read failed at data byte {i}: {e}") from e | |
| commandSplit = self.splitCommandToParts(byteString.hex()) | |
| isBlock = True if commandSplit[3] == "2a" else False | |
| return (commandSplit[4], isBlock) | |
| except Exception as e: | |
| if isinstance(e, (HuskyLensCommunicationError, HuskyLensValidationError)): | |
| raise | |
| raise HuskyLensCommunicationError(f"Failed to get block/arrow command: {e}") from e | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def processReturnData(self, numIdLearnFlag=False, frameFlag=False): | |
| """Process return data from HuskyLens with comprehensive error handling""" | |
| # Check health before processing | |
| self._check_health() | |
| byteString = b'' | |
| start_time = time.time() | |
| try: | |
| # Add small delay for I2C to ensure data is ready | |
| if self.proto == "I2C": | |
| time.sleep(0.01) | |
| if self.proto == "SERIAL": | |
| if not self.huskylensSer.is_open: | |
| raise HuskyLensConnectionError("SERIAL port is not open") | |
| byteString = self.huskylensSer.read(5) | |
| if len(byteString) < 5: | |
| raise HuskyLensCommunicationError("Incomplete header read from SERIAL") | |
| if time.time() - start_time > self.timeout: | |
| raise HuskyLensTimeoutError("SERIAL read timeout") | |
| data_len = int(byteString[3]) | |
| byteString += self.huskylensSer.read(data_len) | |
| byteString += self.huskylensSer.read(1) | |
| else: | |
| byteString = b'' | |
| for i in range(5): | |
| try: | |
| byte = self.huskylensSer.read_byte(self.address) | |
| byteString += bytes([byte]) | |
| time.sleep(0.001) | |
| if time.time() - start_time > self.timeout: | |
| raise HuskyLensTimeoutError("I2C read timeout") | |
| except OSError as e: | |
| raise HuskyLensCommunicationError(f"I2C header read failed at byte {i}: {e}") from e | |
| if len(byteString) < 5: | |
| raise HuskyLensCommunicationError("Incomplete header read from I2C") | |
| data_len = int(byteString[3]) | |
| if data_len > 100: # Sanity check | |
| raise HuskyLensValidationError(f"Suspicious data length: {data_len}") | |
| for i in range(data_len + 1): | |
| try: | |
| byte = self.huskylensSer.read_byte(self.address) | |
| byteString += bytes([byte]) | |
| time.sleep(0.001) | |
| if time.time() - start_time > self.timeout: | |
| raise HuskyLensTimeoutError("I2C read timeout") | |
| except OSError as e: | |
| raise HuskyLensCommunicationError(f"I2C data read failed at byte {i}: {e}") from e | |
| commandSplit = self.splitCommandToParts(byteString.hex()) | |
| if self.verbose: | |
| self._log("DEBUG", f"Received command type: 0x{commandSplit[3]}") | |
| if commandSplit[3] == "2e": | |
| self.checkOnceAgain = True | |
| return "Knock Recieved" | |
| else: | |
| returnData = [] | |
| try: | |
| numberOfBlocksOrArrow = int(commandSplit[4][2:4] + commandSplit[4][0:2], 16) | |
| numberOfIDLearned = int(commandSplit[4][6:8] + commandSplit[4][4:6], 16) | |
| frameNumber = int(commandSplit[4][10:12] + commandSplit[4][8:10], 16) | |
| except (ValueError, IndexError) as e: | |
| raise HuskyLensValidationError(f"Invalid data format in response: {e}") from e | |
| if self.verbose: | |
| self._log("DEBUG", f"Data: Blocks/Arrows={numberOfBlocksOrArrow}, Learned={numberOfIDLearned}, Frame={frameNumber}") | |
| if numberOfBlocksOrArrow > 50: # Sanity check | |
| raise HuskyLensValidationError(f"Suspicious number of objects: {numberOfBlocksOrArrow}") | |
| isBlock = True | |
| for i in range(numberOfBlocksOrArrow): | |
| try: | |
| tmpObj = self.getBlockOrArrowCommand() | |
| isBlock = tmpObj[1] | |
| returnData.append(tmpObj[0]) | |
| except Exception as e: | |
| self._log("WARNING", f"Failed to get object {i+1}/{numberOfBlocksOrArrow}: {e}") | |
| break | |
| finalData = [] | |
| for i in returnData: | |
| try: | |
| tmp = [] | |
| for q in range(0, len(i), 4): | |
| if q + 4 > len(i): | |
| break | |
| low = int(i[q:q+2], 16) | |
| high = int(i[q+2:q+4], 16) | |
| if high > 0: | |
| val = low + 255 + high | |
| else: | |
| val = low | |
| tmp.append(val) | |
| if len(tmp) >= 5: # Valid object data | |
| finalData.append(tmp) | |
| except (ValueError, IndexError) as e: | |
| self._log("WARNING", f"Failed to parse object data: {e}") | |
| continue | |
| self.checkOnceAgain = True | |
| ret = self.convert_to_class_object(finalData, isBlock) | |
| if numIdLearnFlag: | |
| ret.append(numberOfIDLearned) | |
| if frameFlag: | |
| ret.append(frameNumber) | |
| if self.verbose: | |
| self._log("DEBUG", f"Processed {len(ret)} object(s)") | |
| return ret | |
| except (HuskyLensConnectionError, HuskyLensCommunicationError, HuskyLensTimeoutError, HuskyLensValidationError): | |
| raise | |
| except AttributeError as e: | |
| if "timeout" in str(e).lower() and self.proto == "I2C": | |
| self._log("WARNING", "I2C timeout attribute error (ignored), retrying...") | |
| if self.checkOnceAgain: | |
| self.checkOnceAgain = False | |
| time.sleep(0.1) | |
| return self.processReturnData(numIdLearnFlag, frameFlag) | |
| return [] | |
| else: | |
| self.error_count += 1 | |
| raise HuskyLensCommunicationError(f"AttributeError: {e}") from e | |
| except Exception as e: | |
| self.error_count += 1 | |
| self._log("WARNING", f"Error in processReturnData: {e}, attempting recovery...") | |
| if self.checkOnceAgain: | |
| if self.proto == "SERIAL": | |
| try: | |
| self.huskylensSer.timeout = 5 | |
| except AttributeError: | |
| pass | |
| self.checkOnceAgain = False | |
| if self.proto == "SERIAL": | |
| try: | |
| self.huskylensSer.timeout = .5 | |
| except AttributeError: | |
| pass | |
| return self.processReturnData(numIdLearnFlag, frameFlag) | |
| self._log("ERROR", f"Read response error: {e}") | |
| if self.proto == "SERIAL": | |
| try: | |
| self.huskylensSer.flushInput() | |
| self.huskylensSer.flushOutput() | |
| self.huskylensSer.flush() | |
| except AttributeError: | |
| pass | |
| return [] | |
| def convert_to_class_object(self, data, isBlock): | |
| """Convert raw data to Block or Arrow objects with validation""" | |
| tmp = [] | |
| for i, obj_data in enumerate(data): | |
| try: | |
| if len(obj_data) < 5: | |
| self._log("WARNING", f"Object data too short: {len(obj_data)} values") | |
| continue | |
| if isBlock: | |
| obj = Block(obj_data[0], obj_data[1], obj_data[2], obj_data[3], obj_data[4]) | |
| else: | |
| obj = Arrow(obj_data[0], obj_data[1], obj_data[2], obj_data[3], obj_data[4]) | |
| tmp.append(obj) | |
| except HuskyLensValidationError as e: | |
| self._log("WARNING", f"Invalid object data at index {i}: {e}") | |
| continue | |
| except Exception as e: | |
| self._log("WARNING", f"Failed to create object at index {i}: {e}") | |
| continue | |
| return tmp | |
| def get_stats(self): | |
| """Get library statistics""" | |
| uptime = time.time() - self.start_time | |
| return { | |
| "protocol": self.proto, | |
| "address": f"0x{self.address:02X}" if self.proto == "I2C" else "N/A", | |
| "commands_sent": self.command_count, | |
| "errors": self.error_count, | |
| "uptime_seconds": round(uptime, 2), | |
| "commands_per_second": round(self.command_count / uptime, 2) if uptime > 0 else 0, | |
| "connection_healthy": self.connection_healthy, | |
| "last_health_check": round(time.time() - self.last_health_check, 2), | |
| "error_rate": round(self.error_count / self.command_count * 100, 2) if self.command_count > 0 else 0 | |
| } | |
| def _knock_internal(self): | |
| """Internal knock method without retry""" | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002c3c") | |
| self.writeToHuskyLens(cmd) | |
| return self.processReturnData() | |
| # Public API methods with enhanced error handling and validation | |
| @validate_input | |
| @retry_on_error(max_retries=3, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def knock(self): | |
| """Send knock command to HuskyLens""" | |
| self._log("DEBUG", "Sending knock command") | |
| result = self._knock_internal() | |
| self._log("DEBUG", f"Knock response: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.2, exceptions=(HuskyLensCommunicationError,)) | |
| def learn(self, x): | |
| """Learn object with ID x""" | |
| if not isinstance(x, int) or x < 0 or x > 255: | |
| raise HuskyLensValidationError(f"Invalid learn ID: {x}. Must be 0-255") | |
| self._log("INFO", f"Learning object with ID: {x}") | |
| data = "{:04x}".format(x) | |
| part1 = data[2:] | |
| part2 = data[0:2] | |
| data = part1 + part2 | |
| dataLen = "{:02x}".format(len(data) // 2) | |
| cmd = commandHeaderAndAddress + dataLen + "36" + data | |
| cmd += self.calculateChecksum(cmd) | |
| cmd = self.cmdToBytes(cmd) | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"Learn command result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def forget(self): | |
| """Forget all learned objects""" | |
| self._log("INFO", "Forgetting all learned objects") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "003747") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"Forget command result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def setCustomName(self, name, idV): | |
| """Set custom name for object ID""" | |
| if not isinstance(name, str) or len(name) == 0: | |
| raise HuskyLensValidationError(f"Invalid name: must be non-empty string") | |
| if not isinstance(idV, int) or idV < 0 or idV > 255: | |
| raise HuskyLensValidationError(f"Invalid ID: {idV}. Must be 0-255") | |
| if len(name) > 20: | |
| raise HuskyLensValidationError(f"Name too long: {len(name)}. Max 20 characters") | |
| self._log("INFO", f"Setting custom name '{name}' for ID {idV}") | |
| nameDataSize = "{:02x}".format(len(name) + 1) | |
| name_hex = name.encode("utf-8").hex() + "00" | |
| localId = "{:02x}".format(idV) | |
| data = localId + nameDataSize + name_hex | |
| dataLen = "{:02x}".format(len(data) // 2) | |
| cmd = commandHeaderAndAddress + dataLen + "2f" + data | |
| cmd += self.calculateChecksum(cmd) | |
| cmd = self.cmdToBytes(cmd) | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"SetCustomName result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def customText(self, nameV, xV, yV): | |
| """Display custom text on HuskyLens screen""" | |
| if not isinstance(nameV, str) or len(nameV) == 0: | |
| raise HuskyLensValidationError(f"Invalid text: must be non-empty string") | |
| if not isinstance(xV, (int, float)) or not (0 <= xV <= 320): | |
| raise HuskyLensValidationError(f"Invalid X coordinate: {xV}. Must be 0-320") | |
| if not isinstance(yV, (int, float)) or not (0 <= yV <= 240): | |
| raise HuskyLensValidationError(f"Invalid Y coordinate: {yV}. Must be 0-240") | |
| if len(nameV) > 20: | |
| raise HuskyLensValidationError(f"Text too long: {len(nameV)}. Max 20 characters") | |
| self._log("INFO", f"Displaying custom text '{nameV}' at ({xV}, {yV})") | |
| name = nameV.encode("utf-8").hex() | |
| nameDataSize = "{:02x}".format(len(name) // 2) | |
| if xV > 255: | |
| x = "ff" + "{:02x}".format(int(xV) % 255) | |
| else: | |
| x = "00" + "{:02x}".format(int(xV)) | |
| y = "{:02x}".format(int(yV)) | |
| data = nameDataSize + x + y + name | |
| dataLen = "{:02x}".format(len(data) // 2) | |
| cmd = commandHeaderAndAddress + dataLen + "34" + data | |
| cmd += self.calculateChecksum(cmd) | |
| cmd = self.cmdToBytes(cmd) | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"CustomText result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def clearText(self): | |
| """Clear custom text from HuskyLens screen""" | |
| self._log("INFO", "Clearing custom text") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "003545") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"ClearText result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def requestAll(self): | |
| """Request all data from HuskyLens""" | |
| self._log("DEBUG", "Requesting all data") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002030") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("DEBUG", f"RequestAll returned {len(result) if isinstance(result, list) else 0} object(s)") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.2, exceptions=(HuskyLensCommunicationError,)) | |
| def saveModelToSDCard(self, idVal): | |
| """Save model to SD card""" | |
| if not isinstance(idVal, int) or idVal < 0 or idVal > 65535: | |
| raise HuskyLensValidationError(f"Invalid model ID: {idVal}") | |
| self._log("INFO", f"Saving model ID {idVal} to SD card") | |
| idVal_hex = "{:04x}".format(idVal) | |
| idVal_hex = idVal_hex[2:] + idVal_hex[0:2] | |
| cmd = commandHeaderAndAddress + "0232" + idVal_hex | |
| cmd += self.calculateChecksum(cmd) | |
| cmd = self.cmdToBytes(cmd) | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"SaveModel result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.2, exceptions=(HuskyLensCommunicationError,)) | |
| def loadModelFromSDCard(self, idVal): | |
| """Load model from SD card""" | |
| if not isinstance(idVal, int) or idVal < 0 or idVal > 65535: | |
| raise HuskyLensValidationError(f"Invalid model ID: {idVal}") | |
| self._log("INFO", f"Loading model ID {idVal} from SD card") | |
| idVal_hex = "{:04x}".format(idVal) | |
| idVal_hex = idVal_hex[2:] + idVal_hex[0:2] | |
| cmd = commandHeaderAndAddress + "0233" + idVal_hex | |
| cmd += self.calculateChecksum(cmd) | |
| cmd = self.cmdToBytes(cmd) | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"LoadModel result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.5, exceptions=(HuskyLensCommunicationError,)) | |
| def savePictureToSDCard(self): | |
| """Save picture to SD card""" | |
| self._log("INFO", "Saving picture to SD card") | |
| if self.proto == "SERIAL": | |
| try: | |
| self.huskylensSer.timeout = 5 | |
| except AttributeError: | |
| pass | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "003040") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"SavePicture result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.2, exceptions=(HuskyLensCommunicationError,)) | |
| def saveScreenshotToSDCard(self): | |
| """Save screenshot to SD card""" | |
| self._log("INFO", "Saving screenshot to SD card") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "003949") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"SaveScreenshot result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def blocks(self): | |
| """Get all blocks (detected objects)""" | |
| self._log("DEBUG", "Requesting blocks") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002131") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData()[0] | |
| self._log("DEBUG", f"Blocks returned: {len(result) if isinstance(result, list) else 0} block(s)") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def arrows(self): | |
| """Get all arrows (direction indicators)""" | |
| self._log("DEBUG", "Requesting arrows") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002232") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData()[0] | |
| self._log("DEBUG", f"Arrows returned: {len(result) if isinstance(result, list) else 0} arrow(s)") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def learned(self): | |
| """Get all learned objects""" | |
| self._log("DEBUG", "Requesting learned objects") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002333") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData()[0] | |
| self._log("DEBUG", f"Learned returned: {len(result) if isinstance(result, list) else 0} object(s)") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def learnedBlocks(self): | |
| """Get learned blocks""" | |
| self._log("DEBUG", "Requesting learned blocks") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002434") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData()[0] | |
| self._log("DEBUG", f"LearnedBlocks returned: {len(result) if isinstance(result, list) else 0} block(s)") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def learnedArrows(self): | |
| """Get learned arrows""" | |
| self._log("DEBUG", "Requesting learned arrows") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002535") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData()[0] | |
| self._log("DEBUG", f"LearnedArrows returned: {len(result) if isinstance(result, list) else 0} arrow(s)") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def getObjectByID(self, idVal): | |
| """Get object by ID""" | |
| if not isinstance(idVal, int) or idVal < 0 or idVal > 255: | |
| raise HuskyLensValidationError(f"Invalid object ID: {idVal}. Must be 0-255") | |
| self._log("DEBUG", f"Requesting object with ID {idVal}") | |
| idVal_hex = "{:04x}".format(idVal) | |
| idVal_hex = idVal_hex[2:] + idVal_hex[0:2] | |
| cmd = commandHeaderAndAddress + "0226" + idVal_hex | |
| cmd += self.calculateChecksum(cmd) | |
| cmd = self.cmdToBytes(cmd) | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData()[0] | |
| self._log("DEBUG", f"GetObjectByID returned: {len(result) if isinstance(result, list) else 0} object(s)") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def getBlocksByID(self, idVal): | |
| """Get blocks by ID""" | |
| if not isinstance(idVal, int) or idVal < 0 or idVal > 255: | |
| raise HuskyLensValidationError(f"Invalid block ID: {idVal}. Must be 0-255") | |
| self._log("DEBUG", f"Requesting blocks with ID {idVal}") | |
| idVal_hex = "{:04x}".format(idVal) | |
| idVal_hex = idVal_hex[2:] + idVal_hex[0:2] | |
| cmd = commandHeaderAndAddress + "0227" + idVal_hex | |
| cmd += self.calculateChecksum(cmd) | |
| cmd = self.cmdToBytes(cmd) | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData()[0] | |
| self._log("DEBUG", f"GetBlocksByID returned: {len(result) if isinstance(result, list) else 0} block(s)") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def getArrowsByID(self, idVal): | |
| """Get arrows by ID""" | |
| if not isinstance(idVal, int) or idVal < 0 or idVal > 255: | |
| raise HuskyLensValidationError(f"Invalid arrow ID: {idVal}. Must be 0-255") | |
| self._log("DEBUG", f"Requesting arrows with ID {idVal}") | |
| idVal_hex = "{:04x}".format(idVal) | |
| idVal_hex = idVal_hex[2:] + idVal_hex[0:2] | |
| cmd = commandHeaderAndAddress + "0228" + idVal_hex | |
| cmd += self.calculateChecksum(cmd) | |
| cmd = self.cmdToBytes(cmd) | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData()[0] | |
| self._log("DEBUG", f"GetArrowsByID returned: {len(result) if isinstance(result, list) else 0} arrow(s)") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def algorthim(self, alg): | |
| """Set algorithm""" | |
| if not isinstance(alg, str): | |
| raise HuskyLensValidationError(f"Algorithm must be string, got {type(alg)}") | |
| if alg not in algorthimsByteID: | |
| error_msg = f"INCORRECT ALGORITHM NAME: {alg}. Valid algorithms: {list(algorthimsByteID.keys())}" | |
| self._log("ERROR", error_msg) | |
| raise HuskyLensValidationError(error_msg) | |
| self._log("INFO", f"Setting algorithm: {alg}") | |
| cmd = commandHeaderAndAddress + "022d" + algorthimsByteID[alg] | |
| cmd += self.calculateChecksum(cmd) | |
| cmd = self.cmdToBytes(cmd) | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData() | |
| self._log("INFO", f"Algorithm set result: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def count(self): | |
| """Get total count of objects""" | |
| self._log("DEBUG", "Requesting object count") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002030") | |
| self.writeToHuskyLens(cmd) | |
| result = len(self.processReturnData()) | |
| self._log("DEBUG", f"Count: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def learnedObjCount(self): | |
| """Get count of learned objects""" | |
| self._log("DEBUG", "Requesting learned object count") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002030") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData(numIdLearnFlag=True)[-1] | |
| self._log("DEBUG", f"LearnedObjCount: {result}") | |
| return result | |
| @validate_input | |
| @retry_on_error(max_retries=2, delay=0.1, exceptions=(HuskyLensCommunicationError,)) | |
| def frameNumber(self): | |
| """Get current frame number""" | |
| self._log("DEBUG", "Requesting frame number") | |
| cmd = self.cmdToBytes(commandHeaderAndAddress + "002030") | |
| self.writeToHuskyLens(cmd) | |
| result = self.processReturnData(frameFlag=True)[-1] | |
| self._log("DEBUG", f"FrameNumber: {result}") | |
| return result |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment