1 //! Parallel iterator types for [vectors][std::vec] (`Vec<T>`) 2 //! 3 //! You will rarely need to interact with this module directly unless you need 4 //! to name one of the iterator types. 5 //! 6 //! [std::vec]: https://doc.rust-lang.org/stable/std/vec/ 7 8 use crate::iter::plumbing::*; 9 use crate::iter::*; 10 use crate::math::simplify_range; 11 use crate::slice::{Iter, IterMut}; 12 use std::iter; 13 use std::mem; 14 use std::ops::{Range, RangeBounds}; 15 use std::ptr; 16 use std::slice; 17 18 impl<'data, T: Sync + 'data> IntoParallelIterator for &'data Vec<T> { 19 type Item = &'data T; 20 type Iter = Iter<'data, T>; 21 into_par_iter(self) -> Self::Iter22 fn into_par_iter(self) -> Self::Iter { 23 <&[T]>::into_par_iter(self) 24 } 25 } 26 27 impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut Vec<T> { 28 type Item = &'data mut T; 29 type Iter = IterMut<'data, T>; 30 into_par_iter(self) -> Self::Iter31 fn into_par_iter(self) -> Self::Iter { 32 <&mut [T]>::into_par_iter(self) 33 } 34 } 35 36 /// Parallel iterator that moves out of a vector. 37 #[derive(Debug, Clone)] 38 pub struct IntoIter<T: Send> { 39 vec: Vec<T>, 40 } 41 42 impl<T: Send> IntoParallelIterator for Vec<T> { 43 type Item = T; 44 type Iter = IntoIter<T>; 45 into_par_iter(self) -> Self::Iter46 fn into_par_iter(self) -> Self::Iter { 47 IntoIter { vec: self } 48 } 49 } 50 51 impl<T: Send> ParallelIterator for IntoIter<T> { 52 type Item = T; 53 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,54 fn drive_unindexed<C>(self, consumer: C) -> C::Result 55 where 56 C: UnindexedConsumer<Self::Item>, 57 { 58 bridge(self, consumer) 59 } 60 opt_len(&self) -> Option<usize>61 fn opt_len(&self) -> Option<usize> { 62 Some(self.len()) 63 } 64 } 65 66 impl<T: Send> IndexedParallelIterator for IntoIter<T> { drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>,67 fn drive<C>(self, consumer: C) -> C::Result 68 where 69 C: Consumer<Self::Item>, 70 { 71 bridge(self, consumer) 72 } 73 len(&self) -> usize74 fn len(&self) -> usize { 75 self.vec.len() 76 } 77 with_producer<CB>(mut self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>,78 fn with_producer<CB>(mut self, callback: CB) -> CB::Output 79 where 80 CB: ProducerCallback<Self::Item>, 81 { 82 // Drain every item, and then the vector only needs to free its buffer. 83 self.vec.par_drain(..).with_producer(callback) 84 } 85 } 86 87 impl<'data, T: Send> ParallelDrainRange<usize> for &'data mut Vec<T> { 88 type Iter = Drain<'data, T>; 89 type Item = T; 90 par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter91 fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter { 92 Drain { 93 orig_len: self.len(), 94 range: simplify_range(range, self.len()), 95 vec: self, 96 } 97 } 98 } 99 100 /// Draining parallel iterator that moves a range out of a vector, but keeps the total capacity. 101 #[derive(Debug)] 102 pub struct Drain<'data, T: Send> { 103 vec: &'data mut Vec<T>, 104 range: Range<usize>, 105 orig_len: usize, 106 } 107 108 impl<'data, T: Send> ParallelIterator for Drain<'data, T> { 109 type Item = T; 110 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,111 fn drive_unindexed<C>(self, consumer: C) -> C::Result 112 where 113 C: UnindexedConsumer<Self::Item>, 114 { 115 bridge(self, consumer) 116 } 117 opt_len(&self) -> Option<usize>118 fn opt_len(&self) -> Option<usize> { 119 Some(self.len()) 120 } 121 } 122 123 impl<'data, T: Send> IndexedParallelIterator for Drain<'data, T> { drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>,124 fn drive<C>(self, consumer: C) -> C::Result 125 where 126 C: Consumer<Self::Item>, 127 { 128 bridge(self, consumer) 129 } 130 len(&self) -> usize131 fn len(&self) -> usize { 132 self.range.len() 133 } 134 with_producer<CB>(self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>,135 fn with_producer<CB>(self, callback: CB) -> CB::Output 136 where 137 CB: ProducerCallback<Self::Item>, 138 { 139 unsafe { 140 // Make the vector forget about the drained items, and temporarily the tail too. 141 let start = self.range.start; 142 self.vec.set_len(start); 143 144 // Create the producer as the exclusive "owner" of the slice. 145 let producer = { 146 // Get a correct borrow lifetime, then extend it to the original length. 147 let mut slice = &mut self.vec[start..]; 148 slice = slice::from_raw_parts_mut(slice.as_mut_ptr(), self.range.len()); 149 DrainProducer::new(slice) 150 }; 151 152 // The producer will move or drop each item from the drained range. 153 callback.callback(producer) 154 } 155 } 156 } 157 158 impl<'data, T: Send> Drop for Drain<'data, T> { drop(&mut self)159 fn drop(&mut self) { 160 if self.range.len() > 0 { 161 let Range { start, end } = self.range; 162 if self.vec.len() != start { 163 // We must not have produced, so just call a normal drain to remove the items. 164 assert_eq!(self.vec.len(), self.orig_len); 165 self.vec.drain(start..end); 166 } else if end < self.orig_len { 167 // The producer was responsible for consuming the drained items. 168 // Move the tail items to their new place, then set the length to include them. 169 unsafe { 170 let ptr = self.vec.as_mut_ptr().add(start); 171 let tail_ptr = self.vec.as_ptr().add(end); 172 let tail_len = self.orig_len - end; 173 ptr::copy(tail_ptr, ptr, tail_len); 174 self.vec.set_len(start + tail_len); 175 } 176 } 177 } 178 } 179 } 180 181 /// //////////////////////////////////////////////////////////////////////// 182 183 pub(crate) struct DrainProducer<'data, T: Send> { 184 slice: &'data mut [T], 185 } 186 187 impl<'data, T: 'data + Send> DrainProducer<'data, T> { 188 /// Creates a draining producer, which *moves* items from the slice. 189 /// 190 /// Unsafe bacause `!Copy` data must not be read after the borrow is released. new(slice: &'data mut [T]) -> Self191 pub(crate) unsafe fn new(slice: &'data mut [T]) -> Self { 192 DrainProducer { slice } 193 } 194 } 195 196 impl<'data, T: 'data + Send> Producer for DrainProducer<'data, T> { 197 type Item = T; 198 type IntoIter = SliceDrain<'data, T>; 199 into_iter(mut self) -> Self::IntoIter200 fn into_iter(mut self) -> Self::IntoIter { 201 // replace the slice so we don't drop it twice 202 let slice = mem::replace(&mut self.slice, &mut []); 203 SliceDrain { 204 iter: slice.iter_mut(), 205 } 206 } 207 split_at(mut self, index: usize) -> (Self, Self)208 fn split_at(mut self, index: usize) -> (Self, Self) { 209 // replace the slice so we don't drop it twice 210 let slice = mem::replace(&mut self.slice, &mut []); 211 let (left, right) = slice.split_at_mut(index); 212 unsafe { (DrainProducer::new(left), DrainProducer::new(right)) } 213 } 214 } 215 216 impl<'data, T: 'data + Send> Drop for DrainProducer<'data, T> { drop(&mut self)217 fn drop(&mut self) { 218 // use `Drop for [T]` 219 unsafe { ptr::drop_in_place(self.slice) }; 220 } 221 } 222 223 /// //////////////////////////////////////////////////////////////////////// 224 225 // like std::vec::Drain, without updating a source Vec 226 pub(crate) struct SliceDrain<'data, T> { 227 iter: slice::IterMut<'data, T>, 228 } 229 230 impl<'data, T: 'data> Iterator for SliceDrain<'data, T> { 231 type Item = T; 232 next(&mut self) -> Option<T>233 fn next(&mut self) -> Option<T> { 234 // Coerce the pointer early, so we don't keep the 235 // reference that's about to be invalidated. 236 let ptr: *const T = self.iter.next()?; 237 Some(unsafe { ptr::read(ptr) }) 238 } 239 size_hint(&self) -> (usize, Option<usize>)240 fn size_hint(&self) -> (usize, Option<usize>) { 241 self.iter.size_hint() 242 } 243 count(self) -> usize244 fn count(self) -> usize { 245 self.iter.len() 246 } 247 } 248 249 impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> { next_back(&mut self) -> Option<Self::Item>250 fn next_back(&mut self) -> Option<Self::Item> { 251 // Coerce the pointer early, so we don't keep the 252 // reference that's about to be invalidated. 253 let ptr: *const T = self.iter.next_back()?; 254 Some(unsafe { ptr::read(ptr) }) 255 } 256 } 257 258 impl<'data, T: 'data> ExactSizeIterator for SliceDrain<'data, T> { len(&self) -> usize259 fn len(&self) -> usize { 260 self.iter.len() 261 } 262 } 263 264 impl<'data, T: 'data> iter::FusedIterator for SliceDrain<'data, T> {} 265 266 impl<'data, T: 'data> Drop for SliceDrain<'data, T> { drop(&mut self)267 fn drop(&mut self) { 268 // extract the iterator so we can use `Drop for [T]` 269 let iter = mem::replace(&mut self.iter, [].iter_mut()); 270 unsafe { ptr::drop_in_place(iter.into_slice()) }; 271 } 272 } 273