.. _guide-consumers:

===========
 Consumers
===========

.. _consumer-basics:

Basics
======

The :class:`~kombu.messaging.Consumer` takes a connection (or channel) and a list of queues to
consume from. Several consumers can be mixed to consume from different
channels, as they all bind to the same connection, and ``drain_events`` will
drain events from all channels on that connection.

.. note::

    Since version 3.0, Kombu only accepts json/binary or text messages by
    default. To allow deserialization of other formats, list them in the
    ``accept`` argument (in addition to setting the right content type for
    your messages):

    .. code-block:: python

        >>> Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])

You can create a consumer using a connection. This consumer consumes from a single queue named ``'queue'``:

.. code-block:: python

    >>> queue = Queue('queue', routing_key='queue')
    >>> consumer = connection.Consumer(queue)

You can also instantiate :class:`~kombu.messaging.Consumer` directly; it takes a channel or a connection as its first argument. This consumer also
consumes from a single queue named ``'queue'``:

.. code-block:: python

    >>> queue = Queue('queue', routing_key='queue')
    >>> with Connection('amqp://') as conn:
    ...     with conn.channel() as channel:
    ...         consumer = Consumer(channel, queue)

A consumer needs a handler for the data it receives, given in the form of a callback. Kombu calls the callback
every time a new message is received, passing two arguments: ``body``, which contains the deserialized
data sent by the producer, and ``message``, the :class:`~kombu.message.Message` instance. The callback is also responsible for
acknowledging the message when manual acknowledgment is in use.

.. code-block:: python

    >>> def callback(body, message):
    ...     print(body)
    ...     message.ack()

    >>> consumer.register_callback(callback)

To drain events from a single consumer, enter the consumer and call ``drain_events`` on the connection. By default ``drain_events`` blocks indefinitely; this example sets the timeout to 1 second:

.. code-block:: python

    >>> with consumer:
    ...     connection.drain_events(timeout=1)

To drain events from several consumers, enter all of them before calling ``drain_events``. Each consumer has its own list of queues, and each accepts only the ``'json'`` format:

.. code-block:: python

    >>> from kombu.utils.compat import nested

    >>> queues1 = [Queue('queue11', routing_key='queue12')]
    >>> queues2 = [Queue('queue21', routing_key='queue22')]
    >>> with connection.channel() as channel1, connection.channel() as channel2:
    ...     with nested(Consumer(channel1, queues1, accept=['json']),
    ...                 Consumer(channel2, queues2, accept=['json'])):
    ...         connection.drain_events(timeout=1)

The full example looks as follows:

.. code-block:: python

    from kombu import Connection, Consumer, Queue

    def callback(body, message):
        print('RECEIVED MESSAGE: {0!r}'.format(body))
        message.ack()

    queue1 = Queue('queue1', routing_key='queue1')
    queue2 = Queue('queue2', routing_key='queue2')

    with Connection('amqp://') as conn:
        with conn.channel() as channel:
            # Bind the consumer to the channel opened above.
            consumer = Consumer(channel, [queue1, queue2], accept=['json'])
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Consumer mixin classes
======================

Kombu provides predefined mixin classes in the :py:mod:`~kombu.mixins` module. It contains two classes:
:class:`~kombu.mixins.ConsumerMixin` for creating consumers, and :class:`~kombu.mixins.ConsumerProducerMixin`
for creating consumers that also publish messages. To create a consumer, subclass
a mixin class and override some of its methods:

.. code-block:: python

    from kombu import Connection, Queue
    from kombu.mixins import ConsumerMixin

    # Queues to consume from (the name is illustrative).
    queues = [Queue('queue', routing_key='queue')]

    class C(ConsumerMixin):

        def __init__(self, connection):
            self.connection = connection

        def get_consumers(self, Consumer, channel):
            return [
                Consumer(queues, callbacks=[self.on_message], accept=['json']),
            ]

        def on_message(self, body, message):
            print('RECEIVED MESSAGE: {0!r}'.format(body))
            message.ack()

    with Connection('amqp://') as connection:
        C(connection).run()


The same can be done with multiple channels:

.. code-block:: python

    from kombu import Consumer
    from kombu.mixins import ConsumerMixin

    # queues1 and queues2 are the queue lists defined in the earlier example.

    class C(ConsumerMixin):
        channel2 = None

        def __init__(self, connection):
            self.connection = connection

        def get_consumers(self, _, default_channel):
            # Open a second channel on the same connection.
            self.channel2 = default_channel.connection.channel()
            return [Consumer(default_channel, queues1,
                             callbacks=[self.on_message],
                             accept=['json']),
                    Consumer(self.channel2, queues2,
                             callbacks=[self.on_special_message],
                             accept=['json'])]

        def on_message(self, body, message):
            print('RECEIVED MESSAGE: {0!r}'.format(body))
            message.ack()

        def on_special_message(self, body, message):
            print('RECEIVED SPECIAL MESSAGE: {0!r}'.format(body))
            message.ack()

        def on_consume_end(self, connection, default_channel):
            # Close the extra channel once consuming stops.
            if self.channel2:
                self.channel2.close()

    C(connection).run()


The main use of :class:`~kombu.mixins.ConsumerProducerMixin` is to create consumers
that also need to publish messages on a separate connection (e.g. sending RPC
replies, streaming results):

.. code-block:: python

    from kombu import Queue
    from kombu.mixins import ConsumerProducerMixin

    rpc_queue = Queue('rpc_queue')

    def fib(n):
        # Naive recursive Fibonacci, standing in for the real work.
        return n if n < 2 else fib(n - 1) + fib(n - 2)

    class Worker(ConsumerProducerMixin):

        def __init__(self, connection):
            self.connection = connection

        def get_consumers(self, Consumer, channel):
            return [Consumer(
                queues=[rpc_queue],
                on_message=self.on_request,
                accept={'application/json'},
                prefetch_count=1,
            )]

        def on_request(self, message):
            n = message.payload['n']
            print(' [.] fib({0})'.format(n))
            result = fib(n)

            # Send the reply to the queue named by the caller.
            self.producer.publish(
                {'result': result},
                exchange='', routing_key=message.properties['reply_to'],
                correlation_id=message.properties['correlation_id'],
                serializer='json',
                retry=True,
            )
            message.ack()

.. seealso::

    :file:`examples/rpc-tut6/` in the GitHub repository.


Advanced Topics
===============

RabbitMQ
--------

Consumer Priorities
~~~~~~~~~~~~~~~~~~~

RabbitMQ defines a consumer priority extension to the AMQP protocol,
which can be enabled by setting the ``x-priority`` argument to
``basic.consume``.

In kombu you can specify this argument on the :class:`~kombu.Queue`, like
this:

.. code-block:: python

    from kombu import Exchange, Queue

    queue = Queue('name', Exchange('exchange_name', type='direct'),
                  consumer_arguments={'x-priority': 10})

Read more about consumer priorities here:
https://www.rabbitmq.com/consumer-priority.html


Reference
=========

.. autoclass:: kombu.Consumer
    :noindex:
    :members:
