# Pairwise comparison for personal data

@sergioloppe · Last active September 11, 2023
### Dependencies

Install:

```
pip install mmh3 textdistance
```
```python
pip install mmh3 textdistance
```
```python
import mmh3          # MurmurHash3 hashing; imported but never used in the cells below
import textdistance  # string similarity/distance metrics (Jaro-Winkler, Levenshtein, ...)
```
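`mmh3` never actually gets used in this notebook. One plausible role for it, sketched below purely as an assumption (the blocking key and function names are mine, not the author's), is a blocking step: hash a cheap key per record so the expensive pairwise comparison only runs inside each bucket instead of across the whole dataset.

```python
import mmh3
from collections import defaultdict

def blocking_key(record):
    # Hypothetical blocking key (not in the original gist): first character
    # of each field, lowercased. Likely duplicates usually share it.
    key = "".join(field[0].lower() for field in record if field)
    return mmh3.hash(key)  # signed 32-bit MurmurHash3 of the key string

def bucketize(records):
    # Group records by blocking key; compare() would then only run within
    # each bucket rather than over all pairs of records.
    buckets = defaultdict(list)
    for record in records:
        buckets[blocking_key(record)].append(record)
    return buckets
```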
```python
import mmh3
import textdistance

def jaro_winkler_distance(R1, R2):
    # textdistance.jaro_winkler returns a similarity in [0, 1]
    # (1.0 = identical strings), despite the "distance" in the name used here.
    return textdistance.jaro_winkler(R1, R2)

def levenshtein_distance(R1, R2):
    # Raw edit distance: the number of single-character edits.
    return textdistance.levenshtein(R1, R2)

def levenshtein_probability(R1, R2):
    # Normalize the edit distance into a similarity in [0, 1].
    D = levenshtein_distance(R1, R2)
    max_len = max(len(R1), len(R2))
    P = 1 - D / max_len
    return P

def normalized_weighted_average(values, weights):
    return sum(weights[i] * values[i] for i in range(len(values))) / sum(weights)

def compute_normalized_weighted_average(x, y, distance_function, weights):
    # Per-field scores between two records, combined with per-field weights.
    # (Both "distance" functions above actually return similarities.)
    distances = [distance_function(x[i], y[i]) for i in range(len(x))]
    return normalized_weighted_average(distances, weights)

def compare(x, y, w):
    # Blend the two measures: 60% Jaro-Winkler, 40% normalized Levenshtein.
    jw_s1 = compute_normalized_weighted_average(x, y, jaro_winkler_distance, w)
    lv_s2 = compute_normalized_weighted_average(x, y, levenshtein_probability, w)

    s = normalized_weighted_average([jw_s1, lv_s2], [0.6, 0.4])
    # print(f'{s} <- {jw_s1} {lv_s2}')
    return s

def identify_duplicates(x, Y, w, threshold):
    # Keep every candidate whose blended similarity to x reaches the threshold.
    duplicates = [y for y in Y if compare(x, y, w) >= threshold]
    return duplicates
```
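A quick sanity check of the blended score (my addition, not a cell from the original notebook): identical records must score exactly 1.0, and a single misspelled field should only cost a few points.

```python
w = [0.25, 0.25, 0.1, 0.4]
a = ["John", "Doe", "us", "1990-01-01"]
b = ["Jon", "Doe", "us", "1990-01-01"]

assert abs(compare(a, a, w) - 1.0) < 1e-9  # all per-field similarities are 1.0
assert 0.9 < compare(a, b, w) < 1.0        # one typo in one field lowers the score slightly
```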
```python
# Test against a handful of hand-crafted near-duplicates
x = ["John", "Doe", "us", "1990-01-01"]
Y = [
    ["John", "Doe", "us", "1990-01-01"],
    ["Jon", "Doe", "us", "1990-01-01"],
    ["John", "Dow", "us", "1990-01-01"],
    ["John", "Doe", "ca", "1990-01-01"],
    ["John", "Doe", "us", "1991-01-01"]
]

w = [0.25, 0.25, 0.1, 0.4]
th = 0.97

identify_duplicates(x, Y, w, th)
```

Output:

```
[['John', 'Doe', 'us', '1990-01-01']]
```

Only the exact match clears the 0.97 threshold: working through the formulas above, the "Jon" variant scores roughly 0.965, just under the cutoff.
```python
# Create a random dataset of look-alike records
import random

def generate_dataset(n, x):
    """
    x = ["Johannes", "Dough", "uk", "1990-01-01"]
    """
    first_names = ["John", "Jon", "Jonathan", "Johnny", "Jonny", "Johannes", "Juan", "Joan", "Jean", "Giovanni"]
    last_names = ["Doe", "Dow", "Dough", "Doh", "Do", "Dou", "Doww", "Dowe", "Dohh", "Doughh"]
    nationalities = ["us", "ca", "uk", "au", "de", "fr", "es", "it", "du", "pt"]
    years = [str(year) for year in range(1980, 2000)]

    Y = [x]  # seed the dataset with the known record so at least one true duplicate exists

    for _ in range(n):
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)
        nationality = random.choice(nationalities)
        year = random.choice(years)
        date_of_birth = f"{year}-01-01"
        Y.append([first_name, last_name, nationality, date_of_birth])

    return x, Y

# Generate 10000 records
x = ["Johannes", "Dough", "uk", "1990-01-01"]
x, Y = generate_dataset(10000, x)

print(Y[:5])  # Print the first 5 records
```

Output:

```
[['Johannes', 'Dough', 'uk', '1990-01-01'], ['Giovanni', 'Dough', 'du', '1999-01-01'], ['John', 'Dow', 'fr', '1985-01-01'], ['Johnny', 'Doww', 'fr', '1991-01-01'], ['Giovanni', 'Doughh', 'pt', '1996-01-01']]
```
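Note that `generate_dataset` draws every field with `random.choice`, so each run yields a different `Y`; the listing above is one sample. Seeding the generator first (my addition, not in the original) makes runs reproducible:

```python
import random

random.seed(42)  # any fixed seed makes generate_dataset deterministic across runs
```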
```python
w = [0.25, 0.25, 0.1, 0.4]
th = 0.97

print(x)
identify_duplicates(x, Y, w, th)
```

Output:

```
['Johannes', 'Dough', 'uk', '1990-01-01']
[['Johannes', 'Dough', 'uk', '1990-01-01'],
 ['Johannes', 'Doughh', 'uk', '1990-01-01']]
```

Besides the seeded record itself, the scan surfaces one randomly generated near-clone ("Doughh" vs. "Dough") that clears the threshold.
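The notebook only ever probes one known record `x` against `Y`. Deduplicating the whole dataset is the obvious next step; a naive all-pairs version, sketched here as an assumption rather than anything from the gist, costs O(n²) compares, which is exactly where the mmh3 blocking idea sketched earlier would pay off.

```python
def find_all_duplicate_pairs(Y, w, threshold):
    # Naive full deduplication: compare every unordered pair once.
    # Roughly 50 million compare() calls for 10,000 records, so fine
    # for a demo but painful at real scale without a blocking step.
    pairs = []
    for i in range(len(Y)):
        for j in range(i + 1, len(Y)):
            if compare(Y[i], Y[j], w) >= threshold:
                pairs.append((Y[i], Y[j]))
    return pairs
```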
Kernel: Python 3 (ipykernel), Python 3.9.13.