1 use super::plumbing::*;
2 use super::*;
3 use std::sync::atomic::{AtomicBool, Ordering};
4 
find<I, P>(pi: I, find_op: P) -> Option<I::Item> where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync,5 pub(super) fn find<I, P>(pi: I, find_op: P) -> Option<I::Item>
6 where
7     I: ParallelIterator,
8     P: Fn(&I::Item) -> bool + Sync,
9 {
10     let found = AtomicBool::new(false);
11     let consumer = FindConsumer::new(&find_op, &found);
12     pi.drive_unindexed(consumer)
13 }
14 
15 struct FindConsumer<'p, P> {
16     find_op: &'p P,
17     found: &'p AtomicBool,
18 }
19 
20 impl<'p, P> FindConsumer<'p, P> {
new(find_op: &'p P, found: &'p AtomicBool) -> Self21     fn new(find_op: &'p P, found: &'p AtomicBool) -> Self {
22         FindConsumer { find_op, found }
23     }
24 }
25 
26 impl<'p, T, P: 'p> Consumer<T> for FindConsumer<'p, P>
27 where
28     T: Send,
29     P: Fn(&T) -> bool + Sync,
30 {
31     type Folder = FindFolder<'p, T, P>;
32     type Reducer = FindReducer;
33     type Result = Option<T>;
34 
split_at(self, _index: usize) -> (Self, Self, Self::Reducer)35     fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
36         (self.split_off_left(), self, FindReducer)
37     }
38 
into_folder(self) -> Self::Folder39     fn into_folder(self) -> Self::Folder {
40         FindFolder {
41             find_op: self.find_op,
42             found: self.found,
43             item: None,
44         }
45     }
46 
full(&self) -> bool47     fn full(&self) -> bool {
48         self.found.load(Ordering::Relaxed)
49     }
50 }
51 
52 impl<'p, T, P: 'p> UnindexedConsumer<T> for FindConsumer<'p, P>
53 where
54     T: Send,
55     P: Fn(&T) -> bool + Sync,
56 {
split_off_left(&self) -> Self57     fn split_off_left(&self) -> Self {
58         FindConsumer::new(self.find_op, self.found)
59     }
60 
to_reducer(&self) -> Self::Reducer61     fn to_reducer(&self) -> Self::Reducer {
62         FindReducer
63     }
64 }
65 
66 struct FindFolder<'p, T, P> {
67     find_op: &'p P,
68     found: &'p AtomicBool,
69     item: Option<T>,
70 }
71 
72 impl<'p, T, P> Folder<T> for FindFolder<'p, T, P>
73 where
74     P: Fn(&T) -> bool + 'p,
75 {
76     type Result = Option<T>;
77 
consume(mut self, item: T) -> Self78     fn consume(mut self, item: T) -> Self {
79         if (self.find_op)(&item) {
80             self.found.store(true, Ordering::Relaxed);
81             self.item = Some(item);
82         }
83         self
84     }
85 
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,86     fn consume_iter<I>(mut self, iter: I) -> Self
87     where
88         I: IntoIterator<Item = T>,
89     {
90         fn not_full<T>(found: &AtomicBool) -> impl Fn(&T) -> bool + '_ {
91             move |_| !found.load(Ordering::Relaxed)
92         }
93 
94         self.item = iter
95             .into_iter()
96             // stop iterating if another thread has found something
97             .take_while(not_full(&self.found))
98             .find(self.find_op);
99         if self.item.is_some() {
100             self.found.store(true, Ordering::Relaxed)
101         }
102         self
103     }
104 
complete(self) -> Self::Result105     fn complete(self) -> Self::Result {
106         self.item
107     }
108 
full(&self) -> bool109     fn full(&self) -> bool {
110         self.found.load(Ordering::Relaxed)
111     }
112 }
113 
114 struct FindReducer;
115 
116 impl<T> Reducer<Option<T>> for FindReducer {
reduce(self, left: Option<T>, right: Option<T>) -> Option<T>117     fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
118         left.or(right)
119     }
120 }
121