1.. currentmodule:: asyncio
2
3.. _asyncio-queues:
4
5======
6Queues
7======
8
9**Source code:** :source:`Lib/asyncio/queues.py`
10
11------------------------------------------------
12
13asyncio queues are designed to be similar to classes of the
14:mod:`queue` module.  Although asyncio queues are not thread-safe,
15they are designed to be used specifically in async/await code.
16
17Note that methods of asyncio queues don't have a *timeout* parameter;
18use :func:`asyncio.wait_for` function to do queue operations with a
19timeout.
20
21See also the `Examples`_ section below.
22
23Queue
24=====
25
26.. class:: Queue(maxsize=0, \*, loop=None)
27
28   A first in, first out (FIFO) queue.
29
30   If *maxsize* is less than or equal to zero, the queue size is
31   infinite.  If it is an integer greater than ``0``, then
32   ``await put()`` blocks when the queue reaches *maxsize*
33   until an item is removed by :meth:`get`.
34
35   Unlike the standard library threading :mod:`queue`, the size of
36   the queue is always known and can be returned by calling the
37   :meth:`qsize` method.
38
39   .. deprecated-removed:: 3.8 3.10
40      The *loop* parameter.
41
42
43   This class is :ref:`not thread safe <asyncio-multithreading>`.
44
45   .. attribute:: maxsize
46
47      Number of items allowed in the queue.
48
49   .. method:: empty()
50
51      Return ``True`` if the queue is empty, ``False`` otherwise.
52
53   .. method:: full()
54
55      Return ``True`` if there are :attr:`maxsize` items in the queue.
56
57      If the queue was initialized with ``maxsize=0`` (the default),
58      then :meth:`full()` never returns ``True``.
59
60   .. coroutinemethod:: get()
61
62      Remove and return an item from the queue. If queue is empty,
63      wait until an item is available.
64
65   .. method:: get_nowait()
66
67      Return an item if one is immediately available, else raise
68      :exc:`QueueEmpty`.
69
70   .. coroutinemethod:: join()
71
72      Block until all items in the queue have been received and processed.
73
74      The count of unfinished tasks goes up whenever an item is added
75      to the queue. The count goes down whenever a consumer coroutine calls
76      :meth:`task_done` to indicate that the item was retrieved and all
77      work on it is complete.  When the count of unfinished tasks drops
78      to zero, :meth:`join` unblocks.
79
80   .. coroutinemethod:: put(item)
81
82      Put an item into the queue. If the queue is full, wait until a
83      free slot is available before adding the item.
84
85   .. method:: put_nowait(item)
86
87      Put an item into the queue without blocking.
88
89      If no free slot is immediately available, raise :exc:`QueueFull`.
90
91   .. method:: qsize()
92
93      Return the number of items in the queue.
94
95   .. method:: task_done()
96
97      Indicate that a formerly enqueued task is complete.
98
99      Used by queue consumers. For each :meth:`~Queue.get` used to
100      fetch a task, a subsequent call to :meth:`task_done` tells the
101      queue that the processing on the task is complete.
102
103      If a :meth:`join` is currently blocking, it will resume when all
104      items have been processed (meaning that a :meth:`task_done`
105      call was received for every item that had been :meth:`~Queue.put`
106      into the queue).
107
108      Raises :exc:`ValueError` if called more times than there were
109      items placed in the queue.
110
111
112Priority Queue
113==============
114
115.. class:: PriorityQueue
116
117   A variant of :class:`Queue`; retrieves entries in priority order
118   (lowest first).
119
120   Entries are typically tuples of the form
121   ``(priority_number, data)``.
122
123
124LIFO Queue
125==========
126
127.. class:: LifoQueue
128
129   A variant of :class:`Queue` that retrieves most recently added
130   entries first (last in, first out).
131
132
133Exceptions
134==========
135
136.. exception:: QueueEmpty
137
138   This exception is raised when the :meth:`~Queue.get_nowait` method
139   is called on an empty queue.
140
141
142.. exception:: QueueFull
143
144   Exception raised when the :meth:`~Queue.put_nowait` method is called
145   on a queue that has reached its *maxsize*.
146
147
148Examples
149========
150
151.. _asyncio_example_queue_dist:
152
153Queues can be used to distribute workload between several
154concurrent tasks::
155
156   import asyncio
157   import random
158   import time
159
160
161   async def worker(name, queue):
162       while True:
163           # Get a "work item" out of the queue.
164           sleep_for = await queue.get()
165
166           # Sleep for the "sleep_for" seconds.
167           await asyncio.sleep(sleep_for)
168
169           # Notify the queue that the "work item" has been processed.
170           queue.task_done()
171
172           print(f'{name} has slept for {sleep_for:.2f} seconds')
173
174
175   async def main():
176       # Create a queue that we will use to store our "workload".
177       queue = asyncio.Queue()
178
179       # Generate random timings and put them into the queue.
180       total_sleep_time = 0
181       for _ in range(20):
182           sleep_for = random.uniform(0.05, 1.0)
183           total_sleep_time += sleep_for
184           queue.put_nowait(sleep_for)
185
186       # Create three worker tasks to process the queue concurrently.
187       tasks = []
188       for i in range(3):
189           task = asyncio.create_task(worker(f'worker-{i}', queue))
190           tasks.append(task)
191
192       # Wait until the queue is fully processed.
193       started_at = time.monotonic()
194       await queue.join()
195       total_slept_for = time.monotonic() - started_at
196
197       # Cancel our worker tasks.
198       for task in tasks:
199           task.cancel()
200       # Wait until all worker tasks are cancelled.
201       await asyncio.gather(*tasks, return_exceptions=True)
202
203       print('====')
204       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
205       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
206
207
208   asyncio.run(main())
209