1 use futures::channel::oneshot;
2 use futures::executor::LocalPool;
3 use futures::future::{self, Future, lazy, poll_fn};
4 use futures::task::{Context, Poll, Spawn, LocalSpawn, Waker};
5 use std::cell::{Cell, RefCell};
6 use std::pin::Pin;
7 use std::rc::Rc;
8 use std::thread;
9 use std::time::Duration;
10 use std::sync::Arc;
11 use std::sync::atomic::{Ordering, AtomicBool};
12 
13 struct Pending(Rc<()>);
14 
15 impl Future for Pending {
16     type Output = ();
17 
poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()>18     fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
19         Poll::Pending
20     }
21 }
22 
pending() -> Pending23 fn pending() -> Pending {
24     Pending(Rc::new(()))
25 }
26 
27 #[test]
run_until_single_future()28 fn run_until_single_future() {
29     let mut cnt = 0;
30 
31     {
32         let mut pool = LocalPool::new();
33         let fut = lazy(|_| {
34             cnt += 1;
35         });
36         pool.run_until(fut);
37     }
38 
39     assert_eq!(cnt, 1);
40 }
41 
42 #[test]
run_until_ignores_spawned()43 fn run_until_ignores_spawned() {
44     let mut pool = LocalPool::new();
45     let spawn = pool.spawner();
46     spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
47     pool.run_until(lazy(|_| ()));
48 }
49 
50 #[test]
run_until_executes_spawned()51 fn run_until_executes_spawned() {
52     let (tx, rx) = oneshot::channel();
53     let mut pool = LocalPool::new();
54     let spawn = pool.spawner();
55     spawn.spawn_local_obj(Box::pin(lazy(move |_| {
56         tx.send(()).unwrap();
57     })).into()).unwrap();
58     pool.run_until(rx).unwrap();
59 }
60 
61 #[test]
run_returns_if_empty()62 fn run_returns_if_empty() {
63     let mut pool = LocalPool::new();
64     pool.run();
65     pool.run();
66 }
67 
68 #[test]
run_executes_spawned()69 fn run_executes_spawned() {
70     let cnt = Rc::new(Cell::new(0));
71     let cnt2 = cnt.clone();
72 
73     let mut pool = LocalPool::new();
74     let spawn = pool.spawner();
75     let spawn2 = pool.spawner();
76 
77     spawn.spawn_local_obj(Box::pin(lazy(move |_| {
78         spawn2.spawn_local_obj(Box::pin(lazy(move |_| {
79             cnt2.set(cnt2.get() + 1);
80         })).into()).unwrap();
81     })).into()).unwrap();
82 
83     pool.run();
84 
85     assert_eq!(cnt.get(), 1);
86 }
87 
88 
89 #[test]
run_spawn_many()90 fn run_spawn_many() {
91     const ITER: usize = 200;
92 
93     let cnt = Rc::new(Cell::new(0));
94 
95     let mut pool = LocalPool::new();
96     let spawn = pool.spawner();
97 
98     for _ in 0..ITER {
99         let cnt = cnt.clone();
100         spawn.spawn_local_obj(Box::pin(lazy(move |_| {
101             cnt.set(cnt.get() + 1);
102         })).into()).unwrap();
103     }
104 
105     pool.run();
106 
107     assert_eq!(cnt.get(), ITER);
108 }
109 
110 #[test]
try_run_one_returns_if_empty()111 fn try_run_one_returns_if_empty() {
112     let mut pool = LocalPool::new();
113     assert!(!pool.try_run_one());
114 }
115 
116 #[test]
try_run_one_executes_one_ready()117 fn try_run_one_executes_one_ready() {
118     const ITER: usize = 200;
119 
120     let cnt = Rc::new(Cell::new(0));
121 
122     let mut pool = LocalPool::new();
123     let spawn = pool.spawner();
124 
125     for _ in 0..ITER {
126         spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
127 
128         let cnt = cnt.clone();
129         spawn.spawn_local_obj(Box::pin(lazy(move |_| {
130             cnt.set(cnt.get() + 1);
131         })).into()).unwrap();
132 
133         spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
134     }
135 
136     for i in 0..ITER {
137         assert_eq!(cnt.get(), i);
138         assert!(pool.try_run_one());
139         assert_eq!(cnt.get(), i + 1);
140     }
141     assert!(!pool.try_run_one());
142 }
143 
144 #[test]
try_run_one_returns_on_no_progress()145 fn try_run_one_returns_on_no_progress() {
146     const ITER: usize = 10;
147 
148     let cnt = Rc::new(Cell::new(0));
149 
150     let mut pool = LocalPool::new();
151     let spawn = pool.spawner();
152 
153     let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
154     {
155         let cnt = cnt.clone();
156         let waker = waker.clone();
157         spawn.spawn_local_obj(Box::pin(poll_fn(move |ctx| {
158             cnt.set(cnt.get() + 1);
159             waker.set(Some(ctx.waker().clone()));
160             if cnt.get() == ITER {
161                 Poll::Ready(())
162             } else {
163                 Poll::Pending
164             }
165         })).into()).unwrap();
166     }
167 
168     for i in 0..ITER - 1 {
169         assert_eq!(cnt.get(), i);
170         assert!(!pool.try_run_one());
171         assert_eq!(cnt.get(), i + 1);
172         let w = waker.take();
173         assert!(w.is_some());
174         w.unwrap().wake();
175     }
176     assert!(pool.try_run_one());
177     assert_eq!(cnt.get(), ITER);
178 }
179 
180 #[test]
try_run_one_runs_sub_futures()181 fn try_run_one_runs_sub_futures() {
182     let mut pool = LocalPool::new();
183     let spawn = pool.spawner();
184     let cnt = Rc::new(Cell::new(0));
185 
186     let inner_spawner = spawn.clone();
187     let cnt1 = cnt.clone();
188     spawn.spawn_local_obj(Box::pin(poll_fn(move |_| {
189         cnt1.set(cnt1.get() + 1);
190 
191         let cnt2 = cnt1.clone();
192         inner_spawner.spawn_local_obj(Box::pin(lazy(move |_|{
193             cnt2.set(cnt2.get() + 1)
194         })).into()).unwrap();
195 
196         Poll::Pending
197     })).into()).unwrap();
198 
199     pool.try_run_one();
200     assert_eq!(cnt.get(), 2);
201 }
202 
203 #[test]
run_until_stalled_returns_if_empty()204 fn run_until_stalled_returns_if_empty() {
205     let mut pool = LocalPool::new();
206     pool.run_until_stalled();
207     pool.run_until_stalled();
208 }
209 
210 #[test]
run_until_stalled_returns_multiple_times()211 fn run_until_stalled_returns_multiple_times() {
212     let mut pool = LocalPool::new();
213     let spawn = pool.spawner();
214     let cnt = Rc::new(Cell::new(0));
215 
216     let cnt1 = cnt.clone();
217     spawn.spawn_local_obj(Box::pin(lazy(move |_|{ cnt1.set(cnt1.get() + 1) })).into()).unwrap();
218     pool.run_until_stalled();
219     assert_eq!(cnt.get(), 1);
220 
221     let cnt2 = cnt.clone();
222     spawn.spawn_local_obj(Box::pin(lazy(move |_|{ cnt2.set(cnt2.get() + 1) })).into()).unwrap();
223     pool.run_until_stalled();
224     assert_eq!(cnt.get(), 2);
225 }
226 
227 #[test]
run_until_stalled_runs_spawned_sub_futures()228 fn run_until_stalled_runs_spawned_sub_futures() {
229     let mut pool = LocalPool::new();
230     let spawn = pool.spawner();
231     let cnt = Rc::new(Cell::new(0));
232 
233     let inner_spawner = spawn.clone();
234     let cnt1 = cnt.clone();
235     spawn.spawn_local_obj(Box::pin(poll_fn(move |_| {
236         cnt1.set(cnt1.get() + 1);
237 
238         let cnt2 = cnt1.clone();
239         inner_spawner.spawn_local_obj(Box::pin(lazy(move |_|{
240             cnt2.set(cnt2.get() + 1)
241         })).into()).unwrap();
242 
243         Poll::Pending
244     })).into()).unwrap();
245 
246     pool.run_until_stalled();
247     assert_eq!(cnt.get(), 2);
248 }
249 
250 #[test]
run_until_stalled_executes_all_ready()251 fn run_until_stalled_executes_all_ready() {
252     const ITER: usize = 200;
253     const PER_ITER: usize = 3;
254 
255     let cnt = Rc::new(Cell::new(0));
256 
257     let mut pool = LocalPool::new();
258     let spawn = pool.spawner();
259 
260     for i in 0..ITER {
261         for _ in 0..PER_ITER {
262             spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
263 
264             let cnt = cnt.clone();
265             spawn.spawn_local_obj(Box::pin(lazy(move |_| {
266                 cnt.set(cnt.get() + 1);
267             })).into()).unwrap();
268 
269             // also add some pending tasks to test if they are ignored
270             spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
271         }
272         assert_eq!(cnt.get(), i * PER_ITER);
273         pool.run_until_stalled();
274         assert_eq!(cnt.get(), (i + 1) * PER_ITER);
275     }
276 }
277 
278 #[test]
279 #[should_panic]
nesting_run()280 fn nesting_run() {
281     let mut pool = LocalPool::new();
282     let spawn = pool.spawner();
283 
284     spawn.spawn_obj(Box::pin(lazy(|_| {
285         let mut pool = LocalPool::new();
286         pool.run();
287     })).into()).unwrap();
288 
289     pool.run();
290 }
291 
292 #[test]
293 #[should_panic]
nesting_run_run_until_stalled()294 fn nesting_run_run_until_stalled() {
295     let mut pool = LocalPool::new();
296     let spawn = pool.spawner();
297 
298     spawn.spawn_obj(Box::pin(lazy(|_| {
299         let mut pool = LocalPool::new();
300         pool.run_until_stalled();
301     })).into()).unwrap();
302 
303     pool.run();
304 }
305 
306 #[test]
tasks_are_scheduled_fairly()307 fn tasks_are_scheduled_fairly() {
308     let state = Rc::new(RefCell::new([0, 0]));
309 
310     struct Spin {
311         state: Rc<RefCell<[i32; 2]>>,
312         idx: usize,
313     }
314 
315     impl Future for Spin {
316         type Output = ();
317 
318         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
319             let mut state = self.state.borrow_mut();
320 
321             if self.idx == 0 {
322                 let diff = state[0] - state[1];
323 
324                 assert!(diff.abs() <= 1);
325 
326                 if state[0] >= 50 {
327                     return Poll::Ready(());
328                 }
329             }
330 
331             state[self.idx] += 1;
332 
333             if state[self.idx] >= 100 {
334                 return Poll::Ready(());
335             }
336 
337             cx.waker().wake_by_ref();
338             Poll::Pending
339         }
340     }
341 
342     let mut pool = LocalPool::new();
343     let spawn = pool.spawner();
344 
345     spawn.spawn_local_obj(Box::pin(Spin {
346         state: state.clone(),
347         idx: 0,
348     }).into()).unwrap();
349 
350     spawn.spawn_local_obj(Box::pin(Spin {
351         state,
352         idx: 1,
353     }).into()).unwrap();
354 
355     pool.run();
356 }
357 
358 // Tests that the use of park/unpark in user-code has no
359 // effect on the expected behaviour of the executor.
360 #[test]
park_unpark_independence()361 fn park_unpark_independence() {
362     let mut done = false;
363 
364     let future = future::poll_fn(move |cx| {
365         if done {
366             return Poll::Ready(())
367         }
368         done = true;
369         cx.waker().clone().wake(); // (*)
370         // some user-code that temporarily parks the thread
371         let test = thread::current();
372         let latch = Arc::new(AtomicBool::new(false));
373         let signal = latch.clone();
374         thread::spawn(move || {
375             thread::sleep(Duration::from_millis(10));
376             signal.store(true, Ordering::SeqCst);
377             test.unpark()
378         });
379         while !latch.load(Ordering::Relaxed) {
380             thread::park();
381         }
382         Poll::Pending // Expect to be called again due to (*).
383     });
384 
385     futures::executor::block_on(future)
386 }
387 
388