1 #![feature(test)] 2 #![deny(warnings)] 3 4 extern crate futures; 5 extern crate tokio; 6 7 #[macro_use] 8 extern crate tokio_io; 9 10 pub extern crate test; 11 12 mod prelude { 13 pub use futures::*; 14 pub use tokio::reactor::Reactor; 15 pub use tokio::net::{TcpListener, TcpStream}; 16 pub use tokio_io::io::read_to_end; 17 18 pub use test::{self, Bencher}; 19 pub use std::thread; 20 pub use std::time::Duration; 21 pub use std::io::{self, Read, Write}; 22 } 23 24 mod connect_churn { 25 use ::prelude::*; 26 27 const NUM: usize = 300; 28 const CONCURRENT: usize = 8; 29 30 #[bench] one_thread(b: &mut Bencher)31 fn one_thread(b: &mut Bencher) { 32 let addr = "127.0.0.1:0".parse().unwrap(); 33 34 b.iter(move || { 35 let listener = TcpListener::bind(&addr).unwrap(); 36 let addr = listener.local_addr().unwrap(); 37 38 // Spawn a single future that accepts & drops connections 39 let serve_incomings = listener.incoming() 40 .map_err(|e| panic!("server err: {:?}", e)) 41 .for_each(|_| Ok(())); 42 43 let connects = stream::iter_result((0..NUM).map(|_| { 44 Ok(TcpStream::connect(&addr) 45 .and_then(|sock| { 46 sock.set_linger(Some(Duration::from_secs(0))).unwrap(); 47 read_to_end(sock, vec![]) 48 })) 49 })); 50 51 let connects_concurrent = connects.buffer_unordered(CONCURRENT) 52 .map_err(|e| panic!("client err: {:?}", e)) 53 .for_each(|_| Ok(())); 54 55 serve_incomings.select(connects_concurrent) 56 .map(|_| ()).map_err(|_| ()) 57 .wait().unwrap(); 58 }); 59 } 60 n_workers(n: usize, b: &mut Bencher)61 fn n_workers(n: usize, b: &mut Bencher) { 62 let (shutdown_tx, shutdown_rx) = sync::oneshot::channel(); 63 let (addr_tx, addr_rx) = sync::oneshot::channel(); 64 65 // Spawn reactor thread 66 let server_thread = thread::spawn(move || { 67 // Bind the TCP listener 68 let listener = TcpListener::bind( 69 &"127.0.0.1:0".parse().unwrap()).unwrap(); 70 71 // Get the address being listened on. 72 let addr = listener.local_addr().unwrap(); 73 74 // Send the remote & address back to the main thread 75 addr_tx.send(addr).unwrap(); 76 77 // Spawn a single future that accepts & drops connections 78 let serve_incomings = listener.incoming() 79 .map_err(|e| panic!("server err: {:?}", e)) 80 .for_each(|_| Ok(())); 81 82 // Run server 83 serve_incomings.select(shutdown_rx) 84 .map(|_| ()).map_err(|_| ()) 85 .wait().unwrap(); 86 }); 87 88 // Get the bind addr of the server 89 let addr = addr_rx.wait().unwrap(); 90 91 b.iter(move || { 92 use std::sync::{Barrier, Arc}; 93 94 // Create a barrier to coordinate threads 95 let barrier = Arc::new(Barrier::new(n + 1)); 96 97 // Spawn worker threads 98 let threads: Vec<_> = (0..n).map(|_| { 99 let barrier = barrier.clone(); 100 let addr = addr.clone(); 101 102 thread::spawn(move || { 103 let connects = stream::iter_result((0..(NUM / n)).map(|_| { 104 Ok(TcpStream::connect(&addr) 105 .map_err(|e| panic!("connect err: {:?}", e)) 106 .and_then(|sock| { 107 sock.set_linger(Some(Duration::from_secs(0))).unwrap(); 108 read_to_end(sock, vec![]) 109 })) 110 })); 111 112 barrier.wait(); 113 114 connects.buffer_unordered(CONCURRENT) 115 .map_err(|e| panic!("client err: {:?}", e)) 116 .for_each(|_| Ok(())).wait().unwrap(); 117 }) 118 }).collect(); 119 120 barrier.wait(); 121 122 for th in threads { 123 th.join().unwrap(); 124 } 125 }); 126 127 // Shutdown the server 128 shutdown_tx.send(()).unwrap(); 129 server_thread.join().unwrap(); 130 } 131 132 #[bench] two_threads(b: &mut Bencher)133 fn two_threads(b: &mut Bencher) { 134 n_workers(1, b); 135 } 136 137 #[bench] multi_threads(b: &mut Bencher)138 fn multi_threads(b: &mut Bencher) { 139 n_workers(4, b); 140 } 141 } 142 143 mod transfer { 144 use ::prelude::*; 145 use std::{cmp, mem}; 146 147 const MB: usize = 3 * 1024 * 1024; 148 149 struct Drain { 150 sock: TcpStream, 151 chunk: usize, 152 } 153 154 impl Future for Drain { 155 type Item = (); 156 type Error = io::Error; 157 poll(&mut self) -> Poll<(), io::Error>158 fn poll(&mut self) -> Poll<(), io::Error> { 159 let mut buf: [u8; 1024] = unsafe { mem::uninitialized() }; 160 161 loop { 162 match try_nb!(self.sock.read(&mut buf[..self.chunk])) { 163 0 => return Ok(Async::Ready(())), 164 _ => {} 165 } 166 } 167 } 168 } 169 170 struct Transfer { 171 sock: TcpStream, 172 rem: usize, 173 chunk: usize, 174 } 175 176 impl Future for Transfer { 177 type Item = (); 178 type Error = io::Error; 179 poll(&mut self) -> Poll<(), io::Error>180 fn poll(&mut self) -> Poll<(), io::Error> { 181 while self.rem > 0 { 182 let len = cmp::min(self.rem, self.chunk); 183 let buf = &DATA[..len]; 184 185 let n = try_nb!(self.sock.write(&buf)); 186 self.rem -= n; 187 } 188 189 Ok(Async::Ready(())) 190 } 191 } 192 193 static DATA: [u8; 1024] = [0; 1024]; 194 one_thread(b: &mut Bencher, read_size: usize, write_size: usize)195 fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) { 196 let addr = "127.0.0.1:0".parse().unwrap(); 197 198 b.iter(move || { 199 let listener = TcpListener::bind(&addr).unwrap(); 200 let addr = listener.local_addr().unwrap(); 201 202 // Spawn a single future that accepts 1 connection, Drain it and drops 203 let server = listener.incoming() 204 .into_future() // take the first connection 205 .map_err(|(e, _other_incomings)| e) 206 .map(|(connection, _other_incomings)| connection.unwrap()) 207 .and_then(|sock| { 208 sock.set_linger(Some(Duration::from_secs(0))).unwrap(); 209 let drain = Drain { 210 sock: sock, 211 chunk: read_size, 212 }; 213 drain.map(|_| ()).map_err(|e| panic!("server error: {:?}", e)) 214 }) 215 .map_err(|e| panic!("server err: {:?}", e)); 216 217 let client = TcpStream::connect(&addr) 218 .and_then(move |sock| { 219 Transfer { 220 sock: sock, 221 rem: MB, 222 chunk: write_size, 223 } 224 }) 225 .map_err(|e| panic!("client err: {:?}", e)); 226 227 server.join(client).wait().unwrap(); 228 }); 229 } 230 231 mod small_chunks { 232 use ::prelude::*; 233 234 #[bench] one_thread(b: &mut Bencher)235 fn one_thread(b: &mut Bencher) { 236 super::one_thread(b, 32, 32); 237 } 238 } 239 240 mod big_chunks { 241 use ::prelude::*; 242 243 #[bench] one_thread(b: &mut Bencher)244 fn one_thread(b: &mut Bencher) { 245 super::one_thread(b, 1_024, 1_024); 246 } 247 } 248 } 249