1 use crate::sync::mpsc;
2
3 use futures::future::poll_fn;
4 use loom::future::block_on;
5 use loom::sync::Arc;
6 use loom::thread;
7 use tokio_test::assert_ok;
8
/// Sends one message on an `mpsc::channel(16)` from a spawned thread, drops
/// the only sender there, and checks that the receiver observes the message
/// followed by `None`.
#[test]
fn closing_tx() {
    loom::model(|| {
        let (sender, mut receiver) = mpsc::channel(16);

        thread::spawn(move || {
            sender.try_send(()).unwrap();
            drop(sender);
        });

        // The message queued by the spawned thread must be delivered first.
        assert!(block_on(receiver.recv()).is_some());

        // With the sender gone, the next receive yields `None`.
        assert!(block_on(receiver.recv()).is_none());
    });
}
26
/// Same scenario as the bounded variant, but on `mpsc::unbounded_channel()`:
/// one message is sent from a spawned thread, the sender is dropped, and the
/// receiver must see `Some` and then `None`.
#[test]
fn closing_unbounded_tx() {
    loom::model(|| {
        let (sender, mut receiver) = mpsc::unbounded_channel();

        thread::spawn(move || {
            sender.send(()).unwrap();
            drop(sender);
        });

        // First receive: the message sent by the spawned thread.
        let delivered = block_on(receiver.recv());
        assert!(delivered.is_some());

        // Second receive: nothing left and no sender remaining.
        let after_close = block_on(receiver.recv());
        assert!(after_close.is_none());
    });
}
44
/// Drops the receiver of an `mpsc::channel::<()>(16)` on a spawned thread and
/// waits on `closed()` for both the original sender and a clone of it. The
/// model only finishes if both waits complete.
#[test]
fn closing_bounded_rx() {
    loom::model(|| {
        let (original, receiver) = mpsc::channel::<()>(16);
        let duplicate = original.clone();

        thread::spawn(move || drop(receiver));

        // Wait on the original sender first, then on the clone.
        for sender in [&original, &duplicate].iter() {
            block_on(sender.closed());
        }
    });
}
58
/// Runs three threads against one `mpsc::channel::<()>(16)` whose sender is
/// shared through an `Arc`:
///
/// * one sends a single message with `try_send`,
/// * one waits for `closed()` through the second `Arc` handle,
/// * one receives a message, asserts it is `Some`, and drops the receiver.
///
/// The model thread then joins all three and asserts none of them panicked.
#[test]
fn closing_and_sending() {
    loom::model(|| {
        let (sender, mut receiver) = mpsc::channel::<()>(16);
        let send_handle = Arc::new(sender);
        let close_handle = Arc::clone(&send_handle);

        let sending = thread::spawn(move || {
            send_handle.try_send(()).unwrap();
        });

        let waiting = thread::spawn(move || {
            block_on(close_handle.closed());
        });

        let receiving = thread::spawn(move || {
            let received = block_on(receiver.recv());
            assert!(received.is_some());
            drop(receiver);
        });

        // Join in spawn order; each join must report a clean exit.
        assert_ok!(sending.join());
        assert_ok!(waiting.join());
        assert_ok!(receiving.join());
    });
}
85
/// Drops the receiver of an `mpsc::unbounded_channel::<()>()` on a spawned
/// thread, then waits on `closed()` for the original sender and for a clone
/// of it.
#[test]
fn closing_unbounded_rx() {
    loom::model(|| {
        let (first_sender, receiver) = mpsc::unbounded_channel::<()>();
        let second_sender = first_sender.clone();

        thread::spawn(move || drop(receiver));

        // Both waits have to complete for the model iteration to end.
        block_on(first_sender.closed());
        block_on(second_sender.closed());
    });
}
99
/// Clones the sender of an `mpsc::channel::<()>(16)` twice, drops each clone
/// on its own spawned thread, drops the original on the model thread, and
/// checks that `recv` yields `None` without any message having been sent.
#[test]
fn dropping_tx() {
    loom::model(|| {
        let (sender, mut receiver) = mpsc::channel::<()>(16);

        // Hand one clone to each of two threads that do nothing but drop it.
        (0..2).for_each(|_| {
            let extra = sender.clone();
            thread::spawn(move || drop(extra));
        });
        drop(sender);

        assert!(block_on(receiver.recv()).is_none());
    });
}
117
/// Unbounded counterpart of the sender-dropping test: two sender clones are
/// dropped on spawned threads, the original is dropped on the model thread,
/// and `recv` must then yield `None`.
#[test]
fn dropping_unbounded_tx() {
    loom::model(|| {
        let (sender, mut receiver) = mpsc::unbounded_channel::<()>();

        for _ in 0..2 {
            let extra = sender.clone();
            thread::spawn(move || drop(extra));
        }
        // Release the last sender held by the model thread.
        drop(sender);

        let outcome = block_on(receiver.recv());
        assert!(outcome.is_none());
    });
}
135
/// Exercises `try_recv` from several threads at once.
///
/// The channel is created with capacity `PERMITS` and pre-filled with
/// `PERMITS` messages. `TASKS` spawned threads plus the model thread each run
/// the same routine: acquire a semaphore permit, `try_recv` one message under
/// the receiver mutex, yield, `try_send` a replacement message, and release
/// the permit. Every `try_recv` and `try_send` is asserted to succeed.
#[test]
fn try_recv() {
    loom::model(|| {
        use crate::sync::{mpsc, Semaphore};
        use loom::sync::{Arc, Mutex};

        const PERMITS: usize = 2;
        const TASKS: usize = 2;
        // NOTE(review): not referenced anywhere inside this test.
        const CYCLES: usize = 1;

        /// State shared by every participant.
        struct Context {
            sem: Arc<Semaphore>,
            tx: mpsc::Sender<()>,
            rx: Mutex<mpsc::Receiver<()>>,
        }

        /// One participant's work: take a message out and put one back while
        /// holding a semaphore permit.
        fn run(ctx: &Context) {
            block_on(async {
                let held = ctx.sem.acquire().await;
                // The lock guard is a temporary of this statement, so it is
                // released before the yield below.
                assert_ok!(ctx.rx.lock().unwrap().try_recv());
                crate::task::yield_now().await;
                assert_ok!(ctx.tx.clone().try_send(()));
                drop(held);
            });
        }

        let (tx, rx) = mpsc::channel(PERMITS);
        let ctx = Arc::new(Context {
            sem: Arc::new(Semaphore::new(PERMITS)),
            tx,
            rx: Mutex::new(rx),
        });

        // Fill the channel so each permit holder has a message to receive.
        (0..PERMITS).for_each(|_| {
            assert_ok!(ctx.tx.clone().try_send(()));
        });

        let workers: Vec<_> = (0..TASKS)
            .map(|_| {
                let ctx = Arc::clone(&ctx);
                thread::spawn(move || run(&ctx))
            })
            .collect();

        // The model thread takes part as well.
        run(&ctx);

        workers
            .into_iter()
            .for_each(|worker| worker.join().unwrap());
    });
}
191