Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save TeoZosa/647724cef5221af3f60bba6488deeaa0 to your computer and use it in GitHub Desktop.

Select an option

Save TeoZosa/647724cef5221af3f60bba6488deeaa0 to your computer and use it in GitHub Desktop.
# Original code: https://github.com/GoogleCloudPlatform/cloudml-samples/blob/main/molecules/data-extractor.py
# adapted for a minimally reproducible, documented example
import multiprocessing as mp
import signal
# Debugging switch: when True, parallel_map skips the process pool and calls
# the function serially in the current process.
FORCE_DISABLE_MULTIPROCESSING = False
def _function_wrapper(args_tuple):
    """Unpack a ``(function, args)`` pair and return ``function(*args)``.

    Pool map functions pass exactly one argument to the callable they run, so
    the target function and its positional arguments travel together in a
    single tuple and are unpacked here.

    Args:
        args_tuple: A two-item sequence ``(function, args)`` where ``args`` is
            an iterable of positional arguments for ``function``.

    Returns:
        Whatever ``function(*args)`` returns.
    """
    function, args = args_tuple
    return function(*args)
def parallel_map(function, iterable):
    """Call ``function`` for every element of ``iterable`` using multiple cores.

    Every element of ``iterable`` is unpacked into the call, i.e. the function
    is invoked as ``function(*args)`` for each ``args`` in ``iterable``.

    Design pattern: "Catch Ctrl+C / SIGINT and exit multiprocesses gracefully
    in python" (https://stackoverflow.com/a/35134329). The correct way to
    handle Ctrl+C/SIGINT with multiprocessing.Pool is to:

    - Make the process ignore SIGINT before a process Pool is created. This
      way created child processes inherit the SIGINT handler.
    - Restore the original SIGINT handler in the parent process after a Pool
      has been created.
    - Use map_async and apply_async instead of blocking map and apply.
    - Wait on the results with a timeout, because the default blocking wait
      ignores all signals. This is Python bug
      https://bugs.python.org/issue8296.

    Args:
        function: Callable applied to each element. When the pool is used it
            is sent to worker processes, so it has to be picklable.
        iterable: Iterable of argument tuples, one per call.

    Returns:
        A list with one result per element of ``iterable``.

    Raises:
        KeyboardInterrupt: If Ctrl+C is pressed while waiting; the pool is
            terminated before the exception propagates.
        Exception: Anything raised while computing a result is re-raised
            here, after the pool has been terminated.
    """
    if FORCE_DISABLE_MULTIPROCESSING:
        return [function(*args) for args in iterable]

    # Make the process ignore SIGINT before a process Pool is created.
    # This way created child processes inherit the SIGINT handler.
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        num_processes = mp.cpu_count() * 2
        pool = mp.Pool(processes=num_processes)
    finally:
        # Restore the original SIGINT handler in the parent process, even if
        # creating the Pool failed, so Ctrl+C is never left disabled.
        signal.signal(signal.SIGINT, original_sigint_handler)

    # Timed waits keep the parent responsive to signals. A short, repeated
    # wait is used rather than one huge timeout because very large timeouts
    # can be rejected with OverflowError (see threading.TIMEOUT_MAX).
    poll_seconds = 60.0
    try:
        # Use map_async instead of the blocking map.
        async_result = pool.map_async(
            _function_wrapper, ((function, args) for args in iterable)
        )
        while not async_result.ready():
            async_result.wait(poll_seconds)
        results = async_result.get()
    except BaseException:
        # Covers KeyboardInterrupt as well as errors raised by the mapped
        # function: stop the workers immediately, then let the error propagate.
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        # Reap the worker processes in every case so none are leaked.
        pool.join()
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment