1 use crate::{Interest, Token};
2
3 use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP};
4 use log::error;
5 use std::os::unix::io::{AsRawFd, RawFd};
6 #[cfg(debug_assertions)]
7 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 use std::time::Duration;
9 use std::{cmp, i32, io, ptr};
10
/// Unique id for use as `SelectorId`.
///
/// Starts at 1; incremented (relaxed) once per `Selector::new` call.
/// Debug builds only.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
14
/// Selector backed by an `epoll(7)` instance.
#[derive(Debug)]
pub struct Selector {
    /// Unique id, taken from `NEXT_ID` (debug builds only). Clones created
    /// via `try_clone` share the same id since they refer to the same epoll
    /// instance.
    #[cfg(debug_assertions)]
    id: usize,
    /// Raw file descriptor of the epoll instance, created by
    /// `epoll_create1(2)` and closed on drop.
    ep: RawFd,
    /// Whether a waker has been registered with this selector; used by
    /// `register_waker` to detect double registration (debug builds only).
    #[cfg(debug_assertions)]
    has_waker: AtomicBool,
}
23
24 impl Selector {
new() -> io::Result<Selector>25 pub fn new() -> io::Result<Selector> {
26 // According to libuv, `EPOLL_CLOEXEC` is not defined on Android API <
27 // 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform,
28 // so we use it instead.
29 #[cfg(target_os = "android")]
30 let flag = libc::O_CLOEXEC;
31 #[cfg(not(target_os = "android"))]
32 let flag = libc::EPOLL_CLOEXEC;
33
34 syscall!(epoll_create1(flag)).map(|ep| Selector {
35 #[cfg(debug_assertions)]
36 id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
37 ep,
38 #[cfg(debug_assertions)]
39 has_waker: AtomicBool::new(false),
40 })
41 }
42
try_clone(&self) -> io::Result<Selector>43 pub fn try_clone(&self) -> io::Result<Selector> {
44 syscall!(fcntl(self.ep, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|ep| Selector {
45 // It's the same selector, so we use the same id.
46 #[cfg(debug_assertions)]
47 id: self.id,
48 ep,
49 #[cfg(debug_assertions)]
50 has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
51 })
52 }
53
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>54 pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
55 // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
56 // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
57 // architectures. The magic number is the same constant used by libuv.
58 #[cfg(target_pointer_width = "32")]
59 const MAX_SAFE_TIMEOUT: u128 = 1789569;
60 #[cfg(not(target_pointer_width = "32"))]
61 const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128;
62
63 let timeout = timeout
64 .map(|to| cmp::min(to.as_millis(), MAX_SAFE_TIMEOUT) as libc::c_int)
65 .unwrap_or(-1);
66
67 events.clear();
68 syscall!(epoll_wait(
69 self.ep,
70 events.as_mut_ptr(),
71 events.capacity() as i32,
72 timeout,
73 ))
74 .map(|n_events| {
75 // This is safe because `epoll_wait` ensures that `n_events` are
76 // assigned.
77 unsafe { events.set_len(n_events as usize) };
78 })
79 }
80
register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>81 pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
82 let mut event = libc::epoll_event {
83 events: interests_to_epoll(interests),
84 u64: usize::from(token) as u64,
85 };
86
87 syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
88 }
89
reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>90 pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
91 let mut event = libc::epoll_event {
92 events: interests_to_epoll(interests),
93 u64: usize::from(token) as u64,
94 };
95
96 syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
97 }
98
deregister(&self, fd: RawFd) -> io::Result<()>99 pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
100 syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
101 }
102
103 #[cfg(debug_assertions)]
register_waker(&self) -> bool104 pub fn register_waker(&self) -> bool {
105 self.has_waker.swap(true, Ordering::AcqRel)
106 }
107 }
108
cfg_io_source! {
    impl Selector {
        /// Returns the selector's unique id (taken from `NEXT_ID`); used to
        /// associate registered I/O sources with this selector in debug
        /// builds.
        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.id
        }
    }
}
117
impl AsRawFd for Selector {
    /// Returns the raw epoll file descriptor. The fd is still owned by the
    /// `Selector` and is closed on drop.
    fn as_raw_fd(&self) -> RawFd {
        self.ep
    }
}
123
124 impl Drop for Selector {
drop(&mut self)125 fn drop(&mut self) {
126 if let Err(err) = syscall!(close(self.ep)) {
127 error!("error closing epoll: {}", err);
128 }
129 }
130 }
131
interests_to_epoll(interests: Interest) -> u32132 fn interests_to_epoll(interests: Interest) -> u32 {
133 let mut kind = EPOLLET;
134
135 if interests.is_readable() {
136 kind = kind | EPOLLIN | EPOLLRDHUP;
137 }
138
139 if interests.is_writable() {
140 kind |= EPOLLOUT;
141 }
142
143 kind as u32
144 }
145
// The raw `epoll_event` struct doubles as mio's event type on this platform;
// `Events` is the buffer passed to (and filled by) `Selector::select`.
pub type Event = libc::epoll_event;
pub type Events = Vec<Event>;
148
/// Accessors interpreting the bits of a raw `epoll_event`.
pub mod event {
    use std::fmt;

    use crate::sys::Event;
    use crate::Token;

    /// Returns the token stored in the event's `u64` field (set by
    /// `Selector::register`/`reregister`).
    pub fn token(event: &Event) -> Token {
        Token(event.u64 as usize)
    }

    /// Readable readiness: `EPOLLIN` (data) or `EPOLLPRI` (urgent data).
    pub fn is_readable(event: &Event) -> bool {
        (event.events as libc::c_int & libc::EPOLLIN) != 0
            || (event.events as libc::c_int & libc::EPOLLPRI) != 0
    }

    /// Writable readiness: `EPOLLOUT`.
    pub fn is_writable(event: &Event) -> bool {
        (event.events as libc::c_int & libc::EPOLLOUT) != 0
    }

    /// Error readiness: `EPOLLERR`.
    pub fn is_error(event: &Event) -> bool {
        (event.events as libc::c_int & libc::EPOLLERR) != 0
    }

    /// Whether the read half is closed: either a full hang-up (`EPOLLHUP`)
    /// or readable with the peer's write side shut down (`EPOLLRDHUP`).
    pub fn is_read_closed(event: &Event) -> bool {
        // Both halves of the socket have closed
        event.events as libc::c_int & libc::EPOLLHUP != 0
        // Socket has received FIN or called shutdown(SHUT_RD)
            || (event.events as libc::c_int & libc::EPOLLIN != 0
                && event.events as libc::c_int & libc::EPOLLRDHUP != 0)
    }

    /// Whether the write half is closed. Note the last arm matches an event
    /// that is *exactly* `EPOLLERR` (no other bits set), not merely one that
    /// contains it.
    pub fn is_write_closed(event: &Event) -> bool {
        // Both halves of the socket have closed
        event.events as libc::c_int & libc::EPOLLHUP != 0
        // Unix pipe write end has closed
            || (event.events as libc::c_int & libc::EPOLLOUT != 0
                && event.events as libc::c_int & libc::EPOLLERR != 0)
        // The other side (read end) of a Unix pipe has closed.
            || event.events as libc::c_int == libc::EPOLLERR
    }

    /// Priority readiness: `EPOLLPRI` (e.g. TCP out-of-band data).
    pub fn is_priority(event: &Event) -> bool {
        (event.events as libc::c_int & libc::EPOLLPRI) != 0
    }

    /// AIO events are never reported on epoll.
    pub fn is_aio(_: &Event) -> bool {
        // Not supported in the kernel, only in libc.
        false
    }

    /// LIO events are never reported on epoll.
    pub fn is_lio(_: &Event) -> bool {
        // Not supported.
        false
    }

    /// Writes a human-readable rendering of the raw event for `Debug`
    /// output, expanding each epoll flag by name via the project's
    /// `debug_detail!` macro (defined elsewhere in the crate).
    pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
        #[allow(clippy::trivially_copy_pass_by_ref)]
        fn check_events(got: &u32, want: &libc::c_int) -> bool {
            (*got as libc::c_int & want) != 0
        }
        debug_detail!(
            EventsDetails(u32),
            check_events,
            libc::EPOLLIN,
            libc::EPOLLPRI,
            libc::EPOLLOUT,
            libc::EPOLLRDNORM,
            libc::EPOLLRDBAND,
            libc::EPOLLWRNORM,
            libc::EPOLLWRBAND,
            libc::EPOLLMSG,
            libc::EPOLLERR,
            libc::EPOLLHUP,
            libc::EPOLLET,
            libc::EPOLLRDHUP,
            libc::EPOLLONESHOT,
            #[cfg(any(target_os = "linux", target_os = "solaris"))]
            libc::EPOLLEXCLUSIVE,
            #[cfg(any(target_os = "android", target_os = "linux"))]
            libc::EPOLLWAKEUP,
            libc::EPOLL_CLOEXEC,
        );

        // Can't reference fields in packed structures.
        let e_u64 = event.u64;
        f.debug_struct("epoll_event")
            .field("events", &EventsDetails(event.events))
            .field("u64", &e_u64)
            .finish()
    }
}
240
#[cfg(target_os = "android")]
#[test]
fn assert_close_on_exec_flag() {
    // This assertion needs to be true for `Selector::new`, which substitutes
    // `O_CLOEXEC` for `EPOLL_CLOEXEC` on Android.
    assert_eq!(libc::O_CLOEXEC, libc::EPOLL_CLOEXEC);
}
247