1 use crate::primitive::sync::atomic;
2 use core::cell::Cell;
3 use core::fmt;
4 
/// Step threshold for busy-spinning.
///
/// `spin` never issues more than `1 << SPIN_LIMIT` spin-loop hints per call and stops advancing
/// the step counter once it exceeds this value. `snooze` busy-spins only while the step counter
/// is at most this value; beyond it, `snooze` yields the thread (with the `std` feature) or keeps
/// spinning with a still-growing burst (without it).
const SPIN_LIMIT: u32 = 6;
/// Step threshold for completion.
///
/// `snooze` keeps advancing the step counter until it exceeds this value, at which point
/// `is_completed` starts returning `true`.
const YIELD_LIMIT: u32 = 10;
7 
/// Performs exponential backoff in spin loops.
///
/// Backing off in spin loops reduces contention and improves overall performance.
///
/// This primitive can execute *YIELD* and *PAUSE* instructions, yield the current thread to the OS
/// scheduler, and report (via [`is_completed`]) when it is a good time to block the thread using
/// a different synchronization mechanism, such as [`std::thread::park()`] or a [`Condvar`]. Each
/// step of the backoff procedure takes roughly twice as long as the previous step.
///
/// # Examples
///
/// Backing off in a lock-free loop:
///
/// ```
/// use crossbeam_utils::Backoff;
/// use std::sync::atomic::AtomicUsize;
/// use std::sync::atomic::Ordering::SeqCst;
///
/// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize {
///     let backoff = Backoff::new();
///     loop {
///         let val = a.load(SeqCst);
///         if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() {
///             return val;
///         }
///         backoff.spin();
///     }
/// }
/// ```
///
/// Waiting for an [`AtomicBool`] to become `true`:
///
/// ```
/// use crossbeam_utils::Backoff;
/// use std::sync::atomic::AtomicBool;
/// use std::sync::atomic::Ordering::SeqCst;
///
/// fn spin_wait(ready: &AtomicBool) {
///     let backoff = Backoff::new();
///     while !ready.load(SeqCst) {
///         backoff.snooze();
///     }
/// }
/// ```
///
/// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait.
/// Note that whoever sets the atomic variable to `true` must notify the parked thread by calling
/// [`unpark()`]:
///
/// ```
/// use crossbeam_utils::Backoff;
/// use std::sync::atomic::AtomicBool;
/// use std::sync::atomic::Ordering::SeqCst;
/// use std::thread;
///
/// fn blocking_wait(ready: &AtomicBool) {
///     let backoff = Backoff::new();
///     while !ready.load(SeqCst) {
///         if backoff.is_completed() {
///             thread::park();
///         } else {
///             backoff.snooze();
///         }
///     }
/// }
/// ```
///
/// [`is_completed`]: Backoff::is_completed
/// [`std::thread::park()`]: std::thread::park
/// [`Condvar`]: std::sync::Condvar
/// [`AtomicBool`]: std::sync::atomic::AtomicBool
/// [`unpark()`]: std::thread::Thread::unpark
pub struct Backoff {
    /// Current position in the backoff procedure.
    ///
    /// Starts at 0, is advanced by one on each call to `spin` or `snooze` until it passes the
    /// respective limit (`SPIN_LIMIT` / `YIELD_LIMIT`), and is set back to 0 by `reset`. It is
    /// kept in a `Cell` so that every method can take `&self`.
    step: Cell<u32>,
}
83 
84 impl Backoff {
85     /// Creates a new `Backoff`.
86     ///
87     /// # Examples
88     ///
89     /// ```
90     /// use crossbeam_utils::Backoff;
91     ///
92     /// let backoff = Backoff::new();
93     /// ```
94     #[inline]
new() -> Self95     pub fn new() -> Self {
96         Backoff { step: Cell::new(0) }
97     }
98 
99     /// Resets the `Backoff`.
100     ///
101     /// # Examples
102     ///
103     /// ```
104     /// use crossbeam_utils::Backoff;
105     ///
106     /// let backoff = Backoff::new();
107     /// backoff.reset();
108     /// ```
109     #[inline]
reset(&self)110     pub fn reset(&self) {
111         self.step.set(0);
112     }
113 
114     /// Backs off in a lock-free loop.
115     ///
116     /// This method should be used when we need to retry an operation because another thread made
117     /// progress.
118     ///
119     /// The processor may yield using the *YIELD* or *PAUSE* instruction.
120     ///
121     /// # Examples
122     ///
123     /// Backing off in a lock-free loop:
124     ///
125     /// ```
126     /// use crossbeam_utils::Backoff;
127     /// use std::sync::atomic::AtomicUsize;
128     /// use std::sync::atomic::Ordering::SeqCst;
129     ///
130     /// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize {
131     ///     let backoff = Backoff::new();
132     ///     loop {
133     ///         let val = a.load(SeqCst);
134     ///         if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() {
135     ///             return val;
136     ///         }
137     ///         backoff.spin();
138     ///     }
139     /// }
140     ///
141     /// let a = AtomicUsize::new(7);
142     /// assert_eq!(fetch_mul(&a, 8), 7);
143     /// assert_eq!(a.load(SeqCst), 56);
144     /// ```
145     #[inline]
spin(&self)146     pub fn spin(&self) {
147         for _ in 0..1 << self.step.get().min(SPIN_LIMIT) {
148             // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
149             // use [`core::hint::spin_loop`] instead.
150             #[allow(deprecated)]
151             atomic::spin_loop_hint();
152         }
153 
154         if self.step.get() <= SPIN_LIMIT {
155             self.step.set(self.step.get() + 1);
156         }
157     }
158 
159     /// Backs off in a blocking loop.
160     ///
161     /// This method should be used when we need to wait for another thread to make progress.
162     ///
163     /// The processor may yield using the *YIELD* or *PAUSE* instruction and the current thread
164     /// may yield by giving up a timeslice to the OS scheduler.
165     ///
166     /// In `#[no_std]` environments, this method is equivalent to [`spin`].
167     ///
168     /// If possible, use [`is_completed`] to check when it is advised to stop using backoff and
169     /// block the current thread using a different synchronization mechanism instead.
170     ///
171     /// [`spin`]: Backoff::spin
172     /// [`is_completed`]: Backoff::is_completed
173     ///
174     /// # Examples
175     ///
176     /// Waiting for an [`AtomicBool`] to become `true`:
177     ///
178     /// ```
179     /// use crossbeam_utils::Backoff;
180     /// use std::sync::Arc;
181     /// use std::sync::atomic::AtomicBool;
182     /// use std::sync::atomic::Ordering::SeqCst;
183     /// use std::thread;
184     /// use std::time::Duration;
185     ///
186     /// fn spin_wait(ready: &AtomicBool) {
187     ///     let backoff = Backoff::new();
188     ///     while !ready.load(SeqCst) {
189     ///         backoff.snooze();
190     ///     }
191     /// }
192     ///
193     /// let ready = Arc::new(AtomicBool::new(false));
194     /// let ready2 = ready.clone();
195     ///
196     /// thread::spawn(move || {
197     ///     thread::sleep(Duration::from_millis(100));
198     ///     ready2.store(true, SeqCst);
199     /// });
200     ///
201     /// assert_eq!(ready.load(SeqCst), false);
202     /// spin_wait(&ready);
203     /// assert_eq!(ready.load(SeqCst), true);
204     /// ```
205     ///
206     /// [`AtomicBool`]: std::sync::atomic::AtomicBool
207     #[inline]
snooze(&self)208     pub fn snooze(&self) {
209         if self.step.get() <= SPIN_LIMIT {
210             for _ in 0..1 << self.step.get() {
211                 // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
212                 // use [`core::hint::spin_loop`] instead.
213                 #[allow(deprecated)]
214                 atomic::spin_loop_hint();
215             }
216         } else {
217             #[cfg(not(feature = "std"))]
218             for _ in 0..1 << self.step.get() {
219                 // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
220                 // use [`core::hint::spin_loop`] instead.
221                 #[allow(deprecated)]
222                 atomic::spin_loop_hint();
223             }
224 
225             #[cfg(feature = "std")]
226             ::std::thread::yield_now();
227         }
228 
229         if self.step.get() <= YIELD_LIMIT {
230             self.step.set(self.step.get() + 1);
231         }
232     }
233 
234     /// Returns `true` if exponential backoff has completed and blocking the thread is advised.
235     ///
236     /// # Examples
237     ///
238     /// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait:
239     ///
240     /// ```
241     /// use crossbeam_utils::Backoff;
242     /// use std::sync::Arc;
243     /// use std::sync::atomic::AtomicBool;
244     /// use std::sync::atomic::Ordering::SeqCst;
245     /// use std::thread;
246     /// use std::time::Duration;
247     ///
248     /// fn blocking_wait(ready: &AtomicBool) {
249     ///     let backoff = Backoff::new();
250     ///     while !ready.load(SeqCst) {
251     ///         if backoff.is_completed() {
252     ///             thread::park();
253     ///         } else {
254     ///             backoff.snooze();
255     ///         }
256     ///     }
257     /// }
258     ///
259     /// let ready = Arc::new(AtomicBool::new(false));
260     /// let ready2 = ready.clone();
261     /// let waiter = thread::current();
262     ///
263     /// thread::spawn(move || {
264     ///     thread::sleep(Duration::from_millis(100));
265     ///     ready2.store(true, SeqCst);
266     ///     waiter.unpark();
267     /// });
268     ///
269     /// assert_eq!(ready.load(SeqCst), false);
270     /// blocking_wait(&ready);
271     /// assert_eq!(ready.load(SeqCst), true);
272     /// ```
273     ///
274     /// [`AtomicBool`]: std::sync::atomic::AtomicBool
275     #[inline]
is_completed(&self) -> bool276     pub fn is_completed(&self) -> bool {
277         self.step.get() > YIELD_LIMIT
278     }
279 }
280 
281 impl fmt::Debug for Backoff {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result282     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283         f.debug_struct("Backoff")
284             .field("step", &self.step)
285             .field("is_completed", &self.is_completed())
286             .finish()
287     }
288 }
289 
290 impl Default for Backoff {
default() -> Backoff291     fn default() -> Backoff {
292         Backoff::new()
293     }
294 }
295