1 #![feature(test)]
2 
3 extern crate futures;
4 extern crate tokio_core;
5 
6 #[macro_use]
7 extern crate tokio_io;
8 
9 pub extern crate test;
10 
11 mod prelude {
12     pub use futures::*;
13     pub use tokio_core::reactor::Core;
14     pub use tokio_core::net::{TcpListener, TcpStream};
15     pub use tokio_io::io::read_to_end;
16 
17     pub use test::{self, Bencher};
18     pub use std::thread;
19     pub use std::time::Duration;
20     pub use std::io::{self, Read, Write};
21 }
22 
23 mod connect_churn {
24     use ::prelude::*;
25 
    /// Number of connections opened per benchmark iteration. `one_thread`
    /// opens all of them itself; `n_workers` gives each worker `NUM / n`.
    const NUM: usize = 300;
    /// Limit handed to `buffer_unordered`: how many connection futures a
    /// single client stream keeps in flight at once.
    const CONCURRENT: usize = 8;
28 
29     #[bench]
one_thread(b: &mut Bencher)30     fn one_thread(b: &mut Bencher) {
31         let addr = "127.0.0.1:0".parse().unwrap();
32         let mut core = Core::new().unwrap();
33         let handle = core.handle();
34         let listener = TcpListener::bind(&addr, &handle).unwrap();
35         let addr = listener.local_addr().unwrap();
36 
37         // Spawn a single task that accepts & drops connections
38         handle.spawn(
39             listener.incoming()
40                 .map_err(|e| panic!("server err: {:?}", e))
41                 .for_each(|_| Ok(())));
42 
43         b.iter(move || {
44             let connects = stream::iter((0..NUM).map(|_| {
45                 Ok(TcpStream::connect(&addr, &handle)
46                     .and_then(|sock| {
47                         sock.set_linger(Some(Duration::from_secs(0))).unwrap();
48                         read_to_end(sock, vec![])
49                     }))
50             }));
51 
52             core.run(
53                 connects.buffer_unordered(CONCURRENT)
54                     .map_err(|e| panic!("client err: {:?}", e))
55                     .for_each(|_| Ok(()))).unwrap();
56         });
57     }
58 
n_workers(n: usize, b: &mut Bencher)59     fn n_workers(n: usize, b: &mut Bencher) {
60         let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
61         let (remote_tx, remote_rx) = ::std::sync::mpsc::channel();
62 
63         // Spawn reactor thread
64         thread::spawn(move || {
65             // Create the core
66             let mut core = Core::new().unwrap();
67 
68             // Reactor handles
69             let handle = core.handle();
70             let remote = handle.remote().clone();
71 
72             // Bind the TCP listener
73             let listener = TcpListener::bind(
74                 &"127.0.0.1:0".parse().unwrap(), &handle).unwrap();
75 
76             // Get the address being listened on.
77             let addr = listener.local_addr().unwrap();
78 
79             // Send the remote & address back to the main thread
80             remote_tx.send((remote, addr)).unwrap();
81 
82             // Spawn a single task that accepts & drops connections
83             handle.spawn(
84                 listener.incoming()
85                     .map_err(|e| panic!("server err: {:?}", e))
86                     .for_each(|_| Ok(())));
87 
88             // Run the reactor
89             core.run(shutdown_rx).unwrap();
90         });
91 
92         // Get the remote info
93         let (remote, addr) = remote_rx.recv().unwrap();
94 
95         b.iter(move || {
96             use std::sync::{Barrier, Arc};
97 
98             // Create a barrier to coordinate threads
99             let barrier = Arc::new(Barrier::new(n + 1));
100 
101             // Spawn worker threads
102             let threads: Vec<_> = (0..n).map(|_| {
103                 let barrier = barrier.clone();
104                 let remote = remote.clone();
105                 let addr = addr.clone();
106 
107                 thread::spawn(move || {
108                     let connects = stream::iter((0..(NUM / n)).map(|_| {
109                         // TODO: Once `Handle` is `Send / Sync`, update this
110 
111                         let (socket_tx, socket_rx) = sync::oneshot::channel();
112 
113                         remote.spawn(move |handle| {
114                             TcpStream::connect(&addr, &handle)
115                                 .map_err(|e| panic!("connect err: {:?}", e))
116                                 .then(|res| socket_tx.send(res))
117                                 .map_err(|_| ())
118                         });
119 
120                         Ok(socket_rx
121                             .then(|res| res.unwrap())
122                             .and_then(|sock| {
123                                 sock.set_linger(Some(Duration::from_secs(0))).unwrap();
124                                 read_to_end(sock, vec![])
125                             }))
126                     }));
127 
128                     barrier.wait();
129 
130                     connects.buffer_unordered(CONCURRENT)
131                         .map_err(|e| panic!("client err: {:?}", e))
132                         .for_each(|_| Ok(())).wait().unwrap();
133                 })
134             }).collect();
135 
136             barrier.wait();
137 
138             for th in threads {
139                 th.join().unwrap();
140             }
141         });
142 
143         // Shutdown the reactor
144         shutdown_tx.send(()).unwrap();
145     }
146 
147     #[bench]
two_threads(b: &mut Bencher)148     fn two_threads(b: &mut Bencher) {
149         n_workers(1, b);
150     }
151 
152     #[bench]
multi_threads(b: &mut Bencher)153     fn multi_threads(b: &mut Bencher) {
154         n_workers(4, b);
155     }
156 }
157 
158 mod transfer {
159     use ::prelude::*;
160     use std::{cmp, mem};
161 
    /// Number of bytes each `Transfer` is asked to write (used as its `rem`).
    /// NOTE(review): despite the name this is 3 MiB (3 * 1024 * 1024), not 1 MiB.
    const MB: usize = 3 * 1024 * 1024;
163 
164     struct Drain {
165         sock: TcpStream,
166         chunk: usize,
167     }
168 
169     impl Future for Drain {
170         type Item = ();
171         type Error = io::Error;
172 
poll(&mut self) -> Poll<(), io::Error>173         fn poll(&mut self) -> Poll<(), io::Error> {
174             let mut buf: [u8; 1024] = unsafe { mem::uninitialized() };
175 
176             loop {
177                 match try_nb!(self.sock.read(&mut buf[..self.chunk])) {
178                     0 => return Ok(Async::Ready(())),
179                     _ => {}
180                 }
181             }
182         }
183     }
184 
185     struct Transfer {
186         sock: TcpStream,
187         rem: usize,
188         chunk: usize,
189     }
190 
191     impl Future for Transfer {
192         type Item = ();
193         type Error = io::Error;
194 
poll(&mut self) -> Poll<(), io::Error>195         fn poll(&mut self) -> Poll<(), io::Error> {
196             while self.rem > 0 {
197                 let len = cmp::min(self.rem, self.chunk);
198                 let buf = &DATA[..len];
199 
200                 let n = try_nb!(self.sock.write(&buf));
201                 self.rem -= n;
202             }
203 
204             Ok(Async::Ready(()))
205         }
206     }
207 
208     static DATA: [u8; 1024] = [0; 1024];
209 
one_thread(b: &mut Bencher, read_size: usize, write_size: usize)210     fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
211         let addr = "127.0.0.1:0".parse().unwrap();
212         let mut core = Core::new().unwrap();
213         let handle = core.handle();
214         let listener = TcpListener::bind(&addr, &handle).unwrap();
215         let addr = listener.local_addr().unwrap();
216 
217         let h2 = handle.clone();
218 
219         // Spawn a single task that accepts & drops connections
220         handle.spawn(
221             listener.incoming()
222                 .map_err(|e| panic!("server err: {:?}", e))
223                 .for_each(move |(sock, _)| {
224                     sock.set_linger(Some(Duration::from_secs(0))).unwrap();
225                     let drain = Drain {
226                         sock: sock,
227                         chunk: read_size,
228                     };
229 
230                     h2.spawn(drain.map_err(|e| panic!("server error: {:?}", e)));
231 
232                     Ok(())
233                 }));
234 
235         b.iter(move || {
236             let client = TcpStream::connect(&addr, &handle)
237                 .and_then(|sock| {
238                     Transfer {
239                         sock: sock,
240                         rem: MB,
241                         chunk: write_size,
242                     }
243                 });
244 
245             core.run(
246                 client.map_err(|e| panic!("client err: {:?}", e))
247                 ).unwrap();
248         });
249     }
250 
cross_thread(b: &mut Bencher, read_size: usize, write_size: usize)251     fn cross_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
252         let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
253         let (remote_tx, remote_rx) = ::std::sync::mpsc::channel();
254 
255         // Spawn reactor thread
256         thread::spawn(move || {
257             // Create the core
258             let mut core = Core::new().unwrap();
259 
260             // Reactor handles
261             let handle = core.handle();
262             let remote = handle.remote().clone();
263 
264             remote_tx.send(remote).unwrap();
265             core.run(shutdown_rx).unwrap();
266         });
267 
268         let remote = remote_rx.recv().unwrap();
269 
270         b.iter(move || {
271             let (server_tx, server_rx) = sync::oneshot::channel();
272             let (client_tx, client_rx) = sync::oneshot::channel();
273 
274             remote.spawn(|handle| {
275                 let sock = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap();
276                 server_tx.send(sock).unwrap();
277                 Ok(())
278             });
279 
280             let remote2 = remote.clone();
281 
282             server_rx.and_then(move |server| {
283                 let addr = server.local_addr().unwrap();
284 
285                 remote2.spawn(move |handle| {
286                     let fut = TcpStream::connect(&addr, &handle);
287                     client_tx.send(fut).ok().unwrap();
288                     Ok(())
289                 });
290 
291                 let client = client_rx
292                     .then(|res| res.unwrap())
293                     .and_then(move |sock| {
294                         Transfer {
295                             sock: sock,
296                             rem: MB,
297                             chunk: write_size,
298                         }
299                     });
300 
301                 let server = server.incoming().into_future()
302                     .map_err(|(e, _)| e)
303                     .and_then(move |(sock, _)| {
304                         let sock = sock.unwrap().0;
305                         sock.set_linger(Some(Duration::from_secs(0))).unwrap();
306 
307                         Drain {
308                             sock: sock,
309                             chunk: read_size,
310                         }
311                     });
312 
313                 client
314                     .join(server)
315                     .then(|res| {
316                         let _ = res.unwrap();
317                         Ok(())
318                     })
319             }).wait().unwrap();
320         });
321 
322         // Shutdown the reactor
323         shutdown_tx.send(()).unwrap();
324     }
325 
326     mod small_chunks {
327         use ::prelude::*;
328 
329         #[bench]
one_thread(b: &mut Bencher)330         fn one_thread(b: &mut Bencher) {
331             super::one_thread(b, 32, 32);
332         }
333 
334         #[bench]
cross_thread(b: &mut Bencher)335         fn cross_thread(b: &mut Bencher) {
336             super::cross_thread(b, 32, 32);
337         }
338     }
339 
340     mod big_chunks {
341         use ::prelude::*;
342 
343         #[bench]
one_thread(b: &mut Bencher)344         fn one_thread(b: &mut Bencher) {
345             super::one_thread(b, 1_024, 1_024);
346         }
347 
348         #[bench]
cross_thread(b: &mut Bencher)349         fn cross_thread(b: &mut Bencher) {
350             super::cross_thread(b, 1_024, 1_024);
351         }
352     }
353 }
354