use super::Entry;
use Error;

use std::ptr;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;

/// A stack of `Entry` nodes
#[derive(Debug)]
pub(crate) struct AtomicStack {
    /// Stack head
    head: AtomicPtr<Entry>,
}

/// Entries that were removed from the stack
#[derive(Debug)]
pub(crate) struct AtomicStackEntries {
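    /// Head of the detached list. Each node owns the `Arc` reference that
    /// `push` moved into the stack; the iterator reclaims it.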
    ptr: *mut Entry,
}

/// Used to indicate that the timer has shut down.
const SHUTDOWN: *mut Entry = 1 as *mut _;

impl AtomicStack {
    pub fn new() -> AtomicStack {
        AtomicStack {
            head: AtomicPtr::new(ptr::null_mut()),
        }
    }

    /// Push an entry onto the stack.
    ///
    /// Returns `Ok(true)` if the entry was pushed, `Ok(false)` if the entry is
    /// already on the stack, and `Err` if the timer has shut down.
    pub fn push(&self, entry: &Arc<Entry>) -> Result<bool, Error> {
        // First, set the queued bit on the entry
        let queued = entry.queued.fetch_or(true, SeqCst).into();

        if queued {
            // Already queued, nothing more to do
            return Ok(false);
        }

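        // Move one `Arc` reference into the stack; it is reclaimed either by
        // `AtomicStackEntries::next` or below if the timer has already shut
        // down.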
        let ptr = Arc::into_raw(entry.clone()) as *mut _;

        let mut curr = self.head.load(SeqCst);

        loop {
            if curr == SHUTDOWN {
                // Don't leak the entry node
                let _ = unsafe { Arc::from_raw(ptr) };

                return Err(Error::shutdown());
            }

            // Update the `next` pointer. This is safe because setting the queued
            // bit is a "lock" on this field.
            unsafe {
                *(entry.next_atomic.get()) = curr;
            }

            let actual = self.head.compare_and_swap(curr, ptr, SeqCst);

            if actual == curr {
                break;
            }

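            // The CAS failed because the head changed; retry with the value
            // that was actually observed.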
            curr = actual;
        }

        Ok(true)
    }

    /// Take all entries from the stack
    pub fn take(&self) -> AtomicStackEntries {
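        // Atomically detach the whole list; later pushes start a fresh stack
        // while the returned iterator drains this one.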
        let ptr = self.head.swap(ptr::null_mut(), SeqCst);
        AtomicStackEntries { ptr }
    }

    /// Drain all remaining nodes in the stack and prevent any new nodes from
    /// being pushed onto the stack.
    pub fn shutdown(&self) {
        // Shutdown the processing queue
        let ptr = self.head.swap(SHUTDOWN, SeqCst);

        // Let the drop fn of `AtomicStackEntries` handle draining the stack
        drop(AtomicStackEntries { ptr });
    }
}

// ===== impl AtomicStackEntries =====

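// Iterating pops nodes off the detached list: each `next` call reclaims the
// `Arc` reference that `push` moved into the stack and clears the queued bit
// so the entry may be pushed again.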
impl Iterator for AtomicStackEntries {
    type Item = Arc<Entry>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.ptr.is_null() || self.ptr == SHUTDOWN {
            return None;
        }

        // Convert the pointer to an `Arc<Entry>`
        let entry = unsafe { Arc::from_raw(self.ptr) };

        // Update `self.ptr` to point to the next element of the stack
        self.ptr = unsafe { (*entry.next_atomic.get()) };

        // Unset the queued flag
        let res = entry.queued.fetch_and(false, SeqCst);
        debug_assert!(res);

        // Return the entry
        Some(entry)
    }
}

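// Dropping the iterator errors out any entries that were not consumed; this is
// how `shutdown` notifies pending entries that the timer is gone.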
impl Drop for AtomicStackEntries {
    fn drop(&mut self) {
        while let Some(entry) = self.next() {
            // Flag the entry as errored
            entry.error();
        }
    }
}