Confluent's Python Client for Apache Kafka<sup>TM</sup>
=======================================================

**confluent-kafka-python** is Confluent's Python client for [Apache Kafka](http://kafka.apache.org/) and the
[Confluent Platform](https://www.confluent.io/product/compare/).

Features:

- **High performance** - confluent-kafka-python is a lightweight wrapper around
[librdkafka](https://github.com/edenhill/librdkafka), a finely tuned C
client.

- **Reliability** - There are a lot of details to get right when writing an Apache Kafka
client. We get them right in one place (librdkafka) and leverage this work
across all of our clients (also [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go)
and [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet)).

- **Supported** - Commercial support is offered by
[Confluent](https://confluent.io/).

- **Future proof** - Confluent, founded by the
creators of Kafka, is building a [streaming platform](https://www.confluent.io/product/compare/)
with Apache Kafka at its core. It is a high priority for us that client features keep
pace with core Apache Kafka and components of the [Confluent Platform](https://www.confluent.io/product/compare/).

The Python bindings provide a high-level Producer and Consumer with support
for the balanced consumer groups of Apache Kafka >= 0.9.

See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.

**License**: [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0)


Usage
=====

**Producer:**

```python
from confluent_kafka import Producer


p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# Placeholder input: replace with your own iterable of strings.
some_data_source = ['message 1', 'message 2', 'message 3']

for data in some_data_source:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message. The delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
```


**High-level Consumer:**

```python
from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    pass
finally:
    # Leave the consumer group cleanly (and commit final offsets
    # when auto-commit is enabled).
    c.close()
```

**AvroProducer:**

```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'schema.registry.url': 'http://schema_registry_host:port'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()
```

**AvroConsumer:**

```python
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


c = AvroConsumer({
    'bootstrap.servers': 'mybroker,mybroker2',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'})

c.subscribe(['my_topic'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        # msg is not (re)assigned when poll() raises, so report only the error.
        print("Message deserialization failed: {}".format(e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()
```

See the [examples](examples) directory for more examples, including [how to configure](examples/confluent_cloud.py) the Python client for use with
[Confluent Cloud](https://www.confluent.io/confluent-cloud/).


Install
=======

**NOTE:** The pre-built Linux wheels do NOT contain SASL Kerberos/GSSAPI support.
If you need SASL Kerberos/GSSAPI support, you must install librdkafka and
its dependencies using the repositories below and then build
confluent-kafka using the command in the "Install from
source from PyPI" section below.

**Install self-contained binary wheels for OSX and Linux from PyPI:**

    $ pip install confluent-kafka

**Install AvroProducer and AvroConsumer:**

    $ pip install "confluent-kafka[avro]"

**Install from source from PyPI** *(requires librdkafka + dependencies to be installed separately)*:

    $ pip install --no-binary :all: confluent-kafka


For source install, see *Prerequisites* below.
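
After installing, you may want to confirm which client and librdkafka versions are actually in use, for example to check that a source install linked against the librdkafka you installed yourself. The sketch below relies only on `confluent_kafka.version()` and `confluent_kafka.libversion()`, which each return a `(version_string, version_int)` tuple. The `parse_version` and `meets_minimum` helpers and the `MIN_LIBRDKAFKA` constant are hypothetical, and the `0.11.5` minimum is copied from the *Prerequisites* section.

```python
# Hypothetical helpers for checking the installed client and librdkafka
# versions; only version() and libversion() come from confluent_kafka.
MIN_LIBRDKAFKA = (0, 11, 5)  # assumed minimum, taken from Prerequisites


def parse_version(version_str):
    """Turn a version string such as '1.9.2' or '1.0.0-RC5' into a tuple of ints."""
    core = version_str.split('-')[0]  # drop pre-release suffixes like '-RC5'
    return tuple(int(part) for part in core.split('.'))


def meets_minimum(version_str, minimum=MIN_LIBRDKAFKA):
    """Return True if version_str is at least `minimum` (tuple comparison)."""
    return parse_version(version_str) >= minimum


if __name__ == '__main__':
    try:
        import confluent_kafka
    except ImportError:
        print('confluent-kafka is not installed')
    else:
        client_version, _ = confluent_kafka.version()
        lib_version, _ = confluent_kafka.libversion()
        print('confluent-kafka-python {}, librdkafka {}'.format(
            client_version, lib_version))
        if not meets_minimum(lib_version):
            print('librdkafka {} is older than {}'.format(
                lib_version, '.'.join(str(n) for n in MIN_LIBRDKAFKA)))
```

The versions are compared as integer tuples so that, for example, `0.9.5` correctly ranks below `0.11.5`, which a plain string comparison would get wrong.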


Broker Compatibility
====================
The Python client (as well as the underlying C library librdkafka) supports
all broker versions >= 0.8.
However, due to the nature of the Kafka protocol in broker versions 0.8 and 0.9, it
is not safe for a client to assume which protocol version the broker actually
supports, so you need to tell the Python client which protocol
version it may use. This is done through two configuration settings:

 * `broker.version.fallback=YOUR_BROKER_VERSION` (default 0.9.0.1)
 * `api.version.request=true|false` (default true)

When using a Kafka 0.10 broker or later, you don't need to do anything
(`api.version.request=true` is the default).
If you use Kafka broker 0.9 or 0.8, you must set
`api.version.request=false` and set
`broker.version.fallback` to your broker version,
e.g., `broker.version.fallback=0.9.0.1`.

More info here:
https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility


Prerequisites
=============

 * Python >= 2.7 or Python 3.x
 * [librdkafka](https://github.com/edenhill/librdkafka) >= 0.11.5 (latest release is embedded in wheels)

librdkafka is embedded in the OSX and manylinux wheels. For other platforms, for SASL Kerberos/GSSAPI support, or
when a specific version of librdkafka is desired, follow these guidelines:

 * For **Debian/Ubuntu** based systems, add this APT repo and then do `sudo apt-get install librdkafka-dev python-dev`:
http://docs.confluent.io/current/installation.html#installation-apt

 * For **RedHat** and **RPM**-based distros, add this YUM repo and then do `sudo yum install librdkafka-devel python-devel`:
http://docs.confluent.io/current/installation.html#rpm-packages-via-yum

 * On **OSX**, use **homebrew** and do `brew install librdkafka`


Build
=====

    $ python setup.py build

If librdkafka is installed in a non-standard location, provide the include and library directories with:

    $ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python setup.py ...


Tests
=====


**Run unit-tests:**

To run the full test suite, execute:

    $ tox -r

**NOTE**: Requires `tox` (please install with `pip install tox`), several supported versions of Python on your path, and `librdkafka` [installed](tools/bootstrap-librdkafka.sh) into `tmp-build`.


**Integration tests:**

See [tests/README.md](tests/README.md) for instructions on how to run integration tests.



Generate Documentation
======================
Install the sphinx and sphinx_rtd_theme packages:

    $ pip install sphinx sphinx_rtd_theme

Build HTML docs:

    $ make docs

or:

    $ python setup.py build_sphinx

Documentation will be generated in `docs/_build/`.