1 use super::plumbing::*;
2 use super::*;
3
4 use std::fmt::{self, Debug};
5
6 /// `Update` is an iterator that mutates the elements of an
7 /// underlying iterator before they are yielded.
8 ///
9 /// This struct is created by the [`update()`] method on [`ParallelIterator`]
10 ///
11 /// [`update()`]: trait.ParallelIterator.html#method.update
12 /// [`ParallelIterator`]: trait.ParallelIterator.html
13 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
14 #[derive(Clone)]
15 pub struct Update<I: ParallelIterator, F> {
16 base: I,
17 update_op: F,
18 }
19
20 impl<I: ParallelIterator + Debug, F> Debug for Update<I, F> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result21 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22 f.debug_struct("Update").field("base", &self.base).finish()
23 }
24 }
25
26 impl<I, F> Update<I, F>
27 where
28 I: ParallelIterator,
29 {
30 /// Creates a new `Update` iterator.
new(base: I, update_op: F) -> Self31 pub(super) fn new(base: I, update_op: F) -> Self {
32 Update { base, update_op }
33 }
34 }
35
36 impl<I, F> ParallelIterator for Update<I, F>
37 where
38 I: ParallelIterator,
39 F: Fn(&mut I::Item) + Send + Sync,
40 {
41 type Item = I::Item;
42
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,43 fn drive_unindexed<C>(self, consumer: C) -> C::Result
44 where
45 C: UnindexedConsumer<Self::Item>,
46 {
47 let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
48 self.base.drive_unindexed(consumer1)
49 }
50
opt_len(&self) -> Option<usize>51 fn opt_len(&self) -> Option<usize> {
52 self.base.opt_len()
53 }
54 }
55
56 impl<I, F> IndexedParallelIterator for Update<I, F>
57 where
58 I: IndexedParallelIterator,
59 F: Fn(&mut I::Item) + Send + Sync,
60 {
drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>,61 fn drive<C>(self, consumer: C) -> C::Result
62 where
63 C: Consumer<Self::Item>,
64 {
65 let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
66 self.base.drive(consumer1)
67 }
68
len(&self) -> usize69 fn len(&self) -> usize {
70 self.base.len()
71 }
72
with_producer<CB>(self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>,73 fn with_producer<CB>(self, callback: CB) -> CB::Output
74 where
75 CB: ProducerCallback<Self::Item>,
76 {
77 return self.base.with_producer(Callback {
78 callback,
79 update_op: self.update_op,
80 });
81
82 struct Callback<CB, F> {
83 callback: CB,
84 update_op: F,
85 }
86
87 impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
88 where
89 CB: ProducerCallback<T>,
90 F: Fn(&mut T) + Send + Sync,
91 {
92 type Output = CB::Output;
93
94 fn callback<P>(self, base: P) -> CB::Output
95 where
96 P: Producer<Item = T>,
97 {
98 let producer = UpdateProducer {
99 base,
100 update_op: &self.update_op,
101 };
102 self.callback.callback(producer)
103 }
104 }
105 }
106 }
107
108 /// ////////////////////////////////////////////////////////////////////////
109
110 struct UpdateProducer<'f, P, F> {
111 base: P,
112 update_op: &'f F,
113 }
114
115 impl<'f, P, F> Producer for UpdateProducer<'f, P, F>
116 where
117 P: Producer,
118 F: Fn(&mut P::Item) + Send + Sync,
119 {
120 type Item = P::Item;
121 type IntoIter = UpdateSeq<P::IntoIter, &'f F>;
122
into_iter(self) -> Self::IntoIter123 fn into_iter(self) -> Self::IntoIter {
124 UpdateSeq {
125 base: self.base.into_iter(),
126 update_op: self.update_op,
127 }
128 }
129
min_len(&self) -> usize130 fn min_len(&self) -> usize {
131 self.base.min_len()
132 }
max_len(&self) -> usize133 fn max_len(&self) -> usize {
134 self.base.max_len()
135 }
136
split_at(self, index: usize) -> (Self, Self)137 fn split_at(self, index: usize) -> (Self, Self) {
138 let (left, right) = self.base.split_at(index);
139 (
140 UpdateProducer {
141 base: left,
142 update_op: self.update_op,
143 },
144 UpdateProducer {
145 base: right,
146 update_op: self.update_op,
147 },
148 )
149 }
150
fold_with<G>(self, folder: G) -> G where G: Folder<Self::Item>,151 fn fold_with<G>(self, folder: G) -> G
152 where
153 G: Folder<Self::Item>,
154 {
155 let folder1 = UpdateFolder {
156 base: folder,
157 update_op: self.update_op,
158 };
159 self.base.fold_with(folder1).base
160 }
161 }
162
163 /// ////////////////////////////////////////////////////////////////////////
164 /// Consumer implementation
165
166 struct UpdateConsumer<'f, C, F> {
167 base: C,
168 update_op: &'f F,
169 }
170
171 impl<'f, C, F> UpdateConsumer<'f, C, F> {
new(base: C, update_op: &'f F) -> Self172 fn new(base: C, update_op: &'f F) -> Self {
173 UpdateConsumer { base, update_op }
174 }
175 }
176
177 impl<'f, T, C, F> Consumer<T> for UpdateConsumer<'f, C, F>
178 where
179 C: Consumer<T>,
180 F: Fn(&mut T) + Send + Sync,
181 {
182 type Folder = UpdateFolder<'f, C::Folder, F>;
183 type Reducer = C::Reducer;
184 type Result = C::Result;
185
split_at(self, index: usize) -> (Self, Self, Self::Reducer)186 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
187 let (left, right, reducer) = self.base.split_at(index);
188 (
189 UpdateConsumer::new(left, self.update_op),
190 UpdateConsumer::new(right, self.update_op),
191 reducer,
192 )
193 }
194
into_folder(self) -> Self::Folder195 fn into_folder(self) -> Self::Folder {
196 UpdateFolder {
197 base: self.base.into_folder(),
198 update_op: self.update_op,
199 }
200 }
201
full(&self) -> bool202 fn full(&self) -> bool {
203 self.base.full()
204 }
205 }
206
207 impl<'f, T, C, F> UnindexedConsumer<T> for UpdateConsumer<'f, C, F>
208 where
209 C: UnindexedConsumer<T>,
210 F: Fn(&mut T) + Send + Sync,
211 {
split_off_left(&self) -> Self212 fn split_off_left(&self) -> Self {
213 UpdateConsumer::new(self.base.split_off_left(), &self.update_op)
214 }
215
to_reducer(&self) -> Self::Reducer216 fn to_reducer(&self) -> Self::Reducer {
217 self.base.to_reducer()
218 }
219 }
220
221 struct UpdateFolder<'f, C, F> {
222 base: C,
223 update_op: &'f F,
224 }
225
apply<T>(update_op: impl Fn(&mut T)) -> impl Fn(T) -> T226 fn apply<T>(update_op: impl Fn(&mut T)) -> impl Fn(T) -> T {
227 move |mut item| {
228 update_op(&mut item);
229 item
230 }
231 }
232
233 impl<'f, T, C, F> Folder<T> for UpdateFolder<'f, C, F>
234 where
235 C: Folder<T>,
236 F: Fn(&mut T),
237 {
238 type Result = C::Result;
239
consume(self, mut item: T) -> Self240 fn consume(self, mut item: T) -> Self {
241 (self.update_op)(&mut item);
242
243 UpdateFolder {
244 base: self.base.consume(item),
245 update_op: self.update_op,
246 }
247 }
248
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,249 fn consume_iter<I>(mut self, iter: I) -> Self
250 where
251 I: IntoIterator<Item = T>,
252 {
253 let update_op = self.update_op;
254 self.base = self
255 .base
256 .consume_iter(iter.into_iter().map(apply(update_op)));
257 self
258 }
259
complete(self) -> C::Result260 fn complete(self) -> C::Result {
261 self.base.complete()
262 }
263
full(&self) -> bool264 fn full(&self) -> bool {
265 self.base.full()
266 }
267 }
268
269 /// Standard Update adaptor, based on `itertools::adaptors::Update`
270 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
271 #[derive(Debug, Clone)]
272 struct UpdateSeq<I, F> {
273 base: I,
274 update_op: F,
275 }
276
277 impl<I, F> Iterator for UpdateSeq<I, F>
278 where
279 I: Iterator,
280 F: Fn(&mut I::Item),
281 {
282 type Item = I::Item;
283
next(&mut self) -> Option<Self::Item>284 fn next(&mut self) -> Option<Self::Item> {
285 let mut v = self.base.next()?;
286 (self.update_op)(&mut v);
287 Some(v)
288 }
289
size_hint(&self) -> (usize, Option<usize>)290 fn size_hint(&self) -> (usize, Option<usize>) {
291 self.base.size_hint()
292 }
293
fold<Acc, G>(self, init: Acc, g: G) -> Acc where G: FnMut(Acc, Self::Item) -> Acc,294 fn fold<Acc, G>(self, init: Acc, g: G) -> Acc
295 where
296 G: FnMut(Acc, Self::Item) -> Acc,
297 {
298 self.base.map(apply(self.update_op)).fold(init, g)
299 }
300
301 // if possible, re-use inner iterator specializations in collect
collect<C>(self) -> C where C: ::std::iter::FromIterator<Self::Item>,302 fn collect<C>(self) -> C
303 where
304 C: ::std::iter::FromIterator<Self::Item>,
305 {
306 self.base.map(apply(self.update_op)).collect()
307 }
308 }
309
310 impl<I, F> ExactSizeIterator for UpdateSeq<I, F>
311 where
312 I: ExactSizeIterator,
313 F: Fn(&mut I::Item),
314 {
315 }
316
317 impl<I, F> DoubleEndedIterator for UpdateSeq<I, F>
318 where
319 I: DoubleEndedIterator,
320 F: Fn(&mut I::Item),
321 {
next_back(&mut self) -> Option<Self::Item>322 fn next_back(&mut self) -> Option<Self::Item> {
323 let mut v = self.base.next_back()?;
324 (self.update_op)(&mut v);
325 Some(v)
326 }
327 }
328