1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 use tokio::sync::broadcast;
6 use tokio_test::task;
7 use tokio_test::{
8     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
9 };
10 
11 use std::sync::Arc;
12 
/// Calls `try_recv()` on `$e` and evaluates to the received value.
///
/// Panics with the error's `Debug` output when `try_recv()` returns `Err`.
macro_rules! assert_recv {
    ($e:expr) => {
        $e.try_recv()
            .unwrap_or_else(|e| panic!("expected recv; got = {:?}", e))
    };
}
21 
/// Calls `try_recv()` on `$e` and requires the result to be
/// `Err(TryRecvError::Empty)`.
///
/// A received value or any other error panics, printing what was observed.
macro_rules! assert_empty {
    ($e:expr) => {{
        match $e.try_recv() {
            // The only acceptable outcome: nothing is waiting.
            Err(broadcast::error::TryRecvError::Empty) => {}
            Err(unexpected) => panic!("expected empty; got = {:?}", unexpected),
            Ok(received) => panic!("expected empty; got = {:?}", received),
        }
    }};
}
31 
/// Requires `$e` to be `Err(TryRecvError::Lagged(n))` with `n == $n`.
///
/// `assert_err!` rejects an `Ok`; any error other than `Lagged` panics with
/// "did not lag".
macro_rules! assert_lagged {
    ($e:expr, $n:expr) => {
        if let broadcast::error::TryRecvError::Lagged(skipped) = assert_err!($e) {
            assert_eq!(skipped, $n);
        } else {
            panic!("did not lag");
        }
    };
}
42 
/// Requires `$e` to be `Err(TryRecvError::Closed)`.
///
/// `assert_err!` rejects an `Ok`; any other error fails the test with a
/// message naming the error that was actually observed.
macro_rules! assert_closed {
    ($e:expr) => {
        match assert_err!($e) {
            broadcast::error::TryRecvError::Closed => {}
            // Report the real error rather than a message about lagging.
            other => panic!("expected closed; got = {:?}", other),
        }
    };
}
51 
52 trait AssertSend: Send + Sync {}
53 impl AssertSend for broadcast::Sender<i32> {}
54 impl AssertSend for broadcast::Receiver<i32> {}
55 
/// One receiver: empty before the send, yields the value once, then empty
/// again.
#[test]
fn send_try_recv_bounded() {
    let (sender, mut receiver) = broadcast::channel(16);

    // Nothing has been sent yet.
    assert_empty!(receiver);

    // With a single receiver alive, `send` is expected to report 1.
    let delivered = assert_ok!(sender.send("hello"));
    assert_eq!(delivered, 1);

    let received = assert_recv!(receiver);
    assert_eq!(received, "hello");

    // The value was consumed; nothing else is queued.
    assert_empty!(receiver);
}
70 
/// Two receivers each observe the same single value via `try_recv`.
#[test]
fn send_two_recv() {
    let (sender, mut first_rx) = broadcast::channel(16);
    let mut second_rx = sender.subscribe();

    assert_empty!(first_rx);
    assert_empty!(second_rx);

    // Two receivers are alive, so `send` is expected to report 2.
    let delivered = assert_ok!(sender.send("hello"));
    assert_eq!(delivered, 2);

    let from_first = assert_recv!(first_rx);
    assert_eq!(from_first, "hello");

    let from_second = assert_recv!(second_rx);
    assert_eq!(from_second, "hello");

    // Both receivers have drained their copy.
    assert_empty!(first_rx);
    assert_empty!(second_rx);
}
91 
/// A pending `recv()` future is woken by a send and then resolves to the
/// sent value.
#[test]
fn send_recv_bounded() {
    let (sender, mut receiver) = broadcast::channel(16);

    let mut pending_recv = task::spawn(receiver.recv());

    // No value yet, so the first poll must park the task.
    assert_pending!(pending_recv.poll());

    assert_ok!(sender.send("hello"));

    assert!(pending_recv.is_woken());
    let received = assert_ready_ok!(pending_recv.poll());
    assert_eq!(received, "hello");
}
106 
/// Two async receivers over two rounds: in the first both futures are polled
/// and woken; in the second only the polled future is expected to be woken,
/// yet both still resolve to the value.
#[test]
fn send_two_recv_bounded() {
    let (sender, mut first_rx) = broadcast::channel(16);
    let mut second_rx = sender.subscribe();

    // Round one: both futures are parked before the send.
    let mut first_round_a = task::spawn(first_rx.recv());
    let mut first_round_b = task::spawn(second_rx.recv());

    assert_pending!(first_round_a.poll());
    assert_pending!(first_round_b.poll());

    assert_ok!(sender.send("hello"));

    assert!(first_round_a.is_woken());
    assert!(first_round_b.is_woken());

    let hello_a = assert_ready_ok!(first_round_a.poll());
    let hello_b = assert_ready_ok!(first_round_b.poll());
    assert_eq!(hello_a, "hello");
    assert_eq!(hello_b, "hello");

    // Release the borrows on both receivers before starting round two.
    drop((first_round_a, first_round_b));

    // Round two: only the first future is polled before the send.
    let mut second_round_a = task::spawn(first_rx.recv());
    let mut second_round_b = task::spawn(second_rx.recv());

    assert_pending!(second_round_a.poll());

    assert_ok!(sender.send("world"));

    // The unpolled future is expected not to have been woken.
    assert!(second_round_a.is_woken());
    assert!(!second_round_b.is_woken());

    let world_a = assert_ready_ok!(second_round_a.poll());
    let world_b = assert_ready_ok!(second_round_b.poll());
    assert_eq!(world_a, "world");
    assert_eq!(world_b, "world");
}
145 
/// The same `recv()` future is polled from two different mock tasks; the send
/// must wake the task that polled it most recently.
#[test]
fn change_tasks() {
    let (sender, mut receiver) = broadcast::channel(1);

    // Boxed and pinned so it can be handed to each task by `&mut`.
    let mut shared_recv = Box::pin(receiver.recv());

    let mut first_task = task::spawn(&mut shared_recv);
    assert_pending!(first_task.poll());

    let mut second_task = task::spawn(&mut shared_recv);
    assert_pending!(second_task.poll());

    sender.send("hello").unwrap();

    assert!(second_task.is_woken());
}
162 
/// One receiver keeps up with the sender while the other holds a single
/// pending future; the slow one must still see every value, in order.
#[test]
fn send_slow_rx() {
    let (sender, mut fast_rx) = broadcast::channel(16);
    let mut slow_rx = sender.subscribe();

    {
        // The slow receiver's future lives across all three sends.
        let mut slow_recv = task::spawn(slow_rx.recv());

        {
            let mut fast_recv = task::spawn(fast_rx.recv());

            assert_pending!(fast_recv.poll());
            assert_pending!(slow_recv.poll());

            assert_ok!(sender.send("one"));

            assert!(fast_recv.is_woken());
            assert!(slow_recv.is_woken());

            assert_ok!(sender.send("two"));

            let first = assert_ready_ok!(fast_recv.poll());
            assert_eq!(first, "one");
        }

        // "two" is already queued, so a fresh future resolves on its first poll.
        let second = assert_ready_ok!(task::spawn(fast_rx.recv()).poll());
        assert_eq!(second, "two");

        let mut fast_recv = task::spawn(fast_rx.recv());

        assert_pending!(fast_recv.poll());

        assert_ok!(sender.send("three"));

        assert!(fast_recv.is_woken());

        let third = assert_ready_ok!(fast_recv.poll());
        assert_eq!(third, "three");

        // The slow future finally resolves, starting from the oldest value.
        let slow_first = assert_ready_ok!(slow_recv.poll());
        assert_eq!(slow_first, "one");
    }

    // The remaining values are still available to the slow receiver.
    let slow_second = assert_recv!(slow_rx);
    assert_eq!(slow_second, "two");

    let slow_third = assert_recv!(slow_rx);
    assert_eq!(slow_third, "three");
}
212 
/// Dropping receivers that still have an unread value must complete cleanly.
#[test]
fn drop_rx_while_values_remain() {
    let (sender, mut first_rx) = broadcast::channel(16);
    let mut second_rx = sender.subscribe();

    assert_ok!(sender.send("one"));
    assert_ok!(sender.send("two"));

    // Each receiver takes one of the two values; the other stays unread.
    assert_recv!(first_rx);
    assert_recv!(second_rx);

    // The second receiver is dropped first, then the original one.
    drop(second_rx);
    drop(first_rx);
}
227 
/// A receiver that falls behind a capacity-2 channel gets `Lagged(n)` from
/// `try_recv`, and its next call resumes with a value that is still buffered.
#[test]
fn lagging_rx() {
    let (tx, mut rx1) = broadcast::channel(2);
    let mut rx2 = tx.subscribe();

    assert_ok!(tx.send("one"));
    assert_ok!(tx.send("two"));

    assert_eq!("one", assert_recv!(rx1));

    assert_ok!(tx.send("three"));

    // Lagged too far: rx2 had read nothing before the third send.
    assert_lagged!(rx2.try_recv(), 1);

    // Calling again gets the next value
    assert_eq!("two", assert_recv!(rx2));

    assert_eq!("two", assert_recv!(rx1));
    assert_eq!("three", assert_recv!(rx1));

    assert_ok!(tx.send("four"));
    assert_ok!(tx.send("five"));

    // rx2 never read "three" and is expected to report one missed value.
    assert_lagged!(rx2.try_recv(), 1);

    assert_ok!(tx.send("six"));

    // One more send without a read is expected to report another missed value.
    assert_lagged!(rx2.try_recv(), 1);
}
259 
/// Sending fails while no receiver exists, and succeeds once one subscribes.
#[test]
fn send_no_rx() {
    // The `_` pattern drops the initial receiver immediately.
    let (sender, _) = broadcast::channel(16);

    assert_err!(sender.send("hello"));

    let mut late_rx = sender.subscribe();

    assert_ok!(sender.send("world"));

    // Only the value sent after subscribing is received.
    let received = assert_recv!(late_rx);
    assert_eq!("world", received);
}
273 
/// Creating a channel with capacity 0 must panic.
#[test]
#[should_panic]
fn zero_capacity() {
    let _ = broadcast::channel::<()>(0);
}
279 
/// Creating a channel with capacity `(usize::MAX >> 1) + 1` must panic.
#[test]
#[should_panic]
fn capacity_too_big() {
    use std::usize;

    let over_limit = (usize::MAX >> 1) + 1;
    let _ = broadcast::channel::<()>(over_limit);
}
287 
/// A panic inside the value's `Clone` impl during `try_recv` must not wedge
/// the receiver: the following value is still received afterwards.
#[test]
fn panic_in_clone() {
    use std::panic::{self, AssertUnwindSafe};

    // Payload whose `clone` panics when the wrapped number is 0.
    #[derive(Eq, PartialEq, Debug)]
    struct MyVal(usize);

    impl Clone for MyVal {
        fn clone(&self) -> MyVal {
            assert_ne!(0, self.0);
            MyVal(self.0)
        }
    }

    let (sender, mut receiver) = broadcast::channel(16);

    // The first value is the one that panics when cloned.
    assert_ok!(sender.send(MyVal(0)));
    assert_ok!(sender.send(MyVal(1)));

    let unwound = panic::catch_unwind(AssertUnwindSafe(|| {
        let _ = receiver.try_recv();
    }));

    assert_err!(unwound);

    // The receiver moves on to the second value.
    let survivor = assert_recv!(receiver);
    assert_eq!(survivor, MyVal(1));
}
316 
/// Pending receivers stay pending while any sender clone is alive, and are
/// woken with a `Closed` error once the last sender is dropped.
#[test]
fn dropping_tx_notifies_rx() {
    let (sender, mut first_rx) = broadcast::channel::<()>(16);
    let mut second_rx = sender.subscribe();

    let sender_clone = sender.clone();

    let mut first_recv = task::spawn(first_rx.recv());
    let mut second_recv = task::spawn(second_rx.recv());

    assert_pending!(first_recv.poll());
    assert_pending!(second_recv.poll());

    // One sender handle is gone, but the clone is still alive.
    drop(sender);

    assert_pending!(first_recv.poll());
    assert_pending!(second_recv.poll());

    // Dropping the last sender handle.
    drop(sender_clone);

    assert!(first_recv.is_woken());
    assert!(second_recv.is_woken());

    let first_err = assert_ready_err!(first_recv.poll());
    assert!(is_closed(first_err));

    let second_err = assert_ready_err!(second_recv.poll());
    assert!(is_closed(second_err));
}
346 
/// A value nobody received is released when the receiver is dropped, as
/// observed through the `Arc` strong count.
#[test]
fn unconsumed_messages_are_dropped() {
    let (sender, receiver) = broadcast::channel(16);

    let payload = Arc::new(());

    assert_ok!(sender.send(Arc::clone(&payload)));

    // One reference held here, one by the value that was sent.
    assert_eq!(2, Arc::strong_count(&payload));

    drop(receiver);

    // Only the local reference is left.
    assert_eq!(1, Arc::strong_count(&payload));
}
361 
/// A capacity-1 channel delivers its single value and is then empty.
#[test]
fn single_capacity_recvs() {
    let (sender, mut receiver) = broadcast::channel(1);

    assert_ok!(sender.send(1));

    let received = assert_recv!(receiver);
    assert_eq!(received, 1);
    assert_empty!(receiver);
}
371 
/// Capacity 1, one send, sender dropped: the value is still delivered before
/// the receiver reports `Closed`.
#[test]
fn single_capacity_recvs_after_drop_1() {
    let (sender, mut receiver) = broadcast::channel(1);

    assert_ok!(sender.send(1));
    drop(sender);

    let received = assert_recv!(receiver);
    assert_eq!(received, 1);
    assert_closed!(receiver.try_recv());
}
382 
/// Capacity 1, two sends, sender dropped: the receiver reports one lagged
/// value, then receives the second value, then sees `Closed`.
#[test]
fn single_capacity_recvs_after_drop_2() {
    let (sender, mut receiver) = broadcast::channel(1);

    for value in 1..=2 {
        assert_ok!(sender.send(value));
    }
    drop(sender);

    assert_lagged!(receiver.try_recv(), 1);
    let received = assert_recv!(receiver);
    assert_eq!(received, 2);
    assert_closed!(receiver.try_recv());
}
395 
/// Capacity 2 filled exactly, then the sender is dropped: both values are
/// still received, in order, before `Closed` is reported.
#[test]
fn dropping_sender_does_not_overwrite() {
    let (sender, mut receiver) = broadcast::channel(2);

    for value in 1..=2 {
        assert_ok!(sender.send(value));
    }
    drop(sender);

    let first = assert_recv!(receiver);
    assert_eq!(first, 1);
    let second = assert_recv!(receiver);
    assert_eq!(second, 2);
    assert_closed!(receiver.try_recv());
}
408 
/// Capacity 2, three sends, sender dropped: the receiver reports one lagged
/// value, then receives 2 and 3, then sees `Closed`.
#[test]
fn lagging_receiver_recovers_after_wrap_closed_1() {
    let (sender, mut receiver) = broadcast::channel(2);

    for value in 1..=3 {
        assert_ok!(sender.send(value));
    }
    drop(sender);

    assert_lagged!(receiver.try_recv(), 1);
    let first = assert_recv!(receiver);
    assert_eq!(first, 2);
    let second = assert_recv!(receiver);
    assert_eq!(second, 3);
    assert_closed!(receiver.try_recv());
}
423 
/// Capacity 2, four sends, sender dropped: the receiver reports two lagged
/// values, then receives 3 and 4, then sees `Closed`.
#[test]
fn lagging_receiver_recovers_after_wrap_closed_2() {
    let (sender, mut receiver) = broadcast::channel(2);

    for value in 1..=4 {
        assert_ok!(sender.send(value));
    }
    drop(sender);

    assert_lagged!(receiver.try_recv(), 2);
    let first = assert_recv!(receiver);
    assert_eq!(first, 3);
    let second = assert_recv!(receiver);
    assert_eq!(second, 4);
    assert_closed!(receiver.try_recv());
}
439 
/// Capacity 2, three sends, sender kept alive: the receiver reports one
/// lagged value, then receives 2 and 3, then sees `Empty` rather than
/// `Closed`.
#[test]
fn lagging_receiver_recovers_after_wrap_open() {
    let (sender, mut receiver) = broadcast::channel(2);

    for value in 1..=3 {
        assert_ok!(sender.send(value));
    }

    assert_lagged!(receiver.try_recv(), 1);
    let first = assert_recv!(receiver);
    assert_eq!(first, 2);
    let second = assert_recv!(receiver);
    assert_eq!(second, 3);
    assert_empty!(receiver);
}
453 
is_closed(err: broadcast::error::RecvError) -> bool454 fn is_closed(err: broadcast::error::RecvError) -> bool {
455     matches!(err, broadcast::error::RecvError::Closed)
456 }
457