1 pub(crate) mod platform;
2 
3 mod scheduled_io;
4 pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
5 
6 use crate::loom::sync::atomic::AtomicUsize;
7 use crate::park::{Park, Unpark};
8 use crate::runtime::context;
9 use crate::util::slab::{Address, Slab};
10 
11 use mio::event::Evented;
12 use std::fmt;
13 use std::io;
14 use std::sync::atomic::Ordering::SeqCst;
15 use std::sync::{Arc, Weak};
16 use std::task::Waker;
17 use std::time::Duration;
18 
19 /// I/O driver, backed by Mio
20 pub(crate) struct Driver {
21     /// Reuse the `mio::Events` value across calls to poll.
22     events: mio::Events,
23 
24     /// State shared between the reactor and the handles.
25     inner: Arc<Inner>,
26 
27     _wakeup_registration: mio::Registration,
28 }
29 
30 /// A reference to an I/O driver
31 #[derive(Clone)]
32 pub(crate) struct Handle {
33     inner: Weak<Inner>,
34 }
35 
36 pub(super) struct Inner {
37     /// The underlying system event queue.
38     io: mio::Poll,
39 
40     /// Dispatch slabs for I/O and futures events
41     pub(super) io_dispatch: Slab<ScheduledIo>,
42 
43     /// The number of sources in `io_dispatch`.
44     n_sources: AtomicUsize,
45 
46     /// Used to wake up the reactor from a call to `turn`
47     wakeup: mio::SetReadiness,
48 }
49 
50 #[derive(Debug, Eq, PartialEq, Clone, Copy)]
51 pub(super) enum Direction {
52     Read,
53     Write,
54 }
55 
56 const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
57 
_assert_kinds()58 fn _assert_kinds() {
59     fn _assert<T: Send + Sync>() {}
60 
61     _assert::<Handle>();
62 }
63 
64 // ===== impl Driver =====
65 
66 impl Driver {
67     /// Creates a new event loop, returning any error that happened during the
68     /// creation.
new() -> io::Result<Driver>69     pub(crate) fn new() -> io::Result<Driver> {
70         let io = mio::Poll::new()?;
71         let wakeup_pair = mio::Registration::new2();
72 
73         io.register(
74             &wakeup_pair.0,
75             TOKEN_WAKEUP,
76             mio::Ready::readable(),
77             mio::PollOpt::level(),
78         )?;
79 
80         Ok(Driver {
81             events: mio::Events::with_capacity(1024),
82             _wakeup_registration: wakeup_pair.0,
83             inner: Arc::new(Inner {
84                 io,
85                 io_dispatch: Slab::new(),
86                 n_sources: AtomicUsize::new(0),
87                 wakeup: wakeup_pair.1,
88             }),
89         })
90     }
91 
92     /// Returns a handle to this event loop which can be sent across threads
93     /// and can be used as a proxy to the event loop itself.
94     ///
95     /// Handles are cloneable and clones always refer to the same event loop.
96     /// This handle is typically passed into functions that create I/O objects
97     /// to bind them to this event loop.
handle(&self) -> Handle98     pub(crate) fn handle(&self) -> Handle {
99         Handle {
100             inner: Arc::downgrade(&self.inner),
101         }
102     }
103 
turn(&mut self, max_wait: Option<Duration>) -> io::Result<()>104     fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
105         // Block waiting for an event to happen, peeling out how many events
106         // happened.
107         match self.inner.io.poll(&mut self.events, max_wait) {
108             Ok(_) => {}
109             Err(e) => return Err(e),
110         }
111 
112         // Process all the events that came in, dispatching appropriately
113 
114         for event in self.events.iter() {
115             let token = event.token();
116 
117             if token == TOKEN_WAKEUP {
118                 self.inner
119                     .wakeup
120                     .set_readiness(mio::Ready::empty())
121                     .unwrap();
122             } else {
123                 self.dispatch(token, event.readiness());
124             }
125         }
126 
127         Ok(())
128     }
129 
dispatch(&self, token: mio::Token, ready: mio::Ready)130     fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
131         let mut rd = None;
132         let mut wr = None;
133 
134         let address = Address::from_usize(token.0);
135 
136         let io = match self.inner.io_dispatch.get(address) {
137             Some(io) => io,
138             None => return,
139         };
140 
141         if io
142             .set_readiness(address, |curr| curr | ready.as_usize())
143             .is_err()
144         {
145             // token no longer valid!
146             return;
147         }
148 
149         if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
150             wr = io.writer.take_waker();
151         }
152 
153         if !(ready & (!mio::Ready::writable())).is_empty() {
154             rd = io.reader.take_waker();
155         }
156 
157         if let Some(w) = rd {
158             w.wake();
159         }
160 
161         if let Some(w) = wr {
162             w.wake();
163         }
164     }
165 }
166 
167 impl Park for Driver {
168     type Unpark = Handle;
169     type Error = io::Error;
170 
unpark(&self) -> Self::Unpark171     fn unpark(&self) -> Self::Unpark {
172         self.handle()
173     }
174 
park(&mut self) -> io::Result<()>175     fn park(&mut self) -> io::Result<()> {
176         self.turn(None)?;
177         Ok(())
178     }
179 
park_timeout(&mut self, duration: Duration) -> io::Result<()>180     fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
181         self.turn(Some(duration))?;
182         Ok(())
183     }
184 
shutdown(&mut self)185     fn shutdown(&mut self) {}
186 }
187 
188 impl fmt::Debug for Driver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result189     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190         write!(f, "Driver")
191     }
192 }
193 
194 // ===== impl Handle =====
195 
196 impl Handle {
197     /// Returns a handle to the current reactor
198     ///
199     /// # Panics
200     ///
201     /// This function panics if there is no current reactor set.
current() -> Self202     pub(super) fn current() -> Self {
203         context::io_handle().expect(
204             "there is no reactor running, must be called from the context of a Tokio 0.2.x runtime",
205         )
206     }
207 
208     /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
209     /// makes the next call to `turn` return immediately.
210     ///
211     /// This method is intended to be used in situations where a notification
212     /// needs to otherwise be sent to the main reactor. If the reactor is
213     /// currently blocked inside of `turn` then it will wake up and soon return
214     /// after this method has been called. If the reactor is not currently
215     /// blocked in `turn`, then the next call to `turn` will not block and
216     /// return immediately.
wakeup(&self)217     fn wakeup(&self) {
218         if let Some(inner) = self.inner() {
219             inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
220         }
221     }
222 
inner(&self) -> Option<Arc<Inner>>223     pub(super) fn inner(&self) -> Option<Arc<Inner>> {
224         self.inner.upgrade()
225     }
226 }
227 
228 impl Unpark for Handle {
unpark(&self)229     fn unpark(&self) {
230         self.wakeup();
231     }
232 }
233 
234 impl fmt::Debug for Handle {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result235     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236         write!(f, "Handle")
237     }
238 }
239 
240 // ===== impl Inner =====
241 
242 impl Inner {
243     /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
244     ///
245     /// The registration token is returned.
add_source( &self, source: &dyn Evented, ready: mio::Ready, ) -> io::Result<Address>246     pub(super) fn add_source(
247         &self,
248         source: &dyn Evented,
249         ready: mio::Ready,
250     ) -> io::Result<Address> {
251         let address = self.io_dispatch.alloc().ok_or_else(|| {
252             io::Error::new(
253                 io::ErrorKind::Other,
254                 "reactor at max registered I/O resources",
255             )
256         })?;
257 
258         self.n_sources.fetch_add(1, SeqCst);
259 
260         self.io.register(
261             source,
262             mio::Token(address.to_usize()),
263             ready,
264             mio::PollOpt::edge(),
265         )?;
266 
267         Ok(address)
268     }
269 
270     /// Deregisters an I/O resource from the reactor.
deregister_source(&self, source: &dyn Evented) -> io::Result<()>271     pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
272         self.io.deregister(source)
273     }
274 
drop_source(&self, address: Address)275     pub(super) fn drop_source(&self, address: Address) {
276         self.io_dispatch.remove(address);
277         self.n_sources.fetch_sub(1, SeqCst);
278     }
279 
280     /// Registers interest in the I/O resource associated with `token`.
register(&self, token: Address, dir: Direction, w: Waker)281     pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
282         let sched = self
283             .io_dispatch
284             .get(token)
285             .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
286 
287         let waker = match dir {
288             Direction::Read => &sched.reader,
289             Direction::Write => &sched.writer,
290         };
291 
292         waker.register(w);
293     }
294 }
295 
296 impl Direction {
mask(self) -> mio::Ready297     pub(super) fn mask(self) -> mio::Ready {
298         match self {
299             Direction::Read => {
300                 // Everything except writable is signaled through read.
301                 mio::Ready::all() - mio::Ready::writable()
302             }
303             Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(),
304         }
305     }
306 }
307 
308 #[cfg(all(test, loom))]
309 mod tests {
310     use super::*;
311     use loom::thread;
312 
313     // No-op `Evented` impl just so we can have something to pass to `add_source`.
314     struct NotEvented;
315 
316     impl Evented for NotEvented {
register( &self, _: &mio::Poll, _: mio::Token, _: mio::Ready, _: mio::PollOpt, ) -> io::Result<()>317         fn register(
318             &self,
319             _: &mio::Poll,
320             _: mio::Token,
321             _: mio::Ready,
322             _: mio::PollOpt,
323         ) -> io::Result<()> {
324             Ok(())
325         }
326 
reregister( &self, _: &mio::Poll, _: mio::Token, _: mio::Ready, _: mio::PollOpt, ) -> io::Result<()>327         fn reregister(
328             &self,
329             _: &mio::Poll,
330             _: mio::Token,
331             _: mio::Ready,
332             _: mio::PollOpt,
333         ) -> io::Result<()> {
334             Ok(())
335         }
336 
deregister(&self, _: &mio::Poll) -> io::Result<()>337         fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
338             Ok(())
339         }
340     }
341 
342     #[test]
tokens_unique_when_dropped()343     fn tokens_unique_when_dropped() {
344         loom::model(|| {
345             let reactor = Driver::new().unwrap();
346             let inner = reactor.inner;
347             let inner2 = inner.clone();
348 
349             let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
350             let thread = thread::spawn(move || {
351                 inner2.drop_source(token_1);
352             });
353 
354             let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
355             thread.join().unwrap();
356 
357             assert!(token_1 != token_2);
358         })
359     }
360 
361     #[test]
tokens_unique_when_dropped_on_full_page()362     fn tokens_unique_when_dropped_on_full_page() {
363         loom::model(|| {
364             let reactor = Driver::new().unwrap();
365             let inner = reactor.inner;
366             let inner2 = inner.clone();
367             // add sources to fill up the first page so that the dropped index
368             // may be reused.
369             for _ in 0..31 {
370                 inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
371             }
372 
373             let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
374             let thread = thread::spawn(move || {
375                 inner2.drop_source(token_1);
376             });
377 
378             let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
379             thread.join().unwrap();
380 
381             assert!(token_1 != token_2);
382         })
383     }
384 
385     #[test]
tokens_unique_concurrent_add()386     fn tokens_unique_concurrent_add() {
387         loom::model(|| {
388             let reactor = Driver::new().unwrap();
389             let inner = reactor.inner;
390             let inner2 = inner.clone();
391 
392             let thread = thread::spawn(move || {
393                 let token_2 = inner2.add_source(&NotEvented, mio::Ready::all()).unwrap();
394                 token_2
395             });
396 
397             let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap();
398             let token_2 = thread.join().unwrap();
399 
400             assert!(token_1 != token_2);
401         })
402     }
403 }
404