/*
 * librdkafka - Apache Kafka C/C++ library
 *
 * Copyright (c) 2014 Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
27 */ 28 29 #ifndef _RDKAFKACPP_INT_H_ 30 #define _RDKAFKACPP_INT_H_ 31 32 #include <string> 33 #include <iostream> 34 #include <cstring> 35 #include <stdlib.h> 36 37 #include "rdkafkacpp.h" 38 39 extern "C" { 40 #include "../src/rdkafka.h" 41 } 42 43 #ifdef _MSC_VER 44 /* Visual Studio */ 45 #include "../src/win32_config.h" 46 #else 47 /* POSIX / UNIX based systems */ 48 #include "../config.h" /* mklove output */ 49 #endif 50 51 #ifdef _MSC_VER 52 typedef int mode_t; 53 #pragma warning(disable : 4250) 54 #endif 55 56 57 namespace RdKafka { 58 59 void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque); 60 void log_cb_trampoline (const rd_kafka_t *rk, int level, 61 const char *fac, const char *buf); 62 void error_cb_trampoline (rd_kafka_t *rk, int err, const char *reason, 63 void *opaque); 64 void throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name, 65 int32_t broker_id, int throttle_time_ms, 66 void *opaque); 67 int stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len, 68 void *opaque); 69 int socket_cb_trampoline (int domain, int type, int protocol, void *opaque); 70 int open_cb_trampoline (const char *pathname, int flags, mode_t mode, 71 void *opaque); 72 void rebalance_cb_trampoline (rd_kafka_t *rk, 73 rd_kafka_resp_err_t err, 74 rd_kafka_topic_partition_list_t *c_partitions, 75 void *opaque); 76 void offset_commit_cb_trampoline0 ( 77 rd_kafka_t *rk, 78 rd_kafka_resp_err_t err, 79 rd_kafka_topic_partition_list_t *c_offsets, void *opaque); 80 void oauthbearer_token_refresh_cb_trampoline (rd_kafka_t *rk, 81 const char *oauthbearer_config, 82 void *opaque); 83 84 int ssl_cert_verify_cb_trampoline ( 85 rd_kafka_t *rk, 86 const char *broker_name, 87 int32_t broker_id, 88 int *x509_error, 89 int depth, 90 const char *buf, size_t size, 91 char *errstr, size_t errstr_size, 92 void *opaque); 93 94 rd_kafka_topic_partition_list_t * 95 partitions_to_c_parts (const std::vector<TopicPartition*> &partitions); 96 97 /** 98 * @brief Update the 
application provided 'partitions' with info from 'c_parts' 99 */ 100 void update_partitions_from_c_parts (std::vector<TopicPartition*> &partitions, 101 const rd_kafka_topic_partition_list_t *c_parts); 102 103 104 class ErrorImpl : public Error { 105 public: ~ErrorImpl()106 ~ErrorImpl () { 107 rd_kafka_error_destroy(c_error_); 108 }; 109 ErrorImpl(ErrorCode code,const std::string * errstr)110 ErrorImpl (ErrorCode code, const std::string *errstr) { 111 c_error_ = rd_kafka_error_new(static_cast<rd_kafka_resp_err_t>(code), 112 errstr ? "%s" : NULL, 113 errstr ? errstr->c_str() : NULL); 114 } 115 ErrorImpl(rd_kafka_error_t * c_error)116 ErrorImpl (rd_kafka_error_t *c_error): 117 c_error_(c_error) {}; 118 create(ErrorCode code,const std::string * errstr)119 static Error *create (ErrorCode code, const std::string *errstr) { 120 return new ErrorImpl(code, errstr); 121 } 122 code()123 ErrorCode code () const { 124 return static_cast<ErrorCode>(rd_kafka_error_code(c_error_)); 125 } 126 name()127 std::string name () const { 128 return std::string(rd_kafka_error_name(c_error_)); 129 } 130 str()131 std::string str () const { 132 return std::string(rd_kafka_error_string(c_error_)); 133 } 134 is_fatal()135 bool is_fatal () const { 136 return rd_kafka_error_is_fatal(c_error_); 137 } 138 is_retriable()139 bool is_retriable () const { 140 return rd_kafka_error_is_retriable(c_error_); 141 } 142 txn_requires_abort()143 bool txn_requires_abort () const { 144 return rd_kafka_error_txn_requires_abort(c_error_); 145 } 146 147 rd_kafka_error_t *c_error_; 148 }; 149 150 151 class EventImpl : public Event { 152 public: ~EventImpl()153 ~EventImpl () {}; 154 EventImpl(Type type,ErrorCode err,Severity severity,const char * fac,const char * str)155 EventImpl (Type type, ErrorCode err, Severity severity, 156 const char *fac, const char *str): 157 type_(type), err_(err), severity_(severity), fac_(fac ? 
fac : ""), 158 str_(str), id_(0), throttle_time_(0) {}; 159 EventImpl(Type type)160 EventImpl (Type type): 161 type_(type), err_(ERR_NO_ERROR), severity_(EVENT_SEVERITY_EMERG), 162 fac_(""), str_(""), id_(0), throttle_time_(0) {}; 163 type()164 Type type () const { return type_; } err()165 ErrorCode err () const { return err_; } severity()166 Severity severity () const { return severity_; } fac()167 std::string fac () const { return fac_; } str()168 std::string str () const { return str_; } broker_name()169 std::string broker_name () const { 170 if (type_ == EVENT_THROTTLE) 171 return str_; 172 else 173 return std::string(""); 174 } broker_id()175 int broker_id () const { return id_; } throttle_time()176 int throttle_time () const { return throttle_time_; } 177 fatal()178 bool fatal () const { return fatal_; } 179 180 Type type_; 181 ErrorCode err_; 182 Severity severity_; 183 std::string fac_; 184 std::string str_; /* reused for THROTTLE broker_name */ 185 int id_; 186 int throttle_time_; 187 bool fatal_; 188 }; 189 190 191 class HeadersImpl : public Headers { 192 public: HeadersImpl()193 HeadersImpl (): 194 headers_ (rd_kafka_headers_new(8)) {} 195 HeadersImpl(rd_kafka_headers_t * headers)196 HeadersImpl (rd_kafka_headers_t *headers): 197 headers_ (headers) {} 198 HeadersImpl(const std::vector<Header> & headers)199 HeadersImpl (const std::vector<Header> &headers) { 200 if (headers.size() > 0) { 201 headers_ = rd_kafka_headers_new(headers.size()); 202 from_vector(headers); 203 } else { 204 headers_ = rd_kafka_headers_new(8); 205 } 206 } 207 ~HeadersImpl()208 ~HeadersImpl() { 209 if (headers_) { 210 rd_kafka_headers_destroy(headers_); 211 } 212 } 213 add(const std::string & key,const char * value)214 ErrorCode add(const std::string& key, const char *value) { 215 rd_kafka_resp_err_t err; 216 err = rd_kafka_header_add(headers_, 217 key.c_str(), key.size(), 218 value, -1); 219 return static_cast<RdKafka::ErrorCode>(err); 220 } 221 add(const std::string & key,const 
void * value,size_t value_size)222 ErrorCode add(const std::string& key, const void *value, size_t value_size) { 223 rd_kafka_resp_err_t err; 224 err = rd_kafka_header_add(headers_, 225 key.c_str(), key.size(), 226 value, value_size); 227 return static_cast<RdKafka::ErrorCode>(err); 228 } 229 add(const std::string & key,const std::string & value)230 ErrorCode add(const std::string &key, const std::string &value) { 231 rd_kafka_resp_err_t err; 232 err = rd_kafka_header_add(headers_, 233 key.c_str(), key.size(), 234 value.c_str(), value.size()); 235 return static_cast<RdKafka::ErrorCode>(err); 236 } 237 add(const Header & header)238 ErrorCode add(const Header &header) { 239 rd_kafka_resp_err_t err; 240 err = rd_kafka_header_add(headers_, 241 header.key().c_str(), header.key().size(), 242 header.value(), header.value_size()); 243 return static_cast<RdKafka::ErrorCode>(err); 244 } 245 remove(const std::string & key)246 ErrorCode remove(const std::string& key) { 247 rd_kafka_resp_err_t err; 248 err = rd_kafka_header_remove (headers_, key.c_str()); 249 return static_cast<RdKafka::ErrorCode>(err); 250 } 251 get(const std::string & key)252 std::vector<Headers::Header> get(const std::string &key) const { 253 std::vector<Headers::Header> headers; 254 const void *value; 255 size_t size; 256 rd_kafka_resp_err_t err; 257 for (size_t idx = 0; 258 !(err = rd_kafka_header_get(headers_, idx, key.c_str(), 259 &value, &size)) ; 260 idx++) { 261 headers.push_back(Headers::Header(key, value, size)); 262 } 263 return headers; 264 } 265 get_last(const std::string & key)266 Headers::Header get_last(const std::string& key) const { 267 const void *value; 268 size_t size; 269 rd_kafka_resp_err_t err; 270 err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size); 271 return Headers::Header(key, value, size, 272 static_cast<RdKafka::ErrorCode>(err)); 273 } 274 get_all()275 std::vector<Headers::Header> get_all() const { 276 std::vector<Headers::Header> headers; 277 size_t idx = 0; 
278 const char *name; 279 const void *valuep; 280 size_t size; 281 while (!rd_kafka_header_get_all(headers_, idx++, 282 &name, &valuep, &size)) { 283 headers.push_back(Headers::Header(name, valuep, size)); 284 } 285 return headers; 286 } 287 size()288 size_t size() const { 289 return rd_kafka_header_cnt(headers_); 290 } 291 292 /** @brief Reset the C headers pointer to NULL. */ c_headers_destroyed()293 void c_headers_destroyed() { 294 headers_ = NULL; 295 } 296 297 /** @returns the underlying C headers, or NULL. */ c_ptr()298 rd_kafka_headers_t *c_ptr() { 299 return headers_; 300 } 301 302 303 private: from_vector(const std::vector<Header> & headers)304 void from_vector(const std::vector<Header> &headers) { 305 if (headers.size() == 0) 306 return; 307 for (std::vector<Header>::const_iterator it = headers.begin(); 308 it != headers.end(); it++) 309 this->add(*it); 310 } 311 312 HeadersImpl(HeadersImpl const&) /*= delete*/; 313 HeadersImpl& operator=(HeadersImpl const&) /*= delete*/; 314 315 rd_kafka_headers_t *headers_; 316 }; 317 318 319 320 class MessageImpl : public Message { 321 public: ~MessageImpl()322 ~MessageImpl () { 323 if (free_rkmessage_) 324 rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_)); 325 if (key_) 326 delete key_; 327 if (headers_) 328 delete headers_; 329 }; 330 MessageImpl(RdKafka::Topic * topic,rd_kafka_message_t * rkmessage)331 MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage): 332 topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL), 333 headers_(NULL) {} 334 MessageImpl(RdKafka::Topic * topic,rd_kafka_message_t * rkmessage,bool dofree)335 MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage, 336 bool dofree): 337 topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL), 338 headers_(NULL) {} 339 MessageImpl(rd_kafka_message_t * rkmessage)340 MessageImpl (rd_kafka_message_t *rkmessage): 341 topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), 
key_(NULL), 342 headers_(NULL) { 343 if (rkmessage->rkt) { 344 /* Possibly NULL */ 345 topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt)); 346 } 347 } 348 349 /* Create errored message */ MessageImpl(RdKafka::Topic * topic,RdKafka::ErrorCode err)350 MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err): 351 topic_(topic), free_rkmessage_(false), key_(NULL), headers_(NULL) { 352 rkmessage_ = &rkmessage_err_; 353 memset(&rkmessage_err_, 0, sizeof(rkmessage_err_)); 354 rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err); 355 } 356 errstr()357 std::string errstr() const { 358 /* FIXME: If there is an error string in payload (for consume_cb) 359 * it wont be shown since 'payload' is reused for errstr 360 * and we cant distinguish between consumer and producer. 361 * For the producer case the payload needs to be the original 362 * payload pointer. */ 363 const char *es = rd_kafka_err2str(rkmessage_->err); 364 return std::string(es ? es : ""); 365 } 366 err()367 ErrorCode err () const { 368 return static_cast<RdKafka::ErrorCode>(rkmessage_->err); 369 } 370 topic()371 Topic *topic () const { return topic_; } topic_name()372 std::string topic_name () const { 373 if (rkmessage_->rkt) 374 return rd_kafka_topic_name(rkmessage_->rkt); 375 else 376 return ""; 377 } partition()378 int32_t partition () const { return rkmessage_->partition; } payload()379 void *payload () const { return rkmessage_->payload; } len()380 size_t len () const { return rkmessage_->len; } key()381 const std::string *key () const { 382 if (key_) { 383 return key_; 384 } else if (rkmessage_->key) { 385 key_ = new std::string(static_cast<char const*>(rkmessage_->key), rkmessage_->key_len); 386 return key_; 387 } 388 return NULL; 389 } key_pointer()390 const void *key_pointer () const { return rkmessage_->key; } key_len()391 size_t key_len () const { return rkmessage_->key_len; } 392 offset()393 int64_t offset () const { return rkmessage_->offset; } 394 timestamp()395 
MessageTimestamp timestamp () const { 396 MessageTimestamp ts; 397 rd_kafka_timestamp_type_t tstype; 398 ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype); 399 ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype); 400 return ts; 401 } 402 msg_opaque()403 void *msg_opaque () const { return rkmessage_->_private; }; 404 latency()405 int64_t latency () const { 406 return rd_kafka_message_latency(rkmessage_); 407 } 408 c_ptr()409 struct rd_kafka_message_s *c_ptr () { 410 return rkmessage_; 411 } 412 status()413 Status status () const { 414 return static_cast<Status>(rd_kafka_message_status(rkmessage_)); 415 } 416 headers()417 Headers *headers () { 418 ErrorCode err; 419 return headers(&err); 420 } 421 headers(ErrorCode * err)422 Headers *headers (ErrorCode *err) { 423 *err = ERR_NO_ERROR; 424 425 if (!headers_) { 426 rd_kafka_headers_t *c_hdrs; 427 rd_kafka_resp_err_t c_err; 428 429 if ((c_err = rd_kafka_message_detach_headers(rkmessage_, &c_hdrs))) { 430 *err = static_cast<RdKafka::ErrorCode>(c_err); 431 return NULL; 432 } 433 434 headers_ = new HeadersImpl(c_hdrs); 435 } 436 437 return headers_; 438 } 439 440 RdKafka::Topic *topic_; 441 rd_kafka_message_t *rkmessage_; 442 bool free_rkmessage_; 443 /* For error signalling by the C++ layer the .._err_ message is 444 * used as a place holder and rkmessage_ is set to point to it. 
*/ 445 rd_kafka_message_t rkmessage_err_; 446 mutable std::string *key_; /* mutable because it's a cached value */ 447 448 private: 449 /* "delete" copy ctor + copy assignment, for safety of key_ */ 450 MessageImpl(MessageImpl const&) /*= delete*/; 451 MessageImpl& operator=(MessageImpl const&) /*= delete*/; 452 453 RdKafka::Headers *headers_; 454 }; 455 456 457 class ConfImpl : public Conf { 458 public: ConfImpl()459 ConfImpl() 460 :consume_cb_(NULL), 461 dr_cb_(NULL), 462 event_cb_(NULL), 463 socket_cb_(NULL), 464 open_cb_(NULL), 465 partitioner_cb_(NULL), 466 partitioner_kp_cb_(NULL), 467 rebalance_cb_(NULL), 468 offset_commit_cb_(NULL), 469 oauthbearer_token_refresh_cb_(NULL), 470 ssl_cert_verify_cb_(NULL), 471 rk_conf_(NULL), 472 rkt_conf_(NULL){} ~ConfImpl()473 ~ConfImpl () { 474 if (rk_conf_) 475 rd_kafka_conf_destroy(rk_conf_); 476 else if (rkt_conf_) 477 rd_kafka_topic_conf_destroy(rkt_conf_); 478 } 479 480 Conf::ConfResult set(const std::string &name, 481 const std::string &value, 482 std::string &errstr); 483 set(const std::string & name,DeliveryReportCb * dr_cb,std::string & errstr)484 Conf::ConfResult set (const std::string &name, DeliveryReportCb *dr_cb, 485 std::string &errstr) { 486 if (name != "dr_cb") { 487 errstr = "Invalid value type, expected RdKafka::DeliveryReportCb"; 488 return Conf::CONF_INVALID; 489 } 490 491 if (!rk_conf_) { 492 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 493 return Conf::CONF_INVALID; 494 } 495 496 dr_cb_ = dr_cb; 497 return Conf::CONF_OK; 498 } 499 set(const std::string & name,OAuthBearerTokenRefreshCb * oauthbearer_token_refresh_cb,std::string & errstr)500 Conf::ConfResult set (const std::string &name, 501 OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb, 502 std::string &errstr) { 503 if (name != "oauthbearer_token_refresh_cb") { 504 errstr = "Invalid value type, expected RdKafka::OAuthBearerTokenRefreshCb"; 505 return Conf::CONF_INVALID; 506 } 507 508 if (!rk_conf_) { 509 errstr = "Requires 
RdKafka::Conf::CONF_GLOBAL object"; 510 return Conf::CONF_INVALID; 511 } 512 513 oauthbearer_token_refresh_cb_ = oauthbearer_token_refresh_cb; 514 return Conf::CONF_OK; 515 } 516 set(const std::string & name,EventCb * event_cb,std::string & errstr)517 Conf::ConfResult set (const std::string &name, EventCb *event_cb, 518 std::string &errstr) { 519 if (name != "event_cb") { 520 errstr = "Invalid value type, expected RdKafka::EventCb"; 521 return Conf::CONF_INVALID; 522 } 523 524 if (!rk_conf_) { 525 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 526 return Conf::CONF_INVALID; 527 } 528 529 event_cb_ = event_cb; 530 return Conf::CONF_OK; 531 } 532 set(const std::string & name,const Conf * topic_conf,std::string & errstr)533 Conf::ConfResult set (const std::string &name, const Conf *topic_conf, 534 std::string &errstr) { 535 const ConfImpl *tconf_impl = 536 dynamic_cast<const RdKafka::ConfImpl *>(topic_conf); 537 if (name != "default_topic_conf" || !tconf_impl->rkt_conf_) { 538 errstr = "Invalid value type, expected RdKafka::Conf"; 539 return Conf::CONF_INVALID; 540 } 541 542 if (!rk_conf_) { 543 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 544 return Conf::CONF_INVALID; 545 } 546 547 rd_kafka_conf_set_default_topic_conf(rk_conf_, 548 rd_kafka_topic_conf_dup(tconf_impl-> 549 rkt_conf_)); 550 551 return Conf::CONF_OK; 552 } 553 set(const std::string & name,PartitionerCb * partitioner_cb,std::string & errstr)554 Conf::ConfResult set (const std::string &name, PartitionerCb *partitioner_cb, 555 std::string &errstr) { 556 if (name != "partitioner_cb") { 557 errstr = "Invalid value type, expected RdKafka::PartitionerCb"; 558 return Conf::CONF_INVALID; 559 } 560 561 if (!rkt_conf_) { 562 errstr = "Requires RdKafka::Conf::CONF_TOPIC object"; 563 return Conf::CONF_INVALID; 564 } 565 566 partitioner_cb_ = partitioner_cb; 567 return Conf::CONF_OK; 568 } 569 set(const std::string & name,PartitionerKeyPointerCb * partitioner_kp_cb,std::string & errstr)570 
Conf::ConfResult set (const std::string &name, 571 PartitionerKeyPointerCb *partitioner_kp_cb, 572 std::string &errstr) { 573 if (name != "partitioner_key_pointer_cb") { 574 errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb"; 575 return Conf::CONF_INVALID; 576 } 577 578 if (!rkt_conf_) { 579 errstr = "Requires RdKafka::Conf::CONF_TOPIC object"; 580 return Conf::CONF_INVALID; 581 } 582 583 partitioner_kp_cb_ = partitioner_kp_cb; 584 return Conf::CONF_OK; 585 } 586 set(const std::string & name,SocketCb * socket_cb,std::string & errstr)587 Conf::ConfResult set (const std::string &name, SocketCb *socket_cb, 588 std::string &errstr) { 589 if (name != "socket_cb") { 590 errstr = "Invalid value type, expected RdKafka::SocketCb"; 591 return Conf::CONF_INVALID; 592 } 593 594 if (!rk_conf_) { 595 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 596 return Conf::CONF_INVALID; 597 } 598 599 socket_cb_ = socket_cb; 600 return Conf::CONF_OK; 601 } 602 603 set(const std::string & name,OpenCb * open_cb,std::string & errstr)604 Conf::ConfResult set (const std::string &name, OpenCb *open_cb, 605 std::string &errstr) { 606 if (name != "open_cb") { 607 errstr = "Invalid value type, expected RdKafka::OpenCb"; 608 return Conf::CONF_INVALID; 609 } 610 611 if (!rk_conf_) { 612 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 613 return Conf::CONF_INVALID; 614 } 615 616 open_cb_ = open_cb; 617 return Conf::CONF_OK; 618 } 619 620 621 622 set(const std::string & name,RebalanceCb * rebalance_cb,std::string & errstr)623 Conf::ConfResult set (const std::string &name, RebalanceCb *rebalance_cb, 624 std::string &errstr) { 625 if (name != "rebalance_cb") { 626 errstr = "Invalid value type, expected RdKafka::RebalanceCb"; 627 return Conf::CONF_INVALID; 628 } 629 630 if (!rk_conf_) { 631 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 632 return Conf::CONF_INVALID; 633 } 634 635 rebalance_cb_ = rebalance_cb; 636 return Conf::CONF_OK; 637 } 638 639 set(const 
std::string & name,OffsetCommitCb * offset_commit_cb,std::string & errstr)640 Conf::ConfResult set (const std::string &name, 641 OffsetCommitCb *offset_commit_cb, 642 std::string &errstr) { 643 if (name != "offset_commit_cb") { 644 errstr = "Invalid value type, expected RdKafka::OffsetCommitCb"; 645 return Conf::CONF_INVALID; 646 } 647 648 if (!rk_conf_) { 649 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 650 return Conf::CONF_INVALID; 651 } 652 653 offset_commit_cb_ = offset_commit_cb; 654 return Conf::CONF_OK; 655 } 656 657 set(const std::string & name,SslCertificateVerifyCb * ssl_cert_verify_cb,std::string & errstr)658 Conf::ConfResult set (const std::string &name, 659 SslCertificateVerifyCb *ssl_cert_verify_cb, 660 std::string &errstr) { 661 if (name != "ssl_cert_verify_cb") { 662 errstr = "Invalid value type, expected RdKafka::SslCertificateVerifyCb"; 663 return Conf::CONF_INVALID; 664 } 665 666 if (!rk_conf_) { 667 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 668 return Conf::CONF_INVALID; 669 } 670 671 ssl_cert_verify_cb_ = ssl_cert_verify_cb; 672 return Conf::CONF_OK; 673 } 674 set_ssl_cert(RdKafka::CertificateType cert_type,RdKafka::CertificateEncoding cert_enc,const void * buffer,size_t size,std::string & errstr)675 Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type, 676 RdKafka::CertificateEncoding cert_enc, 677 const void *buffer, size_t size, 678 std::string &errstr) { 679 rd_kafka_conf_res_t res; 680 char errbuf[512]; 681 682 if (!rk_conf_) { 683 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 684 return Conf::CONF_INVALID; 685 } 686 687 res = rd_kafka_conf_set_ssl_cert( 688 rk_conf_, 689 static_cast<rd_kafka_cert_type_t>(cert_type), 690 static_cast<rd_kafka_cert_enc_t>(cert_enc), 691 buffer, size, errbuf, sizeof(errbuf)); 692 693 if (res != RD_KAFKA_CONF_OK) 694 errstr = errbuf; 695 696 return static_cast<Conf::ConfResult>(res); 697 } 698 699 get(const std::string & name,std::string & value)700 
Conf::ConfResult get(const std::string &name, std::string &value) const { 701 if (name.compare("dr_cb") == 0 || 702 name.compare("event_cb") == 0 || 703 name.compare("partitioner_cb") == 0 || 704 name.compare("partitioner_key_pointer_cb") == 0 || 705 name.compare("socket_cb") == 0 || 706 name.compare("open_cb") == 0 || 707 name.compare("rebalance_cb") == 0 || 708 name.compare("offset_commit_cb") == 0 || 709 name.compare("oauthbearer_token_refresh_cb") == 0 || 710 name.compare("ssl_cert_verify_cb") == 0) { 711 return Conf::CONF_INVALID; 712 } 713 rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID; 714 715 /* Get size of property */ 716 size_t size; 717 if (rk_conf_) 718 res = rd_kafka_conf_get(rk_conf_, 719 name.c_str(), NULL, &size); 720 else if (rkt_conf_) 721 res = rd_kafka_topic_conf_get(rkt_conf_, 722 name.c_str(), NULL, &size); 723 if (res != RD_KAFKA_CONF_OK) 724 return static_cast<Conf::ConfResult>(res); 725 726 char *tmpValue = new char[size]; 727 728 if (rk_conf_) 729 res = rd_kafka_conf_get(rk_conf_, name.c_str(), 730 tmpValue, &size); 731 else if (rkt_conf_) 732 res = rd_kafka_topic_conf_get(rkt_conf_, 733 name.c_str(), tmpValue, &size); 734 735 if (res == RD_KAFKA_CONF_OK) 736 value.assign(tmpValue); 737 delete[] tmpValue; 738 739 return static_cast<Conf::ConfResult>(res); 740 } 741 get(DeliveryReportCb * & dr_cb)742 Conf::ConfResult get(DeliveryReportCb *&dr_cb) const { 743 if (!rk_conf_) 744 return Conf::CONF_INVALID; 745 dr_cb = this->dr_cb_; 746 return Conf::CONF_OK; 747 } 748 get(OAuthBearerTokenRefreshCb * & oauthbearer_token_refresh_cb)749 Conf::ConfResult get( 750 OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const { 751 if (!rk_conf_) 752 return Conf::CONF_INVALID; 753 oauthbearer_token_refresh_cb = this->oauthbearer_token_refresh_cb_; 754 return Conf::CONF_OK; 755 } 756 get(EventCb * & event_cb)757 Conf::ConfResult get(EventCb *&event_cb) const { 758 if (!rk_conf_) 759 return Conf::CONF_INVALID; 760 event_cb = this->event_cb_; 761 
return Conf::CONF_OK; 762 } 763 get(PartitionerCb * & partitioner_cb)764 Conf::ConfResult get(PartitionerCb *&partitioner_cb) const { 765 if (!rkt_conf_) 766 return Conf::CONF_INVALID; 767 partitioner_cb = this->partitioner_cb_; 768 return Conf::CONF_OK; 769 } 770 get(PartitionerKeyPointerCb * & partitioner_kp_cb)771 Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const { 772 if (!rkt_conf_) 773 return Conf::CONF_INVALID; 774 partitioner_kp_cb = this->partitioner_kp_cb_; 775 return Conf::CONF_OK; 776 } 777 get(SocketCb * & socket_cb)778 Conf::ConfResult get(SocketCb *&socket_cb) const { 779 if (!rk_conf_) 780 return Conf::CONF_INVALID; 781 socket_cb = this->socket_cb_; 782 return Conf::CONF_OK; 783 } 784 get(OpenCb * & open_cb)785 Conf::ConfResult get(OpenCb *&open_cb) const { 786 if (!rk_conf_) 787 return Conf::CONF_INVALID; 788 open_cb = this->open_cb_; 789 return Conf::CONF_OK; 790 } 791 get(RebalanceCb * & rebalance_cb)792 Conf::ConfResult get(RebalanceCb *&rebalance_cb) const { 793 if (!rk_conf_) 794 return Conf::CONF_INVALID; 795 rebalance_cb = this->rebalance_cb_; 796 return Conf::CONF_OK; 797 } 798 get(OffsetCommitCb * & offset_commit_cb)799 Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const { 800 if (!rk_conf_) 801 return Conf::CONF_INVALID; 802 offset_commit_cb = this->offset_commit_cb_; 803 return Conf::CONF_OK; 804 } 805 get(SslCertificateVerifyCb * & ssl_cert_verify_cb)806 Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const { 807 if (!rk_conf_) 808 return Conf::CONF_INVALID; 809 ssl_cert_verify_cb = this->ssl_cert_verify_cb_; 810 return Conf::CONF_OK; 811 } 812 813 std::list<std::string> *dump (); 814 815 set(const std::string & name,ConsumeCb * consume_cb,std::string & errstr)816 Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb, 817 std::string &errstr) { 818 if (name != "consume_cb") { 819 errstr = "Invalid value type, expected RdKafka::ConsumeCb"; 820 return Conf::CONF_INVALID; 821 
} 822 823 if (!rk_conf_) { 824 errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; 825 return Conf::CONF_INVALID; 826 } 827 828 consume_cb_ = consume_cb; 829 return Conf::CONF_OK; 830 } 831 c_ptr_global()832 struct rd_kafka_conf_s *c_ptr_global () { 833 if (conf_type_ == CONF_GLOBAL) 834 return rk_conf_; 835 else 836 return NULL; 837 } 838 c_ptr_topic()839 struct rd_kafka_topic_conf_s *c_ptr_topic () { 840 if (conf_type_ == CONF_TOPIC) 841 return rkt_conf_; 842 else 843 return NULL; 844 } 845 846 ConsumeCb *consume_cb_; 847 DeliveryReportCb *dr_cb_; 848 EventCb *event_cb_; 849 SocketCb *socket_cb_; 850 OpenCb *open_cb_; 851 PartitionerCb *partitioner_cb_; 852 PartitionerKeyPointerCb *partitioner_kp_cb_; 853 RebalanceCb *rebalance_cb_; 854 OffsetCommitCb *offset_commit_cb_; 855 OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_; 856 SslCertificateVerifyCb *ssl_cert_verify_cb_; 857 ConfType conf_type_; 858 rd_kafka_conf_t *rk_conf_; 859 rd_kafka_topic_conf_t *rkt_conf_; 860 }; 861 862 863 class HandleImpl : virtual public Handle { 864 public: ~HandleImpl()865 ~HandleImpl() {}; HandleImpl()866 HandleImpl () {}; name()867 const std::string name () const { return std::string(rd_kafka_name(rk_)); }; memberid()868 const std::string memberid () const { 869 char *str = rd_kafka_memberid(rk_); 870 std::string memberid = str ? 
str : ""; 871 if (str) 872 rd_kafka_mem_free(rk_, str); 873 return memberid; 874 } poll(int timeout_ms)875 int poll (int timeout_ms) { return rd_kafka_poll(rk_, timeout_ms); }; outq_len()876 int outq_len () { return rd_kafka_outq_len(rk_); }; 877 878 void set_common_config (RdKafka::ConfImpl *confimpl); 879 880 RdKafka::ErrorCode metadata (bool all_topics,const Topic *only_rkt, 881 Metadata **metadatap, int timeout_ms); 882 883 ErrorCode pause (std::vector<TopicPartition*> &partitions); 884 ErrorCode resume (std::vector<TopicPartition*> &partitions); 885 query_watermark_offsets(const std::string & topic,int32_t partition,int64_t * low,int64_t * high,int timeout_ms)886 ErrorCode query_watermark_offsets (const std::string &topic, 887 int32_t partition, 888 int64_t *low, int64_t *high, 889 int timeout_ms) { 890 return static_cast<RdKafka::ErrorCode>( 891 rd_kafka_query_watermark_offsets( 892 rk_, topic.c_str(), partition, 893 low, high, timeout_ms)); 894 } 895 get_watermark_offsets(const std::string & topic,int32_t partition,int64_t * low,int64_t * high)896 ErrorCode get_watermark_offsets (const std::string &topic, 897 int32_t partition, 898 int64_t *low, int64_t *high) { 899 return static_cast<RdKafka::ErrorCode>( 900 rd_kafka_get_watermark_offsets( 901 rk_, topic.c_str(), partition, 902 low, high)); 903 } 904 905 Queue *get_partition_queue (const TopicPartition *partition); 906 offsetsForTimes(std::vector<TopicPartition * > & offsets,int timeout_ms)907 ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets, 908 int timeout_ms) { 909 rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets); 910 ErrorCode err = static_cast<ErrorCode>( 911 rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms)); 912 update_partitions_from_c_parts(offsets, c_offsets); 913 rd_kafka_topic_partition_list_destroy(c_offsets); 914 return err; 915 } 916 917 ErrorCode set_log_queue (Queue *queue); 918 yield()919 void yield () { 920 rd_kafka_yield(rk_); 921 } 922 
clusterid(int timeout_ms)923 const std::string clusterid (int timeout_ms) { 924 char *str = rd_kafka_clusterid(rk_, timeout_ms); 925 std::string clusterid = str ? str : ""; 926 if (str) 927 rd_kafka_mem_free(rk_, str); 928 return clusterid; 929 } 930 c_ptr()931 struct rd_kafka_s *c_ptr () { 932 return rk_; 933 } 934 controllerid(int timeout_ms)935 int32_t controllerid (int timeout_ms) { 936 return rd_kafka_controllerid(rk_, timeout_ms); 937 } 938 fatal_error(std::string & errstr)939 ErrorCode fatal_error (std::string &errstr) { 940 char errbuf[512]; 941 RdKafka::ErrorCode err = 942 static_cast<RdKafka::ErrorCode>( 943 rd_kafka_fatal_error(rk_, errbuf, sizeof(errbuf))); 944 if (err) 945 errstr = errbuf; 946 return err; 947 } 948 oauthbearer_set_token(const std::string & token_value,int64_t md_lifetime_ms,const std::string & md_principal_name,const std::list<std::string> & extensions,std::string & errstr)949 ErrorCode oauthbearer_set_token (const std::string &token_value, 950 int64_t md_lifetime_ms, 951 const std::string &md_principal_name, 952 const std::list<std::string> &extensions, 953 std::string &errstr) { 954 char errbuf[512]; 955 ErrorCode err; 956 const char **extensions_copy = new const char *[extensions.size()]; 957 int elem = 0; 958 959 for (std::list<std::string>::const_iterator it = extensions.begin(); 960 it != extensions.end(); it++) 961 extensions_copy[elem++] = it->c_str(); 962 err = static_cast<ErrorCode>(rd_kafka_oauthbearer_set_token( 963 rk_, token_value.c_str(), 964 md_lifetime_ms, 965 md_principal_name.c_str(), 966 extensions_copy, 967 extensions.size(), 968 errbuf, sizeof(errbuf))); 969 free(extensions_copy); 970 971 if (err != ERR_NO_ERROR) 972 errstr = errbuf; 973 974 return err; 975 } 976 oauthbearer_set_token_failure(const std::string & errstr)977 ErrorCode oauthbearer_set_token_failure(const std::string &errstr) { 978 return static_cast<ErrorCode>(rd_kafka_oauthbearer_set_token_failure( 979 rk_, errstr.c_str())); 980 }; 981 982 983 
rd_kafka_t *rk_; 984 /* All Producer and Consumer callbacks must reside in HandleImpl and 985 * the opaque provided to rdkafka must be a pointer to HandleImpl, since 986 * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to 987 * HandleImpl due to the skewed diamond inheritance. */ 988 ConsumeCb *consume_cb_; 989 EventCb *event_cb_; 990 SocketCb *socket_cb_; 991 OpenCb *open_cb_; 992 DeliveryReportCb *dr_cb_; 993 PartitionerCb *partitioner_cb_; 994 PartitionerKeyPointerCb *partitioner_kp_cb_; 995 RebalanceCb *rebalance_cb_; 996 OffsetCommitCb *offset_commit_cb_; 997 OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_; 998 SslCertificateVerifyCb *ssl_cert_verify_cb_; 999 }; 1000 1001 1002 class TopicImpl : public Topic { 1003 public: ~TopicImpl()1004 ~TopicImpl () { 1005 rd_kafka_topic_destroy(rkt_); 1006 } 1007 name()1008 const std::string name () const { 1009 return rd_kafka_topic_name(rkt_); 1010 } 1011 partition_available(int32_t partition)1012 bool partition_available (int32_t partition) const { 1013 return !!rd_kafka_topic_partition_available(rkt_, partition); 1014 } 1015 offset_store(int32_t partition,int64_t offset)1016 ErrorCode offset_store (int32_t partition, int64_t offset) { 1017 return static_cast<RdKafka::ErrorCode>( 1018 rd_kafka_offset_store(rkt_, partition, offset)); 1019 } 1020 1021 static Topic *create (Handle &base, const std::string &topic, 1022 Conf *conf); 1023 c_ptr()1024 struct rd_kafka_topic_s *c_ptr () { 1025 return rkt_; 1026 } 1027 1028 rd_kafka_topic_t *rkt_; 1029 PartitionerCb *partitioner_cb_; 1030 PartitionerKeyPointerCb *partitioner_kp_cb_; 1031 }; 1032 1033 1034 /** 1035 * Topic and Partition 1036 */ 1037 class TopicPartitionImpl : public TopicPartition { 1038 public: ~TopicPartitionImpl()1039 ~TopicPartitionImpl() {}; 1040 1041 static TopicPartition *create (const std::string &topic, int partition); 1042 TopicPartitionImpl(const std::string & topic,int partition)1043 TopicPartitionImpl (const std::string 
&topic, int partition): 1044 topic_(topic), partition_(partition), offset_(RdKafka::Topic::OFFSET_INVALID), 1045 err_(ERR_NO_ERROR) {} 1046 TopicPartitionImpl(const std::string & topic,int partition,int64_t offset)1047 TopicPartitionImpl (const std::string &topic, int partition, int64_t offset): 1048 topic_(topic), partition_(partition), offset_(offset), 1049 err_(ERR_NO_ERROR) {} 1050 TopicPartitionImpl(const rd_kafka_topic_partition_t * c_part)1051 TopicPartitionImpl (const rd_kafka_topic_partition_t *c_part) { 1052 topic_ = std::string(c_part->topic); 1053 partition_ = c_part->partition; 1054 offset_ = c_part->offset; 1055 err_ = static_cast<ErrorCode>(c_part->err); 1056 // FIXME: metadata 1057 } 1058 1059 static void destroy (std::vector<TopicPartition*> &partitions); 1060 partition()1061 int partition () const { return partition_; } topic()1062 const std::string &topic () const { return topic_ ; } 1063 offset()1064 int64_t offset () const { return offset_; } 1065 err()1066 ErrorCode err () const { return err_; } 1067 set_offset(int64_t offset)1068 void set_offset (int64_t offset) { offset_ = offset; } 1069 1070 std::ostream& operator<<(std::ostream &ostrm) const { 1071 return ostrm << topic_ << " [" << partition_ << "]"; 1072 } 1073 1074 std::string topic_; 1075 int partition_; 1076 int64_t offset_; 1077 ErrorCode err_; 1078 }; 1079 1080 1081 /** 1082 * @class ConsumerGroupMetadata wraps the 1083 * C rd_kafka_consumer_group_metadata_t object. 
1084 */ 1085 class ConsumerGroupMetadataImpl : public ConsumerGroupMetadata { 1086 public: ~ConsumerGroupMetadataImpl()1087 ~ConsumerGroupMetadataImpl() { 1088 rd_kafka_consumer_group_metadata_destroy(cgmetadata_); 1089 } 1090 ConsumerGroupMetadataImpl(rd_kafka_consumer_group_metadata_t * cgmetadata)1091 ConsumerGroupMetadataImpl(rd_kafka_consumer_group_metadata_t *cgmetadata): 1092 cgmetadata_(cgmetadata) {} 1093 1094 rd_kafka_consumer_group_metadata_t *cgmetadata_; 1095 }; 1096 1097 1098 class KafkaConsumerImpl : virtual public KafkaConsumer, virtual public HandleImpl { 1099 public: ~KafkaConsumerImpl()1100 ~KafkaConsumerImpl () { 1101 1102 } 1103 1104 static KafkaConsumer *create (Conf *conf, std::string &errstr); 1105 1106 ErrorCode assignment (std::vector<TopicPartition*> &partitions); 1107 ErrorCode subscription (std::vector<std::string> &topics); 1108 ErrorCode subscribe (const std::vector<std::string> &topics); 1109 ErrorCode unsubscribe (); 1110 ErrorCode assign (const std::vector<TopicPartition*> &partitions); 1111 ErrorCode unassign (); 1112 1113 Message *consume (int timeout_ms); commitSync()1114 ErrorCode commitSync () { 1115 return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0/*sync*/)); 1116 } commitAsync()1117 ErrorCode commitAsync () { 1118 return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 1/*async*/)); 1119 } commitSync(Message * message)1120 ErrorCode commitSync (Message *message) { 1121 MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message); 1122 return static_cast<ErrorCode>( 1123 rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 0/*sync*/)); 1124 } commitAsync(Message * message)1125 ErrorCode commitAsync (Message *message) { 1126 MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message); 1127 return static_cast<ErrorCode>( 1128 rd_kafka_commit_message(rk_, msgimpl->rkmessage_,1/*async*/)); 1129 } 1130 commitSync(std::vector<TopicPartition * > & offsets)1131 ErrorCode commitSync (std::vector<TopicPartition*> &offsets) { 1132 
rd_kafka_topic_partition_list_t *c_parts = 1133 partitions_to_c_parts(offsets); 1134 rd_kafka_resp_err_t err = 1135 rd_kafka_commit(rk_, c_parts, 0); 1136 if (!err) 1137 update_partitions_from_c_parts(offsets, c_parts); 1138 rd_kafka_topic_partition_list_destroy(c_parts); 1139 return static_cast<ErrorCode>(err); 1140 } 1141 commitAsync(const std::vector<TopicPartition * > & offsets)1142 ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) { 1143 rd_kafka_topic_partition_list_t *c_parts = 1144 partitions_to_c_parts(offsets); 1145 rd_kafka_resp_err_t err = 1146 rd_kafka_commit(rk_, c_parts, 1); 1147 rd_kafka_topic_partition_list_destroy(c_parts); 1148 return static_cast<ErrorCode>(err); 1149 } 1150 commitSync(OffsetCommitCb * offset_commit_cb)1151 ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) { 1152 return static_cast<ErrorCode>( 1153 rd_kafka_commit_queue(rk_, NULL, NULL, 1154 RdKafka::offset_commit_cb_trampoline0, 1155 offset_commit_cb)); 1156 } 1157 commitSync(std::vector<TopicPartition * > & offsets,OffsetCommitCb * offset_commit_cb)1158 ErrorCode commitSync (std::vector<TopicPartition*> &offsets, 1159 OffsetCommitCb *offset_commit_cb) { 1160 rd_kafka_topic_partition_list_t *c_parts = 1161 partitions_to_c_parts(offsets); 1162 rd_kafka_resp_err_t err = 1163 rd_kafka_commit_queue(rk_, c_parts, NULL, 1164 RdKafka::offset_commit_cb_trampoline0, 1165 offset_commit_cb); 1166 rd_kafka_topic_partition_list_destroy(c_parts); 1167 return static_cast<ErrorCode>(err); 1168 } 1169 1170 ErrorCode committed (std::vector<TopicPartition*> &partitions, int timeout_ms); 1171 ErrorCode position (std::vector<TopicPartition*> &partitions); 1172 groupMetadata()1173 ConsumerGroupMetadata *groupMetadata () { 1174 rd_kafka_consumer_group_metadata_t *cgmetadata; 1175 1176 cgmetadata = rd_kafka_consumer_group_metadata(rk_); 1177 if (!cgmetadata) 1178 return NULL; 1179 1180 return new ConsumerGroupMetadataImpl(cgmetadata); 1181 } 1182 1183 ErrorCode close (); 1184 
1185 ErrorCode seek (const TopicPartition &partition, int timeout_ms); 1186 offsets_store(std::vector<TopicPartition * > & offsets)1187 ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) { 1188 rd_kafka_topic_partition_list_t *c_parts = 1189 partitions_to_c_parts(offsets); 1190 rd_kafka_resp_err_t err = 1191 rd_kafka_offsets_store(rk_, c_parts); 1192 update_partitions_from_c_parts(offsets, c_parts); 1193 rd_kafka_topic_partition_list_destroy(c_parts); 1194 return static_cast<ErrorCode>(err); 1195 } 1196 1197 }; 1198 1199 1200 class MetadataImpl : public Metadata { 1201 public: 1202 MetadataImpl(const rd_kafka_metadata_t *metadata); 1203 ~MetadataImpl(); 1204 brokers()1205 const std::vector<const BrokerMetadata *> *brokers() const { 1206 return &brokers_; 1207 } 1208 topics()1209 const std::vector<const TopicMetadata *> *topics() const { 1210 return &topics_; 1211 } 1212 orig_broker_name()1213 const std::string orig_broker_name() const { 1214 return std::string(metadata_->orig_broker_name); 1215 } 1216 orig_broker_id()1217 int32_t orig_broker_id() const { 1218 return metadata_->orig_broker_id; 1219 } 1220 1221 private: 1222 const rd_kafka_metadata_t *metadata_; 1223 std::vector<const BrokerMetadata *> brokers_; 1224 std::vector<const TopicMetadata *> topics_; 1225 std::string orig_broker_name_; 1226 }; 1227 1228 1229 class QueueImpl : virtual public Queue { 1230 public: ~QueueImpl()1231 ~QueueImpl () { 1232 rd_kafka_queue_destroy(queue_); 1233 } 1234 static Queue *create (Handle *base); 1235 ErrorCode forward (Queue *queue); 1236 Message *consume (int timeout_ms); 1237 int poll (int timeout_ms); 1238 void io_event_enable(int fd, const void *payload, size_t size); 1239 1240 rd_kafka_queue_t *queue_; 1241 }; 1242 1243 1244 1245 1246 1247 class ConsumerImpl : virtual public Consumer, virtual public HandleImpl { 1248 public: ~ConsumerImpl()1249 ~ConsumerImpl () { 1250 if (rk_) 1251 rd_kafka_destroy(rk_); 1252 }; 1253 static Consumer *create (Conf *conf, 
std::string &errstr); 1254 1255 ErrorCode start (Topic *topic, int32_t partition, int64_t offset); 1256 ErrorCode start (Topic *topic, int32_t partition, int64_t offset, 1257 Queue *queue); 1258 ErrorCode stop (Topic *topic, int32_t partition); 1259 ErrorCode seek (Topic *topic, int32_t partition, int64_t offset, 1260 int timeout_ms); 1261 Message *consume (Topic *topic, int32_t partition, int timeout_ms); 1262 Message *consume (Queue *queue, int timeout_ms); 1263 int consume_callback (Topic *topic, int32_t partition, int timeout_ms, 1264 ConsumeCb *cb, void *opaque); 1265 int consume_callback (Queue *queue, int timeout_ms, 1266 RdKafka::ConsumeCb *consume_cb, void *opaque); 1267 }; 1268 1269 1270 1271 class ProducerImpl : virtual public Producer, virtual public HandleImpl { 1272 1273 public: ~ProducerImpl()1274 ~ProducerImpl () { if (rk_) rd_kafka_destroy(rk_); }; 1275 1276 ErrorCode produce (Topic *topic, int32_t partition, 1277 int msgflags, 1278 void *payload, size_t len, 1279 const std::string *key, 1280 void *msg_opaque); 1281 1282 ErrorCode produce (Topic *topic, int32_t partition, 1283 int msgflags, 1284 void *payload, size_t len, 1285 const void *key, size_t key_len, 1286 void *msg_opaque); 1287 1288 ErrorCode produce (Topic *topic, int32_t partition, 1289 const std::vector<char> *payload, 1290 const std::vector<char> *key, 1291 void *msg_opaque); 1292 1293 ErrorCode produce (const std::string topic_name, int32_t partition, 1294 int msgflags, 1295 void *payload, size_t len, 1296 const void *key, size_t key_len, 1297 int64_t timestamp, void *msg_opaque); 1298 1299 ErrorCode produce (const std::string topic_name, int32_t partition, 1300 int msgflags, 1301 void *payload, size_t len, 1302 const void *key, size_t key_len, 1303 int64_t timestamp, 1304 RdKafka::Headers *headers, 1305 void *msg_opaque); 1306 flush(int timeout_ms)1307 ErrorCode flush (int timeout_ms) { 1308 return static_cast<RdKafka::ErrorCode>(rd_kafka_flush(rk_, 1309 timeout_ms)); 1310 } 1311 
purge(int purge_flags)1312 ErrorCode purge (int purge_flags) { 1313 return static_cast<RdKafka::ErrorCode>(rd_kafka_purge(rk_, 1314 (int)purge_flags)); 1315 } 1316 init_transactions(int timeout_ms)1317 Error *init_transactions (int timeout_ms) { 1318 rd_kafka_error_t *c_error; 1319 1320 c_error = rd_kafka_init_transactions(rk_, timeout_ms); 1321 1322 if (c_error) 1323 return new ErrorImpl(c_error); 1324 else 1325 return NULL; 1326 } 1327 begin_transaction()1328 Error *begin_transaction () { 1329 rd_kafka_error_t *c_error; 1330 1331 c_error = rd_kafka_begin_transaction(rk_); 1332 1333 if (c_error) 1334 return new ErrorImpl(c_error); 1335 else 1336 return NULL; 1337 } 1338 send_offsets_to_transaction(const std::vector<TopicPartition * > & offsets,const ConsumerGroupMetadata * group_metadata,int timeout_ms)1339 Error *send_offsets_to_transaction ( 1340 const std::vector<TopicPartition*> &offsets, 1341 const ConsumerGroupMetadata *group_metadata, 1342 int timeout_ms) { 1343 rd_kafka_error_t *c_error; 1344 const RdKafka::ConsumerGroupMetadataImpl *cgmdimpl = 1345 dynamic_cast<const RdKafka::ConsumerGroupMetadataImpl *>(group_metadata); 1346 rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets); 1347 1348 c_error = rd_kafka_send_offsets_to_transaction(rk_, c_offsets, 1349 cgmdimpl->cgmetadata_, 1350 timeout_ms); 1351 1352 rd_kafka_topic_partition_list_destroy(c_offsets); 1353 1354 if (c_error) 1355 return new ErrorImpl(c_error); 1356 else 1357 return NULL; 1358 } 1359 commit_transaction(int timeout_ms)1360 Error *commit_transaction (int timeout_ms) { 1361 rd_kafka_error_t *c_error; 1362 1363 c_error = rd_kafka_commit_transaction(rk_, timeout_ms); 1364 1365 if (c_error) 1366 return new ErrorImpl(c_error); 1367 else 1368 return NULL; 1369 } 1370 abort_transaction(int timeout_ms)1371 Error *abort_transaction (int timeout_ms) { 1372 rd_kafka_error_t *c_error; 1373 1374 c_error = rd_kafka_abort_transaction(rk_, timeout_ms); 1375 1376 if (c_error) 1377 
return new ErrorImpl(c_error); 1378 else 1379 return NULL; 1380 } 1381 1382 static Producer *create (Conf *conf, std::string &errstr); 1383 1384 }; 1385 1386 1387 1388 } 1389 1390 #endif /* _RDKAFKACPP_INT_H_ */ 1391