1 use crate::raw::Bucket;
2 use crate::raw::{Allocator, Global, RawIter, RawIterRange, RawTable};
3 use crate::scopeguard::guard;
4 use alloc::alloc::dealloc;
5 use core::marker::PhantomData;
6 use core::mem;
7 use core::ptr::NonNull;
8 use rayon::iter::{
9     plumbing::{self, Folder, UnindexedConsumer, UnindexedProducer},
10     ParallelIterator,
11 };
12 
/// Parallel iterator which returns a raw pointer to every full bucket in the table.
///
/// The items produced are `Bucket<T>` handles rather than references or owned values.
pub struct RawParIter<T> {
    // Bucket range that is handed to the producer once the iterator is driven.
    iter: RawIterRange<T>,
}
17 
18 impl<T> RawParIter<T> {
19     #[cfg_attr(feature = "inline-more", inline)]
iter(&self) -> RawIterRange<T>20     pub(super) unsafe fn iter(&self) -> RawIterRange<T> {
21         self.iter.clone()
22     }
23 }
24 
25 impl<T> Clone for RawParIter<T> {
26     #[cfg_attr(feature = "inline-more", inline)]
clone(&self) -> Self27     fn clone(&self) -> Self {
28         Self {
29             iter: self.iter.clone(),
30         }
31     }
32 }
33 
34 impl<T> From<RawIter<T>> for RawParIter<T> {
from(it: RawIter<T>) -> Self35     fn from(it: RawIter<T>) -> Self {
36         RawParIter { iter: it.iter }
37     }
38 }
39 
40 impl<T> ParallelIterator for RawParIter<T> {
41     type Item = Bucket<T>;
42 
43     #[cfg_attr(feature = "inline-more", inline)]
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,44     fn drive_unindexed<C>(self, consumer: C) -> C::Result
45     where
46         C: UnindexedConsumer<Self::Item>,
47     {
48         let producer = ParIterProducer { iter: self.iter };
49         plumbing::bridge_unindexed(producer, consumer)
50     }
51 }
52 
/// Producer which returns a `Bucket<T>` for every element.
///
/// Buckets are only handed out; this producer never reads or drops the stored values.
struct ParIterProducer<T> {
    // Range of buckets this producer is responsible for; halved by `split`.
    iter: RawIterRange<T>,
}
57 
58 impl<T> UnindexedProducer for ParIterProducer<T> {
59     type Item = Bucket<T>;
60 
61     #[cfg_attr(feature = "inline-more", inline)]
split(self) -> (Self, Option<Self>)62     fn split(self) -> (Self, Option<Self>) {
63         let (left, right) = self.iter.split();
64         let left = ParIterProducer { iter: left };
65         let right = right.map(|right| ParIterProducer { iter: right });
66         (left, right)
67     }
68 
69     #[cfg_attr(feature = "inline-more", inline)]
fold_with<F>(self, folder: F) -> F where F: Folder<Self::Item>,70     fn fold_with<F>(self, folder: F) -> F
71     where
72         F: Folder<Self::Item>,
73     {
74         folder.consume_iter(self.iter)
75     }
76 }
77 
/// Parallel iterator which consumes a table and returns elements.
///
/// The table is owned by the iterator, so both its elements and its storage are
/// disposed of as part of driving it.
pub struct RawIntoParIter<T, A: Allocator + Clone = Global> {
    // The table being consumed.
    table: RawTable<T, A>,
}
82 
83 impl<T, A: Allocator + Clone> RawIntoParIter<T, A> {
84     #[cfg_attr(feature = "inline-more", inline)]
par_iter(&self) -> RawParIter<T>85     pub(super) unsafe fn par_iter(&self) -> RawParIter<T> {
86         self.table.par_iter()
87     }
88 }
89 
90 impl<T: Send, A: Allocator + Clone> ParallelIterator for RawIntoParIter<T, A> {
91     type Item = T;
92 
93     #[cfg_attr(feature = "inline-more", inline)]
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,94     fn drive_unindexed<C>(self, consumer: C) -> C::Result
95     where
96         C: UnindexedConsumer<Self::Item>,
97     {
98         let iter = unsafe { self.table.iter().iter };
99         let _guard = guard(self.table.into_allocation(), |alloc| {
100             if let Some((ptr, layout)) = *alloc {
101                 unsafe {
102                     dealloc(ptr.as_ptr(), layout);
103                 }
104             }
105         });
106         let producer = ParDrainProducer { iter };
107         plumbing::bridge_unindexed(producer, consumer)
108     }
109 }
110 
/// Parallel iterator which consumes elements without freeing the table storage.
///
/// Dropping it without driving it clears the table (see the `Drop` impl).
pub struct RawParDrain<'a, T, A: Allocator + Clone = Global> {
    // We don't use a &'a mut RawTable<T> because we want RawParDrain to be
    // covariant over T.
    table: NonNull<RawTable<T, A>>,
    // Ties the raw pointer above to the lifetime of the borrow taken by `par_drain`.
    marker: PhantomData<&'a RawTable<T, A>>,
}
118 
119 unsafe impl<T, A: Allocator + Clone> Send for RawParDrain<'_, T, A> {}
120 
121 impl<T, A: Allocator + Clone> RawParDrain<'_, T, A> {
122     #[cfg_attr(feature = "inline-more", inline)]
par_iter(&self) -> RawParIter<T>123     pub(super) unsafe fn par_iter(&self) -> RawParIter<T> {
124         self.table.as_ref().par_iter()
125     }
126 }
127 
128 impl<T: Send, A: Allocator + Clone> ParallelIterator for RawParDrain<'_, T, A> {
129     type Item = T;
130 
131     #[cfg_attr(feature = "inline-more", inline)]
drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,132     fn drive_unindexed<C>(self, consumer: C) -> C::Result
133     where
134         C: UnindexedConsumer<Self::Item>,
135     {
136         let _guard = guard(self.table, |table| unsafe {
137             table.as_mut().clear_no_drop()
138         });
139         let iter = unsafe { self.table.as_ref().iter().iter };
140         mem::forget(self);
141         let producer = ParDrainProducer { iter };
142         plumbing::bridge_unindexed(producer, consumer)
143     }
144 }
145 
146 impl<T, A: Allocator + Clone> Drop for RawParDrain<'_, T, A> {
drop(&mut self)147     fn drop(&mut self) {
148         // If drive_unindexed is not called then simply clear the table.
149         unsafe { self.table.as_mut().clear() }
150     }
151 }
152 
/// Producer which will consume all elements in the range, even if it is dropped
/// halfway through.
///
/// Elements that were not handed to a folder are dropped by the `Drop` impl.
struct ParDrainProducer<T> {
    // Buckets still to be consumed; advanced in place so `Drop` sees only what remains.
    iter: RawIterRange<T>,
}
158 
159 impl<T: Send> UnindexedProducer for ParDrainProducer<T> {
160     type Item = T;
161 
162     #[cfg_attr(feature = "inline-more", inline)]
split(self) -> (Self, Option<Self>)163     fn split(self) -> (Self, Option<Self>) {
164         let (left, right) = self.iter.clone().split();
165         mem::forget(self);
166         let left = ParDrainProducer { iter: left };
167         let right = right.map(|right| ParDrainProducer { iter: right });
168         (left, right)
169     }
170 
171     #[cfg_attr(feature = "inline-more", inline)]
fold_with<F>(mut self, mut folder: F) -> F where F: Folder<Self::Item>,172     fn fold_with<F>(mut self, mut folder: F) -> F
173     where
174         F: Folder<Self::Item>,
175     {
176         // Make sure to modify the iterator in-place so that any remaining
177         // elements are processed in our Drop impl.
178         while let Some(item) = self.iter.next() {
179             folder = folder.consume(unsafe { item.read() });
180             if folder.full() {
181                 return folder;
182             }
183         }
184 
185         // If we processed all elements then we don't need to run the drop.
186         mem::forget(self);
187         folder
188     }
189 }
190 
191 impl<T> Drop for ParDrainProducer<T> {
192     #[cfg_attr(feature = "inline-more", inline)]
drop(&mut self)193     fn drop(&mut self) {
194         // Drop all remaining elements
195         if mem::needs_drop::<T>() {
196             while let Some(item) = self.iter.next() {
197                 unsafe {
198                     item.drop();
199                 }
200             }
201         }
202     }
203 }
204 
205 impl<T, A: Allocator + Clone> RawTable<T, A> {
206     /// Returns a parallel iterator over the elements in a `RawTable`.
207     #[cfg_attr(feature = "inline-more", inline)]
par_iter(&self) -> RawParIter<T>208     pub unsafe fn par_iter(&self) -> RawParIter<T> {
209         RawParIter {
210             iter: self.iter().iter,
211         }
212     }
213 
214     /// Returns a parallel iterator over the elements in a `RawTable`.
215     #[cfg_attr(feature = "inline-more", inline)]
into_par_iter(self) -> RawIntoParIter<T, A>216     pub fn into_par_iter(self) -> RawIntoParIter<T, A> {
217         RawIntoParIter { table: self }
218     }
219 
220     /// Returns a parallel iterator which consumes all elements of a `RawTable`
221     /// without freeing its memory allocation.
222     #[cfg_attr(feature = "inline-more", inline)]
par_drain(&mut self) -> RawParDrain<'_, T, A>223     pub fn par_drain(&mut self) -> RawParDrain<'_, T, A> {
224         RawParDrain {
225             table: NonNull::from(self),
226             marker: PhantomData,
227         }
228     }
229 }
230