use std::collections::HashMap;
use std::collections::VecDeque;
use std::fmt;
use std::hash::BuildHasherDefault;

use fnv::FnvHasher;

use client::{KafkaClient, FetchGroupOffset, FetchOffset};
use client::metadata::Topics;
use error::{ErrorKind, Result, KafkaCode};

use super::assignment::{Assignment, AssignmentRef, Assignments};
use super::config::Config;

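// ~ FNV is a small, fast, non-cryptographic hash; the keys hashed here
// (assignment references and partition ids) are tiny, so it is typically
// faster than the default SipHash-based hasher.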
pub type PartitionHasher = BuildHasherDefault<FnvHasher>;

// The "fetch state" for a particular topic partition.
#[derive(Debug)]
pub struct FetchState {
    /// ~ specifies the offset from which to fetch
    pub offset: i64,
    /// ~ specifies the max_bytes to be fetched
    pub max_bytes: i32,
}

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct TopicPartition {
    /// ~ indirect reference to the topic through config.topic(..)
    pub topic_ref: AssignmentRef,
    /// ~ the partition to retry
    pub partition: i32,
}

#[derive(Debug)]
pub struct ConsumedOffset {
    /// ~ the consumed offset
    pub offset: i64,
    /// ~ true if the consumed offset is changed but not committed to
    /// kafka yet
    pub dirty: bool,
}

pub struct State {
    /// Contains the topic partitions the consumer is assigned to
    /// consume; this is a _read-only_ data structure
    pub assignments: Assignments,

    /// Contains the information relevant for the next fetch operation
    /// on the corresponding partitions
    pub fetch_offsets: HashMap<TopicPartition, FetchState, PartitionHasher>,

    /// Specifies partitions to be fetched on their own in the next
    /// poll request.
    pub retry_partitions: VecDeque<TopicPartition>,

    /// Contains the offsets of messages marked as "consumed" (to be
    /// committed)
    pub consumed_offsets: HashMap<TopicPartition, ConsumedOffset, PartitionHasher>,
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "State {{ assignments: {:?}, fetch_offsets: {:?}, retry_partitions: {:?}, \
             consumed_offsets: {:?} }}",
            self.assignments,
            self.fetch_offsets_debug(),
            TopicPartitionsDebug {
                state: self,
                tps: &self.retry_partitions,
            },
            self.consumed_offsets_debug()
        )
    }
}

impl State {
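    /// ~ builds up the initial consumer state: determines the actual
    /// partitions to consume for every assignment, loads the group's
    /// already committed offsets (if a group is configured), and
    /// derives the corresponding "next fetch" states from them.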
    pub fn new(
        client: &mut KafkaClient,
        config: &Config,
        assignments: Assignments,
    ) -> Result<State> {
        let (consumed_offsets, fetch_offsets) = {
            let subscriptions = {
                let xs = assignments.as_slice();
                let mut subs = Vec::with_capacity(xs.len());
                for x in xs {
                    subs.push(try!(determine_partitions(x, client.topics())));
                }
                subs
            };
            let n = subscriptions.iter().map(|s| s.partitions.len()).fold(
                0,
                |acc, n| {
                    acc + n
                },
            );
            let consumed =
                try!(load_consumed_offsets(client, &config.group, &assignments, &subscriptions, n));

            let fetch_next =
                try!(load_fetch_states(client, config, &assignments, &subscriptions, &consumed, n));
            (consumed, fetch_next)
        };
        Ok(State {
            assignments: assignments,
            fetch_offsets: fetch_offsets,
            retry_partitions: VecDeque::new(),
            consumed_offsets: consumed_offsets,
        })
    }

    pub fn topic_name(&self, assignment: AssignmentRef) -> &str {
        self.assignments[assignment].topic()
    }

    pub fn topic_ref(&self, name: &str) -> Option<AssignmentRef> {
        self.assignments.topic_ref(name)
    }

    /// Returns a wrapper around `self.fetch_offsets` for nice dumping
    /// in debug messages
    pub fn fetch_offsets_debug<'a>(&'a self) -> OffsetsMapDebug<'a, FetchState> {
        OffsetsMapDebug {
            state: self,
            offsets: &self.fetch_offsets,
        }
    }

    pub fn consumed_offsets_debug<'a>(&'a self) -> OffsetsMapDebug<'a, ConsumedOffset> {
        OffsetsMapDebug {
            state: self,
            offsets: &self.consumed_offsets,
        }
    }
}

// Specifies the actual partitions of a topic to be consumed
struct Subscription<'a> {
    assignment: &'a Assignment, // the assignment - user configuration
    partitions: Vec<i32>, // the actual partitions to be consumed
}

/// Determines the partitions to be consumed according to the
/// specified topic and requested partitions configuration. Returns an
/// ordered list of the partition ids to consume.
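///
/// For example (illustrative): an assignment with no explicitly
/// requested partitions consumes all partitions available for the
/// topic, while requesting a partition unknown to the cluster fails
/// with `UnknownTopicOrPartition`.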
fn determine_partitions<'a>(
    assignment: &'a Assignment,
    metadata: Topics,
) -> Result<Subscription<'a>> {
    let topic = assignment.topic();
    let req_partitions = assignment.partitions();
    let avail_partitions = match metadata.partitions(topic) {
        // ~ fail if the underlying topic is unknown to the given client
        None => {
            debug!("determine_partitions: no such topic: {} (all metadata: {:?})", topic, metadata);
            bail!(ErrorKind::Kafka(KafkaCode::UnknownTopicOrPartition));
        }
        Some(tp) => tp,
    };
    let ps = if req_partitions.is_empty() {
        // ~ no partitions configured ... use all available
        let mut ps: Vec<i32> = Vec::with_capacity(avail_partitions.len());
        for p in avail_partitions {
            ps.push(p.id());
        }
        ps
    } else {
        // ~ validate that all partitions we're going to consume are
        // available
        let mut ps: Vec<i32> = Vec::with_capacity(req_partitions.len());
        for &p in req_partitions {
            match avail_partitions.partition(p) {
                None => {
                    debug!(
                        "determine_partitions: no such partition: \"{}:{}\" \
                         (all metadata: {:?})",
                        topic,
                        p,
                        metadata
                    );
                    bail!(ErrorKind::Kafka(KafkaCode::UnknownTopicOrPartition));
                }
                Some(_) => ps.push(p),
            };
        }
        ps
    };
    Ok(Subscription {
        assignment: assignment,
        partitions: ps,
    })
}

// Fetches the so-far committed/consumed offsets for the configured
// group/topic/partitions.
fn load_consumed_offsets(
    client: &mut KafkaClient,
    group: &str,
    assignments: &Assignments,
    subscriptions: &[Subscription],
    result_capacity: usize,
) -> Result<HashMap<TopicPartition, ConsumedOffset, PartitionHasher>> {
    assert!(!subscriptions.is_empty());
    // ~ pre-allocate the right size
    let mut offs = HashMap::with_capacity_and_hasher(result_capacity, PartitionHasher::default());
    // ~ no group, no persisted consumed offsets
    if group.is_empty() {
        return Ok(offs);
    }
    // ~ otherwise try to load them for the group
    let tpos = try!(client.fetch_group_offsets(
        group,
        subscriptions.iter().flat_map(|s| {
            let topic = s.assignment.topic();
            s.partitions.iter().map(
                move |&p| FetchGroupOffset::new(topic, p),
            )
        }),
    ));
    for (topic, pos) in tpos {
        for po in pos {
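            // ~ an offset of -1 indicates there is no committed offset
            // stored for this group/partition, so such entries are skipped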
            if po.offset != -1 {
                offs.insert(
                    TopicPartition {
                        topic_ref: assignments.topic_ref(&topic).expect("non-assigned topic"),
                        partition: po.partition,
                    },
                    // the committed offset is the next message to be fetched, so
                    // the last consumed message is that - 1
                    ConsumedOffset {
                        offset: po.offset - 1,
                        dirty: false,
                    },
                );
            }
        }
    }

    debug!("load_consumed_offsets: constructed consumed: {:#?}", offs);

    Ok(offs)
}

/// Fetches the "next fetch" offsets/states based on the specified
/// subscriptions and the given consumed offsets.
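///
/// ~ roughly: without any previously consumed offsets the configured
/// fallback offset (earliest/latest) is resolved directly; otherwise a
/// consumed offset is re-used only if it still lies within the
/// partition's current earliest/latest range, falling back otherwise.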
fn load_fetch_states(
    client: &mut KafkaClient,
    config: &Config,
    assignments: &Assignments,
    subscriptions: &[Subscription],
    consumed_offsets: &HashMap<TopicPartition, ConsumedOffset, PartitionHasher>,
    result_capacity: usize,
) -> Result<HashMap<TopicPartition, FetchState, PartitionHasher>> {
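    // ~ helper resolving the requested logical offset (e.g. earliest or
    // latest) for all partitions of the given topics into a
    // topic -> partition -> offset map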
    fn load_partition_offsets(
        client: &mut KafkaClient,
        topics: &[&str],
        offset: FetchOffset,
    ) -> Result<HashMap<String, HashMap<i32, i64, PartitionHasher>>> {
        let toffs = try!(client.fetch_offsets(topics, offset));
        let mut m = HashMap::with_capacity(toffs.len());
        for (topic, poffs) in toffs {
            let mut pidx =
                HashMap::with_capacity_and_hasher(poffs.len(), PartitionHasher::default());

            for poff in poffs {
                pidx.insert(poff.partition, poff.offset);
            }

            m.insert(topic, pidx);
        }
        Ok(m)
    }

    let mut fetch_offsets =
        HashMap::with_capacity_and_hasher(result_capacity, PartitionHasher::default());
    let max_bytes = client.fetch_max_bytes_per_partition();
    let subscription_topics: Vec<_> = subscriptions.iter().map(|s| s.assignment.topic()).collect();
    if consumed_offsets.is_empty() {
        // ~ if there are no consumed offsets on behalf of the consumer
        // group (if one is configured at all) we can directly use the
        // fallback offsets
        let offsets =
            try!(load_partition_offsets(client, &subscription_topics, config.fallback_offset));
        for s in subscriptions {
            let topic_ref = assignments.topic_ref(s.assignment.topic()).expect(
                "unassigned subscription",
            );
            match offsets.get(s.assignment.topic()) {
                None => {
                    debug!(
                        "load_fetch_states: failed to load fallback offsets for: {}",
                        s.assignment.topic()
                    );
                    bail!(ErrorKind::Kafka(KafkaCode::UnknownTopicOrPartition));
                }
                Some(offsets) => {
                    for p in &s.partitions {
                        fetch_offsets.insert(
                            TopicPartition {
                                topic_ref: topic_ref,
                                partition: *p,
                            },
                            FetchState {
                                offset: *offsets.get(p).unwrap_or(&-1),
                                max_bytes: max_bytes,
                            },
                        );
                    }
                }
            }
        }
    } else {
        // fetch the earliest and latest available offsets
        let latest =
            try!(load_partition_offsets(client, &subscription_topics, FetchOffset::Latest));
        let earliest =
            try!(load_partition_offsets(client, &subscription_topics, FetchOffset::Earliest));
        // ~ for each subscribed partition: if we have a consumed offset,
        // verify it lies within the earliest/latest range and use
        // consumed_offset+1 as the next fetch offset.
        for s in subscriptions {
            let topic_ref = assignments.topic_ref(s.assignment.topic()).expect(
                "unassigned subscription",
            );
            for p in &s.partitions {
                let l_off = *latest
                    .get(s.assignment.topic())
                    .and_then(|ps| ps.get(p))
                    .unwrap_or(&-1);
                let e_off = *earliest
                    .get(s.assignment.topic())
                    .and_then(|ps| ps.get(p))
                    .unwrap_or(&-1);

                let tp = TopicPartition {
                    topic_ref: topic_ref,
                    partition: *p,
                };

                // the "latest" offset is the offset of the "next coming message"
                let offset = match consumed_offsets.get(&tp) {
                    Some(co) if co.offset >= e_off && co.offset < l_off => co.offset + 1,
                    _ => {
                        match config.fallback_offset {
                            FetchOffset::Latest => l_off,
                            FetchOffset::Earliest => e_off,
                            _ => {
                                debug!(
                                    "cannot determine fetch offset \
                                     (group: {} / topic: {} / partition: {})",
                                    &config.group,
                                    s.assignment.topic(),
                                    p
                                );
                                bail!(ErrorKind::Kafka(KafkaCode::Unknown));
                            }
                        }
                    }
                };
                fetch_offsets.insert(
                    tp,
                    FetchState {
                        offset: offset,
                        max_bytes: max_bytes,
                    },
                );
            }
        }
    }
    Ok(fetch_offsets)
}

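// ~ renders an offsets map as `{"topic:partition": value, ..}`, resolving
// the human readable topic names through the given `State`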
pub struct OffsetsMapDebug<'a, T: 'a> {
    state: &'a State,
    offsets: &'a HashMap<TopicPartition, T, PartitionHasher>,
}

impl<'a, T: fmt::Debug + 'a> fmt::Debug for OffsetsMapDebug<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        try!(write!(f, "{{"));
        for (i, (tp, v)) in self.offsets.iter().enumerate() {
            if i != 0 {
                try!(write!(f, ", "));
            }
            let topic = self.state.topic_name(tp.topic_ref);
            try!(write!(f, "\"{}:{}\": {:?}", topic, tp.partition, v));
        }
        write!(f, "}}")
    }
}

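// ~ renders a list of topic partitions as `["topic:partition", ..]`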
struct TopicPartitionsDebug<'a> {
    state: &'a State,
    tps: &'a VecDeque<TopicPartition>,
}

impl<'a> fmt::Debug for TopicPartitionsDebug<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        try!(write!(f, "["));
        for (i, tp) in self.tps.iter().enumerate() {
            if i != 0 {
                try!(write!(f, ", "));
            }
            try!(write!(f, "\"{}:{}\"", self.state.topic_name(tp.topic_ref), tp.partition));
        }
        write!(f, "]")
    }
}