1 use crate::loom::sync::atomic::AtomicUsize;
2 use crate::loom::sync::Arc;
3 use crate::loom::thread;
4 use crate::runtime::{Builder, Runtime};
5 use crate::sync::oneshot::{self, Receiver};
6 use crate::task;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::sync::atomic::Ordering::{Acquire, Release};
10 use std::task::{Context, Poll};
11
assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize)12 fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
13 let (tx, rx) = oneshot::channel();
14 let num_polls = Arc::new(AtomicUsize::new(0));
15 rt.spawn(async move {
16 for _ in 0..12 {
17 task::yield_now().await;
18 }
19 tx.send(()).unwrap();
20 });
21
22 rt.block_on(async {
23 BlockedFuture {
24 rx,
25 num_polls: num_polls.clone(),
26 }
27 .await;
28 });
29
30 let polls = num_polls.load(Acquire);
31 assert!(polls <= at_most_polls);
32 }
33
34 #[test]
block_on_num_polls()35 fn block_on_num_polls() {
36 loom::model(|| {
37 // we expect at most 3 number of polls because there are
38 // three points at which we poll the future. At any of these
39 // points it can be ready:
40 //
41 // - when we fail to steal the parker and we block on a
42 // notification that it is available.
43 //
44 // - when we steal the parker and we schedule the future
45 //
46 // - when the future is woken up and we have ran the max
47 // number of tasks for the current tick or there are no
48 // more tasks to run.
49 //
50 let at_most = 3;
51
52 let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
53 let rt2 = rt1.clone();
54 let rt3 = rt1.clone();
55
56 let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most));
57 let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most));
58 let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most));
59
60 th1.join().unwrap();
61 th2.join().unwrap();
62 th3.join().unwrap();
63 });
64 }
65
66 #[test]
assert_no_unnecessary_polls()67 fn assert_no_unnecessary_polls() {
68 loom::model(|| {
69 // // After we poll outer future, woken should reset to false
70 let rt = Builder::new_current_thread().build().unwrap();
71 let (tx, rx) = oneshot::channel();
72 let pending_cnt = Arc::new(AtomicUsize::new(0));
73
74 rt.spawn(async move {
75 for _ in 0..24 {
76 task::yield_now().await;
77 }
78 tx.send(()).unwrap();
79 });
80
81 let pending_cnt_clone = pending_cnt.clone();
82 rt.block_on(async move {
83 // use task::yield_now() to ensure woken set to true
84 // ResetFuture will be polled at most once
85 // Here comes two cases
86 // 1. recv no message from channel, ResetFuture will be polled
87 // but get Pending and we record ResetFuture.pending_cnt ++.
88 // Then when message arrive, ResetFuture returns Ready. So we
89 // expect ResetFuture.pending_cnt = 1
90 // 2. recv message from channel, ResetFuture returns Ready immediately.
91 // We expect ResetFuture.pending_cnt = 0
92 task::yield_now().await;
93 ResetFuture {
94 rx,
95 pending_cnt: pending_cnt_clone,
96 }
97 .await;
98 });
99
100 let pending_cnt = pending_cnt.load(Acquire);
101 assert!(pending_cnt <= 1);
102 });
103 }
104
105 struct BlockedFuture {
106 rx: Receiver<()>,
107 num_polls: Arc<AtomicUsize>,
108 }
109
110 impl Future for BlockedFuture {
111 type Output = ();
112
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>113 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
114 self.num_polls.fetch_add(1, Release);
115
116 match Pin::new(&mut self.rx).poll(cx) {
117 Poll::Pending => Poll::Pending,
118 _ => Poll::Ready(()),
119 }
120 }
121 }
122
123 struct ResetFuture {
124 rx: Receiver<()>,
125 pending_cnt: Arc<AtomicUsize>,
126 }
127
128 impl Future for ResetFuture {
129 type Output = ();
130
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>131 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132 match Pin::new(&mut self.rx).poll(cx) {
133 Poll::Pending => {
134 self.pending_cnt.fetch_add(1, Release);
135 Poll::Pending
136 }
137 _ => Poll::Ready(()),
138 }
139 }
140 }
141