1 use crate::runtime::queue;
2 use crate::runtime::stats::WorkerStatsBatcher;
3 use crate::runtime::task::{self, Inject, Schedule, Task};
4
5 use std::thread;
6 use std::time::Duration;
7
/// Pushes exactly 256 tasks onto a fresh local queue and checks that none of
/// them were diverted to the inject queue.
#[test]
fn fits_256() {
    let (_, mut local) = queue::local();
    let inject = Inject::new();

    (0..256).for_each(|_| {
        let (task, _) = super::unowned(async {});
        local.push_back(task, &inject);
    });

    // Nothing may have spilled over into the inject queue.
    assert!(inject.pop().is_none());

    // Drain the local queue so that no task is left behind when the test ends.
    while let Some(task) = local.pop() {
        drop(task);
    }
}
22
/// Pushes 257 tasks (one more than `fits_256` does) and checks that every
/// task can still be popped from either the inject queue or the local queue.
#[test]
fn overflow() {
    let (_, mut local) = queue::local();
    let inject = Inject::new();

    (0..257).for_each(|_| {
        let (task, _) = super::unowned(async {});
        local.push_back(task, &inject);
    });

    // Drain the inject queue first, then the local queue, counting each task.
    let from_inject = std::iter::from_fn(|| inject.pop()).count();
    let from_local = std::iter::from_fn(|| local.pop()).count();

    // No task may have been lost or duplicated along the way.
    assert_eq!(from_inject + from_local, 257);
}
45
/// Fills one local queue with four tasks, steals from it into a second, empty
/// queue, and checks how many tasks each queue holds afterwards.
#[test]
fn steal_batch() {
    let mut stats = WorkerStatsBatcher::new(0);

    let (steal1, mut local1) = queue::local();
    let (_, mut local2) = queue::local();
    let inject = Inject::new();

    (0..4).for_each(|_| {
        let (task, _) = super::unowned(async {});
        local1.push_back(task, &inject);
    });

    // The steal itself must hand a task back to the caller.
    assert!(steal1.steal_into(&mut local2, &mut stats).is_some());

    // Exactly one task is expected in the destination queue ...
    assert!(local2.pop().is_some());
    assert!(local2.pop().is_none());

    // ... and exactly two are expected to remain in the source queue.
    assert!(local1.pop().is_some());
    assert!(local1.pop().is_some());
    assert!(local1.pop().is_none());
}
73
/// Stress test: the main thread alternates between pushing `NUM_PUSH` tasks
/// and popping up to `NUM_POP` tasks on a local queue while a second thread
/// concurrently steals from it. Every pushed task must be accounted for
/// exactly once across the local pops, the inject queue, and the stealer.
#[test]
fn stress1() {
    const NUM_ITER: usize = 1;
    const NUM_STEAL: usize = 1_000;
    const NUM_LOCAL: usize = 1_000;
    const NUM_PUSH: usize = 500;
    const NUM_POP: usize = 250;

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = Inject::new();

        // Stealer thread: makes NUM_STEAL steal attempts into its own private
        // queue, drains that queue after each attempt, and returns the number
        // of tasks it obtained.
        let th = thread::spawn(move || {
            let mut stats = WorkerStatsBatcher::new(0);
            let (_, mut local) = queue::local();
            let mut n = 0;

            for _ in 0..NUM_STEAL {
                // A successful steal returns one task directly ...
                if steal.steal_into(&mut local, &mut stats).is_some() {
                    n += 1;
                }

                // ... and may have placed further tasks into the private queue.
                while local.pop().is_some() {
                    n += 1;
                }

                thread::yield_now();
            }

            n
        });

        let mut n = 0;

        for _ in 0..NUM_LOCAL {
            for _ in 0..NUM_PUSH {
                let (task, _) = super::unowned(async {});
                local.push_back(task, &inject);
            }

            // Pop at most NUM_POP tasks, stopping early once the queue is empty.
            for _ in 0..NUM_POP {
                if local.pop().is_some() {
                    n += 1;
                } else {
                    break;
                }
            }
        }

        // Count everything that was diverted to the inject queue.
        while inject.pop().is_some() {
            n += 1;
        }

        n += th.join().unwrap();

        // Each round pops at most NUM_POP tasks, so the local queue is not
        // guaranteed to be empty here. The stealer has been joined, so nothing
        // else touches the queue any more: drain what is left (as `stress2`
        // does) so those tasks are counted instead of failing the assertion.
        while local.pop().is_some() {
            n += 1;
        }

        assert_eq!(n, NUM_LOCAL * NUM_PUSH);
    }
}
132
/// Stress test: the main thread pushes `NUM_TASKS` tasks one at a time,
/// popping a task itself on every 128th push and draining the inject queue
/// after each push, while a second thread steals concurrently. At the end,
/// every pushed task must have been counted exactly once.
#[test]
fn stress2() {
    const NUM_ITER: usize = 1;
    const NUM_TASKS: usize = 1_000_000;
    const NUM_STEAL: usize = 1_000;

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = Inject::new();

        // Stealer thread: NUM_STEAL steal attempts into a private queue, with
        // a short sleep between attempts. Returns how many tasks it obtained.
        let stealer = thread::spawn(move || {
            let mut stats = WorkerStatsBatcher::new(0);
            let (_, mut local) = queue::local();
            let mut stolen = 0;

            for _ in 0..NUM_STEAL {
                let got_task = steal.steal_into(&mut local, &mut stats).is_some();
                if got_task {
                    stolen += 1;
                }

                // Count whatever the steal placed into the private queue.
                stolen += std::iter::from_fn(|| local.pop()).count();

                thread::sleep(Duration::from_micros(10));
            }

            stolen
        });

        let mut accounted = 0;

        for i in 0..NUM_TASKS {
            let (task, _) = super::unowned(async {});
            local.push_back(task, &inject);

            // Only attempt a local pop on every 128th push.
            let popped_locally = i % 128 == 0 && local.pop().is_some();
            if popped_locally {
                accounted += 1;
            }

            // Empty the inject queue after every push.
            accounted += std::iter::from_fn(|| inject.pop()).count();
        }

        accounted += stealer.join().unwrap();

        // With the stealer finished, drain what is left in both queues.
        accounted += std::iter::from_fn(|| local.pop()).count();
        accounted += std::iter::from_fn(|| inject.pop()).count();

        assert_eq!(accounted, NUM_TASKS);
    }
}
191
192 struct Runtime;
193
194 impl Schedule for Runtime {
release(&self, _task: &Task<Self>) -> Option<Task<Self>>195 fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
196 None
197 }
198
schedule(&self, _task: task::Notified<Self>)199 fn schedule(&self, _task: task::Notified<Self>) {
200 unreachable!();
201 }
202 }
203