Last active
January 20, 2026 03:52
-
-
Save junka/6b60c1bed66b5d85977deb1b206f9eaf to your computer and use it in GitHub Desktop.
tegramonitor.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import re | |
| import json | |
| import subprocess | |
| import signal | |
| import sys | |
| import argparse | |
| from datetime import datetime | |
| # Global flag for graceful shutdown | |
| running = True | |
| def signal_handler(sig, frame): | |
| global running | |
| print("\n[INFO] Shutting down...", file=sys.stderr) | |
| running = False | |
| def parse_tegrastats_line(line): | |
| """Parse a single line of tegrastats output into a structured dict.""" | |
| data = {} | |
| # Optional timestamp at start (Jetson may or may not include it) | |
| ts_match = re.match(r'(\d{2}-\d{2}-\d{4} \d{2}:\d{2}:\d{2})', line) | |
| if ts_match: | |
| data['timestamp'] = ts_match.group(1) | |
| line = line[ts_match.end():].strip() | |
| else: | |
| data['timestamp'] = datetime.now().strftime("%m-%d-%Y %H:%M:%S") | |
| # RAM | |
| ram_match = re.search(r'RAM (\d+)/(\d+)MB $ lfb (\d+)x(\d+)MB $ ', line) | |
| if ram_match: | |
| data['ram'] = { | |
| 'used_mb': int(ram_match.group(1)), | |
| 'total_mb': int(ram_match.group(2)), | |
| 'lfb_chunks': int(ram_match.group(3)), | |
| 'lfb_chunk_size_mb': int(ram_match.group(4)) | |
| } | |
| # CPU cores | |
| cpu_match = re.search(r'CPU $ ([^ $ ]+) $ ', line) | |
| if cpu_match: | |
| cores = [] | |
| for core in cpu_match.group(1).split(','): | |
| parts = core.strip().split('@') | |
| if len(parts) == 2: | |
| util_str = parts[0].rstrip('%') | |
| freq_str = parts[1] | |
| if util_str.isdigit() and freq_str.isdigit(): | |
| cores.append({ | |
| 'util_percent': int(util_str), | |
| 'freq_mhz': int(freq_str) | |
| }) | |
| data['cpu_cores'] = cores | |
| # GPU (GR3D) | |
| gpu_match = re.search(r'GR3D_FREQ ([0-9]+)%(@ $ (\d+),(\d+) $ )?', line) | |
| if gpu_match: | |
| data['gpu'] = {'util_percent': int(gpu_match.group(1))} | |
| if gpu_match.group(3): | |
| data['gpu']['freq_mhz_min'] = int(gpu_match.group(3)) | |
| data['gpu']['freq_mhz_max'] = int(gpu_match.group(4)) | |
| # Accelerators: NVENC, NVDEC, VIC | |
| for acc in ['NVENC', 'NVDEC', 'VIC']: | |
| acc_match = re.search(rf'{acc} ([0-9]+)%@(\d+)', line) | |
| if acc_match: | |
| data[acc.lower()] = { | |
| 'util_percent': int(acc_match.group(1)), | |
| 'freq_mhz': int(acc_match.group(2)) | |
| } | |
| # Temperatures | |
| temp_pattern = r'([A-Za-z0-9_-]+)@([\d.]+)C' | |
| temps = {} | |
| for match in re.finditer(temp_pattern, line): | |
| sensor = match.group(1) | |
| try: | |
| temp = float(match.group(2)) | |
| temps[sensor] = temp | |
| except ValueError: | |
| continue | |
| if temps: | |
| data['temperatures_celsius'] = temps | |
| return data | |
| def format_top_like(data): | |
| """Format parsed data in a compact, top-like human-readable style.""" | |
| lines = [] | |
| # Header | |
| lines.append(f"[{data['timestamp']}]") | |
| lines.append("-" * 60) | |
| # RAM | |
| if 'ram' in data: | |
| ram = data['ram'] | |
| used, total = ram['used_mb'], ram['total_mb'] | |
| pct = used / total * 100 if total > 0 else 0 | |
| lines.append(f"RAM: {used}/{total} MB ({pct:.1f}%) | LFB: {ram['lfb_chunks']}x{ram['lfb_chunk_size_mb']}MB") | |
| # CPU | |
| if 'cpu_cores' in data: | |
| avg_util = sum(c['util_percent'] for c in data['cpu_cores']) / len(data['cpu_cores']) | |
| max_freq = max(c['freq_mhz'] for c in data['cpu_cores']) | |
| lines.append(f"CPU: Avg={avg_util:.1f}% | MaxFreq={max_freq} MHz") | |
| # Optional: show per-core if you want (commented out for brevity) | |
| # core_utils = ', '.join(f"C{i}:{c['util_percent']}%" for i, c in enumerate(data['cpu_cores'])) | |
| # lines.append(f" Cores: {core_utils}") | |
| # GPU | |
| if 'gpu' in data: | |
| gpu = data['gpu'] | |
| util = gpu['util_percent'] | |
| freq_info = f" @ {gpu.get('freq_mhz_min', '?')}~{gpu.get('freq_mhz_max', '?')} MHz" if 'freq_mhz_min' in gpu else "" | |
| lines.append(f"GPU: {util}%{freq_info}") | |
| # Accelerators | |
| for name, label in [('nvenc', 'NVENC'), ('nvdec', 'NVDEC'), ('vic', 'VIC')]: | |
| if name in data: | |
| acc = data[name] | |
| lines.append(f"{label}: {acc['util_percent']}% @ {acc['freq_mhz']} MHz") | |
| # Temperatures (show key ones) | |
| if 'temperatures_celsius' in data: | |
| temps = data['temperatures_celsius'] | |
| key_sensors = ['GPU', 'CPU', 'SOC0', 'tj'] | |
| temp_strs = [] | |
| for s in key_sensors: | |
| if s in temps: | |
| temp_strs.append(f"{s}: {temps[s]:.1f}°C") | |
| if temp_strs: | |
| lines.append("Temps: " + " | ".join(temp_strs)) | |
| return "\n".join(lines) | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Monitor Jetson system stats via tegrastats.") | |
| parser.add_argument('-j', '--json', action='store_true', help="Output in JSON format (one object per line)") | |
| args = parser.parse_args() | |
| signal.signal(signal.SIGINT, signal_handler) | |
| # Check if tegrastats exists | |
| try: | |
| subprocess.run(['tegrastats', '--help'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) | |
| except (FileNotFoundError, subprocess.CalledProcessError): | |
| print("[ERROR] 'tegrastats' not found. This script must run on a Jetson device.", file=sys.stderr) | |
| sys.exit(1) | |
| print("[INFO] Starting monitor... Press Ctrl+C to stop.", file=sys.stderr) | |
| if not args.json: | |
| print("\n" + "="*60) | |
| try: | |
| proc = subprocess.Popen( | |
| ['tegrastats'], | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| universal_newlines=True, | |
| bufsize=1 | |
| ) | |
| while running: | |
| line = proc.stdout.readline() | |
| if not line: | |
| if proc.poll() is not None: | |
| break | |
| continue | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| parsed = parse_tegrastats_line(line) | |
| if args.json: | |
| print(json.dumps(parsed)) | |
| sys.stdout.flush() | |
| else: | |
| print(format_top_like(parsed)) | |
| print("-" * 60) | |
| except Exception as e: | |
| print(f"[WARN] Parse error: {e}", file=sys.stderr) | |
| except KeyboardInterrupt: | |
| pass | |
| finally: | |
| proc.terminate() | |
| try: | |
| proc.wait(timeout=2) | |
| except subprocess.TimeoutExpired: | |
| proc.kill() | |
| if not args.json: | |
| print("\n[INFO] Monitor stopped.") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment