Created
November 22, 2025 17:13
-
-
Save iwconfig/1f5ee5e64d63673188983fa24d3a9a48 to your computer and use it in GitHub Desktop.
Pure-python inotify
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # Minimal pure-Python inotify example (no external packages) | |
| import ctypes | |
| import ctypes.util | |
| import os | |
| import struct | |
| import sys | |
| import errno | |
| import selectors | |
| # inotify constants (from <linux/inotify.h>) | |
| IN_ACCESS = 0x00000001 | |
| IN_MODIFY = 0x00000002 | |
| IN_ATTRIB = 0x00000004 | |
| IN_CLOSE_WRITE = 0x00000008 | |
| IN_CLOSE_NOWRITE = 0x00000010 | |
| IN_OPEN = 0x00000020 | |
| IN_MOVED_FROM = 0x00000040 | |
| IN_MOVED_TO = 0x00000080 | |
| IN_CREATE = 0x00000100 | |
| IN_DELETE = 0x00000200 | |
| IN_DELETE_SELF = 0x00000400 | |
| IN_MOVE_SELF = 0x00000800 | |
| IN_ONLYDIR = 0x01000000 | |
| IN_DONT_FOLLOW = 0x02000000 | |
| IN_ONESHOT = 0x80000000 | |
| IN_ISDIR = 0x40000000 | |
| # flags for inotify_init1 | |
| IN_NONBLOCK = os.O_NONBLOCK | |
| IN_CLOEXEC = os.O_CLOEXEC | |
| libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True) | |
| # prefer inotify_init1 if available | |
| try: | |
| inotify_init1 = libc.inotify_init1 | |
| inotify_init1.argtypes = [ctypes.c_int] | |
| inotify_init1.restype = ctypes.c_int | |
| except AttributeError: | |
| inotify_init1 = None | |
| inotify_add_watch = libc.inotify_add_watch | |
| inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32] | |
| inotify_add_watch.restype = ctypes.c_int | |
| inotify_rm_watch = libc.inotify_rm_watch | |
| inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int] | |
| inotify_rm_watch.restype = ctypes.c_int | |
| def inotify_init(flags=0): | |
| if inotify_init1: | |
| fd = inotify_init1(flags) | |
| if fd == -1: | |
| e = ctypes.get_errno() | |
| raise OSError(e, os.strerror(e)) | |
| return fd | |
| # fallback to inotify_init (no flags) | |
| init = libc.inotify_init | |
| init.argtypes = [] | |
| init.restype = ctypes.c_int | |
| fd = init() | |
| if fd == -1: | |
| e = ctypes.get_errno() | |
| raise OSError(e, os.strerror(e)) | |
| # set flags manually if requested | |
| if flags: | |
| current = fcntl.fcntl(fd, fcntl.F_GETFL) | |
| fcntl.fcntl(fd, fcntl.F_SETFL, current | (flags & IN_NONBLOCK)) | |
| return fd | |
| # inotify_event struct: int wd; uint32_t mask; uint32_t cookie; uint32_t len; char name[len]; | |
| EVENT_STRUCT_FMT = 'iIII' # wd, mask, cookie, len | |
| EVENT_STRUCT_SIZE = struct.calcsize(EVENT_STRUCT_FMT) | |
| def parse_events(buf): | |
| i = 0 | |
| while i + EVENT_STRUCT_SIZE <= len(buf): | |
| wd, mask, cookie, name_len = struct.unpack_from(EVENT_STRUCT_FMT, buf, i) | |
| i += EVENT_STRUCT_SIZE | |
| name = b'' | |
| if name_len: | |
| name = buf[i:i + name_len].split(b'\x00', 1)[0] | |
| i += name_len | |
| yield (wd, mask, cookie, name.decode(errors='replace')) | |
| def mask_to_names(mask): | |
| flags = [] | |
| if mask & IN_ACCESS: flags.append('IN_ACCESS') | |
| if mask & IN_MODIFY: flags.append('IN_MODIFY') | |
| if mask & IN_ATTRIB: flags.append('IN_ATTRIB') | |
| if mask & IN_CLOSE_WRITE: flags.append('IN_CLOSE_WRITE') | |
| if mask & IN_CLOSE_NOWRITE: flags.append('IN_CLOSE_NOWRITE') | |
| if mask & IN_OPEN: flags.append('IN_OPEN') | |
| if mask & IN_MOVED_FROM: flags.append('IN_MOVED_FROM') | |
| if mask & IN_MOVED_TO: flags.append('IN_MOVED_TO') | |
| if mask & IN_CREATE: flags.append('IN_CREATE') | |
| if mask & IN_DELETE: flags.append('IN_DELETE') | |
| if mask & IN_DELETE_SELF: flags.append('IN_DELETE_SELF') | |
| if mask & IN_MOVE_SELF: flags.append('IN_MOVE_SELF') | |
| if mask & IN_ISDIR: flags.append('IN_ISDIR') | |
| return flags | |
| def main(path): | |
| # blocking read example (no O_NONBLOCK) | |
| fd = inotify_init(IN_CLOEXEC) | |
| try: | |
| wd = inotify_add_watch(fd, path.encode(), IN_CREATE | IN_DELETE | IN_MODIFY | IN_MOVED_FROM | IN_MOVED_TO | IN_ISDIR) | |
| if wd == -1: | |
| e = ctypes.get_errno() | |
| raise OSError(e, os.strerror(e)) | |
| print(f'Watching {path!r} (wd={wd}), fd={fd}') | |
| # use a selector to block until fd is readable | |
| sel = selectors.DefaultSelector() | |
| sel.register(fd, selectors.EVENT_READ) | |
| try: | |
| while True: | |
| events = sel.select() # blocks | |
| for key, _ in events: | |
| # read available bytes | |
| try: | |
| buf = os.read(fd, 4096) | |
| except OSError as e: | |
| if e.errno == errno.EINTR: | |
| continue | |
| raise | |
| if not buf: | |
| # EOF | |
| print('inotify fd closed') | |
| return | |
| for wd_, mask, cookie, name in parse_events(buf): | |
| names = mask_to_names(mask) | |
| print(f'wd={wd_} mask={mask} ({",".join(names)}) cookie={cookie} name={name!r}') | |
| except KeyboardInterrupt: | |
| print('Interrupted, exiting') | |
| finally: | |
| if inotify_rm_watch(fd, wd) == -1: | |
| e = ctypes.get_errno() | |
| # ignore if already removed | |
| os.close(fd) | |
| except Exception: | |
| os.close(fd) | |
| raise | |
| if __name__ == '__main__': | |
| if len(sys.argv) != 2: | |
| print(f'Usage: {sys.argv[0]} PATH') | |
| sys.exit(2) | |
| main(sys.argv[1]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment