# Copyright 2020, 2021 PaGMO development team
#
# This file is part of the pygmo library.
#
# This Source Code Form is subject to the terms of the Mozilla
# Public License v. 2.0. If a copy of the MPL was not distributed
# with this file, You can obtain one at http://mozilla.org/MPL/2.0/.


from threading import Lock as _Lock


def _mp_ipy_bfe_func(ser_prob_dv):
    # The function that will be invoked
    # by the individual processes/nodes of mp/ipy bfe.
    # The input is a pair (pickled problem, pickled decision vector),
    # the output is the pickled fitness vector.
    # NOTE: the payload is produced by the call operators of the
    # evaluators in this file; pickle must never be fed untrusted data.
    import pickle

    prob = pickle.loads(ser_prob_dv[0])
    dv = pickle.loads(ser_prob_dv[1])

    return pickle.dumps(prob.fitness(dv))


def _bfe_make_args(prob, dvs):
    # Build the list of (pickled problem, pickled decision vector)
    # pairs to be dispatched to the workers. Returns the number of
    # decision vectors, the fitness dimension and the list of pairs.
    import pickle
    import numpy as np

    # Fetch the dimension and the fitness
    # dimension of the problem.
    ndim = prob.get_nx()
    nf = prob.get_nf()

    # Compute the total number of decision
    # vectors represented by dvs.
    ndvs = len(dvs) // ndim

    # View dvs as ndvs decision vectors of dimension ndim each.
    # NOTE: np.reshape() is used instead of assigning to dvs.shape,
    # so that the array passed in by the caller is left untouched.
    # A ValueError is still raised if the sizes are inconsistent.
    dvs_2d = np.reshape(dvs, (ndvs, ndim))

    # Pre-serialize the problem (only once, it is shared by all tasks).
    pprob = pickle.dumps(prob)

    return ndvs, nf, [(pprob, pickle.dumps(dv)) for dv in dvs_2d]


def _bfe_gather(ser_fvs, ndvs, nf):
    # Deserialize the fitness vectors computed by the workers
    # and pack them into a flattened 1D array of size ndvs * nf.
    import pickle
    import numpy as np

    # Build the vector of fitness vectors as a 2D numpy array.
    fvs = np.array([pickle.loads(fv) for fv in ser_fvs])

    # Reshape it so that it is 1D (this also validates the total size).
    return fvs.reshape((ndvs * nf,))


class mp_bfe(object):
    """Multiprocessing batch fitness evaluator.

    .. versionadded:: 2.13

    This user-defined batch fitness evaluator (UDBFE) will dispatch
    the fitness evaluation in batch mode of a set of decision vectors
    to a process pool created and managed via the facilities of the
    standard Python :mod:`multiprocessing` module.

    The evaluations of the decision vectors are dispatched to the processes
    of a global :class:`pool <multiprocessing.pool.Pool>` shared between
    different instances of :class:`~pygmo.mp_bfe`. The pool is created
    either implicitly by the construction of the first :class:`~pygmo.mp_bfe`
    object or explicitly via the :func:`~pygmo.mp_bfe.init_pool()`
    static method. The default number of processes in the pool is equal to
    the number of logical CPUs on the current machine. The pool's size can
    be queried via :func:`~pygmo.mp_bfe.get_pool_size()`, and changed via
    :func:`~pygmo.mp_bfe.resize_pool()`. The pool can be stopped via
    :func:`~pygmo.mp_bfe.shutdown_pool()`.

    .. note::

       Due to certain implementation details of CPython, it is not possible to initialise, resize or shutdown the pool
       from a thread different from the main one. Normally this is not a problem, but, for instance, if the first
       :class:`~pygmo.mp_bfe` instance is created in a thread different from the main one, an error
       will be raised. In such a situation, the user should ensure to call :func:`~pygmo.mp_bfe.init_pool()`
       from the main thread before spawning the secondary thread.

    .. warning::

       Due to internal limitations of CPython, sending an interrupt signal (e.g., by pressing ``Ctrl+C`` in an interactive
       Python session) while an :class:`~pygmo.mp_bfe` is running might end up sending an interrupt signal also to the
       external process(es). This can lead to unpredictable runtime behaviour (e.g., the session may hang). Although
       pygmo tries hard to limit as much as possible the chances of this occurrence, it cannot eliminate them completely. Users
       are thus advised to tread carefully with interrupt signals (especially in interactive sessions) when using
       :class:`~pygmo.mp_bfe`.

    .. warning::

       Due to an `upstream bug <https://bugs.python.org/issue38501>`_, when using Python 3.8 the multiprocessing
       machinery may lead to a hangup when exiting a Python session. As a workaround until the bug is resolved, users
       are advised to explicitly call :func:`~pygmo.mp_bfe.shutdown_pool()` before exiting a Python session.

    """

    # Static variables for the pool.
    # The lock protects every access to _pool and _pool_size.
    _pool_lock = _Lock()
    _pool = None
    _pool_size = None

    def __init__(self, chunksize=None):
        """
        Args:

           chunksize(:class:`int` or :data:`None`): if not :data:`None`, this positive integral represents
             the approximate number of decision vectors that are processed by each task
             submitted to the process pool by the call operator

        Raises:

           TypeError: if *chunksize* is neither :data:`None` nor a value of an integral type
           ValueError: if *chunksize* is not strictly positive
           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.init_pool()`

        """
        if chunksize is not None:
            if not isinstance(chunksize, int):
                raise TypeError(
                    "The 'chunksize' argument must be None or an int, but it is of type '{}' instead".format(type(chunksize)))

            if chunksize <= 0:
                raise ValueError(
                    "The 'chunksize' parameter must be a positive integer, but its value is {} instead".format(chunksize))

        # Init the process pool, if necessary.
        mp_bfe.init_pool()

        # Save the chunk size parameter.
        self._chunksize = chunksize

    def __call__(self, prob, dvs):
        """Call operator.

        This method will evaluate in batch mode the fitnesses of the input decision vectors
        *dvs* using the fitness function from the optimisation problem *prob*. The fitness
        evaluations are delegated to the processes of the pool backing
        :class:`~pygmo.mp_bfe`. If the pool does not exist (e.g., because it was shut down),
        it is created before submitting the evaluations. The input *dvs* is not modified.

        See the documentation of :class:`pygmo.bfe` for an explanation of the expected
        formats of *dvs* and of the return value.

        Args:

           prob(:class:`~pygmo.problem`): the input problem
           dvs(:class:`numpy.ndarray`): the input decision vectors, represented as a
             flattened 1D array

        Returns:

           :class:`numpy.ndarray`: the fitness vectors corresponding to *dvs*, represented as a
           flattened 1D array

        Raises:

           unspecified: any exception thrown by the evaluations, by the (de)serialization
             of the input arguments or of the return value, or by the public interface of the
             process pool


        """
        # Build the list of arguments to pass
        # to the processes in the pool.
        ndvs, nf, async_args = _bfe_make_args(prob, dvs)

        # NOTE: the lock is held only while submitting the tasks,
        # the results are waited for outside of the critical section.
        with mp_bfe._pool_lock:
            # Make sure the pool exists.
            mp_bfe._init_pool_impl(None)
            # Run the objfun evaluations in async mode.
            if self._chunksize is None:
                ret = mp_bfe._pool.map_async(_mp_ipy_bfe_func, async_args)
            else:
                ret = mp_bfe._pool.map_async(
                    _mp_ipy_bfe_func, async_args, chunksize=self._chunksize)

        return _bfe_gather(ret.get(), ndvs, nf)

    def get_name(self):
        """Name of this evaluator.

        Returns:

           :class:`str`: ``"Multiprocessing batch fitness evaluator"``

        """
        return "Multiprocessing batch fitness evaluator"

    def get_extra_info(self):
        """Extra info for this evaluator.

        If the process pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`,
        invoking this function will trigger the creation of a new pool.

        Returns:

           :class:`str`: a string containing information about the number of processes in the pool

        Raises:

           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.get_pool_size()`

        """
        return "\tNumber of processes in the pool: {}".format(
            mp_bfe.get_pool_size())

    @staticmethod
    def _init_pool_impl(processes):
        # Implementation method for initing
        # the pool. This will *not* do any locking.
        if mp_bfe._pool is None:
            # The helper is imported only when a pool
            # actually has to be created.
            from ._mp_utils import _make_pool

            mp_bfe._pool, mp_bfe._pool_size = _make_pool(processes)

    @staticmethod
    def init_pool(processes=None):
        """Initialise the process pool.

        This method will initialise the process pool backing :class:`~pygmo.mp_bfe`, if the pool
        has not been initialised yet or if the pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`.
        Otherwise, this method will have no effects.

        Args:

           processes(:data:`None` or an :class:`int`): the size of the pool (if :data:`None`, the size of the pool will be
             equal to the number of logical CPUs on the system)

        Raises:

           ValueError: if the pool does not exist yet and the function is being called from a thread different
             from the main one, or if *processes* is a non-positive value
           TypeError: if *processes* is not :data:`None` and not an :class:`int`

        """
        with mp_bfe._pool_lock:
            mp_bfe._init_pool_impl(processes)

    @staticmethod
    def get_pool_size():
        """Get the size of the process pool.

        If the process pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`, invoking this
        function will trigger the creation of a new pool.

        Returns:

           :class:`int`: the current size of the pool

        Raises:

           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.init_pool()`

        """
        with mp_bfe._pool_lock:
            mp_bfe._init_pool_impl(None)
            return mp_bfe._pool_size

    @staticmethod
    def resize_pool(processes):
        """Resize pool.

        This method will resize the process pool backing :class:`~pygmo.mp_bfe`.

        If the process pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`, invoking this
        function will trigger the creation of a new pool.

        Args:

           processes(:class:`int`): the desired number of processes in the pool

        Raises:

           TypeError: if the *processes* argument is not an :class:`int`
           ValueError: if the *processes* argument is not strictly positive
           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.init_pool()`

        """
        if not isinstance(processes, int):
            raise TypeError("The 'processes' argument must be an int")
        if processes <= 0:
            raise ValueError(
                "The 'processes' argument must be strictly positive")

        with mp_bfe._pool_lock:
            # NOTE: this will either init a new pool
            # with the requested number of processes,
            # or do nothing if the pool exists already.
            mp_bfe._init_pool_impl(processes)
            if processes == mp_bfe._pool_size:
                # Don't do anything if we are not changing
                # the size of the pool.
                return

            from ._mp_utils import _make_pool

            # Create new pool. This is done *before* stopping the
            # current one, so that the current pool is left intact
            # if the creation fails.
            new_pool, new_size = _make_pool(processes)
            # Stop the current pool.
            mp_bfe._pool.close()
            mp_bfe._pool.join()
            # Assign the new pool.
            mp_bfe._pool = new_pool
            mp_bfe._pool_size = new_size

    @staticmethod
    def shutdown_pool():
        """Shutdown pool.

        This method will shut down the process pool backing :class:`~pygmo.mp_bfe`, after
        all pending tasks in the pool have completed. If no pool exists, this method
        will have no effects.

        After the process pool has been shut down, a new process pool can be created
        via an explicit call to :func:`~pygmo.mp_bfe.init_pool()`, or implicitly by any of the
        methods of the public API of :class:`~pygmo.mp_bfe` which trigger the creation of a
        new process pool (including the call operator).

        """
        with mp_bfe._pool_lock:
            if mp_bfe._pool is not None:
                mp_bfe._pool.close()
                mp_bfe._pool.join()
                mp_bfe._pool = None
                mp_bfe._pool_size = None


