1 /* 2 * librdkafka - Apache Kafka C library 3 * 4 * Copyright (c) 2018 Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include "rdkafka_int.h" 30 #include "rdkafka_admin.h" 31 #include "rdkafka_request.h" 32 #include "rdkafka_aux.h" 33 34 #include <stdarg.h> 35 36 37 38 /** @brief Descriptive strings for rko_u.admin_request.state */ 39 static const char *rd_kafka_admin_state_desc[] = { 40 "initializing", 41 "waiting for broker", pg_bswap16(uint16 x)42 "waiting for controller", 43 "waiting for fanouts", 44 "constructing request", 45 "waiting for response from broker", 46 }; 47 48 49 50 /** 51 * @brief Admin API implementation. 
*
 * The public Admin API in librdkafka exposes a completely asynchronous
 * interface where the initial request API (e.g., ..CreateTopics())
 * is non-blocking and returns immediately, and the application polls
 * a ..queue_t for the result.
 *
 * The underlying handling of the request is also completely asynchronous
 * inside librdkafka, for two reasons:
 *  - everything is async in librdkafka so adding something new that isn't
 *    would mean that existing functionality will need to be changed if
 *    it should be able to work simultaneously (such as statistics, timers,
 *    etc). There is no functional value to making the admin API
 *    synchronous internally, even if it would simplify its implementation.
 *    So making it async allows the Admin API to be used with existing
 *    client types in existing applications without breakage.
 *  - the async approach allows multiple outstanding Admin API requests
 *    simultaneously.
 *
 * The internal async implementation relies on the following concepts:
 *  - it uses a single rko (rd_kafka_op_t) to maintain state.
 *  - the rko has a callback attached - called the worker callback.
 *  - the worker callback is a small state machine that triggers
 *    async operations (be it controller lookups, timeout timers,
 *    protocol transmits, etc).
 *  - the worker callback is only called on the rdkafka main thread.
 *  - the callback is triggered by different events and sources by enqueuing
 *    the rko on the rdkafka main ops queue.
 *
 *
 * Let's illustrate this with a DeleteTopics example. This might look
 * daunting, but it boils down to an asynchronous state machine being
 * triggered by enqueuing the rko op.
 *
 *  1. [app thread] The user constructs the input arguments,
 *     including a response rkqu queue and then calls DeleteTopics().
 *
 *  2.
[app thread] DeleteTopics() creates a new internal op (rko) of type 89 * RD_KAFKA_OP_DELETETOPICS, makes a **copy** on the rko of all the 90 * input arguments (which allows the caller to free the originals 91 * whenever she likes). The rko op worker callback is set to the 92 * generic admin worker callback rd_kafka_admin_worker() 93 * 94 * 3. [app thread] DeleteTopics() enqueues the rko on librdkafka's main ops 95 * queue that is served by the rdkafka main thread in rd_kafka_thread_main() 96 * 97 * 4. [rdkafka main thread] The rko is dequeued by rd_kafka_q_serve and 98 * the rd_kafka_poll_cb() is called. 99 * 100 * 5. [rdkafka main thread] The rko_type switch case identifies the rko 101 * as an RD_KAFKA_OP_DELETETOPICS which is served by the op callback 102 * set in step 2. 103 * 104 * 6. [rdkafka main thread] The worker callback is called. 105 * After some initial checking of err==ERR__DESTROY events 106 * (which is used to clean up outstanding ops (etc) on termination), 107 * the code hits a state machine using rko_u.admin.request_state. 108 * 109 * 7. [rdkafka main thread] The initial state is RD_KAFKA_ADMIN_STATE_INIT 110 * where the worker validates the user input. 111 * An enqueue once (eonce) object is created - the use of this object 112 * allows having multiple outstanding async functions referencing the 113 * same underlying rko object, but only allowing the first one 114 * to trigger an event. 115 * A timeout timer is set up to trigger the eonce object when the 116 * full options.request_timeout has elapsed. 117 * 118 * 8. [rdkafka main thread] After initialization the state is updated 119 * to WAIT_BROKER or WAIT_CONTROLLER and the code falls through to 120 * looking up a specific broker or the controller broker and waiting for 121 * an active connection. 
*     Both the lookup and the waiting for an active connection are
 *     fully asynchronous, and the same eonce used for the timer is passed
 *     to the rd_kafka_broker_controller_async() or broker_async() functions
 *     which will trigger the eonce when a broker state change occurs.
 *     If the controller is already known (from metadata) and the connection
 *     is up, an rkb broker object is returned and the eonce is not used,
 *     skip to step 11.
 *
 *  9. [rdkafka main thread] Upon metadata retrieval (which is triggered
 *     automatically by other parts of the code) the controller_id may be
 *     updated in which case the eonce is triggered.
 *     The eonce triggering enqueues the original rko on the rdkafka main
 *     ops queue again and we go to step 8 which will check if the controller
 *     connection is up.
 *
 *  10. [broker thread] If the controller_id is now known we wait for
 *      the corresponding broker's connection to come up. This signaling
 *      is performed from the broker thread upon broker state changes
 *      and uses the same eonce. The eonce triggering enqueues the original
 *      rko on the rdkafka main ops queue again and we go back to step 8
 *      to check if the broker is now available.
 *
 *  11. [rdkafka main thread] Back in the worker callback we now have an
 *      rkb broker pointer (with reference count increased) for the controller
 *      with the connection up (it might go down while we're referencing it,
 *      but that does not stop us from enqueuing a protocol request).
 *
 *  12. [rdkafka main thread] A DeleteTopics protocol request buffer is
 *      constructed using the input parameters saved on the rko and the
 *      buffer is enqueued on the broker's transmit queue.
 *      The buffer is set up to provide the reply buffer on the rdkafka main
 *      ops queue (the same queue we are operating from) with a handler
 *      callback of rd_kafka_admin_handle_response().
*      The state is updated to the RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE.
 *
 *  13. [broker thread] If the request times out, a response with error code
 *      (ERR__TIMED_OUT) is enqueued. Go to 15.
 *
 *  14. [broker thread] If a response is received, the response buffer
 *      is enqueued. Go to 15.
 *
 *  15. [rdkafka main thread] The buffer callback (..handle_response())
 *      is called, which attempts to extract the original rko from the eonce,
 *      but if the eonce has already been triggered by some other source
 *      (the timeout timer) the buffer callback simply returns and does nothing
 *      since the admin request is over and a result (probably a timeout)
 *      has been enqueued for the application.
 *      If the rko was still intact we temporarily set the reply buffer
 *      in the rko struct and call the worker callback. Go to 16 if the
 *      response carries an error, else go to 17.
 *
 *  16. [rdkafka main thread] The worker callback is called in state
 *      RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE without a response but with an error.
 *      An error result op is created and enqueued on the application's
 *      provided response rkqu queue.
 *
 *  17. [rdkafka main thread] The worker callback is called in state
 *      RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE with a response buffer with no
 *      error set.
 *      The worker calls the response `parse()` callback to parse the response
 *      buffer and populates a result op (rko_result) with the response
 *      information (such as per-topic error codes, etc).
 *      The result op is returned to the worker.
 *
 *  18. [rdkafka main thread] The worker enqueues the result op (rko_result)
 *      on the application's provided response rkqu queue.
 *
 *  19. [app thread] The application calls rd_kafka_queue_poll() to
 *      receive the result of the operation. The result may have been
 *      enqueued in step 18 thanks to successful completion, or in any
 *      of the earlier stages when an error was encountered.
 *
 *  20.
[app thread] The application uses rd_kafka_event_DeleteTopics_result() 194 * to retrieve the request-specific result type. 195 * 196 * 21. Done. 197 * 198 * 199 * 200 * 201 * Fanout (RD_KAFKA_OP_ADMIN_FANOUT) requests 202 * ------------------------------------------ 203 * 204 * Certain Admin APIs may have requests that need to be sent to different 205 * brokers, for instance DeleteRecords which needs to be sent to the leader 206 * for each given partition. 207 * 208 * To achieve this we create a Fanout (RD_KAFKA_OP_ADMIN_FANOUT) op for the 209 * overall Admin API call (e.g., DeleteRecords), and then sub-ops for each 210 * of the per-broker requests. These sub-ops have the proper op type for 211 * the operation they are performing (e.g., RD_KAFKA_OP_DELETERECORDS) 212 * but their replyq does not point back to the application replyq but 213 * rk_ops which is handled by the librdkafka main thread and with the op 214 * callback set to rd_kafka_admin_fanout_worker(). This worker aggregates 215 * the results of each fanned out sub-op and merges the result into a 216 * single result op (RD_KAFKA_OP_ADMIN_RESULT) that is enqueued on the 217 * application's replyq. 218 * 219 * We rely on the timeouts on the fanned out sub-ops rather than the parent 220 * fanout op. 221 * 222 * The parent fanout op must not be destroyed until all fanned out sub-ops 223 * are done (either by success, failure or timeout) and destroyed, and this 224 * is tracked by the rko_u.admin_request.fanout.outstanding counter. 225 * 226 */ 227 228 229 /** 230 * @enum Admin request target broker. Must be negative values since the field 231 * used is broker_id. 
232 */ 233 enum { 234 RD_KAFKA_ADMIN_TARGET_CONTROLLER = -1, /**< Cluster controller */ 235 RD_KAFKA_ADMIN_TARGET_COORDINATOR = -2, /**< (Group) Coordinator */ 236 RD_KAFKA_ADMIN_TARGET_FANOUT = -3, /**< This rko is a fanout and 237 * and has no target broker */ 238 }; 239 240 /** 241 * @brief Admin op callback types 242 */ 243 typedef rd_kafka_resp_err_t (rd_kafka_admin_Request_cb_t) ( 244 rd_kafka_broker_t *rkb, 245 const rd_list_t *configs /*(ConfigResource_t*)*/, 246 rd_kafka_AdminOptions_t *options, 247 char *errstr, size_t errstr_size, 248 rd_kafka_replyq_t replyq, 249 rd_kafka_resp_cb_t *resp_cb, 250 void *opaque) 251 RD_WARN_UNUSED_RESULT; 252 253 typedef rd_kafka_resp_err_t (rd_kafka_admin_Response_parse_cb_t) ( 254 rd_kafka_op_t *rko_req, 255 rd_kafka_op_t **rko_resultp, 256 rd_kafka_buf_t *reply, 257 char *errstr, size_t errstr_size) 258 RD_WARN_UNUSED_RESULT; 259 260 typedef void (rd_kafka_admin_fanout_PartialResponse_cb_t) ( 261 rd_kafka_op_t *rko_req, 262 const rd_kafka_op_t *rko_partial); 263 264 typedef rd_list_copy_cb_t rd_kafka_admin_fanout_CopyResult_cb_t; 265 266 /** 267 * @struct Request-specific worker callbacks. 268 */ 269 struct rd_kafka_admin_worker_cbs { 270 /**< Protocol request callback which is called 271 * to construct and send the request. */ 272 rd_kafka_admin_Request_cb_t *request; 273 274 /**< Protocol response parser callback which is called 275 * to translate the response to a rko_result op. */ 276 rd_kafka_admin_Response_parse_cb_t *parse; 277 }; 278 279 /** 280 * @struct Fanout request callbacks. 281 */ 282 struct rd_kafka_admin_fanout_worker_cbs { 283 /** Merge results from a fanned out request into the user response. */ 284 rd_kafka_admin_fanout_PartialResponse_cb_t *partial_response; 285 286 /** Copy an accumulated result for storing into the rko_result. 
*/ 287 rd_kafka_admin_fanout_CopyResult_cb_t *copy_result; 288 }; 289 290 /* Forward declarations */ 291 static void rd_kafka_admin_common_worker_destroy (rd_kafka_t *rk, 292 rd_kafka_op_t *rko, 293 rd_bool_t do_destroy); 294 static void rd_kafka_AdminOptions_init (rd_kafka_t *rk, 295 rd_kafka_AdminOptions_t *options); 296 static rd_kafka_op_res_t 297 rd_kafka_admin_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko); 298 static rd_kafka_ConfigEntry_t * 299 rd_kafka_ConfigEntry_copy (const rd_kafka_ConfigEntry_t *src); 300 static void rd_kafka_ConfigEntry_free (void *ptr); 301 static void *rd_kafka_ConfigEntry_list_copy (const void *src, void *opaque); 302 303 static void rd_kafka_admin_handle_response (rd_kafka_t *rk, 304 rd_kafka_broker_t *rkb, 305 rd_kafka_resp_err_t err, 306 rd_kafka_buf_t *reply, 307 rd_kafka_buf_t *request, 308 void *opaque); 309 310 static rd_kafka_op_res_t 311 rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, 312 rd_kafka_op_t *rko_fanout); 313 314 315 /** 316 * @name Common admin request code 317 * @{ 318 * 319 * 320 */ 321 322 /** 323 * @brief Create a new admin_result op based on the request op \p rko_req 324 */ 325 static rd_kafka_op_t *rd_kafka_admin_result_new (rd_kafka_op_t *rko_req) { 326 rd_kafka_op_t *rko_result; 327 rd_kafka_op_t *rko_fanout; 328 329 if ((rko_fanout = rko_req->rko_u.admin_request.fanout_parent)) { 330 /* If this is a fanned out request the rko_result needs to be 331 * handled by the fanout worker rather than the application. 
*/ 332 rko_result = rd_kafka_op_new_cb( 333 rko_req->rko_rk, 334 RD_KAFKA_OP_ADMIN_RESULT, 335 rd_kafka_admin_fanout_worker); 336 /* Transfer fanout pointer to result */ 337 rko_result->rko_u.admin_result.fanout_parent = rko_fanout; 338 rko_req->rko_u.admin_request.fanout_parent = NULL; 339 /* Set event type based on original fanout ops reqtype, 340 * e.g., ..OP_DELETERECORDS */ 341 rko_result->rko_u.admin_result.reqtype = 342 rko_fanout->rko_u.admin_request.fanout.reqtype; 343 344 } else { 345 rko_result = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_RESULT); 346 347 /* If this is fanout request (i.e., the parent OP_ADMIN_FANOUT 348 * to fanned out requests) we need to use the original 349 * application request type. */ 350 if (rko_req->rko_type == RD_KAFKA_OP_ADMIN_FANOUT) 351 rko_result->rko_u.admin_result.reqtype = 352 rko_req->rko_u.admin_request.fanout.reqtype; 353 else 354 rko_result->rko_u.admin_result.reqtype = 355 rko_req->rko_type; 356 } 357 358 rko_result->rko_rk = rko_req->rko_rk; 359 360 rko_result->rko_u.admin_result.opaque = 361 rd_kafka_confval_get_ptr(&rko_req->rko_u.admin_request. 362 options.opaque); 363 364 rko_result->rko_evtype = rko_req->rko_u.admin_request.reply_event_type; 365 366 return rko_result; 367 } 368 369 370 /** 371 * @brief Set error code and error string on admin_result op \p rko. 
372 */ 373 static void rd_kafka_admin_result_set_err0 (rd_kafka_op_t *rko, 374 rd_kafka_resp_err_t err, 375 const char *fmt, va_list ap) { 376 char buf[512]; 377 378 rd_vsnprintf(buf, sizeof(buf), fmt, ap); 379 380 rko->rko_err = err; 381 382 if (rko->rko_u.admin_result.errstr) 383 rd_free(rko->rko_u.admin_result.errstr); 384 rko->rko_u.admin_result.errstr = rd_strdup(buf); 385 386 rd_kafka_dbg(rko->rko_rk, ADMIN, "ADMINFAIL", 387 "Admin %s result error: %s", 388 rd_kafka_op2str(rko->rko_u.admin_result.reqtype), 389 rko->rko_u.admin_result.errstr); 390 } 391 392 /** 393 * @sa rd_kafka_admin_result_set_err0 394 */ 395 static RD_UNUSED RD_FORMAT(printf, 3, 4) 396 void rd_kafka_admin_result_set_err (rd_kafka_op_t *rko, 397 rd_kafka_resp_err_t err, 398 const char *fmt, ...) { 399 va_list ap; 400 401 va_start(ap, fmt); 402 rd_kafka_admin_result_set_err0(rko, err, fmt, ap); 403 va_end(ap); 404 } 405 406 /** 407 * @brief Enqueue admin_result on application's queue. 408 */ 409 static RD_INLINE 410 void rd_kafka_admin_result_enq (rd_kafka_op_t *rko_req, 411 rd_kafka_op_t *rko_result) { 412 rd_kafka_replyq_enq(&rko_req->rko_u.admin_request.replyq, 413 rko_result, 414 rko_req->rko_u.admin_request.replyq.version); 415 } 416 417 /** 418 * @brief Set request-level error code and string in reply op. 419 * 420 * @remark This function will NOT destroy the \p rko_req, so don't forget to 421 * call rd_kafka_admin_common_worker_destroy() when done with the rko. 422 */ 423 static RD_FORMAT(printf, 3, 4) 424 void rd_kafka_admin_result_fail (rd_kafka_op_t *rko_req, 425 rd_kafka_resp_err_t err, 426 const char *fmt, ...) 
{ 427 va_list ap; 428 rd_kafka_op_t *rko_result; 429 430 if (!rko_req->rko_u.admin_request.replyq.q) 431 return; 432 433 rko_result = rd_kafka_admin_result_new(rko_req); 434 435 va_start(ap, fmt); 436 rd_kafka_admin_result_set_err0(rko_result, err, fmt, ap); 437 va_end(ap); 438 439 rd_kafka_admin_result_enq(rko_req, rko_result); 440 } 441 442 443 /** 444 * @brief Send the admin request contained in \p rko upon receiving 445 * a FindCoordinator response. 446 * 447 * @param opaque Must be an admin request op's eonce (rko_u.admin_request.eonce) 448 * (i.e. created by \c rd_kafka_admin_request_op_new ) 449 * 450 * @remark To be used as a callback for \c rd_kafka_coord_req 451 */ 452 static rd_kafka_resp_err_t 453 rd_kafka_admin_coord_request (rd_kafka_broker_t *rkb, 454 rd_kafka_op_t *rko_ignore, 455 rd_kafka_replyq_t replyq, 456 rd_kafka_resp_cb_t *resp_cb, 457 void *opaque) { 458 rd_kafka_t *rk = rkb->rkb_rk; 459 rd_kafka_enq_once_t *eonce = opaque; 460 rd_kafka_op_t *rko; 461 char errstr[512]; 462 rd_kafka_resp_err_t err; 463 464 465 rko = rd_kafka_enq_once_del_source_return(eonce, "coordinator request"); 466 if (!rko) 467 /* Admin request has timed out and been destroyed */ 468 return RD_KAFKA_RESP_ERR__DESTROY; 469 470 rd_kafka_enq_once_add_source(eonce, "coordinator response"); 471 472 err = rko->rko_u.admin_request.cbs->request( 473 rkb, 474 &rko->rko_u.admin_request.args, 475 &rko->rko_u.admin_request.options, 476 errstr, sizeof(errstr), 477 replyq, 478 rd_kafka_admin_handle_response, 479 eonce); 480 if (err) { 481 rd_kafka_enq_once_del_source(eonce, "coordinator response"); 482 rd_kafka_admin_result_fail( 483 rko, err, 484 "%s worker failed to send request: %s", 485 rd_kafka_op2str(rko->rko_type), errstr); 486 rd_kafka_admin_common_worker_destroy(rk, rko, 487 rd_true/*destroy*/); 488 } 489 return err; 490 } 491 492 493 /** 494 * @brief Return the topics list from a topic-related result object. 
495 */ 496 static const rd_kafka_topic_result_t ** 497 rd_kafka_admin_result_ret_topics (const rd_kafka_op_t *rko, 498 size_t *cntp) { 499 rd_kafka_op_type_t reqtype = 500 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; 501 rd_assert(reqtype == RD_KAFKA_OP_CREATETOPICS || 502 reqtype == RD_KAFKA_OP_DELETETOPICS || 503 reqtype == RD_KAFKA_OP_CREATEPARTITIONS); 504 505 *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); 506 return (const rd_kafka_topic_result_t **)rko->rko_u.admin_result. 507 results.rl_elems; 508 } 509 510 /** 511 * @brief Return the ConfigResource list from a config-related result object. 512 */ 513 static const rd_kafka_ConfigResource_t ** 514 rd_kafka_admin_result_ret_resources (const rd_kafka_op_t *rko, 515 size_t *cntp) { 516 rd_kafka_op_type_t reqtype = 517 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; 518 rd_assert(reqtype == RD_KAFKA_OP_ALTERCONFIGS || 519 reqtype == RD_KAFKA_OP_DESCRIBECONFIGS); 520 521 *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); 522 return (const rd_kafka_ConfigResource_t **)rko->rko_u.admin_result. 523 results.rl_elems; 524 } 525 526 527 /** 528 * @brief Return the groups list from a group-related result object. 529 */ 530 static const rd_kafka_group_result_t ** 531 rd_kafka_admin_result_ret_groups (const rd_kafka_op_t *rko, 532 size_t *cntp) { 533 rd_kafka_op_type_t reqtype = 534 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; 535 rd_assert(reqtype == RD_KAFKA_OP_DELETEGROUPS || 536 reqtype == RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS); 537 538 *cntp = rd_list_cnt(&rko->rko_u.admin_result.results); 539 return (const rd_kafka_group_result_t **)rko->rko_u.admin_result. 540 results.rl_elems; 541 } 542 543 /** 544 * @brief Create a new admin_request op of type \p optype and sets up the 545 * generic (type independent files). 546 * 547 * The caller shall then populate the admin_request.args list 548 * and enqueue the op on rk_ops for further processing work. 
549 * 550 * @param cbs Callbacks, must reside in .data segment. 551 * @param options Optional options, may be NULL to use defaults. 552 * 553 * @locks none 554 * @locality application thread 555 */ 556 static rd_kafka_op_t * 557 rd_kafka_admin_request_op_new (rd_kafka_t *rk, 558 rd_kafka_op_type_t optype, 559 rd_kafka_event_type_t reply_event_type, 560 const struct rd_kafka_admin_worker_cbs *cbs, 561 const rd_kafka_AdminOptions_t *options, 562 rd_kafka_q_t *rkq) { 563 rd_kafka_op_t *rko; 564 565 rd_assert(rk); 566 rd_assert(rkq); 567 rd_assert(cbs); 568 569 rko = rd_kafka_op_new_cb(rk, optype, rd_kafka_admin_worker); 570 571 rko->rko_u.admin_request.reply_event_type = reply_event_type; 572 573 rko->rko_u.admin_request.cbs = (struct rd_kafka_admin_worker_cbs *)cbs; 574 575 /* Make a copy of the options */ 576 if (options) 577 rko->rko_u.admin_request.options = *options; 578 else 579 rd_kafka_AdminOptions_init(rk, 580 &rko->rko_u.admin_request.options); 581 582 /* Default to controller */ 583 rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; 584 585 /* Calculate absolute timeout */ 586 rko->rko_u.admin_request.abs_timeout = 587 rd_timeout_init( 588 rd_kafka_confval_get_int(&rko->rko_u.admin_request. 589 options.request_timeout)); 590 591 /* Setup enq-op-once, which is triggered by either timer code 592 * or future wait-controller code. */ 593 rko->rko_u.admin_request.eonce = 594 rd_kafka_enq_once_new(rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0)); 595 596 /* The timer itself must be started from the rdkafka main thread, 597 * not here. */ 598 599 /* Set up replyq */ 600 rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0); 601 602 rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_INIT; 603 return rko; 604 } 605 606 607 /** 608 * @returns the remaining request timeout in milliseconds. 
609 */ 610 static RD_INLINE int rd_kafka_admin_timeout_remains (rd_kafka_op_t *rko) { 611 return rd_timeout_remains(rko->rko_u.admin_request.abs_timeout); 612 } 613 614 /** 615 * @returns the remaining request timeout in microseconds. 616 */ 617 static RD_INLINE rd_ts_t 618 rd_kafka_admin_timeout_remains_us (rd_kafka_op_t *rko) { 619 return rd_timeout_remains_us(rko->rko_u.admin_request.abs_timeout); 620 } 621 622 623 /** 624 * @brief Timer timeout callback for the admin rko's eonce object. 625 */ 626 static void rd_kafka_admin_eonce_timeout_cb (rd_kafka_timers_t *rkts, 627 void *arg) { 628 rd_kafka_enq_once_t *eonce = arg; 629 630 rd_kafka_enq_once_trigger(eonce, RD_KAFKA_RESP_ERR__TIMED_OUT, 631 "timeout timer"); 632 } 633 634 635 636 /** 637 * @brief Common worker destroy to be called in destroy: label 638 * in worker. 639 */ 640 static void rd_kafka_admin_common_worker_destroy (rd_kafka_t *rk, 641 rd_kafka_op_t *rko, 642 rd_bool_t do_destroy) { 643 int timer_was_stopped; 644 645 /* Free resources for this op. */ 646 timer_was_stopped = 647 rd_kafka_timer_stop(&rk->rk_timers, 648 &rko->rko_u.admin_request.tmr, rd_true); 649 650 651 if (rko->rko_u.admin_request.eonce) { 652 /* Remove the stopped timer's eonce reference since its 653 * callback will not have fired if we stopped the timer. */ 654 if (timer_was_stopped) 655 rd_kafka_enq_once_del_source(rko->rko_u.admin_request. 656 eonce, "timeout timer"); 657 658 /* This is thread-safe to do even if there are outstanding 659 * timers or wait-controller references to the eonce 660 * since they only hold direct reference to the eonce, 661 * not the rko (the eonce holds a reference to the rko but 662 * it is cleared here). */ 663 rd_kafka_enq_once_destroy(rko->rko_u.admin_request.eonce); 664 rko->rko_u.admin_request.eonce = NULL; 665 } 666 667 if (do_destroy) 668 rd_kafka_op_destroy(rko); 669 } 670 671 672 673 /** 674 * @brief Asynchronously look up a broker. 
675 * To be called repeatedly from each invocation of the worker 676 * when in state RD_KAFKA_ADMIN_STATE_WAIT_BROKER until 677 * a valid rkb is returned. 678 * 679 * @returns the broker rkb with refcount increased, or NULL if not yet 680 * available. 681 */ 682 static rd_kafka_broker_t * 683 rd_kafka_admin_common_get_broker (rd_kafka_t *rk, 684 rd_kafka_op_t *rko, 685 int32_t broker_id) { 686 rd_kafka_broker_t *rkb; 687 688 rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up broker %"PRId32, 689 rd_kafka_op2str(rko->rko_type), broker_id); 690 691 /* Since we're iterating over this broker_async() call 692 * (asynchronously) until a broker is availabe (or timeout) 693 * we need to re-enable the eonce to be triggered again (which 694 * is not necessary the first time we get here, but there 695 * is no harm doing it then either). */ 696 rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce, 697 rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0)); 698 699 /* Look up the broker asynchronously, if the broker 700 * is not available the eonce is registered for broker 701 * state changes which will cause our function to be called 702 * again as soon as (any) broker state changes. 703 * When we are called again we perform the broker lookup 704 * again and hopefully get an rkb back, otherwise defer a new 705 * async wait. Repeat until success or timeout. */ 706 if (!(rkb = rd_kafka_broker_get_async( 707 rk, broker_id, RD_KAFKA_BROKER_STATE_UP, 708 rko->rko_u.admin_request.eonce))) { 709 /* Broker not available, wait asynchronously 710 * for broker metadata code to trigger eonce. */ 711 return NULL; 712 } 713 714 rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: broker %"PRId32" is %s", 715 rd_kafka_op2str(rko->rko_type), broker_id, rkb->rkb_name); 716 717 return rkb; 718 } 719 720 721 /** 722 * @brief Asynchronously look up the controller. 723 * To be called repeatedly from each invocation of the worker 724 * when in state RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER until 725 * a valid rkb is returned. 
726 * 727 * @returns the controller rkb with refcount increased, or NULL if not yet 728 * available. 729 */ 730 static rd_kafka_broker_t * 731 rd_kafka_admin_common_get_controller (rd_kafka_t *rk, 732 rd_kafka_op_t *rko) { 733 rd_kafka_broker_t *rkb; 734 735 rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: looking up controller", 736 rd_kafka_op2str(rko->rko_type)); 737 738 /* Since we're iterating over this controller_async() call 739 * (asynchronously) until a controller is availabe (or timeout) 740 * we need to re-enable the eonce to be triggered again (which 741 * is not necessary the first time we get here, but there 742 * is no harm doing it then either). */ 743 rd_kafka_enq_once_reenable(rko->rko_u.admin_request.eonce, 744 rko, RD_KAFKA_REPLYQ(rk->rk_ops, 0)); 745 746 /* Look up the controller asynchronously, if the controller 747 * is not available the eonce is registered for broker 748 * state changes which will cause our function to be called 749 * again as soon as (any) broker state changes. 750 * When we are called again we perform the controller lookup 751 * again and hopefully get an rkb back, otherwise defer a new 752 * async wait. Repeat until success or timeout. */ 753 if (!(rkb = rd_kafka_broker_controller_async( 754 rk, RD_KAFKA_BROKER_STATE_UP, 755 rko->rko_u.admin_request.eonce))) { 756 /* Controller not available, wait asynchronously 757 * for controller code to trigger eonce. */ 758 return NULL; 759 } 760 761 rd_kafka_dbg(rk, ADMIN, "ADMIN", "%s: controller %s", 762 rd_kafka_op2str(rko->rko_type), rkb->rkb_name); 763 764 return rkb; 765 } 766 767 768 769 /** 770 * @brief Handle response from broker by triggering worker callback. 771 * 772 * @param opaque is the eonce from the worker protocol request call. 
773 */ 774 static void rd_kafka_admin_handle_response (rd_kafka_t *rk, 775 rd_kafka_broker_t *rkb, 776 rd_kafka_resp_err_t err, 777 rd_kafka_buf_t *reply, 778 rd_kafka_buf_t *request, 779 void *opaque) { 780 rd_kafka_enq_once_t *eonce = opaque; 781 rd_kafka_op_t *rko; 782 783 /* From ...add_source("send") */ 784 rko = rd_kafka_enq_once_disable(eonce); 785 786 if (!rko) { 787 /* The operation timed out and the worker was 788 * dismantled while we were waiting for broker response, 789 * do nothing - everything has been cleaned up. */ 790 rd_kafka_dbg(rk, ADMIN, "ADMIN", 791 "Dropping outdated %sResponse with return code %s", 792 request ? 793 rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey): 794 "???", 795 rd_kafka_err2str(err)); 796 return; 797 } 798 799 /* Attach reply buffer to rko for parsing in the worker. */ 800 rd_assert(!rko->rko_u.admin_request.reply_buf); 801 rko->rko_u.admin_request.reply_buf = reply; 802 rko->rko_err = err; 803 804 if (rko->rko_op_cb(rk, NULL, rko) == RD_KAFKA_OP_RES_HANDLED) 805 rd_kafka_op_destroy(rko); 806 807 } 808 809 /** 810 * @brief Generic handler for protocol responses, calls the admin ops' 811 * Response_parse_cb and enqueues the result to the caller's queue. 812 */ 813 static void rd_kafka_admin_response_parse (rd_kafka_op_t *rko) { 814 rd_kafka_resp_err_t err; 815 rd_kafka_op_t *rko_result = NULL; 816 char errstr[512]; 817 818 if (rko->rko_err) { 819 rd_kafka_admin_result_fail( 820 rko, rko->rko_err, 821 "%s worker request failed: %s", 822 rd_kafka_op2str(rko->rko_type), 823 rd_kafka_err2str(rko->rko_err)); 824 return; 825 } 826 827 /* Response received. 828 * Let callback parse response and provide result in rko_result 829 * which is then enqueued on the reply queue. 
*/ 830 err = rko->rko_u.admin_request.cbs->parse( 831 rko, &rko_result, 832 rko->rko_u.admin_request.reply_buf, 833 errstr, sizeof(errstr)); 834 if (err) { 835 rd_kafka_admin_result_fail( 836 rko, err, 837 "%s worker failed to parse response: %s", 838 rd_kafka_op2str(rko->rko_type), errstr); 839 return; 840 } 841 842 rd_assert(rko_result); 843 844 /* Enqueue result on application queue, we're done. */ 845 rd_kafka_admin_result_enq(rko, rko_result); 846 } 847 848 /** 849 * @brief Generic handler for coord_req() responses. 850 */ 851 static void 852 rd_kafka_admin_coord_response_parse (rd_kafka_t *rk, 853 rd_kafka_broker_t *rkb, 854 rd_kafka_resp_err_t err, 855 rd_kafka_buf_t *rkbuf, 856 rd_kafka_buf_t *request, 857 void *opaque) { 858 rd_kafka_op_t *rko_result; 859 rd_kafka_enq_once_t *eonce = opaque; 860 rd_kafka_op_t *rko; 861 char errstr[512]; 862 863 rko = rd_kafka_enq_once_del_source_return(eonce, 864 "coordinator response"); 865 if (!rko) 866 /* Admin request has timed out and been destroyed */ 867 return; 868 869 if (err) { 870 rd_kafka_admin_result_fail( 871 rko, err, 872 "%s worker coordinator request failed: %s", 873 rd_kafka_op2str(rko->rko_type), 874 rd_kafka_err2str(err)); 875 rd_kafka_admin_common_worker_destroy(rk, rko, 876 rd_true/*destroy*/); 877 return; 878 } 879 880 err = rko->rko_u.admin_request.cbs->parse( 881 rko, &rko_result, rkbuf, 882 errstr, sizeof(errstr)); 883 if (err) { 884 rd_kafka_admin_result_fail( 885 rko, err, 886 "%s worker failed to parse coordinator %sResponse: %s", 887 rd_kafka_op2str(rko->rko_type), 888 rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), 889 errstr); 890 rd_kafka_admin_common_worker_destroy(rk, rko, 891 rd_true/*destroy*/); 892 return; 893 } 894 895 rd_assert(rko_result); 896 897 /* Enqueue result on application queue, we're done. */ 898 rd_kafka_admin_result_enq(rko, rko_result); 899 } 900 901 902 903 /** 904 * @brief Common worker state machine handling regardless of request type. 
 *
 * Tasks:
 *  - Sets up timeout on first call.
 *  - Checks for timeout.
 *  - Checks for and fails on errors.
 *  - Async Controller and broker lookups
 *  - Calls the Request callback
 *  - Calls the parse callback
 *  - Result reply
 *  - Destruction of rko
 *
 * rko->rko_err may be one of:
 * RD_KAFKA_RESP_ERR_NO_ERROR, or
 * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup, or
 * RD_KAFKA_RESP_ERR__TIMED_OUT if request has timed out,
 * or any other error code triggered by other parts of the code.
 *
 * @param rk  Client instance.
 * @param rkq Not referenced by this function.
 * @param rko The admin request op carrying the state machine state.
 *
 * @returns a hint to the op code whether the rko should be destroyed or not:
 *          RD_KAFKA_OP_RES_KEEP while waiting for an asynchronous event,
 *          RD_KAFKA_OP_RES_HANDLED once the request has finished or failed.
 */
static rd_kafka_op_res_t
rd_kafka_admin_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
        const char *name = rd_kafka_op2str(rko->rko_type);
        rd_ts_t timeout_in;
        rd_kafka_broker_t *rkb = NULL;
        rd_kafka_resp_err_t err;
        char errstr[512];

        /* ADMIN_FANOUT handled by fanout_worker() */
        rd_assert((rko->rko_type & ~ RD_KAFKA_OP_FLAGMASK) !=
                  RD_KAFKA_OP_ADMIN_FANOUT);

        /* Client termination takes precedence over everything else. */
        if (rd_kafka_terminating(rk)) {
                rd_kafka_dbg(rk, ADMIN, name,
                             "%s worker called in state %s: "
                             "handle is terminating: %s",
                             name,
                             rd_kafka_admin_state_desc[rko->rko_u.
                                                       admin_request.state],
                             rd_kafka_err2str(rko->rko_err));
                rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY,
                                           "Handle is terminating: %s",
                                           rd_kafka_err2str(rko->rko_err));
                goto destroy;
        }

        if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) {
                rd_kafka_admin_result_fail(rko, RD_KAFKA_RESP_ERR__DESTROY,
                                           "Destroyed");
                goto destroy; /* rko being destroyed (silent) */
        }

        rd_kafka_dbg(rk, ADMIN, name,
                     "%s worker called in state %s: %s",
                     name,
                     rd_kafka_admin_state_desc[rko->rko_u.admin_request.state],
                     rd_kafka_err2str(rko->rko_err));

        /* The worker must only run on the client's main thread. */
        rd_assert(thrd_is_current(rko->rko_rk->rk_thread));

        /* Check for errors raised asynchronously (e.g., by timer) */
        if (rko->rko_err) {
                rd_kafka_admin_result_fail(
                        rko, rko->rko_err,
                        "Failed while %s: %s",
                        rd_kafka_admin_state_desc[rko->rko_u.
                                                  admin_request.state],
                        rd_kafka_err2str(rko->rko_err));
                goto destroy;
        }

        /* Check for timeout */
        timeout_in = rd_kafka_admin_timeout_remains_us(rko);
        if (timeout_in <= 0) {
                rd_kafka_admin_result_fail(
                        rko, RD_KAFKA_RESP_ERR__TIMED_OUT,
                        "Timed out %s",
                        rd_kafka_admin_state_desc[rko->rko_u.
                                                  admin_request.state]);
                goto destroy;
        }

        /* State transitions that can be acted on immediately jump back
         * here instead of waiting for the op to be enqueued again. */
 redo:
        switch (rko->rko_u.admin_request.state)
        {
        case RD_KAFKA_ADMIN_STATE_INIT:
        {
                int32_t broker_id;

                /* First call. */

                /* Set up timeout timer. */
                rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce,
                                             "timeout timer");
                rd_kafka_timer_start_oneshot(&rk->rk_timers,
                                             &rko->rko_u.admin_request.tmr,
                                             rd_true, timeout_in,
                                             rd_kafka_admin_eonce_timeout_cb,
                                             rko->rko_u.admin_request.eonce);

                /* Use explicitly specified broker_id, if available. */
                broker_id = (int32_t)rd_kafka_confval_get_int(
                        &rko->rko_u.admin_request.options.broker);

                if (broker_id != -1) {
                        rd_kafka_dbg(rk, ADMIN, name,
                                     "%s using explicitly "
                                     "set broker id %"PRId32
                                     " rather than %"PRId32,
                                     name, broker_id,
                                     rko->rko_u.admin_request.broker_id);
                        rko->rko_u.admin_request.broker_id = broker_id;
                } else {
                        /* Default to controller.
                         * NOTE(review): only the local variable is
                         * assigned here and it is not read again; the
                         * switch below uses the broker_id already stored
                         * in the rko. */
                        broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER;
                }

                /* Resolve target broker(s) */
                switch (rko->rko_u.admin_request.broker_id)
                {
                case RD_KAFKA_ADMIN_TARGET_CONTROLLER:
                        /* Controller */
                        rko->rko_u.admin_request.state =
                                RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER;
                        goto redo;  /* Trigger next state immediately */

                case RD_KAFKA_ADMIN_TARGET_COORDINATOR:
                        /* Group (or other) coordinator */
                        rko->rko_u.admin_request.state =
                                RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE;
                        rd_kafka_enq_once_add_source(rko->rko_u.admin_request.
                                                     eonce,
                                                     "coordinator request");
                        rd_kafka_coord_req(rk,
                                           rko->rko_u.admin_request.coordtype,
                                           rko->rko_u.admin_request.coordkey,
                                           rd_kafka_admin_coord_request,
                                           NULL,
                                           rd_kafka_admin_timeout_remains(rko),
                                           RD_KAFKA_REPLYQ(rk->rk_ops, 0),
                                           rd_kafka_admin_coord_response_parse,
                                           rko->rko_u.admin_request.eonce);
                        /* Wait asynchronously for broker response, which will
                         * trigger the eonce and worker to be called again. */
                        return RD_KAFKA_OP_RES_KEEP;

                case RD_KAFKA_ADMIN_TARGET_FANOUT:
                        /* Shouldn't come here, fanouts are handled by
                         * fanout_worker() */
                        RD_NOTREACHED();
                        return RD_KAFKA_OP_RES_KEEP;

                default:
                        /* Specific broker */
                        rd_assert(rko->rko_u.admin_request.broker_id >= 0);
                        rko->rko_u.admin_request.state =
                                RD_KAFKA_ADMIN_STATE_WAIT_BROKER;
                        goto redo;  /* Trigger next state immediately */
                }
        }


        case RD_KAFKA_ADMIN_STATE_WAIT_BROKER:
                /* Broker lookup */
                if (!(rkb = rd_kafka_admin_common_get_broker(
                              rk, rko, rko->rko_u.admin_request.broker_id))) {
                        /* Still waiting for broker to become available */
                        return RD_KAFKA_OP_RES_KEEP;
                }

                rko->rko_u.admin_request.state =
                        RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST;
                goto redo;

        case RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER:
                /* Controller lookup */
                if (!(rkb = rd_kafka_admin_common_get_controller(rk, rko))) {
                        /* Still waiting for controller to become available. */
                        return RD_KAFKA_OP_RES_KEEP;
                }

                rko->rko_u.admin_request.state =
                        RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST;
                goto redo;

        case RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS:
                /* This state is only used by ADMIN_FANOUT which has
                 * its own fanout_worker() */
                RD_NOTREACHED();
                break;

        case RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST:
                /* Got broker, send protocol request. */

                /* Make sure we're called from a 'goto redo' where
                 * the rkb was set. */
                rd_assert(rkb);

                /* Still need to use the eonce since this worker may
                 * time out while waiting for response from broker, in which
                 * case the broker response will hit an empty eonce (ok). */
                rd_kafka_enq_once_add_source(rko->rko_u.admin_request.eonce,
                                             "send");

                /* Send request (async) */
                err = rko->rko_u.admin_request.cbs->request(
                        rkb,
                        &rko->rko_u.admin_request.args,
                        &rko->rko_u.admin_request.options,
                        errstr, sizeof(errstr),
                        RD_KAFKA_REPLYQ(rk->rk_ops, 0),
                        rd_kafka_admin_handle_response,
                        rko->rko_u.admin_request.eonce);

                /* Lose broker refcount from get_broker(), get_controller() */
                rd_kafka_broker_destroy(rkb);

                if (err) {
                        /* Request was never sent: undo the "send" source
                         * added above before failing the request. */
                        rd_kafka_enq_once_del_source(
                                rko->rko_u.admin_request.eonce, "send");
                        rd_kafka_admin_result_fail(rko, err, "%s", errstr);
                        goto destroy;
                }

                rko->rko_u.admin_request.state =
                        RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE;

                /* Wait asynchronously for broker response, which will
                 * trigger the eonce and worker to be called again. */
                return RD_KAFKA_OP_RES_KEEP;


        case RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE:
                /* Response (or error) has arrived: parse it and
                 * enqueue the result, then tear the worker down. */
                rd_kafka_admin_response_parse(rko);
                goto destroy;
        }

        return RD_KAFKA_OP_RES_KEEP;

 destroy:
        rd_kafka_admin_common_worker_destroy(rk, rko,
                                             rd_false/*don't destroy*/);
        return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy() */

}


/**
 * @brief Create a new admin_fanout op of type \p req_type and sets up the
 *        generic (type independent files).
 *
 * The caller shall then populate the \c admin_fanout.requests list,
 * initialize the \c admin_fanout.responses list,
 * set the initial \c admin_fanout.outstanding value,
 * and enqueue the op on rk_ops for further processing work.
 *
 * @param cbs Callbacks, must reside in .data segment.
 * @param options Optional options, may be NULL to use defaults.
 * @param rkq is the application reply queue.
1162 * 1163 * @locks none 1164 * @locality application thread 1165 */ 1166 static rd_kafka_op_t * 1167 rd_kafka_admin_fanout_op_new (rd_kafka_t *rk, 1168 rd_kafka_op_type_t req_type, 1169 rd_kafka_event_type_t reply_event_type, 1170 const struct rd_kafka_admin_fanout_worker_cbs 1171 *cbs, 1172 const rd_kafka_AdminOptions_t *options, 1173 rd_kafka_q_t *rkq) { 1174 rd_kafka_op_t *rko; 1175 1176 rd_assert(rk); 1177 rd_assert(rkq); 1178 rd_assert(cbs); 1179 1180 rko = rd_kafka_op_new(RD_KAFKA_OP_ADMIN_FANOUT); 1181 rko->rko_rk = rk; 1182 1183 rko->rko_u.admin_request.reply_event_type = reply_event_type; 1184 1185 rko->rko_u.admin_request.fanout.cbs = 1186 (struct rd_kafka_admin_fanout_worker_cbs *)cbs; 1187 1188 /* Make a copy of the options */ 1189 if (options) 1190 rko->rko_u.admin_request.options = *options; 1191 else 1192 rd_kafka_AdminOptions_init(rk, 1193 &rko->rko_u.admin_request.options); 1194 1195 rko->rko_u.admin_request.broker_id = RD_KAFKA_ADMIN_TARGET_FANOUT; 1196 1197 /* Calculate absolute timeout */ 1198 rko->rko_u.admin_request.abs_timeout = 1199 rd_timeout_init( 1200 rd_kafka_confval_get_int(&rko->rko_u.admin_request. 1201 options.request_timeout)); 1202 1203 /* Set up replyq */ 1204 rd_kafka_set_replyq(&rko->rko_u.admin_request.replyq, rkq, 0); 1205 1206 rko->rko_u.admin_request.state = RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS; 1207 1208 rko->rko_u.admin_request.fanout.reqtype = req_type; 1209 1210 return rko; 1211 } 1212 1213 1214 /** 1215 * @brief Common fanout worker state machine handling regardless of request type 1216 * 1217 * @param rko Result of a fanned out operation, e.g., DELETERECORDS result. 
1218 * 1219 * Tasks: 1220 * - Checks for and responds to client termination 1221 * - Polls for fanned out responses 1222 * - Calls the partial response callback 1223 * - Calls the merge responses callback upon receipt of all partial responses 1224 * - Destruction of rko 1225 * 1226 * rko->rko_err may be one of: 1227 * RD_KAFKA_RESP_ERR_NO_ERROR, or 1228 * RD_KAFKA_RESP_ERR__DESTROY for queue destruction cleanup. 1229 * 1230 * @returns a hint to the op code whether the rko should be destroyed or not. 1231 */ 1232 static rd_kafka_op_res_t 1233 rd_kafka_admin_fanout_worker (rd_kafka_t *rk, rd_kafka_q_t *rkq, 1234 rd_kafka_op_t *rko) { 1235 rd_kafka_op_t *rko_fanout = rko->rko_u.admin_result.fanout_parent; 1236 const char *name = rd_kafka_op2str(rko_fanout->rko_u.admin_request. 1237 fanout.reqtype); 1238 rd_kafka_op_t *rko_result; 1239 1240 RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_ADMIN_RESULT); 1241 RD_KAFKA_OP_TYPE_ASSERT(rko_fanout, RD_KAFKA_OP_ADMIN_FANOUT); 1242 1243 rd_assert(rko_fanout->rko_u.admin_request.fanout.outstanding > 0); 1244 rko_fanout->rko_u.admin_request.fanout.outstanding--; 1245 1246 rko->rko_u.admin_result.fanout_parent = NULL; 1247 1248 if (rd_kafka_terminating(rk)) { 1249 rd_kafka_dbg(rk, ADMIN, name, 1250 "%s fanout worker called for fanned out op %s: " 1251 "handle is terminating: %s", 1252 name, 1253 rd_kafka_op2str(rko->rko_type), 1254 rd_kafka_err2str(rko_fanout->rko_err)); 1255 if (!rko->rko_err) 1256 rko->rko_err = RD_KAFKA_RESP_ERR__DESTROY; 1257 } 1258 1259 rd_kafka_dbg(rk, ADMIN, name, 1260 "%s fanout worker called for %s with %d request(s) " 1261 "outstanding: %s", 1262 name, 1263 rd_kafka_op2str(rko->rko_type), 1264 rko_fanout->rko_u.admin_request.fanout.outstanding, 1265 rd_kafka_err2str(rko_fanout->rko_err)); 1266 1267 /* Add partial response to rko_fanout's result list. */ 1268 rko_fanout->rko_u.admin_request. 
1269 fanout.cbs->partial_response(rko_fanout, rko); 1270 1271 if (rko_fanout->rko_u.admin_request.fanout.outstanding > 0) 1272 /* Wait for outstanding requests to finish */ 1273 return RD_KAFKA_OP_RES_HANDLED; 1274 1275 rko_result = rd_kafka_admin_result_new(rko_fanout); 1276 rd_list_init_copy(&rko_result->rko_u.admin_result.results, 1277 &rko_fanout->rko_u.admin_request.fanout.results); 1278 rd_list_copy_to(&rko_result->rko_u.admin_result.results, 1279 &rko_fanout->rko_u.admin_request.fanout.results, 1280 rko_fanout->rko_u.admin_request. 1281 fanout.cbs->copy_result, NULL); 1282 1283 /* Enqueue result on application queue, we're done. */ 1284 rd_kafka_replyq_enq(&rko_fanout->rko_u.admin_request.replyq, rko_result, 1285 rko_fanout->rko_u.admin_request.replyq.version); 1286 1287 /* FALLTHRU */ 1288 if (rko_fanout->rko_u.admin_request.fanout.outstanding == 0) 1289 rd_kafka_op_destroy(rko_fanout); 1290 1291 return RD_KAFKA_OP_RES_HANDLED; /* trigger's op_destroy(rko) */ 1292 } 1293 1294 /**@}*/ 1295 1296 1297 /** 1298 * @name Generic AdminOptions 1299 * @{ 1300 * 1301 * 1302 */ 1303 1304 rd_kafka_resp_err_t 1305 rd_kafka_AdminOptions_set_request_timeout (rd_kafka_AdminOptions_t *options, 1306 int timeout_ms, 1307 char *errstr, size_t errstr_size) { 1308 return rd_kafka_confval_set_type(&options->request_timeout, 1309 RD_KAFKA_CONFVAL_INT, &timeout_ms, 1310 errstr, errstr_size); 1311 } 1312 1313 1314 rd_kafka_resp_err_t 1315 rd_kafka_AdminOptions_set_operation_timeout (rd_kafka_AdminOptions_t *options, 1316 int timeout_ms, 1317 char *errstr, size_t errstr_size) { 1318 return rd_kafka_confval_set_type(&options->operation_timeout, 1319 RD_KAFKA_CONFVAL_INT, &timeout_ms, 1320 errstr, errstr_size); 1321 } 1322 1323 1324 rd_kafka_resp_err_t 1325 rd_kafka_AdminOptions_set_validate_only (rd_kafka_AdminOptions_t *options, 1326 int true_or_false, 1327 char *errstr, size_t errstr_size) { 1328 return rd_kafka_confval_set_type(&options->validate_only, 1329 RD_KAFKA_CONFVAL_INT, 
&true_or_false, 1330 errstr, errstr_size); 1331 } 1332 1333 rd_kafka_resp_err_t 1334 rd_kafka_AdminOptions_set_incremental (rd_kafka_AdminOptions_t *options, 1335 int true_or_false, 1336 char *errstr, size_t errstr_size) { 1337 rd_snprintf(errstr, errstr_size, 1338 "Incremental updates currently not supported, see KIP-248"); 1339 return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; 1340 1341 return rd_kafka_confval_set_type(&options->incremental, 1342 RD_KAFKA_CONFVAL_INT, &true_or_false, 1343 errstr, errstr_size); 1344 } 1345 1346 rd_kafka_resp_err_t 1347 rd_kafka_AdminOptions_set_broker (rd_kafka_AdminOptions_t *options, 1348 int32_t broker_id, 1349 char *errstr, size_t errstr_size) { 1350 int ibroker_id = (int)broker_id; 1351 1352 return rd_kafka_confval_set_type(&options->broker, 1353 RD_KAFKA_CONFVAL_INT, 1354 &ibroker_id, 1355 errstr, errstr_size); 1356 } 1357 1358 void 1359 rd_kafka_AdminOptions_set_opaque (rd_kafka_AdminOptions_t *options, 1360 void *opaque) { 1361 rd_kafka_confval_set_type(&options->opaque, 1362 RD_KAFKA_CONFVAL_PTR, opaque, NULL, 0); 1363 } 1364 1365 1366 /** 1367 * @brief Initialize and set up defaults for AdminOptions 1368 */ 1369 static void rd_kafka_AdminOptions_init (rd_kafka_t *rk, 1370 rd_kafka_AdminOptions_t *options) { 1371 rd_kafka_confval_init_int(&options->request_timeout, "request_timeout", 1372 0, 3600*1000, 1373 rk->rk_conf.admin.request_timeout_ms); 1374 1375 if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || 1376 options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS || 1377 options->for_api == RD_KAFKA_ADMIN_OP_DELETETOPICS || 1378 options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS || 1379 options->for_api == RD_KAFKA_ADMIN_OP_DELETERECORDS) 1380 rd_kafka_confval_init_int(&options->operation_timeout, 1381 "operation_timeout", 1382 -1, 3600*1000, 1383 rk->rk_conf.admin.request_timeout_ms); 1384 else 1385 rd_kafka_confval_disable(&options->operation_timeout, 1386 "operation_timeout"); 1387 1388 if (options->for_api == 
RD_KAFKA_ADMIN_OP_ANY || 1389 options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS || 1390 options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS || 1391 options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS) 1392 rd_kafka_confval_init_int(&options->validate_only, 1393 "validate_only", 1394 0, 1, 0); 1395 else 1396 rd_kafka_confval_disable(&options->validate_only, 1397 "validate_only"); 1398 1399 if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || 1400 options->for_api == RD_KAFKA_ADMIN_OP_ALTERCONFIGS) 1401 rd_kafka_confval_init_int(&options->incremental, 1402 "incremental", 1403 0, 1, 0); 1404 else 1405 rd_kafka_confval_disable(&options->incremental, 1406 "incremental"); 1407 1408 rd_kafka_confval_init_int(&options->broker, "broker", 1409 0, INT32_MAX, -1); 1410 rd_kafka_confval_init_ptr(&options->opaque, "opaque"); 1411 } 1412 1413 1414 rd_kafka_AdminOptions_t * 1415 rd_kafka_AdminOptions_new (rd_kafka_t *rk, rd_kafka_admin_op_t for_api) { 1416 rd_kafka_AdminOptions_t *options; 1417 1418 if ((int)for_api < 0 || for_api >= RD_KAFKA_ADMIN_OP__CNT) 1419 return NULL; 1420 1421 options = rd_calloc(1, sizeof(*options)); 1422 1423 options->for_api = for_api; 1424 1425 rd_kafka_AdminOptions_init(rk, options); 1426 1427 return options; 1428 } 1429 1430 void rd_kafka_AdminOptions_destroy (rd_kafka_AdminOptions_t *options) { 1431 rd_free(options); 1432 } 1433 1434 /**@}*/ 1435 1436 1437 1438 1439 1440 1441 /** 1442 * @name CreateTopics 1443 * @{ 1444 * 1445 * 1446 * 1447 */ 1448 1449 1450 1451 rd_kafka_NewTopic_t * 1452 rd_kafka_NewTopic_new (const char *topic, 1453 int num_partitions, 1454 int replication_factor, 1455 char *errstr, size_t errstr_size) { 1456 rd_kafka_NewTopic_t *new_topic; 1457 1458 if (!topic) { 1459 rd_snprintf(errstr, errstr_size, "Invalid topic name"); 1460 return NULL; 1461 } 1462 1463 if (num_partitions < -1 || num_partitions > RD_KAFKAP_PARTITIONS_MAX) { 1464 rd_snprintf(errstr, errstr_size, "num_partitions out of " 1465 "expected range %d..%d or -1 for 
broker default", 1466 1, RD_KAFKAP_PARTITIONS_MAX); 1467 return NULL; 1468 } 1469 1470 if (replication_factor < -1 || 1471 replication_factor > RD_KAFKAP_BROKERS_MAX) { 1472 rd_snprintf(errstr, errstr_size, 1473 "replication_factor out of expected range %d..%d", 1474 -1, RD_KAFKAP_BROKERS_MAX); 1475 return NULL; 1476 } 1477 1478 new_topic = rd_calloc(1, sizeof(*new_topic)); 1479 new_topic->topic = rd_strdup(topic); 1480 new_topic->num_partitions = num_partitions; 1481 new_topic->replication_factor = replication_factor; 1482 1483 /* List of int32 lists */ 1484 rd_list_init(&new_topic->replicas, 0, rd_list_destroy_free); 1485 rd_list_prealloc_elems(&new_topic->replicas, 0, 1486 num_partitions == -1 ? 0 : num_partitions, 1487 0/*nozero*/); 1488 1489 /* List of ConfigEntrys */ 1490 rd_list_init(&new_topic->config, 0, rd_kafka_ConfigEntry_free); 1491 1492 return new_topic; 1493 1494 } 1495 1496 1497 /** 1498 * @brief Topic name comparator for NewTopic_t 1499 */ 1500 static int rd_kafka_NewTopic_cmp (const void *_a, const void *_b) { 1501 const rd_kafka_NewTopic_t *a = _a, *b = _b; 1502 return strcmp(a->topic, b->topic); 1503 } 1504 1505 1506 1507 /** 1508 * @brief Allocate a new NewTopic and make a copy of \p src 1509 */ 1510 static rd_kafka_NewTopic_t * 1511 rd_kafka_NewTopic_copy (const rd_kafka_NewTopic_t *src) { 1512 rd_kafka_NewTopic_t *dst; 1513 1514 dst = rd_kafka_NewTopic_new(src->topic, src->num_partitions, 1515 src->replication_factor, NULL, 0); 1516 rd_assert(dst); 1517 1518 rd_list_destroy(&dst->replicas); /* created in .._new() */ 1519 rd_list_init_copy(&dst->replicas, &src->replicas); 1520 rd_list_copy_to(&dst->replicas, &src->replicas, 1521 rd_list_copy_preallocated, NULL); 1522 1523 rd_list_init_copy(&dst->config, &src->config); 1524 rd_list_copy_to(&dst->config, &src->config, 1525 rd_kafka_ConfigEntry_list_copy, NULL); 1526 1527 return dst; 1528 } 1529 1530 void rd_kafka_NewTopic_destroy (rd_kafka_NewTopic_t *new_topic) { 1531 
rd_list_destroy(&new_topic->replicas); 1532 rd_list_destroy(&new_topic->config); 1533 rd_free(new_topic->topic); 1534 rd_free(new_topic); 1535 } 1536 1537 static void rd_kafka_NewTopic_free (void *ptr) { 1538 rd_kafka_NewTopic_destroy(ptr); 1539 } 1540 1541 void 1542 rd_kafka_NewTopic_destroy_array (rd_kafka_NewTopic_t **new_topics, 1543 size_t new_topic_cnt) { 1544 size_t i; 1545 for (i = 0 ; i < new_topic_cnt ; i++) 1546 rd_kafka_NewTopic_destroy(new_topics[i]); 1547 } 1548 1549 1550 rd_kafka_resp_err_t 1551 rd_kafka_NewTopic_set_replica_assignment (rd_kafka_NewTopic_t *new_topic, 1552 int32_t partition, 1553 int32_t *broker_ids, 1554 size_t broker_id_cnt, 1555 char *errstr, size_t errstr_size) { 1556 rd_list_t *rl; 1557 int i; 1558 1559 if (new_topic->replication_factor != -1) { 1560 rd_snprintf(errstr, errstr_size, 1561 "Specifying a replication factor and " 1562 "a replica assignment are mutually exclusive"); 1563 return RD_KAFKA_RESP_ERR__INVALID_ARG; 1564 } else if (new_topic->num_partitions == -1) { 1565 rd_snprintf(errstr, errstr_size, 1566 "Specifying a default partition count and a " 1567 "replica assignment are mutually exclusive"); 1568 return RD_KAFKA_RESP_ERR__INVALID_ARG; 1569 } 1570 1571 /* Replica partitions must be added consecutively starting from 0. 
*/ 1572 if (partition != rd_list_cnt(&new_topic->replicas)) { 1573 rd_snprintf(errstr, errstr_size, 1574 "Partitions must be added in order, " 1575 "starting at 0: expecting partition %d, " 1576 "not %"PRId32, 1577 rd_list_cnt(&new_topic->replicas), partition); 1578 return RD_KAFKA_RESP_ERR__INVALID_ARG; 1579 } 1580 1581 if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) { 1582 rd_snprintf(errstr, errstr_size, 1583 "Too many brokers specified " 1584 "(RD_KAFKAP_BROKERS_MAX=%d)", 1585 RD_KAFKAP_BROKERS_MAX); 1586 return RD_KAFKA_RESP_ERR__INVALID_ARG; 1587 } 1588 1589 1590 rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt); 1591 1592 for (i = 0 ; i < (int)broker_id_cnt ; i++) 1593 rd_list_set_int32(rl, i, broker_ids[i]); 1594 1595 rd_list_add(&new_topic->replicas, rl); 1596 1597 return RD_KAFKA_RESP_ERR_NO_ERROR; 1598 } 1599 1600 1601 /** 1602 * @brief Generic constructor of ConfigEntry which is also added to \p rl 1603 */ 1604 static rd_kafka_resp_err_t 1605 rd_kafka_admin_add_config0 (rd_list_t *rl, 1606 const char *name, const char *value, 1607 rd_kafka_AlterOperation_t operation) { 1608 rd_kafka_ConfigEntry_t *entry; 1609 1610 if (!name) 1611 return RD_KAFKA_RESP_ERR__INVALID_ARG; 1612 1613 entry = rd_calloc(1, sizeof(*entry)); 1614 entry->kv = rd_strtup_new(name, value); 1615 entry->a.operation = operation; 1616 1617 rd_list_add(rl, entry); 1618 1619 return RD_KAFKA_RESP_ERR_NO_ERROR; 1620 } 1621 1622 1623 rd_kafka_resp_err_t 1624 rd_kafka_NewTopic_set_config (rd_kafka_NewTopic_t *new_topic, 1625 const char *name, const char *value) { 1626 return rd_kafka_admin_add_config0(&new_topic->config, name, value, 1627 RD_KAFKA_ALTER_OP_ADD); 1628 } 1629 1630 1631 1632 /** 1633 * @brief Parse CreateTopicsResponse and create ADMIN_RESULT op. 
1634 */ 1635 static rd_kafka_resp_err_t 1636 rd_kafka_CreateTopicsResponse_parse (rd_kafka_op_t *rko_req, 1637 rd_kafka_op_t **rko_resultp, 1638 rd_kafka_buf_t *reply, 1639 char *errstr, size_t errstr_size) { 1640 const int log_decode_errors = LOG_ERR; 1641 rd_kafka_broker_t *rkb = reply->rkbuf_rkb; 1642 rd_kafka_t *rk = rkb->rkb_rk; 1643 rd_kafka_op_t *rko_result = NULL; 1644 int32_t topic_cnt; 1645 int i; 1646 1647 if (rd_kafka_buf_ApiVersion(reply) >= 2) { 1648 int32_t Throttle_Time; 1649 rd_kafka_buf_read_i32(reply, &Throttle_Time); 1650 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); 1651 } 1652 1653 /* #topics */ 1654 rd_kafka_buf_read_i32(reply, &topic_cnt); 1655 1656 if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) 1657 rd_kafka_buf_parse_fail( 1658 reply, 1659 "Received %"PRId32" topics in response " 1660 "when only %d were requested", topic_cnt, 1661 rd_list_cnt(&rko_req->rko_u.admin_request.args)); 1662 1663 1664 rko_result = rd_kafka_admin_result_new(rko_req); 1665 1666 rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt, 1667 rd_kafka_topic_result_free); 1668 1669 for (i = 0 ; i < (int)topic_cnt ; i++) { 1670 rd_kafkap_str_t ktopic; 1671 int16_t error_code; 1672 rd_kafkap_str_t error_msg = RD_KAFKAP_STR_INITIALIZER; 1673 char *this_errstr = NULL; 1674 rd_kafka_topic_result_t *terr; 1675 rd_kafka_NewTopic_t skel; 1676 int orig_pos; 1677 1678 rd_kafka_buf_read_str(reply, &ktopic); 1679 rd_kafka_buf_read_i16(reply, &error_code); 1680 1681 if (rd_kafka_buf_ApiVersion(reply) >= 1) 1682 rd_kafka_buf_read_str(reply, &error_msg); 1683 1684 /* For non-blocking CreateTopicsRequests the broker 1685 * will returned REQUEST_TIMED_OUT for topics 1686 * that were triggered for creation - 1687 * we hide this error code from the application 1688 * since the topic creation is in fact in progress. */ 1689 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT && 1690 rd_kafka_confval_get_int(&rko_req->rko_u. 
1691 admin_request.options. 1692 operation_timeout) <= 0) { 1693 error_code = RD_KAFKA_RESP_ERR_NO_ERROR; 1694 this_errstr = NULL; 1695 } 1696 1697 if (error_code) { 1698 if (RD_KAFKAP_STR_IS_NULL(&error_msg) || 1699 RD_KAFKAP_STR_LEN(&error_msg) == 0) 1700 this_errstr = 1701 (char *)rd_kafka_err2str(error_code); 1702 else 1703 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); 1704 1705 } 1706 1707 terr = rd_kafka_topic_result_new(ktopic.str, 1708 RD_KAFKAP_STR_LEN(&ktopic), 1709 error_code, this_errstr); 1710 1711 /* As a convenience to the application we insert topic result 1712 * in the same order as they were requested. The broker 1713 * does not maintain ordering unfortunately. */ 1714 skel.topic = terr->topic; 1715 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, 1716 &skel, rd_kafka_NewTopic_cmp); 1717 if (orig_pos == -1) { 1718 rd_kafka_topic_result_destroy(terr); 1719 rd_kafka_buf_parse_fail( 1720 reply, 1721 "Broker returned topic %.*s that was not " 1722 "included in the original request", 1723 RD_KAFKAP_STR_PR(&ktopic)); 1724 } 1725 1726 if (rd_list_elem(&rko_result->rko_u.admin_result.results, 1727 orig_pos) != NULL) { 1728 rd_kafka_topic_result_destroy(terr); 1729 rd_kafka_buf_parse_fail( 1730 reply, 1731 "Broker returned topic %.*s multiple times", 1732 RD_KAFKAP_STR_PR(&ktopic)); 1733 } 1734 1735 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, 1736 terr); 1737 } 1738 1739 *rko_resultp = rko_result; 1740 1741 return RD_KAFKA_RESP_ERR_NO_ERROR; 1742 1743 err_parse: 1744 if (rko_result) 1745 rd_kafka_op_destroy(rko_result); 1746 1747 rd_snprintf(errstr, errstr_size, 1748 "CreateTopics response protocol parse failure: %s", 1749 rd_kafka_err2str(reply->rkbuf_err)); 1750 1751 return reply->rkbuf_err; 1752 } 1753 1754 1755 void rd_kafka_CreateTopics (rd_kafka_t *rk, 1756 rd_kafka_NewTopic_t **new_topics, 1757 size_t new_topic_cnt, 1758 const rd_kafka_AdminOptions_t *options, 1759 rd_kafka_queue_t *rkqu) { 1760 rd_kafka_op_t *rko; 
1761 size_t i; 1762 static const struct rd_kafka_admin_worker_cbs cbs = { 1763 rd_kafka_CreateTopicsRequest, 1764 rd_kafka_CreateTopicsResponse_parse, 1765 }; 1766 1767 rd_assert(rkqu); 1768 1769 rko = rd_kafka_admin_request_op_new(rk, 1770 RD_KAFKA_OP_CREATETOPICS, 1771 RD_KAFKA_EVENT_CREATETOPICS_RESULT, 1772 &cbs, options, rkqu->rkqu_q); 1773 1774 rd_list_init(&rko->rko_u.admin_request.args, (int)new_topic_cnt, 1775 rd_kafka_NewTopic_free); 1776 1777 for (i = 0 ; i < new_topic_cnt ; i++) 1778 rd_list_add(&rko->rko_u.admin_request.args, 1779 rd_kafka_NewTopic_copy(new_topics[i])); 1780 1781 rd_kafka_q_enq(rk->rk_ops, rko); 1782 } 1783 1784 1785 /** 1786 * @brief Get an array of topic results from a CreateTopics result. 1787 * 1788 * The returned \p topics life-time is the same as the \p result object. 1789 * @param cntp is updated to the number of elements in the array. 1790 */ 1791 const rd_kafka_topic_result_t ** 1792 rd_kafka_CreateTopics_result_topics ( 1793 const rd_kafka_CreateTopics_result_t *result, 1794 size_t *cntp) { 1795 return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result, 1796 cntp); 1797 } 1798 1799 /**@}*/ 1800 1801 1802 1803 1804 /** 1805 * @name Delete topics 1806 * @{ 1807 * 1808 * 1809 * 1810 * 1811 */ 1812 1813 rd_kafka_DeleteTopic_t *rd_kafka_DeleteTopic_new (const char *topic) { 1814 size_t tsize = strlen(topic) + 1; 1815 rd_kafka_DeleteTopic_t *del_topic; 1816 1817 /* Single allocation */ 1818 del_topic = rd_malloc(sizeof(*del_topic) + tsize); 1819 del_topic->topic = del_topic->data; 1820 memcpy(del_topic->topic, topic, tsize); 1821 1822 return del_topic; 1823 } 1824 1825 void rd_kafka_DeleteTopic_destroy (rd_kafka_DeleteTopic_t *del_topic) { 1826 rd_free(del_topic); 1827 } 1828 1829 static void rd_kafka_DeleteTopic_free (void *ptr) { 1830 rd_kafka_DeleteTopic_destroy(ptr); 1831 } 1832 1833 1834 void rd_kafka_DeleteTopic_destroy_array (rd_kafka_DeleteTopic_t **del_topics, 1835 size_t del_topic_cnt) { 1836 size_t i; 1837 
for (i = 0 ; i < del_topic_cnt ; i++) 1838 rd_kafka_DeleteTopic_destroy(del_topics[i]); 1839 } 1840 1841 1842 /** 1843 * @brief Topic name comparator for DeleteTopic_t 1844 */ 1845 static int rd_kafka_DeleteTopic_cmp (const void *_a, const void *_b) { 1846 const rd_kafka_DeleteTopic_t *a = _a, *b = _b; 1847 return strcmp(a->topic, b->topic); 1848 } 1849 1850 /** 1851 * @brief Allocate a new DeleteTopic and make a copy of \p src 1852 */ 1853 static rd_kafka_DeleteTopic_t * 1854 rd_kafka_DeleteTopic_copy (const rd_kafka_DeleteTopic_t *src) { 1855 return rd_kafka_DeleteTopic_new(src->topic); 1856 } 1857 1858 1859 1860 1861 1862 1863 1864 /** 1865 * @brief Parse DeleteTopicsResponse and create ADMIN_RESULT op. 1866 */ 1867 static rd_kafka_resp_err_t 1868 rd_kafka_DeleteTopicsResponse_parse (rd_kafka_op_t *rko_req, 1869 rd_kafka_op_t **rko_resultp, 1870 rd_kafka_buf_t *reply, 1871 char *errstr, size_t errstr_size) { 1872 const int log_decode_errors = LOG_ERR; 1873 rd_kafka_broker_t *rkb = reply->rkbuf_rkb; 1874 rd_kafka_t *rk = rkb->rkb_rk; 1875 rd_kafka_op_t *rko_result = NULL; 1876 int32_t topic_cnt; 1877 int i; 1878 1879 if (rd_kafka_buf_ApiVersion(reply) >= 1) { 1880 int32_t Throttle_Time; 1881 rd_kafka_buf_read_i32(reply, &Throttle_Time); 1882 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); 1883 } 1884 1885 /* #topics */ 1886 rd_kafka_buf_read_i32(reply, &topic_cnt); 1887 1888 if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) 1889 rd_kafka_buf_parse_fail( 1890 reply, 1891 "Received %"PRId32" topics in response " 1892 "when only %d were requested", topic_cnt, 1893 rd_list_cnt(&rko_req->rko_u.admin_request.args)); 1894 1895 rko_result = rd_kafka_admin_result_new(rko_req); 1896 1897 rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt, 1898 rd_kafka_topic_result_free); 1899 1900 for (i = 0 ; i < (int)topic_cnt ; i++) { 1901 rd_kafkap_str_t ktopic; 1902 int16_t error_code; 1903 rd_kafka_topic_result_t *terr; 1904 rd_kafka_NewTopic_t 
skel; 1905 int orig_pos; 1906 1907 rd_kafka_buf_read_str(reply, &ktopic); 1908 rd_kafka_buf_read_i16(reply, &error_code); 1909 1910 /* For non-blocking DeleteTopicsRequests the broker 1911 * will returned REQUEST_TIMED_OUT for topics 1912 * that were triggered for creation - 1913 * we hide this error code from the application 1914 * since the topic creation is in fact in progress. */ 1915 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT && 1916 rd_kafka_confval_get_int(&rko_req->rko_u. 1917 admin_request.options. 1918 operation_timeout) <= 0) { 1919 error_code = RD_KAFKA_RESP_ERR_NO_ERROR; 1920 } 1921 1922 terr = rd_kafka_topic_result_new(ktopic.str, 1923 RD_KAFKAP_STR_LEN(&ktopic), 1924 error_code, 1925 error_code ? 1926 rd_kafka_err2str(error_code) : 1927 NULL); 1928 1929 /* As a convenience to the application we insert topic result 1930 * in the same order as they were requested. The broker 1931 * does not maintain ordering unfortunately. */ 1932 skel.topic = terr->topic; 1933 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, 1934 &skel, rd_kafka_DeleteTopic_cmp); 1935 if (orig_pos == -1) { 1936 rd_kafka_topic_result_destroy(terr); 1937 rd_kafka_buf_parse_fail( 1938 reply, 1939 "Broker returned topic %.*s that was not " 1940 "included in the original request", 1941 RD_KAFKAP_STR_PR(&ktopic)); 1942 } 1943 1944 if (rd_list_elem(&rko_result->rko_u.admin_result.results, 1945 orig_pos) != NULL) { 1946 rd_kafka_topic_result_destroy(terr); 1947 rd_kafka_buf_parse_fail( 1948 reply, 1949 "Broker returned topic %.*s multiple times", 1950 RD_KAFKAP_STR_PR(&ktopic)); 1951 } 1952 1953 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, 1954 terr); 1955 } 1956 1957 *rko_resultp = rko_result; 1958 1959 return RD_KAFKA_RESP_ERR_NO_ERROR; 1960 1961 err_parse: 1962 if (rko_result) 1963 rd_kafka_op_destroy(rko_result); 1964 1965 rd_snprintf(errstr, errstr_size, 1966 "DeleteTopics response protocol parse failure: %s", 1967 
rd_kafka_err2str(reply->rkbuf_err)); 1968 1969 return reply->rkbuf_err; 1970 } 1971 1972 1973 1974 1975 1976 1977 void rd_kafka_DeleteTopics (rd_kafka_t *rk, 1978 rd_kafka_DeleteTopic_t **del_topics, 1979 size_t del_topic_cnt, 1980 const rd_kafka_AdminOptions_t *options, 1981 rd_kafka_queue_t *rkqu) { 1982 rd_kafka_op_t *rko; 1983 size_t i; 1984 static const struct rd_kafka_admin_worker_cbs cbs = { 1985 rd_kafka_DeleteTopicsRequest, 1986 rd_kafka_DeleteTopicsResponse_parse, 1987 }; 1988 1989 rd_assert(rkqu); 1990 1991 rko = rd_kafka_admin_request_op_new(rk, 1992 RD_KAFKA_OP_DELETETOPICS, 1993 RD_KAFKA_EVENT_DELETETOPICS_RESULT, 1994 &cbs, options, rkqu->rkqu_q); 1995 1996 rd_list_init(&rko->rko_u.admin_request.args, (int)del_topic_cnt, 1997 rd_kafka_DeleteTopic_free); 1998 1999 for (i = 0 ; i < del_topic_cnt ; i++) 2000 rd_list_add(&rko->rko_u.admin_request.args, 2001 rd_kafka_DeleteTopic_copy(del_topics[i])); 2002 2003 rd_kafka_q_enq(rk->rk_ops, rko); 2004 } 2005 2006 2007 /** 2008 * @brief Get an array of topic results from a DeleteTopics result. 2009 * 2010 * The returned \p topics life-time is the same as the \p result object. 2011 * @param cntp is updated to the number of elements in the array. 
2012 */ 2013 const rd_kafka_topic_result_t ** 2014 rd_kafka_DeleteTopics_result_topics ( 2015 const rd_kafka_DeleteTopics_result_t *result, 2016 size_t *cntp) { 2017 return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result, 2018 cntp); 2019 } 2020 2021 2022 2023 2024 /** 2025 * @name Create partitions 2026 * @{ 2027 * 2028 * 2029 * 2030 * 2031 */ 2032 2033 rd_kafka_NewPartitions_t *rd_kafka_NewPartitions_new (const char *topic, 2034 size_t new_total_cnt, 2035 char *errstr, 2036 size_t errstr_size) { 2037 size_t tsize = strlen(topic) + 1; 2038 rd_kafka_NewPartitions_t *newps; 2039 2040 if (new_total_cnt < 1 || new_total_cnt > RD_KAFKAP_PARTITIONS_MAX) { 2041 rd_snprintf(errstr, errstr_size, "new_total_cnt out of " 2042 "expected range %d..%d", 2043 1, RD_KAFKAP_PARTITIONS_MAX); 2044 return NULL; 2045 } 2046 2047 /* Single allocation */ 2048 newps = rd_malloc(sizeof(*newps) + tsize); 2049 newps->total_cnt = new_total_cnt; 2050 newps->topic = newps->data; 2051 memcpy(newps->topic, topic, tsize); 2052 2053 /* List of int32 lists */ 2054 rd_list_init(&newps->replicas, 0, rd_list_destroy_free); 2055 rd_list_prealloc_elems(&newps->replicas, 0, new_total_cnt, 0/*nozero*/); 2056 2057 return newps; 2058 } 2059 2060 /** 2061 * @brief Topic name comparator for NewPartitions_t 2062 */ 2063 static int rd_kafka_NewPartitions_cmp (const void *_a, const void *_b) { 2064 const rd_kafka_NewPartitions_t *a = _a, *b = _b; 2065 return strcmp(a->topic, b->topic); 2066 } 2067 2068 2069 /** 2070 * @brief Allocate a new CreatePartitions and make a copy of \p src 2071 */ 2072 static rd_kafka_NewPartitions_t * 2073 rd_kafka_NewPartitions_copy (const rd_kafka_NewPartitions_t *src) { 2074 rd_kafka_NewPartitions_t *dst; 2075 2076 dst = rd_kafka_NewPartitions_new(src->topic, src->total_cnt, NULL, 0); 2077 2078 rd_list_destroy(&dst->replicas); /* created in .._new() */ 2079 rd_list_init_copy(&dst->replicas, &src->replicas); 2080 rd_list_copy_to(&dst->replicas, &src->replicas, 2081 
rd_list_copy_preallocated, NULL); 2082 2083 return dst; 2084 } 2085 2086 void rd_kafka_NewPartitions_destroy (rd_kafka_NewPartitions_t *newps) { 2087 rd_list_destroy(&newps->replicas); 2088 rd_free(newps); 2089 } 2090 2091 static void rd_kafka_NewPartitions_free (void *ptr) { 2092 rd_kafka_NewPartitions_destroy(ptr); 2093 } 2094 2095 2096 void rd_kafka_NewPartitions_destroy_array (rd_kafka_NewPartitions_t **newps, 2097 size_t newps_cnt) { 2098 size_t i; 2099 for (i = 0 ; i < newps_cnt ; i++) 2100 rd_kafka_NewPartitions_destroy(newps[i]); 2101 } 2102 2103 2104 2105 2106 2107 rd_kafka_resp_err_t 2108 rd_kafka_NewPartitions_set_replica_assignment (rd_kafka_NewPartitions_t *newp, 2109 int32_t new_partition_idx, 2110 int32_t *broker_ids, 2111 size_t broker_id_cnt, 2112 char *errstr, 2113 size_t errstr_size) { 2114 rd_list_t *rl; 2115 int i; 2116 2117 /* Replica partitions must be added consecutively starting from 0. */ 2118 if (new_partition_idx != rd_list_cnt(&newp->replicas)) { 2119 rd_snprintf(errstr, errstr_size, 2120 "Partitions must be added in order, " 2121 "starting at 0: expecting partition " 2122 "index %d, not %"PRId32, 2123 rd_list_cnt(&newp->replicas), new_partition_idx); 2124 return RD_KAFKA_RESP_ERR__INVALID_ARG; 2125 } 2126 2127 if (broker_id_cnt > RD_KAFKAP_BROKERS_MAX) { 2128 rd_snprintf(errstr, errstr_size, 2129 "Too many brokers specified " 2130 "(RD_KAFKAP_BROKERS_MAX=%d)", 2131 RD_KAFKAP_BROKERS_MAX); 2132 return RD_KAFKA_RESP_ERR__INVALID_ARG; 2133 } 2134 2135 rl = rd_list_init_int32(rd_list_new(0, NULL), (int)broker_id_cnt); 2136 2137 for (i = 0 ; i < (int)broker_id_cnt ; i++) 2138 rd_list_set_int32(rl, i, broker_ids[i]); 2139 2140 rd_list_add(&newp->replicas, rl); 2141 2142 return RD_KAFKA_RESP_ERR_NO_ERROR; 2143 } 2144 2145 2146 2147 2148 /** 2149 * @brief Parse CreatePartitionsResponse and create ADMIN_RESULT op. 
2150 */ 2151 static rd_kafka_resp_err_t 2152 rd_kafka_CreatePartitionsResponse_parse (rd_kafka_op_t *rko_req, 2153 rd_kafka_op_t **rko_resultp, 2154 rd_kafka_buf_t *reply, 2155 char *errstr, 2156 size_t errstr_size) { 2157 const int log_decode_errors = LOG_ERR; 2158 rd_kafka_broker_t *rkb = reply->rkbuf_rkb; 2159 rd_kafka_t *rk = rkb->rkb_rk; 2160 rd_kafka_op_t *rko_result = NULL; 2161 int32_t topic_cnt; 2162 int i; 2163 int32_t Throttle_Time; 2164 2165 rd_kafka_buf_read_i32(reply, &Throttle_Time); 2166 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); 2167 2168 /* #topics */ 2169 rd_kafka_buf_read_i32(reply, &topic_cnt); 2170 2171 if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) 2172 rd_kafka_buf_parse_fail( 2173 reply, 2174 "Received %"PRId32" topics in response " 2175 "when only %d were requested", topic_cnt, 2176 rd_list_cnt(&rko_req->rko_u.admin_request.args)); 2177 2178 rko_result = rd_kafka_admin_result_new(rko_req); 2179 2180 rd_list_init(&rko_result->rko_u.admin_result.results, topic_cnt, 2181 rd_kafka_topic_result_free); 2182 2183 for (i = 0 ; i < (int)topic_cnt ; i++) { 2184 rd_kafkap_str_t ktopic; 2185 int16_t error_code; 2186 char *this_errstr = NULL; 2187 rd_kafka_topic_result_t *terr; 2188 rd_kafka_NewTopic_t skel; 2189 rd_kafkap_str_t error_msg; 2190 int orig_pos; 2191 2192 rd_kafka_buf_read_str(reply, &ktopic); 2193 rd_kafka_buf_read_i16(reply, &error_code); 2194 rd_kafka_buf_read_str(reply, &error_msg); 2195 2196 /* For non-blocking CreatePartitionsRequests the broker 2197 * will returned REQUEST_TIMED_OUT for topics 2198 * that were triggered for creation - 2199 * we hide this error code from the application 2200 * since the topic creation is in fact in progress. */ 2201 if (error_code == RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT && 2202 rd_kafka_confval_get_int(&rko_req->rko_u. 2203 admin_request.options. 
2204 operation_timeout) <= 0) { 2205 error_code = RD_KAFKA_RESP_ERR_NO_ERROR; 2206 } 2207 2208 if (error_code) { 2209 if (RD_KAFKAP_STR_IS_NULL(&error_msg) || 2210 RD_KAFKAP_STR_LEN(&error_msg) == 0) 2211 this_errstr = 2212 (char *)rd_kafka_err2str(error_code); 2213 else 2214 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); 2215 } 2216 2217 terr = rd_kafka_topic_result_new(ktopic.str, 2218 RD_KAFKAP_STR_LEN(&ktopic), 2219 error_code, 2220 error_code ? 2221 this_errstr : NULL); 2222 2223 /* As a convenience to the application we insert topic result 2224 * in the same order as they were requested. The broker 2225 * does not maintain ordering unfortunately. */ 2226 skel.topic = terr->topic; 2227 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, 2228 &skel, rd_kafka_NewPartitions_cmp); 2229 if (orig_pos == -1) { 2230 rd_kafka_topic_result_destroy(terr); 2231 rd_kafka_buf_parse_fail( 2232 reply, 2233 "Broker returned topic %.*s that was not " 2234 "included in the original request", 2235 RD_KAFKAP_STR_PR(&ktopic)); 2236 } 2237 2238 if (rd_list_elem(&rko_result->rko_u.admin_result.results, 2239 orig_pos) != NULL) { 2240 rd_kafka_topic_result_destroy(terr); 2241 rd_kafka_buf_parse_fail( 2242 reply, 2243 "Broker returned topic %.*s multiple times", 2244 RD_KAFKAP_STR_PR(&ktopic)); 2245 } 2246 2247 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, 2248 terr); 2249 } 2250 2251 *rko_resultp = rko_result; 2252 2253 return RD_KAFKA_RESP_ERR_NO_ERROR; 2254 2255 err_parse: 2256 if (rko_result) 2257 rd_kafka_op_destroy(rko_result); 2258 2259 rd_snprintf(errstr, errstr_size, 2260 "CreatePartitions response protocol parse failure: %s", 2261 rd_kafka_err2str(reply->rkbuf_err)); 2262 2263 return reply->rkbuf_err; 2264 } 2265 2266 2267 2268 2269 2270 2271 2272 void rd_kafka_CreatePartitions (rd_kafka_t *rk, 2273 rd_kafka_NewPartitions_t **newps, 2274 size_t newps_cnt, 2275 const rd_kafka_AdminOptions_t *options, 2276 rd_kafka_queue_t *rkqu) { 2277 rd_kafka_op_t 
*rko; 2278 size_t i; 2279 static const struct rd_kafka_admin_worker_cbs cbs = { 2280 rd_kafka_CreatePartitionsRequest, 2281 rd_kafka_CreatePartitionsResponse_parse, 2282 }; 2283 2284 rd_assert(rkqu); 2285 2286 rko = rd_kafka_admin_request_op_new( 2287 rk, 2288 RD_KAFKA_OP_CREATEPARTITIONS, 2289 RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, 2290 &cbs, options, rkqu->rkqu_q); 2291 2292 rd_list_init(&rko->rko_u.admin_request.args, (int)newps_cnt, 2293 rd_kafka_NewPartitions_free); 2294 2295 for (i = 0 ; i < newps_cnt ; i++) 2296 rd_list_add(&rko->rko_u.admin_request.args, 2297 rd_kafka_NewPartitions_copy(newps[i])); 2298 2299 rd_kafka_q_enq(rk->rk_ops, rko); 2300 } 2301 2302 2303 /** 2304 * @brief Get an array of topic results from a CreatePartitions result. 2305 * 2306 * The returned \p topics life-time is the same as the \p result object. 2307 * @param cntp is updated to the number of elements in the array. 2308 */ 2309 const rd_kafka_topic_result_t ** 2310 rd_kafka_CreatePartitions_result_topics ( 2311 const rd_kafka_CreatePartitions_result_t *result, 2312 size_t *cntp) { 2313 return rd_kafka_admin_result_ret_topics((const rd_kafka_op_t *)result, 2314 cntp); 2315 } 2316 2317 /**@}*/ 2318 2319 2320 2321 2322 /** 2323 * @name ConfigEntry 2324 * @{ 2325 * 2326 * 2327 * 2328 */ 2329 2330 static void rd_kafka_ConfigEntry_destroy (rd_kafka_ConfigEntry_t *entry) { 2331 rd_strtup_destroy(entry->kv); 2332 rd_list_destroy(&entry->synonyms); 2333 rd_free(entry); 2334 } 2335 2336 2337 static void rd_kafka_ConfigEntry_free (void *ptr) { 2338 rd_kafka_ConfigEntry_destroy((rd_kafka_ConfigEntry_t *)ptr); 2339 } 2340 2341 2342 /** 2343 * @brief Create new ConfigEntry 2344 * 2345 * @param name Config entry name 2346 * @param name_len Length of name, or -1 to use strlen() 2347 * @param value Config entry value, or NULL 2348 * @param value_len Length of value, or -1 to use strlen() 2349 */ 2350 static rd_kafka_ConfigEntry_t * 2351 rd_kafka_ConfigEntry_new0 (const char *name, size_t 
name_len, 2352 const char *value, size_t value_len) { 2353 rd_kafka_ConfigEntry_t *entry; 2354 2355 if (!name) 2356 return NULL; 2357 2358 entry = rd_calloc(1, sizeof(*entry)); 2359 entry->kv = rd_strtup_new0(name, name_len, value, value_len); 2360 2361 rd_list_init(&entry->synonyms, 0, rd_kafka_ConfigEntry_free); 2362 2363 entry->a.source = RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG; 2364 2365 return entry; 2366 } 2367 2368 /** 2369 * @sa rd_kafka_ConfigEntry_new0 2370 */ 2371 static rd_kafka_ConfigEntry_t * 2372 rd_kafka_ConfigEntry_new (const char *name, const char *value) { 2373 return rd_kafka_ConfigEntry_new0(name, -1, value, -1); 2374 } 2375 2376 2377 2378 2379 /** 2380 * @brief Allocate a new AlterConfigs and make a copy of \p src 2381 */ 2382 static rd_kafka_ConfigEntry_t * 2383 rd_kafka_ConfigEntry_copy (const rd_kafka_ConfigEntry_t *src) { 2384 rd_kafka_ConfigEntry_t *dst; 2385 2386 dst = rd_kafka_ConfigEntry_new(src->kv->name, src->kv->value); 2387 dst->a = src->a; 2388 2389 rd_list_destroy(&dst->synonyms); /* created in .._new() */ 2390 rd_list_init_copy(&dst->synonyms, &src->synonyms); 2391 rd_list_copy_to(&dst->synonyms, &src->synonyms, 2392 rd_kafka_ConfigEntry_list_copy, NULL); 2393 2394 return dst; 2395 } 2396 2397 static void *rd_kafka_ConfigEntry_list_copy (const void *src, void *opaque) { 2398 return rd_kafka_ConfigEntry_copy((const rd_kafka_ConfigEntry_t *)src); 2399 } 2400 2401 2402 const char *rd_kafka_ConfigEntry_name (const rd_kafka_ConfigEntry_t *entry) { 2403 return entry->kv->name; 2404 } 2405 2406 const char * 2407 rd_kafka_ConfigEntry_value (const rd_kafka_ConfigEntry_t *entry) { 2408 return entry->kv->value; 2409 } 2410 2411 rd_kafka_ConfigSource_t 2412 rd_kafka_ConfigEntry_source (const rd_kafka_ConfigEntry_t *entry) { 2413 return entry->a.source; 2414 } 2415 2416 int rd_kafka_ConfigEntry_is_read_only (const rd_kafka_ConfigEntry_t *entry) { 2417 return entry->a.is_readonly; 2418 } 2419 2420 int rd_kafka_ConfigEntry_is_default (const 
rd_kafka_ConfigEntry_t *entry) { 2421 return entry->a.is_default; 2422 } 2423 2424 int rd_kafka_ConfigEntry_is_sensitive (const rd_kafka_ConfigEntry_t *entry) { 2425 return entry->a.is_sensitive; 2426 } 2427 2428 int rd_kafka_ConfigEntry_is_synonym (const rd_kafka_ConfigEntry_t *entry) { 2429 return entry->a.is_synonym; 2430 } 2431 2432 const rd_kafka_ConfigEntry_t ** 2433 rd_kafka_ConfigEntry_synonyms (const rd_kafka_ConfigEntry_t *entry, 2434 size_t *cntp) { 2435 *cntp = rd_list_cnt(&entry->synonyms); 2436 if (!*cntp) 2437 return NULL; 2438 return (const rd_kafka_ConfigEntry_t **)entry->synonyms.rl_elems; 2439 2440 } 2441 2442 2443 /**@}*/ 2444 2445 2446 2447 /** 2448 * @name ConfigSource 2449 * @{ 2450 * 2451 * 2452 * 2453 */ 2454 2455 const char * 2456 rd_kafka_ConfigSource_name (rd_kafka_ConfigSource_t confsource) { 2457 static const char *names[] = { 2458 "UNKNOWN_CONFIG", 2459 "DYNAMIC_TOPIC_CONFIG", 2460 "DYNAMIC_BROKER_CONFIG", 2461 "DYNAMIC_DEFAULT_BROKER_CONFIG", 2462 "STATIC_BROKER_CONFIG", 2463 "DEFAULT_CONFIG", 2464 }; 2465 2466 if ((unsigned int)confsource >= 2467 (unsigned int)RD_KAFKA_CONFIG_SOURCE__CNT) 2468 return "UNSUPPORTED"; 2469 2470 return names[confsource]; 2471 } 2472 2473 /**@}*/ 2474 2475 2476 2477 /** 2478 * @name ConfigResource 2479 * @{ 2480 * 2481 * 2482 * 2483 */ 2484 2485 const char * 2486 rd_kafka_ResourceType_name (rd_kafka_ResourceType_t restype) { 2487 static const char *names[] = { 2488 "UNKNOWN", 2489 "ANY", 2490 "TOPIC", 2491 "GROUP", 2492 "BROKER", 2493 }; 2494 2495 if ((unsigned int)restype >= 2496 (unsigned int)RD_KAFKA_RESOURCE__CNT) 2497 return "UNSUPPORTED"; 2498 2499 return names[restype]; 2500 } 2501 2502 2503 rd_kafka_ConfigResource_t * 2504 rd_kafka_ConfigResource_new (rd_kafka_ResourceType_t restype, 2505 const char *resname) { 2506 rd_kafka_ConfigResource_t *config; 2507 size_t namesz = resname ? 
strlen(resname) : 0; 2508 2509 if (!namesz || (int)restype < 0) 2510 return NULL; 2511 2512 config = rd_calloc(1, sizeof(*config) + namesz + 1); 2513 config->name = config->data; 2514 memcpy(config->name, resname, namesz + 1); 2515 config->restype = restype; 2516 2517 rd_list_init(&config->config, 8, rd_kafka_ConfigEntry_free); 2518 2519 return config; 2520 } 2521 2522 void rd_kafka_ConfigResource_destroy (rd_kafka_ConfigResource_t *config) { 2523 rd_list_destroy(&config->config); 2524 if (config->errstr) 2525 rd_free(config->errstr); 2526 rd_free(config); 2527 } 2528 2529 static void rd_kafka_ConfigResource_free (void *ptr) { 2530 rd_kafka_ConfigResource_destroy((rd_kafka_ConfigResource_t *)ptr); 2531 } 2532 2533 2534 void rd_kafka_ConfigResource_destroy_array (rd_kafka_ConfigResource_t **config, 2535 size_t config_cnt) { 2536 size_t i; 2537 for (i = 0 ; i < config_cnt ; i++) 2538 rd_kafka_ConfigResource_destroy(config[i]); 2539 } 2540 2541 2542 /** 2543 * @brief Type and name comparator for ConfigResource_t 2544 */ 2545 static int rd_kafka_ConfigResource_cmp (const void *_a, const void *_b) { 2546 const rd_kafka_ConfigResource_t *a = _a, *b = _b; 2547 int r = RD_CMP(a->restype, b->restype); 2548 if (r) 2549 return r; 2550 return strcmp(a->name, b->name); 2551 } 2552 2553 /** 2554 * @brief Allocate a new AlterConfigs and make a copy of \p src 2555 */ 2556 static rd_kafka_ConfigResource_t * 2557 rd_kafka_ConfigResource_copy (const rd_kafka_ConfigResource_t *src) { 2558 rd_kafka_ConfigResource_t *dst; 2559 2560 dst = rd_kafka_ConfigResource_new(src->restype, src->name); 2561 2562 rd_list_destroy(&dst->config); /* created in .._new() */ 2563 rd_list_init_copy(&dst->config, &src->config); 2564 rd_list_copy_to(&dst->config, &src->config, 2565 rd_kafka_ConfigEntry_list_copy, NULL); 2566 2567 return dst; 2568 } 2569 2570 2571 static void 2572 rd_kafka_ConfigResource_add_ConfigEntry (rd_kafka_ConfigResource_t *config, 2573 rd_kafka_ConfigEntry_t *entry) { 2574 
rd_list_add(&config->config, entry); 2575 } 2576 2577 2578 rd_kafka_resp_err_t 2579 rd_kafka_ConfigResource_add_config (rd_kafka_ConfigResource_t *config, 2580 const char *name, const char *value) { 2581 if (!name || !*name || !value) 2582 return RD_KAFKA_RESP_ERR__INVALID_ARG; 2583 2584 return rd_kafka_admin_add_config0(&config->config, name, value, 2585 RD_KAFKA_ALTER_OP_ADD); 2586 } 2587 2588 rd_kafka_resp_err_t 2589 rd_kafka_ConfigResource_set_config (rd_kafka_ConfigResource_t *config, 2590 const char *name, const char *value) { 2591 if (!name || !*name || !value) 2592 return RD_KAFKA_RESP_ERR__INVALID_ARG; 2593 2594 return rd_kafka_admin_add_config0(&config->config, name, value, 2595 RD_KAFKA_ALTER_OP_SET); 2596 } 2597 2598 rd_kafka_resp_err_t 2599 rd_kafka_ConfigResource_delete_config (rd_kafka_ConfigResource_t *config, 2600 const char *name) { 2601 if (!name || !*name) 2602 return RD_KAFKA_RESP_ERR__INVALID_ARG; 2603 2604 return rd_kafka_admin_add_config0(&config->config, name, NULL, 2605 RD_KAFKA_ALTER_OP_DELETE); 2606 } 2607 2608 2609 const rd_kafka_ConfigEntry_t ** 2610 rd_kafka_ConfigResource_configs (const rd_kafka_ConfigResource_t *config, 2611 size_t *cntp) { 2612 *cntp = rd_list_cnt(&config->config); 2613 if (!*cntp) 2614 return NULL; 2615 return (const rd_kafka_ConfigEntry_t **)config->config.rl_elems; 2616 } 2617 2618 2619 2620 2621 rd_kafka_ResourceType_t 2622 rd_kafka_ConfigResource_type (const rd_kafka_ConfigResource_t *config) { 2623 return config->restype; 2624 } 2625 2626 const char * 2627 rd_kafka_ConfigResource_name (const rd_kafka_ConfigResource_t *config) { 2628 return config->name; 2629 } 2630 2631 rd_kafka_resp_err_t 2632 rd_kafka_ConfigResource_error (const rd_kafka_ConfigResource_t *config) { 2633 return config->err; 2634 } 2635 2636 const char * 2637 rd_kafka_ConfigResource_error_string (const rd_kafka_ConfigResource_t *config) { 2638 if (!config->err) 2639 return NULL; 2640 if (config->errstr) 2641 return config->errstr; 2642 return 
rd_kafka_err2str(config->err); 2643 } 2644 2645 2646 /** 2647 * @brief Look in the provided ConfigResource_t* list for a resource of 2648 * type BROKER and set its broker id in \p broker_id, returning 2649 * RD_KAFKA_RESP_ERR_NO_ERROR. 2650 * 2651 * If multiple BROKER resources are found RD_KAFKA_RESP_ERR__CONFLICT 2652 * is returned and an error string is written to errstr. 2653 * 2654 * If no BROKER resources are found RD_KAFKA_RESP_ERR_NO_ERROR 2655 * is returned and \p broker_idp is set to use the coordinator. 2656 */ 2657 static rd_kafka_resp_err_t 2658 rd_kafka_ConfigResource_get_single_broker_id (const rd_list_t *configs, 2659 int32_t *broker_idp, 2660 char *errstr, 2661 size_t errstr_size) { 2662 const rd_kafka_ConfigResource_t *config; 2663 int i; 2664 int32_t broker_id = RD_KAFKA_ADMIN_TARGET_CONTROLLER; /* Some default 2665 * value that we 2666 * can compare 2667 * to below */ 2668 2669 RD_LIST_FOREACH(config, configs, i) { 2670 char *endptr; 2671 long int r; 2672 2673 if (config->restype != RD_KAFKA_RESOURCE_BROKER) 2674 continue; 2675 2676 if (broker_id != RD_KAFKA_ADMIN_TARGET_CONTROLLER) { 2677 rd_snprintf(errstr, errstr_size, 2678 "Only one ConfigResource of type BROKER " 2679 "is allowed per call"); 2680 return RD_KAFKA_RESP_ERR__CONFLICT; 2681 } 2682 2683 /* Convert string broker-id to int32 */ 2684 r = (int32_t)strtol(config->name, &endptr, 10); 2685 if (r == LONG_MIN || r == LONG_MAX || config->name == endptr || 2686 r < 0) { 2687 rd_snprintf(errstr, errstr_size, 2688 "Expected an int32 broker_id for " 2689 "ConfigResource(type=BROKER, name=%s)", 2690 config->name); 2691 return RD_KAFKA_RESP_ERR__INVALID_ARG; 2692 } 2693 2694 broker_id = r; 2695 2696 /* Keep scanning to make sure there are no duplicate 2697 * BROKER resources. 
*/ 2698 } 2699 2700 *broker_idp = broker_id; 2701 2702 return RD_KAFKA_RESP_ERR_NO_ERROR; 2703 } 2704 2705 2706 /**@}*/ 2707 2708 2709 2710 /** 2711 * @name AlterConfigs 2712 * @{ 2713 * 2714 * 2715 * 2716 */ 2717 2718 2719 2720 /** 2721 * @brief Parse AlterConfigsResponse and create ADMIN_RESULT op. 2722 */ 2723 static rd_kafka_resp_err_t 2724 rd_kafka_AlterConfigsResponse_parse (rd_kafka_op_t *rko_req, 2725 rd_kafka_op_t **rko_resultp, 2726 rd_kafka_buf_t *reply, 2727 char *errstr, size_t errstr_size) { 2728 const int log_decode_errors = LOG_ERR; 2729 rd_kafka_broker_t *rkb = reply->rkbuf_rkb; 2730 rd_kafka_t *rk = rkb->rkb_rk; 2731 rd_kafka_op_t *rko_result = NULL; 2732 int32_t res_cnt; 2733 int i; 2734 int32_t Throttle_Time; 2735 2736 rd_kafka_buf_read_i32(reply, &Throttle_Time); 2737 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); 2738 2739 rd_kafka_buf_read_i32(reply, &res_cnt); 2740 2741 if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) { 2742 rd_snprintf(errstr, errstr_size, 2743 "Received %"PRId32" ConfigResources in response " 2744 "when only %d were requested", res_cnt, 2745 rd_list_cnt(&rko_req->rko_u.admin_request.args)); 2746 return RD_KAFKA_RESP_ERR__BAD_MSG; 2747 } 2748 2749 rko_result = rd_kafka_admin_result_new(rko_req); 2750 2751 rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, 2752 rd_kafka_ConfigResource_free); 2753 2754 for (i = 0 ; i < (int)res_cnt ; i++) { 2755 int16_t error_code; 2756 rd_kafkap_str_t error_msg; 2757 int8_t res_type; 2758 rd_kafkap_str_t kres_name; 2759 char *res_name; 2760 char *this_errstr = NULL; 2761 rd_kafka_ConfigResource_t *config; 2762 rd_kafka_ConfigResource_t skel; 2763 int orig_pos; 2764 2765 rd_kafka_buf_read_i16(reply, &error_code); 2766 rd_kafka_buf_read_str(reply, &error_msg); 2767 rd_kafka_buf_read_i8(reply, &res_type); 2768 rd_kafka_buf_read_str(reply, &kres_name); 2769 RD_KAFKAP_STR_DUPA(&res_name, &kres_name); 2770 2771 if (error_code) { 2772 if 
(RD_KAFKAP_STR_IS_NULL(&error_msg) || 2773 RD_KAFKAP_STR_LEN(&error_msg) == 0) 2774 this_errstr = 2775 (char *)rd_kafka_err2str(error_code); 2776 else 2777 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); 2778 } 2779 2780 config = rd_kafka_ConfigResource_new(res_type, res_name); 2781 if (!config) { 2782 rd_kafka_log(rko_req->rko_rk, LOG_ERR, 2783 "ADMIN", "AlterConfigs returned " 2784 "unsupported ConfigResource #%d with " 2785 "type %d and name \"%s\": ignoring", 2786 i, res_type, res_name); 2787 continue; 2788 } 2789 2790 config->err = error_code; 2791 if (this_errstr) 2792 config->errstr = rd_strdup(this_errstr); 2793 2794 /* As a convenience to the application we insert result 2795 * in the same order as they were requested. The broker 2796 * does not maintain ordering unfortunately. */ 2797 skel.restype = config->restype; 2798 skel.name = config->name; 2799 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, 2800 &skel, rd_kafka_ConfigResource_cmp); 2801 if (orig_pos == -1) { 2802 rd_kafka_ConfigResource_destroy(config); 2803 rd_kafka_buf_parse_fail( 2804 reply, 2805 "Broker returned ConfigResource %d,%s " 2806 "that was not " 2807 "included in the original request", 2808 res_type, res_name); 2809 } 2810 2811 if (rd_list_elem(&rko_result->rko_u.admin_result.results, 2812 orig_pos) != NULL) { 2813 rd_kafka_ConfigResource_destroy(config); 2814 rd_kafka_buf_parse_fail( 2815 reply, 2816 "Broker returned ConfigResource %d,%s " 2817 "multiple times", 2818 res_type, res_name); 2819 } 2820 2821 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, 2822 config); 2823 } 2824 2825 *rko_resultp = rko_result; 2826 2827 return RD_KAFKA_RESP_ERR_NO_ERROR; 2828 2829 err_parse: 2830 if (rko_result) 2831 rd_kafka_op_destroy(rko_result); 2832 2833 rd_snprintf(errstr, errstr_size, 2834 "AlterConfigs response protocol parse failure: %s", 2835 rd_kafka_err2str(reply->rkbuf_err)); 2836 2837 return reply->rkbuf_err; 2838 } 2839 2840 2841 2842 2843 void 
rd_kafka_AlterConfigs (rd_kafka_t *rk, 2844 rd_kafka_ConfigResource_t **configs, 2845 size_t config_cnt, 2846 const rd_kafka_AdminOptions_t *options, 2847 rd_kafka_queue_t *rkqu) { 2848 rd_kafka_op_t *rko; 2849 size_t i; 2850 rd_kafka_resp_err_t err; 2851 char errstr[256]; 2852 static const struct rd_kafka_admin_worker_cbs cbs = { 2853 rd_kafka_AlterConfigsRequest, 2854 rd_kafka_AlterConfigsResponse_parse, 2855 }; 2856 2857 rd_assert(rkqu); 2858 2859 rko = rd_kafka_admin_request_op_new( 2860 rk, 2861 RD_KAFKA_OP_ALTERCONFIGS, 2862 RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, 2863 &cbs, options, rkqu->rkqu_q); 2864 2865 rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt, 2866 rd_kafka_ConfigResource_free); 2867 2868 for (i = 0 ; i < config_cnt ; i++) 2869 rd_list_add(&rko->rko_u.admin_request.args, 2870 rd_kafka_ConfigResource_copy(configs[i])); 2871 2872 /* If there's a BROKER resource in the list we need to 2873 * speak directly to that broker rather than the controller. 2874 * 2875 * Multiple BROKER resources are not allowed. 2876 */ 2877 err = rd_kafka_ConfigResource_get_single_broker_id( 2878 &rko->rko_u.admin_request.args, 2879 &rko->rko_u.admin_request.broker_id, 2880 errstr, sizeof(errstr)); 2881 if (err) { 2882 rd_kafka_admin_result_fail(rko, err, "%s", errstr); 2883 rd_kafka_admin_common_worker_destroy(rk, rko, 2884 rd_true/*destroy*/); 2885 return; 2886 } 2887 2888 rd_kafka_q_enq(rk->rk_ops, rko); 2889 } 2890 2891 2892 const rd_kafka_ConfigResource_t ** 2893 rd_kafka_AlterConfigs_result_resources ( 2894 const rd_kafka_AlterConfigs_result_t *result, 2895 size_t *cntp) { 2896 return rd_kafka_admin_result_ret_resources( 2897 (const rd_kafka_op_t *)result, cntp); 2898 } 2899 2900 /**@}*/ 2901 2902 2903 2904 2905 /** 2906 * @name DescribeConfigs 2907 * @{ 2908 * 2909 * 2910 * 2911 */ 2912 2913 2914 /** 2915 * @brief Parse DescribeConfigsResponse and create ADMIN_RESULT op. 
2916 */ 2917 static rd_kafka_resp_err_t 2918 rd_kafka_DescribeConfigsResponse_parse (rd_kafka_op_t *rko_req, 2919 rd_kafka_op_t **rko_resultp, 2920 rd_kafka_buf_t *reply, 2921 char *errstr, size_t errstr_size) { 2922 const int log_decode_errors = LOG_ERR; 2923 rd_kafka_broker_t *rkb = reply->rkbuf_rkb; 2924 rd_kafka_t *rk = rkb->rkb_rk; 2925 rd_kafka_op_t *rko_result = NULL; 2926 int32_t res_cnt; 2927 int i; 2928 int32_t Throttle_Time; 2929 rd_kafka_ConfigResource_t *config = NULL; 2930 rd_kafka_ConfigEntry_t *entry = NULL; 2931 2932 rd_kafka_buf_read_i32(reply, &Throttle_Time); 2933 rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); 2934 2935 /* #resources */ 2936 rd_kafka_buf_read_i32(reply, &res_cnt); 2937 2938 if (res_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) 2939 rd_kafka_buf_parse_fail( 2940 reply, 2941 "Received %"PRId32" ConfigResources in response " 2942 "when only %d were requested", res_cnt, 2943 rd_list_cnt(&rko_req->rko_u.admin_request.args)); 2944 2945 rko_result = rd_kafka_admin_result_new(rko_req); 2946 2947 rd_list_init(&rko_result->rko_u.admin_result.results, res_cnt, 2948 rd_kafka_ConfigResource_free); 2949 2950 for (i = 0 ; i < (int)res_cnt ; i++) { 2951 int16_t error_code; 2952 rd_kafkap_str_t error_msg; 2953 int8_t res_type; 2954 rd_kafkap_str_t kres_name; 2955 char *res_name; 2956 char *this_errstr = NULL; 2957 rd_kafka_ConfigResource_t skel; 2958 int orig_pos; 2959 int32_t entry_cnt; 2960 int ci; 2961 2962 rd_kafka_buf_read_i16(reply, &error_code); 2963 rd_kafka_buf_read_str(reply, &error_msg); 2964 rd_kafka_buf_read_i8(reply, &res_type); 2965 rd_kafka_buf_read_str(reply, &kres_name); 2966 RD_KAFKAP_STR_DUPA(&res_name, &kres_name); 2967 2968 if (error_code) { 2969 if (RD_KAFKAP_STR_IS_NULL(&error_msg) || 2970 RD_KAFKAP_STR_LEN(&error_msg) == 0) 2971 this_errstr = 2972 (char *)rd_kafka_err2str(error_code); 2973 else 2974 RD_KAFKAP_STR_DUPA(&this_errstr, &error_msg); 2975 } 2976 2977 config = 
rd_kafka_ConfigResource_new(res_type, res_name); 2978 if (!config) { 2979 rd_kafka_log(rko_req->rko_rk, LOG_ERR, 2980 "ADMIN", "DescribeConfigs returned " 2981 "unsupported ConfigResource #%d with " 2982 "type %d and name \"%s\": ignoring", 2983 i, res_type, res_name); 2984 continue; 2985 } 2986 2987 config->err = error_code; 2988 if (this_errstr) 2989 config->errstr = rd_strdup(this_errstr); 2990 2991 /* #config_entries */ 2992 rd_kafka_buf_read_i32(reply, &entry_cnt); 2993 2994 for (ci = 0 ; ci < (int)entry_cnt ; ci++) { 2995 rd_kafkap_str_t config_name, config_value; 2996 int32_t syn_cnt; 2997 int si; 2998 2999 rd_kafka_buf_read_str(reply, &config_name); 3000 rd_kafka_buf_read_str(reply, &config_value); 3001 3002 entry = rd_kafka_ConfigEntry_new0( 3003 config_name.str, 3004 RD_KAFKAP_STR_LEN(&config_name), 3005 config_value.str, 3006 RD_KAFKAP_STR_LEN(&config_value)); 3007 3008 rd_kafka_buf_read_bool(reply, &entry->a.is_readonly); 3009 3010 /* ApiVersion 0 has is_default field, while 3011 * ApiVersion 1 has source field. 3012 * Convert between the two so they look the same 3013 * to the caller. 
*/ 3014 if (rd_kafka_buf_ApiVersion(reply) == 0) { 3015 rd_kafka_buf_read_bool(reply, 3016 &entry->a.is_default); 3017 if (entry->a.is_default) 3018 entry->a.source = 3019 RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG; 3020 } else { 3021 int8_t config_source; 3022 rd_kafka_buf_read_i8(reply, &config_source); 3023 entry->a.source = config_source; 3024 3025 if (entry->a.source == 3026 RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG) 3027 entry->a.is_default = 1; 3028 3029 } 3030 3031 rd_kafka_buf_read_bool(reply, &entry->a.is_sensitive); 3032 3033 3034 if (rd_kafka_buf_ApiVersion(reply) == 1) { 3035 /* #config_synonyms (ApiVersion 1) */ 3036 rd_kafka_buf_read_i32(reply, &syn_cnt); 3037 3038 if (syn_cnt > 100000) 3039 rd_kafka_buf_parse_fail( 3040 reply, 3041 "Broker returned %"PRId32 3042 " config synonyms for " 3043 "ConfigResource %d,%s: " 3044 "limit is 100000", 3045 syn_cnt, 3046 config->restype, 3047 config->name); 3048 3049 if (syn_cnt > 0) 3050 rd_list_grow(&entry->synonyms, syn_cnt); 3051 3052 } else { 3053 /* No synonyms in ApiVersion 0 */ 3054 syn_cnt = 0; 3055 } 3056 3057 3058 3059 /* Read synonyms (ApiVersion 1) */ 3060 for (si = 0 ; si < (int)syn_cnt ; si++) { 3061 rd_kafkap_str_t syn_name, syn_value; 3062 int8_t syn_source; 3063 rd_kafka_ConfigEntry_t *syn_entry; 3064 3065 rd_kafka_buf_read_str(reply, &syn_name); 3066 rd_kafka_buf_read_str(reply, &syn_value); 3067 rd_kafka_buf_read_i8(reply, &syn_source); 3068 3069 syn_entry = rd_kafka_ConfigEntry_new0( 3070 syn_name.str, 3071 RD_KAFKAP_STR_LEN(&syn_name), 3072 syn_value.str, 3073 RD_KAFKAP_STR_LEN(&syn_value)); 3074 if (!syn_entry) 3075 rd_kafka_buf_parse_fail( 3076 reply, 3077 "Broker returned invalid " 3078 "synonym #%d " 3079 "for ConfigEntry #%d (%s) " 3080 "and ConfigResource %d,%s: " 3081 "syn_name.len %d, " 3082 "syn_value.len %d", 3083 si, ci, entry->kv->name, 3084 config->restype, config->name, 3085 (int)syn_name.len, 3086 (int)syn_value.len); 3087 3088 syn_entry->a.source = syn_source; 3089 
syn_entry->a.is_synonym = 1; 3090 3091 rd_list_add(&entry->synonyms, syn_entry); 3092 } 3093 3094 rd_kafka_ConfigResource_add_ConfigEntry( 3095 config, entry); 3096 entry = NULL; 3097 } 3098 3099 /* As a convenience to the application we insert result 3100 * in the same order as they were requested. The broker 3101 * does not maintain ordering unfortunately. */ 3102 skel.restype = config->restype; 3103 skel.name = config->name; 3104 orig_pos = rd_list_index(&rko_req->rko_u.admin_request.args, 3105 &skel, rd_kafka_ConfigResource_cmp); 3106 if (orig_pos == -1) 3107 rd_kafka_buf_parse_fail( 3108 reply, 3109 "Broker returned ConfigResource %d,%s " 3110 "that was not " 3111 "included in the original request", 3112 res_type, res_name); 3113 3114 if (rd_list_elem(&rko_result->rko_u.admin_result.results, 3115 orig_pos) != NULL) 3116 rd_kafka_buf_parse_fail( 3117 reply, 3118 "Broker returned ConfigResource %d,%s " 3119 "multiple times", 3120 res_type, res_name); 3121 3122 rd_list_set(&rko_result->rko_u.admin_result.results, orig_pos, 3123 config); 3124 config = NULL; 3125 } 3126 3127 *rko_resultp = rko_result; 3128 3129 return RD_KAFKA_RESP_ERR_NO_ERROR; 3130 3131 err_parse: 3132 if (entry) 3133 rd_kafka_ConfigEntry_destroy(entry); 3134 if (config) 3135 rd_kafka_ConfigResource_destroy(config); 3136 3137 if (rko_result) 3138 rd_kafka_op_destroy(rko_result); 3139 3140 rd_snprintf(errstr, errstr_size, 3141 "DescribeConfigs response protocol parse failure: %s", 3142 rd_kafka_err2str(reply->rkbuf_err)); 3143 3144 return reply->rkbuf_err; 3145 } 3146 3147 3148 3149 void rd_kafka_DescribeConfigs (rd_kafka_t *rk, 3150 rd_kafka_ConfigResource_t **configs, 3151 size_t config_cnt, 3152 const rd_kafka_AdminOptions_t *options, 3153 rd_kafka_queue_t *rkqu) { 3154 rd_kafka_op_t *rko; 3155 size_t i; 3156 rd_kafka_resp_err_t err; 3157 char errstr[256]; 3158 static const struct rd_kafka_admin_worker_cbs cbs = { 3159 rd_kafka_DescribeConfigsRequest, 3160 
rd_kafka_DescribeConfigsResponse_parse, 3161 }; 3162 3163 rd_assert(rkqu); 3164 3165 rko = rd_kafka_admin_request_op_new( 3166 rk, 3167 RD_KAFKA_OP_DESCRIBECONFIGS, 3168 RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, 3169 &cbs, options, rkqu->rkqu_q); 3170 3171 rd_list_init(&rko->rko_u.admin_request.args, (int)config_cnt, 3172 rd_kafka_ConfigResource_free); 3173 3174 for (i = 0 ; i < config_cnt ; i++) 3175 rd_list_add(&rko->rko_u.admin_request.args, 3176 rd_kafka_ConfigResource_copy(configs[i])); 3177 3178 /* If there's a BROKER resource in the list we need to 3179 * speak directly to that broker rather than the controller. 3180 * 3181 * Multiple BROKER resources are not allowed. 3182 */ 3183 err = rd_kafka_ConfigResource_get_single_broker_id( 3184 &rko->rko_u.admin_request.args, 3185 &rko->rko_u.admin_request.broker_id, 3186 errstr, sizeof(errstr)); 3187 if (err) { 3188 rd_kafka_admin_result_fail(rko, err, "%s", errstr); 3189 rd_kafka_admin_common_worker_destroy(rk, rko, 3190 rd_true/*destroy*/); 3191 return; 3192 } 3193 3194 rd_kafka_q_enq(rk->rk_ops, rko); 3195 } 3196 3197 3198 3199 3200 const rd_kafka_ConfigResource_t ** 3201 rd_kafka_DescribeConfigs_result_resources ( 3202 const rd_kafka_DescribeConfigs_result_t *result, 3203 size_t *cntp) { 3204 return rd_kafka_admin_result_ret_resources( 3205 (const rd_kafka_op_t *)result, cntp); 3206 } 3207 3208 /**@}*/ 3209 3210 /** 3211 * @name Delete Records 3212 * @{ 3213 * 3214 * 3215 * 3216 * 3217 */ 3218 3219 rd_kafka_DeleteRecords_t * 3220 rd_kafka_DeleteRecords_new (const rd_kafka_topic_partition_list_t * 3221 before_offsets) { 3222 rd_kafka_DeleteRecords_t *del_records; 3223 3224 del_records = rd_calloc(1, sizeof(*del_records)); 3225 del_records->offsets = 3226 rd_kafka_topic_partition_list_copy(before_offsets); 3227 3228 return del_records; 3229 } 3230 3231 void rd_kafka_DeleteRecords_destroy (rd_kafka_DeleteRecords_t *del_records) { 3232 rd_kafka_topic_partition_list_destroy(del_records->offsets); 3233 
rd_free(del_records); 3234 } 3235 3236 void rd_kafka_DeleteRecords_destroy_array (rd_kafka_DeleteRecords_t ** 3237 del_records, 3238 size_t del_record_cnt) { 3239 size_t i; 3240 for (i = 0 ; i < del_record_cnt ; i++) 3241 rd_kafka_DeleteRecords_destroy(del_records[i]); 3242 } 3243 3244 3245 3246 /** @brief Merge the DeleteRecords response from a single broker 3247 * into the user response list. 3248 */ 3249 static void 3250 rd_kafka_DeleteRecords_response_merge (rd_kafka_op_t *rko_fanout, 3251 const rd_kafka_op_t *rko_partial) { 3252 rd_kafka_t *rk = rko_fanout->rko_rk; 3253 const rd_kafka_topic_partition_list_t *partitions; 3254 rd_kafka_topic_partition_list_t *respartitions; 3255 const rd_kafka_topic_partition_t *partition; 3256 3257 rd_assert(rko_partial->rko_evtype == 3258 RD_KAFKA_EVENT_DELETERECORDS_RESULT); 3259 3260 /* Partitions from the DeleteRecordsResponse */ 3261 partitions = rd_list_elem(&rko_partial->rko_u.admin_result.results, 0); 3262 3263 /* Partitions (offsets) from the DeleteRecords() call */ 3264 respartitions = rd_list_elem(&rko_fanout->rko_u.admin_request. 3265 fanout.results, 0); 3266 3267 RD_KAFKA_TPLIST_FOREACH(partition, partitions) { 3268 rd_kafka_topic_partition_t *respart; 3269 3270 3271 /* Find result partition */ 3272 respart = rd_kafka_topic_partition_list_find( 3273 respartitions, 3274 partition->topic, 3275 partition->partition); 3276 if (unlikely(!respart)) { 3277 rd_dassert(!*"partition not found"); 3278 3279 rd_kafka_log(rk, LOG_WARNING, "DELETERECORDS", 3280 "DeleteRecords response contains " 3281 "unexpected %s [%"PRId32"] which " 3282 "was not in the request list: ignored", 3283 partition->topic, partition->partition); 3284 continue; 3285 } 3286 3287 respart->offset = partition->offset; 3288 respart->err = partition->err; 3289 } 3290 } 3291 3292 3293 3294 /** 3295 * @brief Parse DeleteRecordsResponse and create ADMIN_RESULT op. 
3296 */ 3297 static rd_kafka_resp_err_t 3298 rd_kafka_DeleteRecordsResponse_parse (rd_kafka_op_t *rko_req, 3299 rd_kafka_op_t **rko_resultp, 3300 rd_kafka_buf_t *reply, 3301 char *errstr, size_t errstr_size) { 3302 const int log_decode_errors = LOG_ERR; 3303 rd_kafka_op_t *rko_result; 3304 rd_kafka_topic_partition_list_t *offsets; 3305 3306 rd_kafka_buf_read_throttle_time(reply); 3307 3308 offsets = rd_kafka_buf_read_topic_partitions(reply, 0, 3309 rd_true/*read_offset*/, 3310 rd_true/*read_part_errs*/); 3311 if (!offsets) 3312 rd_kafka_buf_parse_fail(reply, 3313 "Failed to parse topic partitions"); 3314 3315 3316 rko_result = rd_kafka_admin_result_new(rko_req); 3317 rd_list_init(&rko_result->rko_u.admin_result.results, 1, 3318 rd_kafka_topic_partition_list_destroy_free); 3319 rd_list_add(&rko_result->rko_u.admin_result.results, offsets); 3320 *rko_resultp = rko_result; 3321 return RD_KAFKA_RESP_ERR_NO_ERROR; 3322 3323 err_parse: 3324 rd_snprintf(errstr, errstr_size, 3325 "DeleteRecords response protocol parse failure: %s", 3326 rd_kafka_err2str(reply->rkbuf_err)); 3327 3328 return reply->rkbuf_err; 3329 } 3330 3331 3332 /** 3333 * @brief Call when leaders have been queried to progress the DeleteRecords 3334 * admin op to its next phase, sending DeleteRecords to partition 3335 * leaders. 3336 * 3337 * @param rko Reply op (RD_KAFKA_OP_LEADERS). 
3338 */ 3339 static rd_kafka_op_res_t 3340 rd_kafka_DeleteRecords_leaders_queried_cb (rd_kafka_t *rk, 3341 rd_kafka_q_t *rkq, 3342 rd_kafka_op_t *reply) { 3343 rd_kafka_resp_err_t err = reply->rko_err; 3344 const rd_list_t *leaders = 3345 reply->rko_u.leaders.leaders; /* Possibly NULL (on err) */ 3346 rd_kafka_topic_partition_list_t *partitions = 3347 reply->rko_u.leaders.partitions; /* Possibly NULL (on err) */ 3348 rd_kafka_op_t *rko_fanout = reply->rko_u.leaders.opaque; 3349 rd_kafka_topic_partition_t *rktpar; 3350 rd_kafka_topic_partition_list_t *offsets; 3351 const struct rd_kafka_partition_leader *leader; 3352 static const struct rd_kafka_admin_worker_cbs cbs = { 3353 rd_kafka_DeleteRecordsRequest, 3354 rd_kafka_DeleteRecordsResponse_parse, 3355 }; 3356 int i; 3357 3358 rd_assert((rko_fanout->rko_type & ~RD_KAFKA_OP_FLAGMASK) == 3359 RD_KAFKA_OP_ADMIN_FANOUT); 3360 3361 if (err == RD_KAFKA_RESP_ERR__DESTROY) 3362 goto err; 3363 3364 /* Requested offsets */ 3365 offsets = rd_list_elem(&rko_fanout->rko_u.admin_request.args, 0); 3366 3367 /* Update the error field of each partition from the 3368 * leader-queried partition list so that ERR_UNKNOWN_TOPIC_OR_PART 3369 * and similar are propagated, since those partitions are not 3370 * included in the leaders list. */ 3371 RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) { 3372 rd_kafka_topic_partition_t *rktpar2; 3373 3374 if (!rktpar->err) 3375 continue; 3376 3377 rktpar2 = rd_kafka_topic_partition_list_find( 3378 offsets, rktpar->topic, rktpar->partition); 3379 rd_assert(rktpar2); 3380 rktpar2->err = rktpar->err; 3381 } 3382 3383 3384 if (err) { 3385 err: 3386 rd_kafka_admin_result_fail( 3387 rko_fanout, 3388 err, 3389 "Failed to query partition leaders: %s", 3390 err == RD_KAFKA_RESP_ERR__NOENT ? 
3391 "No leaders found" : rd_kafka_err2str(err)); 3392 rd_kafka_admin_common_worker_destroy(rk, rko_fanout, 3393 rd_true/*destroy*/); 3394 return RD_KAFKA_OP_RES_HANDLED; 3395 } 3396 3397 /* The response lists is one element deep and that element is a 3398 * rd_kafka_topic_partition_list_t with the results of the deletes. */ 3399 rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, 1, 3400 rd_kafka_topic_partition_list_destroy_free); 3401 rd_list_add(&rko_fanout->rko_u.admin_request.fanout.results, 3402 rd_kafka_topic_partition_list_copy(offsets)); 3403 3404 rko_fanout->rko_u.admin_request.fanout.outstanding = 3405 rd_list_cnt(leaders); 3406 3407 rd_assert(rd_list_cnt(leaders) > 0); 3408 3409 /* For each leader send a request for its partitions */ 3410 RD_LIST_FOREACH(leader, leaders, i) { 3411 rd_kafka_op_t *rko = 3412 rd_kafka_admin_request_op_new( 3413 rk, 3414 RD_KAFKA_OP_DELETERECORDS, 3415 RD_KAFKA_EVENT_DELETERECORDS_RESULT, 3416 &cbs, &rko_fanout->rko_u.admin_request.options, 3417 rk->rk_ops); 3418 rko->rko_u.admin_request.fanout_parent = rko_fanout; 3419 rko->rko_u.admin_request.broker_id = leader->rkb->rkb_nodeid; 3420 3421 rd_kafka_topic_partition_list_sort_by_topic(leader->partitions); 3422 3423 rd_list_init(&rko->rko_u.admin_request.args, 1, 3424 rd_kafka_topic_partition_list_destroy_free); 3425 rd_list_add(&rko->rko_u.admin_request.args, 3426 rd_kafka_topic_partition_list_copy( 3427 leader->partitions)); 3428 3429 /* Enqueue op for admin_worker() to transition to next state */ 3430 rd_kafka_q_enq(rk->rk_ops, rko); 3431 } 3432 3433 return RD_KAFKA_OP_RES_HANDLED; 3434 } 3435 3436 3437 void rd_kafka_DeleteRecords (rd_kafka_t *rk, 3438 rd_kafka_DeleteRecords_t **del_records, 3439 size_t del_record_cnt, 3440 const rd_kafka_AdminOptions_t *options, 3441 rd_kafka_queue_t *rkqu) { 3442 rd_kafka_op_t *rko_fanout; 3443 static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { 3444 rd_kafka_DeleteRecords_response_merge, 3445 
rd_kafka_topic_partition_list_copy_opaque, 3446 }; 3447 const rd_kafka_topic_partition_list_t *offsets; 3448 rd_kafka_topic_partition_list_t *copied_offsets; 3449 3450 rd_assert(rkqu); 3451 3452 rko_fanout = rd_kafka_admin_fanout_op_new( 3453 rk, 3454 RD_KAFKA_OP_DELETERECORDS, 3455 RD_KAFKA_EVENT_DELETERECORDS_RESULT, 3456 &fanout_cbs, options, rkqu->rkqu_q); 3457 3458 if (del_record_cnt != 1) { 3459 /* We only support one DeleteRecords per call since there 3460 * is no point in passing multiples, but the API still 3461 * needs to be extensible/future-proof. */ 3462 rd_kafka_admin_result_fail(rko_fanout, 3463 RD_KAFKA_RESP_ERR__INVALID_ARG, 3464 "Exactly one DeleteRecords must be " 3465 "passed"); 3466 rd_kafka_admin_common_worker_destroy(rk, rko_fanout, 3467 rd_true/*destroy*/); 3468 return; 3469 } 3470 3471 offsets = del_records[0]->offsets; 3472 3473 if (offsets == NULL || offsets->cnt == 0) { 3474 rd_kafka_admin_result_fail(rko_fanout, 3475 RD_KAFKA_RESP_ERR__INVALID_ARG, 3476 "No records to delete"); 3477 rd_kafka_admin_common_worker_destroy(rk, rko_fanout, 3478 rd_true/*destroy*/); 3479 return; 3480 } 3481 3482 /* Copy offsets list and store it on the request op */ 3483 copied_offsets = rd_kafka_topic_partition_list_copy(offsets); 3484 if (rd_kafka_topic_partition_list_has_duplicates( 3485 copied_offsets, rd_false/*check partition*/)) { 3486 rd_kafka_topic_partition_list_destroy(copied_offsets); 3487 rd_kafka_admin_result_fail(rko_fanout, 3488 RD_KAFKA_RESP_ERR__INVALID_ARG, 3489 "Duplicate partitions not allowed"); 3490 rd_kafka_admin_common_worker_destroy(rk, rko_fanout, 3491 rd_true/*destroy*/); 3492 return; 3493 } 3494 3495 /* Set default error on each partition so that if any of the partitions 3496 * never get a request sent we have an error to indicate it. 
*/ 3497 rd_kafka_topic_partition_list_set_err(copied_offsets, 3498 RD_KAFKA_RESP_ERR__NOOP); 3499 3500 rd_list_init(&rko_fanout->rko_u.admin_request.args, 1, 3501 rd_kafka_topic_partition_list_destroy_free); 3502 rd_list_add(&rko_fanout->rko_u.admin_request.args, copied_offsets); 3503 3504 /* Async query for partition leaders */ 3505 rd_kafka_topic_partition_list_query_leaders_async( 3506 rk, copied_offsets, 3507 rd_kafka_admin_timeout_remains(rko_fanout), 3508 RD_KAFKA_REPLYQ(rk->rk_ops, 0), 3509 rd_kafka_DeleteRecords_leaders_queried_cb, 3510 rko_fanout); 3511 } 3512 3513 3514 /** 3515 * @brief Get the list of offsets from a DeleteRecords result. 3516 * 3517 * The returned \p offsets life-time is the same as the \p result object. 3518 */ 3519 const rd_kafka_topic_partition_list_t * 3520 rd_kafka_DeleteRecords_result_offsets ( 3521 const rd_kafka_DeleteRecords_result_t *result) { 3522 const rd_kafka_topic_partition_list_t *offsets; 3523 const rd_kafka_op_t *rko = (const rd_kafka_op_t *) result; 3524 size_t cnt; 3525 3526 rd_kafka_op_type_t reqtype = 3527 rko->rko_u.admin_result.reqtype & ~RD_KAFKA_OP_FLAGMASK; 3528 rd_assert(reqtype == RD_KAFKA_OP_DELETERECORDS); 3529 3530 cnt = rd_list_cnt(&rko->rko_u.admin_result.results); 3531 3532 rd_assert(cnt == 1); 3533 3534 offsets = (const rd_kafka_topic_partition_list_t *) 3535 rd_list_elem(&rko->rko_u.admin_result.results, 0); 3536 3537 rd_assert(offsets); 3538 3539 return offsets; 3540 } 3541 3542 /**@}*/ 3543 3544 /** 3545 * @name Delete groups 3546 * @{ 3547 * 3548 * 3549 * 3550 * 3551 */ 3552 3553 rd_kafka_DeleteGroup_t *rd_kafka_DeleteGroup_new (const char *group) { 3554 size_t tsize = strlen(group) + 1; 3555 rd_kafka_DeleteGroup_t *del_group; 3556 3557 /* Single allocation */ 3558 del_group = rd_malloc(sizeof(*del_group) + tsize); 3559 del_group->group = del_group->data; 3560 memcpy(del_group->group, group, tsize); 3561 3562 return del_group; 3563 } 3564 3565 void rd_kafka_DeleteGroup_destroy 
(rd_kafka_DeleteGroup_t *del_group) { 3566 rd_free(del_group); 3567 } 3568 3569 static void rd_kafka_DeleteGroup_free (void *ptr) { 3570 rd_kafka_DeleteGroup_destroy(ptr); 3571 } 3572 3573 void rd_kafka_DeleteGroup_destroy_array (rd_kafka_DeleteGroup_t **del_groups, 3574 size_t del_group_cnt) { 3575 size_t i; 3576 for (i = 0 ; i < del_group_cnt ; i++) 3577 rd_kafka_DeleteGroup_destroy(del_groups[i]); 3578 } 3579 3580 /** 3581 * @brief Group name comparator for DeleteGroup_t 3582 */ 3583 static int rd_kafka_DeleteGroup_cmp (const void *_a, const void *_b) { 3584 const rd_kafka_DeleteGroup_t *a = _a, *b = _b; 3585 return strcmp(a->group, b->group); 3586 } 3587 3588 /** 3589 * @brief Allocate a new DeleteGroup and make a copy of \p src 3590 */ 3591 static rd_kafka_DeleteGroup_t * 3592 rd_kafka_DeleteGroup_copy (const rd_kafka_DeleteGroup_t *src) { 3593 return rd_kafka_DeleteGroup_new(src->group); 3594 } 3595 3596 3597 /** 3598 * @brief Parse DeleteGroupsResponse and create ADMIN_RESULT op. 
3599 */ 3600 static rd_kafka_resp_err_t 3601 rd_kafka_DeleteGroupsResponse_parse (rd_kafka_op_t *rko_req, 3602 rd_kafka_op_t **rko_resultp, 3603 rd_kafka_buf_t *reply, 3604 char *errstr, size_t errstr_size) { 3605 const int log_decode_errors = LOG_ERR; 3606 int32_t group_cnt; 3607 int i; 3608 rd_kafka_op_t *rko_result = NULL; 3609 3610 rd_kafka_buf_read_throttle_time(reply); 3611 3612 /* #group_error_codes */ 3613 rd_kafka_buf_read_i32(reply, &group_cnt); 3614 3615 if (group_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) 3616 rd_kafka_buf_parse_fail( 3617 reply, 3618 "Received %"PRId32" groups in response " 3619 "when only %d were requested", group_cnt, 3620 rd_list_cnt(&rko_req->rko_u.admin_request.args)); 3621 3622 rko_result = rd_kafka_admin_result_new(rko_req); 3623 rd_list_init(&rko_result->rko_u.admin_result.results, 3624 group_cnt, 3625 rd_kafka_group_result_free); 3626 3627 for (i = 0 ; i < (int)group_cnt ; i++) { 3628 rd_kafkap_str_t kgroup; 3629 int16_t error_code; 3630 rd_kafka_group_result_t *groupres; 3631 3632 rd_kafka_buf_read_str(reply, &kgroup); 3633 rd_kafka_buf_read_i16(reply, &error_code); 3634 3635 groupres = rd_kafka_group_result_new( 3636 kgroup.str, 3637 RD_KAFKAP_STR_LEN(&kgroup), 3638 NULL, 3639 error_code ? 3640 rd_kafka_error_new(error_code, NULL) : NULL); 3641 3642 rd_list_add(&rko_result->rko_u.admin_result.results, groupres); 3643 } 3644 3645 *rko_resultp = rko_result; 3646 return RD_KAFKA_RESP_ERR_NO_ERROR; 3647 3648 err_parse: 3649 if (rko_result) 3650 rd_kafka_op_destroy(rko_result); 3651 3652 rd_snprintf(errstr, errstr_size, 3653 "DeleteGroups response protocol parse failure: %s", 3654 rd_kafka_err2str(reply->rkbuf_err)); 3655 3656 return reply->rkbuf_err; 3657 } 3658 3659 /** @brief Merge the DeleteGroups response from a single broker 3660 * into the user response list. 
3661 */ 3662 void rd_kafka_DeleteGroups_response_merge (rd_kafka_op_t *rko_fanout, 3663 const rd_kafka_op_t *rko_partial) { 3664 const rd_kafka_group_result_t *groupres = NULL; 3665 rd_kafka_group_result_t *newgroupres; 3666 const rd_kafka_DeleteGroup_t *grp = 3667 rko_partial->rko_u.admin_result.opaque; 3668 int orig_pos; 3669 3670 rd_assert(rko_partial->rko_evtype == 3671 RD_KAFKA_EVENT_DELETEGROUPS_RESULT); 3672 3673 if (!rko_partial->rko_err) { 3674 /* Proper results. 3675 * We only send one group per request, make sure it matches */ 3676 groupres = rd_list_elem(&rko_partial->rko_u.admin_result. 3677 results, 0); 3678 rd_assert(groupres); 3679 rd_assert(!strcmp(groupres->group, grp->group)); 3680 newgroupres = rd_kafka_group_result_copy(groupres); 3681 } else { 3682 /* Op errored, e.g. timeout */ 3683 newgroupres = rd_kafka_group_result_new( 3684 grp->group, -1, NULL, 3685 rd_kafka_error_new(rko_partial->rko_err, NULL)); 3686 } 3687 3688 /* As a convenience to the application we insert group result 3689 * in the same order as they were requested. */ 3690 orig_pos = rd_list_index(&rko_fanout->rko_u.admin_request.args, 3691 grp, rd_kafka_DeleteGroup_cmp); 3692 rd_assert(orig_pos != -1); 3693 3694 /* Make sure result is not already set */ 3695 rd_assert(rd_list_elem(&rko_fanout->rko_u.admin_request. 
3696 fanout.results, orig_pos) == NULL); 3697 3698 rd_list_set(&rko_fanout->rko_u.admin_request.fanout.results, 3699 orig_pos, newgroupres); 3700 } 3701 3702 void rd_kafka_DeleteGroups (rd_kafka_t *rk, 3703 rd_kafka_DeleteGroup_t **del_groups, 3704 size_t del_group_cnt, 3705 const rd_kafka_AdminOptions_t *options, 3706 rd_kafka_queue_t *rkqu) { 3707 rd_kafka_op_t *rko_fanout; 3708 rd_list_t dup_list; 3709 size_t i; 3710 static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { 3711 rd_kafka_DeleteGroups_response_merge, 3712 rd_kafka_group_result_copy_opaque, 3713 }; 3714 3715 rd_assert(rkqu); 3716 3717 rko_fanout = rd_kafka_admin_fanout_op_new( 3718 rk, 3719 RD_KAFKA_OP_DELETEGROUPS, 3720 RD_KAFKA_EVENT_DELETEGROUPS_RESULT, 3721 &fanout_cbs, options, rkqu->rkqu_q); 3722 3723 if (del_group_cnt == 0) { 3724 rd_kafka_admin_result_fail(rko_fanout, 3725 RD_KAFKA_RESP_ERR__INVALID_ARG, 3726 "No groups to delete"); 3727 rd_kafka_admin_common_worker_destroy(rk, rko_fanout, 3728 rd_true/*destroy*/); 3729 return; 3730 } 3731 3732 /* Copy group list and store it on the request op. 3733 * Maintain original ordering. */ 3734 rd_list_init(&rko_fanout->rko_u.admin_request.args, 3735 (int)del_group_cnt, 3736 rd_kafka_DeleteGroup_free); 3737 for (i = 0; i < del_group_cnt; i++) 3738 rd_list_add(&rko_fanout->rko_u.admin_request.args, 3739 rd_kafka_DeleteGroup_copy(del_groups[i])); 3740 3741 /* Check for duplicates. 3742 * Make a temporary copy of the group list and sort it to check for 3743 * duplicates, we don't want the original list sorted since we want 3744 * to maintain ordering. 
*/ 3745 rd_list_init(&dup_list, 3746 rd_list_cnt(&rko_fanout->rko_u.admin_request.args), 3747 NULL); 3748 rd_list_copy_to(&dup_list, 3749 &rko_fanout->rko_u.admin_request.args, 3750 NULL, NULL); 3751 rd_list_sort(&dup_list, rd_kafka_DeleteGroup_cmp); 3752 if (rd_list_find_duplicate(&dup_list, rd_kafka_DeleteGroup_cmp)) { 3753 rd_list_destroy(&dup_list); 3754 rd_kafka_admin_result_fail(rko_fanout, 3755 RD_KAFKA_RESP_ERR__INVALID_ARG, 3756 "Duplicate groups not allowed"); 3757 rd_kafka_admin_common_worker_destroy(rk, rko_fanout, 3758 rd_true/*destroy*/); 3759 return; 3760 } 3761 3762 rd_list_destroy(&dup_list); 3763 3764 /* Prepare results list where fanned out op's results will be 3765 * accumulated. */ 3766 rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, 3767 (int)del_group_cnt, 3768 rd_kafka_group_result_free); 3769 rko_fanout->rko_u.admin_request.fanout.outstanding = (int)del_group_cnt; 3770 3771 /* Create individual request ops for each group. 3772 * FIXME: A future optimization is to coalesce all groups for a single 3773 * coordinator into one op. */ 3774 for (i = 0; i < del_group_cnt; i++) { 3775 static const struct rd_kafka_admin_worker_cbs cbs = { 3776 rd_kafka_DeleteGroupsRequest, 3777 rd_kafka_DeleteGroupsResponse_parse, 3778 }; 3779 rd_kafka_DeleteGroup_t *grp = rd_list_elem( 3780 &rko_fanout->rko_u.admin_request.args, (int)i); 3781 rd_kafka_op_t *rko = 3782 rd_kafka_admin_request_op_new( 3783 rk, 3784 RD_KAFKA_OP_DELETEGROUPS, 3785 RD_KAFKA_EVENT_DELETEGROUPS_RESULT, 3786 &cbs, 3787 options, 3788 rk->rk_ops); 3789 3790 rko->rko_u.admin_request.fanout_parent = rko_fanout; 3791 rko->rko_u.admin_request.broker_id = 3792 RD_KAFKA_ADMIN_TARGET_COORDINATOR; 3793 rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; 3794 rko->rko_u.admin_request.coordkey = rd_strdup(grp->group); 3795 3796 /* Set the group name as the opaque so the fanout worker use it 3797 * to fill in errors. 
3798 * References rko_fanout's memory, which will always outlive 3799 * the fanned out op. */ 3800 rd_kafka_AdminOptions_set_opaque( 3801 &rko->rko_u.admin_request.options, grp); 3802 3803 rd_list_init(&rko->rko_u.admin_request.args, 1, 3804 rd_kafka_DeleteGroup_free); 3805 rd_list_add(&rko->rko_u.admin_request.args, 3806 rd_kafka_DeleteGroup_copy(del_groups[i])); 3807 3808 rd_kafka_q_enq(rk->rk_ops, rko); 3809 } 3810 } 3811 3812 3813 /** 3814 * @brief Get an array of group results from a DeleteGroups result. 3815 * 3816 * The returned \p groups life-time is the same as the \p result object. 3817 * @param cntp is updated to the number of elements in the array. 3818 */ 3819 const rd_kafka_group_result_t ** 3820 rd_kafka_DeleteGroups_result_groups ( 3821 const rd_kafka_DeleteGroups_result_t *result, 3822 size_t *cntp) { 3823 return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result, 3824 cntp); 3825 } 3826 3827 3828 /**@}*/ 3829 3830 3831 /** 3832 * @name Delete consumer group offsets (committed offsets) 3833 * @{ 3834 * 3835 * 3836 * 3837 * 3838 */ 3839 3840 rd_kafka_DeleteConsumerGroupOffsets_t * 3841 rd_kafka_DeleteConsumerGroupOffsets_new (const char *group, 3842 const rd_kafka_topic_partition_list_t 3843 *partitions) { 3844 size_t tsize = strlen(group) + 1; 3845 rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets; 3846 3847 rd_assert(partitions); 3848 3849 /* Single allocation */ 3850 del_grpoffsets = rd_malloc(sizeof(*del_grpoffsets) + tsize); 3851 del_grpoffsets->group = del_grpoffsets->data; 3852 memcpy(del_grpoffsets->group, group, tsize); 3853 del_grpoffsets->partitions = 3854 rd_kafka_topic_partition_list_copy(partitions); 3855 3856 return del_grpoffsets; 3857 } 3858 3859 void rd_kafka_DeleteConsumerGroupOffsets_destroy ( 3860 rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets) { 3861 rd_kafka_topic_partition_list_destroy(del_grpoffsets->partitions); 3862 rd_free(del_grpoffsets); 3863 } 3864 3865 static void 
rd_kafka_DeleteConsumerGroupOffsets_free (void *ptr) { 3866 rd_kafka_DeleteConsumerGroupOffsets_destroy(ptr); 3867 } 3868 3869 void rd_kafka_DeleteConsumerGroupOffsets_destroy_array ( 3870 rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets, 3871 size_t del_grpoffsets_cnt) { 3872 size_t i; 3873 for (i = 0 ; i < del_grpoffsets_cnt ; i++) 3874 rd_kafka_DeleteConsumerGroupOffsets_destroy(del_grpoffsets[i]); 3875 } 3876 3877 3878 /** 3879 * @brief Allocate a new DeleteGroup and make a copy of \p src 3880 */ 3881 static rd_kafka_DeleteConsumerGroupOffsets_t * 3882 rd_kafka_DeleteConsumerGroupOffsets_copy ( 3883 const rd_kafka_DeleteConsumerGroupOffsets_t *src) { 3884 return rd_kafka_DeleteConsumerGroupOffsets_new(src->group, 3885 src->partitions); 3886 } 3887 3888 3889 /** 3890 * @brief Parse OffsetDeleteResponse and create ADMIN_RESULT op. 3891 */ 3892 static rd_kafka_resp_err_t 3893 rd_kafka_OffsetDeleteResponse_parse (rd_kafka_op_t *rko_req, 3894 rd_kafka_op_t **rko_resultp, 3895 rd_kafka_buf_t *reply, 3896 char *errstr, size_t errstr_size) { 3897 const int log_decode_errors = LOG_ERR; 3898 rd_kafka_op_t *rko_result; 3899 int16_t ErrorCode; 3900 rd_kafka_topic_partition_list_t *partitions = NULL; 3901 const rd_kafka_DeleteConsumerGroupOffsets_t *del_grpoffsets = 3902 rd_list_elem(&rko_req->rko_u.admin_request.args, 0); 3903 3904 rd_kafka_buf_read_i16(reply, &ErrorCode); 3905 if (ErrorCode) { 3906 rd_snprintf(errstr, errstr_size, 3907 "OffsetDelete response error: %s", 3908 rd_kafka_err2str(ErrorCode)); 3909 return ErrorCode; 3910 } 3911 3912 rd_kafka_buf_read_throttle_time(reply); 3913 3914 partitions = rd_kafka_buf_read_topic_partitions(reply, 3915 16, 3916 rd_false/*no offset */, 3917 rd_true/*read error*/); 3918 if (!partitions) { 3919 rd_snprintf(errstr, errstr_size, 3920 "Failed to parse OffsetDeleteResponse partitions"); 3921 return RD_KAFKA_RESP_ERR__BAD_MSG; 3922 } 3923 3924 3925 /* Create result op and group_result_t */ 3926 rko_result = 
rd_kafka_admin_result_new(rko_req); 3927 rd_list_init(&rko_result->rko_u.admin_result.results, 1, 3928 rd_kafka_group_result_free); 3929 rd_list_add(&rko_result->rko_u.admin_result.results, 3930 rd_kafka_group_result_new(del_grpoffsets->group, -1, 3931 partitions, NULL)); 3932 rd_kafka_topic_partition_list_destroy(partitions); 3933 3934 *rko_resultp = rko_result; 3935 3936 return RD_KAFKA_RESP_ERR_NO_ERROR; 3937 3938 err_parse: 3939 rd_snprintf(errstr, errstr_size, 3940 "OffsetDelete response protocol parse failure: %s", 3941 rd_kafka_err2str(reply->rkbuf_err)); 3942 return reply->rkbuf_err; 3943 } 3944 3945 3946 void rd_kafka_DeleteConsumerGroupOffsets ( 3947 rd_kafka_t *rk, 3948 rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets, 3949 size_t del_grpoffsets_cnt, 3950 const rd_kafka_AdminOptions_t *options, 3951 rd_kafka_queue_t *rkqu) { 3952 static const struct rd_kafka_admin_worker_cbs cbs = { 3953 rd_kafka_OffsetDeleteRequest, 3954 rd_kafka_OffsetDeleteResponse_parse, 3955 }; 3956 rd_kafka_op_t *rko; 3957 3958 rd_assert(rkqu); 3959 3960 rko = rd_kafka_admin_request_op_new( 3961 rk, 3962 RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS, 3963 RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT, 3964 &cbs, options, rkqu->rkqu_q); 3965 3966 if (del_grpoffsets_cnt != 1) { 3967 /* For simplicity we only support one single group for now */ 3968 rd_kafka_admin_result_fail(rko, 3969 RD_KAFKA_RESP_ERR__INVALID_ARG, 3970 "Exactly one " 3971 "DeleteConsumerGroupOffsets must " 3972 "be passed"); 3973 rd_kafka_admin_common_worker_destroy(rk, rko, 3974 rd_true/*destroy*/); 3975 return; 3976 } 3977 3978 3979 rko->rko_u.admin_request.broker_id = 3980 RD_KAFKA_ADMIN_TARGET_COORDINATOR; 3981 rko->rko_u.admin_request.coordtype = RD_KAFKA_COORD_GROUP; 3982 rko->rko_u.admin_request.coordkey = 3983 rd_strdup(del_grpoffsets[0]->group); 3984 3985 /* Store copy of group on request so the group name can be reached 3986 * from the response parser. 
*/ 3987 rd_list_init(&rko->rko_u.admin_request.args, 1, 3988 rd_kafka_DeleteConsumerGroupOffsets_free); 3989 rd_list_add(&rko->rko_u.admin_request.args, 3990 rd_kafka_DeleteConsumerGroupOffsets_copy( 3991 del_grpoffsets[0])); 3992 3993 rd_kafka_q_enq(rk->rk_ops, rko); 3994 } 3995 3996 3997 /** 3998 * @brief Get an array of group results from a DeleteGroups result. 3999 * 4000 * The returned \p groups life-time is the same as the \p result object. 4001 * @param cntp is updated to the number of elements in the array. 4002 */ 4003 const rd_kafka_group_result_t ** 4004 rd_kafka_DeleteConsumerGroupOffsets_result_groups ( 4005 const rd_kafka_DeleteConsumerGroupOffsets_result_t *result, 4006 size_t *cntp) { 4007 return rd_kafka_admin_result_ret_groups((const rd_kafka_op_t *)result, 4008 cntp); 4009 } 4010 4011 RD_EXPORT 4012 void rd_kafka_DeleteConsumerGroupOffsets ( 4013 rd_kafka_t *rk, 4014 rd_kafka_DeleteConsumerGroupOffsets_t **del_grpoffsets, 4015 size_t del_grpoffsets_cnt, 4016 const rd_kafka_AdminOptions_t *options, 4017 rd_kafka_queue_t *rkqu); 4018 4019 /**@}*/ 4020