Confluent's Python Client for Apache Kafka<sup>TM</sup>
=======================================================

**confluent-kafka-python** is Confluent's Python client for [Apache Kafka](http://kafka.apache.org/) and the
[Confluent Platform](https://www.confluent.io/product/compare/).

Features:

- **High performance** - confluent-kafka-python is a lightweight wrapper around
[librdkafka](https://github.com/edenhill/librdkafka), a finely tuned C
client.

- **Reliability** - There are a lot of details to get right when writing an Apache Kafka
client. We get them right in one place (librdkafka) and leverage this work
across all of our clients (also [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go)
and [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet)).

- **Supported** - Commercial support is offered by
[Confluent](https://confluent.io/).

- **Future proof** - Confluent, founded by the
creators of Kafka, is building a [streaming platform](https://www.confluent.io/product/compare/)
with Apache Kafka at its core. It is a high priority for us that client features keep
pace with core Apache Kafka and components of the [Confluent Platform](https://www.confluent.io/product/compare/).

The Python bindings provide a high-level Producer and Consumer with support
for the balanced consumer groups of Apache Kafka >= 0.9.

See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.

**License**: [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0)


Usage
=====

**Producer:**

```python
from confluent_kafka import Producer


p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for data in some_data_source:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message. The delivery report callback will be
    # triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or has failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
```
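
Note that `produce()` only enqueues the message on an internal queue; if that queue fills up faster than delivery reports are served, `produce()` raises a `BufferError`. Continuing the example above, a minimal sketch of one way to handle this (the single-retry policy is illustrative, not part of the client):

```python
try:
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
except BufferError:
    # Local producer queue is full: serve delivery report callbacks
    # to free up queue space, then retry once.
    p.poll(1)
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
```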


**High-level Consumer:**

```python
from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()
```
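
The consumer above relies on automatic offset commits. For at-least-once processing you can disable auto-commit and commit each message's offset only after it has been handled. A minimal sketch, assuming `process()` is your own (hypothetical) handler:

```python
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    process(msg)  # hypothetical application-level processing
    # Commit this message's offset only after it has been processed.
    c.commit(message=msg)

c.close()
```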

**AvroProducer:**

```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'schema.registry.url': 'http://schema_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()
```
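
`AvroProducer.produce()` also accepts per-message `key_schema` and `value_schema` arguments that override the defaults passed to the constructor (see the API documentation). A short sketch, reusing the objects defined above:

```python
# Override the default schemas for a single message:
avroProducer.produce(topic='my_topic', value=value, key=key,
                     key_schema=key_schema, value_schema=value_schema)
```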

**AvroConsumer:**

```python
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


c = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'})

c.subscribe(['my_topic'])

while True:
    try:
        msg = c.poll(10)
    except SerializerError as e:
        print("Message deserialization failed: {}".format(e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()
```

See the [examples](examples) directory for more examples, including [how to configure](examples/confluent_cloud.py) the Python client for use with
[Confluent Cloud](https://www.confluent.io/confluent-cloud/).


Install
=======

**NOTE:** The pre-built Linux wheels do NOT contain SASL Kerberos/GSSAPI support.
          If you need SASL Kerberos/GSSAPI support you must install librdkafka and
          its dependencies using the repositories below and then build
          confluent-kafka using the command in the "Install from
          source from PyPI" section below.

**Install self-contained binary wheels for OS X and Linux from PyPI:**

    $ pip install confluent-kafka

**Install AvroProducer and AvroConsumer:**

    $ pip install "confluent-kafka[avro]"

**Install from source from PyPI** *(requires librdkafka + dependencies to be installed separately)*:

    $ pip install --no-binary :all: confluent-kafka


For source install, see *Prerequisites* below.


Broker Compatibility
====================
The Python client (as well as the underlying C library librdkafka) supports
all broker versions >= 0.8.
But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it
is not safe for a client to assume what protocol version is actually supported
by the broker, so you need to hint the Python client which protocol version
it may use. This is done through two configuration settings:

 * `broker.version.fallback=YOUR_BROKER_VERSION` (default 0.9.0.1)
 * `api.version.request=true|false` (default true)

When using a Kafka 0.10 broker or later you don't need to do anything
(`api.version.request=true` is the default).
If you use Kafka broker 0.9 or 0.8 you must set
`api.version.request=false` and set
`broker.version.fallback` to your broker version,
e.g. `broker.version.fallback=0.9.0.1`.
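
For example, a minimal consumer configuration for a 0.9 broker might look like this (the broker address and group id are placeholders):

```python
from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'api.version.request': False,
    'broker.version.fallback': '0.9.0.1'
})
```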

More info here:
https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility


Prerequisites
=============

 * Python >= 2.7 or Python 3.x
 * [librdkafka](https://github.com/edenhill/librdkafka) >= 0.11.5 (latest release is embedded in wheels)

librdkafka is embedded in the macOS and manylinux wheels. For other platforms, for SASL Kerberos/GSSAPI support, or
when a specific version of librdkafka is desired, follow these guidelines:

 * For **Debian/Ubuntu** based systems, add this APT repo and then do `sudo apt-get install librdkafka-dev python-dev`:
http://docs.confluent.io/current/installation.html#installation-apt

 * For **RedHat** and **RPM**-based distros, add this YUM repo and then do `sudo yum install librdkafka-devel python-devel`:
http://docs.confluent.io/current/installation.html#rpm-packages-via-yum

 * On **OS X**, use **homebrew** and do `brew install librdkafka`


Build
=====

    $ python setup.py build

If librdkafka is installed in a non-standard location, provide the include and library directories with:

    $ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python setup.py ...


Tests
=====

**Run unit-tests:**

To run the full test suite, execute:

    $ tox -r

**NOTE**: Requires `tox` (install with `pip install tox`), several supported versions of Python on your path, and `librdkafka` [installed](tools/bootstrap-librdkafka.sh) into `tmp-build`.


**Integration tests:**

See [tests/README.md](tests/README.md) for instructions on how to run integration tests.


Generate Documentation
======================
Install sphinx and sphinx_rtd_theme packages:

    $ pip install sphinx sphinx_rtd_theme

Build HTML docs:

    $ make docs

or:

    $ python setup.py build_sphinx

Documentation will be generated in `docs/_build/`.