1 use crate::runtime::queue;
2 use crate::runtime::stats::WorkerStatsBatcher;
3 use crate::runtime::task::{self, Inject, Schedule, Task};
4 
5 use std::thread;
6 use std::time::Duration;
7 
8 #[test]
fits_256()9 fn fits_256() {
10     let (_, mut local) = queue::local();
11     let inject = Inject::new();
12 
13     for _ in 0..256 {
14         let (task, _) = super::unowned(async {});
15         local.push_back(task, &inject);
16     }
17 
18     assert!(inject.pop().is_none());
19 
20     while local.pop().is_some() {}
21 }
22 
23 #[test]
overflow()24 fn overflow() {
25     let (_, mut local) = queue::local();
26     let inject = Inject::new();
27 
28     for _ in 0..257 {
29         let (task, _) = super::unowned(async {});
30         local.push_back(task, &inject);
31     }
32 
33     let mut n = 0;
34 
35     while inject.pop().is_some() {
36         n += 1;
37     }
38 
39     while local.pop().is_some() {
40         n += 1;
41     }
42 
43     assert_eq!(n, 257);
44 }
45 
46 #[test]
steal_batch()47 fn steal_batch() {
48     let mut stats = WorkerStatsBatcher::new(0);
49 
50     let (steal1, mut local1) = queue::local();
51     let (_, mut local2) = queue::local();
52     let inject = Inject::new();
53 
54     for _ in 0..4 {
55         let (task, _) = super::unowned(async {});
56         local1.push_back(task, &inject);
57     }
58 
59     assert!(steal1.steal_into(&mut local2, &mut stats).is_some());
60 
61     for _ in 0..1 {
62         assert!(local2.pop().is_some());
63     }
64 
65     assert!(local2.pop().is_none());
66 
67     for _ in 0..2 {
68         assert!(local1.pop().is_some());
69     }
70 
71     assert!(local1.pop().is_none());
72 }
73 
74 #[test]
stress1()75 fn stress1() {
76     const NUM_ITER: usize = 1;
77     const NUM_STEAL: usize = 1_000;
78     const NUM_LOCAL: usize = 1_000;
79     const NUM_PUSH: usize = 500;
80     const NUM_POP: usize = 250;
81 
82     for _ in 0..NUM_ITER {
83         let (steal, mut local) = queue::local();
84         let inject = Inject::new();
85 
86         let th = thread::spawn(move || {
87             let mut stats = WorkerStatsBatcher::new(0);
88             let (_, mut local) = queue::local();
89             let mut n = 0;
90 
91             for _ in 0..NUM_STEAL {
92                 if steal.steal_into(&mut local, &mut stats).is_some() {
93                     n += 1;
94                 }
95 
96                 while local.pop().is_some() {
97                     n += 1;
98                 }
99 
100                 thread::yield_now();
101             }
102 
103             n
104         });
105 
106         let mut n = 0;
107 
108         for _ in 0..NUM_LOCAL {
109             for _ in 0..NUM_PUSH {
110                 let (task, _) = super::unowned(async {});
111                 local.push_back(task, &inject);
112             }
113 
114             for _ in 0..NUM_POP {
115                 if local.pop().is_some() {
116                     n += 1;
117                 } else {
118                     break;
119                 }
120             }
121         }
122 
123         while inject.pop().is_some() {
124             n += 1;
125         }
126 
127         n += th.join().unwrap();
128 
129         assert_eq!(n, NUM_LOCAL * NUM_PUSH);
130     }
131 }
132 
133 #[test]
stress2()134 fn stress2() {
135     const NUM_ITER: usize = 1;
136     const NUM_TASKS: usize = 1_000_000;
137     const NUM_STEAL: usize = 1_000;
138 
139     for _ in 0..NUM_ITER {
140         let (steal, mut local) = queue::local();
141         let inject = Inject::new();
142 
143         let th = thread::spawn(move || {
144             let mut stats = WorkerStatsBatcher::new(0);
145             let (_, mut local) = queue::local();
146             let mut n = 0;
147 
148             for _ in 0..NUM_STEAL {
149                 if steal.steal_into(&mut local, &mut stats).is_some() {
150                     n += 1;
151                 }
152 
153                 while local.pop().is_some() {
154                     n += 1;
155                 }
156 
157                 thread::sleep(Duration::from_micros(10));
158             }
159 
160             n
161         });
162 
163         let mut num_pop = 0;
164 
165         for i in 0..NUM_TASKS {
166             let (task, _) = super::unowned(async {});
167             local.push_back(task, &inject);
168 
169             if i % 128 == 0 && local.pop().is_some() {
170                 num_pop += 1;
171             }
172 
173             while inject.pop().is_some() {
174                 num_pop += 1;
175             }
176         }
177 
178         num_pop += th.join().unwrap();
179 
180         while local.pop().is_some() {
181             num_pop += 1;
182         }
183 
184         while inject.pop().is_some() {
185             num_pop += 1;
186         }
187 
188         assert_eq!(num_pop, NUM_TASKS);
189     }
190 }
191 
192 struct Runtime;
193 
194 impl Schedule for Runtime {
release(&self, _task: &Task<Self>) -> Option<Task<Self>>195     fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
196         None
197     }
198 
schedule(&self, _task: task::Notified<Self>)199     fn schedule(&self, _task: task::Notified<Self>) {
200         unreachable!();
201     }
202 }
203