Created
July 21, 2025 20:55
-
-
Save symbioquine/3d040f482dfb920a74a44b2ba8c87bf7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/bin/env python3 | |
| # pymodbus=3.5.2 | |
| import logging | |
| from datetime import datetime, timezone | |
| from zoneinfo import ZoneInfo | |
| from pymodbus.client.serial import ModbusSerialClient as ModbusClient | |
| from pymodbus.exceptions import ModbusException | |
# Log record layout: timestamp, thread name, level, module:line, message.
FORMAT = ' '.join((
    '%(asctime)-15s',
    '%(threadName)-15s',
    '%(levelname)-8s',
    '%(module)-15s:%(lineno)-8s',
    '%(message)s',
))
logging.basicConfig(format=FORMAT)

# Root logger, opened up to DEBUG so every record is emitted.
log = logging.getLogger()
log.setLevel(logging.DEBUG)

# Logger used by the functions defined in this module.
_logger = logging.getLogger(__name__)

# Time zone attached to clock values read from / written to the controller.
PST = ZoneInfo('US/Pacific')
def pack_as_int(v0, v1):
    """Pack two byte-sized values into a single 16-bit register word.

    Args:
        v0: Value stored in the high byte (bits 8-15).
        v1: Value stored in the low byte (bits 0-7).

    Returns:
        An int in the range 0..0xFFFF.

    Each operand is masked to 8 bits before combining, so an out-of-range
    low value cannot spill over into (and corrupt) the high byte.
    """
    return ((v0 & 0xFF) << 8) | (v1 & 0xFF)
def upper_int(v):
    """Return the high byte (bits 8-15) of the integer ``v``."""
    high_byte_mask = 0xFF00
    return (v & high_byte_mask) >> 8
def lower_int(v):
    """Return the low byte (bits 0-7) of the integer ``v``."""
    low_byte_mask = 0xFF
    return v & low_byte_mask
def read_device_clock(modbus_client, unit_id):
    """Read the charge controller's clock and return it as a datetime.

    Reads three holding registers starting at address 0x9013 and decodes
    each one as a pair of bytes: (minute, second), (day, hour) and
    (year - 2000, month).

    Args:
        modbus_client: Client object providing ``read_holding_registers``.
        unit_id: Modbus unit (slave) id passed through to the client.

    Returns:
        A ``datetime`` carrying the ``PST`` tzinfo, or ``None`` when the
        read reports an error, the response does not hold exactly three
        registers, or the decoded fields do not form a plausible date.
    """
    response = modbus_client.read_holding_registers(address=0x9013, count=0x3, slave=unit_id)
    if response.isError():
        _logger.warning("Failed to read date from charge controller: unit_id=%s %r", unit_id, response)
        return None

    registers = response.registers
    if len(registers) != 3:
        # A short/long reply would otherwise blow up in the unpacking below.
        _logger.warning("Unexpected register count from charge controller: unit_id=%s %r", unit_id, registers)
        return None
    min_and_sec, day_and_hour, year_and_month = registers

    year = upper_int(year_and_month) + 2000
    month = lower_int(year_and_month)
    day = upper_int(day_and_hour)
    hour = lower_int(day_and_hour)
    minute = upper_int(min_and_sec)
    sec = lower_int(min_and_sec)
    decoded = (year, month, day, hour, minute, sec)

    # Sanity check - require manually setting the clock in this case
    if year < 2010 or year > 2030:
        _logger.warning("Received faulty date from charge controller: unit_id=%s %r", unit_id, decoded)
        return None

    try:
        return datetime(year, month, day, hour=hour, minute=minute, second=sec, microsecond=0, tzinfo=PST)
    except ValueError:
        # The year passed the check above but another field is out of range
        # (e.g. month 0 or hour 25); treat it like any other faulty reading
        # instead of letting the exception escape to the caller.
        _logger.warning("Received faulty date from charge controller: unit_id=%s %r", unit_id, decoded)
        return None
def sync_clock(modbus_client, unit_id):
    """Compare the controller clock with the host clock and correct it if needed.

    The controller clock is read twice; if either read fails nothing is
    recorded, and if the two reads disagree by more than 10 seconds only
    that disagreement is recorded. Otherwise the drift against the host's
    current time (expressed in the ``PST`` zone) is recorded and, when the
    update condition holds, the host time is written to the three registers
    starting at 0x9013.

    Args:
        modbus_client: Client object providing ``read_holding_registers``
            and ``write_registers``.
        unit_id: Modbus unit (slave) id passed through to the client.

    Returns:
        A dict of metric fields. Possible keys: ``devTimeDelta`` (seconds
        between the two reads, when they disagree), or ``devTime``,
        ``currTime`` and ``timeDrift``. Empty when the clock could not be
        read.
    """
    metric_fields = {}

    device_time0 = read_device_clock(modbus_client, unit_id)
    device_time1 = read_device_clock(modbus_client, unit_id)

    # Don't record metrics or adjust controller date/time if we can't reliably tell
    # what time it has already set
    if device_time0 is None or device_time1 is None:
        return metric_fields

    # Two back-to-back reads should agree closely; if not, the readings are
    # not trustworthy enough to base a correction on.
    time_delta_secs = (device_time0 - device_time1).total_seconds()
    if abs(time_delta_secs) > 10:
        metric_fields['devTimeDelta'] = int(time_delta_secs)
        return metric_fields

    dt = datetime.now(timezone.utc).astimezone(PST)
    time_drift_secs = (device_time1 - dt).total_seconds()

    metric_fields['devTime'] = int(device_time1.timestamp())
    metric_fields['currTime'] = int(dt.timestamp())
    metric_fields['timeDrift'] = int(time_drift_secs)

    # Only update controller date/time if it is more than an hour off or we're in the first five minutes of the day
    drift_magnitude = abs(time_drift_secs)
    in_daily_window = dt.hour == 0 and dt.minute <= 5
    should_update = drift_magnitude > 3600 or (drift_magnitude > 5 and in_daily_window)
    if not should_update:
        return metric_fields

    # Same register layout that read_device_clock decodes.
    min_and_sec = pack_as_int(dt.minute, dt.second)
    day_and_hour = pack_as_int(dt.day, dt.hour)
    year_and_month = pack_as_int(dt.year - 2000, dt.month)

    try:
        response = modbus_client.write_registers(address=0x9013, values=[min_and_sec, day_and_hour, year_and_month], slave=unit_id)
    except ModbusException as exc:
        _logger.warning("Failed to set charge controller date/time: unit_id=%s %r", unit_id, exc)
        return metric_fields

    # A write can come back as an error response without raising; check it the
    # same way the read path does rather than reporting success blindly.
    if response.isError():
        _logger.warning("Failed to set charge controller date/time: unit_id=%s %r", unit_id, response)
    else:
        _logger.warning("Successfully set charge controller date/time. unit_id=%s %r", unit_id, response)

    return metric_fields
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment