use crate::loom::sync::atomic::AtomicUsize;

use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::usize;

pub(super) struct State {
    val: AtomicUsize,
}

/// Current state value
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);

type UpdateResult = Result<Snapshot, Snapshot>;

/// The task is currently being run.
const RUNNING: usize = 0b0001;

/// The task is complete.
///
/// Once this bit is set, it is never unset
const COMPLETE: usize = 0b0010;

/// Extracts the task's lifecycle value from the state
const LIFECYCLE_MASK: usize = 0b11;

/// Flag tracking if the task has been pushed into a run queue.
const NOTIFIED: usize = 0b100;

/// The join handle is still around
const JOIN_INTEREST: usize = 0b1_000;

/// A join handle waker has been set
const JOIN_WAKER: usize = 0b10_000;

/// The task has been forcibly cancelled.
const CANCELLED: usize = 0b100_000;

/// All bits
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;

/// Bits used by the ref count portion of the state.
const REF_COUNT_MASK: usize = !STATE_MASK;

/// Number of positions to shift the ref count
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;

/// One ref count
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;

/// State a task is initialized with
///
/// A task is initialized with two references: one for the scheduler and one for
/// the `JoinHandle`. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is
/// set. A new task is immediately pushed into the run queue for execution and
/// starts with the `NOTIFIED` flag set.
const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED;
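
// For reference, the bit layout implied by the constants above (low bits
// first):
//
//   bit 0:         RUNNING
//   bit 1:         COMPLETE
//   bit 2:         NOTIFIED
//   bit 3:         JOIN_INTEREST
//   bit 4:         JOIN_WAKER
//   bit 5:         CANCELLED
//   bits 6 and up: reference count
//
// Since `STATE_MASK` occupies the low six bits, `REF_COUNT_SHIFT` is 6,
// `REF_ONE` is 64, and `INITIAL_STATE` works out to
// 2 * 64 + 8 + 4 = 0b10_001100.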

/// All transitions are performed via RMW operations. This establishes an
/// unambiguous modification order.
impl State {
    /// Return a task's initial state
    pub(super) fn new() -> State {
        // A task is initialized with two references: one for the scheduler
        // and one for the `JoinHandle` (see `INITIAL_STATE`). As the task
        // starts with a `JoinHandle`, `JOIN_INTEREST` is set. A new task is
        // immediately pushed into the run queue for execution and starts with
        // the `NOTIFIED` flag set.
        State {
            val: AtomicUsize::new(INITIAL_STATE),
        }
    }

    /// Loads the current state, establishes `Acquire` ordering.
    pub(super) fn load(&self) -> Snapshot {
        Snapshot(self.val.load(Acquire))
    }

    /// Attempt to transition the lifecycle to `Running`.
    ///
    /// If `ref_inc` is set, the reference count is also incremented.
    ///
    /// The `NOTIFIED` bit is always unset.
    pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_notified());

            let mut next = curr;

            if !next.is_idle() {
                return None;
            }

            if ref_inc {
                next.ref_inc();
            }

            next.set_running();
            next.unset_notified();
            Some(next)
        })
    }

    /// Transitions the task from `Running` -> `Idle`.
    ///
    /// Returns `Ok` if the transition to `Idle` is successful, `Err` otherwise.
    /// On success, the returned snapshot reflects the state after the
    /// transition; on failure, it reflects the current (unchanged) state.
    ///
    /// The transition to `Idle` fails if the task has been flagged to be
    /// cancelled.
    pub(super) fn transition_to_idle(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_running());

            if curr.is_cancelled() {
                return None;
            }

            let mut next = curr;
            next.unset_running();

            if next.is_notified() {
                // The caller needs to schedule the task. To do this, it needs a
                // waker. The waker requires a ref count.
                next.ref_inc();
            }

            Some(next)
        })
    }

    /// Transitions the task from `Running` -> `Complete`.
    pub(super) fn transition_to_complete(&self) -> Snapshot {
        const DELTA: usize = RUNNING | COMPLETE;

        // `RUNNING` is known to be set and `COMPLETE` unset (verified by the
        // asserts below), so a single atomic XOR clears `RUNNING` and sets
        // `COMPLETE` in one operation.
        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
        assert!(prev.is_running());
        assert!(!prev.is_complete());

        Snapshot(prev.0 ^ DELTA)
    }

    /// Transition from `Complete` -> `Terminal`, decrementing the reference
    /// count by 1.
    ///
    /// If `complete` is set, the `COMPLETE` bit is also set as part of the
    /// same update; otherwise the task must already be complete.
    ///
    /// When `ref_dec` is set, an additional ref count decrement is performed.
    /// This is used to batch atomic ops when possible.
    pub(super) fn transition_to_terminal(&self, complete: bool, ref_dec: bool) -> Snapshot {
        self.fetch_update(|mut snapshot| {
            if complete {
                snapshot.set_complete();
            } else {
                assert!(snapshot.is_complete());
            }

            // Decrement the primary handle
            snapshot.ref_dec();

            if ref_dec {
                // Decrement a second time
                snapshot.ref_dec();
            }

            Some(snapshot)
        })
        .unwrap()
    }

    /// Transitions the state to `NOTIFIED`.
    ///
    /// Returns `true` if the task needs to be submitted to the pool for
    /// execution
    pub(super) fn transition_to_notified(&self) -> bool {
        let prev = Snapshot(self.val.fetch_or(NOTIFIED, AcqRel));
        prev.will_need_queueing()
    }
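
    // A rough usage sketch for the transition above, with `state` being the
    // task's `State` and `schedule` a hypothetical scheduler hook (neither
    // `task` nor `schedule` is defined in this module):
    //
    //     if state.transition_to_notified() {
    //         // The task was idle and not previously notified, so the caller
    //         // is now responsible for pushing it onto the run queue.
    //         schedule(task);
    //     }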

    /// Set the `CANCELLED` bit and attempt to transition to `Running`.
    ///
    /// Returns `true` if the transition to `Running` succeeded.
    pub(super) fn transition_to_shutdown(&self) -> bool {
        let mut prev = Snapshot(0);

        let _ = self.fetch_update(|mut snapshot| {
            prev = snapshot;

            if snapshot.is_idle() {
                snapshot.set_running();

                if snapshot.is_notified() {
                    // If the task is idle and notified, this indicates the task is
                    // in the run queue and is considered owned by the scheduler.
                    // The shutdown operation claims ownership of the task, which
                    // means we need to assign an additional ref-count to the task
                    // in the queue.
                    snapshot.ref_inc();
                }
            }

            snapshot.set_cancelled();
            Some(snapshot)
        });

        prev.is_idle()
    }

    /// Optimistically tries to swap the state assuming the join handle is
    /// __immediately__ dropped on spawn
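    ///
    /// On success, the state goes from `INITIAL_STATE` to a value with a
    /// single reference remaining (the scheduler's), `NOTIFIED` still set,
    /// and `JOIN_INTEREST` cleared.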
    pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
        use std::sync::atomic::Ordering::Relaxed;

        // Relaxed is acceptable because, if this function is called and
        // succeeds, then nothing has been done with the join handle yet.
        //
        // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
        // set, at which point the CAS will fail.
        //
        // Given this, there is no risk if this operation is reordered.
        self.val
            .compare_exchange_weak(
                INITIAL_STATE,
                (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
                Release,
                Relaxed,
            )
            .map(|_| ())
            .map_err(|_| ())
    }

    /// Try to unset the `JOIN_INTEREST` flag.
    ///
    /// Returns `Ok` if the operation happens before the task transitions to a
    /// completed state, `Err` otherwise.
    pub(super) fn unset_join_interested(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_interested();

            Some(next)
        })
    }

    /// Set the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
    /// the task has completed.
    pub(super) fn set_join_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(!curr.has_join_waker());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.set_join_waker();

            Some(next)
        })
    }

    /// Unsets the `JOIN_WAKER` bit.
    ///
    /// Returns `Ok` if the bit has been unset, `Err` otherwise. This operation
    /// fails if the task has completed.
    pub(super) fn unset_waker(&self) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_join_interested());
            assert!(curr.has_join_waker());

            if curr.is_complete() {
                return None;
            }

            let mut next = curr;
            next.unset_join_waker();

            Some(next)
        })
    }

    pub(super) fn ref_inc(&self) {
        use std::process;
        use std::sync::atomic::Ordering::Relaxed;

        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1]:
        //
        // > Increasing the reference counter can always be done with
        // > memory_order_relaxed: New references to an object can only be
        // > formed from an existing reference, and passing an existing
        // > reference from one thread to another must already provide any
        // > required synchronization.
        //
        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
        let prev = self.val.fetch_add(REF_ONE, Relaxed);

        // If the reference count overflowed, abort.
        if prev > isize::max_value() as usize {
            process::abort();
        }
    }

    /// Returns `true` if the task should be released.
    pub(super) fn ref_dec(&self) -> bool {
        let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
        prev.ref_count() == 1
    }

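    /// Applies `f` to the current state in a CAS loop, retrying on
    /// contention.
    ///
    /// This is essentially the same loop as `AtomicUsize::fetch_update` in
    /// `std`, but it wraps the values in `Snapshot` and fixes the orderings
    /// to `AcqRel` on success and `Acquire` on failure.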
    fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
    where
        F: FnMut(Snapshot) -> Option<Snapshot>,
    {
        let mut curr = self.load();

        loop {
            let next = match f(curr) {
                Some(next) => next,
                None => return Err(curr),
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return Ok(next),
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }
}

// ===== impl Snapshot =====

impl Snapshot {
    /// Returns `true` if the task is in an idle state.
    pub(super) fn is_idle(self) -> bool {
        self.0 & (RUNNING | COMPLETE) == 0
    }

    /// Returns `true` if the task has been flagged as notified.
    pub(super) fn is_notified(self) -> bool {
        self.0 & NOTIFIED == NOTIFIED
    }

    fn unset_notified(&mut self) {
        self.0 &= !NOTIFIED
    }

    pub(super) fn is_running(self) -> bool {
        self.0 & RUNNING == RUNNING
    }

    fn set_running(&mut self) {
        self.0 |= RUNNING;
    }

    fn unset_running(&mut self) {
        self.0 &= !RUNNING;
    }

    pub(super) fn is_cancelled(self) -> bool {
        self.0 & CANCELLED == CANCELLED
    }

    fn set_cancelled(&mut self) {
        self.0 |= CANCELLED;
    }

    fn set_complete(&mut self) {
        self.0 |= COMPLETE;
    }

    /// Returns `true` if the task's future has completed execution.
    pub(super) fn is_complete(self) -> bool {
        self.0 & COMPLETE == COMPLETE
    }

    pub(super) fn is_join_interested(self) -> bool {
        self.0 & JOIN_INTEREST == JOIN_INTEREST
    }

    fn unset_join_interested(&mut self) {
        self.0 &= !JOIN_INTEREST
    }

    pub(super) fn has_join_waker(self) -> bool {
        self.0 & JOIN_WAKER == JOIN_WAKER
    }

    fn set_join_waker(&mut self) {
        self.0 |= JOIN_WAKER;
    }

    fn unset_join_waker(&mut self) {
        self.0 &= !JOIN_WAKER
    }

    pub(super) fn ref_count(self) -> usize {
        (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
    }

    fn ref_inc(&mut self) {
        assert!(self.0 <= isize::max_value() as usize);
        self.0 += REF_ONE;
    }

    pub(super) fn ref_dec(&mut self) {
        assert!(self.ref_count() > 0);
        self.0 -= REF_ONE
    }

    fn will_need_queueing(self) -> bool {
        !self.is_notified() && self.is_idle()
    }
}
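
// A minimal usage sketch, kept as a comment since these items are private to
// the task module: decoding `INITIAL_STATE` with the accessors above would
// behave as follows.
//
//     let snapshot = Snapshot(INITIAL_STATE);
//     assert!(snapshot.is_idle());
//     assert!(snapshot.is_notified());
//     assert!(snapshot.is_join_interested());
//     assert_eq!(snapshot.ref_count(), 2);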

impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let snapshot = self.load();
        snapshot.fmt(fmt)
    }
}

impl fmt::Debug for Snapshot {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Snapshot")
            .field("is_running", &self.is_running())
            .field("is_complete", &self.is_complete())
            .field("is_notified", &self.is_notified())
            .field("is_cancelled", &self.is_cancelled())
            .field("is_join_interested", &self.is_join_interested())
            .field("has_join_waker", &self.has_join_waker())
            .field("ref_count", &self.ref_count())
            .finish()
    }
}