1 use std::fmt;
2 use std::io::{self, Read, ErrorKind};
3 use std::mem;
4 use std::net::{self, SocketAddr, Shutdown};
5 use std::os::windows::prelude::*;
6 use std::sync::{Mutex, MutexGuard};
7 use std::time::Duration;
8 
9 use miow::iocp::CompletionStatus;
10 use miow::net::*;
11 use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt};
12 use winapi::um::minwinbase::OVERLAPPED_ENTRY;
13 use winapi::um::winnt::HANDLE;
14 use iovec::IoVec;
15 
16 use {poll, Ready, Poll, PollOpt, Token};
17 use event::Evented;
18 use sys::windows::from_raw_arc::FromRawArc;
19 use sys::windows::selector::{Overlapped, ReadyBinding};
20 use sys::windows::Family;
21 
/// Windows (IOCP) implementation of a TCP stream.
pub struct TcpStream {
    /// Separately stored implementation to ensure that the `Drop`
    /// implementation on this type is only executed when it's actually dropped
    /// (many clones of this `imp` are made).
    imp: StreamImp,
    /// Lazily-initialized registration with a `Poll` instance; filled in on
    /// the first `register` call.
    registration: Mutex<Option<poll::Registration>>,
}
29 
/// Windows (IOCP) implementation of a TCP listener.
pub struct TcpListener {
    /// Shared, reference-counted internals; stored separately so `Drop` runs
    /// only when the last clone goes away (see `TcpStream::imp`).
    imp: ListenerImp,
    /// Lazily-initialized registration with a `Poll` instance.
    registration: Mutex<Option<poll::Registration>>,
}
34 
#[derive(Clone)]
struct StreamImp {
    /// A stable address and synchronized access for all internals. This serves
    /// to ensure that all `Overlapped` pointers are valid for a long period of
    /// time as well as allowing completion callbacks to have access to the
    /// internals without having ownership.
    ///
    /// Note that the reference count also allows us to "loan out" copies to
    /// completion ports while I/O is running to guarantee that this stays alive
    /// until the I/O completes. You'll notice a number of calls to
    /// `mem::forget` below, and these only happen on successful scheduling of
    /// I/O and are paired with `overlapped2arc!` macro invocations in the
    /// completion callbacks (to have a decrement match the increment).
    inner: FromRawArc<StreamIo>,
}
50 
/// Reference-counted handle to `ListenerIo`. See the comments on
/// `StreamImp::inner` for the rationale behind this layout and the
/// `mem::forget`/`overlapped2arc!` pairing used while I/O is in flight.
#[derive(Clone)]
struct ListenerImp {
    inner: FromRawArc<ListenerIo>,
}
55 
/// Heap-pinned per-stream state shared with the completion port.
struct StreamIo {
    /// Mutable state, guarded because completion callbacks may run on a
    /// different thread than user calls.
    inner: Mutex<StreamInner>,
    read: Overlapped, // also used for connect
    write: Overlapped,
    socket: net::TcpStream,
}
62 
/// Heap-pinned per-listener state shared with the completion port.
struct ListenerIo {
    /// Mutable state, guarded for the same reason as `StreamIo::inner`.
    inner: Mutex<ListenerInner>,
    /// Overlapped state for the in-flight accept operation.
    accept: Overlapped,
    /// Address family of `socket`; used to pre-create a matching socket for
    /// the overlapped accept (see `ListenerImp::schedule_accept`).
    family: Family,
    socket: net::TcpListener,
}
69 
struct StreamInner {
    /// Binding to the selector plus our current readiness set.
    iocp: ReadyBinding,
    /// A connect requested before registration; issued when `register` runs.
    deferred_connect: Option<SocketAddr>,
    /// State of the in-flight 0-byte read (the slot is also reused to stash
    /// connect errors; see `read_done`).
    read: State<(), ()>,
    /// State of the in-flight write: the buffer plus offset written so far.
    write: State<(Vec<u8>, usize), (Vec<u8>, usize)>,
    /// whether we are instantly notified of success
    /// (FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
    ///  without a roundtrip through the event loop)
    instant_notify: bool,
}
80 
struct ListenerInner {
    /// Binding to the selector plus our current readiness set.
    iocp: ReadyBinding,
    /// Pending: the pre-created socket handed to the overlapped accept.
    /// Ready: the accepted stream along with its peer address.
    accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>,
    /// Buffer the overlapped accept writes local/remote addresses into.
    accept_buf: AcceptAddrsBuf,
}
86 
/// Lifecycle of a single overlapped I/O operation.
enum State<T, U> {
    Empty,              // no I/O operation in progress
    Pending(T),         // an I/O operation is in progress
    Ready(U),           // I/O has finished with this value
    Error(io::Error),   // there was an I/O error
}
93 
impl TcpStream {
    /// Wraps a raw std socket, optionally remembering an address to connect
    /// to once the stream is registered with a `Poll`.
    fn new(socket: net::TcpStream,
           deferred_connect: Option<SocketAddr>) -> TcpStream {
        TcpStream {
            registration: Mutex::new(None),
            imp: StreamImp {
                inner: FromRawArc::new(StreamIo {
                    read: Overlapped::new(read_done),
                    write: Overlapped::new(write_done),
                    socket: socket,
                    inner: Mutex::new(StreamInner {
                        iocp: ReadyBinding::new(),
                        deferred_connect: deferred_connect,
                        read: State::Empty,
                        write: State::Empty,
                        instant_notify: false,
                    }),
                }),
            },
        }
    }

