1 #![allow(deprecated)]
2 use std::os::unix::io::AsRawFd;
3 use std::os::unix::io::RawFd;
4 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
5 use std::time::Duration;
6 use std::{cmp, i32};
7
8 use libc::{self, c_int};
9 use libc::{EPOLLERR, EPOLLHUP, EPOLLONESHOT};
10 use libc::{EPOLLET, EPOLLOUT, EPOLLIN, EPOLLPRI};
11
12 use {io, Ready, PollOpt, Token};
13 use event_imp::Event;
14 use sys::unix::{cvt, UnixReady};
15 use sys::unix::io::set_cloexec;
16
/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that is previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches windows behavior.
///
/// Ids are handed out by `Selector::new` with a `Relaxed` `fetch_add`, offset
/// by one so that 0 is never used as a selector id.
// NOTE(review): `ATOMIC_USIZE_INIT` is deprecated (hence the file-level
// `#![allow(deprecated)]`); presumably kept for older-compiler compatibility —
// confirm the MSRV before switching to `AtomicUsize::new(0)`.
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
23
/// Readiness selector backed by a Linux `epoll` instance.
#[derive(Debug)]
pub struct Selector {
    /// Non-zero identifier assigned from `NEXT_ID` when the selector is created.
    id: usize,
    /// The epoll file descriptor; closed when the selector is dropped.
    epfd: RawFd,
}
29
30 impl Selector {
new() -> io::Result<Selector>31 pub fn new() -> io::Result<Selector> {
32 let epfd = unsafe {
33 // Emulate `epoll_create` by using `epoll_create1` if it's available
34 // and otherwise falling back to `epoll_create` followed by a call to
35 // set the CLOEXEC flag.
36 dlsym!(fn epoll_create1(c_int) -> c_int);
37
38 match epoll_create1.get() {
39 Some(epoll_create1_fn) => {
40 cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))?
41 }
42 None => {
43 let fd = cvt(libc::epoll_create(1024))?;
44 drop(set_cloexec(fd));
45 fd
46 }
47 }
48 };
49
50 // offset by 1 to avoid choosing 0 as the id of a selector
51 let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
52
53 Ok(Selector {
54 id: id,
55 epfd: epfd,
56 })
57 }
58
id(&self) -> usize59 pub fn id(&self) -> usize {
60 self.id
61 }
62
63 /// Wait for events from the OS
select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool>64 pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
65 // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
66 // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
67 // architectures. The magic number is the same constant used by libuv.
68 #[cfg(target_pointer_width = "32")]
69 const MAX_SAFE_TIMEOUT: u64 = 1789569;
70 #[cfg(not(target_pointer_width = "32"))]
71 const MAX_SAFE_TIMEOUT: u64 = c_int::max_value() as u64;
72
73 let timeout_ms = timeout
74 .map(|to| cmp::min(millis(to), MAX_SAFE_TIMEOUT) as c_int)
75 .unwrap_or(-1);
76
77 // Wait for epoll events for at most timeout_ms milliseconds
78 evts.clear();
79 unsafe {
80 let cnt = cvt(libc::epoll_wait(self.epfd,
81 evts.events.as_mut_ptr(),
82 evts.events.capacity() as i32,
83 timeout_ms))?;
84 let cnt = cnt as usize;
85 evts.events.set_len(cnt);
86
87 for i in 0..cnt {
88 if evts.events[i].u64 as usize == awakener.into() {
89 evts.events.remove(i);
90 return Ok(true);
91 }
92 }
93 }
94
95 Ok(false)
96 }
97
98 /// Register event interests for the given IO handle with the OS
register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()>99 pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
100 let mut info = libc::epoll_event {
101 events: ioevent_to_epoll(interests, opts),
102 u64: usize::from(token) as u64
103 };
104
105 unsafe {
106 cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?;
107 Ok(())
108 }
109 }
110
111 /// Register event interests for the given IO handle with the OS
reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()>112 pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
113 let mut info = libc::epoll_event {
114 events: ioevent_to_epoll(interests, opts),
115 u64: usize::from(token) as u64
116 };
117
118 unsafe {
119 cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?;
120 Ok(())
121 }
122 }
123
124 /// Deregister event interests for the given IO handle with the OS
deregister(&self, fd: RawFd) -> io::Result<()>125 pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
126 // The &info argument should be ignored by the system,
127 // but linux < 2.6.9 required it to be not null.
128 // For compatibility, we provide a dummy EpollEvent.
129 let mut info = libc::epoll_event {
130 events: 0,
131 u64: 0,
132 };
133
134 unsafe {
135 cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?;
136 Ok(())
137 }
138 }
139 }
140
ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32141 fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 {
142 let mut kind = 0;
143
144 if interest.is_readable() {
145 kind |= EPOLLIN;
146 }
147
148 if interest.is_writable() {
149 kind |= EPOLLOUT;
150 }
151
152 if UnixReady::from(interest).is_priority() {
153 kind |= EPOLLPRI;
154 }
155
156 if opts.is_edge() {
157 kind |= EPOLLET;
158 }
159
160 if opts.is_oneshot() {
161 kind |= EPOLLONESHOT;
162 }
163
164 if opts.is_level() {
165 kind &= !EPOLLET;
166 }
167
168 kind as u32
169 }
170
171 impl AsRawFd for Selector {
as_raw_fd(&self) -> RawFd172 fn as_raw_fd(&self) -> RawFd {
173 self.epfd
174 }
175 }
176
177 impl Drop for Selector {
drop(&mut self)178 fn drop(&mut self) {
179 unsafe {
180 let _ = libc::close(self.epfd);
181 }
182 }
183 }
184
/// Buffer of raw `epoll_event`s that `Selector::select` fills.
pub struct Events {
    /// Backing storage. Its capacity bounds how many events a single
    /// `epoll_wait` call may return.
    events: Vec<libc::epoll_event>,
}
188
189 impl Events {
with_capacity(u: usize) -> Events190 pub fn with_capacity(u: usize) -> Events {
191 Events {
192 events: Vec::with_capacity(u)
193 }
194 }
195
196 #[inline]
len(&self) -> usize197 pub fn len(&self) -> usize {
198 self.events.len()
199 }
200
201 #[inline]
capacity(&self) -> usize202 pub fn capacity(&self) -> usize {
203 self.events.capacity()
204 }
205
206 #[inline]
is_empty(&self) -> bool207 pub fn is_empty(&self) -> bool {
208 self.events.is_empty()
209 }
210
211 #[inline]
get(&self, idx: usize) -> Option<Event>212 pub fn get(&self, idx: usize) -> Option<Event> {
213 self.events.get(idx).map(|event| {
214 let epoll = event.events as c_int;
215 let mut kind = Ready::empty();
216
217 if (epoll & EPOLLIN) != 0 {
218 kind = kind | Ready::readable();
219 }
220
221 if (epoll & EPOLLPRI) != 0 {
222 kind = kind | Ready::readable() | UnixReady::priority();
223 }
224
225 if (epoll & EPOLLOUT) != 0 {
226 kind = kind | Ready::writable();
227 }
228
229 // EPOLLHUP - Usually means a socket error happened
230 if (epoll & EPOLLERR) != 0 {
231 kind = kind | UnixReady::error();
232 }
233
234 if (epoll & EPOLLHUP) != 0 {
235 kind = kind | UnixReady::hup();
236 }
237
238 let token = self.events[idx].u64;
239
240 Event::new(kind, Token(token as usize))
241 })
242 }
243
push_event(&mut self, event: Event)244 pub fn push_event(&mut self, event: Event) {
245 self.events.push(libc::epoll_event {
246 events: ioevent_to_epoll(event.readiness(), PollOpt::empty()),
247 u64: usize::from(event.token()) as u64
248 });
249 }
250
clear(&mut self)251 pub fn clear(&mut self) {
252 unsafe { self.events.set_len(0); }
253 }
254 }
255
/// Nanoseconds in one millisecond.
const NANOS_PER_MILLI: u32 = 1_000_000;
/// Milliseconds in one second.
const MILLIS_PER_SEC: u64 = 1_000;

/// Converts a `Duration` to whole milliseconds, rounding any partial
/// millisecond up and saturating at `u64::MAX`.
///
/// Saturation is harmless: `u64::MAX` milliseconds is on the order of
/// hundreds of millions of years.
pub fn millis(duration: Duration) -> u64 {
    let nanos = duration.subsec_nanos();
    let round_up = if nanos % NANOS_PER_MILLI == 0 { 0 } else { 1 };
    let frac_millis = u64::from(nanos / NANOS_PER_MILLI + round_up);

    duration
        .as_secs()
        .saturating_mul(MILLIS_PER_SEC)
        .saturating_add(frac_millis)
}
269