1 use std::cell::Cell;
2 use std::future::Future;
3 use std::panic::{catch_unwind, AssertUnwindSafe};
4 use std::pin::Pin;
5 use std::sync::atomic::{AtomicUsize, Ordering};
6 use std::task::{Context, Poll};
7 use std::thread;
8 use std::time::Duration;
9 
10 use async_task::Runnable;
11 use atomic_waker::AtomicWaker;
12 use easy_parallel::Parallel;
13 use smol::future;
14 
15 // Creates a future with event counters.
16 //
17 // Usage: `future!(f, get_waker, POLL, DROP)`
18 //
// The future `f` always sleeps for 400 ms, and panics the second time it is polled.
20 // When it gets polled, `POLL` is incremented.
21 // When it gets dropped, `DROP` is incremented.
22 //
23 // Every time the future is run, it stores the waker into a global variable.
24 // This waker can be extracted using the `get_waker()` function.
macro_rules! future {
    ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
        // Event counters named by the caller, plus the slot that holds the waker
        // registered by the most recent poll.
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static WAKER: AtomicWaker = AtomicWaker::new();

        let ($name, $get_waker) = {
            // `polled` remembers whether the first poll has already happened.
            // `_heap` is never read; it only gives the future an owned allocation.
            struct Fut {
                polled: Cell<bool>,
                _heap: Box<i32>,
            }

            impl Future for Fut {
                type Output = ();

                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                    WAKER.register(cx.waker());
                    $poll.fetch_add(1, Ordering::SeqCst);
                    // Stay inside `poll` long enough for another thread to act
                    // on the task while it is still running.
                    thread::sleep(ms(400));

                    // `replace` returns the previous flag: `false` on the first
                    // poll, `true` on every poll after that.
                    if self.polled.replace(true) {
                        panic!()
                    }
                    Poll::Pending
                }
            }

            impl Drop for Fut {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let fut = Fut {
                polled: Cell::new(false),
                _heap: Box::new(0),
            };
            // Panics (via `unwrap`) if no waker is currently stored.
            let take_waker = || WAKER.take().unwrap();
            (fut, take_waker)
        };
    };
}
61 
62 // Creates a schedule function with event counters.
63 //
64 // Usage: `schedule!(s, chan, SCHED, DROP)`
65 //
66 // The schedule function `s` pushes the task into `chan`.
67 // When it gets invoked, `SCHED` is incremented.
68 // When it gets dropped, `DROP` is incremented.
69 //
70 // Receiver `chan` extracts the task when it is scheduled.
macro_rules! schedule {
    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
        // Event counters named by the caller.
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $sched: AtomicUsize = AtomicUsize::new(0);

        let ($name, $chan) = {
            let (s, r) = flume::unbounded();

            // Bumps the drop counter when the schedule closure that owns it is
            // destroyed.
            struct Guard(Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let guard = Guard(Box::new(0));
            let sched = move |runnable: Runnable| {
                // Mentioning `guard` makes the `move` closure take ownership of
                // it, so the guard is dropped together with the closure. Binding
                // the borrow to `_` avoids the "unused borrow" warning that a
                // bare `&guard;` statement produces.
                let _ = &guard;
                $sched.fetch_add(1, Ordering::SeqCst);
                s.send(runnable).unwrap();
            };

            (sched, r)
        };
    };
}
98 
/// Shorthand for a [`Duration`] lasting `ms` milliseconds.
fn ms(ms: u64) -> Duration {
    // Whole seconds plus the leftover milliseconds expressed in nanoseconds.
    // The remainder is always below 1000, so the cast and the multiplication
    // cannot overflow a `u32`.
    Duration::new(ms / 1_000, (ms % 1_000) as u32 * 1_000_000)
}
102 
detach_and_drop()103 fn try_await<T>(f: impl Future<Output = T>) -> Option<T> {
104     future::block_on(future::poll_once(f))
105 }
106 
#[test]
fn wake_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (first_run, task) = async_task::spawn(f, s);

    // First poll returns `Pending`; waking afterwards schedules the task again.
    first_run.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let second_run = chan.recv().unwrap();

    // Snapshot of (POLL, SCHEDULE, DROP_F, DROP_S, runnables waiting in `chan`).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // Thread 1: the second poll sleeps 400 ms and then panics out of `run`.
    let runner = || {
        assert!(catch_unwind(|| second_run.run()).is_err());
        drop(get_waker());
        assert_eq!(counts(), (2, 1, 1, 1, 0));
    };

    // Thread 2: wake and detach while thread 1 is still inside the second poll.
    let interferer = || {
        thread::sleep(ms(200));

        waker.wake();
        task.detach();
        // Mid-poll: no new schedule, nothing dropped yet.
        assert_eq!(counts(), (2, 1, 0, 0, 0));

        thread::sleep(ms(400));

        // After the panic: the future and the schedule function were each
        // dropped once, and the task was not queued again.
        assert_eq!(counts(), (2, 1, 1, 1, 0));
    };

    Parallel::new().add(runner).add(interferer).run();
}
149 
#[test]
fn cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (first_run, task) = async_task::spawn(f, s);

    // First poll returns `Pending`; consuming the waker schedules the task again.
    first_run.run();
    let waker = get_waker();
    waker.wake();
    let second_run = chan.recv().unwrap();

    // Snapshot of (POLL, SCHEDULE, DROP_F, DROP_S, runnables waiting in `chan`).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // Thread 1: the second poll sleeps 400 ms and then panics out of `run`.
    let runner = || {
        assert!(catch_unwind(|| second_run.run()).is_err());
        drop(get_waker());
        assert_eq!(counts(), (2, 1, 1, 1, 0));
    };

    // Thread 2: drop the `Task` handle while thread 1 is still inside the poll.
    let canceller = || {
        thread::sleep(ms(200));

        drop(task);
        // Mid-poll: no new schedule, nothing dropped yet.
        assert_eq!(counts(), (2, 1, 0, 0, 0));

        thread::sleep(ms(400));

        // After the panic: the future and the schedule function were each
        // dropped once, and the task was not queued again.
        assert_eq!(counts(), (2, 1, 1, 1, 0));
    };

    Parallel::new().add(runner).add(canceller).run();
}
191 
#[test]
fn wake_and_cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (first_run, task) = async_task::spawn(f, s);

    // First poll returns `Pending`; waking afterwards schedules the task again.
    first_run.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let second_run = chan.recv().unwrap();

    // Snapshot of (POLL, SCHEDULE, DROP_F, DROP_S, runnables waiting in `chan`).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // Thread 1: the second poll sleeps 400 ms and then panics out of `run`.
    let runner = || {
        assert!(catch_unwind(|| second_run.run()).is_err());
        drop(get_waker());
        assert_eq!(counts(), (2, 1, 1, 1, 0));
    };

    // Thread 2: wake first, then drop the `Task` handle, both while thread 1
    // is still inside the second poll.
    let interferer = || {
        thread::sleep(ms(200));

        waker.wake();
        // Mid-poll wake: no new schedule, nothing dropped yet.
        assert_eq!(counts(), (2, 1, 0, 0, 0));

        drop(task);
        // Dropping the handle mid-poll changes none of the counters either.
        assert_eq!(counts(), (2, 1, 0, 0, 0));

        thread::sleep(ms(400));

        // After the panic: the future and the schedule function were each
        // dropped once, and the task was not queued again.
        assert_eq!(counts(), (2, 1, 1, 1, 0));
    };

    Parallel::new().add(runner).add(interferer).run();
}
240 
#[test]
fn cancel_and_wake_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (first_run, task) = async_task::spawn(f, s);

    // First poll returns `Pending`; waking afterwards schedules the task again.
    first_run.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let second_run = chan.recv().unwrap();

    // Snapshot of (POLL, SCHEDULE, DROP_F, DROP_S, runnables waiting in `chan`).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    // Thread 1: the second poll sleeps 400 ms and then panics out of `run`.
    let runner = || {
        assert!(catch_unwind(|| second_run.run()).is_err());
        drop(get_waker());
        assert_eq!(counts(), (2, 1, 1, 1, 0));
    };

    // Thread 2: drop the `Task` handle first, then wake, both while thread 1
    // is still inside the second poll.
    let interferer = || {
        thread::sleep(ms(200));

        drop(task);
        // Dropping the handle mid-poll: no new schedule, nothing dropped yet.
        assert_eq!(counts(), (2, 1, 0, 0, 0));

        waker.wake();
        // Waking afterwards changes none of the counters either.
        assert_eq!(counts(), (2, 1, 0, 0, 0));

        thread::sleep(ms(400));

        // After the panic: the future and the schedule function were each
        // dropped once, and the task was not queued again.
        assert_eq!(counts(), (2, 1, 1, 1, 0));
    };

    Parallel::new().add(runner).add(interferer).run();
}
289 
#[test]
fn panic_and_poll() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (first_run, mut task) = async_task::spawn(f, s);

    // Snapshot of (POLL, SCHEDULE, DROP_F, DROP_S).
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // First poll returns `Pending`; the wake schedules the task a second time.
    first_run.run();
    get_waker().wake();
    assert_eq!(counts(), (1, 1, 0, 0));

    // The task has produced no output yet, so a single poll yields nothing.
    assert!(try_await(&mut task).is_none());

    // The second poll panics; the future is dropped but the schedule function
    // is still alive.
    let second_run = chan.recv().unwrap();
    assert!(catch_unwind(|| second_run.run()).is_err());
    assert_eq!(counts(), (2, 1, 1, 0));

    // Polling the handle after the future panicked panics in turn, without
    // touching any counter.
    assert!(catch_unwind(AssertUnwindSafe(|| try_await(&mut task))).is_err());
    assert_eq!(counts(), (2, 1, 1, 0));

    // Once the last waker and the handle are gone, the schedule function is
    // dropped as well.
    drop(get_waker());
    drop(task);
    assert_eq!(counts(), (2, 1, 1, 1));
}
326