1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 // All io tests that deal with shutdown is currently ignored because there are known bugs in with
5 // shutting down the io driver while concurrently registering new resources. See
6 // https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 fo more details.
7 //
8 // When this has been fixed we want to re-enable these tests.
9 
10 use std::time::Duration;
11 use tokio::runtime::{Handle, Runtime};
12 use tokio::sync::mpsc;
13 use tokio::task::spawn_blocking;
14 use tokio::{fs, net, time};
15 
16 macro_rules! multi_threaded_rt_test {
17     ($($t:tt)*) => {
18         mod threaded_scheduler_4_threads_only {
19             use super::*;
20 
21             $($t)*
22 
23             fn rt() -> Runtime {
24                 tokio::runtime::Builder::new_multi_thread()
25                     .worker_threads(4)
26                     .enable_all()
27                     .build()
28                     .unwrap()
29             }
30         }
31 
32         mod threaded_scheduler_1_thread_only {
33             use super::*;
34 
35             $($t)*
36 
37             fn rt() -> Runtime {
38                 tokio::runtime::Builder::new_multi_thread()
39                     .worker_threads(1)
40                     .enable_all()
41                     .build()
42                     .unwrap()
43             }
44         }
45     }
46 }
47 
48 macro_rules! rt_test {
49     ($($t:tt)*) => {
50         mod current_thread_scheduler {
51             use super::*;
52 
53             $($t)*
54 
55             fn rt() -> Runtime {
56                 tokio::runtime::Builder::new_current_thread()
57                     .enable_all()
58                     .build()
59                     .unwrap()
60             }
61         }
62 
63         mod threaded_scheduler_4_threads {
64             use super::*;
65 
66             $($t)*
67 
68             fn rt() -> Runtime {
69                 tokio::runtime::Builder::new_multi_thread()
70                     .worker_threads(4)
71                     .enable_all()
72                     .build()
73                     .unwrap()
74             }
75         }
76 
77         mod threaded_scheduler_1_thread {
78             use super::*;
79 
80             $($t)*
81 
82             fn rt() -> Runtime {
83                 tokio::runtime::Builder::new_multi_thread()
84                     .worker_threads(1)
85                     .enable_all()
86                     .build()
87                     .unwrap()
88             }
89         }
90     }
91 }
92 
93 // ==== runtime independent futures ======
94 
95 #[test]
basic()96 fn basic() {
97     test_with_runtimes(|| {
98         let one = Handle::current().block_on(async { 1 });
99         assert_eq!(1, one);
100     });
101 }
102 
103 #[test]
bounded_mpsc_channel()104 fn bounded_mpsc_channel() {
105     test_with_runtimes(|| {
106         let (tx, mut rx) = mpsc::channel(1024);
107 
108         Handle::current().block_on(tx.send(42)).unwrap();
109 
110         let value = Handle::current().block_on(rx.recv()).unwrap();
111         assert_eq!(value, 42);
112     });
113 }
114 
115 #[test]
unbounded_mpsc_channel()116 fn unbounded_mpsc_channel() {
117     test_with_runtimes(|| {
118         let (tx, mut rx) = mpsc::unbounded_channel();
119 
120         let _ = tx.send(42);
121 
122         let value = Handle::current().block_on(rx.recv()).unwrap();
123         assert_eq!(value, 42);
124     })
125 }
126 
127 rt_test! {
128     // ==== spawn blocking futures ======
129 
130     #[test]
131     fn basic_fs() {
132         let rt = rt();
133         let _enter = rt.enter();
134 
135         let contents = Handle::current()
136             .block_on(fs::read_to_string("Cargo.toml"))
137             .unwrap();
138         assert!(contents.contains("https://tokio.rs"));
139     }
140 
141     #[test]
142     fn fs_shutdown_before_started() {
143         let rt = rt();
144         let _enter = rt.enter();
145         rt.shutdown_timeout(Duration::from_secs(1000));
146 
147         let err: std::io::Error = Handle::current()
148             .block_on(fs::read_to_string("Cargo.toml"))
149             .unwrap_err();
150 
151         assert_eq!(err.kind(), std::io::ErrorKind::Other);
152 
153         let inner_err = err.get_ref().expect("no inner error");
154         assert_eq!(inner_err.to_string(), "background task failed");
155     }
156 
157     #[test]
158     fn basic_spawn_blocking() {
159         let rt = rt();
160         let _enter = rt.enter();
161 
162         let answer = Handle::current()
163             .block_on(spawn_blocking(|| {
164                 std::thread::sleep(Duration::from_millis(100));
165                 42
166             }))
167             .unwrap();
168 
169         assert_eq!(answer, 42);
170     }
171 
172     #[test]
173     fn spawn_blocking_after_shutdown_fails() {
174         let rt = rt();
175         let _enter = rt.enter();
176         rt.shutdown_timeout(Duration::from_secs(1000));
177 
178         let join_err = Handle::current()
179             .block_on(spawn_blocking(|| {
180                 std::thread::sleep(Duration::from_millis(100));
181                 42
182             }))
183             .unwrap_err();
184 
185         assert!(join_err.is_cancelled());
186     }
187 
188     #[test]
189     fn spawn_blocking_started_before_shutdown_continues() {
190         let rt = rt();
191         let _enter = rt.enter();
192 
193         let handle = spawn_blocking(|| {
194             std::thread::sleep(Duration::from_secs(1));
195             42
196         });
197 
198         rt.shutdown_timeout(Duration::from_secs(1000));
199 
200         let answer = Handle::current().block_on(handle).unwrap();
201 
202         assert_eq!(answer, 42);
203     }
204 
205     // ==== net ======
206 
207     #[test]
208     fn tcp_listener_bind() {
209         let rt = rt();
210         let _enter = rt.enter();
211 
212         Handle::current()
213             .block_on(net::TcpListener::bind("127.0.0.1:0"))
214             .unwrap();
215     }
216 
217     // All io tests are ignored for now. See above why that is.
218     #[ignore]
219     #[test]
220     fn tcp_listener_connect_after_shutdown() {
221         let rt = rt();
222         let _enter = rt.enter();
223 
224         rt.shutdown_timeout(Duration::from_secs(1000));
225 
226         let err = Handle::current()
227             .block_on(net::TcpListener::bind("127.0.0.1:0"))
228             .unwrap_err();
229 
230         assert_eq!(err.kind(), std::io::ErrorKind::Other);
231         assert_eq!(
232             err.get_ref().unwrap().to_string(),
233             "A Tokio 1.x context was found, but it is being shutdown.",
234         );
235     }
236 
237     // All io tests are ignored for now. See above why that is.
238     #[ignore]
239     #[test]
240     fn tcp_listener_connect_before_shutdown() {
241         let rt = rt();
242         let _enter = rt.enter();
243 
244         let bind_future = net::TcpListener::bind("127.0.0.1:0");
245 
246         rt.shutdown_timeout(Duration::from_secs(1000));
247 
248         let err = Handle::current().block_on(bind_future).unwrap_err();
249 
250         assert_eq!(err.kind(), std::io::ErrorKind::Other);
251         assert_eq!(
252             err.get_ref().unwrap().to_string(),
253             "A Tokio 1.x context was found, but it is being shutdown.",
254         );
255     }
256 
257     #[test]
258     fn udp_socket_bind() {
259         let rt = rt();
260         let _enter = rt.enter();
261 
262         Handle::current()
263             .block_on(net::UdpSocket::bind("127.0.0.1:0"))
264             .unwrap();
265     }
266 
267     // All io tests are ignored for now. See above why that is.
268     #[ignore]
269     #[test]
270     fn udp_stream_bind_after_shutdown() {
271         let rt = rt();
272         let _enter = rt.enter();
273 
274         rt.shutdown_timeout(Duration::from_secs(1000));
275 
276         let err = Handle::current()
277             .block_on(net::UdpSocket::bind("127.0.0.1:0"))
278             .unwrap_err();
279 
280         assert_eq!(err.kind(), std::io::ErrorKind::Other);
281         assert_eq!(
282             err.get_ref().unwrap().to_string(),
283             "A Tokio 1.x context was found, but it is being shutdown.",
284         );
285     }
286 
287     // All io tests are ignored for now. See above why that is.
288     #[ignore]
289     #[test]
290     fn udp_stream_bind_before_shutdown() {
291         let rt = rt();
292         let _enter = rt.enter();
293 
294         let bind_future = net::UdpSocket::bind("127.0.0.1:0");
295 
296         rt.shutdown_timeout(Duration::from_secs(1000));
297 
298         let err = Handle::current().block_on(bind_future).unwrap_err();
299 
300         assert_eq!(err.kind(), std::io::ErrorKind::Other);
301         assert_eq!(
302             err.get_ref().unwrap().to_string(),
303             "A Tokio 1.x context was found, but it is being shutdown.",
304         );
305     }
306 
307     // All io tests are ignored for now. See above why that is.
308     #[ignore]
309     #[cfg(unix)]
310     #[test]
311     fn unix_listener_bind_after_shutdown() {
312         let rt = rt();
313         let _enter = rt.enter();
314 
315         let dir = tempfile::tempdir().unwrap();
316         let path = dir.path().join("socket");
317 
318         rt.shutdown_timeout(Duration::from_secs(1000));
319 
320         let err = net::UnixListener::bind(path).unwrap_err();
321 
322         assert_eq!(err.kind(), std::io::ErrorKind::Other);
323         assert_eq!(
324             err.get_ref().unwrap().to_string(),
325             "A Tokio 1.x context was found, but it is being shutdown.",
326         );
327     }
328 
329     // All io tests are ignored for now. See above why that is.
330     #[ignore]
331     #[cfg(unix)]
332     #[test]
333     fn unix_listener_shutdown_after_bind() {
334         let rt = rt();
335         let _enter = rt.enter();
336 
337         let dir = tempfile::tempdir().unwrap();
338         let path = dir.path().join("socket");
339 
340         let listener = net::UnixListener::bind(path).unwrap();
341 
342         rt.shutdown_timeout(Duration::from_secs(1000));
343 
344         // this should not timeout but fail immediately since the runtime has been shutdown
345         let err = Handle::current().block_on(listener.accept()).unwrap_err();
346 
347         assert_eq!(err.kind(), std::io::ErrorKind::Other);
348         assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
349     }
350 
351     // All io tests are ignored for now. See above why that is.
352     #[ignore]
353     #[cfg(unix)]
354     #[test]
355     fn unix_listener_shutdown_after_accept() {
356         let rt = rt();
357         let _enter = rt.enter();
358 
359         let dir = tempfile::tempdir().unwrap();
360         let path = dir.path().join("socket");
361 
362         let listener = net::UnixListener::bind(path).unwrap();
363 
364         let accept_future = listener.accept();
365 
366         rt.shutdown_timeout(Duration::from_secs(1000));
367 
368         // this should not timeout but fail immediately since the runtime has been shutdown
369         let err = Handle::current().block_on(accept_future).unwrap_err();
370 
371         assert_eq!(err.kind(), std::io::ErrorKind::Other);
372         assert_eq!(err.get_ref().unwrap().to_string(), "reactor gone");
373     }
374 
375     // ==== nesting ======
376 
377     #[test]
378     #[should_panic(
379         expected = "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
380     )]
381     fn nesting() {
382         fn some_non_async_function() -> i32 {
383             Handle::current().block_on(time::sleep(Duration::from_millis(10)));
384             1
385         }
386 
387         let rt = rt();
388 
389         rt.block_on(async { some_non_async_function() });
390     }
391 
392     #[test]
393     fn spawn_after_runtime_dropped() {
394         use futures::future::FutureExt;
395 
396         let rt = rt();
397 
398         let handle = rt.block_on(async move {
399             Handle::current()
400         });
401 
402         let jh1 = handle.spawn(futures::future::pending::<()>());
403 
404         drop(rt);
405 
406         let jh2 = handle.spawn(futures::future::pending::<()>());
407 
408         let err1 = jh1.now_or_never().unwrap().unwrap_err();
409         let err2 = jh2.now_or_never().unwrap().unwrap_err();
410         assert!(err1.is_cancelled());
411         assert!(err2.is_cancelled());
412     }
413 }
414 
415 multi_threaded_rt_test! {
416     #[cfg(unix)]
417     #[test]
418     fn unix_listener_bind() {
419         let rt = rt();
420         let _enter = rt.enter();
421 
422         let dir = tempfile::tempdir().unwrap();
423         let path = dir.path().join("socket");
424 
425         let listener = net::UnixListener::bind(path).unwrap();
426 
427         // this should timeout and not fail immediately since the runtime has not been shutdown
428         let _: tokio::time::error::Elapsed = Handle::current()
429             .block_on(tokio::time::timeout(
430                 Duration::from_millis(10),
431                 listener.accept(),
432             ))
433             .unwrap_err();
434     }
435 
436     // ==== timers ======
437 
438     // `Handle::block_on` doesn't work with timer futures on a current thread runtime as there is no
439     // one to drive the timers so they will just hang forever. Therefore they are not tested.
440 
441     #[test]
442     fn sleep() {
443         let rt = rt();
444         let _enter = rt.enter();
445 
446         Handle::current().block_on(time::sleep(Duration::from_millis(100)));
447     }
448 
449     #[test]
450     #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
451     fn sleep_before_shutdown_panics() {
452         let rt = rt();
453         let _enter = rt.enter();
454 
455         let f = time::sleep(Duration::from_millis(100));
456 
457         rt.shutdown_timeout(Duration::from_secs(1000));
458 
459         Handle::current().block_on(f);
460     }
461 
462     #[test]
463     #[should_panic(expected = "A Tokio 1.x context was found, but it is being shutdown.")]
464     fn sleep_after_shutdown_panics() {
465         let rt = rt();
466         let _enter = rt.enter();
467 
468         rt.shutdown_timeout(Duration::from_secs(1000));
469 
470         Handle::current().block_on(time::sleep(Duration::from_millis(100)));
471     }
472 }
473 
474 // ==== utils ======
475 
476 /// Create a new multi threaded runtime
new_multi_thread(n: usize) -> Runtime477 fn new_multi_thread(n: usize) -> Runtime {
478     tokio::runtime::Builder::new_multi_thread()
479         .worker_threads(n)
480         .enable_all()
481         .build()
482         .unwrap()
483 }
484 
485 /// Create a new single threaded runtime
new_current_thread() -> Runtime486 fn new_current_thread() -> Runtime {
487     tokio::runtime::Builder::new_current_thread()
488         .enable_all()
489         .build()
490         .unwrap()
491 }
492 
493 /// Utility to test things on both kinds of runtimes both before and after shutting it down.
test_with_runtimes<F>(f: F) where F: Fn(),494 fn test_with_runtimes<F>(f: F)
495 where
496     F: Fn(),
497 {
498     {
499         println!("current thread runtime");
500 
501         let rt = new_current_thread();
502         let _enter = rt.enter();
503         f();
504 
505         println!("current thread runtime after shutdown");
506         rt.shutdown_timeout(Duration::from_secs(1000));
507         f();
508     }
509 
510     {
511         println!("multi thread (1 thread) runtime");
512 
513         let rt = new_multi_thread(1);
514         let _enter = rt.enter();
515         f();
516 
517         println!("multi thread runtime after shutdown");
518         rt.shutdown_timeout(Duration::from_secs(1000));
519         f();
520     }
521 
522     {
523         println!("multi thread (4 threads) runtime");
524 
525         let rt = new_multi_thread(4);
526         let _enter = rt.enter();
527         f();
528 
529         println!("multi thread runtime after shutdown");
530         rt.shutdown_timeout(Duration::from_secs(1000));
531         f();
532     }
533 }
534