use crate::{
    cfg::{self, CfgPrivate},
    clear::Clear,
    page,
    sync::{
        alloc,
        atomic::{
            AtomicPtr, AtomicUsize,
            Ordering::{self, *},
        },
    },
    tid::Tid,
    Pack,
};

use std::{fmt, ptr, slice};

// ┌─────────────┐      ┌────────┐
// │ page 1      │      │        │
// ├─────────────┤ ┌───▶│  next──┼─┐
// │ page 2      │ │    ├────────┤ │
// │             │ │    │XXXXXXXX│ │
// │ local_free──┼─┘    ├────────┤ │
// │ global_free─┼─┐    │        │◀┘
// ├─────────────┤ └───▶│  next──┼─┐
// │   page 3    │      ├────────┤ │
// └─────────────┘      │XXXXXXXX│ │
//       ...            ├────────┤ │
// ┌─────────────┐      │XXXXXXXX│ │
// │   page n    │      ├────────┤ │
// └─────────────┘      │        │◀┘
//                      │  next──┼───▶
//                      ├────────┤
//                      │XXXXXXXX│
//                      └────────┘
//                         ...
pub(crate) struct Shard<T, C: cfg::Config> {
    /// The shard's parent thread ID.
    pub(crate) tid: usize,
    /// The local free list for each page.
    ///
    /// These are only ever accessed from this shard's thread, so they are
    /// stored separately from the shared state for the page that can be
    /// accessed concurrently, to minimize false sharing.
    local: Box<[page::Local]>,
    /// The shared state for each page in this shard.
    ///
    /// This consists of the page's metadata (size, previous size), remote free
    /// list, and a pointer to the actual array backing that page.
    shared: Box<[page::Shared<T, C>]>,
}

pub(crate) struct Array<T, C: cfg::Config> {
    shards: Box<[Ptr<T, C>]>,
    max: AtomicUsize,
}

#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);

#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);

// === impl Shard ===

impl<T, C> Shard<T, C>
where
    C: cfg::Config,
{
    #[inline(always)]
    pub(crate) fn with_slot<'a, U>(
        &'a self,
        idx: usize,
        f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> {:?}", addr);
        if page_index > self.shared.len() {
            return None;
        }

        self.shared[page_index].with_slot(addr, f)
    }

    pub(crate) fn new(tid: usize) -> Self {
        let mut total_sz = 0;
        let shared = (0..C::MAX_PAGES)
            .map(|page_num| {
                let sz = C::page_size(page_num);
                let prev_sz = total_sz;
                total_sz += sz;
                page::Shared::new(sz, prev_sz)
            })
            .collect();
        let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
        Self { tid, local, shared }
    }
}

impl<T, C> Shard<Option<T>, C>
where
    C: cfg::Config,
{
    /// Remove an item on the shard's local thread.
    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> remove_local {:?}", addr);

        self.shared
            .get(page_index)?
            .take(addr, C::unpack_gen(idx), self.local(page_index))
    }

    /// Remove an item, while on a different thread from the shard's local thread.
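    ///
    /// Unlike `take_local`, this returns the freed slot to the page's shared
    /// (remote) free list rather than the thread-local one, since only the
    /// shard's owning thread may touch the local free list.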
    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        debug_assert!(Tid::<C>::current().as_usize() != self.tid);

        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);

        let shared = self.shared.get(page_index)?;
        shared.take(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn remove_local(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
    }

    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.remove(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn iter<'a>(&'a self) -> std::slice::Iter<'a, page::Shared<Option<T>, C>> {
        self.shared.iter()
    }
}

impl<T, C> Shard<T, C>
where
    T: Clear + Default,
    C: cfg::Config,
{
    pub(crate) fn init_with<U>(
        &self,
        mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        // Can we fit the value into an existing page?
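        // Pages are checked in order; the first page that can produce a free
        // slot returns `Some`, and the scan stops there.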
        for (page_idx, page) in self.shared.iter().enumerate() {
            let local = self.local(page_idx);

            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);

            if let Some(res) = page.init_with(local, &mut init) {
                return Some(res);
            }
        }

        None
    }

    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn clear_after_release(&self, idx: usize) {
        crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
        let tid = Tid::<C>::current().as_usize();
        test_println!(
            "-> clear_after_release; self.tid={:?}; current.tid={:?};",
            tid,
            self.tid
        );
        if tid == self.tid {
            self.clear_local(idx);
        } else {
            self.clear_remote(idx);
        }
    }

    fn clear_local(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

    fn clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.clear(addr, C::unpack_gen(idx), shared.free_list())
    }

    #[inline(always)]
    fn local(&self, i: usize) -> &page::Local {
        #[cfg(debug_assertions)]
        debug_assert_eq!(
            Tid::<C>::current().as_usize(),
            self.tid,
            "tried to access local data from another thread!"
        );

        &self.local[i]
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_struct("Shard");

        #[cfg(debug_assertions)]
        d.field("tid", &self.tid);
        d.field("shared", &self.shared).finish()
    }
}

// === impl Array ===

impl<T, C> Array<T, C>
where
    C: cfg::Config,
{
    pub(crate) fn new() -> Self {
        let mut shards = Vec::with_capacity(C::MAX_SHARDS);
        for _ in 0..C::MAX_SHARDS {
            // XXX(eliza): T_T this could be avoided with maybeuninit or something...
            shards.push(Ptr::null());
        }
        Self {
            shards: shards.into(),
            max: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub(crate) fn get<'a>(&'a self, idx: usize) -> Option<&'a Shard<T, C>> {
        test_println!("-> get shard={}", idx);
        self.shards.get(idx)?.load(Acquire)
    }

    #[inline]
    pub(crate) fn current<'a>(&'a self) -> (Tid<C>, &'a Shard<T, C>) {
        let tid = Tid::<C>::current();
        test_println!("current: {:?}", tid);
        let idx = tid.as_usize();
        // It's okay for this to be relaxed. The value is only ever stored by
        // the thread that corresponds to the index, and we are that thread.
        let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
            let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
            test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
            self.shards[idx].set(ptr);
            let mut max = self.max.load(Acquire);
            while max < idx {
                match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
                    Ok(_) => break,
                    Err(actual) => max = actual,
                }
            }
            test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
            unsafe {
                // Safety: we just put it there!
                &*ptr
            }
            .get_ref()
        });
        (tid, shard)
    }

    pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
        test_println!("Array::iter_mut");
        let max = self.max.load(Acquire);
        test_println!("-> highest index={}", max);
        IterMut(self.shards[0..=max].iter_mut())
    }
}

impl<T, C: cfg::Config> Drop for Array<T, C> {
    fn drop(&mut self) {
        // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper
        // for std atomics to change `get_mut` to `with_mut`...
        let max = self.max.load(Acquire);
        for shard in &self.shards[0..=max] {
            // XXX(eliza): this could be `with_mut` if we wanted to impl a
            // wrapper for std atomics to change `get_mut` to `with_mut`...
            let ptr = shard.0.load(Acquire);
            if ptr.is_null() {
                continue;
            }
            let shard = unsafe {
                // Safety: this is the only place where these boxes are
                // deallocated, and we have exclusive access to the shard array,
                // because...we are dropping it...
                Box::from_raw(ptr)
            };
            drop(shard)
        }
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let max = self.max.load(Acquire);
        let mut set = f.debug_map();
        for shard in &self.shards[0..=max] {
            let ptr = shard.0.load(Acquire);
            if let Some(shard) = ptr::NonNull::new(ptr) {
                set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
            } else {
                set.entry(&format_args!("{:p}", ptr), &());
            }
        }
        set.finish()
    }
}

// === impl Ptr ===

impl<T, C: cfg::Config> Ptr<T, C> {
    #[inline]
    fn null() -> Self {
        Self(AtomicPtr::new(ptr::null_mut()))
    }

    #[inline]
    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
        let ptr = self.0.load(order);
        test_println!("---> loaded={:p} (order={:?})", ptr, order);
        if ptr.is_null() {
            test_println!("---> null");
            return None;
        }
        let track = unsafe {
            // Safety: The returned reference will have the same lifetime as the
            // reference to the shard pointer, which (morally, if not actually)
            // owns the shard. The shard is only deallocated when the shard
            // array is dropped, and it won't be dropped while this pointer is
            // borrowed --- and the returned reference has the same lifetime.
            //
            // We know that the pointer is not null, because we just
            // null-checked it immediately prior.
            &*ptr
        };

        Some(track.get_ref())
    }

    #[inline]
    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
        self.0
            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
            .expect("a shard can only be inserted by the thread that owns it, this is a bug!");
    }
}

// === Iterators ===

impl<'a, T, C> Iterator for IterMut<'a, T, C>
where
    T: 'a,
    C: cfg::Config + 'a,
{
    type Item = &'a Shard<T, C>;

    fn next(&mut self) -> Option<Self::Item> {
        test_println!("IterMut::next");
        loop {
            // Skip over empty indices if they are less than the highest
            // allocated shard. Some threads may have accessed the slab
            // (generating a thread ID) but never actually inserted data, so
            // they may have never allocated a shard.
            let next = self.0.next();
            test_println!("-> next.is_some={}", next.is_some());
            if let Some(shard) = next?.load(Acquire) {
                test_println!("-> done");
                return Some(shard);
            }
        }
    }
}
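// A rough sketch of how the slab is expected to drive these types
// (illustrative only; the config type and closure body below are
// placeholders, not code from this module):
//
//     let shards = Array::<Option<T>, C>::new();
//     // Lazily allocate (if needed) and fetch the current thread's shard.
//     let (tid, shard) = shards.current();
//     // Ask the shard for a free slot and initialize it via the closure.
//     let key = shard.init_with(|idx, slot| /* write into `slot`, return Some(idx) */ None);
//     // Removal: `take_local` on the shard's own thread,
//     // `take_remote` from any other thread.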