|
#!/usr/bin/env python3 |
|
|
|
from __future__ import annotations |
|
import subprocess |
|
import json |
|
import time |
|
from apex_arena._types import GradingResult |
|
import re |
|
from datetime import timedelta |
|
|
|
|
|
# Memoized result of wait_for_grafana_api(): None = not yet probed,
# True/False = cached outcome so the 60s wait only happens once per run.
_grafana_api_ready = None

# (name, namespace, local_port) tuples for kubectl port-forwards already
# started by port_forward(), so each tunnel is spawned at most once.
_active_port_forwards: set = set()
|
|
|
|
|
def sh(cmd):
    """Run *cmd* through the shell; return (returncode, stdout, stderr) stripped."""
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    return (proc.returncode, proc.stdout.strip(), proc.stderr.strip())
|
|
|
def normalize_image(img):
    """Strip the implicit docker.io registry prefix for image comparison.

    Only the first matching prefix is removed, longest prefix first.
    """
    return next(
        (
            img[len(prefix):]
            for prefix in ("docker.io/library/", "docker.io/")
            if img.startswith(prefix)
        ),
        img,
    )
|
|
|
|
|
def port_forward(kind, name, namespace, local_port, remote_port):
    """Start a background kubectl port-forward, at most once per key.

    The key is (name, namespace, local_port); repeated calls are no-ops.
    """
    key = (name, namespace, local_port)
    if key in _active_port_forwards:
        return

    # Kill any stale forwarder already bound to this local port.
    sh(f"pkill -f 'kubectl port-forward.*{local_port}' || true")
    tunnel_cmd = (
        f"kubectl port-forward -n {namespace} {kind}/{name} "
        f"{local_port}:{remote_port} >/dev/null 2>&1 &"
    )
    sh(tunnel_cmd)
    time.sleep(5)  # give the tunnel a moment to come up
    _active_port_forwards.add(key)
|
|
|
|
|
def parse_duration(duration_str):
    """Parse a Prometheus duration string (e.g. '5m') into a timedelta.

    Unrecognized input yields timedelta(0).
    """
    match = re.match(r'(\d+)(s|m|h)', str(duration_str))
    if match is None:
        return timedelta(0)
    amount = int(match.group(1))
    unit_kwarg = {'s': 'seconds', 'm': 'minutes', 'h': 'hours'}[match.group(2)]
    return timedelta(**{unit_kwarg: amount})
|
|
|
|
|
# Container waiting-state reasons a deployment cannot recover from without
# operator intervention (the image can never be pulled as specified).
_TERMINAL_POD_STATES = {"ImagePullBackOff", "ErrImagePull", "InvalidImageName"}
|
|
|
|
|
def check_deployment_health(name, namespace):
    """Quick check for terminal pod errors. Returns (healthy, message).

    Parse failures are treated as healthy (best-effort check); only a
    missing deployment or a pod stuck in a terminal image error fails.
    """
    rc, raw, _ = sh(
        f"kubectl get deployment {name} -n {namespace} -o json"
    )
    if rc != 0:
        return False, f"Deployment '{name}' not found in {namespace}"

    try:
        labels = json.loads(raw)["spec"]["selector"]["matchLabels"]
    except (json.JSONDecodeError, KeyError):
        return True, "Could not parse selector, assuming healthy"
    selector = ",".join(f"{k}={v}" for k, v in labels.items())

    rc, raw, _ = sh(
        f"kubectl get pods -n {namespace} -l {selector} -o json"
    )
    if rc != 0:
        return True, "Could not list pods, assuming healthy"

    try:
        items = json.loads(raw).get("items", [])
    except json.JSONDecodeError:
        return True, "Could not parse pods, assuming healthy"

    for pod in items:
        for status in pod.get("status", {}).get("containerStatuses", []):
            reason = status.get("state", {}).get("waiting", {}).get("reason", "")
            if reason in _TERMINAL_POD_STATES:
                victim = pod.get("metadata", {}).get("name", "unknown")
                return False, (
                    f"Pod '{victim}' in {namespace} is stuck in {reason} — "
                    f"deployment '{name}' will not recover"
                )

    return True, f"Deployment '{name}' pods are not in terminal error state"
|
|
|
|
|
# ------------------------------------------------------------------- |
|
# RESOURCE CHECKS |
|
# ------------------------------------------------------------------- |
|
|
|
def deployment_exists(name, namespace):
    """Return (True, msg) when the deployment is present in the namespace."""
    rc, _, _ = sh(f"kubectl get deployment {name} -n {namespace}")
    if rc == 0:
        return True, f"Deployment '{name}' exists in {namespace}"
    return False, f"Deployment '{name}' not found in {namespace}"
|
|
|
|
|
def configmap_contains(name, namespace, required_strings):
    """Check the ConfigMap's YAML contains every string in *required_strings*."""
    rc, body, _ = sh(
        f"kubectl get configmap {name} -n {namespace} -o yaml"
    )
    if rc != 0:
        return False, f"ConfigMap '{name}' not readable"

    missing = [needle for needle in required_strings if needle not in body]
    if missing:
        return False, f"ConfigMap '{name}' missing: {missing}"
    return True, f"ConfigMap '{name}' contains required content"
|
|
|
|
|
def service_exists(name, namespace, port):
    """Verify the Service exists and exposes *port*. Returns (ok, message).

    Fix: the previous version passed kubectl output to json.loads unguarded,
    so malformed output raised instead of producing a readable failure
    message (every other parser in this file handles that case).
    """
    code, out, _ = sh(
        f"kubectl get svc {name} -n {namespace} -o json"
    )
    if code != 0:
        return False, f"Service '{name}' not found in {namespace}"

    try:
        data = json.loads(out)
    except json.JSONDecodeError:
        return False, f"Service '{name}' returned unparsable JSON"

    ports = [p.get("port") for p in data.get("spec", {}).get("ports", [])]

    if port in ports:
        return True, f"Service '{name}' exposes port {port}"
    return False, f"Service '{name}' does not expose port {port}"
|
|
|
|
|
def wait_for_grafana_api():
    """Block until Grafana's /api/health answers; memoize the result globally."""
    global _grafana_api_ready
    if _grafana_api_ready is not None:
        return _grafana_api_ready

    # Best-effort wait for the pod itself before polling the HTTP API.
    sh(
        "kubectl wait --for=condition=ready pod -l app=grafana "
        "-n observability --timeout=60s"
    )

    attempts = 6
    while attempts:
        rc, _, _ = sh(
            "kubectl exec -n observability deploy/grafana -- "
            "wget -qO- http://localhost:3000/api/health"
        )
        if rc == 0:
            _grafana_api_ready = True
            return True
        attempts -= 1
        time.sleep(5)

    _grafana_api_ready = False
    return False
|
|
|
|
|
|
|
def configmap_exists(name, namespace):
    """Return (True, msg) when the ConfigMap is present in the namespace."""
    rc, _, _ = sh(f"kubectl get configmap {name} -n {namespace}")
    if rc == 0:
        return True, f"ConfigMap '{name}' exists in {namespace}"
    return False, f"ConfigMap '{name}' not found in {namespace}"
|
|
|
|
|
|
|
def pvc_exists(name, namespace):
    """Return (True, msg) when the PVC is present in the namespace."""
    rc, _, _ = sh(f"kubectl get pvc {name} -n {namespace}")
    if rc == 0:
        return True, f"PVC '{name}' exists in {namespace}"
    return False, f"PVC '{name}' not found in {namespace}"
|
|
|
|
|
# ------------------------------------------------------------------- |
|
# BLACKBOX FUNCTIONAL CHECKS |
|
# ------------------------------------------------------------------- |
|
|
|
def blackbox_metrics_exposed():
    """Probe the blackbox exporter's own /metrics endpoint via port-forward."""
    healthy, msg = check_deployment_health("blackbox-exporter", "observability")
    if not healthy:
        return False, msg

    port_forward("svc", "blackbox-exporter", "observability", 9115, 9115)

    rc, body, _ = sh("curl -s http://localhost:9115/metrics")
    reachable = (
        rc == 0
        and "blackbox_exporter_config_last_reload_successful" in body
    )
    if reachable:
        return True, "Blackbox exporter metrics endpoint is reachable"
    return False, "Blackbox exporter metrics endpoint not responding correctly"
|
|
|
def blackbox_config_has_required_modules():
    """Ensure blackbox-config declares both required probe modules."""
    rc, data, _ = sh(
        "kubectl get configmap blackbox-config "
        "-n observability -o jsonpath='{.data}'"
    )
    if rc != 0:
        return False, "blackbox-config not readable"

    missing = [mod for mod in ["http_2xx", "tcp_connect"] if mod not in data]
    if missing:
        return False, f"blackbox-config missing required modules: {missing}"
    return True, "blackbox-config contains all required probe modules"
|
|
|
|
|
def kubernetes_api_tcp_probe_configured():
    """Verify Prometheus probes the Kubernetes API with the tcp_connect module.

    Returns (ok, message). The config must target kubernetes.default and the
    probe must use the plain 'tcp_connect' blackbox module — not
    'tcp_connect_tls'.

    Fix: removed a redundant function-local ``import re`` that shadowed the
    module-level import (re is already imported at the top of this file).
    """
    code, out, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o jsonpath='{.data.prometheus\\.yml}'"
    )
    if code != 0:
        return False, "Prometheus config not readable"

    if "kubernetes.default" not in out:
        return False, "Kubernetes API server not configured as probe target"

    # Accept the module in scalar or single-element list form, quoted or not.
    tcp_module_pattern = r'module:\s*\[?\s*["\']?tcp_connect["\']?\s*\]?'
    if not re.search(tcp_module_pattern, out) or "tcp_connect_tls" in out:
        return False, "Must use 'tcp_connect' module (not tcp_connect_tls) for Kubernetes API TCP probe"

    return True, "Kubernetes API TCP probe is configured with tcp_connect module"
|
|
|
|
|
def prometheus_has_probe_metrics():
    """Poll Prometheus (up to 6 tries, 5s apart) for probe_success samples."""
    healthy, msg = check_deployment_health("prometheus", "observability")
    if not healthy:
        return False, msg

    port_forward("svc", "prometheus", "observability", 9090, 9090)

    attempts = 6
    while attempts:
        rc, body, _ = sh(
            "curl -s "
            "'http://localhost:9090/api/v1/query?query=probe_success'"
        )
        if rc == 0 and '"result"' in body:
            return True, "Prometheus is collecting probe metrics"
        attempts -= 1
        time.sleep(5)

    return False, "Prometheus not returning probe metrics after retries"
|
|
|
|
|
def check_slo_burn_rate_alerts():
    """Verify alerts implement proper multi-window SLO burn rate logic."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o yaml"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    # Multi-window burn rate needs at least two distinct avg_over_time
    # windows in recording rules or alert expressions.
    window_list = re.findall(r"avg_over_time\([^)]*\[(\d+[mh])\]\)", cfg)
    if len(set(window_list)) < 2:
        return False, (
            "Burn rate alerts must use multiple time windows "
            "(e.g., 5m and 1h)"
        )

    # Fast-burn vs slow-burn detection shows up as distinct 'for:'
    # durations across the alert rules.
    durations = {
        parse_duration(d)
        for d in re.findall(r"for:\s*(\d+[smh])", cfg)
    }
    durations.discard(timedelta(0))
    if len(durations) < 2:
        return False, (
            "SLO burn rate alerting requires multiple detection windows "
            "(e.g., a fast-burn alert with 'for: 2m' and a slow-burn "
            "alert with 'for: 1h')"
        )

    return True, "Valid multi-window SLO burn rate alerts detected"
|
|
|
|
|
def prometheus_scrape_interval_valid():
    """Check that the global scrape_interval is 10s or 15s. Returns (ok, msg).

    Fix: the kubectl exit code was previously ignored, so an unreadable
    ConfigMap surfaced as the misleading "missing global section" message.
    It is now reported as unreadable, consistent with the other checks.
    """
    code, out, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o jsonpath='{.data.prometheus\\.yml}'"
    )
    if code != 0:
        return False, "Prometheus config not readable"

    if "global:" not in out:
        return False, "Prometheus config missing global section"

    # Only inspect the text before scrape_configs so a per-job interval
    # cannot satisfy the global requirement.
    global_section = out.split("scrape_configs")[0] if "scrape_configs" in out else out
    if ("scrape_interval: 15s" not in global_section
            and "scrape_interval: 10s" not in global_section):
        return False, "Global scrape_interval must be 10s or 15s"

    return True, "Scrape interval is appropriately configured"
|
|
|
|
|
def check_alert_for_duration():
    """Verify alerts have appropriate 'for' duration for timely detection."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o yaml"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    match = re.search(
        r'alert:\s*SyntheticProbeFailure.*?for:\s*(\d+[smh])',
        cfg,
        re.DOTALL,
    )
    if match is None:
        if 'SyntheticProbeFailure' not in cfg:
            return False, "SyntheticProbeFailure alert not found"
        return False, "Alert missing 'for' duration"

    raw_duration = match.group(1)
    duration = parse_duration(raw_duration)

    # Window must be long enough to avoid flapping but short enough for
    # the 2-minute detection requirement.
    if duration > timedelta(minutes=2):
        return False, f"Alert 'for' duration {raw_duration} exceeds 2m detection requirement"
    if duration < timedelta(seconds=30):
        return False, f"Alert 'for' duration {raw_duration} too short, will cause flapping"

    return True, f"Alert 'for' duration {raw_duration} is appropriate"
|
|
|
|
|
def check_alert_annotations():
    """Verify alerts have required annotations for operational use."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o yaml"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    if 'SyntheticProbeFailure' not in cfg:
        return False, "SyntheticProbeFailure alert not found"

    # Inspect only the config from the alert definition onward.
    tail = cfg[cfg.find('SyntheticProbeFailure'):]

    if 'annotations:' not in tail:
        return False, "Alert missing annotations section"
    if 'description' not in tail and 'summary' not in tail:
        return False, "Alert missing description/summary annotation"

    return True, "Alert has required annotations"
|
|
|
|
|
def check_recording_rules():
    """Verify recording rules exist AND are used in alert expressions."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o yaml"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    if "record:" not in cfg:
        return False, "Prometheus should define recording rules"

    # Collect declared recording-rule names.
    rule_names = re.findall(
        r"record:\s*([a-zA-Z_:][a-zA-Z0-9_:]*)",
        cfg,
    )
    if not rule_names:
        return False, "No valid recording rule names found"

    # Multi-window availability implies at least two rules.
    if len(rule_names) < 2:
        return False, (
            "Multiple recording rules needed for multi-window "
            "availability signals (e.g., 5m and 1h windows)"
        )

    # At least two rules must actually appear in the alert expressions.
    alerts_part = cfg[cfg.find("alert:"):] if "alert:" in cfg else cfg
    referenced = sum(1 for rule in rule_names if rule in alerts_part)
    if referenced < 2:
        return False, (
            "At least 2 recording rules should be referenced in alert "
            "expressions for multi-window burn rate detection"
        )

    return True, "Recording rules exist and are used in alerts"
|
|
|
|
|
def check_blackbox_modules():
    """Verify correct Blackbox modules used for each protocol."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o jsonpath='{.data.prometheus\\.yml}'"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    lowered = cfg.lower()

    # Kubernetes API (a TCP endpoint) must be probed with tcp_connect.
    kube_pos = cfg.find('kubernetes.default')
    if kube_pos != -1:
        around_kube = cfg[max(0, kube_pos - 500):kube_pos + 200]
        if 'tcp_connect' not in around_kube:
            return False, "Kubernetes API target should use tcp_connect module"

    # HTTP endpoints (Argo CD) must be probed with http_2xx.
    argocd_pos = lowered.find('argocd')
    if argocd_pos != -1:
        around_argocd = cfg[max(0, argocd_pos - 500):argocd_pos + 200]
        if 'http_2xx' not in around_argocd and 'http' in around_argocd:
            return False, "HTTP targets should use http_2xx module"

    return True, "Blackbox modules correctly matched to target protocols"
|
|
|
|
|
def check_alert_severity_labels():
    """Verify alerts define severity labels."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o yaml"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    if "severity:" in cfg:
        return True, "Alert severity labels present"
    return False, (
        "Alerts must define severity labels "
        "(critical or warning)"
    )
|
|
|
|
|
def check_dashboard_uses_recording_rules():
    """Dashboard should reference recording rules instead of raw PromQL."""
    rc, dashboards, _ = sh(
        "kubectl get configmap grafana-dashboards "
        "-n observability -o yaml"
    )
    if rc != 0:
        return False, "grafana-dashboards ConfigMap not readable"

    # Pull the declared recording-rule names out of prometheus-config and
    # see whether any of them appear in the dashboards.
    rc, prom_cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o yaml"
    )
    if rc == 0:
        names = re.findall(
            r"record:\s*([a-zA-Z_:][a-zA-Z0-9_:]*)",
            prom_cfg,
        )
        if names and any(name in dashboards for name in names):
            return True, "Dashboard references recording rules"

    # Fallback: any probe:*:* identifier follows the recording-rule
    # naming convention.
    if re.search(r"probe:[a-zA-Z_]+:[a-zA-Z0-9_]+", dashboards):
        return True, "Dashboard references recording rules"

    return False, (
        "Dashboard should reference pre-computed recording rules "
        "(e.g., probe:availability:5m) instead of raw PromQL"
    )
|
|
|
|
|
def argocd_probe_success():
    """Ask the blackbox exporter directly whether the Argo CD probe succeeds."""
    port_forward("svc", "blackbox-exporter", "observability", 9115, 9115)

    probe_cmd = (
        "curl -s "
        "'http://localhost:9115/probe?"
        "target=http://argocd.devops.local:80/api/version&module=http_2xx' | "
        "grep '^probe_success 1'"
    )
    rc, body, _ = sh(probe_cmd)

    if rc == 0 and body:
        return True, "Synthetic probe reports Argo CD endpoint as available"
    return False, "Synthetic probe did not report Argo CD as available"
|
|
|
|
|
def deployment_uses_image(name, namespace, expected_image):
    """Check that one of the deployment's containers runs *expected_image*.

    Image names are normalized (docker.io prefixes stripped) before
    comparison. Returns (ok, message).

    Fix: kubectl output was passed to json.loads / nested key access
    unguarded, so malformed output raised instead of producing a readable
    failure message (check_deployment_health already guards this case).
    """
    code, out, _ = sh(
        f"kubectl get deployment {name} -n {namespace} -o json"
    )
    if code != 0:
        return False, f"Deployment '{name}' not found"

    try:
        data = json.loads(out)
        containers = data["spec"]["template"]["spec"]["containers"]
    except (json.JSONDecodeError, KeyError):
        return False, f"Deployment '{name}' spec could not be parsed"

    images = [c.get("image", "") for c in containers]
    actual = [normalize_image(i) for i in images]
    expected = normalize_image(expected_image)

    if expected in actual:
        return True, f"Deployment '{name}' uses image '{expected_image}'"

    return False, f"Expected {expected_image}, found {images}"
|
|
|
|
|
def prometheus_blackbox_relabeling_present():
    """Confirm the probe job relabels targets through the blackbox exporter."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o jsonpath='{.data.prometheus\\.yml}'"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    addr_present = (
        "blackbox-exporter:9115" in cfg
        or "blackbox-exporter.observability" in cfg
    )

    missing = [
        snippet
        for snippet in ["metrics_path: /probe", "__param_target"]
        if snippet not in cfg
    ]
    if not addr_present:
        missing.append("blackbox-exporter address")

    if missing:
        return False, f"Missing blackbox relabeling elements: {missing}"
    return True, "Prometheus blackbox relabeling is correctly configured"
|
|
|
def prometheus_alert_fires_for_failing_probe():
    """Wait (up to 18 tries, 8s apart) for SyntheticProbeFailure to fire."""
    for dep in ("prometheus", "blackbox-exporter"):
        healthy, msg = check_deployment_health(dep, "observability")
        if not healthy:
            return False, msg

    port_forward("svc", "prometheus", "observability", 9090, 9090)

    for _ in range(18):
        rc, body, _ = sh(
            "curl -s http://localhost:9090/api/v1/alerts"
        )
        firing = (
            rc == 0
            and "SyntheticProbeFailure" in body
            and "does-not-exist.devops.local" in body
        )
        if firing:
            return True, "SyntheticProbeFailure alert is firing"
        time.sleep(8)

    return False, "SyntheticProbeFailure alert did not fire"
|
|
|
|
|
def grafana_has_prometheus_datasource():
    """Check a Prometheus datasource is registered in Grafana."""
    healthy, msg = check_deployment_health("grafana", "observability")
    if not healthy:
        return False, msg

    if not wait_for_grafana_api():
        return False, "Grafana API not reachable"

    attempts = 6
    while attempts:
        rc, body, _ = sh(
            "kubectl exec -n observability deploy/grafana -- "
            "wget -qO- --header='Authorization: Basic YWRtaW46YWRtaW4=' "
            "http://localhost:3000/api/datasources"
        )
        if rc == 0 and "Prometheus" in body:
            return True, "Grafana Prometheus datasource configured"
        attempts -= 1
        time.sleep(5)

    return False, "Grafana Prometheus datasource missing"
|
|
|
|
|
def grafana_has_blackbox_dashboard():
    """Check Grafana has a dashboard whose title suggests synthetic probing."""
    healthy, msg = check_deployment_health("grafana", "observability")
    if not healthy:
        return False, msg

    if not wait_for_grafana_api():
        return False, "Grafana API not reachable"

    keywords = ["Synthetic", "Blackbox", "Probe", "Endpoint"]
    for _ in range(6):
        rc, body, _ = sh(
            "kubectl exec -n observability deploy/grafana -- "
            "wget -qO- --header='Authorization: Basic YWRtaW46YWRtaW4=' "
            "http://localhost:3000/api/search"
        )
        if rc == 0 and any(word in body for word in keywords):
            return True, "Grafana dashboard for synthetic probes exists"
        time.sleep(5)

    return False, "Grafana dashboard missing"
|
|
|
|
|
def prometheus_uses_pvc():
    """Check Prometheus mounts a PVC-backed volume at /prometheus."""
    rc, raw, _ = sh(
        "kubectl get deployment prometheus "
        "-n observability -o json"
    )
    if rc != 0:
        return False, "Prometheus deployment not found"

    pod_spec = json.loads(raw)["spec"]["template"]["spec"]
    volumes = pod_spec.get("volumes", [])
    mounts = pod_spec["containers"][0].get("volumeMounts", [])

    has_pvc_volume = any(v.get("persistentVolumeClaim") for v in volumes)
    has_mount = any(m.get("mountPath") == "/prometheus" for m in mounts)

    if has_pvc_volume and has_mount:
        return True, "Prometheus is using persistent storage"
    return False, "Prometheus PVC is not mounted at /prometheus"
|
|
|
|
|
def alert_rule_identifies_endpoint():
    """Verify alert annotations reference the failing endpoint."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o yaml"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    # Operators need to know which endpoint failed, so the alert must
    # template the instance (or target) label somewhere in the config.
    templated = re.search(
        r'\{\{\s*\$labels\.(instance|target)\s*\}\}', cfg
    )
    if templated:
        return True, "Alert annotations identify the failing endpoint"

    return False, (
        "Alert annotations must reference the failing endpoint "
        "(e.g., {{ $labels.instance }}) for operational use"
    )
