1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 // All io tests that deal with shutdown is currently ignored because there are known bugs in with
5 // shutting down the io driver while concurrently registering new resources. See
6 // https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 fo more details.
7 //
8 // When this has been fixed we want to re-enable these tests.
9
10 use std::time::Duration;
11 use tokio::runtime::{Handle, Runtime};
12 use tokio::sync::mpsc;
13 use tokio::task::spawn_blocking;
14 use tokio::{fs, net, time};
15
16 macro_rules! multi_threaded_rt_test {
17 ($($t:tt)*) => {
18 mod threaded_scheduler_4_threads_only {
19 use super::*;
20
21 $($t)*
22
23 fn rt() -> Runtime {
24 tokio::runtime::Builder::new_multi_thread()
25 .worker_threads(4)
26 .enable_all()
27 .build()
28 .unwrap()
29 }
30 }
31
32 mod threaded_scheduler_1_thread_only {
33 use super::*;
34
35 $($t)*
36
37 fn rt() -> Runtime {
38 tokio::runtime::Builder::new_multi_thread()
39 .worker_threads(1)
40 .enable_all()
41 .build()
42 .unwrap()
43 }
44 }
45 }
46 }
47
48 macro_rules! rt_test {
49 ($($t:tt)*) => {
50 mod current_thread_scheduler {
51 use super::*;
52
53 $($t)*
54
55 fn rt() -> Runtime {
56 tokio::runtime::Builder::new_current_thread()
57 .enable_all()
58 .build()
59 .unwrap()
60 }
61 }
62
63 mod threaded_scheduler_4_threads {
64 use super::*;
65
66 $($t)*
67
68 fn rt() -> Runtime {
69 tokio::runtime::Builder::new_multi_thread()
70 .worker_threads(4)
71 .enable_all()
72 .build()
73 .unwrap()
74 }
75 }
76
77 mod threaded_scheduler_1_thread {
78 use super::*;
79
80 $($t)*
81
82 fn rt() -> Runtime {
83 tokio::runtime::Builder::new_multi_thread()
84 .worker_threads(1)
85 .enable_all()
86 .build()
87 .unwrap()
88 }
89 }
90 }
91 }
92
93 // ==== runtime independent futures ======
94
95 #[test]
basic()96 fn basic() {
97 test_with_runtimes(|| {
98 let one = Handle::current().block_on(async { 1 });
99 assert_eq!(1, one);
100 });
101 }
102
103 #[test]
bounded_mpsc_channel()104 fn bounded_mpsc_channel() {
105 test_with_runtimes(|| {
106 let (tx, mut rx) = mpsc::channel(1024);
107
108 Handle::current().block_on(tx.send(42)).unwrap();
109
110 let value = Handle::current().block_on(rx.recv()).unwrap();
111 assert_eq!(value, 42);
112 });
113 }
114
115 #[test]
unbounded_mpsc_channel()116 fn unbounded_mpsc_channel() {
117 test_with_runtimes(|| {
118 let (tx, mut rx) = mpsc::unbounded_channel();
119
120 let _ = tx.send(42);
121
122 let value = Handle::current().block_on(rx.recv()).unwrap();
123 assert_eq!(value, 42);
124 })
125 }
126
127 rt_test! {
128 // ==== spawn blocking futures ======
129
130 #[test]
131 fn basic_fs() {
132 let rt = rt();
133 let _enter = rt.enter();
134
135 let contents = Handle::current()
136 .block_on(fs::read_to_string("Cargo.toml"))
137 .unwrap();
138 assert!(contents.contains("Cargo.toml"));
139 }
140
141 #[test]
142 fn fs_shutdown_before_started() {
143 let rt = rt();
144 let _enter = rt.enter();
145 rt.shutdown_timeout(Duration::from_secs(1000));
146
147 let err: std::io::Error = Handle::current()
148 .block_on(fs::read_to_string("Cargo.toml"))
149 .unwrap_err();
150
151 assert_eq!(err.kind(), std::io::ErrorKind::Other);
152
153 let inner_err = err.get_ref().expect("no inner error");
154 assert_eq!(inner_err.to_string(), "background task failed");
155 }
156
157 #[test]
158 fn basic_spawn_blocking() {
159 let rt = rt();
160 let _enter = rt.enter();
161
162 let answer = Handle::current()
163 .block_on(spawn_blocking(|| {
164 std::thread::sleep(Duration::from_millis(100));
165 42
166 }))
167 .unwrap();
168
169 assert_eq!(answer, 42);
170 }
171
172 #[test]
173 fn spawn_blocking_after_shutdown_fails() {
174 let rt = rt();
175 let _enter = rt.enter();
176 rt.shutdown_timeout(Duration::from_secs(1000));
177
178 let join_err = Handle::current()
179 .block_on(spawn_blocking(|| {
180 std::thread::sleep(Duration::from_millis(100));
181 42
182 }))
183 .unwrap_err();
184
185 assert!(join_err.is_cancelled());
186 }
187
188 #[test]
189 fn spawn_blocking_started_before_shutdown_continues() {
190 let rt = rt();
191 let _enter = rt.enter();
192
193 let handle = spawn_blocking(|| {
194 std::thread::sleep(Duration::from_secs(1));
195 42
196 });
197
198 rt.shutdown_timeout(Duration::from_secs(1000));
199
200 let answer = Handle::current().block_on(handle).unwrap();
201
202 assert_eq!(answer, 42);
203 }
204
205 // ==== net ======
206
207 #[test]
208 fn tcp_listener_bind() {
209 let rt = rt();
210 let _enter = rt.enter();
211
212 Handle::current()
213 .block_on(net::TcpListener::bind("127.0.0.1:0"))
214 .unwrap();
215 }
216
217 // All io tests are ignored for now. See above why that is.
218 #[ignore]
219 #[test]
220 fn tcp_listener_connect_after_shutdown() {
221 let rt = rt();
222 let _enter = rt.enter();
223
224 rt.shutdown_timeout(Duration::from_secs(1000));
225
226 let err = Handle::current()
227 .block_on(net::TcpListener::bind("127.0.0.1:0"))
228 .unwrap_err();
229
230 assert_eq!(err.kind(), std::io::ErrorKind::Other);
231 assert_eq!(
232 err.get_ref().unwrap().to_string(),
233 "A Tokio 1.x context was found, but it is being shutdown.",
234 );
235 }
236
237 // All io tests are ignored for now. See above why that is.
238 #[ignore]
239 #[test]
240 fn tcp_listener_connect_before_shutdown() {
241 let rt = rt();
242 let _enter = rt.enter();
243
244 let bind_future = net::TcpListener::bind("127.0.0.1:0");
245
246 rt.shutdown_timeout(Duration::from_secs(1000));
247
248 let err = Handle::current().block_on(bind_future).unwrap_err();
249
250 assert_eq!(err.kind(), std::io::ErrorKind::Other);
251 assert_eq!(
252 err.get_ref().unwrap().to_string(),
253 "A Tokio 1.x context was found, but it is being shutdown.",
254 );
255 }
256
257 #[test]
258 fn udp_socket_bind() {
259 let rt = rt();
260 let _enter = rt.enter();
261
262 Handle::current()
263 .block_on(net::UdpSocket::bind("127.0.0.1:0"))
264 .unwrap();
265 }
266
267 // All io tests are ignored for now. See above why that is.
268 #[ignore]
269 #[test]
270 fn udp_stream_bind_after_shutdown() {
271 let rt = rt();
272 let _enter = rt.enter();
273
274 rt.shutdown_timeout(Duration::from_secs(1000));
275
276 let err = Handle::current()
277 .block_on(net::UdpSocket::bind("127.0.0.1:0"))
278 .unwrap_err();
279
280 assert_eq!(err.kind(), std::io::ErrorKind::Other);
281 assert_eq!(
282 err.get_ref().unwrap().to_string(),
283 "A Tokio 1.x context was found, but it is being shutdown.",
284 );
285 }
286
287 // All io tests are ignored for now. See above why that is.
288 #[ignore]
289 #[test]
290 fn udp_stream_bind_before_shutdown() {
291 let rt = rt();
292 let _enter = rt.enter();
293
294 let bind_future = net::UdpSocket::bind("127.0.0.1:0");
295
296 rt.shutdown_timeout(Duration::from_secs(1000));
297
298 let err = Handle::current().block_on(bind_future).unwrap_err();
299
300 assert_eq!(err.kind(), std::io::ErrorKind::Other);
301 assert_eq!(
302 err.get_ref().unwrap().to_string(),
303 "A Tokio 1.x context was found, but it is being shutdown.",
304 );
305 }
306
307 // All io tests are ignored for now. See above why that is.
308 #[ignore]
309 #[cfg(unix)]
310 #[test]
311 fn unix_listener_bind_after_shutdown() {
312 let rt = rt();
313 let _enter = rt.enter();
314
315 let dir = tempfile::tempdir().unwrap();
316 let path = dir.path().join("socket");
317
318 rt.shutdown_timeout(Duration::from_secs(1000));
319
320 let err = net::UnixListener::bind(path).unwrap_err();
321
322 assert_eq!(err.kind(), std::io::ErrorKind::Other);
323 assert_eq!(
324 err.get_ref().unwrap().to_string(),
325 "A Tokio 1.x context was found, but it is being shutdown.",
326 );
327 }
328
329 // All io tests are ignored for now. See above why that is.
330 #[ignore]
331 #[cfg(unix)]
332 #[test]
333 fn unix_listener_shutdown_after_bind() {
334 let rt = rt();
335 let _enter = rt.enter();
336
337 let dir = tempfile::tempdir().unwrap();
338 let path = dir.path().join("socket");
339
340 let listener = net::UnixListener::bind(path).unwrap();
341
342 rt.shutdown_timeout(Duration::from_secs(1000));
343
344 // this should not timeout but fail immediately since the runtime has been shutdown
345 let err = Handle::current().block_on(listener.accept()).unwrap_err();
346
347 assert_eq!(err.kind(), std::io::ErrorKind::Other);
348 assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
349 }
350
351 // All io tests are ignored for now. See above why that is.
352 #[ignore]
353 #[cfg(unix)]
354 #[test]
355 fn unix_listener_shutdown_after_accept() {
356 let rt = rt();
357 let _enter = rt.enter();
358
359 let dir = tempfile::tempdir().unwrap();
360 let path = dir.path().join("socket");
361
362 let listener = net::UnixListener::bind(path).unwrap();
363
364 let accept_future = listener.accept();
365
366 rt.shutdown_timeout(Duration::from_secs(1000));
367
368 // this should not timeout but fail immediately since the runtime has been shutdown
369 let err = Handle::current().block_on(accept_future).unwrap_err();
370
371 assert_eq!(err.kind(), std::io::ErrorKind::Other);
372 assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
373 }
374
375 // ==== nesting ======
376
377 #[test]
378 #[should_panic(
379 expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
380 )]
381 fn nesting() {
382 fn some_non_async_function() -> i32 {
383 Handle::current().block_on(time::sleep(Duration::from_millis(10)));
384 1
385 }
386
387 let rt = rt();
388
389 rt.block_on(async { some_non_async_function() });
390 }
391
392 #[test]
393 fn spawn_after_runtime_dropped() {
394 use futures::future::FutureExt;
395
396 let rt = rt();
397
398 let handle = rt.block_on(async move {
399 Handle::current()
400 });
401
402 let jh1 = handle.spawn(futures::future::pending::<()>());
403
404 drop(rt);
405
406 let jh2 = handle.spawn(futures::future::pending::<()>());
407
408 let err1 = jh1.now_or_never().unwrap().unwrap_err();
409 let err2 = jh2.now_or_never().unwrap().unwrap_err();
410 assert!(err1.is_cancelled());
411 assert!(err2.is_cancelled());
412 }
413 }
414
415 multi_threaded_rt_test! {
416 #[cfg(unix)]
417 #[test]
418 fn unix_listener_bind() {
419 let rt = rt();
420 let _enter = rt.enter();
421
422 let dir = tempfile::tempdir().unwrap();
423 let path = dir.path().join("socket");
424
425 let listener = net::UnixListener::bind(path).unwrap();
426
427 // this should timeout and not fail immediately since the runtime has not been shutdown
428 let _: tokio::time::error::Elapsed = Handle::current()
429 .block_on(tokio::time::timeout(
430 Duration::from_millis(10),
431 listener.accept(),
432 ))
433 .unwrap_err();
434 }
435
436 // ==== timers ======
437
438 // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no
439 // one to drive the timers so they will just hang forever. Therefore they are not tested.
440
441 #[test]
442 fn sleep() {
443 let rt = rt();
444 let _enter = rt.enter();
445
446 Handle::current().block_on(time::sleep(Duration::from_millis(100)));
447 }
448
449 #[test]
450 #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
451 fn sleep_before_shutdown_panics() {
452 let rt = rt();
453 let _enter = rt.enter();
454
455 let f = time::sleep(Duration::from_millis(100));
456
457 rt.shutdown_timeout(Duration::from_secs(1000));
458
459 Handle::current().block_on(f);
460 }
461
462 #[test]
463 #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
464 fn sleep_after_shutdown_panics() {
465 let rt = rt();
466 let _enter = rt.enter();
467
468 rt.shutdown_timeout(Duration::from_secs(1000));
469
470 Handle::current().block_on(time::sleep(Duration::from_millis(100)));
471 }
472 }
473
474 // ==== utils ======
475
476 /// Create a new multi threaded runtime
new_multi_thread(n: usize) -> Runtime477 fn new_multi_thread(n: usize) -> Runtime {
478 tokio::runtime::Builder::new_multi_thread()
479 .worker_threads(n)
480 .enable_all()
481 .build()
482 .unwrap()
483 }
484
485 /// Create a new single threaded runtime
new_current_thread() -> Runtime486 fn new_current_thread() -> Runtime {
487 tokio::runtime::Builder::new_current_thread()
488 .enable_all()
489 .build()
490 .unwrap()
491 }
492
493 /// Utility to test things on both kinds of runtimes both before and after shutting it down.
test_with_runtimes<F>(f: F) where F: Fn(),494 fn test_with_runtimes<F>(f: F)
495 where
496 F: Fn(),
497 {
498 {
499 println!("current thread runtime");
500
501 let rt = new_current_thread();
502 let _enter = rt.enter();
503 f();
504
505 println!("current thread runtime after shutdown");
506 rt.shutdown_timeout(Duration::from_secs(1000));
507 f();
508 }
509
510 {
511 println!("multi thread (1 thread) runtime");
512
513 let rt = new_multi_thread(1);
514 let _enter = rt.enter();
515 f();
516
517 println!("multi thread runtime after shutdown");
518 rt.shutdown_timeout(Duration::from_secs(1000));
519 f();
520 }
521
522 {
523 println!("multi thread (4 threads) runtime");
524
525 let rt = new_multi_thread(4);
526 let _enter = rt.enter();
527 f();
528
529 println!("multi thread runtime after shutdown");
530 rt.shutdown_timeout(Duration::from_secs(1000));
531 f();
532 }
533 }
534