1 //! An example of hooking up stdin/stdout to either a TCP or UDP stream.
2 //!
3 //! This example will connect to a socket address specified in the argument list
4 //! and then forward all data read on stdin to the server, printing out all data
5 //! received on stdout. An optional `--udp` argument can be passed to specify
6 //! that the connection should be made over UDP instead of TCP, translating each
7 //! line entered on stdin to a UDP packet to be sent to the remote address.
8 //!
9 //! Note that this is not currently optimized for performance, especially
10 //! around buffer management. Rather it's intended to show an example of
11 //! working with a client.
12 //!
13 //! This example can be quite useful when interacting with the other examples in
14 //! this repository! Many of them recommend running this as a simple "hook up
15 //! stdin/stdout to a server" to get up and running.
16 
17 #![deny(warnings)]
18 
19 extern crate bytes;
20 extern crate futures;
21 extern crate tokio;
22 extern crate tokio_io;
23 
24 use std::env;
25 use std::io::{self, Read, Write};
26 use std::net::SocketAddr;
27 use std::thread;
28 
29 use futures::sync::mpsc;
30 use tokio::prelude::*;
31 
main() -> Result<(), Box<std::error::Error>>32 fn main() -> Result<(), Box<std::error::Error>> {
33     // Determine if we're going to run in TCP or UDP mode
34     let mut args = env::args().skip(1).collect::<Vec<_>>();
35     let tcp = match args.iter().position(|a| a == "--udp") {
36         Some(i) => {
37             args.remove(i);
38             false
39         }
40         None => true,
41     };
42 
43     // Parse what address we're going to connect to
44     let addr = match args.first() {
45         Some(addr) => addr,
46         None => Err("this program requires at least one argument")?,
47     };
48     let addr = addr.parse::<SocketAddr>()?;
49 
50     // Right now Tokio doesn't support a handle to stdin running on the event
51     // loop, so we farm out that work to a separate thread. This thread will
52     // read data (with blocking I/O) from stdin and then send it to the event
53     // loop over a standard futures channel.
54     let (stdin_tx, stdin_rx) = mpsc::channel(0);
55     thread::spawn(|| read_stdin(stdin_tx));
56     let stdin_rx = stdin_rx.map_err(|_| panic!("errors not possible on rx"));
57 
58     // Now that we've got our stdin read we either set up our TCP connection or
59     // our UDP connection to get a stream of bytes we're going to emit to
60     // stdout.
61     let stdout = if tcp {
62         tcp::connect(&addr, Box::new(stdin_rx))?
63     } else {
64         udp::connect(&addr, Box::new(stdin_rx))?
65     };
66 
67     // And now with our stream of bytes to write to stdout, we execute that in
68     // the event loop! Note that this is doing blocking I/O to emit data to
69     // stdout, and in general it's a no-no to do that sort of work on the event
70     // loop. In this case, though, we know it's ok as the event loop isn't
71     // otherwise running anything useful.
72     let mut out = io::stdout();
73 
74     tokio::run({
75         stdout
76             .for_each(move |chunk| out.write_all(&chunk))
77             .map_err(|e| println!("error reading stdout; error = {:?}", e))
78     });
79     Ok(())
80 }
81 
82 mod codec {
83     use bytes::{BufMut, BytesMut};
84     use std::io;
85     use tokio::codec::{Decoder, Encoder};
86 
87     /// A simple `Codec` implementation that just ships bytes around.
88     ///
89     /// This type is used for "framing" a TCP/UDP stream of bytes but it's really
90     /// just a convenient method for us to work with streams/sinks for now.
91     /// This'll just take any data read and interpret it as a "frame" and
92     /// conversely just shove data into the output location without looking at
93     /// it.
94     pub struct Bytes;
95 
96     impl Decoder for Bytes {
97         type Item = BytesMut;
98         type Error = io::Error;
99 
decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>>100         fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
101             if buf.len() > 0 {
102                 let len = buf.len();
103                 Ok(Some(buf.split_to(len)))
104             } else {
105                 Ok(None)
106             }
107         }
108     }
109 
110     impl Encoder for Bytes {
111         type Item = Vec<u8>;
112         type Error = io::Error;
113 
encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()>114         fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
115             buf.put(&data[..]);
116             Ok(())
117         }
118     }
119 }
120 
121 mod tcp {
122     use tokio;
123     use tokio::codec::Decoder;
124     use tokio::net::TcpStream;
125     use tokio::prelude::*;
126 
127     use bytes::BytesMut;
128     use codec::Bytes;
129 
130     use std::error::Error;
131     use std::io;
132     use std::net::SocketAddr;
133 
connect( addr: &SocketAddr, stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>134     pub fn connect(
135         addr: &SocketAddr,
136         stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>,
137     ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> {
138         let tcp = TcpStream::connect(addr);
139 
140         // After the TCP connection has been established, we set up our client
141         // to start forwarding data.
142         //
143         // First we use the `Io::framed` method with a simple implementation of
144         // a `Codec` (listed below) that just ships bytes around. We then split
145         // that in two to work with the stream and sink separately.
146         //
147         // Half of the work we're going to do is to take all data we receive on
148         // `stdin` and send that along the TCP stream (`sink`). The second half
149         // is to take all the data we receive (`stream`) and then write that to
150         // stdout. We'll be passing this handle back out from this method.
151         //
152         // You'll also note that we *spawn* the work to read stdin and write it
153         // to the TCP stream. This is done to ensure that happens concurrently
154         // with us reading data from the stream.
155         let stream = Box::new(
156             tcp.map(move |stream| {
157                 let (sink, stream) = Bytes.framed(stream).split();
158 
159                 tokio::spawn(stdin.forward(sink).then(|result| {
160                     if let Err(e) = result {
161                         println!("failed to write to socket: {}", e)
162                     }
163                     Ok(())
164                 }));
165 
166                 stream
167             })
168             .flatten_stream(),
169         );
170         Ok(stream)
171     }
172 }
173 
174 mod udp {
175     use std::error::Error;
176     use std::io;
177     use std::net::SocketAddr;
178 
179     use bytes::BytesMut;
180     use tokio;
181     use tokio::net::{UdpFramed, UdpSocket};
182     use tokio::prelude::*;
183 
184     use codec::Bytes;
185 
connect( &addr: &SocketAddr, stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>186     pub fn connect(
187         &addr: &SocketAddr,
188         stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>,
189     ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> {
190         // We'll bind our UDP socket to a local IP/port, but for now we
191         // basically let the OS pick both of those.
192         let addr_to_bind = if addr.ip().is_ipv4() {
193             "0.0.0.0:0".parse()?
194         } else {
195             "[::]:0".parse()?
196         };
197         let udp = match UdpSocket::bind(&addr_to_bind) {
198             Ok(udp) => udp,
199             Err(_) => Err("failed to bind socket")?,
200         };
201 
202         // Like above with TCP we use an instance of `Bytes` codec to transform
203         // this UDP socket into a framed sink/stream which operates over
204         // discrete values. In this case we're working with *pairs* of socket
205         // addresses and byte buffers.
206         let (sink, stream) = UdpFramed::new(udp, Bytes).split();
207 
208         // All bytes from `stdin` will go to the `addr` specified in our
209         // argument list. Like with TCP this is spawned concurrently
210         let forward_stdin = stdin
211             .map(move |chunk| (chunk, addr))
212             .forward(sink)
213             .then(|result| {
214                 if let Err(e) = result {
215                     println!("failed to write to socket: {}", e)
216                 }
217                 Ok(())
218             });
219 
220         // With UDP we could receive data from any source, so filter out
221         // anything coming from a different address
222         let receive = stream.filter_map(move |(chunk, src)| {
223             if src == addr {
224                 Some(chunk.into())
225             } else {
226                 None
227             }
228         });
229 
230         let stream = Box::new(
231             future::lazy(|| {
232                 tokio::spawn(forward_stdin);
233                 future::ok(receive)
234             })
235             .flatten_stream(),
236         );
237         Ok(stream)
238     }
239 }
240 
241 // Our helper method which will read data from stdin and send it along the
242 // sender provided.
read_stdin(mut tx: mpsc::Sender<Vec<u8>>)243 fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) {
244     let mut stdin = io::stdin();
245     loop {
246         let mut buf = vec![0; 1024];
247         let n = match stdin.read(&mut buf) {
248             Err(_) | Ok(0) => break,
249             Ok(n) => n,
250         };
251         buf.truncate(n);
252         tx = match tx.send(buf).wait() {
253             Ok(tx) => tx,
254             Err(_) => break,
255         };
256     }
257 }
258