1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use tokio::runtime::Runtime;
5 use tokio::sync::oneshot;
6 use tokio_test::{assert_err, assert_ok};
7 
8 use std::thread;
9 use tokio::time::{timeout, Duration};
10 
11 mod support {
12     pub(crate) mod mpsc_stream;
13 }
14 
15 #[test]
spawned_task_does_not_progress_without_block_on()16 fn spawned_task_does_not_progress_without_block_on() {
17     let (tx, mut rx) = oneshot::channel();
18 
19     let rt = rt();
20 
21     rt.spawn(async move {
22         assert_ok!(tx.send("hello"));
23     });
24 
25     thread::sleep(Duration::from_millis(50));
26 
27     assert_err!(rx.try_recv());
28 
29     let out = rt.block_on(async { assert_ok!(rx.await) });
30 
31     assert_eq!(out, "hello");
32 }
33 
34 #[test]
no_extra_poll()35 fn no_extra_poll() {
36     use pin_project_lite::pin_project;
37     use std::pin::Pin;
38     use std::sync::{
39         atomic::{AtomicUsize, Ordering::SeqCst},
40         Arc,
41     };
42     use std::task::{Context, Poll};
43     use tokio_stream::{Stream, StreamExt};
44 
45     pin_project! {
46         struct TrackPolls<S> {
47             npolls: Arc<AtomicUsize>,
48             #[pin]
49             s: S,
50         }
51     }
52 
53     impl<S> Stream for TrackPolls<S>
54     where
55         S: Stream,
56     {
57         type Item = S::Item;
58         fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59             let this = self.project();
60             this.npolls.fetch_add(1, SeqCst);
61             this.s.poll_next(cx)
62         }
63     }
64 
65     let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>();
66     let rx = TrackPolls {
67         npolls: Arc::new(AtomicUsize::new(0)),
68         s: rx,
69     };
70     let npolls = Arc::clone(&rx.npolls);
71 
72     let rt = rt();
73 
74     // TODO: could probably avoid this, but why not.
75     let mut rx = Box::pin(rx);
76 
77     rt.spawn(async move { while rx.next().await.is_some() {} });
78     rt.block_on(async {
79         tokio::task::yield_now().await;
80     });
81 
82     // should have been polled exactly once: the initial poll
83     assert_eq!(npolls.load(SeqCst), 1);
84 
85     tx.send(()).unwrap();
86     rt.block_on(async {
87         tokio::task::yield_now().await;
88     });
89 
90     // should have been polled twice more: once to yield Some(), then once to yield Pending
91     assert_eq!(npolls.load(SeqCst), 1 + 2);
92 
93     drop(tx);
94     rt.block_on(async {
95         tokio::task::yield_now().await;
96     });
97 
98     // should have been polled once more: to yield None
99     assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
100 }
101 
102 #[test]
acquire_mutex_in_drop()103 fn acquire_mutex_in_drop() {
104     use futures::future::pending;
105     use tokio::task;
106 
107     let (tx1, rx1) = oneshot::channel();
108     let (tx2, rx2) = oneshot::channel();
109 
110     let rt = rt();
111 
112     rt.spawn(async move {
113         let _ = rx2.await;
114         unreachable!();
115     });
116 
117     rt.spawn(async move {
118         let _ = rx1.await;
119         tx2.send(()).unwrap();
120         unreachable!();
121     });
122 
123     // Spawn a task that will never notify
124     rt.spawn(async move {
125         pending::<()>().await;
126         tx1.send(()).unwrap();
127     });
128 
129     // Tick the loop
130     rt.block_on(async {
131         task::yield_now().await;
132     });
133 
134     // Drop the rt
135     drop(rt);
136 }
137 
138 #[test]
139 #[should_panic(
140     expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
141 )]
timeout_panics_when_no_time_handle()142 fn timeout_panics_when_no_time_handle() {
143     let rt = tokio::runtime::Builder::new_current_thread()
144         .build()
145         .unwrap();
146     rt.block_on(async {
147         let (_tx, rx) = oneshot::channel::<()>();
148         let dur = Duration::from_millis(20);
149         let _ = timeout(dur, rx).await;
150     });
151 }
152 
rt() -> Runtime153 fn rt() -> Runtime {
154     tokio::runtime::Builder::new_current_thread()
155         .enable_all()
156         .build()
157         .unwrap()
158 }
159