1 #![feature(test)] 2 3 extern crate futures; 4 extern crate tokio_core; 5 6 #[macro_use] 7 extern crate tokio_io; 8 9 pub extern crate test; 10 11 mod prelude { 12 pub use futures::*; 13 pub use tokio_core::reactor::Core; 14 pub use tokio_core::net::{TcpListener, TcpStream}; 15 pub use tokio_io::io::read_to_end; 16 17 pub use test::{self, Bencher}; 18 pub use std::thread; 19 pub use std::time::Duration; 20 pub use std::io::{self, Read, Write}; 21 } 22 23 mod connect_churn { 24 use ::prelude::*; 25 26 const NUM: usize = 300; 27 const CONCURRENT: usize = 8; 28 29 #[bench] one_thread(b: &mut Bencher)30 fn one_thread(b: &mut Bencher) { 31 let addr = "127.0.0.1:0".parse().unwrap(); 32 let mut core = Core::new().unwrap(); 33 let handle = core.handle(); 34 let listener = TcpListener::bind(&addr, &handle).unwrap(); 35 let addr = listener.local_addr().unwrap(); 36 37 // Spawn a single task that accepts & drops connections 38 handle.spawn( 39 listener.incoming() 40 .map_err(|e| panic!("server err: {:?}", e)) 41 .for_each(|_| Ok(()))); 42 43 b.iter(move || { 44 let connects = stream::iter((0..NUM).map(|_| { 45 Ok(TcpStream::connect(&addr, &handle) 46 .and_then(|sock| { 47 sock.set_linger(Some(Duration::from_secs(0))).unwrap(); 48 read_to_end(sock, vec![]) 49 })) 50 })); 51 52 core.run( 53 connects.buffer_unordered(CONCURRENT) 54 .map_err(|e| panic!("client err: {:?}", e)) 55 .for_each(|_| Ok(()))).unwrap(); 56 }); 57 } 58 n_workers(n: usize, b: &mut Bencher)59 fn n_workers(n: usize, b: &mut Bencher) { 60 let (shutdown_tx, shutdown_rx) = sync::oneshot::channel(); 61 let (remote_tx, remote_rx) = ::std::sync::mpsc::channel(); 62 63 // Spawn reactor thread 64 thread::spawn(move || { 65 // Create the core 66 let mut core = Core::new().unwrap(); 67 68 // Reactor handles 69 let handle = core.handle(); 70 let remote = handle.remote().clone(); 71 72 // Bind the TCP listener 73 let listener = TcpListener::bind( 74 &"127.0.0.1:0".parse().unwrap(), &handle).unwrap(); 75 76 // Get the address being listened on. 77 let addr = listener.local_addr().unwrap(); 78 79 // Send the remote & address back to the main thread 80 remote_tx.send((remote, addr)).unwrap(); 81 82 // Spawn a single task that accepts & drops connections 83 handle.spawn( 84 listener.incoming() 85 .map_err(|e| panic!("server err: {:?}", e)) 86 .for_each(|_| Ok(()))); 87 88 // Run the reactor 89 core.run(shutdown_rx).unwrap(); 90 }); 91 92 // Get the remote info 93 let (remote, addr) = remote_rx.recv().unwrap(); 94 95 b.iter(move || { 96 use std::sync::{Barrier, Arc}; 97 98 // Create a barrier to coordinate threads 99 let barrier = Arc::new(Barrier::new(n + 1)); 100 101 // Spawn worker threads 102 let threads: Vec<_> = (0..n).map(|_| { 103 let barrier = barrier.clone(); 104 let remote = remote.clone(); 105 let addr = addr.clone(); 106 107 thread::spawn(move || { 108 let connects = stream::iter((0..(NUM / n)).map(|_| { 109 // TODO: Once `Handle` is `Send / Sync`, update this 110 111 let (socket_tx, socket_rx) = sync::oneshot::channel(); 112 113 remote.spawn(move |handle| { 114 TcpStream::connect(&addr, &handle) 115 .map_err(|e| panic!("connect err: {:?}", e)) 116 .then(|res| socket_tx.send(res)) 117 .map_err(|_| ()) 118 }); 119 120 Ok(socket_rx 121 .then(|res| res.unwrap()) 122 .and_then(|sock| { 123 sock.set_linger(Some(Duration::from_secs(0))).unwrap(); 124 read_to_end(sock, vec![]) 125 })) 126 })); 127 128 barrier.wait(); 129 130 connects.buffer_unordered(CONCURRENT) 131 .map_err(|e| panic!("client err: {:?}", e)) 132 .for_each(|_| Ok(())).wait().unwrap(); 133 }) 134 }).collect(); 135 136 barrier.wait(); 137 138 for th in threads { 139 th.join().unwrap(); 140 } 141 }); 142 143 // Shutdown the reactor 144 shutdown_tx.send(()).unwrap(); 145 } 146 147 #[bench] two_threads(b: &mut Bencher)148 fn two_threads(b: &mut Bencher) { 149 n_workers(1, b); 150 } 151 152 #[bench] multi_threads(b: &mut Bencher)153 fn multi_threads(b: &mut Bencher) { 154 n_workers(4, b); 155 } 156 } 157 158 mod transfer { 159 use ::prelude::*; 160 use std::{cmp, mem}; 161 162 const MB: usize = 3 * 1024 * 1024; 163 164 struct Drain { 165 sock: TcpStream, 166 chunk: usize, 167 } 168 169 impl Future for Drain { 170 type Item = (); 171 type Error = io::Error; 172 poll(&mut self) -> Poll<(), io::Error>173 fn poll(&mut self) -> Poll<(), io::Error> { 174 let mut buf: [u8; 1024] = unsafe { mem::uninitialized() }; 175 176 loop { 177 match try_nb!(self.sock.read(&mut buf[..self.chunk])) { 178 0 => return Ok(Async::Ready(())), 179 _ => {} 180 } 181 } 182 } 183 } 184 185 struct Transfer { 186 sock: TcpStream, 187 rem: usize, 188 chunk: usize, 189 } 190 191 impl Future for Transfer { 192 type Item = (); 193 type Error = io::Error; 194 poll(&mut self) -> Poll<(), io::Error>195 fn poll(&mut self) -> Poll<(), io::Error> { 196 while self.rem > 0 { 197 let len = cmp::min(self.rem, self.chunk); 198 let buf = &DATA[..len]; 199 200 let n = try_nb!(self.sock.write(&buf)); 201 self.rem -= n; 202 } 203 204 Ok(Async::Ready(())) 205 } 206 } 207 208 static DATA: [u8; 1024] = [0; 1024]; 209 one_thread(b: &mut Bencher, read_size: usize, write_size: usize)210 fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) { 211 let addr = "127.0.0.1:0".parse().unwrap(); 212 let mut core = Core::new().unwrap(); 213 let handle = core.handle(); 214 let listener = TcpListener::bind(&addr, &handle).unwrap(); 215 let addr = listener.local_addr().unwrap(); 216 217 let h2 = handle.clone(); 218 219 // Spawn a single task that accepts & drops connections 220 handle.spawn( 221 listener.incoming() 222 .map_err(|e| panic!("server err: {:?}", e)) 223 .for_each(move |(sock, _)| { 224 sock.set_linger(Some(Duration::from_secs(0))).unwrap(); 225 let drain = Drain { 226 sock: sock, 227 chunk: read_size, 228 }; 229 230 h2.spawn(drain.map_err(|e| panic!("server error: {:?}", e))); 231 232 Ok(()) 233 })); 234 235 b.iter(move || { 236 let client = TcpStream::connect(&addr, &handle) 237 .and_then(|sock| { 238 Transfer { 239 sock: sock, 240 rem: MB, 241 chunk: write_size, 242 } 243 }); 244 245 core.run( 246 client.map_err(|e| panic!("client err: {:?}", e)) 247 ).unwrap(); 248 }); 249 } 250 cross_thread(b: &mut Bencher, read_size: usize, write_size: usize)251 fn cross_thread(b: &mut Bencher, read_size: usize, write_size: usize) { 252 let (shutdown_tx, shutdown_rx) = sync::oneshot::channel(); 253 let (remote_tx, remote_rx) = ::std::sync::mpsc::channel(); 254 255 // Spawn reactor thread 256 thread::spawn(move || { 257 // Create the core 258 let mut core = Core::new().unwrap(); 259 260 // Reactor handles 261 let handle = core.handle(); 262 let remote = handle.remote().clone(); 263 264 remote_tx.send(remote).unwrap(); 265 core.run(shutdown_rx).unwrap(); 266 }); 267 268 let remote = remote_rx.recv().unwrap(); 269 270 b.iter(move || { 271 let (server_tx, server_rx) = sync::oneshot::channel(); 272 let (client_tx, client_rx) = sync::oneshot::channel(); 273 274 remote.spawn(|handle| { 275 let sock = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap(); 276 server_tx.send(sock).unwrap(); 277 Ok(()) 278 }); 279 280 let remote2 = remote.clone(); 281 282 server_rx.and_then(move |server| { 283 let addr = server.local_addr().unwrap(); 284 285 remote2.spawn(move |handle| { 286 let fut = TcpStream::connect(&addr, &handle); 287 client_tx.send(fut).ok().unwrap(); 288 Ok(()) 289 }); 290 291 let client = client_rx 292 .then(|res| res.unwrap()) 293 .and_then(move |sock| { 294 Transfer { 295 sock: sock, 296 rem: MB, 297 chunk: write_size, 298 } 299 }); 300 301 let server = server.incoming().into_future() 302 .map_err(|(e, _)| e) 303 .and_then(move |(sock, _)| { 304 let sock = sock.unwrap().0; 305 sock.set_linger(Some(Duration::from_secs(0))).unwrap(); 306 307 Drain { 308 sock: sock, 309 chunk: read_size, 310 } 311 }); 312 313 client 314 .join(server) 315 .then(|res| { 316 let _ = res.unwrap(); 317 Ok(()) 318 }) 319 }).wait().unwrap(); 320 }); 321 322 // Shutdown the reactor 323 shutdown_tx.send(()).unwrap(); 324 } 325 326 mod small_chunks { 327 use ::prelude::*; 328 329 #[bench] one_thread(b: &mut Bencher)330 fn one_thread(b: &mut Bencher) { 331 super::one_thread(b, 32, 32); 332 } 333 334 #[bench] cross_thread(b: &mut Bencher)335 fn cross_thread(b: &mut Bencher) { 336 super::cross_thread(b, 32, 32); 337 } 338 } 339 340 mod big_chunks { 341 use ::prelude::*; 342 343 #[bench] one_thread(b: &mut Bencher)344 fn one_thread(b: &mut Bencher) { 345 super::one_thread(b, 1_024, 1_024); 346 } 347 348 #[bench] cross_thread(b: &mut Bencher)349 fn cross_thread(b: &mut Bencher) { 350 super::cross_thread(b, 1_024, 1_024); 351 } 352 } 353 } 354