#!/usr/bin/env python
#
# Copyright (c) 2016-2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Based on the software developed by:
# Copyright (c) 2008,2016 david decotigny (Pool of threads)
# Copyright (c) 2006-2008, R Oudkerk (multiprocessing.Pool)
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

# @brief Python Pool implementation based on TBB with monkey-patching
#
# See http://docs.python.org/dev/library/multiprocessing.html
# Differences: added imap_async and imap_unordered_async, and terminate()
# has to be called explicitly (it's not registered by atexit).
#
# The general idea is that we submit work to a workqueue, either as
# single Jobs (one function to call), or JobSequences (batches of
# Jobs). Each Job is associated with an ApplyResult object which has 2
# states: waiting for the Job to complete, or Ready. Instead of
# waiting for the jobs to finish, we wait for their ApplyResult object
# to become ready: an event mechanism is used for that.
# When we apply a function to several arguments in "parallel", we need
# a way to wait for all/part of the Jobs to be processed: that's what
# "collectors" are for; they group and wait for a set of ApplyResult
# objects. Once a collector is ready to be used, we can use a
# CollectorIterator to iterate over the result values it's collecting.
#
# The methods of a Pool object use all these concepts and expose
# them to their caller in a very simple way.

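# A hedged sketch of the flow described above (comment only, not executed),
# using the classes defined later in this file:
#
#   pool.map_async(f, xs)
#     -> OrderedResultCollector(apply_result, as_iterator=False)
#     -> one Job(f, (x,), {}, ApplyResult(collector)) per item,
#        grouped into JobSequence chunks and run on the TBB task_group
#     -> each finished Job calls ApplyResult._set_value()/_set_exception(),
#        which notifies the collector via notify_ready()
#     -> when the last result arrives, the collector sets the final value
#        on the ApplyResult returned to the caller
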
import sys
import threading
import traceback
from .api import *

__all__ = ["Pool", "TimeoutError"]
__doc__ = """
Standard Python Pool implementation based on Python API
for Intel(R) Threading Building Blocks library (Intel(R) TBB)
"""


class TimeoutError(Exception):
    """Raised when a result is not available within the given timeout"""
    pass


class Pool(object):
    """
    The Pool class provides the standard multiprocessing.Pool interface,
    which is mapped onto Intel(R) TBB tasks executing in its thread pool
    """

    def __init__(self, nworkers=0, name="Pool"):
        """
        \param nworkers (integer) number of worker threads to start
        \param name (string) prefix for the worker threads' name

        Both parameters are accepted for compatibility with
        multiprocessing.Pool and are currently ignored: the underlying
        TBB scheduler manages its own worker threads.
        """
        self._closed = False
        self._tasks = task_group()
        self._pool = [None,]*default_num_threads()  # Dask asks for len(_pool)

    def apply(self, func, args=(), kwds=dict()):
        """Equivalent of the apply() builtin function. It blocks till
        the result is ready."""
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        """A parallel equivalent of the map() builtin function. It
        blocks till the result is ready.

        This method chops the iterable into a number of chunks which
        it submits to the pool as separate tasks. The
        (approximate) size of these chunks can be specified by setting
        chunksize to a positive integer."""
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        """
        An equivalent of itertools.imap().

        The chunksize argument is the same as the one used by the
        map() method. For very long iterables using a large value for
        chunksize can make the job complete much faster than
        using the default value of 1.

        Also if chunksize is 1 then the next() method of the iterator
        returned by the imap() method has an optional timeout
        parameter: next(timeout) will raise TimeoutError if
        the result cannot be returned within timeout seconds.
        """
        collector = OrderedResultCollector(as_iterator=True)
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)
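    # Illustrative sketch (not executed): with chunksize=1, the iterator
    # returned by imap() supports a per-item timeout via its extended
    # next() method, as described above. The function and values below
    # are hypothetical placeholders.
    #
    #   pool = Pool()
    #   it = pool.imap(some_slow_function, range(100))
    #   try:
    #       first = it.next(timeout=5.0)   # wait at most 5 seconds
    #   except TimeoutError:
    #       ...  # the first result was not ready in time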

    def imap_unordered(self, func, iterable, chunksize=1):
        """The same as imap() except that the ordering of the results
        from the returned iterator should be considered
        arbitrary. (Only when there is only one worker is the
        order guaranteed to be "correct".)"""
        collector = UnorderedResultCollector()
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)

    def apply_async(self, func, args=(), kwds=dict(), callback=None):
        """A variant of the apply() method which returns an
        ApplyResult object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready,
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked."""
        assert not self._closed  # No lock here. We assume it's atomic...
        apply_result = ApplyResult(callback=callback)
        job = Job(func, args, kwds, apply_result)
        self._tasks.run(job)
        return apply_result
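    # Illustrative sketch (not executed): the callback runs on the worker
    # thread that completes the job, so it should be cheap. Names below
    # are hypothetical placeholders.
    #
    #   results = []
    #   pool = Pool()
    #   r = pool.apply_async(pow, (2, 10), callback=results.append)
    #   print(r.get(timeout=10))   # -> 1024, or raises TimeoutError
    #   # after completion, results == [1024]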

    def map_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the map() method which returns an ApplyResult
        object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector    = OrderedResultCollector(apply_result, as_iterator=False)
        if not self._create_sequences(func, iterable, chunksize, collector):
            apply_result._set_value([])
        return apply_result

    def imap_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the imap() method which returns an ApplyResult
        object that provides an iterator (next method(timeout)
        available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector    = OrderedResultCollector(apply_result, as_iterator=True)
        if not self._create_sequences(func, iterable, chunksize, collector):
            apply_result._set_value(iter([]))
        return apply_result

    def imap_unordered_async(self, func, iterable, chunksize=None,
                             callback=None):
        """A variant of the imap_unordered() method which returns an
        ApplyResult object that provides an iterator (next
        method(timeout) available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector    = UnorderedResultCollector(apply_result)
        if not self._create_sequences(func, iterable, chunksize, collector):
            apply_result._set_value(iter([]))
        return apply_result

    def close(self):
        """Prevents any more tasks from being submitted to the
        pool. Once all the tasks have been completed the workers
        become idle."""
        # No lock here. We assume it's sufficiently atomic...
        self._closed = True

    def terminate(self):
        """Stops the workers immediately without completing
        outstanding work. When the pool object is garbage collected
        terminate() will be called immediately."""
        self.close()
        self._tasks.cancel()

    def join(self):
        """Wait for the outstanding tasks to complete. One must call
        close() or terminate() before using join()."""
        self._tasks.wait()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.join()

    def __del__(self):
        self.terminate()
        self.join()

    def _create_sequences(self, func, iterable, chunksize, collector):
        """
        Creates callable objects to process slices of iterable and
        pushes them onto the work queue. Each work unit is meant to
        process a slice of iterable of size chunksize. If collector is
        specified, then the ApplyResult objects associated with the
        jobs will notify collector when their result becomes ready.

        \return the list of callable objects (basically: JobSequences)
        pushed onto the work queue
        """
        assert not self._closed  # No lock here. We assume it's atomic...
        it_ = iter(iterable)
        exit_loop = False
        sequences = []
        while not exit_loop:
            seq = []
            for _ in range(chunksize or 1):
                try:
                    arg = next(it_)
                except StopIteration:
                    exit_loop = True
                    break
                apply_result = ApplyResult(collector)
                job = Job(func, (arg,), {}, apply_result)
                seq.append(job)
            if seq:
                sequences.append(JobSequence(seq))
        for t in sequences:
            self._tasks.run(t)
        return sequences
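    # Illustrative sketch of the chunking performed above (not executed):
    # with an iterable of 5 items and chunksize=2, three JobSequences are
    # created and submitted to the task group:
    #
    #   JobSequence([Job(f, (x0,)), Job(f, (x1,))])
    #   JobSequence([Job(f, (x2,)), Job(f, (x3,))])
    #   JobSequence([Job(f, (x4,))])
    #
    # Each Job reports into the same collector through its ApplyResult.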


class Job:
    """A work unit that corresponds to the execution of a single function"""

    def __init__(self, func, args, kwds, apply_result):
        """
        \param func/args/kwds used to call the function
        \param apply_result ApplyResult object that holds the result
        of the function call
        """
        self._func = func
        self._args = args
        self._kwds = kwds
        self._result = apply_result

    def __call__(self):
        """
        Call the function with the args/kwds and tell the ApplyResult
        that its result is ready. Correctly handles the exceptions
        happening during the execution of the function
        """
        try:
            result = self._func(*self._args, **self._kwds)
        except:
            self._result._set_exception()
        else:
            self._result._set_value(result)


class JobSequence:
    """A work unit that corresponds to the processing of a contiguous
    sequence of Job objects"""

    def __init__(self, jobs):
        self._jobs = jobs

    def __call__(self):
        """
        Call all the Job objects that have been specified
        """
        for job in self._jobs:
            job()


class ApplyResult(object):
    """An object associated with a Job object that holds its result:
    it is available during the whole life of the Job and afterwards,
    even before the Job has been processed. It is possible to use this
    object to wait for the result/exception of the job to be available.

    The result objects returned by the Pool::*_async() methods are of
    this type"""

    def __init__(self, collector=None, callback=None):
        """
        \param collector when not None, the notify_ready() method of
        the collector will be called when the result from the Job is
        ready
        \param callback when not None, function to call when the
        result becomes available (this is the parameter passed to the
        Pool::*_async() methods)
        """
        self._success = False
        self._event = threading.Event()
        self._data = None
        self._collector = None
        self._callback = callback

        if collector is not None:
            collector.register_result(self)
            self._collector = collector

    def get(self, timeout=None):
        """
        Returns the result when it arrives. If timeout is not None and
        the result does not arrive within timeout seconds then
        TimeoutError is raised. If the remote call raised an exception
        then that exception will be reraised by get().
        """
        if not self.wait(timeout):
            raise TimeoutError("Result not available within %fs" % timeout)
        if self._success:
            return self._data
        if sys.version_info[0] == 3:
            raise self._data[0](self._data[1]).with_traceback(self._data[2])
        else:
            exec("raise self._data[0], self._data[1], self._data[2]")
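    # Illustrative sketch (not executed): if the job raised, get()
    # re-raises that exception on the caller's side. Names below are
    # hypothetical placeholders.
    #
    #   r = pool.apply_async(lambda: 1 / 0)
    #   r.wait()
    #   assert r.ready() and not r.successful()
    #   r.get()   # re-raises the ZeroDivisionError from the worker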

    def wait(self, timeout=None):
        """Waits until the result is available or until timeout
        seconds pass."""
        self._event.wait(timeout)
        return self._event.is_set()

    def ready(self):
        """Returns whether the call has completed."""
        return self._event.is_set()

    def successful(self):
        """Returns whether the call completed without raising an
        exception. Will raise AssertionError if the result is not
        ready."""
        assert self.ready()
        return self._success

    def _set_value(self, value):
        """Called by a Job object to tell the result is ready, and
        provides the value of this result. The object will become
        ready and successful. The collector's notify_ready() method
        will be called, and the callback method too"""
        assert not self.ready()
        self._data = value
        self._success = True
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)
        if self._callback is not None:
            try:
                self._callback(value)
            except:
                traceback.print_exc()

    def _set_exception(self):
        """Called by a Job object to tell that an exception occurred
        during the processing of the function. The object will become
        ready but not successful. The collector's notify_ready()
        method will be called, but NOT the callback method"""
        # traceback.print_exc()
        assert not self.ready()
        self._data = sys.exc_info()
        self._success = False
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)


class AbstractResultCollector(object):
    """ABC to define the interface of a ResultCollector object. It is
    basically an object which knows which results it's waiting for,
    and which is able to get notified when they become available. It
    is also able to provide an iterator over the results when they
    are available"""

    def __init__(self, to_notify):
        """
        \param to_notify ApplyResult object to notify when all the
        results we're waiting for become available. Can be None.
        """
        self._to_notify = to_notify

    def register_result(self, apply_result):
        """Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        raise NotImplementedError("Subclasses must implement it")

    def notify_ready(self, apply_result):
        """Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        raise NotImplementedError("Subclasses must implement it")

    def _get_result(self, idx, timeout=None):
        """Called by the CollectorIterator object to retrieve the
        result's values one after another (order defined by the
        implementation)
        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)
        """
        raise NotImplementedError("Subclasses must implement it")

    def __iter__(self):
        """Return a new CollectorIterator object for this collector"""
        return CollectorIterator(self)


class CollectorIterator(object):
    """An iterator over the result values available in the given
    collector object. Equipped with an extended next() method
    accepting a timeout argument. Created by the
    AbstractResultCollector::__iter__() method"""

    def __init__(self, collector):
        """\param collector AbstractResultCollector instance"""
        self._collector = collector
        self._idx = 0

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next result value in the sequence. Raise
        StopIteration at the end. Can raise the exception raised by
        the Job"""
        try:
            apply_result = self._collector._get_result(self._idx, timeout)
        except IndexError:
            # Reset for next time
            self._idx = 0
            raise StopIteration
        except:
            self._idx = 0
            raise
        self._idx += 1
        assert apply_result.ready()
        return apply_result.get(0)

    def __next__(self):
        return self.next()


class UnorderedResultCollector(AbstractResultCollector):
    """An AbstractResultCollector implementation that collects the
    values of the ApplyResult objects in the order they become ready. The
    CollectorIterator object returned by __iter__() will iterate over
    them in the order they become ready"""

    def __init__(self, to_notify=None):
        """
        \param to_notify ApplyResult object to notify when all the
        results we're waiting for become available. Can be None.
        """
        AbstractResultCollector.__init__(self, to_notify)
        self._cond = threading.Condition()
        self._collection = []
        self._expected = 0

    def register_result(self, apply_result):
        """Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        self._expected += 1

    def _get_result(self, idx, timeout=None):
        """Called by the CollectorIterator object to retrieve the
        result's values one after another, in the order the results have
        become available.
        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)
        """
        self._cond.acquire()
        try:
            if idx >= self._expected:
                raise IndexError
            elif idx < len(self._collection):
                return self._collection[idx]
            elif idx != len(self._collection):
                # Violation of the sequence protocol
                raise IndexError()
            else:
                self._cond.wait(timeout=timeout)
                try:
                    return self._collection[idx]
                except IndexError:
                    # Still not added!
                    raise TimeoutError("Timeout while waiting for results")
        finally:
            self._cond.release()

    def notify_ready(self, apply_result=None):
        """Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        first_item = False
        self._cond.acquire()
        try:
            self._collection.append(apply_result)
            first_item = (len(self._collection) == 1)

            self._cond.notify_all()
        finally:
            self._cond.release()

        if first_item and self._to_notify is not None:
            self._to_notify._set_value(iter(self))


class OrderedResultCollector(AbstractResultCollector):
    """An AbstractResultCollector implementation that collects the
    values of the ApplyResult objects in the order they have been
    submitted. The CollectorIterator object returned by __iter__()
    will iterate over them in the order they have been submitted"""

    def __init__(self, to_notify=None, as_iterator=True):
        """
        \param to_notify ApplyResult object to notify when all the
        results we're waiting for become available. Can be None.
        \param as_iterator boolean telling whether the result value
        set on to_notify should be an iterator (available as soon as 1
        result arrived) or a list (available only after the last
        result arrived)
        """
        AbstractResultCollector.__init__(self, to_notify)
        self._results = []
        self._lock = threading.Lock()
        self._remaining = 0
        self._as_iterator = as_iterator

    def register_result(self, apply_result):
        """Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called
        \param apply_result ApplyResult object to add in our collection
        """
        self._results.append(apply_result)
        self._remaining += 1

    def _get_result(self, idx, timeout=None):
        """Called by the CollectorIterator object to retrieve the
        result's values one after another (order defined by the
        implementation)
        \param idx The index of the result we want, wrt collector's order
        \param timeout integer telling how long to wait (in seconds)
        for the result at index idx to be available, or None (wait
        forever)
        """
        res = self._results[idx]
        res.wait(timeout)
        return res

    def notify_ready(self, apply_result):
        """Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).
        \param apply_result ApplyResult object telling us that the job
        has been processed
        """
        got_first = False
        got_last = False
        self._lock.acquire()
        try:
            assert self._remaining > 0
            got_first = (len(self._results) == self._remaining)
            self._remaining -= 1
            got_last = (self._remaining == 0)
        finally:
            self._lock.release()

        if self._to_notify is not None:
            if self._as_iterator and got_first:
                self._to_notify._set_value(iter(self))
            elif not self._as_iterator and got_last:
                try:
                    lst = [r.get(0) for r in self._results]
                except:
                    self._to_notify._set_exception()
                else:
                    self._to_notify._set_value(lst)
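

# A minimal, hedged smoke test sketch. Because of the relative import at the
# top, it has to be run as a module within its package (for example
# "python -m tbb.pool" if this file is installed as tbb/pool.py); the exact
# package name is an assumption, not something this file defines.
if __name__ == "__main__":
    def _square(x):
        return x * x

    pool = Pool()
    try:
        print(pool.map(_square, range(10)))                      # ordered
        print(sorted(pool.imap_unordered(_square, range(10))))   # any order
        print(pool.apply_async(_square, (7,)).get(timeout=10))
    finally:
        pool.terminate()
        pool.join()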