1 use crate::runtime::handle::Handle;
2 use crate::runtime::{blocking, driver, Callback, Runtime, Spawner};
3 
4 use std::fmt;
5 use std::io;
6 use std::time::Duration;
7 
8 /// Builds Tokio Runtime with custom configuration values.
9 ///
10 /// Methods can be chained in order to set the configuration values. The
11 /// Runtime is constructed by calling [`build`].
12 ///
13 /// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
14 /// or [`Builder::new_current_thread`].
15 ///
16 /// See function level documentation for details on the various configuration
17 /// settings.
18 ///
19 /// [`build`]: method@Self::build
20 /// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
21 /// [`Builder::new_current_thread`]: method@Self::new_current_thread
22 ///
23 /// # Examples
24 ///
25 /// ```
26 /// use tokio::runtime::Builder;
27 ///
28 /// fn main() {
29 ///     // build runtime
30 ///     let runtime = Builder::new_multi_thread()
31 ///         .worker_threads(4)
32 ///         .thread_name("my-custom-name")
33 ///         .thread_stack_size(3 * 1024 * 1024)
34 ///         .build()
35 ///         .unwrap();
36 ///
37 ///     // use runtime ...
38 /// }
39 /// ```
40 pub struct Builder {
41     /// Runtime type
42     kind: Kind,
43 
44     /// Whether or not to enable the I/O driver
45     enable_io: bool,
46 
47     /// Whether or not to enable the time driver
48     enable_time: bool,
49 
50     /// Whether or not the clock should start paused.
51     start_paused: bool,
52 
53     /// The number of worker threads, used by Runtime.
54     ///
55     /// Only used when not using the current-thread executor.
56     worker_threads: Option<usize>,
57 
58     /// Cap on thread usage.
59     max_blocking_threads: usize,
60 
61     /// Name fn used for threads spawned by the runtime.
62     pub(super) thread_name: ThreadNameFn,
63 
64     /// Stack size used for threads spawned by the runtime.
65     pub(super) thread_stack_size: Option<usize>,
66 
67     /// Callback to run after each thread starts.
68     pub(super) after_start: Option<Callback>,
69 
70     /// To run before each worker thread stops
71     pub(super) before_stop: Option<Callback>,
72 
73     /// Customizable keep alive timeout for BlockingPool
74     pub(super) keep_alive: Option<Duration>,
75 }
76 
77 pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
78 
79 pub(crate) enum Kind {
80     CurrentThread,
81     #[cfg(feature = "rt-multi-thread")]
82     MultiThread,
83 }
84 
85 impl Builder {
86     /// Returns a new builder with the current thread scheduler selected.
87     ///
88     /// Configuration methods can be chained on the return value.
89     ///
90     /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
91     /// [`LocalSet`].
92     ///
93     /// [`LocalSet`]: crate::task::LocalSet
new_current_thread() -> Builder94     pub fn new_current_thread() -> Builder {
95         Builder::new(Kind::CurrentThread)
96     }
97 
98     /// Returns a new builder with the multi thread scheduler selected.
99     ///
100     /// Configuration methods can be chained on the return value.
101     #[cfg(feature = "rt-multi-thread")]
102     #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
new_multi_thread() -> Builder103     pub fn new_multi_thread() -> Builder {
104         Builder::new(Kind::MultiThread)
105     }
106 
107     /// Returns a new runtime builder initialized with default configuration
108     /// values.
109     ///
110     /// Configuration methods can be chained on the return value.
new(kind: Kind) -> Builder111     pub(crate) fn new(kind: Kind) -> Builder {
112         Builder {
113             kind,
114 
115             // I/O defaults to "off"
116             enable_io: false,
117 
118             // Time defaults to "off"
119             enable_time: false,
120 
121             // The clock starts not-paused
122             start_paused: false,
123 
124             // Default to lazy auto-detection (one thread per CPU core)
125             worker_threads: None,
126 
127             max_blocking_threads: 512,
128 
129             // Default thread name
130             thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
131 
132             // Do not set a stack size by default
133             thread_stack_size: None,
134 
135             // No worker thread callbacks
136             after_start: None,
137             before_stop: None,
138 
139             keep_alive: None,
140         }
141     }
142 
143     /// Enables both I/O and time drivers.
144     ///
145     /// Doing this is a shorthand for calling `enable_io` and `enable_time`
146     /// individually. If additional components are added to Tokio in the future,
147     /// `enable_all` will include these future components.
148     ///
149     /// # Examples
150     ///
151     /// ```
152     /// use tokio::runtime;
153     ///
154     /// let rt = runtime::Builder::new_multi_thread()
155     ///     .enable_all()
156     ///     .build()
157     ///     .unwrap();
158     /// ```
enable_all(&mut self) -> &mut Self159     pub fn enable_all(&mut self) -> &mut Self {
160         #[cfg(any(feature = "net", feature = "process", all(unix, feature = "signal")))]
161         self.enable_io();
162         #[cfg(feature = "time")]
163         self.enable_time();
164 
165         self
166     }
167 
168     /// Sets the number of worker threads the `Runtime` will use.
169     ///
170     /// This can be any number above 0 though it is advised to keep this value
171     /// on the smaller side.
172     ///
173     /// # Default
174     ///
175     /// The default value is the number of cores available to the system.
176     ///
177     /// # Panic
178     ///
179     /// When using the `current_thread` runtime this method will panic, since
180     /// those variants do not allow setting worker thread counts.
181     ///
182     ///
183     /// # Examples
184     ///
185     /// ## Multi threaded runtime with 4 threads
186     ///
187     /// ```
188     /// use tokio::runtime;
189     ///
190     /// // This will spawn a work-stealing runtime with 4 worker threads.
191     /// let rt = runtime::Builder::new_multi_thread()
192     ///     .worker_threads(4)
193     ///     .build()
194     ///     .unwrap();
195     ///
196     /// rt.spawn(async move {});
197     /// ```
198     ///
199     /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
200     ///
201     /// ```
202     /// use tokio::runtime;
203     ///
204     /// // Create a runtime that _must_ be driven from a call
205     /// // to `Runtime::block_on`.
206     /// let rt = runtime::Builder::new_current_thread()
207     ///     .build()
208     ///     .unwrap();
209     ///
210     /// // This will run the runtime and future on the current thread
211     /// rt.block_on(async move {});
212     /// ```
213     ///
214     /// # Panic
215     ///
216     /// This will panic if `val` is not larger than `0`.
worker_threads(&mut self, val: usize) -> &mut Self217     pub fn worker_threads(&mut self, val: usize) -> &mut Self {
218         assert!(val > 0, "Worker threads cannot be set to 0");
219         self.worker_threads = Some(val);
220         self
221     }
222 
223     /// Specifies the limit for additional threads spawned by the Runtime.
224     ///
225     /// These threads are used for blocking operations like tasks spawned
226     /// through [`spawn_blocking`]. Unlike the [`worker_threads`], they are not
227     /// always active and will exit if left idle for too long. You can change
228     /// this timeout duration with [`thread_keep_alive`].
229     ///
230     /// The default value is 512.
231     ///
232     /// # Panic
233     ///
234     /// This will panic if `val` is not larger than `0`.
235     ///
236     /// # Upgrading from 0.x
237     ///
238     /// In old versions `max_threads` limited both blocking and worker threads, but the
239     /// current `max_blocking_threads` does not include async worker threads in the count.
240     ///
241     /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
242     /// [`worker_threads`]: Self::worker_threads
243     /// [`thread_keep_alive`]: Self::thread_keep_alive
244     #[cfg_attr(docsrs, doc(alias = "max_threads"))]
max_blocking_threads(&mut self, val: usize) -> &mut Self245     pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
246         assert!(val > 0, "Max blocking threads cannot be set to 0");
247         self.max_blocking_threads = val;
248         self
249     }
250 
251     /// Sets name of threads spawned by the `Runtime`'s thread pool.
252     ///
253     /// The default name is "tokio-runtime-worker".
254     ///
255     /// # Examples
256     ///
257     /// ```
258     /// # use tokio::runtime;
259     ///
260     /// # pub fn main() {
261     /// let rt = runtime::Builder::new_multi_thread()
262     ///     .thread_name("my-pool")
263     ///     .build();
264     /// # }
265     /// ```
thread_name(&mut self, val: impl Into<String>) -> &mut Self266     pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
267         let val = val.into();
268         self.thread_name = std::sync::Arc::new(move || val.clone());
269         self
270     }
271 
272     /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
273     ///
274     /// The default name fn is `|| "tokio-runtime-worker".into()`.
275     ///
276     /// # Examples
277     ///
278     /// ```
279     /// # use tokio::runtime;
280     /// # use std::sync::atomic::{AtomicUsize, Ordering};
281     ///
282     /// # pub fn main() {
283     /// let rt = runtime::Builder::new_multi_thread()
284     ///     .thread_name_fn(|| {
285     ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
286     ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
287     ///        format!("my-pool-{}", id)
288     ///     })
289     ///     .build();
290     /// # }
291     /// ```
thread_name_fn<F>(&mut self, f: F) -> &mut Self where F: Fn() -> String + Send + Sync + 'static,292     pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
293     where
294         F: Fn() -> String + Send + Sync + 'static,
295     {
296         self.thread_name = std::sync::Arc::new(f);
297         self
298     }
299 
300     /// Sets the stack size (in bytes) for worker threads.
301     ///
302     /// The actual stack size may be greater than this value if the platform
303     /// specifies minimal stack size.
304     ///
305     /// The default stack size for spawned threads is 2 MiB, though this
306     /// particular stack size is subject to change in the future.
307     ///
308     /// # Examples
309     ///
310     /// ```
311     /// # use tokio::runtime;
312     ///
313     /// # pub fn main() {
314     /// let rt = runtime::Builder::new_multi_thread()
315     ///     .thread_stack_size(32 * 1024)
316     ///     .build();
317     /// # }
318     /// ```
thread_stack_size(&mut self, val: usize) -> &mut Self319     pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
320         self.thread_stack_size = Some(val);
321         self
322     }
323 
324     /// Executes function `f` after each thread is started but before it starts
325     /// doing work.
326     ///
327     /// This is intended for bookkeeping and monitoring use cases.
328     ///
329     /// # Examples
330     ///
331     /// ```
332     /// # use tokio::runtime;
333     ///
334     /// # pub fn main() {
335     /// let runtime = runtime::Builder::new_multi_thread()
336     ///     .on_thread_start(|| {
337     ///         println!("thread started");
338     ///     })
339     ///     .build();
340     /// # }
341     /// ```
342     #[cfg(not(loom))]
on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,343     pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
344     where
345         F: Fn() + Send + Sync + 'static,
346     {
347         self.after_start = Some(std::sync::Arc::new(f));
348         self
349     }
350 
351     /// Executes function `f` before each thread stops.
352     ///
353     /// This is intended for bookkeeping and monitoring use cases.
354     ///
355     /// # Examples
356     ///
357     /// ```
358     /// # use tokio::runtime;
359     ///
360     /// # pub fn main() {
361     /// let runtime = runtime::Builder::new_multi_thread()
362     ///     .on_thread_stop(|| {
363     ///         println!("thread stopping");
364     ///     })
365     ///     .build();
366     /// # }
367     /// ```
368     #[cfg(not(loom))]
on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,369     pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
370     where
371         F: Fn() + Send + Sync + 'static,
372     {
373         self.before_stop = Some(std::sync::Arc::new(f));
374         self
375     }
376 
377     /// Creates the configured `Runtime`.
378     ///
379     /// The returned `Runtime` instance is ready to spawn tasks.
380     ///
381     /// # Examples
382     ///
383     /// ```
384     /// use tokio::runtime::Builder;
385     ///
386     /// let rt  = Builder::new_multi_thread().build().unwrap();
387     ///
388     /// rt.block_on(async {
389     ///     println!("Hello from the Tokio runtime");
390     /// });
391     /// ```
build(&mut self) -> io::Result<Runtime>392     pub fn build(&mut self) -> io::Result<Runtime> {
393         match &self.kind {
394             Kind::CurrentThread => self.build_basic_runtime(),
395             #[cfg(feature = "rt-multi-thread")]
396             Kind::MultiThread => self.build_threaded_runtime(),
397         }
398     }
399 
get_cfg(&self) -> driver::Cfg400     fn get_cfg(&self) -> driver::Cfg {
401         driver::Cfg {
402             enable_pause_time: match self.kind {
403                 Kind::CurrentThread => true,
404                 #[cfg(feature = "rt-multi-thread")]
405                 Kind::MultiThread => false,
406             },
407             enable_io: self.enable_io,
408             enable_time: self.enable_time,
409             start_paused: self.start_paused,
410         }
411     }
412 
413     /// Sets a custom timeout for a thread in the blocking pool.
414     ///
415     /// By default, the timeout for a thread is set to 10 seconds. This can
416     /// be overridden using .thread_keep_alive().
417     ///
418     /// # Example
419     ///
420     /// ```
421     /// # use tokio::runtime;
422     /// # use std::time::Duration;
423     ///
424     /// # pub fn main() {
425     /// let rt = runtime::Builder::new_multi_thread()
426     ///     .thread_keep_alive(Duration::from_millis(100))
427     ///     .build();
428     /// # }
429     /// ```
thread_keep_alive(&mut self, duration: Duration) -> &mut Self430     pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
431         self.keep_alive = Some(duration);
432         self
433     }
434 
build_basic_runtime(&mut self) -> io::Result<Runtime>435     fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
436         use crate::runtime::{BasicScheduler, Kind};
437 
438         let (driver, resources) = driver::Driver::new(self.get_cfg())?;
439 
440         // And now put a single-threaded scheduler on top of the timer. When
441         // there are no futures ready to do something, it'll let the timer or
442         // the reactor to generate some new stimuli for the futures to continue
443         // in their life.
444         let scheduler = BasicScheduler::new(driver);
445         let spawner = Spawner::Basic(scheduler.spawner().clone());
446 
447         // Blocking pool
448         let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
449         let blocking_spawner = blocking_pool.spawner().clone();
450 
451         Ok(Runtime {
452             kind: Kind::CurrentThread(scheduler),
453             handle: Handle {
454                 spawner,
455                 io_handle: resources.io_handle,
456                 time_handle: resources.time_handle,
457                 signal_handle: resources.signal_handle,
458                 clock: resources.clock,
459                 blocking_spawner,
460             },
461             blocking_pool,
462         })
463     }
464 }
465 
466 cfg_io_driver! {
467     impl Builder {
468         /// Enables the I/O driver.
469         ///
470         /// Doing this enables using net, process, signal, and some I/O types on
471         /// the runtime.
472         ///
473         /// # Examples
474         ///
475         /// ```
476         /// use tokio::runtime;
477         ///
478         /// let rt = runtime::Builder::new_multi_thread()
479         ///     .enable_io()
480         ///     .build()
481         ///     .unwrap();
482         /// ```
483         pub fn enable_io(&mut self) -> &mut Self {
484             self.enable_io = true;
485             self
486         }
487     }
488 }
489 
490 cfg_time! {
491     impl Builder {
492         /// Enables the time driver.
493         ///
494         /// Doing this enables using `tokio::time` on the runtime.
495         ///
496         /// # Examples
497         ///
498         /// ```
499         /// use tokio::runtime;
500         ///
501         /// let rt = runtime::Builder::new_multi_thread()
502         ///     .enable_time()
503         ///     .build()
504         ///     .unwrap();
505         /// ```
506         pub fn enable_time(&mut self) -> &mut Self {
507             self.enable_time = true;
508             self
509         }
510     }
511 }
512 
513 cfg_test_util! {
514     impl Builder {
515         /// Controls if the runtime's clock starts paused or advancing.
516         ///
517         /// Pausing time requires the current-thread runtime; construction of
518         /// the runtime will panic otherwise.
519         ///
520         /// # Examples
521         ///
522         /// ```
523         /// use tokio::runtime;
524         ///
525         /// let rt = runtime::Builder::new_current_thread()
526         ///     .enable_time()
527         ///     .start_paused(true)
528         ///     .build()
529         ///     .unwrap();
530         /// ```
531         pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
532             self.start_paused = start_paused;
533             self
534         }
535     }
536 }
537 
538 cfg_rt_multi_thread! {
539     impl Builder {
540         fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
541             use crate::loom::sys::num_cpus;
542             use crate::runtime::{Kind, ThreadPool};
543             use crate::runtime::park::Parker;
544 
545             let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
546 
547             let (driver, resources) = driver::Driver::new(self.get_cfg())?;
548 
549             let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
550             let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
551 
552             // Create the blocking pool
553             let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
554             let blocking_spawner = blocking_pool.spawner().clone();
555 
556             // Create the runtime handle
557             let handle = Handle {
558                 spawner,
559                 io_handle: resources.io_handle,
560                 time_handle: resources.time_handle,
561                 signal_handle: resources.signal_handle,
562                 clock: resources.clock,
563                 blocking_spawner,
564             };
565 
566             // Spawn the thread pool workers
567             let _enter = crate::runtime::context::enter(handle.clone());
568             launch.launch();
569 
570             Ok(Runtime {
571                 kind: Kind::ThreadPool(scheduler),
572                 handle,
573                 blocking_pool,
574             })
575         }
576     }
577 }
578 
579 impl fmt::Debug for Builder {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result580     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
581         fmt.debug_struct("Builder")
582             .field("worker_threads", &self.worker_threads)
583             .field("max_blocking_threads", &self.max_blocking_threads)
584             .field(
585                 "thread_name",
586                 &"<dyn Fn() -> String + Send + Sync + 'static>",
587             )
588             .field("thread_stack_size", &self.thread_stack_size)
589             .field("after_start", &self.after_start.as_ref().map(|_| "..."))
590             .field("before_stop", &self.after_start.as_ref().map(|_| "..."))
591             .finish()
592     }
593 }
594