1 #![feature(test)] 2 3 extern crate futures; 4 extern crate rand; 5 extern crate test; 6 extern crate threadpool; 7 extern crate tokio_threadpool; 8 9 const ITER: usize = 1_000; 10 11 mod blocking { 12 use super::*; 13 14 use futures::future::*; 15 use tokio_threadpool::{blocking, Builder}; 16 17 #[bench] cpu_bound(b: &mut test::Bencher)18 fn cpu_bound(b: &mut test::Bencher) { 19 let pool = Builder::new().pool_size(2).max_blocking(20).build(); 20 21 b.iter(|| { 22 let count_down = Arc::new(CountDown::new(::ITER)); 23 24 for _ in 0..::ITER { 25 let count_down = count_down.clone(); 26 27 pool.spawn(lazy(move || { 28 poll_fn(|| blocking(|| perform_complex_computation()).map_err(|_| panic!())) 29 .and_then(move |_| { 30 // Do something with the value 31 count_down.dec(); 32 Ok(()) 33 }) 34 })); 35 } 36 37 count_down.wait(); 38 }) 39 } 40 } 41 42 mod message_passing { 43 use super::*; 44 45 use futures::future::*; 46 use futures::sync::oneshot; 47 use tokio_threadpool::Builder; 48 49 #[bench] cpu_bound(b: &mut test::Bencher)50 fn cpu_bound(b: &mut test::Bencher) { 51 let pool = Builder::new().pool_size(2).max_blocking(20).build(); 52 53 let blocking = threadpool::ThreadPool::new(20); 54 55 b.iter(|| { 56 let count_down = Arc::new(CountDown::new(::ITER)); 57 58 for _ in 0..::ITER { 59 let count_down = count_down.clone(); 60 let blocking = blocking.clone(); 61 62 pool.spawn(lazy(move || { 63 // Create a channel to receive the return value. 64 let (tx, rx) = oneshot::channel(); 65 66 // Spawn a task on the blocking thread pool to process the 67 // computation. 68 blocking.execute(move || { 69 let res = perform_complex_computation(); 70 tx.send(res).unwrap(); 71 }); 72 73 rx.and_then(move |_| { 74 count_down.dec(); 75 Ok(()) 76 }) 77 .map_err(|_| panic!()) 78 })); 79 } 80 81 count_down.wait(); 82 }) 83 } 84 } 85 perform_complex_computation() -> usize86fn perform_complex_computation() -> usize { 87 use rand::*; 88 89 // Simulate a CPU heavy computation 90 let mut rng = rand::thread_rng(); 91 rng.gen() 92 } 93 94 // Util for waiting until the tasks complete 95 96 use std::sync::atomic::AtomicUsize; 97 use std::sync::atomic::Ordering::*; 98 use std::sync::*; 99 100 struct CountDown { 101 rem: AtomicUsize, 102 mutex: Mutex<()>, 103 condvar: Condvar, 104 } 105 106 impl CountDown { new(rem: usize) -> Self107 fn new(rem: usize) -> Self { 108 CountDown { 109 rem: AtomicUsize::new(rem), 110 mutex: Mutex::new(()), 111 condvar: Condvar::new(), 112 } 113 } 114 dec(&self)115 fn dec(&self) { 116 let prev = self.rem.fetch_sub(1, AcqRel); 117 118 if prev != 1 { 119 return; 120 } 121 122 let _lock = self.mutex.lock().unwrap(); 123 self.condvar.notify_all(); 124 } 125 wait(&self)126 fn wait(&self) { 127 let mut lock = self.mutex.lock().unwrap(); 128 129 loop { 130 if self.rem.load(Acquire) == 0 { 131 return; 132 } 133 134 lock = self.condvar.wait(lock).unwrap(); 135 } 136 } 137 } 138