1 use crate::loom::sync::atomic::AtomicUsize;
2 use crate::loom::sync::Arc;
3 use crate::loom::thread;
4 use crate::runtime::{Builder, Runtime};
5 use crate::sync::oneshot::{self, Receiver};
6 use crate::task;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::sync::atomic::Ordering::{Acquire, Release};
10 use std::task::{Context, Poll};
11 
assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize)12 fn assert_at_most_num_polls(rt: Arc<Runtime>, at_most_polls: usize) {
13     let (tx, rx) = oneshot::channel();
14     let num_polls = Arc::new(AtomicUsize::new(0));
15     rt.spawn(async move {
16         for _ in 0..12 {
17             task::yield_now().await;
18         }
19         tx.send(()).unwrap();
20     });
21 
22     rt.block_on(async {
23         BlockedFuture {
24             rx,
25             num_polls: num_polls.clone(),
26         }
27         .await;
28     });
29 
30     let polls = num_polls.load(Acquire);
31     assert!(polls <= at_most_polls);
32 }
33 
34 #[test]
block_on_num_polls()35 fn block_on_num_polls() {
36     loom::model(|| {
37         // we expect at most 3 number of polls because there are
38         // three points at which we poll the future. At any of these
39         // points it can be ready:
40         //
41         // - when we fail to steal the parker and we block on a
42         //   notification that it is available.
43         //
44         // - when we steal the parker and we schedule the future
45         //
46         // - when the future is woken up and we have ran the max
47         //   number of tasks for the current tick or there are no
48         //   more tasks to run.
49         //
50         let at_most = 3;
51 
52         let rt1 = Arc::new(Builder::new_current_thread().build().unwrap());
53         let rt2 = rt1.clone();
54         let rt3 = rt1.clone();
55 
56         let th1 = thread::spawn(move || assert_at_most_num_polls(rt1, at_most));
57         let th2 = thread::spawn(move || assert_at_most_num_polls(rt2, at_most));
58         let th3 = thread::spawn(move || assert_at_most_num_polls(rt3, at_most));
59 
60         th1.join().unwrap();
61         th2.join().unwrap();
62         th3.join().unwrap();
63     });
64 }
65 
66 #[test]
assert_no_unnecessary_polls()67 fn assert_no_unnecessary_polls() {
68     loom::model(|| {
69         // // After we poll outer future, woken should reset to false
70         let rt = Builder::new_current_thread().build().unwrap();
71         let (tx, rx) = oneshot::channel();
72         let pending_cnt = Arc::new(AtomicUsize::new(0));
73 
74         rt.spawn(async move {
75             for _ in 0..24 {
76                 task::yield_now().await;
77             }
78             tx.send(()).unwrap();
79         });
80 
81         let pending_cnt_clone = pending_cnt.clone();
82         rt.block_on(async move {
83             // use task::yield_now() to ensure woken set to true
84             // ResetFuture will be polled at most once
85             // Here comes two cases
86             // 1. recv no message from channel, ResetFuture will be polled
87             //    but get Pending and we record ResetFuture.pending_cnt ++.
88             //    Then when message arrive, ResetFuture returns Ready. So we
89             //    expect ResetFuture.pending_cnt = 1
90             // 2. recv message from channel, ResetFuture returns Ready immediately.
91             //    We expect ResetFuture.pending_cnt = 0
92             task::yield_now().await;
93             ResetFuture {
94                 rx,
95                 pending_cnt: pending_cnt_clone,
96             }
97             .await;
98         });
99 
100         let pending_cnt = pending_cnt.load(Acquire);
101         assert!(pending_cnt <= 1);
102     });
103 }
104 
105 struct BlockedFuture {
106     rx: Receiver<()>,
107     num_polls: Arc<AtomicUsize>,
108 }
109 
110 impl Future for BlockedFuture {
111     type Output = ();
112 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>113     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
114         self.num_polls.fetch_add(1, Release);
115 
116         match Pin::new(&mut self.rx).poll(cx) {
117             Poll::Pending => Poll::Pending,
118             _ => Poll::Ready(()),
119         }
120     }
121 }
122 
123 struct ResetFuture {
124     rx: Receiver<()>,
125     pending_cnt: Arc<AtomicUsize>,
126 }
127 
128 impl Future for ResetFuture {
129     type Output = ();
130 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>131     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132         match Pin::new(&mut self.rx).poll(cx) {
133             Poll::Pending => {
134                 self.pending_cnt.fetch_add(1, Release);
135                 Poll::Pending
136             }
137             _ => Poll::Ready(()),
138         }
139     }
140 }
141