use crate::{
    cfg::{self, CfgPrivate},
    clear::Clear,
    page,
    sync::{
        alloc,
        atomic::{
            AtomicPtr, AtomicUsize,
            Ordering::{self, *},
        },
    },
    tid::Tid,
    Pack,
};

use std::{fmt, ptr, slice};

// ┌─────────────┐      ┌────────┐
// │ page 1      │      │        │
// ├─────────────┤ ┌───▶│  next──┼─┐
// │ page 2      │ │    ├────────┤ │
// │             │ │    │XXXXXXXX│ │
// │ local_free──┼─┘    ├────────┤ │
// │ global_free─┼─┐    │        │◀┘
// ├─────────────┤ └───▶│  next──┼─┐
// │   page 3    │      ├────────┤ │
// └─────────────┘      │XXXXXXXX│ │
//       ...            ├────────┤ │
// ┌─────────────┐      │XXXXXXXX│ │
// │ page n      │      ├────────┤ │
// └─────────────┘      │        │◀┘
//                      │  next──┼───▶
//                      ├────────┤
//                      │XXXXXXXX│
//                      └────────┘
//                         ...
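/// One thread's shard of the slab: the local and shared state for each of its
/// pages, linked into the free lists shown in the diagram above.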
pub(crate) struct Shard<T, C: cfg::Config> {
    /// The shard's parent thread ID.
    pub(crate) tid: usize,
    /// The local free list for each page.
    ///
    /// These are only ever accessed from this shard's thread, so they are
    /// stored separately from the shared state for the page that can be
    /// accessed concurrently, to minimize false sharing.
    local: Box<[page::Local]>,
    /// The shared state for each page in this shard.
    ///
    /// This consists of the page's metadata (size, previous size), remote free
    /// list, and a pointer to the actual array backing that page.
    shared: Box<[page::Shared<T, C>]>,
}

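/// The array of shards, one slot per thread that may access the slab.
///
/// Shards are allocated lazily: every slot starts out null and is populated
/// the first time its owning thread calls [`Array::current`]. `max` tracks the
/// highest index allocated so far, so iteration and `Drop` can stop early.
///
/// An illustrative sketch of the access pattern (the `Option<u64>` payload and
/// `DefaultConfig` parameter are assumptions for the example; in the crate,
/// these calls happen inside the slab itself):
///
/// ```ignore
/// let shards = Array::<Option<u64>, DefaultConfig>::new();
/// // The calling thread lazily allocates its own shard on first access...
/// let (tid, _shard) = shards.current();
/// // ...which is then visible to any thread through `get`.
/// assert!(shards.get(tid.as_usize()).is_some());
/// ```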
pub(crate) struct Array<T, C: cfg::Config> {
    shards: Box<[Ptr<T, C>]>,
    max: AtomicUsize,
}

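/// An atomic pointer to a lazily-allocated shard: null until the owning thread
/// publishes its shard, and set at most once.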
#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);

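/// An iterator over the shards allocated in an [`Array`].
///
/// Although this holds a mutable borrow of the array (so no new shards can be
/// published while it is alive), it yields shared references to the shards.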
#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);

// === impl Shard ===

impl<T, C> Shard<T, C>
where
    C: cfg::Config,
{
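    /// Returns the result of calling `f` with a reference to the slot at the
    /// packed index `idx`, or `None` if the index is out of range for this
    /// shard.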
    #[inline(always)]
    pub(crate) fn with_slot<'a, U>(
        &'a self,
        idx: usize,
        f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> {:?}", addr);
        if page_index >= self.shared.len() {
            return None;
        }

        self.shared[page_index].with_slot(addr, f)
    }

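    /// Returns a new shard for the thread with the given `tid`, with local and
    /// shared state for each of the configuration's `MAX_PAGES` pages.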
    pub(crate) fn new(tid: usize) -> Self {
        let mut total_sz = 0;
        let shared = (0..C::MAX_PAGES)
            .map(|page_num| {
                let sz = C::page_size(page_num);
                let prev_sz = total_sz;
                total_sz += sz;
                page::Shared::new(sz, prev_sz)
            })
            .collect();
        let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
        Self { tid, local, shared }
    }
}

impl<T, C> Shard<Option<T>, C>
where
    C: cfg::Config,
{
    /// Remove an item on the shard's local thread.
    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_local {:?}", addr);

        self.shared
            .get(page_index)?
            .take(addr, C::unpack_gen(idx), self.local(page_index))
    }

    /// Remove an item, while on a different thread from the shard's local thread.
    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        debug_assert!(Tid::<C>::current().as_usize() != self.tid);

        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);

        let shared = self.shared.get(page_index)?;
        shared.take(addr, C::unpack_gen(idx), shared.free_list())
    }

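    /// Marks the item at `idx` for removal, from the shard's own thread.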
    pub(crate) fn remove_local(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
    }

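    /// Marks the item at `idx` for removal, from a thread other than the
    /// shard's own.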
    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.remove(addr, C::unpack_gen(idx), shared.free_list())
    }

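    /// Returns an iterator over this shard's shared page state.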
    pub(crate) fn iter<'a>(&'a self) -> std::slice::Iter<'a, page::Shared<Option<T>, C>> {
        self.shared.iter()
    }
}

impl<T, C> Shard<T, C>
where
    T: Clear + Default,
    C: cfg::Config,
{
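    /// Initializes a new slot by calling `init`, trying each page in order
    /// until one has capacity; returns `None` if every page is full.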
    pub(crate) fn init_with<U>(
        &self,
        mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        // Can we fit the value into an existing page?
        for (page_idx, page) in self.shared.iter().enumerate() {
            let local = self.local(page_idx);

            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);

            if let Some(res) = page.init_with(local, &mut init) {
                return Some(res);
            }
        }

        None
    }

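    /// Marks the slot at `idx` to be cleared, from the shard's own thread.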
    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

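    /// Marks the slot at `idx` to be cleared, from a thread other than the
    /// shard's own.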
    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
    }

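    /// Clears the slot at `idx` once its last reference has been released,
    /// using the local path if called from the shard's own thread and the
    /// remote path otherwise.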
    pub(crate) fn clear_after_release(&self, idx: usize) {
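        // Acquire fence: ensure the writes made by whoever released the slot's
        // last reference are visible before we clear it.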
        crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
        let tid = Tid::<C>::current().as_usize();
        test_println!(
            "-> clear_after_release; self.tid={:?}; current.tid={:?};",
            self.tid,
            tid
        );
        if tid == self.tid {
            self.clear_local(idx);
        } else {
            self.clear_remote(idx);
        }
    }

    fn clear_local(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

    fn clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index >= self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.clear(addr, C::unpack_gen(idx), shared.free_list())
    }

    #[inline(always)]
    fn local(&self, i: usize) -> &page::Local {
        #[cfg(debug_assertions)]
        debug_assert_eq!(
            Tid::<C>::current().as_usize(),
            self.tid,
            "tried to access local data from another thread!"
        );

        &self.local[i]
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_struct("Shard");

        #[cfg(debug_assertions)]
        d.field("tid", &self.tid);
        d.field("shared", &self.shared).finish()
    }
}

// === impl Array ===

impl<T, C> Array<T, C>
where
    C: cfg::Config,
{
    pub(crate) fn new() -> Self {
        let mut shards = Vec::with_capacity(C::MAX_SHARDS);
        for _ in 0..C::MAX_SHARDS {
            // XXX(eliza): T_T this could be avoided with maybeuninit or something...
            shards.push(Ptr::null());
        }
        Self {
            shards: shards.into(),
            max: AtomicUsize::new(0),
        }
    }

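    /// Returns a reference to the shard at `idx`, if one has been allocated.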
    #[inline]
    pub(crate) fn get<'a>(&'a self, idx: usize) -> Option<&'a Shard<T, C>> {
        test_println!("-> get shard={}", idx);
        self.shards.get(idx)?.load(Acquire)
    }

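    /// Returns the current thread's ID and a reference to its shard,
    /// allocating the shard if this thread has not accessed the slab before.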
    #[inline]
    pub(crate) fn current<'a>(&'a self) -> (Tid<C>, &'a Shard<T, C>) {
        let tid = Tid::<C>::current();
        test_println!("current: {:?}", tid);
        let idx = tid.as_usize();
        // It's okay for this to be relaxed. The value is only ever stored by
        // the thread that corresponds to the index, and we are that thread.
        let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
            let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
            test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
            self.shards[idx].set(ptr);
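            // Raise `max` to cover this index, so that `iter_mut`, `Drop`, and
            // the `Debug` impl will visit the newly allocated shard.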
            let mut max = self.max.load(Acquire);
            while max < idx {
                match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
                    Ok(_) => break,
                    Err(actual) => max = actual,
                }
            }
            test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
            unsafe {
                // Safety: we just put it there!
                &*ptr
            }
            .get_ref()
        });
        (tid, shard)
    }

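    /// Returns an iterator over all shards up to the highest allocated index.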
    pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
        test_println!("Array::iter_mut");
        let max = self.max.load(Acquire);
        test_println!("-> highest index={}", max);
        IterMut(self.shards[0..=max].iter_mut())
    }
}

impl<T, C: cfg::Config> Drop for Array<T, C> {
    fn drop(&mut self) {
        // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
        let max = self.max.load(Acquire);
        for shard in &self.shards[0..=max] {
            // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
            let ptr = shard.0.load(Acquire);
            if ptr.is_null() {
                continue;
            }
            let shard = unsafe {
                // Safety: this is the only place where these boxes are
                // deallocated, and we have exclusive access to the shard array,
                // because...we are dropping it...
                Box::from_raw(ptr)
            };
            drop(shard)
        }
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let max = self.max.load(Acquire);
        let mut set = f.debug_map();
        for shard in &self.shards[0..=max] {
            let ptr = shard.0.load(Acquire);
            if let Some(shard) = ptr::NonNull::new(ptr) {
                set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
            } else {
                set.entry(&format_args!("{:p}", ptr), &());
            }
        }
        set.finish()
    }
}

// === impl Ptr ===

impl<T, C: cfg::Config> Ptr<T, C> {
    #[inline]
    fn null() -> Self {
        Self(AtomicPtr::new(ptr::null_mut()))
    }

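    /// Loads the shard pointer with the given `order`, returning `None` if the
    /// shard has not yet been allocated.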
    #[inline]
    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
        let ptr = self.0.load(order);
        test_println!("---> loaded={:p} (order={:?})", ptr, order);
        if ptr.is_null() {
            test_println!("---> null");
            return None;
        }
        let track = unsafe {
            // Safety: The returned reference will have the same lifetime as the
            // reference to the shard pointer, which (morally, if not actually)
            // owns the shard. The shard is only deallocated when the shard
            // array is dropped, and it won't be dropped while this pointer is
            // borrowed --- and the returned reference has the same lifetime.
            //
            // We know that the pointer is not null, because we just
            // null-checked it immediately prior.
            &*ptr
        };

        Some(track.get_ref())
    }

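    /// Publishes a newly allocated shard. The pointer must currently be null,
    /// since a shard may only be inserted once, by the thread that owns it.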
    #[inline]
    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
        self.0
            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
            .expect("a shard can only be inserted by the thread that owns it, this is a bug!");
    }
}

// === Iterators ===

impl<'a, T, C> Iterator for IterMut<'a, T, C>
where
    T: 'a,
    C: cfg::Config + 'a,
{
    type Item = &'a Shard<T, C>;
    fn next(&mut self) -> Option<Self::Item> {
        test_println!("IterMut::next");
        loop {
            // Skip over empty indices if they are less than the highest
            // allocated shard. Some threads may have accessed the slab
            // (generating a thread ID) but never actually inserted data, so
            // they may have never allocated a shard.
            let next = self.0.next();
            test_println!("-> next.is_some={}", next.is_some());
            if let Some(shard) = next?.load(Acquire) {
                test_println!("-> done");
                return Some(shard);
            }
        }
    }
}