1 /* 2 * librdkafka - Apache Kafka C library 3 * 4 * Copyright (c) 2012-2015, Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 #ifndef _RDKAFKA_OP_H_ 29 #define _RDKAFKA_OP_H_ 30 31 32 #include "rdkafka_msg.h" 33 #include "rdkafka_timer.h" 34 #include "rdkafka_admin.h" 35 36 37 /* Forward declarations */ 38 typedef struct rd_kafka_q_s rd_kafka_q_t; 39 typedef struct rd_kafka_toppar_s rd_kafka_toppar_t; 40 typedef struct rd_kafka_op_s rd_kafka_op_t; 41 42 /* One-off reply queue + reply version. 43 * All APIs that take a rd_kafka_replyq_t makes a copy of the 44 * struct as-is and grabs hold of the existing .q refcount. 45 * Think of replyq as a (Q,VERSION) tuple. */ 46 typedef struct rd_kafka_replyq_s { 47 rd_kafka_q_t *q; 48 int32_t version; 49 #if ENABLE_DEVEL 50 char *_id; /* Devel id used for debugging reference leaks. 51 * Is a strdup() of the caller's function name, 52 * which makes for easy debugging with valgrind. */ 53 #endif 54 } rd_kafka_replyq_t; 55 56 57 58 59 /** 60 * Flags used by: 61 * - rd_kafka_op_t.rko_flags 62 * - rd_kafka_buf_t.rkbuf_flags 63 */ 64 #define RD_KAFKA_OP_F_FREE 0x1 /* rd_free payload when done with it */ 65 #define RD_KAFKA_OP_F_NO_RESPONSE 0x2 /* rkbuf: Not expecting a response */ 66 #define RD_KAFKA_OP_F_CRC 0x4 /* rkbuf: Perform CRC calculation */ 67 #define RD_KAFKA_OP_F_BLOCKING 0x8 /* rkbuf: blocking protocol request */ 68 #define RD_KAFKA_OP_F_REPROCESS 0x10 /* cgrp: Reprocess at a later time. */ 69 #define RD_KAFKA_OP_F_SENT 0x20 /* rkbuf: request sent on wire */ 70 #define RD_KAFKA_OP_F_FLEXVER 0x40 /* rkbuf: flexible protocol version 71 * (KIP-482) */ 72 #define RD_KAFKA_OP_F_NEED_MAKE 0x80 /* rkbuf: request content has not 73 * been made yet, the make 74 * callback will be triggered 75 * to construct the request 76 * right before it is sent. */ 77 78 typedef enum { 79 RD_KAFKA_OP_NONE, /* No specific type, use OP_CB */ 80 RD_KAFKA_OP_FETCH, /* Kafka thread -> Application */ 81 RD_KAFKA_OP_ERR, /* Kafka thread -> Application */ 82 RD_KAFKA_OP_CONSUMER_ERR, /* Kafka thread -> Application */ 83 RD_KAFKA_OP_DR, /* Kafka thread -> Application 84 * Produce message delivery report */ 85 RD_KAFKA_OP_STATS, /* Kafka thread -> Application */ 86 87 RD_KAFKA_OP_OFFSET_COMMIT, /* any -> toppar's Broker thread */ 88 RD_KAFKA_OP_NODE_UPDATE, /* any -> Broker thread: node update */ 89 90 RD_KAFKA_OP_XMIT_BUF, /* transmit buffer: any -> broker thread */ 91 RD_KAFKA_OP_RECV_BUF, /* received response buffer: broker thr -> any */ 92 RD_KAFKA_OP_XMIT_RETRY, /* retry buffer xmit: any -> broker thread */ 93 RD_KAFKA_OP_FETCH_START, /* Application -> toppar's handler thread */ 94 RD_KAFKA_OP_FETCH_STOP, /* Application -> toppar's handler thread */ 95 RD_KAFKA_OP_SEEK, /* Application -> toppar's handler thread */ 96 RD_KAFKA_OP_PAUSE, /* Application -> toppar's handler thread */ 97 RD_KAFKA_OP_OFFSET_FETCH, /* Broker -> broker thread: fetch offsets 98 * for topic. */ 99 100 RD_KAFKA_OP_PARTITION_JOIN, /* * -> cgrp op: add toppar to cgrp 101 * * -> broker op: add toppar to broker */ 102 RD_KAFKA_OP_PARTITION_LEAVE, /* * -> cgrp op: remove toppar from cgrp 103 * * -> broker op: remove toppar from rkb*/ 104 RD_KAFKA_OP_REBALANCE, /* broker thread -> app: 105 * group rebalance */ 106 RD_KAFKA_OP_TERMINATE, /* For generic use */ 107 RD_KAFKA_OP_COORD_QUERY, /* Query for coordinator */ 108 RD_KAFKA_OP_SUBSCRIBE, /* New subscription */ 109 RD_KAFKA_OP_ASSIGN, /* New assignment */ 110 RD_KAFKA_OP_GET_SUBSCRIPTION,/* Get current subscription. 111 * Reuses u.subscribe */ 112 RD_KAFKA_OP_GET_ASSIGNMENT, /* Get current assignment. 113 * Reuses u.assign */ 114 RD_KAFKA_OP_THROTTLE, /* Throttle info */ 115 RD_KAFKA_OP_NAME, /* Request name */ 116 RD_KAFKA_OP_CG_METADATA, /**< Request consumer metadata */ 117 RD_KAFKA_OP_OFFSET_RESET, /* Offset reset */ 118 RD_KAFKA_OP_METADATA, /* Metadata response */ 119 RD_KAFKA_OP_LOG, /* Log */ 120 RD_KAFKA_OP_WAKEUP, /* Wake-up signaling */ 121 RD_KAFKA_OP_CREATETOPICS, /**< Admin: CreateTopics: u.admin_request*/ 122 RD_KAFKA_OP_DELETETOPICS, /**< Admin: DeleteTopics: u.admin_request*/ 123 RD_KAFKA_OP_CREATEPARTITIONS,/**< Admin: CreatePartitions: 124 * u.admin_request*/ 125 RD_KAFKA_OP_ALTERCONFIGS, /**< Admin: AlterConfigs: u.admin_request*/ 126 RD_KAFKA_OP_DESCRIBECONFIGS, /**< Admin: DescribeConfigs: 127 * u.admin_request*/ 128 RD_KAFKA_OP_DELETERECORDS, /**< Admin: DeleteRecords: 129 * u.admin_request*/ 130 RD_KAFKA_OP_DELETEGROUPS, /**< Admin: DeleteGroups: u.admin_request*/ 131 RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS, /**< Admin: 132 * DeleteConsumerGroupOffsets 133 * u.admin_request */ 134 RD_KAFKA_OP_ADMIN_FANOUT, /**< Admin: fanout request */ 135 RD_KAFKA_OP_ADMIN_RESULT, /**< Admin API .._result_t */ 136 RD_KAFKA_OP_PURGE, /**< Purge queues */ 137 RD_KAFKA_OP_CONNECT, /**< Connect (to broker) */ 138 RD_KAFKA_OP_OAUTHBEARER_REFRESH, /**< Refresh OAUTHBEARER token */ 139 RD_KAFKA_OP_MOCK, /**< Mock cluster command */ 140 RD_KAFKA_OP_BROKER_MONITOR, /**< Broker state change */ 141 RD_KAFKA_OP_TXN, /**< Transaction command */ 142 RD_KAFKA_OP_GET_REBALANCE_PROTOCOL, /**< Get rebalance protocol */ 143 RD_KAFKA_OP_LEADERS, /**< Partition leader query */ 144 RD_KAFKA_OP_BARRIER, /**< Version barrier bump */ 145 RD_KAFKA_OP__END 146 } rd_kafka_op_type_t; 147 148 /* Flags used with op_type_t */ 149 #define RD_KAFKA_OP_CB (int)(1 << 29) /* Callback op. */ 150 #define RD_KAFKA_OP_REPLY (int)(1 << 30) /* Reply op. */ 151 #define RD_KAFKA_OP_FLAGMASK (RD_KAFKA_OP_CB | RD_KAFKA_OP_REPLY) 152 153 154 /** 155 * @brief Op/queue priority levels. 156 * @remark Since priority levels alter the FIFO order, pay extra attention 157 * to preserve ordering as deemed necessary. 158 * @remark Priority should only be set on ops destined for application 159 * facing queues (rk_rep, rkcg_q, etc). 160 */ 161 typedef enum { 162 RD_KAFKA_PRIO_NORMAL = 0, /* Normal bulk, messages, DRs, etc. */ 163 RD_KAFKA_PRIO_MEDIUM, /* Prioritize in front of bulk, 164 * still at some scale. e.g. logs, .. */ 165 RD_KAFKA_PRIO_HIGH, /* Small scale high priority */ 166 RD_KAFKA_PRIO_FLASH /* Micro scale, immediate delivery. */ 167 } rd_kafka_prio_t; 168 169 170 /** 171 * @brief Op handler result 172 * 173 * @remark When returning YIELD from a handler the handler will 174 * need to have made sure to either re-enqueue the op or destroy it 175 * since the caller will not touch the op anymore. 176 */ 177 typedef enum { 178 RD_KAFKA_OP_RES_PASS, /* Not handled, pass to caller */ 179 RD_KAFKA_OP_RES_HANDLED, /* Op was handled (through callbacks) */ 180 RD_KAFKA_OP_RES_KEEP, /* Op was handled (through callbacks) 181 * but must not be destroyed by op_handle(). 182 * It is NOT PERMITTED to return RES_KEEP 183 * from a callback handling a ERR__DESTROY 184 * event. */ 185 RD_KAFKA_OP_RES_YIELD /* Callback called yield */ 186 } rd_kafka_op_res_t; 187 188 189 /** 190 * @brief Queue serve callback call type 191 */ 192 typedef enum { 193 RD_KAFKA_Q_CB_INVALID, /* dont use */ 194 RD_KAFKA_Q_CB_CALLBACK,/* trigger callback based on op */ 195 RD_KAFKA_Q_CB_RETURN, /* return op rather than trigger callback 196 * (if possible)*/ 197 RD_KAFKA_Q_CB_FORCE_RETURN, /* return op, regardless of callback. */ 198 RD_KAFKA_Q_CB_EVENT /* like _Q_CB_RETURN but return event_t:ed op */ 199 } rd_kafka_q_cb_type_t; 200 201 /** 202 * @brief Queue serve callback 203 * @remark See rd_kafka_op_res_t docs for return semantics. 204 */ 205 typedef rd_kafka_op_res_t 206 (rd_kafka_q_serve_cb_t) (rd_kafka_t *rk, 207 struct rd_kafka_q_s *rkq, 208 struct rd_kafka_op_s *rko, 209 rd_kafka_q_cb_type_t cb_type, void *opaque) 210 RD_WARN_UNUSED_RESULT; 211 212 /** 213 * @brief Enumerates the assign op sub-types. 214 */ 215 typedef enum { 216 RD_KAFKA_ASSIGN_METHOD_ASSIGN, /**< Absolute assign/unassign */ 217 RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN, /**< Incremental assign */ 218 RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN /**< Incremental unassign */ 219 } rd_kafka_assign_method_t; 220 221 /** 222 * @brief Op callback type 223 */ 224 typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk, 225 rd_kafka_q_t *rkq, 226 struct rd_kafka_op_s *rko) 227 RD_WARN_UNUSED_RESULT; 228 229 /* Forward declaration */ 230 struct rd_kafka_admin_worker_cbs; 231 struct rd_kafka_admin_fanout_worker_cbs; 232 233 234 #define RD_KAFKA_OP_TYPE_ASSERT(rko,type) \ 235 rd_assert(((rko)->rko_type & ~RD_KAFKA_OP_FLAGMASK) == (type)) 236 237 struct rd_kafka_op_s { 238 TAILQ_ENTRY(rd_kafka_op_s) rko_link; 239 240 rd_kafka_op_type_t rko_type; /* Internal op type */ 241 rd_kafka_event_type_t rko_evtype; 242 int rko_flags; /* See RD_KAFKA_OP_F_... above */ 243 int32_t rko_version; 244 rd_kafka_resp_err_t rko_err; 245 rd_kafka_error_t *rko_error; 246 int32_t rko_len; /* Depends on type, typically the 247 * message length. */ 248 rd_kafka_prio_t rko_prio; /**< In-queue priority. 249 * Higher value means higher prio*/ 250 251 rd_kafka_toppar_t *rko_rktp; 252 253 /* 254 * Generic fields 255 */ 256 257 /* Indicates request: enqueue reply on rko_replyq.q with .version. 258 * .q is refcounted. */ 259 rd_kafka_replyq_t rko_replyq; 260 261 /* Original queue's op serve callback and opaque, if any. 262 * Mainly used for forwarded queues to use the original queue's 263 * serve function from the forwarded position. */ 264 rd_kafka_q_serve_cb_t *rko_serve; 265 void *rko_serve_opaque; 266 267 rd_kafka_t *rko_rk; 268 269 #if ENABLE_DEVEL 270 const char *rko_source; /**< Where op was created */ 271 #endif 272 273 /* RD_KAFKA_OP_CB */ 274 rd_kafka_op_cb_t *rko_op_cb; 275 276 union { 277 struct { 278 rd_kafka_buf_t *rkbuf; 279 rd_kafka_msg_t rkm; 280 int evidx; 281 } fetch; 282 283 struct { 284 rd_kafka_topic_partition_list_t *partitions; 285 /** Require stable (txn-commited) offsets */ 286 rd_bool_t require_stable; 287 int do_free; /* free .partitions on destroy() */ 288 } offset_fetch; 289 290 struct { 291 rd_kafka_topic_partition_list_t *partitions; 292 void (*cb) (rd_kafka_t *rk, 293 rd_kafka_resp_err_t err, 294 rd_kafka_topic_partition_list_t *offsets, 295 void *opaque); 296 void *opaque; 297 int silent_empty; /**< Fail silently if there are no 298 * offsets to commit. */ 299 rd_ts_t ts_timeout; 300 char *reason; 301 } offset_commit; 302 303 struct { 304 rd_kafka_topic_partition_list_t *topics; 305 } subscribe; /* also used for GET_SUBSCRIPTION */ 306 307 struct { 308 rd_kafka_topic_partition_list_t *partitions; 309 rd_kafka_assign_method_t method; 310 } assign; /* also used for GET_ASSIGNMENT */ 311 312 struct { 313 rd_kafka_topic_partition_list_t *partitions; 314 } rebalance; 315 316 struct { 317 const char *str; 318 } rebalance_protocol; 319 320 struct { 321 char *str; 322 } name; 323 324 rd_kafka_consumer_group_metadata_t *cg_metadata; 325 326 struct { 327 int64_t offset; 328 char *errstr; 329 rd_kafka_msg_t rkm; 330 rd_kafka_topic_t *rkt; 331 int fatal; /**< This was a ERR__FATAL error that has 332 * been translated to the fatal error 333 * code. */ 334 } err; /* used for ERR and CONSUMER_ERR */ 335 336 struct { 337 int throttle_time; 338 int32_t nodeid; 339 char *nodename; 340 } throttle; 341 342 struct { 343 char *json; 344 size_t json_len; 345 } stats; 346 347 struct { 348 rd_kafka_buf_t *rkbuf; 349 } xbuf; /* XMIT_BUF and RECV_BUF */ 350 351 /* RD_KAFKA_OP_METADATA */ 352 struct { 353 rd_kafka_metadata_t *md; 354 int force; /* force request regardless of outstanding 355 * metadata requests. */ 356 } metadata; 357 358 struct { 359 rd_kafka_topic_t *rkt; 360 rd_kafka_msgq_t msgq; 361 rd_kafka_msgq_t msgq2; 362 int do_purge2; 363 } dr; 364 365 struct { 366 int32_t nodeid; 367 char nodename[RD_KAFKA_NODENAME_SIZE]; 368 } node; 369 370 struct { 371 int64_t offset; 372 char *reason; 373 } offset_reset; 374 375 struct { 376 int64_t offset; 377 struct rd_kafka_cgrp_s *rkcg; 378 } fetch_start; /* reused for SEEK */ 379 380 struct { 381 int pause; 382 int flag; 383 } pause; 384 385 struct { 386 char fac[64]; 387 int level; 388 char *str; 389 int ctx; 390 } log; 391 392 struct { 393 rd_kafka_AdminOptions_t options; /**< Copy of user's 394 * options */ 395 rd_ts_t abs_timeout; /**< Absolute timeout 396 * for this request. */ 397 rd_kafka_timer_t tmr; /**< Timeout timer */ 398 struct rd_kafka_enq_once_s *eonce; /**< Enqueue op 399 * only once, 400 * used to 401 * (re)trigger 402 * the request op 403 * upon broker state 404 * changes while 405 * waiting for the 406 * controller, or 407 * due to .tmr 408 * timeout. */ 409 rd_list_t args;/**< Type depends on request, e.g. 410 * rd_kafka_NewTopic_t for CreateTopics 411 */ 412 413 rd_kafka_buf_t *reply_buf; /**< Protocol reply, 414 * temporary reference not 415 * owned by this rko */ 416 417 /**< Worker callbacks, see rdkafka_admin.c */ 418 struct rd_kafka_admin_worker_cbs *cbs; 419 420 /** Worker state */ 421 enum { 422 RD_KAFKA_ADMIN_STATE_INIT, 423 RD_KAFKA_ADMIN_STATE_WAIT_BROKER, 424 RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER, 425 RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS, 426 RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST, 427 RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE, 428 } state; 429 430 int32_t broker_id; /**< Requested broker id to 431 * communicate with. 432 * Used for AlterConfigs, et.al, 433 * that needs to speak to a 434 * specific broker rather than 435 * the controller. 436 * See RD_KAFKA_ADMIN_TARGET_.. 437 * for special values (coordinator, 438 * fanout, etc). 439 */ 440 /** The type of coordinator to look up */ 441 rd_kafka_coordtype_t coordtype; 442 /** Which coordinator to look up */ 443 char *coordkey; 444 445 /** Application's reply queue */ 446 rd_kafka_replyq_t replyq; 447 rd_kafka_event_type_t reply_event_type; 448 449 /** A collection of fanout child ops. */ 450 struct { 451 /** The type of request being fanned out. 452 * This is used for the ADMIN_RESULT. */ 453 rd_kafka_op_type_t reqtype; 454 455 /** Worker callbacks, see rdkafka_admin.c */ 456 struct rd_kafka_admin_fanout_worker_cbs *cbs; 457 458 /** Number of outstanding requests remaining to 459 * wait for. */ 460 int outstanding; 461 462 /** Incremental results from fanouts. 463 * This list is pre-allocated to the number 464 * of input objects and can thus be set 465 * by index to retain original ordering. */ 466 rd_list_t results; 467 468 /** Reply event type */ 469 rd_kafka_event_type_t reply_event_type; 470 471 } fanout; 472 473 /** A reference to the parent ADMIN_FANOUT op that 474 * spawned this op, if applicable. NULL otherwise. */ 475 struct rd_kafka_op_s *fanout_parent; 476 477 } admin_request; 478 479 struct { 480 rd_kafka_op_type_t reqtype; /**< Request op type, 481 * used for logging. */ 482 483 rd_list_t args; /**< Args moved from the request op 484 * when the result op is created. 485 * 486 * Type depends on request. 487 */ 488 489 char *errstr; /**< Error string, if rko_err 490 * is set, else NULL. */ 491 492 rd_list_t results; /**< Type depends on request type: 493 * 494 * (rd_kafka_topic_result_t *): 495 * CreateTopics, DeleteTopics, 496 * CreatePartitions. 497 * 498 * (rd_kafka_ConfigResource_t *): 499 * AlterConfigs, DescribeConfigs 500 */ 501 502 void *opaque; /**< Application's opaque as set by 503 * rd_kafka_AdminOptions_set_opaque 504 */ 505 506 /** A reference to the parent ADMIN_FANOUT op that 507 * spawned this op, if applicable. NULL otherwise. */ 508 struct rd_kafka_op_s *fanout_parent; 509 } admin_result; 510 511 struct { 512 int flags; /**< purge_flags from rd_kafka_purge() */ 513 } purge; 514 515 /**< Mock cluster command */ 516 struct { 517 enum { 518 RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR, 519 RD_KAFKA_MOCK_CMD_TOPIC_CREATE, 520 RD_KAFKA_MOCK_CMD_PART_SET_LEADER, 521 RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER, 522 RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS, 523 RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN, 524 RD_KAFKA_MOCK_CMD_BROKER_SET_RTT, 525 RD_KAFKA_MOCK_CMD_BROKER_SET_RACK, 526 RD_KAFKA_MOCK_CMD_COORD_SET, 527 RD_KAFKA_MOCK_CMD_APIVERSION_SET, 528 } cmd; 529 530 rd_kafka_resp_err_t err; /**< Error for: 531 * TOPIC_SET_ERROR */ 532 char *name; /**< For: 533 * TOPIC_SET_ERROR 534 * TOPIC_CREATE 535 * PART_SET_FOLLOWER 536 * PART_SET_FOLLOWER_WMARKS 537 * BROKER_SET_RACK 538 * COORD_SET (key_type) */ 539 char *str; /**< For: 540 * COORD_SET (key) */ 541 int32_t partition; /**< For: 542 * PART_SET_FOLLOWER 543 * PART_SET_FOLLOWER_WMARKS 544 * PART_SET_LEADER 545 * APIVERSION_SET (ApiKey) 546 */ 547 int32_t broker_id; /**< For: 548 * PART_SET_FOLLOWER 549 * PART_SET_LEADER 550 * BROKER_SET_UPDOWN 551 * BROKER_SET_RACK 552 * COORD_SET */ 553 int64_t lo; /**< Low offset, for: 554 * TOPIC_CREATE (part cnt) 555 * PART_SET_FOLLOWER_WMARKS 556 * BROKER_SET_UPDOWN 557 * APIVERSION_SET (minver) 558 * BROKER_SET_RTT 559 */ 560 int64_t hi; /**< High offset, for: 561 * TOPIC_CREATE (repl fact) 562 * PART_SET_FOLLOWER_WMARKS 563 * APIVERSION_SET (maxver) 564 */ 565 } mock; 566 567 struct { 568 struct rd_kafka_broker_s *rkb; /**< Broker who's state 569 * changed. */ 570 /**< Callback to trigger on the op handler's thread. */ 571 void (*cb) (struct rd_kafka_broker_s *rkb); 572 } broker_monitor; 573 574 struct { 575 /** Consumer group metadata for send_offsets_to.. */ 576 rd_kafka_consumer_group_metadata_t *cgmetadata; 577 /** Consumer group id for AddOffsetsTo.. */ 578 char *group_id; 579 int timeout_ms; /**< Operation timeout */ 580 rd_ts_t abs_timeout; /**< Absolute time */ 581 /**< Offsets to commit */ 582 rd_kafka_topic_partition_list_t *offsets; 583 } txn; 584 585 struct { 586 /* This struct serves two purposes, the fields 587 * with "Request:" are used for the async workers state 588 * while the "Reply:" fields is a separate reply 589 * rko that is enqueued for the caller upon 590 * completion or failure. */ 591 592 /** Request: Partitions to query. 593 * Reply: Queried partitions with .err field set. */ 594 rd_kafka_topic_partition_list_t *partitions; 595 596 /** Request: Absolute timeout */ 597 rd_ts_t ts_timeout; 598 599 /** Request: Metadata query timer */ 600 rd_kafka_timer_t query_tmr; 601 602 /** Request: Timeout timer */ 603 rd_kafka_timer_t timeout_tmr; 604 605 /** Request: Enqueue op only once, used to (re)trigger 606 * metadata cache lookups, topic refresh, timeout. */ 607 struct rd_kafka_enq_once_s *eonce; 608 609 /** Request: Caller's replyq */ 610 rd_kafka_replyq_t replyq; 611 612 /** Request: Number of metadata queries made. */ 613 int query_cnt; 614 615 /** Reply: Leaders (result) 616 * (rd_kafka_partition_leader*) */ 617 rd_list_t *leaders; 618 619 /** Reply: Callback on completion (or failure) */ 620 rd_kafka_op_cb_t *cb; 621 622 /** Reply: Callback opaque */ 623 void *opaque; 624 625 } leaders; 626 627 } rko_u; 628 }; 629 630 TAILQ_HEAD(rd_kafka_op_head_s, rd_kafka_op_s); 631 632 633 634 635 const char *rd_kafka_op2str (rd_kafka_op_type_t type); 636 void rd_kafka_op_destroy (rd_kafka_op_t *rko); 637 rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type); 638 #if ENABLE_DEVEL 639 #define _STRINGIFYX(A) #A 640 #define _STRINGIFY(A) _STRINGIFYX(A) 641 #define rd_kafka_op_new(type) \ 642 rd_kafka_op_new0(__FILE__ ":" _STRINGIFY(__LINE__), type) 643 #else 644 #define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type) 645 #endif 646 rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig, 647 rd_kafka_resp_err_t err); 648 rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk, 649 rd_kafka_op_type_t type, 650 rd_kafka_op_cb_t *cb); 651 int rd_kafka_op_reply (rd_kafka_op_t *rko, 652 rd_kafka_resp_err_t err); 653 int rd_kafka_op_error_reply (rd_kafka_op_t *rko, 654 rd_kafka_error_t *error); 655 656 #define rd_kafka_op_set_prio(rko,prio) ((rko)->rko_prio = prio) 657 658 #define rd_kafka_op_err(rk,err,...) do { \ 659 if (!((rk)->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)) { \ 660 rd_kafka_log(rk, LOG_ERR, "ERROR", __VA_ARGS__); \ 661 break; \ 662 } \ 663 rd_kafka_q_op_err((rk)->rk_rep, err, __VA_ARGS__); \ 664 } while (0) 665 666 void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_resp_err_t err, 667 const char *fmt, ...) 668 RD_FORMAT(printf, 3, 4); 669 void rd_kafka_consumer_err (rd_kafka_q_t *rkq, int32_t broker_id, 670 rd_kafka_resp_err_t err, int32_t version, 671 const char *topic, rd_kafka_toppar_t *rktp, 672 int64_t offset, const char *fmt, ...) 673 RD_FORMAT(printf, 8, 9); 674 rd_kafka_op_t *rd_kafka_op_req0 (rd_kafka_q_t *destq, 675 rd_kafka_q_t *recvq, 676 rd_kafka_op_t *rko, 677 int timeout_ms); 678 rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq, 679 rd_kafka_op_t *rko, 680 int timeout_ms); 681 rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type); 682 rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko); 683 rd_kafka_error_t *rd_kafka_op_error_destroy (rd_kafka_op_t *rko); 684 685 rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk, 686 rd_kafka_q_t *rkq, rd_kafka_op_t *rko) 687 RD_WARN_UNUSED_RESULT; 688 689 rd_kafka_op_t * 690 rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp, 691 rd_kafka_toppar_t *rktp, 692 int32_t version, 693 rd_kafka_buf_t *rkbuf, 694 int64_t offset, 695 size_t key_len, const void *key, 696 size_t val_len, const void *val); 697 698 rd_kafka_op_t * 699 rd_kafka_op_new_ctrl_msg (rd_kafka_toppar_t *rktp, 700 int32_t version, 701 rd_kafka_buf_t *rkbuf, 702 int64_t offset); 703 704 void rd_kafka_op_throttle_time (struct rd_kafka_broker_s *rkb, 705 rd_kafka_q_t *rkq, 706 int throttle_time); 707 708 709 rd_kafka_op_res_t 710 rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, 711 rd_kafka_q_cb_type_t cb_type, void *opaque, 712 rd_kafka_q_serve_cb_t *callback) RD_WARN_UNUSED_RESULT; 713 714 715 extern rd_atomic32_t rd_kafka_op_cnt; 716 717 void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko); 718 719 void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko); 720 721 722 #define rd_kafka_op_is_ctrl_msg(rko) \ 723 ((rko)->rko_type == RD_KAFKA_OP_FETCH && \ 724 !(rko)->rko_err && \ 725 ((rko)->rko_u.fetch.rkm.rkm_flags & RD_KAFKA_MSG_F_CONTROL)) 726 727 728 729 /** 730 * @returns true if the rko's replyq is valid and the 731 * rko's rktp version (if any) is not outdated. 732 */ 733 #define rd_kafka_op_replyq_is_valid(RKO) \ 734 (rd_kafka_replyq_is_valid(&(RKO)->rko_replyq) && \ 735 !rd_kafka_op_version_outdated((RKO), 0)) 736 737 #endif /* _RDKAFKA_OP_H_ */ 738