1 use std::ops::{Deref, DerefMut};
2 #[cfg(unix)]
3 use std::os::unix::io::AsRawFd;
4 #[cfg(windows)]
5 use std::os::windows::io::AsRawSocket;
6 #[cfg(debug_assertions)]
7 use std::sync::atomic::{AtomicUsize, Ordering};
8 use std::{fmt, io};
9 
10 #[cfg(any(unix, debug_assertions))]
11 use crate::poll;
12 use crate::sys::IoSourceState;
13 use crate::{event, Interest, Registry, Token};
14 
15 /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
16 /// implementation.
17 ///
18 /// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
19 ///
20 /// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
21 /// Mio supports registering any FD or socket that can be registered with the
22 /// underlying OS selector. `IoSource` provides the necessary bridge.
23 ///
24 /// [`RawFd`]: std::os::unix::io::RawFd
25 /// [`RawSocket`]: std::os::windows::io::RawSocket
26 ///
27 /// # Notes
28 ///
29 /// To handle the registrations and events properly **all** I/O operations (such
30 /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
31 /// internal state is updated accordingly.
32 ///
33 /// [`Poll`]: crate::Poll
34 /// [`do_io`]: IoSource::do_io
35 /*
36 ///
37 /// # Examples
38 ///
39 /// Basic usage.
40 ///
41 /// ```
42 /// # use std::error::Error;
43 /// # fn main() -> Result<(), Box<dyn Error>> {
44 /// use mio::{Interest, Poll, Token};
45 /// use mio::IoSource;
46 ///
47 /// use std::net;
48 ///
49 /// let poll = Poll::new()?;
50 ///
51 /// // Bind a std TCP listener.
52 /// let listener = net::TcpListener::bind("127.0.0.1:0")?;
53 /// // Wrap it in the `IoSource` type.
54 /// let mut listener = IoSource::new(listener);
55 ///
56 /// // Register the listener.
57 /// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
58 /// #     Ok(())
59 /// # }
60 /// ```
61 */
62 pub struct IoSource<T> {
63     state: IoSourceState,
64     inner: T,
65     #[cfg(debug_assertions)]
66     selector_id: SelectorId,
67 }
68 
69 impl<T> IoSource<T> {
70     /// Create a new `IoSource`.
new(io: T) -> IoSource<T>71     pub fn new(io: T) -> IoSource<T> {
72         IoSource {
73             state: IoSourceState::new(),
74             inner: io,
75             #[cfg(debug_assertions)]
76             selector_id: SelectorId::new(),
77         }
78     }
79 
80     /// Execute an I/O operations ensuring that the socket receives more events
81     /// if it hits a [`WouldBlock`] error.
82     ///
83     /// # Notes
84     ///
85     /// This method is required to be called for **all** I/O operations to
86     /// ensure the user will receive events once the socket is ready again after
87     /// returning a [`WouldBlock`] error.
88     ///
89     /// [`WouldBlock`]: io::ErrorKind::WouldBlock
do_io<F, R>(&self, f: F) -> io::Result<R> where F: FnOnce(&T) -> io::Result<R>,90     pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
91     where
92         F: FnOnce(&T) -> io::Result<R>,
93     {
94         self.state.do_io(f, &self.inner)
95     }
96 
97     /// Returns the I/O source, dropping the state.
98     ///
99     /// # Notes
100     ///
101     /// To ensure no more events are to be received for this I/O source first
102     /// [`deregister`] it.
103     ///
104     /// [`deregister`]: Registry::deregister
into_inner(self) -> T105     pub fn into_inner(self) -> T {
106         self.inner
107     }
108 }
109 
110 /// Be careful when using this method. All I/O operations that may block must go
111 /// through the [`do_io`] method.
112 ///
113 /// [`do_io`]: IoSource::do_io
114 impl<T> Deref for IoSource<T> {
115     type Target = T;
116 
deref(&self) -> &Self::Target117     fn deref(&self) -> &Self::Target {
118         &self.inner
119     }
120 }
121 
122 /// Be careful when using this method. All I/O operations that may block must go
123 /// through the [`do_io`] method.
124 ///
125 /// [`do_io`]: IoSource::do_io
126 impl<T> DerefMut for IoSource<T> {
deref_mut(&mut self) -> &mut Self::Target127     fn deref_mut(&mut self) -> &mut Self::Target {
128         &mut self.inner
129     }
130 }
131 
132 #[cfg(unix)]
133 impl<T> event::Source for IoSource<T>
134 where
135     T: AsRawFd,
136 {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>137     fn register(
138         &mut self,
139         registry: &Registry,
140         token: Token,
141         interests: Interest,
142     ) -> io::Result<()> {
143         #[cfg(debug_assertions)]
144         self.selector_id.associate(registry)?;
145         poll::selector(registry).register(self.inner.as_raw_fd(), token, interests)
146     }
147 
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>148     fn reregister(
149         &mut self,
150         registry: &Registry,
151         token: Token,
152         interests: Interest,
153     ) -> io::Result<()> {
154         #[cfg(debug_assertions)]
155         self.selector_id.check_association(registry)?;
156         poll::selector(registry).reregister(self.inner.as_raw_fd(), token, interests)
157     }
158 
deregister(&mut self, registry: &Registry) -> io::Result<()>159     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
160         #[cfg(debug_assertions)]
161         self.selector_id.remove_association(registry)?;
162         poll::selector(registry).deregister(self.inner.as_raw_fd())
163     }
164 }
165 
/// Windows implementation: every call is delegated to the internal
/// `IoSourceState`; registration passes the wrapped object's raw socket.
#[cfg(windows)]
impl<T> event::Source for IoSource<T>
where
    T: AsRawSocket,
{
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds first record (and validate) which registry owns us.
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        let socket = self.inner.as_raw_socket();
        self.state.register(registry, token, interests, socket)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        // Debug builds verify we are registered with this very registry.
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        let state = &mut self.state;
        state.reregister(registry, token, interests)
    }

    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        // `_registry` is only consulted by the debug-build association check.
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;

        let state = &mut self.state;
        state.deregister()
    }
}
200 
201 impl<T> fmt::Debug for IoSource<T>
202 where
203     T: fmt::Debug,
204 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result205     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206         self.inner.fmt(f)
207     }
208 }
209 
210 /// Used to associate an `IoSource` with a `sys::Selector`.
211 #[cfg(debug_assertions)]
212 #[derive(Debug)]
213 struct SelectorId {
214     id: AtomicUsize,
215 }
216 
217 #[cfg(debug_assertions)]
218 impl SelectorId {
219     /// Value of `id` if `SelectorId` is not associated with any
220     /// `sys::Selector`. Valid selector ids start at 1.
221     const UNASSOCIATED: usize = 0;
222 
223     /// Create a new `SelectorId`.
new() -> SelectorId224     const fn new() -> SelectorId {
225         SelectorId {
226             id: AtomicUsize::new(Self::UNASSOCIATED),
227         }
228     }
229 
230     /// Associate an I/O source with `registry`, returning an error if its
231     /// already registered.
associate(&self, registry: &Registry) -> io::Result<()>232     fn associate(&self, registry: &Registry) -> io::Result<()> {
233         let registry_id = poll::selector(&registry).id();
234         let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
235 
236         if previous_id == Self::UNASSOCIATED {
237             Ok(())
238         } else {
239             Err(io::Error::new(
240                 io::ErrorKind::AlreadyExists,
241                 "I/O source already registered with a `Registry`",
242             ))
243         }
244     }
245 
246     /// Check the association of an I/O source with `registry`, returning an
247     /// error if its registered with a different `Registry` or not registered at
248     /// all.
check_association(&self, registry: &Registry) -> io::Result<()>249     fn check_association(&self, registry: &Registry) -> io::Result<()> {
250         let registry_id = poll::selector(&registry).id();
251         let id = self.id.load(Ordering::Acquire);
252 
253         if id == registry_id {
254             Ok(())
255         } else if id == Self::UNASSOCIATED {
256             Err(io::Error::new(
257                 io::ErrorKind::NotFound,
258                 "I/O source not registered with `Registry`",
259             ))
260         } else {
261             Err(io::Error::new(
262                 io::ErrorKind::AlreadyExists,
263                 "I/O source already registered with a different `Registry`",
264             ))
265         }
266     }
267 
268     /// Remove a previously made association from `registry`, returns an error
269     /// if it was not previously associated with `registry`.
remove_association(&self, registry: &Registry) -> io::Result<()>270     fn remove_association(&self, registry: &Registry) -> io::Result<()> {
271         let registry_id = poll::selector(&registry).id();
272         let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
273 
274         if previous_id == registry_id {
275             Ok(())
276         } else {
277             Err(io::Error::new(
278                 io::ErrorKind::NotFound,
279                 "I/O source not registered with `Registry`",
280             ))
281         }
282     }
283 }
284 
285 #[cfg(debug_assertions)]
286 impl Clone for SelectorId {
clone(&self) -> SelectorId287     fn clone(&self) -> SelectorId {
288         SelectorId {
289             id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
290         }
291     }
292 }
293