1 use super::Entry; 2 use Error; 3 4 use std::ptr; 5 use std::sync::atomic::AtomicPtr; 6 use std::sync::atomic::Ordering::SeqCst; 7 use std::sync::Arc; 8 9 /// A stack of `Entry` nodes 10 #[derive(Debug)] 11 pub(crate) struct AtomicStack { 12 /// Stack head 13 head: AtomicPtr<Entry>, 14 } 15 16 /// Entries that were removed from the stack 17 #[derive(Debug)] 18 pub(crate) struct AtomicStackEntries { 19 ptr: *mut Entry, 20 } 21 22 /// Used to indicate that the timer has shutdown. 23 const SHUTDOWN: *mut Entry = 1 as *mut _; 24 25 impl AtomicStack { new() -> AtomicStack26 pub fn new() -> AtomicStack { 27 AtomicStack { 28 head: AtomicPtr::new(ptr::null_mut()), 29 } 30 } 31 32 /// Push an entry onto the stack. 33 /// 34 /// Returns `true` if the entry was pushed, `false` if the entry is already 35 /// on the stack, `Err` if the timer is shutdown. push(&self, entry: &Arc<Entry>) -> Result<bool, Error>36 pub fn push(&self, entry: &Arc<Entry>) -> Result<bool, Error> { 37 // First, set the queued bit on the entry 38 let queued = entry.queued.fetch_or(true, SeqCst).into(); 39 40 if queued { 41 // Already queued, nothing more to do 42 return Ok(false); 43 } 44 45 let ptr = Arc::into_raw(entry.clone()) as *mut _; 46 47 let mut curr = self.head.load(SeqCst); 48 49 loop { 50 if curr == SHUTDOWN { 51 // Don't leak the entry node 52 let _ = unsafe { Arc::from_raw(ptr) }; 53 54 return Err(Error::shutdown()); 55 } 56 57 // Update the `next` pointer. This is safe because setting the queued 58 // bit is a "lock" on this field. 59 unsafe { 60 *(entry.next_atomic.get()) = curr; 61 } 62 63 let actual = self.head.compare_and_swap(curr, ptr, SeqCst); 64 65 if actual == curr { 66 break; 67 } 68 69 curr = actual; 70 } 71 72 Ok(true) 73 } 74 75 /// Take all entries from the stack take(&self) -> AtomicStackEntries76 pub fn take(&self) -> AtomicStackEntries { 77 let ptr = self.head.swap(ptr::null_mut(), SeqCst); 78 AtomicStackEntries { ptr } 79 } 80 81 /// Drain all remaining nodes in the stack and prevent any new nodes from 82 /// being pushed onto the stack. shutdown(&self)83 pub fn shutdown(&self) { 84 // Shutdown the processing queue 85 let ptr = self.head.swap(SHUTDOWN, SeqCst); 86 87 // Let the drop fn of `AtomicStackEntries` handle draining the stack 88 drop(AtomicStackEntries { ptr }); 89 } 90 } 91 92 // ===== impl AtomicStackEntries ===== 93 94 impl Iterator for AtomicStackEntries { 95 type Item = Arc<Entry>; 96 next(&mut self) -> Option<Self::Item>97 fn next(&mut self) -> Option<Self::Item> { 98 if self.ptr.is_null() { 99 return None; 100 } 101 102 // Convert the pointer to an `Arc<Entry>` 103 let entry = unsafe { Arc::from_raw(self.ptr) }; 104 105 // Update `self.ptr` to point to the next element of the stack 106 self.ptr = unsafe { (*entry.next_atomic.get()) }; 107 108 // Unset the queued flag 109 let res = entry.queued.fetch_and(false, SeqCst); 110 debug_assert!(res); 111 112 // Return the entry 113 Some(entry) 114 } 115 } 116 117 impl Drop for AtomicStackEntries { drop(&mut self)118 fn drop(&mut self) { 119 while let Some(entry) = self.next() { 120 // Flag the entry as errored 121 entry.error(); 122 } 123 } 124 } 125