Skip to content

Instantly share code, notes, and snippets.

@mdrakiburrahman
Created February 28, 2026 21:18
Show Gist options
  • Select an option

  • Save mdrakiburrahman/137012f17f0c51d65a40f416e77a636c to your computer and use it in GitHub Desktop.

Select an option

Save mdrakiburrahman/137012f17f0c51d65a40f416e77a636c to your computer and use it in GitHub Desktop.
"""Fuzzy title bucketing: TF-IDF clustering → Soundex rebalancing.
Groups incident titles that are semantically similar (e.g. same alert
with a different region suffix, or "errors" vs "failures" variants) into a
single bucket label.
Pipeline
--------
1. **Normalise** — strip bracketed prefixes (``[topic=…]``), quoted strings,
``Region: …`` labels, UUIDs, IPs, timestamps, and uppercase region codes.
2. **TF-IDF + Agglomerative Clustering** — character n-gram vectorisation
with cosine-distance threshold.
3. **Soundex Rebalancing** — compute an American-Soundex signature for each
cluster label and merge clusters whose Jaccard similarity exceeds a
threshold. This catches "errors" vs "failures" style differences that
TF-IDF alone may miss.
"""
from __future__ import annotations
import logging
import re
from collections import Counter
import numpy as np
import pandas as pd
from sklearn.cluster import AgglomerativeClustering
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_distances
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Normalisation
# ---------------------------------------------------------------------------
_BRACKET_RE = re.compile(r"\[[^\]]*\]")
_QUOTE_RE = re.compile(r"'[^']*'|\"[^\"]*\"")
_REGION_LABEL_RE = re.compile(r"Region:\s*\S+", re.IGNORECASE)
_NOISE_RE = re.compile(
r"""
[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}
| \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}
| \d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2}
""",
re.VERBOSE,
)
_REGION_CODE_RE = re.compile(r"\b[A-Z]{2,5}\d{0,2}\b")
def _normalize(title: str) -> str:
"""Strip noise tokens and collapse whitespace for better similarity."""
title = _BRACKET_RE.sub(" ", title)
title = _QUOTE_RE.sub(" ", title)
title = _REGION_LABEL_RE.sub(" ", title)
title = _NOISE_RE.sub(" ", title)
title = _REGION_CODE_RE.sub(" ", title)
return re.sub(r"\s+", " ", title).strip().lower()
# ---------------------------------------------------------------------------
# American Soundex
# ---------------------------------------------------------------------------
_SOUNDEX_MAP = str.maketrans(
"ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"01230120022455012623010202",
)
def _soundex(word: str) -> str:
"""American Soundex code for a single word (4-character code)."""
word = "".join(c for c in word.upper() if c.isalpha())
if not word:
return ""
coded = word.translate(_SOUNDEX_MAP)
out = [word[0]]
prev = coded[0]
for i in range(1, len(coded)):
c = coded[i]
if c == "0": # vowels, H, W — transparent
continue
if c != prev:
out.append(c)
prev = c
return ("".join(out) + "000")[:4]
def _soundex_signature(text: str) -> frozenset[str]:
    """Deduplicated Soundex codes for every whitespace-separated word in *text*."""
    codes: set[str] = set()
    for token in text.split():
        code = _soundex(token)
        # Skip tokens with no alphabetic characters (empty code).
        if code:
            codes.add(code)
    return frozenset(codes)
# ---------------------------------------------------------------------------
# Soundex-based cluster rebalancing
# ---------------------------------------------------------------------------
def _rebalance_soundex(
    unique_normed: list[str],
    labels: np.ndarray,
    *,
    soundex_threshold: float = 0.45,
) -> np.ndarray:
    """Merge clusters whose Soundex Jaccard similarity ≥ *soundex_threshold*.

    Uses a union-find to transitively merge all qualifying pairs, then
    relabels every member with its merged cluster's root id.
    """
    # One Soundex signature per cluster, taken from the first member seen.
    signatures: dict[int, frozenset[str]] = {}
    for pos, cluster in enumerate(labels):
        if cluster not in signatures:
            signatures[cluster] = _soundex_signature(unique_normed[pos])

    ids = list(signatures)
    root: dict[int, int] = {c: c for c in ids}

    def _root_of(node: int) -> int:
        # Find with path halving.
        while root[node] != node:
            root[node] = root[root[node]]
            node = root[node]
        return node

    def _merge(a: int, b: int) -> None:
        ra, rb = _root_of(a), _root_of(b)
        if ra != rb:
            root[rb] = ra

    # Union every pair of clusters whose signatures are similar enough.
    for i, left in enumerate(ids):
        for right in ids[i + 1:]:
            sig_l, sig_r = signatures[left], signatures[right]
            if not sig_l or not sig_r:
                continue
            overlap = len(sig_l & sig_r) / len(sig_l | sig_r)
            if overlap >= soundex_threshold:
                _merge(left, right)

    relabeled = labels.copy()
    changed = 0
    for pos in range(len(relabeled)):
        canonical = _root_of(relabeled[pos])
        if canonical != relabeled[pos]:
            changed += 1
            relabeled[pos] = canonical
    if changed:
        n_before = len(set(labels))
        n_after = len(set(relabeled))
        log.info("Soundex rebalancer merged %d cluster(s): %d → %d", n_before - n_after, n_before, n_after)
    return relabeled
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def add_title_buckets(
    df: pd.DataFrame,
    *,
    title_col: str = "Title",
    bucket_col: str = "icm_ai_categorized_bucket",
    distance_threshold: float = 0.40,
    soundex_threshold: float = 0.45,
) -> pd.DataFrame:
    """Add a *bucket_col* column that groups similar titles together.

    **Pass 1** — TF-IDF (character n-gram) + agglomerative clustering.
    **Pass 2** — Soundex Jaccard rebalancing to merge remaining near-misses.

    Args:
        df: DataFrame that must contain *title_col*.
        title_col: Column with incident titles.
        bucket_col: Name of the new column to add.
        distance_threshold: Max cosine distance for TF-IDF clustering.
        soundex_threshold: Min Jaccard similarity of Soundex signatures to
            merge two clusters in the rebalancing pass.

    Returns:
        A **copy** of *df* with *bucket_col* inserted right after *title_col*.
    """
    df = df.copy()
    if df.empty or title_col not in df.columns:
        df[bucket_col] = pd.Series(dtype="str")
        # Fix: honour the documented column placement on the empty-frame
        # path too (the column used to be appended at the end).
        if title_col in df.columns:
            return _reorder(df, title_col, bucket_col)
        return df
    titles = df[title_col].fillna("").astype(str)
    normed = titles.map(_normalize)
    unique_normed = normed.unique().tolist()
    if len(unique_normed) == 1:
        # Single bucket: label it with the most frequent original title,
        # consistent with the multi-cluster labelling pass below (fix: the
        # first title was previously used regardless of frequency).
        df[bucket_col] = Counter(titles).most_common(1)[0][0]
        return _reorder(df, title_col, bucket_col)
    # --- Pass 1: TF-IDF clustering over the unique normalised titles ---
    vec = TfidfVectorizer(
        analyzer="char_wb",
        ngram_range=(3, 5),
        max_features=10_000,
        sublinear_tf=True,
    )
    tfidf = vec.fit_transform(unique_normed)
    dist = cosine_distances(tfidf)
    clustering = AgglomerativeClustering(
        n_clusters=None,
        metric="precomputed",
        linkage="average",
        distance_threshold=distance_threshold,
    )
    labels = clustering.fit_predict(dist)
    log.info("TF-IDF pass: %d unique title(s) → %d cluster(s)", len(unique_normed), len(set(labels)))
    # --- Pass 2: Soundex rebalancing ---
    labels = _rebalance_soundex(unique_normed, labels, soundex_threshold=soundex_threshold)
    # --- Label each cluster with the most frequent original title ---
    # Map each normalised form back to every original spelling...
    normed_to_originals: dict[str, list[str]] = {}
    for orig, norm in zip(titles, normed):
        normed_to_originals.setdefault(norm, []).append(orig)
    # ...then pool originals per cluster and pick the modal title as label.
    cluster_originals: dict[int, list[str]] = {}
    for idx, cid in enumerate(labels):
        cluster_originals.setdefault(cid, []).extend(
            normed_to_originals.get(unique_normed[idx], [])
        )
    cluster_labels = {
        cid: Counter(origs).most_common(1)[0][0]
        for cid, origs in cluster_originals.items()
    }
    normed_to_bucket = {n: cluster_labels[labels[i]] for i, n in enumerate(unique_normed)}
    df[bucket_col] = normed.map(normed_to_bucket)
    n_buckets = len(cluster_labels)
    log.info("Final: %d unique title(s) → %d bucket(s)", len(unique_normed), n_buckets)
    return _reorder(df, title_col, bucket_col)
def _reorder(df: pd.DataFrame, after_col: str, new_col: str) -> pd.DataFrame:
"""Return *df* with *new_col* moved to immediately after *after_col*."""
cols = list(df.columns)
cols.remove(new_col)
idx = cols.index(after_col) + 1
cols.insert(idx, new_col)
return df[cols]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment