1 /* 2 * librdkafka - Apache Kafka C/C++ library 3 * 4 * Copyright (c) 2014 Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #ifndef _RDKAFKACPP_H_ 30 #define _RDKAFKACPP_H_ 31 32 /** 33 * @file rdkafkacpp.h 34 * @brief Apache Kafka C/C++ consumer and producer client library. 35 * 36 * rdkafkacpp.h contains the public C++ API for librdkafka. 37 * The API is documented in this file as comments prefixing the class, 38 * function, type, enum, define, etc. 39 * For more information, see the C interface in rdkafka.h and read the 40 * manual in INTRODUCTION.md. 41 * The C++ interface is STD C++ '03 compliant and adheres to the 42 * Google C++ Style Guide. 
43 44 * @sa For the C interface see rdkafka.h 45 * 46 * @tableofcontents 47 */ 48 49 /**@cond NO_DOC*/ 50 #include <string> 51 #include <list> 52 #include <vector> 53 #include <cstdlib> 54 #include <cstring> 55 #include <stdint.h> 56 #include <sys/types.h> 57 58 #ifdef _WIN32 59 #ifndef ssize_t 60 #ifndef _BASETSD_H_ 61 #include <basetsd.h> 62 #endif 63 #ifndef _SSIZE_T_DEFINED 64 #define _SSIZE_T_DEFINED 65 typedef SSIZE_T ssize_t; 66 #endif 67 #endif 68 #undef RD_EXPORT 69 #ifdef LIBRDKAFKA_STATICLIB 70 #define RD_EXPORT 71 #else 72 #ifdef LIBRDKAFKACPP_EXPORTS 73 #define RD_EXPORT __declspec(dllexport) 74 #else 75 #define RD_EXPORT __declspec(dllimport) 76 #endif 77 #endif 78 #else 79 #define RD_EXPORT 80 #endif 81 82 /**@endcond*/ 83 84 extern "C" { 85 /* Forward declarations */ 86 struct rd_kafka_s; 87 struct rd_kafka_topic_s; 88 struct rd_kafka_message_s; 89 struct rd_kafka_conf_s; 90 struct rd_kafka_topic_conf_s; 91 } 92 93 namespace RdKafka { 94 95 /** 96 * @name Miscellaneous APIs 97 * @{ 98 */ 99 100 /** 101 * @brief librdkafka version 102 * 103 * Interpreted as hex \c MM.mm.rr.xx: 104 * - MM = Major 105 * - mm = minor 106 * - rr = revision 107 * - xx = pre-release id (0xff is the final release) 108 * 109 * E.g.: \c 0x000801ff = 0.8.1 110 * 111 * @remark This value should only be used during compile time, 112 * for runtime checks of version use RdKafka::version() 113 */ 114 #define RD_KAFKA_VERSION 0x010700ff 115 116 /** 117 * @brief Returns the librdkafka version as integer. 118 * 119 * @sa See RD_KAFKA_VERSION for how to parse the integer format. 120 */ 121 RD_EXPORT 122 int version (); 123 124 /** 125 * @brief Returns the librdkafka version as string. 126 */ 127 RD_EXPORT 128 std::string version_str(); 129 130 /** 131 * @brief Returns a CSV list of the supported debug contexts 132 * for use with Conf::Set("debug", ..). 133 */ 134 RD_EXPORT 135 std::string get_debug_contexts(); 136 137 /** 138 * @brief Wait for all rd_kafka_t objects to be destroyed. 
139 * 140 * @returns 0 if all kafka objects are now destroyed, or -1 if the 141 * timeout was reached. 142 * Since RdKafka handle deletion is an asynch operation the 143 * \p wait_destroyed() function can be used for applications where 144 * a clean shutdown is required. 145 */ 146 RD_EXPORT 147 int wait_destroyed(int timeout_ms); 148 149 /** 150 * @brief Allocate memory using the same allocator librdkafka uses. 151 * 152 * This is typically an abstraction for the malloc(3) call and makes sure 153 * the application can use the same memory allocator as librdkafka for 154 * allocating pointers that are used by librdkafka. 155 * 156 * @remark Memory allocated by mem_malloc() must be freed using 157 * mem_free(). 158 */ 159 RD_EXPORT 160 void *mem_malloc (size_t size); 161 162 /** 163 * @brief Free pointer returned by librdkafka 164 * 165 * This is typically an abstraction for the free(3) call and makes sure 166 * the application can use the same memory allocator as librdkafka for 167 * freeing pointers returned by librdkafka. 168 * 169 * In standard setups it is usually not necessary to use this interface 170 * rather than the free(3) function. 171 * 172 * @remark mem_free() must only be used for pointers returned by APIs 173 * that explicitly mention using this function for freeing. 174 */ 175 RD_EXPORT 176 void mem_free (void *ptr); 177 178 /**@}*/ 179 180 181 182 /** 183 * @name Constants, errors, types 184 * @{ 185 * 186 * 187 */ 188 189 /** 190 * @brief Error codes. 191 * 192 * The negative error codes delimited by two underscores 193 * (\c _ERR__..) denotes errors internal to librdkafka and are 194 * displayed as \c \"Local: \<error string..\>\", while the error codes 195 * delimited by a single underscore (\c ERR_..) denote broker 196 * errors and are displayed as \c \"Broker: \<error string..\>\". 
197 * 198 * @sa Use RdKafka::err2str() to translate an error code a human readable string 199 */ 200 enum ErrorCode { 201 /* Internal errors to rdkafka: */ 202 /** Begin internal error codes */ 203 ERR__BEGIN = -200, 204 /** Received message is incorrect */ 205 ERR__BAD_MSG = -199, 206 /** Bad/unknown compression */ 207 ERR__BAD_COMPRESSION = -198, 208 /** Broker is going away */ 209 ERR__DESTROY = -197, 210 /** Generic failure */ 211 ERR__FAIL = -196, 212 /** Broker transport failure */ 213 ERR__TRANSPORT = -195, 214 /** Critical system resource */ 215 ERR__CRIT_SYS_RESOURCE = -194, 216 /** Failed to resolve broker */ 217 ERR__RESOLVE = -193, 218 /** Produced message timed out*/ 219 ERR__MSG_TIMED_OUT = -192, 220 /** Reached the end of the topic+partition queue on 221 * the broker. Not really an error. 222 * This event is disabled by default, 223 * see the `enable.partition.eof` configuration property. */ 224 ERR__PARTITION_EOF = -191, 225 /** Permanent: Partition does not exist in cluster. */ 226 ERR__UNKNOWN_PARTITION = -190, 227 /** File or filesystem error */ 228 ERR__FS = -189, 229 /** Permanent: Topic does not exist in cluster. */ 230 ERR__UNKNOWN_TOPIC = -188, 231 /** All broker connections are down. */ 232 ERR__ALL_BROKERS_DOWN = -187, 233 /** Invalid argument, or invalid configuration */ 234 ERR__INVALID_ARG = -186, 235 /** Operation timed out */ 236 ERR__TIMED_OUT = -185, 237 /** Queue is full */ 238 ERR__QUEUE_FULL = -184, 239 /** ISR count < required.acks */ 240 ERR__ISR_INSUFF = -183, 241 /** Broker node update */ 242 ERR__NODE_UPDATE = -182, 243 /** SSL error */ 244 ERR__SSL = -181, 245 /** Waiting for coordinator to become available. */ 246 ERR__WAIT_COORD = -180, 247 /** Unknown client group */ 248 ERR__UNKNOWN_GROUP = -179, 249 /** Operation in progress */ 250 ERR__IN_PROGRESS = -178, 251 /** Previous operation in progress, wait for it to finish. 
*/ 252 ERR__PREV_IN_PROGRESS = -177, 253 /** This operation would interfere with an existing subscription */ 254 ERR__EXISTING_SUBSCRIPTION = -176, 255 /** Assigned partitions (rebalance_cb) */ 256 ERR__ASSIGN_PARTITIONS = -175, 257 /** Revoked partitions (rebalance_cb) */ 258 ERR__REVOKE_PARTITIONS = -174, 259 /** Conflicting use */ 260 ERR__CONFLICT = -173, 261 /** Wrong state */ 262 ERR__STATE = -172, 263 /** Unknown protocol */ 264 ERR__UNKNOWN_PROTOCOL = -171, 265 /** Not implemented */ 266 ERR__NOT_IMPLEMENTED = -170, 267 /** Authentication failure*/ 268 ERR__AUTHENTICATION = -169, 269 /** No stored offset */ 270 ERR__NO_OFFSET = -168, 271 /** Outdated */ 272 ERR__OUTDATED = -167, 273 /** Timed out in queue */ 274 ERR__TIMED_OUT_QUEUE = -166, 275 /** Feature not supported by broker */ 276 ERR__UNSUPPORTED_FEATURE = -165, 277 /** Awaiting cache update */ 278 ERR__WAIT_CACHE = -164, 279 /** Operation interrupted */ 280 ERR__INTR = -163, 281 /** Key serialization error */ 282 ERR__KEY_SERIALIZATION = -162, 283 /** Value serialization error */ 284 ERR__VALUE_SERIALIZATION = -161, 285 /** Key deserialization error */ 286 ERR__KEY_DESERIALIZATION = -160, 287 /** Value deserialization error */ 288 ERR__VALUE_DESERIALIZATION = -159, 289 /** Partial response */ 290 ERR__PARTIAL = -158, 291 /** Modification attempted on read-only object */ 292 ERR__READ_ONLY = -157, 293 /** No such entry / item not found */ 294 ERR__NOENT = -156, 295 /** Read underflow */ 296 ERR__UNDERFLOW = -155, 297 /** Invalid type */ 298 ERR__INVALID_TYPE = -154, 299 /** Retry operation */ 300 ERR__RETRY = -153, 301 /** Purged in queue */ 302 ERR__PURGE_QUEUE = -152, 303 /** Purged in flight */ 304 ERR__PURGE_INFLIGHT = -151, 305 /** Fatal error: see RdKafka::Handle::fatal_error() */ 306 ERR__FATAL = -150, 307 /** Inconsistent state */ 308 ERR__INCONSISTENT = -149, 309 /** Gap-less ordering would not be guaranteed if proceeding */ 310 ERR__GAPLESS_GUARANTEE = -148, 311 /** Maximum poll interval 
exceeded */ 312 ERR__MAX_POLL_EXCEEDED = -147, 313 /** Unknown broker */ 314 ERR__UNKNOWN_BROKER = -146, 315 /** Functionality not configured */ 316 ERR__NOT_CONFIGURED = -145, 317 /** Instance has been fenced */ 318 ERR__FENCED = -144, 319 /** Application generated error */ 320 ERR__APPLICATION = -143, 321 /** Assignment lost */ 322 ERR__ASSIGNMENT_LOST = -142, 323 /** No operation performed */ 324 ERR__NOOP = -141, 325 /** No offset to automatically reset to */ 326 ERR__AUTO_OFFSET_RESET = -140, 327 328 /** End internal error codes */ 329 ERR__END = -100, 330 331 /* Kafka broker errors: */ 332 /** Unknown broker error */ 333 ERR_UNKNOWN = -1, 334 /** Success */ 335 ERR_NO_ERROR = 0, 336 /** Offset out of range */ 337 ERR_OFFSET_OUT_OF_RANGE = 1, 338 /** Invalid message */ 339 ERR_INVALID_MSG = 2, 340 /** Unknown topic or partition */ 341 ERR_UNKNOWN_TOPIC_OR_PART = 3, 342 /** Invalid message size */ 343 ERR_INVALID_MSG_SIZE = 4, 344 /** Leader not available */ 345 ERR_LEADER_NOT_AVAILABLE = 5, 346 /** Not leader for partition */ 347 ERR_NOT_LEADER_FOR_PARTITION = 6, 348 /** Request timed out */ 349 ERR_REQUEST_TIMED_OUT = 7, 350 /** Broker not available */ 351 ERR_BROKER_NOT_AVAILABLE = 8, 352 /** Replica not available */ 353 ERR_REPLICA_NOT_AVAILABLE = 9, 354 /** Message size too large */ 355 ERR_MSG_SIZE_TOO_LARGE = 10, 356 /** StaleControllerEpochCode */ 357 ERR_STALE_CTRL_EPOCH = 11, 358 /** Offset metadata string too large */ 359 ERR_OFFSET_METADATA_TOO_LARGE = 12, 360 /** Broker disconnected before response received */ 361 ERR_NETWORK_EXCEPTION = 13, 362 /** Coordinator load in progress */ 363 ERR_COORDINATOR_LOAD_IN_PROGRESS = 14, 364 /** Group coordinator load in progress */ 365 #define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS 366 /** Coordinator not available */ 367 ERR_COORDINATOR_NOT_AVAILABLE = 15, 368 /** Group coordinator not available */ 369 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE 370 /** Not 
coordinator */ 371 ERR_NOT_COORDINATOR = 16, 372 /** Not coordinator for group */ 373 #define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR 374 /** Invalid topic */ 375 ERR_TOPIC_EXCEPTION = 17, 376 /** Message batch larger than configured server segment size */ 377 ERR_RECORD_LIST_TOO_LARGE = 18, 378 /** Not enough in-sync replicas */ 379 ERR_NOT_ENOUGH_REPLICAS = 19, 380 /** Message(s) written to insufficient number of in-sync replicas */ 381 ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20, 382 /** Invalid required acks value */ 383 ERR_INVALID_REQUIRED_ACKS = 21, 384 /** Specified group generation id is not valid */ 385 ERR_ILLEGAL_GENERATION = 22, 386 /** Inconsistent group protocol */ 387 ERR_INCONSISTENT_GROUP_PROTOCOL = 23, 388 /** Invalid group.id */ 389 ERR_INVALID_GROUP_ID = 24, 390 /** Unknown member */ 391 ERR_UNKNOWN_MEMBER_ID = 25, 392 /** Invalid session timeout */ 393 ERR_INVALID_SESSION_TIMEOUT = 26, 394 /** Group rebalance in progress */ 395 ERR_REBALANCE_IN_PROGRESS = 27, 396 /** Commit offset data size is not valid */ 397 ERR_INVALID_COMMIT_OFFSET_SIZE = 28, 398 /** Topic authorization failed */ 399 ERR_TOPIC_AUTHORIZATION_FAILED = 29, 400 /** Group authorization failed */ 401 ERR_GROUP_AUTHORIZATION_FAILED = 30, 402 /** Cluster authorization failed */ 403 ERR_CLUSTER_AUTHORIZATION_FAILED = 31, 404 /** Invalid timestamp */ 405 ERR_INVALID_TIMESTAMP = 32, 406 /** Unsupported SASL mechanism */ 407 ERR_UNSUPPORTED_SASL_MECHANISM = 33, 408 /** Illegal SASL state */ 409 ERR_ILLEGAL_SASL_STATE = 34, 410 /** Unuspported version */ 411 ERR_UNSUPPORTED_VERSION = 35, 412 /** Topic already exists */ 413 ERR_TOPIC_ALREADY_EXISTS = 36, 414 /** Invalid number of partitions */ 415 ERR_INVALID_PARTITIONS = 37, 416 /** Invalid replication factor */ 417 ERR_INVALID_REPLICATION_FACTOR = 38, 418 /** Invalid replica assignment */ 419 ERR_INVALID_REPLICA_ASSIGNMENT = 39, 420 /** Invalid config */ 421 ERR_INVALID_CONFIG = 40, 422 /** Not controller for cluster */ 423 
ERR_NOT_CONTROLLER = 41, 424 /** Invalid request */ 425 ERR_INVALID_REQUEST = 42, 426 /** Message format on broker does not support request */ 427 ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43, 428 /** Policy violation */ 429 ERR_POLICY_VIOLATION = 44, 430 /** Broker received an out of order sequence number */ 431 ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45, 432 /** Broker received a duplicate sequence number */ 433 ERR_DUPLICATE_SEQUENCE_NUMBER = 46, 434 /** Producer attempted an operation with an old epoch */ 435 ERR_INVALID_PRODUCER_EPOCH = 47, 436 /** Producer attempted a transactional operation in an invalid state */ 437 ERR_INVALID_TXN_STATE = 48, 438 /** Producer attempted to use a producer id which is not 439 * currently assigned to its transactional id */ 440 ERR_INVALID_PRODUCER_ID_MAPPING = 49, 441 /** Transaction timeout is larger than the maximum 442 * value allowed by the broker's max.transaction.timeout.ms */ 443 ERR_INVALID_TRANSACTION_TIMEOUT = 50, 444 /** Producer attempted to update a transaction while another 445 * concurrent operation on the same transaction was ongoing */ 446 ERR_CONCURRENT_TRANSACTIONS = 51, 447 /** Indicates that the transaction coordinator sending a 448 * WriteTxnMarker is no longer the current coordinator for a 449 * given producer */ 450 ERR_TRANSACTION_COORDINATOR_FENCED = 52, 451 /** Transactional Id authorization failed */ 452 ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53, 453 /** Security features are disabled */ 454 ERR_SECURITY_DISABLED = 54, 455 /** Operation not attempted */ 456 ERR_OPERATION_NOT_ATTEMPTED = 55, 457 /** Disk error when trying to access log file on the disk */ 458 ERR_KAFKA_STORAGE_ERROR = 56, 459 /** The user-specified log directory is not found in the broker config */ 460 ERR_LOG_DIR_NOT_FOUND = 57, 461 /** SASL Authentication failed */ 462 ERR_SASL_AUTHENTICATION_FAILED = 58, 463 /** Unknown Producer Id */ 464 ERR_UNKNOWN_PRODUCER_ID = 59, 465 /** Partition reassignment is in progress */ 466 
ERR_REASSIGNMENT_IN_PROGRESS = 60, 467 /** Delegation Token feature is not enabled */ 468 ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61, 469 /** Delegation Token is not found on server */ 470 ERR_DELEGATION_TOKEN_NOT_FOUND = 62, 471 /** Specified Principal is not valid Owner/Renewer */ 472 ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63, 473 /** Delegation Token requests are not allowed on this connection */ 474 ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64, 475 /** Delegation Token authorization failed */ 476 ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65, 477 /** Delegation Token is expired */ 478 ERR_DELEGATION_TOKEN_EXPIRED = 66, 479 /** Supplied principalType is not supported */ 480 ERR_INVALID_PRINCIPAL_TYPE = 67, 481 /** The group is not empty */ 482 ERR_NON_EMPTY_GROUP = 68, 483 /** The group id does not exist */ 484 ERR_GROUP_ID_NOT_FOUND = 69, 485 /** The fetch session ID was not found */ 486 ERR_FETCH_SESSION_ID_NOT_FOUND = 70, 487 /** The fetch session epoch is invalid */ 488 ERR_INVALID_FETCH_SESSION_EPOCH = 71, 489 /** No matching listener */ 490 ERR_LISTENER_NOT_FOUND = 72, 491 /** Topic deletion is disabled */ 492 ERR_TOPIC_DELETION_DISABLED = 73, 493 /** Leader epoch is older than broker epoch */ 494 ERR_FENCED_LEADER_EPOCH = 74, 495 /** Leader epoch is newer than broker epoch */ 496 ERR_UNKNOWN_LEADER_EPOCH = 75, 497 /** Unsupported compression type */ 498 ERR_UNSUPPORTED_COMPRESSION_TYPE = 76, 499 /** Broker epoch has changed */ 500 ERR_STALE_BROKER_EPOCH = 77, 501 /** Leader high watermark is not caught up */ 502 ERR_OFFSET_NOT_AVAILABLE = 78, 503 /** Group member needs a valid member ID */ 504 ERR_MEMBER_ID_REQUIRED = 79, 505 /** Preferred leader was not available */ 506 ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80, 507 /** Consumer group has reached maximum size */ 508 ERR_GROUP_MAX_SIZE_REACHED = 81, 509 /** Static consumer fenced by other consumer with same 510 * group.instance.id. 
*/ 511 ERR_FENCED_INSTANCE_ID = 82, 512 /** Eligible partition leaders are not available */ 513 ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE = 83, 514 /** Leader election not needed for topic partition */ 515 ERR_ELECTION_NOT_NEEDED = 84, 516 /** No partition reassignment is in progress */ 517 ERR_NO_REASSIGNMENT_IN_PROGRESS = 85, 518 /** Deleting offsets of a topic while the consumer group is 519 * subscribed to it */ 520 ERR_GROUP_SUBSCRIBED_TO_TOPIC = 86, 521 /** Broker failed to validate record */ 522 ERR_INVALID_RECORD = 87, 523 /** There are unstable offsets that need to be cleared */ 524 ERR_UNSTABLE_OFFSET_COMMIT = 88, 525 /** Throttling quota has been exceeded */ 526 ERR_THROTTLING_QUOTA_EXCEEDED = 89, 527 /** There is a newer producer with the same transactionalId 528 * which fences the current one */ 529 ERR_PRODUCER_FENCED = 90, 530 /** Request illegally referred to resource that does not exist */ 531 ERR_RESOURCE_NOT_FOUND = 91, 532 /** Request illegally referred to the same resource twice */ 533 ERR_DUPLICATE_RESOURCE = 92, 534 /** Requested credential would not meet criteria for acceptability */ 535 ERR_UNACCEPTABLE_CREDENTIAL = 93, 536 /** Indicates that the either the sender or recipient of a 537 * voter-only request is not one of the expected voters */ 538 ERR_INCONSISTENT_VOTER_SET = 94, 539 /** Invalid update version */ 540 ERR_INVALID_UPDATE_VERSION = 95, 541 /** Unable to update finalized features due to server error */ 542 ERR_FEATURE_UPDATE_FAILED = 96, 543 /** Request principal deserialization failed during forwarding */ 544 ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97 545 }; 546 547 548 /** 549 * @brief Returns a human readable representation of a kafka error. 
550 */ 551 RD_EXPORT 552 std::string err2str(RdKafka::ErrorCode err); 553 554 555 556 /** 557 * @enum CertificateType 558 * @brief SSL certificate types 559 */ 560 enum CertificateType { 561 CERT_PUBLIC_KEY, /**< Client's public key */ 562 CERT_PRIVATE_KEY, /**< Client's private key */ 563 CERT_CA, /**< CA certificate */ 564 CERT__CNT 565 }; 566 567 /** 568 * @enum CertificateEncoding 569 * @brief SSL certificate encoding 570 */ 571 enum CertificateEncoding { 572 CERT_ENC_PKCS12, /**< PKCS#12 */ 573 CERT_ENC_DER, /**< DER / binary X.509 ASN1 */ 574 CERT_ENC_PEM, /**< PEM */ 575 CERT_ENC__CNT 576 }; 577 578 /**@} */ 579 580 581 582 /**@cond NO_DOC*/ 583 /* Forward declarations */ 584 class Handle; 585 class Producer; 586 class Message; 587 class Headers; 588 class Queue; 589 class Event; 590 class Topic; 591 class TopicPartition; 592 class Metadata; 593 class KafkaConsumer; 594 /**@endcond*/ 595 596 597 /** 598 * @name Error class 599 * @{ 600 * 601 */ 602 603 /** 604 * @brief The Error class is used as a return value from APIs to propagate 605 * an error. The error consists of an error code which is to be used 606 * programatically, an error string for showing to the user, 607 * and various error flags that can be used programmatically to decide 608 * how to handle the error; e.g., should the operation be retried, 609 * was it a fatal error, etc. 610 * 611 * Error objects must be deleted explicitly to free its resources. 612 */ 613 class RD_EXPORT Error { 614 public: 615 616 /** 617 * @brief Create error object. 618 */ 619 static Error *create (ErrorCode code, const std::string *errstr); 620 621 virtual ~Error () { } 622 623 /* 624 * Error accessor methods 625 */ 626 627 /** 628 * @returns the error code, e.g., RdKafka::ERR_UNKNOWN_MEMBER_ID. 629 */ 630 virtual ErrorCode code () const = 0; 631 632 /** 633 * @returns the error code name, e.g, "ERR_UNKNOWN_MEMBER_ID". 
634 */ 635 virtual std::string name () const = 0; 636 637 /** 638 * @returns a human readable error string. 639 */ 640 virtual std::string str () const = 0; 641 642 /** 643 * @returns true if the error is a fatal error, indicating that the client 644 * instance is no longer usable, else false. 645 */ 646 virtual bool is_fatal () const = 0; 647 648 /** 649 * @returns true if the operation may be retried, else false. 650 */ 651 virtual bool is_retriable () const = 0; 652 653 /** 654 * @returns true if the error is an abortable transaction error in which case 655 * the application must call RdKafka::Producer::abort_transaction() 656 * and start a new transaction with 657 * RdKafka::Producer::begin_transaction() if it wishes to proceed 658 * with transactions. 659 * Else returns false. 660 * 661 * @remark The return value of this method is only valid for errors returned 662 * by the transactional API. 663 */ 664 virtual bool txn_requires_abort () const = 0; 665 }; 666 667 /**@}*/ 668 669 670 /** 671 * @name Callback classes 672 * @{ 673 * 674 * 675 * librdkafka uses (optional) callbacks to propagate information and 676 * delegate decisions to the application logic. 677 * 678 * An application must call RdKafka::poll() at regular intervals to 679 * serve queued callbacks. 680 */ 681 682 683 /** 684 * @brief Delivery Report callback class 685 * 686 * The delivery report callback will be called once for each message 687 * accepted by RdKafka::Producer::produce() (et.al) with 688 * RdKafka::Message::err() set to indicate the result of the produce request. 689 * 690 * The callback is called when a message is succesfully produced or 691 * if librdkafka encountered a permanent failure, or the retry counter for 692 * temporary errors has been exhausted. 693 * 694 * An application must call RdKafka::poll() at regular intervals to 695 * serve queued delivery report callbacks. 
696 697 */ 698 class RD_EXPORT DeliveryReportCb { 699 public: 700 /** 701 * @brief Delivery report callback. 702 */ 703 virtual void dr_cb (Message &message) = 0; 704 705 virtual ~DeliveryReportCb() { } 706 }; 707 708 709 /** 710 * @brief SASL/OAUTHBEARER token refresh callback class 711 * 712 * The SASL/OAUTHBEARER token refresh callback is triggered via RdKafka::poll() 713 * whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved, 714 * typically based on the configuration defined in \c sasl.oauthbearer.config. 715 * 716 * The \c oauthbearer_config argument is the value of the 717 * \c sasl.oauthbearer.config configuration property. 718 * 719 * The callback should invoke RdKafka::Handle::oauthbearer_set_token() or 720 * RdKafka::Handle::oauthbearer_set_token_failure() to indicate success or 721 * failure, respectively. 722 * 723 * The refresh operation is eventable and may be received when an event 724 * callback handler is set with an event type of 725 * \c RdKafka::Event::EVENT_OAUTHBEARER_TOKEN_REFRESH. 726 * 727 * Note that before any SASL/OAUTHBEARER broker connection can succeed the 728 * application must call RdKafka::Handle::oauthbearer_set_token() once -- either 729 * directly or, more typically, by invoking RdKafka::poll() -- in order to 730 * cause retrieval of an initial token to occur. 731 * 732 * An application must call RdKafka::poll() at regular intervals to 733 * serve queued SASL/OAUTHBEARER token refresh callbacks (when 734 * OAUTHBEARER is the SASL mechanism). 735 */ 736 class RD_EXPORT OAuthBearerTokenRefreshCb { 737 public: 738 /** 739 * @brief SASL/OAUTHBEARER token refresh callback class. 740 * 741 * @param handle The RdKafka::Handle which requires a refreshed token. 742 * @param oauthbearer_config The value of the 743 * \p sasl.oauthbearer.config configuration property for \p handle. 
744 */ 745 virtual void oauthbearer_token_refresh_cb (RdKafka::Handle* handle, 746 const std::string &oauthbearer_config) = 0; 747 748 virtual ~OAuthBearerTokenRefreshCb() { } 749 }; 750 751 752 /** 753 * @brief Partitioner callback class 754 * 755 * Generic partitioner callback class for implementing custom partitioners. 756 * 757 * @sa RdKafka::Conf::set() \c "partitioner_cb" 758 */ 759 class RD_EXPORT PartitionerCb { 760 public: 761 /** 762 * @brief Partitioner callback 763 * 764 * Return the partition to use for \p key in \p topic. 765 * 766 * The \p msg_opaque is the same \p msg_opaque provided in the 767 * RdKafka::Producer::produce() call. 768 * 769 * @remark \p key may be NULL or the empty. 770 * 771 * @returns Must return a value between 0 and \p partition_cnt (non-inclusive). 772 * May return RD_KAFKA_PARTITION_UA (-1) if partitioning failed. 773 * 774 * @sa The callback may use RdKafka::Topic::partition_available() to check 775 * if a partition has an active leader broker. 776 */ 777 virtual int32_t partitioner_cb (const Topic *topic, 778 const std::string *key, 779 int32_t partition_cnt, 780 void *msg_opaque) = 0; 781 782 virtual ~PartitionerCb() { } 783 }; 784 785 /** 786 * @brief Variant partitioner with key pointer 787 * 788 */ 789 class PartitionerKeyPointerCb { 790 public: 791 /** 792 * @brief Variant partitioner callback that gets \p key as pointer and length 793 * instead of as a const std::string *. 794 * 795 * @remark \p key may be NULL or have \p key_len 0. 796 * 797 * @sa See RdKafka::PartitionerCb::partitioner_cb() for exact semantics 798 */ 799 virtual int32_t partitioner_cb (const Topic *topic, 800 const void *key, 801 size_t key_len, 802 int32_t partition_cnt, 803 void *msg_opaque) = 0; 804 805 virtual ~PartitionerKeyPointerCb() { } 806 }; 807 808 809 810 /** 811 * @brief Event callback class 812 * 813 * Events are a generic interface for propagating errors, statistics, logs, etc 814 * from librdkafka to the application. 
815 * 816 * @sa RdKafka::Event 817 */ 818 class RD_EXPORT EventCb { 819 public: 820 /** 821 * @brief Event callback 822 * 823 * @sa RdKafka::Event 824 */ 825 virtual void event_cb (Event &event) = 0; 826 827 virtual ~EventCb() { } 828 }; 829 830 831 /** 832 * @brief Event object class as passed to the EventCb callback. 833 */ 834 class RD_EXPORT Event { 835 public: 836 /** @brief Event type */ 837 enum Type { 838 EVENT_ERROR, /**< Event is an error condition */ 839 EVENT_STATS, /**< Event is a statistics JSON document */ 840 EVENT_LOG, /**< Event is a log message */ 841 EVENT_THROTTLE /**< Event is a throttle level signaling from the broker */ 842 }; 843 844 /** @brief EVENT_LOG severities (conforms to syslog(3) severities) */ 845 enum Severity { 846 EVENT_SEVERITY_EMERG = 0, 847 EVENT_SEVERITY_ALERT = 1, 848 EVENT_SEVERITY_CRITICAL = 2, 849 EVENT_SEVERITY_ERROR = 3, 850 EVENT_SEVERITY_WARNING = 4, 851 EVENT_SEVERITY_NOTICE = 5, 852 EVENT_SEVERITY_INFO = 6, 853 EVENT_SEVERITY_DEBUG = 7 854 }; 855 856 virtual ~Event () { } 857 858 /* 859 * Event Accessor methods 860 */ 861 862 /** 863 * @returns The event type 864 * @remark Applies to all event types 865 */ 866 virtual Type type () const = 0; 867 868 /** 869 * @returns Event error, if any. 870 * @remark Applies to all event types except THROTTLE 871 */ 872 virtual ErrorCode err () const = 0; 873 874 /** 875 * @returns Log severity level. 876 * @remark Applies to LOG event type. 877 */ 878 virtual Severity severity () const = 0; 879 880 /** 881 * @returns Log facility string. 882 * @remark Applies to LOG event type. 883 */ 884 virtual std::string fac () const = 0; 885 886 /** 887 * @returns Log message string. 888 * 889 * \c EVENT_LOG: Log message string. 890 * \c EVENT_STATS: JSON object (as string). 891 * 892 * @remark Applies to LOG event type. 893 */ 894 virtual std::string str () const = 0; 895 896 /** 897 * @returns Throttle time in milliseconds. 898 * @remark Applies to THROTTLE event type. 
899 */ 900 virtual int throttle_time () const = 0; 901 902 /** 903 * @returns Throttling broker's name. 904 * @remark Applies to THROTTLE event type. 905 */ 906 virtual std::string broker_name () const = 0; 907 908 /** 909 * @returns Throttling broker's id. 910 * @remark Applies to THROTTLE event type. 911 */ 912 virtual int broker_id () const = 0; 913 914 915 /** 916 * @returns true if this is a fatal error. 917 * @remark Applies to ERROR event type. 918 * @sa RdKafka::Handle::fatal_error() 919 */ 920 virtual bool fatal () const = 0; 921 }; 922 923 924 925 /** 926 * @brief Consume callback class 927 */ 928 class RD_EXPORT ConsumeCb { 929 public: 930 /** 931 * @brief The consume callback is used with 932 * RdKafka::Consumer::consume_callback() 933 * methods and will be called for each consumed \p message. 934 * 935 * The callback interface is optional but provides increased performance. 936 */ 937 virtual void consume_cb (Message &message, void *opaque) = 0; 938 939 virtual ~ConsumeCb() { } 940 }; 941 942 943 /** 944 * @brief \b KafkaConsumer: Rebalance callback class 945 */ 946 class RD_EXPORT RebalanceCb { 947 public: 948 /** 949 * @brief Group rebalance callback for use with RdKafka::KafkaConsumer 950 * 951 * Registering a \p rebalance_cb turns off librdkafka's automatic 952 * partition assignment/revocation and instead delegates that responsibility 953 * to the application's \p rebalance_cb. 954 * 955 * The rebalance callback is responsible for updating librdkafka's 956 * assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS 957 * and RdKafka::ERR__REVOKE_PARTITIONS but should also be able to handle 958 * arbitrary rebalancing failures where \p err is neither of those. 959 * @remark In this latter case (arbitrary error), the application must 960 * call unassign() to synchronize state. 
   *
   * For eager/non-cooperative `partition.assignment.strategy` assignors,
   * such as `range` and `roundrobin`, the application must use
   * assign() to set and unassign() to clear the entire assignment.
   * For the cooperative assignors, such as `cooperative-sticky`, the
   * application must use incremental_assign() for ERR__ASSIGN_PARTITIONS and
   * incremental_unassign() for ERR__REVOKE_PARTITIONS.
   *
   * Without a rebalance callback this is done automatically by librdkafka
   * but registering a rebalance callback gives the application flexibility
   * in performing other operations along with the assigning/revocation,
   * such as fetching offsets from an alternate location (on assign)
   * or manually committing offsets (on revoke).
   *
   * @sa RdKafka::KafkaConsumer::assign()
   * @sa RdKafka::KafkaConsumer::incremental_assign()
   * @sa RdKafka::KafkaConsumer::incremental_unassign()
   * @sa RdKafka::KafkaConsumer::assignment_lost()
   * @sa RdKafka::KafkaConsumer::rebalance_protocol()
   *
   * The following example shows the application's responsibilities:
   * @code
   *    class MyRebalanceCb : public RdKafka::RebalanceCb {
   *     public:
   *      void rebalance_cb (RdKafka::KafkaConsumer *consumer,
   *                         RdKafka::ErrorCode err,
   *                         std::vector<RdKafka::TopicPartition*> &partitions) {
   *         if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
   *           // application may load offsets from arbitrary external
   *           // storage here and update \p partitions
   *           if (consumer->rebalance_protocol() == "COOPERATIVE")
   *             consumer->incremental_assign(partitions);
   *           else
   *             consumer->assign(partitions);
   *
   *         } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
   *           // Application may commit offsets manually here
   *           // if auto.commit.enable=false
   *           if (consumer->rebalance_protocol() == "COOPERATIVE")
   *             consumer->incremental_unassign(partitions);
   *           else
   *             consumer->unassign();
   *
   *         } else {
   *           std::cerr << "Rebalancing error: " <<
   *                        RdKafka::err2str(err) << std::endl;
   *           consumer->unassign();
   *         }
   *     }
   *  }
   * @endcode
   *
   * @remark The above example lacks error handling for assign calls, see
   *         the examples/ directory.
   */
  virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                             RdKafka::ErrorCode err,
                             std::vector<TopicPartition*>&partitions) = 0;

  virtual ~RebalanceCb() { }
};


/**
 * @brief Offset Commit callback class
 */
class RD_EXPORT OffsetCommitCb {
public:
  /**
   * @brief Set offset commit callback for use with consumer groups
   *
   * The results of automatic or manual offset commits will be scheduled
   * for this callback and is served by RdKafka::KafkaConsumer::consume().
   *
   * If no partitions had valid offsets to commit this callback will be called
   * with \p err == ERR__NO_OFFSET which is not to be considered an error.
   *
   * The \p offsets list contains per-partition information:
   *   - \c topic      The topic committed
   *   - \c partition  The partition committed
   *   - \c offset:    Committed offset (attempted)
   *   - \c err:       Commit error
   */
  virtual void offset_commit_cb(RdKafka::ErrorCode err,
                                std::vector<TopicPartition*>&offsets) = 0;

  virtual ~OffsetCommitCb() { }
};



/**
 * @brief SSL broker certificate verification class.
 *
 * @remark Class instance must outlive the RdKafka client instance.
 */
class RD_EXPORT SslCertificateVerifyCb {
public:
  /**
   * @brief SSL broker certificate verification callback.
   *
   * The verification callback is triggered from internal librdkafka threads
   * upon connecting to a broker.
   * On each connection attempt the callback
   * will be called for each certificate in the broker's certificate chain,
   * starting at the root certificate, as long as the application callback
   * returns 1 (valid certificate).
   *
   * \p broker_name and \p broker_id correspond to the broker the connection
   * is being made to.
   * The \c x509_error argument indicates if OpenSSL's verification of
   * the certificate succeeded (0) or failed (an OpenSSL error code).
   * The application may set the SSL context error code by returning 0
   * from the verify callback and providing a non-zero SSL context error code
   * in \p x509_error.
   * If the verify callback sets \p x509_error to 0, returns 1, and the
   * original \p x509_error was non-zero, the error on the SSL context will
   * be cleared.
   * \p x509_error is always a valid pointer to an int.
   *
   * \p depth is the depth of the current certificate in the chain, starting
   * at the root certificate.
   *
   * The certificate itself is passed in binary DER format in \p buf of
   * size \p size.
   *
   * The callback must return 1 if verification succeeds, or 0 if
   * verification fails and write a human-readable error message
   * to \p errstr.
   *
   * @warning This callback will be called from internal librdkafka threads.
   *
   * @remark See <openssl/x509_vfy.h> in the OpenSSL source distribution
   *         for a list of \p x509_error codes.
   */
  virtual bool ssl_cert_verify_cb (const std::string &broker_name,
                                   int32_t broker_id,
                                   int *x509_error,
                                   int depth,
                                   const char *buf, size_t size,
                                   std::string &errstr) = 0;

