Skip to content

Instantly share code, notes, and snippets.

@iwconfig
Created November 22, 2025 17:13
Show Gist options
  • Select an option

  • Save iwconfig/1f5ee5e64d63673188983fa24d3a9a48 to your computer and use it in GitHub Desktop.

Select an option

Save iwconfig/1f5ee5e64d63673188983fa24d3a9a48 to your computer and use it in GitHub Desktop.
Pure-python inotify
#!/usr/bin/env python3
# Minimal pure-Python inotify example (no external packages)
import ctypes
import ctypes.util
import os
import struct
import sys
import errno
import selectors
# inotify constants (from <linux/inotify.h>)
IN_ACCESS = 0x00000001
IN_MODIFY = 0x00000002
IN_ATTRIB = 0x00000004
IN_CLOSE_WRITE = 0x00000008
IN_CLOSE_NOWRITE = 0x00000010
IN_OPEN = 0x00000020
IN_MOVED_FROM = 0x00000040
IN_MOVED_TO = 0x00000080
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200
IN_DELETE_SELF = 0x00000400
IN_MOVE_SELF = 0x00000800
IN_ONLYDIR = 0x01000000
IN_DONT_FOLLOW = 0x02000000
IN_ONESHOT = 0x80000000
IN_ISDIR = 0x40000000
# flags for inotify_init1
IN_NONBLOCK = os.O_NONBLOCK
IN_CLOEXEC = os.O_CLOEXEC
libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
# prefer inotify_init1 if available
try:
inotify_init1 = libc.inotify_init1
inotify_init1.argtypes = [ctypes.c_int]
inotify_init1.restype = ctypes.c_int
except AttributeError:
inotify_init1 = None
inotify_add_watch = libc.inotify_add_watch
inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
inotify_add_watch.restype = ctypes.c_int
inotify_rm_watch = libc.inotify_rm_watch
inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
inotify_rm_watch.restype = ctypes.c_int
def inotify_init(flags=0):
if inotify_init1:
fd = inotify_init1(flags)
if fd == -1:
e = ctypes.get_errno()
raise OSError(e, os.strerror(e))
return fd
# fallback to inotify_init (no flags)
init = libc.inotify_init
init.argtypes = []
init.restype = ctypes.c_int
fd = init()
if fd == -1:
e = ctypes.get_errno()
raise OSError(e, os.strerror(e))
# set flags manually if requested
if flags:
current = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, current | (flags & IN_NONBLOCK))
return fd
# inotify_event struct: int wd; uint32_t mask; uint32_t cookie; uint32_t len; char name[len];
EVENT_STRUCT_FMT = 'iIII' # wd, mask, cookie, len
EVENT_STRUCT_SIZE = struct.calcsize(EVENT_STRUCT_FMT)
def parse_events(buf):
i = 0
while i + EVENT_STRUCT_SIZE <= len(buf):
wd, mask, cookie, name_len = struct.unpack_from(EVENT_STRUCT_FMT, buf, i)
i += EVENT_STRUCT_SIZE
name = b''
if name_len:
name = buf[i:i + name_len].split(b'\x00', 1)[0]
i += name_len
yield (wd, mask, cookie, name.decode(errors='replace'))
def mask_to_names(mask):
flags = []
if mask & IN_ACCESS: flags.append('IN_ACCESS')
if mask & IN_MODIFY: flags.append('IN_MODIFY')
if mask & IN_ATTRIB: flags.append('IN_ATTRIB')
if mask & IN_CLOSE_WRITE: flags.append('IN_CLOSE_WRITE')
if mask & IN_CLOSE_NOWRITE: flags.append('IN_CLOSE_NOWRITE')
if mask & IN_OPEN: flags.append('IN_OPEN')
if mask & IN_MOVED_FROM: flags.append('IN_MOVED_FROM')
if mask & IN_MOVED_TO: flags.append('IN_MOVED_TO')
if mask & IN_CREATE: flags.append('IN_CREATE')
if mask & IN_DELETE: flags.append('IN_DELETE')
if mask & IN_DELETE_SELF: flags.append('IN_DELETE_SELF')
if mask & IN_MOVE_SELF: flags.append('IN_MOVE_SELF')
if mask & IN_ISDIR: flags.append('IN_ISDIR')
return flags
def main(path):
# blocking read example (no O_NONBLOCK)
fd = inotify_init(IN_CLOEXEC)
try:
wd = inotify_add_watch(fd, path.encode(), IN_CREATE | IN_DELETE | IN_MODIFY | IN_MOVED_FROM | IN_MOVED_TO | IN_ISDIR)
if wd == -1:
e = ctypes.get_errno()
raise OSError(e, os.strerror(e))
print(f'Watching {path!r} (wd={wd}), fd={fd}')
# use a selector to block until fd is readable
sel = selectors.DefaultSelector()
sel.register(fd, selectors.EVENT_READ)
try:
while True:
events = sel.select() # blocks
for key, _ in events:
# read available bytes
try:
buf = os.read(fd, 4096)
except OSError as e:
if e.errno == errno.EINTR:
continue
raise
if not buf:
# EOF
print('inotify fd closed')
return
for wd_, mask, cookie, name in parse_events(buf):
names = mask_to_names(mask)
print(f'wd={wd_} mask={mask} ({",".join(names)}) cookie={cookie} name={name!r}')
except KeyboardInterrupt:
print('Interrupted, exiting')
finally:
if inotify_rm_watch(fd, wd) == -1:
e = ctypes.get_errno()
# ignore if already removed
os.close(fd)
except Exception:
os.close(fd)
raise
if __name__ == '__main__':
if len(sys.argv) != 2:
print(f'Usage: {sys.argv[0]} PATH')
sys.exit(2)
main(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment