1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_INT_H_
30 #define _RDKAFKACPP_INT_H_
31 
32 #include <string>
33 #include <iostream>
34 #include <cstring>
35 #include <stdlib.h>
36 
37 #include "rdkafkacpp.h"
38 
39 extern "C" {
40 #include "../src/rdkafka.h"
41 }
42 
43 #ifdef _MSC_VER
44 /* Visual Studio */
45 #include "../src/win32_config.h"
46 #else
47 /* POSIX / UNIX based systems */
48 #include "../config.h" /* mklove output */
49 #endif
50 
51 #ifdef _MSC_VER
52 typedef int mode_t;
53 #pragma warning(disable : 4250)
54 #endif
55 
56 
57 namespace RdKafka {
58 
59 void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque);
60 void log_cb_trampoline (const rd_kafka_t *rk, int level,
61                         const char *fac, const char *buf);
62 void error_cb_trampoline (rd_kafka_t *rk, int err, const char *reason,
63                           void *opaque);
64 void throttle_cb_trampoline (rd_kafka_t *rk, const char *broker_name,
65 			     int32_t broker_id, int throttle_time_ms,
66 			     void *opaque);
67 int stats_cb_trampoline (rd_kafka_t *rk, char *json, size_t json_len,
68                          void *opaque);
69 int socket_cb_trampoline (int domain, int type, int protocol, void *opaque);
70 int open_cb_trampoline (const char *pathname, int flags, mode_t mode,
71                         void *opaque);
72 void rebalance_cb_trampoline (rd_kafka_t *rk,
73                               rd_kafka_resp_err_t err,
74                               rd_kafka_topic_partition_list_t *c_partitions,
75                               void *opaque);
76 void offset_commit_cb_trampoline0 (
77         rd_kafka_t *rk,
78         rd_kafka_resp_err_t err,
79         rd_kafka_topic_partition_list_t *c_offsets, void *opaque);
80 void oauthbearer_token_refresh_cb_trampoline (rd_kafka_t *rk,
81                                               const char *oauthbearer_config,
82                                               void *opaque);
83 
84  int ssl_cert_verify_cb_trampoline (
85          rd_kafka_t *rk,
86          const char *broker_name,
87          int32_t broker_id,
88          int *x509_error,
89          int depth,
90          const char *buf, size_t size,
91          char *errstr, size_t errstr_size,
92          void *opaque);
93 
94 rd_kafka_topic_partition_list_t *
95     partitions_to_c_parts (const std::vector<TopicPartition*> &partitions);
96 
97 /**
98  * @brief Update the application provided 'partitions' with info from 'c_parts'
99  */
100 void update_partitions_from_c_parts (std::vector<TopicPartition*> &partitions,
101                                      const rd_kafka_topic_partition_list_t *c_parts);
102 
103 
104 class ErrorImpl : public Error {
105  public:
~ErrorImpl()106   ~ErrorImpl () {
107     rd_kafka_error_destroy(c_error_);
108   };
109 
ErrorImpl(ErrorCode code,const std::string * errstr)110   ErrorImpl (ErrorCode code, const std::string *errstr) {
111     c_error_ = rd_kafka_error_new(static_cast<rd_kafka_resp_err_t>(code),
112                                   errstr ? "%s" : NULL,
113                                   errstr ? errstr->c_str() : NULL);
114   }
115 
ErrorImpl(rd_kafka_error_t * c_error)116   ErrorImpl (rd_kafka_error_t *c_error):
117       c_error_(c_error) {};
118 
create(ErrorCode code,const std::string * errstr)119   static Error *create (ErrorCode code, const std::string *errstr) {
120     return new ErrorImpl(code, errstr);
121   }
122 
code()123   ErrorCode   code () const {
124     return static_cast<ErrorCode>(rd_kafka_error_code(c_error_));
125   }
126 
name()127   std::string name () const {
128     return std::string(rd_kafka_error_name(c_error_));
129   }
130 
str()131   std::string str () const {
132     return std::string(rd_kafka_error_string(c_error_));
133   }
134 
is_fatal()135   bool is_fatal () const {
136     return rd_kafka_error_is_fatal(c_error_);
137   }
138 
is_retriable()139   bool is_retriable () const {
140     return rd_kafka_error_is_retriable(c_error_);
141   }
142 
txn_requires_abort()143   bool txn_requires_abort () const {
144     return rd_kafka_error_txn_requires_abort(c_error_);
145   }
146 
147   rd_kafka_error_t *c_error_;
148 };
149 
150 
151 class EventImpl : public Event {
152  public:
~EventImpl()153   ~EventImpl () {};
154 
EventImpl(Type type,ErrorCode err,Severity severity,const char * fac,const char * str)155   EventImpl (Type type, ErrorCode err, Severity severity,
156              const char *fac, const char *str):
157   type_(type), err_(err), severity_(severity), fac_(fac ? fac : ""),
158 	  str_(str), id_(0), throttle_time_(0) {};
159 
EventImpl(Type type)160   EventImpl (Type type):
161   type_(type), err_(ERR_NO_ERROR), severity_(EVENT_SEVERITY_EMERG),
162 	  fac_(""), str_(""), id_(0), throttle_time_(0) {};
163 
type()164   Type        type () const { return type_; }
err()165   ErrorCode   err () const { return err_; }
severity()166   Severity    severity () const { return severity_; }
fac()167   std::string fac () const { return fac_; }
str()168   std::string str () const { return str_; }
broker_name()169   std::string broker_name () const {
170 	  if (type_ == EVENT_THROTTLE)
171 		  return str_;
172 	  else
173 		  return std::string("");
174   }
broker_id()175   int         broker_id () const { return id_; }
throttle_time()176   int         throttle_time () const { return throttle_time_; }
177 
fatal()178   bool        fatal () const { return fatal_; }
179 
180   Type        type_;
181   ErrorCode   err_;
182   Severity    severity_;
183   std::string fac_;
184   std::string str_;         /* reused for THROTTLE broker_name */
185   int         id_;
186   int         throttle_time_;
187   bool        fatal_;
188 };
189 
190 
191 class HeadersImpl : public Headers {
192  public:
HeadersImpl()193   HeadersImpl ():
194   headers_ (rd_kafka_headers_new(8)) {}
195 
HeadersImpl(rd_kafka_headers_t * headers)196   HeadersImpl (rd_kafka_headers_t *headers):
197   headers_ (headers) {}
198 
HeadersImpl(const std::vector<Header> & headers)199   HeadersImpl (const std::vector<Header> &headers) {
200     if (headers.size() > 0) {
201       headers_ = rd_kafka_headers_new(headers.size());
202       from_vector(headers);
203     } else {
204       headers_ = rd_kafka_headers_new(8);
205     }
206   }
207 
~HeadersImpl()208   ~HeadersImpl() {
209     if (headers_) {
210       rd_kafka_headers_destroy(headers_);
211     }
212   }
213 
add(const std::string & key,const char * value)214   ErrorCode add(const std::string& key, const char *value) {
215     rd_kafka_resp_err_t err;
216     err = rd_kafka_header_add(headers_,
217                               key.c_str(), key.size(),
218                               value, -1);
219     return static_cast<RdKafka::ErrorCode>(err);
220   }
221 
add(const std::string & key,const void * value,size_t value_size)222   ErrorCode add(const std::string& key, const void *value, size_t value_size) {
223     rd_kafka_resp_err_t err;
224     err = rd_kafka_header_add(headers_,
225                               key.c_str(), key.size(),
226                               value, value_size);
227     return static_cast<RdKafka::ErrorCode>(err);
228   }
229 
add(const std::string & key,const std::string & value)230   ErrorCode add(const std::string &key, const std::string &value) {
231     rd_kafka_resp_err_t err;
232     err = rd_kafka_header_add(headers_,
233                               key.c_str(), key.size(),
234                               value.c_str(), value.size());
235     return static_cast<RdKafka::ErrorCode>(err);
236   }
237 
add(const Header & header)238   ErrorCode add(const Header &header) {
239     rd_kafka_resp_err_t err;
240     err = rd_kafka_header_add(headers_,
241                               header.key().c_str(), header.key().size(),
242                               header.value(), header.value_size());
243     return static_cast<RdKafka::ErrorCode>(err);
244   }
245 
remove(const std::string & key)246   ErrorCode remove(const std::string& key) {
247     rd_kafka_resp_err_t err;
248     err = rd_kafka_header_remove (headers_, key.c_str());
249     return static_cast<RdKafka::ErrorCode>(err);
250   }
251 
get(const std::string & key)252   std::vector<Headers::Header> get(const std::string &key) const {
253     std::vector<Headers::Header> headers;
254     const void *value;
255     size_t size;
256     rd_kafka_resp_err_t err;
257     for (size_t idx = 0;
258          !(err = rd_kafka_header_get(headers_, idx, key.c_str(),
259                                      &value, &size)) ;
260          idx++) {
261       headers.push_back(Headers::Header(key, value, size));
262     }
263     return headers;
264   }
265 
get_last(const std::string & key)266   Headers::Header get_last(const std::string& key) const {
267     const void *value;
268     size_t size;
269     rd_kafka_resp_err_t err;
270     err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size);
271     return Headers::Header(key, value, size,
272                            static_cast<RdKafka::ErrorCode>(err));
273   }
274 
get_all()275   std::vector<Headers::Header> get_all() const {
276     std::vector<Headers::Header> headers;
277     size_t idx = 0;
278     const char *name;
279     const void *valuep;
280     size_t size;
281     while (!rd_kafka_header_get_all(headers_, idx++,
282                                     &name, &valuep, &size)) {
283       headers.push_back(Headers::Header(name, valuep, size));
284     }
285     return headers;
286   }
287 
size()288   size_t size() const {
289     return rd_kafka_header_cnt(headers_);
290   }
291 
292   /** @brief Reset the C headers pointer to NULL. */
c_headers_destroyed()293   void c_headers_destroyed() {
294     headers_ = NULL;
295   }
296 
297   /** @returns the underlying C headers, or NULL. */
c_ptr()298   rd_kafka_headers_t *c_ptr() {
299     return headers_;
300   }
301 
302 
303 private:
from_vector(const std::vector<Header> & headers)304   void from_vector(const std::vector<Header> &headers) {
305     if (headers.size() == 0)
306       return;
307     for (std::vector<Header>::const_iterator it = headers.begin();
308          it != headers.end(); it++)
309       this->add(*it);
310   }
311 
312   HeadersImpl(HeadersImpl const&) /*= delete*/;
313   HeadersImpl& operator=(HeadersImpl const&) /*= delete*/;
314 
315   rd_kafka_headers_t *headers_;
316 };
317 
318 
319 
320 class MessageImpl : public Message {
321  public:
~MessageImpl()322   ~MessageImpl () {
323     if (free_rkmessage_)
324       rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_));
325     if (key_)
326       delete key_;
327     if (headers_)
328       delete headers_;
329   };
330 
MessageImpl(RdKafka::Topic * topic,rd_kafka_message_t * rkmessage)331   MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage):
332   topic_(topic), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL),
333   headers_(NULL) {}
334 
MessageImpl(RdKafka::Topic * topic,rd_kafka_message_t * rkmessage,bool dofree)335   MessageImpl (RdKafka::Topic *topic, rd_kafka_message_t *rkmessage,
336                bool dofree):
337   topic_(topic), rkmessage_(rkmessage), free_rkmessage_(dofree), key_(NULL),
338   headers_(NULL) {}
339 
MessageImpl(rd_kafka_message_t * rkmessage)340   MessageImpl (rd_kafka_message_t *rkmessage):
341   topic_(NULL), rkmessage_(rkmessage), free_rkmessage_(true), key_(NULL),
342   headers_(NULL) {
343     if (rkmessage->rkt) {
344       /* Possibly NULL */
345       topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt));
346     }
347   }
348 
349   /* Create errored message */
MessageImpl(RdKafka::Topic * topic,RdKafka::ErrorCode err)350   MessageImpl (RdKafka::Topic *topic, RdKafka::ErrorCode err):
351   topic_(topic), free_rkmessage_(false), key_(NULL), headers_(NULL) {
352     rkmessage_ = &rkmessage_err_;
353     memset(&rkmessage_err_, 0, sizeof(rkmessage_err_));
354     rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err);
355   }
356 
errstr()357   std::string         errstr() const {
358     /* FIXME: If there is an error string in payload (for consume_cb)
359      *        it wont be shown since 'payload' is reused for errstr
360      *        and we cant distinguish between consumer and producer.
361      *        For the producer case the payload needs to be the original
362      *        payload pointer. */
363     const char *es = rd_kafka_err2str(rkmessage_->err);
364     return std::string(es ? es : "");
365   }
366 
err()367   ErrorCode           err () const {
368     return static_cast<RdKafka::ErrorCode>(rkmessage_->err);
369   }
370 
topic()371   Topic              *topic () const { return topic_; }
topic_name()372   std::string         topic_name  () const {
373           if (rkmessage_->rkt)
374                   return rd_kafka_topic_name(rkmessage_->rkt);
375           else
376                   return "";
377   }
partition()378   int32_t             partition () const { return rkmessage_->partition; }
payload()379   void               *payload () const { return rkmessage_->payload; }
len()380   size_t              len () const { return rkmessage_->len; }
key()381   const std::string  *key () const {
382     if (key_) {
383       return key_;
384     } else if (rkmessage_->key) {
385       key_ = new std::string(static_cast<char const*>(rkmessage_->key), rkmessage_->key_len);
386       return key_;
387     }
388     return NULL;
389   }
key_pointer()390   const void         *key_pointer () const { return rkmessage_->key; }
key_len()391   size_t              key_len () const { return rkmessage_->key_len; }
392 
offset()393   int64_t             offset () const { return rkmessage_->offset; }
394 
timestamp()395   MessageTimestamp   timestamp () const {
396 	  MessageTimestamp ts;
397 	  rd_kafka_timestamp_type_t tstype;
398 	  ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype);
399 	  ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype);
400 	  return ts;
401   }
402 
msg_opaque()403   void               *msg_opaque () const { return rkmessage_->_private; };
404 
latency()405   int64_t             latency () const {
406           return rd_kafka_message_latency(rkmessage_);
407   }
408 
c_ptr()409   struct rd_kafka_message_s *c_ptr () {
410           return rkmessage_;
411   }
412 
status()413   Status status () const {
414           return static_cast<Status>(rd_kafka_message_status(rkmessage_));
415   }
416 
headers()417   Headers *headers () {
418     ErrorCode err;
419     return headers(&err);
420   }
421 
headers(ErrorCode * err)422   Headers *headers (ErrorCode *err) {
423     *err = ERR_NO_ERROR;
424 
425     if (!headers_) {
426       rd_kafka_headers_t *c_hdrs;
427       rd_kafka_resp_err_t c_err;
428 
429       if ((c_err = rd_kafka_message_detach_headers(rkmessage_, &c_hdrs))) {
430         *err = static_cast<RdKafka::ErrorCode>(c_err);
431         return NULL;
432       }
433 
434       headers_ = new HeadersImpl(c_hdrs);
435     }
436 
437     return headers_;
438   }
439 
440   RdKafka::Topic *topic_;
441   rd_kafka_message_t *rkmessage_;
442   bool free_rkmessage_;
443   /* For error signalling by the C++ layer the .._err_ message is
444    * used as a place holder and rkmessage_ is set to point to it. */
445   rd_kafka_message_t rkmessage_err_;
446   mutable std::string *key_; /* mutable because it's a cached value */
447 
448 private:
449   /* "delete" copy ctor + copy assignment, for safety of key_ */
450   MessageImpl(MessageImpl const&) /*= delete*/;
451   MessageImpl& operator=(MessageImpl const&) /*= delete*/;
452 
453   RdKafka::Headers *headers_;
454 };
455 
456 
457 class ConfImpl : public Conf {
458  public:
ConfImpl()459   ConfImpl()
460       :consume_cb_(NULL),
461       dr_cb_(NULL),
462       event_cb_(NULL),
463       socket_cb_(NULL),
464       open_cb_(NULL),
465       partitioner_cb_(NULL),
466       partitioner_kp_cb_(NULL),
467       rebalance_cb_(NULL),
468       offset_commit_cb_(NULL),
469       oauthbearer_token_refresh_cb_(NULL),
470       ssl_cert_verify_cb_(NULL),
471       rk_conf_(NULL),
472       rkt_conf_(NULL){}
~ConfImpl()473   ~ConfImpl () {
474     if (rk_conf_)
475       rd_kafka_conf_destroy(rk_conf_);
476     else if (rkt_conf_)
477       rd_kafka_topic_conf_destroy(rkt_conf_);
478   }
479 
480   Conf::ConfResult set(const std::string &name,
481                        const std::string &value,
482                        std::string &errstr);
483 
set(const std::string & name,DeliveryReportCb * dr_cb,std::string & errstr)484   Conf::ConfResult set (const std::string &name, DeliveryReportCb *dr_cb,
485                         std::string &errstr) {
486     if (name != "dr_cb") {
487       errstr = "Invalid value type, expected RdKafka::DeliveryReportCb";
488       return Conf::CONF_INVALID;
489     }
490 
491     if (!rk_conf_) {
492       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
493       return Conf::CONF_INVALID;
494     }
495 
496     dr_cb_ = dr_cb;
497     return Conf::CONF_OK;
498   }
499 
set(const std::string & name,OAuthBearerTokenRefreshCb * oauthbearer_token_refresh_cb,std::string & errstr)500   Conf::ConfResult set (const std::string &name,
501                         OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
502                         std::string &errstr) {
503     if (name != "oauthbearer_token_refresh_cb") {
504       errstr = "Invalid value type, expected RdKafka::OAuthBearerTokenRefreshCb";
505       return Conf::CONF_INVALID;
506     }
507 
508     if (!rk_conf_) {
509       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
510       return Conf::CONF_INVALID;
511     }
512 
513     oauthbearer_token_refresh_cb_ = oauthbearer_token_refresh_cb;
514     return Conf::CONF_OK;
515   }
516 
set(const std::string & name,EventCb * event_cb,std::string & errstr)517   Conf::ConfResult set (const std::string &name, EventCb *event_cb,
518                         std::string &errstr) {
519     if (name != "event_cb") {
520       errstr = "Invalid value type, expected RdKafka::EventCb";
521       return Conf::CONF_INVALID;
522     }
523 
524     if (!rk_conf_) {
525       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
526       return Conf::CONF_INVALID;
527     }
528 
529     event_cb_ = event_cb;
530     return Conf::CONF_OK;
531   }
532 
set(const std::string & name,const Conf * topic_conf,std::string & errstr)533   Conf::ConfResult set (const std::string &name, const Conf *topic_conf,
534                         std::string &errstr) {
535     const ConfImpl *tconf_impl =
536         dynamic_cast<const RdKafka::ConfImpl *>(topic_conf);
537     if (name != "default_topic_conf" || !tconf_impl->rkt_conf_) {
538       errstr = "Invalid value type, expected RdKafka::Conf";
539       return Conf::CONF_INVALID;
540     }
541 
542     if (!rk_conf_) {
543       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
544       return Conf::CONF_INVALID;
545     }
546 
547     rd_kafka_conf_set_default_topic_conf(rk_conf_,
548                                          rd_kafka_topic_conf_dup(tconf_impl->
549                                                                  rkt_conf_));
550 
551     return Conf::CONF_OK;
552   }
553 
set(const std::string & name,PartitionerCb * partitioner_cb,std::string & errstr)554   Conf::ConfResult set (const std::string &name, PartitionerCb *partitioner_cb,
555                         std::string &errstr) {
556     if (name != "partitioner_cb") {
557       errstr = "Invalid value type, expected RdKafka::PartitionerCb";
558       return Conf::CONF_INVALID;
559     }
560 
561     if (!rkt_conf_) {
562       errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
563       return Conf::CONF_INVALID;
564     }
565 
566     partitioner_cb_ = partitioner_cb;
567     return Conf::CONF_OK;
568   }
569 
set(const std::string & name,PartitionerKeyPointerCb * partitioner_kp_cb,std::string & errstr)570   Conf::ConfResult set (const std::string &name,
571                         PartitionerKeyPointerCb *partitioner_kp_cb,
572                         std::string &errstr) {
573     if (name != "partitioner_key_pointer_cb") {
574       errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb";
575       return Conf::CONF_INVALID;
576     }
577 
578     if (!rkt_conf_) {
579       errstr = "Requires RdKafka::Conf::CONF_TOPIC object";
580       return Conf::CONF_INVALID;
581     }
582 
583     partitioner_kp_cb_ = partitioner_kp_cb;
584     return Conf::CONF_OK;
585   }
586 
set(const std::string & name,SocketCb * socket_cb,std::string & errstr)587   Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
588                         std::string &errstr) {
589     if (name != "socket_cb") {
590       errstr = "Invalid value type, expected RdKafka::SocketCb";
591       return Conf::CONF_INVALID;
592     }
593 
594     if (!rk_conf_) {
595       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
596       return Conf::CONF_INVALID;
597     }
598 
599     socket_cb_ = socket_cb;
600     return Conf::CONF_OK;
601   }
602 
603 
set(const std::string & name,OpenCb * open_cb,std::string & errstr)604   Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
605                         std::string &errstr) {
606     if (name != "open_cb") {
607       errstr = "Invalid value type, expected RdKafka::OpenCb";
608       return Conf::CONF_INVALID;
609     }
610 
611     if (!rk_conf_) {
612       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
613       return Conf::CONF_INVALID;
614     }
615 
616     open_cb_ = open_cb;
617     return Conf::CONF_OK;
618   }
619 
620 
621 
622 
set(const std::string & name,RebalanceCb * rebalance_cb,std::string & errstr)623   Conf::ConfResult set (const std::string &name, RebalanceCb *rebalance_cb,
624                         std::string &errstr) {
625     if (name != "rebalance_cb") {
626       errstr = "Invalid value type, expected RdKafka::RebalanceCb";
627       return Conf::CONF_INVALID;
628     }
629 
630     if (!rk_conf_) {
631       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
632       return Conf::CONF_INVALID;
633     }
634 
635     rebalance_cb_ = rebalance_cb;
636     return Conf::CONF_OK;
637   }
638 
639 
set(const std::string & name,OffsetCommitCb * offset_commit_cb,std::string & errstr)640   Conf::ConfResult set (const std::string &name,
641                         OffsetCommitCb *offset_commit_cb,
642                         std::string &errstr) {
643     if (name != "offset_commit_cb") {
644       errstr = "Invalid value type, expected RdKafka::OffsetCommitCb";
645       return Conf::CONF_INVALID;
646     }
647 
648     if (!rk_conf_) {
649       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
650       return Conf::CONF_INVALID;
651     }
652 
653     offset_commit_cb_ = offset_commit_cb;
654     return Conf::CONF_OK;
655   }
656 
657 
set(const std::string & name,SslCertificateVerifyCb * ssl_cert_verify_cb,std::string & errstr)658   Conf::ConfResult set (const std::string &name,
659                         SslCertificateVerifyCb *ssl_cert_verify_cb,
660                         std::string &errstr) {
661     if (name != "ssl_cert_verify_cb") {
662       errstr = "Invalid value type, expected RdKafka::SslCertificateVerifyCb";
663       return Conf::CONF_INVALID;
664     }
665 
666     if (!rk_conf_) {
667       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
668       return Conf::CONF_INVALID;
669     }
670 
671     ssl_cert_verify_cb_ = ssl_cert_verify_cb;
672     return Conf::CONF_OK;
673   }
674 
set_ssl_cert(RdKafka::CertificateType cert_type,RdKafka::CertificateEncoding cert_enc,const void * buffer,size_t size,std::string & errstr)675   Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type,
676                                  RdKafka::CertificateEncoding cert_enc,
677                                  const void *buffer, size_t size,
678                                  std::string &errstr) {
679     rd_kafka_conf_res_t res;
680     char errbuf[512];
681 
682     if (!rk_conf_) {
683       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
684       return Conf::CONF_INVALID;
685     }
686 
687     res = rd_kafka_conf_set_ssl_cert(
688         rk_conf_,
689         static_cast<rd_kafka_cert_type_t>(cert_type),
690         static_cast<rd_kafka_cert_enc_t>(cert_enc),
691         buffer, size, errbuf, sizeof(errbuf));
692 
693     if (res != RD_KAFKA_CONF_OK)
694       errstr = errbuf;
695 
696     return static_cast<Conf::ConfResult>(res);
697   }
698 
699 
get(const std::string & name,std::string & value)700   Conf::ConfResult get(const std::string &name, std::string &value) const {
701     if (name.compare("dr_cb") == 0 ||
702         name.compare("event_cb") == 0 ||
703         name.compare("partitioner_cb") == 0 ||
704         name.compare("partitioner_key_pointer_cb") == 0 ||
705         name.compare("socket_cb") == 0 ||
706         name.compare("open_cb") == 0 ||
707         name.compare("rebalance_cb") == 0 ||
708         name.compare("offset_commit_cb") == 0 ||
709         name.compare("oauthbearer_token_refresh_cb") == 0 ||
710         name.compare("ssl_cert_verify_cb") == 0) {
711       return Conf::CONF_INVALID;
712     }
713     rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID;
714 
715     /* Get size of property */
716     size_t size;
717     if (rk_conf_)
718       res = rd_kafka_conf_get(rk_conf_,
719                               name.c_str(), NULL, &size);
720     else if (rkt_conf_)
721       res = rd_kafka_topic_conf_get(rkt_conf_,
722                                     name.c_str(), NULL, &size);
723     if (res != RD_KAFKA_CONF_OK)
724       return static_cast<Conf::ConfResult>(res);
725 
726     char *tmpValue = new char[size];
727 
728     if (rk_conf_)
729       res = rd_kafka_conf_get(rk_conf_, name.c_str(),
730                               tmpValue, &size);
731     else if (rkt_conf_)
732       res = rd_kafka_topic_conf_get(rkt_conf_,
733                                     name.c_str(), tmpValue, &size);
734 
735     if (res == RD_KAFKA_CONF_OK)
736       value.assign(tmpValue);
737     delete[] tmpValue;
738 
739     return static_cast<Conf::ConfResult>(res);
740   }
741 
get(DeliveryReportCb * & dr_cb)742   Conf::ConfResult get(DeliveryReportCb *&dr_cb) const {
743       if (!rk_conf_)
744 	  return Conf::CONF_INVALID;
745       dr_cb = this->dr_cb_;
746       return Conf::CONF_OK;
747   }
748 
get(OAuthBearerTokenRefreshCb * & oauthbearer_token_refresh_cb)749   Conf::ConfResult get(
750     OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const {
751       if (!rk_conf_)
752           return Conf::CONF_INVALID;
753       oauthbearer_token_refresh_cb = this->oauthbearer_token_refresh_cb_;
754       return Conf::CONF_OK;
755   }
756 
get(EventCb * & event_cb)757   Conf::ConfResult get(EventCb *&event_cb) const {
758       if (!rk_conf_)
759 	  return Conf::CONF_INVALID;
760       event_cb = this->event_cb_;
761       return Conf::CONF_OK;
762   }
763 
get(PartitionerCb * & partitioner_cb)764   Conf::ConfResult get(PartitionerCb *&partitioner_cb) const {
765       if (!rkt_conf_)
766 	  return Conf::CONF_INVALID;
767       partitioner_cb = this->partitioner_cb_;
768       return Conf::CONF_OK;
769   }
770 
get(PartitionerKeyPointerCb * & partitioner_kp_cb)771   Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const {
772       if (!rkt_conf_)
773 	  return Conf::CONF_INVALID;
774       partitioner_kp_cb = this->partitioner_kp_cb_;
775       return Conf::CONF_OK;
776   }
777 
get(SocketCb * & socket_cb)778   Conf::ConfResult get(SocketCb *&socket_cb) const {
779       if (!rk_conf_)
780 	  return Conf::CONF_INVALID;
781       socket_cb = this->socket_cb_;
782       return Conf::CONF_OK;
783   }
784 
get(OpenCb * & open_cb)785   Conf::ConfResult get(OpenCb *&open_cb) const {
786       if (!rk_conf_)
787 	  return Conf::CONF_INVALID;
788       open_cb = this->open_cb_;
789       return Conf::CONF_OK;
790   }
791 
get(RebalanceCb * & rebalance_cb)792   Conf::ConfResult get(RebalanceCb *&rebalance_cb) const {
793       if (!rk_conf_)
794 	  return Conf::CONF_INVALID;
795       rebalance_cb = this->rebalance_cb_;
796       return Conf::CONF_OK;
797   }
798 
get(OffsetCommitCb * & offset_commit_cb)799   Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const {
800       if (!rk_conf_)
801 	  return Conf::CONF_INVALID;
802       offset_commit_cb = this->offset_commit_cb_;
803       return Conf::CONF_OK;
804     }
805 
get(SslCertificateVerifyCb * & ssl_cert_verify_cb)806   Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const {
807       if (!rk_conf_)
808               return Conf::CONF_INVALID;
809       ssl_cert_verify_cb = this->ssl_cert_verify_cb_;
810       return Conf::CONF_OK;
811   }
812 
813   std::list<std::string> *dump ();
814 
815 
set(const std::string & name,ConsumeCb * consume_cb,std::string & errstr)816   Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
817                         std::string &errstr) {
818     if (name != "consume_cb") {
819       errstr = "Invalid value type, expected RdKafka::ConsumeCb";
820       return Conf::CONF_INVALID;
821     }
822 
823     if (!rk_conf_) {
824       errstr = "Requires RdKafka::Conf::CONF_GLOBAL object";
825       return Conf::CONF_INVALID;
826     }
827 
828     consume_cb_ = consume_cb;
829     return Conf::CONF_OK;
830   }
831 
c_ptr_global()832   struct rd_kafka_conf_s *c_ptr_global () {
833     if (conf_type_ == CONF_GLOBAL)
834       return rk_conf_;
835     else
836       return NULL;
837   }
838 
c_ptr_topic()839   struct rd_kafka_topic_conf_s *c_ptr_topic () {
840     if (conf_type_ == CONF_TOPIC)
841       return rkt_conf_;
842     else
843       return NULL;
844   }
845 
846   ConsumeCb *consume_cb_;
847   DeliveryReportCb *dr_cb_;
848   EventCb *event_cb_;
849   SocketCb *socket_cb_;
850   OpenCb *open_cb_;
851   PartitionerCb *partitioner_cb_;
852   PartitionerKeyPointerCb *partitioner_kp_cb_;
853   RebalanceCb *rebalance_cb_;
854   OffsetCommitCb *offset_commit_cb_;
855   OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_;
856   SslCertificateVerifyCb *ssl_cert_verify_cb_;
857   ConfType conf_type_;
858   rd_kafka_conf_t *rk_conf_;
859   rd_kafka_topic_conf_t *rkt_conf_;
860 };
861 
862 
863 class HandleImpl : virtual public Handle {
864  public:
~HandleImpl()865   ~HandleImpl() {};
HandleImpl()866   HandleImpl () {};
name()867   const std::string name () const { return std::string(rd_kafka_name(rk_)); };
memberid()868   const std::string memberid () const {
869 	  char *str = rd_kafka_memberid(rk_);
870 	  std::string memberid = str ? str : "";
871 	  if (str)
872 		  rd_kafka_mem_free(rk_, str);
873 	  return memberid;
874   }
poll(int timeout_ms)875   int poll (int timeout_ms) { return rd_kafka_poll(rk_, timeout_ms); };
outq_len()876   int outq_len () { return rd_kafka_outq_len(rk_); };
877 
878   void set_common_config (RdKafka::ConfImpl *confimpl);
879 
880   RdKafka::ErrorCode metadata (bool all_topics,const Topic *only_rkt,
881             Metadata **metadatap, int timeout_ms);
882 
883   ErrorCode pause (std::vector<TopicPartition*> &partitions);
884   ErrorCode resume (std::vector<TopicPartition*> &partitions);
885 
query_watermark_offsets(const std::string & topic,int32_t partition,int64_t * low,int64_t * high,int timeout_ms)886   ErrorCode query_watermark_offsets (const std::string &topic,
887 				     int32_t partition,
888 				     int64_t *low, int64_t *high,
889 				     int timeout_ms) {
890     return static_cast<RdKafka::ErrorCode>(
891         rd_kafka_query_watermark_offsets(
892             rk_, topic.c_str(), partition,
893             low, high, timeout_ms));
894   }
895 
get_watermark_offsets(const std::string & topic,int32_t partition,int64_t * low,int64_t * high)896   ErrorCode get_watermark_offsets (const std::string &topic,
897                                    int32_t partition,
898                                    int64_t *low, int64_t *high) {
899     return static_cast<RdKafka::ErrorCode>(
900         rd_kafka_get_watermark_offsets(
901             rk_, topic.c_str(), partition,
902             low, high));
903   }
904 
905   Queue *get_partition_queue (const TopicPartition *partition);
906 
offsetsForTimes(std::vector<TopicPartition * > & offsets,int timeout_ms)907   ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
908                              int timeout_ms) {
909     rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
910     ErrorCode err = static_cast<ErrorCode>(
911         rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms));
912     update_partitions_from_c_parts(offsets, c_offsets);
913     rd_kafka_topic_partition_list_destroy(c_offsets);
914     return err;
915   }
916 
917   ErrorCode set_log_queue (Queue *queue);
918 
yield()919   void yield () {
920     rd_kafka_yield(rk_);
921   }
922 
clusterid(int timeout_ms)923   const std::string clusterid (int timeout_ms) {
924           char *str = rd_kafka_clusterid(rk_, timeout_ms);
925           std::string clusterid = str ? str : "";
926           if (str)
927                   rd_kafka_mem_free(rk_, str);
928           return clusterid;
929   }
930 
c_ptr()931   struct rd_kafka_s *c_ptr () {
932           return rk_;
933   }
934 
controllerid(int timeout_ms)935   int32_t controllerid (int timeout_ms) {
936           return rd_kafka_controllerid(rk_, timeout_ms);
937   }
938 
fatal_error(std::string & errstr)939   ErrorCode fatal_error (std::string &errstr) {
940           char errbuf[512];
941           RdKafka::ErrorCode err =
942                   static_cast<RdKafka::ErrorCode>(
943                           rd_kafka_fatal_error(rk_, errbuf, sizeof(errbuf)));
944           if (err)
945                   errstr = errbuf;
946           return err;
947   }
948 
oauthbearer_set_token(const std::string & token_value,int64_t md_lifetime_ms,const std::string & md_principal_name,const std::list<std::string> & extensions,std::string & errstr)949   ErrorCode oauthbearer_set_token (const std::string &token_value,
950                                    int64_t md_lifetime_ms,
951                                    const std::string &md_principal_name,
952                                    const std::list<std::string> &extensions,
953                                    std::string &errstr) {
954           char errbuf[512];
955           ErrorCode err;
956           const char **extensions_copy = new const char *[extensions.size()];
957           int elem = 0;
958 
959           for (std::list<std::string>::const_iterator it = extensions.begin();
960               it != extensions.end(); it++)
961                   extensions_copy[elem++] = it->c_str();
962           err = static_cast<ErrorCode>(rd_kafka_oauthbearer_set_token(
963                                                rk_, token_value.c_str(),
964                                                md_lifetime_ms,
965                                                md_principal_name.c_str(),
966                                                extensions_copy,
967                                                extensions.size(),
968                                                errbuf, sizeof(errbuf)));
969           free(extensions_copy);
970 
971           if (err != ERR_NO_ERROR)
972               errstr = errbuf;
973 
974           return err;
975   }
976 
oauthbearer_set_token_failure(const std::string & errstr)977   ErrorCode oauthbearer_set_token_failure(const std::string &errstr) {
978           return static_cast<ErrorCode>(rd_kafka_oauthbearer_set_token_failure(
979                                                 rk_, errstr.c_str()));
980   };
981 
982 
983   rd_kafka_t *rk_;
984   /* All Producer and Consumer callbacks must reside in HandleImpl and
985    * the opaque provided to rdkafka must be a pointer to HandleImpl, since
986    * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to
987    * HandleImpl due to the skewed diamond inheritance. */
988   ConsumeCb *consume_cb_;
989   EventCb *event_cb_;
990   SocketCb *socket_cb_;
991   OpenCb *open_cb_;
992   DeliveryReportCb *dr_cb_;
993   PartitionerCb *partitioner_cb_;
994   PartitionerKeyPointerCb *partitioner_kp_cb_;
995   RebalanceCb *rebalance_cb_;
996   OffsetCommitCb *offset_commit_cb_;
997   OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_;
998   SslCertificateVerifyCb *ssl_cert_verify_cb_;
999 };
1000 
1001 
1002 class TopicImpl : public Topic {
1003  public:
~TopicImpl()1004   ~TopicImpl () {
1005     rd_kafka_topic_destroy(rkt_);
1006   }
1007 
name()1008   const std::string name () const {
1009     return rd_kafka_topic_name(rkt_);
1010   }
1011 
partition_available(int32_t partition)1012   bool partition_available (int32_t partition) const {
1013     return !!rd_kafka_topic_partition_available(rkt_, partition);
1014   }
1015 
offset_store(int32_t partition,int64_t offset)1016   ErrorCode offset_store (int32_t partition, int64_t offset) {
1017     return static_cast<RdKafka::ErrorCode>(
1018         rd_kafka_offset_store(rkt_, partition, offset));
1019   }
1020 
1021   static Topic *create (Handle &base, const std::string &topic,
1022                         Conf *conf);
1023 
c_ptr()1024   struct rd_kafka_topic_s *c_ptr () {
1025           return rkt_;
1026   }
1027 
1028   rd_kafka_topic_t *rkt_;
1029   PartitionerCb *partitioner_cb_;
1030   PartitionerKeyPointerCb *partitioner_kp_cb_;
1031 };
1032 
1033 
1034 /**
1035  * Topic and Partition
1036  */
1037 class TopicPartitionImpl : public TopicPartition {
1038 public:
~TopicPartitionImpl()1039   ~TopicPartitionImpl() {};
1040 
1041   static TopicPartition *create (const std::string &topic, int partition);
1042 
TopicPartitionImpl(const std::string & topic,int partition)1043   TopicPartitionImpl (const std::string &topic, int partition):
1044   topic_(topic), partition_(partition), offset_(RdKafka::Topic::OFFSET_INVALID),
1045       err_(ERR_NO_ERROR) {}
1046 
TopicPartitionImpl(const std::string & topic,int partition,int64_t offset)1047   TopicPartitionImpl (const std::string &topic, int partition, int64_t offset):
1048   topic_(topic), partition_(partition), offset_(offset),
1049           err_(ERR_NO_ERROR) {}
1050 
TopicPartitionImpl(const rd_kafka_topic_partition_t * c_part)1051   TopicPartitionImpl (const rd_kafka_topic_partition_t *c_part) {
1052     topic_ = std::string(c_part->topic);
1053     partition_ = c_part->partition;
1054     offset_ = c_part->offset;
1055     err_ = static_cast<ErrorCode>(c_part->err);
1056     // FIXME: metadata
1057   }
1058 
1059   static void destroy (std::vector<TopicPartition*> &partitions);
1060 
partition()1061   int partition () const { return partition_; }
topic()1062   const std::string &topic () const { return topic_ ; }
1063 
offset()1064   int64_t offset () const { return offset_; }
1065 
err()1066   ErrorCode err () const { return err_; }
1067 
set_offset(int64_t offset)1068   void set_offset (int64_t offset) { offset_ = offset; }
1069 
1070   std::ostream& operator<<(std::ostream &ostrm) const {
1071     return ostrm << topic_ << " [" << partition_ << "]";
1072   }
1073 
1074   std::string topic_;
1075   int partition_;
1076   int64_t offset_;
1077   ErrorCode err_;
1078 };
1079 
1080 
1081 /**
1082  * @class ConsumerGroupMetadata wraps the
1083  *        C rd_kafka_consumer_group_metadata_t object.
1084  */
1085 class ConsumerGroupMetadataImpl : public ConsumerGroupMetadata {
1086  public:
~ConsumerGroupMetadataImpl()1087   ~ConsumerGroupMetadataImpl() {
1088     rd_kafka_consumer_group_metadata_destroy(cgmetadata_);
1089   }
1090 
ConsumerGroupMetadataImpl(rd_kafka_consumer_group_metadata_t * cgmetadata)1091   ConsumerGroupMetadataImpl(rd_kafka_consumer_group_metadata_t *cgmetadata):
1092       cgmetadata_(cgmetadata) {}
1093 
1094   rd_kafka_consumer_group_metadata_t *cgmetadata_;
1095 };
1096 
1097 
1098 class KafkaConsumerImpl : virtual public KafkaConsumer, virtual public HandleImpl {
1099 public:
~KafkaConsumerImpl()1100   ~KafkaConsumerImpl () {
1101 
1102   }
1103 
1104   static KafkaConsumer *create (Conf *conf, std::string &errstr);
1105 
1106   ErrorCode assignment (std::vector<TopicPartition*> &partitions);
1107   ErrorCode subscription (std::vector<std::string> &topics);
1108   ErrorCode subscribe (const std::vector<std::string> &topics);
1109   ErrorCode unsubscribe ();
1110   ErrorCode assign (const std::vector<TopicPartition*> &partitions);
1111   ErrorCode unassign ();
1112 
1113   Message *consume (int timeout_ms);
commitSync()1114   ErrorCode commitSync () {
1115     return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0/*sync*/));
1116   }
commitAsync()1117   ErrorCode commitAsync () {
1118     return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 1/*async*/));
1119   }
commitSync(Message * message)1120   ErrorCode commitSync (Message *message) {
1121 	  MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message);
1122 	  return static_cast<ErrorCode>(
1123                   rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 0/*sync*/));
1124   }
commitAsync(Message * message)1125   ErrorCode commitAsync (Message *message) {
1126 	  MessageImpl *msgimpl = dynamic_cast<MessageImpl*>(message);
1127 	  return static_cast<ErrorCode>(
1128                   rd_kafka_commit_message(rk_, msgimpl->rkmessage_,1/*async*/));
1129   }
1130 
commitSync(std::vector<TopicPartition * > & offsets)1131   ErrorCode commitSync (std::vector<TopicPartition*> &offsets) {
1132 	  rd_kafka_topic_partition_list_t *c_parts =
1133 		  partitions_to_c_parts(offsets);
1134 	  rd_kafka_resp_err_t err =
1135 		  rd_kafka_commit(rk_, c_parts, 0);
1136 	  if (!err)
1137 		  update_partitions_from_c_parts(offsets, c_parts);
1138 	  rd_kafka_topic_partition_list_destroy(c_parts);
1139 	  return static_cast<ErrorCode>(err);
1140   }
1141 
commitAsync(const std::vector<TopicPartition * > & offsets)1142   ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) {
1143 	  rd_kafka_topic_partition_list_t *c_parts =
1144 		  partitions_to_c_parts(offsets);
1145 	  rd_kafka_resp_err_t err =
1146 		  rd_kafka_commit(rk_, c_parts, 1);
1147 	  rd_kafka_topic_partition_list_destroy(c_parts);
1148 	  return static_cast<ErrorCode>(err);
1149   }
1150 
commitSync(OffsetCommitCb * offset_commit_cb)1151   ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) {
1152           return static_cast<ErrorCode>(
1153                   rd_kafka_commit_queue(rk_, NULL, NULL,
1154                                         RdKafka::offset_commit_cb_trampoline0,
1155                                         offset_commit_cb));
1156   }
1157 
commitSync(std::vector<TopicPartition * > & offsets,OffsetCommitCb * offset_commit_cb)1158   ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
1159                         OffsetCommitCb *offset_commit_cb) {
1160           rd_kafka_topic_partition_list_t *c_parts =
1161                   partitions_to_c_parts(offsets);
1162           rd_kafka_resp_err_t err =
1163                   rd_kafka_commit_queue(rk_, c_parts, NULL,
1164                                         RdKafka::offset_commit_cb_trampoline0,
1165                                         offset_commit_cb);
1166           rd_kafka_topic_partition_list_destroy(c_parts);
1167           return static_cast<ErrorCode>(err);
1168   }
1169 
1170   ErrorCode committed (std::vector<TopicPartition*> &partitions, int timeout_ms);
1171   ErrorCode position (std::vector<TopicPartition*> &partitions);
1172 
groupMetadata()1173   ConsumerGroupMetadata *groupMetadata () {
1174     rd_kafka_consumer_group_metadata_t *cgmetadata;
1175 
1176     cgmetadata = rd_kafka_consumer_group_metadata(rk_);
1177     if (!cgmetadata)
1178       return NULL;
1179 
1180     return new ConsumerGroupMetadataImpl(cgmetadata);
1181   }
1182 
1183   ErrorCode close ();
1184 
1185   ErrorCode seek (const TopicPartition &partition, int timeout_ms);
1186 
offsets_store(std::vector<TopicPartition * > & offsets)1187   ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) {
1188           rd_kafka_topic_partition_list_t *c_parts =
1189                   partitions_to_c_parts(offsets);
1190           rd_kafka_resp_err_t err =
1191                   rd_kafka_offsets_store(rk_, c_parts);
1192           update_partitions_from_c_parts(offsets, c_parts);
1193           rd_kafka_topic_partition_list_destroy(c_parts);
1194           return static_cast<ErrorCode>(err);
1195   }
1196 
1197 };
1198 
1199 
1200 class MetadataImpl : public Metadata {
1201  public:
1202   MetadataImpl(const rd_kafka_metadata_t *metadata);
1203   ~MetadataImpl();
1204 
brokers()1205   const std::vector<const BrokerMetadata *> *brokers() const {
1206     return &brokers_;
1207   }
1208 
topics()1209   const std::vector<const TopicMetadata *>  *topics() const {
1210     return &topics_;
1211   }
1212 
orig_broker_name()1213   const std::string orig_broker_name() const {
1214     return std::string(metadata_->orig_broker_name);
1215   }
1216 
orig_broker_id()1217   int32_t orig_broker_id() const {
1218     return metadata_->orig_broker_id;
1219   }
1220 
1221 private:
1222   const rd_kafka_metadata_t *metadata_;
1223   std::vector<const BrokerMetadata *> brokers_;
1224   std::vector<const TopicMetadata *> topics_;
1225   std::string orig_broker_name_;
1226 };
1227 
1228 
1229 class QueueImpl : virtual public Queue {
1230  public:
~QueueImpl()1231   ~QueueImpl () {
1232     rd_kafka_queue_destroy(queue_);
1233   }
1234   static Queue *create (Handle *base);
1235   ErrorCode forward (Queue *queue);
1236   Message *consume (int timeout_ms);
1237   int poll (int timeout_ms);
1238   void io_event_enable(int fd, const void *payload, size_t size);
1239 
1240   rd_kafka_queue_t *queue_;
1241 };
1242 
1243 
1244 
1245 
1246 
1247 class ConsumerImpl : virtual public Consumer, virtual public HandleImpl {
1248  public:
~ConsumerImpl()1249   ~ConsumerImpl () {
1250     if (rk_)
1251       rd_kafka_destroy(rk_);
1252   };
1253   static Consumer *create (Conf *conf, std::string &errstr);
1254 
1255   ErrorCode start (Topic *topic, int32_t partition, int64_t offset);
1256   ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
1257                    Queue *queue);
1258   ErrorCode stop (Topic *topic, int32_t partition);
1259   ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
1260 		  int timeout_ms);
1261   Message *consume (Topic *topic, int32_t partition, int timeout_ms);
1262   Message *consume (Queue *queue, int timeout_ms);
1263   int consume_callback (Topic *topic, int32_t partition, int timeout_ms,
1264                         ConsumeCb *cb, void *opaque);
1265   int consume_callback (Queue *queue, int timeout_ms,
1266                         RdKafka::ConsumeCb *consume_cb, void *opaque);
1267 };
1268 
1269 
1270 
1271 class ProducerImpl : virtual public Producer, virtual public HandleImpl {
1272 
1273  public:
~ProducerImpl()1274   ~ProducerImpl () { if (rk_) rd_kafka_destroy(rk_); };
1275 
1276   ErrorCode produce (Topic *topic, int32_t partition,
1277                      int msgflags,
1278                      void *payload, size_t len,
1279                      const std::string *key,
1280                      void *msg_opaque);
1281 
1282   ErrorCode produce (Topic *topic, int32_t partition,
1283                      int msgflags,
1284                      void *payload, size_t len,
1285                      const void *key, size_t key_len,
1286                      void *msg_opaque);
1287 
1288   ErrorCode produce (Topic *topic, int32_t partition,
1289                      const std::vector<char> *payload,
1290                      const std::vector<char> *key,
1291                      void *msg_opaque);
1292 
1293   ErrorCode produce (const std::string topic_name, int32_t partition,
1294                      int msgflags,
1295                      void *payload, size_t len,
1296                      const void *key, size_t key_len,
1297                      int64_t timestamp, void *msg_opaque);
1298 
1299   ErrorCode produce (const std::string topic_name, int32_t partition,
1300                      int msgflags,
1301                      void *payload, size_t len,
1302                      const void *key, size_t key_len,
1303                      int64_t timestamp,
1304                      RdKafka::Headers *headers,
1305                      void *msg_opaque);
1306 
flush(int timeout_ms)1307   ErrorCode flush (int timeout_ms) {
1308 	  return static_cast<RdKafka::ErrorCode>(rd_kafka_flush(rk_,
1309 								timeout_ms));
1310   }
1311 
purge(int purge_flags)1312   ErrorCode purge (int purge_flags) {
1313 	  return static_cast<RdKafka::ErrorCode>(rd_kafka_purge(rk_,
1314                                                                 (int)purge_flags));
1315   }
1316 
init_transactions(int timeout_ms)1317   Error *init_transactions (int timeout_ms) {
1318     rd_kafka_error_t *c_error;
1319 
1320     c_error = rd_kafka_init_transactions(rk_, timeout_ms);
1321 
1322     if (c_error)
1323       return new ErrorImpl(c_error);
1324     else
1325       return NULL;
1326   }
1327 
begin_transaction()1328   Error *begin_transaction () {
1329     rd_kafka_error_t *c_error;
1330 
1331     c_error = rd_kafka_begin_transaction(rk_);
1332 
1333     if (c_error)
1334       return new ErrorImpl(c_error);
1335     else
1336       return NULL;
1337   }
1338 
send_offsets_to_transaction(const std::vector<TopicPartition * > & offsets,const ConsumerGroupMetadata * group_metadata,int timeout_ms)1339   Error *send_offsets_to_transaction (
1340       const std::vector<TopicPartition*> &offsets,
1341       const ConsumerGroupMetadata *group_metadata,
1342       int timeout_ms) {
1343     rd_kafka_error_t *c_error;
1344     const RdKafka::ConsumerGroupMetadataImpl *cgmdimpl =
1345         dynamic_cast<const RdKafka::ConsumerGroupMetadataImpl *>(group_metadata);
1346     rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets);
1347 
1348     c_error = rd_kafka_send_offsets_to_transaction(rk_, c_offsets,
1349                                                    cgmdimpl->cgmetadata_,
1350                                                    timeout_ms);
1351 
1352     rd_kafka_topic_partition_list_destroy(c_offsets);
1353 
1354     if (c_error)
1355       return new ErrorImpl(c_error);
1356     else
1357       return NULL;
1358   }
1359 
commit_transaction(int timeout_ms)1360   Error *commit_transaction (int timeout_ms) {
1361     rd_kafka_error_t *c_error;
1362 
1363     c_error = rd_kafka_commit_transaction(rk_, timeout_ms);
1364 
1365     if (c_error)
1366       return new ErrorImpl(c_error);
1367     else
1368       return NULL;
1369   }
1370 
abort_transaction(int timeout_ms)1371   Error *abort_transaction (int timeout_ms) {
1372     rd_kafka_error_t *c_error;
1373 
1374     c_error = rd_kafka_abort_transaction(rk_, timeout_ms);
1375 
1376     if (c_error)
1377       return new ErrorImpl(c_error);
1378     else
1379       return NULL;
1380   }
1381 
1382   static Producer *create (Conf *conf, std::string &errstr);
1383 
1384 };
1385 
1386 
1387 
1388 }
1389 
1390 #endif /* _RDKAFKACPP_INT_H_ */
1391