1 use std::future::Future;
2 use std::pin::Pin;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::task::{Context, Poll};
5 
6 use async_task::Runnable;
7 use smol::future;
8 
9 // Creates a future with event counters.
10 //
11 // Usage: `future!(f, POLL, DROP)`
12 //
13 // The future `f` always returns `Poll::Ready`.
14 // When it gets polled, `POLL` is incremented.
15 // When it gets dropped, `DROP` is incremented.
16 macro_rules! future {
17     ($name:pat, $poll:ident, $drop:ident) => {
18         static $poll: AtomicUsize = AtomicUsize::new(0);
19         static $drop: AtomicUsize = AtomicUsize::new(0);
20 
21         let $name = {
22             struct Fut(Box<i32>);
23 
24             impl Future for Fut {
25                 type Output = Box<i32>;
26 
27                 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
28                     $poll.fetch_add(1, Ordering::SeqCst);
29                     Poll::Ready(Box::new(0))
30                 }
31             }
32 
33             impl Drop for Fut {
34                 fn drop(&mut self) {
35                     $drop.fetch_add(1, Ordering::SeqCst);
36                 }
37             }
38 
39             Fut(Box::new(0))
40         };
41     };
42 }
43 
44 // Creates a schedule function with event counters.
45 //
46 // Usage: `schedule!(s, SCHED, DROP)`
47 //
48 // The schedule function `s` does nothing.
49 // When it gets invoked, `SCHED` is incremented.
50 // When it gets dropped, `DROP` is incremented.
51 macro_rules! schedule {
52     ($name:pat, $sched:ident, $drop:ident) => {
53         static $drop: AtomicUsize = AtomicUsize::new(0);
54         static $sched: AtomicUsize = AtomicUsize::new(0);
55 
56         let $name = {
57             struct Guard(Box<i32>);
58 
59             impl Drop for Guard {
60                 fn drop(&mut self) {
61                     $drop.fetch_add(1, Ordering::SeqCst);
62                 }
63             }
64 
65             let guard = Guard(Box::new(0));
66             move |_runnable| {
67                 &guard;
68                 $sched.fetch_add(1, Ordering::SeqCst);
69             }
70         };
71     };
72 }
73 
74 fn try_await<T>(f: impl Future<Output = T>) -> Option<T> {
75     future::block_on(future::poll_once(f))
76 }
77 
78 #[test]
79 fn drop_and_detach() {
80     future!(f, POLL, DROP_F);
81     schedule!(s, SCHEDULE, DROP_S);
82     let (runnable, task) = async_task::spawn(f, s);
83 
84     assert_eq!(POLL.load(Ordering::SeqCst), 0);
85     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
86     assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
87     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
88 
89     drop(runnable);
90     assert_eq!(POLL.load(Ordering::SeqCst), 0);
91     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
92     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
93     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
94 
95     task.detach();
96     assert_eq!(POLL.load(Ordering::SeqCst), 0);
97     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
98     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
99     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
100 }
101 
102 #[test]
103 fn detach_and_drop() {
104     future!(f, POLL, DROP_F);
105     schedule!(s, SCHEDULE, DROP_S);
106     let (runnable, task) = async_task::spawn(f, s);
107 
108     task.detach();
109     assert_eq!(POLL.load(Ordering::SeqCst), 0);
110     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
111     assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
112     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
113 
114     drop(runnable);
115     assert_eq!(POLL.load(Ordering::SeqCst), 0);
116     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
117     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
118     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
119 }
120 
121 #[test]
122 fn detach_and_run() {
123     future!(f, POLL, DROP_F);
124     schedule!(s, SCHEDULE, DROP_S);
125     let (runnable, task) = async_task::spawn(f, s);
126 
127     task.detach();
128     assert_eq!(POLL.load(Ordering::SeqCst), 0);
129     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
130     assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
131     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
132 
133     runnable.run();
134     assert_eq!(POLL.load(Ordering::SeqCst), 1);
135     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
136     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
137     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
138 }
139 
140 #[test]
141 fn run_and_detach() {
142     future!(f, POLL, DROP_F);
143     schedule!(s, SCHEDULE, DROP_S);
144     let (runnable, task) = async_task::spawn(f, s);
145 
146     runnable.run();
147     assert_eq!(POLL.load(Ordering::SeqCst), 1);
148     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
149     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
150     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
151 
152     task.detach();
153     assert_eq!(POLL.load(Ordering::SeqCst), 1);
154     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
155     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
156     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
157 }
158 
159 #[test]
160 fn cancel_and_run() {
161     future!(f, POLL, DROP_F);
162     schedule!(s, SCHEDULE, DROP_S);
163     let (runnable, task) = async_task::spawn(f, s);
164 
165     drop(task);
166     assert_eq!(POLL.load(Ordering::SeqCst), 0);
167     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
168     assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
169     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
170 
171     runnable.run();
172     assert_eq!(POLL.load(Ordering::SeqCst), 0);
173     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
174     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
175     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
176 }
177 
178 #[test]
179 fn run_and_cancel() {
180     future!(f, POLL, DROP_F);
181     schedule!(s, SCHEDULE, DROP_S);
182     let (runnable, task) = async_task::spawn(f, s);
183 
184     runnable.run();
185     assert_eq!(POLL.load(Ordering::SeqCst), 1);
186     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
187     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
188     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
189 
190     drop(task);
191     assert_eq!(POLL.load(Ordering::SeqCst), 1);
192     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
193     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
194     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
195 }
196 
197 #[test]
198 fn cancel_join() {
199     future!(f, POLL, DROP_F);
200     schedule!(s, SCHEDULE, DROP_S);
201     let (runnable, mut task) = async_task::spawn(f, s);
202 
203     assert!(try_await(&mut task).is_none());
204     assert_eq!(POLL.load(Ordering::SeqCst), 0);
205     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
206     assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
207     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
208 
209     runnable.run();
210     assert_eq!(POLL.load(Ordering::SeqCst), 1);
211     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
212     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
213     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
214 
215     assert!(try_await(&mut task).is_some());
216     assert_eq!(POLL.load(Ordering::SeqCst), 1);
217     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
218     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
219     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
220 
221     drop(task);
222     assert_eq!(POLL.load(Ordering::SeqCst), 1);
223     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
224     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
225     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
226 }
227 
228 #[test]
229 fn schedule() {
230     let (s, r) = flume::unbounded();
231     let schedule = move |runnable| s.send(runnable).unwrap();
232     let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule);
233 
234     assert!(r.is_empty());
235     runnable.schedule();
236 
237     let runnable = r.recv().unwrap();
238     assert!(r.is_empty());
239     runnable.schedule();
240 
241     let runnable = r.recv().unwrap();
242     assert!(r.is_empty());
243     runnable.schedule();
244 
245     r.recv().unwrap();
246 }
247 
248 #[test]
249 fn schedule_counter() {
250     static COUNT: AtomicUsize = AtomicUsize::new(0);
251 
252     let (s, r) = flume::unbounded();
253     let schedule = move |runnable: Runnable| {
254         COUNT.fetch_add(1, Ordering::SeqCst);
255         s.send(runnable).unwrap();
256     };
257     let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule);
258     runnable.schedule();
259 
260     r.recv().unwrap().schedule();
261     r.recv().unwrap().schedule();
262     assert_eq!(COUNT.load(Ordering::SeqCst), 3);
263     r.recv().unwrap();
264 }
265 
266 #[test]
267 fn drop_inside_schedule() {
268     struct DropGuard(AtomicUsize);
269     impl Drop for DropGuard {
270         fn drop(&mut self) {
271             self.0.fetch_add(1, Ordering::SeqCst);
272         }
273     }
274     let guard = DropGuard(AtomicUsize::new(0));
275 
276     let (runnable, _) = async_task::spawn(async {}, move |runnable| {
277         assert_eq!(guard.0.load(Ordering::SeqCst), 0);
278         drop(runnable);
279         assert_eq!(guard.0.load(Ordering::SeqCst), 0);
280     });
281     runnable.schedule();
282 }
283 
284 #[test]
285 fn waker() {
286     let (s, r) = flume::unbounded();
287     let schedule = move |runnable| s.send(runnable).unwrap();
288     let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule);
289 
290     assert!(r.is_empty());
291     let waker = runnable.waker();
292     runnable.run();
293     waker.wake_by_ref();
294 
295     let runnable = r.recv().unwrap();
296     runnable.run();
297     waker.wake();
298     r.recv().unwrap();
299 }
300