1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use tokio::runtime::Runtime;
5 use tokio::sync::{mpsc, oneshot};
6 use tokio_test::{assert_err, assert_ok};
7
8 use std::thread;
9 use std::time::Duration;
10
/// Checks that a task spawned on the runtime from `rt()` is not run in the
/// background: the message it sends only shows up once `block_on` is called.
#[test]
fn spawned_task_does_not_progress_without_block_on() {
    let (sender, mut receiver) = oneshot::channel();

    let mut runtime = rt();

    // The spawned task would deliver "hello" as soon as it is polled.
    runtime.spawn(async move {
        assert_ok!(sender.send("hello"));
    });

    // Leave time for a (hypothetical) background worker to run the task.
    thread::sleep(Duration::from_millis(50));

    // Nothing has been sent yet, so a non-blocking receive must fail.
    assert_err!(receiver.try_recv());

    // Driving the runtime lets the spawned task run and complete the channel.
    let received = runtime.block_on(async { assert_ok!(receiver.await) });

    assert_eq!(received, "hello");
}
29
/// Checks that a spawned task draining a channel-backed stream is polled
/// only when there is a reason to: once initially, twice after a message is
/// sent, and once more after the sender is dropped.
#[test]
fn no_extra_poll() {
    use std::pin::Pin;
    use std::sync::{
        atomic::{AtomicUsize, Ordering::SeqCst},
        Arc,
    };
    use std::task::{Context, Poll};
    use tokio::stream::{Stream, StreamExt};

    /// Stream wrapper that counts how many times `poll_next` is invoked.
    struct TrackPolls<S> {
        npolls: Arc<AtomicUsize>,
        s: S,
    }

    impl<S> Stream for TrackPolls<S>
    where
        // `StreamExt::next` (used below) already needs the wrapper to be
        // `Unpin`, so requiring it of `S` lets us project without `unsafe`.
        S: Stream + Unpin,
    {
        type Item = S::Item;
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.get_mut();
            this.npolls.fetch_add(1, SeqCst);
            Pin::new(&mut this.s).poll_next(cx)
        }
    }

    let (tx, rx) = mpsc::unbounded_channel();
    let mut rx = TrackPolls {
        npolls: Arc::new(AtomicUsize::new(0)),
        s: rx,
    };
    // Keep a handle on the counter; `rx` itself is moved into the task.
    let npolls = Arc::clone(&rx.npolls);

    let mut rt = rt();

    // Drain the stream until it ends, discarding every item.
    rt.spawn(async move { while rx.next().await.is_some() {} });
    rt.block_on(async {
        tokio::task::yield_now().await;
    });

    // should have been polled exactly once: the initial poll
    assert_eq!(npolls.load(SeqCst), 1);

    tx.send(()).unwrap();
    rt.block_on(async {
        tokio::task::yield_now().await;
    });

    // should have been polled twice more: once to yield Some(), then once to yield Pending
    assert_eq!(npolls.load(SeqCst), 1 + 2);

    drop(tx);
    rt.block_on(async {
        tokio::task::yield_now().await;
    });

    // should have been polled once more: to yield None
    assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
}
92
/// Spawns three tasks chained through oneshot channels, none of which can
/// ever get past its first await, ticks the runtime once, and then drops the
/// runtime while all three tasks are still pending.
///
/// NOTE(review): judging by the name, this presumably guards against a
/// locking problem while the runtime tears down pending tasks — confirm
/// against the scheduler's drop implementation.
#[test]
fn acquire_mutex_in_drop() {
    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();

    let mut runtime = rt();

    // Waits on the second channel; must never be resumed.
    runtime.spawn(async move {
        let _ = second_rx.await;
        unreachable!();
    });

    // Waits on the first channel and owns the second sender; must never be
    // resumed either.
    runtime.spawn(async move {
        let _ = first_rx.await;
        second_tx.send(()).unwrap();
        unreachable!();
    });

    // Owns the first sender but is stuck forever on a future that never
    // completes, so the send below is never reached.
    runtime.spawn(async move {
        futures::future::pending::<()>().await;
        first_tx.send(()).unwrap();
    });

    // Run the scheduler for one tick.
    runtime.block_on(async {
        tokio::task::yield_now().await;
    });

    // Tear the runtime down with every task still pending.
    drop(runtime);
}
128
rt() -> Runtime129 fn rt() -> Runtime {
130 tokio::runtime::Builder::new()
131 .basic_scheduler()
132 .enable_all()
133 .build()
134 .unwrap()
135 }
136