1 //! Tokio-based single-threaded async runtime for the Actix ecosystem.
2 //!
3 //! In most parts of the Actix ecosystem, it has been chosen to use `!Send` futures. For this
4 //! reason, a single-threaded runtime is appropriate since it is guaranteed that futures will not
5 //! be moved between threads. This can result in small performance improvements over cases where
6 //! atomics would otherwise be needed.
7 //!
8 //! To achieve similar performance to multi-threaded, work-stealing runtimes, applications
9 //! using `actix-rt` will create multiple, mostly disconnected, single-threaded runtimes.
10 //! This approach has good performance characteristics for workloads where the majority of tasks
11 //! have similar runtime expense.
12 //!
13 //! The disadvantage is that idle threads will not steal work from very busy, stuck or otherwise
14 //! backlogged threads. Tasks that are disproportionately expensive should be offloaded to the
15 //! blocking task thread-pool using [`task::spawn_blocking`].
16 //!
17 //! # Examples
18 //! ```
19 //! use std::sync::mpsc;
20 //! use actix_rt::{Arbiter, System};
21 //!
22 //! let _ = System::new();
23 //!
24 //! let (tx, rx) = mpsc::channel::<u32>();
25 //!
26 //! let arbiter = Arbiter::new();
27 //! arbiter.spawn_fn(move || tx.send(42).unwrap());
28 //!
29 //! let num = rx.recv().unwrap();
30 //! assert_eq!(num, 42);
31 //!
32 //! arbiter.stop();
33 //! arbiter.join().unwrap();
34 //! ```
35
36 #![deny(rust_2018_idioms, nonstandard_style)]
37 #![allow(clippy::type_complexity)]
38 #![warn(missing_docs)]
39 #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
40 #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
41
42 use std::future::Future;
43
44 use tokio::task::JoinHandle;
45
46 // Cannot define a main macro when compiled into test harness.
47 // Workaround for https://github.com/rust-lang/rust/issues/62127.
48 #[cfg(all(feature = "macros", not(test)))]
49 pub use actix_macros::{main, test};
50
51 mod arbiter;
52 mod runtime;
53 mod system;
54
55 pub use self::arbiter::{Arbiter, ArbiterHandle};
56 pub use self::runtime::Runtime;
57 pub use self::system::{System, SystemRunner};
58
59 pub use tokio::pin;
60
61 pub mod signal {
62 //! Asynchronous signal handling (Tokio re-exports).
63
64 #[cfg(unix)]
65 pub mod unix {
66 //! Unix specific signals (Tokio re-exports).
67 pub use tokio::signal::unix::*;
68 }
69 pub use tokio::signal::ctrl_c;
70 }
71
72 pub mod net {
73 //! TCP/UDP/Unix bindings (mostly Tokio re-exports).
74
75 use std::{
76 future::Future,
77 io,
78 task::{Context, Poll},
79 };
80
81 pub use tokio::io::Ready;
82 use tokio::io::{AsyncRead, AsyncWrite, Interest};
83 pub use tokio::net::UdpSocket;
84 pub use tokio::net::{TcpListener, TcpSocket, TcpStream};
85
86 #[cfg(unix)]
87 pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
88
89 /// Extension trait over async read+write types that can also signal readiness.
90 #[doc(hidden)]
91 pub trait ActixStream: AsyncRead + AsyncWrite + Unpin {
92 /// Poll stream and check read readiness of Self.
93 ///
94 /// See [tokio::net::TcpStream::poll_read_ready] for detail on intended use.
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>95 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>;
96
97 /// Poll stream and check write readiness of Self.
98 ///
99 /// See [tokio::net::TcpStream::poll_write_ready] for detail on intended use.
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>100 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>;
101 }
102
103 impl ActixStream for TcpStream {
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>104 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
105 let ready = self.ready(Interest::READABLE);
106 tokio::pin!(ready);
107 ready.poll(cx)
108 }
109
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>110 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
111 let ready = self.ready(Interest::WRITABLE);
112 tokio::pin!(ready);
113 ready.poll(cx)
114 }
115 }
116
117 #[cfg(unix)]
118 impl ActixStream for UnixStream {
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>119 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
120 let ready = self.ready(Interest::READABLE);
121 tokio::pin!(ready);
122 ready.poll(cx)
123 }
124
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>125 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
126 let ready = self.ready(Interest::WRITABLE);
127 tokio::pin!(ready);
128 ready.poll(cx)
129 }
130 }
131
132 impl<Io: ActixStream + ?Sized> ActixStream for Box<Io> {
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>133 fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
134 (**self).poll_read_ready(cx)
135 }
136
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>137 fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
138 (**self).poll_write_ready(cx)
139 }
140 }
141 }
142
143 pub mod time {
144 //! Utilities for tracking time (Tokio re-exports).
145
146 pub use tokio::time::Instant;
147 pub use tokio::time::{interval, interval_at, Interval};
148 pub use tokio::time::{sleep, sleep_until, Sleep};
149 pub use tokio::time::{timeout, Timeout};
150 }
151
152 pub mod task {
153 //! Task management (Tokio re-exports).
154
155 pub use tokio::task::{spawn_blocking, yield_now, JoinError, JoinHandle};
156 }
157
158 /// Spawns a future on the current thread.
159 ///
160 /// # Panics
161 /// Panics if Actix system is not running.
162 #[inline]
spawn<Fut>(f: Fut) -> JoinHandle<()> where Fut: Future<Output = ()> + 'static,163 pub fn spawn<Fut>(f: Fut) -> JoinHandle<()>
164 where
165 Fut: Future<Output = ()> + 'static,
166 {
167 tokio::task::spawn_local(f)
168 }
169