1 //! The Tokio runtime. 2 //! 3 //! Unlike other Rust programs, asynchronous applications require 4 //! runtime support. In particular, the following runtime services are 5 //! necessary: 6 //! 7 //! * An **I/O event loop**, called the driver, which drives I/O resources and 8 //! dispatches I/O events to tasks that depend on them. 9 //! * A **scheduler** to execute [tasks] that use these I/O resources. 10 //! * A **timer** for scheduling work to run after a set period of time. 11 //! 12 //! Tokio's [`Runtime`] bundles all of these services as a single type, allowing 13 //! them to be started, shut down, and configured together. However, most 14 //! applications won't need to use [`Runtime`] directly. Instead, they can 15 //! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under 16 //! the hood. 17 //! 18 //! # Usage 19 //! 20 //! Most applications will use the [`tokio::main`] attribute macro. 21 //! 22 //! ```no_run 23 //! use tokio::net::TcpListener; 24 //! use tokio::prelude::*; 25 //! 26 //! #[tokio::main] 27 //! async fn main() -> Result<(), Box<dyn std::error::Error>> { 28 //! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; 29 //! 30 //! loop { 31 //! let (mut socket, _) = listener.accept().await?; 32 //! 33 //! tokio::spawn(async move { 34 //! let mut buf = [0; 1024]; 35 //! 36 //! // In a loop, read data from the socket and write the data back. 37 //! loop { 38 //! let n = match socket.read(&mut buf).await { 39 //! // socket closed 40 //! Ok(n) if n == 0 => return, 41 //! Ok(n) => n, 42 //! Err(e) => { 43 //! println!("failed to read from socket; err = {:?}", e); 44 //! return; 45 //! } 46 //! }; 47 //! 48 //! // Write the data back 49 //! if let Err(e) = socket.write_all(&buf[0..n]).await { 50 //! println!("failed to write to socket; err = {:?}", e); 51 //! return; 52 //! } 53 //! } 54 //! }); 55 //! } 56 //! } 57 //! ``` 58 //! 59 //! From within the context of the runtime, additional tasks are spawned using 60 //! the [`tokio::spawn`] function. Futures spawned using this function will be 61 //! executed on the same thread pool used by the [`Runtime`]. 62 //! 63 //! A [`Runtime`] instance can also be used directly. 64 //! 65 //! ```no_run 66 //! use tokio::net::TcpListener; 67 //! use tokio::prelude::*; 68 //! use tokio::runtime::Runtime; 69 //! 70 //! fn main() -> Result<(), Box<dyn std::error::Error>> { 71 //! // Create the runtime 72 //! let mut rt = Runtime::new()?; 73 //! 74 //! // Spawn the root task 75 //! rt.block_on(async { 76 //! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; 77 //! 78 //! loop { 79 //! let (mut socket, _) = listener.accept().await?; 80 //! 81 //! tokio::spawn(async move { 82 //! let mut buf = [0; 1024]; 83 //! 84 //! // In a loop, read data from the socket and write the data back. 85 //! loop { 86 //! let n = match socket.read(&mut buf).await { 87 //! // socket closed 88 //! Ok(n) if n == 0 => return, 89 //! Ok(n) => n, 90 //! Err(e) => { 91 //! println!("failed to read from socket; err = {:?}", e); 92 //! return; 93 //! } 94 //! }; 95 //! 96 //! // Write the data back 97 //! if let Err(e) = socket.write_all(&buf[0..n]).await { 98 //! println!("failed to write to socket; err = {:?}", e); 99 //! return; 100 //! } 101 //! } 102 //! }); 103 //! } 104 //! }) 105 //! } 106 //! ``` 107 //! 108 //! ## Runtime Configurations 109 //! 110 //! Tokio provides multiple task scheduling strategies, suitable for different 111 //! applications. The [runtime builder] or `#[tokio::main]` attribute may be 112 //! used to select which scheduler to use. 113 //! 114 //! #### Basic Scheduler 115 //! 116 //! The basic scheduler provides a _single-threaded_ future executor. All tasks 117 //! will be created and executed on the current thread. The basic scheduler 118 //! requires the `rt-core` feature flag, and can be selected using the 119 //! [`Builder::basic_scheduler`] method: 120 //! ``` 121 //! use tokio::runtime; 122 //! 123 //! # fn main() -> Result<(), Box<dyn std::error::Error>> { 124 //! let basic_rt = runtime::Builder::new() 125 //! .basic_scheduler() 126 //! .build()?; 127 //! # Ok(()) } 128 //! ``` 129 //! 130 //! If the `rt-core` feature is enabled and `rt-threaded` is not, 131 //! [`Runtime::new`] will return a basic scheduler runtime by default. 132 //! 133 //! #### Threaded Scheduler 134 //! 135 //! The threaded scheduler executes futures on a _thread pool_, using a 136 //! work-stealing strategy. By default, it will start a worker thread for each 137 //! CPU core available on the system. This tends to be the ideal configurations 138 //! for most applications. The threaded scheduler requires the `rt-threaded` feature 139 //! flag, and can be selected using the [`Builder::threaded_scheduler`] method: 140 //! ``` 141 //! use tokio::runtime; 142 //! 143 //! # fn main() -> Result<(), Box<dyn std::error::Error>> { 144 //! let threaded_rt = runtime::Builder::new() 145 //! .threaded_scheduler() 146 //! .build()?; 147 //! # Ok(()) } 148 //! ``` 149 //! 150 //! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a 151 //! threaded scheduler runtime by default. 152 //! 153 //! Most applications should use the threaded scheduler, except in some niche 154 //! use-cases, such as when running only a single thread is required. 155 //! 156 //! #### Resource drivers 157 //! 158 //! When configuring a runtime by hand, no resource drivers are enabled by 159 //! default. In this case, attempting to use networking types or time types will 160 //! fail. In order to enable these types, the resource drivers must be enabled. 161 //! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a 162 //! shorthand, [`Builder::enable_all`] enables both resource drivers. 163 //! 164 //! ## Lifetime of spawned threads 165 //! 166 //! The runtime may spawn threads depending on its configuration and usage. The 167 //! threaded scheduler spawns threads to schedule tasks and calls to 168 //! `spawn_blocking` spawn threads to run blocking operations. 169 //! 170 //! While the `Runtime` is active, threads may shutdown after periods of being 171 //! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown. 172 //! Any tasks that have not yet completed will be dropped. 173 //! 174 //! [tasks]: crate::task 175 //! [`Runtime`]: Runtime 176 //! [`tokio::spawn`]: crate::spawn 177 //! [`tokio::main`]: ../attr.main.html 178 //! [runtime builder]: crate::runtime::Builder 179 //! [`Runtime::new`]: crate::runtime::Runtime::new 180 //! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler 181 //! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler 182 //! [`Builder::enable_io`]: crate::runtime::Builder::enable_io 183 //! [`Builder::enable_time`]: crate::runtime::Builder::enable_time 184 //! [`Builder::enable_all`]: crate::runtime::Builder::enable_all 185 186 // At the top due to macros 187 #[cfg(test)] 188 #[macro_use] 189 mod tests; 190 191 pub(crate) mod context; 192 193 cfg_rt_core! { 194 mod basic_scheduler; 195 use basic_scheduler::BasicScheduler; 196 197 pub(crate) mod task; 198 } 199 200 mod blocking; 201 use blocking::BlockingPool; 202 203 cfg_blocking_impl! { 204 #[allow(unused_imports)] 205 pub(crate) use blocking::{spawn_blocking, try_spawn_blocking}; 206 } 207 208 mod builder; 209 pub use self::builder::Builder; 210 211 pub(crate) mod enter; 212 use self::enter::enter; 213 214 mod handle; 215 pub use self::handle::{Handle, TryCurrentError}; 216 217 mod io; 218 219 cfg_rt_threaded! { 220 mod park; 221 use park::Parker; 222 } 223 224 mod shell; 225 use self::shell::Shell; 226 227 mod spawner; 228 use self::spawner::Spawner; 229 230 mod time; 231 232 cfg_rt_threaded! { 233 mod queue; 234 235 pub(crate) mod thread_pool; 236 use self::thread_pool::ThreadPool; 237 } 238 239 cfg_rt_core! { 240 use crate::task::JoinHandle; 241 } 242 243 use std::future::Future; 244 use std::time::Duration; 245 246 /// The Tokio runtime. 247 /// 248 /// The runtime provides an I/O driver, task scheduler, [timer], and blocking 249 /// pool, necessary for running asynchronous tasks. 250 /// 251 /// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, 252 /// most users will use the `#[tokio::main]` annotation on their entry point instead. 253 /// 254 /// See [module level][mod] documentation for more details. 255 /// 256 /// # Shutdown 257 /// 258 /// Shutting down the runtime is done by dropping the value. The current thread 259 /// will block until the shut down operation has completed. 260 /// 261 /// * Drain any scheduled work queues. 262 /// * Drop any futures that have not yet completed. 263 /// * Drop the reactor. 264 /// 265 /// Once the reactor has dropped, any outstanding I/O resources bound to 266 /// that reactor will no longer function. Calling any method on them will 267 /// result in an error. 268 /// 269 /// [timer]: crate::time 270 /// [mod]: index.html 271 /// [`new`]: method@Self::new 272 /// [`Builder`]: struct@Builder 273 /// [`tokio::run`]: fn@run 274 #[derive(Debug)] 275 pub struct Runtime { 276 /// Task executor 277 kind: Kind, 278 279 /// Handle to runtime, also contains driver handles 280 handle: Handle, 281 282 /// Blocking pool handle, used to signal shutdown 283 blocking_pool: BlockingPool, 284 } 285 286 /// The runtime executor is either a thread-pool or a current-thread executor. 287 #[derive(Debug)] 288 enum Kind { 289 /// Not able to execute concurrent tasks. This variant is mostly used to get 290 /// access to the driver handles. 291 Shell(Shell), 292 293 /// Execute all tasks on the current-thread. 294 #[cfg(feature = "rt-core")] 295 Basic(BasicScheduler<time::Driver>), 296 297 /// Execute tasks across multiple threads. 298 #[cfg(feature = "rt-threaded")] 299 ThreadPool(ThreadPool), 300 } 301 302 /// After thread starts / before thread stops 303 type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; 304 305 impl Runtime { 306 /// Create a new runtime instance with default configuration values. 307 /// 308 /// This results in a scheduler, I/O driver, and time driver being 309 /// initialized. The type of scheduler used depends on what feature flags 310 /// are enabled: if the `rt-threaded` feature is enabled, the [threaded 311 /// scheduler] is used, while if only the `rt-core` feature is enabled, the 312 /// [basic scheduler] is used instead. 313 /// 314 /// If the threaded scheduler is selected, it will not spawn 315 /// any worker threads until it needs to, i.e. tasks are scheduled to run. 316 /// 317 /// Most applications will not need to call this function directly. Instead, 318 /// they will use the [`#[tokio::main]` attribute][main]. When more complex 319 /// configuration is necessary, the [runtime builder] may be used. 320 /// 321 /// See [module level][mod] documentation for more details. 322 /// 323 /// # Examples 324 /// 325 /// Creating a new `Runtime` with default configuration values. 326 /// 327 /// ``` 328 /// use tokio::runtime::Runtime; 329 /// 330 /// let rt = Runtime::new() 331 /// .unwrap(); 332 /// 333 /// // Use the runtime... 334 /// ``` 335 /// 336 /// [mod]: index.html 337 /// [main]: ../attr.main.html 338 /// [threaded scheduler]: index.html#threaded-scheduler 339 /// [basic scheduler]: index.html#basic-scheduler 340 /// [runtime builder]: crate::runtime::Builder new() -> io::Result<Runtime>341 pub fn new() -> io::Result<Runtime> { 342 #[cfg(feature = "rt-threaded")] 343 let ret = Builder::new().threaded_scheduler().enable_all().build(); 344 345 #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))] 346 let ret = Builder::new().basic_scheduler().enable_all().build(); 347 348 #[cfg(not(feature = "rt-core"))] 349 let ret = Builder::new().enable_all().build(); 350 351 ret 352 } 353 354 /// Spawn a future onto the Tokio runtime. 355 /// 356 /// This spawns the given future onto the runtime's executor, usually a 357 /// thread pool. The thread pool is then responsible for polling the future 358 /// until it completes. 359 /// 360 /// See [module level][mod] documentation for more details. 361 /// 362 /// [mod]: index.html 363 /// 364 /// # Examples 365 /// 366 /// ``` 367 /// use tokio::runtime::Runtime; 368 /// 369 /// # fn dox() { 370 /// // Create the runtime 371 /// let rt = Runtime::new().unwrap(); 372 /// 373 /// // Spawn a future onto the runtime 374 /// rt.spawn(async { 375 /// println!("now running on a worker thread"); 376 /// }); 377 /// # } 378 /// ``` 379 /// 380 /// # Panics 381 /// 382 /// This function will not panic unless task execution is disabled on the 383 /// executor. This can only happen if the runtime was built using 384 /// [`Builder`] without picking either [`basic_scheduler`] or 385 /// [`threaded_scheduler`]. 386 /// 387 /// [`Builder`]: struct@Builder 388 /// [`threaded_scheduler`]: fn@Builder::threaded_scheduler 389 /// [`basic_scheduler`]: fn@Builder::basic_scheduler 390 #[cfg(feature = "rt-core")] spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,391 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> 392 where 393 F: Future + Send + 'static, 394 F::Output: Send + 'static, 395 { 396 match &self.kind { 397 Kind::Shell(_) => panic!("task execution disabled"), 398 #[cfg(feature = "rt-threaded")] 399 Kind::ThreadPool(exec) => exec.spawn(future), 400 Kind::Basic(exec) => exec.spawn(future), 401 } 402 } 403 404 /// Run a future to completion on the Tokio runtime. This is the runtime's 405 /// entry point. 406 /// 407 /// This runs the given future on the runtime, blocking until it is 408 /// complete, and yielding its resolved result. Any tasks or timers which 409 /// the future spawns internally will be executed on the runtime. 410 /// 411 /// `&mut` is required as calling `block_on` **may** result in advancing the 412 /// state of the runtime. The details depend on how the runtime is 413 /// configured. [`runtime::Handle::block_on`][handle] provides a version 414 /// that takes `&self`. 415 /// 416 /// This method may not be called from an asynchronous context. 417 /// 418 /// # Panics 419 /// 420 /// This function panics if the provided future panics, or if called within an 421 /// asynchronous execution context. 422 /// 423 /// # Examples 424 /// 425 /// ```no_run 426 /// use tokio::runtime::Runtime; 427 /// 428 /// // Create the runtime 429 /// let mut rt = Runtime::new().unwrap(); 430 /// 431 /// // Execute the future, blocking the current thread until completion 432 /// rt.block_on(async { 433 /// println!("hello"); 434 /// }); 435 /// ``` 436 /// 437 /// [handle]: fn@Handle::block_on block_on<F: Future>(&mut self, future: F) -> F::Output438 pub fn block_on<F: Future>(&mut self, future: F) -> F::Output { 439 let kind = &mut self.kind; 440 441 self.handle.enter(|| match kind { 442 Kind::Shell(exec) => exec.block_on(future), 443 #[cfg(feature = "rt-core")] 444 Kind::Basic(exec) => exec.block_on(future), 445 #[cfg(feature = "rt-threaded")] 446 Kind::ThreadPool(exec) => exec.block_on(future), 447 }) 448 } 449 450 /// Enter the runtime context. This allows you to construct types that must 451 /// have an executor available on creation such as [`Delay`] or [`TcpStream`]. 452 /// It will also allow you to call methods such as [`tokio::spawn`]. 453 /// 454 /// This function is also available as [`Handle::enter`]. 455 /// 456 /// [`Delay`]: struct@crate::time::Delay 457 /// [`TcpStream`]: struct@crate::net::TcpStream 458 /// [`Handle::enter`]: fn@crate::runtime::Handle::enter 459 /// [`tokio::spawn`]: fn@crate::spawn 460 /// 461 /// # Example 462 /// 463 /// ``` 464 /// use tokio::runtime::Runtime; 465 /// 466 /// fn function_that_spawns(msg: String) { 467 /// // Had we not used `rt.enter` below, this would panic. 468 /// tokio::spawn(async move { 469 /// println!("{}", msg); 470 /// }); 471 /// } 472 /// 473 /// fn main() { 474 /// let rt = Runtime::new().unwrap(); 475 /// 476 /// let s = "Hello World!".to_string(); 477 /// 478 /// // By entering the context, we tie `tokio::spawn` to this executor. 479 /// rt.enter(|| function_that_spawns(s)); 480 /// } 481 /// ``` enter<F, R>(&self, f: F) -> R where F: FnOnce() -> R,482 pub fn enter<F, R>(&self, f: F) -> R 483 where 484 F: FnOnce() -> R, 485 { 486 self.handle.enter(f) 487 } 488 489 /// Return a handle to the runtime's spawner. 490 /// 491 /// The returned handle can be used to spawn tasks that run on this runtime, and can 492 /// be cloned to allow moving the `Handle` to other threads. 493 /// 494 /// # Examples 495 /// 496 /// ``` 497 /// use tokio::runtime::Runtime; 498 /// 499 /// let rt = Runtime::new() 500 /// .unwrap(); 501 /// 502 /// let handle = rt.handle(); 503 /// 504 /// handle.spawn(async { println!("hello"); }); 505 /// ``` handle(&self) -> &Handle506 pub fn handle(&self) -> &Handle { 507 &self.handle 508 } 509 510 /// Shutdown the runtime, waiting for at most `duration` for all spawned 511 /// task to shutdown. 512 /// 513 /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to 514 /// shutdown in a timely fashion. However, dropping a `Runtime` will wait 515 /// indefinitely for all tasks to terminate, and there are cases where a long 516 /// blocking task has been spawned, which can block dropping `Runtime`. 517 /// 518 /// In this case, calling `shutdown_timeout` with an explicit wait timeout 519 /// can work. The `shutdown_timeout` will signal all tasks to shutdown and 520 /// will wait for at most `duration` for all spawned tasks to terminate. If 521 /// `timeout` elapses before all tasks are dropped, the function returns and 522 /// outstanding tasks are potentially leaked. 523 /// 524 /// # Examples 525 /// 526 /// ``` 527 /// use tokio::runtime::Runtime; 528 /// use tokio::task; 529 /// 530 /// use std::thread; 531 /// use std::time::Duration; 532 /// 533 /// fn main() { 534 /// let mut runtime = Runtime::new().unwrap(); 535 /// 536 /// runtime.block_on(async move { 537 /// task::spawn_blocking(move || { 538 /// thread::sleep(Duration::from_secs(10_000)); 539 /// }); 540 /// }); 541 /// 542 /// runtime.shutdown_timeout(Duration::from_millis(100)); 543 /// } 544 /// ``` shutdown_timeout(self, duration: Duration)545 pub fn shutdown_timeout(self, duration: Duration) { 546 let Runtime { 547 mut blocking_pool, .. 548 } = self; 549 blocking_pool.shutdown(Some(duration)); 550 } 551 552 /// Shutdown the runtime, without waiting for any spawned tasks to shutdown. 553 /// 554 /// This can be useful if you want to drop a runtime from within another runtime. 555 /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks 556 /// to complete, which would normally not be permitted within an asynchronous context. 557 /// By calling `shutdown_background()`, you can drop the runtime from such a context. 558 /// 559 /// Note however, that because we do not wait for any blocking tasks to complete, this 560 /// may result in a resource leak (in that any blocking tasks are still running until they 561 /// return. 562 /// 563 /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`. 564 /// 565 /// ``` 566 /// use tokio::runtime::Runtime; 567 /// 568 /// fn main() { 569 /// let mut runtime = Runtime::new().unwrap(); 570 /// 571 /// runtime.block_on(async move { 572 /// let inner_runtime = Runtime::new().unwrap(); 573 /// // ... 574 /// inner_runtime.shutdown_background(); 575 /// }); 576 /// } 577 /// ``` shutdown_background(self)578 pub fn shutdown_background(self) { 579 self.shutdown_timeout(Duration::from_nanos(0)) 580 } 581 } 582