1 use super::plumbing::*;
2 use super::*;
3
4 use std::fmt::{self, Debug};
5
6
7 /// `Update` is an iterator that mutates the elements of an
8 /// underlying iterator before they are yielded.
9 ///
10 /// This struct is created by the [`update()`] method on [`ParallelIterator`]
11 ///
12 /// [`update()`]: trait.ParallelIterator.html#method.update
13 /// [`ParallelIterator`]: trait.ParallelIterator.html
14 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
15 #[derive(Clone)]
16 pub struct Update<I: ParallelIterator, F> {
17 base: I,
18 update_op: F,
19 }
20
21 impl<I: ParallelIterator + Debug, F> Debug for Update<I, F> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result22 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
23 f.debug_struct("Update")
24 .field("base", &self.base)
25 .finish()
26 }
27 }
28
29 /// Create a new `Update` iterator.
30 ///
31 /// NB: a free fn because it is NOT part of the end-user API.
new<I, F>(base: I, update_op: F) -> Update<I, F> where I: ParallelIterator32 pub fn new<I, F>(base: I, update_op: F) -> Update<I, F>
33 where I: ParallelIterator
34 {
35 Update {
36 base: base,
37 update_op: update_op,
38 }
39 }
40
41 impl<I, F> ParallelIterator for Update<I, F>
42 where I: ParallelIterator,
43 F: Fn(&mut I::Item) + Send + Sync,
44 {
45 type Item = I::Item;
46
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>47 fn drive_unindexed<C>(self, consumer: C) -> C::Result
48 where C: UnindexedConsumer<Self::Item>
49 {
50 let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
51 self.base.drive_unindexed(consumer1)
52 }
53
opt_len(&self) -> Option<usize>54 fn opt_len(&self) -> Option<usize> {
55 self.base.opt_len()
56 }
57 }
58
59 impl<I, F> IndexedParallelIterator for Update<I, F>
60 where I: IndexedParallelIterator,
61 F: Fn(&mut I::Item) + Send + Sync,
62 {
drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>63 fn drive<C>(self, consumer: C) -> C::Result
64 where C: Consumer<Self::Item>
65 {
66 let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
67 self.base.drive(consumer1)
68 }
69
len(&self) -> usize70 fn len(&self) -> usize {
71 self.base.len()
72 }
73
with_producer<CB>(self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>74 fn with_producer<CB>(self, callback: CB) -> CB::Output
75 where CB: ProducerCallback<Self::Item>
76 {
77 return self.base.with_producer(Callback {
78 callback: callback,
79 update_op: self.update_op,
80 });
81
82 struct Callback<CB, F> {
83 callback: CB,
84 update_op: F,
85 }
86
87 impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
88 where CB: ProducerCallback<T>,
89 F: Fn(&mut T) + Send + Sync,
90 {
91 type Output = CB::Output;
92
93 fn callback<P>(self, base: P) -> CB::Output
94 where P: Producer<Item = T>
95 {
96 let producer = UpdateProducer {
97 base: base,
98 update_op: &self.update_op,
99 };
100 self.callback.callback(producer)
101 }
102 }
103 }
104 }
105
106 /// ////////////////////////////////////////////////////////////////////////
107
108 struct UpdateProducer<'f, P, F: 'f> {
109 base: P,
110 update_op: &'f F,
111 }
112
113 impl<'f, P, F> Producer for UpdateProducer<'f, P, F>
114 where P: Producer,
115 F: Fn(&mut P::Item) + Send + Sync,
116 {
117 type Item = P::Item;
118 type IntoIter = UpdateSeq<P::IntoIter, &'f F>;
119
into_iter(self) -> Self::IntoIter120 fn into_iter(self) -> Self::IntoIter {
121 UpdateSeq {
122 base: self.base.into_iter(),
123 update_op: self.update_op,
124 }
125 }
126
min_len(&self) -> usize127 fn min_len(&self) -> usize {
128 self.base.min_len()
129 }
max_len(&self) -> usize130 fn max_len(&self) -> usize {
131 self.base.max_len()
132 }
133
split_at(self, index: usize) -> (Self, Self)134 fn split_at(self, index: usize) -> (Self, Self) {
135 let (left, right) = self.base.split_at(index);
136 (UpdateProducer {
137 base: left,
138 update_op: self.update_op,
139 },
140 UpdateProducer {
141 base: right,
142 update_op: self.update_op,
143 })
144 }
145
fold_with<G>(self, folder: G) -> G where G: Folder<Self::Item>146 fn fold_with<G>(self, folder: G) -> G
147 where G: Folder<Self::Item>
148 {
149 let folder1 = UpdateFolder { base: folder, update_op: self.update_op, };
150 self.base.fold_with(folder1).base
151 }
152 }
153
154
155 /// ////////////////////////////////////////////////////////////////////////
156 /// Consumer implementation
157
158 struct UpdateConsumer<'f, C, F: 'f> {
159 base: C,
160 update_op: &'f F,
161 }
162
163 impl<'f, C, F> UpdateConsumer<'f, C, F> {
new(base: C, update_op: &'f F) -> Self164 fn new(base: C, update_op: &'f F) -> Self {
165 UpdateConsumer {
166 base: base,
167 update_op: update_op,
168 }
169 }
170 }
171
172 impl<'f, T, C, F> Consumer<T> for UpdateConsumer<'f, C, F>
173 where C: Consumer<T>,
174 F: Fn(&mut T) + Send + Sync,
175 {
176 type Folder = UpdateFolder<'f, C::Folder, F>;
177 type Reducer = C::Reducer;
178 type Result = C::Result;
179
split_at(self, index: usize) -> (Self, Self, Self::Reducer)180 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
181 let (left, right, reducer) = self.base.split_at(index);
182 (UpdateConsumer::new(left, self.update_op), UpdateConsumer::new(right, self.update_op), reducer)
183 }
184
into_folder(self) -> Self::Folder185 fn into_folder(self) -> Self::Folder {
186 UpdateFolder {
187 base: self.base.into_folder(),
188 update_op: self.update_op,
189 }
190 }
191
full(&self) -> bool192 fn full(&self) -> bool {
193 self.base.full()
194 }
195 }
196
197 impl<'f, T, C, F> UnindexedConsumer<T> for UpdateConsumer<'f, C, F>
198 where C: UnindexedConsumer<T>,
199 F: Fn(&mut T) + Send + Sync,
200 {
split_off_left(&self) -> Self201 fn split_off_left(&self) -> Self {
202 UpdateConsumer::new(self.base.split_off_left(), &self.update_op)
203 }
204
to_reducer(&self) -> Self::Reducer205 fn to_reducer(&self) -> Self::Reducer {
206 self.base.to_reducer()
207 }
208 }
209
210 struct UpdateFolder<'f, C, F: 'f> {
211 base: C,
212 update_op: &'f F,
213 }
214
215 impl<'f, T, C, F> Folder<T> for UpdateFolder<'f, C, F>
216 where C: Folder<T>,
217 F: Fn(& mut T)
218 {
219 type Result = C::Result;
220
consume(self, mut item: T) -> Self221 fn consume(self, mut item: T) -> Self {
222 (self.update_op)(&mut item);
223
224 UpdateFolder {
225 base: self.base.consume(item),
226 update_op: self.update_op,
227 }
228 }
229
complete(self) -> C::Result230 fn complete(self) -> C::Result {
231 self.base.complete()
232 }
233
full(&self) -> bool234 fn full(&self) -> bool {
235 self.base.full()
236 }
237 }
238
239 /// Standard Update adaptor, based on `itertools::adaptors::Update`
240 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
241 #[derive(Debug, Clone)]
242 struct UpdateSeq<I, F> {
243 base: I,
244 update_op: F,
245 }
246
247 impl<I, F> Iterator for UpdateSeq<I, F>
248 where
249 I: Iterator,
250 F: FnMut(&mut I::Item),
251 {
252 type Item = I::Item;
253
next(&mut self) -> Option<Self::Item>254 fn next(&mut self) -> Option<Self::Item> {
255 if let Some(mut v) = self.base.next() {
256 (self.update_op)(&mut v);
257 Some(v)
258 } else {
259 None
260 }
261 }
262
size_hint(&self) -> (usize, Option<usize>)263 fn size_hint(&self) -> (usize, Option<usize>) {
264 self.base.size_hint()
265 }
266
fold<Acc, G>(self, init: Acc, mut g: G) -> Acc where G: FnMut(Acc, Self::Item) -> Acc,267 fn fold<Acc, G>(self, init: Acc, mut g: G) -> Acc
268 where G: FnMut(Acc, Self::Item) -> Acc,
269 {
270 let mut f = self.update_op;
271 self.base.fold(init, move |acc, mut v| { f(&mut v); g(acc, v) })
272 }
273
274 // if possible, re-use inner iterator specializations in collect
collect<C>(self) -> C where C: ::std::iter::FromIterator<Self::Item>275 fn collect<C>(self) -> C
276 where C: ::std::iter::FromIterator<Self::Item>
277 {
278 let mut f = self.update_op;
279 self.base.map(move |mut v| { f(&mut v); v }).collect()
280 }
281 }
282
283 impl<I, F> ExactSizeIterator for UpdateSeq<I, F>
284 where
285 I: ExactSizeIterator,
286 F: FnMut(&mut I::Item),
287 {}
288
289 impl<I, F> DoubleEndedIterator for UpdateSeq<I, F>
290 where
291 I: DoubleEndedIterator,
292 F: FnMut(&mut I::Item),
293 {
next_back(&mut self) -> Option<Self::Item>294 fn next_back(&mut self) -> Option<Self::Item> {
295 if let Some(mut v) = self.base.next_back() {
296 (self.update_op)(&mut v);
297 Some(v)
298 } else {
299 None
300 }
301 }
302 }
303