1 /**
2  * Copyright 2016 Confluent Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "confluent_kafka.h"
18 
19 #include <stdarg.h>
20 
21 
22 /**
23  * @brief KNOWN ISSUES
24  *
25  *  - Partitioners will cause a dead-lock with librdkafka, because:
26  *     GIL + topic lock in topic_new  is different lock order than
27  *     topic lock in msg_partitioner + GIL.
28  *     This needs to be sorted out in librdkafka, preferably making the
29  *     partitioner run without any locks taken.
30  *     Until this is fixed the partitioner is ignored and librdkafka's
31  *     default will be used.
32  *  - KafkaError type .tp_doc allocation is lost on exit.
33  *
34  */
35 
36 
37 PyObject *KafkaException;
38 
39 
40 /****************************************************************************
41  *
42  *
43  * KafkaError
44  *
45  *
46  * FIXME: Pre-create simple instances for each error code, only instantiate
47  *        a new object if a rich error string is provided.
48  *
49  ****************************************************************************/
/**
 * Instance layout for KafkaError: an exception-capable object carrying a
 * librdkafka error code, an optional rich error string and a fatal flag.
 */
typedef struct {
#ifdef PY3
        PyException_HEAD
#else
        PyObject_HEAD
        /* Standard fields of PyBaseExceptionObject which we inherit from. */
        PyObject *dict;
        PyObject *args;
        PyObject *message;
#endif

	rd_kafka_resp_err_t code;  /* Error code */
	char *str;   /* Human readable representation of error, if one
		      * was provided by librdkafka.
		      * Else falls back on err2str(). */
        int   fatal; /**< Set to true if a fatal error. */
} KafkaError;
67 
68 
69 static void cfl_PyErr_Fatal (rd_kafka_resp_err_t err, const char *reason);
70 
/** @brief code(): the librdkafka error code as a Python int. */
static PyObject *KafkaError_code (KafkaError *self, PyObject *ignore) {
        rd_kafka_resp_err_t code = self->code;
        return cfl_PyInt_FromInt(code);
}
74 
/** @brief str(): the librdkafka-provided error string, or the generic
 *         rd_kafka_err2str() text when none was provided. */
static PyObject *KafkaError_str (KafkaError *self, PyObject *ignore) {
        const char *errstr =
                self->str ? self->str : rd_kafka_err2str(self->code);
        return cfl_PyUnistr(_FromString(errstr));
}
81 
/** @brief name(): the enum name for the error code. */
static PyObject *KafkaError_name (KafkaError *self, PyObject *ignore) {
        /* FIXME: Pre-create name objects */
        const char *name = rd_kafka_err2name(self->code);
        return cfl_PyUnistr(_FromString(name));
}
86 
/** @brief fatal(): True if this error was marked fatal, else False. */
static PyObject *KafkaError_fatal (KafkaError *self, PyObject *ignore) {
        if (self->fatal)
                Py_RETURN_TRUE;
        Py_RETURN_FALSE;
}
92 
93 
/** @brief Static test helper (_test_raise_fatal): raise a fatal
 *         KafkaException for testing purposes; always returns NULL. */
static PyObject *KafkaError_test_raise_fatal (KafkaError *null,
                                              PyObject *ignore) {
        cfl_PyErr_Fatal(RD_KAFKA_RESP_ERR__INVALID_ARG,
                        "This is a fatal exception for testing purposes");
        return NULL;
}
100 
101 
/** @brief Method table for KafkaError: code()/str()/name()/fatal()
 *         accessors plus the static _test_raise_fatal helper. */
static PyMethodDef KafkaError_methods[] = {
	{ "code", (PyCFunction)KafkaError_code, METH_NOARGS,
	  "  Returns the error/event code for comparison to"
	  "KafkaError.<ERR_CONSTANTS>.\n"
	  "\n"
	  "  :returns: error/event code\n"
	  "  :rtype: int\n"
	  "\n"
	},
	{ "str", (PyCFunction)KafkaError_str, METH_NOARGS,
	  "  Returns the human-readable error/event string.\n"
	  "\n"
	  "  :returns: error/event message string\n"
	  "  :rtype: str\n"
	  "\n"
	},
	{ "name", (PyCFunction)KafkaError_name, METH_NOARGS,
	  "  Returns the enum name for error/event.\n"
	  "\n"
	  "  :returns: error/event enum name string\n"
	  "  :rtype: str\n"
	  "\n"
	},
        { "fatal", (PyCFunction)KafkaError_fatal, METH_NOARGS,
          "  :returns: True if this a fatal error, else False.\n"
          "  :rtype: bool\n"
          "\n"
        },
        { "_test_raise_fatal", (PyCFunction)KafkaError_test_raise_fatal,
          METH_NOARGS|METH_STATIC
        },

	{ NULL }
};
136 
137 
/** @brief Release the heap-allocated error string (tp_clear / dealloc
 *         helper).  Safe to call repeatedly. */
static void KafkaError_clear (PyObject *self0) {
        KafkaError *self = (KafkaError *)self0;
        free(self->str);  /* free(NULL) is a no-op */
        self->str = NULL;
}
145 
/**
 * @brief tp_dealloc for KafkaError.
 *
 * Untracks the object from the GC *before* tearing it down so the
 * collector can never traverse a half-cleared object (the original
 * untracked after clearing), then frees owned memory and the object.
 */
static void KafkaError_dealloc (PyObject *self0) {
        KafkaError *self = (KafkaError *)self0;
        PyObject_GC_UnTrack(self0);
        KafkaError_clear(self0);
        Py_TYPE(self)->tp_free(self0);
}
152 
153 
154 
/** @brief tp_traverse: KafkaError owns no PyObject references of its
 *         own (str is a C string), so there is nothing to visit. */
static int KafkaError_traverse (KafkaError *self,
                                visitproc visit, void *arg) {
        return 0;
}
159 
/** @brief tp_repr: "KafkaError{[FATAL,]code=<name>,val=<n>,str=\"..\"}" */
static PyObject *KafkaError_str0 (KafkaError *self) {
	return cfl_PyUnistr(_FromFormat("KafkaError{%scode=%s,val=%d,str=\"%s\"}",
                                        self->fatal?"FATAL,":"",
					 rd_kafka_err2name(self->code),
					 self->code,
					 self->str ? self->str :
					 rd_kafka_err2str(self->code)));
}
168 
/** @brief tp_hash: hash on the error code so equal codes hash equal. */
static long KafkaError_hash (KafkaError *self) {
        return (long)self->code;
}
172 
173 static PyTypeObject KafkaErrorType;
174 
175 
176 
/**
 * @brief tp_richcompare: compares the error code against another
 *        KafkaError's code or an integer-convertible object.
 *
 * NOTE(review): when \p o2 is neither a KafkaError nor an int,
 * cfl_PyInt_AsInt() may fail, leaving code2 unspecified with a pending
 * exception — confirm callers only compare against ints/KafkaErrors.
 */
static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2,
					 int op) {
	int code2;
	int r;
	PyObject *result;

	if (Py_TYPE(o2) == &KafkaErrorType)
		code2 = ((KafkaError *)o2)->code;
	else
		code2 = cfl_PyInt_AsInt(o2);

	switch (op)
	{
	case Py_LT:
		r = self->code < code2;
		break;
	case Py_LE:
		r = self->code <= code2;
		break;
	case Py_EQ:
		r = self->code == code2;
		break;
	case Py_NE:
		r = self->code != code2;
		break;
	case Py_GT:
		r = self->code > code2;
		break;
	case Py_GE:
		r = self->code >= code2;
		break;
	default:
		r = 0;
		break;
	}

	result = r ? Py_True : Py_False;
	Py_INCREF(result);
	return result;
}
217 
218 
/**
 * @brief Type object for cimpl.KafkaError.
 *
 * Carries Py_TPFLAGS_BASE_EXC_SUBCLASS so instances may be raised and
 * caught as exceptions; GC-enabled, and has no tp_new (not
 * user-instantiable — created via KafkaError_new0()).
 */
static PyTypeObject KafkaErrorType = {
	PyVarObject_HEAD_INIT(NULL, 0)
	"cimpl.KafkaError",      /*tp_name*/
	sizeof(KafkaError),    /*tp_basicsize*/
	0,                         /*tp_itemsize*/
	(destructor)KafkaError_dealloc, /*tp_dealloc*/
	0,                         /*tp_print*/
	0,                         /*tp_getattr*/
	0,                         /*tp_setattr*/
	0,                         /*tp_compare*/
	(reprfunc)KafkaError_str0, /*tp_repr*/
	0,                         /*tp_as_number*/
	0,                         /*tp_as_sequence*/
	0,                         /*tp_as_mapping*/
	(hashfunc)KafkaError_hash, /*tp_hash */
	0,                         /*tp_call*/
	0,                         /*tp_str*/
	PyObject_GenericGetAttr,   /*tp_getattro*/
	0,                         /*tp_setattro*/
	0,                         /*tp_as_buffer*/
	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
        Py_TPFLAGS_BASE_EXC_SUBCLASS | Py_TPFLAGS_HAVE_GC, /*tp_flags*/
	"Kafka error and event object\n"
	"\n"
	"  The KafkaError class serves multiple purposes:\n"
	"\n"
	"  - Propagation of errors\n"
	"  - Propagation of events\n"
	"  - Exceptions\n"
	"\n"
	"  This class is not user-instantiable.\n"
	"\n", /*tp_doc*/
        (traverseproc)KafkaError_traverse, /* tp_traverse */
	(inquiry)KafkaError_clear, /* tp_clear */
	(richcmpfunc)KafkaError_richcompare, /* tp_richcompare */
	0,		           /* tp_weaklistoffset */
	0,		           /* tp_iter */
	0,		           /* tp_iternext */
	KafkaError_methods,    /* tp_methods */
	0,                         /* tp_members */
	0,                         /* tp_getset */
	0,                         /* tp_base */
	0,                         /* tp_dict */
	0,                         /* tp_descr_get */
	0,                         /* tp_descr_set */
	0,                         /* tp_dictoffset */
	0,                         /* tp_init */
	0                          /* tp_alloc */
};
268 
269 
270 /**
271  * @brief Initialize a KafkaError object.
272  */
/**
 * @brief Initialize a KafkaError object's fields.
 *
 * Copies \p str (may be NULL to fall back on err2str() at access time)
 * and clears the fatal flag.
 */
static void KafkaError_init (KafkaError *self,
			     rd_kafka_resp_err_t code, const char *str) {
        self->code = code;
        self->fatal = 0;
        self->str = str ? strdup(str) : NULL;
}
282 
283 /**
284  * @brief Internal factory to create KafkaError object.
285  */
/**
 * @brief Internal factory to create KafkaError object.
 *
 * @param err librdkafka error code.
 * @param fmt optional printf-style format for a rich error string;
 *            NULL falls back on rd_kafka_err2str(err).
 * @returns a new KafkaError, or NULL on allocation failure.
 */
PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...) {

	KafkaError *self;
	va_list ap;
	char buf[512];  /* formatted message is truncated to this size */

	self = (KafkaError *)KafkaErrorType.
		tp_alloc(&KafkaErrorType, 0);
	if (!self)
		return NULL;

	if (fmt) {
		va_start(ap, fmt);
		vsnprintf(buf, sizeof(buf), fmt, ap);
		va_end(ap);
	}

	KafkaError_init(self, err, fmt ? buf : rd_kafka_err2str(err));

	return (PyObject *)self;
}
307 
308 /**
309  * @brief Internal factory to create KafkaError object.
310  * @returns a new KafkaError object if \p err != 0, else a None object.
311  */
KafkaError_new_or_None(rd_kafka_resp_err_t err,const char * str)312  PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str) {
313 	if (!err)
314 		Py_RETURN_NONE;
315         if (str)
316                 return KafkaError_new0(err, "%s", str);
317         else
318                 return KafkaError_new0(err, NULL);
319 }
320 
321 
322 /**
323  * @brief Raise exception from fatal error.
324  */
/**
 * @brief Raise exception from fatal error.
 *
 * Creates a KafkaError marked fatal and sets it as the pending
 * KafkaException.  PyErr_SetObject() does not steal the value
 * reference, so our reference is dropped afterwards (the original
 * leaked it).  If allocation fails the allocation error is left
 * pending instead.
 */
static void cfl_PyErr_Fatal (rd_kafka_resp_err_t err, const char *reason) {
        PyObject *eo = KafkaError_new0(err, "%s", reason);
        if (!eo)
                return;  /* exception already set by failed allocation */
        ((KafkaError *)eo)->fatal = 1;
        PyErr_SetObject(KafkaException, eo);
        Py_DECREF(eo);
}
330 
331 
332 
333 
334 
335 /****************************************************************************
336  *
337  *
338  * Message
339  *
340  *
341  *
342  *
343  ****************************************************************************/
344 
345 
346 /**
347  * @returns a Message's error object, if any, else None.
348  * @remark The error object refcount is increased by this function.
349  */
/**
 * @returns a Message's error object, if any, else None.
 * @remark The error object refcount is increased by this function.
 */
PyObject *Message_error (Message *self, PyObject *ignore) {
        PyObject *err = self->error;
        if (!err)
                Py_RETURN_NONE;
        Py_INCREF(err);
        return err;
}
357 
/** @brief value(): the message payload (new reference), or None. */
static PyObject *Message_value (Message *self, PyObject *ignore) {
        PyObject *val = self->value;
        if (!val)
                Py_RETURN_NONE;
        Py_INCREF(val);
        return val;
}
365 
366 
/** @brief key(): the message key (new reference), or None. */
static PyObject *Message_key (Message *self, PyObject *ignore) {
        PyObject *key = self->key;
        if (!key)
                Py_RETURN_NONE;
        Py_INCREF(key);
        return key;
}
374 
/** @brief topic(): the topic name (new reference), or None. */
static PyObject *Message_topic (Message *self, PyObject *ignore) {
        PyObject *topic = self->topic;
        if (!topic)
                Py_RETURN_NONE;
        Py_INCREF(topic);
        return topic;
}
382 
/** @brief partition(): partition number, or None if unassigned (UA). */
static PyObject *Message_partition (Message *self, PyObject *ignore) {
        if (self->partition == RD_KAFKA_PARTITION_UA)
                Py_RETURN_NONE;
        return cfl_PyInt_FromInt(self->partition);
}
389 
390 
/** @brief offset(): message offset, or None for logical (<0) offsets. */
static PyObject *Message_offset (Message *self, PyObject *ignore) {
        if (self->offset < 0)
                Py_RETURN_NONE;
        return PyLong_FromLongLong(self->offset);
}
397 
398 
/** @brief timestamp(): (timestamp type, timestamp) tuple. */
static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
        return Py_BuildValue("iL", self->tstype, self->timestamp);
}
404 
/**
 * @brief headers(): the message headers as a Python list of
 *        (key, value) tuples, or None if there are none.
 *
 * The C headers are converted lazily on first access and then
 * released.  The original code Py_INCREF'd the conversion result
 * without checking it for NULL; a failed conversion now propagates
 * as NULL with the exception set.
 */
static PyObject *Message_headers (Message *self, PyObject *ignore) {
#ifdef RD_KAFKA_V_HEADERS
        if (self->headers) {
                Py_INCREF(self->headers);
                return self->headers;
        }

        if (self->c_headers) {
                /* Lazily convert the C headers, then drop the C copy. */
                self->headers = c_headers_to_py(self->c_headers);
                rd_kafka_headers_destroy(self->c_headers);
                self->c_headers = NULL;

                if (!self->headers)
                        return NULL;  /* conversion failed, exception set */

                Py_INCREF(self->headers);
                return self->headers;
        }

        Py_RETURN_NONE;
#else
        Py_RETURN_NONE;
#endif
}
423 
/**
 * @brief set_headers(): replace Message.headers with \p new_headers.
 *
 * Takes the new reference *before* dropping the old one so that
 * self-assignment (new_headers == self->headers) cannot free the
 * object prematurely (the original decref'd first).
 */
static PyObject *Message_set_headers (Message *self, PyObject *new_headers) {
        Py_INCREF(new_headers);
        Py_XDECREF(self->headers);
        self->headers = new_headers;

        Py_RETURN_NONE;
}
432 
/**
 * @brief set_value(): replace Message.value with \p new_val.
 *
 * Incref-before-decref ordering makes self-assignment safe (the
 * original decref'd the old value first).
 */
static PyObject *Message_set_value (Message *self, PyObject *new_val) {
        Py_INCREF(new_val);
        Py_XDECREF(self->value);
        self->value = new_val;

        Py_RETURN_NONE;
}
441 
/**
 * @brief set_key(): replace Message.key with \p new_key.
 *
 * Incref-before-decref ordering makes self-assignment safe (the
 * original decref'd the old key first).
 */
static PyObject *Message_set_key (Message *self, PyObject *new_key) {
        Py_INCREF(new_key);
        Py_XDECREF(self->key);
        self->key = new_key;

        Py_RETURN_NONE;
}
450 
/** @brief Method table for Message: read accessors plus set_*()
 *         mutators. */
static PyMethodDef Message_methods[] = {
	{ "error", (PyCFunction)Message_error, METH_NOARGS,
	  "  The message object is also used to propagate errors and events, "
	  "an application must check error() to determine if the Message "
	  "is a proper message (error() returns None) or an error or event "
	  "(error() returns a KafkaError object)\n"
	  "\n"
	  "  :rtype: None or :py:class:`KafkaError`\n"
	  "\n"
	},

	{ "value", (PyCFunction)Message_value, METH_NOARGS,
	  "  :returns: message value (payload) or None if not available.\n"
	  "  :rtype: str|bytes or None\n"
	  "\n"
	},
	{ "key", (PyCFunction)Message_key, METH_NOARGS,
	  "  :returns: message key or None if not available.\n"
	  "  :rtype: str|bytes or None\n"
	  "\n"
	},
	{ "topic", (PyCFunction)Message_topic, METH_NOARGS,
	  "  :returns: topic name or None if not available.\n"
	  "  :rtype: str or None\n"
	  "\n"
	},
	{ "partition", (PyCFunction)Message_partition, METH_NOARGS,
	  "  :returns: partition number or None if not available.\n"
	  "  :rtype: int or None\n"
	  "\n"
	},
	{ "offset", (PyCFunction)Message_offset, METH_NOARGS,
	  "  :returns: message offset or None if not available.\n"
	  "  :rtype: int or None\n"
	  "\n"
	},
	{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
          "Retrieve timestamp type and timestamp from message.\n"
          "The timestamp type is one of:\n"
          " * :py:const:`TIMESTAMP_NOT_AVAILABLE`"
          " - Timestamps not supported by broker\n"
          " * :py:const:`TIMESTAMP_CREATE_TIME` "
          " - Message creation time (or source / producer time)\n"
          " * :py:const:`TIMESTAMP_LOG_APPEND_TIME` "
          " - Broker receive time\n"
          "\n"
          "The returned timestamp should be ignored if the timestamp type is "
          ":py:const:`TIMESTAMP_NOT_AVAILABLE`.\n"
          "\n"
          "  The timestamp is the number of milliseconds since the epoch (UTC).\n"
          "\n"
          "  Timestamps require broker version 0.10.0.0 or later and \n"
          "  ``{'api.version.request': True}`` configured on the client.\n"
          "\n"
	  "  :returns: tuple of message timestamp type, and timestamp.\n"
	  "  :rtype: (int, int)\n"
	  "\n"
	},
	{ "headers", (PyCFunction)Message_headers, METH_NOARGS,
      "  Retrieve the headers set on a message. Each header is a key value"
      "pair. Please note that header keys are ordered and can repeat.\n"
      "\n"
	  "  :returns: list of two-tuples, one (key, value) pair for each header.\n"
	  "  :rtype: [(str, bytes),...] or None.\n"
	  "\n"
	},
	{ "set_headers", (PyCFunction)Message_set_headers, METH_O,
	  "  Set the field 'Message.headers' with new value.\n"
          "\n"
	  "  :param object value: Message.headers.\n"
	  "  :returns: None.\n"
	  "  :rtype: None\n"
	  "\n"
	},
	{ "set_value", (PyCFunction)Message_set_value, METH_O,
	  "  Set the field 'Message.value' with new value.\n"
          "\n"
	  "  :param object value: Message.value.\n"
	  "  :returns: None.\n"
	  "  :rtype: None\n"
	  "\n"
	},
	{ "set_key", (PyCFunction)Message_set_key, METH_O,
	  "  Set the field 'Message.key' with new value.\n"
          "\n"
	  "  :param object value: Message.key.\n"
	  "  :returns: None.\n"
	  "  :rtype: None\n"
	  "\n"
	},
	{ NULL }
};
543 
/**
 * @brief tp_clear: drop all references and C resources held by the
 *        Message.  Py_CLEAR() replaces the hand-rolled
 *        "if (x) { Py_DECREF(x); x = NULL; }" pattern and is safe
 *        against re-entrant deallocation.
 * @returns 0
 */
static int Message_clear (Message *self) {
	Py_CLEAR(self->topic);
	Py_CLEAR(self->value);
	Py_CLEAR(self->key);
	Py_CLEAR(self->error);
	Py_CLEAR(self->headers);
#ifdef RD_KAFKA_V_HEADERS
	if (self->c_headers) {
		rd_kafka_headers_destroy(self->c_headers);
		self->c_headers = NULL;
	}
#endif
	return 0;
}
573 
574 
/**
 * @brief tp_dealloc for Message: untrack from the GC *before* clearing
 *        references (the original cleared first, leaving a window in
 *        which the GC could traverse a half-torn-down object).
 */
static void Message_dealloc (Message *self) {
	PyObject_GC_UnTrack(self);
	Message_clear(self);
	Py_TYPE(self)->tp_free((PyObject *)self);
}
580 
581 
/**
 * @brief tp_traverse: visit all owned PyObject references.
 *        Py_VISIT() is a no-op on NULL, so the original's explicit
 *        NULL guards were redundant.
 */
static int Message_traverse (Message *self,
			     visitproc visit, void *arg) {
	Py_VISIT(self->topic);
	Py_VISIT(self->value);
	Py_VISIT(self->key);
	Py_VISIT(self->error);
	Py_VISIT(self->headers);
	return 0;
}
596 
Message__len__(Message * self)597 static Py_ssize_t Message__len__ (Message *self) {
598 	return self->value ? PyObject_Length(self->value) : 0;
599 }
600 
/** @brief Sequence protocol: only sq_length is provided, enabling
 *         len(message). */
static PySequenceMethods Message_seq_methods = {
	(lenfunc)Message__len__ /* sq_length */
};
604 
/**
 * @brief Type object for cimpl.Message.
 *
 * GC-enabled; exposes len() via Message_seq_methods and has no tp_new
 * (not user-instantiable — created via Message_new0()).
 */
PyTypeObject MessageType = {
	PyVarObject_HEAD_INIT(NULL, 0)
	"cimpl.Message",         /*tp_name*/
	sizeof(Message),       /*tp_basicsize*/
	0,                         /*tp_itemsize*/
	(destructor)Message_dealloc, /*tp_dealloc*/
	0,                         /*tp_print*/
	0,                         /*tp_getattr*/
	0,                         /*tp_setattr*/
	0,                         /*tp_compare*/
	0,                         /*tp_repr*/
	0,                         /*tp_as_number*/
	&Message_seq_methods,  /*tp_as_sequence*/
	0,                         /*tp_as_mapping*/
	0,                         /*tp_hash */
	0,                         /*tp_call*/
	0,                         /*tp_str*/
	PyObject_GenericGetAttr,                         /*tp_getattro*/
	0,                         /*tp_setattro*/
	0,                         /*tp_as_buffer*/
	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
	Py_TPFLAGS_HAVE_GC, /*tp_flags*/
	"The Message object represents either a single consumed or "
	"produced message, or an event (:py:func:`error()` is not None).\n"
	"\n"
	"An application must check with :py:func:`error()` to see if the "
	"object is a proper message (error() returns None) or an "
	"error/event.\n"
        "\n"
	"This class is not user-instantiable.\n"
	"\n"
	".. py:function:: len()\n"
	"\n"
	"  :returns: Message value (payload) size in bytes\n"
	"  :rtype: int\n"
	"\n", /*tp_doc*/
	(traverseproc)Message_traverse,        /* tp_traverse */
	(inquiry)Message_clear,	           /* tp_clear */
	0,		           /* tp_richcompare */
	0,		           /* tp_weaklistoffset */
	0,		           /* tp_iter */
	0,		           /* tp_iternext */
	Message_methods,       /* tp_methods */
	0,                         /* tp_members */
	0,                         /* tp_getset */
	0,                         /* tp_base */
	0,                         /* tp_dict */
	0,                         /* tp_descr_get */
	0,                         /* tp_descr_set */
	0,                         /* tp_dictoffset */
	0,                         /* tp_init */
	0                          /* tp_alloc */
};
658 
659 /**
660  * @brief Internal factory to create Message object from message_t
661  */
/**
 * @brief Internal factory to create Message object from message_t
 *
 * Copies topic/key/value/error/partition/offset/timestamp out of
 * \p rkm; the rd_kafka_message_t is not referenced after return.
 *
 * NOTE(review): the cfl_PyUnistr/cfl_PyBin allocations are not checked
 * for NULL; on allocation failure the corresponding field stays NULL
 * with an exception pending — confirm this is acceptable to callers.
 */
PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm) {
	Message *self;

	self = (Message *)MessageType.tp_alloc(&MessageType, 0);
	if (!self)
		return NULL;

        /* Only use message error string on Consumer, for Producers
         * it will contain the original message payload. */
        self->error = KafkaError_new_or_None(
                rkm->err,
                (rkm->err && handle->type != RD_KAFKA_PRODUCER) ?
                rd_kafka_message_errstr(rkm) : NULL);

	if (rkm->rkt)
		self->topic = cfl_PyUnistr(
			_FromString(rd_kafka_topic_name(rkm->rkt)));
	if (rkm->payload)
		self->value = cfl_PyBin(_FromStringAndSize(rkm->payload,
							   rkm->len));
	if (rkm->key)
		self->key = cfl_PyBin(
			_FromStringAndSize(rkm->key, rkm->key_len));

	self->partition = rkm->partition;
	self->offset = rkm->offset;

	self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);

	return (PyObject *)self;
}
693 
694 
695 
696 
697 /****************************************************************************
698  *
699  *
700  * TopicPartition
701  *
702  *
703  *
704  *
705  ****************************************************************************/
/**
 * @brief tp_clear: release the C topic string and the error reference.
 *        Py_CLEAR() replaces the manual decref-and-NULL pattern for the
 *        PyObject field.
 * @returns 0
 */
static int TopicPartition_clear (TopicPartition *self) {
	if (self->topic) {
		free(self->topic);
		self->topic = NULL;
	}
	Py_CLEAR(self->error);
	return 0;
}
717 
/**
 * @brief Populate a TopicPartition's fields.
 *
 * Copies \p topic and attaches a KafkaError (or None) for \p err.
 * NOTE(review): the strdup() result is not checked for NULL — confirm
 * the OOM policy for this module.
 */
static void TopicPartition_setup (TopicPartition *self, const char *topic,
				  int partition, long long offset,
				  rd_kafka_resp_err_t err) {
	self->topic = strdup(topic);
	self->partition = partition;
	self->offset = offset;
	self->error = KafkaError_new_or_None(err, NULL);
}
726 
727 
/** @brief tp_dealloc: untracks from the GC before releasing resources
 *         (correct ordering for GC-enabled types). */
static void TopicPartition_dealloc (TopicPartition *self) {
	PyObject_GC_UnTrack(self);

	TopicPartition_clear(self);

	Py_TYPE(self)->tp_free((PyObject *)self);
}
735 
736 
/**
 * @brief tp_init: TopicPartition(topic, [partition], [offset]).
 *
 * Defaults: partition=RD_KAFKA_PARTITION_UA,
 * offset=RD_KAFKA_OFFSET_INVALID; the error field is set to None.
 * @returns 0 on success, -1 on argument-parsing failure (exception set).
 */
static int TopicPartition_init (PyObject *self, PyObject *args,
				      PyObject *kwargs) {
	const char *topic;
	int partition = RD_KAFKA_PARTITION_UA;
	long long offset = RD_KAFKA_OFFSET_INVALID;
	static char *kws[] = { "topic",
			       "partition",
			       "offset",
			       NULL };

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws,
					 &topic, &partition, &offset))
		return -1;

	TopicPartition_setup((TopicPartition *)self,
			     topic, partition, offset, 0);

	return 0;
}
756 
757 
/** @brief tp_new: bare allocation; fields are filled in by tp_init or
 *         TopicPartition_setup().
 *  NOTE(review): tp_alloc is called with nitems=1 although tp_itemsize
 *  is 0; passing 0 would be conventional — confirm intent. */
static PyObject *TopicPartition_new (PyTypeObject *type, PyObject *args,
				     PyObject *kwargs) {
	PyObject *self = type->tp_alloc(type, 1);
	return self;
}
763 
764 
765 
/**
 * @brief tp_traverse: visit the only owned PyObject reference (error).
 *        Py_VISIT() is a no-op on NULL, so the original's explicit
 *        NULL guard was redundant.
 */
static int TopicPartition_traverse (TopicPartition *self,
				    visitproc visit, void *arg) {
	Py_VISIT(self->error);
	return 0;
}
772 
773 
/** @brief Member table: topic/error are read-only; partition/offset
 *         are writable attributes. */
static PyMemberDef TopicPartition_members[] = {
        { "topic", T_STRING, offsetof(TopicPartition, topic), READONLY,
          ":attribute topic: Topic name (string)" },
        { "partition", T_INT, offsetof(TopicPartition, partition), 0,
          ":attribute partition: Partition number (int)" },
        { "offset", T_LONGLONG, offsetof(TopicPartition, offset), 0,
          ":attribute offset: Offset (long)\n"
          "\n"
          "Either an absolute offset (>=0) or a logical offset: "
          " :py:const:`OFFSET_BEGINNING`,"
          " :py:const:`OFFSET_END`,"
          " :py:const:`OFFSET_STORED`,"
          " :py:const:`OFFSET_INVALID`\n"
        },
        { "error", T_OBJECT, offsetof(TopicPartition, error), READONLY,
          ":attribute error: Indicates an error (with :py:class:`KafkaError`) unless None." },
        { NULL }
};
792 
793 
/**
 * @brief tp_repr:
 *        "TopicPartition{topic=..,partition=..,offset=..,error=..}".
 *        The error text is rendered via its unicode form when error is
 *        not None, else "None".
 */
static PyObject *TopicPartition_str0 (TopicPartition *self) {
        PyObject *errstr = NULL;
        PyObject *errstr8 = NULL;
        const char *c_errstr = NULL;
	PyObject *ret;
	char offset_str[40];

	snprintf(offset_str, sizeof(offset_str), "%"CFL_PRId64"", self->offset);

        if (self->error != Py_None) {
                errstr = cfl_PyObject_Unistr(self->error);
                c_errstr = cfl_PyUnistr_AsUTF8(errstr, &errstr8);
        }

	ret = cfl_PyUnistr(
		_FromFormat("TopicPartition{topic=%s,partition=%"CFL_PRId32
			    ",offset=%s,error=%s}",
			    self->topic, self->partition,
			    offset_str,
			    c_errstr ? c_errstr : "None"));
        Py_XDECREF(errstr8);
        Py_XDECREF(errstr);
	return ret;
}
818 
819 
/**
 * @brief tp_richcompare: orders by topic name (strcmp), then partition.
 *
 * NOTE(review): comparing against a non-TopicPartition raises
 * NotImplementedError; the conventional rich-compare protocol would
 * return Py_NotImplemented instead so Python can try the reflected
 * operation — confirm before changing, callers may rely on the raise.
 */
static PyObject *
TopicPartition_richcompare (TopicPartition *self, PyObject *o2,
			    int op) {
	TopicPartition *a = self, *b;
	int tr, pr;
	int r;
	PyObject *result;

	if (Py_TYPE(o2) != Py_TYPE(self)) {
		PyErr_SetNone(PyExc_NotImplementedError);
		return NULL;
	}

	b = (TopicPartition *)o2;

	tr = strcmp(a->topic, b->topic);
	pr = a->partition - b->partition;
	switch (op)
	{
	case Py_LT:
		r = tr < 0 || (tr == 0 && pr < 0);
		break;
	case Py_LE:
		r = tr < 0 || (tr == 0 && pr <= 0);
		break;
	case Py_EQ:
		r = (tr == 0 && pr == 0);
		break;
	case Py_NE:
		r = (tr != 0 || pr != 0);
		break;
	case Py_GT:
		r = tr > 0 || (tr == 0 && pr > 0);
		break;
	case Py_GE:
		r = tr > 0 || (tr == 0 && pr >= 0);
		break;
	default:
		r = 0;
		break;
	}

	result = r ? Py_True : Py_False;
	Py_INCREF(result);
	return result;
}
866 
867 
/** @brief tp_hash: combines the topic string's hash with the partition,
 *         consistent with the topic+partition equality in richcompare.
 *  NOTE(review): the cfl_PyUnistr() allocation is not checked for NULL
 *  before PyObject_Hash() — confirm failure cannot occur here. */
static long TopicPartition_hash (TopicPartition *self) {
	PyObject *topic = cfl_PyUnistr(_FromString(self->topic));
	long r = PyObject_Hash(topic) ^ self->partition;
	Py_DECREF(topic);
	return r;
}
874 
875 
/**
 * @brief Type object for cimpl.TopicPartition.
 *
 * GC-enabled and user-instantiable (tp_new/tp_init provided); ordering,
 * hashing and repr are implemented via the slots below.
 */
PyTypeObject TopicPartitionType = {
	PyVarObject_HEAD_INIT(NULL, 0)
	"cimpl.TopicPartition",         /*tp_name*/
	sizeof(TopicPartition),       /*tp_basicsize*/
	0,                         /*tp_itemsize*/
	(destructor)TopicPartition_dealloc, /*tp_dealloc*/
	0,                         /*tp_print*/
	0,                         /*tp_getattr*/
	0,                         /*tp_setattr*/
	0,                         /*tp_compare*/
	(reprfunc)TopicPartition_str0, /*tp_repr*/
	0,                         /*tp_as_number*/
	0,                         /*tp_as_sequence*/
	0,                         /*tp_as_mapping*/
	(hashfunc)TopicPartition_hash, /*tp_hash */
	0,                         /*tp_call*/
	0,                         /*tp_str*/
	PyObject_GenericGetAttr,   /*tp_getattro*/
	0,                         /*tp_setattro*/
	0,                         /*tp_as_buffer*/
	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
	Py_TPFLAGS_HAVE_GC, /*tp_flags*/
	"TopicPartition is a generic type to hold a single partition and "
	"various information about it.\n"
	"\n"
	"It is typically used to provide a list of topics or partitions for "
	"various operations, such as :py:func:`Consumer.assign()`.\n"
	"\n"
	".. py:function:: TopicPartition(topic, [partition], [offset])\n"
	"\n"
	"  Instantiate a TopicPartition object.\n"
	"\n"
	"  :param string topic: Topic name\n"
	"  :param int partition: Partition id\n"
	"  :param int offset: Initial partition offset\n"
	"  :rtype: TopicPartition\n"
	"\n"
	"\n", /*tp_doc*/
	(traverseproc)TopicPartition_traverse, /* tp_traverse */
	(inquiry)TopicPartition_clear,       /* tp_clear */
	(richcmpfunc)TopicPartition_richcompare, /* tp_richcompare */
	0,		           /* tp_weaklistoffset */
	0,		           /* tp_iter */
	0,		           /* tp_iternext */
	0,                         /* tp_methods */
	TopicPartition_members,/* tp_members */
	0,                         /* tp_getset */
	0,                         /* tp_base */
	0,                         /* tp_dict */
	0,                         /* tp_descr_get */
	0,                         /* tp_descr_set */
	0,                         /* tp_dictoffset */
	TopicPartition_init,       /* tp_init */
	0,                         /* tp_alloc */
	TopicPartition_new         /* tp_new */
};
932 
933 /**
934  * @brief Internal factory to create a TopicPartition object.
935  */
TopicPartition_new0(const char * topic,int partition,long long offset,rd_kafka_resp_err_t err)936 static PyObject *TopicPartition_new0 (const char *topic, int partition,
937 				      long long offset,
938 				      rd_kafka_resp_err_t err) {
939 	TopicPartition *self;
940 
941 	self = (TopicPartition *)TopicPartitionType.tp_new(
942 		&TopicPartitionType, NULL, NULL);
943 
944 	TopicPartition_setup(self, topic, partition, offset, err);
945 
946 	return (PyObject *)self;
947 }
948 
949 
950 
951 
952 /**
953  * @brief Convert C rd_kafka_topic_partition_list_t to Python list(TopicPartition).
954  *
955  * @returns The new Python list object.
956  */
c_parts_to_py(const rd_kafka_topic_partition_list_t * c_parts)957 PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
958 	PyObject *parts;
959 	size_t i;
960 
961 	parts = PyList_New(c_parts->cnt);
962 
963 	for (i = 0 ; i < (size_t)c_parts->cnt ; i++) {
964 		const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
965 		PyList_SET_ITEM(parts, i,
966 				TopicPartition_new0(
967 					rktpar->topic, rktpar->partition,
968 					rktpar->offset, rktpar->err));
969 	}
970 
971 	return parts;
972 
973 }
974 
975 /**
976  * @brief Convert Python list(TopicPartition) to C rd_kafka_topic_partition_list_t.
977  *
978  * @returns The new C list on success or NULL on error.
979  */
py_to_c_parts(PyObject * plist)980 rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
981 	rd_kafka_topic_partition_list_t *c_parts;
982 	size_t i;
983 
984 	if (!PyList_Check(plist)) {
985 		PyErr_SetString(PyExc_TypeError,
986 				"requires list of TopicPartition");
987 		return NULL;
988 	}
989 
990 	c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));
991 
992 	for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
993 		TopicPartition *tp = (TopicPartition *)
994 			PyList_GetItem(plist, i);
995 
996 		if (PyObject_Type((PyObject *)tp) !=
997 		    (PyObject *)&TopicPartitionType) {
998 			PyErr_Format(PyExc_TypeError,
999 				     "expected %s",
1000 				     TopicPartitionType.tp_name);
1001 			rd_kafka_topic_partition_list_destroy(c_parts);
1002 			return NULL;
1003 		}
1004 
1005 		rd_kafka_topic_partition_list_add(c_parts,
1006 						  tp->topic,
1007 						  tp->partition)->offset =
1008 			tp->offset;
1009 	}
1010 
1011 	return c_parts;
1012 }
1013 
1014 #ifdef RD_KAFKA_V_HEADERS
1015 
1016 
1017 /**
1018  * @brief Translate Python \p key and \p value to C types and set on
1019  *        provided \p rd_headers object.
1020  *
1021  * @returns 1 on success or 0 if an exception was raised.
1022  */
static int py_header_to_c (rd_kafka_headers_t *rd_headers,
                           PyObject *key, PyObject *value) {
        PyObject *ks, *ks8 = NULL, *vo8 = NULL;
        const char *k;
        const void *v = NULL;
        Py_ssize_t vsize = 0;
        rd_kafka_resp_err_t err;

        if (!(ks = cfl_PyObject_Unistr(key))) {
                PyErr_SetString(PyExc_TypeError,
                                "expected header key to be unicode "
                                "string");
                return 0;
        }

        /* k borrows from ks/ks8: keep both alive until after header_add. */
        k = cfl_PyUnistr_AsUTF8(ks, &ks8);

        if (value != Py_None) {
                if (cfl_PyBin(_Check(value))) {
                        /* Proper binary */
                        if (cfl_PyBin(_AsStringAndSize(value, (char **)&v,
                                                       &vsize)) == -1)
                                goto fail;
                } else if (cfl_PyUnistr(_Check(value))) {
                        /* Unicode string, translate to utf-8. */
                        v = cfl_PyUnistr_AsUTF8(value, &vo8);
                        if (!v)
                                goto fail;
                        vsize = (Py_ssize_t)strlen(v);
                } else {
                        /* Py_TYPE() borrows a reference; PyObject_Type()
                         * previously leaked a new reference here. */
                        PyErr_Format(PyExc_TypeError,
                                     "expected header value to be "
                                     "None, binary, or unicode string, not %s",
                                     Py_TYPE(value)->tp_name);
                        goto fail;
                }
        }

        if ((err = rd_kafka_header_add(rd_headers, k, -1, v, vsize))) {
                cfl_PyErr_Format(err,
                                 "Unable to add message header \"%s\": "
                                 "%s",
                                 k, rd_kafka_err2str(err));
                goto fail;
        }

        Py_DECREF(ks);
        Py_XDECREF(ks8);
        Py_XDECREF(vo8);

        return 1;

 fail:
        /* Common cleanup for all error paths (vo8 is NULL until set). */
        Py_DECREF(ks);
        Py_XDECREF(ks8);
        Py_XDECREF(vo8);
        return 0;
}
1087 
1088 /**
1089  * @brief Convert Python list of tuples to rd_kafka_headers_t
1090  *
 * Header names must be unicode strings.
1092  * Header values may be None, binary or unicode string, the latter is
1093  * automatically encoded as utf-8.
1094  */
py_headers_list_to_c(PyObject * hdrs)1095 static rd_kafka_headers_t *py_headers_list_to_c (PyObject *hdrs) {
1096         int i, len;
1097         rd_kafka_headers_t *rd_headers = NULL;
1098 
1099         len = (int)PyList_Size(hdrs);
1100         rd_headers = rd_kafka_headers_new(len);
1101 
1102         for (i = 0; i < len; i++) {
1103                 PyObject *tuple = PyList_GET_ITEM(hdrs, i);
1104 
1105                 if (!PyTuple_Check(tuple) || PyTuple_Size(tuple) != 2) {
1106                         rd_kafka_headers_destroy(rd_headers);
1107                         PyErr_SetString(PyExc_TypeError,
1108                                         "Headers are expected to be a "
1109                                         "list of (key, value) tuples");
1110                         return NULL;
1111                 }
1112 
1113                 if (!py_header_to_c(rd_headers,
1114                                     PyTuple_GET_ITEM(tuple, 0),
1115                                     PyTuple_GET_ITEM(tuple, 1))) {
1116                         rd_kafka_headers_destroy(rd_headers);
1117                         return NULL;
1118                 }
1119         }
1120         return rd_headers;
1121 }
1122 
1123 
1124 /**
1125  * @brief Convert Python dict to rd_kafka_headers_t
1126  */
py_headers_dict_to_c(PyObject * hdrs)1127 static rd_kafka_headers_t *py_headers_dict_to_c (PyObject *hdrs) {
1128         int len;
1129         Py_ssize_t pos = 0;
1130         rd_kafka_headers_t *rd_headers = NULL;
1131         PyObject *ko, *vo;
1132 
1133         len = (int)PyDict_Size(hdrs);
1134         rd_headers = rd_kafka_headers_new(len);
1135 
1136         while (PyDict_Next(hdrs, &pos, &ko, &vo)) {
1137 
1138                 if (!py_header_to_c(rd_headers, ko, vo)) {
1139                         rd_kafka_headers_destroy(rd_headers);
1140                         return NULL;
1141                 }
1142         }
1143 
1144         return rd_headers;
1145 }
1146 
1147 
1148 /**
1149  * @brief Convert Python list[(header_key, header_value),...]) to C rd_kafka_topic_partition_list_t.
1150  *
1151  * @returns The new Python list[(header_key, header_value),...] object.
1152  */
py_headers_to_c(PyObject * hdrs)1153 rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs) {
1154 
1155         if (PyList_Check(hdrs)) {
1156                 return py_headers_list_to_c(hdrs);
1157         } else if (PyDict_Check(hdrs)) {
1158                 return py_headers_dict_to_c(hdrs);
1159         } else {
1160                 PyErr_Format(PyExc_TypeError,
1161                              "expected headers to be "
1162                              "dict or list of (key, value) tuples, not %s",
1163                              ((PyTypeObject *)PyObject_Type(hdrs))->tp_name);
1164                 return NULL;
1165         }
1166 }
1167 
1168 
1169 /**
1170  * @brief Convert rd_kafka_headers_t to Python list[(header_key, header_value),...])
1171  *
1172  * @returns The new C headers on success or NULL on error.
1173  */
c_headers_to_py(rd_kafka_headers_t * headers)1174 PyObject *c_headers_to_py (rd_kafka_headers_t *headers) {
1175     size_t idx = 0;
1176     size_t header_size = 0;
1177     const char *header_key;
1178     const void *header_value;
1179     size_t header_value_size;
1180     PyObject *header_list;
1181 
1182     header_size = rd_kafka_header_cnt(headers);
1183     header_list = PyList_New(header_size);
1184 
1185     while (!rd_kafka_header_get_all(headers, idx++,
1186                                      &header_key, &header_value, &header_value_size)) {
1187             // Create one (key, value) tuple for each header
1188             PyObject *header_tuple = PyTuple_New(2);
1189             PyTuple_SetItem(header_tuple, 0,
1190                 cfl_PyUnistr(_FromString(header_key))
1191             );
1192 
1193             if (header_value) {
1194                     PyTuple_SetItem(header_tuple, 1,
1195                         cfl_PyBin(_FromStringAndSize(header_value, header_value_size))
1196                     );
1197             } else {
1198                 PyTuple_SetItem(header_tuple, 1, Py_None);
1199             }
1200         PyList_SET_ITEM(header_list, idx-1, header_tuple);
1201     }
1202 
1203     return header_list;
1204 }
1205 #endif
1206 
1207 
1208 /****************************************************************************
1209  *
1210  *
1211  * Common callbacks
1212  *
1213  *
1214  *
1215  *
1216  ****************************************************************************/
/**
 * @brief librdkafka error callback trampoline.
 *
 * Dispatches non-fatal errors to the application's configured error_cb
 * (if any). Fatal errors bypass the callback and instead crash the
 * CallState so an exception is raised to the application.
 */
static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
	Handle *h = opaque;
	PyObject *eo, *result;
	CallState *cs;

	/* Re-acquire the GIL and per-call state for this thread. */
	cs = CallState_get(h);

        /* If the client raised a fatal error we'll raise an exception
         * rather than calling the error callback. */
        if (err == RD_KAFKA_RESP_ERR__FATAL) {
                char errstr[512];
                /* Retrieve the underlying fatal error code and string. */
                err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
                cfl_PyErr_Fatal(err, errstr);
                goto crash;
        }

	if (!h->error_cb) {
		/* No callback defined */
		goto done;
	}

	/* Wrap the error in a KafkaError object and invoke the callback. */
	eo = KafkaError_new0(err, "%s", reason);
	result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
	Py_DECREF(eo);

	if (result)
		Py_DECREF(result);
	else {
        crash:
		/* Callback raised (or fatal error): mark the call state as
		 * crashed and abort the current librdkafka call via
		 * rd_kafka_yield() so the exception propagates to Python. */
		CallState_crash(cs);
		rd_kafka_yield(h->rk);
	}

 done:
	CallState_resume(cs);
}
1253 
1254 /**
1255  * @brief librdkafka throttle callback triggered by poll() or flush(), triggers the
1256  *        corresponding Python throttle_cb
1257  */
static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker_id,
                         int throttle_time_ms, void *opaque) {
        Handle *h = opaque;
        PyObject *ThrottleEvent_type, *throttle_event;
        PyObject *result, *args;
        CallState *cs;

        cs = CallState_get(h);
        if (!h->throttle_cb) {
                /* No callback defined */
                goto done;
        }

        ThrottleEvent_type = cfl_PyObject_lookup("confluent_kafka",
                                                 "ThrottleEvent");

        if (!ThrottleEvent_type) {
                /* ThrottleEvent class not found */
                goto err;
        }

        /* throttle_time_ms is exposed to Python in seconds (float). */
        args = Py_BuildValue("(sid)", broker_name, broker_id,
                             (double)throttle_time_ms/1000);
        if (!args) {
                /* Fix: previously a failed Py_BuildValue() passed NULL
                 * args to PyObject_Call() and leaked ThrottleEvent_type. */
                Py_DECREF(ThrottleEvent_type);
                goto err;
        }

        throttle_event = PyObject_Call(ThrottleEvent_type, args, NULL);

        Py_DECREF(args);
        Py_DECREF(ThrottleEvent_type);

        if (!throttle_event) {
                /* Failed to instantiate ThrottleEvent object */
                goto err;
        }

        result = PyObject_CallFunctionObjArgs(h->throttle_cb, throttle_event,
                                              NULL);

        Py_DECREF(throttle_event);

        if (result) {
                /* throttle_cb executed successfully */
                Py_DECREF(result);
                goto done;
        }

        /**
        * Stop callback dispatcher, return err to application
        * fall-through to unlock GIL
        */
 err:
        CallState_crash(cs);
        rd_kafka_yield(h->rk);
 done:
        CallState_resume(cs);
}
1310 
/**
 * @brief librdkafka statistics callback trampoline: forwards the stats
 *        JSON string to the application's stats_cb.
 */
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
	Handle *h = opaque;
	CallState *cs = CallState_get(h);

	if (json_len > 0) {
		PyObject *jo = Py_BuildValue("s", json);
		PyObject *ret = PyObject_CallFunctionObjArgs(h->stats_cb,
							     jo, NULL);
		Py_DECREF(jo);

		if (ret)
			Py_DECREF(ret);
		else {
			/* Callback raised: crash call state and abort the
			 * current librdkafka call. */
			CallState_crash(cs);
			rd_kafka_yield(h->rk);
		}
	}
	/* json_len == 0: no data returned, nothing to dispatch. */

	CallState_resume(cs);
	return 0;
}
1337 
log_cb(const rd_kafka_t * rk,int level,const char * fac,const char * buf)1338 static void log_cb (const rd_kafka_t *rk, int level,
1339                     const char *fac, const char *buf) {
1340         Handle *h = rd_kafka_opaque(rk);
1341         PyObject *result;
1342         CallState *cs;
1343         static const int level_map[8] = {
1344                 /* Map syslog levels to python logging levels */
1345                 50, /* LOG_EMERG   -> logging.CRITICAL */
1346                 50, /* LOG_ALERT   -> logging.CRITICAL */
1347                 50, /* LOG_CRIT    -> logging.CRITICAL */
1348                 40, /* LOG_ERR     -> logging.ERROR */
1349                 30, /* LOG_WARNING -> logging.WARNING */
1350                 20, /* LOG_NOTICE  -> logging.INFO */
1351                 20, /* LOG_INFO    -> logging.INFO */
1352                 10, /* LOG_DEBUG   -> logging.DEBUG */
1353         };
1354 
1355         cs = CallState_get(h);
1356         result = PyObject_CallMethod(h->logger, "log", "issss",
1357                                      level_map[level],
1358                                      "%s [%s] %s",
1359                                      fac, rd_kafka_name(rk), buf);
1360 
1361         if (result)
1362                 Py_DECREF(result);
1363         else {
1364                 CallState_crash(cs);
1365                 rd_kafka_yield(h->rk);
1366         }
1367 
1368         CallState_resume(cs);
1369 }
1370 
1371 /****************************************************************************
1372  *
1373  *
1374  * Common helpers
1375  *
1376  *
1377  *
1378  *
1379  ****************************************************************************/
1380 
1381 
1382 
1383 /**
1384  * Clear Python object references in Handle
1385  */
void Handle_clear (Handle *h) {
        /* Py_CLEAR() performs the NULL check, DECREF and pointer reset
         * in one step (no-op when the member is already NULL). */
        Py_CLEAR(h->error_cb);
        Py_CLEAR(h->throttle_cb);
        Py_CLEAR(h->stats_cb);
        Py_CLEAR(h->logger);

        /* Release thread-local storage only if setup completed. */
        if (h->initiated) {
#ifdef WITH_PY_TSS
                PyThread_tss_delete(&h->tlskey);
#else
                PyThread_delete_key(h->tlskey);
#endif
        }
}
1415 
1416 /**
1417  * GC traversal for Python object references
1418  */
/**
 * GC traversal for Python object references held by the Handle.
 * Py_VISIT() is a no-op on NULL pointers, so no explicit guards are
 * needed; it returns from this function on visitor error.
 */
int Handle_traverse (Handle *h, visitproc visit, void *arg) {
	Py_VISIT(h->error_cb);
	Py_VISIT(h->throttle_cb);
	Py_VISIT(h->stats_cb);
	/* The logger is also a Python reference owned by the Handle
	 * (cleared in Handle_clear): report it to the GC as well so
	 * reference cycles through the logger can be collected. */
	Py_VISIT(h->logger);

	return 0;
}
1431 
1432 /**
1433  * @brief Set single special producer config value.
1434  *
1435  * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
1436  */
static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
				      const char *name, PyObject *valobj) {

	if (!strcmp(name, "on_delivery")) {
		if (!PyCallable_Check(valobj)) {
			cfl_PyErr_Format(
				RD_KAFKA_RESP_ERR__INVALID_ARG,
				"%s requires a callable "
				"object", name);
			return -1;
		}

		/* Hold a reference to the default delivery report cb. */
		Py_INCREF(valobj);
		self->u.Producer.default_dr_cb = valobj;

		return 1;
	}

	if (!strcmp(name, "delivery.report.only.error")) {
		/* Since we allocate msgstate for each produced message
		 * with a callback we can't use delivery.report.only.error
		 * as-is, as we wouldn't be able to ever free those msgstates.
		 * Instead we shortcut this setting in the Python client,
		 * providing the same functionality from dr_msg_cb trampoline.
		 */
		return cfl_PyBool_get(valobj, name,
				      &self->u.Producer.dr_only_error) ?
			1 : -1;
	}

	return 0; /* Not handled */
}
1471 
1472 
1473 /**
1474  * @brief Set single special consumer config value.
1475  *
1476  * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
1477  */
static int consumer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
				      const char *name, PyObject *valobj) {

	if (strcmp(name, "on_commit"))
		return 0; /* Not handled */

	if (!PyCallable_Check(valobj)) {
		cfl_PyErr_Format(
			RD_KAFKA_RESP_ERR__INVALID_ARG,
			"%s requires a callable "
			"object", name);
		return -1;
	}

	/* Hold a reference to the commit callback. */
	Py_INCREF(valobj);
	self->u.Consumer.on_commit = valobj;

	return 1;
}
1498 
1499 /**
1500  * @brief Call out to __init__.py _resolve_plugins() to see if any
1501  *        of the specified `plugin.library.paths` are found in the
1502  *        wheel's embedded library directory, and if so change the
1503  *        path to use these libraries.
1504  *
1505  * @returns a possibly updated plugin.library.paths string object which
1506  *          must be DECREF:ed, or NULL if an exception was raised.
1507  */
static PyObject *resolve_plugins (PyObject *plugins) {
        PyObject *module, *function;
        PyObject *resolved = NULL;

        module = PyImport_ImportModule("confluent_kafka");
        if (!module)
                return NULL;

        function = PyObject_GetAttrString(module, "_resolve_plugins");
        if (function) {
                resolved = PyObject_CallFunctionObjArgs(function, plugins,
                                                        NULL);
                Py_DECREF(function);

                if (!resolved)
                        PyErr_SetString(PyExc_RuntimeError,
                                        "confluent_kafka._resolve_plugins() "
                                        "failed");
        } else {
                PyErr_SetString(PyExc_RuntimeError,
                                "confluent_kafka._resolve_plugins() "
                                "not found");
        }

        Py_DECREF(module);

        /* NULL on any failure above (exception set), otherwise a new
         * reference the caller must DECREF. */
        return resolved;
}
1537 
1538 /**
1539  * @brief Remove property from confidct and set rd_kafka_conf with its value
1540  *
1541  * @param vo The property value object
1542  *
1543  * @returns 1 on success or 0 on failure (exception raised).
1544  */
static int common_conf_set_special(PyObject *confdict, rd_kafka_conf_t *conf,
                                   const char *name, PyObject *vo) {
        const char *v;
        char errstr[256];
        PyObject *vs;
        PyObject *vs8 = NULL;

        if (!(vs = cfl_PyObject_Unistr(vo))) {
                PyErr_Format(PyExc_TypeError, "expected configuration property %s "
                             "as type unicode string", name);
                return 0;
        }

        v = cfl_PyUnistr_AsUTF8(vs, &vs8);
        if (!v) {
                /* UTF-8 conversion failed (exception presumably set by
                 * cfl_PyUnistr_AsUTF8): don't pass NULL to librdkafka. */
                Py_DECREF(vs);
                Py_XDECREF(vs8);
                return 0;
        }

        if (rd_kafka_conf_set(conf, name, v, errstr, sizeof(errstr))
            != RD_KAFKA_CONF_OK) {
                cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                 "%s", errstr);
                Py_DECREF(vs);
                Py_XDECREF(vs8);
                return 0;
        }

        Py_DECREF(vs);
        Py_XDECREF(vs8);
        /* Property has been consumed: remove it from the dict copy so the
         * generic config loop does not set it again. */
        PyDict_DelItemString(confdict, name);
        return 1;
}
1573 
1574 /**
1575  * Common config setup for Kafka client handles.
1576  *
1577  * Returns a conf object on success or NULL on failure in which case
1578  * an exception has been raised.
1579  */
rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
				    Handle *h,
				    PyObject *args,
				    PyObject *kwargs) {
	rd_kafka_conf_t *conf;
	Py_ssize_t pos = 0;
	PyObject *ko, *vo;
        PyObject *confdict = NULL;

        /* Refuse to run against an older librdkafka than the binding
         * requires. */
        if (rd_kafka_version() < MIN_RD_KAFKA_VERSION) {
                PyErr_Format(PyExc_RuntimeError,
                             "%s: librdkafka version %s (0x%x) detected",
                             MIN_VER_ERRSTR, rd_kafka_version_str(),
                             rd_kafka_version());
                return NULL;
        }

        /* Supported parameter constellations:
         *  - kwargs (conf={..}, logger=..)
         *  - args and kwargs ({..}, logger=..)
         *  - args ({..})
         * When both args and kwargs are present the kwargs take
         * precedence in case of duplicate keys.
         * All keys map to configuration properties.
         *
         * Copy configuration dict to avoid manipulating application config.
         */
        if (args && PyTuple_Size(args)) {
                if (!PyTuple_Check(args) ||
                    PyTuple_Size(args) > 1) {
                        PyErr_SetString(PyExc_TypeError,
                                        "expected tuple containing single dict");
                        return NULL;
                } else if (PyTuple_Size(args) == 1 &&
                           !PyDict_Check((confdict = PyTuple_GetItem(args, 0)))) {
                                PyErr_SetString(PyExc_TypeError,
                                                "expected configuration dict");
                                return NULL;
                }
                /* confdict is a borrowed item above; replace it with an
                 * owned copy we are free to mutate. */
                confdict = PyDict_Copy(confdict);
        }

        if (!confdict) {
                if (!kwargs) {
                        PyErr_SetString(PyExc_TypeError,
                                        "expected configuration dict");
                        return NULL;
                }

                confdict = PyDict_Copy(kwargs);

        } else if (kwargs) {
                /* Update confdict with kwargs */
                PyDict_Update(confdict, kwargs);
        }

        if (ktype == RD_KAFKA_CONSUMER &&
                !PyDict_GetItemString(confdict, "group.id")) {

                PyErr_SetString(PyExc_ValueError,
                                "Failed to create consumer: group.id must be set");
                Py_DECREF(confdict);
                return NULL;
        }

	/* Create the librdkafka config object; ownership transfers to the
	 * caller on success, destroyed on any error path below. */
	conf = rd_kafka_conf_new();

        /*
         * Set debug contexts first to capture all events including plugin loading
         */
         if ((vo = PyDict_GetItemString(confdict, "debug")) &&
              !common_conf_set_special(confdict, conf, "debug", vo))
                        goto outer_err;

        /*
         * Plugins must be configured prior to handling any of their
         * configuration properties.
         * Dicts are unordered so we explicitly check for, set, and delete the
         * plugin paths here.
         * This ensures plugin configuration properties are handled in the
         * correct order.
         */
        if ((vo = PyDict_GetItemString(confdict, "plugin.library.paths"))) {
                /* Resolve plugin paths */
                PyObject *resolved;

                resolved = resolve_plugins(vo);
                if (!resolved)
                        goto outer_err;

                if (!common_conf_set_special(confdict, conf,
                                             "plugin.library.paths",
                                             resolved)) {
                        Py_DECREF(resolved);
                        goto outer_err;
                }
                Py_DECREF(resolved);
        }

        if ((vo = PyDict_GetItemString(confdict, "default.topic.config"))) {
        /* TODO: uncomment for 1.0 release
                PyErr_Warn(PyExc_DeprecationWarning,
                             "default.topic.config has being deprecated, "
                             "set default topic configuration values in the global dict");
        */
                /* Flatten the legacy sub-dict into the top-level dict. */
                if (PyDict_Update(confdict, vo) == -1) {
                        goto outer_err;
                }
                PyDict_DelItemString(confdict, "default.topic.config");
        }

	/* Convert config dict to config key-value pairs. */
	while (PyDict_Next(confdict, &pos, &ko, &vo)) {
		PyObject *ks;
		PyObject *ks8 = NULL;
		PyObject *vs = NULL, *vs8 = NULL;
		const char *k;
		const char *v;
		char errstr[256];
                int r = 0;

		if (!(ks = cfl_PyObject_Unistr(ko))) {
                        PyErr_SetString(PyExc_TypeError,
                                        "expected configuration property name "
                                        "as type unicode string");
                        goto inner_err;
		}

		/* k borrows from ks/ks8: keep them alive until cleanup. */
		k = cfl_PyUnistr_AsUTF8(ks, &ks8);
		/* Binding-level callback properties are stored on the
		 * Handle rather than passed to librdkafka. */
		if (!strcmp(k, "error_cb")) {
			if (!PyCallable_Check(vo)) {
				PyErr_SetString(PyExc_TypeError,
						"expected error_cb property "
						"as a callable function");
                                goto inner_err;
                        }
			if (h->error_cb) {
				Py_DECREF(h->error_cb);
				h->error_cb = NULL;
			}
			if (vo != Py_None) {
				h->error_cb = vo;
				Py_INCREF(h->error_cb);
			}
                        Py_XDECREF(ks8);
			Py_DECREF(ks);
			continue;
                } else if (!strcmp(k, "throttle_cb")) {
                        if (!PyCallable_Check(vo)) {
                                PyErr_SetString(PyExc_ValueError,
                                        "expected throttle_cb property "
                                        "as a callable function");
                                goto inner_err;
                        }
                        if (h->throttle_cb) {
                                Py_DECREF(h->throttle_cb);
                                h->throttle_cb = NULL;
                        }
                        if (vo != Py_None) {
                                h->throttle_cb = vo;
                                Py_INCREF(h->throttle_cb);
                        }
                        Py_XDECREF(ks8);
                        Py_DECREF(ks);
                        continue;
		} else if (!strcmp(k, "stats_cb")) {
			if (!PyCallable_Check(vo)) {
				PyErr_SetString(PyExc_TypeError,
						"expected stats_cb property "
						"as a callable function");
                                goto inner_err;
                        }

			if (h->stats_cb) {
				Py_DECREF(h->stats_cb);
				h->stats_cb = NULL;
			}
			if (vo != Py_None) {
				h->stats_cb = vo;
				Py_INCREF(h->stats_cb);
			}
                        Py_XDECREF(ks8);
			Py_DECREF(ks);
			continue;
                } else if (!strcmp(k, "logger")) {
                        if (h->logger) {
                                Py_DECREF(h->logger);
                                h->logger = NULL;
                        }

                        if (vo != Py_None) {
                                h->logger = vo;
                                Py_INCREF(h->logger);
                        }
                        Py_XDECREF(ks8);
                        Py_DECREF(ks);
                        continue;
                }

		/* Special handling for certain config keys. */
		if (ktype == RD_KAFKA_PRODUCER)
			r = producer_conf_set_special(h, conf, k, vo);
		else if (ktype == RD_KAFKA_CONSUMER)
			r = consumer_conf_set_special(h, conf, k, vo);
		if (r == -1) {
			/* Error */
                        goto inner_err;
		} else if (r == 1) {
			/* Handled */
			continue;
		}


		/*
		 * Pass configuration property through to librdkafka.
		 */
                if (vo == Py_None) {
                        v = NULL;
                } else {
                        if (!(vs = cfl_PyObject_Unistr(vo))) {
                                PyErr_SetString(PyExc_TypeError,
                                                "expected configuration "
                                                "property value as type "
                                                "unicode string");
                                goto inner_err;
                        }
                        v = cfl_PyUnistr_AsUTF8(vs, &vs8);
                }

		if (rd_kafka_conf_set(conf, k, v, errstr, sizeof(errstr)) !=
		    RD_KAFKA_CONF_OK) {
			cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
					  "%s", errstr);
                        goto inner_err;
		}

                Py_XDECREF(vs8);
                Py_XDECREF(vs);
                Py_XDECREF(ks8);
		Py_DECREF(ks);
                continue;

inner_err:
                /* Per-iteration cleanup before bailing out of the loop. */
                Py_XDECREF(vs8);
                Py_XDECREF(vs);
                Py_XDECREF(ks8);
                Py_XDECREF(ks);
                goto outer_err;
        }

        Py_DECREF(confdict);

        /* error_cb is always installed: it is also used to surface fatal
         * errors even when no Python error_cb was configured. */
        rd_kafka_conf_set_error_cb(conf, error_cb);

        if (h->throttle_cb)
                rd_kafka_conf_set_throttle_cb(conf, throttle_cb);

	if (h->stats_cb)
		rd_kafka_conf_set_stats_cb(conf, stats_cb);

        if (h->logger) {
                /* Write logs to log queue (which is forwarded
                 * to the polled queue in the Producer/Consumer constructors) */
                rd_kafka_conf_set(conf, "log.queue", "true", NULL, 0);
                rd_kafka_conf_set_log_cb(conf, log_cb);
        }

	rd_kafka_conf_set_opaque(conf, h);

