1 use crate::runtime::handle::Handle;
2 use crate::runtime::{blocking, driver, Callback, Runtime, Spawner};
3 
4 use std::fmt;
5 use std::io;
6 use std::time::Duration;
7 
8 /// Builds Tokio Runtime with custom configuration values.
9 ///
10 /// Methods can be chained in order to set the configuration values. The
11 /// Runtime is constructed by calling [`build`].
12 ///
13 /// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
14 /// or [`Builder::new_current_thread`].
15 ///
16 /// See function level documentation for details on the various configuration
17 /// settings.
18 ///
19 /// [`build`]: method@Self::build
20 /// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
21 /// [`Builder::new_current_thread`]: method@Self::new_current_thread
22 ///
23 /// # Examples
24 ///
25 /// ```
26 /// use tokio::runtime::Builder;
27 ///
28 /// fn main() {
29 ///     // build runtime
30 ///     let runtime = Builder::new_multi_thread()
31 ///         .worker_threads(4)
32 ///         .thread_name("my-custom-name")
33 ///         .thread_stack_size(3 * 1024 * 1024)
34 ///         .build()
35 ///         .unwrap();
36 ///
37 ///     // use runtime ...
38 /// }
39 /// ```
40 pub struct Builder {
41     /// Runtime type
42     kind: Kind,
43 
44     /// Whether or not to enable the I/O driver
45     enable_io: bool,
46 
47     /// Whether or not to enable the time driver
48     enable_time: bool,
49 
50     /// Whether or not the clock should start paused.
51     start_paused: bool,
52 
53     /// The number of worker threads, used by Runtime.
54     ///
55     /// Only used when not using the current-thread executor.
56     worker_threads: Option<usize>,
57 
58     /// Cap on thread usage.
59     max_blocking_threads: usize,
60 
61     /// Name fn used for threads spawned by the runtime.
62     pub(super) thread_name: ThreadNameFn,
63 
64     /// Stack size used for threads spawned by the runtime.
65     pub(super) thread_stack_size: Option<usize>,
66 
67     /// Callback to run after each thread starts.
68     pub(super) after_start: Option<Callback>,
69 
70     /// To run before each worker thread stops
71     pub(super) before_stop: Option<Callback>,
72 
73     /// To run before each worker thread is parked.
74     pub(super) before_park: Option<Callback>,
75 
76     /// To run after each thread is unparked.
77     pub(super) after_unpark: Option<Callback>,
78 
79     /// Customizable keep alive timeout for BlockingPool
80     pub(super) keep_alive: Option<Duration>,
81 }
82 
83 pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
84 
85 pub(crate) enum Kind {
86     CurrentThread,
87     #[cfg(feature = "rt-multi-thread")]
88     MultiThread,
89 }
90 
91 impl Builder {
92     /// Returns a new builder with the current thread scheduler selected.
93     ///
94     /// Configuration methods can be chained on the return value.
95     ///
96     /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
97     /// [`LocalSet`].
98     ///
99     /// [`LocalSet`]: crate::task::LocalSet
new_current_thread() -> Builder100     pub fn new_current_thread() -> Builder {
101         Builder::new(Kind::CurrentThread)
102     }
103 
104     /// Returns a new builder with the multi thread scheduler selected.
105     ///
106     /// Configuration methods can be chained on the return value.
107     #[cfg(feature = "rt-multi-thread")]
108     #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
new_multi_thread() -> Builder109     pub fn new_multi_thread() -> Builder {
110         Builder::new(Kind::MultiThread)
111     }
112 
113     /// Returns a new runtime builder initialized with default configuration
114     /// values.
115     ///
116     /// Configuration methods can be chained on the return value.
new(kind: Kind) -> Builder117     pub(crate) fn new(kind: Kind) -> Builder {
118         Builder {
119             kind,
120 
121             // I/O defaults to "off"
122             enable_io: false,
123 
124             // Time defaults to "off"
125             enable_time: false,
126 
127             // The clock starts not-paused
128             start_paused: false,
129 
130             // Default to lazy auto-detection (one thread per CPU core)
131             worker_threads: None,
132 
133             max_blocking_threads: 512,
134 
135             // Default thread name
136             thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
137 
138             // Do not set a stack size by default
139             thread_stack_size: None,
140 
141             // No worker thread callbacks
142             after_start: None,
143             before_stop: None,
144             before_park: None,
145             after_unpark: None,
146 
147             keep_alive: None,
148         }
149     }
150 
151     /// Enables both I/O and time drivers.
152     ///
153     /// Doing this is a shorthand for calling `enable_io` and `enable_time`
154     /// individually. If additional components are added to Tokio in the future,
155     /// `enable_all` will include these future components.
156     ///
157     /// # Examples
158     ///
159     /// ```
160     /// use tokio::runtime;
161     ///
162     /// let rt = runtime::Builder::new_multi_thread()
163     ///     .enable_all()
164     ///     .build()
165     ///     .unwrap();
166     /// ```
enable_all(&mut self) -> &mut Self167     pub fn enable_all(&mut self) -> &mut Self {
168         #[cfg(any(feature = "net", feature = "process", all(unix, feature = "signal")))]
169         self.enable_io();
170         #[cfg(feature = "time")]
171         self.enable_time();
172 
173         self
174     }
175 
176     /// Sets the number of worker threads the `Runtime` will use.
177     ///
178     /// This can be any number above 0 though it is advised to keep this value
179     /// on the smaller side.
180     ///
181     /// # Default
182     ///
183     /// The default value is the number of cores available to the system.
184     ///
185     /// # Panic
186     ///
187     /// When using the `current_thread` runtime this method will panic, since
188     /// those variants do not allow setting worker thread counts.
189     ///
190     ///
191     /// # Examples
192     ///
193     /// ## Multi threaded runtime with 4 threads
194     ///
195     /// ```
196     /// use tokio::runtime;
197     ///
198     /// // This will spawn a work-stealing runtime with 4 worker threads.
199     /// let rt = runtime::Builder::new_multi_thread()
200     ///     .worker_threads(4)
201     ///     .build()
202     ///     .unwrap();
203     ///
204     /// rt.spawn(async move {});
205     /// ```
206     ///
207     /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
208     ///
209     /// ```
210     /// use tokio::runtime;
211     ///
212     /// // Create a runtime that _must_ be driven from a call
213     /// // to `Runtime::block_on`.
214     /// let rt = runtime::Builder::new_current_thread()
215     ///     .build()
216     ///     .unwrap();
217     ///
218     /// // This will run the runtime and future on the current thread
219     /// rt.block_on(async move {});
220     /// ```
221     ///
222     /// # Panic
223     ///
224     /// This will panic if `val` is not larger than `0`.
worker_threads(&mut self, val: usize) -> &mut Self225     pub fn worker_threads(&mut self, val: usize) -> &mut Self {
226         assert!(val > 0, "Worker threads cannot be set to 0");
227         self.worker_threads = Some(val);
228         self
229     }
230 
231     /// Specifies the limit for additional threads spawned by the Runtime.
232     ///
233     /// These threads are used for blocking operations like tasks spawned
234     /// through [`spawn_blocking`]. Unlike the [`worker_threads`], they are not
235     /// always active and will exit if left idle for too long. You can change
236     /// this timeout duration with [`thread_keep_alive`].
237     ///
238     /// The default value is 512.
239     ///
240     /// # Panic
241     ///
242     /// This will panic if `val` is not larger than `0`.
243     ///
244     /// # Upgrading from 0.x
245     ///
246     /// In old versions `max_threads` limited both blocking and worker threads, but the
247     /// current `max_blocking_threads` does not include async worker threads in the count.
248     ///
249     /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
250     /// [`worker_threads`]: Self::worker_threads
251     /// [`thread_keep_alive`]: Self::thread_keep_alive
252     #[cfg_attr(docsrs, doc(alias = "max_threads"))]
max_blocking_threads(&mut self, val: usize) -> &mut Self253     pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
254         assert!(val > 0, "Max blocking threads cannot be set to 0");
255         self.max_blocking_threads = val;
256         self
257     }
258 
259     /// Sets name of threads spawned by the `Runtime`'s thread pool.
260     ///
261     /// The default name is "tokio-runtime-worker".
262     ///
263     /// # Examples
264     ///
265     /// ```
266     /// # use tokio::runtime;
267     ///
268     /// # pub fn main() {
269     /// let rt = runtime::Builder::new_multi_thread()
270     ///     .thread_name("my-pool")
271     ///     .build();
272     /// # }
273     /// ```
thread_name(&mut self, val: impl Into<String>) -> &mut Self274     pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
275         let val = val.into();
276         self.thread_name = std::sync::Arc::new(move || val.clone());
277         self
278     }
279 
280     /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
281     ///
282     /// The default name fn is `|| "tokio-runtime-worker".into()`.
283     ///
284     /// # Examples
285     ///
286     /// ```
287     /// # use tokio::runtime;
288     /// # use std::sync::atomic::{AtomicUsize, Ordering};
289     ///
290     /// # pub fn main() {
291     /// let rt = runtime::Builder::new_multi_thread()
292     ///     .thread_name_fn(|| {
293     ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
294     ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
295     ///        format!("my-pool-{}", id)
296     ///     })
297     ///     .build();
298     /// # }
299     /// ```
thread_name_fn<F>(&mut self, f: F) -> &mut Self where F: Fn() -> String + Send + Sync + 'static,300     pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
301     where
302         F: Fn() -> String + Send + Sync + 'static,
303     {
304         self.thread_name = std::sync::Arc::new(f);
305         self
306     }
307 
308     /// Sets the stack size (in bytes) for worker threads.
309     ///
310     /// The actual stack size may be greater than this value if the platform
311     /// specifies minimal stack size.
312     ///
313     /// The default stack size for spawned threads is 2 MiB, though this
314     /// particular stack size is subject to change in the future.
315     ///
316     /// # Examples
317     ///
318     /// ```
319     /// # use tokio::runtime;
320     ///
321     /// # pub fn main() {
322     /// let rt = runtime::Builder::new_multi_thread()
323     ///     .thread_stack_size(32 * 1024)
324     ///     .build();
325     /// # }
326     /// ```
thread_stack_size(&mut self, val: usize) -> &mut Self327     pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
328         self.thread_stack_size = Some(val);
329         self
330     }
331 
332     /// Executes function `f` after each thread is started but before it starts
333     /// doing work.
334     ///
335     /// This is intended for bookkeeping and monitoring use cases.
336     ///
337     /// # Examples
338     ///
339     /// ```
340     /// # use tokio::runtime;
341     ///
342     /// # pub fn main() {
343     /// let runtime = runtime::Builder::new_multi_thread()
344     ///     .on_thread_start(|| {
345     ///         println!("thread started");
346     ///     })
347     ///     .build();
348     /// # }
349     /// ```
350     #[cfg(not(loom))]
on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,351     pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
352     where
353         F: Fn() + Send + Sync + 'static,
354     {
355         self.after_start = Some(std::sync::Arc::new(f));
356         self
357     }
358 
359     /// Executes function `f` before each thread stops.
360     ///
361     /// This is intended for bookkeeping and monitoring use cases.
362     ///
363     /// # Examples
364     ///
365     /// ```
366     /// # use tokio::runtime;
367     ///
368     /// # pub fn main() {
369     /// let runtime = runtime::Builder::new_multi_thread()
370     ///     .on_thread_stop(|| {
371     ///         println!("thread stopping");
372     ///     })
373     ///     .build();
374     /// # }
375     /// ```
376     #[cfg(not(loom))]
on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,377     pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
378     where
379         F: Fn() + Send + Sync + 'static,
380     {
381         self.before_stop = Some(std::sync::Arc::new(f));
382         self
383     }
384 
385     /// Executes function `f` just before a thread is parked (goes idle).
386     /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
387     /// can be called, and may result in this thread being unparked immediately.
388     ///
389     /// This can be used to start work only when the executor is idle, or for bookkeeping
390     /// and monitoring purposes.
391     ///
392     /// Note: There can only be one park callback for a runtime; calling this function
393     /// more than once replaces the last callback defined, rather than adding to it.
394     ///
395     /// # Examples
396     ///
397     /// ## Multithreaded executor
398     /// ```
399     /// # use std::sync::Arc;
400     /// # use std::sync::atomic::{AtomicBool, Ordering};
401     /// # use tokio::runtime;
402     /// # use tokio::sync::Barrier;
403     /// # pub fn main() {
404     /// let once = AtomicBool::new(true);
405     /// let barrier = Arc::new(Barrier::new(2));
406     ///
407     /// let runtime = runtime::Builder::new_multi_thread()
408     ///     .worker_threads(1)
409     ///     .on_thread_park({
410     ///         let barrier = barrier.clone();
411     ///         move || {
412     ///             let barrier = barrier.clone();
413     ///             if once.swap(false, Ordering::Relaxed) {
414     ///                 tokio::spawn(async move { barrier.wait().await; });
415     ///            }
416     ///         }
417     ///     })
418     ///     .build()
419     ///     .unwrap();
420     ///
421     /// runtime.block_on(async {
422     ///    barrier.wait().await;
423     /// })
424     /// # }
425     /// ```
426     /// ## Current thread executor
427     /// ```
428     /// # use std::sync::Arc;
429     /// # use std::sync::atomic::{AtomicBool, Ordering};
430     /// # use tokio::runtime;
431     /// # use tokio::sync::Barrier;
432     /// # pub fn main() {
433     /// let once = AtomicBool::new(true);
434     /// let barrier = Arc::new(Barrier::new(2));
435     ///
436     /// let runtime = runtime::Builder::new_current_thread()
437     ///     .on_thread_park({
438     ///         let barrier = barrier.clone();
439     ///         move || {
440     ///             let barrier = barrier.clone();
441     ///             if once.swap(false, Ordering::Relaxed) {
442     ///                 tokio::spawn(async move { barrier.wait().await; });
443     ///            }
444     ///         }
445     ///     })
446     ///     .build()
447     ///     .unwrap();
448     ///
449     /// runtime.block_on(async {
450     ///    barrier.wait().await;
451     /// })
452     /// # }
453     /// ```
454     #[cfg(not(loom))]
on_thread_park<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,455     pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
456     where
457         F: Fn() + Send + Sync + 'static,
458     {
459         self.before_park = Some(std::sync::Arc::new(f));
460         self
461     }
462 
463     /// Executes function `f` just after a thread unparks (starts executing tasks).
464     ///
465     /// This is intended for bookkeeping and monitoring use cases; note that work
466     /// in this callback will increase latencies when the application has allowed one or
467     /// more runtime threads to go idle.
468     ///
469     /// Note: There can only be one unpark callback for a runtime; calling this function
470     /// more than once replaces the last callback defined, rather than adding to it.
471     ///
472     /// # Examples
473     ///
474     /// ```
475     /// # use tokio::runtime;
476     ///
477     /// # pub fn main() {
478     /// let runtime = runtime::Builder::new_multi_thread()
479     ///     .on_thread_unpark(|| {
480     ///         println!("thread unparking");
481     ///     })
482     ///     .build();
483     ///
484     /// runtime.unwrap().block_on(async {
485     ///    tokio::task::yield_now().await;
486     ///    println!("Hello from Tokio!");
487     /// })
488     /// # }
489     /// ```
490     #[cfg(not(loom))]
on_thread_unpark<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,491     pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
492     where
493         F: Fn() + Send + Sync + 'static,
494     {
495         self.after_unpark = Some(std::sync::Arc::new(f));
496         self
497     }
498 
499     /// Creates the configured `Runtime`.
500     ///
501     /// The returned `Runtime` instance is ready to spawn tasks.
502     ///
503     /// # Examples
504     ///
505     /// ```
506     /// use tokio::runtime::Builder;
507     ///
508     /// let rt  = Builder::new_multi_thread().build().unwrap();
509     ///
510     /// rt.block_on(async {
511     ///     println!("Hello from the Tokio runtime");
512     /// });
513     /// ```
build(&mut self) -> io::Result<Runtime>514     pub fn build(&mut self) -> io::Result<Runtime> {
515         match &self.kind {
516             Kind::CurrentThread => self.build_basic_runtime(),
517             #[cfg(feature = "rt-multi-thread")]
518             Kind::MultiThread => self.build_threaded_runtime(),
519         }
520     }
521 
get_cfg(&self) -> driver::Cfg522     fn get_cfg(&self) -> driver::Cfg {
523         driver::Cfg {
524             enable_pause_time: match self.kind {
525                 Kind::CurrentThread => true,
526                 #[cfg(feature = "rt-multi-thread")]
527                 Kind::MultiThread => false,
528             },
529             enable_io: self.enable_io,
530             enable_time: self.enable_time,
531             start_paused: self.start_paused,
532         }
533     }
534 
535     /// Sets a custom timeout for a thread in the blocking pool.
536     ///
537     /// By default, the timeout for a thread is set to 10 seconds. This can
538     /// be overridden using .thread_keep_alive().
539     ///
540     /// # Example
541     ///
542     /// ```
543     /// # use tokio::runtime;
544     /// # use std::time::Duration;
545     ///
546     /// # pub fn main() {
547     /// let rt = runtime::Builder::new_multi_thread()
548     ///     .thread_keep_alive(Duration::from_millis(100))
549     ///     .build();
550     /// # }
551     /// ```
thread_keep_alive(&mut self, duration: Duration) -> &mut Self552     pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
553         self.keep_alive = Some(duration);
554         self
555     }
556 
build_basic_runtime(&mut self) -> io::Result<Runtime>557     fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
558         use crate::runtime::{BasicScheduler, Kind};
559 
560         let (driver, resources) = driver::Driver::new(self.get_cfg())?;
561 
562         // And now put a single-threaded scheduler on top of the timer. When
563         // there are no futures ready to do something, it'll let the timer or
564         // the reactor to generate some new stimuli for the futures to continue
565         // in their life.
566         let scheduler =
567             BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone());
568         let spawner = Spawner::Basic(scheduler.spawner().clone());
569 
570         // Blocking pool
571         let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
572         let blocking_spawner = blocking_pool.spawner().clone();
573 
574         Ok(Runtime {
575             kind: Kind::CurrentThread(scheduler),
576             handle: Handle {
577                 spawner,
578                 io_handle: resources.io_handle,
579                 time_handle: resources.time_handle,
580                 signal_handle: resources.signal_handle,
581                 clock: resources.clock,
582                 blocking_spawner,
583             },
584             blocking_pool,
585         })
586     }
587 }
588 
589 cfg_io_driver! {
590     impl Builder {
591         /// Enables the I/O driver.
592         ///
593         /// Doing this enables using net, process, signal, and some I/O types on
594         /// the runtime.
595         ///
596         /// # Examples
597         ///
598         /// ```
599         /// use tokio::runtime;
600         ///
601         /// let rt = runtime::Builder::new_multi_thread()
602         ///     .enable_io()
603         ///     .build()
604         ///     .unwrap();
605         /// ```
606         pub fn enable_io(&mut self) -> &mut Self {
607             self.enable_io = true;
608             self
609         }
610     }
611 }
612 
613 cfg_time! {
614     impl Builder {
615         /// Enables the time driver.
616         ///
617         /// Doing this enables using `tokio::time` on the runtime.
618         ///
619         /// # Examples
620         ///
621         /// ```
622         /// use tokio::runtime;
623         ///
624         /// let rt = runtime::Builder::new_multi_thread()
625         ///     .enable_time()
626         ///     .build()
627         ///     .unwrap();
628         /// ```
629         pub fn enable_time(&mut self) -> &mut Self {
630             self.enable_time = true;
631             self
632         }
633     }
634 }
635 
636 cfg_test_util! {
637     impl Builder {
638         /// Controls if the runtime's clock starts paused or advancing.
639         ///
640         /// Pausing time requires the current-thread runtime; construction of
641         /// the runtime will panic otherwise.
642         ///
643         /// # Examples
644         ///
645         /// ```
646         /// use tokio::runtime;
647         ///
648         /// let rt = runtime::Builder::new_current_thread()
649         ///     .enable_time()
650         ///     .start_paused(true)
651         ///     .build()
652         ///     .unwrap();
653         /// ```
654         pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
655             self.start_paused = start_paused;
656             self
657         }
658     }
659 }
660 
661 cfg_rt_multi_thread! {
662     impl Builder {
663         fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
664             use crate::loom::sys::num_cpus;
665             use crate::runtime::{Kind, ThreadPool};
666             use crate::runtime::park::Parker;
667 
668             let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
669 
670             let (driver, resources) = driver::Driver::new(self.get_cfg())?;
671 
672             let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone());
673             let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
674 
675             // Create the blocking pool
676             let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
677             let blocking_spawner = blocking_pool.spawner().clone();
678 
679             // Create the runtime handle
680             let handle = Handle {
681                 spawner,
682                 io_handle: resources.io_handle,
683                 time_handle: resources.time_handle,
684                 signal_handle: resources.signal_handle,
685                 clock: resources.clock,
686                 blocking_spawner,
687             };
688 
689             // Spawn the thread pool workers
690             let _enter = crate::runtime::context::enter(handle.clone());
691             launch.launch();
692 
693             Ok(Runtime {
694                 kind: Kind::ThreadPool(scheduler),
695                 handle,
696                 blocking_pool,
697             })
698         }
699     }
700 }
701 
702 impl fmt::Debug for Builder {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result703     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
704         fmt.debug_struct("Builder")
705             .field("worker_threads", &self.worker_threads)
706             .field("max_blocking_threads", &self.max_blocking_threads)
707             .field(
708                 "thread_name",
709                 &"<dyn Fn() -> String + Send + Sync + 'static>",
710             )
711             .field("thread_stack_size", &self.thread_stack_size)
712             .field("after_start", &self.after_start.as_ref().map(|_| "..."))
713             .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
714             .field("before_park", &self.before_park.as_ref().map(|_| "..."))
715             .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
716             .finish()
717     }
718 }
719