1 /* 2 * librdkafka - Apache Kafka C/C++ library 3 * 4 * Copyright (c) 2014 Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #ifndef _RDKAFKACPP_H_ 30 #define _RDKAFKACPP_H_ 31 32 /** 33 * @file rdkafkacpp.h 34 * @brief Apache Kafka C/C++ consumer and producer client library. 35 * 36 * rdkafkacpp.h contains the public C++ API for librdkafka. 37 * The API is documented in this file as comments prefixing the class, 38 * function, type, enum, define, etc. 39 * For more information, see the C interface in rdkafka.h and read the 40 * manual in INTRODUCTION.md. 41 * The C++ interface is STD C++ '03 compliant and adheres to the 42 * Google C++ Style Guide. 43 44 * @sa For the C interface see rdkafka.h 45 * 46 * @tableofcontents 47 */ 48 49 /**@cond NO_DOC*/ 50 #include <string> 51 #include <list> 52 #include <vector> 53 #include <cstdlib> 54 #include <cstring> 55 #include <stdint.h> 56 #include <sys/types.h> 57 58 #ifdef _WIN32 59 #ifndef ssize_t 60 #ifndef _BASETSD_H_ 61 #include <basetsd.h> 62 #endif 63 #ifndef _SSIZE_T_DEFINED 64 #define _SSIZE_T_DEFINED 65 typedef SSIZE_T ssize_t; 66 #endif 67 #endif 68 #undef RD_EXPORT 69 #ifdef LIBRDKAFKA_STATICLIB 70 #define RD_EXPORT 71 #else 72 #ifdef LIBRDKAFKACPP_EXPORTS 73 #define RD_EXPORT __declspec(dllexport) 74 #else 75 #define RD_EXPORT __declspec(dllimport) 76 #endif 77 #endif 78 #else 79 #define RD_EXPORT 80 #endif 81 82 /**@endcond*/ 83 84 extern "C" { 85 /* Forward declarations */ 86 struct rd_kafka_s; 87 struct rd_kafka_topic_s; 88 struct rd_kafka_message_s; 89 struct rd_kafka_conf_s; 90 struct rd_kafka_topic_conf_s; 91 } 92 93 namespace RdKafka { 94 95 /** 96 * @name Miscellaneous APIs 97 * @{ 98 */ 99 100 /** 101 * @brief librdkafka version 102 * 103 * Interpreted as hex \c MM.mm.rr.xx: 104 * - MM = Major 105 * - mm = minor 106 * - rr = revision 107 * - xx = pre-release id (0xff is the final release) 108 * 109 * E.g.: \c 0x000801ff = 0.8.1 110 * 111 * @remark This value should only be used during compile time, 112 * for runtime checks of version use RdKafka::version() 113 */ 114 #define RD_KAFKA_VERSION 0x010802ff 115 116 /** 117 * @brief Returns the librdkafka version as integer. 118 * 119 * @sa See RD_KAFKA_VERSION for how to parse the integer format. 120 */ 121 RD_EXPORT 122 int version (); 123 124 /** 125 * @brief Returns the librdkafka version as string. 126 */ 127 RD_EXPORT 128 std::string version_str(); 129 130 /** 131 * @brief Returns a CSV list of the supported debug contexts 132 * for use with Conf::Set("debug", ..). 133 */ 134 RD_EXPORT 135 std::string get_debug_contexts(); 136 137 /** 138 * @brief Wait for all rd_kafka_t objects to be destroyed. 139 * 140 * @returns 0 if all kafka objects are now destroyed, or -1 if the 141 * timeout was reached. 142 * Since RdKafka handle deletion is an asynch operation the 143 * \p wait_destroyed() function can be used for applications where 144 * a clean shutdown is required. 145 */ 146 RD_EXPORT 147 int wait_destroyed(int timeout_ms); 148 149 /** 150 * @brief Allocate memory using the same allocator librdkafka uses. 151 * 152 * This is typically an abstraction for the malloc(3) call and makes sure 153 * the application can use the same memory allocator as librdkafka for 154 * allocating pointers that are used by librdkafka. 155 * 156 * @remark Memory allocated by mem_malloc() must be freed using 157 * mem_free(). 158 */ 159 RD_EXPORT 160 void *mem_malloc (size_t size); 161 162 /** 163 * @brief Free pointer returned by librdkafka 164 * 165 * This is typically an abstraction for the free(3) call and makes sure 166 * the application can use the same memory allocator as librdkafka for 167 * freeing pointers returned by librdkafka. 168 * 169 * In standard setups it is usually not necessary to use this interface 170 * rather than the free(3) function. 171 * 172 * @remark mem_free() must only be used for pointers returned by APIs 173 * that explicitly mention using this function for freeing. 174 */ 175 RD_EXPORT 176 void mem_free (void *ptr); 177 178 /**@}*/ 179 180 181 182 /** 183 * @name Constants, errors, types 184 * @{ 185 * 186 * 187 */ 188 189 /** 190 * @brief Error codes. 191 * 192 * The negative error codes delimited by two underscores 193 * (\c _ERR__..) denotes errors internal to librdkafka and are 194 * displayed as \c \"Local: \<error string..\>\", while the error codes 195 * delimited by a single underscore (\c ERR_..) denote broker 196 * errors and are displayed as \c \"Broker: \<error string..\>\". 197 * 198 * @sa Use RdKafka::err2str() to translate an error code a human readable string 199 */ 200 enum ErrorCode { 201 /* Internal errors to rdkafka: */ 202 /** Begin internal error codes */ 203 ERR__BEGIN = -200, 204 /** Received message is incorrect */ 205 ERR__BAD_MSG = -199, 206 /** Bad/unknown compression */ 207 ERR__BAD_COMPRESSION = -198, 208 /** Broker is going away */ 209 ERR__DESTROY = -197, 210 /** Generic failure */ 211 ERR__FAIL = -196, 212 /** Broker transport failure */ 213 ERR__TRANSPORT = -195, 214 /** Critical system resource */ 215 ERR__CRIT_SYS_RESOURCE = -194, 216 /** Failed to resolve broker */ 217 ERR__RESOLVE = -193, 218 /** Produced message timed out*/ 219 ERR__MSG_TIMED_OUT = -192, 220 /** Reached the end of the topic+partition queue on 221 * the broker. Not really an error. 222 * This event is disabled by default, 223 * see the `enable.partition.eof` configuration property. */ 224 ERR__PARTITION_EOF = -191, 225 /** Permanent: Partition does not exist in cluster. */ 226 ERR__UNKNOWN_PARTITION = -190, 227 /** File or filesystem error */ 228 ERR__FS = -189, 229 /** Permanent: Topic does not exist in cluster. */ 230 ERR__UNKNOWN_TOPIC = -188, 231 /** All broker connections are down. */ 232 ERR__ALL_BROKERS_DOWN = -187, 233 /** Invalid argument, or invalid configuration */ 234 ERR__INVALID_ARG = -186, 235 /** Operation timed out */ 236 ERR__TIMED_OUT = -185, 237 /** Queue is full */ 238 ERR__QUEUE_FULL = -184, 239 /** ISR count < required.acks */ 240 ERR__ISR_INSUFF = -183, 241 /** Broker node update */ 242 ERR__NODE_UPDATE = -182, 243 /** SSL error */ 244 ERR__SSL = -181, 245 /** Waiting for coordinator to become available. */ 246 ERR__WAIT_COORD = -180, 247 /** Unknown client group */ 248 ERR__UNKNOWN_GROUP = -179, 249 /** Operation in progress */ 250 ERR__IN_PROGRESS = -178, 251 /** Previous operation in progress, wait for it to finish. */ 252 ERR__PREV_IN_PROGRESS = -177, 253 /** This operation would interfere with an existing subscription */ 254 ERR__EXISTING_SUBSCRIPTION = -176, 255 /** Assigned partitions (rebalance_cb) */ 256 ERR__ASSIGN_PARTITIONS = -175, 257 /** Revoked partitions (rebalance_cb) */ 258 ERR__REVOKE_PARTITIONS = -174, 259 /** Conflicting use */ 260 ERR__CONFLICT = -173, 261 /** Wrong state */ 262 ERR__STATE = -172, 263 /** Unknown protocol */ 264 ERR__UNKNOWN_PROTOCOL = -171, 265 /** Not implemented */ 266 ERR__NOT_IMPLEMENTED = -170, 267 /** Authentication failure*/ 268 ERR__AUTHENTICATION = -169, 269 /** No stored offset */ 270 ERR__NO_OFFSET = -168, 271 /** Outdated */ 272 ERR__OUTDATED = -167, 273 /** Timed out in queue */ 274 ERR__TIMED_OUT_QUEUE = -166, 275 /** Feature not supported by broker */ 276 ERR__UNSUPPORTED_FEATURE = -165, 277 /** Awaiting cache update */ 278 ERR__WAIT_CACHE = -164, 279 /** Operation interrupted */ 280 ERR__INTR = -163, 281 /** Key serialization error */ 282 ERR__KEY_SERIALIZATION = -162, 283 /** Value serialization error */ 284 ERR__VALUE_SERIALIZATION = -161, 285 /** Key deserialization error */ 286 ERR__KEY_DESERIALIZATION = -160, 287 /** Value deserialization error */ 288 ERR__VALUE_DESERIALIZATION = -159, 289 /** Partial response */ 290 ERR__PARTIAL = -158, 291 /** Modification attempted on read-only object */ 292 ERR__READ_ONLY = -157, 293 /** No such entry / item not found */ 294 ERR__NOENT = -156, 295 /** Read underflow */ 296 ERR__UNDERFLOW = -155, 297 /** Invalid type */ 298 ERR__INVALID_TYPE = -154, 299 /** Retry operation */ 300 ERR__RETRY = -153, 301 /** Purged in queue */ 302 ERR__PURGE_QUEUE = -152, 303 /** Purged in flight */ 304 ERR__PURGE_INFLIGHT = -151, 305 /** Fatal error: see RdKafka::Handle::fatal_error() */ 306 ERR__FATAL = -150, 307 /** Inconsistent state */ 308 ERR__INCONSISTENT = -149, 309 /** Gap-less ordering would not be guaranteed if proceeding */ 310 ERR__GAPLESS_GUARANTEE = -148, 311 /** Maximum poll interval exceeded */ 312 ERR__MAX_POLL_EXCEEDED = -147, 313 /** Unknown broker */ 314 ERR__UNKNOWN_BROKER = -146, 315 /** Functionality not configured */ 316 ERR__NOT_CONFIGURED = -145, 317 /** Instance has been fenced */ 318 ERR__FENCED = -144, 319 /** Application generated error */ 320 ERR__APPLICATION = -143, 321 /** Assignment lost */ 322 ERR__ASSIGNMENT_LOST = -142, 323 /** No operation performed */ 324 ERR__NOOP = -141, 325 /** No offset to automatically reset to */ 326 ERR__AUTO_OFFSET_RESET = -140, 327 328 /** End internal error codes */ 329 ERR__END = -100, 330 331 /* Kafka broker errors: */ 332 /** Unknown broker error */ 333 ERR_UNKNOWN = -1, 334 /** Success */ 335 ERR_NO_ERROR = 0, 336 /** Offset out of range */ 337 ERR_OFFSET_OUT_OF_RANGE = 1, 338 /** Invalid message */ 339 ERR_INVALID_MSG = 2, 340 /** Unknown topic or partition */ 341 ERR_UNKNOWN_TOPIC_OR_PART = 3, 342 /** Invalid message size */ 343 ERR_INVALID_MSG_SIZE = 4, 344 /** Leader not available */ 345 ERR_LEADER_NOT_AVAILABLE = 5, 346 /** Not leader for partition */ 347 ERR_NOT_LEADER_FOR_PARTITION = 6, 348 /** Request timed out */ 349 ERR_REQUEST_TIMED_OUT = 7, 350 /** Broker not available */ 351 ERR_BROKER_NOT_AVAILABLE = 8, 352 /** Replica not available */ 353 ERR_REPLICA_NOT_AVAILABLE = 9, 354 /** Message size too large */ 355 ERR_MSG_SIZE_TOO_LARGE = 10, 356 /** StaleControllerEpochCode */ 357 ERR_STALE_CTRL_EPOCH = 11, 358 /** Offset metadata string too large */ 359 ERR_OFFSET_METADATA_TOO_LARGE = 12, 360 /** Broker disconnected before response received */ 361 ERR_NETWORK_EXCEPTION = 13, 362 /** Coordinator load in progress */ 363 ERR_COORDINATOR_LOAD_IN_PROGRESS = 14, 364 /** Group coordinator load in progress */ 365 #define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS 366 /** Coordinator not available */ 367 ERR_COORDINATOR_NOT_AVAILABLE = 15, 368 /** Group coordinator not available */ 369 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE 370 /** Not coordinator */ 371 ERR_NOT_COORDINATOR = 16, 372 /** Not coordinator for group */ 373 #define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR 374 /** Invalid topic */ 375 ERR_TOPIC_EXCEPTION = 17, 376 /** Message batch larger than configured server segment size */ 377 ERR_RECORD_LIST_TOO_LARGE = 18, 378 /** Not enough in-sync replicas */ 379 ERR_NOT_ENOUGH_REPLICAS = 19, 380 /** Message(s) written to insufficient number of in-sync replicas */ 381 ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20, 382 /** Invalid required acks value */ 383 ERR_INVALID_REQUIRED_ACKS = 21, 384 /** Specified group generation id is not valid */ 385 ERR_ILLEGAL_GENERATION = 22, 386 /** Inconsistent group protocol */ 387 ERR_INCONSISTENT_GROUP_PROTOCOL = 23, 388 /** Invalid group.id */ 389 ERR_INVALID_GROUP_ID = 24, 390 /** Unknown member */ 391 ERR_UNKNOWN_MEMBER_ID = 25, 392 /** Invalid session timeout */ 393 ERR_INVALID_SESSION_TIMEOUT = 26, 394 /** Group rebalance in progress */ 395 ERR_REBALANCE_IN_PROGRESS = 27, 396 /** Commit offset data size is not valid */ 397 ERR_INVALID_COMMIT_OFFSET_SIZE = 28, 398 /** Topic authorization failed */ 399 ERR_TOPIC_AUTHORIZATION_FAILED = 29, 400 /** Group authorization failed */ 401 ERR_GROUP_AUTHORIZATION_FAILED = 30, 402 /** Cluster authorization failed */ 403 ERR_CLUSTER_AUTHORIZATION_FAILED = 31, 404 /** Invalid timestamp */ 405 ERR_INVALID_TIMESTAMP = 32, 406 /** Unsupported SASL mechanism */ 407 ERR_UNSUPPORTED_SASL_MECHANISM = 33, 408 /** Illegal SASL state */ 409 ERR_ILLEGAL_SASL_STATE = 34, 410 /** Unuspported version */ 411 ERR_UNSUPPORTED_VERSION = 35, 412 /** Topic already exists */ 413 ERR_TOPIC_ALREADY_EXISTS = 36, 414 /** Invalid number of partitions */ 415 ERR_INVALID_PARTITIONS = 37, 416 /** Invalid replication factor */ 417 ERR_INVALID_REPLICATION_FACTOR = 38, 418 /** Invalid replica assignment */ 419 ERR_INVALID_REPLICA_ASSIGNMENT = 39, 420 /** Invalid config */ 421 ERR_INVALID_CONFIG = 40, 422 /** Not controller for cluster */ 423 ERR_NOT_CONTROLLER = 41, 424 /** Invalid request */ 425 ERR_INVALID_REQUEST = 42, 426 /** Message format on broker does not support request */ 427 ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43, 428 /** Policy violation */ 429 ERR_POLICY_VIOLATION = 44, 430 /** Broker received an out of order sequence number */ 431 ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45, 432 /** Broker received a duplicate sequence number */ 433 ERR_DUPLICATE_SEQUENCE_NUMBER = 46, 434 /** Producer attempted an operation with an old epoch */ 435 ERR_INVALID_PRODUCER_EPOCH = 47, 436 /** Producer attempted a transactional operation in an invalid state */ 437 ERR_INVALID_TXN_STATE = 48, 438 /** Producer attempted to use a producer id which is not 439 * currently assigned to its transactional id */ 440 ERR_INVALID_PRODUCER_ID_MAPPING = 49, 441 /** Transaction timeout is larger than the maximum 442 * value allowed by the broker's max.transaction.timeout.ms */ 443 ERR_INVALID_TRANSACTION_TIMEOUT = 50, 444 /** Producer attempted to update a transaction while another 445 * concurrent operation on the same transaction was ongoing */ 446 ERR_CONCURRENT_TRANSACTIONS = 51, 447 /** Indicates that the transaction coordinator sending a 448 * WriteTxnMarker is no longer the current coordinator for a 449 * given producer */ 450 ERR_TRANSACTION_COORDINATOR_FENCED = 52, 451 /** Transactional Id authorization failed */ 452 ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53, 453 /** Security features are disabled */ 454 ERR_SECURITY_DISABLED = 54, 455 /** Operation not attempted */ 456 ERR_OPERATION_NOT_ATTEMPTED = 55, 457 /** Disk error when trying to access log file on the disk */ 458 ERR_KAFKA_STORAGE_ERROR = 56, 459 /** The user-specified log directory is not found in the broker config */ 460 ERR_LOG_DIR_NOT_FOUND = 57, 461 /** SASL Authentication failed */ 462 ERR_SASL_AUTHENTICATION_FAILED = 58, 463 /** Unknown Producer Id */ 464 ERR_UNKNOWN_PRODUCER_ID = 59, 465 /** Partition reassignment is in progress */ 466 ERR_REASSIGNMENT_IN_PROGRESS = 60, 467 /** Delegation Token feature is not enabled */ 468 ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61, 469 /** Delegation Token is not found on server */ 470 ERR_DELEGATION_TOKEN_NOT_FOUND = 62, 471 /** Specified Principal is not valid Owner/Renewer */ 472 ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63, 473 /** Delegation Token requests are not allowed on this connection */ 474 ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64, 475 /** Delegation Token authorization failed */ 476 ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65, 477 /** Delegation Token is expired */ 478 ERR_DELEGATION_TOKEN_EXPIRED = 66, 479 /** Supplied principalType is not supported */ 480 ERR_INVALID_PRINCIPAL_TYPE = 67, 481 /** The group is not empty */ 482 ERR_NON_EMPTY_GROUP = 68, 483 /** The group id does not exist */ 484 ERR_GROUP_ID_NOT_FOUND = 69, 485 /** The fetch session ID was not found */ 486 ERR_FETCH_SESSION_ID_NOT_FOUND = 70, 487 /** The fetch session epoch is invalid */ 488 ERR_INVALID_FETCH_SESSION_EPOCH = 71, 489 /** No matching listener */ 490 ERR_LISTENER_NOT_FOUND = 72, 491 /** Topic deletion is disabled */ 492 ERR_TOPIC_DELETION_DISABLED = 73, 493 /** Leader epoch is older than broker epoch */ 494 ERR_FENCED_LEADER_EPOCH = 74, 495 /** Leader epoch is newer than broker epoch */ 496 ERR_UNKNOWN_LEADER_EPOCH = 75, 497 /** Unsupported compression type */ 498 ERR_UNSUPPORTED_COMPRESSION_TYPE = 76, 499 /** Broker epoch has changed */ 500 ERR_STALE_BROKER_EPOCH = 77, 501 /** Leader high watermark is not caught up */ 502 ERR_OFFSET_NOT_AVAILABLE = 78, 503 /** Group member needs a valid member ID */ 504 ERR_MEMBER_ID_REQUIRED = 79, 505 /** Preferred leader was not available */ 506 ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80, 507 /** Consumer group has reached maximum size */ 508 ERR_GROUP_MAX_SIZE_REACHED = 81, 509 /** Static consumer fenced by other consumer with same 510 * group.instance.id. */ 511 ERR_FENCED_INSTANCE_ID = 82, 512 /** Eligible partition leaders are not available */ 513 ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE = 83, 514 /** Leader election not needed for topic partition */ 515 ERR_ELECTION_NOT_NEEDED = 84, 516 /** No partition reassignment is in progress */ 517 ERR_NO_REASSIGNMENT_IN_PROGRESS = 85, 518 /** Deleting offsets of a topic while the consumer group is 519 * subscribed to it */ 520 ERR_GROUP_SUBSCRIBED_TO_TOPIC = 86, 521 /** Broker failed to validate record */ 522 ERR_INVALID_RECORD = 87, 523 /** There are unstable offsets that need to be cleared */ 524 ERR_UNSTABLE_OFFSET_COMMIT = 88, 525 /** Throttling quota has been exceeded */ 526 ERR_THROTTLING_QUOTA_EXCEEDED = 89, 527 /** There is a newer producer with the same transactionalId 528 * which fences the current one */ 529 ERR_PRODUCER_FENCED = 90, 530 /** Request illegally referred to resource that does not exist */ 531 ERR_RESOURCE_NOT_FOUND = 91, 532 /** Request illegally referred to the same resource twice */ 533 ERR_DUPLICATE_RESOURCE = 92, 534 /** Requested credential would not meet criteria for acceptability */ 535 ERR_UNACCEPTABLE_CREDENTIAL = 93, 536 /** Indicates that the either the sender or recipient of a 537 * voter-only request is not one of the expected voters */ 538 ERR_INCONSISTENT_VOTER_SET = 94, 539 /** Invalid update version */ 540 ERR_INVALID_UPDATE_VERSION = 95, 541 /** Unable to update finalized features due to server error */ 542 ERR_FEATURE_UPDATE_FAILED = 96, 543 /** Request principal deserialization failed during forwarding */ 544 ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97 545 }; 546 547 548 /** 549 * @brief Returns a human readable representation of a kafka error. 550 */ 551 RD_EXPORT 552 std::string err2str(RdKafka::ErrorCode err); 553 554 555 556 /** 557 * @enum CertificateType 558 * @brief SSL certificate types 559 */ 560 enum CertificateType { 561 CERT_PUBLIC_KEY, /**< Client's public key */ 562 CERT_PRIVATE_KEY, /**< Client's private key */ 563 CERT_CA, /**< CA certificate */ 564 CERT__CNT 565 }; 566 567 /** 568 * @enum CertificateEncoding 569 * @brief SSL certificate encoding 570 */ 571 enum CertificateEncoding { 572 CERT_ENC_PKCS12, /**< PKCS#12 */ 573 CERT_ENC_DER, /**< DER / binary X.509 ASN1 */ 574 CERT_ENC_PEM, /**< PEM */ 575 CERT_ENC__CNT 576 }; 577 578 /**@} */ 579 580 581 582 /**@cond NO_DOC*/ 583 /* Forward declarations */ 584 class Handle; 585 class Producer; 586 class Message; 587 class Headers; 588 class Queue; 589 class Event; 590 class Topic; 591 class TopicPartition; 592 class Metadata; 593 class KafkaConsumer; 594 /**@endcond*/ 595 596 597 /** 598 * @name Error class 599 * @{ 600 * 601 */ 602 603 /** 604 * @brief The Error class is used as a return value from APIs to propagate 605 * an error. The error consists of an error code which is to be used 606 * programatically, an error string for showing to the user, 607 * and various error flags that can be used programmatically to decide 608 * how to handle the error; e.g., should the operation be retried, 609 * was it a fatal error, etc. 610 * 611 * Error objects must be deleted explicitly to free its resources. 612 */ 613 class RD_EXPORT Error { 614 public: 615 616 /** 617 * @brief Create error object. 618 */ 619 static Error *create (ErrorCode code, const std::string *errstr); 620 ~Error()621 virtual ~Error () { } 622 623 /* 624 * Error accessor methods 625 */ 626 627 /** 628 * @returns the error code, e.g., RdKafka::ERR_UNKNOWN_MEMBER_ID. 629 */ 630 virtual ErrorCode code () const = 0; 631 632 /** 633 * @returns the error code name, e.g, "ERR_UNKNOWN_MEMBER_ID". 634 */ 635 virtual std::string name () const = 0; 636 637 /** 638 * @returns a human readable error string. 639 */ 640 virtual std::string str () const = 0; 641 642 /** 643 * @returns true if the error is a fatal error, indicating that the client 644 * instance is no longer usable, else false. 645 */ 646 virtual bool is_fatal () const = 0; 647 648 /** 649 * @returns true if the operation may be retried, else false. 650 */ 651 virtual bool is_retriable () const = 0; 652 653 /** 654 * @returns true if the error is an abortable transaction error in which case 655 * the application must call RdKafka::Producer::abort_transaction() 656 * and start a new transaction with 657 * RdKafka::Producer::begin_transaction() if it wishes to proceed 658 * with transactions. 659 * Else returns false. 660 * 661 * @remark The return value of this method is only valid for errors returned 662 * by the transactional API. 663 */ 664 virtual bool txn_requires_abort () const = 0; 665 }; 666 667 /**@}*/ 668 669 670 /** 671 * @name Callback classes 672 * @{ 673 * 674 * 675 * librdkafka uses (optional) callbacks to propagate information and 676 * delegate decisions to the application logic. 677 * 678 * An application must call RdKafka::poll() at regular intervals to 679 * serve queued callbacks. 680 */ 681 682 683 /** 684 * @brief Delivery Report callback class 685 * 686 * The delivery report callback will be called once for each message 687 * accepted by RdKafka::Producer::produce() (et.al) with 688 * RdKafka::Message::err() set to indicate the result of the produce request. 689 * 690 * The callback is called when a message is succesfully produced or 691 * if librdkafka encountered a permanent failure, or the retry counter for 692 * temporary errors has been exhausted. 693 * 694 * An application must call RdKafka::poll() at regular intervals to 695 * serve queued delivery report callbacks. 696 697 */ 698 class RD_EXPORT DeliveryReportCb { 699 public: 700 /** 701 * @brief Delivery report callback. 702 */ 703 virtual void dr_cb (Message &message) = 0; 704 ~DeliveryReportCb()705 virtual ~DeliveryReportCb() { } 706 }; 707 708 709 /** 710 * @brief SASL/OAUTHBEARER token refresh callback class 711 * 712 * The SASL/OAUTHBEARER token refresh callback is triggered via RdKafka::poll() 713 * whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved, 714 * typically based on the configuration defined in \c sasl.oauthbearer.config. 715 * 716 * The \c oauthbearer_config argument is the value of the 717 * \c sasl.oauthbearer.config configuration property. 718 * 719 * The callback should invoke RdKafka::Handle::oauthbearer_set_token() or 720 * RdKafka::Handle::oauthbearer_set_token_failure() to indicate success or 721 * failure, respectively. 722 * 723 * The refresh operation is eventable and may be received when an event 724 * callback handler is set with an event type of 725 * \c RdKafka::Event::EVENT_OAUTHBEARER_TOKEN_REFRESH. 726 * 727 * Note that before any SASL/OAUTHBEARER broker connection can succeed the 728 * application must call RdKafka::Handle::oauthbearer_set_token() once -- either 729 * directly or, more typically, by invoking RdKafka::poll() -- in order to 730 * cause retrieval of an initial token to occur. 731 * 732 * An application must call RdKafka::poll() at regular intervals to 733 * serve queued SASL/OAUTHBEARER token refresh callbacks (when 734 * OAUTHBEARER is the SASL mechanism). 735 */ 736 class RD_EXPORT OAuthBearerTokenRefreshCb { 737 public: 738 /** 739 * @brief SASL/OAUTHBEARER token refresh callback class. 740 * 741 * @param handle The RdKafka::Handle which requires a refreshed token. 742 * @param oauthbearer_config The value of the 743 * \p sasl.oauthbearer.config configuration property for \p handle. 744 */ 745 virtual void oauthbearer_token_refresh_cb (RdKafka::Handle* handle, 746 const std::string &oauthbearer_config) = 0; 747 ~OAuthBearerTokenRefreshCb()748 virtual ~OAuthBearerTokenRefreshCb() { } 749 }; 750 751 752 /** 753 * @brief Partitioner callback class 754 * 755 * Generic partitioner callback class for implementing custom partitioners. 756 * 757 * @sa RdKafka::Conf::set() \c "partitioner_cb" 758 */ 759 class RD_EXPORT PartitionerCb { 760 public: 761 /** 762 * @brief Partitioner callback 763 * 764 * Return the partition to use for \p key in \p topic. 765 * 766 * The \p msg_opaque is the same \p msg_opaque provided in the 767 * RdKafka::Producer::produce() call. 768 * 769 * @remark \p key may be NULL or the empty. 770 * 771 * @returns Must return a value between 0 and \p partition_cnt (non-inclusive). 772 * May return RD_KAFKA_PARTITION_UA (-1) if partitioning failed. 773 * 774 * @sa The callback may use RdKafka::Topic::partition_available() to check 775 * if a partition has an active leader broker. 776 */ 777 virtual int32_t partitioner_cb (const Topic *topic, 778 const std::string *key, 779 int32_t partition_cnt, 780 void *msg_opaque) = 0; 781 ~PartitionerCb()782 virtual ~PartitionerCb() { } 783 }; 784 785 /** 786 * @brief Variant partitioner with key pointer 787 * 788 */ 789 class PartitionerKeyPointerCb { 790 public: 791 /** 792 * @brief Variant partitioner callback that gets \p key as pointer and length 793 * instead of as a const std::string *. 794 * 795 * @remark \p key may be NULL or have \p key_len 0. 796 * 797 * @sa See RdKafka::PartitionerCb::partitioner_cb() for exact semantics 798 */ 799 virtual int32_t partitioner_cb (const Topic *topic, 800 const void *key, 801 size_t key_len, 802 int32_t partition_cnt, 803 void *msg_opaque) = 0; 804 ~PartitionerKeyPointerCb()805 virtual ~PartitionerKeyPointerCb() { } 806 }; 807 808 809 810 /** 811 * @brief Event callback class 812 * 813 * Events are a generic interface for propagating errors, statistics, logs, etc 814 * from librdkafka to the application. 815 * 816 * @sa RdKafka::Event 817 */ 818 class RD_EXPORT EventCb { 819 public: 820 /** 821 * @brief Event callback 822 * 823 * @sa RdKafka::Event 824 */ 825 virtual void event_cb (Event &event) = 0; 826 ~EventCb()827 virtual ~EventCb() { } 828 }; 829 830 831 /** 832 * @brief Event object class as passed to the EventCb callback. 833 */ 834 class RD_EXPORT Event { 835 public: 836 /** @brief Event type */ 837 enum Type { 838 EVENT_ERROR, /**< Event is an error condition */ 839 EVENT_STATS, /**< Event is a statistics JSON document */ 840 EVENT_LOG, /**< Event is a log message */ 841 EVENT_THROTTLE /**< Event is a throttle level signaling from the broker */ 842 }; 843 844 /** @brief EVENT_LOG severities (conforms to syslog(3) severities) */ 845 enum Severity { 846 EVENT_SEVERITY_EMERG = 0, 847 EVENT_SEVERITY_ALERT = 1, 848 EVENT_SEVERITY_CRITICAL = 2, 849 EVENT_SEVERITY_ERROR = 3, 850 EVENT_SEVERITY_WARNING = 4, 851 EVENT_SEVERITY_NOTICE = 5, 852 EVENT_SEVERITY_INFO = 6, 853 EVENT_SEVERITY_DEBUG = 7 854 }; 855 ~Event()856 virtual ~Event () { } 857 858 /* 859 * Event Accessor methods 860 */ 861 862 /** 863 * @returns The event type 864 * @remark Applies to all event types 865 */ 866 virtual Type type () const = 0; 867 868 /** 869 * @returns Event error, if any. 870 * @remark Applies to all event types except THROTTLE 871 */ 872 virtual ErrorCode err () const = 0; 873 874 /** 875 * @returns Log severity level. 876 * @remark Applies to LOG event type. 877 */ 878 virtual Severity severity () const = 0; 879 880 /** 881 * @returns Log facility string. 882 * @remark Applies to LOG event type. 883 */ 884 virtual std::string fac () const = 0; 885 886 /** 887 * @returns Log message string. 888 * 889 * \c EVENT_LOG: Log message string. 890 * \c EVENT_STATS: JSON object (as string). 891 * 892 * @remark Applies to LOG event type. 893 */ 894 virtual std::string str () const = 0; 895 896 /** 897 * @returns Throttle time in milliseconds. 898 * @remark Applies to THROTTLE event type. 899 */ 900 virtual int throttle_time () const = 0; 901 902 /** 903 * @returns Throttling broker's name. 904 * @remark Applies to THROTTLE event type. 905 */ 906 virtual std::string broker_name () const = 0; 907 908 /** 909 * @returns Throttling broker's id. 910 * @remark Applies to THROTTLE event type. 911 */ 912 virtual int broker_id () const = 0; 913 914 915 /** 916 * @returns true if this is a fatal error. 917 * @remark Applies to ERROR event type. 918 * @sa RdKafka::Handle::fatal_error() 919 */ 920 virtual bool fatal () const = 0; 921 }; 922 923 924 925 /** 926 * @brief Consume callback class 927 */ 928 class RD_EXPORT ConsumeCb { 929 public: 930 /** 931 * @brief The consume callback is used with 932 * RdKafka::Consumer::consume_callback() 933 * methods and will be called for each consumed \p message. 934 * 935 * The callback interface is optional but provides increased performance. 936 */ 937 virtual void consume_cb (Message &message, void *opaque) = 0; 938 ~ConsumeCb()939 virtual ~ConsumeCb() { } 940 }; 941 942 943 /** 944 * @brief \b KafkaConsumer: Rebalance callback class 945 */ 946 class RD_EXPORT RebalanceCb { 947 public: 948 /** 949 * @brief Group rebalance callback for use with RdKafka::KafkaConsumer 950 * 951 * Registering a \p rebalance_cb turns off librdkafka's automatic 952 * partition assignment/revocation and instead delegates that responsibility 953 * to the application's \p rebalance_cb. 954 * 955 * The rebalance callback is responsible for updating librdkafka's 956 * assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS 957 * and RdKafka::ERR__REVOKE_PARTITIONS but should also be able to handle 958 * arbitrary rebalancing failures where \p err is neither of those. 959 * @remark In this latter case (arbitrary error), the application must 960 * call unassign() to synchronize state. 961 * 962 * For eager/non-cooperative `partition.assignment.strategy` assignors, 963 * such as `range` and `roundrobin`, the application must use 964 * assign assign() to set and unassign() to clear the entire assignment. 965 * For the cooperative assignors, such as `cooperative-sticky`, the 966 * application must use incremental_assign() for ERR__ASSIGN_PARTITIONS and 967 * incremental_unassign() for ERR__REVOKE_PARTITIONS. 968 * 969 * Without a rebalance callback this is done automatically by librdkafka 970 * but registering a rebalance callback gives the application flexibility 971 * in performing other operations along with the assinging/revocation, 972 * such as fetching offsets from an alternate location (on assign) 973 * or manually committing offsets (on revoke). 974 * 975 * @sa RdKafka::KafkaConsumer::assign() 976 * @sa RdKafka::KafkaConsumer::incremental_assign() 977 * @sa RdKafka::KafkaConsumer::incremental_unassign() 978 * @sa RdKafka::KafkaConsumer::assignment_lost() 979 * @sa RdKafka::KafkaConsumer::rebalance_protocol() 980 * 981 * The following example show's the application's responsibilities: 982 * @code 983 * class MyRebalanceCb : public RdKafka::RebalanceCb { 984 * public: 985 * void rebalance_cb (RdKafka::KafkaConsumer *consumer, 986 * RdKafka::ErrorCode err, 987 * std::vector<RdKafka::TopicPartition*> &partitions) { 988 * if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { 989 * // application may load offets from arbitrary external 990 * // storage here and update \p partitions 991 * if (consumer->rebalance_protocol() == "COOPERATIVE") 992 * consumer->incremental_assign(partitions); 993 * else 994 * consumer->assign(partitions); 995 * 996 * } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) { 997 * // Application may commit offsets manually here 998 * // if auto.commit.enable=false 999 * if (consumer->rebalance_protocol() == "COOPERATIVE") 1000 * consumer->incremental_unassign(partitions); 1001 * else 1002 * consumer->unassign(); 1003 * 1004 * } else { 1005 * std::cerr << "Rebalancing error: " << 1006 * RdKafka::err2str(err) << std::endl; 1007 * consumer->unassign(); 1008 * } 1009 * } 1010 * } 1011 * @endcode 1012 * 1013 * @remark The above example lacks error handling for assign calls, see 1014 * the examples/ directory. 1015 */ 1016 virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer, 1017 RdKafka::ErrorCode err, 1018 std::vector<TopicPartition*>&partitions) = 0; 1019 ~RebalanceCb()1020 virtual ~RebalanceCb() { } 1021 }; 1022 1023 1024 /** 1025 * @brief Offset Commit callback class 1026 */ 1027 class RD_EXPORT OffsetCommitCb { 1028 public: 1029 /** 1030 * @brief Set offset commit callback for use with consumer groups 1031 * 1032 * The results of automatic or manual offset commits will be scheduled 1033 * for this callback and is served by RdKafka::KafkaConsumer::consume(). 1034 * 1035 * If no partitions had valid offsets to commit this callback will be called 1036 * with \p err == ERR__NO_OFFSET which is not to be considered an error. 1037 * 1038 * The \p offsets list contains per-partition information: 1039 * - \c topic The topic committed 1040 * - \c partition The partition committed 1041 * - \c offset: Committed offset (attempted) 1042 * - \c err: Commit error 1043 */ 1044 virtual void offset_commit_cb(RdKafka::ErrorCode err, 1045 std::vector<TopicPartition*>&offsets) = 0; 1046 ~OffsetCommitCb()1047 virtual ~OffsetCommitCb() { } 1048 }; 1049 1050 1051 1052 /** 1053 * @brief SSL broker certificate verification class. 1054 * 1055 * @remark Class instance must outlive the RdKafka client instance. 1056 */ 1057 class RD_EXPORT SslCertificateVerifyCb { 1058 public: 1059 /** 1060 * @brief SSL broker certificate verification callback. 1061 * 1062 * The verification callback is triggered from internal librdkafka threads 1063 * upon connecting to a broker. On each connection attempt the callback 1064 * will be called for each certificate in the broker's certificate chain, 1065 * starting at the root certification, as long as the application callback 1066 * returns 1 (valid certificate). 1067 * 1068 * \p broker_name and \p broker_id correspond to the broker the connection 1069 * is being made to. 1070 * The \c x509_error argument indicates if OpenSSL's verification of 1071 * the certificate succeed (0) or failed (an OpenSSL error code). 1072 * The application may set the SSL context error code by returning 0 1073 * from the verify callback and providing a non-zero SSL context error code 1074 * in \p x509_error. 1075 * If the verify callback sets \p x509_error to 0, returns 1, and the 1076 * original \p x509_error was non-zero, the error on the SSL context will 1077 * be cleared. 1078 * \p x509_error is always a valid pointer to an int. 1079 * 1080 * \p depth is the depth of the current certificate in the chain, starting 1081 * at the root certificate. 1082 * 1083 * The certificate itself is passed in binary DER format in \p buf of 1084 * size \p size. 1085 * 1086 * The callback must 1 if verification succeeds, or 0 if verification fails 1087 * and write a human-readable error message 1088 * to \p errstr. 1089 * 1090 * @warning This callback will be called from internal librdkafka threads. 1091 * 1092 * @remark See <openssl/x509_vfy.h> in the OpenSSL source distribution 1093 * for a list of \p x509_error codes. 1094 */ 1095 virtual bool ssl_cert_verify_cb (const std::string &broker_name, 1096 int32_t broker_id, 1097 int *x509_error, 1098 int depth, 1099 const char *buf, size_t size, 1100 std::string &errstr) = 0; 1101 ~SslCertificateVerifyCb()1102 virtual ~SslCertificateVerifyCb() {} 1103 }; 1104 1105 1106 /** 1107 * @brief \b Portability: SocketCb callback class 1108 * 1109 */ 1110 class RD_EXPORT SocketCb { 1111 public: 1112 /** 1113 * @brief Socket callback 1114 * 1115 * The socket callback is responsible for opening a socket 1116 * according to the supplied \p domain, \p type and \p protocol. 1117 * The socket shall be created with \c CLOEXEC set in a racefree fashion, if 1118 * possible. 1119 * 1120 * It is typically not required to register an alternative socket 1121 * implementation 1122 * 1123 * @returns The socket file descriptor or -1 on error (\c errno must be set) 1124 */ 1125 virtual int socket_cb (int domain, int type, int protocol) = 0; 1126 ~SocketCb()1127 virtual ~SocketCb() { } 1128 }; 1129 1130 1131 /** 1132 * @brief \b Portability: OpenCb callback class 1133 * 1134 */ 1135 class RD_EXPORT OpenCb { 1136 public: 1137 /** 1138 * @brief Open callback 1139 * The open callback is responsible for opening the file specified by 1140 * \p pathname, using \p flags and \p mode. 1141 * The file shall be opened with \c CLOEXEC set in a racefree fashion, if 1142 * possible. 1143 * 1144 * It is typically not required to register an alternative open implementation 1145 * 1146 * @remark Not currently available on native Win32 1147 */ 1148 virtual int open_cb (const std::string &path, int flags, int mode) = 0; 1149 ~OpenCb()1150 virtual ~OpenCb() { } 1151 }; 1152 1153 1154 /**@}*/ 1155 1156 1157 1158 1159 /** 1160 * @name Configuration interface 1161 * @{ 1162 * 1163 */ 1164 1165 /** 1166 * @brief Configuration interface 1167 * 1168 * Holds either global or topic configuration that are passed to 1169 * RdKafka::Consumer::create(), RdKafka::Producer::create(), 1170 * RdKafka::KafkaConsumer::create(), etc. 1171 * 1172 * @sa CONFIGURATION.md for the full list of supported properties. 1173 */ 1174 class RD_EXPORT Conf { 1175 public: 1176 /** 1177 * @brief Configuration object type 1178 */ 1179 enum ConfType { 1180 CONF_GLOBAL, /**< Global configuration */ 1181 CONF_TOPIC /**< Topic specific configuration */ 1182 }; 1183 1184 /** 1185 * @brief RdKafka::Conf::Set() result code 1186 */ 1187 enum ConfResult { 1188 CONF_UNKNOWN = -2, /**< Unknown configuration property */ 1189 CONF_INVALID = -1, /**< Invalid configuration value */ 1190 CONF_OK = 0 /**< Configuration property was succesfully set */ 1191 }; 1192 1193 1194 /** 1195 * @brief Create configuration object 1196 */ 1197 static Conf *create (ConfType type); 1198 ~Conf()1199 virtual ~Conf () { } 1200 1201 /** 1202 * @brief Set configuration property \p name to value \p value. 1203 * 1204 * Fallthrough: 1205 * Topic-level configuration properties may be set using this interface 1206 * in which case they are applied on the \c default_topic_conf. 1207 * If no \c default_topic_conf has been set one will be created. 1208 * Any sub-sequent set("default_topic_conf", ..) calls will 1209 * replace the current default topic configuration. 1210 1211 * @returns CONF_OK on success, else writes a human readable error 1212 * description to \p errstr on error. 1213 */ 1214 virtual Conf::ConfResult set (const std::string &name, 1215 const std::string &value, 1216 std::string &errstr) = 0; 1217 1218 /** @brief Use with \p name = \c \"dr_cb\" */ 1219 virtual Conf::ConfResult set (const std::string &name, 1220 DeliveryReportCb *dr_cb, 1221 std::string &errstr) = 0; 1222 1223 /** @brief Use with \p name = \c \"oauthbearer_token_refresh_cb\" */ 1224 virtual Conf::ConfResult set (const std::string &name, 1225 OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb, 1226 std::string &errstr) = 0; 1227 1228 /** @brief Use with \p name = \c \"event_cb\" */ 1229 virtual Conf::ConfResult set (const std::string &name, 1230 EventCb *event_cb, 1231 std::string &errstr) = 0; 1232 1233 /** @brief Use with \p name = \c \"default_topic_conf\" 1234 * 1235 * Sets the default topic configuration to use for for automatically 1236 * subscribed topics. 1237 * 1238 * @sa RdKafka::KafkaConsumer::subscribe() 1239 */ 1240 virtual Conf::ConfResult set (const std::string &name, 1241 const Conf *topic_conf, 1242 std::string &errstr) = 0; 1243 1244 /** @brief Use with \p name = \c \"partitioner_cb\" */ 1245 virtual Conf::ConfResult set (const std::string &name, 1246 PartitionerCb *partitioner_cb, 1247 std::string &errstr) = 0; 1248 1249 /** @brief Use with \p name = \c \"partitioner_key_pointer_cb\" */ 1250 virtual Conf::ConfResult set (const std::string &name, 1251 PartitionerKeyPointerCb *partitioner_kp_cb, 1252 std::string &errstr) = 0; 1253 1254 /** @brief Use with \p name = \c \"socket_cb\" */ 1255 virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb, 1256 std::string &errstr) = 0; 1257 1258 /** @brief Use with \p name = \c \"open_cb\" */ 1259 virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb, 1260 std::string &errstr) = 0; 1261 1262 /** @brief Use with \p name = \c \"rebalance_cb\" */ 1263 virtual Conf::ConfResult set (const std::string &name, 1264 RebalanceCb *rebalance_cb, 1265 std::string &errstr) = 0; 1266 1267 /** @brief Use with \p name = \c \"offset_commit_cb\" */ 1268 virtual Conf::ConfResult set (const std::string &name, 1269 OffsetCommitCb *offset_commit_cb, 1270 std::string &errstr) = 0; 1271 1272 /** @brief Use with \p name = \c \"ssl_cert_verify_cb\". 1273 * @returns CONF_OK on success or CONF_INVALID if SSL is 1274 * not supported in this build. 1275 */ 1276 virtual Conf::ConfResult set(const std::string &name, 1277 SslCertificateVerifyCb *ssl_cert_verify_cb, 1278 std::string &errstr) = 0; 1279 1280 /** 1281 * @brief Set certificate/key \p cert_type from the \p cert_enc encoded 1282 * memory at \p buffer of \p size bytes. 1283 * 1284 * @param cert_type Certificate or key type to configure. 1285 * @param cert_enc Buffer \p encoding type. 1286 * @param buffer Memory pointer to encoded certificate or key. 1287 * The memory is not referenced after this function returns. 1288 * @param size Size of memory at \p buffer. 1289 * @param errstr A human-readable error string will be written to this string 1290 * on failure. 1291 * 1292 * @returns CONF_OK on success or CONF_INVALID if the memory in 1293 * \p buffer is of incorrect encoding, or if librdkafka 1294 * was not built with SSL support. 1295 * 1296 * @remark Calling this method multiple times with the same \p cert_type 1297 * will replace the previous value. 1298 * 1299 * @remark Calling this method with \p buffer set to NULL will clear the 1300 * configuration for \p cert_type. 1301 * 1302 * @remark The private key may require a password, which must be specified 1303 * with the `ssl.key.password` configuration property prior to 1304 * calling this function. 1305 * 1306 * @remark Private and public keys in PEM format may also be set with the 1307 * `ssl.key.pem` and `ssl.certificate.pem` configuration properties. 1308 * 1309 * @remark CA certificate in PEM format may also be set with the 1310 * `ssl.ca.pem` configuration property. 1311 */ 1312 virtual Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type, 1313 RdKafka::CertificateEncoding cert_enc, 1314 const void *buffer, size_t size, 1315 std::string &errstr) = 0; 1316 1317 /** @brief Query single configuration value 1318 * 1319 * Do not use this method to get callbacks registered by the configuration file. 1320 * Instead use the specific get() methods with the specific callback parameter in the signature. 1321 * 1322 * Fallthrough: 1323 * Topic-level configuration properties from the \c default_topic_conf 1324 * may be retrieved using this interface. 1325 * 1326 * @returns CONF_OK if the property was set previously set and 1327 * returns the value in \p value. */ 1328 virtual Conf::ConfResult get(const std::string &name, 1329 std::string &value) const = 0; 1330 1331 /** @brief Query single configuration value 1332 * @returns CONF_OK if the property was set previously set and 1333 * returns the value in \p dr_cb. */ 1334 virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0; 1335 1336 /** @brief Query single configuration value 1337 * @returns CONF_OK if the property was set previously set and 1338 * returns the value in \p oauthbearer_token_refresh_cb. */ 1339 virtual Conf::ConfResult get( 1340 OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0; 1341 1342 /** @brief Query single configuration value 1343 * @returns CONF_OK if the property was set previously set and 1344 * returns the value in \p event_cb. */ 1345 virtual Conf::ConfResult get(EventCb *&event_cb) const = 0; 1346 1347 /** @brief Query single configuration value 1348 * @returns CONF_OK if the property was set previously set and 1349 * returns the value in \p partitioner_cb. */ 1350 virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0; 1351 1352 /** @brief Query single configuration value 1353 * @returns CONF_OK if the property was set previously set and 1354 * returns the value in \p partitioner_kp_cb. */ 1355 virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0; 1356 1357 /** @brief Query single configuration value 1358 * @returns CONF_OK if the property was set previously set and 1359 * returns the value in \p socket_cb. */ 1360 virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0; 1361 1362 /** @brief Query single configuration value 1363 * @returns CONF_OK if the property was set previously set and 1364 * returns the value in \p open_cb. */ 1365 virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0; 1366 1367 /** @brief Query single configuration value 1368 * @returns CONF_OK if the property was set previously set and 1369 * returns the value in \p rebalance_cb. */ 1370 virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0; 1371 1372 /** @brief Query single configuration value 1373 * @returns CONF_OK if the property was set previously set and 1374 * returns the value in \p offset_commit_cb. */ 1375 virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0; 1376 1377 /** @brief Use with \p name = \c \"ssl_cert_verify_cb\" */ 1378 virtual Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0; 1379 1380 /** @brief Dump configuration names and values to list containing 1381 * name,value tuples */ 1382 virtual std::list<std::string> *dump () = 0; 1383 1384 /** @brief Use with \p name = \c \"consume_cb\" */ 1385 virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb, 1386 std::string &errstr) = 0; 1387 1388 /** 1389 * @brief Returns the underlying librdkafka C rd_kafka_conf_t handle. 1390 * 1391 * @warning Calling the C API on this handle is not recommended and there 1392 * is no official support for it, but for cases where the C++ 1393 * does not provide the proper functionality this C handle can be 1394 * used to interact directly with the core librdkafka API. 1395 * 1396 * @remark The lifetime of the returned pointer is the same as the Conf 1397 * object this method is called on. 1398 * 1399 * @remark Include <rdkafka/rdkafka.h> prior to including 1400 * <rdkafka/rdkafkacpp.h> 1401 * 1402 * @returns \c rd_kafka_conf_t* if this is a CONF_GLOBAL object, else NULL. 1403 */ 1404 virtual struct rd_kafka_conf_s *c_ptr_global () = 0; 1405 1406 /** 1407 * @brief Returns the underlying librdkafka C rd_kafka_topic_conf_t handle. 1408 * 1409 * @warning Calling the C API on this handle is not recommended and there 1410 * is no official support for it, but for cases where the C++ 1411 * does not provide the proper functionality this C handle can be 1412 * used to interact directly with the core librdkafka API. 1413 * 1414 * @remark The lifetime of the returned pointer is the same as the Conf 1415 * object this method is called on. 1416 * 1417 * @remark Include <rdkafka/rdkafka.h> prior to including 1418 * <rdkafka/rdkafkacpp.h> 1419 * 1420 * @returns \c rd_kafka_topic_conf_t* if this is a CONF_TOPIC object, 1421 * else NULL. 1422 */ 1423 virtual struct rd_kafka_topic_conf_s *c_ptr_topic () = 0; 1424 1425 /** 1426 * @brief Set callback_data for ssl engine. 1427 * 1428 * @remark The \c ssl.engine.location configuration must be set for this 1429 * to have affect. 1430 * 1431 * @remark The memory pointed to by \p value must remain valid for the 1432 * lifetime of the configuration object and any Kafka clients that 1433 * use it. 1434 * 1435 * @returns CONF_OK on success, else CONF_INVALID. 1436 */ 1437 virtual Conf::ConfResult set_engine_callback_data (void *value, 1438 std::string &errstr) = 0; 1439 }; 1440 1441 /**@}*/ 1442 1443 1444 /** 1445 * @name Kafka base client handle 1446 * @{ 1447 * 1448 */ 1449 1450 /** 1451 * @brief Base handle, super class for specific clients. 1452 */ 1453 class RD_EXPORT Handle { 1454 public: ~Handle()1455 virtual ~Handle() { } 1456 1457 /** @returns the name of the handle */ 1458 virtual const std::string name () const = 0; 1459 1460 /** 1461 * @brief Returns the client's broker-assigned group member id 1462 * 1463 * @remark This currently requires the high-level KafkaConsumer 1464 * 1465 * @returns Last assigned member id, or empty string if not currently 1466 * a group member. 1467 */ 1468 virtual const std::string memberid () const = 0; 1469 1470 1471 /** 1472 * @brief Polls the provided kafka handle for events. 1473 * 1474 * Events will trigger application provided callbacks to be called. 1475 * 1476 * The \p timeout_ms argument specifies the maximum amount of time 1477 * (in milliseconds) that the call will block waiting for events. 1478 * For non-blocking calls, provide 0 as \p timeout_ms. 1479 * To wait indefinately for events, provide -1. 1480 * 1481 * Events: 1482 * - delivery report callbacks (if an RdKafka::DeliveryCb is configured) [producer] 1483 * - event callbacks (if an RdKafka::EventCb is configured) [producer & consumer] 1484 * 1485 * @remark An application should make sure to call poll() at regular 1486 * intervals to serve any queued callbacks waiting to be called. 1487 * 1488 * @warning This method MUST NOT be used with the RdKafka::KafkaConsumer, 1489 * use its RdKafka::KafkaConsumer::consume() instead. 1490 * 1491 * @returns the number of events served. 1492 */ 1493 virtual int poll (int timeout_ms) = 0; 1494 1495 /** 1496 * @brief Returns the current out queue length 1497 * 1498 * The out queue contains messages and requests waiting to be sent to, 1499 * or acknowledged by, the broker. 1500 */ 1501 virtual int outq_len () = 0; 1502 1503 /** 1504 * @brief Request Metadata from broker. 1505 * 1506 * Parameters: 1507 * \p all_topics - if non-zero: request info about all topics in cluster, 1508 * if zero: only request info about locally known topics. 1509 * \p only_rkt - only request info about this topic 1510 * \p metadatap - pointer to hold metadata result. 1511 * The \p *metadatap pointer must be released with \c delete. 1512 * \p timeout_ms - maximum response time before failing. 1513 * 1514 * @returns RdKafka::ERR_NO_ERROR on success (in which case \p *metadatap 1515 * will be set), else RdKafka::ERR__TIMED_OUT on timeout or 1516 * other error code on error. 1517 */ 1518 virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt, 1519 Metadata **metadatap, int timeout_ms) = 0; 1520 1521 1522 /** 1523 * @brief Pause producing or consumption for the provided list of partitions. 1524 * 1525 * Success or error is returned per-partition in the \p partitions list. 1526 * 1527 * @returns ErrorCode::NO_ERROR 1528 * 1529 * @sa resume() 1530 */ 1531 virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0; 1532 1533 1534 /** 1535 * @brief Resume producing or consumption for the provided list of partitions. 1536 * 1537 * Success or error is returned per-partition in the \p partitions list. 1538 * 1539 * @returns ErrorCode::NO_ERROR 1540 * 1541 * @sa pause() 1542 */ 1543 virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0; 1544 1545 1546 /** 1547 * @brief Query broker for low (oldest/beginning) 1548 * and high (newest/end) offsets for partition. 1549 * 1550 * Offsets are returned in \p *low and \p *high respectively. 1551 * 1552 * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure. 1553 */ 1554 virtual ErrorCode query_watermark_offsets (const std::string &topic, 1555 int32_t partition, 1556 int64_t *low, int64_t *high, 1557 int timeout_ms) = 0; 1558 1559 /** 1560 * @brief Get last known low (oldest/beginning) 1561 * and high (newest/end) offsets for partition. 1562 * 1563 * The low offset is updated periodically (if statistics.interval.ms is set) 1564 * while the high offset is updated on each fetched message set from the 1565 * broker. 1566 * 1567 * If there is no cached offset (either low or high, or both) then 1568 * OFFSET_INVALID will be returned for the respective offset. 1569 * 1570 * Offsets are returned in \p *low and \p *high respectively. 1571 * 1572 * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure. 1573 * 1574 * @remark Shall only be used with an active consumer instance. 1575 */ 1576 virtual ErrorCode get_watermark_offsets (const std::string &topic, 1577 int32_t partition, 1578 int64_t *low, int64_t *high) = 0; 1579 1580 1581 /** 1582 * @brief Look up the offsets for the given partitions by timestamp. 1583 * 1584 * The returned offset for each partition is the earliest offset whose 1585 * timestamp is greater than or equal to the given timestamp in the 1586 * corresponding partition. 1587 * 1588 * The timestamps to query are represented as \c offset in \p offsets 1589 * on input, and \c offset() will return the closest earlier offset 1590 * for the timestamp on output. 1591 * 1592 * Timestamps are expressed as milliseconds since epoch (UTC). 1593 * 1594 * The function will block for at most \p timeout_ms milliseconds. 1595 * 1596 * @remark Duplicate Topic+Partitions are not supported. 1597 * @remark Errors are also returned per TopicPartition, see \c err() 1598 * 1599 * @returns an error code for general errors, else RdKafka::ERR_NO_ERROR 1600 * in which case per-partition errors might be set. 1601 */ 1602 virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets, 1603 int timeout_ms) = 0; 1604 1605 1606 /** 1607 * @brief Retrieve queue for a given partition. 1608 * 1609 * @returns The fetch queue for the given partition if successful. Else, 1610 * NULL is returned. 1611 * 1612 * @remark This function only works on consumers. 1613 */ 1614 virtual Queue *get_partition_queue (const TopicPartition *partition) = 0; 1615 1616 /** 1617 * @brief Forward librdkafka logs (and debug) to the specified queue 1618 * for serving with one of the ..poll() calls. 1619 * 1620 * This allows an application to serve log callbacks (\c log_cb) 1621 * in its thread of choice. 1622 * 1623 * @param queue Queue to forward logs to. If the value is NULL the logs 1624 * are forwarded to the main queue. 1625 * 1626 * @remark The configuration property \c log.queue MUST also be set to true. 1627 * 1628 * @remark librdkafka maintains its own reference to the provided queue. 1629 * 1630 * @returns ERR_NO_ERROR on success or an error code on error. 1631 */ 1632 virtual ErrorCode set_log_queue (Queue *queue) = 0; 1633 1634 /** 1635 * @brief Cancels the current callback dispatcher (Handle::poll(), 1636 * KafkaConsumer::consume(), etc). 1637 * 1638 * A callback may use this to force an immediate return to the calling 1639 * code (caller of e.g. Handle::poll()) without processing any further 1640 * events. 1641 * 1642 * @remark This function MUST ONLY be called from within a 1643 * librdkafka callback. 1644 */ 1645 virtual void yield () = 0; 1646 1647 /** 1648 * @brief Returns the ClusterId as reported in broker metadata. 1649 * 1650 * @param timeout_ms If there is no cached value from metadata retrieval 1651 * then this specifies the maximum amount of time 1652 * (in milliseconds) the call will block waiting 1653 * for metadata to be retrieved. 1654 * Use 0 for non-blocking calls. 1655 * 1656 * @remark Requires broker version >=0.10.0 and api.version.request=true. 1657 * 1658 * @returns Last cached ClusterId, or empty string if no ClusterId could be 1659 * retrieved in the allotted timespan. 1660 */ 1661 virtual const std::string clusterid (int timeout_ms) = 0; 1662 1663 /** 1664 * @brief Returns the underlying librdkafka C rd_kafka_t handle. 1665 * 1666 * @warning Calling the C API on this handle is not recommended and there 1667 * is no official support for it, but for cases where the C++ 1668 * does not provide the proper functionality this C handle can be 1669 * used to interact directly with the core librdkafka API. 1670 * 1671 * @remark The lifetime of the returned pointer is the same as the Topic 1672 * object this method is called on. 1673 * 1674 * @remark Include <rdkafka/rdkafka.h> prior to including 1675 * <rdkafka/rdkafkacpp.h> 1676 * 1677 * @returns \c rd_kafka_t* 1678 */ 1679 virtual struct rd_kafka_s *c_ptr () = 0; 1680 1681 /** 1682 * @brief Returns the current ControllerId (controller broker id) 1683 * as reported in broker metadata. 1684 * 1685 * @param timeout_ms If there is no cached value from metadata retrieval 1686 * then this specifies the maximum amount of time 1687 * (in milliseconds) the call will block waiting 1688 * for metadata to be retrieved. 1689 * Use 0 for non-blocking calls. 1690 * 1691 * @remark Requires broker version >=0.10.0 and api.version.request=true. 1692 * 1693 * @returns Last cached ControllerId, or -1 if no ControllerId could be 1694 * retrieved in the allotted timespan. 1695 */ 1696 virtual int32_t controllerid (int timeout_ms) = 0; 1697 1698 1699 /** 1700 * @brief Returns the first fatal error set on this client instance, 1701 * or ERR_NO_ERROR if no fatal error has occurred. 1702 * 1703 * This function is to be used with the Idempotent Producer and 1704 * the Event class for \c EVENT_ERROR events to detect fatal errors. 1705 * 1706 * Generally all errors raised by the error event are to be considered 1707 * informational and temporary, the client will try to recover from all 1708 * errors in a graceful fashion (by retrying, etc). 1709 * 1710 * However, some errors should logically be considered fatal to retain 1711 * consistency; in particular a set of errors that may occur when using the 1712 * Idempotent Producer and the in-order or exactly-once producer guarantees 1713 * can't be satisfied. 1714 * 1715 * @param errstr A human readable error string if a fatal error was set. 1716 * 1717 * @returns ERR_NO_ERROR if no fatal error has been raised, else 1718 * any other error code. 1719 */ 1720 virtual ErrorCode fatal_error (std::string &errstr) const = 0; 1721 1722 /** 1723 * @brief Set SASL/OAUTHBEARER token and metadata 1724 * 1725 * @param token_value the mandatory token value to set, often (but not 1726 * necessarily) a JWS compact serialization as per 1727 * https://tools.ietf.org/html/rfc7515#section-3.1. 1728 * @param md_lifetime_ms when the token expires, in terms of the number of 1729 * milliseconds since the epoch. 1730 * @param md_principal_name the Kafka principal name associated with the 1731 * token. 1732 * @param extensions potentially empty SASL extension keys and values where 1733 * element [i] is the key and [i+1] is the key's value, to be communicated 1734 * to the broker as additional key-value pairs during the initial client 1735 * response as per https://tools.ietf.org/html/rfc7628#section-3.1. The 1736 * number of SASL extension keys plus values must be a non-negative multiple 1737 * of 2. Any provided keys and values are copied. 1738 * @param errstr A human readable error string is written here, only if 1739 * there is an error. 1740 * 1741 * The SASL/OAUTHBEARER token refresh callback should invoke 1742 * this method upon success. The extension keys must not include the reserved 1743 * key "`auth`", and all extension keys and values must conform to the 1744 * required format as per https://tools.ietf.org/html/rfc7628#section-3.1: 1745 * 1746 * key = 1*(ALPHA) 1747 * value = *(VCHAR / SP / HTAB / CR / LF ) 1748 * 1749 * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise \p errstr set 1750 * and:<br> 1751 * \c RdKafka::ERR__INVALID_ARG if any of the arguments are 1752 * invalid;<br> 1753 * \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not 1754 * supported by this build;<br> 1755 * \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is 1756 * not configured as the client's authentication mechanism.<br> 1757 * 1758 * @sa RdKafka::oauthbearer_set_token_failure 1759 * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb" 1760 */ 1761 virtual ErrorCode oauthbearer_set_token (const std::string &token_value, 1762 int64_t md_lifetime_ms, 1763 const std::string &md_principal_name, 1764 const std::list<std::string> &extensions, 1765 std::string &errstr) = 0; 1766 1767 /** 1768 * @brief SASL/OAUTHBEARER token refresh failure indicator. 1769 * 1770 * @param errstr human readable error reason for failing to acquire a token. 1771 * 1772 * The SASL/OAUTHBEARER token refresh callback should 1773 * invoke this method upon failure to refresh the token. 1774 * 1775 * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise:<br> 1776 * \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not 1777 * supported by this build;<br> 1778 * \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is 1779 * not configured as the client's authentication mechanism. 1780 * 1781 * @sa RdKafka::oauthbearer_set_token 1782 * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb" 1783 */ 1784 virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr) = 0; 1785 1786 /** 1787 * @brief Allocate memory using the same allocator librdkafka uses. 1788 * 1789 * This is typically an abstraction for the malloc(3) call and makes sure 1790 * the application can use the same memory allocator as librdkafka for 1791 * allocating pointers that are used by librdkafka. 1792 * 1793 * @remark Memory allocated by mem_malloc() must be freed using 1794 * mem_free(). 1795 */ 1796 virtual void *mem_malloc (size_t size) = 0; 1797 1798 /** 1799 * @brief Free pointer returned by librdkafka 1800 * 1801 * This is typically an abstraction for the free(3) call and makes sure 1802 * the application can use the same memory allocator as librdkafka for 1803 * freeing pointers returned by librdkafka. 1804 * 1805 * In standard setups it is usually not necessary to use this interface 1806 * rather than the free(3) function. 1807 * 1808 * @remark mem_free() must only be used for pointers returned by APIs 1809 * that explicitly mention using this function for freeing. 1810 */ 1811 virtual void mem_free (void *ptr) = 0; 1812 }; 1813 1814 1815 /**@}*/ 1816 1817 1818 /** 1819 * @name Topic and partition objects 1820 * @{ 1821 * 1822 */ 1823 1824 /** 1825 * @brief Topic+Partition 1826 * 1827 * This is a generic type to hold a single partition and various 1828 * information about it. 1829 * 1830 * Is typically used with std::vector<RdKafka::TopicPartition*> to provide 1831 * a list of partitions for different operations. 1832 */ 1833 class RD_EXPORT TopicPartition { 1834 public: 1835 /** 1836 * @brief Create topic+partition object for \p topic and \p partition. 1837 * 1838 * Use \c delete to deconstruct. 1839 */ 1840 static TopicPartition *create (const std::string &topic, int partition); 1841 1842 /** 1843 * @brief Create topic+partition object for \p topic and \p partition 1844 * with offset \p offset. 1845 * 1846 * Use \c delete to deconstruct. 1847 */ 1848 static TopicPartition *create (const std::string &topic, int partition, 1849 int64_t offset); 1850 1851 virtual ~TopicPartition() = 0; 1852 1853 /** 1854 * @brief Destroy/delete the TopicPartitions in \p partitions 1855 * and clear the vector. 1856 */ 1857 static void destroy (std::vector<TopicPartition*> &partitions); 1858 1859 /** @returns topic name */ 1860 virtual const std::string &topic () const = 0; 1861 1862 /** @returns partition id */ 1863 virtual int partition () const = 0; 1864 1865 /** @returns offset (if applicable) */ 1866 virtual int64_t offset () const = 0; 1867 1868 /** @brief Set offset */ 1869 virtual void set_offset (int64_t offset) = 0; 1870 1871 /** @returns error code (if applicable) */ 1872 virtual ErrorCode err () const = 0; 1873 }; 1874 1875 1876 1877 /** 1878 * @brief Topic handle 1879 * 1880 */ 1881 class RD_EXPORT Topic { 1882 public: 1883 /** 1884 * @brief Unassigned partition. 1885 * 1886 * The unassigned partition is used by the producer API for messages 1887 * that should be partitioned using the configured or default partitioner. 1888 */ 1889 static const int32_t PARTITION_UA; 1890 1891 /** @brief Special offsets */ 1892 static const int64_t OFFSET_BEGINNING; /**< Consume from beginning */ 1893 static const int64_t OFFSET_END; /**< Consume from end */ 1894 static const int64_t OFFSET_STORED; /**< Use offset storage */ 1895 static const int64_t OFFSET_INVALID; /**< Invalid offset */ 1896 1897 1898 /** 1899 * @brief Creates a new topic handle for topic named \p topic_str 1900 * 1901 * \p conf is an optional configuration for the topic that will be used 1902 * instead of the default topic configuration. 1903 * The \p conf object is reusable after this call. 1904 * 1905 * @returns the new topic handle or NULL on error (see \p errstr). 1906 */ 1907 static Topic *create (Handle *base, const std::string &topic_str, 1908 const Conf *conf, std::string &errstr); 1909 1910 virtual ~Topic () = 0; 1911 1912 1913 /** @returns the topic name */ 1914 virtual const std::string name () const = 0; 1915 1916 /** 1917 * @returns true if \p partition is available for the topic (has leader). 1918 * @warning \b MUST \b ONLY be called from within a 1919 * RdKafka::PartitionerCb callback. 1920 */ 1921 virtual bool partition_available (int32_t partition) const = 0; 1922 1923 /** 1924 * @brief Store offset \p offset + 1 for topic partition \p partition. 1925 * The offset will be committed (written) to the broker (or file) according 1926 * to \p auto.commit.interval.ms or next manual offset-less commit call. 1927 * 1928 * @remark \c enable.auto.offset.store must be set to \c false when using 1929 * this API. 1930 * 1931 * @returns RdKafka::ERR_NO_ERROR on success or an error code if none of the 1932 * offsets could be stored. 1933 */ 1934 virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0; 1935 1936 /** 1937 * @brief Returns the underlying librdkafka C rd_kafka_topic_t handle. 1938 * 1939 * @warning Calling the C API on this handle is not recommended and there 1940 * is no official support for it, but for cases where the C++ API 1941 * does not provide the underlying functionality this C handle can be 1942 * used to interact directly with the core librdkafka API. 1943 * 1944 * @remark The lifetime of the returned pointer is the same as the Topic 1945 * object this method is called on. 1946 * 1947 * @remark Include <rdkafka/rdkafka.h> prior to including 1948 * <rdkafka/rdkafkacpp.h> 1949 * 1950 * @returns \c rd_kafka_topic_t* 1951 */ 1952 virtual struct rd_kafka_topic_s *c_ptr () = 0; 1953 }; 1954 1955 1956 /**@}*/ 1957 1958 1959 /** 1960 * @name Message object 1961 * @{ 1962 * 1963 */ 1964 1965 1966 /** 1967 * @brief Message timestamp object 1968 * 1969 * Represents the number of milliseconds since the epoch (UTC). 1970 * 1971 * The MessageTimestampType dictates the timestamp type or origin. 1972 * 1973 * @remark Requires Apache Kafka broker version >= 0.10.0 1974 * 1975 */ 1976 1977 class RD_EXPORT MessageTimestamp { 1978 public: 1979 /*! Message timestamp type */ 1980 enum MessageTimestampType { 1981 MSG_TIMESTAMP_NOT_AVAILABLE, /**< Timestamp not available */ 1982 MSG_TIMESTAMP_CREATE_TIME, /**< Message creation time (source) */ 1983 MSG_TIMESTAMP_LOG_APPEND_TIME /**< Message log append time (broker) */ 1984 }; 1985 1986 MessageTimestampType type; /**< Timestamp type */ 1987 int64_t timestamp; /**< Milliseconds since epoch (UTC). */ 1988 }; 1989 1990 1991 /** 1992 * @brief Headers object 1993 * 1994 * Represents message headers. 1995 * 1996 * https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers 1997 * 1998 * @remark Requires Apache Kafka >= 0.11.0 brokers 1999 */ 2000 class RD_EXPORT Headers { 2001 public: 2002 virtual ~Headers() = 0; 2003 2004 /** 2005 * @brief Header object 2006 * 2007 * This object represents a single Header with a key value pair 2008 * and an ErrorCode 2009 * 2010 * @remark dynamic allocation of this object is not supported. 2011 */ 2012 class Header { 2013 public: 2014 /** 2015 * @brief Header object to encapsulate a single Header 2016 * 2017 * @param key the string value for the header key 2018 * @param value the bytes of the header value, or NULL 2019 * @param value_size the length in bytes of the header value 2020 * 2021 * @remark key and value are copied. 2022 * 2023 */ Header(const std::string & key,const void * value,size_t value_size)2024 Header(const std::string &key, 2025 const void *value, 2026 size_t value_size): 2027 key_(key), err_(ERR_NO_ERROR), value_size_(value_size) { 2028 value_ = copy_value(value, value_size); 2029 } 2030 2031 /** 2032 * @brief Header object to encapsulate a single Header 2033 * 2034 * @param key the string value for the header key 2035 * @param value the bytes of the header value 2036 * @param value_size the length in bytes of the header value 2037 * @param err the error code if one returned 2038 * 2039 * @remark The error code is used for when the Header is constructed 2040 * internally by using RdKafka::Headers::get_last which constructs 2041 * a Header encapsulating the ErrorCode in the process. 2042 * If err is set, the value and value_size fields will be undefined. 2043 */ Header(const std::string & key,const void * value,size_t value_size,const RdKafka::ErrorCode err)2044 Header(const std::string &key, 2045 const void *value, 2046 size_t value_size, 2047 const RdKafka::ErrorCode err): 2048 key_(key), err_(err), value_(NULL), value_size_(value_size) { 2049 if (err == ERR_NO_ERROR) 2050 value_ = copy_value(value, value_size); 2051 } 2052 2053 /** 2054 * @brief Copy constructor 2055 * 2056 * @param other Header to make a copy of. 2057 */ Header(const Header & other)2058 Header(const Header &other): 2059 key_(other.key_), err_(other.err_), value_size_(other.value_size_) { 2060 value_ = copy_value(other.value_, value_size_); 2061 } 2062 2063 /** 2064 * @brief Assignment operator 2065 * 2066 * @param other Header to make a copy of. 2067 */ 2068 Header& operator=(const Header &other) 2069 { 2070 if (&other == this) { 2071 return *this; 2072 } 2073 2074 key_ = other.key_; 2075 err_ = other.err_; 2076 value_size_ = other.value_size_; 2077 2078 if (value_ != NULL) 2079 mem_free(value_); 2080 2081 value_ = copy_value(other.value_, value_size_); 2082 2083 return *this; 2084 } 2085 ~Header()2086 ~Header() { 2087 if (value_ != NULL) 2088 mem_free(value_); 2089 } 2090 2091 /** @returns the key/name associated with this Header */ key()2092 std::string key() const { 2093 return key_; 2094 } 2095 2096 /** @returns returns the binary value, or NULL */ value()2097 const void *value() const { 2098 return value_; 2099 } 2100 2101 /** @returns returns the value casted to a nul-terminated C string, 2102 * or NULL. */ value_string()2103 const char *value_string() const { 2104 return static_cast<const char *>(value_); 2105 } 2106 2107 /** @returns Value Size the length of the Value in bytes */ value_size()2108 size_t value_size() const { 2109 return value_size_; 2110 } 2111 2112 /** @returns the error code of this Header (usually ERR_NO_ERROR) */ err()2113 RdKafka::ErrorCode err() const { 2114 return err_; 2115 } 2116 2117 private: copy_value(const void * value,size_t value_size)2118 char *copy_value(const void *value, size_t value_size) { 2119 if (!value) 2120 return NULL; 2121 2122 char *dest = (char *)mem_malloc(value_size + 1); 2123 memcpy(dest, (const char *)value, value_size); 2124 dest[value_size] = '\0'; 2125 2126 return dest; 2127 } 2128 2129 std::string key_; 2130 RdKafka::ErrorCode err_; 2131 char *value_; 2132 size_t value_size_; 2133 void *operator new(size_t); /* Prevent dynamic allocation */ 2134 }; 2135 2136 /** 2137 * @brief Create a new instance of the Headers object 2138 * 2139 * @returns an empty Headers list 2140 */ 2141 static Headers *create(); 2142 2143 /** 2144 * @brief Create a new instance of the Headers object from a std::vector 2145 * 2146 * @param headers std::vector of RdKafka::Headers::Header objects. 2147 * The headers are copied, not referenced. 2148 * 2149 * @returns a Headers list from std::vector set to the size of the std::vector 2150 */ 2151 static Headers *create(const std::vector<Header> &headers); 2152 2153 /** 2154 * @brief Adds a Header to the end of the list. 2155 * 2156 * @param key header key/name 2157 * @param value binary value, or NULL 2158 * @param value_size size of the value 2159 * 2160 * @returns an ErrorCode signalling success or failure to add the header. 2161 */ 2162 virtual ErrorCode add(const std::string &key, const void *value, 2163 size_t value_size) = 0; 2164 2165 /** 2166 * @brief Adds a Header to the end of the list. 2167 * 2168 * Convenience method for adding a std::string as a value for the header. 2169 * 2170 * @param key header key/name 2171 * @param value value string 2172 * 2173 * @returns an ErrorCode signalling success or failure to add the header. 2174 */ 2175 virtual ErrorCode add(const std::string &key, const std::string &value) = 0; 2176 2177 /** 2178 * @brief Adds a Header to the end of the list. 2179 * 2180 * This method makes a copy of the passed header. 2181 * 2182 * @param header Existing header to copy 2183 * 2184 * @returns an ErrorCode signalling success or failure to add the header. 2185 */ 2186 virtual ErrorCode add(const Header &header) = 0; 2187 2188 /** 2189 * @brief Removes all the Headers of a given key 2190 * 2191 * @param key header key/name to remove 2192 * 2193 * @returns An ErrorCode signalling a success or failure to remove the Header. 2194 */ 2195 virtual ErrorCode remove(const std::string &key) = 0; 2196 2197 /** 2198 * @brief Gets all of the Headers of a given key 2199 * 2200 * @param key header key/name 2201 * 2202 * @remark If duplicate keys exist this will return them all as a std::vector 2203 * 2204 * @returns a std::vector containing all the Headers of the given key. 2205 */ 2206 virtual std::vector<Header> get(const std::string &key) const = 0; 2207 2208 /** 2209 * @brief Gets the last occurrence of a Header of a given key 2210 * 2211 * @param key header key/name 2212 * 2213 * @remark This will only return the most recently added header 2214 * 2215 * @returns the Header if found, otherwise a Header with an err set to 2216 * ERR__NOENT. 2217 */ 2218 virtual Header get_last(const std::string &key) const = 0; 2219 2220 /** 2221 * @brief Returns all Headers 2222 * 2223 * @returns a std::vector containing all of the Headers 2224 */ 2225 virtual std::vector<Header> get_all() const = 0; 2226 2227 /** 2228 * @returns the number of headers. 2229 */ 2230 virtual size_t size() const = 0; 2231 }; 2232 2233 2234 /** 2235 * @brief Message object 2236 * 2237 * This object represents either a single consumed or produced message, 2238 * or an event (\p err() is set). 2239 * 2240 * An application must check RdKafka::Message::err() to see if the 2241 * object is a proper message (error is RdKafka::ERR_NO_ERROR) or a 2242 * an error event. 2243 * 2244 */ 2245 class RD_EXPORT Message { 2246 public: 2247 /** @brief Message persistence status can be used by the application to 2248 * find out if a produced message was persisted in the topic log. */ 2249 enum Status { 2250 /** Message was never transmitted to the broker, or failed with 2251 * an error indicating it was not written to the log. 2252 * Application retry risks ordering, but not duplication. */ 2253 MSG_STATUS_NOT_PERSISTED = 0, 2254 2255 /** Message was transmitted to broker, but no acknowledgement was 2256 * received. 2257 * Application retry risks ordering and duplication. */ 2258 MSG_STATUS_POSSIBLY_PERSISTED = 1, 2259 2260 /** Message was written to the log and fully acknowledged. 2261 * No reason for application to retry. 2262 * Note: this value should only be trusted with \c acks=all. */ 2263 MSG_STATUS_PERSISTED = 2, 2264 }; 2265 2266 /** 2267 * @brief Accessor functions* 2268 * @remark Not all fields are present in all types of callbacks. 2269 */ 2270 2271 /** @returns The error string if object represent an error event, 2272 * else an empty string. */ 2273 virtual std::string errstr() const = 0; 2274 2275 /** @returns The error code if object represents an error event, else 0. */ 2276 virtual ErrorCode err () const = 0; 2277 2278 /** @returns the RdKafka::Topic object for a message (if applicable), 2279 * or NULL if a corresponding RdKafka::Topic object has not been 2280 * explicitly created with RdKafka::Topic::create(). 2281 * In this case use topic_name() instead. */ 2282 virtual Topic *topic () const = 0; 2283 2284 /** @returns Topic name (if applicable, else empty string) */ 2285 virtual std::string topic_name () const = 0; 2286 2287 /** @returns Partition (if applicable) */ 2288 virtual int32_t partition () const = 0; 2289 2290 /** @returns Message payload (if applicable) */ 2291 virtual void *payload () const = 0 ; 2292 2293 /** @returns Message payload length (if applicable) */ 2294 virtual size_t len () const = 0; 2295 2296 /** @returns Message key as string (if applicable) */ 2297 virtual const std::string *key () const = 0; 2298 2299 /** @returns Message key as void pointer (if applicable) */ 2300 virtual const void *key_pointer () const = 0 ; 2301 2302 /** @returns Message key's binary length (if applicable) */ 2303 virtual size_t key_len () const = 0; 2304 2305 /** @returns Message or error offset (if applicable) */ 2306 virtual int64_t offset () const = 0; 2307 2308 /** @returns Message timestamp (if applicable) */ 2309 virtual MessageTimestamp timestamp () const = 0; 2310 2311 /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */ 2312 virtual void *msg_opaque () const = 0; 2313 2314 virtual ~Message () = 0; 2315 2316 /** @returns the latency in microseconds for a produced message measured 2317 * from the produce() call, or -1 if latency is not available. */ 2318 virtual int64_t latency () const = 0; 2319 2320 /** 2321 * @brief Returns the underlying librdkafka C rd_kafka_message_t handle. 2322 * 2323 * @warning Calling the C API on this handle is not recommended and there 2324 * is no official support for it, but for cases where the C++ API 2325 * does not provide the underlying functionality this C handle can be 2326 * used to interact directly with the core librdkafka API. 2327 * 2328 * @remark The lifetime of the returned pointer is the same as the Message 2329 * object this method is called on. 2330 * 2331 * @remark Include <rdkafka/rdkafka.h> prior to including 2332 * <rdkafka/rdkafkacpp.h> 2333 * 2334 * @returns \c rd_kafka_message_t* 2335 */ 2336 virtual struct rd_kafka_message_s *c_ptr () = 0; 2337 2338 /** 2339 * @brief Returns the message's persistence status in the topic log. 2340 */ 2341 virtual Status status () const = 0; 2342 2343 /** @returns the Headers instance for this Message, or NULL if there 2344 * are no headers. 2345 * 2346 * @remark The lifetime of the Headers are the same as the Message. */ 2347 virtual RdKafka::Headers *headers () = 0; 2348 2349 /** @returns the Headers instance for this Message (if applicable). 2350 * If NULL is returned the reason is given in \p err, which 2351 * is either ERR__NOENT if there were no headers, or another 2352 * error code if header parsing failed. 2353 * 2354 * @remark The lifetime of the Headers are the same as the Message. */ 2355 virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0; 2356 2357 /** @returns the broker id of the broker the message was produced to or 2358 * fetched from, or -1 if not known/applicable. */ 2359 virtual int32_t broker_id () const = 0; 2360 }; 2361 2362 /**@}*/ 2363 2364 2365 /** 2366 * @name Queue interface 2367 * @{ 2368 * 2369 */ 2370 2371 2372 /** 2373 * @brief Queue interface 2374 * 2375 * Create a new message queue. Message queues allows the application 2376 * to re-route consumed messages from multiple topic+partitions into 2377 * one single queue point. This queue point, containing messages from 2378 * a number of topic+partitions, may then be served by a single 2379 * consume() method, rather than one per topic+partition combination. 2380 * 2381 * See the RdKafka::Consumer::start(), RdKafka::Consumer::consume(), and 2382 * RdKafka::Consumer::consume_callback() methods that take a queue as the first 2383 * parameter for more information. 2384 */ 2385 class RD_EXPORT Queue { 2386 public: 2387 /** 2388 * @brief Create Queue object 2389 */ 2390 static Queue *create (Handle *handle); 2391 2392 /** 2393 * @brief Forward/re-route queue to \p dst. 2394 * If \p dst is \c NULL, the forwarding is removed. 2395 * 2396 * The internal refcounts for both queues are increased. 2397 * 2398 * @remark Regardless of whether \p dst is NULL or not, after calling this 2399 * function, \p src will not forward it's fetch queue to the consumer 2400 * queue. 2401 */ 2402 virtual ErrorCode forward (Queue *dst) = 0; 2403 2404 2405 /** 2406 * @brief Consume message or get error event from the queue. 2407 * 2408 * @remark Use \c delete to free the message. 2409 * 2410 * @returns One of: 2411 * - proper message (RdKafka::Message::err() is ERR_NO_ERROR) 2412 * - error event (RdKafka::Message::err() is != ERR_NO_ERROR) 2413 * - timeout due to no message or event in \p timeout_ms 2414 * (RdKafka::Message::err() is ERR__TIMED_OUT) 2415 */ 2416 virtual Message *consume (int timeout_ms) = 0; 2417 2418 /** 2419 * @brief Poll queue, serving any enqueued callbacks. 2420 * 2421 * @remark Must NOT be used for queues containing messages. 2422 * 2423 * @returns the number of events served or 0 on timeout. 2424 */ 2425 virtual int poll (int timeout_ms) = 0; 2426 2427 virtual ~Queue () = 0; 2428 2429 /** 2430 * @brief Enable IO event triggering for queue. 2431 * 2432 * To ease integration with IO based polling loops this API 2433 * allows an application to create a separate file-descriptor 2434 * that librdkafka will write \p payload (of size \p size) to 2435 * whenever a new element is enqueued on a previously empty queue. 2436 * 2437 * To remove event triggering call with \p fd = -1. 2438 * 2439 * librdkafka will maintain a copy of the \p payload. 2440 * 2441 * @remark When using forwarded queues the IO event must only be enabled 2442 * on the final forwarded-to (destination) queue. 2443 */ 2444 virtual void io_event_enable (int fd, const void *payload, size_t size) = 0; 2445 }; 2446 2447 /**@}*/ 2448 2449 /** 2450 * @name ConsumerGroupMetadata 2451 * @{ 2452 * 2453 */ 2454 /** 2455 * @brief ConsumerGroupMetadata holds a consumer instance's group 2456 * metadata state. 2457 * 2458 * This class currently does not have any public methods. 2459 */ 2460 class RD_EXPORT ConsumerGroupMetadata { 2461 public: 2462 virtual ~ConsumerGroupMetadata () = 0; 2463 }; 2464 2465 /**@}*/ 2466 2467 /** 2468 * @name KafkaConsumer 2469 * @{ 2470 * 2471 */ 2472 2473 2474 /** 2475 * @brief High-level KafkaConsumer (for brokers 0.9 and later) 2476 * 2477 * @remark Requires Apache Kafka >= 0.9.0 brokers 2478 * 2479 * Currently supports the \c range and \c roundrobin partition assignment 2480 * strategies (see \c partition.assignment.strategy) 2481 */ 2482 class RD_EXPORT KafkaConsumer : public virtual Handle { 2483 public: 2484 /** 2485 * @brief Creates a KafkaConsumer. 2486 * 2487 * The \p conf object must have \c group.id set to the consumer group to join. 2488 * 2489 * Use RdKafka::KafkaConsumer::close() to shut down the consumer. 2490 * 2491 * @sa RdKafka::RebalanceCb 2492 * @sa CONFIGURATION.md for \c group.id, \c session.timeout.ms, 2493 * \c partition.assignment.strategy, etc. 2494 */ 2495 static KafkaConsumer *create (const Conf *conf, std::string &errstr); 2496 2497 virtual ~KafkaConsumer () = 0; 2498 2499 2500 /** @brief Returns the current partition assignment as set by 2501 * RdKafka::KafkaConsumer::assign() */ 2502 virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0; 2503 2504 /** @brief Returns the current subscription as set by 2505 * RdKafka::KafkaConsumer::subscribe() */ 2506 virtual ErrorCode subscription (std::vector<std::string> &topics) = 0; 2507 2508 /** 2509 * @brief Update the subscription set to \p topics. 2510 * 2511 * Any previous subscription will be unassigned and unsubscribed first. 2512 * 2513 * The subscription set denotes the desired topics to consume and this 2514 * set is provided to the partition assignor (one of the elected group 2515 * members) for all clients which then uses the configured 2516 * \c partition.assignment.strategy to assign the subscription sets's 2517 * topics's partitions to the consumers, depending on their subscription. 2518 * 2519 * The result of such an assignment is a rebalancing which is either 2520 * handled automatically in librdkafka or can be overridden by the application 2521 * by providing a RdKafka::RebalanceCb. 2522 * 2523 * The rebalancing passes the assigned partition set to 2524 * RdKafka::KafkaConsumer::assign() to update what partitions are actually 2525 * being fetched by the KafkaConsumer. 2526 * 2527 * Regex pattern matching automatically performed for topics prefixed 2528 * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\" 2529 * 2530 * @remark A consumer error will be raised for each unavailable topic in the 2531 * \p topics. The error will be ERR_UNKNOWN_TOPIC_OR_PART 2532 * for non-existent topics, and 2533 * ERR_TOPIC_AUTHORIZATION_FAILED for unauthorized topics. 2534 * The consumer error will be raised through consume() (et.al.) 2535 * with the \c RdKafka::Message::err() returning one of the 2536 * error codes mentioned above. 2537 * The subscribe function itself is asynchronous and will not return 2538 * an error on unavailable topics. 2539 * 2540 * @returns an error if the provided list of topics is invalid. 2541 */ 2542 virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0; 2543 2544 /** @brief Unsubscribe from the current subscription set. */ 2545 virtual ErrorCode unsubscribe () = 0; 2546 2547 /** 2548 * @brief Update the assignment set to \p partitions. 2549 * 2550 * The assignment set is the set of partitions actually being consumed 2551 * by the KafkaConsumer. 2552 */ 2553 virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0; 2554 2555 /** 2556 * @brief Stop consumption and remove the current assignment. 2557 */ 2558 virtual ErrorCode unassign () = 0; 2559 2560 /** 2561 * @brief Consume message or get error event, triggers callbacks. 2562 * 2563 * Will automatically call registered callbacks for any such queued events, 2564 * including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb, 2565 * etc. 2566 * 2567 * @remark Use \c delete to free the message. 2568 * 2569 * @remark An application should make sure to call consume() at regular 2570 * intervals, even if no messages are expected, to serve any 2571 * queued callbacks waiting to be called. This is especially 2572 * important when a RebalanceCb has been registered as it needs 2573 * to be called and handled properly to synchronize internal 2574 * consumer state. 2575 * 2576 * @remark Application MUST NOT call \p poll() on KafkaConsumer objects. 2577 * 2578 * @returns One of: 2579 * - proper message (RdKafka::Message::err() is ERR_NO_ERROR) 2580 * - error event (RdKafka::Message::err() is != ERR_NO_ERROR) 2581 * - timeout due to no message or event in \p timeout_ms 2582 * (RdKafka::Message::err() is ERR__TIMED_OUT) 2583 */ 2584 virtual Message *consume (int timeout_ms) = 0; 2585 2586 /** 2587 * @brief Commit offsets for the current assignment. 2588 * 2589 * @remark This is the synchronous variant that blocks until offsets 2590 * are committed or the commit fails (see return value). 2591 * 2592 * @remark If a RdKafka::OffsetCommitCb callback is registered it will 2593 * be called with commit details on a future call to 2594 * RdKafka::KafkaConsumer::consume() 2595 2596 * 2597 * @returns ERR_NO_ERROR or error code. 2598 */ 2599 virtual ErrorCode commitSync () = 0; 2600 2601 /** 2602 * @brief Asynchronous version of RdKafka::KafkaConsumer::CommitSync() 2603 * 2604 * @sa RdKafka::KafkaConsumer::commitSync() 2605 */ 2606 virtual ErrorCode commitAsync () = 0; 2607 2608 /** 2609 * @brief Commit offset for a single topic+partition based on \p message 2610 * 2611 * @remark The offset committed will be the message's offset + 1. 2612 * 2613 * @remark This is the synchronous variant. 2614 * 2615 * @sa RdKafka::KafkaConsumer::commitSync() 2616 */ 2617 virtual ErrorCode commitSync (Message *message) = 0; 2618 2619 /** 2620 * @brief Commit offset for a single topic+partition based on \p message 2621 * 2622 * @remark The offset committed will be the message's offset + 1. 2623 * 2624 * @remark This is the asynchronous variant. 2625 * 2626 * @sa RdKafka::KafkaConsumer::commitSync() 2627 */ 2628 virtual ErrorCode commitAsync (Message *message) = 0; 2629 2630 /** 2631 * @brief Commit offsets for the provided list of partitions. 2632 * 2633 * @remark The \c .offset of the partitions in \p offsets should be the 2634 * offset where consumption will resume, i.e., the last 2635 * processed offset + 1. 2636 * 2637 * @remark This is the synchronous variant. 2638 */ 2639 virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0; 2640 2641 /** 2642 * @brief Commit offset for the provided list of partitions. 2643 * 2644 * @remark The \c .offset of the partitions in \p offsets should be the 2645 * offset where consumption will resume, i.e., the last 2646 * processed offset + 1. 2647 * 2648 * @remark This is the asynchronous variant. 2649 */ 2650 virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0; 2651 2652 /** 2653 * @brief Commit offsets for the current assignment. 2654 * 2655 * @remark This is the synchronous variant that blocks until offsets 2656 * are committed or the commit fails (see return value). 2657 * 2658 * @remark The provided callback will be called from this function. 2659 * 2660 * @returns ERR_NO_ERROR or error code. 2661 */ 2662 virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0; 2663 2664 /** 2665 * @brief Commit offsets for the provided list of partitions. 2666 * 2667 * @remark This is the synchronous variant that blocks until offsets 2668 * are committed or the commit fails (see return value). 2669 * 2670 * @remark The provided callback will be called from this function. 2671 * 2672 * @returns ERR_NO_ERROR or error code. 2673 */ 2674 virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets, 2675 OffsetCommitCb *offset_commit_cb) = 0; 2676 2677 2678 2679 2680 /** 2681 * @brief Retrieve committed offsets for topics+partitions. 2682 * 2683 * @returns ERR_NO_ERROR on success in which case the 2684 * \p offset or \p err field of each \p partitions' element is filled 2685 * in with the stored offset, or a partition specific error. 2686 * Else returns an error code. 2687 */ 2688 virtual ErrorCode committed (std::vector<TopicPartition*> &partitions, 2689 int timeout_ms) = 0; 2690 2691 /** 2692 * @brief Retrieve current positions (offsets) for topics+partitions. 2693 * 2694 * @returns ERR_NO_ERROR on success in which case the 2695 * \p offset or \p err field of each \p partitions' element is filled 2696 * in with the stored offset, or a partition specific error. 2697 * Else returns an error code. 2698 */ 2699 virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0; 2700 2701 2702 /** 2703 * For pausing and resuming consumption, see 2704 * @sa RdKafka::Handle::pause() and RdKafka::Handle::resume() 2705 */ 2706 2707 2708 /** 2709 * @brief Close and shut down the proper. 2710 * 2711 * This call will block until the following operations are finished: 2712 * - Trigger a local rebalance to void the current assignment 2713 * - Stop consumption for current assignment 2714 * - Commit offsets 2715 * - Leave group 2716 * 2717 * The maximum blocking time is roughly limited to session.timeout.ms. 2718 * 2719 * @remark Callbacks, such as RdKafka::RebalanceCb and 2720 * RdKafka::OffsetCommitCb, etc, may be called. 2721 * 2722 * @remark The consumer object must later be freed with \c delete 2723 */ 2724 virtual ErrorCode close () = 0; 2725 2726 2727 /** 2728 * @brief Seek consumer for topic+partition to offset which is either an 2729 * absolute or logical offset. 2730 * 2731 * If \p timeout_ms is not 0 the call will wait this long for the 2732 * seek to be performed. If the timeout is reached the internal state 2733 * will be unknown and this function returns `ERR__TIMED_OUT`. 2734 * If \p timeout_ms is 0 it will initiate the seek but return 2735 * immediately without any error reporting (e.g., async). 2736 * 2737 * This call triggers a fetch queue barrier flush. 2738 * 2739 * @remark Consumtion for the given partition must have started for the 2740 * seek to work. Use assign() to set the starting offset. 2741 * 2742 * @returns an ErrorCode to indicate success or failure. 2743 */ 2744 virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0; 2745 2746 2747 /** 2748 * @brief Store offset \p offset for topic partition \p partition. 2749 * The offset will be committed (written) to the offset store according 2750 * to \p auto.commit.interval.ms or the next manual offset-less commit*() 2751 * 2752 * Per-partition success/error status propagated through TopicPartition.err() 2753 * 2754 * @remark The \c .offset field is stored as is, it will NOT be + 1. 2755 * 2756 * @remark \c enable.auto.offset.store must be set to \c false when using 2757 * this API. 2758 * 2759 * @returns RdKafka::ERR_NO_ERROR on success, or 2760 * RdKafka::ERR___UNKNOWN_PARTITION if none of the offsets could 2761 * be stored, or 2762 * RdKafka::ERR___INVALID_ARG if \c enable.auto.offset.store is true. 2763 */ 2764 virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0; 2765 2766 2767 /** 2768 * @returns the current consumer group metadata associated with this consumer, 2769 * or NULL if the consumer is configured with a \c group.id. 2770 * This metadata object should be passed to the transactional 2771 * producer's RdKafka::Producer::send_offsets_to_transaction() API. 2772 * 2773 * @remark The returned object must be deleted by the application. 2774 * 2775 * @sa RdKafka::Producer::send_offsets_to_transaction() 2776 */ 2777 virtual ConsumerGroupMetadata *groupMetadata () = 0; 2778 2779 2780 /** @brief Check whether the consumer considers the current assignment to 2781 * have been lost involuntarily. This method is only applicable for 2782 * use with a subscribing consumer. Assignments are revoked 2783 * immediately when determined to have been lost, so this method is 2784 * only useful within a rebalance callback. Partitions that have 2785 * been lost may already be owned by other members in the group and 2786 * therefore commiting offsets, for example, may fail. 2787 * 2788 * @remark Calling assign(), incremental_assign() or incremental_unassign() 2789 * resets this flag. 2790 * 2791 * @returns Returns true if the current partition assignment is considered 2792 * lost, false otherwise. 2793 */ 2794 virtual bool assignment_lost () = 0; 2795 2796 /** 2797 * @brief The rebalance protocol currently in use. This will be 2798 * "NONE" if the consumer has not (yet) joined a group, else it will 2799 * match the rebalance protocol ("EAGER", "COOPERATIVE") of the 2800 * configured and selected assignor(s). All configured 2801 * assignors must have the same protocol type, meaning 2802 * online migration of a consumer group from using one 2803 * protocol to another (in particular upgading from EAGER 2804 * to COOPERATIVE) without a restart is not currently 2805 * supported. 2806 * 2807 * @returns an empty string on error, or one of 2808 * "NONE", "EAGER", "COOPERATIVE" on success. 2809 */ 2810 2811 virtual std::string rebalance_protocol () = 0; 2812 2813 2814 /** 2815 * @brief Incrementally add \p partitions to the current assignment. 2816 * 2817 * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, 2818 * this method should be used in a rebalance callback to adjust the current 2819 * assignment appropriately in the case where the rebalance type is 2820 * ERR__ASSIGN_PARTITIONS. The application must pass the partition list 2821 * passed to the callback (or a copy of it), even if the list is empty. 2822 * This method may also be used outside the context of a rebalance callback. 2823 * 2824 * @returns NULL on success, or an error object if the operation was 2825 * unsuccessful. 2826 * 2827 * @remark The returned object must be deleted by the application. 2828 */ 2829 virtual Error *incremental_assign (const std::vector<TopicPartition*> &partitions) = 0; 2830 2831 2832 /** 2833 * @brief Incrementally remove \p partitions from the current assignment. 2834 * 2835 * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, 2836 * this method should be used in a rebalance callback to adjust the current 2837 * assignment appropriately in the case where the rebalance type is 2838 * ERR__REVOKE_PARTITIONS. The application must pass the partition list 2839 * passed to the callback (or a copy of it), even if the list is empty. 2840 * This method may also be used outside the context of a rebalance callback. 2841 * 2842 * @returns NULL on success, or an error object if the operation was 2843 * unsuccessful. 2844 * 2845 * @remark The returned object must be deleted by the application. 2846 */ 2847 virtual Error *incremental_unassign (const std::vector<TopicPartition*> &partitions) = 0; 2848 2849 }; 2850 2851 2852 /**@}*/ 2853 2854 2855 /** 2856 * @name Simple Consumer (legacy) 2857 * @{ 2858 * 2859 */ 2860 2861 /** 2862 * @brief Simple Consumer (legacy) 2863 * 2864 * A simple non-balanced, non-group-aware, consumer. 2865 */ 2866 class RD_EXPORT Consumer : public virtual Handle { 2867 public: 2868 /** 2869 * @brief Creates a new Kafka consumer handle. 2870 * 2871 * \p conf is an optional object that will be used instead of the default 2872 * configuration. 2873 * The \p conf object is reusable after this call. 2874 * 2875 * @returns the new handle on success or NULL on error in which case 2876 * \p errstr is set to a human readable error message. 2877 */ 2878 static Consumer *create (const Conf *conf, std::string &errstr); 2879 2880 virtual ~Consumer () = 0; 2881 2882 2883 /** 2884 * @brief Start consuming messages for topic and \p partition 2885 * at offset \p offset which may either be a proper offset (0..N) 2886 * or one of the the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END. 2887 * 2888 * rdkafka will attempt to keep \p queued.min.messages (config property) 2889 * messages in the local queue by repeatedly fetching batches of messages 2890 * from the broker until the threshold is reached. 2891 * 2892 * The application shall use one of the \p ..->consume*() functions 2893 * to consume messages from the local queue, each kafka message being 2894 * represented as a `RdKafka::Message *` object. 2895 * 2896 * \p ..->start() must not be called multiple times for the same 2897 * topic and partition without stopping consumption first with 2898 * \p ..->stop(). 2899 * 2900 * @returns an ErrorCode to indicate success or failure. 2901 */ 2902 virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0; 2903 2904 /** 2905 * @brief Start consuming messages for topic and \p partition on 2906 * queue \p queue. 2907 * 2908 * @sa RdKafka::Consumer::start() 2909 */ 2910 virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset, 2911 Queue *queue) = 0; 2912 2913 /** 2914 * @brief Stop consuming messages for topic and \p partition, purging 2915 * all messages currently in the local queue. 2916 * 2917 * The application needs to be stop all consumers before destroying 2918 * the Consumer handle. 2919 * 2920 * @returns an ErrorCode to indicate success or failure. 2921 */ 2922 virtual ErrorCode stop (Topic *topic, int32_t partition) = 0; 2923 2924 /** 2925 * @brief Seek consumer for topic+partition to \p offset which is either an 2926 * absolute or logical offset. 2927 * 2928 * If \p timeout_ms is not 0 the call will wait this long for the 2929 * seek to be performed. If the timeout is reached the internal state 2930 * will be unknown and this function returns `ERR__TIMED_OUT`. 2931 * If \p timeout_ms is 0 it will initiate the seek but return 2932 * immediately without any error reporting (e.g., async). 2933 * 2934 * This call triggers a fetch queue barrier flush. 2935 * 2936 * @returns an ErrorCode to indicate success or failure. 2937 */ 2938 virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset, 2939 int timeout_ms) = 0; 2940 2941 /** 2942 * @brief Consume a single message from \p topic and \p partition. 2943 * 2944 * \p timeout_ms is maximum amount of time to wait for a message to be 2945 * received. 2946 * Consumer must have been previously started with \p ..->start(). 2947 * 2948 * @returns a Message object, the application needs to check if message 2949 * is an error or a proper message RdKafka::Message::err() and checking for 2950 * \p ERR_NO_ERROR. 2951 * 2952 * The message object must be destroyed when the application is done with it. 2953 * 2954 * Errors (in RdKafka::Message::err()): 2955 * - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched. 2956 * - ERR__PARTITION_EOF - End of partition reached, not an error. 2957 */ 2958 virtual Message *consume (Topic *topic, int32_t partition, 2959 int timeout_ms) = 0; 2960 2961 /** 2962 * @brief Consume a single message from the specified queue. 2963 * 2964 * \p timeout_ms is maximum amount of time to wait for a message to be 2965 * received. 2966 * Consumer must have been previously started on the queue with 2967 * \p ..->start(). 2968 * 2969 * @returns a Message object, the application needs to check if message 2970 * is an error or a proper message \p Message->err() and checking for 2971 * \p ERR_NO_ERROR. 2972 * 2973 * The message object must be destroyed when the application is done with it. 2974 * 2975 * Errors (in RdKafka::Message::err()): 2976 * - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched 2977 * 2978 * Note that Message->topic() may be nullptr after certain kinds of 2979 * errors, so applications should check that it isn't null before 2980 * dereferencing it. 2981 */ 2982 virtual Message *consume (Queue *queue, int timeout_ms) = 0; 2983 2984 /** 2985 * @brief Consumes messages from \p topic and \p partition, calling 2986 * the provided callback for each consumed messsage. 2987 * 2988 * \p consume_callback() provides higher throughput performance 2989 * than \p consume(). 2990 * 2991 * \p timeout_ms is the maximum amount of time to wait for one or 2992 * more messages to arrive. 2993 * 2994 * The provided \p consume_cb instance has its \p consume_cb function 2995 * called for every message received. 2996 * 2997 * The \p opaque argument is passed to the \p consume_cb as \p opaque. 2998 * 2999 * @returns the number of messages processed or -1 on error. 3000 * 3001 * @sa RdKafka::Consumer::consume() 3002 */ 3003 virtual int consume_callback (Topic *topic, int32_t partition, 3004 int timeout_ms, 3005 ConsumeCb *consume_cb, 3006 void *opaque) = 0; 3007 3008 /** 3009 * @brief Consumes messages from \p queue, calling the provided callback for 3010 * each consumed messsage. 3011 * 3012 * @sa RdKafka::Consumer::consume_callback() 3013 */ 3014 virtual int consume_callback (Queue *queue, int timeout_ms, 3015 RdKafka::ConsumeCb *consume_cb, 3016 void *opaque) = 0; 3017 3018 /** 3019 * @brief Converts an offset into the logical offset from the tail of a topic. 3020 * 3021 * \p offset is the (positive) number of items from the end. 3022 * 3023 * @returns the logical offset for message \p offset from the tail, this value 3024 * may be passed to Consumer::start, et.al. 3025 * @remark The returned logical offset is specific to librdkafka. 3026 */ 3027 static int64_t OffsetTail(int64_t offset); 3028 }; 3029 3030 /**@}*/ 3031 3032 3033 /** 3034 * @name Producer 3035 * @{ 3036 * 3037 */ 3038 3039 3040 /** 3041 * @brief Producer 3042 */ 3043 class RD_EXPORT Producer : public virtual Handle { 3044 public: 3045 /** 3046 * @brief Creates a new Kafka producer handle. 3047 * 3048 * \p conf is an optional object that will be used instead of the default 3049 * configuration. 3050 * The \p conf object is reusable after this call. 3051 * 3052 * @returns the new handle on success or NULL on error in which case 3053 * \p errstr is set to a human readable error message. 3054 */ 3055 static Producer *create (const Conf *conf, std::string &errstr); 3056 3057 3058 virtual ~Producer () = 0; 3059 3060 /** 3061 * @brief RdKafka::Producer::produce() \p msgflags 3062 * 3063 * These flags are optional. 3064 */ 3065 enum { 3066 RK_MSG_FREE = 0x1, /**< rdkafka will free(3) \p payload 3067 * when it is done with it. 3068 * Mutually exclusive with RK_MSG_COPY. */ 3069 RK_MSG_COPY = 0x2, /**< the \p payload data will be copied 3070 * and the \p payload pointer will not 3071 * be used by rdkafka after the 3072 * call returns. 3073 * Mutually exclusive with RK_MSG_FREE. */ 3074 RK_MSG_BLOCK = 0x4 /**< Block produce*() on message queue 3075 * full. 3076 * WARNING: 3077 * If a delivery report callback 3078 * is used the application MUST 3079 * call rd_kafka_poll() (or equiv.) 3080 * to make sure delivered messages 3081 * are drained from the internal 3082 * delivery report queue. 3083 * Failure to do so will result 3084 * in indefinately blocking on 3085 * the produce() call when the 3086 * message queue is full. 3087 */ 3088 3089 3090 /**@cond NO_DOC*/ 3091 /* For backwards compatibility: */ 3092 #ifndef MSG_COPY /* defined in sys/msg.h */ 3093 , /** this comma must exist betwen 3094 * RK_MSG_BLOCK and MSG_FREE 3095 */ 3096 MSG_FREE = RK_MSG_FREE, 3097 MSG_COPY = RK_MSG_COPY 3098 #endif 3099 /**@endcond*/ 3100 }; 3101 3102 /** 3103 * @brief Produce and send a single message to broker. 3104 * 3105 * This is an asynch non-blocking API. 3106 * 3107 * \p partition is the target partition, either: 3108 * - RdKafka::Topic::PARTITION_UA (unassigned) for 3109 * automatic partitioning using the topic's partitioner function, or 3110 * - a fixed partition (0..N) 3111 * 3112 * \p msgflags is zero or more of the following flags OR:ed together: 3113 * RK_MSG_BLOCK - block \p produce*() call if 3114 * \p queue.buffering.max.messages or 3115 * \p queue.buffering.max.kbytes are exceeded. 3116 * Messages are considered in-queue from the point they 3117 * are accepted by produce() until their corresponding 3118 * delivery report callback/event returns. 3119 * It is thus a requirement to call 3120 * poll() (or equiv.) from a separate 3121 * thread when RK_MSG_BLOCK is used. 3122 * See WARNING on \c RK_MSG_BLOCK above. 3123 * RK_MSG_FREE - rdkafka will free(3) \p payload when it is done with it. 3124 * RK_MSG_COPY - the \p payload data will be copied and the \p payload 3125 * pointer will not be used by rdkafka after the 3126 * call returns. 3127 * 3128 * NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive. 3129 * 3130 * If the function returns an error code and RK_MSG_FREE was specified, then 3131 * the memory associated with the payload is still the caller's 3132 * responsibility. 3133 * 3134 * \p payload is the message payload of size \p len bytes. 3135 * 3136 * \p key is an optional message key, if non-NULL it 3137 * will be passed to the topic partitioner as well as be sent with the 3138 * message to the broker and passed on to the consumer. 3139 * 3140 * \p msg_opaque is an optional application-provided per-message opaque 3141 * pointer that will provided in the delivery report callback (\p dr_cb) for 3142 * referencing this message. 3143 * 3144 * @returns an ErrorCode to indicate success or failure: 3145 * - ERR_NO_ERROR - message successfully enqueued for transmission. 3146 * 3147 * - ERR__QUEUE_FULL - maximum number of outstanding messages has been 3148 * reached: \c queue.buffering.max.message 3149 * 3150 * - ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size: 3151 * \c messages.max.bytes 3152 * 3153 * - ERR__UNKNOWN_PARTITION - requested \p partition is unknown in the 3154 * Kafka cluster. 3155 * 3156 * - ERR__UNKNOWN_TOPIC - topic is unknown in the Kafka cluster. 3157 */ 3158 virtual ErrorCode produce (Topic *topic, int32_t partition, 3159 int msgflags, 3160 void *payload, size_t len, 3161 const std::string *key, 3162 void *msg_opaque) = 0; 3163 3164 /** 3165 * @brief Variant produce() that passes the key as a pointer and length 3166 * instead of as a const std::string *. 3167 */ 3168 virtual ErrorCode produce (Topic *topic, int32_t partition, 3169 int msgflags, 3170 void *payload, size_t len, 3171 const void *key, size_t key_len, 3172 void *msg_opaque) = 0; 3173 3174 /** 3175 * @brief produce() variant that takes topic as a string (no need for 3176 * creating a Topic object), and also allows providing the 3177 * message timestamp (milliseconds since beginning of epoch, UTC). 3178 * Otherwise identical to produce() above. 3179 */ 3180 virtual ErrorCode produce (const std::string topic_name, int32_t partition, 3181 int msgflags, 3182 void *payload, size_t len, 3183 const void *key, size_t key_len, 3184 int64_t timestamp, void *msg_opaque) = 0; 3185 3186 /** 3187 * @brief produce() variant that that allows for Header support on produce 3188 * Otherwise identical to produce() above. 3189 * 3190 * @warning The \p headers will be freed/deleted if the produce() call 3191 * succeeds, or left untouched if produce() fails. 3192 */ 3193 virtual ErrorCode produce (const std::string topic_name, int32_t partition, 3194 int msgflags, 3195 void *payload, size_t len, 3196 const void *key, size_t key_len, 3197 int64_t timestamp, 3198 RdKafka::Headers *headers, 3199 void *msg_opaque) = 0; 3200 3201 3202 /** 3203 * @brief Variant produce() that accepts vectors for key and payload. 3204 * The vector data will be copied. 3205 */ 3206 virtual ErrorCode produce (Topic *topic, int32_t partition, 3207 const std::vector<char> *payload, 3208 const std::vector<char> *key, 3209 void *msg_opaque) = 0; 3210 3211 3212 /** 3213 * @brief Wait until all outstanding produce requests, et.al, are completed. 3214 * This should typically be done prior to destroying a producer instance 3215 * to make sure all queued and in-flight produce requests are completed 3216 * before terminating. 3217 * 3218 * @remark The \c linger.ms time will be ignored for the duration of the call, 3219 * queued messages will be sent to the broker as soon as possible. 3220 * 3221 * @remark This function will call Producer::poll() and thus 3222 * trigger callbacks. 3223 * 3224 * @returns ERR__TIMED_OUT if \p timeout_ms was reached before all 3225 * outstanding requests were completed, else ERR_NO_ERROR 3226 */ 3227 virtual ErrorCode flush (int timeout_ms) = 0; 3228 3229 3230 /** 3231 * @brief Purge messages currently handled by the producer instance. 3232 * 3233 * @param purge_flags tells which messages should be purged and how. 3234 * 3235 * The application will need to call Handle::poll() or Producer::flush() 3236 * afterwards to serve the delivery report callbacks of the purged messages. 3237 * 3238 * Messages purged from internal queues fail with the delivery report 3239 * error code set to ERR__PURGE_QUEUE, while purged messages that 3240 * are in-flight to or from the broker will fail with the error code set to 3241 * ERR__PURGE_INFLIGHT. 3242 * 3243 * @warning Purging messages that are in-flight to or from the broker 3244 * will ignore any sub-sequent acknowledgement for these messages 3245 * received from the broker, effectively making it impossible 3246 * for the application to know if the messages were successfully 3247 * produced or not. This may result in duplicate messages if the 3248 * application retries these messages at a later time. 3249 * 3250 * @remark This call may block for a short time while background thread 3251 * queues are purged. 3252 * 3253 * @returns ERR_NO_ERROR on success, 3254 * ERR__INVALID_ARG if the \p purge flags are invalid or unknown, 3255 * ERR__NOT_IMPLEMENTED if called on a non-producer client instance. 3256 */ 3257 virtual ErrorCode purge (int purge_flags) = 0; 3258 3259 /** 3260 * @brief RdKafka::Handle::purge() \p purge_flags 3261 */ 3262 enum { 3263 PURGE_QUEUE = 0x1, /**< Purge messages in internal queues */ 3264 3265 PURGE_INFLIGHT = 0x2, /*! Purge messages in-flight to or from the broker. 3266 * Purging these messages will void any future 3267 * acknowledgements from the broker, making it 3268 * impossible for the application to know if these 3269 * messages were successfully delivered or not. 3270 * Retrying these messages may lead to duplicates. */ 3271 3272 PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue 3273 * purging to finish. */ 3274 }; 3275 3276 /** 3277 * @name Transactional API 3278 * @{ 3279 * 3280 * Requires Kafka broker version v0.11.0 or later 3281 * 3282 * See the Transactional API documentation in rdkafka.h for more information. 3283 */ 3284 3285 /** 3286 * @brief Initialize transactions for the producer instance. 3287 * 3288 * @param timeout_ms The maximum time to block. On timeout the operation 3289 * may continue in the background, depending on state, 3290 * and it is okay to call init_transactions() again. 3291 * 3292 * @returns an RdKafka::Error object on error, or NULL on success. 3293 * Check whether the returned error object permits retrying 3294 * by calling RdKafka::Error::is_retriable(), or whether a fatal 3295 * error has been raised by calling RdKafka::Error::is_fatal(). 3296 * 3297 * @remark The returned error object (if not NULL) must be deleted. 3298 * 3299 * See rd_kafka_init_transactions() in rdkafka.h for more information. 3300 * 3301 */ 3302 virtual Error *init_transactions (int timeout_ms) = 0; 3303 3304 3305 /** 3306 * @brief init_transactions() must have been called successfully 3307 * (once) before this function is called. 3308 * 3309 * @returns an RdKafka::Error object on error, or NULL on success. 3310 * Check whether a fatal error has been raised by calling 3311 * RdKafka::Error::is_fatal_error(). 3312 * 3313 * @remark The returned error object (if not NULL) must be deleted. 3314 * 3315 * See rd_kafka_begin_transaction() in rdkafka.h for more information. 3316 */ 3317 virtual Error *begin_transaction () = 0; 3318 3319 /** 3320 * @brief Sends a list of topic partition offsets to the consumer group 3321 * coordinator for \p group_metadata, and marks the offsets as part 3322 * part of the current transaction. 3323 * These offsets will be considered committed only if the transaction 3324 * is committed successfully. 3325 * 3326 * The offsets should be the next message your application will 3327 * consume, 3328 * i.e., the last processed message's offset + 1 for each partition. 3329 * Either track the offsets manually during processing or use 3330 * RdKafka::KafkaConsumer::position() (on the consumer) to get the 3331 * current offsets for 3332 * the partitions assigned to the consumer. 3333 * 3334 * Use this method at the end of a consume-transform-produce loop prior 3335 * to committing the transaction with commit_transaction(). 3336 * 3337 * @param offsets List of offsets to commit to the consumer group upon 3338 * successful commit of the transaction. Offsets should be 3339 * the next message to consume, 3340 * e.g., last processed message + 1. 3341 * @param group_metadata The current consumer group metadata as returned by 3342 * RdKafka::KafkaConsumer::groupMetadata() on the consumer 3343 * instance the provided offsets were consumed from. 3344 * @param timeout_ms Maximum time allowed to register the 3345 * offsets on the broker. 3346 * 3347 * @remark This function must be called on the transactional producer 3348 * instance, not the consumer. 3349 * 3350 * @remark The consumer must disable auto commits 3351 * (set \c enable.auto.commit to false on the consumer). 3352 * 3353 * @returns an RdKafka::Error object on error, or NULL on success. 3354 * Check whether the returned error object permits retrying 3355 * by calling RdKafka::Error::is_retriable(), or whether an abortable 3356 * or fatal error has been raised by calling 3357 * RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal() 3358 * respectively. 3359 * 3360 * @remark The returned error object (if not NULL) must be deleted. 3361 * 3362 * See rd_kafka_send_offsets_to_transaction() in rdkafka.h for 3363 * more information. 3364 */ 3365 virtual Error *send_offsets_to_transaction ( 3366 const std::vector<TopicPartition*> &offsets, 3367 const ConsumerGroupMetadata *group_metadata, 3368 int timeout_ms) = 0; 3369 3370 /** 3371 * @brief Commit the current transaction as started with begin_transaction(). 3372 * 3373 * Any outstanding messages will be flushed (delivered) before actually 3374 * committing the transaction. 3375 * 3376 * @param timeout_ms The maximum time to block. On timeout the operation 3377 * may continue in the background, depending on state, 3378 * and it is okay to call this function again. 3379 * Pass -1 to use the remaining transaction timeout, 3380 * this is the recommended use. 3381 * 3382 * @remark It is strongly recommended to always pass -1 (remaining transaction 3383 * time) as the \p timeout_ms. Using other values risk internal 3384 * state desynchronization in case any of the underlying protocol 3385 * requests fail. 3386 * 3387 * @returns an RdKafka::Error object on error, or NULL on success. 3388 * Check whether the returned error object permits retrying 3389 * by calling RdKafka::Error::is_retriable(), or whether an abortable 3390 * or fatal error has been raised by calling 3391 * RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal() 3392 * respectively. 3393 * 3394 * @remark The returned error object (if not NULL) must be deleted. 3395 * 3396 * See rd_kafka_commit_transaction() in rdkafka.h for more information. 3397 */ 3398 virtual Error *commit_transaction (int timeout_ms) = 0; 3399 3400 /** 3401 * @brief Aborts the ongoing transaction. 3402 * 3403 * This function should also be used to recover from non-fatal abortable 3404 * transaction errors. 3405 * 3406 * Any outstanding messages will be purged and fail with 3407 * RdKafka::ERR__PURGE_INFLIGHT or RdKafka::ERR__PURGE_QUEUE. 3408 * See RdKafka::Producer::purge() for details. 3409 * 3410 * @param timeout_ms The maximum time to block. On timeout the operation 3411 * may continue in the background, depending on state, 3412 * and it is okay to call this function again. 3413 * Pass -1 to use the remaining transaction timeout, 3414 * this is the recommended use. 3415 * 3416 * @remark It is strongly recommended to always pass -1 (remaining transaction 3417 * time) as the \p timeout_ms. Using other values risk internal 3418 * state desynchronization in case any of the underlying protocol 3419 * requests fail. 3420 * 3421 * @returns an RdKafka::Error object on error, or NULL on success. 3422 * Check whether the returned error object permits retrying 3423 * by calling RdKafka::Error::is_retriable(), or whether a 3424 * fatal error has been raised by calling RdKafka::Error::is_fatal(). 3425 * 3426 * @remark The returned error object (if not NULL) must be deleted. 3427 * 3428 * See rd_kafka_abort_transaction() in rdkafka.h for more information. 3429 */ 3430 virtual Error *abort_transaction (int timeout_ms) = 0; 3431 3432 /**@}*/ 3433 }; 3434 3435 /**@}*/ 3436 3437 3438 /** 3439 * @name Metadata interface 3440 * @{ 3441 * 3442 */ 3443 3444 3445 /** 3446 * @brief Metadata: Broker information 3447 */ 3448 class BrokerMetadata { 3449 public: 3450 /** @returns Broker id */ 3451 virtual int32_t id() const = 0; 3452 3453 /** @returns Broker hostname */ 3454 virtual const std::string host() const = 0; 3455 3456 /** @returns Broker listening port */ 3457 virtual int port() const = 0; 3458 3459 virtual ~BrokerMetadata() = 0; 3460 }; 3461 3462 3463 3464 /** 3465 * @brief Metadata: Partition information 3466 */ 3467 class PartitionMetadata { 3468 public: 3469 /** @brief Replicas */ 3470 typedef std::vector<int32_t> ReplicasVector; 3471 /** @brief ISRs (In-Sync-Replicas) */ 3472 typedef std::vector<int32_t> ISRSVector; 3473 3474 /** @brief Replicas iterator */ 3475 typedef ReplicasVector::const_iterator ReplicasIterator; 3476 /** @brief ISRs iterator */ 3477 typedef ISRSVector::const_iterator ISRSIterator; 3478 3479 3480 /** @returns Partition id */ 3481 virtual int32_t id() const = 0; 3482 3483 /** @returns Partition error reported by broker */ 3484 virtual ErrorCode err() const = 0; 3485 3486 /** @returns Leader broker (id) for partition */ 3487 virtual int32_t leader() const = 0; 3488 3489 /** @returns Replica brokers */ 3490 virtual const std::vector<int32_t> *replicas() const = 0; 3491 3492 /** @returns In-Sync-Replica brokers 3493 * @warning The broker may return a cached/outdated list of ISRs. 3494 */ 3495 virtual const std::vector<int32_t> *isrs() const = 0; 3496 3497 virtual ~PartitionMetadata() = 0; 3498 }; 3499 3500 3501 3502 /** 3503 * @brief Metadata: Topic information 3504 */ 3505 class TopicMetadata { 3506 public: 3507 /** @brief Partitions */ 3508 typedef std::vector<const PartitionMetadata*> PartitionMetadataVector; 3509 /** @brief Partitions iterator */ 3510 typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator; 3511 3512 /** @returns Topic name */ 3513 virtual const std::string topic() const = 0; 3514 3515 /** @returns Partition list */ 3516 virtual const PartitionMetadataVector *partitions() const = 0; 3517 3518 /** @returns Topic error reported by broker */ 3519 virtual ErrorCode err() const = 0; 3520 3521 virtual ~TopicMetadata() = 0; 3522 }; 3523 3524 3525 /** 3526 * @brief Metadata container 3527 */ 3528 class Metadata { 3529 public: 3530 /** @brief Brokers */ 3531 typedef std::vector<const BrokerMetadata*> BrokerMetadataVector; 3532 /** @brief Topics */ 3533 typedef std::vector<const TopicMetadata*> TopicMetadataVector; 3534 3535 /** @brief Brokers iterator */ 3536 typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator; 3537 /** @brief Topics iterator */ 3538 typedef TopicMetadataVector::const_iterator TopicMetadataIterator; 3539 3540 3541 /** 3542 * @brief Broker list 3543 * @remark Ownership of the returned pointer is retained by the instance of 3544 * Metadata that is called. 3545 */ 3546 virtual const BrokerMetadataVector *brokers() const = 0; 3547 3548 /** 3549 * @brief Topic list 3550 * @remark Ownership of the returned pointer is retained by the instance of 3551 * Metadata that is called. 3552 */ 3553 virtual const TopicMetadataVector *topics() const = 0; 3554 3555 /** @brief Broker (id) originating this metadata */ 3556 virtual int32_t orig_broker_id() const = 0; 3557 3558 /** @brief Broker (name) originating this metadata */ 3559 virtual const std::string orig_broker_name() const = 0; 3560 3561 virtual ~Metadata() = 0; 3562 }; 3563 3564 /**@}*/ 3565 3566 } 3567 3568 3569 #endif /* _RDKAFKACPP_H_ */ 3570