Created
March 8, 2018 23:11
-
-
Save donkirkby/a4851a140be1b8a4691a14e0a2884746 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from Queue import Empty, Queue | |
| from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter, Namespace | |
| from glob import glob | |
| import os | |
| from itertools import count | |
| from threading import Thread | |
| from time import sleep | |
def create_arg_parser():
    """Build the command-line parser for the options test.

    The parser accepts two options:

    * ``-d`` / ``--delay``: a float, default ``1.0``
    * ``-m`` / ``--message``: a string, default ``'Howdy'``

    Defaults are shown in the help text via ArgumentDefaultsHelpFormatter.

    :return: the configured ArgumentParser
    """
    arg_parser = ArgumentParser(
        description='options test',
        formatter_class=ArgumentDefaultsHelpFormatter)

    # (flags, keyword settings) for each option, in registration order.
    option_specs = (
        (('-d', '--delay'), dict(type=float, default=1.0)),
        (('-m', '--message'), dict(default='Howdy')),
    )
    for flags, settings in option_specs:
        arg_parser.add_argument(*flags, **settings)
    return arg_parser
def create_files(queue):
    """Worker loop that writes one small file into copy_test/ per delay.

    Existing entries in ``copy_test`` are removed first. The loop then waits
    on ``queue`` for settings objects carrying ``quit``, ``delay`` and
    ``message`` attributes:

    * When a settings object arrives, the running file count is printed.
      If its ``quit`` flag is set the loop ends; otherwise its ``delay``
      becomes the timeout for the next wait.
    * When the wait times out (``Empty``), a file named
      ``'<message> <count> for <delay>.txt'`` is written using the most
      recently received settings, and the count is incremented.

    :param queue: queue of settings objects; the first ``get`` blocks with
        no timeout, so nothing is written until settings have been received.
    """
    # The original assumed copy_test/ already existed; when it did not, the
    # first open() below raised inside this thread and the worker died
    # silently while the caller kept running. Create it if missing.
    if not os.path.isdir('copy_test'):
        os.makedirs('copy_test')
    for filename in glob('copy_test/*'):
        os.remove(filename)

    # delay=None makes the first get() block until settings are supplied.
    args = delay = None
    file_count = 0
    while True:
        try:
            args = queue.get(timeout=delay)
            print('Created {} files.'.format(file_count))
            if args.quit:
                break
            delay = args.delay
        except Empty:
            # No new settings within the delay: continue with current args.
            filename = os.path.join(
                'copy_test',
                '{} {} for {:g}.txt'.format(args.message,
                                            file_count,
                                            args.delay))
            with open(filename, 'w') as f:
                f.write('x')
            file_count += 1
def main():
    """Run the interactive options test.

    Parses the command line, starts ``create_files`` in a worker thread, and
    then repeatedly:

    1. sends the current settings to the worker through the queue,
    2. prints each command with its one-letter code (and the current value
       for the settings commands),
    3. reads a request of the form ``<code><optional value>``.

    ``q`` quits, ``s`` just redisplays the status, and any option code
    (the last letter of the option's first flag, e.g. ``d`` or ``m``)
    re-parses the value into the shared settings namespace. On exit, the
    quit flag is set, sent to the worker, and the worker thread is joined.
    """
    queue = Queue()
    parser = create_arg_parser()
    args = Namespace(quit=False, **vars(parser.parse_args()))

    # Map one-letter codes to the destination attribute of each option,
    # skipping the automatic help action.
    # noinspection PyProtectedMember
    commands = sorted((option.option_strings[0][-1], option.dest)
                      for option in parser._optionals._actions
                      if option.dest != 'help')
    attrs = dict(commands)
    commands.append(('s', 'status?'))
    commands.append(('q', 'quit!'))

    thread = Thread(target=create_files, args=(queue,))
    thread.start()
    try:
        while not args.quit:
            queue.put(args)
            # Brief pause before printing the menu, as in the original.
            sleep(0.1)
            for code, command in commands:
                message = ' ' + command + ' ' + code
                if code in attrs:
                    message += str(getattr(args, command))
                print(message)
            request = raw_input('Enter a code and an optional value: ')
            if not request:
                # Pressing Enter alone used to raise IndexError on
                # request[0]; treat it like a status request instead.
                continue
            code = request[0].lower()
            value = request[1:].strip()
            if code == 'q':
                break
            if code == 's':
                continue
            if code not in attrs:
                print('Unknown code.')
                continue
            try:
                parser.parse_args(['-' + code, value], args)
            except SystemExit:
                # argparse reports a bad or missing value by printing usage
                # and calling sys.exit(); keep the session alive so the
                # user can try again.
                print('Invalid value.')
    finally:
        # Always tell the worker to stop, even when leaving on an error.
        args.quit = True
        queue.put(args)
        thread.join()
# Only start the interactive session when executed as a script, so that
# importing this module does not launch the prompt or the worker thread.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment