=================  ===========  ========  ======  ======
Name               Date         Size      #Lines  LOC
=================  ===========  ========  ======  ======
..                 03-May-2022  -
.github/           16-Jul-2019            61      43
docs/              16-Jul-2019            2,649   2,081
examples/          16-Jul-2019            2,146   1,600
pika/              16-Jul-2019            17,715  13,259
testdata/          16-Jul-2019            203     185
tests/             16-Jul-2019            15,221  10,811
utils/             16-Jul-2019            426     372
.checkignore       16-Jul-2019  46        6       5
.codeclimate.yml   16-Jul-2019  109       9       8
.coveragerc        16-Jul-2019  26        3       2
.gitignore         16-Jul-2019  211       22      21
.travis.yml        16-Jul-2019  4 KiB     109     98
CHANGELOG.rst      16-Jul-2019  47.1 KiB  807     688
CONTRIBUTING.md    16-Jul-2019  1.7 KiB   69      40
LICENSE            16-Jul-2019  1.5 KiB   26      22
MANIFEST.in        16-Jul-2019  34        2       2
README.rst         16-Jul-2019  10.9 KiB  281     218
appveyor.yml       16-Jul-2019  3.5 KiB   108     76
pylintrc           16-Jul-2019  12.1 KiB  400     275
setup.cfg          16-Jul-2019  277       17      15
setup.py           03-May-2022  2.2 KiB   55      50
=================  ===========  ========  ======  ======

README.rst

Pika
====
Pika is a RabbitMQ (AMQP 0-9-1) client library for Python.

|Version| |Python versions| |Status| |Coverage| |License| |Docs|

Introduction
------------
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including
RabbitMQ's extensions.

- Python 2.7 and 3.4+ are supported.
- Since threads aren't appropriate to every situation, it doesn't require
  threads. Pika core takes care not to forbid them, either. The same goes for
  greenlets, callbacks, continuations, and generators. An instance of Pika's
  built-in connection adapters isn't thread-safe, however.
- People may be using direct sockets, plain old ``select()``, or any of the
  wide variety of ways of getting network events to and from a Python
  application. Pika tries to stay compatible with all of these, and to make
  adapting it to a new environment as simple as possible.

Documentation
-------------
Pika's documentation can be found at https://pika.readthedocs.io.

Example
-------
Here is the simplest usage example, sending a message with the
``pika.BlockingConnection`` adapter:

.. code :: python

    import pika

    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_publish(exchange='test', routing_key='test',
                          body=b'Test message.')
    connection.close()

And an example of writing a blocking consumer:

.. code :: python

    import pika

    connection = pika.BlockingConnection()
    channel = connection.channel()

    for method_frame, properties, body in channel.consume('test'):
        # Display the message parts and acknowledge the message
        print(method_frame, properties, body)
        channel.basic_ack(method_frame.delivery_tag)

        # Escape out of the loop after 10 messages
        if method_frame.delivery_tag == 10:
            break

    # Cancel the consumer and return any pending messages
    requeued_messages = channel.cancel()
    print('Requeued %i messages' % requeued_messages)
    connection.close()

Pika provides the following adapters
------------------------------------

- ``pika.adapters.asyncio_connection.AsyncioConnection`` - asynchronous adapter
  for Python 3 `AsyncIO <https://docs.python.org/3/library/asyncio.html>`_'s
  I/O loop.
- ``pika.BlockingConnection`` - synchronous adapter on top of the library for
  simple usage.
- ``pika.SelectConnection`` - asynchronous adapter without third-party
  dependencies.
- ``pika.adapters.tornado_connection.TornadoConnection`` - asynchronous adapter
  for use with `Tornado <http://tornadoweb.org>`_'s I/O loop.
- ``pika.adapters.twisted_connection.TwistedProtocolConnection`` - asynchronous
  adapter for use with `Twisted <http://twistedmatrix.com>`_'s I/O loop.

Multiple connection parameters
------------------------------
You can also pass multiple ``pika.ConnectionParameters`` instances for
fault-tolerance as in the code snippet below (host names are just examples, of
course). To enable retries, set ``connection_attempts`` and ``retry_delay`` as
needed in the last ``pika.ConnectionParameters`` element of the sequence.
Retries occur after connection attempts using all of the given connection
parameters fail.

.. code :: python

    import pika

    parameters = (
        pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
        pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
                                  connection_attempts=5, retry_delay=1))
    connection = pika.BlockingConnection(parameters)

With non-blocking adapters, such as ``pika.SelectConnection`` and
``pika.adapters.asyncio_connection.AsyncioConnection``, you can request a
connection using multiple connection parameter instances via the connection
adapter's ``create_connection()`` class method.

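As a rough sketch of the non-blocking variant, the snippet below requests a
``pika.SelectConnection`` through ``create_connection()``. It assumes the
Pika 1.x signature ``create_connection(connection_configs, on_done,
custom_ioloop=None, workflow=None)`` and that ``on_done`` receives either the
opened connection or an exception instance; check the adapter's docstring for
your version. The ``on_done`` and ``request_connection`` names are illustrative
only.

```python
def on_done(result):
    """Completion callback: report the outcome and return the connection, if any.

    Assumption: Pika passes the opened connection on success, or an
    exception instance once every configuration and retry has failed.
    """
    if isinstance(result, BaseException):
        print('Connection workflow failed: %r' % (result,))
        return None
    print('Connected: %r' % (result,))
    return result


def request_connection(ioloop):
    """Start the connection workflow on `ioloop` (hypothetical helper)."""
    import pika  # imported lazily so that on_done() has no Pika dependency

    parameters = (
        pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
        pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
                                  connection_attempts=5, retry_delay=1))
    return pika.SelectConnection.create_connection(
        parameters, on_done, custom_ioloop=ioloop)
```

To drive it, create a loop with ``pika.adapters.select_connection.IOLoop()``,
pass it to ``request_connection()``, and then call the loop's ``start()``.
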
Requesting message acknowledgements from another thread
-------------------------------------------------------
The single-threaded usage constraint of an individual Pika connection adapter
instance may result in a dropped AMQP/stream connection due to AMQP heartbeat
timeout in consumers that take a long time to process an incoming message. A
common solution is to delegate processing of the incoming messages to another
thread, while the connection adapter's thread continues to service its I/O
loop's message pump, permitting AMQP heartbeats and other I/O to be serviced in
a timely fashion.

Messages processed in another thread may not be acknowledged directly from that
thread, since all accesses to the connection adapter instance must be from a
single thread, which is the thread running the adapter's I/O loop. Instead, the
acknowledgement is made by requesting a callback to be executed in the
adapter's I/O loop thread. For example, the callback function's implementation
might look like this:

.. code :: python

    def ack_message(channel, delivery_tag):
        """Note that `channel` must be the same Pika channel instance via which
        the message being acknowledged was retrieved (AMQP protocol constraint).
        """
        if channel.is_open:
            channel.basic_ack(delivery_tag)
        else:
            # Channel is already closed, so we can't acknowledge this message;
            # log and/or do something that makes sense for your app in this case.
            pass

The code running in the other thread may request the ``ack_message()`` function
to be executed in the connection adapter's I/O loop thread using an
adapter-specific mechanism:

- ``pika.BlockingConnection`` abstracts its I/O loop from the application and
  thus exposes ``pika.BlockingConnection.add_callback_threadsafe()``. Refer to
  this method's docstring for additional information. For example:

  .. code :: python

      import functools
      connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))

- When using a non-blocking connection adapter, such as
  ``pika.adapters.asyncio_connection.AsyncioConnection`` or
  ``pika.SelectConnection``, you use the underlying asynchronous framework's
  native API for requesting an I/O loop-bound callback from another thread. For
  example, ``pika.SelectConnection``'s I/O loop provides
  ``add_callback_threadsafe()``,
  ``pika.adapters.tornado_connection.TornadoConnection``'s I/O loop has
  ``add_callback()``, while
  ``pika.adapters.asyncio_connection.AsyncioConnection``'s I/O loop exposes
  ``call_soon_threadsafe()``.

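The hand-off itself can be tried without a broker. The sketch below uses only
the standard library's ``asyncio`` loop (Python 3.7+ for ``asyncio.run()``);
``fake_ack()`` is a stand-in for a real ``ack_message()`` call, and all names in
it are illustrative. It checks that a callback requested from a worker thread
via ``call_soon_threadsafe()`` runs in the I/O loop's thread:

```python
import asyncio
import functools
import threading


def fake_ack(acked, delivery_tag):
    # Stand-in for ack_message(): record the tag and the thread that ran us.
    acked.append((delivery_tag, threading.get_ident()))


def worker(loop, acked, delivery_tag, done):
    # Runs in a background thread; long message processing would go here.
    # The callbacks are only scheduled here; the loop's thread executes them.
    loop.call_soon_threadsafe(functools.partial(fake_ack, acked, delivery_tag))
    loop.call_soon_threadsafe(done.set)


async def main():
    loop = asyncio.get_running_loop()
    acked = []
    done = asyncio.Event()
    thread = threading.Thread(target=worker, args=(loop, acked, 1, done))
    thread.start()
    await done.wait()  # the loop stays free to service I/O in the meantime
    thread.join()
    return acked, threading.get_ident()


acked, loop_thread_id = asyncio.run(main())
print(acked == [(1, loop_thread_id)])
```
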
This threadsafe callback request mechanism may also be used to delegate
publishing of messages, etc., from a background thread to the connection
adapter's thread.

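A minimal sketch of that publishing hand-off for ``pika.BlockingConnection``
follows. The helper names ``publish_message`` and ``request_publish`` are
hypothetical; the only Pika calls assumed are ``add_callback_threadsafe()``,
``is_open``, and ``basic_publish()``, all of which appear elsewhere in this
README.

```python
import functools


def publish_message(channel, exchange, routing_key, body):
    """Runs in the connection adapter's thread, where channel use is safe."""
    if channel.is_open:
        channel.basic_publish(exchange=exchange, routing_key=routing_key,
                              body=body)


def request_publish(connection, channel, exchange, routing_key, body):
    """Callable from any thread: it only schedules publish_message()."""
    connection.add_callback_threadsafe(
        functools.partial(publish_message, channel, exchange, routing_key,
                          body))
```

A background thread would call ``request_publish(connection, channel, 'test',
'test', b'Test message.')``. The scheduled callback runs only while the
connection's own thread is servicing events, for example inside
``channel.start_consuming()`` or a loop around
``connection.process_data_events()``.
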
Connection recovery
-------------------

Some RabbitMQ clients (Bunny, Java, .NET, Objective-C, Swift) provide a way to
automatically recover a connection, its channels, and its topology (e.g.
queues, bindings, and consumers) after a network failure. Others require
connection recovery to be performed by the application code and strive to make
it a straightforward process. Pika falls into the second category.

Pika supports multiple connection adapters. They take different approaches to
connection recovery.

For the ``pika.BlockingConnection`` adapter, exception handling can be used to
check for connection errors. Here is a very basic example:

.. code :: python

    import pika


    def on_message_callback(channel, method_frame, properties, body):
        # Placeholder handler: print the body, then acknowledge the message.
        print(body)
        channel.basic_ack(method_frame.delivery_tag)


    while True:
        try:
            connection = pika.BlockingConnection()
            channel = connection.channel()
            channel.basic_consume('test', on_message_callback)
            channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            continue

This example can be found in ``examples/consume_recover.py``.

Generic operation retry libraries such as
`retry <https://github.com/invl/retry>`_ can be used. Decorators make it
possible to configure some additional recovery behaviours, like delays between
retries and limiting the number of retries:

.. code :: python

    import pika
    from retry import retry


    # `on_message_callback` is your message handler, defined elsewhere.
    @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
    def consume():
        connection = pika.BlockingConnection()
        channel = connection.channel()
        channel.basic_consume('test', on_message_callback)

        try:
            channel.start_consuming()
        # Don't recover connections closed by server
        except pika.exceptions.ConnectionClosedByBroker:
            pass


    consume()

This example can be found in ``examples/consume_recover_retry.py``.

For asynchronous adapters, use ``on_close_callback`` to react to connection
failure events. This callback can be used to clean up and recover the
connection.

An example of recovery using ``on_close_callback`` can be found in
``examples/asynchronous_consumer_example.py``.

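The shape of such a callback can be sketched as below. This is not the code
from the example file; ``make_on_close_callback`` and ``reconnect`` are
hypothetical names. It assumes the Pika 1.x callback signature
``on_close_callback(connection, exception)`` and an I/O loop that offers
``call_later(delay, callback)``, as ``pika.SelectConnection``'s loop does.

```python
def make_on_close_callback(reconnect, delay=5):
    """Build an on_close_callback that schedules `reconnect` after `delay` s.

    `reconnect` is an application-supplied function (hypothetical) that
    opens a fresh connection; this helper only decides when it is called.
    """
    def on_connection_closed(connection, reason):
        print('Connection closed (%r); reconnecting in %s s' % (reason, delay))
        # Schedule the retry on the adapter's own I/O loop rather than
        # blocking inside the callback.
        connection.ioloop.call_later(delay, reconnect)

    return on_connection_closed
```

The result would be passed as ``on_close_callback=make_on_close_callback(...)``
when constructing the adapter. A failed initial connection attempt is reported
through ``on_open_error_callback`` instead, so a complete recovery scheme
handles both.
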
Contributing
------------
To contribute to Pika, please make sure that any new features or changes to
existing functionality **include test coverage**.

*Pull requests that add or change code without adequate test coverage will be
rejected.*

Additionally, please format your code using
`Yapf <http://pypi.python.org/pypi/yapf>`_ with ``google`` style prior to
issuing your pull request. *Note: only format those lines that you have changed
in your pull request. If you format an entire file and change code outside of
the scope of your PR, it will likely be rejected.*

Extending to support additional I/O frameworks
----------------------------------------------
New non-blocking adapters may be implemented in either of the following ways:

- By subclassing ``pika.BaseConnection``, implementing its abstract method and
  passing its constructor an implementation of
  ``pika.adapters.utils.nbio_interface.AbstractIOServices``.
  ``pika.BaseConnection`` implements ``pika.connection.Connection``'s abstract
  methods, including internally-initiated connection logic. For examples, refer
  to the implementations of
  ``pika.adapters.asyncio_connection.AsyncioConnection`` and
  ``pika.adapters.tornado_connection.TornadoConnection``.
- By subclassing ``pika.connection.Connection`` and implementing its abstract
  methods. This approach facilitates implementation of custom
  connection-establishment and transport mechanisms. For an example, refer to
  the implementation of
  ``pika.adapters.twisted_connection.TwistedProtocolConnection``.

.. |Version| image:: https://img.shields.io/pypi/v/pika.svg?
   :target: http://badge.fury.io/py/pika

.. |Python versions| image:: https://img.shields.io/pypi/pyversions/pika.svg
   :target: https://pypi.python.org/pypi/pika

.. |Status| image:: https://img.shields.io/travis/pika/pika.svg?
   :target: https://travis-ci.org/pika/pika

.. |Coverage| image:: https://img.shields.io/codecov/c/github/pika/pika.svg?
   :target: https://codecov.io/github/pika/pika?branch=master

.. |License| image:: https://img.shields.io/pypi/l/pika.svg?
   :target: https://pika.readthedocs.io

.. |Docs| image:: https://readthedocs.org/projects/pika/badge/?version=stable
   :target: https://pika.readthedocs.io
   :alt: Documentation Status
