1 //! The Tokio runtime.
2 //!
3 //! Unlike other Rust programs, asynchronous applications require
4 //! runtime support. In particular, the following runtime services are
5 //! necessary:
6 //!
7 //! * An **I/O event loop**, called the driver, which drives I/O resources and
8 //!   dispatches I/O events to tasks that depend on them.
9 //! * A **scheduler** to execute [tasks] that use these I/O resources.
10 //! * A **timer** for scheduling work to run after a set period of time.
11 //!
12 //! Tokio's [`Runtime`] bundles all of these services as a single type, allowing
13 //! them to be started, shut down, and configured together. However, most
14 //! applications won't need to use [`Runtime`] directly. Instead, they can
15 //! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under
16 //! the hood.
17 //!
18 //! # Usage
19 //!
20 //! Most applications will use the [`tokio::main`] attribute macro.
21 //!
22 //! ```no_run
23 //! use tokio::net::TcpListener;
24 //! use tokio::prelude::*;
25 //!
26 //! #[tokio::main]
27 //! async fn main() -> Result<(), Box<dyn std::error::Error>> {
28 //!     let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
29 //!
30 //!     loop {
31 //!         let (mut socket, _) = listener.accept().await?;
32 //!
33 //!         tokio::spawn(async move {
34 //!             let mut buf = [0; 1024];
35 //!
36 //!             // In a loop, read data from the socket and write the data back.
37 //!             loop {
38 //!                 let n = match socket.read(&mut buf).await {
39 //!                     // socket closed
40 //!                     Ok(n) if n == 0 => return,
41 //!                     Ok(n) => n,
42 //!                     Err(e) => {
43 //!                         println!("failed to read from socket; err = {:?}", e);
44 //!                         return;
45 //!                     }
46 //!                 };
47 //!
48 //!                 // Write the data back
49 //!                 if let Err(e) = socket.write_all(&buf[0..n]).await {
50 //!                     println!("failed to write to socket; err = {:?}", e);
51 //!                     return;
52 //!                 }
53 //!             }
54 //!         });
55 //!     }
56 //! }
57 //! ```
58 //!
59 //! From within the context of the runtime, additional tasks are spawned using
60 //! the [`tokio::spawn`] function. Futures spawned using this function will be
61 //! executed on the same thread pool used by the [`Runtime`].
62 //!
63 //! A [`Runtime`] instance can also be used directly.
64 //!
65 //! ```no_run
66 //! use tokio::net::TcpListener;
67 //! use tokio::prelude::*;
68 //! use tokio::runtime::Runtime;
69 //!
70 //! fn main() -> Result<(), Box<dyn std::error::Error>> {
71 //!     // Create the runtime
72 //!     let mut rt = Runtime::new()?;
73 //!
74 //!     // Spawn the root task
75 //!     rt.block_on(async {
76 //!         let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
77 //!
78 //!         loop {
79 //!             let (mut socket, _) = listener.accept().await?;
80 //!
81 //!             tokio::spawn(async move {
82 //!                 let mut buf = [0; 1024];
83 //!
84 //!                 // In a loop, read data from the socket and write the data back.
85 //!                 loop {
86 //!                     let n = match socket.read(&mut buf).await {
87 //!                         // socket closed
88 //!                         Ok(n) if n == 0 => return,
89 //!                         Ok(n) => n,
90 //!                         Err(e) => {
91 //!                             println!("failed to read from socket; err = {:?}", e);
92 //!                             return;
93 //!                         }
94 //!                     };
95 //!
96 //!                     // Write the data back
97 //!                     if let Err(e) = socket.write_all(&buf[0..n]).await {
98 //!                         println!("failed to write to socket; err = {:?}", e);
99 //!                         return;
100 //!                     }
101 //!                 }
102 //!             });
103 //!         }
104 //!     })
105 //! }
106 //! ```
107 //!
108 //! ## Runtime Configurations
109 //!
110 //! Tokio provides multiple task scheduling strategies, suitable for different
111 //! applications. The [runtime builder] or `#[tokio::main]` attribute may be
112 //! used to select which scheduler to use.
113 //!
114 //! #### Basic Scheduler
115 //!
116 //! The basic scheduler provides a _single-threaded_ future executor. All tasks
117 //! will be created and executed on the current thread. The basic scheduler
118 //! requires the `rt-core` feature flag, and can be selected using the
119 //! [`Builder::basic_scheduler`] method:
120 //! ```
121 //! use tokio::runtime;
122 //!
123 //! # fn main() -> Result<(), Box<dyn std::error::Error>> {
124 //! let basic_rt = runtime::Builder::new()
125 //!     .basic_scheduler()
126 //!     .build()?;
127 //! # Ok(()) }
128 //! ```
129 //!
130 //! If the `rt-core` feature is enabled and `rt-threaded` is not,
131 //! [`Runtime::new`] will return a basic scheduler runtime by default.
132 //!
133 //! #### Threaded Scheduler
134 //!
135 //! The threaded scheduler executes futures on a _thread pool_, using a
136 //! work-stealing strategy. By default, it will start a worker thread for each
137 //! CPU core available on the system. This tends to be the ideal configurations
138 //! for most applications. The threaded scheduler requires the `rt-threaded` feature
139 //! flag, and can be selected using the  [`Builder::threaded_scheduler`] method:
140 //! ```
141 //! use tokio::runtime;
142 //!
143 //! # fn main() -> Result<(), Box<dyn std::error::Error>> {
144 //! let threaded_rt = runtime::Builder::new()
145 //!     .threaded_scheduler()
146 //!     .build()?;
147 //! # Ok(()) }
148 //! ```
149 //!
150 //! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a
151 //! threaded scheduler runtime by default.
152 //!
153 //! Most applications should use the threaded scheduler, except in some niche
154 //! use-cases, such as when running only a single thread is required.
155 //!
156 //! #### Resource drivers
157 //!
158 //! When configuring a runtime by hand, no resource drivers are enabled by
159 //! default. In this case, attempting to use networking types or time types will
160 //! fail. In order to enable these types, the resource drivers must be enabled.
161 //! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a
162 //! shorthand, [`Builder::enable_all`] enables both resource drivers.
163 //!
164 //! ## Lifetime of spawned threads
165 //!
166 //! The runtime may spawn threads depending on its configuration and usage. The
167 //! threaded scheduler spawns threads to schedule tasks and calls to
168 //! `spawn_blocking` spawn threads to run blocking operations.
169 //!
170 //! While the `Runtime` is active, threads may shutdown after periods of being
171 //! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown.
172 //! Any tasks that have not yet completed will be dropped.
173 //!
174 //! [tasks]: crate::task
175 //! [`Runtime`]: Runtime
176 //! [`tokio::spawn`]: crate::spawn
177 //! [`tokio::main`]: ../attr.main.html
178 //! [runtime builder]: crate::runtime::Builder
179 //! [`Runtime::new`]: crate::runtime::Runtime::new
180 //! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler
181 //! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler
182 //! [`Builder::enable_io`]: crate::runtime::Builder::enable_io
183 //! [`Builder::enable_time`]: crate::runtime::Builder::enable_time
184 //! [`Builder::enable_all`]: crate::runtime::Builder::enable_all
185 
186 // At the top due to macros
187 #[cfg(test)]
188 #[macro_use]
189 mod tests;
190 
191 pub(crate) mod context;
192 
193 cfg_rt_core! {
194     mod basic_scheduler;
195     use basic_scheduler::BasicScheduler;
196 
197     pub(crate) mod task;
198 }
199 
200 mod blocking;
201 use blocking::BlockingPool;
202 
203 cfg_blocking_impl! {
204     #[allow(unused_imports)]
205     pub(crate) use blocking::{spawn_blocking, try_spawn_blocking};
206 }
207 
208 mod builder;
209 pub use self::builder::Builder;
210 
211 pub(crate) mod enter;
212 use self::enter::enter;
213 
214 mod handle;
215 pub use self::handle::{Handle, TryCurrentError};
216 
217 mod io;
218 
219 cfg_rt_threaded! {
220     mod park;
221     use park::Parker;
222 }
223 
224 mod shell;
225 use self::shell::Shell;
226 
227 mod spawner;
228 use self::spawner::Spawner;
229 
230 mod time;
231 
232 cfg_rt_threaded! {
233     mod queue;
234 
235     pub(crate) mod thread_pool;
236     use self::thread_pool::ThreadPool;
237 }
238 
239 cfg_rt_core! {
240     use crate::task::JoinHandle;
241 }
242 
243 use std::future::Future;
244 use std::time::Duration;
245 
246 /// The Tokio runtime.
247 ///
248 /// The runtime provides an I/O driver, task scheduler, [timer], and blocking
249 /// pool, necessary for running asynchronous tasks.
250 ///
251 /// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
252 /// most users will use the `#[tokio::main]` annotation on their entry point instead.
253 ///
254 /// See [module level][mod] documentation for more details.
255 ///
256 /// # Shutdown
257 ///
258 /// Shutting down the runtime is done by dropping the value. The current thread
259 /// will block until the shut down operation has completed.
260 ///
261 /// * Drain any scheduled work queues.
262 /// * Drop any futures that have not yet completed.
263 /// * Drop the reactor.
264 ///
265 /// Once the reactor has dropped, any outstanding I/O resources bound to
266 /// that reactor will no longer function. Calling any method on them will
267 /// result in an error.
268 ///
269 /// [timer]: crate::time
270 /// [mod]: index.html
271 /// [`new`]: method@Self::new
272 /// [`Builder`]: struct@Builder
273 /// [`tokio::run`]: fn@run
274 #[derive(Debug)]
275 pub struct Runtime {
276     /// Task executor
277     kind: Kind,
278 
279     /// Handle to runtime, also contains driver handles
280     handle: Handle,
281 
282     /// Blocking pool handle, used to signal shutdown
283     blocking_pool: BlockingPool,
284 }
285 
286 /// The runtime executor is either a thread-pool or a current-thread executor.
287 #[derive(Debug)]
288 enum Kind {
289     /// Not able to execute concurrent tasks. This variant is mostly used to get
290     /// access to the driver handles.
291     Shell(Shell),
292 
293     /// Execute all tasks on the current-thread.
294     #[cfg(feature = "rt-core")]
295     Basic(BasicScheduler<time::Driver>),
296 
297     /// Execute tasks across multiple threads.
298     #[cfg(feature = "rt-threaded")]
299     ThreadPool(ThreadPool),
300 }
301 
302 /// After thread starts / before thread stops
303 type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
304 
305 impl Runtime {
306     /// Create a new runtime instance with default configuration values.
307     ///
308     /// This results in a scheduler, I/O driver, and time driver being
309     /// initialized. The type of scheduler used depends on what feature flags
310     /// are enabled: if the `rt-threaded` feature is enabled, the [threaded
311     /// scheduler] is used, while if only the `rt-core` feature is enabled, the
312     /// [basic scheduler] is used instead.
313     ///
314     /// If the threaded scheduler is selected, it will not spawn
315     /// any worker threads until it needs to, i.e. tasks are scheduled to run.
316     ///
317     /// Most applications will not need to call this function directly. Instead,
318     /// they will use the  [`#[tokio::main]` attribute][main]. When more complex
319     /// configuration is necessary, the [runtime builder] may be used.
320     ///
321     /// See [module level][mod] documentation for more details.
322     ///
323     /// # Examples
324     ///
325     /// Creating a new `Runtime` with default configuration values.
326     ///
327     /// ```
328     /// use tokio::runtime::Runtime;
329     ///
330     /// let rt = Runtime::new()
331     ///     .unwrap();
332     ///
333     /// // Use the runtime...
334     /// ```
335     ///
336     /// [mod]: index.html
337     /// [main]: ../attr.main.html
338     /// [threaded scheduler]: index.html#threaded-scheduler
339     /// [basic scheduler]: index.html#basic-scheduler
340     /// [runtime builder]: crate::runtime::Builder
new() -> io::Result<Runtime>341     pub fn new() -> io::Result<Runtime> {
342         #[cfg(feature = "rt-threaded")]
343         let ret = Builder::new().threaded_scheduler().enable_all().build();
344 
345         #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))]
346         let ret = Builder::new().basic_scheduler().enable_all().build();
347 
348         #[cfg(not(feature = "rt-core"))]
349         let ret = Builder::new().enable_all().build();
350 
351         ret
352     }
353 
354     /// Spawn a future onto the Tokio runtime.
355     ///
356     /// This spawns the given future onto the runtime's executor, usually a
357     /// thread pool. The thread pool is then responsible for polling the future
358     /// until it completes.
359     ///
360     /// See [module level][mod] documentation for more details.
361     ///
362     /// [mod]: index.html
363     ///
364     /// # Examples
365     ///
366     /// ```
367     /// use tokio::runtime::Runtime;
368     ///
369     /// # fn dox() {
370     /// // Create the runtime
371     /// let rt = Runtime::new().unwrap();
372     ///
373     /// // Spawn a future onto the runtime
374     /// rt.spawn(async {
375     ///     println!("now running on a worker thread");
376     /// });
377     /// # }
378     /// ```
379     ///
380     /// # Panics
381     ///
382     /// This function will not panic unless task execution is disabled on the
383     /// executor. This can only happen if the runtime was built using
384     /// [`Builder`] without picking either [`basic_scheduler`] or
385     /// [`threaded_scheduler`].
386     ///
387     /// [`Builder`]: struct@Builder
388     /// [`threaded_scheduler`]: fn@Builder::threaded_scheduler
389     /// [`basic_scheduler`]: fn@Builder::basic_scheduler
390     #[cfg(feature = "rt-core")]
spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,391     pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
392     where
393         F: Future + Send + 'static,
394         F::Output: Send + 'static,
395     {
396         match &self.kind {
397             Kind::Shell(_) => panic!("task execution disabled"),
398             #[cfg(feature = "rt-threaded")]
399             Kind::ThreadPool(exec) => exec.spawn(future),
400             Kind::Basic(exec) => exec.spawn(future),
401         }
402     }
403 
404     /// Run a future to completion on the Tokio runtime. This is the runtime's
405     /// entry point.
406     ///
407     /// This runs the given future on the runtime, blocking until it is
408     /// complete, and yielding its resolved result. Any tasks or timers which
409     /// the future spawns internally will be executed on the runtime.
410     ///
411     /// `&mut` is required as calling `block_on` **may** result in advancing the
412     /// state of the runtime. The details depend on how the runtime is
413     /// configured. [`runtime::Handle::block_on`][handle] provides a version
414     /// that takes `&self`.
415     ///
416     /// This method may not be called from an asynchronous context.
417     ///
418     /// # Panics
419     ///
420     /// This function panics if the provided future panics, or if called within an
421     /// asynchronous execution context.
422     ///
423     /// # Examples
424     ///
425     /// ```no_run
426     /// use tokio::runtime::Runtime;
427     ///
428     /// // Create the runtime
429     /// let mut rt = Runtime::new().unwrap();
430     ///
431     /// // Execute the future, blocking the current thread until completion
432     /// rt.block_on(async {
433     ///     println!("hello");
434     /// });
435     /// ```
436     ///
437     /// [handle]: fn@Handle::block_on
block_on<F: Future>(&mut self, future: F) -> F::Output438     pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
439         let kind = &mut self.kind;
440 
441         self.handle.enter(|| match kind {
442             Kind::Shell(exec) => exec.block_on(future),
443             #[cfg(feature = "rt-core")]
444             Kind::Basic(exec) => exec.block_on(future),
445             #[cfg(feature = "rt-threaded")]
446             Kind::ThreadPool(exec) => exec.block_on(future),
447         })
448     }
449 
450     /// Enter the runtime context. This allows you to construct types that must
451     /// have an executor available on creation such as [`Delay`] or [`TcpStream`].
452     /// It will also allow you to call methods such as [`tokio::spawn`].
453     ///
454     /// This function is also available as [`Handle::enter`].
455     ///
456     /// [`Delay`]: struct@crate::time::Delay
457     /// [`TcpStream`]: struct@crate::net::TcpStream
458     /// [`Handle::enter`]: fn@crate::runtime::Handle::enter
459     /// [`tokio::spawn`]: fn@crate::spawn
460     ///
461     /// # Example
462     ///
463     /// ```
464     /// use tokio::runtime::Runtime;
465     ///
466     /// fn function_that_spawns(msg: String) {
467     ///     // Had we not used `rt.enter` below, this would panic.
468     ///     tokio::spawn(async move {
469     ///         println!("{}", msg);
470     ///     });
471     /// }
472     ///
473     /// fn main() {
474     ///     let rt = Runtime::new().unwrap();
475     ///
476     ///     let s = "Hello World!".to_string();
477     ///
478     ///     // By entering the context, we tie `tokio::spawn` to this executor.
479     ///     rt.enter(|| function_that_spawns(s));
480     /// }
481     /// ```
enter<F, R>(&self, f: F) -> R where F: FnOnce() -> R,482     pub fn enter<F, R>(&self, f: F) -> R
483     where
484         F: FnOnce() -> R,
485     {
486         self.handle.enter(f)
487     }
488 
489     /// Return a handle to the runtime's spawner.
490     ///
491     /// The returned handle can be used to spawn tasks that run on this runtime, and can
492     /// be cloned to allow moving the `Handle` to other threads.
493     ///
494     /// # Examples
495     ///
496     /// ```
497     /// use tokio::runtime::Runtime;
498     ///
499     /// let rt = Runtime::new()
500     ///     .unwrap();
501     ///
502     /// let handle = rt.handle();
503     ///
504     /// handle.spawn(async { println!("hello"); });
505     /// ```
handle(&self) -> &Handle506     pub fn handle(&self) -> &Handle {
507         &self.handle
508     }
509 
510     /// Shutdown the runtime, waiting for at most `duration` for all spawned
511     /// task to shutdown.
512     ///
513     /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to
514     /// shutdown in a timely fashion. However, dropping a `Runtime` will wait
515     /// indefinitely for all tasks to terminate, and there are cases where a long
516     /// blocking task has been spawned, which can block dropping `Runtime`.
517     ///
518     /// In this case, calling `shutdown_timeout` with an explicit wait timeout
519     /// can work. The `shutdown_timeout` will signal all tasks to shutdown and
520     /// will wait for at most `duration` for all spawned tasks to terminate. If
521     /// `timeout` elapses before all tasks are dropped, the function returns and
522     /// outstanding tasks are potentially leaked.
523     ///
524     /// # Examples
525     ///
526     /// ```
527     /// use tokio::runtime::Runtime;
528     /// use tokio::task;
529     ///
530     /// use std::thread;
531     /// use std::time::Duration;
532     ///
533     /// fn main() {
534     ///    let mut runtime = Runtime::new().unwrap();
535     ///
536     ///    runtime.block_on(async move {
537     ///        task::spawn_blocking(move || {
538     ///            thread::sleep(Duration::from_secs(10_000));
539     ///        });
540     ///    });
541     ///
542     ///    runtime.shutdown_timeout(Duration::from_millis(100));
543     /// }
544     /// ```
shutdown_timeout(self, duration: Duration)545     pub fn shutdown_timeout(self, duration: Duration) {
546         let Runtime {
547             mut blocking_pool, ..
548         } = self;
549         blocking_pool.shutdown(Some(duration));
550     }
551 
552     /// Shutdown the runtime, without waiting for any spawned tasks to shutdown.
553     ///
554     /// This can be useful if you want to drop a runtime from within another runtime.
555     /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
556     /// to complete, which would normally not be permitted within an asynchronous context.
557     /// By calling `shutdown_background()`, you can drop the runtime from such a context.
558     ///
559     /// Note however, that because we do not wait for any blocking tasks to complete, this
560     /// may result in a resource leak (in that any blocking tasks are still running until they
561     /// return.
562     ///
563     /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`.
564     ///
565     /// ```
566     /// use tokio::runtime::Runtime;
567     ///
568     /// fn main() {
569     ///    let mut runtime = Runtime::new().unwrap();
570     ///
571     ///    runtime.block_on(async move {
572     ///        let inner_runtime = Runtime::new().unwrap();
573     ///        // ...
574     ///        inner_runtime.shutdown_background();
575     ///    });
576     /// }
577     /// ```
shutdown_background(self)578     pub fn shutdown_background(self) {
579         self.shutdown_timeout(Duration::from_nanos(0))
580     }
581 }
582