Skip to content

Instantly share code, notes, and snippets.

@ryanmaclean
Last active February 17, 2026 17:56
Show Gist options
  • Select an option

  • Save ryanmaclean/84f50d6474b11ef6795fa0262d5ef93a to your computer and use it in GitHub Desktop.

Select an option

Save ryanmaclean/84f50d6474b11ef6795fa0262d5ef93a to your computer and use it in GitHub Desktop.
macOS Tahoe logging with Datadog (Vector + Unified Log noise control)
#!/usr/bin/env python3
"""Analyze unsquelched macOS Unified Log noise drift from Datadog logs."""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.request
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
@dataclass
class LogRecord:
    """One normalized Datadog log event used for noise analysis.

    Field values are the subset of Datadog log attributes this tool reads;
    extract_records() fills missing values with "unknown" or "".
    """

    timestamp: str  # event timestamp string from Datadog ("" if absent)
    service: str  # Datadog service tag — here, typically the macOS process name
    subsystem: str  # Unified Log subsystem ("unknown" if absent)
    status: str  # Datadog status/level, e.g. "error" ("unknown" if absent)
    message: str  # raw log message text ("" if absent)
def find_repo_root(start: Path) -> Path:
    """Walk upward from *start* (inclusive) and return the first directory
    that contains a `.git` entry.

    Raises RuntimeError when no ancestor is a git repository root.
    """
    for candidate in (start, *start.parents):
        if (candidate / ".git").exists():
            return candidate
    raise RuntimeError("Could not locate repository root")
def request_json(url: str, api_key: str, app_key: str, body: dict) -> dict:
    """POST *body* as JSON to a Datadog endpoint and return the decoded reply.

    Raises RuntimeError (chained from HTTPError) with the status code and
    response text when the API rejects the request.
    """
    headers = {
        "DD-API-KEY": api_key,
        "DD-APPLICATION-KEY": app_key,
        "Content-Type": "application/json",
    }
    request = urllib.request.Request(
        url=url,
        method="POST",
        headers=headers,
        data=json.dumps(body).encode("utf-8"),
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            return json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Datadog API error HTTP {exc.code}: {detail}") from exc
def parse_duration(text: str) -> timedelta:
    """Convert a compact lookback spec such as "24h" or "7d" into a timedelta.

    Raises ValueError for anything that is not <integer> followed by h or d.
    """
    match = re.fullmatch(r"(\d+)([hd])", text.strip())
    if match is None:
        raise ValueError("Duration must be like 24h or 7d")
    amount = int(match.group(1))
    return timedelta(hours=amount) if match.group(2) == "h" else timedelta(days=amount)
def normalize_message(message: str) -> str:
    """Collapse volatile tokens in a log message into a stable fingerprint.

    Hex runs of 8+ chars become "<hex>", bare integers "<n>", double-quoted
    payloads "<str>", and whitespace is squeezed — so repeated events that
    differ only in ids group under one pattern.
    """
    # Order matters: hex runs first, or digit-only hex would become "<n>".
    fingerprint = re.sub(r"\b[0-9a-fA-F]{8,}\b", "<hex>", message)
    fingerprint = re.sub(r"\b\d+\b", "<n>", fingerprint)
    fingerprint = re.sub(r"\"[^\"]+\"", "\"<str>\"", fingerprint)
    return re.sub(r"\s+", " ", fingerprint).strip()
def extract_records(payload: dict) -> list[LogRecord]:
    """Flatten one Datadog /logs/events/search response page into LogRecords.

    Datadog nests log-attribute fields under attributes.attributes; both
    levels are consulted, with "unknown"/"" fallbacks for missing values.
    """
    records: list[LogRecord] = []
    for event in payload.get("data", []):
        attrs = event.get("attributes", {})
        inner = attrs.get("attributes")
        nested = inner if isinstance(inner, dict) else {}
        records.append(
            LogRecord(
                timestamp=attrs.get("timestamp") or "",
                service=attrs.get("service") or nested.get("service") or "unknown",
                subsystem=nested.get("subsystem") or attrs.get("subsystem") or "unknown",
                status=attrs.get("status") or "unknown",
                message=attrs.get("message") or nested.get("message") or "",
            )
        )
    return records
def fetch_logs(site: str, api_key: str, app_key: str, query: str, from_ms: int, to_ms: int, max_events: int) -> list[LogRecord]:
    """Page through the Datadog logs search API until *max_events* records.

    Uses cursor pagination (meta.page.after) and stops early when a page
    comes back empty or the API returns no next-page cursor.
    """
    endpoint = f"https://api.{site}/api/v2/logs/events/search"
    collected: list[LogRecord] = []
    cursor = None
    while len(collected) < max_events:
        # Datadog caps page size at 1000; never over-fetch past max_events.
        page_limit = min(1000, max_events - len(collected))
        body = {
            "filter": {
                "query": query,
                "from": str(from_ms),
                "to": str(to_ms),
            },
            "sort": "timestamp",
            "page": {"limit": page_limit},
        }
        if cursor:
            body["page"]["cursor"] = cursor
        response = request_json(endpoint, api_key, app_key, body)
        page_records = extract_records(response)
        if not page_records:
            break
        collected.extend(page_records)
        meta_page = (response.get("meta") or {}).get("page") or {}
        cursor = meta_page.get("after")
        if not cursor:
            break
    return collected
def write_reports(
    repo_root: Path,
    now: datetime,
    duration: str,
    query: str,
    max_events: int,
    records: list[LogRecord],
    min_count: int,
) -> tuple[Path, Path]:
    """Write JSON and Markdown noise-drift reports and return their paths.

    Reports land in <repo_root>/reports/observability/ with a UTC-style
    timestamp in the file name so successive runs never overwrite each other.

    Returns:
        (json_path, md_path)
    """
    reports_dir = repo_root / "reports" / "observability"
    reports_dir.mkdir(parents=True, exist_ok=True)
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    json_path = reports_dir / f"macos-noise-drift-{stamp}.json"
    md_path = reports_dir / f"macos-noise-drift-{stamp}.md"
    # Count by service/subsystem; fingerprint messages per service so repeated
    # events that differ only in ids/numbers group under one pattern.
    service_counts = Counter(r.service for r in records)
    subsystem_counts = Counter(r.subsystem for r in records)
    message_counts = Counter((r.service, normalize_message(r.message)) for r in records if r.message)
    candidates = []
    for (service, pattern), count in message_counts.most_common():
        if count < min_count:
            # Not recurring enough to suggest as a squelch candidate.
            continue
        candidates.append(
            {
                "service": service,
                "count": count,
                "pattern": pattern,
                # Pattern truncated to keep the suggested Datadog query usable.
                "suggested_query": f'service:{service} @message:"{pattern[:120]}"',
            }
        )
    # Machine-readable report; caps: 50 candidates, 50 samples, 500 chars
    # per sample message.
    payload = {
        "generated_at": now.isoformat(),
        "duration": duration,
        "query": query,
        "max_events": max_events,
        "total_events": len(records),
        "top_services": service_counts.most_common(30),
        "top_subsystems": subsystem_counts.most_common(30),
        "candidate_noise_patterns": candidates[:50],
        "samples": [
            {
                "timestamp": r.timestamp,
                "service": r.service,
                "subsystem": r.subsystem,
                "status": r.status,
                "message": r.message[:500],
            }
            for r in records[:50]
        ],
    }
    json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    # Human-readable Markdown summary (top 20 per section).
    lines = [
        "# macOS Unified Log Noise Drift Report",
        "",
        f"- Generated: `{now.isoformat()}`",
        f"- Window: `{duration}`",
        f"- Query: `{query}`",
        f"- Events analyzed: `{len(records)}` (max `{max_events}`)",
        "",
        "## Top Services",
        "",
    ]
    for name, count in service_counts.most_common(20):
        lines.append(f"- `{name}`: {count}")
    lines.extend(["", "## Top Subsystems", ""])
    for name, count in subsystem_counts.most_common(20):
        lines.append(f"- `{name}`: {count}")
    lines.extend(["", "## Candidate New Noise Patterns", ""])
    if candidates:
        for c in candidates[:20]:
            lines.append(f"- `{c['service']}` ({c['count']}): `{c['pattern']}`")
    else:
        lines.append("- No recurring unsquelched patterns above threshold.")
    lines.extend(
        [
            "",
            "## Suggested Actions",
            "",
            "- Promote recurring benign patterns into Vector `is_known_noise_msg` filters.",
            "- Mirror the same patterns into Datadog `noise_class` categories.",
            "- Keep faults and app logs unsuppressed to preserve incident signal.",
        ]
    )
    md_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return json_path, md_path
def main() -> int:
    """CLI entry point: fetch recent unsquelched events and report noise drift.

    Returns a process exit code (0 on success, 1 when credentials are missing).
    """
    parser = argparse.ArgumentParser(description="Analyze 24h macOS Unified Log noise drift from Datadog.")
    parser.add_argument("--duration", default="24h", help="Lookback window like 24h or 7d (default: 24h)")
    parser.add_argument("--max-events", type=int, default=5000, help="Maximum events to fetch (default: 5000)")
    parser.add_argument("--min-count", type=int, default=3, help="Minimum repeats to suggest as candidate (default: 3)")
    parser.add_argument(
        "--query",
        default="source:macos.unifiedlog status:error -@noise_class:*",
        help="Datadog log query (default focuses on unsquelched errors)",
    )
    args = parser.parse_args()

    api_key = os.getenv("DD_API_KEY", "").strip()
    app_key = os.getenv("DD_APP_KEY", "").strip()
    site = os.getenv("DD_SITE", "datadoghq.com").strip() or "datadoghq.com"
    if not (api_key and app_key):
        print("[ERROR] DD_API_KEY and DD_APP_KEY must be set", file=sys.stderr)
        return 1

    # Datadog's search API takes epoch milliseconds for the time window.
    window_end = datetime.now(timezone.utc)
    window_start = window_end - parse_duration(args.duration)
    from_ms = int(window_start.timestamp() * 1000)
    to_ms = int(window_end.timestamp() * 1000)

    repo_root = find_repo_root(Path(__file__).resolve())
    records = fetch_logs(site, api_key, app_key, args.query, from_ms, to_ms, args.max_events)
    json_path, md_path = write_reports(repo_root, window_end, args.duration, args.query, args.max_events, records, args.min_count)
    print(json.dumps({"json_report": str(json_path), "markdown_report": str(md_path), "events_analyzed": len(records)}, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
{
"name": "macOS Unified Log Noise Classifier",
"is_enabled": true,
"filter": {
"query": "source:macos.unifiedlog"
},
"processors": [
{
"type": "category-processor",
"name": "Classify known non-actionable macOS noise",
"is_enabled": true,
"target": "noise_class",
"categories": [
{
"name": "runningboard_identity_chatter",
"filter": {
"query": "service:runningboardd @message:\"Two equal instances have unequal identities\""
}
},
{
"name": "token_generation_inference_chatter",
"filter": {
"query": "service:TGOnDeviceInferenceProviderService (@message:\"Unable to find node for token\" OR @message:\"Attempting to release asset\")"
}
},
{
"name": "imagent_coredata_xpc_chatter",
"filter": {
"query": "service:imagent @message:\"CoreData: Unable to create token NSXPCConnection\""
}
},
{
"name": "findmy_cfprefs_sandbox_fault",
"filter": {
"query": "service:findmybeaconingd @message:\"CFPrefsPlistSource\""
}
},
{
"name": "apsd_entitlement_noise",
"filter": {
"query": "service:apsd (@message:\"ultra constrained topics\" OR @message:\"lacks APSConnectionInitiateEntitlement\")"
}
},
{
"name": "apsd_connection_chatter",
"filter": {
"query": "service:apsd (@message:\"Unexpected nilling of connection via setter\" OR @message:\"Unexpected replacement of connection\")"
}
},
{
"name": "akd_attestation_noise",
"filter": {
"query": "service:akd @message:\"Attestation map does not contain the cert attestation\""
}
},
{
"name": "cloudd_ckoperation_fallback_noise",
"filter": {
"query": "service:cloudd @message:\"Unknown proxied class, falling back\""
}
},
{
"name": "worldclock_mobiletimer_connection_chatter",
"filter": {
"query": "service:WorldClockWidget (@message:\"connection invalidated\" OR @message:\"not retrying invalidated connection\" OR @message:\"will retry connection because the response to the lifecycle notification failed\")"
}
},
{
"name": "fseventsd_missing_bundleid_noise",
"filter": {
"query": "service:fseventsd @message:\"fsevent_add_client: no bundle id available for pid\""
}
},
{
"name": "pkd_persona_generation_noise",
"filter": {
"query": "service:pkd (@message:\"persona generation ID unavailable\" OR @message:\"could not create extension point record\")"
}
},
{
"name": "containermanagerd_diagnosticd_filter_noise",
"filter": {
"query": "service:containermanagerd (@message:\"failed to read /Library/Preferences/Logging/com.apple.diagnosticd.filter.plist\" OR @message:\"Query result: count = 0, error = (null)\" OR @message:\"declares its class to be [2] while scanning for class [7]\")"
}
},
{
"name": "backupd_helper_capability_permission_noise",
"filter": {
"query": "service:backupd-helper @message:\"Failed to read capabilities for\" @message:\"Operation not permitted\""
}
}
]
}
]
}

macOS Tahoe Logging With Datadog (Vector + Unified Log)

Goal: get macOS Unified Logs into Datadog without drowning in high-volume system chatter.

What we do:

  • Local (Vector): collect, normalize, redact, and drop obviously-benign spam before it leaves the host.
  • Remote (Datadog pipeline): tag known benign patterns as @noise_class so dashboards/monitors can exclude them consistently.

Prerequisites

  • macOS Tahoe
  • Homebrew
  • Vector (brew install vectordotdev/brew/vector)
  • Datadog Agent installed
  • Environment variables set in shell/session for Datadog API usage:
    • DD_API_KEY
    • DD_APP_KEY
    • DD_SITE (for example datadoghq.com)

Setup (One Command)

From repo root:

python3 scripts/datadog/setup_macos_tahoe_logging.py --upsert-noise-pipeline

Preview-only mode (no writes / no API calls):

python3 scripts/datadog/setup_macos_tahoe_logging.py --no-apply-vector --upsert-noise-pipeline --dry-run-pipeline

Where The Rules Live (Source Of Truth)

The authoritative pipeline payload lives here:

  • config/observability/datadog-macos-unifiedlog-noise-pipeline.json

You can upsert it with:

python3 scripts/datadog/upsert_macos_noise_pipeline.py

Verify

python3 .claude/skills/macos-unifiedlog-vector/scripts/analyze_noise.py --limit 25 --max-lines 200000
bash scripts/datadog/verify-setup.sh
python3 scripts/datadog/analyze_macos_noise_drift.py --duration 24h --max-events 5000 --min-count 3

Query Patterns

Hide noise-classed logs:

source:macos.unifiedlog -@noise_class:*

Only noise-classed logs (for auditing rules):

source:macos.unifiedlog @noise_class:*

Apple Guidance Used

(This section is empty in the original notes — the configuration relies on Apple's Unified Logging facilities: `log stream` NDJSON output, `--predicate` filtering, and message-type levels such as Error/Fault.)

Appendix: Noise Producers + Why Noise/Chatter

These producers are macOS system services that can emit high-volume log lines (often at error) that typically do not require operator action in a dev workstation context.

  • runningboard_identity_chatter
    • Producer: runningboardd (process lifecycle/resource assertions).
    • Why: repetitive identity-mismatch diagnostics; usually self-resolving.
  • token_generation_inference_chatter
    • Producer: TGOnDeviceInferenceProviderService (on-device inference asset lifecycle).
    • Why: expected asset/token retry churn during background transitions.
  • imagent_coredata_xpc_chatter
    • Producer: imagent (Messages identity/account sync).
    • Why: transient CoreData/XPC token setup issues under account/network churn.
  • findmy_cfprefs_sandbox_fault
    • Producer: findmybeaconingd.
    • Why: recurring sandbox/preferences faults often without user-visible degradation.
  • apsd_entitlement_noise
    • Producer: apsd (Apple Push Service daemon).
    • Why: entitlement capability checks common on dev/non-internal contexts.
  • apsd_connection_chatter
    • Producer: apsd.
    • Why: reconnect/replacement messages during normal backoff behavior.
  • akd_attestation_noise
    • Producer: akd (Apple account/auth daemon).
    • Why: recurring attestation warnings during identity refresh.
  • cloudd_ckoperation_fallback_noise
    • Producer: cloudd (CloudKit daemon).
    • Why: compatibility fallback selection logs; commonly benign.
  • worldclock_mobiletimer_connection_chatter
    • Producer: WorldClockWidget via MobileTimer XPC.
    • Why: widget lifecycle/XPC reconnect chatter is frequent and low-value.
  • fseventsd_missing_bundleid_noise
    • Producer: fseventsd.
    • Why: many clients lack bundle IDs; noisy but expected.
  • pkd_persona_generation_noise
    • Producer: pkd / PlugInKit.
    • Why: persona generation/extension point record retries during registry churn.
  • containermanagerd_diagnosticd_filter_noise
    • Producer: containermanagerd.
    • Why: benign periodic probes (diagnostic filter access / empty query results).
  • backupd_helper_capability_permission_noise
    • Producer: backupd-helper / Time Machine helper.
    • Why: capability/permission probes against volumes often fail by design and retry.
# Stream macOS Unified Logs, reduce noise, redact common secrets, and write JSONL for Datadog Agent to tail.
#
# Notes:
# - This runs as a user LaunchAgent (not a system daemon).
# - Keep volume sane: prefer targeted drops and "faults-only" strategies for noisy daemons.
# Placeholder — presumably substituted with a real path by the setup script
# (TODO confirm against setup.py).
data_dir = "__VECTOR_DATA_DIR__"

[sources.macos_unifiedlog]
# Run `log stream` as a child process and read its NDJSON output line by line.
type = "exec"
command = [
  "/usr/bin/log", "stream",
  "--style", "ndjson",
  "--color", "none",
  "--level", "debug",
  "--type", "log",
  "--predicate",
  # Keep errors/faults everywhere, plus full logs for our main processes.
  "(messageType == 'Error' OR messageType == 'Fault') OR (process IN {'openclaw-gateway','openclaw','node','python3','python','datadog-agent','vector','CodexBar'})",
]
mode = "streaming"
[transforms.parse_json]
# `log stream --style ndjson` emits one JSON object per line in .message;
# promote that object to the event root, or mark unparseable lines so the
# downstream filter (which tests !exists(._drop)) discards them.
type = "remap"
inputs = ["macos_unifiedlog"]
source = '''
parsed = parse_json(.message) ?? null
if is_null(parsed) {
  # Not valid JSON (e.g. log-stream banner/filter lines) — flag for dropping.
  ._drop = true
} else {
  . = parsed
}
'''
[transforms.keep_relevant]
# The core noise gate: per-event decision on whether it is worth shipping.
# Strategy: drop explicitly-known noise first, then require at least one
# positive keep-reason (high-value process, fault, non-chatter error, ...).
type = "filter"
inputs = ["parse_json"]
condition = '''
mt = downcase(to_string(.messageType) ?? "")
proc = to_string(.process) ?? ""
subsystem = to_string(.subsystem) ?? ""
msg = to_string(.eventMessage) ?? to_string(.message) ?? ""
# Derive process name when Unified Log omits `.process`.
if proc == "" {
  proc_path = to_string(.processImagePath) ?? ""
  if proc_path == "" { proc_path = to_string(.senderImagePath) ?? "" }
  if proc_path != "" {
    m = parse_regex(proc_path, r'.*/(?P<name>[^/]+)$') ?? {}
    proc = to_string(m.name)
  }
}
# Processes always kept in full, regardless of level.
is_high_value = includes(["openclaw-gateway", "openclaw", "datadog-agent", "vector", "CodexBar"], proc)
# Language runtimes: kept only at error/fault (see final expression).
is_lang_runtime = includes(["node", "python3", "python"], proc)
# Third-party (non-Apple) subsystems are kept above debug level.
is_non_apple = subsystem != "" && !starts_with(subsystem, "com.apple.")
# Apple subsystems whose error-level output is high-volume chatter.
is_chatter_subsystem = includes([
  "com.apple.syspolicy.exec",
  "com.apple.libsqlite3",
  "com.apple.bluetooth",
  "com.apple.bluetoothuser",
  "com.apple.contacts",
  "com.apple.TCC",
  "com.apple.locationd.Core",
  "com.apple.apsd",
  "com.apple.icloud.searchpartyd",
  "com.apple.SkyLight",
  "com.apple.TextInputUI",
  "com.apple.runningboard",
  "com.apple.sharing",
  "com.apple.WiFiManager",
  "com.apple.network",
  "com.apple.defaults",
  "com.apple.coreaudio",
  "com.apple.coreanimation",
  "com.apple.CFPasteboard",
  "com.apple.powerd",
], subsystem)
# Apple daemons whose error-level output is high-volume chatter.
is_chatter_process = includes([
  "syspolicyd",
  "PerfPowerServices",
  "kernel",
  "bluetoothd",
  "contactsd",
  "locationd",
  "apsd",
  "searchpartyd",
  "rapportd",
  "runningboardd",
  "airportd",
  "biomesyncd",
  "nfcd",
  "sharingd",
  "duetexpertd",
  "secd",
  "mDNSResponder",
  "WeatherWidget",
  "coreaudiod",
  "mediaremoted",
  "accessoryd",
  "bluetoothuserd",
  "loginwindow",
  "WindowServer",
  "ControlCenter",
  "powerd",
  "AddressBookSourceSync",
  "siriactionsd",
  "chronod",
  "avconferenced",
], proc)
# Drop a very noisy, usually non-actionable WebKit cancellation error spam from CodexBar.
is_codexbar_webkit_cancel = (proc == "CodexBar" && subsystem == "com.apple.WebKit" && contains(msg, "didFailProvisionalLoadForFrame") && contains(msg, "code=-999"))
# Some Apple daemons are extremely noisy with low actionability; keep their faults but drop their errors.
is_faults_only_process = includes([
  "imagent",
  "mediaanalysisd",
  "routined",
  # Common high-volume Apple daemons that spam errors with low actionability in dev.
  "triald",
  "triald_system",
  "runningboardd",
  "deleted",
  "spotlightknowledged",
  "dasd",
  "apsd",
  "corespotlightd",
  "backupd",
  "parsec-fbf",
], proc)
is_faults_only_error = (is_faults_only_process && mt == "error")
# Additional known-noise message patterns (keep safety-critical data, drop repetitive system spam).
# These substrings are mirrored (loosely) by the Datadog noise_class pipeline
# categories; keep the two lists in sync when adding patterns.
# NOTE(review): the cloudd pattern below embeds literal backticks with no
# space ("to``CKOperation``") while the Datadog pipeline matches the shorter
# "Unknown proxied class, falling back" — confirm against a real cloudd
# message; as written this substring may never match.
is_known_noise_msg =
(proc == "TGOnDeviceInferenceProviderService" && (contains(msg, "Unable to find node for token") || contains(msg, "Attempting to release asset") || contains(msg, "did not return expected metrics"))) ||
(proc == "runningboardd" && contains(msg, "Two equal instances have unequal identities")) ||
(proc == "imagent" && contains(msg, "CoreData: Unable to create token NSXPCConnection")) ||
(proc == "ContinuityCaptureAgent" && contains(msg, "No valid device to onboard")) ||
(proc == "akd" && contains(msg, "Attestation map does not contain the cert attestation")) ||
(proc == "cdpd" && contains(msg, "non-internal build, ignoring")) ||
(proc == "cloudd" && contains(msg, "Unknown proxied class, falling back to``CKOperation``")) ||
(proc == "WorldClockWidget" && (contains(msg, "connection invalidated") || contains(msg, "not retrying invalidated connection") || contains(msg, "will retry connection because the response to the lifecycle notification failed"))) ||
(proc == "fseventsd" && contains(msg, "fsevent_add_client: no bundle id available for pid")) ||
(proc == "apsd" && (contains(msg, "Unexpected nilling of connection via setter") || contains(msg, "Unexpected replacement of connection"))) ||
(proc == "suggestd" && (contains(msg, "Data type not yet set for stream rankedSocialHighlights") || contains(msg, "ContextKit error: Error Domain=com.apple.siri.context Code=7") || contains(msg, "Error in executeWithReply: Error Domain=com.apple.siri.context Code=7"))) ||
(proc == "textunderstandingd" && contains(msg, "could not be converted to a Document. Ignoring item.")) ||
(proc == "pkd" && (contains(msg, "persona generation ID unavailable") || contains(msg, "could not create extension point record"))) ||
(proc == "containermanagerd" && (contains(msg, "failed to read /Library/Preferences/Logging/com.apple.diagnosticd.filter.plist") || contains(msg, "Query result: count = 0, error = (null)") || contains(msg, "declares its class to be [2] while scanning for class [7]"))) ||
(proc == "backupd-helper" && contains(msg, "Failed to read capabilities for") && contains(msg, "Operation not permitted"))
# Final verdict: nothing flagged as noise AND at least one keep-reason holds.
!exists(._drop) &&
!is_codexbar_webkit_cancel &&
!is_faults_only_error &&
!is_known_noise_msg &&
(
  is_high_value ||
  mt == "fault" ||
  (mt == "error" && !is_chatter_subsystem && !is_chatter_process) ||
  (is_lang_runtime && (mt == "error" || mt == "fault")) ||
  (is_non_apple && mt != "debug")
)
'''
[transforms.normalize_redact]
# Map Unified Log fields onto Datadog reserved attributes (message/service/
# status/timestamp) and best-effort redact credential-looking substrings.
type = "remap"
inputs = ["keep_relevant"]
source = '''
mt = downcase(to_string(.messageType) ?? "")
proc = to_string(.process) ?? ""
msg = to_string(.eventMessage) ?? to_string(.message) ?? ""
# Best-effort redaction. (Don't treat this as a security boundary.)
# These are VRL raw strings (r'...') inside a TOML multi-line literal string
# (''' ... '''), so NO escape layer applies: write `\s` with a single
# backslash for the regex whitespace class. The previous `\\s` reached the
# regex engine as an escaped literal backslash plus "s", so the character
# classes excluded `\` and `s` instead of whitespace and the bearer-token
# rule could never match.
msg = replace(msg, r'(?i)dd-api-key=[^,\s]+' , "dd-api-key=REDACTED")
msg = replace(msg, r'(?i)dd_api_key=[^,\s]+' , "dd_api_key=REDACTED")
msg = replace(msg, r'(?i)dd_app_key=[^,\s]+' , "dd_app_key=REDACTED")
msg = replace(msg, r'(?i)api[_-]?key=[^,\s]+' , "api_key=REDACTED")
msg = replace(msg, r'(?i)(authorization:\s*bearer\s+)[^,\s]+' , "authorization: bearer REDACTED")
msg = replace(msg, r'(?i)(x-api-key=)[^,\s]+' , "x-api-key=REDACTED")
msg = replace(msg, r'(?i)(x-api-key:\s*)[^,\s]+' , "x-api-key: REDACTED")
# Derive process name from image paths when `.process` is absent
# (same fallback as in keep_relevant).
if proc == "" {
  proc_path = to_string(.processImagePath) ?? ""
  if proc_path == "" { proc_path = to_string(.senderImagePath) ?? "" }
  if proc_path != "" {
    m = parse_regex(proc_path, r'.*/(?P<name>[^/]+)$') ?? {}
    proc = to_string(m.name)
  }
}
.message = msg
.ddsource = "macos.unifiedlog"
.service = if proc != "" { proc } else { "macos" }
.status = if mt != "" { mt } else { "info" }
.process = proc
.subsystem = to_string(.subsystem) ?? null
.category = to_string(.category) ?? null
# Prefer the event's own timestamp string; fall back to processing time.
ts = if !is_null(.timestamp) { .timestamp } else { .time }
.timestamp = if is_string(ts) { ts } else { now() }
# Drop raw Unified Log fields that are now redundant or high-cardinality.
del(.eventMessage)
del(.messageType)
del(.senderImagePath)
del(.processImagePath)
del(.machTimestamp)
del(.formatString)
del(.timezoneName)
'''
[sinks.unifiedlog_file]
# Write newline-delimited JSON for the Datadog Agent to tail (the setup
# script prints the matching macos_unifiedlog.d Agent config to install).
type = "file"
inputs = ["normalize_redact"]
# strftime placeholders: %F-%H gives one file per hour,
# e.g. /tmp/macos-unifiedlog-2026-02-17-14.jsonl
path = "/tmp/macos-unifiedlog-%F-%H.jsonl"
encoding.codec = "json"
#!/usr/bin/env python3
"""Repeatable macOS Tahoe logging setup for Vector + Datadog."""
from __future__ import annotations
import argparse
import subprocess
import sys
from pathlib import Path
def find_repo_root(start: Path) -> Path:
    """Return the nearest ancestor of *start* (inclusive) containing `.git`.

    Raises RuntimeError when no enclosing directory is a git repository.
    """
    candidates = (start, *start.parents)
    for directory in candidates:
        if (directory / ".git").exists():
            return directory
    raise RuntimeError("Could not locate repository root from current path")
def run(cmd: list[str]) -> None:
    """Execute *cmd*, raising subprocess.CalledProcessError on non-zero exit."""
    subprocess.run(cmd, check=True)
def main() -> int:
    """Drive the Vector setup script and, optionally, the Datadog pipeline upsert.

    Returns a process exit code (0 on success; subprocess failures raise
    CalledProcessError via run()).
    """
    parser = argparse.ArgumentParser(
        description="Configure macOS Tahoe logging for Vector + Datadog in a repeatable way."
    )
    parser.add_argument("--no-apply-vector", action="store_true", help="Render-only Vector setup (no file writes).")
    parser.add_argument(
        "--upsert-noise-pipeline",
        action="store_true",
        help="Create/update Datadog macOS noise-classification pipeline.",
    )
    parser.add_argument(
        "--dry-run-pipeline",
        action="store_true",
        help="Preview Datadog pipeline upsert without API writes.",
    )
    args = parser.parse_args()

    repo_root = find_repo_root(Path(__file__).resolve())
    setup_script = repo_root / ".claude" / "skills" / "macos-unifiedlog-vector" / "scripts" / "setup.py"
    upsert_script = repo_root / "scripts" / "datadog" / "upsert_macos_noise_pipeline.py"

    vector_cmd = [sys.executable, str(setup_script), "--print-datadog-conf"]
    if args.no_apply_vector:
        vector_cmd.append("--no-apply")
    print("[1/3] Configuring Vector unified-log pipeline...")
    run(vector_cmd)

    # The Agent config dir needs root, so print the commands for the operator
    # rather than running them here.
    print("\n[2/3] Install Datadog Agent tail config (sudo required):")
    print("sudo mkdir -p /opt/datadog-agent/etc/conf.d/macos_unifiedlog.d")
    print("sudo cp ~/.config/datadog/macos_unifiedlog.d_conf.yaml /opt/datadog-agent/etc/conf.d/macos_unifiedlog.d/conf.yaml")
    print("sudo launchctl kickstart -k system/com.datadoghq.agent")

    if not args.upsert_noise_pipeline:
        print("\n[3/3] Skipped Datadog noise pipeline upsert. Use --upsert-noise-pipeline to apply it.")
    else:
        print("\n[3/3] Upserting Datadog noise classification pipeline...")
        upsert_cmd = [sys.executable, str(upsert_script)]
        if args.dry_run_pipeline:
            upsert_cmd.append("--dry-run")
        run(upsert_cmd)

    print("\nVerification:")
    print("python3 .claude/skills/macos-unifiedlog-vector/scripts/analyze_noise.py --limit 25 --max-lines 200000")
    print("bash scripts/datadog/verify-setup.sh")
    print("python3 scripts/datadog/analyze_macos_noise_drift.py --duration 24h --max-events 5000 --min-count 3")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""Create or update the macOS Unified Log noise classifier Datadog pipeline."""
from __future__ import annotations
import argparse
import json
import os
import sys
import urllib.error
import urllib.request
from pathlib import Path
def _request(method: str, url: str, api_key: str, app_key: str, body: bytes | None = None) -> dict:
    """Issue one authenticated Datadog API call and decode the JSON response.

    An empty response body yields {}. HTTP errors are re-raised as
    RuntimeError including the method, URL, status code, and response text.
    """
    headers = {
        "DD-API-KEY": api_key,
        "DD-APPLICATION-KEY": app_key,
    }
    if body is not None:
        headers["Content-Type"] = "application/json"
    request = urllib.request.Request(url, method=method, data=body, headers=headers)
    try:
        with urllib.request.urlopen(request, timeout=30) as resp:
            text = resp.read().decode("utf-8")
            return json.loads(text) if text else {}
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"{method} {url} failed: HTTP {exc.code}: {detail}") from exc
def main() -> int:
    """Upsert (create or update, matched by name) the noise classifier pipeline.

    Looks up the pipeline by --name in the Datadog account; PUTs to the
    existing id when found, otherwise POSTs a new pipeline. Returns a process
    exit code (0 on success or dry run, 1 on configuration errors).
    """
    # Default payload path resolved from the repo layout:
    # <repo>/scripts/datadog/<this file> -> parents[2] == <repo>.
    root_dir = Path(__file__).resolve().parents[2]
    default_payload = root_dir / "config" / "observability" / "datadog-macos-unifiedlog-noise-pipeline.json"
    parser = argparse.ArgumentParser(
        description="Upsert Datadog Logs Pipeline for macOS Unified Log noise classification."
    )
    parser.add_argument("--file", default=str(default_payload), help="Pipeline JSON payload path")
    parser.add_argument(
        "--name",
        default="macOS Unified Log Noise Classifier",
        help="Pipeline name used for lookup",
    )
    parser.add_argument("--dry-run", action="store_true", help="Print action without API write")
    args = parser.parse_args()
    dd_api_key = os.getenv("DD_API_KEY", "").strip()
    dd_app_key = os.getenv("DD_APP_KEY", "").strip()
    dd_site = os.getenv("DD_SITE", "datadoghq.com").strip() or "datadoghq.com"
    if not dd_api_key or not dd_app_key:
        print("[ERROR] DD_API_KEY and DD_APP_KEY must be set", file=sys.stderr)
        return 1
    payload_path = Path(args.file).expanduser()
    if not payload_path.exists():
        print(f"[ERROR] Pipeline file not found: {payload_path}", file=sys.stderr)
        return 1
    payload = json.loads(payload_path.read_text(encoding="utf-8"))
    # Sanity-check the payload shape before touching the API.
    if not isinstance(payload, dict) or "name" not in payload or "filter" not in payload or "processors" not in payload:
        print(
            f"[ERROR] Invalid pipeline payload (missing name/filter/processors): {payload_path}",
            file=sys.stderr,
        )
        return 1
    base_url = f"https://api.{dd_site}/api/v1/logs/config/pipelines"
    pipelines = _request("GET", base_url, dd_api_key, dd_app_key)
    if not isinstance(pipelines, list):
        print("[ERROR] Unexpected Datadog API response for pipeline list", file=sys.stderr)
        return 1
    # Upsert semantics: match on display name, update in place if present.
    existing = next((p for p in pipelines if p.get("name") == args.name), None)
    if existing:
        method = "PUT"
        target_url = f"{base_url}/{existing.get('id')}"
        action = "update"
    else:
        method = "POST"
        target_url = base_url
        action = "create"
    # Status lines go to stderr so stdout stays machine-parseable JSON.
    print(f"[INFO] Datadog site: {dd_site}", file=sys.stderr)
    print(f"[INFO] Pipeline file: {payload_path}", file=sys.stderr)
    print(f"[INFO] Action: {action}", file=sys.stderr)
    if existing and existing.get("id"):
        print(f"[INFO] Existing pipeline id: {existing['id']}", file=sys.stderr)
    if args.dry_run:
        print(f"[DRY RUN] {method} {target_url}", file=sys.stderr)
        print(
            json.dumps(
                {
                    "name": payload.get("name"),
                    "filter": payload.get("filter"),
                    "processor_count": len(payload.get("processors", [])),
                },
                indent=2,
            )
        )
        return 0
    result = _request(method, target_url, dd_api_key, dd_app_key, body=json.dumps(payload).encode("utf-8"))
    # Echo the key fields of the created/updated pipeline as JSON on stdout.
    print(
        json.dumps(
            {
                "id": result.get("id"),
                "name": result.get("name"),
                "is_enabled": result.get("is_enabled"),
                "filter": result.get("filter"),
            },
            indent=2,
        )
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment