1 //! A "tiny" example of HTTP request/response handling using transports.
2 //!
3 //! This example is intended for *learning purposes* to see how various pieces
4 //! hook up together and how HTTP can get up and running. Note that this example
5 //! is written with the restriction that it *can't* use any "big" library other
6 //! than Tokio, if you'd like a "real world" HTTP library you likely want a
7 //! crate like Hyper.
8 //!
9 //! Code here is based on the `echo-threads` example and implements two paths,
10 //! the `/plaintext` and `/json` routes to respond with some text and json,
11 //! respectively. By default this will run I/O on all the cores your system has
12 //! available, and it doesn't support HTTP request bodies.
13 
14 #![deny(warnings)]
15 
16 extern crate bytes;
17 extern crate http;
18 extern crate httparse;
19 #[macro_use]
20 extern crate serde_derive;
21 extern crate serde_json;
22 extern crate time;
23 extern crate tokio;
24 extern crate tokio_io;
25 
26 use std::net::SocketAddr;
27 use std::{env, fmt, io};
28 
29 use tokio::codec::{Decoder, Encoder};
30 use tokio::net::{TcpListener, TcpStream};
31 use tokio::prelude::*;
32 
33 use bytes::BytesMut;
34 use http::header::HeaderValue;
35 use http::{Request, Response, StatusCode};
36 
main() -> Result<(), Box<std::error::Error>>37 fn main() -> Result<(), Box<std::error::Error>> {
38     // Parse the arguments, bind the TCP socket we'll be listening to, spin up
39     // our worker threads, and start shipping sockets to those worker threads.
40     let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
41     let addr = addr.parse::<SocketAddr>()?;
42 
43     let listener = TcpListener::bind(&addr)?;
44     println!("Listening on: {}", addr);
45 
46     tokio::run({
47         listener
48             .incoming()
49             .map_err(|e| println!("failed to accept socket; error = {:?}", e))
50             .for_each(|socket| {
51                 process(socket);
52                 Ok(())
53             })
54     });
55     Ok(())
56 }
57 
process(socket: TcpStream)58 fn process(socket: TcpStream) {
59     let (tx, rx) =
60         // Frame the socket using the `Http` protocol. This maps the TCP socket
61         // to a Stream + Sink of HTTP frames.
62         Http.framed(socket)
63         // This splits a single `Stream + Sink` value into two separate handles
64         // that can be used independently (even on different tasks or threads).
65         .split();
66 
67     // Map all requests into responses and send them back to the client.
68     let task = tx.send_all(rx.and_then(respond)).then(|res| {
69         if let Err(e) = res {
70             println!("failed to process connection; error = {:?}", e);
71         }
72 
73         Ok(())
74     });
75 
76     // Spawn the task that handles the connection.
77     tokio::spawn(task);
78 }
79 
80 /// "Server logic" is implemented in this function.
81 ///
82 /// This function is a map from and HTTP request to a future of a response and
83 /// represents the various handling a server might do. Currently the contents
84 /// here are pretty uninteresting.
respond(req: Request<()>) -> Box<Future<Item = Response<String>, Error = io::Error> + Send>85 fn respond(req: Request<()>) -> Box<Future<Item = Response<String>, Error = io::Error> + Send> {
86     let f = future::lazy(move || {
87         let mut response = Response::builder();
88         let body = match req.uri().path() {
89             "/plaintext" => {
90                 response.header("Content-Type", "text/plain");
91                 "Hello, World!".to_string()
92             }
93             "/json" => {
94                 response.header("Content-Type", "application/json");
95 
96                 #[derive(Serialize)]
97                 struct Message {
98                     message: &'static str,
99                 }
100                 serde_json::to_string(&Message {
101                     message: "Hello, World!",
102                 })?
103             }
104             _ => {
105                 response.status(StatusCode::NOT_FOUND);
106                 String::new()
107             }
108         };
109         let response = response
110             .body(body)
111             .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
112         Ok(response)
113     });
114 
115     Box::new(f)
116 }
117 
/// Unit struct used as the HTTP codec: it implements `Encoder` (for
/// `Response<String>`) and `Decoder` (for `Request<()>`), and `process` frames
/// each accepted socket with it.
struct Http;
119 
120 /// Implementation of encoding an HTTP response into a `BytesMut`, basically
121 /// just writing out an HTTP/1.1 response.
122 impl Encoder for Http {
123     type Item = Response<String>;
124     type Error = io::Error;
125 
encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()>126     fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> {
127         use std::fmt::Write;
128 
129         write!(
130             BytesWrite(dst),
131             "\
132              HTTP/1.1 {}\r\n\
133              Server: Example\r\n\
134              Content-Length: {}\r\n\
135              Date: {}\r\n\
136              ",
137             item.status(),
138             item.body().len(),
139             date::now()
140         )
141         .unwrap();
142 
143         for (k, v) in item.headers() {
144             dst.extend_from_slice(k.as_str().as_bytes());
145             dst.extend_from_slice(b": ");
146             dst.extend_from_slice(v.as_bytes());
147             dst.extend_from_slice(b"\r\n");
148         }
149 
150         dst.extend_from_slice(b"\r\n");
151         dst.extend_from_slice(item.body().as_bytes());
152 
153         return Ok(());
154 
155         // Right now `write!` on `Vec<u8>` goes through io::Write and is not
156         // super speedy, so inline a less-crufty implementation here which
157         // doesn't go through io::Error.
158         struct BytesWrite<'a>(&'a mut BytesMut);
159 
160         impl<'a> fmt::Write for BytesWrite<'a> {
161             fn write_str(&mut self, s: &str) -> fmt::Result {
162                 self.0.extend_from_slice(s.as_bytes());
163                 Ok(())
164             }
165 
166             fn write_fmt(&mut self, args: fmt::Arguments) -> fmt::Result {
167                 fmt::write(self, args)
168             }
169         }
170     }
171 }
172 
173 /// Implementation of decoding an HTTP request from the bytes we've read so far.
174 /// This leverages the `httparse` crate to do the actual parsing and then we use
175 /// that information to construct an instance of a `http::Request` object,
176 /// trying to avoid allocations where possible.
177 impl Decoder for Http {
178     type Item = Request<()>;
179     type Error = io::Error;
180 
decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>>181     fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>> {
182         // TODO: we should grow this headers array if parsing fails and asks
183         //       for more headers
184         let mut headers = [None; 16];
185         let (method, path, version, amt) = {
186             let mut parsed_headers = [httparse::EMPTY_HEADER; 16];
187             let mut r = httparse::Request::new(&mut parsed_headers);
188             let status = r.parse(src).map_err(|e| {
189                 let msg = format!("failed to parse http request: {:?}", e);
190                 io::Error::new(io::ErrorKind::Other, msg)
191             })?;
192 
193             let amt = match status {
194                 httparse::Status::Complete(amt) => amt,
195                 httparse::Status::Partial => return Ok(None),
196             };
197 
198             let toslice = |a: &[u8]| {
199                 let start = a.as_ptr() as usize - src.as_ptr() as usize;
200                 assert!(start < src.len());
201                 (start, start + a.len())
202             };
203 
204             for (i, header) in r.headers.iter().enumerate() {
205                 let k = toslice(header.name.as_bytes());
206                 let v = toslice(header.value);
207                 headers[i] = Some((k, v));
208             }
209 
210             (
211                 toslice(r.method.unwrap().as_bytes()),
212                 toslice(r.path.unwrap().as_bytes()),
213                 r.version.unwrap(),
214                 amt,
215             )
216         };
217         if version != 1 {
218             return Err(io::Error::new(
219                 io::ErrorKind::Other,
220                 "only HTTP/1.1 accepted",
221             ));
222         }
223         let data = src.split_to(amt).freeze();
224         let mut ret = Request::builder();
225         ret.method(&data[method.0..method.1]);
226         ret.uri(data.slice(path.0, path.1));
227         ret.version(http::Version::HTTP_11);
228         for header in headers.iter() {
229             let (k, v) = match *header {
230                 Some((ref k, ref v)) => (k, v),
231                 None => break,
232             };
233             let value = unsafe { HeaderValue::from_shared_unchecked(data.slice(v.0, v.1)) };
234             ret.header(&data[k.0..k.1], value);
235         }
236 
237         let req = ret
238             .body(())
239             .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
240         Ok(Some(req))
241     }
242 }
243 
244 mod date {
245     use std::cell::RefCell;
246     use std::fmt::{self, Write};
247     use std::str;
248 
249     use time::{self, Duration};
250 
251     pub struct Now(());
252 
253     /// Returns a struct, which when formatted, renders an appropriate `Date`
254     /// header value.
now() -> Now255     pub fn now() -> Now {
256         Now(())
257     }
258 
259     // Gee Alex, doesn't this seem like premature optimization. Well you see
260     // there Billy, you're absolutely correct! If your server is *bottlenecked*
261     // on rendering the `Date` header, well then boy do I have news for you, you
262     // don't need this optimization.
263     //
264     // In all seriousness, though, a simple "hello world" benchmark which just
265     // sends back literally "hello world" with standard headers actually is
266     // bottlenecked on rendering a date into a byte buffer. Since it was at the
267     // top of a profile, and this was done for some competitive benchmarks, this
268     // module was written.
269     //
270     // Just to be clear, though, I was not intending on doing this because it
271     // really does seem kinda absurd, but it was done by someone else [1], so I
272     // blame them!  :)
273     //
274     // [1]: https://github.com/rapidoid/rapidoid/blob/f1c55c0555007e986b5d069fe1086e6d09933f7b/rapidoid-commons/src/main/java/org/rapidoid/commons/Dates.java#L48-L66
275 
276     struct LastRenderedNow {
277         bytes: [u8; 128],
278         amt: usize,
279         next_update: time::Timespec,
280     }
281 
282     thread_local!(static LAST: RefCell<LastRenderedNow> = RefCell::new(LastRenderedNow {
283         bytes: [0; 128],
284         amt: 0,
285         next_update: time::Timespec::new(0, 0),
286     }));
287 
288     impl fmt::Display for Now {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result289         fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
290             LAST.with(|cache| {
291                 let mut cache = cache.borrow_mut();
292                 let now = time::get_time();
293                 if now >= cache.next_update {
294                     cache.update(now);
295                 }
296                 f.write_str(cache.buffer())
297             })
298         }
299     }
300 
301     impl LastRenderedNow {
buffer(&self) -> &str302         fn buffer(&self) -> &str {
303             str::from_utf8(&self.bytes[..self.amt]).unwrap()
304         }
305 
update(&mut self, now: time::Timespec)306         fn update(&mut self, now: time::Timespec) {
307             self.amt = 0;
308             write!(LocalBuffer(self), "{}", time::at(now).rfc822()).unwrap();
309             self.next_update = now + Duration::seconds(1);
310             self.next_update.nsec = 0;
311         }
312     }
313 
314     struct LocalBuffer<'a>(&'a mut LastRenderedNow);
315 
316     impl<'a> fmt::Write for LocalBuffer<'a> {
write_str(&mut self, s: &str) -> fmt::Result317         fn write_str(&mut self, s: &str) -> fmt::Result {
318             let start = self.0.amt;
319             let end = start + s.len();
320             self.0.bytes[start..end].copy_from_slice(s.as_bytes());
321             self.0.amt += s.len();
322             Ok(())
323         }
324     }
325 }
326