Source code for kombine.interruptible_pool

"""
This is a drop-in replacement for multiprocessing's pool that
plays better with keyboard interrupts.  This implementation is a modified
version of one originally written by Peter K. G. Williams <peter@newton.cx>
for emcee:

    * `<https://github.com/dfm/emcee/blob/master/emcee/interruptible_pool.py>`_

which was an adaptation of a method written by John Reese, shared as

    * `<https://github.com/jreese/multiprocessing-keyboardinterrupt/>`_
"""

import signal
import functools
from multiprocessing.pool import Pool as MPPool
from multiprocessing import TimeoutError


def _initializer_wrapper(initializer, *args):
    """
    Worker start-up hook: make the worker ignore SIGINT, then run the
    caller-supplied initializer (if one was given) with ``*args``.

    Ignoring SIGINT in the workers leaves the reaction to a keyboard
    interrupt to the parent process, which does the killing.
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    # Nothing further to run when no user initializer was provided.
    if initializer is None:
        return

    initializer(*args)


def disable_openblas_threading():
    """
    A convenience function for turning off openblas threading to avoid costly
    overhead.  Just setting the `OPENBLAS_NUM_THREADS` environment variable to
    `1` would be much simpler, but that only works if the user hasn't already
    imported `numpy`.  This function attempts to use `ctypes` to load the
    OpenBLAS library and access the `openblas_set_num_threads` function, which
    will work even if the user already imported numpy or scipy.

    Candidate locations are tried in order: the library directory recorded
    in numpy's build configuration (when present), a few fixed system paths,
    and finally whatever `ctypes.util.find_library` reports.  The first
    candidate that both loads and exposes `openblas_set_num_threads` is used
    to set the thread count to 1.

    :raises EnvironmentError:
        If no candidate yields a library exposing
        `openblas_set_num_threads`.
    """
    import numpy as np
    import ctypes
    from ctypes.util import find_library

    # The build configuration may lack an 'openblas_info' entry, lack
    # 'library_dirs', or list no directories at all; each case just means
    # numpy gives us no hint about where the library lives.
    try:
        np_lib_dir = np.__config__.__dict__['openblas_info']['library_dirs'][0]
    except (KeyError, IndexError):
        np_lib_dir = None

    try_paths = []
    if np_lib_dir is not None:
        # Only build these paths from a real directory; formatting None
        # would produce the meaningless relative path 'None/libopenblas.so'.
        try_paths.append('{}/libopenblas.so'.format(np_lib_dir))
        try_paths.append('{}/libopenblas.dylib'.format(np_lib_dir))

    try_paths.extend(['/opt/OpenBLAS/lib/libopenblas.so',
                      '/lib/libopenblas.so',
                      '/usr/lib/libopenblas.so.0'])

    # find_library returns None when it finds nothing.  None must not reach
    # LoadLibrary: on POSIX it can yield a handle to the running process
    # instead of failing, which would mask a library found earlier.
    located = find_library('openblas')
    if located is not None:
        try_paths.append(located)

    for path in try_paths:
        try:
            openblas_lib = ctypes.cdll.LoadLibrary(path)
        except OSError:
            continue

        # A library that loads but lacks the symbol is not usable; keep
        # looking rather than giving up on the remaining candidates.
        set_num_threads = getattr(openblas_lib, 'openblas_set_num_threads',
                                  None)
        if set_num_threads is None:
            continue

        # Stop at the first usable library so later candidates cannot
        # replace it.
        set_num_threads(1)
        return

    # Raised outside any ``except`` block so no unrelated exception context
    # is chained on.  The arguments are kept exactly as callers have always
    # seen them.
    raise EnvironmentError('Could not locate an OpenBLAS shared library', 2)
class Pool(MPPool):
    """
    A :class:`multiprocessing.pool.Pool` subclass whose :func:`map` method
    copes with :exc:`KeyboardInterrupt` more gracefully.

    Worker processes are started through a wrapper initializer, and
    :func:`map` waits for results in bounded intervals so that an interrupt
    in the parent can be noticed and acted upon.

    :param processes: (optional)
        The number of processes to use (defaults to number of CPUs).

    :param initializer: (optional)
        A callable to be called by each process when it starts.

    :param initargs: (optional)
        Arguments for *initializer*; called as ``initializer(*initargs)``.

    :param kwargs: (optional)
        Extra arguments. Python 2.7 supports a `maxtasksperchild` parameter.
    """

    def __init__(self, processes=None, initializer=None, initargs=(),
                 **kwargs):
        # Seconds that each individual wait inside `map` may block for.
        self._wait_timeout = 3600

        # Route worker start-up through `_initializer_wrapper`, which
        # receives the user's initializer as its first argument followed by
        # `initargs`.
        super(Pool, self).__init__(
            processes,
            functools.partial(_initializer_wrapper, initializer),
            initargs,
            **kwargs
        )

    def map(self, func, items, chunksize=None):
        """
        A replacement for :func:`map` that handles :exc:`KeyboardInterrupt`.

        :param func:
            Function to apply to the items.

        :param items:
            Iterable of items to have `func` applied to.

        :param chunksize: (optional)
            Forwarded unchanged to :func:`map_async`.

        If a :exc:`KeyboardInterrupt` arrives while waiting for the results,
        the pool is terminated and joined before the interrupt is re-raised.
        """
        pending = self.map_async(func, items, chunksize)

        # Always wait with a timeout: a Condition.wait() without one
        # swallows KeyboardInterrupts.  A timeout just means "not done yet".
        while True:
            try:
                results = pending.get(self._wait_timeout)
            except TimeoutError:
                continue
            except KeyboardInterrupt:
                self.terminate()
                self.join()
                raise
            else:
                return results