1 use futures::executor::block_on;
2 use futures::future::{Future, FutureExt};
3 use futures::stream::{self, StreamExt, TryStreamExt};
4 use futures::io::{AsyncBufReadExt, Cursor};
5 use futures::task::Poll;
6 use futures_test::io::AsyncReadTestExt;
7 use futures_test::task::noop_context;
8 
/// Blocks the current thread until the stream expression yields its next
/// item, then unwraps both layers of that item: the outer `Option` (panics if
/// the stream has ended) and the inner `Result` (panics if the item is an
/// error).
macro_rules! block_on_next {
    ($stream:expr) => {{
        let next_item = block_on($stream.next());
        next_item.unwrap().unwrap()
    }};
}
14 
/// Checks line splitting over fully in-memory cursors, driven with `block_on`.
#[test]
fn lines() {
    // A lone trailing `\r` (no `\n` after it) is expected to stay in the line.
    let reader = Cursor::new(&b"12\r"[..]);
    let mut line_stream = reader.lines();
    assert_eq!(block_on_next!(line_stream), "12\r");
    assert!(block_on(line_stream.next()).is_none());

    // `\r\n` and a bare `\n` are each expected to end a line and be stripped,
    // so the input yields "12" followed by an empty line.
    let reader = Cursor::new(&b"12\r\n\n"[..]);
    let mut line_stream = reader.lines();
    assert_eq!(block_on_next!(line_stream), "12");
    assert_eq!(block_on_next!(line_stream), "");
    assert!(block_on(line_stream.next()).is_none());
}
28 
run<F: Future + Unpin>(mut f: F) -> F::Output29 fn run<F: Future + Unpin>(mut f: F) -> F::Output {
30     let mut cx = noop_context();
31     loop {
32         if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
33             return x;
34         }
35     }
36 }
37 
/// Polls the stream expression's next item to completion with `run`, then
/// unwraps both layers of that item: the outer `Option` (panics if the stream
/// has ended) and the inner `Result` (panics if the item is an error).
macro_rules! run_next {
    ($stream:expr) => {{
        let next_item = run($stream.next());
        next_item.unwrap().unwrap()
    }};
}
43 
/// Checks line splitting when the input arrives in several chunks through a
/// reader wrapped with `interleave_pending`, driven by the busy-polling `run`
/// helper.
#[test]
fn maybe_pending() {
    // The chunks "12" and "\r" are expected to be joined into one line; the
    // lone trailing `\r` (no `\n` after it) stays in the line.
    let reader = stream::iter(vec![&b"12"[..], &b"\r"[..]])
        .map(Ok)
        .into_async_read()
        .interleave_pending();
    let mut line_stream = reader.lines();
    assert_eq!(run_next!(line_stream), "12\r");
    assert!(run(line_stream.next()).is_none());

    // With terminators arriving in their own chunks, `\r\n` and a bare `\n`
    // are each expected to end a line and be stripped, yielding "12" followed
    // by an empty line.
    let reader = stream::iter(vec![&b"12"[..], &b"\r\n"[..], &b"\n"[..]])
        .map(Ok)
        .into_async_read()
        .interleave_pending();
    let mut line_stream = reader.lines();
    assert_eq!(run_next!(line_stream), "12");
    assert_eq!(run_next!(line_stream), "");
    assert!(run(line_stream.next()).is_none());
}
63