1 use crate::sync::mpsc;
2 
3 use futures::future::poll_fn;
4 use loom::future::block_on;
5 use loom::sync::Arc;
6 use loom::thread;
7 use tokio_test::assert_ok;
8 
9 #[test]
closing_tx()10 fn closing_tx() {
11     loom::model(|| {
12         let (tx, mut rx) = mpsc::channel(16);
13 
14         thread::spawn(move || {
15             tx.try_send(()).unwrap();
16             drop(tx);
17         });
18 
19         let v = block_on(rx.recv());
20         assert!(v.is_some());
21 
22         let v = block_on(rx.recv());
23         assert!(v.is_none());
24     });
25 }
26 
27 #[test]
closing_unbounded_tx()28 fn closing_unbounded_tx() {
29     loom::model(|| {
30         let (tx, mut rx) = mpsc::unbounded_channel();
31 
32         thread::spawn(move || {
33             tx.send(()).unwrap();
34             drop(tx);
35         });
36 
37         let v = block_on(rx.recv());
38         assert!(v.is_some());
39 
40         let v = block_on(rx.recv());
41         assert!(v.is_none());
42     });
43 }
44 
45 #[test]
closing_bounded_rx()46 fn closing_bounded_rx() {
47     loom::model(|| {
48         let (tx1, rx) = mpsc::channel::<()>(16);
49         let tx2 = tx1.clone();
50         thread::spawn(move || {
51             drop(rx);
52         });
53 
54         block_on(tx1.closed());
55         block_on(tx2.closed());
56     });
57 }
58 
59 #[test]
closing_and_sending()60 fn closing_and_sending() {
61     loom::model(|| {
62         let (tx1, mut rx) = mpsc::channel::<()>(16);
63         let tx1 = Arc::new(tx1);
64         let tx2 = tx1.clone();
65 
66         let th1 = thread::spawn(move || {
67             tx1.try_send(()).unwrap();
68         });
69 
70         let th2 = thread::spawn(move || {
71             block_on(tx2.closed());
72         });
73 
74         let th3 = thread::spawn(move || {
75             let v = block_on(rx.recv());
76             assert!(v.is_some());
77             drop(rx);
78         });
79 
80         assert_ok!(th1.join());
81         assert_ok!(th2.join());
82         assert_ok!(th3.join());
83     });
84 }
85 
86 #[test]
closing_unbounded_rx()87 fn closing_unbounded_rx() {
88     loom::model(|| {
89         let (tx1, rx) = mpsc::unbounded_channel::<()>();
90         let tx2 = tx1.clone();
91         thread::spawn(move || {
92             drop(rx);
93         });
94 
95         block_on(tx1.closed());
96         block_on(tx2.closed());
97     });
98 }
99 
100 #[test]
dropping_tx()101 fn dropping_tx() {
102     loom::model(|| {
103         let (tx, mut rx) = mpsc::channel::<()>(16);
104 
105         for _ in 0..2 {
106             let tx = tx.clone();
107             thread::spawn(move || {
108                 drop(tx);
109             });
110         }
111         drop(tx);
112 
113         let v = block_on(rx.recv());
114         assert!(v.is_none());
115     });
116 }
117 
118 #[test]
dropping_unbounded_tx()119 fn dropping_unbounded_tx() {
120     loom::model(|| {
121         let (tx, mut rx) = mpsc::unbounded_channel::<()>();
122 
123         for _ in 0..2 {
124             let tx = tx.clone();
125             thread::spawn(move || {
126                 drop(tx);
127             });
128         }
129         drop(tx);
130 
131         let v = block_on(rx.recv());
132         assert!(v.is_none());
133     });
134 }
135 
136 #[test]
try_recv()137 fn try_recv() {
138     loom::model(|| {
139         use crate::sync::{mpsc, Semaphore};
140         use loom::sync::{Arc, Mutex};
141 
142         const PERMITS: usize = 2;
143         const TASKS: usize = 2;
144         const CYCLES: usize = 1;
145 
146         struct Context {
147             sem: Arc<Semaphore>,
148             tx: mpsc::Sender<()>,
149             rx: Mutex<mpsc::Receiver<()>>,
150         }
151 
152         fn run(ctx: &Context) {
153             block_on(async {
154                 let permit = ctx.sem.acquire().await;
155                 assert_ok!(ctx.rx.lock().unwrap().try_recv());
156                 crate::task::yield_now().await;
157                 assert_ok!(ctx.tx.clone().try_send(()));
158                 drop(permit);
159             });
160         }
161 
162         let (tx, rx) = mpsc::channel(PERMITS);
163         let sem = Arc::new(Semaphore::new(PERMITS));
164         let ctx = Arc::new(Context {
165             sem,
166             tx,
167             rx: Mutex::new(rx),
168         });
169 
170         for _ in 0..PERMITS {
171             assert_ok!(ctx.tx.clone().try_send(()));
172         }
173 
174         let mut ths = Vec::new();
175 
176         for _ in 0..TASKS {
177             let ctx = ctx.clone();
178 
179             ths.push(thread::spawn(move || {
180                 run(&ctx);
181             }));
182         }
183 
184         run(&ctx);
185 
186         for th in ths {
187             th.join().unwrap();
188         }
189     });
190 }
191