Created
January 13, 2026 09:05
-
-
Save yuancu/e722d937059c2aec5c1a04fa34ec8d59 to your computer and use it in GitHub Desktop.
Profile PPL queries on big5, tpch, and clickbench
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Profile PPL queries from test directories and report metrics separately by folder. | |
| """ | |
| import argparse | |
| import json | |
| import re | |
| import sys | |
| from pathlib import Path | |
| from typing import Dict, List | |
| from collections import defaultdict | |
| import requests | |
def sanitize_ppl(ppl: str) -> str:
    """
    Strip block comments from a PPL query and collapse it onto one line.

    Mirrors the Java implementation in PPLIntegTestCase.sanitize().

    Args:
        ppl: the PPL query string

    Returns:
        the sanitized PPL query string
    """
    # Drop every /* ... */ comment; DOTALL lets a single comment span lines.
    comment_free = re.sub(r'/\*.*?\*/', '', ppl, flags=re.DOTALL)
    # Flatten Windows then Unix line endings into spaces and trim the edges.
    return comment_free.replace('\r\n', ' ').replace('\n', ' ').strip()
def execute_profile_query(query: str,
                          endpoint: str = "http://localhost:9200/_plugins/_ppl",
                          timeout: float = 300) -> Dict:
    """
    Execute a PPL query with profiling enabled.

    Args:
        query: The PPL query string (already sanitized)
        endpoint: The OpenSearch PPL endpoint
        timeout: Request timeout in seconds (default 300, matching the
            previously hard-coded value)

    Returns:
        Dictionary with "success" plus either "data"/"status_code" on
        success, or "error" (and "status_code" where available) on failure.
    """
    headers = {
        'Content-Type': 'application/json'
    }
    # "profile": True asks the PPL plugin to attach timing data to the response.
    payload = {
        "query": query,
        "profile": True
    }
    try:
        response = requests.post(endpoint, headers=headers, json=payload, timeout=timeout)
        # Raise for 4xx/5xx so HTTP errors land in the RequestException branch.
        response.raise_for_status()
        return {
            "success": True,
            "data": response.json(),
            "status_code": response.status_code
        }
    except requests.exceptions.Timeout:
        # Keep the reported limit in sync with the actual timeout parameter
        # (previously the message hard-coded "300s").
        return {
            "success": False,
            "error": f"Request timeout ({timeout}s)"
        }
    except requests.exceptions.RequestException as e:
        # e.response may be None (e.g. connection errors); getattr on None
        # safely yields None for the status code.
        return {
            "success": False,
            "error": str(e),
            "status_code": getattr(getattr(e, 'response', None), 'status_code', None)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Unexpected error: {str(e)}"
        }
def find_ppl_files(base_path: str, folder_name: str) -> List[Path]:
    """
    Collect every .ppl file under base_path/folder_name, recursively.

    Args:
        base_path: Base directory path
        folder_name: Name of the folder (clickbench, big5, or tpch)

    Returns:
        Sorted list of Path objects; empty (with a warning printed) when
        the folder does not exist.
    """
    target = Path(base_path) / folder_name
    if target.exists():
        # rglob walks subdirectories too; sort for a deterministic run order.
        return sorted(target.rglob("*.ppl"))
    print(f"Warning: Folder {target} does not exist")
    return []
def profile_folder(base_path: str, folder_name: str) -> Dict:
    """
    Profile all queries in a specific folder.

    Runs every .ppl file found under base_path/folder_name through the
    profiling endpoint, printing one progress line per query and tallying
    success/failure counts as it goes.

    Args:
        base_path: Base directory path
        folder_name: Name of the folder

    Returns:
        Dictionary with profiling results: folder name, query counts, and
        a per-query list (each entry has name/file/success plus either
        "profile" data or an "error" message).
    """
    results = {
        "folder": folder_name,
        "total_queries": 0,
        "successful_queries": 0,
        "failed_queries": 0,
        "queries": []
    }
    ppl_files = find_ppl_files(base_path, folder_name)
    results["total_queries"] = len(ppl_files)
    print(f"\n{'='*80}")
    print(f"Profiling folder: {folder_name}")
    print(f"Found {len(ppl_files)} PPL query files")
    print(f"{'='*80}\n")
    for ppl_file in ppl_files:
        query_name = ppl_file.stem
        # end=""/flush=True keeps the later status mark ("✓"/"✗") on this line.
        print(f"Executing: {query_name} ... ", end="", flush=True)
        # Read and sanitize the query
        try:
            with open(ppl_file, 'r', encoding='utf-8') as f:
                raw_query = f.read()
            sanitized_query = sanitize_ppl(raw_query)
            # Execute with profiling
            result = execute_profile_query(sanitized_query)
            query_result = {
                "name": query_name,
                "file": str(ppl_file.relative_to(base_path)),
                "success": result["success"]
            }
            if result["success"]:
                profile_data = result["data"].get("profile", {})
                if profile_data:
                    query_result["profile"] = profile_data
                    summary = profile_data.get("summary", {})
                    total_time = summary.get("total_time_ms", 0)
                    print(f"✓ ({total_time:.2f}ms)")
                    results["successful_queries"] += 1
                else:
                    # HTTP call succeeded but the response carried no profile
                    # section — counted as a failure.
                    query_result["error"] = "No profile data in response"
                    print(f"✗ (No profile data)")
                    results["failed_queries"] += 1
            else:
                query_result["error"] = result.get("error", "Unknown error")
                print(f"✗ ({query_result['error']})")
                results["failed_queries"] += 1
            results["queries"].append(query_result)
        except Exception as e:
            # File-read (or other unexpected) errors: record a failed entry
            # for this query rather than aborting the whole folder.
            print(f"✗ (Error reading file: {str(e)})")
            results["queries"].append({
                "name": query_name,
                "file": str(ppl_file.relative_to(base_path)),
                "success": False,
                "error": f"Error reading file: {str(e)}"
            })
            results["failed_queries"] += 1
    return results
def print_summary_report(all_results: List[Dict]):
    """
    Print a human-readable summary of profiling results, one section per folder.

    Args:
        all_results: List of results dictionaries from each folder
    """
    print(f"\n{'='*80}")
    print("PROFILING SUMMARY REPORT")
    print(f"{'='*80}\n")
    for entry in all_results:
        print(f"Folder: {entry['folder']}")
        print(f" Total queries: {entry['total_queries']}")
        print(f" Successful: {entry['successful_queries']}")
        print(f" Failed: {entry['failed_queries']}")
        if entry["successful_queries"] > 0:
            # Only queries that actually carry profile data feed the stats.
            profiled = [q for q in entry["queries"] if q["success"] and "profile" in q]
            if profiled:
                times = [q["profile"]["summary"]["total_time_ms"] for q in profiled]
                print(f" Average time: {sum(times) / len(times):.2f}ms")
                print(f" Min time: {min(times):.2f}ms")
                print(f" Max time: {max(times):.2f}ms")
                # Per-phase totals across all profiled queries in this folder.
                phase_names = ["analyze", "optimize", "execute", "format"]
                totals = defaultdict(float)
                for q in profiled:
                    per_phase = q["profile"].get("phases", {})
                    for name in phase_names:
                        if name in per_phase:
                            totals[name] += per_phase[name].get("time_ms", 0)
                print(f" Phase averages:")
                for name in phase_names:
                    print(f" {name}: {totals[name] / len(profiled):.2f}ms")
        print()
def save_detailed_results(all_results: List[Dict], output_file: str):
    """
    Write the full profiling results to disk as pretty-printed JSON.

    Args:
        all_results: List of results dictionaries from each folder
        output_file: Path to output JSON file
    """
    # Serialize once, then write in a single shot with explicit UTF-8.
    Path(output_file).write_text(json.dumps(all_results, indent=2), encoding='utf-8')
    print(f"Detailed results saved to: {output_file}")
def main():
    """
    Entry point: parse CLI args, profile every benchmark folder, print the
    summary report, and dump detailed results to profile_results.json.
    """
    parser = argparse.ArgumentParser(
        description="Profile PPL queries from test directories and report metrics separately by folder."
    )
    parser.add_argument(
        "--base-path",
        type=str,
        default="/Users/yuanchu/src/sql/integ-test/src/test/resources",
        help="Base path for test resources (default: /Users/yuanchu/src/sql/integ-test/src/test/resources)"
    )
    args = parser.parse_args()

    base_path = Path(args.base_path)
    if not base_path.exists():
        print(f"Error: Base path does not exist: {base_path}")
        sys.exit(1)

    # Profile each benchmark suite in turn.
    all_results = [profile_folder(base_path, name)
                   for name in ["clickbench", "big5", "tpch"]]

    # Print summary report
    print_summary_report(all_results)

    # Save detailed results
    save_detailed_results(all_results, "profile_results.json")

    print(f"\n{'='*80}")
    print("Profiling complete!")
    print(f"{'='*80}\n")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment