1 use super::plumbing::*;
2 use super::*;
3 use std::sync::atomic::{AtomicBool, Ordering};
4 
5 /// `WhileSome` is an iterator that yields the `Some` elements of an iterator,
6 /// halting as soon as any `None` is produced.
7 ///
8 /// This struct is created by the [`while_some()`] method on [`ParallelIterator`]
9 ///
10 /// [`while_some()`]: trait.ParallelIterator.html#method.while_some
11 /// [`ParallelIterator`]: trait.ParallelIterator.html
12 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
13 #[derive(Debug, Clone)]
14 pub struct WhileSome<I: ParallelIterator> {
15     base: I,
16 }
17 
18 impl<I> WhileSome<I>
19 where
20     I: ParallelIterator,
21 {
22     /// Create a new `WhileSome` iterator.
new(base: I) -> Self23     pub(super) fn new(base: I) -> Self {
24         WhileSome { base }
25     }
26 }
27 
28 impl<I, T> ParallelIterator for WhileSome<I>
29 where
30     I: ParallelIterator<Item = Option<T>>,
31     T: Send,
32 {
33     type Item = T;
34 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,35     fn drive_unindexed<C>(self, consumer: C) -> C::Result
36     where
37         C: UnindexedConsumer<Self::Item>,
38     {
39         let full = AtomicBool::new(false);
40         let consumer1 = WhileSomeConsumer {
41             base: consumer,
42             full: &full,
43         };
44         self.base.drive_unindexed(consumer1)
45     }
46 }
47 
48 /// ////////////////////////////////////////////////////////////////////////
49 /// Consumer implementation
50 
51 struct WhileSomeConsumer<'f, C> {
52     base: C,
53     full: &'f AtomicBool,
54 }
55 
56 impl<'f, T, C> Consumer<Option<T>> for WhileSomeConsumer<'f, C>
57 where
58     C: Consumer<T>,
59     T: Send,
60 {
61     type Folder = WhileSomeFolder<'f, C::Folder>;
62     type Reducer = C::Reducer;
63     type Result = C::Result;
64 
split_at(self, index: usize) -> (Self, Self, Self::Reducer)65     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
66         let (left, right, reducer) = self.base.split_at(index);
67         (
68             WhileSomeConsumer { base: left, ..self },
69             WhileSomeConsumer {
70                 base: right,
71                 ..self
72             },
73             reducer,
74         )
75     }
76 
into_folder(self) -> Self::Folder77     fn into_folder(self) -> Self::Folder {
78         WhileSomeFolder {
79             base: self.base.into_folder(),
80             full: self.full,
81         }
82     }
83 
full(&self) -> bool84     fn full(&self) -> bool {
85         self.full.load(Ordering::Relaxed) || self.base.full()
86     }
87 }
88 
89 impl<'f, T, C> UnindexedConsumer<Option<T>> for WhileSomeConsumer<'f, C>
90 where
91     C: UnindexedConsumer<T>,
92     T: Send,
93 {
split_off_left(&self) -> Self94     fn split_off_left(&self) -> Self {
95         WhileSomeConsumer {
96             base: self.base.split_off_left(),
97             ..*self
98         }
99     }
100 
to_reducer(&self) -> Self::Reducer101     fn to_reducer(&self) -> Self::Reducer {
102         self.base.to_reducer()
103     }
104 }
105 
106 struct WhileSomeFolder<'f, C> {
107     base: C,
108     full: &'f AtomicBool,
109 }
110 
111 impl<'f, T, C> Folder<Option<T>> for WhileSomeFolder<'f, C>
112 where
113     C: Folder<T>,
114 {
115     type Result = C::Result;
116 
consume(mut self, item: Option<T>) -> Self117     fn consume(mut self, item: Option<T>) -> Self {
118         match item {
119             Some(item) => self.base = self.base.consume(item),
120             None => self.full.store(true, Ordering::Relaxed),
121         }
122         self
123     }
124 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = Option<T>>,125     fn consume_iter<I>(mut self, iter: I) -> Self
126     where
127         I: IntoIterator<Item = Option<T>>,
128     {
129         fn some<T>(full: &AtomicBool) -> impl Fn(&Option<T>) -> bool + '_ {
130             move |x| match *x {
131                 Some(_) => !full.load(Ordering::Relaxed),
132                 None => {
133                     full.store(true, Ordering::Relaxed);
134                     false
135                 }
136             }
137         }
138 
139         self.base = self.base.consume_iter(
140             iter.into_iter()
141                 .take_while(some(self.full))
142                 .map(Option::unwrap),
143         );
144         self
145     }
146 
complete(self) -> C::Result147     fn complete(self) -> C::Result {
148         self.base.complete()
149     }
150 
full(&self) -> bool151     fn full(&self) -> bool {
152         self.full.load(Ordering::Relaxed) || self.base.full()
153     }
154 }
155