from functools import partial
from multiprocessing import Pool, cpu_count


def runParallelFunction(runFunction, arguments, parallel_config=None):
    """
    Return the output of runFunction for each set of arguments,
    making use of as much parallelization as possible on this system

    :param runFunction: The function that can be executed in parallel
    :param arguments:   List of tuples, where each tuple are the arguments
                        to pass to the function
    :param parallel_config: Optional dict selecting the backend. When ``None``,
                        or when ``parallel_config['evaluate_parallel']`` is falsy,
                        the run is single-threaded. Otherwise the keys are checked
                        in this order: ``'use_MPI'``, ``'use_pebble'``,
                        ``'use_joblib'``; if none is truthy the built-in
                        ``multiprocessing.Pool`` is used. ``'num_threads'`` is read
                        for the pebble, joblib and Pool backends and ``'timeout'``
                        (seconds per execution) for the pebble backend only.
    :return:            List of results produced by ``runFunction``, as returned
                        by the selected backend
    """
    if parallel_config is None or not parallel_config['evaluate_parallel']:
        return runSingleThreaded(runFunction, arguments)

    if parallel_config['use_MPI']:
        return runMPI(runFunction, arguments)

    if parallel_config['use_pebble']:
        return runPebblePool(
            runFunction, arguments,
            timeout=parallel_config['timeout'],
            num_threads=parallel_config['num_threads'],
        )

    if parallel_config['use_joblib']:
        return runJoblib(
            runFunction, arguments,
            num_threads=parallel_config['num_threads'],
        )

    return runPool(
        runFunction, arguments,
        num_threads=parallel_config['num_threads'],
    )


# Module-level (hence picklable) helper that allows passing multiple arguments
# to 'runFunction' through single-argument map interfaces such as 'Pool.map'
def func_star(a_b, func):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)


def runPool(runFunction, arguments, num_threads=None):
    """
    Small overhead-function to handle multi-processing using Python's built-in multiprocessing.Pool

    :param runFunction: The (``partial``) function to run in parallel, accepting ``arguments``
    :param arguments:   The arguments to passed distributedly to ``runFunction``
    :param num_threads: Maximum number of worker processes; defaults to ``cpu_count()``.
                        Never more workers than there are argument sets are started.
    :return:            List of any results produced by ``runFunction``
    """
    if num_threads is None:
        num_threads = cpu_count()
    arguments = list(arguments)
    if not arguments:
        # Pool(0) raises ValueError; with nothing to run there is nothing to start.
        return []

    num_workers = min(num_threads, len(arguments))
    print(f"Running pool with {num_workers} threads")

    local_func = partial(func_star, func=runFunction)
    # The context manager tears the pool down even when map() raises;
    # map() itself blocks until every result is available.
    with Pool(num_workers) as pool:
        return pool.map(local_func, arguments)


def runPebblePool(runfunction, arguments, timeout=30, num_threads=None):
    """
    Run ``runfunction`` on each set of arguments using a pebble ``ProcessPool``,
    which supports a per-task timeout.

    :param runfunction: The function to run in parallel
    :param arguments:   Iterable of argument sets; an element that is not a list
                        or tuple is passed as a single positional argument
    :param timeout:     How many seconds each execution gets before being stopped
    :param num_threads: Maximum number of worker processes; defaults to ``cpu_count()``
    :return:            List with one entry per argument set, in input order.
                        Entries of tasks that timed out or raised are ``None``
                        (the failure is printed).
    """
    from pebble import ProcessPool
    from concurrent.futures import TimeoutError

    if num_threads is None:
        num_threads = cpu_count()

    arguments = list(arguments)
    if not arguments:
        # A pool with zero workers cannot be created; nothing to run anyway.
        return []

    num_workers = min(num_threads, len(arguments))
    print(f"Running pebble pool with {num_workers} threads")

    def collect(future):
        """Return the task's result, or None after reporting its failure."""
        try:
            return future.result()  # blocks until results are ready
        except TimeoutError as error:
            print("Function took longer than %d seconds" % error.args[1])
        except Exception as error:
            print("Function raised %s" % error)
            # pebble attaches the worker-side traceback when available
            print(getattr(error, "traceback", error))
        return None

    with ProcessPool(max_workers=num_workers, max_tasks=len(arguments)) as pool:
        futures = []
        for x in arguments:
            if not isinstance(x, (list, tuple)):
                x = [x]
            futures.append(pool.schedule(runfunction, args=x, timeout=timeout))
        return [collect(future) for future in futures]


def runJoblib(runFunction, arguments, num_threads=None):
    """
    Run ``runFunction`` on each set of arguments using joblib's ``Parallel``.

    :param runFunction: The function to run in parallel, accepting ``arguments``
    :param arguments:   Iterable of argument sets, each unpacked into ``runFunction``
    :param num_threads: Value passed as ``n_jobs``; defaults to ``cpu_count()``
    :return:            List of any results produced by ``runFunction``
    """
    from joblib import delayed, Parallel

    if num_threads is None:
        num_threads = cpu_count()
    local_func = partial(func_star, func=runFunction)

    return Parallel(n_jobs=num_threads)(delayed(local_func)(arg) for arg in arguments)


def runSingleThreaded(runFunction, arguments):
    """
    Small overhead-function to iteratively run a function with a pre-determined input arguments

    :param runFunction: The (``partial``) function to run, accepting ``arguments``
    :param arguments:   The arguments to passed to ``runFunction``, one run at a time
    :return:            List of any results produced by ``runFunction``
    """
    print("Running single-threaded")
    return [runFunction(*arg) for arg in arguments]


def runMPI(runFunction, arguments):
    """
    Run ``runFunction`` over ``arguments`` using schwimmbad's ``MPIPool``.

    :param runFunction: The function to map over ``arguments``
    :param arguments:   The items to map ``runFunction`` over
    :return:            The results of ``pool.map``
    """
    from schwimmbad import MPIPool
    print("Running using MPI")
    # NOTE(review): unlike the other backends, each element of ``arguments`` is
    # passed to ``runFunction`` as ONE positional argument (it is not unpacked
    # via func_star) -- confirm whether callers rely on this before aligning it.
    with MPIPool() as pool:
        results = pool.map(runFunction, arguments)
    return results