1 /**
2 * Copyright 2018 Confluent Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "confluent_kafka.h"
18
19
20 /**
21 * @name Cluster and topic metadata retrieval
22 *
23 */
24
25
26 /**
27 * @returns a dict<partition_id, PartitionMetadata>,
28 * or NULL (and exception) on error.
29 */
30 static PyObject *
c_partitions_to_py(Handle * self,const rd_kafka_metadata_partition_t * c_partitions,int partition_cnt)31 c_partitions_to_py (Handle *self,
32 const rd_kafka_metadata_partition_t *c_partitions,
33 int partition_cnt) {
34 PyObject *PartitionMetadata_type;
35 PyObject *dict;
36 int i;
37
38 PartitionMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
39 "PartitionMetadata");
40 if (!PartitionMetadata_type)
41 return NULL;
42
43 dict = PyDict_New();
44 if (!dict)
45 goto err;
46
47 for (i = 0 ; i < partition_cnt ; i++) {
48 PyObject *partition, *key;
49 PyObject *error, *replicas, *isrs;
50
51 partition = PyObject_CallObject(PartitionMetadata_type, NULL);
52 if (!partition)
53 goto err;
54
55 key = cfl_PyInt_FromInt(c_partitions[i].id);
56
57 if (PyDict_SetItem(dict, key, partition) == -1) {
58 Py_DECREF(key);
59 Py_DECREF(partition);
60 goto err;
61 }
62
63 Py_DECREF(key);
64 Py_DECREF(partition);
65
66 if (cfl_PyObject_SetInt(partition, "id",
67 (int)c_partitions[i].id) == -1)
68 goto err;
69 if (cfl_PyObject_SetInt(partition, "leader",
70 (int)c_partitions[i].leader) == -1)
71 goto err;
72
73 error = KafkaError_new_or_None(c_partitions[i].err, NULL);
74
75 if (PyObject_SetAttrString(partition, "error", error) == -1) {
76 Py_DECREF(error);
77 goto err;
78 }
79
80 Py_DECREF(error);
81
82 /* replicas */
83 replicas = cfl_int32_array_to_py_list(
84 c_partitions[i].replicas,
85 (size_t)c_partitions[i].replica_cnt);
86 if (!replicas)
87 goto err;
88
89 if (PyObject_SetAttrString(partition, "replicas",
90 replicas) == -1) {
91 Py_DECREF(replicas);
92 goto err;
93 }
94 Py_DECREF(replicas);
95
96 /* isrs */
97 isrs = cfl_int32_array_to_py_list(
98 c_partitions[i].isrs, (size_t)c_partitions[i].isr_cnt);
99 if (!isrs)
100 goto err;
101
102 if (PyObject_SetAttrString(partition, "isrs", isrs) == -1) {
103 Py_DECREF(isrs);
104 goto err;
105 }
106 Py_DECREF(isrs);
107 }
108
109 Py_DECREF(PartitionMetadata_type);
110 return dict;
111
112 err:
113 Py_DECREF(PartitionMetadata_type);
114 Py_XDECREF(dict);
115 return NULL;
116 }
117
118
119 /**
120 * @returns a dict<topic, TopicMetadata>, or NULL (and exception) on error.
121 */
122 static PyObject *
c_topics_to_py(Handle * self,const rd_kafka_metadata_topic_t * c_topics,int topic_cnt)123 c_topics_to_py (Handle *self, const rd_kafka_metadata_topic_t *c_topics,
124 int topic_cnt) {
125 PyObject *TopicMetadata_type;
126 PyObject *dict;
127 int i;
128
129 TopicMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
130 "TopicMetadata");
131 if (!TopicMetadata_type)
132 return NULL;
133
134 dict = PyDict_New();
135 if (!dict)
136 goto err;
137
138 for (i = 0 ; i < topic_cnt ; i++) {
139 PyObject *topic;
140 PyObject *error, *partitions;
141
142 topic = PyObject_CallObject(TopicMetadata_type, NULL);
143 if (!topic)
144 goto err;
145
146 if (PyDict_SetItemString(dict, c_topics[i].topic,
147 topic) == -1) {
148 Py_DECREF(topic);
149 goto err;
150 }
151
152 Py_DECREF(topic);
153
154 if (cfl_PyObject_SetString(topic, "topic",
155 c_topics[i].topic) == -1)
156 goto err;
157
158 error = KafkaError_new_or_None(c_topics[i].err, NULL);
159
160 if (PyObject_SetAttrString(topic, "error", error) == -1) {
161 Py_DECREF(error);
162 goto err;
163 }
164
165 Py_DECREF(error);
166
167 /* partitions dict */
168 partitions = c_partitions_to_py(self,
169 c_topics[i].partitions,
170 c_topics[i].partition_cnt);
171 if (!partitions)
172 goto err;
173
174 if (PyObject_SetAttrString(topic, "partitions",
175 partitions) == -1) {
176 Py_DECREF(partitions);
177 goto err;
178 }
179
180 Py_DECREF(partitions);
181 }
182
183 Py_DECREF(TopicMetadata_type);
184 return dict;
185
186 err:
187 Py_DECREF(TopicMetadata_type);
188 Py_XDECREF(dict);
189 return NULL;
190 }
191
192
193 /**
194 * @returns a dict<broker_id, BrokerMetadata>, or NULL (and exception) on error.
195 */
c_brokers_to_py(Handle * self,const rd_kafka_metadata_broker_t * c_brokers,int broker_cnt)196 static PyObject *c_brokers_to_py (Handle *self,
197 const rd_kafka_metadata_broker_t *c_brokers,
198 int broker_cnt) {
199 PyObject *BrokerMetadata_type;
200 PyObject *dict;
201 int i;
202
203 BrokerMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
204 "BrokerMetadata");
205 if (!BrokerMetadata_type)
206 return NULL;
207
208 dict = PyDict_New();
209 if (!dict)
210 goto err;
211
212 for (i = 0 ; i < broker_cnt ; i++) {
213 PyObject *broker;
214 PyObject *key;
215
216 broker = PyObject_CallObject(BrokerMetadata_type, NULL);
217 if (!broker)
218 goto err;
219
220 key = cfl_PyInt_FromInt(c_brokers[i].id);
221
222 if (PyDict_SetItem(dict, key, broker) == -1) {
223 Py_DECREF(key);
224 Py_DECREF(broker);
225 goto err;
226 }
227
228 Py_DECREF(broker);
229
230 if (PyObject_SetAttrString(broker, "id", key) == -1) {
231 Py_DECREF(key);
232 goto err;
233 }
234 Py_DECREF(key);
235
236 if (cfl_PyObject_SetString(broker, "host",
237 c_brokers[i].host) == -1)
238 goto err;
239 if (cfl_PyObject_SetInt(broker, "port",
240 (int)c_brokers[i].port) == -1)
241 goto err;
242 }
243
244 Py_DECREF(BrokerMetadata_type);
245 return dict;
246
247 err:
248 Py_DECREF(BrokerMetadata_type);
249 Py_XDECREF(dict);
250 return NULL;
251 }
252
253
254 /**
255 * @returns a ClusterMetadata object populated with all metadata information
256 * from \p metadata, or NULL on error in which case an exception
257 * has been raised.
258 */
259 static PyObject *
c_metadata_to_py(Handle * self,const rd_kafka_metadata_t * metadata)260 c_metadata_to_py (Handle *self, const rd_kafka_metadata_t *metadata) {
261 PyObject *ClusterMetadata_type;
262 PyObject *cluster = NULL, *brokers, *topics;
263 #if RD_KAFKA_VERSION >= 0x000b0500
264 char *cluster_id;
265 #endif
266
267 ClusterMetadata_type = cfl_PyObject_lookup("confluent_kafka.admin",
268 "ClusterMetadata");
269 if (!ClusterMetadata_type)
270 return NULL;
271
272 cluster = PyObject_CallObject(ClusterMetadata_type, NULL);
273 Py_DECREF(ClusterMetadata_type);
274
275 if (!cluster)
276 return NULL;
277
278 #if RD_KAFKA_VERSION >= 0x000b0500
279 if (cfl_PyObject_SetInt(
280 cluster, "controller_id",
281 (int)rd_kafka_controllerid(self->rk, 0)) == -1)
282 goto err;
283
284 if ((cluster_id = rd_kafka_clusterid(self->rk, 0))) {
285 if (cfl_PyObject_SetString(cluster, "cluster_id",
286 cluster_id) == -1) {
287 free(cluster_id);
288 goto err;
289 }
290
291 free(cluster_id);
292 }
293 #endif
294
295 if (cfl_PyObject_SetInt(cluster, "orig_broker_id",
296 (int)metadata->orig_broker_id) == -1)
297 goto err;
298
299 if (metadata->orig_broker_name &&
300 cfl_PyObject_SetString(cluster, "orig_broker_name",
301 metadata->orig_broker_name) == -1)
302 goto err;
303
304
305
306 /* Create and set 'brokers' dict */
307 brokers = c_brokers_to_py(self,
308 metadata->brokers,
309 metadata->broker_cnt);
310 if (!brokers)
311 goto err;
312
313 if (PyObject_SetAttrString(cluster, "brokers", brokers) == -1) {
314 Py_DECREF(brokers);
315 goto err;
316 }
317 Py_DECREF(brokers);
318
319 /* Create and set 'topics' dict */
320 topics = c_topics_to_py(self, metadata->topics, metadata->topic_cnt);
321 if (!topics)
322 goto err;
323
324 if (PyObject_SetAttrString(cluster, "topics", topics) == -1) {
325 Py_DECREF(topics);
326 goto err;
327 }
328 Py_DECREF(topics);
329
330 return cluster;
331
332 err:
333 Py_XDECREF(cluster);
334 return NULL;
335 }
336
337
338 PyObject *
list_topics(Handle * self,PyObject * args,PyObject * kwargs)339 list_topics (Handle *self, PyObject *args, PyObject *kwargs) {
340 CallState cs;
341 PyObject *result = NULL;
342 rd_kafka_resp_err_t err;
343 const rd_kafka_metadata_t *metadata = NULL;
344 rd_kafka_topic_t *only_rkt = NULL;
345 const char *topic = NULL;
346 double timeout = -1.0f;
347 static char *kws[] = {"topic", "timeout", NULL};
348
349 if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|zd", kws,
350 &topic, &timeout))
351 return NULL;
352
353 if (topic != NULL) {
354 if (!(only_rkt = rd_kafka_topic_new(self->rk,
355 topic, NULL))) {
356 return PyErr_Format(
357 PyExc_RuntimeError,
358 "Unable to create topic object "
359 "for \"%s\": %s", topic,
360 rd_kafka_err2str(rd_kafka_last_error()));
361 }
362 }
363
364 CallState_begin(self, &cs);
365
366 err = rd_kafka_metadata(self->rk, !only_rkt, only_rkt, &metadata,
367 timeout >= 0 ? (int)(timeout * 1000.0f) : -1);
368
369 if (!CallState_end(self, &cs)) {
370 /* Exception raised */
371 goto end;
372 }
373
374 if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
375 cfl_PyErr_Format(err,
376 "Failed to get metadata: %s",
377 rd_kafka_err2str(err));
378
379 goto end;
380 }
381
382 result = c_metadata_to_py(self, metadata);
383
384 end:
385 if (metadata != NULL) {
386 rd_kafka_metadata_destroy(metadata);
387 }
388
389 if (only_rkt != NULL) {
390 rd_kafka_topic_destroy(only_rkt);
391 }
392
393 return result;
394 }
395
396 const char list_topics_doc[] = PyDoc_STR(
397 ".. py:function:: list_topics([topic=None], [timeout=-1])\n"
398 "\n"
399 " Request Metadata from cluster.\n"
400 " This method provides the same information as "
401 " listTopics(), describeTopics() and describeCluster() in "
402 " the Java Admin client.\n"
403 "\n"
404 " :param str topic: If specified, only request info about this topic, else return for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified it will be created.\n"
405 " :param float timeout: Maximum response time before timing out, or -1 for infinite timeout.\n"
406 " :rtype: ClusterMetadata \n"
407 " :raises: KafkaException \n");
408