/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "confluent_kafka.h"


/****************************************************************************
 *
 *
 * Consumer
 *
 *
 *
 *
 ****************************************************************************/

static void Consumer_clear0 (Handle *self) {
	if (self->u.Consumer.on_assign) {
		Py_DECREF(self->u.Consumer.on_assign);
		self->u.Consumer.on_assign = NULL;
	}
	if (self->u.Consumer.on_revoke) {
		Py_DECREF(self->u.Consumer.on_revoke);
		self->u.Consumer.on_revoke = NULL;
	}
	if (self->u.Consumer.on_commit) {
		Py_DECREF(self->u.Consumer.on_commit);
		self->u.Consumer.on_commit = NULL;
	}
	if (self->u.Consumer.rkqu) {
	        rd_kafka_queue_destroy(self->u.Consumer.rkqu);
	        self->u.Consumer.rkqu = NULL;
	}
}

static int Consumer_clear (Handle *self) {
        Consumer_clear0(self);
        Handle_clear(self);
        return 0;
}

static void Consumer_dealloc (Handle *self) {
	PyObject_GC_UnTrack(self);

        Consumer_clear0(self);

        if (self->rk) {
                CallState cs;

                CallState_begin(self, &cs);

                /* If the application has not called c.close() then
                 * rd_kafka_destroy() will do so, and that might trigger
                 * callbacks to be called from consumer_close().
                 * This should probably be fixed in librdkafka,
                 * or the application. */
                rd_kafka_destroy(self->rk);

                CallState_end(self, &cs);
        }

        Handle_clear(self);

        Py_TYPE(self)->tp_free((PyObject *)self);
}

static int Consumer_traverse (Handle *self,
			      visitproc visit, void *arg) {
	if (self->u.Consumer.on_assign)
		Py_VISIT(self->u.Consumer.on_assign);
	if (self->u.Consumer.on_revoke)
		Py_VISIT(self->u.Consumer.on_revoke);
	if (self->u.Consumer.on_commit)
		Py_VISIT(self->u.Consumer.on_commit);

	Handle_traverse(self, visit, arg);

	return 0;
}



static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
					 PyObject *kwargs) {

	rd_kafka_topic_partition_list_t *topics;
	static char *kws[] = { "topics", "on_assign", "on_revoke", NULL };
	PyObject *tlist, *on_assign = NULL, *on_revoke = NULL;
	Py_ssize_t pos = 0;
	rd_kafka_resp_err_t err;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OO", kws,
					 &tlist, &on_assign, &on_revoke))
		return NULL;

	if (!PyList_Check(tlist)) {
		PyErr_Format(PyExc_TypeError,
			     "expected list of topic unicode strings");
		return NULL;
	}

	if (on_assign && !PyCallable_Check(on_assign)) {
		PyErr_Format(PyExc_TypeError,
			     "on_assign expects a callable");
		return NULL;
	}

	if (on_revoke && !PyCallable_Check(on_revoke)) {
		PyErr_Format(PyExc_TypeError,
			     "on_revoke expects a callable");
		return NULL;
	}

	topics = rd_kafka_topic_partition_list_new((int)PyList_Size(tlist));
	for (pos = 0 ; pos < PyList_Size(tlist) ; pos++) {
		PyObject *o = PyList_GetItem(tlist, pos);
		PyObject *uo, *uo8;
		if (!(uo = cfl_PyObject_Unistr(o))) {
			PyErr_Format(PyExc_TypeError,
				     "expected list of unicode strings");
			rd_kafka_topic_partition_list_destroy(topics);
			return NULL;
		}
		rd_kafka_topic_partition_list_add(topics,
						  cfl_PyUnistr_AsUTF8(uo, &uo8),
						  RD_KAFKA_PARTITION_UA);
                Py_XDECREF(uo8);
		Py_DECREF(uo);
	}

	err = rd_kafka_subscribe(self->rk, topics);

	rd_kafka_topic_partition_list_destroy(topics);

	if (err) {
		cfl_PyErr_Format(err,
				 "Failed to set subscription: %s",
				 rd_kafka_err2str(err));
		return NULL;
	}

	/*
	 * Update rebalance callbacks
	 */
	if (self->u.Consumer.on_assign) {
		Py_DECREF(self->u.Consumer.on_assign);
		self->u.Consumer.on_assign = NULL;
	}
	if (on_assign) {
		self->u.Consumer.on_assign = on_assign;
		Py_INCREF(self->u.Consumer.on_assign);
	}

	if (self->u.Consumer.on_revoke) {
		Py_DECREF(self->u.Consumer.on_revoke);
		self->u.Consumer.on_revoke = NULL;
	}
	if (on_revoke) {
		self->u.Consumer.on_revoke = on_revoke;
		Py_INCREF(self->u.Consumer.on_revoke);
	}

	Py_RETURN_NONE;
}
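
/* A rough Python-side sketch of how the binding above is typically used
 * (topic names and callback bodies are illustrative only):
 *
 *   def on_assign(consumer, partitions):
 *       print("assigned:", partitions)   # partitions: list(TopicPartition)
 *
 *   def on_revoke(consumer, partitions):
 *       print("revoked:", partitions)
 *
 *   consumer.subscribe(["my_topic", "^my_regex.*"],
 *                      on_assign=on_assign, on_revoke=on_revoke)
 *
 * Note that the callbacks registered here are not invoked by subscribe()
 * itself; they are dispatched from Consumer_rebalance_cb() below during
 * poll()/consume(). */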


static PyObject *Consumer_unsubscribe (Handle *self,
					   PyObject *ignore) {

	rd_kafka_resp_err_t err;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

	err = rd_kafka_unsubscribe(self->rk);
	if (err) {
		cfl_PyErr_Format(err,
				 "Failed to remove subscription: %s",
				 rd_kafka_err2str(err));
		return NULL;
	}

	Py_RETURN_NONE;
}

static PyObject *Consumer_assign (Handle *self, PyObject *tlist) {

	rd_kafka_topic_partition_list_t *c_parts;
	rd_kafka_resp_err_t err;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

	if (!(c_parts = py_to_c_parts(tlist)))
		return NULL;

	self->u.Consumer.rebalance_assigned++;

	err = rd_kafka_assign(self->rk, c_parts);

	rd_kafka_topic_partition_list_destroy(c_parts);

	if (err) {
		cfl_PyErr_Format(err,
				 "Failed to set assignment: %s",
				 rd_kafka_err2str(err));
		return NULL;
	}

	Py_RETURN_NONE;
}
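
/* Python-side sketch (illustrative): assign() accepts TopicPartition
 * objects, optionally carrying a starting offset, e.g.:
 *
 *   from confluent_kafka import TopicPartition, OFFSET_BEGINNING
 *
 *   consumer.assign([TopicPartition("my_topic", 0, OFFSET_BEGINNING),
 *                    TopicPartition("my_topic", 1, 1234)])
 */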


static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) {

	rd_kafka_resp_err_t err;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

	self->u.Consumer.rebalance_assigned++;

	err = rd_kafka_assign(self->rk, NULL);
	if (err) {
		cfl_PyErr_Format(err,
				 "Failed to remove assignment: %s",
				 rd_kafka_err2str(err));
		return NULL;
	}

	Py_RETURN_NONE;
}

static PyObject *Consumer_assignment (Handle *self, PyObject *args,
                                      PyObject *kwargs) {

        PyObject *plist;
        rd_kafka_topic_partition_list_t *c_parts;
        rd_kafka_resp_err_t err;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        err = rd_kafka_assignment(self->rk, &c_parts);
        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to get assignment: %s",
                                 rd_kafka_err2str(err));
                return NULL;
        }

        plist = c_parts_to_py(c_parts);
        rd_kafka_topic_partition_list_destroy(c_parts);

        return plist;
}


/**
 * @brief Global offset commit on_commit callback trampoline triggered
 *        from poll() et al.
 */
static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                       rd_kafka_topic_partition_list_t *c_parts,
                                       void *opaque) {
        Handle *self = opaque;
        PyObject *parts, *k_err, *args, *result;
        CallState *cs;

        if (!self->u.Consumer.on_commit)
                return;

        cs = CallState_get(self);

        /* Instantiate error object */
        k_err = KafkaError_new_or_None(err, NULL);

        /* Construct list of TopicPartition based on 'c_parts' */
        if (c_parts)
                parts = c_parts_to_py(c_parts);
        else
                parts = PyList_New(0);

        args = Py_BuildValue("(OO)", k_err, parts);

        Py_DECREF(k_err);
        Py_DECREF(parts);

        if (!args) {
                cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
                                 "Unable to build callback args");
                CallState_crash(cs);
                CallState_resume(cs);
                return;
        }

        result = PyObject_CallObject(self->u.Consumer.on_commit, args);

        Py_DECREF(args);

        if (result)
                Py_DECREF(result);
        else {
                CallState_crash(cs);
                rd_kafka_yield(rk);
        }

        CallState_resume(cs);
}
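
/* Python-side sketch of the on_commit callback this trampoline drives
 * (illustrative; registered via the Consumer configuration dict, see
 * the ConsumerType docstring below):
 *
 *   def on_commit(err, partitions):
 *       if err is not None:
 *           print("commit failed:", err)
 *       else:
 *           print("committed:", partitions)
 *
 *   consumer = Consumer({"group.id": "mygroup",              # hypothetical
 *                        "bootstrap.servers": "localhost:9092",
 *                        "on_commit": on_commit})
 */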

/**
 * @brief Simple struct to pass commit results from
 *        Consumer_offset_commit_return_cb back to commit()'s return value.
 */
struct commit_return {
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *c_parts;
};

/**
 * @brief Simple offset_commit_cb that passes the callback information
 *        back as commit()'s return value through the commit_return struct.
 *        Triggered from rd_kafka_commit_queue().
 */
static void
Consumer_offset_commit_return_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                  rd_kafka_topic_partition_list_t *c_parts,
                                  void *opaque) {
        struct commit_return *commit_return = opaque;

        commit_return->err = err;
        if (c_parts)
                commit_return->c_parts =
                        rd_kafka_topic_partition_list_copy(c_parts);
}


static PyObject *Consumer_commit (Handle *self, PyObject *args,
                                  PyObject *kwargs) {
	rd_kafka_resp_err_t err;
	PyObject *msg = NULL, *offsets = NULL, *async_o = NULL;
	rd_kafka_topic_partition_list_t *c_offsets;
	int async = 1;
	static char *kws[] = { "message", "offsets",
                               "async", "asynchronous", NULL };
        rd_kafka_queue_t *rkqu = NULL;
        struct commit_return commit_return;
        PyThreadState *thread_state;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOOO", kws,
					 &msg, &offsets, &async_o, &async_o))
		return NULL;

	if (msg && offsets) {
		PyErr_SetString(PyExc_ValueError,
				"message and offsets are mutually exclusive");
		return NULL;
	}

	if (async_o)
		async = PyObject_IsTrue(async_o);


	if (offsets) {

		if (!(c_offsets = py_to_c_parts(offsets)))
			return NULL;
	} else if (msg) {
		Message *m;
                PyObject *uo8;

		if (PyObject_Type((PyObject *)msg) !=
		    (PyObject *)&MessageType) {
			PyErr_Format(PyExc_TypeError,
				     "expected %s", MessageType.tp_name);
			return NULL;
		}

		m = (Message *)msg;

		c_offsets = rd_kafka_topic_partition_list_new(1);
		rd_kafka_topic_partition_list_add(
			c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
			m->partition)->offset = m->offset + 1;
                Py_XDECREF(uo8);

	} else {
		c_offsets = NULL;
	}

        if (async) {
                /* Async mode: Use consumer queue for offset commit
                 *             served by consumer_poll() */
                rkqu = self->u.Consumer.rkqu;

        } else {
                /* Sync mode: Let commit_queue() trigger the callback. */
                memset(&commit_return, 0, sizeof(commit_return));

                /* Unlock GIL while we are blocking. */
                thread_state = PyEval_SaveThread();
        }

        err = rd_kafka_commit_queue(self->rk, c_offsets, rkqu,
                                    async ?
                                    Consumer_offset_commit_cb :
                                    Consumer_offset_commit_return_cb,
                                    async ?
                                    (void *)self : (void *)&commit_return);

        if (c_offsets)
                rd_kafka_topic_partition_list_destroy(c_offsets);

        if (!async) {
                /* Re-lock GIL */
                PyEval_RestoreThread(thread_state);

                /* Honour inner error (richer) from offset_commit_return_cb */
                if (commit_return.err)
                        err = commit_return.err;
        }

        if (err) {
                /* Outer error from commit_queue() */
                if (!async && commit_return.c_parts)
                        rd_kafka_topic_partition_list_destroy(commit_return.c_parts);

                cfl_PyErr_Format(err,
                                 "Commit failed: %s", rd_kafka_err2str(err));
                return NULL;
        }

        if (async) {
                /* async commit returns None when commit is in progress */
                Py_RETURN_NONE;

        } else {
                PyObject *plist;

                /* sync commit returns the topic,partition,offset,err list */
                assert(commit_return.c_parts);

                plist = c_parts_to_py(commit_return.c_parts);
                rd_kafka_topic_partition_list_destroy(commit_return.c_parts);

                return plist;
        }
}
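
/* Python-side sketch of the two commit modes handled above (illustrative;
 * assumes a consumed message 'msg'):
 *
 *   consumer.commit(message=msg)                   # async: returns None
 *   offsets = consumer.commit(message=msg,
 *                             asynchronous=False)  # sync: list(TopicPartition)
 *   for tp in offsets:
 *       if tp.err:                                 # check per-partition errors
 *           print("commit failed for", tp)
 */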


static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
						PyObject *kwargs) {
#if RD_KAFKA_VERSION < 0x000b0000
	PyErr_Format(PyExc_NotImplementedError,
		     "Consumer store_offsets requires "
		     "confluent-kafka-python built for librdkafka "
		     "version >=v0.11.0 (librdkafka runtime 0x%x, "
		     "buildtime 0x%x)",
		     rd_kafka_version(), RD_KAFKA_VERSION);
	return NULL;
#else
	rd_kafka_resp_err_t err;
	PyObject *msg = NULL, *offsets = NULL;
	rd_kafka_topic_partition_list_t *c_offsets;
	static char *kws[] = { "message", "offsets", NULL };

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO", kws,
					 &msg, &offsets))
		return NULL;

	if (msg && offsets) {
		PyErr_SetString(PyExc_ValueError,
				"message and offsets are mutually exclusive");
		return NULL;
	}

	if (!msg && !offsets) {
		PyErr_SetString(PyExc_ValueError,
				"expected either message or offsets");
		return NULL;
	}

	if (offsets) {

		if (!(c_offsets = py_to_c_parts(offsets)))
			return NULL;
	} else {
		Message *m;
		PyObject *uo8;

		if (PyObject_Type((PyObject *)msg) !=
		    (PyObject *)&MessageType) {
			PyErr_Format(PyExc_TypeError,
				     "expected %s", MessageType.tp_name);
			return NULL;
		}

		m = (Message *)msg;

		c_offsets = rd_kafka_topic_partition_list_new(1);
		rd_kafka_topic_partition_list_add(
			c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
			m->partition)->offset = m->offset + 1;
		Py_XDECREF(uo8);
	}


	err = rd_kafka_offsets_store(self->rk, c_offsets);
	rd_kafka_topic_partition_list_destroy(c_offsets);


	if (err) {
		cfl_PyErr_Format(err,
				 "StoreOffsets failed: %s", rd_kafka_err2str(err));
		return NULL;
	}

	Py_RETURN_NONE;
#endif
}
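
/* Python-side sketch (illustrative): store_offsets() is meant to be paired
 * with 'enable.auto.offset.store' set to False, so that only offsets of
 * fully processed messages are stored for the next (auto-)commit:
 *
 *   msg = consumer.poll(1.0)
 *   if msg is not None and msg.error() is None:
 *       process(msg)                        # hypothetical application code
 *       consumer.store_offsets(message=msg)
 */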


static PyObject *Consumer_committed (Handle *self, PyObject *args,
					 PyObject *kwargs) {

	PyObject *plist;
	rd_kafka_topic_partition_list_t *c_parts;
	rd_kafka_resp_err_t err;
	double tmout = -1.0f;
	static char *kws[] = { "partitions", "timeout", NULL };

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kws,
					 &plist, &tmout))
		return NULL;

	if (!(c_parts = py_to_c_parts(plist)))
		return NULL;

        Py_BEGIN_ALLOW_THREADS;
        err = rd_kafka_committed(self->rk, c_parts,
                                 tmout >= 0 ? (int)(tmout * 1000.0f) : -1);
        Py_END_ALLOW_THREADS;

	if (err) {
		rd_kafka_topic_partition_list_destroy(c_parts);
		cfl_PyErr_Format(err,
				 "Failed to get committed offsets: %s",
				 rd_kafka_err2str(err));
		return NULL;
	}

	plist = c_parts_to_py(c_parts);
	rd_kafka_topic_partition_list_destroy(c_parts);

	return plist;
}


static PyObject *Consumer_position (Handle *self, PyObject *args,
					PyObject *kwargs) {

	PyObject *plist;
	rd_kafka_topic_partition_list_t *c_parts;
	rd_kafka_resp_err_t err;
	static char *kws[] = { "partitions", NULL };

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws,
					 &plist))
		return NULL;

	if (!(c_parts = py_to_c_parts(plist)))
		return NULL;

	err = rd_kafka_position(self->rk, c_parts);

	if (err) {
		rd_kafka_topic_partition_list_destroy(c_parts);
		cfl_PyErr_Format(err,
				 "Failed to get position: %s",
				 rd_kafka_err2str(err));
		return NULL;
	}

	plist = c_parts_to_py(c_parts);
	rd_kafka_topic_partition_list_destroy(c_parts);

	return plist;
}

static PyObject *Consumer_pause (Handle *self, PyObject *args,
                                 PyObject *kwargs) {

    PyObject *plist;
    rd_kafka_topic_partition_list_t *c_parts;
    rd_kafka_resp_err_t err;
    static char *kws[] = {"partitions", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws, &plist))
        return NULL;

    if (!(c_parts = py_to_c_parts(plist)))
        return NULL;

    err = rd_kafka_pause_partitions(self->rk, c_parts);
    rd_kafka_topic_partition_list_destroy(c_parts);
    if (err) {
        cfl_PyErr_Format(err,
                "Failed to pause partitions: %s",
                rd_kafka_err2str(err));
        return NULL;
    }
    Py_RETURN_NONE;
}

static PyObject *Consumer_resume (Handle *self, PyObject *args,
                                  PyObject *kwargs) {

    PyObject *plist;
    rd_kafka_topic_partition_list_t *c_parts;
    rd_kafka_resp_err_t err;
    static char *kws[] = {"partitions", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws, &plist))
        return NULL;

    if (!(c_parts = py_to_c_parts(plist)))
        return NULL;

    err = rd_kafka_resume_partitions(self->rk, c_parts);
    rd_kafka_topic_partition_list_destroy(c_parts);
    if (err) {
        cfl_PyErr_Format(err,
                "Failed to resume partitions: %s",
                rd_kafka_err2str(err));
        return NULL;
    }
    Py_RETURN_NONE;
}


static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs) {

        TopicPartition *tp;
        rd_kafka_resp_err_t err;
        static char *kws[] = { "partition", NULL };
        rd_kafka_topic_t *rkt;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError, "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws,
                                         (PyObject **)&tp))
                return NULL;

        if (PyObject_Type((PyObject *)tp) != (PyObject *)&TopicPartitionType) {
                PyErr_Format(PyExc_TypeError,
                             "expected %s", TopicPartitionType.tp_name);
                return NULL;
        }

        rkt = rd_kafka_topic_new(self->rk, tp->topic, NULL);
        if (!rkt) {
                cfl_PyErr_Format(rd_kafka_last_error(),
                                 "Failed to get topic object for "
                                 "topic \"%s\": %s",
                                 tp->topic,
                                 rd_kafka_err2str(rd_kafka_last_error()));
                return NULL;
        }

        Py_BEGIN_ALLOW_THREADS;
        err = rd_kafka_seek(rkt, tp->partition, tp->offset, -1);
        Py_END_ALLOW_THREADS;

        rd_kafka_topic_destroy(rkt);

        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to seek to offset %"CFL_PRId64": %s",
                                 tp->offset, rd_kafka_err2str(err));
                return NULL;
        }

        Py_RETURN_NONE;
}
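
/* Python-side sketch (illustrative): seek() only works on an actively
 * fetched partition, i.e. after assign() has taken effect:
 *
 *   consumer.assign([TopicPartition("my_topic", 0)])
 *   ...
 *   consumer.seek(TopicPartition("my_topic", 0, 42))  # jump to offset 42
 */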


static PyObject *Consumer_get_watermark_offsets (Handle *self, PyObject *args,
                                                 PyObject *kwargs) {

        TopicPartition *tp;
        rd_kafka_resp_err_t err;
        double tmout = -1.0f;
        int cached = 0;
        int64_t low = RD_KAFKA_OFFSET_INVALID, high = RD_KAFKA_OFFSET_INVALID;
        static char *kws[] = { "partition", "timeout", "cached", NULL };
        PyObject *rtup;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|db", kws,
                                         (PyObject **)&tp, &tmout, &cached))
                return NULL;

        if (PyObject_Type((PyObject *)tp) != (PyObject *)&TopicPartitionType) {
                PyErr_Format(PyExc_TypeError,
                             "expected %s", TopicPartitionType.tp_name);
                return NULL;
        }

        if (cached) {
                err = rd_kafka_get_watermark_offsets(self->rk,
                                                     tp->topic, tp->partition,
                                                     &low, &high);
        } else {
                Py_BEGIN_ALLOW_THREADS;
                err = rd_kafka_query_watermark_offsets(self->rk,
                                                       tp->topic, tp->partition,
                                                       &low, &high,
                                                       tmout >= 0 ? (int)(tmout * 1000.0f) : -1);
                Py_END_ALLOW_THREADS;
        }

        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to get watermark offsets: %s",
                                 rd_kafka_err2str(err));
                return NULL;
        }

        rtup = PyTuple_New(2);
        PyTuple_SetItem(rtup, 0, PyLong_FromLongLong(low));
        PyTuple_SetItem(rtup, 1, PyLong_FromLongLong(high));

        return rtup;
}
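
/* Python-side sketch (illustrative):
 *
 *   lo, hi = consumer.get_watermark_offsets(TopicPartition("my_topic", 0))
 *   # hi - lo is roughly the number of messages currently in the partition
 */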


static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args,
                                                 PyObject *kwargs) {
#if RD_KAFKA_VERSION < 0x000b0000
	PyErr_Format(PyExc_NotImplementedError,
		     "Consumer offsets_for_times requires "
		     "confluent-kafka-python built for librdkafka "
		     "version >=v0.11.0 (librdkafka runtime 0x%x, "
		     "buildtime 0x%x)",
		     rd_kafka_version(), RD_KAFKA_VERSION);
	return NULL;
#else

	PyObject *plist;
	double tmout = -1.0f;
	rd_kafka_topic_partition_list_t *c_parts;
	rd_kafka_resp_err_t err;
	static char *kws[] = { "partitions", "timeout", NULL };

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kws,
					 &plist, &tmout))
		return NULL;

        if (!(c_parts = py_to_c_parts(plist)))
                return NULL;

        Py_BEGIN_ALLOW_THREADS;
        err = rd_kafka_offsets_for_times(self->rk,
                                         c_parts,
                                         tmout >= 0 ? (int)(tmout * 1000.0f) : -1);
        Py_END_ALLOW_THREADS;

        if (err) {
                rd_kafka_topic_partition_list_destroy(c_parts);
                cfl_PyErr_Format(err,
                                 "Failed to get offsets: %s",
                                 rd_kafka_err2str(err));
                return NULL;
        }

        plist = c_parts_to_py(c_parts);
        rd_kafka_topic_partition_list_destroy(c_parts);

        return plist;
#endif
}


static PyObject *Consumer_poll (Handle *self, PyObject *args,
                                    PyObject *kwargs) {
        double tmout = -1.0f;
        static char *kws[] = { "timeout", NULL };
        rd_kafka_message_t *rkm;
        PyObject *msgobj;
        CallState cs;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
                return NULL;

        CallState_begin(self, &cs);

        rkm = rd_kafka_consumer_poll(self->rk, tmout >= 0 ?
                                     (int)(tmout * 1000.0f) : -1);

        if (!CallState_end(self, &cs)) {
                if (rkm)
                        rd_kafka_message_destroy(rkm);
                return NULL;
        }

        if (!rkm)
                Py_RETURN_NONE;

        msgobj = Message_new0(self, rkm);
#ifdef RD_KAFKA_V_HEADERS
        /* Headers must be detached outside Message_new0 because it declares
         * the rk message as const. */
        rd_kafka_message_detach_headers(rkm, &((Message *)msgobj)->c_headers);
#endif
        rd_kafka_message_destroy(rkm);

        return msgobj;
}
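
/* Python-side sketch of the canonical poll loop (illustrative):
 *
 *   while True:
 *       msg = consumer.poll(timeout=1.0)
 *       if msg is None:
 *           continue                      # no message within timeout
 *       if msg.error() is not None:
 *           print("event/error:", msg.error())
 *           continue
 *       print(msg.topic(), msg.partition(), msg.offset(), msg.value())
 */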


static PyObject *Consumer_consume (Handle *self, PyObject *args,
                                        PyObject *kwargs) {
        unsigned int num_messages = 1;
        double tmout = -1.0f;
        static char *kws[] = { "num_messages", "timeout", NULL };
        rd_kafka_message_t **rkmessages;
        PyObject *msglist;
        rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu;
        CallState cs;
        Py_ssize_t i, n;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Id", kws,
                                         &num_messages, &tmout))
                return NULL;

        if (num_messages > 1000000) {
                PyErr_SetString(PyExc_ValueError,
                                "num_messages must be between 0 and 1000000 (1M)");
                return NULL;
        }

        CallState_begin(self, &cs);

        rkmessages = malloc(num_messages * sizeof(rd_kafka_message_t *));

        n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
                tmout >= 0 ? (int)(tmout * 1000.0f) : -1,
                rkmessages,
                num_messages);

        if (!CallState_end(self, &cs)) {
                for (i = 0; i < n; i++) {
                        rd_kafka_message_destroy(rkmessages[i]);
                }
                free(rkmessages);
                return NULL;
        }

        if (n < 0) {
                free(rkmessages);
                cfl_PyErr_Format(rd_kafka_last_error(),
                                 "%s", rd_kafka_err2str(rd_kafka_last_error()));
                return NULL;
        }

        msglist = PyList_New(n);

        for (i = 0; i < n; i++) {
                PyObject *msgobj = Message_new0(self, rkmessages[i]);
#ifdef RD_KAFKA_V_HEADERS
                /* Headers must be detached outside Message_new0 because it
                 * declares the rk message as const. */
                rd_kafka_message_detach_headers(rkmessages[i],
                                                &((Message *)msgobj)->c_headers);
#endif
                PyList_SET_ITEM(msglist, i, msgobj);
                rd_kafka_message_destroy(rkmessages[i]);
        }

        free(rkmessages);

        return msglist;
}
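
/* Python-side sketch of batch consumption (illustrative):
 *
 *   msgs = consumer.consume(num_messages=100, timeout=1.0)
 *   for msg in msgs:                      # possibly empty list on timeout
 *       if msg.error() is None:
 *           print(msg.value())
 */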


static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
        CallState cs;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer already closed");
                return NULL;
        }

        CallState_begin(self, &cs);

        rd_kafka_consumer_close(self->rk);

        if (self->u.Consumer.rkqu) {
                rd_kafka_queue_destroy(self->u.Consumer.rkqu);
                self->u.Consumer.rkqu = NULL;
        }

        rd_kafka_destroy(self->rk);
        self->rk = NULL;

        if (!CallState_end(self, &cs))
                return NULL;

        Py_RETURN_NONE;
}


static PyMethodDef Consumer_methods[] = {
	{ "subscribe", (PyCFunction)Consumer_subscribe,
	  METH_VARARGS|METH_KEYWORDS,
	  ".. py:function:: subscribe(topics, [on_assign=None], [on_revoke=None])\n"
	  "\n"
	  "  Set the subscription to the supplied list of topics. "
	  "This replaces a previous subscription.\n"
          "\n"
          "  Regexp pattern subscriptions are supported by prefixing "
          "the topic string with ``\"^\"``, e.g.::\n"
          "\n"
          "    consumer.subscribe([\"^my_topic.*\", \"^another[0-9]-?[a-z]+$\", \"not_a_regex\"])\n"
	  "\n"
	  "  :param list(str) topics: List of topics (strings) to subscribe to.\n"
	  "  :param callable on_assign: callback to provide handling of "
	  "customized offsets on completion of a successful partition "
	  "re-assignment.\n"
	  "  :param callable on_revoke: callback to provide handling of "
	  "offset commits to a customized store at the start of a "
	  "rebalance operation.\n"
	  "\n"
	  "  :raises: KafkaException\n"
	  "  :raises: RuntimeError if called on a closed consumer\n"
	  "\n"
	  "\n"
	  ".. py:function:: on_assign(consumer, partitions)\n"
	  ".. py:function:: on_revoke(consumer, partitions)\n"
	  "\n"
	  "  :param Consumer consumer: Consumer instance.\n"
	  "  :param list(TopicPartition) partitions: Absolute list of partitions being assigned or revoked.\n"
	  "\n"
	},
        { "unsubscribe", (PyCFunction)Consumer_unsubscribe, METH_NOARGS,
          "  Remove the current subscription.\n"
          "\n"
          "  :raises: KafkaException\n"
          "  :raises: RuntimeError if called on a closed consumer\n"
          "\n"
        },
	{ "poll", (PyCFunction)Consumer_poll,
	  METH_VARARGS|METH_KEYWORDS,
	  ".. py:function:: poll([timeout=None])\n"
	  "\n"
	  "  Consumes a message, calls callbacks and returns events.\n"
	  "\n"
	  "  The application must check the returned :py:class:`Message` "
	  "object's :py:func:`Message.error()` method to distinguish "
	  "between proper messages (error() returns None) and events or "
	  "errors (see error().code() for specifics).\n"
	  "\n"
	  "  .. note:: Callbacks may be called from this method, "
	  "such as ``on_assign``, ``on_revoke``, et al.\n"
	  "\n"
	  "  :param float timeout: Maximum time to block waiting for message, event or callback. (Seconds)\n"
	  "  :returns: A Message object or None on timeout\n"
	  "  :rtype: :py:class:`Message` or None\n"
	  "  :raises: RuntimeError if called on a closed consumer\n"
	  "\n"
	},
	{ "consume", (PyCFunction)Consumer_consume,
	  METH_VARARGS|METH_KEYWORDS,
	  ".. py:function:: consume([num_messages=1], [timeout=-1])\n"
	  "\n"
	  "  Consumes messages, calls callbacks and returns a list of messages "
	  "(possibly empty on timeout).\n"
	  "\n"
	  "  The application must check each returned :py:class:`Message` "
	  "object's :py:func:`Message.error()` method to distinguish "
	  "between proper messages (error() returns None) and events or "
	  "errors (see error().code() for specifics).\n"
	  "\n"
	  "  .. note:: Callbacks may be called from this method, "
	  "such as ``on_assign``, ``on_revoke``, et al.\n"
	  "\n"
	  "  :param int num_messages: Maximum number of messages to return (default: 1).\n"
	  "  :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)\n"
	  "  :returns: A list of Message objects (possibly empty on timeout)\n"
	  "  :rtype: list(Message)\n"
          "  :raises RuntimeError: if called on a closed consumer\n"
          "  :raises KafkaError: in case of internal error\n"
          "  :raises ValueError: if num_messages > 1M\n"
	  "\n"
	},
	{ "assign", (PyCFunction)Consumer_assign, METH_O,
	  ".. py:function:: assign(partitions)\n"
	  "\n"
	  "  Set the consumer partition assignment to the provided list of "
	  ":py:class:`TopicPartition` and start consuming.\n"
	  "\n"
	  "  :param list(TopicPartition) partitions: List of topic+partitions and optionally initial offsets to start consuming from.\n"
          "  :raises: RuntimeError if called on a closed consumer\n"
	  "\n"
	},
        { "unassign", (PyCFunction)Consumer_unassign, METH_NOARGS,
          "  Removes the current partition assignment and stops consuming.\n"
          "\n"
          "  :raises: KafkaException\n"
          "  :raises: RuntimeError if called on a closed consumer\n"
          "\n"
        },
        { "assignment", (PyCFunction)Consumer_assignment,
          METH_VARARGS|METH_KEYWORDS,
          "  Returns the current partition assignment.\n"
          "\n"
          "  :returns: List of assigned topic+partitions.\n"
          "  :rtype: list(TopicPartition)\n"
          "  :raises: KafkaException\n"
          "  :raises: RuntimeError if called on a closed consumer\n"
          "\n"
        },
	{ "store_offsets", (PyCFunction)Consumer_store_offsets, METH_VARARGS|METH_KEYWORDS,
	  ".. py:function:: store_offsets([message=None], [offsets=None])\n"
	  "\n"
	  "  Store offsets for a message or a list of offsets.\n"
	  "\n"
	  "  ``message`` and ``offsets`` are mutually exclusive. "
	  "The stored offsets will be committed according to 'auto.commit.interval.ms' or by manual "
	  "offset-less :py:meth:`commit`. "
	  "Note that 'enable.auto.offset.store' must be set to False when using this API.\n"
	  "\n"
	  "  :param confluent_kafka.Message message: Store the message's offset+1.\n"
	  "  :param list(TopicPartition) offsets: List of topic+partitions+offsets to store.\n"
	  "  :rtype: None\n"
	  "  :raises: KafkaException\n"
	  "  :raises: RuntimeError if called on a closed consumer\n"
	  "\n"
	},
	{ "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS,
	  ".. py:function:: commit([message=None], [offsets=None], [asynchronous=True])\n"
	  "\n"
	  "  Commit a message or a list of offsets.\n"
	  "\n"
	  "  ``message`` and ``offsets`` are mutually exclusive; if neither is set, "
	  "the current partition assignment's offsets are used instead. "
	  "If you have set 'enable.auto.commit' to False, the consumer relies on "
	  "your use of this method to commit offsets.\n"
	  "\n"
	  "  :param confluent_kafka.Message message: Commit the message's offset+1.\n"
	  "  :param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.\n"
	  "  :param bool asynchronous: Asynchronous commit, return None immediately. "
          "If False the commit() call will block until the commit succeeds or "
          "fails and the committed offsets will be returned (on success). Note that specific partitions may have failed; the .err field of each partition should be checked for success.\n"
	  "  :rtype: None|list(TopicPartition)\n"
	  "  :raises: KafkaException\n"
	  "  :raises: RuntimeError if called on a closed consumer\n"
	  "\n"
	},
	{ "committed", (PyCFunction)Consumer_committed,
	  METH_VARARGS|METH_KEYWORDS,
	  ".. py:function:: committed(partitions, [timeout=None])\n"
	  "\n"
	  "  Retrieve committed offsets for the list of partitions.\n"
	  "\n"
	  "  :param list(TopicPartition) partitions: List of topic+partitions "
	  "to query for stored offsets.\n"
	  "  :param float timeout: Request timeout. (Seconds)\n"
	  "  :returns: List of topic+partitions with offset and possibly error set.\n"
	  "  :rtype: list(TopicPartition)\n"
	  "  :raises: KafkaException\n"
	  "  :raises: RuntimeError if called on a closed consumer\n"
	  "\n"
	},
	{ "position", (PyCFunction)Consumer_position,
	  METH_VARARGS|METH_KEYWORDS,
	  ".. py:function:: position(partitions)\n"
	  "\n"
	  "  Retrieve current positions (offsets) for the list of partitions.\n"
	  "\n"
	  "  :param list(TopicPartition) partitions: List of topic+partitions "
	  "to return current offsets for. The current offset is the offset of the "
	  "last consumed message + 1.\n"
	  "  :returns: List of topic+partitions with offset and possibly error set.\n"
	  "  :rtype: list(TopicPartition)\n"
	  "  :raises: KafkaException\n"
	  "  :raises: RuntimeError if called on a closed consumer\n"
	  "\n"
	},
	{ "pause", (PyCFunction)Consumer_pause,
	  METH_VARARGS|METH_KEYWORDS,
	  ".. py:function:: pause(partitions)\n"
	  "\n"
	  "  Pause consumption for the provided list of partitions.\n"
	  "\n"
	  "  :param list(TopicPartition) partitions: List of topic+partitions "
	  "to pause.\n"
	  "  :rtype: None\n"
	  "  :raises: KafkaException\n"
	  "\n"
	},
	{ "resume", (PyCFunction)Consumer_resume,
	  METH_VARARGS|METH_KEYWORDS,
	  ".. py:function:: resume(partitions)\n"
	  "\n"
	  "  Resume consumption for the provided list of partitions.\n"
	  "\n"
	  "  :param list(TopicPartition) partitions: List of topic+partitions "
	  "to resume.\n"
	  "  :rtype: None\n"
	  "  :raises: KafkaException\n"
	  "\n"
	},
        { "seek", (PyCFunction)Consumer_seek,
          METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: seek(partition)\n"
          "\n"
          "  Set the consume position for partition to offset.\n"
          "  The offset may be an absolute (>=0) or a\n"
          "  logical offset (:py:const:`OFFSET_BEGINNING` et al.).\n"
          "\n"
          "  seek() may only be used to update the consume offset of an\n"
          "  actively consumed partition (i.e., after :py:meth:`assign()`);\n"
          "  to set the starting offset of a partition not being consumed,\n"
          "  instead pass the offset in an :py:meth:`assign()` call.\n"
          "\n"
          "  :param TopicPartition partition: Topic+partition+offset to seek to.\n"
          "\n"
          "  :raises: KafkaException\n"
          "\n"
        },
        { "get_watermark_offsets", (PyCFunction)Consumer_get_watermark_offsets,
          METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: get_watermark_offsets(partition, [timeout=None], [cached=False])\n"
          "\n"
          "  Retrieve low and high offsets for partition.\n"
          "\n"
          "  :param TopicPartition partition: Topic+partition to return offsets for.\n"
          "  :param float timeout: Request timeout (when cached=False). (Seconds)\n"
          "  :param bool cached: Instead of querying the broker, use cached information. "
          "Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while "
          "the high offset is updated on each message fetched from the broker for this partition.\n"
          "  :returns: Tuple of (low,high) on success or None on timeout. "
          "The high offset is the offset of the last message + 1.\n"
          "  :rtype: tuple(int,int)\n"
          "  :raises: KafkaException\n"
          "  :raises: RuntimeError if called on a closed consumer\n"
          "\n"
        },
        { "offsets_for_times", (PyCFunction)Consumer_offsets_for_times,
          METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: offsets_for_times(partitions, [timeout=None])\n"
          "\n"
          "  Look up offsets by timestamp for the given partitions.\n"
          "\n"
          "  The returned offset for each partition is the earliest offset whose\n"
          "  timestamp is greater than or equal to the given timestamp in the\n"
          "  corresponding partition.\n"
          "\n"
          "  :param list(TopicPartition) partitions: topic+partitions with timestamps in the TopicPartition.offset field.\n"
          "  :param float timeout: Request timeout. (Seconds)\n"
          "  :returns: List of topic+partitions with the offset field set and possibly error set.\n"
          "  :rtype: list(TopicPartition)\n"
          "  :raises: KafkaException\n"
          "  :raises: RuntimeError if called on a closed consumer\n"
          "\n"
        },
	{ "close", (PyCFunction)Consumer_close, METH_NOARGS,
	  "\n"
	  "  Close down and terminate the Kafka Consumer.\n"
	  "\n"
	  "  Actions performed:\n"
	  "\n"
	  "  - Stops consuming\n"
	  "  - Commits offsets - except if the consumer property 'enable.auto.commit' is set to False\n"
	  "  - Leaves the consumer group\n"
	  "\n"
	  "  .. note:: Registered callbacks may be called from this method, "
	  "see :py:func:`poll()` for more info.\n"
	  "\n"
	  "  :rtype: None\n"
	  "  :raises: RuntimeError if called on a closed consumer\n"
	  "\n"
	},
        { "list_topics", (PyCFunction)list_topics, METH_VARARGS|METH_KEYWORDS,
          list_topics_doc
        },

	{ NULL }
};

static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
				   rd_kafka_topic_partition_list_t *c_parts,
				   void *opaque) {
	Handle *self = opaque;
	CallState *cs;

	cs = CallState_get(self);

	self->u.Consumer.rebalance_assigned = 0;

	if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS &&
	     self->u.Consumer.on_assign) ||
	    (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS &&
	     self->u.Consumer.on_revoke)) {
		PyObject *parts;
		PyObject *args, *result;

		/* Construct list of TopicPartition based on 'c_parts' */
		parts = c_parts_to_py(c_parts);

		args = Py_BuildValue("(OO)", self, parts);

		Py_DECREF(parts);

		if (!args) {
			cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
					 "Unable to build callback args");
			CallState_crash(cs);
			CallState_resume(cs);
			return;
		}

		result = PyObject_CallObject(
			err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
			self->u.Consumer.on_assign :
			self->u.Consumer.on_revoke, args);

		Py_DECREF(args);

		if (result)
			Py_DECREF(result);
		else {
			CallState_crash(cs);
			rd_kafka_yield(rk);
		}
	}

	/* Fallback: librdkafka needs the rebalance_cb to call assign()
	 * to synchronize state. If the user did not do this from the
	 * callback, or there was no callback, or the callback failed,
	 * we perform that assign() call here instead. */
	if (!self->u.Consumer.rebalance_assigned) {
		if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
			rd_kafka_assign(rk, c_parts);
		else
			rd_kafka_assign(rk, NULL);
	}

	CallState_resume(cs);
}
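
/* Python-side sketch (illustrative): an on_assign callback may rewrite
 * offsets before assignment. Calling consumer.assign() from the callback
 * increments rebalance_assigned above and suppresses the fallback assign():
 *
 *   def on_assign(consumer, partitions):
 *       for tp in partitions:
 *           tp.offset = lookup_offset(tp)   # hypothetical offset store
 *       consumer.assign(partitions)
 */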


static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
        Handle *self = (Handle *)selfobj;
        char errstr[256];
        rd_kafka_conf_t *conf;

        if (self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer already initialized");
                return -1;
        }

        self->type = RD_KAFKA_CONSUMER;

        if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
                                       args, kwargs)))
                return -1; /* Exception raised by ..conf_setup() */

        rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
        rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb);

        self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                errstr, sizeof(errstr));
        if (!self->rk) {
                cfl_PyErr_Format(rd_kafka_last_error(),
                                 "Failed to create consumer: %s", errstr);
                rd_kafka_conf_destroy(conf);
                return -1;
        }

        /* Forward log messages to the main queue which is then forwarded
         * to the consumer queue */
        if (self->logger)
                rd_kafka_set_log_queue(self->rk, NULL);

        rd_kafka_poll_set_consumer(self->rk);

        self->u.Consumer.rkqu = rd_kafka_queue_get_consumer(self->rk);
        assert(self->u.Consumer.rkqu);

        return 0;
}
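
/* Python-side construction sketch (illustrative config values):
 *
 *   from confluent_kafka import Consumer
 *
 *   consumer = Consumer({"bootstrap.servers": "localhost:9092",
 *                        "group.id": "mygroup",
 *                        "auto.offset.reset": "earliest"})
 */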

static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
                               PyObject *kwargs) {
        return type->tp_alloc(type, 0);
}


PyTypeObject ConsumerType = {
	PyVarObject_HEAD_INIT(NULL, 0)
	"cimpl.Consumer",        /*tp_name*/
	sizeof(Handle),          /*tp_basicsize*/
	0,                         /*tp_itemsize*/
	(destructor)Consumer_dealloc, /*tp_dealloc*/
	0,                         /*tp_print*/
	0,                         /*tp_getattr*/
	0,                         /*tp_setattr*/
	0,                         /*tp_compare*/
	0,                         /*tp_repr*/
	0,                         /*tp_as_number*/
	0,                         /*tp_as_sequence*/
	0,                         /*tp_as_mapping*/
	0,                         /*tp_hash */
	0,                         /*tp_call*/
	0,                         /*tp_str*/
	0,                         /*tp_getattro*/
	0,                         /*tp_setattro*/
	0,                         /*tp_as_buffer*/
	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
	Py_TPFLAGS_HAVE_GC, /*tp_flags*/
        "High-level Kafka Consumer\n"
        "\n"
        ".. py:function:: Consumer(config)\n"
        "\n"
        "  :param dict config: Configuration properties. At a minimum, ``group.id`` **must** be set "
        "and ``bootstrap.servers`` **should** be set.\n"
        "\n"
        "Create a new Consumer instance using the provided configuration dict.\n"
        "\n"
        " Special configuration properties:\n"
        "   ``on_commit``: Optional callback that will be called when a commit "
        "request has succeeded or failed.\n"
        "\n"
        "\n"
        ".. py:function:: on_commit(err, partitions)\n"
        "\n"
        "  :param KafkaError err: Commit error object, or None on success.\n"
        "  :param list(TopicPartition) partitions: List of partitions with "
        "their committed offsets or per-partition errors.\n"
        "\n"
        "\n", /*tp_doc*/
	(traverseproc)Consumer_traverse, /* tp_traverse */
	(inquiry)Consumer_clear, /* tp_clear */
	0,		           /* tp_richcompare */
	0,		           /* tp_weaklistoffset */
	0,		           /* tp_iter */
	0,		           /* tp_iternext */
	Consumer_methods,      /* tp_methods */
	0,                         /* tp_members */
	0,                         /* tp_getset */
	0,                         /* tp_base */
	0,                         /* tp_dict */
	0,                         /* tp_descr_get */
	0,                         /* tp_descr_set */
	0,                         /* tp_dictoffset */
        Consumer_init,             /* tp_init */
	0,                         /* tp_alloc */
	Consumer_new           /* tp_new */
};