Created
April 10, 2025 17:41
-
-
Save qatoqat/28fdf2a976e288358d6988b7aef4d7d4 to your computer and use it in GitHub Desktop.
Python static web server with hot reload using built-in library (pypy compatible)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import filecmp | |
| import http.server | |
| import mimetypes | |
| import os | |
| import shutil | |
| import socketserver | |
| import time | |
| from subprocess import run | |
| from threading import Thread | |
| # -- config -- | |
| FRONTEND_SRC_DIR = "frontend" | |
| FRONTEND_DEST_DIR = "public" | |
| ADDRESS = "127.0.0.1" | |
| PORT = 8000 | |
| HOT_RELOAD = True | |
| WASM_MODULE_NAME = "wasm_module" | |
| WASM_PKG_SRC_DIR = "pkg" | |
| BG_WASM_SRC_PATH = f"{WASM_PKG_SRC_DIR}/{WASM_MODULE_NAME}_bg.wasm" | |
| WASM_JS_SRC_PATH = f"{WASM_PKG_SRC_DIR}/{WASM_MODULE_NAME}.js" | |
| BG_WASM_DEST_PATH = f"{FRONTEND_DEST_DIR}/assets/wasm/{WASM_MODULE_NAME}_bg.wasm" | |
| WASM_JS_DEST_PATH = f"{FRONTEND_DEST_DIR}/assets/js/{WASM_MODULE_NAME}.js" | |
| WASM_PKG_DEST_FILES = [BG_WASM_DEST_PATH, WASM_JS_DEST_PATH] | |
| mimetypes.add_type('text/plain', '.toml') | |
| last_modified = str(time.time()) | |
| server_ref: socketserver.TCPServer | None = None | |
# -- helpers --
# Snippet injected into every served HTML page. It polls /__ping__ every two
# seconds while the tab is visible and reloads the page as soon as the
# returned timestamp differs from the previously seen one. The first response
# only records the timestamp, so a freshly loaded page never reloads itself.
hot_reload_script = """
<script>
    console.log("Hot reload is enabled");
    let reloadInterval;

    function startPolling() {
        reloadInterval = setInterval(() => {
            fetch("/__ping__")
                .then(res => res.text())
                .then(ts => {
                    if (window.__last_reload_ts && window.__last_reload_ts !== ts) {
                        location.reload();
                    }
                    window.__last_reload_ts = ts;
                })
                .catch(() => {});
        }, 2000);
    }

    function stopPolling() {
        if (reloadInterval) {
            clearInterval(reloadInterval);
            reloadInterval = null;
            console.log("Hot reload is paused");
        }
    }

    document.addEventListener('visibilitychange', () => {
        if (document.visibilityState === 'visible') {
            if (!reloadInterval) {
                startPolling();
            }
        } else {
            stopPolling();
        }
    });

    if (document.visibilityState === 'visible') {
        startPolling();
    }
</script>
"""
# -- file mirroring --
def mirror_file(src_path, dest_path):
    """Copy ``src_path`` to ``dest_path`` unless the destination is already current.

    An existing destination is compared with ``filecmp.cmp(..., shallow=True)``;
    when the two files compare equal nothing is copied and nothing is printed.
    Otherwise the file is copied with ``shutil.copy2`` and a message is printed.

    Missing parent directories of ``dest_path`` are created first, so a
    destination such as ``public/assets/wasm/x.wasm`` works even when
    ``public/assets/wasm`` does not exist yet.

    Raises:
        FileNotFoundError: if ``src_path`` does not exist.
    """
    if os.path.exists(dest_path) and filecmp.cmp(src_path, dest_path, shallow=True):
        return

    # copy2 does not create intermediate directories on its own.
    dest_parent = os.path.dirname(dest_path)
    if dest_parent:
        os.makedirs(dest_parent, exist_ok=True)

    shutil.copy2(src_path, dest_path)
    print(f"Copied file: {src_path} to {dest_path}")
def mirror_directory(src, dest, protected=None):
    """Recursively make ``dest`` mirror the contents of ``src``.

    Every entry of ``src`` is copied (files via ``mirror_file``, directories
    by recursion). Entries that exist only in ``dest`` are then removed,
    except for protected files and the directories that contain them.

    Args:
        src: Source directory; must exist.
        dest: Destination directory; created when missing.
        protected: Destination file paths that must survive the cleanup
            pass. Defaults to ``WASM_PKG_DEST_FILES``. Paths are compared as
            plain strings joined with ``/``, the same way this function
            builds its own paths.

    Raises:
        FileNotFoundError: if ``src`` does not exist.
    """
    if not os.path.exists(src):
        raise FileNotFoundError(f"Source directory {src} does not exist.")
    if protected is None:
        protected = WASM_PKG_DEST_FILES

    os.makedirs(dest, exist_ok=True)

    src_files = set(os.listdir(src))
    for item in src_files:
        src_path = f"{src}/{item}"
        dest_path = f"{dest}/{item}"
        if os.path.isdir(src_path):
            mirror_directory(src_path, dest_path, protected)
        else:
            mirror_file(src_path, dest_path)

    # Cleanup pass: drop whatever exists only on the destination side.
    for item in set(os.listdir(dest)) - src_files:
        dest_path = f"{dest}/{item}"
        if os.path.isfile(dest_path) and dest_path in protected:
            continue
        if os.path.isdir(dest_path):
            # A destination-only directory may still hold a protected file
            # (e.g. public/assets/wasm); removing it would delete that file.
            if any(path.startswith(f"{dest_path}/") for path in protected):
                continue
            shutil.rmtree(dest_path)
            print(f"Removed directory: {dest_path}")
        else:
            os.remove(dest_path)
            print(f"Removed file: {dest_path}")
def sleep_cmd(seconds):
    """Block the calling thread for ``seconds`` seconds.

    Uses ``time.sleep`` instead of spawning an external ``sleep`` command.
    The shell variant depends on a ``sleep`` binary being on PATH; when it
    is missing the command fails immediately without raising, which turns
    the caller's polling loop into a busy loop.
    """
    time.sleep(seconds)
def get_last_modified_times(directory):
    """Return ``{path: mtime}`` for every file and directory below ``directory``.

    The tree is walked recursively so that an edit to a nested file changes
    the snapshot; modifying a file does not update the modification time of
    its ancestor directories, so a top-level listing alone would miss it.
    Keys are built with ``os.path.join`` starting from ``directory``.

    Entries that disappear between listing and stat-ing (for example
    temporary files of an editor or build tool) are skipped.

    Raises:
        OSError: if ``directory`` itself cannot be listed (e.g.
            ``FileNotFoundError`` when it does not exist).
    """
    def _raise_for_root(error):
        # os.walk swallows listing errors by default; keep failing loudly
        # for the root directory, but tolerate vanished subdirectories.
        if error.filename == directory:
            raise error

    snapshot = {}
    for root, dirnames, filenames in os.walk(directory, onerror=_raise_for_root):
        for name in dirnames + filenames:
            path = os.path.join(root, name)
            try:
                snapshot[path] = os.path.getmtime(path)
            except OSError:
                continue
    return snapshot
def mirror_server_files():
    """Bring the served directory up to date.

    Mirrors the frontend tree first, then copies the two WASM package
    artifacts (JS glue, then the .wasm binary) to their destinations.
    """
    mirror_directory(FRONTEND_SRC_DIR, FRONTEND_DEST_DIR)

    wasm_artifacts = (
        (WASM_JS_SRC_PATH, WASM_JS_DEST_PATH),
        (BG_WASM_SRC_PATH, BG_WASM_DEST_PATH),
    )
    for source, target in wasm_artifacts:
        mirror_file(source, target)
def start_server_with_hot_reload():
    """Serve the mirrored files and restart the server whenever sources change.

    Mirrors the sources once, starts the server thread, then loops forever:
    every two seconds the modification times of the frontend and WASM
    package directories are compared with the previous snapshot. On any
    difference the server is stopped, the files are mirrored again, a new
    server thread is started and the reload timestamp is refreshed. This
    function does not return.
    """
    def take_snapshot():
        # Frontend entries first, WASM package entries merged on top.
        return {
            **get_last_modified_times(FRONTEND_SRC_DIR),
            **get_last_modified_times(WASM_PKG_SRC_DIR),
        }

    mirror_server_files()
    known_state = take_snapshot()
    update_timestamp()
    print("Hot reload is enabled")
    start_server_thread()

    while True:
        # The server thread publishes itself through the server_ref global.
        if server_ref is None:
            print("Server thread has not started yet. Waiting for 5 seconds ...")
            sleep_cmd(5)
            continue

        latest_state = take_snapshot()
        if latest_state != known_state:
            known_state = latest_state
            print("Files have changed, restarting server ...")
            stop_server()
            mirror_server_files()
            start_server_thread()
            update_timestamp()

        sleep_cmd(2)
