1 /**
2 * Copyright 2016 Confluent Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "confluent_kafka.h"
18
19 #include <stdarg.h>
20
21
22 /**
23 * @brief KNOWN ISSUES
24 *
25 * - Partitioners will cause a dead-lock with librdkafka, because:
26 * GIL + topic lock in topic_new is different lock order than
27 * topic lock in msg_partitioner + GIL.
28 * This needs to be sorted out in librdkafka, preferably making the
29 * partitioner run without any locks taken.
30 * Until this is fixed the partitioner is ignored and librdkafka's
31 * default will be used.
32 * - KafkaError type .tp_doc allocation is lost on exit.
33 *
34 */
35
36
/**
 * @brief Exception class object handed to PyErr_SetObject() when a
 *        KafkaError is raised as a Python exception (see cfl_PyErr_Fatal()).
 *
 * NOTE(review): not assigned in this part of the file; presumably created
 * during module initialization - confirm.
 */
PyObject *KafkaException;
38
39
40 /****************************************************************************
41 *
42 *
43 * KafkaError
44 *
45 *
46 * FIXME: Pre-create simple instances for each error code, only instantiate
47 * a new object if a rich error string is provided.
48 *
49 ****************************************************************************/
/**
 * @brief Instance layout of the KafkaError Python object.
 *
 * The leading fields are the exception object header (taken from
 * PyException_HEAD on Python 3, spelled out by hand otherwise); the
 * fields after the #endif are specific to KafkaError.
 */
typedef struct {
#ifdef PY3
        PyException_HEAD
#else
        PyObject_HEAD
        /* Standard fields of PyBaseExceptionObject which we inherit from. */
        PyObject *dict;
        PyObject *args;
        PyObject *message;
#endif

        rd_kafka_resp_err_t code;  /* Error code */
        char *str;   /* Human readable representation of error, if one
                      * was provided by librdkafka.
                      * Else falls back on err2str().
                      * Heap copy made by KafkaError_init() with strdup(),
                      * released by KafkaError_clear(). */
        int   fatal; /**< Set to true if a fatal error. */
} KafkaError;
67
68
69 static void cfl_PyErr_Fatal (rd_kafka_resp_err_t err, const char *reason);
70
/**
 * @brief KafkaError.code(): return the error code as a Python integer.
 */
static PyObject *KafkaError_code (KafkaError *self, PyObject *ignore) {
        const rd_kafka_resp_err_t code = self->code;

        return cfl_PyInt_FromInt(code);
}
74
/**
 * @brief KafkaError.str(): return the human readable error string.
 *
 * Uses the string stored on the object when there is one, otherwise the
 * generic description from rd_kafka_err2str() for the error code.
 */
static PyObject *KafkaError_str (KafkaError *self, PyObject *ignore) {
        const char *text = self->str;

        if (!text)
                text = rd_kafka_err2str(self->code);

        return cfl_PyUnistr(_FromString(text));
}
81
/**
 * @brief KafkaError.name(): return the enum name of the error code as
 *        reported by rd_kafka_err2name().
 */
static PyObject *KafkaError_name (KafkaError *self, PyObject *ignore) {
        /* FIXME: Pre-create name objects */
        const char *enum_name = rd_kafka_err2name(self->code);

        return cfl_PyUnistr(_FromString(enum_name));
}
86
/**
 * @brief KafkaError.fatal(): return True if the fatal flag is set on this
 *        error, else False.
 */
static PyObject *KafkaError_fatal (KafkaError *self, PyObject *ignore) {
        if (self->fatal)
                Py_RETURN_TRUE;

        Py_RETURN_FALSE;
}
92
93
/**
 * @brief KafkaError._test_raise_fatal(): raise a fatal exception with a
 *        fixed error code and message, for testing purposes.
 *
 * @returns NULL always, after the exception has been raised through
 *          cfl_PyErr_Fatal().
 */
static PyObject *KafkaError_test_raise_fatal (KafkaError *null,
                                              PyObject *ignore) {
        static const char reason[] =
                "This is a fatal exception for testing purposes";

        cfl_PyErr_Fatal(RD_KAFKA_RESP_ERR__INVALID_ARG, reason);

        return NULL;
}
100
101
102 static PyMethodDef KafkaError_methods[] = {
103 { "code", (PyCFunction)KafkaError_code, METH_NOARGS,
104 " Returns the error/event code for comparison to"
105 "KafkaError.<ERR_CONSTANTS>.\n"
106 "\n"
107 " :returns: error/event code\n"
108 " :rtype: int\n"
109 "\n"
110 },
111 { "str", (PyCFunction)KafkaError_str, METH_NOARGS,
112 " Returns the human-readable error/event string.\n"
113 "\n"
114 " :returns: error/event message string\n"
115 " :rtype: str\n"
116 "\n"
117 },
118 { "name", (PyCFunction)KafkaError_name, METH_NOARGS,
119 " Returns the enum name for error/event.\n"
120 "\n"
121 " :returns: error/event enum name string\n"
122 " :rtype: str\n"
123 "\n"
124 },
125 { "fatal", (PyCFunction)KafkaError_fatal, METH_NOARGS,
126 " :returns: True if this a fatal error, else False.\n"
127 " :rtype: bool\n"
128 "\n"
129 },
130 { "_test_raise_fatal", (PyCFunction)KafkaError_test_raise_fatal,
131 METH_NOARGS|METH_STATIC
132 },
133
134 { NULL }
135 };
136
137
/**
 * @brief Release the heap-allocated error string of a KafkaError.
 *
 * The function is installed as the type's tp_clear slot through an
 * (inquiry) cast and is also called from KafkaError_dealloc(). It has the
 * `inquiry` signature (returns int) so that a call through the slot
 * pointer uses a compatible function type.
 *
 * @returns 0 always.
 */
static int KafkaError_clear (PyObject *self0) {
        KafkaError *self = (KafkaError *)self0;

        /* free(NULL) is a no-op, so no guard is needed. */
        free(self->str);
        self->str = NULL;

        return 0;
}
145
/**
 * @brief tp_dealloc handler for KafkaError.
 *
 * The object is removed from the garbage collector's tracking before any
 * of its fields are released, so the collector can never traverse a
 * partially torn-down object. Its memory is then returned through the
 * type's tp_free.
 */
static void KafkaError_dealloc (PyObject *self0) {
        PyObject_GC_UnTrack(self0);
        KafkaError_clear(self0);
        Py_TYPE(self0)->tp_free(self0);
}
152
153
154
/**
 * @brief tp_traverse handler for KafkaError.
 *
 * Visits nothing: the KafkaError-specific fields (code, str, fatal) hold
 * no Python object references.
 *
 * @returns 0 always.
 */
static int KafkaError_traverse (KafkaError *self,
                                visitproc visit, void *arg) {
        (void)self;
        (void)visit;
        (void)arg;

        return 0;
}
159
KafkaError_str0(KafkaError * self)160 static PyObject *KafkaError_str0 (KafkaError *self) {
161 return cfl_PyUnistr(_FromFormat("KafkaError{%scode=%s,val=%d,str=\"%s\"}",
162 self->fatal?"FATAL,":"",
163 rd_kafka_err2name(self->code),
164 self->code,
165 self->str ? self->str :
166 rd_kafka_err2str(self->code)));
167 }
168
/**
 * @brief tp_hash handler: hash a KafkaError by its error code.
 *
 * A tp_hash result of -1 is reserved by CPython to signal failure, so an
 * error code of -1 is mapped to -2 instead of being returned as-is.
 *
 * @returns the error code, except that -1 is reported as -2.
 */
static long KafkaError_hash (KafkaError *self) {
        long h = (long)self->code;

        if (h == -1)
                h = -2;

        return h;
}
172
173 static PyTypeObject KafkaErrorType;
174
175
176
/**
 * @brief tp_richcompare handler: compare a KafkaError by error code.
 *
 * \p o2 may be another KafkaError (exact type match) or an object that
 * converts to an integer error code.
 *
 * If the integer conversion fails with an exception, the exception is
 * cleared and NotImplemented is returned so the interpreter can apply its
 * default handling. This avoids returning a result while an exception is
 * still pending.
 * NOTE(review): assumes cfl_PyInt_AsInt() follows the PyLong_AsLong()
 * convention of returning -1 with an exception set on failure - confirm.
 *
 * @returns a new reference to True or False, or to NotImplemented when
 *          \p o2 is not comparable.
 */
static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2,
                                         int op) {
        int code2;
        int r;
        PyObject *result;

        if (Py_TYPE(o2) == &KafkaErrorType) {
                code2 = ((KafkaError *)o2)->code;
        } else {
                code2 = cfl_PyInt_AsInt(o2);
                if (code2 == -1 && PyErr_Occurred()) {
                        /* Not an integer-like object: not comparable. */
                        PyErr_Clear();
                        Py_INCREF(Py_NotImplemented);
                        return Py_NotImplemented;
                }
        }

        switch (op)
        {
        case Py_LT:
                r = self->code < code2;
                break;
        case Py_LE:
                r = self->code <= code2;
                break;
        case Py_EQ:
                r = self->code == code2;
                break;
        case Py_NE:
                r = self->code != code2;
                break;
        case Py_GT:
                r = self->code > code2;
                break;
        case Py_GE:
                r = self->code >= code2;
                break;
        default:
                r = 0;
                break;
        }

        result = r ? Py_True : Py_False;
        Py_INCREF(result);
        return result;
}
217
218
/**
 * @brief Type object for cimpl.KafkaError.
 *
 * Slots are given positionally; slots after tp_alloc are not listed and
 * are therefore zero-initialized. The type is GC-enabled, may be
 * subclassed and carries the base-exception-subclass flag.
 *
 * NOTE(review): tp_base is 0 here; presumably it is set to an exception
 * class during module initialization - confirm.
 */
static PyTypeObject KafkaErrorType = {
        PyVarObject_HEAD_INIT(NULL, 0)
        "cimpl.KafkaError",      /*tp_name*/
        sizeof(KafkaError),    /*tp_basicsize*/
        0,                         /*tp_itemsize*/
        (destructor)KafkaError_dealloc, /*tp_dealloc*/
        0,                         /*tp_print*/
        0,                         /*tp_getattr*/
        0,                         /*tp_setattr*/
        0,                         /*tp_compare*/
        (reprfunc)KafkaError_str0, /*tp_repr*/
        0,                         /*tp_as_number*/
        0,                         /*tp_as_sequence*/
        0,                         /*tp_as_mapping*/
        (hashfunc)KafkaError_hash, /*tp_hash */
        0,                         /*tp_call*/
        0,                         /*tp_str*/
        PyObject_GenericGetAttr,   /*tp_getattro*/
        0,                         /*tp_setattro*/
        0,                         /*tp_as_buffer*/
        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
        Py_TPFLAGS_BASE_EXC_SUBCLASS | Py_TPFLAGS_HAVE_GC, /*tp_flags*/
        "Kafka error and event object\n"
        "\n"
        " The KafkaError class serves multiple purposes:\n"
        "\n"
        " - Propagation of errors\n"
        " - Propagation of events\n"
        " - Exceptions\n"
        "\n"
        " This class is not user-instantiable.\n"
        "\n", /*tp_doc*/
        (traverseproc)KafkaError_traverse, /* tp_traverse */
        (inquiry)KafkaError_clear, /* tp_clear */
        (richcmpfunc)KafkaError_richcompare, /* tp_richcompare */
        0,                         /* tp_weaklistoffset */
        0,                         /* tp_iter */
        0,                         /* tp_iternext */
        KafkaError_methods,    /* tp_methods */
        0,                         /* tp_members */
        0,                         /* tp_getset */
        0,                         /* tp_base */
        0,                         /* tp_dict */
        0,                         /* tp_descr_get */
        0,                         /* tp_descr_set */
        0,                         /* tp_dictoffset */
        0,                         /* tp_init */
        0                          /* tp_alloc */
};
268
269
270 /**
271 * @brief Initialize a KafkaError object.
272 */
KafkaError_init(KafkaError * self,rd_kafka_resp_err_t code,const char * str)273 static void KafkaError_init (KafkaError *self,
274 rd_kafka_resp_err_t code, const char *str) {
275 self->code = code;
276 self->fatal = 0;
277 if (str)
278 self->str = strdup(str);
279 else
280 self->str = NULL;
281 }
282
283 /**
284 * @brief Internal factory to create KafkaError object.
285 */
KafkaError_new0(rd_kafka_resp_err_t err,const char * fmt,...)286 PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...) {
287
288 KafkaError *self;
289 va_list ap;
290 char buf[512];
291
292 self = (KafkaError *)KafkaErrorType.
293 tp_alloc(&KafkaErrorType, 0);
294 if (!self)
295 return NULL;
296
297 if (fmt) {
298 va_start(ap, fmt);
299 vsnprintf(buf, sizeof(buf), fmt, ap);
300 va_end(ap);
301 }
302
303 KafkaError_init(self, err, fmt ? buf : rd_kafka_err2str(err));
304
305 return (PyObject *)self;
306 }
307
308 /**
309 * @brief Internal factory to create KafkaError object.
310 * @returns a new KafkaError object if \p err != 0, else a None object.
311 */
KafkaError_new_or_None(rd_kafka_resp_err_t err,const char * str)312 PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str) {
313 if (!err)
314 Py_RETURN_NONE;
315 if (str)
316 return KafkaError_new0(err, "%s", str);
317 else
318 return KafkaError_new0(err, NULL);
319 }
320
321
322 /**
323 * @brief Raise exception from fatal error.
324 */
cfl_PyErr_Fatal(rd_kafka_resp_err_t err,const char * reason)325 static void cfl_PyErr_Fatal (rd_kafka_resp_err_t err, const char *reason) {
326 PyObject *eo = KafkaError_new0(err, "%s", reason);
327 ((KafkaError *)eo)->fatal = 1;
328 PyErr_SetObject(KafkaException, eo);
329 }
330
331
332
333
334
335 /****************************************************************************
336 *
337 *
338 * Message
339 *
340 *
341 *
342 *
343 ****************************************************************************/
344
345
346 /**
347 * @returns a Message's error object, if any, else None.
348 * @remark The error object refcount is increased by this function.
349 */
Message_error(Message * self,PyObject * ignore)350 PyObject *Message_error (Message *self, PyObject *ignore) {
351 if (self->error) {
352 Py_INCREF(self->error);
353 return self->error;
354 } else
355 Py_RETURN_NONE;
356 }
357
/**
 * @brief Message.value(): return a new reference to the stored value
 *        (payload) object, or None when no value is stored.
 */
static PyObject *Message_value (Message *self, PyObject *ignore) {
        PyObject *val = self->value;

        if (!val)
                Py_RETURN_NONE;

        Py_INCREF(val);
        return val;
}
365
366
/**
 * @brief Message.key(): return a new reference to the stored key object,
 *        or None when no key is stored.
 */
static PyObject *Message_key (Message *self, PyObject *ignore) {
        PyObject *key = self->key;

        if (!key)
                Py_RETURN_NONE;

        Py_INCREF(key);
        return key;
}
374
/**
 * @brief Message.topic(): return a new reference to the stored topic name
 *        object, or None when no topic is stored.
 */
static PyObject *Message_topic (Message *self, PyObject *ignore) {
        PyObject *topic = self->topic;

        if (!topic)
                Py_RETURN_NONE;

        Py_INCREF(topic);
        return topic;
}
382
/**
 * @brief Message.partition(): return the partition number as a Python
 *        integer, or None when the partition is RD_KAFKA_PARTITION_UA.
 */
static PyObject *Message_partition (Message *self, PyObject *ignore) {
        if (self->partition == RD_KAFKA_PARTITION_UA)
                Py_RETURN_NONE;

        return cfl_PyInt_FromInt(self->partition);
}
389
390
/**
 * @brief Message.offset(): return the message offset as a Python integer,
 *        or None when the stored offset is negative.
 */
static PyObject *Message_offset (Message *self, PyObject *ignore) {
        if (self->offset < 0)
                Py_RETURN_NONE;

        return PyLong_FromLongLong(self->offset);
}
397
398
/**
 * @brief Message.timestamp(): return the tuple
 *        (timestamp type, timestamp) built from the stored fields.
 */
static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
        PyObject *pair;

        pair = Py_BuildValue("iL", self->tstype, self->timestamp);

        return pair;
}
404
/**
 * @brief Message.headers(): return the message headers.
 *
 * The Python representation is created lazily: if only the C headers are
 * present they are converted with c_headers_to_py() on first access, the
 * result is cached on the message and the C headers are destroyed.
 *
 * If the conversion fails, NULL is returned and the C headers are kept
 * untouched, so no NULL object is cached and nothing is lost.
 * NOTE(review): assumes c_headers_to_py() sets an exception when it
 * returns NULL - confirm.
 *
 * @returns a new reference to the headers object, None when the message
 *          has no headers (or header support is not compiled in), or
 *          NULL on conversion failure.
 */
static PyObject *Message_headers (Message *self, PyObject *ignore) {
#ifdef RD_KAFKA_V_HEADERS
        if (!self->headers && self->c_headers) {
                PyObject *converted = c_headers_to_py(self->c_headers);

                if (!converted)
                        return NULL;

                self->headers = converted;
                rd_kafka_headers_destroy(self->c_headers);
                self->c_headers = NULL;
        }

        if (self->headers) {
                Py_INCREF(self->headers);
                return self->headers;
        }
#endif
        Py_RETURN_NONE;
}
423
/**
 * @brief Message.set_headers(): replace the stored headers object.
 *
 * The new object is referenced and stored before the previous one is
 * released, so the field never points at an object whose reference has
 * already been dropped (releasing an object may run arbitrary code).
 *
 * @returns None.
 */
static PyObject *Message_set_headers (Message *self, PyObject *new_headers) {
        PyObject *old = self->headers;

        Py_INCREF(new_headers);
        self->headers = new_headers;
        Py_XDECREF(old);

        Py_RETURN_NONE;
}
432
/**
 * @brief Message.set_value(): replace the stored value (payload) object.
 *
 * The new object is referenced and stored before the previous one is
 * released, so the field never points at an object whose reference has
 * already been dropped (releasing an object may run arbitrary code).
 *
 * @returns None.
 */
static PyObject *Message_set_value (Message *self, PyObject *new_val) {
        PyObject *old = self->value;

        Py_INCREF(new_val);
        self->value = new_val;
        Py_XDECREF(old);

        Py_RETURN_NONE;
}
441
/**
 * @brief Message.set_key(): replace the stored key object.
 *
 * The new object is referenced and stored before the previous one is
 * released, so the field never points at an object whose reference has
 * already been dropped (releasing an object may run arbitrary code).
 *
 * @returns None.
 */
static PyObject *Message_set_key (Message *self, PyObject *new_key) {
        PyObject *old = self->key;

        Py_INCREF(new_key);
        self->key = new_key;
        Py_XDECREF(old);

        Py_RETURN_NONE;
}
450
451 static PyMethodDef Message_methods[] = {
452 { "error", (PyCFunction)Message_error, METH_NOARGS,
453 " The message object is also used to propagate errors and events, "
454 "an application must check error() to determine if the Message "
455 "is a proper message (error() returns None) or an error or event "
456 "(error() returns a KafkaError object)\n"
457 "\n"
458 " :rtype: None or :py:class:`KafkaError`\n"
459 "\n"
460 },
461
462 { "value", (PyCFunction)Message_value, METH_NOARGS,
463 " :returns: message value (payload) or None if not available.\n"
464 " :rtype: str|bytes or None\n"
465 "\n"
466 },
467 { "key", (PyCFunction)Message_key, METH_NOARGS,
468 " :returns: message key or None if not available.\n"
469 " :rtype: str|bytes or None\n"
470 "\n"
471 },
472 { "topic", (PyCFunction)Message_topic, METH_NOARGS,
473 " :returns: topic name or None if not available.\n"
474 " :rtype: str or None\n"
475 "\n"
476 },
477 { "partition", (PyCFunction)Message_partition, METH_NOARGS,
478 " :returns: partition number or None if not available.\n"
479 " :rtype: int or None\n"
480 "\n"
481 },
482 { "offset", (PyCFunction)Message_offset, METH_NOARGS,
483 " :returns: message offset or None if not available.\n"
484 " :rtype: int or None\n"
485 "\n"
486 },
487 { "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
488 "Retrieve timestamp type and timestamp from message.\n"
489 "The timestamp type is one of:\n"
490 " * :py:const:`TIMESTAMP_NOT_AVAILABLE`"
491 " - Timestamps not supported by broker\n"
492 " * :py:const:`TIMESTAMP_CREATE_TIME` "
493 " - Message creation time (or source / producer time)\n"
494 " * :py:const:`TIMESTAMP_LOG_APPEND_TIME` "
495 " - Broker receive time\n"
496 "\n"
497 "The returned timestamp should be ignored if the timestamp type is "
498 ":py:const:`TIMESTAMP_NOT_AVAILABLE`.\n"
499 "\n"
500 " The timestamp is the number of milliseconds since the epoch (UTC).\n"
501 "\n"
502 " Timestamps require broker version 0.10.0.0 or later and \n"
503 " ``{'api.version.request': True}`` configured on the client.\n"
504 "\n"
505 " :returns: tuple of message timestamp type, and timestamp.\n"
506 " :rtype: (int, int)\n"
507 "\n"
508 },
509 { "headers", (PyCFunction)Message_headers, METH_NOARGS,
510 " Retrieve the headers set on a message. Each header is a key value"
511 "pair. Please note that header keys are ordered and can repeat.\n"
512 "\n"
513 " :returns: list of two-tuples, one (key, value) pair for each header.\n"
514 " :rtype: [(str, bytes),...] or None.\n"
515 "\n"
516 },
517 { "set_headers", (PyCFunction)Message_set_headers, METH_O,
518 " Set the field 'Message.headers' with new value.\n"
519 "\n"
520 " :param object value: Message.headers.\n"
521 " :returns: None.\n"
522 " :rtype: None\n"
523 "\n"
524 },
525 { "set_value", (PyCFunction)Message_set_value, METH_O,
526 " Set the field 'Message.value' with new value.\n"
527 "\n"
528 " :param object value: Message.value.\n"
529 " :returns: None.\n"
530 " :rtype: None\n"
531 "\n"
532 },
533 { "set_key", (PyCFunction)Message_set_key, METH_O,
534 " Set the field 'Message.key' with new value.\n"
535 "\n"
536 " :param object value: Message.key.\n"
537 " :returns: None.\n"
538 " :rtype: None\n"
539 "\n"
540 },
541 { NULL }
542 };
543
/**
 * @brief tp_clear handler for Message: drop every owned reference.
 *
 * Py_CLEAR() sets each field to NULL before releasing the reference, so
 * code run as a side effect of the release never sees a field pointing at
 * an object that is being destroyed. The C headers, when present, are
 * destroyed as well.
 *
 * @returns 0 always.
 */
static int Message_clear (Message *self) {
        Py_CLEAR(self->topic);
        Py_CLEAR(self->value);
        Py_CLEAR(self->key);
        Py_CLEAR(self->error);
        Py_CLEAR(self->headers);
#ifdef RD_KAFKA_V_HEADERS
        if (self->c_headers) {
                rd_kafka_headers_destroy(self->c_headers);
                self->c_headers = NULL;
        }
#endif
        return 0;
}
573
574
/**
 * @brief tp_dealloc handler for Message.
 *
 * The object is removed from the garbage collector's tracking before its
 * references are released (the same order TopicPartition_dealloc() uses),
 * so the collector can never traverse a partially torn-down message.
 * Its memory is then returned through the type's tp_free.
 */
static void Message_dealloc (Message *self) {
        PyObject_GC_UnTrack(self);
        Message_clear(self);
        Py_TYPE(self)->tp_free((PyObject *)self);
}
580
581
/**
 * @brief tp_traverse handler for Message: report every Python object the
 *        message references to the garbage collector.
 *
 * Py_VISIT() skips NULL members itself and returns from this function
 * early if the visitor reports a non-zero result.
 *
 * @returns 0 when all members were visited.
 */
static int Message_traverse (Message *self,
                             visitproc visit, void *arg) {
        Py_VISIT(self->topic);
        Py_VISIT(self->value);
        Py_VISIT(self->key);
        Py_VISIT(self->error);
        Py_VISIT(self->headers);

        return 0;
}
596
Message__len__(Message * self)597 static Py_ssize_t Message__len__ (Message *self) {
598 return self->value ? PyObject_Length(self->value) : 0;
599 }
600
/**
 * @brief Sequence protocol slots for Message.
 *
 * Only sq_length is provided (len(msg) is served by Message__len__());
 * the remaining slots are zero-initialized.
 */
static PySequenceMethods Message_seq_methods = {
        (lenfunc)Message__len__ /* sq_length */
};
604
/**
 * @brief Type object for cimpl.Message.
 *
 * Slots are given positionally; slots after tp_alloc (including tp_new)
 * are not listed and are therefore zero-initialized. Instances are
 * created internally through Message_new0(). The type is GC-enabled and
 * may be subclassed.
 */
PyTypeObject MessageType = {
        PyVarObject_HEAD_INIT(NULL, 0)
        "cimpl.Message",         /*tp_name*/
        sizeof(Message),       /*tp_basicsize*/
        0,                         /*tp_itemsize*/
        (destructor)Message_dealloc, /*tp_dealloc*/
        0,                         /*tp_print*/
        0,                         /*tp_getattr*/
        0,                         /*tp_setattr*/
        0,                         /*tp_compare*/
        0,                         /*tp_repr*/
        0,                         /*tp_as_number*/
        &Message_seq_methods,  /*tp_as_sequence*/
        0,                         /*tp_as_mapping*/
        0,                         /*tp_hash */
        0,                         /*tp_call*/
        0,                         /*tp_str*/
        PyObject_GenericGetAttr,   /*tp_getattro*/
        0,                         /*tp_setattro*/
        0,                         /*tp_as_buffer*/
        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
        Py_TPFLAGS_HAVE_GC, /*tp_flags*/
        "The Message object represents either a single consumed or "
        "produced message, or an event (:py:func:`error()` is not None).\n"
        "\n"
        "An application must check with :py:func:`error()` to see if the "
        "object is a proper message (error() returns None) or an "
        "error/event.\n"
        "\n"
        "This class is not user-instantiable.\n"
        "\n"
        ".. py:function:: len()\n"
        "\n"
        " :returns: Message value (payload) size in bytes\n"
        " :rtype: int\n"
        "\n", /*tp_doc*/
        (traverseproc)Message_traverse, /* tp_traverse */
        (inquiry)Message_clear, /* tp_clear */
        0,                         /* tp_richcompare */
        0,                         /* tp_weaklistoffset */
        0,                         /* tp_iter */
        0,                         /* tp_iternext */
        Message_methods,       /* tp_methods */
        0,                         /* tp_members */
        0,                         /* tp_getset */
        0,                         /* tp_base */
        0,                         /* tp_dict */
        0,                         /* tp_descr_get */
        0,                         /* tp_descr_set */
        0,                         /* tp_dictoffset */
        0,                         /* tp_init */
        0                          /* tp_alloc */
};
658
659 /**
660 * @brief Internal factory to create Message object from message_t
661 */
Message_new0(const Handle * handle,const rd_kafka_message_t * rkm)662 PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm) {
663 Message *self;
664
665 self = (Message *)MessageType.tp_alloc(&MessageType, 0);
666 if (!self)
667 return NULL;
668
669 /* Only use message error string on Consumer, for Producers
670 * it will contain the original message payload. */
671 self->error = KafkaError_new_or_None(
672 rkm->err,
673 (rkm->err && handle->type != RD_KAFKA_PRODUCER) ?
674 rd_kafka_message_errstr(rkm) : NULL);
675
676 if (rkm->rkt)
677 self->topic = cfl_PyUnistr(
678 _FromString(rd_kafka_topic_name(rkm->rkt)));
679 if (rkm->payload)
680 self->value = cfl_PyBin(_FromStringAndSize(rkm->payload,
681 rkm->len));
682 if (rkm->key)
683 self->key = cfl_PyBin(
684 _FromStringAndSize(rkm->key, rkm->key_len));
685
686 self->partition = rkm->partition;
687 self->offset = rkm->offset;
688
689 self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);
690
691 return (PyObject *)self;
692 }
693
694
695
696
697 /****************************************************************************
698 *
699 *
700 * TopicPartition
701 *
702 *
703 *
704 *
705 ****************************************************************************/
/**
 * @brief tp_clear handler for TopicPartition: release the topic string
 *        and the error object.
 *
 * Py_CLEAR() sets the field to NULL before releasing the reference, so
 * code run as a side effect of the release never sees a stale pointer.
 *
 * @returns 0 always.
 */
static int TopicPartition_clear (TopicPartition *self) {
        /* free(NULL) is a no-op, so no guard is needed. */
        free(self->topic);
        self->topic = NULL;

        Py_CLEAR(self->error);

        return 0;
}
717
/**
 * @brief Set all fields of a TopicPartition object.
 *
 * A private copy of \p topic is stored, and the error field is set to a
 * KafkaError for \p err (or None when \p err is 0).
 *
 * The function may run more than once on the same object (for instance
 * when __init__ is invoked again), so the previously stored topic string
 * and error object are released after the new values are in place.
 * This relies on the fields being NULL on a freshly allocated object.
 */
static void TopicPartition_setup (TopicPartition *self, const char *topic,
                                  int partition, long long offset,
                                  rd_kafka_resp_err_t err) {
        char *old_topic = self->topic;
        PyObject *old_error = self->error;

        /* Copy before freeing in case \p topic aliases the old string. */
        self->topic = strdup(topic);
        self->partition = partition;
        self->offset = offset;
        self->error = KafkaError_new_or_None(err, NULL);

        free(old_topic);
        Py_XDECREF(old_error);
}
726
727
/**
 * @brief tp_dealloc handler for TopicPartition: stop GC tracking, release
 *        the owned fields and return the memory through tp_free.
 */
static void TopicPartition_dealloc (TopicPartition *self) {
        PyObject *obj = (PyObject *)self;

        PyObject_GC_UnTrack(self);
        TopicPartition_clear(self);
        Py_TYPE(self)->tp_free(obj);
}
735
736
/**
 * @brief tp_init handler: TopicPartition(topic, [partition], [offset]).
 *
 * \p partition defaults to RD_KAFKA_PARTITION_UA and \p offset to
 * RD_KAFKA_OFFSET_INVALID when not given. The error field is set up for
 * error code 0.
 *
 * @returns 0 on success, -1 if argument parsing failed.
 */
static int TopicPartition_init (PyObject *self, PyObject *args,
                                PyObject *kwargs) {
        static char *kws[] = { "topic", "partition", "offset", NULL };
        long long offset = RD_KAFKA_OFFSET_INVALID;
        int partition = RD_KAFKA_PARTITION_UA;
        const char *topic;
        int parsed;

        parsed = PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws,
                                             &topic, &partition, &offset);
        if (parsed) {
                TopicPartition_setup((TopicPartition *)self,
                                     topic, partition, offset, 0);
                return 0;
        }

        return -1;
}
756
757
/**
 * @brief tp_new handler: allocate an instance through the type's
 *        tp_alloc. The arguments are not examined here.
 *
 * @returns the result of tp_alloc.
 */
static PyObject *TopicPartition_new (PyTypeObject *type, PyObject *args,
                                     PyObject *kwargs) {
        return type->tp_alloc(type, 1);
}
763
764
765
/**
 * @brief tp_traverse handler for TopicPartition: the error object is the
 *        only Python reference reported to the garbage collector.
 *
 * Py_VISIT() skips a NULL member itself and returns early if the visitor
 * reports a non-zero result.
 */
static int TopicPartition_traverse (TopicPartition *self,
                                    visitproc visit, void *arg) {
        Py_VISIT(self->error);

        return 0;
}
772
773
/**
 * @brief Attributes of TopicPartition exposed directly from the C struct.
 *
 * topic and error are read-only from Python; partition and offset are
 * writable.
 */
static PyMemberDef TopicPartition_members[] = {
        { "topic", T_STRING, offsetof(TopicPartition, topic), READONLY,
          ":attribute topic: Topic name (string)" },
        { "partition", T_INT, offsetof(TopicPartition, partition), 0,
          ":attribute partition: Partition number (int)" },
        { "offset", T_LONGLONG, offsetof(TopicPartition, offset), 0,
          ":attribute offset: Offset (long)\n"
          "\n"
          "Either an absolute offset (>=0) or a logical offset: "
          " :py:const:`OFFSET_BEGINNING`,"
          " :py:const:`OFFSET_END`,"
          " :py:const:`OFFSET_STORED`,"
          " :py:const:`OFFSET_INVALID`\n"
        },
        { "error", T_OBJECT, offsetof(TopicPartition, error), READONLY,
          ":attribute error: Indicates an error (with :py:class:`KafkaError`) unless None." },
        { NULL }
};
792
793
TopicPartition_str0(TopicPartition * self)794 static PyObject *TopicPartition_str0 (TopicPartition *self) {
795 PyObject *errstr = NULL;
796 PyObject *errstr8 = NULL;
797 const char *c_errstr = NULL;
798 PyObject *ret;
799 char offset_str[40];
800
801 snprintf(offset_str, sizeof(offset_str), "%"CFL_PRId64"", self->offset);
802
803 if (self->error != Py_None) {
804 errstr = cfl_PyObject_Unistr(self->error);
805 c_errstr = cfl_PyUnistr_AsUTF8(errstr, &errstr8);
806 }
807
808 ret = cfl_PyUnistr(
809 _FromFormat("TopicPartition{topic=%s,partition=%"CFL_PRId32
810 ",offset=%s,error=%s}",
811 self->topic, self->partition,
812 offset_str,
813 c_errstr ? c_errstr : "None"));
814 Py_XDECREF(errstr8);
815 Py_XDECREF(errstr);
816 return ret;
817 }
818
819
820 static PyObject *
TopicPartition_richcompare(TopicPartition * self,PyObject * o2,int op)821 TopicPartition_richcompare (TopicPartition *self, PyObject *o2,
822 int op) {
823 TopicPartition *a = self, *b;
824 int tr, pr;
825 int r;
826 PyObject *result;
827
828 if (Py_TYPE(o2) != Py_TYPE(self)) {
829 PyErr_SetNone(PyExc_NotImplementedError);
830 return NULL;
831 }
832
833 b = (TopicPartition *)o2;
834
835 tr = strcmp(a->topic, b->topic);
836 pr = a->partition - b->partition;
837 switch (op)
838 {
839 case Py_LT:
840 r = tr < 0 || (tr == 0 && pr < 0);
841 break;
842 case Py_LE:
843 r = tr < 0 || (tr == 0 && pr <= 0);
844 break;
845 case Py_EQ:
846 r = (tr == 0 && pr == 0);
847 break;
848 case Py_NE:
849 r = (tr != 0 || pr != 0);
850 break;
851 case Py_GT:
852 r = tr > 0 || (tr == 0 && pr > 0);
853 break;
854 case Py_GE:
855 r = tr > 0 || (tr == 0 && pr >= 0);
856 break;
857 default:
858 r = 0;
859 break;
860 }
861
862 result = r ? Py_True : Py_False;
863 Py_INCREF(result);
864 return result;
865 }
866
867
/**
 * @brief tp_hash handler: hash of the topic name XORed with the
 *        partition number.
 *
 * A NULL topic (object never set up) is hashed as the empty string.
 * If the temporary string object cannot be created, -1 is returned to
 * signal the failure. A computed value of -1 is mapped to -2, because
 * CPython reserves -1 as the tp_hash error return.
 *
 * @returns the hash value, or -1 on failure.
 */
static long TopicPartition_hash (TopicPartition *self) {
        PyObject *topic;
        long r;

        topic = cfl_PyUnistr(_FromString(self->topic ? self->topic : ""));
        if (!topic)
                return -1;

        r = PyObject_Hash(topic) ^ self->partition;
        Py_DECREF(topic);

        if (r == -1)
                r = -2;

        return r;
}
874
875
/**
 * @brief Type object for cimpl.TopicPartition.
 *
 * Slots are given positionally up to and including tp_new. Unlike the
 * other types in this file it is user-instantiable: tp_new allocates the
 * object and tp_init parses (topic, [partition], [offset]). The type is
 * GC-enabled and may be subclassed.
 */
PyTypeObject TopicPartitionType = {
        PyVarObject_HEAD_INIT(NULL, 0)
        "cimpl.TopicPartition",         /*tp_name*/
        sizeof(TopicPartition),       /*tp_basicsize*/
        0,                         /*tp_itemsize*/
        (destructor)TopicPartition_dealloc, /*tp_dealloc*/
        0,                         /*tp_print*/
        0,                         /*tp_getattr*/
        0,                         /*tp_setattr*/
        0,                         /*tp_compare*/
        (reprfunc)TopicPartition_str0, /*tp_repr*/
        0,                         /*tp_as_number*/
        0,                         /*tp_as_sequence*/
        0,                         /*tp_as_mapping*/
        (hashfunc)TopicPartition_hash, /*tp_hash */
        0,                         /*tp_call*/
        0,                         /*tp_str*/
        PyObject_GenericGetAttr,   /*tp_getattro*/
        0,                         /*tp_setattro*/
        0,                         /*tp_as_buffer*/
        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
        Py_TPFLAGS_HAVE_GC, /*tp_flags*/
        "TopicPartition is a generic type to hold a single partition and "
        "various information about it.\n"
        "\n"
        "It is typically used to provide a list of topics or partitions for "
        "various operations, such as :py:func:`Consumer.assign()`.\n"
        "\n"
        ".. py:function:: TopicPartition(topic, [partition], [offset])\n"
        "\n"
        " Instantiate a TopicPartition object.\n"
        "\n"
        " :param string topic: Topic name\n"
        " :param int partition: Partition id\n"
        " :param int offset: Initial partition offset\n"
        " :rtype: TopicPartition\n"
        "\n"
        "\n", /*tp_doc*/
        (traverseproc)TopicPartition_traverse, /* tp_traverse */
        (inquiry)TopicPartition_clear,       /* tp_clear */
        (richcmpfunc)TopicPartition_richcompare, /* tp_richcompare */
        0,                         /* tp_weaklistoffset */
        0,                         /* tp_iter */
        0,                         /* tp_iternext */
        0,                         /* tp_methods */
        TopicPartition_members,/* tp_members */
        0,                         /* tp_getset */
        0,                         /* tp_base */
        0,                         /* tp_dict */
        0,                         /* tp_descr_get */
        0,                         /* tp_descr_set */
        0,                         /* tp_dictoffset */
        TopicPartition_init,       /* tp_init */
        0,                         /* tp_alloc */
        TopicPartition_new         /* tp_new */
};
932
933 /**
934 * @brief Internal factory to create a TopicPartition object.
935 */
TopicPartition_new0(const char * topic,int partition,long long offset,rd_kafka_resp_err_t err)936 static PyObject *TopicPartition_new0 (const char *topic, int partition,
937 long long offset,
938 rd_kafka_resp_err_t err) {
939 TopicPartition *self;
940
941 self = (TopicPartition *)TopicPartitionType.tp_new(
942 &TopicPartitionType, NULL, NULL);
943
944 TopicPartition_setup(self, topic, partition, offset, err);
945
946 return (PyObject *)self;
947 }
948
949
950
951
952 /**
953 * @brief Convert C rd_kafka_topic_partition_list_t to Python list(TopicPartition).
954 *
955 * @returns The new Python list object.
956 */
c_parts_to_py(const rd_kafka_topic_partition_list_t * c_parts)957 PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
958 PyObject *parts;
959 size_t i;
960
961 parts = PyList_New(c_parts->cnt);
962
963 for (i = 0 ; i < (size_t)c_parts->cnt ; i++) {
964 const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
965 PyList_SET_ITEM(parts, i,
966 TopicPartition_new0(
967 rktpar->topic, rktpar->partition,
968 rktpar->offset, rktpar->err));
969 }
970
971 return parts;
972
973 }
974
975 /**
976 * @brief Convert Python list(TopicPartition) to C rd_kafka_topic_partition_list_t.
977 *
978 * @returns The new C list on success or NULL on error.
979 */
py_to_c_parts(PyObject * plist)980 rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
981 rd_kafka_topic_partition_list_t *c_parts;
982 size_t i;
983
984 if (!PyList_Check(plist)) {
985 PyErr_SetString(PyExc_TypeError,
986 "requires list of TopicPartition");
987 return NULL;
988 }
989
990 c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));
991
992 for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
993 TopicPartition *tp = (TopicPartition *)
994 PyList_GetItem(plist, i);
995
996 if (PyObject_Type((PyObject *)tp) !=
997 (PyObject *)&TopicPartitionType) {
998 PyErr_Format(PyExc_TypeError,
999 "expected %s",
1000 TopicPartitionType.tp_name);
1001 rd_kafka_topic_partition_list_destroy(c_parts);
1002 return NULL;
1003 }
1004
1005 rd_kafka_topic_partition_list_add(c_parts,
1006 tp->topic,
1007 tp->partition)->offset =
1008 tp->offset;
1009 }
1010
1011 return c_parts;
1012 }
1013
1014 #ifdef RD_KAFKA_V_HEADERS
1015
1016
1017 /**
1018 * @brief Translate Python \p key and \p value to C types and set on
1019 * provided \p rd_headers object.
1020 *
1021 * @returns 1 on success or 0 if an exception was raised.
1022 */
py_header_to_c(rd_kafka_headers_t * rd_headers,PyObject * key,PyObject * value)1023 static int py_header_to_c (rd_kafka_headers_t *rd_headers,
1024 PyObject *key, PyObject *value) {
1025 PyObject *ks, *ks8, *vo8 = NULL;
1026 const char *k;
1027 const void *v = NULL;
1028 Py_ssize_t vsize = 0;
1029 rd_kafka_resp_err_t err;
1030
1031 if (!(ks = cfl_PyObject_Unistr(key))) {
1032 PyErr_SetString(PyExc_TypeError,
1033 "expected header key to be unicode "
1034 "string");
1035 return 0;
1036 }
1037
1038 k = cfl_PyUnistr_AsUTF8(ks, &ks8);
1039
1040 if (value != Py_None) {
1041 if (cfl_PyBin(_Check(value))) {
1042 /* Proper binary */
1043 if (cfl_PyBin(_AsStringAndSize(value, (char **)&v,
1044 &vsize)) == -1) {
1045 Py_DECREF(ks);
1046 Py_XDECREF(ks8);
1047 return 0;
1048 }
1049 } else if (cfl_PyUnistr(_Check(value))) {
1050 /* Unicode string, translate to utf-8. */
1051 v = cfl_PyUnistr_AsUTF8(value, &vo8);
1052 if (!v) {
1053 Py_DECREF(ks);
1054 Py_XDECREF(ks8);
1055 return 0;
1056 }
1057 vsize = (Py_ssize_t)strlen(v);
1058 } else {
1059 PyErr_Format(PyExc_TypeError,
1060 "expected header value to be "
1061 "None, binary, or unicode string, not %s",
1062 ((PyTypeObject *)PyObject_Type(value))->
1063 tp_name);
1064 Py_DECREF(ks);
1065 Py_XDECREF(ks8);
1066 return 0;
1067 }
1068 }
1069
1070 if ((err = rd_kafka_header_add(rd_headers, k, -1, v, vsize))) {
1071 cfl_PyErr_Format(err,
1072 "Unable to add message header \"%s\": "
1073 "%s",
1074 k, rd_kafka_err2str(err));
1075 Py_DECREF(ks);
1076 Py_XDECREF(ks8);
1077 Py_XDECREF(vo8);
1078 return 0;
1079 }
1080
1081 Py_DECREF(ks);
1082 Py_XDECREF(ks8);
1083 Py_XDECREF(vo8);
1084
1085 return 1;
1086 }
1087
1088 /**
1089 * @brief Convert Python list of tuples to rd_kafka_headers_t
1090 *
1091 * Header names must be unicode strong.
1092 * Header values may be None, binary or unicode string, the latter is
1093 * automatically encoded as utf-8.
1094 */
py_headers_list_to_c(PyObject * hdrs)1095 static rd_kafka_headers_t *py_headers_list_to_c (PyObject *hdrs) {
1096 int i, len;
1097 rd_kafka_headers_t *rd_headers = NULL;
1098
1099 len = (int)PyList_Size(hdrs);
1100 rd_headers = rd_kafka_headers_new(len);
1101
1102 for (i = 0; i < len; i++) {
1103 PyObject *tuple = PyList_GET_ITEM(hdrs, i);
1104
1105 if (!PyTuple_Check(tuple) || PyTuple_Size(tuple) != 2) {
1106 rd_kafka_headers_destroy(rd_headers);
1107 PyErr_SetString(PyExc_TypeError,
1108 "Headers are expected to be a "
1109 "list of (key, value) tuples");
1110 return NULL;
1111 }
1112
1113 if (!py_header_to_c(rd_headers,
1114 PyTuple_GET_ITEM(tuple, 0),
1115 PyTuple_GET_ITEM(tuple, 1))) {
1116 rd_kafka_headers_destroy(rd_headers);
1117 return NULL;
1118 }
1119 }
1120 return rd_headers;
1121 }
1122
1123
1124 /**
1125 * @brief Convert Python dict to rd_kafka_headers_t
1126 */
py_headers_dict_to_c(PyObject * hdrs)1127 static rd_kafka_headers_t *py_headers_dict_to_c (PyObject *hdrs) {
1128 int len;
1129 Py_ssize_t pos = 0;
1130 rd_kafka_headers_t *rd_headers = NULL;
1131 PyObject *ko, *vo;
1132
1133 len = (int)PyDict_Size(hdrs);
1134 rd_headers = rd_kafka_headers_new(len);
1135
1136 while (PyDict_Next(hdrs, &pos, &ko, &vo)) {
1137
1138 if (!py_header_to_c(rd_headers, ko, vo)) {
1139 rd_kafka_headers_destroy(rd_headers);
1140 return NULL;
1141 }
1142 }
1143
1144 return rd_headers;
1145 }
1146
1147
1148 /**
1149 * @brief Convert Python list[(header_key, header_value),...]) to C rd_kafka_topic_partition_list_t.
1150 *
1151 * @returns The new Python list[(header_key, header_value),...] object.
1152 */
py_headers_to_c(PyObject * hdrs)1153 rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs) {
1154
1155 if (PyList_Check(hdrs)) {
1156 return py_headers_list_to_c(hdrs);
1157 } else if (PyDict_Check(hdrs)) {
1158 return py_headers_dict_to_c(hdrs);
1159 } else {
1160 PyErr_Format(PyExc_TypeError,
1161 "expected headers to be "
1162 "dict or list of (key, value) tuples, not %s",
1163 ((PyTypeObject *)PyObject_Type(hdrs))->tp_name);
1164 return NULL;
1165 }
1166 }
1167
1168
1169 /**
1170 * @brief Convert rd_kafka_headers_t to Python list[(header_key, header_value),...])
1171 *
1172 * @returns The new C headers on success or NULL on error.
1173 */
c_headers_to_py(rd_kafka_headers_t * headers)1174 PyObject *c_headers_to_py (rd_kafka_headers_t *headers) {
1175 size_t idx = 0;
1176 size_t header_size = 0;
1177 const char *header_key;
1178 const void *header_value;
1179 size_t header_value_size;
1180 PyObject *header_list;
1181
1182 header_size = rd_kafka_header_cnt(headers);
1183 header_list = PyList_New(header_size);
1184
1185 while (!rd_kafka_header_get_all(headers, idx++,
1186 &header_key, &header_value, &header_value_size)) {
1187 // Create one (key, value) tuple for each header
1188 PyObject *header_tuple = PyTuple_New(2);
1189 PyTuple_SetItem(header_tuple, 0,
1190 cfl_PyUnistr(_FromString(header_key))
1191 );
1192
1193 if (header_value) {
1194 PyTuple_SetItem(header_tuple, 1,
1195 cfl_PyBin(_FromStringAndSize(header_value, header_value_size))
1196 );
1197 } else {
1198 PyTuple_SetItem(header_tuple, 1, Py_None);
1199 }
1200 PyList_SET_ITEM(header_list, idx-1, header_tuple);
1201 }
1202
1203 return header_list;
1204 }
1205 #endif
1206
1207
1208 /****************************************************************************
1209 *
1210 *
1211 * Common callbacks
1212 *
1213 *
1214 *
1215 *
1216 ****************************************************************************/
/**
 * @brief librdkafka error callback: forwards the error to the Python
 *        error_cb, if one is set on the Handle.
 *
 * Fatal errors raise an exception instead of calling the Python callback.
 * If an exception is pending (fatal error or failed callback) the call
 * state is marked as crashed and librdkafka is asked to yield.
 */
static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
        Handle *h = opaque;
        CallState *cs = CallState_get(h);
        int failed = 0;

        if (err == RD_KAFKA_RESP_ERR__FATAL) {
                /* If the client raised a fatal error we'll raise an exception
                 * rather than calling the error callback. */
                char errstr[512];

                err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
                cfl_PyErr_Fatal(err, errstr);
                failed = 1;

        } else if (h->error_cb) {
                PyObject *kerr = KafkaError_new0(err, "%s", reason);
                PyObject *ret = PyObject_CallFunctionObjArgs(h->error_cb,
                                                             kerr, NULL);
                Py_DECREF(kerr);

                if (ret) {
                        Py_DECREF(ret);
                } else {
                        failed = 1;
                }
        }
        /* else: No callback defined */

        if (failed) {
                CallState_crash(cs);
                rd_kafka_yield(h->rk);
        }

        CallState_resume(cs);
}
1253
1254 /**
1255 * @brief librdkafka throttle callback triggered by poll() or flush(), triggers the
1256 * corresponding Python throttle_cb
1257 */
throttle_cb(rd_kafka_t * rk,const char * broker_name,int32_t broker_id,int throttle_time_ms,void * opaque)1258 static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker_id,
1259 int throttle_time_ms, void *opaque) {
1260 Handle *h = opaque;
1261 PyObject *ThrottleEvent_type, *throttle_event;
1262 PyObject *result, *args;
1263 CallState *cs;
1264
1265 cs = CallState_get(h);
1266 if (!h->throttle_cb) {
1267 /* No callback defined */
1268 goto done;
1269 }
1270
1271 ThrottleEvent_type = cfl_PyObject_lookup("confluent_kafka",
1272 "ThrottleEvent");
1273
1274 if (!ThrottleEvent_type) {
1275 /* ThrottleEvent class not found */
1276 goto err;
1277 }
1278
1279 args = Py_BuildValue("(sid)", broker_name, broker_id, (double)throttle_time_ms/1000);
1280 throttle_event = PyObject_Call(ThrottleEvent_type, args, NULL);
1281
1282 Py_DECREF(args);
1283 Py_DECREF(ThrottleEvent_type);
1284
1285 if (!throttle_event) {
1286 /* Failed to instantiate ThrottleEvent object */
1287 goto err;
1288 }
1289
1290 result = PyObject_CallFunctionObjArgs(h->throttle_cb, throttle_event, NULL);
1291
1292 Py_DECREF(throttle_event);
1293
1294 if (result) {
1295 /* throttle_cb executed successfully */
1296 Py_DECREF(result);
1297 goto done;
1298 }
1299
1300 /**
1301 * Stop callback dispatcher, return err to application
1302 * fall-through to unlock GIL
1303 */
1304 err:
1305 CallState_crash(cs);
1306 rd_kafka_yield(h->rk);
1307 done:
1308 CallState_resume(cs);
1309 }
1310
/**
 * @brief librdkafka statistics callback: passes the JSON statistics
 *        string to the Python stats_cb.
 *
 * @returns 0 always.
 */
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
        Handle *h = opaque;
        CallState *cs = CallState_get(h);

        if (json_len != 0) {
                PyObject *payload = Py_BuildValue("s", json);
                PyObject *ret = PyObject_CallFunctionObjArgs(h->stats_cb,
                                                             payload, NULL);
                Py_DECREF(payload);

                if (ret) {
                        Py_DECREF(ret);
                } else {
                        /* Callback raised: stop dispatching. */
                        CallState_crash(cs);
                        rd_kafka_yield(h->rk);
                }
        }
        /* else: No data returned */

        CallState_resume(cs);
        return 0;
}
1337
log_cb(const rd_kafka_t * rk,int level,const char * fac,const char * buf)1338 static void log_cb (const rd_kafka_t *rk, int level,
1339 const char *fac, const char *buf) {
1340 Handle *h = rd_kafka_opaque(rk);
1341 PyObject *result;
1342 CallState *cs;
1343 static const int level_map[8] = {
1344 /* Map syslog levels to python logging levels */
1345 50, /* LOG_EMERG -> logging.CRITICAL */
1346 50, /* LOG_ALERT -> logging.CRITICAL */
1347 50, /* LOG_CRIT -> logging.CRITICAL */
1348 40, /* LOG_ERR -> logging.ERROR */
1349 30, /* LOG_WARNING -> logging.WARNING */
1350 20, /* LOG_NOTICE -> logging.INFO */
1351 20, /* LOG_INFO -> logging.INFO */
1352 10, /* LOG_DEBUG -> logging.DEBUG */
1353 };
1354
1355 cs = CallState_get(h);
1356 result = PyObject_CallMethod(h->logger, "log", "issss",
1357 level_map[level],
1358 "%s [%s] %s",
1359 fac, rd_kafka_name(rk), buf);
1360
1361 if (result)
1362 Py_DECREF(result);
1363 else {
1364 CallState_crash(cs);
1365 rd_kafka_yield(h->rk);
1366 }
1367
1368 CallState_resume(cs);
1369 }
1370
1371 /****************************************************************************
1372 *
1373 *
1374 * Common helpers
1375 *
1376 *
1377 *
1378 *
1379 ****************************************************************************/
1380
1381
1382
1383 /**
1384 * Clear Python object references in Handle
1385 */
Handle_clear(Handle * h)1386 void Handle_clear (Handle *h) {
1387 if (h->error_cb) {
1388 Py_DECREF(h->error_cb);
1389 h->error_cb = NULL;
1390 }
1391
1392 if (h->throttle_cb) {
1393 Py_DECREF(h->throttle_cb);
1394 h->throttle_cb = NULL;
1395 }
1396
1397 if (h->stats_cb) {
1398 Py_DECREF(h->stats_cb);
1399 h->stats_cb = NULL;
1400 }
1401
1402 if (h->logger) {
1403 Py_DECREF(h->logger);
1404 h->logger = NULL;
1405 }
1406
1407 if (h->initiated) {
1408 #ifdef WITH_PY_TSS
1409 PyThread_tss_delete(&h->tlskey);
1410 #else
1411 PyThread_delete_key(h->tlskey);
1412 #endif
1413 }
1414 }
1415
1416 /**
1417 * GC traversal for Python object references
1418 */
Handle_traverse(Handle * h,visitproc visit,void * arg)1419 int Handle_traverse (Handle *h, visitproc visit, void *arg) {
1420 if (h->error_cb)
1421 Py_VISIT(h->error_cb);
1422
1423 if (h->throttle_cb)
1424 Py_VISIT(h->throttle_cb);
1425
1426 if (h->stats_cb)
1427 Py_VISIT(h->stats_cb);
1428
1429 return 0;
1430 }
1431
1432 /**
1433 * @brief Set single special producer config value.
1434 *
1435 * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
1436 */
producer_conf_set_special(Handle * self,rd_kafka_conf_t * conf,const char * name,PyObject * valobj)1437 static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
1438 const char *name, PyObject *valobj) {
1439
1440 if (!strcmp(name, "on_delivery")) {
1441 if (!PyCallable_Check(valobj)) {
1442 cfl_PyErr_Format(
1443 RD_KAFKA_RESP_ERR__INVALID_ARG,
1444 "%s requires a callable "
1445 "object", name);
1446 return -1;
1447 }
1448
1449 self->u.Producer.default_dr_cb = valobj;
1450 Py_INCREF(self->u.Producer.default_dr_cb);
1451
1452 return 1;
1453
1454 } else if (!strcmp(name, "delivery.report.only.error")) {
1455 /* Since we allocate msgstate for each produced message
1456 * with a callback we can't use delivery.report.only.error
1457 * as-is, as we wouldn't be able to ever free those msgstates.
1458 * Instead we shortcut this setting in the Python client,
1459 * providing the same functionality from dr_msg_cb trampoline.
1460 */
1461
1462 if (!cfl_PyBool_get(valobj, name,
1463 &self->u.Producer.dr_only_error))
1464 return -1;
1465
1466 return 1;
1467 }
1468
1469 return 0; /* Not handled */
1470 }
1471
1472
1473 /**
1474 * @brief Set single special consumer config value.
1475 *
1476 * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
1477 */
consumer_conf_set_special(Handle * self,rd_kafka_conf_t * conf,const char * name,PyObject * valobj)1478 static int consumer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
1479 const char *name, PyObject *valobj) {
1480
1481 if (!strcmp(name, "on_commit")) {
1482 if (!PyCallable_Check(valobj)) {
1483 cfl_PyErr_Format(
1484 RD_KAFKA_RESP_ERR__INVALID_ARG,
1485 "%s requires a callable "
1486 "object", name);
1487 return -1;
1488 }
1489
1490 self->u.Consumer.on_commit = valobj;
1491 Py_INCREF(self->u.Consumer.on_commit);
1492
1493 return 1;
1494 }
1495
1496 return 0;
1497 }
1498
1499 /**
1500 * @brief Call out to __init__.py _resolve_plugins() to see if any
1501 * of the specified `plugin.library.paths` are found in the
1502 * wheel's embedded library directory, and if so change the
1503 * path to use these libraries.
1504 *
1505 * @returns a possibly updated plugin.library.paths string object which
1506 * must be DECREF:ed, or NULL if an exception was raised.
1507 */
resolve_plugins(PyObject * plugins)1508 static PyObject *resolve_plugins (PyObject *plugins) {
1509 PyObject *resolved;
1510 PyObject *module, *function;
1511
1512 module = PyImport_ImportModule("confluent_kafka");
1513 if (!module)
1514 return NULL;
1515
1516 function = PyObject_GetAttrString(module, "_resolve_plugins");
1517 if (!function) {
1518 PyErr_SetString(PyExc_RuntimeError,
1519 "confluent_kafka._resolve_plugins() not found");
1520 Py_DECREF(module);
1521 return NULL;
1522 }
1523
1524 resolved = PyObject_CallFunctionObjArgs(function, plugins, NULL);
1525
1526 Py_DECREF(function);
1527 Py_DECREF(module);
1528
1529 if (!resolved) {
1530 PyErr_SetString(PyExc_RuntimeError,
1531 "confluent_kafka._resolve_plugins() failed");
1532 return NULL;
1533 }
1534
1535 return resolved;
1536 }
1537
1538 /**
1539 * @brief Remove property from confidct and set rd_kafka_conf with its value
1540 *
1541 * @param vo The property value object
1542 *
1543 * @returns 1 on success or 0 on failure (exception raised).
1544 */
common_conf_set_special(PyObject * confdict,rd_kafka_conf_t * conf,const char * name,PyObject * vo)1545 static int common_conf_set_special(PyObject *confdict, rd_kafka_conf_t *conf,
1546 const char *name, PyObject *vo) {
1547 const char *v;
1548 char errstr[256];
1549 PyObject *vs;
1550 PyObject *vs8 = NULL;
1551
1552 if (!(vs = cfl_PyObject_Unistr(vo))) {
1553 PyErr_Format(PyExc_TypeError, "expected configuration property %s "
1554 "as type unicode string", name);
1555 return 0;
1556 }
1557
1558 v = cfl_PyUnistr_AsUTF8(vs, &vs8);
1559 if (rd_kafka_conf_set(conf, name, v, errstr, sizeof(errstr))
1560 != RD_KAFKA_CONF_OK) {
1561 cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
1562 "%s", errstr);
1563 Py_DECREF(vs);
1564 Py_XDECREF(vs8);
1565 return 0;
1566 }
1567
1568 Py_DECREF(vs);
1569 Py_XDECREF(vs8);
1570 PyDict_DelItemString(confdict, name);
1571 return 1;
1572 }
1573
1574 /**
1575 * Common config setup for Kafka client handles.
1576 *
1577 * Returns a conf object on success or NULL on failure in which case
1578 * an exception has been raised.
1579 */
common_conf_setup(rd_kafka_type_t ktype,Handle * h,PyObject * args,PyObject * kwargs)1580 rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
1581 Handle *h,
1582 PyObject *args,
1583 PyObject *kwargs) {
1584 rd_kafka_conf_t *conf;
1585 Py_ssize_t pos = 0;
1586 PyObject *ko, *vo;
1587 PyObject *confdict = NULL;
1588
1589 if (rd_kafka_version() < MIN_RD_KAFKA_VERSION) {
1590 PyErr_Format(PyExc_RuntimeError,
1591 "%s: librdkafka version %s (0x%x) detected",
1592 MIN_VER_ERRSTR, rd_kafka_version_str(),
1593 rd_kafka_version());
1594 return NULL;
1595 }
1596
1597 /* Supported parameter constellations:
1598 * - kwargs (conf={..}, logger=..)
1599 * - args and kwargs ({..}, logger=..)
1600 * - args ({..})
1601 * When both args and kwargs are present the kwargs take
1602 * precedence in case of duplicate keys.
1603 * All keys map to configuration properties.
1604 *
1605 * Copy configuration dict to avoid manipulating application config.
1606 */
1607 if (args && PyTuple_Size(args)) {
1608 if (!PyTuple_Check(args) ||
1609 PyTuple_Size(args) > 1) {
1610 PyErr_SetString(PyExc_TypeError,
1611 "expected tuple containing single dict");
1612 return NULL;
1613 } else if (PyTuple_Size(args) == 1 &&
1614 !PyDict_Check((confdict = PyTuple_GetItem(args, 0)))) {
1615 PyErr_SetString(PyExc_TypeError,
1616 "expected configuration dict");
1617 return NULL;
1618 }
1619 confdict = PyDict_Copy(confdict);
1620 }
1621
1622 if (!confdict) {
1623 if (!kwargs) {
1624 PyErr_SetString(PyExc_TypeError,
1625 "expected configuration dict");
1626 return NULL;
1627 }
1628
1629 confdict = PyDict_Copy(kwargs);
1630
1631 } else if (kwargs) {
1632 /* Update confdict with kwargs */
1633 PyDict_Update(confdict, kwargs);
1634 }
1635
1636 if (ktype == RD_KAFKA_CONSUMER &&
1637 !PyDict_GetItemString(confdict, "group.id")) {
1638
1639 PyErr_SetString(PyExc_ValueError,
1640 "Failed to create consumer: group.id must be set");
1641 Py_DECREF(confdict);
1642 return NULL;
1643 }
1644
1645 conf = rd_kafka_conf_new();
1646
1647 /*
1648 * Set debug contexts first to capture all events including plugin loading
1649 */
1650 if ((vo = PyDict_GetItemString(confdict, "debug")) &&
1651 !common_conf_set_special(confdict, conf, "debug", vo))
1652 goto outer_err;
1653
1654 /*
1655 * Plugins must be configured prior to handling any of their
1656 * configuration properties.
1657 * Dicts are unordered so we explicitly check for, set, and delete the
1658 * plugin paths here.
1659 * This ensures plugin configuration properties are handled in the
1660 * correct order.
1661 */
1662 if ((vo = PyDict_GetItemString(confdict, "plugin.library.paths"))) {
1663 /* Resolve plugin paths */
1664 PyObject *resolved;
1665
1666 resolved = resolve_plugins(vo);
1667 if (!resolved)
1668 goto outer_err;
1669
1670 if (!common_conf_set_special(confdict, conf,
1671 "plugin.library.paths",
1672 resolved)) {
1673 Py_DECREF(resolved);
1674 goto outer_err;
1675 }
1676 Py_DECREF(resolved);
1677 }
1678
1679 if ((vo = PyDict_GetItemString(confdict, "default.topic.config"))) {
1680 /* TODO: uncomment for 1.0 release
1681 PyErr_Warn(PyExc_DeprecationWarning,
1682 "default.topic.config has being deprecated, "
1683 "set default topic configuration values in the global dict");
1684 */
1685 if (PyDict_Update(confdict, vo) == -1) {
1686 goto outer_err;
1687 }
1688 PyDict_DelItemString(confdict, "default.topic.config");
1689 }
1690
1691 /* Convert config dict to config key-value pairs. */
1692 while (PyDict_Next(confdict, &pos, &ko, &vo)) {
1693 PyObject *ks;
1694 PyObject *ks8 = NULL;
1695 PyObject *vs = NULL, *vs8 = NULL;
1696 const char *k;
1697 const char *v;
1698 char errstr[256];
1699 int r = 0;
1700
1701 if (!(ks = cfl_PyObject_Unistr(ko))) {
1702 PyErr_SetString(PyExc_TypeError,
1703 "expected configuration property name "
1704 "as type unicode string");
1705 goto inner_err;
1706 }
1707
1708 k = cfl_PyUnistr_AsUTF8(ks, &ks8);
1709 if (!strcmp(k, "error_cb")) {
1710 if (!PyCallable_Check(vo)) {
1711 PyErr_SetString(PyExc_TypeError,
1712 "expected error_cb property "
1713 "as a callable function");
1714 goto inner_err;
1715 }
1716 if (h->error_cb) {
1717 Py_DECREF(h->error_cb);
1718 h->error_cb = NULL;
1719 }
1720 if (vo != Py_None) {
1721 h->error_cb = vo;
1722 Py_INCREF(h->error_cb);
1723 }
1724 Py_XDECREF(ks8);
1725 Py_DECREF(ks);
1726 continue;
1727 } else if (!strcmp(k, "throttle_cb")) {
1728 if (!PyCallable_Check(vo)) {
1729 PyErr_SetString(PyExc_ValueError,
1730 "expected throttle_cb property "
1731 "as a callable function");
1732 goto inner_err;
1733 }
1734 if (h->throttle_cb) {
1735 Py_DECREF(h->throttle_cb);
1736 h->throttle_cb = NULL;
1737 }
1738 if (vo != Py_None) {
1739 h->throttle_cb = vo;
1740 Py_INCREF(h->throttle_cb);
1741 }
1742 Py_XDECREF(ks8);
1743 Py_DECREF(ks);
1744 continue;
1745 } else if (!strcmp(k, "stats_cb")) {
1746 if (!PyCallable_Check(vo)) {
1747 PyErr_SetString(PyExc_TypeError,
1748 "expected stats_cb property "
1749 "as a callable function");
1750 goto inner_err;
1751 }
1752
1753 if (h->stats_cb) {
1754 Py_DECREF(h->stats_cb);
1755 h->stats_cb = NULL;
1756 }
1757 if (vo != Py_None) {
1758 h->stats_cb = vo;
1759 Py_INCREF(h->stats_cb);
1760 }
1761 Py_XDECREF(ks8);
1762 Py_DECREF(ks);
1763 continue;
1764 } else if (!strcmp(k, "logger")) {
1765 if (h->logger) {
1766 Py_DECREF(h->logger);
1767 h->logger = NULL;
1768 }
1769
1770 if (vo != Py_None) {
1771 h->logger = vo;
1772 Py_INCREF(h->logger);
1773 }
1774 Py_XDECREF(ks8);
1775 Py_DECREF(ks);
1776 continue;
1777 }
1778
1779 /* Special handling for certain config keys. */
1780 if (ktype == RD_KAFKA_PRODUCER)
1781 r = producer_conf_set_special(h, conf, k, vo);
1782 else if (ktype == RD_KAFKA_CONSUMER)
1783 r = consumer_conf_set_special(h, conf, k, vo);
1784 if (r == -1) {
1785 /* Error */
1786 goto inner_err;
1787 } else if (r == 1) {
1788 /* Handled */
1789 continue;
1790 }
1791
1792
1793 /*
1794 * Pass configuration property through to librdkafka.
1795 */
1796 if (vo == Py_None) {
1797 v = NULL;
1798 } else {
1799 if (!(vs = cfl_PyObject_Unistr(vo))) {
1800 PyErr_SetString(PyExc_TypeError,
1801 "expected configuration "
1802 "property value as type "
1803 "unicode string");
1804 goto inner_err;
1805 }
1806 v = cfl_PyUnistr_AsUTF8(vs, &vs8);
1807 }
1808
1809 if (rd_kafka_conf_set(conf, k, v, errstr, sizeof(errstr)) !=
1810 RD_KAFKA_CONF_OK) {
1811 cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
1812 "%s", errstr);
1813 goto inner_err;
1814 }
1815
1816 Py_XDECREF(vs8);
1817 Py_XDECREF(vs);
1818 Py_XDECREF(ks8);
1819 Py_DECREF(ks);
1820 continue;
1821
1822 inner_err:
1823 Py_XDECREF(vs8);
1824 Py_XDECREF(vs);
1825 Py_XDECREF(ks8);
1826 Py_XDECREF(ks);
1827 goto outer_err;
1828 }
1829
1830 Py_DECREF(confdict);
1831
1832 rd_kafka_conf_set_error_cb(conf, error_cb);
1833
1834 if (h->throttle_cb)
1835 rd_kafka_conf_set_throttle_cb(conf, throttle_cb);
1836
1837 if (h->stats_cb)
1838 rd_kafka_conf_set_stats_cb(conf, stats_cb);
1839
1840 if (h->logger) {
1841 /* Write logs to log queue (which is forwarded
1842 * to the polled queue in the Producer/Consumer constructors) */
1843 rd_kafka_conf_set(conf, "log.queue", "true", NULL, 0);
1844 rd_kafka_conf_set_log_cb(conf, log_cb);
1845 }
1846
1847 rd_kafka_conf_set_opaque(conf, h);
1848
1849 #ifdef WITH_PY_TSS
1850 if (PyThread_tss_create(&h->tlskey)) {
1851 PyErr_SetString(PyExc_RuntimeError,
1852 "Failed to initialize thread local storage");
1853 rd_kafka_conf_destroy(conf);
1854 return NULL;
1855 }
1856 #else
1857 h->tlskey = PyThread_create_key();
1858 #endif
1859
1860 h->initiated = 1;
1861
1862 return conf;
1863
1864 outer_err:
1865 Py_DECREF(confdict);
1866 rd_kafka_conf_destroy(conf);
1867
1868 return NULL;
1869 }
1870
1871
1872
1873
1874 /**
1875 * @brief Initialiase a CallState and unlock the GIL prior to a
1876 * possibly blocking external call.
1877 */
CallState_begin(Handle * h,CallState * cs)1878 void CallState_begin (Handle *h, CallState *cs) {
1879 cs->thread_state = PyEval_SaveThread();
1880 assert(cs->thread_state != NULL);
1881 cs->crashed = 0;
1882 #ifdef WITH_PY_TSS
1883 PyThread_tss_set(&h->tlskey, cs);
1884 #else
1885 PyThread_set_key_value(h->tlskey, cs);
1886 #endif
1887 }
1888
1889 /**
1890 * @brief Relock the GIL after external call is done.
1891 * @returns 0 if a Python signal was raised or a callback crashed, else 1.
1892 */
CallState_end(Handle * h,CallState * cs)1893 int CallState_end (Handle *h, CallState *cs) {
1894 #ifdef WITH_PY_TSS
1895 PyThread_tss_set(&h->tlskey, NULL);
1896 #else
1897 PyThread_delete_key_value(h->tlskey);
1898 #endif
1899
1900 PyEval_RestoreThread(cs->thread_state);
1901
1902 if (PyErr_CheckSignals() == -1 || cs->crashed)
1903 return 0;
1904
1905 return 1;
1906 }
1907
1908
1909 /**
1910 * @brief Get the current thread's CallState and re-locks the GIL.
1911 */
CallState_get(Handle * h)1912 CallState *CallState_get (Handle *h) {
1913 CallState *cs;
1914 #ifdef WITH_PY_TSS
1915 cs = PyThread_tss_get(&h->tlskey);
1916 #else
1917 cs = PyThread_get_key_value(h->tlskey);
1918 #endif
1919 assert(cs != NULL);
1920 assert(cs->thread_state != NULL);
1921 PyEval_RestoreThread(cs->thread_state);
1922 cs->thread_state = NULL;
1923 return cs;
1924 }
1925
1926 /**
1927 * @brief Un-locks the GIL to resume blocking external call.
1928 */
CallState_resume(CallState * cs)1929 void CallState_resume (CallState *cs) {
1930 assert(cs->thread_state == NULL);
1931 cs->thread_state = PyEval_SaveThread();
1932 }
1933
1934 /**
1935 * @brief Indicate that call crashed.
1936 */
CallState_crash(CallState * cs)1937 void CallState_crash (CallState *cs) {
1938 cs->crashed++;
1939 }
1940
1941
1942
1943 /**
1944 * @brief Find class/type/object \p typename in \p modulename
1945 *
1946 * @returns a new reference to the object.
1947 *
1948 * @raises a TypeError exception if the type is not found.
1949 */
1950
cfl_PyObject_lookup(const char * modulename,const char * typename)1951 PyObject *cfl_PyObject_lookup (const char *modulename, const char *typename) {
1952 PyObject *module = PyImport_ImportModule(modulename);
1953 PyObject *obj;
1954
1955 if (!modulename) {
1956 PyErr_Format(PyExc_TypeError,
1957 "Module %s not found when looking up %s.%s",
1958 modulename, modulename, typename);
1959 return NULL;
1960 }
1961
1962 obj = PyObject_GetAttrString(module, typename);
1963 if (!obj) {
1964 Py_DECREF(module);
1965 PyErr_Format(PyExc_TypeError,
1966 "No such class/type/object: %s.%s",
1967 modulename, typename);
1968 return NULL;
1969 }
1970
1971 return obj;
1972 }
1973
1974
/**
 * @brief Set \p dict[\p name] to a unicode string object created
 *        from the C string \p val.
 */
