1__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') 2 3import collections 4import heapq 5import warnings 6 7from . import events 8from . import locks 9 10 11class QueueEmpty(Exception): 12 """Raised when Queue.get_nowait() is called on an empty Queue.""" 13 pass 14 15 16class QueueFull(Exception): 17 """Raised when the Queue.put_nowait() method is called on a full Queue.""" 18 pass 19 20 21class Queue: 22 """A queue, useful for coordinating producer and consumer coroutines. 23 24 If maxsize is less than or equal to zero, the queue size is infinite. If it 25 is an integer greater than 0, then "await put()" will block when the 26 queue reaches maxsize, until an item is removed by get(). 27 28 Unlike the standard library Queue, you can reliably know this Queue's size 29 with qsize(), since your single-threaded asyncio application won't be 30 interrupted between calling qsize() and doing an operation on the Queue. 31 """ 32 33 def __init__(self, maxsize=0, *, loop=None): 34 if loop is None: 35 self._loop = events.get_event_loop() 36 else: 37 self._loop = loop 38 warnings.warn("The loop argument is deprecated since Python 3.8, " 39 "and scheduled for removal in Python 3.10.", 40 DeprecationWarning, stacklevel=2) 41 self._maxsize = maxsize 42 43 # Futures. 44 self._getters = collections.deque() 45 # Futures. 46 self._putters = collections.deque() 47 self._unfinished_tasks = 0 48 self._finished = locks.Event(loop=loop) 49 self._finished.set() 50 self._init(maxsize) 51 52 # These three are overridable in subclasses. 53 54 def _init(self, maxsize): 55 self._queue = collections.deque() 56 57 def _get(self): 58 return self._queue.popleft() 59 60 def _put(self, item): 61 self._queue.append(item) 62 63 # End of the overridable methods. 64 65 def _wakeup_next(self, waiters): 66 # Wake up the next waiter (if any) that isn't cancelled. 67 while waiters: 68 waiter = waiters.popleft() 69 if not waiter.done(): 70 waiter.set_result(None) 71 break 72 73 def __repr__(self): 74 return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' 75 76 def __str__(self): 77 return f'<{type(self).__name__} {self._format()}>' 78 79 def _format(self): 80 result = f'maxsize={self._maxsize!r}' 81 if getattr(self, '_queue', None): 82 result += f' _queue={list(self._queue)!r}' 83 if self._getters: 84 result += f' _getters[{len(self._getters)}]' 85 if self._putters: 86 result += f' _putters[{len(self._putters)}]' 87 if self._unfinished_tasks: 88 result += f' tasks={self._unfinished_tasks}' 89 return result 90 91 def qsize(self): 92 """Number of items in the queue.""" 93 return len(self._queue) 94 95 @property 96 def maxsize(self): 97 """Number of items allowed in the queue.""" 98 return self._maxsize 99 100 def empty(self): 101 """Return True if the queue is empty, False otherwise.""" 102 return not self._queue 103 104 def full(self): 105 """Return True if there are maxsize items in the queue. 106 107 Note: if the Queue was initialized with maxsize=0 (the default), 108 then full() is never True. 109 """ 110 if self._maxsize <= 0: 111 return False 112 else: 113 return self.qsize() >= self._maxsize 114 115 async def put(self, item): 116 """Put an item into the queue. 117 118 Put an item into the queue. If the queue is full, wait until a free 119 slot is available before adding item. 120 """ 121 while self.full(): 122 putter = self._loop.create_future() 123 self._putters.append(putter) 124 try: 125 await putter 126 except: 127 putter.cancel() # Just in case putter is not done yet. 128 try: 129 # Clean self._putters from canceled putters. 130 self._putters.remove(putter) 131 except ValueError: 132 # The putter could be removed from self._putters by a 133 # previous get_nowait call. 134 pass 135 if not self.full() and not putter.cancelled(): 136 # We were woken up by get_nowait(), but can't take 137 # the call. Wake up the next in line. 138 self._wakeup_next(self._putters) 139 raise 140 return self.put_nowait(item) 141 142 def put_nowait(self, item): 143 """Put an item into the queue without blocking. 144 145 If no free slot is immediately available, raise QueueFull. 146 """ 147 if self.full(): 148 raise QueueFull 149 self._put(item) 150 self._unfinished_tasks += 1 151 self._finished.clear() 152 self._wakeup_next(self._getters) 153 154 async def get(self): 155 """Remove and return an item from the queue. 156 157 If queue is empty, wait until an item is available. 158 """ 159 while self.empty(): 160 getter = self._loop.create_future() 161 self._getters.append(getter) 162 try: 163 await getter 164 except: 165 getter.cancel() # Just in case getter is not done yet. 166 try: 167 # Clean self._getters from canceled getters. 168 self._getters.remove(getter) 169 except ValueError: 170 # The getter could be removed from self._getters by a 171 # previous put_nowait call. 172 pass 173 if not self.empty() and not getter.cancelled(): 174 # We were woken up by put_nowait(), but can't take 175 # the call. Wake up the next in line. 176 self._wakeup_next(self._getters) 177 raise 178 return self.get_nowait() 179 180 def get_nowait(self): 181 """Remove and return an item from the queue. 182 183 Return an item if one is immediately available, else raise QueueEmpty. 184 """ 185 if self.empty(): 186 raise QueueEmpty 187 item = self._get() 188 self._wakeup_next(self._putters) 189 return item 190 191 def task_done(self): 192 """Indicate that a formerly enqueued task is complete. 193 194 Used by queue consumers. For each get() used to fetch a task, 195 a subsequent call to task_done() tells the queue that the processing 196 on the task is complete. 197 198 If a join() is currently blocking, it will resume when all items have 199 been processed (meaning that a task_done() call was received for every 200 item that had been put() into the queue). 201 202 Raises ValueError if called more times than there were items placed in 203 the queue. 204 """ 205 if self._unfinished_tasks <= 0: 206 raise ValueError('task_done() called too many times') 207 self._unfinished_tasks -= 1 208 if self._unfinished_tasks == 0: 209 self._finished.set() 210 211 async def join(self): 212 """Block until all items in the queue have been gotten and processed. 213 214 The count of unfinished tasks goes up whenever an item is added to the 215 queue. The count goes down whenever a consumer calls task_done() to 216 indicate that the item was retrieved and all work on it is complete. 217 When the count of unfinished tasks drops to zero, join() unblocks. 218 """ 219 if self._unfinished_tasks > 0: 220 await self._finished.wait() 221 222 223class PriorityQueue(Queue): 224 """A subclass of Queue; retrieves entries in priority order (lowest first). 225 226 Entries are typically tuples of the form: (priority number, data). 227 """ 228 229 def _init(self, maxsize): 230 self._queue = [] 231 232 def _put(self, item, heappush=heapq.heappush): 233 heappush(self._queue, item) 234 235 def _get(self, heappop=heapq.heappop): 236 return heappop(self._queue) 237 238 239class LifoQueue(Queue): 240 """A subclass of Queue that retrieves most recently added entries first.""" 241 242 def _init(self, maxsize): 243 self._queue = [] 244 245 def _put(self, item): 246 self._queue.append(item) 247 248 def _get(self): 249 return self._queue.pop() 250