1 use crate::sync::broadcast;
2 use crate::sync::broadcast::error::RecvError::{Closed, Lagged};
3 
4 use loom::future::block_on;
5 use loom::sync::Arc;
6 use loom::thread;
7 use tokio_test::{assert_err, assert_ok};
8 
// Two threads share a single sender through an `Arc` and each push three
// messages into a channel created with capacity 2, while the model thread
// drains the receiver until it reports `Closed`. Every `Ok` counts as one
// message and every `Lagged(n)` counts as `n`, so the tally must reach 6.
#[test]
fn broadcast_send() {
    loom::model(|| {
        let (sender, mut rx) = broadcast::channel(2);
        let english_tx = Arc::new(sender);
        let german_tx = Arc::clone(&english_tx);

        let english = thread::spawn(move || {
            block_on(async {
                for &msg in &["one", "two", "three"] {
                    assert_ok!(english_tx.send(msg));
                }
            });
        });

        let german = thread::spawn(move || {
            block_on(async {
                for &msg in &["eins", "zwei", "drei"] {
                    assert_ok!(german_tx.send(msg));
                }
            });
        });

        block_on(async {
            let mut total: usize = 0;
            loop {
                // `Closed` is the only outcome that ends the drain loop.
                total += match rx.recv().await {
                    Ok(_) => 1,
                    Err(Lagged(missed)) => missed as usize,
                    Err(Closed) => break,
                };
            }
            assert_eq!(total, 6);
        });

        assert_ok!(english.join());
        assert_ok!(german.join());
    });
}
48 
// Two receivers, each on its own thread, must both observe "hello", then
// "world", then `Closed` once the sender has been dropped.
//
// An `Arc` is used as the value in order to detect memory leaks.
#[test]
fn broadcast_two() {
    loom::model(|| {
        let (tx, mut rx_a) = broadcast::channel::<Arc<&'static str>>(16);
        let mut rx_b = tx.subscribe();

        let reader_a = thread::spawn(move || {
            block_on(async {
                // Both received values stay bound until the end of this block.
                let first = assert_ok!(rx_a.recv().await);
                assert_eq!(*first, "hello");

                let second = assert_ok!(rx_a.recv().await);
                assert_eq!(*second, "world");

                // After both values, the only acceptable error is `Closed`.
                let end = assert_err!(rx_a.recv().await);
                if !matches!(end, Closed) {
                    panic!();
                }
            });
        });

        let reader_b = thread::spawn(move || {
            block_on(async {
                // Both received values stay bound until the end of this block.
                let first = assert_ok!(rx_b.recv().await);
                assert_eq!(*first, "hello");

                let second = assert_ok!(rx_b.recv().await);
                assert_eq!(*second, "world");

                // After both values, the only acceptable error is `Closed`.
                let end = assert_err!(rx_b.recv().await);
                if !matches!(end, Closed) {
                    panic!();
                }
            });
        });

        for &word in &["hello", "world"] {
            assert_ok!(tx.send(Arc::new(word)));
        }
        drop(tx);

        assert_ok!(reader_a.join());
        assert_ok!(reader_b.join());
    });
}
94 
// Two receivers on a channel created with capacity 2 each drain it on their
// own thread while the model thread sends three values and then drops the
// sender. Every `Ok` counts as one and every `Lagged(n)` counts as `n`, so
// each receiver must tally exactly 3 before seeing `Closed`.
#[test]
fn broadcast_wrap() {
    loom::model(|| {
        let (tx, mut first_rx) = broadcast::channel(2);
        let mut second_rx = tx.subscribe();

        let first = thread::spawn(move || {
            block_on(async {
                let mut seen: usize = 0;

                loop {
                    seen += match first_rx.recv().await {
                        Ok(_) => 1,
                        Err(Lagged(skipped)) => skipped as usize,
                        Err(Closed) => break,
                    };
                }

                assert_eq!(seen, 3);
            });
        });

        let second = thread::spawn(move || {
            block_on(async {
                let mut seen: usize = 0;

                loop {
                    seen += match second_rx.recv().await {
                        Ok(_) => 1,
                        Err(Lagged(skipped)) => skipped as usize,
                        Err(Closed) => break,
                    };
                }

                assert_eq!(seen, 3);
            });
        });

        for &msg in &["one", "two", "three"] {
            assert_ok!(tx.send(msg));
        }

        // Dropping the sender is what lets both drain loops terminate.
        drop(tx);

        assert_ok!(first.join());
        assert_ok!(second.join());
    });
}
143 
// One receiver is dropped on its own thread while a second receiver, on
// another thread, must still observe "one", "two", "three" in order and then
// `Closed` after the sender has been dropped.
#[test]
fn drop_rx() {
    loom::model(|| {
        let (tx, mut live_rx) = broadcast::channel(16);
        let doomed_rx = tx.subscribe();

        let reader = thread::spawn(move || {
            block_on(async {
                for &expected in &["one", "two", "three"] {
                    let got = assert_ok!(live_rx.recv().await);
                    assert_eq!(got, expected);
                }

                // After the three values, the only acceptable error is `Closed`.
                let end = assert_err!(live_rx.recv().await);
                if !matches!(end, Closed) {
                    panic!();
                }
            });
        });

        let dropper = thread::spawn(move || {
            drop(doomed_rx);
        });

        for &msg in &["one", "two", "three"] {
            assert_ok!(tx.send(msg));
        }
        drop(tx);

        assert_ok!(reader.join());
        assert_ok!(dropper.join());
    });
}
181 
// A cloned sender pushes 100 values from one thread into a channel created
// with capacity 1, while another thread receives until the first error of any
// kind. A second receiver is kept alive but never read. Send results are
// deliberately ignored; the test only requires that both threads join.
#[test]
fn drop_multiple_rx_with_overflow() {
    loom::model(move || {
        // It is essential to have multiple senders and receivers in this test case.
        let (tx, mut rx) = broadcast::channel(1);
        let _rx2 = tx.subscribe();

        let _ = tx.send(());
        let flood_tx = tx.clone();
        let flooder = thread::spawn(move || {
            block_on(async {
                (0..100).for_each(|_| {
                    let _ = flood_tx.send(());
                });
            });
        });
        let _ = tx.send(());

        let drainer = thread::spawn(move || {
            block_on(async {
                // Stops on the first `Err`, whichever variant it is.
                while rx.recv().await.is_ok() {}
            });
        });

        assert_ok!(flooder.join());
        assert_ok!(drainer.join());
    });
}
208