1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2018 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "rdkafka_int.h"
30 #include "rdkafka_admin.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_aux.h"
33 
34 #include <stdarg.h>
35 
36 
37 
/** @brief Descriptive strings for rko_u.admin_request.state.
 *         Indexed by the RD_KAFKA_ADMIN_STATE_.. enum value, so the order
 *         here must match the enum declaration order. */
static const char *rd_kafka_admin_state_desc[] = {
        "initializing",
        "waiting for broker",
        "waiting for controller",
        "waiting for fanouts",
        "constructing request",
        "waiting for response from broker",
};
47 
48 
49 
50 /**
51  * @brief Admin API implementation.
52  *
53  * The public Admin API in librdkafka exposes a completely asynchronous
54  * interface where the initial request API (e.g., ..CreateTopics())
55  * is non-blocking and returns immediately, and the application polls
56  * a ..queue_t for the result.
57  *
58  * The underlying handling of the request is also completely asynchronous
59  * inside librdkafka, for two reasons:
60  *  - everything is async in librdkafka so adding something new that isn't
61  *    would mean that existing functionality will need to be changed if
62  *    it should be able to work simultaneously (such as statistics, timers,
63  *    etc). There is no functional value to making the admin API
 *    synchronous internally, even if it would simplify its implementation.
65  *    So making it async allows the Admin API to be used with existing
66  *    client types in existing applications without breakage.
67  *  - the async approach allows multiple outstanding Admin API requests
68  *    simultaneously.
69  *
70  * The internal async implementation relies on the following concepts:
71  *  - it uses a single rko (rd_kafka_op_t) to maintain state.
72  *  - the rko has a callback attached - called the worker callback.
73  *  - the worker callback is a small state machine that triggers
74  *    async operations (be it controller lookups, timeout timers,
75  *    protocol transmits, etc).
76  *  - the worker callback is only called on the rdkafka main thread.
77  *  - the callback is triggered by different events and sources by enqueuing
78  *    the rko on the rdkafka main ops queue.
79  *
80  *
81  * Let's illustrate this with a DeleteTopics example. This might look
82  * daunting, but it boils down to an asynchronous state machine being
83  * triggered by enqueuing the rko op.
84  *
85  *  1. [app thread] The user constructs the input arguments,
86  *     including a response rkqu queue and then calls DeleteTopics().
87  *
88  *  2. [app thread] DeleteTopics() creates a new internal op (rko) of type
89  *     RD_KAFKA_OP_DELETETOPICS, makes a **copy** on the rko of all the
90  *     input arguments (which allows the caller to free the originals
91  *     whenever she likes). The rko op worker callback is set to the
92  *     generic admin worker callback rd_kafka_admin_worker()
93  *
94  *  3. [app thread] DeleteTopics() enqueues the rko on librdkafka's main ops
95  *     queue that is served by the rdkafka main thread in rd_kafka_thread_main()
96  *
97  *  4. [rdkafka main thread] The rko is dequeued by rd_kafka_q_serve and
98  *     the rd_kafka_poll_cb() is called.
99  *
100  *  5. [rdkafka main thread] The rko_type switch case identifies the rko
101  *     as an RD_KAFKA_OP_DELETETOPICS which is served by the op callback
102  *     set in step 2.
103  *
104  *  6. [rdkafka main thread] The worker callback is called.
105  *     After some initial checking of err==ERR__DESTROY events
106  *     (which is used to clean up outstanding ops (etc) on termination),
107  *     the code hits a state machine using rko_u.admin.request_state.
108  *
109  *  7. [rdkafka main thread] The initial state is RD_KAFKA_ADMIN_STATE_INIT
110  *     where the worker validates the user input.
111  *     An enqueue once (eonce) object is created - the use of this object
112  *     allows having multiple outstanding async functions referencing the
113  *     same underlying rko object, but only allowing the first one
114  *     to trigger an event.
115  *     A timeout timer is set up to trigger the eonce object when the
116  *     full options.request_timeout has elapsed.
117  *
118  *  8. [rdkafka main thread] After initialization the state is updated
119  *     to WAIT_BROKER or WAIT_CONTROLLER and the code falls through to
120  *     looking up a specific broker or the controller broker and waiting for
121  *     an active connection.
122  *     Both the lookup and the waiting for an active connection are
123  *     fully asynchronous, and the same eonce used for the timer is passed
124  *     to the rd_kafka_broker_controller_async() or broker_async() functions
125  *     which will trigger the eonce when a broker state change occurs.
126  *     If the controller is already known (from metadata) and the connection
127  *     is up a rkb broker object is returned and the eonce is not used,
128  *     skip to step 11.
129  *
130  *  9. [rdkafka main thread] Upon metadata retrieval (which is triggered
131  *     automatically by other parts of the code) the controller_id may be
132  *     updated in which case the eonce is triggered.
133  *     The eonce triggering enqueues the original rko on the rdkafka main
134  *     ops queue again and we go to step 8 which will check if the controller
135  *     connection is up.
136  *
137  * 10. [broker thread] If the controller_id is now known we wait for
138  *     the corresponding broker's connection to come up. This signaling
139  *     is performed from the broker thread upon broker state changes
140  *     and uses the same eonce. The eonce triggering enqueues the original
 *     rko on the rdkafka main ops queue again and we go back to step 8
142  *     to check if broker is now available.
143  *
144  * 11. [rdkafka main thread] Back in the worker callback we now have an
145  *     rkb broker pointer (with reference count increased) for the controller
146  *     with the connection up (it might go down while we're referencing it,
147  *     but that does not stop us from enqueuing a protocol request).
148  *
149  * 12. [rdkafka main thread] A DeleteTopics protocol request buffer is
150  *     constructed using the input parameters saved on the rko and the
151  *     buffer is enqueued on the broker's transmit queue.
152  *     The buffer is set up to provide the reply buffer on the rdkafka main
153  *     ops queue (the same queue we are operating from) with a handler
154  *     callback of rd_kafka_admin_handle_response().
155  *     The state is updated to the RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE.
156  *
157  * 13. [broker thread] If the request times out, a response with error code
158  *     (ERR__TIMED_OUT) is enqueued. Go to 16.
159  *
160  * 14. [broker thread] If a response is received, the response buffer
161  *     is enqueued. Go to 16.
162  *
163  * 15. [rdkafka main thread] The buffer callback (..handle_response())
164  *     is called, which attempts to extract the original rko from the eonce,
165  *     but if the eonce has already been triggered by some other source
166  *     (the timeout timer) the buffer callback simply returns and does nothing
167  *     since the admin request is over and a result (probably a timeout)
168  *     has been enqueued for the application.
169  *     If the rko was still intact we temporarily set the reply buffer
170  *     in the rko struct and call the worker callback. Go to 17.
171  *
172  * 16. [rdkafka main thread] The worker callback is called in state
173  *     RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE without a response but with an error.
174  *     An error result op is created and enqueued on the application's
175  *     provided response rkqu queue.
176  *
177  * 17. [rdkafka main thread] The worker callback is called in state
178  *     RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE with a response buffer with no
179  *     error set.
180  *     The worker calls the response `parse()` callback to parse the response
181  *     buffer and populates a result op (rko_result) with the response
182  *     information (such as per-topic error codes, etc).
183  *     The result op is returned to the worker.
184  *
185  * 18. [rdkafka main thread] The worker enqueues the result op (rko_result)
186  *     on the application's provided response rkqu queue.
187  *
188  * 19. [app thread] The application calls rd_kafka_queue_poll() to
189  *     receive the result of the operation. The result may have been
 *     enqueued in step 18 thanks to successful completion, or in any
191  *     of the earlier stages when an error was encountered.
192  *
193  * 20. [app thread] The application uses rd_kafka_event_DeleteTopics_result()
194  *     to retrieve the request-specific result type.
195  *
196  * 21. Done.
197  *
198  *
199  *
200  *
201  * Fanout (RD_KAFKA_OP_ADMIN_FANOUT) requests
202  * ------------------------------------------
203  *
204  * Certain Admin APIs may have requests that need to be sent to different
205  * brokers, for instance DeleteRecords which needs to be sent to the leader
206  * for each given partition.
207  *
208  * To achieve this we create a Fanout (RD_KAFKA_OP_ADMIN_FANOUT) op for the
209  * overall Admin API call (e.g., DeleteRecords), and then sub-ops for each
210  * of the per-broker requests. These sub-ops have the proper op type for
211  * the operation they are performing (e.g., RD_KAFKA_OP_DELETERECORDS)
212  * but their replyq does not point back to the application replyq but
213  * rk_ops which is handled by the librdkafka main thread and with the op
214  * callback set to rd_kafka_admin_fanout_worker(). This worker aggregates
215  * the results of each fanned out sub-op and merges the result into a
216  * single result op (RD_KAFKA_OP_ADMIN_RESULT) that is enqueued on the
217  * application's replyq.
218  *
219  * We rely on the timeouts on the fanned out sub-ops rather than the parent
220  * fanout op.
221  *
222  * The parent fanout op must not be destroyed until all fanned out sub-ops
223  * are done (either by success, failure or timeout) and destroyed, and this
224  * is tracked by the rko_u.admin_request.fanout.outstanding counter.
225  *
226  */
227 
228 
/**
 * @enum Admin request target broker. Must be negative values since the field
 *       used is broker_id (real broker ids are non-negative).
 */
enum {
        RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1,  /**< Cluster controller */
        RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */
        RD_KAFKA_ADMIN_TARGET_FANOUT = -3,      /**< This rko is a fanout
                                                 *   and has no target broker */
};
239 
/**
 * @brief Admin op callback types
 */

/**
 * @brief Protocol request constructor/sender callback.
 *
 * Called (e.g., from rd_kafka_admin_coord_request()) with the request's
 * argument list, options, an error string buffer, and the reply queue plus
 * response callback and opaque to attach to the outgoing protocol request.
 *
 * @returns an error code on failure to construct/enqueue the request, in
 *          which case \p errstr is expected to be populated.
 */
typedef rd_kafka_resp_err_t (rd_kafka_admin_Request_cb_t) (
        rd_kafka_broker_t *rkb,
        const rd_list_t *configs /*(ConfigResource_t*)*/,
        rd_kafka_AdminOptions_t *options,
        char *errstr, size_t errstr_size,
        rd_kafka_replyq_t replyq,
        rd_kafka_resp_cb_t *resp_cb,
        void *opaque)
        RD_WARN_UNUSED_RESULT;

/**
 * @brief Protocol response parser callback.
 *
 * Parses the protocol response in \p reply for request op \p rko_req and
 * returns the populated result op in \p *rko_resultp.
 *
 * @returns an error code on parse failure, in which case \p errstr is
 *          expected to be populated.
 */
typedef rd_kafka_resp_err_t (rd_kafka_admin_Response_parse_cb_t) (
        rd_kafka_op_t *rko_req,
        rd_kafka_op_t **rko_resultp,
        rd_kafka_buf_t *reply,
        char *errstr, size_t errstr_size)
        RD_WARN_UNUSED_RESULT;

/**
 * @brief Fanout partial-response merge callback: folds the result of one
 *        fanned-out sub-request (\p rko_partial) into the parent fanout
 *        request \p rko_req.
 */
typedef void (rd_kafka_admin_fanout_PartialResponse_cb_t) (
        rd_kafka_op_t *rko_req,
        const rd_kafka_op_t *rko_partial);

/** @brief rd_list copy callback used to copy accumulated fanout results. */
typedef rd_list_copy_cb_t rd_kafka_admin_fanout_CopyResult_cb_t;
265 
/**
 * @struct Request-specific worker callbacks.
 */
struct rd_kafka_admin_worker_cbs {
        /** Protocol request callback which is called
         *  to construct and send the request. */
        rd_kafka_admin_Request_cb_t *request;

        /** Protocol response parser callback which is called
         *  to translate the response to a rko_result op. */
        rd_kafka_admin_Response_parse_cb_t *parse;
};
278 
/**
 * @struct Fanout request callbacks.
 */
struct rd_kafka_admin_fanout_worker_cbs {
        /** Merge results from a fanned out request into the user response. */
        rd_kafka_admin_fanout_PartialResponse_cb_t *partial_response;

        /** Copy an accumulated result for storing into the rko_result. */
        rd_kafka_admin_fanout_CopyResult_cb_t *copy_result;
};
289 
/* Forward declarations (definitions below and in later parts of the file) */
static void rd_kafka_admin_common_worker_destroy (rd_kafka_t *rk,
                                                  rd_kafka_op_t *rko,
                                                  rd_bool_t do_destroy);
static void rd_kafka_AdminOptions_init (rd_kafka_t *rk,
                                        rd_kafka_AdminOptions_t *options);
static rd_kafka_op_res_t
rd_kafka_admin_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko);
static rd_kafka_ConfigEntry_t *
rd_kafka_ConfigEntry_copy (const rd_kafka_ConfigEntry_t *src);
static void rd_kafka_ConfigEntry_free (void *ptr);
static void *rd_kafka_ConfigEntry_list_copy (const void *src, void *opaque);

/* Buffer response handler for admin protocol requests. */
static void rd_kafka_admin_handle_response (rd_kafka_t *rk,
                                            rd_kafka_broker_t *rkb,
                                            rd_kafka_resp_err_t err,
                                            rd_kafka_buf_t *reply,
                                            rd_kafka_buf_t *request,
                                            void *opaque);

/* Worker that aggregates fanned-out sub-op results. */
static rd_kafka_op_res_t
rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq,
                              rd_kafka_op_t *rko_fanout);
313 
314 
315 /**
316  * @name Common admin request code
317  * @{
318  *
319  *
320  */
321 
322 /**
323  * @brief Create a new admin_result op based on the request op \p rko_req
324  */
325 static rd_kafka_op_t *rd_kafka_admin_result_new (rd_kafka_op_t *rko_req) {
326         rd_kafka_op_t *rko_result;
327         rd_kafka_op_t *rko_fanout;
328 
329         if ((rko_fanout = rko_req->rko_u.admin_request.fanout_parent)) {
330                 /* If this is a fanned out request the rko_result needs to be
331                  * handled by the fanout worker rather than the application. */
332                 rko_result = rd_kafka_op_new_cb(
333                         rko_req->rko_rk,
334                         RD_KAFKA_OP_ADMIN_RESULT,
335                         rd_kafka_admin_fanout_worker);
336                 /* Transfer fanout pointer to result */
337                 rko_result->rko_u.admin_result.fanout_parent = rko_fanout;
338                 rko_req->rko_u.admin_request.fanout_parent = NULL;
339                 /* Set event type based on original fanout ops reqtype,
340                  * e.g., ..OP_DELETERECORDS */
341                 rko_result->rko_u.admin_result.reqtype =
342                         rko_fanout->rko_u.admin_request.fanout.reqtype;
343 
344         } else {
345                 rko_result = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_RESULT);
346 
347                 /* If this is fanout request (i.e., the parent OP_ADMIN_FANOUT
348                  * to fanned out requests) we need to use the original
349                  * application request type. */
350                 if (rko_req->rko_type == RD_KAFKA_OP_ADMIN_FANOUT)
351                         rko_result->rko_u.admin_result.reqtype =
352                                 rko_req->rko_u.admin_request.fanout.reqtype;
353                 else
354                         rko_result->rko_u.admin_result.reqtype =
355                                 rko_req->rko_type;
356         }
357 
358         rko_result->rko_rk = rko_req->rko_rk;
359 
360         rko_result->rko_u.admin_result.opaque =
361                 rd_kafka_confval_get_ptr(&rko_req->rko_u.admin_request.
362                                          options.opaque);
363 
364         rko_result->rko_evtype = rko_req->rko_u.admin_request.reply_event_type;
365 
366         return rko_result;
367 }
368 
369 
370 /**
371  * @brief Set error code and error string on admin_result op \p rko.
372  */
373 static void rd_kafka_admin_result_set_err0 (rd_kafka_op_t *rko,
374                                             rd_kafka_resp_err_t err,
375                                             const char *fmt, va_list ap) {
376         char buf[512];
377 
378         rd_vsnprintf(buf, sizeof(buf), fmt, ap);
379 
380         rko->rko_err = err;
381 
382         if (rko->rko_u.admin_result.errstr)
383                 rd_free(rko->rko_u.admin_result.errstr);
384         rko->rko_u.admin_result.errstr = rd_strdup(buf);
385 
386         rd_kafka_dbg(rko->rko_rk, ADMIN, "ADMINFAIL",
387                      "Admin %s result error: %s",
388                      rd_kafka_op2str(rko->rko_u.admin_result.reqtype),
389                      rko->rko_u.admin_result.errstr);
390 }
391 
392 /**
393  * @sa rd_kafka_admin_result_set_err0
394  */
395 static RD_UNUSED RD_FORMAT(printf, 3, 4)
396         void rd_kafka_admin_result_set_err (rd_kafka_op_t *rko,
397                                             rd_kafka_resp_err_t err,
398                                             const char *fmt, ...) {
399         va_list ap;
400 
401         va_start(ap, fmt);
402         rd_kafka_admin_result_set_err0(rko, err, fmt, ap);
403         va_end(ap);
404 }
405 
406 /**
407  * @brief Enqueue admin_result on application's queue.
408  */
409 static RD_INLINE
410 void rd_kafka_admin_result_enq (rd_kafka_op_t *rko_req,
411                                 rd_kafka_op_t *rko_result) {
412         rd_kafka_replyq_enq(&rko_req->rko_u.admin_request.replyq,
413                             rko_result,
414                             rko_req->rko_u.admin_request.replyq.version);
415 }
416 
417 /**
418  * @brief Set request-level error code and string in reply op.
419  *
420  * @remark This function will NOT destroy the \p rko_req, so don't forget to
421  *         call rd_kafka_admin_common_worker_destroy() when done with the rko.
422  */
423 static RD_FORMAT(printf, 3, 4)
424         void rd_kafka_admin_result_fail (rd_kafka_op_t *rko_req,
425                                          rd_kafka_resp_err_t err,
426                                          const char *fmt, ...) {
427         va_list ap;
428         rd_kafka_op_t *rko_result;
429 
430         if (!rko_req->rko_u.admin_request.replyq.q)
431                 return;
432 
433         rko_result = rd_kafka_admin_result_new(rko_req);
434 
435         va_start(ap, fmt);
436         rd_kafka_admin_result_set_err0(rko_result, err, fmt, ap);
437         va_end(ap);
438 
439         rd_kafka_admin_result_enq(rko_req, rko_result);
440 }
441 
442 
443 /**
444  * @brief Send the admin request contained in \p rko upon receiving
445  *        a FindCoordinator response.
446  *
447  * @param opaque Must be an admin request op's eonce (rko_u.admin_request.eonce)
448  *               (i.e. created by \c rd_kafka_admin_request_op_new )
449  *
450  * @remark To be used as a callback for \c rd_kafka_coord_req
451  */
452 static rd_kafka_resp_err_t
453 rd_kafka_admin_coord_request (rd_kafka_broker_t *rkb,
454                               rd_kafka_op_t *rko_ignore,
455                               rd_kafka_replyq_t replyq,
456                               rd_kafka_resp_cb_t *resp_cb,
457                               void *opaque) {
458         rd_kafka_t *rk = rkb->rkb_rk;
459         rd_kafka_enq_once_t *eonce = opaque;
460         rd_kafka_op_t *rko;
461         char errstr[512];
462         rd_kafka_resp_err_t err;
463 
464 
465         rko = rd_kafka_enq_once_del_source_return(eonce, "coordinator request");
466         if (!rko)
467                 /* Admin request has timed out and been destroyed */
468                 return RD_KAFKA_RESP_ERR__DESTROY;
469 
470         rd_kafka_enq_once_add_source(eonce, "coordinator response");
471 
472         err = rko->rko_u.admin_request.cbs->request(
473                 rkb,
474                 &rko->rko_u.admin_request.args,
475                 &rko->rko_u.admin_request.options,
476                 errstr, sizeof(errstr),
477                 replyq,
478                 rd_kafka_admin_handle_response,
479                 eonce);
480         if (err) {
481                 rd_kafka_enq_once_del_source(eonce, "coordinator response");
482                 rd_kafka_admin_result_fail(
483                         rko, err,
484                         "%s worker failed to send request: %s",
485                         rd_kafka_op2str(rko->rko_type), errstr);
486                 rd_kafka_admin_common_worker_destroy(rk, rko,
487                                                      rd_true/*destroy*/);
488         }
489         return err;
490 }
491 
492 
493 /**
494  * @brief Return the topics list from a topic-related result object.
495  */
496 static const rd_kafka_topic_result_t **
497 rd_kafka_admin_result_ret_topics (const rd_kafka_op_t *rko,
498                                   size_t *cntp) {
499         rd_kafka_op_type_t reqtype =
500                 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
501         rd_assert(reqtype == RD_KAFKA_OP_CREATETOPICS ||
502                   reqtype == RD_KAFKA_OP_DELETETOPICS ||
503                   reqtype == RD_KAFKA_OP_CREATEPARTITIONS);
504 
505         *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
506         return (const rd_kafka_topic_result_t **)rko->rko_u.admin_result.
507                 results.rl_elems;
508 }
509 
510 /**
511  * @brief Return the ConfigResource list from a config-related result object.
512  */
513 static const rd_kafka_ConfigResource_t **
514 rd_kafka_admin_result_ret_resources (const rd_kafka_op_t *rko,
515                                      size_t *cntp) {
516         rd_kafka_op_type_t reqtype =
517                 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
518         rd_assert(reqtype == RD_KAFKA_OP_ALTERCONFIGS ||
519                   reqtype == RD_KAFKA_OP_DESCRIBECONFIGS);
520 
521         *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
522         return (const rd_kafka_ConfigResource_t **)rko->rko_u.admin_result.
523                 results.rl_elems;
524 }
525 
526 
527 /**
528  * @brief Return the groups list from a group-related result object.
529  */
530 static const rd_kafka_group_result_t **
531 rd_kafka_admin_result_ret_groups (const rd_kafka_op_t *rko,
532                                   size_t *cntp) {
533         rd_kafka_op_type_t reqtype =
534                 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
535         rd_assert(reqtype == RD_KAFKA_OP_DELETEGROUPS ||
536                   reqtype == RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS);
537 
538         *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
539         return (const rd_kafka_group_result_t **)rko->rko_u.admin_result.
540                 results.rl_elems;
541 }
542 
543 /**
544  * @brief Create a new admin_request op of type \p optype and sets up the
545  *        generic (type independent files).
546  *
547  *        The caller shall then populate the admin_request.args list
548  *        and enqueue the op on rk_ops for further processing work.
549  *
550  * @param cbs Callbacks, must reside in .data segment.
551  * @param options Optional options, may be NULL to use defaults.
552  *
553  * @locks none
554  * @locality application thread
555  */
556 static rd_kafka_op_t *
557 rd_kafka_admin_request_op_new (rd_kafka_t *rk,
558                                rd_kafka_op_type_t optype,
559                                rd_kafka_event_type_t reply_event_type,
560                                const struct rd_kafka_admin_worker_cbs *cbs,
561                                const rd_kafka_AdminOptions_t *options,
562                                rd_kafka_q_t *rkq) {
563         rd_kafka_op_t *rko;
564 
565         rd_assert(rk);
566         rd_assert(rkq);
567         rd_assert(cbs);
568 
569         rko = rd_kafka_op_new_cb(rk, optype, rd_kafka_admin_worker);
570 
571         rko->rko_u.admin_request.reply_event_type = reply_event_type;
572 
573         rko->rko_u.admin_request.cbs = (struct rd_kafka_admin_worker_cbs *)cbs;
574 
575         /* Make a copy of the options */
576         if (options)
577                 rko->rko_u.admin_request.options = *options;
578         else
579                 rd_kafka_AdminOptions_init(rk,
580                                            &rko->rko_u.admin_request.options);
581 
582         /* Default to controller */
583         rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER;
584 
585         /* Calculate absolute timeout */
586         rko->rko_u.admin_request.abs_timeout =
587                 rd_timeout_init(
588                         rd_kafka_confval_get_int(&rko->rko_u.admin_request.
589                                                  options.request_timeout));
590 
591         /* Setup enq-op-once, which is triggered by either timer code
592          * or future wait-controller code. */
593         rko->rko_u.admin_request.eonce =
594                 rd_kafka_enq_once_new(rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
595 
596         /* The timer itself must be started from the rdkafka main thread,
597          * not here. */
598 
599         /* Set up replyq */
600         rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0);
601 
602         rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_INIT;
603         return rko;
604 }
605 
606 
607 /**
608  * @returns the remaining request timeout in milliseconds.
609  */
610 static RD_INLINE int rd_kafka_admin_timeout_remains (rd_kafka_op_t *rko) {
611         return rd_timeout_remains(rko->rko_u.admin_request.abs_timeout);
612 }
613 
614 /**
615  * @returns the remaining request timeout in microseconds.
616  */
617 static RD_INLINE rd_ts_t
618 rd_kafka_admin_timeout_remains_us (rd_kafka_op_t *rko) {
619         return rd_timeout_remains_us(rko->rko_u.admin_request.abs_timeout);
620 }
621 
622 
623 /**
624  * @brief Timer timeout callback for the admin rko's eonce object.
625  */
626 static void rd_kafka_admin_eonce_timeout_cb (rd_kafka_timers_t *rkts,
627                                              void *arg) {
628         rd_kafka_enq_once_t *eonce = arg;
629 
630         rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT,
631                                   "timeout timer");
632 }
633 
634 
635 
/**
 * @brief Common worker destroy to be called in destroy: label
 *        in worker.
 *
 * Stops the request's timeout timer, tears down the eonce (dropping the
 * timer's eonce source first if the timer had not yet fired), and
 * optionally destroys the rko itself.
 *
 * @param do_destroy If true the \p rko is destroyed after cleanup.
 */
