1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2018 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "rdkafka_int.h"
30 #include "rdkafka_admin.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_aux.h"
33 
34 #include <stdarg.h>
35 
36 
37 
/**
 * @brief Descriptive (human readable) strings for
 *        rko_u.admin_request.state.
 *
 * NOTE(review): presumably indexed by the RD_KAFKA_ADMIN_STATE_.. value,
 *               in which case the order here must match that enum - confirm
 *               against the enum definition.
 */
static const char *rd_kafka_admin_state_desc[] = {
        "initializing",
        "waiting for broker",
        "waiting for controller",
        "waiting for fanouts",
        "constructing request",
        "waiting for response from broker",
};
47 
48 
49 
50 /**
51  * @brief Admin API implementation.
52  *
53  * The public Admin API in librdkafka exposes a completely asynchronous
54  * interface where the initial request API (e.g., ..CreateTopics())
55  * is non-blocking and returns immediately, and the application polls
56  * a ..queue_t for the result.
57  *
58  * The underlying handling of the request is also completely asynchronous
59  * inside librdkafka, for two reasons:
60  *  - everything is async in librdkafka so adding something new that isn't
61  *    would mean that existing functionality will need to be changed if
62  *    it should be able to work simultaneously (such as statistics, timers,
63  *    etc). There is no functional value to making the admin API
64  *    synchronous internally, even if it would simplify its implementation.
65  *    So making it async allows the Admin API to be used with existing
66  *    client types in existing applications without breakage.
67  *  - the async approach allows multiple outstanding Admin API requests
68  *    simultaneously.
69  *
70  * The internal async implementation relies on the following concepts:
71  *  - it uses a single rko (rd_kafka_op_t) to maintain state.
72  *  - the rko has a callback attached - called the worker callback.
73  *  - the worker callback is a small state machine that triggers
74  *    async operations (be it controller lookups, timeout timers,
75  *    protocol transmits, etc).
76  *  - the worker callback is only called on the rdkafka main thread.
77  *  - the callback is triggered by different events and sources by enqueuing
78  *    the rko on the rdkafka main ops queue.
79  *
80  *
81  * Let's illustrate this with a DeleteTopics example. This might look
82  * daunting, but it boils down to an asynchronous state machine being
83  * triggered by enqueuing the rko op.
84  *
85  *  1. [app thread] The user constructs the input arguments,
86  *     including a response rkqu queue and then calls DeleteTopics().
87  *
88  *  2. [app thread] DeleteTopics() creates a new internal op (rko) of type
89  *     RD_KAFKA_OP_DELETETOPICS, makes a **copy** on the rko of all the
90  *     input arguments (which allows the caller to free the originals
91  *     whenever she likes). The rko op worker callback is set to the
92  *     generic admin worker callback rd_kafka_admin_worker()
93  *
94  *  3. [app thread] DeleteTopics() enqueues the rko on librdkafka's main ops
95  *     queue that is served by the rdkafka main thread in rd_kafka_thread_main()
96  *
97  *  4. [rdkafka main thread] The rko is dequeued by rd_kafka_q_serve and
98  *     the rd_kafka_poll_cb() is called.
99  *
100  *  5. [rdkafka main thread] The rko_type switch case identifies the rko
101  *     as an RD_KAFKA_OP_DELETETOPICS which is served by the op callback
102  *     set in step 2.
103  *
104  *  6. [rdkafka main thread] The worker callback is called.
105  *     After some initial checking of err==ERR__DESTROY events
106  *     (which is used to clean up outstanding ops (etc) on termination),
 *     the code hits a state machine using rko_u.admin_request.state.
108  *
109  *  7. [rdkafka main thread] The initial state is RD_KAFKA_ADMIN_STATE_INIT
110  *     where the worker validates the user input.
111  *     An enqueue once (eonce) object is created - the use of this object
112  *     allows having multiple outstanding async functions referencing the
113  *     same underlying rko object, but only allowing the first one
114  *     to trigger an event.
115  *     A timeout timer is set up to trigger the eonce object when the
116  *     full options.request_timeout has elapsed.
117  *
118  *  8. [rdkafka main thread] After initialization the state is updated
119  *     to WAIT_BROKER or WAIT_CONTROLLER and the code falls through to
120  *     looking up a specific broker or the controller broker and waiting for
121  *     an active connection.
122  *     Both the lookup and the waiting for an active connection are
123  *     fully asynchronous, and the same eonce used for the timer is passed
124  *     to the rd_kafka_broker_controller_async() or broker_async() functions
125  *     which will trigger the eonce when a broker state change occurs.
126  *     If the controller is already known (from metadata) and the connection
127  *     is up a rkb broker object is returned and the eonce is not used,
128  *     skip to step 11.
129  *
130  *  9. [rdkafka main thread] Upon metadata retrieval (which is triggered
131  *     automatically by other parts of the code) the controller_id may be
132  *     updated in which case the eonce is triggered.
133  *     The eonce triggering enqueues the original rko on the rdkafka main
134  *     ops queue again and we go to step 8 which will check if the controller
135  *     connection is up.
136  *
137  * 10. [broker thread] If the controller_id is now known we wait for
138  *     the corresponding broker's connection to come up. This signaling
139  *     is performed from the broker thread upon broker state changes
140  *     and uses the same eonce. The eonce triggering enqueues the original
 *     rko on the rdkafka main ops queue again and we go back to step 8
 *     to check if the broker is now available.
143  *
144  * 11. [rdkafka main thread] Back in the worker callback we now have an
145  *     rkb broker pointer (with reference count increased) for the controller
146  *     with the connection up (it might go down while we're referencing it,
147  *     but that does not stop us from enqueuing a protocol request).
148  *
149  * 12. [rdkafka main thread] A DeleteTopics protocol request buffer is
150  *     constructed using the input parameters saved on the rko and the
151  *     buffer is enqueued on the broker's transmit queue.
152  *     The buffer is set up to provide the reply buffer on the rdkafka main
153  *     ops queue (the same queue we are operating from) with a handler
154  *     callback of rd_kafka_admin_handle_response().
155  *     The state is updated to the RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE.
156  *
157  * 13. [broker thread] If the request times out, a response with error code
158  *     (ERR__TIMED_OUT) is enqueued. Go to 16.
159  *
160  * 14. [broker thread] If a response is received, the response buffer
161  *     is enqueued. Go to 16.
162  *
163  * 15. [rdkafka main thread] The buffer callback (..handle_response())
164  *     is called, which attempts to extract the original rko from the eonce,
165  *     but if the eonce has already been triggered by some other source
166  *     (the timeout timer) the buffer callback simply returns and does nothing
167  *     since the admin request is over and a result (probably a timeout)
168  *     has been enqueued for the application.
169  *     If the rko was still intact we temporarily set the reply buffer
170  *     in the rko struct and call the worker callback. Go to 17.
171  *
172  * 16. [rdkafka main thread] The worker callback is called in state
173  *     RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE without a response but with an error.
174  *     An error result op is created and enqueued on the application's
175  *     provided response rkqu queue.
176  *
177  * 17. [rdkafka main thread] The worker callback is called in state
178  *     RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE with a response buffer with no
179  *     error set.
180  *     The worker calls the response `parse()` callback to parse the response
181  *     buffer and populates a result op (rko_result) with the response
182  *     information (such as per-topic error codes, etc).
183  *     The result op is returned to the worker.
184  *
185  * 18. [rdkafka main thread] The worker enqueues the result op (rko_result)
186  *     on the application's provided response rkqu queue.
187  *
188  * 19. [app thread] The application calls rd_kafka_queue_poll() to
189  *     receive the result of the operation. The result may have been
 *     enqueued in step 18 thanks to successful completion, or in any
191  *     of the earlier stages when an error was encountered.
192  *
193  * 20. [app thread] The application uses rd_kafka_event_DeleteTopics_result()
194  *     to retrieve the request-specific result type.
195  *
196  * 21. Done.
197  *
198  *
199  *
200  *
201  * Fanout (RD_KAFKA_OP_ADMIN_FANOUT) requests
202  * ------------------------------------------
203  *
204  * Certain Admin APIs may have requests that need to be sent to different
205  * brokers, for instance DeleteRecords which needs to be sent to the leader
206  * for each given partition.
207  *
208  * To achieve this we create a Fanout (RD_KAFKA_OP_ADMIN_FANOUT) op for the
209  * overall Admin API call (e.g., DeleteRecords), and then sub-ops for each
210  * of the per-broker requests. These sub-ops have the proper op type for
211  * the operation they are performing (e.g., RD_KAFKA_OP_DELETERECORDS)
212  * but their replyq does not point back to the application replyq but
213  * rk_ops which is handled by the librdkafka main thread and with the op
214  * callback set to rd_kafka_admin_fanout_worker(). This worker aggregates
215  * the results of each fanned out sub-op and merges the result into a
216  * single result op (RD_KAFKA_OP_ADMIN_RESULT) that is enqueued on the
217  * application's replyq.
218  *
219  * We rely on the timeouts on the fanned out sub-ops rather than the parent
220  * fanout op.
221  *
222  * The parent fanout op must not be destroyed until all fanned out sub-ops
223  * are done (either by success, failure or timeout) and destroyed, and this
224  * is tracked by the rko_u.admin_request.fanout.outstanding counter.
225  *
226  */
227 
228 
/**
 * @enum Admin request target broker. Must be negative values since the field
 *       used is broker_id.
 */
enum {
        RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1,  /**< Cluster controller */
        RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */
        RD_KAFKA_ADMIN_TARGET_FANOUT = -3,      /**< This rko is a fanout
                                                 *   and has no target broker */
};
239 
/**
 * @brief Admin op callback types
 */

/**
 * Request callback: receives the target broker, the request's argument
 * list, the request options, an error string buffer, and the replyq,
 * response callback and opaque to use for the reply.
 * Returns an rd_kafka_resp_err_t; the result must not be ignored.
 */
typedef rd_kafka_resp_err_t (rd_kafka_admin_Request_cb_t) (
        rd_kafka_broker_t *rkb,
        const rd_list_t *configs /*(ConfigResource_t*)*/,
        rd_kafka_AdminOptions_t *options,
        char *errstr, size_t errstr_size,
        rd_kafka_replyq_t replyq,
        rd_kafka_resp_cb_t *resp_cb,
        void *opaque)
        RD_WARN_UNUSED_RESULT;

/**
 * Response parser callback: receives the request op and the reply buffer
 * and provides a result op through \p rko_resultp, with an error string
 * buffer for failures.
 * Returns an rd_kafka_resp_err_t; the result must not be ignored.
 */
typedef rd_kafka_resp_err_t (rd_kafka_admin_Response_parse_cb_t) (
        rd_kafka_op_t *rko_req,
        rd_kafka_op_t **rko_resultp,
        rd_kafka_buf_t *reply,
        char *errstr, size_t errstr_size)
        RD_WARN_UNUSED_RESULT;

/**
 * Fanout partial response callback: receives the request op and the
 * (read-only) partial result op of one fanned out sub-request.
 */
typedef void (rd_kafka_admin_fanout_PartialResponse_cb_t) (
        rd_kafka_op_t *rko_req,
        const rd_kafka_op_t *rko_partial);

/** Fanout result copy callback, same signature as a list copy callback. */
typedef rd_list_copy_cb_t rd_kafka_admin_fanout_CopyResult_cb_t;
265 
/**
 * @struct Request-specific worker callbacks.
 */
struct rd_kafka_admin_worker_cbs {
        /** Protocol request callback which is called
         *  to construct and send the request. */
        rd_kafka_admin_Request_cb_t *request;

        /** Protocol response parser callback which is called
         *  to translate the response to a rko_result op. */
        rd_kafka_admin_Response_parse_cb_t *parse;
};
278 
/**
 * @struct Fanout request callbacks.
 */
struct rd_kafka_admin_fanout_worker_cbs {
        /** Merge results from a fanned out request into the user response. */
        rd_kafka_admin_fanout_PartialResponse_cb_t *partial_response;

        /** Copy an accumulated result for storing into the rko_result. */
        rd_kafka_admin_fanout_CopyResult_cb_t *copy_result;
};
289 
/* Forward declarations */

/* Common per-request cleanup (timer and eonce), optionally destroying
 * the op itself. */
static void rd_kafka_admin_common_worker_destroy (rd_kafka_t *rk,
                                                  rd_kafka_op_t *rko,
                                                  rd_bool_t do_destroy);
/* AdminOptions initializer, used when the caller passes no options. */
static void rd_kafka_AdminOptions_init (rd_kafka_t *rk,
                                        rd_kafka_AdminOptions_t *options);
/* Generic admin request op callback (the worker). */
static rd_kafka_op_res_t
rd_kafka_admin_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko);
/* ConfigEntry helpers. */
static rd_kafka_ConfigEntry_t *
rd_kafka_ConfigEntry_copy (const rd_kafka_ConfigEntry_t *src);
static void rd_kafka_ConfigEntry_free (void *ptr);
static void *rd_kafka_ConfigEntry_list_copy (const void *src, void *opaque);

/* Response callback passed along with admin protocol requests. */
static void rd_kafka_admin_handle_response (rd_kafka_t *rk,
                                            rd_kafka_broker_t *rkb,
                                            rd_kafka_resp_err_t err,
                                            rd_kafka_buf_t *reply,
                                            rd_kafka_buf_t *request,
                                            void *opaque);

/* Op callback used for result ops of fanned out sub-requests. */
static rd_kafka_op_res_t
rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq,
                              rd_kafka_op_t *rko_fanout);
