1# 2# Module implementing queues 3# 4# multiprocessing/queues.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# Licensed to PSF under a Contributor Agreement. 8# 9 10__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue'] 11 12import sys 13import os 14import threading 15import collections 16import time 17import types 18import weakref 19import errno 20 21from queue import Empty, Full 22 23try: 24 import _multiprocess as _multiprocessing 25except ImportError: 26 import _multiprocessing 27 28from . import connection 29from . import context 30_ForkingPickler = context.reduction.ForkingPickler 31 32from .util import debug, info, Finalize, register_after_fork, is_exiting 33 34# 35# Queue type using a pipe, buffer and thread 36# 37 38class Queue(object): 39 40 def __init__(self, maxsize=0, *, ctx): 41 if maxsize <= 0: 42 # Can raise ImportError (see issues #3770 and #23400) 43 from .synchronize import SEM_VALUE_MAX as maxsize 44 self._maxsize = maxsize 45 self._reader, self._writer = connection.Pipe(duplex=False) 46 self._rlock = ctx.Lock() 47 self._opid = os.getpid() 48 if sys.platform == 'win32': 49 self._wlock = None 50 else: 51 self._wlock = ctx.Lock() 52 self._sem = ctx.BoundedSemaphore(maxsize) 53 # For use by concurrent.futures 54 self._ignore_epipe = False 55 self._reset() 56 57 if sys.platform != 'win32': 58 register_after_fork(self, Queue._after_fork) 59 60 def __getstate__(self): 61 context.assert_spawning(self) 62 return (self._ignore_epipe, self._maxsize, self._reader, self._writer, 63 self._rlock, self._wlock, self._sem, self._opid) 64 65 def __setstate__(self, state): 66 (self._ignore_epipe, self._maxsize, self._reader, self._writer, 67 self._rlock, self._wlock, self._sem, self._opid) = state 68 self._reset() 69 70 def _after_fork(self): 71 debug('Queue._after_fork()') 72 self._reset(after_fork=True) 73 74 def _reset(self, after_fork=False): 75 if after_fork: 76 self._notempty._at_fork_reinit() 77 else: 78 self._notempty = threading.Condition(threading.Lock()) 79 self._buffer = collections.deque() 80 self._thread = None 81 self._jointhread = None 82 self._joincancelled = False 83 self._closed = False 84 self._close = None 85 self._send_bytes = self._writer.send_bytes 86 self._recv_bytes = self._reader.recv_bytes 87 self._poll = self._reader.poll 88 89 def put(self, obj, block=True, timeout=None): 90 if self._closed: 91 raise ValueError(f"Queue {self!r} is closed") 92 if not self._sem.acquire(block, timeout): 93 raise Full 94 95 with self._notempty: 96 if self._thread is None: 97 self._start_thread() 98 self._buffer.append(obj) 99 self._notempty.notify() 100 101 def get(self, block=True, timeout=None): 102 if self._closed: 103 raise ValueError(f"Queue {self!r} is closed") 104 if block and timeout is None: 105 with self._rlock: 106 res = self._recv_bytes() 107 self._sem.release() 108 else: 109 if block: 110 deadline = getattr(time,'monotonic',time.time)() + timeout 111 if not self._rlock.acquire(block, timeout): 112 raise Empty 113 try: 114 if block: 115 timeout = deadline - getattr(time,'monotonic',time.time)() 116 if not self._poll(timeout): 117 raise Empty 118 elif not self._poll(): 119 raise Empty 120 res = self._recv_bytes() 121 self._sem.release() 122 finally: 123 self._rlock.release() 124 # unserialize the data after having released the lock 125 return _ForkingPickler.loads(res) 126 127 def qsize(self): 128 # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() 129 return self._maxsize - self._sem._semlock._get_value() 130 131 def empty(self): 132 return not self._poll() 133 134 def full(self): 135 return self._sem._semlock._is_zero() 136 137 def get_nowait(self): 138 return self.get(False) 139 140 def put_nowait(self, obj): 141 return self.put(obj, False) 142 143 def close(self): 144 self._closed = True 145 try: 146 self._reader.close() 147 finally: 148 close = self._close 149 if close: 150 self._close = None 151 close() 152 153 def join_thread(self): 154 debug('Queue.join_thread()') 155 assert self._closed, "Queue {0!r} not closed".format(self) 156 if self._jointhread: 157 self._jointhread() 158 159 def cancel_join_thread(self): 160 debug('Queue.cancel_join_thread()') 161 self._joincancelled = True 162 try: 163 self._jointhread.cancel() 164 except AttributeError: 165 pass 166 167 def _start_thread(self): 168 debug('Queue._start_thread()') 169 170 # Start thread which transfers data from buffer to pipe 171 self._buffer.clear() 172 self._thread = threading.Thread( 173 target=Queue._feed, 174 args=(self._buffer, self._notempty, self._send_bytes, 175 self._wlock, self._writer.close, self._ignore_epipe, 176 self._on_queue_feeder_error, self._sem), 177 name='QueueFeederThread' 178 ) 179 self._thread.daemon = True 180 181 debug('doing self._thread.start()') 182 self._thread.start() 183 debug('... done self._thread.start()') 184 185 if not self._joincancelled: 186 self._jointhread = Finalize( 187 self._thread, Queue._finalize_join, 188 [weakref.ref(self._thread)], 189 exitpriority=-5 190 ) 191 192 # Send sentinel to the thread queue object when garbage collected 193 self._close = Finalize( 194 self, Queue._finalize_close, 195 [self._buffer, self._notempty], 196 exitpriority=10 197 ) 198 199 @staticmethod 200 def _finalize_join(twr): 201 debug('joining queue thread') 202 thread = twr() 203 if thread is not None: 204 thread.join() 205 debug('... queue thread joined') 206 else: 207 debug('... queue thread already dead') 208 209 @staticmethod 210 def _finalize_close(buffer, notempty): 211 debug('telling queue thread to quit') 212 with notempty: 213 buffer.append(_sentinel) 214 notempty.notify() 215 216 @staticmethod 217 def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, 218 onerror, queue_sem): 219 debug('starting thread to feed data to pipe') 220 nacquire = notempty.acquire 221 nrelease = notempty.release 222 nwait = notempty.wait 223 bpopleft = buffer.popleft 224 sentinel = _sentinel 225 if sys.platform != 'win32': 226 wacquire = writelock.acquire 227 wrelease = writelock.release 228 else: 229 wacquire = None 230 231 while 1: 232 try: 233 nacquire() 234 try: 235 if not buffer: 236 nwait() 237 finally: 238 nrelease() 239 try: 240 while 1: 241 obj = bpopleft() 242 if obj is sentinel: 243 debug('feeder thread got sentinel -- exiting') 244 close() 245 return 246 247 # serialize the data before acquiring the lock 248 obj = _ForkingPickler.dumps(obj) 249 if wacquire is None: 250 send_bytes(obj) 251 else: 252 wacquire() 253 try: 254 send_bytes(obj) 255 finally: 256 wrelease() 257 except IndexError: 258 pass 259 except Exception as e: 260 if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE: 261 return 262 # Since this runs in a daemon thread the resources it uses 263 # may be become unusable while the process is cleaning up. 264 # We ignore errors which happen after the process has 265 # started to cleanup. 266 if is_exiting(): 267 info('error in queue thread: %s', e) 268 return 269 else: 270 # Since the object has not been sent in the queue, we need 271 # to decrease the size of the queue. The error acts as 272 # if the object had been silently removed from the queue 273 # and this step is necessary to have a properly working 274 # queue. 275 queue_sem.release() 276 onerror(e, obj) 277 278 @staticmethod 279 def _on_queue_feeder_error(e, obj): 280 """ 281 Private API hook called when feeding data in the background thread 282 raises an exception. For overriding by concurrent.futures. 283 """ 284 import traceback 285 traceback.print_exc() 286 287 288_sentinel = object() 289 290# 291# A queue type which also supports join() and task_done() methods 292# 293# Note that if you do not call task_done() for each finished task then 294# eventually the counter's semaphore may overflow causing Bad Things 295# to happen. 296# 297 298class JoinableQueue(Queue): 299 300 def __init__(self, maxsize=0, *, ctx): 301 Queue.__init__(self, maxsize, ctx=ctx) 302 self._unfinished_tasks = ctx.Semaphore(0) 303 self._cond = ctx.Condition() 304 305 def __getstate__(self): 306 return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) 307 308 def __setstate__(self, state): 309 Queue.__setstate__(self, state[:-2]) 310 self._cond, self._unfinished_tasks = state[-2:] 311 312 def put(self, obj, block=True, timeout=None): 313 if self._closed: 314 raise ValueError(f"Queue {self!r} is closed") 315 if not self._sem.acquire(block, timeout): 316 raise Full 317 318 with self._notempty, self._cond: 319 if self._thread is None: 320 self._start_thread() 321 self._buffer.append(obj) 322 self._unfinished_tasks.release() 323 self._notempty.notify() 324 325 def task_done(self): 326 with self._cond: 327 if not self._unfinished_tasks.acquire(False): 328 raise ValueError('task_done() called too many times') 329 if self._unfinished_tasks._semlock._is_zero(): 330 self._cond.notify_all() 331 332 def join(self): 333 with self._cond: 334 if not self._unfinished_tasks._semlock._is_zero(): 335 self._cond.wait() 336 337# 338# Simplified Queue type -- really just a locked pipe 339# 340 341class SimpleQueue(object): 342 343 def __init__(self, *, ctx): 344 self._reader, self._writer = connection.Pipe(duplex=False) 345 self._rlock = ctx.Lock() 346 self._poll = self._reader.poll 347 if sys.platform == 'win32': 348 self._wlock = None 349 else: 350 self._wlock = ctx.Lock() 351 352 def close(self): 353 self._reader.close() 354 self._writer.close() 355 356 def empty(self): 357 return not self._poll() 358 359 def __getstate__(self): 360 context.assert_spawning(self) 361 return (self._reader, self._writer, self._rlock, self._wlock) 362 363 def __setstate__(self, state): 364 (self._reader, self._writer, self._rlock, self._wlock) = state 365 self._poll = self._reader.poll 366 367 def get(self): 368 with self._rlock: 369 res = self._reader.recv_bytes() 370 # unserialize the data after having released the lock 371 return _ForkingPickler.loads(res) 372 373 def put(self, obj): 374 # serialize the data before acquiring the lock 375 obj = _ForkingPickler.dumps(obj) 376 if self._wlock is None: 377 # writes to a message oriented win32 pipe are atomic 378 self._writer.send_bytes(obj) 379 else: 380 with self._wlock: 381 self._writer.send_bytes(obj) 382 383 __class_getitem__ = classmethod(types.GenericAlias) 384