//! The global data and participant for garbage collection.
//!
//! # Registration
//!
//! In order to track all participants in one place, we need some form of participant
//! registration. When a participant is created, it is registered to a global lock-free
//! singly-linked list of registries; and when a participant is leaving, it is unregistered from the
//! list.
//!
//! # Pinning
//!
//! Every participant contains an integer that tells whether the participant is pinned and, if so,
//! what the global epoch was at the time it was pinned. Participants also hold a pin counter that
//! aids in periodic global epoch advancement.
//!
//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
//! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
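//!
//! As a rough sketch (illustrative only, not a doctest) of how registration and pinning fit
//! together, using the `Collector` and `LocalHandle` types from the `collector` module:
//!
//! ```ignore
//! let collector = Collector::new();  // fresh global data
//! let handle = collector.register(); // registers a new `Local` in the global list
//!
//! {
//!     let guard = handle.pin(); // pins the participant in the current global epoch
//!     // ... perform lock-free operations protected by `guard` ...
//! } // dropping the guard unpins the participant
//! ```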
//!
//! # Thread-local bag
//!
//! Objects that get unlinked from concurrent data structures must be stashed away until the global
//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
//! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
//! global epoch and pushed into the global queue of bags. We store objects in thread-local storage
//! to amortize the synchronization cost of pushing garbage to the global queue.
//!
//! # Global queue
//!
//! Whenever a bag is pushed into a queue, the objects in some bags in the queue are collected and
//! destroyed along the way. This design reduces contention on data structures. The global queue
//! cannot be explicitly accessed: the only way to interact with it is by calling functions
//! `defer()`, which adds an object to the thread-local bag, or `collect()`, which manually triggers
//! garbage collection.
//!
//! Ideally, each instance of a concurrent data structure would have its own queue that gets fully
//! destroyed as soon as the data structure is dropped.
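//!
//! As a minimal sketch of this data flow (illustrative only; the closure is a stand-in for real
//! reclamation work), using this module's `Bag` together with `Deferred`:
//!
//! ```ignore
//! let mut bag = Bag::new();
//! // Safety: it must be safe for another thread to run the deferred closure.
//! let _ = unsafe { bag.try_push(Deferred::new(|| { /* reclaim a retired object */ })) };
//! // When a thread's bag fills up, `Local::defer` hands it to `Global::push_bag`, which seals
//! // it with the current global epoch and pushes it into the global queue; `Global::collect`
//! // later pops expired bags and runs their deferred functions.
//! ```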

use core::cell::{Cell, UnsafeCell};
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::sync::atomic;
use core::sync::atomic::Ordering;
use core::{fmt, ptr};

use crossbeam_utils::CachePadded;

use atomic::{Owned, Shared};
use collector::{Collector, LocalHandle};
use deferred::Deferred;
use epoch::{AtomicEpoch, Epoch};
use guard::{unprotected, Guard};
use sync::list::{Entry, IsElement, IterError, List};
use sync::queue::Queue;

/// Maximum number of objects a bag can contain.
#[cfg(not(feature = "sanitize"))]
const MAX_OBJECTS: usize = 64;
#[cfg(feature = "sanitize")]
const MAX_OBJECTS: usize = 4;

/// A bag of deferred functions.
pub struct Bag {
    /// Stashed objects.
    deferreds: [Deferred; MAX_OBJECTS],
    len: usize,
}

/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
unsafe impl Send for Bag {}

impl Bag {
    /// Returns a new, empty bag.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if the bag is empty.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Attempts to insert a deferred function into the bag.
    ///
    /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
    /// full.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
        if self.len < MAX_OBJECTS {
            self.deferreds[self.len] = deferred;
            self.len += 1;
            Ok(())
        } else {
            Err(deferred)
        }
    }

    /// Seals the bag with the given epoch.
    fn seal(self, epoch: Epoch) -> SealedBag {
        SealedBag { epoch, bag: self }
    }
}

impl Default for Bag {
    // TODO(taiki-e): when the minimum supported Rust version is bumped to 1.31+,
    // replace this with `#[rustfmt::skip]`.
    #[cfg_attr(rustfmt, rustfmt_skip)]
    fn default() -> Self {
        // TODO: [no_op; MAX_OBJECTS] syntax blocked by https://github.com/rust-lang/rust/issues/49147
        #[cfg(not(feature = "sanitize"))]
        return Bag {
            len: 0,
            deferreds: [
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
            ],
        };
        #[cfg(feature = "sanitize")]
        return Bag {
            len: 0,
            deferreds: [
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
                Deferred::new(no_op_func),
            ],
        };
    }
}

impl Drop for Bag {
    fn drop(&mut self) {
        // Call all deferred functions.
        for deferred in &mut self.deferreds[..self.len] {
            let no_op = Deferred::new(no_op_func);
            let owned_deferred = mem::replace(deferred, no_op);
            owned_deferred.call();
        }
    }
}

// can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
impl fmt::Debug for Bag {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Bag")
            .field("deferreds", &&self.deferreds[..self.len])
            .finish()
    }
}

fn no_op_func() {}

/// A pair of an epoch and a bag.
#[derive(Default, Debug)]
struct SealedBag {
    epoch: Epoch,
    bag: Bag,
}

/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
unsafe impl Sync for SealedBag {}

impl SealedBag {
    /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
    fn is_expired(&self, global_epoch: Epoch) -> bool {
        // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
        // is within one epoch of the current one cannot be destroyed yet.
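        //
        // For example (hypothetical epoch values): a bag sealed at epoch 3 may still be
        // referenced by participants that were pinned in epoch 2 or 3 at that time. The global
        // epoch reaches 5 only after all of those participants have unpinned, so the bag is
        // safe to destroy once the wrapping difference is at least 2.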
        global_epoch.wrapping_sub(self.epoch) >= 2
    }
}

/// The global data for a garbage collector.
pub struct Global {
    /// The intrusive linked list of `Local`s.
    locals: List<Local>,

    /// The global queue of bags of deferred functions.
    queue: Queue<SealedBag>,

    /// The global epoch.
    pub(crate) epoch: CachePadded<AtomicEpoch>,
}

impl Global {
    /// Number of bags to destroy.
    const COLLECT_STEPS: usize = 8;

    /// Creates new global data for garbage collection.
    #[inline]
    pub fn new() -> Self {
        Self {
            locals: List::new(),
            queue: Queue::new(),
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
        }
    }

    /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
    pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
        let bag = mem::replace(bag, Bag::new());

        atomic::fence(Ordering::SeqCst);

        let epoch = self.epoch.load(Ordering::Relaxed);
        self.queue.push(bag.seal(epoch), guard);
    }

    /// Collects several bags from the global queue and executes deferred functions in them.
    ///
    /// Note: This may itself produce garbage and in turn allocate new bags.
    ///
    /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
    /// path. In other words, we want the compiler to optimize branching for the case when
    /// `collect()` is not called.
    #[cold]
    pub fn collect(&self, guard: &Guard) {
        let global_epoch = self.try_advance(guard);

        let steps = if cfg!(feature = "sanitize") {
            usize::max_value()
        } else {
            Self::COLLECT_STEPS
        };

        for _ in 0..steps {
            match self.queue.try_pop_if(
                &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
                guard,
            ) {
                None => break,
                Some(sealed_bag) => drop(sealed_bag),
            }
        }
    }

    /// Attempts to advance the global epoch.
    ///
    /// The global epoch can advance only if all currently pinned participants have been pinned in
    /// the current epoch.
    ///
    /// Returns the current global epoch.
    ///
    /// `try_advance()` is annotated `#[cold]` because it is rarely called.
    #[cold]
    pub fn try_advance(&self, guard: &Guard) -> Epoch {
        let global_epoch = self.epoch.load(Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);

        // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
        // easy to implement in a lock-free manner. However, traversal can be slow due to cache
        // misses and data dependencies. We should experiment with other data structures as well.
        for local in self.locals.iter(&guard) {
            match local {
                Err(IterError::Stalled) => {
                    // A concurrent thread stalled this iteration. That thread might also try to
                    // advance the epoch, in which case we leave the job to it. Otherwise, the
                    // epoch will not be advanced.
                    return global_epoch;
                }
                Ok(local) => {
                    let local_epoch = local.epoch.load(Ordering::Relaxed);

                    // If the participant was pinned in a different epoch, we cannot advance the
                    // global epoch just yet.
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
                        return global_epoch;
                    }
                }
            }
        }
        atomic::fence(Ordering::Acquire);

        // All pinned participants were pinned in the current global epoch.
        // Now let's advance the global epoch...
        //
        // Note that if another thread already advanced it before us, this store will simply
        // overwrite the global epoch with the same value. This is true because `try_advance` was
        // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
        // advanced two steps ahead of it.
        let new_epoch = global_epoch.successor();
        self.epoch.store(new_epoch, Ordering::Release);
        new_epoch
    }
}

/// Participant for garbage collection.
pub struct Local {
    /// A node in the intrusive linked list of `Local`s.
    entry: Entry,

    /// The local epoch.
    epoch: AtomicEpoch,

    /// A reference to the global data.
    ///
    /// When all guards and handles get dropped, this reference is destroyed.
    collector: UnsafeCell<ManuallyDrop<Collector>>,

    /// The local bag of deferred functions.
    pub(crate) bag: UnsafeCell<Bag>,

    /// The number of guards keeping this participant pinned.
    guard_count: Cell<usize>,

    /// The number of active handles.
    handle_count: Cell<usize>,

    /// Total number of pinnings performed.
    ///
    /// This is just an auxiliary counter that sometimes kicks off collection.
    pin_count: Cell<Wrapping<usize>>,
}

impl Local {
    /// Number of pinnings after which a participant will execute some deferred functions from the
    /// global queue.
    const PINNINGS_BETWEEN_COLLECT: usize = 128;

    /// Registers a new `Local` in the provided `Global`.
    pub fn register(collector: &Collector) -> LocalHandle {
        unsafe {
            // Since we dereference no pointers in this block, it is safe to use `unprotected`.

            let local = Owned::new(Local {
                entry: Entry::default(),
                epoch: AtomicEpoch::new(Epoch::starting()),
                collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
                bag: UnsafeCell::new(Bag::new()),
                guard_count: Cell::new(0),
                handle_count: Cell::new(1),
                pin_count: Cell::new(Wrapping(0)),
            })
            .into_shared(&unprotected());
            collector.global.locals.insert(local, &unprotected());
            LocalHandle {
                local: local.as_raw(),
            }
        }
    }

    /// Returns a reference to the `Global` in which this `Local` resides.
    #[inline]
    pub fn global(&self) -> &Global {
        &self.collector().global
    }

    /// Returns a reference to the `Collector` in which this `Local` resides.
    #[inline]
    pub fn collector(&self) -> &Collector {
        unsafe { &**self.collector.get() }
    }

    /// Returns `true` if the current participant is pinned.
    #[inline]
    pub fn is_pinned(&self) -> bool {
        self.guard_count.get() > 0
    }

    /// Adds `deferred` to the thread-local bag.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
        let bag = &mut *self.bag.get();

        while let Err(d) = bag.try_push(deferred) {
            self.global().push_bag(bag, guard);
            deferred = d;
        }
    }

    /// Pushes the local bag into the global queue, then collects some expired bags from the queue.
    pub fn flush(&self, guard: &Guard) {
        let bag = unsafe { &mut *self.bag.get() };

        if !bag.is_empty() {
            self.global().push_bag(bag, guard);
        }

        self.global().collect(guard);
    }

    /// Pins the `Local`.
    #[inline]
    pub fn pin(&self) -> Guard {
        let guard = Guard { local: self };

        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
            let new_epoch = global_epoch.pinned();

            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
            // The fence makes sure that any future loads from `Atomic`s will not happen before
            // this store.
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
                // HACK(stjepang): On x86 architectures there are two different ways of executing
                // a `SeqCst` fence.
                //
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
                // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg`
                //    instruction.
                //
                // Both instructions have the effect of a full barrier, but benchmarks have shown
                // that the second one makes pinning faster in this particular case.  It is not
                // clear that this is permitted by the C++ memory model (SC fences work very
                // differently from SC accesses), but experimental evidence suggests that this
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
                // but alas, that is not possible on stable Rust.
                let current = Epoch::starting();
                let previous = self
                    .epoch
                    .compare_and_swap(current, new_epoch, Ordering::SeqCst);
                debug_assert_eq!(current, previous, "participant was expected to be unpinned");
                // We add a compiler fence to make it less likely for LLVM to do something wrong
                // here.  Formally, this is not enough to get rid of data races; practically,
                // it should go a long way.
                atomic::compiler_fence(Ordering::SeqCst);
            } else {
                self.epoch.store(new_epoch, Ordering::Relaxed);
                atomic::fence(Ordering::SeqCst);
            }

            // Increment the pin counter.
            let count = self.pin_count.get();
            self.pin_count.set(count + Wrapping(1));

            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and
            // collecting some garbage.
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
                self.global().collect(&guard);
            }
        }

        guard
    }

    /// Unpins the `Local`.
    #[inline]
    pub fn unpin(&self) {
        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count - 1);

        if guard_count == 1 {
            self.epoch.store(Epoch::starting(), Ordering::Release);

            if self.handle_count.get() == 0 {
                self.finalize();
            }
        }
    }

    /// Unpins and then pins the `Local`.
    #[inline]
    pub fn repin(&self) {
        let guard_count = self.guard_count.get();

        // Update the local epoch only if there's only one guard.
        if guard_count == 1 {
            let epoch = self.epoch.load(Ordering::Relaxed);
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();

            // Update the local epoch only if the global epoch is greater than the local epoch.
            if epoch != global_epoch {
                // We store the new epoch with `Release` because we need to ensure any memory
                // accesses from the previous epoch do not leak into the new one.
                self.epoch.store(global_epoch, Ordering::Release);

                // However, we don't need a following `SeqCst` fence, because it is safe for memory
                // accesses from the new epoch to be executed before updating the local epoch. At
                // worst, other threads will see the new epoch late and delay GC slightly.
            }
        }
    }

    /// Increments the handle count.
    #[inline]
    pub fn acquire_handle(&self) {
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count + 1);
    }

    /// Decrements the handle count.
    #[inline]
    pub fn release_handle(&self) {
        let guard_count = self.guard_count.get();
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count - 1);

        if guard_count == 0 && handle_count == 1 {
            self.finalize();
        }
    }

    /// Removes the `Local` from the global linked list.
    #[cold]
    fn finalize(&self) {
        debug_assert_eq!(self.guard_count.get(), 0);
        debug_assert_eq!(self.handle_count.get(), 0);

        // Temporarily increment handle count. This is required so that the following call to `pin`
        // doesn't call `finalize` again.
        self.handle_count.set(1);
        unsafe {
            // Pin and move the local bag into the global queue. It's important that `push_bag`
            // doesn't defer destruction on any new garbage.
            let guard = &self.pin();
            self.global().push_bag(&mut *self.bag.get(), guard);
        }
        // Revert the handle count back to zero.
        self.handle_count.set(0);

        unsafe {
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
            // by a guard at this time, it's crucial that the reference is read before marking the
            // `Local` as deleted.
            let collector: Collector = ptr::read(&*(*self.collector.get()));

            // Mark this node in the linked list as deleted.
            self.entry.delete(&unprotected());

            // Finally, drop the reference to the global. Note that this might be the last reference
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
            // in its queue will be executed.
            drop(collector);
        }
    }
}

impl IsElement<Local> for Local {
    fn entry_of(local: &Local) -> &Entry {
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
        unsafe { &*entry_ptr }
    }

    unsafe fn element_of(entry: &Entry) -> &Local {
        // offset_of! macro uses unsafe, but it's unnecessary in this context.
        #[allow(unused_unsafe)]
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
        &*local_ptr
    }

    unsafe fn finalize(entry: &Entry, guard: &Guard) {
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    #[test]
    fn check_defer() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn set() {
            FLAG.store(42, Ordering::Relaxed);
        }

        let d = Deferred::new(set);
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        d.call();
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn check_bag() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn incr() {
            FLAG.fetch_add(1, Ordering::Relaxed);
        }

        let mut bag = Bag::new();
        assert!(bag.is_empty());

        for _ in 0..MAX_OBJECTS {
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
            assert!(!bag.is_empty());
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        }

        let result = unsafe { bag.try_push(Deferred::new(incr)) };
        assert!(result.is_err());
        assert!(!bag.is_empty());
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);

        drop(bag);
        assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
    }
}