1 use super::plumbing::*;
2 use super::*;
3 
4 use std::fmt::{self, Debug};
5 
/// `Update` is an iterator that mutates the elements of an
/// underlying iterator before they are yielded.
///
/// This struct is created by the [`update()`] method on [`ParallelIterator`].
///
/// [`update()`]: trait.ParallelIterator.html#method.update
/// [`ParallelIterator`]: trait.ParallelIterator.html
// No trait bound is placed on `I` here: the impl blocks state the bounds they
// need, so the type definition itself stays unconstrained.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct Update<I, F> {
    /// The underlying iterator whose items are updated.
    base: I,
    /// The operation invoked on each item, by mutable reference, before the
    /// item is handed on.
    update_op: F,
}
19 
20 impl<I: ParallelIterator + Debug, F> Debug for Update<I, F> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result21     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22         f.debug_struct("Update").field("base", &self.base).finish()
23     }
24 }
25 
26 impl<I, F> Update<I, F>
27 where
28     I: ParallelIterator,
29 {
30     /// Creates a new `Update` iterator.
new(base: I, update_op: F) -> Self31     pub(super) fn new(base: I, update_op: F) -> Self {
32         Update { base, update_op }
33     }
34 }
35 
36 impl<I, F> ParallelIterator for Update<I, F>
37 where
38     I: ParallelIterator,
39     F: Fn(&mut I::Item) + Send + Sync,
40 {
41     type Item = I::Item;
42 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,43     fn drive_unindexed<C>(self, consumer: C) -> C::Result
44     where
45         C: UnindexedConsumer<Self::Item>,
46     {
47         let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
48         self.base.drive_unindexed(consumer1)
49     }
50 
opt_len(&self) -> Option<usize>51     fn opt_len(&self) -> Option<usize> {
52         self.base.opt_len()
53     }
54 }
55 
56 impl<I, F> IndexedParallelIterator for Update<I, F>
57 where
58     I: IndexedParallelIterator,
59     F: Fn(&mut I::Item) + Send + Sync,
60 {
drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>,61     fn drive<C>(self, consumer: C) -> C::Result
62     where
63         C: Consumer<Self::Item>,
64     {
65         let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
66         self.base.drive(consumer1)
67     }
68 
len(&self) -> usize69     fn len(&self) -> usize {
70         self.base.len()
71     }
72 
with_producer<CB>(self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>,73     fn with_producer<CB>(self, callback: CB) -> CB::Output
74     where
75         CB: ProducerCallback<Self::Item>,
76     {
77         return self.base.with_producer(Callback {
78             callback,
79             update_op: self.update_op,
80         });
81 
82         struct Callback<CB, F> {
83             callback: CB,
84             update_op: F,
85         }
86 
87         impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
88         where
89             CB: ProducerCallback<T>,
90             F: Fn(&mut T) + Send + Sync,
91         {
92             type Output = CB::Output;
93 
94             fn callback<P>(self, base: P) -> CB::Output
95             where
96                 P: Producer<Item = T>,
97             {
98                 let producer = UpdateProducer {
99                     base,
100                     update_op: &self.update_op,
101                 };
102                 self.callback.callback(producer)
103             }
104         }
105     }
106 }
107 
// ////////////////////////////////////////////////////////////////////////

/// Producer adaptor pairing a base producer with a borrowed update operation.
struct UpdateProducer<'f, P, F> {
    /// The wrapped producer.
    base: P,
    /// Shared reference to the update operation.
    update_op: &'f F,
}
114 
115 impl<'f, P, F> Producer for UpdateProducer<'f, P, F>
116 where
117     P: Producer,
118     F: Fn(&mut P::Item) + Send + Sync,
119 {
120     type Item = P::Item;
121     type IntoIter = UpdateSeq<P::IntoIter, &'f F>;
122 
into_iter(self) -> Self::IntoIter123     fn into_iter(self) -> Self::IntoIter {
124         UpdateSeq {
125             base: self.base.into_iter(),
126             update_op: self.update_op,
127         }
128     }
129 
min_len(&self) -> usize130     fn min_len(&self) -> usize {
131         self.base.min_len()
132     }
max_len(&self) -> usize133     fn max_len(&self) -> usize {
134         self.base.max_len()
135     }
136 
split_at(self, index: usize) -> (Self, Self)137     fn split_at(self, index: usize) -> (Self, Self) {
138         let (left, right) = self.base.split_at(index);
139         (
140             UpdateProducer {
141                 base: left,
142                 update_op: self.update_op,
143             },
144             UpdateProducer {
145                 base: right,
146                 update_op: self.update_op,
147             },
148         )
149     }
150 
fold_with<G>(self, folder: G) -> G where G: Folder<Self::Item>,151     fn fold_with<G>(self, folder: G) -> G
152     where
153         G: Folder<Self::Item>,
154     {
155         let folder1 = UpdateFolder {
156             base: folder,
157             update_op: self.update_op,
158         };
159         self.base.fold_with(folder1).base
160     }
161 }
162 
// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer adaptor pairing a base consumer with a borrowed update operation.
struct UpdateConsumer<'f, C, F> {
    /// The wrapped consumer.
    base: C,
    /// Shared reference to the update operation.
    update_op: &'f F,
}

impl<'f, C, F> UpdateConsumer<'f, C, F> {
    /// Builds an `UpdateConsumer` from a base consumer and a reference to the
    /// update operation.
    fn new(base: C, update_op: &'f F) -> Self {
        Self { base, update_op }
    }
}
176 
177 impl<'f, T, C, F> Consumer<T> for UpdateConsumer<'f, C, F>
178 where
179     C: Consumer<T>,
180     F: Fn(&mut T) + Send + Sync,
181 {
182     type Folder = UpdateFolder<'f, C::Folder, F>;
183     type Reducer = C::Reducer;
184     type Result = C::Result;
185 
split_at(self, index: usize) -> (Self, Self, Self::Reducer)186     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
187         let (left, right, reducer) = self.base.split_at(index);
188         (
189             UpdateConsumer::new(left, self.update_op),
190             UpdateConsumer::new(right, self.update_op),
191             reducer,
192         )
193     }
194 
into_folder(self) -> Self::Folder195     fn into_folder(self) -> Self::Folder {
196         UpdateFolder {
197             base: self.base.into_folder(),
198             update_op: self.update_op,
199         }
200     }
201 
full(&self) -> bool202     fn full(&self) -> bool {
203         self.base.full()
204     }
205 }
206 
207 impl<'f, T, C, F> UnindexedConsumer<T> for UpdateConsumer<'f, C, F>
208 where
209     C: UnindexedConsumer<T>,
210     F: Fn(&mut T) + Send + Sync,
211 {
split_off_left(&self) -> Self212     fn split_off_left(&self) -> Self {
213         UpdateConsumer::new(self.base.split_off_left(), &self.update_op)
214     }
215 
to_reducer(&self) -> Self::Reducer216     fn to_reducer(&self) -> Self::Reducer {
217         self.base.to_reducer()
218     }
219 }
220 
/// Folder adaptor pairing a base folder with a borrowed update operation.
struct UpdateFolder<'f, C, F> {
    /// The wrapped folder.
    base: C,
    /// Shared reference to the update operation.
    update_op: &'f F,
}
225 
/// Adapts an in-place update operation into a by-value mapping function.
///
/// The returned closure takes ownership of an item, runs `update_op` on it
/// through a mutable reference, and returns the item.
fn apply<T>(update_op: impl Fn(&mut T)) -> impl Fn(T) -> T {
    move |item: T| {
        let mut updated = item;
        update_op(&mut updated);
        updated
    }
}
232 
233 impl<'f, T, C, F> Folder<T> for UpdateFolder<'f, C, F>
234 where
235     C: Folder<T>,
236     F: Fn(&mut T),
237 {
238     type Result = C::Result;
239 
consume(self, mut item: T) -> Self240     fn consume(self, mut item: T) -> Self {
241         (self.update_op)(&mut item);
242 
243         UpdateFolder {
244             base: self.base.consume(item),
245             update_op: self.update_op,
246         }
247     }
248 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,249     fn consume_iter<I>(mut self, iter: I) -> Self
250     where
251         I: IntoIterator<Item = T>,
252     {
253         let update_op = self.update_op;
254         self.base = self
255             .base
256             .consume_iter(iter.into_iter().map(apply(update_op)));
257         self
258     }
259 
complete(self) -> C::Result260     fn complete(self) -> C::Result {
261         self.base.complete()
262     }
263 
full(&self) -> bool264     fn full(&self) -> bool {
265         self.base.full()
266     }
267 }
268 
/// Standard Update adaptor, based on `itertools::adaptors::Update`
///
/// Pairs a base value (an iterator, in the impls below) with the update
/// operation to run on each of its items.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
struct UpdateSeq<I, F> {
    /// The wrapped sequential iterator.
    base: I,
    /// The update operation (held by value here; callers may pass a reference).
    update_op: F,
}
276 
277 impl<I, F> Iterator for UpdateSeq<I, F>
278 where
279     I: Iterator,
280     F: Fn(&mut I::Item),
281 {
282     type Item = I::Item;
283 
next(&mut self) -> Option<Self::Item>284     fn next(&mut self) -> Option<Self::Item> {
285         let mut v = self.base.next()?;
286         (self.update_op)(&mut v);
287         Some(v)
288     }
289 
size_hint(&self) -> (usize, Option<usize>)290     fn size_hint(&self) -> (usize, Option<usize>) {
291         self.base.size_hint()
292     }
293 
fold<Acc, G>(self, init: Acc, g: G) -> Acc where G: FnMut(Acc, Self::Item) -> Acc,294     fn fold<Acc, G>(self, init: Acc, g: G) -> Acc
295     where
296         G: FnMut(Acc, Self::Item) -> Acc,
297     {
298         self.base.map(apply(self.update_op)).fold(init, g)
299     }
300 
301     // if possible, re-use inner iterator specializations in collect
collect<C>(self) -> C where C: ::std::iter::FromIterator<Self::Item>,302     fn collect<C>(self) -> C
303     where
304         C: ::std::iter::FromIterator<Self::Item>,
305     {
306         self.base.map(apply(self.update_op)).collect()
307     }
308 }
309 
310 impl<I, F> ExactSizeIterator for UpdateSeq<I, F>
311 where
312     I: ExactSizeIterator,
313     F: Fn(&mut I::Item),
314 {
315 }
316 
317 impl<I, F> DoubleEndedIterator for UpdateSeq<I, F>
318 where
319     I: DoubleEndedIterator,
320     F: Fn(&mut I::Item),
321 {
next_back(&mut self) -> Option<Self::Item>322     fn next_back(&mut self) -> Option<Self::Item> {
323         let mut v = self.base.next_back()?;
324         (self.update_op)(&mut v);
325         Some(v)
326     }
327 }
328