Skip to content

Instantly share code, notes, and snippets.

@yipo
Last active June 19, 2025 07:49
Show Gist options
  • Select an option

  • Save yipo/f29f53f493d1e64a3ff6e53bf3e1ae94 to your computer and use it in GitHub Desktop.

Select an option

Save yipo/f29f53f493d1e64a3ff6e53bf3e1ae94 to your computer and use it in GitHub Desktop.
Modbus holding write with custom function code.
*.pyc
/.venv/
import struct
import logging
import argparse
from pymodbus.pdu import ModbusRequest, ModbusResponse
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from pymodbus.client.sync import ModbusSerialClient
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)
# --- Custom WRITE classes --- #
class CustomWriteResponse(ModbusResponse):
function_code = None # Will be set dynamically
_rtu_frame_size = 0
def __init__(self, address=None, count=None, **kwargs):
super().__init__(**kwargs)
self.address = address
self.count = count
def encode(self):
return struct.pack('>HH', self.address, self.count)
def decode(self, data):
self.address, self.count = struct.unpack('>HH', data)
class CustomWriteRequest(ModbusRequest):
_rtu_frame_size = 8
def __init__(self, function_code, address, values, **kwargs):
super().__init__(**kwargs)
self.function_code = function_code
self.address = address
self.values = values # list of values
def encode(self):
return struct.pack('>2HBH', self.address, 1, 2, self.values[0])
def decode(self, data):
self.address = struct.unpack('>H', data[:2])[0]
self.values = [struct.unpack('>H', data[i:i+2])[0] for i in range(2, len(data), 2)]
def execute(self, context):
# Server-side: not used in client example
return CustomWriteResponse(self.address, len(self.values), function_code=self.function_code)
# --- Custom READ classes --- #
class CustomReadResponse(ModbusResponse):
function_code = None # Will be set dynamically
def __init__(self, values=None, **kwargs):
super().__init__(**kwargs)
self.values = values or []
def encode(self):
# First byte: number of bytes, then all registers as >H
result = struct.pack('B', len(self.values) * 2)
for v in self.values:
result += struct.pack('>H', v)
return result
def decode(self, data):
byte_count = data[0]
self.values = []
for i in range(1, byte_count+1, 2):
self.values.append(struct.unpack('>H', data[i:i+2])[0])
class CustomReadRequest(ModbusRequest):
_rtu_frame_size = 8
def __init__(self, function_code, address, count, **kwargs):
super().__init__(**kwargs)
self.function_code = function_code
self.address = address
self.count = count
def encode(self):
return struct.pack('>HH', self.address, self.count)
def decode(self, data):
self.address, self.count = struct.unpack('>HH', data)
def execute(self, context):
# Server-side: not used in client example
values = [0] * self.count # Placeholder
return CustomReadResponse(values=values, function_code=self.function_code)
def parse_args():
parser = argparse.ArgumentParser(
description="Modbus custom read/write tool.\n\n"
"Example usage:\n"
" TCP read: python modbus_custom.py --mode tcp --ip 34.81.214.117 --port 502"
" --modbus_id 2 --function_code 3 --address 40125 --length 1\n"
" TCP write: python modbus_custom.py --mode tcp --ip 34.81.214.117 --port 502"
" --modbus_id 2 --function_code 6 --address 40125 --length 1 --value 100\n"
" RTU read: python modbus_custom.py --mode rtu --serial /dev/ttyUSB0"
" --baudrate 9600 --parity N --bytesize 8 --stopbits 1 --modbus_id 1"
" --function_code 55 --address 100 --length 2\n"
" RTU write: python modbus_custom.py --mode rtu --serial /dev/ttyUSB0"
" --baudrate 9600 --parity N --bytesize 8 --stopbits 1 --modbus_id 1"
" --function_code 42 --address 100 --length 2 --value 123,456\n",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('--mode', choices=['tcp', 'rtu'],
required=True, help='Connection mode: "tcp" or "rtu"')
parser.add_argument('--ip', help='TCP server IP address')
parser.add_argument('--port', type=int, help='TCP port or Serial port (for RTU)')
parser.add_argument('--serial', help='Serial port device (for RTU)')
parser.add_argument('--baudrate', type=int, default=9600, help='Serial baudrate (for RTU)')
parser.add_argument('--parity', choices=['N', 'E', 'O'],
default='N', help='Serial parity (N/E/O)')
parser.add_argument('--bytesize', type=int,
choices=[5, 6, 7, 8], default=8, help='Serial bytesize')
parser.add_argument('--stopbits', type=int, choices=[1, 2], default=1, help='Serial stopbits')
parser.add_argument('--modbus_id', type=int, required=True, help='Modbus slave ID')
parser.add_argument('--function_code', type=int, required=True, help='Modbus function code')
parser.add_argument('--address', type=int, required=True, help='Register address')
parser.add_argument('--length', type=int, required=True,
help='Number of registers to read/write')
parser.add_argument(
'--value',
help='For write, single value or comma separated. Accepts dec or hex (e.g. 123,0x7B)')
return parser.parse_args()
def get_client(args):
if args.mode == 'tcp':
if not args.ip or not args.port:
raise ValueError('TCP mode requires --ip and --port')
return ModbusClient(host=args.ip, port=args.port)
elif args.mode == 'rtu':
if not args.serial:
raise ValueError('RTU mode requires --serial')
# port: serial port (e.g. COM3, /dev/ttyUSB0)
return ModbusSerialClient(
method='rtu',
port=args.serial,
baudrate=args.baudrate,
parity=args.parity,
bytesize=args.bytesize,
stopbits=args.stopbits,
timeout=1,
)
def main():
args = parse_args()
is_write = args.value is not None
if is_write:
if ',' in args.value:
values = [int(v, 0) for v in args.value.split(',')]
else:
values = [int(args.value)] * args.length if args.length > 1 else [int(args.value)]
client = get_client(args)
try:
client.connect()
if is_write:
CustomWriteResponse.function_code = args.function_code
client.register(CustomWriteResponse)
request = CustomWriteRequest(
args.function_code, args.address, values, unit=args.modbus_id)
else:
CustomReadResponse.function_code = args.function_code
client.register(CustomReadResponse)
request = CustomReadRequest(args.function_code, args.address,
args.length, unit=args.modbus_id)
print('request:', client.framer.buildPacket(request).hex(), request.encode().hex())
response = client.execute(request)
if is_write:
print(
f"Write Response: Address={getattr(response, 'address', None)}, "
f"Count={getattr(response, 'count', None)}"
)
else:
print(f"Read Response: Values={getattr(response, 'values', None)}")
except Exception as e:
print(f"Modbus request failed: {e}")
finally:
client.close()
if __name__ == '__main__':
main()
[flake8]
max_line_length=100
[isort]
line_length=100
multi_line_output=4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment