1 //! Tokio-based single-threaded async runtime for the Actix ecosystem.
2 //!
//! In most parts of the Actix ecosystem, it has been chosen to use !Send futures. For this
4 //! reason, a single-threaded runtime is appropriate since it is guaranteed that futures will not
5 //! be moved between threads. This can result in small performance improvements over cases where
6 //! atomics would otherwise be needed.
7 //!
8 //! To achieve similar performance to multi-threaded, work-stealing runtimes, applications
9 //! using `actix-rt` will create multiple, mostly disconnected, single-threaded runtimes.
10 //! This approach has good performance characteristics for workloads where the majority of tasks
11 //! have similar runtime expense.
12 //!
13 //! The disadvantage is that idle threads will not steal work from very busy, stuck or otherwise
14 //! backlogged threads. Tasks that are disproportionately expensive should be offloaded to the
15 //! blocking task thread-pool using [`task::spawn_blocking`].
16 //!
17 //! # Examples
18 //! ```
19 //! use std::sync::mpsc;
20 //! use actix_rt::{Arbiter, System};
21 //!
22 //! let _ = System::new();
23 //!
24 //! let (tx, rx) = mpsc::channel::<u32>();
25 //!
26 //! let arbiter = Arbiter::new();
27 //! arbiter.spawn_fn(move || tx.send(42).unwrap());
28 //!
29 //! let num = rx.recv().unwrap();
30 //! assert_eq!(num, 42);
31 //!
32 //! arbiter.stop();
33 //! arbiter.join().unwrap();
34 //! ```
35 
36 #![deny(rust_2018_idioms, nonstandard_style)]
37 #![allow(clippy::type_complexity)]
38 #![warn(missing_docs)]
39 #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
40 #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
41 
42 use std::future::Future;
43 
44 use tokio::task::JoinHandle;
45 
46 // Cannot define a main macro when compiled into test harness.
47 // Workaround for https://github.com/rust-lang/rust/issues/62127.
48 #[cfg(all(feature = "macros", not(test)))]
49 pub use actix_macros::{main, test};
50 
51 mod arbiter;
52 mod runtime;
53 mod system;
54 
55 pub use self::arbiter::{Arbiter, ArbiterHandle};
56 pub use self::runtime::Runtime;
57 pub use self::system::{System, SystemRunner};
58 
59 pub use tokio::pin;
60 
61 pub mod signal {
62     //! Asynchronous signal handling (Tokio re-exports).
63 
64     #[cfg(unix)]
65     pub mod unix {
66         //! Unix specific signals (Tokio re-exports).
67         pub use tokio::signal::unix::*;
68     }
69     pub use tokio::signal::ctrl_c;
70 }
71 
72 pub mod net {
73     //! TCP/UDP/Unix bindings (mostly Tokio re-exports).
74 
75     use std::{
76         future::Future,
77         io,
78         task::{Context, Poll},
79     };
80 
81     pub use tokio::io::Ready;
82     use tokio::io::{AsyncRead, AsyncWrite, Interest};
83     pub use tokio::net::UdpSocket;
84     pub use tokio::net::{TcpListener, TcpSocket, TcpStream};
85 
86     #[cfg(unix)]
87     pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
88 
89     /// Extension trait over async read+write types that can also signal readiness.
90     #[doc(hidden)]
91     pub trait ActixStream: AsyncRead + AsyncWrite + Unpin {
92         /// Poll stream and check read readiness of Self.
93         ///
94         /// See [tokio::net::TcpStream::poll_read_ready] for detail on intended use.
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>95         fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>;
96 
97         /// Poll stream and check write readiness of Self.
98         ///
99         /// See [tokio::net::TcpStream::poll_write_ready] for detail on intended use.
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>100         fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>;
101     }
102 
103     impl ActixStream for TcpStream {
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>104         fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
105             let ready = self.ready(Interest::READABLE);
106             tokio::pin!(ready);
107             ready.poll(cx)
108         }
109 
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>110         fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
111             let ready = self.ready(Interest::WRITABLE);
112             tokio::pin!(ready);
113             ready.poll(cx)
114         }
115     }
116 
117     #[cfg(unix)]
118     impl ActixStream for UnixStream {
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>119         fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
120             let ready = self.ready(Interest::READABLE);
121             tokio::pin!(ready);
122             ready.poll(cx)
123         }
124 
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>125         fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
126             let ready = self.ready(Interest::WRITABLE);
127             tokio::pin!(ready);
128             ready.poll(cx)
129         }
130     }
131 
132     impl<Io: ActixStream + ?Sized> ActixStream for Box<Io> {
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>133         fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
134             (**self).poll_read_ready(cx)
135         }
136 
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>137         fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
138             (**self).poll_write_ready(cx)
139         }
140     }
141 }
142 
143 pub mod time {
144     //! Utilities for tracking time (Tokio re-exports).
145 
146     pub use tokio::time::Instant;
147     pub use tokio::time::{interval, interval_at, Interval};
148     pub use tokio::time::{sleep, sleep_until, Sleep};
149     pub use tokio::time::{timeout, Timeout};
150 }
151 
152 pub mod task {
153     //! Task management (Tokio re-exports).
154 
155     pub use tokio::task::{spawn_blocking, yield_now, JoinError, JoinHandle};
156 }
157 
158 /// Spawns a future on the current thread.
159 ///
160 /// # Panics
161 /// Panics if Actix system is not running.
162 #[inline]
spawn<Fut>(f: Fut) -> JoinHandle<()> where Fut: Future<Output = ()> + 'static,163 pub fn spawn<Fut>(f: Fut) -> JoinHandle<()>
164 where
165     Fut: Future<Output = ()> + 'static,
166 {
167     tokio::task::spawn_local(f)
168 }
169