1 //! An example of hooking up stdin/stdout to either a TCP or UDP stream.
2 //!
3 //! This example will connect to a socket address specified in the argument list
4 //! and then forward all data read on stdin to the server, printing out all data
5 //! received on stdout. An optional `--udp` argument can be passed to specify
6 //! that the connection should be made over UDP instead of TCP, translating each
7 //! line entered on stdin to a UDP packet to be sent to the remote address.
8 //!
9 //! Note that this is not currently optimized for performance, especially
10 //! around buffer management. Rather it's intended to show an example of
11 //! working with a client.
12 //!
13 //! This example can be quite useful when interacting with the other examples in
14 //! this repository! Many of them recommend running this as a simple "hook up
15 //! stdin/stdout to a server" to get up and running.
16
17 #![deny(warnings)]
18
19 extern crate bytes;
20 extern crate futures;
21 extern crate tokio;
22 extern crate tokio_io;
23
24 use std::env;
25 use std::io::{self, Read, Write};
26 use std::net::SocketAddr;
27 use std::thread;
28
29 use futures::sync::mpsc;
30 use tokio::prelude::*;
31
main() -> Result<(), Box<std::error::Error>>32 fn main() -> Result<(), Box<std::error::Error>> {
33 // Determine if we're going to run in TCP or UDP mode
34 let mut args = env::args().skip(1).collect::<Vec<_>>();
35 let tcp = match args.iter().position(|a| a == "--udp") {
36 Some(i) => {
37 args.remove(i);
38 false
39 }
40 None => true,
41 };
42
43 // Parse what address we're going to connect to
44 let addr = match args.first() {
45 Some(addr) => addr,
46 None => Err("this program requires at least one argument")?,
47 };
48 let addr = addr.parse::<SocketAddr>()?;
49
50 // Right now Tokio doesn't support a handle to stdin running on the event
51 // loop, so we farm out that work to a separate thread. This thread will
52 // read data (with blocking I/O) from stdin and then send it to the event
53 // loop over a standard futures channel.
54 let (stdin_tx, stdin_rx) = mpsc::channel(0);
55 thread::spawn(|| read_stdin(stdin_tx));
56 let stdin_rx = stdin_rx.map_err(|_| panic!("errors not possible on rx"));
57
58 // Now that we've got our stdin read we either set up our TCP connection or
59 // our UDP connection to get a stream of bytes we're going to emit to
60 // stdout.
61 let stdout = if tcp {
62 tcp::connect(&addr, Box::new(stdin_rx))?
63 } else {
64 udp::connect(&addr, Box::new(stdin_rx))?
65 };
66
67 // And now with our stream of bytes to write to stdout, we execute that in
68 // the event loop! Note that this is doing blocking I/O to emit data to
69 // stdout, and in general it's a no-no to do that sort of work on the event
70 // loop. In this case, though, we know it's ok as the event loop isn't
71 // otherwise running anything useful.
72 let mut out = io::stdout();
73
74 tokio::run({
75 stdout
76 .for_each(move |chunk| out.write_all(&chunk))
77 .map_err(|e| println!("error reading stdout; error = {:?}", e))
78 });
79 Ok(())
80 }
81
82 mod codec {
83 use bytes::{BufMut, BytesMut};
84 use std::io;
85 use tokio::codec::{Decoder, Encoder};
86
87 /// A simple `Codec` implementation that just ships bytes around.
88 ///
89 /// This type is used for "framing" a TCP/UDP stream of bytes but it's really
90 /// just a convenient method for us to work with streams/sinks for now.
91 /// This'll just take any data read and interpret it as a "frame" and
92 /// conversely just shove data into the output location without looking at
93 /// it.
94 pub struct Bytes;
95
96 impl Decoder for Bytes {
97 type Item = BytesMut;
98 type Error = io::Error;
99
decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>>100 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
101 if buf.len() > 0 {
102 let len = buf.len();
103 Ok(Some(buf.split_to(len)))
104 } else {
105 Ok(None)
106 }
107 }
108 }
109
110 impl Encoder for Bytes {
111 type Item = Vec<u8>;
112 type Error = io::Error;
113
encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()>114 fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
115 buf.put(&data[..]);
116 Ok(())
117 }
118 }
119 }
120
121 mod tcp {
122 use tokio;
123 use tokio::codec::Decoder;
124 use tokio::net::TcpStream;
125 use tokio::prelude::*;
126
127 use bytes::BytesMut;
128 use codec::Bytes;
129
130 use std::error::Error;
131 use std::io;
132 use std::net::SocketAddr;
133
connect( addr: &SocketAddr, stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>134 pub fn connect(
135 addr: &SocketAddr,
136 stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>,
137 ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> {
138 let tcp = TcpStream::connect(addr);
139
140 // After the TCP connection has been established, we set up our client
141 // to start forwarding data.
142 //
143 // First we use the `Io::framed` method with a simple implementation of
144 // a `Codec` (listed below) that just ships bytes around. We then split
145 // that in two to work with the stream and sink separately.
146 //
147 // Half of the work we're going to do is to take all data we receive on
148 // `stdin` and send that along the TCP stream (`sink`). The second half
149 // is to take all the data we receive (`stream`) and then write that to
150 // stdout. We'll be passing this handle back out from this method.
151 //
152 // You'll also note that we *spawn* the work to read stdin and write it
153 // to the TCP stream. This is done to ensure that happens concurrently
154 // with us reading data from the stream.
155 let stream = Box::new(
156 tcp.map(move |stream| {
157 let (sink, stream) = Bytes.framed(stream).split();
158
159 tokio::spawn(stdin.forward(sink).then(|result| {
160 if let Err(e) = result {
161 println!("failed to write to socket: {}", e)
162 }
163 Ok(())
164 }));
165
166 stream
167 })
168 .flatten_stream(),
169 );
170 Ok(stream)
171 }
172 }
173
174 mod udp {
175 use std::error::Error;
176 use std::io;
177 use std::net::SocketAddr;
178
179 use bytes::BytesMut;
180 use tokio;
181 use tokio::net::{UdpFramed, UdpSocket};
182 use tokio::prelude::*;
183
184 use codec::Bytes;
185
connect( &addr: &SocketAddr, stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>, ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>>186 pub fn connect(
187 &addr: &SocketAddr,
188 stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>,
189 ) -> Result<Box<Stream<Item = BytesMut, Error = io::Error> + Send>, Box<Error>> {
190 // We'll bind our UDP socket to a local IP/port, but for now we
191 // basically let the OS pick both of those.
192 let addr_to_bind = if addr.ip().is_ipv4() {
193 "0.0.0.0:0".parse()?
194 } else {
195 "[::]:0".parse()?
196 };
197 let udp = match UdpSocket::bind(&addr_to_bind) {
198 Ok(udp) => udp,
199 Err(_) => Err("failed to bind socket")?,
200 };
201
202 // Like above with TCP we use an instance of `Bytes` codec to transform
203 // this UDP socket into a framed sink/stream which operates over
204 // discrete values. In this case we're working with *pairs* of socket
205 // addresses and byte buffers.
206 let (sink, stream) = UdpFramed::new(udp, Bytes).split();
207
208 // All bytes from `stdin` will go to the `addr` specified in our
209 // argument list. Like with TCP this is spawned concurrently
210 let forward_stdin = stdin
211 .map(move |chunk| (chunk, addr))
212 .forward(sink)
213 .then(|result| {
214 if let Err(e) = result {
215 println!("failed to write to socket: {}", e)
216 }
217 Ok(())
218 });
219
220 // With UDP we could receive data from any source, so filter out
221 // anything coming from a different address
222 let receive = stream.filter_map(move |(chunk, src)| {
223 if src == addr {
224 Some(chunk.into())
225 } else {
226 None
227 }
228 });
229
230 let stream = Box::new(
231 future::lazy(|| {
232 tokio::spawn(forward_stdin);
233 future::ok(receive)
234 })
235 .flatten_stream(),
236 );
237 Ok(stream)
238 }
239 }
240
241 // Our helper method which will read data from stdin and send it along the
242 // sender provided.
read_stdin(mut tx: mpsc::Sender<Vec<u8>>)243 fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) {
244 let mut stdin = io::stdin();
245 loop {
246 let mut buf = vec![0; 1024];
247 let n = match stdin.read(&mut buf) {
248 Err(_) | Ok(0) => break,
249 Ok(n) => n,
250 };
251 buf.truncate(n);
252 tx = match tx.send(buf).wait() {
253 Ok(tx) => tx,
254 Err(_) => break,
255 };
256 }
257 }
258