Skip to content

Instantly share code, notes, and snippets.

@dwoffinden
Forked from RaphaelWimmer/endscopetool.py
Last active March 6, 2026 21:50
Show Gist options
  • Select an option

  • Save dwoffinden/20be1f532c3d34f6311a2ca5e99cad54 to your computer and use it in GitHub Desktop.

Select an option

Save dwoffinden/20be1f532c3d34f6311a2ca5e99cad54 to your computer and use it in GitHub Desktop.
Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
use flake
.direnv
.pre-commit-config.yaml
result
#!/usr/bin/env python3
# Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
# CC-0 / Public Domain
# (0) 2023 Raphael Wimmer
# v0.1.0
# reverse-engineered using a packet capture log - this means that I have no idea what all those magic numbers mean
# and whether there are further features that might be supported by the hardware
# usage: first connect to the 'softish-XXXX' wifi, then run this script. Check code for keyboard shortcuts.
import socket
import cv2
import numpy as np
import time
from PIL import Image
from io import BytesIO
from urllib.parse import parse_qs
def get_battery_level(query_string: str) -> float | None:
"""
Extracts the battery level from a string like 'type=2001&data=23'.
Returns an integer or None if not found or invalid.
"""
try:
params = parse_qs(query_string)
return int(params["data"][0]) / 100
except (KeyError, IndexError, ValueError):
print(f"failed to extract battery from data: ${query_string}")
return None
def draw_battery(
    img: cv2.typing.MatLike,
    x: int,
    y: int,
    width: int,
    height: int,
    level: float,
    thickness: int,
) -> None:
    """
    Paint a battery symbol onto ``img`` in place.

    The symbol's top-left corner is (x, y) and its body spans ``width`` by
    ``height`` pixels; ``thickness`` is the line width of the outline.
    ``level`` is the charge as a fraction and is clamped to the range 0..1.
    """
    white = (255, 255, 255)
    green = (0, 255, 0)
    red = (0, 0, 255)

    # Keep the charge inside [0, 1] before it is used for colour and width
    charge = max(0, min(level, 1.0))
    if charge > 0.3:
        charge_color = green
    else:
        charge_color = red  # low battery

    right = x + width
    bottom = y + height

    # 1) body outline
    cv2.rectangle(img, (x, y), (right, bottom), white, thickness)

    # 2) solid terminal nub attached to the right edge, vertically centred
    nub_top = y + int(height * 0.3)
    nub_corner = (right + int(width * 0.08), nub_top + int(height * 0.4))
    cv2.rectangle(img, (right, nub_top), nub_corner, white, -1)

    # 3) solid charge bar, inset 2 px from the body on every side
    bar_left = x + 2
    bar_right = bar_left + int((width - 4) * charge)
    cv2.rectangle(img, (bar_left, y + 2), (bar_right, bottom - 2), charge_color, -1)
def absolute_frame_from_raw(raw_frame: int, latest_abs_frame: int) -> int:
    """
    Expand a frame counter that wraps at 256 into an absolute frame number.

    Three placements of ``raw_frame`` are considered: in the 256-wide window
    containing ``latest_abs_frame`` and in the windows directly below and
    above it. The placement nearest to ``latest_abs_frame`` is returned; on a
    tie the lower one wins.
    """
    window_start = latest_abs_frame - latest_abs_frame % 256

    # Start with the window below, then only switch on a strictly smaller distance
    nearest = window_start - 256 + raw_frame
    for shift in (0, 256):
        guess = window_start + shift + raw_frame
        if abs(guess - latest_abs_frame) < abs(nearest - latest_abs_frame):
            nearest = guess
    return nearest
def _masked_square_view(
    image_cv: cv2.typing.MatLike, rotation: int
) -> tuple[cv2.typing.MatLike, int]:
    """Return the circle-masked, rotated centre square of the image and its nominal side length."""
    num_rows, num_cols = image_cv.shape[:2]
    # The view is a square of the SHORTER image dimension
    square_size = min(num_rows, num_cols)
    # Circular mask on the original image dimensions
    mask = np.zeros((num_rows, num_cols), np.uint8)
    cv2.circle(mask, (num_cols // 2, num_rows // 2), square_size // 2, 255, -1)
    image_masked = cv2.bitwise_and(image_cv, image_cv, mask=mask)
    # Rotate the masked image within its original frame
    rotation_matrix = cv2.getRotationMatrix2D(
        (num_cols / 2, num_rows / 2), rotation + 90, 1
    )
    image_rotated = cv2.warpAffine(
        image_masked, rotation_matrix, (num_cols, num_rows)
    )
    # Crop the centre square from the rotated image
    center_x, center_y = num_cols // 2, num_rows // 2
    half_size = square_size // 2
    cropped = image_rotated[
        center_y - half_size : center_y + half_size,
        center_x - half_size : center_x + half_size,
    ]
    return cropped, square_size


def _uncropped_square_view(
    image_cv: cv2.typing.MatLike, rotation: int
) -> tuple[cv2.typing.MatLike, int]:
    """Return the whole image rotated on a square canvas whose side is the image diagonal."""
    num_rows, num_cols = image_cv.shape[:2]
    # A square with the diagonal as side length never clips a corner,
    # whatever the rotation angle
    square_size = int(np.ceil(np.sqrt(num_cols**2 + num_rows**2)))
    rotation_matrix = cv2.getRotationMatrix2D(
        (num_cols / 2, num_rows / 2), rotation + 90, 1
    )
    # Shift the result so the image is centred on the larger canvas
    rotation_matrix[0, 2] += (square_size - num_cols) / 2
    rotation_matrix[1, 2] += (square_size - num_rows) / 2
    canvas = cv2.warpAffine(image_cv, rotation_matrix, (square_size, square_size))
    return canvas, square_size


def main() -> None:
    """
    Show the camera's video stream in an OpenCV window until the user quits.

    Commands and status queries are exchanged over one UDP socket ("meta"),
    the image data arrives on a second one ("vid"). Each video datagram
    carries one part of a frame; a frame is decoded and displayed once all of
    its parts have been received.

    Keyboard shortcuts:
        1 / 2 / 3 / 4   lock the view rotation to 0 / 90 / 180 / 270
        r               unlock the rotation (use the value from the stream)
        f               toggle between masked circle and full frame
        + / -           raise / lower the LED brightness in steps of 10
        w               write the last assembled frame to out.jpg
        d               toggle debug output
        q / Esc         quit

    Raises:
        TimeoutError: if the device does not answer during start-up, or if no
            video datagram arrives within the socket timeout.
    """
    debug = False
    buffer_size = 1500
    target_ip = "192.168.1.1"
    target_port_meta = 61502
    source_port_meta = 50262
    target_port_vid = 61503
    source_port_vid = 51320
    reply_timeout = 5.0
    meta_addr = (target_ip, target_port_meta)
    vid_addr = (target_ip, target_port_vid)
    brightness = 100
    win_name = "Video Stream"
    firstframe = True
    # keys that pin the view rotation to a fixed angle
    rotation_keys = {ord("1"): 0, ord("2"): 90, ord("3"): 180, ord("4"): 270}

    # The with-statement closes both sockets on every exit path, including a
    # failing bind() or an exception raised during shutdown.
    with (
        socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock_meta,
        socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock_vid,
    ):
        sock_meta.bind(("0.0.0.0", source_port_meta))
        # UDP replies can get lost; without a timeout a single lost reply
        # would block the viewer forever.
        sock_meta.settimeout(reply_timeout)
        sock_vid.bind(("0.0.0.0", source_port_vid))
        sock_vid.settimeout(reply_timeout)

        def request_meta(command: bytes) -> bytes:
            """Send one command on the meta socket and return the raw reply."""
            sock_meta.sendto(command, meta_addr)
            return sock_meta.recvfrom(buffer_size)[0]

        def print_reply(reply: bytes) -> None:
            """Print a reply as text; replies may contain bytes that are not valid UTF-8."""
            try:
                print("Received data:", reply.decode())
            except UnicodeDecodeError:
                print("UnicodeDecodeError, can be ignored")

        def query_battery() -> float | None:
            """Ask for the battery level (type=1001; presumably the battery query)."""
            reply = request_meta(b"type=1001\n")
            # Undecodable bytes are replaced so that a partly binary reply
            # cannot raise; get_battery_level reports unusable replies itself.
            return get_battery_level(reply.decode(errors="replace"))

        def set_brightness(value: int) -> None:
            """Send the LED brightness command and print the device's answer."""
            command = f"type=1003&value={value}\n".encode()
            print("Send data: ", command)
            try:
                reply = request_meta(command)
            except TimeoutError:
                print("no reply to brightness command")
                return
            print_reply(reply)

        try:
            # get system info
            print_reply(request_meta(b"type=1002\n"))
            battery_level: float | None = query_battery()
            print(f"Battery level: {battery_level}")
            # presumably the "start stream" command (counterpart of the stop
            # command below); sent three times according to captured traffic
            for _ in range(3):
                sock_vid.sendto(b"\x20\x36\x00\x02", vid_addr)
            # set led brightness to 100%
            print_reply(request_meta(b"type=1003&value=100\n"))

            cv2.namedWindow(win_name, flags=cv2.WINDOW_GUI_NORMAL)
            rotation_lock = False
            rotation = 0
            fullframe = False
            frame = 0
            pic_buf = b""
            # monotonic clock: the polling interval must not depend on
            # wall-clock adjustments
            keep_awake_time = time.monotonic()
            # Store received parts per frame
            # frame_number -> {part_number: pic_data}
            frames_dict: dict[int, dict[int, bytes]] = {}
            # number of parts required per frame
            parts_dict: dict[int, int] = {}

            while True:
                # read video stream
                reply = sock_vid.recvfrom(buffer_size)[0]
                if len(reply) < 4:
                    # too short to hold the four index bytes read below
                    continue
                raw_frame, frame_end, part, part_end = reply[:4]
                # bytes 4..7 are miscellaneous data; the first two are used as
                # the view rotation unless the user locked it
                if not rotation_lock:
                    rotation = int.from_bytes(reply[4:6], "big")
                pic_data = reply[8:]
                frame = absolute_frame_from_raw(raw_frame, frame)

                # store the part
                frames_dict.setdefault(frame, {})[part] = pic_data
                if debug:
                    print(
                        f"raw_frame={raw_frame}, frame={frame}, frame_end={frame_end}, part={part}, part_end={part_end}"
                    )

                # a datagram flagged frame_end == 1 tells how many parts the frame has
                if frame_end == 1:
                    parts_dict[frame] = part_end

                num_parts = parts_dict.get(frame)
                parts = frames_dict[frame]
                if num_parts is not None and all(
                    p in parts for p in range(num_parts)
                ):
                    pic_buf = b"".join(parts[i] for i in range(num_parts))
                    try:
                        image_np = np.array(Image.open(BytesIO(pic_buf)))
                        image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
                        if fullframe:
                            image_to_show, square_size = _uncropped_square_view(
                                image_cv, rotation
                            )
                        else:
                            image_to_show, square_size = _masked_square_view(
                                image_cv, rotation
                            )
                        if debug:
                            num_rows, num_cols = image_cv.shape[:2]
                            print(
                                f"image {num_rows}x{num_cols}, using window {square_size}x{square_size}"
                            )
                        if battery_level is not None:
                            draw_battery(
                                image_to_show,
                                x=square_size // 100,
                                y=square_size // 100,
                                width=square_size // 10,
                                height=square_size // 20,
                                level=battery_level,
                                thickness=square_size // 200,
                            )
                        cv2.imshow(win_name, image_to_show)
                        if firstframe:
                            cv2.resizeWindow(win_name, square_size, square_size)
                            firstframe = False
                    except OSError:
                        print("image corrupted")
                    else:
                        # delete earlier frame data
                        frames_dict = {
                            f: p for f, p in frames_dict.items() if f >= frame
                        }
                        parts_dict = {
                            f: n for f, n in parts_dict.items() if f >= frame
                        }
                        # poll the battery every 10 s (the variable name suggests
                        # this also keeps the device awake)
                        if time.monotonic() > keep_awake_time:
                            keep_awake_time = time.monotonic() + 10
                            prev_battery_level = battery_level
                            try:
                                battery_level = query_battery()
                            except OSError as err:
                                # keep showing the last known level
                                print(f"battery query failed: {err}")
                            if prev_battery_level != battery_level:
                                print(f"Battery level: {battery_level}")

                # process UI events (e.g. window closing) and poll for a keypress
                key = cv2.pollKey() & 0xFF
                if key in rotation_keys:
                    rotation_lock = True
                    rotation = rotation_keys[key]
                elif key == ord("r"):
                    rotation_lock = False
                elif (
                    key == ord("q")
                    or key == 27
                    or cv2.getWindowProperty(win_name, cv2.WND_PROP_AUTOSIZE) == -1
                ):
                    print("window closed")
                    break
                elif key == ord("w"):
                    with open("out.jpg", "wb") as fd:
                        ret = fd.write(pic_buf)
                    print(f"Wrote {ret} bytes to out.jpg")
                elif key == ord("+"):
                    brightness = min(100, brightness + 10)
                    set_brightness(brightness)
                elif key == ord("-"):
                    brightness = max(0, brightness - 10)
                    set_brightness(brightness)
                elif key == ord("f"):
                    fullframe = not fullframe
                elif key == ord("d"):
                    debug = not debug
        finally:
            try:
                # stop stream
                sock_vid.sendto(b"\x20\x37", vid_addr)
            except OSError as err:
                # do not let a failing stop command hide the original error
                print(f"failed to send stop command: {err}")
            cv2.destroyAllWindows()
# Run the viewer only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
{
"nodes": {
"flake-compat": {
"flake": false,
"locked": {
"lastModified": 1767039857,
"narHash": "sha256-vNpUSpF5Nuw8xvDLj2KCwwksIbjua2LZCqhV1LNRDns=",
"owner": "NixOS",
"repo": "flake-compat",
"rev": "5edf11c44bc78a0d334f6334cdaf7d60d732daab",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "flake-compat",
"type": "github"
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"git-hooks": {
"inputs": {
"flake-compat": "flake-compat",
"gitignore": "gitignore",
"nixpkgs": "nixpkgs"
},
"locked": {
"lastModified": 1772665116,
"narHash": "sha256-XmjUDG/J8Z8lY5DVNVUf5aoZGc400FxcjsNCqHKiKtc=",
"owner": "cachix",
"repo": "git-hooks.nix",
"rev": "39f53203a8458c330f61cc0759fe243f0ac0d198",
"type": "github"
},
"original": {
"owner": "cachix",
"repo": "git-hooks.nix",
"type": "github"
}
},
"gitignore": {
"inputs": {
"nixpkgs": [
"git-hooks",
"nixpkgs"
]
},
"locked": {
"lastModified": 1709087332,
"narHash": "sha256-HG2cCnktfHsKV0s4XW83gU3F57gaTljL9KNSuG6bnQs=",
"owner": "hercules-ci",
"repo": "gitignore.nix",
"rev": "637db329424fd7e46cf4185293b9cc8c88c95394",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "gitignore.nix",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1770073757,
"narHash": "sha256-Vy+G+F+3E/Tl+GMNgiHl9Pah2DgShmIUBJXmbiQPHbI=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "47472570b1e607482890801aeaf29bfb749884f6",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1772598333,
"narHash": "sha256-YaHht/C35INEX3DeJQNWjNaTcPjYmBwwjFJ2jdtr+5U=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "fabb8c9deee281e50b1065002c9828f2cf7b2239",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-25.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"git-hooks": "git-hooks",
"nixpkgs": "nixpkgs_2"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}
# Flake that packages endscopetool and defines its checks, formatter and development shell.
{
  description = "A flake for endscopetool";
  inputs = {
    nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.11";
    flake-utils.url = "github:numtide/flake-utils";
    git-hooks.url = "github:cachix/git-hooks.nix";
  };
  outputs =
    {
      self,
      nixpkgs,
      flake-utils,
      git-hooks,
    }:
    # Build the same set of outputs for each of flake-utils' default systems.
    flake-utils.lib.eachDefaultSystem (
      system:
      let
        pkgs = import nixpkgs { inherit system; };
        python = pkgs.python313;
        # Python packages the script imports, as a function of a Python package set.
        # NOTE(review): enableGtk3 is presumably needed so that OpenCV's window
        # functions have a GUI backend — confirm.
        deps = ps: [
          (ps.opencv4.override { enableGtk3 = true; })
          ps.numpy
          ps.pillow
        ];
        # Interpreter with the runtime packages plus mypy and the Pillow type stubs;
        # its mypy binary is the one the pre-commit hook below runs.
        python-with-mypy = python.withPackages (
          ps:
          (deps ps)
          ++ [
            ps.mypy
            ps.types-pillow
          ]
        );
        # The application itself, built from this directory via its pyproject.toml.
        endscopetool = python.pkgs.buildPythonApplication {
          pname = "endscopetool";
          version = "0.1.0";
          pyproject = true;
          src = ./.;
          nativeBuildInputs = [
            python.pkgs.setuptools
          ];
          dependencies = deps python.pkgs;
          buildInputs = [ pkgs.gtk3 ];
        };
        # Pre-commit hooks: Nix formatting, mypy type checking, ruff linting and formatting.
        pre-commit-check = git-hooks.lib.${system}.run {
          src = ./.;
          hooks = {
            nixfmt-rfc-style.enable = true;
            mypy = {
              enable = true;
              settings = {
                binPath = "${python-with-mypy}/bin/mypy";
              };
            };
            ruff.enable = true;
            ruff-format.enable = true;
          };
        };
      in
      {
        packages.default = endscopetool;
        apps.default = {
          type = "app";
          program = "${endscopetool}/bin/endscopetool";
        };
        checks = {
          inherit pre-commit-check;
        };
        # The formatter is a wrapper script that runs every configured
        # pre-commit hook on all files.
        formatter =
          let
            config = self.checks.${system}.pre-commit-check.config;
            script = ''
              ${pkgs.lib.getExe config.package} run --all-files --config ${config.configFile}
            '';
          in
          pkgs.writeShellScriptBin "pre-commit-run" script;
        # Development shell: the hook tools, nixfmt, GTK 3 and a Python with the
        # runtime packages; shellHook comes from the pre-commit check.
        devShells.default = pkgs.mkShell {
          inherit (pre-commit-check) shellHook;
          buildInputs = pre-commit-check.enabledPackages;
          packages = [
            pkgs.nixfmt-rfc-style
            pkgs.gtk3
            (python.withPackages deps)
          ];
        };
      }
    );
}
# Packaging metadata for the single-module endscopetool application.
[project]
name = "endscopetool"
version = "0.1.0"
description = "Python implementation of the endscopetool Android application"
requires-python = ">=3.12"
# NOTE(review): on PyPI the OpenCV bindings are published as "opencv-python";
# "opencv" presumably matches the distribution name of the nixpkgs opencv4
# package used by the flake — confirm before installing with pip.
dependencies = [
"numpy",
"pillow",
"opencv"
]
# Installs an `endscopetool` command that calls main() in endscopetool.py.
[project.scripts]
endscopetool = "endscopetool:main"
# The project is a single top-level module, not a package directory.
[tool.setuptools]
py-modules = ["endscopetool"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment