Created
February 28, 2026 21:18
-
-
Save mdrakiburrahman/137012f17f0c51d65a40f416e77a636c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Fuzzy title bucketing: TF-IDF clustering → Soundex rebalancing. | |
| Groups incident titles that are semantically similar (e.g. same alert | |
| with a different region suffix, or "errors" vs "failures" variants) into a | |
| single bucket label. | |
| Pipeline | |
| -------- | |
| 1. **Normalise** — strip bracketed prefixes (``[topic=…]``), quoted strings, | |
| ``Region: …`` labels, UUIDs, IPs, timestamps, and uppercase region codes. | |
| 2. **TF-IDF + Agglomerative Clustering** — character n-gram vectorisation | |
| with cosine-distance threshold. | |
| 3. **Soundex Rebalancing** — compute an American-Soundex signature for each | |
| cluster label and merge clusters whose Jaccard similarity exceeds a | |
| threshold. This catches "errors" vs "failures" style differences that | |
| TF-IDF alone may miss. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import re | |
| from collections import Counter | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.cluster import AgglomerativeClustering | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_distances | |
| log = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Normalisation | |
| # --------------------------------------------------------------------------- | |
| _BRACKET_RE = re.compile(r"\[[^\]]*\]") | |
| _QUOTE_RE = re.compile(r"'[^']*'|\"[^\"]*\"") | |
| _REGION_LABEL_RE = re.compile(r"Region:\s*\S+", re.IGNORECASE) | |
| _NOISE_RE = re.compile( | |
| r""" | |
| [0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12} | |
| | \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3} | |
| | \d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2} | |
| """, | |
| re.VERBOSE, | |
| ) | |
| _REGION_CODE_RE = re.compile(r"\b[A-Z]{2,5}\d{0,2}\b") | |
| def _normalize(title: str) -> str: | |
| """Strip noise tokens and collapse whitespace for better similarity.""" | |
| title = _BRACKET_RE.sub(" ", title) | |
| title = _QUOTE_RE.sub(" ", title) | |
| title = _REGION_LABEL_RE.sub(" ", title) | |
| title = _NOISE_RE.sub(" ", title) | |
| title = _REGION_CODE_RE.sub(" ", title) | |
| return re.sub(r"\s+", " ", title).strip().lower() | |
| # --------------------------------------------------------------------------- | |
| # American Soundex | |
| # --------------------------------------------------------------------------- | |
| _SOUNDEX_MAP = str.maketrans( | |
| "ABCDEFGHIJKLMNOPQRSTUVWXYZ", | |
| "01230120022455012623010202", | |
| ) | |
| def _soundex(word: str) -> str: | |
| """American Soundex code for a single word (4-character code).""" | |
| word = "".join(c for c in word.upper() if c.isalpha()) | |
| if not word: | |
| return "" | |
| coded = word.translate(_SOUNDEX_MAP) | |
| out = [word[0]] | |
| prev = coded[0] | |
| for i in range(1, len(coded)): | |
| c = coded[i] | |
| if c == "0": # vowels, H, W — transparent | |
| continue | |
| if c != prev: | |
| out.append(c) | |
| prev = c | |
| return ("".join(out) + "000")[:4] | |
def _soundex_signature(text: str) -> frozenset[str]:
    """Deduplicated Soundex codes for every word in *text*."""
    codes: set[str] = set()
    for word in text.split():
        code = _soundex(word)
        if code:
            codes.add(code)
    return frozenset(codes)
| # --------------------------------------------------------------------------- | |
| # Soundex-based cluster rebalancing | |
| # --------------------------------------------------------------------------- | |
def _rebalance_soundex(
    unique_normed: list[str],
    labels: np.ndarray,
    *,
    soundex_threshold: float = 0.45,
) -> np.ndarray:
    """Merge clusters whose Soundex Jaccard similarity ≥ *soundex_threshold*.

    Uses a union-find to transitively merge all qualifying pairs, so if
    A~B and B~C all three clusters collapse into one.
    """
    # One Soundex signature per cluster, taken from its first member.
    signatures: dict[int, frozenset[str]] = {}
    for pos, cid in enumerate(labels):
        if cid not in signatures:
            signatures[cid] = _soundex_signature(unique_normed[pos])

    parent: dict[int, int] = {cid: cid for cid in signatures}

    def find(node: int) -> int:
        # Path-halving root lookup.
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    def union(a: int, b: int) -> None:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_b] = root_a

    cluster_ids = list(signatures)
    for i, left in enumerate(cluster_ids):
        sig_left = signatures[left]
        for right in cluster_ids[i + 1:]:
            sig_right = signatures[right]
            # Empty signatures carry no phonetic evidence — never merge on them.
            if not sig_left or not sig_right:
                continue
            jaccard = len(sig_left & sig_right) / len(sig_left | sig_right)
            if jaccard >= soundex_threshold:
                union(left, right)

    rebalanced = labels.copy()
    relabeled = 0
    for pos in range(len(rebalanced)):
        root = find(rebalanced[pos])
        if root != rebalanced[pos]:
            relabeled += 1
            rebalanced[pos] = root
    if relabeled:
        n_before = len(set(labels))
        n_after = len(set(rebalanced))
        log.info("Soundex rebalancer merged %d cluster(s): %d → %d", n_before - n_after, n_before, n_after)
    return rebalanced
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
def add_title_buckets(
    df: pd.DataFrame,
    *,
    title_col: str = "Title",
    bucket_col: str = "icm_ai_categorized_bucket",
    distance_threshold: float = 0.40,
    soundex_threshold: float = 0.45,
) -> pd.DataFrame:
    """Add a *bucket_col* column that groups similar titles together.

    **Pass 1** — TF-IDF (character n-gram) + agglomerative clustering.
    **Pass 2** — Soundex Jaccard rebalancing to merge remaining near-misses.

    Args:
        df: DataFrame that must contain *title_col*.
        title_col: Column with incident titles.
        bucket_col: Name of the new column to add.
        distance_threshold: Max cosine distance for TF-IDF clustering.
        soundex_threshold: Min Jaccard similarity of Soundex signatures to
            merge two clusters in the rebalancing pass.

    Returns:
        A **copy** of *df* with *bucket_col* inserted right after *title_col*.
    """
    df = df.copy()
    if df.empty or title_col not in df.columns:
        # Nothing to bucket — still add the column so downstream code
        # can rely on its presence.
        df[bucket_col] = pd.Series(dtype="str")
        return df
    titles = df[title_col].fillna("").astype(str)
    normed = titles.map(_normalize)
    unique_normed = normed.unique().tolist()
    if len(unique_normed) == 1:
        # Everything normalises to one cluster: label it with the most
        # frequent original title, consistent with the multi-cluster
        # labelling rule below (previously this used the first title).
        df[bucket_col] = Counter(titles).most_common(1)[0][0]
        return _reorder(df, title_col, bucket_col)
    # --- Pass 1: TF-IDF clustering ---
    vec = TfidfVectorizer(
        analyzer="char_wb",
        ngram_range=(3, 5),
        max_features=10_000,
        sublinear_tf=True,
    )
    tfidf = vec.fit_transform(unique_normed)
    dist = cosine_distances(tfidf)
    clustering = AgglomerativeClustering(
        n_clusters=None,
        metric="precomputed",  # we pass the cosine-distance matrix directly
        linkage="average",
        distance_threshold=distance_threshold,
    )
    labels = clustering.fit_predict(dist)
    log.info("TF-IDF pass: %d unique title(s) → %d cluster(s)", len(unique_normed), len(set(labels)))
    # --- Pass 2: Soundex rebalancing ---
    labels = _rebalance_soundex(unique_normed, labels, soundex_threshold=soundex_threshold)
    # --- Label each cluster with the most frequent original title ---
    normed_to_originals: dict[str, list[str]] = {}
    for orig, norm in zip(titles, normed):
        normed_to_originals.setdefault(norm, []).append(orig)
    cluster_originals: dict[int, list[str]] = {}
    for idx, cid in enumerate(labels):
        cluster_originals.setdefault(cid, []).extend(
            normed_to_originals.get(unique_normed[idx], [])
        )
    cluster_labels = {
        cid: Counter(origs).most_common(1)[0][0]
        for cid, origs in cluster_originals.items()
    }
    normed_to_bucket = {n: cluster_labels[labels[i]] for i, n in enumerate(unique_normed)}
    df[bucket_col] = normed.map(normed_to_bucket)
    n_buckets = len(cluster_labels)
    log.info("Final: %d unique title(s) → %d bucket(s)", len(unique_normed), n_buckets)
    return _reorder(df, title_col, bucket_col)
| def _reorder(df: pd.DataFrame, after_col: str, new_col: str) -> pd.DataFrame: | |
| """Return *df* with *new_col* moved to immediately after *after_col*.""" | |
| cols = list(df.columns) | |
| cols.remove(new_col) | |
| idx = cols.index(after_col) + 1 | |
| cols.insert(idx, new_col) | |
| return df[cols] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment