1.. _guide-consumers: 2 3=========== 4 Consumers 5=========== 6 7.. _consumer-basics: 8 9Basics 10====== 11 12The :class:`~kombu.messaging.Consumer` takes a connection (or channel) and a list of queues to 13consume from. Several consumers can be mixed to consume from different 14channels, as they all bind to the same connection, and ``drain_events`` will 15drain events from all channels on that connection. 16 17.. note:: 18 19 Kombu since 3.0 will only accept json/binary or text messages by default, 20 to allow deserialization of other formats you have to specify them 21 in the ``accept`` argument (in addition to setting the right content type for your messages): 22 23 .. code-block:: python 24 25 >>> Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml']) 26 27You can create a consumer using a Connection. Consumer is consuming from single queue with name `'queue'`: 28 29.. code-block:: python 30 31 >>> queue = Queue('queue', routing_key='queue') 32 >>> consumer = connection.Consumer(queue) 33 34You can also instantiate Consumer directly, it takes a channel or a connection as an argument. This consumer also 35consumes from single queue with name `'queue'`: 36 37.. code-block:: python 38 39 >>> queue = Queue('queue', routing_key='queue') 40 >>> with Connection('amqp://') as conn: 41 ... with conn.channel() as channel: 42 ... consumer = Consumer(channel, queue) 43 44Consumer needs to specify handler of received data. This handler specified in form of callback. Callback function is called 45by kombu library every time a new message is received. Callback is called with two parameters ``body`` containing deserialized 46data sent by producer and :class:`~kombu.message.Message` instance ``message``. User is also responsible for acknowledging of message when manual 47acknowledge is set. 48 49.. code-block:: python 50 51 >>> def callback(body, message): 52 ... print(body) 53 ... message.ack() 54 55 >>> consumer.register_callback(callback) 56 57Draining events from a single consumer. Method ``drain_events`` by default blocks indefinitely. This example sets timeout to 1 second: 58 59.. code-block:: python 60 61 >>> with consumer: 62 ... connection.drain_events(timeout=1) 63 64Draining events from several consumers. Each consumer has its own list of queues. Each consumer accepts `'json'` format of data: 65 66.. code-block:: python 67 68 >>> from kombu.utils.compat import nested 69 70 >>> queues1 = [Queue('queue11', routing_key='queue12')] 71 >>> queues2 = [Queue('queue21', routing_key='queue22')] 72 >>> with connection.channel(), connection.channel() as (channel1, channel2): 73 ... with nested(Consumer(channel1, queues1, accept=['json']), 74 ... Consumer(channel2, queues2, accept=['json'])): 75 ... connection.drain_events(timeout=1) 76 77The full example will look as follows: 78 79.. code-block:: python 80 81 from kombu import Connection, Consumer, Queue 82 83 def callback(body, message): 84 print('RECEIVED MESSAGE: {0!r}'.format(body)) 85 message.ack() 86 87 queue1 = Queue('queue1', routing_key='queue1') 88 queue2 = Queue('queue2', routing_key='queue2') 89 90 with Connection('amqp://') as conn: 91 with conn.channel() as channel: 92 consumer = Consumer(conn, [queue1, queue2], accept=['json']) 93 consumer.register_callback(callback) 94 with consumer: 95 conn.drain_events(timeout=1) 96 97Consumer mixin classes 98====================== 99 100Kombu provides predefined mixin classes in module :py:mod:`~kombu.mixins`. It contains two classes: 101:class:`~kombu.mixins.ConsumerMixin` for creating consumers and :class:`~kombu.mixins.ConsumerProducerMixin` 102for creating consumers supporting also publishing messages. Consumers can be created just by subclassing 103mixin class and overriding some of the methods: 104 105.. code-block:: python 106 107 from kombu.mixins import ConsumerMixin 108 109 class C(ConsumerMixin): 110 111 def __init__(self, connection): 112 self.connection = connection 113 114 def get_consumers(self, Consumer, channel): 115 return [ 116 Consumer(queues, callbacks=[self.on_message], accept=['json']), 117 ] 118 119 def on_message(self, body, message): 120 print('RECEIVED MESSAGE: {0!r}'.format(body)) 121 message.ack() 122 123 C(connection).run() 124 125 126and with multiple channels again: 127 128.. code-block:: python 129 130 from kombu import Consumer 131 from kombu.mixins import ConsumerMixin 132 133 class C(ConsumerMixin): 134 channel2 = None 135 136 def __init__(self, connection): 137 self.connection = connection 138 139 def get_consumers(self, _, default_channel): 140 self.channel2 = default_channel.connection.channel() 141 return [Consumer(default_channel, queues1, 142 callbacks=[self.on_message], 143 accept=['json']), 144 Consumer(self.channel2, queues2, 145 callbacks=[self.on_special_message], 146 accept=['json'])] 147 148 def on_consumer_end(self, connection, default_channel): 149 if self.channel2: 150 self.channel2.close() 151 152 C(connection).run() 153 154 155The main use of :class:`~kombu.mixins.ConsumerProducerMixin` is to create consumers 156that need to also publish messages on a separate connection (e.g. sending rpc 157replies, streaming results): 158 159.. code-block:: python 160 161 from kombu import Producer, Queue 162 from kombu.mixins import ConsumerProducerMixin 163 164 rpc_queue = Queue('rpc_queue') 165 166 class Worker(ConsumerProducerMixin): 167 168 def __init__(self, connection): 169 self.connection = connection 170 171 def get_consumers(self, Consumer, channel): 172 return [Consumer( 173 queues=[rpc_queue], 174 on_message=self.on_request, 175 accept={'application/json'}, 176 prefetch_count=1, 177 )] 178 179 def on_request(self, message): 180 n = message.payload['n'] 181 print(' [.] fib({0})'.format(n)) 182 result = fib(n) 183 184 self.producer.publish( 185 {'result': result}, 186 exchange='', routing_key=message.properties['reply_to'], 187 correlation_id=message.properties['correlation_id'], 188 serializer='json', 189 retry=True, 190 ) 191 message.ack() 192 193.. seealso:: 194 195 :file:`examples/rpc-tut6/` in the Github repository. 196 197 198Advanced Topics 199=============== 200 201RabbitMQ 202-------- 203 204Consumer Priorities 205~~~~~~~~~~~~~~~~~~~ 206 207RabbitMQ defines a consumer priority extension to the amqp protocol, 208that can be enabled by setting the ``x-priority`` argument to 209``basic.consume``. 210 211In kombu you can specify this argument on the :class:`~kombu.Queue`, like 212this: 213 214.. code-block:: python 215 216 queue = Queue('name', Exchange('exchange_name', type='direct'), 217 consumer_arguments={'x-priority': 10}) 218 219Read more about consumer priorities here: 220https://www.rabbitmq.com/consumer-priority.html 221 222 223Reference 224========= 225 226.. autoclass:: kombu.Consumer 227 :noindex: 228 :members: 229