Last active
October 27, 2025 13:49
-
-
Save jrialland/cb2261c94adc01eb553884d10786742d to your computer and use it in GitHub Desktop.
a fastapi example that tails -f a file or a TextIO in a web app
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from fastapi import FastAPI, Path | |
| from fastapi.routing import APIRouter | |
| from fastapi.responses import StreamingResponse | |
| from starlette.responses import RedirectResponse, HTMLResponse | |
| import asyncio | |
| import json | |
| import logging | |
| import time | |
| from typing import TextIO | |
| from pathlib import Path | |
| from contextlib import nullcontext | |
| from threading import Thread | |
| app = FastAPI() | |
| def tail_file( | |
| file: str | Path | TextIO, name: str | None = None, n: int = 100 | |
| ) -> APIRouter: | |
| if name is None: | |
| if isinstance(file, (str, Path)): | |
| file_path = Path(file).name | |
| else: | |
| raise ValueError("name must be provided when file is a TextIO") | |
| else: | |
| file_path = name | |
| buffer = [] | |
| subscribers = [] | |
| async def read_tail_coro(): | |
| if isinstance(file, TextIO): | |
| f = nullcontext(file) | |
| else: | |
| f = open(file, "r", encoding="utf-8", errors="ignore", buffering=n) | |
| with f: | |
| while True: | |
| line = f.readline() | |
| if not line: | |
| await asyncio.sleep(0.1) | |
| continue | |
| buffer.append(line) | |
| if len(buffer) > n: | |
| buffer.pop(0) | |
| for subscriber in subscribers: | |
| await subscriber(line) | |
| router = APIRouter() | |
| task_created = False | |
| @router.get(f"/tail/{file_path}") | |
| async def get_tail() -> StreamingResponse: | |
| nonlocal task_created | |
| # Create the task only when the first request comes in | |
| if not task_created: | |
| asyncio.create_task(read_tail_coro()) | |
| task_created = True | |
| async def event_generator(): | |
| # yield the last n lines from buffer | |
| for line in buffer: | |
| yield f"data: {json.dumps(line)}\n\n" | |
| queue = asyncio.Queue() | |
| subscribers.append(queue.put) | |
| try: | |
| while True: | |
| line = await queue.get() | |
| yield f"data: {json.dumps(line)}\n\n" | |
| finally: | |
| subscribers.remove(queue.put) | |
| return StreamingResponse(event_generator(), media_type="text/event-stream") | |
| return router | |
| app.include_router(tail_file("app.log", n=10)) | |
| @app.get("/") | |
| async def read_root(): | |
| # redirect to /index.html | |
| return RedirectResponse(url="/index.html") | |
| @app.get("/index.html") | |
| async def read_index(): | |
| html = """ | |
| <html> | |
| <head> | |
| <title>Index Page</title> | |
| <script> | |
| eventSource = new EventSource("/tail/app.log"); | |
| eventSource.onmessage = function(event) { | |
| const logEntry = JSON.parse(event.data); | |
| const logContainer = document.getElementById("log"); | |
| logContainer.innerHTML += `<div>${logEntry}</div>`; | |
| }; | |
| </script> | |
| </head> | |
| <body> | |
| <h1>Welcome to the Index Page!</h1> | |
| <div id="log"></div> | |
| </body> | |
| </html> | |
| """ | |
| return HTMLResponse(content=html, status_code=200) | |
| # test : write a log every 10 seconds | |
| def write_log(): | |
| while True: | |
| logging.info("Writing log entry") | |
| time.sleep(10) | |
| log_thread = Thread(target=write_log, daemon=True) | |
| log_thread.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment