1 extern crate futures;
2 #[macro_use]
3 extern crate log;
4 extern crate tokio_io;
5 extern crate tokio_process;
6 
7 use std::io;
8 use std::process::{Command, ExitStatus, Stdio};
9 
10 use futures::future::Future;
11 use futures::stream::{self, Stream};
12 use tokio_io::io::{read_until, write_all};
13 use tokio_process::{Child, CommandExt};
14 
15 mod support;
16 
cat() -> Command17 fn cat() -> Command {
18     let mut cmd = support::cmd("cat");
19     cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
20     cmd
21 }
22 
feed_cat(mut cat: Child, n: usize) -> Box<Future<Item = ExitStatus, Error = io::Error>>23 fn feed_cat(mut cat: Child, n: usize) -> Box<Future<Item = ExitStatus, Error = io::Error>> {
24     let stdin = cat.stdin().take().unwrap();
25     let stdout = cat.stdout().take().unwrap();
26 
27     debug!("starting to feed");
28     // Produce n lines on the child's stdout.
29     let numbers = stream::iter_ok(0..n);
30     let write = numbers
31         .fold(stdin, |stdin, i| {
32             debug!("sending line {} to child", i);
33             write_all(stdin, format!("line {}\n", i).into_bytes()).map(|p| p.0)
34         })
35         .map(|_| ());
36 
37     // Try to read `n + 1` lines, ensuring the last one is empty
38     // (i.e. EOF is reached after `n` lines.
39     let reader = io::BufReader::new(stdout);
40     let expected_numbers = stream::iter_ok(0..=n);
41     let read = expected_numbers.fold((reader, 0), move |(reader, i), _| {
42         let done = i >= n;
43         debug!("starting read from child");
44         read_until(reader, b'\n', Vec::new()).and_then(move |(reader, vec)| {
45             debug!(
46                 "read line {} from child ({} bytes, done: {})",
47                 i,
48                 vec.len(),
49                 done
50             );
51             match (done, vec.len()) {
52                 (false, 0) => Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")),
53                 (true, n) if n != 0 => Err(io::Error::new(io::ErrorKind::Other, "extraneous data")),
54                 _ => {
55                     let s = std::str::from_utf8(&vec).unwrap();
56                     let expected = format!("line {}\n", i);
57                     if done || s == expected {
58                         Ok((reader, i + 1))
59                     } else {
60                         Err(io::Error::new(io::ErrorKind::Other, "unexpected data"))
61                     }
62                 }
63             }
64         })
65     });
66 
67     // Compose reading and writing concurrently.
68     Box::new(write.join(read).and_then(|_| cat))
69 }
70 
71 /// Check for the following properties when feeding stdin and
72 /// consuming stdout of a cat-like process:
73 ///
74 /// - A number of lines that amounts to a number of bytes exceeding a
75 ///   typical OS buffer size can be fed to the child without
76 ///   deadlock. This tests that we also consume the stdout
77 ///   concurrently; otherwise this would deadlock.
78 ///
79 /// - We read the same lines from the child that we fed it.
80 ///
81 /// - The child does produce EOF on stdout after the last line.
82 #[test]
feed_a_lot()83 fn feed_a_lot() {
84     let child = cat().spawn_async().unwrap();
85     let status = support::run_with_timeout(feed_cat(child, 10000)).unwrap();
86     assert_eq!(status.code(), Some(0));
87 }
88 
89 #[test]
wait_with_output_captures()90 fn wait_with_output_captures() {
91     let mut child = cat().spawn_async().unwrap();
92     let stdin = child.stdin().take().unwrap();
93     let out = child.wait_with_output();
94 
95     let future = write_all(stdin, b"1234").map(|p| p.1).join(out);
96     let ret = support::run_with_timeout(future).unwrap();
97     let (written, output) = ret;
98 
99     assert!(output.status.success());
100     assert_eq!(output.stdout, written);
101     assert_eq!(output.stderr.len(), 0);
102 }
103 
104 #[test]
status_closes_any_pipes()105 fn status_closes_any_pipes() {
106     // Cat will open a pipe between the parent and child.
107     // If `status_async` doesn't ensure the handles are closed,
108     // we would end up blocking forever (and time out).
109     let child = cat().status_async().expect("failed to spawn child");
110 
111     support::run_with_timeout(child)
112         .expect("time out exceeded! did we get stuck waiting on the child?");
113 }
114