1 #![allow(deprecated)]
2 
3 use std::{fmt, io};
4 use std::cell::UnsafeCell;
5 use std::os::windows::prelude::*;
6 use std::sync::{Arc, Mutex};
7 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
8 use std::time::Duration;
9 
10 use lazycell::AtomicLazyCell;
11 
12 use winapi::*;
13 use miow;
14 use miow::iocp::{CompletionPort, CompletionStatus};
15 
16 use event_imp::{Event, Evented, Ready};
17 use poll::{self, Poll};
18 use sys::windows::buffer_pool::BufferPool;
19 use {Token, PollOpt};
20 
21 /// Each Selector has a globally unique(ish) ID associated with it. This ID
22 /// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
23 /// registered with the `Selector`. If a type that is previously associated with
24 /// a `Selector` attempts to register itself with a different `Selector`, the
25 /// operation will return with an error. This matches windows behavior.
26 static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
27 
28 /// The guts of the Windows event loop, this is the struct which actually owns
29 /// a completion port.
30 ///
31 /// Internally this is just an `Arc`, and this allows handing out references to
32 /// the internals to I/O handles registered on this selector. This is
33 /// required to schedule I/O operations independently of being inside the event
34 /// loop (e.g. when a call to `write` is seen we're not "in the event loop").
35 pub struct Selector {
36     inner: Arc<SelectorInner>,
37 }
38 
39 struct SelectorInner {
40     /// Unique identifier of the `Selector`
41     id: usize,
42 
43     /// The actual completion port that's used to manage all I/O
44     port: CompletionPort,
45 
46     /// A pool of buffers usable by this selector.
47     ///
48     /// Primitives will take buffers from this pool to perform I/O operations,
49     /// and once complete they'll be put back in.
50     buffers: Mutex<BufferPool>,
51 }
52 
53 impl Selector {
new() -> io::Result<Selector>54     pub fn new() -> io::Result<Selector> {
55         // offset by 1 to avoid choosing 0 as the id of a selector
56         let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
57 
58         CompletionPort::new(0).map(|cp| {
59             Selector {
60                 inner: Arc::new(SelectorInner {
61                     id: id,
62                     port: cp,
63                     buffers: Mutex::new(BufferPool::new(256)),
64                 }),
65             }
66         })
67     }
68 
select(&self, events: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool>69     pub fn select(&self,
70                   events: &mut Events,
71                   awakener: Token,
72                   timeout: Option<Duration>) -> io::Result<bool> {
73         trace!("select; timeout={:?}", timeout);
74 
75         // Clear out the previous list of I/O events and get some more!
76         events.clear();
77 
78         trace!("polling IOCP");
79         let n = match self.inner.port.get_many(&mut events.statuses, timeout) {
80             Ok(statuses) => statuses.len(),
81             Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => 0,
82             Err(e) => return Err(e),
83         };
84 
85         let mut ret = false;
86         for status in events.statuses[..n].iter() {
87             // This should only ever happen from the awakener, and we should
88             // only ever have one awakener right now, so assert as such.
89             if status.overlapped() as usize == 0 {
90                 assert_eq!(status.token(), usize::from(awakener));
91                 ret = true;
92                 continue;
93             }
94 
95             let callback = unsafe {
96                 (*(status.overlapped() as *mut Overlapped)).callback
97             };
98 
99             trace!("select; -> got overlapped");
100             callback(status.entry());
101         }
102 
103         trace!("returning");
104         Ok(ret)
105     }
106 
107     /// Gets a reference to the underlying `CompletionPort` structure.
port(&self) -> &CompletionPort108     pub fn port(&self) -> &CompletionPort {
109         &self.inner.port
110     }
111 
112     /// Gets a new reference to this selector, although all underlying data
113     /// structures will refer to the same completion port.
clone_ref(&self) -> Selector114     pub fn clone_ref(&self) -> Selector {
115         Selector { inner: self.inner.clone() }
116     }
117 
118     /// Return the `Selector`'s identifier
id(&self) -> usize119     pub fn id(&self) -> usize {
120         self.inner.id
121     }
122 }
123 
124 impl SelectorInner {
identical(&self, other: &SelectorInner) -> bool125     fn identical(&self, other: &SelectorInner) -> bool {
126         (self as *const SelectorInner) == (other as *const SelectorInner)
127     }
128 }
129 
130 // A registration is stored in each I/O object which keeps track of how it is
131 // associated with a `Selector` above.
132 //
133 // Once associated with a `Selector`, a registration can never be un-associated
134 // (due to IOCP requirements). This is actually implemented through the
135 // `poll::Registration` and `poll::SetReadiness` APIs to keep track of all the
136 // level/edge/filtering business.
137 /// A `Binding` is embedded in all I/O objects associated with a `Poll`
138 /// object.
139 ///
140 /// Each registration keeps track of which selector the I/O object is
141 /// associated with, ensuring that implementations of `Evented` can be
142 /// conformant for the various methods on Windows.
143 ///
144 /// If you're working with custom IOCP-enabled objects then you'll want to
145 /// ensure that one of these instances is stored in your object and used in the
146 /// implementation of `Evented`.
147 ///
148 /// For more information about how to use this see the `windows` module
149 /// documentation in this crate.
150 pub struct Binding {
151     selector: AtomicLazyCell<Arc<SelectorInner>>,
152 }
153 
154 impl Binding {
155     /// Creates a new blank binding ready to be inserted into an I/O
156     /// object.
157     ///
158     /// Won't actually do anything until associated with a `Poll` loop.
new() -> Binding159     pub fn new() -> Binding {
160         Binding { selector: AtomicLazyCell::new() }
161     }
162 
163     /// Registers a new handle with the `Poll` specified, also assigning the
164     /// `token` specified.
165     ///
166     /// This function is intended to be used as part of `Evented::register` for
167     /// custom IOCP objects. It will add the specified handle to the internal
168     /// IOCP object with the provided `token`. All future events generated by
169     /// the handled provided will be received by the `Poll`'s internal IOCP
170     /// object.
171     ///
172     /// # Unsafety
173     ///
174     /// This function is unsafe as the `Poll` instance has assumptions about
175     /// what the `OVERLAPPED` pointer used for each I/O operation looks like.
176     /// Specifically they must all be instances of the `Overlapped` type in
177     /// this crate. More information about this can be found on the
178     /// `windows` module in this crate.
register_handle(&self, handle: &AsRawHandle, token: Token, poll: &Poll) -> io::Result<()>179     pub unsafe fn register_handle(&self,
180                                   handle: &AsRawHandle,
181                                   token: Token,
182                                   poll: &Poll) -> io::Result<()> {
183         let selector = poll::selector(poll);
184 
185         // Ignore errors, we'll see them on the next line.
186         drop(self.selector.fill(selector.inner.clone()));
187         self.check_same_selector(poll)?;
188 
189         selector.inner.port.add_handle(usize::from(token), handle)
190     }
191 
192     /// Same as `register_handle` but for sockets.
register_socket(&self, handle: &AsRawSocket, token: Token, poll: &Poll) -> io::Result<()>193     pub unsafe fn register_socket(&self,
194                                   handle: &AsRawSocket,
195                                   token: Token,
196                                   poll: &Poll) -> io::Result<()> {
197         let selector = poll::selector(poll);
198         drop(self.selector.fill(selector.inner.clone()));
199         self.check_same_selector(poll)?;
200         selector.inner.port.add_socket(usize::from(token), handle)
201     }
202 
203     /// Reregisters the handle provided from the `Poll` provided.
204     ///
205     /// This is intended to be used as part of `Evented::reregister` but note
206     /// that this function does not currently reregister the provided handle
207     /// with the `poll` specified. IOCP has a special binding for changing the
208     /// token which has not yet been implemented. Instead this function should
209     /// be used to assert that the call to `reregister` happened on the same
210     /// `Poll` that was passed into to `register`.
211     ///
212     /// Eventually, though, the provided `handle` will be re-assigned to have
213     /// the token `token` on the given `poll` assuming that it's been
214     /// previously registered with it.
215     ///
216     /// # Unsafety
217     ///
218     /// This function is unsafe for similar reasons to `register`. That is,
219     /// there may be pending I/O events and such which aren't handled correctly.
reregister_handle(&self, _handle: &AsRawHandle, _token: Token, poll: &Poll) -> io::Result<()>220     pub unsafe fn reregister_handle(&self,
221                                     _handle: &AsRawHandle,
222                                     _token: Token,
223                                     poll: &Poll) -> io::Result<()> {
224         self.check_same_selector(poll)
225     }
226 
227     /// Same as `reregister_handle`, but for sockets.
reregister_socket(&self, _socket: &AsRawSocket, _token: Token, poll: &Poll) -> io::Result<()>228     pub unsafe fn reregister_socket(&self,
229                                     _socket: &AsRawSocket,
230                                     _token: Token,
231                                     poll: &Poll) -> io::Result<()> {
232         self.check_same_selector(poll)
233     }
234 
235     /// Deregisters the handle provided from the `Poll` provided.
236     ///
237     /// This is intended to be used as part of `Evented::deregister` but note
238     /// that this function does not currently deregister the provided handle
239     /// from the `poll` specified. IOCP has a special binding for that which has
240     /// not yet been implemented. Instead this function should be used to assert
241     /// that the call to `deregister` happened on the same `Poll` that was
242     /// passed into to `register`.
243     ///
244     /// # Unsafety
245     ///
246     /// This function is unsafe for similar reasons to `register`. That is,
247     /// there may be pending I/O events and such which aren't handled correctly.
deregister_handle(&self, _handle: &AsRawHandle, poll: &Poll) -> io::Result<()>248     pub unsafe fn deregister_handle(&self,
249                                     _handle: &AsRawHandle,
250                                     poll: &Poll) -> io::Result<()> {
251         self.check_same_selector(poll)
252     }
253 
254     /// Same as `deregister_handle`, but for sockets.
deregister_socket(&self, _socket: &AsRawSocket, poll: &Poll) -> io::Result<()>255     pub unsafe fn deregister_socket(&self,
256                                     _socket: &AsRawSocket,
257                                     poll: &Poll) -> io::Result<()> {
258         self.check_same_selector(poll)
259     }
260 
check_same_selector(&self, poll: &Poll) -> io::Result<()>261     fn check_same_selector(&self, poll: &Poll) -> io::Result<()> {
262         let selector = poll::selector(poll);
263         match self.selector.borrow() {
264             Some(prev) if prev.identical(&selector.inner) => Ok(()),
265             Some(_) |
266             None => Err(other("socket already registered")),
267         }
268     }
269 }
270 
271 impl fmt::Debug for Binding {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result272     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
273         f.debug_struct("Binding")
274             .finish()
275     }
276 }
277 
278 /// Helper struct used for TCP and UDP which bundles a `binding` with a
279 /// `SetReadiness` handle.
280 pub struct ReadyBinding {
281     binding: Binding,
282     readiness: Option<poll::SetReadiness>,
283 }
284 
285 impl ReadyBinding {
286     /// Creates a new blank binding ready to be inserted into an I/O object.
287     ///
288     /// Won't actually do anything until associated with an `Selector` loop.
new() -> ReadyBinding289     pub fn new() -> ReadyBinding {
290         ReadyBinding {
291             binding: Binding::new(),
292             readiness: None,
293         }
294     }
295 
296     /// Returns whether this binding has been associated with a selector
297     /// yet.
registered(&self) -> bool298     pub fn registered(&self) -> bool {
299         self.readiness.is_some()
300     }
301 
302     /// Acquires a buffer with at least `size` capacity.
303     ///
304     /// If associated with a selector, this will attempt to pull a buffer from
305     /// that buffer pool. If not associated with a selector, this will allocate
306     /// a fresh buffer.
get_buffer(&self, size: usize) -> Vec<u8>307     pub fn get_buffer(&self, size: usize) -> Vec<u8> {
308         match self.binding.selector.borrow() {
309             Some(i) => i.buffers.lock().unwrap().get(size),
310             None => Vec::with_capacity(size),
311         }
312     }
313 
314     /// Returns a buffer to this binding.
315     ///
316     /// If associated with a selector, this will push the buffer back into the
317     /// selector's pool of buffers. Otherwise this will just drop the buffer.
put_buffer(&self, buf: Vec<u8>)318     pub fn put_buffer(&self, buf: Vec<u8>) {
319         if let Some(i) = self.binding.selector.borrow() {
320             i.buffers.lock().unwrap().put(buf);
321         }
322     }
323 
324     /// Sets the readiness of this I/O object to a particular `set`.
325     ///
326     /// This is later used to fill out and respond to requests to `poll`. Note
327     /// that this is all implemented through the `SetReadiness` structure in the
328     /// `poll` module.
set_readiness(&self, set: Ready)329     pub fn set_readiness(&self, set: Ready) {
330         if let Some(ref i) = self.readiness {
331             trace!("set readiness to {:?}", set);
332             i.set_readiness(set).expect("event loop disappeared?");
333         }
334     }
335 
336     /// Queries what the current readiness of this I/O object is.
337     ///
338     /// This is what's being used to generate events returned by `poll`.
readiness(&self) -> Ready339     pub fn readiness(&self) -> Ready {
340         match self.readiness {
341             Some(ref i) => i.readiness(),
342             None => Ready::empty(),
343         }
344     }
345 
346     /// Implementation of the `Evented::register` function essentially.
347     ///
348     /// Returns an error if we're already registered with another event loop,
349     /// and otherwise just reassociates ourselves with the event loop to
350     /// possible change tokens.
register_socket(&mut self, socket: &AsRawSocket, poll: &Poll, token: Token, events: Ready, opts: PollOpt, registration: &Mutex<Option<poll::Registration>>) -> io::Result<()>351     pub fn register_socket(&mut self,
352                            socket: &AsRawSocket,
353                            poll: &Poll,
354                            token: Token,
355                            events: Ready,
356                            opts: PollOpt,
357                            registration: &Mutex<Option<poll::Registration>>)
358                            -> io::Result<()> {
359         trace!("register {:?} {:?}", token, events);
360         unsafe {
361             self.binding.register_socket(socket, token, poll)?;
362         }
363 
364         let (r, s) = poll::new_registration(poll, token, events, opts);
365         self.readiness = Some(s);
366         *registration.lock().unwrap() = Some(r);
367         Ok(())
368     }
369 
370     /// Implementation of `Evented::reregister` function.
reregister_socket(&mut self, socket: &AsRawSocket, poll: &Poll, token: Token, events: Ready, opts: PollOpt, registration: &Mutex<Option<poll::Registration>>) -> io::Result<()>371     pub fn reregister_socket(&mut self,
372                              socket: &AsRawSocket,
373                              poll: &Poll,
374                              token: Token,
375                              events: Ready,
376                              opts: PollOpt,
377                              registration: &Mutex<Option<poll::Registration>>)
378                              -> io::Result<()> {
379         trace!("reregister {:?} {:?}", token, events);
380         unsafe {
381             self.binding.reregister_socket(socket, token, poll)?;
382         }
383 
384         registration.lock().unwrap()
385                     .as_mut().unwrap()
386                     .reregister(poll, token, events, opts)
387     }
388 
389     /// Implementation of the `Evented::deregister` function.
390     ///
391     /// Doesn't allow registration with another event loop, just shuts down
392     /// readiness notifications and such.
deregister(&mut self, socket: &AsRawSocket, poll: &Poll, registration: &Mutex<Option<poll::Registration>>) -> io::Result<()>393     pub fn deregister(&mut self,
394                       socket: &AsRawSocket,
395                       poll: &Poll,
396                       registration: &Mutex<Option<poll::Registration>>)
397                       -> io::Result<()> {
398         trace!("deregistering");
399         unsafe {
400             self.binding.deregister_socket(socket, poll)?;
401         }
402 
403         registration.lock().unwrap()
404                     .as_ref().unwrap()
405                     .deregister(poll)
406     }
407 }
408 
other(s: &str) -> io::Error409 fn other(s: &str) -> io::Error {
410     io::Error::new(io::ErrorKind::Other, s)
411 }
412 
413 #[derive(Debug)]
414 pub struct Events {
415     /// Raw I/O event completions are filled in here by the call to `get_many`
416     /// on the completion port above. These are then processed to run callbacks
417     /// which figure out what to do after the event is done.
418     statuses: Box<[CompletionStatus]>,
419 
420     /// Literal events returned by `get` to the upwards `EventLoop`. This file
421     /// doesn't really modify this (except for the awakener), instead almost all
422     /// events are filled in by the `ReadinessQueue` from the `poll` module.
423     events: Vec<Event>,
424 }
425 
426 impl Events {
with_capacity(cap: usize) -> Events427     pub fn with_capacity(cap: usize) -> Events {
428         // Note that it's possible for the output `events` to grow beyond the
429         // capacity as it can also include deferred events, but that's certainly
430         // not the end of the world!
431         Events {
432             statuses: vec![CompletionStatus::zero(); cap].into_boxed_slice(),
433             events: Vec::with_capacity(cap),
434         }
435     }
436 
is_empty(&self) -> bool437     pub fn is_empty(&self) -> bool {
438         self.events.is_empty()
439     }
440 
len(&self) -> usize441     pub fn len(&self) -> usize {
442         self.events.len()
443     }
444 
capacity(&self) -> usize445     pub fn capacity(&self) -> usize {
446         self.events.capacity()
447     }
448 
get(&self, idx: usize) -> Option<Event>449     pub fn get(&self, idx: usize) -> Option<Event> {
450         self.events.get(idx).map(|e| *e)
451     }
452 
push_event(&mut self, event: Event)453     pub fn push_event(&mut self, event: Event) {
454         self.events.push(event);
455     }
456 
clear(&mut self)457     pub fn clear(&mut self) {
458         self.events.truncate(0);
459     }
460 }
461 
462 macro_rules! overlapped2arc {
463     ($e:expr, $t:ty, $($field:ident).+) => ({
464         let offset = offset_of!($t, $($field).+);
465         debug_assert!(offset < mem::size_of::<$t>());
466         FromRawArc::from_raw(($e as usize - offset) as *mut $t)
467     })
468 }
469 
470 macro_rules! offset_of {
471     ($t:ty, $($field:ident).+) => (
472         &(*(0 as *const $t)).$($field).+ as *const _ as usize
473     )
474 }
475 
476 // See sys::windows module docs for why this exists.
477 //
478 // The gist of it is that `Selector` assumes that all `OVERLAPPED` pointers are
479 // actually inside one of these structures so it can use the `Callback` stored
480 // right after it.
481 //
482 // We use repr(C) here to ensure that we can assume the overlapped pointer is
483 // at the start of the structure so we can just do a cast.
484 /// A wrapper around an internal instance over `miow::Overlapped` which is in
485 /// turn a wrapper around the Windows type `OVERLAPPED`.
486 ///
487 /// This type is required to be used for all IOCP operations on handles that are
488 /// registered with an event loop. The event loop will receive notifications
489 /// over `OVERLAPPED` pointers that have completed, and it will cast that
490 /// pointer to a pointer to this structure and invoke the associated callback.
491 #[repr(C)]
492 pub struct Overlapped {
493     inner: UnsafeCell<miow::Overlapped>,
494     callback: fn(&OVERLAPPED_ENTRY),
495 }
496 
497 impl Overlapped {
498     /// Creates a new `Overlapped` which will invoke the provided `cb` callback
499     /// whenever it's triggered.
500     ///
501     /// The returned `Overlapped` must be used as the `OVERLAPPED` passed to all
502     /// I/O operations that are registered with mio's event loop. When the I/O
503     /// operation associated with an `OVERLAPPED` pointer completes the event
504     /// loop will invoke the function pointer provided by `cb`.
new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped505     pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped {
506         Overlapped {
507             inner: UnsafeCell::new(miow::Overlapped::zero()),
508             callback: cb,
509         }
510     }
511 
512     /// Get the underlying `Overlapped` instance as a raw pointer.
513     ///
514     /// This can be useful when only a shared borrow is held and the overlapped
515     /// pointer needs to be passed down to winapi.
as_mut_ptr(&self) -> *mut OVERLAPPED516     pub fn as_mut_ptr(&self) -> *mut OVERLAPPED {
517         unsafe {
518             (*self.inner.get()).raw()
519         }
520     }
521 }
522 
523 impl fmt::Debug for Overlapped {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result524     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
525         f.debug_struct("Overlapped")
526             .finish()
527     }
528 }
529 
530 // Overlapped's APIs are marked as unsafe Overlapped's APIs are marked as
531 // unsafe as they must be used with caution to ensure thread safety. The
532 // structure itself is safe to send across threads.
533 unsafe impl Send for Overlapped {}
534 unsafe impl Sync for Overlapped {}
535