1 use tokio::stream::{self, Stream, StreamExt};
2 use tokio::sync::mpsc;
3 use tokio_test::task;
4 use tokio_test::{assert_pending, assert_ready};
5
6 #[tokio::test]
merge_sync_streams()7 async fn merge_sync_streams() {
8 let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5]));
9
10 for i in 0..7 {
11 let rem = 7 - i;
12 assert_eq!(s.size_hint(), (rem, Some(rem)));
13 assert_eq!(Some(i), s.next().await);
14 }
15
16 assert!(s.next().await.is_none());
17 }
18
19 #[tokio::test]
merge_async_streams()20 async fn merge_async_streams() {
21 let (tx1, rx1) = mpsc::unbounded_channel();
22 let (tx2, rx2) = mpsc::unbounded_channel();
23
24 let mut rx = task::spawn(rx1.merge(rx2));
25
26 assert_eq!(rx.size_hint(), (0, None));
27
28 assert_pending!(rx.poll_next());
29
30 tx1.send(1).unwrap();
31
32 assert!(rx.is_woken());
33 assert_eq!(Some(1), assert_ready!(rx.poll_next()));
34
35 assert_pending!(rx.poll_next());
36 tx2.send(2).unwrap();
37
38 assert!(rx.is_woken());
39 assert_eq!(Some(2), assert_ready!(rx.poll_next()));
40 assert_pending!(rx.poll_next());
41
42 drop(tx1);
43 assert!(rx.is_woken());
44 assert_pending!(rx.poll_next());
45
46 tx2.send(3).unwrap();
47 assert!(rx.is_woken());
48 assert_eq!(Some(3), assert_ready!(rx.poll_next()));
49 assert_pending!(rx.poll_next());
50
51 drop(tx2);
52 assert!(rx.is_woken());
53 assert_eq!(None, assert_ready!(rx.poll_next()));
54 }
55
/// Merging two streams that each report `usize::max_value()` items: the test
/// expects the lower bound to stay at the maximum and the upper bound to be `None`.
#[test]
fn size_overflow() {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Stream that is never meant to be polled; only its size hint is used.
    struct Saturated;

    impl Stream for Saturated {
        type Item = ();

        fn size_hint(&self) -> (usize, Option<usize>) {
            let max = usize::max_value();
            (max, Some(max))
        }

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<()>> {
            // Polling is outside what this test exercises.
            panic!()
        }
    }

    let merged = Saturated.merge(Saturated);
    assert_eq!(merged.size_hint(), (usize::max_value(), None));
}
79