1 #![feature(test)]
2 
3 extern crate futures;
4 extern crate rand;
5 extern crate test;
6 extern crate threadpool;
7 extern crate tokio_threadpool;
8 
9 const ITER: usize = 1_000;
10 
11 mod blocking {
12     use super::*;
13 
14     use futures::future::*;
15     use tokio_threadpool::{blocking, Builder};
16 
17     #[bench]
cpu_bound(b: &mut test::Bencher)18     fn cpu_bound(b: &mut test::Bencher) {
19         let pool = Builder::new().pool_size(2).max_blocking(20).build();
20 
21         b.iter(|| {
22             let count_down = Arc::new(CountDown::new(::ITER));
23 
24             for _ in 0..::ITER {
25                 let count_down = count_down.clone();
26 
27                 pool.spawn(lazy(move || {
28                     poll_fn(|| blocking(|| perform_complex_computation()).map_err(|_| panic!()))
29                         .and_then(move |_| {
30                             // Do something with the value
31                             count_down.dec();
32                             Ok(())
33                         })
34                 }));
35             }
36 
37             count_down.wait();
38         })
39     }
40 }
41 
42 mod message_passing {
43     use super::*;
44 
45     use futures::future::*;
46     use futures::sync::oneshot;
47     use tokio_threadpool::Builder;
48 
49     #[bench]
cpu_bound(b: &mut test::Bencher)50     fn cpu_bound(b: &mut test::Bencher) {
51         let pool = Builder::new().pool_size(2).max_blocking(20).build();
52 
53         let blocking = threadpool::ThreadPool::new(20);
54 
55         b.iter(|| {
56             let count_down = Arc::new(CountDown::new(::ITER));
57 
58             for _ in 0..::ITER {
59                 let count_down = count_down.clone();
60                 let blocking = blocking.clone();
61 
62                 pool.spawn(lazy(move || {
63                     // Create a channel to receive the return value.
64                     let (tx, rx) = oneshot::channel();
65 
66                     // Spawn a task on the blocking thread pool to process the
67                     // computation.
68                     blocking.execute(move || {
69                         let res = perform_complex_computation();
70                         tx.send(res).unwrap();
71                     });
72 
73                     rx.and_then(move |_| {
74                         count_down.dec();
75                         Ok(())
76                     })
77                     .map_err(|_| panic!())
78                 }));
79             }
80 
81             count_down.wait();
82         })
83     }
84 }
85 
perform_complex_computation() -> usize86 fn perform_complex_computation() -> usize {
87     use rand::*;
88 
89     // Simulate a CPU heavy computation
90     let mut rng = rand::thread_rng();
91     rng.gen()
92 }
93 
94 // Util for waiting until the tasks complete
95 
96 use std::sync::atomic::AtomicUsize;
97 use std::sync::atomic::Ordering::*;
98 use std::sync::*;
99 
100 struct CountDown {
101     rem: AtomicUsize,
102     mutex: Mutex<()>,
103     condvar: Condvar,
104 }
105 
106 impl CountDown {
new(rem: usize) -> Self107     fn new(rem: usize) -> Self {
108         CountDown {
109             rem: AtomicUsize::new(rem),
110             mutex: Mutex::new(()),
111             condvar: Condvar::new(),
112         }
113     }
114 
dec(&self)115     fn dec(&self) {
116         let prev = self.rem.fetch_sub(1, AcqRel);
117 
118         if prev != 1 {
119             return;
120         }
121 
122         let _lock = self.mutex.lock().unwrap();
123         self.condvar.notify_all();
124     }
125 
wait(&self)126     fn wait(&self) {
127         let mut lock = self.mutex.lock().unwrap();
128 
129         loop {
130             if self.rem.load(Acquire) == 0 {
131                 return;
132             }
133 
134             lock = self.condvar.wait(lock).unwrap();
135         }
136     }
137 }
138