1 use std::ops::{Deref, DerefMut}; 2 #[cfg(unix)] 3 use std::os::unix::io::AsRawFd; 4 #[cfg(windows)] 5 use std::os::windows::io::AsRawSocket; 6 #[cfg(debug_assertions)] 7 use std::sync::atomic::{AtomicUsize, Ordering}; 8 use std::{fmt, io}; 9 10 #[cfg(any(unix, debug_assertions))] 11 use crate::poll; 12 use crate::sys::IoSourceState; 13 use crate::{event, Interest, Registry, Token}; 14 15 /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`] 16 /// implementation. 17 /// 18 /// `IoSource` enables registering any FD or socket wrapper with [`Poll`]. 19 /// 20 /// While only implementations for TCP, UDP, and UDS (Unix only) are provided, 21 /// Mio supports registering any FD or socket that can be registered with the 22 /// underlying OS selector. `IoSource` provides the necessary bridge. 23 /// 24 /// [`RawFd`]: std::os::unix::io::RawFd 25 /// [`RawSocket`]: std::os::windows::io::RawSocket 26 /// 27 /// # Notes 28 /// 29 /// To handle the registrations and events properly **all** I/O operations (such 30 /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the 31 /// internal state is updated accordingly. 32 /// 33 /// [`Poll`]: crate::Poll 34 /// [`do_io`]: IoSource::do_io 35 /* 36 /// 37 /// # Examples 38 /// 39 /// Basic usage. 40 /// 41 /// ``` 42 /// # use std::error::Error; 43 /// # fn main() -> Result<(), Box<dyn Error>> { 44 /// use mio::{Interest, Poll, Token}; 45 /// use mio::IoSource; 46 /// 47 /// use std::net; 48 /// 49 /// let poll = Poll::new()?; 50 /// 51 /// // Bind a std TCP listener. 52 /// let listener = net::TcpListener::bind("127.0.0.1:0")?; 53 /// // Wrap it in the `IoSource` type. 54 /// let mut listener = IoSource::new(listener); 55 /// 56 /// // Register the listener. 57 /// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?; 58 /// # Ok(()) 59 /// # } 60 /// ``` 61 */ 62 pub struct IoSource<T> { 63 state: IoSourceState, 64 inner: T, 65 #[cfg(debug_assertions)] 66 selector_id: SelectorId, 67 } 68 69 impl<T> IoSource<T> { 70 /// Create a new `IoSource`. new(io: T) -> IoSource<T>71 pub fn new(io: T) -> IoSource<T> { 72 IoSource { 73 state: IoSourceState::new(), 74 inner: io, 75 #[cfg(debug_assertions)] 76 selector_id: SelectorId::new(), 77 } 78 } 79 80 /// Execute an I/O operations ensuring that the socket receives more events 81 /// if it hits a [`WouldBlock`] error. 82 /// 83 /// # Notes 84 /// 85 /// This method is required to be called for **all** I/O operations to 86 /// ensure the user will receive events once the socket is ready again after 87 /// returning a [`WouldBlock`] error. 88 /// 89 /// [`WouldBlock`]: io::ErrorKind::WouldBlock do_io<F, R>(&self, f: F) -> io::Result<R> where F: FnOnce(&T) -> io::Result<R>,90 pub fn do_io<F, R>(&self, f: F) -> io::Result<R> 91 where 92 F: FnOnce(&T) -> io::Result<R>, 93 { 94 self.state.do_io(f, &self.inner) 95 } 96 97 /// Returns the I/O source, dropping the state. 98 /// 99 /// # Notes 100 /// 101 /// To ensure no more events are to be received for this I/O source first 102 /// [`deregister`] it. 103 /// 104 /// [`deregister`]: Registry::deregister into_inner(self) -> T105 pub fn into_inner(self) -> T { 106 self.inner 107 } 108 } 109 110 /// Be careful when using this method. All I/O operations that may block must go 111 /// through the [`do_io`] method. 112 /// 113 /// [`do_io`]: IoSource::do_io 114 impl<T> Deref for IoSource<T> { 115 type Target = T; 116 deref(&self) -> &Self::Target117 fn deref(&self) -> &Self::Target { 118 &self.inner 119 } 120 } 121 122 /// Be careful when using this method. All I/O operations that may block must go 123 /// through the [`do_io`] method. 124 /// 125 /// [`do_io`]: IoSource::do_io 126 impl<T> DerefMut for IoSource<T> { deref_mut(&mut self) -> &mut Self::Target127 fn deref_mut(&mut self) -> &mut Self::Target { 128 &mut self.inner 129 } 130 } 131 132 #[cfg(unix)] 133 impl<T> event::Source for IoSource<T> 134 where 135 T: AsRawFd, 136 { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>137 fn register( 138 &mut self, 139 registry: &Registry, 140 token: Token, 141 interests: Interest, 142 ) -> io::Result<()> { 143 #[cfg(debug_assertions)] 144 self.selector_id.associate(registry)?; 145 poll::selector(registry).register(self.inner.as_raw_fd(), token, interests) 146 } 147 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>148 fn reregister( 149 &mut self, 150 registry: &Registry, 151 token: Token, 152 interests: Interest, 153 ) -> io::Result<()> { 154 #[cfg(debug_assertions)] 155 self.selector_id.check_association(registry)?; 156 poll::selector(registry).reregister(self.inner.as_raw_fd(), token, interests) 157 } 158 deregister(&mut self, registry: &Registry) -> io::Result<()>159 fn deregister(&mut self, registry: &Registry) -> io::Result<()> { 160 #[cfg(debug_assertions)] 161 self.selector_id.remove_association(registry)?; 162 poll::selector(registry).deregister(self.inner.as_raw_fd()) 163 } 164 } 165 166 #[cfg(windows)] 167 impl<T> event::Source for IoSource<T> 168 where 169 T: AsRawSocket, 170 { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>171 fn register( 172 &mut self, 173 registry: &Registry, 174 token: Token, 175 interests: Interest, 176 ) -> io::Result<()> { 177 #[cfg(debug_assertions)] 178 self.selector_id.associate(registry)?; 179 self.state 180 .register(registry, token, interests, self.inner.as_raw_socket()) 181 } 182 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>183 fn reregister( 184 &mut self, 185 registry: &Registry, 186 token: Token, 187 interests: Interest, 188 ) -> io::Result<()> { 189 #[cfg(debug_assertions)] 190 self.selector_id.check_association(registry)?; 191 self.state.reregister(registry, token, interests) 192 } 193 deregister(&mut self, _registry: &Registry) -> io::Result<()>194 fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { 195 #[cfg(debug_assertions)] 196 self.selector_id.remove_association(_registry)?; 197 self.state.deregister() 198 } 199 } 200 201 impl<T> fmt::Debug for IoSource<T> 202 where 203 T: fmt::Debug, 204 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result205 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 206 self.inner.fmt(f) 207 } 208 } 209 210 /// Used to associate an `IoSource` with a `sys::Selector`. 211 #[cfg(debug_assertions)] 212 #[derive(Debug)] 213 struct SelectorId { 214 id: AtomicUsize, 215 } 216 217 #[cfg(debug_assertions)] 218 impl SelectorId { 219 /// Value of `id` if `SelectorId` is not associated with any 220 /// `sys::Selector`. Valid selector ids start at 1. 221 const UNASSOCIATED: usize = 0; 222 223 /// Create a new `SelectorId`. new() -> SelectorId224 const fn new() -> SelectorId { 225 SelectorId { 226 id: AtomicUsize::new(Self::UNASSOCIATED), 227 } 228 } 229 230 /// Associate an I/O source with `registry`, returning an error if its 231 /// already registered. associate(&self, registry: &Registry) -> io::Result<()>232 fn associate(&self, registry: &Registry) -> io::Result<()> { 233 let registry_id = poll::selector(®istry).id(); 234 let previous_id = self.id.swap(registry_id, Ordering::AcqRel); 235 236 if previous_id == Self::UNASSOCIATED { 237 Ok(()) 238 } else { 239 Err(io::Error::new( 240 io::ErrorKind::AlreadyExists, 241 "I/O source already registered with a `Registry`", 242 )) 243 } 244 } 245 246 /// Check the association of an I/O source with `registry`, returning an 247 /// error if its registered with a different `Registry` or not registered at 248 /// all. check_association(&self, registry: &Registry) -> io::Result<()>249 fn check_association(&self, registry: &Registry) -> io::Result<()> { 250 let registry_id = poll::selector(®istry).id(); 251 let id = self.id.load(Ordering::Acquire); 252 253 if id == registry_id { 254 Ok(()) 255 } else if id == Self::UNASSOCIATED { 256 Err(io::Error::new( 257 io::ErrorKind::NotFound, 258 "I/O source not registered with `Registry`", 259 )) 260 } else { 261 Err(io::Error::new( 262 io::ErrorKind::AlreadyExists, 263 "I/O source already registered with a different `Registry`", 264 )) 265 } 266 } 267 268 /// Remove a previously made association from `registry`, returns an error 269 /// if it was not previously associated with `registry`. remove_association(&self, registry: &Registry) -> io::Result<()>270 fn remove_association(&self, registry: &Registry) -> io::Result<()> { 271 let registry_id = poll::selector(®istry).id(); 272 let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel); 273 274 if previous_id == registry_id { 275 Ok(()) 276 } else { 277 Err(io::Error::new( 278 io::ErrorKind::NotFound, 279 "I/O source not registered with `Registry`", 280 )) 281 } 282 } 283 } 284 285 #[cfg(debug_assertions)] 286 impl Clone for SelectorId { clone(&self) -> SelectorId287 fn clone(&self) -> SelectorId { 288 SelectorId { 289 id: AtomicUsize::new(self.id.load(Ordering::Acquire)), 290 } 291 } 292 } 293