1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "rdkafkacpp_int.h"
30 
31 using namespace RdKafka;
32 
~BrokerMetadata()33 BrokerMetadata::~BrokerMetadata() {};
~PartitionMetadata()34 PartitionMetadata::~PartitionMetadata() {};
~TopicMetadata()35 TopicMetadata::~TopicMetadata() {};
~Metadata()36 Metadata::~Metadata() {};
37 
38 
39 /**
40  * Metadata: Broker information handler implementation
41  */
42 class BrokerMetadataImpl : public BrokerMetadata {
43  public:
BrokerMetadataImpl(const rd_kafka_metadata_broker_t * broker_metadata)44   BrokerMetadataImpl(const rd_kafka_metadata_broker_t *broker_metadata)
45   :broker_metadata_(broker_metadata),host_(broker_metadata->host) {}
46 
id() const47   int32_t      id() const{return broker_metadata_->id;}
48 
host() const49   const std::string host() const {return host_;}
port() const50   int port() const {return broker_metadata_->port;}
51 
~BrokerMetadataImpl()52   virtual ~BrokerMetadataImpl() {}
53 
54  private:
55   const rd_kafka_metadata_broker_t *broker_metadata_;
56   const std::string host_;
57 };
58 
59 /**
60  * Metadata: Partition information handler
61  */
62 class PartitionMetadataImpl : public PartitionMetadata {
63  public:
64   // @TODO too much memory copy? maybe we should create a new vector class that read directly from C arrays?
65   // @TODO use auto_ptr?
PartitionMetadataImpl(const rd_kafka_metadata_partition_t * partition_metadata)66   PartitionMetadataImpl(const rd_kafka_metadata_partition_t *partition_metadata)
67   :partition_metadata_(partition_metadata) {
68     replicas_.reserve(partition_metadata->replica_cnt);
69     for(int i=0;i<partition_metadata->replica_cnt;++i)
70       replicas_.push_back(partition_metadata->replicas[i]);
71 
72     isrs_.reserve(partition_metadata->isr_cnt);
73     for(int i=0;i<partition_metadata->isr_cnt;++i)
74       isrs_.push_back(partition_metadata->isrs[i]);
75   }
76 
id() const77   int32_t                    id() const {
78     return partition_metadata_->id;
79   }
leader() const80   int32_t                    leader() const {
81     return partition_metadata_->leader;
82   }
err() const83   ErrorCode                  err() const {
84     return static_cast<ErrorCode>(partition_metadata_->err);
85   }
86 
replicas() const87   const std::vector<int32_t> *replicas() const {return &replicas_;}
isrs() const88   const std::vector<int32_t> *isrs() const {return &isrs_;}
89 
~PartitionMetadataImpl()90   ~PartitionMetadataImpl() {};
91 
92  private:
93   const rd_kafka_metadata_partition_t *partition_metadata_;
94   std::vector<int32_t> replicas_,isrs_;
95 };
96 
97 /**
98  * Metadata: Topic information handler
99  */
100 class TopicMetadataImpl : public TopicMetadata{
101  public:
TopicMetadataImpl(const rd_kafka_metadata_topic_t * topic_metadata)102   TopicMetadataImpl(const rd_kafka_metadata_topic_t *topic_metadata)
103   :topic_metadata_(topic_metadata),topic_(topic_metadata->topic) {
104     partitions_.reserve(topic_metadata->partition_cnt);
105     for(int i=0;i<topic_metadata->partition_cnt;++i)
106       partitions_.push_back(
107         new PartitionMetadataImpl(&topic_metadata->partitions[i])
108       );
109   }
110 
~TopicMetadataImpl()111   ~TopicMetadataImpl(){
112     for(size_t i=0;i<partitions_.size();++i)
113       delete partitions_[i];
114   }
115 
topic() const116   const std::string topic() const {return topic_;}
partitions() const117   const std::vector<const PartitionMetadata *> *partitions() const {
118     return &partitions_;
119   }
err() const120   ErrorCode err() const {return static_cast<ErrorCode>(topic_metadata_->err);}
121 
122  private:
123   const rd_kafka_metadata_topic_t *topic_metadata_;
124   const std::string topic_;
125   std::vector<const PartitionMetadata *> partitions_;
126 
127 };
128 
MetadataImpl(const rd_kafka_metadata_t * metadata)129 MetadataImpl::MetadataImpl(const rd_kafka_metadata_t *metadata)
130 :metadata_(metadata)
131 {
132   brokers_.reserve(metadata->broker_cnt);
133   for(int i=0;i<metadata->broker_cnt;++i)
134     brokers_.push_back(new BrokerMetadataImpl(&metadata->brokers[i]));
135 
136   topics_.reserve(metadata->topic_cnt);
137   for(int i=0;i<metadata->topic_cnt;++i)
138     topics_.push_back(new TopicMetadataImpl(&metadata->topics[i]));
139 
140 }
141 
~MetadataImpl()142 MetadataImpl::~MetadataImpl() {
143   for(size_t i=0;i<brokers_.size();++i)
144     delete brokers_[i];
145   for(size_t i=0;i<topics_.size();++i)
146     delete topics_[i];
147 
148 
149   if(metadata_)
150     rd_kafka_metadata_destroy(metadata_);
151 }
152