1 #![allow(deprecated)]
2 use std::os::unix::io::AsRawFd;
3 use std::os::unix::io::RawFd;
4 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
5 use std::time::Duration;
6 use std::{cmp, i32};
7 
8 use libc::{self, c_int};
9 use libc::{EPOLLERR, EPOLLHUP, EPOLLONESHOT};
10 use libc::{EPOLLET, EPOLLOUT, EPOLLIN, EPOLLPRI};
11 
12 use {io, Ready, PollOpt, Token};
13 use event_imp::Event;
14 use sys::unix::{cvt, UnixReady};
15 use sys::unix::io::set_cloexec;
16 
/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that is previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches Windows behavior.
///
/// The counter starts at zero; `Selector::new` adds one to the fetched value
/// so that no selector is ever given the id `0`.
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
23 
/// An epoll-backed selector: an epoll file descriptor paired with the
/// identifier handed out when the selector was created.
#[derive(Debug)]
pub struct Selector {
    /// Identifier taken from `NEXT_ID` (offset by one) in `Selector::new`.
    id: usize,
    /// File descriptor of the epoll instance; closed when the selector is
    /// dropped.
    epfd: RawFd,
}
29 
30 impl Selector {
new() -> io::Result<Selector>31     pub fn new() -> io::Result<Selector> {
32         let epfd = unsafe {
33             // Emulate `epoll_create` by using `epoll_create1` if it's available
34             // and otherwise falling back to `epoll_create` followed by a call to
35             // set the CLOEXEC flag.
36             dlsym!(fn epoll_create1(c_int) -> c_int);
37 
38             match epoll_create1.get() {
39                 Some(epoll_create1_fn) => {
40                     cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))?
41                 }
42                 None => {
43                     let fd = cvt(libc::epoll_create(1024))?;
44                     drop(set_cloexec(fd));
45                     fd
46                 }
47             }
48         };
49 
50         // offset by 1 to avoid choosing 0 as the id of a selector
51         let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
52 
53         Ok(Selector {
54             id: id,
55             epfd: epfd,
56         })
57     }
58 
id(&self) -> usize59     pub fn id(&self) -> usize {
60         self.id
61     }
62 
63     /// Wait for events from the OS
select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool>64     pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
65         // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
66         // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
67         // architectures. The magic number is the same constant used by libuv.
68         #[cfg(target_pointer_width = "32")]
69         const MAX_SAFE_TIMEOUT: u64 = 1789569;
70         #[cfg(not(target_pointer_width = "32"))]
71         const MAX_SAFE_TIMEOUT: u64 = c_int::max_value() as u64;
72 
73         let timeout_ms = timeout
74             .map(|to| cmp::min(millis(to), MAX_SAFE_TIMEOUT) as c_int)
75             .unwrap_or(-1);
76 
77         // Wait for epoll events for at most timeout_ms milliseconds
78         evts.clear();
79         unsafe {
80             let cnt = cvt(libc::epoll_wait(self.epfd,
81                                            evts.events.as_mut_ptr(),
82                                            evts.events.capacity() as i32,
83                                            timeout_ms))?;
84             let cnt = cnt as usize;
85             evts.events.set_len(cnt);
86 
87             for i in 0..cnt {
88                 if evts.events[i].u64 as usize == awakener.into() {
89                     evts.events.remove(i);
90                     return Ok(true);
91                 }
92             }
93         }
94 
95         Ok(false)
96     }
97 
98     /// Register event interests for the given IO handle with the OS
register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()>99     pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
100         let mut info = libc::epoll_event {
101             events: ioevent_to_epoll(interests, opts),
102             u64: usize::from(token) as u64
103         };
104 
105         unsafe {
106             cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?;
107             Ok(())
108         }
109     }
110 
111     /// Register event interests for the given IO handle with the OS
reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()>112     pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
113         let mut info = libc::epoll_event {
114             events: ioevent_to_epoll(interests, opts),
115             u64: usize::from(token) as u64
116         };
117 
118         unsafe {
119             cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?;
120             Ok(())
121         }
122     }
123 
124     /// Deregister event interests for the given IO handle with the OS
deregister(&self, fd: RawFd) -> io::Result<()>125     pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
126         // The &info argument should be ignored by the system,
127         // but linux < 2.6.9 required it to be not null.
128         // For compatibility, we provide a dummy EpollEvent.
129         let mut info = libc::epoll_event {
130             events: 0,
131             u64: 0,
132         };
133 
134         unsafe {
135             cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?;
136             Ok(())
137         }
138     }
139 }
140 
ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32141 fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 {
142     let mut kind = 0;
143 
144     if interest.is_readable() {
145         kind |= EPOLLIN;
146     }
147 
148     if interest.is_writable() {
149         kind |= EPOLLOUT;
150     }
151 
152     if UnixReady::from(interest).is_priority() {
153         kind |= EPOLLPRI;
154     }
155 
156     if opts.is_edge() {
157         kind |= EPOLLET;
158     }
159 
160     if opts.is_oneshot() {
161         kind |= EPOLLONESHOT;
162     }
163 
164     if opts.is_level() {
165         kind &= !EPOLLET;
166     }
167 
168     kind as u32
169 }
170 
impl AsRawFd for Selector {
    /// Returns the epoll file descriptor held by this selector. The
    /// descriptor is not duplicated; `Drop` closes it when the selector goes
    /// away.
    fn as_raw_fd(&self) -> RawFd {
        self.epfd
    }
}
176 
177 impl Drop for Selector {
drop(&mut self)178     fn drop(&mut self) {
179         unsafe {
180             let _ = libc::close(self.epfd);
181         }
182     }
183 }
184 
/// Buffer of raw `epoll_event` values; `Selector::select` fills it and
/// `Events::get` converts individual entries into `Event`s.
pub struct Events {
    /// Raw events. The vector's capacity is the maximum number of events
    /// requested from `epoll_wait` in one call.
    events: Vec<libc::epoll_event>,
}
188 
189 impl Events {
with_capacity(u: usize) -> Events190     pub fn with_capacity(u: usize) -> Events {
191         Events {
192             events: Vec::with_capacity(u)
193         }
194     }
195 
196     #[inline]
len(&self) -> usize197     pub fn len(&self) -> usize {
198         self.events.len()
199     }
200 
201     #[inline]
capacity(&self) -> usize202     pub fn capacity(&self) -> usize {
203         self.events.capacity()
204     }
205 
206     #[inline]
is_empty(&self) -> bool207     pub fn is_empty(&self) -> bool {
208         self.events.is_empty()
209     }
210 
211     #[inline]
get(&self, idx: usize) -> Option<Event>212     pub fn get(&self, idx: usize) -> Option<Event> {
213         self.events.get(idx).map(|event| {
214             let epoll = event.events as c_int;
215             let mut kind = Ready::empty();
216 
217             if (epoll & EPOLLIN) != 0 {
218                 kind = kind | Ready::readable();
219             }
220 
221             if (epoll & EPOLLPRI) != 0 {
222                 kind = kind | Ready::readable() | UnixReady::priority();
223             }
224 
225             if (epoll & EPOLLOUT) != 0 {
226                 kind = kind | Ready::writable();
227             }
228 
229             // EPOLLHUP - Usually means a socket error happened
230             if (epoll & EPOLLERR) != 0 {
231                 kind = kind | UnixReady::error();
232             }
233 
234             if (epoll & EPOLLHUP) != 0 {
235                 kind = kind | UnixReady::hup();
236             }
237 
238             let token = self.events[idx].u64;
239 
240             Event::new(kind, Token(token as usize))
241         })
242     }
243 
push_event(&mut self, event: Event)244     pub fn push_event(&mut self, event: Event) {
245         self.events.push(libc::epoll_event {
246             events: ioevent_to_epoll(event.readiness(), PollOpt::empty()),
247             u64: usize::from(event.token()) as u64
248         });
249     }
250 
clear(&mut self)251     pub fn clear(&mut self) {
252         unsafe { self.events.set_len(0); }
253     }
254 }
255 
/// Number of nanoseconds in one millisecond.
const NANOS_PER_MILLI: u32 = 1_000_000;
/// Number of milliseconds in one second.
const MILLIS_PER_SEC: u64 = 1_000;

/// Converts a `Duration` into whole milliseconds.
///
/// A fractional millisecond is rounded up, and the result saturates at
/// `u64::MAX` instead of overflowing. Saturating is acceptable because
/// `u64::MAX` milliseconds is still many millions of years.
pub fn millis(duration: Duration) -> u64 {
    let nanos = duration.subsec_nanos();

    // Ceiling division: any leftover nanoseconds count as one more
    // millisecond.
    let full_millis = nanos / NANOS_PER_MILLI;
    let sub_millis = if nanos % NANOS_PER_MILLI == 0 {
        full_millis
    } else {
        full_millis + 1
    };

    let secs_in_millis = duration.as_secs().saturating_mul(MILLIS_PER_SEC);
    secs_in_millis.saturating_add(u64::from(sub_millis))
}
269