1 use std::cmp;
2 use std::io::prelude::*;
3 use std::io;
4 use std::net;
5 use std::sync::mpsc::channel;
6 use std::thread;
7 use std::time::Duration;
8 
9 use net2::{self, TcpStreamExt};
10 
11 use {TryRead, TryWrite};
12 use mio::{Token, Ready, PollOpt, Poll, Events};
13 use iovec::IoVec;
14 use mio::net::{TcpListener, TcpStream};
15 
16 #[test]
accept()17 fn accept() {
18     struct H { hit: bool, listener: TcpListener, shutdown: bool }
19 
20     let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
21     let addr = l.local_addr().unwrap();
22 
23     let t = thread::spawn(move || {
24         net::TcpStream::connect(&addr).unwrap();
25     });
26 
27     let poll = Poll::new().unwrap();
28 
29     poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
30 
31     let mut events = Events::with_capacity(128);
32 
33     let mut h = H { hit: false, listener: l, shutdown: false };
34     while !h.shutdown {
35         poll.poll(&mut events, None).unwrap();
36 
37         for event in &events {
38             h.hit = true;
39             assert_eq!(event.token(), Token(1));
40             assert!(event.readiness().is_readable());
41             assert!(h.listener.accept().is_ok());
42             h.shutdown = true;
43         }
44     }
45     assert!(h.hit);
46     assert!(h.listener.accept().unwrap_err().kind() == io::ErrorKind::WouldBlock);
47     t.join().unwrap();
48 }
49 
50 #[test]
connect()51 fn connect() {
52     struct H { hit: u32, shutdown: bool }
53 
54     let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
55     let addr = l.local_addr().unwrap();
56 
57     let (tx, rx) = channel();
58     let (tx2, rx2) = channel();
59     let t = thread::spawn(move || {
60         let s = l.accept().unwrap();
61         rx.recv().unwrap();
62         drop(s);
63         tx2.send(()).unwrap();
64     });
65 
66     let poll = Poll::new().unwrap();
67     let s = TcpStream::connect(&addr).unwrap();
68 
69     poll.register(&s, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
70 
71     let mut events = Events::with_capacity(128);
72 
73     let mut h = H { hit: 0, shutdown: false };
74     while !h.shutdown {
75         poll.poll(&mut events, None).unwrap();
76 
77         for event in &events {
78             assert_eq!(event.token(), Token(1));
79             match h.hit {
80                 0 => assert!(event.readiness().is_writable()),
81                 1 => assert!(event.readiness().is_readable()),
82                 _ => panic!(),
83             }
84             h.hit += 1;
85             h.shutdown = true;
86         }
87     }
88     assert_eq!(h.hit, 1);
89     tx.send(()).unwrap();
90     rx2.recv().unwrap();
91     h.shutdown = false;
92     while !h.shutdown {
93         poll.poll(&mut events, None).unwrap();
94 
95         for event in &events {
96             assert_eq!(event.token(), Token(1));
97             match h.hit {
98                 0 => assert!(event.readiness().is_writable()),
99                 1 => assert!(event.readiness().is_readable()),
100                 _ => panic!(),
101             }
102             h.hit += 1;
103             h.shutdown = true;
104         }
105     }
106     assert_eq!(h.hit, 2);
107     t.join().unwrap();
108 }
109 
110 #[test]
read()111 fn read() {
112     const N: usize = 16 * 1024 * 1024;
113     struct H { amt: usize, socket: TcpStream, shutdown: bool }
114 
115     let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
116     let addr = l.local_addr().unwrap();
117 
118     let t = thread::spawn(move || {
119         let mut s = l.accept().unwrap().0;
120         let b = [0; 1024];
121         let mut amt = 0;
122         while amt < N {
123             amt += s.write(&b).unwrap();
124         }
125     });
126 
127     let poll = Poll::new().unwrap();
128     let s = TcpStream::connect(&addr).unwrap();
129 
130     poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
131 
132     let mut events = Events::with_capacity(128);
133 
134     let mut h = H { amt: 0, socket: s, shutdown: false };
135     while !h.shutdown {
136         poll.poll(&mut events, None).unwrap();
137 
138         for event in &events {
139             assert_eq!(event.token(), Token(1));
140             let mut b = [0; 1024];
141             loop {
142                 if let Some(amt) = h.socket.try_read(&mut b).unwrap() {
143                     h.amt += amt;
144                 } else {
145                     break
146                 }
147                 if h.amt >= N {
148                     h.shutdown = true;
149                     break
150                 }
151             }
152         }
153     }
154     t.join().unwrap();
155 }
156 
157 #[test]
peek()158 fn peek() {
159     const N: usize = 16 * 1024 * 1024;
160     struct H { amt: usize, socket: TcpStream, shutdown: bool }
161 
162     let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
163     let addr = l.local_addr().unwrap();
164 
165     let t = thread::spawn(move || {
166         let mut s = l.accept().unwrap().0;
167         let b = [0; 1024];
168         let mut amt = 0;
169         while amt < N {
170             amt += s.write(&b).unwrap();
171         }
172     });
173 
174     let poll = Poll::new().unwrap();
175     let s = TcpStream::connect(&addr).unwrap();
176 
177     poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
178 
179     let mut events = Events::with_capacity(128);
180 
181     let mut h = H { amt: 0, socket: s, shutdown: false };
182     while !h.shutdown {
183         poll.poll(&mut events, None).unwrap();
184 
185         for event in &events {
186             assert_eq!(event.token(), Token(1));
187             let mut b = [0; 1024];
188             match h.socket.peek(&mut b) {
189                 Ok(_) => (),
190                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
191                     continue
192                 },
193                 Err(e) => panic!("unexpected error: {:?}", e),
194             }
195 
196             loop {
197                 if let Some(amt) = h.socket.try_read(&mut b).unwrap() {
198                     h.amt += amt;
199                 } else {
200                     break
201                 }
202                 if h.amt >= N {
203                     h.shutdown = true;
204                     break
205                 }
206             }
207         }
208     }
209     t.join().unwrap();
210 }
211 
212 #[test]
read_bufs()213 fn read_bufs() {
214     const N: usize = 16 * 1024 * 1024;
215 
216     let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
217     let addr = l.local_addr().unwrap();
218 
219     let t = thread::spawn(move || {
220         let mut s = l.accept().unwrap().0;
221         let b = [1; 1024];
222         let mut amt = 0;
223         while amt < N {
224             amt += s.write(&b).unwrap();
225         }
226     });
227 
228     let poll = Poll::new().unwrap();
229     let mut events = Events::with_capacity(128);
230 
231     let s = TcpStream::connect(&addr).unwrap();
232 
233     poll.register(&s, Token(1), Ready::readable(), PollOpt::level()).unwrap();
234 
235     let b1 = &mut [0; 10][..];
236     let b2 = &mut [0; 383][..];
237     let b3 = &mut [0; 28][..];
238     let b4 = &mut [0; 8][..];
239     let b5 = &mut [0; 128][..];
240     let mut b: [&mut IoVec; 5] = [
241         b1.into(),
242         b2.into(),
243         b3.into(),
244         b4.into(),
245         b5.into(),
246     ];
247 
248     let mut so_far = 0;
249     loop {
250         for buf in b.iter_mut() {
251             for byte in buf.as_mut_bytes() {
252                 *byte = 0;
253             }
254         }
255 
256         poll.poll(&mut events, None).unwrap();
257 
258         match s.read_bufs(&mut b) {
259             Ok(0) => {
260                 assert_eq!(so_far, N);
261                 break
262             }
263             Ok(mut n) => {
264                 so_far += n;
265                 for buf in b.iter() {
266                     let buf = buf.as_bytes();
267                     for byte in buf[..cmp::min(n, buf.len())].iter() {
268                         assert_eq!(*byte, 1);
269                     }
270                     n = n.saturating_sub(buf.len());
271                     if n == 0 {
272                         break
273                     }
274                 }
275                 assert_eq!(n, 0);
276             }
277             Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
278         }
279     }
280 
281     t.join().unwrap();
282 }
283 
284 #[test]
write()285 fn write() {
286     const N: usize = 16 * 1024 * 1024;
287     struct H { amt: usize, socket: TcpStream, shutdown: bool }
288 
289     let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
290     let addr = l.local_addr().unwrap();
291 
292     let t = thread::spawn(move || {
293         let mut s = l.accept().unwrap().0;
294         let mut b = [0; 1024];
295         let mut amt = 0;
296         while amt < N {
297             amt += s.read(&mut b).unwrap();
298         }
299     });
300 
301     let poll = Poll::new().unwrap();
302     let s = TcpStream::connect(&addr).unwrap();
303 
304     poll.register(&s, Token(1), Ready::writable(), PollOpt::edge()).unwrap();
305 
306     let mut events = Events::with_capacity(128);
307 
308     let mut h = H { amt: 0, socket: s, shutdown: false };
309     while !h.shutdown {
310         poll.poll(&mut events, None).unwrap();
311 
312         for event in &events {
313             assert_eq!(event.token(), Token(1));
314             let b = [0; 1024];
315             loop {
316                 if let Some(amt) = h.socket.try_write(&b).unwrap() {
317                     h.amt += amt;
318                 } else {
319                     break
320                 }
321                 if h.amt >= N {
322                     h.shutdown = true;
323                     break
324                 }
325             }
326         }
327     }
328     t.join().unwrap();
329 }
330 
331 #[test]
write_bufs()332 fn write_bufs() {
333     const N: usize = 16 * 1024 * 1024;
334 
335     let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
336     let addr = l.local_addr().unwrap();
337 
338     let t = thread::spawn(move || {
339         let mut s = l.accept().unwrap().0;
340         let mut b = [0; 1024];
341         let mut amt = 0;
342         while amt < N {
343             for byte in b.iter_mut() {
344                 *byte = 0;
345             }
346             let n = s.read(&mut b).unwrap();
347             amt += n;
348             for byte in b[..n].iter() {
349                 assert_eq!(*byte, 1);
350             }
351         }
352     });
353 
354     let poll = Poll::new().unwrap();
355     let mut events = Events::with_capacity(128);
356     let s = TcpStream::connect(&addr).unwrap();
357     poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();
358 
359     let b1 = &[1; 10][..];
360     let b2 = &[1; 383][..];
361     let b3 = &[1; 28][..];
362     let b4 = &[1; 8][..];
363     let b5 = &[1; 128][..];
364     let b: [&IoVec; 5] = [
365         b1.into(),
366         b2.into(),
367         b3.into(),
368         b4.into(),
369         b5.into(),
370     ];
371 
372     let mut so_far = 0;
373     while so_far < N {
374         poll.poll(&mut events, None).unwrap();
375 
376         match s.write_bufs(&b) {
377             Ok(n) => so_far += n,
378             Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
379         }
380     }
381 
382     t.join().unwrap();
383 }
384 
385 #[test]
connect_then_close()386 fn connect_then_close() {
387     struct H { listener: TcpListener, shutdown: bool }
388 
389     let poll = Poll::new().unwrap();
390     let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
391     let s = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
392 
393     poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
394     poll.register(&s, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
395 
396     let mut events = Events::with_capacity(128);
397 
398     let mut h = H { listener: l, shutdown: false };
399     while !h.shutdown {
400         poll.poll(&mut events, None).unwrap();
401 
402         for event in &events {
403             if event.token() == Token(1) {
404                 let s = h.listener.accept().unwrap().0;
405                 poll.register(&s, Token(3), Ready::readable() | Ready::writable(),
406                                         PollOpt::edge()).unwrap();
407                 drop(s);
408             } else if event.token() == Token(2) {
409                 h.shutdown = true;
410             }
411         }
412     }
413 }
414 
415 #[test]
listen_then_close()416 fn listen_then_close() {
417     let poll = Poll::new().unwrap();
418     let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
419 
420     poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
421     drop(l);
422 
423     let mut events = Events::with_capacity(128);
424 
425     poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
426 
427     for event in &events {
428         if event.token() == Token(1) {
429             panic!("recieved ready() on a closed TcpListener")
430         }
431     }
432 }
433 
assert_send<T: Send>()434 fn assert_send<T: Send>() {
435 }
436 
assert_sync<T: Sync>()437 fn assert_sync<T: Sync>() {
438 }
439 
440 #[test]
test_tcp_sockets_are_send()441 fn test_tcp_sockets_are_send() {
442     assert_send::<TcpListener>();
443     assert_send::<TcpStream>();
444     assert_sync::<TcpListener>();
445     assert_sync::<TcpStream>();
446 }
447 
448 #[test]
bind_twice_bad()449 fn bind_twice_bad() {
450     let l1 = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
451     let addr = l1.local_addr().unwrap();
452     assert!(TcpListener::bind(&addr).is_err());
453 }
454 
455 #[test]
multiple_writes_immediate_success()456 fn multiple_writes_immediate_success() {
457     const N: usize = 16;
458     let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
459     let addr = l.local_addr().unwrap();
460 
461     let t = thread::spawn(move || {
462         let mut s = l.accept().unwrap().0;
463         let mut b = [0; 1024];
464         let mut amt = 0;
465         while amt < 1024*N {
466             for byte in b.iter_mut() {
467                 *byte = 0;
468             }
469             let n = s.read(&mut b).unwrap();
470             amt += n;
471             for byte in b[..n].iter() {
472                 assert_eq!(*byte, 1);
473             }
474         }
475     });
476 
477     let poll = Poll::new().unwrap();
478     let mut s = TcpStream::connect(&addr).unwrap();
479     poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();
480     let mut events = Events::with_capacity(16);
481 
482     // Wait for our TCP stream to connect
483     'outer: loop {
484         poll.poll(&mut events, None).unwrap();
485         for event in events.iter() {
486             if event.token() == Token(1) && event.readiness().is_writable() {
487                 break 'outer
488             }
489         }
490     }
491 
492     for _ in 0..N {
493         s.write_all(&[1; 1024]).unwrap();
494     }
495 
496     t.join().unwrap();
497 }
498 
499 #[test]
connection_reset_by_peer()500 fn connection_reset_by_peer() {
501     let poll = Poll::new().unwrap();
502     let mut events = Events::with_capacity(16);
503     let mut buf = [0u8; 16];
504 
505     // Create listener
506     let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
507     let addr = l.local_addr().unwrap();
508 
509     // Connect client
510     let client = net2::TcpBuilder::new_v4().unwrap()
511         .to_tcp_stream().unwrap();
512 
513     client.set_linger(Some(Duration::from_millis(0))).unwrap();
514     client.connect(&addr).unwrap();
515 
516     // Convert to Mio stream
517     let client = TcpStream::from_stream(client).unwrap();
518 
519     // Register server
520     poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
521 
522     // Register interest in the client
523     poll.register(&client, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
524 
525     // Wait for listener to be ready
526     let mut server;
527     'outer:
528     loop {
529         poll.poll(&mut events, None).unwrap();
530 
531         for event in &events {
532             if event.token() == Token(0) {
533                 match l.accept() {
534                     Ok((sock, _)) => {
535                         server = sock;
536                         break 'outer;
537                     }
538                     Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
539                     Err(e) => panic!("unexpected error {:?}", e),
540                 }
541             }
542         }
543     }
544 
545     // Close the connection
546     drop(client);
547 
548     // Wait a moment
549     thread::sleep(Duration::from_millis(100));
550 
551     // Register interest in the server socket
552     poll.register(&server, Token(3), Ready::readable(), PollOpt::edge()).unwrap();
553 
554 
555     loop {
556         poll.poll(&mut events, None).unwrap();
557 
558         for event in &events {
559             if event.token() == Token(3) {
560                 assert!(event.readiness().is_readable());
561 
562                 match server.read(&mut buf) {
563                     Ok(0) |
564                     Err(_) => {},
565 
566                     Ok(x) => panic!("expected empty buffer but read {} bytes", x),
567                 }
568                 return;
569             }
570         }
571     }
572 
573 }
574 
575 #[test]
576 #[cfg_attr(target_os = "fuchsia", ignore)]
connect_error()577 fn connect_error() {
578     let poll = Poll::new().unwrap();
579     let mut events = Events::with_capacity(16);
580 
581     // Pick a "random" port that shouldn't be in use.
582     let l = match TcpStream::connect(&"127.0.0.1:38381".parse().unwrap()) {
583         Ok(l) => l,
584         Err(ref e) if e.kind() == io::ErrorKind::ConnectionRefused => {
585             // Connection failed synchronously.  This is not a bug, but it
586             // unfortunately doesn't get us the code coverage we want.
587             return;
588         },
589         Err(e) => panic!("TcpStream::connect unexpected error {:?}", e)
590     };
591 
592     poll.register(&l, Token(0), Ready::writable(), PollOpt::edge()).unwrap();
593 
594     'outer:
595     loop {
596         poll.poll(&mut events, None).unwrap();
597 
598         for event in &events {
599             if event.token() == Token(0) {
600                 assert!(event.readiness().is_writable());
601                 break 'outer
602             }
603         }
604     }
605 
606     assert!(l.take_error().unwrap().is_some());
607 }
608 
609 #[test]
write_error()610 fn write_error() {
611     let poll = Poll::new().unwrap();
612     let mut events = Events::with_capacity(16);
613     let (tx, rx) = channel();
614 
615     let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
616     let addr = listener.local_addr().unwrap();
617     let t = thread::spawn(move || {
618         let (conn, _addr) = listener.accept().unwrap();
619         rx.recv().unwrap();
620         drop(conn);
621     });
622 
623     let mut s = TcpStream::connect(&addr).unwrap();
624     poll.register(&s,
625                   Token(0),
626                   Ready::readable() | Ready::writable(),
627                   PollOpt::edge()).unwrap();
628 
629     let mut wait_writable = || {
630         'outer:
631         loop {
632             poll.poll(&mut events, None).unwrap();
633 
634             for event in &events {
635                 if event.token() == Token(0) && event.readiness().is_writable() {
636                     break 'outer
637                 }
638             }
639         }
640     };
641 
642     wait_writable();
643 
644     tx.send(()).unwrap();
645     t.join().unwrap();
646 
647     let buf = [0; 1024];
648     loop {
649         match s.write(&buf) {
650             Ok(_) => {}
651             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
652                 wait_writable()
653             }
654             Err(e) => {
655                 println!("good error: {}", e);
656                 break
657             }
658         }
659     }
660 }
661