1 /**
2 * Copyright 2018 Confluent Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "confluent_kafka.h"
18
19 #include <stdarg.h>
20
21
22 /****************************************************************************
23 *
24 *
25 * Admin Client API
26 *
27 *
28 ****************************************************************************/
29
30
31
32
/**
 * @brief GC clear (tp_clear): drop the Python object references
 *        held by the common Handle part.
 */
static int Admin_clear (Handle *self) {
        Handle_clear(self);
        return 0;
}
39
/**
 * @brief Deallocator (tp_dealloc): untrack from the GC, destroy the
 *        underlying librdkafka client, clear held references and free
 *        the Python object.
 */
static void Admin_dealloc (Handle *self) {
        PyObject_GC_UnTrack(self);

        if (self->rk != NULL) {
                CallState cs;

                /* rd_kafka_destroy() may serve callbacks that need to
                 * re-acquire the GIL: release it for the duration. */
                CallState_begin(self, &cs);
                rd_kafka_destroy(self->rk);
                CallState_end(self, &cs);
        }

        Admin_clear(self);
        Py_TYPE(self)->tp_free((PyObject *)self);
}
56
/**
 * @brief GC traverse (tp_traverse): visit Python object references
 *        held by the common Handle part.
 *
 * @returns 0 on success, or the non-zero value returned by the visit
 *          callback. FIX: the previous implementation discarded
 *          Handle_traverse()'s return value, breaking the tp_traverse
 *          protocol which requires propagating Py_VISIT failures.
 */
static int Admin_traverse (Handle *self,
                           visitproc visit, void *arg) {
        int r = Handle_traverse(self, visit, arg);
        if (r)
                return r; /* propagate Py_VISIT failure to the GC */

        return 0;
}
63
64
/**
 * @name AdminOptions
 *
 * Helpers for carrying per-call admin options parsed from Python
 * keyword arguments and translating them to rd_kafka_AdminOptions_t
 * (see Admin_options_to_c()).
 */
/* Sentinel "unset" value for integer options; chosen so it cannot
 * collide with any valid option value. */
#define Admin_options_def_int (-12345)
/* Sentinel "unset" value for float options (same magnitude, so the
 * float check can be implemented via the int check below). */
#define Admin_options_def_float ((float)Admin_options_def_int)

/* Fields default to the sentinels above (Admin_options_INITIALIZER)
 * and are only applied to the C options object when changed. */
struct Admin_options {
        int validate_only; /* needs special bool parsing */
        float request_timeout; /* parser: f */
        float operation_timeout; /* parser: f */
        int broker; /* parser: i */
};

/**@brief "unset" value initializers for Admin_options
 * Make sure this is kept up to date with Admin_options above. */
#define Admin_options_INITIALIZER { \
        Admin_options_def_int, Admin_options_def_float, \
        Admin_options_def_float, Admin_options_def_int, \
        }

/* True if the int option \p v was explicitly set (not the sentinel). */
#define Admin_options_is_set_int(v) ((v) != Admin_options_def_int)
/* True if the float option \p v was explicitly set (not the sentinel). */
#define Admin_options_is_set_float(v) Admin_options_is_set_int((int)(v))
89
90
91 /**
92 * @brief Convert Admin_options to rd_kafka_AdminOptions_t.
93 *
94 * @param forApi is the librdkafka name of the admin API that these options
95 * will be used for, e.g., "CreateTopics".
96 * @param future is set as the options opaque.
97 *
98 * @returns a new C admin options object on success, or NULL on failure in
99 * which case an exception is raised.
100 */
101 static rd_kafka_AdminOptions_t *
Admin_options_to_c(Handle * self,rd_kafka_admin_op_t for_api,const struct Admin_options * options,PyObject * future)102 Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
103 const struct Admin_options *options,
104 PyObject *future) {
105 rd_kafka_AdminOptions_t *c_options;
106 rd_kafka_resp_err_t err;
107 char errstr[512];
108
109 c_options = rd_kafka_AdminOptions_new(self->rk, for_api);
110 if (!c_options) {
111 PyErr_Format(PyExc_RuntimeError,
112 "This Admin API method "
113 "is unsupported by librdkafka %s",
114 rd_kafka_version_str());
115 return NULL;
116 }
117
118 rd_kafka_AdminOptions_set_opaque(c_options, (void *)future);
119
120 if (Admin_options_is_set_int(options->validate_only) &&
121 (err = rd_kafka_AdminOptions_set_validate_only(
122 c_options, options->validate_only,
123 errstr, sizeof(errstr))))
124 goto err;
125
126 if (Admin_options_is_set_float(options->request_timeout) &&
127 (err = rd_kafka_AdminOptions_set_request_timeout(
128 c_options, (int)(options->request_timeout * 1000.0f),
129 errstr, sizeof(errstr))))
130 goto err;
131
132 if (Admin_options_is_set_float(options->operation_timeout) &&
133 (err = rd_kafka_AdminOptions_set_operation_timeout(
134 c_options, (int)(options->operation_timeout * 1000.0f),
135 errstr, sizeof(errstr))))
136 goto err;
137
138 if (Admin_options_is_set_int(options->broker) &&
139 (err = rd_kafka_AdminOptions_set_broker(
140 c_options, (int32_t)options->broker,
141 errstr, sizeof(errstr))))
142 goto err;
143
144 return c_options;
145
146 err:
147 rd_kafka_AdminOptions_destroy(c_options);
148 PyErr_Format(PyExc_ValueError, "%s", errstr);
149 return NULL;
150 }
151
152
153
154
155 /**
156 * @brief Translate Python list(list(int)) replica assignments and set
157 * on the specified generic C object using a setter based on
158 * forApi.
159 *
160 * @returns 1 on success or 0 on error in which case an exception is raised.
161 */
Admin_set_replica_assignment(const char * forApi,void * c_obj,PyObject * ra,int min_count,int max_count,const char * err_count_desc)162 static int Admin_set_replica_assignment (const char *forApi, void *c_obj,
163 PyObject *ra, int
164 min_count, int max_count,
165 const char *err_count_desc) {
166 int pi;
167
168 if (!PyList_Check(ra) ||
169 (int)PyList_Size(ra) < min_count ||
170 (int)PyList_Size(ra) > max_count) {
171 PyErr_Format(PyExc_ValueError,
172 "replica_assignment must be "
173 "a list of int lists with an "
174 "outer size of %s", err_count_desc);
175 return 0;
176 }
177
178 for (pi = 0 ; pi < (int)PyList_Size(ra) ; pi++) {
179 size_t ri;
180 PyObject *replicas = PyList_GET_ITEM(ra, pi);
181 rd_kafka_resp_err_t err;
182 int32_t *c_replicas;
183 size_t replica_cnt;
184 char errstr[512];
185
186 if (!PyList_Check(replicas) ||
187 (replica_cnt = (size_t)PyList_Size(replicas)) < 1) {
188 PyErr_Format(
189 PyExc_ValueError,
190 "replica_assignment must be "
191 "a list of int lists with an "
192 "outer size of %s", err_count_desc);
193 return 0;
194 }
195
196 c_replicas = malloc(sizeof(*c_replicas) *
197 replica_cnt);
198
199 for (ri = 0 ; ri < replica_cnt ; ri++) {
200 PyObject *replica =
201 PyList_GET_ITEM(replicas, ri);
202
203 if (!cfl_PyInt_Check(replica)) {
204 PyErr_Format(
205 PyExc_ValueError,
206 "replica_assignment must be "
207 "a list of int lists with an "
208 "outer size of %s", err_count_desc);
209 free(c_replicas);
210 return 0;
211 }
212
213 c_replicas[ri] = (int32_t)cfl_PyInt_AsInt(replica);
214
215 }
216
217
218 if (!strcmp(forApi, "CreateTopics"))
219 err = rd_kafka_NewTopic_set_replica_assignment(
220 (rd_kafka_NewTopic_t *)c_obj, (int32_t)pi,
221 c_replicas, replica_cnt,
222 errstr, sizeof(errstr));
223 else if (!strcmp(forApi, "CreatePartitions"))
224 err = rd_kafka_NewPartitions_set_replica_assignment(
225 (rd_kafka_NewPartitions_t *)c_obj, (int32_t)pi,
226 c_replicas, replica_cnt,
227 errstr, sizeof(errstr));
228 else {
229 /* Should never be reached */
230 err = RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
231 snprintf(errstr, sizeof(errstr),
232 "Unsupported forApi %s", forApi);
233 }
234
235 free(c_replicas);
236
237 if (err) {
238 PyErr_SetString(
239 PyExc_ValueError, errstr);
240 return 0;
241 }
242 }
243
244 return 1;
245 }
246
247 /**
248 * @brief Translate a dict to ConfigResource set_config() calls,
249 * or to NewTopic_add_config() calls.
250 *
251 *
252 * @returns 1 on success or 0 if an exception was raised.
253 */
254 static int
Admin_config_dict_to_c(void * c_obj,PyObject * dict,const char * op_name)255 Admin_config_dict_to_c (void *c_obj, PyObject *dict, const char *op_name) {
256 Py_ssize_t pos = 0;
257 PyObject *ko, *vo;
258
259 while (PyDict_Next(dict, &pos, &ko, &vo)) {
260 PyObject *ks, *ks8;
261 PyObject *vs = NULL, *vs8 = NULL;
262 const char *k;
263 const char *v;
264 rd_kafka_resp_err_t err;
265
266 if (!(ks = cfl_PyObject_Unistr(ko))) {
267 PyErr_Format(PyExc_ValueError,
268 "expected %s config name to be unicode "
269 "string", op_name);
270 return 0;
271 }
272
273 k = cfl_PyUnistr_AsUTF8(ks, &ks8);
274
275 if (!(vs = cfl_PyObject_Unistr(vo)) ||
276 !(v = cfl_PyUnistr_AsUTF8(vs, &vs8))) {
277 PyErr_Format(PyExc_ValueError,
278 "expect %s config value for %s "
279 "to be unicode string",
280 op_name, k);
281 Py_XDECREF(vs);
282 Py_XDECREF(vs8);
283 Py_DECREF(ks);
284 Py_XDECREF(ks8);
285 return 0;
286 }
287
288 if (!strcmp(op_name, "set_config"))
289 err = rd_kafka_ConfigResource_set_config(
290 (rd_kafka_ConfigResource_t *)c_obj,
291 k, v);
292 else if (!strcmp(op_name, "newtopic_set_config"))
293 err = rd_kafka_NewTopic_set_config(
294 (rd_kafka_NewTopic_t *)c_obj, k, v);
295 else
296 err = RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
297
298 if (err) {
299 PyErr_Format(PyExc_ValueError,
300 "%s config %s failed: %s",
301 op_name, k, rd_kafka_err2str(err));
302 Py_XDECREF(vs);
303 Py_XDECREF(vs8);
304 Py_DECREF(ks);
305 Py_XDECREF(ks8);
306 return 0;
307 }
308
309 Py_XDECREF(vs);
310 Py_XDECREF(vs8);
311 Py_DECREF(ks);
312 Py_XDECREF(ks8);
313 }
314
315 return 1;
316 }
317
318
319 /**
320 * @brief create_topics
321 */
Admin_create_topics(Handle * self,PyObject * args,PyObject * kwargs)322 static PyObject *Admin_create_topics (Handle *self, PyObject *args,
323 PyObject *kwargs) {
324 PyObject *topics = NULL, *future, *validate_only_obj = NULL;
325 static char *kws[] = { "topics",
326 "future",
327 /* options */
328 "validate_only",
329 "request_timeout",
330 "operation_timeout",
331 NULL };
332 struct Admin_options options = Admin_options_INITIALIZER;
333 rd_kafka_AdminOptions_t *c_options = NULL;
334 int tcnt;
335 int i;
336 rd_kafka_NewTopic_t **c_objs;
337 rd_kafka_queue_t *rkqu;
338 CallState cs;
339
340 /* topics is a list of NewTopic objects. */
341 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|Off", kws,
342 &topics, &future,
343 &validate_only_obj,
344 &options.request_timeout,
345 &options.operation_timeout))
346 return NULL;
347
348 if (!PyList_Check(topics) || (tcnt = (int)PyList_Size(topics)) < 1) {
349 PyErr_SetString(PyExc_ValueError,
350 "Expected non-empty list of NewTopic objects");
351 return NULL;
352 }
353
354 if (validate_only_obj &&
355 !cfl_PyBool_get(validate_only_obj, "validate_only",
356 &options.validate_only))
357 return NULL;
358
359 c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_CREATETOPICS,
360 &options, future);
361 if (!c_options)
362 return NULL; /* Exception raised by options_to_c() */
363
364 /* options_to_c() sets future as the opaque, which is used in the
365 * background_event_cb to set the results on the future as the
366 * admin operation is finished, so we need to keep our own refcount. */
367 Py_INCREF(future);
368
369 /*
370 * Parse the list of NewTopics and convert to corresponding C types.
371 */
372 c_objs = malloc(sizeof(*c_objs) * tcnt);
373
374 for (i = 0 ; i < tcnt ; i++) {
375 NewTopic *newt = (NewTopic *)PyList_GET_ITEM(topics, i);
376 char errstr[512];
377 int r;
378
379 r = PyObject_IsInstance((PyObject *)newt,
380 (PyObject *)&NewTopicType);
381 if (r == -1)
382 goto err; /* Exception raised by IsInstance() */
383 else if (r == 0) {
384 PyErr_SetString(PyExc_ValueError,
385 "Expected list of NewTopic objects");
386 goto err;
387 }
388
389 c_objs[i] = rd_kafka_NewTopic_new(newt->topic,
390 newt->num_partitions,
391 newt->replication_factor,
392 errstr, sizeof(errstr));
393 if (!c_objs[i]) {
394 PyErr_Format(PyExc_ValueError,
395 "Invalid NewTopic(%s): %s",
396 newt->topic, errstr);
397 i++;
398 goto err;
399 }
400
401 if (newt->replica_assignment) {
402 if (newt->replication_factor != -1) {
403 PyErr_SetString(PyExc_ValueError,
404 "replication_factor and "
405 "replica_assignment are "
406 "mutually exclusive");
407 i++;
408 goto err;
409 }
410
411 if (!Admin_set_replica_assignment(
412 "CreateTopics", (void *)c_objs[i],
413 newt->replica_assignment,
414 newt->num_partitions, newt->num_partitions,
415 "num_partitions")) {
416 i++;
417 goto err;
418 }
419 }
420
421 if (newt->config) {
422 if (!Admin_config_dict_to_c((void *)c_objs[i],
423 newt->config,
424 "newtopic_set_config")) {
425 i++;
426 goto err;
427 }
428 }
429 }
430
431
432 /* Use librdkafka's background thread queue to automatically dispatch
433 * Admin_background_event_cb() when the admin operation is finished. */
434 rkqu = rd_kafka_queue_get_background(self->rk);
435
436 /*
437 * Call CreateTopics.
438 *
439 * We need to set up a CallState and release GIL here since
440 * the background_event_cb may be triggered immediately.
441 */
442 CallState_begin(self, &cs);
443 rd_kafka_CreateTopics(self->rk, c_objs, tcnt, c_options, rkqu);
444 CallState_end(self, &cs);
445
446 rd_kafka_NewTopic_destroy_array(c_objs, tcnt);
447 rd_kafka_AdminOptions_destroy(c_options);
448 free(c_objs);
449 rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
450
451 Py_RETURN_NONE;
452
453 err:
454 rd_kafka_NewTopic_destroy_array(c_objs, i);
455 rd_kafka_AdminOptions_destroy(c_options);
456 free(c_objs);
457 Py_DECREF(future); /* from options_to_c() */
458
459 return NULL;
460 }
461
462
463 /**
464 * @brief delete_topics
465 */
Admin_delete_topics(Handle * self,PyObject * args,PyObject * kwargs)466 static PyObject *Admin_delete_topics (Handle *self, PyObject *args,
467 PyObject *kwargs) {
468 PyObject *topics = NULL, *future;
469 static char *kws[] = { "topics",
470 "future",
471 /* options */
472 "request_timeout",
473 "operation_timeout",
474 NULL };
475 struct Admin_options options = Admin_options_INITIALIZER;
476 rd_kafka_AdminOptions_t *c_options = NULL;
477 int tcnt;
478 int i;
479 rd_kafka_DeleteTopic_t **c_objs;
480 rd_kafka_queue_t *rkqu;
481 CallState cs;
482
483 /* topics is a list of strings. */
484 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O|ff", kws,
485 (PyObject *)&PyList_Type, &topics,
486 &future,
487 &options.request_timeout,
488 &options.operation_timeout))
489 return NULL;
490
491 if (!PyList_Check(topics) || (tcnt = (int)PyList_Size(topics)) < 1) {
492 PyErr_SetString(PyExc_ValueError,
493 "Expected non-empty list of topic strings");
494 return NULL;
495 }
496
497 c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DELETETOPICS,
498 &options, future);
499 if (!c_options)
500 return NULL; /* Exception raised by options_to_c() */
501
502 /* options_to_c() sets opaque to the future object, which is used in the
503 * background_event_cb to set the results on the future as the
504 * admin operation is finished, so we need to keep our own refcount. */
505 Py_INCREF(future);
506
507 /*
508 * Parse the list of strings and convert to corresponding C types.
509 */
510 c_objs = malloc(sizeof(*c_objs) * tcnt);
511
512 for (i = 0 ; i < tcnt ; i++) {
513 PyObject *topic = PyList_GET_ITEM(topics, i);
514 PyObject *utopic;
515 PyObject *uotopic = NULL;
516
517 if (topic == Py_None ||
518 !(utopic = cfl_PyObject_Unistr(topic))) {
519 PyErr_Format(PyExc_ValueError,
520 "Expected list of topic strings, "
521 "not %s",
522 ((PyTypeObject *)PyObject_Type(topic))->
523 tp_name);
524 goto err;
525 }
526
527 c_objs[i] = rd_kafka_DeleteTopic_new(
528 cfl_PyUnistr_AsUTF8(utopic, &uotopic));
529
530 Py_XDECREF(utopic);
531 Py_XDECREF(uotopic);
532 }
533
534
535 /* Use librdkafka's background thread queue to automatically dispatch
536 * Admin_background_event_cb() when the admin operation is finished. */
537 rkqu = rd_kafka_queue_get_background(self->rk);
538
539 /*
540 * Call DeleteTopics.
541 *
542 * We need to set up a CallState and release GIL here since
543 * the event_cb may be triggered immediately.
544 */
545 CallState_begin(self, &cs);
546 rd_kafka_DeleteTopics(self->rk, c_objs, tcnt, c_options, rkqu);
547 CallState_end(self, &cs);
548
549 rd_kafka_DeleteTopic_destroy_array(c_objs, i);
550 rd_kafka_AdminOptions_destroy(c_options);
551 free(c_objs);
552 rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
553
554 Py_RETURN_NONE;
555
556 err:
557 rd_kafka_DeleteTopic_destroy_array(c_objs, i);
558 rd_kafka_AdminOptions_destroy(c_options);
559 free(c_objs);
560 Py_DECREF(future); /* from options_to_c() */
561
562 return NULL;
563 }
564
565
566 /**
567 * @brief create_partitions
568 */
Admin_create_partitions(Handle * self,PyObject * args,PyObject * kwargs)569 static PyObject *Admin_create_partitions (Handle *self, PyObject *args,
570 PyObject *kwargs) {
571 PyObject *topics = NULL, *future, *validate_only_obj = NULL;
572 static char *kws[] = { "topics",
573 "future",
574 /* options */
575 "validate_only",
576 "request_timeout",
577 "operation_timeout",
578 NULL };
579 struct Admin_options options = Admin_options_INITIALIZER;
580 rd_kafka_AdminOptions_t *c_options = NULL;
581 int tcnt;
582 int i;
583 rd_kafka_NewPartitions_t **c_objs;
584 rd_kafka_queue_t *rkqu;
585 CallState cs;
586
587 /* topics is a list of NewPartitions_t objects. */
588 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|Off", kws,
589 &topics, &future,
590 &validate_only_obj,
591 &options.request_timeout,
592 &options.operation_timeout))
593 return NULL;
594
595 if (!PyList_Check(topics) || (tcnt = (int)PyList_Size(topics)) < 1) {
596 PyErr_SetString(PyExc_ValueError,
597 "Expected non-empty list of "
598 "NewPartitions objects");
599 return NULL;
600 }
601
602 if (validate_only_obj &&
603 !cfl_PyBool_get(validate_only_obj, "validate_only",
604 &options.validate_only))
605 return NULL;
606
607 c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
608 &options, future);
609 if (!c_options)
610 return NULL; /* Exception raised by options_to_c() */
611
612 /* options_to_c() sets future as the opaque, which is used in the
613 * event_cb to set the results on the future as the admin operation
614 * is finished, so we need to keep our own refcount. */
615 Py_INCREF(future);
616
617 /*
618 * Parse the list of NewPartitions and convert to corresponding C types.
619 */
620 c_objs = malloc(sizeof(*c_objs) * tcnt);
621
622 for (i = 0 ; i < tcnt ; i++) {
623 NewPartitions *newp = (NewPartitions *)PyList_GET_ITEM(topics,
624 i);
625 char errstr[512];
626 int r;
627
628 r = PyObject_IsInstance((PyObject *)newp,
629 (PyObject *)&NewPartitionsType);
630 if (r == -1)
631 goto err; /* Exception raised by IsInstance() */
632 else if (r == 0) {
633 PyErr_SetString(PyExc_ValueError,
634 "Expected list of "
635 "NewPartitions objects");
636 goto err;
637 }
638
639 c_objs[i] = rd_kafka_NewPartitions_new(newp->topic,
640 newp->new_total_count,
641 errstr, sizeof(errstr));
642 if (!c_objs[i]) {
643 PyErr_Format(PyExc_ValueError,
644 "Invalid NewPartitions(%s): %s",
645 newp->topic, errstr);
646 goto err;
647 }
648
649 if (newp->replica_assignment &&
650 !Admin_set_replica_assignment(
651 "CreatePartitions", (void *)c_objs[i],
652 newp->replica_assignment,
653 1, newp->new_total_count,
654 "new_total_count - "
655 "existing partition count")) {
656 i++;
657 goto err; /* Exception raised by set_..() */
658 }
659 }
660
661
662 /* Use librdkafka's background thread queue to automatically dispatch
663 * Admin_background_event_cb() when the admin operation is finished. */
664 rkqu = rd_kafka_queue_get_background(self->rk);
665
666 /*
667 * Call CreatePartitions
668 *
669 * We need to set up a CallState and release GIL here since
670 * the event_cb may be triggered immediately.
671 */
672 CallState_begin(self, &cs);
673 rd_kafka_CreatePartitions(self->rk, c_objs, tcnt, c_options, rkqu);
674 CallState_end(self, &cs);
675
676 rd_kafka_NewPartitions_destroy_array(c_objs, tcnt);
677 rd_kafka_AdminOptions_destroy(c_options);
678 free(c_objs);
679 rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
680
681 Py_RETURN_NONE;
682
683 err:
684 rd_kafka_NewPartitions_destroy_array(c_objs, i);
685 rd_kafka_AdminOptions_destroy(c_options);
686 free(c_objs);
687 Py_DECREF(future); /* from options_to_c() */
688
689 return NULL;
690 }
691
692
693 /**
694 * @brief describe_configs
695 */
Admin_describe_configs(Handle * self,PyObject * args,PyObject * kwargs)696 static PyObject *Admin_describe_configs (Handle *self, PyObject *args,
697 PyObject *kwargs) {
698 PyObject *resources, *future;
699 static char *kws[] = { "resources",
700 "future",
701 /* options */
702 "request_timeout",
703 "broker",
704 NULL };
705 struct Admin_options options = Admin_options_INITIALIZER;
706 rd_kafka_AdminOptions_t *c_options = NULL;
707 PyObject *ConfigResource_type;
708 int cnt, i;
709 rd_kafka_ConfigResource_t **c_objs;
710 rd_kafka_queue_t *rkqu;
711 CallState cs;
712
713 /* topics is a list of NewPartitions_t objects. */
714 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|fi", kws,
715 &resources, &future,
716 &options.request_timeout,
717 &options.broker))
718 return NULL;
719
720 if (!PyList_Check(resources) ||
721 (cnt = (int)PyList_Size(resources)) < 1) {
722 PyErr_SetString(PyExc_ValueError,
723 "Expected non-empty list of ConfigResource "
724 "objects");
725 return NULL;
726 }
727
728 c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS,
729 &options, future);
730 if (!c_options)
731 return NULL; /* Exception raised by options_to_c() */
732
733 /* Look up the ConfigResource class so we can check if the provided
734 * topics are of correct type.
735 * Since this is not in the fast path we treat ourselves
736 * to the luxury of looking up this for each call. */
737 ConfigResource_type = cfl_PyObject_lookup("confluent_kafka.admin",
738 "ConfigResource");
739 if (!ConfigResource_type) {
740 rd_kafka_AdminOptions_destroy(c_options);
741 return NULL; /* Exception raised by lookup() */
742 }
743
744 /* options_to_c() sets future as the opaque, which is used in the
745 * event_cb to set the results on the future as the admin operation
746 * is finished, so we need to keep our own refcount. */
747 Py_INCREF(future);
748
749 /*
750 * Parse the list of ConfigResources and convert to
751 * corresponding C types.
752 */
753 c_objs = malloc(sizeof(*c_objs) * cnt);
754
755 for (i = 0 ; i < cnt ; i++) {
756 PyObject *res = PyList_GET_ITEM(resources, i);
757 int r;
758 int restype;
759 char *resname;
760
761 r = PyObject_IsInstance(res, ConfigResource_type);
762 if (r == -1)
763 goto err; /* Exception raised by IsInstance() */
764 else if (r == 0) {
765 PyErr_SetString(PyExc_ValueError,
766 "Expected list of "
767 "ConfigResource objects");
768 goto err;
769 }
770
771 if (!cfl_PyObject_GetInt(res, "restype_int", &restype, 0, 0))
772 goto err;
773
774 if (!cfl_PyObject_GetString(res, "name", &resname, NULL, 0))
775 goto err;
776
777 c_objs[i] = rd_kafka_ConfigResource_new(
778 (rd_kafka_ResourceType_t)restype, resname);
779 if (!c_objs[i]) {
780 PyErr_Format(PyExc_ValueError,
781 "Invalid ConfigResource(%d,%s)",
782 restype, resname);
783 free(resname);
784 goto err;
785 }
786 free(resname);
787 }
788
789
790 /* Use librdkafka's background thread queue to automatically dispatch
791 * Admin_background_event_cb() when the admin operation is finished. */
792 rkqu = rd_kafka_queue_get_background(self->rk);
793
794 /*
795 * Call DescribeConfigs
796 *
797 * We need to set up a CallState and release GIL here since
798 * the event_cb may be triggered immediately.
799 */
800 CallState_begin(self, &cs);
801 rd_kafka_DescribeConfigs(self->rk, c_objs, cnt, c_options, rkqu);
802 CallState_end(self, &cs);
803
804 rd_kafka_ConfigResource_destroy_array(c_objs, cnt);
805 rd_kafka_AdminOptions_destroy(c_options);
806 free(c_objs);
807 rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
808
809 Py_DECREF(ConfigResource_type); /* from lookup() */
810
811 Py_RETURN_NONE;
812
813 err:
814 rd_kafka_ConfigResource_destroy_array(c_objs, i);
815 rd_kafka_AdminOptions_destroy(c_options);
816 free(c_objs);
817 Py_DECREF(ConfigResource_type); /* from lookup() */
818 Py_DECREF(future); /* from options_to_c() */
819
820 return NULL;
821 }
822
823
824
825
826 /**
827 * @brief alter_configs
828 */
Admin_alter_configs(Handle * self,PyObject * args,PyObject * kwargs)829 static PyObject *Admin_alter_configs (Handle *self, PyObject *args,
830 PyObject *kwargs) {
831 PyObject *resources, *future;
832 PyObject *validate_only_obj = NULL;
833 static char *kws[] = { "resources",
834 "future",
835 /* options */
836 "validate_only",
837 "request_timeout",
838 "broker",
839 NULL };
840 struct Admin_options options = Admin_options_INITIALIZER;
841 rd_kafka_AdminOptions_t *c_options = NULL;
842 PyObject *ConfigResource_type;
843 int cnt, i;
844 rd_kafka_ConfigResource_t **c_objs;
845 rd_kafka_queue_t *rkqu;
846 CallState cs;
847
848 /* topics is a list of NewPartitions_t objects. */
849 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|Ofi", kws,
850 &resources, &future,
851 &validate_only_obj,
852 &options.request_timeout,
853 &options.broker))
854 return NULL;
855
856 if (!PyList_Check(resources) ||
857 (cnt = (int)PyList_Size(resources)) < 1) {
858 PyErr_SetString(PyExc_ValueError,
859 "Expected non-empty list of ConfigResource "
860 "objects");
861 return NULL;
862 }
863
864 if (validate_only_obj &&
865 !cfl_PyBool_get(validate_only_obj, "validate_only",
866 &options.validate_only))
867 return NULL;
868
869 c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ALTERCONFIGS,
870 &options, future);
871 if (!c_options)
872 return NULL; /* Exception raised by options_to_c() */
873
874 /* Look up the ConfigResource class so we can check if the provided
875 * topics are of correct type.
876 * Since this is not in the fast path we treat ourselves
877 * to the luxury of looking up this for each call. */
878 ConfigResource_type = cfl_PyObject_lookup("confluent_kafka.admin",
879 "ConfigResource");
880 if (!ConfigResource_type) {
881 rd_kafka_AdminOptions_destroy(c_options);
882 return NULL; /* Exception raised by find() */
883 }
884
885 /* options_to_c() sets future as the opaque, which is used in the
886 * event_cb to set the results on the future as the admin operation
887 * is finished, so we need to keep our own refcount. */
888 Py_INCREF(future);
889
890 /*
891 * Parse the list of ConfigResources and convert to
892 * corresponding C types.
893 */
894 c_objs = malloc(sizeof(*c_objs) * cnt);
895
896 for (i = 0 ; i < cnt ; i++) {
897 PyObject *res = PyList_GET_ITEM(resources, i);
898 int r;
899 int restype;
900 char *resname;
901 PyObject *dict;
902
903 r = PyObject_IsInstance(res, ConfigResource_type);
904 if (r == -1)
905 goto err; /* Exception raised by IsInstance() */
906 else if (r == 0) {
907 PyErr_SetString(PyExc_ValueError,
908 "Expected list of "
909 "ConfigResource objects");
910 goto err;
911 }
912
913 if (!cfl_PyObject_GetInt(res, "restype_int", &restype, 0, 0))
914 goto err;
915
916 if (!cfl_PyObject_GetString(res, "name", &resname, NULL, 0))
917 goto err;
918
919 c_objs[i] = rd_kafka_ConfigResource_new(
920 (rd_kafka_ResourceType_t)restype, resname);
921 if (!c_objs[i]) {
922 PyErr_Format(PyExc_ValueError,
923 "Invalid ConfigResource(%d,%s)",
924 restype, resname);
925 free(resname);
926 goto err;
927 }
928 free(resname);
929
930 /*
931 * Translate and apply config entries in the various dicts.
932 */
933 if (!cfl_PyObject_GetAttr(res, "set_config_dict", &dict,
934 &PyDict_Type, 1)) {
935 i++;
936 goto err;
937 }
938 if (!Admin_config_dict_to_c(c_objs[i], dict, "set_config")) {
939 Py_DECREF(dict);
940 i++;
941 goto err;
942 }
943 Py_DECREF(dict);
944 }
945
946
947 /* Use librdkafka's background thread queue to automatically dispatch
948 * Admin_background_event_cb() when the admin operation is finished. */
949 rkqu = rd_kafka_queue_get_background(self->rk);
950
951 /*
952 * Call AlterConfigs
953 *
954 * We need to set up a CallState and release GIL here since
955 * the event_cb may be triggered immediately.
956 */
957 CallState_begin(self, &cs);
958 rd_kafka_AlterConfigs(self->rk, c_objs, cnt, c_options, rkqu);
959 CallState_end(self, &cs);
960
961 rd_kafka_ConfigResource_destroy_array(c_objs, cnt);
962 rd_kafka_AdminOptions_destroy(c_options);
963 free(c_objs);
964 rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
965
966 Py_DECREF(ConfigResource_type); /* from lookup() */
967
968 Py_RETURN_NONE;
969
970 err:
971 rd_kafka_ConfigResource_destroy_array(c_objs, i);
972 rd_kafka_AdminOptions_destroy(c_options);
973 free(c_objs);
974 Py_DECREF(ConfigResource_type); /* from lookup() */
975 Py_DECREF(future); /* from options_to_c() */
976
977 return NULL;
978 }
979
980
981
982 /**
983 * @brief Call rd_kafka_poll() and keep track of crashing callbacks.
984 * @returns -1 if callback crashed (or poll() failed), else the number
985 * of events served.
986 */
Admin_poll0(Handle * self,int tmout)987 static int Admin_poll0 (Handle *self, int tmout) {
988 int r;
989 CallState cs;
990
991 CallState_begin(self, &cs);
992
993 r = rd_kafka_poll(self->rk, tmout);
994
995 if (!CallState_end(self, &cs)) {
996 return -1;
997 }
998
999 return r;
1000 }
1001
1002
/**
 * @brief Admin.poll(timeout) - serve queued event callbacks
 *        (error_cb, stats_cb, etc).
 * @returns the number of events served as a Python int, or NULL if a
 *          callback raised an exception.
 */
static PyObject *Admin_poll (Handle *self, PyObject *args,
                             PyObject *kwargs) {
        static char *kws[] = { "timeout", NULL };
        double tmout;
        int served;

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d", kws, &tmout))
                return NULL;

        /* seconds -> milliseconds */
        served = Admin_poll0(self, (int)(tmout * 1000));
        if (served == -1)
                return NULL; /* exception raised by a callback */

        return cfl_PyInt_FromInt(served);
}
1018
1019
1020
/* Method table for the internal Admin impl type.
 * These are thin bindings; the public API is confluent_kafka.AdminClient,
 * which resolves the returned futures. */
static PyMethodDef Admin_methods[] = {
        { "create_topics", (PyCFunction)Admin_create_topics,
          METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: create_topics(topics, future, [validate_only, request_timeout, operation_timeout])\n"
          "\n"
          "  Create new topics.\n"
          "\n"
          "  This method should not be used directly, use confluent_kafka.AdminClient.create_topics()\n"
        },

        { "delete_topics", (PyCFunction)Admin_delete_topics,
          METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: delete_topics(topics, future, [request_timeout, operation_timeout])\n"
          "\n"
          "  This method should not be used directly, use confluent_kafka.AdminClient.delete_topics()\n"
        },

        { "create_partitions", (PyCFunction)Admin_create_partitions,
          METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: create_partitions(topics, future, [validate_only, request_timeout, operation_timeout])\n"
          "\n"
          "  This method should not be used directly, use confluent_kafka.AdminClient.create_partitions()\n"
        },

        { "describe_configs", (PyCFunction)Admin_describe_configs,
          METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: describe_configs(resources, future, [request_timeout, broker])\n"
          "\n"
          "  This method should not be used directly, use confluent_kafka.AdminClient.describe_configs()\n"
        },

        { "alter_configs", (PyCFunction)Admin_alter_configs,
          METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: alter_configs(resources, future, [request_timeout, broker])\n"
          "\n"
          "  This method should not be used directly, use confluent_kafka.AdminClient.alter_configs()\n"
        },


        { "poll", (PyCFunction)Admin_poll, METH_VARARGS|METH_KEYWORDS,
          ".. py:function:: poll([timeout])\n"
          "\n"
          "  Polls the Admin client for event callbacks, such as error_cb, "
          "stats_cb, etc, if registered.\n"
          "\n"
          "  There is no need to call poll() if no callbacks have been registered.\n"
          "\n"
          "  :param float timeout: Maximum time to block waiting for events. (Seconds)\n"
          "  :returns: Number of events processed (callbacks served)\n"
          "  :rtype: int\n"
          "\n"
        },

        /* list_topics is shared with the Producer/Consumer implementations. */
        { "list_topics", (PyCFunction)list_topics, METH_VARARGS|METH_KEYWORDS,
          list_topics_doc
        },

        { NULL }  /* sentinel */
};
1080
1081
/**
 * @brief len(admin): number of messages and events waiting in
 *        librdkafka's out-queue (i.e., still to be served by poll()).
 */
static Py_ssize_t Admin__len__ (Handle *self) {
        return rd_kafka_outq_len(self->rk);
}
1085
1086
/* Sequence protocol: only sq_length is provided, exposing the librdkafka
 * out-queue length via len(admin) (see Admin__len__). */
static PySequenceMethods Admin_seq_methods = {
        (lenfunc)Admin__len__ /* sq_length */
};
1090
1091
1092 /**
1093 * @brief Convert C topic_result_t array to topic-indexed dict.
1094 */
1095 static PyObject *
Admin_c_topic_result_to_py(const rd_kafka_topic_result_t ** c_result,size_t cnt)1096 Admin_c_topic_result_to_py (const rd_kafka_topic_result_t **c_result,
1097 size_t cnt) {
1098 PyObject *result;
1099 size_t ti;
1100
1101 result = PyDict_New();
1102
1103 for (ti = 0 ; ti < cnt ; ti++) {
1104 PyObject *error;
1105
1106 error = KafkaError_new_or_None(
1107 rd_kafka_topic_result_error(c_result[ti]),
1108 rd_kafka_topic_result_error_string(c_result[ti]));
1109
1110 PyDict_SetItemString(
1111 result,
1112 rd_kafka_topic_result_name(c_result[ti]),
1113 error);
1114
1115 Py_DECREF(error);
1116 }
1117
1118 return result;
1119 }
1120
1121
1122
1123 /**
1124 * @brief Convert C ConfigEntry array to dict of py ConfigEntry objects.
1125 */
1126 static PyObject *
Admin_c_ConfigEntries_to_py(PyObject * ConfigEntry_type,const rd_kafka_ConfigEntry_t ** c_configs,size_t config_cnt)1127 Admin_c_ConfigEntries_to_py (PyObject *ConfigEntry_type,
1128 const rd_kafka_ConfigEntry_t **c_configs,
1129 size_t config_cnt) {
1130 PyObject *dict;
1131 size_t ci;
1132
1133 dict = PyDict_New();
1134
1135 for (ci = 0 ; ci < config_cnt ; ci++) {
1136 PyObject *kwargs, *args;
1137 const rd_kafka_ConfigEntry_t *ent = c_configs[ci];
1138 const rd_kafka_ConfigEntry_t **c_synonyms;
1139 PyObject *entry, *synonyms;
1140 size_t synonym_cnt;
1141 const char *val;
1142
1143 kwargs = PyDict_New();
1144
1145 cfl_PyDict_SetString(kwargs, "name",
1146 rd_kafka_ConfigEntry_name(ent));
1147 val = rd_kafka_ConfigEntry_value(ent);
1148 if (val)
1149 cfl_PyDict_SetString(kwargs, "value", val);
1150 else
1151 PyDict_SetItemString(kwargs, "value", Py_None);
1152 cfl_PyDict_SetInt(kwargs, "source",
1153 (int)rd_kafka_ConfigEntry_source(ent));
1154 cfl_PyDict_SetInt(kwargs, "is_read_only",
1155 rd_kafka_ConfigEntry_is_read_only(ent));
1156 cfl_PyDict_SetInt(kwargs, "is_default",
1157 rd_kafka_ConfigEntry_is_default(ent));
1158 cfl_PyDict_SetInt(kwargs, "is_sensitive",
1159 rd_kafka_ConfigEntry_is_sensitive(ent));
1160 cfl_PyDict_SetInt(kwargs, "is_synonym",
1161 rd_kafka_ConfigEntry_is_synonym(ent));
1162
1163 c_synonyms = rd_kafka_ConfigEntry_synonyms(ent,
1164 &synonym_cnt);
1165 synonyms = Admin_c_ConfigEntries_to_py(ConfigEntry_type,
1166 c_synonyms,
1167 synonym_cnt);
1168 if (!synonyms) {
1169 Py_DECREF(kwargs);
1170 Py_DECREF(dict);
1171 return NULL;
1172 }
1173 PyDict_SetItemString(kwargs, "synonyms", synonyms);
1174 Py_DECREF(synonyms);
1175
1176 args = PyTuple_New(0);
1177 entry = PyObject_Call(ConfigEntry_type, args, kwargs);
1178 Py_DECREF(args);
1179 Py_DECREF(kwargs);
1180 if (!entry) {
1181 Py_DECREF(dict);
1182 return NULL;
1183 }
1184
1185 PyDict_SetItemString(dict, rd_kafka_ConfigEntry_name(ent),
1186 entry);
1187 Py_DECREF(entry);
1188 }
1189
1190
1191 return dict;
1192 }
1193
1194
1195 /**
1196 * @brief Convert C ConfigResource array to dict indexed by ConfigResource
1197 * with the value of dict(ConfigEntry).
1198 *
1199 * @param ret_configs If true, return configs rather than None.
1200 */
1201 static PyObject *
Admin_c_ConfigResource_result_to_py(const rd_kafka_ConfigResource_t ** c_resources,size_t cnt,int ret_configs)1202 Admin_c_ConfigResource_result_to_py (const rd_kafka_ConfigResource_t **c_resources,
1203 size_t cnt,
1204 int ret_configs) {
1205 PyObject *result;
1206 PyObject *ConfigResource_type;
1207 PyObject *ConfigEntry_type;
1208 size_t ri;
1209
1210 ConfigResource_type = cfl_PyObject_lookup("confluent_kafka.admin",
1211 "ConfigResource");
1212 if (!ConfigResource_type)
1213 return NULL;
1214
1215 ConfigEntry_type = cfl_PyObject_lookup("confluent_kafka.admin",
1216 "ConfigEntry");
1217 if (!ConfigEntry_type) {
1218 Py_DECREF(ConfigResource_type);
1219 return NULL;
1220 }
1221
1222 result = PyDict_New();
1223
1224 for (ri = 0 ; ri < cnt ; ri++) {
1225 const rd_kafka_ConfigResource_t *c_res = c_resources[ri];
1226 const rd_kafka_ConfigEntry_t **c_configs;
1227 PyObject *kwargs, *wrap;
1228 PyObject *key;
1229 PyObject *configs, *error;
1230 size_t config_cnt;
1231
1232 c_configs = rd_kafka_ConfigResource_configs(c_res, &config_cnt);
1233 configs = Admin_c_ConfigEntries_to_py(ConfigEntry_type,
1234 c_configs, config_cnt);
1235 if (!configs)
1236 goto err;
1237
1238 error = KafkaError_new_or_None(
1239 rd_kafka_ConfigResource_error(c_res),
1240 rd_kafka_ConfigResource_error_string(c_res));
1241
1242 kwargs = PyDict_New();
1243 cfl_PyDict_SetInt(kwargs, "restype",
1244 (int)rd_kafka_ConfigResource_type(c_res));
1245 cfl_PyDict_SetString(kwargs, "name",
1246 rd_kafka_ConfigResource_name(c_res));
1247 PyDict_SetItemString(kwargs, "described_configs", configs);
1248 PyDict_SetItemString(kwargs, "error", error);
1249 Py_DECREF(error);
1250
1251 /* Instantiate ConfigResource */
1252 wrap = PyTuple_New(0);
1253 key = PyObject_Call(ConfigResource_type, wrap, kwargs);
1254 Py_DECREF(wrap);
1255 Py_DECREF(kwargs);
1256 if (!key) {
1257 Py_DECREF(configs);
1258 goto err;
1259 }
1260
1261 /* Set result to dict[ConfigResource(..)] = configs | None
1262 * depending on ret_configs */
1263 if (ret_configs)
1264 PyDict_SetItem(result, key, configs);
1265 else
1266 PyDict_SetItem(result, key, Py_None);
1267
1268 Py_DECREF(configs);
1269 Py_DECREF(key);
1270 }
1271 return result;
1272
1273 err:
1274 Py_DECREF(ConfigResource_type);
1275 Py_DECREF(ConfigEntry_type);
1276 Py_DECREF(result);
1277 return NULL;
1278 }
1279
1280
1281 /**
1282 * @brief Event callback triggered from librdkafka's background thread
1283 * when Admin API results are ready.
1284 *
1285 * The rkev opaque (not \p opaque) is the future PyObject
1286 * which we'll set the result on.
1287 *
1288 * @locality background rdkafka thread
1289 */
Admin_background_event_cb(rd_kafka_t * rk,rd_kafka_event_t * rkev,void * opaque)1290 static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
1291 void *opaque) {
1292 PyObject *future = (PyObject *)rd_kafka_event_opaque(rkev);
1293 const rd_kafka_topic_result_t **c_topic_res;
1294 size_t c_topic_res_cnt;
1295 PyGILState_STATE gstate;
1296 PyObject *error, *method, *ret;
1297 PyObject *result = NULL;
1298 PyObject *exctype = NULL, *exc = NULL, *excargs = NULL;
1299
1300 /* Acquire GIL */
1301 gstate = PyGILState_Ensure();
1302
1303 /* Generic request-level error handling. */
1304 error = KafkaError_new_or_None(rd_kafka_event_error(rkev),
1305 rd_kafka_event_error_string(rkev));
1306 if (error != Py_None)
1307 goto raise;
1308
1309 switch (rd_kafka_event_type(rkev))
1310 {
1311 case RD_KAFKA_EVENT_CREATETOPICS_RESULT:
1312 {
1313 const rd_kafka_CreateTopics_result_t *c_res;
1314
1315 c_res = rd_kafka_event_CreateTopics_result(rkev);
1316
1317 c_topic_res = rd_kafka_CreateTopics_result_topics(
1318 c_res, &c_topic_res_cnt);
1319
1320 result = Admin_c_topic_result_to_py(c_topic_res,
1321 c_topic_res_cnt);
1322 break;
1323 }
1324
1325 case RD_KAFKA_EVENT_DELETETOPICS_RESULT:
1326 {
1327 const rd_kafka_DeleteTopics_result_t *c_res;
1328
1329 c_res = rd_kafka_event_DeleteTopics_result(rkev);
1330
1331 c_topic_res = rd_kafka_DeleteTopics_result_topics(
1332 c_res, &c_topic_res_cnt);
1333
1334 result = Admin_c_topic_result_to_py(c_topic_res,
1335 c_topic_res_cnt);
1336 break;
1337 }
1338
1339 case RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT:
1340 {
1341 const rd_kafka_CreatePartitions_result_t *c_res;
1342
1343 c_res = rd_kafka_event_CreatePartitions_result(rkev);
1344
1345 c_topic_res = rd_kafka_CreatePartitions_result_topics(
1346 c_res, &c_topic_res_cnt);
1347
1348 result = Admin_c_topic_result_to_py(c_topic_res,
1349 c_topic_res_cnt);
1350 break;
1351 }
1352
1353 case RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT:
1354 {
1355 const rd_kafka_ConfigResource_t **c_resources;
1356 size_t resource_cnt;
1357
1358 c_resources = rd_kafka_DescribeConfigs_result_resources(
1359 rd_kafka_event_DescribeConfigs_result(rkev),
1360 &resource_cnt);
1361 result = Admin_c_ConfigResource_result_to_py(
1362 c_resources,
1363 resource_cnt,
1364 1/* return configs */);
1365 break;
1366 }
1367
1368 case RD_KAFKA_EVENT_ALTERCONFIGS_RESULT:
1369 {
1370 const rd_kafka_ConfigResource_t **c_resources;
1371 size_t resource_cnt;
1372
1373 c_resources = rd_kafka_AlterConfigs_result_resources(
1374 rd_kafka_event_AlterConfigs_result(rkev),
1375 &resource_cnt);
1376 result = Admin_c_ConfigResource_result_to_py(
1377 c_resources,
1378 resource_cnt,
1379 0/* return None instead of (the empty) configs */);
1380 break;
1381 }
1382
1383 default:
1384 Py_DECREF(error); /* Py_None */
1385 error = KafkaError_new0(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
1386 "Unsupported event type %s",
1387 rd_kafka_event_name(rkev));
1388 goto raise;
1389 }
1390
1391 if (!result) {
1392 Py_DECREF(error); /* Py_None */
1393 if (!PyErr_Occurred()) {
1394 error = KafkaError_new0(RD_KAFKA_RESP_ERR__INVALID_ARG,
1395 "BUG: Event %s handling failed "
1396 "but no exception raised",
1397 rd_kafka_event_name(rkev));
1398 } else {
1399 /* Extract the exception type and message
1400 * and pass it as an error to raise and subsequently
1401 * the future.
1402 * We loose the backtrace here unfortunately, so
1403 * these errors are a bit cryptic. */
1404 PyObject *trace = NULL;
1405
1406 /* Fetch (and clear) currently raised exception */
1407 PyErr_Fetch(&exctype, &error, &trace);
1408 Py_XDECREF(trace);
1409 }
1410
1411 goto raise;
1412 }
1413
1414 /*
1415 * Call future.set_result()
1416 */
1417 method = cfl_PyUnistr(_FromString("set_result"));
1418
1419 ret = PyObject_CallMethodObjArgs(future, method, result, NULL);
1420 Py_XDECREF(ret);
1421 Py_XDECREF(result);
1422 Py_DECREF(future);
1423 Py_DECREF(method);
1424
1425 /* Release GIL */
1426 PyGILState_Release(gstate);
1427
1428 rd_kafka_event_destroy(rkev);
1429
1430 return;
1431
1432 raise:
1433 /*
1434 * Pass an exception to future.set_exception().
1435 */
1436
1437 if (!exctype) {
1438 /* No previous exception raised, use KafkaException */
1439 exctype = KafkaException;
1440 Py_INCREF(exctype);
1441 }
1442
1443 /* Create a new exception based on exception type and error. */
1444 excargs = PyTuple_New(1);
1445 Py_INCREF(error); /* tuple's reference */
1446 PyTuple_SET_ITEM(excargs, 0, error);
1447 exc = ((PyTypeObject *)exctype)->tp_new(
1448 (PyTypeObject *)exctype, NULL, NULL);
1449 exc->ob_type->tp_init(exc, excargs, NULL);
1450 Py_DECREF(excargs);
1451 Py_XDECREF(exctype);
1452 Py_XDECREF(error); /* from error source above */
1453
1454 /*
1455 * Call future.set_exception(exc)
1456 */
1457 method = cfl_PyUnistr(_FromString("set_exception"));
1458 ret = PyObject_CallMethodObjArgs(future, method, exc, NULL);
1459 Py_XDECREF(ret);
1460 Py_DECREF(exc);
1461 Py_DECREF(future);
1462 Py_DECREF(method);
1463
1464 /* Release GIL */
1465 PyGILState_Release(gstate);
1466
1467 rd_kafka_event_destroy(rkev);
1468 }
1469
1470
/**
 * @brief tp_init for the Admin client: build the configuration from
 *        args/kwargs and instantiate the underlying librdkafka handle.
 *
 * @returns 0 on success, -1 (with exception set) on failure.
 */
static int Admin_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
        Handle *self = (Handle *)selfobj;
        rd_kafka_conf_t *rkconf;
        char errbuf[256];

        /* Refuse repeated __init__() on a live handle. */
        if (self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Admin already __init__:ialized");
                return -1;
        }

        self->type = PY_RD_KAFKA_ADMIN;

        rkconf = common_conf_setup(PY_RD_KAFKA_ADMIN, self, args, kwargs);
        if (!rkconf)
                return -1;

        /* Deliver Admin API result events on librdkafka's background
         * thread via Admin_background_event_cb. */
        rd_kafka_conf_set_background_event_cb(rkconf,
                                              Admin_background_event_cb);

        /* There is no dedicated ADMIN client type in librdkafka, the Admin
         * API can use either PRODUCER or CONSUMER.
         * We choose PRODUCER since it is more lightweight than a
         * CONSUMER instance. */
        self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, rkconf,
                                errbuf, sizeof(errbuf));
        if (!self->rk) {
                cfl_PyErr_Format(rd_kafka_last_error(),
                                 "Failed to create admin client: %s",
                                 errbuf);
                /* conf ownership only transfers on success. */
                rd_kafka_conf_destroy(rkconf);
                return -1;
        }

        /* Forward log messages to poll queue */
        if (self->logger)
                rd_kafka_set_log_queue(self->rk, NULL);

        return 0;
}
1509
1510
/**
 * @brief tp_new: allocate a bare, uninitialized Admin (Handle) instance;
 *        all actual set-up is deferred to Admin_init().
 */
