/// Epoch-based garbage collector.
///
/// # Examples
///
/// ```
/// use crossbeam_epoch::Collector;
///
/// let collector = Collector::new();
///
/// let handle = collector.register();
/// drop(collector); // `handle` still works after dropping `collector`
///
/// handle.pin().flush();
/// ```
use alloc::sync::Arc;
use core::fmt;

use guard::Guard;
use internal::{Global, Local};

/// An epoch-based garbage collector.
pub struct Collector {
    pub(crate) global: Arc<Global>,
}

unsafe impl Send for Collector {}
unsafe impl Sync for Collector {}

impl Collector {
    /// Creates a new collector.
    pub fn new() -> Self {
        Collector {
            global: Arc::new(Global::new()),
        }
    }

    /// Registers a new handle for the collector.
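    ///
    /// # Examples
    ///
    /// A minimal sketch: each thread that participates in epoch-based
    /// reclamation registers its own handle.
    ///
    /// ```
    /// use crossbeam_epoch::Collector;
    ///
    /// let collector = Collector::new();
    /// let handle = collector.register();
    /// ```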
    pub fn register(&self) -> LocalHandle {
        Local::register(self)
    }
}

impl Clone for Collector {
    /// Creates another reference to the same garbage collector.
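    ///
    /// # Examples
    ///
    /// A minimal sketch: clones share the same global state, so they compare
    /// equal.
    ///
    /// ```
    /// use crossbeam_epoch::Collector;
    ///
    /// let c1 = Collector::new();
    /// let c2 = c1.clone();
    /// assert_eq!(c1, c2);
    /// ```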
    fn clone(&self) -> Self {
        Collector {
            global: self.global.clone(),
        }
    }
}

impl fmt::Debug for Collector {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.pad("Collector { .. }")
    }
}

impl PartialEq for Collector {
    /// Checks if both handles point to the same collector.
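    ///
    /// # Examples
    ///
    /// A minimal sketch: only handles to the same collector compare equal.
    ///
    /// ```
    /// use crossbeam_epoch::Collector;
    ///
    /// let c1 = Collector::new();
    /// let c2 = c1.clone();
    /// assert_eq!(c1, c2);
    ///
    /// let c3 = Collector::new();
    /// assert_ne!(c1, c3);
    /// ```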
    fn eq(&self, rhs: &Collector) -> bool {
        Arc::ptr_eq(&self.global, &rhs.global)
    }
}
impl Eq for Collector {}

/// A handle to a garbage collector.
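///
/// # Examples
///
/// A minimal sketch: a handle is obtained by registering with a `Collector`,
/// and is then used to pin the current thread.
///
/// ```
/// use crossbeam_epoch::Collector;
///
/// let collector = Collector::new();
/// let handle = collector.register();
/// let guard = handle.pin();
/// ```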
pub struct LocalHandle {
    pub(crate) local: *const Local,
}

impl LocalHandle {
    /// Pins the handle.
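    ///
    /// # Examples
    ///
    /// A minimal sketch: the returned `Guard` keeps the handle pinned until
    /// it is dropped.
    ///
    /// ```
    /// use crossbeam_epoch::Collector;
    ///
    /// let handle = Collector::new().register();
    /// let guard = handle.pin();
    /// drop(guard);
    /// ```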
    #[inline]
    pub fn pin(&self) -> Guard {
        unsafe { (*self.local).pin() }
    }

    /// Returns `true` if the handle is pinned.
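    ///
    /// # Examples
    ///
    /// A minimal sketch, mirroring the `pin_reentrant` test below:
    ///
    /// ```
    /// use crossbeam_epoch::Collector;
    ///
    /// let handle = Collector::new().register();
    /// assert!(!handle.is_pinned());
    /// let _guard = handle.pin();
    /// assert!(handle.is_pinned());
    /// ```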
    #[inline]
    pub fn is_pinned(&self) -> bool {
        unsafe { (*self.local).is_pinned() }
    }

    /// Returns the `Collector` associated with this handle.
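    ///
    /// # Examples
    ///
    /// A minimal sketch: the handle points back to the collector it was
    /// registered with.
    ///
    /// ```
    /// use crossbeam_epoch::Collector;
    ///
    /// let collector = Collector::new();
    /// let handle = collector.register();
    /// assert_eq!(handle.collector(), &collector);
    /// ```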
    #[inline]
    pub fn collector(&self) -> &Collector {
        unsafe { (*self.local).collector() }
    }
}

impl Drop for LocalHandle {
    #[inline]
    fn drop(&mut self) {
        unsafe {
            Local::release_handle(&*self.local);
        }
    }
}

impl fmt::Debug for LocalHandle {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.pad("LocalHandle { .. }")
    }
}

#[cfg(test)]
mod tests {
    use std::mem;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crossbeam_utils::thread;

    use {Collector, Owned};

    const NUM_THREADS: usize = 8;

    #[test]
    fn pin_reentrant() {
        let collector = Collector::new();
        let handle = collector.register();
        drop(collector);

        assert!(!handle.is_pinned());
        {
            let _guard = &handle.pin();
            assert!(handle.is_pinned());
            {
                let _guard = &handle.pin();
                assert!(handle.is_pinned());
            }
            assert!(handle.is_pinned());
        }
        assert!(!handle.is_pinned());
    }

    #[test]
    fn flush_local_bag() {
        let collector = Collector::new();
        let handle = collector.register();
        drop(collector);

        for _ in 0..100 {
            let guard = &handle.pin();
            unsafe {
                let a = Owned::new(7).into_shared(guard);
                guard.defer_destroy(a);

                assert!(!(*(*guard.local).bag.get()).is_empty());

                while !(*(*guard.local).bag.get()).is_empty() {
                    guard.flush();
                }
            }
        }
    }

    #[test]
    fn garbage_buffering() {
        let collector = Collector::new();
        let handle = collector.register();
        drop(collector);

        let guard = &handle.pin();
        unsafe {
            for _ in 0..10 {
                let a = Owned::new(7).into_shared(guard);
                guard.defer_destroy(a);
            }
            assert!(!(*(*guard.local).bag.get()).is_empty());
        }
    }

    #[test]
    fn pin_holds_advance() {
        let collector = Collector::new();

        thread::scope(|scope| {
            for _ in 0..NUM_THREADS {
                scope.spawn(|_| {
                    let handle = collector.register();
                    for _ in 0..500_000 {
                        let guard = &handle.pin();

                        let before = collector.global.epoch.load(Ordering::Relaxed);
                        collector.global.collect(guard);
                        let after = collector.global.epoch.load(Ordering::Relaxed);

                        assert!(after.wrapping_sub(before) <= 2);
                    }
                });
            }
        })
        .unwrap();
    }

    #[test]
    fn incremental() {
        const COUNT: usize = 100_000;
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();
            for _ in 0..COUNT {
                let a = Owned::new(7i32).into_shared(guard);
                guard.defer_unchecked(move || {
                    drop(a.into_owned());
                    DESTROYS.fetch_add(1, Ordering::Relaxed);
                });
            }
            guard.flush();
        }

        let mut last = 0;

        while last < COUNT {
            let curr = DESTROYS.load(Ordering::Relaxed);
            assert!(curr - last <= 1024);
            last = curr;

            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn buffering() {
        const COUNT: usize = 10;
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();
            for _ in 0..COUNT {
                let a = Owned::new(7i32).into_shared(guard);
                guard.defer_unchecked(move || {
                    drop(a.into_owned());
                    DESTROYS.fetch_add(1, Ordering::Relaxed);
                });
            }
        }

        for _ in 0..100_000 {
            collector.global.collect(&handle.pin());
        }
        assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);

        handle.pin().flush();

        while DESTROYS.load(Ordering::Relaxed) < COUNT {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn count_drops() {
        const COUNT: usize = 100_000;
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct Elem(i32);

        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::Relaxed);
            }
        }

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();

            for _ in 0..COUNT {
                let a = Owned::new(Elem(7i32)).into_shared(guard);
                guard.defer_destroy(a);
            }
            guard.flush();
        }

        while DROPS.load(Ordering::Relaxed) < COUNT {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn count_destroy() {
        const COUNT: usize = 100_000;
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();

            for _ in 0..COUNT {
                let a = Owned::new(7i32).into_shared(guard);
                guard.defer_unchecked(move || {
                    drop(a.into_owned());
                    DESTROYS.fetch_add(1, Ordering::Relaxed);
                });
            }
            guard.flush();
        }

        while DESTROYS.load(Ordering::Relaxed) < COUNT {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn drop_array() {
        const COUNT: usize = 700;
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct Elem(i32);

        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::Relaxed);
            }
        }

        let collector = Collector::new();
        let handle = collector.register();

        let mut guard = handle.pin();

        let mut v = Vec::with_capacity(COUNT);
        for i in 0..COUNT {
            v.push(Elem(i as i32));
        }

        {
            let a = Owned::new(v).into_shared(&guard);
            unsafe {
                guard.defer_destroy(a);
            }
            guard.flush();
        }

        while DROPS.load(Ordering::Relaxed) < COUNT {
            guard.repin();
            collector.global.collect(&guard);
        }
        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn destroy_array() {
        const COUNT: usize = 100_000;
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();

            let mut v = Vec::with_capacity(COUNT);
            for i in 0..COUNT {
                v.push(i as i32);
            }

            let ptr = v.as_mut_ptr() as usize;
            let len = v.len();
            guard.defer_unchecked(move || {
                // Rebuild the vector with its original element type so the
                // deallocation layout matches the allocation (casting to
                // `*mut u8` would free with the wrong size and alignment).
                drop(Vec::from_raw_parts(ptr as *mut i32, len, len));
                DESTROYS.fetch_add(len, Ordering::Relaxed);
            });
            guard.flush();

            mem::forget(v);
        }

        while DESTROYS.load(Ordering::Relaxed) < COUNT {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn stress() {
        const THREADS: usize = 8;
        const COUNT: usize = 100_000;
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct Elem(i32);

        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::Relaxed);
            }
        }

        let collector = Collector::new();

        thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|_| {
                    let handle = collector.register();
                    for _ in 0..COUNT {
                        let guard = &handle.pin();
                        unsafe {
                            let a = Owned::new(Elem(7i32)).into_shared(guard);
                            guard.defer_destroy(a);
                        }
                    }
                });
            }
        })
        .unwrap();

        let handle = collector.register();
        while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
    }
}