Last active
November 1, 2025 13:10
-
-
Save XxUnkn0wnxX/b65b244c0b4ac39909988a58b6024d1b to your computer and use it in GitHub Desktop.
A toolkit to try and test why the PS5Mate Pro refuses to talk when in bootloader mode. Ensure hidapi is installed via pip.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Interactive HID test harness for the PS5Mate Pro.""" | |
| from __future__ import annotations | |
| import sys | |
| import time | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Callable, Iterable, Optional | |
| import hid | |
| # Vendor IDs observed for the PS5Mate Pro updater utility | |
| # find_device() enumerates them in the order listed here. | |
| VENDOR_IDS = (0x6A3A, 0x1ABE) | |
| # HID usage page / usage an enumerated interface must report to be selected by find_device(). | |
| USAGE_PAGE = 0x0002 | |
| USAGE = 0x01 | |
| # HID report parameters | |
| # READ_BUFFER_SIZE is the byte count requested per read and the default padding length for outgoing reports. | |
| READ_BUFFER_SIZE = 64 | |
| # Default time to wait for a response to a command, in milliseconds. | |
| COMMAND_TIMEOUT_MS = 5000 | |
| # Bootloader command constants (matching bootloader.js) | |
| # Command codes are packed as little-endian 16-bit values into the frame header. | |
| BOOTLOADER_CMD_NOT_SUPPORT = 0x0000 | |
| BOOTLOADER_CMD_PING = 0x0001 | |
| BOOTLOADER_CMD_DOWNLOAD_512 = 0x0101 | |
| BOOTLOADER_CMD_DOWNLOAD_50 = 0x0102 | |
| BOOTLOADER_CMD_UPLOAD_512 = 0x0201 | |
| BOOTLOADER_CMD_UPLOAD_50 = 0x0202 | |
| # Timeout in milliseconds; despite the UPLOAD_50 name, download_file() uses it while waiting for each chunk's acknowledgement. | |
| BOOTLOADER_CMD_UPLOAD_50_ACK_TIMEOUT_MS = 500 | |
| # Area indices sent as the one-byte area field of transfer commands; prompt_area() labels them APP, DATA, SN_HW and SN_SW. | |
| BOOTLOADER_AREA_APP_IDX = 0 | |
| BOOTLOADER_AREA_DATA_IDX = 1 | |
| BOOTLOADER_AREA_SN_HW_IDX = 2 | |
| BOOTLOADER_AREA_SN_SW_IDX = 3 | |
class DeviceDisconnected(Exception):
    """Signal that the HID device can no longer be read from or written to.

    The I/O helpers in this file raise it when a read or write fails, so the
    caller can drop the handle and wait for the device to come back.
    """
def hex_bytes(data: Iterable[int]) -> str:
    """Render *data* as space-separated, zero-padded lowercase hex pairs.

    Args:
        data: Integers to format (typically byte values from a HID report).

    Returns:
        A string such as ``"00 ff 10"``; empty when *data* is empty.
    """
    pairs = [format(value, "02x") for value in data]
    return " ".join(pairs)
def pack_le16(value: int) -> bytes:
    """Encode *value* as an unsigned 16-bit little-endian byte string.

    Raises:
        OverflowError: If *value* is negative or does not fit in two bytes.
    """
    # to_bytes() is unsigned by default, so negative values are rejected.
    return value.to_bytes(2, "little")
def pack_le32(value: int) -> bytes:
    """Encode *value* as an unsigned 32-bit little-endian byte string.

    Raises:
        OverflowError: If *value* is negative or does not fit in four bytes.
    """
    # to_bytes() is unsigned by default, so negative values are rejected.
    return value.to_bytes(4, "little")
def make_ping_payload() -> bytes:
    """Build the 6-byte BOOTLOADER_CMD_PING frame.

    The frame is three little-endian 16-bit fields: a request index of 0,
    the ping command code, and a trailing 0 (nothing follows the header).
    """
    fields = (0, BOOTLOADER_CMD_PING, 0)
    return b"".join(pack_le16(field) for field in fields)
def bootloader_cmd_get_upload_50(req_idx: int, area_idx: int, pos: int, size: int) -> bytes:
    """Build an UPLOAD_50 request frame.

    Layout: request index (LE16), command code (LE16), the value 7 (LE16),
    then the 7-byte parameter block: area index (1 byte), size (LE16) and
    position (LE32).

    Args:
        req_idx: Request index placed in the first header field.
        area_idx: Area index; must fit in a single byte.
        pos: Position encoded as 32-bit little-endian.
        size: Byte count encoded as 16-bit little-endian.

    Returns:
        The 13-byte frame, without report ID or padding.
    """
    return b"".join(
        (
            pack_le16(req_idx),
            pack_le16(BOOTLOADER_CMD_UPLOAD_50),
            pack_le16(7),  # length of the parameter block that follows
            bytes([area_idx]),
            pack_le16(size),
            pack_le32(pos),
        )
    )
def bootloader_cmd_get_download_50(req_idx: int, area_idx: int, pos: int, data: bytes, size: int) -> bytes:
    """Build a DOWNLOAD_50 frame carrying up to 50 data bytes.

    Layout: request index (LE16), command code (LE16), the value 57 (LE16),
    a 7-byte parameter block (area index, size as LE16, position as LE32),
    then exactly 50 data bytes.

    Args:
        req_idx: Request index placed in the first header field.
        area_idx: Area index; must fit in a single byte.
        pos: Position encoded as 32-bit little-endian.
        data: Chunk contents; zero-padded or truncated to 50 bytes.
        size: Byte count written into the size field (independent of
            ``len(data)``).

    Returns:
        The 63-byte frame, without report ID or padding.
    """
    return b"".join(
        (
            pack_le16(req_idx),
            pack_le16(BOOTLOADER_CMD_DOWNLOAD_50),
            pack_le16(7 + 50),  # parameter block plus fixed 50-byte data field
            bytes([area_idx]),
            pack_le16(size),
            pack_le32(pos),
            data[:50].ljust(50, b"\x00"),
        )
    )
def bootloader_cmd_get_download_512(req_idx: int, area_idx: int, pos: int, data: bytes) -> bytes:
    """Build a DOWNLOAD_512 frame carrying one 512-byte block.

    Layout: request index (LE16), command code (LE16), the value 519 (LE16),
    a 7-byte parameter block (area index, the constant size 512 as LE16,
    position as LE32), then exactly 512 data bytes.

    Args:
        req_idx: Request index placed in the first header field.
        area_idx: Area index; must fit in a single byte.
        pos: Position encoded as 32-bit little-endian.
        data: Block contents; zero-padded or truncated to 512 bytes.

    Returns:
        The 525-byte frame, without report ID or padding.
    """
    return b"".join(
        (
            pack_le16(req_idx),
            pack_le16(BOOTLOADER_CMD_DOWNLOAD_512),
            pack_le16(7 + 512),  # parameter block plus fixed 512-byte data field
            bytes([area_idx]),
            pack_le16(512),  # size field is always 512, even for a padded block
            pack_le32(pos),
            data[:512].ljust(512, b"\x00"),
        )
    )
def find_device():
    """Return the first enumerated HID entry matching our VID and usage filters.

    Vendor IDs are tried in the order given by ``VENDOR_IDS``; within each,
    the first entry whose ``usage_page`` and ``usage`` equal ``USAGE_PAGE``
    and ``USAGE`` wins.

    Returns:
        The matching device-info mapping from ``hid.enumerate``, or ``None``
        when nothing matches.
    """
    for vid in VENDOR_IDS:
        hit = next(
            (
                entry
                for entry in hid.enumerate(vid, 0)  # product ID 0: no PID filter
                if entry.get("usage_page") == USAGE_PAGE and entry.get("usage") == USAGE
            ),
            None,
        )
        if hit is not None:
            return hit
    return None
def wait_for_device() -> tuple[hid.device, dict]:
    """Block until a matching device has been found and opened.

    Polls ``find_device()`` once per second. When an entry is found, the
    device is opened by path and switched to blocking mode; a failed open is
    reported and retried after the same one-second pause.

    Returns:
        ``(device, device_info)`` — the open handle and its enumeration entry.
    """
    print("Waiting for PS5Mate device...")
    while True:
        info = find_device()
        if info:
            print(
                "Opening device",
                f"VID=0x{info['vendor_id']:04x}",
                f"PID=0x{info['product_id']:04x}",
                info.get("product_string", ""),
            )
            try:
                handle = hid.device()
                handle.open_path(info["path"])
                handle.set_nonblocking(False)
            except OSError as exc:
                print(f"Failed to open HID device: {exc}. Retrying...")
            else:
                return handle, info
        # Reached when nothing was found and after a failed open attempt.
        time.sleep(1)
def open_device():
    """Wait for the device and return ``(device, info)`` from ``wait_for_device()``."""
    return wait_for_device()
def read_once(device: hid.device, timeout_ms: int = COMMAND_TIMEOUT_MS) -> Optional[bytes]:
    """Read one report, logging either its contents or the timeout.

    Args:
        device: Open HID handle.
        timeout_ms: How long to wait for data, in milliseconds.

    Returns:
        The received bytes, or ``None`` when nothing arrived in time.

    Raises:
        DeviceDisconnected: If the underlying read raises ``OSError``.
    """
    try:
        received = device.read(READ_BUFFER_SIZE, timeout_ms=timeout_ms)
    except OSError as exc:
        raise DeviceDisconnected(f"Read error: {exc}") from exc
    if received:
        print("Received:", hex_bytes(received))
        return bytes(received)
    print("No data received (timeout).")
    return None
def read_optional(device: hid.device, timeout_ms: int) -> Optional[bytes]:
    """Read one report, staying silent when nothing arrives.

    Same as ``read_once`` except that a timeout prints nothing, leaving the
    caller to decide how to report it.

    Args:
        device: Open HID handle.
        timeout_ms: How long to wait for data, in milliseconds.

    Returns:
        The received bytes, or ``None`` when nothing arrived in time.

    Raises:
        DeviceDisconnected: If the underlying read raises ``OSError``.
    """
    try:
        received = device.read(READ_BUFFER_SIZE, timeout_ms=timeout_ms)
    except OSError as exc:
        raise DeviceDisconnected(f"Read error: {exc}") from exc
    if not received:
        return None
    print("Received:", hex_bytes(received))
    return bytes(received)
def send_report(device: hid.device, payload: bytes, report_id: int, pad_to: int = READ_BUFFER_SIZE) -> bool:
    """Write *payload* to the device as one report and log the bytes sent.

    The report is the one-byte report ID followed by *payload*, zero-padded
    on the right to ``pad_to + 1`` bytes when it is shorter than that.

    Args:
        device: Open HID handle.
        payload: Frame bytes to send after the report ID.
        report_id: Report ID prefix; must fit in a single byte.
        pad_to: Minimum payload length after padding; a falsy value disables
            padding.

    Returns:
        ``True`` once the whole report has been written.

    Raises:
        DeviceDisconnected: If the write raises ``OSError`` or writes fewer
            bytes than the report contains.
    """
    report = bytes([report_id]) + payload
    if pad_to:
        # ljust() never truncates, so longer reports pass through unchanged.
        report = report.ljust(pad_to + 1, b"\x00")
    print(f"Sending report (reportId=0x{report_id:02x}): {hex_bytes(report)}")
    try:
        written = device.write(report)
    except OSError as exc:
        raise DeviceDisconnected(f"Write failed: {exc}") from exc
    expected = len(report)
    if written != expected:
        raise DeviceDisconnected(
            f"Short write: {written} of {expected} bytes (device likely disconnected)."
        )
    return True
def send_ping(device: hid.device, report_id: int) -> None:
    """Send a PING frame and read one response with the default timeout.

    Args:
        device: Open HID handle.
        report_id: Report ID to prefix the frame with.

    Raises:
        DeviceDisconnected: Propagated from the send or the read.
    """
    if not send_report(device, make_ping_payload(), report_id):
        return
    print(f"Waiting up to {COMMAND_TIMEOUT_MS} ms for ping response…")
    read_once(device, timeout_ms=COMMAND_TIMEOUT_MS)
def request_upload_50(device: hid.device, report_id: int) -> None:
    """Prompt for an area and size, send an UPLOAD_50 request, read one reply.

    The request always uses position 0. The size entered is clamped to the
    range 1..50; a blank answer means 50. The request is abandoned when the
    area prompt is cancelled, input ends, or the size is not a number.

    Args:
        device: Open HID handle.
        report_id: Report ID to prefix the frame with.

    Raises:
        DeviceDisconnected: Propagated from the send or the read.
    """
    area_idx = prompt_area(default=BOOTLOADER_AREA_SN_HW_IDX)
    if area_idx is None:
        return

    try:
        raw_size = input("Chunk size (bytes, max 50) [default 50]: ").strip()
    except EOFError:
        print()
        return

    if raw_size:
        try:
            requested = int(raw_size, 0)
        except ValueError:
            print("Invalid size; aborting request.")
            return
        size = min(50, max(1, requested))
    else:
        size = 50

    frame = bootloader_cmd_get_upload_50(0, area_idx, 0, size)
    if send_report(device, frame, report_id):
        print(f"Waiting up to {COMMAND_TIMEOUT_MS} ms for upload response…")
        read_once(device, timeout_ms=COMMAND_TIMEOUT_MS)
def prompt_area(
    *,
    default: int = BOOTLOADER_AREA_SN_HW_IDX,
    allowed: Optional[tuple[int, ...]] = None,
    prompt_override: Optional[str] = None,
) -> Optional[int]:
    """Ask the user for a bootloader area index.

    Args:
        default: Value returned when the user just presses Enter.
        allowed: Acceptable indices; all four known areas when ``None``.
        prompt_override: Prompt text to show instead of the generated one.

    Returns:
        The chosen index, or ``None`` when input ends, the text is not an
        integer, or the value is not in *allowed*.
    """
    if allowed is None:
        allowed = (
            BOOTLOADER_AREA_APP_IDX,
            BOOTLOADER_AREA_DATA_IDX,
            BOOTLOADER_AREA_SN_HW_IDX,
            BOOTLOADER_AREA_SN_SW_IDX,
        )

    if prompt_override is not None:
        prompt_text = prompt_override
    else:
        names = {
            BOOTLOADER_AREA_APP_IDX: "APP",
            BOOTLOADER_AREA_DATA_IDX: "DATA",
            BOOTLOADER_AREA_SN_HW_IDX: "SN_HW",
            BOOTLOADER_AREA_SN_SW_IDX: "SN_SW",
        }
        # Unknown indices fall back to their number as the label.
        entries = ", ".join(f"{idx}={names.get(idx, str(idx))}" for idx in allowed)
        prompt_text = f"Area index ({entries}) [default {default}]: "

    try:
        answer = input(prompt_text).strip()
    except EOFError:
        print()
        return None

    if answer == "":
        return default

    try:
        chosen = int(answer, 0)  # base 0 accepts decimal, 0x.., 0o.., 0b..
    except ValueError:
        print("Invalid area index; aborting request.")
        return None

    if chosen in allowed:
        return chosen
    print("Area index out of range; aborting request.")
    return None
def prompt_file_path(prompt: str) -> Optional[Path]:
    """Ask for a file path and return it only if it names an existing file.

    Args:
        prompt: Text shown to the user.

    Returns:
        The path with ``~`` expanded, or ``None`` when input ends, nothing is
        entered, the path does not exist, or it is not a regular file. Each
        rejection prints its reason.
    """
    try:
        entered = input(prompt).strip()
    except EOFError:
        print()
        return None

    if entered == "":
        print("No path provided.")
        return None

    candidate = Path(entered).expanduser()
    if not candidate.exists():
        problem = f"File not found: {candidate}"
    elif not candidate.is_file():
        problem = f"Not a file: {candidate}"
    else:
        return candidate
    print(problem)
    return None
def download_file(device: hid.device, report_id: int, chunk_size: int) -> None:
    """Send a file to the APP or DATA area in DOWNLOAD_50 or DOWNLOAD_512 frames.

    Prompts for the target area and a file path, then sends the file chunk by
    chunk, waiting briefly for an acknowledgement after each frame. A missing
    acknowledgement is reported but does not stop the transfer.

    Args:
        device: Open HID handle.
        report_id: Report ID to prefix each frame with.
        chunk_size: ``50`` selects DOWNLOAD_50 framing; any other value
            selects DOWNLOAD_512 framing.

    Raises:
        DeviceDisconnected: Propagated from sending a frame or reading an
            acknowledgement.
    """
    area_idx = prompt_area(
        default=BOOTLOADER_AREA_APP_IDX,
        allowed=(BOOTLOADER_AREA_APP_IDX, BOOTLOADER_AREA_DATA_IDX),
        prompt_override="Area index (0=APP,1=DATA) [default 0]: ",
    )
    if area_idx is None:
        return
    path = prompt_file_path("Path to firmware/data file: ")
    if path is None:
        return
    # The file may be unreadable (permissions, removed since the check);
    # report it and return to the menu instead of crashing the harness.
    try:
        data = path.read_bytes()
    except OSError as exc:
        print(f"Could not read {path}: {exc}")
        return
    total_len = len(data)
    if total_len == 0:
        print("File is empty; nothing to send.")
        return
    if chunk_size == 512 and total_len % 512 != 0:
        print(
            "Warning: file size is not a multiple of 512 bytes. "
            "The last 512-byte frame will be padded, matching the browser updater's behaviour."
        )
    print(f"Sending {total_len} bytes to area {area_idx} in {chunk_size}-byte chunks.")
    offset = 0
    chunk_count = 0
    # Monotonic clock: the measured duration is immune to wall-clock changes.
    start_time = time.monotonic()
    while offset < total_len:
        remaining = total_len - offset
        if chunk_size == 50:
            # Match browser logic: keep chunks <=50 bytes and never cross 512-byte page boundaries.
            boundary = 512 - (offset % 512)
            actual_size = min(remaining, 50, boundary)
            chunk = data[offset : offset + actual_size]
            payload = bootloader_cmd_get_download_50(0, area_idx, offset, chunk, len(chunk))
        else:
            actual_size = min(remaining, 512)
            chunk = data[offset : offset + actual_size]
            payload = bootloader_cmd_get_download_512(0, area_idx, offset, chunk)
        send_report(device, payload, report_id)
        ack = read_optional(device, timeout_ms=BOOTLOADER_CMD_UPLOAD_50_ACK_TIMEOUT_MS)
        if not ack:
            print("(No acknowledgement received for this chunk.)")
        offset += actual_size
        chunk_count += 1
        progress = min(100.0, (offset / total_len) * 100.0)
        print(f"Chunk {chunk_count}: offset={offset} bytes ({progress:.2f}% complete)")
    duration = time.monotonic() - start_time
    print(f"Transfer complete in {duration:.2f}s ({chunk_count} chunks).")
def advanced_ping(device: hid.device, report_id: int) -> None:
    """Probe the bootloader with a zero-length DOWNLOAD_50 frame.

    Prompts for the APP or DATA area, sends a DOWNLOAD_50 frame with
    position 0, size 0 and no data, then waits for one response using the
    default command timeout.

    Args:
        device: Open HID handle.
        report_id: Report ID to prefix the frame with.

    Raises:
        DeviceDisconnected: Propagated from the send or the read.
    """
    target_area = prompt_area(
        default=BOOTLOADER_AREA_APP_IDX,
        allowed=(BOOTLOADER_AREA_APP_IDX, BOOTLOADER_AREA_DATA_IDX),
        prompt_override="Area index for advanced ping (0=APP,1=DATA) [default 0]: ",
    )
    if target_area is None:
        return

    print("Sending zero-length DOWNLOAD_50 frame to probe bootloader acknowledgements.")
    frame = bootloader_cmd_get_download_50(0, target_area, 0, b"", 0)
    send_report(device, frame, report_id)
    if not read_optional(device, timeout_ms=COMMAND_TIMEOUT_MS):
        print("No response received to advanced ping.")
def configure_report(device_state: "DeviceState") -> None:
    """Interactively change the report ID stored on *device_state*.

    Shows the current ID, then reads a new one in any notation accepted by
    ``int(text, 0)`` (decimal, ``0x..``, ``0o..``, ``0b..``). The stored ID
    is left untouched when input ends, the answer is blank, the text is not
    an integer, or the value does not fit in one byte.

    Args:
        device_state: Object whose ``report_id`` attribute is read and, on
            valid input, updated.
    """
    print(f"Current report ID: 0x{device_state.report_id:02x}")
    try:
        new_value = input("Enter new report ID (hex or decimal, blank to cancel): ").strip()
    except EOFError:
        print()
        return
    if not new_value:
        print("Report ID unchanged.")
        return
    try:
        candidate = int(new_value, 0)
    except ValueError:
        print("Invalid value; report ID unchanged.")
        return
    # The report ID is later sent as a single byte; an out-of-range value
    # would make building that byte raise ValueError and crash the menu.
    if not 0 <= candidate <= 0xFF:
        print("Report ID must fit in one byte (0x00-0xff); report ID unchanged.")
        return
    device_state.report_id = candidate
    print(f"Report ID set to 0x{device_state.report_id:02x}")
@dataclass
class DeviceState:
    """Mutable per-connection state shared by the menu actions."""

    # Open HID handle for the current connection.
    device: hid.device
    # Report ID prefixed to every outgoing report; changeable from the menu.
    report_id: int = 0x00
def menu_loop(device: hid.device) -> None:
    """Run the interactive menu until the user quits or input ends.

    Each entry maps a key to a description and a handler that receives the
    device. Handlers read the report ID from shared state at call time, so a
    change made through the "Change report ID" entry applies to later
    commands.

    Args:
        device: Open HID handle passed to every handler.

    Raises:
        DeviceDisconnected: Re-raised after being reported, so the caller can
            reconnect.
    """
    state = DeviceState(device=device)

    # A handler of None marks the quit entry.
    actions: dict[str, tuple[str, Callable[[hid.device], None] | None]] = {
        "1": ("Send PING (BOOTLOADER_CMD_PING)", lambda dev: send_ping(dev, state.report_id)),
        "2": (
            "Advanced ping (fake DOWNLOAD to APP/DATA)",
            lambda dev: advanced_ping(dev, state.report_id),
        ),
        "3": ("Request UPLOAD_50 slice", lambda dev: request_upload_50(dev, state.report_id)),
        "4": ("Read once (no command)", lambda dev: read_once(dev, timeout_ms=COMMAND_TIMEOUT_MS)),
        "5": ("Change report ID", lambda dev: configure_report(state)),
        "6": (
            "DOWNLOAD_50 (flash APP/DATA in 50B chunks)",
            lambda dev: download_file(dev, state.report_id, chunk_size=50),
        ),
        "7": (
            "DOWNLOAD_512 (flash APP/DATA in 512B chunks)",
            lambda dev: download_file(dev, state.report_id, chunk_size=512),
        ),
        "q": ("Quit", None),
    }

    menu_lines = [f" {key}) {desc}" for key, (desc, _) in actions.items()]
    prompt = "\nSelect action:\n" + "\n".join(menu_lines) + "\n> "

    while True:
        try:
            choice = input(prompt).strip().lower()
        except EOFError:
            print()
            return

        if not choice:
            continue

        entry = actions.get(choice)
        if entry is None:
            print("Unknown option. Try again.")
            continue

        desc, handler = entry
        if handler is None:
            print("Closing device.")
            return

        print(f"\n[{desc}]")
        try:
            handler(device)
        except DeviceDisconnected as exc:
            print(f"Device disconnected: {exc}")
            raise
def main() -> None:
    """Open the device and run the menu, reconnecting after a disconnect.

    Each iteration waits for a device, runs the menu on it, and always closes
    the handle afterwards. A ``DeviceDisconnected`` from the menu starts
    another iteration; a normal return from the menu ends the program.
    """
    while True:
        device, _info = open_device()
        print("Device opened. Connection will remain active until you exit.")
        try:
            menu_loop(device)
        except DeviceDisconnected:
            print("Waiting for device to reconnect before resuming menu...\n")
            # No close here: the finally clause is the single cleanup point
            # and runs before the loop continues.
            continue
        finally:
            try:
                device.close()
            except OSError:
                # Best effort: the device may already be gone.
                pass
            print("Device closed.")
        # menu_loop exited normally (user selected Quit)
        break
# Script entry point: run the harness; on Ctrl-C, exit with status 130.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Interrupted by user.")
        raise SystemExit(130)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment