Skip to content

Instantly share code, notes, and snippets.

@mouldybread
Last active March 7, 2025 22:49
Show Gist options
  • Select an option

  • Save mouldybread/dbd47313f1c197c2d6f22471fa5fbcbd to your computer and use it in GitHub Desktop.

Select an option

Save mouldybread/dbd47313f1c197c2d6f22471fa5fbcbd to your computer and use it in GitHub Desktop.
This Python script collects time synchronization statistics from the Chrony daemon using the chronyc command-line tool and publishes them to an MQTT broker. It daemonises itself, collects data every 60 seconds, and sends its log output to syslog. Requires python-daemon and paho-mqtt.
import subprocess
import paho.mqtt.client as mqtt
import json
import logging
import time
import daemon
import signal
from logging.handlers import SysLogHandler
# MQTT configuration
# Connection settings read by publish_to_mqtt(). The string values below are
# placeholders and must be replaced with real settings before deployment.
MQTT_BROKER = "your_mqtt_broker_address"  # host passed to client.connect()
MQTT_PORT = 1883  # port passed to client.connect()
MQTT_USERNAME = "your_mqtt_username"  # passed to client.username_pw_set()
MQTT_PASSWORD = "your_mqtt_password"  # passed to client.username_pw_set()
# Configure logging to system log
# All functions in this script log through this module-level 'ChronyMQTT'
# logger; records at INFO level and above are forwarded to the local syslog
# socket.
logger = logging.getLogger('ChronyMQTT')
logger.setLevel(logging.INFO)
# NOTE(review): the handler is created at import time, i.e. before the
# DaemonContext at the bottom of the script is entered -- confirm that its
# socket survives daemonisation.
handler = SysLogHandler(address='/dev/log')
# Each record is rendered as "<logger name>: <level> <message>".
formatter = logging.Formatter('%(name)s: %(levelname)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# Function to run chronyc commands and get output
def run_chronyc_command(command, timeout=10):
    """Run ``chronyc <command>`` and return its standard output.

    Args:
        command: The chronyc sub-command, passed to chronyc as a single
            argument (for example ``'tracking'`` or ``'sources'``).
        timeout: Maximum number of seconds to wait for chronyc to finish.
            ``None`` waits indefinitely.

    Returns:
        The captured standard output as text, or ``None`` when chronyc could
        not be started, timed out, or exited with a non-zero status. Every
        failure is logged.
    """
    try:
        result = subprocess.run(
            ['chronyc', command],
            capture_output=True,
            text=True,
            check=True,
            timeout=timeout,
        )
    # SubprocessError covers both CalledProcessError (non-zero exit) and
    # TimeoutExpired; OSError covers a missing or non-executable chronyc
    # binary. None of these should take the long-running daemon down.
    except (subprocess.SubprocessError, OSError) as e:
        logger.error(f"Error running chronyc command: {e}")
        return None
    return result.stdout
# Function to collect Chrony stats
def collect_chrony_stats():
    """Gather the output of ``chronyc tracking`` and ``chronyc sources``.

    Returns:
        A dict with two keys, or ``None`` when either command produced no
        output:

        * ``'tracking'`` -- dict mapping each ``name : value`` line of the
          tracking report to its value (both stripped of whitespace).
        * ``'sources'`` -- list of the non-blank, stripped lines of the
          sources report after its first two lines.
    """
    tracking_text = run_chronyc_command('tracking')
    sources_text = run_chronyc_command('sources')
    if not (tracking_text and sources_text):
        return None

    # Split every tracking line on its first colon; lines without a colon
    # yield an empty separator and are dropped.
    split_rows = (row.partition(':') for row in tracking_text.splitlines())
    parsed_tracking = {
        name.strip(): val.strip()
        for name, colon, val in split_rows
        if colon
    }

    # Skip the header lines
    stripped_rows = (row.strip() for row in sources_text.splitlines()[2:])
    source_rows = [row for row in stripped_rows if row]

    return {'tracking': parsed_tracking, 'sources': source_rows}
# Function to publish stats to MQTT
def publish_to_mqtt(stats, is_initial_startup=False):
    """Publish every tracking statistic to its own MQTT topic.

    Each entry of ``stats['tracking']`` is published to
    ``chrony/tracking/<key>``, with spaces in the key replaced by
    underscores. A fresh client connection is opened and closed on every
    call.

    Args:
        stats: Dict containing a ``'tracking'`` mapping of statistic name to
            value, as returned by collect_chrony_stats().
        is_initial_startup: When true, the log line for each published value
            is suffixed with "on startup". It does not change what is
            published.

    Returns:
        None. A failure to connect to the broker is logged and the call
        returns without publishing, so the caller's update loop keeps running.
    """
    # NOTE(review): paho-mqtt 2.x changed the Client constructor (callback API
    # version argument) -- confirm the installed version accepts this call.
    client = mqtt.Client()
    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    try:
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
    except OSError as e:
        # Broker unreachable, refused, DNS failure, ...: skip this cycle
        # instead of letting the exception terminate the daemon.
        logger.error(f"Failed to connect to MQTT broker {MQTT_BROKER}:{MQTT_PORT}: {e}")
        return

    log_suffix = " on startup" if is_initial_startup else ""
    client.loop_start()
    try:
        for key, value in stats['tracking'].items():
            topic = f"chrony/tracking/{key.replace(' ', '_')}"
            client.publish(topic, value)
            logger.info(f"Published {key}: {value} to {topic}{log_suffix}")
    finally:
        # Always stop the network thread and close the connection, even if
        # publishing raised, so repeated calls do not leak threads/sockets.
        client.loop_stop()
        client.disconnect()
# Main function to run as a daemon
def main():
    """Collect Chrony stats and publish them to MQTT once a minute, forever.

    The first cycle runs immediately and is flagged as the startup publish;
    every later cycle runs 60 seconds after the previous one finished. A
    failed collection is logged and retried on the next cycle. Unexpected
    errors inside a cycle are logged and do not stop the loop, because a
    daemon that dies silently (stderr is detached) is hard to diagnose.
    Termination signals still end the process: they raise SystemExit, which
    is not caught here.
    """
    is_initial_startup = True
    while True:
        try:
            chrony_stats = collect_chrony_stats()
            if chrony_stats:
                # Publish all topics on startup and on every later cycle
                publish_to_mqtt(chrony_stats, is_initial_startup=is_initial_startup)
            elif is_initial_startup:
                logger.error("Failed to collect Chrony stats on startup")
            else:
                logger.error("Failed to collect Chrony stats")
        except Exception:
            # Top-level boundary of the daemon: record the traceback in
            # syslog and carry on with the next cycle.
            logger.exception("Unexpected error during Chrony stats update")
        is_initial_startup = False
        time.sleep(60)  # Wait for 60 seconds before the next update
# Daemon context
# Detach from the terminal and run main() in the background. SIGTERM and
# SIGHUP are both mapped to DaemonContext's 'terminate' handler.
#
# DaemonContext closes every open file descriptor when it is entered. The
# syslog handler's socket was opened at import time, so it must be listed in
# files_preserve; otherwise its descriptor number can be reused by another
# socket or pipe and log records end up written to the wrong place.
with daemon.DaemonContext(
    files_preserve=[handler.socket.fileno()],
    signal_map={
        signal.SIGTERM: 'terminate',
        signal.SIGHUP: 'terminate',
    },
):
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment