Last active
March 6, 2026 12:58
-
-
Save davidefiocco/7d83e6b18eab03de5c4d5ce3051bf916 to your computer and use it in GitHub Desktop.
Track a Tapo RV30 vacuum on a live map in real time
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # requires-python = ">=3.11" | |
| # dependencies = [ | |
| # "python-kasa", | |
| # "lz4", | |
| # "pillow", | |
| # ] | |
| # /// | |
| # Requires the room-cleaning branch for clean.current_map_id: | |
| # pip install git+https://github.com/davidefiocco/python-kasa@feat/room-cleaning-improvements | |
| """Live map tracker for Tapo robot vacuums. | |
| Connects to a Tapo RV30 (or compatible) vacuum, renders the floor map, | |
| and shows the vacuum position updating in real time. | |
| Usage (local): | |
| uv run live_map.py --host 192.168.1.42 | |
| Usage (from Gist): | |
| uv run https://gist.githubusercontent.com/<user>/<id>/raw/live_map.py \\ | |
| --host 192.168.1.42 --username you@example.com --password secret | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import base64 | |
| import math | |
| import os | |
| import threading | |
| import tkinter as tk | |
| from collections import defaultdict | |
| from typing import Any | |
| import lz4.block | |
| from PIL import Image, ImageDraw, ImageFont, ImageTk | |
| from kasa import Discover, Module | |
# Display pixels drawn per raw map pixel (the decoded map is enlarged by this factor).
SCALE = 3

# RGB fill colours for rooms; a room's pixel value selects an entry, cycling
# through the list when there are more rooms than colours.
ROOM_PALETTE = [
    (130, 180, 230),
    (230, 160, 130),
    (160, 210, 160),
    (210, 180, 230),
    (230, 220, 140),
    (180, 220, 220),
    (220, 170, 190),
    (190, 200, 160),
    (170, 190, 230),
    (230, 190, 170),
]

# Base-map colours (RGB).
WALL_COLOR = (60, 60, 60)  # pixels equal to the "barrier" value
UNEXPLORED_COLOR = (220, 220, 220)  # image background; also the "none" value
CLEAN_COLOR = (245, 245, 245)  # pixels equal to the "clean" value

# Per-poll overlay colours.
VACUUM_COLOR = (220, 40, 40)  # filled circle at the vacuum position
DOCK_COLOR = (0, 170, 0)  # filled square at the charging dock
# RGBA, but drawn directly onto the RGB frame.
# NOTE(review): the alpha component is presumably ignored there - confirm.
TRAIL_COLOR = (255, 120, 120, 180)

# Zone colours; the fill is RGBA because zones are drawn on a transparent
# overlay that is alpha-composited onto the map.
FORBID_COLOR = (255, 60, 60, 60)
FORBID_BORDER = (200, 40, 40)
VIRTUAL_WALL_COLOR = (200, 40, 40)
| # --------------------------------------------------------------------------- | |
| # Map decoding | |
| # --------------------------------------------------------------------------- | |
| def _parse_rooms(area_list: list[dict]) -> dict[int, str]: | |
| """Return {room_id: decoded_name} from the area_list entries.""" | |
| rooms: dict[int, str] = {} | |
| for entry in area_list: | |
| if entry.get("type") != "room": | |
| continue | |
| rid = entry["id"] | |
| raw_name = entry.get("name", "") | |
| try: | |
| name = base64.b64decode(raw_name).decode() | |
| except Exception: | |
| name = raw_name or f"Room {rid}" | |
| rooms[rid] = name | |
| return rooms | |
| def _compute_centroids( | |
| pixels: bytes, w: int, h: int, auto_lo: int, auto_hi: int | |
| ) -> dict[int, tuple[int, int]]: | |
| """Return {room_id: (cx, cy)} pixel centroids for each room.""" | |
| sums: dict[int, list[int]] = defaultdict(lambda: [0, 0, 0]) | |
| for y in range(h): | |
| off = y * w | |
| for x in range(w): | |
| v = pixels[off + x] | |
| if auto_lo < v <= auto_hi: | |
| s = sums[v] | |
| s[0] += x | |
| s[1] += y | |
| s[2] += 1 | |
| return { | |
| rid: (s[0] // s[2], s[1] // s[2]) | |
| for rid, s in sums.items() | |
| if s[2] > 0 | |
| } | |
def _real_to_display(
    real_x: int, real_y: int,
    real_origin: list[int], resolution: int, img_w: int,
) -> tuple[int, int]:
    """Map a real-world (mm) point onto the scaled, mirrored map image.

    The point is taken relative to ``real_origin`` and divided by
    ``resolution`` to get raw map-pixel units, then multiplied by ``SCALE``.
    The x axis is mirrored against ``img_w`` to match the horizontally
    flipped map image. Both results are truncated to ``int``.
    """
    offset_x = real_x - real_origin[0]
    grid_x = offset_x / resolution
    offset_y = real_y - real_origin[1]
    grid_y = offset_y / resolution

    display_x = int(img_w - 1 - grid_x * SCALE)
    display_y = int(grid_y * SCALE)
    return display_x, display_y
def _draw_zones(
    img: Image.Image,
    area_list: list[dict],
    real_origin: list[int],
    resolution: int,
) -> None:
    """Paint no-go zones and virtual walls onto *img* in place.

    Every entry of *area_list* that has a non-empty ``"vertexs"`` list is
    converted to display coordinates. ``"forbid"`` entries become filled,
    outlined polygons; ``"virtual_wall"`` entries with at least two points
    become 2-pixel-wide lines. Shapes are drawn on a transparent RGBA layer,
    which is then alpha-composited over the map and pasted back into *img*.
    """
    layer = Image.new("RGBA", img.size, (0, 0, 0, 0))
    pen = ImageDraw.Draw(layer)
    width_px = img.width

    for area in area_list:
        kind = area.get("type")
        vertices = area.get("vertexs")
        if not vertices:
            continue

        outline = [
            _real_to_display(vertex[0], vertex[1], real_origin, resolution, width_px)
            for vertex in vertices
        ]
        if kind == "forbid":
            pen.polygon(outline, fill=FORBID_COLOR, outline=FORBID_BORDER)
        elif kind == "virtual_wall" and len(outline) >= 2:
            pen.line(outline, fill=VIRTUAL_WALL_COLOR, width=2)

    # Blend in RGBA so the translucent fill shows the map beneath it, then
    # write the flattened result back into the caller's image.
    blended = Image.alpha_composite(img.convert("RGBA"), layer)
    img.paste(blended.convert("RGB"))
def decode_map(data: dict[str, Any]) -> Image.Image:
    """Build the static base-map image from a ``getMapData`` payload.

    Steps: base64-decode and LZ4-decompress ``map_data`` into one value per
    pixel; colour each pixel by class (wall, unexplored, cleaned, or room);
    enlarge by ``SCALE`` and mirror horizontally; draw no-go zones and
    virtual walls; finally write each room's name at its centroid as white
    text with a dark one-pixel halo.

    Raises:
        KeyError: If a required key (``map_data``, ``width``, ``height``,
            ``pix_len``, ``bit_list`` or one of its entries) is missing.
    """
    compressed = base64.b64decode(data["map_data"])
    w, h = data["width"], data["height"]
    pix_len = data["pix_len"]
    bit = data["bit_list"]
    pixels = lz4.block.decompress(compressed, uncompressed_size=pix_len)

    auto_lo, auto_hi = bit["auto_area"]
    barrier = bit["barrier"]
    none_val = bit["none"]
    clean_val = bit["clean"]

    def _pixel_color(value: int) -> tuple[int, int, int] | None:
        """Return the colour for a raw pixel value, or None to keep the background."""
        if value == barrier:
            return WALL_COLOR
        if value == none_val:
            return None  # background is already UNEXPLORED_COLOR
        if value == clean_val:
            return CLEAN_COLOR
        if auto_lo < value <= auto_hi:
            return ROOM_PALETTE[(value - 1) % len(ROOM_PALETTE)]
        return None

    canvas = Image.new("RGB", (w, h), UNEXPLORED_COLOR)
    access = canvas.load()
    for y in range(h):
        row_start = y * w
        for x in range(w):
            color = _pixel_color(pixels[row_start + x])
            if color is not None:
                access[x, y] = color

    # Enlarge with hard pixel edges, then mirror left-to-right.
    img = canvas.resize((w * SCALE, h * SCALE), Image.NEAREST).transpose(
        Image.FLIP_LEFT_RIGHT
    )

    real_origin = data.get("real_origin_coor", [0, 0, 0])
    resolution = data.get("resolution", 50)
    areas = data.get("area_list", [])
    _draw_zones(img, areas, real_origin, resolution)

    rooms = _parse_rooms(areas)
    centroids = _compute_centroids(pixels, w, h, auto_lo, auto_hi)

    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("arial", 11)
    except OSError:
        font = ImageFont.load_default()

    img_w = img.width
    # The eight neighbouring offsets used to draw the dark halo.
    halo_offsets = [
        (dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1) if dx or dy
    ]
    for rid, (cx, cy) in centroids.items():
        label = rooms.get(rid, f"Room {rid}")
        # Centroids are in raw, unmirrored pixels; apply the same scale and
        # mirror as the image.
        lx, ly = img_w - 1 - cx * SCALE, cy * SCALE
        for dx, dy in halo_offsets:
            draw.text(
                (lx + dx, ly + dy), label, fill=(40, 40, 40),
                font=font, anchor="mm",
            )
        draw.text((lx, ly), label, fill=(255, 255, 255), font=font, anchor="mm")
    return img
| # --------------------------------------------------------------------------- | |
| # Overlay drawing (called every poll) | |
| # --------------------------------------------------------------------------- | |
def draw_overlay(
    base: Image.Image,
    data: dict[str, Any],
    trail: list[tuple[int, int]],
) -> Image.Image:
    """Return a copy of *base* with the trail, dock and vacuum drawn on it.

    *base* itself is not modified. ``vac_coor`` and ``charge_coor`` are read
    from *data* (defaulting to ``[0, 0, 0]``) and scaled and mirrored the
    same way as the base map. *trail* points are used as-is, so they must
    already be display coordinates.
    """
    frame = base.copy()
    pen = ImageDraw.Draw(frame)
    width_px = frame.width

    vac = data.get("vac_coor", [0, 0, 0])
    charge = data.get("charge_coor", [0, 0, 0])

    vac_x = width_px - 1 - vac[0] * SCALE
    vac_y = vac[1] * SCALE
    heading_deg = 180 - vac[2]  # reflected, like the x axis
    dock_x = width_px - 1 - charge[0] * SCALE
    dock_y = charge[1] * SCALE

    # Trail: one small dot per recorded position.
    for trail_x, trail_y in trail:
        pen.ellipse(
            [trail_x - 1, trail_y - 1, trail_x + 1, trail_y + 1],
            fill=TRAIL_COLOR,
        )

    # Dock: filled square.
    half = 4
    pen.rectangle(
        [dock_x - half, dock_y - half, dock_x + half, dock_y + half],
        fill=DOCK_COLOR,
    )

    # Vacuum: filled circle plus a short white line showing its heading.
    radius = 6
    pen.ellipse(
        [vac_x - radius, vac_y - radius, vac_x + radius, vac_y + radius],
        fill=VACUUM_COLOR,
    )
    heading = math.radians(heading_deg)
    tip_x = vac_x + 10 * math.cos(heading)
    tip_y = vac_y + 10 * math.sin(heading)
    pen.line([(vac_x, vac_y), (tip_x, tip_y)], fill="white", width=2)
    return frame
| # --------------------------------------------------------------------------- | |
| # Tkinter application | |
| # --------------------------------------------------------------------------- | |
class LiveMapApp:
    """Tkinter window that polls the vacuum on a background asyncio loop.

    Device coroutines run on a private event loop in a daemon thread. Each
    result is handed back to the Tk thread through ``root.after`` and
    rendered there. After every result, successful or not, the next poll is
    scheduled ``interval`` seconds later.
    """

    def __init__(
        self,
        root: tk.Tk,
        host: str,
        username: str,
        password: str,
        interval: float,
    ) -> None:
        """Build the widgets, start the background loop and begin connecting.

        Args:
            root: The Tk root window to draw into.
            host: Device address passed to ``Discover.discover_single``.
            username: Account name used for authentication.
            password: Account password used for authentication.
            interval: Seconds between polls; must be non-zero because it is
                used as a divisor.
        """
        self.root = root
        self.host = host
        self.username = username
        self.password = password
        self.interval = interval
        self.base_map: Image.Image | None = None
        self.trail: list[tuple[int, int]] = []
        # Keep a reference to the PhotoImage so Tk does not lose it while it
        # is displayed.
        self._tk_img: ImageTk.PhotoImage | None = None
        self._dev: Any = None
        self._clean: Any = None
        self._map_id: int = 0
        self._polls_since_base_refresh: int = 0
        # Re-decode the base map about every 20 seconds' worth of polls.
        self._base_refresh_every: int = max(1, int(20 / interval))

        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

        self.root.title("Tapo Vacuum \u2014 Live Map")
        self.root.protocol("WM_DELETE_WINDOW", self._on_close)
        self._canvas = tk.Canvas(root, bg="#ddd")
        self._canvas.pack(fill=tk.BOTH, expand=True)
        self._status = tk.Label(root, text="Connecting\u2026", anchor="w", padx=6)
        self._status.pack(fill=tk.X)

        self._schedule(self._connect())

    # -- async helpers -----------------------------------------------------

    def _schedule(self, coro: Any) -> None:
        """Run *coro* on the background loop and route its outcome to ``_on_result``."""
        fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
        # The done-callback does not run on the Tk thread, so hand the future
        # over through after() instead of touching widgets here.
        fut.add_done_callback(
            lambda f: self.root.after(0, self._on_result, f)
        )

    def _retry_later(self) -> None:
        """Schedule the next poll (or reconnect) after ``interval`` seconds."""
        self.root.after(int(self.interval * 1000), self._poll_next)

    def _report_error(self, exc: BaseException) -> None:
        """Show *exc* in the status bar and keep the polling cycle alive."""
        self._status.config(text=f"Error: {exc}")
        self._retry_later()

    def _on_result(self, fut: asyncio.Future) -> None:  # type: ignore[type-arg]
        """Handle a finished device coroutine on the Tk thread.

        Both a failed coroutine and a failure while rendering its payload are
        shown in the status bar and followed by a retry, so one bad response
        does not end polling.
        """
        try:
            data = fut.result()
        except Exception as exc:
            self._report_error(exc)
            return
        if data is None:
            # Nothing to render, but keep polling.
            self._retry_later()
            return
        try:
            # _update_map schedules the next poll as its final step, so an
            # exception means nothing is scheduled yet.
            self._update_map(data)
        except Exception as exc:
            self._report_error(exc)

    # -- coroutines (background thread) ------------------------------------

    async def _connect(self) -> dict[str, Any]:
        """Discover the device, locate its Clean module and fetch the first map.

        Raises:
            RuntimeError: If the device exposes no Clean module.
        """
        dev = await Discover.discover_single(
            self.host, username=self.username, password=self.password
        )
        await dev.update()
        clean = dev.modules.get(Module.Clean)
        if clean is None:
            raise RuntimeError(f"Device at {self.host} has no Clean module")
        self._dev = dev
        self._clean = clean
        self._map_id = clean.current_map_id
        resp = await clean.call("getMapData", {"map_id": self._map_id, "type": 0})
        return resp["getMapData"]

    async def _poll(self) -> dict[str, Any]:
        """Refresh device state and fetch map data for the current map id."""
        await self._dev.update()
        self._map_id = self._clean.current_map_id
        resp = await self._clean.call(
            "getMapData", {"map_id": self._map_id, "type": 0}
        )
        return resp["getMapData"]

    # -- UI (main thread) --------------------------------------------------

    def _update_map(self, data: dict[str, Any]) -> None:
        """Render one poll result and schedule the next poll.

        The base map is decoded on the first call and then again every
        ``_base_refresh_every`` calls. Between refreshes only the overlay
        (trail, dock, vacuum) is redrawn on the cached base map.
        """
        self._polls_since_base_refresh += 1
        refresh_base = (
            self.base_map is None
            or self._polls_since_base_refresh >= self._base_refresh_every
        )
        if refresh_base:
            self._polls_since_base_refresh = 0
            self.base_map = decode_map(data)
            w, h = data["width"], data["height"]
            self._status.config(text=f"Map loaded ({w}\u00d7{h}, {w*SCALE}\u00d7{h*SCALE} scaled)")
            # The extra 24 px leaves room for the status bar.
            self.root.geometry(f"{w * SCALE}x{h * SCALE + 24}")

        vac = data.get("vac_coor", [0, 0, 0])
        img_w = self.base_map.width
        point = (img_w - 1 - vac[0] * SCALE, vac[1] * SCALE)
        # Skip consecutive duplicates so a stationary vacuum does not grow the
        # trail on every poll; the rendered trail is unchanged.
        if not self.trail or self.trail[-1] != point:
            self.trail.append(point)

        img = draw_overlay(self.base_map, data, self.trail)
        self._tk_img = ImageTk.PhotoImage(img)
        self._canvas.config(width=img.width, height=img.height)
        self._canvas.delete("all")
        self._canvas.create_image(0, 0, anchor=tk.NW, image=self._tk_img)

        real = data.get("real_vac_coor", vac)
        self._status.config(
            text=f"Vacuum at ({real[0]}, {real[1]}) angle {real[2]}\u00b0"
        )
        self.root.after(int(self.interval * 1000), self._poll_next)

    def _poll_next(self) -> None:
        """Start the next device round-trip.

        While no Clean module has been obtained yet (the initial connection
        failed), retry the connection instead of doing nothing. Otherwise a
        failed first connect would leave the app stuck on its error message.
        """
        if self._clean is None:
            self._schedule(self._connect())
        else:
            self._schedule(self._poll())

    def _on_close(self) -> None:
        """Stop the background loop and destroy the window."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self.root.destroy()
| # --------------------------------------------------------------------------- | |
| # CLI | |
| # --------------------------------------------------------------------------- | |
def main() -> None:
    """Parse the command line, validate it and run the live-map window.

    Credentials come from ``--username``/``--password`` or, failing that,
    the ``KASA_USERNAME``/``KASA_PASSWORD`` environment variables. Invalid
    input is reported through ``parser.error`` (usage message, exit status
    2) before any window is created.
    """
    parser = argparse.ArgumentParser(
        description="Live map tracker for Tapo robot vacuums"
    )
    parser.add_argument("--host", required=True, help="Device IP address")
    parser.add_argument(
        "--username",
        default=os.environ.get("KASA_USERNAME"),
        help="TP-Link account email (default: $KASA_USERNAME)",
    )
    parser.add_argument(
        "--password",
        default=os.environ.get("KASA_PASSWORD"),
        help="TP-Link account password (default: $KASA_PASSWORD)",
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=2.0,
        help="Seconds between position polls (default: 2)",
    )
    args = parser.parse_args()

    if not args.username or not args.password:
        parser.error(
            "Credentials required via --username/--password"
            " or KASA_USERNAME/KASA_PASSWORD env vars"
        )
    # The interval is used as a divisor and as a timer delay, so 0, negative
    # values, nan and inf (all accepted by float()) must be rejected here
    # rather than crash later with a traceback.
    if not math.isfinite(args.interval) or args.interval <= 0:
        parser.error("--interval must be a positive number of seconds")

    root = tk.Tk()
    LiveMapApp(root, args.host, args.username, args.password, args.interval)
    root.mainloop()
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment