1 #![allow(deprecated)]
2 use std::os::unix::io::AsRawFd;
3 use std::os::unix::io::RawFd;
4 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
5 use std::time::Duration;
6 use std::{cmp, i32};
7 
8 use libc::{self, c_int};
9 use libc::{EPOLLERR, EPOLLHUP, EPOLLONESHOT};
10 use libc::{EPOLLET, EPOLLOUT, EPOLLIN, EPOLLPRI};
11 
12 use {io, Ready, PollOpt, Token};
13 use event_imp::Event;
14 use sys::unix::{cvt, UnixReady};
15 use sys::unix::io::set_cloexec;
16 
/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that is previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches Windows behavior.
///
/// `Selector::new` bumps this counter with `fetch_add` and uses the
/// post-increment value as the id, so a live selector is never handed the
/// counter's initial value.
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
23 
/// Handle to an epoll instance together with the id assigned to it at
/// construction time.
#[derive(Debug)]
pub struct Selector {
    /// Identifier taken from `NEXT_ID` when the selector was created.
    id: usize,
    /// File descriptor of the epoll instance; closed when the selector is dropped.
    epfd: RawFd,
}
29 
30 impl Selector {
new() -> io::Result<Selector>31     pub fn new() -> io::Result<Selector> {
32         let epfd = unsafe {
33             // Emulate `epoll_create` by using `epoll_create1` if it's available
34             // and otherwise falling back to `epoll_create` followed by a call to
35             // set the CLOEXEC flag.
36             dlsym!(fn epoll_create1(c_int) -> c_int);
37 
38             match epoll_create1.get() {
39                 Some(epoll_create1_fn) => {
40                     cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))?
41                 }
42                 None => {
43                     let fd = cvt(libc::epoll_create(1024))?;
44                     drop(set_cloexec(fd));
45                     fd
46                 }
47             }
48         };
49 
50         // offset by 1 to avoid choosing 0 as the id of a selector
51         let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
52 
53         Ok(Selector {
54             id: id,
55             epfd: epfd,
56         })
57     }
58 
id(&self) -> usize59     pub fn id(&self) -> usize {
60         self.id
61     }
62 
63     /// Wait for events from the OS
select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool>64     pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
65         let timeout_ms = timeout
66             .map(|to| cmp::min(millis(to), i32::MAX as u64) as i32)
67             .unwrap_or(-1);
68 
69         // Wait for epoll events for at most timeout_ms milliseconds
70         evts.clear();
71         unsafe {
72             let cnt = cvt(libc::epoll_wait(self.epfd,
73                                            evts.events.as_mut_ptr(),
74                                            evts.events.capacity() as i32,
75                                            timeout_ms))?;
76             let cnt = cnt as usize;
77             evts.events.set_len(cnt);
78 
79             for i in 0..cnt {
80                 if evts.events[i].u64 as usize == awakener.into() {
81                     evts.events.remove(i);
82                     return Ok(true);
83                 }
84             }
85         }
86 
87         Ok(false)
88     }
89 
90     /// Register event interests for the given IO handle with the OS
register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()>91     pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
92         let mut info = libc::epoll_event {
93             events: ioevent_to_epoll(interests, opts),
94             u64: usize::from(token) as u64
95         };
96 
97         unsafe {
98             cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?;
99             Ok(())
100         }
101     }
102 
103     /// Register event interests for the given IO handle with the OS
reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()>104     pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
105         let mut info = libc::epoll_event {
106             events: ioevent_to_epoll(interests, opts),
107             u64: usize::from(token) as u64
108         };
109 
110         unsafe {
111             cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?;
112             Ok(())
113         }
114     }
115 
116     /// Deregister event interests for the given IO handle with the OS
deregister(&self, fd: RawFd) -> io::Result<()>117     pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
118         // The &info argument should be ignored by the system,
119         // but linux < 2.6.9 required it to be not null.
120         // For compatibility, we provide a dummy EpollEvent.
121         let mut info = libc::epoll_event {
122             events: 0,
123             u64: 0,
124         };
125 
126         unsafe {
127             cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?;
128             Ok(())
129         }
130     }
131 }
132 
ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32133 fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 {
134     let mut kind = 0;
135 
136     if interest.is_readable() {
137         kind |= EPOLLIN;
138     }
139 
140     if interest.is_writable() {
141         kind |= EPOLLOUT;
142     }
143 
144     if UnixReady::from(interest).is_priority() {
145         kind |= EPOLLPRI;
146     }
147 
148     if opts.is_edge() {
149         kind |= EPOLLET;
150     }
151 
152     if opts.is_oneshot() {
153         kind |= EPOLLONESHOT;
154     }
155 
156     if opts.is_level() {
157         kind &= !EPOLLET;
158     }
159 
160     kind as u32
161 }
162 
163 impl AsRawFd for Selector {
as_raw_fd(&self) -> RawFd164     fn as_raw_fd(&self) -> RawFd {
165         self.epfd
166     }
167 }
168 
169 impl Drop for Selector {
drop(&mut self)170     fn drop(&mut self) {
171         unsafe {
172             let _ = libc::close(self.epfd);
173         }
174     }
175 }
176 
/// Buffer of raw `libc::epoll_event` records, filled by `Selector::select`
/// and read back through `Events::get`.
pub struct Events {
    /// Backing storage; its capacity is what `Selector::select` passes to
    /// `epoll_wait` as the maximum number of events.
    events: Vec<libc::epoll_event>,
}
180 
181 impl Events {
with_capacity(u: usize) -> Events182     pub fn with_capacity(u: usize) -> Events {
183         Events {
184             events: Vec::with_capacity(u)
185         }
186     }
187 
188     #[inline]
len(&self) -> usize189     pub fn len(&self) -> usize {
190         self.events.len()
191     }
192 
193     #[inline]
capacity(&self) -> usize194     pub fn capacity(&self) -> usize {
195         self.events.capacity()
196     }
197 
198     #[inline]
is_empty(&self) -> bool199     pub fn is_empty(&self) -> bool {
200         self.events.is_empty()
201     }
202 
203     #[inline]
get(&self, idx: usize) -> Option<Event>204     pub fn get(&self, idx: usize) -> Option<Event> {
205         self.events.get(idx).map(|event| {
206             let epoll = event.events as c_int;
207             let mut kind = Ready::empty();
208 
209             if (epoll & EPOLLIN) != 0 {
210                 kind = kind | Ready::readable();
211             }
212 
213             if (epoll & EPOLLPRI) != 0 {
214                 kind = kind | Ready::readable() | UnixReady::priority();
215             }
216 
217             if (epoll & EPOLLOUT) != 0 {
218                 kind = kind | Ready::writable();
219             }
220 
221             // EPOLLHUP - Usually means a socket error happened
222             if (epoll & EPOLLERR) != 0 {
223                 kind = kind | UnixReady::error();
224             }
225 
226             if (epoll & EPOLLHUP) != 0 {
227                 kind = kind | UnixReady::hup();
228             }
229 
230             let token = self.events[idx].u64;
231 
232             Event::new(kind, Token(token as usize))
233         })
234     }
235 
push_event(&mut self, event: Event)236     pub fn push_event(&mut self, event: Event) {
237         self.events.push(libc::epoll_event {
238             events: ioevent_to_epoll(event.readiness(), PollOpt::empty()),
239             u64: usize::from(event.token()) as u64
240         });
241     }
242 
clear(&mut self)243     pub fn clear(&mut self) {
244         unsafe { self.events.set_len(0); }
245     }
246 }
247 
const NANOS_PER_MILLI: u32 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;

/// Converts a `Duration` into a millisecond count.
///
/// Any fractional millisecond is rounded up, and the result saturates at
/// `u64::MAX` rather than overflowing. Saturation is harmless in practice:
/// `u64::MAX` milliseconds is still many million years.
pub fn millis(duration: Duration) -> u64 {
    let nanos = duration.subsec_nanos();

    // Ceiling division: whole milliseconds plus one more for any remainder.
    let whole = nanos / NANOS_PER_MILLI;
    let partial = if nanos % NANOS_PER_MILLI == 0 { 0 } else { 1 };

    let from_secs = duration.as_secs().saturating_mul(MILLIS_PER_SEC);
    from_secs.saturating_add(u64::from(whole + partial))
}
261