# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
from concurrent.futures import _base
import itertools
import queue
import threading
import weakref
import os

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

# Maps each live worker thread to the work queue it reads from. Weak keys let
# entries disappear once the thread object itself is collected.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True by _python_exit(); read by workers and by submit().
_shutdown = False


def _python_exit():
    """Exit handler: wake every registered worker, then wait for all of them.

    Sets the module-level ``_shutdown`` flag, puts one ``None`` sentinel on
    each registered work queue and joins each registered thread.
    """
    global _shutdown
    _shutdown = True
    # Snapshot first: the weak dictionary may change while we iterate.
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

atexit.register(_python_exit)


class _WorkItem(object):
    """A pending call (``fn(*args, **kwargs)``) paired with its future."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Run the call and record its result or exception on the future.

        Returns immediately, without calling ``fn``, when
        ``set_running_or_notify_cancel()`` reports a false value.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


def _worker(executor_reference, work_queue, initializer, initargs):
    """Body of a pool thread: run the initializer, then serve the queue.

    Args:
        executor_reference: A weak reference (callable returning the owning
            executor or None once it has been collected).
        work_queue: The queue of _WorkItem objects; ``None`` entries are
            wake-up sentinels.
        initializer: Optional callable run once before serving the queue.
        initargs: Positional arguments for the initializer.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item

                # attempt to increment idle count
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                continue

            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class BrokenThreadPool(_base.BrokenExecutor):
    """
    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    """


class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on a pool of worker threads."""

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
            TypeError: If initializer is given but is not callable.
        """
        if max_workers is None:
            # ThreadPoolExecutor is often used to:
            # * CPU bound task which releases GIL
            # * I/O bound task (which releases GIL, of course)
            #
            # We use cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large resource
            # on many core machine.
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    def submit(*args, **kwargs):
        # The (*args, **kwargs) signature lets callers use 'self' and 'fn' as
        # keyword names for the submitted callable's own arguments, and keeps
        # the deprecated fn=<callable> keyword form working.
        if len(args) >= 2:
            self, fn, *args = args
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            fn = kwargs.pop('fn')
            self, *args = args
            import warnings
            warnings.warn("Passing 'fn' as keyword argument is deprecated",
                          DeprecationWarning, stacklevel=2)
        else:
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args)-1))

        with self._shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    # A plain Python function only has __text_signature__ if one was assigned
    # to it explicitly, so read it defensively and fall back to the literal
    # describing this method; otherwise the class body would raise
    # AttributeError.
    submit.__text_signature__ = getattr(
        _base.Executor.submit, '__text_signature__',
        '($self, fn, /, *args, **kwargs)')
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless an idle one can take the work.

        A new thread is only created while fewer than ``_max_workers`` threads
        have been started.
        """
        # if idle threads are available, don't spin new threads
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.daemon = True
            t.start()
            self._threads.add(t)
            # Register with the exit handler so the thread is woken and joined.
            _threads_queues[t] = self._work_queue

    def _initializer_failed(self):
        """Mark the pool broken and fail every work item still in the queue."""
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    def shutdown(self, wait=True, *, cancel_futures=False):
        # cancel_futures (keyword-only, default False): when true, work items
        # still waiting in the queue are removed and their futures cancelled;
        # a call that a worker has already taken from the queue is unaffected.
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__