# Refresh the value served by /__ping__
def update_timestamp():
    """Store the current time, as a string, in the ``last_modified`` global."""
    global last_modified
    now = time.time()
    last_modified = str(now)
# Custom request handler with hot reload logic
class CustomHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Static file handler that adds hot-reload support.

    * ``GET /__ping__`` answers with the current ``last_modified`` string.
    * HTML files get ``hot_reload_script`` injected before they are sent.
    * Everything else is served by ``SimpleHTTPRequestHandler`` from
      ``FRONTEND_DEST_DIR``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, directory=FRONTEND_DEST_DIR, **kwargs)

    def do_GET(self):
        """Answer the reload ping, or fall through to normal file serving."""
        if self.path == "/__ping__":
            self.send_response(200)
            self.send_header("Content-type", "text/plain")
            self.end_headers()
            self.wfile.write(last_modified.encode())
        else:
            super().do_GET()

    def send_head(self):
        """Send headers (and, for HTML, the rewritten body).

        For HTML files the reload script is inserted before the last
        ``</body>`` tag, or appended when there is none, and the response is
        written here; ``None`` is returned so the base class copies nothing
        further. All other paths are delegated to the base implementation.
        """
        path = self.translate_path(self.path)
        if os.path.isdir(path):
            for index in ("index.html", "index.htm"):
                index_path = os.path.join(path, index)
                if os.path.exists(index_path):
                    path = index_path
                    break

        if os.path.isfile(path):
            mime_type, _ = mimetypes.guess_type(path)
            if mime_type == "text/html":
                try:
                    with open(path, 'r', encoding='utf-8') as f:
                        content = f.read()
                except UnicodeDecodeError:
                    # Not valid UTF-8: serve the file untouched rather than
                    # failing the request.
                    return super().send_head()

                # Inject exactly once, before the final closing tag. Injecting
                # at every "</body>" occurrence would duplicate the script.
                head, closing_tag, tail = content.rpartition("</body>")
                if closing_tag:
                    content = f"{head}{hot_reload_script}{closing_tag}{tail}"
                else:
                    content += hot_reload_script

                encoded = content.encode('utf-8')
                self.send_response(200)
                self.send_header("Content-type", "text/html; charset=utf-8")
                self.send_header("Content-Length", str(len(encoded)))
                self.end_headers()
                # do_HEAD also goes through send_head; a HEAD response must
                # carry headers only.
                if self.command != "HEAD":
                    self.wfile.write(encoded)
                return None

        return super().send_head()

    def log_message(self, format, *args):
        """Log like the base class, but keep the reload pings out of the log."""
        # self.path is not set yet when the request line itself is malformed
        # and the base class logs the error.
        if getattr(self, "path", None) == "/__ping__":
            return
        super().log_message(format, *args)
def start_server():
    """Run the HTTP server on ``ADDRESS:PORT`` until it is shut down.

    The running instance is published through the ``server_ref`` global so
    that another thread can stop it. Blocks inside ``serve_forever`` and
    closes the listening socket on exit.

    Address reuse is enabled before binding: the hot-reload loop restarts
    the server on the same port right after stopping it, and without
    ``SO_REUSEADDR`` that bind can fail with "Address already in use" while
    connections from the previous instance linger.
    """
    global server_ref

    httpd = socketserver.TCPServer(
        (ADDRESS, PORT), CustomHTTPRequestHandler, bind_and_activate=False
    )
    # Must be set before server_bind(), which is why binding is deferred.
    httpd.allow_reuse_address = True
    with httpd:
        httpd.server_bind()
        httpd.server_activate()
        print(f"Serving at http://{ADDRESS}:{PORT}")
        server_ref = httpd
        httpd.serve_forever()
def start_server_thread():
    """Launch ``start_server`` in a background daemon thread."""
    Thread(target=start_server, daemon=True).start()
def stop_server():
    """Shut down the server referenced by ``server_ref``, if there is one."""
    active = server_ref
    if not active:
        return
    active.shutdown()
if __name__ == "__main__":
    # Script entry point: mirror, serve and watch for changes.
    start_server_with_hot_reload()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment