1 use super::plumbing::*;
2 use super::ParallelIterator;
3
reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T where PI: ParallelIterator<Item = T>, R: Fn(T, T) -> T + Sync, ID: Fn() -> T + Sync, T: Send,4 pub(super) fn reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T
5 where
6 PI: ParallelIterator<Item = T>,
7 R: Fn(T, T) -> T + Sync,
8 ID: Fn() -> T + Sync,
9 T: Send,
10 {
11 let consumer = ReduceConsumer {
12 identity: &identity,
13 reduce_op: &reduce_op,
14 };
15 pi.drive_unindexed(consumer)
16 }
17
/// Consumer used by `reduce`: a pair of shared references to the caller's
/// identity function and reduction operator.
///
/// It stores only references, which is what allows the manual `Copy`/`Clone`
/// implementations without requiring `R` or `ID` to be copyable themselves.
struct ReduceConsumer<'r, R, ID> {
    /// Produces the starting value for a fresh folder.
    identity: &'r ID,
    /// Combines two values of the item type into one.
    reduce_op: &'r R,
}
22
23 impl<'r, R, ID> Copy for ReduceConsumer<'r, R, ID> {}
24
25 impl<'r, R, ID> Clone for ReduceConsumer<'r, R, ID> {
clone(&self) -> Self26 fn clone(&self) -> Self {
27 *self
28 }
29 }
30
31 impl<'r, R, ID, T> Consumer<T> for ReduceConsumer<'r, R, ID>
32 where
33 R: Fn(T, T) -> T + Sync,
34 ID: Fn() -> T + Sync,
35 T: Send,
36 {
37 type Folder = ReduceFolder<'r, R, T>;
38 type Reducer = Self;
39 type Result = T;
40
split_at(self, _index: usize) -> (Self, Self, Self)41 fn split_at(self, _index: usize) -> (Self, Self, Self) {
42 (self, self, self)
43 }
44
into_folder(self) -> Self::Folder45 fn into_folder(self) -> Self::Folder {
46 ReduceFolder {
47 reduce_op: self.reduce_op,
48 item: (self.identity)(),
49 }
50 }
51
full(&self) -> bool52 fn full(&self) -> bool {
53 false
54 }
55 }
56
57 impl<'r, R, ID, T> UnindexedConsumer<T> for ReduceConsumer<'r, R, ID>
58 where
59 R: Fn(T, T) -> T + Sync,
60 ID: Fn() -> T + Sync,
61 T: Send,
62 {
split_off_left(&self) -> Self63 fn split_off_left(&self) -> Self {
64 *self
65 }
66
to_reducer(&self) -> Self::Reducer67 fn to_reducer(&self) -> Self::Reducer {
68 *self
69 }
70 }
71
72 impl<'r, R, ID, T> Reducer<T> for ReduceConsumer<'r, R, ID>
73 where
74 R: Fn(T, T) -> T + Sync,
75 {
reduce(self, left: T, right: T) -> T76 fn reduce(self, left: T, right: T) -> T {
77 (self.reduce_op)(left, right)
78 }
79 }
80
/// Folder used by `ReduceConsumer`: a running value plus a reference to the
/// operator that merges each new item into it.
struct ReduceFolder<'r, R, T> {
    /// Operator applied as `reduce_op(accumulated, next_item)`.
    reduce_op: &'r R,
    /// The value accumulated so far.
    item: T,
}
85
86 impl<'r, R, T> Folder<T> for ReduceFolder<'r, R, T>
87 where
88 R: Fn(T, T) -> T,
89 {
90 type Result = T;
91
consume(self, item: T) -> Self92 fn consume(self, item: T) -> Self {
93 ReduceFolder {
94 reduce_op: self.reduce_op,
95 item: (self.reduce_op)(self.item, item),
96 }
97 }
98
consume_iter<I>(self, iter: I) -> Self where I: IntoIterator<Item = T>,99 fn consume_iter<I>(self, iter: I) -> Self
100 where
101 I: IntoIterator<Item = T>,
102 {
103 ReduceFolder {
104 reduce_op: self.reduce_op,
105 item: iter.into_iter().fold(self.item, self.reduce_op),
106 }
107 }
108
complete(self) -> T109 fn complete(self) -> T {
110 self.item
111 }
112
full(&self) -> bool113 fn full(&self) -> bool {
114 false
115 }
116 }
117