# Copyright 2020, 2021 PaGMO development team
#
# This file is part of the pygmo library.
#
# This Source Code Form is subject to the terms of the Mozilla
# Public License v. 2.0. If a copy of the MPL was not distributed
# with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

from threading import Lock as _Lock


def _evolve_func_mp_pool(ser_algo_pop):
    # The evolve function that is actually run from the separate processes
    # in mp_island (when using the pool). It receives the pickled
    # (algo, pop) pair and returns the pickled (algo, new_pop) pair.
    import pickle
    algo, pop = pickle.loads(ser_algo_pop)
    new_pop = algo.evolve(pop)
    return pickle.dumps((algo, new_pop))


def _evolve_func_mp_pipe(conn, ser_algo_pop):
    # The evolve function that is actually run from the separate processes
    # in mp_island (when *not* using the pool). Communication with the
    # parent process happens through the conn pipe: either the pickled
    # (algo, new_pop) pair or a RuntimeError instance is sent back.
    from ._mp_utils import _temp_disable_sigint

    # NOTE: disable SIGINT with the goal of preventing the user from accidentally
    # interrupting the evolution via hitting Ctrl+C in an interactive session
    # in the parent process. Note that this disables the signal only during
    # evolution, but the signal is still enabled when the process is bootstrapping
    # (so the user can still cause troubles in the child process with a well-timed
    # Ctrl+C). There's nothing we can do about it: the only way would be to disable
    # SIGINT before creating the child process, but unfortunately the creation
    # of a child process happens in a separate thread and Python disallows messing
    # with signal handlers from a thread different from the main one :(
    with _temp_disable_sigint():
        import pickle
        try:
            algo, pop = pickle.loads(ser_algo_pop)
            new_pop = algo.evolve(pop)
            conn.send(pickle.dumps((algo, new_pop)))
        except Exception as e:
            conn.send(RuntimeError(
                "An exception was raised in the evolution of a multiprocessing island. The full error message is:\n{}".format(e)))
        finally:
            conn.close()


class mp_island(object):
    """Multiprocessing island.

    .. versionadded:: 2.10

       The *use_pool* parameter (in previous versions, :class:`~pygmo.mp_island` always used a process pool).

    This user-defined island (UDI) will dispatch evolution tasks to an external Python process
    using the facilities provided by the standard Python :mod:`multiprocessing` module.

    If the construction argument *use_pool* is :data:`True`, then a process from a global
    :class:`pool <multiprocessing.pool.Pool>` shared between different instances of
    :class:`~pygmo.mp_island` will be used. The pool is created either implicitly by the construction
    of the first :class:`~pygmo.mp_island` object or explicitly via the :func:`~pygmo.mp_island.init_pool()`
    static method. The default number of processes in the pool is equal to the number of logical CPUs on the
    current machine. The pool's size can be queried via :func:`~pygmo.mp_island.get_pool_size()`,
    and changed via :func:`~pygmo.mp_island.resize_pool()`. The pool can be stopped via
    :func:`~pygmo.mp_island.shutdown_pool()`.

    If *use_pool* is :data:`False`, each evolution launched by an :class:`~pygmo.mp_island` will be offloaded
    to a new :class:`process <multiprocessing.Process>` which will then be terminated at the end of the evolution.

    Generally speaking, a process pool will be faster (and will use fewer resources) than spawning a new process
    for every evolution. A process pool, however, by its very nature limits the number of evolutions that can
    be run simultaneously on the system, and it introduces a serializing behaviour that might not be desirable
    in certain situations (e.g., when studying parallel evolution with migration in an :class:`~pygmo.archipelago`).

    .. note::

       Due to certain implementation details of CPython, it is not possible to initialise, resize or shutdown the pool
       from a thread different from the main one. Normally this is not a problem, but, for instance, if the first
       :class:`~pygmo.mp_island` instance is created in a thread different from the main one, an error
       will be raised. In such a situation, the user should ensure to call :func:`~pygmo.mp_island.init_pool()`
       from the main thread before spawning the secondary thread.

    .. warning::

       Due to internal limitations of CPython, sending an interrupt signal (e.g., by pressing ``Ctrl+C`` in an interactive
       Python session) while an :class:`~pygmo.mp_island` is evolving might end up sending an interrupt signal also to the
       external evolution process(es). This can lead to unpredictable runtime behaviour (e.g., the session may hang). Although
       pygmo tries hard to limit as much as possible the chances of this occurrence, it cannot eliminate them completely. Users
       are thus advised to tread carefully with interrupt signals (especially in interactive sessions) when using
       :class:`~pygmo.mp_island`.

    .. warning::

       Due to an `upstream bug <https://bugs.python.org/issue38501>`_, when using Python 3.8 the multiprocessing
       machinery may lead to a hangup when exiting a Python session. As a workaround until the bug is resolved, users
       are advised to explicitly call :func:`~pygmo.mp_island.shutdown_pool()` before exiting a Python session.

    """

    # Static variables for the pool.
    _pool_lock = _Lock()
    _pool = None
    _pool_size = None

    def __init__(self, use_pool=True):
        """
        Args:

           use_pool(:class:`bool`): if :data:`True`, a process from a global pool will be used to run the evolution, otherwise a new
              process will be spawned for each evolution

        Raises:

           TypeError: if *use_pool* is not of type :class:`bool`
           unspecified: any exception thrown by :func:`~pygmo.mp_island.init_pool()` if *use_pool* is :data:`True`

        """
        self._init(use_pool)

    def _init(self, use_pool):
        # Implementation of the ctor. Factored out
        # because it's re-used in the pickling support.
        if not isinstance(use_pool, bool):
            raise TypeError(
                "The 'use_pool' parameter in the mp_island constructor must be a boolean, but it is of type {} instead.".format(type(use_pool)))
        self._use_pool = use_pool
        if self._use_pool:
            # Init the process pool, if necessary.
            mp_island.init_pool()
        else:
            # Init the pid member and associated lock.
            self._pid_lock = _Lock()
            self._pid = None

    @property
    def use_pool(self):
        """Pool usage flag (read-only).

        Returns:

           :class:`bool`: :data:`True` if this island uses a process pool, :data:`False` otherwise

        """
        return self._use_pool

    def __copy__(self):
        # For copy/deepcopy, construct a new instance
        # with the same arguments used to construct self.
        # NOTE: no need for locking, as _use_pool is set
        # on construction and never touched again.
        return mp_island(self._use_pool)

    def __deepcopy__(self, d):
        return self.__copy__()

    def __getstate__(self):
        # For pickle/unpickle, we employ the construction
        # argument, which will be used to re-init the class
        # during unpickle.
        return self._use_pool

    def __setstate__(self, state):
        # NOTE: we need to do a full init of the object,
        # in order to set the use_pool flag and, if necessary,
        # construct the _pid and _pid_lock objects.
        self._init(state)

    def run_evolve(self, algo, pop):
        """Evolve population.

        This method will evolve the input :class:`~pygmo.population` *pop* using the input
        :class:`~pygmo.algorithm` *algo*, and return *algo* and the evolved population. The evolution
        is run either on one of the processes of the pool backing :class:`~pygmo.mp_island`, or in
        a new separate process. If this island is using a pool, and the pool was previously
        shut down via :func:`~pygmo.mp_island.shutdown_pool()`, an exception will be raised.

        Args:

           algo(:class:`~pygmo.algorithm`): the input algorithm
           pop(:class:`~pygmo.population`): the input population

        Returns:

           :class:`tuple`: a tuple of 2 elements containing *algo* (i.e., the :class:`~pygmo.algorithm` object that was used for the evolution) and the evolved :class:`~pygmo.population`

        Raises:

           RuntimeError: if the pool was manually shut down via :func:`~pygmo.mp_island.shutdown_pool()`, or
             if the island is not using a pool and the external evolution process terminates without
             sending back a result
           unspecified: any exception thrown by the evolution, by the (de)serialization
             of the input arguments or of the return value, or by the public interface of the
             process pool


        """
        # NOTE: the idea here is that we pass the *already serialized*
        # arguments to the mp machinery, instead of letting the multiprocessing
        # module do the serialization. The advantage of doing so is
        # that if there are serialization errors, we catch them early here rather
        # than failing in the bootstrap phase of the remote process, which
        # can lead to hangups.
        import pickle
        ser_algo_pop = pickle.dumps((algo, pop))

        if self._use_pool:
            with mp_island._pool_lock:
                # NOTE: run this while the pool is locked. We have
                # functions to modify the pool (e.g., resize()) and
                # we need to make sure we are not trying to touch
                # the pool while we are sending tasks to it.
                if mp_island._pool is None:
                    raise RuntimeError(
                        "The multiprocessing island pool was stopped. Please restart it via mp_island.init_pool().")
                res = mp_island._pool.apply_async(
                    _evolve_func_mp_pool, (ser_algo_pop,))
            # NOTE: there might be a bug in need of a workaround lurking in here:
            # http://stackoverflow.com/questions/11312525/catch-ctrlc-sigint-and-exit-multiprocesses-gracefully-in-python
            # Just keep it in mind.
            return pickle.loads(res.get())
        else:
            from ._mp_utils import _get_spawn_context

            # Get the context for spawning the process.
            mp_ctx = _get_spawn_context()

            parent_conn, child_conn = mp_ctx.Pipe(duplex=False)
            p = mp_ctx.Process(target=_evolve_func_mp_pipe,
                               args=(child_conn, ser_algo_pop))
            p.start()
            # NOTE: close the parent's copy of the write end of the pipe.
            # The child process holds its own copy. If we kept ours open,
            # recv() below would block forever (instead of raising
            # EOFError) whenever the child dies before sending a result.
            child_conn.close()
            with self._pid_lock:
                self._pid = p.pid
            # NOTE: after setting the pid, wrap everything
            # in a try block with a finally clause for
            # resetting the pid to None. This way, even
            # if there are exceptions, we are sure the pid
            # is set back to None.
            try:
                try:
                    res = parent_conn.recv()
                except EOFError:
                    # The child closed its end of the pipe (or died)
                    # without sending anything back.
                    p.join()
                    raise RuntimeError(
                        "The external process running the evolution of a multiprocessing island terminated without returning a result (exit code: {}).".format(p.exitcode))
                p.join()
            finally:
                # Release the read end of the pipe explicitly rather
                # than relying on garbage collection.
                parent_conn.close()
                with self._pid_lock:
                    self._pid = None
            if isinstance(res, RuntimeError):
                raise res
            return pickle.loads(res)

    @property
    def pid(self):
        """ID of the evolution process (read-only).

        This property is available only if the island is *not* using a process pool.

        Returns:

           :class:`int`: the ID of the process running the current evolution, or :data:`None` if no evolution is ongoing

        Raises:

           ValueError: if the island is using a process pool

        """
        if self._use_pool:
            raise ValueError(
                "The 'pid' property is available only when the island is configured to spawn new processes, but this mp_island is using a process pool instead.")
        with self._pid_lock:
            pid = self._pid
        return pid

    def get_name(self):
        """Island's name.

        Returns:

           :class:`str`: ``"Multiprocessing island"``

        """
        return "Multiprocessing island"

    def get_extra_info(self):
        """Island's extra info.

        If the island uses a process pool and the pool was previously shut down via :func:`~pygmo.mp_island.shutdown_pool()`,
        invoking this function will trigger the creation of a new pool.

        Returns:

           :class:`str`: a string containing information about the state of the island (e.g., number of processes in the pool, ID of the evolution process, etc.)

        Raises:

           unspecified: any exception thrown by :func:`~pygmo.mp_island.get_pool_size()`

        """
        retval = "\tUsing a process pool: {}\n".format(
            "yes" if self._use_pool else "no")
        if self._use_pool:
            retval += "\tNumber of processes in the pool: {}".format(
                mp_island.get_pool_size())
        else:
            with self._pid_lock:
                pid = self._pid
            if pid is None:
                retval += "\tNo active evolution process"
            else:
                retval += "\tEvolution process ID: {}".format(pid)
        return retval

    @staticmethod
    def _init_pool_impl(processes):
        # Implementation method for initing
        # the pool. This will *not* do any locking.
        from ._mp_utils import _make_pool

        if mp_island._pool is None:
            mp_island._pool, mp_island._pool_size = _make_pool(processes)

    @staticmethod
    def init_pool(processes=None):
        """Initialise the process pool.

        This method will initialise the process pool backing :class:`~pygmo.mp_island`, if the pool
        has not been initialised yet or if the pool was previously shut down via :func:`~pygmo.mp_island.shutdown_pool()`.
        Otherwise, this method will have no effects.

        Args:

           processes(:data:`None` or an :class:`int`): the size of the pool (if :data:`None`, the size of the pool will be
             equal to the number of logical CPUs on the system)

        Raises:

           ValueError: if the pool does not exist yet and the function is being called from a thread different
             from the main one, or if *processes* is a non-positive value
           TypeError: if *processes* is not :data:`None` and not an :class:`int`

        """
        with mp_island._pool_lock:
            mp_island._init_pool_impl(processes)

    @staticmethod
    def get_pool_size():
        """Get the size of the process pool.

        If the process pool was previously shut down via :func:`~pygmo.mp_island.shutdown_pool()`, invoking this
        function will trigger the creation of a new pool.

        Returns:

           :class:`int`: the current size of the pool

        Raises:

           unspecified: any exception thrown by :func:`~pygmo.mp_island.init_pool()`

        """
        with mp_island._pool_lock:
            mp_island._init_pool_impl(None)
            return mp_island._pool_size

    @staticmethod
    def resize_pool(processes):
        """Resize pool.

        This method will resize the process pool backing :class:`~pygmo.mp_island`.

        If the process pool was previously shut down via :func:`~pygmo.mp_island.shutdown_pool()`, invoking this
        function will trigger the creation of a new pool.

        Args:

           processes(:class:`int`): the desired number of processes in the pool

        Raises:

           TypeError: if the *processes* argument is not an :class:`int`
           ValueError: if the *processes* argument is not strictly positive
           unspecified: any exception thrown by :func:`~pygmo.mp_island.init_pool()`

        """
        # Validate the argument first, so that invalid input is
        # reported before any pool machinery is touched.
        if not isinstance(processes, int):
            raise TypeError("The 'processes' argument must be an int")
        if processes <= 0:
            raise ValueError(
                "The 'processes' argument must be strictly positive")

        from ._mp_utils import _make_pool

        with mp_island._pool_lock:
            # NOTE: this will either init a new pool
            # with the requested number of processes,
            # or do nothing if the pool exists already.
            mp_island._init_pool_impl(processes)
            if processes == mp_island._pool_size:
                # Don't do anything if we are not changing
                # the size of the pool.
                return
            # Create new pool.
            new_pool, new_size = _make_pool(processes)
            # Stop the current pool.
            mp_island._pool.close()
            mp_island._pool.join()
            # Assign the new pool.
            mp_island._pool = new_pool
            mp_island._pool_size = new_size

    @staticmethod
    def shutdown_pool():
        """Shutdown pool.

        .. versionadded:: 2.8

        This method will shut down the process pool backing :class:`~pygmo.mp_island`, after
        all pending tasks in the pool have completed.

        After the process pool has been shut down, attempting to run an evolution on the island
        will raise an error. A new process pool can be created via an explicit call to
        :func:`~pygmo.mp_island.init_pool()` or one of the methods of the public API of
        :class:`~pygmo.mp_island` which trigger the creation of a new process pool.

        """
        with mp_island._pool_lock:
            if mp_island._pool is not None:
                mp_island._pool.close()
                mp_island._pool.join()
                mp_island._pool = None
                mp_island._pool_size = None


