1 use std::fmt;
2 use std::io::{self, Read, ErrorKind};
3 use std::mem;
4 use std::net::{self, SocketAddr, Shutdown};
5 use std::os::windows::prelude::*;
6 use std::sync::{Mutex, MutexGuard};
7 use std::time::Duration;
8
9 use miow::iocp::CompletionStatus;
10 use miow::net::*;
11 use net2::{TcpBuilder, TcpStreamExt as Net2TcpExt};
12 use winapi::*;
13 use iovec::IoVec;
14
15 use {poll, Ready, Poll, PollOpt, Token};
16 use event::Evented;
17 use sys::windows::from_raw_arc::FromRawArc;
18 use sys::windows::selector::{Overlapped, ReadyBinding};
19 use sys::windows::Family;
20
pub struct TcpStream {
    /// Separately stored implementation to ensure that the `Drop`
    /// implementation on this type is only executed when it's actually dropped
    /// (many clones of this `imp` are made).
    imp: StreamImp,
    /// Lazily-populated registration handle; filled in on the first
    /// `Evented::register` call via `ReadyBinding::register_socket`.
    registration: Mutex<Option<poll::Registration>>,
}
28
pub struct TcpListener {
    /// Shared implementation; see the comments on `TcpStream::imp` for why it
    /// is stored separately from the handle type.
    imp: ListenerImp,
    /// Lazily-populated registration handle; filled in on the first
    /// `Evented::register` call.
    registration: Mutex<Option<poll::Registration>>,
}
33
/// Cloneable handle to a stream's shared state; clones are "loaned" to the
/// completion port while I/O is in flight (see the `inner` field docs).
#[derive(Clone)]
struct StreamImp {
    /// A stable address and synchronized access for all internals. This serves
    /// to ensure that all `Overlapped` pointers are valid for a long period of
    /// time as well as allowing completion callbacks to have access to the
    /// internals without having ownership.
    ///
    /// Note that the reference count also allows us "loan out" copies to
    /// completion ports while I/O is running to guarantee that this stays alive
    /// until the I/O completes. You'll notice a number of calls to
    /// `mem::forget` below, and these only happen on successful scheduling of
    /// I/O and are paired with `overlapped2arc!` macro invocations in the
    /// completion callbacks (to have a decrement match the increment).
    inner: FromRawArc<StreamIo>,
}
49
/// Cloneable handle to a listener's shared state; uses the same
/// reference-count "loaning" scheme documented on `StreamImp::inner`.
#[derive(Clone)]
struct ListenerImp {
    inner: FromRawArc<ListenerIo>,
}
54
/// Heap-allocated shared state of a stream. The `Overlapped` fields must stay
/// at a stable address while overlapped I/O referencing them is in flight.
struct StreamIo {
    inner: Mutex<StreamInner>, // mutable state, guarded by a lock
    read: Overlapped, // also used for connect
    write: Overlapped, // overlapped state for in-flight writes
    socket: net::TcpStream, // the underlying OS socket
}
61
/// Heap-allocated shared state of a listener; see `StreamIo` for the
/// stable-address requirement on the `Overlapped` field.
struct ListenerIo {
    inner: Mutex<ListenerInner>, // mutable state, guarded by a lock
    accept: Overlapped, // overlapped state for the in-flight accept
    family: Family, // V4/V6, used to pre-create accepted sockets
    socket: net::TcpListener, // the underlying OS socket
}
68
/// Lock-guarded mutable state of a stream.
struct StreamInner {
    iocp: ReadyBinding, // readiness + selector registration
    deferred_connect: Option<SocketAddr>, // connect to issue once registered
    read: State<(), ()>, // state of the in-flight 0-byte read
    write: State<(Vec<u8>, usize), (Vec<u8>, usize)>, // in-flight write: (buffer, offset)
    /// whether we are instantly notified of success
    /// (FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
    /// without a roundtrip through the event loop)
    instant_notify: bool,
}
79
/// Lock-guarded mutable state of a listener.
struct ListenerInner {
    iocp: ReadyBinding, // readiness + selector registration
    accept: State<net::TcpStream, (net::TcpStream, SocketAddr)>, // in-flight accept
    accept_buf: AcceptAddrsBuf, // address buffer filled by the overlapped accept
    instant_notify: bool, // same meaning as `StreamInner::instant_notify`
}
86
/// Tracks one asynchronous operation: `T` is the payload held while the
/// operation is in flight, `U` the payload produced when it completes.
enum State<T, U> {
    Empty, // no I/O operation in progress
    Pending(T), // an I/O operation is in progress
    Ready(U), // I/O has finished with this value
    Error(io::Error), // there was an I/O error
}
93
94 impl TcpStream {
new(socket: net::TcpStream, deferred_connect: Option<SocketAddr>) -> TcpStream95 fn new(socket: net::TcpStream,
96 deferred_connect: Option<SocketAddr>) -> TcpStream {
97 TcpStream {
98 registration: Mutex::new(None),
99 imp: StreamImp {
100 inner: FromRawArc::new(StreamIo {
101 read: Overlapped::new(read_done),
102 write: Overlapped::new(write_done),
103 socket: socket,
104 inner: Mutex::new(StreamInner {
105 iocp: ReadyBinding::new(),
106 deferred_connect: deferred_connect,
107 read: State::Empty,
108 write: State::Empty,
109 instant_notify: false,
110 }),
111 }),
112 },
113 }
114 }
115
connect(socket: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream>116 pub fn connect(socket: net::TcpStream, addr: &SocketAddr)
117 -> io::Result<TcpStream> {
118 socket.set_nonblocking(true)?;
119 Ok(TcpStream::new(socket, Some(*addr)))
120 }
121
from_stream(stream: net::TcpStream) -> TcpStream122 pub fn from_stream(stream: net::TcpStream) -> TcpStream {
123 TcpStream::new(stream, None)
124 }
125
peer_addr(&self) -> io::Result<SocketAddr>126 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
127 self.imp.inner.socket.peer_addr()
128 }
129
local_addr(&self) -> io::Result<SocketAddr>130 pub fn local_addr(&self) -> io::Result<SocketAddr> {
131 self.imp.inner.socket.local_addr()
132 }
133
try_clone(&self) -> io::Result<TcpStream>134 pub fn try_clone(&self) -> io::Result<TcpStream> {
135 self.imp.inner.socket.try_clone().map(|s| TcpStream::new(s, None))
136 }
137
shutdown(&self, how: Shutdown) -> io::Result<()>138 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
139 self.imp.inner.socket.shutdown(how)
140 }
141
set_nodelay(&self, nodelay: bool) -> io::Result<()>142 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
143 self.imp.inner.socket.set_nodelay(nodelay)
144 }
145
nodelay(&self) -> io::Result<bool>146 pub fn nodelay(&self) -> io::Result<bool> {
147 self.imp.inner.socket.nodelay()
148 }
149
set_recv_buffer_size(&self, size: usize) -> io::Result<()>150 pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
151 self.imp.inner.socket.set_recv_buffer_size(size)
152 }
153
recv_buffer_size(&self) -> io::Result<usize>154 pub fn recv_buffer_size(&self) -> io::Result<usize> {
155 self.imp.inner.socket.recv_buffer_size()
156 }
157
set_send_buffer_size(&self, size: usize) -> io::Result<()>158 pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
159 self.imp.inner.socket.set_send_buffer_size(size)
160 }
161
send_buffer_size(&self) -> io::Result<usize>162 pub fn send_buffer_size(&self) -> io::Result<usize> {
163 self.imp.inner.socket.send_buffer_size()
164 }
165
set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()>166 pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
167 self.imp.inner.socket.set_keepalive(keepalive)
168 }
169
keepalive(&self) -> io::Result<Option<Duration>>170 pub fn keepalive(&self) -> io::Result<Option<Duration>> {
171 self.imp.inner.socket.keepalive()
172 }
173
set_ttl(&self, ttl: u32) -> io::Result<()>174 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
175 self.imp.inner.socket.set_ttl(ttl)
176 }
177
ttl(&self) -> io::Result<u32>178 pub fn ttl(&self) -> io::Result<u32> {
179 self.imp.inner.socket.ttl()
180 }
181
set_only_v6(&self, only_v6: bool) -> io::Result<()>182 pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
183 self.imp.inner.socket.set_only_v6(only_v6)
184 }
185
only_v6(&self) -> io::Result<bool>186 pub fn only_v6(&self) -> io::Result<bool> {
187 self.imp.inner.socket.only_v6()
188 }
189
set_linger(&self, dur: Option<Duration>) -> io::Result<()>190 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
191 self.imp.inner.socket.set_linger(dur)
192 }
193
linger(&self) -> io::Result<Option<Duration>>194 pub fn linger(&self) -> io::Result<Option<Duration>> {
195 self.imp.inner.socket.linger()
196 }
197
take_error(&self) -> io::Result<Option<io::Error>>198 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
199 if let Some(e) = self.imp.inner.socket.take_error()? {
200 return Ok(Some(e))
201 }
202
203 // If the syscall didn't return anything then also check to see if we've
204 // squirreled away an error elsewhere for example as part of a connect
205 // operation.
206 //
207 // Typically this is used like so:
208 //
209 // 1. A `connect` is issued
210 // 2. Wait for the socket to be writable
211 // 3. Call `take_error` to see if the connect succeeded.
212 //
213 // Right now the `connect` operation finishes in `read_done` below and
214 // fill will in `State::Error` in the `read` slot if it fails, so we
215 // extract that here.
216 let mut me = self.inner();
217 match mem::replace(&mut me.read, State::Empty) {
218 State::Error(e) => {
219 self.imp.schedule_read(&mut me);
220 Ok(Some(e))
221 }
222 other => {
223 me.read = other;
224 Ok(None)
225 }
226 }
227 }
228
inner(&self) -> MutexGuard<StreamInner>229 fn inner(&self) -> MutexGuard<StreamInner> {
230 self.imp.inner()
231 }
232
before_read(&self) -> io::Result<MutexGuard<StreamInner>>233 fn before_read(&self) -> io::Result<MutexGuard<StreamInner>> {
234 let mut me = self.inner();
235
236 match me.read {
237 // Empty == we're not associated yet, and if we're pending then
238 // these are both cases where we return "would block"
239 State::Empty |
240 State::Pending(()) => return Err(io::ErrorKind::WouldBlock.into()),
241
242 // If we got a delayed error as part of a `read_overlapped` below,
243 // return that here. Also schedule another read in case it was
244 // transient.
245 State::Error(_) => {
246 let e = match mem::replace(&mut me.read, State::Empty) {
247 State::Error(e) => e,
248 _ => panic!(),
249 };
250 self.imp.schedule_read(&mut me);
251 return Err(e)
252 }
253
254 // If we're ready for a read then some previous 0-byte read has
255 // completed. In that case the OS's socket buffer has something for
256 // us, so we just keep pulling out bytes while we can in the loop
257 // below.
258 State::Ready(()) => {}
259 }
260
261 Ok(me)
262 }
263
post_register(&self, interest: Ready, me: &mut StreamInner)264 fn post_register(&self, interest: Ready, me: &mut StreamInner) {
265 if interest.is_readable() {
266 self.imp.schedule_read(me);
267 }
268
269 // At least with epoll, if a socket is registered with an interest in
270 // writing and it's immediately writable then a writable event is
271 // generated immediately, so do so here.
272 if interest.is_writable() {
273 if let State::Empty = me.write {
274 self.imp.add_readiness(me, Ready::writable());
275 }
276 }
277 }
278
read(&self, buf: &mut [u8]) -> io::Result<usize>279 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
280 match IoVec::from_bytes_mut(buf) {
281 Some(vec) => self.readv(&mut [vec]),
282 None => Ok(0),
283 }
284 }
285
peek(&self, buf: &mut [u8]) -> io::Result<usize>286 pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
287 let mut me = self.before_read()?;
288
289 match (&self.imp.inner.socket).peek(buf) {
290 Ok(n) => Ok(n),
291 Err(e) => {
292 me.read = State::Empty;
293 self.imp.schedule_read(&mut me);
294 Err(e)
295 }
296 }
297 }
298
readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize>299 pub fn readv(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
300 let mut me = self.before_read()?;
301
302 // TODO: Does WSARecv work on a nonblocking sockets? We ideally want to
303 // call that instead of looping over all the buffers and calling
304 // `recv` on each buffer. I'm not sure though if an overlapped
305 // socket in nonblocking mode would work with that use case,
306 // however, so for now we just call `recv`.
307
308 let mut amt = 0;
309 for buf in bufs {
310 match (&self.imp.inner.socket).read(buf) {
311 // If we did a partial read, then return what we've read so far
312 Ok(n) if n < buf.len() => return Ok(amt + n),
313
314 // Otherwise filled this buffer entirely, so try to fill the
315 // next one as well.
316 Ok(n) => amt += n,
317
318 // If we hit an error then things get tricky if we've already
319 // read some data. If the error is "would block" then we just
320 // return the data we've read so far while scheduling another
321 // 0-byte read.
322 //
323 // If we've read data and the error kind is not "would block",
324 // then we stash away the error to get returned later and return
325 // the data that we've read.
326 //
327 // Finally if we haven't actually read any data we just
328 // reschedule a 0-byte read to happen again and then return the
329 // error upwards.
330 Err(e) => {
331 if amt > 0 && e.kind() == io::ErrorKind::WouldBlock {
332 me.read = State::Empty;
333 self.imp.schedule_read(&mut me);
334 return Ok(amt)
335 } else if amt > 0 {
336 me.read = State::Error(e);
337 return Ok(amt)
338 } else {
339 me.read = State::Empty;
340 self.imp.schedule_read(&mut me);
341 return Err(e)
342 }
343 }
344 }
345 }
346
347 Ok(amt)
348 }
349
write(&self, buf: &[u8]) -> io::Result<usize>350 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
351 match IoVec::from_bytes(buf) {
352 Some(vec) => self.writev(&[vec]),
353 None => Ok(0),
354 }
355 }
356
writev(&self, bufs: &[&IoVec]) -> io::Result<usize>357 pub fn writev(&self, bufs: &[&IoVec]) -> io::Result<usize> {
358 let mut me = self.inner();
359 let me = &mut *me;
360
361 match mem::replace(&mut me.write, State::Empty) {
362 State::Empty => {}
363 State::Error(e) => return Err(e),
364 other => {
365 me.write = other;
366 return Err(io::ErrorKind::WouldBlock.into())
367 }
368 }
369
370 if !me.iocp.registered() {
371 return Err(io::ErrorKind::WouldBlock.into())
372 }
373
374 if bufs.is_empty() {
375 return Ok(0)
376 }
377
378 let len = bufs.iter().map(|b| b.len()).fold(0, |a, b| a + b);
379 let mut intermediate = me.iocp.get_buffer(len);
380 for buf in bufs {
381 intermediate.extend_from_slice(buf);
382 }
383 self.imp.schedule_write(intermediate, 0, me);
384 Ok(len)
385 }
386
flush(&self) -> io::Result<()>387 pub fn flush(&self) -> io::Result<()> {
388 Ok(())
389 }
390 }
391
impl StreamImp {
    /// Locks and returns the stream's mutable state.
    fn inner(&self) -> MutexGuard<StreamInner> {
        self.inner.inner.lock().unwrap()
    }

    /// Issues an overlapped connect to `addr`. The `read` overlapped slot is
    /// reused for connects, so completion is delivered to `read_done`.
    fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
        unsafe {
            trace!("scheduling a connect");
            self.inner.socket.connect_overlapped(addr, &[], self.inner.read.as_mut_ptr())?;
        }
        // see docs above on StreamImp.inner for rationale on forget
        mem::forget(self.clone());
        Ok(())
    }

    /// Schedule a read to happen on this socket, enqueuing us to receive a
    /// notification when a read is ready.
    ///
    /// Note that this does *not* work with a buffer. When reading a TCP stream
    /// we actually read into a 0-byte buffer so Windows will send us a
    /// notification when the socket is otherwise ready for reading. This allows
    /// us to avoid buffer allocations for in-flight reads.
    fn schedule_read(&self, me: &mut StreamInner) {
        match me.read {
            State::Empty => {}
            State::Ready(_) | State::Error(_) => {
                // A previous result is still waiting to be consumed; just
                // re-signal readiness rather than issuing another read.
                self.add_readiness(me, Ready::readable());
                return;
            }
            // Pending: a read is already in flight, nothing to do.
            _ => return,
        }

        // About to issue a read, so clear any level-triggered read readiness
        // until this read completes.
        me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());

        trace!("scheduling a read");
        let res = unsafe {
            self.inner.socket.read_overlapped(&mut [], self.inner.read.as_mut_ptr())
        };
        match res {
            // Note that `Ok(true)` means that this completed immediately and
            // our socket is readable. This typically means that the caller of
            // this function (likely `read` above) can try again as an
            // optimization and return bytes quickly.
            //
            // Normally, though, although the read completed immediately
            // there's still an IOCP completion packet enqueued that we're going
            // to receive.
            //
            // You can configure this behavior (miow) with
            // SetFileCompletionNotificationModes to indicate that `Ok(true)`
            // does **not** enqueue a completion packet. (This is the case
            // for me.instant_notify)
            //
            // Note that apparently libuv has scary code to work around bugs in
            // `WSARecv` for UDP sockets apparently for handles which have had
            // the `SetFileCompletionNotificationModes` function called on them,
            // worth looking into!
            Ok(Some(_)) if me.instant_notify => {
                me.read = State::Ready(());
                self.add_readiness(me, Ready::readable());
            }
            Ok(_) => {
                // see docs above on StreamImp.inner for rationale on forget
                me.read = State::Pending(());
                mem::forget(self.clone());
            }
            Err(e) => {
                // Stash the error for the next `read` call to surface.
                me.read = State::Error(e);
                self.add_readiness(me, Ready::readable());
            }
        }
    }

    /// Similar to `schedule_read`, except that this issues, well, writes.
    ///
    /// This function will continually attempt to write the entire contents of
    /// the buffer `buf` until they have all been written. The `pos` argument is
    /// the current offset within the buffer up to which the contents have
    /// already been written.
    ///
    /// A new writable event (e.g. allowing another write) will only happen once
    /// the buffer has been written completely (or hit an error).
    fn schedule_write(&self,
                      buf: Vec<u8>,
                      mut pos: usize,
                      me: &mut StreamInner) {

        // About to write, clear any pending level triggered events
        me.iocp.set_readiness(me.iocp.readiness() - Ready::writable());

        loop {
            trace!("scheduling a write of {} bytes", buf[pos..].len());
            let ret = unsafe {
                self.inner.socket.write_overlapped(&buf[pos..], self.inner.write.as_mut_ptr())
            };
            match ret {
                Ok(Some(transferred_bytes)) if me.instant_notify => {
                    trace!("done immediately with {} bytes", transferred_bytes);
                    if transferred_bytes == buf.len() - pos {
                        // Entire buffer written synchronously; we're writable
                        // again.
                        self.add_readiness(me, Ready::writable());
                        me.write = State::Empty;
                        break;
                    }
                    // Partial synchronous write: advance and loop to write the
                    // remainder.
                    pos += transferred_bytes;
                }
                Ok(_) => {
                    trace!("scheduled for later");
                    // see docs above on StreamImp.inner for rationale on forget
                    me.write = State::Pending((buf, pos));
                    mem::forget(self.clone());
                    break;
                }
                Err(e) => {
                    trace!("write error: {}", e);
                    // Stash the error for the next `writev` call and recycle
                    // the intermediate buffer.
                    me.write = State::Error(e);
                    self.add_readiness(me, Ready::writable());
                    me.iocp.put_buffer(buf);
                    break;
                }
            }
        }
    }

    /// Pushes an event for this socket onto the selector its registered for.
    ///
    /// When an event is generated on this socket, if it happened after the
    /// socket was closed then we don't want to actually push the event onto our
    /// selector as otherwise it's just a spurious notification.
    fn add_readiness(&self, me: &mut StreamInner, set: Ready) {
        me.iocp.set_readiness(set | me.iocp.readiness());
    }
}
524
/// IOCP completion callback for the `read` overlapped slot, which handles
/// both 0-byte read completions and connect completions (the slot is shared).
fn read_done(status: &OVERLAPPED_ENTRY) {
    let status = CompletionStatus::from_entry(status);
    // Reclaims the reference count that `schedule_read`/`schedule_connect`
    // loaned out via `mem::forget`.
    let me2 = StreamImp {
        inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, read) },
    };

    let mut me = me2.inner();
    match mem::replace(&mut me.read, State::Empty) {
        State::Pending(()) => {
            trace!("finished a read: {}", status.bytes_transferred());
            // Reads are issued with a 0-byte buffer (see `schedule_read`), so
            // a completed read transfers nothing; it only signals readiness.
            assert_eq!(status.bytes_transferred(), 0);
            me.read = State::Ready(());
            return me2.add_readiness(&mut me, Ready::readable())
        }
        s => me.read = s,
    }

    // If a read didn't complete, then the connect must have just finished.
    trace!("finished a connect");

    // By guarding with socket.result(), we ensure that a connection
    // was successfully made before performing operations requiring a
    // connected socket.
    match unsafe { me2.inner.socket.result(status.overlapped()) }
        .and_then(|_| me2.inner.socket.connect_complete())
    {
        Ok(()) => {
            // Connected: signal writability and get the first read in flight.
            me2.add_readiness(&mut me, Ready::writable());
            me2.schedule_read(&mut me);
        }
        Err(e) => {
            // Failed connect: stash the error in the `read` slot so
            // `take_error` can surface it.
            me2.add_readiness(&mut me, Ready::readable() | Ready::writable());
            me.read = State::Error(e);
        }
    }
}
561
/// IOCP completion callback for the `write` overlapped slot; advances the
/// in-flight buffer and reschedules until it's fully written.
fn write_done(status: &OVERLAPPED_ENTRY) {
    let status = CompletionStatus::from_entry(status);
    trace!("finished a write {}", status.bytes_transferred());
    // Reclaims the reference count that `schedule_write` loaned out via
    // `mem::forget`.
    let me2 = StreamImp {
        inner: unsafe { overlapped2arc!(status.overlapped(), StreamIo, write) },
    };
    let mut me = me2.inner();
    let (buf, pos) = match mem::replace(&mut me.write, State::Empty) {
        State::Pending(pair) => pair,
        // Only `schedule_write` enqueues this callback, and it always leaves
        // the state as Pending.
        _ => unreachable!(),
    };
    let new_pos = pos + (status.bytes_transferred() as usize);
    if new_pos == buf.len() {
        // Whole buffer flushed: we're writable again.
        me2.add_readiness(&mut me, Ready::writable());
    } else {
        // Partial write: keep writing the remainder.
        me2.schedule_write(buf, new_pos, &mut me);
    }
}
580
impl Evented for TcpStream {
    /// Associates the socket with the poll's completion port, enables
    /// skip-on-instant-completion, issues any deferred connect, and kicks off
    /// the initial read/write readiness.
    fn register(&self, poll: &Poll, token: Token,
                interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.register_socket(&self.imp.inner.socket, poll, token,
                                interest, opts, &self.registration)?;

        // Ask Windows not to post completion packets for operations that
        // complete synchronously; from here on we report those ourselves
        // (see `instant_notify` handling in `schedule_read`/`schedule_write`).
        unsafe {
            super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
            me.instant_notify = true;
        }

        // If we were connected before being registered process that request
        // here and go along our merry ways. Note that the callback for a
        // successful connect will worry about generating writable/readable
        // events and scheduling a new read.
        if let Some(addr) = me.deferred_connect.take() {
            return self.imp.schedule_connect(&addr).map(|_| ())
        }
        self.post_register(interest, &mut me);
        Ok(())
    }

    /// Updates token/interest/options on the existing registration and
    /// re-arms readiness as in `register`.
    fn reregister(&self, poll: &Poll, token: Token,
                  interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
                                  interest, opts, &self.registration)?;
        self.post_register(interest, &mut me);
        Ok(())
    }

    /// Removes this socket's registration from the poll.
    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner().iocp.deregister(&self.imp.inner.socket,
                                     poll, &self.registration)
    }
}
618
619 impl fmt::Debug for TcpStream {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result620 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
621 f.debug_struct("TcpStream")
622 .finish()
623 }
624 }
625
impl Drop for TcpStream {
    fn drop(&mut self) {
        // If we're still internally reading, we're no longer interested. Note
        // though that we don't cancel any writes which may have been issued to
        // preserve the same semantics as Unix.
        //
        // Note that "Empty" here may mean that a connect is pending, so we
        // cancel even if that happens as well.
        unsafe {
            match self.inner().read {
                State::Pending(_) | State::Empty => {
                    trace!("cancelling active TCP read");
                    // Cancellation failure is ignored on purpose: there may be
                    // no I/O to cancel.
                    drop(super::cancel(&self.imp.inner.socket,
                                       &self.imp.inner.read));
                }
                State::Ready(_) | State::Error(_) => {}
            }
        }
    }
}
646
impl TcpListener {
    /// Wraps an already-bound OS listener, detecting its address family from
    /// the local address.
    pub fn new(socket: net::TcpListener)
               -> io::Result<TcpListener> {
        let addr = socket.local_addr()?;
        Ok(TcpListener::new_family(socket, match addr {
            SocketAddr::V4(..) => Family::V4,
            SocketAddr::V6(..) => Family::V6,
        }))
    }

    /// Builds the listener with the given family; the family is needed later
    /// to pre-create sockets of the right type for overlapped accepts.
    fn new_family(socket: net::TcpListener, family: Family) -> TcpListener {
        TcpListener {
            registration: Mutex::new(None),
            imp: ListenerImp {
                inner: FromRawArc::new(ListenerIo {
                    accept: Overlapped::new(accept_done),
                    family: family,
                    socket: socket,
                    inner: Mutex::new(ListenerInner {
                        iocp: ReadyBinding::new(),
                        accept: State::Empty,
                        accept_buf: AcceptAddrsBuf::new(),
                        instant_notify: false,
                    }),
                }),
            },
        }
    }

    /// Takes the result of a completed overlapped accept, or `WouldBlock` if
    /// none has finished. A new accept is scheduled after a result (success
    /// or error) has been consumed.
    pub fn accept(&self) -> io::Result<(net::TcpStream, SocketAddr)> {
        let mut me = self.inner();

        let ret = match mem::replace(&mut me.accept, State::Empty) {
            State::Empty => return Err(io::ErrorKind::WouldBlock.into()),
            State::Pending(t) => {
                // Still in flight; put the state back untouched.
                me.accept = State::Pending(t);
                return Err(io::ErrorKind::WouldBlock.into());
            }
            State::Ready((s, a)) => Ok((s, a)),
            State::Error(e) => Err(e),
        };

        // Keep the accept pipeline primed for the next connection.
        self.imp.schedule_accept(&mut me);

        return ret
    }

    /// Returns the local address (delegates to the OS socket).
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        self.imp.inner.socket.local_addr()
    }

    /// Duplicates the OS handle into a fresh listener with its own (empty)
    /// accept state.
    pub fn try_clone(&self) -> io::Result<TcpListener> {
        self.imp.inner.socket.try_clone().map(|s| {
            TcpListener::new_family(s, self.imp.inner.family)
        })
    }

    /// Sets IPV6_V6ONLY on the underlying socket.
    #[allow(deprecated)]
    pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
        self.imp.inner.socket.set_only_v6(only_v6)
    }

    /// Reads the IPV6_V6ONLY setting.
    #[allow(deprecated)]
    pub fn only_v6(&self) -> io::Result<bool> {
        self.imp.inner.socket.only_v6()
    }

    /// Sets the IP TTL for this socket.
    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        self.imp.inner.socket.set_ttl(ttl)
    }

    /// Reads the IP TTL for this socket.
    pub fn ttl(&self) -> io::Result<u32> {
        self.imp.inner.socket.ttl()
    }

    /// Returns any pending socket error (SO_ERROR).
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        self.imp.inner.socket.take_error()
    }

    /// Locks and returns the listener's mutable state.
    fn inner(&self) -> MutexGuard<ListenerInner> {
        self.imp.inner()
    }
}
730
impl ListenerImp {
    /// Locks and returns the listener's mutable state.
    fn inner(&self) -> MutexGuard<ListenerInner> {
        self.inner.inner.lock().unwrap()
    }

    /// Schedules an overlapped accept if none is in flight. A fresh socket of
    /// the matching family is created up front because the overlapped accept
    /// requires the caller to supply the to-be-accepted socket.
    fn schedule_accept(&self, me: &mut ListenerInner) {
        match me.accept {
            // Only schedule when nothing is pending or waiting to be consumed.
            State::Empty => {}
            _ => return
        }

        // About to accept: clear any level-triggered read readiness until the
        // accept completes.
        me.iocp.set_readiness(me.iocp.readiness() - Ready::readable());

        let res = match self.inner.family {
            Family::V4 => TcpBuilder::new_v4(),
            Family::V6 => TcpBuilder::new_v6(),
        }.and_then(|builder| unsafe {
            trace!("scheduling an accept");
            self.inner.socket.accept_overlapped(&builder, &mut me.accept_buf,
                                                self.inner.accept.as_mut_ptr())
        });
        match res {
            Ok((socket, _)) => {
                // see docs above on StreamImp.inner for rationale on forget
                me.accept = State::Pending(socket);
                mem::forget(self.clone());
            }
            Err(e) => {
                // Stash the error for the next `accept` call to surface.
                me.accept = State::Error(e);
                self.add_readiness(me, Ready::readable());
            }
        }
    }

    // See comments in StreamImp::push
    fn add_readiness(&self, me: &mut ListenerInner, set: Ready) {
        me.iocp.set_readiness(set | me.iocp.readiness());
    }
}
770
/// IOCP completion callback for the `accept` overlapped slot; finalizes the
/// accepted socket and extracts the peer address.
fn accept_done(status: &OVERLAPPED_ENTRY) {
    let status = CompletionStatus::from_entry(status);
    // Reclaims the reference count that `schedule_accept` loaned out via
    // `mem::forget`.
    let me2 = ListenerImp {
        inner: unsafe { overlapped2arc!(status.overlapped(), ListenerIo, accept) },
    };

    let mut me = me2.inner();
    let socket = match mem::replace(&mut me.accept, State::Empty) {
        State::Pending(s) => s,
        // Only `schedule_accept` enqueues this callback, and it always leaves
        // the state as Pending.
        _ => unreachable!(),
    };
    trace!("finished an accept");
    // Finish socket setup (accept_complete), then pull the remote address out
    // of the buffer the overlapped accept filled in.
    let result = me2.inner.socket.accept_complete(&socket).and_then(|()| {
        me.accept_buf.parse(&me2.inner.socket)
    }).and_then(|buf| {
        buf.remote().ok_or_else(|| {
            io::Error::new(ErrorKind::Other, "could not obtain remote address")
        })
    });
    me.accept = match result {
        Ok(remote_addr) => State::Ready((socket, remote_addr)),
        Err(e) => State::Error(e),
    };
    // Either way there's now a result for `accept` to consume.
    me2.add_readiness(&mut me, Ready::readable());
}
796
impl Evented for TcpListener {
    /// Associates the listener with the poll's completion port, enables
    /// skip-on-instant-completion, and schedules the first accept.
    fn register(&self, poll: &Poll, token: Token,
                interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.register_socket(&self.imp.inner.socket, poll, token,
                                interest, opts, &self.registration)?;

        // Ask Windows not to post completion packets for operations that
        // complete synchronously; we then report readiness ourselves.
        unsafe {
            super::no_notify_on_instant_completion(self.imp.inner.socket.as_raw_socket() as HANDLE)?;
            me.instant_notify = true;
        }

        self.imp.schedule_accept(&mut me);
        Ok(())
    }

    /// Updates token/interest/options on the existing registration and keeps
    /// the accept pipeline primed.
    fn reregister(&self, poll: &Poll, token: Token,
                  interest: Ready, opts: PollOpt) -> io::Result<()> {
        let mut me = self.inner();
        me.iocp.reregister_socket(&self.imp.inner.socket, poll, token,
                                  interest, opts, &self.registration)?;
        self.imp.schedule_accept(&mut me);
        Ok(())
    }

    /// Removes this listener's registration from the poll.
    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.inner().iocp.deregister(&self.imp.inner.socket,
                                     poll, &self.registration)
    }
}
827
828 impl fmt::Debug for TcpListener {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result829 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
830 f.debug_struct("TcpListener")
831 .finish()
832 }
833 }
834
impl Drop for TcpListener {
    // If we're still internally reading, we're no longer interested.
    fn drop(&mut self) {
        unsafe {
            match self.inner().accept {
                State::Pending(_) => {
                    trace!("cancelling active TCP accept");
                    // Cancellation failure is ignored on purpose: the accept
                    // may have raced to completion.
                    drop(super::cancel(&self.imp.inner.socket,
                                       &self.imp.inner.accept));
                }
                // Nothing in flight; nothing to cancel.
                State::Empty |
                State::Ready(_) |
                State::Error(_) => {}
            }
        }
    }
}
852