1 use core::cell::UnsafeCell;
2 use core::fmt;
3 use core::sync::atomic::AtomicUsize;
4 use core::sync::atomic::Ordering::{Acquire, Release, AcqRel};
5 use core::task::Waker;
6 
/// A synchronization primitive for task wakeup.
///
/// The task that cares about a given event may change over time. An
/// `AtomicWaker` coordinates concurrent notifications with a consumer that may
/// be "updating" which task should be woken. This is handy when a computation
/// finishes on another thread and must notify the consumer while the consumer
/// is in the middle of migrating to a new logical task.
///
/// Consumers call `register` *before* checking the result of a computation, and
/// producers call `wake` *after* producing it (this differs from the usual
/// `thread::park` pattern). Calling `wake` **before** `register` is permitted
/// and is a no-op.
///
/// One `AtomicWaker` can be reused for any number of `register` and `wake`
/// calls.
///
/// # Memory ordering
///
/// A call to `register` "acquires" all memory "released" by calls to `wake`
/// that happened before it. Subsequent calls to `wake` will wake the registered
/// waker (under contention, that wake may instead be triggered inside
/// `register`).
///
/// When `register` is called concurrently (which should be avoided), the
/// ordering is only guaranteed for the winning call.
///
/// # Examples
///
/// Here is a simple example providing a `Flag` that can be signalled manually
/// when it is ready.
///
/// ```
/// use futures::future::Future;
/// use futures::task::{Context, Poll, AtomicWaker};
/// use std::sync::Arc;
/// use std::sync::atomic::AtomicBool;
/// use std::sync::atomic::Ordering::Relaxed;
/// use std::pin::Pin;
///
/// struct Inner {
///     waker: AtomicWaker,
///     set: AtomicBool,
/// }
///
/// #[derive(Clone)]
/// struct Flag(Arc<Inner>);
///
/// impl Flag {
///     pub fn new() -> Self {
///         Self(Arc::new(Inner {
///             waker: AtomicWaker::new(),
///             set: AtomicBool::new(false),
///         }))
///     }
///
///     pub fn signal(&self) {
///         self.0.set.store(true, Relaxed);
///         self.0.waker.wake();
///     }
/// }
///
/// impl Future for Flag {
///     type Output = ();
///
///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
///         // quick check to avoid registration if already done.
///         if self.0.set.load(Relaxed) {
///             return Poll::Ready(());
///         }
///
///         self.0.waker.register(cx.waker());
///
///         // Need to check condition **after** `register` to avoid a race
///         // condition that would result in lost notifications.
///         if self.0.set.load(Relaxed) {
///             Poll::Ready(())
///         } else {
///             Poll::Pending
///         }
///     }
/// }
/// ```
pub struct AtomicWaker {
    // Lock word combining the `REGISTERING` and `WAKING` bits; `WAITING` (0)
    // means no thread currently owns the `waker` cell.
    state: AtomicUsize,
    // The most recently registered waker, if any. Only touched by the thread
    // that currently owns the lock encoded in `state`.
    waker: UnsafeCell<Option<Waker>>,
}
93 
94 // `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell
95 // stores a `Waker` value produced by calls to `register` and many threads can
96 // race to take the waker (to wake it) by calling `wake`.
97 //
98 // If a new `Waker` instance is produced by calling `register` before an
99 // existing one is consumed, then the existing one is overwritten.
100 //
// While `AtomicWaker` is single-producer, the implementation still ensures
// memory safety. In the event of concurrent calls to `register`, there will be
// a single winner whose waker gets stored in the cell. The losers will not have
// their tasks woken. As such, callers should synchronize their calls to
// `register`.
106 //
107 // The implementation uses a single `AtomicUsize` value to coordinate access to
108 // the `Waker` cell. There are two bits that are operated on independently.
109 // These are represented by `REGISTERING` and `WAKING`.
110 //
111 // The `REGISTERING` bit is set when a producer enters the critical section. The
112 // `WAKING` bit is set when a consumer enters the critical section. Neither bit
113 // being set is represented by `WAITING`.
114 //
115 // A thread obtains an exclusive lock on the waker cell by transitioning the
116 // state from `WAITING` to `REGISTERING` or `WAKING`, depending on the operation
117 // the thread wishes to perform. When this transition is made, it is guaranteed
118 // that no other thread will access the waker cell.
119 //
120 // # Registering
121 //
122 // On a call to `register`, an attempt to transition the state from WAITING to
123 // REGISTERING is made. On success, the caller obtains a lock on the waker cell.
124 //
125 // If the lock is obtained, then the thread sets the waker cell to the waker
126 // provided as an argument. Then it attempts to transition the state back from
127 // `REGISTERING` -> `WAITING`.
128 //
129 // If this transition is successful, then the registering process is complete
130 // and the next call to `wake` will observe the waker.
131 //
132 // If the transition fails, then there was a concurrent call to `wake` that was
133 // unable to access the waker cell (due to the registering thread holding the
134 // lock). To handle this, the registering thread removes the waker it just set
135 // from the cell and calls `wake` on it. This call to wake represents the
136 // attempt to wake by the other thread (that set the `WAKING` bit). The state is
137 // then transitioned from `REGISTERING | WAKING` back to `WAITING`.  This
138 // transition must succeed because, at this point, the state cannot be
139 // transitioned by another thread.
140 //
141 // # Waking
142 //
143 // On a call to `wake`, an attempt to transition the state from `WAITING` to
144 // `WAKING` is made. On success, the caller obtains a lock on the waker cell.
145 //
146 // If the lock is obtained, then the thread takes ownership of the current value
147 // in the waker cell, and calls `wake` on it. The state is then transitioned
148 // back to `WAITING`. This transition must succeed as, at this point, the state
149 // cannot be transitioned by another thread.
150 //
// If the thread is unable to obtain the lock, the `WAKING` bit is still set.
// This is because it has either been set by the current thread but the
// previous value included the `REGISTERING` bit **or** a concurrent thread is
// in the `WAKING` critical section. Either way, no further action is needed.
155 //
156 // If the current thread is the only concurrent call to `wake` and another
157 // thread is in the `register` critical section, when the other thread **exits**
158 // the `register` critical section, it will observe the `WAKING` bit and handle
159 // the wake itself.
160 //
161 // If another thread is in the `wake` critical section, then it will handle
162 // waking the task.
163 //
164 // # A potential race (is safely handled).
165 //
166 // Imagine the following situation:
167 //
168 // * Thread A obtains the `wake` lock and wakes a task.
169 //
170 // * Before thread A releases the `wake` lock, the woken task is scheduled.
171 //
172 // * Thread B attempts to wake the task. In theory this should result in the
173 //   task being woken, but it cannot because thread A still holds the wake lock.
174 //
// This case is handled by requiring users of `AtomicWaker` to call `register`
// **before** attempting to observe the application state change that resulted
// in the task being awoken. Callers of `wake` must likewise change the
// application state **before** calling `wake`.
//
// Because of this, the woken task will do one of two things.
//
// 1) Observe the application state change that Thread B is waking it for. In
//    this case, it is OK for Thread B's wake to be lost.
//
// 2) Call `register` before attempting to observe the application state. Since
//    Thread A still holds the `wake` lock, the call to `register` will result
//    in the task waking itself and being scheduled again.
188 
/// Idle state: neither lock bit is set, so nobody owns the waker cell.
const WAITING: usize = 0;

/// Lock bit held by `register` while it stores a new waker in the cell.
const REGISTERING: usize = 1 << 0;

/// Lock bit set by `wake`/`take` while the stored waker is being taken out.
const WAKING: usize = 1 << 1;
197 
198 impl AtomicWaker {
199     /// Create an `AtomicWaker`.
new() -> Self200     pub const fn new() -> Self {
201         // Make sure that task is Sync
202         trait AssertSync: Sync {}
203         impl AssertSync for Waker {}
204 
205         Self {
206             state: AtomicUsize::new(WAITING),
207             waker: UnsafeCell::new(None),
208         }
209     }
210 
211     /// Registers the waker to be notified on calls to `wake`.
212     ///
213     /// The new task will take place of any previous tasks that were registered
214     /// by previous calls to `register`. Any calls to `wake` that happen after
215     /// a call to `register` (as defined by the memory ordering rules), will
216     /// notify the `register` caller's task and deregister the waker from future
217     /// notifications. Because of this, callers should ensure `register` gets
218     /// invoked with a new `Waker` **each** time they require a wakeup.
219     ///
220     /// It is safe to call `register` with multiple other threads concurrently
221     /// calling `wake`. This will result in the `register` caller's current
222     /// task being notified once.
223     ///
224     /// This function is safe to call concurrently, but this is generally a bad
225     /// idea. Concurrent calls to `register` will attempt to register different
226     /// tasks to be notified. One of the callers will win and have its task set,
227     /// but there is no guarantee as to which caller will succeed.
228     ///
229     /// # Examples
230     ///
231     /// Here is how `register` is used when implementing a flag.
232     ///
233     /// ```
234     /// use futures::future::Future;
235     /// use futures::task::{Context, Poll, AtomicWaker};
236     /// use std::sync::atomic::AtomicBool;
237     /// use std::sync::atomic::Ordering::Relaxed;
238     /// use std::pin::Pin;
239     ///
240     /// struct Flag {
241     ///     waker: AtomicWaker,
242     ///     set: AtomicBool,
243     /// }
244     ///
245     /// impl Future for Flag {
246     ///     type Output = ();
247     ///
248     ///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
249     ///         // Register **before** checking `set` to avoid a race condition
250     ///         // that would result in lost notifications.
251     ///         self.waker.register(cx.waker());
252     ///
253     ///         if self.set.load(Relaxed) {
254     ///             Poll::Ready(())
255     ///         } else {
256     ///             Poll::Pending
257     ///         }
258     ///     }
259     /// }
260     /// ```
register(&self, waker: &Waker)261     pub fn register(&self, waker: &Waker) {
262         match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
263             WAITING => {
264                 unsafe {
265                     // Locked acquired, update the waker cell
266                     *self.waker.get() = Some(waker.clone());
267 
268                     // Release the lock. If the state transitioned to include
269                     // the `WAKING` bit, this means that at least one wake has
270                     // been called concurrently.
271                     //
272                     // Start by assuming that the state is `REGISTERING` as this
273                     // is what we just set it to. If this holds, we know that no
274                     // other writes were performed in the meantime, so there is
275                     // nothing to acquire, only release. In case of concurrent
276                     // wakers, we need to acquire their releases, so success needs
277                     // to do both.
278                     let res = self.state.compare_exchange(
279                         REGISTERING, WAITING, AcqRel, Acquire);
280 
281                     match res {
282                         Ok(_) => {
283                             // memory ordering: acquired self.state during CAS
284                             // - if previous wakes went through it syncs with
285                             //   their final release (`fetch_and`)
286                             // - if there was no previous wake the next wake
287                             //   will wake us, no sync needed.
288                         }
289                         Err(actual) => {
290                             // This branch can only be reached if at least one
291                             // concurrent thread called `wake`. In this
292                             // case, `actual` **must** be `REGISTERING |
293                             // `WAKING`.
294                             debug_assert_eq!(actual, REGISTERING | WAKING);
295 
296                             // Take the waker to wake once the atomic operation has
297                             // completed.
298                             let waker = (*self.waker.get()).take().unwrap();
299 
300                             // We need to return to WAITING state (clear our lock and
301                             // concurrent WAKING flag). This needs to acquire all
302                             // WAKING fetch_or releases and it needs to release our
303                             // update to self.waker, so we need a `swap` operation.
304                             self.state.swap(WAITING, AcqRel);
305 
306                             // memory ordering: we acquired the state for all
307                             // concurrent wakes, but future wakes might still
308                             // need to wake us in case we can't make progress
309                             // from the pending wakes.
310                             //
311                             // So we simply schedule to come back later (we could
312                             // also simply leave the registration in place above).
313                             waker.wake();
314                         }
315                     }
316                 }
317             }
318             WAKING => {
319                 // Currently in the process of waking the task, i.e.,
320                 // `wake` is currently being called on the old task handle.
321                 //
322                 // memory ordering: we acquired the state for all
323                 // concurrent wakes, but future wakes might still
324                 // need to wake us in case we can't make progress
325                 // from the pending wakes.
326                 //
327                 // So we simply schedule to come back later (we
328                 // could also spin here trying to acquire the lock
329                 // to register).
330                 waker.wake_by_ref();
331             }
332             state => {
333                 // In this case, a concurrent thread is holding the
334                 // "registering" lock. This probably indicates a bug in the
335                 // caller's code as racing to call `register` doesn't make much
336                 // sense.
337                 //
338                 // memory ordering: don't care. a concurrent register() is going
339                 // to succeed and provide proper memory ordering.
340                 //
341                 // We just want to maintain memory safety. It is ok to drop the
342                 // call to `register`.
343                 debug_assert!(
344                     state == REGISTERING ||
345                     state == REGISTERING | WAKING);
346             }
347         }
348     }
349 
350     /// Calls `wake` on the last `Waker` passed to `register`.
351     ///
352     /// If `register` has not been called yet, then this does nothing.
wake(&self)353     pub fn wake(&self) {
354         if let Some(waker) = self.take() {
355             waker.wake();
356         }
357     }
358 
359     /// Returns the last `Waker` passed to `register`, so that the user can wake it.
360     ///
361     ///
362     /// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user
363     /// to take the waker and then wake it separately, rather than performing both steps in one
364     /// atomic action.
365     ///
366     /// If a waker has not been registered, this returns `None`.
take(&self) -> Option<Waker>367     pub fn take(&self) -> Option<Waker> {
368         // AcqRel ordering is used in order to acquire the value of the `task`
369         // cell as well as to establish a `release` ordering with whatever
370         // memory the `AtomicWaker` is associated with.
371         match self.state.fetch_or(WAKING, AcqRel) {
372             WAITING => {
373                 // The waking lock has been acquired.
374                 let waker = unsafe { (*self.waker.get()).take() };
375 
376                 // Release the lock
377                 self.state.fetch_and(!WAKING, Release);
378 
379                 waker
380             }
381             state => {
382                 // There is a concurrent thread currently updating the
383                 // associated task.
384                 //
385                 // Nothing more to do as the `WAKING` bit has been set. It
386                 // doesn't matter if there are concurrent registering threads or
387                 // not.
388                 //
389                 debug_assert!(
390                     state == REGISTERING ||
391                     state == REGISTERING | WAKING ||
392                     state == WAKING);
393                 None
394             }
395         }
396     }
397 }
398 
399 impl Default for AtomicWaker {
default() -> Self400     fn default() -> Self {
401         Self::new()
402     }
403 }
404 
405 impl fmt::Debug for AtomicWaker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result406     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
407         write!(f, "AtomicWaker")
408     }
409 }
410 
// SAFETY: the `UnsafeCell<Option<Waker>>` is only read or written by a thread
// that has acquired the lock encoded in `state`: either a successful
// `WAITING -> REGISTERING` transition in `register`, or a `fetch_or(WAKING)`
// in `take` that observed `WAITING`. At most one thread therefore touches the
// cell at a time. The cell holds a `Waker`, which is itself `Send + Sync`, so
// moving it to or sharing it with another thread is sound.
unsafe impl Send for AtomicWaker {}
unsafe impl Sync for AtomicWaker {}
413