Kindle Vocabulary Builder to .csv
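A Python script that converts the Kindle Vocabulary Builder database (vocab.db) into a CSV of flashcards for import into Mochi Cards. It locates vocab.db on the mounted Kindle, looks up each new English word against Merriam-Webster, WordsAPI, and the Free Dictionary API in turn, and writes Front/Back rows containing the definition, an example sentence when one is available, the part of speech, and up to three synonyms. A processed_words.json history file keeps runs incremental: only words added since the last run are looked up.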
#!/usr/bin/env python3

import csv
import json
import os
import sqlite3
import sys
import threading
import time
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple

PROCESSED_WORDS_FILE = "processed_words.json"
BACKUP_DIR = "kindle_flashcards_backup"
KINDLE_PATH = "/Volumes/Kindle"
LOG_FILE = "kindle-to-mochi.log"
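

# Calls to the keyless Free Dictionary API go through a shared rate limiter
# that enforces a minimum gap between requests (1.2 s here); the lock keeps
# the bookkeeping safe even if lookups are ever issued from multiple threads.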
class FreeDictionaryRateLimiter:
    def __init__(self, interval_seconds: float) -> None:
        self.interval_seconds = interval_seconds
        self._lock = threading.Lock()
        self._last_call = 0.0

    def acquire(self) -> None:
        with self._lock:
            now = time.monotonic()
            remaining = self.interval_seconds - (now - self._last_call)
            if remaining > 0:
                time.sleep(remaining)
            self._last_call = time.monotonic()


FREE_DICTIONARY_RATE_LIMITER = FreeDictionaryRateLimiter(1.2)


def log_message(message: str) -> None:
    timestamp = datetime.now().isoformat()
    line = f"[{timestamp}] {message}"
    print(line)
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as handle:
            handle.write(f"{line}\n")
    except OSError:
        pass


@dataclass
class DefinitionData:
    definition: str
    example: str
    part_of_speech: str
    synonyms: str
    source: str
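

# The history file is a JSON object keyed by word; each value records when the
# word was processed, which source supplied the definition ("failed" if none
# did), and whether an example sentence was found (see update_processed_word).
# Load failures are logged but never fatal: the script falls back to an empty
# history and simply reprocesses.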
def load_processed_words() -> Tuple[Dict[str, Dict[str, Any]], Optional[str]]:
    if not os.path.exists(PROCESSED_WORDS_FILE):
        return {}, None
    try:
        with open(PROCESSED_WORDS_FILE, "r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (OSError, json.JSONDecodeError) as exc:
        message = f"failed to load processed words: {exc}"
        log_message(message)
        return {}, message
    if not isinstance(data, dict):
        message = "processed words file is invalid"
        log_message(message)
        return {}, message
    return data, None


def save_processed_words(processed_words: Dict[str, Dict[str, Any]]) -> Optional[str]:
    try:
        with open(PROCESSED_WORDS_FILE, "w", encoding="utf-8") as handle:
            json.dump(processed_words, handle, indent=2, ensure_ascii=False)
    except OSError as exc:
        message = f"failed to save processed words: {exc}"
        log_message(message)
        return message
    return None


def fetch_url(
    url: str, headers: Optional[Dict[str, str]] = None
) -> Tuple[Optional[Any], Optional[str]]:
    request = urllib.request.Request(url)
    if headers:
        for key, value in headers.items():
            request.add_header(key, value)
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            if response.status != 200:
                message = f"http status {response.status} for {url}"
                log_message(message)
                return None, message
            raw = response.read().decode()
            return json.loads(raw), None
    except Exception as exc:
        message = f"request error for '{url}': {exc}"
        log_message(message)
        return None, message


def normalize_synonyms(synonyms: Iterable[str]) -> str:
    items = [item.strip() for item in synonyms if item]
    if not items:
        return ""
    return ", ".join(items[:3])


def clean_example(text: str) -> str:
    # Strip Merriam-Webster formatting tokens from example sentences.
    replacements = {
        "{wi}": "",
        "{/wi}": "",
        "{it}": "",
        "{/it}": "",
    }
    cleaned = text
    for old, new in replacements.items():
        cleaned = cleaned.replace(old, new)
    return cleaned


def get_definition_wordsapi(
    word: str, api_key: Optional[str]
) -> Tuple[Optional[DefinitionData], Optional[str]]:
    if not api_key:
        message = "wordsapi key not provided"
        log_message(message)
        return None, message
    url = f"https://wordsapiv1.p.rapidapi.com/words/{urllib.parse.quote(word)}"
    headers = {
        "x-rapidapi-host": "wordsapiv1.p.rapidapi.com",
        "x-rapidapi-key": api_key,
    }
    data, err = fetch_url(url, headers)
    if err:
        message = f"wordsapi: {err}"
        log_message(message)
        return None, message
    if not isinstance(data, dict):
        message = "wordsapi: invalid response structure"
        log_message(message)
        return None, message
    results = data.get("results")
    if not isinstance(results, list) or not results:
        message = "wordsapi: no results"
        log_message(message)
        return None, message
    result = results[0]
    definition = result.get("definition", "")
    if not definition:
        message = "wordsapi: empty definition"
        log_message(message)
        return None, message
    part_of_speech = result.get("partOfSpeech", "")
    synonyms = (
        normalize_synonyms(result.get("synonyms", []))
        if isinstance(result.get("synonyms"), list)
        else ""
    )
    return DefinitionData(definition, "", part_of_speech, synonyms, "WordsAPI"), None


def get_definition_merriam_webster(
    word: str, api_key: Optional[str], dictionary_type: str
) -> Tuple[Optional[DefinitionData], Optional[str]]:
    if not api_key:
        message = f"merriam-webster {dictionary_type}: key not provided"
        log_message(message)
        return None, message
    url = (
        "https://www.dictionaryapi.com/api/v3/references/"
        f"{dictionary_type}/json/{urllib.parse.quote(word)}?key={api_key}"
    )
    data, err = fetch_url(url)
    if err:
        message = f"merriam-webster {dictionary_type}: {err}"
        log_message(message)
        return None, message
    if not isinstance(data, list) or not data or not isinstance(data[0], dict):
        message = f"merriam-webster {dictionary_type}: invalid response"
        log_message(message)
        return None, message
    entry = data[0]
    definition_list = entry.get("shortdef")
    part_of_speech = entry.get("fl", "")
    definition = ""
    if isinstance(definition_list, list) and definition_list:
        definition = definition_list[0]
    example = ""
    # Walk def -> sseq -> sense -> dt looking for the first "vis"
    # (verbal illustration) entry to use as the example sentence.
    for def_section in entry.get("def", []):
        if not isinstance(def_section, dict):
            continue
        sseq = def_section.get("sseq")
        if not isinstance(sseq, list):
            continue
        for sense_seq in sseq:
            for sense_group in sense_seq:
                if len(sense_group) < 2:
                    continue
                sense_data = sense_group[1]
                if not isinstance(sense_data, dict):
                    continue
                for marker in sense_data.get("dt", []):
                    if len(marker) < 2 or marker[0] != "vis":
                        continue
                    visuals = marker[1]
                    if isinstance(visuals, list) and visuals:
                        visual_entry = visuals[0]
                        if isinstance(visual_entry, dict):
                            text = visual_entry.get("t", "")
                            if text:
                                example = clean_example(text)
                                break
                if example:
                    break
            if example:
                break
        if example:
            break
    if not definition and not example:
        message = f"merriam-webster {dictionary_type}: missing definition"
        log_message(message)
        return None, message
    return (
        DefinitionData(
            definition,
            example,
            part_of_speech,
            "",
            f"Merriam-Webster ({dictionary_type})",
        ),
        None,
    )


def get_definition_free_dictionary(
    word: str,
) -> Tuple[Optional[DefinitionData], Optional[str]]:
    FREE_DICTIONARY_RATE_LIMITER.acquire()
    url = f"https://api.dictionaryapi.dev/api/v2/entries/en/{urllib.parse.quote(word)}"
    data, err = fetch_url(url)
    if err:
        message = f"free dictionary: {err}"
        log_message(message)
        return None, message
    if not isinstance(data, list) or not data:
        message = "free dictionary: invalid response"
        log_message(message)
        return None, message
    entry = data[0]
    meanings = entry.get("meanings")
    if not isinstance(meanings, list):
        message = "free dictionary: missing meanings"
        log_message(message)
        return None, message
    best_definition = ""
    best_example = ""
    part_of_speech = ""
    for meaning in meanings:
        if not isinstance(meaning, dict):
            continue
        if not part_of_speech:
            part_of_speech = meaning.get("partOfSpeech", "")
        definitions = meaning.get("definitions")
        if not isinstance(definitions, list):
            continue
        for definition_entry in definitions:
            if not isinstance(definition_entry, dict):
                continue
            definition_text = definition_entry.get("definition", "")
            if definition_text and not best_definition:
                best_definition = definition_text
            example_text = definition_entry.get("example", "")
            if example_text:
                # Prefer the first sense that ships with an example sentence.
                return (
                    DefinitionData(
                        definition_text,
                        example_text,
                        part_of_speech,
                        "",
                        "Free Dictionary",
                    ),
                    None,
                )
    if not best_definition and not best_example:
        message = "free dictionary: definition not found"
        log_message(message)
        return None, message
    return (
        DefinitionData(
            best_definition, best_example, part_of_speech, "", "Free Dictionary"
        ),
        None,
    )
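

# Sources are tried in a fixed order: Merriam-Webster collegiate, then
# learners, then WordsAPI, and finally the keyless Free Dictionary. The first
# one that yields a definition wins; errors from the earlier sources are
# accumulated so the caller can log them.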
def get_definition_multiple_apis(
    word: str,
    wordsapi_key: Optional[str],
    merriam_key: Optional[str],
) -> Tuple[Optional[DefinitionData], List[str]]:
    errors: List[str] = []
    for dictionary_type in ("collegiate", "learners"):
        definition, err = get_definition_merriam_webster(
            word, merriam_key, dictionary_type
        )
        if definition:
            return definition, errors
        if err:
            errors.append(err)
    definition, err = get_definition_wordsapi(word, wordsapi_key)
    if definition:
        return definition, errors
    if err:
        errors.append(err)
    definition, err = get_definition_free_dictionary(word)
    if definition:
        return definition, errors
    if err:
        errors.append(err)
    return None, errors


def ensure_backup_dir() -> Optional[str]:
    if os.path.exists(BACKUP_DIR):
        return None
    try:
        os.makedirs(BACKUP_DIR)
    except OSError as exc:
        return f"failed to create backup directory: {exc}"
    return None
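

# vocab.db is the SQLite database the Kindle maintains for Vocabulary Builder;
# its WORDS table holds one row per looked-up word, with the dictionary stem
# in `stem` and a language code in `lang`.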
def fetch_words_from_kindle(db_path: str) -> Tuple[List[str], int, Optional[str]]:
    try:
        with sqlite3.connect(db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT DISTINCT stem FROM WORDS WHERE lang = 'en' ORDER BY stem"
            )
            rows = cursor.fetchall()
    except sqlite3.Error as exc:
        return [], 0, f"database error: {exc}"
    words = []
    for row in rows:
        word = (row[0] or "").strip().lower()
        if len(word) >= 2:  # skip NULL/empty stems and single letters
            words.append(word)
    return words, len(rows), None


def filter_new_words(
    words: List[str], processed_words: Dict[str, Dict[str, Any]]
) -> List[str]:
    return [word for word in words if word not in processed_words]


def build_flashcard_back(data: DefinitionData) -> str:
    sections: List[str] = []
    if data.definition:
        sections.append(f"**Definition:** {data.definition}")
    if data.example:
        sections.append(f"**Example:** {data.example}")
    if data.part_of_speech:
        sections.append(f"**Type:** {data.part_of_speech}")
    if data.synonyms:
        sections.append(f"**Synonyms:** {data.synonyms}")
    sections.append(f"*Source: {data.source}*")
    return "\n\n".join(sections)


def update_processed_word(
    processed_words: Dict[str, Dict[str, Any]],
    word: str,
    data: Optional[DefinitionData],
    success: bool,
) -> None:
    processed_words[word] = {
        "processed_date": datetime.now().isoformat(),
        "source": data.source if success and data else "failed",
        "has_example": bool(data.example) if success and data else False,
    }
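

# Main pipeline: diff the Kindle word list against the history, look up each
# new word, stream rows into a timestamped CSV, checkpoint the history every
# 25 words, and finally move the finished CSV into the backup directory.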
def extract_new_vocab_to_csv(
    db_path: str, wordsapi_key: Optional[str], merriam_key: Optional[str]
) -> None:
    processed_words, load_err = load_processed_words()
    if load_err:
        log_message(load_err)
    words, total, fetch_err = fetch_words_from_kindle(db_path)
    if fetch_err:
        log_message(fetch_err)
        return
    new_words = filter_new_words(words, processed_words)
    if not new_words:
        log_message("No new words found.")
        log_message(f"Total words on Kindle: {total}")
        log_message(f"Already processed: {len(processed_words)}")
        return
    log_message(f"Total words on Kindle: {total}")
    log_message(f"Already processed: {len(processed_words)}")
    log_message(f"New words to process: {len(new_words)}")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = f"kindle_new_words_{timestamp}.csv"
    dir_err = ensure_backup_dir()
    if dir_err:
        log_message(dir_err)
        return
    processed_count = 0
    examples_count = 0
    failed_words: List[str] = []
    try:
        with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["Front", "Back"])
            writer.writeheader()
            for index, word in enumerate(new_words, start=1):
                log_message(f"Processing: {word} ({index}/{len(new_words)})")
                definition_data, errors = get_definition_multiple_apis(
                    word, wordsapi_key, merriam_key
                )
                if definition_data:
                    writer.writerow(
                        {
                            "Front": word.title(),
                            "Back": build_flashcard_back(definition_data),
                        }
                    )
                    processed_count += 1
                    if definition_data.example:
                        examples_count += 1
                    update_processed_word(processed_words, word, definition_data, True)
                else:
                    if errors:
                        log_message("; ".join(errors))
                    writer.writerow(
                        {"Front": word.title(), "Back": "Definition not found"}
                    )
                    failed_words.append(word)
                    update_processed_word(processed_words, word, None, False)
                time.sleep(0.5)  # stay gentle with the APIs between words
                # Checkpoint the history periodically so an interrupted run
                # does not re-fetch everything.
                if index % 25 == 0:
                    save_err = save_processed_words(processed_words)
                    if save_err:
                        log_message(save_err)
                        return
        log_message(
            f"Processed {processed_count} words, {examples_count} with examples"
        )
    except OSError as exc:
        log_message(f"file error: {exc}")
        return
    save_err = save_processed_words(processed_words)
    if save_err:
        log_message(save_err)
        return
    if processed_count == 0:
        try:
            os.remove(output_file)
        except OSError:
            pass
        log_message("No words were processed successfully.")
        return
    backup_file = os.path.join(BACKUP_DIR, output_file)
    try:
        os.rename(output_file, backup_file)
    except OSError as exc:
        log_message(f"failed to move CSV to backup: {exc}")
        log_message(f"CSV remains at {output_file}")
        return
    log_message("Processing complete.")
    log_message(f"CSV file: {backup_file}")
    log_message(f"Processed words: {processed_count}")
    log_message(
        f"With examples: {examples_count} ({examples_count / processed_count * 100:.1f}%)"
    )
    if failed_words:
        log_message(f"Not found: {len(failed_words)}")
    log_message(f"Import '{backup_file}' into Mochi Cards.")


def show_stats() -> None:
    processed_words, err = load_processed_words()
    if err:
        log_message(err)
        return
    if not processed_words:
        log_message("No words processed yet.")
        return
    total = len(processed_words)
    with_examples = sum(
        1 for word in processed_words.values() if word.get("has_example")
    )
    failed = sum(
        1 for word in processed_words.values() if word.get("source") == "failed"
    )
    log_message("Statistics")
    log_message("=" * 30)
    log_message(f"Total processed: {total}")
    log_message(f"With examples: {with_examples} ({with_examples / total * 100:.1f}%)")
    log_message(f"Failed: {failed}")
    log_message(f"Successful: {total - failed}")
    log_message("Sources:")
    sources: Dict[str, int] = {}
    for data in processed_words.values():
        source = data.get("source", "unknown")
        sources[source] = sources.get(source, 0) + 1
    for source, count in sources.items():
        log_message(f"  {source}: {count}")


def reset_history() -> None:
    if not os.path.exists(PROCESSED_WORDS_FILE):
        log_message("No history to clear.")
        return
    try:
        os.remove(PROCESSED_WORDS_FILE)
    except OSError as exc:
        log_message(f"failed to remove history: {exc}")
        return
    log_message("History cleared.")


def prompt_api_key(label: str, env_var: str) -> Optional[str]:
    env_value = os.getenv(env_var)
    if env_value:
        return env_value
    entered = input(f"Enter {label} (leave blank to skip): ").strip()
    return entered or None


def locate_vocab_db(root_path: str) -> Optional[str]:
    for root, _, files in os.walk(root_path):
        if "vocab.db" in files:
            return os.path.join(root, "vocab.db")
    return None


def main() -> None:
    log_message("Kindle Flashcards - Incremental Mode")
    log_message("=" * 50)
    log_message("Options:")
    log_message("1. Process new words")
    log_message("2. View statistics")
    log_message("3. Reset history")
    choice = input("Choose an option (1-3): ").strip()
    if choice == "2":
        show_stats()
        sys.exit(0)
    if choice == "3":
        confirmation = (
            input("This will delete the history. Continue? (y/N): ").strip().lower()
        )
        if confirmation == "y":
            reset_history()
        else:
            log_message("Operation cancelled.")
        sys.exit(0)
    wordsapi_key = prompt_api_key("WordsAPI key", "WORDSAPI_KEY")
    merriam_key = prompt_api_key("Merriam-Webster key", "MERRIAM_WEBSTER_KEY")
    if not wordsapi_key and not merriam_key:
        log_message("No premium keys provided. Using Free Dictionary only.")
    log_message(f"Searching for vocab.db in {KINDLE_PATH}...")
    vocab_db_path = locate_vocab_db(KINDLE_PATH)
    if not vocab_db_path:
        log_message(
            "vocab.db not found. Ensure the Kindle is mounted at /Volumes/Kindle."
        )
        sys.exit(1)
    log_message(f"Found: {vocab_db_path}")
    log_message("Starting incremental processing...")
    extract_new_vocab_to_csv(vocab_db_path, wordsapi_key, merriam_key)


if __name__ == "__main__":
    main()
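
Usage: run the script with the Kindle mounted at /Volumes/Kindle and choose option 1. Setting the WORDSAPI_KEY and/or MERRIAM_WEBSTER_KEY environment variables skips the interactive key prompts; with neither set, lookups use only the Free Dictionary API. Each row of the resulting CSV has the word in the Front column and a Markdown card body in the Back column, along the lines of this illustrative, made-up example:

**Definition:** lasting for a very short time

**Example:** fashions are ephemeral

**Type:** adjective

*Source: Merriam-Webster (collegiate)*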