1 use {io, Event, PollOpt, Ready, Token};
2 use sys::fuchsia::{
3 assert_fuchsia_ready_repr,
4 epoll_event_to_ready,
5 poll_opts_to_wait_async,
6 EventedFd,
7 EventedFdInner,
8 FuchsiaReady,
9 };
10 use zircon;
11 use zircon::AsHandleRef;
12 use zircon_sys::zx_handle_t;
13 use std::collections::hash_map;
14 use std::fmt;
15 use std::mem;
16 use std::sync::atomic::{AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
17 use std::sync::{Arc, Mutex, Weak};
18 use std::time::Duration;
19 use sys;
20
/// Discriminates the two kinds of registration: file descriptor or handle.
///
/// The most-significant bit of a registration key encodes which kind a
/// registration is (see `key_from_token_and_type`).
#[derive(Copy, Clone, Eq, PartialEq)]
enum RegType {
    Fd,
    Handle,
}
29
key_from_token_and_type(token: Token, reg_type: RegType) -> io::Result<u64>30 fn key_from_token_and_type(token: Token, reg_type: RegType) -> io::Result<u64> {
31 let key = token.0 as u64;
32 let msb = 1u64 << 63;
33 if (key & msb) != 0 {
34 return Err(io::Error::new(
35 io::ErrorKind::InvalidInput,
36 "Most-significant bit of token must remain unset."));
37 }
38
39 Ok(match reg_type {
40 RegType::Fd => key,
41 RegType::Handle => key | msb,
42 })
43 }
44
token_and_type_from_key(key: u64) -> (Token, RegType)45 fn token_and_type_from_key(key: u64) -> (Token, RegType) {
46 let msb = 1u64 << 63;
47 (
48 Token((key & !msb) as usize),
49 if (key & msb) == 0 {
50 RegType::Fd
51 } else {
52 RegType::Handle
53 }
54 )
55 }
56
/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that is previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches windows behavior.
// Starts at 0; `Selector::new` adds 1 so that no selector ever has id 0.
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
63
64 pub struct Selector {
65 id: usize,
66
67 /// Zircon object on which the handles have been registered, and on which events occur
68 port: Arc<zircon::Port>,
69
70 /// Whether or not `tokens_to_rereg` contains any elements. This is a best-effort attempt
71 /// used to prevent having to lock `tokens_to_rereg` when it is empty.
72 has_tokens_to_rereg: AtomicBool,
73
74 /// List of `Token`s corresponding to registrations that need to be reregistered before the
75 /// next `port::wait`. This is necessary to provide level-triggered behavior for
76 /// `Async::repeating` registrations.
77 ///
78 /// When a level-triggered `Async::repeating` event is seen, its token is added to this list so
79 /// that it will be reregistered before the next `port::wait` call, making `port::wait` return
80 /// immediately if the signal was high during the reregistration.
81 ///
82 /// Note: when used at the same time, the `tokens_to_rereg` lock should be taken out _before_
83 /// `token_to_fd`.
84 tokens_to_rereg: Mutex<Vec<Token>>,
85
86 /// Map from tokens to weak references to `EventedFdInner`-- a structure describing a
87 /// file handle, its associated `fdio` object, and its current registration.
88 token_to_fd: Mutex<hash_map::HashMap<Token, Weak<EventedFdInner>>>,
89 }
90
91 impl Selector {
new() -> io::Result<Selector>92 pub fn new() -> io::Result<Selector> {
93 // Assertion from fuchsia/ready.rs to make sure that FuchsiaReady's representation is
94 // compatible with Ready.
95 assert_fuchsia_ready_repr();
96
97 let port = Arc::new(
98 zircon::Port::create(zircon::PortOpts::Default)?
99 );
100
101 // offset by 1 to avoid choosing 0 as the id of a selector
102 let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
103
104 let has_tokens_to_rereg = AtomicBool::new(false);
105 let tokens_to_rereg = Mutex::new(Vec::new());
106 let token_to_fd = Mutex::new(hash_map::HashMap::new());
107
108 Ok(Selector {
109 id: id,
110 port: port,
111 has_tokens_to_rereg: has_tokens_to_rereg,
112 tokens_to_rereg: tokens_to_rereg,
113 token_to_fd: token_to_fd,
114 })
115 }
116
id(&self) -> usize117 pub fn id(&self) -> usize {
118 self.id
119 }
120
121 /// Returns a reference to the underlying port `Arc`.
port(&self) -> &Arc<zircon::Port>122 pub fn port(&self) -> &Arc<zircon::Port> { &self.port }
123
124 /// Reregisters all registrations pointed to by the `tokens_to_rereg` list
125 /// if `has_tokens_to_rereg`.
reregister_handles(&self) -> io::Result<()>126 fn reregister_handles(&self) -> io::Result<()> {
127 // We use `Ordering::Acquire` to make sure that we see all `tokens_to_rereg`
128 // written before the store using `Ordering::Release`.
129 if self.has_tokens_to_rereg.load(Ordering::Acquire) {
130 let mut tokens = self.tokens_to_rereg.lock().unwrap();
131 let token_to_fd = self.token_to_fd.lock().unwrap();
132 for token in tokens.drain(0..) {
133 if let Some(eventedfd) = token_to_fd.get(&token)
134 .and_then(|h| h.upgrade()) {
135 eventedfd.rereg_for_level(&self.port);
136 }
137 }
138 self.has_tokens_to_rereg.store(false, Ordering::Release);
139 }
140 Ok(())
141 }
142
select(&self, evts: &mut Events, _awakener: Token, timeout: Option<Duration>) -> io::Result<bool>143 pub fn select(&self,
144 evts: &mut Events,
145 _awakener: Token,
146 timeout: Option<Duration>) -> io::Result<bool>
147 {
148 evts.clear();
149
150 self.reregister_handles()?;
151
152 let deadline = match timeout {
153 Some(duration) => {
154 let nanos = duration.as_secs().saturating_mul(1_000_000_000)
155 .saturating_add(duration.subsec_nanos() as u64);
156
157 zircon::deadline_after(nanos)
158 }
159 None => zircon::ZX_TIME_INFINITE,
160 };
161
162 let packet = match self.port.wait(deadline) {
163 Ok(packet) => packet,
164 Err(zircon::Status::ErrTimedOut) => return Ok(false),
165 Err(e) => Err(e)?,
166 };
167
168 let observed_signals = match packet.contents() {
169 zircon::PacketContents::SignalOne(signal_packet) => {
170 signal_packet.observed()
171 }
172 zircon::PacketContents::SignalRep(signal_packet) => {
173 signal_packet.observed()
174 }
175 zircon::PacketContents::User(_user_packet) => {
176 // User packets are only ever sent by an Awakener
177 return Ok(true);
178 }
179 };
180
181 let key = packet.key();
182 let (token, reg_type) = token_and_type_from_key(key);
183
184 match reg_type {
185 RegType::Handle => {
186 // We can return immediately-- no lookup or registration necessary.
187 evts.events.push(Event::new(Ready::from(observed_signals), token));
188 Ok(false)
189 },
190 RegType::Fd => {
191 // Convert the signals to epoll events using __fdio_wait_end,
192 // and add to reregistration list if necessary.
193 let events: u32;
194 {
195 let handle = if let Some(handle) =
196 self.token_to_fd.lock().unwrap()
197 .get(&token)
198 .and_then(|h| h.upgrade()) {
199 handle
200 } else {
201 // This handle is apparently in the process of removal.
202 // It has been removed from the list, but port_cancel has not been called.
203 return Ok(false);
204 };
205
206 events = unsafe {
207 let mut events: u32 = mem::uninitialized();
208 sys::fuchsia::sys::__fdio_wait_end(handle.fdio(), observed_signals, &mut events);
209 events
210 };
211
212 // If necessary, queue to be reregistered before next port_await
213 let needs_to_rereg = {
214 let registration_lock = handle.registration().lock().unwrap();
215
216 registration_lock
217 .as_ref()
218 .and_then(|r| r.rereg_signals())
219 .is_some()
220 };
221
222 if needs_to_rereg {
223 let mut tokens_to_rereg_lock = self.tokens_to_rereg.lock().unwrap();
224 tokens_to_rereg_lock.push(token);
225 // We use `Ordering::Release` to make sure that we see all `tokens_to_rereg`
226 // written before the store.
227 self.has_tokens_to_rereg.store(true, Ordering::Release);
228 }
229 }
230
231 evts.events.push(Event::new(epoll_event_to_ready(events), token));
232 Ok(false)
233 },
234 }
235 }
236
237 /// Register event interests for the given IO handle with the OS
register_fd(&self, handle: &zircon::Handle, fd: &EventedFd, token: Token, signals: zircon::Signals, poll_opts: PollOpt) -> io::Result<()>238 pub fn register_fd(&self,
239 handle: &zircon::Handle,
240 fd: &EventedFd,
241 token: Token,
242 signals: zircon::Signals,
243 poll_opts: PollOpt) -> io::Result<()>
244 {
245 {
246 let mut token_to_fd = self.token_to_fd.lock().unwrap();
247 match token_to_fd.entry(token) {
248 hash_map::Entry::Occupied(_) =>
249 return Err(io::Error::new(io::ErrorKind::AlreadyExists,
250 "Attempted to register a filedescriptor on an existing token.")),
251 hash_map::Entry::Vacant(slot) => slot.insert(Arc::downgrade(&fd.inner)),
252 };
253 }
254
255 let wait_async_opts = poll_opts_to_wait_async(poll_opts);
256
257 let wait_res = handle.wait_async_handle(&self.port, token.0 as u64, signals, wait_async_opts);
258
259 if wait_res.is_err() {
260 self.token_to_fd.lock().unwrap().remove(&token);
261 }
262
263 Ok(wait_res?)
264 }
265
266 /// Deregister event interests for the given IO handle with the OS
deregister_fd(&self, handle: &zircon::Handle, token: Token) -> io::Result<()>267 pub fn deregister_fd(&self, handle: &zircon::Handle, token: Token) -> io::Result<()> {
268 self.token_to_fd.lock().unwrap().remove(&token);
269
270 // We ignore NotFound errors since oneshots are automatically deregistered,
271 // but mio will attempt to deregister them manually.
272 self.port.cancel(&*handle, token.0 as u64)
273 .map_err(io::Error::from)
274 .or_else(|e| if e.kind() == io::ErrorKind::NotFound {
275 Ok(())
276 } else {
277 Err(e)
278 })
279 }
280
register_handle(&self, handle: zx_handle_t, token: Token, interests: Ready, poll_opts: PollOpt) -> io::Result<()>281 pub fn register_handle(&self,
282 handle: zx_handle_t,
283 token: Token,
284 interests: Ready,
285 poll_opts: PollOpt) -> io::Result<()>
286 {
287 if poll_opts.is_level() && !poll_opts.is_oneshot() {
288 return Err(io::Error::new(io::ErrorKind::InvalidInput,
289 "Repeated level-triggered events are not supported on Fuchsia handles."));
290 }
291
292 let temp_handle = unsafe { zircon::Handle::from_raw(handle) };
293
294 let res = temp_handle.wait_async_handle(
295 &self.port,
296 key_from_token_and_type(token, RegType::Handle)?,
297 FuchsiaReady::from(interests).into_zx_signals(),
298 poll_opts_to_wait_async(poll_opts));
299
300 mem::forget(temp_handle);
301
302 Ok(res?)
303 }
304
305
deregister_handle(&self, handle: zx_handle_t, token: Token) -> io::Result<()>306 pub fn deregister_handle(&self, handle: zx_handle_t, token: Token) -> io::Result<()>
307 {
308 let temp_handle = unsafe { zircon::Handle::from_raw(handle) };
309 let res = self.port.cancel(&temp_handle, key_from_token_and_type(token, RegType::Handle)?);
310
311 mem::forget(temp_handle);
312
313 Ok(res?)
314 }
315 }
316
/// Buffer of events returned from a call to `Selector::select`.
pub struct Events {
    // Events gathered during the last `select`; at most one in practice,
    // since the Fuchsia selector handles a single packet per wait.
    events: Vec<Event>
}
320
321 impl Events {
with_capacity(_u: usize) -> Events322 pub fn with_capacity(_u: usize) -> Events {
323 // The Fuchsia selector only handles one event at a time,
324 // so we ignore the default capacity and set it to one.
325 Events { events: Vec::with_capacity(1) }
326 }
len(&self) -> usize327 pub fn len(&self) -> usize {
328 self.events.len()
329 }
capacity(&self) -> usize330 pub fn capacity(&self) -> usize {
331 self.events.capacity()
332 }
is_empty(&self) -> bool333 pub fn is_empty(&self) -> bool {
334 self.events.is_empty()
335 }
get(&self, idx: usize) -> Option<Event>336 pub fn get(&self, idx: usize) -> Option<Event> {
337 self.events.get(idx).map(|e| *e)
338 }
push_event(&mut self, event: Event)339 pub fn push_event(&mut self, event: Event) {
340 self.events.push(event)
341 }
clear(&mut self)342 pub fn clear(&mut self) {
343 self.events.events.drain(0..);
344 }
345 }
346
347 impl fmt::Debug for Events {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result348 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
349 fmt.debug_struct("Events")
350 .field("len", &self.len())
351 .finish()
352 }
353 }
354