Last active
December 30, 2025 15:03
-
-
Save DiTo97/389846d6d82ae26150edf5a4c16350e1 to your computer and use it in GitHub Desktop.
CLI tool generating SVG maps of any country with Google Maps-style pin markers.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = [ | |
| # "aiohttp", | |
| # "geopandas", | |
| # "geopy", | |
| # "matplotlib", | |
| # "rich", | |
| # "scipy", | |
| # "svgpath2mpl", | |
| # ] | |
| # /// | |
| """ | |
| mappamia | |
| ======== | |
| CLI tool generating SVG maps of any country with Google Maps-style pin markers for specific municipalities. | |
| """ | |
| import argparse | |
| import asyncio | |
| import hashlib | |
| import logging | |
| import sys | |
| from dataclasses import dataclass | |
| import geopandas | |
| import matplotlib.pyplot as plt | |
| from geopy.adapters import AioHTTPAdapter | |
| from geopy.exc import GeocoderTimedOut, GeocoderServiceError | |
| from geopy.geocoders import Nominatim | |
| from geopy.location import Location | |
| from matplotlib.transforms import Affine2D | |
| from rich.logging import RichHandler | |
| from svgpath2mpl import parse_path | |
def get_printer() -> logging.Logger:
    """Route logging through a rich handler and return the ``"rich"`` logger.

    The root logging configuration is replaced (``force=True``) with a single
    ``RichHandler`` at INFO level. Level names are lower-cased on every record
    before the handler renders it.
    """

    def lowercase_levelname(record: logging.LogRecord) -> bool:
        # A filter may mutate the record; returning True keeps it.
        record.levelname = record.levelname.lower()
        return True

    rich_handler = RichHandler()
    rich_handler.addFilter(lowercase_levelname)

    config = {
        "level": logging.INFO,
        "format": "%(message)s",
        "datefmt": "[%X]",
        "handlers": [rich_handler],
        "force": True,
    }
    logging.basicConfig(**config)

    return logging.getLogger("rich")
# SVG path data for the pin-shaped marker; parsed with `parse_path` in
# `make_map_figure`.
SVG = """
M146.667,0C94.903,0,52.946,41.957,52.946,93.721c0,22.322,7.849,42.789,20.891,58.878
c4.204,5.178,11.237,13.331,14.903,18.906c21.109,32.069,48.19,78.643,56.082,116.864
c1.354,6.527,2.986,6.641,4.743,0.212 c5.629-20.609,20.228-65.639,50.377-112.757
c3.595-5.619,10.884-13.483,15.409-18.379c6.554-7.098,12.009-15.224,16.154-24.084
c5.651-12.086,8.882-25.466,8.882-39.629C240.387,41.962,198.43,0,146.667,0z
M146.667,144.358 c-28.892,0-52.313-23.421-52.313-52.313c0-28.887,23.421-52.307,52.313-52.307
s52.313,23.421,52.313,52.307 C198.98,120.938,175.559,144.358,146.667,144.358z
"""
# Country-boundaries archive read by `fetch_geometry`.
URL = "https://naturalearth.s3.amazonaws.com/110m_cultural/ne_110m_admin_0_countries.zip"
# Module-wide logger. NOTE: `get_printer` calls `logging.basicConfig`, so
# logging is (re)configured as a side effect of importing this module.
printer = get_printer()
class MunicipalityNotFoundException(Exception):
    """Signals that a geocoding lookup returned no result for a municipality."""
@dataclass
class Arguments:
    """Command-line options of the tool.

    ``municipalities`` may be given as one comma-separated string. In that
    case it is replaced by the sorted list of its stripped, non-empty items.
    Any other value is stored unchanged.
    """

    country: str
    municipalities: list[str]

    def __post_init__(self):
        raw = self.municipalities
        if not isinstance(raw, str):
            return
        # Strip each piece once and drop the ones that end up empty.
        self.municipalities = sorted(
            name for piece in raw.split(",") if (name := piece.strip())
        )
def make_args() -> Arguments:
    """Parse ``--country`` and ``--municipalities`` from the command line.

    Both options are required strings. Unrecognised arguments are ignored
    rather than rejected, because ``parse_known_args`` is used.
    """
    parser = argparse.ArgumentParser(description="generating SVG maps of any country with Google Maps-style pin markers.")
    options = (
        ("--country", 'country name (e.g. "Italy", "France", "UK")'),
        ("--municipalities", 'comma-separated array of municipalities (e.g. "Milan,Rome")'),
    )
    for flag, description in options:
        parser.add_argument(flag, required=True, type=str, help=description)
    namespace, _unknown = parser.parse_known_args()
    return Arguments(country=namespace.country, municipalities=namespace.municipalities)
async def geocode(
    semaphore: asyncio.Semaphore, geolocator: Nominatim, municipality: str, country: str
) -> Location:
    """Geocode a single municipality within a specific country.

    Args:
        semaphore: held for the duration of the request so that lookups are
            throttled by whatever limit the caller configured.
        geolocator: geocoder whose ``geocode`` coroutine is awaited.
        municipality: name of the municipality to look up.
        country: country name appended to the query to disambiguate it.

    Returns:
        The location reported by the geocoder.

    Raises:
        GeocoderServiceError, GeocoderTimedOut: re-raised unchanged after the
            failure has been logged.
        MunicipalityNotFoundException: when the geocoder returns no result;
            the message names the query that failed.
    """
    query = "%s, %s" % (municipality, country)
    async with semaphore:
        # Pause while holding the semaphore so consecutive requests are spaced out.
        await asyncio.sleep(1.25)
        try:
            location = await geolocator.geocode(query)
        except (GeocoderServiceError, GeocoderTimedOut):
            printer.error("❌ %s", municipality)
            raise
    if not location:
        printer.error("❌ %s not found in Nominatim API.", municipality)
        # Carry the query in the exception so the final traceback is
        # self-explanatory even without the log line above.
        raise MunicipalityNotFoundException("%s not found in Nominatim API." % query)
    printer.info("✔️ " + " got municipality: %s (%f, %f)", municipality, location.latitude, location.longitude)
    return location
async def fetch_coordinates(municipalities: list[str], country: str) -> list[Location]:
    """Geocode every municipality of ``country`` and return the results.

    Results come back in the same order as ``municipalities``. The first
    lookup that raises propagates its exception to the caller.
    """
    printer.info("🌍 starting async geocoding for %d municipalities in %s...", len(municipalities), country)
    # The free Nominatim API has a strict requests-per-second policy (commercial
    # plans are less stringent), so only one lookup may be in flight at a time.
    gate = asyncio.Semaphore(1)
    async with Nominatim(user_agent="mappamia", adapter_factory=AioHTTPAdapter) as geolocator:
        lookups = (
            geocode(gate, geolocator, municipality, country)
            for municipality in municipalities
        )
        return await asyncio.gather(*lookups)
def fetch_geometry(country: str) -> geopandas.GeoDataFrame:
    """Load the Natural Earth 1:110m dataset and keep the rows for ``country``.

    Matching is an exact comparison against the dataset's ``NAME`` column.

    Raises:
        ValueError: when no row matches ``country``.
    """
    atlas = geopandas.read_file(URL)
    # TODO: approximate string matching
    matches = atlas[atlas.NAME == country]
    if not matches.empty:
        return matches
    raise ValueError("%s not found in atlas dataset." % country)
def compute_dynamic_marker_size(geometry: "geopandas.GeoDataFrame") -> int:
    """Compute the marker size dynamically from the extent of a country geometry.

    The scaling is set according to the size of the reference marker in Italy:
    a bounding box of area 123.1 yields the reference size 1500, and the size
    scales with the inverse square root of the bounding-box area.

    Args:
        geometry: object exposing ``total_bounds`` as (min x, min y, max x, max y).

    Returns:
        The marker size, clamped to the range [200, 5000]. The reference size
        1500 is returned when the bounding box has no usable positive area
        (zero, negative or NaN, e.g. a degenerate or empty geometry).
    """
    min_x, min_y, max_x, max_y = geometry.total_bounds
    size = (max_x - min_x) * (max_y - min_y)
    # `not size > 0` also catches zero (division by zero below) and NaN
    # (int(nan) would raise), which a plain `size < 0` check lets through.
    if not size > 0:
        return 1500
    scaling = (123.1 / size) ** 0.5
    return max(200, min(5000, int(1500 * scaling)))
def compute_arrayhash(strings: list[str]) -> str:
    """Return a SHA-256 hex digest of an array of strings, aware of item boundaries.

    Each string is fed to the hash as ``<byte length>:<utf-8 bytes>``, so two
    arrays that concatenate to the same text still hash differently.

    Examples:
        >>> compute_arrayhash(["AB", "C"]) != compute_arrayhash(["A", "BC"])
        True
    """
    digest = hashlib.sha256()
    for text in strings:
        payload = text.encode("utf-8")
        # Prefix with the encoded length, not the character count.
        digest.update(b"%d:" % len(payload) + payload)
    return digest.hexdigest()
def make_map_figure(geometry: geopandas.GeoDataFrame, coordinates: list[Location], filepath: str):
    """Render the geometry with one pin marker per location and save it as SVG.

    Args:
        geometry: country geometry, drawn in light grey with white edges.
        coordinates: locations whose longitude/latitude receive a pin.
        filepath: destination of the SVG file.

    The figure is always closed afterwards, including when plotting or saving
    fails, so it does not stay registered with pyplot.
    """
    # Shift the pin path, mirror it vertically and shrink it.
    # NOTE(review): the offsets look chosen to put the pin's tip at the
    # origin so the tip sits on the coordinate — confirm.
    pin_transform = Affine2D().translate(-146.667, -293.334).scale(1, -1).scale(0.003333)
    marker = parse_path(SVG).transformed(pin_transform)

    # The size depends only on the geometry, so compute it once for all pins.
    marker_size = compute_dynamic_marker_size(geometry)

    fig, ax = plt.subplots(figsize=(10, 10))
    try:
        ax.set_axis_off()
        geometry.plot(ax=ax, color="#E0E0E0", edgecolor="white", zorder=1)
        for location in coordinates:
            ax.scatter(
                location.longitude,
                location.latitude,
                marker=marker,
                s=marker_size,
                color="#B50B15",
                zorder=2,
                linewidth=0,
            )
        # Save this figure explicitly instead of relying on pyplot's "current figure".
        fig.savefig(filepath, format="svg", bbox_inches="tight")
    finally:
        plt.close(fig)
async def main():
    """Run the tool end to end: parse arguments, fetch data, draw and save the map.

    The output file name is the first 16 hex characters of the hash of the
    municipalities followed by the country, plus ``-map.svg``.

    Raises:
        ValueError: when no municipality was supplied.
    """
    args = make_args()
    if not args.municipalities:
        raise ValueError("municipalities must be non-empty.")

    country, municipalities = args.country, args.municipalities
    # The geometry is fetched first, so an unknown country fails before any geocoding.
    geometry = fetch_geometry(country)
    digest = compute_arrayhash(municipalities + [country])
    filepath = "%s-map.svg" % digest[:16]
    coordinates = await fetch_coordinates(municipalities, country)

    make_map_figure(geometry, coordinates, filepath)
    printer.info("✨ I'm done! %s", filepath)
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment