1 extern crate bytes;
2 extern crate futures;
3 extern crate tokio;
4
5 use tokio::codec::*;
6 use tokio::io::{AsyncRead, AsyncWrite};
7
8 use bytes::{BufMut, Bytes, BytesMut};
9 use futures::Async::*;
10 use futures::{Poll, Sink, Stream};
11
12 use std::collections::VecDeque;
13 use std::io;
14
/// Builds a [`Mock`] transport from a comma-terminated list of scripted calls.
///
/// Every argument must be followed by a comma (including the last one); the
/// calls are stored in the order written and consumed front-to-back.
macro_rules! mock {
    ($($call:expr,)*) => {{
        Mock {
            calls: VecDeque::from(vec![$($call),*]),
        }
    }};
}
22
/// With nothing scripted on the transport, the first poll reports end-of-stream.
#[test]
fn read_empty_io_yields_nothing() {
    let transport = mock!();
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    assert_eq!(framed.poll().unwrap(), Ready(None));
}
29
/// A whole frame (4-byte big-endian header + 9-byte payload) delivered by a
/// single read is decoded to its payload, followed by end-of-stream.
#[test]
fn read_single_frame_one_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
42
/// Same as the single-frame case, but the length header is little-endian and
/// the reader is configured with `little_endian()`.
#[test]
fn read_single_frame_one_packet_little_endian() {
    let transport = mock! {
        Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()),
    };
    let mut framed = length_delimited::Builder::new()
        .little_endian()
        .new_read(transport);

    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
54
/// Reads one frame whose header uses the byte order of the compilation target,
/// with the reader configured via `native_endian()`.
#[test]
fn read_single_frame_one_packet_native_endian() {
    // Choose the header encoding that matches the target's endianness.
    let wire = if cfg!(target_endian = "big") {
        b"\x00\x00\x00\x09abcdefghi"
    } else {
        b"\x09\x00\x00\x00abcdefghi"
    };
    let transport = mock! {
        Ok(wire[..].into()),
    };
    let mut framed = length_delimited::Builder::new()
        .native_endian()
        .new_read(transport);

    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
71
/// Three frames arriving back-to-back in one read are yielded one per poll,
/// in order, and then the stream ends.
#[test]
fn read_single_multi_frame_one_packet() {
    let data: Vec<u8> = [
        &b"\x00\x00\x00\x09abcdefghi"[..],
        &b"\x00\x00\x00\x03123"[..],
        &b"\x00\x00\x00\x0bhello world"[..],
    ]
    .concat();

    let transport = mock! {
        Ok(data.into()),
    };
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    let expected: [&[u8]; 3] = [b"abcdefghi", b"123", b"hello world"];
    for frame in expected.iter() {
        assert_eq!(framed.poll().unwrap(), Ready(Some((*frame).into())));
    }
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
91
/// One frame split across three reads (the header itself is split too) is
/// reassembled and yielded by a single poll.
#[test]
fn read_single_frame_multi_packet() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Ok(b"\x00\x09abc"[..].into()),
        Ok(b"defghi"[..].into()),
    };
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
106
/// Three frames spread over five reads, with read boundaries that do not line
/// up with frame boundaries, are still yielded one per poll and in order.
#[test]
fn read_multi_frame_multi_packet() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Ok(b"\x00\x09abc"[..].into()),
        Ok(b"defghi"[..].into()),
        Ok(b"\x00\x00\x00\x0312"[..].into()),
        Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
    };
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    let expected: [&[u8]; 3] = [b"abcdefghi", b"123", b"hello world"];
    for frame in expected.iter() {
        assert_eq!(framed.poll().unwrap(), Ready(Some((*frame).into())));
    }
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
125
/// One frame split across three reads, each followed by a would-block error:
/// polls report `NotReady` until the frame is complete, and once more before
/// end-of-stream.
#[test]
fn read_single_frame_multi_packet_wait() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x09abc"[..].into()),
        Err(would_block()),
        Ok(b"defghi"[..].into()),
        Err(would_block()),
    };
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    // Two partial deliveries, each interrupted by a would-block.
    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), NotReady);
    // The final chunk completes the frame.
    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
    // Trailing would-block, then the script is exhausted.
    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
146
/// Three frames spread over five reads with a would-block error after every
/// read; checks the exact sequence of `NotReady` / frame results per poll.
#[test]
fn read_multi_frame_multi_packet_wait() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x09abc"[..].into()),
        Err(would_block()),
        Ok(b"defghi"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x00\x00\x0312"[..].into()),
        Err(would_block()),
        Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
        Err(would_block()),
    };
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    // First frame: two interrupted polls, then the payload.
    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
    // Second frame: again two interrupted polls before it completes.
    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), Ready(Some(b"123"[..].into())));
    // Third frame arrived in the same read that completed the second.
    assert_eq!(framed.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
    // Trailing would-block, then the script is exhausted.
    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
175
/// The transport ends after only two of the four header bytes: polling must
/// fail rather than report a clean end-of-stream.
#[test]
fn read_incomplete_head() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
    };
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    let outcome = framed.poll();
    assert!(outcome.is_err());
}
187
/// A single header byte arrives between would-block errors and then the
/// transport ends: two `NotReady` polls are followed by an error.
#[test]
fn read_incomplete_head_multi() {
    let transport = mock! {
        Err(would_block()),
        Ok(b"\x00"[..].into()),
        Err(would_block()),
    };
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), NotReady);

    let outcome = framed.poll();
    assert!(outcome.is_err());
}
203
/// The header announces nine payload bytes but only four arrive before the
/// transport ends: two `NotReady` polls are followed by an error.
#[test]
fn read_incomplete_payload() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09ab"[..].into()),
        Err(would_block()),
        Ok(b"cd"[..].into()),
        Err(would_block()),
    };
    let mut framed = FramedRead::new(transport, LengthDelimitedCodec::new());

    assert_eq!(framed.poll().unwrap(), NotReady);
    assert_eq!(framed.poll().unwrap(), NotReady);

    let outcome = framed.poll();
    assert!(outcome.is_err());
}
220
/// A 9-byte frame read with `max_frame_length(5)` is rejected with
/// `InvalidData`.
#[test]
fn read_max_frame_len() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = length_delimited::Builder::new()
        .max_frame_length(5)
        .new_read(transport);

    let err = framed.poll().unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
231
/// Lowering the maximum frame length between two frames: the first 9-byte
/// frame is accepted, the identical second one is rejected with `InvalidData`.
#[test]
fn read_update_max_frame_len_at_rest() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = length_delimited::Builder::new().new_read(transport);

    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));

    framed.decoder_mut().set_max_frame_length(5);

    let err = framed.poll().unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
243
/// Lowering the maximum frame length while a frame is only partially
/// received: that frame is still delivered in full, and the following 9-byte
/// frame is rejected with `InvalidData`.
#[test]
fn read_update_max_frame_len_in_flight() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcd"[..].into()),
        Err(would_block()),
        Ok(b"efghi"[..].into()),
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = length_delimited::Builder::new().new_read(transport);

    // Header plus part of the payload has been consumed at this point.
    assert_eq!(framed.poll().unwrap(), NotReady);

    framed.decoder_mut().set_max_frame_length(5);

    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));

    let err = framed.poll().unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
258
/// With `length_field_length(1)` a single header byte carries the payload
/// length.
#[test]
fn read_one_byte_length_field() {
    let transport = mock! {
        Ok(b"\x09abcdefghi"[..].into()),
    };
    let mut framed = length_delimited::Builder::new()
        .length_field_length(1)
        .new_read(transport);

    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
270
/// A 2-byte length field located after four leading bytes (`zzzz`): only the
/// payload is yielded.
#[test]
fn read_header_offset() {
    let transport = mock! {
        Ok(b"zzzz\x00\x09abcdefghi"[..].into()),
    };
    let mut framed = length_delimited::Builder::new()
        .length_field_length(2)
        .length_field_offset(4)
        .new_read(transport);

    assert_eq!(framed.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
283
/// Three frames in one read, each with a 2-byte prefix followed by a 2-byte
/// length field. With `num_skip(0)` and `length_adjustment(4)` every yielded
/// frame contains the prefix and length field as well as the payload.
#[test]
fn read_single_multi_frame_one_packet_skip_none_adjusted() {
    let data: Vec<u8> = [
        &b"xx\x00\x09abcdefghi"[..],
        &b"yy\x00\x03123"[..],
        &b"zz\x00\x0bhello world"[..],
    ]
    .concat();

    let transport = mock! {
        Ok(data.into()),
    };
    let mut framed = length_delimited::Builder::new()
        .length_field_length(2)
        .length_field_offset(2)
        .num_skip(0)
        .length_adjustment(4)
        .new_read(transport);

    assert_eq!(
        framed.poll().unwrap(),
        Ready(Some(b"xx\x00\x09abcdefghi"[..].into()))
    );
    assert_eq!(
        framed.poll().unwrap(),
        Ready(Some(b"yy\x00\x03123"[..].into()))
    );
    assert_eq!(
        framed.poll().unwrap(),
        Ready(Some(b"zz\x00\x0bhello world"[..].into()))
    );
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
311
/// Three frames in one read whose 2-byte length value also counts the header
/// itself; `length_adjustment(-2)` compensates so only the payload is yielded.
#[test]
fn read_single_multi_frame_one_packet_length_includes_head() {
    let data: Vec<u8> = [
        &b"\x00\x0babcdefghi"[..],
        &b"\x00\x05123"[..],
        &b"\x00\x0dhello world"[..],
    ]
    .concat();

    let transport = mock! {
        Ok(data.into()),
    };
    let mut framed = length_delimited::Builder::new()
        .length_field_length(2)
        .length_adjustment(-2)
        .new_read(transport);

    let expected: [&[u8]; 3] = [b"abcdefghi", b"123", b"hello world"];
    for frame in expected.iter() {
        assert_eq!(framed.poll().unwrap(), Ready(Some((*frame).into())));
    }
    assert_eq!(framed.poll().unwrap(), Ready(None));
}
331
/// With `length_adjustment(-2)` the header written for a 9-byte payload
/// carries the value 11 (`0x0b`).
#[test]
fn write_single_frame_length_adjusted() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x0b"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut sink = length_delimited::Builder::new()
        .length_adjustment(-2)
        .new_write(transport);

    assert!(sink.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
    assert!(sink.poll_complete().unwrap().is_ready());
    // Every scripted call must have been consumed.
    assert!(sink.get_ref().calls.is_empty());
}
345
/// Completing a writer that was never sent anything succeeds immediately.
#[test]
fn write_nothing_yields_nothing() {
    let transport = mock!();
    let mut sink = FramedWrite::new(transport, LengthDelimitedCodec::new());

    assert!(sink.poll_complete().unwrap().is_ready());
}
351
/// Sending one 9-byte payload produces a 4-byte big-endian header, the
/// payload, and a flush, in that order.
#[test]
fn write_single_frame_one_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut sink = FramedWrite::new(transport, LengthDelimitedCodec::new());

    assert!(sink.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
    assert!(sink.poll_complete().unwrap().is_ready());
    // Every scripted call must have been consumed.
    assert!(sink.get_ref().calls.is_empty());
}
367
/// Three payloads queued before a single `poll_complete` are written as three
/// header/payload pairs followed by one flush.
#[test]
fn write_single_multi_frame_one_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(b"\x00\x00\x00\x03"[..].into()),
        Ok(b"123"[..].into()),
        Ok(b"\x00\x00\x00\x0b"[..].into()),
        Ok(b"hello world"[..].into()),
        Ok(Flush),
    };
    let mut sink = FramedWrite::new(transport, LengthDelimitedCodec::new());

    let payloads = ["abcdefghi", "123", "hello world"];
    for payload in payloads.iter() {
        assert!(sink.start_send(Bytes::from(*payload)).unwrap().is_ready());
    }
    assert!(sink.poll_complete().unwrap().is_ready());
    // Every scripted call must have been consumed.
    assert!(sink.get_ref().calls.is_empty());
}
392
/// Three payloads, each completed before the next is sent: every frame is
/// written as header, payload, and its own flush.
#[test]
fn write_single_multi_frame_multi_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
        Ok(b"\x00\x00\x00\x03"[..].into()),
        Ok(b"123"[..].into()),
        Ok(Flush),
        Ok(b"\x00\x00\x00\x0b"[..].into()),
        Ok(b"hello world"[..].into()),
        Ok(Flush),
    };
    let mut sink = FramedWrite::new(transport, LengthDelimitedCodec::new());

    let payloads = ["abcdefghi", "123", "hello world"];
    for payload in payloads.iter() {
        assert!(sink.start_send(Bytes::from(*payload)).unwrap().is_ready());
        assert!(sink.poll_complete().unwrap().is_ready());
    }
    // Every scripted call must have been consumed.
    assert!(sink.get_ref().calls.is_empty());
}
421
/// The transport reports would-block before and in the middle of the header:
/// `poll_complete` is not ready twice and then finishes the frame.
#[test]
fn write_single_frame_would_block() {
    let transport = mock! {
        Err(would_block()),
        Ok(b"\x00\x00"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut sink = FramedWrite::new(transport, LengthDelimitedCodec::new());

    assert!(sink.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());

    // Interrupted before any byte, then again after half of the header.
    assert_eq!(sink.poll_complete().unwrap(), NotReady);
    assert_eq!(sink.poll_complete().unwrap(), NotReady);
    assert!(sink.poll_complete().unwrap().is_ready());

    // Every scripted call must have been consumed.
    assert!(sink.get_ref().calls.is_empty());
}
443
/// A writer configured with `little_endian()` emits the length header in
/// little-endian byte order.
#[test]
fn write_single_frame_little_endian() {
    let transport = mock! {
        Ok(b"\x09\x00\x00\x00"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut sink = length_delimited::Builder::new()
        .little_endian()
        .new_write(transport);

    assert!(sink.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
    assert!(sink.poll_complete().unwrap().is_ready());
    // Every scripted call must have been consumed.
    assert!(sink.get_ref().calls.is_empty());
}
458
/// With `length_field_length(1)` the writer emits a single header byte.
#[test]
fn write_single_frame_with_short_length_field() {
    let transport = mock! {
        Ok(b"\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut sink = length_delimited::Builder::new()
        .length_field_length(1)
        .new_write(transport);

    assert!(sink.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
    assert!(sink.poll_complete().unwrap().is_ready());
    // Every scripted call must have been consumed.
    assert!(sink.get_ref().calls.is_empty());
}
473
/// Sending a 6-byte payload through a writer limited to 5 bytes fails with
/// `InvalidInput`.
#[test]
fn write_max_frame_len() {
    let transport = mock! {};
    let mut sink = length_delimited::Builder::new()
        .max_frame_length(5)
        .new_write(transport);

    let err = sink.start_send(Bytes::from("abcdef")).unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    assert!(sink.get_ref().calls.is_empty());
}
486
/// Lowering the maximum frame length between sends: the first 6-byte payload
/// is written, the second is rejected with `InvalidInput`.
#[test]
fn write_update_max_frame_len_at_rest() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x06"[..].into()),
        Ok(b"abcdef"[..].into()),
        Ok(Flush),
    };
    let mut sink = length_delimited::Builder::new().new_write(transport);

    assert!(sink.start_send(Bytes::from("abcdef")).unwrap().is_ready());
    assert!(sink.poll_complete().unwrap().is_ready());

    sink.encoder_mut().set_max_frame_length(5);

    let err = sink.start_send(Bytes::from("abcdef")).unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    // Only the first frame reached the transport.
    assert!(sink.get_ref().calls.is_empty());
}
504
/// Lowering the maximum frame length while a frame is only partially written:
/// the in-flight frame is still completed, and the next 6-byte payload is
/// rejected with `InvalidInput`.
#[test]
fn write_update_max_frame_len_in_flight() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x06"[..].into()),
        Ok(b"ab"[..].into()),
        Err(would_block()),
        Ok(b"cdef"[..].into()),
        Ok(Flush),
    };
    let mut sink = length_delimited::Builder::new().new_write(transport);

    assert!(sink.start_send(Bytes::from("abcdef")).unwrap().is_ready());
    // The would-block leaves the payload half written.
    assert_eq!(sink.poll_complete().unwrap(), NotReady);

    sink.encoder_mut().set_max_frame_length(5);

    assert!(sink.poll_complete().unwrap().is_ready());

    let err = sink.start_send(Bytes::from("abcdef")).unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    assert!(sink.get_ref().calls.is_empty());
}
525
/// With an empty script the mock's `write` returns `Ok(0)`; completing the
/// send must then fail with `WriteZero`.
#[test]
fn write_zero() {
    let transport = mock! {};
    let mut sink = length_delimited::Builder::new().new_write(transport);

    assert!(sink.start_send(Bytes::from("abcdef")).unwrap().is_ready());

    let err = sink.poll_complete().unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::WriteZero);
    assert!(sink.get_ref().calls.is_empty());
}
537
/// Regression test reproducing tokio-rs/tokio#681: encoding into a buffer that
/// is already filled to its capacity must succeed.
#[test]
fn encode_overflow() {
    let mut codec = length_delimited::Builder::new().new_codec();
    let mut buf = BytesMut::with_capacity(1024);

    // Fill the buffer exactly to its initial capacity without growing it.
    let filler = [b'a'; 1024];
    buf.put_slice(&filler);

    // Encoding the length header has to grow the buffer when it does not fit.
    codec.encode(Bytes::from("hello"), &mut buf).unwrap();
}
551
552 // ===== Test utils =====
553
/// Creates the `WouldBlock` I/O error that the tests script into the mock.
fn would_block() -> io::Error {
    let kind = io::ErrorKind::WouldBlock;
    io::Error::new(kind, "would block")
}
557
558 struct Mock {
559 calls: VecDeque<io::Result<Op>>,
560 }
561
/// One scripted step of a [`Mock`] transport.
///
/// The standard traits are derived so that scripted steps can be printed in
/// assertion/panic messages and compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Op {
    /// Bytes handed out by a read, or bytes a write is expected to start with.
    Data(Vec<u8>),
    /// An expected call to `flush`.
    Flush,
}
566
567 use self::Op::*;
568
569 impl io::Read for Mock {
read(&mut self, dst: &mut [u8]) -> io::Result<usize>570 fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
571 match self.calls.pop_front() {
572 Some(Ok(Op::Data(data))) => {
573 debug_assert!(dst.len() >= data.len());
574 dst[..data.len()].copy_from_slice(&data[..]);
575 Ok(data.len())
576 }
577 Some(Ok(_)) => panic!(),
578 Some(Err(e)) => Err(e),
579 None => Ok(0),
580 }
581 }
582 }
583
584 impl AsyncRead for Mock {}
585
586 impl io::Write for Mock {
write(&mut self, src: &[u8]) -> io::Result<usize>587 fn write(&mut self, src: &[u8]) -> io::Result<usize> {
588 match self.calls.pop_front() {
589 Some(Ok(Op::Data(data))) => {
590 let len = data.len();
591 assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
592 assert_eq!(&data[..], &src[..len]);
593 Ok(len)
594 }
595 Some(Ok(_)) => panic!(),
596 Some(Err(e)) => Err(e),
597 None => Ok(0),
598 }
599 }
600
flush(&mut self) -> io::Result<()>601 fn flush(&mut self) -> io::Result<()> {
602 match self.calls.pop_front() {
603 Some(Ok(Op::Flush)) => Ok(()),
604 Some(Ok(_)) => panic!(),
605 Some(Err(e)) => Err(e),
606 None => Ok(()),
607 }
608 }
609 }
610
611 impl AsyncWrite for Mock {
shutdown(&mut self) -> Poll<(), io::Error>612 fn shutdown(&mut self) -> Poll<(), io::Error> {
613 Ok(Ready(()))
614 }
615 }
616
617 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op618 fn from(src: &'a [u8]) -> Op {
619 Op::Data(src.into())
620 }
621 }
622
623 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op624 fn from(src: Vec<u8>) -> Op {
625 Op::Data(src)
626 }
627 }
628