1 extern crate tokio_io;
2 extern crate futures;
3
4 use tokio_io::{AsyncRead, AsyncWrite};
5 use tokio_io::codec::length_delimited::*;
6
7 use futures::{Stream, Sink, Poll};
8 use futures::Async::*;
9
10 use std::io;
11 use std::collections::VecDeque;
12
/// Builds a `Mock` whose call queue holds the given scripted results, in the
/// order they are written.
///
/// Every entry, including the last one, must be followed by a comma.
macro_rules! mock {
    ($($op:expr,)*) => {
        Mock {
            calls: vec![$($op),*].into_iter().collect(),
        }
    };
}
20
21
/// Polling a reader over a transport with an empty script yields `Ready(None)`.
#[test]
fn read_empty_io_yields_nothing() {
    let transport = mock!();
    let mut framed = FramedRead::new(transport);

    let outcome = framed.poll().unwrap();
    assert_eq!(outcome, Ready(None));
}
28
/// One read delivers a 4-byte length header plus the full payload; the reader
/// yields that payload and then `Ready(None)`.
#[test]
fn read_single_frame_one_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = FramedRead::new(transport);

    let frame = framed.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = framed.poll().unwrap();
    assert_eq!(end, Ready(None));
}
38
/// Same as the single-frame read, but the length header is scripted with its
/// least significant byte first and the builder is set to little endian.
#[test]
fn read_single_frame_one_packet_little_endian() {
    let transport = mock! {
        Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()),
    };
    let mut framed = Builder::new().little_endian().new_read(transport);

    let frame = framed.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = framed.poll().unwrap();
    assert_eq!(end, Ready(None));
}
50
/// Reads one frame with the builder set to native endian; the scripted header
/// bytes are chosen to match the byte order of the compilation target.
#[test]
fn read_single_frame_one_packet_native_endian() {
    let wire: &[u8] = if cfg!(target_endian = "big") {
        b"\x00\x00\x00\x09abcdefghi"
    } else {
        b"\x09\x00\x00\x00abcdefghi"
    };
    let transport = mock! {
        Ok(wire.into()),
    };
    let mut framed = Builder::new().native_endian().new_read(transport);

    let frame = framed.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = framed.poll().unwrap();
    assert_eq!(end, Ready(None));
}
67
/// Three complete frames arrive in a single read; the reader yields them one
/// per poll, in order, followed by `Ready(None)`.
#[test]
fn read_single_multi_frame_one_packet() {
    let encoded = [
        &b"\x00\x00\x00\x09abcdefghi"[..],
        &b"\x00\x00\x00\x03123"[..],
        &b"\x00\x00\x00\x0bhello world"[..],
    ];
    let mut wire: Vec<u8> = Vec::new();
    for chunk in &encoded {
        wire.extend_from_slice(chunk);
    }

    let transport = mock! {
        Ok(wire.into()),
    };
    let mut framed = FramedRead::new(transport);

    let payloads = [&b"abcdefghi"[..], &b"123"[..], &b"hello world"[..]];
    for payload in &payloads {
        assert_eq!(framed.poll().unwrap(), Ready(Some((*payload).into())));
    }
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
84
/// A single frame is split over three reads, with the header itself split
/// across the first two; one poll still yields the whole payload.
#[test]
fn read_single_frame_multi_packet() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Ok(b"\x00\x09abc"[..].into()),
        Ok(b"defghi"[..].into()),
    };
    let mut framed = FramedRead::new(transport);

    let frame = framed.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = framed.poll().unwrap();
    assert_eq!(end, Ready(None));
}
96
/// Three frames are spread over five reads whose boundaries do not line up
/// with frame boundaries; the reader still yields each payload in order.
#[test]
fn read_multi_frame_multi_packet() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Ok(b"\x00\x09abc"[..].into()),
        Ok(b"defghi"[..].into()),
        Ok(b"\x00\x00\x00\x0312"[..].into()),
        Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
    };
    let mut framed = FramedRead::new(transport);

    let payloads = [&b"abcdefghi"[..], &b"123"[..], &b"hello world"[..]];
    for payload in &payloads {
        assert_eq!(framed.poll().unwrap(), Ready(Some((*payload).into())));
    }
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
112
/// A single frame arrives in three pieces with a would-block error scripted
/// after each piece; the reader reports `NotReady` until the frame is complete.
#[test]
fn read_single_frame_multi_packet_wait() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x09abc"[..].into()),
        Err(would_block()),
        Ok(b"defghi"[..].into()),
        Err(would_block()),
    };
    let mut framed = FramedRead::new(transport);

    // The first two polls each consume one piece and then hit a would-block.
    for _ in 0..2 {
        assert_eq!(framed.poll().unwrap(), NotReady);
    }

    let frame = framed.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    // Trailing would-block, then the script is exhausted.
    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
130
/// Three frames arrive over five reads, each followed by a scripted
/// would-block; checks the exact `NotReady` / frame sequence seen by the caller.
#[test]
fn read_multi_frame_multi_packet_wait() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x09abc"[..].into()),
        Err(would_block()),
        Ok(b"defghi"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x00\x00\x0312"[..].into()),
        Err(would_block()),
        Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
        Err(would_block()),
    };
    let mut framed = FramedRead::new(transport);

    // First frame: two stalled polls, then the payload.
    for _ in 0..2 {
        assert_eq!(framed.poll().unwrap(), NotReady);
    }
    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));

    // Second frame: again two stalled polls before it completes.
    for _ in 0..2 {
        assert_eq!(framed.poll().unwrap(), NotReady);
    }
    assert_eq!(framed.poll().unwrap(), Ready(Some(b"123"[..].into())));

    // The third frame is ready on the very next poll.
    assert_eq!(framed.poll().unwrap(), Ready(Some(b"hello world"[..].into())));

    // Trailing would-block, then the script is exhausted.
    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
157
/// The script ends after only two bytes of the length header; polling must
/// return an error.
#[test]
fn read_incomplete_head() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
    };
    let mut framed = FramedRead::new(transport);

    let result = framed.poll();
    assert!(result.is_err());
}
166
/// Only one header byte ever arrives, surrounded by would-block errors; the
/// reader reports `NotReady` twice and then fails once the script runs out.
#[test]
fn read_incomplete_head_multi() {
    let transport = mock! {
        Err(would_block()),
        Ok(b"\x00"[..].into()),
        Err(would_block()),
    };
    let mut framed = FramedRead::new(transport);

    for _ in 0..2 {
        assert_eq!(framed.poll().unwrap(), NotReady);
    }

    let result = framed.poll();
    assert!(result.is_err());
}
179
/// The header announces nine payload bytes but only four are ever scripted;
/// after two `NotReady` polls the reader fails once the script runs out.
#[test]
fn read_incomplete_payload() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09ab"[..].into()),
        Err(would_block()),
        Ok(b"cd"[..].into()),
        Err(would_block()),
    };
    let mut framed = FramedRead::new(transport);

    for _ in 0..2 {
        assert_eq!(framed.poll().unwrap(), NotReady);
    }

    let result = framed.poll();
    assert!(result.is_err());
}
193
/// With a maximum frame length of 5, a frame announcing 9 bytes is rejected
/// with an `InvalidData` error.
#[test]
fn read_max_frame_len() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = Builder::new().max_frame_length(5).new_read(transport);

    let err = framed.poll().unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
204
/// Lowering the maximum frame length between two frames makes the next frame,
/// which exceeds the new limit, fail with `InvalidData`.
#[test]
fn read_update_max_frame_len_at_rest() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = Builder::new().new_read(transport);

    let first = framed.poll().unwrap();
    assert_eq!(first, Ready(Some(b"abcdefghi"[..].into())));

    framed.set_max_frame_length(5);

    let err = framed.poll().unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
217
/// Lowering the maximum frame length while a frame is partially read: the
/// frame already in progress is still delivered, the following one is rejected
/// with `InvalidData`.
#[test]
fn read_update_max_frame_len_in_flight() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcd"[..].into()),
        Err(would_block()),
        Ok(b"efghi"[..].into()),
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = Builder::new().new_read(transport);

    // Header and part of the payload have been consumed at this point.
    assert_eq!(framed.poll().unwrap(), NotReady);

    framed.set_max_frame_length(5);

    let in_flight = framed.poll().unwrap();
    assert_eq!(in_flight, Ready(Some(b"abcdefghi"[..].into())));

    let err = framed.poll().unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
233
/// Reads one frame whose length header is a single byte.
#[test]
fn read_one_byte_length_field() {
    let transport = mock! {
        Ok(b"\x09abcdefghi"[..].into()),
    };
    let mut framed = Builder::new().length_field_length(1).new_read(transport);

    let frame = framed.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = framed.poll().unwrap();
    assert_eq!(end, Ready(None));
}
245
/// Reads one frame whose 2-byte length field sits after four leading bytes;
/// the yielded payload contains neither those bytes nor the length field.
#[test]
fn read_header_offset() {
    let transport = mock! {
        Ok(b"zzzz\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = Builder::new()
        .length_field_length(2)
        .length_field_offset(4)
        .new_read(transport);

    let frame = framed.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = framed.poll().unwrap();
    assert_eq!(end, Ready(None));
}
258
/// Three frames in one read, each with two leading bytes before a 2-byte
/// length field. With `num_skip(0)` and a length adjustment of 4, every
/// yielded frame is the complete encoded frame, header bytes included.
#[test]
fn read_single_multi_frame_one_packet_skip_none_adjusted() {
    let encoded = [
        &b"xx\x00\x09abcdefghi"[..],
        &b"yy\x00\x03123"[..],
        &b"zz\x00\x0bhello world"[..],
    ];
    let mut wire: Vec<u8> = Vec::new();
    for chunk in &encoded {
        wire.extend_from_slice(chunk);
    }

    let transport = mock! {
        Ok(wire.into()),
    };
    let mut framed = Builder::new()
        .length_field_length(2)
        .length_field_offset(2)
        .num_skip(0)
        .length_adjustment(4)
        .new_read(transport);

    // Nothing is skipped, so each frame comes back exactly as it was encoded.
    for whole_frame in &encoded {
        assert_eq!(framed.poll().unwrap(), Ready(Some((*whole_frame).into())));
    }
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
280
/// Three frames in one read whose 2-byte length field counts the header too;
/// a length adjustment of -2 makes the reader yield just the payloads.
#[test]
fn read_single_multi_frame_one_packet_length_includes_head() {
    let encoded = [
        &b"\x00\x0babcdefghi"[..],
        &b"\x00\x05123"[..],
        &b"\x00\x0dhello world"[..],
    ];
    let mut wire: Vec<u8> = Vec::new();
    for chunk in &encoded {
        wire.extend_from_slice(chunk);
    }

    let transport = mock! {
        Ok(wire.into()),
    };
    let mut framed = Builder::new()
        .length_field_length(2)
        .length_adjustment(-2)
        .new_read(transport);

    let payloads = [&b"abcdefghi"[..], &b"123"[..], &b"hello world"[..]];
    for payload in &payloads {
        assert_eq!(framed.poll().unwrap(), Ready(Some((*payload).into())));
    }
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
300
/// With a length adjustment of -2, sending a 9-byte payload is expected to
/// put 11 in the length header, followed by the payload and a flush.
#[test]
fn write_single_frame_length_adjusted() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x0b"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = Builder::new().length_adjustment(-2).new_write(transport);

    let queued = writer.start_send("abcdefghi").unwrap();
    assert!(queued.is_ready());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
314
/// Completing a writer that was never given a frame is immediately ready.
#[test]
fn write_nothing_yields_nothing() {
    let transport = mock!();
    let mut writer = FramedWrite::<_, &'static [u8]>::new(transport);

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());
}
320
/// Sending one frame is expected to write a 4-byte length header, then the
/// payload, then flush the transport.
#[test]
fn write_single_frame_one_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = FramedWrite::new(transport);

    let queued = writer.start_send("abcdefghi").unwrap();
    assert!(queued.is_ready());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
333
/// Three frames are queued before a single completion poll; the transport is
/// expected to see header/payload pairs in order and one flush at the end.
#[test]
fn write_single_multi_frame_one_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(b"\x00\x00\x00\x03"[..].into()),
        Ok(b"123"[..].into()),
        Ok(b"\x00\x00\x00\x0b"[..].into()),
        Ok(b"hello world"[..].into()),
        Ok(Flush),
    };
    let mut writer = FramedWrite::new(transport);

    for frame in &["abcdefghi", "123", "hello world"] {
        assert!(writer.start_send(*frame).unwrap().is_ready());
    }
    assert!(writer.poll_complete().unwrap().is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
352
/// Three frames are each sent and completed in turn; the transport is
/// expected to see header, payload and flush for every frame.
#[test]
fn write_single_multi_frame_multi_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
        Ok(b"\x00\x00\x00\x03"[..].into()),
        Ok(b"123"[..].into()),
        Ok(Flush),
        Ok(b"\x00\x00\x00\x0b"[..].into()),
        Ok(b"hello world"[..].into()),
        Ok(Flush),
    };
    let mut writer = FramedWrite::new(transport);

    for frame in &["abcdefghi", "123", "hello world"] {
        assert!(writer.start_send(*frame).unwrap().is_ready());
        assert!(writer.poll_complete().unwrap().is_ready());
    }

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
375
/// The transport would-block before the header and again halfway through it;
/// completion is not ready for those two polls and is ready on the third.
#[test]
fn write_single_frame_would_block() {
    let transport = mock! {
        Err(would_block()),
        Ok(b"\x00\x00"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = FramedWrite::new(transport);

    let queued = writer.start_send("abcdefghi").unwrap();
    assert!(queued.is_ready());

    // Two stalled attempts, one per scripted would-block.
    for _ in 0..2 {
        assert!(!writer.poll_complete().unwrap().is_ready());
    }
    assert!(writer.poll_complete().unwrap().is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
394
/// With the builder set to little endian, the length header is expected on
/// the wire with its least significant byte first.
#[test]
fn write_single_frame_little_endian() {
    let transport = mock! {
        Ok(b"\x09\x00\x00\x00"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = Builder::new().little_endian().new_write(transport);

    let queued = writer.start_send("abcdefghi").unwrap();
    assert!(queued.is_ready());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
409
410
/// With a 1-byte length field, the header written for a 9-byte payload is
/// expected to be the single byte `0x09`.
#[test]
fn write_single_frame_with_short_length_field() {
    let transport = mock! {
        Ok(b"\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = Builder::new().length_field_length(1).new_write(transport);

    let queued = writer.start_send("abcdefghi").unwrap();
    assert!(queued.is_ready());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
425
/// With a maximum frame length of 5, sending a 6-byte payload is rejected
/// with `InvalidInput`; the transport script is empty throughout.
#[test]
fn write_max_frame_len() {
    let transport = mock!();
    let mut writer = Builder::new().max_frame_length(5).new_write(transport);

    let err = writer.start_send("abcdef").unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);

    assert!(writer.get_ref().calls.is_empty());
}
435
/// A 6-byte frame is written successfully; after the maximum frame length is
/// lowered to 5, sending the same payload again fails with `InvalidInput`.
#[test]
fn write_update_max_frame_len_at_rest() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x06"[..].into()),
        Ok(b"abcdef"[..].into()),
        Ok(Flush),
    };
    let mut writer = Builder::new().new_write(transport);

    let queued = writer.start_send("abcdef").unwrap();
    assert!(queued.is_ready());
    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    writer.set_max_frame_length(5);

    let err = writer.start_send("abcdef").unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
451
/// The maximum frame length is lowered while a 6-byte frame is only partly
/// written; that frame still completes, and the next send of the same payload
/// fails with `InvalidInput`.
#[test]
fn write_update_max_frame_len_in_flight() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x06"[..].into()),
        Ok(b"ab"[..].into()),
        Err(would_block()),
        Ok(b"cdef"[..].into()),
        Ok(Flush),
    };
    let mut writer = Builder::new().new_write(transport);

    let queued = writer.start_send("abcdef").unwrap();
    assert!(queued.is_ready());

    // Header and the first two payload bytes go out, then the transport stalls.
    let stalled = writer.poll_complete().unwrap();
    assert!(!stalled.is_ready());

    writer.set_max_frame_length(5);

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    let err = writer.start_send("abcdef").unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
470
471 // ===== Test utils =====
472
/// Creates an `io::Error` of kind `WouldBlock` carrying the message
/// `"would block"`.
fn would_block() -> io::Error {
    let kind = io::ErrorKind::WouldBlock;
    io::Error::new(kind, "would block")
}
476
/// Scripted transport used by the tests in this file.
///
/// `calls` is the queue of results the mock plays back, front first; each
/// `read`, `write` or `flush` on the mock pops one entry.
#[derive(Debug)]
struct Mock {
    calls: VecDeque<io::Result<Op>>,
}

/// One scripted operation in a `Mock` call queue.
#[derive(Debug)]
enum Op {
    /// A chunk of bytes: returned by a read, or expected by a write.
    Data(Vec<u8>),
    /// An expected flush of the transport.
    Flush,
}
485
486 use self::Op::*;
487
488 impl io::Read for Mock {
read(&mut self, dst: &mut [u8]) -> io::Result<usize>489 fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
490 match self.calls.pop_front() {
491 Some(Ok(Op::Data(data))) => {
492 debug_assert!(dst.len() >= data.len());
493 dst[..data.len()].copy_from_slice(&data[..]);
494 Ok(data.len())
495 }
496 Some(Ok(_)) => panic!(),
497 Some(Err(e)) => Err(e),
498 None => Ok(0),
499 }
500 }
501 }
502
503 impl AsyncRead for Mock {
504 }
505
506 impl io::Write for Mock {
write(&mut self, src: &[u8]) -> io::Result<usize>507 fn write(&mut self, src: &[u8]) -> io::Result<usize> {
508 match self.calls.pop_front() {
509 Some(Ok(Op::Data(data))) => {
510 let len = data.len();
511 assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
512 assert_eq!(&data[..], &src[..len]);
513 Ok(len)
514 }
515 Some(Ok(_)) => panic!(),
516 Some(Err(e)) => Err(e),
517 None => Ok(0),
518 }
519 }
520
flush(&mut self) -> io::Result<()>521 fn flush(&mut self) -> io::Result<()> {
522 match self.calls.pop_front() {
523 Some(Ok(Op::Flush)) => {
524 Ok(())
525 }
526 Some(Ok(_)) => panic!(),
527 Some(Err(e)) => Err(e),
528 None => Ok(()),
529 }
530 }
531 }
532
533 impl AsyncWrite for Mock {
shutdown(&mut self) -> Poll<(), io::Error>534 fn shutdown(&mut self) -> Poll<(), io::Error> {
535 Ok(Ready(()))
536 }
537 }
538
539 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op540 fn from(src: &'a [u8]) -> Op {
541 Op::Data(src.into())
542 }
543 }
544
545 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op546 fn from(src: Vec<u8>) -> Op {
547 Op::Data(src)
548 }
549 }
550