1 #![allow(deprecated)]
2 use std::os::unix::io::AsRawFd;
3 use std::os::unix::io::RawFd;
4 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
5 use std::time::Duration;
6 use std::{cmp, i32};
7
8 use libc::{self, c_int};
9 use libc::{EPOLLERR, EPOLLHUP, EPOLLONESHOT};
10 use libc::{EPOLLET, EPOLLOUT, EPOLLIN, EPOLLPRI};
11
12 use {io, Ready, PollOpt, Token};
13 use event_imp::Event;
14 use sys::unix::{cvt, UnixReady};
15 use sys::unix::io::set_cloexec;
16
17 /// Each Selector has a globally unique(ish) ID associated with it. This ID
18 /// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
19 /// registered with the `Selector`. If a type that is previously associated with
20 /// a `Selector` attempts to register itself with a different `Selector`, the
21 /// operation will return with an error. This matches windows behavior.
22 static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
23
24 #[derive(Debug)]
25 pub struct Selector {
26 id: usize,
27 epfd: RawFd,
28 }
29
30 impl Selector {
new() -> io::Result<Selector>31 pub fn new() -> io::Result<Selector> {
32 let epfd = unsafe {
33 // Emulate `epoll_create` by using `epoll_create1` if it's available
34 // and otherwise falling back to `epoll_create` followed by a call to
35 // set the CLOEXEC flag.
36 dlsym!(fn epoll_create1(c_int) -> c_int);
37
38 match epoll_create1.get() {
39 Some(epoll_create1_fn) => {
40 cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))?
41 }
42 None => {
43 let fd = cvt(libc::epoll_create(1024))?;
44 drop(set_cloexec(fd));
45 fd
46 }
47 }
48 };
49
50 // offset by 1 to avoid choosing 0 as the id of a selector
51 let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
52
53 Ok(Selector {
54 id: id,
55 epfd: epfd,
56 })
57 }
58
id(&self) -> usize59 pub fn id(&self) -> usize {
60 self.id
61 }
62
63 /// Wait for events from the OS
select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool>64 pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
65 let timeout_ms = timeout
66 .map(|to| cmp::min(millis(to), i32::MAX as u64) as i32)
67 .unwrap_or(-1);
68
69 // Wait for epoll events for at most timeout_ms milliseconds
70 evts.clear();
71 unsafe {
72 let cnt = cvt(libc::epoll_wait(self.epfd,
73 evts.events.as_mut_ptr(),
74 evts.events.capacity() as i32,
75 timeout_ms))?;
76 let cnt = cnt as usize;
77 evts.events.set_len(cnt);
78
79 for i in 0..cnt {
80 if evts.events[i].u64 as usize == awakener.into() {
81 evts.events.remove(i);
82 return Ok(true);
83 }
84 }
85 }
86
87 Ok(false)
88 }
89
90 /// Register event interests for the given IO handle with the OS
register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()>91 pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
92 let mut info = libc::epoll_event {
93 events: ioevent_to_epoll(interests, opts),
94 u64: usize::from(token) as u64
95 };
96
97 unsafe {
98 cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?;
99 Ok(())
100 }
101 }
102
103 /// Register event interests for the given IO handle with the OS
reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()>104 pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
105 let mut info = libc::epoll_event {
106 events: ioevent_to_epoll(interests, opts),
107 u64: usize::from(token) as u64
108 };
109
110 unsafe {
111 cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?;
112 Ok(())
113 }
114 }
115
116 /// Deregister event interests for the given IO handle with the OS
deregister(&self, fd: RawFd) -> io::Result<()>117 pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
118 // The &info argument should be ignored by the system,
119 // but linux < 2.6.9 required it to be not null.
120 // For compatibility, we provide a dummy EpollEvent.
121 let mut info = libc::epoll_event {
122 events: 0,
123 u64: 0,
124 };
125
126 unsafe {
127 cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?;
128 Ok(())
129 }
130 }
131 }
132
ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32133 fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 {
134 let mut kind = 0;
135
136 if interest.is_readable() {
137 kind |= EPOLLIN;
138 }
139
140 if interest.is_writable() {
141 kind |= EPOLLOUT;
142 }
143
144 if UnixReady::from(interest).is_priority() {
145 kind |= EPOLLPRI;
146 }
147
148 if opts.is_edge() {
149 kind |= EPOLLET;
150 }
151
152 if opts.is_oneshot() {
153 kind |= EPOLLONESHOT;
154 }
155
156 if opts.is_level() {
157 kind &= !EPOLLET;
158 }
159
160 kind as u32
161 }
162
163 impl AsRawFd for Selector {
as_raw_fd(&self) -> RawFd164 fn as_raw_fd(&self) -> RawFd {
165 self.epfd
166 }
167 }
168
169 impl Drop for Selector {
drop(&mut self)170 fn drop(&mut self) {
171 unsafe {
172 let _ = libc::close(self.epfd);
173 }
174 }
175 }
176
177 pub struct Events {
178 events: Vec<libc::epoll_event>,
179 }
180
181 impl Events {
with_capacity(u: usize) -> Events182 pub fn with_capacity(u: usize) -> Events {
183 Events {
184 events: Vec::with_capacity(u)
185 }
186 }
187
188 #[inline]
len(&self) -> usize189 pub fn len(&self) -> usize {
190 self.events.len()
191 }
192
193 #[inline]
capacity(&self) -> usize194 pub fn capacity(&self) -> usize {
195 self.events.capacity()
196 }
197
198 #[inline]
is_empty(&self) -> bool199 pub fn is_empty(&self) -> bool {
200 self.events.is_empty()
201 }
202
203 #[inline]
get(&self, idx: usize) -> Option<Event>204 pub fn get(&self, idx: usize) -> Option<Event> {
205 self.events.get(idx).map(|event| {
206 let epoll = event.events as c_int;
207 let mut kind = Ready::empty();
208
209 if (epoll & EPOLLIN) != 0 {
210 kind = kind | Ready::readable();
211 }
212
213 if (epoll & EPOLLPRI) != 0 {
214 kind = kind | Ready::readable() | UnixReady::priority();
215 }
216
217 if (epoll & EPOLLOUT) != 0 {
218 kind = kind | Ready::writable();
219 }
220
221 // EPOLLHUP - Usually means a socket error happened
222 if (epoll & EPOLLERR) != 0 {
223 kind = kind | UnixReady::error();
224 }
225
226 if (epoll & EPOLLHUP) != 0 {
227 kind = kind | UnixReady::hup();
228 }
229
230 let token = self.events[idx].u64;
231
232 Event::new(kind, Token(token as usize))
233 })
234 }
235
push_event(&mut self, event: Event)236 pub fn push_event(&mut self, event: Event) {
237 self.events.push(libc::epoll_event {
238 events: ioevent_to_epoll(event.readiness(), PollOpt::empty()),
239 u64: usize::from(event.token()) as u64
240 });
241 }
242
clear(&mut self)243 pub fn clear(&mut self) {
244 unsafe { self.events.set_len(0); }
245 }
246 }
247
248 const NANOS_PER_MILLI: u32 = 1_000_000;
249 const MILLIS_PER_SEC: u64 = 1_000;
250
251 /// Convert a `Duration` to milliseconds, rounding up and saturating at
252 /// `u64::MAX`.
253 ///
254 /// The saturating is fine because `u64::MAX` milliseconds are still many
255 /// million years.
millis(duration: Duration) -> u64256 pub fn millis(duration: Duration) -> u64 {
257 // Round up.
258 let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
259 duration.as_secs().saturating_mul(MILLIS_PER_SEC).saturating_add(millis as u64)
260 }
261