Last active
May 15, 2021 15:12
-
-
Save abc126/ffc1981e94eb8844df5d97fb42e2a74e to your computer and use it in GitHub Desktop.
Get stdout and stderr in real time
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #https://stackoverflow.com/a/57084403 | |
| import io | |
| import time | |
| import subprocess | |
| import sys | |
# Stream a child process's stdout to our own stdout while it is still running.
# The child writes into a log file and we tail that same file through a second
# handle, so reading never blocks this process.
filename = 'test.log'
# Both handles are binary: the child writes raw bytes, and copying bytes avoids
# decoding a multi-byte character that has only been half written so far.
with io.open(filename, 'wb') as writer, io.open(filename, 'rb') as reader:
    # NOTE(review): `command` is not defined in this snippet; the surrounding
    # code must provide it.
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        # reader.read() returns bytes, so it must go to the binary layer of
        # stdout; sys.stdout.write() accepts only str on Python 3.
        sys.stdout.buffer.write(reader.read())
        sys.stdout.flush()
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.buffer.write(reader.read())
    sys.stdout.flush()
| # The main advantage of the file approach is that your code never blocks: you can do other work in the meantime and read from the reader whenever you like. With PIPE, read() blocks until the requested data arrives (or, when called without a size, until the pipe is closed), and readline() blocks until a full line has been written to the pipe. | |
| # Solution 1: Log stdout AND stderr concurrently in realtime | |
| import subprocess as sp | |
| from concurrent.futures import ThreadPoolExecutor | |
def log_popen_pipe(p, stdfile, log_path="mylog.txt"):
    """Copy a text stream to a log file line by line, flushing as it goes.

    Args:
        p: The process that owns ``stdfile``. Kept for interface
            compatibility; the loop is driven by end-of-file on the stream
            rather than by polling the process.
        stdfile: A readable text stream (for example ``p.stdout`` or
            ``p.stderr`` of a ``Popen`` opened with ``text=True``).
        log_path: File to write to. It is opened in ``"w"`` mode, so callers
            logging several streams concurrently should give each stream its
            own path; otherwise they truncate and overwrite one another.
    """
    with open(log_path, "w") as f:
        # readline() returns '' only at end of file, so this reads everything
        # the stream ever produces and cannot spin on an already-closed
        # stream while the process is still alive.
        for line in iter(stdfile.readline, ""):
            f.write(line)
            # Flush per line so the log can be followed in real time.
            f.flush()
# Run `ls` and hand each of its output pipes to its own worker thread so that
# stdout and stderr are logged concurrently.
# NOTE(review): both workers log to whatever file log_popen_pipe opens.
with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
    with ThreadPoolExecutor(2) as pool:
        pending = [
            pool.submit(log_popen_pipe, p, stream)
            for stream in (p.stdout, p.stderr)
        ]
        # Wait for both loggers; result() re-raises anything a worker raised.
        for job in pending:
            job.result()
| # Solution 2: A function read_popen_pipes() that allows you to iterate over both pipes (stdout/stderr), concurrently in realtime | |
| from queue import Queue, Empty | |
| from concurrent.futures import ThreadPoolExecutor | |
def enqueue_output(file, queue):
    """Read ``file`` line by line until EOF, putting each line on ``queue``.

    Works for both text and binary streams: reading stops on the first empty
    result (``''`` or ``b''``), which is how ``readline()`` signals EOF.

    Args:
        file: A readable stream with ``readline()`` and ``close()``.
        queue: Any object with a ``put(item)`` method, e.g. ``queue.Queue``.

    The stream is always closed on exit, even if reading or queueing raises.
    """
    try:
        while True:
            line = file.readline()
            if not line:
                # Empty str/bytes means end of file.
                break
            queue.put(line)
    finally:
        file.close()
def read_popen_pipes(p):
    """Yield ``(out_line, err_line)`` pairs from a running process's pipes.

    Two worker threads read ``p.stdout`` and ``p.stderr`` into queues while
    this generator drains them. In each yielded pair, a side is ``''`` when
    that stream had nothing ready; at least one side is always non-empty.

    Args:
        p: A ``subprocess.Popen`` whose stdout and stderr are both pipes.

    Yields:
        Tuples ``(out_line, err_line)`` in arrival order per stream.

    Iteration ends once the process has exited, both reader threads have
    reached EOF, and both queues are drained, so trailing output is not lost.
    """
    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()
        readers = (
            pool.submit(enqueue_output, p.stdout, q_stdout),
            pool.submit(enqueue_output, p.stderr, q_stderr),
        )

        while True:
            # Take this snapshot BEFORE draining. If it is true, nothing more
            # can be queued, so finding both queues empty afterwards really
            # means we are done. Checking only p.poll() would drop lines the
            # readers had not queued yet when the process exited.
            finished = p.poll() is not None and all(r.done() for r in readers)

            out_line = err_line = ''
            try:
                out_line = q_stdout.get_nowait()
            except Empty:
                pass
            try:
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            if out_line or err_line:
                yield (out_line, err_line)
            elif finished:
                break
            else:
                # Nothing ready yet: wait briefly for stdout instead of
                # spinning at full CPU and yielding empty pairs.
                try:
                    out_line = q_stdout.get(timeout=0.01)
                except Empty:
                    continue
                yield (out_line, err_line)
| # The function in use: | |
| import subprocess as sp | |
# NOTE(review): `my_cmd` is not defined in this snippet; the surrounding code
# must provide the command to run.
with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
    for out_line, err_line in read_popen_pipes(p):
        # Do stuff with each line, e.g.:
        print(out_line, end='')
        print(err_line, end='')
    # A bare `return` is a SyntaxError at module level, so keep the status
    # code in a variable. wait() also guarantees the process has exited, so
    # this is never None.
    return_code = p.wait()  # status-code
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment