Last active
December 1, 2025 08:41
-
-
Save yipo/cbab9d260736be30a3d06019a0534c2e to your computer and use it in GitHub Desktop.
Write Modbus holding registers (using a custom function code).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| *.pyc | |
| /.venv/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import logging | |
| from operator import itemgetter | |
| from pymodbus.client.sync import ModbusSerialClient, ModbusTcpClient | |
| from pymodbus.register_write_message import WriteMultipleRegistersRequest, WriteMultipleRegistersResponse, \ | |
| WriteSingleRegisterRequest, WriteSingleRegisterResponse | |
| logger = logging.getLogger(__name__) | |
| def main(): | |
| args = parse_args() | |
| client = create_client(args) | |
| try: | |
| if not client.connect(): | |
| raise ConnectionError('cannot connect to "{}".'.format(args.rtu or args.tcp)) | |
| response = write(client, args.slave_id, args.func, args.addr, args.data, | |
| 'single' if args.pdu_single else 'multiple') | |
| logger.info('response: %s', response) | |
| finally: | |
| client.close() | |
| def parse_args(): | |
| args = argparse.ArgumentParser() | |
| conn = args.add_mutually_exclusive_group(required=True) | |
| conn.add_argument( | |
| '--rtu', | |
| metavar='DEVICE', | |
| ) | |
| conn.add_argument( | |
| '--tcp', | |
| metavar='ADDRESS', | |
| ) | |
| config = (('BAUDRATE', 9600), ('BYTESIZE', 8), ('PARITY', 'N'), ('STOPBITS', 1)) | |
| args.add_argument( | |
| '--config', | |
| nargs=len(config), | |
| default=tuple(map(itemgetter(1), config)), | |
| metavar=tuple(map(itemgetter(0), config)), | |
| ) | |
| args.add_argument( | |
| '--port', | |
| default=502, | |
| type=int, | |
| ) | |
| args.add_argument( | |
| '--slave-id', | |
| type=int, | |
| required=True, | |
| ) | |
| args.add_argument( | |
| '--func', | |
| type=any_base_int, | |
| ) | |
| args.add_argument( | |
| '--addr', | |
| type=any_base_int, | |
| required=True, | |
| ) | |
| args.add_argument( | |
| '--data', | |
| nargs='+', | |
| type=any_base_int, | |
| required=True, | |
| ) | |
| args.add_argument( | |
| '--pdu-single', | |
| action='store_true', | |
| ) | |
| return args.parse_args() | |
| def any_base_int(value): | |
| return int(value, 0) | |
| def create_client(args): | |
| if args.rtu: | |
| baudrate, bytesize, parity, stopbits = args.config | |
| return ModbusSerialClient( | |
| 'rtu', | |
| port=args.rtu, | |
| baudrate=baudrate, | |
| bytesize=bytesize, | |
| parity=parity, | |
| stopbits=stopbits, | |
| ) | |
| if args.tcp: | |
| return ModbusTcpClient(args.tcp, args.port) | |
| raise ValueError('one of the connection types must be selected.') | |
| def write(client, slave_id, func, addr, data, pdu): | |
| if func is None: | |
| func = 0x10 if len(data) > 1 else 0x06 | |
| if func == 0x10: | |
| return client.write_registers(addr, data, unit=slave_id) | |
| if func == 0x06: | |
| return client.write_register(addr, data[0], unit=slave_id) | |
| params = dict(unit=slave_id, function_code=func, address=addr) | |
| if pdu != 'single': | |
| request = Request0(**params, values=data) | |
| Response0.function_code = func | |
| client.register(Response0) | |
| else: | |
| request = Request1(**params, value=data[0]) | |
| Response1.function_code = func | |
| client.register(Response1) | |
| return client.execute(request) | |
| class Request0(WriteMultipleRegistersRequest): | |
| def __init__(self, function_code, **kwargs): | |
| self.function_code = function_code | |
| super().__init__(**kwargs) | |
| class Response0(WriteMultipleRegistersResponse): | |
| pass | |
| class Request1(WriteSingleRegisterRequest): | |
| def __init__(self, function_code, **kwargs): | |
| self.function_code = function_code | |
| super().__init__(**kwargs) | |
| class Response1(WriteSingleRegisterResponse): | |
| pass | |
| if __name__ == '__main__': | |
| logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.DEBUG) | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [tool.yapf] | |
| column_limit = 120 | |
| continuation_align_style = 'fixed' | |
| [tool.isort] | |
| line_length = 120 | |
| multi_line_output = 2 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| pymodbus==2.3.0 | |
| pyserial==3.4 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment