use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};

use std::fmt;
#[cfg(not(loom))]
use std::sync::Arc;
#[cfg(feature = "rt-core")]
use std::time::Duration;

/// Builds Tokio Runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: #method.build
/// [`Builder::new`]: #method.new
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
/// fn main() {
///     // build runtime
///     let runtime = Builder::new()
///         .threaded_scheduler()
///         .core_threads(4)
///         .thread_name("my-custom-name")
///         .thread_stack_size(3 * 1024 * 1024)
///         .build()
///         .unwrap();
///
///     // use runtime ...
/// }
/// ```
pub struct Builder {
    /// The task execution model to use.
    kind: Kind,

    /// Whether or not to enable the I/O driver
    enable_io: bool,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// The number of worker threads used by the Runtime.
    ///
    /// Only used when not using the current-thread executor.
    core_threads: Option<usize>,

    /// Cap on thread usage.
    max_threads: usize,

    /// Name used for threads spawned by the runtime.
    pub(super) thread_name: String,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// Callback to run before each worker thread stops.
    pub(super) before_stop: Option<Callback>,

    /// Maximum throttling duration.
    #[cfg(feature = "rt-core")]
    pub(super) max_throttling: Option<std::time::Duration>,
}

/// The task execution model used by the runtime.
#[derive(Debug, Clone, Copy)]
enum Kind {
    /// No task execution; only the drivers are run from `block_on`.
    Shell,
    /// Single-threaded scheduler running tasks on the current thread.
    #[cfg(feature = "rt-core")]
    Basic,
    /// Multi-threaded, work-stealing scheduler.
    #[cfg(feature = "rt-threaded")]
    ThreadPool,
}

impl Builder {
    /// Returns a new runtime builder initialized with default configuration
    /// values.
    ///
    /// Configuration methods can be chained on the return value.
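    ///
    /// # Examples
    ///
    /// A minimal example, following the pattern of the other builder
    /// examples in this module:
    ///
    /// ```
    /// use tokio::runtime::Builder;
    ///
    /// let rt = Builder::new().build().unwrap();
    /// ```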
    pub fn new() -> Builder {
        Builder {
            // No task execution by default
            kind: Kind::Shell,

            // I/O defaults to "off"
            enable_io: false,

            // Time defaults to "off"
            enable_time: false,

            // Default to lazy auto-detection (one thread per CPU core)
            core_threads: None,

            max_threads: 512,

            // Default thread name
            thread_name: "tokio-runtime-worker".into(),

            // Do not set a stack size by default
            thread_stack_size: None,

            // No worker thread callbacks
            after_start: None,
            before_stop: None,

            // No throttling by default
            #[cfg(feature = "rt-core")]
            max_throttling: None,
        }
    }

    /// Enables both I/O and time drivers.
    ///
    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
    /// individually. If additional components are added to Tokio in the future,
    /// `enable_all` will include these future components.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new()
    ///     .threaded_scheduler()
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn enable_all(&mut self) -> &mut Self {
        #[cfg(feature = "io-driver")]
        self.enable_io();
        #[cfg(feature = "time")]
        self.enable_time();

        self
    }

    /// Sets the maximum number of worker threads for the `Runtime`'s thread pool.
    ///
    /// This must be a number between 1 and 32,768, though it is advised to keep
    /// this value on the smaller side.
    ///
    /// The default value is the number of cores available to the system.
    #[deprecated(note = "in the future, this will be replaced by the `core_threads` method")]
    pub fn num_threads(&mut self, val: usize) -> &mut Self {
        self.core_threads = Some(val);
        self
    }

    /// Sets the core number of worker threads for the `Runtime`'s thread pool.
    ///
    /// This should be a number between 1 and 32,768, though it is advised to keep
    /// this value on the smaller side.
    ///
    /// The default value is the number of cores available to the system.
    ///
    /// These threads are always active and running.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new()
    ///     .threaded_scheduler()
    ///     .core_threads(4)
    ///     .build()
    ///     .unwrap();
    /// ```
    pub fn core_threads(&mut self, val: usize) -> &mut Self {
        assert_ne!(val, 0, "Core threads cannot be zero");
        self.core_threads = Some(val);
        self
    }

    /// Specifies the limit for threads spawned by the Runtime.
    ///
    /// This is the total number of threads used by the Runtime, including
    /// `core_threads`. Setting `max_threads` to a value less than
    /// `core_threads` is an invalid configuration and will cause a panic when
    /// building a multi-threaded `Runtime`.
    ///
    /// Like `core_threads`, this number should be between 1 and 32,768.
    ///
    /// The default value is 512.
    ///
    /// When the multi-threaded runtime is not used, this acts as a limit on
    /// additional threads.
    ///
    /// Otherwise, since `core_threads` are always active, the number of
    /// additional threads (e.g. for blocking annotations) is limited to
    /// `max_threads - core_threads`.
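    ///
    /// # Examples
    ///
    /// A minimal example, along the lines of the other builder examples in
    /// this module (the exact values here are illustrative):
    ///
    /// ```
    /// use tokio::runtime;
    ///
    /// let rt = runtime::Builder::new()
    ///     .threaded_scheduler()
    ///     .core_threads(4)
    ///     .max_threads(64)
    ///     .build()
    ///     .unwrap();
    /// ```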
    pub fn max_threads(&mut self, val: usize) -> &mut Self {
        assert_ne!(val, 0, "Thread limit cannot be zero");
        self.max_threads = val;
        self
    }

    /// Sets the name of threads spawned by the `Runtime`'s thread pool.
    ///
    /// The default name is "tokio-runtime-worker".
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    ///
    /// # pub fn main() {
    /// let rt = runtime::Builder::new()
    ///     .thread_name("my-pool")
    ///     .build();
    /// # }
    /// ```
    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
        self.thread_name = val.into();
        self
    }

    /// Sets the stack size (in bytes) for worker threads.
    ///
    /// The actual stack size may be greater than this value if the platform
    /// specifies a minimal stack size.
    ///
    /// The default stack size for spawned threads is 2 MiB, though this
    /// particular stack size is subject to change in the future.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    ///
    /// # pub fn main() {
    /// let rt = runtime::Builder::new()
    ///     .threaded_scheduler()
    ///     .thread_stack_size(32 * 1024)
    ///     .build();
    /// # }
    /// ```
    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
        self.thread_stack_size = Some(val);
        self
    }

    /// Executes function `f` after each thread is started but before it starts
    /// doing work.
    ///
    /// This is intended for bookkeeping and monitoring use cases.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    ///
    /// # pub fn main() {
    /// let runtime = runtime::Builder::new()
    ///     .threaded_scheduler()
    ///     .on_thread_start(|| {
    ///         println!("thread started");
    ///     })
    ///     .build();
    /// # }
    /// ```
    #[cfg(not(loom))]
    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.after_start = Some(Arc::new(f));
        self
    }

    /// Executes function `f` before each thread stops.
    ///
    /// This is intended for bookkeeping and monitoring use cases.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    ///
    /// # pub fn main() {
    /// let runtime = runtime::Builder::new()
    ///     .threaded_scheduler()
    ///     .on_thread_stop(|| {
    ///         println!("thread stopping");
    ///     })
    ///     .build();
    /// # }
    /// ```
    #[cfg(not(loom))]
    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.before_stop = Some(Arc::new(f));
        self
    }

    /// Sets the maximum throttling duration.
    ///
    /// Throttling reduces syscalls and context switches by grouping the
    /// handling of timers, I/O and tasks.
    ///
    /// The default is to not apply any throttling.
    ///
    /// This is only available for the basic scheduler.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::runtime;
    /// # use std::time::Duration;
    ///
    /// # pub fn main() {
    /// let rt = runtime::Builder::new()
    ///     .basic_scheduler()
    ///     .enable_all()
    ///     .max_throttling(Duration::from_millis(20))
    ///     .build();
    /// # }
    /// ```
    #[cfg(feature = "rt-core")]
    pub fn max_throttling(&mut self, dur: Duration) -> &mut Self {
        self.max_throttling = Some(dur);
        self
    }

    /// Creates the configured `Runtime`.
    ///
    /// The returned `Runtime` instance is ready to spawn tasks.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime::Builder;
    ///
    /// let mut rt = Builder::new().build().unwrap();
    ///
    /// rt.block_on(async {
    ///     println!("Hello from the Tokio runtime");
    /// });
    /// ```
    pub fn build(&mut self) -> io::Result<Runtime> {
        match self.kind {
            Kind::Shell => self.build_shell_runtime(),
            #[cfg(feature = "rt-core")]
            Kind::Basic => self.build_basic_runtime(),
            #[cfg(feature = "rt-threaded")]
            Kind::ThreadPool => self.build_threaded_runtime(),
        }
    }

    fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
        use crate::runtime::Kind;

        let clock = time::create_clock();

        // Create I/O driver
        let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
        let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());

        let spawner = Spawner::Shell;

        let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
        let blocking_spawner = blocking_pool.spawner().clone();

        Ok(Runtime {
            kind: Kind::Shell(Shell::new(driver)),
            handle: Handle {
                spawner,
                io_handle,
                time_handle,
                clock,
                blocking_spawner,
            },
            blocking_pool,
        })
    }
}

cfg_io_driver! {
    impl Builder {
        /// Enables the I/O driver.
        ///
        /// Doing this enables using net, process, signal, and some I/O types on
        /// the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new()
        ///     .enable_io()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_io(&mut self) -> &mut Self {
            self.enable_io = true;
            self
        }
    }
}

cfg_time! {
    impl Builder {
        /// Enables the time driver.
        ///
        /// Doing this enables using `tokio::time` on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_time(&mut self) -> &mut Self {
            self.enable_time = true;
            self
        }
    }
}

cfg_rt_core! {
    impl Builder {
        /// Sets runtime to use a simpler scheduler that runs all tasks on the current thread.
        ///
        /// The executor and all necessary drivers will all be run on the current
        /// thread during `block_on` calls.
        ///
        /// See also [the module level documentation][1], which has a section on scheduler
        /// types.
        ///
        /// [1]: index.html#runtime-configurations
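        ///
        /// # Examples
        ///
        /// A minimal example, following the pattern of the other builder
        /// examples in this module:
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new()
        ///     .basic_scheduler()
        ///     .build()
        ///     .unwrap();
        /// ```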
        pub fn basic_scheduler(&mut self) -> &mut Self {
            self.kind = Kind::Basic;
            self
        }

        fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
            use crate::runtime::{BasicScheduler, Kind};

            let clock = time::create_clock();

            let max_throttling = self.max_throttling.take().filter(|max_throttling| max_throttling.as_millis() > 0);

            let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
            let (driver, time_handle) = time::create_throttling_driver(
                self.enable_time,
                io_driver,
                clock.clone(),
                max_throttling,
            );

            // And now put a single-threaded scheduler on top of the timer. When
            // there are no futures ready to do something, it'll let the timer or
            // the reactor generate some new stimuli for the futures to continue
            // in their life.
            let scheduler = BasicScheduler::new(driver, max_throttling);
            let spawner = Spawner::Basic(scheduler.spawner());

            // Blocking pool
            let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
            let blocking_spawner = blocking_pool.spawner().clone();

            Ok(Runtime {
                kind: Kind::Basic(scheduler),
                handle: Handle {
                    spawner,
                    io_handle,
                    time_handle,
                    clock,
                    blocking_spawner,
                },
                blocking_pool,
            })
        }
    }
}

cfg_rt_threaded! {
    impl Builder {
        /// Sets runtime to use a multi-threaded scheduler for executing tasks.
        ///
        /// See also [the module level documentation][1], which has a section on scheduler
        /// types.
        ///
        /// [1]: index.html#runtime-configurations
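        ///
        /// # Examples
        ///
        /// A minimal example, following the pattern of the other builder
        /// examples in this module:
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new()
        ///     .threaded_scheduler()
        ///     .build()
        ///     .unwrap();
        /// ```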
        pub fn threaded_scheduler(&mut self) -> &mut Self {
            self.kind = Kind::ThreadPool;
            self
        }

        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::runtime::{Kind, ThreadPool};
            use crate::runtime::park::Parker;

            let core_threads = self.core_threads.unwrap_or_else(crate::loom::sys::num_cpus);
            assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");

            let clock = time::create_clock();

            let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
            let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
            let (scheduler, workers) = ThreadPool::new(core_threads, Parker::new(driver));
            let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

            // Create the blocking pool
            let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
            let blocking_spawner = blocking_pool.spawner().clone();

            // Create the runtime handle
            let handle = Handle {
                spawner,
                io_handle,
                time_handle,
                clock,
                blocking_spawner,
            };

            // Spawn the thread pool workers
            workers.spawn(&handle);

            // FIXME(fengalin): use max_throttling?
            Ok(Runtime {
                kind: Kind::ThreadPool(scheduler),
                handle,
                blocking_pool,
            })
        }
    }
}

impl Default for Builder {
    fn default() -> Self {
        Self::new()
    }
}

impl fmt::Debug for Builder {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Builder")
            .field("kind", &self.kind)
            .field("core_threads", &self.core_threads)
            .field("max_threads", &self.max_threads)
            .field("thread_name", &self.thread_name)
            .field("thread_stack_size", &self.thread_stack_size)
            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
            .finish()
    }
}