Last active
February 18, 2025 16:42
-
-
Save Kwieeciol/91895fbdc534bf8b282eea20c1495b6e to your computer and use it in GitHub Desktop.
Simple class to asynchronously poll a file for newly added lines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from typing import Self, Awaitable | |
| import os | |
| import asyncio | |
| import aiofiles # type: ignore # pip install aiofiles | |
| from pathlib import Path | |
| class FilePoll: | |
| """Returns the newly added lines (not all changes)""" | |
| def __init__( | |
| self, filename: os.PathLike | str, *, interval: float = 1.0 | |
| ) -> None: | |
| self._queue: asyncio.Queue[list[str]] = asyncio.Queue() | |
| self._last_pos = 0 | |
| self._filename = Path(filename) | |
| self._running = True | |
| if not self._filename.exists(): | |
| raise ValueError('Filename does not exist.') | |
| self._interval = interval | |
| self._task: asyncio.Task | None = None | |
| async def read_file(self) -> tuple[list[str], int]: | |
| async with aiofiles.open(self._filename, 'r') as f: | |
| await f.seek(self._last_pos) | |
| lines = await f.readlines() | |
| return lines, await f.tell() | |
| async def poll(self) -> None: | |
| while self._running: | |
| lines, self._last_pos = await self.read_file() | |
| if lines: | |
| await self._queue.put(lines) | |
| await asyncio.sleep(self._interval) | |
| def wait(self) -> Awaitable[list[str]]: | |
| return self._queue.get() | |
| async def stop(self) -> None: | |
| if self._task is None: | |
| raise RuntimeError('No running poll') | |
| self._running = False | |
| self._task.cancel() | |
| def start(self) -> None: | |
| if self._task is not None and self._running: | |
| raise RuntimeError('Poll is already running') | |
| self._task = asyncio.create_task(self.poll()) | |
| def __aiter__(self) -> Self: | |
| self.start() | |
| return self | |
| async def __anext__(self) -> list[str]: | |
| return await self._queue.get() | |
| async def __aexit__(self) -> None: | |
| await self.stop() | |
| # USAGE | |
async def poll():
    """Demonstrate both ways of consuming a FilePoll.

    Each demo prints the lines it receives and ends, stopping its poller,
    once a line consisting of ``exit`` has been read.
    """
    path = 'test.txt'

    def print_until_exit(lines):
        """Print lines in order; return True as soon as an ``exit`` line is seen."""
        for line in lines:
            # Lines read from the file keep their trailing newline, so strip
            # it before printing and comparing.
            text = line.rstrip('\n')
            print(text)
            if text == 'exit':
                return True
        return False

    # using async iterator
    poller = FilePoll(path)
    async for lines in poller:
        if print_until_exit(lines):
            break
    await poller.stop()

    # using .wait()
    poller = FilePoll(path)
    poller.start()
    while True:
        lines = await poller.wait()
        if print_until_exit(lines):
            break
    await poller.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment