1 /**
2 * Copyright 2016 Confluent Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "confluent_kafka.h"
18
19
20 /****************************************************************************
21 *
22 *
23 * Consumer
24 *
25 *
26 *
27 *
28 ****************************************************************************/
29
30
/**
 * @brief Release the consumer-specific resources held by \p self:
 *        the on_assign, on_revoke and on_commit callback references
 *        and the consumer queue handle.
 *
 * Every field is reset to NULL once released, so calling this function
 * more than once on the same handle is harmless.
 */
static void Consumer_clear0 (Handle *self) {
        /* Py_CLEAR() NULLs the field before dropping the reference, so
         * any code triggered by the decref never observes a dangling
         * pointer. It is a no-op for fields that are already NULL. */
        Py_CLEAR(self->u.Consumer.on_assign);
        Py_CLEAR(self->u.Consumer.on_revoke);
        Py_CLEAR(self->u.Consumer.on_commit);

        if (self->u.Consumer.rkqu) {
                rd_kafka_queue_destroy(self->u.Consumer.rkqu);
                self->u.Consumer.rkqu = NULL;
        }
}
49
/**
 * @brief Clear handler for a Consumer handle: drops the
 *        consumer-specific references through Consumer_clear0() and
 *        then the remaining state through Handle_clear().
 *
 * NOTE(review): presumably installed as the type's tp_clear slot; the
 * type object is not visible from here.
 *
 * @returns 0, unconditionally.
 */
static int Consumer_clear (Handle *self) {
        /* Callbacks and queue first ... */
        Consumer_clear0(self);

        /* ... then whatever Handle_clear() is responsible for. */
        Handle_clear(self);

        return 0;
}
55
/**
 * @brief Deallocator for a Consumer handle.
 *
 * Steps, in order:
 *  1. untrack the object from the garbage collector,
 *  2. release the callback references and the queue (Consumer_clear0()),
 *  3. destroy the rd_kafka_t instance if it is still set
 *     (Consumer_close() resets self->rk to NULL after destroying it),
 *  4. clear the remaining state through Handle_clear(),
 *  5. free the object through the type's tp_free slot.
 */
static void Consumer_dealloc (Handle *self) {
        PyObject_GC_UnTrack(self);

        /* The callback references are already gone by the time
         * rd_kafka_destroy() runs below. */
        Consumer_clear0(self);

        if (self->rk) {
                CallState cs;

                /* NOTE(review): CallState_begin()/CallState_end()
                 * presumably release and re-acquire the GIL around the
                 * blocking librdkafka call - confirm against their
                 * definition. */
                CallState_begin(self, &cs);

                /* If application has not called c.close() then
                 * rd_kafka_destroy() will, and that might trigger
                 * callbacks to be called from consumer_close().
                 * This should probably be fixed in librdkafka,
                 * or the application. */
                rd_kafka_destroy(self->rk);

                /* The result is deliberately ignored: a deallocator has
                 * no way to report a failure to its caller. */
                CallState_end(self, &cs);
        }

        Handle_clear(self);

        Py_TYPE(self)->tp_free((PyObject *)self);
}
80
/**
 * @brief GC traversal for a Consumer handle: reports the on_assign,
 *        on_revoke and on_commit callback objects to \p visit and then
 *        delegates to Handle_traverse() for the rest of the handle.
 *
 * @returns 0, or the visitor's non-zero result if one of the three
 *          callback visits reports one (Py_VISIT() returns early).
 */
static int Consumer_traverse (Handle *self,
                              visitproc visit, void *arg) {
        /* Py_VISIT() itself skips members that are NULL, so no explicit
         * guard is needed around each call. */
        Py_VISIT(self->u.Consumer.on_assign);
        Py_VISIT(self->u.Consumer.on_revoke);
        Py_VISIT(self->u.Consumer.on_commit);

        Handle_traverse(self, visit, arg);

        return 0;
}
94
95
96
97
98
99
/**
 * @brief subscribe(topics, on_assign=None, on_revoke=None)
 *
 * Builds a librdkafka topic list from the Python list \c topics, sets
 * it as the consumer's subscription and, on success, replaces the
 * stored on_assign / on_revoke rebalance callbacks with the ones
 * supplied in this call (a callback that is omitted or None clears the
 * stored one).
 *
 * @returns None on success, or NULL with an exception set:
 *          - RuntimeError if the consumer is closed,
 *          - TypeError if \c topics is not a list, an element cannot be
 *            converted to a string, or a callback is not callable,
 *          - the error raised by cfl_PyErr_Format() if
 *            rd_kafka_subscribe() fails.
 */
static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
                                     PyObject *kwargs) {

        rd_kafka_topic_partition_list_t *topics;
        static char *kws[] = { "topics", "on_assign", "on_revoke", NULL };
        PyObject *tlist, *on_assign = NULL, *on_revoke = NULL;
        PyObject *old_cb;
        Py_ssize_t pos = 0;
        rd_kafka_resp_err_t err;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OO", kws,
                                         &tlist, &on_assign, &on_revoke))
                return NULL;

        if (!PyList_Check(tlist)) {
                PyErr_Format(PyExc_TypeError,
                             "expected list of topic unicode strings");
                return NULL;
        }

        /* The documented default for both callbacks is None, so an
         * explicit None means the same as leaving the argument out. */
        if (on_assign == Py_None)
                on_assign = NULL;
        if (on_revoke == Py_None)
                on_revoke = NULL;

        if (on_assign && !PyCallable_Check(on_assign)) {
                PyErr_Format(PyExc_TypeError,
                             "on_assign expects a callable");
                return NULL;
        }

        if (on_revoke && !PyCallable_Check(on_revoke)) {
                PyErr_Format(PyExc_TypeError,
                             "on_revoke expects a callable");
                return NULL;
        }

        topics = rd_kafka_topic_partition_list_new((int)PyList_Size(tlist));

        /* The list size is re-read on every iteration on purpose:
         * converting an element to a string may run arbitrary Python
         * code. */
        for (pos = 0 ; pos < PyList_Size(tlist) ; pos++) {
                PyObject *o = PyList_GetItem(tlist, pos);
                PyObject *uo, *uo8 = NULL;
                const char *topic;

                if (!(uo = cfl_PyObject_Unistr(o))) {
                        PyErr_Format(PyExc_TypeError,
                                     "expected list of unicode strings");
                        rd_kafka_topic_partition_list_destroy(topics);
                        return NULL;
                }

                /* NOTE(review): the helper presumably returns NULL when
                 * the string cannot be encoded as UTF-8; a NULL topic
                 * name must never reach librdkafka. */
                topic = cfl_PyUnistr_AsUTF8(uo, &uo8);
                if (!topic) {
                        if (!PyErr_Occurred())
                                PyErr_Format(PyExc_TypeError,
                                             "expected list of unicode strings");
                        Py_XDECREF(uo8);
                        Py_DECREF(uo);
                        rd_kafka_topic_partition_list_destroy(topics);
                        return NULL;
                }

                rd_kafka_topic_partition_list_add(topics, topic,
                                                  RD_KAFKA_PARTITION_UA);
                Py_XDECREF(uo8);
                Py_DECREF(uo);
        }

        err = rd_kafka_subscribe(self->rk, topics);

        rd_kafka_topic_partition_list_destroy(topics);

        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to set subscription: %s",
                                 rd_kafka_err2str(err));
                return NULL;
        }

        /*
         * Update rebalance callbacks.
         * The new reference is taken and stored before the old one is
         * released, so the field never points at a released object.
         */
        Py_XINCREF(on_assign);
        old_cb = self->u.Consumer.on_assign;
        self->u.Consumer.on_assign = on_assign;
        Py_XDECREF(old_cb);

        Py_XINCREF(on_revoke);
        old_cb = self->u.Consumer.on_revoke;
        self->u.Consumer.on_revoke = on_revoke;
        Py_XDECREF(old_cb);

        Py_RETURN_NONE;
}
188
189
/**
 * @brief unsubscribe(): remove the consumer's current subscription.
 *
 * @returns None on success. NULL with RuntimeError set if the consumer
 *          is closed, or with the error raised by cfl_PyErr_Format() if
 *          rd_kafka_unsubscribe() fails.
 */
static PyObject *Consumer_unsubscribe (Handle *self,
                                       PyObject *ignore) {
        rd_kafka_resp_err_t rc;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        rc = rd_kafka_unsubscribe(self->rk);
        if (!rc) {
                Py_RETURN_NONE;
        }

        cfl_PyErr_Format(rc,
                         "Failed to remove subscription: %s",
                         rd_kafka_err2str(rc));
        return NULL;
}
211
212
/**
 * @brief assign(partitions): set the consumer's partition assignment to
 *        the list converted from \p tlist by py_to_c_parts().
 *
 * @returns None on success. NULL if the consumer is closed
 *          (RuntimeError), if py_to_c_parts() returns NULL, or if
 *          rd_kafka_assign() reports an error.
 */
static PyObject *Consumer_assign (Handle *self, PyObject *tlist) {
        rd_kafka_topic_partition_list_t *partitions;
        rd_kafka_resp_err_t rc;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        partitions = py_to_c_parts(tlist);
        if (!partitions)
                return NULL;

        /* Counted before the librdkafka call is made; the counter is
         * read elsewhere (not visible from here). */
        self->u.Consumer.rebalance_assigned++;

        rc = rd_kafka_assign(self->rk, partitions);

        /* The C list is only needed for the duration of the call. */
        rd_kafka_topic_partition_list_destroy(partitions);

        if (!rc) {
                Py_RETURN_NONE;
        }

        cfl_PyErr_Format(rc,
                         "Failed to set assignment: %s",
                         rd_kafka_err2str(rc));
        return NULL;
}
242
243
/**
 * @brief unassign(): remove the current partition assignment by calling
 *        rd_kafka_assign() with a NULL partition list.
 *
 * @returns None on success. NULL with RuntimeError set if the consumer
 *          is closed, or with the error raised by cfl_PyErr_Format() if
 *          rd_kafka_assign() fails.
 */
static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) {
        rd_kafka_resp_err_t rc;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        /* Same bookkeeping as in assign(): counted before the call. */
        self->u.Consumer.rebalance_assigned++;

        rc = rd_kafka_assign(self->rk, NULL);
        if (!rc) {
                Py_RETURN_NONE;
        }

        cfl_PyErr_Format(rc,
                         "Failed to remove assignment: %s",
                         rd_kafka_err2str(rc));
        return NULL;
}
266
/**
 * @brief assignment(): fetch the current partition assignment from
 *        librdkafka and convert it with c_parts_to_py().
 *
 * \p args and \p kwargs are accepted but not inspected.
 *
 * @returns the object produced by c_parts_to_py(). NULL with
 *          RuntimeError set if the consumer is closed, or with the error
 *          raised by cfl_PyErr_Format() if rd_kafka_assignment() fails.
 */
static PyObject *Consumer_assignment (Handle *self, PyObject *args,
                                      PyObject *kwargs) {
        rd_kafka_topic_partition_list_t *assigned;
        rd_kafka_resp_err_t rc;
        PyObject *result;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        rc = rd_kafka_assignment(self->rk, &assigned);
        if (rc) {
                cfl_PyErr_Format(rc,
                                 "Failed to get assignment: %s",
                                 rd_kafka_err2str(rc));
                return NULL;
        }

        /* The C list is released here once it has been converted. */
        result = c_parts_to_py(assigned);
        rd_kafka_topic_partition_list_destroy(assigned);

        return result;
}
294
295
296 /**
297 * @brief Global offset commit on_commit callback trampoline triggered
298 * from poll() et.al
299 */
Consumer_offset_commit_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * c_parts,void * opaque)300 static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
301 rd_kafka_topic_partition_list_t *c_parts,
302 void *opaque) {
303 Handle *self = opaque;
304 PyObject *parts, *k_err, *args, *result;
305 CallState *cs;
306
307 if (!self->u.Consumer.on_commit)
308 return;
309
310 cs = CallState_get(self);
311
312 /* Insantiate error object */
313 k_err = KafkaError_new_or_None(err, NULL);
314
315 /* Construct list of TopicPartition based on 'c_parts' */
316 if (c_parts)
317 parts = c_parts_to_py(c_parts);
318 else
319 parts = PyList_New(0);
320
321 args = Py_BuildValue("(OO)", k_err, parts);
322
323 Py_DECREF(k_err);
324 Py_DECREF(parts);
325
326 if (!args) {
327 cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
328 "Unable to build callback args");
329 CallState_crash(cs);
330 CallState_resume(cs);
331 return;
332 }
333
334 result = PyObject_CallObject(self->u.Consumer.on_commit, args);
335
336 Py_DECREF(args);
337
338 if (result)
339 Py_DECREF(result);
340 else {
341 CallState_crash(cs);
342 rd_kafka_yield(rk);
343 }
344
345 CallState_resume(cs);
346 }
347
/**
 * @brief Simple struct to pass results from commit from offset_commit_return_cb
 *        back to offset_commit() return value.
 *
 * Consumer_commit() zero-fills the struct before a synchronous commit
 * and destroys \c c_parts once it is done with it.
 */
struct commit_return {
        /* Error code handed to the commit callback. */
        rd_kafka_resp_err_t err;
        /* Copy of the callback's partition list; stays NULL if the
         * callback received no list. */
        rd_kafka_topic_partition_list_t *c_parts;
};
356
357 /**
358 * @brief Simple offset_commit_cb to pass the callback information
359 * as return value from commit() through the commit_return struct.
360 * Triggered from rd_kafka_commit_queue().
361 */
362 static void
Consumer_offset_commit_return_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * c_parts,void * opaque)363 Consumer_offset_commit_return_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
364 rd_kafka_topic_partition_list_t *c_parts,
365 void *opaque) {
366 struct commit_return *commit_return = opaque;
367
368 commit_return->err = err;
369 if (c_parts)
370 commit_return->c_parts =
371 rd_kafka_topic_partition_list_copy(c_parts);
372 }
373
374
/**
 * @brief commit(message=None, offsets=None, asynchronous=True)
 *
 * Commits either the offset following \c message (offset + 1), the
 * explicit \c offsets list, or - when neither is given - passes a NULL
 * offset list to rd_kafka_commit_queue().
 * \c async and \c asynchronous are two keyword names for the same flag.
 *
 * Asynchronous mode hands the commit to the consumer queue with
 * Consumer_offset_commit_cb as callback and returns None immediately.
 * Synchronous mode releases the GIL, lets rd_kafka_commit_queue() invoke
 * Consumer_offset_commit_return_cb, and returns the committed
 * partitions converted by c_parts_to_py().
 *
 * @returns None (async), a partition list (sync), or NULL with an
 *          exception set: RuntimeError if the consumer is closed,
 *          ValueError if both message and offsets are given, TypeError
 *          if message is not a Message, or the commit error raised via
 *          cfl_PyErr_Format().
 */
static PyObject *Consumer_commit (Handle *self, PyObject *args,
                                  PyObject *kwargs) {
        rd_kafka_resp_err_t err;
        PyObject *msg = NULL, *offsets = NULL, *async_o = NULL;
        rd_kafka_topic_partition_list_t *c_offsets;
        int async = 1;
        static char *kws[] = { "message", "offsets",
                               "async", "asynchronous", NULL };
        rd_kafka_queue_t *rkqu = NULL;
        struct commit_return commit_return;
        PyThreadState *thread_state = NULL;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOOO", kws,
                                         &msg, &offsets, &async_o, &async_o))
                return NULL;

        if (msg && offsets) {
                PyErr_SetString(PyExc_ValueError,
                                "message and offsets are mutually exclusive");
                return NULL;
        }

        if (async_o) {
                /* PyObject_IsTrue() returns -1 with an exception set
                 * when the truth test itself fails; do not mistake
                 * that for "true". */
                async = PyObject_IsTrue(async_o);
                if (async == -1)
                        return NULL;
        }

        if (offsets) {

                if (!(c_offsets = py_to_c_parts(offsets)))
                        return NULL;
        } else if (msg) {
                Message *m;
                PyObject *uo8;

                /* Py_TYPE() yields a borrowed pointer; PyObject_Type()
                 * would return a new reference that has to be
                 * released again. */
                if (Py_TYPE(msg) != &MessageType) {
                        PyErr_Format(PyExc_TypeError,
                                     "expected %s", MessageType.tp_name);
                        return NULL;
                }

                m = (Message *)msg;

                /* Commit the offset of the next message to consume. */
                c_offsets = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(
                        c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
                        m->partition)->offset = m->offset + 1;
                Py_XDECREF(uo8);

        } else {
                c_offsets = NULL;
        }

        if (async) {
                /* Async mode: Use consumer queue for offset commit
                 * served by consumer_poll() */
                rkqu = self->u.Consumer.rkqu;

        } else {
                /* Sync mode: Let commit_queue() trigger the callback. */
                memset(&commit_return, 0, sizeof(commit_return));

                /* Unlock GIL while we are blocking. */
                thread_state = PyEval_SaveThread();
        }

        err = rd_kafka_commit_queue(self->rk, c_offsets, rkqu,
                                    async ?
                                    Consumer_offset_commit_cb :
                                    Consumer_offset_commit_return_cb,
                                    async ?
                                    (void *)self : (void *)&commit_return);

        if (c_offsets)
                rd_kafka_topic_partition_list_destroy(c_offsets);

        if (!async) {
                /* Re-lock GIL */
                PyEval_RestoreThread(thread_state);

                /* Honour inner error (richer) from offset_commit_return_cb */
                if (commit_return.err)
                        err = commit_return.err;
        }

        if (err) {
                /* Outer error from commit_queue() */
                if (!async && commit_return.c_parts)
                        rd_kafka_topic_partition_list_destroy(commit_return.c_parts);

                cfl_PyErr_Format(err,
                                 "Commit failed: %s", rd_kafka_err2str(err));
                return NULL;
        }

        if (async) {
                /* async commit returns None when commit is in progress */
                Py_RETURN_NONE;

        } else {
                PyObject *plist;

                /* sync commit returns the topic,partition,offset,err list */
                assert(commit_return.c_parts);

                plist = c_parts_to_py(commit_return.c_parts);
                rd_kafka_topic_partition_list_destroy(commit_return.c_parts);

                return plist;
        }
}
492
493
494
/**
 * @brief store_offsets(message=None, offsets=None)
 *
 * Stores either the offset following \c message (offset + 1) or the
 * explicit \c offsets list through rd_kafka_offsets_store(). Exactly
 * one of the two arguments must be given.
 *
 * When built against librdkafka older than v0.11.0 the method only
 * raises NotImplementedError.
 *
 * @returns None on success, or NULL with an exception set:
 *          RuntimeError if the consumer is closed, ValueError if both
 *          or neither argument is given, TypeError if message is not a
 *          Message, or the error raised via cfl_PyErr_Format().
 */
static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
                                         PyObject *kwargs) {
#if RD_KAFKA_VERSION < 0x000b0000
        PyErr_Format(PyExc_NotImplementedError,
                     "Consumer store_offsets require "
                     "confluent-kafka-python built for librdkafka "
                     "version >=v0.11.0 (librdkafka runtime 0x%x, "
                     "buildtime 0x%x)",
                     rd_kafka_version(), RD_KAFKA_VERSION);
        return NULL;
#else
        rd_kafka_resp_err_t err;
        PyObject *msg = NULL, *offsets = NULL;
        rd_kafka_topic_partition_list_t *c_offsets;
        static char *kws[] = { "message", "offsets", NULL };

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO", kws,
                                         &msg, &offsets))
                return NULL;

        if (msg && offsets) {
                PyErr_SetString(PyExc_ValueError,
                                "message and offsets are mutually exclusive");
                return NULL;
        }

        if (!msg && !offsets) {
                PyErr_SetString(PyExc_ValueError,
                                "expected either message or offsets");
                return NULL;
        }

        if (offsets) {

                if (!(c_offsets = py_to_c_parts(offsets)))
                        return NULL;
        } else {
                Message *m;
                PyObject *uo8;

                /* Py_TYPE() yields a borrowed pointer; PyObject_Type()
                 * would return a new reference that has to be
                 * released again. */
                if (Py_TYPE(msg) != &MessageType) {
                        PyErr_Format(PyExc_TypeError,
                                     "expected %s", MessageType.tp_name);
                        return NULL;
                }

                m = (Message *)msg;

                /* Store the offset of the next message to consume. */
                c_offsets = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(
                        c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
                        m->partition)->offset = m->offset + 1;
                Py_XDECREF(uo8);
        }


        err = rd_kafka_offsets_store(self->rk, c_offsets);
        rd_kafka_topic_partition_list_destroy(c_offsets);

        if (err) {
                cfl_PyErr_Format(err,
                                 "StoreOffsets failed: %s", rd_kafka_err2str(err));
                return NULL;
        }

        Py_RETURN_NONE;
#endif
}
572
573
574
/**
 * @brief committed(partitions, timeout=-1)
 *
 * Converts \c partitions with py_to_c_parts(), asks librdkafka for the
 * committed offsets with the GIL released, and converts the list back
 * with c_parts_to_py().
 *
 * \c timeout is given in seconds; a negative value is passed on to
 * librdkafka as -1.
 *
 * @returns the object produced by c_parts_to_py(), or NULL if the
 *          consumer is closed (RuntimeError), argument parsing or
 *          conversion fails, or rd_kafka_committed() reports an error.
 */
static PyObject *Consumer_committed (Handle *self, PyObject *args,
                                     PyObject *kwargs) {
        static char *kws[] = { "partitions", "timeout", NULL };
        PyObject *plist;
        PyObject *result = NULL;
        rd_kafka_topic_partition_list_t *c_parts;
        rd_kafka_resp_err_t err;
        double tmout = -1.0f;
        int tmout_ms;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kws,
                                         &plist, &tmout))
                return NULL;

        c_parts = py_to_c_parts(plist);
        if (!c_parts)
                return NULL;

        /* Seconds (float) -> milliseconds (int) */
        tmout_ms = tmout >= 0 ? (int)(tmout * 1000.0f) : -1;

        Py_BEGIN_ALLOW_THREADS;
        err = rd_kafka_committed(self->rk, c_parts, tmout_ms);
        Py_END_ALLOW_THREADS;

        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to get committed offsets: %s",
                                 rd_kafka_err2str(err));
        } else {
                result = c_parts_to_py(c_parts);
        }

        /* The C list is released on both the success and error path. */
        rd_kafka_topic_partition_list_destroy(c_parts);

        return result;
}
617
618
/**
 * @brief position(partitions)
 *
 * Converts \c partitions with py_to_c_parts(), lets rd_kafka_position()
 * operate on the list and converts it back with c_parts_to_py().
 *
 * @returns the object produced by c_parts_to_py(), or NULL if the
 *          consumer is closed (RuntimeError), argument parsing or
 *          conversion fails, or rd_kafka_position() reports an error.
 */
static PyObject *Consumer_position (Handle *self, PyObject *args,
                                    PyObject *kwargs) {
        static char *kws[] = { "partitions", NULL };
        PyObject *plist;
        PyObject *result = NULL;
        rd_kafka_topic_partition_list_t *c_parts;
        rd_kafka_resp_err_t err;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws,
                                         &plist))
                return NULL;

        c_parts = py_to_c_parts(plist);
        if (!c_parts)
                return NULL;

        err = rd_kafka_position(self->rk, c_parts);

        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to get position: %s",
                                 rd_kafka_err2str(err));
        } else {
                result = c_parts_to_py(c_parts);
        }

        /* The C list is released on both the success and error path. */
        rd_kafka_topic_partition_list_destroy(c_parts);

        return result;
}
657
/**
 * @brief pause(partitions): pause consumption for the given partitions.
 *
 * @returns None on success, or NULL with an exception set:
 *          RuntimeError if the consumer is closed, the parse/conversion
 *          error if \c partitions is missing or cannot be converted by
 *          py_to_c_parts(), or the error raised via cfl_PyErr_Format()
 *          if rd_kafka_pause_partitions() fails.
 */
static PyObject *Consumer_pause(Handle *self, PyObject *args,
                                PyObject *kwargs) {

        PyObject *plist;
        rd_kafka_topic_partition_list_t *c_parts;
        rd_kafka_resp_err_t err;
        static char *kws[] = {"partitions", NULL};

        /* close() resets self->rk to NULL; without this guard a NULL
         * handle would be passed to librdkafka. */
        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws, &plist))
                return NULL;

        if (!(c_parts = py_to_c_parts(plist)))
                return NULL;

        err = rd_kafka_pause_partitions(self->rk, c_parts);
        rd_kafka_topic_partition_list_destroy(c_parts);
        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to pause partitions: %s",
                                 rd_kafka_err2str(err));
                return NULL;
        }
        Py_RETURN_NONE;
}
682
/**
 * @brief resume(partitions): resume consumption for the given
 *        partitions.
 *
 * @returns None on success, or NULL with an exception set:
 *          RuntimeError if the consumer is closed, the parse/conversion
 *          error if \c partitions is missing or cannot be converted by
 *          py_to_c_parts(), or the error raised via cfl_PyErr_Format()
 *          if rd_kafka_resume_partitions() fails.
 */
static PyObject *Consumer_resume (Handle *self, PyObject *args,
                                  PyObject *kwargs) {

        PyObject *plist;
        rd_kafka_topic_partition_list_t *c_parts;
        rd_kafka_resp_err_t err;
        static char *kws[] = {"partitions", NULL};

        /* close() resets self->rk to NULL; without this guard a NULL
         * handle would be passed to librdkafka. */
        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws, &plist))
                return NULL;

        if (!(c_parts = py_to_c_parts(plist)))
                return NULL;

        err = rd_kafka_resume_partitions(self->rk, c_parts);
        rd_kafka_topic_partition_list_destroy(c_parts);
        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to resume partitions: %s",
                                 rd_kafka_err2str(err));
                return NULL;
        }
        Py_RETURN_NONE;
}
707
708
/**
 * @brief seek(partition): seek the partition described by the
 *        TopicPartition argument to its \c offset field.
 *
 * A temporary topic object is created for the call and destroyed
 * afterwards. rd_kafka_seek() is called with a timeout of -1 and with
 * the GIL released.
 *
 * @returns None on success, or NULL with an exception set:
 *          RuntimeError if the consumer is closed, TypeError if the
 *          argument is not a TopicPartition, or the error raised via
 *          cfl_PyErr_Format() if the topic object cannot be created or
 *          the seek fails.
 */
static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs) {

        TopicPartition *tp;
        rd_kafka_resp_err_t err;
        static char *kws[] = { "partition", NULL };
        rd_kafka_topic_t *rkt;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError, "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws,
                                         (PyObject **)&tp))
                return NULL;

        /* Py_TYPE() yields a borrowed pointer; PyObject_Type() would
         * return a new reference that has to be released again. */
        if (Py_TYPE(tp) != &TopicPartitionType) {
                PyErr_Format(PyExc_TypeError,
                             "expected %s", TopicPartitionType.tp_name);
                return NULL;
        }

        rkt = rd_kafka_topic_new(self->rk, tp->topic, NULL);
        if (!rkt) {
                cfl_PyErr_Format(rd_kafka_last_error(),
                                 "Failed to get topic object for "
                                 "topic \"%s\": %s",
                                 tp->topic,
                                 rd_kafka_err2str(rd_kafka_last_error()));
                return NULL;
        }

        Py_BEGIN_ALLOW_THREADS;
        err = rd_kafka_seek(rkt, tp->partition, tp->offset, -1);
        Py_END_ALLOW_THREADS;

        rd_kafka_topic_destroy(rkt);

        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to seek to offset %"CFL_PRId64": %s",
                                 tp->offset, rd_kafka_err2str(err));
                return NULL;
        }

        Py_RETURN_NONE;
}
757
758
/**
 * @brief get_watermark_offsets(partition, timeout=-1, cached=False)
 *
 * Returns the low and high watermark offsets of the given
 * TopicPartition as a 2-tuple (low, high).
 *
 * With a true \c cached the values come from
 * rd_kafka_get_watermark_offsets(); otherwise
 * rd_kafka_query_watermark_offsets() is called with the GIL released.
 * \c timeout is given in seconds, a negative value is passed on as -1,
 * and it only applies to the query variant.
 *
 * @returns the (low, high) tuple, or NULL with an exception set:
 *          RuntimeError if the consumer is closed, TypeError if the
 *          argument is not a TopicPartition, the error raised via
 *          cfl_PyErr_Format() if librdkafka reports one, or the
 *          allocation error if the result cannot be built.
 */
static PyObject *Consumer_get_watermark_offsets (Handle *self, PyObject *args,
                                                 PyObject *kwargs) {

        TopicPartition *tp;
        rd_kafka_resp_err_t err;
        double tmout = -1.0f;
        /* The "b" format unit stores into an unsigned char, so the
         * target must have exactly that type. */
        unsigned char cached = 0;
        int64_t low = RD_KAFKA_OFFSET_INVALID, high = RD_KAFKA_OFFSET_INVALID;
        static char *kws[] = { "partition", "timeout", "cached", NULL };
        PyObject *rtup, *low_o, *high_o;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|db", kws,
                                         (PyObject **)&tp, &tmout, &cached))
                return NULL;

        /* Py_TYPE() yields a borrowed pointer; PyObject_Type() would
         * return a new reference that has to be released again. */
        if (Py_TYPE(tp) != &TopicPartitionType) {
                PyErr_Format(PyExc_TypeError,
                             "expected %s", TopicPartitionType.tp_name);
                return NULL;
        }

        if (cached) {
                err = rd_kafka_get_watermark_offsets(self->rk,
                                                     tp->topic, tp->partition,
                                                     &low, &high);
        } else {
                Py_BEGIN_ALLOW_THREADS;
                err = rd_kafka_query_watermark_offsets(self->rk,
                                                       tp->topic, tp->partition,
                                                       &low, &high,
                                                       tmout >= 0 ? (int)(tmout * 1000.0f) : -1);
                Py_END_ALLOW_THREADS;
        }

        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to get watermark offsets: %s",
                                 rd_kafka_err2str(err));
                return NULL;
        }

        /* Build all three objects first so that a failed allocation
         * never leaves a tuple with a NULL slot behind. */
        low_o  = PyLong_FromLongLong(low);
        high_o = PyLong_FromLongLong(high);
        rtup   = PyTuple_New(2);
        if (!low_o || !high_o || !rtup) {
                Py_XDECREF(low_o);
                Py_XDECREF(high_o);
                Py_XDECREF(rtup);
                return NULL;
        }

        /* PyTuple_SetItem() takes over the item references. */
        PyTuple_SetItem(rtup, 0, low_o);
        PyTuple_SetItem(rtup, 1, high_o);

        return rtup;
}
813
814
/**
 * @brief offsets_for_times(partitions, timeout=-1)
 *
 * Converts \c partitions with py_to_c_parts(), calls
 * rd_kafka_offsets_for_times() with the GIL released and converts the
 * list back with c_parts_to_py(). \c timeout is given in seconds; a
 * negative value is passed on to librdkafka as -1.
 *
 * When built against librdkafka older than v0.11.0 the method only
 * raises NotImplementedError.
 *
 * @returns the object produced by c_parts_to_py(), or NULL if the
 *          consumer is closed (RuntimeError), argument parsing or
 *          conversion fails, or librdkafka reports an error.
 */
