1 use crate::runtime::handle::Handle; 2 use crate::runtime::shell::Shell; 3 use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; 4 5 use std::fmt; 6 #[cfg(not(loom))] 7 use std::sync::Arc; 8 9 /// Builds Tokio Runtime with custom configuration values. 10 /// 11 /// Methods can be chained in order to set the configuration values. The 12 /// Runtime is constructed by calling [`build`]. 13 /// 14 /// New instances of `Builder` are obtained via [`Builder::new`]. 15 /// 16 /// See function level documentation for details on the various configuration 17 /// settings. 18 /// 19 /// [`build`]: method@Self::build 20 /// [`Builder::new`]: method@Self::new 21 /// 22 /// # Examples 23 /// 24 /// ``` 25 /// use tokio::runtime::Builder; 26 /// 27 /// fn main() { 28 /// // build runtime 29 /// let runtime = Builder::new() 30 /// .threaded_scheduler() 31 /// .core_threads(4) 32 /// .thread_name("my-custom-name") 33 /// .thread_stack_size(3 * 1024 * 1024) 34 /// .build() 35 /// .unwrap(); 36 /// 37 /// // use runtime ... 38 /// } 39 /// ``` 40 pub struct Builder { 41 /// The task execution model to use. 42 kind: Kind, 43 44 /// Whether or not to enable the I/O driver 45 enable_io: bool, 46 47 /// Whether or not to enable the time driver 48 enable_time: bool, 49 50 /// The number of worker threads, used by Runtime. 51 /// 52 /// Only used when not using the current-thread executor. 53 core_threads: Option<usize>, 54 55 /// Cap on thread usage. 56 max_threads: usize, 57 58 /// Name used for threads spawned by the runtime. 59 pub(super) thread_name: String, 60 61 /// Stack size used for threads spawned by the runtime. 62 pub(super) thread_stack_size: Option<usize>, 63 64 /// Callback to run after each thread starts. 65 pub(super) after_start: Option<Callback>, 66 67 /// To run before each worker thread stops 68 pub(super) before_stop: Option<Callback>, 69 } 70 71 #[derive(Debug, Clone, Copy)] 72 enum Kind { 73 Shell, 74 #[cfg(feature = "rt-core")] 75 Basic, 76 #[cfg(feature = "rt-threaded")] 77 ThreadPool, 78 } 79 80 impl Builder { 81 /// Returns a new runtime builder initialized with default configuration 82 /// values. 83 /// 84 /// Configuration methods can be chained on the return value. new() -> Builder85 pub fn new() -> Builder { 86 Builder { 87 // No task execution by default 88 kind: Kind::Shell, 89 90 // I/O defaults to "off" 91 enable_io: false, 92 93 // Time defaults to "off" 94 enable_time: false, 95 96 // Default to lazy auto-detection (one thread per CPU core) 97 core_threads: None, 98 99 max_threads: 512, 100 101 // Default thread name 102 thread_name: "tokio-runtime-worker".into(), 103 104 // Do not set a stack size by default 105 thread_stack_size: None, 106 107 // No worker thread callbacks 108 after_start: None, 109 before_stop: None, 110 } 111 } 112 113 /// Enables both I/O and time drivers. 114 /// 115 /// Doing this is a shorthand for calling `enable_io` and `enable_time` 116 /// individually. If additional components are added to Tokio in the future, 117 /// `enable_all` will include these future components. 118 /// 119 /// # Examples 120 /// 121 /// ``` 122 /// use tokio::runtime; 123 /// 124 /// let rt = runtime::Builder::new() 125 /// .threaded_scheduler() 126 /// .enable_all() 127 /// .build() 128 /// .unwrap(); 129 /// ``` enable_all(&mut self) -> &mut Self130 pub fn enable_all(&mut self) -> &mut Self { 131 #[cfg(feature = "io-driver")] 132 self.enable_io(); 133 #[cfg(feature = "time")] 134 self.enable_time(); 135 136 self 137 } 138 139 #[deprecated(note = "In future will be replaced by core_threads method")] 140 /// Sets the maximum number of worker threads for the `Runtime`'s thread pool. 141 /// 142 /// This must be a number between 1 and 32,768 though it is advised to keep 143 /// this value on the smaller side. 144 /// 145 /// The default value is the number of cores available to the system. num_threads(&mut self, val: usize) -> &mut Self146 pub fn num_threads(&mut self, val: usize) -> &mut Self { 147 self.core_threads = Some(val); 148 self 149 } 150 151 /// Sets the core number of worker threads for the `Runtime`'s thread pool. 152 /// 153 /// This should be a number between 1 and 32,768 though it is advised to keep 154 /// this value on the smaller side. 155 /// 156 /// The default value is the number of cores available to the system. 157 /// 158 /// These threads will be always active and running. 159 /// 160 /// # Examples 161 /// 162 /// ``` 163 /// use tokio::runtime; 164 /// 165 /// let rt = runtime::Builder::new() 166 /// .threaded_scheduler() 167 /// .core_threads(4) 168 /// .build() 169 /// .unwrap(); 170 /// ``` core_threads(&mut self, val: usize) -> &mut Self171 pub fn core_threads(&mut self, val: usize) -> &mut Self { 172 assert_ne!(val, 0, "Core threads cannot be zero"); 173 self.core_threads = Some(val); 174 self 175 } 176 177 /// Specifies limit for threads, spawned by the Runtime. 178 /// 179 /// This is number of threads to be used by Runtime, including `core_threads` 180 /// Having `max_threads` less than `core_threads` results in invalid configuration 181 /// when building multi-threaded `Runtime`, which would cause a panic. 182 /// 183 /// Similarly to the `core_threads`, this number should be between 1 and 32,768. 184 /// 185 /// The default value is 512. 186 /// 187 /// When multi-threaded runtime is not used, will act as limit on additional threads. 188 /// 189 /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for 190 /// blocking annotations) as `max_threads - core_threads`. max_threads(&mut self, val: usize) -> &mut Self191 pub fn max_threads(&mut self, val: usize) -> &mut Self { 192 assert_ne!(val, 0, "Thread limit cannot be zero"); 193 self.max_threads = val; 194 self 195 } 196 197 /// Sets name of threads spawned by the `Runtime`'s thread pool. 198 /// 199 /// The default name is "tokio-runtime-worker". 200 /// 201 /// # Examples 202 /// 203 /// ``` 204 /// # use tokio::runtime; 205 /// 206 /// # pub fn main() { 207 /// let rt = runtime::Builder::new() 208 /// .thread_name("my-pool") 209 /// .build(); 210 /// # } 211 /// ``` thread_name(&mut self, val: impl Into<String>) -> &mut Self212 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { 213 self.thread_name = val.into(); 214 self 215 } 216 217 /// Sets the stack size (in bytes) for worker threads. 218 /// 219 /// The actual stack size may be greater than this value if the platform 220 /// specifies minimal stack size. 221 /// 222 /// The default stack size for spawned threads is 2 MiB, though this 223 /// particular stack size is subject to change in the future. 224 /// 225 /// # Examples 226 /// 227 /// ``` 228 /// # use tokio::runtime; 229 /// 230 /// # pub fn main() { 231 /// let rt = runtime::Builder::new() 232 /// .threaded_scheduler() 233 /// .thread_stack_size(32 * 1024) 234 /// .build(); 235 /// # } 236 /// ``` thread_stack_size(&mut self, val: usize) -> &mut Self237 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self { 238 self.thread_stack_size = Some(val); 239 self 240 } 241 242 /// Executes function `f` after each thread is started but before it starts 243 /// doing work. 244 /// 245 /// This is intended for bookkeeping and monitoring use cases. 246 /// 247 /// # Examples 248 /// 249 /// ``` 250 /// # use tokio::runtime; 251 /// 252 /// # pub fn main() { 253 /// let runtime = runtime::Builder::new() 254 /// .threaded_scheduler() 255 /// .on_thread_start(|| { 256 /// println!("thread started"); 257 /// }) 258 /// .build(); 259 /// # } 260 /// ``` 261 #[cfg(not(loom))] on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,262 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self 263 where 264 F: Fn() + Send + Sync + 'static, 265 { 266 self.after_start = Some(Arc::new(f)); 267 self 268 } 269 270 /// Executes function `f` before each thread stops. 271 /// 272 /// This is intended for bookkeeping and monitoring use cases. 273 /// 274 /// # Examples 275 /// 276 /// ``` 277 /// # use tokio::runtime; 278 /// 279 /// # pub fn main() { 280 /// let runtime = runtime::Builder::new() 281 /// .threaded_scheduler() 282 /// .on_thread_stop(|| { 283 /// println!("thread stopping"); 284 /// }) 285 /// .build(); 286 /// # } 287 /// ``` 288 #[cfg(not(loom))] on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,289 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self 290 where 291 F: Fn() + Send + Sync + 'static, 292 { 293 self.before_stop = Some(Arc::new(f)); 294 self 295 } 296 297 /// Creates the configured `Runtime`. 298 /// 299 /// The returned `ThreadPool` instance is ready to spawn tasks. 300 /// 301 /// # Examples 302 /// 303 /// ``` 304 /// use tokio::runtime::Builder; 305 /// 306 /// let mut rt = Builder::new().build().unwrap(); 307 /// 308 /// rt.block_on(async { 309 /// println!("Hello from the Tokio runtime"); 310 /// }); 311 /// ``` build(&mut self) -> io::Result<Runtime>312 pub fn build(&mut self) -> io::Result<Runtime> { 313 match self.kind { 314 Kind::Shell => self.build_shell_runtime(), 315 #[cfg(feature = "rt-core")] 316 Kind::Basic => self.build_basic_runtime(), 317 #[cfg(feature = "rt-threaded")] 318 Kind::ThreadPool => self.build_threaded_runtime(), 319 } 320 } 321 build_shell_runtime(&mut self) -> io::Result<Runtime>322 fn build_shell_runtime(&mut self) -> io::Result<Runtime> { 323 use crate::runtime::Kind; 324 325 let clock = time::create_clock(); 326 327 // Create I/O driver 328 let (io_driver, io_handle) = io::create_driver(self.enable_io)?; 329 let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); 330 331 let spawner = Spawner::Shell; 332 333 let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); 334 let blocking_spawner = blocking_pool.spawner().clone(); 335 336 Ok(Runtime { 337 kind: Kind::Shell(Shell::new(driver)), 338 handle: Handle { 339 spawner, 340 io_handle, 341 time_handle, 342 clock, 343 blocking_spawner, 344 }, 345 blocking_pool, 346 }) 347 } 348 } 349 350 cfg_io_driver! { 351 impl Builder { 352 /// Enables the I/O driver. 353 /// 354 /// Doing this enables using net, process, signal, and some I/O types on 355 /// the runtime. 356 /// 357 /// # Examples 358 /// 359 /// ``` 360 /// use tokio::runtime; 361 /// 362 /// let rt = runtime::Builder::new() 363 /// .enable_io() 364 /// .build() 365 /// .unwrap(); 366 /// ``` 367 pub fn enable_io(&mut self) -> &mut Self { 368 self.enable_io = true; 369 self 370 } 371 } 372 } 373 374 cfg_time! { 375 impl Builder { 376 /// Enables the time driver. 377 /// 378 /// Doing this enables using `tokio::time` on the runtime. 379 /// 380 /// # Examples 381 /// 382 /// ``` 383 /// use tokio::runtime; 384 /// 385 /// let rt = runtime::Builder::new() 386 /// .enable_time() 387 /// .build() 388 /// .unwrap(); 389 /// ``` 390 pub fn enable_time(&mut self) -> &mut Self { 391 self.enable_time = true; 392 self 393 } 394 } 395 } 396 397 cfg_rt_core! { 398 impl Builder { 399 /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread. 400 /// 401 /// The executor and all necessary drivers will all be run on the current 402 /// thread during [`block_on`] calls. 403 /// 404 /// See also [the module level documentation][1], which has a section on scheduler 405 /// types. 406 /// 407 /// [1]: index.html#runtime-configurations 408 /// [`block_on`]: Runtime::block_on 409 pub fn basic_scheduler(&mut self) -> &mut Self { 410 self.kind = Kind::Basic; 411 self 412 } 413 414 fn build_basic_runtime(&mut self) -> io::Result<Runtime> { 415 use crate::runtime::{BasicScheduler, Kind}; 416 417 let clock = time::create_clock(); 418 419 // Create I/O driver 420 let (io_driver, io_handle) = io::create_driver(self.enable_io)?; 421 422 let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); 423 424 // And now put a single-threaded scheduler on top of the timer. When 425 // there are no futures ready to do something, it'll let the timer or 426 // the reactor to generate some new stimuli for the futures to continue 427 // in their life. 428 let scheduler = BasicScheduler::new(driver); 429 let spawner = Spawner::Basic(scheduler.spawner().clone()); 430 431 // Blocking pool 432 let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); 433 let blocking_spawner = blocking_pool.spawner().clone(); 434 435 Ok(Runtime { 436 kind: Kind::Basic(scheduler), 437 handle: Handle { 438 spawner, 439 io_handle, 440 time_handle, 441 clock, 442 blocking_spawner, 443 }, 444 blocking_pool, 445 }) 446 } 447 } 448 } 449 450 cfg_rt_threaded! { 451 impl Builder { 452 /// Sets runtime to use a multi-threaded scheduler for executing tasks. 453 /// 454 /// See also [the module level documentation][1], which has a section on scheduler 455 /// types. 456 /// 457 /// [1]: index.html#runtime-configurations 458 pub fn threaded_scheduler(&mut self) -> &mut Self { 459 self.kind = Kind::ThreadPool; 460 self 461 } 462 463 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { 464 use crate::loom::sys::num_cpus; 465 use crate::runtime::{Kind, ThreadPool}; 466 use crate::runtime::park::Parker; 467 use std::cmp; 468 469 let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); 470 assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); 471 472 let clock = time::create_clock(); 473 474 let (io_driver, io_handle) = io::create_driver(self.enable_io)?; 475 let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); 476 let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); 477 let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); 478 479 // Create the blocking pool 480 let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); 481 let blocking_spawner = blocking_pool.spawner().clone(); 482 483 // Create the runtime handle 484 let handle = Handle { 485 spawner, 486 io_handle, 487 time_handle, 488 clock, 489 blocking_spawner, 490 }; 491 492 // Spawn the thread pool workers 493 handle.enter(|| launch.launch()); 494 495 Ok(Runtime { 496 kind: Kind::ThreadPool(scheduler), 497 handle, 498 blocking_pool, 499 }) 500 } 501 } 502 } 503 504 impl Default for Builder { default() -> Self505 fn default() -> Self { 506 Self::new() 507 } 508 } 509 510 impl fmt::Debug for Builder { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result511 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 512 fmt.debug_struct("Builder") 513 .field("kind", &self.kind) 514 .field("core_threads", &self.core_threads) 515 .field("max_threads", &self.max_threads) 516 .field("thread_name", &self.thread_name) 517 .field("thread_stack_size", &self.thread_stack_size) 518 .field("after_start", &self.after_start.as_ref().map(|_| "...")) 519 .field("before_stop", &self.after_start.as_ref().map(|_| "...")) 520 .finish() 521 } 522 } 523