Skip to content

Instantly share code, notes, and snippets.

@giannitedesco
Created January 9, 2026 12:03
Show Gist options
  • Select an option

  • Save giannitedesco/aefd2be1243424a506d9bd5487bf168a to your computer and use it in GitHub Desktop.

Select an option

Save giannitedesco/aefd2be1243424a506d9bd5487bf168a to your computer and use it in GitHub Desktop.
Use minotaur to consume files from a spool dir
from pathlib import Path
from minotaur import Inotify, Mask
import logging
# Public names exported by a star-import of this module.
__all__ = ('Consumer',)
class Consumer:
    """Hand every entry that shows up in a spool directory to ``_process``.

    Constructing a ``Consumer`` creates the spool directory if it is
    missing and immediately passes each entry already in it to
    :meth:`_process`.  :meth:`run` then watches the directory through
    inotify and passes on each entry reported by a later event.

    ``_process`` does nothing in this class; presumably it is meant to be
    overridden -- NOTE(review): confirm the intended extension point.

    NOTE(review): the initial scan happens in ``__init__`` while the watch
    is only installed in ``run``, so an entry that arrives between the two
    is seen by neither.  Left as-is because changing it would alter what
    the constructor does.
    """

    __slots__ = (
        '_spool',
    )

    _log = logging.getLogger(__name__)

    # inotify events requested on the spool directory: a file opened for
    # writing was closed, the watched directory itself was deleted, or an
    # entry was renamed into the directory.
    _in_flags = Mask.CLOSE_WRITE | Mask.DELETE_SELF | Mask.MOVED_TO

    def __init__(self, spool_dir: Path) -> None:
        """Remember *spool_dir*, create it if needed and process its contents.

        Args:
            spool_dir: Directory to consume entries from.
        """
        self._spool = spool_dir
        self._create_spool()
        self._scan_dir()

    def _process(self, p: Path) -> None:
        """Handle one spool entry located at *p*; a no-op in this class."""
        pass

    def _create_spool(self) -> None:
        """Create the spool directory, including parents, if it is absent."""
        self._spool.mkdir(parents=True, exist_ok=True)

    def _scan_dir(self) -> None:
        """Pass every entry currently in the spool directory to ``_process``."""
        for p in self._spool.iterdir():
            self._process(p)

    async def run(self) -> None:
        """Watch the spool directory and process entries as events arrive.

        Returns once an event reporting deletion of the spool directory
        itself is received; otherwise keeps iterating over events.
        """
        with Inotify(blocking=False) as n:
            # The returned watch descriptor is not needed: only one
            # directory is watched, so events need no demultiplexing.
            n.add_watch(self._spool, self._in_flags)
            async for evt in n:
                if evt.mask & Mask.DELETE_SELF:
                    self._log.info('Spool dir delete')
                    return
                if not evt.name:
                    # inotify can deliver events that were not requested
                    # and carry no file name (e.g. queue overflow).  There
                    # is no entry to process for those.  Presumably
                    # minotaur reports them with a None/empty name --
                    # TODO confirm against the minotaur event type.
                    self._log.debug('Ignoring event without a name: %r', evt)
                    continue
                self._process(self._spool / evt.name)
~
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment