1 use crate::loom::sync::atomic::AtomicUsize;
2 
3 use std::fmt;
4 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5 use std::usize;
6 
/// Packed task state: the lifecycle flag bits (`RUNNING`..`CANCELLED`) live in
/// the low bits of a single atomic word, and the waker reference count lives
/// in the remaining high bits (see `WAKER_COUNT_MASK`).
pub(super) struct State {
    // Single word holding flags + waker ref count; all transitions are RMW ops.
    val: AtomicUsize,
}
10 
/// Current state value
///
/// An immutable copy of the packed state word taken at one point in time;
/// inspected via the `is_*` / `has_*` predicate methods.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
14 
/// The task is currently being run.
const RUNNING: usize = 0b00_0001;

/// The task has been notified by a waker.
const NOTIFIED: usize = 0b00_0010;

/// The task is complete.
///
/// Once this bit is set, it is never unset
const COMPLETE: usize = 0b00_0100;

/// The primary task handle has been dropped.
const RELEASED: usize = 0b00_1000;

/// The join handle is still around
const JOIN_INTEREST: usize = 0b01_0000;

/// A join handle waker has been set
const JOIN_WAKER: usize = 0b10_0000;

/// The task has been forcibly canceled.
const CANCELLED: usize = 0b100_0000;

/// All lifecycle flag bits; every bit above these belongs to the waker
/// reference count.
const LIFECYCLE_MASK: usize =
    RUNNING | NOTIFIED | COMPLETE | RELEASED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;

/// Bits used by the waker ref count portion of the state.
///
/// Ref counts only cover **wakers**. Other handles are tracked with other state
/// bits.
const WAKER_COUNT_MASK: usize = usize::MAX - LIFECYCLE_MASK;

/// Number of positions to shift the ref count — i.e. the number of low bits
/// consumed by the lifecycle flags (count of zero bits in the count mask).
const WAKER_COUNT_SHIFT: usize = WAKER_COUNT_MASK.count_zeros() as usize;

/// One ref count, expressed in the packed representation.
const WAKER_ONE: usize = 1 << WAKER_COUNT_SHIFT;

/// Initial state: the task starts "notified" so the first poll gets scheduled.
const INITIAL_STATE: usize = NOTIFIED;
56 
/// All transitions are performed via RMW operations. This establishes an
/// unambiguous modification order.
impl State {
    /// Starts with a ref count of 2
    ///
    /// NOTE(review): the waker ref-count bits start at zero here; the "2"
    /// presumably refers to the task handle and join handle, which are
    /// tracked via the `RELEASED` / `JOIN_INTEREST` bits — confirm against
    /// callers.
    pub(super) fn new_joinable() -> State {
        State {
            val: AtomicUsize::new(INITIAL_STATE | JOIN_INTEREST),
        }
    }

    /// Loads the current state, establishes `Acquire` ordering.
    pub(super) fn load(&self) -> Snapshot {
        Snapshot(self.val.load(Acquire))
    }

    /// Transitions a task to the `Running` state.
    ///
    /// Returns a snapshot of the state **after** the transition.
    pub(super) fn transition_to_running(&self) -> Snapshot {
        // One XOR sets `RUNNING` and clears `NOTIFIED`. This is only valid
        // because the caller guarantees `NOTIFIED` is set (asserted below).
        const DELTA: usize = RUNNING | NOTIFIED;

        // NOTE(review): this RMW uses `Acquire` only, while the analogous
        // `transition_to_idle` uses `AcqRel` — confirm a `Release` half is
        // intentionally unnecessary here.
        let prev = Snapshot(self.val.fetch_xor(DELTA, Acquire));
        debug_assert!(prev.is_notified());

        if prev.is_running() {
            // `RUNNING` already set while we were merely notified: a canceler
            // toggled it on (see `transition_to_canceled_from_list`).
            //
            // We were signalled to cancel
            //
            // Apply the state
            let prev = self.val.fetch_or(CANCELLED, AcqRel);
            return Snapshot(prev | CANCELLED);
        }

        debug_assert!(!prev.is_running());

        // Recompute the value the XOR above stored, for the return snapshot.
        let next = Snapshot(prev.0 ^ DELTA);

        debug_assert!(next.is_running());
        debug_assert!(!next.is_notified());

        next
    }

    /// Transitions the task from `Running` -> `Idle`.
    ///
    /// Returns a snapshot of the state **after** the transition.
    pub(super) fn transition_to_idle(&self) -> Snapshot {
        const DELTA: usize = RUNNING;

        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));

        if !prev.is_running() {
            // `RUNNING` was already cleared mid-poll: that is the cancel
            // signal from `transition_to_canceled_from_list`.
            //
            // We were signaled to cancel.
            //
            // Apply the state
            let prev = self.val.fetch_or(CANCELLED, AcqRel);
            return Snapshot(prev | CANCELLED);
        }

        let next = Snapshot(prev.0 ^ DELTA);

        debug_assert!(!next.is_running());

        next
    }

    /// Transitions the task from `Running` -> `Complete`.
    ///
    /// Returns a snapshot of the state **after** the transition.
    pub(super) fn transition_to_complete(&self) -> Snapshot {
        // Clears `RUNNING` and sets `COMPLETE` in a single RMW.
        const DELTA: usize = RUNNING | COMPLETE;

        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));

        debug_assert!(!prev.is_complete());

        let next = Snapshot(prev.0 ^ DELTA);

        debug_assert!(next.is_complete());

        next
    }

    /// Transitions the task from `Running` -> `Released`.
    ///
    /// Returns a snapshot of the state **after** the transition.
    pub(super) fn transition_to_released(&self) -> Snapshot {
        // Clears `RUNNING` and sets `COMPLETE` + `RELEASED` in one RMW.
        const DELTA: usize = RUNNING | COMPLETE | RELEASED;

        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));

        debug_assert!(prev.is_running());
        debug_assert!(!prev.is_complete());
        debug_assert!(!prev.is_released());

        let next = Snapshot(prev.0 ^ DELTA);

        debug_assert!(!next.is_running());
        debug_assert!(next.is_complete());
        debug_assert!(next.is_released());

        next
    }

    /// Transitions the task to the canceled state.
    ///
    /// Returns the snapshot of the state **after** the transition **if** the
    /// transition was made successfully
    ///
    /// # States
    ///
    /// - Notified: task may be in a queue, caller must not release.
    /// - Running: cannot drop. The poll handle will handle releasing.
    /// - Other prior states do not require cancellation.
    ///
    /// If the task has been notified, then it may still be in a queue. The
    /// caller must not release the task.
    pub(super) fn transition_to_canceled_from_queue(&self) -> Snapshot {
        let prev = Snapshot(self.val.fetch_or(CANCELLED, AcqRel));

        debug_assert!(!prev.is_complete());
        debug_assert!(!prev.is_running() || prev.is_notified());

        Snapshot(prev.0 | CANCELLED)
    }

    /// Attempts to cancel the task from the owned-task list.
    ///
    /// Returns `Some(next)` when this call set the `CANCELLED` bit directly;
    /// `None` when the task is already inactive, or when cancellation was
    /// delegated by toggling the `RUNNING` flag (in which case the poll path
    /// applies `CANCELLED` itself — see `transition_to_running` /
    /// `transition_to_idle`).
    pub(super) fn transition_to_canceled_from_list(&self) -> Option<Snapshot> {
        let mut prev = self.load();

        loop {
            if !prev.is_active() {
                // Already complete or canceled: nothing to do.
                return None;
            }

            let mut next = prev;

            // Use the running flag to signal cancellation
            if prev.is_running() {
                // Clearing `RUNNING` mid-poll makes `transition_to_idle`
                // observe the signal.
                next.0 -= RUNNING;
                next.0 |= NOTIFIED;
            } else if prev.is_notified() {
                // Task is queued: setting `RUNNING` makes the eventual
                // `transition_to_running` observe the signal instead.
                next.0 += RUNNING;
                next.0 |= NOTIFIED;
            } else {
                // Idle and not queued: cancel directly.
                next.0 |= CANCELLED;
            }

            let res = self.val.compare_exchange(prev.0, next.0, AcqRel, Acquire);

            match res {
                // Only report success when *this* CAS set `CANCELLED`;
                // delegated signals return `None`.
                Ok(_) if next.is_canceled() => return Some(next),
                Ok(_) => return None,
                Err(actual) => {
                    prev = Snapshot(actual);
                }
            }
        }
    }

    /// Transitions to `Released`. Called when primary task handle is
    /// dropped. This is roughly a "ref decrement" operation.
    ///
    /// Returns a snapshot of the state **after** the transition.
    pub(super) fn release_task(&self) -> Snapshot {
        use crate::loom::sync::atomic;

        const DELTA: usize = RELEASED;

        let prev = Snapshot(self.val.fetch_or(DELTA, Release));

        debug_assert!(!prev.is_released());
        debug_assert!(prev.is_terminal(), "state = {:?}", prev);

        let next = Snapshot(prev.0 | DELTA);

        debug_assert!(next.is_released());

        if next.is_final_ref() || (next.has_join_waker() && !next.is_join_interested()) {
            // The final reference to the task was dropped, the caller must free the
            // memory. Establish an acquire ordering.
            atomic::fence(Acquire);
        }

        next
    }

    /// Transitions the state to `Scheduled`.
    ///
    /// Returns `true` if the task needs to be submitted to the pool for
    /// execution
    pub(super) fn transition_to_notified(&self) -> bool {
        // If any of these bits were already set, the task is either queued,
        // currently being polled, or done — no submission needed.
        const MASK: usize = RUNNING | NOTIFIED | COMPLETE | CANCELLED;

        let prev = self.val.fetch_or(NOTIFIED, Release);
        prev & MASK == 0
    }

    /// Optimistically tries to swap the state assuming the join handle is
    /// __immediately__ dropped on spawn
    pub(super) fn drop_join_handle_fast(&self) -> bool {
        use std::sync::atomic::Ordering::Relaxed;

        // Relaxed is acceptable as if this function is called and succeeds,
        // then nothing has been done w/ the join handle.
        //
        // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
        // set, at which point the CAS will fail.
        //
        // Given this, there is no risk if this operation is reordered.
        self.val
            .compare_exchange_weak(
                INITIAL_STATE | JOIN_INTEREST,
                INITIAL_STATE,
                Release,
                Relaxed,
            )
            .is_ok()
    }

    /// The join handle has completed by reading the output.
    ///
    /// Returns a snapshot of the state **after** the transition.
    pub(super) fn complete_join_handle(&self) -> Snapshot {
        use crate::loom::sync::atomic;

        const DELTA: usize = JOIN_INTEREST;

        let prev = Snapshot(self.val.fetch_sub(DELTA, Release));

        debug_assert!(prev.is_join_interested());

        let next = Snapshot(prev.0 - DELTA);

        if !next.is_final_ref() {
            return next;
        }

        // This was the final reference: establish an acquire ordering
        // (mirrors `release_task` / `ref_dec`).
        atomic::fence(Acquire);

        next
    }

    /// The join handle is being dropped, this fails if the task has been
    /// completed and the output must be dropped first then
    /// `complete_join_handle` should be called.
    ///
    /// Returns a snapshot of the state **after** the transition.
    pub(super) fn drop_join_handle_slow(&self) -> Result<Snapshot, Snapshot> {
        const MASK: usize = COMPLETE | CANCELLED;

        let mut prev = self.val.load(Acquire);

        loop {
            // Once the complete bit is set, it is never unset.
            if prev & MASK != 0 {
                return Err(Snapshot(prev));
            }

            debug_assert!(prev & JOIN_INTEREST == JOIN_INTEREST);

            // Drop join interest and clear any stored waker flag together.
            let next = (prev - JOIN_INTEREST) & !JOIN_WAKER;

            let res = self.val.compare_exchange(prev, next, AcqRel, Acquire);

            match res {
                Ok(_) => {
                    return Ok(Snapshot(next));
                }
                Err(actual) => {
                    prev = actual;
                }
            }
        }
    }

    /// Stores the join waker.
    ///
    /// Returns a snapshot of the state **after** the transition.
    pub(super) fn store_join_waker(&self) -> Snapshot {
        use crate::loom::sync::atomic;

        const DELTA: usize = JOIN_WAKER;

        let prev = Snapshot(self.val.fetch_xor(DELTA, Release));

        debug_assert!(!prev.has_join_waker());

        let next = Snapshot(prev.0 ^ DELTA);

        debug_assert!(next.has_join_waker());

        if next.is_complete() {
            // The task completed concurrently; establish an acquire ordering
            // before the caller acts on the returned snapshot.
            atomic::fence(Acquire);
        }

        next
    }

    /// Clears the `JOIN_WAKER` flag, unless the task has already completed
    /// or been canceled (in which case the state is returned unchanged).
    pub(super) fn unset_waker(&self) -> Snapshot {
        const MASK: usize = COMPLETE | CANCELLED;

        let mut prev = self.val.load(Acquire);

        loop {
            // Once the `COMPLETE` bit is set, it is never unset
            if prev & MASK != 0 {
                return Snapshot(prev);
            }

            debug_assert!(Snapshot(prev).has_join_waker());

            let next = prev - JOIN_WAKER;

            let res = self.val.compare_exchange(prev, next, AcqRel, Acquire);

            match res {
                Ok(_) => return Snapshot(next),
                Err(actual) => {
                    prev = actual;
                }
            }
        }
    }

    /// Increments the waker reference count; aborts the process if the count
    /// overflows into the flag bits.
    pub(super) fn ref_inc(&self) {
        use std::process;
        use std::sync::atomic::Ordering::Relaxed;

        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], Increasing the
        // reference counter can always be done with memory_order_relaxed: New
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        let prev = self.val.fetch_add(WAKER_ONE, Relaxed);

        // If the reference count overflowed, abort.
        if prev > isize::max_value() as usize {
            process::abort();
        }
    }

    /// Returns `true` if the task should be released.
    pub(super) fn ref_dec(&self) -> bool {
        use crate::loom::sync::atomic;

        let prev = self.val.fetch_sub(WAKER_ONE, Release);
        let next = Snapshot(prev - WAKER_ONE);

        if next.is_final_ref() {
            // Last reference dropped: establish an acquire ordering before
            // the caller frees the task.
            atomic::fence(Acquire);
        }

        next.is_final_ref()
    }
}
415 
416 impl Snapshot {
is_running(self) -> bool417     pub(super) fn is_running(self) -> bool {
418         self.0 & RUNNING == RUNNING
419     }
420 
is_notified(self) -> bool421     pub(super) fn is_notified(self) -> bool {
422         self.0 & NOTIFIED == NOTIFIED
423     }
424 
is_released(self) -> bool425     pub(super) fn is_released(self) -> bool {
426         self.0 & RELEASED == RELEASED
427     }
428 
is_complete(self) -> bool429     pub(super) fn is_complete(self) -> bool {
430         self.0 & COMPLETE == COMPLETE
431     }
432 
is_canceled(self) -> bool433     pub(super) fn is_canceled(self) -> bool {
434         self.0 & CANCELLED == CANCELLED
435     }
436 
437     /// Used during normal runtime.
is_active(self) -> bool438     pub(super) fn is_active(self) -> bool {
439         self.0 & (COMPLETE | CANCELLED) == 0
440     }
441 
442     /// Used before dropping the task
is_terminal(self) -> bool443     pub(super) fn is_terminal(self) -> bool {
444         // When both the notified & running flags are set, the task was canceled
445         // after being notified, before it was run.
446         //
447         // There is a race where:
448         // - The task state transitions to notified
449         // - The global queue is shutdown
450         // - The waker attempts to push into the global queue and fails.
451         // - The waker holds the last reference to the task, thus drops it.
452         //
453         // In this scenario, the cancelled bit will never get set.
454         !self.is_active() || (self.is_notified() && self.is_running())
455     }
456 
is_join_interested(self) -> bool457     pub(super) fn is_join_interested(self) -> bool {
458         self.0 & JOIN_INTEREST == JOIN_INTEREST
459     }
460 
has_join_waker(self) -> bool461     pub(super) fn has_join_waker(self) -> bool {
462         self.0 & JOIN_WAKER == JOIN_WAKER
463     }
464 
is_final_ref(self) -> bool465     pub(super) fn is_final_ref(self) -> bool {
466         const MASK: usize = WAKER_COUNT_MASK | RELEASED | JOIN_INTEREST;
467 
468         (self.0 & MASK) == RELEASED
469     }
470 }
471 
472 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result473     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
474         use std::sync::atomic::Ordering::SeqCst;
475 
476         let snapshot = Snapshot(self.val.load(SeqCst));
477 
478         fmt.debug_struct("State")
479             .field("snapshot", &snapshot)
480             .finish()
481     }
482 }
483 
484 impl fmt::Debug for Snapshot {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result485     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
486         fmt.debug_struct("Snapshot")
487             .field("is_running", &self.is_running())
488             .field("is_notified", &self.is_notified())
489             .field("is_released", &self.is_released())
490             .field("is_complete", &self.is_complete())
491             .field("is_canceled", &self.is_canceled())
492             .field("is_join_interested", &self.is_join_interested())
493             .field("has_join_waker", &self.has_join_waker())
494             .field("is_final_ref", &self.is_final_ref())
495             .finish()
496     }
497 }
498