1 use crate::runtime::task::{self, Schedule, Task};
2 use crate::util::linked_list::LinkedList;
3 use crate::util::TryLock;
4
5 use std::collections::VecDeque;
6 use std::sync::Arc;
7
/// Builds a joinable task and discards it right away, without ever
/// scheduling or polling it. The future body would panic if it were run.
#[test]
fn create_drop() {
    drop(task::joinable::<_, Runtime>(async { unreachable!() }));
}
12
/// Schedules a task that yields once and checks that draining the run queue
/// executes exactly two task runs.
#[test]
fn schedule() {
    with(|rt| {
        let yields_once = async {
            crate::task::yield_now().await;
        };

        // The join handle is discarded on the spot; only the notified half is
        // handed to the runtime.
        let (task, _) = task::joinable(yields_once);
        rt.schedule(task);

        let runs = rt.tick();
        assert_eq!(2, runs);
    })
}
25
/// Schedules a task that never completes on its own, runs it a single time,
/// and then shuts the runtime down while the task is still alive.
#[test]
fn shutdown() {
    with(|rt| {
        let yields_forever = async {
            loop {
                crate::task::yield_now().await;
            }
        };

        // The join handle is discarded on the spot; only the notified half is
        // handed to the runtime.
        let (task, _) = task::joinable(yields_forever);
        rt.schedule(task);

        // Run the task once so it is mid-flight when shutdown begins.
        rt.tick_max(1);

        rt.shutdown();
    })
}
41
with(f: impl FnOnce(Runtime))42 fn with(f: impl FnOnce(Runtime)) {
43 struct Reset;
44
45 impl Drop for Reset {
46 fn drop(&mut self) {
47 let _rt = CURRENT.try_lock().unwrap().take();
48 }
49 }
50
51 let _reset = Reset;
52
53 let rt = Runtime(Arc::new(Inner {
54 released: task::TransferStack::new(),
55 core: TryLock::new(Core {
56 queue: VecDeque::new(),
57 tasks: LinkedList::new(),
58 }),
59 }));
60
61 *CURRENT.try_lock().unwrap() = Some(rt.clone());
62 f(rt)
63 }
64
65 #[derive(Clone)]
66 struct Runtime(Arc<Inner>);
67
68 struct Inner {
69 released: task::TransferStack<Runtime>,
70 core: TryLock<Core>,
71 }
72
73 struct Core {
74 queue: VecDeque<task::Notified<Runtime>>,
75 tasks: LinkedList<Task<Runtime>>,
76 }
77
78 static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
79
80 impl Runtime {
tick(&self) -> usize81 fn tick(&self) -> usize {
82 self.tick_max(usize::max_value())
83 }
84
tick_max(&self, max: usize) -> usize85 fn tick_max(&self, max: usize) -> usize {
86 let mut n = 0;
87
88 while !self.is_empty() && n < max {
89 let task = self.next_task();
90 n += 1;
91 task.run();
92 }
93
94 self.0.maintenance();
95
96 n
97 }
98
is_empty(&self) -> bool99 fn is_empty(&self) -> bool {
100 self.0.core.try_lock().unwrap().queue.is_empty()
101 }
102
next_task(&self) -> task::Notified<Runtime>103 fn next_task(&self) -> task::Notified<Runtime> {
104 self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
105 }
106
shutdown(&self)107 fn shutdown(&self) {
108 let mut core = self.0.core.try_lock().unwrap();
109
110 for task in core.tasks.iter() {
111 task.shutdown();
112 }
113
114 while let Some(task) = core.queue.pop_back() {
115 task.shutdown();
116 }
117
118 drop(core);
119
120 while !self.0.core.try_lock().unwrap().tasks.is_empty() {
121 self.0.maintenance();
122 }
123 }
124 }
125
126 impl Inner {
maintenance(&self)127 fn maintenance(&self) {
128 use std::mem::ManuallyDrop;
129
130 for task in self.released.drain() {
131 let task = ManuallyDrop::new(task);
132
133 // safety: see worker.rs
134 unsafe {
135 let ptr = task.header().into();
136 self.core.try_lock().unwrap().tasks.remove(ptr);
137 }
138 }
139 }
140 }
141
142 impl Schedule for Runtime {
bind(task: Task<Self>) -> Runtime143 fn bind(task: Task<Self>) -> Runtime {
144 let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone();
145 rt.0.core.try_lock().unwrap().tasks.push_front(task);
146 rt
147 }
148
release(&self, task: &Task<Self>) -> Option<Task<Self>>149 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
150 // safety: copying worker.rs
151 let task = unsafe { Task::from_raw(task.header().into()) };
152 self.0.released.push(task);
153 None
154 }
155
schedule(&self, task: task::Notified<Self>)156 fn schedule(&self, task: task::Notified<Self>) {
157 self.0.core.try_lock().unwrap().queue.push_back(task);
158 }
159 }
160