1 use crate::runtime::task::{self, Schedule, Task};
2 use crate::util::linked_list::LinkedList;
3 use crate::util::TryLock;
4 
5 use std::collections::VecDeque;
6 use std::sync::Arc;
7 
/// Builds a joinable task bound to the test `Runtime` and discards the result
/// right away.
///
/// The future panics through `unreachable!()` if it is ever polled, so the
/// test only passes when creating and dropping the task never runs it.
#[test]
fn create_drop() {
    drop(task::joinable::<_, Runtime>(async { unreachable!() }));
}
12 
/// Schedules a task that yields once and checks how many task runs it takes
/// to drain the queue.
///
/// `Runtime::tick` returns the number of queued tasks it popped and ran; the
/// expected count is two (presumably one run up to the yield and one run that
/// completes the future).
#[test]
fn schedule() {
    with(|rt| {
        let yield_once = async {
            crate::task::yield_now().await;
        };

        // The second half of the pair is discarded immediately.
        let (notified, _) = task::joinable(yield_once);
        rt.schedule(notified);

        let runs = rt.tick();
        assert_eq!(2, runs);
    })
}
25 
/// Shuts the runtime down while a task that never completes is still alive.
///
/// The task yields forever. It is run exactly once via `tick_max(1)` and then
/// `Runtime::shutdown` is called, which only returns once the runtime's task
/// list is empty.
#[test]
fn shutdown() {
    with(|rt| {
        // The second half of the pair is discarded immediately.
        let (notified, _) = task::joinable(async {
            loop {
                crate::task::yield_now().await;
            }
        });

        rt.schedule(notified);

        // Run the task a single time; the number of runs is not interesting here.
        let _ = rt.tick_max(1);

        rt.shutdown();
    })
}
41 
with(f: impl FnOnce(Runtime))42 fn with(f: impl FnOnce(Runtime)) {
43     struct Reset;
44 
45     impl Drop for Reset {
46         fn drop(&mut self) {
47             let _rt = CURRENT.try_lock().unwrap().take();
48         }
49     }
50 
51     let _reset = Reset;
52 
53     let rt = Runtime(Arc::new(Inner {
54         released: task::TransferStack::new(),
55         core: TryLock::new(Core {
56             queue: VecDeque::new(),
57             tasks: LinkedList::new(),
58         }),
59     }));
60 
61     *CURRENT.try_lock().unwrap() = Some(rt.clone());
62     f(rt)
63 }
64 
65 #[derive(Clone)]
66 struct Runtime(Arc<Inner>);
67 
68 struct Inner {
69     released: task::TransferStack<Runtime>,
70     core: TryLock<Core>,
71 }
72 
73 struct Core {
74     queue: VecDeque<task::Notified<Runtime>>,
75     tasks: LinkedList<Task<Runtime>>,
76 }
77 
78 static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
79 
80 impl Runtime {
tick(&self) -> usize81     fn tick(&self) -> usize {
82         self.tick_max(usize::max_value())
83     }
84 
tick_max(&self, max: usize) -> usize85     fn tick_max(&self, max: usize) -> usize {
86         let mut n = 0;
87 
88         while !self.is_empty() && n < max {
89             let task = self.next_task();
90             n += 1;
91             task.run();
92         }
93 
94         self.0.maintenance();
95 
96         n
97     }
98 
is_empty(&self) -> bool99     fn is_empty(&self) -> bool {
100         self.0.core.try_lock().unwrap().queue.is_empty()
101     }
102 
next_task(&self) -> task::Notified<Runtime>103     fn next_task(&self) -> task::Notified<Runtime> {
104         self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
105     }
106 
shutdown(&self)107     fn shutdown(&self) {
108         let mut core = self.0.core.try_lock().unwrap();
109 
110         for task in core.tasks.iter() {
111             task.shutdown();
112         }
113 
114         while let Some(task) = core.queue.pop_back() {
115             task.shutdown();
116         }
117 
118         drop(core);
119 
120         while !self.0.core.try_lock().unwrap().tasks.is_empty() {
121             self.0.maintenance();
122         }
123     }
124 }
125 
126 impl Inner {
maintenance(&self)127     fn maintenance(&self) {
128         use std::mem::ManuallyDrop;
129 
130         for task in self.released.drain() {
131             let task = ManuallyDrop::new(task);
132 
133             // safety: see worker.rs
134             unsafe {
135                 let ptr = task.header().into();
136                 self.core.try_lock().unwrap().tasks.remove(ptr);
137             }
138         }
139     }
140 }
141 
142 impl Schedule for Runtime {
bind(task: Task<Self>) -> Runtime143     fn bind(task: Task<Self>) -> Runtime {
144         let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone();
145         rt.0.core.try_lock().unwrap().tasks.push_front(task);
146         rt
147     }
148 
release(&self, task: &Task<Self>) -> Option<Task<Self>>149     fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
150         // safety: copying worker.rs
151         let task = unsafe { Task::from_raw(task.header().into()) };
152         self.0.released.push(task);
153         None
154     }
155 
schedule(&self, task: task::Notified<Self>)156     fn schedule(&self, task: task::Notified<Self>) {
157         self.0.core.try_lock().unwrap().queue.push_back(task);
158     }
159 }
160