void cfl_PyDict_SetString (PyObject *dict, const char *name, const char *val) {
        PyObject *strobj;

        strobj = cfl_PyUnistr(_FromString(val));
        PyDict_SetItemString(dict, name, strobj);

        /* The dict holds its own reference, drop ours. */
        Py_DECREF(strobj);
}
1980
/**
 * @brief Set \p dict[\p name] to an integer object created from \p val.
 */
void cfl_PyDict_SetInt (PyObject *dict, const char *name, int val) {
        PyObject *intobj;

        intobj = cfl_PyInt_FromInt(val);
        PyDict_SetItemString(dict, name, intobj);

        /* The dict holds its own reference, drop ours. */
        Py_DECREF(intobj);
}
1986
1987
/**
 * @brief Set attribute \p name on object \p o to a unicode string object
 *        created from the C string \p val.
 *
 * @returns the result of PyObject_SetAttrString(), or -1 if the string
 *          object could not be created (exception raised).
 */
int cfl_PyObject_SetString (PyObject *o, const char *name, const char *val) {
        PyObject *vo = cfl_PyUnistr(_FromString(val));
        int r;

        if (!vo)
                return -1; /* exception raised by _FromString */

        r = PyObject_SetAttrString(o, name, vo);
        Py_DECREF(vo);
        return r;
}
1994
/**
 * @brief Set attribute \p name on object \p o to an integer object
 *        created from \p val.
 *
 * @returns the result of PyObject_SetAttrString(), or -1 if the integer
 *          object could not be created.
 */
int cfl_PyObject_SetInt (PyObject *o, const char *name, int val) {
        PyObject *vo = cfl_PyInt_FromInt(val);
        int r;

        if (!vo)
                return -1; /* integer object creation failed */

        r = PyObject_SetAttrString(o, name, vo);
        Py_DECREF(vo);
        return r;
}
2001
2002
2003 /**
2004 * @brief Get attribute \p attr_name from \p object and verify it is
2005 * of type \p py_type.
2006 *
2007 * @param py_type the value type of \p attr_name must match \p py_type, unless
2008 * \p py_type is NULL.
2009 *
2010 * @returns 1 if \p valp was updated with the object (new reference) or NULL
2011 * if not matched and not required, or
2012 * 0 if an exception was raised.
2013 */
cfl_PyObject_GetAttr(PyObject * object,const char * attr_name,PyObject ** valp,const PyTypeObject * py_type,int required)2014 int cfl_PyObject_GetAttr (PyObject *object, const char *attr_name,
2015 PyObject **valp, const PyTypeObject *py_type,
2016 int required) {
2017 PyObject *o;
2018
2019 o = PyObject_GetAttrString(object, attr_name);
2020 if (!o) {
2021 if (!required) {
2022 *valp = NULL;
2023 return 1;
2024 }
2025
2026 PyErr_Format(PyExc_TypeError,
2027 "Required attribute .%s missing", attr_name);
2028 return 0;
2029 }
2030
2031 if (py_type && Py_TYPE(o) != py_type) {
2032 Py_DECREF(o);
2033 PyErr_Format(PyExc_TypeError,
2034 "Expected .%s to be %s type, not %s",
2035 attr_name, py_type->tp_name,
2036 ((PyTypeObject *)PyObject_Type(o))->tp_name);
2037 return 0;
2038 }
2039
2040 *valp = o;
2041
2042 return 1;
2043 }
2044
2045 /**
2046 * @brief Get attribute \p attr_name from \p object and make sure it is
2047 * an integer type.
2048 *
2049 * @returns 1 if \p valp was updated with either the object value, or \p defval.
2050 * 0 if an exception was raised.
2051 */
cfl_PyObject_GetInt(PyObject * object,const char * attr_name,int * valp,int defval,int required)2052 int cfl_PyObject_GetInt (PyObject *object, const char *attr_name, int *valp,
2053 int defval, int required) {
2054 PyObject *o;
2055
2056 if (!cfl_PyObject_GetAttr(object, attr_name, &o,
2057 #ifdef PY3
2058 &PyLong_Type,
2059 #else
2060 &PyInt_Type,
2061 #endif
2062 required))
2063 return 0;
2064
2065 if (!o) {
2066 *valp = defval;
2067 return 1;
2068 }
2069
2070 *valp = cfl_PyInt_AsInt(o);
2071 Py_DECREF(o);
2072
2073 return 1;
2074 }
2075
2076
2077 /**
2078 * @brief Checks that \p object is a bool (or boolable) and sets
2079 * \p *valp according to the object.
2080 *
2081 * @returns 1 if \p valp was set, or 0 if \p object is not a boolable object.
2082 * An exception is raised in the error case.
2083 */
cfl_PyBool_get(PyObject * object,const char * name,int * valp)2084 int cfl_PyBool_get (PyObject *object, const char *name, int *valp) {
2085 if (!PyBool_Check(object)) {
2086 PyErr_Format(PyExc_TypeError,
2087 "Expected %s to be bool type, not %s",
2088 name,
2089 ((PyTypeObject *)PyObject_Type(object))->tp_name);
2090 return 0;
2091 }
2092
2093 *valp = object == Py_True;
2094
2095 return 1;
2096 }
2097
2098
2099 /**
2100 * @brief Get attribute \p attr_name from \p object and make sure it is
2101 * a string type.
2102 *
2103 * @returns 1 if \p valp was updated with a newly allocated copy of either the
2104 * object value (UTF8), or \p defval.
2105 * 0 if an exception was raised.
2106 */
cfl_PyObject_GetString(PyObject * object,const char * attr_name,char ** valp,const char * defval,int required)2107 int cfl_PyObject_GetString (PyObject *object, const char *attr_name,
2108 char **valp, const char *defval, int required) {
2109 PyObject *o, *uo, *uop;
2110
2111 if (!cfl_PyObject_GetAttr(object, attr_name, &o,
2112 #ifdef PY3
2113 &PyUnicode_Type,
2114 #else
2115 /* Python 2: support both str and unicode
2116 * let PyObject_Unistr() do the
2117 * proper conversion below. */
2118 NULL,
2119 #endif
2120 required))
2121 return 0;
2122
2123 if (!o) {
2124 *valp = defval ? strdup(defval) : NULL;
2125 return 1;
2126 }
2127
2128 if (!(uo = cfl_PyObject_Unistr(o))) {
2129 Py_DECREF(o);
2130 PyErr_Format(PyExc_TypeError,
2131 "Expected .%s to be a unicode string type, not %s",
2132 attr_name,
2133 ((PyTypeObject *)PyObject_Type(o))->tp_name);
2134 return 0;
2135 }
2136 Py_DECREF(o);
2137
2138 *valp = (char *)cfl_PyUnistr_AsUTF8(uo, &uop);
2139 if (!*valp) {
2140 Py_DECREF(uo);
2141 Py_XDECREF(uop);
2142 return 0; /* exception raised by AsUTF8 */
2143 }
2144
2145 *valp = strdup(*valp);
2146 Py_DECREF(uo);
2147 Py_XDECREF(uop);
2148
2149 return 1;
2150 }
2151
2152
2153
2154 /**
2155 * @returns a Python list of longs based on the input int32_t array
2156 */
cfl_int32_array_to_py_list(const int32_t * arr,size_t cnt)2157 PyObject *cfl_int32_array_to_py_list (const int32_t *arr, size_t cnt) {
2158 PyObject *list;
2159 size_t i;
2160
2161 list = PyList_New((Py_ssize_t)cnt);
2162 if (!list)
2163 return NULL;
2164
2165 for (i = 0 ; i < cnt ; i++)
2166 PyList_SET_ITEM(list, (Py_ssize_t)i,
2167 cfl_PyInt_FromInt(arr[i]));
2168
2169 return list;
2170 }
2171
2172
2173 /****************************************************************************
2174 *
2175 *
2176 * Base
2177 *
2178 *
2179 *
2180 *
2181 ****************************************************************************/
2182
2183
/**
 * @brief Module function libversion(): librdkafka version.
 *
 * @returns a (version_string, version_int) tuple.
 */
