1 use crate::{Interest, Token};
2 
3 use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP};
4 use log::error;
5 use std::os::unix::io::{AsRawFd, RawFd};
6 #[cfg(debug_assertions)]
7 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 use std::time::Duration;
9 use std::{cmp, i32, io, ptr};
10 
11 /// Unique id for use as `SelectorId`.
12 #[cfg(debug_assertions)]
13 static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
14 
15 #[derive(Debug)]
16 pub struct Selector {
17     #[cfg(debug_assertions)]
18     id: usize,
19     ep: RawFd,
20     #[cfg(debug_assertions)]
21     has_waker: AtomicBool,
22 }
23 
24 impl Selector {
new() -> io::Result<Selector>25     pub fn new() -> io::Result<Selector> {
26         // According to libuv, `EPOLL_CLOEXEC` is not defined on Android API <
27         // 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform,
28         // so we use it instead.
29         #[cfg(target_os = "android")]
30         let flag = libc::O_CLOEXEC;
31         #[cfg(not(target_os = "android"))]
32         let flag = libc::EPOLL_CLOEXEC;
33 
34         syscall!(epoll_create1(flag)).map(|ep| Selector {
35             #[cfg(debug_assertions)]
36             id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
37             ep,
38             #[cfg(debug_assertions)]
39             has_waker: AtomicBool::new(false),
40         })
41     }
42 
try_clone(&self) -> io::Result<Selector>43     pub fn try_clone(&self) -> io::Result<Selector> {
44         syscall!(fcntl(self.ep, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|ep| Selector {
45             // It's the same selector, so we use the same id.
46             #[cfg(debug_assertions)]
47             id: self.id,
48             ep,
49             #[cfg(debug_assertions)]
50             has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
51         })
52     }
53 
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>54     pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
55         // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
56         // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
57         // architectures. The magic number is the same constant used by libuv.
58         #[cfg(target_pointer_width = "32")]
59         const MAX_SAFE_TIMEOUT: u128 = 1789569;
60         #[cfg(not(target_pointer_width = "32"))]
61         const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128;
62 
63         let timeout = timeout
64             .map(|to| cmp::min(to.as_millis(), MAX_SAFE_TIMEOUT) as libc::c_int)
65             .unwrap_or(-1);
66 
67         events.clear();
68         syscall!(epoll_wait(
69             self.ep,
70             events.as_mut_ptr(),
71             events.capacity() as i32,
72             timeout,
73         ))
74         .map(|n_events| {
75             // This is safe because `epoll_wait` ensures that `n_events` are
76             // assigned.
77             unsafe { events.set_len(n_events as usize) };
78         })
79     }
80 
register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>81     pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
82         let mut event = libc::epoll_event {
83             events: interests_to_epoll(interests),
84             u64: usize::from(token) as u64,
85         };
86 
87         syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
88     }
89 
reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>90     pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
91         let mut event = libc::epoll_event {
92             events: interests_to_epoll(interests),
93             u64: usize::from(token) as u64,
94         };
95 
96         syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
97     }
98 
deregister(&self, fd: RawFd) -> io::Result<()>99     pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
100         syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
101     }
102 
103     #[cfg(debug_assertions)]
register_waker(&self) -> bool104     pub fn register_waker(&self) -> bool {
105         self.has_waker.swap(true, Ordering::AcqRel)
106     }
107 }
108 
109 cfg_io_source! {
110     impl Selector {
111         #[cfg(debug_assertions)]
112         pub fn id(&self) -> usize {
113             self.id
114         }
115     }
116 }
117 
118 impl AsRawFd for Selector {
as_raw_fd(&self) -> RawFd119     fn as_raw_fd(&self) -> RawFd {
120         self.ep
121     }
122 }
123 
124 impl Drop for Selector {
drop(&mut self)125     fn drop(&mut self) {
126         if let Err(err) = syscall!(close(self.ep)) {
127             error!("error closing epoll: {}", err);
128         }
129     }
130 }
131 
interests_to_epoll(interests: Interest) -> u32132 fn interests_to_epoll(interests: Interest) -> u32 {
133     let mut kind = EPOLLET;
134 
135     if interests.is_readable() {
136         kind = kind | EPOLLIN | EPOLLRDHUP;
137     }
138 
139     if interests.is_writable() {
140         kind |= EPOLLOUT;
141     }
142 
143     kind as u32
144 }
145 
146 pub type Event = libc::epoll_event;
147 pub type Events = Vec<Event>;
148 
149 pub mod event {
150     use std::fmt;
151 
152     use crate::sys::Event;
153     use crate::Token;
154 
token(event: &Event) -> Token155     pub fn token(event: &Event) -> Token {
156         Token(event.u64 as usize)
157     }
158 
is_readable(event: &Event) -> bool159     pub fn is_readable(event: &Event) -> bool {
160         (event.events as libc::c_int & libc::EPOLLIN) != 0
161             || (event.events as libc::c_int & libc::EPOLLPRI) != 0
162     }
163 
is_writable(event: &Event) -> bool164     pub fn is_writable(event: &Event) -> bool {
165         (event.events as libc::c_int & libc::EPOLLOUT) != 0
166     }
167 
is_error(event: &Event) -> bool168     pub fn is_error(event: &Event) -> bool {
169         (event.events as libc::c_int & libc::EPOLLERR) != 0
170     }
171 
is_read_closed(event: &Event) -> bool172     pub fn is_read_closed(event: &Event) -> bool {
173         // Both halves of the socket have closed
174         event.events as libc::c_int & libc::EPOLLHUP != 0
175             // Socket has received FIN or called shutdown(SHUT_RD)
176             || (event.events as libc::c_int & libc::EPOLLIN != 0
177                 && event.events as libc::c_int & libc::EPOLLRDHUP != 0)
178     }
179 
is_write_closed(event: &Event) -> bool180     pub fn is_write_closed(event: &Event) -> bool {
181         // Both halves of the socket have closed
182         event.events as libc::c_int & libc::EPOLLHUP != 0
183             // Unix pipe write end has closed
184             || (event.events as libc::c_int & libc::EPOLLOUT != 0
185                 && event.events as libc::c_int & libc::EPOLLERR != 0)
186             // The other side (read end) of a Unix pipe has closed.
187             || event.events as libc::c_int == libc::EPOLLERR
188     }
189 
is_priority(event: &Event) -> bool190     pub fn is_priority(event: &Event) -> bool {
191         (event.events as libc::c_int & libc::EPOLLPRI) != 0
192     }
193 
is_aio(_: &Event) -> bool194     pub fn is_aio(_: &Event) -> bool {
195         // Not supported in the kernel, only in libc.
196         false
197     }
198 
is_lio(_: &Event) -> bool199     pub fn is_lio(_: &Event) -> bool {
200         // Not supported.
201         false
202     }
203 
debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result204     pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
205         #[allow(clippy::trivially_copy_pass_by_ref)]
206         fn check_events(got: &u32, want: &libc::c_int) -> bool {
207             (*got as libc::c_int & want) != 0
208         }
209         debug_detail!(
210             EventsDetails(u32),
211             check_events,
212             libc::EPOLLIN,
213             libc::EPOLLPRI,
214             libc::EPOLLOUT,
215             libc::EPOLLRDNORM,
216             libc::EPOLLRDBAND,
217             libc::EPOLLWRNORM,
218             libc::EPOLLWRBAND,
219             libc::EPOLLMSG,
220             libc::EPOLLERR,
221             libc::EPOLLHUP,
222             libc::EPOLLET,
223             libc::EPOLLRDHUP,
224             libc::EPOLLONESHOT,
225             #[cfg(any(target_os = "linux", target_os = "solaris"))]
226             libc::EPOLLEXCLUSIVE,
227             #[cfg(any(target_os = "android", target_os = "linux"))]
228             libc::EPOLLWAKEUP,
229             libc::EPOLL_CLOEXEC,
230         );
231 
232         // Can't reference fields in packed structures.
233         let e_u64 = event.u64;
234         f.debug_struct("epoll_event")
235             .field("events", &EventsDetails(event.events))
236             .field("u64", &e_u64)
237             .finish()
238     }
239 }
240 
241 #[cfg(target_os = "android")]
242 #[test]
assert_close_on_exec_flag()243 fn assert_close_on_exec_flag() {
244     // This assertion need to be true for Selector::new.
245     assert_eq!(libc::O_CLOEXEC, libc::EPOLL_CLOEXEC);
246 }
247