1 use crate::{
2     cfg::{self, CfgPrivate},
3     clear::Clear,
4     page,
5     sync::{
6         alloc,
7         atomic::{
8             AtomicPtr, AtomicUsize,
9             Ordering::{self, *},
10         },
11     },
12     tid::Tid,
13     Pack,
14 };
15 
16 use std::{fmt, ptr, slice};
17 
18 // ┌─────────────┐      ┌────────┐
19 // │ page 1      │      │        │
20 // ├─────────────┤ ┌───▶│  next──┼─┐
21 // │ page 2      │ │    ├────────┤ │
22 // │             │ │    │XXXXXXXX│ │
23 // │ local_free──┼─┘    ├────────┤ │
24 // │ global_free─┼─┐    │        │◀┘
25 // ├─────────────┤ └───▶│  next──┼─┐
26 // │   page 3    │      ├────────┤ │
27 // └─────────────┘      │XXXXXXXX│ │
28 //       ...            ├────────┤ │
29 // ┌─────────────┐      │XXXXXXXX│ │
30 // │ page n      │      ├────────┤ │
31 // └─────────────┘      │        │◀┘
32 //                      │  next──┼───▶
33 //                      ├────────┤
34 //                      │XXXXXXXX│
35 //                      └────────┘
36 //                         ...
/// A single shard of the slab, owned by one thread (identified by `tid`).
///
/// A shard is a set of pages (see the diagram above). Thread-local page state
/// lives in `local`; state that other threads may touch (remote free lists,
/// the slot arrays) lives in `shared`.
pub(crate) struct Shard<T, C: cfg::Config> {
    /// The shard's parent thread ID.
    pub(crate) tid: usize,
    /// The local free list for each page.
    ///
    /// These are only ever accessed from this shard's thread, so they are
    /// stored separately from the shared state for the page that can be
    /// accessed concurrently, to minimize false sharing.
    local: Box<[page::Local]>,
    /// The shared state for each page in this shard.
    ///
    /// This consists of the page's metadata (size, previous size), remote free
    /// list, and a pointer to the actual array backing that page.
    shared: Box<[page::Shared<T, C>]>,
}
52 
/// The slab's array of shards, indexed by thread ID.
pub(crate) struct Array<T, C: cfg::Config> {
    /// One lazily-allocated shard pointer per possible thread ID
    /// (`C::MAX_SHARDS` slots); a slot stays null until its owning thread
    /// first calls `current`.
    shards: Box<[Ptr<T, C>]>,
    /// The highest shard index allocated so far; used to bound iteration in
    /// `iter_mut`, `Drop`, and `Debug`.
    max: AtomicUsize,
}
57 
/// An atomic, nullable pointer to a heap-allocated shard.
///
/// Null until the owning thread allocates its shard via `Array::current`.
#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);
60 
/// An iterator over the allocated shards in an `Array`; never-allocated
/// (null) shard slots are skipped.
#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);
63 
64 // === impl Shard ===
65 
66 impl<T, C> Shard<T, C>
67 where
68     C: cfg::Config,
69 {
70     #[inline(always)]
with_slot<'a, U>( &'a self, idx: usize, f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>, ) -> Option<U>71     pub(crate) fn with_slot<'a, U>(
72         &'a self,
73         idx: usize,
74         f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
75     ) -> Option<U> {
76         debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
77         let (addr, page_index) = page::indices::<C>(idx);
78 
79         test_println!("-> {:?}", addr);
80         if page_index > self.shared.len() {
81             return None;
82         }
83 
84         self.shared[page_index].with_slot(addr, f)
85     }
86 
new(tid: usize) -> Self87     pub(crate) fn new(tid: usize) -> Self {
88         let mut total_sz = 0;
89         let shared = (0..C::MAX_PAGES)
90             .map(|page_num| {
91                 let sz = C::page_size(page_num);
92                 let prev_sz = total_sz;
93                 total_sz += sz;
94                 page::Shared::new(sz, prev_sz)
95             })
96             .collect();
97         let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
98         Self { tid, local, shared }
99     }
100 }
101 
102 impl<T, C> Shard<Option<T>, C>
103 where
104     C: cfg::Config,
105 {
106     /// Remove an item on the shard's local thread.
take_local(&self, idx: usize) -> Option<T>107     pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
108         debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
109         let (addr, page_index) = page::indices::<C>(idx);
110 
111         test_println!("-> remove_local {:?}", addr);
112 
113         self.shared
114             .get(page_index)?
115             .take(addr, C::unpack_gen(idx), self.local(page_index))
116     }
117 
118     /// Remove an item, while on a different thread from the shard's local thread.
take_remote(&self, idx: usize) -> Option<T>119     pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
120         debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
121         debug_assert!(Tid::<C>::current().as_usize() != self.tid);
122 
123         let (addr, page_index) = page::indices::<C>(idx);
124 
125         test_println!("-> take_remote {:?}; page {:?}", addr, page_index);
126 
127         let shared = self.shared.get(page_index)?;
128         shared.take(addr, C::unpack_gen(idx), shared.free_list())
129     }
130 
remove_local(&self, idx: usize) -> bool131     pub(crate) fn remove_local(&self, idx: usize) -> bool {
132         debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
133         let (addr, page_index) = page::indices::<C>(idx);
134 
135         if page_index > self.shared.len() {
136             return false;
137         }
138 
139         self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
140     }
141 
remove_remote(&self, idx: usize) -> bool142     pub(crate) fn remove_remote(&self, idx: usize) -> bool {
143         debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
144         let (addr, page_index) = page::indices::<C>(idx);
145 
146         if page_index > self.shared.len() {
147             return false;
148         }
149 
150         let shared = &self.shared[page_index];
151         shared.remove(addr, C::unpack_gen(idx), shared.free_list())
152     }
153 
iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>>154     pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
155         self.shared.iter()
156     }
157 }
158 
159 impl<T, C> Shard<T, C>
160 where
161     T: Clear + Default,
162     C: cfg::Config,
163 {
init_with<U>( &self, mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>, ) -> Option<U>164     pub(crate) fn init_with<U>(
165         &self,
166         mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
167     ) -> Option<U> {
168         // Can we fit the value into an exist`ing page?
169         for (page_idx, page) in self.shared.iter().enumerate() {
170             let local = self.local(page_idx);
171 
172             test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);
173 
174             if let Some(res) = page.init_with(local, &mut init) {
175                 return Some(res);
176             }
177         }
178 
179         None
180     }
181 
mark_clear_local(&self, idx: usize) -> bool182     pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
183         debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
184         let (addr, page_index) = page::indices::<C>(idx);
185 
186         if page_index > self.shared.len() {
187             return false;
188         }
189 
190         self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
191     }
192 
mark_clear_remote(&self, idx: usize) -> bool193     pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
194         debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
195         let (addr, page_index) = page::indices::<C>(idx);
196 
197         if page_index > self.shared.len() {
198             return false;
199         }
200 
201         let shared = &self.shared[page_index];
202         shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
203     }
204 
clear_after_release(&self, idx: usize)205     pub(crate) fn clear_after_release(&self, idx: usize) {
206         crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
207         let tid = Tid::<C>::current().as_usize();
208         test_println!(
209             "-> clear_after_release; self.tid={:?}; current.tid={:?};",
210             tid,
211             self.tid
212         );
213         if tid == self.tid {
214             self.clear_local(idx);
215         } else {
216             self.clear_remote(idx);
217         }
218     }
219 
clear_local(&self, idx: usize) -> bool220     fn clear_local(&self, idx: usize) -> bool {
221         debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
222         let (addr, page_index) = page::indices::<C>(idx);
223 
224         if page_index > self.shared.len() {
225             return false;
226         }
227 
228         self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
229     }
230 
clear_remote(&self, idx: usize) -> bool231     fn clear_remote(&self, idx: usize) -> bool {
232         debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
233         let (addr, page_index) = page::indices::<C>(idx);
234 
235         if page_index > self.shared.len() {
236             return false;
237         }
238 
239         let shared = &self.shared[page_index];
240         shared.clear(addr, C::unpack_gen(idx), shared.free_list())
241     }
242 
243     #[inline(always)]
local(&self, i: usize) -> &page::Local244     fn local(&self, i: usize) -> &page::Local {
245         #[cfg(debug_assertions)]
246         debug_assert_eq_in_drop!(
247             Tid::<C>::current().as_usize(),
248             self.tid,
249             "tried to access local data from another thread!"
250         );
251 
252         &self.local[i]
253     }
254 }
255 
256 impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result257     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258         let mut d = f.debug_struct("Shard");
259 
260         #[cfg(debug_assertions)]
261         d.field("tid", &self.tid);
262         d.field("shared", &self.shared).finish()
263     }
264 }
265 
266 // === impl Array ===
267 
impl<T, C> Array<T, C>
where
    C: cfg::Config,
{
    /// Returns a new array with one (null) shard slot per possible thread ID.
    pub(crate) fn new() -> Self {
        let mut shards = Vec::with_capacity(C::MAX_SHARDS);
        for _ in 0..C::MAX_SHARDS {
            // XXX(eliza): T_T this could be avoided with maybeuninit or something...
            shards.push(Ptr::null());
        }
        Self {
            shards: shards.into(),
            max: AtomicUsize::new(0),
        }
    }

    /// Returns the shard at `idx`, or `None` if `idx` is out of bounds or that
    /// thread's shard has not been allocated yet.
    #[inline]
    pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> {
        test_println!("-> get shard={}", idx);
        self.shards.get(idx)?.load(Acquire)
    }

    /// Returns the calling thread's ID and its shard, allocating the shard on
    /// first use.
    ///
    /// # Panics
    ///
    /// Panics if the thread's index is not below `C::MAX_SHARDS`.
    #[inline]
    pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) {
        let tid = Tid::<C>::current();
        test_println!("current: {:?}", tid);
        let idx = tid.as_usize();
        assert!(
            idx < self.shards.len(),
            "Thread count overflowed the configured max count. \
            Thread index = {}, max threads = {}.",
            idx,
            C::MAX_SHARDS,
        );
        // It's okay for this to be relaxed. The value is only ever stored by
        // the thread that corresponds to the index, and we are that thread.
        let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
            // First access from this thread: allocate the shard and publish it.
            let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
            test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
            self.shards[idx].set(ptr);
            // Advance `max` to cover this index, racing with other threads
            // doing the same; retry the CAS until `max >= idx`.
            let mut max = self.max.load(Acquire);
            while max < idx {
                match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
                    Ok(_) => break,
                    Err(actual) => max = actual,
                }
            }
            test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
            unsafe {
                // Safety: we just put it there!
                &*ptr
            }
            .get_ref()
        });
        (tid, shard)
    }

    /// Returns an iterator over all shards allocated so far (bounded by `max`).
    pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
        test_println!("Array::iter_mut");
        let max = self.max.load(Acquire);
        test_println!("-> highest index={}", max);
        IterMut(self.shards[0..=max].iter_mut())
    }
}
332 
impl<T, C: cfg::Config> Drop for Array<T, C> {
    /// Deallocates every shard that was actually allocated. Only slots up to
    /// `max` can be non-null, so slots past it are not inspected.
    fn drop(&mut self) {
        // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
        let max = self.max.load(Acquire);
        for shard in &self.shards[0..=max] {
            // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
            let ptr = shard.0.load(Acquire);
            // A thread may have been assigned an index without ever allocating
            // its shard, leaving the slot null.
            if ptr.is_null() {
                continue;
            }
            let shard = unsafe {
                // Safety: this is the only place where these boxes are
                // deallocated, and we have exclusive access to the shard array,
                // because...we are dropping it...
                Box::from_raw(ptr)
            };
            drop(shard)
        }
    }
}
353 
354 impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result355     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356         let max = self.max.load(Acquire);
357         let mut set = f.debug_map();
358         for shard in &self.shards[0..=max] {
359             let ptr = shard.0.load(Acquire);
360             if let Some(shard) = ptr::NonNull::new(ptr) {
361                 set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
362             } else {
363                 set.entry(&format_args!("{:p}", ptr), &());
364             }
365         }
366         set.finish()
367     }
368 }
369 
370 // === impl Ptr ===
371 
impl<T, C: cfg::Config> Ptr<T, C> {
    /// Returns a new, null shard pointer.
    #[inline]
    fn null() -> Self {
        Self(AtomicPtr::new(ptr::null_mut()))
    }

    /// Loads the pointer with `order`, returning a reference to the shard, or
    /// `None` if the shard has not been allocated.
    #[inline]
    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
        let ptr = self.0.load(order);
        test_println!("---> loaded={:p} (order={:?})", ptr, order);
        if ptr.is_null() {
            test_println!("---> null");
            return None;
        }
        let track = unsafe {
            // Safety: The returned reference will have the same lifetime as the
            // reference to the shard pointer, which (morally, if not actually)
            // owns the shard. The shard is only deallocated when the shard
            // array is dropped, and it won't be dropped while this pointer is
            // borrowed --- and the returned reference has the same lifetime.
            //
            // We know that the pointer is not null, because we just
            // null-checked it immediately prior.
            &*ptr
        };

        Some(track.get_ref())
    }

    /// Publishes a newly allocated shard, panicking if the slot was already
    /// set (each slot may only be written once, by its owning thread).
    #[inline]
    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
        self.0
            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
            .expect("a shard can only be inserted by the thread that owns it, this is a bug!");
    }
}
408 
409 // === Iterators ===
410 
411 impl<'a, T, C> Iterator for IterMut<'a, T, C>
412 where
413     T: 'a,
414     C: cfg::Config + 'a,
415 {
416     type Item = &'a Shard<T, C>;
next(&mut self) -> Option<Self::Item>417     fn next(&mut self) -> Option<Self::Item> {
418         test_println!("IterMut::next");
419         loop {
420             // Skip over empty indices if they are less than the highest
421             // allocated shard. Some threads may have accessed the slab
422             // (generating a thread ID) but never actually inserted data, so
423             // they may have never allocated a shard.
424             let next = self.0.next();
425             test_println!("-> next.is_some={}", next.is_some());
426             if let Some(shard) = next?.load(Acquire) {
427                 test_println!("-> done");
428                 return Some(shard);
429             }
430         }
431     }
432 }
433