1 use crate::runtime::blocking::NoopSchedule;
2 use crate::runtime::task::{self, unowned, JoinHandle, OwnedTasks, Schedule, Task};
3 use crate::util::TryLock;
4 
5 use std::collections::VecDeque;
6 use std::future::Future;
7 use std::sync::atomic::{AtomicBool, Ordering};
8 use std::sync::Arc;
9 
10 struct AssertDropHandle {
11     is_dropped: Arc<AtomicBool>,
12 }
13 impl AssertDropHandle {
14     #[track_caller]
assert_dropped(&self)15     fn assert_dropped(&self) {
16         assert!(self.is_dropped.load(Ordering::SeqCst));
17     }
18 
19     #[track_caller]
assert_not_dropped(&self)20     fn assert_not_dropped(&self) {
21         assert!(!self.is_dropped.load(Ordering::SeqCst));
22     }
23 }
24 
25 struct AssertDrop {
26     is_dropped: Arc<AtomicBool>,
27 }
28 impl AssertDrop {
new() -> (Self, AssertDropHandle)29     fn new() -> (Self, AssertDropHandle) {
30         let shared = Arc::new(AtomicBool::new(false));
31         (
32             AssertDrop {
33                 is_dropped: shared.clone(),
34             },
35             AssertDropHandle {
36                 is_dropped: shared.clone(),
37             },
38         )
39     }
40 }
41 impl Drop for AssertDrop {
drop(&mut self)42     fn drop(&mut self) {
43         self.is_dropped.store(true, Ordering::SeqCst);
44     }
45 }
46 
47 // A Notified does not shut down on drop, but it is dropped once the ref-count
48 // hits zero.
49 #[test]
create_drop1()50 fn create_drop1() {
51     let (ad, handle) = AssertDrop::new();
52     let (notified, join) = unowned(
53         async {
54             drop(ad);
55             unreachable!()
56         },
57         NoopSchedule,
58     );
59     drop(notified);
60     handle.assert_not_dropped();
61     drop(join);
62     handle.assert_dropped();
63 }
64 
65 #[test]
create_drop2()66 fn create_drop2() {
67     let (ad, handle) = AssertDrop::new();
68     let (notified, join) = unowned(
69         async {
70             drop(ad);
71             unreachable!()
72         },
73         NoopSchedule,
74     );
75     drop(join);
76     handle.assert_not_dropped();
77     drop(notified);
78     handle.assert_dropped();
79 }
80 
81 // Shutting down through Notified works
82 #[test]
create_shutdown1()83 fn create_shutdown1() {
84     let (ad, handle) = AssertDrop::new();
85     let (notified, join) = unowned(
86         async {
87             drop(ad);
88             unreachable!()
89         },
90         NoopSchedule,
91     );
92     drop(join);
93     handle.assert_not_dropped();
94     notified.shutdown();
95     handle.assert_dropped();
96 }
97 
98 #[test]
create_shutdown2()99 fn create_shutdown2() {
100     let (ad, handle) = AssertDrop::new();
101     let (notified, join) = unowned(
102         async {
103             drop(ad);
104             unreachable!()
105         },
106         NoopSchedule,
107     );
108     handle.assert_not_dropped();
109     notified.shutdown();
110     handle.assert_dropped();
111     drop(join);
112 }
113 
114 #[test]
unowned_poll()115 fn unowned_poll() {
116     let (task, _) = unowned(async {}, NoopSchedule);
117     task.run();
118 }
119 
120 #[test]
schedule()121 fn schedule() {
122     with(|rt| {
123         rt.spawn(async {
124             crate::task::yield_now().await;
125         });
126 
127         assert_eq!(2, rt.tick());
128         rt.shutdown();
129     })
130 }
131 
132 #[test]
shutdown()133 fn shutdown() {
134     with(|rt| {
135         rt.spawn(async {
136             loop {
137                 crate::task::yield_now().await;
138             }
139         });
140 
141         rt.tick_max(1);
142 
143         rt.shutdown();
144     })
145 }
146 
147 #[test]
shutdown_immediately()148 fn shutdown_immediately() {
149     with(|rt| {
150         rt.spawn(async {
151             loop {
152                 crate::task::yield_now().await;
153             }
154         });
155 
156         rt.shutdown();
157     })
158 }
159 
160 #[test]
spawn_during_shutdown()161 fn spawn_during_shutdown() {
162     static DID_SPAWN: AtomicBool = AtomicBool::new(false);
163 
164     struct SpawnOnDrop(Runtime);
165     impl Drop for SpawnOnDrop {
166         fn drop(&mut self) {
167             DID_SPAWN.store(true, Ordering::SeqCst);
168             self.0.spawn(async {});
169         }
170     }
171 
172     with(|rt| {
173         let rt2 = rt.clone();
174         rt.spawn(async move {
175             let _spawn_on_drop = SpawnOnDrop(rt2);
176 
177             loop {
178                 crate::task::yield_now().await;
179             }
180         });
181 
182         rt.tick_max(1);
183         rt.shutdown();
184     });
185 
186     assert!(DID_SPAWN.load(Ordering::SeqCst));
187 }
188 
with(f: impl FnOnce(Runtime))189 fn with(f: impl FnOnce(Runtime)) {
190     struct Reset;
191 
192     impl Drop for Reset {
193         fn drop(&mut self) {
194             let _rt = CURRENT.try_lock().unwrap().take();
195         }
196     }
197 
198     let _reset = Reset;
199 
200     let rt = Runtime(Arc::new(Inner {
201         owned: OwnedTasks::new(),
202         core: TryLock::new(Core {
203             queue: VecDeque::new(),
204         }),
205     }));
206 
207     *CURRENT.try_lock().unwrap() = Some(rt.clone());
208     f(rt)
209 }
210 
211 #[derive(Clone)]
212 struct Runtime(Arc<Inner>);
213 
214 struct Inner {
215     core: TryLock<Core>,
216     owned: OwnedTasks<Runtime>,
217 }
218 
219 struct Core {
220     queue: VecDeque<task::Notified<Runtime>>,
221 }
222 
223 static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
224 
225 impl Runtime {
spawn<T>(&self, future: T) -> JoinHandle<T::Output> where T: 'static + Send + Future, T::Output: 'static + Send,226     fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
227     where
228         T: 'static + Send + Future,
229         T::Output: 'static + Send,
230     {
231         let (handle, notified) = self.0.owned.bind(future, self.clone());
232 
233         if let Some(notified) = notified {
234             self.schedule(notified);
235         }
236 
237         handle
238     }
239 
tick(&self) -> usize240     fn tick(&self) -> usize {
241         self.tick_max(usize::MAX)
242     }
243 
tick_max(&self, max: usize) -> usize244     fn tick_max(&self, max: usize) -> usize {
245         let mut n = 0;
246 
247         while !self.is_empty() && n < max {
248             let task = self.next_task();
249             n += 1;
250             let task = self.0.owned.assert_owner(task);
251             task.run();
252         }
253 
254         n
255     }
256 
is_empty(&self) -> bool257     fn is_empty(&self) -> bool {
258         self.0.core.try_lock().unwrap().queue.is_empty()
259     }
260 
next_task(&self) -> task::Notified<Runtime>261     fn next_task(&self) -> task::Notified<Runtime> {
262         self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
263     }
264 
shutdown(&self)265     fn shutdown(&self) {
266         let mut core = self.0.core.try_lock().unwrap();
267 
268         self.0.owned.close_and_shutdown_all();
269 
270         while let Some(task) = core.queue.pop_back() {
271             drop(task);
272         }
273 
274         drop(core);
275 
276         assert!(self.0.owned.is_empty());
277     }
278 }
279 
280 impl Schedule for Runtime {
release(&self, task: &Task<Self>) -> Option<Task<Self>>281     fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
282         self.0.owned.remove(task)
283     }
284 
schedule(&self, task: task::Notified<Self>)285     fn schedule(&self, task: task::Notified<Self>) {
286         self.0.core.try_lock().unwrap().queue.push_back(task);
287     }
288 }
289