1 use pool::{Backup, BackupId};
2 
3 use std::sync::atomic::AtomicUsize;
4 use std::sync::atomic::Ordering::{AcqRel, Acquire};
5 
/// Lock-free stack of backup threads.
///
/// The whole stack state lives in one atomic word so it can be updated with a
/// single compare-and-swap: the low bits store the head of the stack and the
/// remaining high bits store an ABA guard counter.
#[derive(Debug)]
pub(crate) struct BackupStack {
    /// Packed head + ABA guard; decoded through `State`.
    state: AtomicUsize,
}
10 
/// Decoded view of the word stored in `BackupStack::state`.
///
/// The wrapped `usize` packs the stack head into its low bits and the ABA
/// guard counter into the bits above it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct State(usize);
13 
/// Exclusive upper bound on the index of a backup entry that can be stored in
/// the stack. Values at or above this bound are reserved for sentinels.
pub(crate) const MAX_BACKUP: usize = 1 << 15;

/// Mask selecting the low 16 bits of the state word, which hold the head of
/// the backup stack.
const STACK_MASK: usize = (1 << 16) - 1;
18 
19 /// Used to mark the stack as empty
20 pub(crate) const EMPTY: BackupId = BackupId(MAX_BACKUP);
21 
22 /// Used to mark the stack as terminated
23 pub(crate) const TERMINATED: BackupId = BackupId(EMPTY.0 + 1);
24 
/// How many bits the Treiber ABA guard is offset by within the state word.
const ABA_GUARD_SHIFT: usize = 16;

/// Mask applied to the ABA guard after it has been shifted down by
/// `ABA_GUARD_SHIFT`; it keeps every bit of the word that is not used by the
/// head.
///
/// Derived from the width of `usize` rather than spelled out per target, so
/// it evaluates to `(1 << (32 - 16)) - 1` on 32-bit targets and
/// `(1 << (64 - 16)) - 1` on 64-bit targets.
const ABA_GUARD_MASK: usize = !0 >> ABA_GUARD_SHIFT;
33 
34 // ===== impl BackupStack =====
35 
36 impl BackupStack {
new() -> BackupStack37     pub fn new() -> BackupStack {
38         let state = AtomicUsize::new(State::new().into());
39         BackupStack { state }
40     }
41 
42     /// Push a backup thread onto the stack
43     ///
44     /// # Return
45     ///
46     /// Returns `Ok` on success.
47     ///
48     /// Returns `Err` if the pool has transitioned to the `TERMINATED` state.
49     /// When terminated, pushing new entries is no longer permitted.
push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()>50     pub fn push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()> {
51         let mut state: State = self.state.load(Acquire).into();
52 
53         entries[id.0].set_pushed(AcqRel);
54 
55         loop {
56             let mut next = state;
57 
58             let head = state.head();
59 
60             if head == TERMINATED {
61                 // The pool is terminated, cannot push the sleeper.
62                 return Err(());
63             }
64 
65             entries[id.0].set_next_sleeper(head);
66             next.set_head(id);
67 
68             let actual = self
69                 .state
70                 .compare_and_swap(state.into(), next.into(), AcqRel)
71                 .into();
72 
73             if state == actual {
74                 return Ok(());
75             }
76 
77             state = actual;
78         }
79     }
80 
81     /// Pop a backup thread off the stack.
82     ///
83     /// If `terminate` is set and the stack is empty when this function is
84     /// called, the state of the stack is transitioned to "terminated". At this
85     /// point, no further entries can be pushed onto the stack.
86     ///
87     /// # Return
88     ///
89     /// * Returns the index of the popped worker and the worker's observed
90     ///   state.
91     ///
92     /// * `Ok(None)` if the stack is empty.
93     /// * `Err(_)` is returned if the pool has been shutdown.
pop(&self, entries: &[Backup], terminate: bool) -> Result<Option<BackupId>, ()>94     pub fn pop(&self, entries: &[Backup], terminate: bool) -> Result<Option<BackupId>, ()> {
95         // Figure out the empty value
96         let terminal = match terminate {
97             true => TERMINATED,
98             false => EMPTY,
99         };
100 
101         let mut state: State = self.state.load(Acquire).into();
102 
103         loop {
104             let head = state.head();
105 
106             if head == EMPTY {
107                 let mut next = state;
108                 next.set_head(terminal);
109 
110                 if next == state {
111                     debug_assert!(terminal == EMPTY);
112                     return Ok(None);
113                 }
114 
115                 let actual = self
116                     .state
117                     .compare_and_swap(state.into(), next.into(), AcqRel)
118                     .into();
119 
120                 if actual != state {
121                     state = actual;
122                     continue;
123                 }
124 
125                 return Ok(None);
126             } else if head == TERMINATED {
127                 return Err(());
128             }
129 
130             debug_assert!(head.0 < MAX_BACKUP);
131 
132             let mut next = state;
133 
134             let next_head = entries[head.0].next_sleeper();
135 
136             // TERMINATED can never be set as the "next pointer" on a worker.
137             debug_assert!(next_head != TERMINATED);
138 
139             if next_head == EMPTY {
140                 next.set_head(terminal);
141             } else {
142                 next.set_head(next_head);
143             }
144 
145             let actual = self
146                 .state
147                 .compare_and_swap(state.into(), next.into(), AcqRel)
148                 .into();
149 
150             if actual == state {
151                 debug_assert!(entries[head.0].is_pushed());
152                 return Ok(Some(head));
153             }
154 
155             state = actual;
156         }
157     }
158 }
159 
160 // ===== impl State =====
161 
162 impl State {
new() -> State163     fn new() -> State {
164         State(EMPTY.0)
165     }
166 
head(&self) -> BackupId167     fn head(&self) -> BackupId {
168         BackupId(self.0 & STACK_MASK)
169     }
170 
set_head(&mut self, val: BackupId)171     fn set_head(&mut self, val: BackupId) {
172         let val = val.0;
173 
174         // The ABA guard protects against the ABA problem w/ Treiber stacks
175         let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
176 
177         self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
178     }
179 }
180 
181 impl From<usize> for State {
from(src: usize) -> Self182     fn from(src: usize) -> Self {
183         State(src)
184     }
185 }
186 
187 impl From<State> for usize {
from(src: State) -> Self188     fn from(src: State) -> Self {
189         src.0
190     }
191 }
192