//! The global data and participant for garbage collection.
//!
//! # Registration
//!
//! In order to track all participants in one place, we need some form of participant
//! registration. When a participant is created, it is registered in a global lock-free
//! singly-linked list of registries; and when a participant leaves, it is unregistered from the
//! list.
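//!
//! A minimal sketch of how registration is driven from the public `Collector` API (assuming the
//! usual `Collector`/`LocalHandle` surface; not part of this module):
//!
//! ```ignore
//! use crossbeam_epoch::Collector;
//!
//! let collector = Collector::new();  // creates the `Global`
//! let handle = collector.register(); // registers a new `Local` (participant) in the list
//! let guard = handle.pin();          // pins the participant
//! drop(guard);
//! drop(handle);                      // unregisters the participant once all guards are gone
//! ```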
//!
//! # Pinning
//!
//! Every participant contains an integer that tells whether the participant is pinned and, if so,
//! what the global epoch was at the time it was pinned. Participants also hold a pin counter that
//! aids in periodic global epoch advancement.
//!
//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
//! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
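//!
//! A short sketch of pinning through the default thread-local participant; the `Atomic` operations
//! below are placeholders for whatever a data structure actually does:
//!
//! ```ignore
//! use crossbeam_epoch::{self as epoch, Atomic};
//! use std::sync::atomic::Ordering;
//!
//! let a = Atomic::new(7);
//! {
//!     let guard = epoch::pin();                       // pins the current participant
//!     let shared = a.load(Ordering::Acquire, &guard);
//!     // `shared` remains valid for as long as `guard` is alive
//! }                                                   // guard dropped: the participant is unpinned
//! ```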
//!
//! # Thread-local bag
//!
//! Objects that get unlinked from concurrent data structures must be stashed away until the global
//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
//! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
//! global epoch and pushed into the global queue of bags. We store objects in thread-local storage
//! in order to amortize the synchronization cost of pushing garbage to the global queue.
//!
//! # Global queue
//!
//! Whenever a bag is pushed into the queue, the objects in some bags in the queue are collected and
//! destroyed along the way. This design reduces contention on data structures. The global queue
//! cannot be explicitly accessed: the only way to interact with it is by calling the functions
//! `defer()`, which adds an object to the thread-local bag, or `collect()`, which manually triggers
//! garbage collection.
//!
//! Ideally, each instance of a concurrent data structure would have its own queue that gets fully
//! destroyed as soon as the data structure is dropped.
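//!
//! A sketch of how deferred destruction flows through the public `Guard` API into the thread-local
//! bag and, eventually, the global queue (the `Atomic`/`Owned` setup here is purely illustrative):
//!
//! ```ignore
//! use crossbeam_epoch::{self as epoch, Atomic, Owned};
//! use std::sync::atomic::Ordering;
//!
//! let a = Atomic::new(String::from("old"));
//! let guard = epoch::pin();
//!
//! let old = a.swap(Owned::new(String::from("new")), Ordering::AcqRel, &guard);
//! unsafe {
//!     // Goes into the thread-local bag; once the bag fills up, it is sealed with the
//!     // current epoch and pushed into the global queue.
//!     guard.defer_destroy(old);
//! }
//! guard.flush(); // push the bag early and collect some expired bags
//! ```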

use core::cell::{Cell, UnsafeCell};
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::sync::atomic;
use core::sync::atomic::Ordering;
use core::{fmt, ptr};

use crossbeam_utils::CachePadded;

use atomic::{Owned, Shared};
use collector::{Collector, LocalHandle};
use deferred::Deferred;
use epoch::{AtomicEpoch, Epoch};
use guard::{unprotected, Guard};
use sync::list::{Entry, IsElement, IterError, List};
use sync::queue::Queue;

/// Maximum number of objects a bag can contain.
#[cfg(not(feature = "sanitize"))]
const MAX_OBJECTS: usize = 64;
#[cfg(feature = "sanitize")]
const MAX_OBJECTS: usize = 4;

/// A bag of deferred functions.
pub struct Bag {
    /// Stashed objects.
    deferreds: [Deferred; MAX_OBJECTS],
    len: usize,
}

/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
unsafe impl Send for Bag {}

impl Bag {
    /// Returns a new, empty bag.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if the bag is empty.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Attempts to insert a deferred function into the bag.
    ///
    /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
    /// full.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
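    ///
    /// # Example
    ///
    /// A sketch of the push-or-flush pattern used by `Local::defer` below (`bag`, `global`,
    /// `guard`, and `some_deferred` are placeholders):
    ///
    /// ```ignore
    /// let mut deferred = some_deferred;
    /// while let Err(d) = unsafe { bag.try_push(deferred) } {
    ///     // The bag is full: seal it, push it to the global queue, and retry with a fresh bag.
    ///     global.push_bag(&mut bag, &guard);
    ///     deferred = d;
    /// }
    /// ```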
    pub unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
        if self.len < MAX_OBJECTS {
            self.deferreds[self.len] = deferred;
            self.len += 1;
            Ok(())
        } else {
            Err(deferred)
        }
    }

    /// Seals the bag with the given epoch.
    fn seal(self, epoch: Epoch) -> SealedBag {
        SealedBag { epoch, bag: self }
    }
}

impl Default for Bag {
    // TODO(taiki-e): when the minimum supported Rust version is bumped to 1.31+,
    // replace this with `#[rustfmt::skip]`.
    #[cfg_attr(rustfmt, rustfmt_skip)]
    fn default() -> Self {
        // TODO: [no_op; MAX_OBJECTS] syntax blocked by https://github.com/rust-lang/rust/issues/49147
        #[cfg(not(feature = "sanitize"))]
        return Bag {
            len: 0,
            deferreds: [
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
            ],
        };
        #[cfg(feature = "sanitize")]
        return Bag {
            len: 0,
            deferreds: [
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
            ],
        };
    }
}

impl Drop for Bag {
    fn drop(&mut self) {
        // Call all deferred functions.
        for deferred in &mut self.deferreds[..self.len] {
            let no_op = Deferred::new(no_op_func);
            let owned_deferred = mem::replace(deferred, no_op);
            owned_deferred.call();
        }
    }
}

// can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
impl fmt::Debug for Bag {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Bag")
            .field("deferreds", &&self.deferreds[..self.len])
            .finish()
    }
}

fn no_op_func() {}

/// A pair of an epoch and a bag.
#[derive(Default, Debug)]
struct SealedBag {
    epoch: Epoch,
    bag: Bag,
}

/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
unsafe impl Sync for SealedBag {}

impl SealedBag {
    /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
    fn is_expired(&self, global_epoch: Epoch) -> bool {
        // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
        // is within one epoch of the current one cannot be destroyed yet.
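        //
        // For instance, if the global epoch is 6, a bag sealed at epoch 5 is at distance 1 (too
        // recent to destroy), while one sealed at epoch 4 is at distance 2 and can safely be dropped.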
        global_epoch.wrapping_sub(self.epoch) >= 2
    }
}

/// The global data for a garbage collector.
pub struct Global {
    /// The intrusive linked list of `Local`s.
    locals: List<Local>,

    /// The global queue of bags of deferred functions.
    queue: Queue<SealedBag>,

    /// The global epoch.
    pub(crate) epoch: CachePadded<AtomicEpoch>,
}

impl Global {
    /// Number of bags to destroy.
    const COLLECT_STEPS: usize = 8;

    /// Creates a new global data for garbage collection.
    #[inline]
    pub fn new() -> Self {
        Self {
            locals: List::new(),
            queue: Queue::new(),
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
        }
    }

    /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
    pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
        let bag = mem::replace(bag, Bag::new());

        atomic::fence(Ordering::SeqCst);

        let epoch = self.epoch.load(Ordering::Relaxed);
        self.queue.push(bag.seal(epoch), guard);
    }

    /// Collects several bags from the global queue and executes deferred functions in them.
    ///
    /// Note: This may itself produce garbage and in turn allocate new bags.
    ///
    /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
    /// path. In other words, we want the compiler to optimize branching for the case when
    /// `collect()` is not called.
    #[cold]
    pub fn collect(&self, guard: &Guard) {
        let global_epoch = self.try_advance(guard);

        let steps = if cfg!(feature = "sanitize") {
            usize::max_value()
        } else {
            Self::COLLECT_STEPS
        };

        for _ in 0..steps {
            match self.queue.try_pop_if(
                &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
                guard,
            ) {
                None => break,
                Some(sealed_bag) => drop(sealed_bag),
            }
        }
    }

    /// Attempts to advance the global epoch.
    ///
    /// The global epoch can advance only if all currently pinned participants have been pinned in
    /// the current epoch.
    ///
    /// Returns the current global epoch.
    ///
    /// `try_advance()` is annotated `#[cold]` because it is rarely called.
    #[cold]
    pub fn try_advance(&self, guard: &Guard) -> Epoch {
        let global_epoch = self.epoch.load(Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);

        // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
        // easy to implement in a lock-free manner. However, traversal can be slow due to cache
        // misses and data dependencies. We should experiment with other data structures as well.
        for local in self.locals.iter(&guard) {
            match local {
                Err(IterError::Stalled) => {
                    // A concurrent thread stalled this iteration. That thread might also try to
                    // advance the epoch, in which case we leave the job to it. Otherwise, the
                    // epoch will not be advanced.
                    return global_epoch;
                }
                Ok(local) => {
                    let local_epoch = local.epoch.load(Ordering::Relaxed);

                    // If the participant was pinned in a different epoch, we cannot advance the
                    // global epoch just yet.
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
                        return global_epoch;
                    }
                }
            }
        }
        atomic::fence(Ordering::Acquire);

        // All pinned participants were pinned in the current global epoch.
        // Now let's advance the global epoch...
        //
        // Note that if another thread already advanced it before us, this store will simply
        // overwrite the global epoch with the same value. This is true because `try_advance` was
        // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
        // advanced two steps ahead of it.
        let new_epoch = global_epoch.successor();
        self.epoch.store(new_epoch, Ordering::Release);
        new_epoch
    }
}

/// Participant for garbage collection.
pub struct Local {
    /// A node in the intrusive linked list of `Local`s.
    entry: Entry,

    /// The local epoch.
    epoch: AtomicEpoch,

    /// A reference to the global data.
    ///
    /// When all guards and handles get dropped, this reference is destroyed.
    collector: UnsafeCell<ManuallyDrop<Collector>>,

    /// The local bag of deferred functions.
    pub(crate) bag: UnsafeCell<Bag>,

    /// The number of guards keeping this participant pinned.
    guard_count: Cell<usize>,

    /// The number of active handles.
    handle_count: Cell<usize>,

    /// Total number of pinnings performed.
    ///
    /// This is just an auxiliary counter that sometimes kicks off collection.
    pin_count: Cell<Wrapping<usize>>,
}

impl Local {
    /// Number of pinnings after which a participant will execute some deferred functions from the
    /// global queue.
    const PINNINGS_BETWEEN_COLLECT: usize = 128;

    /// Registers a new `Local` in the provided `Global`.
    pub fn register(collector: &Collector) -> LocalHandle {
        unsafe {
            // Since we dereference no pointers in this block, it is safe to use `unprotected`.

            let local = Owned::new(Local {
                entry: Entry::default(),
                epoch: AtomicEpoch::new(Epoch::starting()),
                collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
                bag: UnsafeCell::new(Bag::new()),
                guard_count: Cell::new(0),
                handle_count: Cell::new(1),
                pin_count: Cell::new(Wrapping(0)),
            })
            .into_shared(&unprotected());
            collector.global.locals.insert(local, &unprotected());
            LocalHandle {
                local: local.as_raw(),
            }
        }
    }

    /// Returns a reference to the `Global` in which this `Local` resides.
    #[inline]
    pub fn global(&self) -> &Global {
        &self.collector().global
    }

    /// Returns a reference to the `Collector` in which this `Local` resides.
    #[inline]
    pub fn collector(&self) -> &Collector {
        unsafe { &**self.collector.get() }
    }

    /// Returns `true` if the current participant is pinned.
    #[inline]
    pub fn is_pinned(&self) -> bool {
        self.guard_count.get() > 0
    }

    /// Adds `deferred` to the thread-local bag.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
        let bag = &mut *self.bag.get();

        while let Err(d) = bag.try_push(deferred) {
            self.global().push_bag(bag, guard);
            deferred = d;
        }
    }

    /// Pushes the local bag into the global queue, then collects some garbage from the queue.
    pub fn flush(&self, guard: &Guard) {
        let bag = unsafe { &mut *self.bag.get() };

        if !bag.is_empty() {
            self.global().push_bag(bag, guard);
        }

        self.global().collect(guard);
    }

    /// Pins the `Local`.
    #[inline]
    pub fn pin(&self) -> Guard {
        let guard = Guard { local: self };

        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
            let new_epoch = global_epoch.pinned();

            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
            // The fence makes sure that any future loads from `Atomic`s will not happen before
            // this store.
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
                // HACK(stjepang): On x86 architectures there are two different ways of executing
                // a `SeqCst` fence.
                //
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
                // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg`
                //    instruction.
                //
                // Both instructions have the effect of a full barrier, but benchmarks have shown
                // that the second one makes pinning faster in this particular case. It is not
                // clear that this is permitted by the C++ memory model (SC fences work very
                // differently from SC accesses), but experimental evidence suggests that this
                // works fine. Using inline assembly would be a viable (and correct) alternative,
                // but alas, that is not possible on stable Rust.
                let current = Epoch::starting();
                let previous = self
                    .epoch
                    .compare_and_swap(current, new_epoch, Ordering::SeqCst);
                debug_assert_eq!(current, previous, "participant was expected to be unpinned");
                // We add a compiler fence to make it less likely for LLVM to do something wrong
                // here. Formally, this is not enough to get rid of data races; practically,
                // it should go a long way.
                atomic::compiler_fence(Ordering::SeqCst);
            } else {
                self.epoch.store(new_epoch, Ordering::Relaxed);
                atomic::fence(Ordering::SeqCst);
            }

            // Increment the pin counter.
            let count = self.pin_count.get();
            self.pin_count.set(count + Wrapping(1));

            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and
            // collecting some garbage.
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
                self.global().collect(&guard);
            }
        }

        guard
    }

    /// Unpins the `Local`.
    #[inline]
    pub fn unpin(&self) {
        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count - 1);

        if guard_count == 1 {
            self.epoch.store(Epoch::starting(), Ordering::Release);

            if self.handle_count.get() == 0 {
                self.finalize();
            }
        }
    }

    /// Unpins and then pins the `Local`.
    #[inline]
    pub fn repin(&self) {
        let guard_count = self.guard_count.get();

        // Update the local epoch only if there's only one guard.
        if guard_count == 1 {
            let epoch = self.epoch.load(Ordering::Relaxed);
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();

            // Update the local epoch only if the global epoch is greater than the local epoch.
            if epoch != global_epoch {
                // We store the new epoch with `Release` because we need to ensure any memory
                // accesses from the previous epoch do not leak into the new one.
                self.epoch.store(global_epoch, Ordering::Release);

                // However, we don't need a following `SeqCst` fence, because it is safe for memory
                // accesses from the new epoch to be executed before updating the local epoch. At
                // worst, other threads will see the new epoch late and delay GC slightly.
            }
        }
    }

    /// Increments the handle count.
    #[inline]
    pub fn acquire_handle(&self) {
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count + 1);
    }

    /// Decrements the handle count.
    #[inline]
    pub fn release_handle(&self) {
        let guard_count = self.guard_count.get();
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count - 1);

        if guard_count == 0 && handle_count == 1 {
            self.finalize();
        }
    }

    /// Removes the `Local` from the global linked list.
    #[cold]
    fn finalize(&self) {
        debug_assert_eq!(self.guard_count.get(), 0);
        debug_assert_eq!(self.handle_count.get(), 0);

        // Temporarily increment handle count. This is required so that the following call to `pin`
        // doesn't call `finalize` again.
        self.handle_count.set(1);
        unsafe {
            // Pin and move the local bag into the global queue. It's important that `push_bag`
            // doesn't defer destruction on any new garbage.
            let guard = &self.pin();
            self.global().push_bag(&mut *self.bag.get(), guard);
        }
        // Revert the handle count back to zero.
        self.handle_count.set(0);

        unsafe {
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
            // by a guard at this time, it's crucial that the reference is read before marking the
            // `Local` as deleted.
            let collector: Collector = ptr::read(&*(*self.collector.get()));

            // Mark this node in the linked list as deleted.
            self.entry.delete(&unprotected());

            // Finally, drop the reference to the global. Note that this might be the last reference
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
            // in its queue will be executed.
            drop(collector);
        }
    }
}

impl IsElement<Local> for Local {
    fn entry_of(local: &Local) -> &Entry {
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
        unsafe { &*entry_ptr }
    }

    unsafe fn element_of(entry: &Entry) -> &Local {
        // offset_of! macro uses unsafe, but it's unnecessary in this context.
        #[allow(unused_unsafe)]
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
        &*local_ptr
    }

    unsafe fn finalize(entry: &Entry, guard: &Guard) {
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    #[test]
    fn check_defer() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn set() {
            FLAG.store(42, Ordering::Relaxed);
        }

        let d = Deferred::new(set);
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        d.call();
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn check_bag() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn incr() {
            FLAG.fetch_add(1, Ordering::Relaxed);
        }

        let mut bag = Bag::new();
        assert!(bag.is_empty());

        for _ in 0..MAX_OBJECTS {
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
            assert!(!bag.is_empty());
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        }

        let result = unsafe { bag.try_push(Deferred::new(incr)) };
        assert!(result.is_err());
        assert!(!bag.is_empty());
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);

        drop(bag);
        assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
    }
}
660