Created
December 8, 2025 05:25
-
-
Save ntkathole/4d0dad7448a338c83d611b9d26baa28c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| CodeFlare SDK Connection Debugging Script | |
| This script helps diagnose issues with connecting to KubeRay clusters via CodeFlare SDK. | |
| It tests authentication, cluster discovery, and connection setup. | |
| Usage: | |
| python debug_codeflare_connection.py \ | |
| --cluster-name test-kuberay \ | |
| --namespace kuberay \ | |
| --auth-token <your-token> \ | |
| --auth-server https://<your-eks-server>.eks.amazonaws.com \ | |
| [--skip-tls] | |
| """ | |
| import argparse | |
| import sys | |
| import traceback | |
| from typing import Optional | |
def print_section(title: str):
    """Write a section banner to stdout.

    The banner is a blank line, an 80-character rule of ``=``, the title
    (prefixed with one space), and a closing rule.
    """
    rule = "=" * 80
    print("\n" + rule, f" {title}", rule, sep="\n")
def print_success(message: str):
    """Write *message* to stdout behind a check-mark prefix."""
    print("✓", message)
def print_error(message: str):
    """Write *message* to stdout behind a cross-mark prefix."""
    print("✗ {}".format(message))
def print_warning(message: str):
    """Write *message* to stdout behind a warning-sign prefix."""
    prefix = "⚠"
    print(prefix, message, sep=" ")
def print_info(message: str):
    """Write *message* to stdout behind an information-sign prefix."""
    line = f"ℹ {message}"
    print(line)
def test_codeflare_import():
    """Check that the CodeFlare SDK names used by this script can be imported.

    Returns:
        True when ``TokenAuthentication``, ``get_cluster`` and
        ``list_clusters`` import from ``codeflare_sdk``; False otherwise.
        Failures are reported on stdout instead of being raised.
    """
    print_section("Testing CodeFlare SDK Import")
    try:
        # The names are not used here; importing them is the whole check.
        from codeflare_sdk import TokenAuthentication, get_cluster, list_clusters

        print_success("CodeFlare SDK imported successfully")
        return True
    except ImportError as import_err:
        print_error(f"Failed to import CodeFlare SDK: {import_err}")
        print_info("Install it with: pip install codeflare-sdk")
    except Exception as unexpected_err:
        # Anything other than ImportError gets a full traceback for context.
        print_error(f"Unexpected error importing CodeFlare SDK: {unexpected_err}")
        traceback.print_exc()
    return False
def test_kubernetes_import():
    """Check whether the optional ``kubernetes`` client package is importable.

    Returns:
        True when the package (and its ``client`` subpackage) import, False
        otherwise. A missing package is only reported as a warning.
    """
    print_section("Testing Kubernetes Client Import")
    try:
        import kubernetes
        from kubernetes import client  # imported only to prove the subpackage loads

        installed_version = kubernetes.__version__
        print_success(
            f"Kubernetes client imported successfully (version: {installed_version})"
        )
        return True
    except ImportError:
        print_warning("Kubernetes client not available (optional for diagnostics)")
        print_info("Install it with: pip install kubernetes")
    except Exception as unexpected_err:
        print_warning(f"Unexpected error importing Kubernetes client: {unexpected_err}")
    return False
def test_authentication(auth_token: str, auth_server: str, skip_tls: bool):
    """Log in through the CodeFlare SDK using token authentication.

    Args:
        auth_token: Token handed to ``TokenAuthentication``.
        auth_server: API server URL handed to ``TokenAuthentication``.
        skip_tls: Passed through as the ``skip_tls`` flag.

    Returns:
        ``(True, auth)`` when ``auth.login()`` returns without raising,
        ``(False, None)`` when any exception occurs (it is printed with a
        traceback and a checklist of likely causes).
    """
    print_section("Testing CodeFlare SDK Authentication")
    try:
        from codeflare_sdk import TokenAuthentication

        for line in (
            f"Server: {auth_server}",
            f"Skip TLS: {skip_tls}",
            "Attempting authentication...",
        ):
            print_info(line)

        login_kwargs = {
            "token": auth_token,
            "server": auth_server,
            "skip_tls": skip_tls,
        }
        auth = TokenAuthentication(**login_kwargs)
        auth.login()
        print_success("Authentication successful!")
        return True, auth
    except Exception as auth_err:
        print_error(f"Authentication failed: {auth_err}")
        for hint in (
            "Please verify:",
            " 1. The auth_token is correct and not expired",
            " 2. The auth_server URL is correct",
            " 3. The token has proper permissions",
            " 4. Network connectivity to the server",
        ):
            print_info(hint)
        traceback.print_exc()
        return False, None
def test_kubernetes_client_config(auth_token: str, auth_server: str, skip_tls: bool):
    """Build a token-authenticated Kubernetes client configuration and probe the API.

    Args:
        auth_token: Bearer token placed in the ``authorization`` API key.
        auth_server: Base URL of the Kubernetes API server.
        skip_tls: When True, certificate verification is turned off.

    Returns:
        ``(True, configuration)`` once the configuration object has been
        built. This holds even if the follow-up version probe fails, in which
        case only a warning is printed. ``(False, None)`` when the
        ``kubernetes`` package is missing or building the configuration raises.
    """
    print_section("Testing Kubernetes Client Configuration")
    try:
        from kubernetes import client

        print_info("Configuring Kubernetes client with token authentication...")
        configuration = client.Configuration()
        configuration.host = auth_server
        configuration.api_key_prefix['authorization'] = 'Bearer'
        configuration.api_key['authorization'] = auth_token
        configuration.verify_ssl = not skip_tls

        api_client = client.ApiClient(configuration)
        # The version endpoint lives on VersionApi. CoreV1Api has no
        # get_code(), so calling it there raised AttributeError and the probe
        # could never succeed.
        version_api = client.VersionApi(api_client)

        # The probe is best-effort: a failed call is reported but does not
        # invalidate the configuration that was just built.
        try:
            version = version_api.get_code()
            print_success("Kubernetes API connection successful!")
            print_info(f" Kubernetes version: {version.git_version}")
        except Exception as api_error:
            print_warning(f"Kubernetes API call failed: {api_error}")
            print_info("Configuration created but API call failed (this may be expected)")
        return True, configuration
    except ImportError:
        print_warning("Kubernetes client not available, skipping configuration test")
        return False, None
    except Exception as e:
        print_error(f"Failed to configure Kubernetes client: {e}")
        traceback.print_exc()
        return False, None
def test_list_clusters(namespace: str):
    """List the clusters the CodeFlare SDK reports for *namespace*.

    Args:
        namespace: Kubernetes namespace passed to ``list_clusters``.

    Returns:
        ``(True, clusters)`` when the call returns (an empty result only
        produces a warning), ``(False, [])`` when it raises.
    """
    print_section(f"Testing Cluster Listing (namespace: {namespace})")
    try:
        # NOTE(review): confirm the installed codeflare_sdk release exports
        # `list_clusters`; some releases may expose a differently named helper.
        from codeflare_sdk import list_clusters

        print_info(f"Attempting to list clusters in namespace '{namespace}'...")
        clusters = list_clusters(namespace=namespace)

        if not clusters:
            print_warning(f"No clusters found in namespace '{namespace}'")
            for hint in (
                "This might indicate:",
                " 1. No clusters exist in this namespace",
                " 2. The authentication token doesn't have list permissions",
                " 3. The namespace name is incorrect",
            ):
                print_info(hint)
            return True, clusters

        print_success(f"Found {len(clusters)} cluster(s):")
        for entry in clusters:
            print_info(f" - {entry}")
        return True, clusters
    except Exception as list_err:
        print_error(f"Failed to list clusters: {list_err}")
        for hint in (
            "This might indicate:",
            " 1. Authentication issues",
            " 2. Permission issues",
            " 3. Network connectivity issues",
        ):
            print_info(hint)
        traceback.print_exc()
        return False, []
def test_get_cluster(cluster_name: str, namespace: str):
    """Look up one cluster through the CodeFlare SDK.

    Args:
        cluster_name: Name passed to ``get_cluster``.
        namespace: Namespace passed to ``get_cluster``.

    Returns:
        ``(True, cluster)`` when a cluster object is returned; ``(False, None)``
        when ``get_cluster`` returns None or raises. A failure to read the
        cluster URI is only a warning and does not change the result.
    """
    print_section(f"Testing Cluster Discovery (cluster: {cluster_name}, namespace: {namespace})")
    try:
        from codeflare_sdk import get_cluster

        print_info(f"Attempting to get cluster '{cluster_name}' from namespace '{namespace}'...")
        cluster = get_cluster(cluster_name=cluster_name, namespace=namespace)

        # Guard clause: nothing more to inspect without a cluster object.
        if cluster is None:
            print_error(f"Cluster '{cluster_name}' not found!")
            return False, None

        print_success(f"Cluster '{cluster_name}' found!")
        try:
            print_info(f" Cluster URI: {cluster.cluster_uri()}")
        except Exception as uri_err:
            print_warning(f"Could not get cluster URI: {uri_err}")
        return True, cluster
    except Exception as lookup_err:
        print_error(f"Failed to get cluster: {lookup_err}")
        traceback.print_exc()
        return False, None
def test_kubernetes_api_cluster_check(
    cluster_name: str, namespace: str, k8s_config
):
    """Fetch the ``rayclusters.ray.io`` object directly from the Kubernetes API.

    Args:
        cluster_name: Name of the RayCluster custom object to fetch.
        namespace: Namespace the object is expected in.
        k8s_config: Kubernetes client configuration, or None to skip the check.

    Returns:
        True when the object is returned by the API; False when the check is
        skipped, the API responds with an error, or anything else raises.
    """
    print_section("Testing Cluster Existence via Kubernetes API")
    if k8s_config is None:
        print_warning("Kubernetes client not configured, skipping API check")
        return False

    try:
        from kubernetes import client

        print_info(f"Checking if cluster '{cluster_name}' exists via Kubernetes API...")
        custom_api = client.CustomObjectsApi(client.ApiClient(k8s_config))
        lookup = {
            "group": "ray.io",
            "version": "v1",
            "namespace": namespace,
            "plural": "rayclusters",
            "name": cluster_name,
        }
        try:
            cluster_obj = custom_api.get_namespaced_custom_object(**lookup)
            print_success(f"Cluster '{cluster_name}' exists in Kubernetes API!")

            # Report the state field from the object's status, if present.
            state = cluster_obj.get('status', {}).get('state', 'unknown')
            print_info(f" State: {state}")

            # Report how many worker groups the spec declares, if any.
            spec = cluster_obj.get('spec', {})
            if 'workerGroupSpecs' in spec:
                print_info(f" Worker groups: {len(spec['workerGroupSpecs'])}")
            return True
        except client.rest.ApiException as api_error:
            http_status = api_error.status
            if http_status == 403:
                print_error("Access denied (403 Forbidden)")
                print_info("The authentication token may not have sufficient permissions")
            elif http_status == 404:
                print_error(
                    f"Cluster '{cluster_name}' not found in Kubernetes API (404 Not Found)"
                )
                for hint in (
                    "Please verify:",
                    f" 1. The cluster name '{cluster_name}' is correct",
                    f" 2. The namespace '{namespace}' is correct",
                    f" 3. The cluster exists: kubectl get rayclusters.ray.io -n {namespace}",
                ):
                    print_info(hint)
            else:
                print_error(f"Kubernetes API error: {api_error.status} - {api_error.reason}")
            return False
    except ImportError:
        print_warning("Kubernetes client not available, skipping API check")
        return False
    except Exception as check_err:
        print_error(f"Failed to check cluster via Kubernetes API: {check_err}")
        traceback.print_exc()
        return False
def test_tls_cert_generation(cluster_name: str, namespace: str):
    """Generate a TLS certificate for the cluster and export its environment.

    Calls ``generate_cert.generate_tls_cert`` followed by
    ``generate_cert.export_env`` from the CodeFlare SDK.

    Returns:
        True when both calls return without raising, False otherwise (the
        exception is printed with a traceback).
    """
    print_section("Testing TLS Certificate Generation")
    try:
        from codeflare_sdk import generate_cert as cert_tools

        print_info(f"Attempting to generate TLS certificate for cluster '{cluster_name}'...")
        cert_tools.generate_tls_cert(cluster_name, namespace)
        print_success("TLS certificate generated successfully!")

        print_info("Exporting certificate environment variables...")
        cert_tools.export_env(cluster_name, namespace)
        print_success("Certificate environment variables exported!")
    except Exception as cert_err:
        print_error(f"Failed to generate TLS certificate: {cert_err}")
        traceback.print_exc()
        return False
    else:
        return True
def test_ray_connection(cluster):
    """Connect to the cluster with ``ray.init`` and print its headline resources.

    Args:
        cluster: Object exposing ``cluster_uri()``, or None to skip the test.

    Returns:
        True when ``ray.init`` succeeds (a failure to read the resources is
        only a warning); False when *cluster* is None or connecting raises.
    """
    print_section("Testing Ray Connection")
    if cluster is None:
        print_warning("No cluster available, skipping Ray connection test")
        return False

    try:
        import ray

        print_info("Getting cluster URI...")
        address = cluster.cluster_uri()
        print_info(f"Cluster URI: {address}")

        print_info("Attempting to connect to Ray cluster...")
        ray.shutdown()  # drop any earlier session before connecting
        ray.init(address=address, ignore_reinit_error=True, logging_level="ERROR")
        print_success("Ray connection successful!")

        # Resource reporting is informational; it never fails the test.
        try:
            resources = ray.cluster_resources()
            print_info("Cluster resources:")
            for name, amount in resources.items():
                if name == 'memory':
                    # The value is divided by 1024**3 and labelled as GB.
                    print_info(f" {name}: {amount / (1024 ** 3):.2f} GB")
                elif name in ('CPU', 'GPU'):
                    print_info(f" {name}: {amount}")
        except Exception as resource_err:
            print_warning(f"Could not get cluster resources: {resource_err}")

        ray.shutdown()
        return True
    except Exception as connect_err:
        print_error(f"Failed to connect to Ray cluster: {connect_err}")
        traceback.print_exc()
        return False
def main():
    """Parse the command line, run every diagnostic step in order, and exit.

    Exit status:
        0 - the cluster was found and cluster listing, TLS certificate
            generation and the Ray connection all succeeded.
        1 - the SDK import or authentication failed (aborts immediately), or
            any of the steps above failed.
        2 - invalid or missing command-line arguments (raised by argparse).

    The direct Kubernetes API check is informational: its outcome is printed
    but does not affect the exit status.
    """
    parser = argparse.ArgumentParser(
        description="Debug CodeFlare SDK connection to KubeRay clusters"
    )
    for flag, help_text in (
        ("--cluster-name", "Name of the KubeRay cluster"),
        ("--namespace", "Kubernetes namespace where the cluster is located"),
        ("--auth-token", "Authentication token for Kubernetes API server"),
        (
            "--auth-server",
            "Kubernetes API server URL (e.g., https://xxx.eks.amazonaws.com)",
        ),
    ):
        parser.add_argument(flag, required=True, help=help_text)
    parser.add_argument(
        "--skip-tls",
        action="store_true",
        help="Skip TLS verification (not recommended for production)",
    )
    args = parser.parse_args()

    rule = "=" * 80
    print("\n" + rule)
    print(" CodeFlare SDK Connection Debugging Script")
    print(rule)
    # The auth token is deliberately left out of the echoed configuration.
    print("\nConfiguration:")
    print(f" Cluster Name: {args.cluster_name}")
    print(f" Namespace: {args.namespace}")
    print(f" Auth Server: {args.auth_server}")
    print(f" Skip TLS: {args.skip_tls}")

    # Hard prerequisites: nothing else can run without the SDK or a login.
    if not test_codeflare_import():
        print_error("\nCodeFlare SDK is required. Please install it first.")
        sys.exit(1)
    test_kubernetes_import()  # optional, so a missing package is not fatal

    auth_success, _ = test_authentication(
        args.auth_token, args.auth_server, args.skip_tls
    )
    if not auth_success:
        print_error("\nAuthentication failed. Please fix authentication issues first.")
        sys.exit(1)

    k8s_config_success, k8s_config = test_kubernetes_client_config(
        args.auth_token, args.auth_server, args.skip_tls
    )
    list_success, _ = test_list_clusters(args.namespace)
    get_success, cluster = test_get_cluster(args.cluster_name, args.namespace)

    # Cross-check against the raw Kubernetes API; informational only.
    if k8s_config_success:
        api_check_success = test_kubernetes_api_cluster_check(
            args.cluster_name, args.namespace, k8s_config
        )
        if not api_check_success and get_success:
            # Odd, but not critical: the SDK found what the raw API did not.
            print_warning("Kubernetes API check failed but get_cluster succeeded")

    # TLS generation and the Ray connection only make sense once the cluster
    # was found; a missing cluster already fails the run.
    tls_success = ray_success = False
    if get_success:
        tls_success = test_tls_cert_generation(args.cluster_name, args.namespace)
        ray_success = test_ray_connection(cluster)

    all_tests_passed = list_success and get_success and tls_success and ray_success

    print_section("Summary")
    if all_tests_passed:
        print_success("All tests passed! CodeFlare SDK connection is working correctly.")
        sys.exit(0)

    print_error("Some tests failed. Please review the errors above.")
    if not get_success:
        print_info("\nTroubleshooting steps:")
        print_info(f" 1. Verify cluster exists: kubectl get rayclusters.ray.io -n {args.namespace}")
        print_info(
            f" 2. Check cluster status: kubectl get rayclusters.ray.io {args.cluster_name}"
            f" -n {args.namespace} -o yaml"
        )
        print_info(" 3. Verify authentication token has proper permissions")
        print_info(" 4. Check network connectivity to Kubernetes API server")
    sys.exit(1)
# Run the diagnostics only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment