1 use std::io::{Read, Write};
2 use std::mem;
3 use std::time::Duration;
4 
5 use codecs::{ToByte, FromByte};
6 use crc::crc32;
7 use error::{Error, ErrorKind, KafkaCode, Result};
8 
9 /// Macro to return Result<()> from multiple statements
10 macro_rules! try_multi {
11     ($($expr:expr),*) => ({
12         $(try!($expr);)*;
13         Ok(())
14     })
15 }
16 
17 pub mod produce;
18 pub mod offset;
19 pub mod metadata;
20 pub mod consumer;
21 
22 mod zreader;
23 pub mod fetch;
24 
25 // ~ convenient re-exports for request/response types defined in the
26 // submodules
27 pub use self::fetch::FetchRequest;
28 pub use self::produce::{ProduceRequest, ProduceResponse};
29 pub use self::offset::{OffsetRequest, OffsetResponse};
30 pub use self::metadata::{MetadataRequest, MetadataResponse};
31 pub use self::consumer::{GroupCoordinatorRequest, GroupCoordinatorResponse, OffsetFetchVersion,
32                          OffsetFetchRequest, OffsetFetchResponse, OffsetCommitVersion,
33                          OffsetCommitRequest, OffsetCommitResponse};
34 
35 // --------------------------------------------------------------------
36 
37 const API_KEY_PRODUCE: i16 = 0;
38 const API_KEY_FETCH: i16 = 1;
39 const API_KEY_OFFSET: i16 = 2;
40 const API_KEY_METADATA: i16 = 3;
41 // 4-7 reserved for non-public kafka api services
42 const API_KEY_OFFSET_COMMIT: i16 = 8;
43 const API_KEY_OFFSET_FETCH: i16 = 9;
44 const API_KEY_GROUP_COORDINATOR: i16 = 10;
45 
46 // the default version of Kafka API we are requesting
47 const API_VERSION: i16 = 0;
48 
49 // --------------------------------------------------------------------
50 
51 /// Provides a way to parse the full raw response data into a
52 /// particular response structure.
53 pub trait ResponseParser {
54     type T;
parse(&self, response: Vec<u8>) -> Result<Self::T>55     fn parse(&self, response: Vec<u8>) -> Result<Self::T>;
56 }
57 
58 // --------------------------------------------------------------------
59 
60 impl KafkaCode {
from_protocol(n: i16) -> Option<KafkaCode>61     fn from_protocol(n: i16) -> Option<KafkaCode> {
62         if n == 0 {
63             return None;
64         }
65         if n >= KafkaCode::OffsetOutOfRange as i16 && n <= KafkaCode::UnsupportedVersion as i16 {
66             return Some(unsafe { mem::transmute(n as i8) });
67         }
68         Some(KafkaCode::Unknown)
69     }
70 }
71 
72 #[test]
test_kafka_code_from_protocol()73 fn test_kafka_code_from_protocol() {
74     use std::i16;
75 
76     macro_rules! assert_kafka_code {
77         ($kcode:path, $n:expr) => {
78             assert!(if let Some($kcode) = KafkaCode::from_protocol($n) { true } else { false })
79         }
80     };
81 
82     assert!(if let None = KafkaCode::from_protocol(0) {
83         true
84     } else {
85         false
86     });
87     assert_kafka_code!(KafkaCode::OffsetOutOfRange, KafkaCode::OffsetOutOfRange as i16);
88     assert_kafka_code!(KafkaCode::IllegalGeneration, KafkaCode::IllegalGeneration as i16);
89     assert_kafka_code!(KafkaCode::UnsupportedVersion, KafkaCode::UnsupportedVersion as i16);
90     assert_kafka_code!(KafkaCode::Unknown, KafkaCode::Unknown as i16);
91     // ~ test some un mapped non-zero codes; should all map to "unknown"
92     assert_kafka_code!(KafkaCode::Unknown, i16::MAX);
93     assert_kafka_code!(KafkaCode::Unknown, i16::MIN);
94     assert_kafka_code!(KafkaCode::Unknown, -100);
95     assert_kafka_code!(KafkaCode::Unknown, 100);
96 }
97 
98 // a (sub-) module private method for error
99 impl Error {
from_protocol(n: i16) -> Option<Error>100     fn from_protocol(n: i16) -> Option<Error> {
101         KafkaCode::from_protocol(n).map(|err| ErrorKind::Kafka(err).into())
102     }
103 }
104 
105 // --------------------------------------------------------------------
106 
107 #[derive(Debug)]
108 pub struct HeaderRequest<'a> {
109     pub api_key: i16,
110     pub api_version: i16,
111     pub correlation_id: i32,
112     pub client_id: &'a str,
113 }
114 
115 impl<'a> HeaderRequest<'a> {
new( api_key: i16, api_version: i16, correlation_id: i32, client_id: &'a str, ) -> HeaderRequest116     fn new(
117         api_key: i16,
118         api_version: i16,
119         correlation_id: i32,
120         client_id: &'a str,
121     ) -> HeaderRequest {
122         HeaderRequest {
123             api_key: api_key,
124             api_version: api_version,
125             correlation_id: correlation_id,
126             client_id: client_id,
127         }
128     }
129 }
130 
131 impl<'a> ToByte for HeaderRequest<'a> {
encode<W: Write>(&self, buffer: &mut W) -> Result<()>132     fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
133         try_multi!(
134             self.api_key.encode(buffer),
135             self.api_version.encode(buffer),
136             self.correlation_id.encode(buffer),
137             self.client_id.encode(buffer)
138         )
139     }
140 }
141 
142 // --------------------------------------------------------------------
143 
144 #[derive(Default, Debug, Clone)]
145 pub struct HeaderResponse {
146     pub correlation: i32,
147 }
148 
149 impl FromByte for HeaderResponse {
150     type R = HeaderResponse;
151 
152     #[allow(unused_must_use)]
decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>153     fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
154         self.correlation.decode(buffer)
155     }
156 }
157 
158 // --------------------------------------------------------------------
159 
to_crc(data: &[u8]) -> u32160 pub fn to_crc(data: &[u8]) -> u32 {
161     crc32::checksum_ieee(data)
162 }
163 
164 // --------------------------------------------------------------------
165 
166 /// Safely converts a Duration into the number of milliseconds as a
167 /// i32 as often required in the kafka protocol.
to_millis_i32(d: Duration) -> Result<i32>168 pub fn to_millis_i32(d: Duration) -> Result<i32> {
169     use std::i32;
170     let m = d.as_secs().saturating_mul(1_000).saturating_add(
171         (d.subsec_nanos() / 1_000_000) as
172             u64,
173     );
174     if m > i32::MAX as u64 {
175         bail!(ErrorKind::InvalidDuration)
176     } else {
177         Ok(m as i32)
178     }
179 }
180 
181 #[test]
test_to_millis_i32()182 fn test_to_millis_i32() {
183     use std::{i32, u32, u64};
184 
185     fn assert_invalid(d: Duration) {
186         match to_millis_i32(d) {
187             Err(Error(ErrorKind::InvalidDuration, _)) => {}
188             other => panic!("Expected Err(InvalidDuration) but got {:?}", other),
189         }
190     }
191     fn assert_valid(d: Duration, expected_millis: i32) {
192         let r = to_millis_i32(d);
193         match r {
194             Ok(m) => assert_eq!(expected_millis, m),
195             Err(e) => panic!("Expected Ok({}) but got Err({:?})", expected_millis, e),
196         }
197     }
198     assert_valid(Duration::from_millis(1_234), 1_234);
199     assert_valid(Duration::new(540, 123_456_789), 540_123);
200     assert_invalid(Duration::from_millis(u64::MAX));
201     assert_invalid(Duration::from_millis(u32::MAX as u64));
202     assert_invalid(Duration::from_millis(i32::MAX as u64 + 1));
203     assert_valid(Duration::from_millis(i32::MAX as u64 - 1), i32::MAX - 1);
204 }
205