1 use std::future::Future; 2 use std::pin::Pin; 3 use std::sync::atomic::{AtomicUsize, Ordering}; 4 use std::task::{Context, Poll}; 5 6 use async_task::Runnable; 7 use smol::future; 8 9 // Creates a future with event counters. 10 // 11 // Usage: `future!(f, POLL, DROP)` 12 // 13 // The future `f` always returns `Poll::Ready`. 14 // When it gets polled, `POLL` is incremented. 15 // When it gets dropped, `DROP` is incremented. 16 macro_rules! future { 17 ($name:pat, $poll:ident, $drop:ident) => { 18 static $poll: AtomicUsize = AtomicUsize::new(0); 19 static $drop: AtomicUsize = AtomicUsize::new(0); 20 21 let $name = { 22 struct Fut(Box<i32>); 23 24 impl Future for Fut { 25 type Output = Box<i32>; 26 27 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { 28 $poll.fetch_add(1, Ordering::SeqCst); 29 Poll::Ready(Box::new(0)) 30 } 31 } 32 33 impl Drop for Fut { 34 fn drop(&mut self) { 35 $drop.fetch_add(1, Ordering::SeqCst); 36 } 37 } 38 39 Fut(Box::new(0)) 40 }; 41 }; 42 } 43 44 // Creates a schedule function with event counters. 45 // 46 // Usage: `schedule!(s, SCHED, DROP)` 47 // 48 // The schedule function `s` does nothing. 49 // When it gets invoked, `SCHED` is incremented. 50 // When it gets dropped, `DROP` is incremented. 51 macro_rules! schedule { 52 ($name:pat, $sched:ident, $drop:ident) => { 53 static $drop: AtomicUsize = AtomicUsize::new(0); 54 static $sched: AtomicUsize = AtomicUsize::new(0); 55 56 let $name = { 57 struct Guard(Box<i32>); 58 59 impl Drop for Guard { 60 fn drop(&mut self) { 61 $drop.fetch_add(1, Ordering::SeqCst); 62 } 63 } 64 65 let guard = Guard(Box::new(0)); 66 move |_runnable| { 67 &guard; 68 $sched.fetch_add(1, Ordering::SeqCst); 69 } 70 }; 71 }; 72 } 73 74 fn try_await<T>(f: impl Future<Output = T>) -> Option<T> { 75 future::block_on(future::poll_once(f)) 76 } 77 78 #[test] 79 fn drop_and_detach() { 80 future!(f, POLL, DROP_F); 81 schedule!(s, SCHEDULE, DROP_S); 82 let (runnable, task) = async_task::spawn(f, s); 83 84 assert_eq!(POLL.load(Ordering::SeqCst), 0); 85 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 86 assert_eq!(DROP_F.load(Ordering::SeqCst), 0); 87 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 88 89 drop(runnable); 90 assert_eq!(POLL.load(Ordering::SeqCst), 0); 91 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 92 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 93 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 94 95 task.detach(); 96 assert_eq!(POLL.load(Ordering::SeqCst), 0); 97 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 98 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 99 assert_eq!(DROP_S.load(Ordering::SeqCst), 1); 100 } 101 102 #[test] 103 fn detach_and_drop() { 104 future!(f, POLL, DROP_F); 105 schedule!(s, SCHEDULE, DROP_S); 106 let (runnable, task) = async_task::spawn(f, s); 107 108 task.detach(); 109 assert_eq!(POLL.load(Ordering::SeqCst), 0); 110 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 111 assert_eq!(DROP_F.load(Ordering::SeqCst), 0); 112 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 113 114 drop(runnable); 115 assert_eq!(POLL.load(Ordering::SeqCst), 0); 116 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 117 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 118 assert_eq!(DROP_S.load(Ordering::SeqCst), 1); 119 } 120 121 #[test] 122 fn detach_and_run() { 123 future!(f, POLL, DROP_F); 124 schedule!(s, SCHEDULE, DROP_S); 125 let (runnable, task) = async_task::spawn(f, s); 126 127 task.detach(); 128 assert_eq!(POLL.load(Ordering::SeqCst), 0); 129 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 130 assert_eq!(DROP_F.load(Ordering::SeqCst), 0); 131 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 132 133 runnable.run(); 134 assert_eq!(POLL.load(Ordering::SeqCst), 1); 135 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 136 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 137 assert_eq!(DROP_S.load(Ordering::SeqCst), 1); 138 } 139 140 #[test] 141 fn run_and_detach() { 142 future!(f, POLL, DROP_F); 143 schedule!(s, SCHEDULE, DROP_S); 144 let (runnable, task) = async_task::spawn(f, s); 145 146 runnable.run(); 147 assert_eq!(POLL.load(Ordering::SeqCst), 1); 148 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 149 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 150 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 151 152 task.detach(); 153 assert_eq!(POLL.load(Ordering::SeqCst), 1); 154 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 155 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 156 assert_eq!(DROP_S.load(Ordering::SeqCst), 1); 157 } 158 159 #[test] 160 fn cancel_and_run() { 161 future!(f, POLL, DROP_F); 162 schedule!(s, SCHEDULE, DROP_S); 163 let (runnable, task) = async_task::spawn(f, s); 164 165 drop(task); 166 assert_eq!(POLL.load(Ordering::SeqCst), 0); 167 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 168 assert_eq!(DROP_F.load(Ordering::SeqCst), 0); 169 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 170 171 runnable.run(); 172 assert_eq!(POLL.load(Ordering::SeqCst), 0); 173 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 174 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 175 assert_eq!(DROP_S.load(Ordering::SeqCst), 1); 176 } 177 178 #[test] 179 fn run_and_cancel() { 180 future!(f, POLL, DROP_F); 181 schedule!(s, SCHEDULE, DROP_S); 182 let (runnable, task) = async_task::spawn(f, s); 183 184 runnable.run(); 185 assert_eq!(POLL.load(Ordering::SeqCst), 1); 186 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 187 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 188 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 189 190 drop(task); 191 assert_eq!(POLL.load(Ordering::SeqCst), 1); 192 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 193 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 194 assert_eq!(DROP_S.load(Ordering::SeqCst), 1); 195 } 196 197 #[test] 198 fn cancel_join() { 199 future!(f, POLL, DROP_F); 200 schedule!(s, SCHEDULE, DROP_S); 201 let (runnable, mut task) = async_task::spawn(f, s); 202 203 assert!(try_await(&mut task).is_none()); 204 assert_eq!(POLL.load(Ordering::SeqCst), 0); 205 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 206 assert_eq!(DROP_F.load(Ordering::SeqCst), 0); 207 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 208 209 runnable.run(); 210 assert_eq!(POLL.load(Ordering::SeqCst), 1); 211 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 212 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 213 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 214 215 assert!(try_await(&mut task).is_some()); 216 assert_eq!(POLL.load(Ordering::SeqCst), 1); 217 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 218 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 219 assert_eq!(DROP_S.load(Ordering::SeqCst), 0); 220 221 drop(task); 222 assert_eq!(POLL.load(Ordering::SeqCst), 1); 223 assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); 224 assert_eq!(DROP_F.load(Ordering::SeqCst), 1); 225 assert_eq!(DROP_S.load(Ordering::SeqCst), 1); 226 } 227 228 #[test] 229 fn schedule() { 230 let (s, r) = flume::unbounded(); 231 let schedule = move |runnable| s.send(runnable).unwrap(); 232 let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule); 233 234 assert!(r.is_empty()); 235 runnable.schedule(); 236 237 let runnable = r.recv().unwrap(); 238 assert!(r.is_empty()); 239 runnable.schedule(); 240 241 let runnable = r.recv().unwrap(); 242 assert!(r.is_empty()); 243 runnable.schedule(); 244 245 r.recv().unwrap(); 246 } 247 248 #[test] 249 fn schedule_counter() { 250 static COUNT: AtomicUsize = AtomicUsize::new(0); 251 252 let (s, r) = flume::unbounded(); 253 let schedule = move |runnable: Runnable| { 254 COUNT.fetch_add(1, Ordering::SeqCst); 255 s.send(runnable).unwrap(); 256 }; 257 let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule); 258 runnable.schedule(); 259 260 r.recv().unwrap().schedule(); 261 r.recv().unwrap().schedule(); 262 assert_eq!(COUNT.load(Ordering::SeqCst), 3); 263 r.recv().unwrap(); 264 } 265 266 #[test] 267 fn drop_inside_schedule() { 268 struct DropGuard(AtomicUsize); 269 impl Drop for DropGuard { 270 fn drop(&mut self) { 271 self.0.fetch_add(1, Ordering::SeqCst); 272 } 273 } 274 let guard = DropGuard(AtomicUsize::new(0)); 275 276 let (runnable, _) = async_task::spawn(async {}, move |runnable| { 277 assert_eq!(guard.0.load(Ordering::SeqCst), 0); 278 drop(runnable); 279 assert_eq!(guard.0.load(Ordering::SeqCst), 0); 280 }); 281 runnable.schedule(); 282 } 283 284 #[test] 285 fn waker() { 286 let (s, r) = flume::unbounded(); 287 let schedule = move |runnable| s.send(runnable).unwrap(); 288 let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule); 289 290 assert!(r.is_empty()); 291 let waker = runnable.waker(); 292 runnable.run(); 293 waker.wake_by_ref(); 294 295 let runnable = r.recv().unwrap(); 296 runnable.run(); 297 waker.wake(); 298 r.recv().unwrap(); 299 } 300