Skip to content

Instantly share code, notes, and snippets.

@jrialland
Last active October 1, 2025 12:48
Show Gist options
  • Select an option

  • Save jrialland/610168989f9baf97db01a68a9a0115d6 to your computer and use it in GitHub Desktop.

Select an option

Save jrialland/610168989f9baf97db01a68a9a0115d6 to your computer and use it in GitHub Desktop.
An ASGI middleware class that runs a thirdparty npm process and proxies requests that fall in 404 to it
"""
ASGI Middleware for serving the frontend application.
This modules is used in development environment only.
When in production, the frontend application may be served by a dedicated web server.
```
if not_in_production_environment:
app = FrontendAppMiddleware(app, app_dir='<path to npm project>')
```
When activated, this middleware starts the frontend application using npm and proxies requests to it :
- Any request leading to a 404 (i.e not handled by other routes) is proxied to the frontend application.
- Websocket requests are also proxied to the frontend application.
This allows to serve the frontend application during development without needing to build it and serve static files.
"""
import asyncio
import logging
import os
import subprocess
import httpx
import websockets
from starlette.types import ASGIApp, Scope, Receive, Send
def check_http_port(host: str, port: int, timeout: float = 1.0) -> bool:
"""
Check if an HTTP server is listening on the given host and port.
"""
try:
httpx.get(f"http://{host}:{port}", timeout=timeout)
return True
except httpx.RequestError:
return False
class FrontendAppMiddleware:
"""
Middleware that starts the frontend application using npm and proxies requests to it.
Any 'local' 404 request (i.e requests that are not handled by any route) is proxied to the frontend application.
"""
def __init__(
self,
app: ASGIApp,
app_dir: str,
port: int | None = None,
startcmd: list[str] | None = None,
do_start_cmd: bool = True,
do_check_port: bool = True,
):
"""
Initialize the middleware.
:param app: The ASGI application to wrap.
:param app_dir: The directory where the frontend application is located (i.e where to run npm commands).
:param port: The port on which the frontend application listens. If None, defaults to 5173 (Vite default).
:param startcmd: The command to start the frontend application. If None, defaults to ['npm', 'run', 'dev'].
:param do_start_cmd: Whether to start the frontend application using the startcmd. If False, the middleware will only check if the application is running on the given port.
:param do_check_port: Whether to check if the frontend application is running on the given port before starting it.
"""
assert os.path.isdir(
app_dir
), f"Frontend app directory {app_dir} does not exist"
self.app = app
self.app_dir = app_dir
self.port = port or 5173 # default port for Vite
self.startcmd = startcmd or [
"npm",
"run",
"dev",
] # default command to start Vite
self.do_start_cmd = do_start_cmd
self.do_check_port = do_check_port
self.process = None
self.logger = logging.getLogger(self.__class__.__name__)
async def on_lifespan_startup(self):
"""
Called when the application starts up.
"""
if self.do_start_cmd:
if self.do_check_port and check_http_port("localhost", self.port):
self.logger.info(
"Frontend app already running on port %d, not starting it",
self.port,
)
return
self.logger.info(
"Starting frontend app in %s on port %d", self.app_dir, self.port
)
self.process = subprocess.Popen(self.startcmd, cwd=self.app_dir, shell=True)
self.logger.info("Frontend app started with PID %d", self.process.pid)
elif self.do_check_port and not check_http_port("localhost", self.port):
raise RuntimeError(
f"Frontend app is not running on port {self.port}, and automatic start is disabled"
)
async def on_lifespan_shutdown(self):
"""
Called when the application shuts down.
"""
if self.process:
self.logger.info("Stopping frontend app")
self.process.terminate()
try:
self.process.wait(timeout=10)
except subprocess.TimeoutExpired:
self.logger.warning(
"Frontend app did not terminate in time, killing it"
)
self.process.kill()
self.process = None
async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
Handle lifespan events to start and stop the frontend application.
:param scope: The ASGI scope.
:param receive: The ASGI receive callable.
:param send: The ASGI send callable.
:return: None
"""
async def receive_wrapper():
message = await receive()
if message["type"] == "lifespan.startup":
await self.on_lifespan_startup()
elif message["type"] == "lifespan.shutdown":
await self.on_lifespan_shutdown()
return message
await self.app(scope, receive_wrapper, send)
async def websocket_proxy(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
Proxy websocket requests to the frontend application.
:param scope: The ASGI scope.
:param receive: The ASGI receive callable.
:param send: The ASGI send callable.
:return: None
"""
# we only proxy the root path, other paths are handled by the main app
if scope["path"] != "/":
await self.app(scope, receive, send)
return
# reconstruct the URL to connect to the frontend app
url = f"ws://localhost:{self.port}{scope['path']}"
if scope.get("query_string", b""): # type: ignore
url += f"?{scope['query_string'].decode()}" # type: ignore
# extract subprotocols from the headers
subprotocols = []
for header, value in scope["headers"]: # type: ignore
if header == b"sec-websocket-protocol":
subprotocols = [v.strip() for v in value.decode().split(",")]
# connect to the frontend app websocket server
async with websockets.connect(url, subprotocols=subprotocols) as websocket:
await send(
{"type": "websocket.accept", "subprotocol": websocket.subprotocol}
)
async def forward_to_client():
"""for each message received from the frontend app, send it to the client"""
while True:
msg = await websocket.recv()
client_msg: dict = {"type": "websocket.send"}
if isinstance(msg, str):
client_msg["text"] = msg
else:
client_msg["bytes"] = msg
await send(client_msg)
async def forward_to_server():
"""for each message received from the client, send it to the frontend app"""
while True:
message = await receive()
if message["type"] == "websocket.receive":
if "bytes" in message:
await websocket.send(message["bytes"], text=False)
else:
await websocket.send(message.get("text", ""), text=True)
elif message["type"] == "websocket.disconnect":
await websocket.close()
return
await asyncio.gather(forward_to_client(), forward_to_server())
async def http_proxy(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
Proxy HTTP requests to the frontend application by first letting the wrapped
application handle the request. If it returns a 404, forward the request to the
frontend dev server instead.
:param scope: The ASGI scope.
:param receive: The ASGI receive callable.
:param send: The ASGI send callable.
:return: None
"""
response_messages: list[dict] = []
status_code: int | None = None
async def send_wrapper(message):
nonlocal status_code
if message["type"] == "http.response.start":
status_code = message["status"]
response_messages.append(message)
request_body_chunks: list[bytes] = []
request_more_body = False
async def receive_wrapper():
nonlocal request_more_body
message = await receive()
if message["type"] == "http.request":
request_body_chunks.append(message.get("body", b""))
request_more_body = message.get("more_body", False)
return message
await self.app(scope, receive_wrapper, send_wrapper)
if status_code != 404:
for message in response_messages:
await send(message)
return
# Ensure the entire body is captured even if the downstream app did not read it
while request_more_body:
message = await receive()
if message["type"] != "http.request":
continue
request_body_chunks.append(message.get("body", b""))
request_more_body = message.get("more_body", False)
body = b"".join(request_body_chunks)
headers = {k.decode(): v.decode() for k, v in scope["headers"]} # type: ignore
query_string = scope.get("query_string", b"") # type: ignore
if query_string:
query_part = f"?{query_string.decode()}"
else:
query_part = ""
url = f"http://localhost:{self.port}{scope['path']}{query_part}" # type: ignore
async with httpx.AsyncClient() as client:
response = await client.request(
method=scope["method"], # type: ignore
url=url,
headers=headers,
content=body,
)
await send(
{
"type": "http.response.start",
"status": response.status_code,
"headers": [
(k.encode(), v.encode()) for k, v in response.headers.items()
],
}
)
await send(
{
"type": "http.response.body",
"body": response.content,
"more_body": False,
}
)
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
ASGI entry point.
Dispatch to the appropriate handler based on the scope type.
:param scope: The ASGI scope.
:param receive: The ASGI receive callable.
:param send: The ASGI send callable.
:return: None
"""
if scope["type"] == "lifespan":
await self.lifespan(scope, receive, send)
elif scope["type"] == "websocket":
await self.websocket_proxy(scope, receive, send)
elif scope["type"] == "http":
await self.http_proxy(scope, receive, send)
else:
await self.app(scope, receive, send)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment