1 use super::plumbing::*;
2 use super::*;
3 
4 use std::fmt::{self, Debug};
5 
6 
7 /// `Update` is an iterator that mutates the elements of an
8 /// underlying iterator before they are yielded.
9 ///
10 /// This struct is created by the [`update()`] method on [`ParallelIterator`]
11 ///
12 /// [`update()`]: trait.ParallelIterator.html#method.update
13 /// [`ParallelIterator`]: trait.ParallelIterator.html
14 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
15 #[derive(Clone)]
16 pub struct Update<I: ParallelIterator, F> {
17     base: I,
18     update_op: F,
19 }
20 
21 impl<I: ParallelIterator + Debug, F> Debug for Update<I, F> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result22     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
23         f.debug_struct("Update")
24             .field("base", &self.base)
25             .finish()
26     }
27 }
28 
29 /// Create a new `Update` iterator.
30 ///
31 /// NB: a free fn because it is NOT part of the end-user API.
new<I, F>(base: I, update_op: F) -> Update<I, F> where I: ParallelIterator32 pub fn new<I, F>(base: I, update_op: F) -> Update<I, F>
33     where I: ParallelIterator
34 {
35     Update {
36         base: base,
37         update_op: update_op,
38     }
39 }
40 
41 impl<I, F> ParallelIterator for Update<I, F>
42     where I: ParallelIterator,
43           F: Fn(&mut I::Item) + Send + Sync,
44 {
45     type Item = I::Item;
46 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>47     fn drive_unindexed<C>(self, consumer: C) -> C::Result
48         where C: UnindexedConsumer<Self::Item>
49     {
50         let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
51         self.base.drive_unindexed(consumer1)
52     }
53 
opt_len(&self) -> Option<usize>54     fn opt_len(&self) -> Option<usize> {
55         self.base.opt_len()
56     }
57 }
58 
59 impl<I, F> IndexedParallelIterator for Update<I, F>
60     where I: IndexedParallelIterator,
61           F: Fn(&mut I::Item) + Send + Sync,
62 {
drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>63     fn drive<C>(self, consumer: C) -> C::Result
64         where C: Consumer<Self::Item>
65     {
66         let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
67         self.base.drive(consumer1)
68     }
69 
len(&self) -> usize70     fn len(&self) -> usize {
71         self.base.len()
72     }
73 
with_producer<CB>(self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>74     fn with_producer<CB>(self, callback: CB) -> CB::Output
75         where CB: ProducerCallback<Self::Item>
76     {
77         return self.base.with_producer(Callback {
78                                            callback: callback,
79                                            update_op: self.update_op,
80                                        });
81 
82         struct Callback<CB, F> {
83             callback: CB,
84             update_op: F,
85         }
86 
87         impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
88             where CB: ProducerCallback<T>,
89                   F: Fn(&mut T) + Send + Sync,
90         {
91             type Output = CB::Output;
92 
93             fn callback<P>(self, base: P) -> CB::Output
94                 where P: Producer<Item = T>
95             {
96                 let producer = UpdateProducer {
97                     base: base,
98                     update_op: &self.update_op,
99                 };
100                 self.callback.callback(producer)
101             }
102         }
103     }
104 }
105 
// ////////////////////////////////////////////////////////////////////////
// Producer implementation

/// Producer that pairs a base producer with a shared reference to the
/// update callback.
struct UpdateProducer<'f, P, F>
where
    F: 'f,
{
    /// The wrapped producer.
    base: P,
    /// Borrowed update callback.
    update_op: &'f F,
}
112 
113 impl<'f, P, F> Producer for UpdateProducer<'f, P, F>
114     where P: Producer,
115           F: Fn(&mut P::Item) + Send + Sync,
116 {
117     type Item = P::Item;
118     type IntoIter = UpdateSeq<P::IntoIter, &'f F>;
119 
into_iter(self) -> Self::IntoIter120     fn into_iter(self) -> Self::IntoIter {
121         UpdateSeq {
122             base: self.base.into_iter(),
123             update_op: self.update_op,
124         }
125     }
126 
min_len(&self) -> usize127     fn min_len(&self) -> usize {
128         self.base.min_len()
129     }
max_len(&self) -> usize130     fn max_len(&self) -> usize {
131         self.base.max_len()
132     }
133 
split_at(self, index: usize) -> (Self, Self)134     fn split_at(self, index: usize) -> (Self, Self) {
135         let (left, right) = self.base.split_at(index);
136         (UpdateProducer {
137              base: left,
138              update_op: self.update_op,
139          },
140          UpdateProducer {
141              base: right,
142              update_op: self.update_op,
143          })
144     }
145 
fold_with<G>(self, folder: G) -> G where G: Folder<Self::Item>146     fn fold_with<G>(self, folder: G) -> G
147         where G: Folder<Self::Item>
148     {
149         let folder1 = UpdateFolder { base: folder, update_op: self.update_op, };
150         self.base.fold_with(folder1).base
151     }
152 }
153 
154 
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer that pairs a base consumer with a shared reference to the
/// update callback.
struct UpdateConsumer<'f, C, F>
where
    F: 'f,
{
    /// The wrapped consumer.
    base: C,
    /// Borrowed update callback.
    update_op: &'f F,
}

impl<'f, C, F> UpdateConsumer<'f, C, F> {
    /// Wraps `base` together with a reference to the update callback.
    fn new(base: C, update_op: &'f F) -> Self {
        UpdateConsumer { base, update_op }
    }
}
171 
172 impl<'f, T, C, F> Consumer<T> for UpdateConsumer<'f, C, F>
173     where C: Consumer<T>,
174           F: Fn(&mut T) + Send + Sync,
175 {
176     type Folder = UpdateFolder<'f, C::Folder, F>;
177     type Reducer = C::Reducer;
178     type Result = C::Result;
179 
split_at(self, index: usize) -> (Self, Self, Self::Reducer)180     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
181         let (left, right, reducer) = self.base.split_at(index);
182         (UpdateConsumer::new(left, self.update_op), UpdateConsumer::new(right, self.update_op), reducer)
183     }
184 
into_folder(self) -> Self::Folder185     fn into_folder(self) -> Self::Folder {
186         UpdateFolder {
187             base: self.base.into_folder(),
188             update_op: self.update_op,
189         }
190     }
191 
full(&self) -> bool192     fn full(&self) -> bool {
193         self.base.full()
194     }
195 }
196 
197 impl<'f, T, C, F> UnindexedConsumer<T> for UpdateConsumer<'f, C, F>
198     where C: UnindexedConsumer<T>,
199           F: Fn(&mut T) + Send + Sync,
200 {
split_off_left(&self) -> Self201     fn split_off_left(&self) -> Self {
202         UpdateConsumer::new(self.base.split_off_left(), &self.update_op)
203     }
204 
to_reducer(&self) -> Self::Reducer205     fn to_reducer(&self) -> Self::Reducer {
206         self.base.to_reducer()
207     }
208 }
209 
/// Folder that pairs a base folder with a shared reference to the update
/// callback.
struct UpdateFolder<'f, C, F>
where
    F: 'f,
{
    /// The wrapped folder that receives the updated items.
    base: C,
    /// Borrowed update callback.
    update_op: &'f F,
}
214 
215 impl<'f, T, C, F> Folder<T> for UpdateFolder<'f, C, F>
216     where C: Folder<T>,
217           F: Fn(& mut T)
218 {
219     type Result = C::Result;
220 
consume(self, mut item: T) -> Self221     fn consume(self, mut item: T) -> Self {
222         (self.update_op)(&mut item);
223 
224         UpdateFolder {
225             base: self.base.consume(item),
226             update_op: self.update_op,
227         }
228     }
229 
complete(self) -> C::Result230     fn complete(self) -> C::Result {
231         self.base.complete()
232     }
233 
full(&self) -> bool234     fn full(&self) -> bool {
235         self.base.full()
236     }
237 }
238 
/// Standard Update adaptor, based on `itertools::adaptors::Update`.
///
/// Sequential iterator that hands every item of `base` to `update_op` by
/// mutable reference before yielding it.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
struct UpdateSeq<I, F> {
    /// The wrapped sequential iterator.
    base: I,
    /// Callback applied to each item before it is yielded.
    update_op: F,
}

impl<I, F> Iterator for UpdateSeq<I, F>
where
    I: Iterator,
    F: FnMut(&mut I::Item),
{
    type Item = I::Item;

    /// Pulls the next item from `base`, applies `update_op` to it, and
    /// yields it; returns `None` once `base` is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // Borrow the callback separately so `base` can be advanced while the
        // closure below holds the callback.
        let update_op = &mut self.update_op;
        self.base.next().map(|mut item| {
            update_op(&mut item);
            item
        })
    }

    /// Forwards to `base`: updating items never changes how many there are.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.base.size_hint()
    }

    /// Folds through `base.fold`, applying `update_op` to each item before
    /// it is passed to `g`.
    fn fold<Acc, G>(self, init: Acc, mut g: G) -> Acc
    where
        G: FnMut(Acc, Self::Item) -> Acc,
    {
        let UpdateSeq { base, mut update_op } = self;
        base.fold(init, move |acc, mut item| {
            update_op(&mut item);
            g(acc, item)
        })
    }

    // if possible, re-use inner iterator specializations in collect
    fn collect<C>(self) -> C
    where
        C: ::std::iter::FromIterator<Self::Item>,
    {
        let UpdateSeq { base, mut update_op } = self;
        base.map(move |mut item| {
            update_op(&mut item);
            item
        })
        .collect()
    }
}

impl<I, F> ExactSizeIterator for UpdateSeq<I, F>
where
    I: ExactSizeIterator,
    F: FnMut(&mut I::Item),
{
}

impl<I, F> DoubleEndedIterator for UpdateSeq<I, F>
where
    I: DoubleEndedIterator,
    F: FnMut(&mut I::Item),
{
    /// Pulls the next item from the back of `base`, applies `update_op` to
    /// it, and yields it; returns `None` once `base` is exhausted.
    fn next_back(&mut self) -> Option<Self::Item> {
        let update_op = &mut self.update_op;
        self.base.next_back().map(|mut item| {
            update_op(&mut item);
            item
        })
    }
}
303