1 use super::plumbing::*;
2 use super::*;
3 
4 use std::fmt::{self, Debug};
5 
6 impl<U, I, ID, F> Fold<I, ID, F>
7 where
8     I: ParallelIterator,
9     F: Fn(U, I::Item) -> U + Sync + Send,
10     ID: Fn() -> U + Sync + Send,
11     U: Send,
12 {
new(base: I, identity: ID, fold_op: F) -> Self13     pub(super) fn new(base: I, identity: ID, fold_op: F) -> Self {
14         Fold {
15             base,
16             identity,
17             fold_op,
18         }
19     }
20 }
21 
22 /// `Fold` is an iterator that applies a function over an iterator producing a single value.
23 /// This struct is created by the [`fold()`] method on [`ParallelIterator`]
24 ///
25 /// [`fold()`]: trait.ParallelIterator.html#method.fold
26 /// [`ParallelIterator`]: trait.ParallelIterator.html
27 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
28 #[derive(Clone)]
29 pub struct Fold<I, ID, F> {
30     base: I,
31     identity: ID,
32     fold_op: F,
33 }
34 
35 impl<I: ParallelIterator + Debug, ID, F> Debug for Fold<I, ID, F> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result36     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37         f.debug_struct("Fold").field("base", &self.base).finish()
38     }
39 }
40 
41 impl<U, I, ID, F> ParallelIterator for Fold<I, ID, F>
42 where
43     I: ParallelIterator,
44     F: Fn(U, I::Item) -> U + Sync + Send,
45     ID: Fn() -> U + Sync + Send,
46     U: Send,
47 {
48     type Item = U;
49 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,50     fn drive_unindexed<C>(self, consumer: C) -> C::Result
51     where
52         C: UnindexedConsumer<Self::Item>,
53     {
54         let consumer1 = FoldConsumer {
55             base: consumer,
56             fold_op: &self.fold_op,
57             identity: &self.identity,
58         };
59         self.base.drive_unindexed(consumer1)
60     }
61 }
62 
63 struct FoldConsumer<'c, C, ID, F> {
64     base: C,
65     fold_op: &'c F,
66     identity: &'c ID,
67 }
68 
69 impl<'r, U, T, C, ID, F> Consumer<T> for FoldConsumer<'r, C, ID, F>
70 where
71     C: Consumer<U>,
72     F: Fn(U, T) -> U + Sync,
73     ID: Fn() -> U + Sync,
74     U: Send,
75 {
76     type Folder = FoldFolder<'r, C::Folder, U, F>;
77     type Reducer = C::Reducer;
78     type Result = C::Result;
79 
split_at(self, index: usize) -> (Self, Self, Self::Reducer)80     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
81         let (left, right, reducer) = self.base.split_at(index);
82         (
83             FoldConsumer { base: left, ..self },
84             FoldConsumer {
85                 base: right,
86                 ..self
87             },
88             reducer,
89         )
90     }
91 
into_folder(self) -> Self::Folder92     fn into_folder(self) -> Self::Folder {
93         FoldFolder {
94             base: self.base.into_folder(),
95             item: (self.identity)(),
96             fold_op: self.fold_op,
97         }
98     }
99 
full(&self) -> bool100     fn full(&self) -> bool {
101         self.base.full()
102     }
103 }
104 
105 impl<'r, U, T, C, ID, F> UnindexedConsumer<T> for FoldConsumer<'r, C, ID, F>
106 where
107     C: UnindexedConsumer<U>,
108     F: Fn(U, T) -> U + Sync,
109     ID: Fn() -> U + Sync,
110     U: Send,
111 {
split_off_left(&self) -> Self112     fn split_off_left(&self) -> Self {
113         FoldConsumer {
114             base: self.base.split_off_left(),
115             ..*self
116         }
117     }
118 
to_reducer(&self) -> Self::Reducer119     fn to_reducer(&self) -> Self::Reducer {
120         self.base.to_reducer()
121     }
122 }
123 
124 struct FoldFolder<'r, C, ID, F> {
125     base: C,
126     fold_op: &'r F,
127     item: ID,
128 }
129 
130 impl<'r, C, ID, F, T> Folder<T> for FoldFolder<'r, C, ID, F>
131 where
132     C: Folder<ID>,
133     F: Fn(ID, T) -> ID + Sync,
134 {
135     type Result = C::Result;
136 
consume(self, item: T) -> Self137     fn consume(self, item: T) -> Self {
138         let item = (self.fold_op)(self.item, item);
139         FoldFolder {
140             base: self.base,
141             fold_op: self.fold_op,
142             item,
143         }
144     }
145 
consume_iter<I>(self, iter: I) -> Self where I: IntoIterator<Item = T>,146     fn consume_iter<I>(self, iter: I) -> Self
147     where
148         I: IntoIterator<Item = T>,
149     {
150         fn not_full<C, ID, T>(base: &C) -> impl Fn(&T) -> bool + '_
151         where
152             C: Folder<ID>,
153         {
154             move |_| !base.full()
155         }
156 
157         let base = self.base;
158         let item = iter
159             .into_iter()
160             // stop iterating if another thread has finished
161             .take_while(not_full(&base))
162             .fold(self.item, self.fold_op);
163 
164         FoldFolder {
165             base,
166             item,
167             fold_op: self.fold_op,
168         }
169     }
170 
complete(self) -> C::Result171     fn complete(self) -> C::Result {
172         self.base.consume(self.item).complete()
173     }
174 
full(&self) -> bool175     fn full(&self) -> bool {
176         self.base.full()
177     }
178 }
179 
180 // ///////////////////////////////////////////////////////////////////////////
181 
182 impl<U, I, F> FoldWith<I, U, F>
183 where
184     I: ParallelIterator,
185     F: Fn(U, I::Item) -> U + Sync + Send,
186     U: Send + Clone,
187 {
new(base: I, item: U, fold_op: F) -> Self188     pub(super) fn new(base: I, item: U, fold_op: F) -> Self {
189         FoldWith {
190             base,
191             item,
192             fold_op,
193         }
194     }
195 }
196 
197 /// `FoldWith` is an iterator that applies a function over an iterator producing a single value.
198 /// This struct is created by the [`fold_with()`] method on [`ParallelIterator`]
199 ///
200 /// [`fold_with()`]: trait.ParallelIterator.html#method.fold_with
201 /// [`ParallelIterator`]: trait.ParallelIterator.html
202 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
203 #[derive(Clone)]
204 pub struct FoldWith<I, U, F> {
205     base: I,
206     item: U,
207     fold_op: F,
208 }
209 
210 impl<I: ParallelIterator + Debug, U: Debug, F> Debug for FoldWith<I, U, F> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result211     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212         f.debug_struct("FoldWith")
213             .field("base", &self.base)
214             .field("item", &self.item)
215             .finish()
216     }
217 }
218 
219 impl<U, I, F> ParallelIterator for FoldWith<I, U, F>
220 where
221     I: ParallelIterator,
222     F: Fn(U, I::Item) -> U + Sync + Send,
223     U: Send + Clone,
224 {
225     type Item = U;
226 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,227     fn drive_unindexed<C>(self, consumer: C) -> C::Result
228     where
229         C: UnindexedConsumer<Self::Item>,
230     {
231         let consumer1 = FoldWithConsumer {
232             base: consumer,
233             item: self.item,
234             fold_op: &self.fold_op,
235         };
236         self.base.drive_unindexed(consumer1)
237     }
238 }
239 
240 struct FoldWithConsumer<'c, C, U, F> {
241     base: C,
242     item: U,
243     fold_op: &'c F,
244 }
245 
246 impl<'r, U, T, C, F> Consumer<T> for FoldWithConsumer<'r, C, U, F>
247 where
248     C: Consumer<U>,
249     F: Fn(U, T) -> U + Sync,
250     U: Send + Clone,
251 {
252     type Folder = FoldFolder<'r, C::Folder, U, F>;
253     type Reducer = C::Reducer;
254     type Result = C::Result;
255 
split_at(self, index: usize) -> (Self, Self, Self::Reducer)256     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
257         let (left, right, reducer) = self.base.split_at(index);
258         (
259             FoldWithConsumer {
260                 base: left,
261                 item: self.item.clone(),
262                 ..self
263             },
264             FoldWithConsumer {
265                 base: right,
266                 ..self
267             },
268             reducer,
269         )
270     }
271 
into_folder(self) -> Self::Folder272     fn into_folder(self) -> Self::Folder {
273         FoldFolder {
274             base: self.base.into_folder(),
275             item: self.item,
276             fold_op: self.fold_op,
277         }
278     }
279 
full(&self) -> bool280     fn full(&self) -> bool {
281         self.base.full()
282     }
283 }
284 
285 impl<'r, U, T, C, F> UnindexedConsumer<T> for FoldWithConsumer<'r, C, U, F>
286 where
287     C: UnindexedConsumer<U>,
288     F: Fn(U, T) -> U + Sync,
289     U: Send + Clone,
290 {
split_off_left(&self) -> Self291     fn split_off_left(&self) -> Self {
292         FoldWithConsumer {
293             base: self.base.split_off_left(),
294             item: self.item.clone(),
295             ..*self
296         }
297     }
298 
to_reducer(&self) -> Self::Reducer299     fn to_reducer(&self) -> Self::Reducer {
300         self.base.to_reducer()
301     }
302 }
303