1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 use tokio::sync::broadcast;
6 use tokio_test::task;
7 use tokio_test::{
8     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
9 };
10 
11 use std::sync::Arc;
12 
13 macro_rules! assert_recv {
14     ($e:expr) => {
15         match $e.try_recv() {
16             Ok(value) => value,
17             Err(e) => panic!("expected recv; got = {:?}", e),
18         }
19     };
20 }
21 
22 macro_rules! assert_empty {
23     ($e:expr) => {
24         match $e.try_recv() {
25             Ok(value) => panic!("expected empty; got = {:?}", value),
26             Err(broadcast::TryRecvError::Empty) => {}
27             Err(e) => panic!("expected empty; got = {:?}", e),
28         }
29     };
30 }
31 
32 macro_rules! assert_lagged {
33     ($e:expr, $n:expr) => {
34         match assert_err!($e) {
35             broadcast::TryRecvError::Lagged(n) => {
36                 assert_eq!(n, $n);
37             }
38             _ => panic!("did not lag"),
39         }
40     };
41 }
42 
43 macro_rules! assert_closed {
44     ($e:expr) => {
45         match assert_err!($e) {
46             broadcast::TryRecvError::Closed => {}
47             _ => panic!("did not lag"),
48         }
49     };
50 }
51 
52 trait AssertSend: Send + Sync {}
53 impl AssertSend for broadcast::Sender<i32> {}
54 impl AssertSend for broadcast::Receiver<i32> {}
55 
56 #[test]
send_try_recv_bounded()57 fn send_try_recv_bounded() {
58     let (tx, mut rx) = broadcast::channel(16);
59 
60     assert_empty!(rx);
61 
62     let n = assert_ok!(tx.send("hello"));
63     assert_eq!(n, 1);
64 
65     let val = assert_recv!(rx);
66     assert_eq!(val, "hello");
67 
68     assert_empty!(rx);
69 }
70 
71 #[test]
send_two_recv()72 fn send_two_recv() {
73     let (tx, mut rx1) = broadcast::channel(16);
74     let mut rx2 = tx.subscribe();
75 
76     assert_empty!(rx1);
77     assert_empty!(rx2);
78 
79     let n = assert_ok!(tx.send("hello"));
80     assert_eq!(n, 2);
81 
82     let val = assert_recv!(rx1);
83     assert_eq!(val, "hello");
84 
85     let val = assert_recv!(rx2);
86     assert_eq!(val, "hello");
87 
88     assert_empty!(rx1);
89     assert_empty!(rx2);
90 }
91 
92 #[tokio::test]
send_recv_into_stream_ready()93 async fn send_recv_into_stream_ready() {
94     use tokio::stream::StreamExt;
95 
96     let (tx, rx) = broadcast::channel::<i32>(8);
97     tokio::pin! {
98         let rx = rx.into_stream();
99     }
100 
101     assert_ok!(tx.send(1));
102     assert_ok!(tx.send(2));
103 
104     assert_eq!(Some(Ok(1)), rx.next().await);
105     assert_eq!(Some(Ok(2)), rx.next().await);
106 
107     drop(tx);
108 
109     assert_eq!(None, rx.next().await);
110 }
111 
112 #[tokio::test]
send_recv_into_stream_pending()113 async fn send_recv_into_stream_pending() {
114     use tokio::stream::StreamExt;
115 
116     let (tx, rx) = broadcast::channel::<i32>(8);
117 
118     tokio::pin! {
119         let rx = rx.into_stream();
120     }
121 
122     let mut recv = task::spawn(rx.next());
123     assert_pending!(recv.poll());
124 
125     assert_ok!(tx.send(1));
126 
127     assert!(recv.is_woken());
128     let val = assert_ready!(recv.poll());
129     assert_eq!(val, Some(Ok(1)));
130 }
131 
132 #[test]
send_recv_bounded()133 fn send_recv_bounded() {
134     let (tx, mut rx) = broadcast::channel(16);
135 
136     let mut recv = task::spawn(rx.recv());
137 
138     assert_pending!(recv.poll());
139 
140     assert_ok!(tx.send("hello"));
141 
142     assert!(recv.is_woken());
143     let val = assert_ready_ok!(recv.poll());
144     assert_eq!(val, "hello");
145 }
146 
147 #[test]
send_two_recv_bounded()148 fn send_two_recv_bounded() {
149     let (tx, mut rx1) = broadcast::channel(16);
150     let mut rx2 = tx.subscribe();
151 
152     let mut recv1 = task::spawn(rx1.recv());
153     let mut recv2 = task::spawn(rx2.recv());
154 
155     assert_pending!(recv1.poll());
156     assert_pending!(recv2.poll());
157 
158     assert_ok!(tx.send("hello"));
159 
160     assert!(recv1.is_woken());
161     assert!(recv2.is_woken());
162 
163     let val1 = assert_ready_ok!(recv1.poll());
164     let val2 = assert_ready_ok!(recv2.poll());
165     assert_eq!(val1, "hello");
166     assert_eq!(val2, "hello");
167 
168     drop((recv1, recv2));
169 
170     let mut recv1 = task::spawn(rx1.recv());
171     let mut recv2 = task::spawn(rx2.recv());
172 
173     assert_pending!(recv1.poll());
174 
175     assert_ok!(tx.send("world"));
176 
177     assert!(recv1.is_woken());
178     assert!(!recv2.is_woken());
179 
180     let val1 = assert_ready_ok!(recv1.poll());
181     let val2 = assert_ready_ok!(recv2.poll());
182     assert_eq!(val1, "world");
183     assert_eq!(val2, "world");
184 }
185 
186 #[test]
change_tasks()187 fn change_tasks() {
188     let (tx, mut rx) = broadcast::channel(1);
189 
190     let mut recv = Box::pin(rx.recv());
191 
192     let mut task1 = task::spawn(&mut recv);
193     assert_pending!(task1.poll());
194 
195     let mut task2 = task::spawn(&mut recv);
196     assert_pending!(task2.poll());
197 
198     tx.send("hello").unwrap();
199 
200     assert!(task2.is_woken());
201 }
202 
203 #[test]
send_slow_rx()204 fn send_slow_rx() {
205     let (tx, mut rx1) = broadcast::channel(16);
206     let mut rx2 = tx.subscribe();
207 
208     {
209         let mut recv2 = task::spawn(rx2.recv());
210 
211         {
212             let mut recv1 = task::spawn(rx1.recv());
213 
214             assert_pending!(recv1.poll());
215             assert_pending!(recv2.poll());
216 
217             assert_ok!(tx.send("one"));
218 
219             assert!(recv1.is_woken());
220             assert!(recv2.is_woken());
221 
222             assert_ok!(tx.send("two"));
223 
224             let val = assert_ready_ok!(recv1.poll());
225             assert_eq!(val, "one");
226         }
227 
228         let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
229         assert_eq!(val, "two");
230 
231         let mut recv1 = task::spawn(rx1.recv());
232 
233         assert_pending!(recv1.poll());
234 
235         assert_ok!(tx.send("three"));
236 
237         assert!(recv1.is_woken());
238 
239         let val = assert_ready_ok!(recv1.poll());
240         assert_eq!(val, "three");
241 
242         let val = assert_ready_ok!(recv2.poll());
243         assert_eq!(val, "one");
244     }
245 
246     let val = assert_recv!(rx2);
247     assert_eq!(val, "two");
248 
249     let val = assert_recv!(rx2);
250     assert_eq!(val, "three");
251 }
252 
253 #[test]
drop_rx_while_values_remain()254 fn drop_rx_while_values_remain() {
255     let (tx, mut rx1) = broadcast::channel(16);
256     let mut rx2 = tx.subscribe();
257 
258     assert_ok!(tx.send("one"));
259     assert_ok!(tx.send("two"));
260 
261     assert_recv!(rx1);
262     assert_recv!(rx2);
263 
264     drop(rx2);
265     drop(rx1);
266 }
267 
268 #[test]
lagging_rx()269 fn lagging_rx() {
270     let (tx, mut rx1) = broadcast::channel(2);
271     let mut rx2 = tx.subscribe();
272 
273     assert_ok!(tx.send("one"));
274     assert_ok!(tx.send("two"));
275 
276     assert_eq!("one", assert_recv!(rx1));
277 
278     assert_ok!(tx.send("three"));
279 
280     // Lagged too far
281     let x = dbg!(rx2.try_recv());
282     assert_lagged!(x, 1);
283 
284     // Calling again gets the next value
285     assert_eq!("two", assert_recv!(rx2));
286 
287     assert_eq!("two", assert_recv!(rx1));
288     assert_eq!("three", assert_recv!(rx1));
289 
290     assert_ok!(tx.send("four"));
291     assert_ok!(tx.send("five"));
292 
293     assert_lagged!(rx2.try_recv(), 1);
294 
295     assert_ok!(tx.send("six"));
296 
297     assert_lagged!(rx2.try_recv(), 1);
298 }
299 
300 #[test]
send_no_rx()301 fn send_no_rx() {
302     let (tx, _) = broadcast::channel(16);
303 
304     assert_err!(tx.send("hello"));
305 
306     let mut rx = tx.subscribe();
307 
308     assert_ok!(tx.send("world"));
309 
310     let val = assert_recv!(rx);
311     assert_eq!("world", val);
312 }
313 
314 #[test]
315 #[should_panic]
zero_capacity()316 fn zero_capacity() {
317     broadcast::channel::<()>(0);
318 }
319 
320 #[test]
321 #[should_panic]
capacity_too_big()322 fn capacity_too_big() {
323     use std::usize;
324 
325     broadcast::channel::<()>(1 + (usize::MAX >> 1));
326 }
327 
328 #[test]
panic_in_clone()329 fn panic_in_clone() {
330     use std::panic::{self, AssertUnwindSafe};
331 
332     #[derive(Eq, PartialEq, Debug)]
333     struct MyVal(usize);
334 
335     impl Clone for MyVal {
336         fn clone(&self) -> MyVal {
337             assert_ne!(0, self.0);
338             MyVal(self.0)
339         }
340     }
341 
342     let (tx, mut rx) = broadcast::channel(16);
343 
344     assert_ok!(tx.send(MyVal(0)));
345     assert_ok!(tx.send(MyVal(1)));
346 
347     let res = panic::catch_unwind(AssertUnwindSafe(|| {
348         let _ = rx.try_recv();
349     }));
350 
351     assert_err!(res);
352 
353     let val = assert_recv!(rx);
354     assert_eq!(val, MyVal(1));
355 }
356 
357 #[test]
dropping_tx_notifies_rx()358 fn dropping_tx_notifies_rx() {
359     let (tx, mut rx1) = broadcast::channel::<()>(16);
360     let mut rx2 = tx.subscribe();
361 
362     let tx2 = tx.clone();
363 
364     let mut recv1 = task::spawn(rx1.recv());
365     let mut recv2 = task::spawn(rx2.recv());
366 
367     assert_pending!(recv1.poll());
368     assert_pending!(recv2.poll());
369 
370     drop(tx);
371 
372     assert_pending!(recv1.poll());
373     assert_pending!(recv2.poll());
374 
375     drop(tx2);
376 
377     assert!(recv1.is_woken());
378     assert!(recv2.is_woken());
379 
380     let err = assert_ready_err!(recv1.poll());
381     assert!(is_closed(err));
382 
383     let err = assert_ready_err!(recv2.poll());
384     assert!(is_closed(err));
385 }
386 
387 #[test]
unconsumed_messages_are_dropped()388 fn unconsumed_messages_are_dropped() {
389     let (tx, rx) = broadcast::channel(16);
390 
391     let msg = Arc::new(());
392 
393     assert_ok!(tx.send(msg.clone()));
394 
395     assert_eq!(2, Arc::strong_count(&msg));
396 
397     drop(rx);
398 
399     assert_eq!(1, Arc::strong_count(&msg));
400 }
401 
402 #[test]
single_capacity_recvs()403 fn single_capacity_recvs() {
404     let (tx, mut rx) = broadcast::channel(1);
405 
406     assert_ok!(tx.send(1));
407 
408     assert_eq!(assert_recv!(rx), 1);
409     assert_empty!(rx);
410 }
411 
412 #[test]
single_capacity_recvs_after_drop_1()413 fn single_capacity_recvs_after_drop_1() {
414     let (tx, mut rx) = broadcast::channel(1);
415 
416     assert_ok!(tx.send(1));
417     drop(tx);
418 
419     assert_eq!(assert_recv!(rx), 1);
420     assert_closed!(rx.try_recv());
421 }
422 
423 #[test]
single_capacity_recvs_after_drop_2()424 fn single_capacity_recvs_after_drop_2() {
425     let (tx, mut rx) = broadcast::channel(1);
426 
427     assert_ok!(tx.send(1));
428     assert_ok!(tx.send(2));
429     drop(tx);
430 
431     assert_lagged!(rx.try_recv(), 1);
432     assert_eq!(assert_recv!(rx), 2);
433     assert_closed!(rx.try_recv());
434 }
435 
436 #[test]
dropping_sender_does_not_overwrite()437 fn dropping_sender_does_not_overwrite() {
438     let (tx, mut rx) = broadcast::channel(2);
439 
440     assert_ok!(tx.send(1));
441     assert_ok!(tx.send(2));
442     drop(tx);
443 
444     assert_eq!(assert_recv!(rx), 1);
445     assert_eq!(assert_recv!(rx), 2);
446     assert_closed!(rx.try_recv());
447 }
448 
449 #[test]
lagging_receiver_recovers_after_wrap_closed_1()450 fn lagging_receiver_recovers_after_wrap_closed_1() {
451     let (tx, mut rx) = broadcast::channel(2);
452 
453     assert_ok!(tx.send(1));
454     assert_ok!(tx.send(2));
455     assert_ok!(tx.send(3));
456     drop(tx);
457 
458     assert_lagged!(rx.try_recv(), 1);
459     assert_eq!(assert_recv!(rx), 2);
460     assert_eq!(assert_recv!(rx), 3);
461     assert_closed!(rx.try_recv());
462 }
463 
464 #[test]
lagging_receiver_recovers_after_wrap_closed_2()465 fn lagging_receiver_recovers_after_wrap_closed_2() {
466     let (tx, mut rx) = broadcast::channel(2);
467 
468     assert_ok!(tx.send(1));
469     assert_ok!(tx.send(2));
470     assert_ok!(tx.send(3));
471     assert_ok!(tx.send(4));
472     drop(tx);
473 
474     assert_lagged!(rx.try_recv(), 2);
475     assert_eq!(assert_recv!(rx), 3);
476     assert_eq!(assert_recv!(rx), 4);
477     assert_closed!(rx.try_recv());
478 }
479 
480 #[test]
lagging_receiver_recovers_after_wrap_open()481 fn lagging_receiver_recovers_after_wrap_open() {
482     let (tx, mut rx) = broadcast::channel(2);
483 
484     assert_ok!(tx.send(1));
485     assert_ok!(tx.send(2));
486     assert_ok!(tx.send(3));
487 
488     assert_lagged!(rx.try_recv(), 1);
489     assert_eq!(assert_recv!(rx), 2);
490     assert_eq!(assert_recv!(rx), 3);
491     assert_empty!(rx);
492 }
493 
494 #[tokio::test]
send_recv_stream_ready_deprecated()495 async fn send_recv_stream_ready_deprecated() {
496     use tokio::stream::StreamExt;
497 
498     let (tx, mut rx) = broadcast::channel::<i32>(8);
499 
500     assert_ok!(tx.send(1));
501     assert_ok!(tx.send(2));
502 
503     assert_eq!(Some(Ok(1)), rx.next().await);
504     assert_eq!(Some(Ok(2)), rx.next().await);
505 
506     drop(tx);
507 
508     assert_eq!(None, rx.next().await);
509 }
510 
511 #[tokio::test]
send_recv_stream_pending_deprecated()512 async fn send_recv_stream_pending_deprecated() {
513     use tokio::stream::StreamExt;
514 
515     let (tx, mut rx) = broadcast::channel::<i32>(8);
516 
517     let mut recv = task::spawn(rx.next());
518     assert_pending!(recv.poll());
519 
520     assert_ok!(tx.send(1));
521 
522     assert!(recv.is_woken());
523     let val = assert_ready!(recv.poll());
524     assert_eq!(val, Some(Ok(1)));
525 }
526 
is_closed(err: broadcast::RecvError) -> bool527 fn is_closed(err: broadcast::RecvError) -> bool {
528     match err {
529         broadcast::RecvError::Closed => true,
530         _ => false,
531     }
532 }
533