Skip to content

Instantly share code, notes, and snippets.

@yipo
Last active December 1, 2025 08:41
Show Gist options
  • Select an option

  • Save yipo/cbab9d260736be30a3d06019a0534c2e to your computer and use it in GitHub Desktop.

Select an option

Save yipo/cbab9d260736be30a3d06019a0534c2e to your computer and use it in GitHub Desktop.
Write Modbus holding registers (using a custom function code).
*.pyc
/.venv/
import argparse
import logging
from operator import itemgetter
from pymodbus.client.sync import ModbusSerialClient, ModbusTcpClient
from pymodbus.register_write_message import WriteMultipleRegistersRequest, WriteMultipleRegistersResponse, \
WriteSingleRegisterRequest, WriteSingleRegisterResponse
logger = logging.getLogger(__name__)
def main():
args = parse_args()
client = create_client(args)
try:
if not client.connect():
raise ConnectionError('cannot connect to "{}".'.format(args.rtu or args.tcp))
response = write(client, args.slave_id, args.func, args.addr, args.data,
'single' if args.pdu_single else 'multiple')
logger.info('response: %s', response)
finally:
client.close()
def parse_args():
args = argparse.ArgumentParser()
conn = args.add_mutually_exclusive_group(required=True)
conn.add_argument(
'--rtu',
metavar='DEVICE',
)
conn.add_argument(
'--tcp',
metavar='ADDRESS',
)
config = (('BAUDRATE', 9600), ('BYTESIZE', 8), ('PARITY', 'N'), ('STOPBITS', 1))
args.add_argument(
'--config',
nargs=len(config),
default=tuple(map(itemgetter(1), config)),
metavar=tuple(map(itemgetter(0), config)),
)
args.add_argument(
'--port',
default=502,
type=int,
)
args.add_argument(
'--slave-id',
type=int,
required=True,
)
args.add_argument(
'--func',
type=any_base_int,
)
args.add_argument(
'--addr',
type=any_base_int,
required=True,
)
args.add_argument(
'--data',
nargs='+',
type=any_base_int,
required=True,
)
args.add_argument(
'--pdu-single',
action='store_true',
)
return args.parse_args()
def any_base_int(value):
return int(value, 0)
def create_client(args):
if args.rtu:
baudrate, bytesize, parity, stopbits = args.config
return ModbusSerialClient(
'rtu',
port=args.rtu,
baudrate=baudrate,
bytesize=bytesize,
parity=parity,
stopbits=stopbits,
)
if args.tcp:
return ModbusTcpClient(args.tcp, args.port)
raise ValueError('one of the connection types must be selected.')
def write(client, slave_id, func, addr, data, pdu):
if func is None:
func = 0x10 if len(data) > 1 else 0x06
if func == 0x10:
return client.write_registers(addr, data, unit=slave_id)
if func == 0x06:
return client.write_register(addr, data[0], unit=slave_id)
params = dict(unit=slave_id, function_code=func, address=addr)
if pdu != 'single':
request = Request0(**params, values=data)
Response0.function_code = func
client.register(Response0)
else:
request = Request1(**params, value=data[0])
Response1.function_code = func
client.register(Response1)
return client.execute(request)
class Request0(WriteMultipleRegistersRequest):
def __init__(self, function_code, **kwargs):
self.function_code = function_code
super().__init__(**kwargs)
class Response0(WriteMultipleRegistersResponse):
pass
class Request1(WriteSingleRegisterRequest):
def __init__(self, function_code, **kwargs):
self.function_code = function_code
super().__init__(**kwargs)
class Response1(WriteSingleRegisterResponse):
pass
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.DEBUG)
main()
[tool.yapf]
column_limit = 120
continuation_align_style = 'fixed'
[tool.isort]
line_length = 120
multi_line_output = 2
pymodbus==2.3.0
pyserial==3.4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment