1 #![cfg_attr(not(feature = "net"), allow(dead_code))]
2
3 use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo};
4 use crate::util::slab;
5
6 use mio::event::Source;
7 use std::io;
8 use std::task::{Context, Poll};
9
10 cfg_io_driver! {
11 /// Associates an I/O resource with the reactor instance that drives it.
12 ///
13 /// A registration represents an I/O resource registered with a Reactor such
14 /// that it will receive task notifications on readiness. This is the lowest
15 /// level API for integrating with a reactor.
16 ///
17 /// The association between an I/O resource is made by calling
18 /// [`new_with_interest_and_handle`].
19 /// Once the association is established, it remains established until the
20 /// registration instance is dropped.
21 ///
22 /// A registration instance represents two separate readiness streams. One
23 /// for the read readiness and one for write readiness. These streams are
24 /// independent and can be consumed from separate tasks.
25 ///
26 /// **Note**: while `Registration` is `Sync`, the caller must ensure that
27 /// there are at most two tasks that use a registration instance
28 /// concurrently. One task for [`poll_read_ready`] and one task for
29 /// [`poll_write_ready`]. While violating this requirement is "safe" from a
30 /// Rust memory safety point of view, it will result in unexpected behavior
31 /// in the form of lost notifications and tasks hanging.
32 ///
33 /// ## Platform-specific events
34 ///
35 /// `Registration` also allows receiving platform-specific `mio::Ready`
36 /// events. These events are included as part of the read readiness event
37 /// stream. The write readiness event stream is only for `Ready::writable()`
38 /// events.
39 ///
40 /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
41 /// [`poll_read_ready`]: method@Self::poll_read_ready`
42 /// [`poll_write_ready`]: method@Self::poll_write_ready`
43 #[derive(Debug)]
44 pub(crate) struct Registration {
45 /// Handle to the associated driver.
46 handle: Handle,
47
48 /// Reference to state stored by the driver.
49 shared: slab::Ref<ScheduledIo>,
50 }
51 }
52
53 unsafe impl Send for Registration {}
54 unsafe impl Sync for Registration {}
55
56 // ===== impl Registration =====
57
58 impl Registration {
59 /// Registers the I/O resource with the default reactor, for a specific
60 /// `Interest`. `new_with_interest` should be used over `new` when you need
61 /// control over the readiness state, such as when a file descriptor only
62 /// allows reads. This does not add `hup` or `error` so if you are
63 /// interested in those states, you will need to add them to the readiness
64 /// state passed to this function.
65 ///
66 /// # Return
67 ///
68 /// - `Ok` if the registration happened successfully
69 /// - `Err` if an error was encountered during registration
new_with_interest_and_handle( io: &mut impl Source, interest: Interest, handle: Handle, ) -> io::Result<Registration>70 pub(crate) fn new_with_interest_and_handle(
71 io: &mut impl Source,
72 interest: Interest,
73 handle: Handle,
74 ) -> io::Result<Registration> {
75 let shared = if let Some(inner) = handle.inner() {
76 inner.add_source(io, interest)?
77 } else {
78 return Err(io::Error::new(
79 io::ErrorKind::Other,
80 "failed to find event loop",
81 ));
82 };
83
84 Ok(Registration { handle, shared })
85 }
86
87 /// Deregisters the I/O resource from the reactor it is associated with.
88 ///
89 /// This function must be called before the I/O resource associated with the
90 /// registration is dropped.
91 ///
92 /// Note that deregistering does not guarantee that the I/O resource can be
93 /// registered with a different reactor. Some I/O resource types can only be
94 /// associated with a single reactor instance for their lifetime.
95 ///
96 /// # Return
97 ///
98 /// If the deregistration was successful, `Ok` is returned. Any calls to
99 /// `Reactor::turn` that happen after a successful call to `deregister` will
100 /// no longer result in notifications getting sent for this registration.
101 ///
102 /// `Err` is returned if an error is encountered.
deregister(&mut self, io: &mut impl Source) -> io::Result<()>103 pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
104 let inner = match self.handle.inner() {
105 Some(inner) => inner,
106 None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
107 };
108 inner.deregister_source(io)
109 }
110
clear_readiness(&self, event: ReadyEvent)111 pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
112 self.shared.clear_readiness(event);
113 }
114
115 // Uses the poll path, requiring the caller to ensure mutual exclusion for
116 // correctness. Only the last task to call this function is notified.
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>>117 pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
118 self.poll_ready(cx, Direction::Read)
119 }
120
121 // Uses the poll path, requiring the caller to ensure mutual exclusion for
122 // correctness. Only the last task to call this function is notified.
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>>123 pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
124 self.poll_ready(cx, Direction::Write)
125 }
126
127 // Uses the poll path, requiring the caller to ensure mutual exclusion for
128 // correctness. Only the last task to call this function is notified.
poll_read_io<R>( &self, cx: &mut Context<'_>, f: impl FnMut() -> io::Result<R>, ) -> Poll<io::Result<R>>129 pub(crate) fn poll_read_io<R>(
130 &self,
131 cx: &mut Context<'_>,
132 f: impl FnMut() -> io::Result<R>,
133 ) -> Poll<io::Result<R>> {
134 self.poll_io(cx, Direction::Read, f)
135 }
136
137 // Uses the poll path, requiring the caller to ensure mutual exclusion for
138 // correctness. Only the last task to call this function is notified.
poll_write_io<R>( &self, cx: &mut Context<'_>, f: impl FnMut() -> io::Result<R>, ) -> Poll<io::Result<R>>139 pub(crate) fn poll_write_io<R>(
140 &self,
141 cx: &mut Context<'_>,
142 f: impl FnMut() -> io::Result<R>,
143 ) -> Poll<io::Result<R>> {
144 self.poll_io(cx, Direction::Write, f)
145 }
146
147 /// Polls for events on the I/O resource's `direction` readiness stream.
148 ///
149 /// If called with a task context, notify the task when a new event is
150 /// received.
poll_ready( &self, cx: &mut Context<'_>, direction: Direction, ) -> Poll<io::Result<ReadyEvent>>151 fn poll_ready(
152 &self,
153 cx: &mut Context<'_>,
154 direction: Direction,
155 ) -> Poll<io::Result<ReadyEvent>> {
156 // Keep track of task budget
157 let coop = ready!(crate::coop::poll_proceed(cx));
158 let ev = ready!(self.shared.poll_readiness(cx, direction));
159
160 if self.handle.inner().is_none() {
161 return Poll::Ready(Err(gone()));
162 }
163
164 coop.made_progress();
165 Poll::Ready(Ok(ev))
166 }
167
poll_io<R>( &self, cx: &mut Context<'_>, direction: Direction, mut f: impl FnMut() -> io::Result<R>, ) -> Poll<io::Result<R>>168 fn poll_io<R>(
169 &self,
170 cx: &mut Context<'_>,
171 direction: Direction,
172 mut f: impl FnMut() -> io::Result<R>,
173 ) -> Poll<io::Result<R>> {
174 loop {
175 let ev = ready!(self.poll_ready(cx, direction))?;
176
177 match f() {
178 Ok(ret) => {
179 return Poll::Ready(Ok(ret));
180 }
181 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
182 self.clear_readiness(ev);
183 }
184 Err(e) => return Poll::Ready(Err(e)),
185 }
186 }
187 }
188
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>189 pub(crate) fn try_io<R>(
190 &self,
191 interest: Interest,
192 f: impl FnOnce() -> io::Result<R>,
193 ) -> io::Result<R> {
194 let ev = self.shared.ready_event(interest);
195
196 // Don't attempt the operation if the resource is not ready.
197 if ev.ready.is_empty() {
198 return Err(io::ErrorKind::WouldBlock.into());
199 }
200
201 match f() {
202 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
203 self.clear_readiness(ev);
204 Err(io::ErrorKind::WouldBlock.into())
205 }
206 res => res,
207 }
208 }
209 }
210
211 impl Drop for Registration {
drop(&mut self)212 fn drop(&mut self) {
213 // It is possible for a cycle to be created between wakers stored in
214 // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
215 // cycle, wakers are cleared. This is an imperfect solution as it is
216 // possible to store a `Registration` in a waker. In this case, the
217 // cycle would remain.
218 //
219 // See tokio-rs/tokio#3481 for more details.
220 self.shared.clear_wakers();
221 }
222 }
223
gone() -> io::Error224 fn gone() -> io::Error {
225 io::Error::new(io::ErrorKind::Other, "IO driver has terminated")
226 }
227
228 cfg_io_readiness! {
229 impl Registration {
230 pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
231 use std::future::Future;
232 use std::pin::Pin;
233
234 let fut = self.shared.readiness(interest);
235 pin!(fut);
236
237 crate::future::poll_fn(|cx| {
238 if self.handle.inner().is_none() {
239 return Poll::Ready(Err(io::Error::new(
240 io::ErrorKind::Other,
241 crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
242 )));
243 }
244
245 Pin::new(&mut fut).poll(cx).map(Ok)
246 }).await
247 }
248
249 pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
250 loop {
251 let event = self.readiness(interest).await?;
252
253 match f() {
254 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
255 self.clear_readiness(event);
256 }
257 x => return x,
258 }
259 }
260 }
261 }
262 }
263