1 use crate::runtime::queue;
2 use crate::runtime::task::{self, Schedule, Task};
3 
4 use std::thread;
5 use std::time::Duration;
6 
7 #[test]
fits_256()8 fn fits_256() {
9     let (_, mut local) = queue::local();
10     let inject = queue::Inject::new();
11 
12     for _ in 0..256 {
13         let (task, _) = task::joinable::<_, Runtime>(async {});
14         local.push_back(task, &inject);
15     }
16 
17     assert!(inject.pop().is_none());
18 
19     while local.pop().is_some() {}
20 }
21 
22 #[test]
overflow()23 fn overflow() {
24     let (_, mut local) = queue::local();
25     let inject = queue::Inject::new();
26 
27     for _ in 0..257 {
28         let (task, _) = task::joinable::<_, Runtime>(async {});
29         local.push_back(task, &inject);
30     }
31 
32     let mut n = 0;
33 
34     while inject.pop().is_some() {
35         n += 1;
36     }
37 
38     while local.pop().is_some() {
39         n += 1;
40     }
41 
42     assert_eq!(n, 257);
43 }
44 
45 #[test]
steal_batch()46 fn steal_batch() {
47     let (steal1, mut local1) = queue::local();
48     let (_, mut local2) = queue::local();
49     let inject = queue::Inject::new();
50 
51     for _ in 0..4 {
52         let (task, _) = task::joinable::<_, Runtime>(async {});
53         local1.push_back(task, &inject);
54     }
55 
56     assert!(steal1.steal_into(&mut local2).is_some());
57 
58     for _ in 0..1 {
59         assert!(local2.pop().is_some());
60     }
61 
62     assert!(local2.pop().is_none());
63 
64     for _ in 0..2 {
65         assert!(local1.pop().is_some());
66     }
67 
68     assert!(local1.pop().is_none());
69 }
70 
71 #[test]
stress1()72 fn stress1() {
73     const NUM_ITER: usize = 1;
74     const NUM_STEAL: usize = 1_000;
75     const NUM_LOCAL: usize = 1_000;
76     const NUM_PUSH: usize = 500;
77     const NUM_POP: usize = 250;
78 
79     for _ in 0..NUM_ITER {
80         let (steal, mut local) = queue::local();
81         let inject = queue::Inject::new();
82 
83         let th = thread::spawn(move || {
84             let (_, mut local) = queue::local();
85             let mut n = 0;
86 
87             for _ in 0..NUM_STEAL {
88                 if steal.steal_into(&mut local).is_some() {
89                     n += 1;
90                 }
91 
92                 while local.pop().is_some() {
93                     n += 1;
94                 }
95 
96                 thread::yield_now();
97             }
98 
99             n
100         });
101 
102         let mut n = 0;
103 
104         for _ in 0..NUM_LOCAL {
105             for _ in 0..NUM_PUSH {
106                 let (task, _) = task::joinable::<_, Runtime>(async {});
107                 local.push_back(task, &inject);
108             }
109 
110             for _ in 0..NUM_POP {
111                 if local.pop().is_some() {
112                     n += 1;
113                 } else {
114                     break;
115                 }
116             }
117         }
118 
119         while inject.pop().is_some() {
120             n += 1;
121         }
122 
123         n += th.join().unwrap();
124 
125         assert_eq!(n, NUM_LOCAL * NUM_PUSH);
126     }
127 }
128 
129 #[test]
stress2()130 fn stress2() {
131     const NUM_ITER: usize = 1;
132     const NUM_TASKS: usize = 1_000_000;
133     const NUM_STEAL: usize = 1_000;
134 
135     for _ in 0..NUM_ITER {
136         let (steal, mut local) = queue::local();
137         let inject = queue::Inject::new();
138 
139         let th = thread::spawn(move || {
140             let (_, mut local) = queue::local();
141             let mut n = 0;
142 
143             for _ in 0..NUM_STEAL {
144                 if steal.steal_into(&mut local).is_some() {
145                     n += 1;
146                 }
147 
148                 while local.pop().is_some() {
149                     n += 1;
150                 }
151 
152                 thread::sleep(Duration::from_micros(10));
153             }
154 
155             n
156         });
157 
158         let mut num_pop = 0;
159 
160         for i in 0..NUM_TASKS {
161             let (task, _) = task::joinable::<_, Runtime>(async {});
162             local.push_back(task, &inject);
163 
164             if i % 128 == 0 && local.pop().is_some() {
165                 num_pop += 1;
166             }
167 
168             while inject.pop().is_some() {
169                 num_pop += 1;
170             }
171         }
172 
173         num_pop += th.join().unwrap();
174 
175         while local.pop().is_some() {
176             num_pop += 1;
177         }
178 
179         while inject.pop().is_some() {
180             num_pop += 1;
181         }
182 
183         assert_eq!(num_pop, NUM_TASKS);
184     }
185 }
186 
187 struct Runtime;
188 
189 impl Schedule for Runtime {
bind(task: Task<Self>) -> Runtime190     fn bind(task: Task<Self>) -> Runtime {
191         std::mem::forget(task);
192         Runtime
193     }
194 
release(&self, _task: &Task<Self>) -> Option<Task<Self>>195     fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
196         None
197     }
198 
schedule(&self, _task: task::Notified<Self>)199     fn schedule(&self, _task: task::Notified<Self>) {
200         unreachable!();
201     }
202 }
203