Created July 23, 2024 22:33
Score the naturalness of a translation using a character n-gram language model.
In [1]:
| "from collections import defaultdict, Counter\n", | |
| "import re\n", | |
| "import requests\n", | |
| "import os\n", | |
| "import math\n", | |
| "import numpy as np\n", | |
| "from typing import List, Tuple, Dict, Set\n", | |
| "\n", | |
| "def download_corpus(url, filename):\n", | |
| " if not os.path.exists(filename):\n", | |
| " print(f\"Downloading {filename}...\")\n", | |
| " response = requests.get(url)\n", | |
| " with open(filename, 'w', encoding='utf-8') as f:\n", | |
| " f.write(response.text)\n", | |
| " \n", | |
| " with open(filename, 'r', encoding='utf-8') as f:\n", | |
| " return f.read().split('\\n')\n", | |
| "\n", | |
| "class ImprovedStatisticalGlosser:\n", | |
| " def __init__(self, n=3):\n", | |
| " self.n = n\n", | |
| " self.co_occurrences = defaultdict(lambda: defaultdict(int))\n", | |
| " self.source_counts = defaultdict(int)\n", | |
| " self.target_counts = defaultdict(int)\n", | |
| " self.source_doc_freq = defaultdict(int)\n", | |
| " self.target_doc_freq = defaultdict(int)\n", | |
| " self.total_docs = 0\n", | |
| " self.stop_words: Set[str] = set()\n", | |
| "\n", | |
| " def train(self, source_sentences: List[str], target_sentences: List[str]):\n", | |
| " # Calculate stop words before training\n", | |
| " self.calculate_stop_words(source_sentences + target_sentences)\n", | |
| " \n", | |
| " self.total_docs = len(source_sentences)\n", | |
| " for idx, (source, target) in enumerate(zip(source_sentences, target_sentences)):\n", | |
| " source_tokens = self.tokenize(source)\n", | |
| " target_tokens = self.tokenize(target)\n", | |
| " \n", | |
| " source_ngrams = self.get_ngrams(source_tokens)\n", | |
| " target_ngrams = self.get_ngrams(target_tokens)\n", | |
| " \n", | |
| " source_set = set(source_ngrams)\n", | |
| " target_set = set(target_ngrams)\n", | |
| " \n", | |
| " for s_ngram in source_ngrams:\n", | |
| " for t_ngram in target_ngrams:\n", | |
| " self.co_occurrences[s_ngram][t_ngram] += 1\n", | |
| " self.source_counts[s_ngram] += 1\n", | |
| " \n", | |
| " for t_ngram in target_ngrams:\n", | |
| " self.target_counts[t_ngram] += 1\n", | |
| " \n", | |
| " for s_ngram in source_set:\n", | |
| " self.source_doc_freq[s_ngram] += 1\n", | |
| " for t_ngram in target_set:\n", | |
| " self.target_doc_freq[t_ngram] += 1\n", | |
| "\n", | |
| " def calculate_stop_words(self, sentences: List[str], max_stop_words: int = 100):\n", | |
| " word_counts = Counter(word for sentence in sentences for word in self.tokenize(sentence))\n", | |
| " sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)\n", | |
| " \n", | |
| " # Calculate the elbow point\n", | |
| " x = np.arange(1, len(sorted_words) + 1)\n", | |
| " y = np.array([count for _, count in sorted_words])\n", | |
| " \n", | |
| " # Calculate the angle between consecutive points\n", | |
| " angles = np.diff(np.arctan2(np.diff(y), np.diff(x)))\n", | |
| " elbow_index = np.argmax(angles) + 1\n", | |
| " \n", | |
| " # Use the elbow point or max_stop_words, whichever is smaller\n", | |
| " num_stop_words = min(elbow_index, max_stop_words)\n", | |
| " self.stop_words = set(word for word, _ in sorted_words[:num_stop_words])\n", | |
| "\n", | |
| " def tokenize(self, sentence: str) -> List[str]:\n", | |
| " tokens = re.findall(r'\\w+', sentence.lower())\n", | |
| " return [token for token in tokens if token not in self.stop_words]\n", | |
| "\n", | |
| " def gloss(self, source_sentence, target_sentence):\n", | |
| " source_tokens = self.tokenize(source_sentence)\n", | |
| " target_tokens = self.tokenize(target_sentence)\n", | |
| " \n", | |
| " source_ngrams = self.get_ngrams(source_tokens)\n", | |
| " target_ngrams = self.get_ngrams(target_tokens)\n", | |
| " \n", | |
| " mappings = []\n", | |
| " \n", | |
| " for i, s_ngram in enumerate(source_ngrams):\n", | |
| " ngram_mappings = []\n", | |
| " for j, t_ngram in enumerate(target_ngrams):\n", | |
| " score = self.calculate_score(s_ngram, t_ngram, i, j, len(source_ngrams), len(target_ngrams))\n", | |
| " if score > 0:\n", | |
| " ngram_mappings.append((t_ngram, score))\n", | |
| " \n", | |
| " if not ngram_mappings: # If no n-gram matches, try individual tokens\n", | |
| " s_tokens = s_ngram.split()\n", | |
| " for s_token in s_tokens:\n", | |
| " for j, t_ngram in enumerate(target_ngrams):\n", | |
| " score = self.calculate_score(s_token, t_ngram, i, j, len(source_ngrams), len(target_ngrams))\n", | |
| " if score > 0:\n", | |
| " ngram_mappings.append((t_ngram, score))\n", | |
| " \n", | |
| " ngram_mappings.sort(key=lambda x: x[1], reverse=True)\n", | |
| " mappings.append((s_ngram, ngram_mappings[:3])) # Keep top 3 mappings\n", | |
| " \n", | |
| " return mappings\n", | |
| "\n", | |
| " def calculate_score(self, source_ngram, target_ngram, source_pos, target_pos, source_len, target_len):\n", | |
| " epsilon = 1e-10 # Smoothing factor\n", | |
| " \n", | |
| " co_occur = sum(self.co_occurrences[s_token][t_token] for s_token in source_ngram.split() for t_token in target_ngram.split())\n", | |
| " co_occur += epsilon # Add smoothing\n", | |
| " \n", | |
| " source_count = sum(self.source_counts[s_token] for s_token in source_ngram.split()) + epsilon\n", | |
| " target_count = sum(self.target_counts[t_token] for t_token in target_ngram.split()) + epsilon\n", | |
| " \n", | |
| " source_idf = sum(math.log((self.total_docs + epsilon) / (self.source_doc_freq[s_token] + epsilon)) for s_token in source_ngram.split())\n", | |
| " target_idf = sum(math.log((self.total_docs + epsilon) / (self.target_doc_freq[t_token] + epsilon)) for t_token in target_ngram.split())\n", | |
| " \n", | |
| " tfidf_score = (co_occur / source_count) * source_idf * (co_occur / target_count) * target_idf\n", | |
| " \n", | |
| " position_score = 1 - abs((source_pos / source_len) - (target_pos / target_len))\n", | |
| " \n", | |
| " return tfidf_score * position_score\n", | |
| "\n", | |
| " def reassemble_alignments(self, mappings, source_tokens, target_tokens):\n", | |
| " final_alignments = []\n", | |
| " covered_source = set()\n", | |
| " covered_target = set()\n", | |
| " \n", | |
| " for s_ngram, t_mappings in mappings:\n", | |
| " if not t_mappings:\n", | |
| " continue\n", | |
| " \n", | |
| " best_t_ngram, score = t_mappings[0]\n", | |
| " s_indices = self.find_ngram_indices(source_tokens, s_ngram)\n", | |
| " t_indices = self.find_ngram_indices(target_tokens, best_t_ngram)\n", | |
| " \n", | |
| " if not (set(s_indices) & covered_source) and not (set(t_indices) & covered_target):\n", | |
| " final_alignments.append((s_ngram, best_t_ngram, score))\n", | |
| " covered_source.update(s_indices)\n", | |
| " covered_target.update(t_indices)\n", | |
| " \n", | |
| " # Handle unaligned tokens\n", | |
| " for i, token in enumerate(source_tokens):\n", | |
| " if i not in covered_source:\n", | |
| " final_alignments.append((token, \"\", 0))\n", | |
| " \n", | |
| " final_alignments.sort(key=lambda x: source_tokens.index(x[0].split()[0]))\n", | |
| " return final_alignments\n", | |
| "\n", | |
| " def get_ngrams(self, tokens):\n", | |
| " return [' '.join(tokens[i:i+self.n]) for i in range(len(tokens)-self.n+1)]\n", | |
| "\n", | |
| " @staticmethod\n", | |
| " def find_ngram_indices(tokens, ngram):\n", | |
| " ngram_tokens = ngram.split()\n", | |
| " return list(range(tokens.index(ngram_tokens[0]), tokens.index(ngram_tokens[0]) + len(ngram_tokens)))\n", | |
| "\n", | |
| "# Download the corpora\n", | |
| "if __name__ == \"__main__\":\n", | |
| " # Define corpus URLs and filenames\n", | |
| " french_url = \"https://github.com/BibleNLP/ebible/raw/main/corpus/fra-fraLSG.txt\"\n", | |
| " english_url = \"https://github.com/BibleNLP/ebible/raw/main/corpus/eng-eng-web.txt\"\n", | |
| " french_filename = \"french_corpus.txt\"\n", | |
| " english_filename = \"english_corpus.txt\"\n", | |
| "\n", | |
| " # Check if corpus files already exist\n", | |
| " if os.path.exists(french_filename) and os.path.exists(english_filename):\n", | |
| " print(\"Corpus files already exist. Loading from disk...\")\n", | |
| " with open(french_filename, 'r', encoding='utf-8') as f:\n", | |
| " french_corpus = f.readlines()\n", | |
| " with open(english_filename, 'r', encoding='utf-8') as f:\n", | |
| " english_corpus = f.readlines()\n", | |
| " else:\n", | |
| " print(\"Downloading corpus files...\")\n", | |
| " french_corpus = download_corpus(french_url, french_filename)\n", | |
| " english_corpus = download_corpus(english_url, english_filename)\n", | |
| "\n", | |
| " # Remove empty lines and ensure corpora are aligned\n", | |
| " french_corpus = [line.strip() for line in french_corpus if line.strip()]\n", | |
| " english_corpus = [line.strip() for line in english_corpus if line.strip()]\n", | |
| " min_length = min(len(french_corpus), len(english_corpus))\n", | |
| " french_corpus = french_corpus[:min_length]\n", | |
| " english_corpus = english_corpus[:min_length]\n" | |
Output:
Corpus files already exist. Loading from disk...
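The notebook defines the glosser and prepares the aligned corpora but never actually instantiates it. A minimal usage sketch follows; the bigram order and the choice of the first verse pair are arbitrary illustrations, not part of the original notebook, and training is quadratic per verse pair, so it is slow on the full corpora.

# Not part of the original notebook: a rough sketch of using the glosser on the
# corpora prepared above.
glosser = ImprovedStatisticalGlosser(n=2)
glosser.train(french_corpus, english_corpus)

# Gloss one aligned verse pair (the first pair is an arbitrary example).
fr, en = french_corpus[0], english_corpus[0]
mappings = glosser.gloss(fr, en)

# Reduce the candidate mappings to non-overlapping alignments and print them.
alignments = glosser.reassemble_alignments(
    mappings, glosser.tokenize(fr), glosser.tokenize(en))
for source_ngram, target_ngram, score in alignments:
    print(f"{source_ngram!r} -> {target_ngram!r} ({score:.3f})")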
In [2]:
| "import math\n", | |
| "import random\n", | |
| "from collections import defaultdict, Counter\n", | |
| "from typing import List, Tuple, Dict\n", | |
| "\n", | |
| "class ImprovedNgramLanguageModel:\n", | |
| " def __init__(self, ngram_orders=(1, 2, 3), num_scrambles=10, weights=None):\n", | |
| " self.ngram_orders = ngram_orders\n", | |
| " self.ngram_counts = {n: defaultdict(int) for n in ngram_orders}\n", | |
| " self.context_counts = {n: defaultdict(int) for n in ngram_orders}\n", | |
| " self.vocabulary_size = 0\n", | |
| " self.num_scrambles = num_scrambles\n", | |
| " self.corpus_unigram_dist = Counter()\n", | |
| " self.total_chars = 0\n", | |
| " self.corpus = [] # Add this line to store the corpus\n", | |
| " \n", | |
| " # Set default weights or use provided weights\n", | |
| " if weights is None:\n", | |
| " self.weights = {f'{n}-gram': 1/len(ngram_orders) for n in ngram_orders}\n", | |
| " else:\n", | |
| " total_weight = sum(weights.values())\n", | |
| " self.weights = {k: v/total_weight for k, v in weights.items()} # Normalize weights\n", | |
| "\n", | |
| " def train(self, sentences: List[str]):\n", | |
| " self.corpus = sentences # Store the corpus\n", | |
| " for sentence in sentences:\n", | |
| " sentence = sentence.lower()\n", | |
| " self.corpus_unigram_dist.update(sentence)\n", | |
| " self.total_chars += len(sentence)\n", | |
| " for n in self.ngram_orders:\n", | |
| " ngrams = self.get_ngrams(sentence, n)\n", | |
| " for ngram in ngrams:\n", | |
| " self.ngram_counts[n][ngram] += 1\n", | |
| " self.context_counts[n][ngram[:-1]] += 1\n", | |
| " self.vocabulary_size = len(self.corpus_unigram_dist)\n", | |
| "\n", | |
| " def score_sentence(self, sentence: str) -> Dict[str, float]:\n", | |
| " sentence = sentence.lower()\n", | |
| " scores = self._calculate_scores(sentence)\n", | |
| " \n", | |
| " # Special handling for unigrams\n", | |
| " unigram_score = self._calculate_unigram_score(sentence)\n", | |
| " scores['1-gram'] = unigram_score\n", | |
| " \n", | |
| " # Calculate scores for scrambled versions (except for unigrams)\n", | |
| " scrambled_scores = [self._calculate_scores(self._scramble(sentence)) for _ in range(self.num_scrambles)]\n", | |
| " avg_scrambled_scores = {k: sum(s[k] for s in scrambled_scores) / self.num_scrambles for k in scores.keys() if k != '1-gram'}\n", | |
| " \n", | |
| " # Normalize scores\n", | |
| " normalized_scores = {'1-gram': unigram_score}\n", | |
| " for k in scores.keys():\n", | |
| " if k != '1-gram':\n", | |
| " normalized_scores[k] = (scores[k] - avg_scrambled_scores[k]) / abs(avg_scrambled_scores[k]) if avg_scrambled_scores[k] != 0 else 0\n", | |
| " \n", | |
| " # Calculate weighted composite score\n", | |
| " composite_score = sum(normalized_scores[k] * self.weights[k] for k in self.weights.keys())\n", | |
| " \n", | |
| " # Apply length normalization\n", | |
| " length_factor = math.log(len(sentence)) / math.log(20) # 20 is a reference length\n", | |
| " normalized_composite_score = composite_score * length_factor\n", | |
| " \n", | |
| " normalized_scores['composite'] = normalized_composite_score\n", | |
| " \n", | |
| " return normalized_scores\n", | |
| "\n", | |
| " def _calculate_scores(self, sentence: str) -> Dict[str, float]:\n", | |
| " scores = {}\n", | |
| " for n in self.ngram_orders:\n", | |
| " if n == 1:\n", | |
| " continue # Skip unigrams here, they're handled separately\n", | |
| " ngrams = self.get_ngrams(sentence, n)\n", | |
| " score = 0\n", | |
| " for ngram in ngrams:\n", | |
| " ngram_count = self.ngram_counts[n][ngram] + 1 # Add-one smoothing\n", | |
| " context_count = self.context_counts[n][ngram[:-1]] + self.vocabulary_size\n", | |
| " score += math.log(ngram_count / context_count)\n", | |
| " scores[f'{n}-gram'] = score / len(ngrams) if ngrams else 0\n", | |
| " return scores\n", | |
| "\n", | |
| " def _calculate_unigram_score(self, sentence: str) -> float:\n", | |
| " sentence_unigram_dist = Counter(sentence)\n", | |
| " score = 0\n", | |
| " total_freq = sum(self.corpus_unigram_dist.values())\n", | |
| " unknown_char_penalty = -1 # Penalty for each unknown character\n", | |
| " unknown_chars = 0\n", | |
| " \n", | |
| " for char, count in sentence_unigram_dist.items():\n", | |
| " corpus_freq = self.corpus_unigram_dist[char]\n", | |
| " if corpus_freq == 0:\n", | |
| " unknown_chars += count\n", | |
| " continue\n", | |
| " expected_freq = corpus_freq / total_freq\n", | |
| " observed_freq = count / len(sentence)\n", | |
| " score += observed_freq * math.log(observed_freq / expected_freq)\n", | |
| " \n", | |
| " # Apply penalty for unknown characters\n", | |
| " score += unknown_char_penalty * (unknown_chars / len(sentence))\n", | |
| " \n", | |
| " return max(-1, min(1, score)) # Clamp score between -1 and 1\n", | |
| "\n", | |
| " def get_ngrams(self, sentence: str, n: int) -> List[Tuple[str]]:\n", | |
| " return [tuple(sentence[i:i+n]) for i in range(len(sentence)-n+1)]\n", | |
| " \n", | |
| " def _scramble(self, sentence: str) -> str:\n", | |
| " chars = list(sentence)\n", | |
| " random.shuffle(chars)\n", | |
| " return ''.join(chars)\n", | |
| "\n", | |
| " def test_model(self, n: int = 100) -> None:\n", | |
| " # Select n random sentences from the corpus\n", | |
| " test_sentences = random.sample(self.corpus, min(n, len(self.corpus)))\n", | |
| " \n", | |
| " total_diff = 0\n", | |
| " all_scrambled_scores = []\n", | |
| " all_original_scores = []\n", | |
| " for sentence in test_sentences:\n", | |
| " original_score = self.score_sentence(sentence)['composite']\n", | |
| " scrambled_scores = [self.score_sentence(self._scramble(sentence))['composite'] for _ in range(5)]\n", | |
| " avg_scrambled_score = sum(scrambled_scores) / len(scrambled_scores)\n", | |
| " all_scrambled_scores.append(avg_scrambled_score)\n", | |
| " all_original_scores.append(original_score)\n", | |
| " diff = original_score - avg_scrambled_score\n", | |
| " total_diff += diff\n", | |
| "\n", | |
| " avg_scrambled_scores = sum(all_scrambled_scores) / len(all_scrambled_scores)\n", | |
| " avg_original_scores = sum(all_original_scores) / len(all_original_scores)\n", | |
| " avg_diff = total_diff / len(test_sentences)\n", | |
| " print(f\"Average scrambled score: {avg_scrambled_scores:.4f}\")\n", | |
| " print(f\"Average original score: {avg_original_scores:.4f}\")\n", | |
| " print(f\"Average score difference (original - scrambled): {avg_diff:.4f}\")\n", | |
| "\n", | |
| "# Test the improved model with custom weights\n", | |
| "custom_weights = {'1-gram': 0.2, '2-gram': 0.3, '3-gram': 0.5}\n", | |
| "improved_english_model = ImprovedNgramLanguageModel(ngram_orders=(1, 2, 3), num_scrambles=10, weights=custom_weights)\n", | |
| "improved_english_model.train(english_corpus)\n", | |
| "\n", | |
| "# Run the self test\n", | |
| "improved_english_model.test_model(n=200)" | |
Output:
Average scrambled score: 0.0468
Average original score: 0.7950
Average score difference (original - scrambled): 0.7482

Notebook metadata: Python 3 kernel, Python 3.12.4, nbformat 4.
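The self test only contrasts corpus verses with scrambled copies of themselves. To score the naturalness of an actual candidate translation, as the gist's description suggests, one might call score_sentence directly. A minimal sketch, assuming the improved_english_model trained above; the two candidate sentences are made up for illustration:

# Not part of the original notebook: score two illustrative candidate sentences
# with the model trained in the cell above.
fluent = "In the beginning God created the heavens and the earth."
garbled = "Beginning the in earth heavens God the and created."

for label, sentence in [("fluent", fluent), ("garbled", garbled)]:
    scores = improved_english_model.score_sentence(sentence)
    print(f"{label}: composite={scores['composite']:.4f}, "
          f"2-gram={scores['2-gram']:.4f}, 3-gram={scores['3-gram']:.4f}")

# Higher composite scores indicate text that looks more like the training
# corpus; scrambled or unnatural text should score near or below zero.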