1from functools import partial
2from multiprocessing import cpu_count
3
4def runParallelFunction(runFunction, arguments, parallel_config = None):
5    """
6        Return the output of runFunction for each set of arguments,
7        making use of as much parallelization as possible on this system
8
9        :param runFunction: The function that can be executed in parallel
10        :param arguments:   List of tuples, where each tuple are the arguments
11                            to pass to the function
12        :param force_single:Force the run to be single-threaded, ignoring all settings from IOH_config
13        :param timeout:     How many seconds each execution get before being stopped.
14                            Only used when running in parallel without mpi.
15        :return:
16    """
17    if parallel_config is None:
18        return runSingleThreaded(runFunction, arguments)
19
20    if parallel_config['evaluate_parallel']:
21        if parallel_config['use_MPI']:
22            return runMPI(runFunction, arguments)
23        elif parallel_config['use_pebble']:
24            return runPebblePool(
25                runFunction, arguments,
26                timeout = parallel_config['timeout'],
27                num_threads = parallel_config['num_threads']
28            )
29        elif parallel_config['use_joblib']:
30            return runJoblib(
31                runFunction, arguments,
32                num_threads = parallel_config['num_threads']
33            )
34        else:
35            return runPool(
36                runFunction, arguments,
37                num_threads = parallel_config['num_threads']
38            )
39    else:
40        return runSingleThreaded(runFunction, arguments)
41
42# Inline function definition to allow the passing of multiple arguments to 'runFunction' through 'Pool.map'
43def func_star(a_b, func):
44    """Convert `f([1,2])` to `f(1,2)` call."""
45    return func(*a_b)
46
47def runPool(runFunction, arguments, num_threads = None):
48    """
49        Small overhead-function to handle multi-processing using Python's built-in multiprocessing.Pool
50
51        :param runFunction: The (``partial``) function to run in parallel, accepting ``arguments``
52        :param arguments:   The arguments to passed distributedly to ``runFunction``
53        :return:            List of any results produced by ``runFunction``
54    """
55    if num_threads is None:
56        num_threads = cpu_count()
57    arguments = list(arguments)
58    print(f"Running pool with {min(num_threads, len(arguments))} threads")
59    from multiprocessing import Pool
60    p = Pool(min(num_threads, len(arguments)))
61
62    local_func = partial(func_star, func=runFunction)
63    results = p.map(local_func, arguments)
64    p.close()
65    return results
66
67def runPebblePool(runfunction, arguments, timeout = 30, num_threads = None):
68    from pebble import ProcessPool
69    from concurrent.futures import TimeoutError
70
71    if num_threads is None:
72        num_threads = cpu_count()
73
74    arguments = list(arguments)
75
76    print(f"Running pebble pool with {min(num_threads, len(arguments))} threads")
77
78
79    def task_done(future):
80        try:
81            future.result()  # blocks until results are ready
82        except TimeoutError as error:
83            print("Function took longer than %d seconds" % error.args[1])
84        except Exception as error:
85            print("Function raised %s" % error)
86            print(error)  # traceback of the function
87
88    with ProcessPool(max_workers = min(num_threads, len(arguments)), max_tasks = len(arguments)) as pool:
89        for x in arguments:
90            if type(x) is not list and type(x) is not tuple:
91                x = [x]
92            future = pool.schedule(runfunction, args=x, timeout=timeout)
93            future.add_done_callback(task_done)
94
95def runJoblib(runFunction, arguments, num_threads = None):
96    from joblib import delayed, Parallel
97
98    if num_threads is None:
99        num_threads = cpu_count()
100#     arguments = list(arguments)
101#     print(f"Running joblib with {min(num_threads, len(arguments))} threads")
102    local_func = partial(func_star, func=runFunction)
103
104    return Parallel(n_jobs = num_threads)(delayed(local_func)(arg) for arg in arguments)
105
106def runSingleThreaded(runFunction, arguments):
107    """
108        Small overhead-function to iteratively run a function with a pre-determined input arguments
109
110        :param runFunction: The (``partial``) function to run, accepting ``arguments``
111        :param arguments:   The arguments to passed to ``runFunction``, one run at a time
112        :return:            List of any results produced by ``runFunction``
113    """
114    results = []
115    print("Running single-threaded")
116    for arg in arguments:
117        results.append(runFunction(*arg))
118    return results
119
120def runMPI(runFunction, arguments):
121    from schwimmbad import MPIPool
122    print("Running using MPI")
123    with MPIPool() as pool:
124        results = pool.map(runFunction, arguments)
125    return results
126
127