1######################################################################
2#
3# File: b2sdk/bounded_queue_executor.py
4#
5# Copyright 2019 Backblaze Inc. All Rights Reserved.
6#
7# License https://www.backblaze.com/using_b2_code.html
8#
9######################################################################
10
11import threading
12
13
14class BoundedQueueExecutor(object):
15    """
16    Wrap a concurrent.futures.Executor and limits the number of requests that
17    can be queued at once.  Requests to submit() tasks block until
18    there is room in the queue.
19
20    The number of available slots in the queue is tracked with a
21    semaphore that is acquired before queueing an action, and
22    released when an action finishes.
23
24    Counts the number of exceptions thrown by tasks, and makes them
25    available from get_num_exceptions() after shutting down.
26    """
27
28    def __init__(self, executor, queue_limit):
29        """
30        :param executor: an executor to be wrapped
31        :type executor: concurrent.futures.Executor
32        :param queue_limit: a queue limit
33        :type queue_limit: int
34        """
35        self.executor = executor
36        self.semaphore = threading.Semaphore(queue_limit)
37        self.num_exceptions = 0
38
39    def submit(self, fcn, *args, **kwargs):
40        """
41        Start execution of a callable with the given optional and positional arguments
42
43        :param fcn: a callable object
44        :type fcn: callable
45        :return: a future object
46        :rtype: concurrent.futures.Future
47        """
48        # Wait until there is room in the queue.
49        self.semaphore.acquire()
50
51        # Wrap the action in a function that will release
52        # the semaphore after it runs.
53        def run_it():
54            try:
55                return fcn(*args, **kwargs)
56            except Exception:
57                self.num_exceptions += 1
58                raise
59            finally:
60                self.semaphore.release()
61
62        # Submit the wrapped action.
63        return self.executor.submit(run_it)
64
65    def shutdown(self):
66        """
67        Shut an executor down.
68        """
69        self.executor.shutdown()
70
71    def get_num_exceptions(self):
72        """
73        Return a number of exceptions.
74
75        :rtype: int
76        """
77        return self.num_exceptions
78