Last active
April 10, 2025 16:23
-
-
Save goabonga/925271ca00c8dccd7f8cbd3a80ccbc22 to your computer and use it in GitHub Desktop.
chromadb & ollama & embeddings & cities1000
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import zipfile | |
| import httpx | |
| import asyncio | |
| import chromadb | |
| from tqdm import tqdm | |
# Directory handed to the persistent Chroma client as its storage path.
CHROMA_DIR = "./chroma"
# Directory the downloaded archive is extracted into.
EXTRACT_DIR = "./data"
# Local file name for the downloaded archive, and the URL it is fetched from.
ZIP_PATH = "cities1000.zip"
ZIP_URL = "https://download.geonames.org/export/dump/cities1000.zip"

# Shared async HTTP client used for the embedding requests (created with timeout=None).
client_http: httpx.AsyncClient = httpx.AsyncClient(timeout=None)
# Persistent Chroma client and the collection every entry is added to.
client = chromadb.PersistentClient(path=CHROMA_DIR)
collection = client.get_or_create_collection(name="city_timezone")
# NOTE(review): this semaphore is created at import time, before asyncio.run()
# starts an event loop; on Python < 3.10 asyncio primitives bind to a loop at
# construction, so confirm the target Python version.
semaphore = asyncio.Semaphore(10)  # Limit to 10 simultaneous requests
async def get_embedding(text: str) -> list[float]:
    """Request an embedding for *text* from the local embeddings endpoint.

    Sends *text* as the ``prompt`` of a ``nomic-embed-text`` request to
    ``http://localhost:11434/api/embeddings`` through the shared
    ``client_http`` client.

    Args:
        text: The text to embed.

    Returns:
        The value stored under the ``"embedding"`` key of the JSON response.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with an error status.
        KeyError: If the JSON response has no ``"embedding"`` key.
    """
    payload = {"model": "nomic-embed-text", "prompt": text}
    reply = await client_http.post(
        "http://localhost:11434/api/embeddings",
        json=payload,
    )
    reply.raise_for_status()
    body = reply.json()
    return body["embedding"]
async def ingest_entry(id: str, content: str, metadata: dict, bar: tqdm) -> None:
    """Embed one document and add it to the module-level collection.

    The work runs while holding the module-level ``semaphore``, which caps
    how many entries are processed at the same time.

    Args:
        id: Identifier under which the entry is stored.
        content: Document text; this is what gets embedded.
        metadata: Metadata stored alongside the document.
        bar: Progress bar advanced by one once the entry has been added.
    """
    async with semaphore:
        vector = await get_embedding(content)
        record = {
            "ids": [id],
            "documents": [content],
            "metadatas": [metadata],
            "embeddings": [vector],
        }
        collection.add(**record)
        bar.update(1)
def download_and_extract_cities(url: str, zip_path: str, extract_to: str) -> str:
    """Download a zip archive, unpack it, and return the cities file path.

    Args:
        url: Location the archive is streamed from.
        zip_path: Local path the downloaded archive is written to.
        extract_to: Directory the archive contents are extracted into.

    Returns:
        ``extract_to`` joined with the fixed file name ``cities1000.txt``.

    Raises:
        httpx.HTTPStatusError: If the download answers with an error status.
    """
    with httpx.stream("GET", url) as download:
        # Check the status before creating the local file.
        download.raise_for_status()
        with open(zip_path, "wb") as archive_file:
            archive_file.writelines(download.iter_bytes())

    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(extract_to)

    cities_file = os.path.join(extract_to, "cities1000.txt")
    return cities_file
async def parse_and_push_to_chromadb(file_path: str) -> None:
    """Parse a tab-separated cities file and ingest every usable row.

    The file is read twice: once to count lines so the progress bar has a
    total, and once to build one ``ingest_entry`` coroutine per row. All
    coroutines are then awaited together with ``asyncio.gather``.

    Rows with fewer than 18 tab-separated columns are skipped, but still
    advance the progress bar so it can reach its total.

    Args:
        file_path: Path to the UTF-8 encoded, tab-separated cities file.

    Raises:
        ValueError: If a latitude or longitude column cannot be parsed as a
            float.
    """
    # Count lines inside a ``with`` block so the file handle is closed right
    # away instead of being left open until garbage collection.
    with open(file_path, "r", encoding="utf-8") as f:
        total_lines = sum(1 for _ in f)

    tasks = []
    with tqdm(total=total_lines, desc="Indexing cities") as bar:
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                parts = line.strip().split("\t")
                if len(parts) < 18:
                    # Too few columns to reach the timezone field (index 17):
                    # count the row as processed and move on.
                    bar.update(1)
                    continue

                geonameid = parts[0]
                name = parts[1]
                country_code = parts[8]
                latitude = float(parts[4])
                longitude = float(parts[5])
                timezone = parts[17]

                # The embedded document is just "<name>, <country code>";
                # everything else travels as metadata.
                content = f"{name}, {country_code}"
                metadata = {
                    "city": name,
                    "country": country_code,
                    "latitude": latitude,
                    "longitude": longitude,
                    "timezone": timezone,
                }
                tasks.append(ingest_entry(geonameid, content, metadata, bar))

        # Await while the progress bar is still open: each entry advances it.
        await asyncio.gather(*tasks)
async def main() -> None:
    """Run the pipeline: prepare directories, fetch the data, index it.

    Creates the extraction and Chroma directories if they are missing,
    downloads and unpacks the cities archive, then pushes its rows into the
    collection.
    """
    for directory in (EXTRACT_DIR, CHROMA_DIR):
        os.makedirs(directory, exist_ok=True)

    cities_file = download_and_extract_cities(ZIP_URL, ZIP_PATH, EXTRACT_DIR)
    await parse_and_push_to_chromadb(cities_file)
# Run the download-and-index pipeline only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment