Skip to content

Instantly share code, notes, and snippets.

@Kwieeciol
Last active February 18, 2025 16:42
Show Gist options
  • Select an option

  • Save Kwieeciol/91895fbdc534bf8b282eea20c1495b6e to your computer and use it in GitHub Desktop.

Select an option

Save Kwieeciol/91895fbdc534bf8b282eea20c1495b6e to your computer and use it in GitHub Desktop.
Simple class to asynchronously poll from a file
from typing import Self, Awaitable
import os
import asyncio
import aiofiles # type: ignore # pip install aiofiles
from pathlib import Path
class FilePoll:
"""Returns the newly added lines (not all changes)"""
def __init__(
self, filename: os.PathLike | str, *, interval: float = 1.0
) -> None:
self._queue: asyncio.Queue[list[str]] = asyncio.Queue()
self._last_pos = 0
self._filename = Path(filename)
self._running = True
if not self._filename.exists():
raise ValueError('Filename does not exist.')
self._interval = interval
self._task: asyncio.Task | None = None
async def read_file(self) -> tuple[list[str], int]:
async with aiofiles.open(self._filename, 'r') as f:
await f.seek(self._last_pos)
lines = await f.readlines()
return lines, await f.tell()
async def poll(self) -> None:
while self._running:
lines, self._last_pos = await self.read_file()
if lines:
await self._queue.put(lines)
await asyncio.sleep(self._interval)
def wait(self) -> Awaitable[list[str]]:
return self._queue.get()
async def stop(self) -> None:
if self._task is None:
raise RuntimeError('No running poll')
self._running = False
self._task.cancel()
def start(self) -> None:
if self._task is not None and self._running:
raise RuntimeError('Poll is already running')
self._task = asyncio.create_task(self.poll())
def __aiter__(self) -> Self:
self.start()
return self
async def __anext__(self) -> list[str]:
return await self._queue.get()
async def __aexit__(self) -> None:
await self.stop()
# USAGE
async def poll():
    """Demonstrate the two ways of consuming a ``FilePoll``.

    Each section prints the lines appended to ``test.txt`` and ends once a
    line reading ``exit`` shows up, after which its poller is stopped.
    """
    path = 'test.txt'

    def print_until_exit(lines) -> bool:
        """Print ``lines`` in order; return True as soon as one is 'exit'."""
        for line in lines:
            print(line)
            # Lines keep their trailing newline, so compare without it.
            if line.rstrip('\n') == 'exit':
                return True
        return False

    # using async iterator
    poller = FilePoll(path)
    async for lines in poller:
        if print_until_exit(lines):
            break
    await poller.stop()

    # using .wait()
    poller = FilePoll(path)
    poller.start()
    while True:
        lines = await poller.wait()
        if print_until_exit(lines):
            # Leave the polling loop itself, not just the batch.
            break
    await poller.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment