Skip to content

Instantly share code, notes, and snippets.

@qatoqat
Created April 10, 2025 17:41
Show Gist options
  • Select an option

  • Save qatoqat/28fdf2a976e288358d6988b7aef4d7d4 to your computer and use it in GitHub Desktop.

Select an option

Save qatoqat/28fdf2a976e288358d6988b7aef4d7d4 to your computer and use it in GitHub Desktop.
Python static web server with hot reload using built-in library (pypy compatible)
import filecmp
import http.server
import mimetypes
import os
import shutil
import socketserver
import time
from subprocess import run
from threading import Thread
# -- config --
FRONTEND_SRC_DIR = "frontend"
FRONTEND_DEST_DIR = "public"
ADDRESS = "127.0.0.1"
PORT = 8000
HOT_RELOAD = True
WASM_MODULE_NAME = "wasm_module"
WASM_PKG_SRC_DIR = "pkg"
BG_WASM_SRC_PATH = f"{WASM_PKG_SRC_DIR}/{WASM_MODULE_NAME}_bg.wasm"
WASM_JS_SRC_PATH = f"{WASM_PKG_SRC_DIR}/{WASM_MODULE_NAME}.js"
BG_WASM_DEST_PATH = f"{FRONTEND_DEST_DIR}/assets/wasm/{WASM_MODULE_NAME}_bg.wasm"
WASM_JS_DEST_PATH = f"{FRONTEND_DEST_DIR}/assets/js/{WASM_MODULE_NAME}.js"
WASM_PKG_DEST_FILES = [BG_WASM_DEST_PATH, WASM_JS_DEST_PATH]
mimetypes.add_type('text/plain', '.toml')
last_modified = str(time.time())
server_ref: socketserver.TCPServer | None = None
# -- helpers --
hot_reload_script = """
<script>
console.log("Hot reload is enabled");
let reloadInterval;
let isPageVisible = true;
function startPolling() {
reloadInterval = setInterval(() => {
fetch("/__ping__")
.then(res => res.text())
.then(ts => {
if (window.__last_reload_ts && window.__last_reload_ts !== ts) {
location.reload();
}
window.__last_reload_ts = ts;
})
.catch(() => {});
}, 2000);
}
function stopPolling() {
if (reloadInterval) {
clearInterval(reloadInterval);
reloadInterval = null;
console.log("Hot reload is paused");
}
}
document.addEventListener('visibilitychange', () => {
if (document.visibilityState === 'visible') {
if (!reloadInterval) {
startPolling();
}
} else {
stopPolling();
}
});
if (document.visibilityState === 'visible') {
startPolling();
}
</script>
"""
# File mirroring functions (no changes)
def mirror_file(src_path, dest_path):
if os.path.exists(dest_path):
if filecmp.cmp(src_path, dest_path, shallow=True):
return
shutil.copy2(src_path, dest_path)
print(f"Copied file: {src_path} to {dest_path}")
def mirror_directory(src, dest):
if not os.path.exists(src):
raise FileNotFoundError(f"Source directory {src} does not exist.")
os.makedirs(dest, exist_ok=True)
src_files = set(os.listdir(src))
for item in src_files:
src_path = f"{src}/{item}"
dest_path = f"{dest}/{item}"
if os.path.isdir(src_path):
mirror_directory(src_path, dest_path)
else:
mirror_file(src_path, dest_path)
dest_files = set(os.listdir(dest))
files_to_remove = dest_files - src_files
for item in files_to_remove:
dest_path = f"{dest}/{item}"
if os.path.isfile(dest_path) and dest_path in WASM_PKG_DEST_FILES:
continue
if os.path.isdir(dest_path):
shutil.rmtree(dest_path)
print(f"Removed directory: {dest_path}")
else:
os.remove(dest_path)
print(f"Removed file: {dest_path}")
def sleep_cmd(seconds):
# if system() == "Windows":
# run(f"ping 127.0.0.1 -n {seconds} > nul", shell=True)
# else:
# run(f"sleep {seconds}", shell=True)
run(f"sleep {seconds}", shell=True) # mingw sleep on windows
def get_last_modified_times(directory):
return {os.path.join(directory, f): os.path.getmtime(os.path.join(directory, f)) for f in os.listdir(directory)}
def mirror_server_files():
mirror_directory(FRONTEND_SRC_DIR, FRONTEND_DEST_DIR)
mirror_file(WASM_JS_SRC_PATH, WASM_JS_DEST_PATH)
mirror_file(BG_WASM_SRC_PATH, BG_WASM_DEST_PATH)
def start_server_with_hot_reload():
mirror_server_files()
last_modified_times = {}
last_modified_times.update(get_last_modified_times(FRONTEND_SRC_DIR))
last_modified_times.update(get_last_modified_times(WASM_PKG_SRC_DIR))
update_timestamp()
print("Hot reload is enabled")
start_server_thread()
while True:
if server_ref is None:
print("Server thread has not started yet. Waiting for 5 seconds ...")
sleep_cmd(5)
continue
current_modified_times = {}
current_modified_times.update(get_last_modified_times(FRONTEND_SRC_DIR))
current_modified_times.update(get_last_modified_times(WASM_PKG_SRC_DIR))
if current_modified_times != last_modified_times:
last_modified_times = current_modified_times
print("Files have changed, restarting server ...")
stop_server()
mirror_server_files()
start_server_thread()
update_timestamp()
sleep_cmd(2)
# Update timestamp for hot reload
def update_timestamp():
global last_modified
last_modified = str(time.time())
# Custom request handler with hot reload logic
class CustomHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=FRONTEND_DEST_DIR, **kwargs)
def do_GET(self):
if self.path == "/__ping__":
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
global last_modified
self.wfile.write(last_modified.encode())
else:
super().do_GET()
def send_head(self):
path = self.translate_path(self.path)
if os.path.isdir(path):
for index in ("index.html", "index.htm"):
index_path = os.path.join(path, index)
if os.path.exists(index_path):
path = index_path
break
if os.path.isfile(path):
mime_type, _ = mimetypes.guess_type(path)
if mime_type == "text/html":
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
if "</body>" in content:
content = content.replace("</body>", f"{hot_reload_script}</body>")
else:
content += hot_reload_script
encoded = content.encode('utf-8')
self.send_response(200)
self.send_header("Content-type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
self.wfile.write(encoded)
return None
return super().send_head()
def log_message(self, format, *args):
if self.path == "/__ping__":
return
super().log_message(format, *args)
def start_server():
with socketserver.TCPServer((ADDRESS, PORT), CustomHTTPRequestHandler) as httpd:
print(f"Serving at http://{ADDRESS}:{PORT}")
global server_ref
server_ref = httpd
httpd.serve_forever()
def start_server_thread():
server_thread = Thread(target=start_server, daemon=True)
server_thread.start()
def stop_server():
global server_ref
if server_ref:
server_ref.shutdown()
if __name__ == '__main__':
start_server_with_hot_reload()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment