1 use std::{task::Context, time::Duration};
2 
3 #[cfg(not(loom))]
4 use futures::task::noop_waker_ref;
5 
6 use crate::loom::sync::Arc;
7 use crate::loom::thread;
8 use crate::{
9     loom::sync::atomic::{AtomicBool, Ordering},
10     park::Unpark,
11 };
12 
13 use super::{Handle, TimerEntry};
14 
15 struct MockUnpark {}
16 impl Unpark for MockUnpark {
unpark(&self)17     fn unpark(&self) {}
18 }
19 impl MockUnpark {
mock() -> Box<dyn Unpark>20     fn mock() -> Box<dyn Unpark> {
21         Box::new(Self {})
22     }
23 }
24 
block_on<T>(f: impl std::future::Future<Output = T>) -> T25 fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
26     #[cfg(loom)]
27     return loom::future::block_on(f);
28 
29     #[cfg(not(loom))]
30     return futures::executor::block_on(f);
31 }
32 
model(f: impl Fn() + Send + Sync + 'static)33 fn model(f: impl Fn() + Send + Sync + 'static) {
34     #[cfg(loom)]
35     loom::model(f);
36 
37     #[cfg(not(loom))]
38     f();
39 }
40 
41 #[test]
single_timer()42 fn single_timer() {
43     model(|| {
44         let clock = crate::time::clock::Clock::new(true, false);
45         let time_source = super::ClockTime::new(clock.clone());
46 
47         let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
48         let handle = Handle::new(Arc::new(inner));
49 
50         let handle_ = handle.clone();
51         let jh = thread::spawn(move || {
52             let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
53             pin!(entry);
54 
55             block_on(futures::future::poll_fn(|cx| {
56                 entry.as_mut().poll_elapsed(cx)
57             }))
58             .unwrap();
59         });
60 
61         thread::yield_now();
62 
63         // This may or may not return Some (depending on how it races with the
64         // thread). If it does return None, however, the timer should complete
65         // synchronously.
66         handle.process_at_time(time_source.now() + 2_000_000_000);
67 
68         jh.join().unwrap();
69     })
70 }
71 
72 #[test]
drop_timer()73 fn drop_timer() {
74     model(|| {
75         let clock = crate::time::clock::Clock::new(true, false);
76         let time_source = super::ClockTime::new(clock.clone());
77 
78         let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
79         let handle = Handle::new(Arc::new(inner));
80 
81         let handle_ = handle.clone();
82         let jh = thread::spawn(move || {
83             let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
84             pin!(entry);
85 
86             let _ = entry
87                 .as_mut()
88                 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
89             let _ = entry
90                 .as_mut()
91                 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
92         });
93 
94         thread::yield_now();
95 
96         // advance 2s in the future.
97         handle.process_at_time(time_source.now() + 2_000_000_000);
98 
99         jh.join().unwrap();
100     })
101 }
102 
103 #[test]
change_waker()104 fn change_waker() {
105     model(|| {
106         let clock = crate::time::clock::Clock::new(true, false);
107         let time_source = super::ClockTime::new(clock.clone());
108 
109         let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
110         let handle = Handle::new(Arc::new(inner));
111 
112         let handle_ = handle.clone();
113         let jh = thread::spawn(move || {
114             let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1));
115             pin!(entry);
116 
117             let _ = entry
118                 .as_mut()
119                 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
120 
121             block_on(futures::future::poll_fn(|cx| {
122                 entry.as_mut().poll_elapsed(cx)
123             }))
124             .unwrap();
125         });
126 
127         thread::yield_now();
128 
129         // advance 2s
130         handle.process_at_time(time_source.now() + 2_000_000_000);
131 
132         jh.join().unwrap();
133     })
134 }
135 
136 #[test]
reset_future()137 fn reset_future() {
138     model(|| {
139         let finished_early = Arc::new(AtomicBool::new(false));
140 
141         let clock = crate::time::clock::Clock::new(true, false);
142         let time_source = super::ClockTime::new(clock.clone());
143 
144         let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
145         let handle = Handle::new(Arc::new(inner));
146 
147         let handle_ = handle.clone();
148         let finished_early_ = finished_early.clone();
149         let start = clock.now();
150 
151         let jh = thread::spawn(move || {
152             let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1));
153             pin!(entry);
154 
155             let _ = entry
156                 .as_mut()
157                 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
158 
159             entry.as_mut().reset(start + Duration::from_secs(2));
160 
161             // shouldn't complete before 2s
162             block_on(futures::future::poll_fn(|cx| {
163                 entry.as_mut().poll_elapsed(cx)
164             }))
165             .unwrap();
166 
167             finished_early_.store(true, Ordering::Relaxed);
168         });
169 
170         thread::yield_now();
171 
172         // This may or may not return a wakeup time.
173         handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(1500)));
174 
175         assert!(!finished_early.load(Ordering::Relaxed));
176 
177         handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(2500)));
178 
179         jh.join().unwrap();
180 
181         assert!(finished_early.load(Ordering::Relaxed));
182     })
183 }
184 
185 #[test]
186 #[cfg(not(loom))]
poll_process_levels()187 fn poll_process_levels() {
188     let clock = crate::time::clock::Clock::new(true, false);
189     clock.pause();
190 
191     let time_source = super::ClockTime::new(clock.clone());
192 
193     let inner = super::Inner::new(time_source, MockUnpark::mock());
194     let handle = Handle::new(Arc::new(inner));
195 
196     let mut entries = vec![];
197 
198     for i in 0..1024 {
199         let mut entry = Box::pin(TimerEntry::new(
200             &handle,
201             clock.now() + Duration::from_millis(i),
202         ));
203 
204         let _ = entry
205             .as_mut()
206             .poll_elapsed(&mut Context::from_waker(noop_waker_ref()));
207 
208         entries.push(entry);
209     }
210 
211     for t in 1..1024 {
212         handle.process_at_time(t as u64);
213         for (deadline, future) in entries.iter_mut().enumerate() {
214             let mut context = Context::from_waker(noop_waker_ref());
215             if deadline <= t {
216                 assert!(future.as_mut().poll_elapsed(&mut context).is_ready());
217             } else {
218                 assert!(future.as_mut().poll_elapsed(&mut context).is_pending());
219             }
220         }
221     }
222 }
223 
224 #[test]
225 #[cfg(not(loom))]
poll_process_levels_targeted()226 fn poll_process_levels_targeted() {
227     let mut context = Context::from_waker(noop_waker_ref());
228 
229     let clock = crate::time::clock::Clock::new(true, false);
230     clock.pause();
231 
232     let time_source = super::ClockTime::new(clock.clone());
233 
234     let inner = super::Inner::new(time_source, MockUnpark::mock());
235     let handle = Handle::new(Arc::new(inner));
236 
237     let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193));
238     pin!(e1);
239 
240     handle.process_at_time(62);
241     assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());
242     handle.process_at_time(192);
243     handle.process_at_time(192);
244 }
245 
246 /*
247 #[test]
248 fn balanced_incr_and_decr() {
249     const OPS: usize = 5;
250 
251     fn incr(inner: Arc<Inner>) {
252         for _ in 0..OPS {
253             inner.increment().expect("increment should not have failed");
254             thread::yield_now();
255         }
256     }
257 
258     fn decr(inner: Arc<Inner>) {
259         let mut ops_performed = 0;
260         while ops_performed < OPS {
261             if inner.num(Ordering::Relaxed) > 0 {
262                 ops_performed += 1;
263                 inner.decrement();
264             }
265             thread::yield_now();
266         }
267     }
268 
269     loom::model(|| {
270         let unpark = Box::new(MockUnpark);
271         let instant = Instant::now();
272 
273         let inner = Arc::new(Inner::new(instant, unpark));
274 
275         let incr_inner = inner.clone();
276         let decr_inner = inner.clone();
277 
278         let incr_hndle = thread::spawn(move || incr(incr_inner));
279         let decr_hndle = thread::spawn(move || decr(decr_inner));
280 
281         incr_hndle.join().expect("should never fail");
282         decr_hndle.join().expect("should never fail");
283 
284         assert_eq!(inner.num(Ordering::SeqCst), 0);
285     })
286 }
287 */
288