1 #![cfg_attr(not(feature = "rt"), allow(dead_code))]
2 
3 mod interest;
4 #[allow(unreachable_pub)]
5 pub use interest::Interest;
6 
7 mod ready;
8 #[allow(unreachable_pub)]
9 pub use ready::Ready;
10 
11 mod registration;
12 pub(crate) use registration::Registration;
13 
14 mod scheduled_io;
15 use scheduled_io::ScheduledIo;
16 
17 use crate::park::{Park, Unpark};
18 use crate::util::slab::{self, Slab};
19 use crate::{loom::sync::Mutex, util::bit};
20 
21 use std::fmt;
22 use std::io;
23 use std::sync::{Arc, Weak};
24 use std::time::Duration;
25 
26 /// I/O driver, backed by Mio.
27 pub(crate) struct Driver {
28     /// Tracks the number of times `turn` is called. It is safe for this to wrap
29     /// as it is mostly used to determine when to call `compact()`.
30     tick: u8,
31 
32     /// Reuse the `mio::Events` value across calls to poll.
33     events: Option<mio::Events>,
34 
35     /// Primary slab handle containing the state for each resource registered
36     /// with this driver. During Drop this is moved into the Inner structure, so
37     /// this is an Option to allow it to be vacated (until Drop this is always
38     /// Some).
39     resources: Option<Slab<ScheduledIo>>,
40 
41     /// The system event queue.
42     poll: mio::Poll,
43 
44     /// State shared between the reactor and the handles.
45     inner: Arc<Inner>,
46 }
47 
48 /// A reference to an I/O driver.
49 #[derive(Clone)]
50 pub(crate) struct Handle {
51     inner: Weak<Inner>,
52 }
53 
54 #[derive(Debug)]
55 pub(crate) struct ReadyEvent {
56     tick: u8,
57     pub(crate) ready: Ready,
58 }
59 
60 pub(super) struct Inner {
61     /// Primary slab handle containing the state for each resource registered
62     /// with this driver.
63     ///
64     /// The ownership of this slab is moved into this structure during
65     /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles
66     /// without risking new ones being registered in the meantime.
67     resources: Mutex<Option<Slab<ScheduledIo>>>,
68 
69     /// Registers I/O resources.
70     registry: mio::Registry,
71 
72     /// Allocates `ScheduledIo` handles when creating new resources.
73     pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
74 
75     /// Used to wake up the reactor from a call to `turn`.
76     waker: mio::Waker,
77 }
78 
79 #[derive(Debug, Eq, PartialEq, Clone, Copy)]
80 enum Direction {
81     Read,
82     Write,
83 }
84 
85 enum Tick {
86     Set(u8),
87     Clear(u8),
88 }
89 
90 // TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
91 // token.
92 const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
93 
94 const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
95 
96 // Packs the generation value in the `readiness` field.
97 //
98 // The generation prevents a race condition where a slab slot is reused for a
99 // new socket while the I/O driver is about to apply a readiness event. The
100 // generation value is checked when setting new readiness. If the generation do
101 // not match, then the readiness event is discarded.
102 const GENERATION: bit::Pack = ADDRESS.then(7);
103 
_assert_kinds()104 fn _assert_kinds() {
105     fn _assert<T: Send + Sync>() {}
106 
107     _assert::<Handle>();
108 }
109 
110 // ===== impl Driver =====
111 
112 impl Driver {
113     /// Creates a new event loop, returning any error that happened during the
114     /// creation.
new() -> io::Result<Driver>115     pub(crate) fn new() -> io::Result<Driver> {
116         let poll = mio::Poll::new()?;
117         let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
118         let registry = poll.registry().try_clone()?;
119 
120         let slab = Slab::new();
121         let allocator = slab.allocator();
122 
123         Ok(Driver {
124             tick: 0,
125             events: Some(mio::Events::with_capacity(1024)),
126             poll,
127             resources: Some(slab),
128             inner: Arc::new(Inner {
129                 resources: Mutex::new(None),
130                 registry,
131                 io_dispatch: allocator,
132                 waker,
133             }),
134         })
135     }
136 
137     /// Returns a handle to this event loop which can be sent across threads
138     /// and can be used as a proxy to the event loop itself.
139     ///
140     /// Handles are cloneable and clones always refer to the same event loop.
141     /// This handle is typically passed into functions that create I/O objects
142     /// to bind them to this event loop.
handle(&self) -> Handle143     pub(crate) fn handle(&self) -> Handle {
144         Handle {
145             inner: Arc::downgrade(&self.inner),
146         }
147     }
148 
turn(&mut self, max_wait: Option<Duration>) -> io::Result<()>149     fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
150         // How often to call `compact()` on the resource slab
151         const COMPACT_INTERVAL: u8 = 255;
152 
153         self.tick = self.tick.wrapping_add(1);
154 
155         if self.tick == COMPACT_INTERVAL {
156             self.resources.as_mut().unwrap().compact()
157         }
158 
159         let mut events = self.events.take().expect("i/o driver event store missing");
160 
161         // Block waiting for an event to happen, peeling out how many events
162         // happened.
163         match self.poll.poll(&mut events, max_wait) {
164             Ok(_) => {}
165             Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
166             Err(e) => return Err(e),
167         }
168 
169         // Process all the events that came in, dispatching appropriately
170         for event in events.iter() {
171             let token = event.token();
172 
173             if token != TOKEN_WAKEUP {
174                 self.dispatch(token, Ready::from_mio(event));
175             }
176         }
177 
178         self.events = Some(events);
179 
180         Ok(())
181     }
182 
dispatch(&mut self, token: mio::Token, ready: Ready)183     fn dispatch(&mut self, token: mio::Token, ready: Ready) {
184         let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
185 
186         let resources = self.resources.as_mut().unwrap();
187 
188         let io = match resources.get(addr) {
189             Some(io) => io,
190             None => return,
191         };
192 
193         let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready);
194 
195         if res.is_err() {
196             // token no longer valid!
197             return;
198         }
199 
200         io.wake(ready);
201     }
202 }
203 
204 impl Drop for Driver {
drop(&mut self)205     fn drop(&mut self) {
206         (*self.inner.resources.lock()) = self.resources.take();
207     }
208 }
209 
210 impl Drop for Inner {
drop(&mut self)211     fn drop(&mut self) {
212         let resources = self.resources.lock().take();
213 
214         if let Some(mut slab) = resources {
215             slab.for_each(|io| {
216                 // If a task is waiting on the I/O resource, notify it. The task
217                 // will then attempt to use the I/O resource and fail due to the
218                 // driver being shutdown.
219                 io.shutdown();
220             });
221         }
222     }
223 }
224 
225 impl Park for Driver {
226     type Unpark = Handle;
227     type Error = io::Error;
228 
unpark(&self) -> Self::Unpark229     fn unpark(&self) -> Self::Unpark {
230         self.handle()
231     }
232 
park(&mut self) -> io::Result<()>233     fn park(&mut self) -> io::Result<()> {
234         self.turn(None)?;
235         Ok(())
236     }
237 
park_timeout(&mut self, duration: Duration) -> io::Result<()>238     fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
239         self.turn(Some(duration))?;
240         Ok(())
241     }
242 
shutdown(&mut self)243     fn shutdown(&mut self) {}
244 }
245 
246 impl fmt::Debug for Driver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result247     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248         write!(f, "Driver")
249     }
250 }
251 
252 // ===== impl Handle =====
253 
254 cfg_rt! {
255     impl Handle {
256         /// Returns a handle to the current reactor.
257         ///
258         /// # Panics
259         ///
260         /// This function panics if there is no current reactor set and `rt` feature
261         /// flag is not enabled.
262         pub(super) fn current() -> Self {
263             crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
264         }
265     }
266 }
267 
268 cfg_not_rt! {
269     impl Handle {
270         /// Returns a handle to the current reactor.
271         ///
272         /// # Panics
273         ///
274         /// This function panics if there is no current reactor set, or if the `rt`
275         /// feature flag is not enabled.
276         pub(super) fn current() -> Self {
277             panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
278         }
279     }
280 }
281 
282 impl Handle {
283     /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
284     /// makes the next call to `turn` return immediately.
285     ///
286     /// This method is intended to be used in situations where a notification
287     /// needs to otherwise be sent to the main reactor. If the reactor is
288     /// currently blocked inside of `turn` then it will wake up and soon return
289     /// after this method has been called. If the reactor is not currently
290     /// blocked in `turn`, then the next call to `turn` will not block and
291     /// return immediately.
wakeup(&self)292     fn wakeup(&self) {
293         if let Some(inner) = self.inner() {
294             inner.waker.wake().expect("failed to wake I/O driver");
295         }
296     }
297 
inner(&self) -> Option<Arc<Inner>>298     pub(super) fn inner(&self) -> Option<Arc<Inner>> {
299         self.inner.upgrade()
300     }
301 }
302 
303 impl Unpark for Handle {
unpark(&self)304     fn unpark(&self) {
305         self.wakeup();
306     }
307 }
308 
309 impl fmt::Debug for Handle {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result310     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311         write!(f, "Handle")
312     }
313 }
314 
315 // ===== impl Inner =====
316 
317 impl Inner {
318     /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
319     ///
320     /// The registration token is returned.
add_source( &self, source: &mut impl mio::event::Source, interest: Interest, ) -> io::Result<slab::Ref<ScheduledIo>>321     pub(super) fn add_source(
322         &self,
323         source: &mut impl mio::event::Source,
324         interest: Interest,
325     ) -> io::Result<slab::Ref<ScheduledIo>> {
326         let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
327             io::Error::new(
328                 io::ErrorKind::Other,
329                 "reactor at max registered I/O resources",
330             )
331         })?;
332 
333         let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
334 
335         self.registry
336             .register(source, mio::Token(token), interest.to_mio())?;
337 
338         Ok(shared)
339     }
340 
341     /// Deregisters an I/O resource from the reactor.
deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()>342     pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
343         self.registry.deregister(source)
344     }
345 }
346 
347 impl Direction {
mask(self) -> Ready348     pub(super) fn mask(self) -> Ready {
349         match self {
350             Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
351             Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
352         }
353     }
354 }
355