1 use super::plumbing::*; 2 use super::*; 3 4 /// `Flatten` turns each element to a parallel iterator, then flattens these iterators 5 /// together. This struct is created by the [`flatten()`] method on [`ParallelIterator`]. 6 /// 7 /// [`flatten()`]: trait.ParallelIterator.html#method.flatten 8 /// [`ParallelIterator`]: trait.ParallelIterator.html 9 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] 10 #[derive(Debug, Clone)] 11 pub struct Flatten<I: ParallelIterator> { 12 base: I, 13 } 14 15 impl<I> Flatten<I> 16 where 17 I: ParallelIterator, 18 I::Item: IntoParallelIterator, 19 { 20 /// Creates a new `Flatten` iterator. new(base: I) -> Self21 pub(super) fn new(base: I) -> Self { 22 Flatten { base } 23 } 24 } 25 26 impl<I> ParallelIterator for Flatten<I> 27 where 28 I: ParallelIterator, 29 I::Item: IntoParallelIterator, 30 { 31 type Item = <I::Item as IntoParallelIterator>::Item; 32 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,33 fn drive_unindexed<C>(self, consumer: C) -> C::Result 34 where 35 C: UnindexedConsumer<Self::Item>, 36 { 37 let consumer = FlattenConsumer::new(consumer); 38 self.base.drive_unindexed(consumer) 39 } 40 } 41 42 /// //////////////////////////////////////////////////////////////////////// 43 /// Consumer implementation 44 45 struct FlattenConsumer<C> { 46 base: C, 47 } 48 49 impl<C> FlattenConsumer<C> { new(base: C) -> Self50 fn new(base: C) -> Self { 51 FlattenConsumer { base } 52 } 53 } 54 55 impl<T, C> Consumer<T> for FlattenConsumer<C> 56 where 57 C: UnindexedConsumer<T::Item>, 58 T: IntoParallelIterator, 59 { 60 type Folder = FlattenFolder<C, C::Result>; 61 type Reducer = C::Reducer; 62 type Result = C::Result; 63 split_at(self, index: usize) -> (Self, Self, C::Reducer)64 fn split_at(self, index: usize) -> (Self, Self, C::Reducer) { 65 let (left, right, reducer) = self.base.split_at(index); 66 ( 67 FlattenConsumer::new(left), 68 FlattenConsumer::new(right), 69 reducer, 70 ) 71 } 72 into_folder(self) -> Self::Folder73 fn into_folder(self) -> Self::Folder { 74 FlattenFolder { 75 base: self.base, 76 previous: None, 77 } 78 } 79 full(&self) -> bool80 fn full(&self) -> bool { 81 self.base.full() 82 } 83 } 84 85 impl<T, C> UnindexedConsumer<T> for FlattenConsumer<C> 86 where 87 C: UnindexedConsumer<T::Item>, 88 T: IntoParallelIterator, 89 { split_off_left(&self) -> Self90 fn split_off_left(&self) -> Self { 91 FlattenConsumer::new(self.base.split_off_left()) 92 } 93 to_reducer(&self) -> Self::Reducer94 fn to_reducer(&self) -> Self::Reducer { 95 self.base.to_reducer() 96 } 97 } 98 99 struct FlattenFolder<C, R> { 100 base: C, 101 previous: Option<R>, 102 } 103 104 impl<T, C> Folder<T> for FlattenFolder<C, C::Result> 105 where 106 C: UnindexedConsumer<T::Item>, 107 T: IntoParallelIterator, 108 { 109 type Result = C::Result; 110 consume(self, item: T) -> Self111 fn consume(self, item: T) -> Self { 112 let par_iter = item.into_par_iter(); 113 let consumer = self.base.split_off_left(); 114 let result = par_iter.drive_unindexed(consumer); 115 116 let previous = match self.previous { 117 None => Some(result), 118 Some(previous) => { 119 let reducer = self.base.to_reducer(); 120 Some(reducer.reduce(previous, result)) 121 } 122 }; 123 124 FlattenFolder { 125 base: self.base, 126 previous, 127 } 128 } 129 complete(self) -> Self::Result130 fn complete(self) -> Self::Result { 131 match self.previous { 132 Some(previous) => previous, 133 None => self.base.into_folder().complete(), 134 } 135 } 136 full(&self) -> bool137 fn full(&self) -> bool { 138 self.base.full() 139 } 140 } 141