1 use crate::raw::Bucket; 2 use crate::raw::{RawIter, RawIterRange, RawTable}; 3 use crate::scopeguard::guard; 4 use alloc::alloc::dealloc; 5 use core::marker::PhantomData; 6 use core::mem; 7 use core::ptr::NonNull; 8 use rayon::iter::{ 9 plumbing::{self, Folder, UnindexedConsumer, UnindexedProducer}, 10 ParallelIterator, 11 }; 12 13 /// Parallel iterator which returns a raw pointer to every full bucket in the table. 14 pub struct RawParIter<T> { 15 iter: RawIterRange<T>, 16 } 17 18 impl<T> From<RawIter<T>> for RawParIter<T> { from(it: RawIter<T>) -> Self19 fn from(it: RawIter<T>) -> Self { 20 RawParIter { iter: it.iter } 21 } 22 } 23 24 impl<T> ParallelIterator for RawParIter<T> { 25 type Item = Bucket<T>; 26 27 #[cfg_attr(feature = "inline-more", inline)] drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,28 fn drive_unindexed<C>(self, consumer: C) -> C::Result 29 where 30 C: UnindexedConsumer<Self::Item>, 31 { 32 let producer = ParIterProducer { iter: self.iter }; 33 plumbing::bridge_unindexed(producer, consumer) 34 } 35 } 36 37 /// Producer which returns a `Bucket<T>` for every element. 38 struct ParIterProducer<T> { 39 iter: RawIterRange<T>, 40 } 41 42 impl<T> UnindexedProducer for ParIterProducer<T> { 43 type Item = Bucket<T>; 44 45 #[cfg_attr(feature = "inline-more", inline)] split(self) -> (Self, Option<Self>)46 fn split(self) -> (Self, Option<Self>) { 47 let (left, right) = self.iter.split(); 48 let left = ParIterProducer { iter: left }; 49 let right = right.map(|right| ParIterProducer { iter: right }); 50 (left, right) 51 } 52 53 #[cfg_attr(feature = "inline-more", inline)] fold_with<F>(self, folder: F) -> F where F: Folder<Self::Item>,54 fn fold_with<F>(self, folder: F) -> F 55 where 56 F: Folder<Self::Item>, 57 { 58 folder.consume_iter(self.iter) 59 } 60 } 61 62 /// Parallel iterator which consumes a table and returns elements. 63 pub struct RawIntoParIter<T> { 64 table: RawTable<T>, 65 } 66 67 impl<T: Send> ParallelIterator for RawIntoParIter<T> { 68 type Item = T; 69 70 #[cfg_attr(feature = "inline-more", inline)] drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,71 fn drive_unindexed<C>(self, consumer: C) -> C::Result 72 where 73 C: UnindexedConsumer<Self::Item>, 74 { 75 let iter = unsafe { self.table.iter().iter }; 76 let _guard = guard(self.table.into_alloc(), |alloc| { 77 if let Some((ptr, layout)) = *alloc { 78 unsafe { 79 dealloc(ptr.as_ptr(), layout); 80 } 81 } 82 }); 83 let producer = ParDrainProducer { iter }; 84 plumbing::bridge_unindexed(producer, consumer) 85 } 86 } 87 88 /// Parallel iterator which consumes elements without freeing the table storage. 89 pub struct RawParDrain<'a, T> { 90 // We don't use a &'a mut RawTable<T> because we want RawParDrain to be 91 // covariant over T. 92 table: NonNull<RawTable<T>>, 93 marker: PhantomData<&'a RawTable<T>>, 94 } 95 96 unsafe impl<T> Send for RawParDrain<'_, T> {} 97 98 impl<T: Send> ParallelIterator for RawParDrain<'_, T> { 99 type Item = T; 100 101 #[cfg_attr(feature = "inline-more", inline)] drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,102 fn drive_unindexed<C>(self, consumer: C) -> C::Result 103 where 104 C: UnindexedConsumer<Self::Item>, 105 { 106 let _guard = guard(self.table, |table| unsafe { 107 table.as_mut().clear_no_drop() 108 }); 109 let iter = unsafe { self.table.as_ref().iter().iter }; 110 mem::forget(self); 111 let producer = ParDrainProducer { iter }; 112 plumbing::bridge_unindexed(producer, consumer) 113 } 114 } 115 116 impl<T> Drop for RawParDrain<'_, T> { drop(&mut self)117 fn drop(&mut self) { 118 // If drive_unindexed is not called then simply clear the table. 119 unsafe { self.table.as_mut().clear() } 120 } 121 } 122 123 /// Producer which will consume all elements in the range, even if it is dropped 124 /// halfway through. 125 struct ParDrainProducer<T> { 126 iter: RawIterRange<T>, 127 } 128 129 impl<T: Send> UnindexedProducer for ParDrainProducer<T> { 130 type Item = T; 131 132 #[cfg_attr(feature = "inline-more", inline)] split(self) -> (Self, Option<Self>)133 fn split(self) -> (Self, Option<Self>) { 134 let (left, right) = self.iter.clone().split(); 135 mem::forget(self); 136 let left = ParDrainProducer { iter: left }; 137 let right = right.map(|right| ParDrainProducer { iter: right }); 138 (left, right) 139 } 140 141 #[cfg_attr(feature = "inline-more", inline)] fold_with<F>(mut self, mut folder: F) -> F where F: Folder<Self::Item>,142 fn fold_with<F>(mut self, mut folder: F) -> F 143 where 144 F: Folder<Self::Item>, 145 { 146 // Make sure to modify the iterator in-place so that any remaining 147 // elements are processed in our Drop impl. 148 while let Some(item) = self.iter.next() { 149 folder = folder.consume(unsafe { item.read() }); 150 if folder.full() { 151 return folder; 152 } 153 } 154 155 // If we processed all elements then we don't need to run the drop. 156 mem::forget(self); 157 folder 158 } 159 } 160 161 impl<T> Drop for ParDrainProducer<T> { 162 #[cfg_attr(feature = "inline-more", inline)] drop(&mut self)163 fn drop(&mut self) { 164 // Drop all remaining elements 165 if mem::needs_drop::<T>() { 166 while let Some(item) = self.iter.next() { 167 unsafe { 168 item.drop(); 169 } 170 } 171 } 172 } 173 } 174 175 impl<T> RawTable<T> { 176 /// Returns a parallel iterator over the elements in a `RawTable`. 177 #[cfg_attr(feature = "inline-more", inline)] par_iter(&self) -> RawParIter<T>178 pub unsafe fn par_iter(&self) -> RawParIter<T> { 179 RawParIter { 180 iter: self.iter().iter, 181 } 182 } 183 184 /// Returns a parallel iterator over the elements in a `RawTable`. 185 #[cfg_attr(feature = "inline-more", inline)] into_par_iter(self) -> RawIntoParIter<T>186 pub fn into_par_iter(self) -> RawIntoParIter<T> { 187 RawIntoParIter { table: self } 188 } 189 190 /// Returns a parallel iterator which consumes all elements of a `RawTable` 191 /// without freeing its memory allocation. 192 #[cfg_attr(feature = "inline-more", inline)] par_drain(&mut self) -> RawParDrain<'_, T>193 pub fn par_drain(&mut self) -> RawParDrain<'_, T> { 194 RawParDrain { 195 table: NonNull::from(self), 196 marker: PhantomData, 197 } 198 } 199 } 200