Skip to content

Instantly share code, notes, and snippets.

@jrialland
Last active October 27, 2025 13:49
Show Gist options
  • Select an option

  • Save jrialland/cb2261c94adc01eb553884d10786742d to your computer and use it in GitHub Desktop.

Select an option

Save jrialland/cb2261c94adc01eb553884d10786742d to your computer and use it in GitHub Desktop.
A FastAPI example that follows a file or a TextIO stream (like `tail -f`) and streams it to a web app
from fastapi import FastAPI, Path
from fastapi.routing import APIRouter
from fastapi.responses import StreamingResponse
from starlette.responses import RedirectResponse, HTMLResponse
import asyncio
import json
import logging
import time
from typing import TextIO
from pathlib import Path
from contextlib import nullcontext
from threading import Thread
# FastAPI application instance that the router and routes below are attached to.
app = FastAPI()
def tail_file(
    file: str | Path | TextIO, name: str | None = None, n: int = 100
) -> APIRouter:
    """Build a router that streams new lines of ``file`` as server-sent events.

    The returned router exposes ``GET /tail/<name>``. Each client first
    receives the most recent lines read so far (at most ``n``) and then every
    new line as it is read, one JSON-encoded line per SSE ``data:`` message.

    Args:
        file: Path of the file to follow, or an already-open text stream.
            A stream passed in is read but never closed here.
        name: Last segment of the route path. Defaults to the file's base
            name; required when ``file`` is a stream.
        n: Maximum number of recent lines kept for replay to new clients.

    Returns:
        An ``APIRouter`` carrying the single ``/tail/<name>`` route.

    Raises:
        ValueError: If ``name`` is omitted and ``file`` is not a path.
    """
    # Local import so the block does not depend on the file's import header.
    from collections import deque

    is_path = isinstance(file, (str, Path))
    if name is not None:
        file_path = name
    elif is_path:
        file_path = Path(file).name
    else:
        raise ValueError("name must be provided when file is a TextIO")

    # Bounded replay buffer: the oldest line is dropped automatically once
    # more than n lines are held. Negative n is clamped to "keep nothing".
    buffer = deque(maxlen=max(n, 0))
    subscribers = []

    async def read_tail_coro():
        # typing.TextIO cannot be used for isinstance checks on real streams
        # (io.StringIO, objects returned by open()), so branch on "is a path".
        if is_path:
            source = open(file, "r", encoding="utf-8", errors="ignore")
        else:
            # Borrowed stream: nullcontext leaves closing it to the caller.
            source = nullcontext(file)
        with source as stream:
            while True:
                line = stream.readline()
                if not line:
                    # Nothing new yet; poll again shortly.
                    await asyncio.sleep(0.1)
                    continue
                buffer.append(line)
                # Iterate over a copy: clients may subscribe or unsubscribe
                # while a publish call is awaited.
                for publish in list(subscribers):
                    await publish(line)

    router = APIRouter()
    reader_task: asyncio.Task | None = None

    @router.get(f"/tail/{file_path}")
    async def get_tail() -> StreamingResponse:
        nonlocal reader_task
        # Create the task only when the first request comes in:
        # asyncio.create_task needs a running event loop.
        if reader_task is None:
            # Keep a strong reference so the task cannot be garbage-collected
            # while it is still running.
            reader_task = asyncio.create_task(read_tail_coro())

        async def event_generator():
            queue = asyncio.Queue()
            publish = queue.put
            # Subscribe and snapshot the buffer with no await in between, so
            # a line read during the replay is neither lost nor sent twice.
            subscribers.append(publish)
            backlog = list(buffer)
            try:
                # Replay the most recent lines first ...
                for line in backlog:
                    yield f"data: {json.dumps(line)}\n\n"
                # ... then forward every new line as it arrives.
                while True:
                    line = await queue.get()
                    yield f"data: {json.dumps(line)}\n\n"
            finally:
                # Runs when the client disconnects and the generator is closed.
                subscribers.remove(publish)

        return StreamingResponse(event_generator(), media_type="text/event-stream")

    return router
# Expose GET /tail/app.log, replaying at most the 10 most recent lines to each new client.
app.include_router(tail_file("app.log", n=10))
@app.get("/")
async def read_root():
    # The root path serves nothing itself; it only redirects to the index page.
    index_url = "/index.html"
    return RedirectResponse(url=index_url)
@app.get("/index.html")
async def read_index():
    # Serve a minimal page that subscribes to /tail/app.log and appends each
    # received line to the #log container.
    #
    # Each line is inserted via textContent, never as HTML, so markup that
    # happens to appear in the log file is displayed literally instead of
    # being interpreted by the browser.
    html = """
<html>
<head>
<title>Index Page</title>
<script>
const eventSource = new EventSource("/tail/app.log");
eventSource.onmessage = function(event) {
    const logEntry = JSON.parse(event.data);
    const logContainer = document.getElementById("log");
    const row = document.createElement("div");
    row.textContent = logEntry;
    logContainer.appendChild(row);
};
</script>
</head>
<body>
<h1>Welcome to the Index Page!</h1>
<div id="log"></div>
</body>
</html>
"""
    return HTMLResponse(content=html, status_code=200)
# test : write a log every 10 seconds
# Route root-logger records to app.log at INFO level so the entries emitted
# below actually reach the file served by /tail/app.log. With an unconfigured
# root logger the threshold is WARNING and logging.info() output is dropped.
# basicConfig() does nothing if the root logger already has handlers.
logging.basicConfig(filename="app.log", encoding="utf-8", level=logging.INFO)


def write_log():
    # Demo producer: emit one INFO record every 10 seconds, forever.
    while True:
        logging.info("Writing log entry")
        time.sleep(10)


# Daemon thread, so the endless writer loop does not keep the process alive at exit.
log_thread = Thread(target=write_log, daemon=True)
log_thread.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment