1 use crate::io::driver::{platform, Direction, Handle};
2 use crate::util::slab::Address;
3 
4 use mio::{self, Evented};
5 use std::io;
6 use std::task::{Context, Poll};
7 
8 cfg_io_driver! {
9     /// Associates an I/O resource with the reactor instance that drives it.
10     ///
11     /// A registration represents an I/O resource registered with a Reactor such
12     /// that it will receive task notifications on readiness. This is the lowest
13     /// level API for integrating with a reactor.
14     ///
15     /// The association between an I/O resource is made by calling [`new`]. Once
16     /// the association is established, it remains established until the
17     /// registration instance is dropped.
18     ///
19     /// A registration instance represents two separate readiness streams. One
20     /// for the read readiness and one for write readiness. These streams are
21     /// independent and can be consumed from separate tasks.
22     ///
23     /// **Note**: while `Registration` is `Sync`, the caller must ensure that
24     /// there are at most two tasks that use a registration instance
25     /// concurrently. One task for [`poll_read_ready`] and one task for
26     /// [`poll_write_ready`]. While violating this requirement is "safe" from a
27     /// Rust memory safety point of view, it will result in unexpected behavior
28     /// in the form of lost notifications and tasks hanging.
29     ///
30     /// ## Platform-specific events
31     ///
32     /// `Registration` also allows receiving platform-specific `mio::Ready`
33     /// events. These events are included as part of the read readiness event
34     /// stream. The write readiness event stream is only for `Ready::writable()`
35     /// events.
36     ///
37     /// [`new`]: method@Self::new
38     /// [`poll_read_ready`]: method@Self::poll_read_ready`
39     /// [`poll_write_ready`]: method@Self::poll_write_ready`
40     #[derive(Debug)]
41     pub struct Registration {
42         handle: Handle,
43         address: Address,
44     }
45 }
46 
// ===== impl Registration =====
48 
49 impl Registration {
50     /// Registers the I/O resource with the default reactor.
51     ///
52     /// # Return
53     ///
54     /// - `Ok` if the registration happened successfully
55     /// - `Err` if an error was encountered during registration
56     ///
57     ///
58     /// # Panics
59     ///
60     /// This function panics if thread-local runtime is not set.
61     ///
62     /// The runtime is usually set implicitly when this function is called
63     /// from a future driven by a tokio runtime, otherwise runtime can be set
64     /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
new<T>(io: &T) -> io::Result<Registration> where T: Evented,65     pub fn new<T>(io: &T) -> io::Result<Registration>
66     where
67         T: Evented,
68     {
69         Registration::new_with_ready(io, mio::Ready::all())
70     }
71 
72     /// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state.
73     /// `new_with_ready` should be used over `new` when you need control over the readiness state,
74     /// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if
75     /// you are interested in those states, you will need to add them to the readiness state passed
76     /// to this function.
77     ///
78     /// An example to listen to read only
79     ///
80     /// ```rust
81     /// ##[cfg(unix)]
82     ///     mio::Ready::from_usize(
83     ///         mio::Ready::readable().as_usize()
84     ///         | mio::unix::UnixReady::error().as_usize()
85     ///         | mio::unix::UnixReady::hup().as_usize()
86     ///     );
87     /// ```
88     ///
89     /// # Return
90     ///
91     /// - `Ok` if the registration happened successfully
92     /// - `Err` if an error was encountered during registration
93     ///
94     ///
95     /// # Panics
96     ///
97     /// This function panics if thread-local runtime is not set.
98     ///
99     /// The runtime is usually set implicitly when this function is called
100     /// from a future driven by a tokio runtime, otherwise runtime can be set
101     /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration> where T: Evented,102     pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
103     where
104         T: Evented,
105     {
106         let handle = Handle::current();
107         let address = if let Some(inner) = handle.inner() {
108             inner.add_source(io, ready)?
109         } else {
110             return Err(io::Error::new(
111                 io::ErrorKind::Other,
112                 "failed to find event loop",
113             ));
114         };
115 
116         Ok(Registration { handle, address })
117     }
118 
119     /// Deregisters the I/O resource from the reactor it is associated with.
120     ///
121     /// This function must be called before the I/O resource associated with the
122     /// registration is dropped.
123     ///
124     /// Note that deregistering does not guarantee that the I/O resource can be
125     /// registered with a different reactor. Some I/O resource types can only be
126     /// associated with a single reactor instance for their lifetime.
127     ///
128     /// # Return
129     ///
130     /// If the deregistration was successful, `Ok` is returned. Any calls to
131     /// `Reactor::turn` that happen after a successful call to `deregister` will
132     /// no longer result in notifications getting sent for this registration.
133     ///
134     /// `Err` is returned if an error is encountered.
deregister<T>(&mut self, io: &T) -> io::Result<()> where T: Evented,135     pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
136     where
137         T: Evented,
138     {
139         let inner = match self.handle.inner() {
140             Some(inner) => inner,
141             None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
142         };
143         inner.deregister_source(io)
144     }
145 
146     /// Polls for events on the I/O resource's read readiness stream.
147     ///
148     /// If the I/O resource receives a new read readiness event since the last
149     /// call to `poll_read_ready`, it is returned. If it has not, the current
150     /// task is notified once a new event is received.
151     ///
152     /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
153     /// the function will always return `Ready(HUP)`. This should be treated as
154     /// the end of the readiness stream.
155     ///
156     /// # Return value
157     ///
158     /// There are several possible return values:
159     ///
160     /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
161     ///   a new readiness event. The readiness value is included.
162     ///
163     /// * `Poll::Pending` means that no new readiness events have been received
164     ///   since the last call to `poll_read_ready`.
165     ///
166     /// * `Poll::Ready(Err(err))` means that the registration has encountered an
167     ///   error. This could represent a permanent internal error for example.
168     ///
169     /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered
170     ///
171     /// # Panics
172     ///
173     /// This function will panic if called from outside of a task context.
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>>174     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
175         // Keep track of task budget
176         let coop = ready!(crate::coop::poll_proceed(cx));
177 
178         let v = self.poll_ready(Direction::Read, Some(cx)).map_err(|e| {
179             coop.made_progress();
180             e
181         })?;
182         match v {
183             Some(v) => {
184                 coop.made_progress();
185                 Poll::Ready(Ok(v))
186             }
187             None => Poll::Pending,
188         }
189     }
190 
191     /// Consume any pending read readiness event.
192     ///
193     /// This function is identical to [`poll_read_ready`] **except** that it
194     /// will not notify the current task when a new event is received. As such,
195     /// it is safe to call this function from outside of a task context.
196     ///
197     /// [`poll_read_ready`]: method@Self::poll_read_ready
take_read_ready(&self) -> io::Result<Option<mio::Ready>>198     pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
199         self.poll_ready(Direction::Read, None)
200     }
201 
202     /// Polls for events on the I/O resource's write readiness stream.
203     ///
204     /// If the I/O resource receives a new write readiness event since the last
205     /// call to `poll_write_ready`, it is returned. If it has not, the current
206     /// task is notified once a new event is received.
207     ///
208     /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
209     /// the function will always return `Ready(HUP)`. This should be treated as
210     /// the end of the readiness stream.
211     ///
212     /// # Return value
213     ///
214     /// There are several possible return values:
215     ///
216     /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
217     ///   a new readiness event. The readiness value is included.
218     ///
219     /// * `Poll::Pending` means that no new readiness events have been received
220     ///   since the last call to `poll_write_ready`.
221     ///
222     /// * `Poll::Ready(Err(err))` means that the registration has encountered an
223     ///   error. This could represent a permanent internal error for example.
224     ///
225     /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered
226     ///
227     /// # Panics
228     ///
229     /// This function will panic if called from outside of a task context.
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>>230     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
231         // Keep track of task budget
232         let coop = ready!(crate::coop::poll_proceed(cx));
233 
234         let v = self.poll_ready(Direction::Write, Some(cx)).map_err(|e| {
235             coop.made_progress();
236             e
237         })?;
238         match v {
239             Some(v) => {
240                 coop.made_progress();
241                 Poll::Ready(Ok(v))
242             }
243             None => Poll::Pending,
244         }
245     }
246 
247     /// Consumes any pending write readiness event.
248     ///
249     /// This function is identical to [`poll_write_ready`] **except** that it
250     /// will not notify the current task when a new event is received. As such,
251     /// it is safe to call this function from outside of a task context.
252     ///
253     /// [`poll_write_ready`]: method@Self::poll_write_ready
take_write_ready(&self) -> io::Result<Option<mio::Ready>>254     pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
255         self.poll_ready(Direction::Write, None)
256     }
257 
258     /// Polls for events on the I/O resource's `direction` readiness stream.
259     ///
260     /// If called with a task context, notify the task when a new event is
261     /// received.
poll_ready( &self, direction: Direction, cx: Option<&mut Context<'_>>, ) -> io::Result<Option<mio::Ready>>262     fn poll_ready(
263         &self,
264         direction: Direction,
265         cx: Option<&mut Context<'_>>,
266     ) -> io::Result<Option<mio::Ready>> {
267         let inner = match self.handle.inner() {
268             Some(inner) => inner,
269             None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
270         };
271 
272         // If the task should be notified about new events, ensure that it has
273         // been registered
274         if let Some(ref cx) = cx {
275             inner.register(self.address, direction, cx.waker().clone())
276         }
277 
278         let mask = direction.mask();
279         let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize();
280 
281         let sched = inner.io_dispatch.get(self.address).unwrap();
282 
283         // This consumes the current readiness state **except** for HUP and
284         // error. HUP and error are excluded because a) they are final states
285         // and never transitition out and b) both the read AND the write
286         // directions need to be able to obvserve these states.
287         //
288         // # Platform-specific behavior
289         //
290         // HUP and error readiness are platform-specific. On epoll platforms,
291         // HUP has specific conditions that must be met by both peers of a
292         // connection in order to be triggered.
293         //
294         // On epoll platforms, `EPOLLERR` is signaled through
295         // `UnixReady::error()` and is important to be observable by both read
296         // AND write. A specific case that `EPOLLERR` occurs is when the read
297         // end of a pipe is closed. When this occurs, a peer blocked by
298         // writing to the pipe should be notified.
299         let curr_ready = sched
300             .set_readiness(self.address, |curr| curr & (!mask_no_hup))
301             .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
302 
303         let mut ready = mask & mio::Ready::from_usize(curr_ready);
304 
305         if ready.is_empty() {
306             if let Some(cx) = cx {
307                 // Update the task info
308                 match direction {
309                     Direction::Read => sched.reader.register_by_ref(cx.waker()),
310                     Direction::Write => sched.writer.register_by_ref(cx.waker()),
311                 }
312 
313                 // Try again
314                 let curr_ready = sched
315                     .set_readiness(self.address, |curr| curr & (!mask_no_hup))
316                     .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
317                 ready = mask & mio::Ready::from_usize(curr_ready);
318             }
319         }
320 
321         if ready.is_empty() {
322             Ok(None)
323         } else {
324             Ok(Some(ready))
325         }
326     }
327 }
328 
329 unsafe impl Send for Registration {}
330 unsafe impl Sync for Registration {}
331 
332 impl Drop for Registration {
drop(&mut self)333     fn drop(&mut self) {
334         let inner = match self.handle.inner() {
335             Some(inner) => inner,
336             None => return,
337         };
338         inner.drop_source(self.address);
339     }
340 }
341