@lukehinds
Created January 8, 2026 17:02
#!/usr/bin/env python3
"""
Script to detect and optionally remove duplicate topics in JSON graph files.
Uses SHA256 checksums (already computed in node metadata) and other matching
strategies to identify duplicate topics.
Example usage:
# Report duplicates using exact hash matching
python tools/dedupe_graph.py --input examples/basic-graph-topics.jsonl
# Report with case-insensitive matching
python tools/dedupe_graph.py --input graph.json --strategy case-insensitive
# Report with fuzzy matching (85% threshold)
python tools/dedupe_graph.py --input graph.json --strategy fuzzy --threshold 0.85
# Remove duplicates and save to new file
python tools/dedupe_graph.py --input graph.json --output deduped.json --mode remove
"""
import argparse
import hashlib
import json
import sys
from collections import defaultdict
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any

# Display constants
_KEY_DISPLAY_LENGTH = 16
_TOPIC_DISPLAY_LENGTH = 60
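
# The input shape sketched below is an assumption inferred from how this script
# reads the graph, not a documented schema: a top-level "nodes" mapping keyed by
# numeric node ID, where each node carries a "topic" string, an optional
# "metadata.topic_hash" (SHA256 of the topic text), and "children"/"parents" ID lists.
#
# {
#   "nodes": {
#     "1": {
#       "topic": "Example topic",
#       "metadata": {"topic_hash": "<sha256 hex of the topic text>"},
#       "children": [2],
#       "parents": []
#     },
#     "2": {
#       "topic": "Example topic",
#       "metadata": {"topic_hash": "<same hash as node 1>"},
#       "children": [],
#       "parents": [1]
#     }
#   }
# }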


class DuplicateDetector:
    """Detects duplicate topics in a graph using various strategies."""

    STRATEGY_EXACT = "exact"
    STRATEGY_CASE_INSENSITIVE = "case-insensitive"
    STRATEGY_FUZZY = "fuzzy"

    def __init__(self, strategy: str = STRATEGY_EXACT, threshold: float = 0.9):
        """
        Initialize the duplicate detector.

        Args:
            strategy: Detection strategy - "exact", "case-insensitive", or "fuzzy"
            threshold: Similarity threshold for fuzzy matching (0.0 to 1.0)
        """
        self.strategy = strategy
        self.threshold = threshold

    def find_duplicates(self, graph_data: dict[str, Any]) -> dict[str, list[dict[str, Any]]]:
        """
        Find duplicate topics in the graph.

        Args:
            graph_data: Parsed JSON graph data

        Returns:
            Dictionary mapping hash/key to list of duplicate nodes.
            Only includes groups with 2+ nodes.
        """
        if self.strategy == self.STRATEGY_EXACT:
            return self._exact_hash_match(graph_data)
        if self.strategy == self.STRATEGY_CASE_INSENSITIVE:
            return self._case_insensitive_match(graph_data)
        if self.strategy == self.STRATEGY_FUZZY:
            return self._fuzzy_match(graph_data)
        raise ValueError(f"Unknown strategy: {self.strategy}")

    def _exact_hash_match(self, graph_data: dict[str, Any]) -> dict[str, list[dict[str, Any]]]:
        """Group nodes by their existing SHA256 topic_hash."""
        groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
        nodes = graph_data.get("nodes", {})
        for node_id, node in nodes.items():
            topic_hash = node.get("metadata", {}).get("topic_hash")
            if topic_hash:
                groups[topic_hash].append({"id": node_id, **node})
            else:
                # Compute hash if not present
                topic = node.get("topic", "")
                computed_hash = hashlib.sha256(topic.encode("utf-8")).hexdigest()
                groups[computed_hash].append({"id": node_id, **node})
        # Filter to only groups with duplicates
        return {k: v for k, v in groups.items() if len(v) > 1}

    def _case_insensitive_match(
        self, graph_data: dict[str, Any]
    ) -> dict[str, list[dict[str, Any]]]:
        """Group nodes by case-insensitive normalized topic hash."""
        groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
        nodes = graph_data.get("nodes", {})
        for node_id, node in nodes.items():
            topic = node.get("topic", "")
            normalized = topic.lower().strip()
            normalized_hash = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
            groups[normalized_hash].append({"id": node_id, **node})
        return {k: v for k, v in groups.items() if len(v) > 1}

    def _fuzzy_match(self, graph_data: dict[str, Any]) -> dict[str, list[dict[str, Any]]]:
        """Group nodes by fuzzy similarity using SequenceMatcher."""
        nodes = graph_data.get("nodes", {})
        node_list = [{"id": node_id, **node} for node_id, node in nodes.items()]
        # Track which nodes have been assigned to a group
        assigned: set[str] = set()
        groups: dict[str, list[dict[str, Any]]] = {}
        group_counter = 0
        for i, node_a in enumerate(node_list):
            if node_a["id"] in assigned:
                continue
            topic_a = node_a.get("topic", "")
            current_group = [node_a]
            assigned.add(node_a["id"])
            for node_b in node_list[i + 1 :]:
                if node_b["id"] in assigned:
                    continue
                topic_b = node_b.get("topic", "")
                similarity = SequenceMatcher(None, topic_a, topic_b).ratio()
                if similarity >= self.threshold:
                    current_group.append(node_b)
                    assigned.add(node_b["id"])
            if len(current_group) > 1:
                group_key = f"fuzzy_group_{group_counter}"
                groups[group_key] = current_group
                group_counter += 1
        return groups
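
# Minimal usage sketch (the in-memory graph below is hypothetical, not from the repo):
# running the detector directly on a small graph dict. With the default 0.9 threshold,
# the fuzzy strategy groups near-identical topic strings via difflib.SequenceMatcher.
#
#   detector = DuplicateDetector(strategy=DuplicateDetector.STRATEGY_FUZZY)
#   duplicates = detector.find_duplicates({
#       "nodes": {
#           "1": {"topic": "Graph deduplication", "children": [], "parents": []},
#           "2": {"topic": "Graph de-duplication", "children": [], "parents": []},
#       }
#   })
#   # -> {"fuzzy_group_0": [{"id": "1", ...}, {"id": "2", ...}]}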


class DuplicateRemover:
    """Removes duplicate nodes from a graph while preserving structure."""

    def remove_duplicates(
        self,
        graph_data: dict[str, Any],
        duplicates: dict[str, list[dict[str, Any]]],
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """
        Remove duplicate nodes from the graph.

        For each duplicate group, keeps the node with the lowest ID and
        updates all references to point to the kept node.

        Args:
            graph_data: Original graph data
            duplicates: Duplicate groups from DuplicateDetector

        Returns:
            Tuple of (modified graph data, removal stats)
        """
        # Deep copy to avoid modifying original
        result = json.loads(json.dumps(graph_data))
        nodes = result.get("nodes", {})
        removed_count = 0
        merge_map: dict[str, str] = {}  # Maps removed node ID -> kept node ID
        for _group_key, group_nodes in duplicates.items():
            # Sort by ID (as int) to keep the lowest
            sorted_nodes = sorted(group_nodes, key=lambda n: int(n["id"]))
            keep_node = sorted_nodes[0]
            remove_nodes = sorted_nodes[1:]
            for remove_node in remove_nodes:
                remove_id = str(remove_node["id"])
                keep_id = str(keep_node["id"])
                merge_map[remove_id] = keep_id
                # Merge children from removed node into kept node
                if remove_id in nodes and keep_id in nodes:
                    removed_children = nodes[remove_id].get("children", [])
                    kept_children = nodes[keep_id].get("children", [])
                    # Add children that aren't already present
                    for child_id in removed_children:
                        if child_id not in kept_children:
                            kept_children.append(child_id)
                    nodes[keep_id]["children"] = kept_children
                # Remove the duplicate node
                if remove_id in nodes:
                    del nodes[remove_id]
                    removed_count += 1
        # Update all parent/child references
        self._update_references(nodes, merge_map)
        stats = {
            "removed_count": removed_count,
            "groups_processed": len(duplicates),
            "merge_map": merge_map,
        }
        return result, stats

    def _update_references(self, nodes: dict[str, Any], merge_map: dict[str, str]) -> None:
        """Update parent and child references to point to kept nodes."""
        for node in nodes.values():
            # Update children references
            children = node.get("children", [])
            node["children"] = [int(merge_map.get(str(c), str(c))) for c in children]
            # Remove self-references and duplicates
            node["children"] = list(
                {c for c in node["children"] if str(c) != str(node.get("id"))}
            )
            # Update parent references
            parents = node.get("parents", [])
            node["parents"] = [int(merge_map.get(str(p), str(p))) for p in parents]
            # Remove references to removed nodes and self-references
            node["parents"] = list(
                {
                    p
                    for p in node["parents"]
                    if str(p) not in merge_map and str(p) != str(node.get("id"))
                }
            )
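
# Sketch of chaining detection and removal (same assumed graph shape as above):
# the remover keeps the lowest-ID node in each group, merges the removed nodes'
# children into it, rewrites parent/child references, and reports what was merged.
#
#   remover = DuplicateRemover()
#   deduped_graph, stats = remover.remove_duplicates(graph_data, duplicates)
#   print(stats["removed_count"], stats["merge_map"])  # e.g. 1 {"2": "1"}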


def print_report(
    input_file: str,
    strategy: str,
    duplicates: dict[str, list[dict[str, Any]]],
    verbose: bool = False,
) -> None:
    """Print a formatted report of duplicate topics."""
    print("Duplicate Detection Report")
    print("=" * 50)
    print(f"Strategy: {strategy}")
    print(f"Input: {input_file}")
    print()
    if not duplicates:
        print("No duplicates found.")
        return
    total_duplicates = sum(len(group) - 1 for group in duplicates.values())
    print(f"Found {len(duplicates)} duplicate group(s):")
    print()
    for i, (key, group) in enumerate(duplicates.items(), 1):
        display_key = key[:_KEY_DISPLAY_LENGTH] + "..." if len(key) > _KEY_DISPLAY_LENGTH else key
        print(f"Group {i} (key: {display_key}):")
        for node in group:
            topic = node.get("topic", "")
            node_id = node.get("id")
            if verbose:
                # Show full topic
                print(f' - Node {node_id}: "{topic}"')
            else:
                # Truncate long topics
                if len(topic) > _TOPIC_DISPLAY_LENGTH:
                    display_topic = topic[:_TOPIC_DISPLAY_LENGTH] + "..."
                else:
                    display_topic = topic
                print(f' - Node {node_id}: "{display_topic}"')
        print()
    print("-" * 50)
    print(f"Summary: {total_duplicates} duplicate node(s) in {len(duplicates)} group(s)")


def main() -> int:
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Detect and remove duplicate topics in JSON graph files.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --input graph.json
  %(prog)s --input graph.json --strategy case-insensitive
  %(prog)s --input graph.json --strategy fuzzy --threshold 0.85
  %(prog)s --input graph.json --output deduped.json --mode remove
""",
    )
    parser.add_argument(
        "--input",
        "-i",
        required=True,
        help="Input JSON graph file",
    )
    parser.add_argument(
        "--output",
        "-o",
        help="Output file for deduplicated graph (required for remove mode)",
    )
    parser.add_argument(
        "--mode",
        "-m",
        choices=["report", "remove"],
        default="report",
        help="Operation mode: report (default) or remove",
    )
    parser.add_argument(
        "--strategy",
        "-s",
        choices=["exact", "case-insensitive", "fuzzy"],
        default="exact",
        help="Detection strategy (default: exact)",
    )
    parser.add_argument(
        "--threshold",
        "-t",
        type=float,
        default=0.9,
        help="Similarity threshold for fuzzy matching (default: 0.9)",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Show detailed output",
    )
    args = parser.parse_args()

    # Validate arguments
    if args.mode == "remove" and not args.output:
        parser.error("--output is required when using --mode remove")
    if args.threshold < 0.0 or args.threshold > 1.0:
        parser.error("--threshold must be between 0.0 and 1.0")

    # Load input file
    input_path = Path(args.input)
    if not input_path.exists():
        print(f"Error: Input file not found: {args.input}", file=sys.stderr)
        return 1
    try:
        with open(input_path, encoding="utf-8") as f:
            graph_data = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
        return 1

    # Detect duplicates
    detector = DuplicateDetector(strategy=args.strategy, threshold=args.threshold)
    duplicates = detector.find_duplicates(graph_data)

    if args.mode == "report":
        print_report(args.input, args.strategy, duplicates, verbose=args.verbose)
    else:
        # Remove mode
        if not duplicates:
            print("No duplicates found. Output file not created.")
            return 0
        remover = DuplicateRemover()
        deduped_graph, stats = remover.remove_duplicates(graph_data, duplicates)
        # Write output
        output_path = Path(args.output)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(deduped_graph, f, indent=2)
        print(f"Removed {stats['removed_count']} duplicate node(s)")
        print(f"Processed {stats['groups_processed']} duplicate group(s)")
        print(f"Output written to: {args.output}")
        if args.verbose:
            print("\nMerge mapping (removed -> kept):")
            for removed, kept in stats["merge_map"].items():
                print(f" Node {removed} -> Node {kept}")
    return 0


if __name__ == "__main__":
    sys.exit(main())