static PyObject *Admin_new (PyTypeObject *type, PyObject *args,
                            PyObject *kwargs) {
        PyObject *obj = type->tp_alloc(type, 0);
        return obj;
}
1515
1516
1517
1518 PyTypeObject AdminType = {
1519 PyVarObject_HEAD_INIT(NULL, 0)
1520 "cimpl._AdminClientImpl", /*tp_name*/
1521 sizeof(Handle), /*tp_basicsize*/
1522 0, /*tp_itemsize*/
1523 (destructor)Admin_dealloc, /*tp_dealloc*/
1524 0, /*tp_print*/
1525 0, /*tp_getattr*/
1526 0, /*tp_setattr*/
1527 0, /*tp_compare*/
1528 0, /*tp_repr*/
1529 0, /*tp_as_number*/
1530 &Admin_seq_methods, /*tp_as_sequence*/
1531 0, /*tp_as_mapping*/
1532 0, /*tp_hash */
1533 0, /*tp_call*/
1534 0, /*tp_str*/
1535 0, /*tp_getattro*/
1536 0, /*tp_setattro*/
1537 0, /*tp_as_buffer*/
1538 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
1539 Py_TPFLAGS_HAVE_GC, /*tp_flags*/
1540 "Kafka Admin Client\n"
1541 "\n"
1542 ".. py:function:: Admin(**kwargs)\n"
1543 "\n"
1544 " Create new AdminClient instance using provided configuration dict.\n"
1545 "\n"
1546 "This class should not be used directly, use confluent_kafka.AdminClient\n."
1547 "\n"
1548 ".. py:function:: len()\n"
1549 "\n"
1550 " :returns: Number Kafka protocol requests waiting to be delivered to, or returned from, broker.\n"
1551 " :rtype: int\n"
1552 "\n", /*tp_doc*/
1553 (traverseproc)Admin_traverse, /* tp_traverse */
1554 (inquiry)Admin_clear, /* tp_clear */
1555 0, /* tp_richcompare */
1556 0, /* tp_weaklistoffset */
1557 0, /* tp_iter */
1558 0, /* tp_iternext */
1559 Admin_methods, /* tp_methods */
1560 0, /* tp_members */
1561 0, /* tp_getset */
1562 0, /* tp_base */
1563 0, /* tp_dict */
1564 0, /* tp_descr_get */
1565 0, /* tp_descr_set */
1566 0, /* tp_dictoffset */
1567 Admin_init, /* tp_init */
1568 0, /* tp_alloc */
1569 Admin_new /* tp_new */
1570 };
1571
1572
1573
1574
1575