static void rd_kafka_admin_common_worker_destroy (rd_kafka_t *rk,
                                                  rd_kafka_op_t *rko,
                                                  rd_bool_t do_destroy) {
        int timer_was_stopped;

        /* Free resources for this op. */
        timer_was_stopped =
                rd_kafka_timer_stop(&rk->rk_timers,
                                    &rko->rko_u.admin_request.tmr, rd_true);


        if (rko->rko_u.admin_request.eonce) {
                /* Remove the stopped timer's eonce reference since its
                 * callback will not have fired if we stopped the timer. */
                if (timer_was_stopped)
                        rd_kafka_enq_once_del_source(rko->rko_u.admin_request.
                                                     eonce, "timeout timer");

                /* This is thread-safe to do even if there are outstanding
                 * timers or wait-controller references to the eonce
                 * since they only hold direct reference to the eonce,
                 * not the rko (the eonce holds a reference to the rko but
                 * it is cleared here). */
                rd_kafka_enq_once_destroy(rko->rko_u.admin_request.eonce);
                rko->rko_u.admin_request.eonce = NULL;
        }

        if (do_destroy)
                rd_kafka_op_destroy(rko);
}
670 
671 
672 
673 /**
674  * @brief Asynchronously look up a broker.
675  *        To be called repeatedly from each invocation of the worker
676  *        when in state RD_KAFKA_ADMIN_STATE_WAIT_BROKER until
677  *        a valid rkb is returned.
678  *
679  * @returns the broker rkb with refcount increased, or NULL if not yet
680  *          available.
681  */
682 static rd_kafka_broker_t *
683 rd_kafka_admin_common_get_broker (rd_kafka_t *rk,
684                                   rd_kafka_op_t *rko,
685                                   int32_t broker_id) {
686         rd_kafka_broker_t *rkb;
687 
688         rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up broker %"PRId32,
689                      rd_kafka_op2str(rko->rko_type), broker_id);
690 
691         /* Since we're iterating over this broker_async() call
692          * (asynchronously) until a broker is availabe (or timeout)
693          * we need to re-enable the eonce to be triggered again (which
694          * is not necessary the first time we get here, but there
695          * is no harm doing it then either). */
696         rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce,
697                                    rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
698 
699         /* Look up the broker asynchronously, if the broker
700          * is not available the eonce is registered for broker
701          * state changes which will cause our function to be called
702          * again as soon as (any) broker state changes.
703          * When we are called again we perform the broker lookup
704          * again and hopefully get an rkb back, otherwise defer a new
705          * async wait. Repeat until success or timeout. */
706         if (!(rkb = rd_kafka_broker_get_async(
707                       rk, broker_id, RD_KAFKA_BROKER_STATE_UP,
708                       rko->rko_u.admin_request.eonce))) {
709                 /* Broker not available, wait asynchronously
710                  * for broker metadata code to trigger eonce. */
711                 return NULL;
712         }
713 
714         rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: broker %"PRId32" is %s",
715                      rd_kafka_op2str(rko->rko_type), broker_id, rkb->rkb_name);
716 
717         return rkb;
718 }
719 
720 
721 /**
722  * @brief Asynchronously look up the controller.
723  *        To be called repeatedly from each invocation of the worker
724  *        when in state RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER until
725  *        a valid rkb is returned.
726  *
727  * @returns the controller rkb with refcount increased, or NULL if not yet
728  *          available.
729  */
730 static rd_kafka_broker_t *
731 rd_kafka_admin_common_get_controller (rd_kafka_t *rk,
732                                       rd_kafka_op_t *rko) {
733         rd_kafka_broker_t *rkb;
734 
735         rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up controller",
736                      rd_kafka_op2str(rko->rko_type));
737 
738         /* Since we're iterating over this controller_async() call
739          * (asynchronously) until a controller is availabe (or timeout)
740          * we need to re-enable the eonce to be triggered again (which
741          * is not necessary the first time we get here, but there
742          * is no harm doing it then either). */
743         rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce,
744                                    rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
745 
746         /* Look up the controller asynchronously, if the controller
747          * is not available the eonce is registered for broker
748          * state changes which will cause our function to be called
749          * again as soon as (any) broker state changes.
750          * When we are called again we perform the controller lookup
751          * again and hopefully get an rkb back, otherwise defer a new
752          * async wait. Repeat until success or timeout. */
753         if (!(rkb = rd_kafka_broker_controller_async(
754                       rk, RD_KAFKA_BROKER_STATE_UP,
755                       rko->rko_u.admin_request.eonce))) {
756                 /* Controller not available, wait asynchronously
757                  * for controller code to trigger eonce. */
758                 return NULL;
759         }
760 
761         rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: controller %s",
762                      rd_kafka_op2str(rko->rko_type), rkb->rkb_name);
763 
764         return rkb;
765 }
766 
767 
768 
769 /**
770  * @brief Handle response from broker by triggering worker callback.
771  *
772  * @param opaque is the eonce from the worker protocol request call.
773  */
774 static void rd_kafka_admin_handle_response (rd_kafka_t *rk,
775                                             rd_kafka_broker_t *rkb,
776                                             rd_kafka_resp_err_t err,
777                                             rd_kafka_buf_t *reply,
778                                             rd_kafka_buf_t *request,
779                                             void *opaque) {
780         rd_kafka_enq_once_t *eonce = opaque;
781         rd_kafka_op_t *rko;
782 
783         /* From ...add_source("send") */
784         rko = rd_kafka_enq_once_disable(eonce);
785 
786         if (!rko) {
787                 /* The operation timed out and the worker was
788                  * dismantled while we were waiting for broker response,
789                  * do nothing - everything has been cleaned up. */
790                 rd_kafka_dbg(rk, ADMIN, "ADMIN",
791                              "Dropping outdated %sResponse with return code %s",
792                              request ?
793                              rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey):
794                              "???",
795                              rd_kafka_err2str(err));
796                 return;
797         }
798 
799         /* Attach reply buffer to rko for parsing in the worker. */
800         rd_assert(!rko->rko_u.admin_request.reply_buf);
801         rko->rko_u.admin_request.reply_buf = reply;
802         rko->rko_err = err;
803 
804         if (rko->rko_op_cb(rk, NULL, rko) == RD_KAFKA_OP_RES_HANDLED)
805                 rd_kafka_op_destroy(rko);
806 
807 }
808 
809 /**
810  * @brief Generic handler for protocol responses, calls the admin ops'
811  *        Response_parse_cb and enqueues the result to the caller's queue.
812  */
813 static void rd_kafka_admin_response_parse (rd_kafka_op_t *rko) {
814         rd_kafka_resp_err_t err;
815         rd_kafka_op_t *rko_result = NULL;
816         char errstr[512];
817 
818         if (rko->rko_err) {
819                 rd_kafka_admin_result_fail(
820                         rko, rko->rko_err,
821                         "%s worker request failed: %s",
822                         rd_kafka_op2str(rko->rko_type),
823                         rd_kafka_err2str(rko->rko_err));
824                 return;
825         }
826 
827         /* Response received.
828          * Let callback parse response and provide result in rko_result
829          * which is then enqueued on the reply queue. */
830         err = rko->rko_u.admin_request.cbs->parse(
831                 rko, &rko_result,
832                 rko->rko_u.admin_request.reply_buf,
833                 errstr, sizeof(errstr));
834         if (err) {
835                 rd_kafka_admin_result_fail(
836                         rko, err,
837                         "%s worker failed to parse response: %s",
838                         rd_kafka_op2str(rko->rko_type), errstr);
839                 return;
840         }
841 
842         rd_assert(rko_result);
843 
844         /* Enqueue result on application queue, we're done. */
845         rd_kafka_admin_result_enq(rko, rko_result);
846 }
847 
848 /**
849  * @brief Generic handler for coord_req() responses.
850  */
851 static void
852 rd_kafka_admin_coord_response_parse (rd_kafka_t *rk,
853                                      rd_kafka_broker_t *rkb,
854                                      rd_kafka_resp_err_t err,
855                                      rd_kafka_buf_t *rkbuf,
856                                      rd_kafka_buf_t *request,
857                                      void *opaque) {
858         rd_kafka_op_t *rko_result;
859         rd_kafka_enq_once_t *eonce = opaque;
860         rd_kafka_op_t *rko;
861         char errstr[512];
862 
863         rko = rd_kafka_enq_once_del_source_return(eonce,
864                                                   "coordinator response");
865         if (!rko)
866                 /* Admin request has timed out and been destroyed */
867                 return;
868 
869         if (err) {
870                 rd_kafka_admin_result_fail(
871                         rko, err,
872                         "%s worker coordinator request failed: %s",
873                         rd_kafka_op2str(rko->rko_type),
874                         rd_kafka_err2str(err));
875                 rd_kafka_admin_common_worker_destroy(rk, rko,
876                                                      rd_true/*destroy*/);
877                 return;
878         }
879 
880         err = rko->rko_u.admin_request.cbs->parse(
881                 rko, &rko_result, rkbuf,
882                 errstr, sizeof(errstr));
883         if (err) {
884                 rd_kafka_admin_result_fail(
885                         rko, err,
886                         "%s worker failed to parse coordinator %sResponse: %s",
887                         rd_kafka_op2str(rko->rko_type),
888                         rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
889                         errstr);
890                 rd_kafka_admin_common_worker_destroy(rk, rko,
891                                                      rd_true/*destroy*/);
892                 return;
893         }
894 
895         rd_assert(rko_result);
896 
897         /* Enqueue result on application queue, we're done. */
898         rd_kafka_admin_result_enq(rko, rko_result);
899 }
900 
901 
902 
903 /**
904  * @brief Common worker state machine handling regardless of request type.
905  *
906  * Tasks:
907  *  - Sets up timeout on first call.
908  *  - Checks for timeout.
909  *  - Checks for and fails on errors.
910  *  - Async Controller and broker lookups
911  *  - Calls the Request callback
912  *  - Calls the parse callback
913  *  - Result reply
914  *  - Destruction of rko
915  *
916  * rko->rko_err may be one of:
917  * RD_KAFKA_RESP_ERR_NO_ERROR, or
918  * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup, or
919  * RD_KAFKA_RESP_ERR__TIMED_OUT if request has timed out,
920  * or any other error code triggered by other parts of the code.
921  *
922  * @returns a hint to the op code whether the rko should be destroyed or not.
923  */
924 static rd_kafka_op_res_t
925 rd_kafka_admin_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
926         const char *name = rd_kafka_op2str(rko->rko_type);
927         rd_ts_t timeout_in;
928         rd_kafka_broker_t *rkb = NULL;
929         rd_kafka_resp_err_t err;
930         char errstr[512];
931 
932         /* ADMIN_FANOUT handled by fanout_worker() */
933         rd_assert((rko->rko_type & ~ RD_KAFKA_OP_FLAGMASK) !=
934                   RD_KAFKA_OP_ADMIN_FANOUT);
935 
936         if (rd_kafka_terminating(rk)) {
937                 rd_kafka_dbg(rk, ADMIN, name,
938                              "%s worker called in state %s: "
939                              "handle is terminating: %s",
940                              name,
941                              rd_kafka_admin_state_desc[rko->rko_u.
942                                                        admin_request.state],
943                              rd_kafka_err2str(rko->rko_err));
944                 rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY,
945                                            "Handle is terminating: %s",
946                                            rd_kafka_err2str(rko->rko_err));
947                 goto destroy;
948         }
949 
950         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
951                 rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY,
952                                            "Destroyed");
953                 goto destroy; /* rko being destroyed (silent) */
954         }
955 
956         rd_kafka_dbg(rk, ADMIN, name,
957                      "%s worker called in state %s: %s",
958                      name,
959                      rd_kafka_admin_state_desc[rko->rko_u.admin_request.state],
960                      rd_kafka_err2str(rko->rko_err));
961 
962         rd_assert(thrd_is_current(rko->rko_rk->rk_thread));
963 
964         /* Check for errors raised asynchronously (e.g., by timer) */
965         if (rko->rko_err) {
966                 rd_kafka_admin_result_fail(
967                         rko, rko->rko_err,
968                         "Failed while %s: %s",
969                         rd_kafka_admin_state_desc[rko->rko_u.
970                                                   admin_request.state],
971                         rd_kafka_err2str(rko->rko_err));
972                 goto destroy;
973         }
974 
975         /* Check for timeout */
976         timeout_in = rd_kafka_admin_timeout_remains_us(rko);
977         if (timeout_in <= 0) {
978                 rd_kafka_admin_result_fail(
979                         rko, RD_KAFKA_RESP_ERR__TIMED_OUT,
980                         "Timed out %s",
981                         rd_kafka_admin_state_desc[rko->rko_u.
982                                                   admin_request.state]);
983                 goto destroy;
984         }
985 
986  redo:
987         switch (rko->rko_u.admin_request.state)
988         {
989         case RD_KAFKA_ADMIN_STATE_INIT:
990         {
991                 int32_t broker_id;
992 
993                 /* First call. */
994 
995                 /* Set up timeout timer. */
996                 rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce,
997                                              "timeout timer");
998                 rd_kafka_timer_start_oneshot(&rk->rk_timers,
999                                              &rko->rko_u.admin_request.tmr,
1000                                              rd_true, timeout_in,
1001                                              rd_kafka_admin_eonce_timeout_cb,
1002                                              rko->rko_u.admin_request.eonce);
1003 
1004                 /* Use explicitly specified broker_id, if available. */
1005                 broker_id = (int32_t)rd_kafka_confval_get_int(
1006                         &rko->rko_u.admin_request.options.broker);
1007 
1008                 if (broker_id != -1) {
1009                         rd_kafka_dbg(rk, ADMIN, name,
1010                                      "%s using explicitly "
1011                                      "set broker id %"PRId32
1012                                      " rather than %"PRId32,
1013                                      name, broker_id,
1014                                      rko->rko_u.admin_request.broker_id);
1015                         rko->rko_u.admin_request.broker_id = broker_id;
1016                 } else {
1017                         /* Default to controller */
1018                         broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER;
1019                 }
1020 
1021                 /* Resolve target broker(s) */
1022                 switch (rko->rko_u.admin_request.broker_id)
1023                 {
1024                 case RD_KAFKA_ADMIN_TARGET_CONTROLLER:
1025                         /* Controller */
1026                         rko->rko_u.admin_request.state =
1027                                 RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER;
1028                         goto redo;  /* Trigger next state immediately */
1029 
1030                 case RD_KAFKA_ADMIN_TARGET_COORDINATOR:
1031                         /* Group (or other) coordinator */
1032                         rko->rko_u.admin_request.state =
1033                                 RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE;
1034                         rd_kafka_enq_once_add_source(rko->rko_u.admin_request.
1035                                                      eonce,
1036                                                      "coordinator request");
1037                         rd_kafka_coord_req(rk,
1038                                            rko->rko_u.admin_request.coordtype,
1039                                            rko->rko_u.admin_request.coordkey,
1040                                            rd_kafka_admin_coord_request,
1041                                            NULL,
1042                                            rd_kafka_admin_timeout_remains(rko),
1043                                            RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1044                                            rd_kafka_admin_coord_response_parse,
1045                                            rko->rko_u.admin_request.eonce);
1046                         /* Wait asynchronously for broker response, which will
1047                          * trigger the eonce and worker to be called again. */
1048                         return RD_KAFKA_OP_RES_KEEP;
1049 
1050                 case RD_KAFKA_ADMIN_TARGET_FANOUT:
1051                         /* Shouldn't come here, fanouts are handled by
1052                          * fanout_worker() */
1053                         RD_NOTREACHED();
1054                         return RD_KAFKA_OP_RES_KEEP;
1055 
1056                 default:
1057                         /* Specific broker */
1058                         rd_assert(rko->rko_u.admin_request.broker_id >= 0);
1059                         rko->rko_u.admin_request.state =
1060                                 RD_KAFKA_ADMIN_STATE_WAIT_BROKER;
1061                         goto redo;  /* Trigger next state immediately */
1062                 }
1063         }
1064 
1065 
1066         case RD_KAFKA_ADMIN_STATE_WAIT_BROKER:
1067                 /* Broker lookup */
1068                 if (!(rkb = rd_kafka_admin_common_get_broker(
1069                               rk, rko, rko->rko_u.admin_request.broker_id))) {
1070                         /* Still waiting for broker to become available */
1071                         return RD_KAFKA_OP_RES_KEEP;
1072                 }
1073 
1074                 rko->rko_u.admin_request.state =
1075                         RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST;
1076                 goto redo;
1077 
1078         case RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER:
1079                 if (!(rkb = rd_kafka_admin_common_get_controller(rk, rko))) {
1080                         /* Still waiting for controller to become available. */
1081                         return RD_KAFKA_OP_RES_KEEP;
1082                 }
1083 
1084                 rko->rko_u.admin_request.state =
1085                         RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST;
1086                 goto redo;
1087 
1088         case RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS:
1089                 /* This state is only used by ADMIN_FANOUT which has
1090                  * its own fanout_worker() */
1091                 RD_NOTREACHED();
1092                 break;
1093 
1094         case RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST:
1095                 /* Got broker, send protocol request. */
1096 
1097                 /* Make sure we're called from a 'goto redo' where
1098                  * the rkb was set. */
1099                 rd_assert(rkb);
1100 
1101                 /* Still need to use the eonce since this worker may
1102                  * time out while waiting for response from broker, in which
1103                  * case the broker response will hit an empty eonce (ok). */
1104                 rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce,
1105                                              "send");
1106 
1107                 /* Send request (async) */
1108                 err = rko->rko_u.admin_request.cbs->request(
1109                         rkb,
1110                         &rko->rko_u.admin_request.args,
1111                         &rko->rko_u.admin_request.options,
1112                         errstr, sizeof(errstr),
1113                         RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1114                         rd_kafka_admin_handle_response,
1115                         rko->rko_u.admin_request.eonce);
1116 
1117                 /* Loose broker refcount from get_broker(), get_controller() */
1118                 rd_kafka_broker_destroy(rkb);
1119 
1120                 if (err) {
1121                         rd_kafka_enq_once_del_source(
1122                                 rko->rko_u.admin_request.eonce, "send");
1123                         rd_kafka_admin_result_fail(rko, err, "%s", errstr);
1124                         goto destroy;
1125                 }
1126 
1127                 rko->rko_u.admin_request.state =
1128                         RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE;
1129 
1130                 /* Wait asynchronously for broker response, which will
1131                  * trigger the eonce and worker to be called again. */
1132                 return RD_KAFKA_OP_RES_KEEP;
1133 
1134 
1135         case RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE:
1136                 rd_kafka_admin_response_parse(rko);
1137                 goto destroy;
1138         }
1139 
1140         return RD_KAFKA_OP_RES_KEEP;
1141 
1142  destroy:
1143         rd_kafka_admin_common_worker_destroy(rk, rko,
1144                                              rd_false/*don't destroy*/);
1145         return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy() */
1146 
1147 }
1148 
1149 
1150 /**
1151  * @brief Create a new admin_fanout op of type \p req_type and sets up the
1152  *        generic (type independent files).
1153  *
1154  *        The caller shall then populate the \c admin_fanout.requests list,
1155  *        initialize the \c admin_fanout.responses list,
1156  *        set the initial \c admin_fanout.outstanding value,
1157  *        and enqueue the op on rk_ops for further processing work.
1158  *
1159  * @param cbs Callbacks, must reside in .data segment.
1160  * @param options Optional options, may be NULL to use defaults.
1161  * @param rkq is the application reply queue.
1162  *
1163  * @locks none
1164  * @locality application thread
1165  */
1166 static rd_kafka_op_t *
1167 rd_kafka_admin_fanout_op_new (rd_kafka_t *rk,
1168                               rd_kafka_op_type_t req_type,
1169                               rd_kafka_event_type_t reply_event_type,
1170                               const struct rd_kafka_admin_fanout_worker_cbs
1171                               *cbs,
1172                               const rd_kafka_AdminOptions_t *options,
1173                               rd_kafka_q_t *rkq) {
1174         rd_kafka_op_t *rko;
1175 
1176         rd_assert(rk);
1177         rd_assert(rkq);
1178         rd_assert(cbs);
1179 
1180         rko = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_FANOUT);
1181         rko->rko_rk = rk;
1182 
1183         rko->rko_u.admin_request.reply_event_type = reply_event_type;
1184 
1185         rko->rko_u.admin_request.fanout.cbs =
1186                 (struct rd_kafka_admin_fanout_worker_cbs *)cbs;
1187 
1188         /* Make a copy of the options */
1189         if (options)
1190                 rko->rko_u.admin_request.options = *options;
1191         else
1192                 rd_kafka_AdminOptions_init(rk,
1193                                            &rko->rko_u.admin_request.options);
1194 
1195         rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_FANOUT;
1196 
1197         /* Calculate absolute timeout */
1198         rko->rko_u.admin_request.abs_timeout =
1199                 rd_timeout_init(
1200                         rd_kafka_confval_get_int(&rko->rko_u.admin_request.
1201                                                  options.request_timeout));
1202 
1203         /* Set up replyq */
1204         rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0);
1205 
1206         rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS;
1207 
1208         rko->rko_u.admin_request.fanout.reqtype = req_type;
1209 
1210         return rko;
1211 }
1212 
1213 
1214 /**
1215  * @brief Common fanout worker state machine handling regardless of request type
1216  *
1217  * @param rko Result of a fanned out operation, e.g., DELETERECORDS result.
1218  *
1219  * Tasks:
1220  *  - Checks for and responds to client termination
1221  *  - Polls for fanned out responses
1222  *  - Calls the partial response callback
1223  *  - Calls the merge responses callback upon receipt of all partial responses
1224  *  - Destruction of rko
1225  *
1226  * rko->rko_err may be one of:
1227  * RD_KAFKA_RESP_ERR_NO_ERROR, or
1228  * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup.
1229  *
1230  * @returns a hint to the op code whether the rko should be destroyed or not.
1231  */
1232 static rd_kafka_op_res_t
1233 rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq,
1234                               rd_kafka_op_t *rko) {
1235         rd_kafka_op_t *rko_fanout = rko->rko_u.admin_result.fanout_parent;
1236         const char *name = rd_kafka_op2str(rko_fanout->rko_u.admin_request.
1237                                            fanout.reqtype);
1238         rd_kafka_op_t *rko_result;
1239 
1240         RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_ADMIN_RESULT);
1241         RD_KAFKA_OP_TYPE_ASSERT(rko_fanout, RD_KAFKA_OP_ADMIN_FANOUT);
1242 
1243         rd_assert(rko_fanout->rko_u.admin_request.fanout.outstanding > 0);
1244         rko_fanout->rko_u.admin_request.fanout.outstanding--;
1245 
1246         rko->rko_u.admin_result.fanout_parent = NULL;
1247 
1248         if (rd_kafka_terminating(rk)) {
1249                 rd_kafka_dbg(rk, ADMIN, name,
1250                              "%s fanout worker called for fanned out op %s: "
1251                              "handle is terminating: %s",
1252                              name,
1253                              rd_kafka_op2str(rko->rko_type),
1254                              rd_kafka_err2str(rko_fanout->rko_err));
1255                 if (!rko->rko_err)
1256                         rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
1257         }
1258 
1259         rd_kafka_dbg(rk, ADMIN, name,
1260                      "%s fanout worker called for %s with %d request(s) "
1261                      "outstanding: %s",
1262                      name,
1263                      rd_kafka_op2str(rko->rko_type),
1264                      rko_fanout->rko_u.admin_request.fanout.outstanding,
1265                      rd_kafka_err2str(rko_fanout->rko_err));
1266 
1267         /* Add partial response to rko_fanout's result list. */
1268         rko_fanout->rko_u.admin_request.
1269                 fanout.cbs->partial_response(rko_fanout, rko);
1270 
1271         if (rko_fanout->rko_u.admin_request.fanout.outstanding > 0)
1272                 /* Wait for outstanding requests to finish */
1273                 return RD_KAFKA_OP_RES_HANDLED;
1274 
1275         rko_result = rd_kafka_admin_result_new(rko_fanout);
1276         rd_list_init_copy(&rko_result->rko_u.admin_result.results,
1277                           &rko_fanout->rko_u.admin_request.fanout.results);
1278         rd_list_copy_to(&rko_result->rko_u.admin_result.results,
1279                         &rko_fanout->rko_u.admin_request.fanout.results,
1280                         rko_fanout->rko_u.admin_request.
1281                         fanout.cbs->copy_result, NULL);
1282 
1283         /* Enqueue result on application queue, we're done. */
1284         rd_kafka_replyq_enq(&rko_fanout->rko_u.admin_request.replyq, rko_result,
1285                             rko_fanout->rko_u.admin_request.replyq.version);
1286 
1287         /* FALLTHRU */
1288         if (rko_fanout->rko_u.admin_request.fanout.outstanding == 0)
1289                 rd_kafka_op_destroy(rko_fanout);
1290 
1291         return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy(rko) */
1292 }
1293 
1294 /**@}*/
1295 
1296 
1297 /**
1298  * @name Generic AdminOptions
1299  * @{
1300  *
1301  *
1302  */
1303 
1304 rd_kafka_resp_err_t
1305 rd_kafka_AdminOptions_set_request_timeout (rd_kafka_AdminOptions_t *options,
1306                                            int timeout_ms,
1307                                            char *errstr, size_t errstr_size) {
1308         return rd_kafka_confval_set_type(&options->request_timeout,
1309                                          RD_KAFKA_CONFVAL_INT, &timeout_ms,
1310                                          errstr, errstr_size);
1311 }
1312 
1313 
1314 rd_kafka_resp_err_t
1315 rd_kafka_AdminOptions_set_operation_timeout (rd_kafka_AdminOptions_t *options,
1316                                              int timeout_ms,
1317                                              char *errstr, size_t errstr_size) {
1318         return rd_kafka_confval_set_type(&options->operation_timeout,
1319                                          RD_KAFKA_CONFVAL_INT, &timeout_ms,
1320                                          errstr, errstr_size);
1321 }
1322 
1323 
1324 rd_kafka_resp_err_t
1325 rd_kafka_AdminOptions_set_validate_only (rd_kafka_AdminOptions_t *options,
1326                                         int true_or_false,
1327                                         char *errstr, size_t errstr_size) {
1328         return rd_kafka_confval_set_type(&options->validate_only,
1329                                          RD_KAFKA_CONFVAL_INT, &true_or_false,
1330                                          errstr, errstr_size);
1331 }
1332 
1333 rd_kafka_resp_err_t
1334 rd_kafka_AdminOptions_set_incremental (rd_kafka_AdminOptions_t *options,
1335                                        int true_or_false,
1336                                        char *errstr, size_t errstr_size) {
1337         rd_snprintf(errstr, errstr_size,
1338                     "Incremental updates currently not supported, see KIP-248");
1339         return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
1340 
1341         return rd_kafka_confval_set_type(&options->incremental,
1342                                          RD_KAFKA_CONFVAL_INT, &true_or_false,
1343                                          errstr, errstr_size);
1344 }
1345 
1346 rd_kafka_resp_err_t
1347 rd_kafka_AdminOptions_set_broker (rd_kafka_AdminOptions_t *options,
1348                                   int32_t broker_id,
1349                                   char *errstr, size_t errstr_size) {
1350         int ibroker_id = (int)broker_id;
1351 
1352         return rd_kafka_confval_set_type(&options->broker,
1353                                          RD_KAFKA_CONFVAL_INT,
1354                                          &ibroker_id,
1355                                          errstr, errstr_size);
1356 }
1357 
1358 void
1359 rd_kafka_AdminOptions_set_opaque (rd_kafka_AdminOptions_t *options,
1360                                   void *opaque) {
1361         rd_kafka_confval_set_type(&options->opaque,
1362                                   RD_KAFKA_CONFVAL_PTR, opaque, NULL, 0);
1363 }
1364 
1365 
1366 /**
1367  * @brief Initialize and set up defaults for AdminOptions
1368  */
1369 static void rd_kafka_AdminOptions_init (rd_kafka_t *rk,
1370                                         rd_kafka_AdminOptions_t *options) {
1371         rd_kafka_confval_init_int(&options->request_timeout, "request_timeout",
1372                                   0, 3600*1000,
1373                                   rk->rk_conf.admin.request_timeout_ms);
1374 
1375         if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
1376             options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS ||
1377             options->for_api == RD_KAFKA_ADMIN_OP_DELETETOPICS ||
1378             options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS ||
1379             options->for_api == RD_KAFKA_ADMIN_OP_DELETERECORDS)
1380                 rd_kafka_confval_init_int(&options->operation_timeout,
1381                                           "operation_timeout",
1382                                           -1, 3600*1000,
1383                                           rk->rk_conf.admin.request_timeout_ms);
1384         else
1385                 rd_kafka_confval_disable(&options->operation_timeout,
1386                                          "operation_timeout");
1387 
1388         if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
1389             options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS ||
1390             options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS ||
1391             options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS)
1392                 rd_kafka_confval_init_int(&options->validate_only,
1393                                           "validate_only",
1394                                           0, 1, 0);
1395         else
1396                 rd_kafka_confval_disable(&options->validate_only,
1397                                          "validate_only");
1398 
1399         if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
1400             options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS)
1401                 rd_kafka_confval_init_int(&options->incremental,
1402                                           "incremental",
1403                                           0, 1, 0);
1404         else
1405                 rd_kafka_confval_disable(&options->incremental,
1406                                          "incremental");
1407 
1408         rd_kafka_confval_init_int(&options->broker, "broker",
1409                                   0, INT32_MAX, -1);
1410         rd_kafka_confval_init_ptr(&options->opaque, "opaque");
1411 }
1412 
1413 
1414 rd_kafka_AdminOptions_t *
1415 rd_kafka_AdminOptions_new (rd_kafka_t *rk, rd_kafka_admin_op_t for_api) {
1416         rd_kafka_AdminOptions_t *options;
1417 
1418         if ((int)for_api < 0 || for_api >= RD_KAFKA_ADMIN_OP__CNT)
1419                 return NULL;
1420 
1421         options = rd_calloc(1, sizeof(*options));
1422 
1423         options->for_api = for_api;
1424 
1425         rd_kafka_AdminOptions_init(rk, options);
1426 
1427         return options;
1428 }
1429 
/**
 * @brief Destroy an AdminOptions object.
 *        All confvals are stored by value inside the object, so a
 *        single free releases everything.
 */
void rd_kafka_AdminOptions_destroy (rd_kafka_AdminOptions_t *options) {
        rd_free(options);
}
1433 
1434 /**@}*/
1435 
1436 
1437 
1438 
1439 
1440 
1441 /**
1442  * @name CreateTopics
1443  * @{
1444  *
1445  *
1446  *
1447  */
1448 
1449 
1450 
1451 rd_kafka_NewTopic_t *
1452 rd_kafka_NewTopic_new (const char *topic,
1453                        int num_partitions,
1454                        int replication_factor,
1455                        char *errstr, size_t errstr_size) {
1456         rd_kafka_NewTopic_t *new_topic;
1457 
1458         if (!topic) {
1459                 rd_snprintf(errstr, errstr_size, "Invalid topic name");
1460                 return NULL;
1461         }
1462 
1463         if (num_partitions < -1 || num_partitions > RD_KAFKAP_PARTITIONS_MAX) {
1464                 rd_snprintf(errstr, errstr_size, "num_partitions out of "
1465                             "expected range %d..%d or -1 for broker default",
1466                             1, RD_KAFKAP_PARTITIONS_MAX);
1467                 return NULL;
1468         }
1469 
1470         if (replication_factor < -1 ||
1471             replication_factor > RD_KAFKAP_BROKERS_MAX) {
1472                 rd_snprintf(errstr, errstr_size,
1473                             "replication_factor out of expected range %d..%d",
1474                             -1, RD_KAFKAP_BROKERS_MAX);
1475                 return NULL;
1476         }
1477 
1478         new_topic = rd_calloc(1, sizeof(*new_topic));
1479         new_topic->topic = rd_strdup(topic);
1480         new_topic->num_partitions = num_partitions;
1481         new_topic->replication_factor = replication_factor;
1482 
1483         /* List of int32 lists */
1484         rd_list_init(&new_topic->replicas, 0, rd_list_destroy_free);
1485         rd_list_prealloc_elems(&new_topic->replicas, 0,
1486                                num_partitions == -1 ? 0 : num_partitions,
1487                                0/*nozero*/);
1488 
1489         /* List of ConfigEntrys */
1490         rd_list_init(&new_topic->config, 0, rd_kafka_ConfigEntry_free);
1491 
1492         return new_topic;
1493 
1494 }
1495 
1496 
1497 /**
1498  * @brief Topic name comparator for NewTopic_t
1499  */
1500 static int rd_kafka_NewTopic_cmp (const void *_a, const void *_b) {
1501         const rd_kafka_NewTopic_t *a = _a, *b = _b;
1502         return strcmp(a->topic, b->topic);
1503 }
1504 
1505 
1506 
1507 /**
1508  * @brief Allocate a new NewTopic and make a copy of \p src
1509  */
1510 static rd_kafka_NewTopic_t *
1511 rd_kafka_NewTopic_copy (const rd_kafka_NewTopic_t *src) {
1512         rd_kafka_NewTopic_t *dst;
1513 
1514         dst = rd_kafka_NewTopic_new(src->topic, src->num_partitions,
1515                                     src->replication_factor, NULL, 0);
1516         rd_assert(dst);
1517 
1518         rd_list_destroy(&dst->replicas); /* created in .._new() */
1519         rd_list_init_copy(&dst->replicas, &src->replicas);
1520         rd_list_copy_to(&dst->replicas, &src->replicas,
1521                         rd_list_copy_preallocated, NULL);
1522 
1523         rd_list_init_copy(&dst->config, &src->config);
1524         rd_list_copy_to(&dst->config, &src->config,
1525                         rd_kafka_ConfigEntry_list_copy, NULL);
1526 
1527         return dst;
1528 }
1529 
/**
 * @brief Destroy a NewTopic object: frees the replica-assignment and
 *        config lists, the copied topic name, and the object itself.
 */
void rd_kafka_NewTopic_destroy (rd_kafka_NewTopic_t *new_topic) {
        rd_list_destroy(&new_topic->replicas);
        rd_list_destroy(&new_topic->config);
        rd_free(new_topic->topic);
        rd_free(new_topic);
}
1536 
/**
 * @brief rd_list free_cb trampoline for rd_kafka_NewTopic_destroy().
 */
static void rd_kafka_NewTopic_free (void *ptr) {
        rd_kafka_NewTopic_destroy(ptr);
}
1540 
1541 void
1542 rd_kafka_NewTopic_destroy_array (rd_kafka_NewTopic_t **new_topics,
1543                                  size_t new_topic_cnt) {
1544         size_t i;
1545         for (i = 0 ; i < new_topic_cnt ; i++)
1546                 rd_kafka_NewTopic_destroy(new_topics[i]);
1547 }
1548 
1549 
/**
 * @brief Set the replica (broker id) assignment for \p partition of
 *        \p new_topic.
 *
 * Only valid when the NewTopic was created with replication_factor == -1
 * and an explicit partition count (num_partitions != -1): an explicit
 * assignment is mutually exclusive with both a replication factor and
 * the broker-default partition count.
 *
 * Partitions must be assigned in strict order starting at partition 0.
 *
 * @param broker_ids Array of \p broker_id_cnt broker ids to assign as
 *                   replicas for this partition (copied).
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR, or RD_KAFKA_RESP_ERR__INVALID_ARG
 *          with a human-readable reason written to \p errstr.
 */
rd_kafka_resp_err_t
rd_kafka_NewTopic_set_replica_assignment (rd_kafka_NewTopic_t *new_topic,
                                          int32_t partition,
                                          int32_t *broker_ids,
                                          size_t broker_id_cnt,
                                          char *errstr, size_t errstr_size) {
        rd_list_t *rl;
        int i;

        if (new_topic->replication_factor != -1) {
                rd_snprintf(errstr, errstr_size,
                            "Specifying a replication factor and "
                            "a replica assignment are mutually exclusive");
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        } else if (new_topic->num_partitions == -1) {
                rd_snprintf(errstr, errstr_size,
                            "Specifying a default partition count and a "
                            "replica assignment are mutually exclusive");
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        /* Replica partitions must be added consecutively starting from 0. */
        if (partition != rd_list_cnt(&new_topic->replicas)) {
                rd_snprintf(errstr, errstr_size,
                            "Partitions must be added in order, "
                            "starting at 0: expecting partition %d, "
                            "not %"PRId32,
                            rd_list_cnt(&new_topic->replicas), partition);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) {
                rd_snprintf(errstr, errstr_size,
                            "Too many brokers specified "
                            "(RD_KAFKAP_BROKERS_MAX=%d)",
                            RD_KAFKAP_BROKERS_MAX);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }


        /* Copy the broker ids into a new int32 list and append it as
         * this partition's assignment. */
        rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt);

        for (i = 0 ; i < (int)broker_id_cnt ; i++)
                rd_list_set_int32(rl, i, broker_ids[i]);

        rd_list_add(&new_topic->replicas, rl);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
1599 
1600 
1601 /**
1602  * @brief Generic constructor of ConfigEntry which is also added to \p rl
1603  */
1604 static rd_kafka_resp_err_t
1605 rd_kafka_admin_add_config0 (rd_list_t *rl,
1606                             const char *name, const char *value,
1607                             rd_kafka_AlterOperation_t operation) {
1608         rd_kafka_ConfigEntry_t *entry;
1609 
1610         if (!name)
1611                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1612 
1613         entry = rd_calloc(1, sizeof(*entry));
1614         entry->kv = rd_strtup_new(name, value);
1615         entry->a.operation = operation;
1616 
1617         rd_list_add(rl, entry);
1618 
1619         return RD_KAFKA_RESP_ERR_NO_ERROR;
1620 }
1621 
1622 
/**
 * @brief Add a topic-level config property to \p new_topic.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR, or RD_KAFKA_RESP_ERR__INVALID_ARG
 *          if \p name is NULL.
 */
rd_kafka_resp_err_t
rd_kafka_NewTopic_set_config (rd_kafka_NewTopic_t *new_topic,
                              const char *name, const char *value) {
        return rd_kafka_admin_add_config0(&new_topic->config, name, value,
                                          RD_KAFKA_ALTER_OP_ADD);
}
1629 
1630 
1631 
1632 /**
1633  * @brief Parse CreateTopicsResponse and create ADMIN_RESULT op.
1634  */
1635 static rd_kafka_resp_err_t
1636 rd_kafka_CreateTopicsResponse_parse (rd_kafka_op_t *rko_req,
1637                                      rd_kafka_op_t **rko_resultp,
1638                                      rd_kafka_buf_t *reply,
1639                                      char *errstr, size_t errstr_size) {
1640         const int log_decode_errors = LOG_ERR;
1641         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
1642         rd_kafka_t *rk = rkb->rkb_rk;
1643         rd_kafka_op_t *rko_result = NULL;
1644         int32_t topic_cnt;
1645         int i;
1646 
1647         if (rd_kafka_buf_ApiVersion(reply) >= 2) {
1648                 int32_t Throttle_Time;
1649                 rd_kafka_buf_read_i32(reply, &Throttle_Time);
1650                 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
1651         }
1652 
1653         /* #topics */
1654         rd_kafka_buf_read_i32(reply, &topic_cnt);
1655 
1656         if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
1657                 rd_kafka_buf_parse_fail(
1658                         reply,
1659                         "Received %"PRId32" topics in response "
1660                         "when only %d were requested", topic_cnt,
1661                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
1662 
1663 
1664         rko_result = rd_kafka_admin_result_new(rko_req);
1665 
1666         rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
1667                      rd_kafka_topic_result_free);
1668 
1669         for (i = 0 ; i < (int)topic_cnt ; i++) {
1670                 rd_kafkap_str_t ktopic;
1671                 int16_t error_code;
1672                 rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER;
1673                 char *this_errstr = NULL;
1674                 rd_kafka_topic_result_t *terr;
1675                 rd_kafka_NewTopic_t skel;
1676                 int orig_pos;
1677 
1678                 rd_kafka_buf_read_str(reply, &ktopic);
1679                 rd_kafka_buf_read_i16(reply, &error_code);
1680 
1681                 if (rd_kafka_buf_ApiVersion(reply) >= 1)
1682                         rd_kafka_buf_read_str(reply, &error_msg);
1683 
1684                 /* For non-blocking CreateTopicsRequests the broker
1685                  * will returned REQUEST_TIMED_OUT for topics
1686                  * that were triggered for creation -
1687                  * we hide this error code from the application
1688                  * since the topic creation is in fact in progress. */
1689                 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
1690                     rd_kafka_confval_get_int(&rko_req->rko_u.
1691                                              admin_request.options.
1692                                              operation_timeout) <= 0) {
1693                         error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
1694                         this_errstr = NULL;
1695                 }
1696 
1697                 if (error_code) {
1698                         if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
1699                             RD_KAFKAP_STR_LEN(&error_msg) == 0)
1700                                 this_errstr =
1701                                         (char *)rd_kafka_err2str(error_code);
1702                         else
1703                                 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
1704 
1705                 }
1706 
1707                 terr = rd_kafka_topic_result_new(ktopic.str,
1708                                                  RD_KAFKAP_STR_LEN(&ktopic),
1709                                                  error_code, this_errstr);
1710 
1711                 /* As a convenience to the application we insert topic result
1712                  * in the same order as they were requested. The broker
1713                  * does not maintain ordering unfortunately. */
1714                 skel.topic = terr->topic;
1715                 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
1716                                          &skel, rd_kafka_NewTopic_cmp);
1717                 if (orig_pos == -1) {
1718                         rd_kafka_topic_result_destroy(terr);
1719                         rd_kafka_buf_parse_fail(
1720                                 reply,
1721                                 "Broker returned topic %.*s that was not "
1722                                 "included in the original request",
1723                                 RD_KAFKAP_STR_PR(&ktopic));
1724                 }
1725 
1726                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
1727                                  orig_pos) != NULL) {
1728                         rd_kafka_topic_result_destroy(terr);
1729                         rd_kafka_buf_parse_fail(
1730                                 reply,
1731                                 "Broker returned topic %.*s multiple times",
1732                                 RD_KAFKAP_STR_PR(&ktopic));
1733                 }
1734 
1735                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
1736                             terr);
1737         }
1738 
1739         *rko_resultp = rko_result;
1740 
1741         return RD_KAFKA_RESP_ERR_NO_ERROR;
1742 
1743  err_parse:
1744         if (rko_result)
1745                 rd_kafka_op_destroy(rko_result);
1746 
1747         rd_snprintf(errstr, errstr_size,
1748                     "CreateTopics response protocol parse failure: %s",
1749                     rd_kafka_err2str(reply->rkbuf_err));
1750 
1751         return reply->rkbuf_err;
1752 }
1753 
1754 
/**
 * @brief Asynchronously create topics as per \p new_topics.
 *
 * Deep-copies \p new_topics (caller retains ownership of the array and
 * its elements) and enqueues an admin request op on the main ops queue;
 * the result is emitted as a RD_KAFKA_EVENT_CREATETOPICS_RESULT event
 * on \p rkqu.
 */
void rd_kafka_CreateTopics (rd_kafka_t *rk,
                            rd_kafka_NewTopic_t **new_topics,
                            size_t new_topic_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        rd_kafka_op_t *rko;
        size_t i;
        /* Request-build and response-parse callbacks for the generic
         * admin worker. */
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_CreateTopicsRequest,
                rd_kafka_CreateTopicsResponse_parse,
        };

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(rk,
                                            RD_KAFKA_OP_CREATETOPICS,
                                            RD_KAFKA_EVENT_CREATETOPICS_RESULT,
                                            &cbs, options, rkqu->rkqu_q);

        /* The op owns the copied NewTopic objects. */
        rd_list_init(&rko->rko_u.admin_request.args, (int)new_topic_cnt,
                     rd_kafka_NewTopic_free);

        for (i = 0 ; i < new_topic_cnt ; i++)
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_NewTopic_copy(new_topics[i]));

        rd_kafka_q_enq(rk->rk_ops, rko);
}
1783 
1784 
1785 /**
1786  * @brief Get an array of topic results from a CreateTopics result.
1787  *
1788  * The returned \p topics life-time is the same as the \p result object.
1789  * @param cntp is updated to the number of elements in the array.
1790  */
1791 const rd_kafka_topic_result_t **
1792 rd_kafka_CreateTopics_result_topics (
1793         const rd_kafka_CreateTopics_result_t *result,
1794         size_t *cntp) {
1795         return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
1796                                                 cntp);
1797 }
1798 
1799 /**@}*/
1800 
1801 
1802 
1803 
1804 /**
1805  * @name Delete topics
1806  * @{
1807  *
1808  *
1809  *
1810  *
1811  */
1812 
1813 rd_kafka_DeleteTopic_t *rd_kafka_DeleteTopic_new (const char *topic) {
1814         size_t tsize = strlen(topic) + 1;
1815         rd_kafka_DeleteTopic_t *del_topic;
1816 
1817         /* Single allocation */
1818         del_topic = rd_malloc(sizeof(*del_topic) + tsize);
1819         del_topic->topic = del_topic->data;
1820         memcpy(del_topic->topic, topic, tsize);
1821 
1822         return del_topic;
1823 }
1824 
/**
 * @brief Destroy a DeleteTopic object.
 *        Object and topic name share a single allocation.
 */
void rd_kafka_DeleteTopic_destroy (rd_kafka_DeleteTopic_t *del_topic) {
        rd_free(del_topic);
}
1828 
/**
 * @brief rd_list free_cb trampoline for rd_kafka_DeleteTopic_destroy().
 */
static void rd_kafka_DeleteTopic_free (void *ptr) {
        rd_kafka_DeleteTopic_destroy(ptr);
}
1832 
1833 
1834 void rd_kafka_DeleteTopic_destroy_array (rd_kafka_DeleteTopic_t **del_topics,
1835                                          size_t del_topic_cnt) {
1836         size_t i;
1837         for (i = 0 ; i < del_topic_cnt ; i++)
1838                 rd_kafka_DeleteTopic_destroy(del_topics[i]);
1839 }
1840 
1841 
1842 /**
1843  * @brief Topic name comparator for DeleteTopic_t
1844  */
1845 static int rd_kafka_DeleteTopic_cmp (const void *_a, const void *_b) {
1846         const rd_kafka_DeleteTopic_t *a = _a, *b = _b;
1847         return strcmp(a->topic, b->topic);
1848 }
1849 
1850 /**
1851  * @brief Allocate a new DeleteTopic and make a copy of \p src
1852  */
1853 static rd_kafka_DeleteTopic_t *
1854 rd_kafka_DeleteTopic_copy (const rd_kafka_DeleteTopic_t *src) {
1855         return rd_kafka_DeleteTopic_new(src->topic);
1856 }
1857 
1858 
1859 
1860 
1861 
1862 
1863 
1864 /**
1865  * @brief Parse DeleteTopicsResponse and create ADMIN_RESULT op.
1866  */
1867 static rd_kafka_resp_err_t
1868 rd_kafka_DeleteTopicsResponse_parse (rd_kafka_op_t *rko_req,
1869                                      rd_kafka_op_t **rko_resultp,
1870                                      rd_kafka_buf_t *reply,
1871                                      char *errstr, size_t errstr_size) {
1872         const int log_decode_errors = LOG_ERR;
1873         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
1874         rd_kafka_t *rk = rkb->rkb_rk;
1875         rd_kafka_op_t *rko_result = NULL;
1876         int32_t topic_cnt;
1877         int i;
1878 
1879         if (rd_kafka_buf_ApiVersion(reply) >= 1) {
1880                 int32_t Throttle_Time;
1881                 rd_kafka_buf_read_i32(reply, &Throttle_Time);
1882                 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
1883         }
1884 
1885         /* #topics */
1886         rd_kafka_buf_read_i32(reply, &topic_cnt);
1887 
1888         if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
1889                 rd_kafka_buf_parse_fail(
1890                         reply,
1891                         "Received %"PRId32" topics in response "
1892                         "when only %d were requested", topic_cnt,
1893                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
1894 
1895         rko_result = rd_kafka_admin_result_new(rko_req);
1896 
1897         rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
1898                      rd_kafka_topic_result_free);
1899 
1900         for (i = 0 ; i < (int)topic_cnt ; i++) {
1901                 rd_kafkap_str_t ktopic;
1902                 int16_t error_code;
1903                 rd_kafka_topic_result_t *terr;
1904                 rd_kafka_NewTopic_t skel;
1905                 int orig_pos;
1906 
1907                 rd_kafka_buf_read_str(reply, &ktopic);
1908                 rd_kafka_buf_read_i16(reply, &error_code);
1909 
1910                 /* For non-blocking DeleteTopicsRequests the broker
1911                  * will returned REQUEST_TIMED_OUT for topics
1912                  * that were triggered for creation -
1913                  * we hide this error code from the application
1914                  * since the topic creation is in fact in progress. */
1915                 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
1916                     rd_kafka_confval_get_int(&rko_req->rko_u.
1917                                              admin_request.options.
1918                                              operation_timeout) <= 0) {
1919                         error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
1920                 }
1921 
1922                 terr = rd_kafka_topic_result_new(ktopic.str,
1923                                                  RD_KAFKAP_STR_LEN(&ktopic),
1924                                                  error_code,
1925                                                  error_code ?
1926                                                  rd_kafka_err2str(error_code) :
1927                                                  NULL);
1928 
1929                 /* As a convenience to the application we insert topic result
1930                  * in the same order as they were requested. The broker
1931                  * does not maintain ordering unfortunately. */
1932                 skel.topic = terr->topic;
1933                 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
1934                                          &skel, rd_kafka_DeleteTopic_cmp);
1935                 if (orig_pos == -1) {
1936                         rd_kafka_topic_result_destroy(terr);
1937                         rd_kafka_buf_parse_fail(
1938                                 reply,
1939                                 "Broker returned topic %.*s that was not "
1940                                 "included in the original request",
1941                                 RD_KAFKAP_STR_PR(&ktopic));
1942                 }
1943 
1944                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
1945                                  orig_pos) != NULL) {
1946                         rd_kafka_topic_result_destroy(terr);
1947                         rd_kafka_buf_parse_fail(
1948                                 reply,
1949                                 "Broker returned topic %.*s multiple times",
1950                                 RD_KAFKAP_STR_PR(&ktopic));
1951                 }
1952 
1953                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
1954                             terr);
1955         }
1956 
1957         *rko_resultp = rko_result;
1958 
1959         return RD_KAFKA_RESP_ERR_NO_ERROR;
1960 
1961  err_parse:
1962         if (rko_result)
1963                 rd_kafka_op_destroy(rko_result);
1964 
1965         rd_snprintf(errstr, errstr_size,
1966                     "DeleteTopics response protocol parse failure: %s",
1967                     rd_kafka_err2str(reply->rkbuf_err));
1968 
1969         return reply->rkbuf_err;
1970 }
1971 
1972 
1973 
1974 
1975 
1976 
/**
 * @brief Asynchronously delete the topics listed in \p del_topics.
 *
 * Deep-copies \p del_topics (caller retains ownership of the array and
 * its elements) and enqueues an admin request op on the main ops queue;
 * the result is emitted as a RD_KAFKA_EVENT_DELETETOPICS_RESULT event
 * on \p rkqu.
 */
