1 #[macro_use]
2 extern crate futures;
3 
4 use futures::prelude::*;
5 use futures::executor;
6 use futures::future::{err, ok};
7 use futures::stream::{empty, iter_ok, poll_fn, Peekable};
8 use futures::sync::oneshot;
9 use futures::sync::mpsc;
10 
11 mod support;
12 use support::*;
13 
/// Test-local stream wrapper around an iterator.
///
/// In this file it is built by `iter` and polled through its `Stream` impl.
pub struct Iter<I> {
    /// The wrapped iterator; each poll of the stream advances it by one element.
    iter: I,
}
17 
iter<J, T, E>(i: J) -> Iter<J::IntoIter> where J: IntoIterator<Item=Result<T, E>>,18 pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
19     where J: IntoIterator<Item=Result<T, E>>,
20 {
21     Iter {
22         iter: i.into_iter(),
23     }
24 }
25 
26 impl<I, T, E> Stream for Iter<I>
27     where I: Iterator<Item=Result<T, E>>,
28 {
29     type Item = T;
30     type Error = E;
31 
poll(&mut self) -> Poll<Option<T>, E>32     fn poll(&mut self) -> Poll<Option<T>, E> {
33         match self.iter.next() {
34             Some(Ok(e)) => Ok(Async::Ready(Some(e))),
35             Some(Err(e)) => Err(e),
36             None => Ok(Async::Ready(None)),
37         }
38     }
39 }
40 
list() -> Box<Stream<Item=i32, Error=u32> + Send>41 fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
42     let (tx, rx) = mpsc::channel(1);
43     tx.send(Ok(1))
44       .and_then(|tx| tx.send(Ok(2)))
45       .and_then(|tx| tx.send(Ok(3)))
46       .forget();
47     Box::new(rx.then(|r| r.unwrap()))
48 }
49 
err_list() -> Box<Stream<Item=i32, Error=u32> + Send>50 fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> {
51     let (tx, rx) = mpsc::channel(1);
52     tx.send(Ok(1))
53       .and_then(|tx| tx.send(Ok(2)))
54       .and_then(|tx| tx.send(Err(3)))
55       .forget();
56     Box::new(rx.then(|r| r.unwrap()))
57 }
58 
/// `map` transforms every item: each element of `list()` is incremented.
#[test]
fn map() {
    let expected = Ok(vec![2, 3, 4]);
    assert_done(|| list().map(|item| item + 1).collect(), expected);
}
63 
/// `map_err` transforms the error: the `3` from `err_list()` surfaces as `4`.
#[test]
fn map_err() {
    let expected = Err(4);
    assert_done(|| err_list().map_err(|error| error + 1).collect(), expected);
}
68 
/// Newtype around `u32` used as the target error type in the `from_err` test.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct FromErrTest(u32);
71 
72 impl From<u32> for FromErrTest {
from(i: u32) -> FromErrTest73     fn from(i: u32) -> FromErrTest {
74         FromErrTest(i)
75     }
76 }
77 
/// `from_err` converts the stream's `u32` error through `From`, so the `3`
/// from `err_list()` arrives wrapped in `FromErrTest`.
#[test]
fn from_err() {
    let expected = Err(FromErrTest(3));
    assert_done(|| err_list().from_err().collect(), expected);
}
82 
/// `fold` sums a successful stream and propagates the error of a failing one.
#[test]
fn fold() {
    let total = Ok(6);
    assert_done(|| list().fold(0, |acc, item| ok::<i32, u32>(acc + item)), total);

    let failure = Err(3);
    assert_done(|| err_list().fold(0, |acc, item| ok::<i32, u32>(acc + item)), failure);
}
88 
/// `filter` keeps only the items accepted by the predicate (here: even values).
#[test]
fn filter() {
    let expected = Ok(vec![2]);
    assert_done(|| list().filter(|&item| item % 2 == 0).collect(), expected);
}
93 
/// `filter_map` drops items mapped to `None` and yields the payload of `Some`:
/// only the even item survives, shifted by 10.
#[test]
fn filter_map() {
    let expected = Ok(vec![12]);
    assert_done(
        || {
            list()
                .filter_map(|x| if x % 2 == 0 { Some(x + 10) } else { None })
                .collect()
        },
        expected,
    );
}
104 
/// `and_then` chains a computation onto every item; a failure from that
/// computation becomes the stream's error (the first item, 1, fails here).
#[test]
fn and_then() {
    let incremented = Ok(vec![2, 3, 4]);
    assert_done(|| list().and_then(|item| Ok(item + 1)).collect(), incremented);

    let first_failure = Err(1);
    assert_done(
        || list().and_then(|item| err::<i32, u32>(item as u32)).collect(),
        first_failure,
    );
}
111 
/// `then` hands the closure each `Result`; mapping the `Ok` side increments
/// every item.
#[test]
fn then() {
    let expected = Ok(vec![2, 3, 4]);
    assert_done(
        || list().then(|outcome| outcome.map(|item| item + 1)).collect(),
        expected,
    );
}
117 
/// `or_else` recovers from an error: the `Err(3)` of `err_list()` is turned
/// back into the item 3.
#[test]
fn or_else() {
    let recovered = Ok(vec![1, 2, 3]);
    assert_done(
        || err_list().or_else(|error| ok::<i32, u32>(error as i32)).collect(),
        recovered,
    );
}
124 
/// `flatten` concatenates a stream of streams: three copies of `list()` in a row.
#[test]
fn flatten() {
    let expected = Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]);
    assert_done(|| list().map(|_| list()).flatten().collect(), expected);
}
131 
/// `skip(2)` drops the first two items.
#[test]
fn skip() {
    let expected = Ok(vec![3]);
    assert_done(|| list().skip(2).collect(), expected);
}
136 
/// Errors do not count toward the skipped items: with `skip(1)` both leading
/// errors are still yielded and only the first `Ok` value (3) is dropped.
#[test]
fn skip_passes_errors_through() {
    let input = vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)];
    // Collecting drives the blocking iterator until its first `None`.
    let yielded: Vec<_> = iter(input).skip(1).wait().collect();
    assert_eq!(yielded, vec![Err(1), Err(2), Ok(4), Ok(5)]);
}
148 
/// `skip_while` drops items while the predicate holds (the leading odd value),
/// then yields everything that follows.
#[test]
fn skip_while() {
    let expected = Ok(vec![2, 3]);
    assert_done(
        || list().skip_while(|item| Ok(*item % 2 == 1)).collect(),
        expected,
    );
}
/// `take(2)` yields only the first two items.
#[test]
fn take() {
    let expected = Ok(vec![1, 2]);
    assert_done(|| list().take(2).collect(), expected);
}
158 
/// `take_while` yields items until the predicate first fails (at 3).
#[test]
fn take_while() {
    let expected = Ok(vec![1, 2]);
    assert_done(
        || list().take_while(|item| Ok(*item < 3)).collect(),
        expected,
    );
}
164 
/// Errors ahead of the first item do not use up the `take(1)` budget, and once
/// the single item has been yielded the stream ends, so later elements
/// (including errors) are never seen.
#[test]
fn take_passes_errors_through() {
    let input = vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)];
    // Collecting drives the blocking iterator until its first `None`.
    let yielded: Vec<_> = iter(input).take(1).wait().collect();
    assert_eq!(yielded, vec![Err(1), Err(2), Ok(3)]);

    let yielded: Vec<_> = iter(vec![Ok(1), Err(2)]).take(1).wait().collect();
    assert_eq!(yielded, vec![Ok(1)]);
}
179 
/// Wrapping a stream in `peekable` does not change what it yields.
#[test]
fn peekable() {
    let expected = Ok(vec![1, 2, 3]);
    assert_done(|| list().peekable().collect(), expected);
}
184 
/// A fused stream yields its items and then keeps reporting the end on every
/// further call.
#[test]
fn fuse() {
    let mut stream = list().fuse().wait();
    for item in 1..4 {
        assert_eq!(stream.next(), Some(Ok(item)));
    }
    // Poll three more times past the end; each must still be `None`.
    for _ in 0..3 {
        assert_eq!(stream.next(), None);
    }
}
195 
/// `buffered` yields results in the order the futures arrived on the stream,
/// not the order they complete: the second oneshot is resolved first, yet the
/// first one's value (5) is yielded before it (3). The same scenario is run
/// with a buffer of 2 and a buffer of 1.
#[test]
fn buffered() {
    for &capacity in [2usize, 1].iter() {
        let (tx, rx) = mpsc::channel(1);
        let (first_tx, first_rx) = oneshot::channel::<u32>();
        let (second_tx, second_rx) = oneshot::channel::<u32>();

        tx.send(Box::new(first_rx.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
            .and_then(|tx| tx.send(Box::new(second_rx.map_err(|_| ()))))
            .forget();

        let mut rx = rx.buffered(capacity);
        // NOTE(review): `sassert_empty` lives in `support`; presumably it
        // asserts that the stream has nothing ready yet - confirm there.
        sassert_empty(&mut rx);
        second_tx.send(3).unwrap();
        // Completing the second future alone must not produce any output.
        sassert_empty(&mut rx);
        first_tx.send(5).unwrap();

        let mut rx = rx.wait();
        assert_eq!(rx.next(), Some(Ok(5)));
        assert_eq!(rx.next(), Some(Ok(3)));
        assert_eq!(rx.next(), None);
    }
}
234 
/// `buffer_unordered` yields results as the futures complete. With room for
/// two futures the later one may finish first; with room for one, the second
/// future is not reached until the first completes.
#[test]
fn unordered() {
    let (tx, rx) = mpsc::channel(1);
    let (first_tx, first_rx) = oneshot::channel::<u32>();
    let (second_tx, second_rx) = oneshot::channel::<u32>();

    tx.send(Box::new(first_rx.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
        .and_then(|tx| tx.send(Box::new(second_rx.map_err(|_| ()))))
        .forget();

    let mut wide = rx.buffer_unordered(2);
    sassert_empty(&mut wide);
    let mut wide = wide.wait();
    // The second future completes first and is yielded first.
    second_tx.send(3).unwrap();
    assert_eq!(wide.next(), Some(Ok(3)));
    first_tx.send(5).unwrap();
    assert_eq!(wide.next(), Some(Ok(5)));
    assert_eq!(wide.next(), None);

    let (tx, rx) = mpsc::channel(1);
    let (first_tx, first_rx) = oneshot::channel::<u32>();
    let (second_tx, second_rx) = oneshot::channel::<u32>();

    tx.send(Box::new(first_rx.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
        .and_then(|tx| tx.send(Box::new(second_rx.map_err(|_| ()))))
        .forget();

    // We don't even get to see the second future until the first completes.
    let mut narrow = rx.buffer_unordered(1);
    sassert_empty(&mut narrow);
    second_tx.send(3).unwrap();
    sassert_empty(&mut narrow);
    first_tx.send(5).unwrap();
    let mut narrow = narrow.wait();
    assert_eq!(narrow.next(), Some(Ok(5)));
    assert_eq!(narrow.next(), Some(Ok(3)));
    assert_eq!(narrow.next(), None);
}
273 
/// `zip` pairs items positionally, stops with the shorter side, and propagates
/// an error from either stream.
#[test]
fn zip() {
    let full = Ok(vec![(1, 1), (2, 2), (3, 3)]);
    assert_done(|| list().zip(list()).collect(), full);

    // Either side being shorter truncates the result.
    let short_right = Ok(vec![(1, 1), (2, 2)]);
    assert_done(|| list().zip(list().take(2)).collect(), short_right);
    let short_left = Ok(vec![(1, 1), (2, 2)]);
    assert_done(|| list().take(2).zip(list()).collect(), short_left);

    let failure = Err(3);
    assert_done(|| err_list().zip(list()).collect(), failure);

    let shifted = Ok(vec![(1, 2), (2, 3), (3, 4)]);
    assert_done(|| list().zip(list().map(|x| x + 1)).collect(), shifted);
}
286 
/// Peeking does not consume: two `peek` calls both see the first item, and the
/// following `poll` still yields it.
#[test]
fn peek() {
    /// Future that runs the peek/poll assertions once the first item is ready.
    struct Peek {
        inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>,
    }

    impl Future for Peek {
        type Item = ();
        type Error = u32;

        fn poll(&mut self) -> Poll<(), u32> {
            {
                // Returns early with `NotReady` until the first item is available.
                let front = try_ready!(self.inner.peek());
                assert_eq!(front, Some(&1));
            }
            assert_eq!(self.inner.peek().unwrap(), Async::Ready(Some(&1)));
            assert_eq!(self.inner.poll().unwrap(), Async::Ready(Some(1)));
            Ok(Async::Ready(()))
        }
    }

    let probe = Peek {
        inner: list().peekable(),
    };
    probe.wait().unwrap()
}
312 
/// `wait` turns the stream into a blocking iterator of `Result`s.
#[test]
fn wait() {
    let collected: Result<Vec<_>, _> = list().wait().collect();
    assert_eq!(collected, Ok(vec![1, 2, 3]));
}
318 
/// `chunks(n)` groups items into vectors of at most `n`. When the stream
/// fails, the partially filled chunk is yielded first and the error follows.
#[test]
fn chunks() {
    let one_chunk = Ok(vec![vec![1, 2, 3]]);
    assert_done(|| list().chunks(3).collect(), one_chunk);
    let singles = Ok(vec![vec![1], vec![2], vec![3]]);
    assert_done(|| list().chunks(1).collect(), singles);
    let uneven = Ok(vec![vec![1, 2], vec![3]]);
    assert_done(|| list().chunks(2).collect(), uneven);

    let mut spawned = executor::spawn(err_list().chunks(3));
    let partial = spawned.wait_stream().unwrap().unwrap();
    assert_eq!(partial, vec![1, 2]);
    let error = spawned.wait_stream().unwrap().unwrap_err();
    assert_eq!(error, 3);
}
330 
/// Asking for chunks of capacity zero is expected to panic.
#[test]
#[should_panic]
fn chunks_panic_on_cap_zero() {
    let zero_capacity = 0;
    let _ = list().chunks(zero_capacity);
}
336 
/// `select` alternates between two ready streams, starting with the left one,
/// and drains whichever side is longer once the other ends.
#[test]
fn select() {
    let left = iter_ok::<_, u32>(vec![1, 2, 3]);
    let right = iter_ok(vec![4, 5, 6]);
    let interleaved = Ok(vec![1, 4, 2, 5, 3, 6]);
    assert_done(|| left.select(right).collect(), interleaved);

    // Right side shorter.
    let left = iter_ok::<_, u32>(vec![1, 2, 3]);
    let right = iter_ok(vec![1, 2]);
    let expected = Ok(vec![1, 1, 2, 2, 3]);
    assert_done(|| left.select(right).collect(), expected);

    // Left side shorter.
    let left = iter_ok(vec![1, 2]);
    let right = iter_ok::<_, u32>(vec![1, 2, 3]);
    let expected = Ok(vec![1, 1, 2, 2, 3]);
    assert_done(|| left.select(right).collect(), expected);
}
351 
/// `forward` pushes every item into the sink and hands the sink back, so the
/// same `Vec` keeps accumulating across successive forwards.
#[test]
fn forward() {
    let (_, sink) = iter_ok::<_, ()>(vec![0, 1])
        .forward(Vec::new())
        .wait()
        .unwrap();
    assert_eq!(sink, vec![0, 1]);

    let (_, sink) = iter_ok::<_, ()>(vec![2, 3]).forward(sink).wait().unwrap();
    assert_eq!(sink, vec![0, 1, 2, 3]);

    let expected = Ok::<_, ()>(vec![0, 1, 2, 3, 4, 5]);
    assert_done(
        move || iter_ok(vec![4, 5]).forward(sink).map(|(_, s)| s),
        expected,
    );
}
364 
/// `concat` joins the yielded vectors into one and fails on the first error.
// NOTE(review): the `allow` suggests `concat` is deprecated in this crate
// version (see `concat2` below) - confirm against the crate docs.
#[test]
#[allow(deprecated)]
fn concat() {
    let chunks = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
    let joined = Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    assert_done(move || chunks.concat(), joined);

    let failing = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
    let failure = Err(());
    assert_done(move || failing.concat(), failure);
}
374 
/// `concat2` joins the yielded vectors into one, fails on the first error, and
/// produces an empty vector for an empty stream.
#[test]
fn concat2() {
    let chunks = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
    let joined = Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    assert_done(move || chunks.concat2(), joined);

    let failing = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
    let failure = Err(());
    assert_done(move || failing.concat2(), failure);

    let nothing = empty::<Vec<()>, ()>();
    let no_items = Ok(vec![]);
    assert_done(move || nothing.concat2(), no_items);
}
386 
/// A stream built with `poll_fn` from a counting closure yields exactly as many
/// items as the closure produces before it returns `None`.
#[test]
fn stream_poll_fn() {
    let mut remaining = 5usize;

    let read_stream = poll_fn(move || -> Poll<Option<usize>, std::io::Error> {
        if remaining == 0 {
            Ok(Async::Ready(None))
        } else {
            remaining -= 1;
            Ok(Async::Ready(Some(remaining)))
        }
    });

    assert_eq!(read_stream.wait().count(), 5);
}
401 
/// `inspect` observes every item without altering the stream's output.
#[test]
fn inspect() {
    let mut observed = Vec::new();
    let expected = Ok(vec![1, 2, 3]);
    assert_done(
        || list().inspect(|&item| observed.push(item)).collect(),
        expected,
    );
    assert_eq!(observed, [1, 2, 3]);
}
408 
/// `inspect_err` observes the error without altering it: only the `3` from
/// `err_list()` is recorded and the stream still fails with it.
#[test]
fn inspect_err() {
    let mut observed = Vec::new();
    let expected = Err(3);
    assert_done(
        || err_list().inspect_err(|&error| observed.push(error)).collect(),
        expected,
    );
    assert_eq!(observed, [3]);
}
415