1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use tokio::runtime::{self, Runtime};
5 use tokio::sync::{mpsc, oneshot};
6 use tokio::task::{self, LocalSet};
7 use tokio::time;
8
9 use std::cell::Cell;
10 use std::sync::atomic::Ordering::{self, SeqCst};
11 use std::sync::atomic::{AtomicBool, AtomicUsize};
12 use std::time::Duration;
13
14 #[tokio::test(basic_scheduler)]
local_basic_scheduler()15 async fn local_basic_scheduler() {
16 LocalSet::new()
17 .run_until(async {
18 task::spawn_local(async {}).await.unwrap();
19 })
20 .await;
21 }
22
23 #[tokio::test(threaded_scheduler)]
local_threadpool()24 async fn local_threadpool() {
25 thread_local! {
26 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
27 }
28
29 ON_RT_THREAD.with(|cell| cell.set(true));
30
31 LocalSet::new()
32 .run_until(async {
33 assert!(ON_RT_THREAD.with(|cell| cell.get()));
34 task::spawn_local(async {
35 assert!(ON_RT_THREAD.with(|cell| cell.get()));
36 })
37 .await
38 .unwrap();
39 })
40 .await;
41 }
42
43 #[tokio::test(threaded_scheduler)]
localset_future_threadpool()44 async fn localset_future_threadpool() {
45 thread_local! {
46 static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false);
47 }
48
49 ON_LOCAL_THREAD.with(|cell| cell.set(true));
50
51 let local = LocalSet::new();
52 local.spawn_local(async move {
53 assert!(ON_LOCAL_THREAD.with(|cell| cell.get()));
54 });
55 local.await;
56 }
57
58 #[tokio::test(threaded_scheduler)]
localset_future_timers()59 async fn localset_future_timers() {
60 static RAN1: AtomicBool = AtomicBool::new(false);
61 static RAN2: AtomicBool = AtomicBool::new(false);
62
63 let local = LocalSet::new();
64 local.spawn_local(async move {
65 time::delay_for(Duration::from_millis(10)).await;
66 RAN1.store(true, Ordering::SeqCst);
67 });
68 local.spawn_local(async move {
69 time::delay_for(Duration::from_millis(20)).await;
70 RAN2.store(true, Ordering::SeqCst);
71 });
72 local.await;
73 assert!(RAN1.load(Ordering::SeqCst));
74 assert!(RAN2.load(Ordering::SeqCst));
75 }
76
77 #[tokio::test]
localset_future_drives_all_local_futs()78 async fn localset_future_drives_all_local_futs() {
79 static RAN1: AtomicBool = AtomicBool::new(false);
80 static RAN2: AtomicBool = AtomicBool::new(false);
81 static RAN3: AtomicBool = AtomicBool::new(false);
82
83 let local = LocalSet::new();
84 local.spawn_local(async move {
85 task::spawn_local(async {
86 task::yield_now().await;
87 RAN3.store(true, Ordering::SeqCst);
88 });
89 task::yield_now().await;
90 RAN1.store(true, Ordering::SeqCst);
91 });
92 local.spawn_local(async move {
93 task::yield_now().await;
94 RAN2.store(true, Ordering::SeqCst);
95 });
96 local.await;
97 assert!(RAN1.load(Ordering::SeqCst));
98 assert!(RAN2.load(Ordering::SeqCst));
99 assert!(RAN3.load(Ordering::SeqCst));
100 }
101
102 #[tokio::test(threaded_scheduler)]
local_threadpool_timer()103 async fn local_threadpool_timer() {
104 // This test ensures that runtime services like the timer are properly
105 // set for the local task set.
106 thread_local! {
107 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
108 }
109
110 ON_RT_THREAD.with(|cell| cell.set(true));
111
112 LocalSet::new()
113 .run_until(async {
114 assert!(ON_RT_THREAD.with(|cell| cell.get()));
115 let join = task::spawn_local(async move {
116 assert!(ON_RT_THREAD.with(|cell| cell.get()));
117 time::delay_for(Duration::from_millis(10)).await;
118 assert!(ON_RT_THREAD.with(|cell| cell.get()));
119 });
120 join.await.unwrap();
121 })
122 .await;
123 }
124
125 #[test]
126 // This will panic, since the thread that calls `block_on` cannot use
127 // in-place blocking inside of `block_on`.
128 #[should_panic]
local_threadpool_blocking_in_place()129 fn local_threadpool_blocking_in_place() {
130 thread_local! {
131 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
132 }
133
134 ON_RT_THREAD.with(|cell| cell.set(true));
135
136 let mut rt = runtime::Builder::new()
137 .threaded_scheduler()
138 .enable_all()
139 .build()
140 .unwrap();
141 LocalSet::new().block_on(&mut rt, async {
142 assert!(ON_RT_THREAD.with(|cell| cell.get()));
143 let join = task::spawn_local(async move {
144 assert!(ON_RT_THREAD.with(|cell| cell.get()));
145 task::block_in_place(|| {});
146 assert!(ON_RT_THREAD.with(|cell| cell.get()));
147 });
148 join.await.unwrap();
149 });
150 }
151
152 #[tokio::test(threaded_scheduler)]
local_threadpool_blocking_run()153 async fn local_threadpool_blocking_run() {
154 thread_local! {
155 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
156 }
157
158 ON_RT_THREAD.with(|cell| cell.set(true));
159
160 LocalSet::new()
161 .run_until(async {
162 assert!(ON_RT_THREAD.with(|cell| cell.get()));
163 let join = task::spawn_local(async move {
164 assert!(ON_RT_THREAD.with(|cell| cell.get()));
165 task::spawn_blocking(|| {
166 assert!(
167 !ON_RT_THREAD.with(|cell| cell.get()),
168 "blocking must not run on the local task set's thread"
169 );
170 })
171 .await
172 .unwrap();
173 assert!(ON_RT_THREAD.with(|cell| cell.get()));
174 });
175 join.await.unwrap();
176 })
177 .await;
178 }
179
180 #[tokio::test(threaded_scheduler)]
all_spawns_are_local()181 async fn all_spawns_are_local() {
182 use futures::future;
183 thread_local! {
184 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
185 }
186
187 ON_RT_THREAD.with(|cell| cell.set(true));
188
189 LocalSet::new()
190 .run_until(async {
191 assert!(ON_RT_THREAD.with(|cell| cell.get()));
192 let handles = (0..128)
193 .map(|_| {
194 task::spawn_local(async {
195 assert!(ON_RT_THREAD.with(|cell| cell.get()));
196 })
197 })
198 .collect::<Vec<_>>();
199 for joined in future::join_all(handles).await {
200 joined.unwrap();
201 }
202 })
203 .await;
204 }
205
206 #[tokio::test(threaded_scheduler)]
nested_spawn_is_local()207 async fn nested_spawn_is_local() {
208 thread_local! {
209 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
210 }
211
212 ON_RT_THREAD.with(|cell| cell.set(true));
213
214 LocalSet::new()
215 .run_until(async {
216 assert!(ON_RT_THREAD.with(|cell| cell.get()));
217 task::spawn_local(async {
218 assert!(ON_RT_THREAD.with(|cell| cell.get()));
219 task::spawn_local(async {
220 assert!(ON_RT_THREAD.with(|cell| cell.get()));
221 task::spawn_local(async {
222 assert!(ON_RT_THREAD.with(|cell| cell.get()));
223 task::spawn_local(async {
224 assert!(ON_RT_THREAD.with(|cell| cell.get()));
225 })
226 .await
227 .unwrap();
228 })
229 .await
230 .unwrap();
231 })
232 .await
233 .unwrap();
234 })
235 .await
236 .unwrap();
237 })
238 .await;
239 }
240
241 #[test]
join_local_future_elsewhere()242 fn join_local_future_elsewhere() {
243 thread_local! {
244 static ON_RT_THREAD: Cell<bool> = Cell::new(false);
245 }
246
247 ON_RT_THREAD.with(|cell| cell.set(true));
248
249 let mut rt = runtime::Builder::new()
250 .threaded_scheduler()
251 .build()
252 .unwrap();
253 let local = LocalSet::new();
254 local.block_on(&mut rt, async move {
255 let (tx, rx) = oneshot::channel();
256 let join = task::spawn_local(async move {
257 println!("hello world running...");
258 assert!(
259 ON_RT_THREAD.with(|cell| cell.get()),
260 "local task must run on local thread, no matter where it is awaited"
261 );
262 rx.await.unwrap();
263
264 println!("hello world task done");
265 "hello world"
266 });
267 let join2 = task::spawn(async move {
268 assert!(
269 !ON_RT_THREAD.with(|cell| cell.get()),
270 "spawned task should be on a worker"
271 );
272
273 tx.send(()).expect("task shouldn't have ended yet");
274 println!("waking up hello world...");
275
276 join.await.expect("task should complete successfully");
277
278 println!("hello world task joined");
279 });
280 join2.await.unwrap()
281 });
282 }
283
284 #[test]
drop_cancels_tasks()285 fn drop_cancels_tasks() {
286 use std::rc::Rc;
287
288 // This test reproduces issue #1842
289 let mut rt = rt();
290 let rc1 = Rc::new(());
291 let rc2 = rc1.clone();
292
293 let (started_tx, started_rx) = oneshot::channel();
294
295 let local = LocalSet::new();
296 local.spawn_local(async move {
297 // Move this in
298 let _rc2 = rc2;
299
300 started_tx.send(()).unwrap();
301 loop {
302 time::delay_for(Duration::from_secs(3600)).await;
303 }
304 });
305
306 local.block_on(&mut rt, async {
307 started_rx.await.unwrap();
308 });
309 drop(local);
310 drop(rt);
311
312 assert_eq!(1, Rc::strong_count(&rc1));
313 }
314
315 /// Runs a test function in a separate thread, and panics if the test does not
316 /// complete within the specified timeout, or if the test function panics.
317 ///
318 /// This is intended for running tests whose failure mode is a hang or infinite
319 /// loop that cannot be detected otherwise.
with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static)320 fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
321 use std::sync::mpsc::RecvTimeoutError;
322
323 let (done_tx, done_rx) = std::sync::mpsc::channel();
324 let thread = std::thread::spawn(move || {
325 f();
326
327 // Send a message on the channel so that the test thread can
328 // determine if we have entered an infinite loop:
329 done_tx.send(()).unwrap();
330 });
331
332 // Since the failure mode of this test is an infinite loop, rather than
333 // something we can easily make assertions about, we'll run it in a
334 // thread. When the test thread finishes, it will send a message on a
335 // channel to this thread. We'll wait for that message with a fairly
336 // generous timeout, and if we don't recieve it, we assume the test
337 // thread has hung.
338 //
339 // Note that it should definitely complete in under a minute, but just
340 // in case CI is slow, we'll give it a long timeout.
341 match done_rx.recv_timeout(timeout) {
342 Err(RecvTimeoutError::Timeout) => panic!(
343 "test did not complete within {:?} seconds, \
344 we have (probably) entered an infinite loop!",
345 timeout,
346 ),
347 // Did the test thread panic? We'll find out for sure when we `join`
348 // with it.
349 Err(RecvTimeoutError::Disconnected) => {
350 println!("done_rx dropped, did the test thread panic?");
351 }
352 // Test completed successfully!
353 Ok(()) => {}
354 }
355
356 thread.join().expect("test thread should not panic!")
357 }
358
359 #[test]
drop_cancels_remote_tasks()360 fn drop_cancels_remote_tasks() {
361 // This test reproduces issue #1885.
362 with_timeout(Duration::from_secs(60), || {
363 let (tx, mut rx) = mpsc::channel::<()>(1024);
364
365 let mut rt = rt();
366
367 let local = LocalSet::new();
368 local.spawn_local(async move { while rx.recv().await.is_some() {} });
369 local.block_on(&mut rt, async {
370 time::delay_for(Duration::from_millis(1)).await;
371 });
372
373 drop(tx);
374
375 // This enters an infinite loop if the remote notified tasks are not
376 // properly cancelled.
377 drop(local);
378 });
379 }
380
381 #[test]
local_tasks_wake_join_all()382 fn local_tasks_wake_join_all() {
383 // This test reproduces issue #2460.
384 with_timeout(Duration::from_secs(60), || {
385 use futures::future::join_all;
386 use tokio::task::LocalSet;
387
388 let mut rt = rt();
389 let set = LocalSet::new();
390 let mut handles = Vec::new();
391
392 for _ in 1..=128 {
393 handles.push(set.spawn_local(async move {
394 tokio::task::spawn_local(async move {}).await.unwrap();
395 }));
396 }
397
398 rt.block_on(set.run_until(join_all(handles)));
399 });
400 }
401
402 #[tokio::test]
local_tasks_are_polled_after_tick()403 async fn local_tasks_are_polled_after_tick() {
404 // Reproduces issues #1899 and #1900
405
406 static RX1: AtomicUsize = AtomicUsize::new(0);
407 static RX2: AtomicUsize = AtomicUsize::new(0);
408 static EXPECTED: usize = 500;
409
410 let (tx, mut rx) = mpsc::unbounded_channel();
411
412 let local = LocalSet::new();
413
414 local
415 .run_until(async {
416 let task2 = task::spawn(async move {
417 // Wait a bit
418 time::delay_for(Duration::from_millis(100)).await;
419
420 let mut oneshots = Vec::with_capacity(EXPECTED);
421
422 // Send values
423 for _ in 0..EXPECTED {
424 let (oneshot_tx, oneshot_rx) = oneshot::channel();
425 oneshots.push(oneshot_tx);
426 tx.send(oneshot_rx).unwrap();
427 }
428
429 time::delay_for(Duration::from_millis(100)).await;
430
431 for tx in oneshots.drain(..) {
432 tx.send(()).unwrap();
433 }
434
435 time::delay_for(Duration::from_millis(300)).await;
436 let rx1 = RX1.load(SeqCst);
437 let rx2 = RX2.load(SeqCst);
438 println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2);
439 assert_eq!(EXPECTED, rx1);
440 assert_eq!(EXPECTED, rx2);
441 });
442
443 while let Some(oneshot) = rx.recv().await {
444 RX1.fetch_add(1, SeqCst);
445
446 task::spawn_local(async move {
447 oneshot.await.unwrap();
448 RX2.fetch_add(1, SeqCst);
449 });
450 }
451
452 task2.await.unwrap();
453 })
454 .await;
455 }
456
457 #[tokio::test]
acquire_mutex_in_drop()458 async fn acquire_mutex_in_drop() {
459 use futures::future::pending;
460
461 let (tx1, rx1) = oneshot::channel();
462 let (tx2, rx2) = oneshot::channel();
463 let local = LocalSet::new();
464
465 local.spawn_local(async move {
466 let _ = rx2.await;
467 unreachable!();
468 });
469
470 local.spawn_local(async move {
471 let _ = rx1.await;
472 tx2.send(()).unwrap();
473 unreachable!();
474 });
475
476 // Spawn a task that will never notify
477 local.spawn_local(async move {
478 pending::<()>().await;
479 tx1.send(()).unwrap();
480 });
481
482 // Tick the loop
483 local
484 .run_until(async {
485 task::yield_now().await;
486 })
487 .await;
488
489 // Drop the LocalSet
490 drop(local);
491 }
492
rt() -> Runtime493 fn rt() -> Runtime {
494 tokio::runtime::Builder::new()
495 .basic_scheduler()
496 .enable_all()
497 .build()
498 .unwrap()
499 }
500