1 use std::fmt;
2 use std::io::{self, Read, ErrorKind};
3 use std::mem;
4 use std::net::{self, SocketAddr, Shutdown};
5 use std::os::windows::prelude::*;
6 use std::sync::{Mutex, MutexGuard};
7 use std::time::Duration;
8
9 use miow::iocp::CompletionStatus;
10 use miow::net::*;
11 use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt};
12 use winapi::um::minwinbase::OVERLAPPED_ENTRY;
13 use winapi::um::winnt::HANDLE;
14 use iovec::IoVec;
15
16 use {poll, Ready, Poll, PollOpt, Token};
17 use event::Evented;
18 use sys::windows::from_raw_arc::FromRawArc;
19 use sys::windows::selector::{Overlapped, ReadyBinding};
20 use sys::windows::Family;
21
22 pub struct TcpStream {
23 /// Separately stored implementation to ensure that the `Drop`
24 /// implementation on this type is only executed when it's actually dropped
25 /// (many clones of this `imp` are made).
26 imp: StreamImp,
27 registration: Mutex<Option<poll::Registration>>,
28 }
29
30 pub struct TcpListener {
31 imp: ListenerImp,
32 registration: Mutex<Option<poll::Registration>>,
33 }
34
35 #[derive(Clone)]
36 struct StreamImp {
37 /// A stable address and synchronized access for all internals. This serves
38 /// to ensure that all `Overlapped` pointers are valid for a long period of
39 /// time as well as allowing completion callbacks to have access to the
40 /// internals without having ownership.
41 ///
42 /// Note that the reference count also allows us "loan out" copies to
43 /// completion ports while I/O is running to guarantee that this stays alive
44 /// until the I/O completes. You'll notice a number of calls to
45 /// `mem::forget` below, and these only happen on successful scheduling of
46 /// I/O and are paired with `overlapped2arc!` macro invocations in the
47 /// completion callbacks (to have a decrement match the increment).
48 inner: FromRawArc<StreamIo>,
49 }
50
51 #[derive(Clone)]
52 struct ListenerImp {
53 inner: FromRawArc<ListenerIo>,
54 }
55
56 struct StreamIo {
57 inner: Mutex<StreamInner>,
58 read: Overlapped, // also used for connect
59 write: Overlapped,
60 socket: net::TcpStream,
61 }
62
63 struct ListenerIo {
64 inner: Mutex<ListenerInner>,
65 accept: Overlapped,
66 family: Family,
67 socket: net::TcpListener,
68 }
69
70 struct StreamInner {
71 iocp: ReadyBinding,
72 deferred_connect: Option<SocketAddr>,
73 read: State<(), ()>,
74 write: State<(Vec<u8>, usize), (Vec<u8>, usize)>,
75 /// whether we are instantly notified of success
76 /// (FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
77 /// without a roundtrip through the event loop)
78 instant_notify: bool,
79 }
80
81 struct ListenerInner {
82 iocp: ReadyBinding,
83 accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>,
84 accept_buf: AcceptAddrsBuf,
85 }
86
87 enum State<T, U> {
88 Empty, // no I/O operation in progress
89 Pending(T), // an I/O operation is in progress
90 Ready(U), // I/O has finished with this value
91 Error(io::Error), // there was an I/O error
92 }
93
94 impl TcpStream {
new(socket: net::TcpStream, deferred_connect: Option<SocketAddr>) -> TcpStream95 fn new(socket: net::TcpStream,
96 deferred_connect: Option<SocketAddr>) -> TcpStream {
97 TcpStream {
98 registration: Mutex::new(None),
99 imp: StreamImp {
100 inner: FromRawArc::new(StreamIo {
101 read: Overlapped::new(read_done),
102 write: Overlapped::new(write_done),
103 socket: socket,
104 inner: Mutex::new(StreamInner {
105 iocp: ReadyBinding::new(),
106 deferred_connect: deferred_connect,
107 read: State::Empty,
108 write: State::Empty,
109 instant_notify: false,
110 }),
111 }),
112 },
113 }
114 }
115
connect(socket: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream>116 pub fn connect(socket: net::TcpStream, addr: &SocketAddr)
117 -> io::Result<TcpStream> {
118 socket.set_nonblocking(true)?;
119 Ok(TcpStream::new(socket, Some(*addr)))
120 }
121
from_stream(stream: net::TcpStream) -> TcpStream122 pub fn from_stream(stream: net::TcpStream) -> TcpStream {
123 TcpStream::new(stream, None)
124 }
125
peer_addr(&self) -> io::Result<SocketAddr>126 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
127 self.imp.inner.socket.peer_addr()
128 }
129
local_addr(&self) -> io::Result<SocketAddr>130 pub fn local_addr(&self) -> io::Result<SocketAddr> {
131 self.imp.inner.socket.local_addr()
132 }
133
try_clone(&self) -> io::Result<TcpStream>134 pub fn try_clone(&self) -> io::Result<TcpStream> {
135 self.imp.inner.socket.try_clone().map(|s| TcpStream::new(s, None))
136 }
137
shutdown(&self, how: Shutdown) -> io::Result<()>138 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
139 self.imp.inner.socket.shutdown(how)
140 }
141
set_nodelay(&self, nodelay: bool) -> io::Result<()>142 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
143 self.imp.inner.socket.set_nodelay(nodelay)
144 }
145
nodelay(&self) -> io::Result<bool>146 pub fn nodelay(&self) -> io::Result<bool> {
147 self.imp.inner.socket.nodelay()
148 }
149
set_recv_buffer_size(&self, size: usize) -> io::Result<()>150 pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
151 self.imp.inner.socket.set_recv_buffer_size(size)
152 }
153
recv_buffer_size(&self) -> io::Result<usize>154 pub fn recv_buffer_size(&self) -> io::Result<usize> {
155 self.imp.inner.socket.recv_buffer_size()
156 }
157
set_send_buffer_size(&self, size: usize) -> io::Result<()>158 pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
159 self.imp.inner.socket.set_send_buffer_size(size)
160 }
161
send_buffer_size(&self) -> io::Result<usize>162 pub fn send_buffer_size(&self) -> io::Result<usize> {
163 self.imp.inner.socket.send_buffer_size()
164 }
165
set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()>166 pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
167 self.imp.inner.socket.set_keepalive(keepalive)
168 }
169
keepalive(&self) -> io::Result<Option<Duration>>170 pub fn keepalive(&self) -> io::Result<Option<Duration>> {
171 self.imp.inner.socket.keepalive()
172 }
173
set_ttl(&self, ttl: u32) -> io::Result<()>174 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
175 self.imp.inner.socket.set_ttl(ttl)
176 }
177
ttl(&self) -> io::Result<u32>178 pub fn ttl(&self) -> io::Result<u32> {
179 self.imp.inner.socket.ttl()
180 }
181
set_only_v6(&self, only_v6: bool) -> io::Result<()>182 pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
183 self.imp.inner.socket.set_only_v6(only_v6)
184 }
185
only_v6(&self) -> io::Result<bool>186 pub fn only_v6(&self) -> io::Result<bool> {
187 self.imp.inner.socket.only_v6()
188 }
189
set_linger(&self, dur: Option<Duration>) -> io::Result<()>190 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
191 self.imp.inner.socket.set_linger(dur)
192 }
193
linger(&self) -> io::Result<Option<Duration>>194 pub fn linger(&self) -> io::Result<Option<Duration>> {
195 self.imp.inner.socket.linger()
196 }
197
take_error(&self) -> io::Result<Option<io::Error>>198 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
199 if let Some(e) = self.imp.inner.socket.take_error()? {
200 return Ok(Some(e))
201 }
202
203 // If the syscall didn't return anything then also check to see if we've
204 // squirreled away an error elsewhere for example as part of a connect
205 // operation.
206 //
207 // Typically this is used like so:
208 //
209 // 1. A `connect` is issued
210 // 2. Wait for the socket to be writable
211 // 3. Call `take_error` to see if the connect succeeded.
212 //
213 // Right now the `connect` operation finishes in `read_done` below and
214 // fill will in `State::Error` in the `read` slot if it fails, so we
215 // extract that here.
216 let mut me = self.inner();
217 match mem::replace(&mut me.read, State::Empty) {
218 State::Error(e) => {
219 self.imp.schedule_read(&mut me);
220 Ok(Some(e))
221 }
222 other => {
223 me.read = other;
224 Ok(None)
225 }
226 }
227 }
228
inner(&self) -> MutexGuard<StreamInner>229 fn inner(&self) -> MutexGuard<StreamInner> {
230 self.imp.inner()
231 }
232
before_read(&self) -> io::Result<MutexGuard<StreamInner>>233 fn before_read(&self) -> io::Result<MutexGuard<StreamInner>> {
234 let mut me = self.inner();
235
236 match me.read {
237 // Empty == we're not associated yet, and if we're pending then
238 // these are both cases where we return "would block"
239 State::Empty |
240 State::Pending(()) => return Err(io::ErrorKind::WouldBlock.into()),
241
242 // If we got a delayed error as part of a `read_overlapped` below,
243 // return that here. Also schedule another read in case it was
244 // transient.
245 State::Error(_) => {
246 let e = match mem::replace(&mut me.read, State::Empty) {
247 State::Error(e) => e,
248 _ => panic!(),
249 };
250 self.imp.schedule_read(&mut me);
251 return Err(e)
252 }
253
254 // If we're ready for a read then some previous 0-byte read has
255 // completed. In that case the OS's socket buffer has something for
256 // us, so we just keep pulling out bytes while we can in the loop
257 // below.
258 State::Ready(()) => {}
259 }
260
261 Ok(me)
262 }
263
post_register(&self, interest: Ready, me: &mut StreamInner)264 fn post_register(&self, interest: Ready, me: &mut StreamInner) {
265 if interest.is_readable() {
266 self.imp.schedule_read(me);
267 }
268
269 // At least with epoll, if a socket is registered with an interest in
270 // writing and it's immediately writable then a writable event is
271 // generated immediately, so do so here.
272 if interest.is_writable() {
273 if let State::Empty = me.write {
274 self.imp.add_readiness(me, Ready::writable());
275 }
276 }
277 }
278
read(&self, buf: &mut [u8]) -> io::Result<usize>279 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
280 match IoVec::from_bytes_mut(buf) {
281 Some(vec) => self.readv(&mut [vec]),
282 None => Ok(0),
283 }
284 }
285
peek(&self, buf: &mut [u8]) -> io::Result<usize>286 pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
287 let mut me = self.before_read()?;
288
289 match (&self.imp.inner.socket).peek(buf) {
290 Ok(n) => Ok(n),
291 Err(e) => {
292 me.read = State::Empty;
293 self.imp.schedule_read(&mut me);
294 Err(e)
295 }
296 }
297 }
298
readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize>299 pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
300 let mut me = self.before_read()?;
301
302 // TODO: Does WSARecv work on a nonblocking sockets? We ideally want to
303 // call that instead of looping over all the buffers and calling
304 // `recv` on each buffer. I'm not sure though if an overlapped
305 // socket in nonblocking mode would work with that use case,
306 // however, so for now we just call `recv`.
307
308 let mut amt = 0;
309 for buf in bufs {
310 match (&self.imp.inner.socket).read(buf) {
311 // If we did a partial read, then return what we've read so far
312 Ok(n) if n < buf.len() => return Ok(amt + n),
313
314 // Otherwise filled this buffer entirely, so try to fill the
315 // next one as well.
316 Ok(n) => amt += n,
317
318 // If we hit an error then things get tricky if we've already
319 // read some data. If the error is "would block" then we just
320 // return the data we've read so far while scheduling another
321 // 0-byte read.
322 //
323 // If we've read data and the error kind is not "would block",
324 // then we stash away the error to get returned later and return
325 // the data that we've read.
326 //
327 // Finally if we haven't actually read any data we just
328 // reschedule a 0-byte read to happen again and then return the
329 // error upwards.
330 Err(e) => {
331 if amt > 0 && e.kind() == io::ErrorKind::WouldBlock {
332 me.read = State::Empty;
333 self.imp.schedule_read(&mut me);
334 return Ok(amt)
335 } else if amt > 0 {
336 me.read = State::Error(e);
337 return Ok(amt)
338 } else {
339 me.read = State::Empty;
340 self.imp.schedule_read(&mut me);
341 return Err(e)
342 }
343 }
344 }
345 }
346
347 Ok(amt)
348 }
349
write(&self, buf: &[u8]) -> io::Result<usize>350 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
351 match IoVec::from_bytes(buf) {
352 Some(vec) => self.writev(&[vec]),
353 None => Ok(0),
354 }
355 }
356
writev(&self, bufs: &[&IoVec]) -> io::Result<usize>357 pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
358 let mut me = self.inner();
359 let me = &mut *me;
360
361 match mem::replace(&mut me.write, State::Empty) {
362 State::Empty => {}
363 State::Error(e) => return Err(e),
364 other => {
365 me.write = other;
366 return Err(io::ErrorKind::WouldBlock.into())
367 }
368 }
369
370 if !me.iocp.registered() {
371 return Err(io::ErrorKind::WouldBlock.into())
372 }
373
374 if bufs.is_empty() {
375 return Ok(0)
376 }
377
378 let len = bufs.iter().map(|b| b.len()).fold(0, |a, b| a + b);
379 let mut intermediate = me.iocp.get_buffer(len);
380 for buf in bufs {
381 intermediate.extend_from_slice(buf);
382 }
383 self.imp.schedule_write(intermediate, 0, me);
384 Ok(len)
385 }
386
flush(&self) -> io::Result<()>387 pub fn flush(&self) -> io::Result<()> {
388 Ok(())
389 }
390 }
391
392 impl StreamImp {
inner(&self) -> MutexGuard<StreamInner>393 fn inner(&self) -> MutexGuard<StreamInner> {
394 self.inner.inner.lock().unwrap()
395 }
396
schedule_connect(&self, addr: &SocketAddr) -> io::Result<()>397 fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
398 unsafe {
399 trace!("scheduling a connect");
400 self.inner.socket.connect_overlapped(addr, &[], self.inner.read.as_mut_ptr())?;
401 }
402 // see docs above on StreamImp.inner for rationale on forget
403 mem::forget(self.clone());
404 Ok(())
405 }
406
407 /// Schedule a read to happen on this socket, enqueuing us to receive a
408 /// notification when a read is ready.
409 ///
410 /// Note that this does *not* work with a buffer. When reading a TCP stream
411 /// we actually read into a 0-byte buffer so Windows will send us a
412 /// notification when the socket is otherwise ready for reading. This allows
413 /// us to avoid buffer allocations for in-flight reads.
schedule_read(&self, me: &mut StreamInner)414 fn schedule_read(&self, me: &mut StreamInner) {
415 match me.read {
416 State::Empty => {}
417 State::Ready(_) | State::Error(_) => {
418 self.add_readiness(me, Ready::readable());
419 return;
420 }
421 _ => return,
422 }
423
424 me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());
425
426 trace!("scheduling a read");
427 let res = unsafe {
428 self.inner.socket.read_overlapped(&mut [], self.inner.read.as_mut_ptr())
429 };
430 match res {
431 // Note that `Ok(true)` means that this completed immediately and
432 // our socket is readable. This typically means that the caller of
433 // this function (likely `read` above) can try again as an
434 // optimization and return bytes quickly.
435 //
436 // Normally, though, although the read completed immediately
437 // there's still an IOCP completion packet enqueued that we're going
438 // to receive.
439 //
440 // You can configure this behavior (miow) with
441 // SetFileCompletionNotificationModes to indicate that `Ok(true)`
442 // does **not** enqueue a completion packet. (This is the case
443 // for me.instant_notify)
444 //
445 // Note that apparently libuv has scary code to work around bugs in
446 // `WSARecv` for UDP sockets apparently for handles which have had
447 // the `SetFileCompletionNotificationModes` function called on them,
448 // worth looking into!
449 Ok(Some(_)) if me.instant_notify => {
450 me.read = State::Ready(());
451 self.add_readiness(me, Ready::readable());
452 }
453 Ok(_) => {
454 // see docs above on StreamImp.inner for rationale on forget
455 me.read = State::Pending(());
456 mem::forget(self.clone());
457 }
458 Err(e) => {
459 me.read = State::Error(e);
460 self.add_readiness(me, Ready::readable());
461 }
462 }
463 }
464
465 /// Similar to `schedule_read`, except that this issues, well, writes.
466 ///
467 /// This function will continually attempt to write the entire contents of
468 /// the buffer `buf` until they have all been written. The `pos` argument is
469 /// the current offset within the buffer up to which the contents have
470 /// already been written.
471 ///
472 /// A new writable event (e.g. allowing another write) will only happen once
473 /// the buffer has been written completely (or hit an error).
schedule_write(&self, buf: Vec<u8>, mut pos: usize, me: &mut StreamInner)474 fn schedule_write(&self,
475 buf: Vec<u8>,
476 mut pos: usize,
477 me: &mut StreamInner) {
478
479 // About to write, clear any pending level triggered events
480 me.iocp.set_readiness(me.iocp.readiness() - Ready::writable());
481
482 loop {
483 trace!("scheduling a write of {} bytes", buf[pos..].len());
484 let ret = unsafe {
485 self.inner.socket.write_overlapped(&buf[pos..], self.inner.write.as_mut_ptr())
486 };
487 match ret {
488 Ok(Some(transferred_bytes)) if me.instant_notify => {
489 trace!("done immediately with {} bytes", transferred_bytes);
490 if transferred_bytes == buf.len() - pos {
491 self.add_readiness(me, Ready::writable());
492 me.write = State::Empty;
493 break;
494 }
495 pos += transferred_bytes;
496 }
497 Ok(_) => {
498 trace!("scheduled for later");
499 // see docs above on StreamImp.inner for rationale on forget
500 me.write = State::Pending((buf, pos));
501 mem::forget(self.clone());
502 break;
503 }
504 Err(e) => {
505 trace!("write error: {}", e);
506 me.write = State::Error(e);
507 self.add_readiness(me, Ready::writable());
508 me.iocp.put_buffer(buf);
509 break;
510 }
511 }
512 }
513 }
514
515 /// Pushes an event for this socket onto the selector its registered for.
516 ///
517 /// When an event is generated on this socket, if it happened after the
518 /// socket was closed then we don't want to actually push the event onto our
519 /// selector as otherwise it's just a spurious notification.
add_readiness(&self, me: &mut StreamInner, set: Ready)520 fn add_readiness(&self, me: &mut StreamInner, set: Ready) {
521 me.iocp.set_readiness(set | me.iocp.readiness());
522 }
523 }
524
read_done(status: &OVERLAPPED_ENTRY)525 fn read_done(status: &OVERLAPPED_ENTRY) {
526 let status = CompletionStatus::from_entry(status);
527 let me2 = StreamImp {
528 inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
529 };
530
531 let mut me = me2.inner();
532 match mem::replace(&mut me.read, State::Empty) {
533 State::Pending(()) => {
534 trace!("finished a read: {}", status.bytes_transferred());
535 assert_eq!(status.bytes_transferred(), 0);
536 me.read = State::Ready(());
537 return me2.add_readiness(&mut me, Ready::readable())
538 }
539 s => me.read = s,
540 }
541
542 // If a read didn't complete, then the connect must have just finished.
543 trace!("finished a connect");
544
545 // By guarding with socket.result(), we ensure that a connection
546 // was successfully made before performing operations requiring a
547 // connected socket.
548 match unsafe { me2.inner.socket.result(status.overlapped()) }
549 .and_then(|_| me2.inner.socket.connect_complete())
550 {
551 Ok(()) => {
552 me2.add_readiness(&mut me, Ready::writable());
553 me2.schedule_read(&mut me);
554 }
555 Err(e) => {
556 me2.add_readiness(&mut me, Ready::readable() | Ready::writable());
557 me.read = State::Error(e);
558 }
559 }
560 }
561
write_done(status: &OVERLAPPED_ENTRY)562 fn write_done(status: &OVERLAPPED_ENTRY) {
563 let status = CompletionStatus::from_entry(status);
564 trace!("finished a write {}", status.bytes_transferred());
565 let me2 = StreamImp {
566 inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
567 };
568 let mut me = me2.inner();
569 let (buf, pos) = match mem::replace(&mut me.write, State::Empty) {
570 State::Pending(pair) => pair,
571 _ => unreachable!(),
572 };
573 let new_pos = pos + (status.bytes_transferred() as usize);
574 if new_pos == buf.len() {
575 me2.add_readiness(&mut me, Ready::writable());
576 } else {
577 me2.schedule_write(buf, new_pos, &mut me);
578 }
579 }
580
581 impl Evented for TcpStream {
register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>582 fn register(&self, poll: &Poll, token: Token,
583 interest: Ready, opts: PollOpt) -> io::Result<()> {
584 let mut me = self.inner();
585 me.iocp.register_socket(&self.imp.inner.socket, poll, token,
586 interest, opts, &self.registration)?;
587
588 unsafe {
589 super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
590 me.instant_notify = true;
591 }
592
593 // If we were connected before being registered process that request
594 // here and go along our merry ways. Note that the callback for a
595 // successful connect will worry about generating writable/readable
596 // events and scheduling a new read.
597 if let Some(addr) = me.deferred_connect.take() {
598 return self.imp.schedule_connect(&addr).map(|_| ())
599 }
600 self.post_register(interest, &mut me);
601 Ok(())
602 }
603
reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>604 fn reregister(&self, poll: &Poll, token: Token,
605 interest: Ready, opts: PollOpt) -> io::Result<()> {
606 let mut me = self.inner();
607 me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
608 interest, opts, &self.registration)?;
609 self.post_register(interest, &mut me);
610 Ok(())
611 }
612
deregister(&self, poll: &Poll) -> io::Result<()>613 fn deregister(&self, poll: &Poll) -> io::Result<()> {
614 self.inner().iocp.deregister(&self.imp.inner.socket,
615 poll, &self.registration)
616 }
617 }
618
619 impl fmt::Debug for TcpStream {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result620 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
621 f.debug_struct("TcpStream")
622 .finish()
623 }
624 }
625
626 impl Drop for TcpStream {
drop(&mut self)627 fn drop(&mut self) {
628 // If we're still internally reading, we're no longer interested. Note
629 // though that we don't cancel any writes which may have been issued to
630 // preserve the same semantics as Unix.
631 //
632 // Note that "Empty" here may mean that a connect is pending, so we
633 // cancel even if that happens as well.
634 unsafe {
635 match self.inner().read {
636 State::Pending(_) | State::Empty => {
637 trace!("cancelling active TCP read");
638 drop(super::cancel(&self.imp.inner.socket,
639 &self.imp.inner.read));
640 }
641 State::Ready(_) | State::Error(_) => {}
642 }
643 }
644 }
645 }
646
647 impl TcpListener {
new(socket: net::TcpListener) -> io::Result<TcpListener>648 pub fn new(socket: net::TcpListener)
649 -> io::Result<TcpListener> {
650 let addr = socket.local_addr()?;
651 Ok(TcpListener::new_family(socket, match addr {
652 SocketAddr::V4(..) => Family::V4,
653 SocketAddr::V6(..) => Family::V6,
654 }))
655 }
656
new_family(socket: net::TcpListener, family: Family) -> TcpListener657 fn new_family(socket: net::TcpListener, family: Family) -> TcpListener {
658 TcpListener {
659 registration: Mutex::new(None),
660 imp: ListenerImp {
661 inner: FromRawArc::new(ListenerIo {
662 accept: Overlapped::new(accept_done),
663 family: family,
664 socket: socket,
665 inner: Mutex::new(ListenerInner {
666 iocp: ReadyBinding::new(),
667 accept: State::Empty,
668 accept_buf: AcceptAddrsBuf::new(),
669 }),
670 }),
671 },
672 }
673 }
674
accept(&self) -> io::Result<(net::TcpStream, SocketAddr)>675 pub fn accept(&self) -> io::Result<(net::TcpStream, SocketAddr)> {
676 let mut me = self.inner();
677
678 let ret = match mem::replace(&mut me.accept, State::Empty) {
679 State::Empty => return Err(io::ErrorKind::WouldBlock.into()),
680 State::Pending(t) => {
681 me.accept = State::Pending(t);
682 return Err(io::ErrorKind::WouldBlock.into());
683 }
684 State::Ready((s, a)) => Ok((s, a)),
685 State::Error(e) => Err(e),
686 };
687
688 self.imp.schedule_accept(&mut me);
689
690 return ret
691 }
692
local_addr(&self) -> io::Result<SocketAddr>693 pub fn local_addr(&self) -> io::Result<SocketAddr> {
694 self.imp.inner.socket.local_addr()
695 }
696
try_clone(&self) -> io::Result<TcpListener>697 pub fn try_clone(&self) -> io::Result<TcpListener> {
698 self.imp.inner.socket.try_clone().map(|s| {
699 TcpListener::new_family(s, self.imp.inner.family)
700 })
701 }
702
703 #[allow(deprecated)]
set_only_v6(&self, only_v6: bool) -> io::Result<()>704 pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
705 self.imp.inner.socket.set_only_v6(only_v6)
706 }
707
708 #[allow(deprecated)]
only_v6(&self) -> io::Result<bool>709 pub fn only_v6(&self) -> io::Result<bool> {
710 self.imp.inner.socket.only_v6()
711 }
712
set_ttl(&self, ttl: u32) -> io::Result<()>713 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
714 self.imp.inner.socket.set_ttl(ttl)
715 }
716
ttl(&self) -> io::Result<u32>717 pub fn ttl(&self) -> io::Result<u32> {
718 self.imp.inner.socket.ttl()
719 }
720
take_error(&self) -> io::Result<Option<io::Error>>721 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
722 self.imp.inner.socket.take_error()
723 }
724
inner(&self) -> MutexGuard<ListenerInner>725 fn inner(&self) -> MutexGuard<ListenerInner> {
726 self.imp.inner()
727 }
728 }
729
730 impl ListenerImp {
inner(&self) -> MutexGuard<ListenerInner>731 fn inner(&self) -> MutexGuard<ListenerInner> {
732 self.inner.inner.lock().unwrap()
733 }
734
schedule_accept(&self, me: &mut ListenerInner)735 fn schedule_accept(&self, me: &mut ListenerInner) {
736 match me.accept {
737 State::Empty => {}
738 _ => return
739 }
740
741 me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());
742
743 let res = match self.inner.family {
744 Family::V4 => TcpBuilder::new_v4(),
745 Family::V6 => TcpBuilder::new_v6(),
746 }
747 .and_then(|builder| builder.to_tcp_stream())
748 .and_then(|stream| unsafe {
749 trace!("scheduling an accept");
750 self.inner
751 .socket
752 .accept_overlapped(&stream, &mut me.accept_buf, self.inner.accept.as_mut_ptr())
753 .map(|x| (stream, x))
754 });
755 match res {
756 Ok((socket, _)) => {
757 // see docs above on StreamImp.inner for rationale on forget
758 me.accept = State::Pending(socket);
759 mem::forget(self.clone());
760 }
761 Err(e) => {
762 me.accept = State::Error(e);
763 self.add_readiness(me, Ready::readable());
764 }
765 }
766 }
767
768 // See comments in StreamImp::push
add_readiness(&self, me: &mut ListenerInner, set: Ready)769 fn add_readiness(&self, me: &mut ListenerInner, set: Ready) {
770 me.iocp.set_readiness(set | me.iocp.readiness());
771 }
772 }
773
accept_done(status: &OVERLAPPED_ENTRY)774 fn accept_done(status: &OVERLAPPED_ENTRY) {
775 let status = CompletionStatus::from_entry(status);
776 let me2 = ListenerImp {
777 inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
778 };
779
780 let mut me = me2.inner();
781 let socket = match mem::replace(&mut me.accept, State::Empty) {
782 State::Pending(s) => s,
783 _ => unreachable!(),
784 };
785 trace!("finished an accept");
786 let result = me2.inner.socket.accept_complete(&socket).and_then(|()| {
787 me.accept_buf.parse(&me2.inner.socket)
788 }).and_then(|buf| {
789 buf.remote().ok_or_else(|| {
790 io::Error::new(ErrorKind::Other, "could not obtain remote address")
791 })
792 });
793 me.accept = match result {
794 Ok(remote_addr) => State::Ready((socket, remote_addr)),
795 Err(e) => State::Error(e),
796 };
797 me2.add_readiness(&mut me, Ready::readable());
798 }
799
800 impl Evented for TcpListener {
register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>801 fn register(&self, poll: &Poll, token: Token,
802 interest: Ready, opts: PollOpt) -> io::Result<()> {
803 let mut me = self.inner();
804 me.iocp.register_socket(&self.imp.inner.socket, poll, token,
805 interest, opts, &self.registration)?;
806
807 unsafe {
808 super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
809 }
810
811 self.imp.schedule_accept(&mut me);
812 Ok(())
813 }
814
reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>815 fn reregister(&self, poll: &Poll, token: Token,
816 interest: Ready, opts: PollOpt) -> io::Result<()> {
817 let mut me = self.inner();
818 me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
819 interest, opts, &self.registration)?;
820 self.imp.schedule_accept(&mut me);
821 Ok(())
822 }
823
deregister(&self, poll: &Poll) -> io::Result<()>824 fn deregister(&self, poll: &Poll) -> io::Result<()> {
825 self.inner().iocp.deregister(&self.imp.inner.socket,
826 poll, &self.registration)
827 }
828 }
829
830 impl fmt::Debug for TcpListener {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result831 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
832 f.debug_struct("TcpListener")
833 .finish()
834 }
835 }
836
837 impl Drop for TcpListener {
drop(&mut self)838 fn drop(&mut self) {
839 // If we're still internally reading, we're no longer interested.
840 unsafe {
841 match self.inner().accept {
842 State::Pending(_) => {
843 trace!("cancelling active TCP accept");
844 drop(super::cancel(&self.imp.inner.socket,
845 &self.imp.inner.accept));
846 }
847 State::Empty |
848 State::Ready(_) |
849 State::Error(_) => {}
850 }
851 }
852 }
853 }
854