1 use crate::raw::Bucket;
2 use crate::raw::{RawIter, RawIterRange, RawTable};
3 use crate::scopeguard::guard;
4 use alloc::alloc::dealloc;
5 use core::marker::PhantomData;
6 use core::mem;
7 use core::ptr::NonNull;
8 use rayon::iter::{
9     plumbing::{self, Folder, UnindexedConsumer, UnindexedProducer},
10     ParallelIterator,
11 };
12 
13 /// Parallel iterator which returns a raw pointer to every full bucket in the table.
14 pub struct RawParIter<T> {
15     iter: RawIterRange<T>,
16 }
17 
18 impl<T> From<RawIter<T>> for RawParIter<T> {
from(it: RawIter<T>) -> Self19     fn from(it: RawIter<T>) -> Self {
20         RawParIter { iter: it.iter }
21     }
22 }
23 
24 impl<T> ParallelIterator for RawParIter<T> {
25     type Item = Bucket<T>;
26 
27     #[cfg_attr(feature = "inline-more", inline)]
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,28     fn drive_unindexed<C>(self, consumer: C) -> C::Result
29     where
30         C: UnindexedConsumer<Self::Item>,
31     {
32         let producer = ParIterProducer { iter: self.iter };
33         plumbing::bridge_unindexed(producer, consumer)
34     }
35 }
36 
37 /// Producer which returns a `Bucket<T>` for every element.
38 struct ParIterProducer<T> {
39     iter: RawIterRange<T>,
40 }
41 
42 impl<T> UnindexedProducer for ParIterProducer<T> {
43     type Item = Bucket<T>;
44 
45     #[cfg_attr(feature = "inline-more", inline)]
split(self) -> (Self, Option<Self>)46     fn split(self) -> (Self, Option<Self>) {
47         let (left, right) = self.iter.split();
48         let left = ParIterProducer { iter: left };
49         let right = right.map(|right| ParIterProducer { iter: right });
50         (left, right)
51     }
52 
53     #[cfg_attr(feature = "inline-more", inline)]
fold_with<F>(self, folder: F) -> F where F: Folder<Self::Item>,54     fn fold_with<F>(self, folder: F) -> F
55     where
56         F: Folder<Self::Item>,
57     {
58         folder.consume_iter(self.iter)
59     }
60 }
61 
62 /// Parallel iterator which consumes a table and returns elements.
63 pub struct RawIntoParIter<T> {
64     table: RawTable<T>,
65 }
66 
67 impl<T: Send> ParallelIterator for RawIntoParIter<T> {
68     type Item = T;
69 
70     #[cfg_attr(feature = "inline-more", inline)]
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,71     fn drive_unindexed<C>(self, consumer: C) -> C::Result
72     where
73         C: UnindexedConsumer<Self::Item>,
74     {
75         let iter = unsafe { self.table.iter().iter };
76         let _guard = guard(self.table.into_alloc(), |alloc| {
77             if let Some((ptr, layout)) = *alloc {
78                 unsafe {
79                     dealloc(ptr.as_ptr(), layout);
80                 }
81             }
82         });
83         let producer = ParDrainProducer { iter };
84         plumbing::bridge_unindexed(producer, consumer)
85     }
86 }
87 
88 /// Parallel iterator which consumes elements without freeing the table storage.
89 pub struct RawParDrain<'a, T> {
90     // We don't use a &'a mut RawTable<T> because we want RawParDrain to be
91     // covariant over T.
92     table: NonNull<RawTable<T>>,
93     marker: PhantomData<&'a RawTable<T>>,
94 }
95 
96 unsafe impl<T> Send for RawParDrain<'_, T> {}
97 
98 impl<T: Send> ParallelIterator for RawParDrain<'_, T> {
99     type Item = T;
100 
101     #[cfg_attr(feature = "inline-more", inline)]
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,102     fn drive_unindexed<C>(self, consumer: C) -> C::Result
103     where
104         C: UnindexedConsumer<Self::Item>,
105     {
106         let _guard = guard(self.table, |table| unsafe {
107             table.as_mut().clear_no_drop()
108         });
109         let iter = unsafe { self.table.as_ref().iter().iter };
110         mem::forget(self);
111         let producer = ParDrainProducer { iter };
112         plumbing::bridge_unindexed(producer, consumer)
113     }
114 }
115 
116 impl<T> Drop for RawParDrain<'_, T> {
drop(&mut self)117     fn drop(&mut self) {
118         // If drive_unindexed is not called then simply clear the table.
119         unsafe { self.table.as_mut().clear() }
120     }
121 }
122 
123 /// Producer which will consume all elements in the range, even if it is dropped
124 /// halfway through.
125 struct ParDrainProducer<T> {
126     iter: RawIterRange<T>,
127 }
128 
129 impl<T: Send> UnindexedProducer for ParDrainProducer<T> {
130     type Item = T;
131 
132     #[cfg_attr(feature = "inline-more", inline)]
split(self) -> (Self, Option<Self>)133     fn split(self) -> (Self, Option<Self>) {
134         let (left, right) = self.iter.clone().split();
135         mem::forget(self);
136         let left = ParDrainProducer { iter: left };
137         let right = right.map(|right| ParDrainProducer { iter: right });
138         (left, right)
139     }
140 
141     #[cfg_attr(feature = "inline-more", inline)]
fold_with<F>(mut self, mut folder: F) -> F where F: Folder<Self::Item>,142     fn fold_with<F>(mut self, mut folder: F) -> F
143     where
144         F: Folder<Self::Item>,
145     {
146         // Make sure to modify the iterator in-place so that any remaining
147         // elements are processed in our Drop impl.
148         while let Some(item) = self.iter.next() {
149             folder = folder.consume(unsafe { item.read() });
150             if folder.full() {
151                 return folder;
152             }
153         }
154 
155         // If we processed all elements then we don't need to run the drop.
156         mem::forget(self);
157         folder
158     }
159 }
160 
161 impl<T> Drop for ParDrainProducer<T> {
162     #[cfg_attr(feature = "inline-more", inline)]
drop(&mut self)163     fn drop(&mut self) {
164         // Drop all remaining elements
165         if mem::needs_drop::<T>() {
166             while let Some(item) = self.iter.next() {
167                 unsafe {
168                     item.drop();
169                 }
170             }
171         }
172     }
173 }
174 
175 impl<T> RawTable<T> {
176     /// Returns a parallel iterator over the elements in a `RawTable`.
177     #[cfg_attr(feature = "inline-more", inline)]
par_iter(&self) -> RawParIter<T>178     pub unsafe fn par_iter(&self) -> RawParIter<T> {
179         RawParIter {
180             iter: self.iter().iter,
181         }
182     }
183 
184     /// Returns a parallel iterator over the elements in a `RawTable`.
185     #[cfg_attr(feature = "inline-more", inline)]
into_par_iter(self) -> RawIntoParIter<T>186     pub fn into_par_iter(self) -> RawIntoParIter<T> {
187         RawIntoParIter { table: self }
188     }
189 
190     /// Returns a parallel iterator which consumes all elements of a `RawTable`
191     /// without freeing its memory allocation.
192     #[cfg_attr(feature = "inline-more", inline)]
par_drain(&mut self) -> RawParDrain<'_, T>193     pub fn par_drain(&mut self) -> RawParDrain<'_, T> {
194         RawParDrain {
195             table: NonNull::from(self),
196             marker: PhantomData,
197         }
198     }
199 }
200