Skip to content

Instantly share code, notes, and snippets.

@goabonga
Last active April 10, 2025 16:23
Show Gist options
  • Select an option

  • Save goabonga/925271ca00c8dccd7f8cbd3a80ccbc22 to your computer and use it in GitHub Desktop.

Select an option

Save goabonga/925271ca00c8dccd7f8cbd3a80ccbc22 to your computer and use it in GitHub Desktop.
chromadb & ollama & embeddings & cities1000
import os
import zipfile
import httpx
import asyncio
import chromadb
from tqdm import tqdm
# Local persistence / working paths.
CHROMA_DIR = "./chroma"
EXTRACT_DIR = "./data"
ZIP_PATH = "cities1000.zip"
# GeoNames dump of every city with population > 1000.
ZIP_URL = "https://download.geonames.org/export/dump/cities1000.zip"
# Shared async HTTP client for the local Ollama embedding API; timeout=None
# so long-running embedding calls are never cut off.
client_http: httpx.AsyncClient = httpx.AsyncClient(timeout=None)
# Persistent Chroma store and the city -> timezone collection.
client = chromadb.PersistentClient(path=CHROMA_DIR)
collection = client.get_or_create_collection(name="city_timezone")
semaphore = asyncio.Semaphore(10) # To limit to 10 simultaneous requests
async def get_embedding(text: str) -> list[float]:
    """Return the embedding vector for *text* from the local Ollama API.

    Raises ``httpx.HTTPStatusError`` when the embedding service answers
    with a non-2xx status.
    """
    payload = {
        "model": "nomic-embed-text",
        "prompt": text,
    }
    response = await client_http.post(
        "http://localhost:11434/api/embeddings",
        json=payload,
    )
    response.raise_for_status()
    body = response.json()
    return body["embedding"]
async def ingest_entry(id: str, content: str, metadata: dict, bar: tqdm) -> None:
    """Embed *content* and store it in the Chroma collection, then advance *bar*.

    The module-level semaphore caps how many entries are processed
    concurrently (embedding request + collection write).
    """
    async with semaphore:
        vector = await get_embedding(content)
        collection.add(
            ids=[id],
            documents=[content],
            metadatas=[metadata],
            embeddings=[vector],
        )
        bar.update(1)
def download_and_extract_cities(url: str, zip_path: str, extract_to: str) -> str:
    """Download the GeoNames zip at *url*, extract it into *extract_to*,
    and return the path of the extracted ``.txt`` data file.

    The data-file name is derived from the archive contents instead of
    being hard-coded, so any GeoNames dump (cities500, cities15000, ...)
    works, not just cities1000.

    Raises ``httpx.HTTPStatusError`` on a failed download and
    ``FileNotFoundError`` when the archive contains no ``.txt`` member.
    """
    # Stream the download to disk so the whole archive is never held in
    # memory; timeout=None matches the file's other HTTP client and avoids
    # spurious timeouts on slow mirrors.
    with httpx.stream("GET", url, timeout=None) as response:
        response.raise_for_status()
        with open(zip_path, "wb") as f:
            for chunk in response.iter_bytes():
                f.write(chunk)
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(extract_to)
        # Locate the tab-separated data file inside the archive rather
        # than assuming "cities1000.txt".
        txt_members = [n for n in zip_ref.namelist() if n.endswith(".txt")]
    if not txt_members:
        raise FileNotFoundError(f"no .txt data file found in {zip_path}")
    return os.path.join(extract_to, txt_members[0])
async def parse_and_push_to_chromadb(file_path: str) -> None:
    """Parse the GeoNames tab-separated dump at *file_path* and index every
    usable row into the Chroma collection with its embedding.

    Rows with fewer than 18 columns or with unparsable latitude/longitude
    are skipped (the progress bar is still advanced for them).
    """
    # Count lines first so tqdm can show a real total; use a context
    # manager so the counting handle is closed (it leaked before).
    with open(file_path, "r", encoding="utf-8") as f:
        total_lines = sum(1 for _ in f)
    tasks = []
    with tqdm(total=total_lines, desc="Indexing cities") as bar:
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                parts = line.strip().split("\t")
                if len(parts) < 18:
                    bar.update(1)  # malformed row: count it, don't index it
                    continue
                geonameid = parts[0]
                name = parts[1]
                country_code = parts[8]
                timezone = parts[17]
                try:
                    latitude = float(parts[4])
                    longitude = float(parts[5])
                except ValueError:
                    # Bad coordinates must not abort the whole ingest.
                    bar.update(1)
                    continue
                content = f"{name}, {country_code}"
                metadata = {
                    "city": name,
                    "country": country_code,
                    "latitude": latitude,
                    "longitude": longitude,
                    "timezone": timezone
                }
                tasks.append(ingest_entry(geonameid, content, metadata, bar))
        # Run inside the tqdm context so bar.update() calls from the
        # tasks land before the bar is closed.
        await asyncio.gather(*tasks)
async def main() -> None:
    """Entry point: prepare directories, fetch the dump, and index it."""
    for directory in (EXTRACT_DIR, CHROMA_DIR):
        os.makedirs(directory, exist_ok=True)
    txt_path = download_and_extract_cities(ZIP_URL, ZIP_PATH, EXTRACT_DIR)
    await parse_and_push_to_chromadb(txt_path)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment