1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4
5 use tokio::sync::broadcast;
6 use tokio_test::task;
7 use tokio_test::{
8 assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
9 };
10
11 use std::sync::Arc;
12
13 macro_rules! assert_recv {
14 ($e:expr) => {
15 match $e.try_recv() {
16 Ok(value) => value,
17 Err(e) => panic!("expected recv; got = {:?}", e),
18 }
19 };
20 }
21
22 macro_rules! assert_empty {
23 ($e:expr) => {
24 match $e.try_recv() {
25 Ok(value) => panic!("expected empty; got = {:?}", value),
26 Err(broadcast::TryRecvError::Empty) => {}
27 Err(e) => panic!("expected empty; got = {:?}", e),
28 }
29 };
30 }
31
32 macro_rules! assert_lagged {
33 ($e:expr, $n:expr) => {
34 match assert_err!($e) {
35 broadcast::TryRecvError::Lagged(n) => {
36 assert_eq!(n, $n);
37 }
38 _ => panic!("did not lag"),
39 }
40 };
41 }
42
43 macro_rules! assert_closed {
44 ($e:expr) => {
45 match assert_err!($e) {
46 broadcast::TryRecvError::Closed => {}
47 _ => panic!("did not lag"),
48 }
49 };
50 }
51
52 trait AssertSend: Send + Sync {}
53 impl AssertSend for broadcast::Sender<i32> {}
54 impl AssertSend for broadcast::Receiver<i32> {}
55
56 #[test]
send_try_recv_bounded()57 fn send_try_recv_bounded() {
58 let (tx, mut rx) = broadcast::channel(16);
59
60 assert_empty!(rx);
61
62 let n = assert_ok!(tx.send("hello"));
63 assert_eq!(n, 1);
64
65 let val = assert_recv!(rx);
66 assert_eq!(val, "hello");
67
68 assert_empty!(rx);
69 }
70
71 #[test]
send_two_recv()72 fn send_two_recv() {
73 let (tx, mut rx1) = broadcast::channel(16);
74 let mut rx2 = tx.subscribe();
75
76 assert_empty!(rx1);
77 assert_empty!(rx2);
78
79 let n = assert_ok!(tx.send("hello"));
80 assert_eq!(n, 2);
81
82 let val = assert_recv!(rx1);
83 assert_eq!(val, "hello");
84
85 let val = assert_recv!(rx2);
86 assert_eq!(val, "hello");
87
88 assert_empty!(rx1);
89 assert_empty!(rx2);
90 }
91
92 #[tokio::test]
send_recv_into_stream_ready()93 async fn send_recv_into_stream_ready() {
94 use tokio::stream::StreamExt;
95
96 let (tx, rx) = broadcast::channel::<i32>(8);
97 tokio::pin! {
98 let rx = rx.into_stream();
99 }
100
101 assert_ok!(tx.send(1));
102 assert_ok!(tx.send(2));
103
104 assert_eq!(Some(Ok(1)), rx.next().await);
105 assert_eq!(Some(Ok(2)), rx.next().await);
106
107 drop(tx);
108
109 assert_eq!(None, rx.next().await);
110 }
111
112 #[tokio::test]
send_recv_into_stream_pending()113 async fn send_recv_into_stream_pending() {
114 use tokio::stream::StreamExt;
115
116 let (tx, rx) = broadcast::channel::<i32>(8);
117
118 tokio::pin! {
119 let rx = rx.into_stream();
120 }
121
122 let mut recv = task::spawn(rx.next());
123 assert_pending!(recv.poll());
124
125 assert_ok!(tx.send(1));
126
127 assert!(recv.is_woken());
128 let val = assert_ready!(recv.poll());
129 assert_eq!(val, Some(Ok(1)));
130 }
131
132 #[test]
send_recv_bounded()133 fn send_recv_bounded() {
134 let (tx, mut rx) = broadcast::channel(16);
135
136 let mut recv = task::spawn(rx.recv());
137
138 assert_pending!(recv.poll());
139
140 assert_ok!(tx.send("hello"));
141
142 assert!(recv.is_woken());
143 let val = assert_ready_ok!(recv.poll());
144 assert_eq!(val, "hello");
145 }
146
147 #[test]
send_two_recv_bounded()148 fn send_two_recv_bounded() {
149 let (tx, mut rx1) = broadcast::channel(16);
150 let mut rx2 = tx.subscribe();
151
152 let mut recv1 = task::spawn(rx1.recv());
153 let mut recv2 = task::spawn(rx2.recv());
154
155 assert_pending!(recv1.poll());
156 assert_pending!(recv2.poll());
157
158 assert_ok!(tx.send("hello"));
159
160 assert!(recv1.is_woken());
161 assert!(recv2.is_woken());
162
163 let val1 = assert_ready_ok!(recv1.poll());
164 let val2 = assert_ready_ok!(recv2.poll());
165 assert_eq!(val1, "hello");
166 assert_eq!(val2, "hello");
167
168 drop((recv1, recv2));
169
170 let mut recv1 = task::spawn(rx1.recv());
171 let mut recv2 = task::spawn(rx2.recv());
172
173 assert_pending!(recv1.poll());
174
175 assert_ok!(tx.send("world"));
176
177 assert!(recv1.is_woken());
178 assert!(!recv2.is_woken());
179
180 let val1 = assert_ready_ok!(recv1.poll());
181 let val2 = assert_ready_ok!(recv2.poll());
182 assert_eq!(val1, "world");
183 assert_eq!(val2, "world");
184 }
185
186 #[test]
change_tasks()187 fn change_tasks() {
188 let (tx, mut rx) = broadcast::channel(1);
189
190 let mut recv = Box::pin(rx.recv());
191
192 let mut task1 = task::spawn(&mut recv);
193 assert_pending!(task1.poll());
194
195 let mut task2 = task::spawn(&mut recv);
196 assert_pending!(task2.poll());
197
198 tx.send("hello").unwrap();
199
200 assert!(task2.is_woken());
201 }
202
203 #[test]
send_slow_rx()204 fn send_slow_rx() {
205 let (tx, mut rx1) = broadcast::channel(16);
206 let mut rx2 = tx.subscribe();
207
208 {
209 let mut recv2 = task::spawn(rx2.recv());
210
211 {
212 let mut recv1 = task::spawn(rx1.recv());
213
214 assert_pending!(recv1.poll());
215 assert_pending!(recv2.poll());
216
217 assert_ok!(tx.send("one"));
218
219 assert!(recv1.is_woken());
220 assert!(recv2.is_woken());
221
222 assert_ok!(tx.send("two"));
223
224 let val = assert_ready_ok!(recv1.poll());
225 assert_eq!(val, "one");
226 }
227
228 let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
229 assert_eq!(val, "two");
230
231 let mut recv1 = task::spawn(rx1.recv());
232
233 assert_pending!(recv1.poll());
234
235 assert_ok!(tx.send("three"));
236
237 assert!(recv1.is_woken());
238
239 let val = assert_ready_ok!(recv1.poll());
240 assert_eq!(val, "three");
241
242 let val = assert_ready_ok!(recv2.poll());
243 assert_eq!(val, "one");
244 }
245
246 let val = assert_recv!(rx2);
247 assert_eq!(val, "two");
248
249 let val = assert_recv!(rx2);
250 assert_eq!(val, "three");
251 }
252
253 #[test]
drop_rx_while_values_remain()254 fn drop_rx_while_values_remain() {
255 let (tx, mut rx1) = broadcast::channel(16);
256 let mut rx2 = tx.subscribe();
257
258 assert_ok!(tx.send("one"));
259 assert_ok!(tx.send("two"));
260
261 assert_recv!(rx1);
262 assert_recv!(rx2);
263
264 drop(rx2);
265 drop(rx1);
266 }
267
268 #[test]
lagging_rx()269 fn lagging_rx() {
270 let (tx, mut rx1) = broadcast::channel(2);
271 let mut rx2 = tx.subscribe();
272
273 assert_ok!(tx.send("one"));
274 assert_ok!(tx.send("two"));
275
276 assert_eq!("one", assert_recv!(rx1));
277
278 assert_ok!(tx.send("three"));
279
280 // Lagged too far
281 let x = dbg!(rx2.try_recv());
282 assert_lagged!(x, 1);
283
284 // Calling again gets the next value
285 assert_eq!("two", assert_recv!(rx2));
286
287 assert_eq!("two", assert_recv!(rx1));
288 assert_eq!("three", assert_recv!(rx1));
289
290 assert_ok!(tx.send("four"));
291 assert_ok!(tx.send("five"));
292
293 assert_lagged!(rx2.try_recv(), 1);
294
295 assert_ok!(tx.send("six"));
296
297 assert_lagged!(rx2.try_recv(), 1);
298 }
299
300 #[test]
send_no_rx()301 fn send_no_rx() {
302 let (tx, _) = broadcast::channel(16);
303
304 assert_err!(tx.send("hello"));
305
306 let mut rx = tx.subscribe();
307
308 assert_ok!(tx.send("world"));
309
310 let val = assert_recv!(rx);
311 assert_eq!("world", val);
312 }
313
314 #[test]
315 #[should_panic]
zero_capacity()316 fn zero_capacity() {
317 broadcast::channel::<()>(0);
318 }
319
320 #[test]
321 #[should_panic]
capacity_too_big()322 fn capacity_too_big() {
323 use std::usize;
324
325 broadcast::channel::<()>(1 + (usize::MAX >> 1));
326 }
327
328 #[test]
panic_in_clone()329 fn panic_in_clone() {
330 use std::panic::{self, AssertUnwindSafe};
331
332 #[derive(Eq, PartialEq, Debug)]
333 struct MyVal(usize);
334
335 impl Clone for MyVal {
336 fn clone(&self) -> MyVal {
337 assert_ne!(0, self.0);
338 MyVal(self.0)
339 }
340 }
341
342 let (tx, mut rx) = broadcast::channel(16);
343
344 assert_ok!(tx.send(MyVal(0)));
345 assert_ok!(tx.send(MyVal(1)));
346
347 let res = panic::catch_unwind(AssertUnwindSafe(|| {
348 let _ = rx.try_recv();
349 }));
350
351 assert_err!(res);
352
353 let val = assert_recv!(rx);
354 assert_eq!(val, MyVal(1));
355 }
356
357 #[test]
dropping_tx_notifies_rx()358 fn dropping_tx_notifies_rx() {
359 let (tx, mut rx1) = broadcast::channel::<()>(16);
360 let mut rx2 = tx.subscribe();
361
362 let tx2 = tx.clone();
363
364 let mut recv1 = task::spawn(rx1.recv());
365 let mut recv2 = task::spawn(rx2.recv());
366
367 assert_pending!(recv1.poll());
368 assert_pending!(recv2.poll());
369
370 drop(tx);
371
372 assert_pending!(recv1.poll());
373 assert_pending!(recv2.poll());
374
375 drop(tx2);
376
377 assert!(recv1.is_woken());
378 assert!(recv2.is_woken());
379
380 let err = assert_ready_err!(recv1.poll());
381 assert!(is_closed(err));
382
383 let err = assert_ready_err!(recv2.poll());
384 assert!(is_closed(err));
385 }
386
387 #[test]
unconsumed_messages_are_dropped()388 fn unconsumed_messages_are_dropped() {
389 let (tx, rx) = broadcast::channel(16);
390
391 let msg = Arc::new(());
392
393 assert_ok!(tx.send(msg.clone()));
394
395 assert_eq!(2, Arc::strong_count(&msg));
396
397 drop(rx);
398
399 assert_eq!(1, Arc::strong_count(&msg));
400 }
401
402 #[test]
single_capacity_recvs()403 fn single_capacity_recvs() {
404 let (tx, mut rx) = broadcast::channel(1);
405
406 assert_ok!(tx.send(1));
407
408 assert_eq!(assert_recv!(rx), 1);
409 assert_empty!(rx);
410 }
411
412 #[test]
single_capacity_recvs_after_drop_1()413 fn single_capacity_recvs_after_drop_1() {
414 let (tx, mut rx) = broadcast::channel(1);
415
416 assert_ok!(tx.send(1));
417 drop(tx);
418
419 assert_eq!(assert_recv!(rx), 1);
420 assert_closed!(rx.try_recv());
421 }
422
423 #[test]
single_capacity_recvs_after_drop_2()424 fn single_capacity_recvs_after_drop_2() {
425 let (tx, mut rx) = broadcast::channel(1);
426
427 assert_ok!(tx.send(1));
428 assert_ok!(tx.send(2));
429 drop(tx);
430
431 assert_lagged!(rx.try_recv(), 1);
432 assert_eq!(assert_recv!(rx), 2);
433 assert_closed!(rx.try_recv());
434 }
435
436 #[test]
dropping_sender_does_not_overwrite()437 fn dropping_sender_does_not_overwrite() {
438 let (tx, mut rx) = broadcast::channel(2);
439
440 assert_ok!(tx.send(1));
441 assert_ok!(tx.send(2));
442 drop(tx);
443
444 assert_eq!(assert_recv!(rx), 1);
445 assert_eq!(assert_recv!(rx), 2);
446 assert_closed!(rx.try_recv());
447 }
448
449 #[test]
lagging_receiver_recovers_after_wrap_closed_1()450 fn lagging_receiver_recovers_after_wrap_closed_1() {
451 let (tx, mut rx) = broadcast::channel(2);
452
453 assert_ok!(tx.send(1));
454 assert_ok!(tx.send(2));
455 assert_ok!(tx.send(3));
456 drop(tx);
457
458 assert_lagged!(rx.try_recv(), 1);
459 assert_eq!(assert_recv!(rx), 2);
460 assert_eq!(assert_recv!(rx), 3);
461 assert_closed!(rx.try_recv());
462 }
463
464 #[test]
lagging_receiver_recovers_after_wrap_closed_2()465 fn lagging_receiver_recovers_after_wrap_closed_2() {
466 let (tx, mut rx) = broadcast::channel(2);
467
468 assert_ok!(tx.send(1));
469 assert_ok!(tx.send(2));
470 assert_ok!(tx.send(3));
471 assert_ok!(tx.send(4));
472 drop(tx);
473
474 assert_lagged!(rx.try_recv(), 2);
475 assert_eq!(assert_recv!(rx), 3);
476 assert_eq!(assert_recv!(rx), 4);
477 assert_closed!(rx.try_recv());
478 }
479
480 #[test]
lagging_receiver_recovers_after_wrap_open()481 fn lagging_receiver_recovers_after_wrap_open() {
482 let (tx, mut rx) = broadcast::channel(2);
483
484 assert_ok!(tx.send(1));
485 assert_ok!(tx.send(2));
486 assert_ok!(tx.send(3));
487
488 assert_lagged!(rx.try_recv(), 1);
489 assert_eq!(assert_recv!(rx), 2);
490 assert_eq!(assert_recv!(rx), 3);
491 assert_empty!(rx);
492 }
493
494 #[tokio::test]
send_recv_stream_ready_deprecated()495 async fn send_recv_stream_ready_deprecated() {
496 use tokio::stream::StreamExt;
497
498 let (tx, mut rx) = broadcast::channel::<i32>(8);
499
500 assert_ok!(tx.send(1));
501 assert_ok!(tx.send(2));
502
503 assert_eq!(Some(Ok(1)), rx.next().await);
504 assert_eq!(Some(Ok(2)), rx.next().await);
505
506 drop(tx);
507
508 assert_eq!(None, rx.next().await);
509 }
510
511 #[tokio::test]
send_recv_stream_pending_deprecated()512 async fn send_recv_stream_pending_deprecated() {
513 use tokio::stream::StreamExt;
514
515 let (tx, mut rx) = broadcast::channel::<i32>(8);
516
517 let mut recv = task::spawn(rx.next());
518 assert_pending!(recv.poll());
519
520 assert_ok!(tx.send(1));
521
522 assert!(recv.is_woken());
523 let val = assert_ready!(recv.poll());
524 assert_eq!(val, Some(Ok(1)));
525 }
526
is_closed(err: broadcast::RecvError) -> bool527 fn is_closed(err: broadcast::RecvError) -> bool {
528 match err {
529 broadcast::RecvError::Closed => true,
530 _ => false,
531 }
532 }
533