######################################################################
#
# File: b2sdk/bounded_queue_executor.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

import threading


class BoundedQueueExecutor(object):
    """
    Wraps a concurrent.futures.Executor and limits the number of requests
    that can be queued at once.  Calls to submit() block until there is
    room in the queue.

    The number of available slots in the queue is tracked with a
    semaphore that is acquired before queueing an action, and
    released when the action finishes (or when it could not be queued
    at all).

    Counts the number of exceptions thrown by tasks, and makes them
    available from get_num_exceptions() after shutting down.
    """

    def __init__(self, executor, queue_limit):
        """
        :param executor: an executor to be wrapped
        :type executor: concurrent.futures.Executor
        :param queue_limit: maximum number of submitted-but-unfinished tasks
        :type queue_limit: int
        """
        self.executor = executor
        self.semaphore = threading.Semaphore(queue_limit)
        self.num_exceptions = 0
        # Tasks fail on worker threads; "+= 1" on an attribute is a
        # read-modify-write, so it is serialized to avoid lost updates.
        self._num_exceptions_lock = threading.Lock()

    def submit(self, fcn, *args, **kwargs):
        """
        Start execution of a callable with the given positional and
        keyword arguments.  Blocks while the queue is full.

        NOTE: the slot is returned by the wrapped task itself, so a future
        that is cancelled before it starts running keeps its slot.

        :param fcn: a callable object
        :type fcn: callable
        :return: a future object
        :rtype: concurrent.futures.Future
        :raises: whatever the wrapped executor's submit() raises (e.g.
                 RuntimeError after shutdown); the queue slot is returned
                 before the exception propagates
        """
        # Wait until there is room in the queue.
        self.semaphore.acquire()

        # A non-blocking acquire on a fresh lock succeeds for exactly one
        # caller, which guarantees the slot is handed back once and only
        # once, no matter which code path gets there first.
        slot_guard = threading.Lock()

        def release_slot():
            if slot_guard.acquire(False):
                self.semaphore.release()

        # Wrap the action in a function that will release
        # the semaphore after it runs.
        def run_it():
            try:
                return fcn(*args, **kwargs)
            except Exception:
                with self._num_exceptions_lock:
                    self.num_exceptions += 1
                raise
            finally:
                release_slot()

        # Submit the wrapped action.  If the executor refuses it, the task
        # will normally never run, so the slot has to be returned here or
        # it would be lost forever and later submit() calls could deadlock.
        try:
            return self.executor.submit(run_it)
        except BaseException:
            release_slot()
            raise

    def shutdown(self):
        """
        Shut the wrapped executor down.
        """
        self.executor.shutdown()

    def get_num_exceptions(self):
        """
        Return the number of tasks that raised an exception so far.

        :rtype: int
        """
        return self.num_exceptions