1 use pool::{Backup, BackupId}; 2 3 use std::sync::atomic::AtomicUsize; 4 use std::sync::atomic::Ordering::{AcqRel, Acquire}; 5 6 #[derive(Debug)] 7 pub(crate) struct BackupStack { 8 state: AtomicUsize, 9 } 10 11 #[derive(Debug, Eq, PartialEq, Clone, Copy)] 12 struct State(usize); 13 14 pub(crate) const MAX_BACKUP: usize = 1 << 15; 15 16 /// Extracts the head of the backup stack from the state 17 const STACK_MASK: usize = ((1 << 16) - 1); 18 19 /// Used to mark the stack as empty 20 pub(crate) const EMPTY: BackupId = BackupId(MAX_BACKUP); 21 22 /// Used to mark the stack as terminated 23 pub(crate) const TERMINATED: BackupId = BackupId(EMPTY.0 + 1); 24 25 /// How many bits the Treiber ABA guard is offset by 26 const ABA_GUARD_SHIFT: usize = 16; 27 28 #[cfg(target_pointer_width = "64")] 29 const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1; 30 31 #[cfg(target_pointer_width = "32")] 32 const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1; 33 34 // ===== impl BackupStack ===== 35 36 impl BackupStack { new() -> BackupStack37 pub fn new() -> BackupStack { 38 let state = AtomicUsize::new(State::new().into()); 39 BackupStack { state } 40 } 41 42 /// Push a backup thread onto the stack 43 /// 44 /// # Return 45 /// 46 /// Returns `Ok` on success. 47 /// 48 /// Returns `Err` if the pool has transitioned to the `TERMINATED` state. 49 /// When terminated, pushing new entries is no longer permitted. push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()>50 pub fn push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()> { 51 let mut state: State = self.state.load(Acquire).into(); 52 53 entries[id.0].set_pushed(AcqRel); 54 55 loop { 56 let mut next = state; 57 58 let head = state.head(); 59 60 if head == TERMINATED { 61 // The pool is terminated, cannot push the sleeper. 62 return Err(()); 63 } 64 65 entries[id.0].set_next_sleeper(head); 66 next.set_head(id); 67 68 let actual = self 69 .state 70 .compare_and_swap(state.into(), next.into(), AcqRel) 71 .into(); 72 73 if state == actual { 74 return Ok(()); 75 } 76 77 state = actual; 78 } 79 } 80 81 /// Pop a backup thread off the stack. 82 /// 83 /// If `terminate` is set and the stack is empty when this function is 84 /// called, the state of the stack is transitioned to "terminated". At this 85 /// point, no further entries can be pushed onto the stack. 86 /// 87 /// # Return 88 /// 89 /// * Returns the index of the popped worker and the worker's observed 90 /// state. 91 /// 92 /// * `Ok(None)` if the stack is empty. 93 /// * `Err(_)` is returned if the pool has been shutdown. pop(&self, entries: &[Backup], terminate: bool) -> Result<Option<BackupId>, ()>94 pub fn pop(&self, entries: &[Backup], terminate: bool) -> Result<Option<BackupId>, ()> { 95 // Figure out the empty value 96 let terminal = match terminate { 97 true => TERMINATED, 98 false => EMPTY, 99 }; 100 101 let mut state: State = self.state.load(Acquire).into(); 102 103 loop { 104 let head = state.head(); 105 106 if head == EMPTY { 107 let mut next = state; 108 next.set_head(terminal); 109 110 if next == state { 111 debug_assert!(terminal == EMPTY); 112 return Ok(None); 113 } 114 115 let actual = self 116 .state 117 .compare_and_swap(state.into(), next.into(), AcqRel) 118 .into(); 119 120 if actual != state { 121 state = actual; 122 continue; 123 } 124 125 return Ok(None); 126 } else if head == TERMINATED { 127 return Err(()); 128 } 129 130 debug_assert!(head.0 < MAX_BACKUP); 131 132 let mut next = state; 133 134 let next_head = entries[head.0].next_sleeper(); 135 136 // TERMINATED can never be set as the "next pointer" on a worker. 137 debug_assert!(next_head != TERMINATED); 138 139 if next_head == EMPTY { 140 next.set_head(terminal); 141 } else { 142 next.set_head(next_head); 143 } 144 145 let actual = self 146 .state 147 .compare_and_swap(state.into(), next.into(), AcqRel) 148 .into(); 149 150 if actual == state { 151 debug_assert!(entries[head.0].is_pushed()); 152 return Ok(Some(head)); 153 } 154 155 state = actual; 156 } 157 } 158 } 159 160 // ===== impl State ===== 161 162 impl State { new() -> State163 fn new() -> State { 164 State(EMPTY.0) 165 } 166 head(&self) -> BackupId167 fn head(&self) -> BackupId { 168 BackupId(self.0 & STACK_MASK) 169 } 170 set_head(&mut self, val: BackupId)171 fn set_head(&mut self, val: BackupId) { 172 let val = val.0; 173 174 // The ABA guard protects against the ABA problem w/ Treiber stacks 175 let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK; 176 177 self.0 = (aba_guard << ABA_GUARD_SHIFT) | val; 178 } 179 } 180 181 impl From<usize> for State { from(src: usize) -> Self182 fn from(src: usize) -> Self { 183 State(src) 184 } 185 } 186 187 impl From<State> for usize { from(src: State) -> Self188 fn from(src: State) -> Self { 189 src.0 190 } 191 } 192