1 #![cfg_attr(not(feature = "rt"), allow(dead_code))]
2
3 mod interest;
4 #[allow(unreachable_pub)]
5 pub use interest::Interest;
6
7 mod ready;
8 #[allow(unreachable_pub)]
9 pub use ready::Ready;
10
11 mod registration;
12 pub(crate) use registration::Registration;
13
14 mod scheduled_io;
15 use scheduled_io::ScheduledIo;
16
17 use crate::park::{Park, Unpark};
18 use crate::util::slab::{self, Slab};
19 use crate::{loom::sync::Mutex, util::bit};
20
21 use std::fmt;
22 use std::io;
23 use std::sync::{Arc, Weak};
24 use std::time::Duration;
25
26 /// I/O driver, backed by Mio.
27 pub(crate) struct Driver {
28 /// Tracks the number of times `turn` is called. It is safe for this to wrap
29 /// as it is mostly used to determine when to call `compact()`.
30 tick: u8,
31
32 /// Reuse the `mio::Events` value across calls to poll.
33 events: Option<mio::Events>,
34
35 /// Primary slab handle containing the state for each resource registered
36 /// with this driver. During Drop this is moved into the Inner structure, so
37 /// this is an Option to allow it to be vacated (until Drop this is always
38 /// Some).
39 resources: Option<Slab<ScheduledIo>>,
40
41 /// The system event queue.
42 poll: mio::Poll,
43
44 /// State shared between the reactor and the handles.
45 inner: Arc<Inner>,
46 }
47
48 /// A reference to an I/O driver.
49 #[derive(Clone)]
50 pub(crate) struct Handle {
51 inner: Weak<Inner>,
52 }
53
54 #[derive(Debug)]
55 pub(crate) struct ReadyEvent {
56 tick: u8,
57 pub(crate) ready: Ready,
58 }
59
60 pub(super) struct Inner {
61 /// Primary slab handle containing the state for each resource registered
62 /// with this driver.
63 ///
64 /// The ownership of this slab is moved into this structure during
65 /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles
66 /// without risking new ones being registered in the meantime.
67 resources: Mutex<Option<Slab<ScheduledIo>>>,
68
69 /// Registers I/O resources.
70 registry: mio::Registry,
71
72 /// Allocates `ScheduledIo` handles when creating new resources.
73 pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
74
75 /// Used to wake up the reactor from a call to `turn`.
76 waker: mio::Waker,
77 }
78
/// The direction of an I/O operation: reading or writing.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Direction {
    /// The read half.
    Read,
    /// The write half.
    Write,
}
84
/// Tick operation handed to `set_readiness` together with a readiness update.
// NOTE(review): the exact meaning of each variant is defined in
// `scheduled_io`, not here; the descriptions below follow the names only.
enum Tick {
    /// Carries the tick value to set (the driver passes its current tick when
    /// dispatching an event).
    Set(u8),
    /// Carries the tick value to clear.
    Clear(u8),
}
89
90 // TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
91 // token.
92 const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
93
94 const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
95
96 // Packs the generation value in the `readiness` field.
97 //
98 // The generation prevents a race condition where a slab slot is reused for a
99 // new socket while the I/O driver is about to apply a readiness event. The
100 // generation value is checked when setting new readiness. If the generation do
101 // not match, then the readiness event is discarded.
102 const GENERATION: bit::Pack = ADDRESS.then(7);
103
_assert_kinds()104 fn _assert_kinds() {
105 fn _assert<T: Send + Sync>() {}
106
107 _assert::<Handle>();
108 }
109
110 // ===== impl Driver =====
111
112 impl Driver {
113 /// Creates a new event loop, returning any error that happened during the
114 /// creation.
new() -> io::Result<Driver>115 pub(crate) fn new() -> io::Result<Driver> {
116 let poll = mio::Poll::new()?;
117 let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
118 let registry = poll.registry().try_clone()?;
119
120 let slab = Slab::new();
121 let allocator = slab.allocator();
122
123 Ok(Driver {
124 tick: 0,
125 events: Some(mio::Events::with_capacity(1024)),
126 poll,
127 resources: Some(slab),
128 inner: Arc::new(Inner {
129 resources: Mutex::new(None),
130 registry,
131 io_dispatch: allocator,
132 waker,
133 }),
134 })
135 }
136
137 /// Returns a handle to this event loop which can be sent across threads
138 /// and can be used as a proxy to the event loop itself.
139 ///
140 /// Handles are cloneable and clones always refer to the same event loop.
141 /// This handle is typically passed into functions that create I/O objects
142 /// to bind them to this event loop.
handle(&self) -> Handle143 pub(crate) fn handle(&self) -> Handle {
144 Handle {
145 inner: Arc::downgrade(&self.inner),
146 }
147 }
148
turn(&mut self, max_wait: Option<Duration>) -> io::Result<()>149 fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
150 // How often to call `compact()` on the resource slab
151 const COMPACT_INTERVAL: u8 = 255;
152
153 self.tick = self.tick.wrapping_add(1);
154
155 if self.tick == COMPACT_INTERVAL {
156 self.resources.as_mut().unwrap().compact()
157 }
158
159 let mut events = self.events.take().expect("i/o driver event store missing");
160
161 // Block waiting for an event to happen, peeling out how many events
162 // happened.
163 match self.poll.poll(&mut events, max_wait) {
164 Ok(_) => {}
165 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
166 Err(e) => return Err(e),
167 }
168
169 // Process all the events that came in, dispatching appropriately
170 for event in events.iter() {
171 let token = event.token();
172
173 if token != TOKEN_WAKEUP {
174 self.dispatch(token, Ready::from_mio(event));
175 }
176 }
177
178 self.events = Some(events);
179
180 Ok(())
181 }
182
dispatch(&mut self, token: mio::Token, ready: Ready)183 fn dispatch(&mut self, token: mio::Token, ready: Ready) {
184 let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
185
186 let resources = self.resources.as_mut().unwrap();
187
188 let io = match resources.get(addr) {
189 Some(io) => io,
190 None => return,
191 };
192
193 let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready);
194
195 if res.is_err() {
196 // token no longer valid!
197 return;
198 }
199
200 io.wake(ready);
201 }
202 }
203
204 impl Drop for Driver {
drop(&mut self)205 fn drop(&mut self) {
206 (*self.inner.resources.lock()) = self.resources.take();
207 }
208 }
209
210 impl Drop for Inner {
drop(&mut self)211 fn drop(&mut self) {
212 let resources = self.resources.lock().take();
213
214 if let Some(mut slab) = resources {
215 slab.for_each(|io| {
216 // If a task is waiting on the I/O resource, notify it. The task
217 // will then attempt to use the I/O resource and fail due to the
218 // driver being shutdown.
219 io.shutdown();
220 });
221 }
222 }
223 }
224
225 impl Park for Driver {
226 type Unpark = Handle;
227 type Error = io::Error;
228
unpark(&self) -> Self::Unpark229 fn unpark(&self) -> Self::Unpark {
230 self.handle()
231 }
232
park(&mut self) -> io::Result<()>233 fn park(&mut self) -> io::Result<()> {
234 self.turn(None)?;
235 Ok(())
236 }
237
park_timeout(&mut self, duration: Duration) -> io::Result<()>238 fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
239 self.turn(Some(duration))?;
240 Ok(())
241 }
242
shutdown(&mut self)243 fn shutdown(&mut self) {}
244 }
245
246 impl fmt::Debug for Driver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 write!(f, "Driver")
249 }
250 }
251
252 // ===== impl Handle =====
253
254 cfg_rt! {
255 impl Handle {
256 /// Returns a handle to the current reactor.
257 ///
258 /// # Panics
259 ///
260 /// This function panics if there is no current reactor set and `rt` feature
261 /// flag is not enabled.
262 pub(super) fn current() -> Self {
263 crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
264 }
265 }
266 }
267
268 cfg_not_rt! {
269 impl Handle {
270 /// Returns a handle to the current reactor.
271 ///
272 /// # Panics
273 ///
274 /// This function panics if there is no current reactor set, or if the `rt`
275 /// feature flag is not enabled.
276 pub(super) fn current() -> Self {
277 panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
278 }
279 }
280 }
281
282 impl Handle {
283 /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
284 /// makes the next call to `turn` return immediately.
285 ///
286 /// This method is intended to be used in situations where a notification
287 /// needs to otherwise be sent to the main reactor. If the reactor is
288 /// currently blocked inside of `turn` then it will wake up and soon return
289 /// after this method has been called. If the reactor is not currently
290 /// blocked in `turn`, then the next call to `turn` will not block and
291 /// return immediately.
wakeup(&self)292 fn wakeup(&self) {
293 if let Some(inner) = self.inner() {
294 inner.waker.wake().expect("failed to wake I/O driver");
295 }
296 }
297
inner(&self) -> Option<Arc<Inner>>298 pub(super) fn inner(&self) -> Option<Arc<Inner>> {
299 self.inner.upgrade()
300 }
301 }
302
303 impl Unpark for Handle {
unpark(&self)304 fn unpark(&self) {
305 self.wakeup();
306 }
307 }
308
309 impl fmt::Debug for Handle {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311 write!(f, "Handle")
312 }
313 }
314
315 // ===== impl Inner =====
316
317 impl Inner {
318 /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
319 ///
320 /// The registration token is returned.
add_source( &self, source: &mut impl mio::event::Source, interest: Interest, ) -> io::Result<slab::Ref<ScheduledIo>>321 pub(super) fn add_source(
322 &self,
323 source: &mut impl mio::event::Source,
324 interest: Interest,
325 ) -> io::Result<slab::Ref<ScheduledIo>> {
326 let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
327 io::Error::new(
328 io::ErrorKind::Other,
329 "reactor at max registered I/O resources",
330 )
331 })?;
332
333 let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
334
335 self.registry
336 .register(source, mio::Token(token), interest.to_mio())?;
337
338 Ok(shared)
339 }
340
341 /// Deregisters an I/O resource from the reactor.
deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()>342 pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
343 self.registry.deregister(source)
344 }
345 }
346
347 impl Direction {
mask(self) -> Ready348 pub(super) fn mask(self) -> Ready {
349 match self {
350 Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
351 Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
352 }
353 }
354 }
355