static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args,
                                             PyObject *kwargs) {
#if RD_KAFKA_VERSION < 0x000b0000
        PyErr_Format(PyExc_NotImplementedError,
                     "Consumer offsets_for_times require "
                     "confluent-kafka-python built for librdkafka "
                     "version >=v0.11.0 (librdkafka runtime 0x%x, "
                     "buildtime 0x%x)",
                     rd_kafka_version(), RD_KAFKA_VERSION);
        return NULL;
#else
        static char *kws[] = { "partitions", "timeout", NULL };
        PyObject *plist;
        PyObject *result = NULL;
        rd_kafka_topic_partition_list_t *c_parts;
        rd_kafka_resp_err_t err;
        double tmout = -1.0f;
        int tmout_ms;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kws,
                                         &plist, &tmout))
                return NULL;

        c_parts = py_to_c_parts(plist);
        if (!c_parts)
                return NULL;

        /* Seconds (float) -> milliseconds (int) */
        tmout_ms = tmout >= 0 ? (int)(tmout * 1000.0f) : -1;

        Py_BEGIN_ALLOW_THREADS;
        err = rd_kafka_offsets_for_times(self->rk, c_parts, tmout_ms);
        Py_END_ALLOW_THREADS;

        if (err) {
                cfl_PyErr_Format(err,
                                 "Failed to get offsets: %s",
                                 rd_kafka_err2str(err));
        } else {
                result = c_parts_to_py(c_parts);
        }

        /* The C list is released on both the success and error path. */
        rd_kafka_topic_partition_list_destroy(c_parts);

        return result;
#endif
}
866
867
/**
 * @brief poll(timeout=-1)
 *
 * Polls the consumer for a single message or event and wraps it in a
 * Message object. \c timeout is given in seconds; a negative value is
 * passed on to librdkafka as -1.
 *
 * @returns a Message object, None if rd_kafka_consumer_poll() returned
 *          nothing, or NULL if the consumer is closed (RuntimeError),
 *          argument parsing fails, CallState_end() reports a failure,
 *          or the Message object cannot be created.
 */
static PyObject *Consumer_poll (Handle *self, PyObject *args,
                                PyObject *kwargs) {
        double tmout = -1.0f;
        static char *kws[] = { "timeout", NULL };
        rd_kafka_message_t *rkm;
        PyObject *msgobj;
        CallState cs;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
                return NULL;

        CallState_begin(self, &cs);

        rkm = rd_kafka_consumer_poll(self->rk, tmout >= 0 ?
                                     (int)(tmout * 1000.0f) : -1);

        if (!CallState_end(self, &cs)) {
                /* The call state reports a failure: drop the message
                 * instead of returning it. */
                if (rkm)
                        rd_kafka_message_destroy(rkm);
                return NULL;
        }

        if (!rkm)
                Py_RETURN_NONE;

        /* NOTE(review): Message_new0() presumably returns NULL with an
         * exception set when the object cannot be allocated - confirm
         * against its definition. */
        msgobj = Message_new0(self, rkm);
#ifdef RD_KAFKA_V_HEADERS
        // Have to detach headers outside Message_new0 because it declares the
        // rk message as a const
        if (msgobj)
                rd_kafka_message_detach_headers(rkm, &((Message *)msgobj)->c_headers);
#endif
        rd_kafka_message_destroy(rkm);

        return msgobj;
}
909
910
/**
 * @brief consume(num_messages=1, timeout=-1)
 *
 * Fetches up to \c num_messages messages from the consumer queue in one
 * batch and returns them as a list of Message objects. \c timeout is
 * given in seconds; a negative value is passed on to librdkafka as -1.
 *
 * @returns a (possibly empty) list of Message objects, or NULL with an
 *          exception set: RuntimeError if the consumer is closed,
 *          ValueError if num_messages exceeds 1000000, MemoryError if
 *          the batch array cannot be allocated, the error raised via
 *          cfl_PyErr_Format() if the batch call fails, or whatever
 *          exception is pending when CallState_end() reports a failure
 *          or a result object cannot be created.
 */
static PyObject *Consumer_consume (Handle *self, PyObject *args,
                                   PyObject *kwargs) {
        unsigned int num_messages = 1;
        double tmout = -1.0f;
        static char *kws[] = { "num_messages", "timeout", NULL };
        rd_kafka_message_t **rkmessages;
        PyObject *msglist;
        rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu;
        CallState cs;
        Py_ssize_t i, n;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer closed");
                return NULL;
        }

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Id", kws,
                                         &num_messages, &tmout))
                return NULL;

        if (num_messages > 1000000) {
                PyErr_SetString(PyExc_ValueError,
                                "num_messages must be between 0 and 1000000 (1M)");
                return NULL;
        }

        /* Allocate before CallState_begin() so that a failure can be
         * reported as a Python exception right away. At least one slot
         * is requested because malloc(0) may legitimately return NULL. */
        rkmessages = malloc((num_messages ? num_messages : 1) *
                            sizeof(*rkmessages));
        if (!rkmessages)
                return PyErr_NoMemory();

        CallState_begin(self, &cs);

        n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
                tmout >= 0 ? (int)(tmout * 1000.0f) : -1,
                rkmessages,
                num_messages);

        if (!CallState_end(self, &cs)) {
                /* n may be negative here; the loop then does nothing. */
                for (i = 0; i < n; i++) {
                        rd_kafka_message_destroy(rkmessages[i]);
                }
                free(rkmessages);
                return NULL;
        }

        if (n < 0) {
                free(rkmessages);
                cfl_PyErr_Format(rd_kafka_last_error(),
                                 "%s", rd_kafka_err2str(rd_kafka_last_error()));
                return NULL;
        }

        msglist = PyList_New(n);
        if (!msglist) {
                for (i = 0; i < n; i++) {
                        rd_kafka_message_destroy(rkmessages[i]);
                }
                free(rkmessages);
                return NULL;
        }

        for (i = 0; i < n; i++) {
                /* NOTE(review): Message_new0() presumably returns NULL
                 * with an exception set when the object cannot be
                 * allocated - confirm against its definition. */
                PyObject *msgobj = Message_new0(self, rkmessages[i]);

                if (!msgobj) {
                        /* Release this message and all that have not
                         * been converted yet; the ones already in the
                         * list are released together with the list. */
                        for ( ; i < n; i++) {
                                rd_kafka_message_destroy(rkmessages[i]);
                        }
                        Py_DECREF(msglist);
                        free(rkmessages);
                        return NULL;
                }
#ifdef RD_KAFKA_V_HEADERS
                // Have to detach headers outside Message_new0 because it declares the
                // rk message as a const
                rd_kafka_message_detach_headers(rkmessages[i], &((Message *)msgobj)->c_headers);
#endif
                /* PyList_SET_ITEM() takes over the msgobj reference. */
                PyList_SET_ITEM(msglist, i, msgobj);
                rd_kafka_message_destroy(rkmessages[i]);
        }

        free(rkmessages);

        return msglist;
}
979
980
/**
 * @brief close(): close the consumer and destroy its librdkafka handle.
 *
 * Steps, in order: rd_kafka_consumer_close(), destroy the consumer
 * queue if one is set, rd_kafka_destroy(), and reset self->rk to NULL.
 * The NULL self->rk is what the other methods test to raise
 * "Consumer closed".
 *
 * @returns None on success. NULL with RuntimeError set if the consumer
 *          was already closed, or NULL if CallState_end() reports a
 *          failure (the handle has been destroyed by then regardless).
 */
static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
        CallState cs;

        if (!self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer already closed");
                return NULL;
        }

        /* NOTE(review): CallState_begin()/CallState_end() presumably
         * release and re-acquire the GIL around the blocking librdkafka
         * calls - confirm against their definition. */
        CallState_begin(self, &cs);

        rd_kafka_consumer_close(self->rk);

        /* The queue is destroyed before the handle it was obtained from. */
        if (self->u.Consumer.rkqu) {
                rd_kafka_queue_destroy(self->u.Consumer.rkqu);
                self->u.Consumer.rkqu = NULL;
        }

        rd_kafka_destroy(self->rk);
        self->rk = NULL;

        if (!CallState_end(self, &cs))
                return NULL;

        Py_RETURN_NONE;
}
1007
1008
1009 static PyMethodDef Consumer_methods[] = {
1010 { "subscribe", (PyCFunction)Consumer_subscribe,
1011 METH_VARARGS|METH_KEYWORDS,
1012 ".. py:function:: subscribe(topics, [on_assign=None], [on_revoke=None])\n"
1013 "\n"
1014 " Set subscription to supplied list of topics\n"
1015 " This replaces a previous subscription.\n"
1016 "\n"
1017 " Regexp pattern subscriptions are supported by prefixing "
1018 "the topic string with ``\"^\"``, e.g.::\n"
1019 "\n"
1020 " consumer.subscribe([\"^my_topic.*\", \"^another[0-9]-?[a-z]+$\", \"not_a_regex\"])\n"
1021 "\n"
1022 " :param list(str) topics: List of topics (strings) to subscribe to.\n"
1023 " :param callable on_assign: callback to provide handling of "
1024 "customized offsets on completion of a successful partition "
1025 "re-assignment.\n"
1026 " :param callable on_revoke: callback to provide handling of "
1027 "offset commits to a customized store on the start of a "
1028 "rebalance operation.\n"
1029 "\n"
1030 " :raises KafkaException:\n"
1031 " :raises: RuntimeError if called on a closed consumer\n"
1032 "\n"
1033 "\n"
1034 ".. py:function:: on_assign(consumer, partitions)\n"
1035 ".. py:function:: on_revoke(consumer, partitions)\n"
1036 "\n"
1037 " :param Consumer consumer: Consumer instance.\n"
1038 " :param list(TopicPartition) partitions: Absolute list of partitions being assigned or revoked.\n"
1039 "\n"
1040 },
1041 { "unsubscribe", (PyCFunction)Consumer_unsubscribe, METH_NOARGS,
1042 " Remove current subscription.\n"
1043 "\n"
1044 " :raises: KafkaException\n"
1045 " :raises: RuntimeError if called on a closed consumer\n"
1046 "\n"
1047 },
1048 { "poll", (PyCFunction)Consumer_poll,
1049 METH_VARARGS|METH_KEYWORDS,
1050 ".. py:function:: poll([timeout=None])\n"
1051 "\n"
1052 " Consume messages, calls callbacks and returns events.\n"
1053 "\n"
1054 " The application must check the returned :py:class:`Message` "
1055 "object's :py:func:`Message.error()` method to distinguish "
1056 "between proper messages (error() returns None), or an event or "
1057 "error (see error().code() for specifics).\n"
1058 "\n"
1059 " .. note: Callbacks may be called from this method, "
1060 "such as ``on_assign``, ``on_revoke``, et.al.\n"
1061 "\n"
1062 " :param float timeout: Maximum time to block waiting for message, event or callback. (Seconds)\n"
1063 " :returns: A Message object or None on timeout\n"
1064 " :rtype: :py:class:`Message` or None\n"
1065 " :raises: RuntimeError if called on a closed consumer\n"
1066 "\n"
1067 },
1068 { "consume", (PyCFunction)Consumer_consume,
1069 METH_VARARGS|METH_KEYWORDS,
1070 ".. py:function:: consume([num_messages=1], [timeout=-1])\n"
1071 "\n"
1072 " Consume messages, calls callbacks and returns list of messages "
1073 "(possibly empty on timeout).\n"
1074 "\n"
1075 " The application must check the returned :py:class:`Message` "
1076 "object's :py:func:`Message.error()` method to distinguish "
1077 "between proper messages (error() returns None), or an event or "
1078 "error for each :py:class:`Message` in the list (see error().code() "
1079 "for specifics).\n"
1080 "\n"
1081 " .. note: Callbacks may be called from this method, "
1082 "such as ``on_assign``, ``on_revoke``, et.al.\n"
1083 "\n"
1084 " :param int num_messages: Maximum number of messages to return (default: 1).\n"
1085 " :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)\n"
1086 " :returns: A list of Message objects (possibly empty on timeout)\n"
1087 " :rtype: list(Message)\n"
1088 " :raises RuntimeError: if called on a closed consumer\n"
1089 " :raises KafkaError: in case of internal error\n"
1090 " :raises ValueError: if num_messages > 1M\n"
1091 "\n"
1092 },
1093 { "assign", (PyCFunction)Consumer_assign, METH_O,
1094 ".. py:function:: assign(partitions)\n"
1095 "\n"
1096 " Set consumer partition assignment to the provided list of "
1097 ":py:class:`TopicPartition` and starts consuming.\n"
1098 "\n"
1099 " :param list(TopicPartition) partitions: List of topic+partitions and optionally initial offsets to start consuming.\n"
1100 " :raises: RuntimeError if called on a closed consumer\n"
1101 "\n"
1102 },
1103 { "unassign", (PyCFunction)Consumer_unassign, METH_NOARGS,
1104 " Removes the current partition assignment and stops consuming.\n"
1105 "\n"
1106 " :raises KafkaException:\n"
1107 " :raises RuntimeError: if called on a closed consumer\n"
1108 "\n"
1109 },
1110 { "assignment", (PyCFunction)Consumer_assignment,
1111 METH_VARARGS|METH_KEYWORDS,
1112 " Returns the current partition assignment.\n"
1113 "\n"
1114 " :returns: List of assigned topic+partitions.\n"
1115 " :rtype: list(TopicPartition)\n"
1116 " :raises: KafkaException\n"
1117 " :raises: RuntimeError if called on a closed consumer\n"
1118 "\n"
1119 },
1120 { "store_offsets", (PyCFunction)Consumer_store_offsets, METH_VARARGS|METH_KEYWORDS,
1121 ".. py:function:: store_offsets([message=None], [offsets=None])\n"
1122 "\n"
1123 " Store offsets for a message or a list of offsets.\n"
1124 "\n"
1125 " ``message`` and ``offsets`` are mutually exclusive. "
1126 "The stored offsets will be committed according to 'auto.commit.interval.ms' or manual "
1127 "offset-less :py:meth:`commit`. "
1128 "Note that 'enable.auto.offset.store' must be set to False when using this API.\n"
1129 "\n"
1130 " :param confluent_kafka.Message message: Store message's offset+1.\n"
1131 " :param list(TopicPartition) offsets: List of topic+partitions+offsets to store.\n"
1132 " :rtype: None\n"
1133 " :raises: KafkaException\n"
1134 " :raises: RuntimeError if called on a closed consumer\n"
1135 "\n"
1136 },
1137 { "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS,
1138 ".. py:function:: commit([message=None], [offsets=None], [asynchronous=True])\n"
1139 "\n"
1140 " Commit a message or a list of offsets.\n"
1141 "\n"
1142 " ``message`` and ``offsets`` are mutually exclusive, if neither is set "
1143 "the current partition assignment's offsets are used instead. "
1144 "The consumer relies on your use of this method if you have set 'enable.auto.commit' to False\n"
1145 "\n"
1146 " :param confluent_kafka.Message message: Commit message's offset+1.\n"
1147 " :param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.\n"
1148 " :param bool asynchronous: Asynchronous commit, return None immediately. "
1149 "If False the commit() call will block until the commit succeeds or "
1150 "fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition will need to be checked for success.\n"
1151 " :rtype: None|list(TopicPartition)\n"
1152 " :raises: KafkaException\n"
1153 " :raises: RuntimeError if called on a closed consumer\n"
1154 "\n"
1155 },
1156 { "committed", (PyCFunction)Consumer_committed,
1157 METH_VARARGS|METH_KEYWORDS,
1158 ".. py:function:: committed(partitions, [timeout=None])\n"
1159 "\n"
1160 " Retrieve committed offsets for the list of partitions.\n"
1161 "\n"
1162 " :param list(TopicPartition) partitions: List of topic+partitions "
1163 "to query for stored offsets.\n"
1164 " :param float timeout: Request timeout. (Seconds)\n"
1165 " :returns: List of topic+partitions with offset and possibly error set.\n"
1166 " :rtype: list(TopicPartition)\n"
1167 " :raises: KafkaException\n"
1168 " :raises: RuntimeError if called on a closed consumer\n"
1169 "\n"
1170 },
1171 { "position", (PyCFunction)Consumer_position,
1172 METH_VARARGS|METH_KEYWORDS,
1173 ".. py:function:: position(partitions)\n"
1174 "\n"
1175 " Retrieve current positions (offsets) for the list of partitions.\n"
1176 "\n"
1177 " :param list(TopicPartition) partitions: List of topic+partitions "
1178 "to return current offsets for. The current offset is the offset of the "
1179 "last consumed message + 1.\n"
1180 " :returns: List of topic+partitions with offset and possibly error set.\n"
1181 " :rtype: list(TopicPartition)\n"
1182 " :raises: KafkaException\n"
1183 " :raises: RuntimeError if called on a closed consumer\n"
1184 "\n"
1185 },
1186 { "pause", (PyCFunction)Consumer_pause,
1187 METH_VARARGS|METH_KEYWORDS,
1188 ".. py:function:: pause(partitions)\n"
1189 "\n"
1190 " Pause consumption for the provided list of partitions.\n"
1191 "\n"
1192 " :param list(TopicPartition) partitions: List of topic+partitions "
1193 "to pause.\n"
1194 " :rtype: None\n"
1195 " :raises: KafkaException\n"
1196 "\n"
1197 },
1198 { "resume", (PyCFunction)Consumer_resume,
1199 METH_VARARGS|METH_KEYWORDS,
1200 ".. py:function:: resume(partitions)\n"
1201 "\n"
1202 " Resume consumption for the provided list of partitions.\n"
1203 "\n"
1204 " :param list(TopicPartition) partitions: List of topic+partitions "
1205 "to resume.\n"
1206 " :rtype: None\n"
1207 " :raises: KafkaException\n"
1208 "\n"
1209 },
1210 { "seek", (PyCFunction)Consumer_seek,
1211 METH_VARARGS|METH_KEYWORDS,
1212 ".. py:function:: seek(partition)\n"
1213 "\n"
1214 " Set consume position for partition to offset.\n"
1215 " The offset may be an absolute (>=0) or a\n"
1216 " logical offset (:py:const:`OFFSET_BEGINNING` et.al).\n"
1217 "\n"
1218 " seek() may only be used to update the consume offset of an\n"
1219 " actively consumed partition (i.e., after :py:const:`assign()`),\n"
1220 " to set the starting offset of partition not being consumed instead\n"
1221 " pass the offset in an `assign()` call.\n"
1222 "\n"
1223 " :param TopicPartition partition: Topic+partition+offset to seek to.\n"
1224 "\n"
1225 " :raises: KafkaException\n"
1226 "\n"
1227 },
1228 { "get_watermark_offsets", (PyCFunction)Consumer_get_watermark_offsets,
1229 METH_VARARGS|METH_KEYWORDS,
1230 ".. py:function:: get_watermark_offsets(partition, [timeout=None], [cached=False])\n"
1231 "\n"
1232 " Retrieve low and high offsets for partition.\n"
1233 "\n"
1234 " :param TopicPartition partition: Topic+partition to return offsets for.\n"
1235 " :param float timeout: Request timeout (when cached=False). (Seconds)\n"
1236 " :param bool cached: Instead of querying the broker used cached information. "
1237 "Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while "
1238 "the high offset is updated on each message fetched from the broker for this partition.\n"
1239 " :returns: Tuple of (low,high) on success or None on timeout. "
1240 "The high offset is the offset of the last message + 1.\n"
1241 " :rtype: tuple(int,int)\n"
1242 " :raises: KafkaException\n"
1243 " :raises: RuntimeError if called on a closed consumer\n"
1244 "\n"
1245 },
1246 { "offsets_for_times", (PyCFunction)Consumer_offsets_for_times,
1247 METH_VARARGS|METH_KEYWORDS,
1248 ".. py:function:: offsets_for_times(partitions, [timeout=None])\n"
1249 "\n"
1250 " offsets_for_times looks up offsets by timestamp for the given partitions.\n"
1251 "\n"
1252 " The returned offsets for each partition is the earliest offset whose\n"
1253 " timestamp is greater than or equal to the given timestamp in the\n"
1254 " corresponding partition.\n"
1255 "\n"
1256 " :param list(TopicPartition) partitions: topic+partitions with timestamps in the TopicPartition.offset field.\n"
1257 " :param float timeout: Request timeout. (Seconds)\n"
1258 " :returns: list of topic+partition with offset field set and possibly error set\n"
1259 " :rtype: list(TopicPartition)\n"
1260 " :raises: KafkaException\n"
1261 " :raises: RuntimeError if called on a closed consumer\n"
1262 "\n"
1263 },
1264 { "close", (PyCFunction)Consumer_close, METH_NOARGS,
1265 "\n"
1266 " Close down and terminate the Kafka Consumer.\n"
1267 "\n"
1268 " Actions performed:\n"
1269 "\n"
1270 " - Stops consuming\n"
1271 " - Commits offsets - except if the consumer property 'enable.auto.commit' is set to False\n"
1272 " - Leave consumer group\n"
1273 "\n"
1274 " .. note: Registered callbacks may be called from this method, "
1275 "see :py:func::`poll()` for more info.\n"
1276 "\n"
1277 " :rtype: None\n"
1278 " :raises: RuntimeError if called on a closed consumer\n"
1279 "\n"
1280 },
1281 { "list_topics", (PyCFunction)list_topics, METH_VARARGS|METH_KEYWORDS,
1282 list_topics_doc
1283 },
1284
1285 { NULL }
1286 };
1287
1288
/**
 * @brief Rebalance callback registered with librdkafka by Consumer_init().
 *
 * Relays a partition assignment or revocation event to the Python-level
 * on_assign / on_revoke callable (when one is set on the Handle), then
 * makes sure librdkafka's assignment state is synchronized.
 *
 * @param rk      Client instance the event was raised on.
 * @param err     RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS or
 *                RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS.
 * @param c_parts Partition list accompanying the event.
 * @param opaque  The Handle (Consumer object) owning \p rk.
 *
 * The Python callable is invoked as callback(consumer, partitions).
 *
 * Any failure while preparing or running the Python callable (partition
 * list conversion, argument tuple construction, or an exception raised
 * by the callable) is handled the same way: the call state is marked as
 * crashed, the librdkafka dispatcher is asked to yield, and the fallback
 * assign below is still performed so librdkafka is never left waiting
 * for an assign() call.
 */
static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                   rd_kafka_topic_partition_list_t *c_parts,
                                   void *opaque) {
        Handle *self = opaque;
        CallState *cs;

        /* All Python work below is bracketed by CallState_get()/_resume() */
        cs = CallState_get(self);

        /* Reset before dispatching; checked again after the callback to
         * decide whether the fallback assign is needed.
         * NOTE(review): presumably set by the assign()/unassign() methods
         * when called from within the callback - confirm. */
        self->u.Consumer.rebalance_assigned = 0;

        if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS &&
             self->u.Consumer.on_assign) ||
            (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS &&
             self->u.Consumer.on_revoke)) {
                PyObject *parts;
                PyObject *args = NULL;
                PyObject *result = NULL;

                /* Construct list of TopicPartition based on 'c_parts' */
                parts = c_parts_to_py(c_parts);

                /* 'parts' may be NULL if the conversion failed: only build
                 * the argument tuple (and drop our reference) when we
                 * actually hold an object. */
                if (parts) {
                        args = Py_BuildValue("(OO)", self, parts);
                        Py_DECREF(parts);
                }

                if (!args) {
                        cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
                                         "Unable to build callback args");
                } else {
                        result = PyObject_CallObject(
                                err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
                                self->u.Consumer.on_assign :
                                self->u.Consumer.on_revoke, args);

                        Py_DECREF(args);
                }

                if (result) {
                        Py_DECREF(result);
                } else {
                        /* A Python exception is pending: flag the call
                         * state and make the dispatcher return to the
                         * caller as soon as possible. */
                        CallState_crash(cs);
                        rd_kafka_yield(rk);
                }
        }

        /* Fallback: librdkafka needs the rebalance_cb to call assign()
         * to synchronize state, if the user did not do this from callback,
         * or there was no callback, or the callback failed, then we perform
         * that assign() call here instead. */
        if (!self->u.Consumer.rebalance_assigned) {
                if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
                        rd_kafka_assign(rk, c_parts);
                else
                        rd_kafka_assign(rk, NULL);
        }

        CallState_resume(cs);
}
1349
1350
1351
1352
1353
/**
 * @brief tp_init slot for Consumer: builds the librdkafka configuration
 *        from the Python arguments and creates the consumer instance.
 *
 * @param selfobj The object being initialized (a Handle).
 * @param args    Positional arguments, forwarded to common_conf_setup().
 * @param kwargs  Keyword arguments, forwarded to common_conf_setup().
 *
 * @returns 0 on success, -1 on failure. A RuntimeError is raised here if
 *          the object already has a client instance, and an exception is
 *          raised through cfl_PyErr_Format() if rd_kafka_new() fails.
 */
