1 //! A "tiny" example of HTTP request/response handling using just tokio-core
2 //!
3 //! This example is intended for *learning purposes* to see how various pieces
4 //! hook up together and how HTTP can get up and running. Note that this example
5 //! is written with the restriction that it *can't* use any "big" library other
6 //! than tokio-core, if you'd like a "real world" HTTP library you likely want a
7 //! crate like Hyper.
8 //!
9 //! Code here is based on the `echo-threads` example and implements two paths,
10 //! the `/plaintext` and `/json` routes to respond with some text and json,
11 //! respectively. By default this will run I/O on all the cores your system has
12 //! available, and it doesn't support HTTP request bodies.
13 
14 extern crate bytes;
15 extern crate futures;
16 extern crate http;
17 extern crate httparse;
18 extern crate num_cpus;
19 #[macro_use]
20 extern crate serde_derive;
21 extern crate serde_json;
22 extern crate time;
23 extern crate tokio_core;
24 extern crate tokio_io;
25 
26 use std::env;
27 use std::fmt;
28 use std::io;
29 use std::net::{self, SocketAddr};
30 use std::thread;
31 
32 use bytes::BytesMut;
33 use futures::future;
34 use futures::sync::mpsc;
35 use futures::{Stream, Future, Sink};
36 use http::{Request, Response, StatusCode};
37 use http::header::HeaderValue;
38 use tokio_core::net::TcpStream;
39 use tokio_core::reactor::Core;
40 use tokio_io::codec::{Encoder, Decoder};
41 use tokio_io::{AsyncRead};
42 
main()43 fn main() {
44     // Parse the arguments, bind the TCP socket we'll be listening to, spin up
45     // our worker threads, and start shipping sockets to those worker threads.
46     let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
47     let addr = addr.parse::<SocketAddr>().unwrap();
48     let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
49         .unwrap_or(num_cpus::get());
50 
51     let listener = net::TcpListener::bind(&addr).expect("failed to bind");
52     println!("Listening on: {}", addr);
53 
54     let mut channels = Vec::new();
55     for _ in 0..num_threads {
56         let (tx, rx) = mpsc::unbounded();
57         channels.push(tx);
58         thread::spawn(|| worker(rx));
59     }
60     let mut next = 0;
61     for socket in listener.incoming() {
62         if let Ok(socket) = socket {
63             channels[next].unbounded_send(socket).expect("worker thread died");
64             next = (next + 1) % channels.len();
65         }
66     }
67 }
68 
worker(rx: mpsc::UnboundedReceiver<net::TcpStream>)69 fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
70     let mut core = Core::new().unwrap();
71     let handle = core.handle();
72 
73     let done = rx.for_each(move |socket| {
74         // Associate each socket we get with our local event loop, and then use
75         // the codec support in the tokio-io crate to deal with discrete
76         // request/response types instead of bytes. Here we'll just use our
77         // framing defined below and then use the `send_all` helper to send the
78         // responses back on the socket after we've processed them
79         let socket = future::result(TcpStream::from_stream(socket, &handle));
80         let req = socket.and_then(|socket| {
81             let (tx, rx) = socket.framed(Http).split();
82             tx.send_all(rx.and_then(respond))
83         });
84         handle.spawn(req.then(move |result| {
85             drop(result);
86             Ok(())
87         }));
88         Ok(())
89     });
90     core.run(done).unwrap();
91 }
92 
93 /// "Server logic" is implemented in this function.
94 ///
95 /// This function is a map from and HTTP request to a future of a response and
96 /// represents the various handling a server might do. Currently the contents
97 /// here are pretty uninteresting.
respond(req: Request<()>) -> Box<Future<Item = Response<String>, Error = io::Error>>98 fn respond(req: Request<()>)
99     -> Box<Future<Item = Response<String>, Error = io::Error>>
100 {
101     let mut ret = Response::builder();
102     let body = match req.uri().path() {
103         "/plaintext" => {
104             ret.header("Content-Type", "text/plain");
105             "Hello, World!".to_string()
106         }
107         "/json" => {
108             ret.header("Content-Type", "application/json");
109 
110             #[derive(Serialize)]
111             struct Message {
112                 message: &'static str,
113             }
114             serde_json::to_string(&Message { message: "Hello, World!" })
115                 .unwrap()
116         }
117         _ => {
118             ret.status(StatusCode::NOT_FOUND);
119             String::new()
120         }
121     };
122     Box::new(future::ok(ret.body(body).unwrap()))
123 }
124 
/// Field-less codec type handed to `framed`: it exists only to carry the
/// `Encoder` (responses -> bytes) and `Decoder` (bytes -> requests)
/// implementations defined for it in this file.
struct Http;
126 
127 /// Implementation of encoding an HTTP response into a `BytesMut`, basically
128 /// just writing out an HTTP/1.1 response.
129 impl Encoder for Http {
130     type Item = Response<String>;
131     type Error = io::Error;
132 
encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()>133     fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> {
134         use std::fmt::Write;
135 
136         write!(BytesWrite(dst), "\
137             HTTP/1.1 {}\r\n\
138             Server: Example\r\n\
139             Content-Length: {}\r\n\
140             Date: {}\r\n\
141         ", item.status(), item.body().len(), date::now()).unwrap();
142 
143         for (k, v) in item.headers() {
144             dst.extend_from_slice(k.as_str().as_bytes());
145             dst.extend_from_slice(b": ");
146             dst.extend_from_slice(v.as_bytes());
147             dst.extend_from_slice(b"\r\n");
148         }
149 
150         dst.extend_from_slice(b"\r\n");
151         dst.extend_from_slice(item.body().as_bytes());
152 
153         return Ok(());
154 
155         // Right now `write!` on `Vec<u8>` goes through io::Write and is not
156         // super speedy, so inline a less-crufty implementation here which
157         // doesn't go through io::Error.
158         struct BytesWrite<'a>(&'a mut BytesMut);
159 
160         impl<'a> fmt::Write for BytesWrite<'a> {
161             fn write_str(&mut self, s: &str) -> fmt::Result {
162                 self.0.extend_from_slice(s.as_bytes());
163                 Ok(())
164             }
165 
166             fn write_fmt(&mut self, args: fmt::Arguments) -> fmt::Result {
167                 fmt::write(self, args)
168             }
169         }
170     }
171 }
172 
173 /// Implementation of decoding an HTTP request from the bytes we've read so far.
174 /// This leverages the `httparse` crate to do the actual parsing and then we use
175 /// that information to construct an instance of a `http::Request` object,
176 /// trying to avoid allocations where possible.
177 impl Decoder for Http {
178     type Item = Request<()>;
179     type Error = io::Error;
180 
decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>>181     fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>> {
182         // TODO: we should grow this headers array if parsing fails and asks
183         //       for more headers
184         let mut headers = [None; 16];
185         let (method, path, version, amt) = {
186             let mut parsed_headers = [httparse::EMPTY_HEADER; 16];
187             let mut r = httparse::Request::new(&mut parsed_headers);
188             let status = r.parse(src).map_err(|e| {
189                 let msg = format!("failed to parse http request: {:?}", e);
190                 io::Error::new(io::ErrorKind::Other, msg)
191             })?;
192 
193             let amt = match status {
194                 httparse::Status::Complete(amt) => amt,
195                 httparse::Status::Partial => return Ok(None),
196             };
197 
198             let toslice = |a: &[u8]| {
199                 let start = a.as_ptr() as usize - src.as_ptr() as usize;
200                 assert!(start < src.len());
201                 (start, start + a.len())
202             };
203 
204             for (i, header) in r.headers.iter().enumerate() {
205                 let k = toslice(header.name.as_bytes());
206                 let v = toslice(header.value);
207                 headers[i] = Some((k, v));
208             }
209 
210             (toslice(r.method.unwrap().as_bytes()),
211              toslice(r.path.unwrap().as_bytes()),
212              r.version.unwrap(),
213              amt)
214         };
215         if version != 1 {
216             return Err(io::Error::new(io::ErrorKind::Other, "only HTTP/1.1 accepted"))
217         }
218         let data = src.split_to(amt).freeze();
219         let mut ret = Request::builder();
220         ret.method(&data[method.0..method.1]);
221         ret.uri(data.slice(path.0, path.1));
222         ret.version(http::Version::HTTP_11);
223         for header in headers.iter() {
224             let (k, v) = match *header {
225                 Some((ref k, ref v)) => (k, v),
226                 None => break,
227             };
228             let value = unsafe {
229                 HeaderValue::from_shared_unchecked(data.slice(v.0, v.1))
230             };
231             ret.header(&data[k.0..k.1], value);
232         }
233 
234         let req = ret.body(()).map_err(|e| {
235             io::Error::new(io::ErrorKind::Other, e)
236         })?;
237         Ok(Some(req))
238     }
239 }
240 
241 mod date {
242     use std::cell::RefCell;
243     use std::fmt::{self, Write};
244     use std::str;
245 
246     use time::{self, Duration};
247 
248     pub struct Now(());
249 
250     /// Returns a struct, which when formatted, renders an appropriate `Date`
251     /// header value.
now() -> Now252     pub fn now() -> Now {
253         Now(())
254     }
255 
256     // Gee Alex, doesn't this seem like premature optimization. Well you see
257     // there Billy, you're absolutely correct! If your server is *bottlenecked*
258     // on rendering the `Date` header, well then boy do I have news for you, you
259     // don't need this optimization.
260     //
261     // In all seriousness, though, a simple "hello world" benchmark which just
262     // sends back literally "hello world" with standard headers actually is
263     // bottlenecked on rendering a date into a byte buffer. Since it was at the
264     // top of a profile, and this was done for some competitive benchmarks, this
265     // module was written.
266     //
267     // Just to be clear, though, I was not intending on doing this because it
268     // really does seem kinda absurd, but it was done by someone else [1], so I
269     // blame them!  :)
270     //
271     // [1]: https://github.com/rapidoid/rapidoid/blob/f1c55c0555007e986b5d069fe1086e6d09933f7b/rapidoid-commons/src/main/java/org/rapidoid/commons/Dates.java#L48-L66
272 
273     struct LastRenderedNow {
274         bytes: [u8; 128],
275         amt: usize,
276         next_update: time::Timespec,
277     }
278 
279     thread_local!(static LAST: RefCell<LastRenderedNow> = RefCell::new(LastRenderedNow {
280         bytes: [0; 128],
281         amt: 0,
282         next_update: time::Timespec::new(0, 0),
283     }));
284 
285     impl fmt::Display for Now {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result286         fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
287             LAST.with(|cache| {
288                 let mut cache = cache.borrow_mut();
289                 let now = time::get_time();
290                 if now > cache.next_update {
291                     cache.update(now);
292                 }
293                 f.write_str(cache.buffer())
294             })
295         }
296     }
297 
298     impl LastRenderedNow {
buffer(&self) -> &str299         fn buffer(&self) -> &str {
300             str::from_utf8(&self.bytes[..self.amt]).unwrap()
301         }
302 
update(&mut self, now: time::Timespec)303         fn update(&mut self, now: time::Timespec) {
304             self.amt = 0;
305             write!(LocalBuffer(self), "{}", time::at(now).rfc822()).unwrap();
306             self.next_update = now + Duration::seconds(1);
307             self.next_update.nsec = 0;
308         }
309     }
310 
311     struct LocalBuffer<'a>(&'a mut LastRenderedNow);
312 
313     impl<'a> fmt::Write for LocalBuffer<'a> {
write_str(&mut self, s: &str) -> fmt::Result314         fn write_str(&mut self, s: &str) -> fmt::Result {
315             let start = self.0.amt;
316             let end = start + s.len();
317             self.0.bytes[start..end].copy_from_slice(s.as_bytes());
318             self.0.amt += s.len();
319             Ok(())
320         }
321     }
322 }
323