1 /*
2 * librdkafka - Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2014 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "rdkafkacpp_int.h"
30
31 using namespace RdKafka;
32
~BrokerMetadata()33 BrokerMetadata::~BrokerMetadata() {};
~PartitionMetadata()34 PartitionMetadata::~PartitionMetadata() {};
~TopicMetadata()35 TopicMetadata::~TopicMetadata() {};
~Metadata()36 Metadata::~Metadata() {};
37
38
39 /**
40 * Metadata: Broker information handler implementation
41 */
42 class BrokerMetadataImpl : public BrokerMetadata {
43 public:
BrokerMetadataImpl(const rd_kafka_metadata_broker_t * broker_metadata)44 BrokerMetadataImpl(const rd_kafka_metadata_broker_t *broker_metadata)
45 :broker_metadata_(broker_metadata),host_(broker_metadata->host) {}
46
id() const47 int32_t id() const{return broker_metadata_->id;}
48
host() const49 const std::string host() const {return host_;}
port() const50 int port() const {return broker_metadata_->port;}
51
~BrokerMetadataImpl()52 virtual ~BrokerMetadataImpl() {}
53
54 private:
55 const rd_kafka_metadata_broker_t *broker_metadata_;
56 const std::string host_;
57 };
58
59 /**
60 * Metadata: Partition information handler
61 */
62 class PartitionMetadataImpl : public PartitionMetadata {
63 public:
64 // @TODO too much memory copy? maybe we should create a new vector class that read directly from C arrays?
65 // @TODO use auto_ptr?
PartitionMetadataImpl(const rd_kafka_metadata_partition_t * partition_metadata)66 PartitionMetadataImpl(const rd_kafka_metadata_partition_t *partition_metadata)
67 :partition_metadata_(partition_metadata) {
68 replicas_.reserve(partition_metadata->replica_cnt);
69 for(int i=0;i<partition_metadata->replica_cnt;++i)
70 replicas_.push_back(partition_metadata->replicas[i]);
71
72 isrs_.reserve(partition_metadata->isr_cnt);
73 for(int i=0;i<partition_metadata->isr_cnt;++i)
74 isrs_.push_back(partition_metadata->isrs[i]);
75 }
76
id() const77 int32_t id() const {
78 return partition_metadata_->id;
79 }
leader() const80 int32_t leader() const {
81 return partition_metadata_->leader;
82 }
err() const83 ErrorCode err() const {
84 return static_cast<ErrorCode>(partition_metadata_->err);
85 }
86
replicas() const87 const std::vector<int32_t> *replicas() const {return &replicas_;}
isrs() const88 const std::vector<int32_t> *isrs() const {return &isrs_;}
89
~PartitionMetadataImpl()90 ~PartitionMetadataImpl() {};
91
92 private:
93 const rd_kafka_metadata_partition_t *partition_metadata_;
94 std::vector<int32_t> replicas_,isrs_;
95 };
96
97 /**
98 * Metadata: Topic information handler
99 */
100 class TopicMetadataImpl : public TopicMetadata{
101 public:
TopicMetadataImpl(const rd_kafka_metadata_topic_t * topic_metadata)102 TopicMetadataImpl(const rd_kafka_metadata_topic_t *topic_metadata)
103 :topic_metadata_(topic_metadata),topic_(topic_metadata->topic) {
104 partitions_.reserve(topic_metadata->partition_cnt);
105 for(int i=0;i<topic_metadata->partition_cnt;++i)
106 partitions_.push_back(
107 new PartitionMetadataImpl(&topic_metadata->partitions[i])
108 );
109 }
110
~TopicMetadataImpl()111 ~TopicMetadataImpl(){
112 for(size_t i=0;i<partitions_.size();++i)
113 delete partitions_[i];
114 }
115
topic() const116 const std::string topic() const {return topic_;}
partitions() const117 const std::vector<const PartitionMetadata *> *partitions() const {
118 return &partitions_;
119 }
err() const120 ErrorCode err() const {return static_cast<ErrorCode>(topic_metadata_->err);}
121
122 private:
123 const rd_kafka_metadata_topic_t *topic_metadata_;
124 const std::string topic_;
125 std::vector<const PartitionMetadata *> partitions_;
126
127 };
128
MetadataImpl(const rd_kafka_metadata_t * metadata)129 MetadataImpl::MetadataImpl(const rd_kafka_metadata_t *metadata)
130 :metadata_(metadata)
131 {
132 brokers_.reserve(metadata->broker_cnt);
133 for(int i=0;i<metadata->broker_cnt;++i)
134 brokers_.push_back(new BrokerMetadataImpl(&metadata->brokers[i]));
135
136 topics_.reserve(metadata->topic_cnt);
137 for(int i=0;i<metadata->topic_cnt;++i)
138 topics_.push_back(new TopicMetadataImpl(&metadata->topics[i]));
139
140 }
141
~MetadataImpl()142 MetadataImpl::~MetadataImpl() {
143 for(size_t i=0;i<brokers_.size();++i)
144 delete brokers_[i];
145 for(size_t i=0;i<topics_.size();++i)
146 delete topics_[i];
147
148
149 if(metadata_)
150 rd_kafka_metadata_destroy(metadata_);
151 }
152