1 use super::plumbing::*;
2 use super::*;
3 
4 use std::fmt::{self, Debug};
5 
6 /// `Filter` takes a predicate `filter_op` and filters out elements that match.
7 /// This struct is created by the [`filter()`] method on [`ParallelIterator`]
8 ///
9 /// [`filter()`]: trait.ParallelIterator.html#method.filter
10 /// [`ParallelIterator`]: trait.ParallelIterator.html
11 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
12 #[derive(Clone)]
13 pub struct Filter<I: ParallelIterator, P> {
14     base: I,
15     filter_op: P,
16 }
17 
18 impl<I: ParallelIterator + Debug, P> Debug for Filter<I, P> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result19     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20         f.debug_struct("Filter").field("base", &self.base).finish()
21     }
22 }
23 
24 impl<I, P> Filter<I, P>
25 where
26     I: ParallelIterator,
27 {
28     /// Creates a new `Filter` iterator.
new(base: I, filter_op: P) -> Self29     pub(super) fn new(base: I, filter_op: P) -> Self {
30         Filter { base, filter_op }
31     }
32 }
33 
34 impl<I, P> ParallelIterator for Filter<I, P>
35 where
36     I: ParallelIterator,
37     P: Fn(&I::Item) -> bool + Sync + Send,
38 {
39     type Item = I::Item;
40 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,41     fn drive_unindexed<C>(self, consumer: C) -> C::Result
42     where
43         C: UnindexedConsumer<Self::Item>,
44     {
45         let consumer1 = FilterConsumer::new(consumer, &self.filter_op);
46         self.base.drive_unindexed(consumer1)
47     }
48 }
49 
50 /// ////////////////////////////////////////////////////////////////////////
51 /// Consumer implementation
52 
53 struct FilterConsumer<'p, C, P> {
54     base: C,
55     filter_op: &'p P,
56 }
57 
58 impl<'p, C, P> FilterConsumer<'p, C, P> {
new(base: C, filter_op: &'p P) -> Self59     fn new(base: C, filter_op: &'p P) -> Self {
60         FilterConsumer { base, filter_op }
61     }
62 }
63 
64 impl<'p, T, C, P: 'p> Consumer<T> for FilterConsumer<'p, C, P>
65 where
66     C: Consumer<T>,
67     P: Fn(&T) -> bool + Sync,
68 {
69     type Folder = FilterFolder<'p, C::Folder, P>;
70     type Reducer = C::Reducer;
71     type Result = C::Result;
72 
split_at(self, index: usize) -> (Self, Self, C::Reducer)73     fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
74         let (left, right, reducer) = self.base.split_at(index);
75         (
76             FilterConsumer::new(left, self.filter_op),
77             FilterConsumer::new(right, self.filter_op),
78             reducer,
79         )
80     }
81 
into_folder(self) -> Self::Folder82     fn into_folder(self) -> Self::Folder {
83         FilterFolder {
84             base: self.base.into_folder(),
85             filter_op: self.filter_op,
86         }
87     }
88 
full(&self) -> bool89     fn full(&self) -> bool {
90         self.base.full()
91     }
92 }
93 
94 impl<'p, T, C, P: 'p> UnindexedConsumer<T> for FilterConsumer<'p, C, P>
95 where
96     C: UnindexedConsumer<T>,
97     P: Fn(&T) -> bool + Sync,
98 {
split_off_left(&self) -> Self99     fn split_off_left(&self) -> Self {
100         FilterConsumer::new(self.base.split_off_left(), &self.filter_op)
101     }
102 
to_reducer(&self) -> Self::Reducer103     fn to_reducer(&self) -> Self::Reducer {
104         self.base.to_reducer()
105     }
106 }
107 
108 struct FilterFolder<'p, C, P> {
109     base: C,
110     filter_op: &'p P,
111 }
112 
113 impl<'p, C, P, T> Folder<T> for FilterFolder<'p, C, P>
114 where
115     C: Folder<T>,
116     P: Fn(&T) -> bool + 'p,
117 {
118     type Result = C::Result;
119 
consume(self, item: T) -> Self120     fn consume(self, item: T) -> Self {
121         let filter_op = self.filter_op;
122         if filter_op(&item) {
123             let base = self.base.consume(item);
124             FilterFolder { base, filter_op }
125         } else {
126             self
127         }
128     }
129 
130     // This cannot easily specialize `consume_iter` to be better than
131     // the default, because that requires checking `self.base.full()`
132     // during a call to `self.base.consume_iter()`. (#632)
133 
complete(self) -> Self::Result134     fn complete(self) -> Self::Result {
135         self.base.complete()
136     }
137 
full(&self) -> bool138     fn full(&self) -> bool {
139         self.base.full()
140     }
141 }
142