use core::cell::UnsafeCell;
use core::fmt;
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::{Acquire, Release, AcqRel};
use core::task::Waker;

/// A synchronization primitive for task wakeup.
///
/// Sometimes the task interested in a given event will change over time.
/// An `AtomicWaker` can coordinate concurrent notifications with the consumer
/// potentially "updating" the underlying task to wake up. This is useful in
/// scenarios where a computation completes in another thread and wants to
/// notify the consumer, but the consumer is in the process of being migrated to
/// a new logical task.
///
/// Consumers should call `register` before checking the result of a computation
/// and producers should call `wake` after producing the computation (this
/// differs from the usual `thread::park` pattern). It is also permitted for
/// `wake` to be called **before** `register`. This results in a no-op.
///
/// A single `AtomicWaker` may be reused for any number of calls to `register` or
/// `wake`.
///
/// # Memory ordering
///
/// Calling `register` "acquires" all memory "released" by calls to `wake`
/// before the call to `register`. Later calls to `wake` will wake the
/// registered waker (on contention this wake might be triggered in `register`).
///
/// For concurrent calls to `register` (should be avoided) the ordering is only
/// guaranteed for the winning call.
///
/// # Examples
///
/// Here is a simple example providing a `Flag` that can be signalled manually
/// when it is ready.
///
/// ```
/// use futures::future::Future;
/// use futures::task::{Context, Poll, AtomicWaker};
/// use std::sync::Arc;
/// use std::sync::atomic::AtomicBool;
/// use std::sync::atomic::Ordering::Relaxed;
/// use std::pin::Pin;
///
/// struct Inner {
///     waker: AtomicWaker,
///     set: AtomicBool,
/// }
///
/// #[derive(Clone)]
/// struct Flag(Arc<Inner>);
///
/// impl Flag {
///     pub fn new() -> Self {
///         Flag(Arc::new(Inner {
///             waker: AtomicWaker::new(),
///             set: AtomicBool::new(false),
///         }))
///     }
///
///     pub fn signal(&self) {
///         self.0.set.store(true, Relaxed);
///         self.0.waker.wake();
///     }
/// }
///
/// impl Future for Flag {
///     type Output = ();
///
///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
///         // quick check to avoid registration if already done.
///         if self.0.set.load(Relaxed) {
///             return Poll::Ready(());
///         }
///
///         self.0.waker.register(cx.waker());
///
///         // Need to check condition **after** `register` to avoid a race
///         // condition that would result in lost notifications.
///         if self.0.set.load(Relaxed) {
///             Poll::Ready(())
///         } else {
///             Poll::Pending
///         }
///     }
/// }
/// ```
pub struct AtomicWaker {
    // Coordinates access to the `waker` cell; holds the `WAITING`,
    // `REGISTERING` and `WAKING` bits described in the commentary below.
    state: AtomicUsize,
    // The registered waker. Only accessed while holding the lock encoded
    // in `state`, hence the `UnsafeCell` (see `unsafe impl Sync` below).
    waker: UnsafeCell<Option<Waker>>,
}

// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell
// stores a `Waker` value produced by calls to `register` and many threads can
// race to take the waker (to wake it) by calling `wake`.
//
// If a new `Waker` instance is produced by calling `register` before an
// existing one is consumed, then the existing one is overwritten.
//
// While `AtomicWaker` is single-producer, the implementation ensures memory
// safety.
In the event of concurrent calls to `register`, there will be a 103 // single winner whose waker will get stored in the cell. The losers will not 104 // have their tasks woken. As such, callers should ensure to add synchronization 105 // to calls to `register`. 106 // 107 // The implementation uses a single `AtomicUsize` value to coordinate access to 108 // the `Waker` cell. There are two bits that are operated on independently. 109 // These are represented by `REGISTERING` and `WAKING`. 110 // 111 // The `REGISTERING` bit is set when a producer enters the critical section. The 112 // `WAKING` bit is set when a consumer enters the critical section. Neither bit 113 // being set is represented by `WAITING`. 114 // 115 // A thread obtains an exclusive lock on the waker cell by transitioning the 116 // state from `WAITING` to `REGISTERING` or `WAKING`, depending on the operation 117 // the thread wishes to perform. When this transition is made, it is guaranteed 118 // that no other thread will access the waker cell. 119 // 120 // # Registering 121 // 122 // On a call to `register`, an attempt to transition the state from WAITING to 123 // REGISTERING is made. On success, the caller obtains a lock on the waker cell. 124 // 125 // If the lock is obtained, then the thread sets the waker cell to the waker 126 // provided as an argument. Then it attempts to transition the state back from 127 // `REGISTERING` -> `WAITING`. 128 // 129 // If this transition is successful, then the registering process is complete 130 // and the next call to `wake` will observe the waker. 131 // 132 // If the transition fails, then there was a concurrent call to `wake` that was 133 // unable to access the waker cell (due to the registering thread holding the 134 // lock). To handle this, the registering thread removes the waker it just set 135 // from the cell and calls `wake` on it. This call to wake represents the 136 // attempt to wake by the other thread (that set the `WAKING` bit). 
The state is 137 // then transitioned from `REGISTERING | WAKING` back to `WAITING`. This 138 // transition must succeed because, at this point, the state cannot be 139 // transitioned by another thread. 140 // 141 // # Waking 142 // 143 // On a call to `wake`, an attempt to transition the state from `WAITING` to 144 // `WAKING` is made. On success, the caller obtains a lock on the waker cell. 145 // 146 // If the lock is obtained, then the thread takes ownership of the current value 147 // in the waker cell, and calls `wake` on it. The state is then transitioned 148 // back to `WAITING`. This transition must succeed as, at this point, the state 149 // cannot be transitioned by another thread. 150 // 151 // If the thread is unable to obtain the lock, the `WAKING` bit is still. This 152 // is because it has either been set by the current thread but the previous 153 // value included the `REGISTERING` bit **or** a concurrent thread is in the 154 // `WAKING` critical section. Either way, no action must be taken. 155 // 156 // If the current thread is the only concurrent call to `wake` and another 157 // thread is in the `register` critical section, when the other thread **exits** 158 // the `register` critical section, it will observe the `WAKING` bit and handle 159 // the wake itself. 160 // 161 // If another thread is in the `wake` critical section, then it will handle 162 // waking the task. 163 // 164 // # A potential race (is safely handled). 165 // 166 // Imagine the following situation: 167 // 168 // * Thread A obtains the `wake` lock and wakes a task. 169 // 170 // * Before thread A releases the `wake` lock, the woken task is scheduled. 171 // 172 // * Thread B attempts to wake the task. In theory this should result in the 173 // task being woken, but it cannot because thread A still holds the wake lock. 
174 // 175 // This case is handled by requiring users of `AtomicWaker` to call `register` 176 // **before** attempting to observe the application state change that resulted 177 // in the task being awoken. The wakers also change the application state before 178 // calling wake. 179 // 180 // Because of this, the waker will do one of two things. 181 // 182 // 1) Observe the application state change that Thread B is woken for. In this 183 // case, it is OK for Thread B's wake to be lost. 184 // 185 // 2) Call register before attempting to observe the application state. Since 186 // Thread A still holds the `wake` lock, the call to `register` will result 187 // in the task waking itself and get scheduled again. 188 189 /// Idle state 190 const WAITING: usize = 0; 191 192 /// A new waker value is being registered with the `AtomicWaker` cell. 193 const REGISTERING: usize = 0b01; 194 195 /// The waker currently registered with the `AtomicWaker` cell is being woken. 196 const WAKING: usize = 0b10; 197 198 impl AtomicWaker { 199 /// Create an `AtomicWaker`. new() -> Self200 pub const fn new() -> Self { 201 // Make sure that task is Sync 202 trait AssertSync: Sync {} 203 impl AssertSync for Waker {} 204 205 AtomicWaker { 206 state: AtomicUsize::new(WAITING), 207 waker: UnsafeCell::new(None), 208 } 209 } 210 211 /// Registers the waker to be notified on calls to `wake`. 212 /// 213 /// The new task will take place of any previous tasks that were registered 214 /// by previous calls to `register`. Any calls to `wake` that happen after 215 /// a call to `register` (as defined by the memory ordering rules), will 216 /// notify the `register` caller's task and deregister the waker from future 217 /// notifications. Because of this, callers should ensure `register` gets 218 /// invoked with a new `Waker` **each** time they require a wakeup. 219 /// 220 /// It is safe to call `register` with multiple other threads concurrently 221 /// calling `wake`. 
This will result in the `register` caller's current 222 /// task being notified once. 223 /// 224 /// This function is safe to call concurrently, but this is generally a bad 225 /// idea. Concurrent calls to `register` will attempt to register different 226 /// tasks to be notified. One of the callers will win and have its task set, 227 /// but there is no guarantee as to which caller will succeed. 228 /// 229 /// # Examples 230 /// 231 /// Here is how `register` is used when implementing a flag. 232 /// 233 /// ``` 234 /// use futures::future::Future; 235 /// use futures::task::{Context, Poll, AtomicWaker}; 236 /// use std::sync::atomic::AtomicBool; 237 /// use std::sync::atomic::Ordering::Relaxed; 238 /// use std::pin::Pin; 239 /// 240 /// struct Flag { 241 /// waker: AtomicWaker, 242 /// set: AtomicBool, 243 /// } 244 /// 245 /// impl Future for Flag { 246 /// type Output = (); 247 /// 248 /// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { 249 /// // Register **before** checking `set` to avoid a race condition 250 /// // that would result in lost notifications. 251 /// self.waker.register(cx.waker()); 252 /// 253 /// if self.set.load(Relaxed) { 254 /// Poll::Ready(()) 255 /// } else { 256 /// Poll::Pending 257 /// } 258 /// } 259 /// } 260 /// ``` register(&self, waker: &Waker)261 pub fn register(&self, waker: &Waker) { 262 match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { 263 WAITING => { 264 unsafe { 265 // Locked acquired, update the waker cell 266 *self.waker.get() = Some(waker.clone()); 267 268 // Release the lock. If the state transitioned to include 269 // the `WAKING` bit, this means that at least one wake has 270 // been called concurrently. 271 // 272 // Start by assuming that the state is `REGISTERING` as this 273 // is what we just set it to. If this holds, we know that no 274 // other writes were performed in the meantime, so there is 275 // nothing to acquire, only release. 
In case of concurrent 276 // wakers, we need to acquire their releases, so success needs 277 // to do both. 278 let res = self.state.compare_exchange( 279 REGISTERING, WAITING, AcqRel, Acquire); 280 281 match res { 282 Ok(_) => { 283 // memory ordering: acquired self.state during CAS 284 // - if previous wakes went through it syncs with 285 // their final release (`fetch_and`) 286 // - if there was no previous wake the next wake 287 // will wake us, no sync needed. 288 } 289 Err(actual) => { 290 // This branch can only be reached if at least one 291 // concurrent thread called `wake`. In this 292 // case, `actual` **must** be `REGISTERING | 293 // `WAKING`. 294 debug_assert_eq!(actual, REGISTERING | WAKING); 295 296 // Take the waker to wake once the atomic operation has 297 // completed. 298 let waker = (*self.waker.get()).take().unwrap(); 299 300 // We need to return to WAITING state (clear our lock and 301 // concurrent WAKING flag). This needs to acquire all 302 // WAKING fetch_or releases and it needs to release our 303 // update to self.waker, so we need a `swap` operation. 304 self.state.swap(WAITING, AcqRel); 305 306 // memory ordering: we acquired the state for all 307 // concurrent wakes, but future wakes might still 308 // need to wake us in case we can't make progress 309 // from the pending wakes. 310 // 311 // So we simply schedule to come back later (we could 312 // also simply leave the registration in place above). 313 waker.wake(); 314 } 315 } 316 } 317 } 318 WAKING => { 319 // Currently in the process of waking the task, i.e., 320 // `wake` is currently being called on the old task handle. 321 // 322 // memory ordering: we acquired the state for all 323 // concurrent wakes, but future wakes might still 324 // need to wake us in case we can't make progress 325 // from the pending wakes. 326 // 327 // So we simply schedule to come back later (we 328 // could also spin here trying to acquire the lock 329 // to register). 
330 waker.wake_by_ref(); 331 } 332 state => { 333 // In this case, a concurrent thread is holding the 334 // "registering" lock. This probably indicates a bug in the 335 // caller's code as racing to call `register` doesn't make much 336 // sense. 337 // 338 // memory ordering: don't care. a concurrent register() is going 339 // to succeed and provide proper memory ordering. 340 // 341 // We just want to maintain memory safety. It is ok to drop the 342 // call to `register`. 343 debug_assert!( 344 state == REGISTERING || 345 state == REGISTERING | WAKING); 346 } 347 } 348 } 349 350 /// Calls `wake` on the last `Waker` passed to `register`. 351 /// 352 /// If `register` has not been called yet, then this does nothing. wake(&self)353 pub fn wake(&self) { 354 if let Some(waker) = self.take() { 355 waker.wake(); 356 } 357 } 358 359 /// Returns the last `Waker` passed to `register`, so that the user can wake it. 360 /// 361 /// 362 /// Sometimes, just waking the AtomicWaker is not fine grained enough. This allows the user 363 /// to take the waker and then wake it separately, rather than performing both steps in one 364 /// atomic action. 365 /// 366 /// If a waker has not been registered, this returns `None`. take(&self) -> Option<Waker>367 pub fn take(&self) -> Option<Waker> { 368 // AcqRel ordering is used in order to acquire the value of the `task` 369 // cell as well as to establish a `release` ordering with whatever 370 // memory the `AtomicWaker` is associated with. 371 match self.state.fetch_or(WAKING, AcqRel) { 372 WAITING => { 373 // The waking lock has been acquired. 374 let waker = unsafe { (*self.waker.get()).take() }; 375 376 // Release the lock 377 self.state.fetch_and(!WAKING, Release); 378 379 waker 380 } 381 state => { 382 // There is a concurrent thread currently updating the 383 // associated task. 384 // 385 // Nothing more to do as the `WAKING` bit has been set. It 386 // doesn't matter if there are concurrent registering threads or 387 // not. 
388 // 389 debug_assert!( 390 state == REGISTERING || 391 state == REGISTERING | WAKING || 392 state == WAKING); 393 None 394 } 395 } 396 } 397 } 398 399 impl Default for AtomicWaker { default() -> Self400 fn default() -> Self { 401 AtomicWaker::new() 402 } 403 } 404 405 impl fmt::Debug for AtomicWaker { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result406 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 407 write!(f, "AtomicWaker") 408 } 409 } 410 411 unsafe impl Send for AtomicWaker {} 412 unsafe impl Sync for AtomicWaker {} 413