1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use tokio::runtime::Runtime;
5 use tokio::sync::{mpsc, oneshot};
6 use tokio_test::{assert_err, assert_ok};
7 
8 use std::thread;
9 use std::time::Duration;
10 
/// A task spawned onto the runtime returned by `rt()` is expected to stay
/// idle until the runtime is driven through `block_on`.
#[test]
fn spawned_task_does_not_progress_without_block_on() {
    let mut runtime = rt();
    let (sender, mut receiver) = oneshot::channel();

    // Queue the task; the assertions below expect it not to run yet.
    runtime.spawn(async move {
        assert_ok!(sender.send("hello"));
    });

    // Wait a little so that any (unexpected) background execution would
    // have had a chance to deliver the message.
    thread::sleep(Duration::from_millis(50));

    // Nothing may have been sent so far.
    assert_err!(receiver.try_recv());

    // Driving the runtime lets the spawned task run and the value arrive.
    let received = runtime.block_on(async { assert_ok!(receiver.await) });
    assert_eq!(received, "hello");
}
29 
/// Counts how often a spawned task polls its stream and checks that the
/// runtime does not poll it more often than the channel events require.
#[test]
fn no_extra_poll() {
    use pin_project_lite::pin_project;
    use std::pin::Pin;
    use std::sync::{
        atomic::{AtomicUsize, Ordering::SeqCst},
        Arc,
    };
    use std::task::{Context, Poll};
    use tokio::stream::{Stream, StreamExt};

    // Stream adapter that bumps a shared counter on every `poll_next` call
    // before delegating to the wrapped stream.
    pin_project! {
        struct TrackPolls<S> {
            polls: Arc<AtomicUsize>,
            #[pin]
            inner: S,
        }
    }

    impl<S: Stream> Stream for TrackPolls<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let projected = self.project();
            projected.polls.fetch_add(1, SeqCst);
            projected.inner.poll_next(cx)
        }
    }

    // Drives the runtime with a future that yields once and then completes.
    fn tick(rt: &mut Runtime) {
        rt.block_on(async {
            tokio::task::yield_now().await;
        });
    }

    let polls = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = mpsc::unbounded_channel();
    let mut tracked = TrackPolls {
        polls: Arc::clone(&polls),
        inner: rx,
    };

    let mut rt = rt();

    // Drain the stream until it reports the end (`None`).
    rt.spawn(async move {
        loop {
            if tracked.next().await.is_none() {
                break;
            }
        }
    });

    // should have been polled exactly once: the initial poll
    tick(&mut rt);
    let mut expected = 1;
    assert_eq!(polls.load(SeqCst), expected);

    // should have been polled twice more: once to yield Some(), then once to yield Pending
    tx.send(()).unwrap();
    tick(&mut rt);
    expected += 2;
    assert_eq!(polls.load(SeqCst), expected);

    // should have been polled once more: to yield None
    drop(tx);
    tick(&mut rt);
    expected += 1;
    assert_eq!(polls.load(SeqCst), expected);
}
94 
/// Drops a runtime that still holds three parked tasks chained together by
/// oneshot channels. The test passes when the drop completes without
/// panicking or hanging (NOTE(review): going by the test name, this is
/// presumably guarding against a lock being re-acquired during drop —
/// confirm against the scheduler implementation).
#[test]
fn acquire_mutex_in_drop() {
    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();

    let mut runtime = rt();

    // Waits on the second channel; must never get past the await.
    runtime.spawn(async move {
        let _ = second_rx.await;
        unreachable!();
    });

    // Waits on the first channel and would then signal the second one.
    runtime.spawn(async move {
        let _ = first_rx.await;
        second_tx.send(()).unwrap();
        unreachable!();
    });

    // This task never completes, so the first sender it owns is never used.
    runtime.spawn(async move {
        futures::future::pending::<()>().await;
        first_tx.send(()).unwrap();
    });

    // Drive the runtime once with a future that yields a single time.
    runtime.block_on(async {
        tokio::task::yield_now().await;
    });

    // Tear the runtime down while all three tasks are still pending.
    drop(runtime);
}
130 
rt() -> Runtime131 fn rt() -> Runtime {
132     tokio::runtime::Builder::new()
133         .basic_scheduler()
134         .enable_all()
135         .build()
136         .unwrap()
137 }
138