1 use crate::io::driver::{platform, Direction, Handle}; 2 use crate::util::slab::Address; 3 4 use mio::{self, Evented}; 5 use std::io; 6 use std::task::{Context, Poll}; 7 8 cfg_io_driver! { 9 /// Associates an I/O resource with the reactor instance that drives it. 10 /// 11 /// A registration represents an I/O resource registered with a Reactor such 12 /// that it will receive task notifications on readiness. This is the lowest 13 /// level API for integrating with a reactor. 14 /// 15 /// The association between an I/O resource is made by calling [`new`]. Once 16 /// the association is established, it remains established until the 17 /// registration instance is dropped. 18 /// 19 /// A registration instance represents two separate readiness streams. One 20 /// for the read readiness and one for write readiness. These streams are 21 /// independent and can be consumed from separate tasks. 22 /// 23 /// **Note**: while `Registration` is `Sync`, the caller must ensure that 24 /// there are at most two tasks that use a registration instance 25 /// concurrently. One task for [`poll_read_ready`] and one task for 26 /// [`poll_write_ready`]. While violating this requirement is "safe" from a 27 /// Rust memory safety point of view, it will result in unexpected behavior 28 /// in the form of lost notifications and tasks hanging. 29 /// 30 /// ## Platform-specific events 31 /// 32 /// `Registration` also allows receiving platform-specific `mio::Ready` 33 /// events. These events are included as part of the read readiness event 34 /// stream. The write readiness event stream is only for `Ready::writable()` 35 /// events. 36 /// 37 /// [`new`]: method@Self::new 38 /// [`poll_read_ready`]: method@Self::poll_read_ready` 39 /// [`poll_write_ready`]: method@Self::poll_write_ready` 40 #[derive(Debug)] 41 pub struct Registration { 42 handle: Handle, 43 address: Address, 44 } 45 } 46 47 // ===== impl Registration ===== 48 49 impl Registration { 50 /// Registers the I/O resource with the default reactor. 51 /// 52 /// # Return 53 /// 54 /// - `Ok` if the registration happened successfully 55 /// - `Err` if an error was encountered during registration 56 /// 57 /// 58 /// # Panics 59 /// 60 /// This function panics if thread-local runtime is not set. 61 /// 62 /// The runtime is usually set implicitly when this function is called 63 /// from a future driven by a tokio runtime, otherwise runtime can be set 64 /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. new<T>(io: &T) -> io::Result<Registration> where T: Evented,65 pub fn new<T>(io: &T) -> io::Result<Registration> 66 where 67 T: Evented, 68 { 69 Registration::new_with_ready(io, mio::Ready::all()) 70 } 71 72 /// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state. 73 /// `new_with_ready` should be used over `new` when you need control over the readiness state, 74 /// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if 75 /// you are interested in those states, you will need to add them to the readiness state passed 76 /// to this function. 77 /// 78 /// An example to listen to read only 79 /// 80 /// ```rust 81 /// ##[cfg(unix)] 82 /// mio::Ready::from_usize( 83 /// mio::Ready::readable().as_usize() 84 /// | mio::unix::UnixReady::error().as_usize() 85 /// | mio::unix::UnixReady::hup().as_usize() 86 /// ); 87 /// ``` 88 /// 89 /// # Return 90 /// 91 /// - `Ok` if the registration happened successfully 92 /// - `Err` if an error was encountered during registration 93 /// 94 /// 95 /// # Panics 96 /// 97 /// This function panics if thread-local runtime is not set. 98 /// 99 /// The runtime is usually set implicitly when this function is called 100 /// from a future driven by a tokio runtime, otherwise runtime can be set 101 /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration> where T: Evented,102 pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration> 103 where 104 T: Evented, 105 { 106 let handle = Handle::current(); 107 let address = if let Some(inner) = handle.inner() { 108 inner.add_source(io, ready)? 109 } else { 110 return Err(io::Error::new( 111 io::ErrorKind::Other, 112 "failed to find event loop", 113 )); 114 }; 115 116 Ok(Registration { handle, address }) 117 } 118 119 /// Deregisters the I/O resource from the reactor it is associated with. 120 /// 121 /// This function must be called before the I/O resource associated with the 122 /// registration is dropped. 123 /// 124 /// Note that deregistering does not guarantee that the I/O resource can be 125 /// registered with a different reactor. Some I/O resource types can only be 126 /// associated with a single reactor instance for their lifetime. 127 /// 128 /// # Return 129 /// 130 /// If the deregistration was successful, `Ok` is returned. Any calls to 131 /// `Reactor::turn` that happen after a successful call to `deregister` will 132 /// no longer result in notifications getting sent for this registration. 133 /// 134 /// `Err` is returned if an error is encountered. deregister<T>(&mut self, io: &T) -> io::Result<()> where T: Evented,135 pub fn deregister<T>(&mut self, io: &T) -> io::Result<()> 136 where 137 T: Evented, 138 { 139 let inner = match self.handle.inner() { 140 Some(inner) => inner, 141 None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), 142 }; 143 inner.deregister_source(io) 144 } 145 146 /// Polls for events on the I/O resource's read readiness stream. 147 /// 148 /// If the I/O resource receives a new read readiness event since the last 149 /// call to `poll_read_ready`, it is returned. If it has not, the current 150 /// task is notified once a new event is received. 151 /// 152 /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, 153 /// the function will always return `Ready(HUP)`. This should be treated as 154 /// the end of the readiness stream. 155 /// 156 /// # Return value 157 /// 158 /// There are several possible return values: 159 /// 160 /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received 161 /// a new readiness event. The readiness value is included. 162 /// 163 /// * `Poll::Pending` means that no new readiness events have been received 164 /// since the last call to `poll_read_ready`. 165 /// 166 /// * `Poll::Ready(Err(err))` means that the registration has encountered an 167 /// error. This could represent a permanent internal error for example. 168 /// 169 /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered 170 /// 171 /// # Panics 172 /// 173 /// This function will panic if called from outside of a task context. poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>>174 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> { 175 // Keep track of task budget 176 let coop = ready!(crate::coop::poll_proceed(cx)); 177 178 let v = self.poll_ready(Direction::Read, Some(cx)).map_err(|e| { 179 coop.made_progress(); 180 e 181 })?; 182 match v { 183 Some(v) => { 184 coop.made_progress(); 185 Poll::Ready(Ok(v)) 186 } 187 None => Poll::Pending, 188 } 189 } 190 191 /// Consume any pending read readiness event. 192 /// 193 /// This function is identical to [`poll_read_ready`] **except** that it 194 /// will not notify the current task when a new event is received. As such, 195 /// it is safe to call this function from outside of a task context. 196 /// 197 /// [`poll_read_ready`]: method@Self::poll_read_ready take_read_ready(&self) -> io::Result<Option<mio::Ready>>198 pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> { 199 self.poll_ready(Direction::Read, None) 200 } 201 202 /// Polls for events on the I/O resource's write readiness stream. 203 /// 204 /// If the I/O resource receives a new write readiness event since the last 205 /// call to `poll_write_ready`, it is returned. If it has not, the current 206 /// task is notified once a new event is received. 207 /// 208 /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, 209 /// the function will always return `Ready(HUP)`. This should be treated as 210 /// the end of the readiness stream. 211 /// 212 /// # Return value 213 /// 214 /// There are several possible return values: 215 /// 216 /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received 217 /// a new readiness event. The readiness value is included. 218 /// 219 /// * `Poll::Pending` means that no new readiness events have been received 220 /// since the last call to `poll_write_ready`. 221 /// 222 /// * `Poll::Ready(Err(err))` means that the registration has encountered an 223 /// error. This could represent a permanent internal error for example. 224 /// 225 /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered 226 /// 227 /// # Panics 228 /// 229 /// This function will panic if called from outside of a task context. poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>>230 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> { 231 // Keep track of task budget 232 let coop = ready!(crate::coop::poll_proceed(cx)); 233 234 let v = self.poll_ready(Direction::Write, Some(cx)).map_err(|e| { 235 coop.made_progress(); 236 e 237 })?; 238 match v { 239 Some(v) => { 240 coop.made_progress(); 241 Poll::Ready(Ok(v)) 242 } 243 None => Poll::Pending, 244 } 245 } 246 247 /// Consumes any pending write readiness event. 248 /// 249 /// This function is identical to [`poll_write_ready`] **except** that it 250 /// will not notify the current task when a new event is received. As such, 251 /// it is safe to call this function from outside of a task context. 252 /// 253 /// [`poll_write_ready`]: method@Self::poll_write_ready take_write_ready(&self) -> io::Result<Option<mio::Ready>>254 pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> { 255 self.poll_ready(Direction::Write, None) 256 } 257 258 /// Polls for events on the I/O resource's `direction` readiness stream. 259 /// 260 /// If called with a task context, notify the task when a new event is 261 /// received. poll_ready( &self, direction: Direction, cx: Option<&mut Context<'_>>, ) -> io::Result<Option<mio::Ready>>262 fn poll_ready( 263 &self, 264 direction: Direction, 265 cx: Option<&mut Context<'_>>, 266 ) -> io::Result<Option<mio::Ready>> { 267 let inner = match self.handle.inner() { 268 Some(inner) => inner, 269 None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), 270 }; 271 272 // If the task should be notified about new events, ensure that it has 273 // been registered 274 if let Some(ref cx) = cx { 275 inner.register(self.address, direction, cx.waker().clone()) 276 } 277 278 let mask = direction.mask(); 279 let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize(); 280 281 let sched = inner.io_dispatch.get(self.address).unwrap(); 282 283 // This consumes the current readiness state **except** for HUP and 284 // error. HUP and error are excluded because a) they are final states 285 // and never transitition out and b) both the read AND the write 286 // directions need to be able to obvserve these states. 287 // 288 // # Platform-specific behavior 289 // 290 // HUP and error readiness are platform-specific. On epoll platforms, 291 // HUP has specific conditions that must be met by both peers of a 292 // connection in order to be triggered. 293 // 294 // On epoll platforms, `EPOLLERR` is signaled through 295 // `UnixReady::error()` and is important to be observable by both read 296 // AND write. A specific case that `EPOLLERR` occurs is when the read 297 // end of a pipe is closed. When this occurs, a peer blocked by 298 // writing to the pipe should be notified. 299 let curr_ready = sched 300 .set_readiness(self.address, |curr| curr & (!mask_no_hup)) 301 .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address)); 302 303 let mut ready = mask & mio::Ready::from_usize(curr_ready); 304 305 if ready.is_empty() { 306 if let Some(cx) = cx { 307 // Update the task info 308 match direction { 309 Direction::Read => sched.reader.register_by_ref(cx.waker()), 310 Direction::Write => sched.writer.register_by_ref(cx.waker()), 311 } 312 313 // Try again 314 let curr_ready = sched 315 .set_readiness(self.address, |curr| curr & (!mask_no_hup)) 316 .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address)); 317 ready = mask & mio::Ready::from_usize(curr_ready); 318 } 319 } 320 321 if ready.is_empty() { 322 Ok(None) 323 } else { 324 Ok(Some(ready)) 325 } 326 } 327 } 328 329 unsafe impl Send for Registration {} 330 unsafe impl Sync for Registration {} 331 332 impl Drop for Registration { drop(&mut self)333 fn drop(&mut self) { 334 let inner = match self.handle.inner() { 335 Some(inner) => inner, 336 None => return, 337 }; 338 inner.drop_source(self.address); 339 } 340 } 341