    /// Creates a stream that will connect to `addr` lazily: the overlapped
    /// connect is actually issued during `register` (see `Evented::register`).
    pub fn connect(socket: net::TcpStream, addr: &SocketAddr)
                   -> io::Result<TcpStream> {
        socket.set_nonblocking(true)?;
        Ok(TcpStream::new(socket, Some(*addr)))
    }

    /// Wraps an already-connected std stream.
    pub fn from_stream(stream: net::TcpStream) -> TcpStream {
        TcpStream::new(stream, None)
    }

    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        self.imp.inner.socket.peer_addr()
    }

    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.imp.inner.socket.local_addr()
    }

    /// Duplicates the underlying socket handle; the clone starts with no
    /// deferred connect and its own (unregistered) IOCP state.
    pub fn try_clone(&self) -> io::Result<TcpStream> {
        self.imp.inner.socket.try_clone().map(|s| TcpStream::new(s, None))
    }

    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        self.imp.inner.socket.shutdown(how)
    }

    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
        self.imp.inner.socket.set_nodelay(nodelay)
    }

    pub fn nodelay(&self) -> io::Result<bool> {
        self.imp.inner.socket.nodelay()
    }

    pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
        self.imp.inner.socket.set_recv_buffer_size(size)
    }

    pub fn recv_buffer_size(&self) -> io::Result<usize> {
        self.imp.inner.socket.recv_buffer_size()
    }

    pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
        self.imp.inner.socket.set_send_buffer_size(size)
    }

    pub fn send_buffer_size(&self) -> io::Result<usize> {
        self.imp.inner.socket.send_buffer_size()
    }

    pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
        self.imp.inner.socket.set_keepalive(keepalive)
    }

    pub fn keepalive(&self) -> io::Result<Option<Duration>> {
        self.imp.inner.socket.keepalive()
    }

    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        self.imp.inner.socket.set_ttl(ttl)
    }

    pub fn ttl(&self) -> io::Result<u32> {
        self.imp.inner.socket.ttl()
    }

    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
        self.imp.inner.socket.set_only_v6(only_v6)
    }

    pub fn only_v6(&self) -> io::Result<bool> {
        self.imp.inner.socket.only_v6()
    }

    pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
        self.imp.inner.socket.set_linger(dur)
    }

    pub fn linger(&self) -> io::Result<Option<Duration>> {
        self.imp.inner.socket.linger()
    }

    /// Returns a pending socket error, checking both the OS-level
    /// `SO_ERROR`-style state and any error stashed by a failed connect.
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        if let Some(e) = self.imp.inner.socket.take_error()? {
            return Ok(Some(e))
        }

        // If the syscall didn't return anything then also check to see if we've
        // squirreled away an error elsewhere for example as part of a connect
        // operation.
        //
        // Typically this is used like so:
        //
        // 1. A `connect` is issued
        // 2. Wait for the socket to be writable
        // 3. Call `take_error` to see if the connect succeeded.
        //
        // Right now the `connect` operation finishes in `read_done` below and
        // will fill in `State::Error` in the `read` slot if it fails, so we
        // extract that here.
        let mut me = self.inner();
        match mem::replace(&mut me.read, State::Empty) {
            State::Error(e) => {
                // Taking the error consumed the read slot; re-arm the 0-byte
                // read so future readiness notifications keep flowing.
                self.imp.schedule_read(&mut me);
                Ok(Some(e))
            }
            other => {
                me.read = other;
                Ok(None)
            }
        }
    }

    /// Locks and returns the shared mutable state.
    fn inner(&self) -> MutexGuard<StreamInner> {
        self.imp.inner()
    }

    /// Common precondition check for `read`/`peek`/`readv`: returns the locked
    /// state only when a previous 0-byte read has completed (i.e. the OS
    /// buffer has data); otherwise surfaces `WouldBlock` or a stashed error.
    fn before_read(&self) -> io::Result<MutexGuard<StreamInner>> {
        let mut me = self.inner();

        match me.read {
            // Empty == we're not associated yet, and if we're pending then
            // these are both cases where we return "would block"
            State::Empty |
            State::Pending(()) => return Err(io::ErrorKind::WouldBlock.into()),

            // If we got a delayed error as part of a `read_overlapped` below,
            // return that here. Also schedule another read in case it was
            // transient.
            State::Error(_) => {
                let e = match mem::replace(&mut me.read, State::Empty) {
                    State::Error(e) => e,
                    _ => panic!(),
                };
                self.imp.schedule_read(&mut me);
                return Err(e)
            }

            // If we're ready for a read then some previous 0-byte read has
            // completed. In that case the OS's socket buffer has something for
            // us, so we just keep pulling out bytes while we can in the loop
            // below.
            State::Ready(()) => {}
        }

        Ok(me)
    }

    /// Kicks off the initial I/O after (re)registration, mirroring the
    /// level-triggered behavior callers expect from epoll-like selectors.
    fn post_register(&self, interest: Ready, me: &mut StreamInner) {
        if interest.is_readable() {
            self.imp.schedule_read(me);
        }

        // At least with epoll, if a socket is registered with an interest in
        // writing and it's immediately writable then a writable event is
        // generated immediately, so do so here.
        if interest.is_writable() {
            if let State::Empty = me.write {
                self.imp.add_readiness(me, Ready::writable());
            }
        }
    }

    /// Reads into `buf`; an empty buffer short-circuits to `Ok(0)` since it
    /// cannot be represented as an `IoVec`.
    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        match IoVec::from_bytes_mut(buf) {
            Some(vec) => self.readv(&mut [vec]),
            None => Ok(0),
        }
    }

    /// Peeks at available data without consuming it; on error the 0-byte read
    /// is re-armed so readiness notifications continue.
    pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
        let mut me = self.before_read()?;

        match (&self.imp.inner.socket).peek(buf) {
            Ok(n) => Ok(n),
            Err(e) => {
                me.read = State::Empty;
                self.imp.schedule_read(&mut me);
                Err(e)
            }
        }
    }

    /// Vectored read: drains the OS buffer into `bufs` via nonblocking `recv`
    /// calls, one buffer at a time.
    pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
        let mut me = self.before_read()?;

        // TODO: Does WSARecv work on a nonblocking sockets? We ideally want to
        //       call that instead of looping over all the buffers and calling
        //       `recv` on each buffer. I'm not sure though if an overlapped
        //       socket in nonblocking mode would work with that use case,
        //       however, so for now we just call `recv`.

        let mut amt = 0;
        for buf in bufs {
            match (&self.imp.inner.socket).read(buf) {
                // If we did a partial read, then return what we've read so far
                Ok(n) if n < buf.len() => return Ok(amt + n),

                // Otherwise filled this buffer entirely, so try to fill the
                // next one as well.
                Ok(n) => amt += n,

                // If we hit an error then things get tricky if we've already
                // read some data. If the error is "would block" then we just
                // return the data we've read so far while scheduling another
                // 0-byte read.
                //
                // If we've read data and the error kind is not "would block",
                // then we stash away the error to get returned later and return
                // the data that we've read.
                //
                // Finally if we haven't actually read any data we just
                // reschedule a 0-byte read to happen again and then return the
                // error upwards.
                Err(e) => {
                    if amt > 0 && e.kind() == io::ErrorKind::WouldBlock {
                        me.read = State::Empty;
                        self.imp.schedule_read(&mut me);
                        return Ok(amt)
                    } else if amt > 0 {
                        me.read = State::Error(e);
                        return Ok(amt)
                    } else {
                        me.read = State::Empty;
                        self.imp.schedule_read(&mut me);
                        return Err(e)
                    }
                }
            }
        }

        Ok(amt)
    }

    /// Writes `buf`; an empty buffer short-circuits to `Ok(0)` since it
    /// cannot be represented as an `IoVec`.
    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
        match IoVec::from_bytes(buf) {
            Some(vec) => self.writev(&[vec]),
            None => Ok(0),
        }
    }

    /// Vectored write. The data is copied into an intermediate buffer and the
    /// entire length is reported as written immediately; the actual overlapped
    /// write proceeds in the background (see `StreamImp::schedule_write`).
    /// Returns `WouldBlock` while a previous write is still in flight or if
    /// the stream isn't registered yet.
    pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
        let mut me = self.inner();
        let me = &mut *me;

        match mem::replace(&mut me.write, State::Empty) {
            State::Empty => {}
            State::Error(e) => return Err(e),
            other => {
                me.write = other;
                return Err(io::ErrorKind::WouldBlock.into())
            }
        }

        if !me.iocp.registered() {
            return Err(io::ErrorKind::WouldBlock.into())
        }

        if bufs.is_empty() {
            return Ok(0)
        }

        let len = bufs.iter().map(|b| b.len()).fold(0, |a, b| a + b);
        let mut intermediate = me.iocp.get_buffer(len);
        for buf in bufs {
            intermediate.extend_from_slice(buf);
        }
        self.imp.schedule_write(intermediate, 0, me);
        Ok(len)
    }

    /// No-op: writes are flushed by the background overlapped operation.
    pub fn flush(&self) -> io::Result<()> {
        Ok(())
    }
}
391 
impl StreamImp {
    /// Locks and returns the shared mutable state.
    fn inner(&self) -> MutexGuard<StreamInner> {
        self.inner.inner.lock().unwrap()
    }

    /// Issues an overlapped connect; completion is delivered to `read_done`
    /// (the `read` overlapped slot doubles as the connect slot).
    fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
        unsafe {
            trace!("scheduling a connect");
            self.inner.socket.connect_overlapped(addr, &[], self.inner.read.as_mut_ptr())?;
        }
        // see docs above on StreamImp.inner for rationale on forget
        mem::forget(self.clone());
        Ok(())
    }

    /// Schedule a read to happen on this socket, enqueuing us to receive a
    /// notification when a read is ready.
    ///
    /// Note that this does *not* work with a buffer. When reading a TCP stream
    /// we actually read into a 0-byte buffer so Windows will send us a
    /// notification when the socket is otherwise ready for reading. This allows
    /// us to avoid buffer allocations for in-flight reads.
    fn schedule_read(&self, me: &mut StreamInner) {
        match me.read {
            // Nothing in flight: go ahead and schedule.
            State::Empty => {}
            // A result is already waiting; just re-surface readiness instead
            // of issuing another read.
            State::Ready(_) | State::Error(_) => {
                self.add_readiness(me, Ready::readable());
                return;
            }
            // Already pending: nothing to do.
            _ => return,
        }

        // Clear any level-triggered readable readiness before re-arming.
        me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());

        trace!("scheduling a read");
        let res = unsafe {
            self.inner.socket.read_overlapped(&mut [], self.inner.read.as_mut_ptr())
        };
        match res {
            // Note that `Ok(true)` means that this completed immediately and
            // our socket is readable. This typically means that the caller of
            // this function (likely `read` above) can try again as an
            // optimization and return bytes quickly.
            //
            // Normally, though, although the read completed immediately
            // there's still an IOCP completion packet enqueued that we're going
            // to receive.
            //
            // You can configure this behavior (miow) with
            // SetFileCompletionNotificationModes to indicate that `Ok(true)`
            // does **not** enqueue a completion packet. (This is the case
            // for me.instant_notify)
            //
            // Note that apparently libuv has scary code to work around bugs in
            // `WSARecv` for UDP sockets apparently for handles which have had
            // the `SetFileCompletionNotificationModes` function called on them,
            // worth looking into!
            Ok(Some(_)) if me.instant_notify => {
                me.read = State::Ready(());
                self.add_readiness(me, Ready::readable());
            }
            Ok(_) => {
                // see docs above on StreamImp.inner for rationale on forget
                me.read = State::Pending(());
                mem::forget(self.clone());
            }
            Err(e) => {
                me.read = State::Error(e);
                self.add_readiness(me, Ready::readable());
            }
        }
    }

    /// Similar to `schedule_read`, except that this issues, well, writes.
    ///
    /// This function will continually attempt to write the entire contents of
    /// the buffer `buf` until they have all been written. The `pos` argument is
    /// the current offset within the buffer up to which the contents have
    /// already been written.
    ///
    /// A new writable event (e.g. allowing another write) will only happen once
    /// the buffer has been written completely (or hit an error).
    fn schedule_write(&self,
                      buf: Vec<u8>,
                      mut pos: usize,
                      me: &mut StreamInner) {

        // About to write, clear any pending level triggered events
        me.iocp.set_readiness(me.iocp.readiness() - Ready::writable());

        loop {
            trace!("scheduling a write of {} bytes", buf[pos..].len());
            let ret = unsafe {
                self.inner.socket.write_overlapped(&buf[pos..], self.inner.write.as_mut_ptr())
            };
            match ret {
                // Instant completion without a queued packet: either the whole
                // remainder went out (become writable again) or keep looping
                // on the rest of the buffer.
                Ok(Some(transferred_bytes)) if me.instant_notify => {
                    trace!("done immediately with {} bytes", transferred_bytes);
                    if transferred_bytes == buf.len() - pos {
                        self.add_readiness(me, Ready::writable());
                        me.write = State::Empty;
                        break;
                    }
                    pos += transferred_bytes;
                }
                Ok(_) => {
                    trace!("scheduled for later");
                    // see docs above on StreamImp.inner for rationale on forget
                    me.write = State::Pending((buf, pos));
                    mem::forget(self.clone());
                    break;
                }
                Err(e) => {
                    trace!("write error: {}", e);
                    me.write = State::Error(e);
                    self.add_readiness(me, Ready::writable());
                    me.iocp.put_buffer(buf);
                    break;
                }
            }
        }
    }

    /// Pushes an event for this socket onto the selector it's registered for.
    ///
    /// When an event is generated on this socket, if it happened after the
    /// socket was closed then we don't want to actually push the event onto our
    /// selector as otherwise it's just a spurious notification.
    fn add_readiness(&self, me: &mut StreamInner, set: Ready) {
        me.iocp.set_readiness(set | me.iocp.readiness());
    }
}
524 
read_done(status: &OVERLAPPED_ENTRY)525 fn read_done(status: &OVERLAPPED_ENTRY) {
526     let status = CompletionStatus::from_entry(status);
527     let me2 = StreamImp {
528         inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
529     };
530 
531     let mut me = me2.inner();
532     match mem::replace(&mut me.read, State::Empty) {
533         State::Pending(()) => {
534             trace!("finished a read: {}", status.bytes_transferred());
535             assert_eq!(status.bytes_transferred(), 0);
536             me.read = State::Ready(());
537             return me2.add_readiness(&mut me, Ready::readable())
538         }
539         s => me.read = s,
540     }
541 
542     // If a read didn't complete, then the connect must have just finished.
543     trace!("finished a connect");
544 
545     // By guarding with socket.result(), we ensure that a connection
546     // was successfully made before performing operations requiring a
547     // connected socket.
548     match unsafe { me2.inner.socket.result(status.overlapped()) }
549         .and_then(|_| me2.inner.socket.connect_complete())
550     {
551         Ok(()) => {
552             me2.add_readiness(&mut me, Ready::writable());
553             me2.schedule_read(&mut me);
554         }
555         Err(e) => {
556             me2.add_readiness(&mut me, Ready::readable() | Ready::writable());
557             me.read = State::Error(e);
558         }
559     }
560 }
561 
write_done(status: &OVERLAPPED_ENTRY)562 fn write_done(status: &OVERLAPPED_ENTRY) {
563     let status = CompletionStatus::from_entry(status);
564     trace!("finished a write {}", status.bytes_transferred());
565     let me2 = StreamImp {
566         inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
567     };
568     let mut me = me2.inner();
569     let (buf, pos) = match mem::replace(&mut me.write, State::Empty) {
570         State::Pending(pair) => pair,
571         _ => unreachable!(),
572     };
573     let new_pos = pos + (status.bytes_transferred() as usize);
574     if new_pos == buf.len() {
575         me2.add_readiness(&mut me, Ready::writable());
576     } else {
577         me2.schedule_write(buf, new_pos, &mut me);
578     }
579 }
580 
impl Evented for TcpStream {
    /// Associates the socket with the poll's completion port, enables
    /// skip-on-instant-completion, and issues any deferred connect.
    fn register(&self, poll: &Poll, token: Token,
                interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.register_socket(&self.imp.inner.socket, poll, token,
                                     interest, opts, &self.registration)?;

        unsafe {
            // Ask the OS to skip completion packets for operations that finish
            // inline; `instant_notify` records that `Ok(Some(..))` results now
            // mean no packet will arrive.
            super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
            me.instant_notify = true;
        }

        // If we were connected before being registered process that request
        // here and go along our merry ways. Note that the callback for a
        // successful connect will worry about generating writable/readable
        // events and scheduling a new read.
        if let Some(addr) = me.deferred_connect.take() {
            return self.imp.schedule_connect(&addr).map(|_| ())
        }
        self.post_register(interest, &mut me);
        Ok(())
    }

    /// Updates token/interest/options and re-arms I/O per the new interest.
    fn reregister(&self, poll: &Poll, token: Token,
                  interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
                                       interest, opts, &self.registration)?;
        self.post_register(interest, &mut me);
        Ok(())
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner().iocp.deregister(&self.imp.inner.socket,
                                     poll, &self.registration)
    }
}
618 
619 impl fmt::Debug for TcpStream {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result620     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
621         f.debug_struct("TcpStream")
622             .finish()
623     }
624 }
625 
impl Drop for TcpStream {
    /// Cancels any in-flight read/connect so the completion port doesn't
    /// write into freed memory after the stream goes away.
    fn drop(&mut self) {
        // If we're still internally reading, we're no longer interested. Note
        // though that we don't cancel any writes which may have been issued to
        // preserve the same semantics as Unix.
        //
        // Note that "Empty" here may mean that a connect is pending, so we
        // cancel even if that happens as well.
        unsafe {
            match self.inner().read {
                State::Pending(_) | State::Empty => {
                    trace!("cancelling active TCP read");
                    // Best-effort: errors from cancellation are ignored.
                    drop(super::cancel(&self.imp.inner.socket,
                                       &self.imp.inner.read));
                }
                State::Ready(_) | State::Error(_) => {}
            }
        }
    }
}
646 
impl TcpListener {
    /// Wraps a std listener, detecting its address family from the bound
    /// local address (needed later to pre-create accept sockets).
    pub fn new(socket: net::TcpListener)
               -> io::Result<TcpListener> {
        let addr = socket.local_addr()?;
        Ok(TcpListener::new_family(socket, match addr {
            SocketAddr::V4(..) => Family::V4,
            SocketAddr::V6(..) => Family::V6,
        }))
    }

    /// Builds the listener around a known address family.
    fn new_family(socket: net::TcpListener, family: Family) -> TcpListener {
        TcpListener {
            registration: Mutex::new(None),
            imp: ListenerImp {
                inner: FromRawArc::new(ListenerIo {
                    accept: Overlapped::new(accept_done),
                    family: family,
                    socket: socket,
                    inner: Mutex::new(ListenerInner {
                        iocp: ReadyBinding::new(),
                        accept: State::Empty,
                        accept_buf: AcceptAddrsBuf::new(),
                    }),
                }),
            },
        }
    }

    /// Returns a completed accept (stream + peer address) if one is ready,
    /// otherwise `WouldBlock`. Always re-arms the next overlapped accept
    /// before returning a result.
    pub fn accept(&self) -> io::Result<(net::TcpStream, SocketAddr)> {
        let mut me = self.inner();

        let ret = match mem::replace(&mut me.accept, State::Empty) {
            // Not armed yet (e.g. not registered): nothing to hand out.
            State::Empty => return Err(io::ErrorKind::WouldBlock.into()),
            // Still in flight: restore the state and report "not ready".
            State::Pending(t) => {
                me.accept = State::Pending(t);
                return Err(io::ErrorKind::WouldBlock.into());
            }
            State::Ready((s, a)) => Ok((s, a)),
            State::Error(e) => Err(e),
        };

        self.imp.schedule_accept(&mut me);

        return ret
    }

    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.imp.inner.socket.local_addr()
    }

    /// Duplicates the listening socket; the clone keeps the same address
    /// family but its own (unregistered) IOCP state.
    pub fn try_clone(&self) -> io::Result<TcpListener> {
        self.imp.inner.socket.try_clone().map(|s| {
            TcpListener::new_family(s, self.imp.inner.family)
        })
    }

    #[allow(deprecated)]
    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
        self.imp.inner.socket.set_only_v6(only_v6)
    }

    #[allow(deprecated)]
    pub fn only_v6(&self) -> io::Result<bool> {
        self.imp.inner.socket.only_v6()
    }

    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        self.imp.inner.socket.set_ttl(ttl)
    }

    pub fn ttl(&self) -> io::Result<u32> {
        self.imp.inner.socket.ttl()
    }

    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.imp.inner.socket.take_error()
    }

    /// Locks and returns the shared mutable state.
    fn inner(&self) -> MutexGuard<ListenerInner> {
        self.imp.inner()
    }
}
729 
impl ListenerImp {
    /// Locks and returns the shared mutable state.
    fn inner(&self) -> MutexGuard<ListenerInner> {
        self.inner.inner.lock().unwrap()
    }

