1# Copyright 2015 The Tornado Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may 4# not use this file except in compliance with the License. You may obtain 5# a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations 13# under the License. 14 15"""Asynchronous queues for coroutines. 16 17.. warning:: 18 19 Unlike the standard library's `queue` module, the classes defined here 20 are *not* thread-safe. To use these queues from another thread, 21 use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread 22 before calling any queue methods. 23""" 24# pylint: skip-file 25 26from __future__ import absolute_import, division, print_function 27 28import collections 29import heapq 30 31from salt.ext.tornado import gen, ioloop 32from salt.ext.tornado.concurrent import Future 33from salt.ext.tornado.locks import Event 34 35__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty'] 36 37 38class QueueEmpty(Exception): 39 """Raised by `.Queue.get_nowait` when the queue has no items.""" 40 pass 41 42 43class QueueFull(Exception): 44 """Raised by `.Queue.put_nowait` when a queue is at its maximum size.""" 45 pass 46 47 48def _set_timeout(future, timeout): 49 if timeout: 50 def on_timeout(): 51 future.set_exception(gen.TimeoutError()) 52 io_loop = ioloop.IOLoop.current() 53 timeout_handle = io_loop.add_timeout(timeout, on_timeout) 54 future.add_done_callback( 55 lambda _: io_loop.remove_timeout(timeout_handle)) 56 57 58class _QueueIterator(object): 59 def __init__(self, q): 60 self.q = q 61 62 def __anext__(self): 63 return self.q.get() 64 65 66class Queue(object): 67 """Coordinate producer and consumer coroutines. 68 69 If maxsize is 0 (the default) the queue size is unbounded. 70 71 .. testcode:: 72 73 from salt.ext.tornado import gen 74 from salt.ext.tornado.ioloop import IOLoop 75 from salt.ext.tornado.queues import Queue 76 77 q = Queue(maxsize=2) 78 79 @gen.coroutine 80 def consumer(): 81 while True: 82 item = yield q.get() 83 try: 84 print('Doing work on %s' % item) 85 yield gen.sleep(0.01) 86 finally: 87 q.task_done() 88 89 @gen.coroutine 90 def producer(): 91 for item in range(5): 92 yield q.put(item) 93 print('Put %s' % item) 94 95 @gen.coroutine 96 def main(): 97 # Start consumer without waiting (since it never finishes). 98 IOLoop.current().spawn_callback(consumer) 99 yield producer() # Wait for producer to put all tasks. 100 yield q.join() # Wait for consumer to finish all tasks. 101 print('Done') 102 103 IOLoop.current().run_sync(main) 104 105 .. testoutput:: 106 107 Put 0 108 Put 1 109 Doing work on 0 110 Put 2 111 Doing work on 1 112 Put 3 113 Doing work on 2 114 Put 4 115 Doing work on 3 116 Doing work on 4 117 Done 118 119 In Python 3.5, `Queue` implements the async iterator protocol, so 120 ``consumer()`` could be rewritten as:: 121 122 async def consumer(): 123 async for item in q: 124 try: 125 print('Doing work on %s' % item) 126 yield gen.sleep(0.01) 127 finally: 128 q.task_done() 129 130 .. versionchanged:: 4.3 131 Added ``async for`` support in Python 3.5. 132 133 """ 134 def __init__(self, maxsize=0): 135 if maxsize is None: 136 raise TypeError("maxsize can't be None") 137 138 if maxsize < 0: 139 raise ValueError("maxsize can't be negative") 140 141 self._maxsize = maxsize 142 self._init() 143 self._getters = collections.deque([]) # Futures. 144 self._putters = collections.deque([]) # Pairs of (item, Future). 145 self._unfinished_tasks = 0 146 self._finished = Event() 147 self._finished.set() 148 149 @property 150 def maxsize(self): 151 """Number of items allowed in the queue.""" 152 return self._maxsize 153 154 def qsize(self): 155 """Number of items in the queue.""" 156 return len(self._queue) 157 158 def empty(self): 159 return not self._queue 160 161 def full(self): 162 if self.maxsize == 0: 163 return False 164 else: 165 return self.qsize() >= self.maxsize 166 167 def put(self, item, timeout=None): 168 """Put an item into the queue, perhaps waiting until there is room. 169 170 Returns a Future, which raises `tornado.gen.TimeoutError` after a 171 timeout. 172 """ 173 try: 174 self.put_nowait(item) 175 except QueueFull: 176 future = Future() 177 self._putters.append((item, future)) 178 _set_timeout(future, timeout) 179 return future 180 else: 181 return gen._null_future 182 183 def put_nowait(self, item): 184 """Put an item into the queue without blocking. 185 186 If no free slot is immediately available, raise `QueueFull`. 187 """ 188 self._consume_expired() 189 if self._getters: 190 assert self.empty(), "queue non-empty, why are getters waiting?" 191 getter = self._getters.popleft() 192 self.__put_internal(item) 193 getter.set_result(self._get()) 194 elif self.full(): 195 raise QueueFull 196 else: 197 self.__put_internal(item) 198 199 def get(self, timeout=None): 200 """Remove and return an item from the queue. 201 202 Returns a Future which resolves once an item is available, or raises 203 `tornado.gen.TimeoutError` after a timeout. 204 """ 205 future = Future() 206 try: 207 future.set_result(self.get_nowait()) 208 except QueueEmpty: 209 self._getters.append(future) 210 _set_timeout(future, timeout) 211 return future 212 213 def get_nowait(self): 214 """Remove and return an item from the queue without blocking. 215 216 Return an item if one is immediately available, else raise 217 `QueueEmpty`. 218 """ 219 self._consume_expired() 220 if self._putters: 221 assert self.full(), "queue not full, why are putters waiting?" 222 item, putter = self._putters.popleft() 223 self.__put_internal(item) 224 putter.set_result(None) 225 return self._get() 226 elif self.qsize(): 227 return self._get() 228 else: 229 raise QueueEmpty 230 231 def task_done(self): 232 """Indicate that a formerly enqueued task is complete. 233 234 Used by queue consumers. For each `.get` used to fetch a task, a 235 subsequent call to `.task_done` tells the queue that the processing 236 on the task is complete. 237 238 If a `.join` is blocking, it resumes when all items have been 239 processed; that is, when every `.put` is matched by a `.task_done`. 240 241 Raises `ValueError` if called more times than `.put`. 242 """ 243 if self._unfinished_tasks <= 0: 244 raise ValueError('task_done() called too many times') 245 self._unfinished_tasks -= 1 246 if self._unfinished_tasks == 0: 247 self._finished.set() 248 249 def join(self, timeout=None): 250 """Block until all items in the queue are processed. 251 252 Returns a Future, which raises `tornado.gen.TimeoutError` after a 253 timeout. 254 """ 255 return self._finished.wait(timeout) 256 257 def __aiter__(self): 258 return _QueueIterator(self) 259 260 # These three are overridable in subclasses. 261 def _init(self): 262 self._queue = collections.deque() 263 264 def _get(self): 265 return self._queue.popleft() 266 267 def _put(self, item): 268 self._queue.append(item) 269 # End of the overridable methods. 270 271 def __put_internal(self, item): 272 self._unfinished_tasks += 1 273 self._finished.clear() 274 self._put(item) 275 276 def _consume_expired(self): 277 # Remove timed-out waiters. 278 while self._putters and self._putters[0][1].done(): 279 self._putters.popleft() 280 281 while self._getters and self._getters[0].done(): 282 self._getters.popleft() 283 284 def __repr__(self): 285 return '<%s at %s %s>' % ( 286 type(self).__name__, hex(id(self)), self._format()) 287 288 def __str__(self): 289 return '<%s %s>' % (type(self).__name__, self._format()) 290 291 def _format(self): 292 result = 'maxsize=%r' % (self.maxsize, ) 293 if getattr(self, '_queue', None): 294 result += ' queue=%r' % self._queue 295 if self._getters: 296 result += ' getters[%s]' % len(self._getters) 297 if self._putters: 298 result += ' putters[%s]' % len(self._putters) 299 if self._unfinished_tasks: 300 result += ' tasks=%s' % self._unfinished_tasks 301 return result 302 303 304class PriorityQueue(Queue): 305 """A `.Queue` that retrieves entries in priority order, lowest first. 306 307 Entries are typically tuples like ``(priority number, data)``. 308 309 .. testcode:: 310 311 from salt.ext.tornado.queues import PriorityQueue 312 313 q = PriorityQueue() 314 q.put((1, 'medium-priority item')) 315 q.put((0, 'high-priority item')) 316 q.put((10, 'low-priority item')) 317 318 print(q.get_nowait()) 319 print(q.get_nowait()) 320 print(q.get_nowait()) 321 322 .. testoutput:: 323 324 (0, 'high-priority item') 325 (1, 'medium-priority item') 326 (10, 'low-priority item') 327 """ 328 def _init(self): 329 self._queue = [] 330 331 def _put(self, item): 332 heapq.heappush(self._queue, item) 333 334 def _get(self): 335 return heapq.heappop(self._queue) 336 337 338class LifoQueue(Queue): 339 """A `.Queue` that retrieves the most recently put items first. 340 341 .. testcode:: 342 343 from salt.ext.tornado.queues import LifoQueue 344 345 q = LifoQueue() 346 q.put(3) 347 q.put(2) 348 q.put(1) 349 350 print(q.get_nowait()) 351 print(q.get_nowait()) 352 print(q.get_nowait()) 353 354 .. testoutput:: 355 356 1 357 2 358 3 359 """ 360 def _init(self): 361 self._queue = [] 362 363 def _put(self, item): 364 self._queue.append(item) 365 366 def _get(self): 367 return self._queue.pop() 368