Skip to content

Instantly share code, notes, and snippets.

@junka
Last active January 20, 2026 03:52
Show Gist options
  • Select an option

  • Save junka/6b60c1bed66b5d85977deb1b206f9eaf to your computer and use it in GitHub Desktop.

Select an option

Save junka/6b60c1bed66b5d85977deb1b206f9eaf to your computer and use it in GitHub Desktop.
tegramonitor.py
#!/usr/bin/env python3
import re
import json
import subprocess
import signal
import sys
import argparse
from datetime import datetime
# Global flag for graceful shutdown: the monitor loop keeps reading while this
# is True, and the signal handler below clears it.
running = True


def signal_handler(sig, frame):
    """Request a graceful shutdown of the monitor loop.

    Has the ``(signum, frame)`` signature that ``signal.signal`` expects of a
    handler; neither argument is used.

    Args:
        sig: Number of the signal that was delivered.
        frame: Stack frame that was executing when the signal arrived.
    """
    global running
    print("\n[INFO] Shutting down...", file=sys.stderr)
    running = False
def parse_tegrastats_line(line):
    """Parse a single line of tegrastats output into a structured dict.

    Args:
        line: One line of text as printed by ``tegrastats``.

    Returns:
        A dict that always contains ``'timestamp'`` (taken from the line when
        it starts with ``MM-DD-YYYY HH:MM:SS``, otherwise the current local
        time in the same format). For every section recognised in the line it
        additionally contains:

        * ``'ram'``: ``used_mb``, ``total_mb``, ``lfb_chunks``,
          ``lfb_chunk_size_mb``.
        * ``'cpu_cores'``: list of ``{'util_percent', 'freq_mhz'}`` dicts, one
          per core entry of the form ``N%@F``; other entries (such as ``off``)
          are skipped, so the list may be empty.
        * ``'gpu'``: ``util_percent`` and, when the ``@[a,b]`` suffix is
          present, ``freq_mhz_min`` / ``freq_mhz_max``.
        * ``'nvenc'`` / ``'nvdec'`` / ``'vic'``: ``util_percent``, ``freq_mhz``.
        * ``'temperatures_celsius'``: mapping of sensor name to float.
    """
    data = {}

    # Optional timestamp at start (Jetson may or may not include it)
    ts_match = re.match(r'(\d{2}-\d{2}-\d{4} \d{2}:\d{2}:\d{2})', line)
    if ts_match:
        data['timestamp'] = ts_match.group(1)
        line = line[ts_match.end():].strip()
    else:
        data['timestamp'] = datetime.now().strftime("%m-%d-%Y %H:%M:%S")

    # RAM, e.g. "RAM 2345/7846MB (lfb 1x4MB)". The parentheses are literal
    # characters in the output and therefore must be escaped.
    ram_match = re.search(r'RAM (\d+)/(\d+)MB \(lfb (\d+)x(\d+)MB\)', line)
    if ram_match:
        data['ram'] = {
            'used_mb': int(ram_match.group(1)),
            'total_mb': int(ram_match.group(2)),
            'lfb_chunks': int(ram_match.group(3)),
            'lfb_chunk_size_mb': int(ram_match.group(4)),
        }

    # CPU cores, e.g. "CPU [8%@2035,off,12%@1900]" (literal square brackets).
    cpu_match = re.search(r'CPU \[([^\]]+)\]', line)
    if cpu_match:
        cores = []
        for core in cpu_match.group(1).split(','):
            parts = core.strip().split('@')
            if len(parts) != 2:
                # Entries without a frequency (e.g. "off") carry no data.
                continue
            util_str = parts[0].rstrip('%')
            freq_str = parts[1]
            if util_str.isdigit() and freq_str.isdigit():
                cores.append({
                    'util_percent': int(util_str),
                    'freq_mhz': int(freq_str),
                })
        data['cpu_cores'] = cores

    # GPU (GR3D), e.g. "GR3D_FREQ 5%" or "GR3D_FREQ 5%@[305,612]".
    gpu_match = re.search(r'GR3D_FREQ ([0-9]+)%(?:@\[(\d+),(\d+)\])?', line)
    if gpu_match:
        data['gpu'] = {'util_percent': int(gpu_match.group(1))}
        if gpu_match.group(2):
            data['gpu']['freq_mhz_min'] = int(gpu_match.group(2))
            data['gpu']['freq_mhz_max'] = int(gpu_match.group(3))

    # Accelerators: NVENC, NVDEC, VIC
    for acc in ['NVENC', 'NVDEC', 'VIC']:
        acc_match = re.search(rf'{acc} ([0-9]+)%@(\d+)', line)
        if acc_match:
            data[acc.lower()] = {
                'util_percent': int(acc_match.group(1)),
                'freq_mhz': int(acc_match.group(2)),
            }

    # Temperatures, e.g. "GPU@42.5C"
    temp_pattern = r'([A-Za-z0-9_-]+)@([\d.]+)C'
    temps = {}
    for match in re.finditer(temp_pattern, line):
        try:
            temps[match.group(1)] = float(match.group(2))
        except ValueError:
            # "[\d.]+" also accepts strings such as "1.2.3"; ignore those.
            continue
    if temps:
        data['temperatures_celsius'] = temps

    return data
