/**
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "confluent_kafka.h"


/**
 * @name Cluster and topic metadata retrieval
 *
 */

25 
26 /**
27  * @returns a dict<partition_id, PartitionMetadata>,
28  *          or NULL (and exception) on error.
29  */
30 static PyObject *
c_partitions_to_py(Handle * self,const rd_kafka_metadata_partition_t * c_partitions,int partition_cnt)31 c_partitions_to_py (Handle *self,
32                     const rd_kafka_metadata_partition_t *c_partitions,
33                     int partition_cnt) {
34         PyObject *PartitionMetadata_type;
35         PyObject *dict;
36         int i;
37 
38         PartitionMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
39                                                      "PartitionMetadata");
40         if (!PartitionMetadata_type)
41                 return NULL;
42 
43         dict = PyDict_New();
44         if (!dict)
45                 goto err;
46 
47         for (i = 0 ; i < partition_cnt ; i++) {
48                 PyObject *partition, *key;
49                 PyObject *error, *replicas, *isrs;
50 
51                 partition = PyObject_CallObject(PartitionMetadata_type, NULL);
52                 if (!partition)
53                         goto err;
54 
55                 key = cfl_PyInt_FromInt(c_partitions[i].id);
56 
57                 if (PyDict_SetItem(dict, key, partition) == -1) {
58                         Py_DECREF(key);
59                         Py_DECREF(partition);
60                         goto err;
61                 }
62 
63                 Py_DECREF(key);
64                 Py_DECREF(partition);
65 
66                 if (cfl_PyObject_SetInt(partition, "id",
67                                         (int)c_partitions[i].id) == -1)
68                         goto err;
69                 if (cfl_PyObject_SetInt(partition, "leader",
70                                         (int)c_partitions[i].leader) == -1)
71                         goto err;
72 
73                 error = KafkaError_new_or_None(c_partitions[i].err, NULL);
74 
75                 if (PyObject_SetAttrString(partition, "error", error) == -1) {
76                         Py_DECREF(error);
77                         goto err;
78                 }
79 
80                 Py_DECREF(error);
81 
82                 /* replicas */
83                 replicas = cfl_int32_array_to_py_list(
84                         c_partitions[i].replicas,
85                         (size_t)c_partitions[i].replica_cnt);
86                 if (!replicas)
87                         goto err;
88 
89                 if (PyObject_SetAttrString(partition, "replicas",
90                                            replicas) == -1) {
91                         Py_DECREF(replicas);
92                         goto err;
93                 }
94                 Py_DECREF(replicas);
95 
96                 /* isrs */
97                 isrs = cfl_int32_array_to_py_list(
98                         c_partitions[i].isrs, (size_t)c_partitions[i].isr_cnt);
99                 if (!isrs)
100                         goto err;
101 
102                 if (PyObject_SetAttrString(partition, "isrs", isrs) == -1) {
103                         Py_DECREF(isrs);
104                         goto err;
105                 }
106                 Py_DECREF(isrs);
107         }
108 
109         Py_DECREF(PartitionMetadata_type);
110         return dict;
111 
112  err:
113         Py_DECREF(PartitionMetadata_type);
114         Py_XDECREF(dict);
115         return NULL;
116 }
117 
118 
119 /**
120  * @returns a dict<topic, TopicMetadata>, or NULL (and exception) on error.
121  */
122 static PyObject *
c_topics_to_py(Handle * self,const rd_kafka_metadata_topic_t * c_topics,int topic_cnt)123 c_topics_to_py (Handle *self, const rd_kafka_metadata_topic_t *c_topics,
124                 int topic_cnt) {
125         PyObject *TopicMetadata_type;
126         PyObject *dict;
127         int i;
128 
129         TopicMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
130                                                   "TopicMetadata");
131         if (!TopicMetadata_type)
132                 return NULL;
133 
134         dict = PyDict_New();
135         if (!dict)
136                 goto err;
137 
138         for (i = 0 ; i < topic_cnt ; i++) {
139                 PyObject *topic;
140                 PyObject *error, *partitions;
141 
142                 topic = PyObject_CallObject(TopicMetadata_type, NULL);
143                 if (!topic)
144                         goto err;
145 
146                 if (PyDict_SetItemString(dict, c_topics[i].topic,
147                                          topic) == -1) {
148                         Py_DECREF(topic);
149                         goto err;
150                 }
151 
152                 Py_DECREF(topic);
153 
154                 if (cfl_PyObject_SetString(topic, "topic",
155                                            c_topics[i].topic) == -1)
156                         goto err;
157 
158                 error = KafkaError_new_or_None(c_topics[i].err, NULL);
159 
160                 if (PyObject_SetAttrString(topic, "error", error) == -1) {
161                         Py_DECREF(error);
162                         goto err;
163                 }
164 
165                 Py_DECREF(error);
166 
167                 /* partitions dict */
168                 partitions = c_partitions_to_py(self,
169                                                 c_topics[i].partitions,
170                                                 c_topics[i].partition_cnt);
171                 if (!partitions)
172                         goto err;
173 
174                 if (PyObject_SetAttrString(topic, "partitions",
175                                            partitions) == -1) {
176                         Py_DECREF(partitions);
177                         goto err;
178                 }
179 
180                 Py_DECREF(partitions);
181         }
182 
183         Py_DECREF(TopicMetadata_type);
184         return dict;
185 
186  err:
187         Py_DECREF(TopicMetadata_type);
188         Py_XDECREF(dict);
189         return NULL;
190 }
191 
192 
193 /**
194  * @returns a dict<broker_id, BrokerMetadata>, or NULL (and exception) on error.
195  */
c_brokers_to_py(Handle * self,const rd_kafka_metadata_broker_t * c_brokers,int broker_cnt)196 static PyObject *c_brokers_to_py (Handle *self,
197                                   const rd_kafka_metadata_broker_t *c_brokers,
198                                   int broker_cnt) {
199         PyObject *BrokerMetadata_type;
200         PyObject *dict;
201         int i;
202 
203         BrokerMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
204                                                   "BrokerMetadata");
205         if (!BrokerMetadata_type)
206                 return NULL;
207 
208         dict = PyDict_New();
209         if (!dict)
210                 goto err;
211 
212         for (i = 0 ; i < broker_cnt ; i++) {
213                 PyObject *broker;
214                 PyObject *key;
215 
216                 broker = PyObject_CallObject(BrokerMetadata_type, NULL);
217                 if (!broker)
218                         goto err;
219 
220                 key = cfl_PyInt_FromInt(c_brokers[i].id);
221 
222                 if (PyDict_SetItem(dict, key, broker) == -1) {
223                         Py_DECREF(key);
224                         Py_DECREF(broker);
225                         goto err;
226                 }
227 
228                 Py_DECREF(broker);
229 
230                 if (PyObject_SetAttrString(broker, "id", key) == -1) {
231                         Py_DECREF(key);
232                         goto err;
233                 }
234                 Py_DECREF(key);
235 
236                 if (cfl_PyObject_SetString(broker, "host",
237                                            c_brokers[i].host) == -1)
238                         goto err;
239                 if (cfl_PyObject_SetInt(broker, "port",
240                                         (int)c_brokers[i].port) == -1)
241                         goto err;
242         }
243 
244         Py_DECREF(BrokerMetadata_type);
245         return dict;
246 
247  err:
248         Py_DECREF(BrokerMetadata_type);
249         Py_XDECREF(dict);
250         return NULL;
251 }
252 
253 
254 /**
255  * @returns a ClusterMetadata object populated with all metadata information
256  *          from \p metadata, or NULL on error in which case an exception
257  *          has been raised.
258  */
259 static PyObject *
c_metadata_to_py(Handle * self,const rd_kafka_metadata_t * metadata)260 c_metadata_to_py (Handle *self, const rd_kafka_metadata_t *metadata) {
261         PyObject *ClusterMetadata_type;
262         PyObject *cluster = NULL, *brokers, *topics;
263 #if RD_KAFKA_VERSION >= 0x000b0500
264         char *cluster_id;
265 #endif
266 
267         ClusterMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
268                                                    "ClusterMetadata");
269         if (!ClusterMetadata_type)
270                 return NULL;
271 
272         cluster = PyObject_CallObject(ClusterMetadata_type, NULL);
273         Py_DECREF(ClusterMetadata_type);
274 
275         if (!cluster)
276                 return NULL;
277 
278 #if RD_KAFKA_VERSION >= 0x000b0500
279         if (cfl_PyObject_SetInt(
280                     cluster, "controller_id",
281                     (int)rd_kafka_controllerid(self->rk, 0)) == -1)
282                 goto err;
283 
284         if ((cluster_id = rd_kafka_clusterid(self->rk, 0))) {
285                 if (cfl_PyObject_SetString(cluster, "cluster_id",
286                                            cluster_id) == -1) {
287                         free(cluster_id);
288                         goto err;
289                 }
290 
291                 free(cluster_id);
292         }
293 #endif
294 
295         if (cfl_PyObject_SetInt(cluster, "orig_broker_id",
296                                 (int)metadata->orig_broker_id) == -1)
297                 goto err;
298 
299         if (metadata->orig_broker_name &&
300             cfl_PyObject_SetString(cluster, "orig_broker_name",
301                                    metadata->orig_broker_name) == -1)
302                 goto err;
303 
304 
305 
306         /* Create and set 'brokers' dict */
307         brokers = c_brokers_to_py(self,
308                                   metadata->brokers,
309                                   metadata->broker_cnt);
310         if (!brokers)
311                 goto err;
312 
313         if (PyObject_SetAttrString(cluster, "brokers", brokers) == -1) {
314                 Py_DECREF(brokers);
315                 goto err;
316         }
317         Py_DECREF(brokers);
318 
319         /* Create and set 'topics' dict */
320         topics = c_topics_to_py(self, metadata->topics, metadata->topic_cnt);
321         if (!topics)
322                 goto err;
323 
324         if (PyObject_SetAttrString(cluster, "topics", topics) == -1) {
325                 Py_DECREF(topics);
326                 goto err;
327         }
328         Py_DECREF(topics);
329 
330         return cluster;
331 
332  err:
333         Py_XDECREF(cluster);
334         return NULL;
335 }
336 
337 
338 PyObject *
list_topics(Handle * self,PyObject * args,PyObject * kwargs)339 list_topics (Handle *self, PyObject *args, PyObject *kwargs) {
340         CallState cs;
341         PyObject *result = NULL;
342         rd_kafka_resp_err_t err;
343         const rd_kafka_metadata_t *metadata = NULL;
344         rd_kafka_topic_t *only_rkt = NULL;
345         const char *topic = NULL;
346         double timeout = -1.0f;
347         static char *kws[] = {"topic", "timeout", NULL};
348 
349         if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|zd", kws,
350                                          &topic, &timeout))
351                 return NULL;
352 
353         if (topic != NULL) {
354                 if (!(only_rkt = rd_kafka_topic_new(self->rk,
355                                                     topic, NULL))) {
356                         return PyErr_Format(
357                                 PyExc_RuntimeError,
358                                 "Unable to create topic object "
359                                 "for \"%s\": %s", topic,
360                                 rd_kafka_err2str(rd_kafka_last_error()));
361                 }
362         }
363 
364         CallState_begin(self, &cs);
365 
366         err = rd_kafka_metadata(self->rk, !only_rkt, only_rkt, &metadata,
367                                 timeout >= 0 ? (int)(timeout * 1000.0f) : -1);
368 
369         if (!CallState_end(self, &cs)) {
370                 /* Exception raised */
371                 goto end;
372         }
373 
374         if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
375                 cfl_PyErr_Format(err,
376                                  "Failed to get metadata: %s",
377                                  rd_kafka_err2str(err));
378 
379                 goto end;
380         }
381 
382         result = c_metadata_to_py(self, metadata);
383 
384  end:
385         if (metadata != NULL) {
386                 rd_kafka_metadata_destroy(metadata);
387         }
388 
389         if (only_rkt != NULL) {
390                 rd_kafka_topic_destroy(only_rkt);
391         }
392 
393         return result;
394 }
395 
396 const char list_topics_doc[] = PyDoc_STR(
397         ".. py:function:: list_topics([topic=None], [timeout=-1])\n"
398         "\n"
399         " Request Metadata from cluster.\n"
400         " This method provides the same information as "
401         " listTopics(), describeTopics() and describeCluster() in "
402         " the Java Admin client.\n"
403         "\n"
404         " :param str topic: If specified, only request info about this topic, else return for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified it will be created.\n"
405         " :param float timeout: Maximum response time before timing out, or -1 for infinite timeout.\n"
406         " :rtype: ClusterMetadata \n"
407         " :raises: KafkaException \n");
408