1 /**
2  * Copyright 2018 Confluent Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "confluent_kafka.h"
18 
19 #include <stdarg.h>
20 
21 
22 /****************************************************************************
23  *
24  *
25  * Admin Client API
26  *
27  *
28  ****************************************************************************/
29 
30 
31 
32 
/**
 * @brief Clear the Admin handle by delegating to the common Handle_clear().
 *
 * @returns 0, always.
 */
static int Admin_clear (Handle *self) {
        Handle_clear(self);
        return 0;
}
39 
/**
 * @brief Deallocator for Admin objects.
 *
 * Untracks the object from the garbage collector, destroys the underlying
 * librdkafka handle (if there is one), clears the handle through
 * Admin_clear() and finally releases the object's memory through its
 * type's tp_free.
 */
static void Admin_dealloc (Handle *self) {
        CallState cs;

        PyObject_GC_UnTrack(self);

        if (self->rk != NULL) {
                /* The destroy call is bracketed by a CallState
                 * begin/end pair, like the other librdkafka calls
                 * made from this file. */
                CallState_begin(self, &cs);
                rd_kafka_destroy(self->rk);
                CallState_end(self, &cs);
        }

        Admin_clear(self);

        Py_TYPE(self)->tp_free((PyObject *)self);
}
56 
/**
 * @brief Visit the objects referenced by the Admin handle by delegating
 *        to the common Handle_traverse().
 *
 * NOTE(review): whatever Handle_traverse() returns is discarded and 0 is
 *               returned unconditionally - confirm that this is intended.
 *
 * @returns 0, always.
 */
static int Admin_traverse (Handle *self, visitproc visit, void *arg) {
        Handle_traverse(self, visit, arg);
        return 0;
}
63 
64 
/**
 * @name AdminOptions
 *
 * Admin option values are collected in a struct Admin_options where
 * every field starts out holding a sentinel "unset" value
 * (see Admin_options_INITIALIZER). The Admin_options_is_set_..() macros
 * tell whether a field has been changed from its sentinel.
 */

/**@brief Sentinel value of an int option that has not been set. */
#define Admin_options_def_int   (-12345)
/**@brief Sentinel value of a float option that has not been set.
 *        -12345 is exactly representable as a float, so comparing
 *        a float field against this value is exact. */
#define Admin_options_def_float ((float)Admin_options_def_int)

struct Admin_options {
        int   validate_only;      /* needs special bool parsing */
        float request_timeout;    /* parser: f */
        float operation_timeout;  /* parser: f */
        int   broker;             /* parser: i */
};

/**@brief "unset" value initializers for Admin_options
 * Make sure this is kept up to date with Admin_options above. */
#define Admin_options_INITIALIZER {                                     \
                Admin_options_def_int, Admin_options_def_float,         \
                        Admin_options_def_float, Admin_options_def_int, \
                        }

/**@returns non-zero if the int option \p v differs from the int sentinel. */
#define Admin_options_is_set_int(v) ((v) != Admin_options_def_int)

/**@returns non-zero if the float option \p v differs from the float
 *          sentinel.
 *
 * The comparison is done in the float domain: converting \p v to int
 * first would be undefined behaviour for values outside the range of int
 * and would make any value that truncates to the sentinel
 * (e.g., -12345.5) look unset. */
#define Admin_options_is_set_float(v)                   \
        ((float)(v) != Admin_options_def_float)
89 
90 
91 /**
92  * @brief Convert Admin_options to rd_kafka_AdminOptions_t.
93  *
94  * @param forApi is the librdkafka name of the admin API that these options
95  *               will be used for, e.g., "CreateTopics".
96  * @param future is set as the options opaque.
97  *
98  * @returns a new C admin options object on success, or NULL on failure in
99  *          which case an exception is raised.
100  */
101 static rd_kafka_AdminOptions_t *
Admin_options_to_c(Handle * self,rd_kafka_admin_op_t for_api,const struct Admin_options * options,PyObject * future)102 Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
103                     const struct Admin_options *options,
104                     PyObject *future) {
105         rd_kafka_AdminOptions_t *c_options;
106         rd_kafka_resp_err_t err;
107         char errstr[512];
108 
109         c_options = rd_kafka_AdminOptions_new(self->rk, for_api);
110         if (!c_options) {
111                 PyErr_Format(PyExc_RuntimeError,
112                              "This Admin API method "
113                              "is unsupported by librdkafka %s",
114                              rd_kafka_version_str());
115                 return NULL;
116         }
117 
118         rd_kafka_AdminOptions_set_opaque(c_options, (void *)future);
119 
120         if (Admin_options_is_set_int(options->validate_only) &&
121             (err = rd_kafka_AdminOptions_set_validate_only(
122                     c_options, options->validate_only,
123                     errstr, sizeof(errstr))))
124                 goto err;
125 
126         if (Admin_options_is_set_float(options->request_timeout) &&
127             (err = rd_kafka_AdminOptions_set_request_timeout(
128                     c_options, (int)(options->request_timeout * 1000.0f),
129                     errstr, sizeof(errstr))))
130                 goto err;
131 
132         if (Admin_options_is_set_float(options->operation_timeout) &&
133             (err = rd_kafka_AdminOptions_set_operation_timeout(
134                     c_options, (int)(options->operation_timeout * 1000.0f),
135                     errstr, sizeof(errstr))))
136                 goto err;
137 
138         if (Admin_options_is_set_int(options->broker) &&
139             (err = rd_kafka_AdminOptions_set_broker(
140                     c_options, (int32_t)options->broker,
141                     errstr, sizeof(errstr))))
142                 goto err;
143 
144         return c_options;
145 
146  err:
147         rd_kafka_AdminOptions_destroy(c_options);
148         PyErr_Format(PyExc_ValueError, "%s", errstr);
149         return NULL;
150 }
151 
152 
153 
154 
155 /**
156  * @brief Translate Python list(list(int)) replica assignments and set
157  *        on the specified generic C object using a setter based on
158  *        forApi.
159  *
160  * @returns 1 on success or 0 on error in which case an exception is raised.
161  */
Admin_set_replica_assignment(const char * forApi,void * c_obj,PyObject * ra,int min_count,int max_count,const char * err_count_desc)162 static int Admin_set_replica_assignment (const char *forApi, void *c_obj,
163                                          PyObject *ra, int
164                                          min_count, int max_count,
165                                          const char *err_count_desc) {
166         int pi;
167 
168         if (!PyList_Check(ra) ||
169             (int)PyList_Size(ra) < min_count ||
170             (int)PyList_Size(ra) > max_count) {
171                 PyErr_Format(PyExc_ValueError,
172                              "replica_assignment must be "
173                              "a list of int lists with an "
174                              "outer size of %s", err_count_desc);
175                 return 0;
176         }
177 
178         for (pi = 0 ; pi < (int)PyList_Size(ra) ; pi++) {
179                 size_t ri;
180                 PyObject *replicas = PyList_GET_ITEM(ra, pi);
181                 rd_kafka_resp_err_t err;
182                 int32_t *c_replicas;
183                 size_t replica_cnt;
184                 char errstr[512];
185 
186                 if (!PyList_Check(replicas) ||
187                     (replica_cnt = (size_t)PyList_Size(replicas)) < 1) {
188                         PyErr_Format(
189                                 PyExc_ValueError,
190                                 "replica_assignment must be "
191                                 "a list of int lists with an "
192                                 "outer size of %s", err_count_desc);
193                         return 0;
194                 }
195 
196                 c_replicas = malloc(sizeof(*c_replicas) *
197                                     replica_cnt);
198 
199                 for (ri = 0 ; ri < replica_cnt ; ri++) {
200                         PyObject *replica =
201                                 PyList_GET_ITEM(replicas, ri);
202 
203                         if (!cfl_PyInt_Check(replica)) {
204                                 PyErr_Format(
205                                         PyExc_ValueError,
206                                         "replica_assignment must be "
207                                         "a list of int lists with an "
208                                         "outer size of %s", err_count_desc);
209                                 free(c_replicas);
210                                 return 0;
211                         }
212 
213                         c_replicas[ri] = (int32_t)cfl_PyInt_AsInt(replica);
214 
215                 }
216 
217 
218                 if (!strcmp(forApi, "CreateTopics"))
219                         err = rd_kafka_NewTopic_set_replica_assignment(
220                                 (rd_kafka_NewTopic_t *)c_obj, (int32_t)pi,
221                                 c_replicas, replica_cnt,
222                                 errstr, sizeof(errstr));
223                 else if (!strcmp(forApi, "CreatePartitions"))
224                         err = rd_kafka_NewPartitions_set_replica_assignment(
225                                 (rd_kafka_NewPartitions_t *)c_obj, (int32_t)pi,
226                                 c_replicas, replica_cnt,
227                                 errstr, sizeof(errstr));
228                 else {
229                         /* Should never be reached */
230                         err = RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
231                         snprintf(errstr, sizeof(errstr),
232                                  "Unsupported forApi %s", forApi);
233                 }
234 
235                 free(c_replicas);
236 
237                 if (err) {
238                         PyErr_SetString(
239                                 PyExc_ValueError, errstr);
240                         return 0;
241                 }
242         }
243 
244         return 1;
245 }
246 
247 /**
248  * @brief Translate a dict to ConfigResource set_config() calls,
249  *        or to NewTopic_add_config() calls.
250  *
251  *
252  * @returns 1 on success or 0 if an exception was raised.
253  */
254 static int
Admin_config_dict_to_c(void * c_obj,PyObject * dict,const char * op_name)255 Admin_config_dict_to_c (void *c_obj, PyObject *dict, const char *op_name) {
256         Py_ssize_t pos = 0;
257         PyObject *ko, *vo;
258 
259         while (PyDict_Next(dict, &pos, &ko, &vo)) {
260                 PyObject *ks, *ks8;
261                 PyObject *vs = NULL, *vs8 = NULL;
262                 const char *k;
263                 const char *v;
264                 rd_kafka_resp_err_t err;
265 
266                 if (!(ks = cfl_PyObject_Unistr(ko))) {
267                         PyErr_Format(PyExc_ValueError,
268                                      "expected %s config name to be unicode "
269                                      "string", op_name);
270                         return 0;
271                 }
272 
273                 k = cfl_PyUnistr_AsUTF8(ks, &ks8);
274 
275                 if (!(vs = cfl_PyObject_Unistr(vo)) ||
276                     !(v = cfl_PyUnistr_AsUTF8(vs, &vs8))) {
277                         PyErr_Format(PyExc_ValueError,
278                                      "expect %s config value for %s "
279                                      "to be unicode string",
280                                      op_name, k);
281                         Py_XDECREF(vs);
282                         Py_XDECREF(vs8);
283                         Py_DECREF(ks);
284                         Py_XDECREF(ks8);
285                         return 0;
286                 }
287 
288                 if (!strcmp(op_name, "set_config"))
289                         err = rd_kafka_ConfigResource_set_config(
290                                 (rd_kafka_ConfigResource_t *)c_obj,
291                                 k, v);
292                 else if (!strcmp(op_name, "newtopic_set_config"))
293                         err = rd_kafka_NewTopic_set_config(
294                                 (rd_kafka_NewTopic_t *)c_obj, k, v);
295                 else
296                         err = RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
297 
298                 if (err) {
299                         PyErr_Format(PyExc_ValueError,
300                                      "%s config %s failed: %s",
301                                      op_name, k, rd_kafka_err2str(err));
302                         Py_XDECREF(vs);
303                         Py_XDECREF(vs8);
304                         Py_DECREF(ks);
305                         Py_XDECREF(ks8);
306                         return 0;
307                 }
308 
309                 Py_XDECREF(vs);
310                 Py_XDECREF(vs8);
311                 Py_DECREF(ks);
312                 Py_XDECREF(ks8);
313         }
314 
315         return 1;
316 }
317 
318 
319 /**
320  * @brief create_topics
321  */
Admin_create_topics(Handle * self,PyObject * args,PyObject * kwargs)322 static PyObject *Admin_create_topics (Handle *self, PyObject *args,
323                                       PyObject *kwargs) {
324         PyObject *topics = NULL, *future, *validate_only_obj = NULL;
325         static char *kws[] = { "topics",
326                                "future",
327                                /* options */
328                                "validate_only",
329                                "request_timeout",
330                                "operation_timeout",
331                                NULL };
332         struct Admin_options options = Admin_options_INITIALIZER;
333         rd_kafka_AdminOptions_t *c_options = NULL;
334         int tcnt;
335         int i;
336         rd_kafka_NewTopic_t **c_objs;
337         rd_kafka_queue_t *rkqu;
338         CallState cs;
339 
340         /* topics is a list of NewTopic objects. */
341         if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|Off", kws,
342                                          &topics, &future,
343                                          &validate_only_obj,
344                                          &options.request_timeout,
345                                          &options.operation_timeout))
346                 return NULL;
347 
348         if (!PyList_Check(topics) || (tcnt = (int)PyList_Size(topics)) < 1) {
349                 PyErr_SetString(PyExc_ValueError,
350                                 "Expected non-empty list of NewTopic objects");
351                 return NULL;
352         }
353 
354         if (validate_only_obj &&
355             !cfl_PyBool_get(validate_only_obj, "validate_only",
356                             &options.validate_only))
357                 return NULL;
358 
359         c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_CREATETOPICS,
360                                        &options, future);
361         if (!c_options)
362                 return NULL; /* Exception raised by options_to_c() */
363 
364         /* options_to_c() sets future as the opaque, which is used in the
365          * background_event_cb to set the results on the future as the
366          * admin operation is finished, so we need to keep our own refcount. */
367         Py_INCREF(future);
368 
369         /*
370          * Parse the list of NewTopics and convert to corresponding C types.
371          */
372         c_objs = malloc(sizeof(*c_objs) * tcnt);
373 
374         for (i = 0 ; i < tcnt ; i++) {
375                 NewTopic *newt = (NewTopic *)PyList_GET_ITEM(topics, i);
376                 char errstr[512];
377                 int r;
378 
379                 r = PyObject_IsInstance((PyObject *)newt,
380                                         (PyObject *)&NewTopicType);
381                 if (r == -1)
382                         goto err; /* Exception raised by IsInstance() */
383                 else if (r == 0) {
384                         PyErr_SetString(PyExc_ValueError,
385                                         "Expected list of NewTopic objects");
386                         goto err;
387                 }
388 
389                 c_objs[i] = rd_kafka_NewTopic_new(newt->topic,
390                                                    newt->num_partitions,
391                                                    newt->replication_factor,
392                                                    errstr, sizeof(errstr));
393                 if (!c_objs[i]) {
394                         PyErr_Format(PyExc_ValueError,
395                                      "Invalid NewTopic(%s): %s",
396                                      newt->topic, errstr);
397                         i++;
398                         goto err;
399                 }
400 
401                 if (newt->replica_assignment) {
402                         if (newt->replication_factor != -1) {
403                                 PyErr_SetString(PyExc_ValueError,
404                                                 "replication_factor and "
405                                                 "replica_assignment are "
406                                                 "mutually exclusive");
407                                 i++;
408                                 goto err;
409                         }
410 
411                         if (!Admin_set_replica_assignment(
412                                     "CreateTopics", (void *)c_objs[i],
413                                     newt->replica_assignment,
414                                     newt->num_partitions, newt->num_partitions,
415                                     "num_partitions")) {
416                                 i++;
417                                 goto err;
418                         }
419                 }
420 
421                 if (newt->config) {
422                         if (!Admin_config_dict_to_c((void *)c_objs[i],
423                                                     newt->config,
424                                                     "newtopic_set_config")) {
425                                 i++;
426                                 goto err;
427                         }
428                 }
429         }
430 
431 
432         /* Use librdkafka's background thread queue to automatically dispatch
433          * Admin_background_event_cb() when the admin operation is finished. */
434         rkqu = rd_kafka_queue_get_background(self->rk);
435 
436         /*
437          * Call CreateTopics.
438          *
439          * We need to set up a CallState and release GIL here since
440          * the background_event_cb may be triggered immediately.
441          */
442         CallState_begin(self, &cs);
443         rd_kafka_CreateTopics(self->rk, c_objs, tcnt, c_options, rkqu);
444         CallState_end(self, &cs);
445 
446         rd_kafka_NewTopic_destroy_array(c_objs, tcnt);
447         rd_kafka_AdminOptions_destroy(c_options);
448         free(c_objs);
449         rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
450 
451         Py_RETURN_NONE;
452 
453  err:
454         rd_kafka_NewTopic_destroy_array(c_objs, i);
455         rd_kafka_AdminOptions_destroy(c_options);
456         free(c_objs);
457         Py_DECREF(future); /* from options_to_c() */
458 
459         return NULL;
460 }
461 
462 
463 /**
464  * @brief delete_topics
465  */
Admin_delete_topics(Handle * self,PyObject * args,PyObject * kwargs)466 static PyObject *Admin_delete_topics (Handle *self, PyObject *args,
467                                       PyObject *kwargs) {
468         PyObject *topics = NULL, *future;
469         static char *kws[] = { "topics",
470                                "future",
471                                /* options */
472                                "request_timeout",
473                                "operation_timeout",
474                                NULL };
475         struct Admin_options options = Admin_options_INITIALIZER;
476         rd_kafka_AdminOptions_t *c_options = NULL;
477         int tcnt;
478         int i;
479         rd_kafka_DeleteTopic_t **c_objs;
480         rd_kafka_queue_t *rkqu;
481         CallState cs;
482 
483         /* topics is a list of strings. */
484         if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O|ff", kws,
485                                          (PyObject *)&PyList_Type, &topics,
486                                          &future,
487                                          &options.request_timeout,
488                                          &options.operation_timeout))
489                 return NULL;
490 
491         if (!PyList_Check(topics) || (tcnt = (int)PyList_Size(topics)) < 1) {
492                 PyErr_SetString(PyExc_ValueError,
493                                 "Expected non-empty list of topic strings");
494                 return NULL;
495         }
496 
497         c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DELETETOPICS,
498                                        &options, future);
499         if (!c_options)
500                 return NULL; /* Exception raised by options_to_c() */
501 
502         /* options_to_c() sets opaque to the future object, which is used in the
503          * background_event_cb to set the results on the future as the
504          * admin operation is finished, so we need to keep our own refcount. */
505         Py_INCREF(future);
506 
507         /*
508          * Parse the list of strings and convert to corresponding C types.
509          */
510         c_objs = malloc(sizeof(*c_objs) * tcnt);
511 
512         for (i = 0 ; i < tcnt ; i++) {
513                 PyObject *topic = PyList_GET_ITEM(topics, i);
514                 PyObject *utopic;
515                 PyObject *uotopic = NULL;
516 
517                 if (topic == Py_None ||
518                     !(utopic = cfl_PyObject_Unistr(topic))) {
519                         PyErr_Format(PyExc_ValueError,
520                                      "Expected list of topic strings, "
521                                      "not %s",
522                                      ((PyTypeObject *)PyObject_Type(topic))->
523                                      tp_name);
524                         goto err;
525                 }
526 
527                 c_objs[i] = rd_kafka_DeleteTopic_new(
528                         cfl_PyUnistr_AsUTF8(utopic, &uotopic));
529 
530                 Py_XDECREF(utopic);
531                 Py_XDECREF(uotopic);
532         }
533 
534 
535         /* Use librdkafka's background thread queue to automatically dispatch
536          * Admin_background_event_cb() when the admin operation is finished. */
537         rkqu = rd_kafka_queue_get_background(self->rk);
538 
539         /*
540          * Call DeleteTopics.
541          *
542          * We need to set up a CallState and release GIL here since
543          * the event_cb may be triggered immediately.
544          */
545         CallState_begin(self, &cs);
546         rd_kafka_DeleteTopics(self->rk, c_objs, tcnt, c_options, rkqu);
547         CallState_end(self, &cs);
548 
549         rd_kafka_DeleteTopic_destroy_array(c_objs, i);
550         rd_kafka_AdminOptions_destroy(c_options);
551         free(c_objs);
552         rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
553 
554         Py_RETURN_NONE;
555 
556  err:
557         rd_kafka_DeleteTopic_destroy_array(c_objs, i);
558         rd_kafka_AdminOptions_destroy(c_options);
559         free(c_objs);
560         Py_DECREF(future); /* from options_to_c() */
561 
562         return NULL;
563 }
564 
565 
566 /**
567  * @brief create_partitions
568  */
Admin_create_partitions(Handle * self,PyObject * args,PyObject * kwargs)569 static PyObject *Admin_create_partitions (Handle *self, PyObject *args,
570                                           PyObject *kwargs) {
571         PyObject *topics = NULL, *future, *validate_only_obj = NULL;
572         static char *kws[] = { "topics",
573                                "future",
574                                /* options */
575                                "validate_only",
576                                "request_timeout",
577                                "operation_timeout",
578                                NULL };
579         struct Admin_options options = Admin_options_INITIALIZER;
580         rd_kafka_AdminOptions_t *c_options = NULL;
581         int tcnt;
582         int i;
583         rd_kafka_NewPartitions_t **c_objs;
584         rd_kafka_queue_t *rkqu;
585         CallState cs;
586 
587         /* topics is a list of NewPartitions_t objects. */
588         if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|Off", kws,
589                                          &topics, &future,
590                                          &validate_only_obj,
591                                          &options.request_timeout,
592                                          &options.operation_timeout))
593                 return NULL;
594 
595         if (!PyList_Check(topics) || (tcnt = (int)PyList_Size(topics)) < 1) {
596                 PyErr_SetString(PyExc_ValueError,
597                                 "Expected non-empty list of "
598                                 "NewPartitions objects");
599                 return NULL;
600         }
601 
602         if (validate_only_obj &&
603             !cfl_PyBool_get(validate_only_obj, "validate_only",
604                             &options.validate_only))
605                 return NULL;
606 
607         c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
608                                        &options, future);
609         if (!c_options)
610                 return NULL; /* Exception raised by options_to_c() */
611 
612         /* options_to_c() sets future as the opaque, which is used in the
613          * event_cb to set the results on the future as the admin operation
614          * is finished, so we need to keep our own refcount. */
615         Py_INCREF(future);
616 
617         /*
618          * Parse the list of NewPartitions and convert to corresponding C types.
619          */
620         c_objs = malloc(sizeof(*c_objs) * tcnt);
621 
622         for (i = 0 ; i < tcnt ; i++) {
623                 NewPartitions *newp = (NewPartitions *)PyList_GET_ITEM(topics,
624                                                                        i);
625                 char errstr[512];
626                 int r;
627 
628                 r = PyObject_IsInstance((PyObject *)newp,
629                                         (PyObject *)&NewPartitionsType);
630                 if (r == -1)
631                         goto err; /* Exception raised by IsInstance() */
632                 else if (r == 0) {
633                         PyErr_SetString(PyExc_ValueError,
634                                         "Expected list of "
635                                         "NewPartitions objects");
636                         goto err;
637                 }
638 
639                 c_objs[i] = rd_kafka_NewPartitions_new(newp->topic,
640                                                        newp->new_total_count,
641                                                        errstr, sizeof(errstr));
642                 if (!c_objs[i]) {
643                         PyErr_Format(PyExc_ValueError,
644                                      "Invalid NewPartitions(%s): %s",
645                                      newp->topic, errstr);
646                         goto err;
647                 }
648 
649                 if (newp->replica_assignment &&
650                     !Admin_set_replica_assignment(
651                             "CreatePartitions", (void *)c_objs[i],
652                             newp->replica_assignment,
653                             1, newp->new_total_count,
654                             "new_total_count - "
655                             "existing partition count")) {
656                         i++;
657                         goto err; /* Exception raised by set_..() */
658                 }
659         }
660 
661 
662         /* Use librdkafka's background thread queue to automatically dispatch
663          * Admin_background_event_cb() when the admin operation is finished. */
664         rkqu = rd_kafka_queue_get_background(self->rk);
665 
666         /*
667          * Call CreatePartitions
668          *
669          * We need to set up a CallState and release GIL here since
670          * the event_cb may be triggered immediately.
671          */
672         CallState_begin(self, &cs);
673         rd_kafka_CreatePartitions(self->rk, c_objs, tcnt, c_options, rkqu);
674         CallState_end(self, &cs);
675 
676         rd_kafka_NewPartitions_destroy_array(c_objs, tcnt);
677         rd_kafka_AdminOptions_destroy(c_options);
678         free(c_objs);
679         rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
680 
681         Py_RETURN_NONE;
682 
683  err:
684         rd_kafka_NewPartitions_destroy_array(c_objs, i);
685         rd_kafka_AdminOptions_destroy(c_options);
686         free(c_objs);
687         Py_DECREF(future); /* from options_to_c() */
688 
689         return NULL;
690 }
691 
692 
693 /**
694  * @brief describe_configs
695  */
Admin_describe_configs(Handle * self,PyObject * args,PyObject * kwargs)696 static PyObject *Admin_describe_configs (Handle *self, PyObject *args,
697                                          PyObject *kwargs) {
698         PyObject *resources, *future;
699         static char *kws[] = { "resources",
700                                "future",
701                                /* options */
702                                "request_timeout",
703                                "broker",
704                                NULL };
705         struct Admin_options options = Admin_options_INITIALIZER;
706         rd_kafka_AdminOptions_t *c_options = NULL;
707         PyObject *ConfigResource_type;
708         int cnt, i;
709         rd_kafka_ConfigResource_t **c_objs;
710         rd_kafka_queue_t *rkqu;
711         CallState cs;
712 
713         /* topics is a list of NewPartitions_t objects. */
714         if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|fi", kws,
715                                          &resources, &future,
716                                          &options.request_timeout,
717                                          &options.broker))
718                 return NULL;
719 
720         if (!PyList_Check(resources) ||
721             (cnt = (int)PyList_Size(resources)) < 1) {
722                 PyErr_SetString(PyExc_ValueError,
723                                 "Expected non-empty list of ConfigResource "
724                                 "objects");
725                 return NULL;
726         }
727 
728         c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS,
729                                        &options, future);
730         if (!c_options)
731                 return NULL; /* Exception raised by options_to_c() */
732 
733         /* Look up the ConfigResource class so we can check if the provided
734          * topics are of correct type.
735          * Since this is not in the fast path we treat ourselves
736          * to the luxury of looking up this for each call. */
737         ConfigResource_type = cfl_PyObject_lookup("confluent_kafka.admin",
738                                                   "ConfigResource");
739         if (!ConfigResource_type) {
740                 rd_kafka_AdminOptions_destroy(c_options);
741                 return NULL; /* Exception raised by lookup() */
742         }
743 
744         /* options_to_c() sets future as the opaque, which is used in the
745          * event_cb to set the results on the future as the admin operation
746          * is finished, so we need to keep our own refcount. */
747         Py_INCREF(future);
748 
749         /*
750          * Parse the list of ConfigResources and convert to
751          * corresponding C types.
752          */
753         c_objs = malloc(sizeof(*c_objs) * cnt);
754 
755         for (i = 0 ; i < cnt ; i++) {
756                 PyObject *res = PyList_GET_ITEM(resources, i);
757                 int r;
758                 int restype;
759                 char *resname;
760 
761                 r = PyObject_IsInstance(res, ConfigResource_type);
762                 if (r == -1)
763                         goto err; /* Exception raised by IsInstance() */
764                 else if (r == 0) {
765                         PyErr_SetString(PyExc_ValueError,
766                                         "Expected list of "
767                                         "ConfigResource objects");
768                         goto err;
769                 }
770 
771                 if (!cfl_PyObject_GetInt(res, "restype_int", &restype, 0, 0))
772                         goto err;
773 
774                 if (!cfl_PyObject_GetString(res, "name", &resname, NULL, 0))
775                         goto err;
776 
777                 c_objs[i] = rd_kafka_ConfigResource_new(
778                         (rd_kafka_ResourceType_t)restype, resname);
779                 if (!c_objs[i]) {
780                         PyErr_Format(PyExc_ValueError,
781                                      "Invalid ConfigResource(%d,%s)",
782                                      restype, resname);
783                         free(resname);
784                         goto err;
785                 }
786                 free(resname);
787         }
788 
789 
790         /* Use librdkafka's background thread queue to automatically dispatch
791          * Admin_background_event_cb() when the admin operation is finished. */
792         rkqu = rd_kafka_queue_get_background(self->rk);
793 
794         /*
795          * Call DescribeConfigs
796          *
797          * We need to set up a CallState and release GIL here since
798          * the event_cb may be triggered immediately.
799          */
800         CallState_begin(self, &cs);
801         rd_kafka_DescribeConfigs(self->rk, c_objs, cnt, c_options, rkqu);
802         CallState_end(self, &cs);
803 
804         rd_kafka_ConfigResource_destroy_array(c_objs, cnt);
805         rd_kafka_AdminOptions_destroy(c_options);
806         free(c_objs);
807         rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
808 
809         Py_DECREF(ConfigResource_type); /* from lookup() */
810 
811         Py_RETURN_NONE;
812 
813  err:
814         rd_kafka_ConfigResource_destroy_array(c_objs, i);
815         rd_kafka_AdminOptions_destroy(c_options);
816         free(c_objs);
817         Py_DECREF(ConfigResource_type); /* from lookup() */
818         Py_DECREF(future); /* from options_to_c() */
819 
820         return NULL;
821 }
822 
823 
824 
825 
826 /**
827  * @brief alter_configs
828  */
Admin_alter_configs(Handle * self,PyObject * args,PyObject * kwargs)829 static PyObject *Admin_alter_configs (Handle *self, PyObject *args,
830                                          PyObject *kwargs) {
831         PyObject *resources, *future;
832         PyObject *validate_only_obj = NULL;
833         static char *kws[] = { "resources",
834                                "future",
835                                /* options */
836                                "validate_only",
837                                "request_timeout",
838                                "broker",
839                                NULL };
840         struct Admin_options options = Admin_options_INITIALIZER;
841         rd_kafka_AdminOptions_t *c_options = NULL;
842         PyObject *ConfigResource_type;
843         int cnt, i;
844         rd_kafka_ConfigResource_t **c_objs;
845         rd_kafka_queue_t *rkqu;
846         CallState cs;
847 
848         /* topics is a list of NewPartitions_t objects. */
849         if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|Ofi", kws,
850                                          &resources, &future,
851                                          &validate_only_obj,
852                                          &options.request_timeout,
853                                          &options.broker))
854                 return NULL;
855 
856         if (!PyList_Check(resources) ||
857             (cnt = (int)PyList_Size(resources)) < 1) {
858                 PyErr_SetString(PyExc_ValueError,
859                                 "Expected non-empty list of ConfigResource "
860                                 "objects");
861                 return NULL;
862         }
863 
864         if (validate_only_obj &&
865             !cfl_PyBool_get(validate_only_obj, "validate_only",
866                             &options.validate_only))
867                 return NULL;
868 
869         c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ALTERCONFIGS,
870                                        &options, future);
871         if (!c_options)
872                 return NULL; /* Exception raised by options_to_c() */
873 
874         /* Look up the ConfigResource class so we can check if the provided
875          * topics are of correct type.
876          * Since this is not in the fast path we treat ourselves
877          * to the luxury of looking up this for each call. */
878         ConfigResource_type = cfl_PyObject_lookup("confluent_kafka.admin",
879                                                   "ConfigResource");
880         if (!ConfigResource_type) {
881                 rd_kafka_AdminOptions_destroy(c_options);
882                 return NULL; /* Exception raised by find() */
883         }
884 
885         /* options_to_c() sets future as the opaque, which is used in the
886          * event_cb to set the results on the future as the admin operation
887          * is finished, so we need to keep our own refcount. */
888         Py_INCREF(future);
889 
890         /*
891          * Parse the list of ConfigResources and convert to
892          * corresponding C types.
893          */
894         c_objs = malloc(sizeof(*c_objs) * cnt);
895 
896         for (i = 0 ; i < cnt ; i++) {
897                 PyObject *res = PyList_GET_ITEM(resources, i);
898                 int r;
899                 int restype;
900                 char *resname;
901                 PyObject *dict;
902 
903                 r = PyObject_IsInstance(res, ConfigResource_type);
904                 if (r == -1)
905                         goto err; /* Exception raised by IsInstance() */
906                 else if (r == 0) {
907                         PyErr_SetString(PyExc_ValueError,
908                                         "Expected list of "
909                                         "ConfigResource objects");
910                         goto err;
911                 }
912 
913                 if (!cfl_PyObject_GetInt(res, "restype_int", &restype, 0, 0))
914                         goto err;
915 
916                 if (!cfl_PyObject_GetString(res, "name", &resname, NULL, 0))
917                         goto err;
918 
919                 c_objs[i] = rd_kafka_ConfigResource_new(
920                         (rd_kafka_ResourceType_t)restype, resname);
921                 if (!c_objs[i]) {
922                         PyErr_Format(PyExc_ValueError,
923                                      "Invalid ConfigResource(%d,%s)",
924                                      restype, resname);
925                         free(resname);
926                         goto err;
927                 }
928                 free(resname);
929 
930                 /*
931                  * Translate and apply config entries in the various dicts.
932                  */
933                 if (!cfl_PyObject_GetAttr(res, "set_config_dict", &dict,
934                                           &PyDict_Type, 1)) {
935                         i++;
936                         goto err;
937                 }
938                 if (!Admin_config_dict_to_c(c_objs[i], dict, "set_config")) {
939                         Py_DECREF(dict);
940                         i++;
941                         goto err;
942                 }
943                 Py_DECREF(dict);
944         }
945 
946 
947         /* Use librdkafka's background thread queue to automatically dispatch
948          * Admin_background_event_cb() when the admin operation is finished. */
949         rkqu = rd_kafka_queue_get_background(self->rk);
950 
951         /*
952          * Call AlterConfigs
953          *
954          * We need to set up a CallState and release GIL here since
955          * the event_cb may be triggered immediately.
956          */
957         CallState_begin(self, &cs);
958         rd_kafka_AlterConfigs(self->rk, c_objs, cnt, c_options, rkqu);
959         CallState_end(self, &cs);
960 
961         rd_kafka_ConfigResource_destroy_array(c_objs, cnt);
962         rd_kafka_AdminOptions_destroy(c_options);
963         free(c_objs);
964         rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
965 
966         Py_DECREF(ConfigResource_type); /* from lookup() */
967 
968         Py_RETURN_NONE;
969 
970  err:
971         rd_kafka_ConfigResource_destroy_array(c_objs, i);
972         rd_kafka_AdminOptions_destroy(c_options);
973         free(c_objs);
974         Py_DECREF(ConfigResource_type); /* from lookup() */
975         Py_DECREF(future); /* from options_to_c() */
976 
977         return NULL;
978 }
979 
980 
981 
982 /**
983  * @brief Call rd_kafka_poll() and keep track of crashing callbacks.
984  * @returns -1 if callback crashed (or poll() failed), else the number
985  * of events served.
986  */
Admin_poll0(Handle * self,int tmout)987 static int Admin_poll0 (Handle *self, int tmout) {
988         int r;
989         CallState cs;
990 
991         CallState_begin(self, &cs);
992 
993         r = rd_kafka_poll(self->rk, tmout);
994 
995         if (!CallState_end(self, &cs)) {
996                 return -1;
997         }
998 
999         return r;
1000 }
1001 
1002 
/**
 * @brief poll(timeout): serve events through Admin_poll0().
 *
 * The required "timeout" argument is a float which is multiplied by 1000
 * and truncated to int before being handed to Admin_poll0().
 *
 * @returns the number of events served as a Python int, or NULL if
 *          argument parsing failed or Admin_poll0() returned -1.
 */
static PyObject *Admin_poll (Handle *self, PyObject *args,
                             PyObject *kwargs) {
        static char *kws[] = { "timeout", NULL };
        double timeout_s;
        int served;

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d", kws,
                                         &timeout_s))
                return NULL;

        served = Admin_poll0(self, (int)(timeout_s * 1000));
        if (served != -1)
                return cfl_PyInt_FromInt(served);

        return NULL;
}
1018 
1019 
1020 
1021 static PyMethodDef Admin_methods[] = {
1022         { "create_topics", (PyCFunction)Admin_create_topics,
1023           METH_VARARGS|METH_KEYWORDS,
1024           ".. py:function:: create_topics(topics, future, [validate_only, request_timeout, operation_timeout])\n"
1025           "\n"
1026           "  Create new topics.\n"
1027           "\n"
1028           "  This method should not be used directly, use confluent_kafka.AdminClient.create_topics()\n"
1029         },
1030 
1031         { "delete_topics", (PyCFunction)Admin_delete_topics,
1032           METH_VARARGS|METH_KEYWORDS,
1033           ".. py:function:: delete_topics(topics, future, [request_timeout, operation_timeout])\n"
1034           "\n"
1035           "  This method should not be used directly, use confluent_kafka.AdminClient.delete_topics()\n"
1036         },
1037 
1038         { "create_partitions", (PyCFunction)Admin_create_partitions,
1039           METH_VARARGS|METH_KEYWORDS,
1040           ".. py:function:: create_partitions(topics, future, [validate_only, request_timeout, operation_timeout])\n"
1041           "\n"
1042           "  This method should not be used directly, use confluent_kafka.AdminClient.create_partitions()\n"
1043         },
1044 
1045         { "describe_configs", (PyCFunction)Admin_describe_configs,
1046           METH_VARARGS|METH_KEYWORDS,
1047           ".. py:function:: describe_configs(resources, future, [request_timeout, broker])\n"
1048           "\n"
1049           "  This method should not be used directly, use confluent_kafka.AdminClient.describe_configs()\n"
1050         },
1051 
1052         { "alter_configs", (PyCFunction)Admin_alter_configs,
1053           METH_VARARGS|METH_KEYWORDS,
1054           ".. py:function:: alter_configs(resources, future, [request_timeout, broker])\n"
1055           "\n"
1056           "  This method should not be used directly, use confluent_kafka.AdminClient.alter_configs()\n"
1057         },
1058 
1059 
1060         { "poll", (PyCFunction)Admin_poll, METH_VARARGS|METH_KEYWORDS,
1061           ".. py:function:: poll([timeout])\n"
1062           "\n"
1063           "  Polls the Admin client for event callbacks, such as error_cb, "
1064           "stats_cb, etc, if registered.\n"
1065           "\n"
1066           "  There is no need to call poll() if no callbacks have been registered.\n"
1067           "\n"
1068           "  :param float timeout: Maximum time to block waiting for events. (Seconds)\n"
1069           "  :returns: Number of events processed (callbacks served)\n"
1070           "  :rtype: int\n"
1071           "\n"
1072         },
1073 
1074         { "list_topics", (PyCFunction)list_topics, METH_VARARGS|METH_KEYWORDS,
1075           list_topics_doc
1076         },
1077 
1078         { NULL }
1079 };
1080 
1081 
Admin__len__(Handle * self)1082 static Py_ssize_t Admin__len__ (Handle *self) {
1083         return rd_kafka_outq_len(self->rk);
1084 }
1085 
1086 
1087 static PySequenceMethods Admin_seq_methods = {
1088         (lenfunc)Admin__len__ /* sq_length */
1089 };
1090 
1091 
1092 /**
1093  * @brief Convert C topic_result_t array to topic-indexed dict.
1094  */
1095 static PyObject *
Admin_c_topic_result_to_py(const rd_kafka_topic_result_t ** c_result,size_t cnt)1096 Admin_c_topic_result_to_py (const rd_kafka_topic_result_t **c_result,
1097                             size_t cnt) {
1098         PyObject *result;
1099         size_t ti;
1100 
1101         result = PyDict_New();
1102 
1103         for (ti = 0 ; ti < cnt ; ti++) {
1104                 PyObject *error;
1105 
1106                 error = KafkaError_new_or_None(
1107                         rd_kafka_topic_result_error(c_result[ti]),
1108                         rd_kafka_topic_result_error_string(c_result[ti]));
1109 
1110                 PyDict_SetItemString(
1111                         result,
1112                         rd_kafka_topic_result_name(c_result[ti]),
1113                         error);
1114 
1115                 Py_DECREF(error);
1116         }
1117 
1118         return result;
1119 }
1120 
1121 
1122 
1123 /**
1124  * @brief Convert C ConfigEntry array to dict of py ConfigEntry objects.
1125  */
1126 static PyObject *
Admin_c_ConfigEntries_to_py(PyObject * ConfigEntry_type,const rd_kafka_ConfigEntry_t ** c_configs,size_t config_cnt)1127 Admin_c_ConfigEntries_to_py (PyObject *ConfigEntry_type,
1128                              const rd_kafka_ConfigEntry_t **c_configs,
1129                              size_t config_cnt) {
1130         PyObject *dict;
1131         size_t ci;
1132 
1133         dict = PyDict_New();
1134 
1135         for (ci = 0 ; ci < config_cnt ; ci++) {
1136                 PyObject *kwargs, *args;
1137                 const rd_kafka_ConfigEntry_t *ent = c_configs[ci];
1138                 const rd_kafka_ConfigEntry_t **c_synonyms;
1139                 PyObject *entry, *synonyms;
1140                 size_t synonym_cnt;
1141                 const char *val;
1142 
1143                 kwargs = PyDict_New();
1144 
1145                 cfl_PyDict_SetString(kwargs, "name",
1146                                      rd_kafka_ConfigEntry_name(ent));
1147                 val = rd_kafka_ConfigEntry_value(ent);
1148                 if (val)
1149                         cfl_PyDict_SetString(kwargs, "value", val);
1150                 else
1151                         PyDict_SetItemString(kwargs, "value", Py_None);
1152                 cfl_PyDict_SetInt(kwargs, "source",
1153                                   (int)rd_kafka_ConfigEntry_source(ent));
1154                 cfl_PyDict_SetInt(kwargs, "is_read_only",
1155                                   rd_kafka_ConfigEntry_is_read_only(ent));
1156                 cfl_PyDict_SetInt(kwargs, "is_default",
1157                                   rd_kafka_ConfigEntry_is_default(ent));
1158                 cfl_PyDict_SetInt(kwargs, "is_sensitive",
1159                                   rd_kafka_ConfigEntry_is_sensitive(ent));
1160                 cfl_PyDict_SetInt(kwargs, "is_synonym",
1161                                   rd_kafka_ConfigEntry_is_synonym(ent));
1162 
1163                 c_synonyms = rd_kafka_ConfigEntry_synonyms(ent,
1164                                                            &synonym_cnt);
1165                 synonyms = Admin_c_ConfigEntries_to_py(ConfigEntry_type,
1166                                                        c_synonyms,
1167                                                        synonym_cnt);
1168                 if (!synonyms) {
1169                         Py_DECREF(kwargs);
1170                         Py_DECREF(dict);
1171                         return NULL;
1172                 }
1173                 PyDict_SetItemString(kwargs, "synonyms", synonyms);
1174                 Py_DECREF(synonyms);
1175 
1176                 args = PyTuple_New(0);
1177                 entry = PyObject_Call(ConfigEntry_type, args, kwargs);
1178                 Py_DECREF(args);
1179                 Py_DECREF(kwargs);
1180                 if (!entry) {
1181                         Py_DECREF(dict);
1182                         return NULL;
1183                 }
1184 
1185                 PyDict_SetItemString(dict, rd_kafka_ConfigEntry_name(ent),
1186                                      entry);
1187                 Py_DECREF(entry);
1188         }
1189 
1190 
1191         return dict;
1192 }
1193 
1194 
1195 /**
1196  * @brief Convert C ConfigResource array to dict indexed by ConfigResource
1197  *        with the value of dict(ConfigEntry).
1198  *
1199  * @param ret_configs If true, return configs rather than None.
1200  */
1201 static PyObject *
Admin_c_ConfigResource_result_to_py(const rd_kafka_ConfigResource_t ** c_resources,size_t cnt,int ret_configs)1202 Admin_c_ConfigResource_result_to_py (const rd_kafka_ConfigResource_t **c_resources,
1203                                      size_t cnt,
1204                                      int ret_configs) {
1205         PyObject *result;
1206         PyObject *ConfigResource_type;
1207         PyObject *ConfigEntry_type;
1208         size_t ri;
1209 
1210         ConfigResource_type = cfl_PyObject_lookup("confluent_kafka.admin",
1211                                                   "ConfigResource");
1212         if (!ConfigResource_type)
1213                 return NULL;
1214 
1215         ConfigEntry_type = cfl_PyObject_lookup("confluent_kafka.admin",
1216                                                "ConfigEntry");
1217         if (!ConfigEntry_type) {
1218                 Py_DECREF(ConfigResource_type);
1219                 return NULL;
1220         }
1221 
1222         result = PyDict_New();
1223 
1224         for (ri = 0 ; ri < cnt ; ri++) {
1225                 const rd_kafka_ConfigResource_t *c_res = c_resources[ri];
1226                 const rd_kafka_ConfigEntry_t **c_configs;
1227                 PyObject *kwargs, *wrap;
1228                 PyObject *key;
1229                 PyObject *configs, *error;
1230                 size_t config_cnt;
1231 
1232                 c_configs = rd_kafka_ConfigResource_configs(c_res, &config_cnt);
1233                 configs = Admin_c_ConfigEntries_to_py(ConfigEntry_type,
1234                                                       c_configs, config_cnt);
1235                 if (!configs)
1236                         goto err;
1237 
1238                 error = KafkaError_new_or_None(
1239                         rd_kafka_ConfigResource_error(c_res),
1240                         rd_kafka_ConfigResource_error_string(c_res));
1241 
1242                 kwargs = PyDict_New();
1243                 cfl_PyDict_SetInt(kwargs, "restype",
1244                                   (int)rd_kafka_ConfigResource_type(c_res));
1245                 cfl_PyDict_SetString(kwargs, "name",
1246                                      rd_kafka_ConfigResource_name(c_res));
1247                 PyDict_SetItemString(kwargs, "described_configs", configs);
1248                 PyDict_SetItemString(kwargs, "error", error);
1249                 Py_DECREF(error);
1250 
1251                 /* Instantiate ConfigResource */
1252                 wrap = PyTuple_New(0);
1253                 key = PyObject_Call(ConfigResource_type, wrap, kwargs);
1254                 Py_DECREF(wrap);
1255                 Py_DECREF(kwargs);
1256                 if (!key) {
1257                         Py_DECREF(configs);
1258                         goto err;
1259                 }
1260 
1261                 /* Set result to dict[ConfigResource(..)] = configs | None
1262                  * depending on ret_configs */
1263                 if (ret_configs)
1264                         PyDict_SetItem(result, key, configs);
1265                 else
1266                         PyDict_SetItem(result, key, Py_None);
1267 
1268                 Py_DECREF(configs);
1269                 Py_DECREF(key);
1270         }
1271         return result;
1272 
1273  err:
1274         Py_DECREF(ConfigResource_type);
1275         Py_DECREF(ConfigEntry_type);
1276         Py_DECREF(result);
1277         return NULL;
1278 }
1279 
1280 
1281 /**
1282  * @brief Event callback triggered from librdkafka's background thread
1283  *        when Admin API results are ready.
1284  *
1285  *        The rkev opaque (not \p opaque) is the future PyObject
1286  *        which we'll set the result on.
1287  *
1288  * @locality background rdkafka thread
1289  */
Admin_background_event_cb(rd_kafka_t * rk,rd_kafka_event_t * rkev,void * opaque)1290 static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
1291                                        void *opaque) {
1292         PyObject *future = (PyObject *)rd_kafka_event_opaque(rkev);
1293         const rd_kafka_topic_result_t **c_topic_res;
1294         size_t c_topic_res_cnt;
1295         PyGILState_STATE gstate;
1296         PyObject *error, *method, *ret;
1297         PyObject *result = NULL;
1298         PyObject *exctype = NULL, *exc = NULL, *excargs = NULL;
1299 
1300         /* Acquire GIL */
1301         gstate = PyGILState_Ensure();
1302 
1303         /* Generic request-level error handling. */
1304         error = KafkaError_new_or_None(rd_kafka_event_error(rkev),
1305                                        rd_kafka_event_error_string(rkev));
1306         if (error != Py_None)
1307                 goto raise;
1308 
1309         switch (rd_kafka_event_type(rkev))
1310         {
1311         case RD_KAFKA_EVENT_CREATETOPICS_RESULT:
1312         {
1313                 const rd_kafka_CreateTopics_result_t *c_res;
1314 
1315                 c_res = rd_kafka_event_CreateTopics_result(rkev);
1316 
1317                 c_topic_res = rd_kafka_CreateTopics_result_topics(
1318                         c_res, &c_topic_res_cnt);
1319 
1320                 result = Admin_c_topic_result_to_py(c_topic_res,
1321                                                     c_topic_res_cnt);
1322                 break;
1323         }
1324 
1325         case RD_KAFKA_EVENT_DELETETOPICS_RESULT:
1326         {
1327                 const rd_kafka_DeleteTopics_result_t *c_res;
1328 
1329                 c_res = rd_kafka_event_DeleteTopics_result(rkev);
1330 
1331                 c_topic_res = rd_kafka_DeleteTopics_result_topics(
1332                         c_res, &c_topic_res_cnt);
1333 
1334                 result = Admin_c_topic_result_to_py(c_topic_res,
1335                                                     c_topic_res_cnt);
1336                 break;
1337         }
1338 
1339         case RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT:
1340         {
1341                 const rd_kafka_CreatePartitions_result_t *c_res;
1342 
1343                 c_res = rd_kafka_event_CreatePartitions_result(rkev);
1344 
1345                 c_topic_res = rd_kafka_CreatePartitions_result_topics(
1346                         c_res, &c_topic_res_cnt);
1347 
1348                 result = Admin_c_topic_result_to_py(c_topic_res,
1349                                                     c_topic_res_cnt);
1350                 break;
1351         }
1352 
1353         case RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT:
1354         {
1355                 const rd_kafka_ConfigResource_t **c_resources;
1356                 size_t resource_cnt;
1357 
1358                 c_resources = rd_kafka_DescribeConfigs_result_resources(
1359                         rd_kafka_event_DescribeConfigs_result(rkev),
1360                         &resource_cnt);
1361                 result = Admin_c_ConfigResource_result_to_py(
1362                         c_resources,
1363                         resource_cnt,
1364                         1/* return configs */);
1365                 break;
1366         }
1367 
1368         case RD_KAFKA_EVENT_ALTERCONFIGS_RESULT:
1369         {
1370                 const rd_kafka_ConfigResource_t **c_resources;
1371                 size_t resource_cnt;
1372 
1373                 c_resources = rd_kafka_AlterConfigs_result_resources(
1374                         rd_kafka_event_AlterConfigs_result(rkev),
1375                         &resource_cnt);
1376                 result = Admin_c_ConfigResource_result_to_py(
1377                         c_resources,
1378                         resource_cnt,
1379                         0/* return None instead of (the empty) configs */);
1380                 break;
1381         }
1382 
1383         default:
1384                 Py_DECREF(error); /* Py_None */
1385                 error = KafkaError_new0(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
1386                                         "Unsupported event type %s",
1387                                         rd_kafka_event_name(rkev));
1388                 goto raise;
1389         }
1390 
1391         if (!result) {
1392                 Py_DECREF(error); /* Py_None */
1393                 if (!PyErr_Occurred()) {
1394                         error = KafkaError_new0(RD_KAFKA_RESP_ERR__INVALID_ARG,
1395                                                 "BUG: Event %s handling failed "
1396                                                 "but no exception raised",
1397                                                 rd_kafka_event_name(rkev));
1398                 } else {
1399                         /* Extract the exception type and message
1400                          * and pass it as an error to raise and subsequently
1401                          * the future.
1402                          * We loose the backtrace here unfortunately, so
1403                          * these errors are a bit cryptic. */
1404                         PyObject *trace = NULL;
1405 
1406                         /* Fetch (and clear) currently raised exception */
1407                         PyErr_Fetch(&exctype, &error, &trace);
1408                         Py_XDECREF(trace);
1409                 }
1410 
1411                 goto raise;
1412         }
1413 
1414         /*
1415          * Call future.set_result()
1416          */
1417         method = cfl_PyUnistr(_FromString("set_result"));
1418 
1419         ret = PyObject_CallMethodObjArgs(future, method, result, NULL);
1420         Py_XDECREF(ret);
1421         Py_XDECREF(result);
1422         Py_DECREF(future);
1423         Py_DECREF(method);
1424 
1425         /* Release GIL */
1426         PyGILState_Release(gstate);
1427 
1428         rd_kafka_event_destroy(rkev);
1429 
1430         return;
1431 
1432  raise:
1433         /*
1434          * Pass an exception to future.set_exception().
1435          */
1436 
1437         if (!exctype) {
1438                 /* No previous exception raised, use KafkaException */
1439                 exctype = KafkaException;
1440                 Py_INCREF(exctype);
1441         }
1442 
1443         /* Create a new exception based on exception type and error. */
1444         excargs = PyTuple_New(1);
1445         Py_INCREF(error); /* tuple's reference */
1446         PyTuple_SET_ITEM(excargs, 0, error);
1447         exc = ((PyTypeObject *)exctype)->tp_new(
1448                 (PyTypeObject *)exctype, NULL, NULL);
1449         exc->ob_type->tp_init(exc, excargs, NULL);
1450         Py_DECREF(excargs);
1451         Py_XDECREF(exctype);
1452         Py_XDECREF(error); /* from error source above */
1453 
1454         /*
1455          * Call future.set_exception(exc)
1456          */
1457         method = cfl_PyUnistr(_FromString("set_exception"));
1458         ret = PyObject_CallMethodObjArgs(future, method, exc, NULL);
1459         Py_XDECREF(ret);
1460         Py_DECREF(exc);
1461         Py_DECREF(future);
1462         Py_DECREF(method);
1463 
1464         /* Release GIL */
1465         PyGILState_Release(gstate);
1466 
1467         rd_kafka_event_destroy(rkev);
1468 }
1469 
1470 
/**
 * @brief tp_init: configure and create the underlying rd_kafka_t handle.
 *
 * @returns 0 on success, or -1 with a Python exception raised if the
 *          object is already initialized, configuration setup fails, or
 *          the client instance cannot be created.
 */
static int Admin_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
        Handle *self = (Handle *)selfobj;
        rd_kafka_conf_t *conf;
        char errstr[256];

        if (self->rk) {
                PyErr_SetString(PyExc_RuntimeError,
                                "Admin already __init__:ialized");
                return -1;
        }

        self->type = PY_RD_KAFKA_ADMIN;

        conf = common_conf_setup(PY_RD_KAFKA_ADMIN, self, args, kwargs);
        if (!conf)
                return -1;

        rd_kafka_conf_set_background_event_cb(conf, Admin_background_event_cb);

        /* librdkafka has no dedicated ADMIN client type; the Admin API
         * works on both PRODUCER and CONSUMER instances.
         * PRODUCER is used as it is the more lightweight of the two. */
        self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                errstr, sizeof(errstr));
        if (self->rk) {
                /* Forward log messages to poll queue */
                if (self->logger)
                        rd_kafka_set_log_queue(self->rk, NULL);

                return 0;
        }

        /* Creation failed: raise, then destroy the configuration object. */
        cfl_PyErr_Format(rd_kafka_last_error(),
                         "Failed to create admin client: %s", errstr);
        rd_kafka_conf_destroy(conf);

        return -1;
}
1509 
1510 
/**
 * @brief tp_new: allocate an instance through the type's tp_alloc slot.
 *        \p args and \p kwargs are not used here.
 */
static PyObject *Admin_new (PyTypeObject *type, PyObject *args,
                            PyObject *kwargs) {
        PyObject *obj = type->tp_alloc(type, 0);

        return obj;
}
1515 
1516 
1517 
1518 PyTypeObject AdminType = {
1519         PyVarObject_HEAD_INIT(NULL, 0)
1520         "cimpl._AdminClientImpl",   /*tp_name*/
1521         sizeof(Handle),            /*tp_basicsize*/
1522         0,                         /*tp_itemsize*/
1523         (destructor)Admin_dealloc, /*tp_dealloc*/
1524         0,                         /*tp_print*/
1525         0,                         /*tp_getattr*/
1526         0,                         /*tp_setattr*/
1527         0,                         /*tp_compare*/
1528         0,                         /*tp_repr*/
1529         0,                         /*tp_as_number*/
1530         &Admin_seq_methods,        /*tp_as_sequence*/
1531         0,                         /*tp_as_mapping*/
1532         0,                         /*tp_hash */
1533         0,                         /*tp_call*/
1534         0,                         /*tp_str*/
1535         0,                         /*tp_getattro*/
1536         0,                         /*tp_setattro*/
1537         0,                         /*tp_as_buffer*/
1538         Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
1539         Py_TPFLAGS_HAVE_GC, /*tp_flags*/
1540         "Kafka Admin Client\n"
1541         "\n"
1542         ".. py:function:: Admin(**kwargs)\n"
1543         "\n"
1544         "  Create new AdminClient instance using provided configuration dict.\n"
1545         "\n"
1546         "This class should not be used directly, use confluent_kafka.AdminClient\n."
1547         "\n"
1548         ".. py:function:: len()\n"
1549         "\n"
1550         "  :returns: Number Kafka protocol requests waiting to be delivered to, or returned from, broker.\n"
1551         "  :rtype: int\n"
1552         "\n", /*tp_doc*/
1553         (traverseproc)Admin_traverse, /* tp_traverse */
1554         (inquiry)Admin_clear,      /* tp_clear */
1555         0,                         /* tp_richcompare */
1556         0,                         /* tp_weaklistoffset */
1557         0,                         /* tp_iter */
1558         0,                         /* tp_iternext */
1559         Admin_methods,             /* tp_methods */
1560         0,                         /* tp_members */
1561         0,                         /* tp_getset */
1562         0,                         /* tp_base */
1563         0,                         /* tp_dict */
1564         0,                         /* tp_descr_get */
1565         0,                         /* tp_descr_set */
1566         0,                         /* tp_dictoffset */
1567         Admin_init,                /* tp_init */
1568         0,                         /* tp_alloc */
1569         Admin_new                  /* tp_new */
1570 };
1571 
1572 
1573 
1574 
1575