1 use futures::channel::mpsc;
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future::{self, FutureExt};
4 use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt};
5 use futures::task::Poll;
6 use futures_test::task::noop_context;
7 
8 #[test]
is_terminated()9 fn is_terminated() {
10     let mut cx = noop_context();
11     let mut tasks = SelectAll::new();
12 
13     assert_eq!(tasks.is_terminated(), false);
14     assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
15     assert_eq!(tasks.is_terminated(), true);
16 
17     // Test that the sentinel value doesn't leak
18     assert_eq!(tasks.is_empty(), true);
19     assert_eq!(tasks.len(), 0);
20 
21     tasks.push(future::ready(1).into_stream());
22 
23     assert_eq!(tasks.is_empty(), false);
24     assert_eq!(tasks.len(), 1);
25 
26     assert_eq!(tasks.is_terminated(), false);
27     assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
28     assert_eq!(tasks.is_terminated(), false);
29     assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
30     assert_eq!(tasks.is_terminated(), true);
31 }
32 
33 #[test]
issue_1626()34 fn issue_1626() {
35     let a = stream::iter(0..=2);
36     let b = stream::iter(10..=14);
37 
38     let mut s = block_on_stream(stream::select_all(vec![a, b]));
39 
40     assert_eq!(s.next(), Some(0));
41     assert_eq!(s.next(), Some(10));
42     assert_eq!(s.next(), Some(1));
43     assert_eq!(s.next(), Some(11));
44     assert_eq!(s.next(), Some(2));
45     assert_eq!(s.next(), Some(12));
46     assert_eq!(s.next(), Some(13));
47     assert_eq!(s.next(), Some(14));
48     assert_eq!(s.next(), None);
49 }
50 
51 #[test]
works_1()52 fn works_1() {
53     let (a_tx, a_rx) = mpsc::unbounded::<u32>();
54     let (b_tx, b_rx) = mpsc::unbounded::<u32>();
55     let (c_tx, c_rx) = mpsc::unbounded::<u32>();
56 
57     let streams = vec![a_rx, b_rx, c_rx];
58 
59     let mut stream = block_on_stream(select_all(streams));
60 
61     b_tx.unbounded_send(99).unwrap();
62     a_tx.unbounded_send(33).unwrap();
63     assert_eq!(Some(33), stream.next());
64     assert_eq!(Some(99), stream.next());
65 
66     b_tx.unbounded_send(99).unwrap();
67     a_tx.unbounded_send(33).unwrap();
68     assert_eq!(Some(33), stream.next());
69     assert_eq!(Some(99), stream.next());
70 
71     c_tx.unbounded_send(42).unwrap();
72     assert_eq!(Some(42), stream.next());
73     a_tx.unbounded_send(43).unwrap();
74     assert_eq!(Some(43), stream.next());
75 
76     drop((a_tx, b_tx, c_tx));
77     assert_eq!(None, stream.next());
78 }
79 
80 #[test]
clear()81 fn clear() {
82     let mut tasks =
83         select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]);
84 
85     assert_eq!(block_on(tasks.next()), Some(1));
86     assert!(!tasks.is_empty());
87 
88     tasks.clear();
89     assert!(tasks.is_empty());
90 
91     tasks.push(stream::iter(vec![3].into_iter()));
92     assert!(!tasks.is_empty());
93 
94     tasks.clear();
95     assert!(tasks.is_empty());
96 
97     assert_eq!(block_on(tasks.next()), None);
98     assert!(tasks.is_terminated());
99     tasks.clear();
100     assert!(!tasks.is_terminated());
101 }
102 
103 #[test]
iter_mut()104 fn iter_mut() {
105     let mut stream =
106         vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
107             .into_iter()
108             .collect::<SelectAll<_>>();
109 
110     let mut iter = stream.iter_mut();
111     assert_eq!(iter.len(), 3);
112     assert!(iter.next().is_some());
113     assert_eq!(iter.len(), 2);
114     assert!(iter.next().is_some());
115     assert_eq!(iter.len(), 1);
116     assert!(iter.next().is_some());
117     assert_eq!(iter.len(), 0);
118     assert!(iter.next().is_none());
119 
120     let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
121         .into_iter()
122         .collect::<SelectAll<_>>();
123 
124     assert_eq!(stream.len(), 3);
125     assert_eq!(block_on(stream.next()), Some(1));
126     assert_eq!(stream.len(), 2);
127     let mut iter = stream.iter_mut();
128     assert_eq!(iter.len(), 2);
129     assert!(iter.next().is_some());
130     assert_eq!(iter.len(), 1);
131     assert!(iter.next().is_some());
132     assert_eq!(iter.len(), 0);
133     assert!(iter.next().is_none());
134 
135     assert_eq!(block_on(stream.next()), Some(2));
136     assert_eq!(stream.len(), 2);
137     assert_eq!(block_on(stream.next()), None);
138     let mut iter = stream.iter_mut();
139     assert_eq!(iter.len(), 0);
140     assert!(iter.next().is_none());
141 }
142 
143 #[test]
iter()144 fn iter() {
145     let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
146         .into_iter()
147         .collect::<SelectAll<_>>();
148 
149     let mut iter = stream.iter();
150     assert_eq!(iter.len(), 3);
151     assert!(iter.next().is_some());
152     assert_eq!(iter.len(), 2);
153     assert!(iter.next().is_some());
154     assert_eq!(iter.len(), 1);
155     assert!(iter.next().is_some());
156     assert_eq!(iter.len(), 0);
157     assert!(iter.next().is_none());
158 
159     let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
160         .into_iter()
161         .collect::<SelectAll<_>>();
162 
163     assert_eq!(stream.len(), 3);
164     assert_eq!(block_on(stream.next()), Some(1));
165     assert_eq!(stream.len(), 2);
166     let mut iter = stream.iter();
167     assert_eq!(iter.len(), 2);
168     assert!(iter.next().is_some());
169     assert_eq!(iter.len(), 1);
170     assert!(iter.next().is_some());
171     assert_eq!(iter.len(), 0);
172     assert!(iter.next().is_none());
173 
174     assert_eq!(block_on(stream.next()), Some(2));
175     assert_eq!(stream.len(), 2);
176     assert_eq!(block_on(stream.next()), None);
177     let mut iter = stream.iter();
178     assert_eq!(iter.len(), 0);
179     assert!(iter.next().is_none());
180 }
181 
182 #[test]
into_iter()183 fn into_iter() {
184     let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
185         .into_iter()
186         .collect::<SelectAll<_>>();
187 
188     let mut iter = stream.into_iter();
189     assert_eq!(iter.len(), 3);
190     assert!(iter.next().is_some());
191     assert_eq!(iter.len(), 2);
192     assert!(iter.next().is_some());
193     assert_eq!(iter.len(), 1);
194     assert!(iter.next().is_some());
195     assert_eq!(iter.len(), 0);
196     assert!(iter.next().is_none());
197 }
198