1 //! Traits and functions used to implement parallel iteration.  These are
2 //! low-level details -- users of parallel iterators should not need to
3 //! interact with them directly.  See [the `plumbing` README][r] for a general overview.
4 //!
5 //! [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
6 
7 use crate::join_context;
8 
9 use super::IndexedParallelIterator;
10 
11 use std::cmp;
12 use std::usize;
13 
14 /// The `ProducerCallback` trait is a kind of generic closure,
15 /// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in
16 /// the plumbing README][r] for more details.
17 ///
18 /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback
19 /// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html
20 pub trait ProducerCallback<T> {
21     /// The type of value returned by this callback. Analogous to
22     /// [`Output` from the `FnOnce` trait][Output].
23     ///
24     /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output
25     type Output;
26 
27     /// Invokes the callback with the given producer as argument. The
28     /// key point of this trait is that this method is generic over
29     /// `P`, and hence implementors must be defined for any producer.
callback<P>(self, producer: P) -> Self::Output where P: Producer<Item = T>30     fn callback<P>(self, producer: P) -> Self::Output
31     where
32         P: Producer<Item = T>;
33 }
34 
35 /// A `Producer` is effectively a "splittable `IntoIterator`". That
36 /// is, a producer is a value which can be converted into an iterator
37 /// at any time: at that point, it simply produces items on demand,
38 /// like any iterator. But what makes a `Producer` special is that,
39 /// *before* we convert to an iterator, we can also **split** it at a
40 /// particular point using the `split_at` method. This will yield up
41 /// two producers, one producing the items before that point, and one
42 /// producing the items after that point (these two producers can then
43 /// independently be split further, or be converted into iterators).
44 /// In Rayon, this splitting is used to divide between threads.
45 /// See [the `plumbing` README][r] for further details.
46 ///
47 /// Note that each producer will always produce a fixed number of
48 /// items N. However, this number N is not queryable through the API;
49 /// the consumer is expected to track it.
50 ///
51 /// NB. You might expect `Producer` to extend the `IntoIterator`
52 /// trait.  However, [rust-lang/rust#20671][20671] prevents us from
53 /// declaring the DoubleEndedIterator and ExactSizeIterator
54 /// constraints on a required IntoIterator trait, so we inline
55 /// IntoIterator here until that issue is fixed.
56 ///
57 /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
58 /// [20671]: https://github.com/rust-lang/rust/issues/20671
59 pub trait Producer: Send + Sized {
60     /// The type of item that will be produced by this producer once
61     /// it is converted into an iterator.
62     type Item;
63 
64     /// The type of iterator we will become.
65     type IntoIter: Iterator<Item = Self::Item> + DoubleEndedIterator + ExactSizeIterator;
66 
67     /// Convert `self` into an iterator; at this point, no more parallel splits
68     /// are possible.
into_iter(self) -> Self::IntoIter69     fn into_iter(self) -> Self::IntoIter;
70 
71     /// The minimum number of items that we will process
72     /// sequentially. Defaults to 1, which means that we will split
73     /// all the way down to a single item. This can be raised higher
74     /// using the [`with_min_len`] method, which will force us to
75     /// create sequential tasks at a larger granularity. Note that
76     /// Rayon automatically normally attempts to adjust the size of
77     /// parallel splits to reduce overhead, so this should not be
78     /// needed.
79     ///
80     /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len
min_len(&self) -> usize81     fn min_len(&self) -> usize {
82         1
83     }
84 
85     /// The maximum number of items that we will process
86     /// sequentially. Defaults to MAX, which means that we can choose
87     /// not to split at all. This can be lowered using the
88     /// [`with_max_len`] method, which will force us to create more
89     /// parallel tasks. Note that Rayon automatically normally
90     /// attempts to adjust the size of parallel splits to reduce
91     /// overhead, so this should not be needed.
92     ///
93     /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len
max_len(&self) -> usize94     fn max_len(&self) -> usize {
95         usize::MAX
96     }
97 
98     /// Split into two producers; one produces items `0..index`, the
99     /// other `index..N`. Index must be less than or equal to `N`.
split_at(self, index: usize) -> (Self, Self)100     fn split_at(self, index: usize) -> (Self, Self);
101 
102     /// Iterate the producer, feeding each element to `folder`, and
103     /// stop when the folder is full (or all elements have been consumed).
104     ///
105     /// The provided implementation is sufficient for most iterables.
fold_with<F>(self, folder: F) -> F where F: Folder<Self::Item>,106     fn fold_with<F>(self, folder: F) -> F
107     where
108         F: Folder<Self::Item>,
109     {
110         folder.consume_iter(self.into_iter())
111     }
112 }
113 
114 /// A consumer is effectively a [generalized "fold" operation][fold],
115 /// and in fact each consumer will eventually be converted into a
116 /// [`Folder`]. What makes a consumer special is that, like a
117 /// [`Producer`], it can be **split** into multiple consumers using
118 /// the `split_at` method. When a consumer is split, it produces two
119 /// consumers, as well as a **reducer**. The two consumers can be fed
120 /// items independently, and when they are done the reducer is used to
121 /// combine their two results into one. See [the `plumbing`
122 /// README][r] for further details.
123 ///
124 /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
125 /// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
126 /// [`Folder`]: trait.Folder.html
127 /// [`Producer`]: trait.Producer.html
128 pub trait Consumer<Item>: Send + Sized {
129     /// The type of folder that this consumer can be converted into.
130     type Folder: Folder<Item, Result = Self::Result>;
131 
132     /// The type of reducer that is produced if this consumer is split.
133     type Reducer: Reducer<Self::Result>;
134 
135     /// The type of result that this consumer will ultimately produce.
136     type Result: Send;
137 
138     /// Divide the consumer into two consumers, one processing items
139     /// `0..index` and one processing items from `index..`. Also
140     /// produces a reducer that can be used to reduce the results at
141     /// the end.
split_at(self, index: usize) -> (Self, Self, Self::Reducer)142     fn split_at(self, index: usize) -> (Self, Self, Self::Reducer);
143 
144     /// Convert the consumer into a folder that can consume items
145     /// sequentially, eventually producing a final result.
into_folder(self) -> Self::Folder146     fn into_folder(self) -> Self::Folder;
147 
148     /// Hint whether this `Consumer` would like to stop processing
149     /// further items, e.g. if a search has been completed.
full(&self) -> bool150     fn full(&self) -> bool;
151 }
152 
153 /// The `Folder` trait encapsulates [the standard fold
154 /// operation][fold].  It can be fed many items using the `consume`
155 /// method. At the end, once all items have been consumed, it can then
156 /// be converted (using `complete`) into a final value.
157 ///
158 /// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
159 pub trait Folder<Item>: Sized {
160     /// The type of result that will ultimately be produced by the folder.
161     type Result;
162 
163     /// Consume next item and return new sequential state.
consume(self, item: Item) -> Self164     fn consume(self, item: Item) -> Self;
165 
166     /// Consume items from the iterator until full, and return new sequential state.
167     ///
168     /// This method is **optional**. The default simply iterates over
169     /// `iter`, invoking `consume` and checking after each iteration
170     /// whether `full` returns false.
171     ///
172     /// The main reason to override it is if you can provide a more
173     /// specialized, efficient implementation.
consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = Item>,174     fn consume_iter<I>(mut self, iter: I) -> Self
175     where
176         I: IntoIterator<Item = Item>,
177     {
178         for item in iter {
179             self = self.consume(item);
180             if self.full() {
181                 break;
182             }
183         }
184         self
185     }
186 
187     /// Finish consuming items, produce final result.
complete(self) -> Self::Result188     fn complete(self) -> Self::Result;
189 
190     /// Hint whether this `Folder` would like to stop processing
191     /// further items, e.g. if a search has been completed.
full(&self) -> bool192     fn full(&self) -> bool;
193 }
194 
195 /// The reducer is the final step of a `Consumer` -- after a consumer
196 /// has been split into two parts, and each of those parts has been
197 /// fully processed, we are left with two results. The reducer is then
198 /// used to combine those two results into one. See [the `plumbing`
199 /// README][r] for further details.
200 ///
201 /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
202 pub trait Reducer<Result> {
203     /// Reduce two final results into one; this is executed after a
204     /// split.
reduce(self, left: Result, right: Result) -> Result205     fn reduce(self, left: Result, right: Result) -> Result;
206 }
207 
208 /// A stateless consumer can be freely copied. These consumers can be
209 /// used like regular consumers, but they also support a
210 /// `split_off_left` method that does not take an index to split, but
211 /// simply splits at some arbitrary point (`for_each`, for example,
212 /// produces an unindexed consumer).
213 pub trait UnindexedConsumer<I>: Consumer<I> {
214     /// Splits off a "left" consumer and returns it. The `self`
215     /// consumer should then be used to consume the "right" portion of
216     /// the data. (The ordering matters for methods like find_first --
217     /// values produced by the returned value are given precedence
218     /// over values produced by `self`.) Once the left and right
219     /// halves have been fully consumed, you should reduce the results
220     /// with the result of `to_reducer`.
split_off_left(&self) -> Self221     fn split_off_left(&self) -> Self;
222 
223     /// Creates a reducer that can be used to combine the results from
224     /// a split consumer.
to_reducer(&self) -> Self::Reducer225     fn to_reducer(&self) -> Self::Reducer;
226 }
227 
228 /// A variant on `Producer` which does not know its exact length or
229 /// cannot represent it in a `usize`. These producers act like
230 /// ordinary producers except that they cannot be told to split at a
231 /// particular point. Instead, you just ask them to split 'somewhere'.
232 ///
233 /// (In principle, `Producer` could extend this trait; however, it
234 /// does not because to do so would require producers to carry their
235 /// own length with them.)
236 pub trait UnindexedProducer: Send + Sized {
237     /// The type of item returned by this producer.
238     type Item;
239 
240     /// Split midway into a new producer if possible, otherwise return `None`.
split(self) -> (Self, Option<Self>)241     fn split(self) -> (Self, Option<Self>);
242 
243     /// Iterate the producer, feeding each element to `folder`, and
244     /// stop when the folder is full (or all elements have been consumed).
fold_with<F>(self, folder: F) -> F where F: Folder<Self::Item>245     fn fold_with<F>(self, folder: F) -> F
246     where
247         F: Folder<Self::Item>;
248 }
249 
250 /// A splitter controls the policy for splitting into smaller work items.
251 ///
252 /// Thief-splitting is an adaptive policy that starts by splitting into
253 /// enough jobs for every worker thread, and then resets itself whenever a
254 /// job is actually stolen into a different thread.
255 #[derive(Clone, Copy)]
256 struct Splitter {
257     /// The `splits` tell us approximately how many remaining times we'd
258     /// like to split this job.  We always just divide it by two though, so
259     /// the effective number of pieces will be `next_power_of_two()`.
260     splits: usize,
261 }
262 
263 impl Splitter {
264     #[inline]
new() -> Splitter265     fn new() -> Splitter {
266         Splitter {
267             splits: crate::current_num_threads(),
268         }
269     }
270 
271     #[inline]
try_split(&mut self, stolen: bool) -> bool272     fn try_split(&mut self, stolen: bool) -> bool {
273         let Splitter { splits } = *self;
274 
275         if stolen {
276             // This job was stolen!  Reset the number of desired splits to the
277             // thread count, if that's more than we had remaining anyway.
278             self.splits = cmp::max(crate::current_num_threads(), self.splits / 2);
279             true
280         } else if splits > 0 {
281             // We have splits remaining, make it so.
282             self.splits /= 2;
283             true
284         } else {
285             // Not stolen, and no more splits -- we're done!
286             false
287         }
288     }
289 }
290 
291 /// The length splitter is built on thief-splitting, but additionally takes
292 /// into account the remaining length of the iterator.
293 #[derive(Clone, Copy)]
294 struct LengthSplitter {
295     inner: Splitter,
296 
297     /// The smallest we're willing to divide into.  Usually this is just 1,
298     /// but you can choose a larger working size with `with_min_len()`.
299     min: usize,
300 }
301 
302 impl LengthSplitter {
303     /// Creates a new splitter based on lengths.
304     ///
305     /// The `min` is a hard lower bound.  We'll never split below that, but
306     /// of course an iterator might start out smaller already.
307     ///
308     /// The `max` is an upper bound on the working size, used to determine
309     /// the minimum number of times we need to split to get under that limit.
310     /// The adaptive algorithm may very well split even further, but never
311     /// smaller than the `min`.
312     #[inline]
new(min: usize, max: usize, len: usize) -> LengthSplitter313     fn new(min: usize, max: usize, len: usize) -> LengthSplitter {
314         let mut splitter = LengthSplitter {
315             inner: Splitter::new(),
316             min: cmp::max(min, 1),
317         };
318 
319         // Divide the given length by the max working length to get the minimum
320         // number of splits we need to get under that max.  This rounds down,
321         // but the splitter actually gives `next_power_of_two()` pieces anyway.
322         // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces.
323         let min_splits = len / cmp::max(max, 1);
324 
325         // Only update the value if it's not splitting enough already.
326         if min_splits > splitter.inner.splits {
327             splitter.inner.splits = min_splits;
328         }
329 
330         splitter
331     }
332 
333     #[inline]
try_split(&mut self, len: usize, stolen: bool) -> bool334     fn try_split(&mut self, len: usize, stolen: bool) -> bool {
335         // If splitting wouldn't make us too small, try the inner splitter.
336         len / 2 >= self.min && self.inner.try_split(stolen)
337     }
338 }
339 
340 /// This helper function is used to "connect" a parallel iterator to a
341 /// consumer. It will convert the `par_iter` into a producer P and
342 /// then pull items from P and feed them to `consumer`, splitting and
343 /// creating parallel threads as needed.
344 ///
345 /// This is useful when you are implementing your own parallel
346 /// iterators: it is often used as the definition of the
347 /// [`drive_unindexed`] or [`drive`] methods.
348 ///
349 /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed
350 /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive
bridge<I, C>(par_iter: I, consumer: C) -> C::Result where I: IndexedParallelIterator, C: Consumer<I::Item>,351 pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result
352 where
353     I: IndexedParallelIterator,
354     C: Consumer<I::Item>,
355 {
356     let len = par_iter.len();
357     return par_iter.with_producer(Callback { len, consumer });
358 
359     struct Callback<C> {
360         len: usize,
361         consumer: C,
362     }
363 
364     impl<C, I> ProducerCallback<I> for Callback<C>
365     where
366         C: Consumer<I>,
367     {
368         type Output = C::Result;
369         fn callback<P>(self, producer: P) -> C::Result
370         where
371             P: Producer<Item = I>,
372         {
373             bridge_producer_consumer(self.len, producer, self.consumer)
374         }
375     }
376 }
377 
378 /// This helper function is used to "connect" a producer and a
379 /// consumer. You may prefer to call [`bridge`], which wraps this
380 /// function. This function will draw items from `producer` and feed
381 /// them to `consumer`, splitting and creating parallel tasks when
382 /// needed.
383 ///
384 /// This is useful when you are implementing your own parallel
385 /// iterators: it is often used as the definition of the
386 /// [`drive_unindexed`] or [`drive`] methods.
387 ///
388 /// [`bridge`]: fn.bridge.html
389 /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed
390 /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive
bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result where P: Producer, C: Consumer<P::Item>,391 pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result
392 where
393     P: Producer,
394     C: Consumer<P::Item>,
395 {
396     let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len);
397     return helper(len, false, splitter, producer, consumer);
398 
399     fn helper<P, C>(
400         len: usize,
401         migrated: bool,
402         mut splitter: LengthSplitter,
403         producer: P,
404         consumer: C,
405     ) -> C::Result
406     where
407         P: Producer,
408         C: Consumer<P::Item>,
409     {
410         if consumer.full() {
411             consumer.into_folder().complete()
412         } else if splitter.try_split(len, migrated) {
413             let mid = len / 2;
414             let (left_producer, right_producer) = producer.split_at(mid);
415             let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);
416             let (left_result, right_result) = join_context(
417                 |context| {
418                     helper(
419                         mid,
420                         context.migrated(),
421                         splitter,
422                         left_producer,
423                         left_consumer,
424                     )
425                 },
426                 |context| {
427                     helper(
428                         len - mid,
429                         context.migrated(),
430                         splitter,
431                         right_producer,
432                         right_consumer,
433                     )
434                 },
435             );
436             reducer.reduce(left_result, right_result)
437         } else {
438             producer.fold_with(consumer.into_folder()).complete()
439         }
440     }
441 }
442 
443 /// A variant of [`bridge_producer_consumer`] where the producer is an unindexed producer.
444 ///
445 /// [`bridge_producer_consumer`]: fn.bridge_producer_consumer.html
bridge_unindexed<P, C>(producer: P, consumer: C) -> C::Result where P: UnindexedProducer, C: UnindexedConsumer<P::Item>,446 pub fn bridge_unindexed<P, C>(producer: P, consumer: C) -> C::Result
447 where
448     P: UnindexedProducer,
449     C: UnindexedConsumer<P::Item>,
450 {
451     let splitter = Splitter::new();
452     bridge_unindexed_producer_consumer(false, splitter, producer, consumer)
453 }
454 
bridge_unindexed_producer_consumer<P, C>( migrated: bool, mut splitter: Splitter, producer: P, consumer: C, ) -> C::Result where P: UnindexedProducer, C: UnindexedConsumer<P::Item>,455 fn bridge_unindexed_producer_consumer<P, C>(
456     migrated: bool,
457     mut splitter: Splitter,
458     producer: P,
459     consumer: C,
460 ) -> C::Result
461 where
462     P: UnindexedProducer,
463     C: UnindexedConsumer<P::Item>,
464 {
465     if consumer.full() {
466         consumer.into_folder().complete()
467     } else if splitter.try_split(migrated) {
468         match producer.split() {
469             (left_producer, Some(right_producer)) => {
470                 let (reducer, left_consumer, right_consumer) =
471                     (consumer.to_reducer(), consumer.split_off_left(), consumer);
472                 let bridge = bridge_unindexed_producer_consumer;
473                 let (left_result, right_result) = join_context(
474                     |context| bridge(context.migrated(), splitter, left_producer, left_consumer),
475                     |context| bridge(context.migrated(), splitter, right_producer, right_consumer),
476                 );
477                 reducer.reduce(left_result, right_result)
478             }
479             (producer, None) => producer.fold_with(consumer.into_folder()).complete(),
480         }
481     } else {
482         producer.fold_with(consumer.into_folder()).complete()
483     }
484 }
485