1 /// Epoch-based garbage collector. 2 /// 3 /// # Examples 4 /// 5 /// ``` 6 /// use crossbeam_epoch::Collector; 7 /// 8 /// let collector = Collector::new(); 9 /// 10 /// let handle = collector.register(); 11 /// drop(collector); // `handle` still works after dropping `collector` 12 /// 13 /// handle.pin().flush(); 14 /// ``` 15 use alloc::sync::Arc; 16 use core::fmt; 17 18 use guard::Guard; 19 use internal::{Global, Local}; 20 21 /// An epoch-based garbage collector. 22 pub struct Collector { 23 pub(crate) global: Arc<Global>, 24 } 25 26 unsafe impl Send for Collector {} 27 unsafe impl Sync for Collector {} 28 29 impl Collector { 30 /// Creates a new collector. new() -> Self31 pub fn new() -> Self { 32 Collector { 33 global: Arc::new(Global::new()), 34 } 35 } 36 37 /// Registers a new handle for the collector. register(&self) -> LocalHandle38 pub fn register(&self) -> LocalHandle { 39 Local::register(self) 40 } 41 } 42 43 impl Clone for Collector { 44 /// Creates another reference to the same garbage collector. clone(&self) -> Self45 fn clone(&self) -> Self { 46 Collector { 47 global: self.global.clone(), 48 } 49 } 50 } 51 52 impl fmt::Debug for Collector { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result53 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 54 f.pad("Collector { .. }") 55 } 56 } 57 58 impl PartialEq for Collector { 59 /// Checks if both handles point to the same collector. eq(&self, rhs: &Collector) -> bool60 fn eq(&self, rhs: &Collector) -> bool { 61 Arc::ptr_eq(&self.global, &rhs.global) 62 } 63 } 64 impl Eq for Collector {} 65 66 /// A handle to a garbage collector. 67 pub struct LocalHandle { 68 pub(crate) local: *const Local, 69 } 70 71 impl LocalHandle { 72 /// Pins the handle. 73 #[inline] pin(&self) -> Guard74 pub fn pin(&self) -> Guard { 75 unsafe { (*self.local).pin() } 76 } 77 78 /// Returns `true` if the handle is pinned. 79 #[inline] is_pinned(&self) -> bool80 pub fn is_pinned(&self) -> bool { 81 unsafe { (*self.local).is_pinned() } 82 } 83 84 /// Returns the `Collector` associated with this handle. 85 #[inline] collector(&self) -> &Collector86 pub fn collector(&self) -> &Collector { 87 unsafe { (*self.local).collector() } 88 } 89 } 90 91 impl Drop for LocalHandle { 92 #[inline] drop(&mut self)93 fn drop(&mut self) { 94 unsafe { 95 Local::release_handle(&*self.local); 96 } 97 } 98 } 99 100 impl fmt::Debug for LocalHandle { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result101 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 102 f.pad("LocalHandle { .. }") 103 } 104 } 105 106 #[cfg(test)] 107 mod tests { 108 use std::mem; 109 use std::sync::atomic::{AtomicUsize, Ordering}; 110 111 use crossbeam_utils::thread; 112 113 use {Collector, Owned}; 114 115 const NUM_THREADS: usize = 8; 116 117 #[test] pin_reentrant()118 fn pin_reentrant() { 119 let collector = Collector::new(); 120 let handle = collector.register(); 121 drop(collector); 122 123 assert!(!handle.is_pinned()); 124 { 125 let _guard = &handle.pin(); 126 assert!(handle.is_pinned()); 127 { 128 let _guard = &handle.pin(); 129 assert!(handle.is_pinned()); 130 } 131 assert!(handle.is_pinned()); 132 } 133 assert!(!handle.is_pinned()); 134 } 135 136 #[test] flush_local_bag()137 fn flush_local_bag() { 138 let collector = Collector::new(); 139 let handle = collector.register(); 140 drop(collector); 141 142 for _ in 0..100 { 143 let guard = &handle.pin(); 144 unsafe { 145 let a = Owned::new(7).into_shared(guard); 146 guard.defer_destroy(a); 147 148 assert!(!(*(*guard.local).bag.get()).is_empty()); 149 150 while !(*(*guard.local).bag.get()).is_empty() { 151 guard.flush(); 152 } 153 } 154 } 155 } 156 157 #[test] garbage_buffering()158 fn garbage_buffering() { 159 let collector = Collector::new(); 160 let handle = collector.register(); 161 drop(collector); 162 163 let guard = &handle.pin(); 164 unsafe { 165 for _ in 0..10 { 166 let a = Owned::new(7).into_shared(guard); 167 guard.defer_destroy(a); 168 } 169 assert!(!(*(*guard.local).bag.get()).is_empty()); 170 } 171 } 172 173 #[test] pin_holds_advance()174 fn pin_holds_advance() { 175 let collector = Collector::new(); 176 177 thread::scope(|scope| { 178 for _ in 0..NUM_THREADS { 179 scope.spawn(|_| { 180 let handle = collector.register(); 181 for _ in 0..500_000 { 182 let guard = &handle.pin(); 183 184 let before = collector.global.epoch.load(Ordering::Relaxed); 185 collector.global.collect(guard); 186 let after = collector.global.epoch.load(Ordering::Relaxed); 187 188 assert!(after.wrapping_sub(before) <= 2); 189 } 190 }); 191 } 192 }) 193 .unwrap(); 194 } 195 196 #[test] incremental()197 fn incremental() { 198 const COUNT: usize = 100_000; 199 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 200 201 let collector = Collector::new(); 202 let handle = collector.register(); 203 204 unsafe { 205 let guard = &handle.pin(); 206 for _ in 0..COUNT { 207 let a = Owned::new(7i32).into_shared(guard); 208 guard.defer_unchecked(move || { 209 drop(a.into_owned()); 210 DESTROYS.fetch_add(1, Ordering::Relaxed); 211 }); 212 } 213 guard.flush(); 214 } 215 216 let mut last = 0; 217 218 while last < COUNT { 219 let curr = DESTROYS.load(Ordering::Relaxed); 220 assert!(curr - last <= 1024); 221 last = curr; 222 223 let guard = &handle.pin(); 224 collector.global.collect(guard); 225 } 226 assert!(DESTROYS.load(Ordering::Relaxed) == 100_000); 227 } 228 229 #[test] buffering()230 fn buffering() { 231 const COUNT: usize = 10; 232 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 233 234 let collector = Collector::new(); 235 let handle = collector.register(); 236 237 unsafe { 238 let guard = &handle.pin(); 239 for _ in 0..COUNT { 240 let a = Owned::new(7i32).into_shared(guard); 241 guard.defer_unchecked(move || { 242 drop(a.into_owned()); 243 DESTROYS.fetch_add(1, Ordering::Relaxed); 244 }); 245 } 246 } 247 248 for _ in 0..100_000 { 249 collector.global.collect(&handle.pin()); 250 } 251 assert!(DESTROYS.load(Ordering::Relaxed) < COUNT); 252 253 handle.pin().flush(); 254 255 while DESTROYS.load(Ordering::Relaxed) < COUNT { 256 let guard = &handle.pin(); 257 collector.global.collect(guard); 258 } 259 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); 260 } 261 262 #[test] count_drops()263 fn count_drops() { 264 const COUNT: usize = 100_000; 265 static DROPS: AtomicUsize = AtomicUsize::new(0); 266 267 struct Elem(i32); 268 269 impl Drop for Elem { 270 fn drop(&mut self) { 271 DROPS.fetch_add(1, Ordering::Relaxed); 272 } 273 } 274 275 let collector = Collector::new(); 276 let handle = collector.register(); 277 278 unsafe { 279 let guard = &handle.pin(); 280 281 for _ in 0..COUNT { 282 let a = Owned::new(Elem(7i32)).into_shared(guard); 283 guard.defer_destroy(a); 284 } 285 guard.flush(); 286 } 287 288 while DROPS.load(Ordering::Relaxed) < COUNT { 289 let guard = &handle.pin(); 290 collector.global.collect(guard); 291 } 292 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT); 293 } 294 295 #[test] count_destroy()296 fn count_destroy() { 297 const COUNT: usize = 100_000; 298 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 299 300 let collector = Collector::new(); 301 let handle = collector.register(); 302 303 unsafe { 304 let guard = &handle.pin(); 305 306 for _ in 0..COUNT { 307 let a = Owned::new(7i32).into_shared(guard); 308 guard.defer_unchecked(move || { 309 drop(a.into_owned()); 310 DESTROYS.fetch_add(1, Ordering::Relaxed); 311 }); 312 } 313 guard.flush(); 314 } 315 316 while DESTROYS.load(Ordering::Relaxed) < COUNT { 317 let guard = &handle.pin(); 318 collector.global.collect(guard); 319 } 320 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); 321 } 322 323 #[test] drop_array()324 fn drop_array() { 325 const COUNT: usize = 700; 326 static DROPS: AtomicUsize = AtomicUsize::new(0); 327 328 struct Elem(i32); 329 330 impl Drop for Elem { 331 fn drop(&mut self) { 332 DROPS.fetch_add(1, Ordering::Relaxed); 333 } 334 } 335 336 let collector = Collector::new(); 337 let handle = collector.register(); 338 339 let mut guard = handle.pin(); 340 341 let mut v = Vec::with_capacity(COUNT); 342 for i in 0..COUNT { 343 v.push(Elem(i as i32)); 344 } 345 346 { 347 let a = Owned::new(v).into_shared(&guard); 348 unsafe { 349 guard.defer_destroy(a); 350 } 351 guard.flush(); 352 } 353 354 while DROPS.load(Ordering::Relaxed) < COUNT { 355 guard.repin(); 356 collector.global.collect(&guard); 357 } 358 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT); 359 } 360 361 #[test] destroy_array()362 fn destroy_array() { 363 const COUNT: usize = 100_000; 364 static DESTROYS: AtomicUsize = AtomicUsize::new(0); 365 366 let collector = Collector::new(); 367 let handle = collector.register(); 368 369 unsafe { 370 let guard = &handle.pin(); 371 372 let mut v = Vec::with_capacity(COUNT); 373 for i in 0..COUNT { 374 v.push(i as i32); 375 } 376 377 let ptr = v.as_mut_ptr() as usize; 378 let len = v.len(); 379 guard.defer_unchecked(move || { 380 drop(Vec::from_raw_parts(ptr as *const u8 as *mut u8, len, len)); 381 DESTROYS.fetch_add(len, Ordering::Relaxed); 382 }); 383 guard.flush(); 384 385 mem::forget(v); 386 } 387 388 while DESTROYS.load(Ordering::Relaxed) < COUNT { 389 let guard = &handle.pin(); 390 collector.global.collect(guard); 391 } 392 assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); 393 } 394 395 #[test] stress()396 fn stress() { 397 const THREADS: usize = 8; 398 const COUNT: usize = 100_000; 399 static DROPS: AtomicUsize = AtomicUsize::new(0); 400 401 struct Elem(i32); 402 403 impl Drop for Elem { 404 fn drop(&mut self) { 405 DROPS.fetch_add(1, Ordering::Relaxed); 406 } 407 } 408 409 let collector = Collector::new(); 410 411 thread::scope(|scope| { 412 for _ in 0..THREADS { 413 scope.spawn(|_| { 414 let handle = collector.register(); 415 for _ in 0..COUNT { 416 let guard = &handle.pin(); 417 unsafe { 418 let a = Owned::new(Elem(7i32)).into_shared(guard); 419 guard.defer_destroy(a); 420 } 421 } 422 }); 423 } 424 }) 425 .unwrap(); 426 427 let handle = collector.register(); 428 while DROPS.load(Ordering::Relaxed) < COUNT * THREADS { 429 let guard = &handle.pin(); 430 collector.global.collect(guard); 431 } 432 assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS); 433 } 434 } 435