1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use tokio::runtime::Runtime;
5 use tokio::sync::{mpsc, oneshot};
6 use tokio_test::{assert_err, assert_ok};
7
8 use std::thread;
9 use std::time::Duration;
10
11 #[test]
spawned_task_does_not_progress_without_block_on()12 fn spawned_task_does_not_progress_without_block_on() {
13 let (tx, mut rx) = oneshot::channel();
14
15 let mut rt = rt();
16
17 rt.spawn(async move {
18 assert_ok!(tx.send("hello"));
19 });
20
21 thread::sleep(Duration::from_millis(50));
22
23 assert_err!(rx.try_recv());
24
25 let out = rt.block_on(async { assert_ok!(rx.await) });
26
27 assert_eq!(out, "hello");
28 }
29
30 #[test]
no_extra_poll()31 fn no_extra_poll() {
32 use pin_project_lite::pin_project;
33 use std::pin::Pin;
34 use std::sync::{
35 atomic::{AtomicUsize, Ordering::SeqCst},
36 Arc,
37 };
38 use std::task::{Context, Poll};
39 use tokio::stream::{Stream, StreamExt};
40
41 pin_project! {
42 struct TrackPolls<S> {
43 npolls: Arc<AtomicUsize>,
44 #[pin]
45 s: S,
46 }
47 }
48
49 impl<S> Stream for TrackPolls<S>
50 where
51 S: Stream,
52 {
53 type Item = S::Item;
54 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55 let this = self.project();
56 this.npolls.fetch_add(1, SeqCst);
57 this.s.poll_next(cx)
58 }
59 }
60
61 let (tx, rx) = mpsc::unbounded_channel();
62 let mut rx = TrackPolls {
63 npolls: Arc::new(AtomicUsize::new(0)),
64 s: rx,
65 };
66 let npolls = Arc::clone(&rx.npolls);
67
68 let mut rt = rt();
69
70 rt.spawn(async move { while rx.next().await.is_some() {} });
71 rt.block_on(async {
72 tokio::task::yield_now().await;
73 });
74
75 // should have been polled exactly once: the initial poll
76 assert_eq!(npolls.load(SeqCst), 1);
77
78 tx.send(()).unwrap();
79 rt.block_on(async {
80 tokio::task::yield_now().await;
81 });
82
83 // should have been polled twice more: once to yield Some(), then once to yield Pending
84 assert_eq!(npolls.load(SeqCst), 1 + 2);
85
86 drop(tx);
87 rt.block_on(async {
88 tokio::task::yield_now().await;
89 });
90
91 // should have been polled once more: to yield None
92 assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
93 }
94
95 #[test]
acquire_mutex_in_drop()96 fn acquire_mutex_in_drop() {
97 use futures::future::pending;
98 use tokio::task;
99
100 let (tx1, rx1) = oneshot::channel();
101 let (tx2, rx2) = oneshot::channel();
102
103 let mut rt = rt();
104
105 rt.spawn(async move {
106 let _ = rx2.await;
107 unreachable!();
108 });
109
110 rt.spawn(async move {
111 let _ = rx1.await;
112 tx2.send(()).unwrap();
113 unreachable!();
114 });
115
116 // Spawn a task that will never notify
117 rt.spawn(async move {
118 pending::<()>().await;
119 tx1.send(()).unwrap();
120 });
121
122 // Tick the loop
123 rt.block_on(async {
124 task::yield_now().await;
125 });
126
127 // Drop the rt
128 drop(rt);
129 }
130
rt() -> Runtime131 fn rt() -> Runtime {
132 tokio::runtime::Builder::new()
133 .basic_scheduler()
134 .enable_all()
135 .build()
136 .unwrap()
137 }
138