Skip to content

Instantly share code, notes, and snippets.

@kylemanna
Last active January 18, 2026 23:00
Show Gist options
  • Select an option

  • Save kylemanna/adcf850ee792042574dfd260028b8d94 to your computer and use it in GitHub Desktop.

Select an option

Save kylemanna/adcf850ee792042574dfd260028b8d94 to your computer and use it in GitHub Desktop.
NVMe tool to track degradation of read performance for old/stale data on NVMe drives
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.8"
# dependencies = [
# "matplotlib",
# "pandas",
# ]
# ///
"""
NVMe Drive Degradation Benchmark Tool
This tool provides comprehensive NVMe drive performance analysis including benchmarking
and visualization of results to detect degradation patterns.
Usage: uv run nvme_benchmark.py <command> [options]
Note: Run with 'uv run nvme_benchmark.py' to automatically install dependencies.
Commands:
benchmark Run performance benchmark on NVMe drive
plot Create visualizations from benchmark results
Run 'uv run nvme_benchmark.py benchmark --help' for benchmark options
Run 'uv run nvme_benchmark.py plot --help' for plotting options
"""
import argparse
import csv
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
from datetime import datetime
# Plotting support is optional: pandas and matplotlib are only required by
# the `plot` command, so a missing install is tolerated and recorded here
# instead of crashing the benchmark path.
try:
    import pandas as pd
    import matplotlib.pyplot as plt
except ImportError:
    HAS_PLOTTING = False
else:
    HAS_PLOTTING = True
def get_fio_path():
    """Return the absolute path to the ``fio`` executable, or None if absent.

    Uses ``shutil.which`` (a pure-Python PATH scan) instead of spawning a
    ``which`` subprocess; this also removes the previous bare ``except:``,
    which silently swallowed unrelated errors (KeyboardInterrupt included).
    """
    return shutil.which('fio')
def check_fio_available():
    """Report whether an ``fio`` binary can be located on the PATH."""
    path = get_fio_path()
    return path is not None
def get_device_size(device):
    """Return the size of block device *device* in bytes, or None on failure.

    Shells out to ``sudo blockdev --getsize64`` and parses its stdout.
    Failures of the subprocess are reported on stdout and mapped to None.
    """
    cmd = ['sudo', 'blockdev', '--getsize64', device]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as err:
        print(f"Error getting device size: {err}")
        return None
    return int(proc.stdout.strip())
def run_fio_test(device, offset, size_bytes):
    """Run a single sequential-read fio job at *offset* on *device*.

    Args:
        device: block device path (e.g. /dev/nvme0n1).
        offset: byte offset into the device where the read starts.
        size_bytes: number of bytes to read.

    Returns:
        dict with 'offset', 'bandwidth_kbps'/'_mbps'/'_gbps' and
        'latency_ms' keys, or None if fio failed or emitted unparsable JSON.

    The job file is written to a private temporary file instead of the fixed
    /tmp/fio_job.fio path the original used — the fixed name let concurrent
    runs clobber each other and was a symlink-attack hazard given that fio
    is invoked under sudo.
    """
    job_file = f"""
[global]
name=nvme_benchmark
rw=read
bs=1M
size={size_bytes}
offset={offset}
filename={device}
direct=1
sync=1
time_based=0
runtime=0
numjobs=1
[job1]
name=test_{offset}
"""
    # delete=False so the file survives close() and sudo-spawned fio can read it.
    tmp = tempfile.NamedTemporaryFile('w', suffix='.fio', delete=False)
    try:
        tmp.write(job_file)
        tmp.close()
        # Run fio with JSON output for easy parsing
        result = subprocess.run([
            'sudo', 'fio', '--output-format=json', tmp.name
        ], capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
        # Extract relevant metrics from the single job
        job = data['jobs'][0]
        read_bw = job['read']['bw']  # Bandwidth in KB/s
        read_lat = job['read']['lat_ns']['mean'] / 1000000  # ns -> ms
        return {
            'offset': offset,
            'bandwidth_kbps': read_bw,
            'bandwidth_mbps': read_bw / 1024,
            'bandwidth_gbps': read_bw / (1024 * 1024),
            'latency_ms': read_lat
        }
    except subprocess.CalledProcessError as e:
        print(f"Fio test failed at offset {offset}: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"Failed to parse fio output: {e}")
        return None
    finally:
        # Clean up temp file
        if os.path.exists(tmp.name):
            os.remove(tmp.name)
def create_ascii_plot(data, width=80, height=20):
    """Render read bandwidth as a block-character ASCII chart.

    Args:
        data: list of result dicts with 'offset' (bytes) and
            'bandwidth_mbps' keys.
        width: chart width in character columns.
        height: chart height in rows.

    Returns:
        The chart as a single multi-line string, or a placeholder message
        when *data* is empty.
    """
    if not data:
        return "No data to plot"
    # x axis: offsets in GiB; y axis: speeds in MB/s.
    gib_offsets = [point['offset'] / (1024**3) for point in data]
    mbps = [point['bandwidth_mbps'] for point in data]
    lo = min(mbps)
    hi = max(mbps)
    span = (hi - lo) or 1  # avoid div-by-zero when all samples are equal
    rows = []
    for row in range(height, 0, -1):
        threshold = (row - 1) / (height - 1)
        axis_value = lo + (row - 1) * span / (height - 1)
        cells = [f"{axis_value:6.1f} |"]
        for col in range(width):
            # Map each column onto a data index (nearest-below sampling).
            idx = int(col * len(mbps) / width)
            if idx < len(mbps) and (mbps[idx] - lo) / span >= threshold:
                cells.append('█')
            else:
                cells.append(' ')
        rows.append(''.join(cells))
    # Pick four evenly spaced offset labels for the x axis.
    labels = []
    for col in range(0, width, width//4):
        idx = int(col * len(gib_offsets) / width)
        if idx < len(gib_offsets):
            labels.append(f"{gib_offsets[idx]:6.1f}")
    chart = '\n'.join(rows)
    chart += '\n +' + '-' * width + '+'
    chart += '\n ' + ' '.join(f"{x:>6}" for x in labels)
    chart += '\n GiB offset from start of drive'
    return chart
def plot_results(csv_file, show_plot=True):
    """Plot NVMe benchmark results from a CSV file produced by `benchmark`.

    Draws bandwidth and latency vs. drive offset, saves a PNG next to the
    CSV, prints summary statistics, and flags >5% bandwidth change between
    the first and last 10% of samples.

    Args:
        csv_file: path to the benchmark CSV.
        show_plot: when True, also display the figure interactively.

    Returns:
        0 on success, 1 on error (missing deps, missing file, plot failure).
    """
    if not HAS_PLOTTING:
        print("Error: matplotlib and pandas are required for plotting")
        print("Install with: uv run --with matplotlib --with pandas nvme_benchmark.py plot ...")
        return 1
    try:
        # Read the CSV data
        df = pd.read_csv(csv_file)
        # Create a figure with subplots
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
        # Plot bandwidth
        ax1.plot(df['offset_gib'], df['bandwidth_mbps'], 'b-', linewidth=2, marker='o', markersize=3)
        ax1.set_title('NVMe Drive Read Speed Across Device', fontsize=14, fontweight='bold')
        ax1.set_ylabel('Read Speed (MB/s)', fontsize=12)
        ax1.grid(True, alpha=0.3)
        ax1.set_xlabel('Offset (GiB)', fontsize=12)
        # Add some statistics as text
        avg_speed = df['bandwidth_mbps'].mean()
        min_speed = df['bandwidth_mbps'].min()
        max_speed = df['bandwidth_mbps'].max()
        # BUG FIX: label was the literal string '.1f' (a mangled f-string);
        # restore a meaningful legend entry.
        ax1.axhline(y=avg_speed, color='r', linestyle='--', alpha=0.7,
                    label=f'Average: {avg_speed:.1f} MB/s')
        ax1.legend()
        # Plot latency
        ax2.plot(df['offset_gib'], df['latency_ms'], 'r-', linewidth=2, marker='s', markersize=3)
        ax2.set_title('NVMe Drive Read Latency Across Device', fontsize=14, fontweight='bold')
        ax2.set_ylabel('Latency (ms)', fontsize=12)
        ax2.set_xlabel('Offset (GiB)', fontsize=12)
        ax2.grid(True, alpha=0.3)
        # Add latency statistics
        avg_lat = df['latency_ms'].mean()
        # BUG FIX: label was the literal string '.2f'; same mangling as above.
        ax2.axhline(y=avg_lat, color='b', linestyle='--', alpha=0.7,
                    label=f'Average: {avg_lat:.2f} ms')
        ax2.legend()
        plt.tight_layout()
        plt.savefig(f"{csv_file.replace('.csv', '')}_plot.png", dpi=300, bbox_inches='tight')
        if show_plot:
            plt.show()
        else:
            plt.close()
        # Print summary statistics
        print("\nSummary Statistics:")
        print(f"Average Speed: {avg_speed:.1f} MB/s")
        print(f"Min Speed: {min_speed:.1f} MB/s")
        print(f"Max Speed: {max_speed:.1f} MB/s")
        print(f"Speed Variation: {max_speed - min_speed:.1f} MB/s")
        print(f"Average Latency: {avg_lat:.2f} ms")
        # Check for degradation patterns (first vs last 10% of samples)
        first_10pct = df['bandwidth_mbps'][:len(df)//10]
        last_10pct = df['bandwidth_mbps'][-len(df)//10:]
        if len(first_10pct) > 0 and len(last_10pct) > 0:
            degradation = ((first_10pct.mean() - last_10pct.mean()) / first_10pct.mean()) * 100
            # BUG FIX: this printed the literal ".1f"; restore the intended
            # message using the same format as benchmark_command's summary.
            print(f"Degradation: {degradation:+.1f}% (end vs beginning)")
            if abs(degradation) > 5:
                print("⚠️ Significant degradation detected!")
            else:
                print("✅ Drive performance appears stable")
        print(f"\nPlot saved as: {csv_file.replace('.csv', '')}_plot.png")
    except FileNotFoundError:
        print(f"Error: File '{csv_file}' not found")
        return 1
    except Exception as e:
        print(f"Error plotting results: {e}")
        return 1
    return 0
def benchmark_command(args):
"""Run the benchmarking command.

Validates args.device, spaces args.segments fio read tests evenly across
the drive (or fabricates results when --test is set), streams one CSV row
per completed segment to args.output, then prints summary statistics and
optional ASCII / matplotlib plots.

Returns 0 on success, 1 on a validation or setup failure.
"""
# Validate device exists
if not os.path.exists(args.device):
print(f"Error: Device {args.device} does not exist")
return 1
# Get device size
if args.test:
# Mock device size for testing
device_size = int(894.3 * 1024**3) # Mock 894.3 GiB drive
print(f"Using mock device size for testing ({device_size / (1024**3):.1f} GiB)")
else:
device_size = get_device_size(args.device)
if device_size is None:
print("Error: Could not determine device size")
return 1
device_size_gib = device_size / (1024**3)
print(f"NVMe Drive: {args.device} (Size: {device_size_gib:.1f} GiB)")
# Get fio path (needed for sudo calls which don't preserve PATH)
# Only needed if not in test mode
fio_path = None
if not args.test:
fio_path = get_fio_path()
if fio_path is None:
print("Error: fio not found in PATH")
return 1
# Calculate segment size and offsets
segment_size_bytes = args.read_size * (1024**2) # Convert MiB to bytes
# Adjust segments if needed to fit within drive size
max_segments = device_size // segment_size_bytes
if args.segments > max_segments:
actual_segments = max_segments
print(f"Warning: Reducing segments from {args.segments} to {actual_segments} to fit drive size")
args.segments = actual_segments
if args.segments == 0:
print(f"Error: Read size ({args.read_size} MiB) is larger than drive size ({device_size_gib:.1f} GiB)")
return 1
# Generate output filename if not specified
if args.output is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
args.output = f"nvme_benchmark_{timestamp}.csv"
print(f"Testing {args.segments} segments of {args.read_size} MiB each across {device_size_gib:.1f} GiB drive")
print(f"Output file: {args.output}")
print("This may take a while depending on drive speed and number of segments...")
print()
# Prepare CSV file; rows are written as results arrive, so an interrupted
# run still leaves usable partial data on disk.
with open(args.output, 'w', newline='') as csvfile:
fieldnames = ['offset_gib', 'bandwidth_kbps', 'bandwidth_mbps', 'bandwidth_gbps', 'latency_ms']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
results = []
# Space segments evenly across the drive, ensuring each read fits
if args.segments == 1:
segment_spacing = 0
else:
segment_spacing = (device_size - segment_size_bytes) // (args.segments - 1)
for i in range(args.segments):
offset = i * segment_spacing
# Ensure we don't exceed device size (shouldn't happen with new logic, but safety check)
if offset + segment_size_bytes > device_size:
print(f"Stopping at segment {i+1}: offset ({offset / (1024**3):.1f} GiB) + read_size ({segment_size_bytes / (1024**2):.1f} MiB) exceeds device size ({device_size / (1024**3):.1f} GiB)")
break
print(f"Testing segment {i+1}/{args.segments} (offset: {offset} bytes, {offset / (1024**3):.1f} GiB)")
if args.test:
# Test mode: simulate benchmark results
import random
mock_bw = random.randint(500000, 800000) # KB/s
mock_lat = random.uniform(0.1, 0.5) # ms
data = {
'offset': offset,
'bandwidth_kbps': mock_bw,
'bandwidth_mbps': mock_bw / 1024,
'bandwidth_gbps': mock_bw / (1024 * 1024),
'latency_ms': mock_lat
}
results.append(data)
else:
# Run the test using fio
# Write Python script to temp file to avoid shell escaping issues
# Check if we're already root to avoid unnecessary sudo
# Use absolute path to fio for sudo (sudo doesn't preserve PATH)
is_root = os.geteuid() == 0
if i == 0: # Print once at the start
print(f" Running as root: {is_root}, euid: {os.geteuid()}, fio: {fio_path}")
if is_root:
fio_cmd_str = f'["{fio_path}"]'
else:
fio_cmd_str = f'["sudo", "{fio_path}"]'
# NOTE(review): the generated job below uses direct=0/sync=0 (cached
# reads), while run_fio_test above uses direct=1/sync=1 — confirm which
# behavior is intended for degradation measurement.
# NOTE(review): the embedded script writes to the fixed path
# /tmp/fio_job.fio, so concurrent runs would clobber each other.
# Everything between the f''' delimiters is the embedded script's source:
# {{ / }} are escaped braces, {offset} etc. are interpolated here.
python_script = f'''import subprocess, json, sys, os, re
job_file = """
[global]
name=nvme_benchmark
rw=read
bs=1M
size={segment_size_bytes}
offset={offset}
filename={args.device}
direct=0
sync=0
time_based=0
runtime=0
numjobs=1
[job1]
name=test_{offset}
"""
with open("/tmp/fio_job.fio", "w") as f:
f.write(job_file)
# Only use sudo if not already root
fio_cmd = {fio_cmd_str}
full_cmd = fio_cmd + ["--output-format=json", "/tmp/fio_job.fio"]
result = subprocess.run(full_cmd, capture_output=True, text=True)
# Extract JSON from stdout (fio may write error messages before JSON)
stdout = result.stdout
# Find the fio JSON output - it should contain "fio version"
# Use brace counting to find the complete JSON object
json_str = None
# Find where "fio version" appears (start of fio JSON)
fio_start = stdout.find('"fio version"')
if fio_start == -1:
fio_start = stdout.find(chr(123)) # chr(123) is '{{'
if fio_start != -1:
# Find the opening brace before "fio version"
brace_start = stdout.rfind(chr(123), 0, fio_start) # chr(123) is '{{'
if brace_start == -1:
brace_start = fio_start - 1
# Now count braces to find the matching closing brace
brace_count = 0
open_brace = chr(123) # '{{'
close_brace = chr(125) # '}}'
for i in range(brace_start, len(stdout)):
if stdout[i] == open_brace:
brace_count += 1
elif stdout[i] == close_brace:
brace_count -= 1
if brace_count == 0:
json_str = stdout[brace_start:i+1]
break
if json_str:
# Validate it looks like fio JSON before parsing
if '"fio version"' not in json_str and '"jobs"' not in json_str:
sys.stderr.write(f"FIO ERROR: Extracted string doesn't look like fio JSON\\n")
sys.stderr.write(f"Extracted (first 200): {{json_str[:200]}}\\n")
sys.stderr.write(f"Full stdout (first 1000): {{stdout[:1000]}}\\n")
sys.exit(1)
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
sys.stderr.write(f"JSON parse error: {{e}}\\n")
sys.stderr.write(f"JSON string (first 500 chars): {{json_str[:500]}}\\n")
sys.stderr.write(f"JSON string (last 500 chars): {{json_str[-500:]}}\\n")
sys.stderr.write(f"Full stdout length: {{len(stdout)}}\\n")
sys.exit(1)
else:
# No JSON found, fio must have failed completely
sys.stderr.write(f"FIO ERROR: No JSON output found\\n")
sys.stderr.write(f"FIO STDOUT (first 1000): {{stdout[:1000]}}\\n")
sys.stderr.write(f"FIO STDOUT (last 1000): {{stdout[-1000:]}}\\n")
sys.stderr.write(f"FIO STDERR: {{result.stderr[:1000] if result.stderr else 'None'}}\\n")
sys.exit(1)
# Check if job succeeded
if "jobs" not in data or len(data["jobs"]) == 0:
sys.stderr.write(f"FIO ERROR: No job results in JSON\\n")
sys.stderr.write(f"JSON: {{json.dumps(data, indent=2)[:1000]}}\\n")
sys.exit(1)
job = data["jobs"][0]
# Check for errors in job
if "error" in job and job["error"] != 0:
sys.stderr.write(f"FIO ERROR: Job error code {{job['error']}}\\n")
if "error" in job.get("read", {{}}):
sys.stderr.write(f"Read error: {{job['read']['error']}}\\n")
sys.exit(1)
# Extract metrics
if "read" not in job:
sys.stderr.write(f"FIO ERROR: No read results in job\\n")
sys.exit(1)
read_bw = job["read"]["bw"]
read_lat = job["read"]["lat_ns"]["mean"] / 1000000
offset_val = {offset}
# Output JSON to stdout (errors already went to stderr)
result_json = {{'offset': offset_val, 'bandwidth_kbps': read_bw, 'bandwidth_mbps': read_bw / 1024, 'bandwidth_gbps': read_bw / (1024 * 1024), 'latency_ms': read_lat}}
print(json.dumps(result_json))
sys.stdout.flush()
'''
# Write script to temp file
script_file = f'/tmp/fio_bench_{offset}.py'
try:
with open(script_file, 'w') as f:
f.write(python_script)
# Verify script file was written
if not os.path.exists(script_file):
print(f" Error: Failed to create script file {script_file}")
continue
# fio is guaranteed to be in PATH (checked at startup)
cmd = ['python3', script_file]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
# Parse the JSON output from the inner python command
# The inner script should only output valid JSON to stdout
data_str = result.stdout.strip()
if data_str:
try:
data = json.loads(data_str)
except json.JSONDecodeError as e:
print(f" Failed to parse JSON output at offset {offset} bytes ({offset / (1024**3):.1f} GiB): {e}")
print(f" Output (first 500 chars): {data_str[:500]}")
print(f" Output (last 500 chars): {data_str[-500:]}")
if result.stderr:
print(f" Inner script stderr: {result.stderr[:1000]}")
continue
results.append(data)
else:
print(f" Warning: No output from fio test at offset {offset} bytes")
if result.stderr:
print(f" Inner script stderr: {result.stderr[:1000]}")
continue
writer.writerow({
'offset_gib': data['offset'] / (1024**3),
'bandwidth_kbps': data['bandwidth_kbps'],
'bandwidth_mbps': data['bandwidth_mbps'],
'bandwidth_gbps': data['bandwidth_gbps'],
'latency_ms': data['latency_ms']
})
except subprocess.CalledProcessError as e:
# Show actual error output for debugging
error_msg = e.stderr.strip() if e.stderr else "No error output"
stdout_msg = e.stdout.strip() if e.stdout else "No stdout"
print(f" Failed at offset {offset} bytes ({offset / (1024**3):.1f} GiB)")
if error_msg:
# Show full error message (not truncated)
print(f" stderr: {error_msg}")
if stdout_msg and stdout_msg != error_msg:
print(f" stdout: {stdout_msg}")
# Check if we've exceeded device size
if offset + segment_size_bytes > device_size:
print(f" Stopping: offset + read_size ({offset + segment_size_bytes} bytes) exceeds device size ({device_size} bytes)")
break
continue
except json.JSONDecodeError as e:
print(f" Failed to parse JSON output: {e}")
continue
finally:
# Clean up temp script file
if os.path.exists(script_file):
os.remove(script_file)
print(f"\nBenchmark complete! Results saved to {args.output}")
# Generate simple statistics
if results:
speeds = [r['bandwidth_mbps'] for r in results]
latencies = [r['latency_ms'] for r in results]
print("\nSummary Statistics:")
print(f"Average Speed: {sum(speeds)/len(speeds):.1f} MB/s")
print(f"Min Speed: {min(speeds):.1f} MB/s")
print(f"Max Speed: {max(speeds):.1f} MB/s")
print(f"Speed Variation: {max(speeds) - min(speeds):.1f} MB/s")
print(f"Average Latency: {sum(latencies)/len(latencies):.2f} ms")
# Check for degradation (compare first 10% vs last 10%)
first_10pct = speeds[:len(speeds)//10]
last_10pct = speeds[-len(speeds)//10:]
if first_10pct and last_10pct:
avg_first = sum(first_10pct) / len(first_10pct)
avg_last = sum(last_10pct) / len(last_10pct)
degradation = ((avg_first - avg_last) / avg_first) * 100
print(f"Degradation: {degradation:+.1f}% (end vs beginning)")
if abs(degradation) > 5:
print(" ⚠️ Significant performance variation detected!")
else:
print(" ✅ Drive performance appears consistent")
# Generate ASCII plot if requested
if args.plot and results:
print("\nBandwidth Plot (MB/s across drive):")
print(create_ascii_plot(results))
# Generate matplotlib plot if requested
if args.matplotlib and results:
print("\nGenerating matplotlib visualization...")
plot_results(args.output, show_plot=False)
return 0
def plot_command(args):
    """Dispatch the ``plot`` subcommand to plot_results.

    Returns plot_results' exit code (0 success, 1 error).
    """
    show = not args.no_show
    return plot_results(args.input, show_plot=show)
def main():
    """Parse command-line arguments and dispatch to a subcommand.

    Returns the process exit code: the subcommand's return value, or 1 when
    no/unknown command is given or fio is missing for a non-test benchmark.
    """
    parser = argparse.ArgumentParser(
        description='NVMe Drive Degradation Benchmark Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
uv run nvme_benchmark.py benchmark --device /dev/nvme0n1 --segments 500
uv run nvme_benchmark.py benchmark --plot --matplotlib
uv run nvme_benchmark.py plot results.csv
uv run nvme_benchmark.py plot results.csv --no-show
# Or run directly with dependencies installed:
./nvme_benchmark.py benchmark --segments 100 --plot
"""
    )
    subparsers = parser.add_subparsers(dest='command', help='Available commands')
    # Benchmark subcommand
    benchmark_parser = subparsers.add_parser('benchmark', help='Run performance benchmark on NVMe drive')
    benchmark_parser.add_argument('--device', default='/dev/nvme0n1',
                                  help='NVMe device to test (default: /dev/nvme0n1)')
    # BUG FIX: help text previously claimed "default: 1000" while the actual
    # default is 100; the text now matches the code.
    benchmark_parser.add_argument('--segments', type=int, default=100,
                                  help='Number of segments to test (default: 100)')
    benchmark_parser.add_argument('--read-size', type=int, default=128,
                                  help='Size to read at each offset in MiB (default: 128)')
    benchmark_parser.add_argument('--output', default=None,
                                  help='Output CSV file (default: auto-generated)')
    benchmark_parser.add_argument('--plot', action='store_true',
                                  help='Generate ASCII plot')
    benchmark_parser.add_argument('--matplotlib', action='store_true',
                                  help='Generate matplotlib plot (saves PNG file)')
    benchmark_parser.add_argument('--test', action='store_true',
                                  help='Test mode - simulate benchmark without running fio')
    # Plot subcommand
    plot_parser = subparsers.add_parser('plot', help='Create visualizations from benchmark results')
    plot_parser.add_argument('input', help='Input CSV file from benchmark')
    plot_parser.add_argument('--no-show', action='store_true',
                             help='Save plot but don\'t display it')
    args = parser.parse_args()
    # For benchmark command, ensure fio is available before doing any work
    if args.command == 'benchmark' and not args.test:
        if not check_fio_available():
            print("Error: fio not found in PATH.")
            print("\nTo fix this, install fio:")
            print(" - Nix: nix profile install nixpkgs#fio")
            print(" - Arch: sudo pacman -S fio")
            print(" - Debian/Ubuntu: sudo apt install fio")
            print(" - Or use your system's package manager")
            return 1
    if args.command == 'benchmark':
        return benchmark_command(args)
    elif args.command == 'plot':
        return plot_command(args)
    else:
        parser.print_help()
        return 1
if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status. sys.exit is
    # preferred over the exit() builtin, which is an interactive convenience
    # installed by site.py and is not guaranteed under `python -S`.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment