1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use tokio::{runtime, task};
5 use tokio_test::assert_ok;
6 
7 use std::thread;
8 use std::time::Duration;
9 
10 mod support {
11     pub(crate) mod mpsc_stream;
12 }
13 
14 #[tokio::test]
basic_blocking()15 async fn basic_blocking() {
16     // Run a few times
17     for _ in 0..100 {
18         let out = assert_ok!(
19             tokio::spawn(async {
20                 assert_ok!(
21                     task::spawn_blocking(|| {
22                         thread::sleep(Duration::from_millis(5));
23                         "hello"
24                     })
25                     .await
26                 )
27             })
28             .await
29         );
30 
31         assert_eq!(out, "hello");
32     }
33 }
34 
35 #[tokio::test(flavor = "multi_thread")]
block_in_blocking()36 async fn block_in_blocking() {
37     // Run a few times
38     for _ in 0..100 {
39         let out = assert_ok!(
40             tokio::spawn(async {
41                 assert_ok!(
42                     task::spawn_blocking(|| {
43                         task::block_in_place(|| {
44                             thread::sleep(Duration::from_millis(5));
45                         });
46                         "hello"
47                     })
48                     .await
49                 )
50             })
51             .await
52         );
53 
54         assert_eq!(out, "hello");
55     }
56 }
57 
58 #[tokio::test(flavor = "multi_thread")]
block_in_block()59 async fn block_in_block() {
60     // Run a few times
61     for _ in 0..100 {
62         let out = assert_ok!(
63             tokio::spawn(async {
64                 task::block_in_place(|| {
65                     task::block_in_place(|| {
66                         thread::sleep(Duration::from_millis(5));
67                     });
68                     "hello"
69                 })
70             })
71             .await
72         );
73 
74         assert_eq!(out, "hello");
75     }
76 }
77 
78 #[tokio::test(flavor = "current_thread")]
79 #[should_panic]
no_block_in_basic_scheduler()80 async fn no_block_in_basic_scheduler() {
81     task::block_in_place(|| {});
82 }
83 
/// Checks that `task::block_in_place` can be called from the future passed to
/// `block_on` on a runtime created with `Runtime::new()`; the test passes if
/// nothing panics.
#[test]
fn yes_block_in_threaded_block_on() {
    let threaded = runtime::Runtime::new().unwrap();

    let body = async {
        task::block_in_place(|| {});
    };
    threaded.block_on(body);
}
91 
/// Expected to panic (`#[should_panic]`): `task::block_in_place` is called from the
/// future passed to `block_on` on a runtime built with `new_current_thread()`.
#[test]
#[should_panic]
fn no_block_in_basic_block_on() {
    let basic = runtime::Builder::new_current_thread().build().unwrap();

    let body = async {
        task::block_in_place(|| {});
    };
    basic.block_on(body);
}
100 
/// Builds a current-thread runtime inside `block_in_place` (itself running under an
/// outer runtime's `block_on`) and drives an empty future on it; the test passes if
/// nothing panics.
#[test]
fn can_enter_basic_rt_from_within_block_in_place() {
    let outer = tokio::runtime::Runtime::new().unwrap();

    let body = async {
        tokio::task::block_in_place(|| {
            // The inner runtime is created, used and dropped entirely within
            // the `block_in_place` closure.
            let inner = tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap();

            inner.block_on(async {})
        })
    };

    outer.block_on(body);
}
115 
/// Builds and immediately discards a current-thread runtime from inside another
/// runtime's `block_on`, then asserts that this panics with a `&'static str`
/// payload containing "Cannot drop a runtime".
#[test]
fn useful_panic_message_when_dropping_rt_in_rt() {
    use std::panic::{catch_unwind, AssertUnwindSafe};

    let outer = tokio::runtime::Runtime::new().unwrap();

    // `let _ = …` drops the freshly built runtime right away, while still
    // inside the async block.
    let drop_inner_in_async = || {
        outer.block_on(async {
            let _ = tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap();
        });
    };
    let result = catch_unwind(AssertUnwindSafe(drop_inner_in_async));

    assert!(result.is_err());
    let payload = result.unwrap_err();
    let message: &'static str = payload.downcast_ref::<&'static str>().unwrap();

    assert!(
        message.contains("Cannot drop a runtime"),
        "Wrong panic message: {:?}",
        message
    );
}
140 
/// Checks that a current-thread runtime created inside another runtime's `block_on`
/// can be disposed of with `shutdown_timeout` and a zero-length timeout; the test
/// passes if nothing panics.
#[test]
fn can_shutdown_with_zero_timeout_in_runtime() {
    let outer = tokio::runtime::Runtime::new().unwrap();

    let body = async {
        let inner = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();

        let no_wait = Duration::from_nanos(0);
        inner.shutdown_timeout(no_wait);
    };

    outer.block_on(body);
}
152 
/// Checks that a current-thread runtime created inside another runtime's `block_on`
/// can be disposed of with `shutdown_background`; the test passes if nothing panics.
#[test]
fn can_shutdown_now_in_runtime() {
    let outer = tokio::runtime::Runtime::new().unwrap();

    let body = async {
        let inner = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();

        inner.shutdown_background();
    };

    outer.block_on(body);
}
164 
/// Drains a pre-filled 200-item stream with `futures::executor::block_on` from
/// inside `block_in_place` in a spawned task, and fails if the task has not
/// finished within one second.
#[test]
fn coop_disabled_in_block_in_place() {
    let outer = tokio::runtime::Builder::new_multi_thread()
        .enable_time()
        .build()
        .unwrap();

    let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();

    // Queue every item up front, then drop the sender.
    (0..200).for_each(|i| {
        tx.send(i).unwrap();
    });
    drop(tx);

    outer.block_on(async move {
        let counting_task = tokio::spawn(async move {
            tokio::task::block_in_place(move || {
                futures::executor::block_on(async move {
                    use tokio_stream::StreamExt;
                    let received = rx.fold(0, |count, _| count + 1).await;
                    assert_eq!(received, 200);
                })
            })
        });

        // Bound the wait so a stuck task fails the test rather than hanging it.
        let joined = tokio::time::timeout(Duration::from_secs(1), counting_task).await;
        joined.expect("timed out (probably hanging)").unwrap()
    });
}
195 
/// Drains a pre-filled 200-item stream with `futures::executor::block_on` from
/// inside `block_in_place`, called directly in the future passed to
/// `Runtime::block_on`, and asserts that all 200 items are counted.
///
/// The work runs on a helper thread so the test can give up after one second
/// instead of hanging. The helper reports success over a channel, and the test
/// thread waits with `recv_timeout`, which distinguishes three outcomes:
///
/// * a message arrives: the helper finished and the test passes;
/// * the timeout elapses: the helper is presumably stuck, so the test panics
///   with "timed out (probably hanging)";
/// * the channel disconnects: the helper dropped its sender without reporting,
///   so it is joined and any panic it raised is re-raised here, instead of
///   being reported as a timeout one second later.
#[test]
fn coop_disabled_in_block_in_place_in_block_on() {
    let (done_tx, done_rx) = std::sync::mpsc::channel();

    let worker = thread::spawn(move || {
        let outer = tokio::runtime::Runtime::new().unwrap();

        let (tx, rx) = support::mpsc_stream::unbounded_channel_stream();

        for i in 0..200 {
            tx.send(i).unwrap();
        }
        drop(tx);

        outer.block_on(async move {
            tokio::task::block_in_place(move || {
                futures::executor::block_on(async move {
                    use tokio_stream::StreamExt;
                    assert_eq!(rx.fold(0, |n, _| n + 1).await, 200);
                })
            })
        });

        // The receiver may already have given up; a failed send is harmless.
        let _ = done_tx.send(());
    });

    match done_rx.recv_timeout(Duration::from_secs(1)) {
        Ok(()) => {}
        Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
            panic!("timed out (probably hanging)")
        }
        Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
            // The only sender lives in the worker closure, so a disconnect
            // without a message means the worker ended early. Surface its
            // own panic rather than a generic failure.
            if let Err(payload) = worker.join() {
                std::panic::resume_unwind(payload);
            }
            panic!("worker thread exited without reporting completion");
        }
    }
}
229