1 use crate::sync::broadcast;
2 use crate::sync::broadcast::error::RecvError::{Closed, Lagged};
3
4 use loom::future::block_on;
5 use loom::sync::Arc;
6 use loom::thread;
7 use tokio_test::{assert_err, assert_ok};
8
/// Two threads share one sender through an `Arc` and each publishes three
/// messages into a channel of capacity 2, while the model's main thread keeps
/// receiving until it observes `Closed`.
///
/// Every message must be accounted for: values actually received plus values
/// reported as skipped by `Lagged` have to add up to six.
#[test]
fn broadcast_send() {
    loom::model(|| {
        let (tx, mut rx) = broadcast::channel(2);
        let tx_a = Arc::new(tx);
        let tx_b = Arc::clone(&tx_a);

        let english = thread::spawn(move || {
            block_on(async {
                for msg in &["one", "two", "three"] {
                    assert_ok!(tx_a.send(*msg));
                }
            });
        });

        let german = thread::spawn(move || {
            block_on(async {
                for msg in &["eins", "zwei", "drei"] {
                    assert_ok!(tx_b.send(*msg));
                }
            });
        });

        block_on(async {
            let mut accounted = 0usize;
            loop {
                // A delivered value counts once; a lag report counts once per
                // skipped value.
                accounted += match rx.recv().await {
                    Ok(_) => 1,
                    Err(Lagged(skipped)) => skipped as usize,
                    Err(Closed) => break,
                };
            }
            assert_eq!(accounted, 6);
        });

        assert_ok!(english.join());
        assert_ok!(german.join());
    });
}
48
/// Broadcasts two values to two receivers, each reading on its own thread, and
/// checks that both see `"hello"`, then `"world"`, then `Closed`.
///
/// The payload is wrapped in an `Arc` in order to detect memory leaks.
#[test]
fn broadcast_two() {
    loom::model(|| {
        let (tx, mut rx_a) = broadcast::channel::<Arc<&'static str>>(16);
        let mut rx_b = tx.subscribe();

        let reader_a = thread::spawn(move || {
            block_on(async {
                let first = assert_ok!(rx_a.recv().await);
                assert_eq!(*first, "hello");

                let second = assert_ok!(rx_a.recv().await);
                assert_eq!(*second, "world");

                // After both values, the only acceptable error is `Closed`.
                let err = assert_err!(rx_a.recv().await);
                if !matches!(err, Closed) {
                    panic!();
                }
            });
        });

        let reader_b = thread::spawn(move || {
            block_on(async {
                let first = assert_ok!(rx_b.recv().await);
                assert_eq!(*first, "hello");

                let second = assert_ok!(rx_b.recv().await);
                assert_eq!(*second, "world");

                // After both values, the only acceptable error is `Closed`.
                let err = assert_err!(rx_b.recv().await);
                if !matches!(err, Closed) {
                    panic!();
                }
            });
        });

        assert_ok!(tx.send(Arc::new("hello")));
        assert_ok!(tx.send(Arc::new("world")));
        drop(tx);

        assert_ok!(reader_a.join());
        assert_ok!(reader_b.join());
    });
}
94
/// Sends three values through a channel of capacity 2 to two receivers, each
/// draining on its own thread until it observes `Closed`.
///
/// For each receiver, values actually received plus values reported as skipped
/// by `Lagged` must add up to three.
#[test]
fn broadcast_wrap() {
    /// Receives from `rx` until `Closed` and returns the number of delivered
    /// values plus the number of values reported as skipped by `Lagged`.
    async fn tally<T: Clone>(rx: &mut broadcast::Receiver<T>) -> usize {
        let mut total = 0usize;
        loop {
            total += match rx.recv().await {
                Ok(_) => 1,
                Err(Lagged(skipped)) => skipped as usize,
                Err(Closed) => break,
            };
        }
        total
    }

    loom::model(|| {
        let (tx, mut rx_a) = broadcast::channel(2);
        let mut rx_b = tx.subscribe();

        // The receivers are only borrowed by `tally`, so each one is dropped
        // when its thread closure finishes.
        let drain_a = thread::spawn(move || {
            block_on(async {
                assert_eq!(tally(&mut rx_a).await, 3);
            });
        });

        let drain_b = thread::spawn(move || {
            block_on(async {
                assert_eq!(tally(&mut rx_b).await, 3);
            });
        });

        assert_ok!(tx.send("one"));
        assert_ok!(tx.send("two"));
        assert_ok!(tx.send("three"));

        drop(tx);

        assert_ok!(drain_a.join());
        assert_ok!(drain_b.join());
    });
}
143
/// Drops one receiver on a separate thread while another receiver reads the
/// three sent values in order and then observes `Closed`.
#[test]
fn drop_rx() {
    loom::model(|| {
        let (tx, mut reader) = broadcast::channel(16);
        let unused = tx.subscribe();

        let consume = thread::spawn(move || {
            block_on(async {
                for expected in &["one", "two", "three"] {
                    let got = assert_ok!(reader.recv().await);
                    assert_eq!(got, *expected);
                }

                // After the three values, the only acceptable error is `Closed`.
                let err = assert_err!(reader.recv().await);
                if !matches!(err, Closed) {
                    panic!();
                }
            });
        });

        // The second receiver is never read from; it is only dropped, on a
        // thread of its own.
        let discard = thread::spawn(move || {
            drop(unused);
        });

        for msg in &["one", "two", "three"] {
            assert_ok!(tx.send(*msg));
        }
        drop(tx);

        assert_ok!(consume.join());
        assert_ok!(discard.join());
    });
}
181
/// Overflows a capacity-1 channel from two senders while one receiver is held
/// without ever being read and another drains on its own thread until `recv`
/// returns an error.
///
/// Send results are deliberately ignored; the test only requires that both
/// spawned threads join successfully.
#[test]
fn drop_multiple_rx_with_overflow() {
    loom::model(|| {
        // It is essential to have multiple senders and receivers in this test case.
        let (tx, mut rx) = broadcast::channel(1);
        let _idle_rx = tx.subscribe();

        let _ = tx.send(());
        let flood_tx = tx.clone();
        let flood = thread::spawn(move || {
            block_on(async {
                (0..100).for_each(|_| {
                    let _ = flood_tx.send(());
                });
            });
        });
        let _ = tx.send(());

        let drain = thread::spawn(move || {
            // Keep receiving until the first error of any kind.
            block_on(async { while rx.recv().await.is_ok() {} });
        });

        assert_ok!(flood.join());
        assert_ok!(drain.join());
    });
}
208