1 use crate::runtime::blocking::NoopSchedule;
2 use crate::runtime::task::{self, unowned, JoinHandle, OwnedTasks, Schedule, Task};
3 use crate::util::TryLock;
4
5 use std::collections::VecDeque;
6 use std::future::Future;
7 use std::sync::atomic::{AtomicBool, Ordering};
8 use std::sync::Arc;
9
10 struct AssertDropHandle {
11 is_dropped: Arc<AtomicBool>,
12 }
13 impl AssertDropHandle {
14 #[track_caller]
assert_dropped(&self)15 fn assert_dropped(&self) {
16 assert!(self.is_dropped.load(Ordering::SeqCst));
17 }
18
19 #[track_caller]
assert_not_dropped(&self)20 fn assert_not_dropped(&self) {
21 assert!(!self.is_dropped.load(Ordering::SeqCst));
22 }
23 }
24
25 struct AssertDrop {
26 is_dropped: Arc<AtomicBool>,
27 }
28 impl AssertDrop {
new() -> (Self, AssertDropHandle)29 fn new() -> (Self, AssertDropHandle) {
30 let shared = Arc::new(AtomicBool::new(false));
31 (
32 AssertDrop {
33 is_dropped: shared.clone(),
34 },
35 AssertDropHandle {
36 is_dropped: shared.clone(),
37 },
38 )
39 }
40 }
41 impl Drop for AssertDrop {
drop(&mut self)42 fn drop(&mut self) {
43 self.is_dropped.store(true, Ordering::SeqCst);
44 }
45 }
46
47 // A Notified does not shut down on drop, but it is dropped once the ref-count
48 // hits zero.
49 #[test]
create_drop1()50 fn create_drop1() {
51 let (ad, handle) = AssertDrop::new();
52 let (notified, join) = unowned(
53 async {
54 drop(ad);
55 unreachable!()
56 },
57 NoopSchedule,
58 );
59 drop(notified);
60 handle.assert_not_dropped();
61 drop(join);
62 handle.assert_dropped();
63 }
64
65 #[test]
create_drop2()66 fn create_drop2() {
67 let (ad, handle) = AssertDrop::new();
68 let (notified, join) = unowned(
69 async {
70 drop(ad);
71 unreachable!()
72 },
73 NoopSchedule,
74 );
75 drop(join);
76 handle.assert_not_dropped();
77 drop(notified);
78 handle.assert_dropped();
79 }
80
81 // Shutting down through Notified works
82 #[test]
create_shutdown1()83 fn create_shutdown1() {
84 let (ad, handle) = AssertDrop::new();
85 let (notified, join) = unowned(
86 async {
87 drop(ad);
88 unreachable!()
89 },
90 NoopSchedule,
91 );
92 drop(join);
93 handle.assert_not_dropped();
94 notified.shutdown();
95 handle.assert_dropped();
96 }
97
98 #[test]
create_shutdown2()99 fn create_shutdown2() {
100 let (ad, handle) = AssertDrop::new();
101 let (notified, join) = unowned(
102 async {
103 drop(ad);
104 unreachable!()
105 },
106 NoopSchedule,
107 );
108 handle.assert_not_dropped();
109 notified.shutdown();
110 handle.assert_dropped();
111 drop(join);
112 }
113
114 #[test]
unowned_poll()115 fn unowned_poll() {
116 let (task, _) = unowned(async {}, NoopSchedule);
117 task.run();
118 }
119
120 #[test]
schedule()121 fn schedule() {
122 with(|rt| {
123 rt.spawn(async {
124 crate::task::yield_now().await;
125 });
126
127 assert_eq!(2, rt.tick());
128 rt.shutdown();
129 })
130 }
131
132 #[test]
shutdown()133 fn shutdown() {
134 with(|rt| {
135 rt.spawn(async {
136 loop {
137 crate::task::yield_now().await;
138 }
139 });
140
141 rt.tick_max(1);
142
143 rt.shutdown();
144 })
145 }
146
147 #[test]
shutdown_immediately()148 fn shutdown_immediately() {
149 with(|rt| {
150 rt.spawn(async {
151 loop {
152 crate::task::yield_now().await;
153 }
154 });
155
156 rt.shutdown();
157 })
158 }
159
160 #[test]
spawn_during_shutdown()161 fn spawn_during_shutdown() {
162 static DID_SPAWN: AtomicBool = AtomicBool::new(false);
163
164 struct SpawnOnDrop(Runtime);
165 impl Drop for SpawnOnDrop {
166 fn drop(&mut self) {
167 DID_SPAWN.store(true, Ordering::SeqCst);
168 self.0.spawn(async {});
169 }
170 }
171
172 with(|rt| {
173 let rt2 = rt.clone();
174 rt.spawn(async move {
175 let _spawn_on_drop = SpawnOnDrop(rt2);
176
177 loop {
178 crate::task::yield_now().await;
179 }
180 });
181
182 rt.tick_max(1);
183 rt.shutdown();
184 });
185
186 assert!(DID_SPAWN.load(Ordering::SeqCst));
187 }
188
with(f: impl FnOnce(Runtime))189 fn with(f: impl FnOnce(Runtime)) {
190 struct Reset;
191
192 impl Drop for Reset {
193 fn drop(&mut self) {
194 let _rt = CURRENT.try_lock().unwrap().take();
195 }
196 }
197
198 let _reset = Reset;
199
200 let rt = Runtime(Arc::new(Inner {
201 owned: OwnedTasks::new(),
202 core: TryLock::new(Core {
203 queue: VecDeque::new(),
204 }),
205 }));
206
207 *CURRENT.try_lock().unwrap() = Some(rt.clone());
208 f(rt)
209 }
210
211 #[derive(Clone)]
212 struct Runtime(Arc<Inner>);
213
214 struct Inner {
215 core: TryLock<Core>,
216 owned: OwnedTasks<Runtime>,
217 }
218
219 struct Core {
220 queue: VecDeque<task::Notified<Runtime>>,
221 }
222
223 static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
224
225 impl Runtime {
spawn<T>(&self, future: T) -> JoinHandle<T::Output> where T: 'static + Send + Future, T::Output: 'static + Send,226 fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
227 where
228 T: 'static + Send + Future,
229 T::Output: 'static + Send,
230 {
231 let (handle, notified) = self.0.owned.bind(future, self.clone());
232
233 if let Some(notified) = notified {
234 self.schedule(notified);
235 }
236
237 handle
238 }
239
tick(&self) -> usize240 fn tick(&self) -> usize {
241 self.tick_max(usize::MAX)
242 }
243
tick_max(&self, max: usize) -> usize244 fn tick_max(&self, max: usize) -> usize {
245 let mut n = 0;
246
247 while !self.is_empty() && n < max {
248 let task = self.next_task();
249 n += 1;
250 let task = self.0.owned.assert_owner(task);
251 task.run();
252 }
253
254 n
255 }
256
is_empty(&self) -> bool257 fn is_empty(&self) -> bool {
258 self.0.core.try_lock().unwrap().queue.is_empty()
259 }
260
next_task(&self) -> task::Notified<Runtime>261 fn next_task(&self) -> task::Notified<Runtime> {
262 self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
263 }
264
shutdown(&self)265 fn shutdown(&self) {
266 let mut core = self.0.core.try_lock().unwrap();
267
268 self.0.owned.close_and_shutdown_all();
269
270 while let Some(task) = core.queue.pop_back() {
271 drop(task);
272 }
273
274 drop(core);
275
276 assert!(self.0.owned.is_empty());
277 }
278 }
279
280 impl Schedule for Runtime {
release(&self, task: &Task<Self>) -> Option<Task<Self>>281 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
282 self.0.owned.remove(task)
283 }
284
schedule(&self, task: task::Notified<Self>)285 fn schedule(&self, task: task::Notified<Self>) {
286 self.0.core.try_lock().unwrap().queue.push_back(task);
287 }
288 }
289