1# Copyright 2020, 2021 PaGMO development team
2#
3# This file is part of the pygmo library.
4#
5# This Source Code Form is subject to the terms of the Mozilla
6# Public License v. 2.0. If a copy of the MPL was not distributed
7# with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
8
9from threading import Lock as _Lock
10
11
def _evolve_func_mp_pool(ser_algo_pop):
    # Worker-side entry point used by mp_island when the evolutions are
    # dispatched to the shared process pool.
    #
    # The input is a pickled (algorithm, population) pair. The return value
    # is the pickled pair (algorithm, evolved population): the algorithm is
    # shipped back too, since it is the object that was actually used
    # for the evolution.
    from pickle import dumps, loads

    algorithm, population = loads(ser_algo_pop)
    evolved = algorithm.evolve(population)
    return dumps((algorithm, evolved))
19
20
def _evolve_func_mp_pipe(conn, ser_algo_pop):
    # Child-side entry point used by mp_island when every evolution is run
    # in a newly-created process (i.e., when *not* using the pool).
    #
    # ser_algo_pop is a pickled (algorithm, population) pair. The outcome is
    # reported to the parent process through the conn pipe: on success the
    # pickled (algorithm, evolved population) pair is sent, on failure a
    # RuntimeError instance carrying the error message is sent instead.
    # In both cases conn is closed before returning.
    from ._mp_utils import _temp_disable_sigint

    # NOTE: SIGINT is disabled so that the user cannot accidentally interrupt
    # the evolution by hitting Ctrl+C in an interactive session running in the
    # parent process. The signal is disabled only for the duration of the
    # evolution: while the process is bootstrapping the signal is still
    # enabled (so a well-timed Ctrl+C can still cause troubles in the child
    # process). There's nothing we can do about it: the only way would be to
    # disable SIGINT before creating the child process, but unfortunately the
    # creation of a child process happens in a separate thread and Python
    # disallows messing with signal handlers from a thread different from the
    # main one :(
    with _temp_disable_sigint():
        import pickle
        try:
            algorithm, population = pickle.loads(ser_algo_pop)
            evolved = algorithm.evolve(population)
            reply = pickle.dumps((algorithm, evolved))
            conn.send(reply)
        except Exception as exc:
            # Errors are not propagated from here: they are packaged into
            # a RuntimeError which is sent to the parent through the pipe.
            err_msg = ("An exception was raised in the evolution of a multiprocessing island. "
                       "The full error message is:\n{}").format(exc)
            conn.send(RuntimeError(err_msg))
        finally:
            conn.close()
47
48
class mp_island(object):
    """Multiprocessing island.

    .. versionadded:: 2.10

       The *use_pool* parameter (in previous versions, :class:`~pygmo.mp_island` always used a process pool).

    This user-defined island (UDI) will dispatch evolution tasks to an external Python process
    using the facilities provided by the standard Python :mod:`multiprocessing` module.

    If the construction argument *use_pool* is :data:`True`, then a process from a global
    :class:`pool <multiprocessing.pool.Pool>` shared between different instances of
    :class:`~pygmo.mp_island` will be used. The pool is created either implicitly by the construction
    of the first :class:`~pygmo.mp_island` object or explicitly via the :func:`~pygmo.mp_island.init_pool()`
    static method. The default number of processes in the pool is equal to the number of logical CPUs on the
    current machine. The pool's size can be queried via :func:`~pygmo.mp_island.get_pool_size()`,
    and changed via :func:`~pygmo.mp_island.resize_pool()`. The pool can be stopped via
    :func:`~pygmo.mp_island.shutdown_pool()`.

    If *use_pool* is :data:`False`, each evolution launched by an :class:`~pygmo.mp_island` will be offloaded
    to a new :class:`process <multiprocessing.Process>` which will then be terminated at the end of the evolution.

    Generally speaking, a process pool will be faster (and will use fewer resources) than spawning a new process
    for every evolution. A process pool, however, by its very nature limits the number of evolutions that can
    be run simultaneously on the system, and it introduces a serializing behaviour that might not be desirable
    in certain situations (e.g., when studying parallel evolution with migration in an :class:`~pygmo.archipelago`).

    .. note::

       Due to certain implementation details of CPython, it is not possible to initialise, resize or shutdown the pool
       from a thread different from the main one. Normally this is not a problem, but, for instance, if the first
       :class:`~pygmo.mp_island` instance is created in a thread different from the main one, an error
       will be raised. In such a situation, the user should ensure to call :func:`~pygmo.mp_island.init_pool()`
       from the main thread before spawning the secondary thread.

    .. warning::

       Due to internal limitations of CPython, sending an interrupt signal (e.g., by pressing ``Ctrl+C`` in an interactive
       Python session) while an :class:`~pygmo.mp_island` is evolving might end up sending an interrupt signal also to the
       external evolution process(es). This can lead to unpredictable runtime behaviour (e.g., the session may hang). Although
       pygmo tries hard to limit as much as possible the chances of this occurrence, it cannot eliminate them completely. Users
       are thus advised to tread carefully with interrupt signals (especially in interactive sessions) when using
       :class:`~pygmo.mp_island`.

    .. warning::

       Due to an `upstream bug <https://bugs.python.org/issue38501>`_, when using Python 3.8 the multiprocessing
       machinery may lead to a hangup when exiting a Python session. As a workaround until the bug is resolved, users
       are advised to explicitly call :func:`~pygmo.mp_island.shutdown_pool()` before exiting a Python session.

    """

    # Static variables for the pool.
    # Lock guarding every access to the _pool/_pool_size pair.
    _pool_lock = _Lock()
    # The global process pool (None if it was not created yet,
    # or if it was shut down).
    _pool = None
    # The number of processes in the pool (None if there is no pool).
    _pool_size = None

    def __init__(self, use_pool=True):
        """
        Args:

           use_pool(:class:`bool`): if :data:`True`, a process from a global pool will be used to run the evolution, otherwise a new
              process will be spawned for each evolution

        Raises:

           TypeError: if *use_pool* is not of type :class:`bool`
           unspecified: any exception thrown by :func:`~pygmo.mp_island.init_pool()` if *use_pool* is :data:`True`

        """
        self._init(use_pool)

    def _init(self, use_pool):
        # Implementation of the ctor. Factored out
        # because it's re-used in the pickling support.
        if not isinstance(use_pool, bool):
            raise TypeError(
                "The 'use_pool' parameter in the mp_island constructor must be a boolean, but it is of type {} instead.".format(type(use_pool)))
        self._use_pool = use_pool
        if self._use_pool:
            # Init the process pool, if necessary.
            mp_island.init_pool()
        else:
            # Init the pid member and associated lock.
            self._pid_lock = _Lock()
            self._pid = None

    @property
    def use_pool(self):
        """Pool usage flag (read-only).

        Returns:

           :class:`bool`: :data:`True` if this island uses a process pool, :data:`False` otherwise

        """
        return self._use_pool

    def __copy__(self):
        # For copy/deepcopy, construct a new instance
        # with the same arguments used to construct self.
        # NOTE: no need for locking, as _use_pool is set
        # on construction and never touched again.
        return mp_island(self._use_pool)

    def __deepcopy__(self, d):
        return self.__copy__()

    def __getstate__(self):
        # For pickle/unpickle, we employ the construction
        # argument, which will be used to re-init the class
        # during unpickle.
        # NOTE: the flag is wrapped in a 1-element tuple so that
        # the state is always truthy. A bare False is a falsy state,
        # and with pickle protocols 0 and 1 a falsy state is dropped
        # at pickling time: __setstate__() would then never be invoked
        # and the unpickled island would be left uninitialised.
        return (self._use_pool,)

    def __setstate__(self, state):
        # NOTE: we need to do a full init of the object,
        # in order to set the use_pool flag and, if necessary,
        # construct the _pid and _pid_lock objects.
        # The state is normally the 1-element tuple produced by
        # __getstate__(); a bare bool is accepted as well in order to
        # be able to load pickles created when the state was
        # the bare flag.
        if isinstance(state, tuple):
            state, = state
        self._init(state)

    def run_evolve(self, algo, pop):
        """Evolve population.

        This method will evolve the input :class:`~pygmo.population` *pop* using the input
        :class:`~pygmo.algorithm` *algo*, and return *algo* and the evolved population. The evolution
        is run either on one of the processes of the pool backing :class:`~pygmo.mp_island`, or in
        a new separate process. If this island is using a pool, and the pool was previously
        shut down via :func:`~pygmo.mp_island.shutdown_pool()`, an exception will be raised.

        Args:

           algo(:class:`~pygmo.algorithm`): the input algorithm
           pop(:class:`~pygmo.population`): the input population

        Returns:

           :class:`tuple`: a tuple of 2 elements containing *algo* (i.e., the :class:`~pygmo.algorithm` object that was used for the evolution) and the evolved :class:`~pygmo.population`

        Raises:

           RuntimeError: if the pool was manually shut down via :func:`~pygmo.mp_island.shutdown_pool()`, or,
             when the island is not using a pool, if the evolution raised an exception in the external process or
             the external process terminated without reporting any result
           unspecified: any exception thrown by the evolution, by the (de)serialization
             of the input arguments or of the return value, or by the public interface of the
             process pool


        """
        # NOTE: the idea here is that we pass the *already serialized*
        # arguments to the mp machinery, instead of letting the multiprocessing
        # module do the serialization. The advantage of doing so is
        # that if there are serialization errors, we catch them early here rather
        # than failing in the bootstrap phase of the remote process, which
        # can lead to hangups.
        import pickle
        ser_algo_pop = pickle.dumps((algo, pop))

        if self._use_pool:
            with mp_island._pool_lock:
                # NOTE: run this while the pool is locked. We have
                # functions to modify the pool (e.g., resize()) and
                # we need to make sure we are not trying to touch
                # the pool while we are sending tasks to it.
                if mp_island._pool is None:
                    raise RuntimeError(
                        "The multiprocessing island pool was stopped. Please restart it via mp_island.init_pool().")
                res = mp_island._pool.apply_async(
                    _evolve_func_mp_pool, (ser_algo_pop,))
            # NOTE: there might be a bug in need of a workaround lurking in here:
            # http://stackoverflow.com/questions/11312525/catch-ctrlc-sigint-and-exit-multiprocesses-gracefully-in-python
            # Just keep it in mind.
            return pickle.loads(res.get())
        else:
            from ._mp_utils import _get_spawn_context

            # Get the context for spawning the process.
            mp_ctx = _get_spawn_context()

            parent_conn, child_conn = mp_ctx.Pipe(duplex=False)
            # NOTE: the outer try block ensures that the receiving
            # end of the pipe is always closed when we are done with it,
            # rather than being left around until garbage collection.
            try:
                p = mp_ctx.Process(target=_evolve_func_mp_pipe,
                                   args=(child_conn, ser_algo_pop))
                try:
                    p.start()
                finally:
                    # NOTE: close the parent's copy of the sending end
                    # of the pipe (the child process has its own copy).
                    # As long as the parent keeps its copy open, recv()
                    # cannot detect that the other end was closed, and it
                    # would block forever if the child process died before
                    # sending anything.
                    child_conn.close()
                with self._pid_lock:
                    self._pid = p.pid
                # NOTE: after setting the pid, wrap everything
                # in a try block with a finally clause for
                # resetting the pid to None. This way, even
                # if there are exceptions, we are sure the pid
                # is set back to None.
                try:
                    try:
                        res = parent_conn.recv()
                    except EOFError:
                        # The child closed its end of the pipe (or died)
                        # without sending anything. Wait for it, so that
                        # its exit code is available, and report the problem.
                        p.join()
                        raise RuntimeError(
                            "The evolution process of a multiprocessing island terminated unexpectedly (exit code: {}) without returning a result.".format(p.exitcode))
                    p.join()
                finally:
                    with self._pid_lock:
                        self._pid = None
            finally:
                parent_conn.close()
            # NOTE: errors happening during the evolution are sent
            # through the pipe as RuntimeError instances.
            if isinstance(res, RuntimeError):
                raise res
            return pickle.loads(res)

    @property
    def pid(self):
        """ID of the evolution process (read-only).

        This property is available only if the island is *not* using a process pool.

        Returns:

           :class:`int`: the ID of the process running the current evolution, or :data:`None` if no evolution is ongoing

        Raises:

           ValueError: if the island is using a process pool

        """
        if self._use_pool:
            raise ValueError(
                "The 'pid' property is available only when the island is configured to spawn new processes, but this mp_island is using a process pool instead.")
        with self._pid_lock:
            pid = self._pid
        return pid

    def get_name(self):
        """Island's name.

        Returns:

           :class:`str`: ``"Multiprocessing island"``

        """
        return "Multiprocessing island"

    def get_extra_info(self):
        """Island's extra info.

        If the island uses a process pool and the pool was previously shut down via :func:`~pygmo.mp_island.shutdown_pool()`,
        invoking this function will trigger the creation of a new pool.

        Returns:

           :class:`str`: a string containing information about the state of the island (e.g., number of processes in the pool, ID of the evolution process, etc.)

        Raises:

           unspecified: any exception thrown by :func:`~pygmo.mp_island.get_pool_size()`

        """
        retval = "\tUsing a process pool: {}\n".format(
            "yes" if self._use_pool else "no")
        if self._use_pool:
            retval += "\tNumber of processes in the pool: {}".format(
                mp_island.get_pool_size())
        else:
            with self._pid_lock:
                pid = self._pid
            if pid is None:
                retval += "\tNo active evolution process"
            else:
                retval += "\tEvolution process ID: {}".format(pid)
        return retval

    @staticmethod
    def _init_pool_impl(processes):
        # Implementation method for initing
        # the pool. This will *not* do any locking.
        from ._mp_utils import _make_pool

        if mp_island._pool is None:
            mp_island._pool, mp_island._pool_size = _make_pool(processes)

    @staticmethod
    def init_pool(processes=None):
        """Initialise the process pool.

        This method will initialise the process pool backing :class:`~pygmo.mp_island`, if the pool
        has not been initialised yet or if the pool was previously shut down via :func:`~pygmo.mp_island.shutdown_pool()`.
        Otherwise, this method will have no effects.

        Args:

           processes(:data:`None` or an :class:`int`): the size of the pool (if :data:`None`, the size of the pool will be
             equal to the number of logical CPUs on the system)

        Raises:

           ValueError: if the pool does not exist yet and the function is being called from a thread different
             from the main one, or if *processes* is a non-positive value
           TypeError: if *processes* is not :data:`None` and not an :class:`int`

        """
        with mp_island._pool_lock:
            mp_island._init_pool_impl(processes)

    @staticmethod
    def get_pool_size():
        """Get the size of the process pool.

        If the process pool was previously shut down via :func:`~pygmo.mp_island.shutdown_pool()`, invoking this
        function will trigger the creation of a new pool.

        Returns:

           :class:`int`: the current size of the pool

        Raises:

           unspecified: any exception thrown by :func:`~pygmo.mp_island.init_pool()`

        """
        with mp_island._pool_lock:
            mp_island._init_pool_impl(None)
            return mp_island._pool_size

    @staticmethod
    def resize_pool(processes):
        """Resize pool.

        This method will resize the process pool backing :class:`~pygmo.mp_island`.

        If the process pool was previously shut down via :func:`~pygmo.mp_island.shutdown_pool()`, invoking this
        function will trigger the creation of a new pool.

        Args:

           processes(:class:`int`): the desired number of processes in the pool

        Raises:

           TypeError: if the *processes* argument is not an :class:`int`
           ValueError: if the *processes* argument is not strictly positive
           unspecified: any exception thrown by :func:`~pygmo.mp_island.init_pool()`

        """
        # Validate the argument first, so that invalid input
        # is reported before doing anything else.
        if not isinstance(processes, int):
            raise TypeError("The 'processes' argument must be an int")
        if processes <= 0:
            raise ValueError(
                "The 'processes' argument must be strictly positive")

        from ._mp_utils import _make_pool

        with mp_island._pool_lock:
            # NOTE: this will either init a new pool
            # with the requested number of processes,
            # or do nothing if the pool exists already.
            mp_island._init_pool_impl(processes)
            if processes == mp_island._pool_size:
                # Don't do anything if we are not changing
                # the size of the pool.
                return
            # Create new pool.
            # NOTE: the new pool is created *before* stopping the
            # current one, so that if the creation fails the
            # current pool is left untouched.
            new_pool, new_size = _make_pool(processes)
            # Stop the current pool.
            mp_island._pool.close()
            mp_island._pool.join()
            # Assign the new pool.
            mp_island._pool = new_pool
            mp_island._pool_size = new_size

    @staticmethod
    def shutdown_pool():
        """Shutdown pool.

        .. versionadded:: 2.8

        This method will shut down the process pool backing :class:`~pygmo.mp_island`, after
        all pending tasks in the pool have completed.

        After the process pool has been shut down, attempting to run an evolution on the island
        will raise an error. A new process pool can be created via an explicit call to
        :func:`~pygmo.mp_island.init_pool()` or one of the methods of the public API of
        :class:`~pygmo.mp_island` which trigger the creation of a new process pool.

        """
        with mp_island._pool_lock:
            if mp_island._pool is not None:
                mp_island._pool.close()
                mp_island._pool.join()
                mp_island._pool = None
                mp_island._pool_size = None
427
428
def _evolve_func_ipy(ser_algo_pop):
    # Remote-side entry point used by ipyparallel_island: this is the
    # function that is submitted to the ipyparallel view.
    #
    # It receives a pickled (algorithm, population) pair, runs the
    # evolution, and hands back the pickled pair
    # (algorithm, evolved population).
    import pickle as _pickle

    unpacked = _pickle.loads(ser_algo_pop)
    algorithm, population = unpacked
    result = (algorithm, algorithm.evolve(population))
    return _pickle.dumps(result)
436
437
class ipyparallel_island(object):
    """Ipyparallel island.

    This user-defined island (UDI) will dispatch evolution tasks to an ipyparallel cluster.
    The communication with the cluster is managed via an :class:`ipyparallel.LoadBalancedView`
    instance which is created either implicitly when the first evolution is run, or explicitly
    via the :func:`~pygmo.ipyparallel_island.init_view()` method. The
    :class:`~ipyparallel.LoadBalancedView` instance is a global object shared among all the
    ipyparallel islands.

    .. seealso::

       https://ipyparallel.readthedocs.io/en/latest/

    """

    # Static variables for the view.
    # Lock guarding every access to _view.
    _view_lock = _Lock()
    # The global view (None if it was not created yet,
    # or if it was destroyed).
    _view = None

    @staticmethod
    def _init_view_impl(client_args, client_kwargs, view_args, view_kwargs):
        # Implementation method for initing
        # the view. This will *not* do any locking.
        # If the view exists already, nothing is done.
        if ipyparallel_island._view is None:
            from ._ipyparallel_utils import _make_ipyparallel_view

            # Create the new view.
            ipyparallel_island._view = _make_ipyparallel_view(
                client_args, client_kwargs, view_args, view_kwargs)

    @staticmethod
    def init_view(client_args=None, client_kwargs=None, view_args=None, view_kwargs=None):
        """Init the ipyparallel view.

        .. versionadded:: 2.12

        This method will initialise the :class:`ipyparallel.LoadBalancedView`
        which is used by all ipyparallel islands to submit the evolution tasks
        to an ipyparallel cluster. If the :class:`ipyparallel.LoadBalancedView`
        has already been created, this method will perform no action.

        The input arguments *client_args* and *client_kwargs* are forwarded
        as positional and keyword arguments to the construction of an
        :class:`ipyparallel.Client` instance. From the constructed client,
        an :class:`ipyparallel.LoadBalancedView` instance is then created
        via the :func:`ipyparallel.Client.load_balanced_view()` method, to
        which the positional and keyword arguments *view_args* and
        *view_kwargs* are passed.

        Note that usually it is not necessary to explicitly invoke this
        method: an :class:`ipyparallel.LoadBalancedView` is automatically
        constructed with default settings the first time an evolution task
        is submitted to an ipyparallel island. This method should be used
        only if it is necessary to pass custom arguments to the construction
        of the :class:`ipyparallel.Client` or :class:`ipyparallel.LoadBalancedView`
        objects.

        Args:

            client_args(:data:`None` or a :class:`list`): the positional arguments used for the
              construction of the client (:data:`None` is equivalent to an empty list)
            client_kwargs(:data:`None` or a :class:`dict`): the keyword arguments used for the
              construction of the client (:data:`None` is equivalent to an empty dict)
            view_args(:data:`None` or a :class:`list`): the positional arguments used for the
              construction of the view (:data:`None` is equivalent to an empty list)
            view_kwargs(:data:`None` or a :class:`dict`): the keyword arguments used for the
              construction of the view (:data:`None` is equivalent to an empty dict)

        Raises:

           unspecified: any exception thrown by the constructor of :class:`ipyparallel.Client`
             or by the :func:`ipyparallel.Client.load_balanced_view()` method

        """
        # NOTE: None is used as default value instead of empty
        # lists/dicts, because default values are created only once
        # and would thus be shared among all the invocations
        # of this method.
        if client_args is None:
            client_args = []
        if client_kwargs is None:
            client_kwargs = {}
        if view_args is None:
            view_args = []
        if view_kwargs is None:
            view_kwargs = {}

        with ipyparallel_island._view_lock:
            ipyparallel_island._init_view_impl(
                client_args, client_kwargs, view_args, view_kwargs)

    @staticmethod
    def shutdown_view():
        """Destroy the ipyparallel view.

        .. versionadded:: 2.12

        This method will destroy the :class:`ipyparallel.LoadBalancedView`
        currently being used by the ipyparallel islands for submitting
        evolution tasks to an ipyparallel cluster. The view can be re-inited
        implicitly by submitting a new evolution task, or by invoking
        the :func:`~pygmo.ipyparallel_island.init_view()` method.

        If no view exists, this method will perform no action.

        """
        import gc
        with ipyparallel_island._view_lock:
            if ipyparallel_island._view is None:
                return

            # Drop the global reference to the view, and then
            # run the garbage collector.
            ipyparallel_island._view = None
            gc.collect()

    def run_evolve(self, algo, pop):
        """Evolve population.

        This method will evolve the input :class:`~pygmo.population` *pop* using the input
        :class:`~pygmo.algorithm` *algo*, and return *algo* and the evolved population. The evolution
        task is submitted to the ipyparallel cluster via a global :class:`ipyparallel.LoadBalancedView`
        instance initialised either implicitly by the first invocation of this method,
        or by an explicit call to the :func:`~pygmo.ipyparallel_island.init_view()` method.

        Args:

            algo(:class:`~pygmo.algorithm`): the input algorithm
            pop(:class:`~pygmo.population`): the input population

        Returns:

            :class:`tuple`: a tuple of 2 elements containing *algo* (i.e., the :class:`~pygmo.algorithm` object that was used for the evolution) and the evolved :class:`~pygmo.population`

        Raises:

            unspecified: any exception thrown by the evolution, by the creation of a
              :class:`ipyparallel.LoadBalancedView`, or by the submission of the evolution task
              to the ipyparallel cluster

        """
        # NOTE: as in the mp_island, we pre-serialize
        # the algo and pop, so that we can catch
        # serialization errors early.
        import pickle

        ser_algo_pop = pickle.dumps((algo, pop))
        with ipyparallel_island._view_lock:
            # Create the view with default settings,
            # if it does not exist yet.
            ipyparallel_island._init_view_impl([], {}, [], {})
            ret = ipyparallel_island._view.apply_async(
                _evolve_func_ipy, ser_algo_pop)

        # NOTE: wait for the result *after* releasing the lock, so that
        # the view is not locked for the whole duration of the evolution.
        return pickle.loads(ret.get())

    def get_name(self):
        """Island's name.

        Returns:
            :class:`str`: ``"Ipyparallel island"``

        """
        return "Ipyparallel island"

    def get_extra_info(self):
        """Island's extra info.

        Returns:
            :class:`str`: a string with extra information about the status of the island

        """
        from copy import deepcopy
        with ipyparallel_island._view_lock:
            if ipyparallel_island._view is None:
                return "\tNo cluster view has been created yet"
            # NOTE: take a private copy of the queue status while
            # holding the lock, the string is then built from the copy.
            d = deepcopy(ipyparallel_island._view.queue_status())
        return "\tQueue status:\n\t\n\t" + "\n\t".join(["(" + str(k) + ", " + str(d[k]) + ")" for k in d])
597