1#!/usr/bin/env python 2# 3# Copyright (c) 2016-2020 Intel Corporation 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17# Based on the software developed by: 18# Copyright (c) 2008,2016 david decotigny (Pool of threads) 19# Copyright (c) 2006-2008, R Oudkerk (multiprocessing.Pool) 20# All rights reserved. 21# 22# Redistribution and use in source and binary forms, with or without 23# modification, are permitted provided that the following conditions 24# are met: 25# 26# 1. Redistributions of source code must retain the above copyright 27# notice, this list of conditions and the following disclaimer. 28# 2. Redistributions in binary form must reproduce the above copyright 29# notice, this list of conditions and the following disclaimer in the 30# documentation and/or other materials provided with the distribution. 31# 3. Neither the name of author nor the names of any contributors may be 32# used to endorse or promote products derived from this software 33# without specific prior written permission. 34# 35# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 36# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 37# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 38# ARE DISCLAIMED. 
# IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

# @brief Python Pool implementation based on TBB with monkey-patching
#
# See http://docs.python.org/dev/library/multiprocessing.html
# Differences: added imap_async and imap_unordered_async, and terminate()
# has to be called explicitly (it's not registered by atexit).
#
# The general idea is that we submit work items to a work queue, either as
# single Jobs (one function to call) or JobSequences (batches of
# Jobs). Each Job is associated with an ApplyResult object which has 2
# states: waiting for the Job to complete, or ready. Instead of
# waiting for the jobs to finish, we wait for their ApplyResult objects
# to become ready: an event mechanism is used for that.
# When we apply a function to several arguments in "parallel", we need
# a way to wait for all or part of the Jobs to be processed: that's what
# "collectors" are for; they group and wait for a set of ApplyResult
# objects. Once a collector is ready to be used, we can use a
# CollectorIterator to iterate over the result values it's collecting.
#
# The methods of a Pool object use all these concepts and expose
# them to their caller in a very simple way.
import itertools
import sys
import threading
import traceback
from .api import *

__all__ = ["Pool", "TimeoutError"]
__doc__ = """
Standard Python Pool implementation based on Python API
for Intel(R) Threading Building Blocks library (Intel(R) TBB)
"""


class TimeoutError(Exception):
    """Raised when a result is not available within the given timeout"""
    pass


class Pool(object):
    """
    The Pool class provides standard multiprocessing.Pool interface
    which is mapped onto Intel(R) TBB tasks executing in its thread pool
    """

    def __init__(self, nworkers=0, name="Pool"):
        """
        :param nworkers: (integer) number of worker threads to start.
            Accepted for interface compatibility; this constructor does
            not use it.
        :param name: (string) prefix for the worker threads' name.
            Accepted for interface compatibility; this constructor does
            not use it.
        """
        self._closed = False
        self._tasks = task_group()
        # Dask asks for len(_pool): expose one placeholder slot per
        # default_num_threads() thread.
        self._pool = [None] * default_num_threads()

    def apply(self, func, args=(), kwds=None):
        """Equivalent of the apply() builtin function. It blocks till
        the result is ready.

        :param kwds: keyword arguments for func; None means no keywords.
        """
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        """A parallel equivalent of the map() builtin function. It
        blocks till the result is ready.

        This method chops the iterable into a number of chunks which
        it submits to the process pool as separate tasks. The
        (approximate) size of these chunks can be specified by setting
        chunksize to a positive integer."""
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        """
        An equivalent of itertools.imap().

        The chunksize argument is the same as the one used by the
        map() method. For very long iterables using a large value for
        chunksize can make the job complete much faster than
        using the default value of 1.

        Also if chunksize is 1 then the next() method of the iterator
        returned by the imap() method has an optional timeout
        parameter: next(timeout) will raise TimeoutError if the result
        cannot be returned within timeout seconds.
        """
        collector = OrderedResultCollector(as_iterator=True)
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)

    def imap_unordered(self, func, iterable, chunksize=1):
        """The same as imap() except that the ordering of the results
        from the returned iterator should be considered
        arbitrary. (Only when there is only one worker process is the
        order guaranteed to be "correct".)"""
        collector = UnorderedResultCollector()
        self._create_sequences(func, iterable, chunksize, collector)
        return iter(collector)

    def apply_async(self, func, args=(), kwds=None, callback=None):
        """A variant of the apply() method which returns an
        ApplyResult object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready,
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked.

        :param kwds: keyword arguments for func; None means no keywords.
        """
        assert not self._closed  # No lock here. We assume it's atomic...
        # Avoid a shared mutable default argument.
        if kwds is None:
            kwds = {}
        apply_result = ApplyResult(callback=callback)
        job = Job(func, args, kwds, apply_result)
        self._tasks.run(job)
        return apply_result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the map() method which returns an ApplyResult
        object.

        If callback is specified then it should be a callable which
        accepts a single argument. When the result becomes ready
        callback is applied to it (unless the call failed). callback
        should complete immediately since otherwise the thread which
        handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector = OrderedResultCollector(apply_result, as_iterator=False)
        if not self._create_sequences(func, iterable, chunksize, collector):
            # Empty input: no job will ever notify the collector.
            apply_result._set_value([])
        return apply_result

    def imap_async(self, func, iterable, chunksize=None, callback=None):
        """A variant of the imap() method which returns an ApplyResult
        object that provides an iterator (next method(timeout)
        available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector = OrderedResultCollector(apply_result, as_iterator=True)
        if not self._create_sequences(func, iterable, chunksize, collector):
            # Empty input: no job will ever notify the collector.
            apply_result._set_value(iter([]))
        return apply_result

    def imap_unordered_async(self, func, iterable, chunksize=None,
                             callback=None):
        """A variant of the imap_unordered() method which returns an
        ApplyResult object that provides an iterator (next
        method(timeout) available).

        If callback is specified then it should be a callable which
        accepts a single argument. When the resulting iterator becomes
        ready, callback is applied to it (unless the call
        failed). callback should complete immediately since otherwise
        the thread which handles the results will get blocked."""
        apply_result = ApplyResult(callback=callback)
        collector = UnorderedResultCollector(apply_result)
        if not self._create_sequences(func, iterable, chunksize, collector):
            # Empty input: no job will ever notify the collector.
            apply_result._set_value(iter([]))
        return apply_result

    def close(self):
        """Prevents any more tasks from being submitted to the
        pool. Once all the tasks have been completed the worker
        processes will exit."""
        # No lock here. We assume it's sufficiently atomic...
        self._closed = True

    def terminate(self):
        """Stops the worker processes immediately without completing
        outstanding work. When the pool object is garbage collected
        terminate() will be called immediately."""
        self.close()
        self._tasks.cancel()

    def join(self):
        """Wait for the worker processes to exit. One must call
        close() or terminate() before using join()."""
        self._tasks.wait()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.join()

    def __del__(self):
        # If __init__ failed before the task group was created there is
        # nothing to cancel or wait for.
        if getattr(self, "_tasks", None) is None:
            return
        self.terminate()
        self.join()

    def _create_sequences(self, func, iterable, chunksize, collector):
        """
        Create callable objects to process and push them on the
        work queue. Each work unit is meant to process a slice of
        iterable of size chunksize. If collector is specified, then
        the ApplyResult objects associated with the jobs will notify
        collector when their result becomes ready.

        All results are registered with the collector before any job
        is submitted, as the collectors require.

        :param chunksize: number of items per JobSequence; None or 0
            means 1.
        :return: the list of callable objects (JobSequences) pushed
            onto the work queue; empty when iterable is empty.
        :raises ValueError: if chunksize is negative.
        """
        assert not self._closed  # No lock here. We assume it's atomic...
        chunksize = chunksize or 1
        if chunksize < 1:
            # A negative size used to produce empty chunks forever.
            raise ValueError(
                "chunksize must be a positive integer, got %r" % (chunksize,))
        it_ = iter(iterable)
        sequences = []
        while True:
            chunk = list(itertools.islice(it_, chunksize))
            if not chunk:
                break
            jobs = [Job(func, (arg,), {}, ApplyResult(collector))
                    for arg in chunk]
            sequences.append(JobSequence(jobs))
            if len(chunk) < chunksize:
                # Iterator exhausted: don't call next() on it again.
                break
        for sequence in sequences:
            self._tasks.run(sequence)
        return sequences


class Job(object):
    """A work unit that corresponds to the execution of a single function"""

    def __init__(self, func, args, kwds, apply_result):
        """
        :param func: callable to invoke.
        :param args: positional arguments for func.
        :param kwds: keyword arguments for func.
        :param apply_result: ApplyResult object that holds the result
            of the function call.
        """
        self._func = func
        self._args = args
        self._kwds = kwds
        self._result = apply_result

    def __call__(self):
        """
        Call the function with the args/kwds and tell the ApplyResult
        that its result is ready. Correctly handles the exceptions
        happening during the execution of the function
        """
        try:
            result = self._func(*self._args, **self._kwds)
        except BaseException:
            # Deliberately capture everything so that get() re-raises it
            # in the caller's thread.
            self._result._set_exception()
        else:
            self._result._set_value(result)


class JobSequence(object):
    """A work unit that corresponds to the processing of a continuous
    sequence of Job objects"""

    def __init__(self, jobs):
        """:param jobs: list of Job objects to run in order."""
        self._jobs = jobs

    def __call__(self):
        """
        Call all the Job objects that have been specified
        """
        for job in self._jobs:
            job()


class ApplyResult(object):
    """An object associated with a Job object that holds its result:
    it's available during the whole life of the Job and after, even when
    the Job hasn't been processed yet. It's possible to use this object to
    wait for the result/exception of the job to be available.

    The result objects returned by the Pool::*_async() methods are of
    this type"""

    def __init__(self, collector=None, callback=None):
        """
        :param collector: when not None, the notify_ready() method of
            the collector will be called when the result from the Job is
            ready. The result is registered with it immediately.
        :param callback: when not None, function to call when the
            result becomes available (this is the parameter passed to the
            Pool::*_async() methods).
        """
        self._success = False
        self._event = threading.Event()
        self._data = None
        self._collector = None
        self._callback = callback

        if collector is not None:
            collector.register_result(self)
            self._collector = collector

    def get(self, timeout=None):
        """
        Returns the result when it arrives. If timeout is not None and
        the result does not arrive within timeout seconds then
        TimeoutError is raised. If the remote call raised an exception
        then that exception will be reraised by get().
        """
        if not self.wait(timeout):
            raise TimeoutError("Result not available within %fs" % timeout)
        if self._success:
            return self._data
        if sys.version_info[0] >= 3:
            # Re-raise the original exception instance (not a new one
            # wrapping it) with the traceback captured in the worker.
            raise self._data[1].with_traceback(self._data[2])
        else:
            exec("raise self._data[0], self._data[1], self._data[2]")

    def wait(self, timeout=None):
        """Waits until the result is available or until timeout
        seconds pass.

        :return: True if the result is available, False on timeout.
        """
        self._event.wait(timeout)
        return self._event.is_set()

    def ready(self):
        """Returns whether the call has completed."""
        return self._event.is_set()

    def successful(self):
        """Returns whether the call completed without raising an
        exception. Will raise AssertionError if the result is not
        ready."""
        assert self.ready()
        return self._success

    def _set_value(self, value):
        """Called by a Job object to tell the result is ready, and
        provides the value of this result. The object will become
        ready and successful. The collector's notify_ready() method
        will be called, and the callback method too"""
        assert not self.ready()
        self._data = value
        self._success = True
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)
        if self._callback is not None:
            try:
                self._callback(value)
            except BaseException:
                # Best effort: a failing callback must not break the
                # worker; report it and carry on.
                traceback.print_exc()

    def _set_exception(self):
        """Called by a Job object to tell that an exception occurred
        during the processing of the function. The object will become
        ready but not successful. The collector's notify_ready()
        method will be called, but NOT the callback method.

        Must be called from inside an ``except`` block: the exception
        is taken from sys.exc_info()."""
        assert not self.ready()
        self._data = sys.exc_info()
        self._success = False
        self._event.set()
        if self._collector is not None:
            self._collector.notify_ready(self)


class AbstractResultCollector(object):
    """ABC to define the interface of a ResultCollector object. It is
    basically an object which knows which results it's waiting for,
    and which is able to get notified when they become available. It is
    also able to provide an iterator over the results when they are
    available"""

    def __init__(self, to_notify):
        """
        :param to_notify: ApplyResult object to notify when all the
            results we're waiting for become available. Can be None.
        """
        self._to_notify = to_notify

    def register_result(self, apply_result):
        """Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called

        :param apply_result: ApplyResult object to add in our collection
        """
        raise NotImplementedError("Children classes must implement it")

    def notify_ready(self, apply_result):
        """Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).

        :param apply_result: ApplyResult object telling us that the job
            has been processed
        """
        raise NotImplementedError("Children classes must implement it")

    def _get_result(self, idx, timeout=None):
        """Called by the CollectorIterator object to retrieve the
        result's values one after another (order defined by the
        implementation)

        :param idx: the index of the result we want, wrt collector's order
        :param timeout: how long to wait (in seconds) for the result at
            index idx to be available, or None (wait forever)
        :raises IndexError: when idx is past the end of the collection
        :raises TimeoutError: when the result is not ready in time
        """
        raise NotImplementedError("Children classes must implement it")

    def __iter__(self):
        """Return a new CollectorIterator object for this collector"""
        return CollectorIterator(self)


class CollectorIterator(object):
    """An iterator that allows to iterate over the result values
    available in the given collector object. Equipped with an extended
    next() method accepting a timeout argument. Created by the
    AbstractResultCollector::__iter__() method"""

    def __init__(self, collector):
        """:param collector: AbstractResultCollector instance"""
        self._collector = collector
        self._idx = 0

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next result value in the sequence. Raise
        StopIteration at the end. Can raise the exception raised by
        the Job.

        On TimeoutError the position is kept, so calling next() again
        resumes with the same pending result."""
        try:
            apply_result = self._collector._get_result(self._idx, timeout)
        except IndexError:
            # Reset for next time
            self._idx = 0
            raise StopIteration
        except TimeoutError:
            # Keep our position: resetting would replay results that
            # were already returned.
            raise
        except BaseException:
            self._idx = 0
            raise
        self._idx += 1
        assert apply_result.ready()
        return apply_result.get(0)

    def __next__(self):
        return self.next()


class UnorderedResultCollector(AbstractResultCollector):
    """An AbstractResultCollector implementation that collects the
    values of the ApplyResult objects in the order they become ready. The
    CollectorIterator object returned by __iter__() will iterate over
    them in the order they become ready"""

    def __init__(self, to_notify=None):
        """
        :param to_notify: ApplyResult object to notify (with an iterator
            over this collector) as soon as the first result becomes
            available. Can be None.
        """
        AbstractResultCollector.__init__(self, to_notify)
        self._cond = threading.Condition()
        self._collection = []
        self._expected = 0

    def register_result(self, apply_result):
        """Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called

        :param apply_result: ApplyResult object to add in our collection
        """
        self._expected += 1

    def _get_result(self, idx, timeout=None):
        """Called by the CollectorIterator object to retrieve the
        result's values one after another, in the order the results have
        become available.

        :param idx: the index of the result we want, wrt collector's order
        :param timeout: how long to wait (in seconds) for the result at
            index idx to be available, or None (wait forever)
        :raises IndexError: when idx is past the expected count, or when
            idx skips ahead of the next unread result
        :raises TimeoutError: when no new result arrives in time
        """
        with self._cond:
            if idx >= self._expected:
                raise IndexError
            elif idx < len(self._collection):
                return self._collection[idx]
            elif idx != len(self._collection):
                # Violation of the sequence protocol
                raise IndexError()
            else:
                self._cond.wait(timeout=timeout)
                try:
                    return self._collection[idx]
                except IndexError:
                    # Still not added !
                    raise TimeoutError("Timeout while waiting for results")

    def notify_ready(self, apply_result=None):
        """Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).

        :param apply_result: ApplyResult object telling us that the job
            has been processed
        """
        with self._cond:
            self._collection.append(apply_result)
            first_item = (len(self._collection) == 1)
            self._cond.notify_all()

        # Outside the lock: _set_value may run a user callback.
        if first_item and self._to_notify is not None:
            self._to_notify._set_value(iter(self))


class OrderedResultCollector(AbstractResultCollector):
    """An AbstractResultCollector implementation that collects the
    values of the ApplyResult objects in the order they have been
    submitted. The CollectorIterator object returned by __iter__()
    will iterate over them in the order they have been submitted"""

    def __init__(self, to_notify=None, as_iterator=True):
        """
        :param to_notify: ApplyResult object to notify when all the
            results we're waiting for become available. Can be None.
        :param as_iterator: boolean telling whether the result value
            set on to_notify should be an iterator (available as soon as 1
            result arrived) or a list (available only after the last
            result arrived)
        """
        AbstractResultCollector.__init__(self, to_notify)
        self._results = []
        self._lock = threading.Lock()
        self._remaining = 0
        self._as_iterator = as_iterator

    def register_result(self, apply_result):
        """Used to identify which results we're waiting for. Will
        always be called BEFORE the Jobs get submitted to the work
        queue, and BEFORE the __iter__ and _get_result() methods can
        be called

        :param apply_result: ApplyResult object to add in our collection
        """
        self._results.append(apply_result)
        self._remaining += 1

    def _get_result(self, idx, timeout=None):
        """Called by the CollectorIterator object to retrieve the
        result's values one after another, in submission order.

        :param idx: the index of the result we want, wrt collector's order
        :param timeout: how long to wait (in seconds) for the result at
            index idx to be available, or None (wait forever)
        :raises IndexError: when idx is past the last registered result
        :raises TimeoutError: when the result is not ready in time
        """
        res = self._results[idx]
        if not res.wait(timeout):
            raise TimeoutError("Timeout while waiting for results")
        return res

    def notify_ready(self, apply_result):
        """Called by the ApplyResult object (already registered via
        register_result()) that it is now ready (ie. the Job's result
        is available or an exception has been raised).

        :param apply_result: ApplyResult object telling us that the job
            has been processed
        """
        with self._lock:
            assert self._remaining > 0
            # All results are registered before any job runs, so the
            # first notification sees _remaining == len(_results).
            got_first = (len(self._results) == self._remaining)
            self._remaining -= 1
            got_last = (self._remaining == 0)

        if self._to_notify is not None:
            if self._as_iterator and got_first:
                self._to_notify._set_value(iter(self))
            elif not self._as_iterator and got_last:
                try:
                    lst = [r.get(0) for r in self._results]
                except BaseException:
                    # Propagate the first job failure to the map result.
                    self._to_notify._set_exception()
                else:
                    self._to_notify._set_value(lst)