1__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
2
3import collections
4import heapq
5import warnings
6
7from . import events
8from . import locks
9
10
11class QueueEmpty(Exception):
12    """Raised when Queue.get_nowait() is called on an empty Queue."""
13    pass
14
15
16class QueueFull(Exception):
17    """Raised when the Queue.put_nowait() method is called on a full Queue."""
18    pass
19
20
21class Queue:
22    """A queue, useful for coordinating producer and consumer coroutines.
23
24    If maxsize is less than or equal to zero, the queue size is infinite. If it
25    is an integer greater than 0, then "await put()" will block when the
26    queue reaches maxsize, until an item is removed by get().
27
28    Unlike the standard library Queue, you can reliably know this Queue's size
29    with qsize(), since your single-threaded asyncio application won't be
30    interrupted between calling qsize() and doing an operation on the Queue.
31    """
32
33    def __init__(self, maxsize=0, *, loop=None):
34        if loop is None:
35            self._loop = events.get_event_loop()
36        else:
37            self._loop = loop
38            warnings.warn("The loop argument is deprecated since Python 3.8, "
39                          "and scheduled for removal in Python 3.10.",
40                          DeprecationWarning, stacklevel=2)
41        self._maxsize = maxsize
42
43        # Futures.
44        self._getters = collections.deque()
45        # Futures.
46        self._putters = collections.deque()
47        self._unfinished_tasks = 0
48        self._finished = locks.Event(loop=loop)
49        self._finished.set()
50        self._init(maxsize)
51
52    # These three are overridable in subclasses.
53
54    def _init(self, maxsize):
55        self._queue = collections.deque()
56
57    def _get(self):
58        return self._queue.popleft()
59
60    def _put(self, item):
61        self._queue.append(item)
62
63    # End of the overridable methods.
64
65    def _wakeup_next(self, waiters):
66        # Wake up the next waiter (if any) that isn't cancelled.
67        while waiters:
68            waiter = waiters.popleft()
69            if not waiter.done():
70                waiter.set_result(None)
71                break
72
73    def __repr__(self):
74        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
75
76    def __str__(self):
77        return f'<{type(self).__name__} {self._format()}>'
78
79    def __class_getitem__(cls, type):
80        return cls
81
82    def _format(self):
83        result = f'maxsize={self._maxsize!r}'
84        if getattr(self, '_queue', None):
85            result += f' _queue={list(self._queue)!r}'
86        if self._getters:
87            result += f' _getters[{len(self._getters)}]'
88        if self._putters:
89            result += f' _putters[{len(self._putters)}]'
90        if self._unfinished_tasks:
91            result += f' tasks={self._unfinished_tasks}'
92        return result
93
94    def qsize(self):
95        """Number of items in the queue."""
96        return len(self._queue)
97
98    @property
99    def maxsize(self):
100        """Number of items allowed in the queue."""
101        return self._maxsize
102
103    def empty(self):
104        """Return True if the queue is empty, False otherwise."""
105        return not self._queue
106
107    def full(self):
108        """Return True if there are maxsize items in the queue.
109
110        Note: if the Queue was initialized with maxsize=0 (the default),
111        then full() is never True.
112        """
113        if self._maxsize <= 0:
114            return False
115        else:
116            return self.qsize() >= self._maxsize
117
118    async def put(self, item):
119        """Put an item into the queue.
120
121        Put an item into the queue. If the queue is full, wait until a free
122        slot is available before adding item.
123        """
124        while self.full():
125            putter = self._loop.create_future()
126            self._putters.append(putter)
127            try:
128                await putter
129            except:
130                putter.cancel()  # Just in case putter is not done yet.
131                try:
132                    # Clean self._putters from canceled putters.
133                    self._putters.remove(putter)
134                except ValueError:
135                    # The putter could be removed from self._putters by a
136                    # previous get_nowait call.
137                    pass
138                if not self.full() and not putter.cancelled():
139                    # We were woken up by get_nowait(), but can't take
140                    # the call.  Wake up the next in line.
141                    self._wakeup_next(self._putters)
142                raise
143        return self.put_nowait(item)
144
145    def put_nowait(self, item):
146        """Put an item into the queue without blocking.
147
148        If no free slot is immediately available, raise QueueFull.
149        """
150        if self.full():
151            raise QueueFull
152        self._put(item)
153        self._unfinished_tasks += 1
154        self._finished.clear()
155        self._wakeup_next(self._getters)
156
157    async def get(self):
158        """Remove and return an item from the queue.
159
160        If queue is empty, wait until an item is available.
161        """
162        while self.empty():
163            getter = self._loop.create_future()
164            self._getters.append(getter)
165            try:
166                await getter
167            except:
168                getter.cancel()  # Just in case getter is not done yet.
169                try:
170                    # Clean self._getters from canceled getters.
171                    self._getters.remove(getter)
172                except ValueError:
173                    # The getter could be removed from self._getters by a
174                    # previous put_nowait call.
175                    pass
176                if not self.empty() and not getter.cancelled():
177                    # We were woken up by put_nowait(), but can't take
178                    # the call.  Wake up the next in line.
179                    self._wakeup_next(self._getters)
180                raise
181        return self.get_nowait()
182
183    def get_nowait(self):
184        """Remove and return an item from the queue.
185
186        Return an item if one is immediately available, else raise QueueEmpty.
187        """
188        if self.empty():
189            raise QueueEmpty
190        item = self._get()
191        self._wakeup_next(self._putters)
192        return item
193
194    def task_done(self):
195        """Indicate that a formerly enqueued task is complete.
196
197        Used by queue consumers. For each get() used to fetch a task,
198        a subsequent call to task_done() tells the queue that the processing
199        on the task is complete.
200
201        If a join() is currently blocking, it will resume when all items have
202        been processed (meaning that a task_done() call was received for every
203        item that had been put() into the queue).
204
205        Raises ValueError if called more times than there were items placed in
206        the queue.
207        """
208        if self._unfinished_tasks <= 0:
209            raise ValueError('task_done() called too many times')
210        self._unfinished_tasks -= 1
211        if self._unfinished_tasks == 0:
212            self._finished.set()
213
214    async def join(self):
215        """Block until all items in the queue have been gotten and processed.
216
217        The count of unfinished tasks goes up whenever an item is added to the
218        queue. The count goes down whenever a consumer calls task_done() to
219        indicate that the item was retrieved and all work on it is complete.
220        When the count of unfinished tasks drops to zero, join() unblocks.
221        """
222        if self._unfinished_tasks > 0:
223            await self._finished.wait()
224
225
226class PriorityQueue(Queue):
227    """A subclass of Queue; retrieves entries in priority order (lowest first).
228
229    Entries are typically tuples of the form: (priority number, data).
230    """
231
232    def _init(self, maxsize):
233        self._queue = []
234
235    def _put(self, item, heappush=heapq.heappush):
236        heappush(self._queue, item)
237
238    def _get(self, heappop=heapq.heappop):
239        return heappop(self._queue)
240
241
242class LifoQueue(Queue):
243    """A subclass of Queue that retrieves most recently added entries first."""
244
245    def _init(self, maxsize):
246        self._queue = []
247
248    def _put(self, item):
249        self._queue.append(item)
250
251    def _get(self):
252        return self._queue.pop()
253