-
-
Save enkiusz/6408645efd622b8a638a14957cd37f47 to your computer and use it in GitHub Desktop.
| #!/usr/bin/env python3 | |
| """ | |
| A simple CLI tool to control and monitor the ZKETECH EBC-A20 battery | |
| tester from https://www.zketech.com/en/ | |
| It can trigger charge or discharge as well as perform charge-discharge | |
| cycles to measure battery capacity. | |
| Currenly only the CC (Constant Current) load type is implemented, the | |
| CP (Constant Power) load type is a simple fixme. Additionally, some | |
| small improvement opportunities are documented in the code as FIXME | |
| comments. | |
| Miscellaneous features include monitoring packets being sent out by the | |
| tester without triggering any actions and decoding of arbitrary status | |
| packets. | |
| """ | |
| #pylint: disable=too-many-lines | |
| import argparse | |
| import csv | |
| import json | |
| import logging | |
| import operator | |
| import re | |
| import time | |
| from binascii import hexlify, unhexlify | |
| from collections import namedtuple | |
| from contextlib import nullcontext, AbstractContextManager | |
| from enum import IntEnum | |
| from functools import reduce | |
| from pathlib import Path | |
| from statistics import mean | |
| from struct import pack, unpack | |
| from typing import Tuple | |
| import pint | |
| import serial | |
| import structlog | |
| # Reference: https://stackoverflow.com/a/49724281 | |
| LOG_LEVEL_NAMES = [logging.getLevelName(v) for v in | |
| sorted(getattr(logging, '_levelToName', None) or | |
| logging._levelNames) if getattr(v, "real", 0)] | |
| log = structlog.get_logger() | |
| ureg = pint.UnitRegistry() | |
| class ZKETechEBCModels(IntEnum): | |
| """ | |
| ZKETECH EBC model codes. | |
| """ | |
| EBC_A05 = 0x05 | |
| EBC_A10H = 0x06 | |
| EBC_A20 = 0x09 | |
| # Packet start & end markers | |
| SOF = 0xfa | |
| EOF = 0xf8 | |
| class CommandCode(IntEnum): | |
| """ | |
| Command codes for building packets sent from the PC | |
| to the battery tester. | |
| """ | |
| CONNECT = 0x05 | |
| DISCONNECT = 0x06 | |
| STOP = 0x02 | |
| CHRG_CCCV_START = 0x21 | |
| # Send the number of minutes of charging time elapsed to the charger ??? | |
| # Sent every 60 seconds after CHRG_CCCV is sent | |
| CHRG_TIME = 0x0a | |
| DISCH_CC_START = 0x01 | |
| DISCH_CC_ADJUST = 0x07 | |
| DISCH_CC_STOP = 0x08 | |
| class StatusCode(IntEnum): | |
| """ | |
| Status codes for decoding packets sent from the | |
| battery tester to the PC. | |
| """ | |
| # ??? | |
| # Sent when charger is idle | |
| IDLE = 0x02 | |
| IDLE_MONITOR = 0x66 | |
| CHRG_CCCV = 0x0c | |
| CHRG_CCCV_MONITOR = 0x70 | |
| CHRG_CCCV_END = 0x16 | |
| # Why two status codes during CC discharge | |
| DISC_CC_IDLE = 0x00 | |
| DISC_CC = 0x0a | |
| DISC_CC_END = 0x14 | |
| DISC_CC_MONITOR = 0x64 | |
| UNK1 = 0x6e | |
| # This encoding is used to prevent bytes > 240 to prevent in the byte stream. | |
| # This feature allows the bytestream to safely use 0xfa and 0xf8 as SOF (Start Of Frame) | |
| # and EOF (End Of Frame) | |
| def encode_base240(value: int) -> Tuple[int, int]: | |
| """ | |
| Encode a value into base240 encoded 2 byte sequence. | |
| """ | |
| assert value < 0xf0 * 0xf0 + 0xf0 | |
| h = value // 0xf0 | |
| l = value - h * 0xf0 | |
| return (h,l) | |
| def decode_base240(h: int, l: int) -> int: | |
| """ | |
| Decode a base240 encoded 2 byte sequence into a single integer. | |
| """ | |
| return 0xf0 * h + l | |
| #pylint: disable=line-too-long | |
| # Reference: https://github.com/dev-strom/esp-ebc-mqtt/blob/main/lib/commands/EbcA20.cpp | |
| # bool EbcA20::Decode(uint16_t source, double& value, ParameterPacking pp) const | |
| # { | |
| # switch (pp) // PP_None, PP_V, PP_V_set, PP_A, PP_A_set, PP_P, PP_T, PP_Ah | |
| # { | |
| # case PP_V: | |
| # // TODO: is PP_V the same way decoded as PP_Ah? (don't know, because I never had a voltage above 10V) | |
| # value = (source == 0x0000) ? 0.0 : ((static_cast<double> (((source >> 8) * 240) + (source & 0xFF))) / 1000.0); | |
| # break; | |
| # case PP_Ah: | |
| # if (source & 0x8000) { | |
| # // capacity >= 10.0 Ah | |
| # if ((source & 0xE000) == 0xE000) { | |
| # // capacity >= 200.0 Ah | |
| # value = ((static_cast<double> ((((source >> 8) & 0x3F) * 240) + (source & 0xFF) - 0x1C00)) / 10.0); | |
| # } else { | |
| # // capacity < 200.0 Ah | |
| # value = ((static_cast<double> ((((source >> 8) & 0x7F) * 240) + (source & 0xFF) - 0x0800)) / 100.0); | |
| # } | |
| # } else { | |
| # // capacity < 10.0 Ah | |
| # value = (source == 0x0000) ? 0.0 : ((static_cast<double> (((source >> 8) * 240) + (source & 0xFF))) / 1000.0); | |
| # } | |
| #pylint: enable=line-too-long | |
| def decode_ranged(h: int, l: int) -> float: | |
| """ | |
| Decode a base240 value to a float taking into account different offsets and scaling | |
| factors for multiple ranges. | |
| """ | |
| if h & 0x80 == 0x80: | |
| if h & 0xe0 == 0xe0: | |
| # range is > 200.0 | |
| v = 240 * (h & 0x3f) + l | |
| m = 0.1 | |
| offset = 0x1c00 | |
| else: | |
| # 10.00 < range < 200.0 | |
| v = 240 * (h & 0x7f) + l | |
| m = 0.01 | |
| offset = 0x800 | |
| else: | |
| # range is < 10.00 | |
| v = 240 * h + l | |
| m = 0.001 | |
| offset = 0 | |
| return (v - offset) * m | |
| Field = namedtuple('Field', 'key value') | |
| #pylint: disable=unnecessary-lambda | |
| pkt_field_defs = { | |
| # Example packet: | |
| # fa 02 0000 0a13 0014 0000 0032 010a 000a 09 35 f8 | |
| (ZKETechEBCModels.EBC_A20, StatusCode.IDLE): [ | |
| # Sent when device is idle ('OFF' on display) | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Setpoint | |
| Field(key='chrg_cc_current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='chrg_cv_voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.volt), | |
| Field(key='chrg_cutoff_current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere) | |
| ], | |
| # Example packet: | |
| # fa 66 0000 0888 0014 0000 013e 0c8f 0905 09 4b f8 | |
| (0x09, StatusCode.IDLE_MONITOR): [ | |
| # Sent when device is idle ('OFF' on display) | |
| # Contains FW and/or hardware information | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Device | |
| # Displayed in the ZKETECH application UI as 'V3.02' | |
| Field(key='fw_ver', | |
| value=lambda h,l: str(decode_base240(h,l) / 100)), | |
| Field(key='unk2', | |
| value=lambda h,l: decode_base240(h,l)), # Always 3023 | |
| Field(key='unk3', | |
| value=lambda h,l: decode_base240(h,l)) # Always 2165 | |
| ], | |
| # Example packets:V3 | |
| # fa 0c 0032 07de 0000 0000 0032 01b4 000a 09 63 f8 | |
| # fa 0c 0214 1059 299e 0000 0214 01b4 000a 09 44 f8 | |
| # Charge = 9998 mAh | |
| # fa 0c 0214 1059 8ca8 0000 0214 01b4 000a 09 d7 f8 | |
| # Charge > 10000 mAh = 10 Ah display will be 10.01 Ah | |
| # fa 0c 0214 1078 9124 0000 0214 01b4 000a 09 67 f8 | |
| # charge = 20.69 Ah | |
| # fa 0c 000b 1178 c478 0000 0214 01b4 000a 09 72 f8 | |
| # charge = 143.9 Ah | |
| (ZKETechEBCModels.EBC_A20, StatusCode.CHRG_CCCV): [ | |
| # Mode: CHRG-CCCV | |
| # Sent in both CC and CV phases of charging | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Setpoint | |
| Field(key='chrg_cccv_current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='chrg_cccv_voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.volt), | |
| Field(key='chrg_cutoff_current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere) | |
| ], | |
| # Example packet: | |
| # fa 70 0000 0453 0000 0000 013e 0c8f 0905 09 9e f8 | |
| (ZKETechEBCModels.EBC_A20, StatusCode.CHRG_CCCV_MONITOR): [ | |
| # Mode: CHRG-CCCV | |
| # Sent during the first few seconds of charging | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Device | |
| # Displayed in the ZKETECH application UI as 'V3.02' | |
| Field(key='fw_ver', value=lambda h,l: str(decode_base240(h,l) / 100)), | |
| Field(key='unk2', value=lambda h,l: decode_base240(h,l)), # Always 3023 | |
| Field(key='unk3', value=lambda h,l: decode_base240(h,l)) # Always 2165 | |
| ], | |
| # Example packet: | |
| # fa 16 000a 0a64 0014 0000 0032 010a 000a 09 5c f8 | |
| (ZKETechEBCModels.EBC_A20, StatusCode.CHRG_CCCV_END): [ | |
| # Mode: CHRG-CCCV | |
| # Sent when charging stops due to current cutoff reached | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Setpoint | |
| Field(key='cccv_current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='cccv_voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.volt), | |
| Field(key='cutoff_current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere) | |
| ], | |
| # Example packet: | |
| # fa 00 0000 1049 0000 0000 0032 013c 0078 09 27 f8 | |
| (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC_IDLE): [ | |
| # Mode: DISC-CC | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Setpoint | |
| Field(key='disc_cc_current', value= | |
| lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='disc_cc_cutoff_voltage', value= | |
| lambda h,l: decode_base240(h,l) * 0.01 * ureg.volt), | |
| Field(key='disc_max_time', value= | |
| lambda h,l: decode_base240(h,l) * ureg.minute) | |
| ], | |
| # Example packets: | |
| # fa 0a 0214 0f22 bbcf 0000 0214 013c 0000 09 67 f8 | |
| # here the charge is > 100 Ah | |
| # charge == 123.2Ah | |
| # fa 0a 0214 0e80 dbd0 0000 0214 013c 0000 09 bb f8 | |
| # charge = 200.0 (decoded) | |
| # fa 0a 0214 0e80 e630 0000 0214 013c 0000 09 66 f8 | |
| # charge = 200.0Ah | |
| # fa 00 0000 0dbf e777 0000 0214 013c 0000 09 00 f8 | |
| # charge = 231.1Ah | |
| # fa 0a 0214 0e4f e68d 0000 0214 013c 0000 09 14 f8 | |
| # charge = 209.3Ah | |
| # BUG!!! | |
| # 2025-01-22 18:46:37 [debug ] decode status pkt | |
| # pkt=b'fa0a02140f01cf8800000214013c00000977f8' | |
| # {'.calculated_crc': 119, '.crc_check': True, 'ptype': <StatusCode.DISC_CC: 10>, | |
| # 'dev_model': <ZKETechEBCModels.EBC_A20: 9>, | |
| # 'current': <Quantity(5.0, 'ampere')>, 'voltage': <Quantity(3.601, 'volt')>, | |
| # 'charge': <Quantity(-343.2, 'ampere_hour')>, | |
| # 'energy': <Quantity(0.0, 'watt_hour')>, 'disc_cc_current': <Quantity(5.0, 'ampere')>, | |
| # 'disc_cc_cutoff_voltage': <Quantity(3.0, 'volt')>, 'disc_max_time': <Quantity(0, 'minute')>} | |
| # charge is 170.5Ah on display | |
| (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC): [ | |
| # Mode: DISC-CC | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Setpoint | |
| Field(key='disc_cc_current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='disc_cc_cutoff_voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.volt), | |
| Field(key='disc_max_time', | |
| value=lambda h,l: decode_base240(h,l) * ureg.minute) | |
| ], | |
| (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC_MONITOR): [ | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Device | |
| # Displayed in the ZKETECH application UI as 'V3.02' | |
| Field(key='fw_ver', | |
| value=lambda h,l: str(decode_base240(h,l) / 100)), | |
| Field(key='unk2', | |
| value=lambda h,l: decode_base240(h,l)), # Always 3023 | |
| Field(key='unk3', | |
| value=lambda h,l: decode_base240(h,l)) # Always 2165 | |
| ], | |
| # Example packet: | |
| # fa 0a 0032 0f41 0002 0000 0032 013c 003c 09 4e f8 | |
| (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC): [ # Mode: D-CC | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Setpoint | |
| Field(key='disc_cc_current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='disc_cc_cutoff_voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.volt), | |
| Field(key='disc_max_time', | |
| value=lambda h,l: decode_base240(h,l) * ureg.minute) | |
| ], | |
| # Example packet: | |
| # fa 14 0032 0c77 0159 0000 0032 013c 0078 09 7b f8 | |
| (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC_END): [ # Mode: D-CC | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Setpoint | |
| Field(key='disc_cc_current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='disc_cc_cutoff_voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.volt), | |
| Field(key='disc_max_time', | |
| value=lambda h,l: decode_base240(h,l) * ureg.minute) | |
| ], | |
| # Example packet: | |
| # fa 6e 0032 0f41 0002 0000 013e 0c8f 0905 09 a9 f8 | |
| (ZKETechEBCModels.EBC_A20, StatusCode.UNK1): [ | |
| # Current data | |
| Field(key='current', | |
| value=lambda h,l: decode_base240(h,l) * 0.01 * ureg.ampere), | |
| Field(key='voltage', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.volt), | |
| Field(key='charge', | |
| value=lambda h,l: decode_ranged(h,l) * ureg.ampere_hour), | |
| # Does not seem to work, always 0 | |
| Field(key='energy', | |
| value=lambda h,l: decode_base240(h,l) * 0.001 * ureg.watt_hour), | |
| # Device | |
| # Displayed in the ZKETECH application UI as 'V3.02' | |
| Field(key='fw_ver', value=lambda h,l: str(decode_base240(h,l) /100)), | |
| Field(key='unk2', value=lambda h,l: decode_base240(h,l)), | |
| Field(key='unk3', value=lambda h,l: decode_base240(h,l)) | |
| ], | |
| } | |
| #pylint: enable=unnecessary-lambda | |
| def decode_status_pkt(pkt: bytes) -> dict: | |
| """ | |
| Decode a status packet received from the battery tester into | |
| a dictionary of fields. | |
| Some of the fields are metadata fields which contain information | |
| about the decoding process itself. For example, the '.crc_check' | |
| key contains a boolean indicating whether the CRC is valid for the | |
| status packet. | |
| """ | |
| log.debug('decode status pkt', pkt=hexlify(pkt)) | |
| if len(pkt) < 4: | |
| log.error('packet too short', pkt=hexlify(pkt)) | |
| return None | |
| pktb = bytearray(pkt) | |
| pkt_fields = {} | |
| marker1 = pktb.pop(0) | |
| marker2 = pktb.pop() | |
| crc = pktb.pop() | |
| if marker1 != SOF or marker2 != EOF: | |
| log.error('unknown packet format', pkt=hexlify(pkt)) | |
| return None | |
| calculated_crc = reduce(operator.xor, pktb) | |
| pkt_fields['.calculated_crc'] = calculated_crc | |
| # CRC values which have the highest 4 bits set are transmitted | |
| # with those bits turned off to not insert a SOF or EOF byte | |
| # by chance and confuse the framing decoder | |
| # Reference: https://github.com/dev-strom/esp-ebc-mqtt/blob/main/lib/commands/message.cpp | |
| if (calculated_crc & 0xf0) == 0xf0: | |
| calculated_crc = calculated_crc & 0x0f | |
| if calculated_crc != crc: | |
| log.warn('crc verification failed', pkt=hexlify(pkt)) | |
| pkt_fields['.crc_check'] = False | |
| else: | |
| pkt_fields['.crc_check'] = True | |
| ptype = pktb.pop(0) | |
| dev_model = pktb.pop() | |
| if ptype in StatusCode and dev_model in ZKETechEBCModels: | |
| ptype = StatusCode( ptype ) | |
| dev_model = ZKETechEBCModels( dev_model ) | |
| pkt_fields['ptype'] = ptype | |
| pkt_fields['dev_model'] = dev_model | |
| field_defs = pkt_field_defs.get( (dev_model, ptype) ) | |
| if not field_defs: | |
| log.error('no field definitions', dev_model=dev_model, ptype=ptype, pkt=hexlify(pkt)) | |
| return None | |
| for field_def in field_defs: | |
| (h, l) = unpack('2B', pktb[0:2]) | |
| pkt_fields[field_def.key] = field_def.value(h, l) | |
| pktb = pktb[2:] | |
| return pkt_fields | |
| log.warn('cannot recognize packet', packet_type=ptype, device=dev_model, pkt=hexlify(pkt)) | |
| return None | |
| def wrap_pkt(payload: bytes) -> bytes: | |
| """ | |
| Wrap the control packet payload with a CRC and start/stop frame indicators. | |
| """ | |
| crc = reduce(operator.xor, payload) | |
| return bytes([ SOF ]) + payload + bytes([crc]) + bytes([ EOF ]) | |
| def ctl_packet(command: CommandCode, p1: int = 0, p2: int = 0, p3: int = 0) -> bytes: | |
| """ | |
| Build a control packet payload with a command code and 3 parameters | |
| """ | |
| log.debug('building control packet', command=command, p1=p1, p2=p2, p3=p3) | |
| payload = pack('B 2B 2B 2B', int(command), | |
| *encode_base240(p1), | |
| *encode_base240(p2), | |
| *encode_base240(p3)) | |
| pkt = wrap_pkt(payload) | |
| log.debug('control packet bytes', pkt=hexlify(pkt)) | |
| return pkt | |
| def read_packet(s: serial.Serial) -> bytes: | |
| """ | |
| Read data from the serial port and return a complete packet. | |
| It uses the SOF and EOF tags as delimiters. | |
| """ | |
| pkt = [] | |
| while True: | |
| b = s.read(1)[0] | |
| if b != SOF: | |
| continue | |
| break | |
| pkt.append(SOF) | |
| while True: | |
| b = s.read(1)[0] | |
| pkt.append(b) | |
| if b != EOF: | |
| continue | |
| break | |
| return bytes(pkt) | |
| def packet_rx(s: serial.Serial, terminate_on: list[StatusCode]): | |
| """ | |
| Receive and yield packets until one of the termination status codes | |
| is received. | |
| This is used to stop when a "Charging End" status packet is received. | |
| """ | |
| # fields in pkt_fields are numeric not enum | |
| terminate_on = [ int(t) for t in terminate_on] | |
| while True: | |
| pkt = read_packet(s) | |
| pkt_fields = decode_status_pkt(pkt) | |
| if not pkt_fields: | |
| log.warn('unknown status packet', pkt=hexlify(pkt)) | |
| yield (pkt, pkt_fields) | |
| if pkt_fields is not None and pkt_fields['ptype'] in terminate_on: | |
| break | |
| def curve_plot_filename(args: argparse.ArgumentParser, name: str = None) -> Tuple[Path, AbstractContextManager]: #pylint: disable=line-too-long | |
| """ | |
| Build the filename for the charge/discharge curve CSV plot and a file context | |
| based on argument flags and base CSV filename. | |
| """ | |
| if args.output_prefix and args.save_curves: | |
| csv_filename = args.output_prefix | |
| if name is not None: | |
| csv_filename = csv_filename.with_name(name) | |
| csv_filename = csv_filename.with_suffix('.csv') | |
| csv_file_ctx = csv_filename.open("w") | |
| else: | |
| csv_filename = None | |
| csv_file_ctx = nullcontext(None) | |
| return (csv_filename, csv_file_ctx) | |
| def charge(s: serial.Serial, settings: dict, **kwargs): | |
| """ | |
| Start charging a battery using the provided settings. | |
| Monitors the charging progress and stores the charge curve points | |
| using a specified CSV writer. | |
| Returns a dictionary with summary data about the charging process. | |
| """ | |
| log.info('charge begin', serial=s, settings=settings, kwargs=kwargs) | |
| summary_data = { | |
| 'action': 'charge', | |
| 'algorithm': 'CC-CV', | |
| # pint.Quantity cannot be generically serialized to JSON | |
| 'settings': { k: str(v) for (k,v) in settings.items() }, | |
| 'flags': {}, | |
| } | |
| if kwargs.get('plot_filename'): | |
| summary_data['plot_filename'] = kwargs['plot_filename'] | |
| if kwargs.get('transmit', False): | |
| pkt = ctl_packet(CommandCode.CHRG_CCCV_START, | |
| int(settings['current'].to(ureg.milliampere).m / 10), | |
| # pint defines mA but does not define mV | |
| int( (settings['voltage'] * 1000).m / 10 ), | |
| int( settings['cutoff_current'].to(ureg.milliampere).m / 10) | |
| ) | |
| log.info('transmit control packet', pkt=hexlify(pkt)) | |
| s.write(pkt) | |
| # CHRG_TIME | |
| # p1 -> current time [min] | |
| # p2 -> always 0 | |
| # p3 -> always 0 | |
| #chrg_time = ctl_packet(CommandCode.CHRG_TIME, 50, 420, 2) | |
| #s.write(chrg_time) | |
| summary_data['start_ts'] = time.time() | |
| last_status_update = time.time() | |
| for (pkt, pkt_fields) in packet_rx(s, [ StatusCode.CHRG_CCCV_END ]): | |
| # In case packet was not decoded | |
| if not pkt_fields: | |
| continue | |
| if time.time() - last_status_update > 30: | |
| log.info('charging', voltage=str(pkt_fields['voltage'].to(ureg.volt)), | |
| current=str(pkt_fields['current'].to(ureg.ampere)), | |
| charge=str(pkt_fields['charge'].to(ureg.ampere_hour))) | |
| last_status_update = time.time() | |
| if kwargs.get('csv_writer'): | |
| kwargs['csv_writer'].writerow({ | |
| # TODO: Make precision depend on the reading like on device LCD | |
| # For example 100mAh is "100mAh" but 40Ah is "40.00Ah" and 111Ah is | |
| # "111.0Ah" | |
| 't': time.time(), | |
| 'voltage': round(pkt_fields['voltage'].to(ureg.volt).m, 3), | |
| 'chrg_current': round(pkt_fields['current'].to(ureg.ampere).m, 2), | |
| }) | |
| if pkt_fields['ptype'] == int(StatusCode.CHRG_CCCV_END): | |
| log.info('charging finished') | |
| summary_data['finish_ts'] = time.time() | |
| summary_data['charge'] = str(pkt_fields['charge']) | |
| summary_data['flags']['current_cutoff'] = True | |
| break | |
| time.sleep(1) | |
| log.info('charge end', summary=summary_data) | |
| return summary_data | |
| def cmd_charge(args: argparse.Namespace, s: serial.Serial): | |
| """ | |
| Handle the 'charge' CLI command. | |
| """ | |
| kwargs = { 'transmit': args.transmit } | |
| (csv_filename, csv_file_ctx) = curve_plot_filename(args) | |
| kwargs['plot_filename'] = str(csv_filename) | |
| with csv_file_ctx as csv_file: | |
| if csv_file: | |
| csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'chrg_current']) | |
| csv_writer.writeheader() | |
| kwargs['csv_writer'] = csv_writer | |
| summary_data = charge(s, settings={ | |
| 'current': args.current, | |
| 'voltage': args.voltage, | |
| 'cutoff_current': args.cutoff_current | |
| }, **kwargs) | |
| return summary_data | |
| def discharge(s: serial.Serial, settings: dict, **kwargs): | |
| """ | |
| Start discharging a battery using the provided settings. | |
| Currently supports only the Constant Current (CC) load mode. | |
| Monitors the discharge progress and stores the discharge curve points | |
| using a specified CSV writer. | |
| Returns a dictionary with summary data about the discharge process. | |
| """ | |
| log.info('discharge start', serial=s, settings=settings, kwargs=kwargs) | |
| # TODO: Add support for Constant Power discharge | |
| summary_data = { | |
| 'action': 'discharge', | |
| # pint.Quantity cannot be generically serialized to JSON | |
| 'settings': { k: str(v) for (k,v) in settings.items() }, | |
| 'flags': {}, | |
| } | |
| summary_data['settings']['load'] = 'CC' | |
| if kwargs.get('plot_filename'): | |
| summary_data['plot_filename'] = kwargs['plot_filename'] | |
| if kwargs.get('transmit', False): | |
| # DISC-CC start | |
| # p1 -> current, unit is 10 mA | |
| # p2 -> cutoff voltage, unit is 10 mV | |
| # p3 -> maximum time, unit is min | |
| pkt = ctl_packet(CommandCode.DISCH_CC_START, | |
| int(settings['current'].to(ureg.milliampere).m / 10), | |
| # pint defines mA but does not define mV | |
| int( (settings['cutoff_voltage'] * 1000).m / 10 ), | |
| int( settings['time_limit'].to(ureg.minute).m ) | |
| ) | |
| log.info('transmit control packet', pkt=hexlify(pkt)) | |
| s.write(pkt) | |
| summary_data['start_ts'] = time.time() | |
| last_status_update = time.time() | |
| for (pkt, pkt_fields) in packet_rx(s, [ StatusCode.DISC_CC_END ]): | |
| # In case packet was not decoded | |
| if not pkt_fields: | |
| continue | |
| if time.time() - last_status_update > 30: | |
| log.info('discharging', voltage=str(pkt_fields['voltage'].to(ureg.volt)), | |
| current=str(pkt_fields['current'].to(ureg.ampere)), | |
| charge=str(pkt_fields['charge'].to(ureg.ampere_hour))) | |
| last_status_update = time.time() | |
| if kwargs.get('csv_writer'): | |
| kwargs['csv_writer'].writerow({ | |
| 't': time.time(), | |
| # TODO: Make precision configurable | |
| 'voltage': round(pkt_fields['voltage'].to(ureg.volt).m, 3), | |
| 'disch_current': round(pkt_fields['current'].to(ureg.ampere).m, 2), | |
| }) | |
| # TODO: Handle time limit properly | |
| if pkt_fields and pkt_fields['ptype'] == int(StatusCode.DISC_CC_END): | |
| log.info('discharge finished') | |
| summary_data['finish_ts'] = time.time() | |
| summary_data['charge'] = str(pkt_fields['charge']) | |
| summary_data['flags']['voltage_cutoff'] = True | |
| break | |
| time.sleep(1) | |
| log.info('discharge end', summary=summary_data) | |
| return summary_data | |
| def cmd_discharge(args: argparse.Namespace, s: serial.Serial): | |
| """ | |
| Handle the 'charge' CLI command. | |
| """ | |
| kwargs = { 'transmit' : args.transmit } | |
| (csv_filename, csv_file_ctx) = curve_plot_filename(args) | |
| kwargs['plot_filename'] = str(csv_filename) | |
| with csv_file_ctx as csv_file: | |
| if csv_file: | |
| csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'disch_current']) | |
| csv_writer.writeheader() | |
| kwargs['csv_writer'] = csv_writer | |
| summary_data = discharge(s, settings={ | |
| 'current': args.current, | |
| 'cutoff_voltage': args.cutoff_voltage, | |
| 'time_limit': args.time_limit | |
| }, **kwargs) | |
| return summary_data | |
| def cmd_cycle(args: argparse.Namespace, s: serial.Serial): | |
| """ | |
| Handle the 'cycle' CLI command. | |
| """ | |
| log.info('cycle', serial=s, num_cycles=args.num_cycles) | |
| if not args.transmit: | |
| log.error('cycle requires tx enabled') | |
| return None | |
| main_summary_data = { | |
| 'action': 'cycle', | |
| 'settings': { | |
| 'num_cycles': args.num_cycles | |
| }, | |
| 'cycles': [], | |
| 'result': {} | |
| } | |
| # Store charge measurements for each cycle | |
| charge_measurements = [] | |
| # Pre-charge | |
| kwargs = { 'transmit': args.transmit } | |
| (csv_filename, csv_file_ctx) = curve_plot_filename(args, | |
| name=f'{args.output_prefix.name}-precharge') | |
| kwargs['plot_filename'] = str(csv_filename) | |
| with csv_file_ctx as csv_file: | |
| if csv_file: | |
| csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'chrg_current']) | |
| csv_writer.writeheader() | |
| kwargs['csv_writer'] = csv_writer | |
| precharge_summary_data = charge(s, settings={ | |
| 'current': args.pre_charge_current, | |
| 'voltage': args.pre_charge_voltage, | |
| 'cutoff_current': args.pre_cutoff_current | |
| }, **kwargs) | |
| main_summary_data['cycles'].append(precharge_summary_data) | |
| cycles_left = args.num_cycles | |
| cycle_num = 1 | |
| while cycles_left > 0: | |
| # Test discharge | |
| kwargs = { 'transmit': args.transmit } | |
| (csv_filename, csv_file_ctx) = curve_plot_filename(args, | |
| name=f'{args.output_prefix.name}-cycle-{cycle_num}-discharge') | |
| kwargs['plot_filename'] = str(csv_filename) | |
| with csv_file_ctx as csv_file: | |
| if csv_file: | |
| csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'disch_current']) | |
| csv_writer.writeheader() | |
| kwargs['csv_writer'] = csv_writer | |
| discharge_summary_data = discharge(s, settings={ | |
| 'current': args.discharge_current, | |
| 'cutoff_voltage': args.discharge_cutoff_voltage, | |
| 'time_limit': args.discharge_time_limit | |
| }, **kwargs) | |
| charge_measurements.append(discharge_summary_data['charge']) | |
| main_summary_data['cycles'].append(discharge_summary_data) | |
| # Post-charge | |
| kwargs = { 'transmit': args.transmit } | |
| (csv_filename, csv_file_ctx) = curve_plot_filename(args, | |
| name=f'{args.output_prefix.name}-cycle-{cycle_num}-postcharge') | |
| kwargs['plot_filename'] = str(csv_filename) | |
| with csv_file_ctx as csv_file: | |
| if csv_file: | |
| csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'chrg_current']) | |
| csv_writer.writeheader() | |
| kwargs['csv_writer'] = csv_writer | |
| postcharge_summary_data = charge(s, settings={ | |
| 'current': args.post_charge_current, | |
| 'voltage': args.post_charge_voltage, | |
| 'cutoff_current': args.post_cutoff_current | |
| }, **kwargs) | |
| main_summary_data['cycles'].append(postcharge_summary_data) | |
| cycle_num += 1 | |
| cycles_left -= 1 | |
| mean_capacity = mean([ureg.parse_expression(c) for c in charge_measurements]) | |
| # mean is often returned as a Fraction | |
| mean_capacity = pint.Quantity(float(mean_capacity.m), ureg.ampere_hour) | |
| main_summary_data['results'] = { | |
| # We need to parse as pint expressions again as charge measurements | |
| # are presented as strings in the summary_data | |
| 'mean_capacity': str( round(mean_capacity, 3) ) | |
| } | |
| log.info('cycle finished', mean_capacity=round(mean_capacity, 3)) | |
| return main_summary_data | |
| def cmd_monitor(args: argparse.Namespace, s: serial.Serial): | |
| """ | |
| Handle the 'monitor' CLI command. | |
| """ | |
| log.info('monitor', serial=s) | |
| if args.output_prefix and args.save_curves: | |
| csv_file_ctx = args.output_prefix.with_suffix('.csv').open("w") | |
| else: | |
| csv_file_ctx = nullcontext(None) | |
| with csv_file_ctx as csv_file: | |
| if csv_file: | |
| csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'current']) | |
| csv_writer.writeheader() | |
| while True: | |
| pkt = read_packet(s) | |
| pkt_fields = decode_status_pkt(pkt) | |
| print(pkt_fields) | |
| if not pkt_fields: | |
| log.warn('unknown status packet', pkt=hexlify(pkt)) | |
| else: | |
| if csv_file: | |
| csv_writer.writerow({ | |
| 't': time.time(), | |
| # TODO: Make precision configurable | |
| 'voltage': round(pkt_fields['voltage'].to(ureg.volt).m, 3), | |
| 'current': round(pkt_fields['current'].to(ureg.ampere).m, 2), | |
| }) | |
| time.sleep(1) | |
| def cmd_decode(args: argparse.Namespace, | |
| s: serial.Serial # pylint: disable=unused-argument | |
| ): | |
| """ | |
| Handle the 'decode' CLI command. | |
| """ | |
| log.info('decode') | |
| for pkt in args.packets: | |
| pkt = re.sub(r'\s', '', pkt) | |
| pkt = unhexlify(pkt) | |
| pkt_fields = decode_status_pkt(pkt) | |
| print(pkt_fields) | |
| def pint_bind_unit(unit: pint.Unit) -> argparse.Action: | |
| """ | |
| Handle a CLI argument parsing it as a pint expression and | |
| converting to the expected unit (for example miliamperes will be | |
| converted to amperes). | |
| """ | |
| class PintArgument(argparse.Action): | |
| """ | |
| argparse action using pint. | |
| """ | |
| def __call__(self, parser, args, value, option_string=None): | |
| try: | |
| setattr( args, self.dest, ureg.parse_expression(value).to(unit) ) | |
| except pint.errors.PintError as e: | |
| log.error('cannot parse expression', | |
| dest=self.dest, expected_unit=unit, value=value, msg=str(e)) | |
| return PintArgument | |
| def output_data_path(value: str) -> Path: | |
| """ | |
| Check if a path can be used as a prefix for the output summary | |
| data. | |
| """ | |
| p = Path(value) | |
| if not p.parent.exists(): | |
| raise argparse.ArgumentTypeError(f'path {p.parent} needs to exist') | |
| return p | |
| def add_charge_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: | |
| """ | |
| Add arguments for the 'charge' CLI command. | |
| """ | |
| parser.add_argument('--current', | |
| metavar='A', required=True, | |
| action=pint_bind_unit(ureg.ampere), | |
| help='Charging current') | |
| parser.add_argument('--voltage', | |
| metavar='V', required=True, | |
| action=pint_bind_unit(ureg.volt), | |
| help='Charging voltage') | |
| parser.add_argument('--cutoff-current', metavar='A', | |
| action=pint_bind_unit(ureg.ampere), | |
| # This is the lowest cutoff current for EBC-A20 | |
| default=ureg.parse_expression("100 mA"), | |
| help='Charge termination current') | |
| parser.set_defaults(cmd=cmd_charge) | |
| parser.set_defaults(need_serial=True) | |
| return parser | |
| def add_discharge_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: | |
| """ | |
| Add arguments for the 'discharge' CLI command. | |
| """ | |
| parser.add_argument('--current', metavar='A', required=True, | |
| action=pint_bind_unit(ureg.ampere), | |
| help='Constant Current discharge current') | |
| # Constant Power discharge not yet implemented | |
| #discharge_parser.add_argument('--power', | |
| # action=pint_bind_unit(ureg.watt), | |
| # help='Constant Power discharge power') | |
| parser.add_argument('--cutoff-voltage', metavar='V', required=True, | |
| action=pint_bind_unit(ureg.volt), | |
| help='Discharge cutoff voltage') | |
| parser.add_argument('--time-limit', metavar='T', | |
| action=pint_bind_unit(ureg.minute), | |
| default=0 * ureg.second, | |
| help='Time limit') | |
| parser.set_defaults(cmd=cmd_discharge) | |
| parser.set_defaults(need_serial=True) | |
| return parser | |
| def add_cycle_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: | |
| """ | |
| Add arguments for the 'cycle' CLI command. | |
| """ | |
| parser.add_argument('--num-cycles', | |
| type=int, default=1, help='Number of measurement test cycles') | |
| prepare_charge_group = parser.add_argument_group('Prepare Charge') | |
| prepare_charge_group.add_argument('--pre-charge-current', | |
| metavar='A', required=True, | |
| action=pint_bind_unit(ureg.ampere), | |
| help='Charging current') | |
| prepare_charge_group.add_argument('--pre-charge-voltage', | |
| metavar='V', required=True, | |
| action=pint_bind_unit(ureg.volt), | |
| help='Charging voltage') | |
| prepare_charge_group.add_argument('--pre-cutoff-current', | |
| metavar='A', | |
| action=pint_bind_unit(ureg.ampere), | |
| default=ureg.parse_expression('100 mA'), | |
| help='Charge termination current') | |
| test_discharge_group = parser.add_argument_group('Test discharge') | |
| test_discharge_group.add_argument('--discharge-current', | |
| metavar='A', required=True, | |
| action=pint_bind_unit(ureg.ampere), | |
| help='Discharging current') | |
| test_discharge_group.add_argument('--discharge-cutoff-voltage', | |
| metavar='V', required=True, | |
| action=pint_bind_unit(ureg.volt), | |
| help='Discharge cutoff voltage') | |
| test_discharge_group.add_argument('--discharge-time-limit', | |
| metavar='T', | |
| action=pint_bind_unit(ureg.minute), | |
| default=0 * ureg.second, | |
| help='Discharge time limit') | |
| post_charge_group = parser.add_argument_group('Post charge') | |
| post_charge_group.add_argument('--post-charge-current', | |
| metavar='A', required=True, | |
| action=pint_bind_unit(ureg.ampere), | |
| help='Charging current') | |
| post_charge_group.add_argument('--post-charge-voltage', | |
| metavar='V', required=True, | |
| action=pint_bind_unit(ureg.volt), | |
| help='Charging voltage') | |
| post_charge_group.add_argument('--post-cutoff-current', | |
| metavar='T', required=True, | |
| action=pint_bind_unit(ureg.ampere), | |
| help='Charging termination current') | |
| parser.set_defaults(cmd=cmd_cycle) | |
| parser.set_defaults(need_serial=True) | |
| return parser | |
| def add_monitor_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: | |
| """ | |
| Add arguments for the 'monitor' CLI command. | |
| """ | |
| parser.set_defaults(cmd=cmd_monitor) | |
| parser.set_defaults(need_serial=True) | |
| return parser | |
| def add_decode_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: | |
| """ | |
| Add arguments for the 'decode' CLI command. | |
| """ | |
| parser.add_argument('packets', metavar='PKT', nargs='+', | |
| help='Hexlified packets to decode') | |
| parser.set_defaults(cmd=cmd_decode) | |
| parser.set_defaults(need_serial=False) | |
| parser.set_defaults(transmit=False) | |
| return parser | |
| def main(): | |
| """ | |
| Main CLI function. | |
| """ | |
| parser = argparse.ArgumentParser('ebc-a20', formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
| description='CLI for the ZKETECH EBC-A20 Charger/Dummy Load') | |
| parser.add_argument('--port', type=str, default='/dev/ttyUSB0', | |
| help='Serial port to use') | |
| parser.add_argument('--loglevel', choices=LOG_LEVEL_NAMES, default='INFO', | |
| help='Change log level') | |
| parser.add_argument('--transmit', action=argparse.BooleanOptionalAction, default=True, | |
| help='Transmit commands to the charger') | |
| parser.add_argument('--output-prefix', metavar='PATH', default=None, | |
| type=output_data_path, help='Output path prefix for charge/discharge data') | |
| parser.add_argument('--save-curves', action=argparse.BooleanOptionalAction, default=True, | |
| help='Save charge/discharge curves as .csv') | |
| parser.set_defaults(need_serial=False) | |
| subparsers = parser.add_subparsers(help='sub-commands help') | |
| charge_parser = subparsers.add_parser('charge', help='Charge a cell', formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
| epilog="All values need to specify units, for example '0.5A', '1500mA' or '4.2V'") | |
| add_charge_args(charge_parser) | |
| discharge_parser = subparsers.add_parser('discharge', help='Discharge a cell', formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
| epilog="All values need to specify units, for example '0.5A', '1500mA' or '4.2V'") | |
| add_discharge_args(discharge_parser) | |
| cycle_parser = subparsers.add_parser('cycle', formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
| help='Charge-Discharge-Charge capacity measurement cycle', | |
| epilog="All values need to specify units, for example '0.5A', '1500mA' or '4.2V'") | |
| add_cycle_args(cycle_parser) | |
| monitor_parser = subparsers.add_parser('monitor', | |
| help='Monitor and log the charger state') | |
| add_monitor_args(monitor_parser) | |
| decode_parser = subparsers.add_parser('decode', help='Test packet decoder') | |
| add_decode_args(decode_parser) | |
| args = parser.parse_args() | |
| structlog.configure( | |
| wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, args.loglevel)) | |
| ) | |
| log.debug('arguments', args=args) | |
| try: | |
| if args.need_serial: | |
| s = serial.Serial(port=args.port, | |
| baudrate=9600, | |
| bytesize=serial.EIGHTBITS, | |
| parity=serial.PARITY_ODD) | |
| else: | |
| s = None | |
| if args.transmit and s is not None: | |
| pkt = ctl_packet(CommandCode.CONNECT) | |
| log.info('transmit control packet', pkt=hexlify(pkt)) | |
| s.write(pkt) | |
| if hasattr(args, 'cmd'): | |
| summary_data = args.cmd(args, s=s if args.need_serial else None) | |
| if args.output_prefix: | |
| args.output_prefix.with_suffix('.json').write_text(json.dumps(summary_data)) | |
| else: | |
| log.fatal('command needs to be specified') | |
| except KeyboardInterrupt: | |
| pass | |
| finally: | |
| if args.transmit: | |
| pkt = ctl_packet(CommandCode.STOP) | |
| log.info('transmit control packet', pkt=hexlify(pkt)) | |
| s.write(pkt) | |
| pkt = ctl_packet(CommandCode.DISCONNECT) | |
| log.info('transmit control packet', pkt=hexlify(pkt)) | |
| s.write(pkt) | |
| if __name__ == "__main__": | |
| main() |
Update:
./zketech-ebc-a20.py cycle --pre-charge-current 5A
has to be used. Perhaps the --help option could be a more clear about this.
Thanks a lot for the script!
Update:
./zketech-ebc-a20.py cycle --pre-charge-current 5Ahas to be used. Perhaps the--helpoption could be a more clear about this.Thanks a lot for the script!
The --help option does indicate this, please note:
╰─❯ venv/bin/python3 zketech-ebc-a20.py cycle --help
usage: ebc-a20 cycle [-h] [--num-cycles NUM_CYCLES] --pre-charge-current A --pre-charge-voltage V [--pre-cutoff-current A] --discharge-current A
--discharge-cutoff-voltage V [--discharge-time-limit T] --post-charge-current A --post-charge-voltage V [--post-cutoff-current T]
options:
-h, --help show this help message and exit
--num-cycles NUM_CYCLES
Number of measurement test cycles
The --pre-chage-current and --pre-charge-voltage are not in square brackets which is the convention to indicate that those options are required.
Thanks for the fast reaction!
But there is a missunderstanding. I noticed that this options are required. I was talking about the fact, that the unit is required: --pre-charge-current 5A instead of --pre-charge-current 5. (At least for me it was not obviously, that I have to use a unit.)
Thanks for the fast reaction!
But there is a missunderstanding. I noticed that this options are required. I was talking about the fact, that the unit is required:
--pre-charge-current 5Ainstead of--pre-charge-current 5. (At least for me it was not obviously, that I have to use a unit.)
Right! I have added this information to the help text.
Thanks!
Another hint:
The options --cutoff-current in charge-mode and --post-cutoff-current in cycle-mode are marked as optional in --help:
$ ./zketech-ebc-a20.py charge --help
usage: ebc-a20 charge [-h] --current A --voltage V [--cutoff-current A]
[...]
and
$ ./zketech-ebc-a20.py cycle --help
usage: ebc-a20 cycle [-h] [--num-cycles NUM_CYCLES] --pre-charge-current A --pre-charge-voltage V
[--pre-cutoff-current A] --discharge-current A --discharge-cutoff-voltage V
[--discharge-time-limit T] --post-charge-current A --post-charge-voltage V
[--post-cutoff-current T]
[...]
But they are not optional. zketech-ebc_a20.py will fail, if they are not provided:
date 06:21:43 [info ] charge begin kwargs={'transmit': True, 'plot_filename': 'cell01-\
cycle-1-postcharge.csv', 'csv_writer': <csv.DictWriter object at 0xffffffffffff>} serial=Serial<id=0xfffffff\
fffff, open=True>(port='/dev/ttyUSB0', baudrate=9600, bytesize=8, parity='O', stopbits=1, timeout=None, xonx\
off=False, rtscts=False, dsrdtr=False) settings={'current': <Quantity(5, 'ampere')>, 'voltage': <Quantity(3.\
6, 'volt')>, 'cutoff_current': None}
date 06:21:43 [info ] transmit control packet pkt=b'fa0200000000000002f8'
date 06:21:43 [info ] transmit control packet pkt=b'fa0600000000000006f8'
Traceback (most recent call last):
File "/home/user/./zketech-ebc-a20.py", line 1249, in <module>
main()
~~~~^^
File "/home/user/./zketech-ebc-a20.py", line 1228, in main
summary_data = args.cmd(args, s=s if args.need_serial else None)
File "/home/user/./zketech-ebc-a20.py", line 938, in cmd_cycle
postcharge_summary_data = charge(s, settings={
'current': args.post_charge_current,
'voltage': args.post_charge_voltage,
'cutoff_current': args.post_cutoff_current
}, **kwargs)
File "/home/user/./zketech-ebc-a20.py", line 680, in charge
int( settings['cutoff_current'].to(ureg.milliampere).m / 10)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'to'
So they should be shown as mandatory in --help and zketech-ebc-a20.py should not start, if they are not provided IMHO.
I have fixed the --post-cutoff-current for the cycle command, it is now marked as required. But for the --cutoff-current a default value of 100mA is specified so this parameter is not required, see below:
$ zketech-ebc-a20.py --loglevel DEBUG charge --current 1A --voltage 2V
2026-03-10 23:01:50 [debug ] arguments args=Namespace(port='/dev/ttyUSB0', loglevel='DEBUG', transmit=True, output_prefix=None, save_curves=True, need_serial=True, current=<Quantity(1, 'ampere')>, voltage=<Quantity(2, 'volt')>, cutoff_current=<Quantity(100, 'milliampere')>, cmd=<function cmd_charge at 0x7f2adf323880>)
2026-03-10 23:01:50 [debug ] building control packet command=<CommandCode.STOP: 2> p1=0 p2=0 p3=0
2026-03-10 23:01:50 [debug ] control packet bytes pkt=b'fa0200000000000002f8'
Here it's not running. There is an AttributeError: