1 #![feature(test)]
2 #![deny(warnings)]
3 
4 extern crate futures;
5 extern crate tokio;
6 
7 #[macro_use]
8 extern crate tokio_io;
9 
10 pub extern crate test;
11 
12 mod prelude {
13     pub use futures::*;
14     pub use tokio::reactor::Reactor;
15     pub use tokio::net::{TcpListener, TcpStream};
16     pub use tokio_io::io::read_to_end;
17 
18     pub use test::{self, Bencher};
19     pub use std::thread;
20     pub use std::time::Duration;
21     pub use std::io::{self, Read, Write};
22 }
23 
24 mod connect_churn {
25     use ::prelude::*;
26 
27     const NUM: usize = 300;
28     const CONCURRENT: usize = 8;
29 
30     #[bench]
one_thread(b: &mut Bencher)31     fn one_thread(b: &mut Bencher) {
32         let addr = "127.0.0.1:0".parse().unwrap();
33 
34         b.iter(move || {
35             let listener = TcpListener::bind(&addr).unwrap();
36             let addr = listener.local_addr().unwrap();
37 
38             // Spawn a single future that accepts & drops connections
39             let serve_incomings = listener.incoming()
40                 .map_err(|e| panic!("server err: {:?}", e))
41                 .for_each(|_| Ok(()));
42 
43             let connects = stream::iter_result((0..NUM).map(|_| {
44                 Ok(TcpStream::connect(&addr)
45                     .and_then(|sock| {
46                         sock.set_linger(Some(Duration::from_secs(0))).unwrap();
47                         read_to_end(sock, vec![])
48                     }))
49             }));
50 
51             let connects_concurrent = connects.buffer_unordered(CONCURRENT)
52                 .map_err(|e| panic!("client err: {:?}", e))
53                 .for_each(|_| Ok(()));
54 
55             serve_incomings.select(connects_concurrent)
56                 .map(|_| ()).map_err(|_| ())
57                 .wait().unwrap();
58         });
59     }
60 
n_workers(n: usize, b: &mut Bencher)61     fn n_workers(n: usize, b: &mut Bencher) {
62         let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
63         let (addr_tx, addr_rx) = sync::oneshot::channel();
64 
65         // Spawn reactor thread
66         let server_thread = thread::spawn(move || {
67             // Bind the TCP listener
68             let listener = TcpListener::bind(
69                 &"127.0.0.1:0".parse().unwrap()).unwrap();
70 
71             // Get the address being listened on.
72             let addr = listener.local_addr().unwrap();
73 
74             // Send the remote & address back to the main thread
75             addr_tx.send(addr).unwrap();
76 
77             // Spawn a single future that accepts & drops connections
78             let serve_incomings = listener.incoming()
79                 .map_err(|e| panic!("server err: {:?}", e))
80                 .for_each(|_| Ok(()));
81 
82             // Run server
83             serve_incomings.select(shutdown_rx)
84                 .map(|_| ()).map_err(|_| ())
85                 .wait().unwrap();
86         });
87 
88         // Get the bind addr of the server
89         let addr = addr_rx.wait().unwrap();
90 
91         b.iter(move || {
92             use std::sync::{Barrier, Arc};
93 
94             // Create a barrier to coordinate threads
95             let barrier = Arc::new(Barrier::new(n + 1));
96 
97             // Spawn worker threads
98             let threads: Vec<_> = (0..n).map(|_| {
99                 let barrier = barrier.clone();
100                 let addr = addr.clone();
101 
102                 thread::spawn(move || {
103                     let connects = stream::iter_result((0..(NUM / n)).map(|_| {
104                         Ok(TcpStream::connect(&addr)
105                             .map_err(|e| panic!("connect err: {:?}", e))
106                             .and_then(|sock| {
107                                 sock.set_linger(Some(Duration::from_secs(0))).unwrap();
108                                 read_to_end(sock, vec![])
109                             }))
110                     }));
111 
112                     barrier.wait();
113 
114                     connects.buffer_unordered(CONCURRENT)
115                         .map_err(|e| panic!("client err: {:?}", e))
116                         .for_each(|_| Ok(())).wait().unwrap();
117                 })
118             }).collect();
119 
120             barrier.wait();
121 
122             for th in threads {
123                 th.join().unwrap();
124             }
125         });
126 
127         // Shutdown the server
128         shutdown_tx.send(()).unwrap();
129         server_thread.join().unwrap();
130     }
131 
132     #[bench]
two_threads(b: &mut Bencher)133     fn two_threads(b: &mut Bencher) {
134         n_workers(1, b);
135     }
136 
137     #[bench]
multi_threads(b: &mut Bencher)138     fn multi_threads(b: &mut Bencher) {
139         n_workers(4, b);
140     }
141 }
142 
143 mod transfer {
144     use ::prelude::*;
145     use std::{cmp, mem};
146 
147     const MB: usize = 3 * 1024 * 1024;
148 
149     struct Drain {
150         sock: TcpStream,
151         chunk: usize,
152     }
153 
154     impl Future for Drain {
155         type Item = ();
156         type Error = io::Error;
157 
poll(&mut self) -> Poll<(), io::Error>158         fn poll(&mut self) -> Poll<(), io::Error> {
159             let mut buf: [u8; 1024] = unsafe { mem::uninitialized() };
160 
161             loop {
162                 match try_nb!(self.sock.read(&mut buf[..self.chunk])) {
163                     0 => return Ok(Async::Ready(())),
164                     _ => {}
165                 }
166             }
167         }
168     }
169 
170     struct Transfer {
171         sock: TcpStream,
172         rem: usize,
173         chunk: usize,
174     }
175 
176     impl Future for Transfer {
177         type Item = ();
178         type Error = io::Error;
179 
poll(&mut self) -> Poll<(), io::Error>180         fn poll(&mut self) -> Poll<(), io::Error> {
181             while self.rem > 0 {
182                 let len = cmp::min(self.rem, self.chunk);
183                 let buf = &DATA[..len];
184 
185                 let n = try_nb!(self.sock.write(&buf));
186                 self.rem -= n;
187             }
188 
189             Ok(Async::Ready(()))
190         }
191     }
192 
193     static DATA: [u8; 1024] = [0; 1024];
194 
one_thread(b: &mut Bencher, read_size: usize, write_size: usize)195     fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
196         let addr = "127.0.0.1:0".parse().unwrap();
197 
198         b.iter(move || {
199             let listener = TcpListener::bind(&addr).unwrap();
200             let addr = listener.local_addr().unwrap();
201 
202             // Spawn a single future that accepts 1 connection, Drain it and drops
203             let server = listener.incoming()
204                 .into_future() // take the first connection
205                 .map_err(|(e, _other_incomings)| e)
206                 .map(|(connection, _other_incomings)| connection.unwrap())
207                 .and_then(|sock| {
208                     sock.set_linger(Some(Duration::from_secs(0))).unwrap();
209                     let drain = Drain {
210                         sock: sock,
211                         chunk: read_size,
212                     };
213                     drain.map(|_| ()).map_err(|e| panic!("server error: {:?}", e))
214                 })
215                 .map_err(|e| panic!("server err: {:?}", e));
216 
217             let client = TcpStream::connect(&addr)
218                 .and_then(move |sock| {
219                     Transfer {
220                         sock: sock,
221                         rem: MB,
222                         chunk: write_size,
223                     }
224                 })
225                 .map_err(|e| panic!("client err: {:?}", e));
226 
227             server.join(client).wait().unwrap();
228         });
229     }
230 
231     mod small_chunks {
232         use ::prelude::*;
233 
234         #[bench]
one_thread(b: &mut Bencher)235         fn one_thread(b: &mut Bencher) {
236             super::one_thread(b, 32, 32);
237         }
238     }
239 
240     mod big_chunks {
241         use ::prelude::*;
242 
243         #[bench]
one_thread(b: &mut Bencher)244         fn one_thread(b: &mut Bencher) {
245             super::one_thread(b, 1_024, 1_024);
246         }
247     }
248 }
249