1 use crate::runtime::handle::Handle;
2 use crate::runtime::shell::Shell;
3 use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
4 
5 use std::fmt;
6 #[cfg(not(loom))]
7 use std::sync::Arc;
8 
9 /// Builds Tokio Runtime with custom configuration values.
10 ///
11 /// Methods can be chained in order to set the configuration values. The
12 /// Runtime is constructed by calling [`build`].
13 ///
14 /// New instances of `Builder` are obtained via [`Builder::new`].
15 ///
16 /// See function level documentation for details on the various configuration
17 /// settings.
18 ///
19 /// [`build`]: method@Self::build
20 /// [`Builder::new`]: method@Self::new
21 ///
22 /// # Examples
23 ///
24 /// ```
25 /// use tokio::runtime::Builder;
26 ///
27 /// fn main() {
28 ///     // build runtime
29 ///     let runtime = Builder::new()
30 ///         .threaded_scheduler()
31 ///         .core_threads(4)
32 ///         .thread_name("my-custom-name")
33 ///         .thread_stack_size(3 * 1024 * 1024)
34 ///         .build()
35 ///         .unwrap();
36 ///
37 ///     // use runtime ...
38 /// }
39 /// ```
40 pub struct Builder {
41     /// The task execution model to use.
42     kind: Kind,
43 
44     /// Whether or not to enable the I/O driver
45     enable_io: bool,
46 
47     /// Whether or not to enable the time driver
48     enable_time: bool,
49 
50     /// The number of worker threads, used by Runtime.
51     ///
52     /// Only used when not using the current-thread executor.
53     core_threads: Option<usize>,
54 
55     /// Cap on thread usage.
56     max_threads: usize,
57 
58     /// Name used for threads spawned by the runtime.
59     pub(super) thread_name: String,
60 
61     /// Stack size used for threads spawned by the runtime.
62     pub(super) thread_stack_size: Option<usize>,
63 
64     /// Callback to run after each thread starts.
65     pub(super) after_start: Option<Callback>,
66 
67     /// To run before each worker thread stops
68     pub(super) before_stop: Option<Callback>,
69 }
70 
71 #[derive(Debug, Clone, Copy)]
72 enum Kind {
73     Shell,
74     #[cfg(feature = "rt-core")]
75     Basic,
76     #[cfg(feature = "rt-threaded")]
77     ThreadPool,
78 }
79 
80 impl Builder {
81     /// Returns a new runtime builder initialized with default configuration
82     /// values.
83     ///
84     /// Configuration methods can be chained on the return value.
new() -> Builder85     pub fn new() -> Builder {
86         Builder {
87             // No task execution by default
88             kind: Kind::Shell,
89 
90             // I/O defaults to "off"
91             enable_io: false,
92 
93             // Time defaults to "off"
94             enable_time: false,
95 
96             // Default to lazy auto-detection (one thread per CPU core)
97             core_threads: None,
98 
99             max_threads: 512,
100 
101             // Default thread name
102             thread_name: "tokio-runtime-worker".into(),
103 
104             // Do not set a stack size by default
105             thread_stack_size: None,
106 
107             // No worker thread callbacks
108             after_start: None,
109             before_stop: None,
110         }
111     }
112 
113     /// Enables both I/O and time drivers.
114     ///
115     /// Doing this is a shorthand for calling `enable_io` and `enable_time`
116     /// individually. If additional components are added to Tokio in the future,
117     /// `enable_all` will include these future components.
118     ///
119     /// # Examples
120     ///
121     /// ```
122     /// use tokio::runtime;
123     ///
124     /// let rt = runtime::Builder::new()
125     ///     .threaded_scheduler()
126     ///     .enable_all()
127     ///     .build()
128     ///     .unwrap();
129     /// ```
enable_all(&mut self) -> &mut Self130     pub fn enable_all(&mut self) -> &mut Self {
131         #[cfg(feature = "io-driver")]
132         self.enable_io();
133         #[cfg(feature = "time")]
134         self.enable_time();
135 
136         self
137     }
138 
139     #[deprecated(note = "In future will be replaced by core_threads method")]
140     /// Sets the maximum number of worker threads for the `Runtime`'s thread pool.
141     ///
142     /// This must be a number between 1 and 32,768 though it is advised to keep
143     /// this value on the smaller side.
144     ///
145     /// The default value is the number of cores available to the system.
num_threads(&mut self, val: usize) -> &mut Self146     pub fn num_threads(&mut self, val: usize) -> &mut Self {
147         self.core_threads = Some(val);
148         self
149     }
150 
151     /// Sets the core number of worker threads for the `Runtime`'s thread pool.
152     ///
153     /// This should be a number between 1 and 32,768 though it is advised to keep
154     /// this value on the smaller side.
155     ///
156     /// The default value is the number of cores available to the system.
157     ///
158     /// These threads will be always active and running.
159     ///
160     /// # Examples
161     ///
162     /// ```
163     /// use tokio::runtime;
164     ///
165     /// let rt = runtime::Builder::new()
166     ///     .threaded_scheduler()
167     ///     .core_threads(4)
168     ///     .build()
169     ///     .unwrap();
170     /// ```
core_threads(&mut self, val: usize) -> &mut Self171     pub fn core_threads(&mut self, val: usize) -> &mut Self {
172         assert_ne!(val, 0, "Core threads cannot be zero");
173         self.core_threads = Some(val);
174         self
175     }
176 
177     /// Specifies limit for threads, spawned by the Runtime.
178     ///
179     /// This is number of threads to be used by Runtime, including `core_threads`
180     /// Having `max_threads` less than `core_threads` results in invalid configuration
181     /// when building multi-threaded `Runtime`, which would cause a panic.
182     ///
183     /// Similarly to the `core_threads`, this number should be between 1 and 32,768.
184     ///
185     /// The default value is 512.
186     ///
187     /// When multi-threaded runtime is not used, will act as limit on additional threads.
188     ///
189     /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for
190     /// blocking annotations) as `max_threads - core_threads`.
max_threads(&mut self, val: usize) -> &mut Self191     pub fn max_threads(&mut self, val: usize) -> &mut Self {
192         assert_ne!(val, 0, "Thread limit cannot be zero");
193         self.max_threads = val;
194         self
195     }
196 
197     /// Sets name of threads spawned by the `Runtime`'s thread pool.
198     ///
199     /// The default name is "tokio-runtime-worker".
200     ///
201     /// # Examples
202     ///
203     /// ```
204     /// # use tokio::runtime;
205     ///
206     /// # pub fn main() {
207     /// let rt = runtime::Builder::new()
208     ///     .thread_name("my-pool")
209     ///     .build();
210     /// # }
211     /// ```
thread_name(&mut self, val: impl Into<String>) -> &mut Self212     pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
213         self.thread_name = val.into();
214         self
215     }
216 
217     /// Sets the stack size (in bytes) for worker threads.
218     ///
219     /// The actual stack size may be greater than this value if the platform
220     /// specifies minimal stack size.
221     ///
222     /// The default stack size for spawned threads is 2 MiB, though this
223     /// particular stack size is subject to change in the future.
224     ///
225     /// # Examples
226     ///
227     /// ```
228     /// # use tokio::runtime;
229     ///
230     /// # pub fn main() {
231     /// let rt = runtime::Builder::new()
232     ///     .threaded_scheduler()
233     ///     .thread_stack_size(32 * 1024)
234     ///     .build();
235     /// # }
236     /// ```
thread_stack_size(&mut self, val: usize) -> &mut Self237     pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
238         self.thread_stack_size = Some(val);
239         self
240     }
241 
242     /// Executes function `f` after each thread is started but before it starts
243     /// doing work.
244     ///
245     /// This is intended for bookkeeping and monitoring use cases.
246     ///
247     /// # Examples
248     ///
249     /// ```
250     /// # use tokio::runtime;
251     ///
252     /// # pub fn main() {
253     /// let runtime = runtime::Builder::new()
254     ///     .threaded_scheduler()
255     ///     .on_thread_start(|| {
256     ///         println!("thread started");
257     ///     })
258     ///     .build();
259     /// # }
260     /// ```
261     #[cfg(not(loom))]
on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,262     pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
263     where
264         F: Fn() + Send + Sync + 'static,
265     {
266         self.after_start = Some(Arc::new(f));
267         self
268     }
269 
270     /// Executes function `f` before each thread stops.
271     ///
272     /// This is intended for bookkeeping and monitoring use cases.
273     ///
274     /// # Examples
275     ///
276     /// ```
277     /// # use tokio::runtime;
278     ///
279     /// # pub fn main() {
280     /// let runtime = runtime::Builder::new()
281     ///     .threaded_scheduler()
282     ///     .on_thread_stop(|| {
283     ///         println!("thread stopping");
284     ///     })
285     ///     .build();
286     /// # }
287     /// ```
288     #[cfg(not(loom))]
on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,289     pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
290     where
291         F: Fn() + Send + Sync + 'static,
292     {
293         self.before_stop = Some(Arc::new(f));
294         self
295     }
296 
297     /// Creates the configured `Runtime`.
298     ///
299     /// The returned `ThreadPool` instance is ready to spawn tasks.
300     ///
301     /// # Examples
302     ///
303     /// ```
304     /// use tokio::runtime::Builder;
305     ///
306     /// let mut rt = Builder::new().build().unwrap();
307     ///
308     /// rt.block_on(async {
309     ///     println!("Hello from the Tokio runtime");
310     /// });
311     /// ```
build(&mut self) -> io::Result<Runtime>312     pub fn build(&mut self) -> io::Result<Runtime> {
313         match self.kind {
314             Kind::Shell => self.build_shell_runtime(),
315             #[cfg(feature = "rt-core")]
316             Kind::Basic => self.build_basic_runtime(),
317             #[cfg(feature = "rt-threaded")]
318             Kind::ThreadPool => self.build_threaded_runtime(),
319         }
320     }
321 
build_shell_runtime(&mut self) -> io::Result<Runtime>322     fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
323         use crate::runtime::Kind;
324 
325         let clock = time::create_clock();
326 
327         // Create I/O driver
328         let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
329         let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
330 
331         let spawner = Spawner::Shell;
332 
333         let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
334         let blocking_spawner = blocking_pool.spawner().clone();
335 
336         Ok(Runtime {
337             kind: Kind::Shell(Shell::new(driver)),
338             handle: Handle {
339                 spawner,
340                 io_handle,
341                 time_handle,
342                 clock,
343                 blocking_spawner,
344             },
345             blocking_pool,
346         })
347     }
348 }
349 
350 cfg_io_driver! {
351     impl Builder {
352         /// Enables the I/O driver.
353         ///
354         /// Doing this enables using net, process, signal, and some I/O types on
355         /// the runtime.
356         ///
357         /// # Examples
358         ///
359         /// ```
360         /// use tokio::runtime;
361         ///
362         /// let rt = runtime::Builder::new()
363         ///     .enable_io()
364         ///     .build()
365         ///     .unwrap();
366         /// ```
367         pub fn enable_io(&mut self) -> &mut Self {
368             self.enable_io = true;
369             self
370         }
371     }
372 }
373 
374 cfg_time! {
375     impl Builder {
376         /// Enables the time driver.
377         ///
378         /// Doing this enables using `tokio::time` on the runtime.
379         ///
380         /// # Examples
381         ///
382         /// ```
383         /// use tokio::runtime;
384         ///
385         /// let rt = runtime::Builder::new()
386         ///     .enable_time()
387         ///     .build()
388         ///     .unwrap();
389         /// ```
390         pub fn enable_time(&mut self) -> &mut Self {
391             self.enable_time = true;
392             self
393         }
394     }
395 }
396 
397 cfg_rt_core! {
398     impl Builder {
399         /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread.
400         ///
401         /// The executor and all necessary drivers will all be run on the current
402         /// thread during [`block_on`] calls.
403         ///
404         /// See also [the module level documentation][1], which has a section on scheduler
405         /// types.
406         ///
407         /// [1]: index.html#runtime-configurations
408         /// [`block_on`]: Runtime::block_on
409         pub fn basic_scheduler(&mut self) -> &mut Self {
410             self.kind = Kind::Basic;
411             self
412         }
413 
414         fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
415             use crate::runtime::{BasicScheduler, Kind};
416 
417             let clock = time::create_clock();
418 
419             // Create I/O driver
420             let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
421 
422             let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
423 
424             // And now put a single-threaded scheduler on top of the timer. When
425             // there are no futures ready to do something, it'll let the timer or
426             // the reactor to generate some new stimuli for the futures to continue
427             // in their life.
428             let scheduler = BasicScheduler::new(driver);
429             let spawner = Spawner::Basic(scheduler.spawner().clone());
430 
431             // Blocking pool
432             let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
433             let blocking_spawner = blocking_pool.spawner().clone();
434 
435             Ok(Runtime {
436                 kind: Kind::Basic(scheduler),
437                 handle: Handle {
438                     spawner,
439                     io_handle,
440                     time_handle,
441                     clock,
442                     blocking_spawner,
443                 },
444                 blocking_pool,
445             })
446         }
447     }
448 }
449 
450 cfg_rt_threaded! {
451     impl Builder {
452         /// Sets runtime to use a multi-threaded scheduler for executing tasks.
453         ///
454         /// See also [the module level documentation][1], which has a section on scheduler
455         /// types.
456         ///
457         /// [1]: index.html#runtime-configurations
458         pub fn threaded_scheduler(&mut self) -> &mut Self {
459             self.kind = Kind::ThreadPool;
460             self
461         }
462 
463         fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
464             use crate::loom::sys::num_cpus;
465             use crate::runtime::{Kind, ThreadPool};
466             use crate::runtime::park::Parker;
467             use std::cmp;
468 
469             let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
470             assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");
471 
472             let clock = time::create_clock();
473 
474             let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
475             let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
476             let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
477             let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
478 
479             // Create the blocking pool
480             let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
481             let blocking_spawner = blocking_pool.spawner().clone();
482 
483             // Create the runtime handle
484             let handle = Handle {
485                 spawner,
486                 io_handle,
487                 time_handle,
488                 clock,
489                 blocking_spawner,
490             };
491 
492             // Spawn the thread pool workers
493             handle.enter(|| launch.launch());
494 
495             Ok(Runtime {
496                 kind: Kind::ThreadPool(scheduler),
497                 handle,
498                 blocking_pool,
499             })
500         }
501     }
502 }
503 
504 impl Default for Builder {
default() -> Self505     fn default() -> Self {
506         Self::new()
507     }
508 }
509 
510 impl fmt::Debug for Builder {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result511     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
512         fmt.debug_struct("Builder")
513             .field("kind", &self.kind)
514             .field("core_threads", &self.core_threads)
515             .field("max_threads", &self.max_threads)
516             .field("thread_name", &self.thread_name)
517             .field("thread_stack_size", &self.thread_stack_size)
518             .field("after_start", &self.after_start.as_ref().map(|_| "..."))
519             .field("before_stop", &self.after_start.as_ref().map(|_| "..."))
520             .finish()
521     }
522 }
523