#ifdef WITH_PY_TSS
        if (PyThread_tss_create(&h->tlskey)) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Failed to initialize thread local storage");
                rd_kafka_conf_destroy(conf);
                return NULL;
        }
#else
        h->tlskey = PyThread_create_key();
#endif

        h->initiated = 1;

	return conf;

outer_err:
        /* Error exit: release the dict copy and the conf object;
         * exception has already been set. */
        Py_DECREF(confdict);
        rd_kafka_conf_destroy(conf);

        return NULL;
}
1870 
1871 
1872 
1873 
1874 /**
1875  * @brief Initialiase a CallState and unlock the GIL prior to a
1876  *        possibly blocking external call.
1877  */
void CallState_begin (Handle *h, CallState *cs) {
        /* Reset crash indicator before any callback can run. */
        cs->crashed = 0;

        /* Release the GIL for the duration of the external call. */
        cs->thread_state = PyEval_SaveThread();
        assert(cs->thread_state != NULL);

        /* Register this CallState in thread-local storage so callbacks
         * invoked from librdkafka threads can find it. */
#ifdef WITH_PY_TSS
        PyThread_tss_set(&h->tlskey, cs);
#else
        PyThread_set_key_value(h->tlskey, cs);
#endif
}
1888 
1889 /**
1890  * @brief Relock the GIL after external call is done.
1891  * @returns 0 if a Python signal was raised or a callback crashed, else 1.
1892  */
int CallState_end (Handle *h, CallState *cs) {
        /* De-register this thread's CallState. */
#ifdef WITH_PY_TSS
        PyThread_tss_set(&h->tlskey, NULL);
#else
        PyThread_delete_key_value(h->tlskey);
#endif

        /* Re-acquire the GIL. */
        PyEval_RestoreThread(cs->thread_state);

        /* 0 if a Python signal is pending or a callback crashed, else 1. */
        return !(PyErr_CheckSignals() == -1 || cs->crashed);
}
1907 
1908 
1909 /**
1910  * @brief Get the current thread's CallState and re-locks the GIL.
1911  */
CallState_get(Handle * h)1912 CallState *CallState_get (Handle *h) {
1913         CallState *cs;
1914 #ifdef WITH_PY_TSS
1915         cs = PyThread_tss_get(&h->tlskey);
1916 #else
1917         cs = PyThread_get_key_value(h->tlskey);
1918 #endif
1919 	assert(cs != NULL);
1920 	assert(cs->thread_state != NULL);
1921 	PyEval_RestoreThread(cs->thread_state);
1922 	cs->thread_state = NULL;
1923 	return cs;
1924 }
1925 
1926 /**
1927  * @brief Un-locks the GIL to resume blocking external call.
1928  */
void CallState_resume (CallState *cs) {
        /* Must currently hold the GIL (thread_state cleared by
         * CallState_get()). */
        assert(cs->thread_state == NULL);

        /* Release the GIL again to resume the blocking external call. */
        cs->thread_state = PyEval_SaveThread();
}
1933 
1934 /**
1935  * @brief Indicate that call crashed.
1936  */
void CallState_crash (CallState *cs) {
        /* Flag the call as crashed; checked by CallState_end(). */
        cs->crashed++;
}
1940 
1941 
1942 
1943 /**
1944  * @brief Find class/type/object \p typename in \p modulename
1945  *
1946  * @returns a new reference to the object.
1947  *
1948  * @raises a TypeError exception if the type is not found.
1949  */
1950 
cfl_PyObject_lookup(const char * modulename,const char * typename)1951 PyObject *cfl_PyObject_lookup (const char *modulename, const char *typename) {
1952         PyObject *module = PyImport_ImportModule(modulename);
1953         PyObject *obj;
1954 
1955         if (!modulename) {
1956                 PyErr_Format(PyExc_TypeError,
1957                              "Module %s not found when looking up %s.%s",
1958                              modulename, modulename, typename);
1959                 return NULL;
1960         }
1961 
1962         obj = PyObject_GetAttrString(module, typename);
1963         if (!obj) {
1964                 Py_DECREF(module);
1965                 PyErr_Format(PyExc_TypeError,
1966                              "No such class/type/object: %s.%s",
1967                              modulename, typename);
1968                 return NULL;
1969         }
1970 
1971         return obj;
1972 }
1973 
1974 
void cfl_PyDict_SetString (PyObject *dict, const char *name, const char *val) {
        /* dict[name] = val (as a unicode string object). */
        PyObject *strobj = cfl_PyUnistr(_FromString(val));

        PyDict_SetItemString(dict, name, strobj);
        Py_DECREF(strobj);
}
1980 
void cfl_PyDict_SetInt (PyObject *dict, const char *name, int val) {
        /* dict[name] = val (as an int/long object). */
        PyObject *intobj = cfl_PyInt_FromInt(val);

        PyDict_SetItemString(dict, name, intobj);
        Py_DECREF(intobj);
}
1986 
1987 
int cfl_PyObject_SetString (PyObject *o, const char *name, const char *val) {
        /* o.<name> = val (as a unicode string object).
         * Returns the PyObject_SetAttrString() result (0 on success). */
        PyObject *strobj = cfl_PyUnistr(_FromString(val));
        int ret = PyObject_SetAttrString(o, name, strobj);

        Py_DECREF(strobj);
        return ret;
}
1994 
int cfl_PyObject_SetInt (PyObject *o, const char *name, int val) {
        /* o.<name> = val (as an int/long object).
         * Returns the PyObject_SetAttrString() result (0 on success). */
        PyObject *intobj = cfl_PyInt_FromInt(val);
        int ret = PyObject_SetAttrString(o, name, intobj);

        Py_DECREF(intobj);
        return ret;
}
2001 
2002 
2003 /**
2004  * @brief Get attribute \p attr_name from \p object and verify it is
2005  *        of type \p py_type.
2006  *
2007  * @param py_type the value type of \p attr_name must match \p py_type, unless
2008  *                \p py_type is NULL.
2009  *
2010  * @returns 1 if \p valp was updated with the object (new reference) or NULL
2011  *          if not matched and not required, or
2012  *          0 if an exception was raised.
2013  */
int cfl_PyObject_GetAttr (PyObject *object, const char *attr_name,
                          PyObject **valp, const PyTypeObject *py_type,
                          int required) {
        PyObject *o;

        o = PyObject_GetAttrString(object, attr_name);
        if (!o) {
                if (!required) {
                        *valp = NULL;
                        return 1;
                }

                PyErr_Format(PyExc_TypeError,
                             "Required attribute .%s missing", attr_name);
                return 0;
        }

        if (py_type && Py_TYPE(o) != py_type) {
                /* FIX: format the error *before* releasing \p o — the
                 * original Py_DECREF'd o and then dereferenced it via
                 * PyObject_Type(o) (use-after-free). Py_TYPE() is also
                 * a borrowed reference, while PyObject_Type() returned a
                 * new reference that was leaked. */
                PyErr_Format(PyExc_TypeError,
                             "Expected .%s to be %s type, not %s",
                             attr_name, py_type->tp_name,
                             Py_TYPE(o)->tp_name);
                Py_DECREF(o);
                return 0;
        }

        *valp = o;

        return 1;
}
2044 
2045 /**
2046  * @brief Get attribute \p attr_name from \p object and make sure it is
2047  *        an integer type.
2048  *
2049  * @returns 1 if \p valp was updated with either the object value, or \p defval.
2050  *          0 if an exception was raised.
2051  */
int cfl_PyObject_GetInt (PyObject *object, const char *attr_name, int *valp,
                         int defval, int required) {
        PyObject *attr;
        /* Expected integer type differs between Python 2 and 3. */
#ifdef PY3
        const PyTypeObject *int_type = &PyLong_Type;
#else
        const PyTypeObject *int_type = &PyInt_Type;
#endif

        if (!cfl_PyObject_GetAttr(object, attr_name, &attr, int_type,
                                  required))
                return 0;

        if (!attr) {
                /* Attribute missing but not required: fall back to default. */
                *valp = defval;
                return 1;
        }

        *valp = cfl_PyInt_AsInt(attr);
        Py_DECREF(attr);

        return 1;
}
2075 
2076 
2077 /**
2078  * @brief Checks that \p object is a bool (or boolable) and sets
2079  *        \p *valp according to the object.
2080  *
2081  * @returns 1 if \p valp was set, or 0 if \p object is not a boolable object.
2082  *          An exception is raised in the error case.
2083  */
int cfl_PyBool_get (PyObject *object, const char *name, int *valp) {
        if (!PyBool_Check(object)) {
                /* FIX: use Py_TYPE() (borrowed reference) instead of
                 * PyObject_Type(), which returns a new reference that
                 * was leaked on every type error. */
                PyErr_Format(PyExc_TypeError,
                             "Expected %s to be bool type, not %s",
                             name,
                             Py_TYPE(object)->tp_name);
                return 0;
        }

        *valp = object == Py_True;

        return 1;
}
2097 
2098 
2099 /**
2100  * @brief Get attribute \p attr_name from \p object and make sure it is
2101  *        a string type.
2102  *
2103  * @returns 1 if \p valp was updated with a newly allocated copy of either the
2104  *          object value (UTF8), or \p defval.
2105  *          0 if an exception was raised.
2106  */
int cfl_PyObject_GetString (PyObject *object, const char *attr_name,
                            char **valp, const char *defval, int required) {
        PyObject *o, *uo, *uop;

        if (!cfl_PyObject_GetAttr(object, attr_name, &o,
#ifdef PY3
                                  &PyUnicode_Type,
#else
                                  /* Python 2: support both str and unicode
                                   *           let PyObject_Unistr() do the
                                   *           proper conversion below. */
                                  NULL,
#endif
                                  required))
                return 0;

        if (!o) {
                /* Attribute missing but not required: copy the default. */
                *valp = defval ? strdup(defval) : NULL;
                return 1;
        }

        if (!(uo = cfl_PyObject_Unistr(o))) {
                /* FIX: format the error *before* releasing \p o — the
                 * original Py_DECREF'd o and then dereferenced it via
                 * PyObject_Type(o) (use-after-free). Py_TYPE() is also
                 * a borrowed reference (PyObject_Type() leaked one). */
                PyErr_Format(PyExc_TypeError,
                             "Expected .%s to be a unicode string type, not %s",
                             attr_name,
                             Py_TYPE(o)->tp_name);
                Py_DECREF(o);
                return 0;
        }
        Py_DECREF(o);

        *valp = (char *)cfl_PyUnistr_AsUTF8(uo, &uop);
        if (!*valp) {
                Py_DECREF(uo);
                Py_XDECREF(uop);
                return 0; /* exception raised by AsUTF8 */
        }

        /* Caller owns the returned copy (the UTF8 buffer is only valid
         * while uo/uop are alive). */
        *valp = strdup(*valp);
        Py_DECREF(uo);
        Py_XDECREF(uop);

        return 1;
}
2151 
2152 
2153 
2154 /**
2155  * @returns a Python list of longs based on the input int32_t array
2156  */
cfl_int32_array_to_py_list(const int32_t * arr,size_t cnt)2157 PyObject *cfl_int32_array_to_py_list (const int32_t *arr, size_t cnt) {
2158         PyObject *list;
2159         size_t i;
2160 
2161         list = PyList_New((Py_ssize_t)cnt);
2162         if (!list)
2163                 return NULL;
2164 
2165         for (i = 0 ; i < cnt ; i++)
2166                 PyList_SET_ITEM(list, (Py_ssize_t)i,
2167                                 cfl_PyInt_FromInt(arr[i]));
2168 
2169         return list;
2170 }
2171 
2172 
2173 /****************************************************************************
2174  *
2175  *
2176  * Base
2177  *
2178  *
2179  *
2180  *
2181  ****************************************************************************/
2182 
2183 
static PyObject *libversion (PyObject *self, PyObject *args) {
        /* (version_string, version_int) from librdkafka. */
        const char *verstr = rd_kafka_version_str();
        int verint = rd_kafka_version();

        return Py_BuildValue("si", verstr, verint);
}
2189 
2190 /*
2191  * Version hex representation
2192  * 0xMMmmRRPP
2193  * MM=major, mm=minor, RR=revision, PP=patchlevel (not used)
2194  */
static PyObject *version (PyObject *self, PyObject *args) {
        /* (version_string, version_int); hex layout is 0xMMmmRRPP,
         * see comment above. */
        return Py_BuildValue("si", "1.2.0", 0x01020000);
}
2198 
/* Module-level methods exposed by the cimpl extension module. */
static PyMethodDef cimpl_methods[] = {
	{"libversion", libversion, METH_NOARGS,
	 "  Retrieve librdkafka version string and integer\n"
	 "\n"
	 "  :returns: (version_string, version_int) tuple\n"
	 "  :rtype: tuple(str,int)\n"
	 "\n"
	},
	{"version", version, METH_NOARGS,
	 "  Retrieve module version string and integer\n"
	 "\n"
	 "  :returns: (version_string, version_int) tuple\n"
	 "  :rtype: tuple(str,int)\n"
	 "\n"
	},
	{ NULL }  /* sentinel */
};
2216 
2217 
2218 /**
2219  * @brief Add librdkafka error enums to KafkaError's type dict.
2220  * @returns an updated doc string containing all error constants.
2221  */
static char *KafkaError_add_errs (PyObject *dict, const char *origdoc) {
	const struct rd_kafka_err_desc *descs;
	size_t cnt;
	size_t i;
	char *doc;
	size_t dof = 0, dsize;
	/* RST grid table column widths */
#define _COL1_W 50
#define _COL2_W 100 /* Must be larger than COL1 */
	char dash[_COL2_W], eq[_COL2_W];

	rd_kafka_get_err_descs(&descs, &cnt);

	memset(dash, '-', sizeof(dash));
	memset(eq, '=', sizeof(eq));

	/* Setup output doc buffer. */
	dof = strlen(origdoc);
	dsize = dof + 500 + (cnt * 200);
	doc = malloc(dsize);
	/* FIX: the original did not check malloc(); a NULL tp_doc is
	 * harmless, a NULL memcpy() destination is not. */
	if (!doc)
		return NULL;
	memcpy(doc, origdoc, dof+1);

	/* Append formatted text (truncated to 511 chars per chunk) to
	 * the doc buffer, growing it as needed.
	 * FIXES vs original:
	 *  - negative snprintf() return is no longer assigned to a size_t;
	 *  - truncation clamp uses >= (with `>`, _len == sizeof(tmpdoc)
	 *    made the memcpy read/write one byte out of bounds);
	 *  - the buffer grows by at least _len+1 (the fixed `dsize += 2`
	 *    did not guarantee the chunk would fit: heap overflow);
	 *  - realloc() failure no longer loses/overruns the buffer. */
#define _PRINT(...) do {						\
		char tmpdoc[512];					\
		int _r;							\
		size_t _len;						\
		_r = snprintf(tmpdoc, sizeof(tmpdoc), __VA_ARGS__);	\
		if (_r < 0)						\
			break;						\
		_len = (size_t)_r;					\
		if (_len >= sizeof(tmpdoc)) _len = sizeof(tmpdoc)-1;	\
		if (dof + _len >= dsize) {				\
			char *_tmp;					\
			dsize += _len + 1;				\
			_tmp = realloc(doc, dsize);			\
			if (!_tmp)					\
				break; /* best-effort: skip chunk */	\
			doc = _tmp;					\
		}							\
		memcpy(doc+dof, tmpdoc, _len+1);			\
		dof += _len;						\
	} while (0)

	/* Error constant table header (RST grid table) */
	_PRINT("Error and event constants:\n\n"
	       "+-%.*s-+-%.*s-+\n"
	       "| %-*.*s | %-*.*s |\n"
	       "+=%.*s=+=%.*s=+\n",
	       _COL1_W, dash, _COL2_W, dash,
	       _COL1_W, _COL1_W, "Constant", _COL2_W, _COL2_W, "Description",
	       _COL1_W, eq, _COL2_W, eq);

	for (i = 0 ; i < cnt ; i++) {
		PyObject *code;

		if (!descs[i].desc)
			continue;

		/* Expose the error code as an int constant on KafkaError. */
		code = cfl_PyInt_FromInt(descs[i].code);

		PyDict_SetItemString(dict, descs[i].name, code);

		Py_DECREF(code);

		_PRINT("| %-*.*s | %-*.*s |\n"
		       "+-%.*s-+-%.*s-+\n",
		       _COL1_W, _COL1_W, descs[i].name,
		       _COL2_W, _COL2_W, descs[i].desc,
		       _COL1_W, dash, _COL2_W, dash);
	}

	_PRINT("\n");

	return doc; // FIXME: leak
}
2289 
2290 
#ifdef PY3
/* Python 3 module definition; Python 2 uses Py_InitModule3() instead. */
static struct PyModuleDef cimpl_moduledef = {
	PyModuleDef_HEAD_INIT,
	"cimpl",                                  /* m_name */
	"Confluent's Python client for Apache Kafka (C implementation)", /* m_doc */
	-1,                                       /* m_size */
	cimpl_methods,                            /* m_methods */
};
#endif
2300 
2301 
/**
 * @brief Shared Python 2/3 module initialization: readies all extension
 *        types, creates the module object, and registers types, the
 *        KafkaException class and integer constants on it.
 *
 * @returns the new module object, or NULL on failure (with an exception
 *          set by the failing call).
 */
static PyObject *_init_cimpl (void) {
	PyObject *m;

        PyEval_InitThreads();

	/* Finalize all extension types before they are exposed. */
	if (PyType_Ready(&KafkaErrorType) < 0)
		return NULL;
	if (PyType_Ready(&MessageType) < 0)
		return NULL;
	if (PyType_Ready(&TopicPartitionType) < 0)
		return NULL;
	if (PyType_Ready(&ProducerType) < 0)
		return NULL;
	if (PyType_Ready(&ConsumerType) < 0)
		return NULL;
        if (PyType_Ready(&AdminType) < 0)
                return NULL;
        if (AdminTypes_Ready() < 0)
                return NULL;

#ifdef PY3
	m = PyModule_Create(&cimpl_moduledef);
#else
	m = Py_InitModule3("cimpl", cimpl_methods,
			   "Confluent's Python client for Apache Kafka (C implementation)");
#endif
	if (!m)
		return NULL;

	/* PyModule_AddObject() steals a reference; INCREF first since the
	 * static type objects must outlive the module. */
	Py_INCREF(&KafkaErrorType);
	/* Extend KafkaError's doc string with the error constant table
	 * (the returned allocation is intentionally never freed, see
	 * KNOWN ISSUES at the top of this file). */
	KafkaErrorType.tp_doc =
		KafkaError_add_errs(KafkaErrorType.tp_dict,
				    KafkaErrorType.tp_doc);
	PyModule_AddObject(m, "KafkaError", (PyObject *)&KafkaErrorType);

	Py_INCREF(&MessageType);
	PyModule_AddObject(m, "Message", (PyObject *)&MessageType);

	Py_INCREF(&TopicPartitionType);
	PyModule_AddObject(m, "TopicPartition",
			   (PyObject *)&TopicPartitionType);

	Py_INCREF(&ProducerType);
	PyModule_AddObject(m, "Producer", (PyObject *)&ProducerType);

	Py_INCREF(&ConsumerType);
	PyModule_AddObject(m, "Consumer", (PyObject *)&ConsumerType);

        Py_INCREF(&AdminType);
        PyModule_AddObject(m, "_AdminClientImpl", (PyObject *)&AdminType);

        AdminTypes_AddObjects(m);

	/* PyErr_NewExceptionWithDoc() requires Python >= 2.7. */
#if PY_VERSION_HEX >= 0x02070000
	KafkaException = PyErr_NewExceptionWithDoc(
		"cimpl.KafkaException",
		"Kafka exception that wraps the :py:class:`KafkaError` "
		"class.\n"
		"\n"
		"Use ``exception.args[0]`` to extract the "
		":py:class:`KafkaError` object\n"
		"\n",
		NULL, NULL);
#else
        KafkaException = PyErr_NewException("cimpl.KafkaException", NULL, NULL);
#endif
	Py_INCREF(KafkaException);
	PyModule_AddObject(m, "KafkaException", KafkaException);

	/* Message timestamp type constants. */
	PyModule_AddIntConstant(m, "TIMESTAMP_NOT_AVAILABLE", RD_KAFKA_TIMESTAMP_NOT_AVAILABLE);
	PyModule_AddIntConstant(m, "TIMESTAMP_CREATE_TIME", RD_KAFKA_TIMESTAMP_CREATE_TIME);
	PyModule_AddIntConstant(m, "TIMESTAMP_LOG_APPEND_TIME", RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME);

	/* Logical offset constants. */
        PyModule_AddIntConstant(m, "OFFSET_BEGINNING", RD_KAFKA_OFFSET_BEGINNING);
        PyModule_AddIntConstant(m, "OFFSET_END", RD_KAFKA_OFFSET_END);
        PyModule_AddIntConstant(m, "OFFSET_STORED", RD_KAFKA_OFFSET_STORED);
        PyModule_AddIntConstant(m, "OFFSET_INVALID", RD_KAFKA_OFFSET_INVALID);

	return m;
}
2382 
2383 
#ifdef PY3
/* Python 3 module entry point. */
PyMODINIT_FUNC PyInit_cimpl (void) {
	return _init_cimpl();
}
#else
/* Python 2 module entry point (no return value). */
PyMODINIT_FUNC initcimpl (void) {
	_init_cimpl();
}
#endif
2393