1 use super::plumbing::*; 2 use super::*; 3 use std::sync::atomic::{AtomicBool, Ordering}; 4 5 /// `WhileSome` is an iterator that yields the `Some` elements of an iterator, 6 /// halting as soon as any `None` is produced. 7 /// 8 /// This struct is created by the [`while_some()`] method on [`ParallelIterator`] 9 /// 10 /// [`while_some()`]: trait.ParallelIterator.html#method.while_some 11 /// [`ParallelIterator`]: trait.ParallelIterator.html 12 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] 13 #[derive(Debug, Clone)] 14 pub struct WhileSome<I: ParallelIterator> { 15 base: I, 16 } 17 18 impl<I> WhileSome<I> 19 where 20 I: ParallelIterator, 21 { 22 /// Creates a new `WhileSome` iterator. new(base: I) -> Self23 pub(super) fn new(base: I) -> Self { 24 WhileSome { base } 25 } 26 } 27 28 impl<I, T> ParallelIterator for WhileSome<I> 29 where 30 I: ParallelIterator<Item = Option<T>>, 31 T: Send, 32 { 33 type Item = T; 34 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,35 fn drive_unindexed<C>(self, consumer: C) -> C::Result 36 where 37 C: UnindexedConsumer<Self::Item>, 38 { 39 let full = AtomicBool::new(false); 40 let consumer1 = WhileSomeConsumer { 41 base: consumer, 42 full: &full, 43 }; 44 self.base.drive_unindexed(consumer1) 45 } 46 } 47 48 /// //////////////////////////////////////////////////////////////////////// 49 /// Consumer implementation 50 51 struct WhileSomeConsumer<'f, C> { 52 base: C, 53 full: &'f AtomicBool, 54 } 55 56 impl<'f, T, C> Consumer<Option<T>> for WhileSomeConsumer<'f, C> 57 where 58 C: Consumer<T>, 59 T: Send, 60 { 61 type Folder = WhileSomeFolder<'f, C::Folder>; 62 type Reducer = C::Reducer; 63 type Result = C::Result; 64 split_at(self, index: usize) -> (Self, Self, Self::Reducer)65 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { 66 let (left, right, reducer) = self.base.split_at(index); 67 ( 68 WhileSomeConsumer { base: left, ..self }, 69 WhileSomeConsumer { 70 base: right, 71 ..self 72 }, 73 reducer, 74 ) 75 } 76 into_folder(self) -> Self::Folder77 fn into_folder(self) -> Self::Folder { 78 WhileSomeFolder { 79 base: self.base.into_folder(), 80 full: self.full, 81 } 82 } 83 full(&self) -> bool84 fn full(&self) -> bool { 85 self.full.load(Ordering::Relaxed) || self.base.full() 86 } 87 } 88 89 impl<'f, T, C> UnindexedConsumer<Option<T>> for WhileSomeConsumer<'f, C> 90 where 91 C: UnindexedConsumer<T>, 92 T: Send, 93 { split_off_left(&self) -> Self94 fn split_off_left(&self) -> Self { 95 WhileSomeConsumer { 96 base: self.base.split_off_left(), 97 ..*self 98 } 99 } 100 to_reducer(&self) -> Self::Reducer101 fn to_reducer(&self) -> Self::Reducer { 102 self.base.to_reducer() 103 } 104 } 105 106 struct WhileSomeFolder<'f, C> { 107 base: C, 108 full: &'f AtomicBool, 109 } 110 111 impl<'f, T, C> Folder<Option<T>> for WhileSomeFolder<'f, C> 112 where 113 C: Folder<T>, 114 { 115 type Result = C::Result; 116 consume(mut self, item: Option<T>) -> Self117 fn consume(mut self, item: Option<T>) -> Self { 118 match item { 119 Some(item) => self.base = self.base.consume(item), 120 None => self.full.store(true, Ordering::Relaxed), 121 } 122 self 123 } 124 consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = Option<T>>,125 fn consume_iter<I>(mut self, iter: I) -> Self 126 where 127 I: IntoIterator<Item = Option<T>>, 128 { 129 fn some<T>(full: &AtomicBool) -> impl Fn(&Option<T>) -> bool + '_ { 130 move |x| match *x { 131 Some(_) => !full.load(Ordering::Relaxed), 132 None => { 133 full.store(true, Ordering::Relaxed); 134 false 135 } 136 } 137 } 138 139 self.base = self.base.consume_iter( 140 iter.into_iter() 141 .take_while(some(self.full)) 142 .map(Option::unwrap), 143 ); 144 self 145 } 146 complete(self) -> C::Result147 fn complete(self) -> C::Result { 148 self.base.complete() 149 } 150 full(&self) -> bool151 fn full(&self) -> bool { 152 self.full.load(Ordering::Relaxed) || self.base.full() 153 } 154 } 155