313 
314 
315 /**
316  * @name Common admin request code
317  * @{
318  *
319  *
320  */
321 
322 /**
323  * @brief Create a new admin_result op based on the request op \p rko_req.
324  *
325  * @remark This moves the rko_req's admin_request.args list from \p rko_req
326  *         to the returned rko. The \p rko_req args will be emptied.
327  */
rd_kafka_admin_result_new(rd_kafka_op_t * rko_req)328 static rd_kafka_op_t *rd_kafka_admin_result_new (rd_kafka_op_t *rko_req) {
329         rd_kafka_op_t *rko_result;
330         rd_kafka_op_t *rko_fanout;
331 
332         if ((rko_fanout = rko_req->rko_u.admin_request.fanout_parent)) {
333                 /* If this is a fanned out request the rko_result needs to be
334                  * handled by the fanout worker rather than the application. */
335                 rko_result = rd_kafka_op_new_cb(
336                         rko_req->rko_rk,
337                         RD_KAFKA_OP_ADMIN_RESULT,
338                         rd_kafka_admin_fanout_worker);
339                 /* Transfer fanout pointer to result */
340                 rko_result->rko_u.admin_result.fanout_parent = rko_fanout;
341                 rko_req->rko_u.admin_request.fanout_parent = NULL;
342                 /* Set event type based on original fanout ops reqtype,
343                  * e.g., ..OP_DELETERECORDS */
344                 rko_result->rko_u.admin_result.reqtype =
345                         rko_fanout->rko_u.admin_request.fanout.reqtype;
346 
347         } else {
348                 rko_result = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_RESULT);
349 
350                 /* If this is fanout request (i.e., the parent OP_ADMIN_FANOUT
351                  * to fanned out requests) we need to use the original
352                  * application request type. */
353                 if (rko_req->rko_type == RD_KAFKA_OP_ADMIN_FANOUT)
354                         rko_result->rko_u.admin_result.reqtype =
355                                 rko_req->rko_u.admin_request.fanout.reqtype;
356                 else
357                         rko_result->rko_u.admin_result.reqtype =
358                                 rko_req->rko_type;
359         }
360 
361         rko_result->rko_rk = rko_req->rko_rk;
362 
363         rko_result->rko_u.admin_result.opaque =
364                 rd_kafka_confval_get_ptr(&rko_req->rko_u.admin_request.
365                                          options.opaque);
366 
367         /* Move request arguments (list) from request to result.
368          * This is mainly so that partial_response() knows what arguments
369          * were provided to the response's request it is merging. */
370         rd_list_move(&rko_result->rko_u.admin_result.args,
371                      &rko_req->rko_u.admin_request.args);
372 
373         rko_result->rko_evtype = rko_req->rko_u.admin_request.reply_event_type;
374 
375         return rko_result;
376 }
377 
378 
379 /**
380  * @brief Set error code and error string on admin_result op \p rko.
381  */
rd_kafka_admin_result_set_err0(rd_kafka_op_t * rko,rd_kafka_resp_err_t err,const char * fmt,va_list ap)382 static void rd_kafka_admin_result_set_err0 (rd_kafka_op_t *rko,
383                                             rd_kafka_resp_err_t err,
384                                             const char *fmt, va_list ap) {
385         char buf[512];
386 
387         rd_vsnprintf(buf, sizeof(buf), fmt, ap);
388 
389         rko->rko_err = err;
390 
391         if (rko->rko_u.admin_result.errstr)
392                 rd_free(rko->rko_u.admin_result.errstr);
393         rko->rko_u.admin_result.errstr = rd_strdup(buf);
394 
395         rd_kafka_dbg(rko->rko_rk, ADMIN, "ADMINFAIL",
396                      "Admin %s result error: %s",
397                      rd_kafka_op2str(rko->rko_u.admin_result.reqtype),
398                      rko->rko_u.admin_result.errstr);
399 }
400 
401 /**
402  * @sa rd_kafka_admin_result_set_err0
403  */
404 static RD_UNUSED RD_FORMAT(printf, 3, 4)
rd_kafka_admin_result_set_err(rd_kafka_op_t * rko,rd_kafka_resp_err_t err,const char * fmt,...)405         void rd_kafka_admin_result_set_err (rd_kafka_op_t *rko,
406                                             rd_kafka_resp_err_t err,
407                                             const char *fmt, ...) {
408         va_list ap;
409 
410         va_start(ap, fmt);
411         rd_kafka_admin_result_set_err0(rko, err, fmt, ap);
412         va_end(ap);
413 }
414 
415 /**
416  * @brief Enqueue admin_result on application's queue.
417  */
418 static RD_INLINE
rd_kafka_admin_result_enq(rd_kafka_op_t * rko_req,rd_kafka_op_t * rko_result)419 void rd_kafka_admin_result_enq (rd_kafka_op_t *rko_req,
420                                 rd_kafka_op_t *rko_result) {
421         rd_kafka_replyq_enq(&rko_req->rko_u.admin_request.replyq,
422                             rko_result,
423                             rko_req->rko_u.admin_request.replyq.version);
424 }
425 
426 /**
427  * @brief Set request-level error code and string in reply op.
428  *
429  * @remark This function will NOT destroy the \p rko_req, so don't forget to
430  *         call rd_kafka_admin_common_worker_destroy() when done with the rko.
431  */
432 static RD_FORMAT(printf, 3, 4)
rd_kafka_admin_result_fail(rd_kafka_op_t * rko_req,rd_kafka_resp_err_t err,const char * fmt,...)433         void rd_kafka_admin_result_fail (rd_kafka_op_t *rko_req,
434                                          rd_kafka_resp_err_t err,
435                                          const char *fmt, ...) {
436         va_list ap;
437         rd_kafka_op_t *rko_result;
438 
439         if (!rko_req->rko_u.admin_request.replyq.q)
440                 return;
441 
442         rko_result = rd_kafka_admin_result_new(rko_req);
443 
444         va_start(ap, fmt);
445         rd_kafka_admin_result_set_err0(rko_result, err, fmt, ap);
446         va_end(ap);
447 
448         rd_kafka_admin_result_enq(rko_req, rko_result);
449 }
450 
451 
452 /**
453  * @brief Send the admin request contained in \p rko upon receiving
454  *        a FindCoordinator response.
455  *
456  * @param opaque Must be an admin request op's eonce (rko_u.admin_request.eonce)
457  *               (i.e. created by \c rd_kafka_admin_request_op_new )
458  *
459  * @remark To be used as a callback for \c rd_kafka_coord_req
460  */
461 static rd_kafka_resp_err_t
rd_kafka_admin_coord_request(rd_kafka_broker_t * rkb,rd_kafka_op_t * rko_ignore,rd_kafka_replyq_t replyq,rd_kafka_resp_cb_t * resp_cb,void * opaque)462 rd_kafka_admin_coord_request (rd_kafka_broker_t *rkb,
463                               rd_kafka_op_t *rko_ignore,
464                               rd_kafka_replyq_t replyq,
465                               rd_kafka_resp_cb_t *resp_cb,
466                               void *opaque) {
467         rd_kafka_t *rk = rkb->rkb_rk;
468         rd_kafka_enq_once_t *eonce = opaque;
469         rd_kafka_op_t *rko;
470         char errstr[512];
471         rd_kafka_resp_err_t err;
472 
473 
474         rko = rd_kafka_enq_once_del_source_return(eonce, "coordinator request");
475         if (!rko)
476                 /* Admin request has timed out and been destroyed */
477                 return RD_KAFKA_RESP_ERR__DESTROY;
478 
479         rd_kafka_enq_once_add_source(eonce, "coordinator response");
480 
481         err = rko->rko_u.admin_request.cbs->request(
482                 rkb,
483                 &rko->rko_u.admin_request.args,
484                 &rko->rko_u.admin_request.options,
485                 errstr, sizeof(errstr),
486                 replyq,
487                 rd_kafka_admin_handle_response,
488                 eonce);
489         if (err) {
490                 rd_kafka_enq_once_del_source(eonce, "coordinator response");
491                 rd_kafka_admin_result_fail(
492                         rko, err,
493                         "%s worker failed to send request: %s",
494                         rd_kafka_op2str(rko->rko_type), errstr);
495                 rd_kafka_admin_common_worker_destroy(rk, rko,
496                                                      rd_true/*destroy*/);
497         }
498         return err;
499 }
500 
501 
502 /**
503  * @brief Return the topics list from a topic-related result object.
504  */
505 static const rd_kafka_topic_result_t **
rd_kafka_admin_result_ret_topics(const rd_kafka_op_t * rko,size_t * cntp)506 rd_kafka_admin_result_ret_topics (const rd_kafka_op_t *rko,
507                                   size_t *cntp) {
508         rd_kafka_op_type_t reqtype =
509                 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
510         rd_assert(reqtype == RD_KAFKA_OP_CREATETOPICS ||
511                   reqtype == RD_KAFKA_OP_DELETETOPICS ||
512                   reqtype == RD_KAFKA_OP_CREATEPARTITIONS);
513 
514         *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
515         return (const rd_kafka_topic_result_t **)rko->rko_u.admin_result.
516                 results.rl_elems;
517 }
518 
519 /**
520  * @brief Return the ConfigResource list from a config-related result object.
521  */
522 static const rd_kafka_ConfigResource_t **
rd_kafka_admin_result_ret_resources(const rd_kafka_op_t * rko,size_t * cntp)523 rd_kafka_admin_result_ret_resources (const rd_kafka_op_t *rko,
524                                      size_t *cntp) {
525         rd_kafka_op_type_t reqtype =
526                 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
527         rd_assert(reqtype == RD_KAFKA_OP_ALTERCONFIGS ||
528                   reqtype == RD_KAFKA_OP_DESCRIBECONFIGS);
529 
530         *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
531         return (const rd_kafka_ConfigResource_t **)rko->rko_u.admin_result.
532                 results.rl_elems;
533 }
534 
535 
536 /**
537  * @brief Return the groups list from a group-related result object.
538  */
539 static const rd_kafka_group_result_t **
rd_kafka_admin_result_ret_groups(const rd_kafka_op_t * rko,size_t * cntp)540 rd_kafka_admin_result_ret_groups (const rd_kafka_op_t *rko,
541                                   size_t *cntp) {
542         rd_kafka_op_type_t reqtype =
543                 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
544         rd_assert(reqtype == RD_KAFKA_OP_DELETEGROUPS ||
545                   reqtype == RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS);
546 
547         *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
548         return (const rd_kafka_group_result_t **)rko->rko_u.admin_result.
549                 results.rl_elems;
550 }
551 
552 /**
553  * @brief Create a new admin_request op of type \p optype and sets up the
554  *        generic (type independent files).
555  *
556  *        The caller shall then populate the admin_request.args list
557  *        and enqueue the op on rk_ops for further processing work.
558  *
559  * @param cbs Callbacks, must reside in .data segment.
560  * @param options Optional options, may be NULL to use defaults.
561  *
562  * @locks none
563  * @locality application thread
564  */
565 static rd_kafka_op_t *
rd_kafka_admin_request_op_new(rd_kafka_t * rk,rd_kafka_op_type_t optype,rd_kafka_event_type_t reply_event_type,const struct rd_kafka_admin_worker_cbs * cbs,const rd_kafka_AdminOptions_t * options,rd_kafka_q_t * rkq)566 rd_kafka_admin_request_op_new (rd_kafka_t *rk,
567                                rd_kafka_op_type_t optype,
568                                rd_kafka_event_type_t reply_event_type,
569                                const struct rd_kafka_admin_worker_cbs *cbs,
570                                const rd_kafka_AdminOptions_t *options,
571                                rd_kafka_q_t *rkq) {
572         rd_kafka_op_t *rko;
573 
574         rd_assert(rk);
575         rd_assert(rkq);
576         rd_assert(cbs);
577 
578         rko = rd_kafka_op_new_cb(rk, optype, rd_kafka_admin_worker);
579 
580         rko->rko_u.admin_request.reply_event_type = reply_event_type;
581 
582         rko->rko_u.admin_request.cbs = (struct rd_kafka_admin_worker_cbs *)cbs;
583 
584         /* Make a copy of the options */
585         if (options)
586                 rko->rko_u.admin_request.options = *options;
587         else
588                 rd_kafka_AdminOptions_init(rk,
589                                            &rko->rko_u.admin_request.options);
590 
591         /* Default to controller */
592         rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER;
593 
594         /* Calculate absolute timeout */
595         rko->rko_u.admin_request.abs_timeout =
596                 rd_timeout_init(
597                         rd_kafka_confval_get_int(&rko->rko_u.admin_request.
598                                                  options.request_timeout));
599 
600         /* Setup enq-op-once, which is triggered by either timer code
601          * or future wait-controller code. */
602         rko->rko_u.admin_request.eonce =
603                 rd_kafka_enq_once_new(rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
604 
605         /* The timer itself must be started from the rdkafka main thread,
606          * not here. */
607 
608         /* Set up replyq */
609         rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0);
610 
611         rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_INIT;
612         return rko;
613 }
614 
615 
616 /**
617  * @returns the remaining request timeout in milliseconds.
618  */
rd_kafka_admin_timeout_remains(rd_kafka_op_t * rko)619 static RD_INLINE int rd_kafka_admin_timeout_remains (rd_kafka_op_t *rko) {
620         return rd_timeout_remains(rko->rko_u.admin_request.abs_timeout);
621 }
622 
623 /**
624  * @returns the remaining request timeout in microseconds.
625  */
626 static RD_INLINE rd_ts_t
rd_kafka_admin_timeout_remains_us(rd_kafka_op_t * rko)627 rd_kafka_admin_timeout_remains_us (rd_kafka_op_t *rko) {
628         return rd_timeout_remains_us(rko->rko_u.admin_request.abs_timeout);
629 }
630 
631 
632 /**
633  * @brief Timer timeout callback for the admin rko's eonce object.
634  */
rd_kafka_admin_eonce_timeout_cb(rd_kafka_timers_t * rkts,void * arg)635 static void rd_kafka_admin_eonce_timeout_cb (rd_kafka_timers_t *rkts,
636                                              void *arg) {
637         rd_kafka_enq_once_t *eonce = arg;
638 
639         rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT,
640                                   "timeout timer");
641 }
642 
643 
644 
645 /**
646  * @brief Common worker destroy to be called in destroy: label
647  *        in worker.
648  */
rd_kafka_admin_common_worker_destroy(rd_kafka_t * rk,rd_kafka_op_t * rko,rd_bool_t do_destroy)649 static void rd_kafka_admin_common_worker_destroy (rd_kafka_t *rk,
650                                                   rd_kafka_op_t *rko,
651                                                   rd_bool_t do_destroy) {
652         int timer_was_stopped;
653 
654         /* Free resources for this op. */
655         timer_was_stopped =
656                 rd_kafka_timer_stop(&rk->rk_timers,
657                                     &rko->rko_u.admin_request.tmr, rd_true);
658 
659 
660         if (rko->rko_u.admin_request.eonce) {
661                 /* Remove the stopped timer's eonce reference since its
662                  * callback will not have fired if we stopped the timer. */
663                 if (timer_was_stopped)
664                         rd_kafka_enq_once_del_source(rko->rko_u.admin_request.
665                                                      eonce, "timeout timer");
666 
667                 /* This is thread-safe to do even if there are outstanding
668                  * timers or wait-controller references to the eonce
669                  * since they only hold direct reference to the eonce,
670                  * not the rko (the eonce holds a reference to the rko but
671                  * it is cleared here). */
672                 rd_kafka_enq_once_destroy(rko->rko_u.admin_request.eonce);
673                 rko->rko_u.admin_request.eonce = NULL;
674         }
675 
676         if (do_destroy)
677                 rd_kafka_op_destroy(rko);
678 }
679 
680 
681 
682 /**
683  * @brief Asynchronously look up a broker.
684  *        To be called repeatedly from each invocation of the worker
685  *        when in state RD_KAFKA_ADMIN_STATE_WAIT_BROKER until
686  *        a valid rkb is returned.
687  *
688  * @returns the broker rkb with refcount increased, or NULL if not yet
689  *          available.
690  */
691 static rd_kafka_broker_t *
rd_kafka_admin_common_get_broker(rd_kafka_t * rk,rd_kafka_op_t * rko,int32_t broker_id)692 rd_kafka_admin_common_get_broker (rd_kafka_t *rk,
693                                   rd_kafka_op_t *rko,
694                                   int32_t broker_id) {
695         rd_kafka_broker_t *rkb;
696 
697         rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up broker %"PRId32,
698                      rd_kafka_op2str(rko->rko_type), broker_id);
699 
700         /* Since we're iterating over this broker_async() call
701          * (asynchronously) until a broker is availabe (or timeout)
702          * we need to re-enable the eonce to be triggered again (which
703          * is not necessary the first time we get here, but there
704          * is no harm doing it then either). */
705         rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce,
706                                    rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
707 
708         /* Look up the broker asynchronously, if the broker
709          * is not available the eonce is registered for broker
710          * state changes which will cause our function to be called
711          * again as soon as (any) broker state changes.
712          * When we are called again we perform the broker lookup
713          * again and hopefully get an rkb back, otherwise defer a new
714          * async wait. Repeat until success or timeout. */
715         if (!(rkb = rd_kafka_broker_get_async(
716                       rk, broker_id, RD_KAFKA_BROKER_STATE_UP,
717                       rko->rko_u.admin_request.eonce))) {
718                 /* Broker not available, wait asynchronously
719                  * for broker metadata code to trigger eonce. */
720                 return NULL;
721         }
722 
723         rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: broker %"PRId32" is %s",
724                      rd_kafka_op2str(rko->rko_type), broker_id, rkb->rkb_name);
725 
726         return rkb;
727 }
728 
729 
730 /**
731  * @brief Asynchronously look up the controller.
732  *        To be called repeatedly from each invocation of the worker
733  *        when in state RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER until
734  *        a valid rkb is returned.
735  *
736  * @returns the controller rkb with refcount increased, or NULL if not yet
737  *          available.
738  */
739 static rd_kafka_broker_t *
rd_kafka_admin_common_get_controller(rd_kafka_t * rk,rd_kafka_op_t * rko)740 rd_kafka_admin_common_get_controller (rd_kafka_t *rk,
741                                       rd_kafka_op_t *rko) {
742         rd_kafka_broker_t *rkb;
743 
744         rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up controller",
745                      rd_kafka_op2str(rko->rko_type));
746 
747         /* Since we're iterating over this controller_async() call
748          * (asynchronously) until a controller is availabe (or timeout)
749          * we need to re-enable the eonce to be triggered again (which
750          * is not necessary the first time we get here, but there
751          * is no harm doing it then either). */
752         rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce,
753                                    rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
754 
755         /* Look up the controller asynchronously, if the controller
756          * is not available the eonce is registered for broker
757          * state changes which will cause our function to be called
758          * again as soon as (any) broker state changes.
759          * When we are called again we perform the controller lookup
760          * again and hopefully get an rkb back, otherwise defer a new
761          * async wait. Repeat until success or timeout. */
762         if (!(rkb = rd_kafka_broker_controller_async(
763                       rk, RD_KAFKA_BROKER_STATE_UP,
764                       rko->rko_u.admin_request.eonce))) {
765                 /* Controller not available, wait asynchronously
766                  * for controller code to trigger eonce. */
767                 return NULL;
768         }
769 
770         rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: controller %s",
771                      rd_kafka_op2str(rko->rko_type), rkb->rkb_name);
772 
773         return rkb;
774 }
775 
776 
777 
778 /**
779  * @brief Handle response from broker by triggering worker callback.
780  *
781  * @param opaque is the eonce from the worker protocol request call.
782  */
rd_kafka_admin_handle_response(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * reply,rd_kafka_buf_t * request,void * opaque)783 static void rd_kafka_admin_handle_response (rd_kafka_t *rk,
784                                             rd_kafka_broker_t *rkb,
785                                             rd_kafka_resp_err_t err,
786                                             rd_kafka_buf_t *reply,
787                                             rd_kafka_buf_t *request,
788                                             void *opaque) {
789         rd_kafka_enq_once_t *eonce = opaque;
790         rd_kafka_op_t *rko;
791 
792         /* From ...add_source("send") */
793         rko = rd_kafka_enq_once_disable(eonce);
794 
795         if (!rko) {
796                 /* The operation timed out and the worker was
797                  * dismantled while we were waiting for broker response,
798                  * do nothing - everything has been cleaned up. */
799                 rd_kafka_dbg(rk, ADMIN, "ADMIN",
800                              "Dropping outdated %sResponse with return code %s",
801                              request ?
802                              rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey):
803                              "???",
804                              rd_kafka_err2str(err));
805                 return;
806         }
807 
808         /* Attach reply buffer to rko for parsing in the worker. */
809         rd_assert(!rko->rko_u.admin_request.reply_buf);
810         rko->rko_u.admin_request.reply_buf = reply;
811         rko->rko_err = err;
812 
813         if (rko->rko_op_cb(rk, NULL, rko) == RD_KAFKA_OP_RES_HANDLED)
814                 rd_kafka_op_destroy(rko);
815 
816 }
817 
818 /**
819  * @brief Generic handler for protocol responses, calls the admin ops'
820  *        Response_parse_cb and enqueues the result to the caller's queue.
821  */
rd_kafka_admin_response_parse(rd_kafka_op_t * rko)822 static void rd_kafka_admin_response_parse (rd_kafka_op_t *rko) {
823         rd_kafka_resp_err_t err;
824         rd_kafka_op_t *rko_result = NULL;
825         char errstr[512];
826 
827         if (rko->rko_err) {
828                 rd_kafka_admin_result_fail(
829                         rko, rko->rko_err,
830                         "%s worker request failed: %s",
831                         rd_kafka_op2str(rko->rko_type),
832                         rd_kafka_err2str(rko->rko_err));
833                 return;
834         }
835 
836         /* Response received.
837          * Let callback parse response and provide result in rko_result
838          * which is then enqueued on the reply queue. */
839         err = rko->rko_u.admin_request.cbs->parse(
840                 rko, &rko_result,
841                 rko->rko_u.admin_request.reply_buf,
842                 errstr, sizeof(errstr));
843         if (err) {
844                 rd_kafka_admin_result_fail(
845                         rko, err,
846                         "%s worker failed to parse response: %s",
847                         rd_kafka_op2str(rko->rko_type), errstr);
848                 return;
849         }
850 
851         rd_assert(rko_result);
852 
853         /* Enqueue result on application queue, we're done. */
854         rd_kafka_admin_result_enq(rko, rko_result);
855 }
856 
857 /**
858  * @brief Generic handler for coord_req() responses.
859  */
860 static void
rd_kafka_admin_coord_response_parse(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)861 rd_kafka_admin_coord_response_parse (rd_kafka_t *rk,
862                                      rd_kafka_broker_t *rkb,
863                                      rd_kafka_resp_err_t err,
864                                      rd_kafka_buf_t *rkbuf,
865                                      rd_kafka_buf_t *request,
866                                      void *opaque) {
867         rd_kafka_op_t *rko_result;
868         rd_kafka_enq_once_t *eonce = opaque;
869         rd_kafka_op_t *rko;
870         char errstr[512];
871 
872         rko = rd_kafka_enq_once_del_source_return(eonce,
873                                                   "coordinator response");
874         if (!rko)
875                 /* Admin request has timed out and been destroyed */
876                 return;
877 
878         if (err) {
879                 rd_kafka_admin_result_fail(
880                         rko, err,
881                         "%s worker coordinator request failed: %s",
882                         rd_kafka_op2str(rko->rko_type),
883                         rd_kafka_err2str(err));
884                 rd_kafka_admin_common_worker_destroy(rk, rko,
885                                                      rd_true/*destroy*/);
886                 return;
887         }
888 
889         err = rko->rko_u.admin_request.cbs->parse(
890                 rko, &rko_result, rkbuf,
891                 errstr, sizeof(errstr));
892         if (err) {
893                 rd_kafka_admin_result_fail(
894                         rko, err,
895                         "%s worker failed to parse coordinator %sResponse: %s",
896                         rd_kafka_op2str(rko->rko_type),
897                         rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
898                         errstr);
899                 rd_kafka_admin_common_worker_destroy(rk, rko,
900                                                      rd_true/*destroy*/);
901                 return;
902         }
903 
904         rd_assert(rko_result);
905 
906         /* Enqueue result on application queue, we're done. */
907         rd_kafka_admin_result_enq(rko, rko_result);
908 }
909 
910 
911 
912 /**
913  * @brief Common worker state machine handling regardless of request type.
914  *
915  * Tasks:
916  *  - Sets up timeout on first call.
917  *  - Checks for timeout.
918  *  - Checks for and fails on errors.
919  *  - Async Controller and broker lookups
920  *  - Calls the Request callback
921  *  - Calls the parse callback
922  *  - Result reply
923  *  - Destruction of rko
924  *
925  * rko->rko_err may be one of:
926  * RD_KAFKA_RESP_ERR_NO_ERROR, or
927  * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup, or
928  * RD_KAFKA_RESP_ERR__TIMED_OUT if request has timed out,
929  * or any other error code triggered by other parts of the code.
930  *
931  * @returns a hint to the op code whether the rko should be destroyed or not.
932  */
933 static rd_kafka_op_res_t
rd_kafka_admin_worker(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)934 rd_kafka_admin_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
935         const char *name = rd_kafka_op2str(rko->rko_type);
936         rd_ts_t timeout_in;
937         rd_kafka_broker_t *rkb = NULL;
938         rd_kafka_resp_err_t err;
939         char errstr[512];
940 
941         /* ADMIN_FANOUT handled by fanout_worker() */
942         rd_assert((rko->rko_type & ~ RD_KAFKA_OP_FLAGMASK) !=
943                   RD_KAFKA_OP_ADMIN_FANOUT);
944 
945         if (rd_kafka_terminating(rk)) {
946                 rd_kafka_dbg(rk, ADMIN, name,
947                              "%s worker called in state %s: "
948                              "handle is terminating: %s",
949                              name,
950                              rd_kafka_admin_state_desc[rko->rko_u.
951                                                        admin_request.state],
952                              rd_kafka_err2str(rko->rko_err));
953                 rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY,
954                                            "Handle is terminating: %s",
955                                            rd_kafka_err2str(rko->rko_err));
956                 goto destroy;
957         }
958 
959         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
960                 rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY,
961                                            "Destroyed");
962                 goto destroy; /* rko being destroyed (silent) */
963         }
964 
965         rd_kafka_dbg(rk, ADMIN, name,
966                      "%s worker called in state %s: %s",
967                      name,
968                      rd_kafka_admin_state_desc[rko->rko_u.admin_request.state],
969                      rd_kafka_err2str(rko->rko_err));
970 
971         rd_assert(thrd_is_current(rko->rko_rk->rk_thread));
972 
973         /* Check for errors raised asynchronously (e.g., by timer) */
974         if (rko->rko_err) {
975                 rd_kafka_admin_result_fail(
976                         rko, rko->rko_err,
977                         "Failed while %s: %s",
978                         rd_kafka_admin_state_desc[rko->rko_u.
979                                                   admin_request.state],
980                         rd_kafka_err2str(rko->rko_err));
981                 goto destroy;
982         }
983 
984         /* Check for timeout */
985         timeout_in = rd_kafka_admin_timeout_remains_us(rko);
986         if (timeout_in <= 0) {
987                 rd_kafka_admin_result_fail(
988                         rko, RD_KAFKA_RESP_ERR__TIMED_OUT,
989                         "Timed out %s",
990                         rd_kafka_admin_state_desc[rko->rko_u.
991                                                   admin_request.state]);
992                 goto destroy;
993         }
994 
995  redo:
996         switch (rko->rko_u.admin_request.state)
997         {
998         case RD_KAFKA_ADMIN_STATE_INIT:
999         {
1000                 int32_t broker_id;
1001 
1002                 /* First call. */
1003 
1004                 /* Set up timeout timer. */
1005                 rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce,
1006                                              "timeout timer");
1007                 rd_kafka_timer_start_oneshot(&rk->rk_timers,
1008                                              &rko->rko_u.admin_request.tmr,
1009                                              rd_true, timeout_in,
1010                                              rd_kafka_admin_eonce_timeout_cb,
1011                                              rko->rko_u.admin_request.eonce);
1012 
1013                 /* Use explicitly specified broker_id, if available. */
1014                 broker_id = (int32_t)rd_kafka_confval_get_int(
1015                         &rko->rko_u.admin_request.options.broker);
1016 
1017                 if (broker_id != -1) {
1018                         rd_kafka_dbg(rk, ADMIN, name,
1019                                      "%s using explicitly "
1020                                      "set broker id %"PRId32
1021                                      " rather than %"PRId32,
1022                                      name, broker_id,
1023                                      rko->rko_u.admin_request.broker_id);
1024                         rko->rko_u.admin_request.broker_id = broker_id;
1025                 } else {
1026                         /* Default to controller */
1027                         broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER;
1028                 }
1029 
1030                 /* Resolve target broker(s) */
1031                 switch (rko->rko_u.admin_request.broker_id)
1032                 {
1033                 case RD_KAFKA_ADMIN_TARGET_CONTROLLER:
1034                         /* Controller */
1035                         rko->rko_u.admin_request.state =
1036                                 RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER;
1037                         goto redo;  /* Trigger next state immediately */
1038 
1039                 case RD_KAFKA_ADMIN_TARGET_COORDINATOR:
1040                         /* Group (or other) coordinator */
1041                         rko->rko_u.admin_request.state =
1042                                 RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE;
1043                         rd_kafka_enq_once_add_source(rko->rko_u.admin_request.
1044                                                      eonce,
1045                                                      "coordinator request");
1046                         rd_kafka_coord_req(rk,
1047                                            rko->rko_u.admin_request.coordtype,
1048                                            rko->rko_u.admin_request.coordkey,
1049                                            rd_kafka_admin_coord_request,
1050                                            NULL,
1051                                            rd_kafka_admin_timeout_remains(rko),
1052                                            RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1053                                            rd_kafka_admin_coord_response_parse,
1054                                            rko->rko_u.admin_request.eonce);
1055                         /* Wait asynchronously for broker response, which will
1056                          * trigger the eonce and worker to be called again. */
1057                         return RD_KAFKA_OP_RES_KEEP;
1058 
1059                 case RD_KAFKA_ADMIN_TARGET_FANOUT:
1060                         /* Shouldn't come here, fanouts are handled by
1061                          * fanout_worker() */
1062                         RD_NOTREACHED();
1063                         return RD_KAFKA_OP_RES_KEEP;
1064 
1065                 default:
1066                         /* Specific broker */
1067                         rd_assert(rko->rko_u.admin_request.broker_id >= 0);
1068                         rko->rko_u.admin_request.state =
1069                                 RD_KAFKA_ADMIN_STATE_WAIT_BROKER;
1070                         goto redo;  /* Trigger next state immediately */
1071                 }
1072         }
1073 
1074 
1075         case RD_KAFKA_ADMIN_STATE_WAIT_BROKER:
1076                 /* Broker lookup */
1077                 if (!(rkb = rd_kafka_admin_common_get_broker(
1078                               rk, rko, rko->rko_u.admin_request.broker_id))) {
1079                         /* Still waiting for broker to become available */
1080                         return RD_KAFKA_OP_RES_KEEP;
1081                 }
1082 
1083                 rko->rko_u.admin_request.state =
1084                         RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST;
1085                 goto redo;
1086 
1087         case RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER:
1088                 if (!(rkb = rd_kafka_admin_common_get_controller(rk, rko))) {
1089                         /* Still waiting for controller to become available. */
1090                         return RD_KAFKA_OP_RES_KEEP;
1091                 }
1092 
1093                 rko->rko_u.admin_request.state =
1094                         RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST;
1095                 goto redo;
1096 
1097         case RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS:
1098                 /* This state is only used by ADMIN_FANOUT which has
1099                  * its own fanout_worker() */
1100                 RD_NOTREACHED();
1101                 break;
1102 
1103         case RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST:
1104                 /* Got broker, send protocol request. */
1105 
1106                 /* Make sure we're called from a 'goto redo' where
1107                  * the rkb was set. */
1108                 rd_assert(rkb);
1109 
1110                 /* Still need to use the eonce since this worker may
1111                  * time out while waiting for response from broker, in which
1112                  * case the broker response will hit an empty eonce (ok). */
1113                 rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce,
1114                                              "send");
1115 
1116                 /* Send request (async) */
1117                 err = rko->rko_u.admin_request.cbs->request(
1118                         rkb,
1119                         &rko->rko_u.admin_request.args,
1120                         &rko->rko_u.admin_request.options,
1121                         errstr, sizeof(errstr),
1122                         RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1123                         rd_kafka_admin_handle_response,
1124                         rko->rko_u.admin_request.eonce);
1125 
1126                 /* Loose broker refcount from get_broker(), get_controller() */
1127                 rd_kafka_broker_destroy(rkb);
1128 
1129                 if (err) {
1130                         rd_kafka_enq_once_del_source(
1131                                 rko->rko_u.admin_request.eonce, "send");
1132                         rd_kafka_admin_result_fail(rko, err, "%s", errstr);
1133                         goto destroy;
1134                 }
1135 
1136                 rko->rko_u.admin_request.state =
1137                         RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE;
1138 
1139                 /* Wait asynchronously for broker response, which will
1140                  * trigger the eonce and worker to be called again. */
1141                 return RD_KAFKA_OP_RES_KEEP;
1142 
1143 
1144         case RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE:
1145                 rd_kafka_admin_response_parse(rko);
1146                 goto destroy;
1147         }
1148 
1149         return RD_KAFKA_OP_RES_KEEP;
1150 
1151  destroy:
1152         rd_kafka_admin_common_worker_destroy(rk, rko,
1153                                              rd_false/*don't destroy*/);
1154         return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy() */
1155 
1156 }
1157 
1158 
1159 /**
1160  * @brief Create a new admin_fanout op of type \p req_type and sets up the
1161  *        generic (type independent files).
1162  *
1163  *        The caller shall then populate the \c admin_fanout.requests list,
1164  *        initialize the \c admin_fanout.responses list,
1165  *        set the initial \c admin_fanout.outstanding value,
1166  *        and enqueue the op on rk_ops for further processing work.
1167  *
1168  * @param cbs Callbacks, must reside in .data segment.
1169  * @param options Optional options, may be NULL to use defaults.
1170  * @param rkq is the application reply queue.
1171  *
1172  * @locks none
1173  * @locality application thread
1174  */
1175 static rd_kafka_op_t *
rd_kafka_admin_fanout_op_new(rd_kafka_t * rk,rd_kafka_op_type_t req_type,rd_kafka_event_type_t reply_event_type,const struct rd_kafka_admin_fanout_worker_cbs * cbs,const rd_kafka_AdminOptions_t * options,rd_kafka_q_t * rkq)1176 rd_kafka_admin_fanout_op_new (rd_kafka_t *rk,
1177                               rd_kafka_op_type_t req_type,
1178                               rd_kafka_event_type_t reply_event_type,
1179                               const struct rd_kafka_admin_fanout_worker_cbs
1180                               *cbs,
1181                               const rd_kafka_AdminOptions_t *options,
1182                               rd_kafka_q_t *rkq) {
1183         rd_kafka_op_t *rko;
1184 
1185         rd_assert(rk);
1186         rd_assert(rkq);
1187         rd_assert(cbs);
1188 
1189         rko = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_FANOUT);
1190         rko->rko_rk = rk;
1191 
1192         rko->rko_u.admin_request.reply_event_type = reply_event_type;
1193 
1194         rko->rko_u.admin_request.fanout.cbs =
1195                 (struct rd_kafka_admin_fanout_worker_cbs *)cbs;
1196 
1197         /* Make a copy of the options */
1198         if (options)
1199                 rko->rko_u.admin_request.options = *options;
1200         else
1201                 rd_kafka_AdminOptions_init(rk,
1202                                            &rko->rko_u.admin_request.options);
1203 
1204         rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_FANOUT;
1205 
1206         /* Calculate absolute timeout */
1207         rko->rko_u.admin_request.abs_timeout =
1208                 rd_timeout_init(
1209                         rd_kafka_confval_get_int(&rko->rko_u.admin_request.
1210                                                  options.request_timeout));
1211 
1212         /* Set up replyq */
1213         rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0);
1214 
1215         rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS;
1216 
1217         rko->rko_u.admin_request.fanout.reqtype = req_type;
1218 
1219         return rko;
1220 }
1221 
1222 
1223 /**
1224  * @brief Common fanout worker state machine handling regardless of request type
1225  *
1226  * @param rko Result of a fanned out operation, e.g., DELETERECORDS result.
1227  *
1228  * Tasks:
1229  *  - Checks for and responds to client termination
1230  *  - Polls for fanned out responses
1231  *  - Calls the partial response callback
1232  *  - Calls the merge responses callback upon receipt of all partial responses
1233  *  - Destruction of rko
1234  *
1235  * rko->rko_err may be one of:
1236  * RD_KAFKA_RESP_ERR_NO_ERROR, or
1237  * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup.
1238  *
1239  * @returns a hint to the op code whether the rko should be destroyed or not.
1240  */
1241 static rd_kafka_op_res_t
rd_kafka_admin_fanout_worker(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1242 rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq,
1243                               rd_kafka_op_t *rko) {
1244         rd_kafka_op_t *rko_fanout = rko->rko_u.admin_result.fanout_parent;
1245         const char *name = rd_kafka_op2str(rko_fanout->rko_u.admin_request.
1246                                            fanout.reqtype);
1247         rd_kafka_op_t *rko_result;
1248 
1249         RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_ADMIN_RESULT);
1250         RD_KAFKA_OP_TYPE_ASSERT(rko_fanout, RD_KAFKA_OP_ADMIN_FANOUT);
1251 
1252         rd_assert(rko_fanout->rko_u.admin_request.fanout.outstanding > 0);
1253         rko_fanout->rko_u.admin_request.fanout.outstanding--;
1254 
1255         rko->rko_u.admin_result.fanout_parent = NULL;
1256 
1257         if (rd_kafka_terminating(rk)) {
1258                 rd_kafka_dbg(rk, ADMIN, name,
1259                              "%s fanout worker called for fanned out op %s: "
1260                              "handle is terminating: %s",
1261                              name,
1262                              rd_kafka_op2str(rko->rko_type),
1263                              rd_kafka_err2str(rko_fanout->rko_err));
1264                 if (!rko->rko_err)
1265                         rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
1266         }
1267 
1268         rd_kafka_dbg(rk, ADMIN, name,
1269                      "%s fanout worker called for %s with %d request(s) "
1270                      "outstanding: %s",
1271                      name,
1272                      rd_kafka_op2str(rko->rko_type),
1273                      rko_fanout->rko_u.admin_request.fanout.outstanding,
1274                      rd_kafka_err2str(rko_fanout->rko_err));
1275 
1276         /* Add partial response to rko_fanout's result list. */
1277         rko_fanout->rko_u.admin_request.
1278                 fanout.cbs->partial_response(rko_fanout, rko);
1279 
1280         if (rko_fanout->rko_u.admin_request.fanout.outstanding > 0)
1281                 /* Wait for outstanding requests to finish */
1282                 return RD_KAFKA_OP_RES_HANDLED;
1283 
1284         rko_result = rd_kafka_admin_result_new(rko_fanout);
1285         rd_list_init_copy(&rko_result->rko_u.admin_result.results,
1286                           &rko_fanout->rko_u.admin_request.fanout.results);
1287         rd_list_copy_to(&rko_result->rko_u.admin_result.results,
1288                         &rko_fanout->rko_u.admin_request.fanout.results,
1289                         rko_fanout->rko_u.admin_request.
1290                         fanout.cbs->copy_result, NULL);
1291 
1292         /* Enqueue result on application queue, we're done. */
1293         rd_kafka_replyq_enq(&rko_fanout->rko_u.admin_request.replyq, rko_result,
1294                             rko_fanout->rko_u.admin_request.replyq.version);
1295 
1296         /* FALLTHRU */
1297         if (rko_fanout->rko_u.admin_request.fanout.outstanding == 0)
1298                 rd_kafka_op_destroy(rko_fanout);
1299 
1300         return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy(rko) */
1301 }
1302 
1303 /**@}*/
1304 
1305 
1306 /**
1307  * @name Generic AdminOptions
1308  * @{
1309  *
1310  *
1311  */
1312 
1313 rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_request_timeout(rd_kafka_AdminOptions_t * options,int timeout_ms,char * errstr,size_t errstr_size)1314 rd_kafka_AdminOptions_set_request_timeout (rd_kafka_AdminOptions_t *options,
1315                                            int timeout_ms,
1316                                            char *errstr, size_t errstr_size) {
1317         return rd_kafka_confval_set_type(&options->request_timeout,
1318                                          RD_KAFKA_CONFVAL_INT, &timeout_ms,
1319                                          errstr, errstr_size);
1320 }
1321 
1322 
1323 rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_operation_timeout(rd_kafka_AdminOptions_t * options,int timeout_ms,char * errstr,size_t errstr_size)1324 rd_kafka_AdminOptions_set_operation_timeout (rd_kafka_AdminOptions_t *options,
1325                                              int timeout_ms,
1326                                              char *errstr, size_t errstr_size) {
1327         return rd_kafka_confval_set_type(&options->operation_timeout,
1328                                          RD_KAFKA_CONFVAL_INT, &timeout_ms,
1329                                          errstr, errstr_size);
1330 }
1331 
1332 
1333 rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_validate_only(rd_kafka_AdminOptions_t * options,int true_or_false,char * errstr,size_t errstr_size)1334 rd_kafka_AdminOptions_set_validate_only (rd_kafka_AdminOptions_t *options,
1335                                         int true_or_false,
1336                                         char *errstr, size_t errstr_size) {
1337         return rd_kafka_confval_set_type(&options->validate_only,
1338                                          RD_KAFKA_CONFVAL_INT, &true_or_false,
1339                                          errstr, errstr_size);
1340 }
1341 
1342 rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_incremental(rd_kafka_AdminOptions_t * options,int true_or_false,char * errstr,size_t errstr_size)1343 rd_kafka_AdminOptions_set_incremental (rd_kafka_AdminOptions_t *options,
1344                                        int true_or_false,
1345                                        char *errstr, size_t errstr_size) {
1346         rd_snprintf(errstr, errstr_size,
1347                     "Incremental updates currently not supported, see KIP-248");
1348         return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
1349 
1350         return rd_kafka_confval_set_type(&options->incremental,
1351                                          RD_KAFKA_CONFVAL_INT, &true_or_false,
1352                                          errstr, errstr_size);
1353 }
1354 
1355 rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_broker(rd_kafka_AdminOptions_t * options,int32_t broker_id,char * errstr,size_t errstr_size)1356 rd_kafka_AdminOptions_set_broker (rd_kafka_AdminOptions_t *options,
1357                                   int32_t broker_id,
1358                                   char *errstr, size_t errstr_size) {
1359         int ibroker_id = (int)broker_id;
1360 
1361         return rd_kafka_confval_set_type(&options->broker,
1362                                          RD_KAFKA_CONFVAL_INT,
1363                                          &ibroker_id,
1364                                          errstr, errstr_size);
1365 }
1366 
1367 void
rd_kafka_AdminOptions_set_opaque(rd_kafka_AdminOptions_t * options,void * opaque)1368 rd_kafka_AdminOptions_set_opaque (rd_kafka_AdminOptions_t *options,
1369                                   void *opaque) {
1370         rd_kafka_confval_set_type(&options->opaque,
1371                                   RD_KAFKA_CONFVAL_PTR, opaque, NULL, 0);
1372 }
1373 
1374 
1375 /**
1376  * @brief Initialize and set up defaults for AdminOptions
1377  */
rd_kafka_AdminOptions_init(rd_kafka_t * rk,rd_kafka_AdminOptions_t * options)1378 static void rd_kafka_AdminOptions_init (rd_kafka_t *rk,
1379                                         rd_kafka_AdminOptions_t *options) {
1380         rd_kafka_confval_init_int(&options->request_timeout, "request_timeout",
1381                                   0, 3600*1000,
1382                                   rk->rk_conf.admin.request_timeout_ms);
1383 
1384         if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
1385             options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS ||
1386             options->for_api == RD_KAFKA_ADMIN_OP_DELETETOPICS ||
1387             options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS ||
1388             options->for_api == RD_KAFKA_ADMIN_OP_DELETERECORDS)
1389                 rd_kafka_confval_init_int(&options->operation_timeout,
1390                                           "operation_timeout",
1391                                           -1, 3600*1000,
1392                                           rk->rk_conf.admin.request_timeout_ms);
1393         else
1394                 rd_kafka_confval_disable(&options->operation_timeout,
1395                                          "operation_timeout");
1396 
1397         if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
1398             options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS ||
1399             options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS ||
1400             options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS)
1401                 rd_kafka_confval_init_int(&options->validate_only,
1402                                           "validate_only",
1403                                           0, 1, 0);
1404         else
1405                 rd_kafka_confval_disable(&options->validate_only,
1406                                          "validate_only");
1407 
1408         if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
1409             options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS)
1410                 rd_kafka_confval_init_int(&options->incremental,
1411                                           "incremental",
1412                                           0, 1, 0);
1413         else
1414                 rd_kafka_confval_disable(&options->incremental,
1415                                          "incremental");
1416 
1417         rd_kafka_confval_init_int(&options->broker, "broker",
1418                                   0, INT32_MAX, -1);
1419         rd_kafka_confval_init_ptr(&options->opaque, "opaque");
1420 }
1421 
1422 
1423 rd_kafka_AdminOptions_t *
rd_kafka_AdminOptions_new(rd_kafka_t * rk,rd_kafka_admin_op_t for_api)1424 rd_kafka_AdminOptions_new (rd_kafka_t *rk, rd_kafka_admin_op_t for_api) {
1425         rd_kafka_AdminOptions_t *options;
1426 
1427         if ((int)for_api < 0 || for_api >= RD_KAFKA_ADMIN_OP__CNT)
1428                 return NULL;
1429 
1430         options = rd_calloc(1, sizeof(*options));
1431 
1432         options->for_api = for_api;
1433 
1434         rd_kafka_AdminOptions_init(rk, options);
1435 
1436         return options;
1437 }
1438 
/**
 * @brief Free \p options.
 */
void
rd_kafka_AdminOptions_destroy (rd_kafka_AdminOptions_t *options) {
        /* rd_kafka_AdminOptions_new() creates the object with a single
         * rd_calloc() call, so a single rd_free() releases it. */
        rd_free(options);
}
1442 
1443 /**@}*/
1444 
1445 
1446 
1447 
1448 
1449 
1450 /**
1451  * @name CreateTopics
1452  * @{
1453  *
1454  *
1455  *
1456  */
1457 
1458 
1459 
1460 rd_kafka_NewTopic_t *
rd_kafka_NewTopic_new(const char * topic,int num_partitions,int replication_factor,char * errstr,size_t errstr_size)1461 rd_kafka_NewTopic_new (const char *topic,
1462                        int num_partitions,
1463                        int replication_factor,
1464                        char *errstr, size_t errstr_size) {
1465         rd_kafka_NewTopic_t *new_topic;
1466 
1467         if (!topic) {
1468                 rd_snprintf(errstr, errstr_size, "Invalid topic name");
1469                 return NULL;
1470         }
1471 
1472         if (num_partitions < -1 || num_partitions > RD_KAFKAP_PARTITIONS_MAX) {
1473                 rd_snprintf(errstr, errstr_size, "num_partitions out of "
1474                             "expected range %d..%d or -1 for broker default",
1475                             1, RD_KAFKAP_PARTITIONS_MAX);
1476                 return NULL;
1477         }
1478 
1479         if (replication_factor < -1 ||
1480             replication_factor > RD_KAFKAP_BROKERS_MAX) {
1481                 rd_snprintf(errstr, errstr_size,
1482                             "replication_factor out of expected range %d..%d",
1483                             -1, RD_KAFKAP_BROKERS_MAX);
1484                 return NULL;
1485         }
1486 
1487         new_topic = rd_calloc(1, sizeof(*new_topic));
1488         new_topic->topic = rd_strdup(topic);
1489         new_topic->num_partitions = num_partitions;
1490         new_topic->replication_factor = replication_factor;
1491 
1492         /* List of int32 lists */
1493         rd_list_init(&new_topic->replicas, 0, rd_list_destroy_free);
1494         rd_list_prealloc_elems(&new_topic->replicas, 0,
1495                                num_partitions == -1 ? 0 : num_partitions,
1496                                0/*nozero*/);
1497 
1498         /* List of ConfigEntrys */
1499         rd_list_init(&new_topic->config, 0, rd_kafka_ConfigEntry_free);
1500 
1501         return new_topic;
1502 
1503 }
1504 
1505 
1506 /**
1507  * @brief Topic name comparator for NewTopic_t
1508  */
rd_kafka_NewTopic_cmp(const void * _a,const void * _b)1509 static int rd_kafka_NewTopic_cmp (const void *_a, const void *_b) {
1510         const rd_kafka_NewTopic_t *a = _a, *b = _b;
1511         return strcmp(a->topic, b->topic);
1512 }
1513 
1514 
1515 
1516 /**
1517  * @brief Allocate a new NewTopic and make a copy of \p src
1518  */
1519 static rd_kafka_NewTopic_t *
rd_kafka_NewTopic_copy(const rd_kafka_NewTopic_t * src)1520 rd_kafka_NewTopic_copy (const rd_kafka_NewTopic_t *src) {
1521         rd_kafka_NewTopic_t *dst;
1522 
1523         dst = rd_kafka_NewTopic_new(src->topic, src->num_partitions,
1524                                     src->replication_factor, NULL, 0);
1525         rd_assert(dst);
1526 
1527         rd_list_destroy(&dst->replicas); /* created in .._new() */
1528         rd_list_init_copy(&dst->replicas, &src->replicas);
1529         rd_list_copy_to(&dst->replicas, &src->replicas,
1530                         rd_list_copy_preallocated, NULL);
1531 
1532         rd_list_init_copy(&dst->config, &src->config);
1533         rd_list_copy_to(&dst->config, &src->config,
1534                         rd_kafka_ConfigEntry_list_copy, NULL);
1535 
1536         return dst;
1537 }
1538 
/**
 * @brief Destroy \p new_topic: its topic name, both of its lists and
 *        finally the object itself.
 */
