1 use std::cmp;
2 use std::io::prelude::*;
3 use std::io;
4 use std::net;
5 use std::sync::mpsc::channel;
6 use std::thread;
7 use std::time::Duration;
8
9 use net2::{self, TcpStreamExt};
10
11 use {TryRead, TryWrite};
12 use mio::{Token, Ready, PollOpt, Poll, Events};
13 use iovec::IoVec;
14 use mio::net::{TcpListener, TcpStream};
15
/// Checks that a listener registered for readable interest produces an event
/// for an incoming connection, that the connection can be accepted, and that
/// a further `accept` then fails with `WouldBlock`.
#[test]
fn accept() {
    let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
    let target = listener.local_addr().unwrap();

    // The peer only has to connect; its stream is dropped right away.
    let connector = thread::spawn(move || {
        net::TcpStream::connect(&target).unwrap();
    });

    let poll = Poll::new().unwrap();
    poll.register(&listener, Token(1), Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);
    let mut saw_event = false;

    // Keep polling until one batch delivers at least one event.
    loop {
        poll.poll(&mut events, None).unwrap();

        for ev in events.iter() {
            saw_event = true;
            assert_eq!(ev.token(), Token(1));
            assert!(ev.readiness().is_readable());
            assert!(listener.accept().is_ok());
        }

        if saw_event {
            break;
        }
    }

    assert!(saw_event);

    // Nothing else is pending, so a second accept must not succeed.
    let second = listener.accept();
    assert!(second.unwrap_err().kind() == io::ErrorKind::WouldBlock);
    connector.join().unwrap();
}
49
/// Checks the events observed by a connecting stream: the first event must be
/// writable and, once the peer thread has dropped its end, the next event
/// must be readable.
#[test]
fn connect() {
    // Polls until one batch yields at least one event. Each event is checked
    // against the readiness expected for the current value of `seen`, which
    // is then incremented.
    fn pump(poll: &Poll, events: &mut Events, seen: &mut u32) {
        let mut got_any = false;
        while !got_any {
            poll.poll(events, None).unwrap();

            for ev in events.iter() {
                assert_eq!(ev.token(), Token(1));
                match *seen {
                    0 => assert!(ev.readiness().is_writable()),
                    1 => assert!(ev.readiness().is_readable()),
                    _ => panic!(),
                }
                *seen += 1;
                got_any = true;
            }
        }
    }

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // `close_*` tells the peer when to drop its end of the connection;
    // `closed_*` reports back once it has done so.
    let (close_tx, close_rx) = channel();
    let (closed_tx, closed_rx) = channel();
    let peer = thread::spawn(move || {
        let accepted = acceptor.accept().unwrap();
        close_rx.recv().unwrap();
        drop(accepted);
        closed_tx.send(()).unwrap();
    });

    let poll = Poll::new().unwrap();
    let stream = TcpStream::connect(&addr).unwrap();

    poll.register(&stream, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);
    let mut seen = 0;

    // First round: the stream becomes writable.
    pump(&poll, &mut events, &mut seen);
    assert_eq!(seen, 1);

    // Have the peer drop the connection, then expect a readable event.
    close_tx.send(()).unwrap();
    closed_rx.recv().unwrap();

    pump(&poll, &mut events, &mut seen);
    assert_eq!(seen, 2);
    peer.join().unwrap();
}
109
/// Receives at least `N` bytes from a peer thread through an edge-triggered
/// readable registration, draining the socket with `try_read` on every event.
#[test]
fn read() {
    const N: usize = 16 * 1024 * 1024;

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // Peer: write from a 1024-byte buffer until at least N bytes were sent.
    let writer = thread::spawn(move || {
        let (mut conn, _) = acceptor.accept().unwrap();
        let chunk = [0; 1024];
        let mut sent = 0;
        while sent < N {
            sent += conn.write(&chunk).unwrap();
        }
    });

    let poll = Poll::new().unwrap();
    let mut socket = TcpStream::connect(&addr).unwrap();

    poll.register(&socket, Token(1), Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);
    let mut received = 0;
    let mut finished = false;

    while !finished {
        poll.poll(&mut events, None).unwrap();

        for ev in events.iter() {
            assert_eq!(ev.token(), Token(1));

            // Drain until `try_read` reports nothing available (`None`) or
            // the target amount has been reached.
            let mut chunk = [0; 1024];
            while let Some(n) = socket.try_read(&mut chunk).unwrap() {
                received += n;
                if received >= N {
                    finished = true;
                    break;
                }
            }
        }
    }

    writer.join().unwrap();
}
156
/// Same transfer as the plain read test, but every event first calls `peek`
/// on the socket; a `WouldBlock` from `peek` skips the event, any other peek
/// error fails the test.
#[test]
fn peek() {
    const N: usize = 16 * 1024 * 1024;

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // Peer: write from a 1024-byte buffer until at least N bytes were sent.
    let writer = thread::spawn(move || {
        let (mut conn, _) = acceptor.accept().unwrap();
        let chunk = [0; 1024];
        let mut sent = 0;
        while sent < N {
            sent += conn.write(&chunk).unwrap();
        }
    });

    let poll = Poll::new().unwrap();
    let mut socket = TcpStream::connect(&addr).unwrap();

    poll.register(&socket, Token(1), Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);
    let mut received = 0;
    let mut finished = false;

    while !finished {
        poll.poll(&mut events, None).unwrap();

        for ev in events.iter() {
            assert_eq!(ev.token(), Token(1));
            let mut chunk = [0; 1024];

            // Peek first; only a would-block result is tolerated as an error.
            if let Err(e) = socket.peek(&mut chunk) {
                if e.kind() == io::ErrorKind::WouldBlock {
                    continue;
                }
                panic!("unexpected error: {:?}", e);
            }

            // Then drain the socket exactly like the plain read test.
            while let Some(n) = socket.try_read(&mut chunk).unwrap() {
                received += n;
                if received >= N {
                    finished = true;
                    break;
                }
            }
        }
    }

    writer.join().unwrap();
}
211
/// Reads a stream of `1` bytes with vectored `read_bufs` calls on a
/// level-triggered registration, verifying that every byte reported as read
/// is `1` and that exactly `N` bytes arrive before end-of-stream.
#[test]
fn read_bufs() {
    const N: usize = 16 * 1024 * 1024;

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // Peer: write 1-valued bytes until at least N have been sent.
    let writer = thread::spawn(move || {
        let (mut conn, _) = acceptor.accept().unwrap();
        let ones = [1; 1024];
        let mut sent = 0;
        while sent < N {
            sent += conn.write(&ones).unwrap();
        }
    });

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(128);

    let s = TcpStream::connect(&addr).unwrap();

    poll.register(&s, Token(1), Ready::readable(), PollOpt::level()).unwrap();

    // Five differently sized backing arrays exposed as IoVecs.
    let mut a1 = [0u8; 10];
    let mut a2 = [0u8; 383];
    let mut a3 = [0u8; 28];
    let mut a4 = [0u8; 8];
    let mut a5 = [0u8; 128];
    let mut bufs: [&mut IoVec; 5] = [
        (&mut a1[..]).into(),
        (&mut a2[..]).into(),
        (&mut a3[..]).into(),
        (&mut a4[..]).into(),
        (&mut a5[..]).into(),
    ];

    let mut so_far = 0;
    loop {
        // Clear every buffer so stale data cannot satisfy the checks below.
        for buf in bufs.iter_mut() {
            for byte in buf.as_mut_bytes().iter_mut() {
                *byte = 0;
            }
        }

        poll.poll(&mut events, None).unwrap();

        match s.read_bufs(&mut bufs) {
            Ok(0) => {
                // End of stream: the total must match what was expected.
                assert_eq!(so_far, N);
                break;
            }
            Ok(count) => {
                so_far += count;

                // Walk the buffers in order, checking the filled prefix of
                // each until all `count` bytes are accounted for.
                let mut left = count;
                for buf in bufs.iter() {
                    let bytes = buf.as_bytes();
                    let filled = cmp::min(left, bytes.len());
                    for byte in &bytes[..filled] {
                        assert_eq!(*byte, 1);
                    }
                    left = left.saturating_sub(bytes.len());
                    if left == 0 {
                        break;
                    }
                }
                assert_eq!(left, 0);
            }
            Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
        }
    }

    writer.join().unwrap();
}
283
/// Sends at least `N` bytes to a peer thread through an edge-triggered
/// writable registration, writing with `try_write` until it reports that
/// nothing more can be written for now.
#[test]
fn write() {
    const N: usize = 16 * 1024 * 1024;

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // Peer: read into a 1024-byte buffer until at least N bytes arrived.
    let reader = thread::spawn(move || {
        let (mut conn, _) = acceptor.accept().unwrap();
        let mut chunk = [0; 1024];
        let mut got = 0;
        while got < N {
            got += conn.read(&mut chunk).unwrap();
        }
    });

    let poll = Poll::new().unwrap();
    let mut socket = TcpStream::connect(&addr).unwrap();

    poll.register(&socket, Token(1), Ready::writable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);
    let mut written = 0;
    let mut finished = false;

    while !finished {
        poll.poll(&mut events, None).unwrap();

        for ev in events.iter() {
            assert_eq!(ev.token(), Token(1));

            // Keep writing until `try_write` yields `None` or the target
            // amount has been reached.
            let chunk = [0; 1024];
            while let Some(n) = socket.try_write(&chunk).unwrap() {
                written += n;
                if written >= N {
                    finished = true;
                    break;
                }
            }
        }
    }

    reader.join().unwrap();
}
330
/// Sends at least `N` bytes of value `1` with vectored `write_bufs` calls on
/// a level-triggered registration while a peer thread verifies every byte it
/// reads.
#[test]
fn write_bufs() {
    const N: usize = 16 * 1024 * 1024;

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // Peer: read until at least N bytes arrived, checking each one is 1.
    let reader = thread::spawn(move || {
        let (mut conn, _) = acceptor.accept().unwrap();
        let mut got = 0;
        while got < N {
            // A fresh zeroed buffer per read, so only newly received bytes
            // can satisfy the check.
            let mut chunk = [0; 1024];
            let n = conn.read(&mut chunk).unwrap();
            got += n;
            for byte in &chunk[..n] {
                assert_eq!(*byte, 1);
            }
        }
    });

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(128);
    let s = TcpStream::connect(&addr).unwrap();
    poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();

    // Five differently sized source arrays exposed as IoVecs.
    let a1 = [1u8; 10];
    let a2 = [1u8; 383];
    let a3 = [1u8; 28];
    let a4 = [1u8; 8];
    let a5 = [1u8; 128];
    let bufs: [&IoVec; 5] = [
        (&a1[..]).into(),
        (&a2[..]).into(),
        (&a3[..]).into(),
        (&a4[..]).into(),
        (&a5[..]).into(),
    ];

    let mut so_far = 0;
    while so_far < N {
        poll.poll(&mut events, None).unwrap();

        // Would-block is the only acceptable failure.
        match s.write_bufs(&bufs) {
            Ok(count) => so_far += count,
            Err(err) => assert_eq!(err.kind(), io::ErrorKind::WouldBlock),
        }
    }

    reader.join().unwrap();
}
384
/// Connects a stream to a listener in the same poll, registers and then
/// immediately drops the accepted socket, and waits until the connecting
/// stream (token 2) receives an event.
#[test]
fn connect_then_close() {
    let poll = Poll::new().unwrap();
    let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
    let s = TcpStream::connect(&listener.local_addr().unwrap()).unwrap();

    poll.register(&listener, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
    poll.register(&s, Token(2), Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);
    let mut done = false;

    while !done {
        poll.poll(&mut events, None).unwrap();

        for ev in events.iter() {
            match ev.token() {
                Token(1) => {
                    // Accept, register, and drop the server side at once.
                    let (accepted, _) = listener.accept().unwrap();
                    poll.register(&accepted, Token(3), Ready::readable() | Ready::writable(),
                                  PollOpt::edge()).unwrap();
                    drop(accepted);
                }
                // Presumably the readable notification caused by the drop above.
                Token(2) => done = true,
                _ => {}
            }
        }
    }
}
414
/// Registers a listener, drops it, and checks that a subsequent poll (bounded
/// to 100 ms) delivers no event for the dropped listener's token.
#[test]
fn listen_then_close() {
    let poll = Poll::new().unwrap();
    let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();

    poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
    drop(l);

    let mut events = Events::with_capacity(128);

    // A timeout is required here: no event is expected, so an unbounded poll
    // would never return.
    poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();

    for event in &events {
        if event.token() == Token(1) {
            panic!("received ready() on a closed TcpListener")
        }
    }
}
433
/// Compile-time check: only instantiable for types that implement `Send`.
fn assert_send<T>()
where
    T: Send,
{
}
436
/// Compile-time check: only instantiable for types that implement `Sync`.
fn assert_sync<T>()
where
    T: Sync,
{
}
439
/// Verifies at compile time that both TCP socket types are `Send` and `Sync`.
#[test]
fn test_tcp_sockets_are_send() {
    // Listener
    assert_send::<TcpListener>();
    assert_sync::<TcpListener>();

    // Stream
    assert_send::<TcpStream>();
    assert_sync::<TcpStream>();
}
447
/// Binding a second listener to an address that a live listener already
/// occupies must fail.
#[test]
fn bind_twice_bad() {
    let first = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
    let taken = first.local_addr().unwrap();

    let second = TcpListener::bind(&taken);
    assert!(second.is_err());
}
454
/// Once the stream has reported writable, performs `N` back-to-back
/// 1024-byte `write_all` calls, each of which must succeed, while a peer
/// thread verifies the received bytes.
#[test]
fn multiple_writes_immediate_success() {
    const N: usize = 16;
    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // Peer: read until all 1024 * N bytes arrived, checking each one is 1.
    let reader = thread::spawn(move || {
        let (mut conn, _) = acceptor.accept().unwrap();
        let mut got = 0;
        while got < 1024 * N {
            // A fresh zeroed buffer per read, so only newly received bytes
            // can satisfy the check.
            let mut chunk = [0; 1024];
            let n = conn.read(&mut chunk).unwrap();
            got += n;
            for byte in &chunk[..n] {
                assert_eq!(*byte, 1);
            }
        }
    });

    let poll = Poll::new().unwrap();
    let mut s = TcpStream::connect(&addr).unwrap();
    poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();
    let mut events = Events::with_capacity(16);

    // Wait for our TCP stream to connect
    let mut connected = false;
    while !connected {
        poll.poll(&mut events, None).unwrap();
        connected = events
            .iter()
            .any(|ev| ev.token() == Token(1) && ev.readiness().is_writable());
    }

    for _ in 0..N {
        s.write_all(&[1; 1024]).unwrap();
    }

    reader.join().unwrap();
}
498
/// Accepts a connection from a client configured with a zero linger time,
/// drops the client, and checks that the server side then becomes readable
/// and that reading from it yields either zero bytes or an error.
#[test]
fn connection_reset_by_peer() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(16);
    let mut buf = [0u8; 16];

    // Create listener
    let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
    let addr = l.local_addr().unwrap();

    // Connect client. Linger is set to zero before connecting, presumably so
    // that dropping the client resets the connection instead of closing it
    // gracefully.
    let raw_client = net2::TcpBuilder::new_v4().unwrap()
        .to_tcp_stream().unwrap();
    raw_client.set_linger(Some(Duration::from_millis(0))).unwrap();
    raw_client.connect(&addr).unwrap();

    // Convert to Mio stream
    let client = TcpStream::from_stream(raw_client).unwrap();

    // Register server
    poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

    // Register interest in the client
    poll.register(&client, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();

    // Wait for listener to be ready
    let mut server;
    'accepting: loop {
        poll.poll(&mut events, None).unwrap();

        for ev in events.iter() {
            if ev.token() != Token(0) {
                continue;
            }

            match l.accept() {
                Ok((sock, _)) => {
                    server = sock;
                    break 'accepting;
                }
                // Would-block just means: keep waiting for the next event.
                Err(e) => {
                    if e.kind() != io::ErrorKind::WouldBlock {
                        panic!("unexpected error {:?}", e);
                    }
                }
            }
        }
    }

    // Close the connection
    drop(client);

    // Wait a moment
    thread::sleep(Duration::from_millis(100));

    // Register interest in the server socket
    poll.register(&server, Token(3), Ready::readable(), PollOpt::edge()).unwrap();

    loop {
        poll.poll(&mut events, None).unwrap();

        for ev in events.iter() {
            if ev.token() != Token(3) {
                continue;
            }

            assert!(ev.readiness().is_readable());

            // Either an error or a zero-length read is acceptable; actual
            // data is not.
            if let Ok(x) = server.read(&mut buf) {
                if x != 0 {
                    panic!("expected empty buffer but read {} bytes", x);
                }
            }
            return;
        }
    }
}
574
/// Connects to a local port that has no listener and checks that the failed
/// connect is reported as a writable event, after which `take_error` returns
/// the pending socket error.
#[test]
#[cfg_attr(target_os = "fuchsia", ignore)]
fn connect_error() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(16);

    // Obtain a port that is not in use: let the OS assign one to a throwaway
    // listener, then release it before connecting. A hard-coded port number
    // may be occupied by an unrelated process on the host, in which case the
    // connect would succeed and the final assertion would fail spuriously.
    let probe = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = probe.local_addr().unwrap();
    drop(probe);

    let l = match TcpStream::connect(&addr) {
        Ok(l) => l,
        Err(ref e) if e.kind() == io::ErrorKind::ConnectionRefused => {
            // Connection failed synchronously. This is not a bug, but it
            // unfortunately doesn't get us the code coverage we want.
            return;
        },
        Err(e) => panic!("TcpStream::connect unexpected error {:?}", e)
    };

    poll.register(&l, Token(0), Ready::writable(), PollOpt::edge()).unwrap();

    // Wait for the event on our token; it must carry writable readiness.
    'outer:
    loop {
        poll.poll(&mut events, None).unwrap();

        for event in &events {
            if event.token() == Token(0) {
                assert!(event.readiness().is_writable());
                break 'outer
            }
        }
    }

    assert!(l.take_error().unwrap().is_some());
}
608
/// After the peer thread has dropped its end of the connection, keeps
/// writing to the stream until a write fails with an error other than
/// `WouldBlock`.
#[test]
fn write_error() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(16);
    let (tx, rx) = channel();

    let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    // Peer: hold the accepted connection until told to drop it.
    let peer = thread::spawn(move || {
        let (conn, _addr) = listener.accept().unwrap();
        rx.recv().unwrap();
        drop(conn);
    });

    let mut s = TcpStream::connect(&addr).unwrap();
    poll.register(&s,
                  Token(0),
                  Ready::readable() | Ready::writable(),
                  PollOpt::edge()).unwrap();

    // Blocks until a poll batch contains a writable event for our token.
    let mut wait_writable = || {
        loop {
            poll.poll(&mut events, None).unwrap();

            let ready = events
                .iter()
                .any(|ev| ev.token() == Token(0) && ev.readiness().is_writable());
            if ready {
                break;
            }
        }
    };

    wait_writable();

    // Make the peer drop its end and wait for its thread to finish.
    tx.send(()).unwrap();
    peer.join().unwrap();

    let payload = [0; 1024];
    loop {
        let err = match s.write(&payload) {
            Ok(_) => continue,
            Err(e) => e,
        };

        if err.kind() == io::ErrorKind::WouldBlock {
            wait_writable();
        } else {
            println!("good error: {}", err);
            break;
        }
    }
}
661