1 use {io, Event, PollOpt, Ready, Token};
2 use sys::fuchsia::{
3     assert_fuchsia_ready_repr,
4     epoll_event_to_ready,
5     poll_opts_to_wait_async,
6     EventedFd,
7     EventedFdInner,
8     FuchsiaReady,
9 };
10 use zircon;
11 use zircon::AsHandleRef;
12 use zircon_sys::zx_handle_t;
13 use std::collections::hash_map;
14 use std::fmt;
15 use std::mem;
16 use std::sync::atomic::{AtomicBool, AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
17 use std::sync::{Arc, Mutex, Weak};
18 use std::time::Duration;
19 use sys;
20 
21 /// The kind of registration-- file descriptor or handle.
22 ///
23 /// The last bit of a token is set to indicate the type of the registration.
24 #[derive(Copy, Clone, Eq, PartialEq)]
25 enum RegType {
26     Fd,
27     Handle,
28 }
29 
key_from_token_and_type(token: Token, reg_type: RegType) -> io::Result<u64>30 fn key_from_token_and_type(token: Token, reg_type: RegType) -> io::Result<u64> {
31     let key = token.0 as u64;
32     let msb = 1u64 << 63;
33     if (key & msb) != 0 {
34         return Err(io::Error::new(
35             io::ErrorKind::InvalidInput,
36             "Most-significant bit of token must remain unset."));
37     }
38 
39     Ok(match reg_type {
40         RegType::Fd => key,
41         RegType::Handle => key | msb,
42     })
43 }
44 
token_and_type_from_key(key: u64) -> (Token, RegType)45 fn token_and_type_from_key(key: u64) -> (Token, RegType) {
46     let msb = 1u64 << 63;
47     (
48         Token((key & !msb) as usize),
49         if (key & msb) == 0 {
50             RegType::Fd
51         } else {
52             RegType::Handle
53         }
54     )
55 }
56 
57 /// Each Selector has a globally unique(ish) ID associated with it. This ID
58 /// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
59 /// registered with the `Selector`. If a type that is previously associated with
60 /// a `Selector` attempts to register itself with a different `Selector`, the
61 /// operation will return with an error. This matches windows behavior.
62 static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
63 
64 pub struct Selector {
65     id: usize,
66 
67     /// Zircon object on which the handles have been registered, and on which events occur
68     port: Arc<zircon::Port>,
69 
70     /// Whether or not `tokens_to_rereg` contains any elements. This is a best-effort attempt
71     /// used to prevent having to lock `tokens_to_rereg` when it is empty.
72     has_tokens_to_rereg: AtomicBool,
73 
74     /// List of `Token`s corresponding to registrations that need to be reregistered before the
75     /// next `port::wait`. This is necessary to provide level-triggered behavior for
76     /// `Async::repeating` registrations.
77     ///
78     /// When a level-triggered `Async::repeating` event is seen, its token is added to this list so
79     /// that it will be reregistered before the next `port::wait` call, making `port::wait` return
80     /// immediately if the signal was high during the reregistration.
81     ///
82     /// Note: when used at the same time, the `tokens_to_rereg` lock should be taken out _before_
83     /// `token_to_fd`.
84     tokens_to_rereg: Mutex<Vec<Token>>,
85 
86     /// Map from tokens to weak references to `EventedFdInner`-- a structure describing a
87     /// file handle, its associated `fdio` object, and its current registration.
88     token_to_fd: Mutex<hash_map::HashMap<Token, Weak<EventedFdInner>>>,
89 }
90 
91 impl Selector {
new() -> io::Result<Selector>92     pub fn new() -> io::Result<Selector> {
93         // Assertion from fuchsia/ready.rs to make sure that FuchsiaReady's representation is
94         // compatible with Ready.
95         assert_fuchsia_ready_repr();
96 
97         let port = Arc::new(
98             zircon::Port::create(zircon::PortOpts::Default)?
99         );
100 
101         // offset by 1 to avoid choosing 0 as the id of a selector
102         let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
103 
104         let has_tokens_to_rereg = AtomicBool::new(false);
105         let tokens_to_rereg = Mutex::new(Vec::new());
106         let token_to_fd = Mutex::new(hash_map::HashMap::new());
107 
108         Ok(Selector {
109             id: id,
110             port: port,
111             has_tokens_to_rereg: has_tokens_to_rereg,
112             tokens_to_rereg: tokens_to_rereg,
113             token_to_fd: token_to_fd,
114         })
115     }
116 
id(&self) -> usize117     pub fn id(&self) -> usize {
118         self.id
119     }
120 
121     /// Returns a reference to the underlying port `Arc`.
port(&self) -> &Arc<zircon::Port>122     pub fn port(&self) -> &Arc<zircon::Port> { &self.port }
123 
124     /// Reregisters all registrations pointed to by the `tokens_to_rereg` list
125     /// if `has_tokens_to_rereg`.
reregister_handles(&self) -> io::Result<()>126     fn reregister_handles(&self) -> io::Result<()> {
127         // We use `Ordering::Acquire` to make sure that we see all `tokens_to_rereg`
128         // written before the store using `Ordering::Release`.
129         if self.has_tokens_to_rereg.load(Ordering::Acquire) {
130             let mut tokens = self.tokens_to_rereg.lock().unwrap();
131             let token_to_fd = self.token_to_fd.lock().unwrap();
132             for token in tokens.drain(0..) {
133                 if let Some(eventedfd) = token_to_fd.get(&token)
134                     .and_then(|h| h.upgrade()) {
135                     eventedfd.rereg_for_level(&self.port);
136                 }
137             }
138             self.has_tokens_to_rereg.store(false, Ordering::Release);
139         }
140         Ok(())
141     }
142 
select(&self, evts: &mut Events, _awakener: Token, timeout: Option<Duration>) -> io::Result<bool>143     pub fn select(&self,
144                   evts: &mut Events,
145                   _awakener: Token,
146                   timeout: Option<Duration>) -> io::Result<bool>
147     {
148         evts.clear();
149 
150         self.reregister_handles()?;
151 
152         let deadline = match timeout {
153             Some(duration) => {
154                 let nanos = duration.as_secs().saturating_mul(1_000_000_000)
155                                 .saturating_add(duration.subsec_nanos() as u64);
156 
157                 zircon::deadline_after(nanos)
158             }
159             None => zircon::ZX_TIME_INFINITE,
160         };
161 
162         let packet = match self.port.wait(deadline) {
163             Ok(packet) => packet,
164             Err(zircon::Status::ErrTimedOut) => return Ok(false),
165             Err(e) => Err(e)?,
166         };
167 
168         let observed_signals = match packet.contents() {
169             zircon::PacketContents::SignalOne(signal_packet) => {
170                 signal_packet.observed()
171             }
172             zircon::PacketContents::SignalRep(signal_packet) => {
173                 signal_packet.observed()
174             }
175             zircon::PacketContents::User(_user_packet) => {
176                 // User packets are only ever sent by an Awakener
177                 return Ok(true);
178             }
179         };
180 
181         let key = packet.key();
182         let (token, reg_type) = token_and_type_from_key(key);
183 
184         match reg_type {
185             RegType::Handle => {
186                 // We can return immediately-- no lookup or registration necessary.
187                 evts.events.push(Event::new(Ready::from(observed_signals), token));
188                 Ok(false)
189             },
190             RegType::Fd => {
191                 // Convert the signals to epoll events using __fdio_wait_end,
192                 // and add to reregistration list if necessary.
193                 let events: u32;
194                 {
195                     let handle = if let Some(handle) =
196                     self.token_to_fd.lock().unwrap()
197                         .get(&token)
198                         .and_then(|h| h.upgrade()) {
199                         handle
200                     } else {
201                         // This handle is apparently in the process of removal.
202                         // It has been removed from the list, but port_cancel has not been called.
203                         return Ok(false);
204                     };
205 
206                     events = unsafe {
207                         let mut events: u32 = mem::uninitialized();
208                         sys::fuchsia::sys::__fdio_wait_end(handle.fdio(), observed_signals, &mut events);
209                         events
210                     };
211 
212                     // If necessary, queue to be reregistered before next port_await
213                     let needs_to_rereg = {
214                         let registration_lock = handle.registration().lock().unwrap();
215 
216                         registration_lock
217                             .as_ref()
218                             .and_then(|r| r.rereg_signals())
219                             .is_some()
220                     };
221 
222                     if needs_to_rereg {
223                         let mut tokens_to_rereg_lock = self.tokens_to_rereg.lock().unwrap();
224                         tokens_to_rereg_lock.push(token);
225                         // We use `Ordering::Release` to make sure that we see all `tokens_to_rereg`
226                         // written before the store.
227                         self.has_tokens_to_rereg.store(true, Ordering::Release);
228                     }
229                 }
230 
231                 evts.events.push(Event::new(epoll_event_to_ready(events), token));
232                 Ok(false)
233             },
234         }
235     }
236 
237     /// Register event interests for the given IO handle with the OS
register_fd(&self, handle: &zircon::Handle, fd: &EventedFd, token: Token, signals: zircon::Signals, poll_opts: PollOpt) -> io::Result<()>238     pub fn register_fd(&self,
239                        handle: &zircon::Handle,
240                        fd: &EventedFd,
241                        token: Token,
242                        signals: zircon::Signals,
243                        poll_opts: PollOpt) -> io::Result<()>
244     {
245         {
246             let mut token_to_fd = self.token_to_fd.lock().unwrap();
247             match token_to_fd.entry(token) {
248                 hash_map::Entry::Occupied(_) =>
249                     return Err(io::Error::new(io::ErrorKind::AlreadyExists,
250                                "Attempted to register a filedescriptor on an existing token.")),
251                 hash_map::Entry::Vacant(slot) => slot.insert(Arc::downgrade(&fd.inner)),
252             };
253         }
254 
255         let wait_async_opts = poll_opts_to_wait_async(poll_opts);
256 
257         let wait_res = handle.wait_async_handle(&self.port, token.0 as u64, signals, wait_async_opts);
258 
259         if wait_res.is_err() {
260             self.token_to_fd.lock().unwrap().remove(&token);
261         }
262 
263         Ok(wait_res?)
264     }
265 
266     /// Deregister event interests for the given IO handle with the OS
deregister_fd(&self, handle: &zircon::Handle, token: Token) -> io::Result<()>267     pub fn deregister_fd(&self, handle: &zircon::Handle, token: Token) -> io::Result<()> {
268         self.token_to_fd.lock().unwrap().remove(&token);
269 
270         // We ignore NotFound errors since oneshots are automatically deregistered,
271         // but mio will attempt to deregister them manually.
272         self.port.cancel(&*handle, token.0 as u64)
273             .map_err(io::Error::from)
274             .or_else(|e| if e.kind() == io::ErrorKind::NotFound {
275                 Ok(())
276             } else {
277                 Err(e)
278             })
279     }
280 
register_handle(&self, handle: zx_handle_t, token: Token, interests: Ready, poll_opts: PollOpt) -> io::Result<()>281     pub fn register_handle(&self,
282                            handle: zx_handle_t,
283                            token: Token,
284                            interests: Ready,
285                            poll_opts: PollOpt) -> io::Result<()>
286     {
287         if poll_opts.is_level() && !poll_opts.is_oneshot() {
288             return Err(io::Error::new(io::ErrorKind::InvalidInput,
289                       "Repeated level-triggered events are not supported on Fuchsia handles."));
290         }
291 
292         let temp_handle = unsafe { zircon::Handle::from_raw(handle) };
293 
294         let res = temp_handle.wait_async_handle(
295                     &self.port,
296                     key_from_token_and_type(token, RegType::Handle)?,
297                     FuchsiaReady::from(interests).into_zx_signals(),
298                     poll_opts_to_wait_async(poll_opts));
299 
300         mem::forget(temp_handle);
301 
302         Ok(res?)
303     }
304 
305 
deregister_handle(&self, handle: zx_handle_t, token: Token) -> io::Result<()>306     pub fn deregister_handle(&self, handle: zx_handle_t, token: Token) -> io::Result<()>
307     {
308         let temp_handle = unsafe { zircon::Handle::from_raw(handle) };
309         let res = self.port.cancel(&temp_handle, key_from_token_and_type(token, RegType::Handle)?);
310 
311         mem::forget(temp_handle);
312 
313         Ok(res?)
314     }
315 }
316 
317 pub struct Events {
318     events: Vec<Event>
319 }
320 
321 impl Events {
with_capacity(_u: usize) -> Events322     pub fn with_capacity(_u: usize) -> Events {
323         // The Fuchsia selector only handles one event at a time,
324         // so we ignore the default capacity and set it to one.
325         Events { events: Vec::with_capacity(1) }
326     }
len(&self) -> usize327     pub fn len(&self) -> usize {
328         self.events.len()
329     }
capacity(&self) -> usize330     pub fn capacity(&self) -> usize {
331         self.events.capacity()
332     }
is_empty(&self) -> bool333     pub fn is_empty(&self) -> bool {
334         self.events.is_empty()
335     }
get(&self, idx: usize) -> Option<Event>336     pub fn get(&self, idx: usize) -> Option<Event> {
337         self.events.get(idx).map(|e| *e)
338     }
push_event(&mut self, event: Event)339     pub fn push_event(&mut self, event: Event) {
340         self.events.push(event)
341     }
clear(&mut self)342     pub fn clear(&mut self) {
343         self.events.events.drain(0..);
344     }
345 }
346 
347 impl fmt::Debug for Events {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result348     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
349         fmt.debug_struct("Events")
350             .field("len", &self.len())
351             .finish()
352     }
353 }
354