void
rd_kafka_NewTopic_destroy (rd_kafka_NewTopic_t *new_topic) {
        /* Members first .. */
        rd_free(new_topic->topic);
        rd_list_destroy(&new_topic->replicas);
        rd_list_destroy(&new_topic->config);

        /* .. then the container. */
        rd_free(new_topic);
}
1545 
rd_kafka_NewTopic_free(void * ptr)1546 static void rd_kafka_NewTopic_free (void *ptr) {
1547         rd_kafka_NewTopic_destroy(ptr);
1548 }
1549 
1550 void
rd_kafka_NewTopic_destroy_array(rd_kafka_NewTopic_t ** new_topics,size_t new_topic_cnt)1551 rd_kafka_NewTopic_destroy_array (rd_kafka_NewTopic_t **new_topics,
1552                                  size_t new_topic_cnt) {
1553         size_t i;
1554         for (i = 0 ; i < new_topic_cnt ; i++)
1555                 rd_kafka_NewTopic_destroy(new_topics[i]);
1556 }
1557 
1558 
1559 rd_kafka_resp_err_t
rd_kafka_NewTopic_set_replica_assignment(rd_kafka_NewTopic_t * new_topic,int32_t partition,int32_t * broker_ids,size_t broker_id_cnt,char * errstr,size_t errstr_size)1560 rd_kafka_NewTopic_set_replica_assignment (rd_kafka_NewTopic_t *new_topic,
1561                                           int32_t partition,
1562                                           int32_t *broker_ids,
1563                                           size_t broker_id_cnt,
1564                                           char *errstr, size_t errstr_size) {
1565         rd_list_t *rl;
1566         int i;
1567 
1568         if (new_topic->replication_factor != -1) {
1569                 rd_snprintf(errstr, errstr_size,
1570                             "Specifying a replication factor and "
1571                             "a replica assignment are mutually exclusive");
1572                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1573         } else if (new_topic->num_partitions == -1) {
1574                 rd_snprintf(errstr, errstr_size,
1575                             "Specifying a default partition count and a "
1576                             "replica assignment are mutually exclusive");
1577                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1578         }
1579 
1580         /* Replica partitions must be added consecutively starting from 0. */
1581         if (partition != rd_list_cnt(&new_topic->replicas)) {
1582                 rd_snprintf(errstr, errstr_size,
1583                             "Partitions must be added in order, "
1584                             "starting at 0: expecting partition %d, "
1585                             "not %"PRId32,
1586                             rd_list_cnt(&new_topic->replicas), partition);
1587                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1588         }
1589 
1590         if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) {
1591                 rd_snprintf(errstr, errstr_size,
1592                             "Too many brokers specified "
1593                             "(RD_KAFKAP_BROKERS_MAX=%d)",
1594                             RD_KAFKAP_BROKERS_MAX);
1595                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1596         }
1597 
1598 
1599         rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt);
1600 
1601         for (i = 0 ; i < (int)broker_id_cnt ; i++)
1602                 rd_list_set_int32(rl, i, broker_ids[i]);
1603 
1604         rd_list_add(&new_topic->replicas, rl);
1605 
1606         return RD_KAFKA_RESP_ERR_NO_ERROR;
1607 }
1608 
1609 
1610 /**
1611  * @brief Generic constructor of ConfigEntry which is also added to \p rl
1612  */
1613 static rd_kafka_resp_err_t
rd_kafka_admin_add_config0(rd_list_t * rl,const char * name,const char * value,rd_kafka_AlterOperation_t operation)1614 rd_kafka_admin_add_config0 (rd_list_t *rl,
1615                             const char *name, const char *value,
1616                             rd_kafka_AlterOperation_t operation) {
1617         rd_kafka_ConfigEntry_t *entry;
1618 
1619         if (!name)
1620                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1621 
1622         entry = rd_calloc(1, sizeof(*entry));
1623         entry->kv = rd_strtup_new(name, value);
1624         entry->a.operation = operation;
1625 
1626         rd_list_add(rl, entry);
1627 
1628         return RD_KAFKA_RESP_ERR_NO_ERROR;
1629 }
1630 
1631 
1632 rd_kafka_resp_err_t
rd_kafka_NewTopic_set_config(rd_kafka_NewTopic_t * new_topic,const char * name,const char * value)1633 rd_kafka_NewTopic_set_config (rd_kafka_NewTopic_t *new_topic,
1634                               const char *name, const char *value) {
1635         return rd_kafka_admin_add_config0(&new_topic->config, name, value,
1636                                           RD_KAFKA_ALTER_OP_ADD);
1637 }
1638 
1639 
1640 
1641 /**
1642  * @brief Parse CreateTopicsResponse and create ADMIN_RESULT op.
1643  */
1644 static rd_kafka_resp_err_t
rd_kafka_CreateTopicsResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)1645 rd_kafka_CreateTopicsResponse_parse (rd_kafka_op_t *rko_req,
1646                                      rd_kafka_op_t **rko_resultp,
1647                                      rd_kafka_buf_t *reply,
1648                                      char *errstr, size_t errstr_size) {
1649         const int log_decode_errors = LOG_ERR;
1650         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
1651         rd_kafka_t *rk = rkb->rkb_rk;
1652         rd_kafka_op_t *rko_result = NULL;
1653         int32_t topic_cnt;
1654         int i;
1655 
1656         if (rd_kafka_buf_ApiVersion(reply) >= 2) {
1657                 int32_t Throttle_Time;
1658                 rd_kafka_buf_read_i32(reply, &Throttle_Time);
1659                 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
1660         }
1661 
1662         /* #topics */
1663         rd_kafka_buf_read_i32(reply, &topic_cnt);
1664 
1665         if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
1666                 rd_kafka_buf_parse_fail(
1667                         reply,
1668                         "Received %"PRId32" topics in response "
1669                         "when only %d were requested", topic_cnt,
1670                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
1671 
1672 
1673         rko_result = rd_kafka_admin_result_new(rko_req);
1674 
1675         rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
1676                      rd_kafka_topic_result_free);
1677 
1678         for (i = 0 ; i < (int)topic_cnt ; i++) {
1679                 rd_kafkap_str_t ktopic;
1680                 int16_t error_code;
1681                 rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER;
1682                 char *this_errstr = NULL;
1683                 rd_kafka_topic_result_t *terr;
1684                 rd_kafka_NewTopic_t skel;
1685                 int orig_pos;
1686 
1687                 rd_kafka_buf_read_str(reply, &ktopic);
1688                 rd_kafka_buf_read_i16(reply, &error_code);
1689 
1690                 if (rd_kafka_buf_ApiVersion(reply) >= 1)
1691                         rd_kafka_buf_read_str(reply, &error_msg);
1692 
1693                 /* For non-blocking CreateTopicsRequests the broker
1694                  * will returned REQUEST_TIMED_OUT for topics
1695                  * that were triggered for creation -
1696                  * we hide this error code from the application
1697                  * since the topic creation is in fact in progress. */
1698                 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
1699                     rd_kafka_confval_get_int(&rko_req->rko_u.
1700                                              admin_request.options.
1701                                              operation_timeout) <= 0) {
1702                         error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
1703                         this_errstr = NULL;
1704                 }
1705 
1706                 if (error_code) {
1707                         if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
1708                             RD_KAFKAP_STR_LEN(&error_msg) == 0)
1709                                 this_errstr =
1710                                         (char *)rd_kafka_err2str(error_code);
1711                         else
1712                                 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
1713 
1714                 }
1715 
1716                 terr = rd_kafka_topic_result_new(ktopic.str,
1717                                                  RD_KAFKAP_STR_LEN(&ktopic),
1718                                                  error_code, this_errstr);
1719 
1720                 /* As a convenience to the application we insert topic result
1721                  * in the same order as they were requested. The broker
1722                  * does not maintain ordering unfortunately. */
1723                 skel.topic = terr->topic;
1724                 orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
1725                                          &skel, rd_kafka_NewTopic_cmp);
1726                 if (orig_pos == -1) {
1727                         rd_kafka_topic_result_destroy(terr);
1728                         rd_kafka_buf_parse_fail(
1729                                 reply,
1730                                 "Broker returned topic %.*s that was not "
1731                                 "included in the original request",
1732                                 RD_KAFKAP_STR_PR(&ktopic));
1733                 }
1734 
1735                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
1736                                  orig_pos) != NULL) {
1737                         rd_kafka_topic_result_destroy(terr);
1738                         rd_kafka_buf_parse_fail(
1739                                 reply,
1740                                 "Broker returned topic %.*s multiple times",
1741                                 RD_KAFKAP_STR_PR(&ktopic));
1742                 }
1743 
1744                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
1745                             terr);
1746         }
1747 
1748         *rko_resultp = rko_result;
1749 
1750         return RD_KAFKA_RESP_ERR_NO_ERROR;
1751 
1752  err_parse:
1753         if (rko_result)
1754                 rd_kafka_op_destroy(rko_result);
1755 
1756         rd_snprintf(errstr, errstr_size,
1757                     "CreateTopics response protocol parse failure: %s",
1758                     rd_kafka_err2str(reply->rkbuf_err));
1759 
1760         return reply->rkbuf_err;
1761 }
1762 
1763 
/**
 * @brief Create a CreateTopics admin request op and enqueue it on
 *        rk->rk_ops.
 *
 * Each of the \p new_topic_cnt objects in \p new_topics is copied into
 * the request's argument list, so the request does not keep references
 * to the caller's objects. \p rkqu must not be NULL; its queue is
 * passed to the request op together with \p options.
 */
void rd_kafka_CreateTopics (rd_kafka_t *rk,
                            rd_kafka_NewTopic_t **new_topics,
                            size_t new_topic_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_CreateTopicsRequest,
                rd_kafka_CreateTopicsResponse_parse,
        };
        rd_kafka_op_t *rko;
        rd_list_t *args;
        size_t idx = 0;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(rk,
                                            RD_KAFKA_OP_CREATETOPICS,
                                            RD_KAFKA_EVENT_CREATETOPICS_RESULT,
                                            &cbs, options, rkqu->rkqu_q);

        args = &rko->rko_u.admin_request.args;
        rd_list_init(args, (int)new_topic_cnt, rd_kafka_NewTopic_free);

        while (idx < new_topic_cnt) {
                rd_kafka_NewTopic_t *copy =
                        rd_kafka_NewTopic_copy(new_topics[idx]);

                rd_list_add(args, copy);
                idx++;
        }

        rd_kafka_q_enq(rk->rk_ops, rko);
}
1792 
1793 
1794 /**
1795  * @brief Get an array of topic results from a CreateTopics result.
1796  *
1797  * The returned \p topics life-time is the same as the \p result object.
1798  * @param cntp is updated to the number of elements in the array.
1799  */
1800 const rd_kafka_topic_result_t **
rd_kafka_CreateTopics_result_topics(const rd_kafka_CreateTopics_result_t * result,size_t * cntp)1801 rd_kafka_CreateTopics_result_topics (
1802         const rd_kafka_CreateTopics_result_t *result,
1803         size_t *cntp) {
1804         return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
1805                                                 cntp);
1806 }
1807 
1808 /**@}*/
1809 
1810 
1811 
1812 
1813 /**
1814  * @name Delete topics
1815  * @{
1816  *
1817  *
1818  *
1819  *
1820  */
1821 
rd_kafka_DeleteTopic_new(const char * topic)1822 rd_kafka_DeleteTopic_t *rd_kafka_DeleteTopic_new (const char *topic) {
1823         size_t tsize = strlen(topic) + 1;
1824         rd_kafka_DeleteTopic_t *del_topic;
1825 
1826         /* Single allocation */
1827         del_topic = rd_malloc(sizeof(*del_topic) + tsize);
1828         del_topic->topic = del_topic->data;
1829         memcpy(del_topic->topic, topic, tsize);
1830 
1831         return del_topic;
1832 }
1833 
/**
 * @brief Free \p del_topic.
 */
void
rd_kafka_DeleteTopic_destroy (rd_kafka_DeleteTopic_t *del_topic) {
        /* rd_kafka_DeleteTopic_new() stores the topic name in the same
         * allocation as the object, so nothing else needs freeing. */
        rd_free(del_topic);
}
1837 
rd_kafka_DeleteTopic_free(void * ptr)1838 static void rd_kafka_DeleteTopic_free (void *ptr) {
1839         rd_kafka_DeleteTopic_destroy(ptr);
1840 }
1841 
1842 
/**
 * @brief Destroy the first \p del_topic_cnt objects of the
 *        \p del_topics array. The array itself is not freed.
 */
void
rd_kafka_DeleteTopic_destroy_array (rd_kafka_DeleteTopic_t **del_topics,
                                    size_t del_topic_cnt) {
        size_t idx = 0;

        while (idx < del_topic_cnt) {
                rd_kafka_DeleteTopic_destroy(del_topics[idx]);
                idx++;
        }
}
1849 
1850 
1851 /**
1852  * @brief Topic name comparator for DeleteTopic_t
1853  */
rd_kafka_DeleteTopic_cmp(const void * _a,const void * _b)1854 static int rd_kafka_DeleteTopic_cmp (const void *_a, const void *_b) {
1855         const rd_kafka_DeleteTopic_t *a = _a, *b = _b;
1856         return strcmp(a->topic, b->topic);
1857 }
1858 
1859 /**
1860  * @brief Allocate a new DeleteTopic and make a copy of \p src
1861  */
1862 static rd_kafka_DeleteTopic_t *
rd_kafka_DeleteTopic_copy(const rd_kafka_DeleteTopic_t * src)1863 rd_kafka_DeleteTopic_copy (const rd_kafka_DeleteTopic_t *src) {
1864         return rd_kafka_DeleteTopic_new(src->topic);
1865 }
1866 
1867 
1868 
1869 
1870 
1871 
1872 
1873 /**
1874  * @brief Parse DeleteTopicsResponse and create ADMIN_RESULT op.
1875  */
1876 static rd_kafka_resp_err_t
rd_kafka_DeleteTopicsResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)1877 rd_kafka_DeleteTopicsResponse_parse (rd_kafka_op_t *rko_req,
1878                                      rd_kafka_op_t **rko_resultp,
1879                                      rd_kafka_buf_t *reply,
1880                                      char *errstr, size_t errstr_size) {
1881         const int log_decode_errors = LOG_ERR;
1882         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
1883         rd_kafka_t *rk = rkb->rkb_rk;
1884         rd_kafka_op_t *rko_result = NULL;
1885         int32_t topic_cnt;
1886         int i;
1887 
1888         if (rd_kafka_buf_ApiVersion(reply) >= 1) {
1889                 int32_t Throttle_Time;
1890                 rd_kafka_buf_read_i32(reply, &Throttle_Time);
1891                 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
1892         }
1893 
1894         /* #topics */
1895         rd_kafka_buf_read_i32(reply, &topic_cnt);
1896 
1897         if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
1898                 rd_kafka_buf_parse_fail(
1899                         reply,
1900                         "Received %"PRId32" topics in response "
1901                         "when only %d were requested", topic_cnt,
1902                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
1903 
1904         rko_result = rd_kafka_admin_result_new(rko_req);
1905 
1906         rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
1907                      rd_kafka_topic_result_free);
1908 
1909         for (i = 0 ; i < (int)topic_cnt ; i++) {
1910                 rd_kafkap_str_t ktopic;
1911                 int16_t error_code;
1912                 rd_kafka_topic_result_t *terr;
1913                 rd_kafka_NewTopic_t skel;
1914                 int orig_pos;
1915 
1916                 rd_kafka_buf_read_str(reply, &ktopic);
1917                 rd_kafka_buf_read_i16(reply, &error_code);
1918 
1919                 /* For non-blocking DeleteTopicsRequests the broker
1920                  * will returned REQUEST_TIMED_OUT for topics
1921                  * that were triggered for creation -
1922                  * we hide this error code from the application
1923                  * since the topic creation is in fact in progress. */
1924                 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
1925                     rd_kafka_confval_get_int(&rko_req->rko_u.
1926                                              admin_request.options.
1927                                              operation_timeout) <= 0) {
1928                         error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
1929                 }
1930 
1931                 terr = rd_kafka_topic_result_new(ktopic.str,
1932                                                  RD_KAFKAP_STR_LEN(&ktopic),
1933                                                  error_code,
1934                                                  error_code ?
1935                                                  rd_kafka_err2str(error_code) :
1936                                                  NULL);
1937 
1938                 /* As a convenience to the application we insert topic result
1939                  * in the same order as they were requested. The broker
1940                  * does not maintain ordering unfortunately. */
1941                 skel.topic = terr->topic;
1942                 orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
1943                                          &skel, rd_kafka_DeleteTopic_cmp);
1944                 if (orig_pos == -1) {
1945                         rd_kafka_topic_result_destroy(terr);
1946                         rd_kafka_buf_parse_fail(
1947                                 reply,
1948                                 "Broker returned topic %.*s that was not "
1949                                 "included in the original request",
1950                                 RD_KAFKAP_STR_PR(&ktopic));
1951                 }
1952 
1953                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
1954                                  orig_pos) != NULL) {
1955                         rd_kafka_topic_result_destroy(terr);
1956                         rd_kafka_buf_parse_fail(
1957                                 reply,
1958                                 "Broker returned topic %.*s multiple times",
1959                                 RD_KAFKAP_STR_PR(&ktopic));
1960                 }
1961 
1962                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
1963                             terr);
1964         }
1965 
1966         *rko_resultp = rko_result;
1967 
1968         return RD_KAFKA_RESP_ERR_NO_ERROR;
1969 
1970  err_parse:
1971         if (rko_result)
1972                 rd_kafka_op_destroy(rko_result);
1973 
1974         rd_snprintf(errstr, errstr_size,
1975                     "DeleteTopics response protocol parse failure: %s",
1976                     rd_kafka_err2str(reply->rkbuf_err));
1977 
1978         return reply->rkbuf_err;
1979 }
1980 
1981 
1982 
1983 
1984 
1985 
/**
 * @brief Create a DeleteTopics admin request op and enqueue it on
 *        rk->rk_ops.
 *
 * Each of the \p del_topic_cnt objects in \p del_topics is copied into
 * the request's argument list, so the request does not keep references
 * to the caller's objects. \p rkqu must not be NULL; its queue is
 * passed to the request op together with \p options.
 */
void rd_kafka_DeleteTopics (rd_kafka_t *rk,
                            rd_kafka_DeleteTopic_t **del_topics,
                            size_t del_topic_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_DeleteTopicsRequest,
                rd_kafka_DeleteTopicsResponse_parse,
        };
        rd_kafka_op_t *rko;
        rd_list_t *args;
        size_t idx = 0;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(rk,
                                            RD_KAFKA_OP_DELETETOPICS,
                                            RD_KAFKA_EVENT_DELETETOPICS_RESULT,
                                            &cbs, options, rkqu->rkqu_q);

        args = &rko->rko_u.admin_request.args;
        rd_list_init(args, (int)del_topic_cnt, rd_kafka_DeleteTopic_free);

        while (idx < del_topic_cnt) {
                rd_kafka_DeleteTopic_t *copy =
                        rd_kafka_DeleteTopic_copy(del_topics[idx]);

                rd_list_add(args, copy);
                idx++;
        }

        rd_kafka_q_enq(rk->rk_ops, rko);
}
2014 
2015 
2016 /**
2017  * @brief Get an array of topic results from a DeleteTopics result.
2018  *
2019  * The returned \p topics life-time is the same as the \p result object.
2020  * @param cntp is updated to the number of elements in the array.
2021  */
2022 const rd_kafka_topic_result_t **
rd_kafka_DeleteTopics_result_topics(const rd_kafka_DeleteTopics_result_t * result,size_t * cntp)2023 rd_kafka_DeleteTopics_result_topics (
2024         const rd_kafka_DeleteTopics_result_t *result,
2025         size_t *cntp) {
2026         return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
2027                                                 cntp);
2028 }
2029 
2030 
2031 
2032 
/**@}*/


/**
 * @name Create partitions
 * @{
 *
 *
 *
 *
 */
2041 
rd_kafka_NewPartitions_new(const char * topic,size_t new_total_cnt,char * errstr,size_t errstr_size)2042 rd_kafka_NewPartitions_t *rd_kafka_NewPartitions_new (const char *topic,
2043                                                       size_t new_total_cnt,
2044                                                       char *errstr,
2045                                                       size_t errstr_size) {
2046         size_t tsize = strlen(topic) + 1;
2047         rd_kafka_NewPartitions_t *newps;
2048 
2049         if (new_total_cnt < 1 || new_total_cnt > RD_KAFKAP_PARTITIONS_MAX) {
2050                 rd_snprintf(errstr, errstr_size, "new_total_cnt out of "
2051                             "expected range %d..%d",
2052                             1, RD_KAFKAP_PARTITIONS_MAX);
2053                 return NULL;
2054         }
2055 
2056         /* Single allocation */
2057         newps = rd_malloc(sizeof(*newps) + tsize);
2058         newps->total_cnt = new_total_cnt;
2059         newps->topic = newps->data;
2060         memcpy(newps->topic, topic, tsize);
2061 
2062         /* List of int32 lists */
2063         rd_list_init(&newps->replicas, 0, rd_list_destroy_free);
2064         rd_list_prealloc_elems(&newps->replicas, 0, new_total_cnt, 0/*nozero*/);
2065 
2066         return newps;
2067 }
2068 
2069 /**
2070  * @brief Topic name comparator for NewPartitions_t
2071  */
rd_kafka_NewPartitions_cmp(const void * _a,const void * _b)2072 static int rd_kafka_NewPartitions_cmp (const void *_a, const void *_b) {
2073         const rd_kafka_NewPartitions_t *a = _a, *b = _b;
2074         return strcmp(a->topic, b->topic);
2075 }
2076 
2077 
2078 /**
2079  * @brief Allocate a new CreatePartitions and make a copy of \p src
2080  */
2081 static rd_kafka_NewPartitions_t *
rd_kafka_NewPartitions_copy(const rd_kafka_NewPartitions_t * src)2082 rd_kafka_NewPartitions_copy (const rd_kafka_NewPartitions_t *src) {
2083         rd_kafka_NewPartitions_t *dst;
2084 
2085         dst = rd_kafka_NewPartitions_new(src->topic, src->total_cnt, NULL, 0);
2086 
2087         rd_list_destroy(&dst->replicas); /* created in .._new() */
2088         rd_list_init_copy(&dst->replicas, &src->replicas);
2089         rd_list_copy_to(&dst->replicas, &src->replicas,
2090                         rd_list_copy_preallocated, NULL);
2091 
2092         return dst;
2093 }
2094 
/**
 * @brief Destroy \p newps: its replicas list and then the object
 *        itself.
 */
void
rd_kafka_NewPartitions_destroy (rd_kafka_NewPartitions_t *newps) {
        rd_list_t *replicas = &newps->replicas;

        rd_list_destroy(replicas);

        /* rd_kafka_NewPartitions_new() stores the topic name in the
         * same allocation as the object. */
        rd_free(newps);
}
2099 
/**
 * @brief void-pointer wrapper around rd_kafka_NewPartitions_destroy(),
 *        suitable as a list free callback.
 */
static void rd_kafka_NewPartitions_free (void *ptr) {
        rd_kafka_NewPartitions_destroy(ptr);
}
2103 
2104 
/**
 * @brief Destroy each of the first \p newps_cnt objects in \p newps.
 *
 * The array storage itself is not freed here.
 */
void rd_kafka_NewPartitions_destroy_array (rd_kafka_NewPartitions_t **newps,
                                         size_t newps_cnt) {
        size_t idx = 0;

        while (idx < newps_cnt) {
                rd_kafka_NewPartitions_destroy(newps[idx]);
                idx++;
        }
}
2111 
2112 
2113 
2114 
2115 
2116 rd_kafka_resp_err_t
rd_kafka_NewPartitions_set_replica_assignment(rd_kafka_NewPartitions_t * newp,int32_t new_partition_idx,int32_t * broker_ids,size_t broker_id_cnt,char * errstr,size_t errstr_size)2117 rd_kafka_NewPartitions_set_replica_assignment (rd_kafka_NewPartitions_t *newp,
2118                                                int32_t new_partition_idx,
2119                                                int32_t *broker_ids,
2120                                                size_t broker_id_cnt,
2121                                                char *errstr,
2122                                                size_t errstr_size) {
2123         rd_list_t *rl;
2124         int i;
2125 
2126         /* Replica partitions must be added consecutively starting from 0. */
2127         if (new_partition_idx != rd_list_cnt(&newp->replicas)) {
2128                 rd_snprintf(errstr, errstr_size,
2129                             "Partitions must be added in order, "
2130                             "starting at 0: expecting partition "
2131                             "index %d, not %"PRId32,
2132                             rd_list_cnt(&newp->replicas), new_partition_idx);
2133                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2134         }
2135 
2136         if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) {
2137                 rd_snprintf(errstr, errstr_size,
2138                             "Too many brokers specified "
2139                             "(RD_KAFKAP_BROKERS_MAX=%d)",
2140                             RD_KAFKAP_BROKERS_MAX);
2141                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2142         }
2143 
2144         rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt);
2145 
2146         for (i = 0 ; i < (int)broker_id_cnt ; i++)
2147                 rd_list_set_int32(rl, i, broker_ids[i]);
2148 
2149         rd_list_add(&newp->replicas, rl);
2150 
2151         return RD_KAFKA_RESP_ERR_NO_ERROR;
2152 }
2153 
2154 
2155 
2156 
2157 /**
2158  * @brief Parse CreatePartitionsResponse and create ADMIN_RESULT op.
2159  */
2160 static rd_kafka_resp_err_t
rd_kafka_CreatePartitionsResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)2161 rd_kafka_CreatePartitionsResponse_parse (rd_kafka_op_t *rko_req,
2162                                          rd_kafka_op_t **rko_resultp,
2163                                          rd_kafka_buf_t *reply,
2164                                          char *errstr,
2165                                          size_t errstr_size) {
2166         const int log_decode_errors = LOG_ERR;
2167         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
2168         rd_kafka_t *rk = rkb->rkb_rk;
2169         rd_kafka_op_t *rko_result = NULL;
2170         int32_t topic_cnt;
2171         int i;
2172         int32_t Throttle_Time;
2173 
2174         rd_kafka_buf_read_i32(reply, &Throttle_Time);
2175         rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
2176 
2177         /* #topics */
2178         rd_kafka_buf_read_i32(reply, &topic_cnt);
2179 
2180         if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
2181                 rd_kafka_buf_parse_fail(
2182                         reply,
2183                         "Received %"PRId32" topics in response "
2184                         "when only %d were requested", topic_cnt,
2185                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
2186 
2187         rko_result = rd_kafka_admin_result_new(rko_req);
2188 
2189         rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
2190                      rd_kafka_topic_result_free);
2191 
2192         for (i = 0 ; i < (int)topic_cnt ; i++) {
2193                 rd_kafkap_str_t ktopic;
2194                 int16_t error_code;
2195                 char *this_errstr = NULL;
2196                 rd_kafka_topic_result_t *terr;
2197                 rd_kafka_NewTopic_t skel;
2198                 rd_kafkap_str_t error_msg;
2199                 int orig_pos;
2200 
2201                 rd_kafka_buf_read_str(reply, &ktopic);
2202                 rd_kafka_buf_read_i16(reply, &error_code);
2203                 rd_kafka_buf_read_str(reply, &error_msg);
2204 
2205                 /* For non-blocking CreatePartitionsRequests the broker
2206                  * will returned REQUEST_TIMED_OUT for topics
2207                  * that were triggered for creation -
2208                  * we hide this error code from the application
2209                  * since the topic creation is in fact in progress. */
2210                 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
2211                     rd_kafka_confval_get_int(&rko_req->rko_u.
2212                                              admin_request.options.
2213                                              operation_timeout) <= 0) {
2214                         error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
2215                 }
2216 
2217                 if (error_code) {
2218                         if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
2219                             RD_KAFKAP_STR_LEN(&error_msg) == 0)
2220                                 this_errstr =
2221                                         (char *)rd_kafka_err2str(error_code);
2222                         else
2223                                 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
2224                 }
2225 
2226                 terr = rd_kafka_topic_result_new(ktopic.str,
2227                                                  RD_KAFKAP_STR_LEN(&ktopic),
2228                                                  error_code,
2229                                                  error_code ?
2230                                                  this_errstr : NULL);
2231 
2232                 /* As a convenience to the application we insert topic result
2233                  * in the same order as they were requested. The broker
2234                  * does not maintain ordering unfortunately. */
2235                 skel.topic = terr->topic;
2236                 orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
2237                                          &skel, rd_kafka_NewPartitions_cmp);
2238                 if (orig_pos == -1) {
2239                         rd_kafka_topic_result_destroy(terr);
2240                         rd_kafka_buf_parse_fail(
2241                                 reply,
2242                                 "Broker returned topic %.*s that was not "
2243                                 "included in the original request",
2244                                 RD_KAFKAP_STR_PR(&ktopic));
2245                 }
2246 
2247                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
2248                                  orig_pos) != NULL) {
2249                         rd_kafka_topic_result_destroy(terr);
2250                         rd_kafka_buf_parse_fail(
2251                                 reply,
2252                                 "Broker returned topic %.*s multiple times",
2253                                 RD_KAFKAP_STR_PR(&ktopic));
2254                 }
2255 
2256                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
2257                             terr);
2258         }
2259 
2260         *rko_resultp = rko_result;
2261 
2262         return RD_KAFKA_RESP_ERR_NO_ERROR;
2263 
2264  err_parse:
2265         if (rko_result)
2266                 rd_kafka_op_destroy(rko_result);
2267 
2268         rd_snprintf(errstr, errstr_size,
2269                     "CreatePartitions response protocol parse failure: %s",
2270                     rd_kafka_err2str(reply->rkbuf_err));
2271 
2272         return reply->rkbuf_err;
2273 }
2274 
2275 
2276 
2277 
2278 
2279 
2280 
/**
 * @brief Build a CREATEPARTITIONS admin request op holding copies of the
 *        \p newps_cnt objects in \p newps and enqueue it on \c rk->rk_ops.
 *
 * The caller's \p newps objects are copied, not referenced.
 * \p rkqu must be non-NULL; its underlying queue is handed to the request
 * op (presumably as the queue the result is delivered on).
 */
void rd_kafka_CreatePartitions (rd_kafka_t *rk,
                                rd_kafka_NewPartitions_t **newps,
                                size_t newps_cnt,
                                const rd_kafka_AdminOptions_t *options,
                                rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_CreatePartitionsRequest,
                rd_kafka_CreatePartitionsResponse_parse,
        };
        rd_kafka_op_t *rko;
        size_t idx;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(
                rk,
                RD_KAFKA_OP_CREATEPARTITIONS,
                RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT,
                &cbs, options, rkqu->rkqu_q);

        /* The request op owns its own copies of the arguments. */
        rd_list_init(&rko->rko_u.admin_request.args, (int)newps_cnt,
                     rd_kafka_NewPartitions_free);

        for (idx = 0 ; idx < newps_cnt ; idx++) {
                rd_kafka_NewPartitions_t *copy =
                        rd_kafka_NewPartitions_copy(newps[idx]);

                rd_list_add(&rko->rko_u.admin_request.args, copy);
        }

        rd_kafka_q_enq(rk->rk_ops, rko);
}
2310 
2311 
/**
 * @brief Get an array of topic results from a CreatePartitions result.
 *
 * Thin wrapper: casts \p result to an op and delegates to
 * rd_kafka_admin_result_ret_topics().
 *
 * The returned \p topics life-time is the same as the \p result object.
 * @param cntp is updated to the number of elements in the array.
 */
const rd_kafka_topic_result_t **
rd_kafka_CreatePartitions_result_topics (
        const rd_kafka_CreatePartitions_result_t *result,
        size_t *cntp) {
        return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
                                                cntp);
}
2325 
2326 /**@}*/
2327 
2328 
2329 
2330 
2331 /**
2332  * @name ConfigEntry
2333  * @{
2334  *
2335  *
2336  *
2337  */
2338 
/**
 * @brief Release everything held by \p entry (synonyms list and
 *        name/value tuple) and then the entry itself.
 */
static void rd_kafka_ConfigEntry_destroy (rd_kafka_ConfigEntry_t *entry) {
        rd_list_destroy(&entry->synonyms);
        rd_strtup_destroy(entry->kv);

        rd_free(entry);
}
2344 
2345 
/**
 * @brief void-pointer wrapper around rd_kafka_ConfigEntry_destroy(),
 *        used as the free callback for lists of ConfigEntry_t.
 */
static void rd_kafka_ConfigEntry_free (void *ptr) {
        rd_kafka_ConfigEntry_destroy((rd_kafka_ConfigEntry_t *)ptr);
}
2349 
2350 
2351 /**
2352  * @brief Create new ConfigEntry
2353  *
2354  * @param name Config entry name
2355  * @param name_len Length of name, or -1 to use strlen()
2356  * @param value Config entry value, or NULL
2357  * @param value_len Length of value, or -1 to use strlen()
2358  */
2359 static rd_kafka_ConfigEntry_t *
rd_kafka_ConfigEntry_new0(const char * name,size_t name_len,const char * value,size_t value_len)2360 rd_kafka_ConfigEntry_new0 (const char *name, size_t name_len,
2361                            const char *value, size_t value_len) {
2362         rd_kafka_ConfigEntry_t *entry;
2363 
2364         if (!name)
2365                 return NULL;
2366 
2367         entry = rd_calloc(1, sizeof(*entry));
2368         entry->kv = rd_strtup_new0(name, name_len, value, value_len);
2369 
2370         rd_list_init(&entry->synonyms, 0, rd_kafka_ConfigEntry_free);
2371 
2372         entry->a.source = RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG;
2373 
2374         return entry;
2375 }
2376 
/**
 * @brief Create new ConfigEntry from \p name and \p value, passing -1 for
 *        both lengths so that they are determined with strlen().
 *
 * @sa rd_kafka_ConfigEntry_new0
 */
static rd_kafka_ConfigEntry_t *
rd_kafka_ConfigEntry_new (const char *name, const char *value) {
        return rd_kafka_ConfigEntry_new0(name, -1, value, -1);
}
2384 
2385 
2386 
2387 
2388 /**
2389  * @brief Allocate a new AlterConfigs and make a copy of \p src
2390  */
2391 static rd_kafka_ConfigEntry_t *
rd_kafka_ConfigEntry_copy(const rd_kafka_ConfigEntry_t * src)2392 rd_kafka_ConfigEntry_copy (const rd_kafka_ConfigEntry_t *src) {
2393         rd_kafka_ConfigEntry_t *dst;
2394 
2395         dst = rd_kafka_ConfigEntry_new(src->kv->name, src->kv->value);
2396         dst->a = src->a;
2397 
2398         rd_list_destroy(&dst->synonyms); /* created in .._new() */
2399         rd_list_init_copy(&dst->synonyms, &src->synonyms);
2400         rd_list_copy_to(&dst->synonyms, &src->synonyms,
2401                         rd_kafka_ConfigEntry_list_copy, NULL);
2402 
2403         return dst;
2404 }
2405 
/**
 * @brief List element copy callback: returns a new copy of the
 *        ConfigEntry \p src. \p opaque is not used.
 */
static void *rd_kafka_ConfigEntry_list_copy (const void *src, void *opaque) {
        return rd_kafka_ConfigEntry_copy((const rd_kafka_ConfigEntry_t *)src);
}
2409 
2410 
/**
 * @returns the name of \p entry. The string is owned by the entry.
 */
const char *rd_kafka_ConfigEntry_name (const rd_kafka_ConfigEntry_t *entry) {
        return entry->kv->name;
}
2414 
/**
 * @returns the value of \p entry. The string is owned by the entry.
 */
const char *
rd_kafka_ConfigEntry_value (const rd_kafka_ConfigEntry_t *entry) {
        return entry->kv->value;
}
2419 
/**
 * @returns the config source attribute of \p entry.
 */
rd_kafka_ConfigSource_t
rd_kafka_ConfigEntry_source (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.source;
}
2424 
/**
 * @returns the is_readonly attribute of \p entry.
 */
int rd_kafka_ConfigEntry_is_read_only (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.is_readonly;
}
2428 
/**
 * @returns the is_default attribute of \p entry.
 */
int rd_kafka_ConfigEntry_is_default (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.is_default;
}
2432 
/**
 * @returns the is_sensitive attribute of \p entry.
 */
int rd_kafka_ConfigEntry_is_sensitive (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.is_sensitive;
}
2436 
/**
 * @returns the is_synonym attribute of \p entry.
 */
int rd_kafka_ConfigEntry_is_synonym (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.is_synonym;
}
2440 
2441 const rd_kafka_ConfigEntry_t **
rd_kafka_ConfigEntry_synonyms(const rd_kafka_ConfigEntry_t * entry,size_t * cntp)2442 rd_kafka_ConfigEntry_synonyms (const rd_kafka_ConfigEntry_t *entry,
2443                                size_t *cntp) {
2444         *cntp = rd_list_cnt(&entry->synonyms);
2445         if (!*cntp)
2446                 return NULL;
2447         return (const rd_kafka_ConfigEntry_t **)entry->synonyms.rl_elems;
2448 
2449 }
2450 
2451 
2452 /**@}*/
2453 
2454 
2455 
2456 /**
2457  * @name ConfigSource
2458  * @{
2459  *
2460  *
2461  *
2462  */
2463 
2464 const char *
rd_kafka_ConfigSource_name(rd_kafka_ConfigSource_t confsource)2465 rd_kafka_ConfigSource_name (rd_kafka_ConfigSource_t confsource) {
2466         static const char *names[] = {
2467                 "UNKNOWN_CONFIG",
2468                 "DYNAMIC_TOPIC_CONFIG",
2469                 "DYNAMIC_BROKER_CONFIG",
2470                 "DYNAMIC_DEFAULT_BROKER_CONFIG",
2471                 "STATIC_BROKER_CONFIG",
2472                 "DEFAULT_CONFIG",
2473         };
2474 
2475         if ((unsigned int)confsource >=
2476             (unsigned int)RD_KAFKA_CONFIG_SOURCE__CNT)
2477                 return "UNSUPPORTED";
2478 
2479         return names[confsource];
2480 }
2481 
2482 /**@}*/
2483 
2484 
2485 
2486 /**
2487  * @name ConfigResource
2488  * @{
2489  *
2490  *
2491  *
2492  */
2493 
2494 const char *
rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype)2495 rd_kafka_ResourceType_name (rd_kafka_ResourceType_t restype) {
2496         static const char *names[] = {
2497                 "UNKNOWN",
2498                 "ANY",
2499                 "TOPIC",
2500                 "GROUP",
2501                 "BROKER",
2502         };
2503 
2504         if ((unsigned int)restype >=
2505             (unsigned int)RD_KAFKA_RESOURCE__CNT)
2506                 return "UNSUPPORTED";
2507 
2508         return names[restype];
2509 }
2510 
2511 
2512 rd_kafka_ConfigResource_t *
rd_kafka_ConfigResource_new(rd_kafka_ResourceType_t restype,const char * resname)2513 rd_kafka_ConfigResource_new (rd_kafka_ResourceType_t restype,
2514                              const char *resname) {
2515         rd_kafka_ConfigResource_t *config;
2516         size_t namesz = resname ? strlen(resname) : 0;
2517 
2518         if (!namesz || (int)restype < 0)
2519                 return NULL;
2520 
2521         config = rd_calloc(1, sizeof(*config) + namesz + 1);
2522         config->name = config->data;
2523         memcpy(config->name, resname, namesz + 1);
2524         config->restype = restype;
2525 
2526         rd_list_init(&config->config, 8, rd_kafka_ConfigEntry_free);
2527 
2528         return config;
2529 }
2530 
/**
 * @brief Release \p config: its error string (if set), its config entry
 *        list and finally the object itself.
 */
void rd_kafka_ConfigResource_destroy (rd_kafka_ConfigResource_t *config) {
        if (config->errstr)
                rd_free(config->errstr);

        rd_list_destroy(&config->config);

        rd_free(config);
}
2537 
/**
 * @brief void-pointer wrapper around rd_kafka_ConfigResource_destroy(),
 *        suitable as a list free callback.
 */
static void rd_kafka_ConfigResource_free (void *ptr) {
        rd_kafka_ConfigResource_destroy((rd_kafka_ConfigResource_t *)ptr);
}
2541 
2542 
/**
 * @brief Destroy each of the first \p config_cnt objects in \p config.
 *
 * The array storage itself is not freed here.
 */
void rd_kafka_ConfigResource_destroy_array (rd_kafka_ConfigResource_t **config,
                                            size_t config_cnt) {
        size_t idx = 0;

        while (idx < config_cnt) {
                rd_kafka_ConfigResource_destroy(config[idx]);
                idx++;
        }
}
2549 
2550 
2551 /**
2552  * @brief Type and name comparator for ConfigResource_t
2553  */
rd_kafka_ConfigResource_cmp(const void * _a,const void * _b)2554 static int rd_kafka_ConfigResource_cmp (const void *_a, const void *_b) {
2555         const rd_kafka_ConfigResource_t *a = _a, *b = _b;
2556         int r = RD_CMP(a->restype, b->restype);
2557         if (r)
2558                 return r;
2559         return strcmp(a->name, b->name);
2560 }
2561 
2562 /**
2563  * @brief Allocate a new AlterConfigs and make a copy of \p src
2564  */
2565 static rd_kafka_ConfigResource_t *
rd_kafka_ConfigResource_copy(const rd_kafka_ConfigResource_t * src)2566 rd_kafka_ConfigResource_copy (const rd_kafka_ConfigResource_t *src) {
2567         rd_kafka_ConfigResource_t *dst;
2568 
2569         dst = rd_kafka_ConfigResource_new(src->restype, src->name);
2570 
2571         rd_list_destroy(&dst->config); /* created in .._new() */
2572         rd_list_init_copy(&dst->config, &src->config);
2573         rd_list_copy_to(&dst->config, &src->config,
2574                         rd_kafka_ConfigEntry_list_copy, NULL);
2575 
2576         return dst;
2577 }
2578 
2579 
/**
 * @brief Append \p entry to the config entry list of \p config.
 *
 * The list was initialized with rd_kafka_ConfigEntry_free as its free
 * callback, so \p entry is presumably owned by \p config afterwards.
 */
static void
rd_kafka_ConfigResource_add_ConfigEntry (rd_kafka_ConfigResource_t *config,
                                         rd_kafka_ConfigEntry_t *entry) {
        rd_list_add(&config->config, entry);
}
2585 
2586 
2587 rd_kafka_resp_err_t
rd_kafka_ConfigResource_add_config(rd_kafka_ConfigResource_t * config,const char * name,const char * value)2588 rd_kafka_ConfigResource_add_config (rd_kafka_ConfigResource_t *config,
2589                                     const char *name, const char *value) {
2590         if (!name || !*name || !value)
2591                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2592 
2593         return rd_kafka_admin_add_config0(&config->config, name, value,
2594                                           RD_KAFKA_ALTER_OP_ADD);
2595 }
2596 
2597 rd_kafka_resp_err_t
rd_kafka_ConfigResource_set_config(rd_kafka_ConfigResource_t * config,const char * name,const char * value)2598 rd_kafka_ConfigResource_set_config (rd_kafka_ConfigResource_t *config,
2599                                     const char *name, const char *value) {
2600         if (!name || !*name || !value)
2601                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2602 
2603         return rd_kafka_admin_add_config0(&config->config, name, value,
2604                                           RD_KAFKA_ALTER_OP_SET);
2605 }
2606 
2607 rd_kafka_resp_err_t
rd_kafka_ConfigResource_delete_config(rd_kafka_ConfigResource_t * config,const char * name)2608 rd_kafka_ConfigResource_delete_config (rd_kafka_ConfigResource_t *config,
2609                                        const char *name) {
2610         if (!name || !*name)
2611                 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2612 
2613         return rd_kafka_admin_add_config0(&config->config, name, NULL,
2614                                           RD_KAFKA_ALTER_OP_DELETE);
2615 }
2616 
2617 
2618 const rd_kafka_ConfigEntry_t **
rd_kafka_ConfigResource_configs(const rd_kafka_ConfigResource_t * config,size_t * cntp)2619 rd_kafka_ConfigResource_configs (const rd_kafka_ConfigResource_t *config,
2620                                  size_t *cntp) {
2621         *cntp = rd_list_cnt(&config->config);
2622         if (!*cntp)
2623                 return NULL;
2624         return (const rd_kafka_ConfigEntry_t **)config->config.rl_elems;
2625 }
2626 
2627 
2628 
2629 
/**
 * @returns the resource type of \p config.
 */
rd_kafka_ResourceType_t
rd_kafka_ConfigResource_type (const rd_kafka_ConfigResource_t *config) {
        return config->restype;
}
2634 
/**
 * @returns the name of \p config. The string is owned by the resource.
 */
const char *
rd_kafka_ConfigResource_name (const rd_kafka_ConfigResource_t *config) {
        return config->name;
}
2639 
/**
 * @returns the error code stored on \p config.
 */
rd_kafka_resp_err_t
rd_kafka_ConfigResource_error (const rd_kafka_ConfigResource_t *config) {
        return config->err;
}
2644 
2645 const char *
rd_kafka_ConfigResource_error_string(const rd_kafka_ConfigResource_t * config)2646 rd_kafka_ConfigResource_error_string (const rd_kafka_ConfigResource_t *config) {
2647         if (!config->err)
2648                 return NULL;
2649         if (config->errstr)
2650                 return config->errstr;
2651         return rd_kafka_err2str(config->err);
2652 }
2653 
2654 
2655 /**
2656  * @brief Look in the provided ConfigResource_t* list for a resource of
2657  *        type BROKER and set its broker id in \p broker_id, returning
2658  *        RD_KAFKA_RESP_ERR_NO_ERROR.
2659  *
2660  *        If multiple BROKER resources are found RD_KAFKA_RESP_ERR__CONFLICT
2661  *        is returned and an error string is written to errstr.
2662  *
2663  *        If no BROKER resources are found RD_KAFKA_RESP_ERR_NO_ERROR
2664  *        is returned and \p broker_idp is set to use the coordinator.
2665  */
2666 static rd_kafka_resp_err_t
rd_kafka_ConfigResource_get_single_broker_id(const rd_list_t * configs,int32_t * broker_idp,char * errstr,size_t errstr_size)2667 rd_kafka_ConfigResource_get_single_broker_id (const rd_list_t *configs,
2668                                               int32_t *broker_idp,
2669                                               char *errstr,
2670                                               size_t errstr_size) {
2671         const rd_kafka_ConfigResource_t *config;
2672         int i;
2673         int32_t broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; /* Some default
2674                                                                * value that we
2675                                                                * can compare
2676                                                                * to below */
2677 
2678         RD_LIST_FOREACH(config, configs, i) {
2679                 char *endptr;
2680                 long int r;
2681 
2682                 if (config->restype != RD_KAFKA_RESOURCE_BROKER)
2683                         continue;
2684 
2685                 if (broker_id != RD_KAFKA_ADMIN_TARGET_CONTROLLER) {
2686                         rd_snprintf(errstr, errstr_size,
2687                                     "Only one ConfigResource of type BROKER "
2688                                     "is allowed per call");
2689                         return RD_KAFKA_RESP_ERR__CONFLICT;
2690                 }
2691 
2692                 /* Convert string broker-id to int32 */
2693                 r = (int32_t)strtol(config->name, &endptr, 10);
2694                 if (r == LONG_MIN || r == LONG_MAX || config->name == endptr ||
2695                     r < 0) {
2696                         rd_snprintf(errstr, errstr_size,
2697                                     "Expected an int32 broker_id for "
2698                                     "ConfigResource(type=BROKER, name=%s)",
2699                                     config->name);
2700                         return RD_KAFKA_RESP_ERR__INVALID_ARG;
2701                 }
2702 
2703                 broker_id = r;
2704 
2705                 /* Keep scanning to make sure there are no duplicate
2706                  * BROKER resources. */
2707         }
2708 
2709         *broker_idp = broker_id;
2710 
2711         return RD_KAFKA_RESP_ERR_NO_ERROR;
2712 }
2713 
2714 
2715 /**@}*/
2716 
2717 
2718 
2719 /**
2720  * @name AlterConfigs
2721  * @{
2722  *
2723  *
2724  *
2725  */
2726 
2727 
2728 
2729 /**
2730  * @brief Parse AlterConfigsResponse and create ADMIN_RESULT op.
2731  */
2732 static rd_kafka_resp_err_t
rd_kafka_AlterConfigsResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)2733 rd_kafka_AlterConfigsResponse_parse (rd_kafka_op_t *rko_req,
2734                                      rd_kafka_op_t **rko_resultp,
2735                                      rd_kafka_buf_t *reply,
2736                                      char *errstr, size_t errstr_size) {
2737         const int log_decode_errors = LOG_ERR;
2738         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
2739         rd_kafka_t *rk = rkb->rkb_rk;
2740         rd_kafka_op_t *rko_result = NULL;
2741         int32_t res_cnt;
2742         int i;
2743         int32_t Throttle_Time;
2744 
2745         rd_kafka_buf_read_i32(reply, &Throttle_Time);
2746         rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
2747 
2748         rd_kafka_buf_read_i32(reply, &res_cnt);
2749 
2750         if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) {
2751                 rd_snprintf(errstr, errstr_size,
2752                             "Received %"PRId32" ConfigResources in response "
2753                             "when only %d were requested", res_cnt,
2754                             rd_list_cnt(&rko_req->rko_u.admin_request.args));
2755                 return RD_KAFKA_RESP_ERR__BAD_MSG;
2756         }
2757 
2758         rko_result = rd_kafka_admin_result_new(rko_req);
2759 
2760         rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt,
2761                      rd_kafka_ConfigResource_free);
2762 
2763         for (i = 0 ; i < (int)res_cnt ; i++) {
2764                 int16_t error_code;
2765                 rd_kafkap_str_t error_msg;
2766                 int8_t res_type;
2767                 rd_kafkap_str_t kres_name;
2768                 char *res_name;
2769                 char *this_errstr = NULL;
2770                 rd_kafka_ConfigResource_t *config;
2771                 rd_kafka_ConfigResource_t skel;
2772                 int orig_pos;
2773 
2774                 rd_kafka_buf_read_i16(reply, &error_code);
2775                 rd_kafka_buf_read_str(reply, &error_msg);
2776                 rd_kafka_buf_read_i8(reply, &res_type);
2777                 rd_kafka_buf_read_str(reply, &kres_name);
2778                 RD_KAFKAP_STR_DUPA(&res_name, &kres_name);
2779 
2780                 if (error_code) {
2781                         if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
2782                             RD_KAFKAP_STR_LEN(&error_msg) == 0)
2783                                 this_errstr =
2784                                         (char *)rd_kafka_err2str(error_code);
2785                         else
2786                                 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
2787                 }
2788 
2789                 config = rd_kafka_ConfigResource_new(res_type, res_name);
2790                 if (!config) {
2791                         rd_kafka_log(rko_req->rko_rk, LOG_ERR,
2792                                      "ADMIN", "AlterConfigs returned "
2793                                      "unsupported ConfigResource #%d with "
2794                                      "type %d and name \"%s\": ignoring",
2795                                      i, res_type, res_name);
2796                         continue;
2797                 }
2798 
2799                 config->err = error_code;
2800                 if (this_errstr)
2801                         config->errstr = rd_strdup(this_errstr);
2802 
2803                 /* As a convenience to the application we insert result
2804                  * in the same order as they were requested. The broker
2805                  * does not maintain ordering unfortunately. */
2806                 skel.restype = config->restype;
2807                 skel.name = config->name;
2808                 orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
2809                                          &skel, rd_kafka_ConfigResource_cmp);
2810                 if (orig_pos == -1) {
2811                         rd_kafka_ConfigResource_destroy(config);
2812                         rd_kafka_buf_parse_fail(
2813                                 reply,
2814                                 "Broker returned ConfigResource %d,%s "
2815                                 "that was not "
2816                                 "included in the original request",
2817                                 res_type, res_name);
2818                 }
2819 
2820                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
2821                                  orig_pos) != NULL) {
2822                         rd_kafka_ConfigResource_destroy(config);
2823                         rd_kafka_buf_parse_fail(
2824                                 reply,
2825                                 "Broker returned ConfigResource %d,%s "
2826                                 "multiple times",
2827                                 res_type, res_name);
2828                 }
2829 
2830                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
2831                             config);
2832         }
2833 
2834         *rko_resultp = rko_result;
2835 
2836         return RD_KAFKA_RESP_ERR_NO_ERROR;
2837 
2838  err_parse:
2839         if (rko_result)
2840                 rd_kafka_op_destroy(rko_result);
2841 
2842         rd_snprintf(errstr, errstr_size,
2843                     "AlterConfigs response protocol parse failure: %s",
2844                     rd_kafka_err2str(reply->rkbuf_err));
2845 
2846         return reply->rkbuf_err;
2847 }
2848 
2849 
2850 
2851 
/**
 * @brief Create an AlterConfigs admin request op for \p configs and
 *        enqueue it on rk->rk_ops.
 *
 * Each of the \p config_cnt elements in \p configs is copied into the
 * request op's argument list.
 *
 * If the broker id lookup across the copied resources fails, the op is
 * failed with the returned error and destroyed instead of being enqueued.
 *
 * @param rk         Client instance.
 * @param configs    Array of ConfigResources to alter.
 * @param config_cnt Number of elements in \p configs.
 * @param options    Admin options passed on to the request op.
 * @param rkqu       Queue whose rkqu_q is handed to the request op.
 *                   Must not be NULL.
 */
void rd_kafka_AlterConfigs (rd_kafka_t *rk,
                            rd_kafka_ConfigResource_t **configs,
                            size_t config_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_AlterConfigsRequest,
                rd_kafka_AlterConfigsResponse_parse,
        };
        char errstr[256];
        rd_kafka_resp_err_t err;
        rd_kafka_op_t *rko;
        size_t idx = 0;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(
                rk,
                RD_KAFKA_OP_ALTERCONFIGS,
                RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
                &cbs, options, rkqu->rkqu_q);

        /* The request op gets its own copies of the resources. */
        rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt,
                     rd_kafka_ConfigResource_free);

        while (idx < config_cnt) {
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_ConfigResource_copy(configs[idx]));
                idx++;
        }

        /* A BROKER resource in the list means the request must be sent
         * directly to that broker rather than the controller.
         * Multiple BROKER resources are not allowed. */
        err = rd_kafka_ConfigResource_get_single_broker_id(
                &rko->rko_u.admin_request.args,
                &rko->rko_u.admin_request.broker_id,
                errstr, sizeof(errstr));

        if (!err) {
                rd_kafka_q_enq(rk->rk_ops, rko);
                return;
        }

        rd_kafka_admin_result_fail(rko, err, "%s", errstr);
        rd_kafka_admin_common_worker_destroy(rk, rko,
                                             rd_true/*destroy*/);
}
2899 
2900 
2901 const rd_kafka_ConfigResource_t **
rd_kafka_AlterConfigs_result_resources(const rd_kafka_AlterConfigs_result_t * result,size_t * cntp)2902 rd_kafka_AlterConfigs_result_resources (
2903         const rd_kafka_AlterConfigs_result_t *result,
2904         size_t *cntp) {
2905         return rd_kafka_admin_result_ret_resources(
2906                 (const rd_kafka_op_t *)result, cntp);
2907 }
2908 
2909 /**@}*/
2910 
2911 
2912 
2913 
2914 /**
2915  * @name DescribeConfigs
2916  * @{
2917  *
2918  *
2919  *
2920  */
2921 
2922 
2923 /**
2924  * @brief Parse DescribeConfigsResponse and create ADMIN_RESULT op.
2925  */
2926 static rd_kafka_resp_err_t
rd_kafka_DescribeConfigsResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)2927 rd_kafka_DescribeConfigsResponse_parse (rd_kafka_op_t *rko_req,
2928                                         rd_kafka_op_t **rko_resultp,
2929                                         rd_kafka_buf_t *reply,
2930                                         char *errstr, size_t errstr_size) {
2931         const int log_decode_errors = LOG_ERR;
2932         rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
2933         rd_kafka_t *rk = rkb->rkb_rk;
2934         rd_kafka_op_t *rko_result = NULL;
2935         int32_t res_cnt;
2936         int i;
2937         int32_t Throttle_Time;
2938         rd_kafka_ConfigResource_t *config = NULL;
2939         rd_kafka_ConfigEntry_t *entry = NULL;
2940 
2941         rd_kafka_buf_read_i32(reply, &Throttle_Time);
2942         rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
2943 
2944         /* #resources */
2945         rd_kafka_buf_read_i32(reply, &res_cnt);
2946 
2947         if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
2948                 rd_kafka_buf_parse_fail(
2949                         reply,
2950                         "Received %"PRId32" ConfigResources in response "
2951                         "when only %d were requested", res_cnt,
2952                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
2953 
2954         rko_result = rd_kafka_admin_result_new(rko_req);
2955 
2956         rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt,
2957                      rd_kafka_ConfigResource_free);
2958 
2959         for (i = 0 ; i < (int)res_cnt ; i++) {
2960                 int16_t error_code;
2961                 rd_kafkap_str_t error_msg;
2962                 int8_t res_type;
2963                 rd_kafkap_str_t kres_name;
2964                 char *res_name;
2965                 char *this_errstr = NULL;
2966                 rd_kafka_ConfigResource_t skel;
2967                 int orig_pos;
2968                 int32_t entry_cnt;
2969                 int ci;
2970 
2971                 rd_kafka_buf_read_i16(reply, &error_code);
2972                 rd_kafka_buf_read_str(reply, &error_msg);
2973                 rd_kafka_buf_read_i8(reply, &res_type);
2974                 rd_kafka_buf_read_str(reply, &kres_name);
2975                 RD_KAFKAP_STR_DUPA(&res_name, &kres_name);
2976 
2977                 if (error_code) {
2978                         if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
2979                             RD_KAFKAP_STR_LEN(&error_msg) == 0)
2980                                 this_errstr =
2981                                         (char *)rd_kafka_err2str(error_code);
2982                         else
2983                                 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
2984                 }
2985 
2986                 config = rd_kafka_ConfigResource_new(res_type, res_name);
2987                 if (!config) {
2988                         rd_kafka_log(rko_req->rko_rk, LOG_ERR,
2989                                      "ADMIN", "DescribeConfigs returned "
2990                                      "unsupported ConfigResource #%d with "
2991                                      "type %d and name \"%s\": ignoring",
2992                                      i, res_type, res_name);
2993                         continue;
2994                 }
2995 
2996                 config->err = error_code;
2997                 if (this_errstr)
2998                         config->errstr = rd_strdup(this_errstr);
2999 
3000                 /* #config_entries */
3001                 rd_kafka_buf_read_i32(reply, &entry_cnt);
3002 
3003                 for (ci = 0 ; ci < (int)entry_cnt ; ci++) {
3004                         rd_kafkap_str_t config_name, config_value;
3005                         int32_t syn_cnt;
3006                         int si;
3007 
3008                         rd_kafka_buf_read_str(reply, &config_name);
3009                         rd_kafka_buf_read_str(reply, &config_value);
3010 
3011                         entry = rd_kafka_ConfigEntry_new0(
3012                                 config_name.str,
3013                                 RD_KAFKAP_STR_LEN(&config_name),
3014                                 config_value.str,
3015                                 RD_KAFKAP_STR_LEN(&config_value));
3016 
3017                         rd_kafka_buf_read_bool(reply, &entry->a.is_readonly);
3018 
3019                         /* ApiVersion 0 has is_default field, while
3020                          * ApiVersion 1 has source field.
3021                          * Convert between the two so they look the same
3022                          * to the caller. */
3023                         if (rd_kafka_buf_ApiVersion(reply) == 0) {
3024                                 rd_kafka_buf_read_bool(reply,
3025                                                        &entry->a.is_default);
3026                                 if (entry->a.is_default)
3027                                         entry->a.source =
3028                                                 RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG;
3029                         } else {
3030                                 int8_t config_source;
3031                                 rd_kafka_buf_read_i8(reply, &config_source);
3032                                 entry->a.source = config_source;
3033 
3034                                 if (entry->a.source ==
3035                                     RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
3036                                         entry->a.is_default = 1;
3037 
3038                         }
3039 
3040                         rd_kafka_buf_read_bool(reply, &entry->a.is_sensitive);
3041 
3042 
3043                         if (rd_kafka_buf_ApiVersion(reply) == 1) {
3044                                 /* #config_synonyms (ApiVersion 1) */
3045                                 rd_kafka_buf_read_i32(reply, &syn_cnt);
3046 
3047                                 if (syn_cnt > 100000)
3048                                         rd_kafka_buf_parse_fail(
3049                                                 reply,
3050                                                 "Broker returned %"PRId32
3051                                                 " config synonyms for "
3052                                                 "ConfigResource %d,%s: "
3053                                                 "limit is 100000",
3054                                                 syn_cnt,
3055                                                 config->restype,
3056                                                 config->name);
3057 
3058                                 if (syn_cnt > 0)
3059                                         rd_list_grow(&entry->synonyms, syn_cnt);
3060 
3061                         } else {
3062                                 /* No synonyms in ApiVersion 0 */
3063                                 syn_cnt = 0;
3064                         }
3065 
3066 
3067 
3068                         /* Read synonyms (ApiVersion 1) */
3069                         for (si = 0 ; si < (int)syn_cnt ; si++) {
3070                                 rd_kafkap_str_t syn_name, syn_value;
3071                                 int8_t syn_source;
3072                                 rd_kafka_ConfigEntry_t *syn_entry;
3073 
3074                                 rd_kafka_buf_read_str(reply, &syn_name);
3075                                 rd_kafka_buf_read_str(reply, &syn_value);
3076                                 rd_kafka_buf_read_i8(reply, &syn_source);
3077 
3078                                 syn_entry = rd_kafka_ConfigEntry_new0(
3079                                         syn_name.str,
3080                                         RD_KAFKAP_STR_LEN(&syn_name),
3081                                         syn_value.str,
3082                                         RD_KAFKAP_STR_LEN(&syn_value));
3083                                 if (!syn_entry)
3084                                         rd_kafka_buf_parse_fail(
3085                                                 reply,
3086                                                 "Broker returned invalid "
3087                                                 "synonym #%d "
3088                                                 "for ConfigEntry #%d (%s) "
3089                                                 "and ConfigResource %d,%s: "
3090                                                 "syn_name.len %d, "
3091                                                 "syn_value.len %d",
3092                                                 si, ci, entry->kv->name,
3093                                                 config->restype, config->name,
3094                                                 (int)syn_name.len,
3095                                                 (int)syn_value.len);
3096 
3097                                 syn_entry->a.source = syn_source;
3098                                 syn_entry->a.is_synonym = 1;
3099 
3100                                 rd_list_add(&entry->synonyms, syn_entry);
3101                         }
3102 
3103                         rd_kafka_ConfigResource_add_ConfigEntry(
3104                                 config, entry);
3105                         entry = NULL;
3106                 }
3107 
3108                 /* As a convenience to the application we insert result
3109                  * in the same order as they were requested. The broker
3110                  * does not maintain ordering unfortunately. */
3111                 skel.restype = config->restype;
3112                 skel.name = config->name;
3113                 orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
3114                                          &skel, rd_kafka_ConfigResource_cmp);
3115                 if (orig_pos == -1)
3116                         rd_kafka_buf_parse_fail(
3117                                 reply,
3118                                 "Broker returned ConfigResource %d,%s "
3119                                 "that was not "
3120                                 "included in the original request",
3121                                 res_type, res_name);
3122 
3123                 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
3124                                  orig_pos) != NULL)
3125                         rd_kafka_buf_parse_fail(
3126                                 reply,
3127                                 "Broker returned ConfigResource %d,%s "
3128                                 "multiple times",
3129                                 res_type, res_name);
3130 
3131                 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
3132                             config);
3133                 config = NULL;
3134         }
3135 
3136         *rko_resultp = rko_result;
3137 
3138         return RD_KAFKA_RESP_ERR_NO_ERROR;
3139 
3140  err_parse:
3141         if (entry)
3142                 rd_kafka_ConfigEntry_destroy(entry);
3143         if (config)
3144                 rd_kafka_ConfigResource_destroy(config);
3145 
3146         if (rko_result)
3147                 rd_kafka_op_destroy(rko_result);
3148 
3149         rd_snprintf(errstr, errstr_size,
3150                     "DescribeConfigs response protocol parse failure: %s",
3151                     rd_kafka_err2str(reply->rkbuf_err));
3152 
3153         return reply->rkbuf_err;
3154 }
3155 
3156 
3157 
rd_kafka_DescribeConfigs(rd_kafka_t * rk,rd_kafka_ConfigResource_t ** configs,size_t config_cnt,const rd_kafka_AdminOptions_t * options,rd_kafka_queue_t * rkqu)3158 void rd_kafka_DescribeConfigs (rd_kafka_t *rk,
3159                                rd_kafka_ConfigResource_t **configs,
3160                                size_t config_cnt,
3161                                const rd_kafka_AdminOptions_t *options,
3162                                rd_kafka_queue_t *rkqu) {
3163         rd_kafka_op_t *rko;
3164         size_t i;
3165         rd_kafka_resp_err_t err;
3166         char errstr[256];
3167         static const struct rd_kafka_admin_worker_cbs cbs = {
3168                 rd_kafka_DescribeConfigsRequest,
3169                 rd_kafka_DescribeConfigsResponse_parse,
3170         };
3171 
3172         rd_assert(rkqu);
3173 
3174         rko = rd_kafka_admin_request_op_new(
3175                 rk,
3176                 RD_KAFKA_OP_DESCRIBECONFIGS,
3177                 RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
3178                 &cbs, options, rkqu->rkqu_q);
3179 
3180         rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt,
3181                      rd_kafka_ConfigResource_free);
3182 
3183         for (i = 0 ; i < config_cnt ; i++)
3184                 rd_list_add(&rko->rko_u.admin_request.args,
3185                             rd_kafka_ConfigResource_copy(configs[i]));
3186 
3187         /* If there's a BROKER resource in the list we need to
3188          * speak directly to that broker rather than the controller.
3189          *
3190          * Multiple BROKER resources are not allowed.
3191          */
3192         err = rd_kafka_ConfigResource_get_single_broker_id(
3193                 &rko->rko_u.admin_request.args,
3194                 &rko->rko_u.admin_request.broker_id,
3195                 errstr, sizeof(errstr));
3196         if (err) {
3197                 rd_kafka_admin_result_fail(rko, err, "%s", errstr);
3198                 rd_kafka_admin_common_worker_destroy(rk, rko,
3199                                                      rd_true/*destroy*/);
3200                 return;
3201         }
3202 
3203         rd_kafka_q_enq(rk->rk_ops, rko);
3204 }
3205 
3206 
3207 
3208 
3209 const rd_kafka_ConfigResource_t **
rd_kafka_DescribeConfigs_result_resources(const rd_kafka_DescribeConfigs_result_t * result,size_t * cntp)3210 rd_kafka_DescribeConfigs_result_resources (
3211         const rd_kafka_DescribeConfigs_result_t *result,
3212         size_t *cntp) {
3213         return rd_kafka_admin_result_ret_resources(
3214                 (const rd_kafka_op_t *)result, cntp);
3215 }
3216 
3217 /**@}*/
3218 
3219 /**
3220  * @name Delete Records
3221  * @{
3222  *
3223  *
3224  *
3225  *
3226  */
3227 
3228 rd_kafka_DeleteRecords_t *
rd_kafka_DeleteRecords_new(const rd_kafka_topic_partition_list_t * before_offsets)3229 rd_kafka_DeleteRecords_new (const rd_kafka_topic_partition_list_t *
3230                             before_offsets) {
3231         rd_kafka_DeleteRecords_t *del_records;
3232 
3233         del_records = rd_calloc(1, sizeof(*del_records));
3234         del_records->offsets =
3235                 rd_kafka_topic_partition_list_copy(before_offsets);
3236 
3237         return del_records;
3238 }
3239 
/**
 * @brief Destroy a DeleteRecords object: its offsets list is destroyed
 *        and the object itself is freed.
 *
 * @param del_records Object to destroy, it is dereferenced
 *                    unconditionally.
 */
void rd_kafka_DeleteRecords_destroy (rd_kafka_DeleteRecords_t *del_records) {
        /* Offsets list first, then the object that refers to it. */
        rd_kafka_topic_partition_list_destroy((*del_records).offsets);

        rd_free(del_records);
}
3244 
/**
 * @brief Destroy the first \p del_record_cnt objects of the
 *        \p del_records array.
 *
 * Only the elements are destroyed, the array storage itself is not
 * freed by this function.
 */
void rd_kafka_DeleteRecords_destroy_array (rd_kafka_DeleteRecords_t **
                                           del_records,
                                           size_t del_record_cnt) {
        size_t idx = 0;

        while (idx < del_record_cnt)
                rd_kafka_DeleteRecords_destroy(del_records[idx++]);
}
3252 
3253 
3254 
3255 /** @brief Merge the DeleteRecords response from a single broker
3256  *         into the user response list.
3257  */
3258 static void
rd_kafka_DeleteRecords_response_merge(rd_kafka_op_t * rko_fanout,const rd_kafka_op_t * rko_partial)3259 rd_kafka_DeleteRecords_response_merge (rd_kafka_op_t *rko_fanout,
3260                                        const rd_kafka_op_t *rko_partial) {
3261         rd_kafka_t *rk = rko_fanout->rko_rk;
3262         const rd_kafka_topic_partition_list_t *partitions;
3263         rd_kafka_topic_partition_list_t *respartitions;
3264         const rd_kafka_topic_partition_t *partition;
3265 
3266         rd_assert(rko_partial->rko_evtype ==
3267                   RD_KAFKA_EVENT_DELETERECORDS_RESULT);
3268 
3269         /* All partitions (offsets) from the DeleteRecords() call */
3270         respartitions = rd_list_elem(&rko_fanout->rko_u.admin_request.
3271                                      fanout.results, 0);
3272 
3273         if (rko_partial->rko_err) {
3274                 /* If there was a request-level error, set the error on
3275                  * all requested partitions for this request. */
3276                 const rd_kafka_topic_partition_list_t *reqpartitions;
3277                 rd_kafka_topic_partition_t *reqpartition;
3278 
3279                 /* Partitions (offsets) from this DeleteRecordsRequest */
3280                 reqpartitions = rd_list_elem(&rko_partial->rko_u.
3281                                              admin_result.args, 0);
3282 
3283                 RD_KAFKA_TPLIST_FOREACH(reqpartition, reqpartitions) {
3284                         rd_kafka_topic_partition_t *respart;
3285 
3286                         /* Find result partition */
3287                         respart = rd_kafka_topic_partition_list_find(
3288                                 respartitions,
3289                                 reqpartition->topic,
3290                                 reqpartition->partition);
3291 
3292                         rd_assert(respart || !*"respart not found");
3293 
3294                         respart->err = rko_partial->rko_err;
3295                 }
3296 
3297                 return;
3298         }
3299 
3300         /* Partitions from the DeleteRecordsResponse */
3301         partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0);
3302 
3303         RD_KAFKA_TPLIST_FOREACH(partition, partitions) {
3304                 rd_kafka_topic_partition_t *respart;
3305 
3306 
3307                 /* Find result partition */
3308                 respart = rd_kafka_topic_partition_list_find(
3309                         respartitions,
3310                         partition->topic,
3311                         partition->partition);
3312                 if (unlikely(!respart)) {
3313                         rd_dassert(!*"partition not found");
3314 
3315                         rd_kafka_log(rk, LOG_WARNING, "DELETERECORDS",
3316                                      "DeleteRecords response contains "
3317                                      "unexpected %s [%"PRId32"] which "
3318                                      "was not in the request list: ignored",
3319                                      partition->topic, partition->partition);
3320                         continue;
3321                 }
3322 
3323                 respart->offset = partition->offset;
3324                 respart->err = partition->err;
3325         }
3326 }
3327 
3328 
3329 
3330 /**
3331  * @brief Parse DeleteRecordsResponse and create ADMIN_RESULT op.
3332  */
3333 static rd_kafka_resp_err_t
rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)3334 rd_kafka_DeleteRecordsResponse_parse (rd_kafka_op_t *rko_req,
3335                                       rd_kafka_op_t **rko_resultp,
3336                                       rd_kafka_buf_t *reply,
3337                                       char *errstr, size_t errstr_size) {
3338         const int log_decode_errors = LOG_ERR;
3339         rd_kafka_op_t *rko_result;
3340         rd_kafka_topic_partition_list_t *offsets;
3341 
3342         rd_kafka_buf_read_throttle_time(reply);
3343 
3344         offsets = rd_kafka_buf_read_topic_partitions(reply, 0,
3345                                                      rd_true/*read_offset*/,
3346                                                      rd_true/*read_part_errs*/);
3347         if (!offsets)
3348                 rd_kafka_buf_parse_fail(reply,
3349                                         "Failed to parse topic partitions");
3350 
3351 
3352         rko_result = rd_kafka_admin_result_new(rko_req);
3353         rd_list_init(&rko_result->rko_u.admin_result.results, 1,
3354                      rd_kafka_topic_partition_list_destroy_free);
3355         rd_list_add(&rko_result->rko_u.admin_result.results, offsets);
3356         *rko_resultp = rko_result;
3357         return RD_KAFKA_RESP_ERR_NO_ERROR;
3358 
3359 err_parse:
3360         rd_snprintf(errstr, errstr_size,
3361                     "DeleteRecords response protocol parse failure: %s",
3362                     rd_kafka_err2str(reply->rkbuf_err));
3363 
3364         return reply->rkbuf_err;
3365 }
3366 
3367 
3368 /**
3369  * @brief Call when leaders have been queried to progress the DeleteRecords
3370  *        admin op to its next phase, sending DeleteRecords to partition
3371  *        leaders.
3372  *
3373  * @param rko Reply op (RD_KAFKA_OP_LEADERS).
3374  */
3375 static rd_kafka_op_res_t
rd_kafka_DeleteRecords_leaders_queried_cb(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * reply)3376 rd_kafka_DeleteRecords_leaders_queried_cb (rd_kafka_t *rk,
3377                                            rd_kafka_q_t *rkq,
3378                                            rd_kafka_op_t *reply) {
3379         rd_kafka_resp_err_t err = reply->rko_err;
3380         const rd_list_t *leaders =
3381                 reply->rko_u.leaders.leaders; /* Possibly NULL (on err) */
3382         rd_kafka_topic_partition_list_t *partitions =
3383                 reply->rko_u.leaders.partitions; /* Possibly NULL (on err) */
3384         rd_kafka_op_t *rko_fanout = reply->rko_u.leaders.opaque;
3385         rd_kafka_topic_partition_t *rktpar;
3386         rd_kafka_topic_partition_list_t *offsets;
3387         const struct rd_kafka_partition_leader *leader;
3388         static const struct rd_kafka_admin_worker_cbs cbs = {
3389                 rd_kafka_DeleteRecordsRequest,
3390                 rd_kafka_DeleteRecordsResponse_parse,
3391         };
3392         int i;
3393 
3394         rd_assert((rko_fanout->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
3395                   RD_KAFKA_OP_ADMIN_FANOUT);
3396 
3397         if (err == RD_KAFKA_RESP_ERR__DESTROY)
3398                 goto err;
3399 
3400         /* Requested offsets */
3401         offsets = rd_list_elem(&rko_fanout->rko_u.admin_request.args, 0);
3402 
3403         /* Update the error field of each partition from the
3404          * leader-queried partition list so that ERR_UNKNOWN_TOPIC_OR_PART
3405          * and similar are propagated, since those partitions are not
3406          * included in the leaders list. */
3407         RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
3408                 rd_kafka_topic_partition_t *rktpar2;
3409 
3410                 if (!rktpar->err)
3411                         continue;
3412 
3413                 rktpar2 = rd_kafka_topic_partition_list_find(
3414                         offsets, rktpar->topic, rktpar->partition);
3415                 rd_assert(rktpar2);
3416                 rktpar2->err = rktpar->err;
3417         }
3418 
3419 
3420         if (err) {
3421         err:
3422                 rd_kafka_admin_result_fail(
3423                         rko_fanout,
3424                         err,
3425                         "Failed to query partition leaders: %s",
3426                         err == RD_KAFKA_RESP_ERR__NOENT ?
3427                         "No leaders found" : rd_kafka_err2str(err));
3428                 rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
3429                                                      rd_true/*destroy*/);
3430                 return RD_KAFKA_OP_RES_HANDLED;
3431         }
3432 
3433         /* The response lists is one element deep and that element is a
3434          * rd_kafka_topic_partition_list_t with the results of the deletes. */
3435         rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, 1,
3436                      rd_kafka_topic_partition_list_destroy_free);
3437         rd_list_add(&rko_fanout->rko_u.admin_request.fanout.results,
3438                     rd_kafka_topic_partition_list_copy(offsets));
3439 
3440         rko_fanout->rko_u.admin_request.fanout.outstanding =
3441                 rd_list_cnt(leaders);
3442 
3443         rd_assert(rd_list_cnt(leaders) > 0);
3444 
3445         /* For each leader send a request for its partitions */
3446         RD_LIST_FOREACH(leader, leaders, i) {
3447                 rd_kafka_op_t *rko =
3448                         rd_kafka_admin_request_op_new(
3449                                 rk,
3450                                 RD_KAFKA_OP_DELETERECORDS,
3451                                 RD_KAFKA_EVENT_DELETERECORDS_RESULT,
3452                                 &cbs, &rko_fanout->rko_u.admin_request.options,
3453                                 rk->rk_ops);
3454                 rko->rko_u.admin_request.fanout_parent = rko_fanout;
3455                 rko->rko_u.admin_request.broker_id = leader->rkb->rkb_nodeid;
3456 
3457                 rd_kafka_topic_partition_list_sort_by_topic(leader->partitions);
3458 
3459                 rd_list_init(&rko->rko_u.admin_request.args, 1,
3460                              rd_kafka_topic_partition_list_destroy_free);
3461                 rd_list_add(&rko->rko_u.admin_request.args,
3462                             rd_kafka_topic_partition_list_copy(
3463                                     leader->partitions));
3464 
3465                 /* Enqueue op for admin_worker() to transition to next state */
3466                 rd_kafka_q_enq(rk->rk_ops, rko);
3467         }
3468 
3469         return RD_KAFKA_OP_RES_HANDLED;
3470 }
3471 
3472 
/**
 * @brief Start a DeleteRecords admin operation.
 *
 * The request is validated (exactly one DeleteRecords object, a
 * non-empty offsets list, no duplicates), the offsets are copied onto a
 * fanout op and an asynchronous partition leader query is started which
 * continues in rd_kafka_DeleteRecords_leaders_queried_cb().
 *
 * If validation fails the fanout op is failed with
 * RD_KAFKA_RESP_ERR__INVALID_ARG and destroyed.
 *
 * @param rk             Client instance.
 * @param del_records    Array of DeleteRecords objects.
 * @param del_record_cnt Number of elements in \p del_records, must be 1.
 * @param options        Admin options passed on to the fanout op.
 * @param rkqu           Queue whose rkqu_q is handed to the fanout op.
 *                       Must not be NULL.
 */
void rd_kafka_DeleteRecords (rd_kafka_t *rk,
                             rd_kafka_DeleteRecords_t **del_records,
                             size_t del_record_cnt,
                             const rd_kafka_AdminOptions_t *options,
                             rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = {
                rd_kafka_DeleteRecords_response_merge,
                rd_kafka_topic_partition_list_copy_opaque,
        };
        rd_kafka_op_t *rko_fanout;
        const rd_kafka_topic_partition_list_t *requested;
        rd_kafka_topic_partition_list_t *owned = NULL;
        const char *reject_reason = NULL;

        rd_assert(rkqu);

        rko_fanout = rd_kafka_admin_fanout_op_new(
                rk,
                RD_KAFKA_OP_DELETERECORDS,
                RD_KAFKA_EVENT_DELETERECORDS_RESULT,
                &fanout_cbs, options, rkqu->rkqu_q);

        if (del_record_cnt != 1) {
                /* We only support one DeleteRecords per call since there
                 * is no point in passing multiples, but the API still
                 * needs to be extensible/future-proof. */
                reject_reason = "Exactly one DeleteRecords must be "
                                "passed";
        } else {
                requested = del_records[0]->offsets;

                if (requested == NULL || requested->cnt == 0) {
                        reject_reason = "No records to delete";
                } else {
                        /* Copy offsets list, it is stored on the request
                         * op further down unless it has duplicates. */
                        owned = rd_kafka_topic_partition_list_copy(requested);
                        if (rd_kafka_topic_partition_list_has_duplicates(
                                    owned, rd_false/*check partition*/)) {
                                rd_kafka_topic_partition_list_destroy(owned);
                                reject_reason =
                                        "Duplicate partitions not allowed";
                        }
                }
        }

        if (reject_reason) {
                /* Common exit for all validation failures. */
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "%s", reject_reason);
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        /* Set default error on each partition so that if any of the partitions
         * never get a request sent we have an error to indicate it. */
        rd_kafka_topic_partition_list_set_err(owned,
                                              RD_KAFKA_RESP_ERR__NOOP);

        rd_list_init(&rko_fanout->rko_u.admin_request.args, 1,
                     rd_kafka_topic_partition_list_destroy_free);
        rd_list_add(&rko_fanout->rko_u.admin_request.args, owned);

        /* Async query for partition leaders */
        rd_kafka_topic_partition_list_query_leaders_async(
                rk, owned,
                rd_kafka_admin_timeout_remains(rko_fanout),
                RD_KAFKA_REPLYQ(rk->rk_ops, 0),
                rd_kafka_DeleteRecords_leaders_queried_cb,
                rko_fanout);
}
3548 
3549 
3550 /**
3551  * @brief Get the list of offsets from a DeleteRecords result.
3552  *
3553  * The returned \p offsets life-time is the same as the \p result object.
3554  */
3555 const rd_kafka_topic_partition_list_t *
rd_kafka_DeleteRecords_result_offsets(const rd_kafka_DeleteRecords_result_t * result)3556 rd_kafka_DeleteRecords_result_offsets (
3557         const rd_kafka_DeleteRecords_result_t *result) {
3558         const rd_kafka_topic_partition_list_t *offsets;
3559         const rd_kafka_op_t *rko = (const rd_kafka_op_t *) result;
3560         size_t cnt;
3561 
3562         rd_kafka_op_type_t reqtype =
3563                 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
3564         rd_assert(reqtype == RD_KAFKA_OP_DELETERECORDS);
3565 
3566         cnt = rd_list_cnt(&rko->rko_u.admin_result.results);
3567 
3568         rd_assert(cnt == 1);
3569 
3570         offsets = (const rd_kafka_topic_partition_list_t *)
3571                 rd_list_elem(&rko->rko_u.admin_result.results, 0);
3572 
3573         rd_assert(offsets);
3574 
3575         return offsets;
3576 }
3577 
3578 /**@}*/
3579 
3580 /**
3581  * @name Delete groups
3582  * @{
3583  *
3584  *
3585  *
3586  *
3587  */
3588 
rd_kafka_DeleteGroup_new(const char * group)3589 rd_kafka_DeleteGroup_t *rd_kafka_DeleteGroup_new (const char *group) {
3590         size_t tsize = strlen(group) + 1;
3591         rd_kafka_DeleteGroup_t *del_group;
3592 
3593         /* Single allocation */
3594         del_group = rd_malloc(sizeof(*del_group) + tsize);
3595         del_group->group = del_group->data;
3596         memcpy(del_group->group, group, tsize);
3597 
3598         return del_group;
3599 }
3600 
/**
 * @brief Destroy a DeleteGroup object created by rd_kafka_DeleteGroup_new().
 *
 * The object and its group name share one allocation, so a single
 * rd_free() releases everything.
 *
 * @param del_group Object to free.
 */
void rd_kafka_DeleteGroup_destroy (rd_kafka_DeleteGroup_t *del_group) {
        rd_free(del_group);
}
3604 
/**
 * @brief Type-erased wrapper around rd_kafka_DeleteGroup_destroy().
 *
 * Takes a void pointer so that it can be used as the free callback of
 * lists holding DeleteGroup objects.
 *
 * @param ptr The rd_kafka_DeleteGroup_t object to destroy.
 */
static void rd_kafka_DeleteGroup_free (void *ptr) {
        rd_kafka_DeleteGroup_destroy(ptr);
}
3608 
/**
 * @brief Destroy every DeleteGroup object in the \p del_groups array.
 *
 * Only the elements are destroyed, the array itself is not freed.
 *
 * @param del_groups    Array of DeleteGroup objects.
 * @param del_group_cnt Number of elements in \p del_groups.
 */
void rd_kafka_DeleteGroup_destroy_array (rd_kafka_DeleteGroup_t **del_groups,
                                         size_t del_group_cnt) {
        size_t idx = 0;

        while (idx < del_group_cnt) {
                rd_kafka_DeleteGroup_destroy(del_groups[idx]);
                idx++;
        }
}
3615 
3616 /**
3617  * @brief Group name comparator for DeleteGroup_t
3618  */
rd_kafka_DeleteGroup_cmp(const void * _a,const void * _b)3619 static int rd_kafka_DeleteGroup_cmp (const void *_a, const void *_b) {
3620         const rd_kafka_DeleteGroup_t *a = _a, *b = _b;
3621         return strcmp(a->group, b->group);
3622 }
3623 
3624 /**
3625  * @brief Allocate a new DeleteGroup and make a copy of \p src
3626  */
3627 static rd_kafka_DeleteGroup_t *
rd_kafka_DeleteGroup_copy(const rd_kafka_DeleteGroup_t * src)3628 rd_kafka_DeleteGroup_copy (const rd_kafka_DeleteGroup_t *src) {
3629         return rd_kafka_DeleteGroup_new(src->group);
3630 }
3631 
3632 
3633 /**
3634  * @brief Parse DeleteGroupsResponse and create ADMIN_RESULT op.
3635  */
3636 static rd_kafka_resp_err_t
rd_kafka_DeleteGroupsResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)3637 rd_kafka_DeleteGroupsResponse_parse (rd_kafka_op_t *rko_req,
3638                                      rd_kafka_op_t **rko_resultp,
3639                                      rd_kafka_buf_t *reply,
3640                                      char *errstr, size_t errstr_size) {
3641         const int log_decode_errors = LOG_ERR;
3642         int32_t group_cnt;
3643         int i;
3644         rd_kafka_op_t *rko_result = NULL;
3645 
3646         rd_kafka_buf_read_throttle_time(reply);
3647 
3648         /* #group_error_codes */
3649         rd_kafka_buf_read_i32(reply, &group_cnt);
3650 
3651         if (group_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
3652                 rd_kafka_buf_parse_fail(
3653                         reply,
3654                         "Received %"PRId32" groups in response "
3655                         "when only %d were requested", group_cnt,
3656                         rd_list_cnt(&rko_req->rko_u.admin_request.args));
3657 
3658         rko_result = rd_kafka_admin_result_new(rko_req);
3659         rd_list_init(&rko_result->rko_u.admin_result.results,
3660                      group_cnt,
3661                      rd_kafka_group_result_free);
3662 
3663         for (i = 0 ; i < (int)group_cnt ; i++) {
3664                 rd_kafkap_str_t kgroup;
3665                 int16_t error_code;
3666                 rd_kafka_group_result_t *groupres;
3667 
3668                 rd_kafka_buf_read_str(reply, &kgroup);
3669                 rd_kafka_buf_read_i16(reply, &error_code);
3670 
3671                 groupres = rd_kafka_group_result_new(
3672                         kgroup.str,
3673                         RD_KAFKAP_STR_LEN(&kgroup),
3674                         NULL,
3675                         error_code ?
3676                         rd_kafka_error_new(error_code, NULL) : NULL);
3677 
3678                 rd_list_add(&rko_result->rko_u.admin_result.results, groupres);
3679         }
3680 
3681         *rko_resultp = rko_result;
3682         return RD_KAFKA_RESP_ERR_NO_ERROR;
3683 
3684 err_parse:
3685         if (rko_result)
3686                 rd_kafka_op_destroy(rko_result);
3687 
3688         rd_snprintf(errstr, errstr_size,
3689                     "DeleteGroups response protocol parse failure: %s",
3690                     rd_kafka_err2str(reply->rkbuf_err));
3691 
3692         return reply->rkbuf_err;
3693 }
3694 
3695 /** @brief Merge the DeleteGroups response from a single broker
3696  *         into the user response list.
3697  */
rd_kafka_DeleteGroups_response_merge(rd_kafka_op_t * rko_fanout,const rd_kafka_op_t * rko_partial)3698 void rd_kafka_DeleteGroups_response_merge (rd_kafka_op_t *rko_fanout,
3699                                            const rd_kafka_op_t *rko_partial) {
3700         const rd_kafka_group_result_t *groupres = NULL;
3701         rd_kafka_group_result_t *newgroupres;
3702         const rd_kafka_DeleteGroup_t *grp =
3703                 rko_partial->rko_u.admin_result.opaque;
3704         int orig_pos;
3705 
3706         rd_assert(rko_partial->rko_evtype ==
3707                   RD_KAFKA_EVENT_DELETEGROUPS_RESULT);
3708 
3709         if (!rko_partial->rko_err) {
3710                 /* Proper results.
3711                  * We only send one group per request, make sure it matches */
3712                 groupres = rd_list_elem(&rko_partial->rko_u.admin_result.
3713                                         results, 0);
3714                 rd_assert(groupres);
3715                 rd_assert(!strcmp(groupres->group, grp->group));
3716                 newgroupres = rd_kafka_group_result_copy(groupres);
3717         } else {
3718                 /* Op errored, e.g. timeout */
3719                 newgroupres = rd_kafka_group_result_new(
3720                         grp->group, -1, NULL,
3721                         rd_kafka_error_new(rko_partial->rko_err, NULL));
3722         }
3723 
3724         /* As a convenience to the application we insert group result
3725          * in the same order as they were requested. */
3726         orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args,
3727                                  grp, rd_kafka_DeleteGroup_cmp);
3728         rd_assert(orig_pos != -1);
3729 
3730         /* Make sure result is not already set */
3731         rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.
3732                                fanout.results, orig_pos) == NULL);
3733 
3734         rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results,
3735                     orig_pos, newgroupres);
3736 }
3737 
/**
 * @brief Delete the groups in \p del_groups.
 *
 * Creates a fanout op and then one request op per group, each targeting
 * the group's coordinator and each enqueued on rk->rk_ops.
 *
 * The request fails with RD_KAFKA_RESP_ERR__INVALID_ARG if
 * \p del_group_cnt is zero or if the same group name occurs more than once.
 *
 * @param rk            Client instance.
 * @param del_groups    Array of groups to delete. The objects are copied,
 *                      the array is not modified.
 * @param del_group_cnt Number of elements in \p del_groups.
 * @param options       Admin options passed on to the fanout op and to
 *                      every per-group request op.
 * @param rkqu          Queue whose rkqu_q is handed to the fanout op.
 *                      Must not be NULL.
 */
void rd_kafka_DeleteGroups (rd_kafka_t *rk,
                            rd_kafka_DeleteGroup_t **del_groups,
                            size_t del_group_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = {
                rd_kafka_DeleteGroups_response_merge,
                rd_kafka_group_result_copy_opaque,
        };
        /* Callbacks shared by all per-group request ops. */
        static const struct rd_kafka_admin_worker_cbs request_cbs = {
                rd_kafka_DeleteGroupsRequest,
                rd_kafka_DeleteGroupsResponse_parse,
        };
        rd_kafka_op_t *rko_fanout;
        rd_list_t sorted_groups;
        int has_duplicate;
        size_t idx;

        rd_assert(rkqu);

        rko_fanout = rd_kafka_admin_fanout_op_new(
                rk,
                RD_KAFKA_OP_DELETEGROUPS,
                RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
                &fanout_cbs, options, rkqu->rkqu_q);

        if (del_group_cnt == 0) {
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "No groups to delete");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        /* Store a copy of each group on the fanout op,
         * in the order the application passed them. */
        rd_list_init(&rko_fanout->rko_u.admin_request.args,
                     (int)del_group_cnt,
                     rd_kafka_DeleteGroup_free);
        for (idx = 0; idx < del_group_cnt; idx++)
                rd_list_add(&rko_fanout->rko_u.admin_request.args,
                            rd_kafka_DeleteGroup_copy(del_groups[idx]));

        /* Duplicate detection is done on a sorted temporary list so that
         * the ordering of the list stored on the op is left untouched.
         * The temporary list has no free callback and thus does not own
         * its elements. */
        rd_list_init(&sorted_groups,
                     rd_list_cnt(&rko_fanout->rko_u.admin_request.args),
                     NULL);
        rd_list_copy_to(&sorted_groups,
                        &rko_fanout->rko_u.admin_request.args,
                        NULL, NULL);
        rd_list_sort(&sorted_groups, rd_kafka_DeleteGroup_cmp);
        has_duplicate = rd_list_find_duplicate(
                &sorted_groups, rd_kafka_DeleteGroup_cmp) != NULL;
        rd_list_destroy(&sorted_groups);

        if (has_duplicate) {
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "Duplicate groups not allowed");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        /* Results list where each fanned out op's result is accumulated,
         * and the number of ops still to be waited for. */
        rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results,
                     (int)del_group_cnt,
                     rd_kafka_group_result_free);
        rko_fanout->rko_u.admin_request.fanout.outstanding = (int)del_group_cnt;

        /* Create individual request ops for each group.
         * FIXME: A future optimization is to coalesce all groups for a single
         *        coordinator into one op. */
        for (idx = 0; idx < del_group_cnt; idx++) {
                rd_kafka_DeleteGroup_t *stored_group = rd_list_elem(
                        &rko_fanout->rko_u.admin_request.args, (int)idx);
                rd_kafka_op_t *rko_req =
                        rd_kafka_admin_request_op_new(
                                rk,
                                RD_KAFKA_OP_DELETEGROUPS,
                                RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
                                &request_cbs,
                                options,
                                rk->rk_ops);

                /* Route the request to the group's coordinator. */
                rko_req->rko_u.admin_request.fanout_parent = rko_fanout;
                rko_req->rko_u.admin_request.broker_id =
                        RD_KAFKA_ADMIN_TARGET_COORDINATOR;
                rko_req->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP;
                rko_req->rko_u.admin_request.coordkey =
                        rd_strdup(stored_group->group);

                /* Set the group name as the opaque so the fanout worker use it
                 * to fill in errors.
                 * References rko_fanout's memory, which will always outlive
                 * the fanned out op. */
                rd_kafka_AdminOptions_set_opaque(
                        &rko_req->rko_u.admin_request.options, stored_group);

                /* The request op gets its own copy of the group. */
                rd_list_init(&rko_req->rko_u.admin_request.args, 1,
                             rd_kafka_DeleteGroup_free);
                rd_list_add(&rko_req->rko_u.admin_request.args,
                            rd_kafka_DeleteGroup_copy(del_groups[idx]));

                rd_kafka_q_enq(rk->rk_ops, rko_req);
        }
}
3847 
3848 
3849 /**
3850  * @brief Get an array of group results from a DeleteGroups result.
3851  *
3852  * The returned \p groups life-time is the same as the \p result object.
3853  * @param cntp is updated to the number of elements in the array.
3854  */
3855 const rd_kafka_group_result_t **
rd_kafka_DeleteGroups_result_groups(const rd_kafka_DeleteGroups_result_t * result,size_t * cntp)3856 rd_kafka_DeleteGroups_result_groups (
3857         const rd_kafka_DeleteGroups_result_t *result,
3858         size_t *cntp) {
3859         return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result,
3860                                                 cntp);
3861 }
3862 
3863 
3864 /**@}*/
3865 
3866 
3867 /**
3868  * @name Delete consumer group offsets (committed offsets)
3869  * @{
3870  *
3871  *
3872  *
3873  *
3874  */
3875 
3876 rd_kafka_DeleteConsumerGroupOffsets_t *
rd_kafka_DeleteConsumerGroupOffsets_new(const char * group,const rd_kafka_topic_partition_list_t * partitions)3877 rd_kafka_DeleteConsumerGroupOffsets_new (const char *group,
3878                                          const rd_kafka_topic_partition_list_t
3879                                          *partitions) {
3880         size_t tsize = strlen(group) + 1;
3881         rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets;
3882 
3883         rd_assert(partitions);
3884 
3885         /* Single allocation */
3886         del_grpoffsets = rd_malloc(sizeof(*del_grpoffsets) + tsize);
3887         del_grpoffsets->group = del_grpoffsets->data;
3888         memcpy(del_grpoffsets->group, group, tsize);
3889         del_grpoffsets->partitions =
3890                 rd_kafka_topic_partition_list_copy(partitions);
3891 
3892         return del_grpoffsets;
3893 }
3894 
/**
 * @brief Destroy a DeleteConsumerGroupOffsets object created by
 *        rd_kafka_DeleteConsumerGroupOffsets_new().
 *
 * @param del_grpoffsets Object to destroy.
 */
void rd_kafka_DeleteConsumerGroupOffsets_destroy (
        rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets) {
        /* The partition list is a separate object owned by
         * del_grpoffsets: release it before the object itself. */
        rd_kafka_topic_partition_list_destroy(del_grpoffsets->partitions);

        rd_free(del_grpoffsets);
}
3900 
/**
 * @brief Type-erased wrapper around
 *        rd_kafka_DeleteConsumerGroupOffsets_destroy().
 *
 * Takes a void pointer so that it can be used as the free callback of
 * lists holding DeleteConsumerGroupOffsets objects.
 *
 * @param ptr The rd_kafka_DeleteConsumerGroupOffsets_t object to destroy.
 */
static void rd_kafka_DeleteConsumerGroupOffsets_free (void *ptr) {
        rd_kafka_DeleteConsumerGroupOffsets_destroy(ptr);
}
3904 
/**
 * @brief Destroy every DeleteConsumerGroupOffsets object in the
 *        \p del_grpoffsets array.
 *
 * Only the elements are destroyed, the array itself is not freed.
 *
 * @param del_grpoffsets     Array of objects to destroy.
 * @param del_grpoffsets_cnt Number of elements in \p del_grpoffsets.
 */
void rd_kafka_DeleteConsumerGroupOffsets_destroy_array (
        rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
        size_t del_grpoffsets_cnt) {
        size_t idx = 0;

        while (idx < del_grpoffsets_cnt) {
                rd_kafka_DeleteConsumerGroupOffsets_destroy(
                        del_grpoffsets[idx]);
                idx++;
        }
}
3912 
3913 
3914 /**
3915  * @brief Allocate a new DeleteGroup and make a copy of \p src
3916  */
3917 static rd_kafka_DeleteConsumerGroupOffsets_t *
rd_kafka_DeleteConsumerGroupOffsets_copy(const rd_kafka_DeleteConsumerGroupOffsets_t * src)3918 rd_kafka_DeleteConsumerGroupOffsets_copy (
3919         const rd_kafka_DeleteConsumerGroupOffsets_t *src) {
3920         return rd_kafka_DeleteConsumerGroupOffsets_new(src->group,
3921                                                        src->partitions);
3922 }
3923 
3924 
3925 /**
3926  * @brief Parse OffsetDeleteResponse and create ADMIN_RESULT op.
3927  */
3928 static rd_kafka_resp_err_t
rd_kafka_OffsetDeleteResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)3929 rd_kafka_OffsetDeleteResponse_parse (rd_kafka_op_t *rko_req,
3930                                      rd_kafka_op_t **rko_resultp,
3931                                      rd_kafka_buf_t *reply,
3932                                      char *errstr, size_t errstr_size) {
3933         const int log_decode_errors = LOG_ERR;
3934         rd_kafka_op_t *rko_result;
3935         int16_t ErrorCode;
3936         rd_kafka_topic_partition_list_t *partitions = NULL;
3937         const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets;
3938 
3939         rd_kafka_buf_read_i16(reply, &ErrorCode);
3940         if (ErrorCode) {
3941                 rd_snprintf(errstr, errstr_size,
3942                             "OffsetDelete response error: %s",
3943                             rd_kafka_err2str(ErrorCode));
3944                 return ErrorCode;
3945         }
3946 
3947         rd_kafka_buf_read_throttle_time(reply);
3948 
3949         partitions = rd_kafka_buf_read_topic_partitions(reply,
3950                                                         16,
3951                                                         rd_false/*no offset */,
3952                                                         rd_true/*read error*/);
3953         if (!partitions) {
3954                 rd_snprintf(errstr, errstr_size,
3955                             "Failed to parse OffsetDeleteResponse partitions");
3956                 return RD_KAFKA_RESP_ERR__BAD_MSG;
3957         }
3958 
3959 
3960         /* Create result op and group_result_t */
3961         rko_result = rd_kafka_admin_result_new(rko_req);
3962         del_grpoffsets = rd_list_elem(&rko_result->rko_u.admin_result.args, 0);
3963 
3964         rd_list_init(&rko_result->rko_u.admin_result.results, 1,
3965                      rd_kafka_group_result_free);
3966         rd_list_add(&rko_result->rko_u.admin_result.results,
3967                     rd_kafka_group_result_new(del_grpoffsets->group, -1,
3968                                               partitions, NULL));
3969         rd_kafka_topic_partition_list_destroy(partitions);
3970 
3971         *rko_resultp = rko_result;
3972 
3973         return RD_KAFKA_RESP_ERR_NO_ERROR;
3974 
3975  err_parse:
3976         rd_snprintf(errstr, errstr_size,
3977                     "OffsetDelete response protocol parse failure: %s",
3978                     rd_kafka_err2str(reply->rkbuf_err));
3979         return reply->rkbuf_err;
3980 }
3981 
3982 
/**
 * @brief Delete committed offsets for the partitions of a single group.
 *
 * Creates one request op targeting the group's coordinator and enqueues
 * it on rk->rk_ops.
 *
 * The request fails with RD_KAFKA_RESP_ERR__INVALID_ARG unless
 * \p del_grpoffsets_cnt is exactly 1.
 *
 * @param rk                 Client instance.
 * @param del_grpoffsets     Array of DeleteConsumerGroupOffsets objects;
 *                           only element 0 is used, and it is copied.
 * @param del_grpoffsets_cnt Number of elements in \p del_grpoffsets.
 * @param options            Admin options passed on to the request op.
 * @param rkqu               Queue whose rkqu_q is handed to the request
 *                           op. Must not be NULL.
 */
void rd_kafka_DeleteConsumerGroupOffsets (
        rd_kafka_t *rk,
        rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
        size_t del_grpoffsets_cnt,
        const rd_kafka_AdminOptions_t *options,
        rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_OffsetDeleteRequest,
                rd_kafka_OffsetDeleteResponse_parse,
        };
        const rd_kafka_DeleteConsumerGroupOffsets_t *requested;
        rd_kafka_op_t *rko_req;

        rd_assert(rkqu);

        rko_req = rd_kafka_admin_request_op_new(
                rk,
                RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS,
                RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT,
                &cbs, options, rkqu->rkqu_q);

        if (del_grpoffsets_cnt != 1) {
                /* For simplicity we only support one single group for now */
                rd_kafka_admin_result_fail(rko_req,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "Exactly one "
                                           "DeleteConsumerGroupOffsets must "
                                           "be passed");
                rd_kafka_admin_common_worker_destroy(rk, rko_req,
                                                     rd_true/*destroy*/);
                return;
        }

        /* Only dereference the array once the count has been validated. */
        requested = del_grpoffsets[0];

        /* Route the request to the group's coordinator. */
        rko_req->rko_u.admin_request.broker_id =
                RD_KAFKA_ADMIN_TARGET_COORDINATOR;
        rko_req->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP;
        rko_req->rko_u.admin_request.coordkey = rd_strdup(requested->group);

        /* Store copy of group on request so the group name can be reached
         * from the response parser. */
        rd_list_init(&rko_req->rko_u.admin_request.args, 1,
                     rd_kafka_DeleteConsumerGroupOffsets_free);
        rd_list_add(&rko_req->rko_u.admin_request.args,
                    rd_kafka_DeleteConsumerGroupOffsets_copy(requested));

        rd_kafka_q_enq(rk->rk_ops, rko_req);
}
4032 
4033 
4034 /**
4035  * @brief Get an array of group results from a DeleteGroups result.
4036  *
4037  * The returned \p groups life-time is the same as the \p result object.
4038  * @param cntp is updated to the number of elements in the array.
4039  */
4040 const rd_kafka_group_result_t **
rd_kafka_DeleteConsumerGroupOffsets_result_groups(const rd_kafka_DeleteConsumerGroupOffsets_result_t * result,size_t * cntp)4041 rd_kafka_DeleteConsumerGroupOffsets_result_groups (
4042         const rd_kafka_DeleteConsumerGroupOffsets_result_t *result,
4043         size_t *cntp) {
4044         return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result,
4045                                                 cntp);
4046 }
4047 
4048 RD_EXPORT
4049 void rd_kafka_DeleteConsumerGroupOffsets (
4050         rd_kafka_t *rk,
4051         rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
4052         size_t del_grpoffsets_cnt,
4053         const rd_kafka_AdminOptions_t *options,
4054         rd_kafka_queue_t *rkqu);
4055 
4056 /**@}*/
4057