  virtual ~SslCertificateVerifyCb() {}
};


/**
 * @brief \b Portability: SocketCb callback class
 *
 */
class RD_EXPORT SocketCb {
public:
  /**
   * @brief Socket callback
   *
   * The socket callback is responsible for opening a socket
   * according to the supplied \p domain, \p type and \p protocol.
   * The socket shall be created with \c CLOEXEC set in a racefree fashion, if
   * possible.
   *
   * It is typically not required to register an alternative socket
   * implementation.
   *
   * @returns The socket file descriptor or -1 on error (\c errno must be set)
   */
  virtual int socket_cb (int domain, int type, int protocol) = 0;

  virtual ~SocketCb() { }
};


/**
 * @brief \b Portability: OpenCb callback class
 *
 */
class RD_EXPORT OpenCb {
public:
  /**
   * @brief Open callback
   * The open callback is responsible for opening the file specified by
   * \p path, using \p flags and \p mode.
   * The file shall be opened with \c CLOEXEC set in a racefree fashion, if
   * possible.
   *
   * It is typically not required to register an alternative open
   * implementation.
   *
   * @remark Not currently available on native Win32
   */
  virtual int open_cb (const std::string &path, int flags, int mode) = 0;

  virtual ~OpenCb() { }
};


/**@}*/




/**
 * @name Configuration interface
 * @{
 *
 */

/**
 * @brief Configuration interface
 *
 * Holds either global or topic configuration that are passed to
 * RdKafka::Consumer::create(), RdKafka::Producer::create(),
 * RdKafka::KafkaConsumer::create(), etc.
 *
 * @sa CONFIGURATION.md for the full list of supported properties.
 */
class RD_EXPORT Conf {
public:
  /**
   * @brief Configuration object type
   */
  enum ConfType {
    CONF_GLOBAL, /**< Global configuration */
    CONF_TOPIC   /**< Topic specific configuration */
  };

  /**
   * @brief RdKafka::Conf::Set() result code
   */
  enum ConfResult {
    CONF_UNKNOWN = -2, /**< Unknown configuration property */
    CONF_INVALID = -1, /**< Invalid configuration value */
    CONF_OK = 0        /**< Configuration property was successfully set */
  };


  /**
   * @brief Create configuration object
   */
  static Conf *create (ConfType type);

  virtual ~Conf () { }

  /**
   * @brief Set configuration property \p name to value \p value.
   *
   * Fallthrough:
   * Topic-level configuration properties may be set using this interface
   * in which case they are applied on the \c default_topic_conf.
   * If no \c default_topic_conf has been set one will be created.
   * Any subsequent set("default_topic_conf", ..) calls will
   * replace the current default topic configuration.
   *
   * @returns CONF_OK on success, else writes a human readable error
   *          description to \p errstr on error.
   */
  virtual Conf::ConfResult set (const std::string &name,
                                const std::string &value,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"dr_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                DeliveryReportCb *dr_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"oauthbearer_token_refresh_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"event_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                EventCb *event_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"default_topic_conf\"
   *
   * Sets the default topic configuration to use for automatically
   * subscribed topics.
   *
   * @sa RdKafka::KafkaConsumer::subscribe()
   */
  virtual Conf::ConfResult set (const std::string &name,
                                const Conf *topic_conf,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"partitioner_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                PartitionerCb *partitioner_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"partitioner_key_pointer_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                PartitionerKeyPointerCb *partitioner_kp_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"socket_cb\" */
  virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"open_cb\" */
  virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"rebalance_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                RebalanceCb *rebalance_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"offset_commit_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                OffsetCommitCb *offset_commit_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"ssl_cert_verify_cb\".
   *  @returns CONF_OK on success or CONF_INVALID if SSL is
   *           not supported in this build.
   */
  virtual Conf::ConfResult set(const std::string &name,
                               SslCertificateVerifyCb *ssl_cert_verify_cb,
                               std::string &errstr) = 0;

  /**
   * @brief Set certificate/key \p cert_type from the \p cert_enc encoded
   *        memory at \p buffer of \p size bytes.
   *
   * @param cert_type Certificate or key type to configure.
   * @param cert_enc  Buffer \p encoding type.
   * @param buffer Memory pointer to encoded certificate or key.
   *               The memory is not referenced after this function returns.
   * @param size Size of memory at \p buffer.
   * @param errstr A human-readable error string will be written to this string
   *               on failure.
   *
   * @returns CONF_OK on success or CONF_INVALID if the memory in
   *          \p buffer is of incorrect encoding, or if librdkafka
   *          was not built with SSL support.
   *
   * @remark Calling this method multiple times with the same \p cert_type
   *         will replace the previous value.
   *
   * @remark Calling this method with \p buffer set to NULL will clear the
   *         configuration for \p cert_type.
   *
   * @remark The private key may require a password, which must be specified
   *         with the `ssl.key.password` configuration property prior to
   *         calling this function.
   *
   * @remark Private and public keys in PEM format may also be set with the
   *         `ssl.key.pem` and `ssl.certificate.pem` configuration properties.
   */
  virtual Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type,
                                         RdKafka::CertificateEncoding cert_enc,
                                         const void *buffer, size_t size,
                                         std::string &errstr) = 0;

  /** @brief Query single configuration value
   *
   * Do not use this method to get callbacks registered by the configuration file.
   * Instead use the specific get() methods with the specific callback parameter in the signature.
   *
   * Fallthrough:
   * Topic-level configuration properties from the \c default_topic_conf
   * may be retrieved using this interface.
   *
   * @returns CONF_OK if the property was previously set and
   *          returns the value in \p value. */
  virtual Conf::ConfResult get(const std::string &name,
                               std::string &value) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p dr_cb. */
  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p oauthbearer_token_refresh_cb. */
  virtual Conf::ConfResult get(
    OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p event_cb. */
  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p partitioner_cb. */
  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p partitioner_kp_cb. */
  virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p socket_cb. */
  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p open_cb. */
  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p rebalance_cb. */
  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p offset_commit_cb. */
  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;

  /** @brief Use with \p name = \c \"ssl_cert_verify_cb\" */
  virtual Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;

  /** @brief Dump configuration names and values to list containing
   *         name,value tuples */
  virtual std::list<std::string> *dump () = 0;

  /** @brief Use with \p name = \c \"consume_cb\" */
  virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
                                std::string &errstr) = 0;

  /**
   * @brief Returns the underlying librdkafka C rd_kafka_conf_t handle.
   *
   * @warning Calling the C API on this handle is not recommended and there
   *          is no official support for it, but for cases where the C++
   *          does not provide the proper functionality this C handle can be
   *          used to interact directly with the core librdkafka API.
   *
   * @remark The lifetime of the returned pointer is the same as the Conf
   *         object this method is called on.
   *
   * @remark Include <rdkafka/rdkafka.h> prior to including
   *         <rdkafka/rdkafkacpp.h>
   *
   * @returns \c rd_kafka_conf_t* if this is a CONF_GLOBAL object, else NULL.
   */
  virtual struct rd_kafka_conf_s *c_ptr_global () = 0;

  /**
   * @brief Returns the underlying librdkafka C rd_kafka_topic_conf_t handle.
   *
   * @warning Calling the C API on this handle is not recommended and there
   *          is no official support for it, but for cases where the C++
   *          does not provide the proper functionality this C handle can be
   *          used to interact directly with the core librdkafka API.
   *
   * @remark The lifetime of the returned pointer is the same as the Conf
   *         object this method is called on.
   *
   * @remark Include <rdkafka/rdkafka.h> prior to including
   *         <rdkafka/rdkafkacpp.h>
   *
   * @returns \c rd_kafka_topic_conf_t* if this is a CONF_TOPIC object,
   *          else NULL.
   */
  virtual struct rd_kafka_topic_conf_s *c_ptr_topic () = 0;

  /**
   * @brief Set callback_data for ssl engine.
   *
   * @remark The \c ssl.engine.location configuration must be set for this
   *         to have effect.
   *
   * @remark The memory pointed to by \p value must remain valid for the
   *         lifetime of the configuration object and any Kafka clients that
   *         use it.
   *
   * @returns CONF_OK on success, else CONF_INVALID.
   */
  virtual Conf::ConfResult set_engine_callback_data (void *value,
                                                     std::string &errstr) = 0;
};

/**@}*/


/**
 * @name Kafka base client handle
 * @{
 *
 */

/**
 * @brief Base handle, super class for specific clients.
 */
class RD_EXPORT Handle {
public:
  virtual ~Handle() { }

  /** @returns the name of the handle */
  virtual const std::string name () const = 0;

  /**
   * @brief Returns the client's broker-assigned group member id
   *
   * @remark This currently requires the high-level KafkaConsumer
   *
   * @returns Last assigned member id, or empty string if not currently
   *          a group member.
   */
  virtual const std::string memberid () const = 0;


  /**
   * @brief Polls the provided kafka handle for events.
   *
   * Events will trigger application provided callbacks to be called.
   *
   * The \p timeout_ms argument specifies the maximum amount of time
   * (in milliseconds) that the call will block waiting for events.
   * For non-blocking calls, provide 0 as \p timeout_ms.
   * To wait indefinitely for events, provide -1.
   *
   * Events:
   *   - delivery report callbacks (if an RdKafka::DeliveryCb is configured) [producer]
   *   - event callbacks (if an RdKafka::EventCb is configured) [producer & consumer]
   *
   * @remark An application should make sure to call poll() at regular
   *         intervals to serve any queued callbacks waiting to be called.
   *
   * @warning This method MUST NOT be used with the RdKafka::KafkaConsumer,
   *          use its RdKafka::KafkaConsumer::consume() instead.
   *
   * @returns the number of events served.
   */
  virtual int poll (int timeout_ms) = 0;

  /**
   * @brief Returns the current out queue length
   *
   * The out queue contains messages and requests waiting to be sent to,
   * or acknowledged by, the broker.
   */
  virtual int outq_len () = 0;

  /**
   * @brief Request Metadata from broker.
   *
   * Parameters:
   *  \p all_topics  - if non-zero: request info about all topics in cluster,
   *                   if zero: only request info about locally known topics.
   *  \p only_rkt    - only request info about this topic
   *  \p metadatap   - pointer to hold metadata result.
   *                   The \p *metadatap pointer must be released with \c delete.
   *  \p timeout_ms  - maximum response time before failing.
   *
   * @returns RdKafka::ERR_NO_ERROR on success (in which case \p *metadatap
   *          will be set), else RdKafka::ERR__TIMED_OUT on timeout or
   *          other error code on error.
   */
  virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
                              Metadata **metadatap, int timeout_ms) = 0;


  /**
   * @brief Pause producing or consumption for the provided list of partitions.
   *
   * Success or error is returned per-partition in the \p partitions list.
   *
   * @returns ErrorCode::NO_ERROR
   *
   * @sa resume()
   */
  virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;


  /**
   * @brief Resume producing or consumption for the provided list of partitions.
   *
   * Success or error is returned per-partition in the \p partitions list.
   *
   * @returns ErrorCode::NO_ERROR
   *
   * @sa pause()
   */
  virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;


  /**
   * @brief Query broker for low (oldest/beginning)
   *        and high (newest/end) offsets for partition.
   *
   * Offsets are returned in \p *low and \p *high respectively.
   *
   * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure.
   */
  virtual ErrorCode query_watermark_offsets (const std::string &topic,
                                             int32_t partition,
                                             int64_t *low, int64_t *high,
                                             int timeout_ms) = 0;

  /**
   * @brief Get last known low (oldest/beginning)
   *        and high (newest/end) offsets for partition.
   *
   * The low offset is updated periodically (if statistics.interval.ms is set)
   * while the high offset is updated on each fetched message set from the
   * broker.
   *
   * If there is no cached offset (either low or high, or both) then
   * OFFSET_INVALID will be returned for the respective offset.
   *
   * Offsets are returned in \p *low and \p *high respectively.
   *
   * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure.
   *
   * @remark Shall only be used with an active consumer instance.
   */
  virtual ErrorCode get_watermark_offsets (const std::string &topic,
                                           int32_t partition,
                                           int64_t *low, int64_t *high) = 0;


  /**
   * @brief Look up the offsets for the given partitions by timestamp.
   *
   * The returned offset for each partition is the earliest offset whose
   * timestamp is greater than or equal to the given timestamp in the
   * corresponding partition.
   *
   * The timestamps to query are represented as \c offset in \p offsets
   * on input, and \c offset() will return the closest earlier offset
   * for the timestamp on output.
   *
   * Timestamps are expressed as milliseconds since epoch (UTC).
   *
   * The function will block for at most \p timeout_ms milliseconds.
   *
   * @remark Duplicate Topic+Partitions are not supported.
   * @remark Errors are also returned per TopicPartition, see \c err()
   *
   * @returns an error code for general errors, else RdKafka::ERR_NO_ERROR
   *          in which case per-partition errors might be set.
   */
  virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
                                     int timeout_ms) = 0;


  /**
   * @brief Retrieve queue for a given partition.
   *
   * @returns The fetch queue for the given partition if successful. Else,
   *          NULL is returned.
   *
   * @remark This function only works on consumers.
   */
  virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;

  /**
   * @brief Forward librdkafka logs (and debug) to the specified queue
   *        for serving with one of the ..poll() calls.
   *
   * This allows an application to serve log callbacks (\c log_cb)
   * in its thread of choice.
   *
   * @param queue Queue to forward logs to. If the value is NULL the logs
   *        are forwarded to the main queue.
   *
   * @remark The configuration property \c log.queue MUST also be set to true.
   *
   * @remark librdkafka maintains its own reference to the provided queue.
   *
   * @returns ERR_NO_ERROR on success or an error code on error.
   */
  virtual ErrorCode set_log_queue (Queue *queue) = 0;

  /**
   * @brief Cancels the current callback dispatcher (Handle::poll(),
   *        KafkaConsumer::consume(), etc).
   *
   * A callback may use this to force an immediate return to the calling
   * code (caller of e.g. Handle::poll()) without processing any further
   * events.
   *
   * @remark This function MUST ONLY be called from within a
   *         librdkafka callback.
   */
  virtual void yield () = 0;

  /**
   * @brief Returns the ClusterId as reported in broker metadata.
   *
   * @param timeout_ms If there is no cached value from metadata retrieval
   *                   then this specifies the maximum amount of time
   *                   (in milliseconds) the call will block waiting
   *                   for metadata to be retrieved.
   *                   Use 0 for non-blocking calls.
   *
   * @remark Requires broker version >=0.10.0 and api.version.request=true.
   *
   * @returns Last cached ClusterId, or empty string if no ClusterId could be
   *          retrieved in the allotted timespan.
   */
  virtual const std::string clusterid (int timeout_ms) = 0;

  /**
   * @brief Returns the underlying librdkafka C rd_kafka_t handle.
   *
   * @warning Calling the C API on this handle is not recommended and there
   *          is no official support for it, but for cases where the C++
   *          does not provide the proper functionality this C handle can be
   *          used to interact directly with the core librdkafka API.
   *
   * @remark The lifetime of the returned pointer is the same as the Handle
   *         object this method is called on.
   *
   * @remark Include <rdkafka/rdkafka.h> prior to including
   *         <rdkafka/rdkafkacpp.h>
   *
   * @returns \c rd_kafka_t*
   */
  virtual struct rd_kafka_s *c_ptr () = 0;

  /**
   * @brief Returns the current ControllerId (controller broker id)
   *        as reported in broker metadata.
   *
   * @param timeout_ms If there is no cached value from metadata retrieval
   *                   then this specifies the maximum amount of time
   *                   (in milliseconds) the call will block waiting
   *                   for metadata to be retrieved.
   *                   Use 0 for non-blocking calls.
   *
   * @remark Requires broker version >=0.10.0 and api.version.request=true.
   *
   * @returns Last cached ControllerId, or -1 if no ControllerId could be
   *          retrieved in the allotted timespan.
   */
  virtual int32_t controllerid (int timeout_ms) = 0;


  /**
   * @brief Returns the first fatal error set on this client instance,
   *        or ERR_NO_ERROR if no fatal error has occurred.
   *
   * This function is to be used with the Idempotent Producer and
   * the Event class for \c EVENT_ERROR events to detect fatal errors.
   *
   * Generally all errors raised by the error event are to be considered
   * informational and temporary, the client will try to recover from all
   * errors in a graceful fashion (by retrying, etc).
   *
   * However, some errors should logically be considered fatal to retain
   * consistency; in particular a set of errors that may occur when using the
   * Idempotent Producer and the in-order or exactly-once producer guarantees
   * can't be satisfied.
   *
   * @param errstr A human readable error string if a fatal error was set.
   *
   * @returns ERR_NO_ERROR if no fatal error has been raised, else
   *          any other error code.
   */
  virtual ErrorCode fatal_error (std::string &errstr) const = 0;

  /**
   * @brief Set SASL/OAUTHBEARER token and metadata
   *
   * @param token_value the mandatory token value to set, often (but not
   *  necessarily) a JWS compact serialization as per
   *  https://tools.ietf.org/html/rfc7515#section-3.1.
   * @param md_lifetime_ms when the token expires, in terms of the number of
   *  milliseconds since the epoch.
   * @param md_principal_name the Kafka principal name associated with the
   *  token.
   * @param extensions potentially empty SASL extension keys and values where
   *  element [i] is the key and [i+1] is the key's value, to be communicated
   *  to the broker as additional key-value pairs during the initial client
   *  response as per https://tools.ietf.org/html/rfc7628#section-3.1. The
   *  number of SASL extension keys plus values must be a non-negative multiple
   *  of 2. Any provided keys and values are copied.
   * @param errstr A human readable error string is written here, only if
   *  there is an error.
   *
   * The SASL/OAUTHBEARER token refresh callback should invoke
   * this method upon success.
   * The extension keys must not include the reserved
   * key "`auth`", and all extension keys and values must conform to the
   * required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
   *
   *     key            = 1*(ALPHA)
   *     value          = *(VCHAR / SP / HTAB / CR / LF )
   *
   * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise \p errstr set
   *              and:<br>
   *          \c RdKafka::ERR__INVALID_ARG if any of the arguments are
   *              invalid;<br>
   *          \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not
   *              supported by this build;<br>
   *          \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is
   *              not configured as the client's authentication mechanism.<br>
   *
   * @sa RdKafka::oauthbearer_set_token_failure
   * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb"
   */
  virtual ErrorCode oauthbearer_set_token (const std::string &token_value,
                                           int64_t md_lifetime_ms,
                                           const std::string &md_principal_name,
                                           const std::list<std::string> &extensions,
                                           std::string &errstr) = 0;

  /**
   * @brief SASL/OAUTHBEARER token refresh failure indicator.
   *
   * @param errstr human readable error reason for failing to acquire a token.
   *
   * The SASL/OAUTHBEARER token refresh callback should
   * invoke this method upon failure to refresh the token.
   *
   * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise:<br>
   *          \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not
   *              supported by this build;<br>
   *          \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is
   *              not configured as the client's authentication mechanism.
   *
   * @sa RdKafka::oauthbearer_set_token
   * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb"
   */
  virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr) = 0;

  /**
   * @brief Allocate memory using the same allocator librdkafka uses.
   *
   * This is typically an abstraction for the malloc(3) call and makes sure
   * the application can use the same memory allocator as librdkafka for
   * allocating pointers that are used by librdkafka.
   *
   * @remark Memory allocated by mem_malloc() must be freed using
   *         mem_free().
   */
  virtual void *mem_malloc (size_t size) = 0;

  /**
   * @brief Free pointer returned by librdkafka
   *
   * This is typically an abstraction for the free(3) call and makes sure
   * the application can use the same memory allocator as librdkafka for
   * freeing pointers returned by librdkafka.
   *
   * In standard setups it is usually not necessary to use this interface
   * rather than the free(3) function.
   *
   * @remark mem_free() must only be used for pointers returned by APIs
   *         that explicitly mention using this function for freeing.
   */
  virtual void mem_free (void *ptr) = 0;
};


/**@}*/


/**
 * @name Topic and partition objects
 * @{
 *
 */

/**
 * @brief Topic+Partition
 *
 * This is a generic type to hold a single partition and various
 * information about it.
 *
 * Is typically used with std::vector<RdKafka::TopicPartition*> to provide
 * a list of partitions for different operations.
 */
class RD_EXPORT TopicPartition {
public:
  /**
   * @brief Create topic+partition object for \p topic and \p partition.
   *
   * Use \c delete to deconstruct.
   */
  static TopicPartition *create (const std::string &topic, int partition);

  /**
   * @brief Create topic+partition object for \p topic and \p partition
   *        with offset \p offset.
   *
   * Use \c delete to deconstruct.
1844 */ 1845 static TopicPartition *create (const std::string &topic, int partition, 1846 int64_t offset); 1847 1848 virtual ~TopicPartition() = 0; 1849 1850 /** 1851 * @brief Destroy/delete the TopicPartitions in \p partitions 1852 * and clear the vector. 1853 */ 1854 static void destroy (std::vector<TopicPartition*> &partitions); 1855 1856 /** @returns topic name */ 1857 virtual const std::string &topic () const = 0; 1858 1859 /** @returns partition id */ 1860 virtual int partition () const = 0; 1861 1862 /** @returns offset (if applicable) */ 1863 virtual int64_t offset () const = 0; 1864 1865 /** @brief Set offset */ 1866 virtual void set_offset (int64_t offset) = 0; 1867 1868 /** @returns error code (if applicable) */ 1869 virtual ErrorCode err () const = 0; 1870 }; 1871 1872 1873 1874 /** 1875 * @brief Topic handle 1876 * 1877 */ 1878 class RD_EXPORT Topic { 1879 public: 1880 /** 1881 * @brief Unassigned partition. 1882 * 1883 * The unassigned partition is used by the producer API for messages 1884 * that should be partitioned using the configured or default partitioner. 1885 */ 1886 static const int32_t PARTITION_UA; 1887 1888 /** @brief Special offsets */ 1889 static const int64_t OFFSET_BEGINNING; /**< Consume from beginning */ 1890 static const int64_t OFFSET_END; /**< Consume from end */ 1891 static const int64_t OFFSET_STORED; /**< Use offset storage */ 1892 static const int64_t OFFSET_INVALID; /**< Invalid offset */ 1893 1894 1895 /** 1896 * @brief Creates a new topic handle for topic named \p topic_str 1897 * 1898 * \p conf is an optional configuration for the topic that will be used 1899 * instead of the default topic configuration. 1900 * The \p conf object is reusable after this call. 1901 * 1902 * @returns the new topic handle or NULL on error (see \p errstr). 
1903 */ 1904 static Topic *create (Handle *base, const std::string &topic_str, 1905 const Conf *conf, std::string &errstr); 1906 1907 virtual ~Topic () = 0; 1908 1909 1910 /** @returns the topic name */ 1911 virtual const std::string name () const = 0; 1912 1913 /** 1914 * @returns true if \p partition is available for the topic (has leader). 1915 * @warning \b MUST \b ONLY be called from within a 1916 * RdKafka::PartitionerCb callback. 1917 */ 1918 virtual bool partition_available (int32_t partition) const = 0; 1919 1920 /** 1921 * @brief Store offset \p offset + 1 for topic partition \p partition. 1922 * The offset will be committed (written) to the broker (or file) according 1923 * to \p auto.commit.interval.ms or next manual offset-less commit call. 1924 * 1925 * @remark \c enable.auto.offset.store must be set to \c false when using 1926 * this API. 1927 * 1928 * @returns RdKafka::ERR_NO_ERROR on success or an error code if none of the 1929 * offsets could be stored. 1930 */ 1931 virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0; 1932 1933 /** 1934 * @brief Returns the underlying librdkafka C rd_kafka_topic_t handle. 1935 * 1936 * @warning Calling the C API on this handle is not recommended and there 1937 * is no official support for it, but for cases where the C++ API 1938 * does not provide the underlying functionality this C handle can be 1939 * used to interact directly with the core librdkafka API. 1940 * 1941 * @remark The lifetime of the returned pointer is the same as the Topic 1942 * object this method is called on. 
   *
   * @remark Include <rdkafka/rdkafka.h> prior to including
   *         <rdkafka/rdkafkacpp.h>
   *
   * @returns \c rd_kafka_topic_t*
   */
  virtual struct rd_kafka_topic_s *c_ptr () = 0;
};


/**@}*/


/**
 * @name Message object
 * @{
 *
 */


/**
 * @brief Message timestamp object
 *
 * Represents the number of milliseconds since the epoch (UTC).
 *
 * The MessageTimestampType dictates the timestamp type or origin.
 *
 * @remark Requires Apache Kafka broker version >= 0.10.0
 *
 */

class RD_EXPORT MessageTimestamp {
 public:
  /*! Message timestamp type */
  enum MessageTimestampType {
    MSG_TIMESTAMP_NOT_AVAILABLE,  /**< Timestamp not available */
    MSG_TIMESTAMP_CREATE_TIME,    /**< Message creation time (source) */
    MSG_TIMESTAMP_LOG_APPEND_TIME /**< Message log append time (broker) */
  };

  MessageTimestampType type; /**< Timestamp type */
  int64_t timestamp;         /**< Milliseconds since epoch (UTC). */
};


/**
 * @brief Headers object
 *
 * Represents message headers.
 *
 * https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
 *
 * @remark Requires Apache Kafka >= 0.11.0 brokers
 */
class RD_EXPORT Headers {
 public:
  virtual ~Headers() = 0;

  /**
   * @brief Header object
   *
   * This object represents a single Header with a key value pair
   * and an ErrorCode
   *
   * @remark dynamic allocation of this object is not supported.
   */
  class Header {
   public:
    /**
     * @brief Header object to encapsulate a single Header
     *
     * @param key the string value for the header key
     * @param value the bytes of the header value, or NULL
     * @param value_size the length in bytes of the header value
     *
     * @remark key and value are copied.
     *
     */
    Header(const std::string &key,
           const void *value,
           size_t value_size):
    key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
      value_ = copy_value(value, value_size);
    }

    /**
     * @brief Header object to encapsulate a single Header
     *
     * @param key the string value for the header key
     * @param value the bytes of the header value
     * @param value_size the length in bytes of the header value
     * @param err the error code if one returned
     *
     * @remark The error code is used for when the Header is constructed
     *         internally by using RdKafka::Headers::get_last which constructs
     *         a Header encapsulating the ErrorCode in the process.
     *         If err is set, the value and value_size fields will be undefined.
     */
    Header(const std::string &key,
           const void *value,
           size_t value_size,
           const RdKafka::ErrorCode err):
    key_(key), err_(err), value_(NULL), value_size_(value_size) {
      /* Only copy the value for a successful Header; an error Header
       * carries no payload. */
      if (err == ERR_NO_ERROR)
        value_ = copy_value(value, value_size);
    }

    /**
     * @brief Copy constructor
     *
     * @param other Header to make a copy of.
     */
    Header(const Header &other):
    key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
      /* Deep-copy the value so each Header owns its own buffer. */
      value_ = copy_value(other.value_, value_size_);
    }

    /**
     * @brief Assignment operator
     *
     * @param other Header to make a copy of.
     */
    Header& operator=(const Header &other)
    {
      if (&other == this) {
        return *this;
      }

      key_ = other.key_;
      err_ = other.err_;
      value_size_ = other.value_size_;

      /* Release the previously owned value before deep-copying
       * the other Header's value. */
      if (value_ != NULL)
        mem_free(value_);

      value_ = copy_value(other.value_, value_size_);

      return *this;
    }

    ~Header() {
      if (value_ != NULL)
        mem_free(value_);
    }

    /** @returns the key/name associated with this Header */
    std::string key() const {
      return key_;
    }

    /** @returns returns the binary value, or NULL */
    const void *value() const {
      return value_;
    }

    /** @returns returns the value casted to a nul-terminated C string,
     *           or NULL. */
    const char *value_string() const {
      return static_cast<const char *>(value_);
    }

    /** @returns Value Size the length of the Value in bytes */
    size_t value_size() const {
      return value_size_;
    }

    /** @returns the error code of this Header (usually ERR_NO_ERROR) */
    RdKafka::ErrorCode err() const {
      return err_;
    }

   private:
    /* Copy \p value_size bytes of \p value using the librdkafka allocator
     * and NUL-terminate the copy so value_string() can treat it as a
     * C string. Returns NULL if \p value is NULL.
     * NOTE(review): the mem_malloc() result is not checked; assumes
     * allocation succeeds. */
    char *copy_value(const void *value, size_t value_size) {
      if (!value)
        return NULL;

      char *dest = (char *)mem_malloc(value_size + 1);
      memcpy(dest, (const char *)value, value_size);
      dest[value_size] = '\0';

      return dest;
    }

    std::string key_;            /* Header key/name */
    RdKafka::ErrorCode err_;     /* Error code (ERR_NO_ERROR for proper headers) */
    char *value_;                /* Owned, NUL-terminated copy of the value, or NULL */
    size_t value_size_;          /* Length of value_ in bytes (excluding the NUL) */
    void *operator new(size_t);  /* Prevent dynamic allocation */
  };

  /**
   * @brief Create a new instance of the Headers object
   *
   * @returns an empty Headers list
   */
  static Headers *create();

  /**
   * @brief Create a new instance of the Headers object from a std::vector
   *
   * @param headers std::vector of RdKafka::Headers::Header objects.
   *                The headers are copied, not referenced.
   *
   * @returns a Headers list from std::vector set to the size of the std::vector
   */
  static Headers *create(const std::vector<Header> &headers);

  /**
   * @brief Adds a Header to the end of the list.
   *
   * @param key header key/name
   * @param value binary value, or NULL
   * @param value_size size of the value
   *
   * @returns an ErrorCode signalling success or failure to add the header.
   */
  virtual ErrorCode add(const std::string &key, const void *value,
                        size_t value_size) = 0;

  /**
   * @brief Adds a Header to the end of the list.
   *
   * Convenience method for adding a std::string as a value for the header.
   *
   * @param key header key/name
   * @param value value string
   *
   * @returns an ErrorCode signalling success or failure to add the header.
   */
  virtual ErrorCode add(const std::string &key, const std::string &value) = 0;

  /**
   * @brief Adds a Header to the end of the list.
   *
   * This method makes a copy of the passed header.
   *
   * @param header Existing header to copy
   *
   * @returns an ErrorCode signalling success or failure to add the header.
   */
  virtual ErrorCode add(const Header &header) = 0;

  /**
   * @brief Removes all the Headers of a given key
   *
   * @param key header key/name to remove
   *
   * @returns An ErrorCode signalling a success or failure to remove the Header.
   */
  virtual ErrorCode remove(const std::string &key) = 0;

  /**
   * @brief Gets all of the Headers of a given key
   *
   * @param key header key/name
   *
   * @remark If duplicate keys exist this will return them all as a std::vector
   *
   * @returns a std::vector containing all the Headers of the given key.
   */
  virtual std::vector<Header> get(const std::string &key) const = 0;

  /**
   * @brief Gets the last occurrence of a Header of a given key
   *
   * @param key header key/name
   *
   * @remark This will only return the most recently added header
   *
   * @returns the Header if found, otherwise a Header with an err set to
   *          ERR__NOENT.
   */
  virtual Header get_last(const std::string &key) const = 0;

  /**
   * @brief Returns all Headers
   *
   * @returns a std::vector containing all of the Headers
   */
  virtual std::vector<Header> get_all() const = 0;

  /**
   * @returns the number of headers.
   */
  virtual size_t size() const = 0;
};


/**
 * @brief Message object
 *
 * This object represents either a single consumed or produced message,
 * or an event (\p err() is set).
 *
 * An application must check RdKafka::Message::err() to see if the
 * object is a proper message (error is RdKafka::ERR_NO_ERROR) or
 * an error event.
 *
 */
class RD_EXPORT Message {
 public:
  /** @brief Message persistence status can be used by the application to
   *         find out if a produced message was persisted in the topic log. */
  enum Status {
    /** Message was never transmitted to the broker, or failed with
     *  an error indicating it was not written to the log.
     *  Application retry risks ordering, but not duplication. */
    MSG_STATUS_NOT_PERSISTED = 0,

    /** Message was transmitted to broker, but no acknowledgement was
     *  received.
     *  Application retry risks ordering and duplication. */
    MSG_STATUS_POSSIBLY_PERSISTED = 1,

    /** Message was written to the log and fully acknowledged.
     *  No reason for application to retry.
     *  Note: this value should only be trusted with \c acks=all. */
    MSG_STATUS_PERSISTED = 2,
  };

  /**
   * @brief Accessor functions
   * @remark Not all fields are present in all types of callbacks.
   */

  /** @returns The error string if object represents an error event,
   *           else an empty string. */
  virtual std::string errstr() const = 0;

  /** @returns The error code if object represents an error event, else 0. */
  virtual ErrorCode err () const = 0;

  /** @returns the RdKafka::Topic object for a message (if applicable),
   *           or NULL if a corresponding RdKafka::Topic object has not been
   *           explicitly created with RdKafka::Topic::create().
   *           In this case use topic_name() instead. */
  virtual Topic *topic () const = 0;

  /** @returns Topic name (if applicable, else empty string) */
  virtual std::string topic_name () const = 0;

  /** @returns Partition (if applicable) */
  virtual int32_t partition () const = 0;

  /** @returns Message payload (if applicable) */
  virtual void *payload () const = 0 ;

  /** @returns Message payload length (if applicable) */
  virtual size_t len () const = 0;

  /** @returns Message key as string (if applicable) */
  virtual const std::string *key () const = 0;

  /** @returns Message key as void pointer (if applicable) */
  virtual const void *key_pointer () const = 0 ;

  /** @returns Message key's binary length (if applicable) */
  virtual size_t key_len () const = 0;

  /** @returns Message or error offset (if applicable) */
  virtual int64_t offset () const = 0;

  /** @returns Message timestamp (if applicable) */
  virtual MessageTimestamp timestamp () const = 0;

  /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */
  virtual void *msg_opaque () const = 0;

  virtual ~Message () = 0;

  /** @returns the latency in microseconds for a produced message measured
   *           from the produce() call, or -1 if latency is not available. */
  virtual int64_t latency () const = 0;

  /**
   * @brief Returns the underlying librdkafka C rd_kafka_message_t handle.
   *
   * @warning Calling the C API on this handle is not recommended and there
   *          is no official support for it, but for cases where the C++ API
   *          does not provide the underlying functionality this C handle can be
   *          used to interact directly with the core librdkafka API.
   *
   * @remark The lifetime of the returned pointer is the same as the Message
   *         object this method is called on.
   *
   * @remark Include <rdkafka/rdkafka.h> prior to including
   *         <rdkafka/rdkafkacpp.h>
   *
   * @returns \c rd_kafka_message_t*
   */
  virtual struct rd_kafka_message_s *c_ptr () = 0;

  /**
   * @brief Returns the message's persistence status in the topic log.
   */
  virtual Status status () const = 0;

  /** @returns the Headers instance for this Message, or NULL if there
   *  are no headers.
   *
   * @remark The lifetime of the Headers are the same as the Message. */
  virtual RdKafka::Headers *headers () = 0;

  /** @returns the Headers instance for this Message (if applicable).
   *  If NULL is returned the reason is given in \p err, which
   *  is either ERR__NOENT if there were no headers, or another
   *  error code if header parsing failed.
   *
   * @remark The lifetime of the Headers are the same as the Message. */
  virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;

  /** @returns the broker id of the broker the message was produced to or
   *           fetched from, or -1 if not known/applicable. */
  virtual int32_t broker_id () const = 0;
};

/**@}*/


/**
 * @name Queue interface
 * @{
 *
 */


/**
 * @brief Queue interface
 *
 * Create a new message queue.
 * Message queues allow the application
 * to re-route consumed messages from multiple topic+partitions into
 * one single queue point. This queue point, containing messages from
 * a number of topic+partitions, may then be served by a single
 * consume() method, rather than one per topic+partition combination.
 *
 * See the RdKafka::Consumer::start(), RdKafka::Consumer::consume(), and
 * RdKafka::Consumer::consume_callback() methods that take a queue as the first
 * parameter for more information.
 */
class RD_EXPORT Queue {
 public:
  /**
   * @brief Create Queue object
   */
  static Queue *create (Handle *handle);

  /**
   * @brief Forward/re-route queue to \p dst.
   * If \p dst is \c NULL, the forwarding is removed.
   *
   * The internal refcounts for both queues are increased.
   *
   * @remark Regardless of whether \p dst is NULL or not, after calling this
   *         function, \p src will not forward its fetch queue to the consumer
   *         queue.
   */
  virtual ErrorCode forward (Queue *dst) = 0;


  /**
   * @brief Consume message or get error event from the queue.
   *
   * @remark Use \c delete to free the message.
   *
   * @returns One of:
   *  - proper message (RdKafka::Message::err() is ERR_NO_ERROR)
   *  - error event (RdKafka::Message::err() is != ERR_NO_ERROR)
   *  - timeout due to no message or event in \p timeout_ms
   *    (RdKafka::Message::err() is ERR__TIMED_OUT)
   */
  virtual Message *consume (int timeout_ms) = 0;

  /**
   * @brief Poll queue, serving any enqueued callbacks.
   *
   * @remark Must NOT be used for queues containing messages.
   *
   * @returns the number of events served or 0 on timeout.
   */
  virtual int poll (int timeout_ms) = 0;

  virtual ~Queue () = 0;

  /**
   * @brief Enable IO event triggering for queue.
   *
   * To ease integration with IO based polling loops this API
   * allows an application to create a separate file-descriptor
   * that librdkafka will write \p payload (of size \p size) to
   * whenever a new element is enqueued on a previously empty queue.
   *
   * To remove event triggering call with \p fd = -1.
   *
   * librdkafka will maintain a copy of the \p payload.
   *
   * @remark When using forwarded queues the IO event must only be enabled
   *         on the final forwarded-to (destination) queue.
   */
  virtual void io_event_enable (int fd, const void *payload, size_t size) = 0;
};

/**@}*/

/**
 * @name ConsumerGroupMetadata
 * @{
 *
 */
/**
 * @brief ConsumerGroupMetadata holds a consumer instance's group
 *        metadata state.
 *
 * This class currently does not have any public methods.
 */
class RD_EXPORT ConsumerGroupMetadata {
 public:
  virtual ~ConsumerGroupMetadata () = 0;
};

/**@}*/

/**
 * @name KafkaConsumer
 * @{
 *
 */


/**
 * @brief High-level KafkaConsumer (for brokers 0.9 and later)
 *
 * @remark Requires Apache Kafka >= 0.9.0 brokers
 *
 * Currently supports the \c range and \c roundrobin partition assignment
 * strategies (see \c partition.assignment.strategy)
 */
class RD_EXPORT KafkaConsumer : public virtual Handle {
 public:
  /**
   * @brief Creates a KafkaConsumer.
   *
   * The \p conf object must have \c group.id set to the consumer group to join.
   *
   * Use RdKafka::KafkaConsumer::close() to shut down the consumer.
   *
   * @sa RdKafka::RebalanceCb
   * @sa CONFIGURATION.md for \c group.id, \c session.timeout.ms,
   *     \c partition.assignment.strategy, etc.
   */
  static KafkaConsumer *create (const Conf *conf, std::string &errstr);

  virtual ~KafkaConsumer () = 0;


  /** @brief Returns the current partition assignment as set by
   *         RdKafka::KafkaConsumer::assign() */
  virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;

  /** @brief Returns the current subscription as set by
   *         RdKafka::KafkaConsumer::subscribe() */
  virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;

  /**
   * @brief Update the subscription set to \p topics.
   *
   * Any previous subscription will be unassigned and unsubscribed first.
   *
   * The subscription set denotes the desired topics to consume and this
   * set is provided to the partition assignor (one of the elected group
   * members) for all clients which then uses the configured
   * \c partition.assignment.strategy to assign the subscription set's
   * topics' partitions to the consumers, depending on their subscription.
   *
   * The result of such an assignment is a rebalancing which is either
   * handled automatically in librdkafka or can be overridden by the application
   * by providing a RdKafka::RebalanceCb.
   *
   * The rebalancing passes the assigned partition set to
   * RdKafka::KafkaConsumer::assign() to update what partitions are actually
   * being fetched by the KafkaConsumer.
   *
   * Regex pattern matching is automatically performed for topics prefixed
   * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\")
   *
   * @remark A consumer error will be raised for each unavailable topic in the
   *         \p topics. The error will be ERR_UNKNOWN_TOPIC_OR_PART
   *         for non-existent topics, and
   *         ERR_TOPIC_AUTHORIZATION_FAILED for unauthorized topics.
   *         The consumer error will be raised through consume() (et.al.)
   *         with the \c RdKafka::Message::err() returning one of the
   *         error codes mentioned above.
   *         The subscribe function itself is asynchronous and will not return
   *         an error on unavailable topics.
   *
   * @returns an error if the provided list of topics is invalid.
   */
  virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;

  /** @brief Unsubscribe from the current subscription set. */
  virtual ErrorCode unsubscribe () = 0;

  /**
   * @brief Update the assignment set to \p partitions.
   *
   * The assignment set is the set of partitions actually being consumed
   * by the KafkaConsumer.
   */
  virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;

  /**
   * @brief Stop consumption and remove the current assignment.
   */
  virtual ErrorCode unassign () = 0;

  /**
   * @brief Consume message or get error event, triggers callbacks.
   *
   * Will automatically call registered callbacks for any such queued events,
   * including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
   * etc.
   *
   * @remark Use \c delete to free the message.
   *
   * @remark An application should make sure to call consume() at regular
   *         intervals, even if no messages are expected, to serve any
   *         queued callbacks waiting to be called. This is especially
   *         important when a RebalanceCb has been registered as it needs
   *         to be called and handled properly to synchronize internal
   *         consumer state.
   *
   * @remark Application MUST NOT call \p poll() on KafkaConsumer objects.
   *
   * @returns One of:
   *  - proper message (RdKafka::Message::err() is ERR_NO_ERROR)
   *  - error event (RdKafka::Message::err() is != ERR_NO_ERROR)
   *  - timeout due to no message or event in \p timeout_ms
   *    (RdKafka::Message::err() is ERR__TIMED_OUT)
   */
  virtual Message *consume (int timeout_ms) = 0;

  /**
   * @brief Commit offsets for the current assignment.
   *
   * @remark This is the synchronous variant that blocks until offsets
   *         are committed or the commit fails (see return value).
   *
   * @remark If a RdKafka::OffsetCommitCb callback is registered it will
   *         be called with commit details on a future call to
   *         RdKafka::KafkaConsumer::consume()
   *
   * @returns ERR_NO_ERROR or error code.
   */
  virtual ErrorCode commitSync () = 0;

  /**
   * @brief Asynchronous version of RdKafka::KafkaConsumer::commitSync()
   *
   * @sa RdKafka::KafkaConsumer::commitSync()
   */
  virtual ErrorCode commitAsync () = 0;

  /**
   * @brief Commit offset for a single topic+partition based on \p message
   *
   * @remark The offset committed will be the message's offset + 1.
   *
   * @remark This is the synchronous variant.
   *
   * @sa RdKafka::KafkaConsumer::commitSync()
   */
  virtual ErrorCode commitSync (Message *message) = 0;

  /**
   * @brief Commit offset for a single topic+partition based on \p message
   *
   * @remark The offset committed will be the message's offset + 1.
   *
   * @remark This is the asynchronous variant.
   *
   * @sa RdKafka::KafkaConsumer::commitSync()
   */
  virtual ErrorCode commitAsync (Message *message) = 0;

  /**
   * @brief Commit offsets for the provided list of partitions.
   *
   * @remark The \c .offset of the partitions in \p offsets should be the
   *         offset where consumption will resume, i.e., the last
   *         processed offset + 1.
   *
   * @remark This is the synchronous variant.
   */
  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;

  /**
   * @brief Commit offset for the provided list of partitions.
   *
   * @remark The \c .offset of the partitions in \p offsets should be the
   *         offset where consumption will resume, i.e., the last
   *         processed offset + 1.
   *
   * @remark This is the asynchronous variant.
   */
  virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;

  /**
   * @brief Commit offsets for the current assignment.
   *
   * @remark This is the synchronous variant that blocks until offsets
   *         are committed or the commit fails (see return value).
   *
   * @remark The provided callback will be called from this function.
   *
   * @returns ERR_NO_ERROR or error code.
   */
  virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0;