static PyObject *libversion (PyObject *self, PyObject *args) {
        const char *verstr = rd_kafka_version_str();

        return Py_BuildValue("si", verstr, rd_kafka_version());
}
2189
2190 /*
2191 * Version hex representation
2192 * 0xMMmmRRPP
2193 * MM=major, mm=minor, RR=revision, PP=patchlevel (not used)
2194 */
version(PyObject * self,PyObject * args)2195 static PyObject *version (PyObject *self, PyObject *args) {
2196 return Py_BuildValue("si", "1.2.0", 0x01020000);
2197 }
2198
2199 static PyMethodDef cimpl_methods[] = {
2200 {"libversion", libversion, METH_NOARGS,
2201 " Retrieve librdkafka version string and integer\n"
2202 "\n"
2203 " :returns: (version_string, version_int) tuple\n"
2204 " :rtype: tuple(str,int)\n"
2205 "\n"
2206 },
2207 {"version", version, METH_NOARGS,
2208 " Retrieve module version string and integer\n"
2209 "\n"
2210 " :returns: (version_string, version_int) tuple\n"
2211 " :rtype: tuple(str,int)\n"
2212 "\n"
2213 },
2214 { NULL }
2215 };
2216
2217
2218 /**
2219 * @brief Add librdkafka error enums to KafkaError's type dict.
2220 * @returns an updated doc string containing all error constants.
2221 */
KafkaError_add_errs(PyObject * dict,const char * origdoc)2222 static char *KafkaError_add_errs (PyObject *dict, const char *origdoc) {
2223 const struct rd_kafka_err_desc *descs;
2224 size_t cnt;
2225 size_t i;
2226 char *doc;
2227 size_t dof = 0, dsize;
2228 /* RST grid table column widths */
2229 #define _COL1_W 50
2230 #define _COL2_W 100 /* Must be larger than COL1 */
2231 char dash[_COL2_W], eq[_COL2_W];
2232
2233 rd_kafka_get_err_descs(&descs, &cnt);
2234
2235 memset(dash, '-', sizeof(dash));
2236 memset(eq, '=', sizeof(eq));
2237
2238 /* Setup output doc buffer. */
2239 dof = strlen(origdoc);
2240 dsize = dof + 500 + (cnt * 200);
2241 doc = malloc(dsize);
2242 memcpy(doc, origdoc, dof+1);
2243
2244 #define _PRINT(...) do { \
2245 char tmpdoc[512]; \
2246 size_t _len; \
2247 _len = snprintf(tmpdoc, sizeof(tmpdoc), __VA_ARGS__); \
2248 if (_len > sizeof(tmpdoc)) _len = sizeof(tmpdoc)-1; \
2249 if (dof + _len >= dsize) { \
2250 dsize += 2; \
2251 doc = realloc(doc, dsize); \
2252 } \
2253 memcpy(doc+dof, tmpdoc, _len+1); \
2254 dof += _len; \
2255 } while (0)
2256
2257 /* Error constant table header (RST grid table) */
2258 _PRINT("Error and event constants:\n\n"
2259 "+-%.*s-+-%.*s-+\n"
2260 "| %-*.*s | %-*.*s |\n"
2261 "+=%.*s=+=%.*s=+\n",
2262 _COL1_W, dash, _COL2_W, dash,
2263 _COL1_W, _COL1_W, "Constant", _COL2_W, _COL2_W, "Description",
2264 _COL1_W, eq, _COL2_W, eq);
2265
2266 for (i = 0 ; i < cnt ; i++) {
2267 PyObject *code;
2268
2269 if (!descs[i].desc)
2270 continue;
2271
2272 code = cfl_PyInt_FromInt(descs[i].code);
2273
2274 PyDict_SetItemString(dict, descs[i].name, code);
2275
2276 Py_DECREF(code);
2277
2278 _PRINT("| %-*.*s | %-*.*s |\n"
2279 "+-%.*s-+-%.*s-+\n",
2280 _COL1_W, _COL1_W, descs[i].name,
2281 _COL2_W, _COL2_W, descs[i].desc,
2282 _COL1_W, dash, _COL2_W, dash);
2283 }
2284
2285 _PRINT("\n");
2286
2287 return doc; // FIXME: leak
2288 }
2289
2290
#ifdef PY3
/**
 * @brief Python 3 module definition for cimpl.
 *
 * Fields after m_methods are left NULL.
 */
static struct PyModuleDef cimpl_moduledef = {
        PyModuleDef_HEAD_INIT,
        /* m_name */
        "cimpl",
        /* m_doc */
        "Confluent's Python client for Apache Kafka (C implementation)",
        /* m_size */
        -1,
        /* m_methods */
        cimpl_methods,
        NULL,
        NULL,
        NULL,
        NULL
};
#endif
2300
2301
_init_cimpl(void)2302 static PyObject *_init_cimpl (void) {
2303 PyObject *m;
2304
2305 PyEval_InitThreads();
2306
2307 if (PyType_Ready(&KafkaErrorType) < 0)
2308 return NULL;
2309 if (PyType_Ready(&MessageType) < 0)
2310 return NULL;
2311 if (PyType_Ready(&TopicPartitionType) < 0)
2312 return NULL;
2313 if (PyType_Ready(&ProducerType) < 0)
2314 return NULL;
2315 if (PyType_Ready(&ConsumerType) < 0)
2316 return NULL;
2317 if (PyType_Ready(&AdminType) < 0)
2318 return NULL;
2319 if (AdminTypes_Ready() < 0)
2320 return NULL;
2321
2322 #ifdef PY3
2323 m = PyModule_Create(&cimpl_moduledef);
2324 #else
2325 m = Py_InitModule3("cimpl", cimpl_methods,
2326 "Confluent's Python client for Apache Kafka (C implementation)");
2327 #endif
2328 if (!m)
2329 return NULL;
2330
2331 Py_INCREF(&KafkaErrorType);
2332 KafkaErrorType.tp_doc =
2333 KafkaError_add_errs(KafkaErrorType.tp_dict,
2334 KafkaErrorType.tp_doc);
2335 PyModule_AddObject(m, "KafkaError", (PyObject *)&KafkaErrorType);
2336
2337 Py_INCREF(&MessageType);
2338 PyModule_AddObject(m, "Message", (PyObject *)&MessageType);
2339
2340 Py_INCREF(&TopicPartitionType);
2341 PyModule_AddObject(m, "TopicPartition",
2342 (PyObject *)&TopicPartitionType);
2343
2344 Py_INCREF(&ProducerType);
2345 PyModule_AddObject(m, "Producer", (PyObject *)&ProducerType);
2346
2347 Py_INCREF(&ConsumerType);
2348 PyModule_AddObject(m, "Consumer", (PyObject *)&ConsumerType);
2349
2350 Py_INCREF(&AdminType);
2351 PyModule_AddObject(m, "_AdminClientImpl", (PyObject *)&AdminType);
2352
2353 AdminTypes_AddObjects(m);
2354
2355 #if PY_VERSION_HEX >= 0x02070000
2356 KafkaException = PyErr_NewExceptionWithDoc(
2357 "cimpl.KafkaException",
2358 "Kafka exception that wraps the :py:class:`KafkaError` "
2359 "class.\n"
2360 "\n"
2361 "Use ``exception.args[0]`` to extract the "
2362 ":py:class:`KafkaError` object\n"
2363 "\n",
2364 NULL, NULL);
2365 #else
2366 KafkaException = PyErr_NewException("cimpl.KafkaException", NULL, NULL);
2367 #endif
2368 Py_INCREF(KafkaException);
2369 PyModule_AddObject(m, "KafkaException", KafkaException);
2370
2371 PyModule_AddIntConstant(m, "TIMESTAMP_NOT_AVAILABLE", RD_KAFKA_TIMESTAMP_NOT_AVAILABLE);
2372 PyModule_AddIntConstant(m, "TIMESTAMP_CREATE_TIME", RD_KAFKA_TIMESTAMP_CREATE_TIME);
2373 PyModule_AddIntConstant(m, "TIMESTAMP_LOG_APPEND_TIME", RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME);
2374
2375 PyModule_AddIntConstant(m, "OFFSET_BEGINNING", RD_KAFKA_OFFSET_BEGINNING);
2376 PyModule_AddIntConstant(m, "OFFSET_END", RD_KAFKA_OFFSET_END);
2377 PyModule_AddIntConstant(m, "OFFSET_STORED", RD_KAFKA_OFFSET_STORED);
2378 PyModule_AddIntConstant(m, "OFFSET_INVALID", RD_KAFKA_OFFSET_INVALID);
2379
2380 return m;
2381 }
2382
2383
2384 #ifdef PY3
PyInit_cimpl(void)2385 PyMODINIT_FUNC PyInit_cimpl (void) {
2386 return _init_cimpl();
2387 }
2388 #else
initcimpl(void)2389 PyMODINIT_FUNC initcimpl (void) {
2390 _init_cimpl();
2391 }
2392 #endif
2393