1 #![allow(bare_trait_objects, unknown_lints)]
2 
3 #[macro_use]
4 extern crate futures;
5 
6 use futures::prelude::*;
7 use futures::executor;
8 use futures::future::{err, ok};
9 use futures::stream::{empty, iter_ok, poll_fn, Peekable};
10 use futures::sync::oneshot;
11 use futures::sync::mpsc;
12 
13 mod support;
14 use support::*;
15 
/// Thin wrapper that holds an iterator in its `iter` field.
///
/// The `Stream` implementation in this file is provided for wrappers whose
/// inner iterator yields `Result<T, E>` values.
///
/// `Debug` is derived so the wrapper can be printed in assertion failures,
/// and the type is `#[must_use]` because an unused value was probably meant
/// to be driven.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Iter<I> {
    iter: I,
}
19 
iter<J, T, E>(i: J) -> Iter<J::IntoIter> where J: IntoIterator<Item=Result<T, E>>,20 pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
21     where J: IntoIterator<Item=Result<T, E>>,
22 {
23     Iter {
24         iter: i.into_iter(),
25     }
26 }
27 
28 impl<I, T, E> Stream for Iter<I>
29     where I: Iterator<Item=Result<T, E>>,
30 {
31     type Item = T;
32     type Error = E;
33 
poll(&mut self) -> Poll<Option<T>, E>34     fn poll(&mut self) -> Poll<Option<T>, E> {
35         match self.iter.next() {
36             Some(Ok(e)) => Ok(Async::Ready(Some(e))),
37             Some(Err(e)) => Err(e),
38             None => Ok(Async::Ready(None)),
39         }
40     }
41 }
42 
list() -> Box<Stream<Item=i32, Error=u32> + Send>43 fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
44     let (tx, rx) = mpsc::channel(1);
45     tx.send(Ok(1))
46       .and_then(|tx| tx.send(Ok(2)))
47       .and_then(|tx| tx.send(Ok(3)))
48       .forget();
49     Box::new(rx.then(|r| r.unwrap()))
50 }
51 
err_list() -> Box<Stream<Item=i32, Error=u32> + Send>52 fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> {
53     let (tx, rx) = mpsc::channel(1);
54     tx.send(Ok(1))
55       .and_then(|tx| tx.send(Ok(2)))
56       .and_then(|tx| tx.send(Err(3)))
57       .forget();
58     Box::new(rx.then(|r| r.unwrap()))
59 }
60 
/// `map` applies the closure to every item of the stream.
#[test]
fn map() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![2, 3, 4]);
    assert_done(|| list().map(|item| item + 1).collect(), expected);
}
65 
/// `map_err` applies the closure to the stream's error.
#[test]
fn map_err() {
    let expected: Result<Vec<i32>, u32> = Err(4);
    assert_done(|| err_list().map_err(|error| error + 1).collect(), expected);
}
70 
/// Newtype around a `u32`, used as the target error type in the `from_err`
/// test.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct FromErrTest(u32);
73 
74 impl From<u32> for FromErrTest {
from(i: u32) -> FromErrTest75     fn from(i: u32) -> FromErrTest {
76         FromErrTest(i)
77     }
78 }
79 
/// `from_err` converts the stream's `u32` error into `FromErrTest` via
/// `From`.
#[test]
fn from_err() {
    let expected: Result<Vec<i32>, FromErrTest> = Err(FromErrTest(3));
    assert_done(|| err_list().from_err().collect(), expected);
}
84 
/// `fold` accumulates all items, and surfaces the stream's error if one
/// occurs.
#[test]
fn fold() {
    let total: Result<i32, u32> = Ok(6);
    assert_done(|| list().fold(0, |acc, item| ok::<i32, u32>(acc + item)), total);

    let failure: Result<i32, u32> = Err(3);
    assert_done(|| err_list().fold(0, |acc, item| ok::<i32, u32>(acc + item)), failure);
}
90 
/// `filter` keeps only the items for which the predicate returns `true`.
#[test]
fn filter() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![2]);
    assert_done(|| list().filter(|&item| item % 2 == 0).collect(), expected);
}
95 
/// `filter_map` drops items mapped to `None` and yields the payload of
/// `Some`.
#[test]
fn filter_map() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![12]);
    assert_done(
        || {
            list()
                .filter_map(|x| match x % 2 {
                    0 => Some(x + 10),
                    _ => None,
                })
                .collect()
        },
        expected,
    );
}
106 
/// `and_then` chains a fallible step onto every item; the first failing step
/// ends the collection with its error.
#[test]
fn and_then() {
    let incremented: Result<Vec<i32>, u32> = Ok(vec![2, 3, 4]);
    assert_done(|| list().and_then(|item| Ok(item + 1)).collect(), incremented);

    let first_failure: Result<Vec<i32>, u32> = Err(1);
    assert_done(
        || list().and_then(|item| err::<i32, u32>(item as u32)).collect(),
        first_failure,
    );
}
113 
/// `then` hands the closure each `Result` produced by the stream.
#[test]
fn then() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![2, 3, 4]);
    assert_done(
        || list().then(|outcome| outcome.map(|item| item + 1)).collect(),
        expected,
    );
}
119 
/// `or_else` lets the closure turn the stream's error back into an item.
#[test]
fn or_else() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![1, 2, 3]);
    assert_done(
        || err_list().or_else(|error| ok::<i32, u32>(error as i32)).collect(),
        expected,
    );
}
126 
/// `flatten` concatenates the items of a stream of streams, in order.
#[test]
fn flatten() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]);
    assert_done(|| list().map(|_| list()).flatten().collect(), expected);
}
133 
/// `skip(n)` discards the first `n` items.
#[test]
fn skip() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![3]);
    assert_done(|| list().skip(2).collect(), expected);
}
138 
/// Errors do not count towards the skipped amount: with `skip(1)` both
/// leading errors are still observed and only `Ok(3)` is dropped.
#[test]
fn skip_passes_errors_through() {
    let source = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]);
    let observed: Vec<_> = source.skip(1).wait().collect();
    assert_eq!(observed, vec![Err(1), Err(2), Ok(4), Ok(5)]);
}
150 
/// `skip_while` discards items until the predicate first resolves to
/// `false`.
#[test]
fn skip_while() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![2, 3]);
    assert_done(
        || list().skip_while(|item| Ok(*item % 2 == 1)).collect(),
        expected,
    );
}
/// `take(n)` yields at most the first `n` items.
#[test]
fn take() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![1, 2]);
    assert_done(|| list().take(2).collect(), expected);
}
160 
/// `take_while` yields items until the predicate first resolves to `false`.
#[test]
fn take_while() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![1, 2]);
    assert_done(
        || list().take_while(|item| Ok(*item < 3)).collect(),
        expected,
    );
}
166 
/// Errors do not count towards the taken amount, and nothing after the last
/// taken item is observed.
#[test]
fn take_passes_errors_through() {
    // Two leading errors are passed along before the single taken item.
    let source = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]);
    let observed: Vec<_> = source.take(1).wait().collect();
    assert_eq!(observed, vec![Err(1), Err(2), Ok(3)]);

    // The error following the taken item is never surfaced.
    let source = iter(vec![Ok(1), Err(2)]);
    let observed: Vec<_> = source.take(1).wait().collect();
    assert_eq!(observed, vec![Ok(1)]);
}
181 
/// Wrapping a stream in `peekable` does not change what it yields.
#[test]
fn peekable() {
    let expected: Result<Vec<i32>, u32> = Ok(vec![1, 2, 3]);
    assert_done(|| list().peekable().collect(), expected);
}
186 
/// A fused stream keeps returning `None` once it has finished.
#[test]
fn fuse() {
    let mut stream = list().fuse().wait();
    for expected in 1..4 {
        assert_eq!(stream.next(), Some(Ok(expected)));
    }
    // Polling past the end must stay at `None`.
    for _ in 0..3 {
        assert_eq!(stream.next(), None);
    }
}
197 
/// `buffered` yields results in the order the futures were queued, for both
/// capacities exercised here.
#[test]
fn buffered() {
    /// Queues two oneshot-backed futures, completes the second before the
    /// first, and checks that the results still come out first-then-second.
    fn check(capacity: usize) {
        let (queue_tx, queue_rx) = mpsc::channel(1);
        let (first_tx, first_rx) = oneshot::channel::<u32>();
        let (second_tx, second_rx) = oneshot::channel::<u32>();

        queue_tx.send(Box::new(first_rx.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
            .and_then(|queue_tx| queue_tx.send(Box::new(second_rx.map_err(|_| ()))))
            .forget();

        let mut pending = queue_rx.buffered(capacity);
        sassert_empty(&mut pending);
        // Completing the second future alone must not produce output.
        second_tx.send(3).unwrap();
        sassert_empty(&mut pending);
        first_tx.send(5).unwrap();

        let mut results = pending.wait();
        assert_eq!(results.next(), Some(Ok(5)));
        assert_eq!(results.next(), Some(Ok(3)));
        assert_eq!(results.next(), None);
    }

    check(2);
    check(1);
}
236 
/// `buffer_unordered` yields results as they complete, limited by how many
/// futures it may hold at once.
#[test]
fn unordered() {
    // Capacity 2: the second future can finish (and be seen) first.
    let (wide_tx, wide_rx) = mpsc::channel(1);
    let (first_tx, first_rx) = oneshot::channel::<u32>();
    let (second_tx, second_rx) = oneshot::channel::<u32>();

    wide_tx.send(Box::new(first_rx.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
        .and_then(|wide_tx| wide_tx.send(Box::new(second_rx.map_err(|_| ()))))
        .forget();

    let mut wide_pending = wide_rx.buffer_unordered(2);
    sassert_empty(&mut wide_pending);
    let mut wide_results = wide_pending.wait();
    second_tx.send(3).unwrap();
    assert_eq!(wide_results.next(), Some(Ok(3)));
    first_tx.send(5).unwrap();
    assert_eq!(wide_results.next(), Some(Ok(5)));
    assert_eq!(wide_results.next(), None);

    let (narrow_tx, narrow_rx) = mpsc::channel(1);
    let (early_tx, early_rx) = oneshot::channel::<u32>();
    let (late_tx, late_rx) = oneshot::channel::<u32>();

    narrow_tx.send(Box::new(early_rx.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
        .and_then(|narrow_tx| narrow_tx.send(Box::new(late_rx.map_err(|_| ()))))
        .forget();

    // Capacity 1: we don't even get to see the second future until the
    // first one completes.
    let mut narrow_pending = narrow_rx.buffer_unordered(1);
    sassert_empty(&mut narrow_pending);
    late_tx.send(3).unwrap();
    sassert_empty(&mut narrow_pending);
    early_tx.send(5).unwrap();
    let mut narrow_results = narrow_pending.wait();
    assert_eq!(narrow_results.next(), Some(Ok(5)));
    assert_eq!(narrow_results.next(), Some(Ok(3)));
    assert_eq!(narrow_results.next(), None);
}
275 
/// `zip` pairs items positionally, stops at the shorter stream, and
/// propagates an error from either side.
#[test]
fn zip() {
    let full: Result<Vec<(i32, i32)>, u32> = Ok(vec![(1, 1), (2, 2), (3, 3)]);
    assert_done(|| list().zip(list()).collect(), full);

    // Shorter right-hand side.
    let short_right: Result<Vec<(i32, i32)>, u32> = Ok(vec![(1, 1), (2, 2)]);
    assert_done(|| list().zip(list().take(2)).collect(), short_right);

    // Shorter left-hand side.
    let short_left: Result<Vec<(i32, i32)>, u32> = Ok(vec![(1, 1), (2, 2)]);
    assert_done(|| list().take(2).zip(list()).collect(), short_left);

    let failed: Result<Vec<(i32, i32)>, u32> = Err(3);
    assert_done(|| err_list().zip(list()).collect(), failed);

    let shifted: Result<Vec<(i32, i32)>, u32> = Ok(vec![(1, 2), (2, 3), (3, 4)]);
    assert_done(|| list().zip(list().map(|x| x + 1)).collect(), shifted);
}
288 
/// Peeking repeatedly returns the same upcoming item without consuming it;
/// the following `poll` then yields that item.
#[test]
fn peek() {
    /// Future that runs the peek/poll assertions from inside a task context.
    struct Peek {
        inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>,
    }

    impl Future for Peek {
        type Item = ();
        type Error = u32;

        fn poll(&mut self) -> Poll<(), u32> {
            // Scope the first borrow so `inner` can be borrowed again below.
            {
                let upcoming = try_ready!(self.inner.peek());
                assert_eq!(upcoming, Some(&1));
            }
            // A second peek sees the very same item.
            assert_eq!(self.inner.peek().unwrap(), Async::Ready(Some(&1)));
            // Polling finally consumes it.
            assert_eq!(self.inner.poll().unwrap(), Async::Ready(Some(1)));
            Ok(Async::Ready(()))
        }
    }

    let checker = Peek {
        inner: list().peekable(),
    };
    checker.wait().unwrap();
}
314 
/// `wait` turns the stream into a blocking iterator of `Result`s.
#[test]
fn wait() {
    let collected: Result<Vec<_>, _> = list().wait().collect();
    assert_eq!(collected, Ok(vec![1, 2, 3]));
}
320 
/// `chunks(n)` groups items into vectors of at most `n`; on error the
/// partially filled chunk is yielded before the error.
#[test]
fn chunks() {
    let single: Result<Vec<Vec<i32>>, u32> = Ok(vec![vec![1, 2, 3]]);
    assert_done(|| list().chunks(3).collect(), single);

    let singles: Result<Vec<Vec<i32>>, u32> = Ok(vec![vec![1], vec![2], vec![3]]);
    assert_done(|| list().chunks(1).collect(), singles);

    let uneven: Result<Vec<Vec<i32>>, u32> = Ok(vec![vec![1, 2], vec![3]]);
    assert_done(|| list().chunks(2).collect(), uneven);

    let mut spawned = executor::spawn(err_list().chunks(3));
    let partial = spawned.wait_stream().unwrap().unwrap();
    assert_eq!(partial, vec![1, 2]);
    let error = spawned.wait_stream().unwrap().unwrap_err();
    assert_eq!(error, 3);
}
332 
/// Requesting chunks of capacity zero is expected to panic.
#[test]
#[should_panic]
fn chunks_panic_on_cap_zero() {
    drop(list().chunks(0));
}
338 
/// `select` alternates between two ready streams and drains whichever one
/// is longer.
#[test]
fn select() {
    // Equal lengths: strict alternation.
    let left = iter_ok::<_, u32>(vec![1, 2, 3]);
    let right = iter_ok(vec![4, 5, 6]);
    let interleaved: Result<Vec<i32>, u32> = Ok(vec![1, 4, 2, 5, 3, 6]);
    assert_done(|| left.select(right).collect(), interleaved);

    // Right side runs out first.
    let left = iter_ok::<_, u32>(vec![1, 2, 3]);
    let right = iter_ok(vec![1, 2]);
    let left_longer: Result<Vec<i32>, u32> = Ok(vec![1, 1, 2, 2, 3]);
    assert_done(|| left.select(right).collect(), left_longer);

    // Left side runs out first.
    let left = iter_ok(vec![1, 2]);
    let right = iter_ok::<_, u32>(vec![1, 2, 3]);
    let right_longer: Result<Vec<i32>, u32> = Ok(vec![1, 1, 2, 2, 3]);
    assert_done(|| left.select(right).collect(), right_longer);
}
353 
/// `forward` pushes every item into the sink and hands the sink back, so
/// the same `Vec` can be reused as the target of several forwards.
#[test]
fn forward() {
    let sink = Vec::new();
    let (_, sink) = iter_ok::<_, ()>(vec![0, 1]).forward(sink).wait().unwrap();
    assert_eq!(sink, vec![0, 1]);

    let (_, sink) = iter_ok::<_, ()>(vec![2, 3]).forward(sink).wait().unwrap();
    assert_eq!(sink, vec![0, 1, 2, 3]);

    let expected: Result<Vec<i32>, ()> = Ok(vec![0, 1, 2, 3, 4, 5]);
    assert_done(
        move || iter_ok(vec![4, 5]).forward(sink).map(|(_, filled)| filled),
        expected,
    );
}
366 
/// The deprecated `concat` joins all yielded vectors into one, or fails
/// with the stream's error.
#[test]
#[allow(deprecated)]
fn concat() {
    let pieces = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
    let joined: Result<Vec<i32>, ()> = Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    assert_done(move || pieces.concat(), joined);

    let broken = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
    let failure: Result<Vec<i32>, ()> = Err(());
    assert_done(move || broken.concat(), failure);
}
376 
/// `concat2` joins all yielded vectors into one, fails with the stream's
/// error, and produces an empty vector for an empty stream.
#[test]
fn concat2() {
    let pieces = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
    let joined: Result<Vec<i32>, ()> = Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    assert_done(move || pieces.concat2(), joined);

    let broken = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
    let failure: Result<Vec<i32>, ()> = Err(());
    assert_done(move || broken.concat2(), failure);

    let nothing = empty::<Vec<()>, ()>();
    let no_items: Result<Vec<()>, ()> = Ok(vec![]);
    assert_done(move || nothing.concat2(), no_items);
}
388 
/// A `poll_fn` stream counting down from five yields exactly five items.
#[test]
fn stream_poll_fn() {
    let mut remaining = 5usize;

    let countdown = poll_fn(move || -> Poll<Option<usize>, std::io::Error> {
        // Yield the decremented counter until it reaches zero, then finish.
        let next = if remaining == 0 {
            None
        } else {
            remaining -= 1;
            Some(remaining)
        };
        Ok(Async::Ready(next))
    });

    assert_eq!(countdown.wait().count(), 5);
}
403 
/// `inspect` shows every item to the closure without altering the stream.
#[test]
fn inspect() {
    let mut observed = Vec::new();
    let expected: Result<Vec<i32>, u32> = Ok(vec![1, 2, 3]);
    assert_done(
        || list().inspect(|&item| observed.push(item)).collect(),
        expected,
    );
    assert_eq!(observed, [1, 2, 3]);
}
410 
/// `inspect_err` shows the error to the closure without altering the
/// stream.
#[test]
fn inspect_err() {
    let mut observed = Vec::new();
    let expected: Result<Vec<i32>, u32> = Err(3);
    assert_done(
        || err_list().inspect_err(|&error| observed.push(error)).collect(),
        expected,
    );
    assert_eq!(observed, [3]);
}
417