Created
January 23, 2022 00:50
-
-
Save TeoZosa/647724cef5221af3f60bba6488deeaa0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Original code: https://github.com/GoogleCloudPlatform/cloudml-samples/blob/main/molecules/data-extractor.py | |
| # adapted for a minimally reproducible, documented example | |
| import multiprocessing as mp | |
| import signal | |
| # Good for debugging. | |
| FORCE_DISABLE_MULTIPROCESSING = False | |
| def _function_wrapper(args_tuple): | |
| """Function wrapper to call from multiprocessing.""" | |
| function, args = args_tuple | |
| return function(*args) | |
| def parallel_map(function, iterable): | |
| """Calls a function for every element in an iterable using multiple cores. | |
| Design pattern: Catch Ctrl+C / SIGINT and exit multiprocesses gracefully in python [duplicate](https://stackoverflow.com/a/35134329) | |
| The correct way to handle Ctrl+C/SIGINT with multiprocessing.Pool is to: | |
| - Make the process ignore SIGINT before a process Pool is created. This way created child processes inherit SIGINT handler. | |
| - Restore the original SIGINT handler in the parent process after a Pool has been created. | |
| - Use map_async and apply_async instead of blocking map and apply. | |
| - Wait on the results with timeout because the default blocking waits to ignore all signals. This is Python bug https://bugs.python.org/issue8296. | |
| """ | |
| if FORCE_DISABLE_MULTIPROCESSING: | |
| return [function(*args) for args in iterable] | |
| # Make the process ignore SIGINT before a process Pool is created. | |
| # This way created child processes inherit SIGINT handler. | |
| original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) | |
| num_threads = mp.cpu_count() * 2 | |
| pool = mp.Pool(processes=num_threads) | |
| # Restore the original SIGINT handler in the parent process | |
| # after a Pool has been created. | |
| signal.signal(signal.SIGINT, original_sigint_handler) | |
| # Use map_async and apply_async instead of blocking map and apply. | |
| p = pool.map_async(_function_wrapper, ((function, args) for args in iterable)) | |
| try: | |
| # Wait on the results with timeout because the default blocking waits | |
| # to ignore all signals. This is Python bug https://bugs.python.org/issue8296. | |
| results = p.get(0xFFFFFFFF) # MAXINT timeout | |
| except KeyboardInterrupt: | |
| pool.terminate() | |
| raise | |
| pool.close() | |
| return results |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment