1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2018 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "rdkafka_int.h"
30 #include "rdkafka_admin.h"
31 #include "rdkafka_request.h"
32 #include "rdkafka_aux.h"
33
34 #include <stdarg.h>
35
36
37
/** @brief Descriptive strings for rko_u.admin_request.state.
 *  @remark Indexed by the admin request state value, so the order here
 *          must match the state enum declaration.
 *          NOTE(review): the enum itself is declared elsewhere
 *          (rdkafka_admin.h) -- confirm ordering when adding states. */
static const char *rd_kafka_admin_state_desc[] = {
        "initializing",
        "waiting for broker",
        "waiting for controller",
        "waiting for fanouts",
        "constructing request",
        "waiting for response from broker",
};
47
48
49
50 /**
51 * @brief Admin API implementation.
52 *
53 * The public Admin API in librdkafka exposes a completely asynchronous
54 * interface where the initial request API (e.g., ..CreateTopics())
55 * is non-blocking and returns immediately, and the application polls
56 * a ..queue_t for the result.
57 *
58 * The underlying handling of the request is also completely asynchronous
59 * inside librdkafka, for two reasons:
60 * - everything is async in librdkafka so adding something new that isn't
61 * would mean that existing functionality will need to be changed if
62 * it should be able to work simultaneously (such as statistics, timers,
63 * etc). There is no functional value to making the admin API
64 * synchronous internally, even if it would simplify its implementation.
65 * So making it async allows the Admin API to be used with existing
66 * client types in existing applications without breakage.
67 * - the async approach allows multiple outstanding Admin API requests
68 * simultaneously.
69 *
70 * The internal async implementation relies on the following concepts:
71 * - it uses a single rko (rd_kafka_op_t) to maintain state.
72 * - the rko has a callback attached - called the worker callback.
73 * - the worker callback is a small state machine that triggers
74 * async operations (be it controller lookups, timeout timers,
75 * protocol transmits, etc).
76 * - the worker callback is only called on the rdkafka main thread.
77 * - the callback is triggered by different events and sources by enqueuing
78 * the rko on the rdkafka main ops queue.
79 *
80 *
81 * Let's illustrate this with a DeleteTopics example. This might look
82 * daunting, but it boils down to an asynchronous state machine being
83 * triggered by enqueuing the rko op.
84 *
85 * 1. [app thread] The user constructs the input arguments,
86 * including a response rkqu queue and then calls DeleteTopics().
87 *
88 * 2. [app thread] DeleteTopics() creates a new internal op (rko) of type
89 * RD_KAFKA_OP_DELETETOPICS, makes a **copy** on the rko of all the
90 * input arguments (which allows the caller to free the originals
91 * whenever she likes). The rko op worker callback is set to the
92 * generic admin worker callback rd_kafka_admin_worker()
93 *
94 * 3. [app thread] DeleteTopics() enqueues the rko on librdkafka's main ops
95 * queue that is served by the rdkafka main thread in rd_kafka_thread_main()
96 *
97 * 4. [rdkafka main thread] The rko is dequeued by rd_kafka_q_serve and
98 * the rd_kafka_poll_cb() is called.
99 *
100 * 5. [rdkafka main thread] The rko_type switch case identifies the rko
101 * as an RD_KAFKA_OP_DELETETOPICS which is served by the op callback
102 * set in step 2.
103 *
104 * 6. [rdkafka main thread] The worker callback is called.
105 * After some initial checking of err==ERR__DESTROY events
106 * (which is used to clean up outstanding ops (etc) on termination),
107 * the code hits a state machine using rko_u.admin.request_state.
108 *
109 * 7. [rdkafka main thread] The initial state is RD_KAFKA_ADMIN_STATE_INIT
110 * where the worker validates the user input.
111 * An enqueue once (eonce) object is created - the use of this object
112 * allows having multiple outstanding async functions referencing the
113 * same underlying rko object, but only allowing the first one
114 * to trigger an event.
115 * A timeout timer is set up to trigger the eonce object when the
116 * full options.request_timeout has elapsed.
117 *
118 * 8. [rdkafka main thread] After initialization the state is updated
119 * to WAIT_BROKER or WAIT_CONTROLLER and the code falls through to
120 * looking up a specific broker or the controller broker and waiting for
121 * an active connection.
122 * Both the lookup and the waiting for an active connection are
123 * fully asynchronous, and the same eonce used for the timer is passed
124 * to the rd_kafka_broker_controller_async() or broker_async() functions
125 * which will trigger the eonce when a broker state change occurs.
126 * If the controller is already known (from metadata) and the connection
127 * is up a rkb broker object is returned and the eonce is not used,
128 * skip to step 11.
129 *
130 * 9. [rdkafka main thread] Upon metadata retrieval (which is triggered
131 * automatically by other parts of the code) the controller_id may be
132 * updated in which case the eonce is triggered.
133 * The eonce triggering enqueues the original rko on the rdkafka main
134 * ops queue again and we go to step 8 which will check if the controller
135 * connection is up.
136 *
137 * 10. [broker thread] If the controller_id is now known we wait for
138 * the corresponding broker's connection to come up. This signaling
139 * is performed from the broker thread upon broker state changes
140 * and uses the same eonce. The eonce triggering enqueues the original
 * rko on the rdkafka main ops queue again and we go back to step 8
142 * to check if broker is now available.
143 *
144 * 11. [rdkafka main thread] Back in the worker callback we now have an
145 * rkb broker pointer (with reference count increased) for the controller
146 * with the connection up (it might go down while we're referencing it,
147 * but that does not stop us from enqueuing a protocol request).
148 *
149 * 12. [rdkafka main thread] A DeleteTopics protocol request buffer is
150 * constructed using the input parameters saved on the rko and the
151 * buffer is enqueued on the broker's transmit queue.
152 * The buffer is set up to provide the reply buffer on the rdkafka main
153 * ops queue (the same queue we are operating from) with a handler
154 * callback of rd_kafka_admin_handle_response().
155 * The state is updated to the RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE.
156 *
157 * 13. [broker thread] If the request times out, a response with error code
158 * (ERR__TIMED_OUT) is enqueued. Go to 16.
159 *
160 * 14. [broker thread] If a response is received, the response buffer
161 * is enqueued. Go to 16.
162 *
163 * 15. [rdkafka main thread] The buffer callback (..handle_response())
164 * is called, which attempts to extract the original rko from the eonce,
165 * but if the eonce has already been triggered by some other source
166 * (the timeout timer) the buffer callback simply returns and does nothing
167 * since the admin request is over and a result (probably a timeout)
168 * has been enqueued for the application.
169 * If the rko was still intact we temporarily set the reply buffer
170 * in the rko struct and call the worker callback. Go to 17.
171 *
172 * 16. [rdkafka main thread] The worker callback is called in state
173 * RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE without a response but with an error.
174 * An error result op is created and enqueued on the application's
175 * provided response rkqu queue.
176 *
177 * 17. [rdkafka main thread] The worker callback is called in state
178 * RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE with a response buffer with no
179 * error set.
180 * The worker calls the response `parse()` callback to parse the response
181 * buffer and populates a result op (rko_result) with the response
182 * information (such as per-topic error codes, etc).
183 * The result op is returned to the worker.
184 *
185 * 18. [rdkafka main thread] The worker enqueues the result op (rko_result)
186 * on the application's provided response rkqu queue.
187 *
188 * 19. [app thread] The application calls rd_kafka_queue_poll() to
189 * receive the result of the operation. The result may have been
 * enqueued in step 18 thanks to successful completion, or in any
191 * of the earlier stages when an error was encountered.
192 *
193 * 20. [app thread] The application uses rd_kafka_event_DeleteTopics_result()
194 * to retrieve the request-specific result type.
195 *
196 * 21. Done.
197 *
198 *
199 *
200 *
201 * Fanout (RD_KAFKA_OP_ADMIN_FANOUT) requests
202 * ------------------------------------------
203 *
204 * Certain Admin APIs may have requests that need to be sent to different
205 * brokers, for instance DeleteRecords which needs to be sent to the leader
206 * for each given partition.
207 *
208 * To achieve this we create a Fanout (RD_KAFKA_OP_ADMIN_FANOUT) op for the
209 * overall Admin API call (e.g., DeleteRecords), and then sub-ops for each
210 * of the per-broker requests. These sub-ops have the proper op type for
211 * the operation they are performing (e.g., RD_KAFKA_OP_DELETERECORDS)
212 * but their replyq does not point back to the application replyq but
213 * rk_ops which is handled by the librdkafka main thread and with the op
214 * callback set to rd_kafka_admin_fanout_worker(). This worker aggregates
215 * the results of each fanned out sub-op and merges the result into a
216 * single result op (RD_KAFKA_OP_ADMIN_RESULT) that is enqueued on the
217 * application's replyq.
218 *
219 * We rely on the timeouts on the fanned out sub-ops rather than the parent
220 * fanout op.
221 *
222 * The parent fanout op must not be destroyed until all fanned out sub-ops
223 * are done (either by success, failure or timeout) and destroyed, and this
224 * is tracked by the rko_u.admin_request.fanout.outstanding counter.
225 *
226 */
227
228
/**
 * @enum Admin request target broker. Must be negative values since the field
 *       used is broker_id (non-negative values denote an explicit broker id).
 */
enum {
        RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1,  /**< Cluster controller */
        RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */
        RD_KAFKA_ADMIN_TARGET_FANOUT = -3,      /**< This rko is a fanout
                                                 *   and has no target
                                                 *   broker */
};
239
/**
 * @brief Admin op callback types
 */

/** @brief Protocol request callback: constructs and sends the admin
 *         protocol request to broker \p rkb with \p resp_cb as the
 *         response handler and \p opaque as its opaque (the admin op's
 *         eonce, as passed by rd_kafka_admin_coord_request()).
 *         On (local) failure an error is returned and \p errstr is
 *         populated with a human-readable reason. */
typedef rd_kafka_resp_err_t (rd_kafka_admin_Request_cb_t) (
        rd_kafka_broker_t *rkb,
        const rd_list_t *configs /*(ConfigResource_t*)*/,
        rd_kafka_AdminOptions_t *options,
        char *errstr, size_t errstr_size,
        rd_kafka_replyq_t replyq,
        rd_kafka_resp_cb_t *resp_cb,
        void *opaque)
        RD_WARN_UNUSED_RESULT;

/** @brief Protocol response parser callback: parses \p reply for request
 *         op \p rko_req and returns a result op in \p *rko_resultp,
 *         or an error (with \p errstr populated) on parse failure. */
typedef rd_kafka_resp_err_t (rd_kafka_admin_Response_parse_cb_t) (
        rd_kafka_op_t *rko_req,
        rd_kafka_op_t **rko_resultp,
        rd_kafka_buf_t *reply,
        char *errstr, size_t errstr_size)
        RD_WARN_UNUSED_RESULT;

/** @brief Fanout merge callback: merges the partial result \p rko_partial
 *         from one fanned-out sub-request into the parent \p rko_req. */
typedef void (rd_kafka_admin_fanout_PartialResponse_cb_t) (
        rd_kafka_op_t *rko_req,
        const rd_kafka_op_t *rko_partial);

/** @brief List copy callback used to copy accumulated fanout results
 *         into the final result op. */
typedef rd_list_copy_cb_t rd_kafka_admin_fanout_CopyResult_cb_t;
265
/**
 * @struct Request-specific worker callbacks.
 */
struct rd_kafka_admin_worker_cbs {
        /** Protocol request callback which is called
         *  to construct and send the request. */
        rd_kafka_admin_Request_cb_t *request;

        /** Protocol response parser callback which is called
         *  to translate the response to a rko_result op. */
        rd_kafka_admin_Response_parse_cb_t *parse;
};
278
/**
 * @struct Fanout request callbacks.
 */
struct rd_kafka_admin_fanout_worker_cbs {
        /** Merge results from a fanned out request into the user response. */
        rd_kafka_admin_fanout_PartialResponse_cb_t *partial_response;

        /** Copy an accumulated result for storing into the rko_result. */
        rd_kafka_admin_fanout_CopyResult_cb_t *copy_result;
};
289
/* Forward declarations of static helpers defined further down or in
 * later sections of this file. */
static void rd_kafka_admin_common_worker_destroy (rd_kafka_t *rk,
                                                  rd_kafka_op_t *rko,
                                                  rd_bool_t do_destroy);
static void rd_kafka_AdminOptions_init (rd_kafka_t *rk,
                                        rd_kafka_AdminOptions_t *options);
static rd_kafka_op_res_t
rd_kafka_admin_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko);
static rd_kafka_ConfigEntry_t *
rd_kafka_ConfigEntry_copy (const rd_kafka_ConfigEntry_t *src);
static void rd_kafka_ConfigEntry_free (void *ptr);
static void *rd_kafka_ConfigEntry_list_copy (const void *src, void *opaque);

static void rd_kafka_admin_handle_response (rd_kafka_t *rk,
                                            rd_kafka_broker_t *rkb,
                                            rd_kafka_resp_err_t err,
                                            rd_kafka_buf_t *reply,
                                            rd_kafka_buf_t *request,
                                            void *opaque);

static rd_kafka_op_res_t
rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq,
                              rd_kafka_op_t *rko_fanout);
313
314
315 /**
316 * @name Common admin request code
317 * @{
318 *
319 *
320 */
321
/**
 * @brief Create a new admin_result op based on the request op \p rko_req.
 *
 * @param rko_req The admin request op (or parent fanout op) to create a
 *                result for.
 *
 * @returns a new RD_KAFKA_OP_ADMIN_RESULT op.
 *
 * @remark This moves the rko_req's admin_request.args list from \p rko_req
 *         to the returned rko. The \p rko_req args will be emptied.
 */
static rd_kafka_op_t *rd_kafka_admin_result_new (rd_kafka_op_t *rko_req) {
        rd_kafka_op_t *rko_result;
        rd_kafka_op_t *rko_fanout;

        if ((rko_fanout = rko_req->rko_u.admin_request.fanout_parent)) {
                /* If this is a fanned out request the rko_result needs to be
                 * handled by the fanout worker rather than the application. */
                rko_result = rd_kafka_op_new_cb(
                        rko_req->rko_rk,
                        RD_KAFKA_OP_ADMIN_RESULT,
                        rd_kafka_admin_fanout_worker);
                /* Transfer fanout pointer to result: the request op no
                 * longer references its parent after this. */
                rko_result->rko_u.admin_result.fanout_parent = rko_fanout;
                rko_req->rko_u.admin_request.fanout_parent = NULL;
                /* Set event type based on original fanout ops reqtype,
                 * e.g., ..OP_DELETERECORDS */
                rko_result->rko_u.admin_result.reqtype =
                        rko_fanout->rko_u.admin_request.fanout.reqtype;

        } else {
                rko_result = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_RESULT);

                /* If this is fanout request (i.e., the parent OP_ADMIN_FANOUT
                 * to fanned out requests) we need to use the original
                 * application request type. */
                if (rko_req->rko_type == RD_KAFKA_OP_ADMIN_FANOUT)
                        rko_result->rko_u.admin_result.reqtype =
                                rko_req->rko_u.admin_request.fanout.reqtype;
                else
                        rko_result->rko_u.admin_result.reqtype =
                                rko_req->rko_type;
        }

        rko_result->rko_rk = rko_req->rko_rk;

        /* Carry the application-provided opaque from the request options
         * over to the result. */
        rko_result->rko_u.admin_result.opaque =
                rd_kafka_confval_get_ptr(&rko_req->rko_u.admin_request.
                                         options.opaque);

        /* Move request arguments (list) from request to result.
         * This is mainly so that partial_response() knows what arguments
         * were provided to the response's request it is merging. */
        rd_list_move(&rko_result->rko_u.admin_result.args,
                     &rko_req->rko_u.admin_request.args);

        rko_result->rko_evtype = rko_req->rko_u.admin_request.reply_event_type;

        return rko_result;
}
377
378
379 /**
380 * @brief Set error code and error string on admin_result op \p rko.
381 */
rd_kafka_admin_result_set_err0(rd_kafka_op_t * rko,rd_kafka_resp_err_t err,const char * fmt,va_list ap)382 static void rd_kafka_admin_result_set_err0 (rd_kafka_op_t *rko,
383 rd_kafka_resp_err_t err,
384 const char *fmt, va_list ap) {
385 char buf[512];
386
387 rd_vsnprintf(buf, sizeof(buf), fmt, ap);
388
389 rko->rko_err = err;
390
391 if (rko->rko_u.admin_result.errstr)
392 rd_free(rko->rko_u.admin_result.errstr);
393 rko->rko_u.admin_result.errstr = rd_strdup(buf);
394
395 rd_kafka_dbg(rko->rko_rk, ADMIN, "ADMINFAIL",
396 "Admin %s result error: %s",
397 rd_kafka_op2str(rko->rko_u.admin_result.reqtype),
398 rko->rko_u.admin_result.errstr);
399 }
400
401 /**
402 * @sa rd_kafka_admin_result_set_err0
403 */
404 static RD_UNUSED RD_FORMAT(printf, 3, 4)
rd_kafka_admin_result_set_err(rd_kafka_op_t * rko,rd_kafka_resp_err_t err,const char * fmt,...)405 void rd_kafka_admin_result_set_err (rd_kafka_op_t *rko,
406 rd_kafka_resp_err_t err,
407 const char *fmt, ...) {
408 va_list ap;
409
410 va_start(ap, fmt);
411 rd_kafka_admin_result_set_err0(rko, err, fmt, ap);
412 va_end(ap);
413 }
414
415 /**
416 * @brief Enqueue admin_result on application's queue.
417 */
418 static RD_INLINE
rd_kafka_admin_result_enq(rd_kafka_op_t * rko_req,rd_kafka_op_t * rko_result)419 void rd_kafka_admin_result_enq (rd_kafka_op_t *rko_req,
420 rd_kafka_op_t *rko_result) {
421 rd_kafka_replyq_enq(&rko_req->rko_u.admin_request.replyq,
422 rko_result,
423 rko_req->rko_u.admin_request.replyq.version);
424 }
425
426 /**
427 * @brief Set request-level error code and string in reply op.
428 *
429 * @remark This function will NOT destroy the \p rko_req, so don't forget to
430 * call rd_kafka_admin_common_worker_destroy() when done with the rko.
431 */
432 static RD_FORMAT(printf, 3, 4)
rd_kafka_admin_result_fail(rd_kafka_op_t * rko_req,rd_kafka_resp_err_t err,const char * fmt,...)433 void rd_kafka_admin_result_fail (rd_kafka_op_t *rko_req,
434 rd_kafka_resp_err_t err,
435 const char *fmt, ...) {
436 va_list ap;
437 rd_kafka_op_t *rko_result;
438
439 if (!rko_req->rko_u.admin_request.replyq.q)
440 return;
441
442 rko_result = rd_kafka_admin_result_new(rko_req);
443
444 va_start(ap, fmt);
445 rd_kafka_admin_result_set_err0(rko_result, err, fmt, ap);
446 va_end(ap);
447
448 rd_kafka_admin_result_enq(rko_req, rko_result);
449 }
450
451
/**
 * @brief Send the admin request contained in \p rko upon receiving
 *        a FindCoordinator response.
 *
 * @param rkb The coordinator broker to send the request to.
 * @param rko_ignore Unused; the admin rko is retrieved through the eonce.
 * @param replyq Reply queue for the protocol response.
 * @param resp_cb Protocol response handler callback.
 * @param opaque Must be an admin request op's eonce (rko_u.admin_request.eonce)
 *        (i.e. created by \c rd_kafka_admin_request_op_new )
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was sent,
 *          RD_KAFKA_RESP_ERR__DESTROY if the admin op already timed out,
 *          or the request callback's error code (in which case the admin
 *          op is failed and destroyed here).
 *
 * @remark To be used as a callback for \c rd_kafka_coord_req
 */
static rd_kafka_resp_err_t
rd_kafka_admin_coord_request (rd_kafka_broker_t *rkb,
                              rd_kafka_op_t *rko_ignore,
                              rd_kafka_replyq_t replyq,
                              rd_kafka_resp_cb_t *resp_cb,
                              void *opaque) {
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_enq_once_t *eonce = opaque;
        rd_kafka_op_t *rko;
        char errstr[512];
        rd_kafka_resp_err_t err;


        /* Retrieve the admin rko and drop the "coordinator request"
         * eonce source that was registered when the coord req was made. */
        rko = rd_kafka_enq_once_del_source_return(eonce, "coordinator request");
        if (!rko)
                /* Admin request has timed out and been destroyed */
                return RD_KAFKA_RESP_ERR__DESTROY;

        /* A protocol response is now outstanding: register a new eonce
         * source for it before sending. */
        rd_kafka_enq_once_add_source(eonce, "coordinator response");

        err = rko->rko_u.admin_request.cbs->request(
                rkb,
                &rko->rko_u.admin_request.args,
                &rko->rko_u.admin_request.options,
                errstr, sizeof(errstr),
                replyq,
                rd_kafka_admin_handle_response,
                eonce);
        if (err) {
                /* Send failed: undo the response source, fail the admin
                 * op towards the application and tear down its resources. */
                rd_kafka_enq_once_del_source(eonce, "coordinator response");
                rd_kafka_admin_result_fail(
                        rko, err,
                        "%s worker failed to send request: %s",
                        rd_kafka_op2str(rko->rko_type), errstr);
                rd_kafka_admin_common_worker_destroy(rk, rko,
                                                     rd_true/*destroy*/);
        }
        return err;
}
500
501
502 /**
503 * @brief Return the topics list from a topic-related result object.
504 */
505 static const rd_kafka_topic_result_t **
rd_kafka_admin_result_ret_topics(const rd_kafka_op_t * rko,size_t * cntp)506 rd_kafka_admin_result_ret_topics (const rd_kafka_op_t *rko,
507 size_t *cntp) {
508 rd_kafka_op_type_t reqtype =
509 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
510 rd_assert(reqtype == RD_KAFKA_OP_CREATETOPICS ||
511 reqtype == RD_KAFKA_OP_DELETETOPICS ||
512 reqtype == RD_KAFKA_OP_CREATEPARTITIONS);
513
514 *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
515 return (const rd_kafka_topic_result_t **)rko->rko_u.admin_result.
516 results.rl_elems;
517 }
518
519 /**
520 * @brief Return the ConfigResource list from a config-related result object.
521 */
522 static const rd_kafka_ConfigResource_t **
rd_kafka_admin_result_ret_resources(const rd_kafka_op_t * rko,size_t * cntp)523 rd_kafka_admin_result_ret_resources (const rd_kafka_op_t *rko,
524 size_t *cntp) {
525 rd_kafka_op_type_t reqtype =
526 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
527 rd_assert(reqtype == RD_KAFKA_OP_ALTERCONFIGS ||
528 reqtype == RD_KAFKA_OP_DESCRIBECONFIGS);
529
530 *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
531 return (const rd_kafka_ConfigResource_t **)rko->rko_u.admin_result.
532 results.rl_elems;
533 }
534
535
536 /**
537 * @brief Return the groups list from a group-related result object.
538 */
539 static const rd_kafka_group_result_t **
rd_kafka_admin_result_ret_groups(const rd_kafka_op_t * rko,size_t * cntp)540 rd_kafka_admin_result_ret_groups (const rd_kafka_op_t *rko,
541 size_t *cntp) {
542 rd_kafka_op_type_t reqtype =
543 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
544 rd_assert(reqtype == RD_KAFKA_OP_DELETEGROUPS ||
545 reqtype == RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS);
546
547 *cntp = rd_list_cnt(&rko->rko_u.admin_result.results);
548 return (const rd_kafka_group_result_t **)rko->rko_u.admin_result.
549 results.rl_elems;
550 }
551
/**
 * @brief Create a new admin_request op of type \p optype and sets up the
 *        generic (type independent fields).
 *
 *        The caller shall then populate the admin_request.args list
 *        and enqueue the op on rk_ops for further processing work.
 *
 * @param rk Client instance.
 * @param optype The specific admin op type, e.g. RD_KAFKA_OP_DELETETOPICS.
 * @param reply_event_type Event type of the final result event.
 * @param cbs Callbacks, must reside in .data segment.
 * @param options Optional options, may be NULL to use defaults.
 * @param rkq The application's reply queue for the final result.
 *
 * @returns a new admin request op in state RD_KAFKA_ADMIN_STATE_INIT.
 *
 * @locks none
 * @locality application thread
 */
static rd_kafka_op_t *
rd_kafka_admin_request_op_new (rd_kafka_t *rk,
                               rd_kafka_op_type_t optype,
                               rd_kafka_event_type_t reply_event_type,
                               const struct rd_kafka_admin_worker_cbs *cbs,
                               const rd_kafka_AdminOptions_t *options,
                               rd_kafka_q_t *rkq) {
        rd_kafka_op_t *rko;

        rd_assert(rk);
        rd_assert(rkq);
        rd_assert(cbs);

        /* The worker callback drives the admin state machine on the
         * rdkafka main thread. */
        rko = rd_kafka_op_new_cb(rk, optype, rd_kafka_admin_worker);

        rko->rko_u.admin_request.reply_event_type = reply_event_type;

        rko->rko_u.admin_request.cbs = (struct rd_kafka_admin_worker_cbs *)cbs;

        /* Make a copy of the options */
        if (options)
                rko->rko_u.admin_request.options = *options;
        else
                rd_kafka_AdminOptions_init(rk,
                                           &rko->rko_u.admin_request.options);

        /* Default to controller */
        rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER;

        /* Calculate absolute timeout from options.request_timeout */
        rko->rko_u.admin_request.abs_timeout =
                rd_timeout_init(
                        rd_kafka_confval_get_int(&rko->rko_u.admin_request.
                                                 options.request_timeout));

        /* Setup enq-op-once, which is triggered by either timer code
         * or future wait-controller code. */
        rko->rko_u.admin_request.eonce =
                rd_kafka_enq_once_new(rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));

        /* The timer itself must be started from the rdkafka main thread,
         * not here. */

        /* Set up replyq: where the final result is delivered to the
         * application. */
        rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0);

        rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_INIT;
        return rko;
}
614
615
616 /**
617 * @returns the remaining request timeout in milliseconds.
618 */
rd_kafka_admin_timeout_remains(rd_kafka_op_t * rko)619 static RD_INLINE int rd_kafka_admin_timeout_remains (rd_kafka_op_t *rko) {
620 return rd_timeout_remains(rko->rko_u.admin_request.abs_timeout);
621 }
622
623 /**
624 * @returns the remaining request timeout in microseconds.
625 */
626 static RD_INLINE rd_ts_t
rd_kafka_admin_timeout_remains_us(rd_kafka_op_t * rko)627 rd_kafka_admin_timeout_remains_us (rd_kafka_op_t *rko) {
628 return rd_timeout_remains_us(rko->rko_u.admin_request.abs_timeout);
629 }
630
631
632 /**
633 * @brief Timer timeout callback for the admin rko's eonce object.
634 */
rd_kafka_admin_eonce_timeout_cb(rd_kafka_timers_t * rkts,void * arg)635 static void rd_kafka_admin_eonce_timeout_cb (rd_kafka_timers_t *rkts,
636 void *arg) {
637 rd_kafka_enq_once_t *eonce = arg;
638
639 rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT,
640 "timeout timer");
641 }
642
643
644
/**
 * @brief Common worker destroy to be called in destroy: label
 *        in worker.
 *
 *        Stops the timeout timer, destroys the eonce, and optionally
 *        destroys the rko itself.
 *
 * @param rk Client instance.
 * @param rko The admin request op to clean up.
 * @param do_destroy If true, also destroy \p rko.
 */
static void rd_kafka_admin_common_worker_destroy (rd_kafka_t *rk,
                                                  rd_kafka_op_t *rko,
                                                  rd_bool_t do_destroy) {
        int timer_was_stopped;

        /* Free resources for this op. */
        timer_was_stopped =
                rd_kafka_timer_stop(&rk->rk_timers,
                                    &rko->rko_u.admin_request.tmr, rd_true);


        if (rko->rko_u.admin_request.eonce) {
                /* Remove the stopped timer's eonce reference since its
                 * callback will not have fired if we stopped the timer. */
                if (timer_was_stopped)
                        rd_kafka_enq_once_del_source(rko->rko_u.admin_request.
                                                     eonce, "timeout timer");

                /* This is thread-safe to do even if there are outstanding
                 * timers or wait-controller references to the eonce
                 * since they only hold direct reference to the eonce,
                 * not the rko (the eonce holds a reference to the rko but
                 * it is cleared here). */
                rd_kafka_enq_once_destroy(rko->rko_u.admin_request.eonce);
                rko->rko_u.admin_request.eonce = NULL;
        }

        if (do_destroy)
                rd_kafka_op_destroy(rko);
}
679
680
681
682 /**
683 * @brief Asynchronously look up a broker.
684 * To be called repeatedly from each invocation of the worker
685 * when in state RD_KAFKA_ADMIN_STATE_WAIT_BROKER until
686 * a valid rkb is returned.
687 *
688 * @returns the broker rkb with refcount increased, or NULL if not yet
689 * available.
690 */
691 static rd_kafka_broker_t *
rd_kafka_admin_common_get_broker(rd_kafka_t * rk,rd_kafka_op_t * rko,int32_t broker_id)692 rd_kafka_admin_common_get_broker (rd_kafka_t *rk,
693 rd_kafka_op_t *rko,
694 int32_t broker_id) {
695 rd_kafka_broker_t *rkb;
696
697 rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up broker %"PRId32,
698 rd_kafka_op2str(rko->rko_type), broker_id);
699
700 /* Since we're iterating over this broker_async() call
701 * (asynchronously) until a broker is availabe (or timeout)
702 * we need to re-enable the eonce to be triggered again (which
703 * is not necessary the first time we get here, but there
704 * is no harm doing it then either). */
705 rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce,
706 rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
707
708 /* Look up the broker asynchronously, if the broker
709 * is not available the eonce is registered for broker
710 * state changes which will cause our function to be called
711 * again as soon as (any) broker state changes.
712 * When we are called again we perform the broker lookup
713 * again and hopefully get an rkb back, otherwise defer a new
714 * async wait. Repeat until success or timeout. */
715 if (!(rkb = rd_kafka_broker_get_async(
716 rk, broker_id, RD_KAFKA_BROKER_STATE_UP,
717 rko->rko_u.admin_request.eonce))) {
718 /* Broker not available, wait asynchronously
719 * for broker metadata code to trigger eonce. */
720 return NULL;
721 }
722
723 rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: broker %"PRId32" is %s",
724 rd_kafka_op2str(rko->rko_type), broker_id, rkb->rkb_name);
725
726 return rkb;
727 }
728
729
730 /**
731 * @brief Asynchronously look up the controller.
732 * To be called repeatedly from each invocation of the worker
733 * when in state RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER until
734 * a valid rkb is returned.
735 *
736 * @returns the controller rkb with refcount increased, or NULL if not yet
737 * available.
738 */
739 static rd_kafka_broker_t *
rd_kafka_admin_common_get_controller(rd_kafka_t * rk,rd_kafka_op_t * rko)740 rd_kafka_admin_common_get_controller (rd_kafka_t *rk,
741 rd_kafka_op_t *rko) {
742 rd_kafka_broker_t *rkb;
743
744 rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up controller",
745 rd_kafka_op2str(rko->rko_type));
746
747 /* Since we're iterating over this controller_async() call
748 * (asynchronously) until a controller is availabe (or timeout)
749 * we need to re-enable the eonce to be triggered again (which
750 * is not necessary the first time we get here, but there
751 * is no harm doing it then either). */
752 rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce,
753 rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
754
755 /* Look up the controller asynchronously, if the controller
756 * is not available the eonce is registered for broker
757 * state changes which will cause our function to be called
758 * again as soon as (any) broker state changes.
759 * When we are called again we perform the controller lookup
760 * again and hopefully get an rkb back, otherwise defer a new
761 * async wait. Repeat until success or timeout. */
762 if (!(rkb = rd_kafka_broker_controller_async(
763 rk, RD_KAFKA_BROKER_STATE_UP,
764 rko->rko_u.admin_request.eonce))) {
765 /* Controller not available, wait asynchronously
766 * for controller code to trigger eonce. */
767 return NULL;
768 }
769
770 rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: controller %s",
771 rd_kafka_op2str(rko->rko_type), rkb->rkb_name);
772
773 return rkb;
774 }
775
776
777
778 /**
779 * @brief Handle response from broker by triggering worker callback.
780 *
781 * @param opaque is the eonce from the worker protocol request call.
782 */
rd_kafka_admin_handle_response(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * reply,rd_kafka_buf_t * request,void * opaque)783 static void rd_kafka_admin_handle_response (rd_kafka_t *rk,
784 rd_kafka_broker_t *rkb,
785 rd_kafka_resp_err_t err,
786 rd_kafka_buf_t *reply,
787 rd_kafka_buf_t *request,
788 void *opaque) {
789 rd_kafka_enq_once_t *eonce = opaque;
790 rd_kafka_op_t *rko;
791
792 /* From ...add_source("send") */
793 rko = rd_kafka_enq_once_disable(eonce);
794
795 if (!rko) {
796 /* The operation timed out and the worker was
797 * dismantled while we were waiting for broker response,
798 * do nothing - everything has been cleaned up. */
799 rd_kafka_dbg(rk, ADMIN, "ADMIN",
800 "Dropping outdated %sResponse with return code %s",
801 request ?
802 rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey):
803 "???",
804 rd_kafka_err2str(err));
805 return;
806 }
807
808 /* Attach reply buffer to rko for parsing in the worker. */
809 rd_assert(!rko->rko_u.admin_request.reply_buf);
810 rko->rko_u.admin_request.reply_buf = reply;
811 rko->rko_err = err;
812
813 if (rko->rko_op_cb(rk, NULL, rko) == RD_KAFKA_OP_RES_HANDLED)
814 rd_kafka_op_destroy(rko);
815
816 }
817
818 /**
819 * @brief Generic handler for protocol responses, calls the admin ops'
820 * Response_parse_cb and enqueues the result to the caller's queue.
821 */
rd_kafka_admin_response_parse(rd_kafka_op_t * rko)822 static void rd_kafka_admin_response_parse (rd_kafka_op_t *rko) {
823 rd_kafka_resp_err_t err;
824 rd_kafka_op_t *rko_result = NULL;
825 char errstr[512];
826
827 if (rko->rko_err) {
828 rd_kafka_admin_result_fail(
829 rko, rko->rko_err,
830 "%s worker request failed: %s",
831 rd_kafka_op2str(rko->rko_type),
832 rd_kafka_err2str(rko->rko_err));
833 return;
834 }
835
836 /* Response received.
837 * Let callback parse response and provide result in rko_result
838 * which is then enqueued on the reply queue. */
839 err = rko->rko_u.admin_request.cbs->parse(
840 rko, &rko_result,
841 rko->rko_u.admin_request.reply_buf,
842 errstr, sizeof(errstr));
843 if (err) {
844 rd_kafka_admin_result_fail(
845 rko, err,
846 "%s worker failed to parse response: %s",
847 rd_kafka_op2str(rko->rko_type), errstr);
848 return;
849 }
850
851 rd_assert(rko_result);
852
853 /* Enqueue result on application queue, we're done. */
854 rd_kafka_admin_result_enq(rko, rko_result);
855 }
856
857 /**
858 * @brief Generic handler for coord_req() responses.
859 */
860 static void
rd_kafka_admin_coord_response_parse(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)861 rd_kafka_admin_coord_response_parse (rd_kafka_t *rk,
862 rd_kafka_broker_t *rkb,
863 rd_kafka_resp_err_t err,
864 rd_kafka_buf_t *rkbuf,
865 rd_kafka_buf_t *request,
866 void *opaque) {
867 rd_kafka_op_t *rko_result;
868 rd_kafka_enq_once_t *eonce = opaque;
869 rd_kafka_op_t *rko;
870 char errstr[512];
871
872 rko = rd_kafka_enq_once_del_source_return(eonce,
873 "coordinator response");
874 if (!rko)
875 /* Admin request has timed out and been destroyed */
876 return;
877
878 if (err) {
879 rd_kafka_admin_result_fail(
880 rko, err,
881 "%s worker coordinator request failed: %s",
882 rd_kafka_op2str(rko->rko_type),
883 rd_kafka_err2str(err));
884 rd_kafka_admin_common_worker_destroy(rk, rko,
885 rd_true/*destroy*/);
886 return;
887 }
888
889 err = rko->rko_u.admin_request.cbs->parse(
890 rko, &rko_result, rkbuf,
891 errstr, sizeof(errstr));
892 if (err) {
893 rd_kafka_admin_result_fail(
894 rko, err,
895 "%s worker failed to parse coordinator %sResponse: %s",
896 rd_kafka_op2str(rko->rko_type),
897 rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
898 errstr);
899 rd_kafka_admin_common_worker_destroy(rk, rko,
900 rd_true/*destroy*/);
901 return;
902 }
903
904 rd_assert(rko_result);
905
906 /* Enqueue result on application queue, we're done. */
907 rd_kafka_admin_result_enq(rko, rko_result);
908 }
909
910
911
/**
 * @brief Common worker state machine handling regardless of request type.
 *
 * Tasks:
 *  - Sets up timeout on first call.
 *  - Checks for timeout.
 *  - Checks for and fails on errors.
 *  - Async Controller and broker lookups
 *  - Calls the Request callback
 *  - Calls the parse callback
 *  - Result reply
 *  - Destruction of rko
 *
 * rko->rko_err may be one of:
 * RD_KAFKA_RESP_ERR_NO_ERROR, or
 * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup, or
 * RD_KAFKA_RESP_ERR__TIMED_OUT if request has timed out,
 * or any other error code triggered by other parts of the code.
 *
 * @returns a hint to the op code whether the rko should be destroyed or not.
 */
static rd_kafka_op_res_t
rd_kafka_admin_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
        const char *name = rd_kafka_op2str(rko->rko_type);
        rd_ts_t timeout_in;
        rd_kafka_broker_t *rkb = NULL;
        rd_kafka_resp_err_t err;
        char errstr[512];

        /* ADMIN_FANOUT handled by fanout_worker() */
        rd_assert((rko->rko_type & ~ RD_KAFKA_OP_FLAGMASK) !=
                  RD_KAFKA_OP_ADMIN_FANOUT);

        if (rd_kafka_terminating(rk)) {
                /* Client instance is being torn down: fail the op with
                 * __DESTROY so the result is suppressed/cleaned up. */
                rd_kafka_dbg(rk, ADMIN, name,
                             "%s worker called in state %s: "
                             "handle is terminating: %s",
                             name,
                             rd_kafka_admin_state_desc[rko->rko_u.
                                                       admin_request.state],
                             rd_kafka_err2str(rko->rko_err));
                rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY,
                                           "Handle is terminating: %s",
                                           rd_kafka_err2str(rko->rko_err));
                goto destroy;
        }

        if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
                rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY,
                                           "Destroyed");
                goto destroy; /* rko being destroyed (silent) */
        }

        rd_kafka_dbg(rk, ADMIN, name,
                     "%s worker called in state %s: %s",
                     name,
                     rd_kafka_admin_state_desc[rko->rko_u.admin_request.state],
                     rd_kafka_err2str(rko->rko_err));

        /* The worker must only run on the main rdkafka thread. */
        rd_assert(thrd_is_current(rko->rko_rk->rk_thread));

        /* Check for errors raised asynchronously (e.g., by timer) */
        if (rko->rko_err) {
                rd_kafka_admin_result_fail(
                        rko, rko->rko_err,
                        "Failed while %s: %s",
                        rd_kafka_admin_state_desc[rko->rko_u.
                                                  admin_request.state],
                        rd_kafka_err2str(rko->rko_err));
                goto destroy;
        }

        /* Check for timeout */
        timeout_in = rd_kafka_admin_timeout_remains_us(rko);
        if (timeout_in <= 0) {
                rd_kafka_admin_result_fail(
                        rko, RD_KAFKA_RESP_ERR__TIMED_OUT,
                        "Timed out %s",
                        rd_kafka_admin_state_desc[rko->rko_u.
                                                  admin_request.state]);
                goto destroy;
        }

 redo:
        switch (rko->rko_u.admin_request.state)
        {
        case RD_KAFKA_ADMIN_STATE_INIT:
        {
                int32_t broker_id;

                /* First call. */

                /* Set up timeout timer.
                 * The timer fires the eonce which re-triggers this worker,
                 * which then fails the op on the timeout check above. */
                rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce,
                                             "timeout timer");
                rd_kafka_timer_start_oneshot(&rk->rk_timers,
                                             &rko->rko_u.admin_request.tmr,
                                             rd_true, timeout_in,
                                             rd_kafka_admin_eonce_timeout_cb,
                                             rko->rko_u.admin_request.eonce);

                /* Use explicitly specified broker_id, if available. */
                broker_id = (int32_t)rd_kafka_confval_get_int(
                        &rko->rko_u.admin_request.options.broker);

                if (broker_id != -1) {
                        rd_kafka_dbg(rk, ADMIN, name,
                                     "%s using explicitly "
                                     "set broker id %"PRId32
                                     " rather than %"PRId32,
                                     name, broker_id,
                                     rko->rko_u.admin_request.broker_id);
                        rko->rko_u.admin_request.broker_id = broker_id;
                } else {
                        /* Default to controller */
                        /* NOTE(review): this assignment to the local
                         * `broker_id` is a dead store: the switch below
                         * dispatches on rko->rko_u.admin_request.broker_id,
                         * which presumably already holds the default target
                         * set by the op constructor — confirm against
                         * rd_kafka_admin_request_op_new(). */
                        broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER;
                }

                /* Resolve target broker(s) */
                switch (rko->rko_u.admin_request.broker_id)
                {
                case RD_KAFKA_ADMIN_TARGET_CONTROLLER:
                        /* Controller */
                        rko->rko_u.admin_request.state =
                                RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER;
                        goto redo;  /* Trigger next state immediately */

                case RD_KAFKA_ADMIN_TARGET_COORDINATOR:
                        /* Group (or other) coordinator */
                        rko->rko_u.admin_request.state =
                                RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE;
                        rd_kafka_enq_once_add_source(rko->rko_u.admin_request.
                                                     eonce,
                                                     "coordinator request");
                        rd_kafka_coord_req(rk,
                                           rko->rko_u.admin_request.coordtype,
                                           rko->rko_u.admin_request.coordkey,
                                           rd_kafka_admin_coord_request,
                                           NULL,
                                           rd_kafka_admin_timeout_remains(rko),
                                           RD_KAFKA_REPLYQ(rk->rk_ops, 0),
                                           rd_kafka_admin_coord_response_parse,
                                           rko->rko_u.admin_request.eonce);
                        /* Wait asynchronously for broker response, which will
                         * trigger the eonce and worker to be called again. */
                        return RD_KAFKA_OP_RES_KEEP;

                case RD_KAFKA_ADMIN_TARGET_FANOUT:
                        /* Shouldn't come here, fanouts are handled by
                         * fanout_worker() */
                        RD_NOTREACHED();
                        return RD_KAFKA_OP_RES_KEEP;

                default:
                        /* Specific broker */
                        rd_assert(rko->rko_u.admin_request.broker_id >= 0);
                        rko->rko_u.admin_request.state =
                                RD_KAFKA_ADMIN_STATE_WAIT_BROKER;
                        goto redo;  /* Trigger next state immediately */
                }
        }


        case RD_KAFKA_ADMIN_STATE_WAIT_BROKER:
                /* Broker lookup */
                if (!(rkb = rd_kafka_admin_common_get_broker(
                              rk, rko, rko->rko_u.admin_request.broker_id))) {
                        /* Still waiting for broker to become available */
                        return RD_KAFKA_OP_RES_KEEP;
                }

                rko->rko_u.admin_request.state =
                        RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST;
                goto redo;

        case RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER:
                if (!(rkb = rd_kafka_admin_common_get_controller(rk, rko))) {
                        /* Still waiting for controller to become available. */
                        return RD_KAFKA_OP_RES_KEEP;
                }

                rko->rko_u.admin_request.state =
                        RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST;
                goto redo;

        case RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS:
                /* This state is only used by ADMIN_FANOUT which has
                 * its own fanout_worker() */
                RD_NOTREACHED();
                break;

        case RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST:
                /* Got broker, send protocol request. */

                /* Make sure we're called from a 'goto redo' where
                 * the rkb was set. */
                rd_assert(rkb);

                /* Still need to use the eonce since this worker may
                 * time out while waiting for response from broker, in which
                 * case the broker response will hit an empty eonce (ok). */
                rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce,
                                             "send");

                /* Send request (async) */
                err = rko->rko_u.admin_request.cbs->request(
                        rkb,
                        &rko->rko_u.admin_request.args,
                        &rko->rko_u.admin_request.options,
                        errstr, sizeof(errstr),
                        RD_KAFKA_REPLYQ(rk->rk_ops, 0),
                        rd_kafka_admin_handle_response,
                        rko->rko_u.admin_request.eonce);

                /* Lose broker refcount from get_broker(), get_controller() */
                rd_kafka_broker_destroy(rkb);

                if (err) {
                        /* Request could not be constructed/sent:
                         * remove the "send" eonce source again and fail. */
                        rd_kafka_enq_once_del_source(
                                rko->rko_u.admin_request.eonce, "send");
                        rd_kafka_admin_result_fail(rko, err, "%s", errstr);
                        goto destroy;
                }

                rko->rko_u.admin_request.state =
                        RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE;

                /* Wait asynchronously for broker response, which will
                 * trigger the eonce and worker to be called again. */
                return RD_KAFKA_OP_RES_KEEP;


        case RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE:
                /* Response received: parse it and enqueue the result. */
                rd_kafka_admin_response_parse(rko);
                goto destroy;
        }

        return RD_KAFKA_OP_RES_KEEP;

 destroy:
        rd_kafka_admin_common_worker_destroy(rk, rko,
                                             rd_false/*don't destroy*/);
        return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy() */

}
1157
1158
1159 /**
1160 * @brief Create a new admin_fanout op of type \p req_type and sets up the
1161 * generic (type independent files).
1162 *
1163 * The caller shall then populate the \c admin_fanout.requests list,
1164 * initialize the \c admin_fanout.responses list,
1165 * set the initial \c admin_fanout.outstanding value,
1166 * and enqueue the op on rk_ops for further processing work.
1167 *
1168 * @param cbs Callbacks, must reside in .data segment.
1169 * @param options Optional options, may be NULL to use defaults.
1170 * @param rkq is the application reply queue.
1171 *
1172 * @locks none
1173 * @locality application thread
1174 */
1175 static rd_kafka_op_t *
rd_kafka_admin_fanout_op_new(rd_kafka_t * rk,rd_kafka_op_type_t req_type,rd_kafka_event_type_t reply_event_type,const struct rd_kafka_admin_fanout_worker_cbs * cbs,const rd_kafka_AdminOptions_t * options,rd_kafka_q_t * rkq)1176 rd_kafka_admin_fanout_op_new (rd_kafka_t *rk,
1177 rd_kafka_op_type_t req_type,
1178 rd_kafka_event_type_t reply_event_type,
1179 const struct rd_kafka_admin_fanout_worker_cbs
1180 *cbs,
1181 const rd_kafka_AdminOptions_t *options,
1182 rd_kafka_q_t *rkq) {
1183 rd_kafka_op_t *rko;
1184
1185 rd_assert(rk);
1186 rd_assert(rkq);
1187 rd_assert(cbs);
1188
1189 rko = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_FANOUT);
1190 rko->rko_rk = rk;
1191
1192 rko->rko_u.admin_request.reply_event_type = reply_event_type;
1193
1194 rko->rko_u.admin_request.fanout.cbs =
1195 (struct rd_kafka_admin_fanout_worker_cbs *)cbs;
1196
1197 /* Make a copy of the options */
1198 if (options)
1199 rko->rko_u.admin_request.options = *options;
1200 else
1201 rd_kafka_AdminOptions_init(rk,
1202 &rko->rko_u.admin_request.options);
1203
1204 rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_FANOUT;
1205
1206 /* Calculate absolute timeout */
1207 rko->rko_u.admin_request.abs_timeout =
1208 rd_timeout_init(
1209 rd_kafka_confval_get_int(&rko->rko_u.admin_request.
1210 options.request_timeout));
1211
1212 /* Set up replyq */
1213 rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0);
1214
1215 rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS;
1216
1217 rko->rko_u.admin_request.fanout.reqtype = req_type;
1218
1219 return rko;
1220 }
1221
1222
/**
 * @brief Common fanout worker state machine handling regardless of request type
 *
 * @param rko Result of a fanned out operation, e.g., DELETERECORDS result.
 *
 * Tasks:
 *  - Checks for and responds to client termination
 *  - Polls for fanned out responses
 *  - Calls the partial response callback
 *  - Calls the merge responses callback upon receipt of all partial responses
 *  - Destruction of rko
 *
 * rko->rko_err may be one of:
 * RD_KAFKA_RESP_ERR_NO_ERROR, or
 * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup.
 *
 * @returns a hint to the op code whether the rko should be destroyed or not.
 */
static rd_kafka_op_res_t
rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq,
                              rd_kafka_op_t *rko) {
        rd_kafka_op_t *rko_fanout = rko->rko_u.admin_result.fanout_parent;
        const char *name = rd_kafka_op2str(rko_fanout->rko_u.admin_request.
                                           fanout.reqtype);
        rd_kafka_op_t *rko_result;

        RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_ADMIN_RESULT);
        RD_KAFKA_OP_TYPE_ASSERT(rko_fanout, RD_KAFKA_OP_ADMIN_FANOUT);

        /* One fanned out request (this rko) has now completed. */
        rd_assert(rko_fanout->rko_u.admin_request.fanout.outstanding > 0);
        rko_fanout->rko_u.admin_request.fanout.outstanding--;

        /* Detach this partial result from the parent op. */
        rko->rko_u.admin_result.fanout_parent = NULL;

        if (rd_kafka_terminating(rk)) {
                /* Client is terminating: mark this partial result with
                 * __DESTROY (unless it already carries an error). */
                rd_kafka_dbg(rk, ADMIN, name,
                             "%s fanout worker called for fanned out op %s: "
                             "handle is terminating: %s",
                             name,
                             rd_kafka_op2str(rko->rko_type),
                             rd_kafka_err2str(rko_fanout->rko_err));
                if (!rko->rko_err)
                        rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY;
        }

        rd_kafka_dbg(rk, ADMIN, name,
                     "%s fanout worker called for %s with %d request(s) "
                     "outstanding: %s",
                     name,
                     rd_kafka_op2str(rko->rko_type),
                     rko_fanout->rko_u.admin_request.fanout.outstanding,
                     rd_kafka_err2str(rko_fanout->rko_err));

        /* Add partial response to rko_fanout's result list. */
        rko_fanout->rko_u.admin_request.
                fanout.cbs->partial_response(rko_fanout, rko);

        if (rko_fanout->rko_u.admin_request.fanout.outstanding > 0)
                /* Wait for outstanding requests to finish */
                return RD_KAFKA_OP_RES_HANDLED;

        /* All partial responses received:
         * merge the collected fanout results into a single result op. */
        rko_result = rd_kafka_admin_result_new(rko_fanout);
        rd_list_init_copy(&rko_result->rko_u.admin_result.results,
                          &rko_fanout->rko_u.admin_request.fanout.results);
        rd_list_copy_to(&rko_result->rko_u.admin_result.results,
                        &rko_fanout->rko_u.admin_request.fanout.results,
                        rko_fanout->rko_u.admin_request.
                        fanout.cbs->copy_result, NULL);

        /* Enqueue result on application queue, we're done. */
        rd_kafka_replyq_enq(&rko_fanout->rko_u.admin_request.replyq, rko_result,
                            rko_fanout->rko_u.admin_request.replyq.version);

        /* NOTE(review): outstanding is always 0 here (the >0 case returned
         * above), so this condition is redundant and the parent fanout op
         * is always destroyed at this point. */
        if (rko_fanout->rko_u.admin_request.fanout.outstanding == 0)
                rd_kafka_op_destroy(rko_fanout);

        return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy(rko) */
}
1302
1303 /**@}*/
1304
1305
1306 /**
1307 * @name Generic AdminOptions
1308 * @{
1309 *
1310 *
1311 */
1312
/**
 * @brief Set the "request_timeout" option.
 *
 * Validation (range check, applicability for this options' API) is
 * performed by rd_kafka_confval_set_type(), whose error code and
 * \p errstr are propagated to the caller.
 */
rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_request_timeout (rd_kafka_AdminOptions_t *options,
                                           int timeout_ms,
                                           char *errstr, size_t errstr_size) {
        return rd_kafka_confval_set_type(&options->request_timeout,
                                         RD_KAFKA_CONFVAL_INT, &timeout_ms,
                                         errstr, errstr_size);
}
1321
1322
/**
 * @brief Set the "operation_timeout" option.
 *
 * Validation (range check, applicability for this options' API) is
 * performed by rd_kafka_confval_set_type(); the confval may have been
 * disabled in rd_kafka_AdminOptions_init() for APIs that do not
 * support it.
 */
rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_operation_timeout (rd_kafka_AdminOptions_t *options,
                                             int timeout_ms,
                                             char *errstr, size_t errstr_size) {
        return rd_kafka_confval_set_type(&options->operation_timeout,
                                         RD_KAFKA_CONFVAL_INT, &timeout_ms,
                                         errstr, errstr_size);
}
1331
1332
/**
 * @brief Set the boolean "validate_only" option (0 or 1).
 *
 * Validation is performed by rd_kafka_confval_set_type(); the confval
 * may have been disabled for APIs that do not support validate-only
 * requests.
 */
rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_validate_only (rd_kafka_AdminOptions_t *options,
                                         int true_or_false,
                                         char *errstr, size_t errstr_size) {
        return rd_kafka_confval_set_type(&options->validate_only,
                                         RD_KAFKA_CONFVAL_INT, &true_or_false,
                                         errstr, errstr_size);
}
1341
1342 rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_incremental(rd_kafka_AdminOptions_t * options,int true_or_false,char * errstr,size_t errstr_size)1343 rd_kafka_AdminOptions_set_incremental (rd_kafka_AdminOptions_t *options,
1344 int true_or_false,
1345 char *errstr, size_t errstr_size) {
1346 rd_snprintf(errstr, errstr_size,
1347 "Incremental updates currently not supported, see KIP-248");
1348 return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
1349
1350 return rd_kafka_confval_set_type(&options->incremental,
1351 RD_KAFKA_CONFVAL_INT, &true_or_false,
1352 errstr, errstr_size);
1353 }
1354
/**
 * @brief Set the explicit target broker id option.
 *
 * Validation (range check) is performed by rd_kafka_confval_set_type().
 */
rd_kafka_resp_err_t
rd_kafka_AdminOptions_set_broker (rd_kafka_AdminOptions_t *options,
                                  int32_t broker_id,
                                  char *errstr, size_t errstr_size) {
        /* Confval INT values are plain ints: narrow the int32_t id. */
        int ibroker_id = (int)broker_id;

        return rd_kafka_confval_set_type(&options->broker,
                                         RD_KAFKA_CONFVAL_INT,
                                         &ibroker_id,
                                         errstr, errstr_size);
}
1366
/**
 * @brief Set the application opaque pointer that will be attached to
 *        the result event. The pointer is stored as-is, not copied or
 *        freed by librdkafka.
 */
void
rd_kafka_AdminOptions_set_opaque (rd_kafka_AdminOptions_t *options,
                                  void *opaque) {
        rd_kafka_confval_set_type(&options->opaque,
                                  RD_KAFKA_CONFVAL_PTR, opaque, NULL, 0);
}
1373
1374
1375 /**
1376 * @brief Initialize and set up defaults for AdminOptions
1377 */
rd_kafka_AdminOptions_init(rd_kafka_t * rk,rd_kafka_AdminOptions_t * options)1378 static void rd_kafka_AdminOptions_init (rd_kafka_t *rk,
1379 rd_kafka_AdminOptions_t *options) {
1380 rd_kafka_confval_init_int(&options->request_timeout, "request_timeout",
1381 0, 3600*1000,
1382 rk->rk_conf.admin.request_timeout_ms);
1383
1384 if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
1385 options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS ||
1386 options->for_api == RD_KAFKA_ADMIN_OP_DELETETOPICS ||
1387 options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS ||
1388 options->for_api == RD_KAFKA_ADMIN_OP_DELETERECORDS)
1389 rd_kafka_confval_init_int(&options->operation_timeout,
1390 "operation_timeout",
1391 -1, 3600*1000,
1392 rk->rk_conf.admin.request_timeout_ms);
1393 else
1394 rd_kafka_confval_disable(&options->operation_timeout,
1395 "operation_timeout");
1396
1397 if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
1398 options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS ||
1399 options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS ||
1400 options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS)
1401 rd_kafka_confval_init_int(&options->validate_only,
1402 "validate_only",
1403 0, 1, 0);
1404 else
1405 rd_kafka_confval_disable(&options->validate_only,
1406 "validate_only");
1407
1408 if (options->for_api == RD_KAFKA_ADMIN_OP_ANY ||
1409 options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS)
1410 rd_kafka_confval_init_int(&options->incremental,
1411 "incremental",
1412 0, 1, 0);
1413 else
1414 rd_kafka_confval_disable(&options->incremental,
1415 "incremental");
1416
1417 rd_kafka_confval_init_int(&options->broker, "broker",
1418 0, INT32_MAX, -1);
1419 rd_kafka_confval_init_ptr(&options->opaque, "opaque");
1420 }
1421
1422
1423 rd_kafka_AdminOptions_t *
rd_kafka_AdminOptions_new(rd_kafka_t * rk,rd_kafka_admin_op_t for_api)1424 rd_kafka_AdminOptions_new (rd_kafka_t *rk, rd_kafka_admin_op_t for_api) {
1425 rd_kafka_AdminOptions_t *options;
1426
1427 if ((int)for_api < 0 || for_api >= RD_KAFKA_ADMIN_OP__CNT)
1428 return NULL;
1429
1430 options = rd_calloc(1, sizeof(*options));
1431
1432 options->for_api = for_api;
1433
1434 rd_kafka_AdminOptions_init(rk, options);
1435
1436 return options;
1437 }
1438
/**
 * @brief Destroy (free) an AdminOptions object.
 *
 * Only the options struct itself is freed; any application opaque
 * pointer set on it is left untouched.
 */
void rd_kafka_AdminOptions_destroy (rd_kafka_AdminOptions_t *options) {
        rd_free(options);
}
1442
1443 /**@}*/
1444
1445
1446
1447
1448
1449
1450 /**
1451 * @name CreateTopics
1452 * @{
1453 *
1454 *
1455 *
1456 */
1457
1458
1459
1460 rd_kafka_NewTopic_t *
rd_kafka_NewTopic_new(const char * topic,int num_partitions,int replication_factor,char * errstr,size_t errstr_size)1461 rd_kafka_NewTopic_new (const char *topic,
1462 int num_partitions,
1463 int replication_factor,
1464 char *errstr, size_t errstr_size) {
1465 rd_kafka_NewTopic_t *new_topic;
1466
1467 if (!topic) {
1468 rd_snprintf(errstr, errstr_size, "Invalid topic name");
1469 return NULL;
1470 }
1471
1472 if (num_partitions < -1 || num_partitions > RD_KAFKAP_PARTITIONS_MAX) {
1473 rd_snprintf(errstr, errstr_size, "num_partitions out of "
1474 "expected range %d..%d or -1 for broker default",
1475 1, RD_KAFKAP_PARTITIONS_MAX);
1476 return NULL;
1477 }
1478
1479 if (replication_factor < -1 ||
1480 replication_factor > RD_KAFKAP_BROKERS_MAX) {
1481 rd_snprintf(errstr, errstr_size,
1482 "replication_factor out of expected range %d..%d",
1483 -1, RD_KAFKAP_BROKERS_MAX);
1484 return NULL;
1485 }
1486
1487 new_topic = rd_calloc(1, sizeof(*new_topic));
1488 new_topic->topic = rd_strdup(topic);
1489 new_topic->num_partitions = num_partitions;
1490 new_topic->replication_factor = replication_factor;
1491
1492 /* List of int32 lists */
1493 rd_list_init(&new_topic->replicas, 0, rd_list_destroy_free);
1494 rd_list_prealloc_elems(&new_topic->replicas, 0,
1495 num_partitions == -1 ? 0 : num_partitions,
1496 0/*nozero*/);
1497
1498 /* List of ConfigEntrys */
1499 rd_list_init(&new_topic->config, 0, rd_kafka_ConfigEntry_free);
1500
1501 return new_topic;
1502
1503 }
1504
1505
1506 /**
1507 * @brief Topic name comparator for NewTopic_t
1508 */
rd_kafka_NewTopic_cmp(const void * _a,const void * _b)1509 static int rd_kafka_NewTopic_cmp (const void *_a, const void *_b) {
1510 const rd_kafka_NewTopic_t *a = _a, *b = _b;
1511 return strcmp(a->topic, b->topic);
1512 }
1513
1514
1515
1516 /**
1517 * @brief Allocate a new NewTopic and make a copy of \p src
1518 */
1519 static rd_kafka_NewTopic_t *
rd_kafka_NewTopic_copy(const rd_kafka_NewTopic_t * src)1520 rd_kafka_NewTopic_copy (const rd_kafka_NewTopic_t *src) {
1521 rd_kafka_NewTopic_t *dst;
1522
1523 dst = rd_kafka_NewTopic_new(src->topic, src->num_partitions,
1524 src->replication_factor, NULL, 0);
1525 rd_assert(dst);
1526
1527 rd_list_destroy(&dst->replicas); /* created in .._new() */
1528 rd_list_init_copy(&dst->replicas, &src->replicas);
1529 rd_list_copy_to(&dst->replicas, &src->replicas,
1530 rd_list_copy_preallocated, NULL);
1531
1532 rd_list_init_copy(&dst->config, &src->config);
1533 rd_list_copy_to(&dst->config, &src->config,
1534 rd_kafka_ConfigEntry_list_copy, NULL);
1535
1536 return dst;
1537 }
1538
/**
 * @brief Destroy a NewTopic object and all resources it owns
 *        (replica lists, config entries, topic name string).
 */
void rd_kafka_NewTopic_destroy (rd_kafka_NewTopic_t *new_topic) {
        rd_list_destroy(&new_topic->replicas);
        rd_list_destroy(&new_topic->config);
        rd_free(new_topic->topic);
        rd_free(new_topic);
}
1545
/**
 * @brief void* destructor wrapper for rd_kafka_NewTopic_destroy(),
 *        for use as an rd_list free callback.
 */
static void rd_kafka_NewTopic_free (void *ptr) {
        rd_kafka_NewTopic_destroy(ptr);
}
1549
1550 void
rd_kafka_NewTopic_destroy_array(rd_kafka_NewTopic_t ** new_topics,size_t new_topic_cnt)1551 rd_kafka_NewTopic_destroy_array (rd_kafka_NewTopic_t **new_topics,
1552 size_t new_topic_cnt) {
1553 size_t i;
1554 for (i = 0 ; i < new_topic_cnt ; i++)
1555 rd_kafka_NewTopic_destroy(new_topics[i]);
1556 }
1557
1558
1559 rd_kafka_resp_err_t
rd_kafka_NewTopic_set_replica_assignment(rd_kafka_NewTopic_t * new_topic,int32_t partition,int32_t * broker_ids,size_t broker_id_cnt,char * errstr,size_t errstr_size)1560 rd_kafka_NewTopic_set_replica_assignment (rd_kafka_NewTopic_t *new_topic,
1561 int32_t partition,
1562 int32_t *broker_ids,
1563 size_t broker_id_cnt,
1564 char *errstr, size_t errstr_size) {
1565 rd_list_t *rl;
1566 int i;
1567
1568 if (new_topic->replication_factor != -1) {
1569 rd_snprintf(errstr, errstr_size,
1570 "Specifying a replication factor and "
1571 "a replica assignment are mutually exclusive");
1572 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1573 } else if (new_topic->num_partitions == -1) {
1574 rd_snprintf(errstr, errstr_size,
1575 "Specifying a default partition count and a "
1576 "replica assignment are mutually exclusive");
1577 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1578 }
1579
1580 /* Replica partitions must be added consecutively starting from 0. */
1581 if (partition != rd_list_cnt(&new_topic->replicas)) {
1582 rd_snprintf(errstr, errstr_size,
1583 "Partitions must be added in order, "
1584 "starting at 0: expecting partition %d, "
1585 "not %"PRId32,
1586 rd_list_cnt(&new_topic->replicas), partition);
1587 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1588 }
1589
1590 if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) {
1591 rd_snprintf(errstr, errstr_size,
1592 "Too many brokers specified "
1593 "(RD_KAFKAP_BROKERS_MAX=%d)",
1594 RD_KAFKAP_BROKERS_MAX);
1595 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1596 }
1597
1598
1599 rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt);
1600
1601 for (i = 0 ; i < (int)broker_id_cnt ; i++)
1602 rd_list_set_int32(rl, i, broker_ids[i]);
1603
1604 rd_list_add(&new_topic->replicas, rl);
1605
1606 return RD_KAFKA_RESP_ERR_NO_ERROR;
1607 }
1608
1609
1610 /**
1611 * @brief Generic constructor of ConfigEntry which is also added to \p rl
1612 */
1613 static rd_kafka_resp_err_t
rd_kafka_admin_add_config0(rd_list_t * rl,const char * name,const char * value,rd_kafka_AlterOperation_t operation)1614 rd_kafka_admin_add_config0 (rd_list_t *rl,
1615 const char *name, const char *value,
1616 rd_kafka_AlterOperation_t operation) {
1617 rd_kafka_ConfigEntry_t *entry;
1618
1619 if (!name)
1620 return RD_KAFKA_RESP_ERR__INVALID_ARG;
1621
1622 entry = rd_calloc(1, sizeof(*entry));
1623 entry->kv = rd_strtup_new(name, value);
1624 entry->a.operation = operation;
1625
1626 rd_list_add(rl, entry);
1627
1628 return RD_KAFKA_RESP_ERR_NO_ERROR;
1629 }
1630
1631
/**
 * @brief Append a (name,value) config entry with the ADD operation to
 *        \p new_topic's config list.
 *
 * @returns RD_KAFKA_RESP_ERR__INVALID_ARG if \p name is NULL, else
 *          RD_KAFKA_RESP_ERR_NO_ERROR.
 */
rd_kafka_resp_err_t
rd_kafka_NewTopic_set_config (rd_kafka_NewTopic_t *new_topic,
                              const char *name, const char *value) {
        return rd_kafka_admin_add_config0(&new_topic->config, name, value,
                                          RD_KAFKA_ALTER_OP_ADD);
}
1638
1639
1640
/**
 * @brief Parse CreateTopicsResponse and create ADMIN_RESULT op.
 *
 * @param rko_req The original CreateTopics request op, used to match the
 *                returned topics against the requested ones.
 * @param rko_resultp Populated with the new ADMIN_RESULT op on success.
 * @param reply The response buffer to parse.
 * @param errstr Human-readable error string written on parse failure.
 * @param errstr_size Size of \p errstr.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, else the buffer's
 *          parse error code.
 *
 * @remark The rd_kafka_buf_read_*() and rd_kafka_buf_parse_fail() macros
 *         jump to the \c err_parse label on failure.
 */
static rd_kafka_resp_err_t
rd_kafka_CreateTopicsResponse_parse (rd_kafka_op_t *rko_req,
                                     rd_kafka_op_t **rko_resultp,
                                     rd_kafka_buf_t *reply,
                                     char *errstr, size_t errstr_size) {
        const int log_decode_errors = LOG_ERR;
        rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_op_t *rko_result = NULL;
        int32_t topic_cnt;
        int i;

        /* ThrottleTimeMs is only present from ApiVersion 2. */
        if (rd_kafka_buf_ApiVersion(reply) >= 2) {
                int32_t Throttle_Time;
                rd_kafka_buf_read_i32(reply, &Throttle_Time);
                rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
        }

        /* #topics */
        rd_kafka_buf_read_i32(reply, &topic_cnt);

        /* The broker must not return more topics than were requested. */
        if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
                rd_kafka_buf_parse_fail(
                        reply,
                        "Received %"PRId32" topics in response "
                        "when only %d were requested", topic_cnt,
                        rd_list_cnt(&rko_req->rko_u.admin_request.args));


        rko_result = rd_kafka_admin_result_new(rko_req);

        rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
                     rd_kafka_topic_result_free);

        for (i = 0 ; i < (int)topic_cnt ; i++) {
                rd_kafkap_str_t ktopic;
                int16_t error_code;
                rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER;
                char *this_errstr = NULL;
                rd_kafka_topic_result_t *terr;
                rd_kafka_NewTopic_t skel;
                int orig_pos;

                rd_kafka_buf_read_str(reply, &ktopic);
                rd_kafka_buf_read_i16(reply, &error_code);

                /* ErrorMessage is only present from ApiVersion 1. */
                if (rd_kafka_buf_ApiVersion(reply) >= 1)
                        rd_kafka_buf_read_str(reply, &error_msg);

                /* For non-blocking CreateTopicsRequests the broker
                 * will return REQUEST_TIMED_OUT for topics
                 * that were triggered for creation -
                 * we hide this error code from the application
                 * since the topic creation is in fact in progress. */
                if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
                    rd_kafka_confval_get_int(&rko_req->rko_u.
                                             admin_request.options.
                                             operation_timeout) <= 0) {
                        error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
                        this_errstr = NULL;
                }

                if (error_code) {
                        /* Use the broker-provided error message if present,
                         * else fall back to the generic error string. */
                        if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
                            RD_KAFKAP_STR_LEN(&error_msg) == 0)
                                this_errstr =
                                        (char *)rd_kafka_err2str(error_code);
                        else
                                RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);

                }

                terr = rd_kafka_topic_result_new(ktopic.str,
                                                 RD_KAFKAP_STR_LEN(&ktopic),
                                                 error_code, this_errstr);

                /* As a convenience to the application we insert topic result
                 * in the same order as they were requested. The broker
                 * does not maintain ordering unfortunately. */
                skel.topic = terr->topic;
                orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
                                         &skel, rd_kafka_NewTopic_cmp);
                if (orig_pos == -1) {
                        rd_kafka_topic_result_destroy(terr);
                        rd_kafka_buf_parse_fail(
                                reply,
                                "Broker returned topic %.*s that was not "
                                "included in the original request",
                                RD_KAFKAP_STR_PR(&ktopic));
                }

                if (rd_list_elem(&rko_result->rko_u.admin_result.results,
                                 orig_pos) != NULL) {
                        rd_kafka_topic_result_destroy(terr);
                        rd_kafka_buf_parse_fail(
                                reply,
                                "Broker returned topic %.*s multiple times",
                                RD_KAFKAP_STR_PR(&ktopic));
                }

                rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
                            terr);
        }

        *rko_resultp = rko_result;

        return RD_KAFKA_RESP_ERR_NO_ERROR;

 err_parse:
        /* Reached via the rd_kafka_buf_read/parse_fail macros above. */
        if (rko_result)
                rd_kafka_op_destroy(rko_result);

        rd_snprintf(errstr, errstr_size,
                    "CreateTopics response protocol parse failure: %s",
                    rd_kafka_err2str(reply->rkbuf_err));

        return reply->rkbuf_err;
}
1762
1763
/**
 * @brief Asynchronously create topics as per \p new_topics.
 *        The result is enqueued on \p rkqu as a CREATETOPICS_RESULT event.
 */
void rd_kafka_CreateTopics (rd_kafka_t *rk,
                            rd_kafka_NewTopic_t **new_topics,
                            size_t new_topic_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_CreateTopicsRequest,
                rd_kafka_CreateTopicsResponse_parse,
        };
        rd_kafka_op_t *rko;
        size_t idx;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(rk,
                                            RD_KAFKA_OP_CREATETOPICS,
                                            RD_KAFKA_EVENT_CREATETOPICS_RESULT,
                                            &cbs, options, rkqu->rkqu_q);

        /* Deep-copy the application's NewTopic objects so the request op
         * owns its arguments regardless of the caller's lifetimes. */
        rd_list_init(&rko->rko_u.admin_request.args, (int)new_topic_cnt,
                     rd_kafka_NewTopic_free);

        for (idx = 0 ; idx < new_topic_cnt ; idx++)
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_NewTopic_copy(new_topics[idx]));

        /* Hand the op over to the main thread's admin worker. */
        rd_kafka_q_enq(rk->rk_ops, rko);
}
1792
1793
1794 /**
1795 * @brief Get an array of topic results from a CreateTopics result.
1796 *
1797 * The returned \p topics life-time is the same as the \p result object.
1798 * @param cntp is updated to the number of elements in the array.
1799 */
1800 const rd_kafka_topic_result_t **
rd_kafka_CreateTopics_result_topics(const rd_kafka_CreateTopics_result_t * result,size_t * cntp)1801 rd_kafka_CreateTopics_result_topics (
1802 const rd_kafka_CreateTopics_result_t *result,
1803 size_t *cntp) {
1804 return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
1805 cntp);
1806 }
1807
1808 /**@}*/
1809
1810
1811
1812
1813 /**
1814 * @name Delete topics
1815 * @{
1816 *
1817 *
1818 *
1819 *
1820 */
1821
rd_kafka_DeleteTopic_new(const char * topic)1822 rd_kafka_DeleteTopic_t *rd_kafka_DeleteTopic_new (const char *topic) {
1823 size_t tsize = strlen(topic) + 1;
1824 rd_kafka_DeleteTopic_t *del_topic;
1825
1826 /* Single allocation */
1827 del_topic = rd_malloc(sizeof(*del_topic) + tsize);
1828 del_topic->topic = del_topic->data;
1829 memcpy(del_topic->topic, topic, tsize);
1830
1831 return del_topic;
1832 }
1833
/**
 * @brief Destroy a DeleteTopic object.
 *
 * The topic name lives in the same allocation as the object itself
 * (see rd_kafka_DeleteTopic_new()), so a single free suffices.
 */
void rd_kafka_DeleteTopic_destroy (rd_kafka_DeleteTopic_t *del_topic) {
        rd_free(del_topic);
}
1837
/* void* destructor wrapper so DeleteTopic objects can be owned by
 * an rd_list_t. */
static void rd_kafka_DeleteTopic_free (void *ptr) {
        rd_kafka_DeleteTopic_destroy(ptr);
}
1841
1842
/**
 * @brief Destroy each element of \p del_topics.
 *
 * The array storage itself remains owned by the caller.
 */
void rd_kafka_DeleteTopic_destroy_array (rd_kafka_DeleteTopic_t **del_topics,
                                         size_t del_topic_cnt) {
        size_t idx;

        for (idx = 0 ; idx < del_topic_cnt ; idx++)
                rd_kafka_DeleteTopic_destroy(del_topics[idx]);
}
1849
1850
1851 /**
1852 * @brief Topic name comparator for DeleteTopic_t
1853 */
rd_kafka_DeleteTopic_cmp(const void * _a,const void * _b)1854 static int rd_kafka_DeleteTopic_cmp (const void *_a, const void *_b) {
1855 const rd_kafka_DeleteTopic_t *a = _a, *b = _b;
1856 return strcmp(a->topic, b->topic);
1857 }
1858
/**
 * @brief Allocate a new DeleteTopic and make a copy of \p src
 */
static rd_kafka_DeleteTopic_t *
rd_kafka_DeleteTopic_copy (const rd_kafka_DeleteTopic_t *src) {
        /* Deep copy: _new() duplicates the topic name into the new object. */
        return rd_kafka_DeleteTopic_new(src->topic);
}
1866
1867
1868
1869
1870
1871
1872
1873 /**
1874 * @brief Parse DeleteTopicsResponse and create ADMIN_RESULT op.
1875 */
1876 static rd_kafka_resp_err_t
rd_kafka_DeleteTopicsResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)1877 rd_kafka_DeleteTopicsResponse_parse (rd_kafka_op_t *rko_req,
1878 rd_kafka_op_t **rko_resultp,
1879 rd_kafka_buf_t *reply,
1880 char *errstr, size_t errstr_size) {
1881 const int log_decode_errors = LOG_ERR;
1882 rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
1883 rd_kafka_t *rk = rkb->rkb_rk;
1884 rd_kafka_op_t *rko_result = NULL;
1885 int32_t topic_cnt;
1886 int i;
1887
1888 if (rd_kafka_buf_ApiVersion(reply) >= 1) {
1889 int32_t Throttle_Time;
1890 rd_kafka_buf_read_i32(reply, &Throttle_Time);
1891 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
1892 }
1893
1894 /* #topics */
1895 rd_kafka_buf_read_i32(reply, &topic_cnt);
1896
1897 if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
1898 rd_kafka_buf_parse_fail(
1899 reply,
1900 "Received %"PRId32" topics in response "
1901 "when only %d were requested", topic_cnt,
1902 rd_list_cnt(&rko_req->rko_u.admin_request.args));
1903
1904 rko_result = rd_kafka_admin_result_new(rko_req);
1905
1906 rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
1907 rd_kafka_topic_result_free);
1908
1909 for (i = 0 ; i < (int)topic_cnt ; i++) {
1910 rd_kafkap_str_t ktopic;
1911 int16_t error_code;
1912 rd_kafka_topic_result_t *terr;
1913 rd_kafka_NewTopic_t skel;
1914 int orig_pos;
1915
1916 rd_kafka_buf_read_str(reply, &ktopic);
1917 rd_kafka_buf_read_i16(reply, &error_code);
1918
1919 /* For non-blocking DeleteTopicsRequests the broker
1920 * will returned REQUEST_TIMED_OUT for topics
1921 * that were triggered for creation -
1922 * we hide this error code from the application
1923 * since the topic creation is in fact in progress. */
1924 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
1925 rd_kafka_confval_get_int(&rko_req->rko_u.
1926 admin_request.options.
1927 operation_timeout) <= 0) {
1928 error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
1929 }
1930
1931 terr = rd_kafka_topic_result_new(ktopic.str,
1932 RD_KAFKAP_STR_LEN(&ktopic),
1933 error_code,
1934 error_code ?
1935 rd_kafka_err2str(error_code) :
1936 NULL);
1937
1938 /* As a convenience to the application we insert topic result
1939 * in the same order as they were requested. The broker
1940 * does not maintain ordering unfortunately. */
1941 skel.topic = terr->topic;
1942 orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
1943 &skel, rd_kafka_DeleteTopic_cmp);
1944 if (orig_pos == -1) {
1945 rd_kafka_topic_result_destroy(terr);
1946 rd_kafka_buf_parse_fail(
1947 reply,
1948 "Broker returned topic %.*s that was not "
1949 "included in the original request",
1950 RD_KAFKAP_STR_PR(&ktopic));
1951 }
1952
1953 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
1954 orig_pos) != NULL) {
1955 rd_kafka_topic_result_destroy(terr);
1956 rd_kafka_buf_parse_fail(
1957 reply,
1958 "Broker returned topic %.*s multiple times",
1959 RD_KAFKAP_STR_PR(&ktopic));
1960 }
1961
1962 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
1963 terr);
1964 }
1965
1966 *rko_resultp = rko_result;
1967
1968 return RD_KAFKA_RESP_ERR_NO_ERROR;
1969
1970 err_parse:
1971 if (rko_result)
1972 rd_kafka_op_destroy(rko_result);
1973
1974 rd_snprintf(errstr, errstr_size,
1975 "DeleteTopics response protocol parse failure: %s",
1976 rd_kafka_err2str(reply->rkbuf_err));
1977
1978 return reply->rkbuf_err;
1979 }
1980
1981
1982
1983
1984
1985
/**
 * @brief Asynchronously delete the topics in \p del_topics.
 *
 * Copies of the DeleteTopic objects are enqueued on the main ops queue;
 * the result event (RD_KAFKA_EVENT_DELETETOPICS_RESULT) is emitted on
 * \p rkqu when the operation completes.
 */
void rd_kafka_DeleteTopics (rd_kafka_t *rk,
                            rd_kafka_DeleteTopic_t **del_topics,
                            size_t del_topic_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_DeleteTopicsRequest,
                rd_kafka_DeleteTopicsResponse_parse,
        };
        rd_kafka_op_t *rko;
        size_t ti;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(rk,
                                            RD_KAFKA_OP_DELETETOPICS,
                                            RD_KAFKA_EVENT_DELETETOPICS_RESULT,
                                            &cbs, options, rkqu->rkqu_q);

        /* The request owns deep copies of the caller's DeleteTopic objects. */
        rd_list_init(&rko->rko_u.admin_request.args, (int)del_topic_cnt,
                     rd_kafka_DeleteTopic_free);
        for (ti = 0 ; ti < del_topic_cnt ; ti++)
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_DeleteTopic_copy(del_topics[ti]));

        rd_kafka_q_enq(rk->rk_ops, rko);
}
2014
2015
2016 /**
2017 * @brief Get an array of topic results from a DeleteTopics result.
2018 *
2019 * The returned \p topics life-time is the same as the \p result object.
2020 * @param cntp is updated to the number of elements in the array.
2021 */
2022 const rd_kafka_topic_result_t **
rd_kafka_DeleteTopics_result_topics(const rd_kafka_DeleteTopics_result_t * result,size_t * cntp)2023 rd_kafka_DeleteTopics_result_topics (
2024 const rd_kafka_DeleteTopics_result_t *result,
2025 size_t *cntp) {
2026 return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
2027 cntp);
2028 }
2029
2030
2031
2032
2033 /**
2034 * @name Create partitions
2035 * @{
2036 *
2037 *
2038 *
2039 *
2040 */
2041
rd_kafka_NewPartitions_new(const char * topic,size_t new_total_cnt,char * errstr,size_t errstr_size)2042 rd_kafka_NewPartitions_t *rd_kafka_NewPartitions_new (const char *topic,
2043 size_t new_total_cnt,
2044 char *errstr,
2045 size_t errstr_size) {
2046 size_t tsize = strlen(topic) + 1;
2047 rd_kafka_NewPartitions_t *newps;
2048
2049 if (new_total_cnt < 1 || new_total_cnt > RD_KAFKAP_PARTITIONS_MAX) {
2050 rd_snprintf(errstr, errstr_size, "new_total_cnt out of "
2051 "expected range %d..%d",
2052 1, RD_KAFKAP_PARTITIONS_MAX);
2053 return NULL;
2054 }
2055
2056 /* Single allocation */
2057 newps = rd_malloc(sizeof(*newps) + tsize);
2058 newps->total_cnt = new_total_cnt;
2059 newps->topic = newps->data;
2060 memcpy(newps->topic, topic, tsize);
2061
2062 /* List of int32 lists */
2063 rd_list_init(&newps->replicas, 0, rd_list_destroy_free);
2064 rd_list_prealloc_elems(&newps->replicas, 0, new_total_cnt, 0/*nozero*/);
2065
2066 return newps;
2067 }
2068
2069 /**
2070 * @brief Topic name comparator for NewPartitions_t
2071 */
rd_kafka_NewPartitions_cmp(const void * _a,const void * _b)2072 static int rd_kafka_NewPartitions_cmp (const void *_a, const void *_b) {
2073 const rd_kafka_NewPartitions_t *a = _a, *b = _b;
2074 return strcmp(a->topic, b->topic);
2075 }
2076
2077
2078 /**
2079 * @brief Allocate a new CreatePartitions and make a copy of \p src
2080 */
2081 static rd_kafka_NewPartitions_t *
rd_kafka_NewPartitions_copy(const rd_kafka_NewPartitions_t * src)2082 rd_kafka_NewPartitions_copy (const rd_kafka_NewPartitions_t *src) {
2083 rd_kafka_NewPartitions_t *dst;
2084
2085 dst = rd_kafka_NewPartitions_new(src->topic, src->total_cnt, NULL, 0);
2086
2087 rd_list_destroy(&dst->replicas); /* created in .._new() */
2088 rd_list_init_copy(&dst->replicas, &src->replicas);
2089 rd_list_copy_to(&dst->replicas, &src->replicas,
2090 rd_list_copy_preallocated, NULL);
2091
2092 return dst;
2093 }
2094
/**
 * @brief Destroy a NewPartitions object, including its replica lists.
 *
 * The topic name lives in the same allocation as the object itself
 * (see rd_kafka_NewPartitions_new()).
 */
void rd_kafka_NewPartitions_destroy (rd_kafka_NewPartitions_t *newps) {
        rd_list_destroy(&newps->replicas);
        rd_free(newps);
}
2099
/* void* destructor wrapper so NewPartitions objects can be owned by
 * an rd_list_t. */
static void rd_kafka_NewPartitions_free (void *ptr) {
        rd_kafka_NewPartitions_destroy(ptr);
}
2103
2104
/**
 * @brief Destroy each element of \p newps.
 *
 * The array storage itself remains owned by the caller.
 */
void rd_kafka_NewPartitions_destroy_array (rd_kafka_NewPartitions_t **newps,
                                           size_t newps_cnt) {
        size_t idx;

        for (idx = 0 ; idx < newps_cnt ; idx++)
                rd_kafka_NewPartitions_destroy(newps[idx]);
}
2111
2112
2113
2114
2115
2116 rd_kafka_resp_err_t
rd_kafka_NewPartitions_set_replica_assignment(rd_kafka_NewPartitions_t * newp,int32_t new_partition_idx,int32_t * broker_ids,size_t broker_id_cnt,char * errstr,size_t errstr_size)2117 rd_kafka_NewPartitions_set_replica_assignment (rd_kafka_NewPartitions_t *newp,
2118 int32_t new_partition_idx,
2119 int32_t *broker_ids,
2120 size_t broker_id_cnt,
2121 char *errstr,
2122 size_t errstr_size) {
2123 rd_list_t *rl;
2124 int i;
2125
2126 /* Replica partitions must be added consecutively starting from 0. */
2127 if (new_partition_idx != rd_list_cnt(&newp->replicas)) {
2128 rd_snprintf(errstr, errstr_size,
2129 "Partitions must be added in order, "
2130 "starting at 0: expecting partition "
2131 "index %d, not %"PRId32,
2132 rd_list_cnt(&newp->replicas), new_partition_idx);
2133 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2134 }
2135
2136 if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) {
2137 rd_snprintf(errstr, errstr_size,
2138 "Too many brokers specified "
2139 "(RD_KAFKAP_BROKERS_MAX=%d)",
2140 RD_KAFKAP_BROKERS_MAX);
2141 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2142 }
2143
2144 rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt);
2145
2146 for (i = 0 ; i < (int)broker_id_cnt ; i++)
2147 rd_list_set_int32(rl, i, broker_ids[i]);
2148
2149 rd_list_add(&newp->replicas, rl);
2150
2151 return RD_KAFKA_RESP_ERR_NO_ERROR;
2152 }
2153
2154
2155
2156
2157 /**
2158 * @brief Parse CreatePartitionsResponse and create ADMIN_RESULT op.
2159 */
2160 static rd_kafka_resp_err_t
rd_kafka_CreatePartitionsResponse_parse(rd_kafka_op_t * rko_req,rd_kafka_op_t ** rko_resultp,rd_kafka_buf_t * reply,char * errstr,size_t errstr_size)2161 rd_kafka_CreatePartitionsResponse_parse (rd_kafka_op_t *rko_req,
2162 rd_kafka_op_t **rko_resultp,
2163 rd_kafka_buf_t *reply,
2164 char *errstr,
2165 size_t errstr_size) {
2166 const int log_decode_errors = LOG_ERR;
2167 rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
2168 rd_kafka_t *rk = rkb->rkb_rk;
2169 rd_kafka_op_t *rko_result = NULL;
2170 int32_t topic_cnt;
2171 int i;
2172 int32_t Throttle_Time;
2173
2174 rd_kafka_buf_read_i32(reply, &Throttle_Time);
2175 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
2176
2177 /* #topics */
2178 rd_kafka_buf_read_i32(reply, &topic_cnt);
2179
2180 if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
2181 rd_kafka_buf_parse_fail(
2182 reply,
2183 "Received %"PRId32" topics in response "
2184 "when only %d were requested", topic_cnt,
2185 rd_list_cnt(&rko_req->rko_u.admin_request.args));
2186
2187 rko_result = rd_kafka_admin_result_new(rko_req);
2188
2189 rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt,
2190 rd_kafka_topic_result_free);
2191
2192 for (i = 0 ; i < (int)topic_cnt ; i++) {
2193 rd_kafkap_str_t ktopic;
2194 int16_t error_code;
2195 char *this_errstr = NULL;
2196 rd_kafka_topic_result_t *terr;
2197 rd_kafka_NewTopic_t skel;
2198 rd_kafkap_str_t error_msg;
2199 int orig_pos;
2200
2201 rd_kafka_buf_read_str(reply, &ktopic);
2202 rd_kafka_buf_read_i16(reply, &error_code);
2203 rd_kafka_buf_read_str(reply, &error_msg);
2204
2205 /* For non-blocking CreatePartitionsRequests the broker
2206 * will returned REQUEST_TIMED_OUT for topics
2207 * that were triggered for creation -
2208 * we hide this error code from the application
2209 * since the topic creation is in fact in progress. */
2210 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT &&
2211 rd_kafka_confval_get_int(&rko_req->rko_u.
2212 admin_request.options.
2213 operation_timeout) <= 0) {
2214 error_code = RD_KAFKA_RESP_ERR_NO_ERROR;
2215 }
2216
2217 if (error_code) {
2218 if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
2219 RD_KAFKAP_STR_LEN(&error_msg) == 0)
2220 this_errstr =
2221 (char *)rd_kafka_err2str(error_code);
2222 else
2223 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
2224 }
2225
2226 terr = rd_kafka_topic_result_new(ktopic.str,
2227 RD_KAFKAP_STR_LEN(&ktopic),
2228 error_code,
2229 error_code ?
2230 this_errstr : NULL);
2231
2232 /* As a convenience to the application we insert topic result
2233 * in the same order as they were requested. The broker
2234 * does not maintain ordering unfortunately. */
2235 skel.topic = terr->topic;
2236 orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
2237 &skel, rd_kafka_NewPartitions_cmp);
2238 if (orig_pos == -1) {
2239 rd_kafka_topic_result_destroy(terr);
2240 rd_kafka_buf_parse_fail(
2241 reply,
2242 "Broker returned topic %.*s that was not "
2243 "included in the original request",
2244 RD_KAFKAP_STR_PR(&ktopic));
2245 }
2246
2247 if (rd_list_elem(&rko_result->rko_u.admin_result.results,
2248 orig_pos) != NULL) {
2249 rd_kafka_topic_result_destroy(terr);
2250 rd_kafka_buf_parse_fail(
2251 reply,
2252 "Broker returned topic %.*s multiple times",
2253 RD_KAFKAP_STR_PR(&ktopic));
2254 }
2255
2256 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
2257 terr);
2258 }
2259
2260 *rko_resultp = rko_result;
2261
2262 return RD_KAFKA_RESP_ERR_NO_ERROR;
2263
2264 err_parse:
2265 if (rko_result)
2266 rd_kafka_op_destroy(rko_result);
2267
2268 rd_snprintf(errstr, errstr_size,
2269 "CreatePartitions response protocol parse failure: %s",
2270 rd_kafka_err2str(reply->rkbuf_err));
2271
2272 return reply->rkbuf_err;
2273 }
2274
2275
2276
2277
2278
2279
2280
/**
 * @brief Asynchronously create additional partitions per \p newps.
 *
 * Copies of the NewPartitions objects are enqueued on the main ops queue;
 * the result event (RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) is emitted
 * on \p rkqu when the operation completes.
 */
void rd_kafka_CreatePartitions (rd_kafka_t *rk,
                                rd_kafka_NewPartitions_t **newps,
                                size_t newps_cnt,
                                const rd_kafka_AdminOptions_t *options,
                                rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_CreatePartitionsRequest,
                rd_kafka_CreatePartitionsResponse_parse,
        };
        rd_kafka_op_t *rko;
        size_t pi;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(
                rk,
                RD_KAFKA_OP_CREATEPARTITIONS,
                RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT,
                &cbs, options, rkqu->rkqu_q);

        /* The request owns deep copies of the caller's NewPartitions
         * objects. */
        rd_list_init(&rko->rko_u.admin_request.args, (int)newps_cnt,
                     rd_kafka_NewPartitions_free);
        for (pi = 0 ; pi < newps_cnt ; pi++)
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_NewPartitions_copy(newps[pi]));

        rd_kafka_q_enq(rk->rk_ops, rko);
}
2310
2311
2312 /**
2313 * @brief Get an array of topic results from a CreatePartitions result.
2314 *
2315 * The returned \p topics life-time is the same as the \p result object.
2316 * @param cntp is updated to the number of elements in the array.
2317 */
2318 const rd_kafka_topic_result_t **
rd_kafka_CreatePartitions_result_topics(const rd_kafka_CreatePartitions_result_t * result,size_t * cntp)2319 rd_kafka_CreatePartitions_result_topics (
2320 const rd_kafka_CreatePartitions_result_t *result,
2321 size_t *cntp) {
2322 return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result,
2323 cntp);
2324 }
2325
2326 /**@}*/
2327
2328
2329
2330
2331 /**
2332 * @name ConfigEntry
2333 * @{
2334 *
2335 *
2336 *
2337 */
2338
/**
 * @brief Destroy a ConfigEntry: its key/value tuple, its synonyms list
 *        (whose elements are themselves ConfigEntry objects), and the
 *        entry itself.
 */
static void rd_kafka_ConfigEntry_destroy (rd_kafka_ConfigEntry_t *entry) {
        rd_strtup_destroy(entry->kv);
        rd_list_destroy(&entry->synonyms);
        rd_free(entry);
}
2344
2345
/* void* destructor wrapper so ConfigEntry objects can be owned by
 * an rd_list_t. */
static void rd_kafka_ConfigEntry_free (void *ptr) {
        rd_kafka_ConfigEntry_destroy((rd_kafka_ConfigEntry_t *)ptr);
}
2349
2350
2351 /**
2352 * @brief Create new ConfigEntry
2353 *
2354 * @param name Config entry name
2355 * @param name_len Length of name, or -1 to use strlen()
2356 * @param value Config entry value, or NULL
2357 * @param value_len Length of value, or -1 to use strlen()
2358 */
2359 static rd_kafka_ConfigEntry_t *
rd_kafka_ConfigEntry_new0(const char * name,size_t name_len,const char * value,size_t value_len)2360 rd_kafka_ConfigEntry_new0 (const char *name, size_t name_len,
2361 const char *value, size_t value_len) {
2362 rd_kafka_ConfigEntry_t *entry;
2363
2364 if (!name)
2365 return NULL;
2366
2367 entry = rd_calloc(1, sizeof(*entry));
2368 entry->kv = rd_strtup_new0(name, name_len, value, value_len);
2369
2370 rd_list_init(&entry->synonyms, 0, rd_kafka_ConfigEntry_free);
2371
2372 entry->a.source = RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG;
2373
2374 return entry;
2375 }
2376
/**
 * @brief Create a ConfigEntry from nul-terminated strings.
 *
 * @sa rd_kafka_ConfigEntry_new0
 */
static rd_kafka_ConfigEntry_t *
rd_kafka_ConfigEntry_new (const char *name, const char *value) {
        /* -1 lengths: let _new0() use strlen(). */
        return rd_kafka_ConfigEntry_new0(name, -1, value, -1);
}
2384
2385
2386
2387
2388 /**
2389 * @brief Allocate a new AlterConfigs and make a copy of \p src
2390 */
2391 static rd_kafka_ConfigEntry_t *
rd_kafka_ConfigEntry_copy(const rd_kafka_ConfigEntry_t * src)2392 rd_kafka_ConfigEntry_copy (const rd_kafka_ConfigEntry_t *src) {
2393 rd_kafka_ConfigEntry_t *dst;
2394
2395 dst = rd_kafka_ConfigEntry_new(src->kv->name, src->kv->value);
2396 dst->a = src->a;
2397
2398 rd_list_destroy(&dst->synonyms); /* created in .._new() */
2399 rd_list_init_copy(&dst->synonyms, &src->synonyms);
2400 rd_list_copy_to(&dst->synonyms, &src->synonyms,
2401 rd_kafka_ConfigEntry_list_copy, NULL);
2402
2403 return dst;
2404 }
2405
/* rd_list copy callback wrapper for rd_kafka_ConfigEntry_copy().
 * \p opaque is unused. */
static void *rd_kafka_ConfigEntry_list_copy (const void *src, void *opaque) {
        return rd_kafka_ConfigEntry_copy((const rd_kafka_ConfigEntry_t *)src);
}
2409
2410
/** @returns the configuration property name of \p entry. */
const char *rd_kafka_ConfigEntry_name (const rd_kafka_ConfigEntry_t *entry) {
        return entry->kv->name;
}
2414
/** @returns the configuration value of \p entry (may be NULL). */
const char *
rd_kafka_ConfigEntry_value (const rd_kafka_ConfigEntry_t *entry) {
        return entry->kv->value;
}
2419
/** @returns the configuration source of \p entry. */
rd_kafka_ConfigSource_t
rd_kafka_ConfigEntry_source (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.source;
}
2424
/** @returns 1 if \p entry is read-only on the broker, else 0. */
int rd_kafka_ConfigEntry_is_read_only (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.is_readonly;
}
2428
/** @returns 1 if \p entry is set to its default value, else 0. */
int rd_kafka_ConfigEntry_is_default (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.is_default;
}
2432
/** @returns 1 if \p entry is sensitive (its value is not disclosed),
 *           else 0. */
int rd_kafka_ConfigEntry_is_sensitive (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.is_sensitive;
}
2436
/** @returns 1 if \p entry is a synonym of another entry, else 0. */
int rd_kafka_ConfigEntry_is_synonym (const rd_kafka_ConfigEntry_t *entry) {
        return entry->a.is_synonym;
}
2440
2441 const rd_kafka_ConfigEntry_t **
rd_kafka_ConfigEntry_synonyms(const rd_kafka_ConfigEntry_t * entry,size_t * cntp)2442 rd_kafka_ConfigEntry_synonyms (const rd_kafka_ConfigEntry_t *entry,
2443 size_t *cntp) {
2444 *cntp = rd_list_cnt(&entry->synonyms);
2445 if (!*cntp)
2446 return NULL;
2447 return (const rd_kafka_ConfigEntry_t **)entry->synonyms.rl_elems;
2448
2449 }
2450
2451
2452 /**@}*/
2453
2454
2455
2456 /**
2457 * @name ConfigSource
2458 * @{
2459 *
2460 *
2461 *
2462 */
2463
2464 const char *
rd_kafka_ConfigSource_name(rd_kafka_ConfigSource_t confsource)2465 rd_kafka_ConfigSource_name (rd_kafka_ConfigSource_t confsource) {
2466 static const char *names[] = {
2467 "UNKNOWN_CONFIG",
2468 "DYNAMIC_TOPIC_CONFIG",
2469 "DYNAMIC_BROKER_CONFIG",
2470 "DYNAMIC_DEFAULT_BROKER_CONFIG",
2471 "STATIC_BROKER_CONFIG",
2472 "DEFAULT_CONFIG",
2473 };
2474
2475 if ((unsigned int)confsource >=
2476 (unsigned int)RD_KAFKA_CONFIG_SOURCE__CNT)
2477 return "UNSUPPORTED";
2478
2479 return names[confsource];
2480 }
2481
2482 /**@}*/
2483
2484
2485
2486 /**
2487 * @name ConfigResource
2488 * @{
2489 *
2490 *
2491 *
2492 */
2493
2494 const char *
rd_kafka_ResourceType_name(rd_kafka_ResourceType_t restype)2495 rd_kafka_ResourceType_name (rd_kafka_ResourceType_t restype) {
2496 static const char *names[] = {
2497 "UNKNOWN",
2498 "ANY",
2499 "TOPIC",
2500 "GROUP",
2501 "BROKER",
2502 };
2503
2504 if ((unsigned int)restype >=
2505 (unsigned int)RD_KAFKA_RESOURCE__CNT)
2506 return "UNSUPPORTED";
2507
2508 return names[restype];
2509 }
2510
2511
2512 rd_kafka_ConfigResource_t *
rd_kafka_ConfigResource_new(rd_kafka_ResourceType_t restype,const char * resname)2513 rd_kafka_ConfigResource_new (rd_kafka_ResourceType_t restype,
2514 const char *resname) {
2515 rd_kafka_ConfigResource_t *config;
2516 size_t namesz = resname ? strlen(resname) : 0;
2517
2518 if (!namesz || (int)restype < 0)
2519 return NULL;
2520
2521 config = rd_calloc(1, sizeof(*config) + namesz + 1);
2522 config->name = config->data;
2523 memcpy(config->name, resname, namesz + 1);
2524 config->restype = restype;
2525
2526 rd_list_init(&config->config, 8, rd_kafka_ConfigEntry_free);
2527
2528 return config;
2529 }
2530
/**
 * @brief Destroy a ConfigResource: its config entry list, its optional
 *        error string, and the object itself (the name is allocated
 *        inline with the object).
 */
void rd_kafka_ConfigResource_destroy (rd_kafka_ConfigResource_t *config) {
        rd_list_destroy(&config->config);
        if (config->errstr)
                rd_free(config->errstr);
        rd_free(config);
}
2537
/* void* destructor wrapper so ConfigResource objects can be owned by
 * an rd_list_t. */
static void rd_kafka_ConfigResource_free (void *ptr) {
        rd_kafka_ConfigResource_destroy((rd_kafka_ConfigResource_t *)ptr);
}
2541
2542
/**
 * @brief Destroy each element of \p config.
 *
 * The array storage itself remains owned by the caller.
 */
void rd_kafka_ConfigResource_destroy_array (rd_kafka_ConfigResource_t **config,
                                            size_t config_cnt) {
        size_t idx;

        for (idx = 0 ; idx < config_cnt ; idx++)
                rd_kafka_ConfigResource_destroy(config[idx]);
}
2549
2550
2551 /**
2552 * @brief Type and name comparator for ConfigResource_t
2553 */
rd_kafka_ConfigResource_cmp(const void * _a,const void * _b)2554 static int rd_kafka_ConfigResource_cmp (const void *_a, const void *_b) {
2555 const rd_kafka_ConfigResource_t *a = _a, *b = _b;
2556 int r = RD_CMP(a->restype, b->restype);
2557 if (r)
2558 return r;
2559 return strcmp(a->name, b->name);
2560 }
2561
2562 /**
2563 * @brief Allocate a new AlterConfigs and make a copy of \p src
2564 */
2565 static rd_kafka_ConfigResource_t *
rd_kafka_ConfigResource_copy(const rd_kafka_ConfigResource_t * src)2566 rd_kafka_ConfigResource_copy (const rd_kafka_ConfigResource_t *src) {
2567 rd_kafka_ConfigResource_t *dst;
2568
2569 dst = rd_kafka_ConfigResource_new(src->restype, src->name);
2570
2571 rd_list_destroy(&dst->config); /* created in .._new() */
2572 rd_list_init_copy(&dst->config, &src->config);
2573 rd_list_copy_to(&dst->config, &src->config,
2574 rd_kafka_ConfigEntry_list_copy, NULL);
2575
2576 return dst;
2577 }
2578
2579
/**
 * @brief Append \p entry to \p config's entry list.
 *
 * Ownership of \p entry is transferred to \p config (freed by the
 * list's rd_kafka_ConfigEntry_free destructor).
 */
static void
rd_kafka_ConfigResource_add_ConfigEntry (rd_kafka_ConfigResource_t *config,
                                         rd_kafka_ConfigEntry_t *entry) {
        rd_list_add(&config->config, entry);
}
2585
2586
2587 rd_kafka_resp_err_t
rd_kafka_ConfigResource_add_config(rd_kafka_ConfigResource_t * config,const char * name,const char * value)2588 rd_kafka_ConfigResource_add_config (rd_kafka_ConfigResource_t *config,
2589 const char *name, const char *value) {
2590 if (!name || !*name || !value)
2591 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2592
2593 return rd_kafka_admin_add_config0(&config->config, name, value,
2594 RD_KAFKA_ALTER_OP_ADD);
2595 }
2596
2597 rd_kafka_resp_err_t
rd_kafka_ConfigResource_set_config(rd_kafka_ConfigResource_t * config,const char * name,const char * value)2598 rd_kafka_ConfigResource_set_config (rd_kafka_ConfigResource_t *config,
2599 const char *name, const char *value) {
2600 if (!name || !*name || !value)
2601 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2602
2603 return rd_kafka_admin_add_config0(&config->config, name, value,
2604 RD_KAFKA_ALTER_OP_SET);
2605 }
2606
2607 rd_kafka_resp_err_t
rd_kafka_ConfigResource_delete_config(rd_kafka_ConfigResource_t * config,const char * name)2608 rd_kafka_ConfigResource_delete_config (rd_kafka_ConfigResource_t *config,
2609 const char *name) {
2610 if (!name || !*name)
2611 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2612
2613 return rd_kafka_admin_add_config0(&config->config, name, NULL,
2614 RD_KAFKA_ALTER_OP_DELETE);
2615 }
2616
2617
2618 const rd_kafka_ConfigEntry_t **
rd_kafka_ConfigResource_configs(const rd_kafka_ConfigResource_t * config,size_t * cntp)2619 rd_kafka_ConfigResource_configs (const rd_kafka_ConfigResource_t *config,
2620 size_t *cntp) {
2621 *cntp = rd_list_cnt(&config->config);
2622 if (!*cntp)
2623 return NULL;
2624 return (const rd_kafka_ConfigEntry_t **)config->config.rl_elems;
2625 }
2626
2627
2628
2629
/** @returns the resource type of \p config. */
rd_kafka_ResourceType_t
rd_kafka_ConfigResource_type (const rd_kafka_ConfigResource_t *config) {
        return config->restype;
}
2634
/** @returns the resource name of \p config (same life-time as
 *           \p config). */
const char *
rd_kafka_ConfigResource_name (const rd_kafka_ConfigResource_t *config) {
        return config->name;
}
2639
/** @returns the per-resource error code from the broker response,
 *           or RD_KAFKA_RESP_ERR_NO_ERROR. */
rd_kafka_resp_err_t
rd_kafka_ConfigResource_error (const rd_kafka_ConfigResource_t *config) {
        return config->err;
}
2644
2645 const char *
rd_kafka_ConfigResource_error_string(const rd_kafka_ConfigResource_t * config)2646 rd_kafka_ConfigResource_error_string (const rd_kafka_ConfigResource_t *config) {
2647 if (!config->err)
2648 return NULL;
2649 if (config->errstr)
2650 return config->errstr;
2651 return rd_kafka_err2str(config->err);
2652 }
2653
2654
2655 /**
2656 * @brief Look in the provided ConfigResource_t* list for a resource of
2657 * type BROKER and set its broker id in \p broker_id, returning
2658 * RD_KAFKA_RESP_ERR_NO_ERROR.
2659 *
2660 * If multiple BROKER resources are found RD_KAFKA_RESP_ERR__CONFLICT
2661 * is returned and an error string is written to errstr.
2662 *
2663 * If no BROKER resources are found RD_KAFKA_RESP_ERR_NO_ERROR
2664 * is returned and \p broker_idp is set to use the coordinator.
2665 */
2666 static rd_kafka_resp_err_t
rd_kafka_ConfigResource_get_single_broker_id(const rd_list_t * configs,int32_t * broker_idp,char * errstr,size_t errstr_size)2667 rd_kafka_ConfigResource_get_single_broker_id (const rd_list_t *configs,
2668 int32_t *broker_idp,
2669 char *errstr,
2670 size_t errstr_size) {
2671 const rd_kafka_ConfigResource_t *config;
2672 int i;
2673 int32_t broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; /* Some default
2674 * value that we
2675 * can compare
2676 * to below */
2677
2678 RD_LIST_FOREACH(config, configs, i) {
2679 char *endptr;
2680 long int r;
2681
2682 if (config->restype != RD_KAFKA_RESOURCE_BROKER)
2683 continue;
2684
2685 if (broker_id != RD_KAFKA_ADMIN_TARGET_CONTROLLER) {
2686 rd_snprintf(errstr, errstr_size,
2687 "Only one ConfigResource of type BROKER "
2688 "is allowed per call");
2689 return RD_KAFKA_RESP_ERR__CONFLICT;
2690 }
2691
2692 /* Convert string broker-id to int32 */
2693 r = (int32_t)strtol(config->name, &endptr, 10);
2694 if (r == LONG_MIN || r == LONG_MAX || config->name == endptr ||
2695 r < 0) {
2696 rd_snprintf(errstr, errstr_size,
2697 "Expected an int32 broker_id for "
2698 "ConfigResource(type=BROKER, name=%s)",
2699 config->name);
2700 return RD_KAFKA_RESP_ERR__INVALID_ARG;
2701 }
2702
2703 broker_id = r;
2704
2705 /* Keep scanning to make sure there are no duplicate
2706 * BROKER resources. */
2707 }
2708
2709 *broker_idp = broker_id;
2710
2711 return RD_KAFKA_RESP_ERR_NO_ERROR;
2712 }
2713
2714
2715 /**@}*/
2716
2717
2718
2719 /**
2720 * @name AlterConfigs
2721 * @{
2722 *
2723 *
2724 *
2725 */
2726
2727
2728
2729 /**
2730 * @brief Parse AlterConfigsResponse and create ADMIN_RESULT op.
2731 */
static rd_kafka_resp_err_t
rd_kafka_AlterConfigsResponse_parse (rd_kafka_op_t *rko_req,
                                     rd_kafka_op_t **rko_resultp,
                                     rd_kafka_buf_t *reply,
                                     char *errstr, size_t errstr_size) {
        /* Enables error logging from the rd_kafka_buf_read_*() macros,
         * which jump to the err_parse: label below on decode failure. */
        const int log_decode_errors = LOG_ERR;
        rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_op_t *rko_result = NULL;
        int32_t res_cnt;       /* Number of ConfigResources in the response */
        int i;
        int32_t Throttle_Time;

        rd_kafka_buf_read_i32(reply, &Throttle_Time);
        rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);

        rd_kafka_buf_read_i32(reply, &res_cnt);

        /* The broker must not return more resources than were requested. */
        if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) {
                rd_snprintf(errstr, errstr_size,
                            "Received %"PRId32" ConfigResources in response "
                            "when only %d were requested", res_cnt,
                            rd_list_cnt(&rko_req->rko_u.admin_request.args));
                return RD_KAFKA_RESP_ERR__BAD_MSG;
        }

        rko_result = rd_kafka_admin_result_new(rko_req);

        /* Result list is positionally indexed (rd_list_set() below), so
         * size it up-front to the response count. */
        rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt,
                     rd_kafka_ConfigResource_free);

        for (i = 0 ; i < (int)res_cnt ; i++) {
                int16_t error_code;
                rd_kafkap_str_t error_msg;
                int8_t res_type;
                rd_kafkap_str_t kres_name;
                char *res_name;
                char *this_errstr = NULL;
                rd_kafka_ConfigResource_t *config;
                rd_kafka_ConfigResource_t skel;
                int orig_pos;

                /* Per-resource fields: ErrorCode, ErrorMessage,
                 * ResourceType, ResourceName. */
                rd_kafka_buf_read_i16(reply, &error_code);
                rd_kafka_buf_read_str(reply, &error_msg);
                rd_kafka_buf_read_i8(reply, &res_type);
                rd_kafka_buf_read_str(reply, &kres_name);
                RD_KAFKAP_STR_DUPA(&res_name, &kres_name);

                if (error_code) {
                        /* Fall back to the generic error description if the
                         * broker supplied no (or an empty) error message. */
                        if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
                            RD_KAFKAP_STR_LEN(&error_msg) == 0)
                                this_errstr =
                                        (char *)rd_kafka_err2str(error_code);
                        else
                                RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
                }

                /* Returns NULL for resource types unknown to this client:
                 * such entries are logged and skipped. */
                config = rd_kafka_ConfigResource_new(res_type, res_name);
                if (!config) {
                        rd_kafka_log(rko_req->rko_rk, LOG_ERR,
                                     "ADMIN", "AlterConfigs returned "
                                     "unsupported ConfigResource #%d with "
                                     "type %d and name \"%s\": ignoring",
                                     i, res_type, res_name);
                        continue;
                }

                config->err = error_code;
                if (this_errstr)
                        config->errstr = rd_strdup(this_errstr);

                /* As a convenience to the application we insert result
                 * in the same order as they were requested. The broker
                 * does not maintain ordering unfortunately. */
                skel.restype = config->restype;
                skel.name = config->name;
                orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
                                         &skel, rd_kafka_ConfigResource_cmp);
                if (orig_pos == -1) {
                        /* config is not yet owned by the result list:
                         * destroy it before bailing out. */
                        rd_kafka_ConfigResource_destroy(config);
                        rd_kafka_buf_parse_fail(
                                reply,
                                "Broker returned ConfigResource %d,%s "
                                "that was not "
                                "included in the original request",
                                res_type, res_name);
                }

                if (rd_list_elem(&rko_result->rko_u.admin_result.results,
                                 orig_pos) != NULL) {
                        rd_kafka_ConfigResource_destroy(config);
                        rd_kafka_buf_parse_fail(
                                reply,
                                "Broker returned ConfigResource %d,%s "
                                "multiple times",
                                res_type, res_name);
                }

                /* Ownership of config transfers to the result list. */
                rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
                            config);
        }

        *rko_resultp = rko_result;

        return RD_KAFKA_RESP_ERR_NO_ERROR;

 err_parse:
        /* Destroying the result op also frees any resources already
         * inserted in its results list. */
        if (rko_result)
                rd_kafka_op_destroy(rko_result);

        rd_snprintf(errstr, errstr_size,
                    "AlterConfigs response protocol parse failure: %s",
                    rd_kafka_err2str(reply->rkbuf_err));

        return reply->rkbuf_err;
}
2848
2849
2850
2851
/**
 * @brief Asynchronously alter the configuration of the given resources.
 *
 * The result is enqueued on \p rkqu as an ALTERCONFIGS_RESULT event.
 */
void rd_kafka_AlterConfigs (rd_kafka_t *rk,
                            rd_kafka_ConfigResource_t **configs,
                            size_t config_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_AlterConfigsRequest,
                rd_kafka_AlterConfigsResponse_parse,
        };
        rd_kafka_op_t *rko;
        rd_kafka_resp_err_t err;
        char errstr[256];
        size_t idx;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(rk,
                                            RD_KAFKA_OP_ALTERCONFIGS,
                                            RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
                                            &cbs, options, rkqu->rkqu_q);

        /* Store private copies of the application's resources on the op. */
        rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt,
                     rd_kafka_ConfigResource_free);
        for (idx = 0 ; idx < config_cnt ; idx++)
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_ConfigResource_copy(configs[idx]));

        /* A BROKER resource must be sent directly to that broker rather
         * than to the controller, and at most one is permitted per call. */
        err = rd_kafka_ConfigResource_get_single_broker_id(
                &rko->rko_u.admin_request.args,
                &rko->rko_u.admin_request.broker_id,
                errstr, sizeof(errstr));
        if (err) {
                rd_kafka_admin_result_fail(rko, err, "%s", errstr);
                rd_kafka_admin_common_worker_destroy(rk, rko,
                                                     rd_true/*destroy*/);
                return;
        }

        rd_kafka_q_enq(rk->rk_ops, rko);
}
2899
2900
2901 const rd_kafka_ConfigResource_t **
rd_kafka_AlterConfigs_result_resources(const rd_kafka_AlterConfigs_result_t * result,size_t * cntp)2902 rd_kafka_AlterConfigs_result_resources (
2903 const rd_kafka_AlterConfigs_result_t *result,
2904 size_t *cntp) {
2905 return rd_kafka_admin_result_ret_resources(
2906 (const rd_kafka_op_t *)result, cntp);
2907 }
2908
2909 /**@}*/
2910
2911
2912
2913
2914 /**
2915 * @name DescribeConfigs
2916 * @{
2917 *
2918 *
2919 *
2920 */
2921
2922
2923 /**
2924 * @brief Parse DescribeConfigsResponse and create ADMIN_RESULT op.
2925 */
static rd_kafka_resp_err_t
rd_kafka_DescribeConfigsResponse_parse (rd_kafka_op_t *rko_req,
                                        rd_kafka_op_t **rko_resultp,
                                        rd_kafka_buf_t *reply,
                                        char *errstr, size_t errstr_size) {
        /* Enables error logging from the rd_kafka_buf_read_*() macros,
         * which jump to the err_parse: label below on decode failure. */
        const int log_decode_errors = LOG_ERR;
        rd_kafka_broker_t *rkb = reply->rkbuf_rkb;
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_op_t *rko_result = NULL;
        int32_t res_cnt;        /* Number of ConfigResources in the response */
        int i;
        int32_t Throttle_Time;
        /* config/entry track the object currently being built so the
         * err_parse: path can free whatever is not yet owned by a list. */
        rd_kafka_ConfigResource_t *config = NULL;
        rd_kafka_ConfigEntry_t *entry = NULL;

        rd_kafka_buf_read_i32(reply, &Throttle_Time);
        rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);

        /* #resources */
        rd_kafka_buf_read_i32(reply, &res_cnt);

        /* The broker must not return more resources than were requested. */
        if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
                rd_kafka_buf_parse_fail(
                        reply,
                        "Received %"PRId32" ConfigResources in response "
                        "when only %d were requested", res_cnt,
                        rd_list_cnt(&rko_req->rko_u.admin_request.args));

        rko_result = rd_kafka_admin_result_new(rko_req);

        /* Result list is positionally indexed (rd_list_set() below). */
        rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt,
                     rd_kafka_ConfigResource_free);

        for (i = 0 ; i < (int)res_cnt ; i++) {
                int16_t error_code;
                rd_kafkap_str_t error_msg;
                int8_t res_type;
                rd_kafkap_str_t kres_name;
                char *res_name;
                char *this_errstr = NULL;
                rd_kafka_ConfigResource_t skel;
                int orig_pos;
                int32_t entry_cnt;
                int ci;

                /* Per-resource fields: ErrorCode, ErrorMessage,
                 * ResourceType, ResourceName. */
                rd_kafka_buf_read_i16(reply, &error_code);
                rd_kafka_buf_read_str(reply, &error_msg);
                rd_kafka_buf_read_i8(reply, &res_type);
                rd_kafka_buf_read_str(reply, &kres_name);
                RD_KAFKAP_STR_DUPA(&res_name, &kres_name);

                if (error_code) {
                        /* Fall back to the generic error description if the
                         * broker supplied no (or an empty) error message. */
                        if (RD_KAFKAP_STR_IS_NULL(&error_msg) ||
                            RD_KAFKAP_STR_LEN(&error_msg) == 0)
                                this_errstr =
                                        (char *)rd_kafka_err2str(error_code);
                        else
                                RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg);
                }

                /* Returns NULL for resource types unknown to this client:
                 * such entries are logged and skipped. */
                config = rd_kafka_ConfigResource_new(res_type, res_name);
                if (!config) {
                        rd_kafka_log(rko_req->rko_rk, LOG_ERR,
                                     "ADMIN", "DescribeConfigs returned "
                                     "unsupported ConfigResource #%d with "
                                     "type %d and name \"%s\": ignoring",
                                     i, res_type, res_name);
                        continue;
                }

                config->err = error_code;
                if (this_errstr)
                        config->errstr = rd_strdup(this_errstr);

                /* #config_entries */
                rd_kafka_buf_read_i32(reply, &entry_cnt);

                for (ci = 0 ; ci < (int)entry_cnt ; ci++) {
                        rd_kafkap_str_t config_name, config_value;
                        int32_t syn_cnt;
                        int si;

                        rd_kafka_buf_read_str(reply, &config_name);
                        rd_kafka_buf_read_str(reply, &config_value);

                        /* NOTE(review): unlike syn_entry below, this return
                         * value is not NULL-checked — presumably it cannot
                         * fail for these inputs; confirm. */
                        entry = rd_kafka_ConfigEntry_new0(
                                config_name.str,
                                RD_KAFKAP_STR_LEN(&config_name),
                                config_value.str,
                                RD_KAFKAP_STR_LEN(&config_value));

                        rd_kafka_buf_read_bool(reply, &entry->a.is_readonly);

                        /* ApiVersion 0 has is_default field, while
                         * ApiVersion 1 has source field.
                         * Convert between the two so they look the same
                         * to the caller. */
                        if (rd_kafka_buf_ApiVersion(reply) == 0) {
                                rd_kafka_buf_read_bool(reply,
                                                       &entry->a.is_default);
                                if (entry->a.is_default)
                                        entry->a.source =
                                                RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG;
                        } else {
                                int8_t config_source;
                                rd_kafka_buf_read_i8(reply, &config_source);
                                entry->a.source = config_source;

                                if (entry->a.source ==
                                    RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
                                        entry->a.is_default = 1;

                        }

                        rd_kafka_buf_read_bool(reply, &entry->a.is_sensitive);


                        if (rd_kafka_buf_ApiVersion(reply) == 1) {
                                /* #config_synonyms (ApiVersion 1) */
                                rd_kafka_buf_read_i32(reply, &syn_cnt);

                                /* Sanity cap to avoid huge allocations from
                                 * a corrupt or malicious count. */
                                if (syn_cnt > 100000)
                                        rd_kafka_buf_parse_fail(
                                                reply,
                                                "Broker returned %"PRId32
                                                " config synonyms for "
                                                "ConfigResource %d,%s: "
                                                "limit is 100000",
                                                syn_cnt,
                                                config->restype,
                                                config->name);

                                if (syn_cnt > 0)
                                        rd_list_grow(&entry->synonyms, syn_cnt);

                        } else {
                                /* No synonyms in ApiVersion 0 */
                                syn_cnt = 0;
                        }



                        /* Read synonyms (ApiVersion 1) */
                        for (si = 0 ; si < (int)syn_cnt ; si++) {
                                rd_kafkap_str_t syn_name, syn_value;
                                int8_t syn_source;
                                rd_kafka_ConfigEntry_t *syn_entry;

                                rd_kafka_buf_read_str(reply, &syn_name);
                                rd_kafka_buf_read_str(reply, &syn_value);
                                rd_kafka_buf_read_i8(reply, &syn_source);

                                syn_entry = rd_kafka_ConfigEntry_new0(
                                        syn_name.str,
                                        RD_KAFKAP_STR_LEN(&syn_name),
                                        syn_value.str,
                                        RD_KAFKAP_STR_LEN(&syn_value));
                                if (!syn_entry)
                                        rd_kafka_buf_parse_fail(
                                                reply,
                                                "Broker returned invalid "
                                                "synonym #%d "
                                                "for ConfigEntry #%d (%s) "
                                                "and ConfigResource %d,%s: "
                                                "syn_name.len %d, "
                                                "syn_value.len %d",
                                                si, ci, entry->kv->name,
                                                config->restype, config->name,
                                                (int)syn_name.len,
                                                (int)syn_value.len);

                                syn_entry->a.source = syn_source;
                                syn_entry->a.is_synonym = 1;

                                /* Ownership transfers to the entry. */
                                rd_list_add(&entry->synonyms, syn_entry);
                        }

                        /* Ownership of entry transfers to config; clear the
                         * local so err_parse: won't double-free it. */
                        rd_kafka_ConfigResource_add_ConfigEntry(
                                config, entry);
                        entry = NULL;
                }

                /* As a convenience to the application we insert result
                 * in the same order as they were requested. The broker
                 * does not maintain ordering unfortunately. */
                skel.restype = config->restype;
                skel.name = config->name;
                orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
                                         &skel, rd_kafka_ConfigResource_cmp);
                if (orig_pos == -1)
                        rd_kafka_buf_parse_fail(
                                reply,
                                "Broker returned ConfigResource %d,%s "
                                "that was not "
                                "included in the original request",
                                res_type, res_name);

                if (rd_list_elem(&rko_result->rko_u.admin_result.results,
                                 orig_pos) != NULL)
                        rd_kafka_buf_parse_fail(
                                reply,
                                "Broker returned ConfigResource %d,%s "
                                "multiple times",
                                res_type, res_name);

                /* Ownership of config transfers to the result list; clear
                 * the local so err_parse: won't double-free it. */
                rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos,
                            config);
                config = NULL;
        }

        *rko_resultp = rko_result;

        return RD_KAFKA_RESP_ERR_NO_ERROR;

 err_parse:
        /* Free any partially-built objects that were not yet handed over
         * to an owning list, then the result op itself. */
        if (entry)
                rd_kafka_ConfigEntry_destroy(entry);
        if (config)
                rd_kafka_ConfigResource_destroy(config);

        if (rko_result)
                rd_kafka_op_destroy(rko_result);

        rd_snprintf(errstr, errstr_size,
                    "DescribeConfigs response protocol parse failure: %s",
                    rd_kafka_err2str(reply->rkbuf_err));

        return reply->rkbuf_err;
}
3155
3156
3157
/**
 * @brief Asynchronously describe the configuration of the given resources.
 *
 * The result is enqueued on \p rkqu as a DESCRIBECONFIGS_RESULT event.
 */
void rd_kafka_DescribeConfigs (rd_kafka_t *rk,
                               rd_kafka_ConfigResource_t **configs,
                               size_t config_cnt,
                               const rd_kafka_AdminOptions_t *options,
                               rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_DescribeConfigsRequest,
                rd_kafka_DescribeConfigsResponse_parse,
        };
        rd_kafka_op_t *rko;
        rd_kafka_resp_err_t err;
        char errstr[256];
        size_t idx;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(
                rk,
                RD_KAFKA_OP_DESCRIBECONFIGS,
                RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT,
                &cbs, options, rkqu->rkqu_q);

        /* Store private copies of the application's resources on the op. */
        rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt,
                     rd_kafka_ConfigResource_free);
        for (idx = 0 ; idx < config_cnt ; idx++)
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_ConfigResource_copy(configs[idx]));

        /* A BROKER resource must be sent directly to that broker rather
         * than to the controller, and at most one is permitted per call. */
        err = rd_kafka_ConfigResource_get_single_broker_id(
                &rko->rko_u.admin_request.args,
                &rko->rko_u.admin_request.broker_id,
                errstr, sizeof(errstr));
        if (err) {
                rd_kafka_admin_result_fail(rko, err, "%s", errstr);
                rd_kafka_admin_common_worker_destroy(rk, rko,
                                                     rd_true/*destroy*/);
                return;
        }

        rd_kafka_q_enq(rk->rk_ops, rko);
}
3205
3206
3207
3208
3209 const rd_kafka_ConfigResource_t **
rd_kafka_DescribeConfigs_result_resources(const rd_kafka_DescribeConfigs_result_t * result,size_t * cntp)3210 rd_kafka_DescribeConfigs_result_resources (
3211 const rd_kafka_DescribeConfigs_result_t *result,
3212 size_t *cntp) {
3213 return rd_kafka_admin_result_ret_resources(
3214 (const rd_kafka_op_t *)result, cntp);
3215 }
3216
3217 /**@}*/
3218
3219 /**
3220 * @name Delete Records
3221 * @{
3222 *
3223 *
3224 *
3225 *
3226 */
3227
3228 rd_kafka_DeleteRecords_t *
rd_kafka_DeleteRecords_new(const rd_kafka_topic_partition_list_t * before_offsets)3229 rd_kafka_DeleteRecords_new (const rd_kafka_topic_partition_list_t *
3230 before_offsets) {
3231 rd_kafka_DeleteRecords_t *del_records;
3232
3233 del_records = rd_calloc(1, sizeof(*del_records));
3234 del_records->offsets =
3235 rd_kafka_topic_partition_list_copy(before_offsets);
3236
3237 return del_records;
3238 }
3239
void rd_kafka_DeleteRecords_destroy (rd_kafka_DeleteRecords_t *del_records) {
        /* Release the owned offsets list first, then the object. */
        rd_kafka_topic_partition_list_destroy(del_records->offsets);
        rd_free(del_records);
}
3244
void rd_kafka_DeleteRecords_destroy_array (rd_kafka_DeleteRecords_t **
                                           del_records,
                                           size_t del_record_cnt) {
        size_t idx;

        /* Destroy each element; the array itself belongs to the caller. */
        for (idx = 0 ; idx < del_record_cnt ; idx++)
                rd_kafka_DeleteRecords_destroy(del_records[idx]);
}
3252
3253
3254
3255 /** @brief Merge the DeleteRecords response from a single broker
3256 * into the user response list.
3257 */
static void
rd_kafka_DeleteRecords_response_merge (rd_kafka_op_t *rko_fanout,
                                       const rd_kafka_op_t *rko_partial) {
        rd_kafka_t *rk = rko_fanout->rko_rk;
        const rd_kafka_topic_partition_list_t *partitions;
        rd_kafka_topic_partition_list_t *respartitions;
        const rd_kafka_topic_partition_t *partition;

        rd_assert(rko_partial->rko_evtype ==
                  RD_KAFKA_EVENT_DELETERECORDS_RESULT);

        /* All partitions (offsets) from the DeleteRecords() call */
        respartitions = rd_list_elem(&rko_fanout->rko_u.admin_request.
                                     fanout.results, 0);

        if (rko_partial->rko_err) {
                /* If there was a request-level error, set the error on
                 * all requested partitions for this request. */
                const rd_kafka_topic_partition_list_t *reqpartitions;
                rd_kafka_topic_partition_t *reqpartition;

                /* Partitions (offsets) from this DeleteRecordsRequest */
                reqpartitions = rd_list_elem(&rko_partial->rko_u.
                                             admin_result.args, 0);

                RD_KAFKA_TPLIST_FOREACH(reqpartition, reqpartitions) {
                        rd_kafka_topic_partition_t *respart;

                        /* Find result partition */
                        respart = rd_kafka_topic_partition_list_find(
                                respartitions,
                                reqpartition->topic,
                                reqpartition->partition);

                        /* Every requested partition was seeded into the
                         * result list up-front, so it must be found. */
                        rd_assert(respart || !*"respart not found");

                        respart->err = rko_partial->rko_err;
                }

                return;
        }

        /* Partitions from the DeleteRecordsResponse */
        partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0);

        RD_KAFKA_TPLIST_FOREACH(partition, partitions) {
                rd_kafka_topic_partition_t *respart;


                /* Find result partition */
                respart = rd_kafka_topic_partition_list_find(
                        respartitions,
                        partition->topic,
                        partition->partition);
                if (unlikely(!respart)) {
                        /* Broker returned a partition we never asked for:
                         * log and skip rather than corrupting the result. */
                        rd_dassert(!*"partition not found");

                        rd_kafka_log(rk, LOG_WARNING, "DELETERECORDS",
                                     "DeleteRecords response contains "
                                     "unexpected %s [%"PRId32"] which "
                                     "was not in the request list: ignored",
                                     partition->topic, partition->partition);
                        continue;
                }

                /* Copy the per-partition outcome into the user result. */
                respart->offset = partition->offset;
                respart->err = partition->err;
        }
}
3327
3328
3329
3330 /**
3331 * @brief Parse DeleteRecordsResponse and create ADMIN_RESULT op.
3332 */
static rd_kafka_resp_err_t
rd_kafka_DeleteRecordsResponse_parse (rd_kafka_op_t *rko_req,
                                      rd_kafka_op_t **rko_resultp,
                                      rd_kafka_buf_t *reply,
                                      char *errstr, size_t errstr_size) {
        /* Enables error logging from the rd_kafka_buf_read_*() macros,
         * which jump to the err_parse: label below on decode failure. */
        const int log_decode_errors = LOG_ERR;
        rd_kafka_op_t *rko_result;
        rd_kafka_topic_partition_list_t *offsets;

        rd_kafka_buf_read_throttle_time(reply);

        /* Per-partition low-watermark offsets and errors. */
        offsets = rd_kafka_buf_read_topic_partitions(reply, 0,
                                                     rd_true/*read_offset*/,
                                                     rd_true/*read_part_errs*/);
        if (!offsets)
                rd_kafka_buf_parse_fail(reply,
                                        "Failed to parse topic partitions");


        /* The result list holds a single element: the offsets list,
         * which the list now owns (destroy_free). */
        rko_result = rd_kafka_admin_result_new(rko_req);
        rd_list_init(&rko_result->rko_u.admin_result.results, 1,
                     rd_kafka_topic_partition_list_destroy_free);
        rd_list_add(&rko_result->rko_u.admin_result.results, offsets);
        *rko_resultp = rko_result;
        return RD_KAFKA_RESP_ERR_NO_ERROR;

 err_parse:
        rd_snprintf(errstr, errstr_size,
                    "DeleteRecords response protocol parse failure: %s",
                    rd_kafka_err2str(reply->rkbuf_err));

        return reply->rkbuf_err;
}
3366
3367
3368 /**
3369 * @brief Call when leaders have been queried to progress the DeleteRecords
3370 * admin op to its next phase, sending DeleteRecords to partition
3371 * leaders.
3372 *
3373 * @param rko Reply op (RD_KAFKA_OP_LEADERS).
3374 */
static rd_kafka_op_res_t
rd_kafka_DeleteRecords_leaders_queried_cb (rd_kafka_t *rk,
                                           rd_kafka_q_t *rkq,
                                           rd_kafka_op_t *reply) {
        rd_kafka_resp_err_t err = reply->rko_err;
        const rd_list_t *leaders =
                reply->rko_u.leaders.leaders; /* Possibly NULL (on err) */
        rd_kafka_topic_partition_list_t *partitions =
                reply->rko_u.leaders.partitions; /* Possibly NULL (on err) */
        rd_kafka_op_t *rko_fanout = reply->rko_u.leaders.opaque;
        rd_kafka_topic_partition_t *rktpar;
        rd_kafka_topic_partition_list_t *offsets;
        const struct rd_kafka_partition_leader *leader;
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_DeleteRecordsRequest,
                rd_kafka_DeleteRecordsResponse_parse,
        };
        int i;

        rd_assert((rko_fanout->rko_type & ~RD_KAFKA_OP_FLAGMASK) ==
                  RD_KAFKA_OP_ADMIN_FANOUT);

        /* On client termination, skip straight to the failure path
         * (the err: label inside the if-block below) without touching
         * the possibly-NULL partitions list. */
        if (err == RD_KAFKA_RESP_ERR__DESTROY)
                goto err;

        /* Requested offsets */
        offsets = rd_list_elem(&rko_fanout->rko_u.admin_request.args, 0);

        /* Update the error field of each partition from the
         * leader-queried partition list so that ERR_UNKNOWN_TOPIC_OR_PART
         * and similar are propagated, since those partitions are not
         * included in the leaders list. */
        RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
                rd_kafka_topic_partition_t *rktpar2;

                if (!rktpar->err)
                        continue;

                rktpar2 = rd_kafka_topic_partition_list_find(
                        offsets, rktpar->topic, rktpar->partition);
                rd_assert(rktpar2);
                rktpar2->err = rktpar->err;
        }


        if (err) {
        err:
                /* Fail the entire fanout op; this also surfaces the
                 * per-partition errors set above to the application. */
                rd_kafka_admin_result_fail(
                        rko_fanout,
                        err,
                        "Failed to query partition leaders: %s",
                        err == RD_KAFKA_RESP_ERR__NOENT ?
                        "No leaders found" : rd_kafka_err2str(err));
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return RD_KAFKA_OP_RES_HANDLED;
        }

        /* The response lists is one element deep and that element is a
         * rd_kafka_topic_partition_list_t with the results of the deletes. */
        rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, 1,
                     rd_kafka_topic_partition_list_destroy_free);
        rd_list_add(&rko_fanout->rko_u.admin_request.fanout.results,
                    rd_kafka_topic_partition_list_copy(offsets));

        /* One outstanding sub-request per distinct partition leader. */
        rko_fanout->rko_u.admin_request.fanout.outstanding =
                rd_list_cnt(leaders);

        rd_assert(rd_list_cnt(leaders) > 0);

        /* For each leader send a request for its partitions */
        RD_LIST_FOREACH(leader, leaders, i) {
                rd_kafka_op_t *rko =
                        rd_kafka_admin_request_op_new(
                                rk,
                                RD_KAFKA_OP_DELETERECORDS,
                                RD_KAFKA_EVENT_DELETERECORDS_RESULT,
                                &cbs, &rko_fanout->rko_u.admin_request.options,
                                rk->rk_ops);
                rko->rko_u.admin_request.fanout_parent = rko_fanout;
                /* Pin the sub-request to this partition leader. */
                rko->rko_u.admin_request.broker_id = leader->rkb->rkb_nodeid;

                rd_kafka_topic_partition_list_sort_by_topic(leader->partitions);

                rd_list_init(&rko->rko_u.admin_request.args, 1,
                             rd_kafka_topic_partition_list_destroy_free);
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_topic_partition_list_copy(
                                    leader->partitions));

                /* Enqueue op for admin_worker() to transition to next state */
                rd_kafka_q_enq(rk->rk_ops, rko);
        }

        return RD_KAFKA_OP_RES_HANDLED;
}
3471
3472
/**
 * @brief Asynchronously delete records (log truncation) up to the given
 *        per-partition offsets.
 *
 * The result is enqueued on \p rkqu as a DELETERECORDS_RESULT event.
 */
void rd_kafka_DeleteRecords (rd_kafka_t *rk,
                             rd_kafka_DeleteRecords_t **del_records,
                             size_t del_record_cnt,
                             const rd_kafka_AdminOptions_t *options,
                             rd_kafka_queue_t *rkqu) {
        static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = {
                rd_kafka_DeleteRecords_response_merge,
                rd_kafka_topic_partition_list_copy_opaque,
        };
        const rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_topic_partition_list_t *copied_offsets;
        rd_kafka_op_t *rko_fanout;

        rd_assert(rkqu);

        rko_fanout = rd_kafka_admin_fanout_op_new(
                rk,
                RD_KAFKA_OP_DELETERECORDS,
                RD_KAFKA_EVENT_DELETERECORDS_RESULT,
                &fanout_cbs, options, rkqu->rkqu_q);

        /* The array form exists for API extensibility only; exactly one
         * DeleteRecords object is supported per call. */
        if (del_record_cnt != 1) {
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "Exactly one DeleteRecords must be "
                                           "passed");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        offsets = del_records[0]->offsets;

        if (!offsets || offsets->cnt == 0) {
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "No records to delete");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        /* Work on a private copy of the caller's offsets list. */
        copied_offsets = rd_kafka_topic_partition_list_copy(offsets);
        if (rd_kafka_topic_partition_list_has_duplicates(
                    copied_offsets, rd_false/*check partition*/)) {
                rd_kafka_topic_partition_list_destroy(copied_offsets);
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "Duplicate partitions not allowed");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        /* Pre-set every partition to ERR__NOOP so partitions that never
         * get a request sent still carry an indicative error. */
        rd_kafka_topic_partition_list_set_err(copied_offsets,
                                              RD_KAFKA_RESP_ERR__NOOP);

        rd_list_init(&rko_fanout->rko_u.admin_request.args, 1,
                     rd_kafka_topic_partition_list_destroy_free);
        rd_list_add(&rko_fanout->rko_u.admin_request.args, copied_offsets);

        /* Leaders are looked up asynchronously; the callback fans out one
         * DeleteRecordsRequest per partition leader. */
        rd_kafka_topic_partition_list_query_leaders_async(
                rk, copied_offsets,
                rd_kafka_admin_timeout_remains(rko_fanout),
                RD_KAFKA_REPLYQ(rk->rk_ops, 0),
                rd_kafka_DeleteRecords_leaders_queried_cb,
                rko_fanout);
}
3548
3549
3550 /**
3551 * @brief Get the list of offsets from a DeleteRecords result.
3552 *
3553 * The returned \p offsets life-time is the same as the \p result object.
3554 */
3555 const rd_kafka_topic_partition_list_t *
rd_kafka_DeleteRecords_result_offsets(const rd_kafka_DeleteRecords_result_t * result)3556 rd_kafka_DeleteRecords_result_offsets (
3557 const rd_kafka_DeleteRecords_result_t *result) {
3558 const rd_kafka_topic_partition_list_t *offsets;
3559 const rd_kafka_op_t *rko = (const rd_kafka_op_t *) result;
3560 size_t cnt;
3561
3562 rd_kafka_op_type_t reqtype =
3563 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK;
3564 rd_assert(reqtype == RD_KAFKA_OP_DELETERECORDS);
3565
3566 cnt = rd_list_cnt(&rko->rko_u.admin_result.results);
3567
3568 rd_assert(cnt == 1);
3569
3570 offsets = (const rd_kafka_topic_partition_list_t *)
3571 rd_list_elem(&rko->rko_u.admin_result.results, 0);
3572
3573 rd_assert(offsets);
3574
3575 return offsets;
3576 }
3577
3578 /**@}*/
3579
3580 /**
3581 * @name Delete groups
3582 * @{
3583 *
3584 *
3585 *
3586 *
3587 */
3588
rd_kafka_DeleteGroup_new(const char * group)3589 rd_kafka_DeleteGroup_t *rd_kafka_DeleteGroup_new (const char *group) {
3590 size_t tsize = strlen(group) + 1;
3591 rd_kafka_DeleteGroup_t *del_group;
3592
3593 /* Single allocation */
3594 del_group = rd_malloc(sizeof(*del_group) + tsize);
3595 del_group->group = del_group->data;
3596 memcpy(del_group->group, group, tsize);
3597
3598 return del_group;
3599 }
3600
void rd_kafka_DeleteGroup_destroy (rd_kafka_DeleteGroup_t *del_group) {
        /* Struct and group name share one allocation: a single free. */
        rd_free(del_group);
}
3604
/* void* adapter so the destructor can be used as an rd_list free callback. */
static void rd_kafka_DeleteGroup_free (void *ptr) {
        rd_kafka_DeleteGroup_destroy(ptr);
}
3608
void rd_kafka_DeleteGroup_destroy_array (rd_kafka_DeleteGroup_t **del_groups,
                                         size_t del_group_cnt) {
        size_t idx;

        /* Destroy each element; the array itself belongs to the caller. */
        for (idx = 0 ; idx < del_group_cnt ; idx++)
                rd_kafka_DeleteGroup_destroy(del_groups[idx]);
}
3615
3616 /**
3617 * @brief Group name comparator for DeleteGroup_t
3618 */
rd_kafka_DeleteGroup_cmp(const void * _a,const void * _b)3619 static int rd_kafka_DeleteGroup_cmp (const void *_a, const void *_b) {
3620 const rd_kafka_DeleteGroup_t *a = _a, *b = _b;
3621 return strcmp(a->group, b->group);
3622 }
3623
3624 /**
3625 * @brief Allocate a new DeleteGroup and make a copy of \p src
3626 */
3627 static rd_kafka_DeleteGroup_t *
rd_kafka_DeleteGroup_copy(const rd_kafka_DeleteGroup_t * src)3628 rd_kafka_DeleteGroup_copy (const rd_kafka_DeleteGroup_t *src) {
3629 return rd_kafka_DeleteGroup_new(src->group);
3630 }
3631
3632
3633 /**
3634 * @brief Parse DeleteGroupsResponse and create ADMIN_RESULT op.
3635 */
static rd_kafka_resp_err_t
rd_kafka_DeleteGroupsResponse_parse (rd_kafka_op_t *rko_req,
                                     rd_kafka_op_t **rko_resultp,
                                     rd_kafka_buf_t *reply,
                                     char *errstr, size_t errstr_size) {
        /* Enables error logging from the rd_kafka_buf_read_*() macros,
         * which jump to the err_parse: label below on decode failure. */
        const int log_decode_errors = LOG_ERR;
        int32_t group_cnt;      /* Number of groups in the response */
        int i;
        rd_kafka_op_t *rko_result = NULL;

        rd_kafka_buf_read_throttle_time(reply);

        /* #group_error_codes */
        rd_kafka_buf_read_i32(reply, &group_cnt);

        /* The broker must not return more groups than were requested. */
        if (group_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
                rd_kafka_buf_parse_fail(
                        reply,
                        "Received %"PRId32" groups in response "
                        "when only %d were requested", group_cnt,
                        rd_list_cnt(&rko_req->rko_u.admin_request.args));

        rko_result = rd_kafka_admin_result_new(rko_req);
        rd_list_init(&rko_result->rko_u.admin_result.results,
                     group_cnt,
                     rd_kafka_group_result_free);

        for (i = 0 ; i < (int)group_cnt ; i++) {
                rd_kafkap_str_t kgroup;
                int16_t error_code;
                rd_kafka_group_result_t *groupres;

                /* Per-group fields: GroupId, ErrorCode. */
                rd_kafka_buf_read_str(reply, &kgroup);
                rd_kafka_buf_read_i16(reply, &error_code);

                /* Error object is only attached on failure. */
                groupres = rd_kafka_group_result_new(
                        kgroup.str,
                        RD_KAFKAP_STR_LEN(&kgroup),
                        NULL,
                        error_code ?
                        rd_kafka_error_new(error_code, NULL) : NULL);

                rd_list_add(&rko_result->rko_u.admin_result.results, groupres);
        }

        *rko_resultp = rko_result;
        return RD_KAFKA_RESP_ERR_NO_ERROR;

 err_parse:
        /* Destroying the result op also frees any group results already
         * added to its results list. */
        if (rko_result)
                rd_kafka_op_destroy(rko_result);

        rd_snprintf(errstr, errstr_size,
                    "DeleteGroups response protocol parse failure: %s",
                    rd_kafka_err2str(reply->rkbuf_err));

        return reply->rkbuf_err;
}
3694
3695 /** @brief Merge the DeleteGroups response from a single broker
3696 * into the user response list.
3697 */
void rd_kafka_DeleteGroups_response_merge (rd_kafka_op_t *rko_fanout,
                                           const rd_kafka_op_t *rko_partial) {
        const rd_kafka_group_result_t *groupres = NULL;
        rd_kafka_group_result_t *newgroupres;
        /* The DeleteGroup this partial result corresponds to, passed as
         * the per-request opaque. */
        const rd_kafka_DeleteGroup_t *grp =
                rko_partial->rko_u.admin_result.opaque;
        int orig_pos;

        rd_assert(rko_partial->rko_evtype ==
                  RD_KAFKA_EVENT_DELETEGROUPS_RESULT);

        if (!rko_partial->rko_err) {
                /* Proper results.
                 * We only send one group per request, make sure it matches */
                groupres = rd_list_elem(&rko_partial->rko_u.admin_result.
                                        results, 0);
                rd_assert(groupres);
                rd_assert(!strcmp(groupres->group, grp->group));
                newgroupres = rd_kafka_group_result_copy(groupres);
        } else {
                /* Op errored, e.g. timeout */
                /* NOTE(review): -1 as the group-name length presumably means
                 * "use strlen" — confirm against rd_kafka_group_result_new. */
                newgroupres = rd_kafka_group_result_new(
                        grp->group, -1, NULL,
                        rd_kafka_error_new(rko_partial->rko_err, NULL));
        }

        /* As a convenience to the application we insert group result
         * in the same order as they were requested. */
        orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args,
                                 grp, rd_kafka_DeleteGroup_cmp);
        rd_assert(orig_pos != -1);

        /* Make sure result is not already set */
        rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request.
                               fanout.results, orig_pos) == NULL);

        rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results,
                    orig_pos, newgroupres);
}
3737
/**
 * @brief Delete a list of consumer groups.
 *
 * Creates a fanout op and one coordinator-targeted DeleteGroups request
 * per group; per-broker results are merged by
 * rd_kafka_DeleteGroups_response_merge() and the combined
 * DELETEGROUPS_RESULT event is emitted on \p rkqu.
 *
 * @param rk            Client instance.
 * @param del_groups    Array of groups to delete (copied, caller retains
 *                      ownership).
 * @param del_group_cnt Number of elements in \p del_groups.
 * @param options       Optional admin options, or NULL for defaults.
 * @param rkqu          Queue to emit the result event on.
 */
void rd_kafka_DeleteGroups (rd_kafka_t *rk,
                            rd_kafka_DeleteGroup_t **del_groups,
                            size_t del_group_cnt,
                            const rd_kafka_AdminOptions_t *options,
                            rd_kafka_queue_t *rkqu) {
        rd_kafka_op_t *rko_fanout;
        rd_list_t dup_list;
        size_t i;
        /* Callbacks used by the fanout machinery to merge per-broker
         * partial results and to copy result opaques. */
        static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = {
                rd_kafka_DeleteGroups_response_merge,
                rd_kafka_group_result_copy_opaque,
        };

        rd_assert(rkqu);

        rko_fanout = rd_kafka_admin_fanout_op_new(
                rk,
                RD_KAFKA_OP_DELETEGROUPS,
                RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
                &fanout_cbs, options, rkqu->rkqu_q);

        if (del_group_cnt == 0) {
                /* Nothing to delete: fail the op immediately. */
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "No groups to delete");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        /* Copy group list and store it on the request op.
         * Maintain original ordering. */
        rd_list_init(&rko_fanout->rko_u.admin_request.args,
                     (int)del_group_cnt,
                     rd_kafka_DeleteGroup_free);
        for (i = 0; i < del_group_cnt; i++)
                rd_list_add(&rko_fanout->rko_u.admin_request.args,
                            rd_kafka_DeleteGroup_copy(del_groups[i]));

        /* Check for duplicates.
         * Make a temporary copy of the group list and sort it to check for
         * duplicates, we don't want the original list sorted since we want
         * to maintain ordering. */
        rd_list_init(&dup_list,
                     rd_list_cnt(&rko_fanout->rko_u.admin_request.args),
                     NULL);
        /* Shallow copy (NULL copy_cb): dup_list elements are borrowed from
         * admin_request.args and must not be freed by dup_list. */
        rd_list_copy_to(&dup_list,
                        &rko_fanout->rko_u.admin_request.args,
                        NULL, NULL);
        rd_list_sort(&dup_list, rd_kafka_DeleteGroup_cmp);
        if (rd_list_find_duplicate(&dup_list, rd_kafka_DeleteGroup_cmp)) {
                rd_list_destroy(&dup_list);
                rd_kafka_admin_result_fail(rko_fanout,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "Duplicate groups not allowed");
                rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
                                                     rd_true/*destroy*/);
                return;
        }

        rd_list_destroy(&dup_list);

        /* Prepare results list where fanned out op's results will be
         * accumulated. */
        rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results,
                     (int)del_group_cnt,
                     rd_kafka_group_result_free);
        rko_fanout->rko_u.admin_request.fanout.outstanding = (int)del_group_cnt;

        /* Create individual request ops for each group.
         * FIXME: A future optimization is to coalesce all groups for a single
         *        coordinator into one op. */
        for (i = 0; i < del_group_cnt; i++) {
                /* Request builder / response parser for one group. */
                static const struct rd_kafka_admin_worker_cbs cbs = {
                        rd_kafka_DeleteGroupsRequest,
                        rd_kafka_DeleteGroupsResponse_parse,
                };
                rd_kafka_DeleteGroup_t *grp = rd_list_elem(
                        &rko_fanout->rko_u.admin_request.args, (int)i);
                rd_kafka_op_t *rko =
                        rd_kafka_admin_request_op_new(
                                rk,
                                RD_KAFKA_OP_DELETEGROUPS,
                                RD_KAFKA_EVENT_DELETEGROUPS_RESULT,
                                &cbs,
                                options,
                                rk->rk_ops);

                rko->rko_u.admin_request.fanout_parent = rko_fanout;
                /* Each group is handled by its own group coordinator. */
                rko->rko_u.admin_request.broker_id =
                        RD_KAFKA_ADMIN_TARGET_COORDINATOR;
                rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP;
                rko->rko_u.admin_request.coordkey = rd_strdup(grp->group);

                /* Set the group name as the opaque so the fanout worker use it
                 * to fill in errors.
                 * References rko_fanout's memory, which will always outlive
                 * the fanned out op. */
                rd_kafka_AdminOptions_set_opaque(
                        &rko->rko_u.admin_request.options, grp);

                rd_list_init(&rko->rko_u.admin_request.args, 1,
                             rd_kafka_DeleteGroup_free);
                rd_list_add(&rko->rko_u.admin_request.args,
                            rd_kafka_DeleteGroup_copy(del_groups[i]));

                rd_kafka_q_enq(rk->rk_ops, rko);
        }
}
3847
3848
3849 /**
3850 * @brief Get an array of group results from a DeleteGroups result.
3851 *
3852 * The returned \p groups life-time is the same as the \p result object.
3853 * @param cntp is updated to the number of elements in the array.
3854 */
3855 const rd_kafka_group_result_t **
rd_kafka_DeleteGroups_result_groups(const rd_kafka_DeleteGroups_result_t * result,size_t * cntp)3856 rd_kafka_DeleteGroups_result_groups (
3857 const rd_kafka_DeleteGroups_result_t *result,
3858 size_t *cntp) {
3859 return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result,
3860 cntp);
3861 }
3862
3863
3864 /**@}*/
3865
3866
/**
 * @name Delete consumer group offsets (committed offsets)
 * @{
 */
3875
3876 rd_kafka_DeleteConsumerGroupOffsets_t *
rd_kafka_DeleteConsumerGroupOffsets_new(const char * group,const rd_kafka_topic_partition_list_t * partitions)3877 rd_kafka_DeleteConsumerGroupOffsets_new (const char *group,
3878 const rd_kafka_topic_partition_list_t
3879 *partitions) {
3880 size_t tsize = strlen(group) + 1;
3881 rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets;
3882
3883 rd_assert(partitions);
3884
3885 /* Single allocation */
3886 del_grpoffsets = rd_malloc(sizeof(*del_grpoffsets) + tsize);
3887 del_grpoffsets->group = del_grpoffsets->data;
3888 memcpy(del_grpoffsets->group, group, tsize);
3889 del_grpoffsets->partitions =
3890 rd_kafka_topic_partition_list_copy(partitions);
3891
3892 return del_grpoffsets;
3893 }
3894
/**
 * @brief Destroy a DeleteConsumerGroupOffsets object.
 */
void rd_kafka_DeleteConsumerGroupOffsets_destroy (
        rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets) {
        /* The partition list is a copy owned by this object. */
        rd_kafka_topic_partition_list_destroy(del_grpoffsets->partitions);
        /* Group name is co-allocated with the object: one free suffices. */
        rd_free(del_grpoffsets);
}
3900
/**
 * @brief void*-typed trampoline around
 *        rd_kafka_DeleteConsumerGroupOffsets_destroy(), used as an
 *        rd_list_t element free callback (see rd_list_init() below).
 */
static void rd_kafka_DeleteConsumerGroupOffsets_free (void *ptr) {
        rd_kafka_DeleteConsumerGroupOffsets_destroy(ptr);
}
3904
/**
 * @brief Destroy each element of an array of DeleteConsumerGroupOffsets
 *        objects. The array itself is not freed.
 */
void rd_kafka_DeleteConsumerGroupOffsets_destroy_array (
        rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
        size_t del_grpoffsets_cnt) {
        size_t i = 0;

        while (i < del_grpoffsets_cnt)
                rd_kafka_DeleteConsumerGroupOffsets_destroy(
                        del_grpoffsets[i++]);
}
3912
3913
/**
 * @brief Allocate a new DeleteConsumerGroupOffsets and make a copy of \p src
 *        (deep-copies the group name and partition list).
 */
static rd_kafka_DeleteConsumerGroupOffsets_t *
rd_kafka_DeleteConsumerGroupOffsets_copy (
        const rd_kafka_DeleteConsumerGroupOffsets_t *src) {
        return rd_kafka_DeleteConsumerGroupOffsets_new(src->group,
                                                       src->partitions);
}
3923
3924
/**
 * @brief Parse OffsetDeleteResponse and create ADMIN_RESULT op.
 *
 * @param rko_req      Originating admin request op; its admin_request.args
 *                     holds the single DeleteConsumerGroupOffsets object.
 * @param rko_resultp  Set to the newly created result op on success;
 *                     untouched on failure.
 * @param reply        Broker response buffer.
 * @param errstr       Human-readable error written on failure.
 * @param errstr_size  Size of \p errstr.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, else an error code with
 *          \p errstr set.
 */
static rd_kafka_resp_err_t
rd_kafka_OffsetDeleteResponse_parse (rd_kafka_op_t *rko_req,
                                     rd_kafka_op_t **rko_resultp,
                                     rd_kafka_buf_t *reply,
                                     char *errstr, size_t errstr_size) {
        /* Used by the rd_kafka_buf_read_*() macros below, which jump to
         * the err_parse: label on buffer parse failure. */
        const int log_decode_errors = LOG_ERR;
        rd_kafka_op_t *rko_result;
        int16_t ErrorCode;
        rd_kafka_topic_partition_list_t *partitions = NULL;
        const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets;

        /* Top-level (group-wide) error code. */
        rd_kafka_buf_read_i16(reply, &ErrorCode);
        if (ErrorCode) {
                rd_snprintf(errstr, errstr_size,
                            "OffsetDelete response error: %s",
                            rd_kafka_err2str(ErrorCode));
                return ErrorCode;
        }

        rd_kafka_buf_read_throttle_time(reply);

        /* Per-partition results, including per-partition error codes.
         * NOTE(review): 16 is presumably a pre-allocation size estimate for
         * the partition list -- confirm against
         * rd_kafka_buf_read_topic_partitions(). */
        partitions = rd_kafka_buf_read_topic_partitions(reply,
                                                        16,
                                                        rd_false/*no offset */,
                                                        rd_true/*read error*/);
        if (!partitions) {
                rd_snprintf(errstr, errstr_size,
                            "Failed to parse OffsetDeleteResponse partitions");
                return RD_KAFKA_RESP_ERR__BAD_MSG;
        }


        /* Create result op and group_result_t */
        rko_result = rd_kafka_admin_result_new(rko_req);
        del_grpoffsets = rd_list_elem(&rko_result->rko_u.admin_result.args, 0);

        rd_list_init(&rko_result->rko_u.admin_result.results, 1,
                     rd_kafka_group_result_free);
        /* group_result copies \p partitions, so our local list is
         * destroyed afterwards. */
        rd_list_add(&rko_result->rko_u.admin_result.results,
                    rd_kafka_group_result_new(del_grpoffsets->group, -1,
                                              partitions, NULL));
        rd_kafka_topic_partition_list_destroy(partitions);

        *rko_resultp = rko_result;

        return RD_KAFKA_RESP_ERR_NO_ERROR;

err_parse:
        /* Reached from the rd_kafka_buf_read_*() macros above on
         * protocol decode failure. */
        rd_snprintf(errstr, errstr_size,
                    "OffsetDelete response protocol parse failure: %s",
                    rd_kafka_err2str(reply->rkbuf_err));
        return reply->rkbuf_err;
}
3981
3982
/**
 * @brief Delete committed offsets for a consumer group.
 *
 * Sends an OffsetDelete request to the group's coordinator and emits a
 * DELETECONSUMERGROUPOFFSETS_RESULT event on \p rkqu.
 *
 * @param rk                 Client instance.
 * @param del_grpoffsets     Array of group+partitions to delete offsets for.
 *                           Exactly one element is currently supported.
 * @param del_grpoffsets_cnt Number of elements in \p del_grpoffsets.
 * @param options            Optional admin options, or NULL for defaults.
 * @param rkqu               Queue to emit the result event on.
 */
void rd_kafka_DeleteConsumerGroupOffsets (
        rd_kafka_t *rk,
        rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
        size_t del_grpoffsets_cnt,
        const rd_kafka_AdminOptions_t *options,
        rd_kafka_queue_t *rkqu) {
        /* Request builder / response parser for the admin worker. */
        static const struct rd_kafka_admin_worker_cbs cbs = {
                rd_kafka_OffsetDeleteRequest,
                rd_kafka_OffsetDeleteResponse_parse,
        };
        rd_kafka_op_t *rko;

        rd_assert(rkqu);

        rko = rd_kafka_admin_request_op_new(
                rk,
                RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS,
                RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT,
                &cbs, options, rkqu->rkqu_q);

        if (del_grpoffsets_cnt != 1) {
                /* For simplicity we only support one single group for now */
                rd_kafka_admin_result_fail(rko,
                                           RD_KAFKA_RESP_ERR__INVALID_ARG,
                                           "Exactly one "
                                           "DeleteConsumerGroupOffsets must "
                                           "be passed");
                rd_kafka_admin_common_worker_destroy(rk, rko,
                                                     rd_true/*destroy*/);
                return;
        }


        /* The request must be sent to the group's coordinator. */
        rko->rko_u.admin_request.broker_id =
                RD_KAFKA_ADMIN_TARGET_COORDINATOR;
        rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP;
        rko->rko_u.admin_request.coordkey =
                rd_strdup(del_grpoffsets[0]->group);

        /* Store copy of group on request so the group name can be reached
         * from the response parser. */
        rd_list_init(&rko->rko_u.admin_request.args, 1,
                     rd_kafka_DeleteConsumerGroupOffsets_free);
        rd_list_add(&rko->rko_u.admin_request.args,
                    rd_kafka_DeleteConsumerGroupOffsets_copy(
                            del_grpoffsets[0]));

        rd_kafka_q_enq(rk->rk_ops, rko);
}
4032
4033
/**
 * @brief Get an array of group results from a DeleteConsumerGroupOffsets
 *        result.
 *
 * The returned \p groups life-time is the same as the \p result object.
 * @param cntp is updated to the number of elements in the array.
 */
const rd_kafka_group_result_t **
rd_kafka_DeleteConsumerGroupOffsets_result_groups (
        const rd_kafka_DeleteConsumerGroupOffsets_result_t *result,
        size_t *cntp) {
        return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result,
                                                cntp);
}
4047
/* NOTE(review): redundant re-declaration of
 * rd_kafka_DeleteConsumerGroupOffsets() after its definition above.
 * Harmless, but exported prototypes normally live in the public header
 * (rdkafka.h) -- presumably a leftover; consider removing. */
RD_EXPORT
void rd_kafka_DeleteConsumerGroupOffsets (
        rd_kafka_t *rk,
        rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets,
        size_t del_grpoffsets_cnt,
        const rd_kafka_AdminOptions_t *options,
        rd_kafka_queue_t *rkqu);
4055
4056 /**@}*/
4057