Created
September 2, 2025 14:29
-
-
Save adamziel/b7f2a43510a9b8e20f456142d6ef01cc to your computer and use it in GitHub Desktop.
Syntax-highlighted, debuggable PHP in CDP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Serves a syntax-highlighted, source-mapped, debuggable file to Chrome devtools | |
| over the CDP protocol. | |
| """ | |
| import asyncio | |
| import websockets | |
| import json | |
| import base64 | |
class CDPServer:
    """Minimal Chrome DevTools Protocol (CDP) endpoint exposing one PHP script.

    The server answers just enough of the ``Runtime`` and ``Debugger`` domains
    for a DevTools front-end to list a single script, fetch its source and
    set/remove breakpoints on it. The script is announced with an inline
    (base64 ``data:`` URL) source map that carries the PHP text in
    ``sourcesContent``.

    Attributes:
        clients: Currently connected websocket objects.
        scripts: ``scriptId -> {"content": str, "url": str}`` for every script
            that has been announced to a client.
        breakpoints: ``breakpointId -> {"url": str, "lineNumber": int}``.
        scripts_sent: ``id(websocket)`` of clients that already received the
            ``Debugger.scriptParsed`` event.
    """

    # Script id announced in Debugger.scriptParsed and echoed in breakpoint locations.
    SCRIPT_ID = "2"
    # URL the script itself is reported under.
    UNHIGHLIGHTED_SCRIPT_URL = "file:///.sources/hello.php"
    # URL listed in the source map's "sources"; presumably the entry DevTools
    # shows syntax-highlighted -- confirm against the front-end behaviour.
    HIGHLIGHTED_SCRIPT_URL = "file:///project/hello.php"
    # The backslash is escaped so the PHP text keeps its own "\n" escape
    # sequence instead of Python turning it into a real line break inside the
    # PHP string literal.
    PHP_SOURCE = (
        "<?php\n"
        "function greet() {\n"
        'echo "Hello from PHP source!\\n";\n'
        "}\n"
        "greet();\n"
    )

    def __init__(self):
        self.clients = set()
        self.scripts = {}
        self.breakpoints = {}
        self.scripts_sent = set()
        # Monotonic counter: ids are never reused, even after a removal.
        self._next_breakpoint_number = 0

    async def register_client(self, websocket):
        """Remember a newly connected client and log its address."""
        self.clients.add(websocket)
        print(f"Client connected: {websocket.remote_address}")

    async def unregister_client(self, websocket):
        """Forget a client and its "script already sent" marker."""
        self.clients.discard(websocket)
        self.scripts_sent.discard(id(websocket))
        print(f"Client disconnected: {websocket.remote_address}")

    @staticmethod
    def _identity_mappings(line_count):
        """Return source-map ``mappings`` mapping each line to the same line.

        ``AAAA`` maps column 0 of the first generated line to line 0, column 0
        of source 0; every following ``AACA`` segment advances the source line
        by one.
        """
        if line_count <= 0:
            return ""
        return ";".join(["AAAA"] + ["AACA"] * (line_count - 1))

    def _source_map_data_url(self, source):
        """Build the inline base64 source map URL with *source* embedded."""
        map_obj = {
            "version": 3,
            "file": self.UNHIGHLIGHTED_SCRIPT_URL,
            "sources": [self.HIGHLIGHTED_SCRIPT_URL],
            "sourcesContent": [source],
            "names": [],
            # One segment per line, counting the empty line after the final "\n".
            "mappings": self._identity_mappings(source.count("\n") + 1),
        }
        encoded_map = base64.b64encode(json.dumps(map_obj).encode("utf-8")).decode("ascii")
        return f"data:application/json;base64,{encoded_map}"

    async def send_script_parsed(self, websocket):
        """Send the ``Debugger.scriptParsed`` event, once per client connection."""
        client_id = id(websocket)
        if client_id in self.scripts_sent:
            return

        source = self.PHP_SOURCE
        event = {
            "method": "Debugger.scriptParsed",
            "params": {
                "scriptId": self.SCRIPT_ID,
                "url": self.UNHIGHLIGHTED_SCRIPT_URL,
                "startLine": 0,
                "startColumn": 0,
                "endLine": source.count("\n"),
                "endColumn": 0,
                "executionContextId": 1,
                "hash": "def456",
                "sourceMapURL": self._source_map_data_url(source),
            },
        }
        await websocket.send(json.dumps(event))

        # Record the script so Debugger.getScriptSource can serve it, and mark
        # this client as done.
        self.scripts[self.SCRIPT_ID] = {
            "content": source,
            "url": self.UNHIGHLIGHTED_SCRIPT_URL,
        }
        self.scripts_sent.add(client_id)

    def _set_breakpoint(self, params):
        """Store a breakpoint and return the ``setBreakpointByUrl`` result."""
        line_number = params.get("lineNumber")
        # A length-based id would collide with a live breakpoint after a
        # removal, so the id comes from a counter that only grows.
        breakpoint_id = f"bp_{self._next_breakpoint_number}"
        self._next_breakpoint_number += 1
        self.breakpoints[breakpoint_id] = {
            "url": params.get("url", ""),
            "lineNumber": line_number,
        }
        return {
            "breakpointId": breakpoint_id,
            "locations": [{
                "scriptId": self.SCRIPT_ID,
                "lineNumber": line_number,
                "columnNumber": 0,
            }],
        }

    async def _build_response(self, websocket, data):
        """Compute the reply for one decoded CDP request object."""
        method = data.get("method", "")
        # Treat a missing or null "params" as empty.
        params = data.get("params") or {}
        response = {"id": data.get("id")}

        if method == "Debugger.enable":
            response["result"] = {"debuggerId": "debugger-1"}
            # Send script parsed after debugger is enabled
            await self.send_script_parsed(websocket)
        elif method == "Debugger.setBreakpointByUrl":
            response["result"] = self._set_breakpoint(params)
        elif method == "Debugger.removeBreakpoint":
            self.breakpoints.pop(params.get("breakpointId"), None)
            response["result"] = {}
        elif method == "Debugger.getScriptSource":
            script_id = params.get("scriptId")
            script = self.scripts.get(script_id)
            if script is None:
                print(f"Script not found: {script_id!r} (known: {sorted(self.scripts)})")
                response["error"] = {"code": -1, "message": "Script not found"}
            else:
                response["result"] = {"scriptSource": script["content"]}
        else:
            # Runtime.enable, Runtime.runIfWaitingForDebugger and every method
            # not handled above are acknowledged with an empty result.
            response["result"] = {}
        return response

    async def handle_message(self, websocket, message):
        """Decode one incoming message and send the matching response.

        Malformed input and handler errors are logged and swallowed so one bad
        message does not end the connection loop.
        """
        try:
            data = json.loads(message)
            if not isinstance(data, dict):
                # Valid JSON but not a request object; there is no id to answer.
                print(f"Ignoring non-object message: {message}")
                return
            response = await self._build_response(websocket, data)
            await websocket.send(json.dumps(response))
        except json.JSONDecodeError:
            print(f"Invalid JSON received: {message}")
        except Exception as e:
            print(f"Error handling message: {e}")

    async def handle_client(self, websocket):
        """Serve one websocket connection until the peer disconnects."""
        await self.register_client(websocket)
        try:
            async for message in websocket:
                await self.handle_message(websocket, message)
        except websockets.exceptions.ConnectionClosed:
            # A closed connection simply ends the session.
            pass
        finally:
            await self.unregister_client(websocket)
async def main():
    """Start the CDP websocket server on localhost:9222 and keep it running."""
    cdp = CDPServer()
    print("Starting CDP server on ws://localhost:9222/")
    print("Open Chrome DevTools at: devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws=localhost:9222")

    listener = websockets.serve(cdp.handle_client, "localhost", 9222)
    async with listener:
        # Awaiting a future that nobody resolves keeps the server up forever.
        await asyncio.Future()


if __name__ == "__main__":
    # Script entry point: run the server inside a fresh event loop.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment