Skip to content

Instantly share code, notes, and snippets.

@eriksywu
Created March 12, 2026 14:21
Show Gist options
  • Select an option

  • Save eriksywu/91944e4d39f4a8d0bef6edb4f4316011 to your computer and use it in GitHub Desktop.

Select an option

Save eriksywu/91944e4d39f4a8d0bef6edb4f4316011 to your computer and use it in GitHub Desktop.
sidecar_metrics_profiler.py
#!/usr/bin/env python3
"""Profile which Chronosphere entities reference sidecar metrics across tenants."""
import argparse
import csv
import json
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
def parse_metrics(csv_path):
    """Extract unique matched_metric_name values, their services, and metric types from the CSV.

    Rows are read in order until one whose ``service`` column starts with
    ``IGNORE BELOW FOR NOW``; that row and everything after it are skipped.
    Rows whose ``matched_metric_name`` is empty or ``-`` are ignored. When a
    metric name appears on several rows, the last row wins.

    Args:
        csv_path: Path to a CSV file with a header row. The columns
            ``matched_metric_name``, ``service`` and ``metric_type`` are read;
            a missing column or cell is treated as an empty string.

    Returns:
        (sorted_metrics, metric_to_service, metric_to_type).
    """

    def cell(row, column):
        # DictReader fills columns that a short row lacks with None (its
        # default restval), so .get()'s default alone does not protect .strip().
        return (row.get(column) or "").strip()

    metric_to_service = {}
    metric_to_type = {}
    with open(csv_path, newline="") as f:
        for row in csv.DictReader(f):
            service = cell(row, "service")
            # Stop at the ignore marker
            if service.startswith("IGNORE BELOW FOR NOW"):
                break
            name = cell(row, "matched_metric_name")
            if name and name != "-":
                metric_to_service[name] = service
                metric_to_type[name] = cell(row, "metric_type")
    return sorted(metric_to_service), metric_to_service, metric_to_type
def query_references(metric, tenant):
    """Run droidcli to get entity references for a metric in a tenant.

    Invokes ``droidcli metrics-profiler references`` with JSON output and a
    60 second timeout.

    Args:
        metric: Metric name passed to ``--metric-name``.
        tenant: Tenant passed as ``--environment``.

    Returns:
        The decoded JSON list of references. An empty list is returned (with
        a warning on stderr where noted) when the command exits non-zero
        (warned), times out (warned), prints nothing, prints invalid JSON
        (warned), prints JSON ``null``, or prints any other non-list JSON
        value (warned).

    Raises:
        OSError: If the ``droidcli`` executable cannot be started (for
            example, it is not on PATH). This is deliberately not swallowed
            so a missing tool fails fast instead of yielding an empty report.
    """
    cmd = [
        "droidcli", "metrics-profiler", "references",
        "--metric-name", metric,
        "--include-chronosphere-entities",
        f"--environment={tenant}",
        "--format", "json",
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except subprocess.TimeoutExpired:
        print(f" [WARN] droidcli timed out for {metric}@{tenant}", file=sys.stderr)
        return []

    if result.returncode != 0:
        print(f" [WARN] droidcli failed for {metric}@{tenant}: {result.stderr.strip()}", file=sys.stderr)
        return []

    output = result.stdout.strip()
    if not output:
        return []

    try:
        refs = json.loads(output)
    except json.JSONDecodeError as e:
        print(f" [WARN] bad JSON for {metric}@{tenant}: {e}", file=sys.stderr)
        return []

    # Callers iterate the result and call .get() on each item, so anything
    # other than a list would crash them. A JSON null carries no references.
    # NOTE(review): presumably droidcli may emit null for "no references";
    # confirm against the tool's actual output.
    if refs is None:
        return []
    if not isinstance(refs, list):
        print(
            f" [WARN] unexpected JSON ({type(refs).__name__}) for {metric}@{tenant}; expected a list",
            file=sys.stderr,
        )
        return []
    return refs
def classify_slug(slug):
    """Classify an entity slug as internal, managed, or customer.

    Args:
        slug: Entity slug string. ``None`` or an empty string is accepted and
            classified like any other slug without a reserved prefix.

    Returns:
        ``"internal"`` if the slug starts with ``__chronosphere_internal``,
        ``"managed"`` if it starts with ``__chronosphere_managed``,
        otherwise ``"customer"``.
    """
    # A missing slug (e.g. a JSON null) has no reserved prefix; guard it so
    # .startswith() is never called on None.
    if not slug:
        return "customer"
    if slug.startswith("__chronosphere_internal"):
        return "internal"
    if slug.startswith("__chronosphere_managed"):
        return "managed"
    return "customer"
def _summary_rows(results, metric_to_service, metric_to_type):
    """Yield (metric, service, type, source, ownership, count) summary rows.

    Metrics are ordered by name; within a metric, groups are ordered by
    descending count and then by (source, ownership).
    """
    for metric in sorted(results):
        service = metric_to_service.get(metric, "")
        mtype = metric_to_type.get(metric, "")
        groups = sorted(results[metric].items(), key=lambda item: (-item[1], item[0]))
        for (source, ownership), count in groups:
            yield metric, service, mtype, source, ownership, count


def _print_summary(rows, no_refs, metric_to_service, metric_to_type):
    """Print the aligned summary table and the zero-reference list to stdout."""
    print()
    print(f"{'Metric':<55} {'Service':<25} {'MetricType':<12} {'Source':<20} {'Ownership':<12} {'Count':>5}")
    print("-" * 132)
    previous = None
    for metric, service, mtype, source, ownership, count in rows:
        # Only the first line of each metric carries its name/service/type.
        first = metric != previous
        previous = metric
        label = metric if first else ""
        svc = service if first else ""
        mt = mtype if first else ""
        print(f"{label:<55} {svc:<25} {mt:<12} {source:<20} {ownership:<12} {count:>5}")
    # Also print metrics with zero references
    if no_refs:
        print()
        print("Metrics with no references found:")
        for m in no_refs:
            print(f" {m} (service={metric_to_service.get(m, '')}, type={metric_to_type.get(m, '')})")


def _write_summary_csv(path, rows, no_refs, metric_to_service, metric_to_type):
    """Write the aggregated counts, then one zero-count row per unreferenced metric."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Metric", "Service", "MetricType", "Source", "Ownership", "Count"])
        writer.writerows(rows)
        for m in no_refs:
            writer.writerow([m, metric_to_service.get(m, ""), metric_to_type.get(m, ""), "", "", 0])


def _write_raw_csv(path, raw_rows):
    """Write one CSV row per individual reference."""
    fieldnames = [
        "Metric", "Service", "MetricType", "Tenant", "Cluster",
        "Source", "EntitySlug", "Ownership", "Query",
    ]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(raw_rows)


def main(argv=None):
    """Profile sidecar metric references across tenants and report the results.

    Reads the metric list from the input CSV, queries every (metric, tenant)
    pair, aggregates reference counts per metric by (source, ownership),
    prints a summary table to stdout, and writes a summary CSV plus, unless
    ``--no-output-raw`` is given, a ``<output stem>_raw.csv`` file with one
    row per reference (only when at least one reference was found).
    Progress and status messages go to stderr.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Profile sidecar metric references across tenants")
    parser.add_argument("--csv", default=str(Path.home() / "sidecar_metrics.csv"),
                        help="Path to sidecar_metrics.csv")
    parser.add_argument("--tenants", nargs="+", default=["doordash", "yahoo", "affirm"],
                        help="Tenants to query")
    parser.add_argument("--output", default=str(Path.home() / "Downloads" / "sidecar_metrics_profiler_output.csv"),
                        help="Path to output CSV file")
    parser.add_argument("--no-output-raw", dest="output_raw", action="store_false", default=True,
                        help="Disable raw per-reference CSV output")
    args = parser.parse_args(argv)

    # Make sure the destination directory exists *before* the slow query
    # loop, so a missing directory cannot discard all collected results.
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    metrics, metric_to_service, metric_to_type = parse_metrics(args.csv)
    print(f"Found {len(metrics)} unique metrics to profile", file=sys.stderr)

    # results[metric][(source, ownership)] = count
    results = defaultdict(lambda: defaultdict(int))
    raw_rows = []
    total = len(metrics) * len(args.tenants)
    done = 0
    for metric in metrics:
        for tenant in args.tenants:
            done += 1
            print(f" [{done}/{total}] {metric} @ {tenant}", file=sys.stderr)
            for ref in query_references(metric, tenant):
                # Null/empty values are normalized to strings: None would
                # break slug classification and the mixed-type sort key.
                source = ref.get("Source") or "unknown"
                slug = ref.get("EntitySlug") or ""
                ownership = classify_slug(slug)
                results[metric][(source, ownership)] += 1
                if args.output_raw:
                    raw_rows.append({
                        "Metric": metric,
                        "Service": metric_to_service.get(metric, ""),
                        "MetricType": metric_to_type.get(metric, ""),
                        "Tenant": ref.get("Tenant", tenant),
                        "Cluster": ref.get("Cluster", ""),
                        "Source": source,
                        "EntitySlug": slug,
                        "Ownership": ownership,
                        "Query": ref.get("Query", ""),
                    })

    rows = list(_summary_rows(results, metric_to_service, metric_to_type))
    no_refs = sorted(set(metrics) - set(results))

    # Print summary table
    _print_summary(rows, no_refs, metric_to_service, metric_to_type)

    # Write CSV output
    _write_summary_csv(output_path, rows, no_refs, metric_to_service, metric_to_type)
    print(f"\nCSV written to {args.output}", file=sys.stderr)

    if args.output_raw and raw_rows:
        raw_path = output_path.with_name(output_path.stem + "_raw.csv")
        _write_raw_csv(raw_path, raw_rows)
        print(f"Raw CSV written to {raw_path}", file=sys.stderr)
# Run the profiler only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment