1 use super::{Inner, Runtime};
2 
3 use reactor::Reactor;
4 
5 use std::io;
6 use std::sync::Mutex;
7 use std::time::Duration;
8 use std::any::Any;
9 
10 use num_cpus;
11 use tokio_reactor;
12 use tokio_threadpool::Builder as ThreadPoolBuilder;
13 use tokio_timer::clock::{self, Clock};
14 use tokio_timer::timer::{self, Timer};
15 use tokio_trace_core as trace;
16 
17 /// Builds Tokio Runtime with custom configuration values.
18 ///
19 /// Methods can be chained in order to set the configuration values. The
20 /// Runtime is constructed by calling [`build`].
21 ///
22 /// New instances of `Builder` are obtained via [`Builder::new`].
23 ///
24 /// See function level documentation for details on the various configuration
25 /// settings.
26 ///
27 /// [`build`]: #method.build
28 /// [`Builder::new`]: #method.new
29 ///
30 /// # Examples
31 ///
32 /// ```
33 /// extern crate tokio;
34 /// extern crate tokio_timer;
35 ///
36 /// use std::time::Duration;
37 ///
38 /// use tokio::runtime::Builder;
39 /// use tokio_timer::clock::Clock;
40 ///
41 /// fn main() {
42 ///     // build Runtime
43 ///     let mut runtime = Builder::new()
44 ///         .blocking_threads(4)
45 ///         .clock(Clock::system())
46 ///         .core_threads(4)
47 ///         .keep_alive(Some(Duration::from_secs(60)))
48 ///         .name_prefix("my-custom-name-")
49 ///         .stack_size(3 * 1024 * 1024)
50 ///         .build()
51 ///         .unwrap();
52 ///
53 ///     // use runtime ...
54 /// }
55 /// ```
56 #[derive(Debug)]
57 pub struct Builder {
58     /// Thread pool specific builder
59     threadpool_builder: ThreadPoolBuilder,
60 
61     /// The number of worker threads
62     core_threads: usize,
63 
64     /// The clock to use
65     clock: Clock,
66 }
67 
68 impl Builder {
69     /// Returns a new runtime builder initialized with default configuration
70     /// values.
71     ///
72     /// Configuration methods can be chained on the return value.
new() -> Builder73     pub fn new() -> Builder {
74         let core_threads = num_cpus::get().max(1);
75 
76         let mut threadpool_builder = ThreadPoolBuilder::new();
77         threadpool_builder.name_prefix("tokio-runtime-worker-");
78         threadpool_builder.pool_size(core_threads);
79 
80         Builder {
81             threadpool_builder,
82             core_threads,
83             clock: Clock::new(),
84         }
85     }
86 
87     /// Set the `Clock` instance that will be used by the runtime.
clock(&mut self, clock: Clock) -> &mut Self88     pub fn clock(&mut self, clock: Clock) -> &mut Self {
89         self.clock = clock;
90         self
91     }
92 
93     /// Set builder to set up the thread pool instance.
94     #[deprecated(
95         since = "0.1.9",
96         note = "use the `core_threads`, `blocking_threads`, `name_prefix`, \
97                 `keep_alive`, and `stack_size` functions on `runtime::Builder`, \
98                 instead")]
99     #[doc(hidden)]
threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self100     pub fn threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self {
101         self.threadpool_builder = val;
102         self
103     }
104 
105     /// Sets a callback to handle panics in futures.
106     ///
107     /// The callback is triggered when a panic during a future bubbles up to
108     /// Tokio. By default Tokio catches these panics, and they will be ignored.
109     /// The parameter passed to this callback is the same error value returned
110     /// from `std::panic::catch_unwind()`. To abort the process on panics, use
111     /// `std::panic::resume_unwind()` in this callback as shown below.
112     ///
113     /// # Examples
114     ///
115     /// ```
116     /// # extern crate tokio;
117     /// # extern crate futures;
118     /// # use tokio::runtime;
119     ///
120     /// # pub fn main() {
121     /// let mut rt = runtime::Builder::new()
122     ///     .panic_handler(|err| std::panic::resume_unwind(err))
123     ///     .build()
124     ///     .unwrap();
125     /// # }
126     /// ```
panic_handler<F>(&mut self, f: F) -> &mut Self where F: Fn(Box<Any + Send>) + Send + Sync + 'static,127     pub fn panic_handler<F>(&mut self, f: F) -> &mut Self
128     where
129         F: Fn(Box<Any + Send>) + Send + Sync + 'static,
130     {
131         self.threadpool_builder.panic_handler(f);
132         self
133     }
134 
135 
136     /// Set the maximum number of worker threads for the `Runtime`'s thread pool.
137     ///
138     /// This must be a number between 1 and 32,768 though it is advised to keep
139     /// this value on the smaller side.
140     ///
141     /// The default value is the number of cores available to the system.
142     ///
143     /// # Examples
144     ///
145     /// ```
146     /// # extern crate tokio;
147     /// # extern crate futures;
148     /// # use tokio::runtime;
149     ///
150     /// # pub fn main() {
151     /// let mut rt = runtime::Builder::new()
152     ///     .core_threads(4)
153     ///     .build()
154     ///     .unwrap();
155     /// # }
156     /// ```
core_threads(&mut self, val: usize) -> &mut Self157     pub fn core_threads(&mut self, val: usize) -> &mut Self {
158         self.core_threads = val;
159         self.threadpool_builder.pool_size(val);
160         self
161     }
162 
163     /// Set the maximum number of concurrent blocking sections in the `Runtime`'s
164     /// thread pool.
165     ///
166     /// When the maximum concurrent `blocking` calls is reached, any further
167     /// calls to `blocking` will return `NotReady` and the task is notified once
168     /// previously in-flight calls to `blocking` return.
169     ///
170     /// This must be a number between 1 and 32,768 though it is advised to keep
171     /// this value on the smaller side.
172     ///
173     /// The default value is 100.
174     ///
175     /// # Examples
176     ///
177     /// ```
178     /// # extern crate tokio;
179     /// # extern crate futures;
180     /// # use tokio::runtime;
181     ///
182     /// # pub fn main() {
183     /// let mut rt = runtime::Builder::new()
184     ///     .blocking_threads(200)
185     ///     .build();
186     /// # }
187     /// ```
blocking_threads(&mut self, val: usize) -> &mut Self188     pub fn blocking_threads(&mut self, val: usize) -> &mut Self {
189         self.threadpool_builder.max_blocking(val);
190         self
191     }
192 
193     /// Set the worker thread keep alive duration for threads in the `Runtime`'s
194     /// thread pool.
195     ///
196     /// If set, a worker thread will wait for up to the specified duration for
197     /// work, at which point the thread will shutdown. When work becomes
198     /// available, a new thread will eventually be spawned to replace the one
199     /// that shut down.
200     ///
201     /// When the value is `None`, the thread will wait for work forever.
202     ///
203     /// The default value is `None`.
204     ///
205     /// # Examples
206     ///
207     /// ```
208     /// # extern crate tokio;
209     /// # extern crate futures;
210     /// # use tokio::runtime;
211     /// use std::time::Duration;
212     ///
213     /// # pub fn main() {
214     /// let mut rt = runtime::Builder::new()
215     ///     .keep_alive(Some(Duration::from_secs(30)))
216     ///     .build();
217     /// # }
218     /// ```
keep_alive(&mut self, val: Option<Duration>) -> &mut Self219     pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self {
220         self.threadpool_builder.keep_alive(val);
221         self
222     }
223 
224     /// Set name prefix of threads spawned by the `Runtime`'s thread pool.
225     ///
226     /// Thread name prefix is used for generating thread names. For example, if
227     /// prefix is `my-pool-`, then threads in the pool will get names like
228     /// `my-pool-1` etc.
229     ///
230     /// The default prefix is "tokio-runtime-worker-".
231     ///
232     /// # Examples
233     ///
234     /// ```
235     /// # extern crate tokio;
236     /// # extern crate futures;
237     /// # use tokio::runtime;
238     ///
239     /// # pub fn main() {
240     /// let mut rt = runtime::Builder::new()
241     ///     .name_prefix("my-pool-")
242     ///     .build();
243     /// # }
244     /// ```
name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self245     pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
246         self.threadpool_builder.name_prefix(val);
247         self
248     }
249 
250     /// Set the stack size (in bytes) for worker threads.
251     ///
252     /// The actual stack size may be greater than this value if the platform
253     /// specifies minimal stack size.
254     ///
255     /// The default stack size for spawned threads is 2 MiB, though this
256     /// particular stack size is subject to change in the future.
257     ///
258     /// # Examples
259     ///
260     /// ```
261     /// # extern crate tokio;
262     /// # extern crate futures;
263     /// # use tokio::runtime;
264     ///
265     /// # pub fn main() {
266     /// let mut rt = runtime::Builder::new()
267     ///     .stack_size(32 * 1024)
268     ///     .build();
269     /// # }
270     /// ```
stack_size(&mut self, val: usize) -> &mut Self271     pub fn stack_size(&mut self, val: usize) -> &mut Self {
272         self.threadpool_builder.stack_size(val);
273         self
274     }
275 
276     /// Execute function `f` after each thread is started but before it starts
277     /// doing work.
278     ///
279     /// This is intended for bookkeeping and monitoring use cases.
280     ///
281     /// # Examples
282     ///
283     /// ```
284     /// # extern crate tokio;
285     /// # extern crate futures;
286     /// # use tokio::runtime;
287     ///
288     /// # pub fn main() {
289     /// let thread_pool = runtime::Builder::new()
290     ///     .after_start(|| {
291     ///         println!("thread started");
292     ///     })
293     ///     .build();
294     /// # }
295     /// ```
after_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static296     pub fn after_start<F>(&mut self, f: F) -> &mut Self
297         where F: Fn() + Send + Sync + 'static
298     {
299         self.threadpool_builder.after_start(f);
300         self
301     }
302 
303     /// Execute function `f` before each thread stops.
304     ///
305     /// This is intended for bookkeeping and monitoring use cases.
306     ///
307     /// # Examples
308     ///
309     /// ```
310     /// # extern crate tokio;
311     /// # extern crate futures;
312     /// # use tokio::runtime;
313     ///
314     /// # pub fn main() {
315     /// let thread_pool = runtime::Builder::new()
316     ///     .before_stop(|| {
317     ///         println!("thread stopping");
318     ///     })
319     ///     .build();
320     /// # }
321     /// ```
before_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static322     pub fn before_stop<F>(&mut self, f: F) -> &mut Self
323         where F: Fn() + Send + Sync + 'static
324     {
325         self.threadpool_builder.before_stop(f);
326         self
327     }
328 
329     /// Create the configured `Runtime`.
330     ///
331     /// The returned `ThreadPool` instance is ready to spawn tasks.
332     ///
333     /// # Examples
334     ///
335     /// ```
336     /// # extern crate tokio;
337     /// # use tokio::runtime::Builder;
338     /// # pub fn main() {
339     /// let runtime = Builder::new().build().unwrap();
340     /// // ... call runtime.run(...)
341     /// # let _ = runtime;
342     /// # }
343     /// ```
build(&mut self) -> io::Result<Runtime>344     pub fn build(&mut self) -> io::Result<Runtime> {
345         // TODO(stjepang): Once we remove the `threadpool_builder` method, remove this line too.
346         self.threadpool_builder.pool_size(self.core_threads);
347 
348         let mut reactor_handles = Vec::new();
349         let mut timer_handles = Vec::new();
350         let mut timers = Vec::new();
351 
352         for _ in 0..self.core_threads {
353             // Create a new reactor.
354             let reactor = Reactor::new()?;
355             reactor_handles.push(reactor.handle());
356 
357             // Create a new timer.
358             let timer = Timer::new_with_now(reactor, self.clock.clone());
359             timer_handles.push(timer.handle());
360             timers.push(Mutex::new(Some(timer)));
361         }
362 
363         // Get a handle to the clock for the runtime.
364         let clock = self.clock.clone();
365 
366         // Get the current trace dispatcher.
367         // TODO(eliza): when `tokio-trace-core` is stable enough to take a
368         // public API dependency, we should allow users to set a custom
369         // subscriber for the runtime.
370         let dispatch = trace::dispatcher::get_default(trace::Dispatch::clone);
371 
372         let pool = self
373             .threadpool_builder
374             .around_worker(move |w, enter| {
375                 let index = w.id().to_usize();
376 
377                 tokio_reactor::with_default(&reactor_handles[index], enter, |enter| {
378                     clock::with_default(&clock, enter, |enter| {
379                         timer::with_default(&timer_handles[index], enter, |_| {
380                             trace::dispatcher::with_default(&dispatch, || {
381                                 w.run();
382                             })
383                         });
384                     })
385                 });
386             })
387             .custom_park(move |worker_id| {
388                 let index = worker_id.to_usize();
389 
390                 timers[index]
391                     .lock()
392                     .unwrap()
393                     .take()
394                     .unwrap()
395             })
396             .build();
397 
398         // To support deprecated `reactor()` function
399         let reactor = Reactor::new()?;
400         let reactor_handle = reactor.handle();
401 
402         Ok(Runtime {
403             inner: Some(Inner {
404                 reactor_handle,
405                 reactor: Mutex::new(Some(reactor)),
406                 pool,
407             }),
408         })
409     }
410 }
411