1 use super::plumbing::*;
2 use super::*;
3 
4 /// `Flatten` turns each element to a parallel iterator, then flattens these iterators
5 /// together. This struct is created by the [`flatten()`] method on [`ParallelIterator`].
6 ///
7 /// [`flatten()`]: trait.ParallelIterator.html#method.flatten
8 /// [`ParallelIterator`]: trait.ParallelIterator.html
9 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
10 #[derive(Debug, Clone)]
11 pub struct Flatten<I: ParallelIterator> {
12     base: I,
13 }
14 
15 impl<I> Flatten<I>
16 where
17     I: ParallelIterator,
18     I::Item: IntoParallelIterator,
19 {
20     /// Creates a new `Flatten` iterator.
new(base: I) -> Self21     pub(super) fn new(base: I) -> Self {
22         Flatten { base }
23     }
24 }
25 
26 impl<I> ParallelIterator for Flatten<I>
27 where
28     I: ParallelIterator,
29     I::Item: IntoParallelIterator,
30 {
31     type Item = <I::Item as IntoParallelIterator>::Item;
32 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,33     fn drive_unindexed<C>(self, consumer: C) -> C::Result
34     where
35         C: UnindexedConsumer<Self::Item>,
36     {
37         let consumer = FlattenConsumer::new(consumer);
38         self.base.drive_unindexed(consumer)
39     }
40 }
41 
42 /// ////////////////////////////////////////////////////////////////////////
43 /// Consumer implementation
44 
45 struct FlattenConsumer<C> {
46     base: C,
47 }
48 
49 impl<C> FlattenConsumer<C> {
new(base: C) -> Self50     fn new(base: C) -> Self {
51         FlattenConsumer { base }
52     }
53 }
54 
55 impl<T, C> Consumer<T> for FlattenConsumer<C>
56 where
57     C: UnindexedConsumer<T::Item>,
58     T: IntoParallelIterator,
59 {
60     type Folder = FlattenFolder<C, C::Result>;
61     type Reducer = C::Reducer;
62     type Result = C::Result;
63 
split_at(self, index: usize) -> (Self, Self, C::Reducer)64     fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
65         let (left, right, reducer) = self.base.split_at(index);
66         (
67             FlattenConsumer::new(left),
68             FlattenConsumer::new(right),
69             reducer,
70         )
71     }
72 
into_folder(self) -> Self::Folder73     fn into_folder(self) -> Self::Folder {
74         FlattenFolder {
75             base: self.base,
76             previous: None,
77         }
78     }
79 
full(&self) -> bool80     fn full(&self) -> bool {
81         self.base.full()
82     }
83 }
84 
85 impl<T, C> UnindexedConsumer<T> for FlattenConsumer<C>
86 where
87     C: UnindexedConsumer<T::Item>,
88     T: IntoParallelIterator,
89 {
split_off_left(&self) -> Self90     fn split_off_left(&self) -> Self {
91         FlattenConsumer::new(self.base.split_off_left())
92     }
93 
to_reducer(&self) -> Self::Reducer94     fn to_reducer(&self) -> Self::Reducer {
95         self.base.to_reducer()
96     }
97 }
98 
99 struct FlattenFolder<C, R> {
100     base: C,
101     previous: Option<R>,
102 }
103 
104 impl<T, C> Folder<T> for FlattenFolder<C, C::Result>
105 where
106     C: UnindexedConsumer<T::Item>,
107     T: IntoParallelIterator,
108 {
109     type Result = C::Result;
110 
consume(self, item: T) -> Self111     fn consume(self, item: T) -> Self {
112         let par_iter = item.into_par_iter();
113         let consumer = self.base.split_off_left();
114         let result = par_iter.drive_unindexed(consumer);
115 
116         let previous = match self.previous {
117             None => Some(result),
118             Some(previous) => {
119                 let reducer = self.base.to_reducer();
120                 Some(reducer.reduce(previous, result))
121             }
122         };
123 
124         FlattenFolder {
125             base: self.base,
126             previous,
127         }
128     }
129 
complete(self) -> Self::Result130     fn complete(self) -> Self::Result {
131         match self.previous {
132             Some(previous) => previous,
133             None => self.base.into_folder().complete(),
134         }
135     }
136 
full(&self) -> bool137     fn full(&self) -> bool {
138         self.base.full()
139     }
140 }
141