1 use super::plumbing::*;
2 use super::*;
3 
4 /// This trait abstracts the different ways we can "unzip" one parallel
5 /// iterator into two distinct consumers, which we can handle almost
6 /// identically apart from how to process the individual items.
7 trait UnzipOp<T>: Sync + Send {
8     /// The type of item expected by the left consumer.
9     type Left: Send;
10 
11     /// The type of item expected by the right consumer.
12     type Right: Send;
13 
14     /// Consumes one item and feeds it to one or both of the underlying folders.
consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB) where FA: Folder<Self::Left>, FB: Folder<Self::Right>15     fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
16     where
17         FA: Folder<Self::Left>,
18         FB: Folder<Self::Right>;
19 
20     /// Reports whether this op may support indexed consumers.
21     /// - e.g. true for `unzip` where the item count passed through directly.
22     /// - e.g. false for `partition` where the sorting is not yet known.
indexable() -> bool23     fn indexable() -> bool {
24         false
25     }
26 }
27 
28 /// Runs an unzip-like operation into default `ParallelExtend` collections.
execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB) where I: ParallelIterator, OP: UnzipOp<I::Item>, FromA: Default + Send + ParallelExtend<OP::Left>, FromB: Default + Send + ParallelExtend<OP::Right>,29 fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB)
30 where
31     I: ParallelIterator,
32     OP: UnzipOp<I::Item>,
33     FromA: Default + Send + ParallelExtend<OP::Left>,
34     FromB: Default + Send + ParallelExtend<OP::Right>,
35 {
36     let mut a = FromA::default();
37     let mut b = FromB::default();
38     execute_into(&mut a, &mut b, pi, op);
39     (a, b)
40 }
41 
42 /// Runs an unzip-like operation into `ParallelExtend` collections.
execute_into<I, OP, FromA, FromB>(a: &mut FromA, b: &mut FromB, pi: I, op: OP) where I: ParallelIterator, OP: UnzipOp<I::Item>, FromA: Send + ParallelExtend<OP::Left>, FromB: Send + ParallelExtend<OP::Right>,43 fn execute_into<I, OP, FromA, FromB>(a: &mut FromA, b: &mut FromB, pi: I, op: OP)
44 where
45     I: ParallelIterator,
46     OP: UnzipOp<I::Item>,
47     FromA: Send + ParallelExtend<OP::Left>,
48     FromB: Send + ParallelExtend<OP::Right>,
49 {
50     // We have no idea what the consumers will look like for these
51     // collections' `par_extend`, but we can intercept them in our own
52     // `drive_unindexed`.  Start with the left side, type `A`:
53     let iter = UnzipA { base: pi, op, b };
54     a.par_extend(iter);
55 }
56 
57 /// Unzips the items of a parallel iterator into a pair of arbitrary
58 /// `ParallelExtend` containers.
59 ///
60 /// This is called by `ParallelIterator::unzip`.
unzip<I, A, B, FromA, FromB>(pi: I) -> (FromA, FromB) where I: ParallelIterator<Item = (A, B)>, FromA: Default + Send + ParallelExtend<A>, FromB: Default + Send + ParallelExtend<B>, A: Send, B: Send,61 pub(super) fn unzip<I, A, B, FromA, FromB>(pi: I) -> (FromA, FromB)
62 where
63     I: ParallelIterator<Item = (A, B)>,
64     FromA: Default + Send + ParallelExtend<A>,
65     FromB: Default + Send + ParallelExtend<B>,
66     A: Send,
67     B: Send,
68 {
69     execute(pi, Unzip)
70 }
71 
72 /// Unzips an `IndexedParallelIterator` into two arbitrary `Consumer`s.
73 ///
74 /// This is called by `super::collect::unzip_into_vecs`.
unzip_indexed<I, A, B, CA, CB>(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result) where I: IndexedParallelIterator<Item = (A, B)>, CA: Consumer<A>, CB: Consumer<B>, A: Send, B: Send,75 pub(super) fn unzip_indexed<I, A, B, CA, CB>(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result)
76 where
77     I: IndexedParallelIterator<Item = (A, B)>,
78     CA: Consumer<A>,
79     CB: Consumer<B>,
80     A: Send,
81     B: Send,
82 {
83     let consumer = UnzipConsumer {
84         op: &Unzip,
85         left,
86         right,
87     };
88     pi.drive(consumer)
89 }
90 
91 /// An `UnzipOp` that splits a tuple directly into the two consumers.
92 struct Unzip;
93 
94 impl<A: Send, B: Send> UnzipOp<(A, B)> for Unzip {
95     type Left = A;
96     type Right = B;
97 
consume<FA, FB>(&self, item: (A, B), left: FA, right: FB) -> (FA, FB) where FA: Folder<A>, FB: Folder<B>,98     fn consume<FA, FB>(&self, item: (A, B), left: FA, right: FB) -> (FA, FB)
99     where
100         FA: Folder<A>,
101         FB: Folder<B>,
102     {
103         (left.consume(item.0), right.consume(item.1))
104     }
105 
indexable() -> bool106     fn indexable() -> bool {
107         true
108     }
109 }
110 
111 /// Partitions the items of a parallel iterator into a pair of arbitrary
112 /// `ParallelExtend` containers.
113 ///
114 /// This is called by `ParallelIterator::partition`.
partition<I, A, B, P>(pi: I, predicate: P) -> (A, B) where I: ParallelIterator, A: Default + Send + ParallelExtend<I::Item>, B: Default + Send + ParallelExtend<I::Item>, P: Fn(&I::Item) -> bool + Sync + Send,115 pub(super) fn partition<I, A, B, P>(pi: I, predicate: P) -> (A, B)
116 where
117     I: ParallelIterator,
118     A: Default + Send + ParallelExtend<I::Item>,
119     B: Default + Send + ParallelExtend<I::Item>,
120     P: Fn(&I::Item) -> bool + Sync + Send,
121 {
122     execute(pi, Partition { predicate })
123 }
124 
125 /// An `UnzipOp` that routes items depending on a predicate function.
126 struct Partition<P> {
127     predicate: P,
128 }
129 
130 impl<P, T> UnzipOp<T> for Partition<P>
131 where
132     P: Fn(&T) -> bool + Sync + Send,
133     T: Send,
134 {
135     type Left = T;
136     type Right = T;
137 
consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB) where FA: Folder<T>, FB: Folder<T>,138     fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
139     where
140         FA: Folder<T>,
141         FB: Folder<T>,
142     {
143         if (self.predicate)(&item) {
144             (left.consume(item), right)
145         } else {
146             (left, right.consume(item))
147         }
148     }
149 }
150 
151 /// Partitions and maps the items of a parallel iterator into a pair of
152 /// arbitrary `ParallelExtend` containers.
153 ///
154 /// This called by `ParallelIterator::partition_map`.
partition_map<I, A, B, P, L, R>(pi: I, predicate: P) -> (A, B) where I: ParallelIterator, A: Default + Send + ParallelExtend<L>, B: Default + Send + ParallelExtend<R>, P: Fn(I::Item) -> Either<L, R> + Sync + Send, L: Send, R: Send,155 pub(super) fn partition_map<I, A, B, P, L, R>(pi: I, predicate: P) -> (A, B)
156 where
157     I: ParallelIterator,
158     A: Default + Send + ParallelExtend<L>,
159     B: Default + Send + ParallelExtend<R>,
160     P: Fn(I::Item) -> Either<L, R> + Sync + Send,
161     L: Send,
162     R: Send,
163 {
164     execute(pi, PartitionMap { predicate })
165 }
166 
167 /// An `UnzipOp` that routes items depending on how they are mapped `Either`.
168 struct PartitionMap<P> {
169     predicate: P,
170 }
171 
172 impl<P, L, R, T> UnzipOp<T> for PartitionMap<P>
173 where
174     P: Fn(T) -> Either<L, R> + Sync + Send,
175     L: Send,
176     R: Send,
177 {
178     type Left = L;
179     type Right = R;
180 
consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB) where FA: Folder<L>, FB: Folder<R>,181     fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
182     where
183         FA: Folder<L>,
184         FB: Folder<R>,
185     {
186         match (self.predicate)(item) {
187             Either::Left(item) => (left.consume(item), right),
188             Either::Right(item) => (left, right.consume(item)),
189         }
190     }
191 }
192 
193 /// A fake iterator to intercept the `Consumer` for type `A`.
194 struct UnzipA<'b, I, OP, FromB> {
195     base: I,
196     op: OP,
197     b: &'b mut FromB,
198 }
199 
200 impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB>
201 where
202     I: ParallelIterator,
203     OP: UnzipOp<I::Item>,
204     FromB: Send + ParallelExtend<OP::Right>,
205 {
206     type Item = OP::Left;
207 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,208     fn drive_unindexed<C>(self, consumer: C) -> C::Result
209     where
210         C: UnindexedConsumer<Self::Item>,
211     {
212         let mut result = None;
213         {
214             // Now it's time to find the consumer for type `B`
215             let iter = UnzipB {
216                 base: self.base,
217                 op: self.op,
218                 left_consumer: consumer,
219                 left_result: &mut result,
220             };
221             self.b.par_extend(iter);
222         }
223         // NB: If for some reason `b.par_extend` doesn't actually drive the
224         // iterator, then we won't have a result for the left side to return
225         // at all.  We can't fake an arbitrary consumer's result, so panic.
226         result.expect("unzip consumers didn't execute!")
227     }
228 
opt_len(&self) -> Option<usize>229     fn opt_len(&self) -> Option<usize> {
230         if OP::indexable() {
231             self.base.opt_len()
232         } else {
233             None
234         }
235     }
236 }
237 
238 /// A fake iterator to intercept the `Consumer` for type `B`.
239 struct UnzipB<'r, I, OP, CA>
240 where
241     I: ParallelIterator,
242     OP: UnzipOp<I::Item>,
243     CA: UnindexedConsumer<OP::Left>,
244     CA::Result: 'r,
245 {
246     base: I,
247     op: OP,
248     left_consumer: CA,
249     left_result: &'r mut Option<CA::Result>,
250 }
251 
252 impl<'r, I, OP, CA> ParallelIterator for UnzipB<'r, I, OP, CA>
253 where
254     I: ParallelIterator,
255     OP: UnzipOp<I::Item>,
256     CA: UnindexedConsumer<OP::Left>,
257 {
258     type Item = OP::Right;
259 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,260     fn drive_unindexed<C>(self, consumer: C) -> C::Result
261     where
262         C: UnindexedConsumer<Self::Item>,
263     {
264         // Now that we have two consumers, we can unzip the real iterator.
265         let consumer = UnzipConsumer {
266             op: &self.op,
267             left: self.left_consumer,
268             right: consumer,
269         };
270 
271         let result = self.base.drive_unindexed(consumer);
272         *self.left_result = Some(result.0);
273         result.1
274     }
275 
opt_len(&self) -> Option<usize>276     fn opt_len(&self) -> Option<usize> {
277         if OP::indexable() {
278             self.base.opt_len()
279         } else {
280             None
281         }
282     }
283 }
284 
285 /// `Consumer` that unzips into two other `Consumer`s
286 struct UnzipConsumer<'a, OP, CA, CB> {
287     op: &'a OP,
288     left: CA,
289     right: CB,
290 }
291 
292 impl<'a, T, OP, CA, CB> Consumer<T> for UnzipConsumer<'a, OP, CA, CB>
293 where
294     OP: UnzipOp<T>,
295     CA: Consumer<OP::Left>,
296     CB: Consumer<OP::Right>,
297 {
298     type Folder = UnzipFolder<'a, OP, CA::Folder, CB::Folder>;
299     type Reducer = UnzipReducer<CA::Reducer, CB::Reducer>;
300     type Result = (CA::Result, CB::Result);
301 
split_at(self, index: usize) -> (Self, Self, Self::Reducer)302     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
303         let (left1, left2, left_reducer) = self.left.split_at(index);
304         let (right1, right2, right_reducer) = self.right.split_at(index);
305 
306         (
307             UnzipConsumer {
308                 op: self.op,
309                 left: left1,
310                 right: right1,
311             },
312             UnzipConsumer {
313                 op: self.op,
314                 left: left2,
315                 right: right2,
316             },
317             UnzipReducer {
318                 left: left_reducer,
319                 right: right_reducer,
320             },
321         )
322     }
323 
into_folder(self) -> Self::Folder324     fn into_folder(self) -> Self::Folder {
325         UnzipFolder {
326             op: self.op,
327             left: self.left.into_folder(),
328             right: self.right.into_folder(),
329         }
330     }
331 
full(&self) -> bool332     fn full(&self) -> bool {
333         // don't stop until everyone is full
334         self.left.full() && self.right.full()
335     }
336 }
337 
338 impl<'a, T, OP, CA, CB> UnindexedConsumer<T> for UnzipConsumer<'a, OP, CA, CB>
339 where
340     OP: UnzipOp<T>,
341     CA: UnindexedConsumer<OP::Left>,
342     CB: UnindexedConsumer<OP::Right>,
343 {
split_off_left(&self) -> Self344     fn split_off_left(&self) -> Self {
345         UnzipConsumer {
346             op: self.op,
347             left: self.left.split_off_left(),
348             right: self.right.split_off_left(),
349         }
350     }
351 
to_reducer(&self) -> Self::Reducer352     fn to_reducer(&self) -> Self::Reducer {
353         UnzipReducer {
354             left: self.left.to_reducer(),
355             right: self.right.to_reducer(),
356         }
357     }
358 }
359 
360 /// `Folder` that unzips into two other `Folder`s
361 struct UnzipFolder<'a, OP, FA, FB> {
362     op: &'a OP,
363     left: FA,
364     right: FB,
365 }
366 
367 impl<'a, T, OP, FA, FB> Folder<T> for UnzipFolder<'a, OP, FA, FB>
368 where
369     OP: UnzipOp<T>,
370     FA: Folder<OP::Left>,
371     FB: Folder<OP::Right>,
372 {
373     type Result = (FA::Result, FB::Result);
374 
consume(self, item: T) -> Self375     fn consume(self, item: T) -> Self {
376         let (left, right) = self.op.consume(item, self.left, self.right);
377         UnzipFolder {
378             op: self.op,
379             left,
380             right,
381         }
382     }
383 
complete(self) -> Self::Result384     fn complete(self) -> Self::Result {
385         (self.left.complete(), self.right.complete())
386     }
387 
full(&self) -> bool388     fn full(&self) -> bool {
389         // don't stop until everyone is full
390         self.left.full() && self.right.full()
391     }
392 }
393 
394 /// `Reducer` that unzips into two other `Reducer`s
395 struct UnzipReducer<RA, RB> {
396     left: RA,
397     right: RB,
398 }
399 
400 impl<A, B, RA, RB> Reducer<(A, B)> for UnzipReducer<RA, RB>
401 where
402     RA: Reducer<A>,
403     RB: Reducer<B>,
404 {
reduce(self, left: (A, B), right: (A, B)) -> (A, B)405     fn reduce(self, left: (A, B), right: (A, B)) -> (A, B) {
406         (
407             self.left.reduce(left.0, right.0),
408             self.right.reduce(left.1, right.1),
409         )
410     }
411 }
412 
413 impl<A, B, FromA, FromB> ParallelExtend<(A, B)> for (FromA, FromB)
414 where
415     A: Send,
416     B: Send,
417     FromA: Send + ParallelExtend<A>,
418     FromB: Send + ParallelExtend<B>,
419 {
par_extend<I>(&mut self, pi: I) where I: IntoParallelIterator<Item = (A, B)>,420     fn par_extend<I>(&mut self, pi: I)
421     where
422         I: IntoParallelIterator<Item = (A, B)>,
423     {
424         execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), Unzip);
425     }
426 }
427 
428 impl<L, R, A, B> ParallelExtend<Either<L, R>> for (A, B)
429 where
430     L: Send,
431     R: Send,
432     A: Send + ParallelExtend<L>,
433     B: Send + ParallelExtend<R>,
434 {
par_extend<I>(&mut self, pi: I) where I: IntoParallelIterator<Item = Either<L, R>>,435     fn par_extend<I>(&mut self, pi: I)
436     where
437         I: IntoParallelIterator<Item = Either<L, R>>,
438     {
439         execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), UnEither);
440     }
441 }
442 
443 /// An `UnzipOp` that routes items depending on their `Either` variant.
444 struct UnEither;
445 
446 impl<L, R> UnzipOp<Either<L, R>> for UnEither
447 where
448     L: Send,
449     R: Send,
450 {
451     type Left = L;
452     type Right = R;
453 
consume<FL, FR>(&self, item: Either<L, R>, left: FL, right: FR) -> (FL, FR) where FL: Folder<L>, FR: Folder<R>,454     fn consume<FL, FR>(&self, item: Either<L, R>, left: FL, right: FR) -> (FL, FR)
455     where
456         FL: Folder<L>,
457         FR: Folder<R>,
458     {
459         match item {
460             Either::Left(item) => (left.consume(item), right),
461             Either::Right(item) => (left, right.consume(item)),
462         }
463     }
464 }
465 
466 impl<A, B, FromA, FromB> FromParallelIterator<(A, B)> for (FromA, FromB)
467 where
468     A: Send,
469     B: Send,
470     FromA: Send + FromParallelIterator<A>,
471     FromB: Send + FromParallelIterator<B>,
472 {
from_par_iter<I>(pi: I) -> Self where I: IntoParallelIterator<Item = (A, B)>,473     fn from_par_iter<I>(pi: I) -> Self
474     where
475         I: IntoParallelIterator<Item = (A, B)>,
476     {
477         let (a, b): (Collector<FromA>, Collector<FromB>) = pi.into_par_iter().unzip();
478         (a.result.unwrap(), b.result.unwrap())
479     }
480 }
481 
482 impl<L, R, A, B> FromParallelIterator<Either<L, R>> for (A, B)
483 where
484     L: Send,
485     R: Send,
486     A: Send + FromParallelIterator<L>,
487     B: Send + FromParallelIterator<R>,
488 {
from_par_iter<I>(pi: I) -> Self where I: IntoParallelIterator<Item = Either<L, R>>,489     fn from_par_iter<I>(pi: I) -> Self
490     where
491         I: IntoParallelIterator<Item = Either<L, R>>,
492     {
493         fn identity<T>(x: T) -> T {
494             x
495         }
496 
497         let (a, b): (Collector<A>, Collector<B>) = pi.into_par_iter().partition_map(identity);
498         (a.result.unwrap(), b.result.unwrap())
499     }
500 }
501 
/// Shim to implement a one-time `ParallelExtend` using `FromParallelIterator`.
///
/// It starts out empty (`result: None`). A single `par_extend` call collects
/// the whole iterator into `FromT` and stores it in `result`.
struct Collector<FromT> {
    result: Option<FromT>,
}

// Written by hand rather than derived: `#[derive(Default)]` would add a
// `FromT: Default` bound, which the collected types need not satisfy.
impl<FromT> Default for Collector<FromT> {
    fn default() -> Self {
        Self { result: None }
    }
}
512 
513 impl<T, FromT> ParallelExtend<T> for Collector<FromT>
514 where
515     T: Send,
516     FromT: Send + FromParallelIterator<T>,
517 {
par_extend<I>(&mut self, pi: I) where I: IntoParallelIterator<Item = T>,518     fn par_extend<I>(&mut self, pi: I)
519     where
520         I: IntoParallelIterator<Item = T>,
521     {
522         debug_assert!(self.result.is_none());
523         self.result = Some(pi.into_par_iter().collect());
524     }
525 }
526