Score the naturalness of the translation using an ngram language model
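The notebook below trains a character-level n-gram language model on an English Bible corpus and scores sentences by how much better they score than scrambled versions of themselves. A minimal usage sketch, assuming the ImprovedNgramLanguageModel class and the english_corpus list defined in the cells below (the candidate sentence is illustrative only):

model = ImprovedNgramLanguageModel(ngram_orders=(1, 2, 3), num_scrambles=10)
model.train(english_corpus)  # character-level n-gram counts over the target-language corpus
scores = model.score_sentence("In the beginning God created the heavens and the earth.")
print(scores['composite'])  # higher composite values indicate more natural target-language text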
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Corpus files already exist. Loading from disk...\n"
]
}
],
"source": [
"from collections import defaultdict, Counter\n",
"import re\n",
"import requests\n",
"import os\n",
"import math\n",
"import numpy as np\n",
"from typing import List, Tuple, Dict, Set\n",
"\n",
"def download_corpus(url, filename):\n",
" if not os.path.exists(filename):\n",
" print(f\"Downloading {filename}...\")\n",
" response = requests.get(url)\n",
" with open(filename, 'w', encoding='utf-8') as f:\n",
" f.write(response.text)\n",
" \n",
" with open(filename, 'r', encoding='utf-8') as f:\n",
" return f.read().split('\\n')\n",
"\n",
"class ImprovedStatisticalGlosser:\n",
" def __init__(self, n=3):\n",
" self.n = n\n",
" self.co_occurrences = defaultdict(lambda: defaultdict(int))\n",
" self.source_counts = defaultdict(int)\n",
" self.target_counts = defaultdict(int)\n",
" self.source_doc_freq = defaultdict(int)\n",
" self.target_doc_freq = defaultdict(int)\n",
" self.total_docs = 0\n",
" self.stop_words: Set[str] = set()\n",
"\n",
" def train(self, source_sentences: List[str], target_sentences: List[str]):\n",
" # Calculate stop words before training\n",
" self.calculate_stop_words(source_sentences + target_sentences)\n",
" \n",
" self.total_docs = len(source_sentences)\n",
" for idx, (source, target) in enumerate(zip(source_sentences, target_sentences)):\n",
" source_tokens = self.tokenize(source)\n",
" target_tokens = self.tokenize(target)\n",
" \n",
" source_ngrams = self.get_ngrams(source_tokens)\n",
" target_ngrams = self.get_ngrams(target_tokens)\n",
" \n",
" source_set = set(source_ngrams)\n",
" target_set = set(target_ngrams)\n",
" \n",
" for s_ngram in source_ngrams:\n",
" for t_ngram in target_ngrams:\n",
" self.co_occurrences[s_ngram][t_ngram] += 1\n",
" self.source_counts[s_ngram] += 1\n",
" \n",
" for t_ngram in target_ngrams:\n",
" self.target_counts[t_ngram] += 1\n",
" \n",
" for s_ngram in source_set:\n",
" self.source_doc_freq[s_ngram] += 1\n",
" for t_ngram in target_set:\n",
" self.target_doc_freq[t_ngram] += 1\n",
"\n",
" def calculate_stop_words(self, sentences: List[str], max_stop_words: int = 100):\n",
" word_counts = Counter(word for sentence in sentences for word in self.tokenize(sentence))\n",
" sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)\n",
" \n",
" # Calculate the elbow point\n",
" x = np.arange(1, len(sorted_words) + 1)\n",
" y = np.array([count for _, count in sorted_words])\n",
" \n",
" # Calculate the angle between consecutive points\n",
" angles = np.diff(np.arctan2(np.diff(y), np.diff(x)))\n",
" elbow_index = np.argmax(angles) + 1\n",
" \n",
" # Use the elbow point or max_stop_words, whichever is smaller\n",
" num_stop_words = min(elbow_index, max_stop_words)\n",
" self.stop_words = set(word for word, _ in sorted_words[:num_stop_words])\n",
"\n",
" def tokenize(self, sentence: str) -> List[str]:\n",
" tokens = re.findall(r'\\w+', sentence.lower())\n",
" return [token for token in tokens if token not in self.stop_words]\n",
"\n",
" def gloss(self, source_sentence, target_sentence):\n",
" source_tokens = self.tokenize(source_sentence)\n",
" target_tokens = self.tokenize(target_sentence)\n",
" \n",
" source_ngrams = self.get_ngrams(source_tokens)\n",
" target_ngrams = self.get_ngrams(target_tokens)\n",
" \n",
" mappings = []\n",
" \n",
" for i, s_ngram in enumerate(source_ngrams):\n",
" ngram_mappings = []\n",
" for j, t_ngram in enumerate(target_ngrams):\n",
" score = self.calculate_score(s_ngram, t_ngram, i, j, len(source_ngrams), len(target_ngrams))\n",
" if score > 0:\n",
" ngram_mappings.append((t_ngram, score))\n",
" \n",
" if not ngram_mappings: # If no n-gram matches, try individual tokens\n",
" s_tokens = s_ngram.split()\n",
" for s_token in s_tokens:\n",
" for j, t_ngram in enumerate(target_ngrams):\n",
" score = self.calculate_score(s_token, t_ngram, i, j, len(source_ngrams), len(target_ngrams))\n",
" if score > 0:\n",
" ngram_mappings.append((t_ngram, score))\n",
" \n",
" ngram_mappings.sort(key=lambda x: x[1], reverse=True)\n",
" mappings.append((s_ngram, ngram_mappings[:3])) # Keep top 3 mappings\n",
" \n",
" return mappings\n",
"\n",
" def calculate_score(self, source_ngram, target_ngram, source_pos, target_pos, source_len, target_len):\n",
" epsilon = 1e-10 # Smoothing factor\n",
" \n",
" co_occur = sum(self.co_occurrences[s_token][t_token] for s_token in source_ngram.split() for t_token in target_ngram.split())\n",
" co_occur += epsilon # Add smoothing\n",
" \n",
" source_count = sum(self.source_counts[s_token] for s_token in source_ngram.split()) + epsilon\n",
" target_count = sum(self.target_counts[t_token] for t_token in target_ngram.split()) + epsilon\n",
" \n",
" source_idf = sum(math.log((self.total_docs + epsilon) / (self.source_doc_freq[s_token] + epsilon)) for s_token in source_ngram.split())\n",
" target_idf = sum(math.log((self.total_docs + epsilon) / (self.target_doc_freq[t_token] + epsilon)) for t_token in target_ngram.split())\n",
" \n",
" tfidf_score = (co_occur / source_count) * source_idf * (co_occur / target_count) * target_idf\n",
" \n",
" position_score = 1 - abs((source_pos / source_len) - (target_pos / target_len))\n",
" \n",
" return tfidf_score * position_score\n",
"\n",
" def reassemble_alignments(self, mappings, source_tokens, target_tokens):\n",
" final_alignments = []\n",
" covered_source = set()\n",
" covered_target = set()\n",
" \n",
" for s_ngram, t_mappings in mappings:\n",
" if not t_mappings:\n",
" continue\n",
" \n",
" best_t_ngram, score = t_mappings[0]\n",
" s_indices = self.find_ngram_indices(source_tokens, s_ngram)\n",
" t_indices = self.find_ngram_indices(target_tokens, best_t_ngram)\n",
" \n",
" if not (set(s_indices) & covered_source) and not (set(t_indices) & covered_target):\n",
" final_alignments.append((s_ngram, best_t_ngram, score))\n",
" covered_source.update(s_indices)\n",
" covered_target.update(t_indices)\n",
" \n",
" # Handle unaligned tokens\n",
" for i, token in enumerate(source_tokens):\n",
" if i not in covered_source:\n",
" final_alignments.append((token, \"\", 0))\n",
" \n",
" final_alignments.sort(key=lambda x: source_tokens.index(x[0].split()[0]))\n",
" return final_alignments\n",
"\n",
" def get_ngrams(self, tokens):\n",
" return [' '.join(tokens[i:i+self.n]) for i in range(len(tokens)-self.n+1)]\n",
"\n",
" @staticmethod\n",
" def find_ngram_indices(tokens, ngram):\n",
" ngram_tokens = ngram.split()\n",
" return list(range(tokens.index(ngram_tokens[0]), tokens.index(ngram_tokens[0]) + len(ngram_tokens)))\n",
"\n",
"# Download the corpora\n",
"if __name__ == \"__main__\":\n",
" # Define corpus URLs and filenames\n",
" french_url = \"https://github.com/BibleNLP/ebible/raw/main/corpus/fra-fraLSG.txt\"\n",
" english_url = \"https://github.com/BibleNLP/ebible/raw/main/corpus/eng-eng-web.txt\"\n",
" french_filename = \"french_corpus.txt\"\n",
" english_filename = \"english_corpus.txt\"\n",
"\n",
" # Check if corpus files already exist\n",
" if os.path.exists(french_filename) and os.path.exists(english_filename):\n",
" print(\"Corpus files already exist. Loading from disk...\")\n",
" with open(french_filename, 'r', encoding='utf-8') as f:\n",
" french_corpus = f.readlines()\n",
" with open(english_filename, 'r', encoding='utf-8') as f:\n",
" english_corpus = f.readlines()\n",
" else:\n",
" print(\"Downloading corpus files...\")\n",
" french_corpus = download_corpus(french_url, french_filename)\n",
" english_corpus = download_corpus(english_url, english_filename)\n",
"\n",
" # Remove empty lines and ensure corpora are aligned\n",
" french_corpus = [line.strip() for line in french_corpus if line.strip()]\n",
" english_corpus = [line.strip() for line in english_corpus if line.strip()]\n",
" min_length = min(len(french_corpus), len(english_corpus))\n",
" french_corpus = french_corpus[:min_length]\n",
" english_corpus = english_corpus[:min_length]\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Average scrambled score: 0.0468\n",
"Average original score: 0.7950\n",
"Average score difference (original - scrambled): 0.7482\n"
]
}
],
"source": [
"import math\n",
"import random\n",
"from collections import defaultdict, Counter\n",
"from typing import List, Tuple, Dict\n",
"\n",
"class ImprovedNgramLanguageModel:\n",
" def __init__(self, ngram_orders=(1, 2, 3), num_scrambles=10, weights=None):\n",
" self.ngram_orders = ngram_orders\n",
" self.ngram_counts = {n: defaultdict(int) for n in ngram_orders}\n",
" self.context_counts = {n: defaultdict(int) for n in ngram_orders}\n",
" self.vocabulary_size = 0\n",
" self.num_scrambles = num_scrambles\n",
" self.corpus_unigram_dist = Counter()\n",
" self.total_chars = 0\n",
" self.corpus = [] # Add this line to store the corpus\n",
" \n",
" # Set default weights or use provided weights\n",
" if weights is None:\n",
" self.weights = {f'{n}-gram': 1/len(ngram_orders) for n in ngram_orders}\n",
" else:\n",
" total_weight = sum(weights.values())\n",
" self.weights = {k: v/total_weight for k, v in weights.items()} # Normalize weights\n",
"\n",
" def train(self, sentences: List[str]):\n",
" self.corpus = sentences # Store the corpus\n",
" for sentence in sentences:\n",
" sentence = sentence.lower()\n",
" self.corpus_unigram_dist.update(sentence)\n",
" self.total_chars += len(sentence)\n",
" for n in self.ngram_orders:\n",
" ngrams = self.get_ngrams(sentence, n)\n",
" for ngram in ngrams:\n",
" self.ngram_counts[n][ngram] += 1\n",
" self.context_counts[n][ngram[:-1]] += 1\n",
" self.vocabulary_size = len(self.corpus_unigram_dist)\n",
"\n",
" def score_sentence(self, sentence: str) -> Dict[str, float]:\n",
" sentence = sentence.lower()\n",
" scores = self._calculate_scores(sentence)\n",
" \n",
" # Special handling for unigrams\n",
" unigram_score = self._calculate_unigram_score(sentence)\n",
" scores['1-gram'] = unigram_score\n",
" \n",
" # Calculate scores for scrambled versions (except for unigrams)\n",
" scrambled_scores = [self._calculate_scores(self._scramble(sentence)) for _ in range(self.num_scrambles)]\n",
" avg_scrambled_scores = {k: sum(s[k] for s in scrambled_scores) / self.num_scrambles for k in scores.keys() if k != '1-gram'}\n",
" \n",
" # Normalize scores\n",
" normalized_scores = {'1-gram': unigram_score}\n",
" for k in scores.keys():\n",
" if k != '1-gram':\n",
" normalized_scores[k] = (scores[k] - avg_scrambled_scores[k]) / abs(avg_scrambled_scores[k]) if avg_scrambled_scores[k] != 0 else 0\n",
" \n",
" # Calculate weighted composite score\n",
" composite_score = sum(normalized_scores[k] * self.weights[k] for k in self.weights.keys())\n",
" \n",
" # Apply length normalization\n",
" length_factor = math.log(len(sentence)) / math.log(20) # 20 is a reference length\n",
" normalized_composite_score = composite_score * length_factor\n",
" \n",
" normalized_scores['composite'] = normalized_composite_score\n",
" \n",
" return normalized_scores\n",
"\n",
" def _calculate_scores(self, sentence: str) -> Dict[str, float]:\n",
" scores = {}\n",
" for n in self.ngram_orders:\n",
" if n == 1:\n",
" continue # Skip unigrams here, they're handled separately\n",
" ngrams = self.get_ngrams(sentence, n)\n",
" score = 0\n",
" for ngram in ngrams:\n",
" ngram_count = self.ngram_counts[n][ngram] + 1 # Add-one smoothing\n",
" context_count = self.context_counts[n][ngram[:-1]] + self.vocabulary_size\n",
" score += math.log(ngram_count / context_count)\n",
" scores[f'{n}-gram'] = score / len(ngrams) if ngrams else 0\n",
" return scores\n",
"\n",
" def _calculate_unigram_score(self, sentence: str) -> float:\n",
" sentence_unigram_dist = Counter(sentence)\n",
" score = 0\n",
" total_freq = sum(self.corpus_unigram_dist.values())\n",
" unknown_char_penalty = -1 # Penalty for each unknown character\n",
" unknown_chars = 0\n",
" \n",
" for char, count in sentence_unigram_dist.items():\n",
" corpus_freq = self.corpus_unigram_dist[char]\n",
" if corpus_freq == 0:\n",
" unknown_chars += count\n",
" continue\n",
" expected_freq = corpus_freq / total_freq\n",
" observed_freq = count / len(sentence)\n",
" score += observed_freq * math.log(observed_freq / expected_freq)\n",
" \n",
" # Apply penalty for unknown characters\n",
" score += unknown_char_penalty * (unknown_chars / len(sentence))\n",
" \n",
" return max(-1, min(1, score)) # Clamp score between -1 and 1\n",
"\n",
" def get_ngrams(self, sentence: str, n: int) -> List[Tuple[str]]:\n",
" return [tuple(sentence[i:i+n]) for i in range(len(sentence)-n+1)]\n",
" \n",
" def _scramble(self, sentence: str) -> str:\n",
" chars = list(sentence)\n",
" random.shuffle(chars)\n",
" return ''.join(chars)\n",
"\n",
" def test_model(self, n: int = 100) -> None:\n",
" # Select n random sentences from the corpus\n",
" test_sentences = random.sample(self.corpus, min(n, len(self.corpus)))\n",
" \n",
" total_diff = 0\n",
" all_scrambled_scores = []\n",
" all_original_scores = []\n",
" for sentence in test_sentences:\n",
" original_score = self.score_sentence(sentence)['composite']\n",
" scrambled_scores = [self.score_sentence(self._scramble(sentence))['composite'] for _ in range(5)]\n",
" avg_scrambled_score = sum(scrambled_scores) / len(scrambled_scores)\n",
" all_scrambled_scores.append(avg_scrambled_score)\n",
" all_original_scores.append(original_score)\n",
" diff = original_score - avg_scrambled_score\n",
" total_diff += diff\n",
"\n",
" avg_scrambled_scores = sum(all_scrambled_scores) / len(all_scrambled_scores)\n",
" avg_original_scores = sum(all_original_scores) / len(all_original_scores)\n",
" avg_diff = total_diff / len(test_sentences)\n",
" print(f\"Average scrambled score: {avg_scrambled_scores:.4f}\")\n",
" print(f\"Average original score: {avg_original_scores:.4f}\")\n",
" print(f\"Average score difference (original - scrambled): {avg_diff:.4f}\")\n",
"\n",
"# Test the improved model with custom weights\n",
"custom_weights = {'1-gram': 0.2, '2-gram': 0.3, '3-gram': 0.5}\n",
"improved_english_model = ImprovedNgramLanguageModel(ngram_orders=(1, 2, 3), num_scrambles=10, weights=custom_weights)\n",
"improved_english_model.train(english_corpus)\n",
"\n",
"# Run the self test\n",
"improved_english_model.test_model(n=200)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}