1 #![cfg_attr(not(feature = "net"), allow(dead_code))]
2 
3 use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo};
4 use crate::util::slab;
5 
6 use mio::event::Source;
7 use std::io;
8 use std::task::{Context, Poll};
9 
cfg_io_driver! {
    /// Associates an I/O resource with the reactor instance that drives it.
    ///
    /// A registration represents an I/O resource registered with a Reactor such
    /// that it will receive task notifications on readiness. This is the lowest
    /// level API for integrating with a reactor.
    ///
    /// The association between an I/O resource and a reactor is made by
    /// calling [`new_with_interest_and_handle`].
    /// Once the association is established, it remains established until the
    /// registration instance is dropped.
    ///
    /// A registration instance represents two separate readiness streams. One
    /// for the read readiness and one for write readiness. These streams are
    /// independent and can be consumed from separate tasks.
    ///
    /// **Note**: while `Registration` is `Sync`, the caller must ensure that
    /// there are at most two tasks that use a registration instance
    /// concurrently. One task for [`poll_read_ready`] and one task for
    /// [`poll_write_ready`]. While violating this requirement is "safe" from a
    /// Rust memory safety point of view, it will result in unexpected behavior
    /// in the form of lost notifications and tasks hanging.
    ///
    /// ## Platform-specific events
    ///
    /// `Registration` also allows receiving platform-specific `mio::Ready`
    /// events. These events are included as part of the read readiness event
    /// stream. The write readiness event stream is only for `Ready::writable()`
    /// events.
    ///
    /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
    /// [`poll_read_ready`]: method@Self::poll_read_ready
    /// [`poll_write_ready`]: method@Self::poll_write_ready
    #[derive(Debug)]
    pub(crate) struct Registration {
        /// Handle to the associated driver. Its `inner()` is consulted before
        /// and after I/O operations to detect that the driver is gone.
        handle: Handle,

        /// Reference to state stored by the driver. Readiness queries,
        /// readiness clearing and waker clearing all go through this value.
        shared: slab::Ref<ScheduledIo>,
    }
}
52 
// NOTE(review): `Send` and `Sync` are asserted manually here instead of being
// derived automatically, presumably because one of the fields (likely
// `slab::Ref<ScheduledIo>`) does not implement them on its own — TODO confirm
// and record the invariant that makes these impls sound in a `SAFETY:` comment.
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
55 
56 // ===== impl Registration =====
57 
58 impl Registration {
59     /// Registers the I/O resource with the default reactor, for a specific
60     /// `Interest`. `new_with_interest` should be used over `new` when you need
61     /// control over the readiness state, such as when a file descriptor only
62     /// allows reads. This does not add `hup` or `error` so if you are
63     /// interested in those states, you will need to add them to the readiness
64     /// state passed to this function.
65     ///
66     /// # Return
67     ///
68     /// - `Ok` if the registration happened successfully
69     /// - `Err` if an error was encountered during registration
new_with_interest_and_handle( io: &mut impl Source, interest: Interest, handle: Handle, ) -> io::Result<Registration>70     pub(crate) fn new_with_interest_and_handle(
71         io: &mut impl Source,
72         interest: Interest,
73         handle: Handle,
74     ) -> io::Result<Registration> {
75         let shared = if let Some(inner) = handle.inner() {
76             inner.add_source(io, interest)?
77         } else {
78             return Err(io::Error::new(
79                 io::ErrorKind::Other,
80                 "failed to find event loop",
81             ));
82         };
83 
84         Ok(Registration { handle, shared })
85     }
86 
87     /// Deregisters the I/O resource from the reactor it is associated with.
88     ///
89     /// This function must be called before the I/O resource associated with the
90     /// registration is dropped.
91     ///
92     /// Note that deregistering does not guarantee that the I/O resource can be
93     /// registered with a different reactor. Some I/O resource types can only be
94     /// associated with a single reactor instance for their lifetime.
95     ///
96     /// # Return
97     ///
98     /// If the deregistration was successful, `Ok` is returned. Any calls to
99     /// `Reactor::turn` that happen after a successful call to `deregister` will
100     /// no longer result in notifications getting sent for this registration.
101     ///
102     /// `Err` is returned if an error is encountered.
deregister(&mut self, io: &mut impl Source) -> io::Result<()>103     pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
104         let inner = match self.handle.inner() {
105             Some(inner) => inner,
106             None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
107         };
108         inner.deregister_source(io)
109     }
110 
clear_readiness(&self, event: ReadyEvent)111     pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
112         self.shared.clear_readiness(event);
113     }
114 
115     // Uses the poll path, requiring the caller to ensure mutual exclusion for
116     // correctness. Only the last task to call this function is notified.
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>>117     pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
118         self.poll_ready(cx, Direction::Read)
119     }
120 
121     // Uses the poll path, requiring the caller to ensure mutual exclusion for
122     // correctness. Only the last task to call this function is notified.
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>>123     pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
124         self.poll_ready(cx, Direction::Write)
125     }
126 
127     // Uses the poll path, requiring the caller to ensure mutual exclusion for
128     // correctness. Only the last task to call this function is notified.
poll_read_io<R>( &self, cx: &mut Context<'_>, f: impl FnMut() -> io::Result<R>, ) -> Poll<io::Result<R>>129     pub(crate) fn poll_read_io<R>(
130         &self,
131         cx: &mut Context<'_>,
132         f: impl FnMut() -> io::Result<R>,
133     ) -> Poll<io::Result<R>> {
134         self.poll_io(cx, Direction::Read, f)
135     }
136 
137     // Uses the poll path, requiring the caller to ensure mutual exclusion for
138     // correctness. Only the last task to call this function is notified.
poll_write_io<R>( &self, cx: &mut Context<'_>, f: impl FnMut() -> io::Result<R>, ) -> Poll<io::Result<R>>139     pub(crate) fn poll_write_io<R>(
140         &self,
141         cx: &mut Context<'_>,
142         f: impl FnMut() -> io::Result<R>,
143     ) -> Poll<io::Result<R>> {
144         self.poll_io(cx, Direction::Write, f)
145     }
146 
147     /// Polls for events on the I/O resource's `direction` readiness stream.
148     ///
149     /// If called with a task context, notify the task when a new event is
150     /// received.
poll_ready( &self, cx: &mut Context<'_>, direction: Direction, ) -> Poll<io::Result<ReadyEvent>>151     fn poll_ready(
152         &self,
153         cx: &mut Context<'_>,
154         direction: Direction,
155     ) -> Poll<io::Result<ReadyEvent>> {
156         // Keep track of task budget
157         let coop = ready!(crate::coop::poll_proceed(cx));
158         let ev = ready!(self.shared.poll_readiness(cx, direction));
159 
160         if self.handle.inner().is_none() {
161             return Poll::Ready(Err(gone()));
162         }
163 
164         coop.made_progress();
165         Poll::Ready(Ok(ev))
166     }
167 
poll_io<R>( &self, cx: &mut Context<'_>, direction: Direction, mut f: impl FnMut() -> io::Result<R>, ) -> Poll<io::Result<R>>168     fn poll_io<R>(
169         &self,
170         cx: &mut Context<'_>,
171         direction: Direction,
172         mut f: impl FnMut() -> io::Result<R>,
173     ) -> Poll<io::Result<R>> {
174         loop {
175             let ev = ready!(self.poll_ready(cx, direction))?;
176 
177             match f() {
178                 Ok(ret) => {
179                     return Poll::Ready(Ok(ret));
180                 }
181                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
182                     self.clear_readiness(ev);
183                 }
184                 Err(e) => return Poll::Ready(Err(e)),
185             }
186         }
187     }
188 
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>189     pub(crate) fn try_io<R>(
190         &self,
191         interest: Interest,
192         f: impl FnOnce() -> io::Result<R>,
193     ) -> io::Result<R> {
194         let ev = self.shared.ready_event(interest);
195 
196         // Don't attempt the operation if the resource is not ready.
197         if ev.ready.is_empty() {
198             return Err(io::ErrorKind::WouldBlock.into());
199         }
200 
201         match f() {
202             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
203                 self.clear_readiness(ev);
204                 Err(io::ErrorKind::WouldBlock.into())
205             }
206             res => res,
207         }
208     }
209 }
210 
211 impl Drop for Registration {
drop(&mut self)212     fn drop(&mut self) {
213         // It is possible for a cycle to be created between wakers stored in
214         // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
215         // cycle, wakers are cleared. This is an imperfect solution as it is
216         // possible to store a `Registration` in a waker. In this case, the
217         // cycle would remain.
218         //
219         // See tokio-rs/tokio#3481 for more details.
220         self.shared.clear_wakers();
221     }
222 }
223 
/// Builds the error reported when the I/O driver is no longer available.
fn gone() -> io::Error {
    let message = "IO driver has terminated";
    io::Error::new(io::ErrorKind::Other, message)
}
227 
228 cfg_io_readiness! {
229     impl Registration {
230         pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
231             use std::future::Future;
232             use std::pin::Pin;
233 
234             let fut = self.shared.readiness(interest);
235             pin!(fut);
236 
237             crate::future::poll_fn(|cx| {
238                 if self.handle.inner().is_none() {
239                     return Poll::Ready(Err(io::Error::new(
240                         io::ErrorKind::Other,
241                         crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
242                     )));
243                 }
244 
245                 Pin::new(&mut fut).poll(cx).map(Ok)
246             }).await
247         }
248 
249         pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
250             loop {
251                 let event = self.readiness(interest).await?;
252 
253                 match f() {
254                     Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
255                         self.clear_readiness(event);
256                     }
257                     x => return x,
258                 }
259             }
260         }
261     }
262 }
263