//! Epoch-based garbage collector.
//!
//! # Examples
//!
//! ```
//! use crossbeam_epoch::Collector;
//!
//! let collector = Collector::new();
//!
//! let handle = collector.register();
//! drop(collector); // `handle` still works after dropping `collector`
//!
//! handle.pin().flush();
//! ```
15 use core::fmt;
16 
17 use crate::guard::Guard;
18 use crate::internal::{Global, Local};
19 use crate::primitive::sync::Arc;
20 
/// An epoch-based garbage collector.
///
/// A `Collector` is a reference-counted handle to shared global state: cloning it yields another
/// handle to the same state, and two collectors compare equal exactly when they share it.
pub struct Collector {
    /// Global state of the collector, shared through an `Arc`.
    pub(crate) global: Arc<Global>,
}
25 
// NOTE(review): these impls are sound only if the `Global` state behind the `Arc` may be moved to
// and accessed from several threads at once — confirm against `internal::Global`.
unsafe impl Send for Collector {}
unsafe impl Sync for Collector {}
28 
29 impl Default for Collector {
default() -> Self30     fn default() -> Self {
31         Self {
32             global: Arc::new(Global::new()),
33         }
34     }
35 }
36 
37 impl Collector {
38     /// Creates a new collector.
new() -> Self39     pub fn new() -> Self {
40         Self::default()
41     }
42 
43     /// Registers a new handle for the collector.
register(&self) -> LocalHandle44     pub fn register(&self) -> LocalHandle {
45         Local::register(self)
46     }
47 }
48 
49 impl Clone for Collector {
50     /// Creates another reference to the same garbage collector.
clone(&self) -> Self51     fn clone(&self) -> Self {
52         Collector {
53             global: self.global.clone(),
54         }
55     }
56 }
57 
58 impl fmt::Debug for Collector {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result59     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60         f.pad("Collector { .. }")
61     }
62 }
63 
64 impl PartialEq for Collector {
65     /// Checks if both handles point to the same collector.
eq(&self, rhs: &Collector) -> bool66     fn eq(&self, rhs: &Collector) -> bool {
67         Arc::ptr_eq(&self.global, &rhs.global)
68     }
69 }
70 impl Eq for Collector {}
71 
/// A handle to a garbage collector.
///
/// Dropping the handle releases it through `Local::release_handle`.
pub struct LocalHandle {
    /// Raw pointer to the `Local` this handle refers to; every method dereferences it.
    pub(crate) local: *const Local,
}
76 
77 impl LocalHandle {
78     /// Pins the handle.
79     #[inline]
pin(&self) -> Guard80     pub fn pin(&self) -> Guard {
81         unsafe { (*self.local).pin() }
82     }
83 
84     /// Returns `true` if the handle is pinned.
85     #[inline]
is_pinned(&self) -> bool86     pub fn is_pinned(&self) -> bool {
87         unsafe { (*self.local).is_pinned() }
88     }
89 
90     /// Returns the `Collector` associated with this handle.
91     #[inline]
collector(&self) -> &Collector92     pub fn collector(&self) -> &Collector {
93         unsafe { (*self.local).collector() }
94     }
95 }
96 
97 impl Drop for LocalHandle {
98     #[inline]
drop(&mut self)99     fn drop(&mut self) {
100         unsafe {
101             Local::release_handle(&*self.local);
102         }
103     }
104 }
105 
106 impl fmt::Debug for LocalHandle {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result107     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108         f.pad("LocalHandle { .. }")
109     }
110 }
111 
112 #[cfg(all(test, not(crossbeam_loom)))]
113 mod tests {
114     use std::mem;
115     use std::sync::atomic::{AtomicUsize, Ordering};
116 
117     use crossbeam_utils::thread;
118 
119     use crate::{Collector, Owned};
120 
    // Number of worker threads spawned by `pin_holds_advance`.
    const NUM_THREADS: usize = 8;
122 
123     #[test]
pin_reentrant()124     fn pin_reentrant() {
125         let collector = Collector::new();
126         let handle = collector.register();
127         drop(collector);
128 
129         assert!(!handle.is_pinned());
130         {
131             let _guard = &handle.pin();
132             assert!(handle.is_pinned());
133             {
134                 let _guard = &handle.pin();
135                 assert!(handle.is_pinned());
136             }
137             assert!(handle.is_pinned());
138         }
139         assert!(!handle.is_pinned());
140     }
141 
142     #[test]
flush_local_bag()143     fn flush_local_bag() {
144         let collector = Collector::new();
145         let handle = collector.register();
146         drop(collector);
147 
148         for _ in 0..100 {
149             let guard = &handle.pin();
150             unsafe {
151                 let a = Owned::new(7).into_shared(guard);
152                 guard.defer_destroy(a);
153 
154                 assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
155 
156                 while !(*guard.local).bag.with(|b| (*b).is_empty()) {
157                     guard.flush();
158                 }
159             }
160         }
161     }
162 
163     #[test]
garbage_buffering()164     fn garbage_buffering() {
165         let collector = Collector::new();
166         let handle = collector.register();
167         drop(collector);
168 
169         let guard = &handle.pin();
170         unsafe {
171             for _ in 0..10 {
172                 let a = Owned::new(7).into_shared(guard);
173                 guard.defer_destroy(a);
174             }
175             assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
176         }
177     }
178 
179     #[test]
pin_holds_advance()180     fn pin_holds_advance() {
181         let collector = Collector::new();
182 
183         thread::scope(|scope| {
184             for _ in 0..NUM_THREADS {
185                 scope.spawn(|_| {
186                     let handle = collector.register();
187                     for _ in 0..500_000 {
188                         let guard = &handle.pin();
189 
190                         let before = collector.global.epoch.load(Ordering::Relaxed);
191                         collector.global.collect(guard);
192                         let after = collector.global.epoch.load(Ordering::Relaxed);
193 
194                         assert!(after.wrapping_sub(before) <= 2);
195                     }
196                 });
197             }
198         })
199         .unwrap();
200     }
201 
202     #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS`
203     #[test]
incremental()204     fn incremental() {
205         const COUNT: usize = 100_000;
206         static DESTROYS: AtomicUsize = AtomicUsize::new(0);
207 
208         let collector = Collector::new();
209         let handle = collector.register();
210 
211         unsafe {
212             let guard = &handle.pin();
213             for _ in 0..COUNT {
214                 let a = Owned::new(7i32).into_shared(guard);
215                 guard.defer_unchecked(move || {
216                     drop(a.into_owned());
217                     DESTROYS.fetch_add(1, Ordering::Relaxed);
218                 });
219             }
220             guard.flush();
221         }
222 
223         let mut last = 0;
224 
225         while last < COUNT {
226             let curr = DESTROYS.load(Ordering::Relaxed);
227             assert!(curr - last <= 1024);
228             last = curr;
229 
230             let guard = &handle.pin();
231             collector.global.collect(guard);
232         }
233         assert!(DESTROYS.load(Ordering::Relaxed) == 100_000);
234     }
235 
236     #[test]
buffering()237     fn buffering() {
238         const COUNT: usize = 10;
239         static DESTROYS: AtomicUsize = AtomicUsize::new(0);
240 
241         let collector = Collector::new();
242         let handle = collector.register();
243 
244         unsafe {
245             let guard = &handle.pin();
246             for _ in 0..COUNT {
247                 let a = Owned::new(7i32).into_shared(guard);
248                 guard.defer_unchecked(move || {
249                     drop(a.into_owned());
250                     DESTROYS.fetch_add(1, Ordering::Relaxed);
251                 });
252             }
253         }
254 
255         for _ in 0..100_000 {
256             collector.global.collect(&handle.pin());
257         }
258         assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);
259 
260         handle.pin().flush();
261 
262         while DESTROYS.load(Ordering::Relaxed) < COUNT {
263             let guard = &handle.pin();
264             collector.global.collect(guard);
265         }
266         assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
267     }
268 
269     #[test]
count_drops()270     fn count_drops() {
271         const COUNT: usize = 100_000;
272         static DROPS: AtomicUsize = AtomicUsize::new(0);
273 
274         struct Elem(i32);
275 
276         impl Drop for Elem {
277             fn drop(&mut self) {
278                 DROPS.fetch_add(1, Ordering::Relaxed);
279             }
280         }
281 
282         let collector = Collector::new();
283         let handle = collector.register();
284 
285         unsafe {
286             let guard = &handle.pin();
287 
288             for _ in 0..COUNT {
289                 let a = Owned::new(Elem(7i32)).into_shared(guard);
290                 guard.defer_destroy(a);
291             }
292             guard.flush();
293         }
294 
295         while DROPS.load(Ordering::Relaxed) < COUNT {
296             let guard = &handle.pin();
297             collector.global.collect(guard);
298         }
299         assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
300     }
301 
302     #[test]
count_destroy()303     fn count_destroy() {
304         const COUNT: usize = 100_000;
305         static DESTROYS: AtomicUsize = AtomicUsize::new(0);
306 
307         let collector = Collector::new();
308         let handle = collector.register();
309 
310         unsafe {
311             let guard = &handle.pin();
312 
313             for _ in 0..COUNT {
314                 let a = Owned::new(7i32).into_shared(guard);
315                 guard.defer_unchecked(move || {
316                     drop(a.into_owned());
317                     DESTROYS.fetch_add(1, Ordering::Relaxed);
318                 });
319             }
320             guard.flush();
321         }
322 
323         while DESTROYS.load(Ordering::Relaxed) < COUNT {
324             let guard = &handle.pin();
325             collector.global.collect(guard);
326         }
327         assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
328     }
329 
330     #[test]
drop_array()331     fn drop_array() {
332         const COUNT: usize = 700;
333         static DROPS: AtomicUsize = AtomicUsize::new(0);
334 
335         struct Elem(i32);
336 
337         impl Drop for Elem {
338             fn drop(&mut self) {
339                 DROPS.fetch_add(1, Ordering::Relaxed);
340             }
341         }
342 
343         let collector = Collector::new();
344         let handle = collector.register();
345 
346         let mut guard = handle.pin();
347 
348         let mut v = Vec::with_capacity(COUNT);
349         for i in 0..COUNT {
350             v.push(Elem(i as i32));
351         }
352 
353         {
354             let a = Owned::new(v).into_shared(&guard);
355             unsafe {
356                 guard.defer_destroy(a);
357             }
358             guard.flush();
359         }
360 
361         while DROPS.load(Ordering::Relaxed) < COUNT {
362             guard.repin();
363             collector.global.collect(&guard);
364         }
365         assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
366     }
367 
368     #[test]
destroy_array()369     fn destroy_array() {
370         const COUNT: usize = 100_000;
371         static DESTROYS: AtomicUsize = AtomicUsize::new(0);
372 
373         let collector = Collector::new();
374         let handle = collector.register();
375 
376         unsafe {
377             let guard = &handle.pin();
378 
379             let mut v = Vec::with_capacity(COUNT);
380             for i in 0..COUNT {
381                 v.push(i as i32);
382             }
383 
384             let ptr = v.as_mut_ptr() as usize;
385             let len = v.len();
386             guard.defer_unchecked(move || {
387                 drop(Vec::from_raw_parts(ptr as *const i32 as *mut i32, len, len));
388                 DESTROYS.fetch_add(len, Ordering::Relaxed);
389             });
390             guard.flush();
391 
392             mem::forget(v);
393         }
394 
395         while DESTROYS.load(Ordering::Relaxed) < COUNT {
396             let guard = &handle.pin();
397             collector.global.collect(guard);
398         }
399         assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
400     }
401 
402     #[test]
stress()403     fn stress() {
404         const THREADS: usize = 8;
405         const COUNT: usize = 100_000;
406         static DROPS: AtomicUsize = AtomicUsize::new(0);
407 
408         struct Elem(i32);
409 
410         impl Drop for Elem {
411             fn drop(&mut self) {
412                 DROPS.fetch_add(1, Ordering::Relaxed);
413             }
414         }
415 
416         let collector = Collector::new();
417 
418         thread::scope(|scope| {
419             for _ in 0..THREADS {
420                 scope.spawn(|_| {
421                     let handle = collector.register();
422                     for _ in 0..COUNT {
423                         let guard = &handle.pin();
424                         unsafe {
425                             let a = Owned::new(Elem(7i32)).into_shared(guard);
426                             guard.defer_destroy(a);
427                         }
428                     }
429                 });
430             }
431         })
432         .unwrap();
433 
434         let handle = collector.register();
435         while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
436             let guard = &handle.pin();
437             collector.global.collect(guard);
438         }
439         assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
440     }
441 }
442