Created
July 7, 2018 13:06
-
-
Save xuecan/4814ff37eae3f253ed423ebfccea0068 to your computer and use it in GitHub Desktop.
监控子进程的 stdout 和 stderr,触发 callback,支持同时监控多个子进程
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Copyright (c) 2018 Xue Can <xuecan@gmail.com> | |
| """ | |
| ## 监控子进程输出的管理器 | |
| 这个模块提供了一个进程输出的监视器,它使用 Popen 启动子进程并持续监控输出。只要子进程 | |
| 输出是以行为基础的文本输出,就可以使用这个模块提供的监视器监控。特别的,如果需要监控的 | |
| 是日志文件等,实际上是转化为 `tail -F` 来监视进程的。 | |
| 最初,这个模块就是为了配合 `tail -F` 使用而编写的,后续发展到能够处理任何只输出,不需要通过 | |
| stdin 输入,且输出以行为基础的进程。 | |
| 以下是关于 `tail -F` 的一些信息。 | |
| `tail -F` 以文件名而不是文件描述符为依据跟踪文件的增长。由于最终是部署在 | |
| Linux 上,所以我们需要针对 GNU 版的行为进行处理。一些需要注意的有: | |
| * 开始调用时,即使需要跟踪的文件不存在也不会退出,只会在 stderr 输出形如 | |
| `tail: cannot open 'filename' for reading: No such file or directory`。 | |
| * 文件被砍了从头写,会在 stderr 输出形如 `tail: filename: file truncated` | |
| * 文件被删除或改名,会在 stderr 输出形如 | |
| `tail: 'filename' has become inaccessible: No such file or directory` | |
| * 文件重新可用,会在 stderr 输出形如 | |
| `tail: 'filename' has appeared; following new file` | |
| * 正常输出是在 stdout | |
| """ | |
| import sys | |
| import time | |
| import shlex | |
| import signal | |
| import typing | |
| import logging | |
| import selectors | |
| import subprocess | |
| from hashlib import md5 | |
| from threading import Thread | |
| from distutils.spawn import find_executable | |
# Module-level logger.
_LOG = logging.getLogger(__name__)

# On macOS use `gtail` instead of `tail` (installable with
# `brew install coreutils`); everywhere else use the system `tail`.
_TAIL_PROGRAM = find_executable("gtail" if sys.platform == "darwin" else "tail")
if not _TAIL_PROGRAM:
    # Raised explicitly instead of using `assert`, so the check is not
    # stripped under `python -O`.  AssertionError and the message are kept so
    # the failure looks the same as before.
    raise AssertionError("Oops! `tail` program not found.")
# Quote once here: the path is later interpolated into a command string that
# is split again with shlex.
_TAIL_PROGRAM = shlex.quote(_TAIL_PROGRAM)
class ProcessData(typing.NamedTuple):
    """Information about one child process started by the watcher."""
    # Key under which the process is stored in the watcher's mapping.
    key: str
    # Command line string the process was started from.
    cmd: str
    # Keyword arguments that were passed to subprocess.Popen.
    kwargs: dict
    # The subprocess.Popen object of the child process.
    p: subprocess.Popen
class SelectData(typing.NamedTuple):
    """Data attached to a selector registration for one process stream."""
    # The process the stream belongs to.
    process_data: ProcessData
    # Name of the Popen attribute holding the stream ("stdout" or "stderr").
    stream_name: str
class ProcessWatcher(object):
    """Launch child processes and dispatch their line-based output.

    Every process is started with ``subprocess.Popen``; its piped ``stdout``
    and ``stderr`` streams are registered with a selector.  Calling
    :meth:`tick` (or running :meth:`loop`) reads the available output and
    reports it through two optional callbacks:

    * ``on_output(select_data, line)`` is called for every line read.  When it
      is not callable the line is printed instead.
    * ``on_terminated(process_data)`` is called once a process has exited and
      its remaining output has been delivered.  When it is not callable a
      message is printed instead.
    """

    def __init__(self, on_output=None, on_terminated=None):
        """Create an empty, running watcher with the given callbacks."""
        self._mapping = dict()  # key -> ProcessData of every live process
        self._selector = selectors.DefaultSelector()
        self._running = True
        self.on_output = on_output
        self.on_terminated = on_terminated

    def stop(self):
        """Stop the watcher and terminate every process it still manages.

        All processes are signalled and waited for, then one last tick
        delivers their remaining output and the termination callbacks.
        Calling it again only logs a warning.
        """
        if not self._running:
            _LOG.warning("Already stopped")
            return
        self._running = False
        self.kill_all()
        self.tick()

    def _register(self, process_data, stream_name):
        """Register one pipe of the process with the selector for reading."""
        fileobj = getattr(process_data.p, stream_name)
        select_data = SelectData(process_data, stream_name)
        self._selector.register(fileobj, selectors.EVENT_READ, select_data)

    def _unregister(self, process_data, stream_name):
        """Remove one pipe of the process from the selector.

        Returns the data object the pipe was registered with.  Raises
        ``KeyError`` when the pipe is not registered.
        """
        fileobj = getattr(process_data.p, stream_name)
        return self._selector.unregister(fileobj).data

    def invoke(self, key, cmd, **kwargs):
        """Start ``cmd`` as a child process and watch its output.

        Args:
            key: Unique identifier for the process inside this watcher.
            cmd: Command line string; it is split with ``shlex.split``.
            **kwargs: Extra arguments for ``subprocess.Popen``.  ``shell`` is
                forced to ``False`` and ``stdin`` to ``None``.  ``bufsize``
                (1), ``universal_newlines`` (True), ``encoding`` ("utf8"),
                ``stdout`` and ``stderr`` (both ``PIPE``) are only defaults.

        Returns:
            The ``ProcessData`` of the new process, or ``None`` when the key
            is already in use or the watcher has been stopped.
        """
        if key in self._mapping:
            _LOG.warning("Key %r already exists", key)
            return
        if not self._running:
            _LOG.warning("The watcher is stopped")
            return
        # Create the process with Popen.
        args = shlex.split(cmd)
        kwargs["shell"] = False
        kwargs.setdefault("bufsize", 1)
        kwargs.setdefault("universal_newlines", True)
        kwargs.setdefault("encoding", "utf8")
        kwargs["stdin"] = None
        kwargs.setdefault("stdout", subprocess.PIPE)
        kwargs.setdefault("stderr", subprocess.PIPE)
        p = subprocess.Popen(args, **kwargs)
        process_data = ProcessData(key, cmd, kwargs, p)
        # Popen only exposes a stream object for streams requested as PIPE,
        # so "attribute is not None" identifies exactly the pipes to watch.
        for stream_name in ("stdout", "stderr"):
            if getattr(p, stream_name) is not None:
                self._register(process_data, stream_name)
        self._mapping[key] = process_data
        return process_data

    def follow(self, filename, lines=0, sleep_interval=1.0):
        """Follow ``filename`` with ``tail -F``.

        The process key is ``"tailF-"`` plus the MD5 hex digest of the file
        name.  ``lines`` is passed as ``-n`` and ``sleep_interval`` as ``-s``.
        Returns whatever :meth:`invoke` returns.
        """
        key = "tailF-%s" % md5(filename.encode("utf8")).hexdigest()
        cmd = "%s -F -n %d -s %g %s" % \
            (_TAIL_PROGRAM, lines, sleep_interval, shlex.quote(filename))
        return self.invoke(key, cmd)

    def kill(self, key, kill_signal=signal.SIGTERM):
        """Send ``kill_signal`` to the process stored under ``key`` and wait.

        An unknown key only logs a warning.
        """
        process_data = self._mapping.get(key)
        if process_data is None:
            _LOG.warning("Key %r not exists", key)
            return
        p = process_data.p
        p.send_signal(kill_signal)
        p.wait()

    def kill_all(self, kill_signal=signal.SIGTERM):
        """Send ``kill_signal`` to every managed process, then wait for all."""
        # Work on a snapshot: a tick running in another thread may remove
        # finished processes from the mapping while we iterate.
        processes = [process_data.p for process_data in list(self._mapping.values())]
        for p in processes:
            p.send_signal(kill_signal)
        for p in processes:
            p.wait()

    def _on_output(self, select_data: "SelectData", line):
        """Deliver one line to ``on_output``, or print it when unset."""
        if callable(self.on_output):
            self.on_output(select_data, line)
        else:
            key = select_data.process_data.key
            stream_name = select_data.stream_name
            print("%r(%s): %s" % (key, stream_name, line))

    def _on_terminated(self, process_data: "ProcessData"):
        """Report a finished process to ``on_terminated``, or print a note."""
        if callable(self.on_terminated):
            self.on_terminated(process_data)
        else:
            print("Process %r is terminated." % process_data.key)

    def tick(self):
        """Run one polling step without letting an exception escape.

        Any ``Exception`` raised by the step (including one raised by a
        callback) is logged at ERROR level so that the loop keeps running.
        """
        try:
            self._tick()
        except Exception:
            _LOG.exception("We've problem!")

    def _tick(self):
        """Do the work of one tick.

        1. For every readable pipe of a process that is still running, read
           and deliver one line.
        2. For every process that has exited, deliver its remaining output
           and report the termination.
        """
        # (1) A non-positive timeout makes select() return immediately.
        for selector_key, _events in self._selector.select(timeout=-1):
            select_data = selector_key.data
            if select_data.process_data.p.poll() is not None:
                # Already exited: its output is drained in step (2).
                continue
            # Only one line per stream and tick, so one chatty process cannot
            # starve the others.
            # NOTE(review): readline() blocks until a full line is available
            # if the child wrote a partial line -- confirm this is acceptable
            # for the watched programs.
            line = selector_key.fileobj.readline()
            if line:
                self._on_output(select_data, line)
            else:
                # Empty read means EOF: the child closed this stream while
                # still running.  Stop polling it, otherwise every tick would
                # report an empty "line".
                self._selector.unregister(selector_key.fileobj)
        # (2) Collect finished processes first; the mapping is modified below.
        finished = [
            key for key, process_data in list(self._mapping.items())
            if process_data.p.poll() is not None
        ]
        for key in finished:
            process_data = self._mapping.pop(key, None)
            if process_data is None:
                # Already handled by a tick running in another thread.
                continue
            self._drain_streams(process_data)
            self._on_terminated(process_data)

    def _drain_streams(self, process_data):
        """Deliver the remaining output of an exited process line by line.

        The pipes are read through their own stream objects (not through
        ``Popen.communicate``) so that text already buffered by an earlier
        ``readline()`` is not lost.  Streams that were never piped are
        skipped; every drained stream is closed afterwards.
        """
        for stream_name in ("stdout", "stderr"):
            stream = getattr(process_data.p, stream_name)
            if stream is None:
                # This stream was not requested as a pipe.
                continue
            try:
                select_data = self._unregister(process_data, stream_name)
            except KeyError:
                # Already unregistered when it reached EOF in step (1).
                stream.close()
                continue
            try:
                if not stream.closed:
                    for line in stream:
                        self._on_output(select_data, line)
            finally:
                stream.close()

    def loop(self, interval=0.1):
        """Call :meth:`tick` every ``interval`` seconds until stopped.

        A falsy interval falls back to 0.1 and a negative one is made
        positive.  ``KeyboardInterrupt`` and ``SystemExit`` stop the watcher.
        """
        interval = abs(interval or 0.1)
        while self._running:
            try:
                self.tick()
                time.sleep(interval)
            except (KeyboardInterrupt, SystemExit):
                self.stop()
class ProcessWatcherThread(Thread):
    """Thread that owns a ProcessWatcher and runs its polling loop.

    ``invoke``, ``follow``, ``kill`` and ``kill_all`` are forwarded unchanged
    to the wrapped watcher.  ``stop`` and ``join`` both stop the watcher and
    then wait for the thread to finish.
    """

    def __init__(self, group=None, name=None, daemon=None,
                 on_output=None, on_terminated=None, interval=0.1):
        """Set up the thread and the watcher; ``interval`` is the tick period."""
        super().__init__(group=group, name=name, daemon=daemon)
        self._watcher = ProcessWatcher(on_output, on_terminated)
        self._interval = interval

    def run(self):
        """Thread body: run the watcher loop with the configured interval."""
        self._watcher.loop(self._interval)

    def invoke(self, *args, **kwargs):
        """Forward to the watcher's ``invoke`` and return its result."""
        watcher = self._watcher
        return watcher.invoke(*args, **kwargs)

    def follow(self, *args, **kwargs):
        """Forward to the watcher's ``follow`` and return its result."""
        watcher = self._watcher
        return watcher.follow(*args, **kwargs)

    def kill(self, *args, **kwargs):
        """Forward to the watcher's ``kill`` and return its result."""
        watcher = self._watcher
        return watcher.kill(*args, **kwargs)

    def kill_all(self, *args, **kwargs):
        """Forward to the watcher's ``kill_all`` and return its result."""
        watcher = self._watcher
        return watcher.kill_all(*args, **kwargs)

    def stop(self):
        """Stop the watcher and wait for the thread (same as ``join()``)."""
        self.join()

    def join(self, timeout=None):
        """Stop the watcher first, then join the thread with ``timeout``."""
        self._watcher.stop()
        super().join(timeout)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment