Last active
March 7, 2025 22:49
-
-
Save mouldybread/dbd47313f1c197c2d6f22471fa5fbcbd to your computer and use it in GitHub Desktop.
This Python script collects time synchronization statistics from the Chrony daemon using the chronyc command-line tool and publishes these stats to an MQTT broker. It daemonises itself and collects data every 60 seconds; log output is sent to syslog. Requires python-daemon and paho-mqtt.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import os
import signal
import subprocess
import time
from logging.handlers import SysLogHandler

import daemon
import paho.mqtt.client as mqtt
# MQTT configuration
# Each setting may be overridden by an environment variable of the same name,
# so broker credentials do not have to be stored in this file. The literals
# below are the placeholder defaults used when the variable is not set.
MQTT_BROKER = os.environ.get("MQTT_BROKER", "your_mqtt_broker_address")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USERNAME = os.environ.get("MQTT_USERNAME", "your_mqtt_username")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD", "your_mqtt_password")
# Configure logging to system log
# The handler writes to the local syslog socket; every record is prefixed
# with the logger name and level.
formatter = logging.Formatter('%(name)s: %(levelname)s %(message)s')
handler = SysLogHandler(address='/dev/log')
handler.setFormatter(formatter)

logger = logging.getLogger('ChronyMQTT')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# Function to run chronyc commands and get output
def run_chronyc_command(command, timeout=10):
    """Run ``chronyc <command>`` and return its standard output.

    Args:
        command: The chronyc sub-command, passed to chronyc as a single
            argument (for example ``'tracking'`` or ``'sources'``).
        timeout: Maximum number of seconds to wait for chronyc to finish.

    Returns:
        The captured stdout as text, or ``None`` if chronyc could not be
        started, exited with a non-zero status, or timed out. The failure
        is written to the log.
    """
    try:
        result = subprocess.run(
            ['chronyc', command],
            capture_output=True,
            text=True,
            check=True,
            timeout=timeout,
        )
    except (subprocess.SubprocessError, OSError) as e:
        # SubprocessError covers CalledProcessError and TimeoutExpired;
        # OSError covers a missing or non-executable chronyc binary. Either
        # way the caller gets None instead of an exception.
        logger.error(f"Error running chronyc command: {e}")
        return None
    return result.stdout
# Helpers and entry point for collecting Chrony stats
def _parse_tracking(output):
    """Parse ``chronyc tracking`` text into a dict of stripped key/value strings."""
    stats = {}
    for line in output.splitlines():
        # Split on the first colon only; values may contain colons themselves.
        key, sep, value = line.partition(':')
        if sep:
            stats[key.strip()] = value.strip()
    return stats


def _parse_sources(output):
    """Return the stripped, non-blank data rows of ``chronyc sources`` text."""
    lines = output.splitlines()
    # Data rows follow the "====" separator line. Some chrony versions print
    # an extra line before the column header, so locate the separator instead
    # of assuming a fixed header height; fall back to two header lines when
    # no separator is present.
    first_row = 2
    for index, line in enumerate(lines):
        if line.startswith('==='):
            first_row = index + 1
            break
    return [line.strip() for line in lines[first_row:] if line.strip()]


def collect_chrony_stats():
    """Collect the current tracking and sources statistics from chronyc.

    Returns:
        A dict with two keys, or ``None`` if either chronyc command failed
        or produced no output:

        * ``'tracking'`` -- dict mapping each field name of
          ``chronyc tracking`` to its value, both as stripped strings.
        * ``'sources'`` -- list of the stripped data rows of
          ``chronyc sources`` (header lines removed).
    """
    tracking_output = run_chronyc_command('tracking')
    sources_output = run_chronyc_command('sources')
    if not (tracking_output and sources_output):
        return None
    return {
        'tracking': _parse_tracking(tracking_output),
        'sources': _parse_sources(sources_output),
    }
# Function to publish stats to MQTT
def publish_to_mqtt(stats, is_initial_startup=False):
    """Publish every tracking statistic to its own MQTT topic.

    A new client connection is opened for each call and closed again before
    returning. Each entry of ``stats['tracking']`` is published to
    ``chrony/tracking/<key>`` with spaces in the key replaced by underscores.

    Args:
        stats: Dict with a ``'tracking'`` key mapping field names to values.
        is_initial_startup: When true, the log line for each published value
            is suffixed with " on startup"; the published data is the same.

    Raises:
        Whatever the MQTT client raises while connecting or publishing; the
        network loop is still stopped and the client disconnected once the
        connection has been established.
    """
    # paho-mqtt 2.x expects the callback API version to be passed explicitly,
    # while 1.x does not define the enum at all. No callbacks are registered
    # here, so the chosen version only serves to satisfy the constructor.
    api_versions = getattr(mqtt, 'CallbackAPIVersion', None)
    if api_versions is not None:
        client = mqtt.Client(api_versions.VERSION2)
    else:
        client = mqtt.Client()
    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    client.loop_start()
    try:
        log_suffix = " on startup" if is_initial_startup else ""
        for key, value in stats['tracking'].items():
            topic = f"chrony/tracking/{key.replace(' ', '_')}"
            client.publish(topic, value)
            logger.info(f"Published {key}: {value} to {topic}{log_suffix}")
    finally:
        # Always release the network thread and the socket, even if a
        # publish call raised part-way through.
        client.loop_stop()
        client.disconnect()
# Main function to run as a daemon
def main():
    """Collect Chrony stats and publish them to MQTT every 60 seconds, forever.

    The first cycle is treated as the startup publication; later cycles are
    regular updates. A failure in one cycle (chronyc unavailable, broker
    unreachable, ...) is logged and the loop carries on with the next cycle
    instead of terminating the daemon.
    """
    is_initial_startup = True
    while True:
        try:
            chrony_stats = collect_chrony_stats()
            if chrony_stats:
                publish_to_mqtt(chrony_stats, is_initial_startup=is_initial_startup)
            elif is_initial_startup:
                logger.error("Failed to collect Chrony stats on startup")
            else:
                logger.error("Failed to collect Chrony stats")
        except Exception:
            # Top-level boundary of the daemon loop: record the traceback and
            # keep running. SystemExit/KeyboardInterrupt are not caught here,
            # so termination requests still stop the process.
            logger.exception("Unexpected error while collecting or publishing Chrony stats")
        is_initial_startup = False
        time.sleep(60)  # Wait for 60 seconds before the next update
# Daemon context
# Only daemonise when executed as a script, so that importing this module
# does not detach the importing process.
if __name__ == "__main__":
    with daemon.DaemonContext(
        # DaemonContext closes all open file descriptors when it detaches.
        # Preserve the syslog socket that was opened when the log handler was
        # created, so its descriptor is neither closed nor reused by a later
        # socket while the handler still writes to it.
        files_preserve=[handler.socket.fileno()],
        signal_map={
            signal.SIGTERM: 'terminate',
            signal.SIGHUP: 'terminate',
        },
    ):
        main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment