1 use crate::runtime::handle::Handle; 2 use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; 3 4 use std::fmt; 5 use std::io; 6 use std::time::Duration; 7 8 /// Builds Tokio Runtime with custom configuration values. 9 /// 10 /// Methods can be chained in order to set the configuration values. The 11 /// Runtime is constructed by calling [`build`]. 12 /// 13 /// New instances of `Builder` are obtained via [`Builder::new_multi_thread`] 14 /// or [`Builder::new_current_thread`]. 15 /// 16 /// See function level documentation for details on the various configuration 17 /// settings. 18 /// 19 /// [`build`]: method@Self::build 20 /// [`Builder::new_multi_thread`]: method@Self::new_multi_thread 21 /// [`Builder::new_current_thread`]: method@Self::new_current_thread 22 /// 23 /// # Examples 24 /// 25 /// ``` 26 /// use tokio::runtime::Builder; 27 /// 28 /// fn main() { 29 /// // build runtime 30 /// let runtime = Builder::new_multi_thread() 31 /// .worker_threads(4) 32 /// .thread_name("my-custom-name") 33 /// .thread_stack_size(3 * 1024 * 1024) 34 /// .build() 35 /// .unwrap(); 36 /// 37 /// // use runtime ... 38 /// } 39 /// ``` 40 pub struct Builder { 41 /// Runtime type 42 kind: Kind, 43 44 /// Whether or not to enable the I/O driver 45 enable_io: bool, 46 47 /// Whether or not to enable the time driver 48 enable_time: bool, 49 50 /// Whether or not the clock should start paused. 51 start_paused: bool, 52 53 /// The number of worker threads, used by Runtime. 54 /// 55 /// Only used when not using the current-thread executor. 56 worker_threads: Option<usize>, 57 58 /// Cap on thread usage. 59 max_blocking_threads: usize, 60 61 /// Name fn used for threads spawned by the runtime. 62 pub(super) thread_name: ThreadNameFn, 63 64 /// Stack size used for threads spawned by the runtime. 65 pub(super) thread_stack_size: Option<usize>, 66 67 /// Callback to run after each thread starts. 68 pub(super) after_start: Option<Callback>, 69 70 /// To run before each worker thread stops 71 pub(super) before_stop: Option<Callback>, 72 73 /// Customizable keep alive timeout for BlockingPool 74 pub(super) keep_alive: Option<Duration>, 75 } 76 77 pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>; 78 79 pub(crate) enum Kind { 80 CurrentThread, 81 #[cfg(feature = "rt-multi-thread")] 82 MultiThread, 83 } 84 85 impl Builder { 86 /// Returns a new builder with the current thread scheduler selected. 87 /// 88 /// Configuration methods can be chained on the return value. 89 /// 90 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a 91 /// [`LocalSet`]. 92 /// 93 /// [`LocalSet`]: crate::task::LocalSet new_current_thread() -> Builder94 pub fn new_current_thread() -> Builder { 95 Builder::new(Kind::CurrentThread) 96 } 97 98 /// Returns a new builder with the multi thread scheduler selected. 99 /// 100 /// Configuration methods can be chained on the return value. 101 #[cfg(feature = "rt-multi-thread")] 102 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] new_multi_thread() -> Builder103 pub fn new_multi_thread() -> Builder { 104 Builder::new(Kind::MultiThread) 105 } 106 107 /// Returns a new runtime builder initialized with default configuration 108 /// values. 109 /// 110 /// Configuration methods can be chained on the return value. new(kind: Kind) -> Builder111 pub(crate) fn new(kind: Kind) -> Builder { 112 Builder { 113 kind, 114 115 // I/O defaults to "off" 116 enable_io: false, 117 118 // Time defaults to "off" 119 enable_time: false, 120 121 // The clock starts not-paused 122 start_paused: false, 123 124 // Default to lazy auto-detection (one thread per CPU core) 125 worker_threads: None, 126 127 max_blocking_threads: 512, 128 129 // Default thread name 130 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()), 131 132 // Do not set a stack size by default 133 thread_stack_size: None, 134 135 // No worker thread callbacks 136 after_start: None, 137 before_stop: None, 138 139 keep_alive: None, 140 } 141 } 142 143 /// Enables both I/O and time drivers. 144 /// 145 /// Doing this is a shorthand for calling `enable_io` and `enable_time` 146 /// individually. If additional components are added to Tokio in the future, 147 /// `enable_all` will include these future components. 148 /// 149 /// # Examples 150 /// 151 /// ``` 152 /// use tokio::runtime; 153 /// 154 /// let rt = runtime::Builder::new_multi_thread() 155 /// .enable_all() 156 /// .build() 157 /// .unwrap(); 158 /// ``` enable_all(&mut self) -> &mut Self159 pub fn enable_all(&mut self) -> &mut Self { 160 #[cfg(any(feature = "net", feature = "process", all(unix, feature = "signal")))] 161 self.enable_io(); 162 #[cfg(feature = "time")] 163 self.enable_time(); 164 165 self 166 } 167 168 /// Sets the number of worker threads the `Runtime` will use. 169 /// 170 /// This can be any number above 0 though it is advised to keep this value 171 /// on the smaller side. 172 /// 173 /// # Default 174 /// 175 /// The default value is the number of cores available to the system. 176 /// 177 /// # Panic 178 /// 179 /// When using the `current_thread` runtime this method will panic, since 180 /// those variants do not allow setting worker thread counts. 181 /// 182 /// 183 /// # Examples 184 /// 185 /// ## Multi threaded runtime with 4 threads 186 /// 187 /// ``` 188 /// use tokio::runtime; 189 /// 190 /// // This will spawn a work-stealing runtime with 4 worker threads. 191 /// let rt = runtime::Builder::new_multi_thread() 192 /// .worker_threads(4) 193 /// .build() 194 /// .unwrap(); 195 /// 196 /// rt.spawn(async move {}); 197 /// ``` 198 /// 199 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`) 200 /// 201 /// ``` 202 /// use tokio::runtime; 203 /// 204 /// // Create a runtime that _must_ be driven from a call 205 /// // to `Runtime::block_on`. 206 /// let rt = runtime::Builder::new_current_thread() 207 /// .build() 208 /// .unwrap(); 209 /// 210 /// // This will run the runtime and future on the current thread 211 /// rt.block_on(async move {}); 212 /// ``` 213 /// 214 /// # Panic 215 /// 216 /// This will panic if `val` is not larger than `0`. worker_threads(&mut self, val: usize) -> &mut Self217 pub fn worker_threads(&mut self, val: usize) -> &mut Self { 218 assert!(val > 0, "Worker threads cannot be set to 0"); 219 self.worker_threads = Some(val); 220 self 221 } 222 223 /// Specifies the limit for additional threads spawned by the Runtime. 224 /// 225 /// These threads are used for blocking operations like tasks spawned 226 /// through [`spawn_blocking`]. Unlike the [`worker_threads`], they are not 227 /// always active and will exit if left idle for too long. You can change 228 /// this timeout duration with [`thread_keep_alive`]. 229 /// 230 /// The default value is 512. 231 /// 232 /// # Panic 233 /// 234 /// This will panic if `val` is not larger than `0`. 235 /// 236 /// # Upgrading from 0.x 237 /// 238 /// In old versions `max_threads` limited both blocking and worker threads, but the 239 /// current `max_blocking_threads` does not include async worker threads in the count. 240 /// 241 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking 242 /// [`worker_threads`]: Self::worker_threads 243 /// [`thread_keep_alive`]: Self::thread_keep_alive 244 #[cfg_attr(docsrs, doc(alias = "max_threads"))] max_blocking_threads(&mut self, val: usize) -> &mut Self245 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self { 246 assert!(val > 0, "Max blocking threads cannot be set to 0"); 247 self.max_blocking_threads = val; 248 self 249 } 250 251 /// Sets name of threads spawned by the `Runtime`'s thread pool. 252 /// 253 /// The default name is "tokio-runtime-worker". 254 /// 255 /// # Examples 256 /// 257 /// ``` 258 /// # use tokio::runtime; 259 /// 260 /// # pub fn main() { 261 /// let rt = runtime::Builder::new_multi_thread() 262 /// .thread_name("my-pool") 263 /// .build(); 264 /// # } 265 /// ``` thread_name(&mut self, val: impl Into<String>) -> &mut Self266 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { 267 let val = val.into(); 268 self.thread_name = std::sync::Arc::new(move || val.clone()); 269 self 270 } 271 272 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool. 273 /// 274 /// The default name fn is `|| "tokio-runtime-worker".into()`. 275 /// 276 /// # Examples 277 /// 278 /// ``` 279 /// # use tokio::runtime; 280 /// # use std::sync::atomic::{AtomicUsize, Ordering}; 281 /// 282 /// # pub fn main() { 283 /// let rt = runtime::Builder::new_multi_thread() 284 /// .thread_name_fn(|| { 285 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); 286 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); 287 /// format!("my-pool-{}", id) 288 /// }) 289 /// .build(); 290 /// # } 291 /// ``` thread_name_fn<F>(&mut self, f: F) -> &mut Self where F: Fn() -> String + Send + Sync + 'static,292 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self 293 where 294 F: Fn() -> String + Send + Sync + 'static, 295 { 296 self.thread_name = std::sync::Arc::new(f); 297 self 298 } 299 300 /// Sets the stack size (in bytes) for worker threads. 301 /// 302 /// The actual stack size may be greater than this value if the platform 303 /// specifies minimal stack size. 304 /// 305 /// The default stack size for spawned threads is 2 MiB, though this 306 /// particular stack size is subject to change in the future. 307 /// 308 /// # Examples 309 /// 310 /// ``` 311 /// # use tokio::runtime; 312 /// 313 /// # pub fn main() { 314 /// let rt = runtime::Builder::new_multi_thread() 315 /// .thread_stack_size(32 * 1024) 316 /// .build(); 317 /// # } 318 /// ``` thread_stack_size(&mut self, val: usize) -> &mut Self319 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self { 320 self.thread_stack_size = Some(val); 321 self 322 } 323 324 /// Executes function `f` after each thread is started but before it starts 325 /// doing work. 326 /// 327 /// This is intended for bookkeeping and monitoring use cases. 328 /// 329 /// # Examples 330 /// 331 /// ``` 332 /// # use tokio::runtime; 333 /// 334 /// # pub fn main() { 335 /// let runtime = runtime::Builder::new_multi_thread() 336 /// .on_thread_start(|| { 337 /// println!("thread started"); 338 /// }) 339 /// .build(); 340 /// # } 341 /// ``` 342 #[cfg(not(loom))] on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,343 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self 344 where 345 F: Fn() + Send + Sync + 'static, 346 { 347 self.after_start = Some(std::sync::Arc::new(f)); 348 self 349 } 350 351 /// Executes function `f` before each thread stops. 352 /// 353 /// This is intended for bookkeeping and monitoring use cases. 354 /// 355 /// # Examples 356 /// 357 /// ``` 358 /// # use tokio::runtime; 359 /// 360 /// # pub fn main() { 361 /// let runtime = runtime::Builder::new_multi_thread() 362 /// .on_thread_stop(|| { 363 /// println!("thread stopping"); 364 /// }) 365 /// .build(); 366 /// # } 367 /// ``` 368 #[cfg(not(loom))] on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,369 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self 370 where 371 F: Fn() + Send + Sync + 'static, 372 { 373 self.before_stop = Some(std::sync::Arc::new(f)); 374 self 375 } 376 377 /// Creates the configured `Runtime`. 378 /// 379 /// The returned `Runtime` instance is ready to spawn tasks. 380 /// 381 /// # Examples 382 /// 383 /// ``` 384 /// use tokio::runtime::Builder; 385 /// 386 /// let rt = Builder::new_multi_thread().build().unwrap(); 387 /// 388 /// rt.block_on(async { 389 /// println!("Hello from the Tokio runtime"); 390 /// }); 391 /// ``` build(&mut self) -> io::Result<Runtime>392 pub fn build(&mut self) -> io::Result<Runtime> { 393 match &self.kind { 394 Kind::CurrentThread => self.build_basic_runtime(), 395 #[cfg(feature = "rt-multi-thread")] 396 Kind::MultiThread => self.build_threaded_runtime(), 397 } 398 } 399 get_cfg(&self) -> driver::Cfg400 fn get_cfg(&self) -> driver::Cfg { 401 driver::Cfg { 402 enable_pause_time: match self.kind { 403 Kind::CurrentThread => true, 404 #[cfg(feature = "rt-multi-thread")] 405 Kind::MultiThread => false, 406 }, 407 enable_io: self.enable_io, 408 enable_time: self.enable_time, 409 start_paused: self.start_paused, 410 } 411 } 412 413 /// Sets a custom timeout for a thread in the blocking pool. 414 /// 415 /// By default, the timeout for a thread is set to 10 seconds. This can 416 /// be overridden using .thread_keep_alive(). 417 /// 418 /// # Example 419 /// 420 /// ``` 421 /// # use tokio::runtime; 422 /// # use std::time::Duration; 423 /// 424 /// # pub fn main() { 425 /// let rt = runtime::Builder::new_multi_thread() 426 /// .thread_keep_alive(Duration::from_millis(100)) 427 /// .build(); 428 /// # } 429 /// ``` thread_keep_alive(&mut self, duration: Duration) -> &mut Self430 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self { 431 self.keep_alive = Some(duration); 432 self 433 } 434 build_basic_runtime(&mut self) -> io::Result<Runtime>435 fn build_basic_runtime(&mut self) -> io::Result<Runtime> { 436 use crate::runtime::{BasicScheduler, Kind}; 437 438 let (driver, resources) = driver::Driver::new(self.get_cfg())?; 439 440 // And now put a single-threaded scheduler on top of the timer. When 441 // there are no futures ready to do something, it'll let the timer or 442 // the reactor to generate some new stimuli for the futures to continue 443 // in their life. 444 let scheduler = BasicScheduler::new(driver); 445 let spawner = Spawner::Basic(scheduler.spawner().clone()); 446 447 // Blocking pool 448 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); 449 let blocking_spawner = blocking_pool.spawner().clone(); 450 451 Ok(Runtime { 452 kind: Kind::CurrentThread(scheduler), 453 handle: Handle { 454 spawner, 455 io_handle: resources.io_handle, 456 time_handle: resources.time_handle, 457 signal_handle: resources.signal_handle, 458 clock: resources.clock, 459 blocking_spawner, 460 }, 461 blocking_pool, 462 }) 463 } 464 } 465 466 cfg_io_driver! { 467 impl Builder { 468 /// Enables the I/O driver. 469 /// 470 /// Doing this enables using net, process, signal, and some I/O types on 471 /// the runtime. 472 /// 473 /// # Examples 474 /// 475 /// ``` 476 /// use tokio::runtime; 477 /// 478 /// let rt = runtime::Builder::new_multi_thread() 479 /// .enable_io() 480 /// .build() 481 /// .unwrap(); 482 /// ``` 483 pub fn enable_io(&mut self) -> &mut Self { 484 self.enable_io = true; 485 self 486 } 487 } 488 } 489 490 cfg_time! { 491 impl Builder { 492 /// Enables the time driver. 493 /// 494 /// Doing this enables using `tokio::time` on the runtime. 495 /// 496 /// # Examples 497 /// 498 /// ``` 499 /// use tokio::runtime; 500 /// 501 /// let rt = runtime::Builder::new_multi_thread() 502 /// .enable_time() 503 /// .build() 504 /// .unwrap(); 505 /// ``` 506 pub fn enable_time(&mut self) -> &mut Self { 507 self.enable_time = true; 508 self 509 } 510 } 511 } 512 513 cfg_test_util! { 514 impl Builder { 515 /// Controls if the runtime's clock starts paused or advancing. 516 /// 517 /// Pausing time requires the current-thread runtime; construction of 518 /// the runtime will panic otherwise. 519 /// 520 /// # Examples 521 /// 522 /// ``` 523 /// use tokio::runtime; 524 /// 525 /// let rt = runtime::Builder::new_current_thread() 526 /// .enable_time() 527 /// .start_paused(true) 528 /// .build() 529 /// .unwrap(); 530 /// ``` 531 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self { 532 self.start_paused = start_paused; 533 self 534 } 535 } 536 } 537 538 cfg_rt_multi_thread! { 539 impl Builder { 540 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { 541 use crate::loom::sys::num_cpus; 542 use crate::runtime::{Kind, ThreadPool}; 543 use crate::runtime::park::Parker; 544 545 let core_threads = self.worker_threads.unwrap_or_else(num_cpus); 546 547 let (driver, resources) = driver::Driver::new(self.get_cfg())?; 548 549 let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); 550 let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); 551 552 // Create the blocking pool 553 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); 554 let blocking_spawner = blocking_pool.spawner().clone(); 555 556 // Create the runtime handle 557 let handle = Handle { 558 spawner, 559 io_handle: resources.io_handle, 560 time_handle: resources.time_handle, 561 signal_handle: resources.signal_handle, 562 clock: resources.clock, 563 blocking_spawner, 564 }; 565 566 // Spawn the thread pool workers 567 let _enter = crate::runtime::context::enter(handle.clone()); 568 launch.launch(); 569 570 Ok(Runtime { 571 kind: Kind::ThreadPool(scheduler), 572 handle, 573 blocking_pool, 574 }) 575 } 576 } 577 } 578 579 impl fmt::Debug for Builder { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result580 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 581 fmt.debug_struct("Builder") 582 .field("worker_threads", &self.worker_threads) 583 .field("max_blocking_threads", &self.max_blocking_threads) 584 .field( 585 "thread_name", 586 &"<dyn Fn() -> String + Send + Sync + 'static>", 587 ) 588 .field("thread_stack_size", &self.thread_stack_size) 589 .field("after_start", &self.after_start.as_ref().map(|_| "...")) 590 .field("before_stop", &self.after_start.as_ref().map(|_| "...")) 591 .finish() 592 } 593 } 594