use super::plumbing::*; use super::*; use std::sync::atomic::{AtomicBool, Ordering}; pub(super) fn find(pi: I, find_op: P) -> Option where I: ParallelIterator, P: Fn(&I::Item) -> bool + Sync, { let found = AtomicBool::new(false); let consumer = FindConsumer::new(&find_op, &found); pi.drive_unindexed(consumer) } struct FindConsumer<'p, P> { find_op: &'p P, found: &'p AtomicBool, } impl<'p, P> FindConsumer<'p, P> { fn new(find_op: &'p P, found: &'p AtomicBool) -> Self { FindConsumer { find_op, found } } } impl<'p, T, P: 'p> Consumer for FindConsumer<'p, P> where T: Send, P: Fn(&T) -> bool + Sync, { type Folder = FindFolder<'p, T, P>; type Reducer = FindReducer; type Result = Option; fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { (self.split_off_left(), self, FindReducer) } fn into_folder(self) -> Self::Folder { FindFolder { find_op: self.find_op, found: self.found, item: None, } } fn full(&self) -> bool { self.found.load(Ordering::Relaxed) } } impl<'p, T, P: 'p> UnindexedConsumer for FindConsumer<'p, P> where T: Send, P: Fn(&T) -> bool + Sync, { fn split_off_left(&self) -> Self { FindConsumer::new(self.find_op, self.found) } fn to_reducer(&self) -> Self::Reducer { FindReducer } } struct FindFolder<'p, T, P> { find_op: &'p P, found: &'p AtomicBool, item: Option, } impl<'p, T, P> Folder for FindFolder<'p, T, P> where P: Fn(&T) -> bool + 'p, { type Result = Option; fn consume(mut self, item: T) -> Self { if (self.find_op)(&item) { self.found.store(true, Ordering::Relaxed); self.item = Some(item); } self } fn consume_iter(mut self, iter: I) -> Self where I: IntoIterator, { fn not_full(found: &AtomicBool) -> impl Fn(&T) -> bool + '_ { move |_| !found.load(Ordering::Relaxed) } self.item = iter .into_iter() // stop iterating if another thread has found something .take_while(not_full(&self.found)) .find(self.find_op); if self.item.is_some() { self.found.store(true, Ordering::Relaxed) } self } fn complete(self) -> Self::Result { self.item } fn full(&self) -> bool { self.found.load(Ordering::Relaxed) } } struct FindReducer; impl Reducer> for FindReducer { fn reduce(self, left: Option, right: Option) -> Option { left.or(right) } }