Last active
January 18, 2026 23:00
-
-
Save kylemanna/adcf850ee792042574dfd260028b8d94 to your computer and use it in GitHub Desktop.
NVMe tool to track degradation of read performance for old/stale data on NVMe drives
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.8" | |
| # dependencies = [ | |
| # "matplotlib", | |
| # "pandas", | |
| # ] | |
| # /// | |
| """ | |
| NVMe Drive Degradation Benchmark Tool | |
| This tool provides comprehensive NVMe drive performance analysis including benchmarking | |
| and visualization of results to detect degradation patterns. | |
| Usage: uv run nvme_benchmark.py <command> [options] | |
| Note: Run with 'uv run nvme_benchmark.py' to automatically install dependencies. | |
| Commands: | |
| benchmark Run performance benchmark on NVMe drive | |
| plot Create visualizations from benchmark results | |
| Run 'uv run nvme_benchmark.py benchmark --help' for benchmark options | |
| Run 'uv run nvme_benchmark.py plot --help' for plotting options | |
| """ | |
| import argparse | |
| import subprocess | |
| import json | |
| import csv | |
| import time | |
| import os | |
| import sys | |
| from datetime import datetime | |
| try: | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| HAS_PLOTTING = True | |
| except ImportError: | |
| HAS_PLOTTING = False | |
def get_fio_path():
    """Locate the fio executable on the current PATH.

    Returns:
        The path of the first executable named ``fio`` found on PATH as a
        string, or None when there is none.
    """
    # Local import: shutil is not among this file's top-level imports.
    import shutil

    # shutil.which searches PATH in-process, so there is no dependency on an
    # external `which` binary and no need for a catch-all `except:` that
    # would also hide KeyboardInterrupt or programming errors.
    return shutil.which('fio')
def check_fio_available():
    """Report whether an fio executable can be located.

    Returns:
        True when get_fio_path() yields a path, False when it yields None.
    """
    fio_location = get_fio_path()
    return fio_location is not None
def get_device_size(device):
    """Return the size of a block device in bytes.

    Runs ``blockdev --getsize64`` on *device*, through ``sudo`` unless the
    process is already running as root.

    Args:
        device: Path of the block device to query.

    Returns:
        The size in bytes as an int, or None when the command cannot be
        started, exits with an error, or prints something that is not an
        integer. A message is printed in each failure case.
    """
    query = ['blockdev', '--getsize64', device]
    # Root needs no privilege escalation, and sudo may not even be installed.
    command = query if os.geteuid() == 0 else ['sudo'] + query
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        return int(result.stdout.strip())
    except (subprocess.CalledProcessError, OSError, ValueError) as e:
        # OSError covers a missing sudo/blockdev binary; ValueError covers
        # output that is not a plain integer.
        print(f"Error getting device size: {e}")
        return None
def run_fio_test(device, offset, size_bytes):
    """Run a single sequential-read fio job at the given offset.

    A job file is written to a uniquely named temporary file, fio is run on
    it with JSON output (through ``sudo`` unless already root), and the
    first job's read bandwidth and mean latency are extracted.

    Args:
        device: Path passed to fio as ``filename``.
        offset: Byte offset at which the read starts.
        size_bytes: Number of bytes to read.

    Returns:
        A dict with keys ``offset``, ``bandwidth_kbps``, ``bandwidth_mbps``,
        ``bandwidth_gbps`` and ``latency_ms``, or None when fio cannot be
        started, exits with an error, or produces output that cannot be
        interpreted. A message is printed in each failure case.
    """
    # Local import: tempfile is not among this file's top-level imports.
    import tempfile

    job_text = "\n".join([
        "[global]",
        "name=nvme_benchmark",
        "rw=read",
        "bs=1M",
        f"size={size_bytes}",
        f"offset={offset}",
        f"filename={device}",
        "direct=1",
        "sync=1",
        "time_based=0",
        "runtime=0",
        "numjobs=1",
        "[job1]",
        f"name=test_{offset}",
        "",
    ])

    # Root needs no privilege escalation, and sudo may not even be installed.
    prefix = [] if os.geteuid() == 0 else ['sudo']
    job_path = None
    try:
        # A unique temp file avoids clobbering (or being tricked by) a
        # pre-existing file at a fixed, predictable /tmp path.
        with tempfile.NamedTemporaryFile('w', suffix='.fio', delete=False) as job_handle:
            job_handle.write(job_text)
            job_path = job_handle.name

        # Run fio with JSON output for easy parsing
        result = subprocess.run(
            prefix + ['fio', '--output-format=json', job_path],
            capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)

        # Extract relevant metrics from the first (only) job
        job = data['jobs'][0]
        read_bw = job['read']['bw']  # Bandwidth in KB/s
        read_lat = job['read']['lat_ns']['mean'] / 1000000  # ns -> ms
        return {
            'offset': offset,
            'bandwidth_kbps': read_bw,
            'bandwidth_mbps': read_bw / 1024,
            'bandwidth_gbps': read_bw / (1024 * 1024),
            'latency_ms': read_lat
        }
    except subprocess.CalledProcessError as e:
        print(f"Fio test failed at offset {offset}: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"Failed to parse fio output: {e}")
        return None
    except (KeyError, IndexError, TypeError) as e:
        # Valid JSON, but without the jobs/read/lat_ns structure read above.
        print(f"Unexpected fio output structure at offset {offset}: {e!r}")
        return None
    except OSError as e:
        # fio/sudo missing, or the job file could not be written.
        print(f"Could not run fio at offset {offset}: {e}")
        return None
    finally:
        # Clean up temp file
        if job_path is not None and os.path.exists(job_path):
            os.remove(job_path)
def create_ascii_plot(data, width=80, height=20):
    """Render bandwidth-vs-offset as a text bar chart.

    Args:
        data: Sequence of dicts, each with an ``offset`` (bytes) and a
            ``bandwidth_mbps`` entry.
        width: Number of character columns in the plot area.
        height: Number of text rows in the plot area.

    Returns:
        The multi-line plot as a string, or "No data to plot" when *data*
        is empty.
    """
    if not data:
        return "No data to plot"

    # Extract bandwidth values
    offsets = [d['offset'] / (1024**3) for d in data]  # Convert to GiB
    speeds = [d['bandwidth_mbps'] for d in data]
    min_speed = min(speeds)
    max_speed = max(speeds)
    speed_range = max_speed - min_speed or 1

    # Number of intervals between the bottom and top row. Clamped so that a
    # one-row plot (height=1) does not divide by zero.
    row_steps = max(height - 1, 1)

    # Each column always maps to the same sample, so normalise once per
    # column instead of once per (row, column) cell. None marks a column
    # with no sample behind it.
    column_levels = []
    for x in range(width):
        data_idx = int(x * len(speeds) / width)
        if data_idx < len(speeds):
            column_levels.append((speeds[data_idx] - min_speed) / speed_range)
        else:
            column_levels.append(None)

    # Create the plot, top row first
    plot_lines = []
    for y in range(height, 0, -1):
        threshold = (y - 1) / row_steps
        line = f"{min_speed + (y-1) * speed_range / row_steps:6.1f} |"
        line += ''.join(
            '█' if level is not None and level >= threshold else ' '
            for level in column_levels
        )
        plot_lines.append(line)

    # Add x-axis labels at roughly quarter-width intervals. The step is
    # clamped to 1 because range() rejects a zero step when width < 4.
    label_step = max(width // 4, 1)
    x_labels = []
    for i in range(0, width, label_step):
        data_idx = int(i * len(offsets) / width)
        if data_idx < len(offsets):
            x_labels.append(f"{offsets[data_idx]:6.1f}")

    plot_str = '\n'.join(plot_lines)
    plot_str += '\n +' + '-' * width + '+'
    plot_str += '\n ' + ' '.join(f"{x:>6}" for x in x_labels)
    plot_str += '\n GiB offset from start of drive'
    return plot_str
def plot_results(csv_file, show_plot=True):
    """Plot NVMe benchmark results from a CSV file and print a summary.

    Reads the ``offset_gib``, ``bandwidth_mbps`` and ``latency_ms`` columns,
    draws bandwidth and latency against offset in two stacked subplots,
    saves the figure as ``<csv name without .csv>_plot.png`` and prints
    summary statistics.

    Args:
        csv_file: Path of the CSV file to read.
        show_plot: When true the figure is displayed with ``plt.show()``;
            otherwise it is closed after saving.

    Returns:
        0 on success, 1 when plotting libraries are unavailable, the file
        is missing or empty, or any other error occurs while plotting.
    """
    if not HAS_PLOTTING:
        print("Error: matplotlib and pandas are required for plotting")
        print("Install with: uv run --with matplotlib --with pandas nvme_benchmark.py plot ...")
        return 1

    # Strip only a trailing ".csv"; str.replace would also remove ".csv"
    # appearing anywhere else in the path (e.g. in a directory name).
    base_name = csv_file[:-len('.csv')] if csv_file.endswith('.csv') else csv_file
    plot_path = f"{base_name}_plot.png"

    try:
        # Read the CSV data
        df = pd.read_csv(csv_file)
        if df.empty:
            print(f"Error: File '{csv_file}' contains no benchmark rows")
            return 1

        # Create a figure with subplots
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        # Plot bandwidth
        ax1.plot(df['offset_gib'], df['bandwidth_mbps'], 'b-', linewidth=2, marker='o', markersize=3)
        ax1.set_title('NVMe Drive Read Speed Across Device', fontsize=14, fontweight='bold')
        ax1.set_ylabel('Read Speed (MB/s)', fontsize=12)
        ax1.grid(True, alpha=0.3)
        ax1.set_xlabel('Offset (GiB)', fontsize=12)

        # Mark the average on the plot and name it in the legend
        avg_speed = df['bandwidth_mbps'].mean()
        min_speed = df['bandwidth_mbps'].min()
        max_speed = df['bandwidth_mbps'].max()
        ax1.axhline(y=avg_speed, color='r', linestyle='--', alpha=0.7,
                    label=f'Average: {avg_speed:.1f} MB/s')
        ax1.legend()

        # Plot latency
        ax2.plot(df['offset_gib'], df['latency_ms'], 'r-', linewidth=2, marker='s', markersize=3)
        ax2.set_title('NVMe Drive Read Latency Across Device', fontsize=14, fontweight='bold')
        ax2.set_ylabel('Latency (ms)', fontsize=12)
        ax2.set_xlabel('Offset (GiB)', fontsize=12)
        ax2.grid(True, alpha=0.3)

        # Add latency statistics
        avg_lat = df['latency_ms'].mean()
        ax2.axhline(y=avg_lat, color='b', linestyle='--', alpha=0.7,
                    label=f'Average: {avg_lat:.2f} ms')
        ax2.legend()

        plt.tight_layout()
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        if show_plot:
            plt.show()
        else:
            plt.close()

        # Print summary statistics
        print("\nSummary Statistics:")
        print(f"Average Speed: {avg_speed:.1f} MB/s")
        print(f"Min Speed: {min_speed:.1f} MB/s")
        print(f"Max Speed: {max_speed:.1f} MB/s")
        print(f"Speed Variation: {max_speed - min_speed:.1f} MB/s")
        print(f"Average Latency: {avg_lat:.2f} ms")

        # Check for degradation patterns: compare the first and last 10% of
        # rows. The sample size is computed once so both ends have the same
        # length (`-len(df)//10` floors the negated value and can differ).
        sample = len(df) // 10
        if sample > 0:
            first_mean = df['bandwidth_mbps'].iloc[:sample].mean()
            last_mean = df['bandwidth_mbps'].iloc[-sample:].mean()
            if first_mean != 0:
                degradation = ((first_mean - last_mean) / first_mean) * 100
                print(f"Degradation: {degradation:+.1f}% (end vs beginning)")
                if abs(degradation) > 5:
                    print("⚠️ Significant degradation detected!")
                else:
                    print("✅ Drive performance appears stable")

        print(f"\nPlot saved as: {plot_path}")
    except FileNotFoundError:
        print(f"Error: File '{csv_file}' not found")
        return 1
    except Exception as e:
        # Top-level CLI boundary: report any plotting/parsing failure as an
        # exit status rather than a traceback.
        print(f"Error plotting results: {e}")
        return 1
    return 0
def _mock_segment_result(offset):
    """Fabricate a random result dict for *offset* (used by --test mode)."""
    # Local import: only test mode needs it and the file does not import it.
    import random

    mock_bw = random.randint(500000, 800000)  # KB/s
    mock_lat = random.uniform(0.1, 0.5)  # ms
    return {
        'offset': offset,
        'bandwidth_kbps': mock_bw,
        'bandwidth_mbps': mock_bw / 1024,
        'bandwidth_gbps': mock_bw / (1024 * 1024),
        'latency_ms': mock_lat
    }


def _extract_fio_json(stdout):
    """Return the first JSON object in *stdout* that has a "jobs" key, or None."""
    # fio may print warnings before its JSON document, so try each opening
    # brace in turn; raw_decode parses one complete value and ignores the rest.
    decoder = json.JSONDecoder()
    start = stdout.find('{')
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(stdout, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict) and 'jobs' in candidate:
            return candidate
        start = stdout.find('{', start + 1)
    return None


def _run_fio_segment(fio_path, device, offset, size_bytes):
    """Read one segment with fio; return ``(result_dict, None)`` or ``(None, error_text)``."""
    # Local import: tempfile is not among this file's top-level imports.
    import tempfile

    # NOTE(review): direct=0 sends reads through the page cache. The offsets
    # computed by the caller are not block-aligned, which direct=1 would
    # presumably require — confirm before changing.
    job_text = "\n".join([
        "[global]",
        "name=nvme_benchmark",
        "rw=read",
        "bs=1M",
        f"size={size_bytes}",
        f"offset={offset}",
        f"filename={device}",
        "direct=0",
        "sync=0",
        "time_based=0",
        "runtime=0",
        "numjobs=1",
        "[job1]",
        f"name=test_{offset}",
        "",
    ])

    # Only use sudo if not already root. The absolute fio path is passed
    # because sudo does not preserve PATH.
    fio_cmd = [fio_path] if os.geteuid() == 0 else ['sudo', fio_path]

    job_path = None
    try:
        # Unique temp file instead of a fixed, predictable /tmp path.
        with tempfile.NamedTemporaryFile('w', suffix='.fio', delete=False) as job_handle:
            job_handle.write(job_text)
            job_path = job_handle.name
        proc = subprocess.run(fio_cmd + ['--output-format=json', job_path],
                              capture_output=True, text=True)
    except OSError as e:
        return None, f"Could not run fio: {e}"
    finally:
        if job_path is not None and os.path.exists(job_path):
            os.remove(job_path)

    report = _extract_fio_json(proc.stdout)
    if report is None:
        stderr_text = proc.stderr[:1000] if proc.stderr else 'None'
        return None, (
            "FIO ERROR: No JSON output found\n"
            f"FIO STDOUT (first 1000): {proc.stdout[:1000]}\n"
            f"FIO STDERR: {stderr_text}"
        )
    if not report['jobs']:
        return None, "FIO ERROR: No job results in JSON"

    job = report['jobs'][0]
    if job.get('error', 0) != 0:
        return None, f"FIO ERROR: Job error code {job['error']}"
    if 'read' not in job:
        return None, "FIO ERROR: No read results in job"
    try:
        read_bw = job['read']['bw']
        read_lat = job['read']['lat_ns']['mean'] / 1000000  # ns -> ms
    except (KeyError, TypeError) as e:
        return None, f"FIO ERROR: Unexpected read result structure: {e!r}"

    return {
        'offset': offset,
        'bandwidth_kbps': read_bw,
        'bandwidth_mbps': read_bw / 1024,
        'bandwidth_gbps': read_bw / (1024 * 1024),
        'latency_ms': read_lat
    }, None


def benchmark_command(args):
    """Run the ``benchmark`` subcommand.

    Reads ``args.segments`` evenly spaced segments of ``args.read_size`` MiB
    from ``args.device`` with fio (or fabricates results when ``args.test``
    is set), writes one CSV row per successful segment to ``args.output``
    and prints summary statistics.

    Args:
        args: Parsed arguments providing ``device``, ``segments``,
            ``read_size``, ``output``, ``plot``, ``matplotlib`` and ``test``.
            ``args.segments`` is lowered when the drive is too small, and
            ``args.output`` is filled in with a timestamped name when None.

    Returns:
        0 when the run completes (individual segment failures are reported
        and skipped), 1 on invalid arguments, a missing device, an unknown
        device size, a missing fio, or a read size larger than the drive.
    """
    # Validate device exists
    if not os.path.exists(args.device):
        print(f"Error: Device {args.device} does not exist")
        return 1

    # A zero read size would divide by zero below; a non-positive segment
    # count would silently benchmark nothing.
    if args.read_size < 1 or args.segments < 1:
        print("Error: --read-size and --segments must be positive integers")
        return 1

    # Get device size
    if args.test:
        # Mock device size for testing
        device_size = int(894.3 * 1024**3)  # Mock 894.3 GiB drive
        print(f"Using mock device size for testing ({device_size / (1024**3):.1f} GiB)")
    else:
        device_size = get_device_size(args.device)
        if device_size is None:
            print("Error: Could not determine device size")
            return 1

    device_size_gib = device_size / (1024**3)
    print(f"NVMe Drive: {args.device} (Size: {device_size_gib:.1f} GiB)")

    # Get fio path (needed for sudo calls which don't preserve PATH).
    # Only needed if not in test mode.
    fio_path = None
    if not args.test:
        fio_path = get_fio_path()
        if fio_path is None:
            print("Error: fio not found in PATH")
            return 1

    # Calculate segment size and offsets
    segment_size_bytes = args.read_size * (1024**2)  # Convert MiB to bytes

    # Adjust segments if needed to fit within drive size
    max_segments = device_size // segment_size_bytes
    if args.segments > max_segments:
        actual_segments = max_segments
        print(f"Warning: Reducing segments from {args.segments} to {actual_segments} to fit drive size")
        args.segments = actual_segments
    if args.segments == 0:
        print(f"Error: Read size ({args.read_size} MiB) is larger than drive size ({device_size_gib:.1f} GiB)")
        return 1

    # Generate output filename if not specified
    if args.output is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        args.output = f"nvme_benchmark_{timestamp}.csv"

    print(f"Testing {args.segments} segments of {args.read_size} MiB each across {device_size_gib:.1f} GiB drive")
    print(f"Output file: {args.output}")
    print("This may take a while depending on drive speed and number of segments...")
    print()

    # Space segments evenly across the drive, ensuring each read fits
    if args.segments == 1:
        segment_spacing = 0
    else:
        segment_spacing = (device_size - segment_size_bytes) // (args.segments - 1)

    results = []
    # Prepare CSV file; it is closed before any plotting reads it back.
    with open(args.output, 'w', newline='') as csvfile:
        fieldnames = ['offset_gib', 'bandwidth_kbps', 'bandwidth_mbps', 'bandwidth_gbps', 'latency_ms']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        for i in range(args.segments):
            offset = i * segment_spacing
            # Safety check; the spacing above should already guarantee this.
            if offset + segment_size_bytes > device_size:
                print(f"Stopping at segment {i+1}: offset ({offset / (1024**3):.1f} GiB) + read_size ({segment_size_bytes / (1024**2):.1f} MiB) exceeds device size ({device_size / (1024**3):.1f} GiB)")
                break

            print(f"Testing segment {i+1}/{args.segments} (offset: {offset} bytes, {offset / (1024**3):.1f} GiB)")

            if args.test:
                # Test mode: simulate benchmark results
                data = _mock_segment_result(offset)
            else:
                if i == 0:  # Print once at the start
                    is_root = os.geteuid() == 0
                    print(f" Running as root: {is_root}, euid: {os.geteuid()}, fio: {fio_path}")
                data, error = _run_fio_segment(fio_path, args.device, offset, segment_size_bytes)
                if data is None:
                    print(f" Failed at offset {offset} bytes ({offset / (1024**3):.1f} GiB)")
                    print(f" stderr: {error}")
                    continue

            results.append(data)
            # Written for both real and simulated results, so a later plot
            # of args.output has rows to read in --test mode as well.
            writer.writerow({
                'offset_gib': data['offset'] / (1024**3),
                'bandwidth_kbps': data['bandwidth_kbps'],
                'bandwidth_mbps': data['bandwidth_mbps'],
                'bandwidth_gbps': data['bandwidth_gbps'],
                'latency_ms': data['latency_ms']
            })

    print(f"\nBenchmark complete! Results saved to {args.output}")

    # Generate simple statistics
    if results:
        speeds = [r['bandwidth_mbps'] for r in results]
        latencies = [r['latency_ms'] for r in results]
        print("\nSummary Statistics:")
        print(f"Average Speed: {sum(speeds)/len(speeds):.1f} MB/s")
        print(f"Min Speed: {min(speeds):.1f} MB/s")
        print(f"Max Speed: {max(speeds):.1f} MB/s")
        print(f"Speed Variation: {max(speeds) - min(speeds):.1f} MB/s")
        print(f"Average Latency: {sum(latencies)/len(latencies):.2f} ms")

        # Check for degradation (compare first 10% vs last 10%). The sample
        # size is computed once so both ends have the same length.
        sample = len(speeds) // 10
        if sample:
            avg_first = sum(speeds[:sample]) / sample
            avg_last = sum(speeds[-sample:]) / sample
            if avg_first:
                degradation = ((avg_first - avg_last) / avg_first) * 100
                print(f"Degradation: {degradation:+.1f}% (end vs beginning)")
                if abs(degradation) > 5:
                    print(" ⚠️ Significant performance variation detected!")
                else:
                    print(" ✅ Drive performance appears consistent")

    # Generate ASCII plot if requested
    if args.plot and results:
        print("\nBandwidth Plot (MB/s across drive):")
        print(create_ascii_plot(results))

    # Generate matplotlib plot if requested
    if args.matplotlib and results:
        print("\nGenerating matplotlib visualization...")
        plot_results(args.output, show_plot=False)

    return 0
def plot_command(args):
    """Handle the ``plot`` subcommand.

    Plots the CSV named by ``args.input``; the figure is displayed unless
    ``args.no_show`` is set. Returns whatever plot_results() returns.
    """
    display_figure = not args.no_show
    return plot_results(args.input, show_plot=display_figure)
def main():
    """Parse the command line and dispatch to the selected subcommand.

    Returns:
        The subcommand's exit status, or 1 when no subcommand is given
        (help is printed) or when ``benchmark`` is requested without
        ``--test`` and fio cannot be found.
    """
    parser = argparse.ArgumentParser(
        description='NVMe Drive Degradation Benchmark Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
uv run nvme_benchmark.py benchmark --device /dev/nvme0n1 --segments 500
uv run nvme_benchmark.py benchmark --plot --matplotlib
uv run nvme_benchmark.py plot results.csv
uv run nvme_benchmark.py plot results.csv --no-show
# Or run directly with dependencies installed:
./nvme_benchmark.py benchmark --segments 100 --plot
"""
    )
    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # Benchmark subcommand
    benchmark_parser = subparsers.add_parser('benchmark', help='Run performance benchmark on NVMe drive')
    benchmark_parser.add_argument('--device', default='/dev/nvme0n1',
                                  help='NVMe device to test (default: /dev/nvme0n1)')
    # %(default)s keeps the help text in step with the actual default; the
    # previous literal text advertised 1000 while the default was 100.
    benchmark_parser.add_argument('--segments', type=int, default=100,
                                  help='Number of segments to test (default: %(default)s)')
    benchmark_parser.add_argument('--read-size', type=int, default=128,
                                  help='Size to read at each offset in MiB (default: 128)')
    benchmark_parser.add_argument('--output', default=None,
                                  help='Output CSV file (default: auto-generated)')
    benchmark_parser.add_argument('--plot', action='store_true',
                                  help='Generate ASCII plot')
    benchmark_parser.add_argument('--matplotlib', action='store_true',
                                  help='Generate matplotlib plot (saves PNG file)')
    benchmark_parser.add_argument('--test', action='store_true',
                                  help='Test mode - simulate benchmark without running fio')

    # Plot subcommand
    plot_parser = subparsers.add_parser('plot', help='Create visualizations from benchmark results')
    plot_parser.add_argument('input', help='Input CSV file from benchmark')
    plot_parser.add_argument('--no-show', action='store_true',
                             help='Save plot but don\'t display it')

    args = parser.parse_args()

    # For benchmark command, ensure fio is available
    if args.command == 'benchmark' and not args.test:
        if not check_fio_available():
            print("Error: fio not found in PATH.")
            print("\nTo fix this, install fio:")
            print(" - Nix: nix profile install nixpkgs#fio")
            print(" - Arch: sudo pacman -S fio")
            print(" - Debian/Ubuntu: sudo apt install fio")
            print(" - Or use your system's package manager")
            return 1

    if args.command == 'benchmark':
        return benchmark_command(args)
    elif args.command == 'plot':
        return plot_command(args)
    else:
        # No subcommand given.
        parser.print_help()
        return 1
if __name__ == '__main__':
    # sys.exit rather than the bare exit() helper: exit() is injected by the
    # site module for interactive use and is absent when site is not loaded.
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment