1# Copyright 2020, 2021 PaGMO development team
2#
3# This file is part of the pygmo library.
4#
5# This Source Code Form is subject to the terms of the Mozilla
6# Public License v. 2.0. If a copy of the MPL was not distributed
7# with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
8
9
10from threading import Lock as _Lock
11
12
13def _mp_ipy_bfe_func(ser_prob_dv):
14    # The function that will be invoked
15    # by the individual processes/nodes of mp/ipy bfe.
16    import pickle
17
18    prob = pickle.loads(ser_prob_dv[0])
19    dv = pickle.loads(ser_prob_dv[1])
20
21    return pickle.dumps(prob.fitness(dv))
22
23
24class mp_bfe(object):
25    """Multiprocessing batch fitness evaluator.
26
27    .. versionadded:: 2.13
28
29    This user-defined batch fitness evaluator (UDBFE) will dispatch
30    the fitness evaluation in batch mode of a set of decision vectors
31    to a process pool created and managed via the facilities of the
32    standard Python :mod:`multiprocessing` module.
33
34    The evaluations of the decision vectors are dispatched to the processes
35    of a global :class:`pool <multiprocessing.pool.Pool>` shared between
36    different instances of :class:`~pygmo.mp_bfe`. The pool is created
37    either implicitly by the construction of the first :class:`~pygmo.mp_bfe`
38    object or explicitly via the :func:`~pygmo.mp_bfe.init_pool()`
39    static method. The default number of processes in the pool is equal to
40    the number of logical CPUs on the current machine. The pool's size can
41    be queried via :func:`~pygmo.mp_bfe.get_pool_size()`, and changed via
42    :func:`~pygmo.mp_bfe.resize_pool()`. The pool can be stopped via
43    :func:`~pygmo.mp_bfe.shutdown_pool()`.
44
45    .. note::
46
47       Due to certain implementation details of CPython, it is not possible to initialise, resize or shutdown the pool
48       from a thread different from the main one. Normally this is not a problem, but, for instance, if the first
49       :class:`~pygmo.mp_bfe` instance is created in a thread different from the main one, an error
50       will be raised. In such a situation, the user should ensure to call :func:`~pygmo.mp_bfe.init_pool()`
51       from the main thread before spawning the secondary thread.
52
53    .. warning::
54
55       Due to internal limitations of CPython, sending an interrupt signal (e.g., by pressing ``Ctrl+C`` in an interactive
56       Python session) while an :class:`~pygmo.mp_bfe` is running might end up sending an interrupt signal also to the
57       external process(es). This can lead to unpredictable runtime behaviour (e.g., the session may hang). Although
58       pygmo tries hard to limit as much as possible the chances of this occurrence, it cannot eliminate them completely. Users
59       are thus advised to tread carefully with interrupt signals (especially in interactive sessions) when using
60       :class:`~pygmo.mp_bfe`.
61
62    .. warning::
63
64       Due to an `upstream bug <https://bugs.python.org/issue38501>`_, when using Python 3.8 the multiprocessing
65       machinery may lead to a hangup when exiting a Python session. As a workaround until the bug is resolved, users
66       are advised to explicitly call :func:`~pygmo.mp_bfe.shutdown_pool()` before exiting a Python session.
67
68    """
69
70    # Static variables for the pool.
71    _pool_lock = _Lock()
72    _pool = None
73    _pool_size = None
74
75    def __init__(self, chunksize=None):
76        """
77        Args:
78
79           chunksize(:class:`int` or :data:`None`): if not :data:`None`, this positive integral represents
80             the approximate number of decision vectors that are processed by each task
81             submitted to the process pool by the call operator
82
83        Raises:
84
85           TypeError: if *chunksize* is neither :data:`None` nor a value of an integral type
86           ValueError: if *chunksize* is not strictly positive
87           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.init_pool()`
88
89        """
90        if not chunksize is None and not isinstance(chunksize, int):
91            raise TypeError(
92                "The 'chunksize' argument must be None or an int, but it is of type '{}' instead".format(type(chunksize)))
93
94        if not chunksize is None and chunksize <= 0:
95            raise ValueError(
96                "The 'chunksize' parameter must be a positive integer, but its value is {} instead".format(chunksize))
97
98        # Init the process pool, if necessary.
99        mp_bfe.init_pool()
100
101        # Save the chunk size parameter.
102        self._chunksize = chunksize
103
104    def __call__(self, prob, dvs):
105        """Call operator.
106
107        This method will evaluate in batch mode the fitnesses of the input decision vectors
108        *dvs* using the fitness function from the optimisation problem *prob*. The fitness
109        evaluations are delegated to the processes of the pool backing
110        :class:`~pygmo.mp_bfe`.
111
112        See the documentation of :class:`pygmo.bfe` for an explanation of the expected
113        formats of *dvs* and of the return value.
114
115        Args:
116
117           prob(:class:`~pygmo.problem`): the input problem
118           dvs(:class:`numpy.ndarray`): the input decision vectors, represented as a
119             flattened 1D array
120
121        Returns:
122
123           :class:`numpy.ndarray`: the fitness vectors corresponding to *dvs*, represented as a
124             flattened 1D array
125
126        Raises:
127
128           unspecified: any exception thrown by the evaluations, by the (de)serialization
129             of the input arguments or of the return value, or by the public interface of the
130             process pool
131
132
133        """
134        import pickle
135        import numpy as np
136
137        # Fetch the dimension and the fitness
138        # dimension of the problem.
139        ndim = prob.get_nx()
140        nf = prob.get_nf()
141
142        # Compute the total number of decision
143        # vectors represented by dvs.
144        ndvs = len(dvs) // ndim
145        # Reshape dvs so that it represents
146        # ndvs decision vectors of dimension ndim
147        # each.
148        dvs.shape = (ndvs, ndim)
149
150        # Pre-serialize the problem.
151        pprob = pickle.dumps(prob)
152
153        # Build the list of arguments to pass
154        # to the processes in the pool.
155        async_args = [(pprob, pickle.dumps(dv)) for dv in dvs]
156
157        with mp_bfe._pool_lock:
158            # Make sure the pool exists.
159            mp_bfe._init_pool_impl(None)
160            # Runt the objfun evaluations in async mode.
161            if self._chunksize is None:
162                ret = mp_bfe._pool.map_async(_mp_ipy_bfe_func, async_args)
163            else:
164                ret = mp_bfe._pool.map_async(
165                    _mp_ipy_bfe_func, async_args, chunksize=self._chunksize)
166
167        # Build the vector of fitness vectors as a 2D numpy array.
168        fvs = np.array([pickle.loads(fv) for fv in ret.get()])
169        # Reshape it so that it is 1D.
170        fvs.shape = (ndvs*nf,)
171
172        return fvs
173
174    def get_name(self):
175        """Name of this evaluator.
176
177        Returns:
178
179           :class:`str`: ``"Multiprocessing batch fitness evaluator"``
180
181        """
182        return "Multiprocessing batch fitness evaluator"
183
184    def get_extra_info(self):
185        """Extra info for this evaluator.
186
187        If the process pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`,
188        invoking this function will trigger the creation of a new pool.
189
190        Returns:
191
192           :class:`str`: a string containing information about the number of processes in the pool
193
194        Raises:
195
196           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.get_pool_size()`
197
198        """
199        return "\tNumber of processes in the pool: {}".format(
200            mp_bfe.get_pool_size())
201
202    @staticmethod
203    def _init_pool_impl(processes):
204        # Implementation method for initing
205        # the pool. This will *not* do any locking.
206        from ._mp_utils import _make_pool
207
208        if mp_bfe._pool is None:
209            mp_bfe._pool, mp_bfe._pool_size = _make_pool(processes)
210
211    @staticmethod
212    def init_pool(processes=None):
213        """Initialise the process pool.
214
215        This method will initialise the process pool backing :class:`~pygmo.mp_bfe`, if the pool
216        has not been initialised yet or if the pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`.
217        Otherwise, this method will have no effects.
218
219        Args:
220
221           processes(:data:`None` or an :class:`int`): the size of the pool (if :data:`None`, the size of the pool will be
222             equal to the number of logical CPUs on the system)
223
224        Raises:
225
226           ValueError: if the pool does not exist yet and the function is being called from a thread different
227             from the main one, or if *processes* is a non-positive value
228           TypeError: if *processes* is not :data:`None` and not an :class:`int`
229
230        """
231        with mp_bfe._pool_lock:
232            mp_bfe._init_pool_impl(processes)
233
234    @staticmethod
235    def get_pool_size():
236        """Get the size of the process pool.
237
238        If the process pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`, invoking this
239        function will trigger the creation of a new pool.
240
241        Returns:
242
243           :class:`int`: the current size of the pool
244
245        Raises:
246
247           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.init_pool()`
248
249        """
250        with mp_bfe._pool_lock:
251            mp_bfe._init_pool_impl(None)
252            return mp_bfe._pool_size
253
254    @staticmethod
255    def resize_pool(processes):
256        """Resize pool.
257
258        This method will resize the process pool backing :class:`~pygmo.mp_bfe`.
259
260        If the process pool was previously shut down via :func:`~pygmo.mp_bfe.shutdown_pool()`, invoking this
261        function will trigger the creation of a new pool.
262
263        Args:
264
265           processes(:class:`int`): the desired number of processes in the pool
266
267        Raises:
268
269           TypeError: if the *processes* argument is not an :class:`int`
270           ValueError: if the *processes* argument is not strictly positive
271           unspecified: any exception thrown by :func:`~pygmo.mp_bfe.init_pool()`
272
273        """
274        from ._mp_utils import _make_pool
275
276        if not isinstance(processes, int):
277            raise TypeError("The 'processes' argument must be an int")
278        if processes <= 0:
279            raise ValueError(
280                "The 'processes' argument must be strictly positive")
281
282        with mp_bfe._pool_lock:
283            # NOTE: this will either init a new pool
284            # with the requested number of processes,
285            # or do nothing if the pool exists already.
286            mp_bfe._init_pool_impl(processes)
287            if processes == mp_bfe._pool_size:
288                # Don't do anything if we are not changing
289                # the size of the pool.
290                return
291            # Create new pool.
292            new_pool, new_size = _make_pool(processes)
293            # Stop the current pool.
294            mp_bfe._pool.close()
295            mp_bfe._pool.join()
296            # Assign the new pool.
297            mp_bfe._pool = new_pool
298            mp_bfe._pool_size = new_size
299
300    @staticmethod
301    def shutdown_pool():
302        """Shutdown pool.
303
304        This method will shut down the process pool backing :class:`~pygmo.mp_bfe`, after
305        all pending tasks in the pool have completed.
306
307        After the process pool has been shut down, attempting to use the evaluator
308        will raise an error. A new process pool can be created via an explicit call to
309        :func:`~pygmo.mp_bfe.init_pool()` or one of the methods of the public API of
310        :class:`~pygmo.mp_bfe` which trigger the creation of a new process pool.
311
312        """
313        with mp_bfe._pool_lock:
314            if mp_bfe._pool is not None:
315                mp_bfe._pool.close()
316                mp_bfe._pool.join()
317                mp_bfe._pool = None
318                mp_bfe._pool_size = None
319
320
321class ipyparallel_bfe(object):
322    """Ipyparallel batch fitness evaluator.
323
324    .. versionadded:: 2.13
325
326    This user-defined batch fitness evaluator (UDBFE) will dispatch
327    the fitness evaluation in batch mode of a set of decision vectors
328    to an ipyparallel cluster. The communication with the cluster is managed
329    via an :class:`ipyparallel.LoadBalancedView` instance which is
330    created either implicitly when the first fitness evaluation is run, or
331    explicitly via the :func:`~pygmo.ipyparallel_bfe.init_view()` method. The
332    :class:`~ipyparallel.LoadBalancedView` instance is a global object shared
333    among all the ipyparallel batch fitness evaluators.
334
335    .. seealso::
336
337       https://ipyparallel.readthedocs.io/en/latest/
338
339    """
340    # Static variables for the view.
341    _view_lock = _Lock()
342    _view = None
343
344    @staticmethod
345    def init_view(client_args=[], client_kwargs={}, view_args=[], view_kwargs={}):
346        """Init the ipyparallel view.
347
348        This method will initialise the :class:`ipyparallel.LoadBalancedView`
349        which is used by all ipyparallel evaluators to submit the evaluation tasks
350        to an ipyparallel cluster. If the :class:`ipyparallel.LoadBalancedView`
351        has already been created, this method will perform no action.
352
353        The input arguments *client_args* and *client_kwargs* are forwarded
354        as positional and keyword arguments to the construction of an
355        :class:`ipyparallel.Client` instance. From the constructed client,
356        an :class:`ipyparallel.LoadBalancedView` instance is then created
357        via the :func:`ipyparallel.Client.load_balanced_view()` method, to
358        which the positional and keyword arguments *view_args* and
359        *view_kwargs* are passed.
360
361        Note that usually it is not necessary to explicitly invoke this
362        method: an :class:`ipyparallel.LoadBalancedView` is automatically
363        constructed with default settings the first time a batch evaluation task
364        is submitted to an ipyparallel evaluator. This method should be used
365        only if it is necessary to pass custom arguments to the construction
366        of the :class:`ipyparallel.Client` or :class:`ipyparallel.LoadBalancedView`
367        objects.
368
369        Args:
370
371            client_args(:class:`list`): the positional arguments used for the
372              construction of the client
373            client_kwargs(:class:`dict`): the keyword arguments used for the
374              construction of the client
375            view_args(:class:`list`): the positional arguments used for the
376              construction of the view
377            view_kwargs(:class:`dict`): the keyword arguments used for the
378              construction of the view
379
380        Raises:
381
382           unspecified: any exception thrown by the constructor of :class:`ipyparallel.Client`
383             or by the :func:`ipyparallel.Client.load_balanced_view()` method
384
385        """
386        from ._ipyparallel_utils import _make_ipyparallel_view
387
388        with ipyparallel_bfe._view_lock:
389            if ipyparallel_bfe._view is None:
390                # Create the new view.
391                ipyparallel_bfe._view = _make_ipyparallel_view(
392                    client_args, client_kwargs, view_args, view_kwargs)
393
394    @staticmethod
395    def shutdown_view():
396        """Destroy the ipyparallel view.
397
398        This method will destroy the :class:`ipyparallel.LoadBalancedView`
399        currently being used by the ipyparallel evaluators for submitting
400        evaluation tasks to an ipyparallel cluster. The view can be re-inited
401        implicitly by submitting a new evaluation task, or by invoking
402        the :func:`~pygmo.ipyparallel_bfe.init_view()` method.
403
404        """
405        import gc
406        with ipyparallel_bfe._view_lock:
407            if ipyparallel_bfe._view is None:
408                return
409
410            old_view = ipyparallel_bfe._view
411            ipyparallel_bfe._view = None
412            del(old_view)
413            gc.collect()
414
415    def __call__(self, prob, dvs):
416        """Call operator.
417
418        This method will evaluate in batch mode the fitnesses of the input decision vectors
419        *dvs* using the fitness function from the optimisation problem *prob*. The fitness
420        evaluations are delegated to the nodes of the ipyparallel cluster backing
421        :class:`~pygmo.ipyparallel_bfe`.
422
423        See the documentation of :class:`pygmo.bfe` for an explanation of the expected
424        formats of *dvs* and of the return value.
425
426        Args:
427
428           prob(:class:`~pygmo.problem`): the input problem
429           dvs(:class:`numpy.ndarray`): the input decision vectors, represented as a
430             flattened 1D array
431
432        Returns:
433
434           :class:`numpy.ndarray`: the fitness vectors corresponding to *dvs*, represented as a
435             flattened 1D array
436
437        Raises:
438
439           unspecified: any exception thrown by the evaluations, by the (de)serialization
440             of the input arguments or of the return value, or by the public interface of
441             :class:`ipyparallel.LoadBalancedView`.
442
443        """
444        import pickle
445        import numpy as np
446        from ._ipyparallel_utils import _make_ipyparallel_view
447
448        # Fetch the dimension and the fitness
449        # dimension of the problem.
450        ndim = prob.get_nx()
451        nf = prob.get_nf()
452
453        # Compute the total number of decision
454        # vectors represented by dvs.
455        ndvs = len(dvs) // ndim
456        # Reshape dvs so that it represents
457        # ndvs decision vectors of dimension ndim
458        # each.
459        dvs.shape = (ndvs, ndim)
460
461        # Pre-serialize the problem.
462        pprob = pickle.dumps(prob)
463
464        # Build the list of arguments to pass
465        # to the cluster nodes.
466        async_args = [(pprob, pickle.dumps(dv)) for dv in dvs]
467
468        with ipyparallel_bfe._view_lock:
469            if ipyparallel_bfe._view is None:
470                ipyparallel_bfe._view = _make_ipyparallel_view(
471                    [], {}, [], {})
472            ret = ipyparallel_bfe._view.map_async(_mp_ipy_bfe_func, async_args)
473
474        # Build the vector of fitness vectors as a 2D numpy array.
475        fvs = np.array([pickle.loads(fv) for fv in ret.get()])
476        # Reshape it so that it is 1D.
477        fvs.shape = (ndvs*nf,)
478
479        return fvs
480
481    def get_name(self):
482        """Name of the evaluator.
483
484        Returns:
485            :class:`str`: ``"Ipyparallel batch fitness evaluator"``
486
487        """
488        return "Ipyparallel batch fitness evaluator"
489
490    def get_extra_info(self):
491        """Extra info for this evaluator.
492
493        Returns:
494            :class:`str`: a string with extra information about the status of the evaluator
495
496        """
497        from copy import deepcopy
498        with ipyparallel_bfe._view_lock:
499            if ipyparallel_bfe._view is None:
500                return "\tNo cluster view has been created yet"
501            else:
502                d = deepcopy(ipyparallel_bfe._view.queue_status())
503        return "\tQueue status:\n\t\n\t" + "\n\t".join(["(" + str(k) + ", " + str(d[k]) + ")" for k in d])
504