1 use super::{Inner, Runtime};
2 
3 use reactor::Reactor;
4 
5 use std::io;
6 use std::sync::Mutex;
7 use std::time::Duration;
8 use std::any::Any;
9 
10 use num_cpus;
11 use tokio_reactor;
12 use tokio_threadpool::Builder as ThreadPoolBuilder;
13 use tokio_timer::clock::{self, Clock};
14 use tokio_timer::timer::{self, Timer};
15 
16 #[cfg(feature = "experimental-tracing")]
17 use tracing_core as trace;
18 
19 /// Builds Tokio Runtime with custom configuration values.
20 ///
21 /// Methods can be chained in order to set the configuration values. The
22 /// Runtime is constructed by calling [`build`].
23 ///
24 /// New instances of `Builder` are obtained via [`Builder::new`].
25 ///
26 /// See function level documentation for details on the various configuration
27 /// settings.
28 ///
29 /// [`build`]: #method.build
30 /// [`Builder::new`]: #method.new
31 ///
32 /// # Examples
33 ///
34 /// ```
35 /// extern crate tokio;
36 /// extern crate tokio_timer;
37 ///
38 /// use std::time::Duration;
39 ///
40 /// use tokio::runtime::Builder;
41 /// use tokio_timer::clock::Clock;
42 ///
43 /// fn main() {
44 ///     // build Runtime
45 ///     let mut runtime = Builder::new()
46 ///         .blocking_threads(4)
47 ///         .clock(Clock::system())
48 ///         .core_threads(4)
49 ///         .keep_alive(Some(Duration::from_secs(60)))
50 ///         .name_prefix("my-custom-name-")
51 ///         .stack_size(3 * 1024 * 1024)
52 ///         .build()
53 ///         .unwrap();
54 ///
55 ///     // use runtime ...
56 /// }
57 /// ```
58 #[derive(Debug)]
59 pub struct Builder {
60     /// Thread pool specific builder
61     threadpool_builder: ThreadPoolBuilder,
62 
63     /// The number of worker threads
64     core_threads: usize,
65 
66     /// The clock to use
67     clock: Clock,
68 }
69 
70 impl Builder {
71     /// Returns a new runtime builder initialized with default configuration
72     /// values.
73     ///
74     /// Configuration methods can be chained on the return value.
new() -> Builder75     pub fn new() -> Builder {
76         let core_threads = num_cpus::get().max(1);
77 
78         let mut threadpool_builder = ThreadPoolBuilder::new();
79         threadpool_builder.name_prefix("tokio-runtime-worker-");
80         threadpool_builder.pool_size(core_threads);
81 
82         Builder {
83             threadpool_builder,
84             core_threads,
85             clock: Clock::new(),
86         }
87     }
88 
89     /// Set the `Clock` instance that will be used by the runtime.
clock(&mut self, clock: Clock) -> &mut Self90     pub fn clock(&mut self, clock: Clock) -> &mut Self {
91         self.clock = clock;
92         self
93     }
94 
95     /// Set builder to set up the thread pool instance.
96     #[deprecated(
97         since = "0.1.9",
98         note = "use the `core_threads`, `blocking_threads`, `name_prefix`, \
99                 `keep_alive`, and `stack_size` functions on `runtime::Builder`, \
100                 instead")]
101     #[doc(hidden)]
threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self102     pub fn threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self {
103         self.threadpool_builder = val;
104         self
105     }
106 
107     /// Sets a callback to handle panics in futures.
108     ///
109     /// The callback is triggered when a panic during a future bubbles up to
110     /// Tokio. By default Tokio catches these panics, and they will be ignored.
111     /// The parameter passed to this callback is the same error value returned
112     /// from `std::panic::catch_unwind()`. To abort the process on panics, use
113     /// `std::panic::resume_unwind()` in this callback as shown below.
114     ///
115     /// # Examples
116     ///
117     /// ```
118     /// # extern crate tokio;
119     /// # extern crate futures;
120     /// # use tokio::runtime;
121     ///
122     /// # pub fn main() {
123     /// let mut rt = runtime::Builder::new()
124     ///     .panic_handler(|err| std::panic::resume_unwind(err))
125     ///     .build()
126     ///     .unwrap();
127     /// # }
128     /// ```
panic_handler<F>(&mut self, f: F) -> &mut Self where F: Fn(Box<Any + Send>) + Send + Sync + 'static,129     pub fn panic_handler<F>(&mut self, f: F) -> &mut Self
130     where
131         F: Fn(Box<Any + Send>) + Send + Sync + 'static,
132     {
133         self.threadpool_builder.panic_handler(f);
134         self
135     }
136 
137 
138     /// Set the maximum number of worker threads for the `Runtime`'s thread pool.
139     ///
140     /// This must be a number between 1 and 32,768 though it is advised to keep
141     /// this value on the smaller side.
142     ///
143     /// The default value is the number of cores available to the system.
144     ///
145     /// # Examples
146     ///
147     /// ```
148     /// # extern crate tokio;
149     /// # extern crate futures;
150     /// # use tokio::runtime;
151     ///
152     /// # pub fn main() {
153     /// let mut rt = runtime::Builder::new()
154     ///     .core_threads(4)
155     ///     .build()
156     ///     .unwrap();
157     /// # }
158     /// ```
core_threads(&mut self, val: usize) -> &mut Self159     pub fn core_threads(&mut self, val: usize) -> &mut Self {
160         self.core_threads = val;
161         self.threadpool_builder.pool_size(val);
162         self
163     }
164 
165     /// Set the maximum number of concurrent blocking sections in the `Runtime`'s
166     /// thread pool.
167     ///
168     /// When the maximum concurrent `blocking` calls is reached, any further
169     /// calls to `blocking` will return `NotReady` and the task is notified once
170     /// previously in-flight calls to `blocking` return.
171     ///
172     /// This must be a number between 1 and 32,768 though it is advised to keep
173     /// this value on the smaller side.
174     ///
175     /// The default value is 100.
176     ///
177     /// # Examples
178     ///
179     /// ```
180     /// # extern crate tokio;
181     /// # extern crate futures;
182     /// # use tokio::runtime;
183     ///
184     /// # pub fn main() {
185     /// let mut rt = runtime::Builder::new()
186     ///     .blocking_threads(200)
187     ///     .build();
188     /// # }
189     /// ```
blocking_threads(&mut self, val: usize) -> &mut Self190     pub fn blocking_threads(&mut self, val: usize) -> &mut Self {
191         self.threadpool_builder.max_blocking(val);
192         self
193     }
194 
195     /// Set the worker thread keep alive duration for threads in the `Runtime`'s
196     /// thread pool.
197     ///
198     /// If set, a worker thread will wait for up to the specified duration for
199     /// work, at which point the thread will shutdown. When work becomes
200     /// available, a new thread will eventually be spawned to replace the one
201     /// that shut down.
202     ///
203     /// When the value is `None`, the thread will wait for work forever.
204     ///
205     /// The default value is `None`.
206     ///
207     /// # Examples
208     ///
209     /// ```
210     /// # extern crate tokio;
211     /// # extern crate futures;
212     /// # use tokio::runtime;
213     /// use std::time::Duration;
214     ///
215     /// # pub fn main() {
216     /// let mut rt = runtime::Builder::new()
217     ///     .keep_alive(Some(Duration::from_secs(30)))
218     ///     .build();
219     /// # }
220     /// ```
keep_alive(&mut self, val: Option<Duration>) -> &mut Self221     pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self {
222         self.threadpool_builder.keep_alive(val);
223         self
224     }
225 
226     /// Set name prefix of threads spawned by the `Runtime`'s thread pool.
227     ///
228     /// Thread name prefix is used for generating thread names. For example, if
229     /// prefix is `my-pool-`, then threads in the pool will get names like
230     /// `my-pool-1` etc.
231     ///
232     /// The default prefix is "tokio-runtime-worker-".
233     ///
234     /// # Examples
235     ///
236     /// ```
237     /// # extern crate tokio;
238     /// # extern crate futures;
239     /// # use tokio::runtime;
240     ///
241     /// # pub fn main() {
242     /// let mut rt = runtime::Builder::new()
243     ///     .name_prefix("my-pool-")
244     ///     .build();
245     /// # }
246     /// ```
name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self247     pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
248         self.threadpool_builder.name_prefix(val);
249         self
250     }
251 
252     /// Set the stack size (in bytes) for worker threads.
253     ///
254     /// The actual stack size may be greater than this value if the platform
255     /// specifies minimal stack size.
256     ///
257     /// The default stack size for spawned threads is 2 MiB, though this
258     /// particular stack size is subject to change in the future.
259     ///
260     /// # Examples
261     ///
262     /// ```
263     /// # extern crate tokio;
264     /// # extern crate futures;
265     /// # use tokio::runtime;
266     ///
267     /// # pub fn main() {
268     /// let mut rt = runtime::Builder::new()
269     ///     .stack_size(32 * 1024)
270     ///     .build();
271     /// # }
272     /// ```
stack_size(&mut self, val: usize) -> &mut Self273     pub fn stack_size(&mut self, val: usize) -> &mut Self {
274         self.threadpool_builder.stack_size(val);
275         self
276     }
277 
278     /// Execute function `f` after each thread is started but before it starts
279     /// doing work.
280     ///
281     /// This is intended for bookkeeping and monitoring use cases.
282     ///
283     /// # Examples
284     ///
285     /// ```
286     /// # extern crate tokio;
287     /// # extern crate futures;
288     /// # use tokio::runtime;
289     ///
290     /// # pub fn main() {
291     /// let thread_pool = runtime::Builder::new()
292     ///     .after_start(|| {
293     ///         println!("thread started");
294     ///     })
295     ///     .build();
296     /// # }
297     /// ```
after_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static298     pub fn after_start<F>(&mut self, f: F) -> &mut Self
299         where F: Fn() + Send + Sync + 'static
300     {
301         self.threadpool_builder.after_start(f);
302         self
303     }
304 
305     /// Execute function `f` before each thread stops.
306     ///
307     /// This is intended for bookkeeping and monitoring use cases.
308     ///
309     /// # Examples
310     ///
311     /// ```
312     /// # extern crate tokio;
313     /// # extern crate futures;
314     /// # use tokio::runtime;
315     ///
316     /// # pub fn main() {
317     /// let thread_pool = runtime::Builder::new()
318     ///     .before_stop(|| {
319     ///         println!("thread stopping");
320     ///     })
321     ///     .build();
322     /// # }
323     /// ```
before_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static324     pub fn before_stop<F>(&mut self, f: F) -> &mut Self
325         where F: Fn() + Send + Sync + 'static
326     {
327         self.threadpool_builder.before_stop(f);
328         self
329     }
330 
331     /// Create the configured `Runtime`.
332     ///
333     /// The returned `ThreadPool` instance is ready to spawn tasks.
334     ///
335     /// # Examples
336     ///
337     /// ```
338     /// # extern crate tokio;
339     /// # use tokio::runtime::Builder;
340     /// # pub fn main() {
341     /// let runtime = Builder::new().build().unwrap();
342     /// // ... call runtime.run(...)
343     /// # let _ = runtime;
344     /// # }
345     /// ```
build(&mut self) -> io::Result<Runtime>346     pub fn build(&mut self) -> io::Result<Runtime> {
347         // TODO(stjepang): Once we remove the `threadpool_builder` method, remove this line too.
348         self.threadpool_builder.pool_size(self.core_threads);
349 
350         let mut reactor_handles = Vec::new();
351         let mut timer_handles = Vec::new();
352         let mut timers = Vec::new();
353 
354         for _ in 0..self.core_threads {
355             // Create a new reactor.
356             let reactor = Reactor::new()?;
357             reactor_handles.push(reactor.handle());
358 
359             // Create a new timer.
360             let timer = Timer::new_with_now(reactor, self.clock.clone());
361             timer_handles.push(timer.handle());
362             timers.push(Mutex::new(Some(timer)));
363         }
364 
365         // Get a handle to the clock for the runtime.
366         let clock = self.clock.clone();
367 
368         // Get the current trace dispatcher.
369         // TODO(eliza): when `tracing-core` is stable enough to take a
370         // public API dependency, we should allow users to set a custom
371         // subscriber for the runtime.
372         #[cfg(feature = "experimental-tracing")]
373         let dispatch = trace::dispatcher::get_default(trace::Dispatch::clone);
374 
375         let pool = self
376             .threadpool_builder
377             .around_worker(move |w, enter| {
378                 let index = w.id().to_usize();
379 
380                 tokio_reactor::with_default(&reactor_handles[index], enter, |enter| {
381                     clock::with_default(&clock, enter, |enter| {
382                         timer::with_default(&timer_handles[index], enter, |_| {
383 
384                             #[cfg(feature = "experimental-tracing")]
385                             trace::dispatcher::with_default(&dispatch, || {
386                                 w.run();
387                             });
388 
389                             #[cfg(not(feature = "experimental-tracing"))]
390                             w.run();
391                         });
392                     })
393                 });
394             })
395             .custom_park(move |worker_id| {
396                 let index = worker_id.to_usize();
397 
398                 timers[index]
399                     .lock()
400                     .unwrap()
401                     .take()
402                     .unwrap()
403             })
404             .build();
405 
406         // To support deprecated `reactor()` function
407         let reactor = Reactor::new()?;
408         let reactor_handle = reactor.handle();
409 
410         Ok(Runtime {
411             inner: Some(Inner {
412                 reactor_handle,
413                 reactor: Mutex::new(Some(reactor)),
414                 pool,
415             }),
416         })
417     }
418 }
419