def _evolve_func_ipy(ser_algo_pop):
    # The evolve function that is actually run from the separate processes
    # in ipyparallel_island. It receives the pickled (algo, pop) pair and
    # returns the pickled (algo, new_pop) pair.
    import pickle
    algo, pop = pickle.loads(ser_algo_pop)
    new_pop = algo.evolve(pop)
    return pickle.dumps((algo, new_pop))


class ipyparallel_island(object):
    """Ipyparallel island.

    This user-defined island (UDI) will dispatch evolution tasks to an ipyparallel cluster.
    The communication with the cluster is managed via an :class:`ipyparallel.LoadBalancedView`
    instance which is created either implicitly when the first evolution is run, or explicitly
    via the :func:`~pygmo.ipyparallel_island.init_view()` method. The
    :class:`~ipyparallel.LoadBalancedView` instance is a global object shared among all the
    ipyparallel islands.

    .. seealso::

       https://ipyparallel.readthedocs.io/en/latest/

    """

    # Static variables for the view.
    _view_lock = _Lock()
    _view = None

    @staticmethod
    def init_view(client_args=None, client_kwargs=None, view_args=None, view_kwargs=None):
        """Init the ipyparallel view.

        .. versionadded:: 2.12

        This method will initialise the :class:`ipyparallel.LoadBalancedView`
        which is used by all ipyparallel islands to submit the evolution tasks
        to an ipyparallel cluster. If the :class:`ipyparallel.LoadBalancedView`
        has already been created, this method will perform no action.

        The input arguments *client_args* and *client_kwargs* are forwarded
        as positional and keyword arguments to the construction of an
        :class:`ipyparallel.Client` instance. From the constructed client,
        an :class:`ipyparallel.LoadBalancedView` instance is then created
        via the :func:`ipyparallel.Client.load_balanced_view()` method, to
        which the positional and keyword arguments *view_args* and
        *view_kwargs* are passed.

        Note that usually it is not necessary to explicitly invoke this
        method: an :class:`ipyparallel.LoadBalancedView` is automatically
        constructed with default settings the first time an evolution task
        is submitted to an ipyparallel island. This method should be used
        only if it is necessary to pass custom arguments to the construction
        of the :class:`ipyparallel.Client` or :class:`ipyparallel.LoadBalancedView`
        objects.

        Args:

            client_args(:class:`list` or :data:`None`): the positional arguments used for the
              construction of the client (:data:`None` is equivalent to an empty list)
            client_kwargs(:class:`dict` or :data:`None`): the keyword arguments used for the
              construction of the client (:data:`None` is equivalent to an empty dict)
            view_args(:class:`list` or :data:`None`): the positional arguments used for the
              construction of the view (:data:`None` is equivalent to an empty list)
            view_kwargs(:class:`dict` or :data:`None`): the keyword arguments used for the
              construction of the view (:data:`None` is equivalent to an empty dict)

        Raises:

           unspecified: any exception thrown by the constructor of :class:`ipyparallel.Client`
             or by the :func:`ipyparallel.Client.load_balanced_view()` method

        """
        from ._ipyparallel_utils import _make_ipyparallel_view

        # NOTE: None is used as the default in place of mutable
        # containers, which would be shared across all invocations.
        # Fresh empty containers are created here instead.
        client_args = [] if client_args is None else client_args
        client_kwargs = {} if client_kwargs is None else client_kwargs
        view_args = [] if view_args is None else view_args
        view_kwargs = {} if view_kwargs is None else view_kwargs

        with ipyparallel_island._view_lock:
            if ipyparallel_island._view is None:
                # Create the new view.
                ipyparallel_island._view = _make_ipyparallel_view(
                    client_args, client_kwargs, view_args, view_kwargs)

    @staticmethod
    def shutdown_view():
        """Destroy the ipyparallel view.

        .. versionadded:: 2.12

        This method will destroy the :class:`ipyparallel.LoadBalancedView`
        currently being used by the ipyparallel islands for submitting
        evolution tasks to an ipyparallel cluster. The view can be re-inited
        implicitly by submitting a new evolution task, or by invoking
        the :func:`~pygmo.ipyparallel_island.init_view()` method.

        """
        import gc
        with ipyparallel_island._view_lock:
            if ipyparallel_island._view is None:
                return

            # Drop both the class-level and the local reference
            # to the view, then force a garbage collection pass.
            old_view = ipyparallel_island._view
            ipyparallel_island._view = None
            del old_view
            gc.collect()

    def run_evolve(self, algo, pop):
        """Evolve population.

        This method will evolve the input :class:`~pygmo.population` *pop* using the input
        :class:`~pygmo.algorithm` *algo*, and return *algo* and the evolved population. The evolution
        task is submitted to the ipyparallel cluster via a global :class:`ipyparallel.LoadBalancedView`
        instance initialised either implicitly by the first invocation of this method,
        or by an explicit call to the :func:`~pygmo.ipyparallel_island.init_view()` method.

        Args:

            algo(:class:`~pygmo.algorithm`): the input algorithm
            pop(:class:`~pygmo.population`): the input population

        Returns:

            :class:`tuple`: a tuple of 2 elements containing *algo* (i.e., the :class:`~pygmo.algorithm` object that was used for the evolution) and the evolved :class:`~pygmo.population`

        Raises:

            unspecified: any exception thrown by the evolution, by the creation of a
              :class:`ipyparallel.LoadBalancedView`, or by the submission of the evolution task
              to the ipyparallel cluster

        """
        # NOTE: as in the mp_island, we pre-serialize
        # the algo and pop, so that we can catch
        # serialization errors early.
        import pickle
        from ._ipyparallel_utils import _make_ipyparallel_view

        ser_algo_pop = pickle.dumps((algo, pop))
        with ipyparallel_island._view_lock:
            if ipyparallel_island._view is None:
                ipyparallel_island._view = _make_ipyparallel_view(
                    [], {}, [], {})
            ret = ipyparallel_island._view.apply_async(
                _evolve_func_ipy, ser_algo_pop)

        # NOTE: wait for the result outside the lock, so that other
        # islands can submit their tasks in the meantime.
        return pickle.loads(ret.get())

    def get_name(self):
        """Island's name.

        Returns:
            :class:`str`: ``"Ipyparallel island"``

        """
        return "Ipyparallel island"

    def get_extra_info(self):
        """Island's extra info.

        Returns:
            :class:`str`: a string with extra information about the status of the island

        """
        from copy import deepcopy
        with ipyparallel_island._view_lock:
            if ipyparallel_island._view is None:
                return "\tNo cluster view has been created yet"
            else:
                d = deepcopy(ipyparallel_island._view.queue_status())
        return "\tQueue status:\n\t\n\t" + "\n\t".join(
            "(" + str(k) + ", " + str(v) + ")" for k, v in d.items())