1 use std::io::{Read, Write};
2 
3 use error::Result;
4 use codecs::{AsStrings, ToByte, FromByte};
5 
6 use super::{HeaderRequest, HeaderResponse};
7 use super::{API_KEY_METADATA, API_VERSION};
8 
9 #[derive(Debug)]
10 pub struct MetadataRequest<'a, T: 'a> {
11     pub header: HeaderRequest<'a>,
12     pub topics: &'a [T],
13 }
14 
15 impl<'a, T: AsRef<str>> MetadataRequest<'a, T> {
new(correlation_id: i32, client_id: &'a str, topics: &'a [T]) -> MetadataRequest<'a, T>16     pub fn new(correlation_id: i32, client_id: &'a str, topics: &'a [T]) -> MetadataRequest<'a, T> {
17         MetadataRequest {
18             header: HeaderRequest::new(API_KEY_METADATA, API_VERSION, correlation_id, client_id),
19             topics: topics,
20         }
21     }
22 }
23 
24 impl<'a, T: AsRef<str> + 'a> ToByte for MetadataRequest<'a, T> {
encode<W: Write>(&self, buffer: &mut W) -> Result<()>25     fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
26         try_multi!(self.header.encode(buffer), AsStrings(self.topics).encode(buffer))
27     }
28 }
29 
30 // --------------------------------------------------------------------
31 
32 #[derive(Default, Debug)]
33 pub struct MetadataResponse {
34     pub header: HeaderResponse,
35     pub brokers: Vec<BrokerMetadata>,
36     pub topics: Vec<TopicMetadata>,
37 }
38 
/// A single broker entry of a metadata response.
///
/// The `FromByte` implementation reads the fields in declaration order:
/// `node_id`, `host`, `port`.
///
/// The type consists solely of plain standard-library values, so it also
/// derives `Clone`, `PartialEq` and `Eq`; this lets callers copy an entry
/// out of a response and compare entries (for example in tests).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct BrokerMetadata {
    /// Identifier of the broker node.
    pub node_id: i32,
    /// Host of the broker, as sent in the response.
    pub host: String,
    /// Port of the broker.
    pub port: i32,
}
45 
46 #[derive(Default, Debug)]
47 pub struct TopicMetadata {
48     pub error: i16,
49     pub topic: String,
50     pub partitions: Vec<PartitionMetadata>,
51 }
52 
/// A single partition entry of a topic's metadata.
///
/// The `FromByte` implementation reads the fields in declaration order:
/// `error`, `id`, `leader`, `replicas`, `isr`.
///
/// The type consists solely of integers and integer vectors, so it also
/// derives `Clone`, `PartialEq` and `Eq`; this lets callers copy an entry
/// out of a response and compare entries (for example in tests).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PartitionMetadata {
    /// Raw error code reported for this partition.
    // NOTE(review): presumably 0 means "no error" — confirm against the protocol.
    pub error: i16,
    /// Partition identifier.
    pub id: i32,
    /// Leader of the partition.
    // NOTE(review): presumably a broker `node_id` — confirm.
    pub leader: i32,
    /// Replica list for the partition.
    // NOTE(review): presumably broker node ids — confirm.
    pub replicas: Vec<i32>,
    /// The `isr` list for the partition.
    // NOTE(review): presumably the in-sync replica set — confirm.
    pub isr: Vec<i32>,
}
61 
62 impl FromByte for MetadataResponse {
63     type R = MetadataResponse;
64 
65     #[allow(unused_must_use)]
decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>66     fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
67         try_multi!(
68             self.header.decode(buffer),
69             self.brokers.decode(buffer),
70             self.topics.decode(buffer)
71         )
72     }
73 }
74 
75 impl FromByte for BrokerMetadata {
76     type R = BrokerMetadata;
77 
78     #[allow(unused_must_use)]
decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>79     fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
80         try_multi!(self.node_id.decode(buffer), self.host.decode(buffer), self.port.decode(buffer))
81     }
82 }
83 
84 impl FromByte for TopicMetadata {
85     type R = TopicMetadata;
86 
87     #[allow(unused_must_use)]
decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>88     fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
89         try_multi!(
90             self.error.decode(buffer),
91             self.topic.decode(buffer),
92             self.partitions.decode(buffer)
93         )
94     }
95 }
96 
97 impl FromByte for PartitionMetadata {
98     type R = PartitionMetadata;
99 
100     #[allow(unused_must_use)]
decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>101     fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
102         try_multi!(
103             self.error.decode(buffer),
104             self.id.decode(buffer),
105             self.leader.decode(buffer),
106             self.replicas.decode(buffer),
107             self.isr.decode(buffer)
108         )
109     }
110 }
111