|
|
|
|
|
def alert_has_minimum_duration():
    """Verify alert rule has for: 2m or greater duration"""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o yaml"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    # Matches 'for: Xm' where X is 2-9 or any multi-digit minute count.
    if re.search(r'for:\s*([2-9]|[1-9]\d+)m', cfg):
        return True, "Alert rule has correct minimum duration (>=2m)"

    return False, "Alert rule must fire 'for: 2m' or longer (not immediate)"
|
|
|
|
|
def prometheus_alert_is_per_endpoint():
    """Verify SyntheticProbeFailure alerting is scoped per endpoint.

    Polls the Prometheus alerts API (up to 18 attempts, ~8s apart) and
    checks that every SyntheticProbeFailure alert carries an ``instance``
    label, and that the deliberately-broken endpoint ('does-not-exist')
    is firing on its own.  Returns (ok, message).
    """
    # Bail out early if either deployment is stuck in a terminal image error.
    for dep in ("prometheus", "blackbox-exporter"):
        healthy, msg = check_deployment_health(dep, "observability")
        if not healthy:
            return False, msg

    for _ in range(18):
        code, out, _ = sh(
            "kubectl exec -n observability deploy/prometheus -- "
            "wget -qO- http://localhost:9090/api/v1/alerts"
        )
        # No alert data yet (or exec failed) — wait and retry.
        if code != 0 or "SyntheticProbeFailure" not in out:
            time.sleep(8)
            continue

        try:
            data = json.loads(out)
            alerts = data.get("data", {}).get("alerts", [])
            synthetic = [
                a for a in alerts
                if a.get("labels", {}).get("alertname") == "SyntheticProbeFailure"
            ]

            # Alert name appeared in the payload but no structured entries
            # yet — keep polling.
            if len(synthetic) < 1:
                time.sleep(8)
                continue

            # Every synthetic alert must name its endpoint; an empty
            # instance label means alerting is not per-endpoint.
            instances = {
                a.get("labels", {}).get("instance", "") for a in synthetic
            }
            if not all(instances):
                return False, (
                    "SyntheticProbeFailure alerts lack instance labels — "
                    "alerting is not scoped per endpoint"
                )

            # Instances currently in the 'firing' state (vs pending).
            firing = {
                a.get("labels", {}).get("instance", "")
                for a in synthetic if a.get("state") == "firing"
            }

            has_failing = any("does-not-exist" in i for i in firing)

            if has_failing:
                return True, (
                    "Alerts fire per endpoint (failing endpoint alerts "
                    "independently with instance labels)"
                )

            # Alerts exist but the broken endpoint is not firing yet — retry.
            time.sleep(8)

        except (json.JSONDecodeError, KeyError):
            # Malformed payload; back off slightly longer and retry.
            time.sleep(10)
            continue

    return False, "Could not verify per-endpoint alert scoping"
|
|
|
|
|
def prometheus_does_not_use_up_metric():
    """Alerting must key off probe results, not the scrape-level 'up' metric."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o yaml"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    uses_up = (" up " in cfg) or ("up==" in cfg) or ("up ==" in cfg)
    if uses_up:
        return False, "Alerting incorrectly uses exporter 'up' metric"

    return True, "Alerting correctly avoids exporter 'up' metric"
|
|
|
|
|
def check_endpoint_count():
    """Verify at least 3 probe targets are configured."""
    rc, cfg, _ = sh(
        "kubectl get configmap prometheus-config "
        "-n observability -o jsonpath='{.data.prometheus\\.yml}'"
    )
    if rc != 0:
        return False, "Prometheus config not readable"

    lowered = cfg.lower()

    # Label each required target by the substring proving its presence.
    needles = {
        'argocd': 'argocd',
        'kubernetes_api': 'kubernetes.default',
        'test_endpoint': 'does-not-exist',
    }
    missing = [
        label for label, needle in needles.items() if needle not in lowered
    ]

    if missing:
        return False, f"Missing probe targets: {missing}"

    return True, "All required endpoints configured"
|
|
|
|
|
def check_grafana_dashboard_semantics():
    """Verify Grafana dashboard uses correct semantic patterns for synthetic monitoring.

    Runs four independent substring/regex checks against the raw YAML of
    the grafana-dashboards ConfigMap and accumulates human-readable issue
    strings; all issues are joined with '; ' into a single failure message.
    Returns (ok, message).
    """
    code, out, _ = sh(
        "kubectl get configmap grafana-dashboards "
        "-n observability -o yaml"
    )
    if code != 0:
        return False, "grafana-dashboards ConfigMap not readable"

    issues = []

    # ------------------------------------------------------------------
    # Check 1: dashboard must show availability data (not raw binary)
    # ------------------------------------------------------------------
    # Either raw probe_success or a recording rule named like
    # probe:*availab* / probe:*success* counts as an availability metric.
    has_availability_metric = (
        "probe_success" in out
        or re.search(r"probe:[a-zA-Z_]*availab", out)
        or re.search(r"probe:[a-zA-Z_]*success", out)
    )

    if has_availability_metric:
        if "probe_success" in out:
            # Raw probe_success is acceptable only when time-aggregated.
            has_time_agg = any(fn in out for fn in [
                "avg_over_time",
                "min_over_time",
                "max_over_time",
                "sum_over_time",
            ])
            if not has_time_agg:
                issues.append(
                    "Dashboard uses raw probe_success without time "
                    "aggregation (expected avg_over_time or similar)"
                )
    else:
        issues.append(
            "Dashboard does not reference probe availability metrics "
            "(probe_success or a recording rule like probe:availability)"
        )

    # ------------------------------------------------------------------
    # Check 2: per-endpoint breakdown (instance / target)
    # ------------------------------------------------------------------
    # Any grouping clause or legend template naming instance/target counts.
    has_grouping = any(x in out for x in [
        "by (instance)",
        "by (target)",
        "$labels.instance",
        "$labels.target",
        "{{ instance }}",
        "{{instance}}",
        "{{ target }}",
        "{{target}}",
    ])

    if not has_grouping:
        issues.append(
            "Dashboard does not show per-endpoint breakdown "
            "(missing by(instance), legendFormat with {{ instance }}, "
            "or target label usage)"
        )

    # ------------------------------------------------------------------
    # Check 3: availability not shown as raw binary signal
    # ------------------------------------------------------------------
    has_normalized = (
        # Percentage form (e.g., * 100)
        any(x in out for x in ["* 100", "*100", "100 *"])
        # Or [0,1] normalized via time aggregation (avg_over_time already
        # produces a continuous availability ratio, not binary)
        or any(fn in out for fn in [
            "avg_over_time", "min_over_time", "max_over_time",
        ])
        # Or uses a recording rule that pre-computes availability
        or re.search(r"probe:[a-zA-Z_]+:", out)
    )

    if not has_normalized:
        issues.append(
            "Dashboard should represent availability as a normalized "
            "measure (e.g., avg_over_time for ratio or * 100 for percentage)"
        )

    # ------------------------------------------------------------------
    # Check 4: response-time / latency metrics present
    # ------------------------------------------------------------------
    latency_metrics = [
        "probe_duration_seconds",
        "probe_http_duration_seconds",
        "probe_tcp_connection_duration_seconds",
        "probe_dns_lookup_time_seconds",
    ]

    has_latency = any(metric in out for metric in latency_metrics)

    if not has_latency:
        issues.append(
            "Dashboard should include response-time metrics "
            "(e.g. probe_duration_seconds or protocol-specific durations)"
        )

    # ------------------------------------------------------------------
    # Final result
    # ------------------------------------------------------------------
    if issues:
        return False, "; ".join(issues)

    return True, (
        "Dashboard uses time-aggregated probe metrics, per-endpoint breakdown, "
        "availability percentage, and latency visualization"
    )
|
|
|
|
|
def grade(transcript: str) -> GradingResult:
    """Grade the synthetic-monitoring exercise against live cluster state.

    Runs two groups of checks:

    * gate checks — contribute pass/fail feedback lines only, not scored;
    * scored checks — each contributes an equal share of the final score.

    The *transcript* argument is unused here; every check inspects the
    cluster directly via kubectl.  Returns a GradingResult carrying the
    rounded score, per-check subscores, equal weights, and all feedback
    lines joined with ' | ' in check-execution order.
    """
    feedback = []

    # ------------------------
    # Gate checks (non-scored)
    # ------------------------
    gate_checks = [
        # Resource existence
        lambda: deployment_exists("blackbox-exporter", "observability"),
        lambda: service_exists("blackbox-exporter", "observability", 9115),
        lambda: configmap_exists("blackbox-config", "observability"),
        lambda: deployment_exists("prometheus", "observability"),
        lambda: configmap_exists("prometheus-config", "observability"),
        lambda: pvc_exists("prometheus-data", "observability"),
        blackbox_config_has_required_modules,
        prometheus_scrape_interval_valid,

        # Image correctness
        lambda: deployment_uses_image(
            "blackbox-exporter", "observability",
            "prom/blackbox-exporter:v0.25.0",
        ),
        lambda: deployment_uses_image(
            "prometheus", "observability",
            "prom/prometheus:v3.8.1",
        ),
        lambda: deployment_uses_image(
            "grafana", "observability",
            "grafana/grafana:12.2",
        ),

        # Core operational
        blackbox_metrics_exposed,
        prometheus_has_probe_metrics,
        prometheus_blackbox_relabeling_present,
        argocd_probe_success,
        kubernetes_api_tcp_probe_configured,
        prometheus_uses_pvc,

        # Basic config quality
        grafana_has_prometheus_datasource,
        check_alert_severity_labels,
        check_alert_annotations,
        check_alert_for_duration,
        alert_has_minimum_duration,
        prometheus_does_not_use_up_metric,
        check_slo_burn_rate_alerts,
        check_blackbox_modules,
    ]

    for fn in gate_checks:
        # A check that raises counts as a failure with the exception text
        # as its feedback message.
        try:
            ok, msg = fn()
        except Exception as e:
            ok = False
            msg = str(e)

        feedback.append(("✓ " if ok else "✗ ") + msg)

    # ------------------------
    # Scored checks (partial)
    # ------------------------
    scored_checks = {
        "grafana_dashboard_present": grafana_has_blackbox_dashboard,
        "endpoint_count": check_endpoint_count,
        "grafana_dashboard_semantics": check_grafana_dashboard_semantics,
        "failing_in_alert": prometheus_alert_fires_for_failing_probe,
        "alert_identifies_endpoint": alert_rule_identifies_endpoint,
        "per_endpoint": prometheus_alert_is_per_endpoint,
        "records": check_recording_rules,
        "recording_rules": check_dashboard_uses_recording_rules,
    }

    subscores = {}

    for key, fn in scored_checks.items():
        try:
            ok, msg = fn()
        except Exception as e:
            ok = False
            msg = str(e)

        subscores[key] = 1.0 if ok else 0.0
        feedback.append(("✓ " if ok else "✗ ") + msg)

    # Every scored check carries equal weight.
    total_checks = len(scored_checks)
    weight = 1.0 / total_checks
    weights = {k: weight for k in scored_checks}

    score = sum(subscores[k] * weights[k] for k in subscores)

    return GradingResult(
        score=round(score, 4),
        subscores=subscores,
        weights=weights,
        feedback=" | ".join(feedback),
    )