Skip to content

Instantly share code, notes, and snippets.

@dennisheitmann
Last active February 25, 2026 17:29
Show Gist options
  • Select an option

  • Save dennisheitmann/a9bb2937545aa46e3fbbfcc50484b53c to your computer and use it in GitHub Desktop.

Select an option

Save dennisheitmann/a9bb2937545aa46e3fbbfcc50484b53c to your computer and use it in GitHub Desktop.
Get status information from Hytera RD985 DMR repeater via SNMP
# Hytera RD985 SNMP-to-MQTT Gateway
# ---------------------------------
# Python-based monitoring tool that polls real-time telemetry from a
# Hytera RD985 repeater via SNMP and optionally publishes the data to
# an MQTT broker.
# ---------------------------------
# Dennis Heitmann, DO7DH
# 2026-02-24
# ---------------------------------
import struct
import time
import warnings
import argparse
import paho.mqtt.client as mqtt
from pysnmp.hlapi import *
# --- Configuration ---
REPEATER_IP = "44.xxx.xxx.xxx"
REPEATER_PORT = 161
COMMUNITY = "public"
MQTT_SERVER = 'mqttserver'
MQTT_PORT = 1883
MQTT_USER = 'username'
MQTT_PW = 'password'
TOPIC_BASE = 'HAMNET/CA0LL/'
warnings.filterwarnings("ignore", category=SyntaxWarning)
# OID Map optimized for RD985 firmware
OIDS = {
"Model_Name": ".1.3.6.1.4.1.40297.1.2.4.1.0",
"Serial_No": ".1.3.6.1.4.1.40297.1.2.4.5.0",
"Firmware": ".1.3.6.1.4.1.40297.1.2.4.3.0",
"Hardware_Ver": ".1.3.6.1.4.1.40297.1.2.4.4.0",
"Callsign": ".1.3.6.1.4.1.40297.1.2.4.13.0",
"Channel_Name": ".1.3.6.1.4.1.40297.1.2.4.9.0",
"Channel_Index": ".1.3.6.1.4.1.40297.1.2.4.12.0",
"TX_Freq": ".1.3.6.1.4.1.40297.1.2.4.10.0",
"RX_Freq": ".1.3.6.1.4.1.40297.1.2.4.11.0",
"Voltage": ".1.3.6.1.4.1.40297.1.2.1.2.1.0",
"Temperature": ".1.3.6.1.4.1.40297.1.2.1.2.2.0",
"SWR": ".1.3.6.1.4.1.40297.1.2.1.2.4.0",
"TX_Fwd_Power": ".1.3.6.1.4.1.40297.1.2.1.2.5.0",
"TX_Ref_Power": ".1.3.6.1.4.1.40297.1.2.1.2.6.0",
"RSSI_TS1": ".1.3.6.1.4.1.40297.1.2.1.2.9.0",
"RSSI_TS2": ".1.3.6.1.4.1.40297.1.2.1.2.10.0",
"Alarm_Voltage": ".1.3.6.1.4.1.40297.1.2.1.1.1.0",
"Alarm_Temp": ".1.3.6.1.4.1.40297.1.2.1.1.2.0",
"Alarm_Fan": ".1.3.6.1.4.1.40297.1.2.1.1.3.0",
}
def decode_value(label, value):
try:
raw_bytes = value.asOctets()
except:
raw_bytes = str(value).encode()
# Strings (Force clean ASCII)
if label in ["Model_Name", "Serial_No", "Firmware", "Hardware_Ver", "Callsign", "Channel_Name"]:
return raw_bytes.decode('ascii', errors='ignore').strip()
# Floats (Telemetry)
if label in ["Voltage", "Temperature", "SWR", "TX_Fwd_Power", "TX_Ref_Power"]:
if len(raw_bytes) == 4:
raw_float = struct.unpack('<f', raw_bytes)[0]
# Power feels 2x too high, so apply the factor here:
if "Power" in label:
return round(raw_float / 2.0, 2) # Divide by 2
return round(raw_float, 2)
return 0.0
# Integers (Freq, RSSI, Alarms, Index)
try:
return int(str(value))
except:
return str(value)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--mqtt', action='store_true')
args = parser.parse_args()
client = None
if args.mqtt:
client = mqtt.Client()
client.username_pw_set(MQTT_USER, MQTT_PW)
try:
client.connect(MQTT_SERVER, MQTT_PORT, 60)
client.loop_start()
print("[*] MQTT Connected.")
except:
args.mqtt = False
print(f"[*] Monitoring {REPEATER_IP} (Adaptive: 4s - 32s)...")
current_sleep = 4
while True:
timestamp = time.strftime('%H:%M:%S')
print(f"\n--- Update: {timestamp} (Next poll: {current_sleep}s) ---")
active_signal = False
for label, oid in OIDS.items():
iterator = getCmd(
SnmpEngine(),
CommunityData(COMMUNITY, mpModel=0),
UdpTransportTarget((REPEATER_IP, REPEATER_PORT), timeout=1, retries=1),
ContextData(),
ObjectType(ObjectIdentity(oid))
)
error_indication, error_status, error_index, var_binds = next(iterator)
if error_indication:
print(f"[!] Error fetching {label}: {error_indication}")
continue
val = decode_value(label, var_binds[0][1])
# Logic to detect activity based on RSSI
if label in ["RSSI_TS1", "RSSI_TS2"]:
try:
if int(val) > -200:
active_signal = True
except (ValueError, TypeError):
pass
# Precise Print Logic
if "Freq" in label:
print(f"{label:15}: {val / 1_000_000:.4f} MHz")
elif "Power" in label:
print(f"{label:15}: {val} W")
elif label == "Voltage": # Only live Voltage gets 'V'
print(f"{label:15}: {val} V")
elif label == "Temperature": # Only live Temperature gets '°C'
print(f"{label:15}: {val} °C")
else:
# Alarms, RSSI, Names, and Indices print raw
print(f"{label:15}: {val}")
# MQTT Publish
if args.mqtt:
client.publish(f"{TOPIC_BASE}{label}", str(val))
# --- Adaptive Interval Logic ---
if active_signal:
current_sleep = 4
else:
current_sleep = min(current_sleep * 2, 32)
time.sleep(current_sleep)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nStopping...")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment