1 use super::plumbing::*; 2 use super::*; 3 4 use std::fmt::{self, Debug}; 5 6 impl<U, I, ID, F> Fold<I, ID, F> 7 where 8 I: ParallelIterator, 9 F: Fn(U, I::Item) -> U + Sync + Send, 10 ID: Fn() -> U + Sync + Send, 11 U: Send, 12 { new(base: I, identity: ID, fold_op: F) -> Self13 pub(super) fn new(base: I, identity: ID, fold_op: F) -> Self { 14 Fold { 15 base, 16 identity, 17 fold_op, 18 } 19 } 20 } 21 22 /// `Fold` is an iterator that applies a function over an iterator producing a single value. 23 /// This struct is created by the [`fold()`] method on [`ParallelIterator`] 24 /// 25 /// [`fold()`]: trait.ParallelIterator.html#method.fold 26 /// [`ParallelIterator`]: trait.ParallelIterator.html 27 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] 28 #[derive(Clone)] 29 pub struct Fold<I, ID, F> { 30 base: I, 31 identity: ID, 32 fold_op: F, 33 } 34 35 impl<I: ParallelIterator + Debug, ID, F> Debug for Fold<I, ID, F> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 37 f.debug_struct("Fold").field("base", &self.base).finish() 38 } 39 } 40 41 impl<U, I, ID, F> ParallelIterator for Fold<I, ID, F> 42 where 43 I: ParallelIterator, 44 F: Fn(U, I::Item) -> U + Sync + Send, 45 ID: Fn() -> U + Sync + Send, 46 U: Send, 47 { 48 type Item = U; 49 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,50 fn drive_unindexed<C>(self, consumer: C) -> C::Result 51 where 52 C: UnindexedConsumer<Self::Item>, 53 { 54 let consumer1 = FoldConsumer { 55 base: consumer, 56 fold_op: &self.fold_op, 57 identity: &self.identity, 58 }; 59 self.base.drive_unindexed(consumer1) 60 } 61 } 62 63 struct FoldConsumer<'c, C, ID: 'c, F: 'c> { 64 base: C, 65 fold_op: &'c F, 66 identity: &'c ID, 67 } 68 69 impl<'r, U, T, C, ID, F> Consumer<T> for FoldConsumer<'r, C, ID, F> 70 where 71 C: Consumer<U>, 72 F: Fn(U, T) -> U + Sync, 73 ID: Fn() -> U + Sync, 74 U: Send, 75 { 76 type Folder = FoldFolder<'r, C::Folder, U, F>; 77 type Reducer = C::Reducer; 78 type Result = C::Result; 79 split_at(self, index: usize) -> (Self, Self, Self::Reducer)80 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { 81 let (left, right, reducer) = self.base.split_at(index); 82 ( 83 FoldConsumer { base: left, ..self }, 84 FoldConsumer { 85 base: right, 86 ..self 87 }, 88 reducer, 89 ) 90 } 91 into_folder(self) -> Self::Folder92 fn into_folder(self) -> Self::Folder { 93 FoldFolder { 94 base: self.base.into_folder(), 95 item: (self.identity)(), 96 fold_op: self.fold_op, 97 } 98 } 99 full(&self) -> bool100 fn full(&self) -> bool { 101 self.base.full() 102 } 103 } 104 105 impl<'r, U, T, C, ID, F> UnindexedConsumer<T> for FoldConsumer<'r, C, ID, F> 106 where 107 C: UnindexedConsumer<U>, 108 F: Fn(U, T) -> U + Sync, 109 ID: Fn() -> U + Sync, 110 U: Send, 111 { split_off_left(&self) -> Self112 fn split_off_left(&self) -> Self { 113 FoldConsumer { 114 base: self.base.split_off_left(), 115 ..*self 116 } 117 } 118 to_reducer(&self) -> Self::Reducer119 fn to_reducer(&self) -> Self::Reducer { 120 self.base.to_reducer() 121 } 122 } 123 124 struct FoldFolder<'r, C, ID, F: 'r> { 125 base: C, 126 fold_op: &'r F, 127 item: ID, 128 } 129 130 impl<'r, C, ID, F, T> Folder<T> for FoldFolder<'r, C, ID, F> 131 where 132 C: Folder<ID>, 133 F: Fn(ID, T) -> ID + Sync, 134 { 135 type Result = C::Result; 136 consume(self, item: T) -> Self137 fn consume(self, item: T) -> Self { 138 let item = (self.fold_op)(self.item, item); 139 FoldFolder { 140 base: self.base, 141 fold_op: self.fold_op, 142 item, 143 } 144 } 145 consume_iter<I>(self, iter: I) -> Self where I: IntoIterator<Item = T>,146 fn consume_iter<I>(self, iter: I) -> Self 147 where 148 I: IntoIterator<Item = T>, 149 { 150 fn not_full<C, ID, T>(base: &C) -> impl Fn(&T) -> bool + '_ 151 where 152 C: Folder<ID>, 153 { 154 move |_| !base.full() 155 } 156 157 let base = self.base; 158 let item = iter 159 .into_iter() 160 // stop iterating if another thread has finished 161 .take_while(not_full(&base)) 162 .fold(self.item, self.fold_op); 163 164 FoldFolder { 165 base, 166 item, 167 fold_op: self.fold_op, 168 } 169 } 170 complete(self) -> C::Result171 fn complete(self) -> C::Result { 172 self.base.consume(self.item).complete() 173 } 174 full(&self) -> bool175 fn full(&self) -> bool { 176 self.base.full() 177 } 178 } 179 180 // /////////////////////////////////////////////////////////////////////////// 181 182 impl<U, I, F> FoldWith<I, U, F> 183 where 184 I: ParallelIterator, 185 F: Fn(U, I::Item) -> U + Sync + Send, 186 U: Send + Clone, 187 { new(base: I, item: U, fold_op: F) -> Self188 pub(super) fn new(base: I, item: U, fold_op: F) -> Self { 189 FoldWith { 190 base, 191 item, 192 fold_op, 193 } 194 } 195 } 196 197 /// `FoldWith` is an iterator that applies a function over an iterator producing a single value. 198 /// This struct is created by the [`fold_with()`] method on [`ParallelIterator`] 199 /// 200 /// [`fold_with()`]: trait.ParallelIterator.html#method.fold_with 201 /// [`ParallelIterator`]: trait.ParallelIterator.html 202 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] 203 #[derive(Clone)] 204 pub struct FoldWith<I, U, F> { 205 base: I, 206 item: U, 207 fold_op: F, 208 } 209 210 impl<I: ParallelIterator + Debug, U: Debug, F> Debug for FoldWith<I, U, F> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 212 f.debug_struct("FoldWith") 213 .field("base", &self.base) 214 .field("item", &self.item) 215 .finish() 216 } 217 } 218 219 impl<U, I, F> ParallelIterator for FoldWith<I, U, F> 220 where 221 I: ParallelIterator, 222 F: Fn(U, I::Item) -> U + Sync + Send, 223 U: Send + Clone, 224 { 225 type Item = U; 226 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,227 fn drive_unindexed<C>(self, consumer: C) -> C::Result 228 where 229 C: UnindexedConsumer<Self::Item>, 230 { 231 let consumer1 = FoldWithConsumer { 232 base: consumer, 233 item: self.item, 234 fold_op: &self.fold_op, 235 }; 236 self.base.drive_unindexed(consumer1) 237 } 238 } 239 240 struct FoldWithConsumer<'c, C, U, F: 'c> { 241 base: C, 242 item: U, 243 fold_op: &'c F, 244 } 245 246 impl<'r, U, T, C, F> Consumer<T> for FoldWithConsumer<'r, C, U, F> 247 where 248 C: Consumer<U>, 249 F: Fn(U, T) -> U + Sync, 250 U: Send + Clone, 251 { 252 type Folder = FoldFolder<'r, C::Folder, U, F>; 253 type Reducer = C::Reducer; 254 type Result = C::Result; 255 split_at(self, index: usize) -> (Self, Self, Self::Reducer)256 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { 257 let (left, right, reducer) = self.base.split_at(index); 258 ( 259 FoldWithConsumer { 260 base: left, 261 item: self.item.clone(), 262 ..self 263 }, 264 FoldWithConsumer { 265 base: right, 266 ..self 267 }, 268 reducer, 269 ) 270 } 271 into_folder(self) -> Self::Folder272 fn into_folder(self) -> Self::Folder { 273 FoldFolder { 274 base: self.base.into_folder(), 275 item: self.item, 276 fold_op: self.fold_op, 277 } 278 } 279 full(&self) -> bool280 fn full(&self) -> bool { 281 self.base.full() 282 } 283 } 284 285 impl<'r, U, T, C, F> UnindexedConsumer<T> for FoldWithConsumer<'r, C, U, F> 286 where 287 C: UnindexedConsumer<U>, 288 F: Fn(U, T) -> U + Sync, 289 U: Send + Clone, 290 { split_off_left(&self) -> Self291 fn split_off_left(&self) -> Self { 292 FoldWithConsumer { 293 base: self.base.split_off_left(), 294 item: self.item.clone(), 295 ..*self 296 } 297 } 298 to_reducer(&self) -> Self::Reducer299 fn to_reducer(&self) -> Self::Reducer { 300 self.base.to_reducer() 301 } 302 } 303