1 use std::io::{Read, Write};
2 use std::mem;
3 use std::time::Duration;
4
5 use codecs::{ToByte, FromByte};
6 use crc::crc32;
7 use error::{Error, ErrorKind, KafkaCode, Result};
8
9 /// Macro to return Result<()> from multiple statements
10 macro_rules! try_multi {
11 ($($expr:expr),*) => ({
12 $(try!($expr);)*;
13 Ok(())
14 })
15 }
16
17 pub mod produce;
18 pub mod offset;
19 pub mod metadata;
20 pub mod consumer;
21
22 mod zreader;
23 pub mod fetch;
24
25 // ~ convenient re-exports for request/response types defined in the
26 // submodules
27 pub use self::fetch::FetchRequest;
28 pub use self::produce::{ProduceRequest, ProduceResponse};
29 pub use self::offset::{OffsetRequest, OffsetResponse};
30 pub use self::metadata::{MetadataRequest, MetadataResponse};
31 pub use self::consumer::{GroupCoordinatorRequest, GroupCoordinatorResponse, OffsetFetchVersion,
32 OffsetFetchRequest, OffsetFetchResponse, OffsetCommitVersion,
33 OffsetCommitRequest, OffsetCommitResponse};
34
35 // --------------------------------------------------------------------
36
37 const API_KEY_PRODUCE: i16 = 0;
38 const API_KEY_FETCH: i16 = 1;
39 const API_KEY_OFFSET: i16 = 2;
40 const API_KEY_METADATA: i16 = 3;
41 // 4-7 reserved for non-public kafka api services
42 const API_KEY_OFFSET_COMMIT: i16 = 8;
43 const API_KEY_OFFSET_FETCH: i16 = 9;
44 const API_KEY_GROUP_COORDINATOR: i16 = 10;
45
46 // the default version of Kafka API we are requesting
47 const API_VERSION: i16 = 0;
48
49 // --------------------------------------------------------------------
50
51 /// Provides a way to parse the full raw response data into a
52 /// particular response structure.
53 pub trait ResponseParser {
54 type T;
parse(&self, response: Vec<u8>) -> Result<Self::T>55 fn parse(&self, response: Vec<u8>) -> Result<Self::T>;
56 }
57
58 // --------------------------------------------------------------------
59
60 impl KafkaCode {
from_protocol(n: i16) -> Option<KafkaCode>61 fn from_protocol(n: i16) -> Option<KafkaCode> {
62 if n == 0 {
63 return None;
64 }
65 if n >= KafkaCode::OffsetOutOfRange as i16 && n <= KafkaCode::UnsupportedVersion as i16 {
66 return Some(unsafe { mem::transmute(n as i8) });
67 }
68 Some(KafkaCode::Unknown)
69 }
70 }
71
72 #[test]
test_kafka_code_from_protocol()73 fn test_kafka_code_from_protocol() {
74 use std::i16;
75
76 macro_rules! assert_kafka_code {
77 ($kcode:path, $n:expr) => {
78 assert!(if let Some($kcode) = KafkaCode::from_protocol($n) { true } else { false })
79 }
80 };
81
82 assert!(if let None = KafkaCode::from_protocol(0) {
83 true
84 } else {
85 false
86 });
87 assert_kafka_code!(KafkaCode::OffsetOutOfRange, KafkaCode::OffsetOutOfRange as i16);
88 assert_kafka_code!(KafkaCode::IllegalGeneration, KafkaCode::IllegalGeneration as i16);
89 assert_kafka_code!(KafkaCode::UnsupportedVersion, KafkaCode::UnsupportedVersion as i16);
90 assert_kafka_code!(KafkaCode::Unknown, KafkaCode::Unknown as i16);
91 // ~ test some un mapped non-zero codes; should all map to "unknown"
92 assert_kafka_code!(KafkaCode::Unknown, i16::MAX);
93 assert_kafka_code!(KafkaCode::Unknown, i16::MIN);
94 assert_kafka_code!(KafkaCode::Unknown, -100);
95 assert_kafka_code!(KafkaCode::Unknown, 100);
96 }
97
98 // a (sub-) module private method for error
99 impl Error {
from_protocol(n: i16) -> Option<Error>100 fn from_protocol(n: i16) -> Option<Error> {
101 KafkaCode::from_protocol(n).map(|err| ErrorKind::Kafka(err).into())
102 }
103 }
104
105 // --------------------------------------------------------------------
106
107 #[derive(Debug)]
108 pub struct HeaderRequest<'a> {
109 pub api_key: i16,
110 pub api_version: i16,
111 pub correlation_id: i32,
112 pub client_id: &'a str,
113 }
114
115 impl<'a> HeaderRequest<'a> {
new( api_key: i16, api_version: i16, correlation_id: i32, client_id: &'a str, ) -> HeaderRequest116 fn new(
117 api_key: i16,
118 api_version: i16,
119 correlation_id: i32,
120 client_id: &'a str,
121 ) -> HeaderRequest {
122 HeaderRequest {
123 api_key: api_key,
124 api_version: api_version,
125 correlation_id: correlation_id,
126 client_id: client_id,
127 }
128 }
129 }
130
131 impl<'a> ToByte for HeaderRequest<'a> {
encode<W: Write>(&self, buffer: &mut W) -> Result<()>132 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
133 try_multi!(
134 self.api_key.encode(buffer),
135 self.api_version.encode(buffer),
136 self.correlation_id.encode(buffer),
137 self.client_id.encode(buffer)
138 )
139 }
140 }
141
142 // --------------------------------------------------------------------
143
144 #[derive(Default, Debug, Clone)]
145 pub struct HeaderResponse {
146 pub correlation: i32,
147 }
148
149 impl FromByte for HeaderResponse {
150 type R = HeaderResponse;
151
152 #[allow(unused_must_use)]
decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>153 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
154 self.correlation.decode(buffer)
155 }
156 }
157
158 // --------------------------------------------------------------------
159
to_crc(data: &[u8]) -> u32160 pub fn to_crc(data: &[u8]) -> u32 {
161 crc32::checksum_ieee(data)
162 }
163
164 // --------------------------------------------------------------------
165
166 /// Safely converts a Duration into the number of milliseconds as a
167 /// i32 as often required in the kafka protocol.
to_millis_i32(d: Duration) -> Result<i32>168 pub fn to_millis_i32(d: Duration) -> Result<i32> {
169 use std::i32;
170 let m = d.as_secs().saturating_mul(1_000).saturating_add(
171 (d.subsec_nanos() / 1_000_000) as
172 u64,
173 );
174 if m > i32::MAX as u64 {
175 bail!(ErrorKind::InvalidDuration)
176 } else {
177 Ok(m as i32)
178 }
179 }
180
181 #[test]
test_to_millis_i32()182 fn test_to_millis_i32() {
183 use std::{i32, u32, u64};
184
185 fn assert_invalid(d: Duration) {
186 match to_millis_i32(d) {
187 Err(Error(ErrorKind::InvalidDuration, _)) => {}
188 other => panic!("Expected Err(InvalidDuration) but got {:?}", other),
189 }
190 }
191 fn assert_valid(d: Duration, expected_millis: i32) {
192 let r = to_millis_i32(d);
193 match r {
194 Ok(m) => assert_eq!(expected_millis, m),
195 Err(e) => panic!("Expected Ok({}) but got Err({:?})", expected_millis, e),
196 }
197 }
198 assert_valid(Duration::from_millis(1_234), 1_234);
199 assert_valid(Duration::new(540, 123_456_789), 540_123);
200 assert_invalid(Duration::from_millis(u64::MAX));
201 assert_invalid(Duration::from_millis(u32::MAX as u64));
202 assert_invalid(Duration::from_millis(i32::MAX as u64 + 1));
203 assert_valid(Duration::from_millis(i32::MAX as u64 - 1), i32::MAX - 1);
204 }
205