Last active
June 19, 2025 07:49
-
-
Save yipo/f29f53f493d1e64a3ff6e53bf3e1ae94 to your computer and use it in GitHub Desktop.
Modbus holding write with custom function code.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| *.pyc | |
| /.venv/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import struct | |
| import logging | |
| import argparse | |
| from pymodbus.pdu import ModbusRequest, ModbusResponse | |
| from pymodbus.client.sync import ModbusTcpClient as ModbusClient | |
| from pymodbus.client.sync import ModbusSerialClient | |
| logging.basicConfig() | |
| log = logging.getLogger() | |
| log.setLevel(logging.INFO) | |
| # --- Custom WRITE classes --- # | |
| class CustomWriteResponse(ModbusResponse): | |
| function_code = None # Will be set dynamically | |
| _rtu_frame_size = 0 | |
| def __init__(self, address=None, count=None, **kwargs): | |
| super().__init__(**kwargs) | |
| self.address = address | |
| self.count = count | |
| def encode(self): | |
| return struct.pack('>HH', self.address, self.count) | |
| def decode(self, data): | |
| self.address, self.count = struct.unpack('>HH', data) | |
| class CustomWriteRequest(ModbusRequest): | |
| _rtu_frame_size = 8 | |
| def __init__(self, function_code, address, values, **kwargs): | |
| super().__init__(**kwargs) | |
| self.function_code = function_code | |
| self.address = address | |
| self.values = values # list of values | |
| def encode(self): | |
| return struct.pack('>2HBH', self.address, 1, 2, self.values[0]) | |
| def decode(self, data): | |
| self.address = struct.unpack('>H', data[:2])[0] | |
| self.values = [struct.unpack('>H', data[i:i+2])[0] for i in range(2, len(data), 2)] | |
| def execute(self, context): | |
| # Server-side: not used in client example | |
| return CustomWriteResponse(self.address, len(self.values), function_code=self.function_code) | |
| # --- Custom READ classes --- # | |
| class CustomReadResponse(ModbusResponse): | |
| function_code = None # Will be set dynamically | |
| def __init__(self, values=None, **kwargs): | |
| super().__init__(**kwargs) | |
| self.values = values or [] | |
| def encode(self): | |
| # First byte: number of bytes, then all registers as >H | |
| result = struct.pack('B', len(self.values) * 2) | |
| for v in self.values: | |
| result += struct.pack('>H', v) | |
| return result | |
| def decode(self, data): | |
| byte_count = data[0] | |
| self.values = [] | |
| for i in range(1, byte_count+1, 2): | |
| self.values.append(struct.unpack('>H', data[i:i+2])[0]) | |
| class CustomReadRequest(ModbusRequest): | |
| _rtu_frame_size = 8 | |
| def __init__(self, function_code, address, count, **kwargs): | |
| super().__init__(**kwargs) | |
| self.function_code = function_code | |
| self.address = address | |
| self.count = count | |
| def encode(self): | |
| return struct.pack('>HH', self.address, self.count) | |
| def decode(self, data): | |
| self.address, self.count = struct.unpack('>HH', data) | |
| def execute(self, context): | |
| # Server-side: not used in client example | |
| values = [0] * self.count # Placeholder | |
| return CustomReadResponse(values=values, function_code=self.function_code) | |
| def parse_args(): | |
| parser = argparse.ArgumentParser( | |
| description="Modbus custom read/write tool.\n\n" | |
| "Example usage:\n" | |
| " TCP read: python modbus_custom.py --mode tcp --ip 34.81.214.117 --port 502" | |
| " --modbus_id 2 --function_code 3 --address 40125 --length 1\n" | |
| " TCP write: python modbus_custom.py --mode tcp --ip 34.81.214.117 --port 502" | |
| " --modbus_id 2 --function_code 6 --address 40125 --length 1 --value 100\n" | |
| " RTU read: python modbus_custom.py --mode rtu --serial /dev/ttyUSB0" | |
| " --baudrate 9600 --parity N --bytesize 8 --stopbits 1 --modbus_id 1" | |
| " --function_code 55 --address 100 --length 2\n" | |
| " RTU write: python modbus_custom.py --mode rtu --serial /dev/ttyUSB0" | |
| " --baudrate 9600 --parity N --bytesize 8 --stopbits 1 --modbus_id 1" | |
| " --function_code 42 --address 100 --length 2 --value 123,456\n", | |
| formatter_class=argparse.RawTextHelpFormatter | |
| ) | |
| parser.add_argument('--mode', choices=['tcp', 'rtu'], | |
| required=True, help='Connection mode: "tcp" or "rtu"') | |
| parser.add_argument('--ip', help='TCP server IP address') | |
| parser.add_argument('--port', type=int, help='TCP port or Serial port (for RTU)') | |
| parser.add_argument('--serial', help='Serial port device (for RTU)') | |
| parser.add_argument('--baudrate', type=int, default=9600, help='Serial baudrate (for RTU)') | |
| parser.add_argument('--parity', choices=['N', 'E', 'O'], | |
| default='N', help='Serial parity (N/E/O)') | |
| parser.add_argument('--bytesize', type=int, | |
| choices=[5, 6, 7, 8], default=8, help='Serial bytesize') | |
| parser.add_argument('--stopbits', type=int, choices=[1, 2], default=1, help='Serial stopbits') | |
| parser.add_argument('--modbus_id', type=int, required=True, help='Modbus slave ID') | |
| parser.add_argument('--function_code', type=int, required=True, help='Modbus function code') | |
| parser.add_argument('--address', type=int, required=True, help='Register address') | |
| parser.add_argument('--length', type=int, required=True, | |
| help='Number of registers to read/write') | |
| parser.add_argument( | |
| '--value', | |
| help='For write, single value or comma separated. Accepts dec or hex (e.g. 123,0x7B)') | |
| return parser.parse_args() | |
| def get_client(args): | |
| if args.mode == 'tcp': | |
| if not args.ip or not args.port: | |
| raise ValueError('TCP mode requires --ip and --port') | |
| return ModbusClient(host=args.ip, port=args.port) | |
| elif args.mode == 'rtu': | |
| if not args.serial: | |
| raise ValueError('RTU mode requires --serial') | |
| # port: serial port (e.g. COM3, /dev/ttyUSB0) | |
| return ModbusSerialClient( | |
| method='rtu', | |
| port=args.serial, | |
| baudrate=args.baudrate, | |
| parity=args.parity, | |
| bytesize=args.bytesize, | |
| stopbits=args.stopbits, | |
| timeout=1, | |
| ) | |
| def main(): | |
| args = parse_args() | |
| is_write = args.value is not None | |
| if is_write: | |
| if ',' in args.value: | |
| values = [int(v, 0) for v in args.value.split(',')] | |
| else: | |
| values = [int(args.value)] * args.length if args.length > 1 else [int(args.value)] | |
| client = get_client(args) | |
| try: | |
| client.connect() | |
| if is_write: | |
| CustomWriteResponse.function_code = args.function_code | |
| client.register(CustomWriteResponse) | |
| request = CustomWriteRequest( | |
| args.function_code, args.address, values, unit=args.modbus_id) | |
| else: | |
| CustomReadResponse.function_code = args.function_code | |
| client.register(CustomReadResponse) | |
| request = CustomReadRequest(args.function_code, args.address, | |
| args.length, unit=args.modbus_id) | |
| print('request:', client.framer.buildPacket(request).hex(), request.encode().hex()) | |
| response = client.execute(request) | |
| if is_write: | |
| print( | |
| f"Write Response: Address={getattr(response, 'address', None)}, " | |
| f"Count={getattr(response, 'count', None)}" | |
| ) | |
| else: | |
| print(f"Read Response: Values={getattr(response, 'values', None)}") | |
| except Exception as e: | |
| print(f"Modbus request failed: {e}") | |
| finally: | |
| client.close() | |
| if __name__ == '__main__': | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| pymodbus==2.3.0 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [flake8] | |
| max_line_length=100 | |
| [isort] | |
| line_length=100 | |
| multi_line_output=4 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment