1 use crossbeam_deque::{Steal, Stealer, Worker};
2 
3 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4 use std::sync::{Mutex, TryLockError};
5 use std::thread::yield_now;
6 
7 use crate::current_num_threads;
8 use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
9 use crate::iter::ParallelIterator;
10 
11 /// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
12 ///
13 /// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
14 /// across the Rayon thread pool. This has the advantage of being able to parallelize just about
15 /// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
16 /// `par_iter` instead. However, it can still be useful for iterators that are difficult to
17 /// parallelize by other means, like channels or file or network I/O.
18 ///
19 /// The resulting iterator is not guaranteed to keep the order of the original iterator.
20 ///
21 /// # Examples
22 ///
23 /// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
24 /// use any of the `ParallelIterator` methods:
25 ///
26 /// ```
27 /// use rayon::iter::ParallelBridge;
28 /// use rayon::prelude::ParallelIterator;
29 /// use std::sync::mpsc::channel;
30 ///
31 /// let rx = {
32 ///     let (tx, rx) = channel();
33 ///
34 ///     tx.send("one!");
35 ///     tx.send("two!");
36 ///     tx.send("three!");
37 ///
38 ///     rx
39 /// };
40 ///
41 /// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
42 /// output.sort_unstable();
43 ///
44 /// assert_eq!(&*output, &["one!", "three!", "two!"]);
45 /// ```
46 pub trait ParallelBridge: Sized {
47     /// Creates a bridge from this type to a `ParallelIterator`.
par_bridge(self) -> IterBridge<Self>48     fn par_bridge(self) -> IterBridge<Self>;
49 }
50 
51 impl<T: Iterator + Send> ParallelBridge for T
52 where
53     T::Item: Send,
54 {
par_bridge(self) -> IterBridge<Self>55     fn par_bridge(self) -> IterBridge<Self> {
56         IterBridge { iter: self }
57     }
58 }
59 
/// A `ParallelIterator` adapter around an ordinary sequential `Iterator`.
///
/// Values of this type come from the `par_bridge` method of `ParallelBridge`; the
/// [`ParallelBridge`] documentation describes the behavior in detail.
///
/// [`ParallelBridge`]: trait.ParallelBridge.html
#[derive(Clone, Debug)]
pub struct IterBridge<Iter> {
    /// The wrapped sequential iterator, consumed only once the bridge is driven.
    iter: Iter,
}
70 
71 impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
72 where
73     Iter::Item: Send,
74 {
75     type Item = Iter::Item;
76 
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,77     fn drive_unindexed<C>(self, consumer: C) -> C::Result
78     where
79         C: UnindexedConsumer<Self::Item>,
80     {
81         let split_count = AtomicUsize::new(current_num_threads());
82         let worker = Worker::new_fifo();
83         let stealer = worker.stealer();
84         let done = AtomicBool::new(false);
85         let iter = Mutex::new((self.iter, worker));
86 
87         bridge_unindexed(
88             IterParallelProducer {
89                 split_count: &split_count,
90                 done: &done,
91                 iter: &iter,
92                 items: stealer,
93             },
94             consumer,
95         )
96     }
97 }
98 
99 struct IterParallelProducer<'a, Iter: Iterator> {
100     split_count: &'a AtomicUsize,
101     done: &'a AtomicBool,
102     iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
103     items: Stealer<Iter::Item>,
104 }
105 
106 // manual clone because T doesn't need to be Clone, but the derive assumes it should be
107 impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> {
clone(&self) -> Self108     fn clone(&self) -> Self {
109         IterParallelProducer {
110             split_count: self.split_count,
111             done: self.done,
112             iter: self.iter,
113             items: self.items.clone(),
114         }
115     }
116 }
117 
118 impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter>
119 where
120     Iter::Item: Send,
121 {
122     type Item = Iter::Item;
123 
split(self) -> (Self, Option<Self>)124     fn split(self) -> (Self, Option<Self>) {
125         let mut count = self.split_count.load(Ordering::SeqCst);
126 
127         loop {
128             // Check if the iterator is exhausted *and* we've consumed every item from it.
129             let done = self.done.load(Ordering::SeqCst) && self.items.is_empty();
130 
131             match count.checked_sub(1) {
132                 Some(new_count) if !done => {
133                     match self.split_count.compare_exchange_weak(
134                         count,
135                         new_count,
136                         Ordering::SeqCst,
137                         Ordering::SeqCst,
138                     ) {
139                         Ok(_) => return (self.clone(), Some(self)),
140                         Err(last_count) => count = last_count,
141                     }
142                 }
143                 _ => {
144                     return (self, None);
145                 }
146             }
147         }
148     }
149 
fold_with<F>(self, mut folder: F) -> F where F: Folder<Self::Item>,150     fn fold_with<F>(self, mut folder: F) -> F
151     where
152         F: Folder<Self::Item>,
153     {
154         loop {
155             match self.items.steal() {
156                 Steal::Success(it) => {
157                     folder = folder.consume(it);
158                     if folder.full() {
159                         return folder;
160                     }
161                 }
162                 Steal::Empty => {
163                     // Don't storm the mutex if we're already done.
164                     if self.done.load(Ordering::SeqCst) {
165                         // Someone might have pushed more between our `steal()` and `done.load()`
166                         if self.items.is_empty() {
167                             // The iterator is out of items, no use in continuing
168                             return folder;
169                         }
170                     } else {
171                         // our cache is out of items, time to load more from the iterator
172                         match self.iter.try_lock() {
173                             Ok(mut guard) => {
174                                 // Check `done` again in case we raced with the previous lock
175                                 // holder on its way out.
176                                 if self.done.load(Ordering::SeqCst) {
177                                     if self.items.is_empty() {
178                                         return folder;
179                                     }
180                                     continue;
181                                 }
182 
183                                 let count = current_num_threads();
184                                 let count = (count * count) * 2;
185 
186                                 let (ref mut iter, ref worker) = *guard;
187 
188                                 // while worker.len() < count {
189                                 // FIXME the new deque doesn't let us count items.  We can just
190                                 // push a number of items, but that doesn't consider active
191                                 // stealers elsewhere.
192                                 for _ in 0..count {
193                                     if let Some(it) = iter.next() {
194                                         worker.push(it);
195                                     } else {
196                                         self.done.store(true, Ordering::SeqCst);
197                                         break;
198                                     }
199                                 }
200                             }
201                             Err(TryLockError::WouldBlock) => {
202                                 // someone else has the mutex, just sit tight until it's ready
203                                 yield_now(); //TODO: use a thread-pool-aware yield? (#548)
204                             }
205                             Err(TryLockError::Poisoned(_)) => {
206                                 // any panics from other threads will have been caught by the pool,
207                                 // and will be re-thrown when joined - just exit
208                                 return folder;
209                             }
210                         }
211                     }
212                 }
213                 Steal::Retry => (),
214             }
215         }
216     }
217 }
218