    /// Arms the next overlapped accept if none is in flight. Pre-creates an
    /// unbound socket of the matching family because the overlapped accept
    /// API requires the accepted socket up front.
    fn schedule_accept(&self, me: &mut ListenerInner) {
        match me.accept {
            State::Empty => {}
            _ => return
        }

        // Clear any level-triggered readable readiness before re-arming.
        me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());

        let res = match self.inner.family {
            Family::V4 => TcpBuilder::new_v4(),
            Family::V6 => TcpBuilder::new_v6(),
        }
        .and_then(|builder| builder.to_tcp_stream())
        .and_then(|stream| unsafe {
            trace!("scheduling an accept");
            self.inner
                .socket
                .accept_overlapped(&stream, &mut me.accept_buf, self.inner.accept.as_mut_ptr())
                .map(|x| (stream, x))
        });
        match res {
            Ok((socket, _)) => {
                // see docs above on StreamImp.inner for rationale on forget
                me.accept = State::Pending(socket);
                mem::forget(self.clone());
            }
            Err(e) => {
                // Surface the failure as a readable event; `accept` will
                // return the stashed error.
                me.accept = State::Error(e);
                self.add_readiness(me, Ready::readable());
            }
        }
    }

    // See comments in StreamImp::push
    fn add_readiness(&self, me: &mut ListenerInner, set: Ready) {
        me.iocp.set_readiness(set | me.iocp.readiness());
    }
}
773 
accept_done(status: &OVERLAPPED_ENTRY)774 fn accept_done(status: &OVERLAPPED_ENTRY) {
775     let status = CompletionStatus::from_entry(status);
776     let me2 = ListenerImp {
777         inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
778     };
779 
780     let mut me = me2.inner();
781     let socket = match mem::replace(&mut me.accept, State::Empty) {
782         State::Pending(s) => s,
783         _ => unreachable!(),
784     };
785     trace!("finished an accept");
786     let result = me2.inner.socket.accept_complete(&socket).and_then(|()| {
787         me.accept_buf.parse(&me2.inner.socket)
788     }).and_then(|buf| {
789         buf.remote().ok_or_else(|| {
790             io::Error::new(ErrorKind::Other, "could not obtain remote address")
791         })
792     });
793     me.accept = match result {
794         Ok(remote_addr) => State::Ready((socket, remote_addr)),
795         Err(e) => State::Error(e),
796     };
797     me2.add_readiness(&mut me, Ready::readable());
798 }
799 
impl Evented for TcpListener {
    /// Associates the listener with the poll's completion port, enables
    /// skip-on-instant-completion, and arms the first overlapped accept.
    fn register(&self, poll: &Poll, token: Token,
                interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.register_socket(&self.imp.inner.socket, poll, token,
                                     interest, opts, &self.registration)?;

        unsafe {
            // Skip completion packets for inline completions (see the stream
            // `register` impl).
            super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
        }

        self.imp.schedule_accept(&mut me);
        Ok(())
    }

    /// Updates token/interest/options and re-arms the accept.
    fn reregister(&self, poll: &Poll, token: Token,
                  interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
                                       interest, opts, &self.registration)?;
        self.imp.schedule_accept(&mut me);
        Ok(())
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner().iocp.deregister(&self.imp.inner.socket,
                                     poll, &self.registration)
    }
}
829 
830 impl fmt::Debug for TcpListener {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result831     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
832         f.debug_struct("TcpListener")
833             .finish()
834     }
835 }
836 
impl Drop for TcpListener {
    /// Cancels any in-flight accept so the completion port doesn't touch
    /// freed state after the listener goes away.
    fn drop(&mut self) {
        // If we're still internally reading, we're no longer interested.
        unsafe {
            match self.inner().accept {
                State::Pending(_) => {
                    trace!("cancelling active TCP accept");
                    // Best-effort: errors from cancellation are ignored.
                    drop(super::cancel(&self.imp.inner.socket,
                                       &self.imp.inner.accept));
                }
                State::Empty |
                State::Ready(_) |
                State::Error(_) => {}
            }
        }
    }
}
854