Call PowerShell from Python with support for color output and exception handling using TTY for stdin, stdout, stderr.
New version without need for TTYs here: https://gist.github.com/nathan815/6e894b3394d42bd1c9e7dfae5142a678
| import subprocess | |
| import json | |
| from json.decoder import JSONDecodeError | |
| import errno | |
| import os | |
| import pty | |
| import select | |
| import subprocess | |
| import re | |
| from typing import Tuple, Any, Union | |
| def run_powershell( | |
| script: str, | |
| function: str='', | |
| print_stdout: bool=True, | |
| print_stderr: bool=True, | |
| output: str='json', | |
| ) -> Tuple[Any, list]: | |
| ''' | |
| Invokes a PowerShell script or specific function | |
| Parameters: | |
| script: path to the PS script | |
| function: PS function to call, can also pass any arguments to it | |
| print_stdout: print PS stdout to console | |
| print_stderr: print PS stderr to console | |
| output: 'json' | 'raw' | |
| json: parse output as JSON. The last output line from the PS script/function MUST be a single line of JSON. | |
| raw: return all stdout lines without parsing | |
| Returns: | |
| (output, err) | |
| Example: | |
| my_script.ps1 | |
| function myPsFunction(a, b) { | |
| $hosts = 'esx01', 'esx02', a, b | |
| return $hosts | ConvertTo-Json -Compress | |
| } | |
| test.py | |
| data, err = run_powershell('my_script.ps1', function='myPsFunction x y') | |
| print(data) | |
| output: | |
| ['esx01', 'esx02', 'x', 'y'] | |
| ''' | |
| if output not in ['json', 'raw']: | |
| raise ValueError('invalid value for argument output') | |
| caught_error_key = '_PsUnhandledException_' | |
| # Import the PS script and run the specified function | |
| ps_code = f''' | |
| $ErrorActionPreference = "stop"; | |
| try {{ | |
| . {script}; | |
| {function}; | |
| }} catch {{ | |
| $e = $_ | Select-Object ErrorDetails, ErrorRecord, CategoryInfo, ScriptStackTrace | |
| $errorJson = @{{ {caught_error_key} = $e }} | ConvertTo-Json -Compress -Depth 10 | |
| Write-Host $errorJson | |
| exit 1 | |
| }} | |
| ''' | |
| print(ps_code) | |
| cmd = ['pwsh', '-command', ps_code] | |
| print(f'[PowerShell => {script} {function}]') | |
| output_lines = {'stdout': [], 'stderr': []} | |
| current_line = {'stdout': b'', 'stderr': b''} | |
| ansi_escape_re = re.compile(r'(?:\x1b[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') | |
| exit_code = None | |
| for output in _run_cmd_tty_stream(cmd): | |
| returncode = output.get('returncode', None) | |
| if returncode is not None: | |
| exit_code = returncode | |
| else: | |
| name = output['name'] | |
| data = output['data'] | |
| current_line[name] += data | |
| eol = b'\n' in current_line[name][-2:] | |
| if eol: | |
| try: | |
| line = current_line[name].decode('utf-8') | |
| except AttributeError: | |
| line = current_line[name] | |
| if print_stdout and name == 'stdout' or print_stderr and name == 'stderr': | |
| print(line, end='') | |
| output_lines[name].append(ansi_escape_re.sub('', line).replace('\x1b=', '')) | |
| current_line[name] = b'' | |
| stdout = output_lines['stdout'] | |
| stderr = output_lines['stderr'] | |
| last_line_json = None | |
| try: | |
| last_line = stdout[-1] | |
| last_line_json = json.loads(last_line) | |
| except JSONDecodeError: | |
| pass | |
| if exit_code != 0: | |
| # Check if an exception occurred in PS script | |
| msg = f'PowerShell exited with non-zero code {exit_code}' | |
| if caught_error_key in last_line_json: | |
| error = last_line_json[caught_error_key] | |
| info = error.get('CategoryInfo', {}) | |
| raise PowerShellException(f"{msg} Exception: {info.get('Reason')} {info.get('TargetName')}", error) | |
| raise PowerShellException(msg) | |
| if output == 'raw': | |
| return stdout, stderr | |
| elif output == 'json': | |
| if not last_line_json: | |
| raise PowerShellException(f'JSON not detected on last line of PS stdout', stdout) | |
| return last_line_json, stderr | |
| def _run_cmd_tty_stream(cmd, bytes_input=b''): | |
| """Streams the output of cmd with bytes_input to stdin, | |
| with stdin, stdout and stderr as TTYs. | |
| Each yield from this function is: | |
| { "name": "stdout/stderr", "data": b"", "returncode": 0 } | |
| Adapted from https://stackoverflow.com/a/52954716/507629 | |
| and https://gist.github.com/hayd/4f46a68fc697ba8888a7b517a414583e | |
| """ | |
| # provide tty to enable line-buffering | |
| mo, so = pty.openpty() # stdout | |
| me, se = pty.openpty() # stderr | |
| mi, si = pty.openpty() # stdin | |
| p = subprocess.Popen( | |
| cmd, | |
| bufsize=1, stdin=si, stdout=so, stderr=se, | |
| close_fds=True) | |
| for fd in [so, se, si]: | |
| os.close(fd) | |
| os.write(mi, bytes_input) | |
| timeout = 0.04 # seconds | |
| readable = [mo, me] | |
| fd_name = {mo: 'stdout', me: 'stderr'} | |
| try: | |
| while readable: | |
| ready, _, _ = select.select(readable, [], [], timeout) | |
| for fd in ready: | |
| try: | |
| data = os.read(fd, 512) | |
| except OSError as e: | |
| if e.errno != errno.EIO: | |
| raise | |
| # EIO means EOF on some systems | |
| readable.remove(fd) | |
| else: | |
| if not data: # EOF | |
| readable.remove(fd) | |
| yield {'name': fd_name[fd], 'data': data, 'returncode': None} | |
| finally: | |
| for fd in [mo, me, mi]: | |
| os.close(fd) | |
| if p.poll() is None: | |
| p.kill() | |
| p.wait() | |
| yield {'name': None, 'data': None, 'returncode': p.returncode} | |
| return | |
| class PowerShellException(Exception): | |
| pass |
| function myPsFunction($a, $b) { | |
| $data = 'abc', 'def', $a, $b | |
| Write-Host "This should be green" -ForegroundColor green | |
| Write-Error "Test error!" | |
| Write-Host "This should be magenta" -ForegroundColor magenta | |
| $host.ui.WriteErrorLine("test raw stderror output") | |
| Write-Host "No color" | |
| return $data | ConvertTo-Json -Compress | |
| } |
| from powershell import run_powershell, PowerShellException | |
| try: | |
| data, err = run_powershell('test.ps1', function='myPsFunction arg1 arg2') | |
| print('parsed array from PS: ', data) | |
| for d in data: | |
| print(d) | |
| except PowerShellException as e: | |
| print('PS Exception: ', e) |
Call PowerShell from Python with support for color output and exception handling using TTY for stdin, stdout, stderr.
New version without need for TTYs here: https://gist.github.com/nathan815/6e894b3394d42bd1c9e7dfae5142a678