1 use super::afd::{self, Afd, AfdPollInfo};
2 use super::io_status_block::IoStatusBlock;
3 use super::Event;
4 use crate::sys::Events;
5 
6 cfg_net! {
7     use crate::sys::event::{
8         ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS,
9     };
10     use crate::Interest;
11 }
12 
13 use miow::iocp::{CompletionPort, CompletionStatus};
14 use std::collections::VecDeque;
15 use std::io;
16 use std::marker::PhantomPinned;
17 use std::os::windows::io::RawSocket;
18 use std::pin::Pin;
19 #[cfg(debug_assertions)]
20 use std::sync::atomic::AtomicUsize;
21 use std::sync::atomic::{AtomicBool, Ordering};
22 use std::sync::{Arc, Mutex};
23 use std::time::Duration;
24 use winapi::shared::ntdef::NT_SUCCESS;
25 use winapi::shared::ntdef::{HANDLE, PVOID};
26 use winapi::shared::ntstatus::STATUS_CANCELLED;
27 use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT};
28 use winapi::um::minwinbase::OVERLAPPED;
29 
/// Collection of `Afd` handles that are all created with the same completion
/// port.
#[derive(Debug)]
struct AfdGroup {
    /// Completion port passed to `Afd::new` whenever a new handle is created.
    cp: Arc<CompletionPort>,
    /// Handles created so far. New handles are pushed at the back and the
    /// last entry is the one handed out by `acquire`.
    afd_group: Mutex<Vec<Arc<Afd>>>,
}
35 
36 impl AfdGroup {
new(cp: Arc<CompletionPort>) -> AfdGroup37     pub fn new(cp: Arc<CompletionPort>) -> AfdGroup {
38         AfdGroup {
39             afd_group: Mutex::new(Vec::new()),
40             cp,
41         }
42     }
43 
release_unused_afd(&self)44     pub fn release_unused_afd(&self) {
45         let mut afd_group = self.afd_group.lock().unwrap();
46         afd_group.retain(|g| Arc::strong_count(&g) > 1);
47     }
48 }
49 
50 cfg_io_source! {
51     const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;
52 
53     impl AfdGroup {
54         pub fn acquire(&self) -> io::Result<Arc<Afd>> {
55             let mut afd_group = self.afd_group.lock().unwrap();
56             if afd_group.len() == 0 {
57                 self._alloc_afd_group(&mut afd_group)?;
58             } else {
59                 // + 1 reference in Vec
60                 if Arc::strong_count(afd_group.last().unwrap()) >= POLL_GROUP__MAX_GROUP_SIZE + 1 {
61                     self._alloc_afd_group(&mut afd_group)?;
62                 }
63             }
64 
65             match afd_group.last() {
66                 Some(arc) => Ok(arc.clone()),
67                 None => unreachable!(
68                     "Cannot acquire afd, {:#?}, afd_group: {:#?}",
69                     self, afd_group
70                 ),
71             }
72         }
73 
74         fn _alloc_afd_group(&self, afd_group: &mut Vec<Arc<Afd>>) -> io::Result<()> {
75             let afd = Afd::new(&self.cp)?;
76             let arc = Arc::new(afd);
77             afd_group.push(arc);
78             Ok(())
79         }
80     }
81 }
82 
/// State of the AFD poll operation associated with a `SockState`.
#[derive(Debug)]
enum SockPollStatus {
    /// No poll operation is pending.
    Idle,
    /// A poll operation has been submitted and has not completed yet.
    Pending,
    /// The pending poll operation has been cancelled; its completion has not
    /// been processed yet.
    Cancelled,
}
89 
/// Per-socket bookkeeping for the AFD poll operation of a registered socket.
#[derive(Debug)]
pub struct SockState {
    // Status block handed to `Afd::poll` / `Afd::cancel`; its status is read
    // back when the completion is processed.
    iosb: IoStatusBlock,
    // Poll request/response buffer handed to `Afd::poll`.
    poll_info: AfdPollInfo,
    // AFD handle used to submit and cancel poll operations for this socket.
    afd: Arc<Afd>,

    // Socket as supplied at registration.
    raw_socket: RawSocket,
    // Socket returned by `get_base_socket`; this is the handle that is polled.
    base_socket: RawSocket,

    // Events the user is currently interested in.
    user_evts: u32,
    // Events the pending poll operation (if any) is monitoring.
    pending_evts: u32,

    // Opaque value copied into the `data` field of reported events.
    user_data: u64,

    // Where the poll operation currently is in its life cycle.
    poll_status: SockPollStatus,
    // Set once the socket state has been marked for deletion.
    delete_pending: bool,

    // last raw os error
    error: Option<i32>,

    // Marker field that makes the type `!Unpin`.
    pinned: PhantomPinned,
}
112 
113 impl SockState {
    /// Brings the AFD poll operation in line with the events the user is
    /// currently interested in (`user_evts`).
    ///
    /// Depending on `poll_status` this is a no-op (the pending operation
    /// already covers the events, or a cancellation is in flight), cancels
    /// the pending operation, or submits a new poll operation.
    ///
    /// `self_arc` is the `Arc` through which this state is shared; a clone of
    /// it is turned into a raw pointer and passed to the kernel as the
    /// overlapped pointer when a new operation is submitted.
    ///
    /// # Errors
    ///
    /// Returns the error of a failed cancel or poll submission; its raw OS
    /// error code is also stored in `self.error`. A poll failing with
    /// `ERROR_INVALID_HANDLE` is not an error: the state is marked for
    /// deletion and `Ok(())` is returned.
    ///
    /// # Panics
    ///
    /// Panics if the state is already marked for deletion, or if a poll
    /// error carries no raw OS error code.
    fn update(&mut self, self_arc: &Pin<Arc<Mutex<SockState>>>) -> io::Result<()> {
        assert!(!self.delete_pending);

        // make sure to reset previous error before a new update
        self.error = None;

        if let SockPollStatus::Pending = self.poll_status {
            if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 {
                /* All the events the user is interested in are already being monitored by
                 * the pending poll operation. It might spuriously complete because of an
                 * event that we're no longer interested in; when that happens we'll submit
                 * a new poll operation with the updated event mask. */
            } else {
                /* A poll operation is already pending, but it's not monitoring for all the
                 * events that the user is interested in. Therefore, cancel the pending
                 * poll operation; when we receive its completion package, a new poll
                 * operation will be submitted with the correct event mask. */
                if let Err(e) = self.cancel() {
                    self.error = e.raw_os_error();
                    return Err(e);
                }
                return Ok(());
            }
        } else if let SockPollStatus::Cancelled = self.poll_status {
            /* The poll operation has already been cancelled, we're still waiting for
             * it to return. For now, there's nothing that needs to be done. */
        } else if let SockPollStatus::Idle = self.poll_status {
            /* No poll operation is pending; start one. */
            self.poll_info.exclusive = 0;
            self.poll_info.number_of_handles = 1;
            *unsafe { self.poll_info.timeout.QuadPart_mut() } = std::i64::MAX;
            self.poll_info.handles[0].handle = self.base_socket as HANDLE;
            self.poll_info.handles[0].status = 0;
            // POLL_LOCAL_CLOSE is always requested so a locally closed socket
            // is noticed when the completion is processed.
            self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;

            // Increase the ref count as the memory will be used by the kernel.
            let overlapped_ptr = into_overlapped(self_arc.clone());

            let result = unsafe {
                self.afd
                    .poll(&mut self.poll_info, &mut *self.iosb, overlapped_ptr)
            };
            if let Err(e) = result {
                let code = e.raw_os_error().unwrap();
                if code == ERROR_IO_PENDING as i32 {
                    /* Overlapped poll operation in progress; this is expected. */
                } else {
                    // Since the operation failed it means the kernel won't be
                    // using the memory any more.
                    drop(from_overlapped(overlapped_ptr as *mut _));
                    if code == ERROR_INVALID_HANDLE as i32 {
                        /* Socket closed; it'll be dropped. */
                        self.mark_delete();
                        return Ok(());
                    } else {
                        self.error = e.raw_os_error();
                        return Err(e);
                    }
                }
            }

            // The operation was submitted (or completed immediately); record
            // which events it is monitoring.
            self.poll_status = SockPollStatus::Pending;
            self.pending_evts = self.user_evts;
        } else {
            unreachable!("Invalid poll status during update, {:#?}", self)
        }

        Ok(())
    }
183 
cancel(&mut self) -> io::Result<()>184     fn cancel(&mut self) -> io::Result<()> {
185         match self.poll_status {
186             SockPollStatus::Pending => {}
187             _ => unreachable!("Invalid poll status during cancel, {:#?}", self),
188         };
189         unsafe {
190             self.afd.cancel(&mut *self.iosb)?;
191         }
192         self.poll_status = SockPollStatus::Cancelled;
193         self.pending_evts = 0;
194         Ok(())
195     }
196 
197     // This is the function called from the overlapped using as Arc<Mutex<SockState>>. Watch out for reference counting.
feed_event(&mut self) -> Option<Event>198     fn feed_event(&mut self) -> Option<Event> {
199         self.poll_status = SockPollStatus::Idle;
200         self.pending_evts = 0;
201 
202         let mut afd_events = 0;
203         // We use the status info in IO_STATUS_BLOCK to determine the socket poll status. It is unsafe to use a pointer of IO_STATUS_BLOCK.
204         unsafe {
205             if self.delete_pending {
206                 return None;
207             } else if self.iosb.u.Status == STATUS_CANCELLED {
208                 /* The poll request was cancelled by CancelIoEx. */
209             } else if !NT_SUCCESS(self.iosb.u.Status) {
210                 /* The overlapped request itself failed in an unexpected way. */
211                 afd_events = afd::POLL_CONNECT_FAIL;
212             } else if self.poll_info.number_of_handles < 1 {
213                 /* This poll operation succeeded but didn't report any socket events. */
214             } else if self.poll_info.handles[0].events & afd::POLL_LOCAL_CLOSE != 0 {
215                 /* The poll operation reported that the socket was closed. */
216                 self.mark_delete();
217                 return None;
218             } else {
219                 afd_events = self.poll_info.handles[0].events;
220             }
221         }
222 
223         afd_events &= self.user_evts;
224 
225         if afd_events == 0 {
226             return None;
227         }
228 
229         // In mio, we have to simulate Edge-triggered behavior to match API usage.
230         // The strategy here is to intercept all read/write from user that could cause WouldBlock usage,
231         // then reregister the socket to reset the interests.
232         self.user_evts &= !afd_events;
233 
234         Some(Event {
235             data: self.user_data,
236             flags: afd_events,
237         })
238     }
239 
    /// Returns `true` once this state has been marked for deletion.
    pub fn is_pending_deletion(&self) -> bool {
        self.delete_pending
    }
243 
mark_delete(&mut self)244     pub fn mark_delete(&mut self) {
245         if !self.delete_pending {
246             if let SockPollStatus::Pending = self.poll_status {
247                 drop(self.cancel());
248             }
249 
250             self.delete_pending = true;
251         }
252     }
253 
    /// Returns `true` if an OS error code is currently recorded in
    /// `self.error`.
    fn has_error(&self) -> bool {
        self.error.is_some()
    }
257 }
258 
259 cfg_io_source! {
260     impl SockState {
261         fn new(raw_socket: RawSocket, afd: Arc<Afd>) -> io::Result<SockState> {
262             Ok(SockState {
263                 iosb: IoStatusBlock::zeroed(),
264                 poll_info: AfdPollInfo::zeroed(),
265                 afd,
266                 raw_socket,
267                 base_socket: get_base_socket(raw_socket)?,
268                 user_evts: 0,
269                 pending_evts: 0,
270                 user_data: 0,
271                 poll_status: SockPollStatus::Idle,
272                 delete_pending: false,
273                 error: None,
274                 pinned: PhantomPinned,
275             })
276         }
277 
278         /// True if need to be added on update queue, false otherwise.
279         fn set_event(&mut self, ev: Event) -> bool {
280             /* afd::POLL_CONNECT_FAIL and afd::POLL_ABORT are always reported, even when not requested by the caller. */
281             let events = ev.flags | afd::POLL_CONNECT_FAIL | afd::POLL_ABORT;
282 
283             self.user_evts = events;
284             self.user_data = ev.data;
285 
286             (events & !self.pending_evts) != 0
287         }
288     }
289 }
290 
impl Drop for SockState {
    /// Marks the state for deletion, which cancels a still-pending poll
    /// operation (see `mark_delete`).
    fn drop(&mut self) {
        self.mark_delete();
    }
}
296 
297 /// Converts the pointer to a `SockState` into a raw pointer.
298 /// To revert see `from_overlapped`.
into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> PVOID299 fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> PVOID {
300     let overlapped_ptr: *const Mutex<SockState> =
301         unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) };
302     overlapped_ptr as *mut _
303 }
304 
305 /// Convert a raw overlapped pointer into a reference to `SockState`.
306 /// Reverts `into_overlapped`.
from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>>307 fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>> {
308     let sock_ptr: *const Mutex<SockState> = ptr as *const _;
309     unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) }
310 }
311 
/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that is previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches windows behavior.
///
/// Only compiled with debug assertions enabled. `Selector::new` increments
/// this counter for every selector it creates.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
319 
/// Windows implementation of `sys::Selector`.
///
/// Edge-triggered event notification is simulated by resetting the internal
/// event flags of each socket state (`SockState`) and setting all events back
/// by intercepting all requests that could cause `io::ErrorKind::WouldBlock`.
///
/// This selector currently only supports sockets because the `Afd` driver is
/// winsock2 specific.
#[derive(Debug)]
pub struct Selector {
    // Identifier assigned from `NEXT_ID`; debug builds only.
    #[cfg(debug_assertions)]
    id: usize,
    // Shared state; cloned selectors point at the same `SelectorInner`.
    pub(super) inner: Arc<SelectorInner>,
    // Set by `register_waker`; debug builds only.
    #[cfg(debug_assertions)]
    has_waker: AtomicBool,
}
334 
335 impl Selector {
new() -> io::Result<Selector>336     pub fn new() -> io::Result<Selector> {
337         SelectorInner::new().map(|inner| {
338             #[cfg(debug_assertions)]
339             let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
340             Selector {
341                 #[cfg(debug_assertions)]
342                 id,
343                 inner: Arc::new(inner),
344                 #[cfg(debug_assertions)]
345                 has_waker: AtomicBool::new(false),
346             }
347         })
348     }
349 
try_clone(&self) -> io::Result<Selector>350     pub fn try_clone(&self) -> io::Result<Selector> {
351         Ok(Selector {
352             #[cfg(debug_assertions)]
353             id: self.id,
354             inner: Arc::clone(&self.inner),
355             #[cfg(debug_assertions)]
356             has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
357         })
358     }
359 
360     /// # Safety
361     ///
362     /// This requires a mutable reference to self because only a single thread
363     /// can poll IOCP at a time.
select(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>364     pub fn select(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
365         self.inner.select(events, timeout)
366     }
367 
368     #[cfg(debug_assertions)]
register_waker(&self) -> bool369     pub fn register_waker(&self) -> bool {
370         self.has_waker.swap(true, Ordering::AcqRel)
371     }
372 
clone_port(&self) -> Arc<CompletionPort>373     pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
374         self.inner.cp.clone()
375     }
376 
377     #[cfg(feature = "os-ext")]
same_port(&self, other: &Arc<CompletionPort>) -> bool378     pub(super) fn same_port(&self, other: &Arc<CompletionPort>) -> bool {
379         Arc::ptr_eq(&self.inner.cp, other)
380     }
381 }
382 
cfg_io_source! {
    use super::InternalState;
    use crate::Token;

    impl Selector {
        /// Registers `socket` with the given token and interests by
        /// forwarding to `SelectorInner::register`.
        pub(super) fn register(
            &self,
            socket: RawSocket,
            token: Token,
            interests: Interest,
        ) -> io::Result<InternalState> {
            SelectorInner::register(&self.inner, socket, token, interests)
        }

        /// Replaces the token and interests of an already registered socket
        /// state by forwarding to `SelectorInner::reregister`.
        pub(super) fn reregister(
            &self,
            state: Pin<Arc<Mutex<SockState>>>,
            token: Token,
            interests: Interest,
        ) -> io::Result<()> {
            self.inner.reregister(state, token, interests)
        }

        /// Returns the ID assigned to this selector (debug builds only).
        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.id
        }
    }
}
412 
/// State shared by all clones of a `Selector`.
#[derive(Debug)]
pub struct SelectorInner {
    /// Completion port that is polled for events.
    pub(super) cp: Arc<CompletionPort>,
    /// Socket states whose poll operation has to be (re)evaluated before the
    /// completion port is polled again.
    update_queue: Mutex<VecDeque<Pin<Arc<Mutex<SockState>>>>>,
    /// `Afd` handles handed out to registered sockets.
    afd_group: AfdGroup,
    /// `true` while a call to `select2` is in progress.
    is_polling: AtomicBool,
}
420 
// We have ensured thread safety by introducing locks manually: the update
// queue and the AFD group are guarded by mutexes and `is_polling` is atomic.
// NOTE(review): this also relies on `CompletionPort` being safe to share
// between threads — confirm.
unsafe impl Sync for SelectorInner {}
423 
424 impl SelectorInner {
new() -> io::Result<SelectorInner>425     pub fn new() -> io::Result<SelectorInner> {
426         CompletionPort::new(0).map(|cp| {
427             let cp = Arc::new(cp);
428             let cp_afd = Arc::clone(&cp);
429 
430             SelectorInner {
431                 cp,
432                 update_queue: Mutex::new(VecDeque::new()),
433                 afd_group: AfdGroup::new(cp_afd),
434                 is_polling: AtomicBool::new(false),
435             }
436         })
437     }
438 
439     /// # Safety
440     ///
441     /// May only be calling via `Selector::select`.
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>442     pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
443         events.clear();
444 
445         if timeout.is_none() {
446             loop {
447                 let len = self.select2(&mut events.statuses, &mut events.events, None)?;
448                 if len == 0 {
449                     continue;
450                 }
451                 return Ok(());
452             }
453         } else {
454             self.select2(&mut events.statuses, &mut events.events, timeout)?;
455             return Ok(());
456         }
457     }
458 
select2( &self, statuses: &mut [CompletionStatus], events: &mut Vec<Event>, timeout: Option<Duration>, ) -> io::Result<usize>459     pub fn select2(
460         &self,
461         statuses: &mut [CompletionStatus],
462         events: &mut Vec<Event>,
463         timeout: Option<Duration>,
464     ) -> io::Result<usize> {
465         assert_eq!(self.is_polling.swap(true, Ordering::AcqRel), false);
466 
467         unsafe { self.update_sockets_events() }?;
468 
469         let result = self.cp.get_many(statuses, timeout);
470 
471         self.is_polling.store(false, Ordering::Relaxed);
472 
473         match result {
474             Ok(iocp_events) => Ok(unsafe { self.feed_events(events, iocp_events) }),
475             Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => Ok(0),
476             Err(e) => Err(e),
477         }
478     }
479 
update_sockets_events(&self) -> io::Result<()>480     unsafe fn update_sockets_events(&self) -> io::Result<()> {
481         let mut update_queue = self.update_queue.lock().unwrap();
482         for sock in update_queue.iter_mut() {
483             let mut sock_internal = sock.lock().unwrap();
484             if !sock_internal.is_pending_deletion() {
485                 sock_internal.update(&sock)?;
486             }
487         }
488 
489         // remove all sock which do not have error, they have afd op pending
490         update_queue.retain(|sock| sock.lock().unwrap().has_error());
491 
492         self.afd_group.release_unused_afd();
493         Ok(())
494     }
495 
496     // It returns processed count of iocp_events rather than the events itself.
feed_events( &self, events: &mut Vec<Event>, iocp_events: &[CompletionStatus], ) -> usize497     unsafe fn feed_events(
498         &self,
499         events: &mut Vec<Event>,
500         iocp_events: &[CompletionStatus],
501     ) -> usize {
502         let mut n = 0;
503         let mut update_queue = self.update_queue.lock().unwrap();
504         for iocp_event in iocp_events.iter() {
505             if iocp_event.overlapped().is_null() {
506                 events.push(Event::from_completion_status(iocp_event));
507                 n += 1;
508                 continue;
509             } else if iocp_event.token() % 2 == 1 {
510                 // Handle is a named pipe. This could be extended to be any non-AFD event.
511                 let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback;
512 
513                 let len = events.len();
514                 callback(iocp_event.entry(), Some(events));
515                 n += events.len() - len;
516                 continue;
517             }
518 
519             let sock_state = from_overlapped(iocp_event.overlapped());
520             let mut sock_guard = sock_state.lock().unwrap();
521             match sock_guard.feed_event() {
522                 Some(e) => {
523                     events.push(e);
524                     n += 1;
525                 }
526                 None => {}
527             }
528 
529             if !sock_guard.is_pending_deletion() {
530                 update_queue.push_back(sock_state.clone());
531             }
532         }
533         self.afd_group.release_unused_afd();
534         n
535     }
536 }
537 
538 cfg_io_source! {
539     use std::mem::size_of;
540     use std::ptr::null_mut;
541     use winapi::um::mswsock;
542     use winapi::um::winsock2::WSAGetLastError;
543     use winapi::um::winsock2::{WSAIoctl, SOCKET_ERROR};
544 
545     impl SelectorInner {
546         fn register(
547             this: &Arc<Self>,
548             socket: RawSocket,
549             token: Token,
550             interests: Interest,
551         ) -> io::Result<InternalState> {
552             let flags = interests_to_afd_flags(interests);
553 
554             let sock = {
555                 let sock = this._alloc_sock_for_rawsocket(socket)?;
556                 let event = Event {
557                     flags,
558                     data: token.0 as u64,
559                 };
560                 sock.lock().unwrap().set_event(event);
561                 sock
562             };
563 
564             let state = InternalState {
565                 selector: this.clone(),
566                 token,
567                 interests,
568                 sock_state: sock.clone(),
569             };
570 
571             this.queue_state(sock);
572             unsafe { this.update_sockets_events_if_polling()? };
573 
574             Ok(state)
575         }
576 
577         // Directly accessed in `IoSourceState::do_io`.
578         pub(super) fn reregister(
579             &self,
580             state: Pin<Arc<Mutex<SockState>>>,
581             token: Token,
582             interests: Interest,
583         ) -> io::Result<()> {
584             {
585                 let event = Event {
586                     flags: interests_to_afd_flags(interests),
587                     data: token.0 as u64,
588                 };
589 
590                 state.lock().unwrap().set_event(event);
591             }
592 
593             // FIXME: a sock which has_error true should not be re-added to
594             // the update queue because it's already there.
595             self.queue_state(state);
596             unsafe { self.update_sockets_events_if_polling() }
597         }
598 
599         /// This function is called by register() and reregister() to start an
600         /// IOCTL_AFD_POLL operation corresponding to the registered events, but
601         /// only if necessary.
602         ///
603         /// Since it is not possible to modify or synchronously cancel an AFD_POLL
604         /// operation, and there can be only one active AFD_POLL operation per
605         /// (socket, completion port) pair at any time, it is expensive to change
606         /// a socket's event registration after it has been submitted to the kernel.
607         ///
608         /// Therefore, if no other threads are polling when interest in a socket
609         /// event is (re)registered, the socket is added to the 'update queue', but
610         /// the actual syscall to start the IOCTL_AFD_POLL operation is deferred
611         /// until just before the GetQueuedCompletionStatusEx() syscall is made.
612         ///
613         /// However, when another thread is already blocked on
614         /// GetQueuedCompletionStatusEx() we tell the kernel about the registered
615         /// socket event(s) immediately.
616         unsafe fn update_sockets_events_if_polling(&self) -> io::Result<()> {
617             if self.is_polling.load(Ordering::Acquire) {
618                 self.update_sockets_events()
619             } else {
620                 Ok(())
621             }
622         }
623 
624         fn queue_state(&self, sock_state: Pin<Arc<Mutex<SockState>>>) {
625             let mut update_queue = self.update_queue.lock().unwrap();
626             update_queue.push_back(sock_state);
627         }
628 
629         fn _alloc_sock_for_rawsocket(
630             &self,
631             raw_socket: RawSocket,
632         ) -> io::Result<Pin<Arc<Mutex<SockState>>>> {
633             let afd = self.afd_group.acquire()?;
634             Ok(Arc::pin(Mutex::new(SockState::new(raw_socket, afd)?)))
635         }
636     }
637 
638     fn try_get_base_socket(raw_socket: RawSocket, ioctl: u32) -> Result<RawSocket, i32> {
639         let mut base_socket: RawSocket = 0;
640         let mut bytes: u32 = 0;
641         unsafe {
642             if WSAIoctl(
643                 raw_socket as usize,
644                 ioctl,
645                 null_mut(),
646                 0,
647                 &mut base_socket as *mut _ as PVOID,
648                 size_of::<RawSocket>() as u32,
649                 &mut bytes,
650                 null_mut(),
651                 None,
652             ) != SOCKET_ERROR
653             {
654                 Ok(base_socket)
655             } else {
656                 Err(WSAGetLastError())
657             }
658         }
659     }
660 
661     fn get_base_socket(raw_socket: RawSocket) -> io::Result<RawSocket> {
662         let res = try_get_base_socket(raw_socket, mswsock::SIO_BASE_HANDLE);
663         if let Ok(base_socket) = res {
664             return Ok(base_socket);
665         }
666 
667         // The `SIO_BASE_HANDLE` should not be intercepted by LSPs, therefore
668         // it should not fail as long as `raw_socket` is a valid socket. See
669         // https://docs.microsoft.com/en-us/windows/win32/winsock/winsock-ioctls.
670         // However, at least one known LSP deliberately breaks it, so we try
671         // some alternative IOCTLs, starting with the most appropriate one.
672         for &ioctl in &[
673             mswsock::SIO_BSP_HANDLE_SELECT,
674             mswsock::SIO_BSP_HANDLE_POLL,
675             mswsock::SIO_BSP_HANDLE,
676         ] {
677             if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) {
678                 // Since we know now that we're dealing with an LSP (otherwise
679                 // SIO_BASE_HANDLE would't have failed), only return any result
680                 // when it is different from the original `raw_socket`.
681                 if base_socket != raw_socket {
682                     return Ok(base_socket);
683                 }
684             }
685         }
686 
687         // If the alternative IOCTLs also failed, return the original error.
688         let os_error = res.unwrap_err();
689         let err = io::Error::from_raw_os_error(os_error);
690         Err(err)
691     }
692 }
693 
694 impl Drop for SelectorInner {
drop(&mut self)695     fn drop(&mut self) {
696         loop {
697             let events_num: usize;
698             let mut statuses: [CompletionStatus; 1024] = [CompletionStatus::zero(); 1024];
699 
700             let result = self
701                 .cp
702                 .get_many(&mut statuses, Some(std::time::Duration::from_millis(0)));
703             match result {
704                 Ok(iocp_events) => {
705                     events_num = iocp_events.iter().len();
706                     for iocp_event in iocp_events.iter() {
707                         if iocp_event.overlapped().is_null() {
708                             // Custom event
709                         } else if iocp_event.token() % 2 == 1 {
710                             // Named pipe, dispatch the event so it can release resources
711                             let callback = unsafe {
712                                 (*(iocp_event.overlapped() as *mut super::Overlapped)).callback
713                             };
714 
715                             callback(iocp_event.entry(), None);
716                         } else {
717                             // drain sock state to release memory of Arc reference
718                             let _sock_state = from_overlapped(iocp_event.overlapped());
719                         }
720                     }
721                 }
722 
723                 Err(_) => {
724                     break;
725                 }
726             }
727 
728             if events_num == 0 {
729                 // continue looping until all completion statuses have been drained
730                 break;
731             }
732         }
733 
734         self.afd_group.release_unused_afd();
735     }
736 }
737 
738 cfg_net! {
739     fn interests_to_afd_flags(interests: Interest) -> u32 {
740         let mut flags = 0;
741 
742         if interests.is_readable() {
743             flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS;
744         }
745 
746         if interests.is_writable() {
747             flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS;
748         }
749 
750         flags
751     }
752 }
753