Skip to content

Instantly share code, notes, and snippets.

@davidefiocco
Last active March 6, 2026 12:58
Show Gist options
  • Select an option

  • Save davidefiocco/7d83e6b18eab03de5c4d5ce3051bf916 to your computer and use it in GitHub Desktop.

Select an option

Save davidefiocco/7d83e6b18eab03de5c4d5ce3051bf916 to your computer and use it in GitHub Desktop.
Track a Tapo RV30 vacuum on a live map in real time
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "python-kasa",
# "lz4",
# "pillow",
# ]
# ///
# Requires the room-cleaning branch for clean.current_map_id:
# pip install git+https://github.com/davidefiocco/python-kasa@feat/room-cleaning-improvements
"""Live map tracker for Tapo robot vacuums.

Connects to a Tapo RV30 (or compatible) vacuum, renders the floor map,
and shows the vacuum position updating in real time.

Credentials come from --username/--password, falling back to the
KASA_USERNAME and KASA_PASSWORD environment variables.

Usage (local):
    uv run live_map.py --host 192.168.1.42

Usage (from Gist):
    uv run https://gist.githubusercontent.com/<user>/<id>/raw/live_map.py \\
        --host 192.168.1.42 --username you@example.com --password secret
"""
from __future__ import annotations
import argparse
import asyncio
import base64
import math
import os
import threading
import tkinter as tk
from collections import defaultdict
from typing import Any
import lz4.block
from PIL import Image, ImageDraw, ImageFont, ImageTk
from kasa import Discover, Module
# Integer zoom factor: decode_map enlarges the raw map grid by this amount and
# the overlay code multiplies map-cell coordinates by it.
SCALE = 3

# Fill colors for rooms; decode_map cycles through them by room pixel value.
ROOM_PALETTE = [
    (130, 180, 230), (230, 160, 130),
    (160, 210, 160), (210, 180, 230),
    (230, 220, 140), (180, 220, 220),
    (220, 170, 190), (190, 200, 160),
    (170, 190, 230), (230, 190, 170),
]

# Base-map cell colors (RGB).
WALL_COLOR = (60, 60, 60)
UNEXPLORED_COLOR = (220, 220, 220)
CLEAN_COLOR = (245, 245, 245)

# Live overlay colors; TRAIL_COLOR carries a fourth (alpha) component.
VACUUM_COLOR = (220, 40, 40)
DOCK_COLOR = (0, 170, 0)
TRAIL_COLOR = (255, 120, 120, 180)

# No-go zone fill (with alpha), its outline, and virtual-wall line color.
FORBID_COLOR = (255, 60, 60, 60)
FORBID_BORDER = (200, 40, 40)
VIRTUAL_WALL_COLOR = (200, 40, 40)
# ---------------------------------------------------------------------------
# Map decoding
# ---------------------------------------------------------------------------
def _parse_rooms(area_list: list[dict]) -> dict[int, str]:
"""Return {room_id: decoded_name} from the area_list entries."""
rooms: dict[int, str] = {}
for entry in area_list:
if entry.get("type") != "room":
continue
rid = entry["id"]
raw_name = entry.get("name", "")
try:
name = base64.b64decode(raw_name).decode()
except Exception:
name = raw_name or f"Room {rid}"
rooms[rid] = name
return rooms
def _compute_centroids(
pixels: bytes, w: int, h: int, auto_lo: int, auto_hi: int
) -> dict[int, tuple[int, int]]:
"""Return {room_id: (cx, cy)} pixel centroids for each room."""
sums: dict[int, list[int]] = defaultdict(lambda: [0, 0, 0])
for y in range(h):
off = y * w
for x in range(w):
v = pixels[off + x]
if auto_lo < v <= auto_hi:
s = sums[v]
s[0] += x
s[1] += y
s[2] += 1
return {
rid: (s[0] // s[2], s[1] // s[2])
for rid, s in sums.items()
if s[2] > 0
}
def _real_to_display(
    real_x: int, real_y: int,
    real_origin: list[int], resolution: int, img_w: int,
) -> tuple[int, int]:
    """Map a real-world (mm) point onto the scaled, mirrored map image.

    The point is taken relative to ``real_origin``, divided by ``resolution``
    to get map cells, multiplied by ``SCALE`` to get pixels, and the x axis is
    mirrored against ``img_w - 1``. Both results are truncated with ``int``.
    """
    origin_x, origin_y = real_origin[0], real_origin[1]
    cell_x = (real_x - origin_x) / resolution
    cell_y = (real_y - origin_y) / resolution
    display_x = int(img_w - 1 - cell_x * SCALE)
    display_y = int(cell_y * SCALE)
    return display_x, display_y
def _draw_zones(
    img: Image.Image,
    area_list: list[dict],
    real_origin: list[int],
    resolution: int,
) -> None:
    """Paint no-go zones and virtual walls onto *img* in place.

    Only entries with a non-empty ``"vertexs"`` list are considered:
    ``"forbid"`` areas become filled, outlined polygons and ``"virtual_wall"``
    areas with at least two points become 2-pixel lines. Shapes are drawn on a
    transparent RGBA layer which is then composited over *img*.
    """
    zone_layer = Image.new("RGBA", img.size, (0, 0, 0, 0))
    painter = ImageDraw.Draw(zone_layer)
    width = img.width

    for zone in area_list:
        corners = zone.get("vertexs")
        if not corners:
            continue
        kind = zone.get("type")
        outline = [
            _real_to_display(corner[0], corner[1], real_origin, resolution, width)
            for corner in corners
        ]
        if kind == "forbid":
            painter.polygon(outline, fill=FORBID_COLOR, outline=FORBID_BORDER)
            continue
        if kind == "virtual_wall" and len(outline) >= 2:
            painter.line(outline, fill=VIRTUAL_WALL_COLOR, width=2)

    # Blend the translucent layer, then write the result back into the
    # caller's image so the update is visible without a return value.
    blended = Image.alpha_composite(img.convert("RGBA"), zone_layer)
    img.paste(blended.convert("RGB"))
def decode_map(data: dict[str, Any]) -> Image.Image:
    """Render a ``getMapData`` payload as a scaled, mirrored RGB image.

    The base64 + LZ4 encoded cell grid is colored cell by cell, enlarged by
    ``SCALE`` with nearest-neighbour sampling, flipped left-to-right, and then
    decorated with no-go zones, virtual walls and outlined room-name labels.
    """
    compressed = base64.b64decode(data["map_data"])
    w, h = data["width"], data["height"]
    expected_size = data["pix_len"]
    codes = data["bit_list"]
    cells = lz4.block.decompress(compressed, uncompressed_size=expected_size)

    auto_lo, auto_hi = codes["auto_area"]
    wall_code = codes["barrier"]
    unexplored_code = codes["none"]
    cleaned_code = codes["clean"]

    def color_of(code):
        """Return the fill color for a cell value, or None to keep the background."""
        if code == wall_code:
            return WALL_COLOR
        if code == unexplored_code:
            return None  # background is already UNEXPLORED_COLOR
        if code == cleaned_code:
            return CLEAN_COLOR
        if auto_lo < code <= auto_hi:
            return ROOM_PALETTE[(code - 1) % len(ROOM_PALETTE)]
        return None

    canvas = Image.new("RGB", (w, h), UNEXPLORED_COLOR)
    grid = canvas.load()
    for index in range(w * h):
        color = color_of(cells[index])
        if color is not None:
            row, col = divmod(index, w)
            grid[col, row] = color

    canvas = canvas.resize((w * SCALE, h * SCALE), Image.NEAREST)
    canvas = canvas.transpose(Image.FLIP_LEFT_RIGHT)

    areas = data.get("area_list", [])
    _draw_zones(
        canvas,
        areas,
        data.get("real_origin_coor", [0, 0, 0]),
        data.get("resolution", 50),
    )

    room_names = _parse_rooms(areas)
    room_centers = _compute_centroids(cells, w, h, auto_lo, auto_hi)

    pen = ImageDraw.Draw(canvas)
    try:
        label_font = ImageFont.truetype("arial", 11)
    except OSError:
        # Font file not available on this system.
        label_font = ImageFont.load_default()

    # Eight one-pixel offsets around the label position form a dark outline.
    halo = [(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1) if dx or dy]
    right_edge = canvas.width - 1
    for rid, (cx, cy) in room_centers.items():
        label = room_names.get(rid, f"Room {rid}")
        # Centroids are in unscaled, unmirrored cells; convert to image pixels.
        sx, sy = right_edge - cx * SCALE, cy * SCALE
        for dx, dy in halo:
            pen.text(
                (sx + dx, sy + dy), label, fill=(40, 40, 40),
                font=label_font, anchor="mm",
            )
        pen.text((sx, sy), label, fill=(255, 255, 255), font=label_font, anchor="mm")
    return canvas
# ---------------------------------------------------------------------------
# Overlay drawing (called every poll)
# ---------------------------------------------------------------------------
def draw_overlay(
    base: Image.Image,
    data: dict[str, Any],
    trail: list[tuple[int, int]],
) -> Image.Image:
    """Return a copy of *base* with the trail, dock and vacuum drawn on it.

    ``vac_coor`` and ``charge_coor`` from *data* are given in map cells; they
    are multiplied by ``SCALE`` and mirrored horizontally to match the flipped
    base image. *trail* is expected to already be in image pixels. *base* is
    left unmodified.
    """
    canvas = base.copy()
    pen = ImageDraw.Draw(canvas)
    right_edge = canvas.width - 1

    def box(center_x, center_y, half):
        """Bounding box of a square/circle of half-size *half* around a point."""
        return [center_x - half, center_y - half, center_x + half, center_y + half]

    vacuum = data.get("vac_coor", [0, 0, 0])
    dock = data.get("charge_coor", [0, 0, 0])
    vac_x = right_edge - vacuum[0] * SCALE
    vac_y = vacuum[1] * SCALE
    # The image is mirrored left-to-right, so the heading is mirrored as well.
    heading_deg = 180 - vacuum[2]
    dock_x = right_edge - dock[0] * SCALE
    dock_y = dock[1] * SCALE

    # Draw order matters: trail underneath, then dock, then the vacuum on top.
    for trail_x, trail_y in trail:
        pen.ellipse(box(trail_x, trail_y, 1), fill=TRAIL_COLOR)

    pen.rectangle(box(dock_x, dock_y, 4), fill=DOCK_COLOR)

    pen.ellipse(box(vac_x, vac_y, 6), fill=VACUUM_COLOR)
    heading = math.radians(heading_deg)
    tip = (vac_x + 10 * math.cos(heading), vac_y + 10 * math.sin(heading))
    pen.line([(vac_x, vac_y), tip], fill="white", width=2)
    return canvas
# ---------------------------------------------------------------------------
# Tkinter application
# ---------------------------------------------------------------------------
class LiveMapApp:
    """Tkinter window that polls a vacuum from a background asyncio loop.

    Device I/O runs as coroutines on a private event loop living in a daemon
    thread. All Tk calls are made from the thread that created ``root``: the
    Tk side checks each submitted coroutine's future via ``root.after``
    instead of letting the worker thread call into Tk.

    Args:
        root: Tk root window the map is drawn into.
        host: Address passed to ``Discover.discover_single``.
        username: Account name forwarded to device discovery.
        password: Account password forwarded to device discovery.
        interval: Seconds between polls; must be a positive, finite number.

    Raises:
        ValueError: If ``interval`` is not a positive, finite number.
    """

    # How often (ms) the Tk thread checks whether a background coroutine is done.
    _RESULT_POLL_MS = 50
    # Cap on remembered trail points so long sessions don't grow without bound.
    _MAX_TRAIL_POINTS = 5000

    def __init__(
        self,
        root: tk.Tk,
        host: str,
        username: str,
        password: str,
        interval: float,
    ) -> None:
        # Reject 0 / negative / NaN / inf up front: they would otherwise cause
        # a ZeroDivisionError below or a zero-delay polling loop.
        if not 0 < interval < float("inf"):
            raise ValueError(
                f"interval must be a positive number of seconds, got {interval!r}"
            )
        self.root = root
        self.host = host
        self.username = username
        self.password = password
        self.interval = interval
        self.base_map: Image.Image | None = None
        # Vacuum positions already converted to display pixels.
        self.trail: list[tuple[int, int]] = []
        # Tk only keeps a weak reference to images, so hold the current one here.
        self._tk_img: ImageTk.PhotoImage | None = None
        self._dev: Any = None
        self._clean: Any = None
        self._map_id: int = 0
        self._map_size: tuple[int, int] | None = None
        self._polls_since_base_refresh: int = 0
        # Re-render the (expensive) base map roughly every 20 seconds.
        self._base_refresh_every: int = max(1, int(20 / interval))
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        self.root.title("Tapo Vacuum \u2014 Live Map")
        self.root.protocol("WM_DELETE_WINDOW", self._on_close)
        self._canvas = tk.Canvas(root, bg="#ddd")
        self._canvas.pack(fill=tk.BOTH, expand=True)
        self._status = tk.Label(root, text="Connecting\u2026", anchor="w", padx=6)
        self._status.pack(fill=tk.X)
        self._schedule(self._connect())

    # -- async helpers -----------------------------------------------------
    def _schedule(self, coro: Any) -> None:
        """Run *coro* on the background loop and deliver its outcome to Tk."""
        fut = asyncio.run_coroutine_threadsafe(coro, self._loop)
        self.root.after(self._RESULT_POLL_MS, self._check_result, fut)

    def _check_result(self, fut: Any) -> None:
        """Hand *fut* to ``_on_result`` once done, otherwise check again later."""
        if fut.done():
            self._on_result(fut)
        else:
            self.root.after(self._RESULT_POLL_MS, self._check_result, fut)

    def _on_result(self, fut: asyncio.Future) -> None:  # type: ignore[type-arg]
        """Render a finished request and always queue the next poll."""
        try:
            data = fut.result()
            if data is not None:
                self._update_map(data)
        except Exception as exc:
            # Covers both device errors and malformed map payloads; either way
            # report it and keep polling instead of silently stalling.
            self._status.config(text=f"Error: {exc}")
        self.root.after(int(self.interval * 1000), self._poll_next)

    # -- coroutines (background thread) ------------------------------------
    async def _connect(self) -> dict[str, Any]:
        """Discover the device, locate its Clean module and fetch the map.

        Raises:
            RuntimeError: If the device exposes no Clean module.
        """
        dev = await Discover.discover_single(
            self.host, username=self.username, password=self.password
        )
        try:
            await dev.update()
            clean = dev.modules.get(Module.Clean)
            if clean is None:
                raise RuntimeError(f"Device at {self.host} has no Clean module")
            map_id = clean.current_map_id
            resp = await clean.call("getMapData", {"map_id": map_id, "type": 0})
        except Exception:
            # Connecting is retried, so release this half-initialised device
            # rather than leaking one connection per attempt.
            await dev.disconnect()
            raise
        self._dev = dev
        self._clean = clean
        self._map_id = map_id
        return resp["getMapData"]

    async def _poll(self) -> dict[str, Any]:
        """Refresh device state and fetch the current map payload."""
        await self._dev.update()
        self._map_id = self._clean.current_map_id
        resp = await self._clean.call(
            "getMapData", {"map_id": self._map_id, "type": 0}
        )
        return resp["getMapData"]

    # -- UI (main thread) --------------------------------------------------
    def _update_map(self, data: dict[str, Any]) -> None:
        """Redraw the canvas and status line from a map payload."""
        self._polls_since_base_refresh += 1
        refresh_base = (
            self.base_map is None
            or self._polls_since_base_refresh >= self._base_refresh_every
        )
        if refresh_base:
            self._polls_since_base_refresh = 0
            self.base_map = decode_map(data)
            size = self.base_map.size
            if size != self._map_size:
                # Only resize the window when the map itself changed size, so
                # a manual resize is not undone on every refresh. Trail points
                # were mirrored against the old width, so they are stale too.
                self._map_size = size
                self.trail.clear()
                self.root.geometry(f"{size[0]}x{size[1] + 24}")
        vac = data.get("vac_coor", [0, 0, 0])
        img_w = self.base_map.width
        point = (img_w - 1 - vac[0] * SCALE, vac[1] * SCALE)
        # Skip repeats (e.g. while docked) and keep the trail bounded.
        if not self.trail or self.trail[-1] != point:
            self.trail.append(point)
            if len(self.trail) > self._MAX_TRAIL_POINTS:
                del self.trail[: -self._MAX_TRAIL_POINTS]
        img = draw_overlay(self.base_map, data, self.trail)
        self._tk_img = ImageTk.PhotoImage(img)
        self._canvas.config(width=img.width, height=img.height)
        self._canvas.delete("all")
        self._canvas.create_image(0, 0, anchor=tk.NW, image=self._tk_img)
        real = data.get("real_vac_coor", vac)
        self._status.config(
            text=f"Vacuum at ({real[0]}, {real[1]}) angle {real[2]}\u00b0"
        )

    def _poll_next(self) -> None:
        """Start the next request: a poll, or a reconnect if not yet connected."""
        if self._clean is None:
            # The initial connection never succeeded; try again.
            self._schedule(self._connect())
        else:
            self._schedule(self._poll())

    def _on_close(self) -> None:
        """Stop the background loop and close the window."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self.root.destroy()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse command-line options and run the live-map window until closed.

    Credentials come from ``--username``/``--password`` or, when omitted, the
    ``KASA_USERNAME``/``KASA_PASSWORD`` environment variables. Missing
    credentials or a non-positive ``--interval`` end the program through
    ``parser.error`` (usage message, exit status 2).
    """
    parser = argparse.ArgumentParser(
        description="Live map tracker for Tapo robot vacuums"
    )
    parser.add_argument("--host", required=True, help="Device IP address")
    parser.add_argument(
        "--username",
        default=os.environ.get("KASA_USERNAME"),
        help="TP-Link account email (default: $KASA_USERNAME)",
    )
    parser.add_argument(
        "--password",
        default=os.environ.get("KASA_PASSWORD"),
        help="TP-Link account password (default: $KASA_PASSWORD)",
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=2.0,
        help="Seconds between position polls (default: 2)",
    )
    args = parser.parse_args()
    if not args.username or not args.password:
        parser.error(
            "Credentials required via --username/--password"
            " or KASA_USERNAME/KASA_PASSWORD env vars"
        )
    # float() happily accepts 0, negatives, "nan" and "inf"; none of them is a
    # usable polling period (division by zero or a zero-delay polling loop).
    if not (math.isfinite(args.interval) and args.interval > 0):
        parser.error("--interval must be a positive number of seconds")
    root = tk.Tk()
    LiveMapApp(root, args.host, args.username, args.password, args.interval)
    root.mainloop()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment