use std::collections::HashMap;
use std::collections::VecDeque;
use std::fmt;
use std::hash::BuildHasherDefault;

use fnv::FnvHasher;

use client::{KafkaClient, FetchGroupOffset, FetchOffset};
use client::metadata::Topics;
use error::{ErrorKind, Result, KafkaCode};

use super::assignment::{Assignment, AssignmentRef, Assignments};
use super::config::Config;

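/// Hasher used for the partition-keyed maps in this module. FNV is a small,
/// fast, non-cryptographic hash which is typically cheaper than the default
/// SipHash-based hasher for small keys such as `TopicPartition`s and
/// partition ids.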
pub type PartitionHasher = BuildHasherDefault<FnvHasher>;

/// The "fetch state" for a particular topic partition.
#[derive(Debug)]
pub struct FetchState {
    /// ~ specifies the offset to fetch from
    pub offset: i64,
    /// ~ specifies the max_bytes to be fetched
    pub max_bytes: i32,
}

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct TopicPartition {
    /// ~ indirect reference to the topic through config.topic(..)
    pub topic_ref: AssignmentRef,
    /// ~ the partition to retry
    pub partition: i32,
}

#[derive(Debug)]
pub struct ConsumedOffset {
    /// ~ the consumed offset
    pub offset: i64,
    /// ~ true if the consumed offset is changed but not committed to
    /// kafka yet
    pub dirty: bool,
}

pub struct State {
    /// Contains the topic partitions the consumer is assigned to
    /// consume; this is a _read-only_ data structure
    pub assignments: Assignments,

    /// Contains the information relevant for the next fetch operation
    /// on the corresponding partitions
    pub fetch_offsets: HashMap<TopicPartition, FetchState, PartitionHasher>,

    /// Specifies partitions to be fetched on their own in the next
    /// poll request.
    pub retry_partitions: VecDeque<TopicPartition>,

    /// Contains the offsets of messages marked as "consumed" (to be
    /// committed)
    pub consumed_offsets: HashMap<TopicPartition, ConsumedOffset, PartitionHasher>,
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "State {{ assignments: {:?}, fetch_offsets: {:?}, retry_partitions: {:?}, \
                consumed_offsets: {:?} }}",
            self.assignments,
            self.fetch_offsets_debug(),
            TopicPartitionsDebug {
                state: self,
                tps: &self.retry_partitions,
            },
            self.consumed_offsets_debug()
        )
    }
}

impl State {
    pub fn new(
        client: &mut KafkaClient,
        config: &Config,
        assignments: Assignments,
    ) -> Result<State> {
        let (consumed_offsets, fetch_offsets) = {
            let subscriptions = {
                let xs = assignments.as_slice();
                let mut subs = Vec::with_capacity(xs.len());
                for x in xs {
                    subs.push(try!(determine_partitions(x, client.topics())));
                }
                subs
            };
            let n = subscriptions.iter().map(|s| s.partitions.len()).fold(
                0,
                |acc, n| {
                    acc + n
                },
            );
            let consumed =
                try!(load_consumed_offsets(client, &config.group, &assignments, &subscriptions, n));

            let fetch_next =
                try!(load_fetch_states(client, config, &assignments, &subscriptions, &consumed, n));
            (consumed, fetch_next)
        };
        Ok(State {
            assignments: assignments,
            fetch_offsets: fetch_offsets,
            retry_partitions: VecDeque::new(),
            consumed_offsets: consumed_offsets,
        })
    }

    pub fn topic_name(&self, assignment: AssignmentRef) -> &str {
        self.assignments[assignment].topic()
    }

    pub fn topic_ref(&self, name: &str) -> Option<AssignmentRef> {
        self.assignments.topic_ref(name)
    }

    /// Returns a wrapper around `self.fetch_offsets` for nice dumping
    /// in debug messages
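    /// (for example, `debug!("fetch offsets: {:?}", state.fetch_offsets_debug())`).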
    pub fn fetch_offsets_debug<'a>(&'a self) -> OffsetsMapDebug<'a, FetchState> {
        OffsetsMapDebug {
            state: self,
            offsets: &self.fetch_offsets,
        }
    }

    pub fn consumed_offsets_debug<'a>(&'a self) -> OffsetsMapDebug<'a, ConsumedOffset> {
        OffsetsMapDebug {
            state: self,
            offsets: &self.consumed_offsets,
        }
    }
}

// Specifies the actual partitions of a topic to be consumed
struct Subscription<'a> {
    assignment: &'a Assignment, // the assignment - user configuration
    partitions: Vec<i32>, // the actual partitions to be consumed
}

/// Determines the partitions to be consumed according to the
/// specified topic and requested partitions configuration. Returns an
/// ordered list of the partition ids to consume.
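/// For example, if the assignment requests no explicit partitions and the
/// topic has partitions `[0, 1, 2]`, all three are consumed; if it requests
/// partition `5` but the topic does not have it, this fails with
/// `UnknownTopicOrPartition`.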
fn determine_partitions<'a>(
    assignment: &'a Assignment,
    metadata: Topics,
) -> Result<Subscription<'a>> {
    let topic = assignment.topic();
    let req_partitions = assignment.partitions();
    let avail_partitions = match metadata.partitions(topic) {
        // ~ fail if the underlying topic is unknown to the given client
        None => {
            debug!("determine_partitions: no such topic: {} (all metadata: {:?})", topic, metadata);
            bail!(ErrorKind::Kafka(KafkaCode::UnknownTopicOrPartition));
        }
        Some(tp) => tp,
    };
    let ps = if req_partitions.is_empty() {
        // ~ no partitions configured ... use all available
        let mut ps: Vec<i32> = Vec::with_capacity(avail_partitions.len());
        for p in avail_partitions {
            ps.push(p.id());
        }
        ps
    } else {
        // ~ validate that all partitions we're going to consume are
        // available
        let mut ps: Vec<i32> = Vec::with_capacity(req_partitions.len());
        for &p in req_partitions {
            match avail_partitions.partition(p) {
                None => {
                    debug!(
                        "determine_partitions: no such partition: \"{}:{}\" \
                            (all metadata: {:?})",
                        topic,
                        p,
                        metadata
                    );
                    bail!(ErrorKind::Kafka(KafkaCode::UnknownTopicOrPartition));
                }
                Some(_) => ps.push(p),
            };
        }
        ps
    };
    Ok(Subscription {
        assignment: assignment,
        partitions: ps,
    })
}

// Fetches the so-far committed/consumed offsets for the configured
// group/topic/partitions.
fn load_consumed_offsets(
    client: &mut KafkaClient,
    group: &str,
    assignments: &Assignments,
    subscriptions: &[Subscription],
    result_capacity: usize,
) -> Result<HashMap<TopicPartition, ConsumedOffset, PartitionHasher>> {
    assert!(!subscriptions.is_empty());
    // ~ pre-allocate the right size
    let mut offs = HashMap::with_capacity_and_hasher(result_capacity, PartitionHasher::default());
    // ~ no group, no persisted consumed offsets
    if group.is_empty() {
        return Ok(offs);
    }
    // ~ otherwise try to load them for the group
    let tpos = try!(client.fetch_group_offsets(
        group,
        subscriptions.iter().flat_map(|s| {
            let topic = s.assignment.topic();
            s.partitions.iter().map(
                move |&p| FetchGroupOffset::new(topic, p),
            )
        }),
    ));
    for (topic, pos) in tpos {
        for po in pos {
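            // ~ an offset of -1 indicates that no offset has been
            // committed for this group/partition yet; skip such entries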
            if po.offset != -1 {
                offs.insert(
                    TopicPartition {
                        topic_ref: assignments.topic_ref(&topic).expect("non-assigned topic"),
                        partition: po.partition,
                    },

                    // the committed offset is the next message to be fetched, so
                    // the last consumed message is that - 1
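                    // (e.g. a committed offset of 10 means messages up to
                    // and including offset 9 have been consumed, so 9 is
                    // stored here)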
                    ConsumedOffset {
                        offset: po.offset - 1,
                        dirty: false,
                    },
                );
            }
        }
    }

    debug!("load_consumed_offsets: constructed consumed: {:#?}", offs);

    Ok(offs)
}

/// Fetches the "next fetch" offsets/states based on the specified
/// subscriptions and the given consumed offsets.
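/// If no consumed offsets are known yet, the configured fallback offsets are
/// used directly; otherwise each consumed offset is validated against the
/// partition's earliest/latest offsets before fetching resumes right after it.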
fn load_fetch_states(
    client: &mut KafkaClient,
    config: &Config,
    assignments: &Assignments,
    subscriptions: &[Subscription],
    consumed_offsets: &HashMap<TopicPartition, ConsumedOffset, PartitionHasher>,
    result_capacity: usize,
) -> Result<HashMap<TopicPartition, FetchState, PartitionHasher>> {
    fn load_partition_offsets(
        client: &mut KafkaClient,
        topics: &[&str],
        offset: FetchOffset,
    ) -> Result<HashMap<String, HashMap<i32, i64, PartitionHasher>>> {
        let toffs = try!(client.fetch_offsets(topics, offset));
        let mut m = HashMap::with_capacity(toffs.len());
        for (topic, poffs) in toffs {
            let mut pidx =
                HashMap::with_capacity_and_hasher(poffs.len(), PartitionHasher::default());

            for poff in poffs {
                pidx.insert(poff.partition, poff.offset);
            }

            m.insert(topic, pidx);
        }
        Ok(m)
    }

    let mut fetch_offsets =
        HashMap::with_capacity_and_hasher(result_capacity, PartitionHasher::default());
    let max_bytes = client.fetch_max_bytes_per_partition();
    let subscription_topics: Vec<_> = subscriptions.iter().map(|s| s.assignment.topic()).collect();
    if consumed_offsets.is_empty() {
        // ~ if there are no offsets stored on behalf of the consumer
        // group (if any), we can directly use the fallback offsets.
        let offsets =
            try!(load_partition_offsets(client, &subscription_topics, config.fallback_offset));
        for s in subscriptions {
            let topic_ref = assignments.topic_ref(s.assignment.topic()).expect(
                "unassigned subscription",
            );
            match offsets.get(s.assignment.topic()) {
                None => {
                    debug!(
                        "load_fetch_states: failed to load fallback offsets for: {}",
                        s.assignment.topic()
                    );
                    bail!(ErrorKind::Kafka(KafkaCode::UnknownTopicOrPartition));
                }
                Some(offsets) => {
                    for p in &s.partitions {
                        fetch_offsets.insert(
                            TopicPartition {
                                topic_ref: topic_ref,
                                partition: *p,
                            },
                            FetchState {
                                offset: *offsets.get(p).unwrap_or(&-1),
                                max_bytes: max_bytes,
                            },
                        );
                    }
                }
            }
        }
    } else {
        // fetch the earliest and latest available offsets
        let latest =
            try!(load_partition_offsets(client, &subscription_topics, FetchOffset::Latest));
        let earliest =
            try!(load_partition_offsets(client, &subscription_topics, FetchOffset::Earliest));
        // ~ for each subscribed partition: if we have a consumed
        // offset, verify it is within the earliest/latest range and
        // use consumed_offset + 1 as the offset to fetch from.
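        // ~ e.g. with earliest = 3, latest = 10, and a consumed offset
        // of 7, fetching resumes at 8; a consumed offset of 1 (already
        // expired) falls back to `config.fallback_offset` instead.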
        for s in subscriptions {
            let topic_ref = assignments.topic_ref(s.assignment.topic()).expect(
                "unassigned subscription",
            );
            for p in &s.partitions {
                let l_off = *latest
                    .get(s.assignment.topic())
                    .and_then(|ps| ps.get(p))
                    .unwrap_or(&-1);
                let e_off = *earliest
                    .get(s.assignment.topic())
                    .and_then(|ps| ps.get(p))
                    .unwrap_or(&-1);

                let tp = TopicPartition {
                    topic_ref: topic_ref,
                    partition: *p,
                };

                // the "latest" offset is the offset of the "next coming message"
                let offset = match consumed_offsets.get(&tp) {
                    Some(co) if co.offset >= e_off && co.offset < l_off => co.offset + 1,
                    _ => {
                        match config.fallback_offset {
                            FetchOffset::Latest => l_off,
                            FetchOffset::Earliest => e_off,
                            _ => {
                                debug!(
                                    "cannot determine fetch offset \
                                        (group: {} / topic: {} / partition: {})",
                                    &config.group,
                                    s.assignment.topic(),
                                    p
                                );
                                bail!(ErrorKind::Kafka(KafkaCode::Unknown));
                            }
                        }
                    }
                };
                fetch_offsets.insert(
                    tp,
                    FetchState {
                        offset: offset,
                        max_bytes: max_bytes,
                    },
                );
            }
        }
    }
    Ok(fetch_offsets)
}

pub struct OffsetsMapDebug<'a, T: 'a> {
    state: &'a State,
    offsets: &'a HashMap<TopicPartition, T, PartitionHasher>,
}

impl<'a, T: fmt::Debug + 'a> fmt::Debug for OffsetsMapDebug<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        try!(write!(f, "{{"));
        for (i, (tp, v)) in self.offsets.iter().enumerate() {
            if i != 0 {
                try!(write!(f, ", "));
            }
            let topic = self.state.topic_name(tp.topic_ref);
            try!(write!(f, "\"{}:{}\": {:?}", topic, tp.partition, v));
        }
        write!(f, "}}")
    }
}

struct TopicPartitionsDebug<'a> {
    state: &'a State,
    tps: &'a VecDeque<TopicPartition>,
}

impl<'a> fmt::Debug for TopicPartitionsDebug<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        try!(write!(f, "["));
        for (i, tp) in self.tps.iter().enumerate() {
            if i != 0 {
                try!(write!(f, ", "));
            }
            try!(write!(f, "\"{}:{}\"", self.state.topic_name(tp.topic_ref), tp.partition));
        }
        write!(f, "]")
    }
}