Last active
February 25, 2026 17:29
-
-
Save dennisheitmann/a9bb2937545aa46e3fbbfcc50484b53c to your computer and use it in GitHub Desktop.
Get status information from Hytera RD985 DMR repeater via SNMP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Hytera RD985 SNMP-to-MQTT Gateway | |
| # --------------------------------- | |
| # Python-based monitoring tool that polls real-time telemetry from a | |
| # Hytera RD985 repeater via SNMP and optionally publishes the data to | |
| # an MQTT broker. | |
| # --------------------------------- | |
| # Dennis Heitmann, DO7DH | |
| # 2026-02-24 | |
| # --------------------------------- | |
| import struct | |
| import time | |
| import warnings | |
| import argparse | |
| import paho.mqtt.client as mqtt | |
| from pysnmp.hlapi import * | |
# --- Configuration ---
# NOTE(review): presumably meant to silence SyntaxWarning noise coming from
# third-party dependencies -- confirm it is still needed.
warnings.filterwarnings("ignore", category=SyntaxWarning)

# SNMP endpoint of the repeater and the community string used for polling.
REPEATER_IP = "44.xxx.xxx.xxx"
REPEATER_PORT = 161
COMMUNITY = "public"

# MQTT broker endpoint and credentials (only used when --mqtt is given).
MQTT_SERVER = "mqttserver"
MQTT_PORT = 1883
MQTT_USER = "username"
MQTT_PW = "password"

# Every reading is published to TOPIC_BASE + <label>.
TOPIC_BASE = "HAMNET/CA0LL/"
# OID map optimized for RD985 firmware.
# All polled objects share one enterprise prefix; they are grouped here by the
# sub-tree they sit under so each entry only has to spell out its leaf number.
_OID_ROOT = ".1.3.6.1.4.1.40297.1.2"
_OID_SUBTREES = (
    # Sub-tree ".4": identity, channel and frequency objects.
    (
        f"{_OID_ROOT}.4",
        (
            ("Model_Name", 1),
            ("Serial_No", 5),
            ("Firmware", 3),
            ("Hardware_Ver", 4),
            ("Callsign", 13),
            ("Channel_Name", 9),
            ("Channel_Index", 12),
            ("TX_Freq", 10),
            ("RX_Freq", 11),
        ),
    ),
    # Sub-tree ".1.2": live readings (voltage, temperature, SWR, power, RSSI).
    (
        f"{_OID_ROOT}.1.2",
        (
            ("Voltage", 1),
            ("Temperature", 2),
            ("SWR", 4),
            ("TX_Fwd_Power", 5),
            ("TX_Ref_Power", 6),
            ("RSSI_TS1", 9),
            ("RSSI_TS2", 10),
        ),
    ),
    # Sub-tree ".1.1": alarm objects.
    (
        f"{_OID_ROOT}.1.1",
        (
            ("Alarm_Voltage", 1),
            ("Alarm_Temp", 2),
            ("Alarm_Fan", 3),
        ),
    ),
)

# label -> full scalar OID (".0" instance suffix); insertion order is the
# order in which the values are polled and printed.
OIDS = {
    label: f"{prefix}.{leaf}.0"
    for prefix, entries in _OID_SUBTREES
    for label, leaf in entries
}
# Labels whose payload is returned as stripped ASCII text.
_TEXT_LABELS = frozenset({
    "Model_Name", "Serial_No", "Firmware", "Hardware_Ver",
    "Callsign", "Channel_Name",
})
# Labels whose payload is unpacked as a 4-byte little-endian float.
_FLOAT_LABELS = frozenset({
    "Voltage", "Temperature", "SWR", "TX_Fwd_Power", "TX_Ref_Power",
})


def decode_value(label, value):
    """Convert one SNMP value into a plain Python value based on its label.

    Args:
        label: Key of the reading (one of the OID map labels); selects the
            decoding rule.
        value: The value object from the SNMP response. If it offers
            ``asOctets()`` the raw bytes are taken from there, otherwise
            ``str(value)`` is encoded and used instead.

    Returns:
        * ``str`` for text labels: ASCII-decoded (undecodable bytes dropped)
          and stripped of surrounding whitespace.
        * ``float`` for telemetry labels: the 4-byte little-endian float
          rounded to two decimals, or ``0.0`` when the payload is not exactly
          four bytes long. Labels containing "Power" are halved first.
        * ``int`` for every other label when ``str(value)`` parses as an
          integer, otherwise that string unchanged.
    """
    try:
        raw_bytes = value.asOctets()
    except Exception:
        # Not an octet-string-like object (or it refused to hand out bytes):
        # fall back to its textual form. Unlike a bare ``except`` this lets
        # KeyboardInterrupt / SystemExit propagate.
        raw_bytes = str(value).encode()

    # Strings (force clean ASCII)
    if label in _TEXT_LABELS:
        return raw_bytes.decode('ascii', errors='ignore').strip()

    # Floats (telemetry)
    if label in _FLOAT_LABELS:
        if len(raw_bytes) != 4:
            # Payload is not a single 32-bit float; report a neutral value.
            return 0.0
        raw_float = struct.unpack('<f', raw_bytes)[0]
        if "Power" in label:
            # Empirical correction kept from the original author: the raw
            # power reading appeared to be twice the real value.
            return round(raw_float / 2.0, 2)
        return round(raw_float, 2)

    # Integers (Freq, RSSI, Alarms, Index)
    text = str(value)
    try:
        return int(text)
    except (ValueError, TypeError):
        # Not numeric: hand the text back so the caller can still show it.
        return text
def _format_reading(label, val):
    """Return the console line for one decoded reading, with its unit."""
    # Frequencies are shown in MHz, but only when the value really is numeric:
    # decode_value() hands back a string when the agent's answer could not be
    # parsed, and dividing that would raise TypeError.
    if "Freq" in label and isinstance(val, (int, float)):
        return f"{label:15}: {val / 1_000_000:.4f} MHz"
    if "Power" in label:
        return f"{label:15}: {val} W"
    if label == "Voltage":  # Only live Voltage gets 'V'
        return f"{label:15}: {val} V"
    if label == "Temperature":  # Only live Temperature gets '°C'
        return f"{label:15}: {val} °C"
    # Alarms, RSSI, names, indices (and unparsable values) print raw
    return f"{label:15}: {val}"


def main():
    """Poll the repeater via SNMP forever and optionally publish to MQTT.

    Command line:
        --mqtt  Also publish every reading to ``TOPIC_BASE + label`` on the
                configured broker. If the broker cannot be reached the reason
                is printed and polling continues without MQTT.

    Each cycle fetches every entry of ``OIDS`` with an SNMPv1 GET, prints the
    decoded value and (if enabled) publishes it. The pause between cycles is
    adaptive: it resets to 4 s whenever an RSSI reading is above -200 and
    otherwise doubles up to a maximum of 32 s.

    The function only returns by exception (e.g. ``KeyboardInterrupt``); the
    MQTT connection is shut down cleanly on the way out.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--mqtt', action='store_true')
    args = parser.parse_args()

    # ``client`` stays None whenever publishing is disabled or unavailable.
    client = None
    if args.mqtt:
        client = mqtt.Client()
        client.username_pw_set(MQTT_USER, MQTT_PW)
        try:
            client.connect(MQTT_SERVER, MQTT_PORT, 60)
            client.loop_start()
            print("[*] MQTT Connected.")
        except Exception as exc:
            # MQTT is optional: report why it failed instead of hiding it,
            # then carry on with console output only.
            print(f"[!] MQTT connection failed, continuing without MQTT: {exc}")
            client = None

    print(f"[*] Monitoring {REPEATER_IP} (Adaptive: 4s - 32s)...")

    # One engine for the whole run instead of a new one per OID per poll.
    snmp_engine = SnmpEngine()
    current_sleep = 4
    try:
        while True:
            timestamp = time.strftime('%H:%M:%S')
            # NOTE: the interval shown here is the one in effect before this
            # cycle's adaptive update; the actual pause is decided after
            # polling, further down.
            print(f"\n--- Update: {timestamp} (Next poll: {current_sleep}s) ---")
            active_signal = False

            for label, oid in OIDS.items():
                iterator = getCmd(
                    snmp_engine,
                    CommunityData(COMMUNITY, mpModel=0),
                    UdpTransportTarget((REPEATER_IP, REPEATER_PORT), timeout=1, retries=1),
                    ContextData(),
                    ObjectType(ObjectIdentity(oid))
                )
                error_indication, error_status, error_index, var_binds = next(iterator)
                if error_indication:
                    # Transport-level problem (timeout, unreachable, ...).
                    print(f"[!] Error fetching {label}: {error_indication}")
                    continue
                if error_status:
                    # The agent answered, but with an SNMP error; the variable
                    # binding carries no usable value in that case.
                    print(f"[!] Error fetching {label}: {error_status.prettyPrint()}")
                    continue

                val = decode_value(label, var_binds[0][1])

                # Logic to detect activity based on RSSI
                if label in ("RSSI_TS1", "RSSI_TS2"):
                    try:
                        if int(val) > -200:
                            active_signal = True
                    except (ValueError, TypeError):
                        pass  # non-numeric RSSI simply does not count as activity

                print(_format_reading(label, val))

                # MQTT Publish
                if client is not None:
                    client.publish(f"{TOPIC_BASE}{label}", str(val))

            # --- Adaptive Interval Logic ---
            if active_signal:
                current_sleep = 4
            else:
                current_sleep = min(current_sleep * 2, 32)
            time.sleep(current_sleep)
    finally:
        # Runs on Ctrl+C or any other exit: close the broker session and stop
        # the background network loop started by loop_start().
        if client is not None:
            client.disconnect()
            client.loop_stop()
# Script entry point: start the gateway only when this file is executed
# directly, and turn Ctrl+C into a short farewell instead of a traceback.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nStopping...")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment