1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use tokio::runtime::Runtime;
5 use tokio::sync::{mpsc, oneshot};
6 use tokio_test::{assert_err, assert_ok};
7 
8 use std::thread;
9 use std::time::Duration;
10 
/// Checks that a task spawned onto the runtime makes no progress until the
/// runtime is driven with `block_on`.
#[test]
fn spawned_task_does_not_progress_without_block_on() {
    let mut runtime = rt();
    let (sender, mut receiver) = oneshot::channel();

    runtime.spawn(async move {
        assert_ok!(sender.send("hello"));
    });

    // Leave plenty of time for the task to run if anything were driving it.
    thread::sleep(Duration::from_millis(50));

    // The value must not have been sent yet.
    assert_err!(receiver.try_recv());

    // Driving the runtime lets the spawned task run and deliver its value.
    let received = runtime.block_on(async { assert_ok!(receiver.await) });
    assert_eq!(received, "hello");
}
29 
/// Verifies that a spawned task consuming a stream is polled only when there
/// is a reason to: once initially, twice after a message is sent, and once
/// more after the sender is dropped.
#[test]
fn no_extra_poll() {
    use std::pin::Pin;
    use std::sync::{
        atomic::{AtomicUsize, Ordering::SeqCst},
        Arc,
    };
    use std::task::{Context, Poll};
    use tokio::stream::{Stream, StreamExt};

    /// Stream wrapper that counts every call to `poll_next` before
    /// forwarding it to the inner stream.
    struct TrackPolls<S> {
        npolls: Arc<AtomicUsize>,
        s: S,
    }

    impl<S> Stream for TrackPolls<S>
    where
        S: Stream + Unpin,
    {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // `S: Unpin` makes the wrapper `Unpin` too, so the fields can be
            // reached and re-pinned without any `unsafe` projection.
            let this = self.get_mut();
            this.npolls.fetch_add(1, SeqCst);
            Pin::new(&mut this.s).poll_next(cx)
        }
    }

    /// Runs the runtime for a single `yield_now` so spawned tasks get a turn.
    fn tick(rt: &mut Runtime) {
        rt.block_on(async {
            tokio::task::yield_now().await;
        });
    }

    let (tx, rx) = mpsc::unbounded_channel();
    let mut rx = TrackPolls {
        npolls: Arc::new(AtomicUsize::new(0)),
        s: rx,
    };
    // Keep a handle on the counter; `rx` itself moves into the spawned task.
    let npolls = Arc::clone(&rx.npolls);

    let mut rt = rt();

    // Drain the stream until it terminates.
    rt.spawn(async move { while rx.next().await.is_some() {} });
    tick(&mut rt);

    // should have been polled exactly once: the initial poll
    assert_eq!(npolls.load(SeqCst), 1);

    tx.send(()).unwrap();
    tick(&mut rt);

    // should have been polled twice more: once to yield Some(), then once to yield Pending
    assert_eq!(npolls.load(SeqCst), 1 + 2);

    drop(tx);
    tick(&mut rt);

    // should have been polled once more: to yield None
    assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
}
92 
/// Drops a runtime while it still owns three pending tasks that hold each
/// other's channel halves; the test passes as long as the drop completes
/// and none of the `unreachable!()` lines run.
///
/// NOTE(review): judging by the name, this presumably guards against a
/// deadlock when task teardown wakes other tasks during drop — confirm.
#[test]
fn acquire_mutex_in_drop() {
    use futures::future;
    use tokio::task::yield_now;

    let mut runtime = rt();

    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();

    // Waits on the second channel; must never get past the await.
    runtime.spawn(async move {
        let _ = second_rx.await;
        unreachable!();
    });

    // Waits on the first channel and would then signal the second one.
    runtime.spawn(async move {
        let _ = first_rx.await;
        second_tx.send(()).unwrap();
        unreachable!();
    });

    // Awaits a future that never resolves, so `first_tx` is never used to send.
    runtime.spawn(async move {
        future::pending::<()>().await;
        first_tx.send(()).unwrap();
    });

    // Tick the scheduler once.
    runtime.block_on(async {
        yield_now().await;
    });

    // Tear the runtime down with the tasks still pending.
    drop(runtime);
}
128 
rt() -> Runtime129 fn rt() -> Runtime {
130     tokio::runtime::Builder::new()
131         .basic_scheduler()
132         .enable_all()
133         .build()
134         .unwrap()
135 }
136