1 use crate::runtime::handle::Handle; 2 use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; 3 4 use std::fmt; 5 use std::io; 6 use std::time::Duration; 7 8 /// Builds Tokio Runtime with custom configuration values. 9 /// 10 /// Methods can be chained in order to set the configuration values. The 11 /// Runtime is constructed by calling [`build`]. 12 /// 13 /// New instances of `Builder` are obtained via [`Builder::new_multi_thread`] 14 /// or [`Builder::new_current_thread`]. 15 /// 16 /// See function level documentation for details on the various configuration 17 /// settings. 18 /// 19 /// [`build`]: method@Self::build 20 /// [`Builder::new_multi_thread`]: method@Self::new_multi_thread 21 /// [`Builder::new_current_thread`]: method@Self::new_current_thread 22 /// 23 /// # Examples 24 /// 25 /// ``` 26 /// use tokio::runtime::Builder; 27 /// 28 /// fn main() { 29 /// // build runtime 30 /// let runtime = Builder::new_multi_thread() 31 /// .worker_threads(4) 32 /// .thread_name("my-custom-name") 33 /// .thread_stack_size(3 * 1024 * 1024) 34 /// .build() 35 /// .unwrap(); 36 /// 37 /// // use runtime ... 38 /// } 39 /// ``` 40 pub struct Builder { 41 /// Runtime type 42 kind: Kind, 43 44 /// Whether or not to enable the I/O driver 45 enable_io: bool, 46 47 /// Whether or not to enable the time driver 48 enable_time: bool, 49 50 /// Whether or not the clock should start paused. 51 start_paused: bool, 52 53 /// The number of worker threads, used by Runtime. 54 /// 55 /// Only used when not using the current-thread executor. 56 worker_threads: Option<usize>, 57 58 /// Cap on thread usage. 59 max_blocking_threads: usize, 60 61 /// Name fn used for threads spawned by the runtime. 62 pub(super) thread_name: ThreadNameFn, 63 64 /// Stack size used for threads spawned by the runtime. 65 pub(super) thread_stack_size: Option<usize>, 66 67 /// Callback to run after each thread starts. 68 pub(super) after_start: Option<Callback>, 69 70 /// To run before each worker thread stops 71 pub(super) before_stop: Option<Callback>, 72 73 /// To run before each worker thread is parked. 74 pub(super) before_park: Option<Callback>, 75 76 /// To run after each thread is unparked. 77 pub(super) after_unpark: Option<Callback>, 78 79 /// Customizable keep alive timeout for BlockingPool 80 pub(super) keep_alive: Option<Duration>, 81 } 82 83 pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>; 84 85 pub(crate) enum Kind { 86 CurrentThread, 87 #[cfg(feature = "rt-multi-thread")] 88 MultiThread, 89 } 90 91 impl Builder { 92 /// Returns a new builder with the current thread scheduler selected. 93 /// 94 /// Configuration methods can be chained on the return value. 95 /// 96 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a 97 /// [`LocalSet`]. 98 /// 99 /// [`LocalSet`]: crate::task::LocalSet new_current_thread() -> Builder100 pub fn new_current_thread() -> Builder { 101 Builder::new(Kind::CurrentThread) 102 } 103 104 /// Returns a new builder with the multi thread scheduler selected. 105 /// 106 /// Configuration methods can be chained on the return value. 107 #[cfg(feature = "rt-multi-thread")] 108 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] new_multi_thread() -> Builder109 pub fn new_multi_thread() -> Builder { 110 Builder::new(Kind::MultiThread) 111 } 112 113 /// Returns a new runtime builder initialized with default configuration 114 /// values. 115 /// 116 /// Configuration methods can be chained on the return value. new(kind: Kind) -> Builder117 pub(crate) fn new(kind: Kind) -> Builder { 118 Builder { 119 kind, 120 121 // I/O defaults to "off" 122 enable_io: false, 123 124 // Time defaults to "off" 125 enable_time: false, 126 127 // The clock starts not-paused 128 start_paused: false, 129 130 // Default to lazy auto-detection (one thread per CPU core) 131 worker_threads: None, 132 133 max_blocking_threads: 512, 134 135 // Default thread name 136 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()), 137 138 // Do not set a stack size by default 139 thread_stack_size: None, 140 141 // No worker thread callbacks 142 after_start: None, 143 before_stop: None, 144 before_park: None, 145 after_unpark: None, 146 147 keep_alive: None, 148 } 149 } 150 151 /// Enables both I/O and time drivers. 152 /// 153 /// Doing this is a shorthand for calling `enable_io` and `enable_time` 154 /// individually. If additional components are added to Tokio in the future, 155 /// `enable_all` will include these future components. 156 /// 157 /// # Examples 158 /// 159 /// ``` 160 /// use tokio::runtime; 161 /// 162 /// let rt = runtime::Builder::new_multi_thread() 163 /// .enable_all() 164 /// .build() 165 /// .unwrap(); 166 /// ``` enable_all(&mut self) -> &mut Self167 pub fn enable_all(&mut self) -> &mut Self { 168 #[cfg(any(feature = "net", feature = "process", all(unix, feature = "signal")))] 169 self.enable_io(); 170 #[cfg(feature = "time")] 171 self.enable_time(); 172 173 self 174 } 175 176 /// Sets the number of worker threads the `Runtime` will use. 177 /// 178 /// This can be any number above 0 though it is advised to keep this value 179 /// on the smaller side. 180 /// 181 /// # Default 182 /// 183 /// The default value is the number of cores available to the system. 184 /// 185 /// # Panic 186 /// 187 /// When using the `current_thread` runtime this method will panic, since 188 /// those variants do not allow setting worker thread counts. 189 /// 190 /// 191 /// # Examples 192 /// 193 /// ## Multi threaded runtime with 4 threads 194 /// 195 /// ``` 196 /// use tokio::runtime; 197 /// 198 /// // This will spawn a work-stealing runtime with 4 worker threads. 199 /// let rt = runtime::Builder::new_multi_thread() 200 /// .worker_threads(4) 201 /// .build() 202 /// .unwrap(); 203 /// 204 /// rt.spawn(async move {}); 205 /// ``` 206 /// 207 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`) 208 /// 209 /// ``` 210 /// use tokio::runtime; 211 /// 212 /// // Create a runtime that _must_ be driven from a call 213 /// // to `Runtime::block_on`. 214 /// let rt = runtime::Builder::new_current_thread() 215 /// .build() 216 /// .unwrap(); 217 /// 218 /// // This will run the runtime and future on the current thread 219 /// rt.block_on(async move {}); 220 /// ``` 221 /// 222 /// # Panic 223 /// 224 /// This will panic if `val` is not larger than `0`. worker_threads(&mut self, val: usize) -> &mut Self225 pub fn worker_threads(&mut self, val: usize) -> &mut Self { 226 assert!(val > 0, "Worker threads cannot be set to 0"); 227 self.worker_threads = Some(val); 228 self 229 } 230 231 /// Specifies the limit for additional threads spawned by the Runtime. 232 /// 233 /// These threads are used for blocking operations like tasks spawned 234 /// through [`spawn_blocking`]. Unlike the [`worker_threads`], they are not 235 /// always active and will exit if left idle for too long. You can change 236 /// this timeout duration with [`thread_keep_alive`]. 237 /// 238 /// The default value is 512. 239 /// 240 /// # Panic 241 /// 242 /// This will panic if `val` is not larger than `0`. 243 /// 244 /// # Upgrading from 0.x 245 /// 246 /// In old versions `max_threads` limited both blocking and worker threads, but the 247 /// current `max_blocking_threads` does not include async worker threads in the count. 248 /// 249 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking 250 /// [`worker_threads`]: Self::worker_threads 251 /// [`thread_keep_alive`]: Self::thread_keep_alive 252 #[cfg_attr(docsrs, doc(alias = "max_threads"))] max_blocking_threads(&mut self, val: usize) -> &mut Self253 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self { 254 assert!(val > 0, "Max blocking threads cannot be set to 0"); 255 self.max_blocking_threads = val; 256 self 257 } 258 259 /// Sets name of threads spawned by the `Runtime`'s thread pool. 260 /// 261 /// The default name is "tokio-runtime-worker". 262 /// 263 /// # Examples 264 /// 265 /// ``` 266 /// # use tokio::runtime; 267 /// 268 /// # pub fn main() { 269 /// let rt = runtime::Builder::new_multi_thread() 270 /// .thread_name("my-pool") 271 /// .build(); 272 /// # } 273 /// ``` thread_name(&mut self, val: impl Into<String>) -> &mut Self274 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { 275 let val = val.into(); 276 self.thread_name = std::sync::Arc::new(move || val.clone()); 277 self 278 } 279 280 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool. 281 /// 282 /// The default name fn is `|| "tokio-runtime-worker".into()`. 283 /// 284 /// # Examples 285 /// 286 /// ``` 287 /// # use tokio::runtime; 288 /// # use std::sync::atomic::{AtomicUsize, Ordering}; 289 /// 290 /// # pub fn main() { 291 /// let rt = runtime::Builder::new_multi_thread() 292 /// .thread_name_fn(|| { 293 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); 294 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); 295 /// format!("my-pool-{}", id) 296 /// }) 297 /// .build(); 298 /// # } 299 /// ``` thread_name_fn<F>(&mut self, f: F) -> &mut Self where F: Fn() -> String + Send + Sync + 'static,300 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self 301 where 302 F: Fn() -> String + Send + Sync + 'static, 303 { 304 self.thread_name = std::sync::Arc::new(f); 305 self 306 } 307 308 /// Sets the stack size (in bytes) for worker threads. 309 /// 310 /// The actual stack size may be greater than this value if the platform 311 /// specifies minimal stack size. 312 /// 313 /// The default stack size for spawned threads is 2 MiB, though this 314 /// particular stack size is subject to change in the future. 315 /// 316 /// # Examples 317 /// 318 /// ``` 319 /// # use tokio::runtime; 320 /// 321 /// # pub fn main() { 322 /// let rt = runtime::Builder::new_multi_thread() 323 /// .thread_stack_size(32 * 1024) 324 /// .build(); 325 /// # } 326 /// ``` thread_stack_size(&mut self, val: usize) -> &mut Self327 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self { 328 self.thread_stack_size = Some(val); 329 self 330 } 331 332 /// Executes function `f` after each thread is started but before it starts 333 /// doing work. 334 /// 335 /// This is intended for bookkeeping and monitoring use cases. 336 /// 337 /// # Examples 338 /// 339 /// ``` 340 /// # use tokio::runtime; 341 /// 342 /// # pub fn main() { 343 /// let runtime = runtime::Builder::new_multi_thread() 344 /// .on_thread_start(|| { 345 /// println!("thread started"); 346 /// }) 347 /// .build(); 348 /// # } 349 /// ``` 350 #[cfg(not(loom))] on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,351 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self 352 where 353 F: Fn() + Send + Sync + 'static, 354 { 355 self.after_start = Some(std::sync::Arc::new(f)); 356 self 357 } 358 359 /// Executes function `f` before each thread stops. 360 /// 361 /// This is intended for bookkeeping and monitoring use cases. 362 /// 363 /// # Examples 364 /// 365 /// ``` 366 /// # use tokio::runtime; 367 /// 368 /// # pub fn main() { 369 /// let runtime = runtime::Builder::new_multi_thread() 370 /// .on_thread_stop(|| { 371 /// println!("thread stopping"); 372 /// }) 373 /// .build(); 374 /// # } 375 /// ``` 376 #[cfg(not(loom))] on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,377 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self 378 where 379 F: Fn() + Send + Sync + 'static, 380 { 381 self.before_stop = Some(std::sync::Arc::new(f)); 382 self 383 } 384 385 /// Executes function `f` just before a thread is parked (goes idle). 386 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn) 387 /// can be called, and may result in this thread being unparked immediately. 388 /// 389 /// This can be used to start work only when the executor is idle, or for bookkeeping 390 /// and monitoring purposes. 391 /// 392 /// Note: There can only be one park callback for a runtime; calling this function 393 /// more than once replaces the last callback defined, rather than adding to it. 394 /// 395 /// # Examples 396 /// 397 /// ## Multithreaded executor 398 /// ``` 399 /// # use std::sync::Arc; 400 /// # use std::sync::atomic::{AtomicBool, Ordering}; 401 /// # use tokio::runtime; 402 /// # use tokio::sync::Barrier; 403 /// # pub fn main() { 404 /// let once = AtomicBool::new(true); 405 /// let barrier = Arc::new(Barrier::new(2)); 406 /// 407 /// let runtime = runtime::Builder::new_multi_thread() 408 /// .worker_threads(1) 409 /// .on_thread_park({ 410 /// let barrier = barrier.clone(); 411 /// move || { 412 /// let barrier = barrier.clone(); 413 /// if once.swap(false, Ordering::Relaxed) { 414 /// tokio::spawn(async move { barrier.wait().await; }); 415 /// } 416 /// } 417 /// }) 418 /// .build() 419 /// .unwrap(); 420 /// 421 /// runtime.block_on(async { 422 /// barrier.wait().await; 423 /// }) 424 /// # } 425 /// ``` 426 /// ## Current thread executor 427 /// ``` 428 /// # use std::sync::Arc; 429 /// # use std::sync::atomic::{AtomicBool, Ordering}; 430 /// # use tokio::runtime; 431 /// # use tokio::sync::Barrier; 432 /// # pub fn main() { 433 /// let once = AtomicBool::new(true); 434 /// let barrier = Arc::new(Barrier::new(2)); 435 /// 436 /// let runtime = runtime::Builder::new_current_thread() 437 /// .on_thread_park({ 438 /// let barrier = barrier.clone(); 439 /// move || { 440 /// let barrier = barrier.clone(); 441 /// if once.swap(false, Ordering::Relaxed) { 442 /// tokio::spawn(async move { barrier.wait().await; }); 443 /// } 444 /// } 445 /// }) 446 /// .build() 447 /// .unwrap(); 448 /// 449 /// runtime.block_on(async { 450 /// barrier.wait().await; 451 /// }) 452 /// # } 453 /// ``` 454 #[cfg(not(loom))] on_thread_park<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,455 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self 456 where 457 F: Fn() + Send + Sync + 'static, 458 { 459 self.before_park = Some(std::sync::Arc::new(f)); 460 self 461 } 462 463 /// Executes function `f` just after a thread unparks (starts executing tasks). 464 /// 465 /// This is intended for bookkeeping and monitoring use cases; note that work 466 /// in this callback will increase latencies when the application has allowed one or 467 /// more runtime threads to go idle. 468 /// 469 /// Note: There can only be one unpark callback for a runtime; calling this function 470 /// more than once replaces the last callback defined, rather than adding to it. 471 /// 472 /// # Examples 473 /// 474 /// ``` 475 /// # use tokio::runtime; 476 /// 477 /// # pub fn main() { 478 /// let runtime = runtime::Builder::new_multi_thread() 479 /// .on_thread_unpark(|| { 480 /// println!("thread unparking"); 481 /// }) 482 /// .build(); 483 /// 484 /// runtime.unwrap().block_on(async { 485 /// tokio::task::yield_now().await; 486 /// println!("Hello from Tokio!"); 487 /// }) 488 /// # } 489 /// ``` 490 #[cfg(not(loom))] on_thread_unpark<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,491 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self 492 where 493 F: Fn() + Send + Sync + 'static, 494 { 495 self.after_unpark = Some(std::sync::Arc::new(f)); 496 self 497 } 498 499 /// Creates the configured `Runtime`. 500 /// 501 /// The returned `Runtime` instance is ready to spawn tasks. 502 /// 503 /// # Examples 504 /// 505 /// ``` 506 /// use tokio::runtime::Builder; 507 /// 508 /// let rt = Builder::new_multi_thread().build().unwrap(); 509 /// 510 /// rt.block_on(async { 511 /// println!("Hello from the Tokio runtime"); 512 /// }); 513 /// ``` build(&mut self) -> io::Result<Runtime>514 pub fn build(&mut self) -> io::Result<Runtime> { 515 match &self.kind { 516 Kind::CurrentThread => self.build_basic_runtime(), 517 #[cfg(feature = "rt-multi-thread")] 518 Kind::MultiThread => self.build_threaded_runtime(), 519 } 520 } 521 get_cfg(&self) -> driver::Cfg522 fn get_cfg(&self) -> driver::Cfg { 523 driver::Cfg { 524 enable_pause_time: match self.kind { 525 Kind::CurrentThread => true, 526 #[cfg(feature = "rt-multi-thread")] 527 Kind::MultiThread => false, 528 }, 529 enable_io: self.enable_io, 530 enable_time: self.enable_time, 531 start_paused: self.start_paused, 532 } 533 } 534 535 /// Sets a custom timeout for a thread in the blocking pool. 536 /// 537 /// By default, the timeout for a thread is set to 10 seconds. This can 538 /// be overridden using .thread_keep_alive(). 539 /// 540 /// # Example 541 /// 542 /// ``` 543 /// # use tokio::runtime; 544 /// # use std::time::Duration; 545 /// 546 /// # pub fn main() { 547 /// let rt = runtime::Builder::new_multi_thread() 548 /// .thread_keep_alive(Duration::from_millis(100)) 549 /// .build(); 550 /// # } 551 /// ``` thread_keep_alive(&mut self, duration: Duration) -> &mut Self552 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self { 553 self.keep_alive = Some(duration); 554 self 555 } 556 build_basic_runtime(&mut self) -> io::Result<Runtime>557 fn build_basic_runtime(&mut self) -> io::Result<Runtime> { 558 use crate::runtime::{BasicScheduler, Kind}; 559 560 let (driver, resources) = driver::Driver::new(self.get_cfg())?; 561 562 // And now put a single-threaded scheduler on top of the timer. When 563 // there are no futures ready to do something, it'll let the timer or 564 // the reactor to generate some new stimuli for the futures to continue 565 // in their life. 566 let scheduler = 567 BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone()); 568 let spawner = Spawner::Basic(scheduler.spawner().clone()); 569 570 // Blocking pool 571 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); 572 let blocking_spawner = blocking_pool.spawner().clone(); 573 574 Ok(Runtime { 575 kind: Kind::CurrentThread(scheduler), 576 handle: Handle { 577 spawner, 578 io_handle: resources.io_handle, 579 time_handle: resources.time_handle, 580 signal_handle: resources.signal_handle, 581 clock: resources.clock, 582 blocking_spawner, 583 }, 584 blocking_pool, 585 }) 586 } 587 } 588 589 cfg_io_driver! { 590 impl Builder { 591 /// Enables the I/O driver. 592 /// 593 /// Doing this enables using net, process, signal, and some I/O types on 594 /// the runtime. 595 /// 596 /// # Examples 597 /// 598 /// ``` 599 /// use tokio::runtime; 600 /// 601 /// let rt = runtime::Builder::new_multi_thread() 602 /// .enable_io() 603 /// .build() 604 /// .unwrap(); 605 /// ``` 606 pub fn enable_io(&mut self) -> &mut Self { 607 self.enable_io = true; 608 self 609 } 610 } 611 } 612 613 cfg_time! { 614 impl Builder { 615 /// Enables the time driver. 616 /// 617 /// Doing this enables using `tokio::time` on the runtime. 618 /// 619 /// # Examples 620 /// 621 /// ``` 622 /// use tokio::runtime; 623 /// 624 /// let rt = runtime::Builder::new_multi_thread() 625 /// .enable_time() 626 /// .build() 627 /// .unwrap(); 628 /// ``` 629 pub fn enable_time(&mut self) -> &mut Self { 630 self.enable_time = true; 631 self 632 } 633 } 634 } 635 636 cfg_test_util! { 637 impl Builder { 638 /// Controls if the runtime's clock starts paused or advancing. 639 /// 640 /// Pausing time requires the current-thread runtime; construction of 641 /// the runtime will panic otherwise. 642 /// 643 /// # Examples 644 /// 645 /// ``` 646 /// use tokio::runtime; 647 /// 648 /// let rt = runtime::Builder::new_current_thread() 649 /// .enable_time() 650 /// .start_paused(true) 651 /// .build() 652 /// .unwrap(); 653 /// ``` 654 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self { 655 self.start_paused = start_paused; 656 self 657 } 658 } 659 } 660 661 cfg_rt_multi_thread! { 662 impl Builder { 663 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { 664 use crate::loom::sys::num_cpus; 665 use crate::runtime::{Kind, ThreadPool}; 666 use crate::runtime::park::Parker; 667 668 let core_threads = self.worker_threads.unwrap_or_else(num_cpus); 669 670 let (driver, resources) = driver::Driver::new(self.get_cfg())?; 671 672 let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone()); 673 let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); 674 675 // Create the blocking pool 676 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); 677 let blocking_spawner = blocking_pool.spawner().clone(); 678 679 // Create the runtime handle 680 let handle = Handle { 681 spawner, 682 io_handle: resources.io_handle, 683 time_handle: resources.time_handle, 684 signal_handle: resources.signal_handle, 685 clock: resources.clock, 686 blocking_spawner, 687 }; 688 689 // Spawn the thread pool workers 690 let _enter = crate::runtime::context::enter(handle.clone()); 691 launch.launch(); 692 693 Ok(Runtime { 694 kind: Kind::ThreadPool(scheduler), 695 handle, 696 blocking_pool, 697 }) 698 } 699 } 700 } 701 702 impl fmt::Debug for Builder { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result703 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 704 fmt.debug_struct("Builder") 705 .field("worker_threads", &self.worker_threads) 706 .field("max_blocking_threads", &self.max_blocking_threads) 707 .field( 708 "thread_name", 709 &"<dyn Fn() -> String + Send + Sync + 'static>", 710 ) 711 .field("thread_stack_size", &self.thread_stack_size) 712 .field("after_start", &self.after_start.as_ref().map(|_| "...")) 713 .field("before_stop", &self.before_stop.as_ref().map(|_| "...")) 714 .field("before_park", &self.before_park.as_ref().map(|_| "...")) 715 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "...")) 716 .finish() 717 } 718 } 719