class ipyparallel_bfe(object):
    """Ipyparallel batch fitness evaluator.

    .. versionadded:: 2.13

    This user-defined batch fitness evaluator (UDBFE) will dispatch
    the fitness evaluation in batch mode of a set of decision vectors
    to an ipyparallel cluster. The communication with the cluster is managed
    via an :class:`ipyparallel.LoadBalancedView` instance which is
    created either implicitly when the first fitness evaluation is run, or
    explicitly via the :func:`~pygmo.ipyparallel_bfe.init_view()` method. The
    :class:`~ipyparallel.LoadBalancedView` instance is a global object shared
    among all the ipyparallel batch fitness evaluators.

    .. seealso::

       https://ipyparallel.readthedocs.io/en/latest/

    """
    # Static variables for the view.
    # The lock protects every access to _view.
    _view_lock = _Lock()
    _view = None

    @staticmethod
    def init_view(client_args=None, client_kwargs=None, view_args=None, view_kwargs=None):
        """Init the ipyparallel view.

        This method will initialise the :class:`ipyparallel.LoadBalancedView`
        which is used by all ipyparallel evaluators to submit the evaluation tasks
        to an ipyparallel cluster. If the :class:`ipyparallel.LoadBalancedView`
        has already been created, this method will perform no action.

        The input arguments *client_args* and *client_kwargs* are forwarded
        as positional and keyword arguments to the construction of an
        :class:`ipyparallel.Client` instance. From the constructed client,
        an :class:`ipyparallel.LoadBalancedView` instance is then created
        via the :func:`ipyparallel.Client.load_balanced_view()` method, to
        which the positional and keyword arguments *view_args* and
        *view_kwargs* are passed.

        Note that usually it is not necessary to explicitly invoke this
        method: an :class:`ipyparallel.LoadBalancedView` is automatically
        constructed with default settings the first time a batch evaluation task
        is submitted to an ipyparallel evaluator. This method should be used
        only if it is necessary to pass custom arguments to the construction
        of the :class:`ipyparallel.Client` or :class:`ipyparallel.LoadBalancedView`
        objects.

        Args:

            client_args(:class:`list` or :data:`None`): the positional arguments used for the
              construction of the client (:data:`None` means no arguments)
            client_kwargs(:class:`dict` or :data:`None`): the keyword arguments used for the
              construction of the client (:data:`None` means no arguments)
            view_args(:class:`list` or :data:`None`): the positional arguments used for the
              construction of the view (:data:`None` means no arguments)
            view_kwargs(:class:`dict` or :data:`None`): the keyword arguments used for the
              construction of the view (:data:`None` means no arguments)

        Raises:

           unspecified: any exception thrown by the constructor of :class:`ipyparallel.Client`
             or by the :func:`ipyparallel.Client.load_balanced_view()` method

        """
        # NOTE: None is used as default instead of []/{} so that
        # no mutable object is shared between different invocations.
        client_args = [] if client_args is None else client_args
        client_kwargs = {} if client_kwargs is None else client_kwargs
        view_args = [] if view_args is None else view_args
        view_kwargs = {} if view_kwargs is None else view_kwargs

        with ipyparallel_bfe._view_lock:
            if ipyparallel_bfe._view is None:
                from ._ipyparallel_utils import _make_ipyparallel_view

                # Create the new view.
                ipyparallel_bfe._view = _make_ipyparallel_view(
                    client_args, client_kwargs, view_args, view_kwargs)

    @staticmethod
    def shutdown_view():
        """Destroy the ipyparallel view.

        This method will destroy the :class:`ipyparallel.LoadBalancedView`
        currently being used by the ipyparallel evaluators for submitting
        evaluation tasks to an ipyparallel cluster. The view can be re-inited
        implicitly by submitting a new evaluation task, or by invoking
        the :func:`~pygmo.ipyparallel_bfe.init_view()` method.

        """
        import gc
        with ipyparallel_bfe._view_lock:
            if ipyparallel_bfe._view is None:
                return

            # Drop the reference held by the class and run
            # a garbage collection pass right away.
            old_view = ipyparallel_bfe._view
            ipyparallel_bfe._view = None
            del old_view
            gc.collect()

    def __call__(self, prob, dvs):
        """Call operator.

        This method will evaluate in batch mode the fitnesses of the input decision vectors
        *dvs* using the fitness function from the optimisation problem *prob*. The fitness
        evaluations are delegated to the nodes of the ipyparallel cluster backing
        :class:`~pygmo.ipyparallel_bfe`. If no view exists yet, one is created with
        default settings. The input *dvs* is not modified.

        See the documentation of :class:`pygmo.bfe` for an explanation of the expected
        formats of *dvs* and of the return value.

        Args:

           prob(:class:`~pygmo.problem`): the input problem
           dvs(:class:`numpy.ndarray`): the input decision vectors, represented as a
             flattened 1D array

        Returns:

           :class:`numpy.ndarray`: the fitness vectors corresponding to *dvs*, represented as a
           flattened 1D array

        Raises:

           unspecified: any exception thrown by the evaluations, by the (de)serialization
             of the input arguments or of the return value, or by the public interface of
             :class:`ipyparallel.LoadBalancedView`.

        """
        # Build the list of arguments to pass
        # to the cluster nodes.
        ndvs, nf, async_args = _bfe_make_args(prob, dvs)

        # NOTE: the lock is held only while submitting the tasks,
        # the results are waited for outside of the critical section.
        with ipyparallel_bfe._view_lock:
            if ipyparallel_bfe._view is None:
                from ._ipyparallel_utils import _make_ipyparallel_view

                ipyparallel_bfe._view = _make_ipyparallel_view(
                    [], {}, [], {})
            ret = ipyparallel_bfe._view.map_async(_mp_ipy_bfe_func, async_args)

        return _bfe_gather(ret.get(), ndvs, nf)

    def get_name(self):
        """Name of the evaluator.

        Returns:
            :class:`str`: ``"Ipyparallel batch fitness evaluator"``

        """
        return "Ipyparallel batch fitness evaluator"

    def get_extra_info(self):
        """Extra info for this evaluator.

        Returns:
            :class:`str`: a string with extra information about the status of the evaluator

        """
        from copy import deepcopy
        with ipyparallel_bfe._view_lock:
            if ipyparallel_bfe._view is None:
                return "\tNo cluster view has been created yet"
            # Work on a private copy of the status reported by the view.
            d = deepcopy(ipyparallel_bfe._view.queue_status())
        return "\tQueue status:\n\t\n\t" + "\n\t".join(
            ["({}, {})".format(k, d[k]) for k in d])