use crate::raw::Bucket;
use crate::raw::{Allocator, Global, RawIter, RawIterRange, RawTable};
use crate::scopeguard::guard;
use alloc::alloc::dealloc;
use core::marker::PhantomData;
use core::mem;
use core::ptr::NonNull;
use rayon::iter::{
    plumbing::{self, Folder, UnindexedConsumer, UnindexedProducer},
    ParallelIterator,
};

/// Parallel iterator which returns a raw pointer to every full bucket in the table.
pub struct RawParIter<T> {
    iter: RawIterRange<T>,
}

impl<T> RawParIter<T> {
    /// Returns a clone of the underlying sequential bucket range.
    ///
    /// # Safety
    ///
    /// NOTE(review): the returned range presumably must not outlive the table
    /// it was created from, nor be used across modifications of that table;
    /// confirm against the contract of `RawIterRange`.
    #[cfg_attr(feature = "inline-more", inline)]
    pub(super) unsafe fn iter(&self) -> RawIterRange<T> {
        self.iter.clone()
    }
}

// Implemented by hand rather than derived so that no `T: Clone` bound is
// required: only the range is cloned, never an element.
impl<T> Clone for RawParIter<T> {
    #[cfg_attr(feature = "inline-more", inline)]
    fn clone(&self) -> Self {
        Self {
            iter: self.iter.clone(),
        }
    }
}

impl<T> From<RawIter<T>> for RawParIter<T> {
    /// Converts a sequential raw iterator into a parallel one by taking over
    /// its `iter` range.
    fn from(it: RawIter<T>) -> Self {
        RawParIter { iter: it.iter }
    }
}

impl<T: Send> ParallelIterator for RawParIter<T> {
    type Item = Bucket<T>;

    /// Wraps the range in a [`ParIterProducer`] and hands it to rayon's
    /// unindexed bridge, which splits it across worker threads.
    #[cfg_attr(feature = "inline-more", inline)]
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        let producer = ParIterProducer { iter: self.iter };
        plumbing::bridge_unindexed(producer, consumer)
    }
}

/// Producer which returns a `Bucket<T>` for every element.
struct ParIterProducer<T> {
    iter: RawIterRange<T>,
}

impl<T: Send> UnindexedProducer for ParIterProducer<T> {
    type Item = Bucket<T>;

    /// Splits the range in two. The right half is `None` when the underlying
    /// range reports that it cannot be split any further.
    #[cfg_attr(feature = "inline-more", inline)]
    fn split(self) -> (Self, Option<Self>) {
        let (left, right) = self.iter.split();
        let left = ParIterProducer { iter: left };
        let right = right.map(|right| ParIterProducer { iter: right });
        (left, right)
    }

    /// Feeds every bucket of this range into `folder`.
    #[cfg_attr(feature = "inline-more", inline)]
    fn fold_with<F>(self, folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        folder.consume_iter(self.iter)
    }
}

/// Parallel iterator which consumes a table and returns elements.
pub struct RawIntoParIter<T, A: Allocator + Clone = Global> {
    table: RawTable<T, A>,
}

impl<T, A: Allocator + Clone> RawIntoParIter<T, A> {
    /// Returns a borrowing parallel iterator over the buckets of the owned table.
    ///
    /// # Safety
    ///
    /// Forwards to `RawTable::par_iter`, which is itself `unsafe`; the caller
    /// must uphold that method's contract.
    #[cfg_attr(feature = "inline-more", inline)]
    pub(super) unsafe fn par_iter(&self) -> RawParIter<T> {
        self.table.par_iter()
    }
}

impl<T: Send, A: Allocator + Clone> ParallelIterator for RawIntoParIter<T, A> {
    type Item = T;

    /// Moves every element out of the table in parallel, then releases the
    /// table's allocation.
    #[cfg_attr(feature = "inline-more", inline)]
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        // Capture the bucket range before the table is turned into a bare
        // allocation below.
        // SAFETY: NOTE(review): relies on the range only being used while the
        // allocation is still alive; `_guard` is dropped at the end of this
        // function, after `bridge_unindexed` has returned.
        let iter = unsafe { self.table.iter().iter };
        // The elements are read out or dropped by `ParDrainProducer`, so only
        // the backing storage is left to free here. Going through a guard
        // means this function's exit paths all free it when `_guard` is dropped.
        let _guard = guard(self.table.into_allocation(), |alloc| {
            if let Some((ptr, layout)) = *alloc {
                // SAFETY: NOTE(review): assumes `into_allocation` returns the
                // pointer/layout pair the storage was allocated with — confirm.
                unsafe {
                    dealloc(ptr.as_ptr(), layout);
                }
            }
        });
        let producer = ParDrainProducer { iter };
        plumbing::bridge_unindexed(producer, consumer)
    }
}

/// Parallel iterator which consumes elements without freeing the table storage.
pub struct RawParDrain<'a, T, A: Allocator + Clone = Global> {
    // We don't use a &'a mut RawTable<T> because we want RawParDrain to be
    // covariant over T.
    table: NonNull<RawTable<T, A>>,
    marker: PhantomData<&'a RawTable<T, A>>,
}

// SAFETY: NOTE(review): `NonNull` suppresses the automatic `Send` impl; this
// restores it for `T: Send` on the grounds that the drain behaves like an
// exclusive borrow of the table — confirm the allocator needs no extra bound.
unsafe impl<T: Send, A: Allocator + Clone> Send for RawParDrain<'_, T, A> {}

impl<T, A: Allocator + Clone> RawParDrain<'_, T, A> {
    /// Returns a borrowing parallel iterator over the buckets of the table
    /// being drained.
    ///
    /// # Safety
    ///
    /// Dereferences the stored table pointer and forwards to
    /// `RawTable::par_iter`; the caller must uphold that method's contract.
    #[cfg_attr(feature = "inline-more", inline)]
    pub(super) unsafe fn par_iter(&self) -> RawParIter<T> {
        self.table.as_ref().par_iter()
    }
}

impl<T: Send, A: Allocator + Clone> ParallelIterator for RawParDrain<'_, T, A> {
    type Item = T;

    /// Moves every element out of the table in parallel and leaves the table
    /// cleared, with its allocation intact.
    #[cfg_attr(feature = "inline-more", inline)]
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        // Ownership of every element passes to `ParDrainProducer`, so the
        // table is reset with `clear_no_drop` rather than `clear` once the
        // guard is dropped.
        let _guard = guard(self.table, |table| unsafe {
            table.as_mut().clear_no_drop()
        });
        // SAFETY: NOTE(review): `self.table` was created from a `&mut RawTable`
        // in `par_drain` and the `'a` marker keeps that borrow alive.
        let iter = unsafe { self.table.as_ref().iter().iter };
        // Skip our own `Drop`, which would `clear()` the table and drop the
        // elements that the producer is about to hand out.
        mem::forget(self);
        let producer = ParDrainProducer { iter };
        plumbing::bridge_unindexed(producer, consumer)
    }
}

impl<T, A: Allocator + Clone> Drop for RawParDrain<'_, T, A> {
    fn drop(&mut self) {
        // If drive_unindexed is not called then simply clear the table.
        unsafe { self.table.as_mut().clear() }
    }
}

/// Producer which will consume all elements in the range, even if it is dropped
/// halfway through.
struct ParDrainProducer { iter: RawIterRange, } impl UnindexedProducer for ParDrainProducer { type Item = T; #[cfg_attr(feature = "inline-more", inline)] fn split(self) -> (Self, Option) { let (left, right) = self.iter.clone().split(); mem::forget(self); let left = ParDrainProducer { iter: left }; let right = right.map(|right| ParDrainProducer { iter: right }); (left, right) } #[cfg_attr(feature = "inline-more", inline)] fn fold_with(mut self, mut folder: F) -> F where F: Folder, { // Make sure to modify the iterator in-place so that any remaining // elements are processed in our Drop impl. while let Some(item) = self.iter.next() { folder = folder.consume(unsafe { item.read() }); if folder.full() { return folder; } } // If we processed all elements then we don't need to run the drop. mem::forget(self); folder } } impl Drop for ParDrainProducer { #[cfg_attr(feature = "inline-more", inline)] fn drop(&mut self) { // Drop all remaining elements if mem::needs_drop::() { while let Some(item) = self.iter.next() { unsafe { item.drop(); } } } } } impl RawTable { /// Returns a parallel iterator over the elements in a `RawTable`. #[cfg_attr(feature = "inline-more", inline)] pub unsafe fn par_iter(&self) -> RawParIter { RawParIter { iter: self.iter().iter, } } /// Returns a parallel iterator over the elements in a `RawTable`. #[cfg_attr(feature = "inline-more", inline)] pub fn into_par_iter(self) -> RawIntoParIter { RawIntoParIter { table: self } } /// Returns a parallel iterator which consumes all elements of a `RawTable` /// without freeing its memory allocation. #[cfg_attr(feature = "inline-more", inline)] pub fn par_drain(&mut self) -> RawParDrain<'_, T, A> { RawParDrain { table: NonNull::from(self), marker: PhantomData, } } }