1 use crossbeam_deque::{Steal, Stealer, Worker}; 2 3 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 4 use std::sync::{Mutex, TryLockError}; 5 use std::thread::yield_now; 6 7 use crate::current_num_threads; 8 use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer}; 9 use crate::iter::ParallelIterator; 10 11 /// Conversion trait to convert an `Iterator` to a `ParallelIterator`. 12 /// 13 /// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items 14 /// across the Rayon thread pool. This has the advantage of being able to parallelize just about 15 /// anything, but the resulting `ParallelIterator` can be less efficient than if you started with 16 /// `par_iter` instead. However, it can still be useful for iterators that are difficult to 17 /// parallelize by other means, like channels or file or network I/O. 18 /// 19 /// The resulting iterator is not guaranteed to keep the order of the original iterator. 20 /// 21 /// # Examples 22 /// 23 /// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can 24 /// use any of the `ParallelIterator` methods: 25 /// 26 /// ``` 27 /// use rayon::iter::ParallelBridge; 28 /// use rayon::prelude::ParallelIterator; 29 /// use std::sync::mpsc::channel; 30 /// 31 /// let rx = { 32 /// let (tx, rx) = channel(); 33 /// 34 /// tx.send("one!"); 35 /// tx.send("two!"); 36 /// tx.send("three!"); 37 /// 38 /// rx 39 /// }; 40 /// 41 /// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect(); 42 /// output.sort_unstable(); 43 /// 44 /// assert_eq!(&*output, &["one!", "three!", "two!"]); 45 /// ``` 46 pub trait ParallelBridge: Sized { 47 /// Creates a bridge from this type to a `ParallelIterator`. par_bridge(self) -> IterBridge<Self>48 fn par_bridge(self) -> IterBridge<Self>; 49 } 50 51 impl<T: Iterator + Send> ParallelBridge for T 52 where 53 T::Item: Send, 54 { par_bridge(self) -> IterBridge<Self>55 fn par_bridge(self) -> IterBridge<Self> { 56 IterBridge { iter: self } 57 } 58 } 59 60 /// `IterBridge` is a parallel iterator that wraps a sequential iterator. 61 /// 62 /// This type is created when using the `par_bridge` method on `ParallelBridge`. See the 63 /// [`ParallelBridge`] documentation for details. 64 /// 65 /// [`ParallelBridge`]: trait.ParallelBridge.html 66 #[derive(Debug, Clone)] 67 pub struct IterBridge<Iter> { 68 iter: Iter, 69 } 70 71 impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter> 72 where 73 Iter::Item: Send, 74 { 75 type Item = Iter::Item; 76 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,77 fn drive_unindexed<C>(self, consumer: C) -> C::Result 78 where 79 C: UnindexedConsumer<Self::Item>, 80 { 81 let split_count = AtomicUsize::new(current_num_threads()); 82 let worker = Worker::new_fifo(); 83 let stealer = worker.stealer(); 84 let done = AtomicBool::new(false); 85 let iter = Mutex::new((self.iter, worker)); 86 87 bridge_unindexed( 88 IterParallelProducer { 89 split_count: &split_count, 90 done: &done, 91 iter: &iter, 92 items: stealer, 93 }, 94 consumer, 95 ) 96 } 97 } 98 99 struct IterParallelProducer<'a, Iter: Iterator> { 100 split_count: &'a AtomicUsize, 101 done: &'a AtomicBool, 102 iter: &'a Mutex<(Iter, Worker<Iter::Item>)>, 103 items: Stealer<Iter::Item>, 104 } 105 106 // manual clone because T doesn't need to be Clone, but the derive assumes it should be 107 impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> { clone(&self) -> Self108 fn clone(&self) -> Self { 109 IterParallelProducer { 110 split_count: self.split_count, 111 done: self.done, 112 iter: self.iter, 113 items: self.items.clone(), 114 } 115 } 116 } 117 118 impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter> 119 where 120 Iter::Item: Send, 121 { 122 type Item = Iter::Item; 123 split(self) -> (Self, Option<Self>)124 fn split(self) -> (Self, Option<Self>) { 125 let mut count = self.split_count.load(Ordering::SeqCst); 126 127 loop { 128 // Check if the iterator is exhausted *and* we've consumed every item from it. 129 let done = self.done.load(Ordering::SeqCst) && self.items.is_empty(); 130 131 match count.checked_sub(1) { 132 Some(new_count) if !done => { 133 match self.split_count.compare_exchange_weak( 134 count, 135 new_count, 136 Ordering::SeqCst, 137 Ordering::SeqCst, 138 ) { 139 Ok(_) => return (self.clone(), Some(self)), 140 Err(last_count) => count = last_count, 141 } 142 } 143 _ => { 144 return (self, None); 145 } 146 } 147 } 148 } 149 fold_with<F>(self, mut folder: F) -> F where F: Folder<Self::Item>,150 fn fold_with<F>(self, mut folder: F) -> F 151 where 152 F: Folder<Self::Item>, 153 { 154 loop { 155 match self.items.steal() { 156 Steal::Success(it) => { 157 folder = folder.consume(it); 158 if folder.full() { 159 return folder; 160 } 161 } 162 Steal::Empty => { 163 // Don't storm the mutex if we're already done. 164 if self.done.load(Ordering::SeqCst) { 165 // Someone might have pushed more between our `steal()` and `done.load()` 166 if self.items.is_empty() { 167 // The iterator is out of items, no use in continuing 168 return folder; 169 } 170 } else { 171 // our cache is out of items, time to load more from the iterator 172 match self.iter.try_lock() { 173 Ok(mut guard) => { 174 // Check `done` again in case we raced with the previous lock 175 // holder on its way out. 176 if self.done.load(Ordering::SeqCst) { 177 if self.items.is_empty() { 178 return folder; 179 } 180 continue; 181 } 182 183 let count = current_num_threads(); 184 let count = (count * count) * 2; 185 186 let (ref mut iter, ref worker) = *guard; 187 188 // while worker.len() < count { 189 // FIXME the new deque doesn't let us count items. We can just 190 // push a number of items, but that doesn't consider active 191 // stealers elsewhere. 192 for _ in 0..count { 193 if let Some(it) = iter.next() { 194 worker.push(it); 195 } else { 196 self.done.store(true, Ordering::SeqCst); 197 break; 198 } 199 } 200 } 201 Err(TryLockError::WouldBlock) => { 202 // someone else has the mutex, just sit tight until it's ready 203 yield_now(); //TODO: use a thread-pool-aware yield? (#548) 204 } 205 Err(TryLockError::Poisoned(_)) => { 206 // any panics from other threads will have been caught by the pool, 207 // and will be re-thrown when joined - just exit 208 return folder; 209 } 210 } 211 } 212 } 213 Steal::Retry => (), 214 } 215 } 216 } 217 } 218