Last active
October 1, 2025 12:48
-
-
Save jrialland/610168989f9baf97db01a68a9a0115d6 to your computer and use it in GitHub Desktop.
An ASGI middleware class that runs a thirdparty npm process and proxies requests that fall in 404 to it
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| ASGI Middleware for serving the frontend application. | |
| This modules is used in development environment only. | |
| When in production, the frontend application may be served by a dedicated web server. | |
| ``` | |
| if not_in_production_environment: | |
| app = FrontendAppMiddleware(app, app_dir='<path to npm project>') | |
| ``` | |
| When activated, this middleware starts the frontend application using npm and proxies requests to it : | |
| - Any request leading to a 404 (i.e not handled by other routes) is proxied to the frontend application. | |
| - Websocket requests are also proxied to the frontend application. | |
| This allows to serve the frontend application during development without needing to build it and serve static files. | |
| """ | |
| import asyncio | |
| import logging | |
| import os | |
| import subprocess | |
| import httpx | |
| import websockets | |
| from starlette.types import ASGIApp, Scope, Receive, Send | |
| def check_http_port(host: str, port: int, timeout: float = 1.0) -> bool: | |
| """ | |
| Check if an HTTP server is listening on the given host and port. | |
| """ | |
| try: | |
| httpx.get(f"http://{host}:{port}", timeout=timeout) | |
| return True | |
| except httpx.RequestError: | |
| return False | |
| class FrontendAppMiddleware: | |
| """ | |
| Middleware that starts the frontend application using npm and proxies requests to it. | |
| Any 'local' 404 request (i.e requests that are not handled by any route) is proxied to the frontend application. | |
| """ | |
| def __init__( | |
| self, | |
| app: ASGIApp, | |
| app_dir: str, | |
| port: int | None = None, | |
| startcmd: list[str] | None = None, | |
| do_start_cmd: bool = True, | |
| do_check_port: bool = True, | |
| ): | |
| """ | |
| Initialize the middleware. | |
| :param app: The ASGI application to wrap. | |
| :param app_dir: The directory where the frontend application is located (i.e where to run npm commands). | |
| :param port: The port on which the frontend application listens. If None, defaults to 5173 (Vite default). | |
| :param startcmd: The command to start the frontend application. If None, defaults to ['npm', 'run', 'dev']. | |
| :param do_start_cmd: Whether to start the frontend application using the startcmd. If False, the middleware will only check if the application is running on the given port. | |
| :param do_check_port: Whether to check if the frontend application is running on the given port before starting it. | |
| """ | |
| assert os.path.isdir( | |
| app_dir | |
| ), f"Frontend app directory {app_dir} does not exist" | |
| self.app = app | |
| self.app_dir = app_dir | |
| self.port = port or 5173 # default port for Vite | |
| self.startcmd = startcmd or [ | |
| "npm", | |
| "run", | |
| "dev", | |
| ] # default command to start Vite | |
| self.do_start_cmd = do_start_cmd | |
| self.do_check_port = do_check_port | |
| self.process = None | |
| self.logger = logging.getLogger(self.__class__.__name__) | |
| async def on_lifespan_startup(self): | |
| """ | |
| Called when the application starts up. | |
| """ | |
| if self.do_start_cmd: | |
| if self.do_check_port and check_http_port("localhost", self.port): | |
| self.logger.info( | |
| "Frontend app already running on port %d, not starting it", | |
| self.port, | |
| ) | |
| return | |
| self.logger.info( | |
| "Starting frontend app in %s on port %d", self.app_dir, self.port | |
| ) | |
| self.process = subprocess.Popen(self.startcmd, cwd=self.app_dir, shell=True) | |
| self.logger.info("Frontend app started with PID %d", self.process.pid) | |
| elif self.do_check_port and not check_http_port("localhost", self.port): | |
| raise RuntimeError( | |
| f"Frontend app is not running on port {self.port}, and automatic start is disabled" | |
| ) | |
| async def on_lifespan_shutdown(self): | |
| """ | |
| Called when the application shuts down. | |
| """ | |
| if self.process: | |
| self.logger.info("Stopping frontend app") | |
| self.process.terminate() | |
| try: | |
| self.process.wait(timeout=10) | |
| except subprocess.TimeoutExpired: | |
| self.logger.warning( | |
| "Frontend app did not terminate in time, killing it" | |
| ) | |
| self.process.kill() | |
| self.process = None | |
| async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None: | |
| """ | |
| Handle lifespan events to start and stop the frontend application. | |
| :param scope: The ASGI scope. | |
| :param receive: The ASGI receive callable. | |
| :param send: The ASGI send callable. | |
| :return: None | |
| """ | |
| async def receive_wrapper(): | |
| message = await receive() | |
| if message["type"] == "lifespan.startup": | |
| await self.on_lifespan_startup() | |
| elif message["type"] == "lifespan.shutdown": | |
| await self.on_lifespan_shutdown() | |
| return message | |
| await self.app(scope, receive_wrapper, send) | |
| async def websocket_proxy(self, scope: Scope, receive: Receive, send: Send) -> None: | |
| """ | |
| Proxy websocket requests to the frontend application. | |
| :param scope: The ASGI scope. | |
| :param receive: The ASGI receive callable. | |
| :param send: The ASGI send callable. | |
| :return: None | |
| """ | |
| # we only proxy the root path, other paths are handled by the main app | |
| if scope["path"] != "/": | |
| await self.app(scope, receive, send) | |
| return | |
| # reconstruct the URL to connect to the frontend app | |
| url = f"ws://localhost:{self.port}{scope['path']}" | |
| if scope.get("query_string", b""): # type: ignore | |
| url += f"?{scope['query_string'].decode()}" # type: ignore | |
| # extract subprotocols from the headers | |
| subprotocols = [] | |
| for header, value in scope["headers"]: # type: ignore | |
| if header == b"sec-websocket-protocol": | |
| subprotocols = [v.strip() for v in value.decode().split(",")] | |
| # connect to the frontend app websocket server | |
| async with websockets.connect(url, subprotocols=subprotocols) as websocket: | |
| await send( | |
| {"type": "websocket.accept", "subprotocol": websocket.subprotocol} | |
| ) | |
| async def forward_to_client(): | |
| """for each message received from the frontend app, send it to the client""" | |
| while True: | |
| msg = await websocket.recv() | |
| client_msg: dict = {"type": "websocket.send"} | |
| if isinstance(msg, str): | |
| client_msg["text"] = msg | |
| else: | |
| client_msg["bytes"] = msg | |
| await send(client_msg) | |
| async def forward_to_server(): | |
| """for each message received from the client, send it to the frontend app""" | |
| while True: | |
| message = await receive() | |
| if message["type"] == "websocket.receive": | |
| if "bytes" in message: | |
| await websocket.send(message["bytes"], text=False) | |
| else: | |
| await websocket.send(message.get("text", ""), text=True) | |
| elif message["type"] == "websocket.disconnect": | |
| await websocket.close() | |
| return | |
| await asyncio.gather(forward_to_client(), forward_to_server()) | |
| async def http_proxy(self, scope: Scope, receive: Receive, send: Send) -> None: | |
| """ | |
| Proxy HTTP requests to the frontend application by first letting the wrapped | |
| application handle the request. If it returns a 404, forward the request to the | |
| frontend dev server instead. | |
| :param scope: The ASGI scope. | |
| :param receive: The ASGI receive callable. | |
| :param send: The ASGI send callable. | |
| :return: None | |
| """ | |
| response_messages: list[dict] = [] | |
| status_code: int | None = None | |
| async def send_wrapper(message): | |
| nonlocal status_code | |
| if message["type"] == "http.response.start": | |
| status_code = message["status"] | |
| response_messages.append(message) | |
| request_body_chunks: list[bytes] = [] | |
| request_more_body = False | |
| async def receive_wrapper(): | |
| nonlocal request_more_body | |
| message = await receive() | |
| if message["type"] == "http.request": | |
| request_body_chunks.append(message.get("body", b"")) | |
| request_more_body = message.get("more_body", False) | |
| return message | |
| await self.app(scope, receive_wrapper, send_wrapper) | |
| if status_code != 404: | |
| for message in response_messages: | |
| await send(message) | |
| return | |
| # Ensure the entire body is captured even if the downstream app did not read it | |
| while request_more_body: | |
| message = await receive() | |
| if message["type"] != "http.request": | |
| continue | |
| request_body_chunks.append(message.get("body", b"")) | |
| request_more_body = message.get("more_body", False) | |
| body = b"".join(request_body_chunks) | |
| headers = {k.decode(): v.decode() for k, v in scope["headers"]} # type: ignore | |
| query_string = scope.get("query_string", b"") # type: ignore | |
| if query_string: | |
| query_part = f"?{query_string.decode()}" | |
| else: | |
| query_part = "" | |
| url = f"http://localhost:{self.port}{scope['path']}{query_part}" # type: ignore | |
| async with httpx.AsyncClient() as client: | |
| response = await client.request( | |
| method=scope["method"], # type: ignore | |
| url=url, | |
| headers=headers, | |
| content=body, | |
| ) | |
| await send( | |
| { | |
| "type": "http.response.start", | |
| "status": response.status_code, | |
| "headers": [ | |
| (k.encode(), v.encode()) for k, v in response.headers.items() | |
| ], | |
| } | |
| ) | |
| await send( | |
| { | |
| "type": "http.response.body", | |
| "body": response.content, | |
| "more_body": False, | |
| } | |
| ) | |
| async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: | |
| """ | |
| ASGI entry point. | |
| Dispatch to the appropriate handler based on the scope type. | |
| :param scope: The ASGI scope. | |
| :param receive: The ASGI receive callable. | |
| :param send: The ASGI send callable. | |
| :return: None | |
| """ | |
| if scope["type"] == "lifespan": | |
| await self.lifespan(scope, receive, send) | |
| elif scope["type"] == "websocket": | |
| await self.websocket_proxy(scope, receive, send) | |
| elif scope["type"] == "http": | |
| await self.http_proxy(scope, receive, send) | |
| else: | |
| await self.app(scope, receive, send) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment