Skip to content

Instantly share code, notes, and snippets.

@ntkathole
Created December 8, 2025 05:25
Show Gist options
  • Select an option

  • Save ntkathole/4d0dad7448a338c83d611b9d26baa28c to your computer and use it in GitHub Desktop.

Select an option

Save ntkathole/4d0dad7448a338c83d611b9d26baa28c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
CodeFlare SDK Connection Debugging Script
This script helps diagnose issues with connecting to KubeRay clusters via CodeFlare SDK.
It tests authentication, cluster discovery, and connection setup.
Usage:
    python debug_codeflare_connection.py \
        --cluster-name test-kuberay \
        --namespace kuberay \
        --auth-token <your-token> \
        --auth-server https://<your-eks-server>.eks.amazonaws.com \
        [--skip-tls]
"""
import argparse
import sys
import traceback
from typing import Optional
def print_section(title: str):
    """Write a section banner to stdout.

    The banner is a blank line, an 80-character rule of ``=``, the title
    indented by one space, and a closing rule.
    """
    rule = "=" * 80
    # One call with a newline separator emits the same text as three prints.
    print("\n" + rule, " {}".format(title), rule, sep="\n")
def print_success(message: str):
    """Write *message* to stdout, prefixed with a check mark."""
    # print() joins its arguments with a single space by default.
    print("✓", message)
def print_error(message: str):
    """Write *message* to stdout, prefixed with a cross mark."""
    # print() joins its arguments with a single space by default.
    print("✗", message)
def print_warning(message: str):
    """Write *message* to stdout, prefixed with a warning sign."""
    # print() joins its arguments with a single space by default.
    print("⚠", message)
def print_info(message: str):
    """Write *message* to stdout, prefixed with an information sign."""
    # print() joins its arguments with a single space by default.
    print("ℹ", message)
def test_codeflare_import():
    """Check that the CodeFlare SDK is importable.

    Only the names the rest of this script cannot work without
    (``TokenAuthentication`` and ``get_cluster``) are hard requirements.
    The cluster-listing function is probed separately and only produces a
    warning when absent, so a missing optional helper does not abort the
    whole diagnostic run.

    Returns:
        True when the required names import cleanly, False otherwise.
    """
    print_section("Testing CodeFlare SDK Import")
    try:
        import codeflare_sdk
        # Imported purely to prove the names exist; they are not used here.
        from codeflare_sdk import TokenAuthentication, get_cluster  # noqa: F401
    except ImportError as e:
        print_error(f"Failed to import CodeFlare SDK: {e}")
        print_info("Install it with: pip install codeflare-sdk")
        return False
    except Exception as e:
        print_error(f"Unexpected error importing CodeFlare SDK: {e}")
        traceback.print_exc()
        return False

    print_success("CodeFlare SDK imported successfully")
    # NOTE(review): published SDK releases appear to export `list_all_clusters`
    # rather than `list_clusters` - confirm against the installed version.
    has_lister = any(
        hasattr(codeflare_sdk, name)
        for name in ("list_clusters", "list_all_clusters")
    )
    if not has_lister:
        print_warning(
            "CodeFlare SDK exposes no cluster listing function "
            "(list_clusters / list_all_clusters)"
        )
    return True
def test_kubernetes_import():
    """Check whether the optional ``kubernetes`` package is importable.

    Returns:
        True when both ``kubernetes`` and ``kubernetes.client`` import,
        False otherwise. Failures are reported as warnings, not errors.
    """
    print_section("Testing Kubernetes Client Import")
    try:
        import kubernetes
        # Imported only to confirm the submodule loads.
        from kubernetes import client  # noqa: F401

        installed = kubernetes.__version__
        print_success(f"Kubernetes client imported successfully (version: {installed})")
        return True
    except ImportError:
        for line in (
            "Kubernetes client not available (optional for diagnostics)",
        ):
            print_warning(line)
        print_info("Install it with: pip install kubernetes")
        return False
    except Exception as exc:
        print_warning(f"Unexpected error importing Kubernetes client: {exc}")
        return False
def test_authentication(auth_token: str, auth_server: str, skip_tls: bool):
    """Build a CodeFlare ``TokenAuthentication`` and call its ``login()``.

    Args:
        auth_token: Token passed as ``token``.
        auth_server: API server URL passed as ``server``.
        skip_tls: Passed through as ``skip_tls``.

    Returns:
        ``(True, auth_object)`` when ``login()`` returns without raising,
        ``(False, None)`` on any exception (a traceback is printed).
    """
    print_section("Testing CodeFlare SDK Authentication")
    try:
        from codeflare_sdk import TokenAuthentication

        preamble = (
            f"Server: {auth_server}",
            f"Skip TLS: {skip_tls}",
            "Attempting authentication...",
        )
        for line in preamble:
            print_info(line)

        authenticator = TokenAuthentication(
            token=auth_token, server=auth_server, skip_tls=skip_tls
        )
        authenticator.login()
        print_success("Authentication successful!")
        return True, authenticator
    except Exception as exc:
        print_error(f"Authentication failed: {exc}")
        checklist = (
            "Please verify:",
            " 1. The auth_token is correct and not expired",
            " 2. The auth_server URL is correct",
            " 3. The token has proper permissions",
            " 4. Network connectivity to the server",
        )
        for line in checklist:
            print_info(line)
        traceback.print_exc()
        return False, None
def test_kubernetes_client_config(auth_token: str, auth_server: str, skip_tls: bool):
    """Build a bearer-token Kubernetes client configuration and probe the API.

    Args:
        auth_token: Bearer token placed in the ``authorization`` API key.
        auth_server: API server URL used as the configuration host.
        skip_tls: When True, ``verify_ssl`` is set to False.

    Returns:
        ``(True, configuration)`` once the configuration object is built,
        whether or not the version probe succeeds (a failed probe is only a
        warning). ``(False, None)`` when the ``kubernetes`` package is
        missing or building the configuration raises.
    """
    print_section("Testing Kubernetes Client Configuration")
    try:
        from kubernetes import client

        print_info("Configuring Kubernetes client with token authentication...")
        configuration = client.Configuration()
        configuration.host = auth_server
        configuration.api_key_prefix['authorization'] = 'Bearer'
        configuration.api_key['authorization'] = auth_token
        configuration.verify_ssl = not skip_tls

        api_client = client.ApiClient(configuration)
        # The server version endpoint lives on VersionApi. CoreV1Api has no
        # get_code(), so calling it there always raised AttributeError and
        # the probe was misreported as an API failure.
        version_api = client.VersionApi(api_client)
        try:
            version = version_api.get_code()
            print_success("Kubernetes API connection successful!")
            print_info(f" Kubernetes version: {version.git_version}")
        except Exception as api_error:
            # Best effort: the configuration is still handed back to the caller.
            print_warning(f"Kubernetes API call failed: {api_error}")
            print_info("Configuration created but API call failed (this may be expected)")
        return True, configuration
    except ImportError:
        print_warning("Kubernetes client not available, skipping configuration test")
        return False, None
    except Exception as e:
        print_error(f"Failed to configure Kubernetes client: {e}")
        traceback.print_exc()
        return False, None
def test_list_clusters(namespace: str):
    """List the clusters the CodeFlare SDK reports for *namespace*.

    The listing function is looked up by name: ``list_clusters`` first,
    then ``list_all_clusters``, so the check works with either export.

    Args:
        namespace: Namespace passed to the listing function as ``namespace``.

    Returns:
        ``(True, clusters)`` when the call returns (an empty result is a
        warning, not a failure); ``(False, [])`` when anything raises.
    """
    print_section(f"Testing Cluster Listing (namespace: {namespace})")
    try:
        import codeflare_sdk

        # NOTE(review): published SDK releases appear to export
        # `list_all_clusters` rather than `list_clusters` - confirm against
        # the installed version.
        lister = getattr(codeflare_sdk, "list_clusters", None)
        if lister is None:
            lister = getattr(codeflare_sdk, "list_all_clusters", None)
        if lister is None:
            raise ImportError(
                "codeflare_sdk provides neither list_clusters nor list_all_clusters"
            )

        print_info(f"Attempting to list clusters in namespace '{namespace}'...")
        clusters = lister(namespace=namespace)
        if clusters:
            print_success(f"Found {len(clusters)} cluster(s):")
            for cluster in clusters:
                print_info(f" - {cluster}")
        else:
            print_warning(f"No clusters found in namespace '{namespace}'")
            print_info("This might indicate:")
            print_info(" 1. No clusters exist in this namespace")
            print_info(" 2. The authentication token doesn't have list permissions")
            print_info(" 3. The namespace name is incorrect")
        return True, clusters
    except Exception as e:
        print_error(f"Failed to list clusters: {e}")
        print_info("This might indicate:")
        print_info(" 1. Authentication issues")
        print_info(" 2. Permission issues")
        print_info(" 3. Network connectivity issues")
        traceback.print_exc()
        return False, []
def test_get_cluster(cluster_name: str, namespace: str):
    """Look up one cluster through the CodeFlare SDK's ``get_cluster``.

    Args:
        cluster_name: Passed to ``get_cluster`` as ``cluster_name``.
        namespace: Passed to ``get_cluster`` as ``namespace``.

    Returns:
        ``(True, cluster)`` when a non-None object comes back (a failure to
        read its URI is only a warning); ``(False, None)`` when the result
        is None or any exception is raised.
    """
    print_section(f"Testing Cluster Discovery (cluster: {cluster_name}, namespace: {namespace})")
    try:
        from codeflare_sdk import get_cluster

        print_info(f"Attempting to get cluster '{cluster_name}' from namespace '{namespace}'...")
        found = get_cluster(cluster_name=cluster_name, namespace=namespace)

        # Guard clause: nothing more to inspect without a cluster object.
        if found is None:
            print_error(f"Cluster '{cluster_name}' not found!")
            return False, None

        print_success(f"Cluster '{cluster_name}' found!")
        try:
            print_info(f" Cluster URI: {found.cluster_uri()}")
        except Exception as uri_error:
            print_warning(f"Could not get cluster URI: {uri_error}")
        return True, found
    except Exception as exc:
        print_error(f"Failed to get cluster: {exc}")
        traceback.print_exc()
        return False, None
def test_kubernetes_api_cluster_check(
    cluster_name: str, namespace: str, k8s_config
):
    """Fetch the ``rayclusters`` custom object directly from the Kubernetes API.

    Args:
        cluster_name: Name of the custom object to fetch.
        namespace: Namespace to fetch it from.
        k8s_config: Kubernetes client configuration, or None to skip the check.

    Returns:
        True when the object is fetched; False when the check is skipped,
        the API answers with an error, or anything else raises.
    """
    print_section("Testing Cluster Existence via Kubernetes API")
    if k8s_config is None:
        print_warning("Kubernetes client not configured, skipping API check")
        return False
    try:
        from kubernetes import client

        print_info(f"Checking if cluster '{cluster_name}' exists via Kubernetes API...")
        objects_api = client.CustomObjectsApi(client.ApiClient(k8s_config))
        try:
            ray_cluster = objects_api.get_namespaced_custom_object(
                group="ray.io",
                version="v1",
                namespace=namespace,
                plural="rayclusters",
                name=cluster_name,
            )
        except client.rest.ApiException as api_error:
            http_status = api_error.status
            if http_status == 404:
                print_error(
                    f"Cluster '{cluster_name}' not found in Kubernetes API (404 Not Found)"
                )
                hints = (
                    "Please verify:",
                    f" 1. The cluster name '{cluster_name}' is correct",
                    f" 2. The namespace '{namespace}' is correct",
                    f" 3. The cluster exists: kubectl get rayclusters.ray.io -n {namespace}",
                )
                for hint in hints:
                    print_info(hint)
            elif http_status == 403:
                print_error("Access denied (403 Forbidden)")
                print_info("The authentication token may not have sufficient permissions")
            else:
                print_error(f"Kubernetes API error: {api_error.status} - {api_error.reason}")
            return False

        print_success(f"Cluster '{cluster_name}' exists in Kubernetes API!")
        # Report the state recorded under status, if any.
        reported_state = ray_cluster.get('status', {}).get('state', 'unknown')
        print_info(f" State: {reported_state}")
        # Report how many worker group specs the object declares.
        cluster_spec = ray_cluster.get('spec', {})
        if 'workerGroupSpecs' in cluster_spec:
            print_info(f" Worker groups: {len(cluster_spec['workerGroupSpecs'])}")
        return True
    except ImportError:
        print_warning("Kubernetes client not available, skipping API check")
        return False
    except Exception as exc:
        print_error(f"Failed to check cluster via Kubernetes API: {exc}")
        traceback.print_exc()
        return False
def test_tls_cert_generation(cluster_name: str, namespace: str):
    """Run the CodeFlare SDK's TLS certificate helpers for a cluster.

    Calls ``generate_cert.generate_tls_cert`` and then
    ``generate_cert.export_env``, both with ``(cluster_name, namespace)``.

    Returns:
        True when both calls return without raising, False otherwise.
    """
    print_section("Testing TLS Certificate Generation")
    try:
        from codeflare_sdk import generate_cert as cert_tools

        target = (cluster_name, namespace)

        print_info(f"Attempting to generate TLS certificate for cluster '{cluster_name}'...")
        cert_tools.generate_tls_cert(*target)
        print_success("TLS certificate generated successfully!")

        print_info("Exporting certificate environment variables...")
        cert_tools.export_env(*target)
        print_success("Certificate environment variables exported!")
        return True
    except Exception as exc:
        print_error(f"Failed to generate TLS certificate: {exc}")
        traceback.print_exc()
        return False
def test_ray_connection(cluster):
    """Connect to the cluster with ``ray.init`` and print its headline resources.

    Args:
        cluster: Object exposing ``cluster_uri()``, or None to skip the test.

    Returns:
        True when ``ray.init`` succeeds (a failure to read resources is only
        a warning); False when skipped or when connecting raises.
    """
    print_section("Testing Ray Connection")
    if cluster is None:
        print_warning("No cluster available, skipping Ray connection test")
        return False
    try:
        import ray

        print_info("Getting cluster URI...")
        address = cluster.cluster_uri()
        print_info(f"Cluster URI: {address}")
        print_info("Attempting to connect to Ray cluster...")
        # Drop any existing session before connecting.
        ray.shutdown()
        ray.init(address=address, ignore_reinit_error=True, logging_level="ERROR")
        print_success("Ray connection successful!")

        try:
            totals = ray.cluster_resources()
            print_info("Cluster resources:")
            for name, amount in totals.items():
                if name == 'memory':
                    # The memory value is divided by 1024**3 and labelled GB.
                    print_info(f" {name}: {amount / (1024 ** 3):.2f} GB")
                elif name in ('CPU', 'GPU'):
                    print_info(f" {name}: {amount}")
        except Exception as resource_error:
            print_warning(f"Could not get cluster resources: {resource_error}")

        ray.shutdown()
        return True
    except Exception as exc:
        print_error(f"Failed to connect to Ray cluster: {exc}")
        traceback.print_exc()
        return False
def _parse_cli_args():
    """Define the command-line options and return the parsed arguments."""
    parser = argparse.ArgumentParser(
        description="Debug CodeFlare SDK connection to KubeRay clusters"
    )
    parser.add_argument(
        "--cluster-name",
        required=True,
        help="Name of the KubeRay cluster",
    )
    parser.add_argument(
        "--namespace",
        required=True,
        help="Kubernetes namespace where the cluster is located",
    )
    parser.add_argument(
        "--auth-token",
        required=True,
        help="Authentication token for Kubernetes API server",
    )
    parser.add_argument(
        "--auth-server",
        required=True,
        help="Kubernetes API server URL (e.g., https://xxx.eks.amazonaws.com)",
    )
    parser.add_argument(
        "--skip-tls",
        action="store_true",
        help="Skip TLS verification (not recommended for production)",
    )
    return parser.parse_args()


def main():
    """Run every diagnostic step in order and exit with an overall status.

    A failed CodeFlare SDK import or failed authentication exits with
    status 1 immediately. The Kubernetes import, client configuration and
    direct API check are optional and never affect the exit status. The
    listing, cluster lookup, TLS and Ray steps each mark the run as failed.
    Exits 0 when all of those pass, 1 otherwise.
    """
    args = _parse_cli_args()

    rule = "=" * 80
    print("\n" + rule)
    print(" CodeFlare SDK Connection Debugging Script")
    print(rule)
    # The auth token is not echoed here.
    print("\nConfiguration:")
    print(f" Cluster Name: {args.cluster_name}")
    print(f" Namespace: {args.namespace}")
    print(f" Auth Server: {args.auth_server}")
    print(f" Skip TLS: {args.skip_tls}")

    # Track overall success
    all_tests_passed = True

    # Test imports
    if not test_codeflare_import():
        print_error("\nCodeFlare SDK is required. Please install it first.")
        sys.exit(1)
    test_kubernetes_import()  # Optional, so we don't fail if it's missing

    # Test authentication; the returned auth object is not needed afterwards.
    auth_success, _ = test_authentication(
        args.auth_token, args.auth_server, args.skip_tls
    )
    if not auth_success:
        print_error("\nAuthentication failed. Please fix authentication issues first.")
        sys.exit(1)

    # Test Kubernetes client configuration
    k8s_config_success, k8s_config = test_kubernetes_client_config(
        args.auth_token, args.auth_server, args.skip_tls
    )

    # Test listing clusters; only the success flag matters here.
    list_success, _ = test_list_clusters(args.namespace)
    if not list_success:
        all_tests_passed = False

    # Test getting specific cluster
    get_success, cluster = test_get_cluster(args.cluster_name, args.namespace)
    if not get_success:
        all_tests_passed = False

    # Test Kubernetes API check (informational only)
    if k8s_config_success:
        api_check_success = test_kubernetes_api_cluster_check(
            args.cluster_name, args.namespace, k8s_config
        )
        if not api_check_success and get_success:
            # API check failed but get_cluster succeeded - this is odd but not critical
            print_warning("Kubernetes API check failed but get_cluster succeeded")

    # TLS and Ray tests only make sense once the cluster was found
    if get_success:
        if not test_tls_cert_generation(args.cluster_name, args.namespace):
            all_tests_passed = False
        if not test_ray_connection(cluster):
            all_tests_passed = False

    # Final summary. A failed lookup already cleared all_tests_passed, so no
    # separate get_success check is needed.
    print_section("Summary")
    if all_tests_passed:
        print_success("All tests passed! CodeFlare SDK connection is working correctly.")
        sys.exit(0)

    print_error("Some tests failed. Please review the errors above.")
    if not get_success:
        print_info("\nTroubleshooting steps:")
        print_info(f" 1. Verify cluster exists: kubectl get rayclusters.ray.io -n {args.namespace}")
        print_info(
            f" 2. Check cluster status: kubectl get rayclusters.ray.io {args.cluster_name}"
            f" -n {args.namespace} -o yaml"
        )
        print_info(" 3. Verify authentication token has proper permissions")
        print_info(" 4. Check network connectivity to Kubernetes API server")
    sys.exit(1)
# Run the diagnostics only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment