#!/usr/bin/env python3
"""
Script to detect and optionally remove duplicate topics in JSON graph files.

Uses SHA256 checksums (already computed in node metadata) and other matching
strategies to identify duplicate topics.

Example usage:

    # Report duplicates using exact hash matching
    python tools/dedupe_graph.py --input examples/basic-graph-topics.jsonl

    # Report with case-insensitive matching
    python tools/dedupe_graph.py --input graph.json --strategy case-insensitive

    # Report with fuzzy matching (85% threshold)
    python tools/dedupe_graph.py --input graph.json --strategy fuzzy --threshold 0.85

    # Remove duplicates and save to new file
    python tools/dedupe_graph.py --input graph.json --output deduped.json --mode remove
"""
import argparse
import hashlib
import json
import sys
from collections import defaultdict
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any

# Display constants
_KEY_DISPLAY_LENGTH = 16
_TOPIC_DISPLAY_LENGTH = 60


class DuplicateDetector:
    """Detects duplicate topics in a graph using various strategies."""

    STRATEGY_EXACT = "exact"
    STRATEGY_CASE_INSENSITIVE = "case-insensitive"
    STRATEGY_FUZZY = "fuzzy"

    def __init__(self, strategy: str = STRATEGY_EXACT, threshold: float = 0.9):
        """
        Initialize the duplicate detector.

        Args:
            strategy: Detection strategy - "exact", "case-insensitive", or "fuzzy"
            threshold: Similarity threshold for fuzzy matching (0.0 to 1.0)
        """
        self.strategy = strategy
        self.threshold = threshold

    def find_duplicates(self, graph_data: dict[str, Any]) -> dict[str, list[dict[str, Any]]]:
        """
        Find duplicate topics in the graph.

        Args:
            graph_data: Parsed JSON graph data

        Returns:
            Dictionary mapping hash/key to list of duplicate nodes.
            Only includes groups with 2+ nodes.
        """
        if self.strategy == self.STRATEGY_EXACT:
            return self._exact_hash_match(graph_data)
        if self.strategy == self.STRATEGY_CASE_INSENSITIVE:
            return self._case_insensitive_match(graph_data)
        if self.strategy == self.STRATEGY_FUZZY:
            return self._fuzzy_match(graph_data)
        raise ValueError(f"Unknown strategy: {self.strategy}")

    def _exact_hash_match(self, graph_data: dict[str, Any]) -> dict[str, list[dict[str, Any]]]:
        """Group nodes by their existing SHA256 topic_hash."""
        groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
        nodes = graph_data.get("nodes", {})
        for node_id, node in nodes.items():
            topic_hash = node.get("metadata", {}).get("topic_hash")
            if topic_hash:
                groups[topic_hash].append({"id": node_id, **node})
            else:
                # Compute hash if not present
                topic = node.get("topic", "")
                computed_hash = hashlib.sha256(topic.encode("utf-8")).hexdigest()
                groups[computed_hash].append({"id": node_id, **node})
        # Filter to only groups with duplicates
        return {k: v for k, v in groups.items() if len(v) > 1}

    def _case_insensitive_match(
        self, graph_data: dict[str, Any]
    ) -> dict[str, list[dict[str, Any]]]:
        """Group nodes by case-insensitive normalized topic hash."""
        groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
        nodes = graph_data.get("nodes", {})
        for node_id, node in nodes.items():
            topic = node.get("topic", "")
            normalized = topic.lower().strip()
            normalized_hash = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
            groups[normalized_hash].append({"id": node_id, **node})
        return {k: v for k, v in groups.items() if len(v) > 1}

    def _fuzzy_match(self, graph_data: dict[str, Any]) -> dict[str, list[dict[str, Any]]]:
        """Group nodes by fuzzy similarity using SequenceMatcher."""
        nodes = graph_data.get("nodes", {})
        node_list = [{"id": node_id, **node} for node_id, node in nodes.items()]

        # Track which nodes have been assigned to a group
        assigned: set[str] = set()
        groups: dict[str, list[dict[str, Any]]] = {}
        group_counter = 0

        for i, node_a in enumerate(node_list):
            if node_a["id"] in assigned:
                continue

            topic_a = node_a.get("topic", "")
            current_group = [node_a]
            assigned.add(node_a["id"])

            for node_b in node_list[i + 1 :]:
                if node_b["id"] in assigned:
                    continue
                topic_b = node_b.get("topic", "")
                similarity = SequenceMatcher(None, topic_a, topic_b).ratio()
                if similarity >= self.threshold:
                    current_group.append(node_b)
                    assigned.add(node_b["id"])

            if len(current_group) > 1:
                group_key = f"fuzzy_group_{group_counter}"
                groups[group_key] = current_group
                group_counter += 1

        return groups


class DuplicateRemover:
    """Removes duplicate nodes from a graph while preserving structure."""

    def remove_duplicates(
        self,
        graph_data: dict[str, Any],
        duplicates: dict[str, list[dict[str, Any]]],
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """
        Remove duplicate nodes from the graph.

        For each duplicate group, keeps the node with the lowest ID and
        updates all references to point to the kept node.

        Args:
            graph_data: Original graph data
            duplicates: Duplicate groups from DuplicateDetector

        Returns:
            Tuple of (modified graph data, removal stats)
        """
        # Deep copy to avoid modifying original
        result = json.loads(json.dumps(graph_data))
        nodes = result.get("nodes", {})

        removed_count = 0
        merge_map: dict[str, str] = {}  # Maps removed node ID -> kept node ID

        for _group_key, group_nodes in duplicates.items():
            # Sort by ID (as int) to keep the lowest
            sorted_nodes = sorted(group_nodes, key=lambda n: int(n["id"]))
            keep_node = sorted_nodes[0]
            remove_nodes = sorted_nodes[1:]

            for remove_node in remove_nodes:
                remove_id = str(remove_node["id"])
                keep_id = str(keep_node["id"])
                merge_map[remove_id] = keep_id

                # Merge children from removed node into kept node
                if remove_id in nodes and keep_id in nodes:
                    removed_children = nodes[remove_id].get("children", [])
                    kept_children = nodes[keep_id].get("children", [])
                    # Add children that aren't already present
                    for child_id in removed_children:
                        if child_id not in kept_children:
                            kept_children.append(child_id)
                    nodes[keep_id]["children"] = kept_children

                # Remove the duplicate node
                if remove_id in nodes:
                    del nodes[remove_id]
                    removed_count += 1

        # Update all parent/child references
        self._update_references(nodes, merge_map)

        stats = {
            "removed_count": removed_count,
            "groups_processed": len(duplicates),
            "merge_map": merge_map,
        }
        return result, stats
    def _update_references(self, nodes: dict[str, Any], merge_map: dict[str, str]) -> None:
        """Update parent and child references to point to kept nodes."""
        # Use the dict key as the node ID for self-reference checks: raw graph
        # nodes may not carry an "id" field (the detector only adds it to its
        # own copies), so node.get("id") could be None here.
        for node_id, node in nodes.items():
            # Update children references
            children = node.get("children", [])
            node["children"] = [int(merge_map.get(str(c), str(c))) for c in children]
            # Remove self-references and duplicates
            node["children"] = list({c for c in node["children"] if str(c) != str(node_id)})
            # Update parent references
            parents = node.get("parents", [])
            node["parents"] = [int(merge_map.get(str(p), str(p))) for p in parents]
            # Remove references to removed nodes and self-references
            node["parents"] = list(
                {
                    p
                    for p in node["parents"]
                    if str(p) not in merge_map and str(p) != str(node_id)
                }
            )
def print_report(
    input_file: str,
    strategy: str,
    duplicates: dict[str, list[dict[str, Any]]],
    verbose: bool = False,
) -> None:
    """Print a formatted report of duplicate topics."""
    print("Duplicate Detection Report")
    print("=" * 50)
    print(f"Strategy: {strategy}")
    print(f"Input: {input_file}")
    print()

    if not duplicates:
        print("No duplicates found.")
        return

    total_duplicates = sum(len(group) - 1 for group in duplicates.values())
    print(f"Found {len(duplicates)} duplicate group(s):")
    print()

    for i, (key, group) in enumerate(duplicates.items(), 1):
        display_key = key[:_KEY_DISPLAY_LENGTH] + "..." if len(key) > _KEY_DISPLAY_LENGTH else key
        print(f"Group {i} (key: {display_key}):")
        for node in group:
            topic = node.get("topic", "")
            node_id = node.get("id")
            if verbose:
                # Show full topic
                print(f' - Node {node_id}: "{topic}"')
            else:
                # Truncate long topics
                if len(topic) > _TOPIC_DISPLAY_LENGTH:
                    display_topic = topic[:_TOPIC_DISPLAY_LENGTH] + "..."
                else:
                    display_topic = topic
                print(f' - Node {node_id}: "{display_topic}"')
        print()

    print("-" * 50)
    print(f"Summary: {total_duplicates} duplicate node(s) in {len(duplicates)} group(s)")
def main() -> int:
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Detect and remove duplicate topics in JSON graph files.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --input graph.json
  %(prog)s --input graph.json --strategy case-insensitive
  %(prog)s --input graph.json --strategy fuzzy --threshold 0.85
  %(prog)s --input graph.json --output deduped.json --mode remove
""",
    )
    parser.add_argument(
        "--input",
        "-i",
        required=True,
        help="Input JSON graph file",
    )
    parser.add_argument(
        "--output",
        "-o",
        help="Output file for deduplicated graph (required for remove mode)",
    )
    parser.add_argument(
        "--mode",
        "-m",
        choices=["report", "remove"],
        default="report",
        help="Operation mode: report (default) or remove",
    )
    parser.add_argument(
        "--strategy",
        "-s",
        choices=["exact", "case-insensitive", "fuzzy"],
        default="exact",
        help="Detection strategy (default: exact)",
    )
    parser.add_argument(
        "--threshold",
        "-t",
        type=float,
        default=0.9,
        help="Similarity threshold for fuzzy matching (default: 0.9)",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Show detailed output",
    )
    args = parser.parse_args()

    # Validate arguments
    if args.mode == "remove" and not args.output:
        parser.error("--output is required when using --mode remove")
    if args.threshold < 0.0 or args.threshold > 1.0:
        parser.error("--threshold must be between 0.0 and 1.0")

    # Load input file
    input_path = Path(args.input)
    if not input_path.exists():
        print(f"Error: Input file not found: {args.input}", file=sys.stderr)
        return 1
    try:
        with open(input_path, encoding="utf-8") as f:
            graph_data = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
        return 1

    # Detect duplicates
    detector = DuplicateDetector(strategy=args.strategy, threshold=args.threshold)
    duplicates = detector.find_duplicates(graph_data)

    if args.mode == "report":
        print_report(args.input, args.strategy, duplicates, verbose=args.verbose)
    else:
        # Remove mode
        if not duplicates:
            print("No duplicates found. Output file not created.")
            return 0

        remover = DuplicateRemover()
        deduped_graph, stats = remover.remove_duplicates(graph_data, duplicates)

        # Write output
        output_path = Path(args.output)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(deduped_graph, f, indent=2)

        print(f"Removed {stats['removed_count']} duplicate node(s)")
        print(f"Processed {stats['groups_processed']} duplicate group(s)")
        print(f"Output written to: {args.output}")

        if args.verbose:
            print("\nMerge mapping (removed -> kept):")
            for removed, kept in stats["merge_map"].items():
                print(f" Node {removed} -> Node {kept}")

    return 0


if __name__ == "__main__":
    sys.exit(main())