1 /// Epoch-based garbage collector. 2 /// 3 /// # Examples 4 /// 5 /// ``` 6 /// use crossbeam_epoch::Collector; 7 /// 8 /// let collector = Collector::new(); 9 /// 10 /// let handle = collector.register(); 11 /// drop(collector); // `handle` still works after dropping `collector` 12 /// 13 /// handle.pin().flush(); 14 /// ``` 15 use core::fmt; 16 17 use crate::guard::Guard; 18 use crate::internal::{Global, Local}; 19 use crate::primitive::sync::Arc; 20 21 /// An epoch-based garbage collector. 22 pub struct Collector { 23 pub(crate) global: Arc<Global>, 24 } 25 26 unsafe impl Send for Collector {} 27 unsafe impl Sync for Collector {} 28 29 impl Default for Collector { default() -> Self30 fn default() -> Self { 31 Self { 32 global: Arc::new(Global::new()), 33 } 34 } 35 } 36 37 impl Collector { 38 /// Creates a new collector. new() -> Self39 pub fn new() -> Self { 40 Self::default() 41 } 42 43 /// Registers a new handle for the collector. register(&self) -> LocalHandle44 pub fn register(&self) -> LocalHandle { 45 Local::register(self) 46 } 47 } 48 49 impl Clone for Collector { 50 /// Creates another reference to the same garbage collector. clone(&self) -> Self51 fn clone(&self) -> Self { 52 Collector { 53 global: self.global.clone(), 54 } 55 } 56 } 57 58 impl fmt::Debug for Collector { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 60 f.pad("Collector { .. }") 61 } 62 } 63 64 impl PartialEq for Collector { 65 /// Checks if both handles point to the same collector. eq(&self, rhs: &Collector) -> bool66 fn eq(&self, rhs: &Collector) -> bool { 67 Arc::ptr_eq(&self.global, &rhs.global) 68 } 69 } 70 impl Eq for Collector {} 71 72 /// A handle to a garbage collector. 73 pub struct LocalHandle { 74 pub(crate) local: *const Local, 75 } 76 77 impl LocalHandle { 78 /// Pins the handle. 79 #[inline] pin(&self) -> Guard80 pub fn pin(&self) -> Guard { 81 unsafe { (*self.local).pin() } 82 } 83 84 /// Returns `true` if the handle is pinned. 85 #[inline] is_pinned(&self) -> bool86 pub fn is_pinned(&self) -> bool { 87 unsafe { (*self.local).is_pinned() } 88 } 89 90 /// Returns the `Collector` associated with this handle. 91 #[inline] collector(&self) -> &Collector92 pub fn collector(&self) -> &Collector { 93 unsafe { (*self.local).collector() } 94 } 95 } 96 97 impl Drop for LocalHandle { 98 #[inline] drop(&mut self)99 fn drop(&mut self) { 100 unsafe { 101 Local::release_handle(&*self.local); 102 } 103 } 104 } 105 106 impl fmt::Debug for LocalHandle { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 108 f.pad("LocalHandle { .. }") 109 } 110 } 111 112 #[cfg(all(test, not(crossbeam_loom)))] 113 mod tests { 114 use std::mem; 115 use std::sync::atomic::{AtomicUsize, Ordering}; 116 117 use crossbeam_utils::thread; 118 119 use crate::{Collector, Owned}; 120 121 const NUM_THREADS: usize = 8; 122 123 #[test] pin_reentrant()124 fn pin_reentrant() { 125 let collector = Collector::new(); 126 let handle = collector.register(); 127 drop(collector); 128 129 assert!(!handle.is_pinned()); 130 { 131 let _guard = &handle.pin(); 132 assert!(handle.is_pinned()); 133 { 134 let _guard = &handle.pin(); 135 assert!(handle.is_pinned()); 136 } 137 assert!(handle.is_pinned()); 138 } 139 assert!(!handle.is_pinned()); 140 } 141 142 #[test] flush_local_bag()143 fn flush_local_bag() { 144 let collector = Collector::new(); 145 let handle = collector.register(); 146 drop(collector); 147 148 for _ in 0..100 { 149 let guard = &handle.pin(); 150 unsafe { 151 let a = Owned::new(7).into_shared(guard); 152 guard.defer_destroy(a); 153 154 assert!(!(*guard.local).bag.with(|b| (*b).is_empty())); 155 156 while !(*guard.local).bag.with(|b| (*b).is_empty()) { 157 guard.flush(); 158 } 159 } 160 } 161 } 162 163 #[test] garbage_buffering()164 fn garbage_buffering() { 165 let collector = Collector::new(); 166 let handle = collector.register(); 167 drop(collector); 168 169 let guard = &handle.pin(); 170 unsafe { 171 for _ in 0..10 { 172 let a = Owned::new(7).into_shared(guard); 173 guard.defer_destroy(a); 174 } 175 assert!(!(*guard.local).bag.with(|b| (*b).is_empty())); 176 } 177 } 178 179 #[test] pin_holds_advance()180 fn pin_holds_advance() { 181 let collector = Collector::new(); 182 183 thread::scope(|scope| { 184 for _ in 0..NUM_THREADS { 185 scope.spawn(|_| { 186 let handle = collector.register(); 187 for _ in 0..500_000 { 188 let guard = &handle.pin(); 189 190 let before = collector.global.epoch.load(Ordering::Relaxed); 191 collector.global.collect(guard); 192 let after = collector.global.epoch.load(Ordering::Relaxed); 193 194 assert!(after.wrapping_sub(before) <= 2); 195 } 196 }); 197 } 198 }) 199 .unwrap(); 200 } 201 202 #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS` 203 #[test] incremental()204 fn incremental() { 205 const COUNT: usize = 100_000; 206 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 207 208 let collector = Collector::new(); 209 let handle = collector.register(); 210 211 unsafe { 212 let guard = &handle.pin(); 213 for _ in 0..COUNT { 214 let a = Owned::new(7i32).into_shared(guard); 215 guard.defer_unchecked(move || { 216 drop(a.into_owned()); 217 DESTROYS.fetch_add(1, Ordering::Relaxed); 218 }); 219 } 220 guard.flush(); 221 } 222 223 let mut last = 0; 224 225 while last < COUNT { 226 let curr = DESTROYS.load(Ordering::Relaxed); 227 assert!(curr - last <= 1024); 228 last = curr; 229 230 let guard = &handle.pin(); 231 collector.global.collect(guard); 232 } 233 assert!(DESTROYS.load(Ordering::Relaxed) == 100_000); 234 } 235 236 #[test] buffering()237 fn buffering() { 238 const COUNT: usize = 10; 239 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 240 241 let collector = Collector::new(); 242 let handle = collector.register(); 243 244 unsafe { 245 let guard = &handle.pin(); 246 for _ in 0..COUNT { 247 let a = Owned::new(7i32).into_shared(guard); 248 guard.defer_unchecked(move || { 249 drop(a.into_owned()); 250 DESTROYS.fetch_add(1, Ordering::Relaxed); 251 }); 252 } 253 } 254 255 for _ in 0..100_000 { 256 collector.global.collect(&handle.pin()); 257 } 258 assert!(DESTROYS.load(Ordering::Relaxed) < COUNT); 259 260 handle.pin().flush(); 261 262 while DESTROYS.load(Ordering::Relaxed) < COUNT { 263 let guard = &handle.pin(); 264 collector.global.collect(guard); 265 } 266 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); 267 } 268 269 #[test] count_drops()270 fn count_drops() { 271 const COUNT: usize = 100_000; 272 static DROPS: AtomicUsize = AtomicUsize::new(0); 273 274 struct Elem(i32); 275 276 impl Drop for Elem { 277 fn drop(&mut self) { 278 DROPS.fetch_add(1, Ordering::Relaxed); 279 } 280 } 281 282 let collector = Collector::new(); 283 let handle = collector.register(); 284 285 unsafe { 286 let guard = &handle.pin(); 287 288 for _ in 0..COUNT { 289 let a = Owned::new(Elem(7i32)).into_shared(guard); 290 guard.defer_destroy(a); 291 } 292 guard.flush(); 293 } 294 295 while DROPS.load(Ordering::Relaxed) < COUNT { 296 let guard = &handle.pin(); 297 collector.global.collect(guard); 298 } 299 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT); 300 } 301 302 #[test] count_destroy()303 fn count_destroy() { 304 const COUNT: usize = 100_000; 305 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 306 307 let collector = Collector::new(); 308 let handle = collector.register(); 309 310 unsafe { 311 let guard = &handle.pin(); 312 313 for _ in 0..COUNT { 314 let a = Owned::new(7i32).into_shared(guard); 315 guard.defer_unchecked(move || { 316 drop(a.into_owned()); 317 DESTROYS.fetch_add(1, Ordering::Relaxed); 318 }); 319 } 320 guard.flush(); 321 } 322 323 while DESTROYS.load(Ordering::Relaxed) < COUNT { 324 let guard = &handle.pin(); 325 collector.global.collect(guard); 326 } 327 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); 328 } 329 330 #[test] drop_array()331 fn drop_array() { 332 const COUNT: usize = 700; 333 static DROPS: AtomicUsize = AtomicUsize::new(0); 334 335 struct Elem(i32); 336 337 impl Drop for Elem { 338 fn drop(&mut self) { 339 DROPS.fetch_add(1, Ordering::Relaxed); 340 } 341 } 342 343 let collector = Collector::new(); 344 let handle = collector.register(); 345 346 let mut guard = handle.pin(); 347 348 let mut v = Vec::with_capacity(COUNT); 349 for i in 0..COUNT { 350 v.push(Elem(i as i32)); 351 } 352 353 { 354 let a = Owned::new(v).into_shared(&guard); 355 unsafe { 356 guard.defer_destroy(a); 357 } 358 guard.flush(); 359 } 360 361 while DROPS.load(Ordering::Relaxed) < COUNT { 362 guard.repin(); 363 collector.global.collect(&guard); 364 } 365 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT); 366 } 367 368 #[test] destroy_array()369 fn destroy_array() { 370 const COUNT: usize = 100_000; 371 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 372 373 let collector = Collector::new(); 374 let handle = collector.register(); 375 376 unsafe { 377 let guard = &handle.pin(); 378 379 let mut v = Vec::with_capacity(COUNT); 380 for i in 0..COUNT { 381 v.push(i as i32); 382 } 383 384 let ptr = v.as_mut_ptr() as usize; 385 let len = v.len(); 386 guard.defer_unchecked(move || { 387 drop(Vec::from_raw_parts(ptr as *const i32 as *mut i32, len, len)); 388 DESTROYS.fetch_add(len, Ordering::Relaxed); 389 }); 390 guard.flush(); 391 392 mem::forget(v); 393 } 394 395 while DESTROYS.load(Ordering::Relaxed) < COUNT { 396 let guard = &handle.pin(); 397 collector.global.collect(guard); 398 } 399 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); 400 } 401 402 #[test] stress()403 fn stress() { 404 const THREADS: usize = 8; 405 const COUNT: usize = 100_000; 406 static DROPS: AtomicUsize = AtomicUsize::new(0); 407 408 struct Elem(i32); 409 410 impl Drop for Elem { 411 fn drop(&mut self) { 412 DROPS.fetch_add(1, Ordering::Relaxed); 413 } 414 } 415 416 let collector = Collector::new(); 417 418 thread::scope(|scope| { 419 for _ in 0..THREADS { 420 scope.spawn(|_| { 421 let handle = collector.register(); 422 for _ in 0..COUNT { 423 let guard = &handle.pin(); 424 unsafe { 425 let a = Owned::new(Elem(7i32)).into_shared(guard); 426 guard.defer_destroy(a); 427 } 428 } 429 }); 430 } 431 }) 432 .unwrap(); 433 434 let handle = collector.register(); 435 while DROPS.load(Ordering::Relaxed) < COUNT * THREADS { 436 let guard = &handle.pin(); 437 collector.global.collect(guard); 438 } 439 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS); 440 } 441 } 442