1 use std::io::{Read, Write}; 2 3 use error::Result; 4 use codecs::{AsStrings, ToByte, FromByte}; 5 6 use super::{HeaderRequest, HeaderResponse}; 7 use super::{API_KEY_METADATA, API_VERSION}; 8 9 #[derive(Debug)] 10 pub struct MetadataRequest<'a, T: 'a> { 11 pub header: HeaderRequest<'a>, 12 pub topics: &'a [T], 13 } 14 15 impl<'a, T: AsRef<str>> MetadataRequest<'a, T> { new(correlation_id: i32, client_id: &'a str, topics: &'a [T]) -> MetadataRequest<'a, T>16 pub fn new(correlation_id: i32, client_id: &'a str, topics: &'a [T]) -> MetadataRequest<'a, T> { 17 MetadataRequest { 18 header: HeaderRequest::new(API_KEY_METADATA, API_VERSION, correlation_id, client_id), 19 topics: topics, 20 } 21 } 22 } 23 24 impl<'a, T: AsRef<str> + 'a> ToByte for MetadataRequest<'a, T> { encode<W: Write>(&self, buffer: &mut W) -> Result<()>25 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> { 26 try_multi!(self.header.encode(buffer), AsStrings(self.topics).encode(buffer)) 27 } 28 } 29 30 // -------------------------------------------------------------------- 31 32 #[derive(Default, Debug)] 33 pub struct MetadataResponse { 34 pub header: HeaderResponse, 35 pub brokers: Vec<BrokerMetadata>, 36 pub topics: Vec<TopicMetadata>, 37 } 38 39 #[derive(Default, Debug)] 40 pub struct BrokerMetadata { 41 pub node_id: i32, 42 pub host: String, 43 pub port: i32, 44 } 45 46 #[derive(Default, Debug)] 47 pub struct TopicMetadata { 48 pub error: i16, 49 pub topic: String, 50 pub partitions: Vec<PartitionMetadata>, 51 } 52 53 #[derive(Default, Debug)] 54 pub struct PartitionMetadata { 55 pub error: i16, 56 pub id: i32, 57 pub leader: i32, 58 pub replicas: Vec<i32>, 59 pub isr: Vec<i32>, 60 } 61 62 impl FromByte for MetadataResponse { 63 type R = MetadataResponse; 64 65 #[allow(unused_must_use)] decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>66 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> { 67 try_multi!( 68 self.header.decode(buffer), 69 self.brokers.decode(buffer), 70 self.topics.decode(buffer) 71 ) 72 } 73 } 74 75 impl FromByte for BrokerMetadata { 76 type R = BrokerMetadata; 77 78 #[allow(unused_must_use)] decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>79 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> { 80 try_multi!(self.node_id.decode(buffer), self.host.decode(buffer), self.port.decode(buffer)) 81 } 82 } 83 84 impl FromByte for TopicMetadata { 85 type R = TopicMetadata; 86 87 #[allow(unused_must_use)] decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>88 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> { 89 try_multi!( 90 self.error.decode(buffer), 91 self.topic.decode(buffer), 92 self.partitions.decode(buffer) 93 ) 94 } 95 } 96 97 impl FromByte for PartitionMetadata { 98 type R = PartitionMetadata; 99 100 #[allow(unused_must_use)] decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>101 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> { 102 try_multi!( 103 self.error.decode(buffer), 104 self.id.decode(buffer), 105 self.leader.decode(buffer), 106 self.replicas.decode(buffer), 107 self.isr.decode(buffer) 108 ) 109 } 110 } 111