  /**
   * @brief Commit offsets for the provided list of partitions.
   *
   * @remark This is the synchronous variant that blocks until offsets
   *         are committed or the commit fails (see return value).
   *
   * @remark The provided callback will be called from this function.
   *
   * @returns ERR_NO_ERROR or error code.
   */
  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
                                OffsetCommitCb *offset_commit_cb) = 0;




  /**
   * @brief Retrieve committed offsets for topics+partitions.
   *
   * @returns ERR_NO_ERROR on success in which case the
   *          \p offset or \p err field of each \p partitions' element is filled
   *          in with the stored offset, or a partition specific error.
   *          Else returns an error code.
   */
  virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
                               int timeout_ms) = 0;

  /**
   * @brief Retrieve current positions (offsets) for topics+partitions.
   *
   * @returns ERR_NO_ERROR on success in which case the
   *          \p offset or \p err field of each \p partitions' element is filled
   *          in with the stored offset, or a partition specific error.
   *          Else returns an error code.
   */
  virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;


  /**
   * For pausing and resuming consumption, see
   * @sa RdKafka::Handle::pause() and RdKafka::Handle::resume()
   */


  /**
   * @brief Close and shut down the consumer.
   *
   * This call will block until the following operations are finished:
   *  - Trigger a local rebalance to void the current assignment
   *  - Stop consumption for current assignment
   *  - Commit offsets
   *  - Leave group
   *
   * The maximum blocking time is roughly limited to session.timeout.ms.
   *
   * @remark Callbacks, such as RdKafka::RebalanceCb and
   *         RdKafka::OffsetCommitCb, etc, may be called.
   *
   * @remark The consumer object must later be freed with \c delete
   */
  virtual ErrorCode close () = 0;


  /**
   * @brief Seek consumer for topic+partition to offset which is either an
   *        absolute or logical offset.
   *
   * If \p timeout_ms is not 0 the call will wait this long for the
   * seek to be performed. If the timeout is reached the internal state
   * will be unknown and this function returns `ERR__TIMED_OUT`.
   * If \p timeout_ms is 0 it will initiate the seek but return
   * immediately without any error reporting (e.g., async).
   *
   * This call triggers a fetch queue barrier flush.
   *
   * @remark Consumption for the given partition must have started for the
   *         seek to work. Use assign() to set the starting offset.
   *
   * @returns an ErrorCode to indicate success or failure.
   */
  virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0;


  /**
   * @brief Store offset \p offset for topic partition \p partition.
   * The offset will be committed (written) to the offset store according
   * to \c auto.commit.interval.ms or the next manual offset-less commit*()
   *
   * Per-partition success/error status propagated through TopicPartition.err()
   *
   * @remark The \c .offset field is stored as is, it will NOT be + 1.
   *
   * @remark \c enable.auto.offset.store must be set to \c false when using
   *         this API.
   *
   * @returns RdKafka::ERR_NO_ERROR on success, or
   *          RdKafka::ERR__UNKNOWN_PARTITION if none of the offsets could
   *          be stored, or
   *          RdKafka::ERR__INVALID_ARG if \c enable.auto.offset.store is true.
   */
  virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;


  /**
   * @returns the current consumer group metadata associated with this consumer,
   *          or NULL if the consumer is not configured with a \c group.id.
   *          This metadata object should be passed to the transactional
   *          producer's RdKafka::Producer::send_offsets_to_transaction() API.
   *
   * @remark The returned object must be deleted by the application.
   *
   * @sa RdKafka::Producer::send_offsets_to_transaction()
   */
  virtual ConsumerGroupMetadata *groupMetadata () = 0;


  /** @brief Check whether the consumer considers the current assignment to
   *         have been lost involuntarily. This method is only applicable for
   *         use with a subscribing consumer. Assignments are revoked
   *         immediately when determined to have been lost, so this method is
   *         only useful within a rebalance callback. Partitions that have
   *         been lost may already be owned by other members in the group and
   *         therefore committing offsets, for example, may fail.
   *
   * @remark Calling assign(), incremental_assign() or incremental_unassign()
   *         resets this flag.
   *
   * @returns Returns true if the current partition assignment is considered
   *          lost, false otherwise.
   */
  virtual bool assignment_lost () = 0;

  /**
   * @brief The rebalance protocol currently in use. This will be
   *        "NONE" if the consumer has not (yet) joined a group, else it will
   *        match the rebalance protocol ("EAGER", "COOPERATIVE") of the
   *        configured and selected assignor(s). All configured
   *        assignors must have the same protocol type, meaning
   *        online migration of a consumer group from using one
   *        protocol to another (in particular upgrading from EAGER
   *        to COOPERATIVE) without a restart is not currently
   *        supported.
   *
   * @returns an empty string on error, or one of
   *          "NONE", "EAGER", "COOPERATIVE" on success.
   */

  virtual std::string rebalance_protocol () = 0;


  /**
   * @brief Incrementally add \p partitions to the current assignment.
   *
   * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used,
   * this method should be used in a rebalance callback to adjust the current
   * assignment appropriately in the case where the rebalance type is
   * ERR__ASSIGN_PARTITIONS. The application must pass the partition list
   * passed to the callback (or a copy of it), even if the list is empty.
   * This method may also be used outside the context of a rebalance callback.
   *
   * @returns NULL on success, or an error object if the operation was
   *          unsuccessful.
   *
   * @remark The returned object must be deleted by the application.
   */
  virtual Error *incremental_assign (const std::vector<TopicPartition*> &partitions) = 0;


  /**
   * @brief Incrementally remove \p partitions from the current assignment.
   *
   * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used,
   * this method should be used in a rebalance callback to adjust the current
   * assignment appropriately in the case where the rebalance type is
   * ERR__REVOKE_PARTITIONS. The application must pass the partition list
   * passed to the callback (or a copy of it), even if the list is empty.
   * This method may also be used outside the context of a rebalance callback.
   *
   * @returns NULL on success, or an error object if the operation was
   *          unsuccessful.
   *
   * @remark The returned object must be deleted by the application.
   */
  virtual Error *incremental_unassign (const std::vector<TopicPartition*> &partitions) = 0;

};


/**@}*/


/**
 * @name Simple Consumer (legacy)
 * @{
 *
 */

/**
 * @brief Simple Consumer (legacy)
 *
 * A simple non-balanced, non-group-aware, consumer.
 */
class RD_EXPORT Consumer : public virtual Handle {
 public:
  /**
   * @brief Creates a new Kafka consumer handle.
   *
   * \p conf is an optional object that will be used instead of the default
   * configuration.
   * The \p conf object is reusable after this call.
   *
   * @returns the new handle on success or NULL on error in which case
   *          \p errstr is set to a human readable error message.
   */
  static Consumer *create (const Conf *conf, std::string &errstr);

  virtual ~Consumer () = 0;


  /**
   * @brief Start consuming messages for topic and \p partition
   *        at offset \p offset which may either be a proper offset (0..N)
   *        or one of the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END.
   *
   * rdkafka will attempt to keep \p queued.min.messages (config property)
   * messages in the local queue by repeatedly fetching batches of messages
   * from the broker until the threshold is reached.
   *
   * The application shall use one of the \p ..->consume*() functions
   * to consume messages from the local queue, each kafka message being
   * represented as a `RdKafka::Message *` object.
   *
   * \p ..->start() must not be called multiple times for the same
   * topic and partition without stopping consumption first with
   * \p ..->stop().
   *
   * @returns an ErrorCode to indicate success or failure.
   */
  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;

  /**
   * @brief Start consuming messages for topic and \p partition on
   *        queue \p queue.
   *
   * @sa RdKafka::Consumer::start()
   */
  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
                           Queue *queue) = 0;

  /**
   * @brief Stop consuming messages for topic and \p partition, purging
   *        all messages currently in the local queue.
   *
   * The application needs to stop all consumers before destroying
   * the Consumer handle.
   *
   * @returns an ErrorCode to indicate success or failure.
   */
  virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;

  /**
   * @brief Seek consumer for topic+partition to \p offset which is either an
   *        absolute or logical offset.
   *
   * If \p timeout_ms is not 0 the call will wait this long for the
   * seek to be performed. If the timeout is reached the internal state
   * will be unknown and this function returns `ERR__TIMED_OUT`.
   * If \p timeout_ms is 0 it will initiate the seek but return
   * immediately without any error reporting (e.g., async).
   *
   * This call triggers a fetch queue barrier flush.
   *
   * @returns an ErrorCode to indicate success or failure.
   */
  virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
                          int timeout_ms) = 0;

  /**
   * @brief Consume a single message from \p topic and \p partition.
   *
   * \p timeout_ms is maximum amount of time to wait for a message to be
   * received.
   * Consumer must have been previously started with \p ..->start().
   *
   * @returns a Message object, the application needs to check if message
   * is an error or a proper message RdKafka::Message::err() and checking for
   * \p ERR_NO_ERROR.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in RdKafka::Message::err()):
   *  - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched.
   *  - ERR__PARTITION_EOF - End of partition reached, not an error.
   */
  virtual Message *consume (Topic *topic, int32_t partition,
                            int timeout_ms) = 0;

  /**
   * @brief Consume a single message from the specified queue.
   *
   * \p timeout_ms is maximum amount of time to wait for a message to be
   * received.
   * Consumer must have been previously started on the queue with
   * \p ..->start().
   *
   * @returns a Message object, the application needs to check if message
   * is an error or a proper message \p Message->err() and checking for
   * \p ERR_NO_ERROR.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in RdKafka::Message::err()):
   *   - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched
   *
   * Note that Message->topic() may be nullptr after certain kinds of
   * errors, so applications should check that it isn't null before
   * dereferencing it.
   */
  virtual Message *consume (Queue *queue, int timeout_ms) = 0;

  /**
   * @brief Consumes messages from \p topic and \p partition, calling
   *        the provided callback for each consumed message.
   *
   * \p consume_callback() provides higher throughput performance
   * than \p consume().
   *
   * \p timeout_ms is the maximum amount of time to wait for one or
   * more messages to arrive.
   *
   * The provided \p consume_cb instance has its \p consume_cb function
   * called for every message received.
   *
   * The \p opaque argument is passed to the \p consume_cb as \p opaque.
   *
   * @returns the number of messages processed or -1 on error.
   *
   * @sa RdKafka::Consumer::consume()
   */
  virtual int consume_callback (Topic *topic, int32_t partition,
                                int timeout_ms,
                                ConsumeCb *consume_cb,
                                void *opaque) = 0;

  /**
   * @brief Consumes messages from \p queue, calling the provided callback for
   *        each consumed message.
   *
   * @sa RdKafka::Consumer::consume_callback()
   */
  virtual int consume_callback (Queue *queue, int timeout_ms,
                                RdKafka::ConsumeCb *consume_cb,
                                void *opaque) = 0;

  /**
   * @brief Converts an offset into the logical offset from the tail of a topic.
   *
   * \p offset is the (positive) number of items from the end.
   *
   * @returns the logical offset for message \p offset from the tail, this value
   *          may be passed to Consumer::start, et al.
   * @remark The returned logical offset is specific to librdkafka.
   */
  static int64_t OffsetTail(int64_t offset);
};

/**@}*/


/**
 * @name Producer
 * @{
 *
 */


/**
 * @brief Producer
 */
class RD_EXPORT Producer : public virtual Handle {
 public:
  /**
   * @brief Creates a new Kafka producer handle.
   *
   * \p conf is an optional object that will be used instead of the default
   * configuration.
   * The \p conf object is reusable after this call.
   *
   * @returns the new handle on success or NULL on error in which case
   *          \p errstr is set to a human readable error message.
   */
  static Producer *create (const Conf *conf, std::string &errstr);


  virtual ~Producer () = 0;

  /**
   * @brief RdKafka::Producer::produce() \p msgflags
   *
   * These flags are optional.
   */
  enum {
    RK_MSG_FREE = 0x1,  /**< rdkafka will free(3) \p payload
                         *   when it is done with it.
                         *   Mutually exclusive with RK_MSG_COPY. */
    RK_MSG_COPY = 0x2,  /**< the \p payload data will be copied
                         *   and the \p payload pointer will not
                         *   be used by rdkafka after the
                         *   call returns.
                         *   Mutually exclusive with RK_MSG_FREE. */
    RK_MSG_BLOCK = 0x4  /**< Block produce*() on message queue
                         *   full.
                         *   WARNING:
                         *   If a delivery report callback
                         *   is used the application MUST
                         *   call rd_kafka_poll() (or equiv.)
                         *   to make sure delivered messages
                         *   are drained from the internal
                         *   delivery report queue.
                         *   Failure to do so will result
                         *   in indefinitely blocking on
                         *   the produce() call when the
                         *   message queue is full.
                         */


    /**@cond NO_DOC*/
    /* For backwards compatibility: */
#ifndef MSG_COPY /* defined in sys/msg.h */
    ,  /** this comma must exist between
        *  RK_MSG_BLOCK and MSG_FREE
        */
    MSG_FREE = RK_MSG_FREE,
    MSG_COPY = RK_MSG_COPY
#endif
    /**@endcond*/
  };

  /**
   * @brief Produce and send a single message to broker.
   *
   * This is an asynchronous non-blocking API.
   *
   * \p partition is the target partition, either:
   *   - RdKafka::Topic::PARTITION_UA (unassigned) for
   *     automatic partitioning using the topic's partitioner function, or
   *   - a fixed partition (0..N)
   *
   * \p msgflags is zero or more of the following flags OR:ed together:
   *    RK_MSG_BLOCK - block \p produce*() call if
   *                   \p queue.buffering.max.messages or
   *                   \p queue.buffering.max.kbytes are exceeded.
   *                   Messages are considered in-queue from the point they
   *                   are accepted by produce() until their corresponding
   *                   delivery report callback/event returns.
   *                   It is thus a requirement to call
   *                   poll() (or equiv.) from a separate
   *                   thread when RK_MSG_BLOCK is used.
   *                   See WARNING on \c RK_MSG_BLOCK above.
   *    RK_MSG_FREE - rdkafka will free(3) \p payload when it is done with it.
   *    RK_MSG_COPY - the \p payload data will be copied and the \p payload
   *                  pointer will not be used by rdkafka after the
   *                  call returns.
   *
   *  NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.
   *
   *  If the function returns an error code and RK_MSG_FREE was specified, then
   *  the memory associated with the payload is still the caller's
   *  responsibility.
   *
   * \p payload is the message payload of size \p len bytes.
   *
   * \p key is an optional message key, if non-NULL it
   * will be passed to the topic partitioner as well as be sent with the
   * message to the broker and passed on to the consumer.
   *
   * \p msg_opaque is an optional application-provided per-message opaque
   * pointer that will be provided in the delivery report callback (\p dr_cb)
   * for referencing this message.
   *
   * @returns an ErrorCode to indicate success or failure:
   *  - ERR_NO_ERROR           - message successfully enqueued for transmission.
   *
   *  - ERR__QUEUE_FULL        - maximum number of outstanding messages has been
   *                             reached: \c queue.buffering.max.messages
   *
   *  - ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size:
   *                             \c messages.max.bytes
   *
   *  - ERR__UNKNOWN_PARTITION - requested \p partition is unknown in the
   *                             Kafka cluster.
   *
   *  - ERR__UNKNOWN_TOPIC     - topic is unknown in the Kafka cluster.
   */
  virtual ErrorCode produce (Topic *topic, int32_t partition,
                             int msgflags,
                             void *payload, size_t len,
                             const std::string *key,
                             void *msg_opaque) = 0;

  /**
   * @brief Variant produce() that passes the key as a pointer and length
   *        instead of as a const std::string *.
   */
  virtual ErrorCode produce (Topic *topic, int32_t partition,
                             int msgflags,
                             void *payload, size_t len,
                             const void *key, size_t key_len,
                             void *msg_opaque) = 0;

  /**
   * @brief produce() variant that takes topic as a string (no need for
   *        creating a Topic object), and also allows providing the
   *        message timestamp (milliseconds since beginning of epoch, UTC).
   *        Otherwise identical to produce() above.
   */
  virtual ErrorCode produce (const std::string topic_name, int32_t partition,
                             int msgflags,
                             void *payload, size_t len,
                             const void *key, size_t key_len,
                             int64_t timestamp, void *msg_opaque) = 0;

  /**
   * @brief produce() variant that allows for Header support on produce.
   *        Otherwise identical to produce() above.
   *
   * @warning The \p headers will be freed/deleted if the produce() call
   *          succeeds, or left untouched if produce() fails.
   */
  virtual ErrorCode produce (const std::string topic_name, int32_t partition,
                             int msgflags,
                             void *payload, size_t len,
                             const void *key, size_t key_len,
                             int64_t timestamp,
                             RdKafka::Headers *headers,
                             void *msg_opaque) = 0;


  /**
   * @brief Variant produce() that accepts vectors for key and payload.
   *        The vector data will be copied.
   */
  virtual ErrorCode produce (Topic *topic, int32_t partition,
                             const std::vector<char> *payload,
                             const std::vector<char> *key,
                             void *msg_opaque) = 0;


  /**
   * @brief Wait until all outstanding produce requests, et al, are completed.
   *        This should typically be done prior to destroying a producer
   *        instance to make sure all queued and in-flight produce requests
   *        are completed before terminating.
   *
   * @remark This function will call Producer::poll() and thus
   *         trigger callbacks.
   *
   * @returns ERR__TIMED_OUT if \p timeout_ms was reached before all
   *          outstanding requests were completed, else ERR_NO_ERROR
   */
  virtual ErrorCode flush (int timeout_ms) = 0;


  /**
   * @brief Purge messages currently handled by the producer instance.
   *
   * @param purge_flags tells which messages should be purged and how.
   *
   * The application will need to call Handle::poll() or Producer::flush()
   * afterwards to serve the delivery report callbacks of the purged messages.
   *
   * Messages purged from internal queues fail with the delivery report
   * error code set to ERR__PURGE_QUEUE, while purged messages that
   * are in-flight to or from the broker will fail with the error code set to
   * ERR__PURGE_INFLIGHT.
   *
   * @warning Purging messages that are in-flight to or from the broker
   *          will ignore any sub-sequent acknowledgement for these messages
   *          received from the broker, effectively making it impossible
   *          for the application to know if the messages were successfully
   *          produced or not. This may result in duplicate messages if the
   *          application retries these messages at a later time.
   *
   * @remark This call may block for a short time while background thread
   *         queues are purged.
   *
   * @returns ERR_NO_ERROR on success,
   *          ERR__INVALID_ARG if the \p purge flags are invalid or unknown,
   *          ERR__NOT_IMPLEMENTED if called on a non-producer client instance.
   */
  virtual ErrorCode purge (int purge_flags) = 0;

  /**
   * @brief RdKafka::Handle::purge() \p purge_flags
   */
  enum {
    PURGE_QUEUE = 0x1,     /**< Purge messages in internal queues */

    PURGE_INFLIGHT = 0x2,  /*! Purge messages in-flight to or from the broker.
                            *  Purging these messages will void any future
                            *  acknowledgements from the broker, making it
                            *  impossible for the application to know if these
                            *  messages were successfully delivered or not.
                            *  Retrying these messages may lead to duplicates. */

    PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue
                              * purging to finish. */
  };

  /**
   * @name Transactional API
   * @{
   *
   * Requires Kafka broker version v0.11.0 or later
   *
   * See the Transactional API documentation in rdkafka.h for more information.
   */

  /**
   * @brief Initialize transactions for the producer instance.
   *
   * @param timeout_ms The maximum time to block. On timeout the operation
   *                   may continue in the background, depending on state,
   *                   and it is okay to call init_transactions() again.
   *
   * @returns an RdKafka::Error object on error, or NULL on success.
   *          Check whether the returned error object permits retrying
   *          by calling RdKafka::Error::is_retriable(), or whether a fatal
   *          error has been raised by calling RdKafka::Error::is_fatal().
   *
   * @remark The returned error object (if not NULL) must be deleted.
   *
   * See rd_kafka_init_transactions() in rdkafka.h for more information.
   *
   */
  virtual Error *init_transactions (int timeout_ms) = 0;


  /**
   * @brief init_transactions() must have been called successfully
   *        (once) before this function is called.
   *
   * @returns an RdKafka::Error object on error, or NULL on success.
   *          Check whether a fatal error has been raised by calling
   *          RdKafka::Error::is_fatal().
   *
   * @remark The returned error object (if not NULL) must be deleted.
   *
   * See rd_kafka_begin_transaction() in rdkafka.h for more information.
   */
  virtual Error *begin_transaction () = 0;

  /**
   * @brief Sends a list of topic partition offsets to the consumer group
   *        coordinator for \p group_metadata, and marks the offsets as
   *        part of the current transaction.
   *        These offsets will be considered committed only if the transaction
   *        is committed successfully.
   *
   *        The offsets should be the next message your application will
   *        consume,
   *        i.e., the last processed message's offset + 1 for each partition.
   *        Either track the offsets manually during processing or use
   *        RdKafka::KafkaConsumer::position() (on the consumer) to get the
   *        current offsets for
   *        the partitions assigned to the consumer.
   *
   *        Use this method at the end of a consume-transform-produce loop prior
   *        to committing the transaction with commit_transaction().
   *
   * @param offsets List of offsets to commit to the consumer group upon
   *                successful commit of the transaction. Offsets should be
   *                the next message to consume,
   *                e.g., last processed message + 1.
   * @param group_metadata The current consumer group metadata as returned by
   *                       RdKafka::KafkaConsumer::groupMetadata() on the
   *                       consumer instance the provided offsets were
   *                       consumed from.
   * @param timeout_ms Maximum time allowed to register the
   *                   offsets on the broker.
   *
   * @remark This function must be called on the transactional producer
   *         instance, not the consumer.
   *
   * @remark The consumer must disable auto commits
   *         (set \c enable.auto.commit to false on the consumer).
   *
   * @returns an RdKafka::Error object on error, or NULL on success.
   *          Check whether the returned error object permits retrying
   *          by calling RdKafka::Error::is_retriable(), or whether an abortable
   *          or fatal error has been raised by calling
   *          RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal()
   *          respectively.
   *
   * @remark The returned error object (if not NULL) must be deleted.
   *
   * See rd_kafka_send_offsets_to_transaction() in rdkafka.h for
   * more information.
   */
  virtual Error *send_offsets_to_transaction (
      const std::vector<TopicPartition*> &offsets,
      const ConsumerGroupMetadata *group_metadata,
      int timeout_ms) = 0;

  /**
   * @brief Commit the current transaction as started with begin_transaction().
   *
   *        Any outstanding messages will be flushed (delivered) before actually
   *        committing the transaction.
   *
   * @param timeout_ms The maximum time to block. On timeout the operation
   *                   may continue in the background, depending on state,
   *                   and it is okay to call this function again.
   *                   Pass -1 to use the remaining transaction timeout,
   *                   this is the recommended use.
   *
   * @remark It is strongly recommended to always pass -1 (remaining transaction
   *         time) as the \p timeout_ms. Using other values risk internal
   *         state desynchronization in case any of the underlying protocol
   *         requests fail.
   *
   * @returns an RdKafka::Error object on error, or NULL on success.
   *          Check whether the returned error object permits retrying
   *          by calling RdKafka::Error::is_retriable(), or whether an abortable
   *          or fatal error has been raised by calling
   *          RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal()
   *          respectively.
   *
   * @remark The returned error object (if not NULL) must be deleted.
   *
   * See rd_kafka_commit_transaction() in rdkafka.h for more information.
   */
  virtual Error *commit_transaction (int timeout_ms) = 0;

  /**
   * @brief Aborts the ongoing transaction.
   *
   *        This function should also be used to recover from non-fatal
   *        abortable transaction errors.
   *
   *        Any outstanding messages will be purged and fail with
   *        RdKafka::ERR__PURGE_INFLIGHT or RdKafka::ERR__PURGE_QUEUE.
   *        See RdKafka::Producer::purge() for details.
   *
   * @param timeout_ms The maximum time to block. On timeout the operation
   *                   may continue in the background, depending on state,
   *                   and it is okay to call this function again.
   *                   Pass -1 to use the remaining transaction timeout,
   *                   this is the recommended use.
   *
   * @remark It is strongly recommended to always pass -1 (remaining transaction
   *         time) as the \p timeout_ms. Using other values risk internal
   *         state desynchronization in case any of the underlying protocol
   *         requests fail.
   *
   * @returns an RdKafka::Error object on error, or NULL on success.
   *          Check whether the returned error object permits retrying
   *          by calling RdKafka::Error::is_retriable(), or whether a
   *          fatal error has been raised by calling RdKafka::Error::is_fatal().
   *
   * @remark The returned error object (if not NULL) must be deleted.
   *
   * See rd_kafka_abort_transaction() in rdkafka.h for more information.
   */
  virtual Error *abort_transaction (int timeout_ms) = 0;

  /**@}*/
};

/**@}*/


/**
 * @name Metadata interface
 * @{
 *
 */


/**
 * @brief Metadata: Broker information
 */
class BrokerMetadata {
 public:
  /** @returns Broker id */
  virtual int32_t id() const = 0;

  /** @returns Broker hostname */
  virtual const std::string host() const = 0;

  /** @returns Broker listening port */
  virtual int port() const = 0;

  virtual ~BrokerMetadata() = 0;
};



/**
 * @brief Metadata: Partition information
 */
class PartitionMetadata {
 public:
  /** @brief Replicas */
  typedef std::vector<int32_t> ReplicasVector;
  /** @brief ISRs (In-Sync-Replicas) */
  typedef std::vector<int32_t> ISRSVector;

  /** @brief Replicas iterator */
  typedef ReplicasVector::const_iterator ReplicasIterator;
  /** @brief ISRs iterator */
  typedef ISRSVector::const_iterator ISRSIterator;


  /** @returns Partition id */
  virtual int32_t id() const = 0;

  /** @returns Partition error reported by broker */
  virtual ErrorCode err() const = 0;

  /** @returns Leader broker (id) for partition */
  virtual int32_t leader() const = 0;

  /** @returns Replica brokers */
  virtual const std::vector<int32_t> *replicas() const = 0;

  /** @returns In-Sync-Replica brokers
   *  @warning The broker may return a cached/outdated list of ISRs.
   */
  virtual const std::vector<int32_t> *isrs() const = 0;

  virtual ~PartitionMetadata() = 0;
};



/**
 * @brief Metadata: Topic information
 */
class TopicMetadata {
 public:
  /** @brief Partitions */
  typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
  /** @brief Partitions iterator */
  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;

  /** @returns Topic name */
  virtual const std::string topic() const = 0;

  /** @returns Partition list */
  virtual const PartitionMetadataVector *partitions() const = 0;

  /** @returns Topic error reported by broker */
  virtual ErrorCode err() const = 0;

  virtual ~TopicMetadata() = 0;
};


/**
 * @brief Metadata container
 */
class Metadata {
 public:
  /** @brief Brokers */
  typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
  /** @brief Topics */
  typedef std::vector<const TopicMetadata*> TopicMetadataVector;

  /** @brief Brokers iterator */
  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
  /** @brief Topics iterator */
  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;


  /**
   * @brief Broker list
   * @remark Ownership of the returned pointer is retained by the instance of
   *         Metadata that is called.
   */
  virtual const BrokerMetadataVector *brokers() const = 0;

  /**
   * @brief Topic list
   * @remark Ownership of the returned pointer is retained by the instance of
   *         Metadata that is called.
   */
  virtual const TopicMetadataVector *topics() const = 0;

  /** @brief Broker (id) originating this metadata */
  virtual int32_t orig_broker_id() const = 0;

  /** @brief Broker (name) originating this metadata */
  virtual const std::string orig_broker_name() const = 0;

  virtual ~Metadata() = 0;
};

/**@}*/

}  /* namespace RdKafka */


#endif /* _RDKAFKACPP_H_ */