1 use futures::channel::oneshot;
2 use futures::executor::LocalPool;
3 use futures::future::{self, Future, lazy, poll_fn};
4 use futures::task::{Context, Poll, Spawn, LocalSpawn, Waker};
5 use std::cell::{Cell, RefCell};
6 use std::pin::Pin;
7 use std::rc::Rc;
8 use std::thread;
9 use std::time::Duration;
10 use std::sync::Arc;
11 use std::sync::atomic::{Ordering, AtomicBool};
12
// A future that is never ready: every poll reports `Poll::Pending` and the
// waker handed to `poll` is ignored, so no wake-up is ever requested.
//
// The `Rc<()>` payload is not read anywhere in this file; because `Rc` is not
// `Send`, carrying it makes `Pending` a `!Send` future.
struct Pending(Rc<()>);

impl Future for Pending {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Pending
    }
}
22
pending() -> Pending23 fn pending() -> Pending {
24 Pending(Rc::new(()))
25 }
26
// `run_until` must drive the future it is given to completion: the closure
// wrapped by `lazy` is expected to have run exactly once afterwards.
#[test]
fn run_until_single_future() {
    let mut invocations = 0;

    {
        // The pool and the closure borrowing the counter stay confined to
        // this scope, ahead of the final check.
        let mut executor = LocalPool::new();
        executor.run_until(lazy(|_| invocations += 1));
    }

    assert_eq!(invocations, 1);
}
41
// `run_until` must return once its own future resolves, even though a
// spawned task that never completes is still queued on the pool.
#[test]
fn run_until_ignores_spawned() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let never_ready = Box::pin(pending());
    spawner.spawn_local_obj(never_ready.into()).unwrap();

    let immediate = lazy(|_| ());
    executor.run_until(immediate);
}
49
// The future passed to `run_until` (the receiving half of a oneshot channel)
// can only resolve if the spawned task that sends on the channel is run too.
#[test]
fn run_until_executes_spawned() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();
    let (sender, receiver) = oneshot::channel();

    let notifier = lazy(move |_| {
        sender.send(()).unwrap();
    });
    spawner.spawn_local_obj(Box::pin(notifier).into()).unwrap();

    executor.run_until(receiver).unwrap();
}
60
// Calling `run` on a pool with no tasks must return, and must do so again on
// a repeated call.
#[test]
fn run_returns_if_empty() {
    let mut executor = LocalPool::new();
    for _ in 0..2 {
        executor.run();
    }
}
67
// A task spawned from inside another task must also be executed by `run`:
// only the nested task increments the counter.
#[test]
fn run_executes_spawned() {
    let mut executor = LocalPool::new();
    let outer_spawner = executor.spawner();
    let nested_spawner = executor.spawner();

    let hits = Rc::new(Cell::new(0));
    let hits_in_task = Rc::clone(&hits);

    let outer = lazy(move |_| {
        let inner = lazy(move |_| {
            hits_in_task.set(hits_in_task.get() + 1);
        });
        nested_spawner
            .spawn_local_obj(Box::pin(inner).into())
            .unwrap();
    });
    outer_spawner
        .spawn_local_obj(Box::pin(outer).into())
        .unwrap();

    executor.run();

    assert_eq!(hits.get(), 1);
}
87
88
// Spawns a batch of immediately-ready tasks and checks that a single `run`
// executes every one of them.
#[test]
fn run_spawn_many() {
    const TASK_COUNT: usize = 200;

    let mut executor = LocalPool::new();
    let spawner = executor.spawner();
    let hits = Rc::new(Cell::new(0));

    (0..TASK_COUNT).for_each(|_| {
        let task_hits = Rc::clone(&hits);
        let bump = lazy(move |_| {
            task_hits.set(task_hits.get() + 1);
        });
        spawner.spawn_local_obj(Box::pin(bump).into()).unwrap();
    });

    executor.run();

    assert_eq!(hits.get(), TASK_COUNT);
}
109
// With nothing spawned, `try_run_one` must return and report `false`.
#[test]
fn try_run_one_returns_if_empty() {
    let mut executor = LocalPool::new();
    let completed_one = executor.try_run_one();
    assert!(!completed_one);
}
115
// Each `try_run_one` call is expected to complete exactly one ready task,
// even when ready tasks are interleaved with tasks that never complete.
#[test]
fn try_run_one_executes_one_ready() {
    const ROUNDS: usize = 200;

    let mut executor = LocalPool::new();
    let spawner = executor.spawner();
    let hits = Rc::new(Cell::new(0));

    // Queues a task that can never complete.
    let spawn_stuck = || {
        spawner
            .spawn_local_obj(Box::pin(pending()).into())
            .unwrap()
    };

    for _ in 0..ROUNDS {
        spawn_stuck();

        let task_hits = Rc::clone(&hits);
        let bump = lazy(move |_| {
            task_hits.set(task_hits.get() + 1);
        });
        spawner.spawn_local_obj(Box::pin(bump).into()).unwrap();

        spawn_stuck();
    }

    for finished in 0..ROUNDS {
        assert_eq!(hits.get(), finished);
        assert!(executor.try_run_one());
        assert_eq!(hits.get(), finished + 1);
    }

    // Only the never-ready tasks remain, so nothing more can complete.
    let progressed = executor.try_run_one();
    assert!(!progressed);
}
143
// A single task that needs several polls to finish: `try_run_one` must return
// `false` after each poll that leaves it pending, and `true` on the poll that
// completes it. The test wakes the task by hand between calls.
#[test]
fn try_run_one_returns_on_no_progress() {
    const POLLS_NEEDED: usize = 10;

    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let polls = Rc::new(Cell::new(0));
    let waker_slot: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));

    let task = {
        let polls = Rc::clone(&polls);
        let waker_slot = Rc::clone(&waker_slot);
        poll_fn(move |cx| {
            let seen = polls.get() + 1;
            polls.set(seen);
            // Hand the waker to the test body so it can re-schedule this task.
            waker_slot.set(Some(cx.waker().clone()));
            if seen != POLLS_NEEDED {
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        })
    };
    spawner.spawn_local_obj(Box::pin(task).into()).unwrap();

    for round in 0..(POLLS_NEEDED - 1) {
        assert_eq!(polls.get(), round);
        assert!(!executor.try_run_one());
        assert_eq!(polls.get(), round + 1);

        // The poll above must have stored a waker; use it to request the
        // next poll.
        let stored = waker_slot.take();
        assert!(stored.is_some());
        if let Some(waker) = stored {
            waker.wake();
        }
    }

    assert!(executor.try_run_one());
    assert_eq!(polls.get(), POLLS_NEEDED);
}
179
// A parent task that stays pending spawns a child while it is being polled.
// One `try_run_one` call is expected to poll the parent and run the child,
// leaving the shared counter at 2.
#[test]
fn try_run_one_runs_sub_futures() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();
    let hits = Rc::new(Cell::new(0));

    let parent = {
        let child_spawner = spawner.clone();
        let parent_hits = Rc::clone(&hits);
        poll_fn(move |_| {
            parent_hits.set(parent_hits.get() + 1);

            let child_hits = Rc::clone(&parent_hits);
            let child = lazy(move |_| child_hits.set(child_hits.get() + 1));
            child_spawner
                .spawn_local_obj(Box::pin(child).into())
                .unwrap();

            Poll::Pending
        })
    };
    spawner.spawn_local_obj(Box::pin(parent).into()).unwrap();

    // The return value is deliberately not checked; only the counter is.
    let _ = executor.try_run_one();
    assert_eq!(hits.get(), 2);
}
202
// `run_until_stalled` on a pool with no tasks must return, also when it is
// called a second time.
#[test]
fn run_until_stalled_returns_if_empty() {
    let mut executor = LocalPool::new();
    for _ in 0..2 {
        executor.run_until_stalled();
    }
}
209
// The pool must stay usable after `run_until_stalled` has returned: a task
// spawned afterwards is executed by the next `run_until_stalled` call.
#[test]
fn run_until_stalled_returns_multiple_times() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();
    let hits = Rc::new(Cell::new(0));

    for expected_hits in 1..=2 {
        let task_hits = Rc::clone(&hits);
        let bump = lazy(move |_| task_hits.set(task_hits.get() + 1));
        spawner.spawn_local_obj(Box::pin(bump).into()).unwrap();

        executor.run_until_stalled();
        assert_eq!(hits.get(), expected_hits);
    }
}
226
// A parent task that stays pending spawns a child while it is being polled.
// One `run_until_stalled` call is expected to poll the parent once and run
// the child, leaving the shared counter at 2.
#[test]
fn run_until_stalled_runs_spawned_sub_futures() {
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();
    let hits = Rc::new(Cell::new(0));

    let child_spawner = spawner.clone();
    let parent_hits = Rc::clone(&hits);
    let parent = poll_fn(move |_| {
        parent_hits.set(parent_hits.get() + 1);

        let child_hits = Rc::clone(&parent_hits);
        let child = lazy(move |_| child_hits.set(child_hits.get() + 1));
        child_spawner
            .spawn_local_obj(Box::pin(child).into())
            .unwrap();

        Poll::Pending
    });
    spawner.spawn_local_obj(Box::pin(parent).into()).unwrap();

    executor.run_until_stalled();
    assert_eq!(hits.get(), 2);
}
249
// In every round a few ready tasks are spawned, surrounded by tasks that
// never complete. One `run_until_stalled` call per round is expected to run
// all of that round's ready tasks.
#[test]
fn run_until_stalled_executes_all_ready() {
    const ROUNDS: usize = 200;
    const READY_PER_ROUND: usize = 3;

    let mut executor = LocalPool::new();
    let spawner = executor.spawner();
    let hits = Rc::new(Cell::new(0));

    // Queues a task that can never complete.
    let spawn_stuck = || {
        spawner
            .spawn_local_obj(Box::pin(pending()).into())
            .unwrap()
    };

    for round in 0..ROUNDS {
        for _ in 0..READY_PER_ROUND {
            spawn_stuck();

            let task_hits = Rc::clone(&hits);
            let bump = lazy(move |_| {
                task_hits.set(task_hits.get() + 1);
            });
            spawner.spawn_local_obj(Box::pin(bump).into()).unwrap();

            // also add some pending tasks to test if they are ignored
            spawn_stuck();
        }

        let already_run = round * READY_PER_ROUND;
        assert_eq!(hits.get(), already_run);
        executor.run_until_stalled();
        assert_eq!(hits.get(), already_run + READY_PER_ROUND);
    }
}
277
// Expected to panic: a task running on one `LocalPool` creates a second pool
// and calls `run` on it while the outer `run` is still in progress.
#[test]
#[should_panic]
fn nesting_run() {
    let mut outer = LocalPool::new();
    let spawner = outer.spawner();

    let reentrant = lazy(|_| {
        let mut inner = LocalPool::new();
        inner.run();
    });
    spawner.spawn_obj(Box::pin(reentrant).into()).unwrap();

    outer.run();
}
291
// Expected to panic: a task running on one `LocalPool` creates a second pool
// and calls `run_until_stalled` on it while the outer `run` is in progress.
#[test]
#[should_panic]
fn nesting_run_run_until_stalled() {
    let mut outer = LocalPool::new();
    let spawner = outer.spawner();

    let reentrant = lazy(|_| {
        let mut inner = LocalPool::new();
        inner.run_until_stalled();
    });
    spawner.spawn_obj(Box::pin(reentrant).into()).unwrap();

    outer.run();
}
305
// Two self-waking tasks share a pair of counters, one slot each. Task 0
// asserts on every poll that the two counters never differ by more than one,
// i.e. that the pool alternates between the tasks.
#[test]
fn tasks_are_scheduled_fairly() {
    // Increments `state[idx]` on each poll and immediately re-wakes itself
    // until its stop condition is reached.
    struct Spin {
        state: Rc<RefCell<[i32; 2]>>,
        idx: usize,
    }

    impl Future for Spin {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let slot = self.idx;
            let mut counters = self.state.borrow_mut();

            if slot == 0 {
                // Only task 0 performs the fairness check and the early stop.
                let lead = counters[0] - counters[1];
                assert!(lead.abs() <= 1);

                if counters[0] >= 50 {
                    return Poll::Ready(());
                }
            }

            counters[slot] += 1;

            if counters[slot] < 100 {
                // Ask to be polled again straight away.
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        }
    }

    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    let shared = Rc::new(RefCell::new([0, 0]));
    let first = Spin {
        state: Rc::clone(&shared),
        idx: 0,
    };
    let second = Spin {
        state: shared,
        idx: 1,
    };

    spawner.spawn_local_obj(Box::pin(first).into()).unwrap();
    spawner.spawn_local_obj(Box::pin(second).into()).unwrap();

    executor.run();
}
357
// Tests that the use of park/unpark in user-code has no
// effect on the expected behaviour of the executor.
#[test]
fn park_unpark_independence() {
    let mut polled_before = false;

    let future = poll_fn(move |cx| {
        // Any poll after the first one completes immediately.
        if std::mem::replace(&mut polled_before, true) {
            return Poll::Ready(());
        }

        cx.waker().clone().wake(); // (*)

        // some user-code that temporarily parks the thread
        let latch = Arc::new(AtomicBool::new(false));
        {
            let signal = Arc::clone(&latch);
            let parked_thread = thread::current();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10));
                signal.store(true, Ordering::SeqCst);
                parked_thread.unpark()
            });
        }
        // Park until the helper thread has raised the latch; the loop
        // re-checks the flag every time `park` returns.
        loop {
            if latch.load(Ordering::Relaxed) {
                break;
            }
            thread::park();
        }

        Poll::Pending // Expect to be called again due to (*).
    });

    futures::executor::block_on(future)
}
387
388