1 use atomic::AtomicU64;
2 use timer::{HandlePriv, Inner};
3 use Error;
4
5 use crossbeam_utils::CachePadded;
6 use futures::task::AtomicTask;
7 use futures::Poll;
8
9 use std::cell::UnsafeCell;
10 use std::ptr;
11 use std::sync::atomic::AtomicBool;
12 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
13 use std::sync::{Arc, Weak};
14 use std::time::{Duration, Instant};
15 use std::u64;
16
17 /// Internal state shared between a `Delay` instance and the timer.
18 ///
19 /// This struct is used as a node in two intrusive data structures:
20 ///
21 /// * An atomic stack used to signal to the timer thread that the entry state
22 /// has changed. The timer thread will observe the entry on this stack and
23 /// perform any actions as necessary.
24 ///
25 /// * A doubly linked list used **only** by the timer thread. Each slot in the
26 /// timer wheel is a head pointer to the list of entries that must be
27 /// processed during that timer tick.
28 #[derive(Debug)]
29 pub(crate) struct Entry {
30 /// Only accessed from `Registration`.
31 time: CachePadded<UnsafeCell<Time>>,
32
33 /// Timer internals. Using a weak pointer allows the timer to shutdown
34 /// without all `Delay` instances having completed.
35 ///
36 /// When `None`, the entry has not yet been linked with a timer instance.
37 inner: Option<Weak<Inner>>,
38
39 /// Tracks the entry state. This value contains the following information:
40 ///
41 /// * The deadline at which the entry must be "fired".
42 /// * A flag indicating if the entry has already been fired.
43 /// * Whether or not the entry transitioned to the error state.
44 ///
45 /// When an `Entry` is created, `state` is initialized to the instant at
46 /// which the entry must be fired. When a timer is reset to a different
47 /// instant, this value is changed.
48 state: AtomicU64,
49
50 /// Task to notify once the deadline is reached.
51 task: AtomicTask,
52
53 /// True when the entry is queued in the "process" stack. This value
54 /// is set before pushing the value and unset after popping the value.
55 ///
56 /// TODO: This could possibly be rolled up into `state`.
57 pub(super) queued: AtomicBool,
58
59 /// Next entry in the "process" linked list.
60 ///
61 /// Access to this field is coordinated by the `queued` flag.
62 ///
63 /// Represents a strong Arc ref.
64 pub(super) next_atomic: UnsafeCell<*mut Entry>,
65
66 /// When the entry expires, relative to the `start` of the timer
67 /// (Inner::start). This is only used by the timer.
68 ///
69 /// A `Delay` instance can be reset to a different deadline by the thread
70 /// that owns the `Delay` instance. In this case, the timer thread will not
71 /// immediately know that this has happened. The timer thread must know the
72 /// last deadline that it saw as it uses this value to locate the entry in
73 /// its wheel.
74 ///
75 /// Once the timer thread observes that the instant has changed, it updates
76 /// the wheel and sets this value. The idea is that this value eventually
77 /// converges to the value of `state` as the timer thread makes updates.
78 when: UnsafeCell<Option<u64>>,
79
80 /// Next entry in the State's linked list.
81 ///
82 /// This is only accessed by the timer
83 pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
84
85 /// Previous entry in the State's linked list.
86 ///
87 /// This is only accessed by the timer and is used to unlink a canceled
88 /// entry.
89 ///
90 /// This is a weak reference.
91 pub(super) prev_stack: UnsafeCell<*const Entry>,
92 }
93
94 /// Stores the info for `Delay`.
95 #[derive(Debug)]
96 pub(crate) struct Time {
97 pub(crate) deadline: Instant,
98 pub(crate) duration: Duration,
99 }
100
101 /// Flag indicating a timer entry has elapsed
102 const ELAPSED: u64 = 1 << 63;
103
104 /// Flag indicating a timer entry has reached an error state
105 const ERROR: u64 = u64::MAX;
106
107 // ===== impl Entry =====
108
109 impl Entry {
new(deadline: Instant, duration: Duration) -> Entry110 pub fn new(deadline: Instant, duration: Duration) -> Entry {
111 Entry {
112 time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })),
113 inner: None,
114 task: AtomicTask::new(),
115 state: AtomicU64::new(0),
116 queued: AtomicBool::new(false),
117 next_atomic: UnsafeCell::new(ptr::null_mut()),
118 when: UnsafeCell::new(None),
119 next_stack: UnsafeCell::new(None),
120 prev_stack: UnsafeCell::new(ptr::null_mut()),
121 }
122 }
123
124 /// Only called by `Registration`
time_ref(&self) -> &Time125 pub fn time_ref(&self) -> &Time {
126 unsafe { &*self.time.get() }
127 }
128
129 /// Only called by `Registration`
time_mut(&self) -> &mut Time130 pub fn time_mut(&self) -> &mut Time {
131 unsafe { &mut *self.time.get() }
132 }
133
134 /// Returns `true` if the `Entry` is currently associated with a timer
135 /// instance.
is_registered(&self) -> bool136 pub fn is_registered(&self) -> bool {
137 self.inner.is_some()
138 }
139
140 /// Only called by `Registration`
register(me: &mut Arc<Self>)141 pub fn register(me: &mut Arc<Self>) {
142 let handle = match HandlePriv::try_current() {
143 Ok(handle) => handle,
144 Err(_) => {
145 // Could not associate the entry with a timer, transition the
146 // state to error
147 Arc::get_mut(me).unwrap().transition_to_error();
148
149 return;
150 }
151 };
152
153 Entry::register_with(me, handle)
154 }
155
156 /// Only called by `Registration`
register_with(me: &mut Arc<Self>, handle: HandlePriv)157 pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
158 assert!(!me.is_registered(), "only register an entry once");
159
160 let deadline = me.time_ref().deadline;
161
162 let inner = match handle.inner() {
163 Some(inner) => inner,
164 None => {
165 // Could not associate the entry with a timer, transition the
166 // state to error
167 Arc::get_mut(me).unwrap().transition_to_error();
168
169 return;
170 }
171 };
172
173 // Increment the number of active timeouts
174 if inner.increment().is_err() {
175 Arc::get_mut(me).unwrap().transition_to_error();
176
177 return;
178 }
179
180 // Associate the entry with the timer
181 Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());
182
183 let when = inner.normalize_deadline(deadline);
184
185 // Relaxed OK: At this point, there are no other threads that have
186 // access to this entry.
187 if when <= inner.elapsed() {
188 me.state.store(ELAPSED, Relaxed);
189 return;
190 } else {
191 me.state.store(when, Relaxed);
192 }
193
194 if inner.queue(me).is_err() {
195 // The timer has shutdown, transition the entry to the error state.
196 me.error();
197 }
198 }
199
transition_to_error(&mut self)200 fn transition_to_error(&mut self) {
201 self.inner = Some(Weak::new());
202 self.state = AtomicU64::new(ERROR);
203 }
204
205 /// The current entry state as known by the timer. This is not the value of
206 /// `state`, but lets the timer know how to converge its state to `state`.
when_internal(&self) -> Option<u64>207 pub fn when_internal(&self) -> Option<u64> {
208 unsafe { (*self.when.get()) }
209 }
210
set_when_internal(&self, when: Option<u64>)211 pub fn set_when_internal(&self, when: Option<u64>) {
212 unsafe {
213 (*self.when.get()) = when;
214 }
215 }
216
217 /// Called by `Timer` to load the current value of `state` for processing
load_state(&self) -> Option<u64>218 pub fn load_state(&self) -> Option<u64> {
219 let state = self.state.load(SeqCst);
220
221 if is_elapsed(state) {
222 None
223 } else {
224 Some(state)
225 }
226 }
227
is_elapsed(&self) -> bool228 pub fn is_elapsed(&self) -> bool {
229 let state = self.state.load(SeqCst);
230 is_elapsed(state)
231 }
232
fire(&self, when: u64)233 pub fn fire(&self, when: u64) {
234 let mut curr = self.state.load(SeqCst);
235
236 loop {
237 if is_elapsed(curr) || curr > when {
238 return;
239 }
240
241 let next = ELAPSED | curr;
242 let actual = self.state.compare_and_swap(curr, next, SeqCst);
243
244 if curr == actual {
245 break;
246 }
247
248 curr = actual;
249 }
250
251 self.task.notify();
252 }
253
error(&self)254 pub fn error(&self) {
255 // Only transition to the error state if not currently elapsed
256 let mut curr = self.state.load(SeqCst);
257
258 loop {
259 if is_elapsed(curr) {
260 return;
261 }
262
263 let next = ERROR;
264
265 let actual = self.state.compare_and_swap(curr, next, SeqCst);
266
267 if curr == actual {
268 break;
269 }
270
271 curr = actual;
272 }
273
274 self.task.notify();
275 }
276
cancel(entry: &Arc<Entry>)277 pub fn cancel(entry: &Arc<Entry>) {
278 let state = entry.state.fetch_or(ELAPSED, SeqCst);
279
280 if is_elapsed(state) {
281 // Nothing more to do
282 return;
283 }
284
285 // If registered with a timer instance, try to upgrade the Arc.
286 let inner = match entry.upgrade_inner() {
287 Some(inner) => inner,
288 None => return,
289 };
290
291 let _ = inner.queue(entry);
292 }
293
poll_elapsed(&self) -> Poll<(), Error>294 pub fn poll_elapsed(&self) -> Poll<(), Error> {
295 use futures::Async::NotReady;
296
297 let mut curr = self.state.load(SeqCst);
298
299 if is_elapsed(curr) {
300 if curr == ERROR {
301 return Err(Error::shutdown());
302 } else {
303 return Ok(().into());
304 }
305 }
306
307 self.task.register();
308
309 curr = self.state.load(SeqCst).into();
310
311 if is_elapsed(curr) {
312 if curr == ERROR {
313 return Err(Error::shutdown());
314 } else {
315 return Ok(().into());
316 }
317 }
318
319 Ok(NotReady)
320 }
321
322 /// Only called by `Registration`
reset(entry: &mut Arc<Entry>)323 pub fn reset(entry: &mut Arc<Entry>) {
324 if !entry.is_registered() {
325 return;
326 }
327
328 let inner = match entry.upgrade_inner() {
329 Some(inner) => inner,
330 None => return,
331 };
332
333 let deadline = entry.time_ref().deadline;
334 let when = inner.normalize_deadline(deadline);
335 let elapsed = inner.elapsed();
336
337 let mut curr = entry.state.load(SeqCst);
338 let mut notify;
339
340 loop {
341 // In these two cases, there is no work to do when resetting the
342 // timer. If the `Entry` is in an error state, then it cannot be
343 // used anymore. If resetting the entry to the current value, then
344 // the reset is a noop.
345 if curr == ERROR || curr == when {
346 return;
347 }
348
349 let next;
350
351 if when <= elapsed {
352 next = ELAPSED;
353 notify = !is_elapsed(curr);
354 } else {
355 next = when;
356 notify = true;
357 }
358
359 let actual = entry.state.compare_and_swap(curr, next, SeqCst);
360
361 if curr == actual {
362 break;
363 }
364
365 curr = actual;
366 }
367
368 if notify {
369 let _ = inner.queue(entry);
370 }
371 }
372
upgrade_inner(&self) -> Option<Arc<Inner>>373 fn upgrade_inner(&self) -> Option<Arc<Inner>> {
374 self.inner.as_ref().and_then(|inner| inner.upgrade())
375 }
376 }
377
is_elapsed(state: u64) -> bool378 fn is_elapsed(state: u64) -> bool {
379 state & ELAPSED == ELAPSED
380 }
381
382 impl Drop for Entry {
drop(&mut self)383 fn drop(&mut self) {
384 let inner = match self.upgrade_inner() {
385 Some(inner) => inner,
386 None => return,
387 };
388
389 inner.decrement();
390 }
391 }
392
393 unsafe impl Send for Entry {}
394 unsafe impl Sync for Entry {}
395