1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
31 
32 /**
33  * @file rdkafkacpp.h
34  * @brief Apache Kafka C/C++ consumer and producer client library.
35  *
36  * rdkafkacpp.h contains the public C++ API for librdkafka.
37  * The API is documented in this file as comments prefixing the class,
38  * function, type, enum, define, etc.
39  * For more information, see the C interface in rdkafka.h and read the
40  * manual in INTRODUCTION.md.
41  * The C++ interface is STD C++ '03 compliant and adheres to the
42  * Google C++ Style Guide.
 *
44  * @sa For the C interface see rdkafka.h
45  *
46  * @tableofcontents
47  */
48 
49 /**@cond NO_DOC*/
50 #include <string>
51 #include <list>
52 #include <vector>
53 #include <cstdlib>
54 #include <cstring>
55 #include <stdint.h>
56 #include <sys/types.h>
57 
/*
 * Platform glue:
 *  - on Windows, make sure an ssize_t type exists;
 *  - define RD_EXPORT, the linkage decoration put on public symbols.
 */
#if defined(_WIN32)
#  if !defined(ssize_t)
#    if !defined(_BASETSD_H_)
#      include <basetsd.h>
#    endif
#    if !defined(_SSIZE_T_DEFINED)
#      define _SSIZE_T_DEFINED
typedef SSIZE_T ssize_t;
#    endif
#  endif
#  undef RD_EXPORT
#  if defined(LIBRDKAFKA_STATICLIB)
     /* LIBRDKAFKA_STATICLIB: no import/export decoration. */
#    define RD_EXPORT
#  elif defined(LIBRDKAFKACPP_EXPORTS)
     /* LIBRDKAFKACPP_EXPORTS: symbols are exported. */
#    define RD_EXPORT __declspec(dllexport)
#  else
     /* Neither is defined: symbols are imported. */
#    define RD_EXPORT __declspec(dllimport)
#  endif
#else
   /* Non-Windows platforms: RD_EXPORT expands to nothing. */
#  define RD_EXPORT
#endif
81 
82 /**@endcond*/
83 
/*
 * Forward declarations of C structs, given C linkage.
 * Only the names are introduced here; the definitions live elsewhere.
 */
extern "C" {
struct rd_kafka_s;
struct rd_kafka_topic_s;
struct rd_kafka_message_s;
struct rd_kafka_conf_s;
struct rd_kafka_topic_conf_s;
}
92 
93 namespace RdKafka {
94 
95 /**
96  * @name Miscellaneous APIs
97  * @{
98  */
99 
/**
 * @brief Compile-time librdkafka version number.
 *
 * Read the value as four hexadecimal bytes, \c 0xMMmmrrxx:
 *  - \c MM : major version
 *  - \c mm : minor version
 *  - \c rr : revision
 *  - \c xx : pre-release id (\c 0xff marks the final release)
 *
 * Example: \c 0x000801ff is version 0.8.1.
 *
 * @remark Use this macro for compile-time checks only; to check the
 *         version at runtime call RdKafka::version().
 */
#define RD_KAFKA_VERSION 0x010700ff
115 
116 /**
117  * @brief Returns the librdkafka version as integer.
118  *
119  * @sa See RD_KAFKA_VERSION for how to parse the integer format.
120  */
121 RD_EXPORT
122 int          version ();
123 
124 /**
125  * @brief Returns the librdkafka version as string.
126  */
127 RD_EXPORT
128 std::string  version_str();
129 
130 /**
131  * @brief Returns a CSV list of the supported debug contexts
132  *        for use with Conf::Set("debug", ..).
133  */
134 RD_EXPORT
135 std::string get_debug_contexts();
136 
137 /**
138  * @brief Wait for all rd_kafka_t objects to be destroyed.
139  *
140  * @returns 0 if all kafka objects are now destroyed, or -1 if the
141  * timeout was reached.
142  * Since RdKafka handle deletion is an asynch operation the
143  * \p wait_destroyed() function can be used for applications where
144  * a clean shutdown is required.
145  */
146 RD_EXPORT
147 int          wait_destroyed(int timeout_ms);
148 
149 /**
150  * @brief Allocate memory using the same allocator librdkafka uses.
151  *
152  * This is typically an abstraction for the malloc(3) call and makes sure
153  * the application can use the same memory allocator as librdkafka for
154  * allocating pointers that are used by librdkafka.
155  *
156  * @remark Memory allocated by mem_malloc() must be freed using
157  *         mem_free().
158  */
159 RD_EXPORT
160 void *mem_malloc (size_t size);
161 
162 /**
163  * @brief Free pointer returned by librdkafka
164  *
165  * This is typically an abstraction for the free(3) call and makes sure
166  * the application can use the same memory allocator as librdkafka for
167  * freeing pointers returned by librdkafka.
168  *
169  * In standard setups it is usually not necessary to use this interface
170  * rather than the free(3) function.
171  *
172  * @remark mem_free() must only be used for pointers returned by APIs
173  *         that explicitly mention using this function for freeing.
174  */
175 RD_EXPORT
176 void mem_free (void *ptr);
177 
178 /**@}*/
179 
180 
181 
182 /**
183  * @name Constants, errors, types
184  * @{
185  *
186  *
187  */
188 
/**
 * @brief Error codes.
 *
 * The negative error codes delimited by two underscores
 * (\c ERR__..) denote errors internal to librdkafka and are
 * displayed as \c \"Local: \<error string..\>\", while the error codes
 * delimited by a single underscore (\c ERR_..) denote broker
 * errors and are displayed as \c \"Broker: \<error string..\>\".
 *
 * The legacy names \c ERR_GROUP_LOAD_IN_PROGRESS,
 * \c ERR_GROUP_COORDINATOR_NOT_AVAILABLE and
 * \c ERR_NOT_COORDINATOR_FOR_GROUP are real enumerators that alias the
 * corresponding \c ERR_COORDINATOR.. / \c ERR_NOT_COORDINATOR values.
 * They are intentionally not preprocessor macros, so they obey namespace
 * scoping and do not rewrite identifiers in application code.
 *
 * @sa Use RdKafka::err2str() to translate an error code to a human readable
 *     string
 */
enum ErrorCode {
  /* Internal errors to rdkafka: */
  /** Begin internal error codes */
  ERR__BEGIN = -200,
  /** Received message is incorrect */
  ERR__BAD_MSG = -199,
  /** Bad/unknown compression */
  ERR__BAD_COMPRESSION = -198,
  /** Broker is going away */
  ERR__DESTROY = -197,
  /** Generic failure */
  ERR__FAIL = -196,
  /** Broker transport failure */
  ERR__TRANSPORT = -195,
  /** Critical system resource */
  ERR__CRIT_SYS_RESOURCE = -194,
  /** Failed to resolve broker */
  ERR__RESOLVE = -193,
  /** Produced message timed out */
  ERR__MSG_TIMED_OUT = -192,
  /** Reached the end of the topic+partition queue on
   *  the broker. Not really an error.
   *  This event is disabled by default,
   *  see the `enable.partition.eof` configuration property. */
  ERR__PARTITION_EOF = -191,
  /** Permanent: Partition does not exist in cluster. */
  ERR__UNKNOWN_PARTITION = -190,
  /** File or filesystem error */
  ERR__FS = -189,
  /** Permanent: Topic does not exist in cluster. */
  ERR__UNKNOWN_TOPIC = -188,
  /** All broker connections are down. */
  ERR__ALL_BROKERS_DOWN = -187,
  /** Invalid argument, or invalid configuration */
  ERR__INVALID_ARG = -186,
  /** Operation timed out */
  ERR__TIMED_OUT = -185,
  /** Queue is full */
  ERR__QUEUE_FULL = -184,
  /** ISR count < required.acks */
  ERR__ISR_INSUFF = -183,
  /** Broker node update */
  ERR__NODE_UPDATE = -182,
  /** SSL error */
  ERR__SSL = -181,
  /** Waiting for coordinator to become available. */
  ERR__WAIT_COORD = -180,
  /** Unknown client group */
  ERR__UNKNOWN_GROUP = -179,
  /** Operation in progress */
  ERR__IN_PROGRESS = -178,
  /** Previous operation in progress, wait for it to finish. */
  ERR__PREV_IN_PROGRESS = -177,
  /** This operation would interfere with an existing subscription */
  ERR__EXISTING_SUBSCRIPTION = -176,
  /** Assigned partitions (rebalance_cb) */
  ERR__ASSIGN_PARTITIONS = -175,
  /** Revoked partitions (rebalance_cb) */
  ERR__REVOKE_PARTITIONS = -174,
  /** Conflicting use */
  ERR__CONFLICT = -173,
  /** Wrong state */
  ERR__STATE = -172,
  /** Unknown protocol */
  ERR__UNKNOWN_PROTOCOL = -171,
  /** Not implemented */
  ERR__NOT_IMPLEMENTED = -170,
  /** Authentication failure */
  ERR__AUTHENTICATION = -169,
  /** No stored offset */
  ERR__NO_OFFSET = -168,
  /** Outdated */
  ERR__OUTDATED = -167,
  /** Timed out in queue */
  ERR__TIMED_OUT_QUEUE = -166,
  /** Feature not supported by broker */
  ERR__UNSUPPORTED_FEATURE = -165,
  /** Awaiting cache update */
  ERR__WAIT_CACHE = -164,
  /** Operation interrupted */
  ERR__INTR = -163,
  /** Key serialization error */
  ERR__KEY_SERIALIZATION = -162,
  /** Value serialization error */
  ERR__VALUE_SERIALIZATION = -161,
  /** Key deserialization error */
  ERR__KEY_DESERIALIZATION = -160,
  /** Value deserialization error */
  ERR__VALUE_DESERIALIZATION = -159,
  /** Partial response */
  ERR__PARTIAL = -158,
  /** Modification attempted on read-only object */
  ERR__READ_ONLY = -157,
  /** No such entry / item not found */
  ERR__NOENT = -156,
  /** Read underflow */
  ERR__UNDERFLOW = -155,
  /** Invalid type */
  ERR__INVALID_TYPE = -154,
  /** Retry operation */
  ERR__RETRY = -153,
  /** Purged in queue */
  ERR__PURGE_QUEUE = -152,
  /** Purged in flight */
  ERR__PURGE_INFLIGHT = -151,
  /** Fatal error: see RdKafka::Handle::fatal_error() */
  ERR__FATAL = -150,
  /** Inconsistent state */
  ERR__INCONSISTENT = -149,
  /** Gap-less ordering would not be guaranteed if proceeding */
  ERR__GAPLESS_GUARANTEE = -148,
  /** Maximum poll interval exceeded */
  ERR__MAX_POLL_EXCEEDED = -147,
  /** Unknown broker */
  ERR__UNKNOWN_BROKER = -146,
  /** Functionality not configured */
  ERR__NOT_CONFIGURED = -145,
  /** Instance has been fenced */
  ERR__FENCED = -144,
  /** Application generated error */
  ERR__APPLICATION = -143,
  /** Assignment lost */
  ERR__ASSIGNMENT_LOST = -142,
  /** No operation performed */
  ERR__NOOP = -141,
  /** No offset to automatically reset to */
  ERR__AUTO_OFFSET_RESET = -140,

  /** End internal error codes */
  ERR__END = -100,

  /* Kafka broker errors: */
  /** Unknown broker error */
  ERR_UNKNOWN = -1,
  /** Success */
  ERR_NO_ERROR = 0,
  /** Offset out of range */
  ERR_OFFSET_OUT_OF_RANGE = 1,
  /** Invalid message */
  ERR_INVALID_MSG = 2,
  /** Unknown topic or partition */
  ERR_UNKNOWN_TOPIC_OR_PART = 3,
  /** Invalid message size */
  ERR_INVALID_MSG_SIZE = 4,
  /** Leader not available */
  ERR_LEADER_NOT_AVAILABLE = 5,
  /** Not leader for partition */
  ERR_NOT_LEADER_FOR_PARTITION = 6,
  /** Request timed out */
  ERR_REQUEST_TIMED_OUT = 7,
  /** Broker not available */
  ERR_BROKER_NOT_AVAILABLE = 8,
  /** Replica not available */
  ERR_REPLICA_NOT_AVAILABLE = 9,
  /** Message size too large */
  ERR_MSG_SIZE_TOO_LARGE = 10,
  /** StaleControllerEpochCode */
  ERR_STALE_CTRL_EPOCH = 11,
  /** Offset metadata string too large */
  ERR_OFFSET_METADATA_TOO_LARGE = 12,
  /** Broker disconnected before response received */
  ERR_NETWORK_EXCEPTION = 13,
  /** Coordinator load in progress */
  ERR_COORDINATOR_LOAD_IN_PROGRESS = 14,
  /** Group coordinator load in progress.
   *  Alias of ERR_COORDINATOR_LOAD_IN_PROGRESS (same value). */
  ERR_GROUP_LOAD_IN_PROGRESS = ERR_COORDINATOR_LOAD_IN_PROGRESS,
  /** Coordinator not available */
  ERR_COORDINATOR_NOT_AVAILABLE = 15,
  /** Group coordinator not available.
   *  Alias of ERR_COORDINATOR_NOT_AVAILABLE (same value). */
  ERR_GROUP_COORDINATOR_NOT_AVAILABLE = ERR_COORDINATOR_NOT_AVAILABLE,
  /** Not coordinator */
  ERR_NOT_COORDINATOR = 16,
  /** Not coordinator for group.
   *  Alias of ERR_NOT_COORDINATOR (same value). */
  ERR_NOT_COORDINATOR_FOR_GROUP = ERR_NOT_COORDINATOR,
  /** Invalid topic */
  ERR_TOPIC_EXCEPTION = 17,
  /** Message batch larger than configured server segment size */
  ERR_RECORD_LIST_TOO_LARGE = 18,
  /** Not enough in-sync replicas */
  ERR_NOT_ENOUGH_REPLICAS = 19,
  /** Message(s) written to insufficient number of in-sync replicas */
  ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
  /** Invalid required acks value */
  ERR_INVALID_REQUIRED_ACKS = 21,
  /** Specified group generation id is not valid */
  ERR_ILLEGAL_GENERATION = 22,
  /** Inconsistent group protocol */
  ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
  /** Invalid group.id */
  ERR_INVALID_GROUP_ID = 24,
  /** Unknown member */
  ERR_UNKNOWN_MEMBER_ID = 25,
  /** Invalid session timeout */
  ERR_INVALID_SESSION_TIMEOUT = 26,
  /** Group rebalance in progress */
  ERR_REBALANCE_IN_PROGRESS = 27,
  /** Commit offset data size is not valid */
  ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
  /** Topic authorization failed */
  ERR_TOPIC_AUTHORIZATION_FAILED = 29,
  /** Group authorization failed */
  ERR_GROUP_AUTHORIZATION_FAILED = 30,
  /** Cluster authorization failed */
  ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
  /** Invalid timestamp */
  ERR_INVALID_TIMESTAMP = 32,
  /** Unsupported SASL mechanism */
  ERR_UNSUPPORTED_SASL_MECHANISM = 33,
  /** Illegal SASL state */
  ERR_ILLEGAL_SASL_STATE = 34,
  /** Unsupported version */
  ERR_UNSUPPORTED_VERSION = 35,
  /** Topic already exists */
  ERR_TOPIC_ALREADY_EXISTS = 36,
  /** Invalid number of partitions */
  ERR_INVALID_PARTITIONS = 37,
  /** Invalid replication factor */
  ERR_INVALID_REPLICATION_FACTOR = 38,
  /** Invalid replica assignment */
  ERR_INVALID_REPLICA_ASSIGNMENT = 39,
  /** Invalid config */
  ERR_INVALID_CONFIG = 40,
  /** Not controller for cluster */
  ERR_NOT_CONTROLLER = 41,
  /** Invalid request */
  ERR_INVALID_REQUEST = 42,
  /** Message format on broker does not support request */
  ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
  /** Policy violation */
  ERR_POLICY_VIOLATION = 44,
  /** Broker received an out of order sequence number */
  ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
  /** Broker received a duplicate sequence number */
  ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
  /** Producer attempted an operation with an old epoch */
  ERR_INVALID_PRODUCER_EPOCH = 47,
  /** Producer attempted a transactional operation in an invalid state */
  ERR_INVALID_TXN_STATE = 48,
  /** Producer attempted to use a producer id which is not
   *  currently assigned to its transactional id */
  ERR_INVALID_PRODUCER_ID_MAPPING = 49,
  /** Transaction timeout is larger than the maximum
   *  value allowed by the broker's max.transaction.timeout.ms */
  ERR_INVALID_TRANSACTION_TIMEOUT = 50,
  /** Producer attempted to update a transaction while another
   *  concurrent operation on the same transaction was ongoing */
  ERR_CONCURRENT_TRANSACTIONS = 51,
  /** Indicates that the transaction coordinator sending a
   *  WriteTxnMarker is no longer the current coordinator for a
   *  given producer */
  ERR_TRANSACTION_COORDINATOR_FENCED = 52,
  /** Transactional Id authorization failed */
  ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
  /** Security features are disabled */
  ERR_SECURITY_DISABLED = 54,
  /** Operation not attempted */
  ERR_OPERATION_NOT_ATTEMPTED = 55,
  /** Disk error when trying to access log file on the disk */
  ERR_KAFKA_STORAGE_ERROR = 56,
  /** The user-specified log directory is not found in the broker config */
  ERR_LOG_DIR_NOT_FOUND = 57,
  /** SASL Authentication failed */
  ERR_SASL_AUTHENTICATION_FAILED = 58,
  /** Unknown Producer Id */
  ERR_UNKNOWN_PRODUCER_ID = 59,
  /** Partition reassignment is in progress */
  ERR_REASSIGNMENT_IN_PROGRESS = 60,
  /** Delegation Token feature is not enabled */
  ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61,
  /** Delegation Token is not found on server */
  ERR_DELEGATION_TOKEN_NOT_FOUND = 62,
  /** Specified Principal is not valid Owner/Renewer */
  ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63,
  /** Delegation Token requests are not allowed on this connection */
  ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64,
  /** Delegation Token authorization failed */
  ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65,
  /** Delegation Token is expired */
  ERR_DELEGATION_TOKEN_EXPIRED = 66,
  /** Supplied principalType is not supported */
  ERR_INVALID_PRINCIPAL_TYPE = 67,
  /** The group is not empty */
  ERR_NON_EMPTY_GROUP = 68,
  /** The group id does not exist */
  ERR_GROUP_ID_NOT_FOUND = 69,
  /** The fetch session ID was not found */
  ERR_FETCH_SESSION_ID_NOT_FOUND = 70,
  /** The fetch session epoch is invalid */
  ERR_INVALID_FETCH_SESSION_EPOCH = 71,
  /** No matching listener */
  ERR_LISTENER_NOT_FOUND = 72,
  /** Topic deletion is disabled */
  ERR_TOPIC_DELETION_DISABLED = 73,
  /** Leader epoch is older than broker epoch */
  ERR_FENCED_LEADER_EPOCH = 74,
  /** Leader epoch is newer than broker epoch */
  ERR_UNKNOWN_LEADER_EPOCH = 75,
  /** Unsupported compression type */
  ERR_UNSUPPORTED_COMPRESSION_TYPE = 76,
  /** Broker epoch has changed */
  ERR_STALE_BROKER_EPOCH = 77,
  /** Leader high watermark is not caught up */
  ERR_OFFSET_NOT_AVAILABLE = 78,
  /** Group member needs a valid member ID */
  ERR_MEMBER_ID_REQUIRED = 79,
  /** Preferred leader was not available */
  ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80,
  /** Consumer group has reached maximum size */
  ERR_GROUP_MAX_SIZE_REACHED = 81,
  /** Static consumer fenced by other consumer with same
   *  group.instance.id. */
  ERR_FENCED_INSTANCE_ID = 82,
  /** Eligible partition leaders are not available */
  ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE = 83,
  /** Leader election not needed for topic partition */
  ERR_ELECTION_NOT_NEEDED = 84,
  /** No partition reassignment is in progress */
  ERR_NO_REASSIGNMENT_IN_PROGRESS = 85,
  /** Deleting offsets of a topic while the consumer group is
   *  subscribed to it */
  ERR_GROUP_SUBSCRIBED_TO_TOPIC = 86,
  /** Broker failed to validate record */
  ERR_INVALID_RECORD = 87,
  /** There are unstable offsets that need to be cleared */
  ERR_UNSTABLE_OFFSET_COMMIT = 88,
  /** Throttling quota has been exceeded */
  ERR_THROTTLING_QUOTA_EXCEEDED = 89,
  /** There is a newer producer with the same transactionalId
   *  which fences the current one */
  ERR_PRODUCER_FENCED = 90,
  /** Request illegally referred to resource that does not exist */
  ERR_RESOURCE_NOT_FOUND = 91,
  /** Request illegally referred to the same resource twice */
  ERR_DUPLICATE_RESOURCE = 92,
  /** Requested credential would not meet criteria for acceptability */
  ERR_UNACCEPTABLE_CREDENTIAL = 93,
  /** Indicates that either the sender or recipient of a
   *  voter-only request is not one of the expected voters */
  ERR_INCONSISTENT_VOTER_SET = 94,
  /** Invalid update version */
  ERR_INVALID_UPDATE_VERSION = 95,
  /** Unable to update finalized features due to server error */
  ERR_FEATURE_UPDATE_FAILED = 96,
  /** Request principal deserialization failed during forwarding */
  ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97
};
546 
547 
548 /**
549  * @brief Returns a human readable representation of a kafka error.
550  */
551 RD_EXPORT
552 std::string  err2str(RdKafka::ErrorCode err);
553 
554 
555 
/**
 * @enum CertificateType
 * @brief Kinds of SSL certificate.
 */
enum CertificateType {
  /** Client's public key */
  CERT_PUBLIC_KEY,
  /** Client's private key */
  CERT_PRIVATE_KEY,
  /** CA certificate */
  CERT_CA,
  /** Last enumerator; its value equals the number of types above. */
  CERT__CNT
};
566 
/**
 * @enum CertificateEncoding
 * @brief Encodings an SSL certificate may be supplied in.
 */
enum CertificateEncoding {
  /** PKCS#12 */
  CERT_ENC_PKCS12,
  /** DER / binary X.509 ASN1 */
  CERT_ENC_DER,
  /** PEM */
  CERT_ENC_PEM,
  /** Last enumerator; its value equals the number of encodings above. */
  CERT_ENC__CNT
};
577 
578 /**@} */
579 
580 
581 
/**@cond NO_DOC*/
/* Forward declarations of classes referenced before their definition
 * (listed alphabetically). */
class Event;
class Handle;
class Headers;
class KafkaConsumer;
class Message;
class Metadata;
class Producer;
class Queue;
class Topic;
class TopicPartition;
/**@endcond*/
595 
596 
597 /**
598  * @name Error class
599  * @{
600  *
601  */
602 
603 /**
604  * @brief The Error class is used as a return value from APIs to propagate
605  *        an error. The error consists of an error code which is to be used
606  *        programatically, an error string for showing to the user,
607  *        and various error flags that can be used programmatically to decide
608  *        how to handle the error; e.g., should the operation be retried,
609  *        was it a fatal error, etc.
610  *
611  * Error objects must be deleted explicitly to free its resources.
612  */
613 class RD_EXPORT Error {
614  public:
615 
616  /**
617   * @brief Create error object.
618   */
619  static Error *create (ErrorCode code, const std::string *errstr);
620 
621  virtual ~Error () { }
622 
623  /*
624   * Error accessor methods
625   */
626 
627  /**
628   * @returns the error code, e.g., RdKafka::ERR_UNKNOWN_MEMBER_ID.
629   */
630  virtual ErrorCode code () const = 0;
631 
632  /**
633   * @returns the error code name, e.g, "ERR_UNKNOWN_MEMBER_ID".
634   */
635  virtual std::string name () const = 0;
636 
637   /**
638    * @returns a human readable error string.
639    */
640  virtual std::string str () const = 0;
641 
642  /**
643   * @returns true if the error is a fatal error, indicating that the client
644   *          instance is no longer usable, else false.
645   */
646  virtual bool is_fatal () const = 0;
647 
648  /**
649   * @returns true if the operation may be retried, else false.
650   */
651  virtual bool is_retriable () const = 0;
652 
653  /**
654   * @returns true if the error is an abortable transaction error in which case
655   *          the application must call RdKafka::Producer::abort_transaction()
656   *          and start a new transaction with
657   *          RdKafka::Producer::begin_transaction() if it wishes to proceed
658   *          with transactions.
659   *          Else returns false.
660   *
661   * @remark The return value of this method is only valid for errors returned
662   *         by the transactional API.
663   */
664  virtual bool txn_requires_abort () const = 0;
665 };
666 
667 /**@}*/
668 
669 
670 /**
671  * @name Callback classes
672  * @{
673  *
674  *
675  * librdkafka uses (optional) callbacks to propagate information and
676  * delegate decisions to the application logic.
677  *
678  * An application must call RdKafka::poll() at regular intervals to
679  * serve queued callbacks.
680  */
681 
682 
683 /**
684  * @brief Delivery Report callback class
685  *
686  * The delivery report callback will be called once for each message
687  * accepted by RdKafka::Producer::produce() (et.al) with
688  * RdKafka::Message::err() set to indicate the result of the produce request.
689  *
690  * The callback is called when a message is succesfully produced or
691  * if librdkafka encountered a permanent failure, or the retry counter for
692  * temporary errors has been exhausted.
693  *
694  * An application must call RdKafka::poll() at regular intervals to
695  * serve queued delivery report callbacks.
696 
697  */
698 class RD_EXPORT DeliveryReportCb {
699  public:
700   /**
701    * @brief Delivery report callback.
702    */
703   virtual void dr_cb (Message &message) = 0;
704 
705   virtual ~DeliveryReportCb() { }
706 };
707 
708 
709 /**
710  * @brief SASL/OAUTHBEARER token refresh callback class
711  *
712  * The SASL/OAUTHBEARER token refresh callback is triggered via RdKafka::poll()
713  * whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved,
714  * typically based on the configuration defined in \c sasl.oauthbearer.config.
715  *
716  * The \c oauthbearer_config argument is the value of the
717  * \c sasl.oauthbearer.config configuration property.
718  *
719  * The callback should invoke RdKafka::Handle::oauthbearer_set_token() or
720  * RdKafka::Handle::oauthbearer_set_token_failure() to indicate success or
721  * failure, respectively.
722  *
723  * The refresh operation is eventable and may be received when an event
724  * callback handler is set with an event type of
725  * \c RdKafka::Event::EVENT_OAUTHBEARER_TOKEN_REFRESH.
726  *
727  * Note that before any SASL/OAUTHBEARER broker connection can succeed the
728  * application must call RdKafka::Handle::oauthbearer_set_token() once -- either
729  * directly or, more typically, by invoking RdKafka::poll() -- in order to
730  * cause retrieval of an initial token to occur.
731  *
732  * An application must call RdKafka::poll() at regular intervals to
733  * serve queued SASL/OAUTHBEARER token refresh callbacks (when
734  * OAUTHBEARER is the SASL mechanism).
735  */
736 class RD_EXPORT OAuthBearerTokenRefreshCb {
737  public:
738   /**
739    * @brief SASL/OAUTHBEARER token refresh callback class.
740    *
741    * @param handle The RdKafka::Handle which requires a refreshed token.
742    * @param oauthbearer_config The value of the
743    * \p sasl.oauthbearer.config configuration property for \p handle.
744    */
745   virtual void oauthbearer_token_refresh_cb (RdKafka::Handle* handle,
746                                              const std::string &oauthbearer_config) = 0;
747 
748   virtual ~OAuthBearerTokenRefreshCb() { }
749 };
750 
751 
752 /**
753  * @brief Partitioner callback class
754  *
755  * Generic partitioner callback class for implementing custom partitioners.
756  *
757  * @sa RdKafka::Conf::set() \c "partitioner_cb"
758  */
759 class RD_EXPORT PartitionerCb {
760  public:
761   /**
762    * @brief Partitioner callback
763    *
764    * Return the partition to use for \p key in \p topic.
765    *
766    * The \p msg_opaque is the same \p msg_opaque provided in the
767    * RdKafka::Producer::produce() call.
768    *
769    * @remark \p key may be NULL or the empty.
770    *
771    * @returns Must return a value between 0 and \p partition_cnt (non-inclusive).
772    *          May return RD_KAFKA_PARTITION_UA (-1) if partitioning failed.
773    *
774    * @sa The callback may use RdKafka::Topic::partition_available() to check
775    *     if a partition has an active leader broker.
776    */
777   virtual int32_t partitioner_cb (const Topic *topic,
778                                   const std::string *key,
779                                   int32_t partition_cnt,
780                                   void *msg_opaque) = 0;
781 
782   virtual ~PartitionerCb() { }
783 };
784 
785 /**
786  * @brief  Variant partitioner with key pointer
787  *
788  */
789 class PartitionerKeyPointerCb {
790  public:
791   /**
792    * @brief Variant partitioner callback that gets \p key as pointer and length
793    *        instead of as a const std::string *.
794    *
795    * @remark \p key may be NULL or have \p key_len 0.
796    *
797    * @sa See RdKafka::PartitionerCb::partitioner_cb() for exact semantics
798    */
799   virtual int32_t partitioner_cb (const Topic *topic,
800                                   const void *key,
801                                   size_t key_len,
802                                   int32_t partition_cnt,
803                                   void *msg_opaque) = 0;
804 
805   virtual ~PartitionerKeyPointerCb() { }
806 };
807 
808 
809 
810 /**
811  * @brief Event callback class
812  *
813  * Events are a generic interface for propagating errors, statistics, logs, etc
814  * from librdkafka to the application.
815  *
816  * @sa RdKafka::Event
817  */
818 class RD_EXPORT EventCb {
819  public:
820   /**
821    * @brief Event callback
822    *
823    * @sa RdKafka::Event
824    */
825   virtual void event_cb (Event &event) = 0;
826 
827   virtual ~EventCb() { }
828 };
829 
830 
831 /**
832  * @brief Event object class as passed to the EventCb callback.
833  */
834 class RD_EXPORT Event {
835  public:
836   /** @brief Event type */
837   enum Type {
838     EVENT_ERROR,     /**< Event is an error condition */
839     EVENT_STATS,     /**< Event is a statistics JSON document */
840     EVENT_LOG,       /**< Event is a log message */
841     EVENT_THROTTLE   /**< Event is a throttle level signaling from the broker */
842   };
843 
844   /** @brief EVENT_LOG severities (conforms to syslog(3) severities) */
845   enum Severity {
846     EVENT_SEVERITY_EMERG = 0,     /**< Corresponds to syslog LOG_EMERG */
847     EVENT_SEVERITY_ALERT = 1,     /**< Corresponds to syslog LOG_ALERT */
848     EVENT_SEVERITY_CRITICAL = 2,  /**< Corresponds to syslog LOG_CRIT */
849     EVENT_SEVERITY_ERROR = 3,     /**< Corresponds to syslog LOG_ERR */
850     EVENT_SEVERITY_WARNING = 4,   /**< Corresponds to syslog LOG_WARNING */
851     EVENT_SEVERITY_NOTICE = 5,    /**< Corresponds to syslog LOG_NOTICE */
852     EVENT_SEVERITY_INFO = 6,      /**< Corresponds to syslog LOG_INFO */
853     EVENT_SEVERITY_DEBUG = 7      /**< Corresponds to syslog LOG_DEBUG */
854   };
855 
856   virtual ~Event () { }
857 
858   /*
859    * Event accessor methods (all pure virtual)
860    */
861 
862   /**
863    * @returns The event type.
864    * @remark Applies to all event types.
865    */
866   virtual Type        type () const = 0;
867 
868   /**
869    * @returns Event error, if any.
870    * @remark Applies to all event types except THROTTLE.
871    */
872   virtual ErrorCode   err () const = 0;
873 
874   /**
875    * @returns Log severity level.
876    * @remark Applies to LOG event type.
877    */
878   virtual Severity    severity () const = 0;
879 
880   /**
881    * @returns Log facility string.
882    * @remark Applies to LOG event type.
883    */
884   virtual std::string fac () const = 0;
885 
886   /**
887    * @returns Event string, the content depends on the event type:
888    *
889    * \c EVENT_LOG: Log message string.
890    * \c EVENT_STATS: JSON object (as string).
891    *
892    * @remark Applies to LOG and STATS event types.
893    */
894   virtual std::string str () const = 0;
895 
896   /**
897    * @returns Throttle time in milliseconds.
898    * @remark Applies to THROTTLE event type.
899    */
900   virtual int         throttle_time () const = 0;
901 
902   /**
903    * @returns Throttling broker's name.
904    * @remark Applies to THROTTLE event type.
905    */
906   virtual std::string broker_name () const = 0;
907 
908   /**
909    * @returns Throttling broker's id.
910    * @remark Applies to THROTTLE event type.
911    */
912   virtual int         broker_id () const = 0;
913 
914 
915   /**
916    * @returns true if this is a fatal error.
917    * @remark Applies to ERROR event type.
918    * @sa RdKafka::Handle::fatal_error()
919    */
920   virtual bool        fatal () const = 0;
921 };
922 
923 
924 
925 /**
926  * @brief Consume callback class; applications implement consume_cb().
927  */
928 class RD_EXPORT ConsumeCb {
929  public:
930   /**
931    * @brief The consume callback is used with the
932    *        RdKafka::Consumer::consume_callback()
933    *        methods and will be called for each consumed \p message.
934    *
935    * The callback interface is optional but provides increased performance.
936    */
937   virtual void consume_cb (Message &message, void *opaque) = 0;
938 
939   virtual ~ConsumeCb() { }
940 };
941 
942 
943 /**
944  * @brief \b KafkaConsumer: Rebalance callback class
945  */
946 class RD_EXPORT RebalanceCb {
947 public:
948   /**
949    * @brief Group rebalance callback for use with RdKafka::KafkaConsumer
950    *
951    * Registering a \p rebalance_cb turns off librdkafka's automatic
952    * partition assignment/revocation and instead delegates that responsibility
953    * to the application's \p rebalance_cb.
954    *
955    * The rebalance callback is responsible for updating librdkafka's
956    * assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS
957    * and RdKafka::ERR__REVOKE_PARTITIONS but should also be able to handle
958    * arbitrary rebalancing failures where \p err is neither of those.
959    * @remark In this latter case (arbitrary error), the application must
960    *         call unassign() to synchronize state.
961    *
962    * For eager/non-cooperative `partition.assignment.strategy` assignors,
963    * such as `range` and `roundrobin`, the application must use
964    * assign() to set and unassign() to clear the entire assignment.
965    * For the cooperative assignors, such as `cooperative-sticky`, the
966    * application must use incremental_assign() for ERR__ASSIGN_PARTITIONS and
967    * incremental_unassign() for ERR__REVOKE_PARTITIONS.
968    *
969    * Without a rebalance callback this is done automatically by librdkafka
970    * but registering a rebalance callback gives the application flexibility
971    * in performing other operations along with the assigning/revocation,
972    * such as fetching offsets from an alternate location (on assign)
973    * or manually committing offsets (on revoke).
974    *
975    * @sa RdKafka::KafkaConsumer::assign()
976    * @sa RdKafka::KafkaConsumer::incremental_assign()
977    * @sa RdKafka::KafkaConsumer::incremental_unassign()
978    * @sa RdKafka::KafkaConsumer::assignment_lost()
979    * @sa RdKafka::KafkaConsumer::rebalance_protocol()
980    *
981    * The following example shows the application's responsibilities:
982    * @code
983    *    class MyRebalanceCb : public RdKafka::RebalanceCb {
984    *     public:
985    *      void rebalance_cb (RdKafka::KafkaConsumer *consumer,
986    *                    RdKafka::ErrorCode err,
987    *                    std::vector<RdKafka::TopicPartition*> &partitions) {
988    *         if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
989    *           // application may load offsets from arbitrary external
990    *           // storage here and update \p partitions
991    *           if (consumer->rebalance_protocol() == "COOPERATIVE")
992    *             consumer->incremental_assign(partitions);
993    *           else
994    *             consumer->assign(partitions);
995    *
996    *         } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
997    *           // Application may commit offsets manually here
998    *           // if auto.commit.enable=false
999    *           if (consumer->rebalance_protocol() == "COOPERATIVE")
1000    *             consumer->incremental_unassign(partitions);
1001    *           else
1002    *             consumer->unassign();
1003    *
1004    *         } else {
1005    *           std::cerr << "Rebalancing error: " <<
1006    *                        RdKafka::err2str(err) << std::endl;
1007    *           consumer->unassign();
1008    *         }
1009    *     }
1010    *  };
1011    * @endcode
1012    *
1013    * @remark The above example lacks error handling for assign calls, see
1014    *         the examples/ directory.
1015    */
1016  virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
1017                             RdKafka::ErrorCode err,
1018                             std::vector<TopicPartition*>&partitions) = 0;
1019 
1020  virtual ~RebalanceCb() { }
1021 };
1022 
1023 
1024 /**
1025  * @brief Offset Commit callback class
1026  */
1027 class RD_EXPORT OffsetCommitCb {
1028 public:
1029   /**
1030    * @brief Set offset commit callback for use with consumer groups
1031    *
1032    * The results of automatic or manual offset commits will be scheduled
1033    * for this callback and are served by RdKafka::KafkaConsumer::consume().
1034    *
1035    * If no partitions had valid offsets to commit this callback will be called
1036    * with \p err == ERR__NO_OFFSET which is not to be considered an error.
1037    *
1038    * The \p offsets list contains per-partition information:
1039    *   - \c topic      The topic committed
1040    *   - \c partition  The partition committed
1041    *   - \c offset     Committed offset (attempted)
1042    *   - \c err        Commit error
1043    */
1044   virtual void offset_commit_cb(RdKafka::ErrorCode err,
1045                                 std::vector<TopicPartition*>&offsets) = 0;
1046 
1047   virtual ~OffsetCommitCb() { }
1048 };
1049 
1050 
1051 
1052 /**
1053  * @brief SSL broker certificate verification class.
1054  *
1055  * @remark Class instance must outlive the RdKafka client instance.
1056  */
1057 class RD_EXPORT SslCertificateVerifyCb {
1058 public:
1059   /**
1060    * @brief SSL broker certificate verification callback.
1061    *
1062    * The verification callback is triggered from internal librdkafka threads
1063    * upon connecting to a broker. On each connection attempt the callback
1064    * will be called for each certificate in the broker's certificate chain,
1065    * starting at the root certificate, as long as the application callback
1066    * returns 1 (valid certificate).
1067    *
1068    * \p broker_name and \p broker_id correspond to the broker the connection
1069    * is being made to.
1070    * The \c x509_error argument indicates if OpenSSL's verification of
1071    * the certificate succeeded (0) or failed (an OpenSSL error code).
1072    * The application may set the SSL context error code by returning 0
1073    * from the verify callback and providing a non-zero SSL context error code
1074    * in \p x509_error.
1075    * If the verify callback sets \p x509_error to 0, returns 1, and the
1076    * original \p x509_error was non-zero, the error on the SSL context will
1077    * be cleared.
1078    * \p x509_error is always a valid pointer to an int.
1079    *
1080    * \p depth is the depth of the current certificate in the chain, starting
1081    * at the root certificate.
1082    *
1083    * The certificate itself is passed in binary DER format in \p buf of
1084    * size \p size.
1085    *
1086    * The callback must return 1 if verification succeeds, or return 0 if
1087    * verification fails and write a human-readable error message
1088    * to \p errstr.
1089    *
1090    * @warning This callback will be called from internal librdkafka threads.
1091    *
1092    * @remark See <openssl/x509_vfy.h> in the OpenSSL source distribution
1093    *         for a list of \p x509_error codes.
1094    */
1095   virtual bool ssl_cert_verify_cb (const std::string &broker_name,
1096                                    int32_t broker_id,
1097                                    int *x509_error,
1098                                    int depth,
1099                                    const char *buf, size_t size,
1100                                    std::string &errstr) = 0;
1101 
1102   virtual ~SslCertificateVerifyCb() {}
1103 };
1104 
1105 
1106 /**
1107  * @brief \b Portability: SocketCb callback class
1108  *
1109  */
1110 class RD_EXPORT SocketCb {
1111  public:
1112   /**
1113    * @brief Socket callback
1114    *
1115    * The socket callback is responsible for opening a socket
1116    * according to the supplied \p domain, \p type and \p protocol.
1117    * The socket shall be created with \c CLOEXEC set in a race-free fashion,
1118    * if possible.
1119    *
1120    * It is typically not required to register an alternative socket
1121    * implementation.
1122    *
1123    * @returns The socket file descriptor or -1 on error (\c errno must be set)
1124    */
1125   virtual int socket_cb (int domain, int type, int protocol) = 0;
1126 
1127   virtual ~SocketCb() { }
1128 };
1129 
1130 
1131 /**
1132  * @brief \b Portability: OpenCb callback class
1133  *
1134  */
1135 class RD_EXPORT OpenCb {
1136  public:
1137   /**
1138    * @brief Open callback
1139    * The open callback is responsible for opening the file specified by
1140    * \p path, using \p flags and \p mode.
1141    * The file shall be opened with \c CLOEXEC set in a race-free fashion, if
1142    * possible.
1143    *
1144    * It is typically not required to register an alternative open implementation.
1145    *
1146    * @remark Not currently available on native Win32
1147    */
1148   virtual int open_cb (const std::string &path, int flags, int mode) = 0;
1149 
1150   virtual ~OpenCb() { }
1151 };
1152 
1153 
1154 /**@}*/
1155 
1156 
1157 
1158 
1159 /**
1160  * @name Configuration interface
1161  * @{
1162  *
1163  */
1164 
1165 /**
1166  * @brief Configuration interface
1167  *
1168  * Holds either global or topic configuration that is passed to
1169  * RdKafka::Consumer::create(), RdKafka::Producer::create(),
1170  * RdKafka::KafkaConsumer::create(), etc.
1171  *
1172  * @sa CONFIGURATION.md for the full list of supported properties.
1173  */
1174 class RD_EXPORT Conf {
1175  public:
1176   /**
1177    * @brief Configuration object type
1178    */
1179   enum ConfType {
1180     CONF_GLOBAL, /**< Global configuration */
1181     CONF_TOPIC   /**< Topic specific configuration */
1182   };
1183 
1184   /**
1185    * @brief RdKafka::Conf::set() result code
1186    */
1187   enum ConfResult {
1188     CONF_UNKNOWN = -2,  /**< Unknown configuration property */
1189     CONF_INVALID = -1,  /**< Invalid configuration value */
1190     CONF_OK = 0         /**< Configuration property was successfully set */
1191   };
1192 
1193 
1194   /**
1195    * @brief Create configuration object
1196    */
1197   static Conf *create (ConfType type);
1198 
1199   virtual ~Conf () { }
1200 
1201   /**
1202    * @brief Set configuration property \p name to value \p value.
1203    *
1204    * Fallthrough:
1205    * Topic-level configuration properties may be set using this interface
1206    * in which case they are applied on the \c default_topic_conf.
1207    * If no \c default_topic_conf has been set one will be created.
1208    * Any subsequent set("default_topic_conf", ..) calls will
1209    * replace the current default topic configuration.
1210    *
1211    * @returns CONF_OK on success, else writes a human readable error
1212    *          description to \p errstr on error.
1213    */
1214   virtual Conf::ConfResult set (const std::string &name,
1215                                 const std::string &value,
1216                                 std::string &errstr) = 0;
1217 
1218   /** @brief Use with \p name = \c \"dr_cb\" */
1219   virtual Conf::ConfResult set (const std::string &name,
1220                                 DeliveryReportCb *dr_cb,
1221                                 std::string &errstr) = 0;
1222 
1223   /** @brief Use with \p name = \c \"oauthbearer_token_refresh_cb\" */
1224   virtual Conf::ConfResult set (const std::string &name,
1225                         OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
1226                         std::string &errstr) = 0;
1227 
1228   /** @brief Use with \p name = \c \"event_cb\" */
1229   virtual Conf::ConfResult set (const std::string &name,
1230                                 EventCb *event_cb,
1231                                 std::string &errstr) = 0;
1232 
1233   /** @brief Use with \p name = \c \"default_topic_conf\"
1234    *
1235    * Sets the default topic configuration to use for automatically
1236    * subscribed topics.
1237    *
1238    * @sa RdKafka::KafkaConsumer::subscribe()
1239    */
1240   virtual Conf::ConfResult set (const std::string &name,
1241                                 const Conf *topic_conf,
1242                                 std::string &errstr) = 0;
1243 
1244   /** @brief Use with \p name = \c \"partitioner_cb\" */
1245   virtual Conf::ConfResult set (const std::string &name,
1246                                 PartitionerCb *partitioner_cb,
1247                                 std::string &errstr) = 0;
1248 
1249   /** @brief Use with \p name = \c \"partitioner_key_pointer_cb\" */
1250   virtual Conf::ConfResult set (const std::string &name,
1251                                 PartitionerKeyPointerCb *partitioner_kp_cb,
1252                                 std::string &errstr) = 0;
1253 
1254   /** @brief Use with \p name = \c \"socket_cb\" */
1255   virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
1256                                 std::string &errstr) = 0;
1257 
1258   /** @brief Use with \p name = \c \"open_cb\" */
1259   virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
1260                                 std::string &errstr) = 0;
1261 
1262   /** @brief Use with \p name = \c \"rebalance_cb\" */
1263   virtual Conf::ConfResult set (const std::string &name,
1264                                 RebalanceCb *rebalance_cb,
1265                                 std::string &errstr) = 0;
1266 
1267   /** @brief Use with \p name = \c \"offset_commit_cb\" */
1268   virtual Conf::ConfResult set (const std::string &name,
1269                                 OffsetCommitCb *offset_commit_cb,
1270                                 std::string &errstr) = 0;
1271 
1272   /** @brief Use with \p name = \c \"ssl_cert_verify_cb\".
1273    *  @returns CONF_OK on success or CONF_INVALID if SSL is
1274    *           not supported in this build.
1275    */
1276   virtual Conf::ConfResult set(const std::string &name,
1277                                SslCertificateVerifyCb *ssl_cert_verify_cb,
1278                                std::string &errstr) = 0;
1279 
1280   /**
1281    * @brief Set certificate/key \p cert_type from the \p cert_enc encoded
1282    *        memory at \p buffer of \p size bytes.
1283    *
1284    * @param cert_type Certificate or key type to configure.
1285    * @param cert_enc  Buffer \p encoding type.
1286    * @param buffer Memory pointer to encoded certificate or key.
1287    *               The memory is not referenced after this function returns.
1288    * @param size Size of memory at \p buffer.
1289    * @param errstr A human-readable error string will be written to this string
1290    *               on failure.
1291    *
1292    * @returns CONF_OK on success or CONF_INVALID if the memory in
1293    *          \p buffer is of incorrect encoding, or if librdkafka
1294    *          was not built with SSL support.
1295    *
1296    * @remark Calling this method multiple times with the same \p cert_type
1297    *         will replace the previous value.
1298    *
1299    * @remark Calling this method with \p buffer set to NULL will clear the
1300    *         configuration for \p cert_type.
1301    *
1302    * @remark The private key may require a password, which must be specified
1303    *         with the `ssl.key.password` configuration property prior to
1304    *         calling this function.
1305    *
1306    * @remark Private and public keys in PEM format may also be set with the
1307    *         `ssl.key.pem` and `ssl.certificate.pem` configuration properties.
1308    */
1309   virtual Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type,
1310                                          RdKafka::CertificateEncoding cert_enc,
1311                                          const void *buffer, size_t size,
1312                                          std::string &errstr) = 0;
1313 
1314   /** @brief Query single configuration value
1315    *
1316    * Do not use this method to get callbacks registered by the configuration file.
1317    * Instead use the specific get() methods with the specific callback parameter in the signature.
1318    *
1319    * Fallthrough:
1320    * Topic-level configuration properties from the \c default_topic_conf
1321    * may be retrieved using this interface.
1322    *
1323    *  @returns CONF_OK if the property was previously set and
1324    *           returns the value in \p value. */
1325   virtual Conf::ConfResult get(const std::string &name,
1326 	  std::string &value) const = 0;
1327 
1328   /** @brief Query single configuration value
1329    *  @returns CONF_OK if the property was previously set and
1330    *           returns the value in \p dr_cb. */
1331   virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
1332 
1333   /** @brief Query single configuration value
1334    *  @returns CONF_OK if the property was previously set and
1335    *           returns the value in \p oauthbearer_token_refresh_cb. */
1336   virtual Conf::ConfResult get(
1337           OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;
1338 
1339   /** @brief Query single configuration value
1340    *  @returns CONF_OK if the property was previously set and
1341    *           returns the value in \p event_cb. */
1342   virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
1343 
1344   /** @brief Query single configuration value
1345    *  @returns CONF_OK if the property was previously set and
1346    *           returns the value in \p partitioner_cb. */
1347   virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
1348 
1349   /** @brief Query single configuration value
1350    *  @returns CONF_OK if the property was previously set and
1351    *           returns the value in \p partitioner_kp_cb. */
1352   virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
1353 
1354   /** @brief Query single configuration value
1355    *  @returns CONF_OK if the property was previously set and
1356    *           returns the value in \p socket_cb. */
1357   virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
1358 
1359   /** @brief Query single configuration value
1360    *  @returns CONF_OK if the property was previously set and
1361    *           returns the value in \p open_cb. */
1362   virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
1363 
1364   /** @brief Query single configuration value
1365    *  @returns CONF_OK if the property was previously set and
1366    *           returns the value in \p rebalance_cb. */
1367   virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
1368 
1369   /** @brief Query single configuration value
1370    *  @returns CONF_OK if the property was previously set and
1371    *           returns the value in \p offset_commit_cb. */
1372   virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
1373 
1374   /** @brief Query single configuration value; the callback is returned in \p ssl_cert_verify_cb. */
1375   virtual Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;
1376 
1377   /** @brief Dump configuration names and values to list containing
1378    *         name,value tuples */
1379   virtual std::list<std::string> *dump () = 0;
1380 
1381   /** @brief Use with \p name = \c \"consume_cb\" */
1382   virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
1383                                 std::string &errstr) = 0;
1384 
1385   /**
1386    * @brief Returns the underlying librdkafka C rd_kafka_conf_t handle.
1387    *
1388    * @warning Calling the C API on this handle is not recommended and there
1389    *          is no official support for it, but for cases where the C++ API
1390    *          does not provide the proper functionality this C handle can be
1391    *          used to interact directly with the core librdkafka API.
1392    *
1393    * @remark The lifetime of the returned pointer is the same as the Conf
1394    *         object this method is called on.
1395    *
1396    * @remark Include <rdkafka/rdkafka.h> prior to including
1397    *         <rdkafka/rdkafkacpp.h>
1398    *
1399    * @returns \c rd_kafka_conf_t* if this is a CONF_GLOBAL object, else NULL.
1400    */
1401   virtual struct rd_kafka_conf_s *c_ptr_global () = 0;
1402 
1403   /**
1404    * @brief Returns the underlying librdkafka C rd_kafka_topic_conf_t handle.
1405    *
1406    * @warning Calling the C API on this handle is not recommended and there
1407    *          is no official support for it, but for cases where the C++ API
1408    *          does not provide the proper functionality this C handle can be
1409    *          used to interact directly with the core librdkafka API.
1410    *
1411    * @remark The lifetime of the returned pointer is the same as the Conf
1412    *         object this method is called on.
1413    *
1414    * @remark Include <rdkafka/rdkafka.h> prior to including
1415    *         <rdkafka/rdkafkacpp.h>
1416    *
1417    * @returns \c rd_kafka_topic_conf_t* if this is a CONF_TOPIC object,
1418    *          else NULL.
1419    */
1420   virtual struct rd_kafka_topic_conf_s *c_ptr_topic () = 0;
1421 
1422   /**
1423    * @brief Set callback_data for ssl engine.
1424    *
1425    * @remark The \c ssl.engine.location configuration must be set for this
1426    *         to have effect.
1427    *
1428    * @remark The memory pointed to by \p value must remain valid for the
1429    *         lifetime of the configuration object and any Kafka clients that
1430    *         use it.
1431    *
1432    * @returns CONF_OK on success, else CONF_INVALID.
1433    */
1434   virtual Conf::ConfResult set_engine_callback_data (void *value,
1435                                                      std::string &errstr) = 0;
1436 };
1437 
1438 /**@}*/
1439 
1440 
1441 /**
1442  * @name Kafka base client handle
1443  * @{
1444  *
1445  */
1446 
1447 /**
1448  * @brief Base handle, super class for specific clients.
1449  */
1450 class RD_EXPORT Handle {
1451  public:
1452   virtual ~Handle() { }
1453 
1454   /** @returns the name of the handle */
1455   virtual const std::string name () const = 0;
1456 
1457   /**
1458    * @brief Returns the client's broker-assigned group member id
1459    *
1460    * @remark This currently requires the high-level KafkaConsumer
1461    *
1462    * @returns Last assigned member id, or empty string if not currently
1463    *          a group member.
1464    */
1465   virtual const std::string memberid () const = 0;
1466 
1467 
1468   /**
1469    * @brief Polls the provided kafka handle for events.
1470    *
1471    * Events will trigger application provided callbacks to be called.
1472    *
1473    * The \p timeout_ms argument specifies the maximum amount of time
1474    * (in milliseconds) that the call will block waiting for events.
1475    * For non-blocking calls, provide 0 as \p timeout_ms.
1476    * To wait indefinitely for events, provide -1.
1477    *
1478    * Events:
1479    *   - delivery report callbacks (if an RdKafka::DeliveryReportCb is configured) [producer]
1480    *   - event callbacks (if an RdKafka::EventCb is configured) [producer & consumer]
1481    *
1482    * @remark  An application should make sure to call poll() at regular
1483    *          intervals to serve any queued callbacks waiting to be called.
1484    *
1485    * @warning This method MUST NOT be used with the RdKafka::KafkaConsumer,
1486    *          use its RdKafka::KafkaConsumer::consume() instead.
1487    *
1488    * @returns the number of events served.
1489    */
1490   virtual int poll (int timeout_ms) = 0;
1491 
1492   /**
1493    * @brief  Returns the current out queue length
1494    *
1495    * The out queue contains messages and requests waiting to be sent to,
1496    * or acknowledged by, the broker.
1497    */
1498   virtual int outq_len () = 0;
1499 
1500   /**
1501    * @brief Request Metadata from broker.
1502    *
1503    * Parameters:
1504    *  \p all_topics  - if non-zero: request info about all topics in cluster,
1505    *                   if zero: only request info about locally known topics.
1506    *  \p only_rkt    - only request info about this topic
1507    *  \p metadatap   - pointer to hold metadata result.
1508    *                   The \p *metadatap pointer must be released with \c delete.
1509    *  \p timeout_ms  - maximum response time before failing.
1510    *
1511    * @returns RdKafka::ERR_NO_ERROR on success (in which case \p *metadatap
1512    * will be set), else RdKafka::ERR__TIMED_OUT on timeout or
1513    * other error code on error.
1514    */
1515   virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
1516                               Metadata **metadatap, int timeout_ms) = 0;
1517 
1518 
1519   /**
1520    * @brief Pause producing or consumption for the provided list of partitions.
1521    *
1522    * Success or error is returned per-partition in the \p partitions list.
1523    *
1524    * @returns ErrorCode::NO_ERROR
1525    *
1526    * @sa resume()
1527    */
1528   virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
1529 
1530 
1531   /**
1532    * @brief Resume producing or consumption for the provided list of partitions.
1533    *
1534    * Success or error is returned per-partition in the \p partitions list.
1535    *
1536    * @returns ErrorCode::NO_ERROR
1537    *
1538    * @sa pause()
1539    */
1540   virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
1541 
1542 
1543   /**
1544    * @brief Query broker for low (oldest/beginning)
1545    *        and high (newest/end) offsets for partition.
1546    *
1547    * Offsets are returned in \p *low and \p *high respectively.
1548    *
1549    * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure.
1550    */
1551   virtual ErrorCode query_watermark_offsets (const std::string &topic,
1552 					     int32_t partition,
1553 					     int64_t *low, int64_t *high,
1554 					     int timeout_ms) = 0;
1555 
1556   /**
1557    * @brief Get last known low (oldest/beginning)
1558    *        and high (newest/end) offsets for partition.
1559    *
1560    * The low offset is updated periodically (if statistics.interval.ms is set)
1561    * while the high offset is updated on each fetched message set from the
1562    * broker.
1563    *
1564    * If there is no cached offset (either low or high, or both) then
1565    * OFFSET_INVALID will be returned for the respective offset.
1566    *
1567    * Offsets are returned in \p *low and \p *high respectively.
1568    *
1569    * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure.
1570    *
1571    * @remark Shall only be used with an active consumer instance.
1572    */
1573   virtual ErrorCode get_watermark_offsets (const std::string &topic,
1574 					   int32_t partition,
1575 					   int64_t *low, int64_t *high) = 0;
1576 
1577 
1578   /**
1579    * @brief Look up the offsets for the given partitions by timestamp.
1580    *
1581    * The returned offset for each partition is the earliest offset whose
1582    * timestamp is greater than or equal to the given timestamp in the
1583    * corresponding partition.
1584    *
1585    * The timestamps to query are represented as \c offset in \p offsets
1586    * on input, and \c offset() will return the closest earlier offset
1587    * for the timestamp on output.
1588    *
1589    * Timestamps are expressed as milliseconds since epoch (UTC).
1590    *
1591    * The function will block for at most \p timeout_ms milliseconds.
1592    *
1593    * @remark Duplicate Topic+Partitions are not supported.
1594    * @remark Errors are also returned per TopicPartition, see \c err()
1595    *
1596    * @returns an error code for general errors, else RdKafka::ERR_NO_ERROR
1597    *          in which case per-partition errors might be set.
1598    */
1599   virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
1600                                      int timeout_ms) = 0;
1601 
1602 
1603   /**
1604    * @brief Retrieve queue for a given partition.
1605    *
1606    * @returns The fetch queue for the given partition if successful. Else,
1607    *          NULL is returned.
1608    *
1609    * @remark This function only works on consumers.
1610    */
1611   virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;
1612 
1613   /**
1614    * @brief Forward librdkafka logs (and debug) to the specified queue
1615    *        for serving with one of the ..poll() calls.
1616    *
1617    *        This allows an application to serve log callbacks (\c log_cb)
1618    *        in its thread of choice.
1619    *
1620    * @param queue Queue to forward logs to. If the value is NULL the logs
1621    *        are forwarded to the main queue.
1622    *
1623    * @remark The configuration property \c log.queue MUST also be set to true.
1624    *
1625    * @remark librdkafka maintains its own reference to the provided queue.
1626    *
1627    * @returns ERR_NO_ERROR on success or an error code on error.
1628    */
1629   virtual ErrorCode set_log_queue (Queue *queue) = 0;
1630 
1631   /**
1632    * @brief Cancels the current callback dispatcher (Handle::poll(),
1633    *        KafkaConsumer::consume(), etc).
1634    *
1635    * A callback may use this to force an immediate return to the calling
1636    * code (caller of e.g. Handle::poll()) without processing any further
1637    * events.
1638    *
1639    * @remark This function MUST ONLY be called from within a
1640    *         librdkafka callback.
1641    */
1642   virtual void yield () = 0;
1643 
1644   /**
1645    * @brief Returns the ClusterId as reported in broker metadata.
1646    *
1647    * @param timeout_ms If there is no cached value from metadata retrieval
1648    *                   then this specifies the maximum amount of time
1649    *                   (in milliseconds) the call will block waiting
1650    *                   for metadata to be retrieved.
1651    *                   Use 0 for non-blocking calls.
1652    *
1653    * @remark Requires broker version >=0.10.0 and api.version.request=true.
1654    *
1655    * @returns Last cached ClusterId, or empty string if no ClusterId could be
1656    *          retrieved in the allotted timespan.
1657    */
1658   virtual const std::string clusterid (int timeout_ms) = 0;
1659 
1660   /**
1661    * @brief Returns the underlying librdkafka C rd_kafka_t handle.
1662    *
1663    * @warning Calling the C API on this handle is not recommended and there
1664    *          is no official support for it, but for cases where the C++
1665    *          does not provide the proper functionality this C handle can be
1666    *          used to interact directly with the core librdkafka API.
1667    *
1668    * @remark The lifetime of the returned pointer is the same as the Handle
1669    *         object this method is called on.
1670    *
1671    * @remark Include <rdkafka/rdkafka.h> prior to including
1672    *         <rdkafka/rdkafkacpp.h>
1673    *
1674    * @returns \c rd_kafka_t*
1675    */
1676   virtual struct rd_kafka_s *c_ptr () = 0;
1677 
1678   /**
1679    * @brief Returns the current ControllerId (controller broker id)
1680    *        as reported in broker metadata.
1681    *
1682    * @param timeout_ms If there is no cached value from metadata retrieval
1683    *                   then this specifies the maximum amount of time
1684    *                   (in milliseconds) the call will block waiting
1685    *                   for metadata to be retrieved.
1686    *                   Use 0 for non-blocking calls.
1687    *
1688    * @remark Requires broker version >=0.10.0 and api.version.request=true.
1689    *
1690    * @returns Last cached ControllerId, or -1 if no ControllerId could be
1691    *          retrieved within \p timeout_ms.
1692    */
1693   virtual int32_t controllerid (int timeout_ms) = 0;
1694 
1695 
1696   /**
1697    * @brief Returns the first fatal error set on this client instance,
1698    *        or ERR_NO_ERROR if no fatal error has occurred.
1699    *
1700    * This function is to be used with the Idempotent Producer and
1701    * the Event class for \c EVENT_ERROR events to detect fatal errors.
1702    *
1703    * Generally all errors raised by the error event are to be considered
1704    * informational and temporary; the client will try to recover from all
1705    * errors in a graceful fashion (by retrying, etc.).
1706    *
1707    * However, some errors should logically be considered fatal to retain
1708    * consistency; in particular a set of errors that may occur when using the
1709    * Idempotent Producer and the in-order or exactly-once producer guarantees
1710    * can't be satisfied.
1711    *
1712    * @param[out] errstr A human readable error string if a fatal error was set.
1713    *
1714    * @returns ERR_NO_ERROR if no fatal error has been raised, else
1715    *          the error code of the first fatal error.
1716    */
1717   virtual ErrorCode fatal_error (std::string &errstr) const = 0;
1718 
1719   /**
1720    * @brief Set SASL/OAUTHBEARER token and metadata.
1721    *
1722    * @param token_value the mandatory token value to set, often (but not
1723    *  necessarily) a JWS compact serialization as per
1724    *  https://tools.ietf.org/html/rfc7515#section-3.1.
1725    * @param md_lifetime_ms when the token expires, in terms of the number of
1726    *  milliseconds since the epoch.
1727    * @param md_principal_name the Kafka principal name associated with the
1728    *  token.
1729    * @param extensions potentially empty SASL extension keys and values where
1730    *  element [i] is the key and [i+1] is the key's value, to be communicated
1731    *  to the broker as additional key-value pairs during the initial client
1732    *  response as per https://tools.ietf.org/html/rfc7628#section-3.1.  The
1733    *  number of SASL extension keys plus values must be a non-negative multiple
1734    *  of 2. Any provided keys and values are copied.
1735    * @param[out] errstr A human readable error string is written here, only if
1736    *  there is an error.
1737    *
1738    * The SASL/OAUTHBEARER token refresh callback should invoke
1739    * this method upon success. The extension keys must not include the reserved
1740    * key "`auth`", and all extension keys and values must conform to the
1741    * required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
1742    *
1743    *     key            = 1*(ALPHA)
1744    *     value          = *(VCHAR / SP / HTAB / CR / LF )
1745    *
1746    * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise \p errstr is set
1747    *              and one of the following is returned:<br>
1748    *          \c RdKafka::ERR__INVALID_ARG if any of the arguments are
1749    *              invalid;<br>
1750    *          \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not
1751    *              supported by this build;<br>
1752    *          \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is
1753    *              not configured as the client's authentication mechanism.<br>
1754    *
1755    * @sa oauthbearer_set_token_failure()
1756    * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb"
1757    */
1758   virtual ErrorCode oauthbearer_set_token (const std::string &token_value,
1759                                            int64_t md_lifetime_ms,
1760                                            const std::string &md_principal_name,
1761                                            const std::list<std::string> &extensions,
1762                                            std::string &errstr) = 0;
1763 
1764     /**
1765      * @brief SASL/OAUTHBEARER token refresh failure indicator.
1766      *
1767      * @param errstr human readable reason for failing to acquire a token.
1768      *
1769      * The SASL/OAUTHBEARER token refresh callback should
1770      * invoke this method upon failure to refresh the token.
1771      *
1772      * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise:<br>
1773      *          \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not
1774      *              supported by this build;<br>
1775      *          \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is
1776      *              not configured as the client's authentication mechanism.
1777      *
1778      * @sa oauthbearer_set_token()
1779      * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb"
1780      */
1781     virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr) = 0;
1782 
1783    /**
1784      * @brief Allocate memory using the same allocator librdkafka uses.
1785      *
1786      * This is typically an abstraction for the malloc(3) call and makes sure
1787      * the application can use the same memory allocator as librdkafka for
1788      * allocating pointers that are used by librdkafka.
1789      *
1790      * @param size Requested allocation size (in bytes, as with malloc(3)).
1791      * @remark Memory allocated by mem_malloc() must be freed using mem_free().
1792      */
1793     virtual void *mem_malloc (size_t size) = 0;
1794 
1795    /**
1796      * @brief Free a pointer returned by librdkafka.
1797      *
1798      * This is typically an abstraction for the free(3) call and makes sure
1799      * the application can use the same memory allocator as librdkafka for
1800      * freeing pointers returned by librdkafka.
1801      *
1802      * In standard setups it is usually not necessary to use this interface
1803      * rather than the free(3) function.
1804      *
1805      * @remark mem_free() must only be used for pointers returned by APIs
1806      *         that explicitly mention using this function for freeing.
1807      */
1808     virtual void mem_free (void *ptr) = 0;
1809 };
1810 
1811 
1812 /**@}*/
1813 
1814 
1815 /**
1816  * @name Topic and partition objects
1817  * @{
1818  *
1819  */
1820 
1821 /**
1822  * @brief Topic+Partition
1823  *
1824  * This is a generic type to hold a single partition and various
1825  * information about it (topic name, partition id, offset and error code).
1826  *
1827  * It is typically used with std::vector<RdKafka::TopicPartition*> to provide
1828  * a list of partitions for different operations.
1829  */
1830 class RD_EXPORT TopicPartition {
1831 public:
1832   /**
1833    * @brief Create topic+partition object for \p topic and \p partition.
1834    *
1835    * Use \c delete to destroy the returned object.
1836    */
1837   static TopicPartition *create (const std::string &topic, int partition);
1838 
1839   /**
1840    * @brief Create topic+partition object for \p topic and \p partition
1841    *        with offset \p offset.
1842    *
1843    * Use \c delete to destroy the returned object.
1844    */
1845   static TopicPartition *create (const std::string &topic, int partition,
1846                                  int64_t offset);
1847   /** @brief Pure virtual destructor: the class is an abstract interface. */
1848   virtual ~TopicPartition() = 0;
1849 
1850   /**
1851    * @brief Destroy/delete the TopicPartitions in \p partitions
1852    *        and clear the vector.
1853    */
1854   static void destroy (std::vector<TopicPartition*> &partitions);
1855 
1856   /** @returns topic name */
1857   virtual const std::string &topic () const = 0;
1858 
1859   /** @returns partition id */
1860   virtual int partition () const = 0;
1861 
1862   /** @returns offset (if applicable) */
1863   virtual int64_t offset () const = 0;
1864 
1865   /** @brief Set offset to \p offset */
1866   virtual void set_offset (int64_t offset) = 0;
1867 
1868   /** @returns error code (if applicable) */
1869   virtual ErrorCode err () const = 0;
1870 };
1871 
1872 
1873 
1874 /**
1875  * @brief Topic handle (abstract interface, see Topic::create())
1876  *
1877  */
1878 class RD_EXPORT Topic {
1879  public:
1880   /**
1881    * @brief Unassigned partition.
1882    *
1883    * The unassigned partition is used by the producer API for messages
1884    * that should be partitioned using the configured or default partitioner.
1885    */
1886   static const int32_t PARTITION_UA;
1887 
1888   /** @brief Special offsets */
1889   static const int64_t OFFSET_BEGINNING; /**< Consume from beginning */
1890   static const int64_t OFFSET_END; /**< Consume from end */
1891   static const int64_t OFFSET_STORED; /**< Use offset storage */
1892   static const int64_t OFFSET_INVALID; /**< Invalid offset */
1893 
1894 
1895   /**
1896    * @brief Creates a new topic handle for topic named \p topic_str.
1897    *
1898    * \p conf is an optional configuration for the topic that will be used
1899    * instead of the default topic configuration.
1900    * The \p conf object is reusable after this call.
1901    *
1902    * @returns the new topic handle or NULL on error (see \p errstr).
1903    */
1904   static Topic *create (Handle *base, const std::string &topic_str,
1905                         const Conf *conf, std::string &errstr);
1906 
1907   virtual ~Topic () = 0;
1908 
1909 
1910   /** @returns the topic name (returned by value) */
1911   virtual const std::string name () const = 0;
1912 
1913   /**
1914    * @returns true if \p partition is available for the topic (has leader).
1915    * @warning \b MUST \b ONLY be called from within a
1916    *          RdKafka::PartitionerCb callback.
1917    */
1918   virtual bool partition_available (int32_t partition) const = 0;
1919 
1920   /**
1921    * @brief Store offset \p offset + 1 for topic partition \p partition.
1922    * The offset will be committed (written) to the broker (or file) according
1923    * to \c auto.commit.interval.ms or the next manual offset-less commit call.
1924    *
1925    * @remark \c enable.auto.offset.store must be set to \c false when using
1926    *         this API.
1927    *
1928    * @returns RdKafka::ERR_NO_ERROR on success or an error code if none of the
1929    *          offsets could be stored.
1930    */
1931   virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
1932 
1933   /**
1934    * @brief Returns the underlying librdkafka C rd_kafka_topic_t handle.
1935    *
1936    * @warning Calling the C API on this handle is not recommended and there
1937    *          is no official support for it, but for cases where the C++ API
1938    *          does not provide the underlying functionality this C handle can be
1939    *          used to interact directly with the core librdkafka API.
1940    *
1941    * @remark The lifetime of the returned pointer is the same as the Topic
1942    *         object this method is called on.
1943    *
1944    * @remark Include <rdkafka/rdkafka.h> prior to including
1945    *         <rdkafka/rdkafkacpp.h>
1946    *
1947    * @returns \c rd_kafka_topic_t*
1948    */
1949   virtual struct rd_kafka_topic_s *c_ptr () = 0;
1950 };
1951 
1952 
1953 /**@}*/
1954 
1955 
1956 /**
1957  * @name Message object
1958  * @{
1959  *
1960  */
1961 
1962 
1963 /**
1964  * @brief Message timestamp object
1965  *
1966  * Holds a timestamp as the number of milliseconds since the epoch (UTC).
1967  *
1968  * The MessageTimestampType dictates the timestamp type or origin.
1969  *
1970  * @remark Requires Apache Kafka broker version >= 0.10.0
1971  *
1972  */
1973 
1974 class RD_EXPORT MessageTimestamp {
1975 public:
1976   /*! Message timestamp type (origin of the timestamp) */
1977   enum MessageTimestampType {
1978     MSG_TIMESTAMP_NOT_AVAILABLE,   /**< Timestamp not available */
1979     MSG_TIMESTAMP_CREATE_TIME,     /**< Message creation time (source) */
1980     MSG_TIMESTAMP_LOG_APPEND_TIME  /**< Message log append time (broker) */
1981   };
1982 
1983   MessageTimestampType type;       /**< Timestamp type */
1984   int64_t timestamp;               /**< Milliseconds since epoch (UTC). */
1985 };
1986 
1987 
1988 /**
1989  * @brief Headers object
1990  *
1991  * Represents the list of headers of a message.
1992  *
1993  * https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
1994  *
1995  * @remark Requires Apache Kafka >= 0.11.0 brokers
1996  */
1997 class RD_EXPORT Headers {
1998 public:
1999   virtual ~Headers() = 0;
2000 
2001   /**
2002    * @brief Header object
2003    *
2004    * This object represents a single Header with a key value pair
2005    * and an ErrorCode. The Header owns a private copy of its value.
2006    *
2007    * @remark dynamic allocation of this object is not supported.
2008    */
2009   class Header {
2010    public:
2011     /**
2012      * @brief Header object to encapsulate a single Header
2013      *
2014      * @param key the string value for the header key
2015      * @param value the bytes of the header value, or NULL
2016      * @param value_size the length in bytes of the header value
2017      *
2018      * @remark key and value are copied; the error code is ERR_NO_ERROR.
2019      *
2020      */
2021     Header(const std::string &key,
2022            const void *value,
2023            size_t value_size):
2024     key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
2025       value_ = copy_value(value, value_size);
2026     }
2027 
2028     /**
2029      * @brief Header object to encapsulate a single Header
2030      *
2031      * @param key the string value for the header key
2032      * @param value the bytes of the header value
2033      * @param value_size the length in bytes of the header value
2034      * @param err the error code, if one was returned
2035      *
2036      * @remark The error code is used for when the Header is constructed
2037      *         internally by using RdKafka::Headers::get_last which constructs
2038      *         a Header encapsulating the ErrorCode in the process.
2039      *         If err is set, the value and value_size fields will be undefined.
2040      */
2041     Header(const std::string &key,
2042            const void *value,
2043            size_t value_size,
2044            const RdKafka::ErrorCode err):
2045     key_(key), err_(err), value_(NULL), value_size_(value_size) {
2046       if (err == ERR_NO_ERROR)
2047         value_ = copy_value(value, value_size);
2048     }
2049 
2050     /**
2051      * @brief Copy constructor
2052      *
2053      * @param other Header to make a (deep) copy of.
2054      */
2055     Header(const Header &other):
2056     key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
2057       value_ = copy_value(other.value_, value_size_);
2058     }
2059 
2060     /**
2061      * @brief Assignment operator
2062      *
2063      * @param other Header to make a (deep) copy of.
2064      */
2065     Header& operator=(const Header &other)
2066     {
2067       if (&other == this) {
2068         return *this;
2069       }
2070 
2071       key_ = other.key_;
2072       err_ = other.err_;
2073       value_size_ = other.value_size_;
2074       /* Release the currently held value before copying the other one. */
2075       if (value_ != NULL)
2076         mem_free(value_);
2077 
2078       value_ = copy_value(other.value_, value_size_);
2079 
2080       return *this;
2081     }
2082 
2083     ~Header() {
2084       if (value_ != NULL)
2085         mem_free(value_);
2086     }
2087 
2088     /** @returns a copy of the key/name associated with this Header */
2089     std::string key() const {
2090       return key_;
2091     }
2092 
2093     /** @returns the binary value, or NULL */
2094     const void *value() const {
2095       return value_;
2096     }
2097 
2098     /** @returns the value cast to a nul-terminated C string,
2099      *           or NULL. */
2100     const char *value_string() const {
2101       return static_cast<const char *>(value_);
2102     }
2103 
2104     /** @returns the length of the value in bytes */
2105     size_t value_size() const {
2106       return value_size_;
2107     }
2108 
2109     /** @returns the error code of this Header (usually ERR_NO_ERROR) */
2110     RdKafka::ErrorCode err() const {
2111       return err_;
2112     }
2113 
2114  private:
2115     char *copy_value(const void *value, size_t value_size) {
2116       if (!value)
2117         return NULL;
2118       /* One extra byte so that the copy is always nul-terminated. */
2119       char *dest = (char *)mem_malloc(value_size + 1);
2120       memcpy(dest, (const char *)value, value_size);
2121       dest[value_size] = '\0';
2122 
2123       return dest;
2124     }
2125 
2126     std::string key_;
2127     RdKafka::ErrorCode err_;
2128     char *value_;
2129     size_t value_size_;
2130     void *operator new(size_t); /* Prevent dynamic allocation */
2131   };
2132 
2133   /**
2134    * @brief Create a new instance of the Headers object
2135    *
2136    * @returns an empty Headers list
2137    */
2138   static Headers *create();
2139 
2140   /**
2141    * @brief Create a new instance of the Headers object from a std::vector
2142    *
2143    * @param headers std::vector of RdKafka::Headers::Header objects.
2144    *                The headers are copied, not referenced.
2145    *
2146    * @returns a Headers list from std::vector set to the size of the std::vector
2147    */
2148   static Headers *create(const std::vector<Header> &headers);
2149 
2150   /**
2151    * @brief Adds a Header to the end of the list.
2152    *
2153    * @param key header key/name
2154    * @param value binary value, or NULL
2155    * @param value_size size of the value
2156    *
2157    * @returns an ErrorCode signalling success or failure to add the header.
2158    */
2159   virtual ErrorCode add(const std::string &key, const void *value,
2160                         size_t value_size) = 0;
2161 
2162   /**
2163    * @brief Adds a Header to the end of the list.
2164    *
2165    * Convenience method for adding a std::string as a value for the header.
2166    *
2167    * @param key header key/name
2168    * @param value value string
2169    *
2170    * @returns an ErrorCode signalling success or failure to add the header.
2171    */
2172   virtual ErrorCode add(const std::string &key, const std::string &value) = 0;
2173 
2174   /**
2175    * @brief Adds a Header to the end of the list.
2176    *
2177    * This method makes a copy of the passed header.
2178    *
2179    * @param header Existing header to copy
2180    *
2181    * @returns an ErrorCode signalling success or failure to add the header.
2182    */
2183   virtual ErrorCode add(const Header &header) = 0;
2184 
2185   /**
2186    * @brief Removes all the Headers of a given key
2187    *
2188    * @param key header key/name to remove
2189    *
2190    * @returns An ErrorCode signalling success or failure to remove the Header.
2191    */
2192   virtual ErrorCode remove(const std::string &key) = 0;
2193 
2194   /**
2195    * @brief Gets all of the Headers of a given key
2196    *
2197    * @param key header key/name
2198    *
2199    * @remark If duplicate keys exist this will return them all as a std::vector
2200    *
2201    * @returns a std::vector containing all the Headers of the given key.
2202    */
2203   virtual std::vector<Header> get(const std::string &key) const = 0;
2204 
2205   /**
2206    * @brief Gets the last occurrence of a Header of a given key
2207    *
2208    * @param key header key/name
2209    *
2210    * @remark This will only return the most recently added header
2211    *
2212    * @returns the Header if found, otherwise a Header with an err set to
2213    *          ERR__NOENT.
2214    */
2215   virtual Header get_last(const std::string &key) const = 0;
2216 
2217   /**
2218    * @brief Returns all Headers
2219    *
2220    * @returns a std::vector containing all of the Headers
2221    */
2222   virtual std::vector<Header> get_all() const = 0;
2223 
2224   /**
2225    * @returns the number of headers.
2226    */
2227   virtual size_t size() const = 0;
2228 };
2229 
2230 
2231 /**
2232  * @brief Message object
2233  *
2234  * This object represents either a single consumed or produced message,
2235  * or an event (\p err() is set).
2236  *
2237  * An application must check RdKafka::Message::err() to see if the
2238  * object is a proper message (error is RdKafka::ERR_NO_ERROR) or a
2239  * an error event.
2240  *
2241  */
2242 class RD_EXPORT Message {
2243  public:
2244   /** @brief Message persistence status can be used by the application to
2245    *         find out if a produced message was persisted in the topic log. */
2246   enum Status {
2247     /** Message was never transmitted to the broker, or failed with
2248      *  an error indicating it was not written to the log.
2249      *  Application retry risks ordering, but not duplication. */
2250     MSG_STATUS_NOT_PERSISTED = 0,
2251 
2252     /** Message was transmitted to broker, but no acknowledgement was
2253      *  received.
2254      *  Application retry risks ordering and duplication. */
2255     MSG_STATUS_POSSIBLY_PERSISTED = 1,
2256 
2257     /** Message was written to the log and fully acknowledged.
2258      *  No reason for application to retry.
2259      *  Note: this value should only be trusted with \c acks=all. */
2260     MSG_STATUS_PERSISTED = 2,
2261   };
2262 
2263   /**
2264    * @brief Accessor functions*
2265    * @remark Not all fields are present in all types of callbacks.
2266    */
2267 
2268   /** @returns The error string if object represent an error event,
2269    *           else an empty string. */
2270   virtual std::string         errstr() const = 0;
2271 
2272   /** @returns The error code if object represents an error event, else 0. */
2273   virtual ErrorCode           err () const = 0;
2274 
2275   /** @returns the RdKafka::Topic object for a message (if applicable),
2276    *            or NULL if a corresponding RdKafka::Topic object has not been
2277    *            explicitly created with RdKafka::Topic::create().
2278    *            In this case use topic_name() instead. */
2279   virtual Topic              *topic () const = 0;
2280 
2281   /** @returns Topic name (if applicable, else empty string) */
2282   virtual std::string         topic_name () const = 0;
2283 
2284   /** @returns Partition (if applicable) */
2285   virtual int32_t             partition () const = 0;
2286 
2287   /** @returns Message payload (if applicable) */
2288   virtual void               *payload () const = 0 ;
2289 
2290   /** @returns Message payload length (if applicable) */
2291   virtual size_t              len () const = 0;
2292 
2293   /** @returns Message key as string (if applicable) */
2294   virtual const std::string  *key () const = 0;
2295 
2296   /** @returns Message key as void pointer  (if applicable) */
2297   virtual const void         *key_pointer () const = 0 ;
2298 
2299   /** @returns Message key's binary length (if applicable) */
2300   virtual size_t              key_len () const = 0;
2301 
2302   /** @returns Message or error offset (if applicable) */
2303   virtual int64_t             offset () const = 0;
2304 
2305   /** @returns Message timestamp (if applicable) */
2306   virtual MessageTimestamp    timestamp () const = 0;
2307 
2308   /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */
2309   virtual void               *msg_opaque () const = 0;
2310 
2311   virtual ~Message () = 0;
2312 
2313   /** @returns the latency in microseconds for a produced message measured
2314    *           from the produce() call, or -1 if latency is not available. */
2315   virtual int64_t             latency () const = 0;
2316 
2317   /**
2318    * @brief Returns the underlying librdkafka C rd_kafka_message_t handle.
2319    *
2320    * @warning Calling the C API on this handle is not recommended and there
2321    *          is no official support for it, but for cases where the C++ API
2322    *          does not provide the underlying functionality this C handle can be
2323    *          used to interact directly with the core librdkafka API.
2324    *
2325    * @remark The lifetime of the returned pointer is the same as the Message
2326    *         object this method is called on.
2327    *
2328    * @remark Include <rdkafka/rdkafka.h> prior to including
2329    *         <rdkafka/rdkafkacpp.h>
2330    *
2331    * @returns \c rd_kafka_message_t*
2332    */
2333   virtual struct rd_kafka_message_s *c_ptr () = 0;
2334 
2335   /**
2336    * @brief Returns the message's persistence status in the topic log.
2337    */
2338   virtual Status status () const = 0;
2339 
2340   /** @returns the Headers instance for this Message, or NULL if there
2341    *  are no headers.
2342    *
2343    * @remark The lifetime of the Headers are the same as the Message. */
2344   virtual RdKafka::Headers   *headers () = 0;
2345 
2346   /** @returns the Headers instance for this Message (if applicable).
2347    *  If NULL is returned the reason is given in \p err, which
2348    *  is either ERR__NOENT if there were no headers, or another
2349    *  error code if header parsing failed.
2350    *
2351    * @remark The lifetime of the Headers are the same as the Message. */
2352   virtual RdKafka::Headers   *headers (RdKafka::ErrorCode *err) = 0;
2353 
2354   /** @returns the broker id of the broker the message was produced to or
2355    *           fetched from, or -1 if not known/applicable. */
2356   virtual int32_t broker_id () const = 0;
2357 };
2358 
2359 /**@}*/
2360 
2361 
2362 /**
2363  * @name Queue interface
2364  * @{
2365  *
2366  */
2367 
2368 
2369 /**
2370  * @brief Queue interface
2371  *
2372  * Create a new message queue.  Message queues allow the application
2373  * to re-route consumed messages from multiple topic+partitions into
2374  * one single queue point.  This queue point, containing messages from
2375  * a number of topic+partitions, may then be served by a single
2376  * consume() method, rather than one per topic+partition combination.
2377  *
2378  * See the RdKafka::Consumer::start(), RdKafka::Consumer::consume(), and
2379  * RdKafka::Consumer::consume_callback() methods that take a queue as the first
2380  * parameter for more information.
2381  */
2382 class RD_EXPORT Queue {
2383  public:
2384   /**
2385    * @brief Create Queue object for \p handle.
2386    */
2387   static Queue *create (Handle *handle);
2388 
2389   /**
2390    * @brief Forward/re-route queue to \p dst.
2391    * If \p dst is \c NULL, the forwarding is removed.
2392    *
2393    * The internal refcounts for both queues are increased.
2394    *
2395    * @remark Regardless of whether \p dst is NULL or not, after calling this
2396    *         function, this (source) queue will not forward its fetch queue
2397    *         to the consumer queue.
2398    */
2399   virtual ErrorCode forward (Queue *dst) = 0;
2400 
2401 
2402   /**
2403    * @brief Consume message or get error event from the queue.
2404    *
2405    * @remark Use \c delete to free the message.
2406    *
2407    * @returns One of:
2408    *  - proper message (RdKafka::Message::err() is ERR_NO_ERROR)
2409    *  - error event (RdKafka::Message::err() is != ERR_NO_ERROR)
2410    *  - timeout due to no message or event in \p timeout_ms
2411    *    (RdKafka::Message::err() is ERR__TIMED_OUT)
2412    */
2413   virtual Message *consume (int timeout_ms) = 0;
2414 
2415   /**
2416    * @brief Poll queue, serving any enqueued callbacks.
2417    *
2418    * @remark Must NOT be used for queues containing messages.
2419    *
2420    * @returns the number of events served or 0 on timeout.
2421    */
2422   virtual int poll (int timeout_ms) = 0;
2423 
2424   virtual ~Queue () = 0;
2425 
2426   /**
2427    * @brief Enable IO event triggering for queue.
2428    *
2429    * To ease integration with IO based polling loops this API
2430    * allows an application to create a separate file-descriptor
2431    * that librdkafka will write \p payload (of size \p size) to
2432    * whenever a new element is enqueued on a previously empty queue.
2433    *
2434    * To remove event triggering call with \p fd = -1.
2435    *
2436    * librdkafka will maintain a copy of the \p payload.
2437    *
2438    * @remark When using forwarded queues the IO event must only be enabled
2439    *         on the final forwarded-to (destination) queue.
2440    */
2441   virtual void io_event_enable (int fd, const void *payload, size_t size) = 0;
2442 };
2443 
2444 /**@}*/
2445 
2446 /**
2447  * @name ConsumerGroupMetadata
2448  * @{
2449  *
2450  */
2451 /**
2452  * @brief ConsumerGroupMetadata holds a consumer instance's group
2453  *        metadata state.
2454  *
2455  * This class currently declares no public methods besides its destructor.
2456  */
2457 class RD_EXPORT ConsumerGroupMetadata {
2458 public:
2459   virtual ~ConsumerGroupMetadata () = 0;
2460 };
2461 
2462 /**@}*/
2463 
2464 /**
2465  * @name KafkaConsumer
2466  * @{
2467  *
2468  */
2469 
2470 
2471 /**
2472  * @brief High-level KafkaConsumer (for brokers 0.9 and later)
2473  *
2474  * @remark Requires Apache Kafka >= 0.9.0 brokers
2475  *
2476  * Currently supports the \c range and \c roundrobin partition assignment
2477  * strategies (see \c partition.assignment.strategy)
2478  */
2479 class RD_EXPORT KafkaConsumer : public virtual Handle {
2480 public:
2481   /**
2482    * @brief Creates a KafkaConsumer.
2483    *
2484    * The \p conf object must have \c group.id set to the consumer group to join.
2485    *
2486    * Use RdKafka::KafkaConsumer::close() to shut down the consumer.
2487    *
2488    * @sa RdKafka::RebalanceCb
2489    * @sa CONFIGURATION.md for \c group.id, \c session.timeout.ms,
2490    *     \c partition.assignment.strategy, etc.
2491    */
2492   static KafkaConsumer *create (const Conf *conf, std::string &errstr);
2493 
2494   virtual ~KafkaConsumer () = 0;
2495 
2496 
2497   /** @brief Returns the current partition assignment as set by
2498    *         RdKafka::KafkaConsumer::assign() */
2499   virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
2500 
2501   /** @brief Returns the current subscription as set by
2502    *         RdKafka::KafkaConsumer::subscribe() */
2503   virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
2504 
2505   /**
2506    * @brief Update the subscription set to \p topics.
2507    *
2508    * Any previous subscription will be unassigned and unsubscribed first.
2509    *
2510    * The subscription set denotes the desired topics to consume and this
2511    * set is provided to the partition assignor (one of the elected group
2512    * members) for all clients which then uses the configured
2513    * \c partition.assignment.strategy to assign the subscription set's
2514    * topics' partitions to the consumers, depending on their subscription.
2515    *
2516    * The result of such an assignment is a rebalancing which is either
2517    * handled automatically in librdkafka or can be overridden by the application
2518    * by providing a RdKafka::RebalanceCb.
2519    *
2520    * The rebalancing passes the assigned partition set to
2521    * RdKafka::KafkaConsumer::assign() to update what partitions are actually
2522    * being fetched by the KafkaConsumer.
2523    *
2524    * Regex pattern matching is automatically performed for topics prefixed
2525    * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\").
2526    *
2527    * @remark A consumer error will be raised for each unavailable topic in the
2528    *  \p topics. The error will be ERR_UNKNOWN_TOPIC_OR_PART
2529    *  for non-existent topics, and
2530    *  ERR_TOPIC_AUTHORIZATION_FAILED for unauthorized topics.
2531    *  The consumer error will be raised through consume() (et.al.)
2532    *  with the \c RdKafka::Message::err() returning one of the
2533    *  error codes mentioned above.
2534    *  The subscribe function itself is asynchronous and will not return
2535    *  an error on unavailable topics.
2536    *
2537    * @returns an error if the provided list of topics is invalid.
2538    */
2539   virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;
2540 
2541   /** @brief Unsubscribe from the current subscription set. */
2542   virtual ErrorCode unsubscribe () = 0;
2543 
2544   /**
2545    *  @brief Update the assignment set to \p partitions.
2546    *
2547    * The assignment set is the set of partitions actually being consumed
2548    * by the KafkaConsumer.
2549    */
2550   virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;
2551 
2552   /**
2553    * @brief Stop consumption and remove the current assignment.
2554    */
2555   virtual ErrorCode unassign () = 0;
2556 
2557   /**
2558    * @brief Consume message or get error event, triggers callbacks.
2559    *
2560    * Will automatically call registered callbacks for any such queued events,
2561    * including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
2562    * etc.
2563    *
2564    * @remark Use \c delete to free the message.
2565    *
2566    * @remark  An application should make sure to call consume() at regular
2567    *          intervals, even if no messages are expected, to serve any
2568    *          queued callbacks waiting to be called. This is especially
2569    *          important when a RebalanceCb has been registered as it needs
2570    *          to be called and handled properly to synchronize internal
2571    *          consumer state.
2572    *
2573    * @remark Application MUST NOT call \p poll() on KafkaConsumer objects.
2574    *
2575    * @returns One of:
2576    *  - proper message (RdKafka::Message::err() is ERR_NO_ERROR)
2577    *  - error event (RdKafka::Message::err() is != ERR_NO_ERROR)
2578    *  - timeout due to no message or event in \p timeout_ms
2579    *    (RdKafka::Message::err() is ERR__TIMED_OUT)
2580    */
2581   virtual Message *consume (int timeout_ms) = 0;
2582 
2583   /**
2584    * @brief Commit offsets for the current assignment.
2585    *
2586    * @remark This is the synchronous variant that blocks until offsets
2587    *         are committed or the commit fails (see return value).
2588    *
2589    * @remark If a RdKafka::OffsetCommitCb callback is registered it will
2590    *         be called with commit details on a future call to
2591    *         RdKafka::KafkaConsumer::consume().
2592    *
2593    *
2594    * @returns ERR_NO_ERROR or error code.
2595    */
2596   virtual ErrorCode commitSync () = 0;
2597 
2598   /**
2599    * @brief Asynchronous version of RdKafka::KafkaConsumer::commitSync()
2600    *
2601    * @sa RdKafka::KafkaConsumer::commitSync()
2602    */
2603   virtual ErrorCode commitAsync () = 0;
2604 
2605   /**
2606    * @brief Commit offset for a single topic+partition based on \p message
2607    *
2608    * @remark The offset committed will be the message's offset + 1.
2609    *
2610    * @remark This is the synchronous variant.
2611    *
2612    * @sa RdKafka::KafkaConsumer::commitSync()
2613    */
2614   virtual ErrorCode commitSync (Message *message) = 0;
2615 
2616   /**
2617    * @brief Commit offset for a single topic+partition based on \p message
2618    *
2619    * @remark The offset committed will be the message's offset + 1.
2620    *
2621    * @remark This is the asynchronous variant.
2622    *
2623    * @sa RdKafka::KafkaConsumer::commitSync()
2624    */
2625   virtual ErrorCode commitAsync (Message *message) = 0;
2626 
2627   /**
2628    * @brief Commit offsets for the provided list of partitions.
2629    *
2630    * @remark The \c .offset of the partitions in \p offsets should be the
2631    *         offset where consumption will resume, i.e., the last
2632    *         processed offset + 1.
2633    *
2634    * @remark This is the synchronous variant.
2635    */
2636   virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
2637 
2638   /**
2639    * @brief Commit offsets for the provided list of partitions.
2640    *
2641    * @remark The \c .offset of the partitions in \p offsets should be the
2642    *         offset where consumption will resume, i.e., the last
2643    *         processed offset + 1.
2644    *
2645    * @remark This is the asynchronous variant.
2646    */
2647   virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
2648 
2649   /**
2650    * @brief Commit offsets for the current assignment.
2651    *
2652    * @remark This is the synchronous variant that blocks until offsets
2653    *         are committed or the commit fails (see return value).
2654    *
2655    * @remark The provided callback will be called from this function.
2656    *
2657    * @returns ERR_NO_ERROR or error code.
2658    */
2659   virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0;
2660 
2661   /**
2662    * @brief Commit offsets for the provided list of partitions.
2663    *
2664    * @remark This is the synchronous variant that blocks until offsets
2665    *         are committed or the commit fails (see return value).
2666    *
2667    * @remark The provided callback will be called from this function.
2668    *
2669    * @returns ERR_NO_ERROR or error code.
2670    */
2671   virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
2672                                 OffsetCommitCb *offset_commit_cb) = 0;
2673 
2674 
2675 
2676 
2677   /**
2678    * @brief Retrieve committed offsets for topics+partitions.
2679    *
2680    * @returns ERR_NO_ERROR on success in which case the
2681    *          \p offset or \p err field of each \p partitions' element is filled
2682    *          in with the stored offset, or a partition specific error.
2683    *          Else returns an error code.
2684    */
2685   virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
2686 			       int timeout_ms) = 0;
2687 
2688   /**
2689    * @brief Retrieve current positions (offsets) for topics+partitions.
2690    *
2691    * @returns ERR_NO_ERROR on success in which case the
2692    *          \p offset or \p err field of each \p partitions' element is filled
2693    *          in with the stored offset, or a partition specific error.
2694    *          Else returns an error code.
2695    */
2696   virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
2697 
2698 
2699   /**
2700    * For pausing and resuming consumption, see
2701    * @sa RdKafka::Handle::pause() and RdKafka::Handle::resume()
2702    */
2703 
2704 
2705   /**
2706    * @brief Close and shut down the consumer.
2707    *
2708    * This call will block until the following operations are finished:
2709    *  - Trigger a local rebalance to void the current assignment
2710    *  - Stop consumption for current assignment
2711    *  - Commit offsets
2712    *  - Leave group
2713    *
2714    * The maximum blocking time is roughly limited to session.timeout.ms.
2715    *
2716    * @remark Callbacks, such as RdKafka::RebalanceCb and
2717    *         RdKafka::OffsetCommitCb, etc, may be called.
2718    *
2719    * @remark The consumer object must later be freed with \c delete
2720    */
2721   virtual ErrorCode close () = 0;
2722 
2723 
2724   /**
2725    * @brief Seek consumer for topic+partition to offset which is either an
2726    *        absolute or logical offset.
2727    *
2728    * If \p timeout_ms is not 0 the call will wait this long for the
2729    * seek to be performed. If the timeout is reached the internal state
2730    * will be unknown and this function returns `ERR__TIMED_OUT`.
2731    * If \p timeout_ms is 0 it will initiate the seek but return
2732    * immediately without any error reporting (e.g., async).
2733    *
2734    * This call triggers a fetch queue barrier flush.
2735    *
2736    * @remark Consumption for the given partition must have started for the
2737    *         seek to work. Use assign() to set the starting offset.
2738    *
2739    * @returns an ErrorCode to indicate success or failure.
2740    */
2741   virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0;
2742 
2743 
2744   /**
2745    * @brief Store offset \p offset for topic partition \p partition.
2746    * The offset will be committed (written) to the offset store according
2747    * to \c auto.commit.interval.ms or the next manual offset-less commit*()
2748    *
2749    * Per-partition success/error status propagated through TopicPartition.err()
2750    *
2751    * @remark The \c .offset field is stored as is, it will NOT be + 1.
2752    *
2753    * @remark \c enable.auto.offset.store must be set to \c false when using
2754    *         this API.
2755    *
2756    * @returns RdKafka::ERR_NO_ERROR on success, or
2757    *          RdKafka::ERR__UNKNOWN_PARTITION if none of the offsets could
2758    *          be stored, or
2759    *          RdKafka::ERR__INVALID_ARG if \c enable.auto.offset.store is true.
2760    */
2761   virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
2762 
2763 
2764   /**
2765    * @returns the current consumer group metadata associated with this consumer,
2766    *          or NULL if the consumer is not configured with a \c group.id.
2767    *          This metadata object should be passed to the transactional
2768    *          producer's RdKafka::Producer::send_offsets_to_transaction() API.
2769    *
2770    * @remark The returned object must be deleted by the application.
2771    *
2772    * @sa RdKafka::Producer::send_offsets_to_transaction()
2773    */
2774   virtual ConsumerGroupMetadata *groupMetadata () = 0;
2775 
2776 
2777   /** @brief Check whether the consumer considers the current assignment to
2778    *         have been lost involuntarily. This method is only applicable for
2779    *         use with a subscribing consumer. Assignments are revoked
2780    *         immediately when determined to have been lost, so this method is
2781    *         only useful within a rebalance callback. Partitions that have
2782    *         been lost may already be owned by other members in the group and
2783    *         therefore committing offsets, for example, may fail.
2784    *
2785    * @remark Calling assign(), incremental_assign() or incremental_unassign()
2786    *         resets this flag.
2787    *
2788    * @returns Returns true if the current partition assignment is considered
2789    *          lost, false otherwise.
2790    */
2791   virtual bool assignment_lost () = 0;
2792 
2793   /**
2794    * @brief The rebalance protocol currently in use. This will be
2795    *        "NONE" if the consumer has not (yet) joined a group, else it will
2796    *        match the rebalance protocol ("EAGER", "COOPERATIVE") of the
2797    *        configured and selected assignor(s). All configured
2798    *        assignors must have the same protocol type, meaning
2799    *        online migration of a consumer group from using one
2800    *        protocol to another (in particular upgrading from EAGER
2801    *        to COOPERATIVE) without a restart is not currently
2802    *        supported.
2803    *
2804    * @returns an empty string on error, or one of
2805    *          "NONE", "EAGER", "COOPERATIVE" on success.
2806    */
2807 
2808   virtual std::string rebalance_protocol () = 0;
2809 
2810 
2811   /**
2812    * @brief Incrementally add \p partitions to the current assignment.
2813    *
2814    * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used,
2815    * this method should be used in a rebalance callback to adjust the current
2816    * assignment appropriately in the case where the rebalance type is
2817    * ERR__ASSIGN_PARTITIONS. The application must pass the partition list
2818    * passed to the callback (or a copy of it), even if the list is empty.
2819    * This method may also be used outside the context of a rebalance callback.
2820    *
2821    * @returns NULL on success, or an error object if the operation was
2822    *          unsuccessful.
2823    *
2824    * @remark The returned object must be deleted by the application.
2825    */
2826   virtual Error *incremental_assign (const std::vector<TopicPartition*> &partitions) = 0;
2827 
2828 
2829   /**
2830    * @brief Incrementally remove \p partitions from the current assignment.
2831    *
2832    * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used,
2833    * this method should be used in a rebalance callback to adjust the current
2834    * assignment appropriately in the case where the rebalance type is
2835    * ERR__REVOKE_PARTITIONS. The application must pass the partition list
2836    * passed to the callback (or a copy of it), even if the list is empty.
2837    * This method may also be used outside the context of a rebalance callback.
2838    *
2839    * @returns NULL on success, or an error object if the operation was
2840    *          unsuccessful.
2841    *
2842    * @remark The returned object must be deleted by the application.
2843    */
2844   virtual Error *incremental_unassign (const std::vector<TopicPartition*> &partitions) = 0;
2845 
2846 };
2847 
2848 
2849 /**@}*/
2850 
2851 
2852 /**
2853  * @name Simple Consumer (legacy)
2854  * @{
2855  *
2856  */
2857 
2858 /**
2859  * @brief Simple Consumer (legacy)
2860  *
2861  * A simple non-balanced, non-group-aware, consumer.
2862  */
2863 class RD_EXPORT Consumer : public virtual Handle {
2864  public:
2865   /**
2866    * @brief Creates a new Kafka consumer handle.
2867    *
2868    * \p conf is an optional object that will be used instead of the default
2869    * configuration.
2870    * The \p conf object is reusable after this call.
2871    *
2872    * @returns the new handle on success or NULL on error in which case
2873    * \p errstr is set to a human readable error message.
2874    */
2875   static Consumer *create (const Conf *conf, std::string &errstr);
2876 
2877   virtual ~Consumer () = 0;
2878 
2879 
2880   /**
2881    * @brief Start consuming messages for topic and \p partition
2882    * at offset \p offset which may either be a proper offset (0..N)
2883    * or one of the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END.
2884    *
2885    * rdkafka will attempt to keep \p queued.min.messages (config property)
2886    * messages in the local queue by repeatedly fetching batches of messages
2887    * from the broker until the threshold is reached.
2888    *
2889    * The application shall use one of the \p ..->consume*() functions
2890    * to consume messages from the local queue, each kafka message being
2891    * represented as a `RdKafka::Message *` object.
2892    *
2893    * \p ..->start() must not be called multiple times for the same
2894    * topic and partition without stopping consumption first with
2895    * \p ..->stop().
2896    *
2897    * @returns an ErrorCode to indicate success or failure.
2898    */
2899   virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;
2900 
2901   /**
2902    * @brief Start consuming messages for topic and \p partition on
2903    *        queue \p queue.
2904    *
2905    * @sa RdKafka::Consumer::start()
2906    */
2907   virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
2908                            Queue *queue) = 0;
2909 
2910   /**
2911    * @brief Stop consuming messages for topic and \p partition, purging
2912    *        all messages currently in the local queue.
2913    *
2914    * The application needs to stop all consumers before destroying
2915    * the Consumer handle.
2916    *
2917    * @returns an ErrorCode to indicate success or failure.
2918    */
2919   virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;
2920 
2921   /**
2922    * @brief Seek consumer for topic+partition to \p offset which is either an
2923    *        absolute or logical offset.
2924    *
2925    * If \p timeout_ms is not 0 the call will wait this long for the
2926    * seek to be performed. If the timeout is reached the internal state
2927    * will be unknown and this function returns `ERR__TIMED_OUT`.
2928    * If \p timeout_ms is 0 it will initiate the seek but return
2929    * immediately without any error reporting (e.g., async).
2930    *
2931    * This call triggers a fetch queue barrier flush.
2932    *
2933    * @returns an ErrorCode to indicate success or failure.
2934    */
2935   virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
2936 			  int timeout_ms) = 0;
2937 
2938   /**
2939    * @brief Consume a single message from \p topic and \p partition.
2940    *
2941    * \p timeout_ms is maximum amount of time to wait for a message to be
2942    * received.
2943    * Consumer must have been previously started with \p ..->start().
2944    *
2945    * @returns a Message object, the application needs to check if message
2946    * is an error or a proper message by checking RdKafka::Message::err()
2947    * for \p ERR_NO_ERROR.
2948    *
2949    * The message object must be destroyed when the application is done with it.
2950    *
2951    * Errors (in RdKafka::Message::err()):
2952    *  - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched.
2953    *  - ERR__PARTITION_EOF - End of partition reached, not an error.
2954    */
2955   virtual Message *consume (Topic *topic, int32_t partition,
2956                             int timeout_ms) = 0;
2957 
2958   /**
2959    * @brief Consume a single message from the specified queue.
2960    *
2961    * \p timeout_ms is maximum amount of time to wait for a message to be
2962    * received.
2963    * Consumer must have been previously started on the queue with
2964    * \p ..->start().
2965    *
2966    * @returns a Message object, the application needs to check if message
2967    * is an error or a proper message by checking \p Message->err()
2968    * for \p ERR_NO_ERROR.
2969    *
2970    * The message object must be destroyed when the application is done with it.
2971    *
2972    * Errors (in RdKafka::Message::err()):
2973    *   - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched
2974    *
2975    * Note that Message->topic() may be nullptr after certain kinds of
2976    * errors, so applications should check that it isn't null before
2977    * dereferencing it.
2978    */
2979   virtual Message *consume (Queue *queue, int timeout_ms) = 0;
2980 
2981   /**
2982    * @brief Consumes messages from \p topic and \p partition, calling
2983    *        the provided callback for each consumed message.
2984    *
2985    * \p consume_callback() provides higher throughput performance
2986    * than \p consume().
2987    *
2988    * \p timeout_ms is the maximum amount of time to wait for one or
2989    * more messages to arrive.
2990    *
2991    * The provided \p consume_cb instance has its \p consume_cb function
2992    * called for every message received.
2993    *
2994    * The \p opaque argument is passed to the \p consume_cb as \p opaque.
2995    *
2996    * @returns the number of messages processed or -1 on error.
2997    *
2998    * @sa RdKafka::Consumer::consume()
2999    */
3000   virtual int consume_callback (Topic *topic, int32_t partition,
3001                                 int timeout_ms,
3002                                 ConsumeCb *consume_cb,
3003                                 void *opaque) = 0;
3004 
3005   /**
3006    * @brief Consumes messages from \p queue, calling the provided callback for
3007    *        each consumed message.
3008    *
3009    * @sa RdKafka::Consumer::consume_callback()
3010    */
3011   virtual int consume_callback (Queue *queue, int timeout_ms,
3012                                 RdKafka::ConsumeCb *consume_cb,
3013                                 void *opaque) = 0;
3014 
3015   /**
3016    * @brief Converts an offset into the logical offset from the tail of a topic.
3017    *
3018    * \p offset is the (positive) number of items from the end.
3019    *
3020    * @returns the logical offset for message \p offset from the tail, this value
3021    *          may be passed to Consumer::start, et.al.
3022    * @remark The returned logical offset is specific to librdkafka.
3023    */
3024   static int64_t OffsetTail(int64_t offset);
3025 };
3026 
3027 /**@}*/
3028 
3029 
3030 /**
3031  * @name Producer
3032  * @{
3033  *
3034  */
3035 
3036 
3037 /**
3038  * @brief Producer
3039  */
3040 class RD_EXPORT Producer : public virtual Handle {
3041  public:
3042   /**
3043    * @brief Creates a new Kafka producer handle.
3044    *
3045    * \p conf is an optional object that will be used instead of the default
3046    * configuration.
3047    * The \p conf object is reusable after this call.
3048    *
3049    * @returns the new handle on success or NULL on error in which case
3050    *          \p errstr is set to a human readable error message.
3051    */
3052   static Producer *create (const Conf *conf, std::string &errstr);
3053 
3054 
3055   virtual ~Producer () = 0;
3056 
3057   /**
3058    * @brief RdKafka::Producer::produce() \p msgflags
3059    *
3060    * These flags are optional.
3061    */
3062   enum {
3063     RK_MSG_FREE = 0x1, /**< rdkafka will free(3) \p payload
3064                          * when it is done with it.
3065                          * Mutually exclusive with RK_MSG_COPY. */
3066     RK_MSG_COPY = 0x2, /**< the \p payload data will be copied
3067                         * and the \p payload pointer will not
3068                         * be used by rdkafka after the
3069                         * call returns.
3070                         * Mutually exclusive with RK_MSG_FREE. */
3071     RK_MSG_BLOCK = 0x4  /**< Block produce*() on message queue
3072                          *   full.
3073                          *   WARNING:
3074                          *   If a delivery report callback
3075                          *   is used the application MUST
3076                          *   call rd_kafka_poll() (or equiv.)
3077                          *   to make sure delivered messages
3078                          *   are drained from the internal
3079                          *   delivery report queue.
3080                          *   Failure to do so will result
3081                          *   in indefinitely blocking on
3082                          *   the produce() call when the
3083                          *   message queue is full.
3084                          */
3085 
3086 
3087   /**@cond NO_DOC*/
3088   /* For backwards compatibility: */
3089 #ifndef MSG_COPY /* defined in sys/msg.h */
3090     , /** this comma must exist between
3091        *  RK_MSG_BLOCK and MSG_FREE
3092        */
3093     MSG_FREE = RK_MSG_FREE,
3094     MSG_COPY = RK_MSG_COPY
3095 #endif
3096   /**@endcond*/
3097   };
3098 
3099   /**
3100    * @brief Produce and send a single message to broker.
3101    *
3102    * This is an asynchronous non-blocking API.
3103    *
3104    * \p partition is the target partition, either:
3105    *   - RdKafka::Topic::PARTITION_UA (unassigned) for
3106    *     automatic partitioning using the topic's partitioner function, or
3107    *   - a fixed partition (0..N)
3108    *
3109    * \p msgflags is zero or more of the following flags OR:ed together:
3110    *    RK_MSG_BLOCK - block \p produce*() call if
3111    *                   \p queue.buffering.max.messages or
3112    *                   \p queue.buffering.max.kbytes are exceeded.
3113    *                   Messages are considered in-queue from the point they
3114    *                   are accepted by produce() until their corresponding
3115    *                   delivery report callback/event returns.
3116    *                   It is thus a requirement to call
3117    *                   poll() (or equiv.) from a separate
3118    *                   thread when RK_MSG_BLOCK is used.
3119    *                   See WARNING on \c RK_MSG_BLOCK above.
3120    *    RK_MSG_FREE - rdkafka will free(3) \p payload when it is done with it.
3121    *    RK_MSG_COPY - the \p payload data will be copied and the \p payload
3122    *               pointer will not be used by rdkafka after the
3123    *               call returns.
3124    *
3125    *  NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.
3126    *
3127    *  If the function returns an error code and RK_MSG_FREE was specified, then
3128    *  the memory associated with the payload is still the caller's
3129    *  responsibility.
3130    *
3131    * \p payload is the message payload of size \p len bytes.
3132    *
3133    * \p key is an optional message key, if non-NULL it
3134    * will be passed to the topic partitioner as well as be sent with the
3135    * message to the broker and passed on to the consumer.
3136    *
3137    * \p msg_opaque is an optional application-provided per-message opaque
3138    * pointer that will be provided in the delivery report callback (\p dr_cb)
3139    * for referencing this message.
3140    *
3141    * @returns an ErrorCode to indicate success or failure:
3142    *  - ERR_NO_ERROR           - message successfully enqueued for transmission.
3143    *
3144    *  - ERR__QUEUE_FULL        - maximum number of outstanding messages has been
3145    *                             reached: \c queue.buffering.max.messages
3146    *
3147    *  - ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size:
3148    *                            \c message.max.bytes
3149    *
3150    *  - ERR__UNKNOWN_PARTITION - requested \p partition is unknown in the
3151    *                           Kafka cluster.
3152    *
3153    *  - ERR__UNKNOWN_TOPIC     - topic is unknown in the Kafka cluster.
3154    */
3155   virtual ErrorCode produce (Topic *topic, int32_t partition,
3156                              int msgflags,
3157                              void *payload, size_t len,
3158                              const std::string *key,
3159                              void *msg_opaque) = 0;
3160 
3161   /**
3162    * @brief Variant produce() that passes the key as a pointer and length
3163    *        instead of as a const std::string *.
3164    */
3165   virtual ErrorCode produce (Topic *topic, int32_t partition,
3166                              int msgflags,
3167                              void *payload, size_t len,
3168                              const void *key, size_t key_len,
3169                              void *msg_opaque) = 0;
3170 
3171   /**
3172    * @brief produce() variant that takes topic as a string (no need for
3173    *        creating a Topic object), and also allows providing the
3174    *        message timestamp (milliseconds since beginning of epoch, UTC).
3175    *        Otherwise identical to produce() above.
3176    */
3177   virtual ErrorCode produce (const std::string topic_name, int32_t partition,
3178                              int msgflags,
3179                              void *payload, size_t len,
3180                              const void *key, size_t key_len,
3181                              int64_t timestamp, void *msg_opaque) = 0;
3182 
3183   /**
3184    * @brief produce() variant that allows for Header support on produce.
3185    *        Otherwise identical to produce() above.
3186    *
3187    * @warning The \p headers will be freed/deleted if the produce() call
3188    *          succeeds, or left untouched if produce() fails.
3189    */
3190   virtual ErrorCode produce (const std::string topic_name, int32_t partition,
3191                              int msgflags,
3192                              void *payload, size_t len,
3193                              const void *key, size_t key_len,
3194                              int64_t timestamp,
3195                              RdKafka::Headers *headers,
3196                              void *msg_opaque) = 0;
3197 
3198 
3199   /**
3200    * @brief Variant produce() that accepts vectors for key and payload.
3201    *        The vector data will be copied.
3202    */
3203   virtual ErrorCode produce (Topic *topic, int32_t partition,
3204                              const std::vector<char> *payload,
3205                              const std::vector<char> *key,
3206                              void *msg_opaque) = 0;
3207 
3208 
3209   /**
3210    * @brief Wait until all outstanding produce requests, et.al, are completed.
3211    *        This should typically be done prior to destroying a producer instance
3212    *        to make sure all queued and in-flight produce requests are completed
3213    *        before terminating.
3214    *
3215    * @remark This function will call Producer::poll() and thus
3216    *         trigger callbacks.
3217    *
3218    * @returns ERR__TIMED_OUT if \p timeout_ms was reached before all
3219    *          outstanding requests were completed, else ERR_NO_ERROR
3220    */
3221   virtual ErrorCode flush (int timeout_ms) = 0;
3222 
3223 
3224   /**
3225    * @brief Purge messages currently handled by the producer instance.
3226    *
3227    * @param purge_flags tells which messages should be purged and how.
3228    *
3229    * The application will need to call Handle::poll() or Producer::flush()
3230    * afterwards to serve the delivery report callbacks of the purged messages.
3231    *
3232    * Messages purged from internal queues fail with the delivery report
3233    * error code set to ERR__PURGE_QUEUE, while purged messages that
3234    * are in-flight to or from the broker will fail with the error code set to
3235    * ERR__PURGE_INFLIGHT.
3236    *
3237    * @warning Purging messages that are in-flight to or from the broker
3238    *          will ignore any subsequent acknowledgement for these messages
3239    *          received from the broker, effectively making it impossible
3240    *          for the application to know if the messages were successfully
3241    *          produced or not. This may result in duplicate messages if the
3242    *          application retries these messages at a later time.
3243    *
3244    * @remark This call may block for a short time while background thread
3245    *         queues are purged.
3246    *
3247    * @returns ERR_NO_ERROR on success,
3248    *          ERR__INVALID_ARG if the \p purge_flags are invalid or unknown,
3249    *          ERR__NOT_IMPLEMENTED if called on a non-producer client instance.
3250    */
3251   virtual ErrorCode purge (int purge_flags) = 0;
3252 
3253   /**
3254    * @brief RdKafka::Producer::purge() \p purge_flags
3255    */
3256   enum {
3257     PURGE_QUEUE = 0x1, /**< Purge messages in internal queues */
3258 
3259     PURGE_INFLIGHT = 0x2, /**< Purge messages in-flight to or from the broker.
3260                            *   Purging these messages will void any future
3261                            *   acknowledgements from the broker, making it
3262                            *   impossible for the application to know if these
3263                            *   messages were successfully delivered or not.
3264                            *   Retrying these messages may lead to duplicates. */
3265 
3266     PURGE_NON_BLOCKING = 0x4 /**< Don't wait for background queue
3267                               *   purging to finish. */
3268   };
3269 
3270   /**
3271    * @name Transactional API
3272    * @{
3273    *
3274    * Requires Kafka broker version v0.11.0 or later
3275    *
3276    * See the Transactional API documentation in rdkafka.h for more information.
3277    */
3278 
3279   /**
3280    * @brief Initialize transactions for the producer instance.
3281    *
3282    * @param timeout_ms The maximum time to block. On timeout the operation
3283    *                   may continue in the background, depending on state,
3284    *                   and it is okay to call init_transactions() again.
3285    *
3286    * @returns an RdKafka::Error object on error, or NULL on success.
3287    *          Check whether the returned error object permits retrying
3288    *          by calling RdKafka::Error::is_retriable(), or whether a fatal
3289    *          error has been raised by calling RdKafka::Error::is_fatal().
3290    *
3291    * @remark The returned error object (if not NULL) must be deleted.
3292    *
3293    * See rd_kafka_init_transactions() in rdkafka.h for more information.
3294    *
3295    */
3296   virtual Error *init_transactions (int timeout_ms) = 0;
3297 
3298 
3299   /**
3300    * @brief init_transactions() must have been called successfully
3301    *        (once) before this function is called.
3302    *
3303    * @returns an RdKafka::Error object on error, or NULL on success.
3304    *          Check whether a fatal error has been raised by calling
3305    *          RdKafka::Error::is_fatal_error().
3306    *
3307    * @remark The returned error object (if not NULL) must be deleted.
3308    *
3309    * See rd_kafka_begin_transaction() in rdkafka.h for more information.
3310    */
3311   virtual Error *begin_transaction () = 0;
3312 
3313   /**
3314    * @brief Sends a list of topic partition offsets to the consumer group
3315    *        coordinator for \p group_metadata, and marks the offsets as part
3316    *        part of the current transaction.
3317    *        These offsets will be considered committed only if the transaction
3318    *        is committed successfully.
3319    *
3320    *        The offsets should be the next message your application will
3321    *        consume,
3322    *        i.e., the last processed message's offset + 1 for each partition.
3323    *        Either track the offsets manually during processing or use
3324    *        RdKafka::KafkaConsumer::position() (on the consumer) to get the
3325    *        current offsets for
3326    *        the partitions assigned to the consumer.
3327    *
3328    *        Use this method at the end of a consume-transform-produce loop prior
3329    *        to committing the transaction with commit_transaction().
3330    *
3331    * @param offsets List of offsets to commit to the consumer group upon
3332    *                successful commit of the transaction. Offsets should be
3333    *                the next message to consume,
3334    *                e.g., last processed message + 1.
3335    * @param group_metadata The current consumer group metadata as returned by
3336    *                   RdKafka::KafkaConsumer::groupMetadata() on the consumer
3337    *                   instance the provided offsets were consumed from.
3338    * @param timeout_ms Maximum time allowed to register the
3339    *                   offsets on the broker.
3340    *
3341    * @remark This function must be called on the transactional producer
3342    *         instance, not the consumer.
3343    *
3344    * @remark The consumer must disable auto commits
3345    *         (set \c enable.auto.commit to false on the consumer).
3346    *
3347    * @returns an RdKafka::Error object on error, or NULL on success.
3348    *          Check whether the returned error object permits retrying
3349    *          by calling RdKafka::Error::is_retriable(), or whether an abortable
3350    *          or fatal error has been raised by calling
3351    *          RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal()
3352    *          respectively.
3353    *
3354    * @remark The returned error object (if not NULL) must be deleted.
3355    *
3356    * See rd_kafka_send_offsets_to_transaction() in rdkafka.h for
3357    * more information.
3358    */
3359   virtual Error *send_offsets_to_transaction (
3360           const std::vector<TopicPartition*> &offsets,
3361           const ConsumerGroupMetadata *group_metadata,
3362           int timeout_ms) = 0;
3363 
3364   /**
3365    * @brief Commit the current transaction as started with begin_transaction().
3366    *
3367    *        Any outstanding messages will be flushed (delivered) before actually
3368    *        committing the transaction.
3369    *
3370    * @param timeout_ms The maximum time to block. On timeout the operation
3371    *                   may continue in the background, depending on state,
3372    *                   and it is okay to call this function again.
3373    *                   Pass -1 to use the remaining transaction timeout,
3374    *                   this is the recommended use.
3375    *
3376    * @remark It is strongly recommended to always pass -1 (remaining transaction
3377    *         time) as the \p timeout_ms. Using other values risk internal
3378    *         state desynchronization in case any of the underlying protocol
3379    *         requests fail.
3380    *
3381    * @returns an RdKafka::Error object on error, or NULL on success.
3382    *          Check whether the returned error object permits retrying
3383    *          by calling RdKafka::Error::is_retriable(), or whether an abortable
3384    *          or fatal error has been raised by calling
3385    *          RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal()
3386    *          respectively.
3387    *
3388    * @remark The returned error object (if not NULL) must be deleted.
3389    *
3390    * See rd_kafka_commit_transaction() in rdkafka.h for more information.
3391    */
3392   virtual Error *commit_transaction (int timeout_ms) = 0;
3393 
3394   /**
3395    * @brief Aborts the ongoing transaction.
3396    *
3397    *        This function should also be used to recover from non-fatal abortable
3398    *        transaction errors.
3399    *
3400    *        Any outstanding messages will be purged and fail with
3401    *        RdKafka::ERR__PURGE_INFLIGHT or RdKafka::ERR__PURGE_QUEUE.
3402    *        See RdKafka::Producer::purge() for details.
3403    *
3404    * @param timeout_ms The maximum time to block. On timeout the operation
3405    *                   may continue in the background, depending on state,
3406    *                   and it is okay to call this function again.
3407    *                   Pass -1 to use the remaining transaction timeout,
3408    *                   this is the recommended use.
3409    *
3410    * @remark It is strongly recommended to always pass -1 (remaining transaction
3411    *         time) as the \p timeout_ms. Using other values risk internal
3412    *         state desynchronization in case any of the underlying protocol
3413    *         requests fail.
3414    *
3415    * @returns an RdKafka::Error object on error, or NULL on success.
3416    *          Check whether the returned error object permits retrying
3417    *          by calling RdKafka::Error::is_retriable(), or whether a
3418    *          fatal error has been raised by calling RdKafka::Error::is_fatal().
3419    *
3420    * @remark The returned error object (if not NULL) must be deleted.
3421    *
3422    * See rd_kafka_abort_transaction() in rdkafka.h for more information.
3423    */
3424   virtual Error *abort_transaction (int timeout_ms) = 0;
3425 
3426   /**@}*/
3427 };
3428 
3429 /**@}*/
3430 
3431 
3432 /**
3433  * @name Metadata interface
3434  * @{
3435  *
3436  */
3437 
3438 
/**
 * @brief Metadata: Broker information
 *
 * Abstract interface exposing the id, hostname and listening port
 * of a single broker.
 */
class BrokerMetadata {
 public:
  /**
   * @returns Broker id
   */
  virtual int32_t id() const = 0;

  /**
   * @returns Broker hostname
   */
  virtual const std::string host() const = 0;

  /**
   * @returns Broker listening port
   */
  virtual int port() const = 0;

  /* Pure virtual destructor: keeps the interface abstract. */
  virtual ~BrokerMetadata() = 0;
};
3455 
3456 
3457 
3458 /**
3459  * @brief Metadata: Partition information
3460  */
3461 class PartitionMetadata {
3462  public:
3463   /** @brief Replicas */
3464   typedef std::vector<int32_t> ReplicasVector;
3465   /** @brief ISRs (In-Sync-Replicas) */
3466   typedef std::vector<int32_t> ISRSVector;
3467 
3468   /** @brief Replicas iterator */
3469   typedef ReplicasVector::const_iterator ReplicasIterator;
3470   /** @brief ISRs iterator */
3471   typedef ISRSVector::const_iterator     ISRSIterator;
3472 
3473 
3474   /** @returns Partition id */
3475   virtual int32_t id() const = 0;
3476 
3477   /** @returns Partition error reported by broker */
3478   virtual ErrorCode err() const = 0;
3479 
3480   /** @returns Leader broker (id) for partition */
3481   virtual int32_t leader() const = 0;
3482 
3483   /** @returns Replica brokers */
3484   virtual const std::vector<int32_t> *replicas() const = 0;
3485 
3486   /** @returns In-Sync-Replica brokers
3487    *  @warning The broker may return a cached/outdated list of ISRs.
3488    */
3489   virtual const std::vector<int32_t> *isrs() const = 0;
3490 
3491   virtual ~PartitionMetadata() = 0;
3492 };
3493 
3494 
3495 
3496 /**
3497  * @brief Metadata: Topic information
3498  */
3499 class TopicMetadata {
3500  public:
3501   /** @brief Partitions */
3502   typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
3503   /** @brief Partitions iterator */
3504   typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
3505 
3506   /** @returns Topic name */
3507   virtual const std::string topic() const = 0;
3508 
3509   /** @returns Partition list */
3510   virtual const PartitionMetadataVector *partitions() const = 0;
3511 
3512   /** @returns Topic error reported by broker */
3513   virtual ErrorCode err() const = 0;
3514 
3515   virtual ~TopicMetadata() = 0;
3516 };
3517 
3518 
3519 /**
3520  * @brief Metadata container
3521  */
3522 class Metadata {
3523  public:
3524   /** @brief Brokers */
3525   typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
3526   /** @brief Topics */
3527   typedef std::vector<const TopicMetadata*>  TopicMetadataVector;
3528 
3529   /** @brief Brokers iterator */
3530   typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
3531   /** @brief Topics iterator */
3532   typedef TopicMetadataVector::const_iterator  TopicMetadataIterator;
3533 
3534 
3535   /**
3536    * @brief Broker list
3537    * @remark Ownership of the returned pointer is retained by the instance of
3538    * Metadata that is called.
3539    */
3540   virtual const BrokerMetadataVector *brokers() const = 0;
3541 
3542   /**
3543    * @brief Topic list
3544    * @remark Ownership of the returned pointer is retained by the instance of
3545    * Metadata that is called.
3546    */
3547   virtual const TopicMetadataVector  *topics() const = 0;
3548 
3549   /** @brief Broker (id) originating this metadata */
3550   virtual int32_t orig_broker_id() const = 0;
3551 
3552   /** @brief Broker (name) originating this metadata */
3553   virtual const std::string orig_broker_name() const = 0;
3554 
3555   virtual ~Metadata() = 0;
3556 };
3557 
3558 /**@}*/
3559 
3560 }
3561 
3562 
3563 #endif /* _RDKAFKACPP_H_ */
3564