1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
31 
32 /**
33  * @file rdkafkacpp.h
34  * @brief Apache Kafka C/C++ consumer and producer client library.
35  *
36  * rdkafkacpp.h contains the public C++ API for librdkafka.
37  * The API is documented in this file as comments prefixing the class,
38  * function, type, enum, define, etc.
39  * For more information, see the C interface in rdkafka.h and read the
40  * manual in INTRODUCTION.md.
 * The C++ interface is STD C++ '03 compliant and adheres to the
 * Google C++ Style Guide.
 *
 * @sa For the C interface see rdkafka.h
45  *
46  * @tableofcontents
47  */
48 
49 /**@cond NO_DOC*/
50 #include <string>
51 #include <list>
52 #include <vector>
53 #include <cstdlib>
54 #include <cstring>
55 #include <stdint.h>
56 #include <sys/types.h>
57 
58 #ifdef _WIN32
59 #ifndef ssize_t
60 #ifndef _BASETSD_H_
61 #include <basetsd.h>
62 #endif
63 #ifndef _SSIZE_T_DEFINED
64 #define _SSIZE_T_DEFINED
65 typedef SSIZE_T ssize_t;
66 #endif
67 #endif
68 #undef RD_EXPORT
69 #ifdef LIBRDKAFKA_STATICLIB
70 #define RD_EXPORT
71 #else
72 #ifdef LIBRDKAFKACPP_EXPORTS
73 #define RD_EXPORT __declspec(dllexport)
74 #else
75 #define RD_EXPORT __declspec(dllimport)
76 #endif
77 #endif
78 #else
79 #define RD_EXPORT
80 #endif
81 
82 /**@endcond*/
83 
84 extern "C" {
85         /* Forward declarations */
86         struct rd_kafka_s;
87         struct rd_kafka_topic_s;
88         struct rd_kafka_message_s;
89         struct rd_kafka_conf_s;
90         struct rd_kafka_topic_conf_s;
91 }
92 
93 namespace RdKafka {
94 
95 /**
96  * @name Miscellaneous APIs
97  * @{
98  */
99 
100 /**
101  * @brief librdkafka version
102  *
103  * Interpreted as hex \c MM.mm.rr.xx:
104  *  - MM = Major
105  *  - mm = minor
106  *  - rr = revision
107  *  - xx = pre-release id (0xff is the final release)
108  *
109  * E.g.: \c 0x000801ff = 0.8.1
110  *
111  * @remark This value should only be used during compile time,
112  *         for runtime checks of version use RdKafka::version()
113  */
114 #define RD_KAFKA_VERSION  0x010802ff
115 
116 /**
117  * @brief Returns the librdkafka version as integer.
118  *
119  * @sa See RD_KAFKA_VERSION for how to parse the integer format.
120  */
121 RD_EXPORT
122 int          version ();
123 
124 /**
125  * @brief Returns the librdkafka version as string.
126  */
127 RD_EXPORT
128 std::string  version_str();
129 
130 /**
131  * @brief Returns a CSV list of the supported debug contexts
132  *        for use with Conf::Set("debug", ..).
133  */
134 RD_EXPORT
135 std::string get_debug_contexts();
136 
137 /**
138  * @brief Wait for all rd_kafka_t objects to be destroyed.
139  *
140  * @returns 0 if all kafka objects are now destroyed, or -1 if the
141  * timeout was reached.
142  * Since RdKafka handle deletion is an asynch operation the
143  * \p wait_destroyed() function can be used for applications where
144  * a clean shutdown is required.
145  */
146 RD_EXPORT
147 int          wait_destroyed(int timeout_ms);
148 
149 /**
150  * @brief Allocate memory using the same allocator librdkafka uses.
151  *
152  * This is typically an abstraction for the malloc(3) call and makes sure
153  * the application can use the same memory allocator as librdkafka for
154  * allocating pointers that are used by librdkafka.
155  *
156  * @remark Memory allocated by mem_malloc() must be freed using
157  *         mem_free().
158  */
159 RD_EXPORT
160 void *mem_malloc (size_t size);
161 
162 /**
163  * @brief Free pointer returned by librdkafka
164  *
165  * This is typically an abstraction for the free(3) call and makes sure
166  * the application can use the same memory allocator as librdkafka for
167  * freeing pointers returned by librdkafka.
168  *
169  * In standard setups it is usually not necessary to use this interface
170  * rather than the free(3) function.
171  *
172  * @remark mem_free() must only be used for pointers returned by APIs
173  *         that explicitly mention using this function for freeing.
174  */
175 RD_EXPORT
176 void mem_free (void *ptr);
177 
178 /**@}*/
179 
180 
181 
182 /**
183  * @name Constants, errors, types
184  * @{
185  *
186  *
187  */
188 
189 /**
190  * @brief Error codes.
191  *
192  * The negative error codes delimited by two underscores
193  * (\c _ERR__..) denotes errors internal to librdkafka and are
194  * displayed as \c \"Local: \<error string..\>\", while the error codes
195  * delimited by a single underscore (\c ERR_..) denote broker
196  * errors and are displayed as \c \"Broker: \<error string..\>\".
197  *
198  * @sa Use RdKafka::err2str() to translate an error code a human readable string
199  */
200 enum ErrorCode {
201 	/* Internal errors to rdkafka: */
202 	/** Begin internal error codes */
203 	ERR__BEGIN = -200,
204 	/** Received message is incorrect */
205 	ERR__BAD_MSG = -199,
206 	/** Bad/unknown compression */
207 	ERR__BAD_COMPRESSION = -198,
208 	/** Broker is going away */
209 	ERR__DESTROY = -197,
210 	/** Generic failure */
211 	ERR__FAIL = -196,
212 	/** Broker transport failure */
213 	ERR__TRANSPORT = -195,
214 	/** Critical system resource */
215 	ERR__CRIT_SYS_RESOURCE = -194,
216 	/** Failed to resolve broker */
217 	ERR__RESOLVE = -193,
218 	/** Produced message timed out*/
219 	ERR__MSG_TIMED_OUT = -192,
220 	/** Reached the end of the topic+partition queue on
221 	 *  the broker. Not really an error.
222 	 *  This event is disabled by default,
223 	 *  see the `enable.partition.eof` configuration property. */
224 	ERR__PARTITION_EOF = -191,
225 	/** Permanent: Partition does not exist in cluster. */
226 	ERR__UNKNOWN_PARTITION = -190,
227 	/** File or filesystem error */
228 	ERR__FS = -189,
229 	 /** Permanent: Topic does not exist in cluster. */
230 	ERR__UNKNOWN_TOPIC = -188,
231 	/** All broker connections are down. */
232 	ERR__ALL_BROKERS_DOWN = -187,
233 	/** Invalid argument, or invalid configuration */
234 	ERR__INVALID_ARG = -186,
235 	/** Operation timed out */
236 	ERR__TIMED_OUT = -185,
237 	/** Queue is full */
238 	ERR__QUEUE_FULL = -184,
239 	/** ISR count < required.acks */
240         ERR__ISR_INSUFF = -183,
241 	/** Broker node update */
242         ERR__NODE_UPDATE = -182,
243 	/** SSL error */
244 	ERR__SSL = -181,
245 	/** Waiting for coordinator to become available. */
246         ERR__WAIT_COORD = -180,
247 	/** Unknown client group */
248         ERR__UNKNOWN_GROUP = -179,
249 	/** Operation in progress */
250         ERR__IN_PROGRESS = -178,
251 	 /** Previous operation in progress, wait for it to finish. */
252         ERR__PREV_IN_PROGRESS = -177,
253 	 /** This operation would interfere with an existing subscription */
254         ERR__EXISTING_SUBSCRIPTION = -176,
255 	/** Assigned partitions (rebalance_cb) */
256         ERR__ASSIGN_PARTITIONS = -175,
257 	/** Revoked partitions (rebalance_cb) */
258         ERR__REVOKE_PARTITIONS = -174,
259 	/** Conflicting use */
260         ERR__CONFLICT = -173,
261 	/** Wrong state */
262         ERR__STATE = -172,
263 	/** Unknown protocol */
264         ERR__UNKNOWN_PROTOCOL = -171,
265 	/** Not implemented */
266         ERR__NOT_IMPLEMENTED = -170,
267 	/** Authentication failure*/
268 	ERR__AUTHENTICATION = -169,
269 	/** No stored offset */
270 	ERR__NO_OFFSET = -168,
271 	/** Outdated */
272 	ERR__OUTDATED = -167,
273 	/** Timed out in queue */
274 	ERR__TIMED_OUT_QUEUE = -166,
275         /** Feature not supported by broker */
276         ERR__UNSUPPORTED_FEATURE = -165,
277         /** Awaiting cache update */
278         ERR__WAIT_CACHE = -164,
279         /** Operation interrupted */
280         ERR__INTR = -163,
281         /** Key serialization error */
282         ERR__KEY_SERIALIZATION = -162,
283         /** Value serialization error */
284         ERR__VALUE_SERIALIZATION = -161,
285         /** Key deserialization error */
286         ERR__KEY_DESERIALIZATION = -160,
287         /** Value deserialization error */
288         ERR__VALUE_DESERIALIZATION = -159,
289         /** Partial response */
290         ERR__PARTIAL = -158,
291         /** Modification attempted on read-only object */
292         ERR__READ_ONLY = -157,
293         /** No such entry / item not found */
294         ERR__NOENT = -156,
295         /** Read underflow */
296         ERR__UNDERFLOW = -155,
297         /** Invalid type */
298         ERR__INVALID_TYPE = -154,
299         /** Retry operation */
300         ERR__RETRY = -153,
301         /** Purged in queue */
302         ERR__PURGE_QUEUE = -152,
303         /** Purged in flight */
304         ERR__PURGE_INFLIGHT = -151,
305         /** Fatal error: see RdKafka::Handle::fatal_error() */
306         ERR__FATAL = -150,
307         /** Inconsistent state */
308         ERR__INCONSISTENT = -149,
309         /** Gap-less ordering would not be guaranteed if proceeding */
310         ERR__GAPLESS_GUARANTEE = -148,
311         /** Maximum poll interval exceeded */
312         ERR__MAX_POLL_EXCEEDED = -147,
313         /** Unknown broker */
314         ERR__UNKNOWN_BROKER = -146,
315         /** Functionality not configured */
316         ERR__NOT_CONFIGURED = -145,
317         /** Instance has been fenced */
318         ERR__FENCED = -144,
319         /** Application generated error */
320         ERR__APPLICATION = -143,
321         /** Assignment lost */
322         ERR__ASSIGNMENT_LOST = -142,
323         /** No operation performed */
324         ERR__NOOP = -141,
325         /** No offset to automatically reset to */
326         ERR__AUTO_OFFSET_RESET = -140,
327 
328         /** End internal error codes */
329 	ERR__END = -100,
330 
331 	/* Kafka broker errors: */
332 	/** Unknown broker error */
333 	ERR_UNKNOWN = -1,
334 	/** Success */
335 	ERR_NO_ERROR = 0,
336 	/** Offset out of range */
337 	ERR_OFFSET_OUT_OF_RANGE = 1,
338 	/** Invalid message */
339 	ERR_INVALID_MSG = 2,
340 	/** Unknown topic or partition */
341 	ERR_UNKNOWN_TOPIC_OR_PART = 3,
342 	/** Invalid message size */
343 	ERR_INVALID_MSG_SIZE = 4,
344 	/** Leader not available */
345 	ERR_LEADER_NOT_AVAILABLE = 5,
346 	/** Not leader for partition */
347 	ERR_NOT_LEADER_FOR_PARTITION = 6,
348 	/** Request timed out */
349 	ERR_REQUEST_TIMED_OUT = 7,
350 	/** Broker not available */
351 	ERR_BROKER_NOT_AVAILABLE = 8,
352 	/** Replica not available */
353 	ERR_REPLICA_NOT_AVAILABLE = 9,
354 	/** Message size too large */
355 	ERR_MSG_SIZE_TOO_LARGE = 10,
356 	/** StaleControllerEpochCode */
357 	ERR_STALE_CTRL_EPOCH = 11,
358 	/** Offset metadata string too large */
359 	ERR_OFFSET_METADATA_TOO_LARGE = 12,
360 	/** Broker disconnected before response received */
361 	ERR_NETWORK_EXCEPTION = 13,
362         /** Coordinator load in progress */
363         ERR_COORDINATOR_LOAD_IN_PROGRESS = 14,
364         /** Group coordinator load in progress */
365 #define ERR_GROUP_LOAD_IN_PROGRESS           ERR_COORDINATOR_LOAD_IN_PROGRESS
366         /** Coordinator not available */
367         ERR_COORDINATOR_NOT_AVAILABLE = 15,
368         /** Group coordinator not available */
369 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE  ERR_COORDINATOR_NOT_AVAILABLE
370         /** Not coordinator */
371         ERR_NOT_COORDINATOR = 16,
372         /** Not coordinator for group */
373 #define ERR_NOT_COORDINATOR_FOR_GROUP        ERR_NOT_COORDINATOR
374 	/** Invalid topic */
375         ERR_TOPIC_EXCEPTION = 17,
376 	/** Message batch larger than configured server segment size */
377         ERR_RECORD_LIST_TOO_LARGE = 18,
378 	/** Not enough in-sync replicas */
379         ERR_NOT_ENOUGH_REPLICAS = 19,
380 	/** Message(s) written to insufficient number of in-sync replicas */
381         ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
382 	/** Invalid required acks value */
383         ERR_INVALID_REQUIRED_ACKS = 21,
384 	/** Specified group generation id is not valid */
385         ERR_ILLEGAL_GENERATION = 22,
386 	/** Inconsistent group protocol */
387         ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
388 	/** Invalid group.id */
389 	ERR_INVALID_GROUP_ID = 24,
390 	/** Unknown member */
391         ERR_UNKNOWN_MEMBER_ID = 25,
392 	/** Invalid session timeout */
393         ERR_INVALID_SESSION_TIMEOUT = 26,
394 	/** Group rebalance in progress */
395 	ERR_REBALANCE_IN_PROGRESS = 27,
396 	/** Commit offset data size is not valid */
397         ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
398 	/** Topic authorization failed */
399         ERR_TOPIC_AUTHORIZATION_FAILED = 29,
400 	/** Group authorization failed */
401 	ERR_GROUP_AUTHORIZATION_FAILED = 30,
402 	/** Cluster authorization failed */
403 	ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
404         /** Invalid timestamp */
405         ERR_INVALID_TIMESTAMP = 32,
406         /** Unsupported SASL mechanism */
407         ERR_UNSUPPORTED_SASL_MECHANISM = 33,
408         /** Illegal SASL state */
409         ERR_ILLEGAL_SASL_STATE = 34,
410         /** Unuspported version */
411         ERR_UNSUPPORTED_VERSION = 35,
412         /** Topic already exists */
413         ERR_TOPIC_ALREADY_EXISTS = 36,
414         /** Invalid number of partitions */
415         ERR_INVALID_PARTITIONS = 37,
416         /** Invalid replication factor */
417         ERR_INVALID_REPLICATION_FACTOR = 38,
418         /** Invalid replica assignment */
419         ERR_INVALID_REPLICA_ASSIGNMENT = 39,
420         /** Invalid config */
421         ERR_INVALID_CONFIG = 40,
422         /** Not controller for cluster */
423         ERR_NOT_CONTROLLER = 41,
424         /** Invalid request */
425         ERR_INVALID_REQUEST = 42,
426         /** Message format on broker does not support request */
427         ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
428         /** Policy violation */
429         ERR_POLICY_VIOLATION = 44,
430         /** Broker received an out of order sequence number */
431         ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
432         /** Broker received a duplicate sequence number */
433         ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
434         /** Producer attempted an operation with an old epoch */
435         ERR_INVALID_PRODUCER_EPOCH = 47,
436         /** Producer attempted a transactional operation in an invalid state */
437         ERR_INVALID_TXN_STATE = 48,
438         /** Producer attempted to use a producer id which is not
439          *  currently assigned to its transactional id */
440         ERR_INVALID_PRODUCER_ID_MAPPING = 49,
441         /** Transaction timeout is larger than the maximum
442          *  value allowed by the broker's max.transaction.timeout.ms */
443         ERR_INVALID_TRANSACTION_TIMEOUT = 50,
444         /** Producer attempted to update a transaction while another
445          *  concurrent operation on the same transaction was ongoing */
446         ERR_CONCURRENT_TRANSACTIONS = 51,
447         /** Indicates that the transaction coordinator sending a
448          *  WriteTxnMarker is no longer the current coordinator for a
449          *  given producer */
450         ERR_TRANSACTION_COORDINATOR_FENCED = 52,
451         /** Transactional Id authorization failed */
452         ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
453         /** Security features are disabled */
454         ERR_SECURITY_DISABLED = 54,
455         /** Operation not attempted */
456         ERR_OPERATION_NOT_ATTEMPTED = 55,
457         /** Disk error when trying to access log file on the disk */
458         ERR_KAFKA_STORAGE_ERROR = 56,
459         /** The user-specified log directory is not found in the broker config */
460         ERR_LOG_DIR_NOT_FOUND = 57,
461         /** SASL Authentication failed */
462         ERR_SASL_AUTHENTICATION_FAILED = 58,
463         /** Unknown Producer Id */
464         ERR_UNKNOWN_PRODUCER_ID = 59,
465         /** Partition reassignment is in progress */
466         ERR_REASSIGNMENT_IN_PROGRESS = 60,
467         /** Delegation Token feature is not enabled */
468         ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61,
469         /** Delegation Token is not found on server */
470         ERR_DELEGATION_TOKEN_NOT_FOUND = 62,
471         /** Specified Principal is not valid Owner/Renewer */
472         ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63,
473         /** Delegation Token requests are not allowed on this connection */
474         ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64,
475         /** Delegation Token authorization failed */
476         ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65,
477         /** Delegation Token is expired */
478         ERR_DELEGATION_TOKEN_EXPIRED = 66,
479         /** Supplied principalType is not supported */
480         ERR_INVALID_PRINCIPAL_TYPE = 67,
481         /** The group is not empty */
482         ERR_NON_EMPTY_GROUP = 68,
483         /** The group id does not exist */
484         ERR_GROUP_ID_NOT_FOUND = 69,
485         /** The fetch session ID was not found */
486         ERR_FETCH_SESSION_ID_NOT_FOUND = 70,
487         /** The fetch session epoch is invalid */
488         ERR_INVALID_FETCH_SESSION_EPOCH = 71,
489         /** No matching listener */
490         ERR_LISTENER_NOT_FOUND = 72,
491         /** Topic deletion is disabled */
492         ERR_TOPIC_DELETION_DISABLED = 73,
493         /** Leader epoch is older than broker epoch */
494         ERR_FENCED_LEADER_EPOCH = 74,
495         /** Leader epoch is newer than broker epoch */
496         ERR_UNKNOWN_LEADER_EPOCH = 75,
497         /** Unsupported compression type */
498         ERR_UNSUPPORTED_COMPRESSION_TYPE = 76,
499         /** Broker epoch has changed */
500         ERR_STALE_BROKER_EPOCH = 77,
501         /** Leader high watermark is not caught up */
502         ERR_OFFSET_NOT_AVAILABLE = 78,
503         /** Group member needs a valid member ID */
504         ERR_MEMBER_ID_REQUIRED = 79,
505         /** Preferred leader was not available */
506         ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80,
507         /** Consumer group has reached maximum size */
508         ERR_GROUP_MAX_SIZE_REACHED = 81,
509         /** Static consumer fenced by other consumer with same
510          * group.instance.id. */
511         ERR_FENCED_INSTANCE_ID = 82,
512         /** Eligible partition leaders are not available */
513         ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE = 83,
514         /** Leader election not needed for topic partition */
515         ERR_ELECTION_NOT_NEEDED = 84,
516         /** No partition reassignment is in progress */
517         ERR_NO_REASSIGNMENT_IN_PROGRESS = 85,
518         /** Deleting offsets of a topic while the consumer group is
519          *  subscribed to it */
520         ERR_GROUP_SUBSCRIBED_TO_TOPIC = 86,
521         /** Broker failed to validate record */
522         ERR_INVALID_RECORD = 87,
523         /** There are unstable offsets that need to be cleared */
524         ERR_UNSTABLE_OFFSET_COMMIT = 88,
525         /** Throttling quota has been exceeded */
526         ERR_THROTTLING_QUOTA_EXCEEDED = 89,
527         /** There is a newer producer with the same transactionalId
528          *  which fences the current one */
529         ERR_PRODUCER_FENCED = 90,
530         /** Request illegally referred to resource that does not exist */
531         ERR_RESOURCE_NOT_FOUND = 91,
532         /** Request illegally referred to the same resource twice */
533         ERR_DUPLICATE_RESOURCE = 92,
534         /** Requested credential would not meet criteria for acceptability */
535         ERR_UNACCEPTABLE_CREDENTIAL = 93,
536         /** Indicates that the either the sender or recipient of a
537          *  voter-only request is not one of the expected voters */
538         ERR_INCONSISTENT_VOTER_SET = 94,
539         /** Invalid update version */
540         ERR_INVALID_UPDATE_VERSION = 95,
541         /** Unable to update finalized features due to server error */
542         ERR_FEATURE_UPDATE_FAILED = 96,
543         /** Request principal deserialization failed during forwarding */
544         ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97
545 };
546 
547 
548 /**
549  * @brief Returns a human readable representation of a kafka error.
550  */
551 RD_EXPORT
552 std::string  err2str(RdKafka::ErrorCode err);
553 
554 
555 
556 /**
557  * @enum CertificateType
558  * @brief SSL certificate types
559  */
560 enum CertificateType {
561   CERT_PUBLIC_KEY,   /**< Client's public key */
562   CERT_PRIVATE_KEY,  /**< Client's private key */
563   CERT_CA,           /**< CA certificate */
564   CERT__CNT
565 };
566 
567 /**
568  * @enum CertificateEncoding
569  * @brief SSL certificate encoding
570  */
571 enum CertificateEncoding {
572   CERT_ENC_PKCS12,  /**< PKCS#12 */
573   CERT_ENC_DER,     /**< DER / binary X.509 ASN1 */
574   CERT_ENC_PEM,     /**< PEM */
575   CERT_ENC__CNT
576 };
577 
578 /**@} */
579 
580 
581 
582 /**@cond NO_DOC*/
583 /* Forward declarations */
584 class Handle;
585 class Producer;
586 class Message;
587 class Headers;
588 class Queue;
589 class Event;
590 class Topic;
591 class TopicPartition;
592 class Metadata;
593 class KafkaConsumer;
594 /**@endcond*/
595 
596 
597 /**
598  * @name Error class
599  * @{
600  *
601  */
602 
603 /**
604  * @brief The Error class is used as a return value from APIs to propagate
605  *        an error. The error consists of an error code which is to be used
606  *        programatically, an error string for showing to the user,
607  *        and various error flags that can be used programmatically to decide
608  *        how to handle the error; e.g., should the operation be retried,
609  *        was it a fatal error, etc.
610  *
611  * Error objects must be deleted explicitly to free its resources.
612  */
613 class RD_EXPORT Error {
614  public:
615 
616  /**
617   * @brief Create error object.
618   */
619  static Error *create (ErrorCode code, const std::string *errstr);
620 
~Error()621  virtual ~Error () { }
622 
623  /*
624   * Error accessor methods
625   */
626 
627  /**
628   * @returns the error code, e.g., RdKafka::ERR_UNKNOWN_MEMBER_ID.
629   */
630  virtual ErrorCode code () const = 0;
631 
632  /**
633   * @returns the error code name, e.g, "ERR_UNKNOWN_MEMBER_ID".
634   */
635  virtual std::string name () const = 0;
636 
637   /**
638    * @returns a human readable error string.
639    */
640  virtual std::string str () const = 0;
641 
642  /**
643   * @returns true if the error is a fatal error, indicating that the client
644   *          instance is no longer usable, else false.
645   */
646  virtual bool is_fatal () const = 0;
647 
648  /**
649   * @returns true if the operation may be retried, else false.
650   */
651  virtual bool is_retriable () const = 0;
652 
653  /**
654   * @returns true if the error is an abortable transaction error in which case
655   *          the application must call RdKafka::Producer::abort_transaction()
656   *          and start a new transaction with
657   *          RdKafka::Producer::begin_transaction() if it wishes to proceed
658   *          with transactions.
659   *          Else returns false.
660   *
661   * @remark The return value of this method is only valid for errors returned
662   *         by the transactional API.
663   */
664  virtual bool txn_requires_abort () const = 0;
665 };
666 
667 /**@}*/
668 
669 
670 /**
671  * @name Callback classes
672  * @{
673  *
674  *
675  * librdkafka uses (optional) callbacks to propagate information and
676  * delegate decisions to the application logic.
677  *
678  * An application must call RdKafka::poll() at regular intervals to
679  * serve queued callbacks.
680  */
681 
682 
683 /**
684  * @brief Delivery Report callback class
685  *
686  * The delivery report callback will be called once for each message
687  * accepted by RdKafka::Producer::produce() (et.al) with
688  * RdKafka::Message::err() set to indicate the result of the produce request.
689  *
690  * The callback is called when a message is succesfully produced or
691  * if librdkafka encountered a permanent failure, or the retry counter for
692  * temporary errors has been exhausted.
693  *
694  * An application must call RdKafka::poll() at regular intervals to
695  * serve queued delivery report callbacks.
696 
697  */
698 class RD_EXPORT DeliveryReportCb {
699  public:
700   /**
701    * @brief Delivery report callback.
702    */
703   virtual void dr_cb (Message &message) = 0;
704 
~DeliveryReportCb()705   virtual ~DeliveryReportCb() { }
706 };
707 
708 
709 /**
710  * @brief SASL/OAUTHBEARER token refresh callback class
711  *
712  * The SASL/OAUTHBEARER token refresh callback is triggered via RdKafka::poll()
713  * whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved,
714  * typically based on the configuration defined in \c sasl.oauthbearer.config.
715  *
716  * The \c oauthbearer_config argument is the value of the
717  * \c sasl.oauthbearer.config configuration property.
718  *
719  * The callback should invoke RdKafka::Handle::oauthbearer_set_token() or
720  * RdKafka::Handle::oauthbearer_set_token_failure() to indicate success or
721  * failure, respectively.
722  *
723  * The refresh operation is eventable and may be received when an event
724  * callback handler is set with an event type of
725  * \c RdKafka::Event::EVENT_OAUTHBEARER_TOKEN_REFRESH.
726  *
727  * Note that before any SASL/OAUTHBEARER broker connection can succeed the
728  * application must call RdKafka::Handle::oauthbearer_set_token() once -- either
729  * directly or, more typically, by invoking RdKafka::poll() -- in order to
730  * cause retrieval of an initial token to occur.
731  *
732  * An application must call RdKafka::poll() at regular intervals to
733  * serve queued SASL/OAUTHBEARER token refresh callbacks (when
734  * OAUTHBEARER is the SASL mechanism).
735  */
736 class RD_EXPORT OAuthBearerTokenRefreshCb {
737  public:
738   /**
739    * @brief SASL/OAUTHBEARER token refresh callback class.
740    *
741    * @param handle The RdKafka::Handle which requires a refreshed token.
742    * @param oauthbearer_config The value of the
743    * \p sasl.oauthbearer.config configuration property for \p handle.
744    */
745   virtual void oauthbearer_token_refresh_cb (RdKafka::Handle* handle,
746                                              const std::string &oauthbearer_config) = 0;
747 
~OAuthBearerTokenRefreshCb()748   virtual ~OAuthBearerTokenRefreshCb() { }
749 };
750 
751 
752 /**
753  * @brief Partitioner callback class
754  *
755  * Generic partitioner callback class for implementing custom partitioners.
756  *
757  * @sa RdKafka::Conf::set() \c "partitioner_cb"
758  */
759 class RD_EXPORT PartitionerCb {
760  public:
761   /**
762    * @brief Partitioner callback
763    *
764    * Return the partition to use for \p key in \p topic.
765    *
766    * The \p msg_opaque is the same \p msg_opaque provided in the
767    * RdKafka::Producer::produce() call.
768    *
769    * @remark \p key may be NULL or the empty.
770    *
771    * @returns Must return a value between 0 and \p partition_cnt (non-inclusive).
772    *          May return RD_KAFKA_PARTITION_UA (-1) if partitioning failed.
773    *
774    * @sa The callback may use RdKafka::Topic::partition_available() to check
775    *     if a partition has an active leader broker.
776    */
777   virtual int32_t partitioner_cb (const Topic *topic,
778                                   const std::string *key,
779                                   int32_t partition_cnt,
780                                   void *msg_opaque) = 0;
781 
~PartitionerCb()782   virtual ~PartitionerCb() { }
783 };
784 
785 /**
786  * @brief  Variant partitioner with key pointer
787  *
788  */
789 class PartitionerKeyPointerCb {
790  public:
791   /**
792    * @brief Variant partitioner callback that gets \p key as pointer and length
793    *        instead of as a const std::string *.
794    *
795    * @remark \p key may be NULL or have \p key_len 0.
796    *
797    * @sa See RdKafka::PartitionerCb::partitioner_cb() for exact semantics
798    */
799   virtual int32_t partitioner_cb (const Topic *topic,
800                                   const void *key,
801                                   size_t key_len,
802                                   int32_t partition_cnt,
803                                   void *msg_opaque) = 0;
804 
~PartitionerKeyPointerCb()805   virtual ~PartitionerKeyPointerCb() { }
806 };
807 
808 
809 
810 /**
811  * @brief Event callback class
812  *
813  * Events are a generic interface for propagating errors, statistics, logs, etc
814  * from librdkafka to the application.
815  *
816  * @sa RdKafka::Event
817  */
818 class RD_EXPORT EventCb {
819  public:
820   /**
821    * @brief Event callback
822    *
823    * @sa RdKafka::Event
824    */
825   virtual void event_cb (Event &event) = 0;
826 
~EventCb()827   virtual ~EventCb() { }
828 };
829 
830 
831 /**
832  * @brief Event object class as passed to the EventCb callback.
833  */
class RD_EXPORT Event {
 public:
  /** @brief Event type */
  enum Type {
    EVENT_ERROR,     /**< Event is an error condition */
    EVENT_STATS,     /**< Event is a statistics JSON document */
    EVENT_LOG,       /**< Event is a log message */
    EVENT_THROTTLE   /**< Event is a throttle level signaling from the broker */
  };

  /** @brief EVENT_LOG severities (conforms to syslog(3) severities) */
  enum Severity {
    EVENT_SEVERITY_EMERG = 0,
    EVENT_SEVERITY_ALERT = 1,
    EVENT_SEVERITY_CRITICAL = 2,
    EVENT_SEVERITY_ERROR = 3,
    EVENT_SEVERITY_WARNING = 4,
    EVENT_SEVERITY_NOTICE = 5,
    EVENT_SEVERITY_INFO = 6,
    EVENT_SEVERITY_DEBUG = 7
  };

  /* Virtual destructor: allows deleting implementations through an
   * Event base pointer. */
  virtual ~Event () { }

  /*
   * Event Accessor methods
   */

  /**
   * @returns The event type
   * @remark Applies to all event types
   */
  virtual Type        type () const = 0;

  /**
   * @returns Event error, if any.
   * @remark Applies to all event types except THROTTLE
   */
  virtual ErrorCode   err () const = 0;

  /**
   * @returns Log severity level.
   * @remark Applies to LOG event type.
   */
  virtual Severity    severity () const = 0;

  /**
   * @returns Log facility string.
   * @remark Applies to LOG event type.
   */
  virtual std::string fac () const = 0;

  /**
   * @returns Log message string.
   *
   * \c EVENT_LOG: Log message string.
   * \c EVENT_STATS: JSON object (as string).
   *
   * @remark Applies to LOG and STATS event types.
   */
  virtual std::string str () const = 0;

  /**
   * @returns Throttle time in milliseconds.
   * @remark Applies to THROTTLE event type.
   */
  virtual int         throttle_time () const = 0;

  /**
   * @returns Throttling broker's name.
   * @remark Applies to THROTTLE event type.
   */
  virtual std::string broker_name () const = 0;

  /**
   * @returns Throttling broker's id.
   * @remark Applies to THROTTLE event type.
   */
  virtual int         broker_id () const = 0;


  /**
   * @returns true if this is a fatal error.
   * @remark Applies to ERROR event type.
   * @sa RdKafka::Handle::fatal_error()
   */
  virtual bool        fatal () const = 0;
};
922 
923 
924 
925 /**
926  * @brief Consume callback class
927  */
class RD_EXPORT ConsumeCb {
 public:
  /**
   * @brief The consume callback is used with
   *        RdKafka::Consumer::consume_callback()
   *        methods and will be called for each consumed \p message.
   *
   * The callback interface is optional but provides increased performance.
   *
   * @param message The consumed message.
   * @param opaque  Application-provided opaque pointer as passed to
   *                the consume_callback() call.
   */
  virtual void consume_cb (Message &message, void *opaque) = 0;

  /* Virtual destructor: allows deleting implementations through a
   * ConsumeCb base pointer. */
  virtual ~ConsumeCb() { }
};
941 
942 
943 /**
944  * @brief \b KafkaConsumer: Rebalance callback class
945  */
class RD_EXPORT RebalanceCb {
public:
  /**
   * @brief Group rebalance callback for use with RdKafka::KafkaConsumer
   *
   * Registering a \p rebalance_cb turns off librdkafka's automatic
   * partition assignment/revocation and instead delegates that responsibility
   * to the application's \p rebalance_cb.
   *
   * The rebalance callback is responsible for updating librdkafka's
   * assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS
   * and RdKafka::ERR__REVOKE_PARTITIONS but should also be able to handle
   * arbitrary rebalancing failures where \p err is neither of those.
   * @remark In this latter case (arbitrary error), the application must
   *         call unassign() to synchronize state.
   *
   * For eager/non-cooperative `partition.assignment.strategy` assignors,
   * such as `range` and `roundrobin`, the application must use
   * assign() to set and unassign() to clear the entire assignment.
   * For the cooperative assignors, such as `cooperative-sticky`, the
   * application must use incremental_assign() for ERR__ASSIGN_PARTITIONS and
   * incremental_unassign() for ERR__REVOKE_PARTITIONS.
   *
   * Without a rebalance callback this is done automatically by librdkafka
   * but registering a rebalance callback gives the application flexibility
   * in performing other operations along with the assigning/revocation,
   * such as fetching offsets from an alternate location (on assign)
   * or manually committing offsets (on revoke).
   *
   * @sa RdKafka::KafkaConsumer::assign()
   * @sa RdKafka::KafkaConsumer::incremental_assign()
   * @sa RdKafka::KafkaConsumer::incremental_unassign()
   * @sa RdKafka::KafkaConsumer::assignment_lost()
   * @sa RdKafka::KafkaConsumer::rebalance_protocol()
   *
   * The following example shows the application's responsibilities:
   * @code
   *    class MyRebalanceCb : public RdKafka::RebalanceCb {
   *     public:
   *      void rebalance_cb (RdKafka::KafkaConsumer *consumer,
   *                    RdKafka::ErrorCode err,
   *                    std::vector<RdKafka::TopicPartition*> &partitions) {
   *         if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
   *           // application may load offsets from arbitrary external
   *           // storage here and update \p partitions
   *           if (consumer->rebalance_protocol() == "COOPERATIVE")
   *             consumer->incremental_assign(partitions);
   *           else
   *             consumer->assign(partitions);
   *
   *         } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
   *           // Application may commit offsets manually here
   *           // if auto.commit.enable=false
   *           if (consumer->rebalance_protocol() == "COOPERATIVE")
   *             consumer->incremental_unassign(partitions);
   *           else
   *             consumer->unassign();
   *
   *         } else {
   *           std::cerr << "Rebalancing error: " <<
   *                        RdKafka::err2str(err) << std::endl;
   *           consumer->unassign();
   *         }
   *     }
   *  }
   * @endcode
   *
   * @remark The above example lacks error handling for assign calls, see
   *         the examples/ directory.
   */
 virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                            RdKafka::ErrorCode err,
                            std::vector<TopicPartition*>&partitions) = 0;

 /* Virtual destructor: allows deleting implementations through a
  * RebalanceCb base pointer. */
 virtual ~RebalanceCb() { }
};
1022 
1023 
1024 /**
1025  * @brief Offset Commit callback class
1026  */
class RD_EXPORT OffsetCommitCb {
public:
  /**
   * @brief Set offset commit callback for use with consumer groups
   *
   * The results of automatic or manual offset commits will be scheduled
   * for this callback and is served by RdKafka::KafkaConsumer::consume().
   *
   * If no partitions had valid offsets to commit this callback will be called
   * with \p err == ERR__NO_OFFSET which is not to be considered an error.
   *
   * The \p offsets list contains per-partition information:
   *   - \c topic      The topic committed
   *   - \c partition  The partition committed
   *   - \c offset:    Committed offset (attempted)
   *   - \c err:       Commit error
   */
  virtual void offset_commit_cb(RdKafka::ErrorCode err,
                                std::vector<TopicPartition*>&offsets) = 0;

  /* Virtual destructor: allows deleting implementations through an
   * OffsetCommitCb base pointer. */
  virtual ~OffsetCommitCb() { }
};
1049 
1050 
1051 
1052 /**
1053  * @brief SSL broker certificate verification class.
1054  *
1055  * @remark Class instance must outlive the RdKafka client instance.
1056  */
class RD_EXPORT SslCertificateVerifyCb {
public:
  /**
   * @brief SSL broker certificate verification callback.
   *
   * The verification callback is triggered from internal librdkafka threads
   * upon connecting to a broker. On each connection attempt the callback
   * will be called for each certificate in the broker's certificate chain,
   * starting at the root certificate, as long as the application callback
   * returns 1 (valid certificate).
   *
   * \p broker_name and \p broker_id correspond to the broker the connection
   * is being made to.
   * The \c x509_error argument indicates if OpenSSL's verification of
   * the certificate succeeded (0) or failed (an OpenSSL error code).
   * The application may set the SSL context error code by returning 0
   * from the verify callback and providing a non-zero SSL context error code
   * in \p x509_error.
   * If the verify callback sets \p x509_error to 0, returns 1, and the
   * original \p x509_error was non-zero, the error on the SSL context will
   * be cleared.
   * \p x509_error is always a valid pointer to an int.
   *
   * \p depth is the depth of the current certificate in the chain, starting
   * at the root certificate.
   *
   * The certificate itself is passed in binary DER format in \p buf of
   * size \p size.
   *
   * The callback must return 1 if verification succeeds, or 0 if
   * verification fails and write a human-readable error message
   * to \p errstr.
   *
   * @warning This callback will be called from internal librdkafka threads.
   *
   * @remark See <openssl/x509_vfy.h> in the OpenSSL source distribution
   *         for a list of \p x509_error codes.
   */
  virtual bool ssl_cert_verify_cb (const std::string &broker_name,
                                   int32_t broker_id,
                                   int *x509_error,
                                   int depth,
                                   const char *buf, size_t size,
                                   std::string &errstr) = 0;

  /* Virtual destructor: allows deleting implementations through an
   * SslCertificateVerifyCb base pointer. */
  virtual ~SslCertificateVerifyCb() {}
};
1104 
1105 
1106 /**
1107  * @brief \b Portability: SocketCb callback class
1108  *
1109  */
class RD_EXPORT SocketCb {
 public:
  /**
   * @brief Socket callback
   *
   * The socket callback is responsible for opening a socket
   * according to the supplied \p domain, \p type and \p protocol.
   * The socket shall be created with \c CLOEXEC set in a racefree fashion, if
   * possible.
   *
   * It is typically not required to register an alternative socket
   * implementation
   *
   * @returns The socket file descriptor or -1 on error (\c errno must be set)
   */
  virtual int socket_cb (int domain, int type, int protocol) = 0;

  /* Virtual destructor: allows deleting implementations through a
   * SocketCb base pointer. */
  virtual ~SocketCb() { }
};
1129 
1130 
1131 /**
1132  * @brief \b Portability: OpenCb callback class
1133  *
1134  */
class RD_EXPORT OpenCb {
 public:
  /**
   * @brief Open callback
   * The open callback is responsible for opening the file specified by
   * \p path, using \p flags and \p mode.
   * The file shall be opened with \c CLOEXEC set in a racefree fashion, if
   * possible.
   *
   * It is typically not required to register an alternative open implementation
   *
   * @returns The file descriptor, or -1 on error (\c errno must be set).
   *
   * @remark Not currently available on native Win32
   */
  virtual int open_cb (const std::string &path, int flags, int mode) = 0;

  /* Virtual destructor: allows deleting implementations through an
   * OpenCb base pointer. */
  virtual ~OpenCb() { }
};
1152 
1153 
1154 /**@}*/
1155 
1156 
1157 
1158 
1159 /**
1160  * @name Configuration interface
1161  * @{
1162  *
1163  */
1164 
1165 /**
1166  * @brief Configuration interface
1167  *
1168  * Holds either global or topic configuration that are passed to
1169  * RdKafka::Consumer::create(), RdKafka::Producer::create(),
1170  * RdKafka::KafkaConsumer::create(), etc.
1171  *
1172  * @sa CONFIGURATION.md for the full list of supported properties.
1173  */
class RD_EXPORT Conf {
 public:
  /**
   * @brief Configuration object type
   */
  enum ConfType {
    CONF_GLOBAL, /**< Global configuration */
    CONF_TOPIC   /**< Topic specific configuration */
  };

  /**
   * @brief RdKafka::Conf::Set() result code
   */
  enum ConfResult {
    CONF_UNKNOWN = -2,  /**< Unknown configuration property */
    CONF_INVALID = -1,  /**< Invalid configuration value */
    CONF_OK = 0         /**< Configuration property was successfully set */
  };


  /**
   * @brief Create configuration object
   */
  static Conf *create (ConfType type);

  /* Virtual destructor: allows deleting implementations through a
   * Conf base pointer. */
  virtual ~Conf () { }

  /**
   * @brief Set configuration property \p name to value \p value.
   *
   * Fallthrough:
   * Topic-level configuration properties may be set using this interface
   * in which case they are applied on the \c default_topic_conf.
   * If no \c default_topic_conf has been set one will be created.
   * Any subsequent set("default_topic_conf", ..) calls will
   * replace the current default topic configuration.
   *
   * @returns CONF_OK on success, else writes a human readable error
   *          description to \p errstr on error.
   */
  virtual Conf::ConfResult set (const std::string &name,
                                const std::string &value,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"dr_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                DeliveryReportCb *dr_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"oauthbearer_token_refresh_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                        OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
                        std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"event_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                EventCb *event_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"default_topic_conf\"
   *
   * Sets the default topic configuration to use for automatically
   * subscribed topics.
   *
   * @sa RdKafka::KafkaConsumer::subscribe()
   */
  virtual Conf::ConfResult set (const std::string &name,
                                const Conf *topic_conf,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"partitioner_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                PartitionerCb *partitioner_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"partitioner_key_pointer_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                PartitionerKeyPointerCb *partitioner_kp_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"socket_cb\" */
  virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"open_cb\" */
  virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"rebalance_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                RebalanceCb *rebalance_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"offset_commit_cb\" */
  virtual Conf::ConfResult set (const std::string &name,
                                OffsetCommitCb *offset_commit_cb,
                                std::string &errstr) = 0;

  /** @brief Use with \p name = \c \"ssl_cert_verify_cb\".
   *  @returns CONF_OK on success or CONF_INVALID if SSL is
   *           not supported in this build.
  */
  virtual Conf::ConfResult set(const std::string &name,
                               SslCertificateVerifyCb *ssl_cert_verify_cb,
                               std::string &errstr) = 0;

  /**
   * @brief Set certificate/key \p cert_type from the \p cert_enc encoded
   *        memory at \p buffer of \p size bytes.
   *
   * @param cert_type Certificate or key type to configure.
   * @param cert_enc  Buffer \p encoding type.
   * @param buffer Memory pointer to encoded certificate or key.
   *               The memory is not referenced after this function returns.
   * @param size Size of memory at \p buffer.
   * @param errstr A human-readable error string will be written to this string
   *               on failure.
   *
   * @returns CONF_OK on success or CONF_INVALID if the memory in
   *          \p buffer is of incorrect encoding, or if librdkafka
   *          was not built with SSL support.
   *
   * @remark Calling this method multiple times with the same \p cert_type
   *         will replace the previous value.
   *
   * @remark Calling this method with \p buffer set to NULL will clear the
   *         configuration for \p cert_type.
   *
   * @remark The private key may require a password, which must be specified
   *         with the `ssl.key.password` configuration property prior to
   *         calling this function.
   *
   * @remark Private and public keys in PEM format may also be set with the
   *         `ssl.key.pem` and `ssl.certificate.pem` configuration properties.
   *
   * @remark CA certificate in PEM format may also be set with the
   *         `ssl.ca.pem` configuration property.
   */
  virtual Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type,
                                         RdKafka::CertificateEncoding cert_enc,
                                         const void *buffer, size_t size,
                                         std::string &errstr) = 0;

  /** @brief Query single configuration value
   *
   * Do not use this method to get callbacks registered by the configuration file.
   * Instead use the specific get() methods with the specific callback parameter in the signature.
   *
   * Fallthrough:
   * Topic-level configuration properties from the \c default_topic_conf
   * may be retrieved using this interface.
   *
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p value. */
  virtual Conf::ConfResult get(const std::string &name,
	  std::string &value) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p dr_cb. */
  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p oauthbearer_token_refresh_cb. */
  virtual Conf::ConfResult get(
          OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p event_cb. */
  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p partitioner_cb. */
  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p partitioner_kp_cb. */
  virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p socket_cb. */
  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p open_cb. */
  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p rebalance_cb. */
  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p offset_commit_cb. */
  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;

  /** @brief Query single configuration value
   *  @returns CONF_OK if the property was previously set and
   *           returns the value in \p ssl_cert_verify_cb. */
  virtual Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;

  /** @brief Dump configuration names and values to list containing
   *         name,value tuples */
  virtual std::list<std::string> *dump () = 0;

  /** @brief Use with \p name = \c \"consume_cb\" */
  virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
                                std::string &errstr) = 0;

  /**
   * @brief Returns the underlying librdkafka C rd_kafka_conf_t handle.
   *
   * @warning Calling the C API on this handle is not recommended and there
   *          is no official support for it, but for cases where the C++
   *          does not provide the proper functionality this C handle can be
   *          used to interact directly with the core librdkafka API.
   *
   * @remark The lifetime of the returned pointer is the same as the Conf
   *         object this method is called on.
   *
   * @remark Include <rdkafka/rdkafka.h> prior to including
   *         <rdkafka/rdkafkacpp.h>
   *
   * @returns \c rd_kafka_conf_t* if this is a CONF_GLOBAL object, else NULL.
   */
  virtual struct rd_kafka_conf_s *c_ptr_global () = 0;

  /**
   * @brief Returns the underlying librdkafka C rd_kafka_topic_conf_t handle.
   *
   * @warning Calling the C API on this handle is not recommended and there
   *          is no official support for it, but for cases where the C++
   *          does not provide the proper functionality this C handle can be
   *          used to interact directly with the core librdkafka API.
   *
   * @remark The lifetime of the returned pointer is the same as the Conf
   *         object this method is called on.
   *
   * @remark Include <rdkafka/rdkafka.h> prior to including
   *         <rdkafka/rdkafkacpp.h>
   *
   * @returns \c rd_kafka_topic_conf_t* if this is a CONF_TOPIC object,
   *          else NULL.
   */
  virtual struct rd_kafka_topic_conf_s *c_ptr_topic () = 0;

  /**
   * @brief Set callback_data for ssl engine.
   *
   * @remark The \c ssl.engine.location configuration must be set for this
   *         to have effect.
   *
   * @remark The memory pointed to by \p value must remain valid for the
   *         lifetime of the configuration object and any Kafka clients that
   *         use it.
   *
   * @returns CONF_OK on success, else CONF_INVALID.
   */
  virtual Conf::ConfResult set_engine_callback_data (void *value,
                                                     std::string &errstr) = 0;
};
1440 
1441 /**@}*/
1442 
1443 
1444 /**
1445  * @name Kafka base client handle
1446  * @{
1447  *
1448  */
1449 
1450 /**
1451  * @brief Base handle, super class for specific clients.
1452  */
1453 class RD_EXPORT Handle {
1454  public:
~Handle()1455   virtual ~Handle() { }
1456 
1457   /** @returns the name of the handle */
1458   virtual const std::string name () const = 0;
1459 
1460   /**
1461    * @brief Returns the client's broker-assigned group member id
1462    *
1463    * @remark This currently requires the high-level KafkaConsumer
1464    *
1465    * @returns Last assigned member id, or empty string if not currently
1466    *          a group member.
1467    */
1468   virtual const std::string memberid () const = 0;
1469 
1470 
1471   /**
1472    * @brief Polls the provided kafka handle for events.
1473    *
1474    * Events will trigger application provided callbacks to be called.
1475    *
1476    * The \p timeout_ms argument specifies the maximum amount of time
1477    * (in milliseconds) that the call will block waiting for events.
1478    * For non-blocking calls, provide 0 as \p timeout_ms.
1479    * To wait indefinately for events, provide -1.
1480    *
1481    * Events:
1482    *   - delivery report callbacks (if an RdKafka::DeliveryCb is configured) [producer]
1483    *   - event callbacks (if an RdKafka::EventCb is configured) [producer & consumer]
1484    *
1485    * @remark  An application should make sure to call poll() at regular
1486    *          intervals to serve any queued callbacks waiting to be called.
1487    *
1488    * @warning This method MUST NOT be used with the RdKafka::KafkaConsumer,
1489    *          use its RdKafka::KafkaConsumer::consume() instead.
1490    *
1491    * @returns the number of events served.
1492    */
1493   virtual int poll (int timeout_ms) = 0;
1494 
1495   /**
1496    * @brief  Returns the current out queue length
1497    *
1498    * The out queue contains messages and requests waiting to be sent to,
1499    * or acknowledged by, the broker.
1500    */
1501   virtual int outq_len () = 0;
1502 
1503   /**
1504    * @brief Request Metadata from broker.
1505    *
1506    * Parameters:
1507    *  \p all_topics  - if non-zero: request info about all topics in cluster,
1508    *                   if zero: only request info about locally known topics.
1509    *  \p only_rkt    - only request info about this topic
1510    *  \p metadatap   - pointer to hold metadata result.
1511    *                   The \p *metadatap pointer must be released with \c delete.
1512    *  \p timeout_ms  - maximum response time before failing.
1513    *
1514    * @returns RdKafka::ERR_NO_ERROR on success (in which case \p *metadatap
1515    * will be set), else RdKafka::ERR__TIMED_OUT on timeout or
1516    * other error code on error.
1517    */
1518   virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
1519                               Metadata **metadatap, int timeout_ms) = 0;
1520 
1521 
1522   /**
1523    * @brief Pause producing or consumption for the provided list of partitions.
1524    *
1525    * Success or error is returned per-partition in the \p partitions list.
1526    *
1527    * @returns ErrorCode::NO_ERROR
1528    *
1529    * @sa resume()
1530    */
1531   virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
1532 
1533 
1534   /**
1535    * @brief Resume producing or consumption for the provided list of partitions.
1536    *
1537    * Success or error is returned per-partition in the \p partitions list.
1538    *
1539    * @returns ErrorCode::NO_ERROR
1540    *
1541    * @sa pause()
1542    */
1543   virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
1544 
1545 
1546   /**
1547    * @brief Query broker for low (oldest/beginning)
1548    *        and high (newest/end) offsets for partition.
1549    *
1550    * Offsets are returned in \p *low and \p *high respectively.
1551    *
1552    * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure.
1553    */
1554   virtual ErrorCode query_watermark_offsets (const std::string &topic,
1555 					     int32_t partition,
1556 					     int64_t *low, int64_t *high,
1557 					     int timeout_ms) = 0;
1558 
1559   /**
1560    * @brief Get last known low (oldest/beginning)
1561    *        and high (newest/end) offsets for partition.
1562    *
1563    * The low offset is updated periodically (if statistics.interval.ms is set)
1564    * while the high offset is updated on each fetched message set from the
1565    * broker.
1566    *
1567    * If there is no cached offset (either low or high, or both) then
1568    * OFFSET_INVALID will be returned for the respective offset.
1569    *
1570    * Offsets are returned in \p *low and \p *high respectively.
1571    *
1572    * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure.
1573    *
1574    * @remark Shall only be used with an active consumer instance.
1575    */
1576   virtual ErrorCode get_watermark_offsets (const std::string &topic,
1577 					   int32_t partition,
1578 					   int64_t *low, int64_t *high) = 0;
1579 
1580 
1581   /**
1582    * @brief Look up the offsets for the given partitions by timestamp.
1583    *
1584    * The returned offset for each partition is the earliest offset whose
1585    * timestamp is greater than or equal to the given timestamp in the
1586    * corresponding partition.
1587    *
1588    * The timestamps to query are represented as \c offset in \p offsets
1589    * on input, and \c offset() will return the closest earlier offset
1590    * for the timestamp on output.
1591    *
1592    * Timestamps are expressed as milliseconds since epoch (UTC).
1593    *
1594    * The function will block for at most \p timeout_ms milliseconds.
1595    *
1596    * @remark Duplicate Topic+Partitions are not supported.
1597    * @remark Errors are also returned per TopicPartition, see \c err()
1598    *
1599    * @returns an error code for general errors, else RdKafka::ERR_NO_ERROR
1600    *          in which case per-partition errors might be set.
1601    */
1602   virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
1603                                      int timeout_ms) = 0;
1604 
1605 
1606   /**
1607    * @brief Retrieve queue for a given partition.
1608    *
1609    * @returns The fetch queue for the given partition if successful. Else,
1610    *          NULL is returned.
1611    *
1612    * @remark This function only works on consumers.
1613    */
1614   virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;
1615 
1616   /**
1617    * @brief Forward librdkafka logs (and debug) to the specified queue
1618    *        for serving with one of the ..poll() calls.
1619    *
1620    *        This allows an application to serve log callbacks (\c log_cb)
1621    *        in its thread of choice.
1622    *
1623    * @param queue Queue to forward logs to. If the value is NULL the logs
1624    *        are forwarded to the main queue.
1625    *
1626    * @remark The configuration property \c log.queue MUST also be set to true.
1627    *
1628    * @remark librdkafka maintains its own reference to the provided queue.
1629    *
1630    * @returns ERR_NO_ERROR on success or an error code on error.
1631    */
1632   virtual ErrorCode set_log_queue (Queue *queue) = 0;
1633 
1634   /**
1635    * @brief Cancels the current callback dispatcher (Handle::poll(),
1636    *        KafkaConsumer::consume(), etc).
1637    *
1638    * A callback may use this to force an immediate return to the calling
1639    * code (caller of e.g. Handle::poll()) without processing any further
1640    * events.
1641    *
1642    * @remark This function MUST ONLY be called from within a
1643    *         librdkafka callback.
1644    */
1645   virtual void yield () = 0;
1646 
1647   /**
1648    * @brief Returns the ClusterId as reported in broker metadata.
1649    *
1650    * @param timeout_ms If there is no cached value from metadata retrieval
1651    *                   then this specifies the maximum amount of time
1652    *                   (in milliseconds) the call will block waiting
1653    *                   for metadata to be retrieved.
1654    *                   Use 0 for non-blocking calls.
1655    *
1656    * @remark Requires broker version >=0.10.0 and api.version.request=true.
1657    *
1658    * @returns Last cached ClusterId, or empty string if no ClusterId could be
1659    *          retrieved in the allotted timespan.
1660    */
1661   virtual const std::string clusterid (int timeout_ms) = 0;
1662 
  /**
   * @brief Returns the underlying librdkafka C rd_kafka_t handle.
   *
   * @warning Calling the C API on this handle is not recommended and there
   *          is no official support for it, but for cases where the C++ API
   *          does not provide the proper functionality this C handle can be
   *          used to interact directly with the core librdkafka API.
   *
   * @remark The lifetime of the returned pointer is the same as the Handle
   *         object this method is called on.
   *
   * @remark Include <rdkafka/rdkafka.h> prior to including
   *         <rdkafka/rdkafkacpp.h>
   *
   * @returns \c rd_kafka_t*
   */
  virtual struct rd_kafka_s *c_ptr () = 0;
1680 
  /**
   * @brief Returns the current ControllerId (controller broker id)
   *        as reported in broker metadata.
   *
   * @param timeout_ms If there is no cached value from metadata retrieval
   *                   then this specifies the maximum amount of time
   *                   (in milliseconds) the call will block waiting
   *                   for metadata to be retrieved.
   *                   Use 0 for non-blocking calls.
   *
   * @remark Requires broker version >=0.10.0 and api.version.request=true.
   *
   * @returns Last cached ControllerId, or -1 if no ControllerId could be
   *          retrieved in the allotted timespan.
   *
   * @sa clusterid()
   */
  virtual int32_t controllerid (int timeout_ms) = 0;
1697 
1698 
  /**
   * @brief Returns the first fatal error set on this client instance,
   *        or ERR_NO_ERROR if no fatal error has occurred.
   *
   * This function is to be used with the Idempotent Producer and
   * the Event class for \c EVENT_ERROR events to detect fatal errors.
   *
   * Generally all errors raised by the error event are to be considered
   * informational and temporary, the client will try to recover from all
   * errors in a graceful fashion (by retrying, etc).
   *
   * However, some errors should logically be considered fatal to retain
   * consistency; in particular a set of errors that may occur when using the
   * Idempotent Producer and the in-order or exactly-once producer guarantees
   * can't be satisfied.
   *
   * @param errstr (out) A human readable error string, only written if a
   *        fatal error has been set.
   *
   * @returns ERR_NO_ERROR if no fatal error has been raised, else
   *          any other error code.
   */
  virtual ErrorCode fatal_error (std::string &errstr) const = 0;
1721 
  /**
   * @brief Set SASL/OAUTHBEARER token and metadata
   *
   * @param token_value the mandatory token value to set, often (but not
   *  necessarily) a JWS compact serialization as per
   *  https://tools.ietf.org/html/rfc7515#section-3.1.
   * @param md_lifetime_ms when the token expires, in terms of the number of
   *  milliseconds since the epoch.
   * @param md_principal_name the Kafka principal name associated with the
   *  token.
   * @param extensions potentially empty SASL extension keys and values where
   *  element [i] is the key and [i+1] is the key's value, to be communicated
   *  to the broker as additional key-value pairs during the initial client
   *  response as per https://tools.ietf.org/html/rfc7628#section-3.1.  The
   *  number of SASL extension keys plus values must be a non-negative multiple
   *  of 2. Any provided keys and values are copied.
   * @param errstr (out) A human readable error string is written here, only if
   *  there is an error.
   *
   * The SASL/OAUTHBEARER token refresh callback should invoke
   * this method upon success. The extension keys must not include the reserved
   * key "`auth`", and all extension keys and values must conform to the
   * required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
   *
   *     key            = 1*(ALPHA)
   *     value          = *(VCHAR / SP / HTAB / CR / LF )
   *
   * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise \p errstr set
   *              and:<br>
   *          \c RdKafka::ERR__INVALID_ARG if any of the arguments are
   *              invalid;<br>
   *          \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not
   *              supported by this build;<br>
   *          \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is
   *              not configured as the client's authentication mechanism.<br>
   *
   * @sa RdKafka::Handle::oauthbearer_set_token_failure
   * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb"
   */
  virtual ErrorCode oauthbearer_set_token (const std::string &token_value,
                                           int64_t md_lifetime_ms,
                                           const std::string &md_principal_name,
                                           const std::list<std::string> &extensions,
                                           std::string &errstr) = 0;
1766 
    /**
     * @brief SASL/OAUTHBEARER token refresh failure indicator.
     *
     * @param errstr human readable error reason for failing to acquire a token.
     *
     * The SASL/OAUTHBEARER token refresh callback should
     * invoke this method upon failure to refresh the token.
     *
     * @returns \c RdKafka::ERR_NO_ERROR on success, otherwise:<br>
     *          \c RdKafka::ERR__NOT_IMPLEMENTED if SASL/OAUTHBEARER is not
     *              supported by this build;<br>
     *          \c RdKafka::ERR__STATE if SASL/OAUTHBEARER is supported but is
     *              not configured as the client's authentication mechanism.
     *
     * @sa RdKafka::Handle::oauthbearer_set_token
     * @sa RdKafka::Conf::set() \c "oauthbearer_token_refresh_cb"
     */
    virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr) = 0;
1785 
   /**
     * @brief Allocate memory using the same allocator librdkafka uses.
     *
     * This is typically an abstraction for the malloc(3) call and makes sure
     * the application can use the same memory allocator as librdkafka for
     * allocating pointers that are used by librdkafka.
     *
     * @param size number of bytes to allocate.
     *
     * @returns pointer to the allocated memory.
     *
     * @remark Memory allocated by mem_malloc() must be freed using
     *         mem_free().
     */
    virtual void *mem_malloc (size_t size) = 0;
1797 
   /**
     * @brief Free pointer returned by librdkafka
     *
     * This is typically an abstraction for the free(3) call and makes sure
     * the application can use the same memory allocator as librdkafka for
     * freeing pointers returned by librdkafka.
     *
     * In standard setups it is usually not necessary to use this interface
     * rather than the free(3) function.
     *
     * @param ptr pointer to free, e.g. as returned by mem_malloc().
     *
     * @remark mem_free() must only be used for pointers returned by APIs
     *         that explicitly mention using this function for freeing.
     */
    virtual void mem_free (void *ptr) = 0;
1812 };
1813 
1814 
1815 /**@}*/
1816 
1817 
1818 /**
1819  * @name Topic and partition objects
1820  * @{
1821  *
1822  */
1823 
/**
 * @brief Topic+Partition
 *
 * This is a generic type to hold a single partition and various
 * information about it.
 *
 * Is typically used with std::vector<RdKafka::TopicPartition*> to provide
 * a list of partitions for different operations.
 */
class RD_EXPORT TopicPartition {
public:
  /**
   * @brief Create topic+partition object for \p topic and \p partition.
   *
   * @param topic topic name
   * @param partition partition id
   *
   * Use \c delete to deconstruct.
   */
  static TopicPartition *create (const std::string &topic, int partition);

  /**
   * @brief Create topic+partition object for \p topic and \p partition
   *        with offset \p offset.
   *
   * @param topic topic name
   * @param partition partition id
   * @param offset initial offset
   *
   * Use \c delete to deconstruct.
   */
  static TopicPartition *create (const std::string &topic, int partition,
                                 int64_t offset);

  virtual ~TopicPartition() = 0;

  /**
   * @brief Destroy/delete the TopicPartitions in \p partitions
   *        and clear the vector.
   */
  static void destroy (std::vector<TopicPartition*> &partitions);

  /** @returns topic name */
  virtual const std::string &topic () const = 0;

  /** @returns partition id */
  virtual int partition () const = 0;

  /** @returns offset (if applicable) */
  virtual int64_t offset () const = 0;

  /** @brief Set offset */
  virtual void set_offset (int64_t offset) = 0;

  /** @returns error code (if applicable) */
  virtual ErrorCode err () const = 0;
};
1874 
1875 
1876 
/**
 * @brief Topic handle
 *
 */
class RD_EXPORT Topic {
 public:
  /**
   * @brief Unassigned partition.
   *
   * The unassigned partition is used by the producer API for messages
   * that should be partitioned using the configured or default partitioner.
   */
  static const int32_t PARTITION_UA;

  /** @brief Special offsets */
  static const int64_t OFFSET_BEGINNING; /**< Consume from beginning */
  static const int64_t OFFSET_END; /**< Consume from end */
  static const int64_t OFFSET_STORED; /**< Use offset storage */
  static const int64_t OFFSET_INVALID; /**< Invalid offset */


  /**
   * @brief Creates a new topic handle for topic named \p topic_str
   *
   * \p conf is an optional configuration for the topic that will be used
   * instead of the default topic configuration.
   * The \p conf object is reusable after this call.
   *
   * @returns the new topic handle or NULL on error (see \p errstr).
   */
  static Topic *create (Handle *base, const std::string &topic_str,
                        const Conf *conf, std::string &errstr);

  virtual ~Topic () = 0;


  /** @returns the topic name */
  virtual const std::string name () const = 0;

  /**
   * @returns true if \p partition is available for the topic (has leader).
   * @warning \b MUST \b ONLY be called from within a
   *          RdKafka::PartitionerCb callback.
   */
  virtual bool partition_available (int32_t partition) const = 0;

  /**
   * @brief Store offset \p offset + 1 for topic partition \p partition.
   * The offset will be committed (written) to the broker (or file) according
   * to \c auto.commit.interval.ms or next manual offset-less commit call.
   *
   * @remark \c enable.auto.offset.store must be set to \c false when using
   *         this API.
   *
   * @returns RdKafka::ERR_NO_ERROR on success or an error code if the
   *          offset could not be stored.
   */
  virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;

  /**
   * @brief Returns the underlying librdkafka C rd_kafka_topic_t handle.
   *
   * @warning Calling the C API on this handle is not recommended and there
   *          is no official support for it, but for cases where the C++ API
   *          does not provide the underlying functionality this C handle can be
   *          used to interact directly with the core librdkafka API.
   *
   * @remark The lifetime of the returned pointer is the same as the Topic
   *         object this method is called on.
   *
   * @remark Include <rdkafka/rdkafka.h> prior to including
   *         <rdkafka/rdkafkacpp.h>
   *
   * @returns \c rd_kafka_topic_t*
   */
  virtual struct rd_kafka_topic_s *c_ptr () = 0;
};
1954 
1955 
1956 /**@}*/
1957 
1958 
1959 /**
1960  * @name Message object
1961  * @{
1962  *
1963  */
1964 
1965 
/**
 * @brief Message timestamp object
 *
 * Represents the number of milliseconds since the epoch (UTC).
 *
 * The MessageTimestampType dictates the timestamp type or origin.
 *
 * @remark Requires Apache Kafka broker version >= 0.10.0
 *
 * @sa RdKafka::Message::timestamp()
 */

class RD_EXPORT MessageTimestamp {
public:
  /** Message timestamp type (origin of the timestamp) */
  enum MessageTimestampType {
    MSG_TIMESTAMP_NOT_AVAILABLE,   /**< Timestamp not available */
    MSG_TIMESTAMP_CREATE_TIME,     /**< Message creation time (source) */
    MSG_TIMESTAMP_LOG_APPEND_TIME  /**< Message log append time (broker) */
  };

  MessageTimestampType type;       /**< Timestamp type */
  int64_t timestamp;               /**< Milliseconds since epoch (UTC). */
};
1989 
1990 
1991 /**
1992  * @brief Headers object
1993  *
1994  * Represents message headers.
1995  *
1996  * https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
1997  *
1998  * @remark Requires Apache Kafka >= 0.11.0 brokers
1999  */
2000 class RD_EXPORT Headers {
2001 public:
2002   virtual ~Headers() = 0;
2003 
2004   /**
2005    * @brief Header object
2006    *
2007    * This object represents a single Header with a key value pair
2008    * and an ErrorCode
2009    *
2010    * @remark dynamic allocation of this object is not supported.
2011    */
2012   class Header {
2013    public:
2014     /**
2015      * @brief Header object to encapsulate a single Header
2016      *
2017      * @param key the string value for the header key
2018      * @param value the bytes of the header value, or NULL
2019      * @param value_size the length in bytes of the header value
2020      *
2021      * @remark key and value are copied.
2022      *
2023      */
Header(const std::string & key,const void * value,size_t value_size)2024     Header(const std::string &key,
2025            const void *value,
2026            size_t value_size):
2027     key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
2028       value_ = copy_value(value, value_size);
2029     }
2030 
2031     /**
2032      * @brief Header object to encapsulate a single Header
2033      *
2034      * @param key the string value for the header key
2035      * @param value the bytes of the header value
2036      * @param value_size the length in bytes of the header value
2037      * @param err the error code if one returned
2038      *
2039      * @remark The error code is used for when the Header is constructed
2040      *         internally by using RdKafka::Headers::get_last which constructs
2041      *         a Header encapsulating the ErrorCode in the process.
2042      *         If err is set, the value and value_size fields will be undefined.
2043      */
Header(const std::string & key,const void * value,size_t value_size,const RdKafka::ErrorCode err)2044     Header(const std::string &key,
2045            const void *value,
2046            size_t value_size,
2047            const RdKafka::ErrorCode err):
2048     key_(key), err_(err), value_(NULL), value_size_(value_size) {
2049       if (err == ERR_NO_ERROR)
2050         value_ = copy_value(value, value_size);
2051     }
2052 
2053     /**
2054      * @brief Copy constructor
2055      *
2056      * @param other Header to make a copy of.
2057      */
Header(const Header & other)2058     Header(const Header &other):
2059     key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
2060       value_ = copy_value(other.value_, value_size_);
2061     }
2062 
2063     /**
2064      * @brief Assignment operator
2065      *
2066      * @param other Header to make a copy of.
2067      */
2068     Header& operator=(const Header &other)
2069     {
2070       if (&other == this) {
2071         return *this;
2072       }
2073 
2074       key_ = other.key_;
2075       err_ = other.err_;
2076       value_size_ = other.value_size_;
2077 
2078       if (value_ != NULL)
2079         mem_free(value_);
2080 
2081       value_ = copy_value(other.value_, value_size_);
2082 
2083       return *this;
2084     }
2085 
~Header()2086     ~Header() {
2087       if (value_ != NULL)
2088         mem_free(value_);
2089     }
2090 
2091     /** @returns the key/name associated with this Header */
key()2092     std::string key() const {
2093       return key_;
2094     }
2095 
2096      /** @returns returns the binary value, or NULL */
value()2097     const void *value() const {
2098       return value_;
2099     }
2100 
2101     /** @returns returns the value casted to a nul-terminated C string,
2102      *           or NULL. */
value_string()2103     const char *value_string() const {
2104       return static_cast<const char *>(value_);
2105     }
2106 
2107     /** @returns Value Size the length of the Value in bytes */
value_size()2108     size_t value_size() const {
2109       return value_size_;
2110     }
2111 
2112     /** @returns the error code of this Header (usually ERR_NO_ERROR) */
err()2113     RdKafka::ErrorCode err() const {
2114       return err_;
2115     }
2116 
2117  private:
copy_value(const void * value,size_t value_size)2118     char *copy_value(const void *value, size_t value_size) {
2119       if (!value)
2120         return NULL;
2121 
2122       char *dest = (char *)mem_malloc(value_size + 1);
2123       memcpy(dest, (const char *)value, value_size);
2124       dest[value_size] = '\0';
2125 
2126       return dest;
2127     }
2128 
2129     std::string key_;
2130     RdKafka::ErrorCode err_;
2131     char *value_;
2132     size_t value_size_;
2133     void *operator new(size_t); /* Prevent dynamic allocation */
2134   };
2135 
2136   /**
2137    * @brief Create a new instance of the Headers object
2138    *
2139    * @returns an empty Headers list
2140    */
2141   static Headers *create();
2142 
2143   /**
2144    * @brief Create a new instance of the Headers object from a std::vector
2145    *
2146    * @param headers std::vector of RdKafka::Headers::Header objects.
2147    *                The headers are copied, not referenced.
2148    *
2149    * @returns a Headers list from std::vector set to the size of the std::vector
2150    */
2151   static Headers *create(const std::vector<Header> &headers);
2152 
2153   /**
2154    * @brief Adds a Header to the end of the list.
2155    *
2156    * @param key header key/name
2157    * @param value binary value, or NULL
2158    * @param value_size size of the value
2159    *
2160    * @returns an ErrorCode signalling success or failure to add the header.
2161    */
2162   virtual ErrorCode add(const std::string &key, const void *value,
2163                         size_t value_size) = 0;
2164 
2165   /**
2166    * @brief Adds a Header to the end of the list.
2167    *
2168    * Convenience method for adding a std::string as a value for the header.
2169    *
2170    * @param key header key/name
2171    * @param value value string
2172    *
2173    * @returns an ErrorCode signalling success or failure to add the header.
2174    */
2175   virtual ErrorCode add(const std::string &key, const std::string &value) = 0;
2176 
2177   /**
2178    * @brief Adds a Header to the end of the list.
2179    *
2180    * This method makes a copy of the passed header.
2181    *
2182    * @param header Existing header to copy
2183    *
2184    * @returns an ErrorCode signalling success or failure to add the header.
2185    */
2186   virtual ErrorCode add(const Header &header) = 0;
2187 
2188   /**
2189    * @brief Removes all the Headers of a given key
2190    *
2191    * @param key header key/name to remove
2192    *
2193    * @returns An ErrorCode signalling a success or failure to remove the Header.
2194    */
2195   virtual ErrorCode remove(const std::string &key) = 0;
2196 
2197   /**
2198    * @brief Gets all of the Headers of a given key
2199    *
2200    * @param key header key/name
2201    *
2202    * @remark If duplicate keys exist this will return them all as a std::vector
2203    *
2204    * @returns a std::vector containing all the Headers of the given key.
2205    */
2206   virtual std::vector<Header> get(const std::string &key) const = 0;
2207 
2208   /**
2209    * @brief Gets the last occurrence of a Header of a given key
2210    *
2211    * @param key header key/name
2212    *
2213    * @remark This will only return the most recently added header
2214    *
2215    * @returns the Header if found, otherwise a Header with an err set to
2216    *          ERR__NOENT.
2217    */
2218   virtual Header get_last(const std::string &key) const = 0;
2219 
2220   /**
2221    * @brief Returns all Headers
2222    *
2223    * @returns a std::vector containing all of the Headers
2224    */
2225   virtual std::vector<Header> get_all() const = 0;
2226 
2227   /**
2228    * @returns the number of headers.
2229    */
2230   virtual size_t size() const = 0;
2231 };
2232 
2233 
2234 /**
2235  * @brief Message object
2236  *
2237  * This object represents either a single consumed or produced message,
2238  * or an event (\p err() is set).
2239  *
2240  * An application must check RdKafka::Message::err() to see if the
2241  * object is a proper message (error is RdKafka::ERR_NO_ERROR) or a
2242  * an error event.
2243  *
2244  */
2245 class RD_EXPORT Message {
2246  public:
2247   /** @brief Message persistence status can be used by the application to
2248    *         find out if a produced message was persisted in the topic log. */
2249   enum Status {
2250     /** Message was never transmitted to the broker, or failed with
2251      *  an error indicating it was not written to the log.
2252      *  Application retry risks ordering, but not duplication. */
2253     MSG_STATUS_NOT_PERSISTED = 0,
2254 
2255     /** Message was transmitted to broker, but no acknowledgement was
2256      *  received.
2257      *  Application retry risks ordering and duplication. */
2258     MSG_STATUS_POSSIBLY_PERSISTED = 1,
2259 
2260     /** Message was written to the log and fully acknowledged.
2261      *  No reason for application to retry.
2262      *  Note: this value should only be trusted with \c acks=all. */
2263     MSG_STATUS_PERSISTED = 2,
2264   };
2265 
2266   /**
2267    * @brief Accessor functions*
2268    * @remark Not all fields are present in all types of callbacks.
2269    */
2270 
2271   /** @returns The error string if object represent an error event,
2272    *           else an empty string. */
2273   virtual std::string         errstr() const = 0;
2274 
2275   /** @returns The error code if object represents an error event, else 0. */
2276   virtual ErrorCode           err () const = 0;
2277 
2278   /** @returns the RdKafka::Topic object for a message (if applicable),
2279    *            or NULL if a corresponding RdKafka::Topic object has not been
2280    *            explicitly created with RdKafka::Topic::create().
2281    *            In this case use topic_name() instead. */
2282   virtual Topic              *topic () const = 0;
2283 
2284   /** @returns Topic name (if applicable, else empty string) */
2285   virtual std::string         topic_name () const = 0;
2286 
2287   /** @returns Partition (if applicable) */
2288   virtual int32_t             partition () const = 0;
2289 
2290   /** @returns Message payload (if applicable) */
2291   virtual void               *payload () const = 0 ;
2292 
2293   /** @returns Message payload length (if applicable) */
2294   virtual size_t              len () const = 0;
2295 
2296   /** @returns Message key as string (if applicable) */
2297   virtual const std::string  *key () const = 0;
2298 
2299   /** @returns Message key as void pointer  (if applicable) */
2300   virtual const void         *key_pointer () const = 0 ;
2301 
2302   /** @returns Message key's binary length (if applicable) */
2303   virtual size_t              key_len () const = 0;
2304 
2305   /** @returns Message or error offset (if applicable) */
2306   virtual int64_t             offset () const = 0;
2307 
2308   /** @returns Message timestamp (if applicable) */
2309   virtual MessageTimestamp    timestamp () const = 0;
2310 
2311   /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */
2312   virtual void               *msg_opaque () const = 0;
2313 
2314   virtual ~Message () = 0;
2315 
2316   /** @returns the latency in microseconds for a produced message measured
2317    *           from the produce() call, or -1 if latency is not available. */
2318   virtual int64_t             latency () const = 0;
2319 
2320   /**
2321    * @brief Returns the underlying librdkafka C rd_kafka_message_t handle.
2322    *
2323    * @warning Calling the C API on this handle is not recommended and there
2324    *          is no official support for it, but for cases where the C++ API
2325    *          does not provide the underlying functionality this C handle can be
2326    *          used to interact directly with the core librdkafka API.
2327    *
2328    * @remark The lifetime of the returned pointer is the same as the Message
2329    *         object this method is called on.
2330    *
2331    * @remark Include <rdkafka/rdkafka.h> prior to including
2332    *         <rdkafka/rdkafkacpp.h>
2333    *
2334    * @returns \c rd_kafka_message_t*
2335    */
2336   virtual struct rd_kafka_message_s *c_ptr () = 0;
2337 
2338   /**
2339    * @brief Returns the message's persistence status in the topic log.
2340    */
2341   virtual Status status () const = 0;
2342 
2343   /** @returns the Headers instance for this Message, or NULL if there
2344    *  are no headers.
2345    *
2346    * @remark The lifetime of the Headers are the same as the Message. */
2347   virtual RdKafka::Headers   *headers () = 0;
2348 
2349   /** @returns the Headers instance for this Message (if applicable).
2350    *  If NULL is returned the reason is given in \p err, which
2351    *  is either ERR__NOENT if there were no headers, or another
2352    *  error code if header parsing failed.
2353    *
2354    * @remark The lifetime of the Headers are the same as the Message. */
2355   virtual RdKafka::Headers   *headers (RdKafka::ErrorCode *err) = 0;
2356 
2357   /** @returns the broker id of the broker the message was produced to or
2358    *           fetched from, or -1 if not known/applicable. */
2359   virtual int32_t broker_id () const = 0;
2360 };
2361 
2362 /**@}*/
2363 
2364 
2365 /**
2366  * @name Queue interface
2367  * @{
2368  *
2369  */
2370 
2371 
/**
 * @brief Queue interface
 *
 * Create a new message queue.  Message queues allow the application
 * to re-route consumed messages from multiple topic+partitions into
 * one single queue point.  This queue point, containing messages from
 * a number of topic+partitions, may then be served by a single
 * consume() method, rather than one per topic+partition combination.
 *
 * See the RdKafka::Consumer::start(), RdKafka::Consumer::consume(), and
 * RdKafka::Consumer::consume_callback() methods that take a queue as the first
 * parameter for more information.
 */
class RD_EXPORT Queue {
 public:
  /**
   * @brief Create Queue object
   */
  static Queue *create (Handle *handle);

  /**
   * @brief Forward/re-route queue to \p dst.
   * If \p dst is \c NULL, the forwarding is removed.
   *
   * The internal refcounts for both queues are increased.
   *
   * @remark Regardless of whether \p dst is NULL or not, after calling this
   *         function, this queue will not forward its fetch queue to the
   *         consumer queue.
   */
  virtual ErrorCode forward (Queue *dst) = 0;


  /**
   * @brief Consume message or get error event from the queue.
   *
   * @remark Use \c delete to free the message.
   *
   * @returns One of:
   *  - proper message (RdKafka::Message::err() is ERR_NO_ERROR)
   *  - error event (RdKafka::Message::err() is != ERR_NO_ERROR)
   *  - timeout due to no message or event in \p timeout_ms
   *    (RdKafka::Message::err() is ERR__TIMED_OUT)
   */
  virtual Message *consume (int timeout_ms) = 0;

  /**
   * @brief Poll queue, serving any enqueued callbacks.
   *
   * @remark Must NOT be used for queues containing messages.
   *
   * @returns the number of events served or 0 on timeout.
   */
  virtual int poll (int timeout_ms) = 0;

  virtual ~Queue () = 0;

  /**
   * @brief Enable IO event triggering for queue.
   *
   * To ease integration with IO based polling loops this API
   * allows an application to create a separate file-descriptor
   * that librdkafka will write \p payload (of size \p size) to
   * whenever a new element is enqueued on a previously empty queue.
   *
   * To remove event triggering call with \p fd = -1.
   *
   * librdkafka will maintain a copy of the \p payload.
   *
   * @remark When using forwarded queues the IO event must only be enabled
   *         on the final forwarded-to (destination) queue.
   */
  virtual void io_event_enable (int fd, const void *payload, size_t size) = 0;
};
2446 
2447 /**@}*/
2448 
2449 /**
2450  * @name ConsumerGroupMetadata
2451  * @{
2452  *
2453  */
/**
 * @brief ConsumerGroupMetadata holds a consumer instance's group
 *        metadata state.
 *
 * This class currently does not have any public methods; it is an
 * opaque holder of the consumer's group metadata state.
 */
class RD_EXPORT ConsumerGroupMetadata {
public:
  virtual ~ConsumerGroupMetadata () = 0;
};
2464 
2465 /**@}*/
2466 
2467 /**
2468  * @name KafkaConsumer
2469  * @{
2470  *
2471  */
2472 
2473 
2474 /**
2475  * @brief High-level KafkaConsumer (for brokers 0.9 and later)
2476  *
2477  * @remark Requires Apache Kafka >= 0.9.0 brokers
2478  *
2479  * Currently supports the \c range and \c roundrobin partition assignment
2480  * strategies (see \c partition.assignment.strategy)
2481  */
2482 class RD_EXPORT KafkaConsumer : public virtual Handle {
2483 public:
  /**
   * @brief Creates a KafkaConsumer.
   *
   * The \p conf object must have \c group.id set to the consumer group to join.
   *
   * Use RdKafka::KafkaConsumer::close() to shut down the consumer.
   *
   * @param conf Configuration object with \c group.id set.
   * @param errstr (out) A human readable error string written on failure.
   *
   * @returns the new KafkaConsumer instance, or NULL on error
   *          (see \p errstr).
   *
   * @sa RdKafka::RebalanceCb
   * @sa CONFIGURATION.md for \c group.id, \c session.timeout.ms,
   *     \c partition.assignment.strategy, etc.
   */
  static KafkaConsumer *create (const Conf *conf, std::string &errstr);
2496 
2497   virtual ~KafkaConsumer () = 0;
2498 
2499 
  /** @brief Returns the current partition assignment as set by
   *         RdKafka::KafkaConsumer::assign()
   *
   * @param partitions (out) the currently assigned partitions. */
  virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
2503 
  /** @brief Returns the current subscription as set by
   *         RdKafka::KafkaConsumer::subscribe()
   *
   * @param topics (out) the currently subscribed topics. */
  virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
2507 
  /**
   * @brief Update the subscription set to \p topics.
   *
   * Any previous subscription will be unassigned and unsubscribed first.
   *
   * The subscription set denotes the desired topics to consume and this
   * set is provided to the partition assignor (one of the elected group
   * members) for all clients which then uses the configured
   * \c partition.assignment.strategy to assign the subscription set's
   * topics' partitions to the consumers, depending on their subscription.
   *
   * The result of such an assignment is a rebalancing which is either
   * handled automatically in librdkafka or can be overridden by the application
   * by providing a RdKafka::RebalanceCb.
   *
   * The rebalancing passes the assigned partition set to
   * RdKafka::KafkaConsumer::assign() to update what partitions are actually
   * being fetched by the KafkaConsumer.
   *
   * Regex pattern matching is automatically performed for topics prefixed
   * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\")
   *
   * @remark A consumer error will be raised for each unavailable topic in the
   *  \p topics. The error will be ERR_UNKNOWN_TOPIC_OR_PART
   *  for non-existent topics, and
   *  ERR_TOPIC_AUTHORIZATION_FAILED for unauthorized topics.
   *  The consumer error will be raised through consume() (et.al.)
   *  with the \c RdKafka::Message::err() returning one of the
   *  error codes mentioned above.
   *  The subscribe function itself is asynchronous and will not return
   *  an error on unavailable topics.
   *
   * @returns an error if the provided list of topics is invalid.
   */
  virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;
2543 
2544   /** @brief Unsubscribe from the current subscription set. */
2545   virtual ErrorCode unsubscribe () = 0;
2546 
2547   /**
2548    *  @brief Update the assignment set to \p partitions.
2549    *
2550    * The assignment set is the set of partitions actually being consumed
2551    * by the KafkaConsumer.
2552    */
2553   virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;
2554 
2555   /**
2556    * @brief Stop consumption and remove the current assignment.
2557    */
2558   virtual ErrorCode unassign () = 0;
2559 
2560   /**
2561    * @brief Consume message or get error event, triggers callbacks.
2562    *
2563    * Will automatically call registered callbacks for any such queued events,
2564    * including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
2565    * etc.
2566    *
2567    * @remark Use \c delete to free the message.
2568    *
2569    * @remark  An application should make sure to call consume() at regular
2570    *          intervals, even if no messages are expected, to serve any
2571    *          queued callbacks waiting to be called. This is especially
2572    *          important when a RebalanceCb has been registered as it needs
2573    *          to be called and handled properly to synchronize internal
2574    *          consumer state.
2575    *
2576    * @remark Application MUST NOT call \p poll() on KafkaConsumer objects.
2577    *
2578    * @returns One of:
2579    *  - proper message (RdKafka::Message::err() is ERR_NO_ERROR)
2580    *  - error event (RdKafka::Message::err() is != ERR_NO_ERROR)
2581    *  - timeout due to no message or event in \p timeout_ms
2582    *    (RdKafka::Message::err() is ERR__TIMED_OUT)
2583    */
2584   virtual Message *consume (int timeout_ms) = 0;
2585 
2586   /**
2587    * @brief Commit offsets for the current assignment.
2588    *
2589    * @remark This is the synchronous variant that blocks until offsets
2590    *         are committed or the commit fails (see return value).
2591    *
2592    * @remark If a RdKafka::OffsetCommitCb callback is registered it will
2593    *         be called with commit details on a future call to
2594    *         RdKafka::KafkaConsumer::consume()
2595 
2596    *
2597    * @returns ERR_NO_ERROR or error code.
2598    */
2599   virtual ErrorCode commitSync () = 0;
2600 
2601   /**
2602    * @brief Asynchronous version of RdKafka::KafkaConsumer::CommitSync()
2603    *
2604    * @sa RdKafka::KafkaConsumer::commitSync()
2605    */
2606   virtual ErrorCode commitAsync () = 0;
2607 
2608   /**
2609    * @brief Commit offset for a single topic+partition based on \p message
2610    *
2611    * @remark The offset committed will be the message's offset + 1.
2612    *
2613    * @remark This is the synchronous variant.
2614    *
2615    * @sa RdKafka::KafkaConsumer::commitSync()
2616    */
2617   virtual ErrorCode commitSync (Message *message) = 0;
2618 
2619   /**
2620    * @brief Commit offset for a single topic+partition based on \p message
2621    *
2622    * @remark The offset committed will be the message's offset + 1.
2623    *
2624    * @remark This is the asynchronous variant.
2625    *
2626    * @sa RdKafka::KafkaConsumer::commitSync()
2627    */
2628   virtual ErrorCode commitAsync (Message *message) = 0;
2629 
2630   /**
2631    * @brief Commit offsets for the provided list of partitions.
2632    *
2633    * @remark The \c .offset of the partitions in \p offsets should be the
2634    *         offset where consumption will resume, i.e., the last
2635    *         processed offset + 1.
2636    *
2637    * @remark This is the synchronous variant.
2638    */
2639   virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
2640 
2641   /**
2642    * @brief Commit offset for the provided list of partitions.
2643    *
2644    * @remark The \c .offset of the partitions in \p offsets should be the
2645    *         offset where consumption will resume, i.e., the last
2646    *         processed offset + 1.
2647    *
2648    * @remark This is the asynchronous variant.
2649    */
2650   virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
2651 
2652   /**
2653    * @brief Commit offsets for the current assignment.
2654    *
2655    * @remark This is the synchronous variant that blocks until offsets
2656    *         are committed or the commit fails (see return value).
2657    *
2658    * @remark The provided callback will be called from this function.
2659    *
2660    * @returns ERR_NO_ERROR or error code.
2661    */
2662   virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0;
2663 
2664   /**
2665    * @brief Commit offsets for the provided list of partitions.
2666    *
2667    * @remark This is the synchronous variant that blocks until offsets
2668    *         are committed or the commit fails (see return value).
2669    *
2670    * @remark The provided callback will be called from this function.
2671    *
2672    * @returns ERR_NO_ERROR or error code.
2673    */
2674   virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
2675                                 OffsetCommitCb *offset_commit_cb) = 0;
2676 
2677 
2678 
2679 
2680   /**
2681    * @brief Retrieve committed offsets for topics+partitions.
2682    *
2683    * @returns ERR_NO_ERROR on success in which case the
2684    *          \p offset or \p err field of each \p partitions' element is filled
2685    *          in with the stored offset, or a partition specific error.
2686    *          Else returns an error code.
2687    */
2688   virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
2689 			       int timeout_ms) = 0;
2690 
2691   /**
2692    * @brief Retrieve current positions (offsets) for topics+partitions.
2693    *
2694    * @returns ERR_NO_ERROR on success in which case the
2695    *          \p offset or \p err field of each \p partitions' element is filled
2696    *          in with the stored offset, or a partition specific error.
2697    *          Else returns an error code.
2698    */
2699   virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
2700 
2701 
2702   /**
2703    * For pausing and resuming consumption, see
2704    * @sa RdKafka::Handle::pause() and RdKafka::Handle::resume()
2705    */
2706 
2707 
2708   /**
2709    * @brief Close and shut down the proper.
2710    *
2711    * This call will block until the following operations are finished:
2712    *  - Trigger a local rebalance to void the current assignment
2713    *  - Stop consumption for current assignment
2714    *  - Commit offsets
2715    *  - Leave group
2716    *
2717    * The maximum blocking time is roughly limited to session.timeout.ms.
2718    *
2719    * @remark Callbacks, such as RdKafka::RebalanceCb and
2720    *         RdKafka::OffsetCommitCb, etc, may be called.
2721    *
2722    * @remark The consumer object must later be freed with \c delete
2723    */
2724   virtual ErrorCode close () = 0;
2725 
2726 
2727   /**
2728    * @brief Seek consumer for topic+partition to offset which is either an
2729    *        absolute or logical offset.
2730    *
2731    * If \p timeout_ms is not 0 the call will wait this long for the
2732    * seek to be performed. If the timeout is reached the internal state
2733    * will be unknown and this function returns `ERR__TIMED_OUT`.
2734    * If \p timeout_ms is 0 it will initiate the seek but return
2735    * immediately without any error reporting (e.g., async).
2736    *
2737    * This call triggers a fetch queue barrier flush.
2738    *
2739    * @remark Consumtion for the given partition must have started for the
2740    *         seek to work. Use assign() to set the starting offset.
2741    *
2742    * @returns an ErrorCode to indicate success or failure.
2743    */
2744   virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0;
2745 
2746 
2747   /**
2748    * @brief Store offset \p offset for topic partition \p partition.
2749    * The offset will be committed (written) to the offset store according
2750    * to \p auto.commit.interval.ms or the next manual offset-less commit*()
2751    *
2752    * Per-partition success/error status propagated through TopicPartition.err()
2753    *
2754    * @remark The \c .offset field is stored as is, it will NOT be + 1.
2755    *
2756    * @remark \c enable.auto.offset.store must be set to \c false when using
2757    *         this API.
2758    *
2759    * @returns RdKafka::ERR_NO_ERROR on success, or
2760    *          RdKafka::ERR___UNKNOWN_PARTITION if none of the offsets could
2761    *          be stored, or
2762    *          RdKafka::ERR___INVALID_ARG if \c enable.auto.offset.store is true.
2763    */
2764   virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
2765 
2766 
2767   /**
2768    * @returns the current consumer group metadata associated with this consumer,
2769    *          or NULL if the consumer is configured with a \c group.id.
2770    *          This metadata object should be passed to the transactional
2771    *          producer's RdKafka::Producer::send_offsets_to_transaction() API.
2772    *
2773    * @remark The returned object must be deleted by the application.
2774    *
2775    * @sa RdKafka::Producer::send_offsets_to_transaction()
2776    */
2777   virtual ConsumerGroupMetadata *groupMetadata () = 0;
2778 
2779 
2780   /** @brief Check whether the consumer considers the current assignment to
2781    *         have been lost involuntarily. This method is only applicable for
2782    *         use with a subscribing consumer. Assignments are revoked
2783    *         immediately when determined to have been lost, so this method is
2784    *         only useful within a rebalance callback. Partitions that have
2785    *         been lost may already be owned by other members in the group and
2786    *         therefore commiting offsets, for example, may fail.
2787    *
2788    * @remark Calling assign(), incremental_assign() or incremental_unassign()
2789    *         resets this flag.
2790    *
2791    * @returns Returns true if the current partition assignment is considered
2792    *          lost, false otherwise.
2793    */
2794   virtual bool assignment_lost () = 0;
2795 
2796   /**
2797    * @brief The rebalance protocol currently in use. This will be
2798    *        "NONE" if the consumer has not (yet) joined a group, else it will
2799    *        match the rebalance protocol ("EAGER", "COOPERATIVE") of the
2800    *        configured and selected assignor(s). All configured
2801    *        assignors must have the same protocol type, meaning
2802    *        online migration of a consumer group from using one
2803    *        protocol to another (in particular upgading from EAGER
2804    *        to COOPERATIVE) without a restart is not currently
2805    *        supported.
2806    *
2807    * @returns an empty string on error, or one of
2808    *          "NONE", "EAGER", "COOPERATIVE" on success.
2809    */
2810 
2811   virtual std::string rebalance_protocol () = 0;
2812 
2813 
2814   /**
2815    * @brief Incrementally add \p partitions to the current assignment.
2816    *
2817    * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used,
2818    * this method should be used in a rebalance callback to adjust the current
2819    * assignment appropriately in the case where the rebalance type is
2820    * ERR__ASSIGN_PARTITIONS. The application must pass the partition list
2821    * passed to the callback (or a copy of it), even if the list is empty.
2822    * This method may also be used outside the context of a rebalance callback.
2823    *
2824    * @returns NULL on success, or an error object if the operation was
2825    *          unsuccessful.
2826    *
2827    * @remark The returned object must be deleted by the application.
2828    */
2829   virtual Error *incremental_assign (const std::vector<TopicPartition*> &partitions) = 0;
2830 
2831 
2832   /**
2833    * @brief Incrementally remove \p partitions from the current assignment.
2834    *
2835    * If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used,
2836    * this method should be used in a rebalance callback to adjust the current
2837    * assignment appropriately in the case where the rebalance type is
2838    * ERR__REVOKE_PARTITIONS. The application must pass the partition list
2839    * passed to the callback (or a copy of it), even if the list is empty.
2840    * This method may also be used outside the context of a rebalance callback.
2841    *
2842    * @returns NULL on success, or an error object if the operation was
2843    *          unsuccessful.
2844    *
2845    * @remark The returned object must be deleted by the application.
2846    */
2847   virtual Error *incremental_unassign (const std::vector<TopicPartition*> &partitions) = 0;
2848 
2849 };
2850 
2851 
2852 /**@}*/
2853 
2854 
2855 /**
2856  * @name Simple Consumer (legacy)
2857  * @{
2858  *
2859  */
2860 
2861 /**
2862  * @brief Simple Consumer (legacy)
2863  *
2864  * A simple non-balanced, non-group-aware, consumer.
2865  */
class RD_EXPORT Consumer : public virtual Handle {
 public:
  /**
   * @brief Creates a new Kafka consumer handle.
   *
   * \p conf is an optional object that will be used instead of the default
   * configuration.
   * The \p conf object is reusable after this call.
   *
   * @returns the new handle on success or NULL on error in which case
   * \p errstr is set to a human readable error message.
   */
  static Consumer *create (const Conf *conf, std::string &errstr);

  virtual ~Consumer () = 0;


  /**
   * @brief Start consuming messages for topic and \p partition
   * at offset \p offset which may either be a proper offset (0..N)
   * or one of the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END.
   *
   * rdkafka will attempt to keep \p queued.min.messages (config property)
   * messages in the local queue by repeatedly fetching batches of messages
   * from the broker until the threshold is reached.
   *
   * The application shall use one of the \p ..->consume*() functions
   * to consume messages from the local queue, each kafka message being
   * represented as a `RdKafka::Message *` object.
   *
   * \p ..->start() must not be called multiple times for the same
   * topic and partition without stopping consumption first with
   * \p ..->stop().
   *
   * @returns an ErrorCode to indicate success or failure.
   */
  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;

  /**
   * @brief Start consuming messages for topic and \p partition on
   *        queue \p queue.
   *
   * @sa RdKafka::Consumer::start()
   */
  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
                           Queue *queue) = 0;

  /**
   * @brief Stop consuming messages for topic and \p partition, purging
   *        all messages currently in the local queue.
   *
   * The application needs to stop all consumers before destroying
   * the Consumer handle.
   *
   * @returns an ErrorCode to indicate success or failure.
   */
  virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;

  /**
   * @brief Seek consumer for topic+partition to \p offset which is either an
   *        absolute or logical offset.
   *
   * If \p timeout_ms is not 0 the call will wait this long for the
   * seek to be performed. If the timeout is reached the internal state
   * will be unknown and this function returns `ERR__TIMED_OUT`.
   * If \p timeout_ms is 0 it will initiate the seek but return
   * immediately without any error reporting (e.g., async).
   *
   * This call triggers a fetch queue barrier flush.
   *
   * @returns an ErrorCode to indicate success or failure.
   */
  virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
			  int timeout_ms) = 0;

  /**
   * @brief Consume a single message from \p topic and \p partition.
   *
   * \p timeout_ms is maximum amount of time to wait for a message to be
   * received.
   * Consumer must have been previously started with \p ..->start().
   *
   * @returns a Message object, the application needs to check if message
   * is an error or a proper message RdKafka::Message::err() and checking for
   * \p ERR_NO_ERROR.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in RdKafka::Message::err()):
   *  - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched.
   *  - ERR__PARTITION_EOF - End of partition reached, not an error.
   */
  virtual Message *consume (Topic *topic, int32_t partition,
                            int timeout_ms) = 0;

  /**
   * @brief Consume a single message from the specified queue.
   *
   * \p timeout_ms is maximum amount of time to wait for a message to be
   * received.
   * Consumer must have been previously started on the queue with
   * \p ..->start().
   *
   * @returns a Message object, the application needs to check if message
   * is an error or a proper message \p Message->err() and checking for
   * \p ERR_NO_ERROR.
   *
   * The message object must be destroyed when the application is done with it.
   *
   * Errors (in RdKafka::Message::err()):
   *   - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched
   *
   * Note that Message->topic() may be nullptr after certain kinds of
   * errors, so applications should check that it isn't null before
   * dereferencing it.
   */
  virtual Message *consume (Queue *queue, int timeout_ms) = 0;

  /**
   * @brief Consumes messages from \p topic and \p partition, calling
   *        the provided callback for each consumed message.
   *
   * \p consume_callback() provides higher throughput performance
   * than \p consume().
   *
   * \p timeout_ms is the maximum amount of time to wait for one or
   * more messages to arrive.
   *
   * The provided \p consume_cb instance has its \p consume_cb function
   * called for every message received.
   *
   * The \p opaque argument is passed to the \p consume_cb as \p opaque.
   *
   * @returns the number of messages processed or -1 on error.
   *
   * @sa RdKafka::Consumer::consume()
   */
  virtual int consume_callback (Topic *topic, int32_t partition,
                                int timeout_ms,
                                ConsumeCb *consume_cb,
                                void *opaque) = 0;

  /**
   * @brief Consumes messages from \p queue, calling the provided callback for
   *        each consumed message.
   *
   * @sa RdKafka::Consumer::consume_callback()
   */
  virtual int consume_callback (Queue *queue, int timeout_ms,
                                RdKafka::ConsumeCb *consume_cb,
                                void *opaque) = 0;

  /**
   * @brief Converts an offset into the logical offset from the tail of a topic.
   *
   * \p offset is the (positive) number of items from the end.
   *
   * @returns the logical offset for message \p offset from the tail, this value
   *          may be passed to Consumer::start, et.al.
   * @remark The returned logical offset is specific to librdkafka.
   */
  static int64_t OffsetTail(int64_t offset);
};
3029 
3030 /**@}*/
3031 
3032 
3033 /**
3034  * @name Producer
3035  * @{
3036  *
3037  */
3038 
3039 
3040 /**
3041  * @brief Producer
3042  */
3043 class RD_EXPORT Producer : public virtual Handle {
3044  public:
3045   /**
3046    * @brief Creates a new Kafka producer handle.
3047    *
3048    * \p conf is an optional object that will be used instead of the default
3049    * configuration.
3050    * The \p conf object is reusable after this call.
3051    *
3052    * @returns the new handle on success or NULL on error in which case
3053    *          \p errstr is set to a human readable error message.
3054    */
3055   static Producer *create (const Conf *conf, std::string &errstr);
3056 
3057 
3058   virtual ~Producer () = 0;
3059 
3060   /**
3061    * @brief RdKafka::Producer::produce() \p msgflags
3062    *
3063    * These flags are optional.
3064    */
3065   enum {
3066     RK_MSG_FREE = 0x1, /**< rdkafka will free(3) \p payload
3067                          * when it is done with it.
3068                          * Mutually exclusive with RK_MSG_COPY. */
3069     RK_MSG_COPY = 0x2, /**< the \p payload data will be copied
3070                         * and the \p payload pointer will not
3071                         * be used by rdkafka after the
3072                         * call returns.
3073                         * Mutually exclusive with RK_MSG_FREE. */
3074     RK_MSG_BLOCK = 0x4  /**< Block produce*() on message queue
3075                          *   full.
3076                          *   WARNING:
3077                          *   If a delivery report callback
3078                          *   is used the application MUST
3079                          *   call rd_kafka_poll() (or equiv.)
3080                          *   to make sure delivered messages
3081                          *   are drained from the internal
3082                          *   delivery report queue.
3083                          *   Failure to do so will result
                         *   in indefinitely blocking on
3085                          *   the produce() call when the
3086                          *   message queue is full.
3087                          */
3088 
3089 
3090   /**@cond NO_DOC*/
3091   /* For backwards compatibility: */
3092 #ifndef MSG_COPY /* defined in sys/msg.h */
    , /** this comma must exist between
3094        *  RK_MSG_BLOCK and MSG_FREE
3095        */
3096     MSG_FREE = RK_MSG_FREE,
3097     MSG_COPY = RK_MSG_COPY
3098 #endif
3099   /**@endcond*/
3100   };
3101 
3102   /**
3103    * @brief Produce and send a single message to broker.
3104    *
   * This is an asynchronous, non-blocking API.
3106    *
3107    * \p partition is the target partition, either:
3108    *   - RdKafka::Topic::PARTITION_UA (unassigned) for
3109    *     automatic partitioning using the topic's partitioner function, or
3110    *   - a fixed partition (0..N)
3111    *
3112    * \p msgflags is zero or more of the following flags OR:ed together:
3113    *    RK_MSG_BLOCK - block \p produce*() call if
3114    *                   \p queue.buffering.max.messages or
3115    *                   \p queue.buffering.max.kbytes are exceeded.
3116    *                   Messages are considered in-queue from the point they
3117    *                   are accepted by produce() until their corresponding
3118    *                   delivery report callback/event returns.
3119    *                   It is thus a requirement to call
3120    *                   poll() (or equiv.) from a separate
3121    *                   thread when RK_MSG_BLOCK is used.
3122    *                   See WARNING on \c RK_MSG_BLOCK above.
3123    *    RK_MSG_FREE - rdkafka will free(3) \p payload when it is done with it.
3124    *    RK_MSG_COPY - the \p payload data will be copied and the \p payload
3125    *               pointer will not be used by rdkafka after the
3126    *               call returns.
3127    *
3128    *  NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.
3129    *
3130    *  If the function returns an error code and RK_MSG_FREE was specified, then
3131    *  the memory associated with the payload is still the caller's
3132    *  responsibility.
3133    *
3134    * \p payload is the message payload of size \p len bytes.
3135    *
3136    * \p key is an optional message key, if non-NULL it
3137    * will be passed to the topic partitioner as well as be sent with the
3138    * message to the broker and passed on to the consumer.
3139    *
3140    * \p msg_opaque is an optional application-provided per-message opaque
3141    * pointer that will provided in the delivery report callback (\p dr_cb) for
3142    * referencing this message.
3143    *
3144    * @returns an ErrorCode to indicate success or failure:
3145    *  - ERR_NO_ERROR           - message successfully enqueued for transmission.
3146    *
3147    *  - ERR__QUEUE_FULL        - maximum number of outstanding messages has been
   *                             reached: \c queue.buffering.max.messages
3149    *
3150    *  - ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size:
3151    *                            \c messages.max.bytes
3152    *
3153    *  - ERR__UNKNOWN_PARTITION - requested \p partition is unknown in the
3154    *                           Kafka cluster.
3155    *
3156    *  - ERR__UNKNOWN_TOPIC     - topic is unknown in the Kafka cluster.
3157    */
3158   virtual ErrorCode produce (Topic *topic, int32_t partition,
3159                              int msgflags,
3160                              void *payload, size_t len,
3161                              const std::string *key,
3162                              void *msg_opaque) = 0;
3163 
3164   /**
3165    * @brief Variant produce() that passes the key as a pointer and length
3166    *        instead of as a const std::string *.
3167    */
3168   virtual ErrorCode produce (Topic *topic, int32_t partition,
3169                              int msgflags,
3170                              void *payload, size_t len,
3171                              const void *key, size_t key_len,
3172                              void *msg_opaque) = 0;
3173 
3174   /**
3175    * @brief produce() variant that takes topic as a string (no need for
3176    *        creating a Topic object), and also allows providing the
3177    *        message timestamp (milliseconds since beginning of epoch, UTC).
3178    *        Otherwise identical to produce() above.
3179    */
3180   virtual ErrorCode produce (const std::string topic_name, int32_t partition,
3181                              int msgflags,
3182                              void *payload, size_t len,
3183                              const void *key, size_t key_len,
3184                              int64_t timestamp, void *msg_opaque) = 0;
3185 
3186   /**
   * @brief produce() variant that allows for Header support on produce.
3188    *        Otherwise identical to produce() above.
3189    *
3190    * @warning The \p headers will be freed/deleted if the produce() call
3191    *          succeeds, or left untouched if produce() fails.
3192    */
3193   virtual ErrorCode produce (const std::string topic_name, int32_t partition,
3194                              int msgflags,
3195                              void *payload, size_t len,
3196                              const void *key, size_t key_len,
3197                              int64_t timestamp,
3198                              RdKafka::Headers *headers,
3199                              void *msg_opaque) = 0;
3200 
3201 
3202   /**
3203    * @brief Variant produce() that accepts vectors for key and payload.
3204    *        The vector data will be copied.
3205    */
3206   virtual ErrorCode produce (Topic *topic, int32_t partition,
3207                              const std::vector<char> *payload,
3208                              const std::vector<char> *key,
3209                              void *msg_opaque) = 0;
3210 
3211 
3212   /**
3213    * @brief Wait until all outstanding produce requests, et.al, are completed.
3214    *        This should typically be done prior to destroying a producer instance
3215    *        to make sure all queued and in-flight produce requests are completed
3216    *        before terminating.
3217    *
3218    * @remark The \c linger.ms time will be ignored for the duration of the call,
3219    *         queued messages will be sent to the broker as soon as possible.
3220    *
3221    * @remark This function will call Producer::poll() and thus
3222    *         trigger callbacks.
3223    *
3224    * @returns ERR__TIMED_OUT if \p timeout_ms was reached before all
3225    *          outstanding requests were completed, else ERR_NO_ERROR
3226    */
3227   virtual ErrorCode flush (int timeout_ms) = 0;
3228 
3229 
3230   /**
3231    * @brief Purge messages currently handled by the producer instance.
3232    *
3233    * @param purge_flags tells which messages should be purged and how.
3234    *
3235    * The application will need to call Handle::poll() or Producer::flush()
3236    * afterwards to serve the delivery report callbacks of the purged messages.
3237    *
3238    * Messages purged from internal queues fail with the delivery report
3239    * error code set to ERR__PURGE_QUEUE, while purged messages that
3240    * are in-flight to or from the broker will fail with the error code set to
3241    * ERR__PURGE_INFLIGHT.
3242    *
3243    * @warning Purging messages that are in-flight to or from the broker
3244    *          will ignore any sub-sequent acknowledgement for these messages
3245    *          received from the broker, effectively making it impossible
3246    *          for the application to know if the messages were successfully
3247    *          produced or not. This may result in duplicate messages if the
3248    *          application retries these messages at a later time.
3249    *
3250    * @remark This call may block for a short time while background thread
3251    *         queues are purged.
3252    *
3253    * @returns ERR_NO_ERROR on success,
3254    *          ERR__INVALID_ARG if the \p purge flags are invalid or unknown,
3255    *          ERR__NOT_IMPLEMENTED if called on a non-producer client instance.
3256    */
3257   virtual ErrorCode purge (int purge_flags) = 0;
3258 
3259   /**
3260    * @brief RdKafka::Handle::purge() \p purge_flags
3261    */
3262   enum {
3263     PURGE_QUEUE = 0x1, /**< Purge messages in internal queues */
3264 
3265     PURGE_INFLIGHT = 0x2, /*! Purge messages in-flight to or from the broker.
3266                            *  Purging these messages will void any future
3267                            *  acknowledgements from the broker, making it
3268                            *  impossible for the application to know if these
3269                            *  messages were successfully delivered or not.
3270                            *  Retrying these messages may lead to duplicates. */
3271 
3272     PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue
3273                               * purging to finish. */
3274   };
3275 
3276   /**
3277    * @name Transactional API
3278    * @{
3279    *
3280    * Requires Kafka broker version v0.11.0 or later
3281    *
3282    * See the Transactional API documentation in rdkafka.h for more information.
3283    */
3284 
3285   /**
3286    * @brief Initialize transactions for the producer instance.
3287    *
3288    * @param timeout_ms The maximum time to block. On timeout the operation
3289    *                   may continue in the background, depending on state,
3290    *                   and it is okay to call init_transactions() again.
3291    *
3292    * @returns an RdKafka::Error object on error, or NULL on success.
3293    *          Check whether the returned error object permits retrying
3294    *          by calling RdKafka::Error::is_retriable(), or whether a fatal
3295    *          error has been raised by calling RdKafka::Error::is_fatal().
3296    *
3297    * @remark The returned error object (if not NULL) must be deleted.
3298    *
3299    * See rd_kafka_init_transactions() in rdkafka.h for more information.
3300    *
3301    */
3302   virtual Error *init_transactions (int timeout_ms) = 0;
3303 
3304 
3305   /**
3306    * @brief init_transactions() must have been called successfully
3307    *        (once) before this function is called.
3308    *
3309    * @returns an RdKafka::Error object on error, or NULL on success.
3310    *          Check whether a fatal error has been raised by calling
3311    *          RdKafka::Error::is_fatal_error().
3312    *
3313    * @remark The returned error object (if not NULL) must be deleted.
3314    *
3315    * See rd_kafka_begin_transaction() in rdkafka.h for more information.
3316    */
3317   virtual Error *begin_transaction () = 0;
3318 
3319   /**
3320    * @brief Sends a list of topic partition offsets to the consumer group
3321    *        coordinator for \p group_metadata, and marks the offsets as part
3322    *        part of the current transaction.
3323    *        These offsets will be considered committed only if the transaction
3324    *        is committed successfully.
3325    *
3326    *        The offsets should be the next message your application will
3327    *        consume,
3328    *        i.e., the last processed message's offset + 1 for each partition.
3329    *        Either track the offsets manually during processing or use
3330    *        RdKafka::KafkaConsumer::position() (on the consumer) to get the
3331    *        current offsets for
3332    *        the partitions assigned to the consumer.
3333    *
3334    *        Use this method at the end of a consume-transform-produce loop prior
3335    *        to committing the transaction with commit_transaction().
3336    *
3337    * @param offsets List of offsets to commit to the consumer group upon
3338    *                successful commit of the transaction. Offsets should be
3339    *                the next message to consume,
3340    *                e.g., last processed message + 1.
3341    * @param group_metadata The current consumer group metadata as returned by
3342    *                   RdKafka::KafkaConsumer::groupMetadata() on the consumer
3343    *                   instance the provided offsets were consumed from.
3344    * @param timeout_ms Maximum time allowed to register the
3345    *                   offsets on the broker.
3346    *
3347    * @remark This function must be called on the transactional producer
3348    *         instance, not the consumer.
3349    *
3350    * @remark The consumer must disable auto commits
3351    *         (set \c enable.auto.commit to false on the consumer).
3352    *
3353    * @returns an RdKafka::Error object on error, or NULL on success.
3354    *          Check whether the returned error object permits retrying
3355    *          by calling RdKafka::Error::is_retriable(), or whether an abortable
3356    *          or fatal error has been raised by calling
3357    *          RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal()
3358    *          respectively.
3359    *
3360    * @remark The returned error object (if not NULL) must be deleted.
3361    *
3362    * See rd_kafka_send_offsets_to_transaction() in rdkafka.h for
3363    * more information.
3364    */
3365   virtual Error *send_offsets_to_transaction (
3366           const std::vector<TopicPartition*> &offsets,
3367           const ConsumerGroupMetadata *group_metadata,
3368           int timeout_ms) = 0;
3369 
3370   /**
3371    * @brief Commit the current transaction as started with begin_transaction().
3372    *
3373    *        Any outstanding messages will be flushed (delivered) before actually
3374    *        committing the transaction.
3375    *
3376    * @param timeout_ms The maximum time to block. On timeout the operation
3377    *                   may continue in the background, depending on state,
3378    *                   and it is okay to call this function again.
3379    *                   Pass -1 to use the remaining transaction timeout,
3380    *                   this is the recommended use.
3381    *
3382    * @remark It is strongly recommended to always pass -1 (remaining transaction
3383    *         time) as the \p timeout_ms. Using other values risk internal
3384    *         state desynchronization in case any of the underlying protocol
3385    *         requests fail.
3386    *
3387    * @returns an RdKafka::Error object on error, or NULL on success.
3388    *          Check whether the returned error object permits retrying
3389    *          by calling RdKafka::Error::is_retriable(), or whether an abortable
3390    *          or fatal error has been raised by calling
3391    *          RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal()
3392    *          respectively.
3393    *
3394    * @remark The returned error object (if not NULL) must be deleted.
3395    *
3396    * See rd_kafka_commit_transaction() in rdkafka.h for more information.
3397    */
3398   virtual Error *commit_transaction (int timeout_ms) = 0;
3399 
3400   /**
3401    * @brief Aborts the ongoing transaction.
3402    *
3403    *        This function should also be used to recover from non-fatal abortable
3404    *        transaction errors.
3405    *
3406    *        Any outstanding messages will be purged and fail with
3407    *        RdKafka::ERR__PURGE_INFLIGHT or RdKafka::ERR__PURGE_QUEUE.
3408    *        See RdKafka::Producer::purge() for details.
3409    *
3410    * @param timeout_ms The maximum time to block. On timeout the operation
3411    *                   may continue in the background, depending on state,
3412    *                   and it is okay to call this function again.
3413    *                   Pass -1 to use the remaining transaction timeout,
3414    *                   this is the recommended use.
3415    *
3416    * @remark It is strongly recommended to always pass -1 (remaining transaction
3417    *         time) as the \p timeout_ms. Using other values risk internal
3418    *         state desynchronization in case any of the underlying protocol
3419    *         requests fail.
3420    *
3421    * @returns an RdKafka::Error object on error, or NULL on success.
3422    *          Check whether the returned error object permits retrying
3423    *          by calling RdKafka::Error::is_retriable(), or whether a
3424    *          fatal error has been raised by calling RdKafka::Error::is_fatal().
3425    *
3426    * @remark The returned error object (if not NULL) must be deleted.
3427    *
3428    * See rd_kafka_abort_transaction() in rdkafka.h for more information.
3429    */
3430   virtual Error *abort_transaction (int timeout_ms) = 0;
3431 
3432   /**@}*/
3433 };
3434 
3435 /**@}*/
3436 
3437 
3438 /**
3439  * @name Metadata interface
3440  * @{
3441  *
3442  */
3443 
3444 
3445 /**
3446  * @brief Metadata: Broker information
3447  */
3448 class BrokerMetadata {
3449  public:
3450   /** @returns Broker id */
3451   virtual int32_t id() const = 0;
3452 
3453   /** @returns Broker hostname */
3454   virtual const std::string host() const = 0;
3455 
3456   /** @returns Broker listening port */
3457   virtual int port() const = 0;
3458 
3459   virtual ~BrokerMetadata() = 0;
3460 };
3461 
3462 
3463 
3464 /**
3465  * @brief Metadata: Partition information
3466  */
3467 class PartitionMetadata {
3468  public:
3469   /** @brief Replicas */
3470   typedef std::vector<int32_t> ReplicasVector;
3471   /** @brief ISRs (In-Sync-Replicas) */
3472   typedef std::vector<int32_t> ISRSVector;
3473 
3474   /** @brief Replicas iterator */
3475   typedef ReplicasVector::const_iterator ReplicasIterator;
3476   /** @brief ISRs iterator */
3477   typedef ISRSVector::const_iterator     ISRSIterator;
3478 
3479 
3480   /** @returns Partition id */
3481   virtual int32_t id() const = 0;
3482 
3483   /** @returns Partition error reported by broker */
3484   virtual ErrorCode err() const = 0;
3485 
3486   /** @returns Leader broker (id) for partition */
3487   virtual int32_t leader() const = 0;
3488 
3489   /** @returns Replica brokers */
3490   virtual const std::vector<int32_t> *replicas() const = 0;
3491 
3492   /** @returns In-Sync-Replica brokers
3493    *  @warning The broker may return a cached/outdated list of ISRs.
3494    */
3495   virtual const std::vector<int32_t> *isrs() const = 0;
3496 
3497   virtual ~PartitionMetadata() = 0;
3498 };
3499 
3500 
3501 
3502 /**
3503  * @brief Metadata: Topic information
3504  */
3505 class TopicMetadata {
3506  public:
3507   /** @brief Partitions */
3508   typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
3509   /** @brief Partitions iterator */
3510   typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
3511 
3512   /** @returns Topic name */
3513   virtual const std::string topic() const = 0;
3514 
3515   /** @returns Partition list */
3516   virtual const PartitionMetadataVector *partitions() const = 0;
3517 
3518   /** @returns Topic error reported by broker */
3519   virtual ErrorCode err() const = 0;
3520 
3521   virtual ~TopicMetadata() = 0;
3522 };
3523 
3524 
3525 /**
3526  * @brief Metadata container
3527  */
3528 class Metadata {
3529  public:
3530   /** @brief Brokers */
3531   typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
3532   /** @brief Topics */
3533   typedef std::vector<const TopicMetadata*>  TopicMetadataVector;
3534 
3535   /** @brief Brokers iterator */
3536   typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
3537   /** @brief Topics iterator */
3538   typedef TopicMetadataVector::const_iterator  TopicMetadataIterator;
3539 
3540 
3541   /**
3542    * @brief Broker list
3543    * @remark Ownership of the returned pointer is retained by the instance of
3544    * Metadata that is called.
3545    */
3546   virtual const BrokerMetadataVector *brokers() const = 0;
3547 
3548   /**
3549    * @brief Topic list
3550    * @remark Ownership of the returned pointer is retained by the instance of
3551    * Metadata that is called.
3552    */
3553   virtual const TopicMetadataVector  *topics() const = 0;
3554 
3555   /** @brief Broker (id) originating this metadata */
3556   virtual int32_t orig_broker_id() const = 0;
3557 
3558   /** @brief Broker (name) originating this metadata */
3559   virtual const std::string orig_broker_name() const = 0;
3560 
3561   virtual ~Metadata() = 0;
3562 };
3563 
3564 /**@}*/
3565 
3566 }
3567 
3568 
3569 #endif /* _RDKAFKACPP_H_ */
3570