void rd_kafka_DeleteTopics (rd_kafka_t *rk,
                            rd_kafka_DeleteTopic_t **del_topics,
                            size_t del_topic_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        rd_kafka_op_t *rko;
        size_t i;
        /* Request-build and response-parse callbacks for the generic
         * admin worker. */
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_DeleteTopicsRequest,
                rd_kafka_DeleteTopicsResponse_parse,
        };

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(rk,
                                            RD_KAFKA_OP_DELETETOPICS,
                                            RD_KAFKA_EVENT_DELETETOPICS_RESULT,
                                            &cbs, options, rkqu->rkqu_q);

        /* The op owns the copied DeleteTopic objects. */
        rd_list_init(&rko->rko_u.admin_request.args, (int)del_topic_cnt,
                     rd_kafka_DeleteTopic_free);

        for (i = 0 ; i < del_topic_cnt ; i++)
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_DeleteTopic_copy(del_topics[i]));

        rd_kafka_q_enq(rk->rk_ops, rko);
}
2005 
2006 
2007 /**
2008  * @brief Get an array of topic results from a DeleteTopics result.
2009  *
2010  * The returned \p topics life-time is the same as the \p result object.
2011  * @param cntp is updated to the number of elements in the array.
2012  */
2013 const rd_kafka_topic_result_t **
2014 rd_kafka_DeleteTopics_result_topics (
2015         const rd_kafka_DeleteTopics_result_t *result,
2016         size_t *cntp) {
2017         return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
2018                                                 cntp);
2019 }
2020 
2021 
2022 
2023 
2024 /**
2025  * @name Create partitions
2026  * @{
2027  *
2028  *
2029  *
2030  *
2031  */
2032 
2033 rd_kafka_NewPartitions_t *rd_kafka_NewPartitions_new (const char *topic,
2034                                                       size_t new_total_cnt,
2035                                                       char *errstr,
2036                                                       size_t errstr_size) {
2037         size_t tsize = strlen(topic) + 1;
2038         rd_kafka_NewPartitions_t *newps;
2039 
2040         if (new_total_cnt < 1 || new_total_cnt > RD_KAFKAP_PARTITIONS_MAX) {
2041                 rd_snprintf(errstr, errstr_size, "new_total_cnt out of "
2042                             "expected range %d..%d",
2043                             1, RD_KAFKAP_PARTITIONS_MAX);
2044                 return NULL;
2045         }
2046 
2047         /* Single allocation */
2048         newps = rd_malloc(sizeof(*newps) + tsize);
2049         newps->total_cnt = new_total_cnt;
2050         newps->topic = newps->data;
2051         memcpy(newps->topic, topic, tsize);
2052 
2053         /* List of int32 lists */
2054         rd_list_init(&newps->replicas, 0, rd_list_destroy_free);
2055         rd_list_prealloc_elems(&newps->replicas, 0, new_total_cnt, 0/*nozero*/);
2056 
2057         return newps;
2058 }
2059 
2060 /**
2061  * @brief Topic name comparator for NewPartitions_t
2062  */
2063 static int rd_kafka_NewPartitions_cmp (const void *_a, const void *_b) {
2064         const rd_kafka_NewPartitions_t *a = _a, *b = _b;
2065         return strcmp(a->topic, b->topic);
2066 }
2067 
2068 
2069 /**
2070  * @brief Allocate a new CreatePartitions and make a copy of \p src
2071  */
2072 static rd_kafka_NewPartitions_t *
2073 rd_kafka_NewPartitions_copy (const rd_kafka_NewPartitions_t *src) {
2074         rd_kafka_NewPartitions_t *dst;
2075 
2076         dst = rd_kafka_NewPartitions_new(src->topic, src->total_cnt, NULL, 0);
2077 
2078         rd_list_destroy(&dst->replicas); /* created in .._new() */
2079         rd_list_init_copy(&dst->replicas, &src->replicas);
2080         rd_list_copy_to(&dst->replicas, &src->replicas,
2081                         rd_list_copy_preallocated, NULL);
2082 
2083         return dst;
2084 }
2085 
/**
 * @brief Destroy a NewPartitions object: frees the replica assignment
 *        list and the object (topic name shares the object allocation).
 */
void rd_kafka_NewPartitions_destroy (rd_kafka_NewPartitions_t *newps) {
        rd_list_destroy(&newps->replicas);
        rd_free(newps);
}
2090 
/**
 * @brief rd_list free_cb trampoline for rd_kafka_NewPartitions_destroy().
 */
static void rd_kafka_NewPartitions_free (void *ptr) {
        rd_kafka_NewPartitions_destroy(ptr);
}
2094 
2095 
2096 void rd_kafka_NewPartitions_destroy_array (rd_kafka_NewPartitions_t **newps,
2097                                          size_t newps_cnt) {
2098         size_t i;
2099         for (i = 0 ; i < newps_cnt ; i++)
2100                 rd_kafka_NewPartitions_destroy(newps[i]);
2101 }
2102 
2103 
2104 
2105 
2106 
/**
 * @brief Set the replica (broker id) assignment for new partition
 *        \p new_partition_idx of \p newp.
 *
 * New partitions must be assigned in strict order starting at index 0.
 *
 * @param broker_ids Array of \p broker_id_cnt broker ids to assign as
 *                   replicas for this new partition (copied).
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR, or RD_KAFKA_RESP_ERR__INVALID_ARG
 *          with a human-readable reason written to \p errstr.
 */
rd_kafka_resp_err_t
rd_kafka_NewPartitions_set_replica_assignment (rd_kafka_NewPartitions_t *newp,
                                               int32_t new_partition_idx,
                                               int32_t *broker_ids,
                                               size_t broker_id_cnt,
                                               char *errstr,
                                               size_t errstr_size) {
        rd_list_t *rl;
        int i;

        /* Replica partitions must be added consecutively starting from 0. */
        if (new_partition_idx != rd_list_cnt(&newp->replicas)) {
                rd_snprintf(errstr, errstr_size,
                            "Partitions must be added in order, "
                            "starting at 0: expecting partition "
                            "index %d, not %"PRId32,
                            rd_list_cnt(&newp->replicas), new_partition_idx);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) {
                rd_snprintf(errstr, errstr_size,
                            "Too many brokers specified "
                            "(RD_KAFKAP_BROKERS_MAX=%d)",
                            RD_KAFKAP_BROKERS_MAX);
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
        }

        /* Copy the broker ids into a new int32 list and append it as
         * this partition's assignment. */
        rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt);

        for (i = 0 ; i < (int)broker_id_cnt ; i++)
                rd_list_set_int32(rl, i, broker_ids[i]);

        rd_list_add(&newp->replicas, rl);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
2144 
2145 
2146 
2147 
2148 /**
2149  * @brief Parse CreatePartitionsResponse and create ADMIN_RESULT op.
2150  */
2151 static rd_kafka_resp_err_t
2152 rd_kafka_CreatePartitionsResponse_parse (rd_kafka_op_t *rko_req,
2153                                          rd_kafka_op_t **rko_resultp,
2154                                          rd_kafka_buf_t *reply,
2155                                          char *errstr,
2156                                          size_t errstr_size) {
2157         const int log_decode_errors = LOG_ERR;
2158         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
2159         rd_kafka_t *rk = rkb->rkb_rk;
2160         rd_kafka_op_t *rko_result = NULL;
2161         int32_t topic_cnt;
2162         int i;
2163         int32_t Throttle_Time;
2164 
2165         rd_kafka_buf_read_i32(reply, &Throttle_Time);
2166         rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
2167 
2168         /* #topics */
2169         rd_kafka_buf_read_i32(reply, &topic_cnt);
2170 
2171         if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
2172                 rd_kafka_buf_parse_fail(
2173                         reply,
2174                         "Received %"PRId32" topics in response "
2175                         "when only %d were requested", topic_cnt,
2176                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
2177 
2178         rko_result = rd_kafka_admin_result_new(rko_req);
2179 
2180         rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
2181                      rd_kafka_topic_result_free);
2182 
2183         for (i = 0 ; i < (int)topic_cnt ; i++) {
2184                 rd_kafkap_str_t ktopic;
2185                 int16_t error_code;
2186                 char *this_errstr = NULL;
2187                 rd_kafka_topic_result_t *terr;
2188                 rd_kafka_NewTopic_t skel;
2189                 rd_kafkap_str_t error_msg;
2190                 int orig_pos;
2191 
2192                 rd_kafka_buf_read_str(reply, &ktopic);
2193                 rd_kafka_buf_read_i16(reply, &error_code);
2194                 rd_kafka_buf_read_str(reply, &error_msg);
2195 
2196                 /* For non-blocking CreatePartitionsRequests the broker
2197                  * will returned REQUEST_TIMED_OUT for topics
2198                  * that were triggered for creation -
2199                  * we hide this error code from the application
2200                  * since the topic creation is in fact in progress. */
2201                 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
2202                     rd_kafka_confval_get_int(&rko_req->rko_u.
2203                                              admin_request.options.
2204                                              operation_timeout) <= 0) {
2205                         error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
2206                 }
2207 
2208                 if (error_code) {
2209                         if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
2210                             RD_KAFKAP_STR_LEN(&error_msg) == 0)
2211                                 this_errstr =
2212                                         (char *)rd_kafka_err2str(error_code);
2213                         else
2214                                 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
2215                 }
2216 
2217                 terr = rd_kafka_topic_result_new(ktopic.str,
2218                                                  RD_KAFKAP_STR_LEN(&ktopic),
2219                                                  error_code,
2220                                                  error_code ?
2221                                                  this_errstr : NULL);
2222 
2223                 /* As a convenience to the application we insert topic result
2224                  * in the same order as they were requested. The broker
2225                  * does not maintain ordering unfortunately. */
2226                 skel.topic = terr->topic;
2227                 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
2228                                          &skel, rd_kafka_NewPartitions_cmp);
2229                 if (orig_pos == -1) {
2230                         rd_kafka_topic_result_destroy(terr);
2231                         rd_kafka_buf_parse_fail(
2232                                 reply,
2233                                 "Broker returned topic %.*s that was not "
2234                                 "included in the original request",
2235                                 RD_KAFKAP_STR_PR(&ktopic));
2236                 }
2237 
2238                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
2239                                  orig_pos) != NULL) {
2240                         rd_kafka_topic_result_destroy(terr);
2241                         rd_kafka_buf_parse_fail(
2242                                 reply,
2243                                 "Broker returned topic %.*s multiple times",
2244                                 RD_KAFKAP_STR_PR(&ktopic));
2245                 }
2246 
2247                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
2248                             terr);
2249         }
2250 
2251         *rko_resultp = rko_result;
2252 
2253         return RD_KAFKA_RESP_ERR_NO_ERROR;
2254 
2255  err_parse:
2256         if (rko_result)
2257                 rd_kafka_op_destroy(rko_result);
2258 
2259         rd_snprintf(errstr, errstr_size,
2260                     "CreatePartitions response protocol parse failure: %s",
2261                     rd_kafka_err2str(reply->rkbuf_err));
2262 
2263         return reply->rkbuf_err;
2264 }
2265 
2266 
2267 
2268 
2269 
2270 
2271 
2272 void rd_kafka_CreatePartitions (rd_kafka_t *rk,
2273                                 rd_kafka_NewPartitions_t **newps,
2274                                 size_t newps_cnt,
2275                                 const rd_kafka_AdminOptions_t *options,
2276                                 rd_kafka_queue_t *rkqu) {
2277         rd_kafka_op_t *rko;
2278         size_t i;
2279         static const struct rd_kafka_admin_worker_cbs cbs = {
2280                 rd_kafka_CreatePartitionsRequest,
2281                 rd_kafka_CreatePartitionsResponse_parse,
2282         };
2283 
2284         rd_assert(rkqu);
2285 
2286         rko = rd_kafka_admin_request_op_new(
2287                 rk,
2288                 RD_KAFKA_OP_CREATEPARTITIONS,
2289                 RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT,
2290                 &cbs, options, rkqu->rkqu_q);
2291 
2292         rd_list_init(&rko->rko_u.admin_request.args, (int)newps_cnt,
2293                      rd_kafka_NewPartitions_free);
2294 
2295         for (i = 0 ; i < newps_cnt ; i++)
2296                 rd_list_add(&rko->rko_u.admin_request.args,
2297                             rd_kafka_NewPartitions_copy(newps[i]));
2298 
2299         rd_kafka_q_enq(rk->rk_ops, rko);
2300 }
2301 
2302 
2303 /**
2304  * @brief Get an array of topic results from a CreatePartitions result.
2305  *
2306  * The returned \p topics life-time is the same as the \p result object.
2307  * @param cntp is updated to the number of elements in the array.
2308  */
2309 const rd_kafka_topic_result_t **
2310 rd_kafka_CreatePartitions_result_topics (
2311         const rd_kafka_CreatePartitions_result_t *result,
2312         size_t *cntp) {
2313         return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
2314                                                 cntp);
2315 }
2316 
2317 /**@}*/
2318 
2319 
2320 
2321 
2322 /**
2323  * @name ConfigEntry
2324  * @{
2325  *
2326  *
2327  *
2328  */
2329 
2330 static void rd_kafka_ConfigEntry_destroy (rd_kafka_ConfigEntry_t *entry) {
2331         rd_strtup_destroy(entry->kv);
2332         rd_list_destroy(&entry->synonyms);
2333         rd_free(entry);
2334 }
2335 
2336 
2337 static void rd_kafka_ConfigEntry_free (void *ptr) {
2338         rd_kafka_ConfigEntry_destroy((rd_kafka_ConfigEntry_t *)ptr);
2339 }
2340 
2341 
2342 /**
2343  * @brief Create new ConfigEntry
2344  *
2345  * @param name Config entry name
2346  * @param name_len Length of name, or -1 to use strlen()
2347  * @param value Config entry value, or NULL
2348  * @param value_len Length of value, or -1 to use strlen()
2349  */
2350 static rd_kafka_ConfigEntry_t *
2351 rd_kafka_ConfigEntry_new0 (const char *name, size_t name_len,
2352                            const char *value, size_t value_len) {
2353         rd_kafka_ConfigEntry_t *entry;
2354 
2355         if (!name)
2356                 return NULL;
2357 
2358         entry = rd_calloc(1, sizeof(*entry));
2359         entry->kv = rd_strtup_new0(name, name_len, value, value_len);
2360 
2361         rd_list_init(&entry->synonyms, 0, rd_kafka_ConfigEntry_free);
2362 
2363         entry->a.source = RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG;
2364 
2365         return entry;
2366 }
2367 
2368 /**
2369  * @sa rd_kafka_ConfigEntry_new0
2370  */
2371 static rd_kafka_ConfigEntry_t *
2372 rd_kafka_ConfigEntry_new (const char *name, const char *value) {
2373         return rd_kafka_ConfigEntry_new0(name, -1, value, -1);
2374 }
2375 
2376 
2377 
2378 
2379 /**
2380  * @brief Allocate a new AlterConfigs and make a copy of \p src
2381  */
2382 static rd_kafka_ConfigEntry_t *
2383 rd_kafka_ConfigEntry_copy (const rd_kafka_ConfigEntry_t *src) {
2384         rd_kafka_ConfigEntry_t *dst;
2385 
2386         dst = rd_kafka_ConfigEntry_new(src->kv->name, src->kv->value);
2387         dst->a = src->a;
2388 
2389         rd_list_destroy(&dst->synonyms); /* created in .._new() */
2390         rd_list_init_copy(&dst->synonyms, &src->synonyms);
2391         rd_list_copy_to(&dst->synonyms, &src->synonyms,
2392                         rd_kafka_ConfigEntry_list_copy, NULL);
2393 
2394         return dst;
2395 }
2396 
2397 static void *rd_kafka_ConfigEntry_list_copy (const void *src, void *opaque) {
2398         return rd_kafka_ConfigEntry_copy((const rd_kafka_ConfigEntry_t *)src);
2399 }
2400 
2401 
2402 const char *rd_kafka_ConfigEntry_name (const rd_kafka_ConfigEntry_t *entry) {
2403         return entry->kv->name;
2404 }
2405 
2406 const char *
2407 rd_kafka_ConfigEntry_value (const rd_kafka_ConfigEntry_t *entry) {
2408         return entry->kv->value;
2409 }
2410 
2411 rd_kafka_ConfigSource_t
2412 rd_kafka_ConfigEntry_source (const rd_kafka_ConfigEntry_t *entry) {
2413         return entry->a.source;
2414 }
2415 
2416 int rd_kafka_ConfigEntry_is_read_only (const rd_kafka_ConfigEntry_t *entry) {
2417         return entry->a.is_readonly;
2418 }
2419 
2420 int rd_kafka_ConfigEntry_is_default (const rd_kafka_ConfigEntry_t *entry) {
2421         return entry->a.is_default;
2422 }
2423 
2424 int rd_kafka_ConfigEntry_is_sensitive (const rd_kafka_ConfigEntry_t *entry) {
2425         return entry->a.is_sensitive;
2426 }
2427 
2428 int rd_kafka_ConfigEntry_is_synonym (const rd_kafka_ConfigEntry_t *entry) {
2429         return entry->a.is_synonym;
2430 }
2431 
2432 const rd_kafka_ConfigEntry_t **
2433 rd_kafka_ConfigEntry_synonyms (const rd_kafka_ConfigEntry_t *entry,
2434                                size_t *cntp) {
2435         *cntp = rd_list_cnt(&entry->synonyms);
2436         if (!*cntp)
2437                 return NULL;
2438         return (const rd_kafka_ConfigEntry_t **)entry->synonyms.rl_elems;
2439 
2440 }
2441 
2442 
2443 /**@}*/
2444 
2445 
2446 
2447 /**
2448  * @name ConfigSource
2449  * @{
2450  *
2451  *
2452  *
2453  */
2454 
2455 const char *
2456 rd_kafka_ConfigSource_name (rd_kafka_ConfigSource_t confsource) {
2457         static const char *names[] = {
2458                 "UNKNOWN_CONFIG",
2459                 "DYNAMIC_TOPIC_CONFIG",
2460                 "DYNAMIC_BROKER_CONFIG",
2461                 "DYNAMIC_DEFAULT_BROKER_CONFIG",
2462                 "STATIC_BROKER_CONFIG",
2463                 "DEFAULT_CONFIG",
2464         };
2465 
2466         if ((unsigned int)confsource >=
2467             (unsigned int)RD_KAFKA_CONFIG_SOURCE__CNT)
2468                 return "UNSUPPORTED";
2469 
2470         return names[confsource];
2471 }
2472 
2473 /**@}*/
2474 
2475 
2476 
2477 /**
2478  * @name ConfigResource
2479  * @{
2480  *
2481  *
2482  *
2483  */
2484 
2485 const char *
2486 rd_kafka_ResourceType_name (rd_kafka_ResourceType_t restype) {
2487         static const char *names[] = {
2488                 "UNKNOWN",
2489                 "ANY",
2490                 "TOPIC",
2491                 "GROUP",
2492                 "BROKER",
2493         };
2494 
2495         if ((unsigned int)restype >=
2496             (unsigned int)RD_KAFKA_RESOURCE__CNT)
2497                 return "UNSUPPORTED";
2498 
2499         return names[restype];
2500 }
2501 
2502 
2503 rd_kafka_ConfigResource_t *
2504 rd_kafka_ConfigResource_new (rd_kafka_ResourceType_t restype,
2505                              const char *resname) {
2506         rd_kafka_ConfigResource_t *config;
2507         size_t namesz = resname ? strlen(resname) : 0;
2508 
2509         if (!namesz || (int)restype < 0)
2510                 return NULL;
2511 
2512         config = rd_calloc(1, sizeof(*config) + namesz + 1);
2513         config->name = config->data;
2514         memcpy(config->name, resname, namesz + 1);
2515         config->restype = restype;
2516 
2517         rd_list_init(&config->config, 8, rd_kafka_ConfigEntry_free);
2518 
2519         return config;
2520 }
2521 
2522 void rd_kafka_ConfigResource_destroy (rd_kafka_ConfigResource_t *config) {
2523         rd_list_destroy(&config->config);
2524         if (config->errstr)
2525                 rd_free(config->errstr);
2526         rd_free(config);
2527 }
2528 
2529 static void rd_kafka_ConfigResource_free (void *ptr) {
2530         rd_kafka_ConfigResource_destroy((rd_kafka_ConfigResource_t *)ptr);
2531 }
2532 
2533 
2534 void rd_kafka_ConfigResource_destroy_array (rd_kafka_ConfigResource_t **config,
2535                                             size_t config_cnt) {
2536         size_t i;
2537         for (i = 0 ; i < config_cnt ; i++)
2538                 rd_kafka_ConfigResource_destroy(config[i]);
2539 }
2540 
2541 
2542 /**
2543  * @brief Type and name comparator for ConfigResource_t
2544  */
2545 static int rd_kafka_ConfigResource_cmp (const void *_a, const void *_b) {
2546         const rd_kafka_ConfigResource_t *a = _a, *b = _b;
2547         int r = RD_CMP(a->restype, b->restype);
2548         if (r)
2549                 return r;
2550         return strcmp(a->name, b->name);
2551 }
2552 
2553 /**
2554  * @brief Allocate a new AlterConfigs and make a copy of \p src
2555  */
2556 static rd_kafka_ConfigResource_t *
2557 rd_kafka_ConfigResource_copy (const rd_kafka_ConfigResource_t *src) {
2558         rd_kafka_ConfigResource_t *dst;
2559 
2560         dst = rd_kafka_ConfigResource_new(src->restype, src->name);
2561 
2562         rd_list_destroy(&dst->config); /* created in .._new() */
2563         rd_list_init_copy(&dst->config, &src->config);
2564         rd_list_copy_to(&dst->config, &src->config,
2565                         rd_kafka_ConfigEntry_list_copy, NULL);
2566 
2567         return dst;
2568 }
2569 
2570 
2571 static void
2572 rd_kafka_ConfigResource_add_ConfigEntry (rd_kafka_ConfigResource_t *config,
2573                                          rd_kafka_ConfigEntry_t *entry) {
2574         rd_list_add(&config->config, entry);
2575 }
2576 
2577 
2578 rd_kafka_resp_err_t
2579 rd_kafka_ConfigResource_add_config (rd_kafka_ConfigResource_t *config,
2580                                     const char *name, const char *value) {
2581         if (!name || !*name || !value)
2582                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2583 
2584         return rd_kafka_admin_add_config0(&config->config, name, value,
2585                                           RD_KAFKA_ALTER_OP_ADD);
2586 }
2587 
2588 rd_kafka_resp_err_t
2589 rd_kafka_ConfigResource_set_config (rd_kafka_ConfigResource_t *config,
2590                                     const char *name, const char *value) {
2591         if (!name || !*name || !value)
2592                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2593 
2594         return rd_kafka_admin_add_config0(&config->config, name, value,
2595                                           RD_KAFKA_ALTER_OP_SET);
2596 }
2597 
2598 rd_kafka_resp_err_t
2599 rd_kafka_ConfigResource_delete_config (rd_kafka_ConfigResource_t *config,
2600                                        const char *name) {
2601         if (!name || !*name)
2602                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2603 
2604         return rd_kafka_admin_add_config0(&config->config, name, NULL,
2605                                           RD_KAFKA_ALTER_OP_DELETE);
2606 }
2607 
2608 
2609 const rd_kafka_ConfigEntry_t **
2610 rd_kafka_ConfigResource_configs (const rd_kafka_ConfigResource_t *config,
2611                                  size_t *cntp) {
2612         *cntp = rd_list_cnt(&config->config);
2613         if (!*cntp)
2614                 return NULL;
2615         return (const rd_kafka_ConfigEntry_t **)config->config.rl_elems;
2616 }
2617 
2618 
2619 
2620 
2621 rd_kafka_ResourceType_t
2622 rd_kafka_ConfigResource_type (const rd_kafka_ConfigResource_t *config) {
2623         return config->restype;
2624 }
2625 
2626 const char *
2627 rd_kafka_ConfigResource_name (const rd_kafka_ConfigResource_t *config) {
2628         return config->name;
2629 }
2630 
2631 rd_kafka_resp_err_t
2632 rd_kafka_ConfigResource_error (const rd_kafka_ConfigResource_t *config) {
2633         return config->err;
2634 }
2635 
2636 const char *
2637 rd_kafka_ConfigResource_error_string (const rd_kafka_ConfigResource_t *config) {
2638         if (!config->err)
2639                 return NULL;
2640         if (config->errstr)
2641                 return config->errstr;
2642         return rd_kafka_err2str(config->err);
2643 }
2644 
2645 
2646 /**
2647  * @brief Look in the provided ConfigResource_t* list for a resource of
2648  *        type BROKER and set its broker id in \p broker_id, returning
2649  *        RD_KAFKA_RESP_ERR_NO_ERROR.
2650  *
2651  *        If multiple BROKER resources are found RD_KAFKA_RESP_ERR__CONFLICT
2652  *        is returned and an error string is written to errstr.
2653  *
2654  *        If no BROKER resources are found RD_KAFKA_RESP_ERR_NO_ERROR
2655  *        is returned and \p broker_idp is set to use the coordinator.
2656  */
2657 static rd_kafka_resp_err_t
2658 rd_kafka_ConfigResource_get_single_broker_id (const rd_list_t *configs,
2659                                               int32_t *broker_idp,
2660                                               char *errstr,
2661                                               size_t errstr_size) {
2662         const rd_kafka_ConfigResource_t *config;
2663         int i;
2664         int32_t broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; /* Some default
2665                                                                * value that we
2666                                                                * can compare
2667                                                                * to below */
2668 
2669         RD_LIST_FOREACH(config, configs, i) {
2670                 char *endptr;
2671                 long int r;
2672 
2673                 if (config->restype != RD_KAFKA_RESOURCE_BROKER)
2674                         continue;
2675 
2676                 if (broker_id != RD_KAFKA_ADMIN_TARGET_CONTROLLER) {
2677                         rd_snprintf(errstr, errstr_size,
2678                                     "Only one ConfigResource of type BROKER "
2679                                     "is allowed per call");
2680                         return RD_KAFKA_RESP_ERR__CONFLICT;
2681                 }
2682 
2683                 /* Convert string broker-id to int32 */
2684                 r = (int32_t)strtol(config->name, &endptr, 10);
2685                 if (r == LONG_MIN || r == LONG_MAX || config->name == endptr ||
2686                     r < 0) {
2687                         rd_snprintf(errstr, errstr_size,
2688                                     "Expected an int32 broker_id for "
2689                                     "ConfigResource(type=BROKER, name=%s)",
2690                                     config->name);
2691                         return RD_KAFKA_RESP_ERR__INVALID_ARG;
2692                 }
2693 
2694                 broker_id = r;
2695 
2696                 /* Keep scanning to make sure there are no duplicate
2697                  * BROKER resources. */
2698         }
2699 
2700         *broker_idp = broker_id;
2701 
2702         return RD_KAFKA_RESP_ERR_NO_ERROR;
2703 }
2704 
2705 
2706 /**@}*/
2707 
2708 
2709 
2710 /**
2711  * @name AlterConfigs
2712  * @{
2713  *
2714  *
2715  *
2716  */
2717 
2718 
2719 
2720 /**
2721  * @brief Parse AlterConfigsResponse and create ADMIN_RESULT op.
2722  */
2723 static rd_kafka_resp_err_t
2724 rd_kafka_AlterConfigsResponse_parse (rd_kafka_op_t *rko_req,
2725                                      rd_kafka_op_t **rko_resultp,
2726                                      rd_kafka_buf_t *reply,
2727                                      char *errstr, size_t errstr_size) {
2728         const int log_decode_errors = LOG_ERR;
2729         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
2730         rd_kafka_t *rk = rkb->rkb_rk;
2731         rd_kafka_op_t *rko_result = NULL;
2732         int32_t res_cnt;
2733         int i;
2734         int32_t Throttle_Time;
2735 
2736         rd_kafka_buf_read_i32(reply, &Throttle_Time);
2737         rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
2738 
2739         rd_kafka_buf_read_i32(reply, &res_cnt);
2740 
2741         if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) {
2742                 rd_snprintf(errstr, errstr_size,
2743                             "Received %"PRId32" ConfigResources in response "
2744                             "when only %d were requested", res_cnt,
2745                             rd_list_cnt(&rko_req->rko_u.admin_request.args));
2746                 return RD_KAFKA_RESP_ERR__BAD_MSG;
2747         }
2748 
2749         rko_result = rd_kafka_admin_result_new(rko_req);
2750 
2751         rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt,
2752                      rd_kafka_ConfigResource_free);
2753 
2754         for (i = 0 ; i < (int)res_cnt ; i++) {
2755                 int16_t error_code;
2756                 rd_kafkap_str_t error_msg;
2757                 int8_t res_type;
2758                 rd_kafkap_str_t kres_name;
2759                 char *res_name;
2760                 char *this_errstr = NULL;
2761                 rd_kafka_ConfigResource_t *config;
2762                 rd_kafka_ConfigResource_t skel;
2763                 int orig_pos;
2764 
2765                 rd_kafka_buf_read_i16(reply, &error_code);
2766                 rd_kafka_buf_read_str(reply, &error_msg);
2767                 rd_kafka_buf_read_i8(reply, &res_type);
2768                 rd_kafka_buf_read_str(reply, &kres_name);
2769                 RD_KAFKAP_STR_DUPA(&res_name, &kres_name);
2770 
2771                 if (error_code) {
2772                         if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
2773                             RD_KAFKAP_STR_LEN(&error_msg) == 0)
2774                                 this_errstr =
2775                                         (char *)rd_kafka_err2str(error_code);
2776                         else
2777                                 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
2778                 }
2779 
2780                 config = rd_kafka_ConfigResource_new(res_type, res_name);
2781                 if (!config) {
2782                         rd_kafka_log(rko_req->rko_rk, LOG_ERR,
2783                                      "ADMIN", "AlterConfigs returned "
2784                                      "unsupported ConfigResource #%d with "
2785                                      "type %d and name \"%s\": ignoring",
2786                                      i, res_type, res_name);
2787                         continue;
2788                 }
2789 
2790                 config->err = error_code;
2791                 if (this_errstr)
2792                         config->errstr = rd_strdup(this_errstr);
2793 
2794                 /* As a convenience to the application we insert result
2795                  * in the same order as they were requested. The broker
2796                  * does not maintain ordering unfortunately. */
2797                 skel.restype = config->restype;
2798                 skel.name = config->name;
2799                 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
2800                                          &skel, rd_kafka_ConfigResource_cmp);
2801                 if (orig_pos == -1) {
2802                         rd_kafka_ConfigResource_destroy(config);
2803                         rd_kafka_buf_parse_fail(
2804                                 reply,
2805                                 "Broker returned ConfigResource %d,%s "
2806                                 "that was not "
2807                                 "included in the original request",
2808                                 res_type, res_name);
2809                 }
2810 
2811                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
2812                                  orig_pos) != NULL) {
2813                         rd_kafka_ConfigResource_destroy(config);
2814                         rd_kafka_buf_parse_fail(
2815                                 reply,
2816                                 "Broker returned ConfigResource %d,%s "
2817                                 "multiple times",
2818                                 res_type, res_name);
2819                 }
2820 
2821                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
2822                             config);
2823         }
2824 
2825         *rko_resultp = rko_result;
2826 
2827         return RD_KAFKA_RESP_ERR_NO_ERROR;
2828 
2829  err_parse:
2830         if (rko_result)
2831                 rd_kafka_op_destroy(rko_result);
2832 
2833         rd_snprintf(errstr, errstr_size,
2834                     "AlterConfigs response protocol parse failure: %s",
2835                     rd_kafka_err2str(reply->rkbuf_err));
2836 
2837         return reply->rkbuf_err;
2838 }
2839 
2840 
2841 
2842 
2843 void rd_kafka_AlterConfigs (rd_kafka_t *rk,
2844                             rd_kafka_ConfigResource_t **configs,
2845                             size_t config_cnt,
2846                             const rd_kafka_AdminOptions_t *options,
2847                             rd_kafka_queue_t *rkqu) {
2848         rd_kafka_op_t *rko;
2849         size_t i;
2850         rd_kafka_resp_err_t err;
2851         char errstr[256];
2852         static const struct rd_kafka_admin_worker_cbs cbs = {
2853                 rd_kafka_AlterConfigsRequest,
2854                 rd_kafka_AlterConfigsResponse_parse,
2855         };
2856 
2857         rd_assert(rkqu);
2858 
2859         rko = rd_kafka_admin_request_op_new(
2860                 rk,
2861                 RD_KAFKA_OP_ALTERCONFIGS,
2862                 RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
2863                 &cbs, options, rkqu->rkqu_q);
2864 
2865         rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt,
2866                      rd_kafka_ConfigResource_free);
2867 
2868         for (i = 0 ; i < config_cnt ; i++)
2869                 rd_list_add(&rko->rko_u.admin_request.args,
2870                             rd_kafka_ConfigResource_copy(configs[i]));
2871 
2872         /* If there's a BROKER resource in the list we need to
2873          * speak directly to that broker rather than the controller.
2874          *
2875          * Multiple BROKER resources are not allowed.
2876          */
2877         err = rd_kafka_ConfigResource_get_single_broker_id(
2878                 &rko->rko_u.admin_request.args,
2879                 &rko->rko_u.admin_request.broker_id,
2880                 errstr, sizeof(errstr));
2881         if (err) {
2882                 rd_kafka_admin_result_fail(rko, err, "%s", errstr);
2883                 rd_kafka_admin_common_worker_destroy(rk, rko,
2884                                                      rd_true/*destroy*/);
2885                 return;
2886         }
2887 
2888         rd_kafka_q_enq(rk->rk_ops, rko);
2889 }
2890 
2891 
2892 const rd_kafka_ConfigResource_t **
2893 rd_kafka_AlterConfigs_result_resources (
2894         const rd_kafka_AlterConfigs_result_t *result,
2895         size_t *cntp) {
2896         return rd_kafka_admin_result_ret_resources(
2897                 (const rd_kafka_op_t *)result, cntp);
2898 }
2899 
2900 /**@}*/
2901 
2902 
2903 
2904 
2905 /**
2906  * @name DescribeConfigs
2907  * @{
2908  *
2909  *
2910  *
2911  */
2912 
2913 
2914 /**
2915  * @brief Parse DescribeConfigsResponse and create ADMIN_RESULT op.
2916  */
2917 static rd_kafka_resp_err_t
2918 rd_kafka_DescribeConfigsResponse_parse (rd_kafka_op_t *rko_req,
2919                                         rd_kafka_op_t **rko_resultp,
2920                                         rd_kafka_buf_t *reply,
2921                                         char *errstr, size_t errstr_size) {
2922         const int log_decode_errors = LOG_ERR;
2923         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
2924         rd_kafka_t *rk = rkb->rkb_rk;
2925         rd_kafka_op_t *rko_result = NULL;
2926         int32_t res_cnt;
2927         int i;
2928         int32_t Throttle_Time;
2929         rd_kafka_ConfigResource_t *config = NULL;
2930         rd_kafka_ConfigEntry_t *entry = NULL;
2931 
2932         rd_kafka_buf_read_i32(reply, &Throttle_Time);
2933         rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
2934 
2935         /* #resources */
2936         rd_kafka_buf_read_i32(reply, &res_cnt);
2937 
2938         if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
2939                 rd_kafka_buf_parse_fail(
2940                         reply,
2941                         "Received %"PRId32" ConfigResources in response "
2942                         "when only %d were requested", res_cnt,
2943                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
2944 
2945         rko_result = rd_kafka_admin_result_new(rko_req);
2946 
2947         rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt,
2948                      rd_kafka_ConfigResource_free);
2949 
2950         for (i = 0 ; i < (int)res_cnt ; i++) {
2951                 int16_t error_code;
2952                 rd_kafkap_str_t error_msg;
2953                 int8_t res_type;
2954                 rd_kafkap_str_t kres_name;
2955                 char *res_name;
2956                 char *this_errstr = NULL;
2957                 rd_kafka_ConfigResource_t skel;
2958                 int orig_pos;
2959                 int32_t entry_cnt;
2960                 int ci;
2961 
2962                 rd_kafka_buf_read_i16(reply, &error_code);
2963                 rd_kafka_buf_read_str(reply, &error_msg);
2964                 rd_kafka_buf_read_i8(reply, &res_type);
2965                 rd_kafka_buf_read_str(reply, &kres_name);
2966                 RD_KAFKAP_STR_DUPA(&res_name, &kres_name);
2967 
2968                 if (error_code) {
2969                         if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
2970                             RD_KAFKAP_STR_LEN(&error_msg) == 0)
2971                                 this_errstr =
2972                                         (char *)rd_kafka_err2str(error_code);
2973                         else
2974                                 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
2975                 }
2976 
2977                 config = rd_kafka_ConfigResource_new(res_type, res_name);
2978                 if (!config) {
2979                         rd_kafka_log(rko_req->rko_rk, LOG_ERR,
2980                                      "ADMIN", "DescribeConfigs returned "
2981                                      "unsupported ConfigResource #%d with "
2982                                      "type %d and name \"%s\": ignoring",
2983                                      i, res_type, res_name);
2984                         continue;
2985                 }
2986 
2987                 config->err = error_code;
2988                 if (this_errstr)
2989                         config->errstr = rd_strdup(this_errstr);
2990 
2991                 /* #config_entries */
2992                 rd_kafka_buf_read_i32(reply, &entry_cnt);
2993 
2994                 for (ci = 0 ; ci < (int)entry_cnt ; ci++) {
2995                         rd_kafkap_str_t config_name, config_value;
2996                         int32_t syn_cnt;
2997                         int si;
2998 
2999                         rd_kafka_buf_read_str(reply, &config_name);
3000                         rd_kafka_buf_read_str(reply, &config_value);
3001 
3002                         entry = rd_kafka_ConfigEntry_new0(
3003                                 config_name.str,
3004                                 RD_KAFKAP_STR_LEN(&config_name),
3005                                 config_value.str,
3006                                 RD_KAFKAP_STR_LEN(&config_value));
3007 
3008                         rd_kafka_buf_read_bool(reply, &entry->a.is_readonly);
3009 
3010                         /* ApiVersion 0 has is_default field, while
3011                          * ApiVersion 1 has source field.
3012                          * Convert between the two so they look the same
3013                          * to the caller. */
3014                         if (rd_kafka_buf_ApiVersion(reply) == 0) {
3015                                 rd_kafka_buf_read_bool(reply,
3016                                                        &entry->a.is_default);
3017                                 if (entry->a.is_default)
3018                                         entry->a.source =
3019                                                 RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG;
3020                         } else {
3021                                 int8_t config_source;
3022                                 rd_kafka_buf_read_i8(reply, &config_source);
3023                                 entry->a.source = config_source;
3024 
3025                                 if (entry->a.source ==
3026                                     RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
3027                                         entry->a.is_default = 1;
3028 
3029                         }
3030 
3031                         rd_kafka_buf_read_bool(reply, &entry->a.is_sensitive);
3032 
3033 
3034                         if (rd_kafka_buf_ApiVersion(reply) == 1) {
3035                                 /* #config_synonyms (ApiVersion 1) */
3036                                 rd_kafka_buf_read_i32(reply, &syn_cnt);
3037 
3038                                 if (syn_cnt > 100000)
3039                                         rd_kafka_buf_parse_fail(
3040                                                 reply,
3041                                                 "Broker returned %"PRId32
3042                                                 " config synonyms for "
3043                                                 "ConfigResource %d,%s: "
3044                                                 "limit is 100000",
3045                                                 syn_cnt,
3046                                                 config->restype,
3047                                                 config->name);
3048 
3049                                 if (syn_cnt > 0)
3050                                         rd_list_grow(&entry->synonyms, syn_cnt);
3051 
3052                         } else {
3053                                 /* No synonyms in ApiVersion 0 */
3054                                 syn_cnt = 0;
3055                         }
3056 
3057 
3058 
3059                         /* Read synonyms (ApiVersion 1) */
3060                         for (si = 0 ; si < (int)syn_cnt ; si++) {
3061                                 rd_kafkap_str_t syn_name, syn_value;
3062                                 int8_t syn_source;
3063                                 rd_kafka_ConfigEntry_t *syn_entry;
3064 
3065                                 rd_kafka_buf_read_str(reply, &syn_name);
3066                                 rd_kafka_buf_read_str(reply, &syn_value);
3067                                 rd_kafka_buf_read_i8(reply, &syn_source);
3068 
3069                                 syn_entry = rd_kafka_ConfigEntry_new0(
3070                                         syn_name.str,
3071                                         RD_KAFKAP_STR_LEN(&syn_name),
3072                                         syn_value.str,
3073                                         RD_KAFKAP_STR_LEN(&syn_value));
3074                                 if (!syn_entry)
3075                                         rd_kafka_buf_parse_fail(
3076                                                 reply,
3077                                                 "Broker returned invalid "
3078                                                 "synonym #%d "
3079                                                 "for ConfigEntry #%d (%s) "
3080                                                 "and ConfigResource %d,%s: "
3081                                                 "syn_name.len %d, "
3082                                                 "syn_value.len %d",
3083                                                 si, ci, entry->kv->name,
3084                                                 config->restype, config->name,
3085                                                 (int)syn_name.len,
3086                                                 (int)syn_value.len);
3087 
3088                                 syn_entry->a.source = syn_source;
3089                                 syn_entry->a.is_synonym = 1;
3090 
3091                                 rd_list_add(&entry->synonyms, syn_entry);
3092                         }
3093 
3094                         rd_kafka_ConfigResource_add_ConfigEntry(
3095                                 config, entry);
3096                         entry = NULL;
3097                 }
3098 
3099                 /* As a convenience to the application we insert result
3100                  * in the same order as they were requested. The broker
3101                  * does not maintain ordering unfortunately. */
3102                 skel.restype = config->restype;
3103                 skel.name = config->name;
3104                 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args,
3105                                          &skel, rd_kafka_ConfigResource_cmp);
3106                 if (orig_pos == -1)
3107                         rd_kafka_buf_parse_fail(
3108                                 reply,
3109                                 "Broker returned ConfigResource %d,%s "
3110                                 "that was not "
3111                                 "included in the original request",
3112                                 res_type, res_name);
3113 
3114                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
3115                                  orig_pos) != NULL)
3116                         rd_kafka_buf_parse_fail(
3117                                 reply,
3118                                 "Broker returned ConfigResource %d,%s "
3119                                 "multiple times",
3120                                 res_type, res_name);
3121 
3122                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
3123                             config);
3124                 config = NULL;
3125         }
3126 
3127         *rko_resultp = rko_result;
3128 
3129         return RD_KAFKA_RESP_ERR_NO_ERROR;
3130 
3131  err_parse:
3132         if (entry)
3133                 rd_kafka_ConfigEntry_destroy(entry);
3134         if (config)
3135                 rd_kafka_ConfigResource_destroy(config);
3136 
3137         if (rko_result)
3138                 rd_kafka_op_destroy(rko_result);
3139 
3140         rd_snprintf(errstr, errstr_size,
3141                     "DescribeConfigs response protocol parse failure: %s",
3142                     rd_kafka_err2str(reply->rkbuf_err));
3143 
3144         return reply->rkbuf_err;
3145 }
3146 
3147 
3148 
3149 void rd_kafka_DescribeConfigs (rd_kafka_t *rk,
3150                                rd_kafka_ConfigResource_t **configs,
3151                                size_t config_cnt,
3152                                const rd_kafka_AdminOptions_t *options,
3153                                rd_kafka_queue_t *rkqu) {
3154         rd_kafka_op_t *rko;
3155         size_t i;
3156         rd_kafka_resp_err_t err;
3157         char errstr[256];
3158         static const struct rd_kafka_admin_worker_cbs cbs = {
3159                 rd_kafka_DescribeConfigsRequest,
3160                 rd_kafka_DescribeConfigsResponse_parse,
3161         };
3162 
3163         rd_assert(rkqu);
3164 
3165         rko = rd_kafka_admin_request_op_new(
3166                 rk,
3167                 RD_KAFKA_OP_DESCRIBECONFIGS,
3168                 RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
3169                 &cbs, options, rkqu->rkqu_q);
3170 
3171         rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt,
3172                      rd_kafka_ConfigResource_free);
3173 
3174         for (i = 0 ; i < config_cnt ; i++)
3175                 rd_list_add(&rko->rko_u.admin_request.args,
3176                             rd_kafka_ConfigResource_copy(configs[i]));
3177 
3178         /* If there's a BROKER resource in the list we need to
3179          * speak directly to that broker rather than the controller.
3180          *
3181          * Multiple BROKER resources are not allowed.
3182          */
3183         err = rd_kafka_ConfigResource_get_single_broker_id(
3184                 &rko->rko_u.admin_request.args,
3185                 &rko->rko_u.admin_request.broker_id,
3186                 errstr, sizeof(errstr));
3187         if (err) {
3188                 rd_kafka_admin_result_fail(rko, err, "%s", errstr);
3189                 rd_kafka_admin_common_worker_destroy(rk, rko,
3190                                                      rd_true/*destroy*/);
3191                 return;
3192         }
3193 
3194         rd_kafka_q_enq(rk->rk_ops, rko);
3195 }
3196 
3197 
3198 
3199 
/**
 * @brief Extract the ConfigResource results from a DescribeConfigs result.
 *
 * @param result DescribeConfigs result event object.
 * @param cntp Set to the number of elements in the returned array.
 *
 * @returns array of resources whose life-time is bound to \p result.
 */
const rd_kafka_ConfigResource_t **
rd_kafka_DescribeConfigs_result_resources (
        const rd_kafka_DescribeConfigs_result_t *result,
        size_t *cntp) {
        return rd_kafka_admin_result_ret_resources(
                (const rd_kafka_op_t *)result, cntp);
}
3207 
3208 /**@}*/
3209 
3210 /**
3211  * @name Delete Records
3212  * @{
3213  *
3214  *
3215  *
3216  *
3217  */
3218 
3219 rd_kafka_DeleteRecords_t *
3220 rd_kafka_DeleteRecords_new (const rd_kafka_topic_partition_list_t *
3221                             before_offsets) {
3222         rd_kafka_DeleteRecords_t *del_records;
3223 
3224         del_records = rd_calloc(1, sizeof(*del_records));
3225         del_records->offsets =
3226                 rd_kafka_topic_partition_list_copy(before_offsets);
3227 
3228         return del_records;
3229 }
3230 
/**
 * @brief Destroy a DeleteRecords object and the offsets list it owns.
 */
void rd_kafka_DeleteRecords_destroy (rd_kafka_DeleteRecords_t *del_records) {
        rd_kafka_topic_partition_list_destroy(del_records->offsets);
        rd_free(del_records);
}
3235 
3236 void rd_kafka_DeleteRecords_destroy_array (rd_kafka_DeleteRecords_t **
3237                                            del_records,
3238                                            size_t del_record_cnt) {
3239         size_t i;
3240         for (i = 0 ; i < del_record_cnt ; i++)
3241                 rd_kafka_DeleteRecords_destroy(del_records[i]);
3242 }
3243 
3244 
3245 
/**
 * @brief Merge the DeleteRecords response from a single broker
 *        into the user response list on the fanout op.
 *
 * For each partition in the broker's partial result, the matching
 * partition in the caller-facing result list is updated with the
 * returned offset and error code. Partitions the caller never
 * requested are logged and ignored.
 */
static void
rd_kafka_DeleteRecords_response_merge (rd_kafka_op_t *rko_fanout,
                                       const rd_kafka_op_t *rko_partial) {
        rd_kafka_t *rk = rko_fanout->rko_rk;
        const rd_kafka_topic_partition_list_t *partitions;
        rd_kafka_topic_partition_list_t *respartitions;
        const rd_kafka_topic_partition_t *partition;

        rd_assert(rko_partial->rko_evtype ==
                  RD_KAFKA_EVENT_DELETERECORDS_RESULT);

        /* Partitions from the DeleteRecordsResponse */
        partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0);

        /* Partitions (offsets) from the DeleteRecords() call */
        respartitions = rd_list_elem(&rko_fanout->rko_u.admin_request.
                                     fanout.results, 0);

        RD_KAFKA_TPLIST_FOREACH(partition, partitions) {
                rd_kafka_topic_partition_t *respart;


                /* Find result partition */
                respart = rd_kafka_topic_partition_list_find(
                        respartitions,
                        partition->topic,
                        partition->partition);
                if (unlikely(!respart)) {
                        /* Should not happen: broker replied with a partition
                         * that was never requested. Log and skip rather
                         * than corrupting the result list. */
                        rd_dassert(!*"partition not found");

                        rd_kafka_log(rk, LOG_WARNING, "DELETERECORDS",
                                     "DeleteRecords response contains "
                                     "unexpected %s [%"PRId32"] which "
                                     "was not in the request list: ignored",
                                     partition->topic, partition->partition);
                        continue;
                }

                /* Propagate the broker's result for this partition. */
                respart->offset = partition->offset;
                respart->err = partition->err;
        }
}
3291 
3292 
3293 
3294 /**
3295  * @brief Parse DeleteRecordsResponse and create ADMIN_RESULT op.
3296  */
3297 static rd_kafka_resp_err_t
3298 rd_kafka_DeleteRecordsResponse_parse (rd_kafka_op_t *rko_req,
3299                                       rd_kafka_op_t **rko_resultp,
3300                                       rd_kafka_buf_t *reply,
3301                                       char *errstr, size_t errstr_size) {
3302         const int log_decode_errors = LOG_ERR;
3303         rd_kafka_op_t *rko_result;
3304         rd_kafka_topic_partition_list_t *offsets;
3305 
3306         rd_kafka_buf_read_throttle_time(reply);
3307 
3308         offsets = rd_kafka_buf_read_topic_partitions(reply, 0,
3309                                                      rd_true/*read_offset*/,
3310                                                      rd_true/*read_part_errs*/);
3311         if (!offsets)
3312                 rd_kafka_buf_parse_fail(reply,
3313                                         "Failed to parse topic partitions");
3314 
3315 
3316         rko_result = rd_kafka_admin_result_new(rko_req);
3317         rd_list_init(&rko_result->rko_u.admin_result.results, 1,
3318                      rd_kafka_topic_partition_list_destroy_free);
3319         rd_list_add(&rko_result->rko_u.admin_result.results, offsets);
3320         *rko_resultp = rko_result;
3321         return RD_KAFKA_RESP_ERR_NO_ERROR;
3322 
3323 err_parse:
3324         rd_snprintf(errstr, errstr_size,
3325                     "DeleteRecords response protocol parse failure: %s",
3326                     rd_kafka_err2str(reply->rkbuf_err));
3327 
3328         return reply->rkbuf_err;
3329 }
3330 
3331 
3332 /**
3333  * @brief Call when leaders have been queried to progress the DeleteRecords
3334  *        admin op to its next phase, sending DeleteRecords to partition
3335  *        leaders.
3336  *
3337  * @param rko Reply op (RD_KAFKA_OP_LEADERS).
3338  */
3339 static rd_kafka_op_res_t
3340 rd_kafka_DeleteRecords_leaders_queried_cb (rd_kafka_t *rk,
3341                                            rd_kafka_q_t *rkq,
3342                                            rd_kafka_op_t *reply) {
3343         rd_kafka_resp_err_t err = reply->rko_err;
3344         const rd_list_t *leaders =
3345                 reply->rko_u.leaders.leaders; /* Possibly NULL (on err) */
3346         rd_kafka_topic_partition_list_t *partitions =
3347                 reply->rko_u.leaders.partitions; /* Possibly NULL (on err) */
3348         rd_kafka_op_t *rko_fanout = reply->rko_u.leaders.opaque;
3349         rd_kafka_topic_partition_t *rktpar;
3350         rd_kafka_topic_partition_list_t *offsets;
3351         const struct rd_kafka_partition_leader *leader;
3352         static const struct rd_kafka_admin_worker_cbs cbs = {
3353                 rd_kafka_DeleteRecordsRequest,
3354                 rd_kafka_DeleteRecordsResponse_parse,
3355         };
3356         int i;
3357 
3358         rd_assert((rko_fanout->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
3359                   RD_KAFKA_OP_ADMIN_FANOUT);
3360 
3361         if (err == RD_KAFKA_RESP_ERR__DESTROY)
3362                 goto err;
3363 
3364         /* Requested offsets */
3365         offsets = rd_list_elem(&rko_fanout->rko_u.admin_request.args, 0);
3366 
3367         /* Update the error field of each partition from the
3368          * leader-queried partition list so that ERR_UNKNOWN_TOPIC_OR_PART
3369          * and similar are propagated, since those partitions are not
3370          * included in the leaders list. */
3371         RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
3372                 rd_kafka_topic_partition_t *rktpar2;
3373 
3374                 if (!rktpar->err)
3375                         continue;
3376 
3377                 rktpar2 = rd_kafka_topic_partition_list_find(
3378                         offsets, rktpar->topic, rktpar->partition);
3379                 rd_assert(rktpar2);
3380                 rktpar2->err = rktpar->err;
3381         }
3382 
3383 
3384         if (err) {
3385         err:
3386                 rd_kafka_admin_result_fail(
3387                         rko_fanout,
3388                         err,
3389                         "Failed to query partition leaders: %s",
3390                         err == RD_KAFKA_RESP_ERR__NOENT ?
3391                         "No leaders found" : rd_kafka_err2str(err));
3392                 rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
3393                                                      rd_true/*destroy*/);
3394                 return RD_KAFKA_OP_RES_HANDLED;
3395         }
3396 
3397         /* The response lists is one element deep and that element is a
3398          * rd_kafka_topic_partition_list_t with the results of the deletes. */
3399         rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, 1,
3400                      rd_kafka_topic_partition_list_destroy_free);
3401         rd_list_add(&rko_fanout->rko_u.admin_request.fanout.results,
3402                     rd_kafka_topic_partition_list_copy(offsets));
3403 
3404         rko_fanout->rko_u.admin_request.fanout.outstanding =
3405                 rd_list_cnt(leaders);
3406 
3407         rd_assert(rd_list_cnt(leaders) > 0);
3408 
3409         /* For each leader send a request for its partitions */
3410         RD_LIST_FOREACH(leader, leaders, i) {
3411                 rd_kafka_op_t *rko =
3412                         rd_kafka_admin_request_op_new(
3413                                 rk,
3414                                 RD_KAFKA_OP_DELETERECORDS,
3415                                 RD_KAFKA_EVENT_DELETERECORDS_RESULT,
3416                                 &cbs, &rko_fanout->rko_u.admin_request.options,
3417                                 rk->rk_ops);
3418                 rko->rko_u.admin_request.fanout_parent = rko_fanout;
3419                 rko->rko_u.admin_request.broker_id = leader->rkb->rkb_nodeid;
3420 
3421                 rd_kafka_topic_partition_list_sort_by_topic(leader->partitions);
3422 
3423                 rd_list_init(&rko->rko_u.admin_request.args, 1,
3424                              rd_kafka_topic_partition_list_destroy_free);
3425                 rd_list_add(&rko->rko_u.admin_request.args,
3426                             rd_kafka_topic_partition_list_copy(
3427                                     leader->partitions));
3428 
3429                 /* Enqueue op for admin_worker() to transition to next state */
3430                 rd_kafka_q_enq(rk->rk_ops, rko);
3431         }
3432 
3433         return RD_KAFKA_OP_RES_HANDLED;
3434 }
3435 
3436 
/**
 * @brief Initiate an async DeleteRecords operation.
 *
 * Validates the input, stores a copy of the requested offsets on a
 * fanout op, and kicks off an async partition-leader query whose
 * callback fans out one request per leader.
 *
 * @param rk Client instance.
 * @param del_records Array of DeleteRecords objects; exactly one element
 *                    is currently supported.
 * @param del_record_cnt Number of elements in \p del_records.
 * @param options Optional admin options, or NULL.
 * @param rkqu Queue to emit the result event on.
 */
void rd_kafka_DeleteRecords (rd_kafka_t *rk,
                             rd_kafka_DeleteRecords_t **del_records,
                             size_t del_record_cnt,
                             const rd_kafka_AdminOptions_t *options,
                             rd_kafka_queue_t *rkqu) {
        rd_kafka_op_t *rko_fanout;
        static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = {
                rd_kafka_DeleteRecords_response_merge,
                rd_kafka_topic_partition_list_copy_opaque,
        };
        const rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_list_t *copied_offsets;

        rd_assert(rkqu);

        rko_fanout = rd_kafka_admin_fanout_op_new(
                rk,
                RD_KAFKA_OP_DELETERECORDS,
                RD_KAFKA_EVENT_DELETERECORDS_RESULT,
                &fanout_cbs, options, rkqu->rkqu_q);

        if (del_record_cnt != 1) {
                /* We only support one DeleteRecords per call since there
                 * is no point in passing multiples, but the API still
                 * needs to be extensible/future-proof. */
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "Exactly one DeleteRecords must be "
                                           "passed");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        offsets = del_records[0]->offsets;

        if (offsets == NULL || offsets->cnt == 0) {
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "No records to delete");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        /* Copy offsets list and store it on the request op */
        copied_offsets = rd_kafka_topic_partition_list_copy(offsets);
        if (rd_kafka_topic_partition_list_has_duplicates(
                    copied_offsets, rd_false/*check partition*/)) {
                rd_kafka_topic_partition_list_destroy(copied_offsets);
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "Duplicate partitions not allowed");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        /* Set default error on each partition so that if any of the partitions
         * never get a request sent we have an error to indicate it. */
        rd_kafka_topic_partition_list_set_err(copied_offsets,
                                              RD_KAFKA_RESP_ERR__NOOP);

        /* The fanout op takes ownership of the copied offsets list. */
        rd_list_init(&rko_fanout->rko_u.admin_request.args, 1,
                     rd_kafka_topic_partition_list_destroy_free);
        rd_list_add(&rko_fanout->rko_u.admin_request.args, copied_offsets);

        /* Async query for partition leaders */
        rd_kafka_topic_partition_list_query_leaders_async(
                rk, copied_offsets,
                rd_kafka_admin_timeout_remains(rko_fanout),
                RD_KAFKA_REPLYQ(rk->rk_ops, 0),
                rd_kafka_DeleteRecords_leaders_queried_cb,
                rko_fanout);
}
3512 
3513 
3514 /**
3515  * @brief Get the list of offsets from a DeleteRecords result.
3516  *
3517  * The returned \p offsets life-time is the same as the \p result object.
3518  */
3519 const rd_kafka_topic_partition_list_t *
3520 rd_kafka_DeleteRecords_result_offsets (
3521         const rd_kafka_DeleteRecords_result_t *result) {
3522         const rd_kafka_topic_partition_list_t *offsets;
3523         const rd_kafka_op_t *rko = (const rd_kafka_op_t *) result;
3524         size_t cnt;
3525 
3526         rd_kafka_op_type_t reqtype =
3527                 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
3528         rd_assert(reqtype == RD_KAFKA_OP_DELETERECORDS);
3529 
3530         cnt = rd_list_cnt(&rko->rko_u.admin_result.results);
3531 
3532         rd_assert(cnt == 1);
3533 
3534         offsets = (const rd_kafka_topic_partition_list_t *)
3535                 rd_list_elem(&rko->rko_u.admin_result.results, 0);
3536 
3537         rd_assert(offsets);
3538 
3539         return offsets;
3540 }
3541 
3542 /**@}*/
3543 
3544 /**
3545  * @name Delete groups
3546  * @{
3547  *
3548  *
3549  *
3550  *
3551  */
3552 
3553 rd_kafka_DeleteGroup_t *rd_kafka_DeleteGroup_new (const char *group) {
3554         size_t tsize = strlen(group) + 1;
3555         rd_kafka_DeleteGroup_t *del_group;
3556 
3557         /* Single allocation */
3558         del_group = rd_malloc(sizeof(*del_group) + tsize);
3559         del_group->group = del_group->data;
3560         memcpy(del_group->group, group, tsize);
3561 
3562         return del_group;
3563 }
3564 
/**
 * @brief Destroy a DeleteGroup object (single allocation, see _new()).
 */
void rd_kafka_DeleteGroup_destroy (rd_kafka_DeleteGroup_t *del_group) {
        rd_free(del_group);
}
3568 
/**
 * @brief void* wrapper for use as an rd_list free callback.
 */
static void rd_kafka_DeleteGroup_free (void *ptr) {
        rd_kafka_DeleteGroup_destroy(ptr);
}
3572 
3573 void rd_kafka_DeleteGroup_destroy_array (rd_kafka_DeleteGroup_t **del_groups,
3574                                          size_t del_group_cnt) {
3575         size_t i;
3576         for (i = 0 ; i < del_group_cnt ; i++)
3577                 rd_kafka_DeleteGroup_destroy(del_groups[i]);
3578 }
3579 
3580 /**
3581  * @brief Group name comparator for DeleteGroup_t
3582  */
3583 static int rd_kafka_DeleteGroup_cmp (const void *_a, const void *_b) {
3584         const rd_kafka_DeleteGroup_t *a = _a, *b = _b;
3585         return strcmp(a->group, b->group);
3586 }
3587 
3588 /**
3589  * @brief Allocate a new DeleteGroup and make a copy of \p src
3590  */
3591 static rd_kafka_DeleteGroup_t *
3592 rd_kafka_DeleteGroup_copy (const rd_kafka_DeleteGroup_t *src) {
3593         return rd_kafka_DeleteGroup_new(src->group);
3594 }
3595 
3596 
3597 /**
3598  * @brief Parse DeleteGroupsResponse and create ADMIN_RESULT op.
3599  */
3600 static rd_kafka_resp_err_t
3601 rd_kafka_DeleteGroupsResponse_parse (rd_kafka_op_t *rko_req,
3602                                      rd_kafka_op_t **rko_resultp,
3603                                      rd_kafka_buf_t *reply,
3604                                      char *errstr, size_t errstr_size) {
3605         const int log_decode_errors = LOG_ERR;
3606         int32_t group_cnt;
3607         int i;
3608         rd_kafka_op_t *rko_result = NULL;
3609 
3610         rd_kafka_buf_read_throttle_time(reply);
3611 
3612         /* #group_error_codes */
3613         rd_kafka_buf_read_i32(reply, &group_cnt);
3614 
3615         if (group_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
3616                 rd_kafka_buf_parse_fail(
3617                         reply,
3618                         "Received %"PRId32" groups in response "
3619                         "when only %d were requested", group_cnt,
3620                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
3621 
3622         rko_result = rd_kafka_admin_result_new(rko_req);
3623         rd_list_init(&rko_result->rko_u.admin_result.results,
3624                      group_cnt,
3625                      rd_kafka_group_result_free);
3626 
3627         for (i = 0 ; i < (int)group_cnt ; i++) {
3628                 rd_kafkap_str_t kgroup;
3629                 int16_t error_code;
3630                 rd_kafka_group_result_t *groupres;
3631 
3632                 rd_kafka_buf_read_str(reply, &kgroup);
3633                 rd_kafka_buf_read_i16(reply, &error_code);
3634 
3635                 groupres = rd_kafka_group_result_new(
3636                         kgroup.str,
3637                         RD_KAFKAP_STR_LEN(&kgroup),
3638                         NULL,
3639                         error_code ?
3640                         rd_kafka_error_new(error_code, NULL) : NULL);
3641 
3642                 rd_list_add(&rko_result->rko_u.admin_result.results, groupres);
3643         }
3644 
3645         *rko_resultp = rko_result;
3646         return RD_KAFKA_RESP_ERR_NO_ERROR;
3647 
3648 err_parse:
3649         if (rko_result)
3650                 rd_kafka_op_destroy(rko_result);
3651 
3652         rd_snprintf(errstr, errstr_size,
3653                     "DeleteGroups response protocol parse failure: %s",
3654                     rd_kafka_err2str(reply->rkbuf_err));
3655 
3656         return reply->rkbuf_err;
3657 }
3658 
/** @brief Merge the DeleteGroups response from a single broker
 *         into the user response list.
 *
 * Each per-group child op carries exactly one group; its result (or a
 * synthesized error result if the op itself failed) is placed at the
 * group's original position in the fanout result list.
 */
void rd_kafka_DeleteGroups_response_merge (rd_kafka_op_t *rko_fanout,
                                           const rd_kafka_op_t *rko_partial) {
        const rd_kafka_group_result_t *groupres = NULL;
        rd_kafka_group_result_t *newgroupres;
        /* The originating DeleteGroup_t was stored as the child op's
         * opaque; it points into rko_fanout's args list. */
        const rd_kafka_DeleteGroup_t *grp =
                rko_partial->rko_u.admin_result.opaque;
        int orig_pos;

        rd_assert(rko_partial->rko_evtype ==
                  RD_KAFKA_EVENT_DELETEGROUPS_RESULT);

        if (!rko_partial->rko_err) {
                /* Proper results.
                 * We only send one group per request, make sure it matches */
                groupres = rd_list_elem(&rko_partial->rko_u.admin_result.
                                        results, 0);
                rd_assert(groupres);
                rd_assert(!strcmp(groupres->group, grp->group));
                newgroupres = rd_kafka_group_result_copy(groupres);
        } else {
                /* Op errored, e.g. timeout */
                newgroupres = rd_kafka_group_result_new(
                        grp->group, -1, NULL,
                        rd_kafka_error_new(rko_partial->rko_err, NULL));
        }

        /* As a convenience to the application we insert group result
         * in the same order as they were requested. */
        orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args,
                                 grp, rd_kafka_DeleteGroup_cmp);
        rd_assert(orig_pos != -1);

        /* Make sure result is not already set */
        rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.
                               fanout.results, orig_pos) == NULL);

        rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results,
                    orig_pos, newgroupres);
}
3701 
3702 void rd_kafka_DeleteGroups (rd_kafka_t *rk,
3703                             rd_kafka_DeleteGroup_t **del_groups,
3704                             size_t del_group_cnt,
3705                             const rd_kafka_AdminOptions_t *options,
3706                             rd_kafka_queue_t *rkqu) {
3707         rd_kafka_op_t *rko_fanout;
3708         rd_list_t dup_list;
3709         size_t i;
3710         static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = {
3711                 rd_kafka_DeleteGroups_response_merge,
3712                 rd_kafka_group_result_copy_opaque,
3713         };
3714 
3715         rd_assert(rkqu);
3716 
3717         rko_fanout = rd_kafka_admin_fanout_op_new(
3718                 rk,
3719                 RD_KAFKA_OP_DELETEGROUPS,
3720                 RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
3721                 &fanout_cbs, options, rkqu->rkqu_q);
3722 
3723         if (del_group_cnt == 0) {
3724                 rd_kafka_admin_result_fail(rko_fanout,
3725                                            RD_KAFKA_RESP_ERR__INVALID_ARG,
3726                                            "No groups to delete");
3727                 rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
3728                                                      rd_true/*destroy*/);
3729                 return;
3730         }
3731 
3732         /* Copy group list and store it on the request op.
3733          * Maintain original ordering. */
3734         rd_list_init(&rko_fanout->rko_u.admin_request.args,
3735                      (int)del_group_cnt,
3736                      rd_kafka_DeleteGroup_free);
3737         for (i = 0; i < del_group_cnt; i++)
3738                 rd_list_add(&rko_fanout->rko_u.admin_request.args,
3739                             rd_kafka_DeleteGroup_copy(del_groups[i]));
3740 
3741         /* Check for duplicates.
3742          * Make a temporary copy of the group list and sort it to check for
3743          * duplicates, we don't want the original list sorted since we want
3744          * to maintain ordering. */
3745         rd_list_init(&dup_list,
3746                      rd_list_cnt(&rko_fanout->rko_u.admin_request.args),
3747                      NULL);
3748         rd_list_copy_to(&dup_list,
3749                         &rko_fanout->rko_u.admin_request.args,
3750                         NULL, NULL);
3751         rd_list_sort(&dup_list, rd_kafka_DeleteGroup_cmp);
3752         if (rd_list_find_duplicate(&dup_list, rd_kafka_DeleteGroup_cmp)) {
3753                 rd_list_destroy(&dup_list);
3754                 rd_kafka_admin_result_fail(rko_fanout,
3755                                            RD_KAFKA_RESP_ERR__INVALID_ARG,
3756                                            "Duplicate groups not allowed");
3757                 rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
3758                                                      rd_true/*destroy*/);
3759                 return;
3760         }
3761 
3762         rd_list_destroy(&dup_list);
3763 
3764         /* Prepare results list where fanned out op's results will be
3765          * accumulated. */
3766         rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results,
3767                      (int)del_group_cnt,
3768                      rd_kafka_group_result_free);
3769         rko_fanout->rko_u.admin_request.fanout.outstanding = (int)del_group_cnt;
3770 
3771         /* Create individual request ops for each group.
3772          * FIXME: A future optimization is to coalesce all groups for a single
3773          *        coordinator into one op. */
3774         for (i = 0; i < del_group_cnt; i++) {
3775                 static const struct rd_kafka_admin_worker_cbs cbs = {
3776                         rd_kafka_DeleteGroupsRequest,
3777                         rd_kafka_DeleteGroupsResponse_parse,
3778                 };
3779                 rd_kafka_DeleteGroup_t *grp = rd_list_elem(
3780                         &rko_fanout->rko_u.admin_request.args, (int)i);
3781                 rd_kafka_op_t *rko =
3782                         rd_kafka_admin_request_op_new(
3783                                 rk,
3784                                 RD_KAFKA_OP_DELETEGROUPS,
3785                                 RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
3786                                 &cbs,
3787                                 options,
3788                                 rk->rk_ops);
3789 
3790                 rko->rko_u.admin_request.fanout_parent = rko_fanout;
3791                 rko->rko_u.admin_request.broker_id =
3792                         RD_KAFKA_ADMIN_TARGET_COORDINATOR;
3793                 rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP;
3794                 rko->rko_u.admin_request.coordkey = rd_strdup(grp->group);
3795 
3796                 /* Set the group name as the opaque so the fanout worker use it
3797                  * to fill in errors.
3798                  * References rko_fanout's memory, which will always outlive
3799                  * the fanned out op. */
3800                 rd_kafka_AdminOptions_set_opaque(
3801                         &rko->rko_u.admin_request.options, grp);
3802 
3803                 rd_list_init(&rko->rko_u.admin_request.args, 1,
3804                              rd_kafka_DeleteGroup_free);
3805                 rd_list_add(&rko->rko_u.admin_request.args,
3806                             rd_kafka_DeleteGroup_copy(del_groups[i]));
3807 
3808                 rd_kafka_q_enq(rk->rk_ops, rko);
3809         }
3810 }
3811 
3812 
3813 /**
3814  * @brief Get an array of group results from a DeleteGroups result.
3815  *
3816  * The returned \p groups life-time is the same as the \p result object.
3817  * @param cntp is updated to the number of elements in the array.
3818  */
3819 const rd_kafka_group_result_t **
3820 rd_kafka_DeleteGroups_result_groups (
3821         const rd_kafka_DeleteGroups_result_t *result,
3822         size_t *cntp) {
3823         return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result,
3824                                                 cntp);
3825 }
3826 
3827 
3828 /**@}*/
3829 
3830 
3831 /**
3832  * @name Delete consumer group offsets (committed offsets)
3833  * @{
3834  *
3835  *
3836  *
3837  *
3838  */
3839 
3840 rd_kafka_DeleteConsumerGroupOffsets_t *
3841 rd_kafka_DeleteConsumerGroupOffsets_new (const char *group,
3842                                          const rd_kafka_topic_partition_list_t
3843                                          *partitions) {
3844         size_t tsize = strlen(group) + 1;
3845         rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets;
3846 
3847         rd_assert(partitions);
3848 
3849         /* Single allocation */
3850         del_grpoffsets = rd_malloc(sizeof(*del_grpoffsets) + tsize);
3851         del_grpoffsets->group = del_grpoffsets->data;
3852         memcpy(del_grpoffsets->group, group, tsize);
3853         del_grpoffsets->partitions =
3854                 rd_kafka_topic_partition_list_copy(partitions);
3855 
3856         return del_grpoffsets;
3857 }
3858 
3859 void rd_kafka_DeleteConsumerGroupOffsets_destroy (
3860         rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets) {
3861         rd_kafka_topic_partition_list_destroy(del_grpoffsets->partitions);
3862         rd_free(del_grpoffsets);
3863 }
3864 
3865 static void rd_kafka_DeleteConsumerGroupOffsets_free (void *ptr) {
3866         rd_kafka_DeleteConsumerGroupOffsets_destroy(ptr);
3867 }
3868 
3869 void rd_kafka_DeleteConsumerGroupOffsets_destroy_array (
3870         rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
3871         size_t del_grpoffsets_cnt) {
3872         size_t i;
3873         for (i = 0 ; i < del_grpoffsets_cnt ; i++)
3874                 rd_kafka_DeleteConsumerGroupOffsets_destroy(del_grpoffsets[i]);
3875 }
3876 
3877 
3878 /**
3879  * @brief Allocate a new DeleteGroup and make a copy of \p src
3880  */
3881 static rd_kafka_DeleteConsumerGroupOffsets_t *
3882 rd_kafka_DeleteConsumerGroupOffsets_copy (
3883         const rd_kafka_DeleteConsumerGroupOffsets_t *src) {
3884         return rd_kafka_DeleteConsumerGroupOffsets_new(src->group,
3885                                                        src->partitions);
3886 }
3887 
3888 
3889 /**
3890  * @brief Parse OffsetDeleteResponse and create ADMIN_RESULT op.
3891  */
3892 static rd_kafka_resp_err_t
3893 rd_kafka_OffsetDeleteResponse_parse (rd_kafka_op_t *rko_req,
3894                                      rd_kafka_op_t **rko_resultp,
3895                                      rd_kafka_buf_t *reply,
3896                                      char *errstr, size_t errstr_size) {
3897         const int log_decode_errors = LOG_ERR;
3898         rd_kafka_op_t *rko_result;
3899         int16_t ErrorCode;
3900         rd_kafka_topic_partition_list_t *partitions = NULL;
3901         const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets =
3902                 rd_list_elem(&rko_req->rko_u.admin_request.args, 0);
3903 
3904         rd_kafka_buf_read_i16(reply, &ErrorCode);
3905         if (ErrorCode) {
3906                 rd_snprintf(errstr, errstr_size,
3907                             "OffsetDelete response error: %s",
3908                             rd_kafka_err2str(ErrorCode));
3909                 return ErrorCode;
3910         }
3911 
3912         rd_kafka_buf_read_throttle_time(reply);
3913 
3914         partitions = rd_kafka_buf_read_topic_partitions(reply,
3915                                                         16,
3916                                                         rd_false/*no offset */,
3917                                                         rd_true/*read error*/);
3918         if (!partitions) {
3919                 rd_snprintf(errstr, errstr_size,
3920                             "Failed to parse OffsetDeleteResponse partitions");
3921                 return RD_KAFKA_RESP_ERR__BAD_MSG;
3922         }
3923 
3924 
3925         /* Create result op and group_result_t */
3926         rko_result = rd_kafka_admin_result_new(rko_req);
3927         rd_list_init(&rko_result->rko_u.admin_result.results, 1,
3928                      rd_kafka_group_result_free);
3929         rd_list_add(&rko_result->rko_u.admin_result.results,
3930                     rd_kafka_group_result_new(del_grpoffsets->group, -1,
3931                                               partitions, NULL));
3932         rd_kafka_topic_partition_list_destroy(partitions);
3933 
3934         *rko_resultp = rko_result;
3935 
3936         return RD_KAFKA_RESP_ERR_NO_ERROR;
3937 
3938  err_parse:
3939         rd_snprintf(errstr, errstr_size,
3940                     "OffsetDelete response protocol parse failure: %s",
3941                     rd_kafka_err2str(reply->rkbuf_err));
3942         return reply->rkbuf_err;
3943 }
3944 
3945 
/**
 * @brief Delete committed offsets for a consumer group
 *        (asynchronous admin operation).
 *
 * @param rk Client instance.
 * @param del_grpoffsets Array of requests; exactly one element is
 *        currently supported, otherwise an _INVALID_ARG result is
 *        enqueued on \p rkqu.
 * @param del_grpoffsets_cnt Number of elements in \p del_grpoffsets.
 * @param options Optional admin options, or NULL for defaults.
 * @param rkqu Queue to emit the DELETECONSUMERGROUPOFFSETS_RESULT
 *        event on (must not be NULL).
 *
 * The request is routed to the group's coordinator broker; the result
 * is parsed by rd_kafka_OffsetDeleteResponse_parse().
 */
void rd_kafka_DeleteConsumerGroupOffsets (
        rd_kafka_t *rk,
        rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
        size_t del_grpoffsets_cnt,
        const rd_kafka_AdminOptions_t *options,
        rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_OffsetDeleteRequest,
                rd_kafka_OffsetDeleteResponse_parse,
        };
        rd_kafka_op_t *rko;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(
                rk,
                RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS,
                RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT,
                &cbs, options, rkqu->rkqu_q);

        if (del_grpoffsets_cnt != 1) {
                /* For simplicity we only support one single group for now */
                rd_kafka_admin_result_fail(rko,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "Exactly one "
                                           "DeleteConsumerGroupOffsets must "
                                           "be passed");
                /* Emits the failed result and destroys the op. */
                rd_kafka_admin_common_worker_destroy(rk, rko,
                                                     rd_true/*destroy*/);
                return;
        }


        /* Route the request to the group coordinator for this group id. */
        rko->rko_u.admin_request.broker_id =
                RD_KAFKA_ADMIN_TARGET_COORDINATOR;
        rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP;
        rko->rko_u.admin_request.coordkey =
                rd_strdup(del_grpoffsets[0]->group);

        /* Store copy of group on request so the group name can be reached
         * from the response parser. */
        rd_list_init(&rko->rko_u.admin_request.args, 1,
                     rd_kafka_DeleteConsumerGroupOffsets_free);
        rd_list_add(&rko->rko_u.admin_request.args,
                    rd_kafka_DeleteConsumerGroupOffsets_copy(
                            del_grpoffsets[0]));

        /* Ownership of rko is passed to the main ops queue. */
        rd_kafka_q_enq(rk->rk_ops, rko);
}
3995 
3996 
3997 /**
3998  * @brief Get an array of group results from a DeleteGroups result.
3999  *
4000  * The returned \p groups life-time is the same as the \p result object.
4001  * @param cntp is updated to the number of elements in the array.
4002  */
4003 const rd_kafka_group_result_t **
4004 rd_kafka_DeleteConsumerGroupOffsets_result_groups (
4005         const rd_kafka_DeleteConsumerGroupOffsets_result_t *result,
4006         size_t *cntp) {
4007         return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result,
4008                                                 cntp);
4009 }
4010 
/* NOTE(review): This RD_EXPORT prototype re-declares
 * rd_kafka_DeleteConsumerGroupOffsets() AFTER its definition above in this
 * same translation unit, which is redundant. Public API prototypes
 * presumably belong in the public header (rdkafka.h) — verify it is
 * declared there and consider removing this duplicate. */
RD_EXPORT
void rd_kafka_DeleteConsumerGroupOffsets (
        rd_kafka_t *rk,
        rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
        size_t del_grpoffsets_cnt,
        const rd_kafka_AdminOptions_t *options,
        rd_kafka_queue_t *rkqu);
4018 
4019 /**@}*/
4020