static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
        Handle *self = (Handle *)selfobj;
        rd_kafka_conf_t *cfg;
        char errbuf[256];

        /* Refuse to initialize the same object twice. */
        if (self->rk != NULL) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Consumer already initialized");
                return -1;
        }

        self->type = RD_KAFKA_CONSUMER;

        cfg = common_conf_setup(RD_KAFKA_CONSUMER, self, args, kwargs);
        if (cfg == NULL) {
                /* Exception raised by ..conf_setup() */
                return -1;
        }

        /* Install the consumer-specific librdkafka callbacks. */
        rd_kafka_conf_set_rebalance_cb(cfg, Consumer_rebalance_cb);
        rd_kafka_conf_set_offset_commit_cb(cfg, Consumer_offset_commit_cb);

        self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, cfg,
                                errbuf, sizeof(errbuf));
        if (self->rk == NULL) {
                cfl_PyErr_Format(rd_kafka_last_error(),
                                 "Failed to create consumer: %s", errbuf);
                /* The configuration object is destroyed here on failure. */
                rd_kafka_conf_destroy(cfg);
                return -1;
        }

        /* With a logger configured, route log messages to the main queue,
         * which in turn is forwarded to the consumer queue below. */
        if (self->logger) {
                rd_kafka_set_log_queue(self->rk, NULL);
        }

        rd_kafka_poll_set_consumer(self->rk);

        /* Keep a reference to the consumer queue on the Handle. */
        self->u.Consumer.rkqu = rd_kafka_queue_get_consumer(self->rk);
        assert(self->u.Consumer.rkqu);

        return 0;
}
1395
/**
 * @brief tp_new slot for Consumer: allocates an instance through the
 *        type's own tp_alloc slot.
 *
 * \p args and \p kwargs are not used here.
 *
 * @returns whatever tp_alloc returns for a zero-item allocation.
 */
static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
                               PyObject *kwargs) {
        PyObject *instance = type->tp_alloc(type, 0);

        return instance;
}
1400
1401
/**
 * @brief Python type object for cimpl.Consumer.
 *
 * Instances are Handle structs (see tp_basicsize). The type takes part in
 * cyclic garbage collection (Py_TPFLAGS_HAVE_GC together with the
 * tp_traverse / tp_clear slots) and may be subclassed
 * (Py_TPFLAGS_BASETYPE).
 *
 * The initializer is positional: the order of the entries must follow the
 * PyTypeObject field layout, which is why unused slots are spelled out as
 * explicit zeros.
 */
PyTypeObject ConsumerType = {
        PyVarObject_HEAD_INIT(NULL, 0)
        "cimpl.Consumer", /*tp_name*/
        sizeof(Handle), /*tp_basicsize*/
        0, /*tp_itemsize*/
        (destructor)Consumer_dealloc, /*tp_dealloc*/
        0, /*tp_print*/
        0, /*tp_getattr*/
        0, /*tp_setattr*/
        0, /*tp_compare*/
        0, /*tp_repr*/
        0, /*tp_as_number*/
        0, /*tp_as_sequence*/
        0, /*tp_as_mapping*/
        0, /*tp_hash */
        0, /*tp_call*/
        0, /*tp_str*/
        0, /*tp_getattro*/
        0, /*tp_setattro*/
        0, /*tp_as_buffer*/
        Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
        Py_TPFLAGS_HAVE_GC, /*tp_flags*/
        /* Class docstring (tp_doc): the adjacent literals below are
         * concatenated by the compiler into a single string. */
        "High-level Kafka Consumer\n"
        "\n"
        ".. py:function:: Consumer(config)\n"
        "\n"
        " :param dict config: Configuration properties. At a minimum ``group.id`` **must** be set,"
        " ``bootstrap.servers`` **should** be set"
        "\n"
        "Create new Consumer instance using provided configuration dict.\n"
        "\n"
        " Special configuration properties:\n"
        " ``on_commit``: Optional callback will be called when a commit "
        "request has succeeded or failed.\n"
        "\n"
        "\n"
        ".. py:function:: on_commit(err, partitions)\n"
        "\n"
        " :param KafkaError err: Commit error object, or None on success.\n"
        " :param list(TopicPartition) partitions: List of partitions with "
        "their committed offsets or per-partition errors.\n"
        "\n"
        "\n", /*tp_doc*/
        (traverseproc)Consumer_traverse, /* tp_traverse */
        (inquiry)Consumer_clear, /* tp_clear */
        0, /* tp_richcompare */
        0, /* tp_weaklistoffset */
        0, /* tp_iter */
        0, /* tp_iternext */
        Consumer_methods, /* tp_methods */
        0, /* tp_members */
        0, /* tp_getset */
        0, /* tp_base */
        0, /* tp_dict */
        0, /* tp_descr_get */
        0, /* tp_descr_set */
        0, /* tp_dictoffset */
        Consumer_init, /* tp_init */
        0, /* tp_alloc */
        Consumer_new /* tp_new */
};
1463