1 use atomic::AtomicU64;
2 use timer::{HandlePriv, Inner};
3 use Error;
4 
5 use crossbeam_utils::CachePadded;
6 use futures::task::AtomicTask;
7 use futures::Poll;
8 
9 use std::cell::UnsafeCell;
10 use std::ptr;
11 use std::sync::atomic::AtomicBool;
12 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
13 use std::sync::{Arc, Weak};
14 use std::time::{Duration, Instant};
15 use std::u64;
16 
/// Internal state shared between a `Delay` instance and the timer.
///
/// This struct is used as a node in two intrusive data structures:
///
/// * An atomic stack used to signal to the timer thread that the entry state
///   has changed. The timer thread will observe the entry on this stack and
///   perform any actions as necessary.
///
/// * A doubly linked list used **only** by the timer thread. Each slot in the
///   timer wheel is a head pointer to the list of entries that must be
///   processed during that timer tick.
///
/// Several fields are `UnsafeCell`s or raw pointers. The compiler does not
/// enforce who may touch them; each field's comment states the access
/// convention that callers are expected to follow.
#[derive(Debug)]
pub(crate) struct Entry {
    /// The deadline and duration requested by the `Delay`.
    ///
    /// Only accessed from `Registration`.
    time: CachePadded<UnsafeCell<Time>>,

    /// Timer internals. Using a weak pointer allows the timer to shutdown
    /// without all `Delay` instances having completed.
    ///
    /// When `None`, the entry has not yet been linked with a timer instance.
    /// When registration fails, this is set to an empty `Weak` (see
    /// `transition_to_error`), so the entry still reports as registered even
    /// though the pointer can never be upgraded.
    inner: Option<Weak<Inner>>,

    /// Tracks the entry state. This value contains the following information:
    ///
    /// * The deadline at which the entry must be "fired".
    /// * A flag indicating if the entry has already been fired (the `ELAPSED`
    ///   bit, `1 << 63`).
    /// * Whether or not the entry transitioned to the error state (the value
    ///   `ERROR`, i.e. `u64::MAX`, which also has the `ELAPSED` bit set).
    ///
    /// `Entry::new` initializes `state` to `0`. `register_with` then stores
    /// either the deadline normalized by the timer or `ELAPSED` if that
    /// deadline has already passed. When a timer is reset to a different
    /// instant, this value is changed.
    state: AtomicU64,

    /// Task to notify once the deadline is reached.
    ///
    /// Also notified when the entry transitions to the error state.
    task: AtomicTask,

    /// True when the entry is queued in the "process" stack. This value
    /// is set before pushing the value and unset after popping the value.
    ///
    /// TODO: This could possibly be rolled up into `state`.
    pub(super) queued: AtomicBool,

    /// Next entry in the atomic "process" stack (the first of the two
    /// intrusive structures described on the struct).
    ///
    /// Access to this field is coordinated by the `queued` flag.
    ///
    /// Represents a strong Arc ref.
    pub(super) next_atomic: UnsafeCell<*mut Entry>,

    /// When the entry expires, relative to the `start` of the timer
    /// (Inner::start). This is only used by the timer.
    ///
    /// A `Delay` instance can be reset to a different deadline by the thread
    /// that owns the `Delay` instance. In this case, the timer thread will not
    /// immediately know that this has happened. The timer thread must know the
    /// last deadline that it saw as it uses this value to locate the entry in
    /// its wheel.
    ///
    /// Once the timer thread observes that the instant has changed, it updates
    /// the wheel and sets this value. The idea is that this value eventually
    /// converges to the value of `state` as the timer thread makes updates.
    ///
    /// Read and written through `when_internal` / `set_when_internal`.
    when: UnsafeCell<Option<u64>>,

    /// Next entry in the timer-owned linked list.
    ///
    /// NOTE(review): together with `prev_stack` this appears to be the doubly
    /// linked wheel-slot list described on the struct; confirm against the
    /// timer implementation.
    ///
    /// This is only accessed by the timer
    pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,

    /// Previous entry in the timer-owned linked list.
    ///
    /// This is only accessed by the timer and is used to unlink a canceled
    /// entry.
    ///
    /// This is a weak reference: a raw pointer that does not keep the
    /// previous entry alive.
    pub(super) prev_stack: UnsafeCell<*const Entry>,
}
93 
/// Stores the info for `Delay`.
///
/// An `Entry` keeps one of these behind an `UnsafeCell` and exposes it via
/// `Entry::time_ref` / `Entry::time_mut`.
#[derive(Debug)]
pub(crate) struct Time {
    /// The instant at which the delay should fire. `Entry::register_with` and
    /// `Entry::reset` pass this to the timer's `normalize_deadline`.
    pub(crate) deadline: Instant,
    /// Duration associated with the delay. It is stored here but not read by
    /// `Entry` itself.
    pub(crate) duration: Duration,
}
100 
/// Bit set in an entry's `state` once the entry has elapsed (fired or been
/// canceled). It is the most significant bit of the `u64`, leaving the lower
/// 63 bits for the deadline value.
const ELAPSED: u64 = 1u64 << 63;

/// `state` value marking an entry that has reached the error state.
///
/// Every bit is set, so the `ELAPSED` bit is set as well: an errored entry
/// is also reported as elapsed.
const ERROR: u64 = u64::MAX;
106 
107 // ===== impl Entry =====
108 
109 impl Entry {
new(deadline: Instant, duration: Duration) -> Entry110     pub fn new(deadline: Instant, duration: Duration) -> Entry {
111         Entry {
112             time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })),
113             inner: None,
114             task: AtomicTask::new(),
115             state: AtomicU64::new(0),
116             queued: AtomicBool::new(false),
117             next_atomic: UnsafeCell::new(ptr::null_mut()),
118             when: UnsafeCell::new(None),
119             next_stack: UnsafeCell::new(None),
120             prev_stack: UnsafeCell::new(ptr::null_mut()),
121         }
122     }
123 
124     /// Only called by `Registration`
time_ref(&self) -> &Time125     pub fn time_ref(&self) -> &Time {
126         unsafe { &*self.time.get() }
127     }
128 
129     /// Only called by `Registration`
time_mut(&self) -> &mut Time130     pub fn time_mut(&self) -> &mut Time {
131         unsafe { &mut *self.time.get() }
132     }
133 
134     /// Returns `true` if the `Entry` is currently associated with a timer
135     /// instance.
is_registered(&self) -> bool136     pub fn is_registered(&self) -> bool {
137         self.inner.is_some()
138     }
139 
140     /// Only called by `Registration`
register(me: &mut Arc<Self>)141     pub fn register(me: &mut Arc<Self>) {
142         let handle = match HandlePriv::try_current() {
143             Ok(handle) => handle,
144             Err(_) => {
145                 // Could not associate the entry with a timer, transition the
146                 // state to error
147                 Arc::get_mut(me).unwrap().transition_to_error();
148 
149                 return;
150             }
151         };
152 
153         Entry::register_with(me, handle)
154     }
155 
156     /// Only called by `Registration`
register_with(me: &mut Arc<Self>, handle: HandlePriv)157     pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
158         assert!(!me.is_registered(), "only register an entry once");
159 
160         let deadline = me.time_ref().deadline;
161 
162         let inner = match handle.inner() {
163             Some(inner) => inner,
164             None => {
165                 // Could not associate the entry with a timer, transition the
166                 // state to error
167                 Arc::get_mut(me).unwrap().transition_to_error();
168 
169                 return;
170             }
171         };
172 
173         // Increment the number of active timeouts
174         if inner.increment().is_err() {
175             Arc::get_mut(me).unwrap().transition_to_error();
176 
177             return;
178         }
179 
180         // Associate the entry with the timer
181         Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());
182 
183         let when = inner.normalize_deadline(deadline);
184 
185         // Relaxed OK: At this point, there are no other threads that have
186         // access to this entry.
187         if when <= inner.elapsed() {
188             me.state.store(ELAPSED, Relaxed);
189             return;
190         } else {
191             me.state.store(when, Relaxed);
192         }
193 
194         if inner.queue(me).is_err() {
195             // The timer has shutdown, transition the entry to the error state.
196             me.error();
197         }
198     }
199 
transition_to_error(&mut self)200     fn transition_to_error(&mut self) {
201         self.inner = Some(Weak::new());
202         self.state = AtomicU64::new(ERROR);
203     }
204 
205     /// The current entry state as known by the timer. This is not the value of
206     /// `state`, but lets the timer know how to converge its state to `state`.
when_internal(&self) -> Option<u64>207     pub fn when_internal(&self) -> Option<u64> {
208         unsafe { (*self.when.get()) }
209     }
210 
set_when_internal(&self, when: Option<u64>)211     pub fn set_when_internal(&self, when: Option<u64>) {
212         unsafe {
213             (*self.when.get()) = when;
214         }
215     }
216 
217     /// Called by `Timer` to load the current value of `state` for processing
load_state(&self) -> Option<u64>218     pub fn load_state(&self) -> Option<u64> {
219         let state = self.state.load(SeqCst);
220 
221         if is_elapsed(state) {
222             None
223         } else {
224             Some(state)
225         }
226     }
227 
is_elapsed(&self) -> bool228     pub fn is_elapsed(&self) -> bool {
229         let state = self.state.load(SeqCst);
230         is_elapsed(state)
231     }
232 
fire(&self, when: u64)233     pub fn fire(&self, when: u64) {
234         let mut curr = self.state.load(SeqCst);
235 
236         loop {
237             if is_elapsed(curr) || curr > when {
238                 return;
239             }
240 
241             let next = ELAPSED | curr;
242             let actual = self.state.compare_and_swap(curr, next, SeqCst);
243 
244             if curr == actual {
245                 break;
246             }
247 
248             curr = actual;
249         }
250 
251         self.task.notify();
252     }
253 
error(&self)254     pub fn error(&self) {
255         // Only transition to the error state if not currently elapsed
256         let mut curr = self.state.load(SeqCst);
257 
258         loop {
259             if is_elapsed(curr) {
260                 return;
261             }
262 
263             let next = ERROR;
264 
265             let actual = self.state.compare_and_swap(curr, next, SeqCst);
266 
267             if curr == actual {
268                 break;
269             }
270 
271             curr = actual;
272         }
273 
274         self.task.notify();
275     }
276 
cancel(entry: &Arc<Entry>)277     pub fn cancel(entry: &Arc<Entry>) {
278         let state = entry.state.fetch_or(ELAPSED, SeqCst);
279 
280         if is_elapsed(state) {
281             // Nothing more to do
282             return;
283         }
284 
285         // If registered with a timer instance, try to upgrade the Arc.
286         let inner = match entry.upgrade_inner() {
287             Some(inner) => inner,
288             None => return,
289         };
290 
291         let _ = inner.queue(entry);
292     }
293 
poll_elapsed(&self) -> Poll<(), Error>294     pub fn poll_elapsed(&self) -> Poll<(), Error> {
295         use futures::Async::NotReady;
296 
297         let mut curr = self.state.load(SeqCst);
298 
299         if is_elapsed(curr) {
300             if curr == ERROR {
301                 return Err(Error::shutdown());
302             } else {
303                 return Ok(().into());
304             }
305         }
306 
307         self.task.register();
308 
309         curr = self.state.load(SeqCst).into();
310 
311         if is_elapsed(curr) {
312             if curr == ERROR {
313                 return Err(Error::shutdown());
314             } else {
315                 return Ok(().into());
316             }
317         }
318 
319         Ok(NotReady)
320     }
321 
322     /// Only called by `Registration`
reset(entry: &mut Arc<Entry>)323     pub fn reset(entry: &mut Arc<Entry>) {
324         if !entry.is_registered() {
325             return;
326         }
327 
328         let inner = match entry.upgrade_inner() {
329             Some(inner) => inner,
330             None => return,
331         };
332 
333         let deadline = entry.time_ref().deadline;
334         let when = inner.normalize_deadline(deadline);
335         let elapsed = inner.elapsed();
336 
337         let mut curr = entry.state.load(SeqCst);
338         let mut notify;
339 
340         loop {
341             // In these two cases, there is no work to do when resetting the
342             // timer. If the `Entry` is in an error state, then it cannot be
343             // used anymore. If resetting the entry to the current value, then
344             // the reset is a noop.
345             if curr == ERROR || curr == when {
346                 return;
347             }
348 
349             let next;
350 
351             if when <= elapsed {
352                 next = ELAPSED;
353                 notify = !is_elapsed(curr);
354             } else {
355                 next = when;
356                 notify = true;
357             }
358 
359             let actual = entry.state.compare_and_swap(curr, next, SeqCst);
360 
361             if curr == actual {
362                 break;
363             }
364 
365             curr = actual;
366         }
367 
368         if notify {
369             let _ = inner.queue(entry);
370         }
371     }
372 
upgrade_inner(&self) -> Option<Arc<Inner>>373     fn upgrade_inner(&self) -> Option<Arc<Inner>> {
374         self.inner.as_ref().and_then(|inner| inner.upgrade())
375     }
376 }
377 
is_elapsed(state: u64) -> bool378 fn is_elapsed(state: u64) -> bool {
379     state & ELAPSED == ELAPSED
380 }
381 
382 impl Drop for Entry {
drop(&mut self)383     fn drop(&mut self) {
384         let inner = match self.upgrade_inner() {
385             Some(inner) => inner,
386             None => return,
387         };
388 
389         inner.decrement();
390     }
391 }
392 
// `Entry` holds `UnsafeCell`s and raw pointers, so the compiler does not
// derive `Send` or `Sync` for it; these impls opt in manually.
//
// NOTE(review): soundness depends on the access conventions documented on the
// struct fields (`time` only from `Registration`; `when`, `next_stack` and
// `prev_stack` only from the timer; `next_atomic` guarded by `queued`). The
// compiler does not check these, so confirm them against the callers.
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}
395