1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use tokio::runtime::{self, Runtime};
5 use tokio::sync::{mpsc, oneshot};
6 use tokio::task::{self, LocalSet};
7 use tokio::time;
8 
9 use std::cell::Cell;
10 use std::sync::atomic::Ordering::{self, SeqCst};
11 use std::sync::atomic::{AtomicBool, AtomicUsize};
12 use std::time::Duration;
13 
14 #[tokio::test(basic_scheduler)]
local_basic_scheduler()15 async fn local_basic_scheduler() {
16     LocalSet::new()
17         .run_until(async {
18             task::spawn_local(async {}).await.unwrap();
19         })
20         .await;
21 }
22 
23 #[tokio::test(threaded_scheduler)]
local_threadpool()24 async fn local_threadpool() {
25     thread_local! {
26         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
27     }
28 
29     ON_RT_THREAD.with(|cell| cell.set(true));
30 
31     LocalSet::new()
32         .run_until(async {
33             assert!(ON_RT_THREAD.with(|cell| cell.get()));
34             task::spawn_local(async {
35                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
36             })
37             .await
38             .unwrap();
39         })
40         .await;
41 }
42 
43 #[tokio::test(threaded_scheduler)]
localset_future_threadpool()44 async fn localset_future_threadpool() {
45     thread_local! {
46         static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false);
47     }
48 
49     ON_LOCAL_THREAD.with(|cell| cell.set(true));
50 
51     let local = LocalSet::new();
52     local.spawn_local(async move {
53         assert!(ON_LOCAL_THREAD.with(|cell| cell.get()));
54     });
55     local.await;
56 }
57 
58 #[tokio::test(threaded_scheduler)]
localset_future_timers()59 async fn localset_future_timers() {
60     static RAN1: AtomicBool = AtomicBool::new(false);
61     static RAN2: AtomicBool = AtomicBool::new(false);
62 
63     let local = LocalSet::new();
64     local.spawn_local(async move {
65         time::delay_for(Duration::from_millis(10)).await;
66         RAN1.store(true, Ordering::SeqCst);
67     });
68     local.spawn_local(async move {
69         time::delay_for(Duration::from_millis(20)).await;
70         RAN2.store(true, Ordering::SeqCst);
71     });
72     local.await;
73     assert!(RAN1.load(Ordering::SeqCst));
74     assert!(RAN2.load(Ordering::SeqCst));
75 }
76 
77 #[tokio::test]
localset_future_drives_all_local_futs()78 async fn localset_future_drives_all_local_futs() {
79     static RAN1: AtomicBool = AtomicBool::new(false);
80     static RAN2: AtomicBool = AtomicBool::new(false);
81     static RAN3: AtomicBool = AtomicBool::new(false);
82 
83     let local = LocalSet::new();
84     local.spawn_local(async move {
85         task::spawn_local(async {
86             task::yield_now().await;
87             RAN3.store(true, Ordering::SeqCst);
88         });
89         task::yield_now().await;
90         RAN1.store(true, Ordering::SeqCst);
91     });
92     local.spawn_local(async move {
93         task::yield_now().await;
94         RAN2.store(true, Ordering::SeqCst);
95     });
96     local.await;
97     assert!(RAN1.load(Ordering::SeqCst));
98     assert!(RAN2.load(Ordering::SeqCst));
99     assert!(RAN3.load(Ordering::SeqCst));
100 }
101 
102 #[tokio::test(threaded_scheduler)]
local_threadpool_timer()103 async fn local_threadpool_timer() {
104     // This test ensures that runtime services like the timer are properly
105     // set for the local task set.
106     thread_local! {
107         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
108     }
109 
110     ON_RT_THREAD.with(|cell| cell.set(true));
111 
112     LocalSet::new()
113         .run_until(async {
114             assert!(ON_RT_THREAD.with(|cell| cell.get()));
115             let join = task::spawn_local(async move {
116                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
117                 time::delay_for(Duration::from_millis(10)).await;
118                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
119             });
120             join.await.unwrap();
121         })
122         .await;
123 }
124 
#[test]
// Expected to panic: the thread that calls `block_on` is not allowed to use
// in-place blocking while it is inside of `block_on`.
#[should_panic]
fn local_threadpool_blocking_in_place() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = Cell::new(false);
    }

    // Reads the marker of whichever thread is currently executing.
    fn on_rt_thread() -> bool {
        ON_RT_THREAD.with(|flag| flag.get())
    }

    // Flag the test thread, which is also the thread that calls `block_on`.
    ON_RT_THREAD.with(|flag| flag.set(true));

    let mut rt = runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .build()
        .unwrap();

    let body = async {
        assert!(on_rt_thread());
        task::spawn_local(async move {
            assert!(on_rt_thread());
            task::block_in_place(|| {});
            assert!(on_rt_thread());
        })
        .await
        .unwrap();
    };
    LocalSet::new().block_on(&mut rt, body);
}
151 
152 #[tokio::test(threaded_scheduler)]
local_threadpool_blocking_run()153 async fn local_threadpool_blocking_run() {
154     thread_local! {
155         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
156     }
157 
158     ON_RT_THREAD.with(|cell| cell.set(true));
159 
160     LocalSet::new()
161         .run_until(async {
162             assert!(ON_RT_THREAD.with(|cell| cell.get()));
163             let join = task::spawn_local(async move {
164                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
165                 task::spawn_blocking(|| {
166                     assert!(
167                         !ON_RT_THREAD.with(|cell| cell.get()),
168                         "blocking must not run on the local task set's thread"
169                     );
170                 })
171                 .await
172                 .unwrap();
173                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
174             });
175             join.await.unwrap();
176         })
177         .await;
178 }
179 
180 #[tokio::test(threaded_scheduler)]
all_spawns_are_local()181 async fn all_spawns_are_local() {
182     use futures::future;
183     thread_local! {
184         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
185     }
186 
187     ON_RT_THREAD.with(|cell| cell.set(true));
188 
189     LocalSet::new()
190         .run_until(async {
191             assert!(ON_RT_THREAD.with(|cell| cell.get()));
192             let handles = (0..128)
193                 .map(|_| {
194                     task::spawn_local(async {
195                         assert!(ON_RT_THREAD.with(|cell| cell.get()));
196                     })
197                 })
198                 .collect::<Vec<_>>();
199             for joined in future::join_all(handles).await {
200                 joined.unwrap();
201             }
202         })
203         .await;
204 }
205 
206 #[tokio::test(threaded_scheduler)]
nested_spawn_is_local()207 async fn nested_spawn_is_local() {
208     thread_local! {
209         static ON_RT_THREAD: Cell<bool> = Cell::new(false);
210     }
211 
212     ON_RT_THREAD.with(|cell| cell.set(true));
213 
214     LocalSet::new()
215         .run_until(async {
216             assert!(ON_RT_THREAD.with(|cell| cell.get()));
217             task::spawn_local(async {
218                 assert!(ON_RT_THREAD.with(|cell| cell.get()));
219                 task::spawn_local(async {
220                     assert!(ON_RT_THREAD.with(|cell| cell.get()));
221                     task::spawn_local(async {
222                         assert!(ON_RT_THREAD.with(|cell| cell.get()));
223                         task::spawn_local(async {
224                             assert!(ON_RT_THREAD.with(|cell| cell.get()));
225                         })
226                         .await
227                         .unwrap();
228                     })
229                     .await
230                     .unwrap();
231                 })
232                 .await
233                 .unwrap();
234             })
235             .await
236             .unwrap();
237         })
238         .await;
239 }
240 
#[test]
fn join_local_future_elsewhere() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = Cell::new(false);
    }

    // Flag the test thread, which is the thread that calls `block_on` below.
    ON_RT_THREAD.with(|flag| flag.set(true));

    let mut rt = runtime::Builder::new()
        .threaded_scheduler()
        .build()
        .unwrap();

    LocalSet::new().block_on(&mut rt, async move {
        let (wake_tx, wake_rx) = oneshot::channel();

        // Local task: waits for the wake-up signal, then yields a value.
        let local_task = task::spawn_local(async move {
            println!("hello world running...");
            assert!(
                ON_RT_THREAD.with(|flag| flag.get()),
                "local task must run on local thread, no matter where it is awaited"
            );
            wake_rx.await.unwrap();

            println!("hello world task done");
            "hello world"
        });

        // Task spawned with `task::spawn`: wakes the local task and joins its
        // handle from a thread that is not the flagged one.
        let remote_task = task::spawn(async move {
            assert!(
                !ON_RT_THREAD.with(|flag| flag.get()),
                "spawned task should be on a worker"
            );

            wake_tx.send(()).expect("task shouldn't have ended yet");
            println!("waking up hello world...");

            local_task.await.expect("task should complete successfully");

            println!("hello world task joined");
        });

        remote_task.await.unwrap()
    });
}
283 
#[test]
fn drop_cancels_tasks() {
    use std::rc::Rc;

    // Regression test for issue #1842.
    let mut basic_rt = rt();

    // `witness` stays with the test; `held` is moved into the task. Once the
    // task has been dropped, only `witness` may remain.
    let witness = Rc::new(());
    let held = Rc::clone(&witness);

    let (started_tx, started_rx) = oneshot::channel();

    let local = LocalSet::new();
    local.spawn_local(async move {
        // Force the clone to be owned by this task.
        let _held = held;

        started_tx.send(()).unwrap();

        // Never finishes on its own.
        loop {
            time::delay_for(Duration::from_secs(3600)).await;
        }
    });

    // Run the set just long enough for the task to report that it started.
    local.block_on(&mut basic_rt, async {
        started_rx.await.unwrap();
    });

    // Drop the set first, then the runtime.
    drop(local);
    drop(basic_rt);

    assert_eq!(1, Rc::strong_count(&witness));
}
314 
/// Runs a test function in a separate thread, and panics if the test does not
/// complete within the specified timeout, or if the test function panics.
///
/// This is intended for running tests whose failure mode is a hang or infinite
/// loop that cannot be detected otherwise.
///
/// # Panics
///
/// Panics if `f` has not returned once `timeout` has elapsed (the worker
/// thread is then left running, detached), or if `f` itself panicked.
fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
    use std::sync::mpsc::RecvTimeoutError;

    let (done_tx, done_rx) = std::sync::mpsc::channel();
    let thread = std::thread::spawn(move || {
        f();

        // Send a message on the channel so that the waiting thread can
        // determine whether we have entered an infinite loop. The send can
        // only fail when the receiver is already gone, i.e. the waiting side
        // timed out and panicked; there is nobody left to notify, so the
        // error is deliberately ignored instead of panicking a second time.
        let _ = done_tx.send(());
    });

    // Since the failure mode of such a test is an infinite loop, rather than
    // something we can easily make assertions about, it runs in its own
    // thread. When that thread finishes, it sends a message on a channel to
    // this thread. We wait for that message for at most `timeout`, and if we
    // don't receive it, we assume the test thread has hung.
    //
    // Callers should pick a generous timeout in case CI is slow.
    match done_rx.recv_timeout(timeout) {
        // `Duration`'s `Debug` output already carries its unit (e.g. `60s`),
        // so no unit is appended here.
        Err(RecvTimeoutError::Timeout) => panic!(
            "test did not complete within {:?}, \
             we have (probably) entered an infinite loop!",
            timeout,
        ),
        // The sender was dropped without sending. Did the test thread panic?
        // We'll find out for sure when we `join` with it.
        Err(RecvTimeoutError::Disconnected) => {
            println!("done_tx dropped, did the test thread panic?");
        }
        // Test completed successfully!
        Ok(()) => {}
    }

    thread.join().expect("test thread should not panic!")
}
358 
#[test]
fn drop_cancels_remote_tasks() {
    // Regression test for issue #1885.
    with_timeout(Duration::from_secs(60), || {
        let (sender, mut receiver) = mpsc::channel::<()>(1024);

        let mut basic_rt = rt();

        let local = LocalSet::new();

        // Local task that keeps receiving until the channel is closed.
        local.spawn_local(async move { while let Some(()) = receiver.recv().await {} });

        // Drive the set briefly. The delay is created inside the future so
        // that it is constructed while running under `block_on`.
        local.block_on(&mut basic_rt, async {
            time::delay_for(Duration::from_millis(1)).await;
        });

        // Drop the only sender while the set is not being driven.
        drop(sender);

        // This enters an infinite loop if the remote notified tasks are not
        // properly cancelled.
        drop(local);
    });
}
380 
#[test]
fn local_tasks_wake_join_all() {
    // Regression test for issue #2460.
    with_timeout(Duration::from_secs(60), || {
        use futures::future::join_all;

        let mut basic_rt = rt();
        let set = LocalSet::new();

        // 128 local tasks, each of which spawns and joins one nested local
        // task.
        let handles = (0..128)
            .map(|_| {
                set.spawn_local(async move {
                    task::spawn_local(async move {}).await.unwrap();
                })
            })
            .collect::<Vec<_>>();

        // Joining all handles from inside `run_until` must complete.
        basic_rt.block_on(set.run_until(join_all(handles)));
    });
}
401 
402 #[tokio::test]
local_tasks_are_polled_after_tick()403 async fn local_tasks_are_polled_after_tick() {
404     // Reproduces issues #1899 and #1900
405 
406     static RX1: AtomicUsize = AtomicUsize::new(0);
407     static RX2: AtomicUsize = AtomicUsize::new(0);
408     static EXPECTED: usize = 500;
409 
410     let (tx, mut rx) = mpsc::unbounded_channel();
411 
412     let local = LocalSet::new();
413 
414     local
415         .run_until(async {
416             let task2 = task::spawn(async move {
417                 // Wait a bit
418                 time::delay_for(Duration::from_millis(100)).await;
419 
420                 let mut oneshots = Vec::with_capacity(EXPECTED);
421 
422                 // Send values
423                 for _ in 0..EXPECTED {
424                     let (oneshot_tx, oneshot_rx) = oneshot::channel();
425                     oneshots.push(oneshot_tx);
426                     tx.send(oneshot_rx).unwrap();
427                 }
428 
429                 time::delay_for(Duration::from_millis(100)).await;
430 
431                 for tx in oneshots.drain(..) {
432                     tx.send(()).unwrap();
433                 }
434 
435                 time::delay_for(Duration::from_millis(300)).await;
436                 let rx1 = RX1.load(SeqCst);
437                 let rx2 = RX2.load(SeqCst);
438                 println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2);
439                 assert_eq!(EXPECTED, rx1);
440                 assert_eq!(EXPECTED, rx2);
441             });
442 
443             while let Some(oneshot) = rx.recv().await {
444                 RX1.fetch_add(1, SeqCst);
445 
446                 task::spawn_local(async move {
447                     oneshot.await.unwrap();
448                     RX2.fetch_add(1, SeqCst);
449                 });
450             }
451 
452             task2.await.unwrap();
453         })
454         .await;
455 }
456 
457 #[tokio::test]
acquire_mutex_in_drop()458 async fn acquire_mutex_in_drop() {
459     use futures::future::pending;
460 
461     let (tx1, rx1) = oneshot::channel();
462     let (tx2, rx2) = oneshot::channel();
463     let local = LocalSet::new();
464 
465     local.spawn_local(async move {
466         let _ = rx2.await;
467         unreachable!();
468     });
469 
470     local.spawn_local(async move {
471         let _ = rx1.await;
472         tx2.send(()).unwrap();
473         unreachable!();
474     });
475 
476     // Spawn a task that will never notify
477     local.spawn_local(async move {
478         pending::<()>().await;
479         tx1.send(()).unwrap();
480     });
481 
482     // Tick the loop
483     local
484         .run_until(async {
485             task::yield_now().await;
486         })
487         .await;
488 
489     // Drop the LocalSet
490     drop(local);
491 }
492 
rt() -> Runtime493 fn rt() -> Runtime {
494     tokio::runtime::Builder::new()
495         .basic_scheduler()
496         .enable_all()
497         .build()
498         .unwrap()
499 }
500