Last active
December 9, 2025 06:12
-
-
Save geky/9e343e9f2500d71d8a13c51634e58f56 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # | |
| # A local watch multitool | |
| # | |
| import base64 | |
| import glob | |
| import hashlib | |
| import itertools as it | |
| import os | |
| import shlex | |
| import struct | |
| import subprocess as sp | |
| import sys | |
| import threading as th | |
| import time | |
| try: | |
| import inotify_simple | |
| except ModuleNotFoundError: | |
| inotify_simple = None | |
| try: | |
| import http.server as http_server | |
| except ModuleNotFoundError: | |
| http_server = None | |
# javascript blob for live-reloading
#
# Opens a websocket back to the host that served the page and reloads the
# page when the server sends the text message 'reload'. The websocket
# scheme follows the page's scheme, since browsers refuse plain ws://
# connections from pages loaded over https:// (e.g. behind a TLS proxy).
JS_LIVERELOAD = '''
<script>
(function() {
    let scheme = (window.location.protocol === 'https:') ? 'wss://' : 'ws://';
    let ws = new WebSocket(scheme + window.location.host);
    ws.onopen = function() {
        ws.send('hello!');
        console.log('websocket connected');
    }
    ws.onmessage = function(message) {
        console.log('websocket recv', message.data);
        if (message.data == 'reload') {
            window.location.reload();
        }
    }
})();
</script>
'''
# simple logging/errors with timestamps
def wristlog(level, message=None, **args):
    """Print a timestamped log line of the form
    ``[HH:MM:SS.mmm] ww: <level>: <message>``.

    May be called as ``wristlog(message)``, in which case the level is
    'info', or as ``wristlog(level, message)``.

    Keyword args that are consulted (anything else is ignored):

    - ``verbose``: 'verbose' messages are dropped unless this is truthy
    - ``quiet``: 'info' messages are dropped if this is truthy
    - ``color``: 'always' or 'never' force colors on/off, any other
      value (or none) enables colors only if the output stream is a tty

    'error' messages are written to stderr, everything else to stdout.
    """
    if message is None:
        level, message = 'info', level

    # ignore verbose unless requested
    if level == 'verbose' and not args.get('verbose'):
        return
    # ignore info if requested
    if level == 'info' and args.get('quiet'):
        return

    # errors go to stderr, everything else to stdout
    out = sys.stderr if level == 'error' else sys.stdout

    # figure out what we should do for colors, note we need to check
    # the stream we are actually writing to, otherwise escape codes leak
    # into a redirected stderr when stdout is still a terminal
    if args.get('color') == 'always':
        color = True
    elif args.get('color') == 'never':
        color = False
    else:
        color = out.isatty()

    # choose the level's color
    if not color:
        level_color = ''
    elif level == 'verbose':
        level_color = '\x1b[1;30m'
    elif level == 'warning':
        level_color = '\x1b[1;33m'
    elif level == 'error':
        level_color = '\x1b[1;31m'
    else:
        level_color = ''

    # log message
    now = time.time()
    print('%s[%s.%03d]%s ww: %s%s:%s %s' % (
            '\x1b[1;30m' if color else '',
            time.strftime('%H:%M:%S', time.localtime(now)),
            (now % 1)*1000,
            '\x1b[m' if color else '',
            level_color,
            level,
            '\x1b[m' if color else '',
            message),
        file=out)
def wristerror(level, message=None, code=1, **args):
    """Log a message with wristlog, then exit the process with ``code``.

    When called with a single positional argument, that argument is the
    message and the level is 'error'. Remaining keyword args are passed
    through to wristlog.
    """
    # single-argument form: wristerror(message)
    if message is None:
        message = level
        level = 'error'

    wristlog(level, message, **args)
    raise SystemExit(code)
# glob paths
def wristglob(paths, default=None, *,
        ignore=None,
        dereference=False):
    """Expand paths into the set of paths to watch.

    - ``paths``: a path or an iterable of paths, entries containing ``*``
      are expanded with glob; if None, ``default`` is used instead, and
      if both are None the result is an empty set
    - ``ignore``: a path or an iterable of paths/globs to exclude,
      ignored directories are not recursed into
    - ``dereference``: if true, also recurse into directories reached
      through symlinks

    Directories are expanded recursively via ``<dir>/*`` globs. Every
    returned path is normalized relative to the current directory.

    Each real directory is expanded at most once, so symlink cycles (and
    multiple links to the same directory) can't blow up the result.
    """
    # default
    if paths is None:
        paths = default
    if paths is None:
        paths = ()

    def glob_(paths):
        if isinstance(paths, str):
            paths = [paths]

        # expand globs, normalizing as we go
        paths_ = set()
        for path in paths:
            if '*' in path:
                paths_.update(
                        os.path.relpath(path_)
                        for path_ in glob.iglob(path))
            else:
                paths_.add(os.path.relpath(path))
        return paths_

    # expand paths
    paths = glob_(paths)

    # expand ignore paths
    ignore = glob_(ignore) if ignore is not None else set()

    # ignore ignore paths
    paths -= ignore

    # expand dirs recursively, unless they match an ignore path
    expanded = set()
    pending = set(paths)
    while pending:
        found = set()
        # sorted so which alias of a directory gets expanded is stable
        for path in sorted(pending):
            if not os.path.isdir(path):
                continue
            if os.path.islink(path) and not dereference:
                continue
            # only expand each real directory once, this is what
            # protects us from symlink cycles
            real = os.path.realpath(path)
            if real in expanded:
                continue
            expanded.add(real)
            found |= glob_([os.path.join(path, '*')])

        # ignore ignore paths
        found -= ignore
        # add to paths
        paths |= found
        # recurse
        pending = found

    return paths
# create inotify object for given path
def wristwatch(watch=None, ignore=None, **args):
    """Create an inotify object watching the requested paths.

    ``watch`` and ``ignore`` are handed to wristglob, with the current
    directory as the default watch path and ``args['dereference']``
    deciding if symlinked directories are recursed into. Paths that no
    longer exist when the watch is added are skipped.

    Exits via wristerror if the inotify_simple module is unavailable.
    """
    if inotify_simple is None:
        wristerror('inotify_simple module not found?', **args)

    # figure out what to watch
    targets = wristglob(watch, ['.'],
            ignore=ignore,
            dereference=args.get('dereference'))

    # create the inotify object
    inotify = inotify_simple.INotify()

    # the events we care about
    events = inotify_simple.flags
    mask = (events.ATTRIB
            | events.CREATE
            | events.DELETE
            | events.DELETE_SELF
            | events.MODIFY
            | events.MOVED_FROM
            | events.MOVED_TO
            | events.MOVE_SELF)

    # register each path, in a stable order
    for target in sorted(targets):
        try:
            inotify.add_watch(target, mask)
        except FileNotFoundError:
            # path went away in the meantime, ignore it
            continue
        wristlog('verbose', 'watching %s' % target, **args)

    return inotify
# run an optional command
def wristrun(command, **args):
    """Run ``command`` (a list of argv strings) and wait for it to finish.

    A command that can't be launched at all (missing executable, not
    executable, etc) is reported as a warning and treated as a failure
    with code -1 instead of being silently ignored or crashing.

    If ``args['exit_on_error']`` is truthy and the command fails, this
    exits the process with the command's return code via wristerror.
    """
    # subprocess does most of the work
    wristlog('info', 'running %s' % (
                ' '.join(shlex.quote(c) for c in command)),
            **args)
    try:
        code = sp.run(command).returncode
    except OSError as e:
        # couldn't even start the command, tell the user why, but keep
        # going so they can fix it without restarting the watch
        wristlog('warning', 'could not run %s: %s' % (
                    shlex.quote(command[0]) if command else '?',
                    e.strerror or e),
                **args)
        code = -1

    if args.get('exit_on_error') and code != 0:
        wristerror('verbose', 'command failed with %d' % code,
                code=code,
                **args)
# host an optional webserver at a given address
def wristserve(addr, **args):
    """Start a simple http server at ``addr`` in a background thread.

    ``addr`` is a ``(host, port)`` tuple. Files are served from the
    current directory by http.server's SimpleHTTPRequestHandler, with
    caching disabled.

    GET requests asking for a websocket upgrade are kept open as
    websocket clients. The returned server object has a
    ``broadcast(message)`` / ``broadcast(op, message)`` method that sends
    a websocket frame to every connected client.

    If ``args['inject_livereload']`` is truthy, JS_LIVERELOAD is appended
    to every non-empty text/html response.

    Exits via wristerror if the http.server module is unavailable.
    """
    if http_server is None:
        wristerror('http module not found?', **args)

    # websocket opcodes, see RFC 6455 section 5.2
    WS_CLOSE = 0x8
    # magic guid used to compute Sec-WebSocket-Accept
    WS_GUID = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

    # spin up an http server
    class Handler(http_server.SimpleHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'

        def end_headers(self):
            self.send_header("Cache-Control", "no-cache")
            super().end_headers()

        def ws_read(self, size):
            # read exactly size bytes, treating a short read as a
            # disconnect, note BufferedIOBase is greedy
            data = self.rfile.read(size)
            if len(data) < size:
                raise EOFError
            return data

        def ws_handshake(self):
            # upgrade to websocket
            key = self.headers['Sec-WebSocket-Key']
            if key is None:
                self.send_error(400, 'Missing Sec-WebSocket-Key')
                return
            digest = base64.b64encode(
                    hashlib.sha1(key.encode('utf8') + WS_GUID)
                        .digest()).decode('utf8')
            self.send_response(101, 'Switching Protocols')
            self.send_header('Upgrade', 'websocket')
            self.send_header('Connection', 'Upgrade')
            self.send_header('Sec-WebSocket-Accept', digest)
            self.end_headers()

            # this connection no longer speaks http, so don't try to
            # parse another request from it once we're done
            self.close_connection = True

            wristlog('verbose', 'ws %d connected' % id(self),
                    **args)
            with httpd.lock:
                httpd.clients[id(self)] = self

            # spin
            try:
                while True:
                    # parse op
                    m_op = self.ws_read(2)
                    opcode = m_op[0] & 0xf
                    # parse length, extended lengths are big-endian
                    m_len = m_op[1] & 0x7f
                    if m_len == 0x7e:
                        m_len, = struct.unpack('>H', self.ws_read(2))
                    elif m_len == 0x7f:
                        m_len, = struct.unpack('>Q', self.ws_read(8))
                    # parse mask
                    m_mask = b'\0\0\0\0'
                    if m_op[1] & 0x80:
                        m_mask = self.ws_read(4)
                    # parse data
                    m_data = bytes(m ^ b for m, b in zip(
                            it.cycle(m_mask),
                            self.ws_read(m_len)))
                    # recv
                    self.ws_recv(
                            opcode,
                            m_data.decode('utf8',
                                errors='backslashreplace'))
                    # exit? only on a close frame, ping/pong share the
                    # 0x08 control bit but must not disconnect us
                    if opcode == WS_CLOSE:
                        break
            # truncated message/disconnect, ignore
            except EOFError:
                pass
            # disconnect
            finally:
                with httpd.lock:
                    httpd.clients.pop(id(self), None)
                wristlog('verbose', 'ws %d disconnected' % id(self),
                        **args)

        def ws_recv(self, op, message):
            wristlog('verbose', 'ws %d recv 0x%x %s' % (
                        id(self), op, message),
                    **args)

        def ws_send(self, op, message=None):
            # op=0x81 => text message
            if message is None:
                op, message = 0x81, op

            # the length in the header counts bytes, not characters
            payload = message.encode('utf8') if message else b''

            # build op, extended lengths are big-endian
            if len(payload) <= 0x7d:
                header = struct.pack('>BB',
                        0x80 | op, len(payload))
            elif len(payload) <= 0xffff:
                header = struct.pack('>BBH',
                        0x80 | op, 0x7e, len(payload))
            else:
                header = struct.pack('>BBQ',
                        0x80 | op, 0x7f, len(payload))

            # write op+data in one go so frames never interleave
            with httpd.lock:
                self.wfile.write(header + payload)

            wristlog('verbose', 'ws %d send 0x%x %s' % (
                        id(self), op, message),
                    **args)

        def do_GET(self):
            try:
                # upgrade websocket?
                if self.headers.get('Upgrade', '').lower() == 'websocket':
                    self.ws_handshake()

                # livereload injection get?
                elif args.get('inject_livereload'):
                    # convert to bytes
                    payload = bytes(JS_LIVERELOAD.strip(), 'utf8')

                    # intercept Content-Length
                    is_html = False
                    inject = False
                    def send_header(self, key, value):
                        nonlocal is_html
                        nonlocal inject
                        if (key.lower() == 'content-type'
                                and value.split(';', 1)[0] == 'text/html'):
                            is_html = True
                        elif (key.lower() == 'content-length'
                                and is_html
                                and value != '0'):
                            value = str(int(value) + len(payload))
                            inject = True
                        super().send_header(key, value)

                    self.send_header = send_header.__get__(self)
                    try:
                        super().do_GET()
                    finally:
                        del self.send_header

                    # inject payload, but only if we made room for it in
                    # Content-Length, otherwise we would corrupt the
                    # connection for the next request
                    if inject:
                        self.wfile.write(payload)

                # normal get
                else:
                    super().do_GET()

            # ignore disconnected clients
            except BrokenPipeError:
                pass

        def handle(self):
            try:
                super().handle()
            # ignore disconnected clients
            except ConnectionResetError:
                pass

        def log_message(self, fmt, *args_):
            wristlog('verbose', fmt % args_, **args)

    httpd = http_server.ThreadingHTTPServer(addr, Handler)

    # some shared state for websocket broadcasts
    httpd.lock = th.RLock()
    httpd.clients = {}
    def broadcast(self, op, message=None):
        with self.lock:
            for client in list(self.clients.values()):
                try:
                    client.ws_send(op, message)
                # client went away but its handler thread hasn't
                # noticed yet, it will clean up after itself
                except ConnectionError:
                    pass
    httpd.broadcast = broadcast.__get__(httpd)

    # run in a background thread
    httpd.thread = th.Thread(
            target=httpd.serve_forever,
            daemon=True)
    httpd.thread.start()

    wristlog('info', 'serving %s:%s' % (addr[0], addr[1]), **args)
    return httpd
# entry point
def main(command, *,
        watch=None,
        ignore=None,
        serve=None,
        **args):
    """Watch paths and rerun ``command`` whenever something changes.

    - ``command``: argv list to run on each change, may be empty
    - ``watch``/``ignore``: paths to watch/not watch, see wristwatch
    - ``serve``: optional ``(host, port)`` to serve over http, connected
      websocket clients are sent 'reload' after each run
    - ``args``: remaining options (verbose, quiet, color, dereference,
      wait, inject_livereload, help_livereload, exit_on_error)

    Loops until interrupted with ctrl-C. If ``help_livereload`` is set,
    prints the livereload script and exits instead.
    """
    # just dump the livereload script?
    if args.get('help_livereload'):
        print(JS_LIVERELOAD.strip())
        sys.exit(0)

    # how long to wait after changes
    wait = args.get('wait')
    if wait is None:
        wait = 0.01

    httpd = None
    inotify = None
    try:
        while True:
            # create inotify object _before_ running any commands, this
            # gives us the best chance to catch changes
            inotify = wristwatch(watch, ignore, **args)

            # run command, if any
            if command:
                wristrun(command, **args)

            if serve:
                # start http server after first successful run
                if httpd is None:
                    httpd = wristserve(serve, **args)

                # notify any waiting clients of changes
                httpd.broadcast('reload')

            # wait on inotify
            wristlog('verbose', 'waiting...', **args)
            notifications = inotify.read()
            inotify.close()
            inotify = None
            wristlog('verbose', 'notified %s' % (notifications[0],), **args)

            # wait a bit to avoid flickering
            time.sleep(wait)

    except KeyboardInterrupt:
        pass

    finally:
        # don't leak the inotify fd if we were interrupted mid-loop
        if inotify is not None:
            inotify.close()
if __name__ == "__main__":
    import argparse

    def serve_addr(value):
        """Parse ``[address:]port`` into an ``(address, port)`` tuple.

        A bare port serves on all interfaces. Raises ArgumentTypeError
        with a readable message so argparse can report bad input.
        """
        host, sep, port = value.partition(':')
        if not sep:
            host, port = '', value
        try:
            return host, int(port)
        except ValueError:
            raise argparse.ArgumentTypeError(
                    "expected [address:]port, found %r" % value)

    parser = argparse.ArgumentParser(
            description="A local watch multitool.",
            allow_abbrev=False)
    parser.add_argument(
            'command',
            nargs=argparse.REMAINDER,
            help="Optional command to run on changes.")
    parser.add_argument(
            '-v', '--verbose',
            action='store_true',
            help="Show more info.")
    parser.add_argument(
            '-q', '--quiet',
            action='store_true',
            help="Show less info.")
    parser.add_argument(
            '--color',
            choices=['never', 'always', 'auto'],
            default='auto',
            help="When to use terminal colors. Defaults to 'auto'.")
    parser.add_argument(
            '-K', '--watch',
            action='append',
            help="Paths to watch. Defaults to the current directory.")
    parser.add_argument(
            '-I', '--ignore',
            action='append',
            help="Paths to not watch.")
    parser.add_argument(
            '-L', '--dereference',
            action='store_true',
            help="Recurse into symlinks, which are ignored by default. Does "
                "not affect explicit paths. Watch out for cycles!")
    parser.add_argument(
            '-w', '--wait',
            type=float,
            help="Time to wait after changes. Defaults to 0.01 seconds.")
    parser.add_argument(
            '-s', '--serve', '--server',
            metavar='addr',
            type=serve_addr,
            help="Run a simple http server at this address:port.")
    parser.add_argument(
            '--inject-livereload',
            action='store_true',
            help="Inject the live reload script into all .html and .htm "
                "files.")
    parser.add_argument(
            '--help-livereload',
            action='store_true',
            help="Print the live reload script in case you want to add this "
                "manually.")
    parser.add_argument(
            '-e', '--exit-on-error',
            action='store_true',
            help="Exit if command errors.")

    # options that were not given are dropped so main's defaults apply
    sys.exit(main(**{k: v
            for k, v in vars(parser.parse_args()).items()
            if v is not None}))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment