1 pub(crate) mod platform;
2
3 mod scheduled_io;
4 pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
5
6 use crate::loom::sync::atomic::AtomicUsize;
7 use crate::park::{Park, Unpark};
8 use crate::runtime::context;
9 use crate::util::slab::{Address, Slab};
10
11 use mio::event::Evented;
12 use std::fmt;
13 use std::io;
14 use std::sync::atomic::Ordering::SeqCst;
15 use std::sync::{Arc, Weak};
16 use std::task::Waker;
17 use std::time::Duration;
18
/// I/O driver, backed by Mio
///
/// The driver owns the event buffer and the wakeup registration, and shares
/// the rest of its state (`Inner`) with every `Handle` through an `Arc`.
pub(crate) struct Driver {
    /// Reuse the `mio::Events` value across calls to poll.
    events: mio::Events,

    /// State shared between the reactor and the handles.
    inner: Arc<Inner>,

    /// Registration half of the wakeup pair that `Driver::new` registers
    /// under `TOKEN_WAKEUP`. It is stored but never read afterwards —
    /// presumably it is held only so the registration lives as long as the
    /// driver does.
    _wakeup_registration: mio::Registration,
}
29
/// A reference to an I/O driver
///
/// The handle only holds a `Weak` pointer to the driver's shared state, so it
/// does not keep that state alive by itself; `Handle::inner` yields `None`
/// once the state has been dropped.
#[derive(Clone)]
pub(crate) struct Handle {
    /// Weak reference to the state shared with the `Driver`.
    inner: Weak<Inner>,
}
35
/// State shared between the `Driver` and all of its `Handle`s.
pub(super) struct Inner {
    /// The underlying system event queue.
    io: mio::Poll,

    /// Dispatch slabs for I/O and futures events
    ///
    /// The address of an entry is also used as the `mio::Token` of the
    /// corresponding registration.
    pub(super) io_dispatch: Slab<ScheduledIo>,

    /// The number of sources in `io_dispatch`.
    ///
    /// Incremented by `add_source` and decremented by `drop_source`.
    n_sources: AtomicUsize,

    /// Used to wake up the reactor from a call to `turn`
    ///
    /// `Handle::wakeup` marks it readable; `turn` resets it to empty when it
    /// sees `TOKEN_WAKEUP`.
    wakeup: mio::SetReadiness,
}
49
/// The side of an I/O resource a task is waiting on.
///
/// Selects which waker of a `ScheduledIo` is used and which readiness bits
/// are relevant (see `Direction::mask`).
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(super) enum Direction {
    /// Read side: every readiness bit except "writable".
    Read,
    /// Write side: "writable" plus the platform's hang-up and error bits.
    Write,
}
55
/// Token under which the driver's own wakeup registration is registered.
///
/// It is built from `Address::NULL`; `turn` treats an event carrying this
/// token as a wakeup request instead of dispatching it to an I/O resource.
const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
57
_assert_kinds()58 fn _assert_kinds() {
59 fn _assert<T: Send + Sync>() {}
60
61 _assert::<Handle>();
62 }
63
64 // ===== impl Driver =====
65
66 impl Driver {
67 /// Creates a new event loop, returning any error that happened during the
68 /// creation.
new() -> io::Result<Driver>69 pub(crate) fn new() -> io::Result<Driver> {
70 let io = mio::Poll::new()?;
71 let wakeup_pair = mio::Registration::new2();
72
73 io.register(
74 &wakeup_pair.0,
75 TOKEN_WAKEUP,
76 mio::Ready::readable(),
77 mio::PollOpt::level(),
78 )?;
79
80 Ok(Driver {
81 events: mio::Events::with_capacity(1024),
82 _wakeup_registration: wakeup_pair.0,
83 inner: Arc::new(Inner {
84 io,
85 io_dispatch: Slab::new(),
86 n_sources: AtomicUsize::new(0),
87 wakeup: wakeup_pair.1,
88 }),
89 })
90 }
91
92 /// Returns a handle to this event loop which can be sent across threads
93 /// and can be used as a proxy to the event loop itself.
94 ///
95 /// Handles are cloneable and clones always refer to the same event loop.
96 /// This handle is typically passed into functions that create I/O objects
97 /// to bind them to this event loop.
handle(&self) -> Handle98 pub(crate) fn handle(&self) -> Handle {
99 Handle {
100 inner: Arc::downgrade(&self.inner),
101 }
102 }
103
turn(&mut self, max_wait: Option<Duration>) -> io::Result<()>104 fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
105 // Block waiting for an event to happen, peeling out how many events
106 // happened.
107 match self.inner.io.poll(&mut self.events, max_wait) {
108 Ok(_) => {}
109 Err(e) => return Err(e),
110 }
111
112 // Process all the events that came in, dispatching appropriately
113
114 for event in self.events.iter() {
115 let token = event.token();
116
117 if token == TOKEN_WAKEUP {
118 self.inner
119 .wakeup
120 .set_readiness(mio::Ready::empty())
121 .unwrap();
122 } else {
123 self.dispatch(token, event.readiness());
124 }
125 }
126
127 Ok(())
128 }
129
dispatch(&self, token: mio::Token, ready: mio::Ready)130 fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
131 let mut rd = None;
132 let mut wr = None;
133
134 let address = Address::from_usize(token.0);
135
136 let io = match self.inner.io_dispatch.get(address) {
137 Some(io) => io,
138 None => return,
139 };
140
141 if io
142 .set_readiness(address, |curr| curr | ready.as_usize())
143 .is_err()
144 {
145 // token no longer valid!
146 return;
147 }
148
149 if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
150 wr = io.writer.take_waker();
151 }
152
153 if !(ready & (!mio::Ready::writable())).is_empty() {
154 rd = io.reader.take_waker();
155 }
156
157 if let Some(w) = rd {
158 w.wake();
159 }
160
161 if let Some(w) = wr {
162 w.wake();
163 }
164 }
165 }
166
167 impl Park for Driver {
168 type Unpark = Handle;
169 type Error = io::Error;
170
unpark(&self) -> Self::Unpark171 fn unpark(&self) -> Self::Unpark {
172 self.handle()
173 }
174
park(&mut self) -> io::Result<()>175 fn park(&mut self) -> io::Result<()> {
176 self.turn(None)?;
177 Ok(())
178 }
179
park_timeout(&mut self, duration: Duration) -> io::Result<()>180 fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
181 self.turn(Some(duration))?;
182 Ok(())
183 }
184
shutdown(&mut self)185 fn shutdown(&mut self) {}
186 }
187
188 impl fmt::Debug for Driver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 write!(f, "Driver")
191 }
192 }
193
194 // ===== impl Handle =====
195
196 impl Handle {
197 /// Returns a handle to the current reactor
198 ///
199 /// # Panics
200 ///
201 /// This function panics if there is no current reactor set.
current() -> Self202 pub(super) fn current() -> Self {
203 context::io_handle().expect(
204 "there is no reactor running, must be called from the context of a Tokio 0.2.x runtime",
205 )
206 }
207
208 /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
209 /// makes the next call to `turn` return immediately.
210 ///
211 /// This method is intended to be used in situations where a notification
212 /// needs to otherwise be sent to the main reactor. If the reactor is
213 /// currently blocked inside of `turn` then it will wake up and soon return
214 /// after this method has been called. If the reactor is not currently
215 /// blocked in `turn`, then the next call to `turn` will not block and
216 /// return immediately.
wakeup(&self)217 fn wakeup(&self) {
218 if let Some(inner) = self.inner() {
219 inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
220 }
221 }
222
inner(&self) -> Option<Arc<Inner>>223 pub(super) fn inner(&self) -> Option<Arc<Inner>> {
224 self.inner.upgrade()
225 }
226 }
227
228 impl Unpark for Handle {
unpark(&self)229 fn unpark(&self) {
230 self.wakeup();
231 }
232 }
233
234 impl fmt::Debug for Handle {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result235 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236 write!(f, "Handle")
237 }
238 }
239
240 // ===== impl Inner =====
241
242 impl Inner {
243 /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
244 ///
245 /// The registration token is returned.
add_source( &self, source: &dyn Evented, ready: mio::Ready, ) -> io::Result<Address>246 pub(super) fn add_source(
247 &self,
248 source: &dyn Evented,
249 ready: mio::Ready,
250 ) -> io::Result<Address> {
251 let address = self.io_dispatch.alloc().ok_or_else(|| {
252 io::Error::new(
253 io::ErrorKind::Other,
254 "reactor at max registered I/O resources",
255 )
256 })?;
257
258 self.n_sources.fetch_add(1, SeqCst);
259
260 self.io.register(
261 source,
262 mio::Token(address.to_usize()),
263 ready,
264 mio::PollOpt::edge(),
265 )?;
266
267 Ok(address)
268 }
269
270 /// Deregisters an I/O resource from the reactor.
deregister_source(&self, source: &dyn Evented) -> io::Result<()>271 pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
272 self.io.deregister(source)
273 }
274
drop_source(&self, address: Address)275 pub(super) fn drop_source(&self, address: Address) {
276 self.io_dispatch.remove(address);
277 self.n_sources.fetch_sub(1, SeqCst);
278 }
279
280 /// Registers interest in the I/O resource associated with `token`.
register(&self, token: Address, dir: Direction, w: Waker)281 pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
282 let sched = self
283 .io_dispatch
284 .get(token)
285 .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
286
287 let waker = match dir {
288 Direction::Read => &sched.reader,
289 Direction::Write => &sched.writer,
290 };
291
292 waker.register(w);
293 }
294 }
295
296 impl Direction {
mask(self) -> mio::Ready297 pub(super) fn mask(self) -> mio::Ready {
298 match self {
299 Direction::Read => {
300 // Everything except writable is signaled through read.
301 mio::Ready::all() - mio::Ready::writable()
302 }
303 Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(),
304 }
305 }
306 }
307
#[cfg(all(test, loom))]
mod tests {
    use super::*;
    use loom::thread;

    // `Evented` implementation whose methods all succeed without doing
    // anything; it only exists so there is something to hand to `add_source`.
    struct NotEvented;

    impl Evented for NotEvented {
        fn register(
            &self,
            _: &mio::Poll,
            _: mio::Token,
            _: mio::Ready,
            _: mio::PollOpt,
        ) -> io::Result<()> {
            Ok(())
        }

        fn reregister(
            &self,
            _: &mio::Poll,
            _: mio::Token,
            _: mio::Ready,
            _: mio::PollOpt,
        ) -> io::Result<()> {
            Ok(())
        }

        fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
            Ok(())
        }
    }

    // Adds a no-op source with every readiness bit set and returns its token.
    fn add(inner: &Inner) -> Address {
        inner.add_source(&NotEvented, mio::Ready::all()).unwrap()
    }

    // A token handed out while another one is being dropped concurrently must
    // differ from the dropped one.
    #[test]
    fn tokens_unique_when_dropped() {
        loom::model(|| {
            let reactor = Driver::new().unwrap();
            let inner = reactor.inner;
            let remote = Arc::clone(&inner);

            let token_1 = add(&inner);
            let dropper = thread::spawn(move || remote.drop_source(token_1));

            let token_2 = add(&inner);
            dropper.join().unwrap();

            assert!(token_1 != token_2);
        })
    }

    // Same as above, but with the first page filled beforehand.
    #[test]
    fn tokens_unique_when_dropped_on_full_page() {
        loom::model(|| {
            let reactor = Driver::new().unwrap();
            let inner = reactor.inner;
            let remote = Arc::clone(&inner);

            // add sources to fill up the first page so that the dropped index
            // may be reused.
            for _ in 0..31 {
                add(&inner);
            }

            let token_1 = add(&inner);
            let dropper = thread::spawn(move || remote.drop_source(token_1));

            let token_2 = add(&inner);
            dropper.join().unwrap();

            assert!(token_1 != token_2);
        })
    }

    // Two sources added from different threads must receive different tokens.
    #[test]
    fn tokens_unique_concurrent_add() {
        loom::model(|| {
            let reactor = Driver::new().unwrap();
            let inner = reactor.inner;
            let remote = Arc::clone(&inner);

            let adder = thread::spawn(move || add(&remote));

            let token_1 = add(&inner);
            let token_2 = adder.join().unwrap();

            assert!(token_1 != token_2);
        })
    }
}
404