Last active
March 3, 2026 16:46
-
-
Save JonBons/9314039d5088441dffb5d00e8f9df867 to your computer and use it in GitHub Desktop.
MeshMonitor Utilization Alert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # mm_meta: | |
| # name: Utilization Alert | |
| # emoji: 📊 | |
| # language: Python | |
| """ | |
| Utilization Alert Script - Notify when TX air or channel utilization exceeds limits. | |
| Designed to run on a schedule (e.g. cron every 1–2 minutes). Tracks how long each | |
| node's channel utilization and air utilization (TX) have been over the configured | |
| limits and notifies only after the threshold has been exceeded for a sustained | |
| period (default 5 minutes). Once an alert is sent for a given node/metric, | |
| it will not notify again until the metric drops back below the limit (which resets the alert state) | |
| and then exceeds the limit again for another sustained period. | |
| Ideally, configure it with no channel so that it produces no mesh output. | |
| Requirements: | |
| - Python 3.6+ | |
| - MM_API_TOKEN (generate from Settings > API Tokens) | |
| - Optional: MM_API_URL, limit/state env vars (see below) | |
| Environment variables: | |
| - MM_API_TOKEN API token for MeshMonitor v1 API (required) | |
| - MM_API_URL Base URL (default: http://localhost:3001) | |
| - MM_CHANNEL_UTIL_LIMIT Channel utilization limit % (default: 25) | |
| - MM_AIR_TX_UTIL_LIMIT Air TX utilization limit % (default: 7) | |
| - MM_OVER_MINUTES Minutes over limit before alerting (default: 5) | |
| - MM_UTIL_STATE_FILE Path to state file for persistence; relative paths are resolved against the script's directory (default: utilization-alert-state.json) | |
| - MM_ALERT_WEBHOOK_URL Optional: POST to this URL when an alert fires (Discord webhook URLs get a rich embed) | |
| - MM_NODE_IDS Optional: comma-separated node IDs to monitor (default: only the connected/virtual node from /api/config localNodeInfo) | |
| - MM_UTIL_ALERT_INDIVIDUALLY If set (1/true/yes/on), alert when EITHER metric is over limit (process each metric individually). If unset (default), alert only when BOTH Air TX and Channel utilization are over limits. | |
| Cron example (run every 2 minutes): | |
| */2 * * * * MM_API_TOKEN=your_token MM_API_URL=https://yourserver /path/to/utilization-alert.py >> /var/log/utilization-alert.log 2>&1 | |
| Output: | |
| - Logs and alert summary to stdout (cron can mail or append to a log) | |
| - Optional webhook POST on alert (if URL is Discord, sends a rich embed with node stats) | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import urllib.request | |
| import urllib.error | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any, List | |
# Directory holding this script; STATE_FILE is joined onto it below.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Config from environment
API_TOKEN = os.environ.get("MM_API_TOKEN", "").strip()
API_URL = (os.environ.get("MM_API_URL", "http://localhost:3001") or "").rstrip("/")
CHANNEL_LIMIT = float(os.environ.get("MM_CHANNEL_UTIL_LIMIT", "25"))
AIR_TX_LIMIT = float(os.environ.get("MM_AIR_TX_UTIL_LIMIT", "7"))
OVER_MINUTES = float(os.environ.get("MM_OVER_MINUTES", "5"))
STATE_FILE = os.path.join(BASE_DIR, os.environ.get("MM_UTIL_STATE_FILE", "utilization-alert-state.json"))
# The documented variable is MM_ALERT_WEBHOOK_URL. Earlier revisions of this
# script read the misspelled MM_ALERsT_WEBHOOK_URL, so that name is still
# accepted as a fallback for deployments that set it.
WEBHOOK_URL = (
    os.environ.get("MM_ALERT_WEBHOOK_URL")
    or os.environ.get("MM_ALERsT_WEBHOOK_URL")
    or ""
).strip()
NODE_IDS_FILTER = os.environ.get("MM_NODE_IDS", "").strip()

# Default: require both metrics over limit before alerting. Set MM_UTIL_ALERT_INDIVIDUALLY to alert on each metric individually.
ALERT_INDIVIDUALLY = (os.environ.get("MM_UTIL_ALERT_INDIVIDUALLY", "0").strip().lower() in ("1", "true", "yes", "on"))
REQUIRE_BOTH_METRICS = not ALERT_INDIVIDUALLY

# None means "no explicit filter was configured".
if NODE_IDS_FILTER:
    NODE_IDS_SET = {n.strip() for n in NODE_IDS_FILTER.split(",") if n.strip()}
else:
    NODE_IDS_SET = None

# Sustained-over-limit threshold in whole seconds.
OVER_SECONDS = int(OVER_MINUTES * 60)
def get_local_node_id_from_config(timeout: int = 10) -> Optional[str]:
    """
    Fetch GET /api/config (no auth) and return localNodeInfo.nodeId when set.

    This is the node MeshMonitor is connected to (the virtual node's physical device).

    Args:
        timeout: Socket timeout in seconds passed to urlopen.

    Returns:
        The stripped node ID string, or None if the request fails, the body is
        not a JSON object, or localNodeInfo.nodeId is missing/blank. Failures
        are reported on stderr; the lookup itself stays best-effort and never
        raises.
    """
    url = f"{API_URL}/api/config"
    try:
        req = urllib.request.Request(
            url,
            headers={"Accept": "application/json", "User-Agent": "MeshMonitor-UtilizationAlert/1.0"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as response:
            data = json.loads(response.read().decode("utf-8"))
    except Exception as e:
        # Report the failure instead of hiding it: the caller treats None as
        # "no local node known", which changes which nodes get monitored.
        print(f"[utilization-alert] Could not fetch local node from {url}: {e}", file=sys.stderr)
        return None

    local = data.get("localNodeInfo") if isinstance(data, dict) else None
    if isinstance(local, dict):
        node_id = local.get("nodeId")
        if isinstance(node_id, str) and node_id.strip():
            return node_id.strip()
    return None
def api_request(endpoint: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
    """Perform an authenticated GET against the MeshMonitor v1 API.

    Args:
        endpoint: Path appended verbatim to API_URL (e.g. "/api/v1/nodes").
        timeout: Socket timeout in seconds passed to urlopen.

    Returns:
        The decoded JSON body, or None when no API token is configured or the
        request/decoding fails (failures are reported on stderr).
    """
    if not API_TOKEN:
        return None

    request_headers = {
        "Authorization": f"Bearer {API_TOKEN}",
        "Accept": "application/json",
        "User-Agent": "MeshMonitor-UtilizationAlert/1.0",
    }
    try:
        request = urllib.request.Request(f"{API_URL}{endpoint}", headers=request_headers)
        with urllib.request.urlopen(request, timeout=timeout) as response:
            raw = response.read()
            return json.loads(raw.decode("utf-8"))
    except urllib.error.HTTPError as e:
        # HTTPError is a URLError subclass, so it has to be handled first.
        print(f"[utilization-alert] API HTTP error {e.code}: {e.reason}", file=sys.stderr)
        if e.fp:
            # Echo at most the first 500 characters of the error body.
            try:
                print(e.fp.read().decode("utf-8", errors="replace")[:500], file=sys.stderr)
            except Exception:
                pass
    except urllib.error.URLError as e:
        print(f"[utilization-alert] API URL error: {e.reason}", file=sys.stderr)
    except Exception as e:
        print(f"[utilization-alert] API error: {e}", file=sys.stderr)
    return None
def load_state() -> Dict[str, Any]:
    """Load persisted state (node -> metric -> overSince / notifiedAt).

    Returns:
        The dict stored in STATE_FILE. An empty dict is returned when the file
        does not exist, cannot be read or parsed, or does not contain a JSON
        object at the top level; the latter two cases are reported on stderr.
    """
    path = Path(STATE_FILE)
    try:
        with open(path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run (or state was deleted): start from a clean slate silently.
        return {}
    except Exception as e:
        print(f"[utilization-alert] Could not load state: {e}", file=sys.stderr)
        return {}
    if not isinstance(loaded, dict):
        # Callers index the state by node ID, so anything but an object is unusable.
        print(
            "[utilization-alert] Could not load state: top-level JSON value is not an object",
            file=sys.stderr,
        )
        return {}
    return loaded
def save_state(state: Dict[str, Any]) -> None:
    """Persist state to STATE_FILE as indented JSON.

    The state is serialized to a string before the file is opened, so a
    serialization error cannot truncate the previously saved state. Any
    failure is reported on stderr and otherwise ignored.

    Args:
        state: Mapping of node ID to per-metric tracking data.
    """
    path = Path(STATE_FILE)
    try:
        serialized = json.dumps(state, indent=2)
        path.write_text(serialized, encoding="utf-8")
    except Exception as e:
        print(f"[utilization-alert] Could not save state: {e}", file=sys.stderr)
def ensure_node_state(state: Dict[str, Any], node_id: str) -> Dict[str, Any]:
    """Return the state entry for a node, creating or repairing it in place.

    After this call state[node_id] is a dict with "channelUtilization" and
    "airUtilTx" entries, each a dict that has both "overSince" and
    "notifiedAt" keys. Existing values are kept; missing keys default to None,
    and entries that are not dicts (e.g. from a hand-edited or partially
    written state file) are replaced with fresh ones.

    Args:
        state: Whole persisted state, mutated in place.
        node_id: Key of the node whose entry is required.

    Returns:
        The (possibly newly created) dict stored at state[node_id].
    """
    node_state = state.get(node_id)
    if not isinstance(node_state, dict):
        node_state = {}
        state[node_id] = node_state
    for key in ("channelUtilization", "airUtilTx"):
        meta = node_state.get(key)
        if not isinstance(meta, dict):
            meta = {}
            node_state[key] = meta
        # Fill only what is missing so timestamps from earlier runs survive.
        meta.setdefault("overSince", None)
        meta.setdefault("notifiedAt", None)
    return node_state
def is_discord_webhook(url: str) -> bool:
    """Tell whether *url* contains a Discord webhook path (discord.com or discordapp.com)."""
    discord_markers = ("discord.com/api/webhooks", "discordapp.com/api/webhooks")
    # Plain substring test: the marker may appear anywhere in the URL.
    return any(marker in url for marker in discord_markers)
def build_discord_embed_payload(alerts: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Assemble the JSON body for a Discord webhook: one embed, one field per alert.

    Args:
        alerts: Alert dicts; the keys read are "metric", "value", "limit",
            "overSeconds", "longName" and "nodeId" (each with a fallback).

    Returns:
        A dict with a single embed under "embeds" plus empty/default
        "content", "tts", "components", "actions" and "flags" entries.
    """
    orange = 0xF39C12  # embed accent colour, 0xRRGGBB
    labels_by_metric = {
        "channelUtilization": "Channel utilization",
        "airUtilTx": "Air TX utilization",
    }

    def to_field(alert: Dict[str, Any]) -> Dict[str, Any]:
        # One inline embed field summarising a single alert.
        metric = alert.get("metric", "")
        shown_metric = labels_by_metric.get(metric, metric)
        current = alert.get("value", 0)
        threshold = alert.get("limit", 0)
        # Whole minutes; a missing or None duration counts as zero.
        minutes_over = (alert.get("overSeconds") or 0) // 60
        display_name = alert.get("longName", "—")
        ident = alert.get("nodeId", "—")
        body_lines = [
            f"**ID:** `{ident}`",
            f"**Metric:** {shown_metric}",
            f"**Current:** {current:.1f}%",
            f"**Limit:** {threshold:.1f}%",
            f"**Over limit for:** {minutes_over} min",
        ]
        return {
            "name": f"⚠️ {display_name}",
            "value": "\n".join(body_lines),
            "inline": True,
        }

    embed_fields = [to_field(alert) for alert in alerts]

    # UTC timestamp in ISO 8601 form with a fixed ".000Z" suffix.
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    summary = (
        "The following node(s) have exceeded utilization limits "
        f"for more than **{int(OVER_MINUTES)}** minutes."
    )
    return {
        "content": "",
        "tts": False,
        "embeds": [
            {
                "title": "📊 MeshMonitor Utilization Alert",
                "description": summary,
                "color": orange,
                "fields": embed_fields,
                "footer": {"text": "MeshMonitor Utilization Alert"},
                "timestamp": stamp,
            }
        ],
        "components": [],
        "actions": {},
        "flags": 0,
    }
def notify_webhook(alerts: List[Dict[str, Any]]) -> None:
    """Send the alerts as a JSON POST to WEBHOOK_URL.

    Does nothing when no webhook URL is configured or there are no alerts.
    Discord webhook URLs receive the rich-embed payload; any other URL gets a
    generic {"source", "timestamp", "alerts"} document. Errors while encoding
    or sending are reported on stderr and not raised.

    Args:
        alerts: Alert dicts produced by this run.
    """
    if not (WEBHOOK_URL and alerts):
        return

    if not is_discord_webhook(WEBHOOK_URL):
        payload = {
            "source": "meshmonitor-utilization-alert",
            "timestamp": int(time.time()),
            "alerts": alerts,
        }
    else:
        payload = build_discord_embed_payload(alerts)

    request_headers = {
        "Content-Type": "application/json",
        "User-Agent": "MeshMonitor-UtilizationAlert/1.0"
    }
    try:
        body = json.dumps(payload).encode("utf-8")
        request = urllib.request.Request(
            WEBHOOK_URL,
            data=body,
            headers=request_headers,
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=10) as resp:
            status = resp.status
            if status >= 400:
                print(f"[utilization-alert] Webhook returned {status}", file=sys.stderr)
    except Exception as e:
        print(f"[utilization-alert] Webhook error: {e}", file=sys.stderr)
def main() -> None:
    """Run one monitoring pass: fetch nodes, update state, emit alerts, exit.

    Exits 0 when MM_API_TOKEN is unset, 1 when the node list cannot be
    fetched, and 0 after a completed pass whether or not alerts fired.
    """
    now = int(time.time())
    state = load_state()

    if not API_TOKEN:
        print("[utilization-alert] MM_API_TOKEN not set; skipping.")
        sys.exit(0)

    result = api_request("/api/v1/nodes")
    if not result or not result.get("success"):
        print("[utilization-alert] Failed to fetch nodes; skipping this run.")
        sys.exit(1)
    nodes: List[Dict[str, Any]] = result.get("data") or []

    # Without MM_NODE_IDS, fall back to the connected node from /api/config.
    # If that lookup yields nothing either, no filter is applied and every
    # returned node is examined.
    wanted_ids = NODE_IDS_SET
    if wanted_ids is None:
        local_id = get_local_node_id_from_config()
        wanted_ids = {local_id} if local_id else None
    if wanted_ids is not None:
        nodes = [n for n in nodes if n.get("nodeId") in wanted_ids]

    alerts: List[Dict[str, Any]] = []

    def emit(node_id, long_name, metric_name, limit, value, over_duration):
        # Record one alert and echo its message to stdout.
        label = "Channel utilization" if metric_name == "channelUtilization" else "Air TX utilization"
        msg = (
            f"{label} over limit: node {long_name} ({node_id}) "
            f"{metric_name}={value:.1f}% (limit {limit:.1f}%, over for {over_duration // 60}m)"
        )
        alerts.append({
            "nodeId": node_id,
            "longName": long_name,
            "metric": metric_name,
            "value": value,
            "limit": limit,
            "overSeconds": over_duration,
            "message": msg,
        })
        print(msg)

    for node in nodes:
        node_id = node.get("nodeId") or ""
        long_name = node.get("longName") or node.get("shortName") or node_id
        channel_util = node.get("channelUtilization")
        air_tx = node.get("airUtilTx")

        # Nodes reporting neither metric are ignored (their state is untouched).
        if channel_util is None and air_tx is None:
            continue

        node_state = ensure_node_state(state, node_id)

        # Pass 1: per-metric bookkeeping (and per-metric alerts in individual mode).
        readings = (
            ("channelUtilization", CHANNEL_LIMIT, channel_util),
            ("airUtilTx", AIR_TX_LIMIT, air_tx),
        )
        for metric_name, limit, value in readings:
            meta = node_state[metric_name]
            if value is None:
                # Metric not reported: forget any running incident.
                meta["overSince"] = None
                meta["notifiedAt"] = None
                continue

            print(metric_name, "=", value)
            if not (value > limit):
                # Under limit: reset so a later spike counts as a new incident.
                meta["overSince"] = None
                meta["notifiedAt"] = None
                continue

            if meta["overSince"] is None:
                meta["overSince"] = now
            over_duration = now - meta["overSince"]
            if ALERT_INDIVIDUALLY and over_duration >= OVER_SECONDS and meta["notifiedAt"] is None:
                meta["notifiedAt"] = now
                emit(node_id, long_name, metric_name, limit, value, over_duration)

        # Pass 2: combined alert, only in "both metrics" mode with both values present.
        if not REQUIRE_BOTH_METRICS or channel_util is None or air_tx is None:
            continue

        ch_over = channel_util > CHANNEL_LIMIT
        air_over = air_tx > AIR_TX_LIMIT
        ch_meta = node_state["channelUtilization"]
        air_meta = node_state["airUtilTx"]
        ch_duration = (now - ch_meta["overSince"]) if ch_meta["overSince"] else 0
        air_duration = (now - air_meta["overSince"]) if air_meta["overSince"] else 0

        sustained = ch_duration >= OVER_SECONDS and air_duration >= OVER_SECONDS
        # Suppress only when BOTH metrics already carry a notification mark.
        already_notified = ch_meta["notifiedAt"] is not None and air_meta["notifiedAt"] is not None
        if not (ch_over and air_over and sustained) or already_notified:
            continue

        ch_meta["notifiedAt"] = now
        air_meta["notifiedAt"] = now
        emit(node_id, long_name, "channelUtilization", CHANNEL_LIMIT, channel_util, ch_duration)
        emit(node_id, long_name, "airUtilTx", AIR_TX_LIMIT, air_tx, air_duration)

    save_state(state)
    notify_webhook(alerts)

    # The exit status is 0 whether or not any alert fired.
    sys.exit(0)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment