Skip to content

Instantly share code, notes, and snippets.

@yuancu
Created January 13, 2026 09:05
Show Gist options
  • Select an option

  • Save yuancu/e722d937059c2aec5c1a04fa34ec8d59 to your computer and use it in GitHub Desktop.

Select an option

Save yuancu/e722d937059c2aec5c1a04fa34ec8d59 to your computer and use it in GitHub Desktop.
Profile PPL queries on big5, tpch, and clickbench
#!/usr/bin/env python3
"""
Profile PPL queries from test directories and report metrics separately by folder.
"""
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Dict, List
from collections import defaultdict
import requests
def sanitize_ppl(ppl: str) -> str:
    """
    Sanitize a PPL query: strip block comments and collapse line breaks to spaces.

    Mirrors the Java implementation in PPLIntegTestCase.sanitize().

    Args:
        ppl: the PPL query string

    Returns:
        the sanitized PPL query string
    """
    # Drop /* ... */ block comments; DOTALL lets a comment span lines.
    stripped = re.sub(r'/\*.*?\*/', '', ppl, flags=re.DOTALL)
    # Flatten CRLF first, then bare LF, into single spaces.
    flattened = stripped.replace('\r\n', ' ').replace('\n', ' ')
    return flattened.strip()
def execute_profile_query(query: str,
                          endpoint: str = "http://localhost:9200/_plugins/_ppl",
                          timeout: int = 300) -> Dict:
    """
    Execute a PPL query with profiling enabled.

    Args:
        query: The PPL query string (already sanitized)
        endpoint: The OpenSearch PPL endpoint
        timeout: Request timeout in seconds (default 300). Previously a
            hard-coded literal; exposed so callers can tune it.

    Returns:
        Dictionary containing the response or error information:
        on success {"success": True, "data": <json>, "status_code": <int>},
        on failure {"success": False, "error": <str>, ...}.
    """
    headers = {
        'Content-Type': 'application/json'
    }
    payload = {
        "query": query,
        "profile": True  # ask the engine to include per-phase timing data
    }
    try:
        response = requests.post(endpoint, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        return {
            "success": True,
            "data": response.json(),
            "status_code": response.status_code
        }
    except requests.exceptions.Timeout:
        # Report the configured timeout rather than a hard-coded "300s".
        return {
            "success": False,
            "error": f"Request timeout ({timeout}s)"
        }
    except requests.exceptions.RequestException as e:
        # e.response is None when no HTTP response was received at all.
        status = e.response.status_code if getattr(e, 'response', None) is not None else None
        return {
            "success": False,
            "error": str(e),
            "status_code": status
        }
    except Exception as e:
        # Catch-all boundary so one bad query never aborts the whole run.
        return {
            "success": False,
            "error": f"Unexpected error: {str(e)}"
        }
def find_ppl_files(base_path: str, folder_name: str) -> List[Path]:
    """
    Locate every .ppl file under a named subfolder, recursively, in sorted order.

    Args:
        base_path: Base directory path
        folder_name: Name of the folder (clickbench, big5, or tpch)

    Returns:
        Sorted list of Path objects for .ppl files; empty list when the
        folder does not exist (a warning is printed in that case).
    """
    target = Path(base_path) / folder_name
    if not target.exists():
        print(f"Warning: Folder {target} does not exist")
        return []
    return sorted(target.rglob("*.ppl"))
def profile_folder(base_path: str, folder_name: str) -> Dict:
    """
    Run every .ppl query in one folder with profiling and collect the outcomes.

    Args:
        base_path: Base directory path
        folder_name: Name of the folder

    Returns:
        Dictionary with success/failure counters and a per-query result list
    """
    ppl_files = find_ppl_files(base_path, folder_name)
    results = {
        "folder": folder_name,
        "total_queries": len(ppl_files),
        "successful_queries": 0,
        "failed_queries": 0,
        "queries": []
    }
    print(f"\n{'='*80}")
    print(f"Profiling folder: {folder_name}")
    print(f"Found {len(ppl_files)} PPL query files")
    print(f"{'='*80}\n")
    for ppl_file in ppl_files:
        query_name = ppl_file.stem
        print(f"Executing: {query_name} ... ", end="", flush=True)
        try:
            # Read, sanitize, and execute the query with profiling enabled.
            raw_query = ppl_file.read_text(encoding='utf-8')
            outcome = execute_profile_query(sanitize_ppl(raw_query))
            entry = {
                "name": query_name,
                "file": str(ppl_file.relative_to(base_path)),
                "success": outcome["success"]
            }
            if not outcome["success"]:
                entry["error"] = outcome.get("error", "Unknown error")
                print(f"✗ ({entry['error']})")
                results["failed_queries"] += 1
            else:
                profile_data = outcome["data"].get("profile", {})
                if profile_data:
                    entry["profile"] = profile_data
                    # Missing total_time_ms is reported as 0.00ms.
                    elapsed = profile_data.get("summary", {}).get("total_time_ms", 0)
                    print(f"✓ ({elapsed:.2f}ms)")
                    results["successful_queries"] += 1
                else:
                    # A 200 response without profile data still counts as failed.
                    entry["error"] = "No profile data in response"
                    print(f"✗ (No profile data)")
                    results["failed_queries"] += 1
            results["queries"].append(entry)
        except Exception as e:
            # File-level problems (unreadable file, bad path) are recorded
            # per query so the rest of the folder still runs.
            print(f"✗ (Error reading file: {str(e)})")
            results["queries"].append({
                "name": query_name,
                "file": str(ppl_file.relative_to(base_path)),
                "success": False,
                "error": f"Error reading file: {str(e)}"
            })
            results["failed_queries"] += 1
    return results
def print_summary_report(all_results: List[Dict]):
    """
    Print a human-readable summary of the profiling results per folder.

    Args:
        all_results: List of results dictionaries from each folder
    """
    print(f"\n{'='*80}")
    print("PROFILING SUMMARY REPORT")
    print(f"{'='*80}\n")
    for folder_entry in all_results:
        print(f"Folder: {folder_entry['folder']}")
        print(f" Total queries: {folder_entry['total_queries']}")
        print(f" Successful: {folder_entry['successful_queries']}")
        print(f" Failed: {folder_entry['failed_queries']}")
        if folder_entry["successful_queries"] > 0:
            # Only queries that actually carry profile data feed the stats.
            profiled = [q for q in folder_entry["queries"]
                        if q["success"] and "profile" in q]
            if profiled:
                timings = [q["profile"]["summary"]["total_time_ms"] for q in profiled]
                print(f" Average time: {sum(timings) / len(timings):.2f}ms")
                print(f" Min time: {min(timings):.2f}ms")
                print(f" Max time: {max(timings):.2f}ms")
                # Accumulate per-phase totals across all profiled queries.
                phase_names = ["analyze", "optimize", "execute", "format"]
                accumulated = defaultdict(float)
                for q in profiled:
                    per_phase = q["profile"].get("phases", {})
                    for name in phase_names:
                        if name in per_phase:
                            accumulated[name] += per_phase[name].get("time_ms", 0)
                print(f" Phase averages:")
                for name in phase_names:
                    print(f" {name}: {accumulated[name] / len(profiled):.2f}ms")
        print()
def save_detailed_results(all_results: List[Dict], output_file: str):
    """
    Persist the full profiling results to disk as pretty-printed JSON.

    Args:
        all_results: List of results dictionaries from each folder
        output_file: Path to output JSON file
    """
    serialized = json.dumps(all_results, indent=2)
    Path(output_file).write_text(serialized, encoding='utf-8')
    print(f"Detailed results saved to: {output_file}")
def main():
    """
    Entry point: profile the clickbench, big5, and tpch query folders,
    print a summary report, and persist detailed results to JSON.
    """
    parser = argparse.ArgumentParser(
        description="Profile PPL queries from test directories and report metrics separately by folder."
    )
    parser.add_argument(
        "--base-path",
        type=str,
        default="/Users/yuanchu/src/sql/integ-test/src/test/resources",
        help="Base path for test resources (default: /Users/yuanchu/src/sql/integ-test/src/test/resources)"
    )
    args = parser.parse_args()

    base_path = Path(args.base_path)
    if not base_path.exists():
        print(f"Error: Base path does not exist: {base_path}")
        sys.exit(1)

    # Profile each benchmark folder independently so metrics stay separate.
    all_results = [profile_folder(base_path, folder)
                   for folder in ["clickbench", "big5", "tpch"]]

    print_summary_report(all_results)
    save_detailed_results(all_results, "profile_results.json")

    print(f"\n{'='*80}")
    print("Profiling complete!")
    print(f"{'='*80}\n")
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment