use super::afd::{self, Afd, AfdPollInfo};
use super::io_status_block::IoStatusBlock;
use super::Event;
use crate::sys::Events;

cfg_net! {
    use crate::sys::event::{
        ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS,
    };
    use crate::Interest;
}

use miow::iocp::{CompletionPort, CompletionStatus};
use std::collections::VecDeque;
use std::io;
use std::marker::PhantomPinned;
use std::os::windows::io::RawSocket;
use std::pin::Pin;
#[cfg(debug_assertions)]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use winapi::shared::ntdef::NT_SUCCESS;
use winapi::shared::ntdef::{HANDLE, PVOID};
use winapi::shared::ntstatus::STATUS_CANCELLED;
use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT};
use winapi::um::minwinbase::OVERLAPPED;

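/// A pool of AFD device handles shared across sockets. A single `Afd` handle
/// can host poll operations for many sockets; a new handle is allocated only
/// once the newest one is saturated (see `POLL_GROUP__MAX_GROUP_SIZE`).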
#[derive(Debug)]
struct AfdGroup {
    cp: Arc<CompletionPort>,
    afd_group: Mutex<Vec<Arc<Afd>>>,
}

impl AfdGroup {
    pub fn new(cp: Arc<CompletionPort>) -> AfdGroup {
        AfdGroup {
            afd_group: Mutex::new(Vec::new()),
            cp,
        }
    }

    pub fn release_unused_afd(&self) {
        let mut afd_group = self.afd_group.lock().unwrap();
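        // An `Afd` handle still in use holds at least one extra `Arc` besides
        // the one stored in this `Vec`; drop every handle whose strong count
        // has fallen back to 1, since no socket references it anymore.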
        afd_group.retain(|g| Arc::strong_count(&g) > 1);
    }
}

cfg_io_source! {
    const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;

    impl AfdGroup {
        pub fn acquire(&self) -> io::Result<Arc<Afd>> {
            let mut afd_group = self.afd_group.lock().unwrap();
            if afd_group.is_empty() {
                self._alloc_afd_group(&mut afd_group)?;
            } else {
                // + 1 for the reference held by the Vec itself
                if Arc::strong_count(afd_group.last().unwrap()) >= POLL_GROUP__MAX_GROUP_SIZE + 1 {
                    self._alloc_afd_group(&mut afd_group)?;
                }
            }

            match afd_group.last() {
                Some(arc) => Ok(arc.clone()),
                None => unreachable!(
                    "Cannot acquire afd, {:#?}, afd_group: {:#?}",
                    self, afd_group
                ),
            }
        }

        fn _alloc_afd_group(&self, afd_group: &mut Vec<Arc<Afd>>) -> io::Result<()> {
            let afd = Afd::new(&self.cp)?;
            let arc = Arc::new(afd);
            afd_group.push(arc);
            Ok(())
        }
    }
}

#[derive(Debug)]
enum SockPollStatus {
    Idle,
    Pending,
    Cancelled,
}

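/// Per-socket state: tracks the events the user registered interest in
/// (`user_evts`), the single in-flight AFD poll operation (`poll_info`,
/// `iosb`), and its lifecycle (`poll_status`, `delete_pending`).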
#[derive(Debug)]
pub struct SockState {
    iosb: IoStatusBlock,
    poll_info: AfdPollInfo,
    afd: Arc<Afd>,

    raw_socket: RawSocket,
    base_socket: RawSocket,

    user_evts: u32,
    pending_evts: u32,

    user_data: u64,

    poll_status: SockPollStatus,
    delete_pending: bool,

    // last raw os error
    error: Option<i32>,

    pinned: PhantomPinned,
}

impl SockState {
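    /// (Re)submits the AFD poll operation so that it matches `user_evts`,
    /// cancelling a pending poll first when its event mask no longer covers
    /// everything the user asked for.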
    fn update(&mut self, self_arc: &Pin<Arc<Mutex<SockState>>>) -> io::Result<()> {
        assert!(!self.delete_pending);

        // Make sure to reset any previous error before a new update.
        self.error = None;

        if let SockPollStatus::Pending = self.poll_status {
            if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 {
                /* All the events the user is interested in are already being
                 * monitored by the pending poll operation. It might spuriously
                 * complete because of an event that we're no longer interested
                 * in; when that happens we'll submit a new poll operation with
                 * the updated event mask. */
            } else {
                /* A poll operation is already pending, but it's not monitoring
                 * for all the events that the user is interested in. Therefore,
                 * cancel the pending poll operation; when we receive its
                 * completion packet, a new poll operation will be submitted
                 * with the correct event mask. */
                if let Err(e) = self.cancel() {
                    self.error = e.raw_os_error();
                    return Err(e);
                }
                return Ok(());
            }
        } else if let SockPollStatus::Cancelled = self.poll_status {
            /* The poll operation has already been cancelled; we're still
             * waiting for it to return. For now, there's nothing that needs
             * to be done. */
        } else if let SockPollStatus::Idle = self.poll_status {
            /* No poll operation is pending; start one. */
            self.poll_info.exclusive = 0;
            self.poll_info.number_of_handles = 1;
            *unsafe { self.poll_info.timeout.QuadPart_mut() } = std::i64::MAX;
            self.poll_info.handles[0].handle = self.base_socket as HANDLE;
            self.poll_info.handles[0].status = 0;
            self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;

            // Increase the ref count as the memory will be used by the kernel.
            let overlapped_ptr = into_overlapped(self_arc.clone());

            let result = unsafe {
                self.afd
                    .poll(&mut self.poll_info, &mut *self.iosb, overlapped_ptr)
            };
            if let Err(e) = result {
                let code = e.raw_os_error().unwrap();
                if code == ERROR_IO_PENDING as i32 {
                    /* Overlapped poll operation in progress; this is expected. */
                } else {
                    // Since the operation failed, the kernel won't be using
                    // the memory any more.
                    drop(from_overlapped(overlapped_ptr as *mut _));
                    if code == ERROR_INVALID_HANDLE as i32 {
                        /* Socket closed; it'll be dropped. */
                        self.mark_delete();
                        return Ok(());
                    } else {
                        self.error = e.raw_os_error();
                        return Err(e);
                    }
                }
            }

            self.poll_status = SockPollStatus::Pending;
            self.pending_evts = self.user_evts;
        } else {
            unreachable!("Invalid poll status during update, {:#?}", self)
        }

        Ok(())
    }

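    /// Cancels the in-flight poll operation on the AFD handle. The cancelled
    /// operation still completes later through the completion port, with
    /// `STATUS_CANCELLED` in its I/O status block.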
    fn cancel(&mut self) -> io::Result<()> {
        match self.poll_status {
            SockPollStatus::Pending => {}
            _ => unreachable!("Invalid poll status during cancel, {:#?}", self),
        };
        unsafe {
            self.afd.cancel(&mut *self.iosb)?;
        }
        self.poll_status = SockPollStatus::Cancelled;
        self.pending_evts = 0;
        Ok(())
    }

    // This function is called from the overlapped completion, which holds this
    // `SockState` as an `Arc<Mutex<SockState>>`. Watch out for reference counting.
    fn feed_event(&mut self) -> Option<Event> {
        self.poll_status = SockPollStatus::Idle;
        self.pending_evts = 0;

        let mut afd_events = 0;
        // We use the status info in the IO_STATUS_BLOCK to determine the socket
        // poll status. Reading it is unsafe: the `Status` field lives in a
        // union, and the kernel writes to the block while an operation is in
        // flight.
        unsafe {
            if self.delete_pending {
                return None;
            } else if self.iosb.u.Status == STATUS_CANCELLED {
                /* The poll request was cancelled by CancelIoEx. */
            } else if !NT_SUCCESS(self.iosb.u.Status) {
                /* The overlapped request itself failed in an unexpected way. */
                afd_events = afd::POLL_CONNECT_FAIL;
            } else if self.poll_info.number_of_handles < 1 {
                /* This poll operation succeeded but didn't report any socket events. */
            } else if self.poll_info.handles[0].events & afd::POLL_LOCAL_CLOSE != 0 {
                /* The poll operation reported that the socket was closed. */
                self.mark_delete();
                return None;
            } else {
                afd_events = self.poll_info.handles[0].events;
            }
        }

        afd_events &= self.user_evts;

        if afd_events == 0 {
            return None;
        }

        // In mio we have to simulate edge-triggered behavior to match the API
        // usage. The strategy here is to intercept all read/write requests from
        // the user that could return `WouldBlock`, then reregister the socket
        // to reset the interests.
        self.user_evts &= !afd_events;

        Some(Event {
            data: self.user_data,
            flags: afd_events,
        })
    }

    pub fn is_pending_deletion(&self) -> bool {
        self.delete_pending
    }

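    /// Marks the socket for deletion, cancelling any pending poll operation.
    /// The kernel still holds a reference to this state until the cancelled
    /// operation's completion is received, so the memory is freed later.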
    pub fn mark_delete(&mut self) {
        if !self.delete_pending {
            if let SockPollStatus::Pending = self.poll_status {
                drop(self.cancel());
            }

            self.delete_pending = true;
        }
    }

    fn has_error(&self) -> bool {
        self.error.is_some()
    }
}

cfg_io_source! {
    impl SockState {
        fn new(raw_socket: RawSocket, afd: Arc<Afd>) -> io::Result<SockState> {
            Ok(SockState {
                iosb: IoStatusBlock::zeroed(),
                poll_info: AfdPollInfo::zeroed(),
                afd,
                raw_socket,
                base_socket: get_base_socket(raw_socket)?,
                user_evts: 0,
                pending_evts: 0,
                user_data: 0,
                poll_status: SockPollStatus::Idle,
                delete_pending: false,
                error: None,
                pinned: PhantomPinned,
            })
        }

        /// Returns `true` if the socket needs to be added to the update queue,
        /// `false` otherwise.
        fn set_event(&mut self, ev: Event) -> bool {
            /* afd::POLL_CONNECT_FAIL and afd::POLL_ABORT are always reported,
             * even when not requested by the caller. */
            let events = ev.flags | afd::POLL_CONNECT_FAIL | afd::POLL_ABORT;

            self.user_evts = events;
            self.user_data = ev.data;

            (events & !self.pending_evts) != 0
        }
    }
}

impl Drop for SockState {
    fn drop(&mut self) {
        self.mark_delete();
    }
}

/// Converts the pointer to a `SockState` into a raw pointer.
/// To revert see `from_overlapped`.
fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> PVOID {
    let overlapped_ptr: *const Mutex<SockState> =
        unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) };
    overlapped_ptr as *mut _
}

/// Converts a raw overlapped pointer back into a `Pin<Arc<Mutex<SockState>>>`.
/// Reverts `into_overlapped`.
fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>> {
    let sock_ptr: *const Mutex<SockState> = ptr as *const _;
    unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) }
}

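// Note: every pointer produced by `into_overlapped` must eventually be passed
// back to `from_overlapped` exactly once, otherwise the `Arc` reference it
// carries is leaked or double-freed. `update` relies on this when it drops the
// pointer again after a failed poll submission.
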
/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type previously associated with a
/// `Selector` attempts to register itself with a different `Selector`, the
/// operation will return an error. This matches Windows behavior.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

/// Windows implementation of `sys::Selector`
///
/// Edge-triggered event notification is simulated by resetting the internal
/// event flags of each socket state (`SockState`) and setting them again by
/// intercepting all requests that could cause `io::ErrorKind::WouldBlock` to
/// happen.
///
/// This selector currently only supports sockets, because the `Afd` driver is
/// specific to Winsock2.
#[derive(Debug)]
pub struct Selector {
    #[cfg(debug_assertions)]
    id: usize,
    pub(super) inner: Arc<SelectorInner>,
    #[cfg(debug_assertions)]
    has_waker: AtomicBool,
}

impl Selector {
    pub fn new() -> io::Result<Selector> {
        SelectorInner::new().map(|inner| {
            #[cfg(debug_assertions)]
            let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
            Selector {
                #[cfg(debug_assertions)]
                id,
                inner: Arc::new(inner),
                #[cfg(debug_assertions)]
                has_waker: AtomicBool::new(false),
            }
        })
    }

    pub fn try_clone(&self) -> io::Result<Selector> {
        Ok(Selector {
            #[cfg(debug_assertions)]
            id: self.id,
            inner: Arc::clone(&self.inner),
            #[cfg(debug_assertions)]
            has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
        })
    }

    /// # Safety
    ///
    /// This requires a mutable reference to self because only a single thread
    /// can poll IOCP at a time.
    pub fn select(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        self.inner.select(events, timeout)
    }

    #[cfg(debug_assertions)]
    pub fn register_waker(&self) -> bool {
        self.has_waker.swap(true, Ordering::AcqRel)
    }

    pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
        self.inner.cp.clone()
    }

    #[cfg(feature = "os-ext")]
    pub(super) fn same_port(&self, other: &Arc<CompletionPort>) -> bool {
        Arc::ptr_eq(&self.inner.cp, other)
    }
}

cfg_io_source! {
    use super::InternalState;
    use crate::Token;

    impl Selector {
        pub(super) fn register(
            &self,
            socket: RawSocket,
            token: Token,
            interests: Interest,
        ) -> io::Result<InternalState> {
            SelectorInner::register(&self.inner, socket, token, interests)
        }

        pub(super) fn reregister(
            &self,
            state: Pin<Arc<Mutex<SockState>>>,
            token: Token,
            interests: Interest,
        ) -> io::Result<()> {
            self.inner.reregister(state, token, interests)
        }

        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.id
        }
    }
}

#[derive(Debug)]
pub struct SelectorInner {
    pub(super) cp: Arc<CompletionPort>,
    update_queue: Mutex<VecDeque<Pin<Arc<Mutex<SockState>>>>>,
    afd_group: AfdGroup,
    is_polling: AtomicBool,
}

// Thread safety is ensured manually: all mutable state is guarded by a
// `Mutex` or accessed through atomics.
unsafe impl Sync for SelectorInner {}

impl SelectorInner {
    pub fn new() -> io::Result<SelectorInner> {
        CompletionPort::new(0).map(|cp| {
            let cp = Arc::new(cp);
            let cp_afd = Arc::clone(&cp);

            SelectorInner {
                cp,
                update_queue: Mutex::new(VecDeque::new()),
                afd_group: AfdGroup::new(cp_afd),
                is_polling: AtomicBool::new(false),
            }
        })
    }

    /// # Safety
    ///
    /// May only be called via `Selector::select`.
    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        events.clear();

        if timeout.is_none() {
            loop {
                let len = self.select2(&mut events.statuses, &mut events.events, None)?;
                if len == 0 {
                    continue;
                }
                return Ok(());
            }
        } else {
            self.select2(&mut events.statuses, &mut events.events, timeout)?;
            Ok(())
        }
    }

    pub fn select2(
        &self,
        statuses: &mut [CompletionStatus],
        events: &mut Vec<Event>,
        timeout: Option<Duration>,
    ) -> io::Result<usize> {
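        // Only one thread may poll the completion port at a time. While this
        // flag is set, (re)registrations on other threads know to flush the
        // update queue themselves (see `update_sockets_events_if_polling`).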
        assert_eq!(self.is_polling.swap(true, Ordering::AcqRel), false);

        unsafe { self.update_sockets_events() }?;

        let result = self.cp.get_many(statuses, timeout);

        self.is_polling.store(false, Ordering::Relaxed);

        match result {
            Ok(iocp_events) => Ok(unsafe { self.feed_events(events, iocp_events) }),
            Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => Ok(0),
            Err(e) => Err(e),
        }
    }

    unsafe fn update_sockets_events(&self) -> io::Result<()> {
        let mut update_queue = self.update_queue.lock().unwrap();
        for sock in update_queue.iter_mut() {
            let mut sock_internal = sock.lock().unwrap();
            if !sock_internal.is_pending_deletion() {
                sock_internal.update(&sock)?;
            }
        }

        // Remove all sockets that don't have an error: they now have an AFD
        // poll operation pending. Sockets that failed stay queued so the
        // update can be retried on the next poll.
        update_queue.retain(|sock| sock.lock().unwrap().has_error());

        self.afd_group.release_unused_afd();
        Ok(())
    }

    // Returns the number of events processed from `iocp_events`, rather than
    // the events themselves.
    unsafe fn feed_events(
        &self,
        events: &mut Vec<Event>,
        iocp_events: &[CompletionStatus],
    ) -> usize {
        let mut n = 0;
        let mut update_queue = self.update_queue.lock().unwrap();
        for iocp_event in iocp_events.iter() {
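            // A status with a null overlapped pointer was posted directly to
            // the port (e.g. by the `Waker`) rather than completed by an AFD
            // poll; forward it to the user as-is.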
            if iocp_event.overlapped().is_null() {
                events.push(Event::from_completion_status(iocp_event));
                n += 1;
                continue;
            } else if iocp_event.token() % 2 == 1 {
                // Handle is a named pipe. This could be extended to be any non-AFD event.
                let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback;

                let len = events.len();
                callback(iocp_event.entry(), Some(events));
                n += events.len() - len;
                continue;
            }

            let sock_state = from_overlapped(iocp_event.overlapped());
            let mut sock_guard = sock_state.lock().unwrap();
            if let Some(e) = sock_guard.feed_event() {
                events.push(e);
                n += 1;
            }

            if !sock_guard.is_pending_deletion() {
                update_queue.push_back(sock_state.clone());
            }
        }
        self.afd_group.release_unused_afd();
        n
    }
}

cfg_io_source! {
    use std::mem::size_of;
    use std::ptr::null_mut;
    use winapi::um::mswsock;
    use winapi::um::winsock2::WSAGetLastError;
    use winapi::um::winsock2::{WSAIoctl, SOCKET_ERROR};

    impl SelectorInner {
        fn register(
            this: &Arc<Self>,
            socket: RawSocket,
            token: Token,
            interests: Interest,
        ) -> io::Result<InternalState> {
            let flags = interests_to_afd_flags(interests);

            let sock = {
                let sock = this._alloc_sock_for_rawsocket(socket)?;
                let event = Event {
                    flags,
                    data: token.0 as u64,
                };
                sock.lock().unwrap().set_event(event);
                sock
            };

            let state = InternalState {
                selector: this.clone(),
                token,
                interests,
                sock_state: sock.clone(),
            };

            this.queue_state(sock);
            unsafe { this.update_sockets_events_if_polling()? };

            Ok(state)
        }

        // Directly accessed in `IoSourceState::do_io`.
        pub(super) fn reregister(
            &self,
            state: Pin<Arc<Mutex<SockState>>>,
            token: Token,
            interests: Interest,
        ) -> io::Result<()> {
            {
                let event = Event {
                    flags: interests_to_afd_flags(interests),
                    data: token.0 as u64,
                };

                state.lock().unwrap().set_event(event);
            }

            // FIXME: a sock which has_error true should not be re-added to
            // the update queue because it's already there.
            self.queue_state(state);
            unsafe { self.update_sockets_events_if_polling() }
        }

        /// This function is called by register() and reregister() to start an
        /// IOCTL_AFD_POLL operation corresponding to the registered events, but
        /// only if necessary.
        ///
        /// Since it is not possible to modify or synchronously cancel an AFD_POLL
        /// operation, and there can be only one active AFD_POLL operation per
        /// (socket, completion port) pair at any time, it is expensive to change
        /// a socket's event registration after it has been submitted to the kernel.
        ///
        /// Therefore, if no other threads are polling when interest in a socket
        /// event is (re)registered, the socket is added to the 'update queue', but
        /// the actual syscall to start the IOCTL_AFD_POLL operation is deferred
        /// until just before the GetQueuedCompletionStatusEx() syscall is made.
        ///
        /// However, when another thread is already blocked on
        /// GetQueuedCompletionStatusEx() we tell the kernel about the registered
        /// socket event(s) immediately.
        unsafe fn update_sockets_events_if_polling(&self) -> io::Result<()> {
            if self.is_polling.load(Ordering::Acquire) {
                self.update_sockets_events()
            } else {
                Ok(())
            }
        }

        fn queue_state(&self, sock_state: Pin<Arc<Mutex<SockState>>>) {
            let mut update_queue = self.update_queue.lock().unwrap();
            update_queue.push_back(sock_state);
        }

        fn _alloc_sock_for_rawsocket(
            &self,
            raw_socket: RawSocket,
        ) -> io::Result<Pin<Arc<Mutex<SockState>>>> {
            let afd = self.afd_group.acquire()?;
            Ok(Arc::pin(Mutex::new(SockState::new(raw_socket, afd)?)))
        }
    }

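    /// Asks Winsock for the socket's "base" handle via the given ioctl,
    /// returning the raw Winsock error code on failure.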
    fn try_get_base_socket(raw_socket: RawSocket, ioctl: u32) -> Result<RawSocket, i32> {
        let mut base_socket: RawSocket = 0;
        let mut bytes: u32 = 0;
        unsafe {
            if WSAIoctl(
                raw_socket as usize,
                ioctl,
                null_mut(),
                0,
                &mut base_socket as *mut _ as PVOID,
                size_of::<RawSocket>() as u32,
                &mut bytes,
                null_mut(),
                None,
            ) != SOCKET_ERROR
            {
                Ok(base_socket)
            } else {
                Err(WSAGetLastError())
            }
        }
    }

    fn get_base_socket(raw_socket: RawSocket) -> io::Result<RawSocket> {
        let res = try_get_base_socket(raw_socket, mswsock::SIO_BASE_HANDLE);
        if let Ok(base_socket) = res {
            return Ok(base_socket);
        }

        // `SIO_BASE_HANDLE` should not be intercepted by LSPs, therefore
        // it should not fail as long as `raw_socket` is a valid socket. See
        // https://docs.microsoft.com/en-us/windows/win32/winsock/winsock-ioctls.
        // However, at least one known LSP deliberately breaks it, so we try
        // some alternative IOCTLs, starting with the most appropriate one.
        for &ioctl in &[
            mswsock::SIO_BSP_HANDLE_SELECT,
            mswsock::SIO_BSP_HANDLE_POLL,
            mswsock::SIO_BSP_HANDLE,
        ] {
            if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) {
                // Since we now know that we're dealing with an LSP (otherwise
                // SIO_BASE_HANDLE wouldn't have failed), only return a result
                // when it is different from the original `raw_socket`.
                if base_socket != raw_socket {
                    return Ok(base_socket);
                }
            }
        }

        // If the alternative IOCTLs also failed, return the original error.
        let os_error = res.unwrap_err();
        let err = io::Error::from_raw_os_error(os_error);
        Err(err)
    }
}

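// On drop, drain any completion statuses still queued on the port so that the
// `Arc` references carried by their overlapped pointers are released and named
// pipes get a chance to free their resources.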
impl Drop for SelectorInner {
    fn drop(&mut self) {
        loop {
            let events_num: usize;
            let mut statuses: [CompletionStatus; 1024] = [CompletionStatus::zero(); 1024];

            let result = self
                .cp
                .get_many(&mut statuses, Some(std::time::Duration::from_millis(0)));
            match result {
                Ok(iocp_events) => {
                    events_num = iocp_events.iter().len();
                    for iocp_event in iocp_events.iter() {
                        if iocp_event.overlapped().is_null() {
                            // Custom event
                        } else if iocp_event.token() % 2 == 1 {
                            // Named pipe, dispatch the event so it can release resources
                            let callback = unsafe {
                                (*(iocp_event.overlapped() as *mut super::Overlapped)).callback
                            };

                            callback(iocp_event.entry(), None);
                        } else {
                            // Drain the sock state to release the memory held by
                            // its Arc reference
                            let _sock_state = from_overlapped(iocp_event.overlapped());
                        }
                    }
                }

                Err(_) => {
                    break;
                }
            }

            if events_num == 0 {
                // All completion statuses have been drained; stop looping.
                break;
            }
        }

        self.afd_group.release_unused_afd();
    }
}

cfg_net! {
    fn interests_to_afd_flags(interests: Interest) -> u32 {
        let mut flags = 0;

        if interests.is_readable() {
            flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS;
        }

        if interests.is_writable() {
            flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS;
        }

        flags
    }
}