1# Copyright 2015 The Tornado Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15"""Asynchronous queues for coroutines.
16
17.. warning::
18
19   Unlike the standard library's `queue` module, the classes defined here
20   are *not* thread-safe. To use these queues from another thread,
21   use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
22   before calling any queue methods.
23"""
24# pylint: skip-file
25
26from __future__ import absolute_import, division, print_function
27
28import collections
29import heapq
30
31from salt.ext.tornado import gen, ioloop
32from salt.ext.tornado.concurrent import Future
33from salt.ext.tornado.locks import Event
34
35__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
36
37
38class QueueEmpty(Exception):
39    """Raised by `.Queue.get_nowait` when the queue has no items."""
40    pass
41
42
43class QueueFull(Exception):
44    """Raised by `.Queue.put_nowait` when a queue is at its maximum size."""
45    pass
46
47
48def _set_timeout(future, timeout):
49    if timeout:
50        def on_timeout():
51            future.set_exception(gen.TimeoutError())
52        io_loop = ioloop.IOLoop.current()
53        timeout_handle = io_loop.add_timeout(timeout, on_timeout)
54        future.add_done_callback(
55            lambda _: io_loop.remove_timeout(timeout_handle))
56
57
58class _QueueIterator(object):
59    def __init__(self, q):
60        self.q = q
61
62    def __anext__(self):
63        return self.q.get()
64
65
66class Queue(object):
67    """Coordinate producer and consumer coroutines.
68
69    If maxsize is 0 (the default) the queue size is unbounded.
70
71    .. testcode::
72
73        from salt.ext.tornado import gen
74        from salt.ext.tornado.ioloop import IOLoop
75        from salt.ext.tornado.queues import Queue
76
77        q = Queue(maxsize=2)
78
79        @gen.coroutine
80        def consumer():
81            while True:
82                item = yield q.get()
83                try:
84                    print('Doing work on %s' % item)
85                    yield gen.sleep(0.01)
86                finally:
87                    q.task_done()
88
89        @gen.coroutine
90        def producer():
91            for item in range(5):
92                yield q.put(item)
93                print('Put %s' % item)
94
95        @gen.coroutine
96        def main():
97            # Start consumer without waiting (since it never finishes).
98            IOLoop.current().spawn_callback(consumer)
99            yield producer()     # Wait for producer to put all tasks.
100            yield q.join()       # Wait for consumer to finish all tasks.
101            print('Done')
102
103        IOLoop.current().run_sync(main)
104
105    .. testoutput::
106
107        Put 0
108        Put 1
109        Doing work on 0
110        Put 2
111        Doing work on 1
112        Put 3
113        Doing work on 2
114        Put 4
115        Doing work on 3
116        Doing work on 4
117        Done
118
119    In Python 3.5, `Queue` implements the async iterator protocol, so
120    ``consumer()`` could be rewritten as::
121
122        async def consumer():
123            async for item in q:
124                try:
125                    print('Doing work on %s' % item)
126                    yield gen.sleep(0.01)
127                finally:
128                    q.task_done()
129
130    .. versionchanged:: 4.3
131       Added ``async for`` support in Python 3.5.
132
133    """
134    def __init__(self, maxsize=0):
135        if maxsize is None:
136            raise TypeError("maxsize can't be None")
137
138        if maxsize < 0:
139            raise ValueError("maxsize can't be negative")
140
141        self._maxsize = maxsize
142        self._init()
143        self._getters = collections.deque([])  # Futures.
144        self._putters = collections.deque([])  # Pairs of (item, Future).
145        self._unfinished_tasks = 0
146        self._finished = Event()
147        self._finished.set()
148
149    @property
150    def maxsize(self):
151        """Number of items allowed in the queue."""
152        return self._maxsize
153
154    def qsize(self):
155        """Number of items in the queue."""
156        return len(self._queue)
157
158    def empty(self):
159        return not self._queue
160
161    def full(self):
162        if self.maxsize == 0:
163            return False
164        else:
165            return self.qsize() >= self.maxsize
166
167    def put(self, item, timeout=None):
168        """Put an item into the queue, perhaps waiting until there is room.
169
170        Returns a Future, which raises `tornado.gen.TimeoutError` after a
171        timeout.
172        """
173        try:
174            self.put_nowait(item)
175        except QueueFull:
176            future = Future()
177            self._putters.append((item, future))
178            _set_timeout(future, timeout)
179            return future
180        else:
181            return gen._null_future
182
183    def put_nowait(self, item):
184        """Put an item into the queue without blocking.
185
186        If no free slot is immediately available, raise `QueueFull`.
187        """
188        self._consume_expired()
189        if self._getters:
190            assert self.empty(), "queue non-empty, why are getters waiting?"
191            getter = self._getters.popleft()
192            self.__put_internal(item)
193            getter.set_result(self._get())
194        elif self.full():
195            raise QueueFull
196        else:
197            self.__put_internal(item)
198
199    def get(self, timeout=None):
200        """Remove and return an item from the queue.
201
202        Returns a Future which resolves once an item is available, or raises
203        `tornado.gen.TimeoutError` after a timeout.
204        """
205        future = Future()
206        try:
207            future.set_result(self.get_nowait())
208        except QueueEmpty:
209            self._getters.append(future)
210            _set_timeout(future, timeout)
211        return future
212
213    def get_nowait(self):
214        """Remove and return an item from the queue without blocking.
215
216        Return an item if one is immediately available, else raise
217        `QueueEmpty`.
218        """
219        self._consume_expired()
220        if self._putters:
221            assert self.full(), "queue not full, why are putters waiting?"
222            item, putter = self._putters.popleft()
223            self.__put_internal(item)
224            putter.set_result(None)
225            return self._get()
226        elif self.qsize():
227            return self._get()
228        else:
229            raise QueueEmpty
230
231    def task_done(self):
232        """Indicate that a formerly enqueued task is complete.
233
234        Used by queue consumers. For each `.get` used to fetch a task, a
235        subsequent call to `.task_done` tells the queue that the processing
236        on the task is complete.
237
238        If a `.join` is blocking, it resumes when all items have been
239        processed; that is, when every `.put` is matched by a `.task_done`.
240
241        Raises `ValueError` if called more times than `.put`.
242        """
243        if self._unfinished_tasks <= 0:
244            raise ValueError('task_done() called too many times')
245        self._unfinished_tasks -= 1
246        if self._unfinished_tasks == 0:
247            self._finished.set()
248
249    def join(self, timeout=None):
250        """Block until all items in the queue are processed.
251
252        Returns a Future, which raises `tornado.gen.TimeoutError` after a
253        timeout.
254        """
255        return self._finished.wait(timeout)
256
257    def __aiter__(self):
258        return _QueueIterator(self)
259
260    # These three are overridable in subclasses.
261    def _init(self):
262        self._queue = collections.deque()
263
264    def _get(self):
265        return self._queue.popleft()
266
267    def _put(self, item):
268        self._queue.append(item)
269    # End of the overridable methods.
270
271    def __put_internal(self, item):
272        self._unfinished_tasks += 1
273        self._finished.clear()
274        self._put(item)
275
276    def _consume_expired(self):
277        # Remove timed-out waiters.
278        while self._putters and self._putters[0][1].done():
279            self._putters.popleft()
280
281        while self._getters and self._getters[0].done():
282            self._getters.popleft()
283
284    def __repr__(self):
285        return '<%s at %s %s>' % (
286            type(self).__name__, hex(id(self)), self._format())
287
288    def __str__(self):
289        return '<%s %s>' % (type(self).__name__, self._format())
290
291    def _format(self):
292        result = 'maxsize=%r' % (self.maxsize, )
293        if getattr(self, '_queue', None):
294            result += ' queue=%r' % self._queue
295        if self._getters:
296            result += ' getters[%s]' % len(self._getters)
297        if self._putters:
298            result += ' putters[%s]' % len(self._putters)
299        if self._unfinished_tasks:
300            result += ' tasks=%s' % self._unfinished_tasks
301        return result
302
303
304class PriorityQueue(Queue):
305    """A `.Queue` that retrieves entries in priority order, lowest first.
306
307    Entries are typically tuples like ``(priority number, data)``.
308
309    .. testcode::
310
311        from salt.ext.tornado.queues import PriorityQueue
312
313        q = PriorityQueue()
314        q.put((1, 'medium-priority item'))
315        q.put((0, 'high-priority item'))
316        q.put((10, 'low-priority item'))
317
318        print(q.get_nowait())
319        print(q.get_nowait())
320        print(q.get_nowait())
321
322    .. testoutput::
323
324        (0, 'high-priority item')
325        (1, 'medium-priority item')
326        (10, 'low-priority item')
327    """
328    def _init(self):
329        self._queue = []
330
331    def _put(self, item):
332        heapq.heappush(self._queue, item)
333
334    def _get(self):
335        return heapq.heappop(self._queue)
336
337
338class LifoQueue(Queue):
339    """A `.Queue` that retrieves the most recently put items first.
340
341    .. testcode::
342
343        from salt.ext.tornado.queues import LifoQueue
344
345        q = LifoQueue()
346        q.put(3)
347        q.put(2)
348        q.put(1)
349
350        print(q.get_nowait())
351        print(q.get_nowait())
352        print(q.get_nowait())
353
354    .. testoutput::
355
356        1
357        2
358        3
359    """
360    def _init(self):
361        self._queue = []
362
363    def _put(self, item):
364        self._queue.append(item)
365
366    def _get(self):
367        return self._queue.pop()
368