1 use crate::runtime::handle::Handle; 2 use crate::runtime::shell::Shell; 3 use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; 4 5 use std::fmt; 6 #[cfg(not(loom))] 7 use std::sync::Arc; 8 #[cfg(feature = "rt-core")] 9 use std::time::Duration; 10 11 /// Builds Tokio Runtime with custom configuration values. 12 /// 13 /// Methods can be chained in order to set the configuration values. The 14 /// Runtime is constructed by calling [`build`]. 15 /// 16 /// New instances of `Builder` are obtained via [`Builder::new`]. 17 /// 18 /// See function level documentation for details on the various configuration 19 /// settings. 20 /// 21 /// [`build`]: #method.build 22 /// [`Builder::new`]: #method.new 23 /// 24 /// # Examples 25 /// 26 /// ``` 27 /// use tokio::runtime::Builder; 28 /// 29 /// fn main() { 30 /// // build runtime 31 /// let runtime = Builder::new() 32 /// .threaded_scheduler() 33 /// .core_threads(4) 34 /// .thread_name("my-custom-name") 35 /// .thread_stack_size(3 * 1024 * 1024) 36 /// .build() 37 /// .unwrap(); 38 /// 39 /// // use runtime ... 40 /// } 41 /// ``` 42 pub struct Builder { 43 /// The task execution model to use. 44 kind: Kind, 45 46 /// Whether or not to enable the I/O driver 47 enable_io: bool, 48 49 /// Whether or not to enable the time driver 50 enable_time: bool, 51 52 /// The number of worker threads, used by Runtime. 53 /// 54 /// Only used when not using the current-thread executor. 55 core_threads: Option<usize>, 56 57 /// Cap on thread usage. 58 max_threads: usize, 59 60 /// Name used for threads spawned by the runtime. 61 pub(super) thread_name: String, 62 63 /// Stack size used for threads spawned by the runtime. 64 pub(super) thread_stack_size: Option<usize>, 65 66 /// Callback to run after each thread starts. 67 pub(super) after_start: Option<Callback>, 68 69 /// To run before each worker thread stops 70 pub(super) before_stop: Option<Callback>, 71 72 /// Max throttling duration 73 #[cfg(feature = "rt-core")] 74 pub(super) max_throttling: Option<std::time::Duration>, 75 } 76 77 #[derive(Debug, Clone, Copy)] 78 enum Kind { 79 Shell, 80 #[cfg(feature = "rt-core")] 81 Basic, 82 #[cfg(feature = "rt-threaded")] 83 ThreadPool, 84 } 85 86 impl Builder { 87 /// Returns a new runtime builder initialized with default configuration 88 /// values. 89 /// 90 /// Configuration methods can be chained on the return value. new() -> Builder91 pub fn new() -> Builder { 92 Builder { 93 // No task execution by default 94 kind: Kind::Shell, 95 96 // I/O defaults to "off" 97 enable_io: false, 98 99 // Time defaults to "off" 100 enable_time: false, 101 102 // Default to lazy auto-detection (one thread per CPU core) 103 core_threads: None, 104 105 max_threads: 512, 106 107 // Default thread name 108 thread_name: "tokio-runtime-worker".into(), 109 110 // Do not set a stack size by default 111 thread_stack_size: None, 112 113 // No worker thread callbacks 114 after_start: None, 115 before_stop: None, 116 117 // No throttling by default 118 #[cfg(feature = "rt-core")] 119 max_throttling: None, 120 } 121 } 122 123 /// Enables both I/O and time drivers. 124 /// 125 /// Doing this is a shorthand for calling `enable_io` and `enable_time` 126 /// individually. If additional components are added to Tokio in the future, 127 /// `enable_all` will include these future components. 128 /// 129 /// # Examples 130 /// 131 /// ``` 132 /// use tokio::runtime; 133 /// 134 /// let rt = runtime::Builder::new() 135 /// .threaded_scheduler() 136 /// .enable_all() 137 /// .build() 138 /// .unwrap(); 139 /// ``` enable_all(&mut self) -> &mut Self140 pub fn enable_all(&mut self) -> &mut Self { 141 #[cfg(feature = "io-driver")] 142 self.enable_io(); 143 #[cfg(feature = "time")] 144 self.enable_time(); 145 146 self 147 } 148 149 #[deprecated(note = "In future will be replaced by core_threads method")] 150 /// Sets the maximum number of worker threads for the `Runtime`'s thread pool. 151 /// 152 /// This must be a number between 1 and 32,768 though it is advised to keep 153 /// this value on the smaller side. 154 /// 155 /// The default value is the number of cores available to the system. num_threads(&mut self, val: usize) -> &mut Self156 pub fn num_threads(&mut self, val: usize) -> &mut Self { 157 self.core_threads = Some(val); 158 self 159 } 160 161 /// Sets the core number of worker threads for the `Runtime`'s thread pool. 162 /// 163 /// This should be a number between 1 and 32,768 though it is advised to keep 164 /// this value on the smaller side. 165 /// 166 /// The default value is the number of cores available to the system. 167 /// 168 /// These threads will be always active and running. 169 /// 170 /// # Examples 171 /// 172 /// ``` 173 /// use tokio::runtime; 174 /// 175 /// let rt = runtime::Builder::new() 176 /// .threaded_scheduler() 177 /// .core_threads(4) 178 /// .build() 179 /// .unwrap(); 180 /// ``` core_threads(&mut self, val: usize) -> &mut Self181 pub fn core_threads(&mut self, val: usize) -> &mut Self { 182 assert_ne!(val, 0, "Core threads cannot be zero"); 183 self.core_threads = Some(val); 184 self 185 } 186 187 /// Specifies limit for threads, spawned by the Runtime. 188 /// 189 /// This is number of threads to be used by Runtime, including `core_threads` 190 /// Having `max_threads` less than `core_threads` results in invalid configuration 191 /// when building multi-threaded `Runtime`, which would cause a panic. 192 /// 193 /// Similarly to the `core_threads`, this number should be between 1 and 32,768. 194 /// 195 /// The default value is 512. 196 /// 197 /// When multi-threaded runtime is not used, will act as limit on additional threads. 198 /// 199 /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for 200 /// blocking annotations) as `max_threads - core_threads`. max_threads(&mut self, val: usize) -> &mut Self201 pub fn max_threads(&mut self, val: usize) -> &mut Self { 202 assert_ne!(val, 0, "Thread limit cannot be zero"); 203 self.max_threads = val; 204 self 205 } 206 207 /// Sets name of threads spawned by the `Runtime`'s thread pool. 208 /// 209 /// The default name is "tokio-runtime-worker". 210 /// 211 /// # Examples 212 /// 213 /// ``` 214 /// # use tokio::runtime; 215 /// 216 /// # pub fn main() { 217 /// let rt = runtime::Builder::new() 218 /// .thread_name("my-pool") 219 /// .build(); 220 /// # } 221 /// ``` thread_name(&mut self, val: impl Into<String>) -> &mut Self222 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { 223 self.thread_name = val.into(); 224 self 225 } 226 227 /// Sets the stack size (in bytes) for worker threads. 228 /// 229 /// The actual stack size may be greater than this value if the platform 230 /// specifies minimal stack size. 231 /// 232 /// The default stack size for spawned threads is 2 MiB, though this 233 /// particular stack size is subject to change in the future. 234 /// 235 /// # Examples 236 /// 237 /// ``` 238 /// # use tokio::runtime; 239 /// 240 /// # pub fn main() { 241 /// let rt = runtime::Builder::new() 242 /// .threaded_scheduler() 243 /// .thread_stack_size(32 * 1024) 244 /// .build(); 245 /// # } 246 /// ``` thread_stack_size(&mut self, val: usize) -> &mut Self247 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self { 248 self.thread_stack_size = Some(val); 249 self 250 } 251 252 /// Executes function `f` after each thread is started but before it starts 253 /// doing work. 254 /// 255 /// This is intended for bookkeeping and monitoring use cases. 256 /// 257 /// # Examples 258 /// 259 /// ``` 260 /// # use tokio::runtime; 261 /// 262 /// # pub fn main() { 263 /// let runtime = runtime::Builder::new() 264 /// .threaded_scheduler() 265 /// .on_thread_start(|| { 266 /// println!("thread started"); 267 /// }) 268 /// .build(); 269 /// # } 270 /// ``` 271 #[cfg(not(loom))] on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,272 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self 273 where 274 F: Fn() + Send + Sync + 'static, 275 { 276 self.after_start = Some(Arc::new(f)); 277 self 278 } 279 280 /// Executes function `f` before each thread stops. 281 /// 282 /// This is intended for bookkeeping and monitoring use cases. 283 /// 284 /// # Examples 285 /// 286 /// ``` 287 /// # use tokio::runtime; 288 /// 289 /// # pub fn main() { 290 /// let runtime = runtime::Builder::new() 291 /// .threaded_scheduler() 292 /// .on_thread_stop(|| { 293 /// println!("thread stopping"); 294 /// }) 295 /// .build(); 296 /// # } 297 /// ``` 298 #[cfg(not(loom))] on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,299 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self 300 where 301 F: Fn() + Send + Sync + 'static, 302 { 303 self.before_stop = Some(Arc::new(f)); 304 self 305 } 306 307 /// Sets the maximum throttling duration. 308 /// 309 /// Throttling reduces syscalls & context switches 310 /// by grouping timers, I/O and tasks handling. 311 /// 312 /// The default is to not apply any throttling. 313 /// 314 /// This is only available for the basic scheduler. 315 /// 316 /// # Examples 317 /// 318 /// ``` 319 /// # use tokio::runtime; 320 /// # use std::time::Duration; 321 /// 322 /// # pub fn main() { 323 /// let rt = runtime::Builder::new() 324 /// .basic_scheduler() 325 /// .enable_all() 326 /// .max_throttling(Duration::from_millis(20)) 327 /// .build(); 328 /// # } 329 /// ``` 330 #[cfg(feature = "rt-core")] max_throttling(&mut self, dur: Duration) -> &mut Self331 pub fn max_throttling(&mut self, dur: Duration) -> &mut Self { 332 self.max_throttling = Some(dur); 333 self 334 } 335 336 /// Creates the configured `Runtime`. 337 /// 338 /// The returned `ThreadPool` instance is ready to spawn tasks. 339 /// 340 /// # Examples 341 /// 342 /// ``` 343 /// use tokio::runtime::Builder; 344 /// 345 /// let mut rt = Builder::new().build().unwrap(); 346 /// 347 /// rt.block_on(async { 348 /// println!("Hello from the Tokio runtime"); 349 /// }); 350 /// ``` build(&mut self) -> io::Result<Runtime>351 pub fn build(&mut self) -> io::Result<Runtime> { 352 match self.kind { 353 Kind::Shell => self.build_shell_runtime(), 354 #[cfg(feature = "rt-core")] 355 Kind::Basic => self.build_basic_runtime(), 356 #[cfg(feature = "rt-threaded")] 357 Kind::ThreadPool => self.build_threaded_runtime(), 358 } 359 } 360 build_shell_runtime(&mut self) -> io::Result<Runtime>361 fn build_shell_runtime(&mut self) -> io::Result<Runtime> { 362 use crate::runtime::Kind; 363 364 let clock = time::create_clock(); 365 366 // Create I/O driver 367 let (io_driver, io_handle) = io::create_driver(self.enable_io)?; 368 let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); 369 370 let spawner = Spawner::Shell; 371 372 let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); 373 let blocking_spawner = blocking_pool.spawner().clone(); 374 375 Ok(Runtime { 376 kind: Kind::Shell(Shell::new(driver)), 377 handle: Handle { 378 spawner, 379 io_handle, 380 time_handle, 381 clock, 382 blocking_spawner, 383 }, 384 blocking_pool, 385 }) 386 } 387 } 388 389 cfg_io_driver! { 390 impl Builder { 391 /// Enables the I/O driver. 392 /// 393 /// Doing this enables using net, process, signal, and some I/O types on 394 /// the runtime. 395 /// 396 /// # Examples 397 /// 398 /// ``` 399 /// use tokio::runtime; 400 /// 401 /// let rt = runtime::Builder::new() 402 /// .enable_io() 403 /// .build() 404 /// .unwrap(); 405 /// ``` 406 pub fn enable_io(&mut self) -> &mut Self { 407 self.enable_io = true; 408 self 409 } 410 } 411 } 412 413 cfg_time! { 414 impl Builder { 415 /// Enables the time driver. 416 /// 417 /// Doing this enables using `tokio::time` on the runtime. 418 /// 419 /// # Examples 420 /// 421 /// ``` 422 /// use tokio::runtime; 423 /// 424 /// let rt = runtime::Builder::new() 425 /// .enable_time() 426 /// .build() 427 /// .unwrap(); 428 /// ``` 429 pub fn enable_time(&mut self) -> &mut Self { 430 self.enable_time = true; 431 self 432 } 433 } 434 } 435 436 cfg_rt_core! { 437 impl Builder { 438 /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread. 439 /// 440 /// The executor and all necessary drivers will all be run on the current 441 /// thread during `block_on` calls. 442 /// 443 /// See also [the module level documentation][1], which has a section on scheduler 444 /// types. 445 /// 446 /// [1]: index.html#runtime-configurations 447 pub fn basic_scheduler(&mut self) -> &mut Self { 448 self.kind = Kind::Basic; 449 self 450 } 451 452 fn build_basic_runtime(&mut self) -> io::Result<Runtime> { 453 use crate::runtime::{BasicScheduler, Kind}; 454 455 let clock = time::create_clock(); 456 457 let max_throttling = self.max_throttling.take().filter(|max_throttling| max_throttling.as_millis() > 0); 458 459 let (io_driver, io_handle) = io::create_driver(self.enable_io)?; 460 let (driver, time_handle) = time::create_throttling_driver( 461 self.enable_time, 462 io_driver, 463 clock.clone(), 464 max_throttling, 465 ); 466 467 // And now put a single-threaded scheduler on top of the timer. When 468 // there are no futures ready to do something, it'll let the timer or 469 // the reactor to generate some new stimuli for the futures to continue 470 // in their life. 471 let scheduler = BasicScheduler::new(driver, max_throttling); 472 let spawner = Spawner::Basic(scheduler.spawner()); 473 474 // Blocking pool 475 let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); 476 let blocking_spawner = blocking_pool.spawner().clone(); 477 478 Ok(Runtime { 479 kind: Kind::Basic(scheduler), 480 handle: Handle { 481 spawner, 482 io_handle, 483 time_handle, 484 clock, 485 blocking_spawner, 486 }, 487 blocking_pool, 488 }) 489 } 490 } 491 } 492 493 cfg_rt_threaded! { 494 impl Builder { 495 /// Sets runtime to use a multi-threaded scheduler for executing tasks. 496 /// 497 /// See also [the module level documentation][1], which has a section on scheduler 498 /// types. 499 /// 500 /// [1]: index.html#runtime-configurations 501 pub fn threaded_scheduler(&mut self) -> &mut Self { 502 self.kind = Kind::ThreadPool; 503 self 504 } 505 506 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { 507 use crate::runtime::{Kind, ThreadPool}; 508 use crate::runtime::park::Parker; 509 510 let core_threads = self.core_threads.unwrap_or_else(crate::loom::sys::num_cpus); 511 assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); 512 513 let clock = time::create_clock(); 514 515 let (io_driver, io_handle) = io::create_driver(self.enable_io)?; 516 let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); 517 let (scheduler, workers) = ThreadPool::new(core_threads, Parker::new(driver)); 518 let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); 519 520 // Create the blocking pool 521 let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); 522 let blocking_spawner = blocking_pool.spawner().clone(); 523 524 // Create the runtime handle 525 let handle = Handle { 526 spawner, 527 io_handle, 528 time_handle, 529 clock, 530 blocking_spawner, 531 }; 532 533 // Spawn the thread pool workers 534 workers.spawn(&handle); 535 536 // FIXME(fengalin): use max_throttling? 537 Ok(Runtime { 538 kind: Kind::ThreadPool(scheduler), 539 handle, 540 blocking_pool, 541 }) 542 } 543 } 544 } 545 546 impl Default for Builder { default() -> Self547 fn default() -> Self { 548 Self::new() 549 } 550 } 551 552 impl fmt::Debug for Builder { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result553 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 554 fmt.debug_struct("Builder") 555 .field("kind", &self.kind) 556 .field("core_threads", &self.core_threads) 557 .field("max_threads", &self.max_threads) 558 .field("thread_name", &self.thread_name) 559 .field("thread_stack_size", &self.thread_stack_size) 560 .field("after_start", &self.after_start.as_ref().map(|_| "...")) 561 .field("before_stop", &self.after_start.as_ref().map(|_| "...")) 562 .finish() 563 } 564 } 565