def format_top_like(data):
    """Format parsed data in a compact, top-like human-readable style.

    Args:
        data: Dict with the layout produced by ``parse_tegrastats_line``.
            ``'timestamp'`` is required; every other section is optional and
            is simply left out of the report when absent.

    Returns:
        A multi-line string: a ``[timestamp]`` header, a 60-character rule,
        then one line each for RAM, CPU, GPU, the accelerators and the key
        temperature sensors that are present.

    Raises:
        KeyError: If ``data`` has no ``'timestamp'`` entry.
    """
    # Header
    lines = [f"[{data['timestamp']}]", "-" * 60]

    # RAM
    if 'ram' in data:
        ram = data['ram']
        used, total = ram['used_mb'], ram['total_mb']
        pct = used / total * 100 if total > 0 else 0
        lines.append(f"RAM: {used}/{total} MB ({pct:.1f}%) | LFB: {ram['lfb_chunks']}x{ram['lfb_chunk_size_mb']}MB")

    # CPU: the core list may be present but empty (e.g. every core reported
    # as "off"); averaging or taking max() of it would raise, so skip the line.
    cores = data.get('cpu_cores')
    if cores:
        avg_util = sum(c['util_percent'] for c in cores) / len(cores)
        max_freq = max(c['freq_mhz'] for c in cores)
        lines.append(f"CPU: Avg={avg_util:.1f}% | MaxFreq={max_freq} MHz")

    # GPU
    if 'gpu' in data:
        gpu = data['gpu']
        util = gpu['util_percent']
        freq_info = f" @ {gpu.get('freq_mhz_min', '?')}~{gpu.get('freq_mhz_max', '?')} MHz" if 'freq_mhz_min' in gpu else ""
        lines.append(f"GPU: {util}%{freq_info}")

    # Accelerators
    for name, label in [('nvenc', 'NVENC'), ('nvdec', 'NVDEC'), ('vic', 'VIC')]:
        if name in data:
            acc = data[name]
            lines.append(f"{label}: {acc['util_percent']}% @ {acc['freq_mhz']} MHz")

    # Temperatures (show key ones only, in a fixed order)
    if 'temperatures_celsius' in data:
        temps = data['temperatures_celsius']
        key_sensors = ['GPU', 'CPU', 'SOC0', 'tj']
        temp_strs = [f"{s}: {temps[s]:.1f}°C" for s in key_sensors if s in temps]
        if temp_strs:
            lines.append("Temps: " + " | ".join(temp_strs))

    return "\n".join(lines)
def main():
    """Run ``tegrastats`` and print each of its output lines in parsed form.

    Command-line options:
        -j / --json: print one JSON object per line on stdout instead of the
            human-readable report.

    Informational and warning messages go to stderr. The process exits with
    status 1 when the ``tegrastats --help`` probe cannot be run or fails.
    The loop ends on Ctrl+C (SIGINT) or when ``tegrastats`` closes its output.
    """
    parser = argparse.ArgumentParser(description="Monitor Jetson system stats via tegrastats.")
    parser.add_argument('-j', '--json', action='store_true', help="Output in JSON format (one object per line)")
    args = parser.parse_args()

    signal.signal(signal.SIGINT, signal_handler)

    # Check if tegrastats exists
    try:
        subprocess.run(['tegrastats', '--help'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        print("[ERROR] 'tegrastats' not found. This script must run on a Jetson device.", file=sys.stderr)
        sys.exit(1)

    print("[INFO] Starting monitor... Press Ctrl+C to stop.", file=sys.stderr)
    if not args.json:
        print("\n" + "="*60)

    # Bound before the try so the cleanup in ``finally`` is safe even when
    # Popen itself raises.
    proc = None
    try:
        proc = subprocess.Popen(
            ['tegrastats'],
            stdout=subprocess.PIPE,
            # stderr is never consumed here; an unread PIPE could fill up and
            # block the child, so discard it instead.
            stderr=subprocess.DEVNULL,
            universal_newlines=True,
            bufsize=1
        )
        while running:
            line = proc.stdout.readline()
            if not line:
                # Empty string means EOF: no more output can arrive, so stop
                # rather than spinning until the child has been reaped.
                break
            line = line.strip()
            if not line:
                continue
            try:
                parsed = parse_tegrastats_line(line)
                if args.json:
                    print(json.dumps(parsed))
                    sys.stdout.flush()
                else:
                    print(format_top_like(parsed))
                    print("-" * 60)
            except Exception as e:
                # One malformed line must not stop the monitor.
                print(f"[WARN] Parse error: {e}", file=sys.stderr)
    except KeyboardInterrupt:
        pass
    finally:
        if proc is not None:
            proc.terminate()
            try:
                proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
            if proc.stdout is not None:
                proc.stdout.close()

    if not args.json:
        print("\n[INFO] Monitor stopped.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment