1 extern crate bytes;
2 extern crate futures;
3 extern crate tokio;
4 
5 use tokio::codec::*;
6 use tokio::io::{AsyncRead, AsyncWrite};
7 
8 use bytes::{BufMut, Bytes, BytesMut};
9 use futures::Async::*;
10 use futures::{Poll, Sink, Stream};
11 
12 use std::collections::VecDeque;
13 use std::io;
14 
15 macro_rules! mock {
16     ($($x:expr,)*) => {{
17         let mut v = VecDeque::new();
18         v.extend(vec![$($x),*]);
19         Mock { calls: v }
20     }};
21 }
22 
23 #[test]
read_empty_io_yields_nothing()24 fn read_empty_io_yields_nothing() {
25     let mut io = FramedRead::new(mock!(), LengthDelimitedCodec::new());
26 
27     assert_eq!(io.poll().unwrap(), Ready(None));
28 }
29 
30 #[test]
read_single_frame_one_packet()31 fn read_single_frame_one_packet() {
32     let mut io = FramedRead::new(
33         mock! {
34             Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
35         },
36         LengthDelimitedCodec::new(),
37     );
38 
39     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
40     assert_eq!(io.poll().unwrap(), Ready(None));
41 }
42 
43 #[test]
read_single_frame_one_packet_little_endian()44 fn read_single_frame_one_packet_little_endian() {
45     let mut io = length_delimited::Builder::new()
46         .little_endian()
47         .new_read(mock! {
48             Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()),
49         });
50 
51     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
52     assert_eq!(io.poll().unwrap(), Ready(None));
53 }
54 
55 #[test]
read_single_frame_one_packet_native_endian()56 fn read_single_frame_one_packet_native_endian() {
57     let data = if cfg!(target_endian = "big") {
58         b"\x00\x00\x00\x09abcdefghi"
59     } else {
60         b"\x09\x00\x00\x00abcdefghi"
61     };
62     let mut io = length_delimited::Builder::new()
63         .native_endian()
64         .new_read(mock! {
65             Ok(data[..].into()),
66         });
67 
68     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
69     assert_eq!(io.poll().unwrap(), Ready(None));
70 }
71 
72 #[test]
read_single_multi_frame_one_packet()73 fn read_single_multi_frame_one_packet() {
74     let mut data: Vec<u8> = vec![];
75     data.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
76     data.extend_from_slice(b"\x00\x00\x00\x03123");
77     data.extend_from_slice(b"\x00\x00\x00\x0bhello world");
78 
79     let mut io = FramedRead::new(
80         mock! {
81             Ok(data.into()),
82         },
83         LengthDelimitedCodec::new(),
84     );
85 
86     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
87     assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
88     assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
89     assert_eq!(io.poll().unwrap(), Ready(None));
90 }
91 
92 #[test]
read_single_frame_multi_packet()93 fn read_single_frame_multi_packet() {
94     let mut io = FramedRead::new(
95         mock! {
96             Ok(b"\x00\x00"[..].into()),
97             Ok(b"\x00\x09abc"[..].into()),
98             Ok(b"defghi"[..].into()),
99         },
100         LengthDelimitedCodec::new(),
101     );
102 
103     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
104     assert_eq!(io.poll().unwrap(), Ready(None));
105 }
106 
107 #[test]
read_multi_frame_multi_packet()108 fn read_multi_frame_multi_packet() {
109     let mut io = FramedRead::new(
110         mock! {
111             Ok(b"\x00\x00"[..].into()),
112             Ok(b"\x00\x09abc"[..].into()),
113             Ok(b"defghi"[..].into()),
114             Ok(b"\x00\x00\x00\x0312"[..].into()),
115             Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
116         },
117         LengthDelimitedCodec::new(),
118     );
119 
120     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
121     assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
122     assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
123     assert_eq!(io.poll().unwrap(), Ready(None));
124 }
125 
126 #[test]
read_single_frame_multi_packet_wait()127 fn read_single_frame_multi_packet_wait() {
128     let mut io = FramedRead::new(
129         mock! {
130             Ok(b"\x00\x00"[..].into()),
131             Err(would_block()),
132             Ok(b"\x00\x09abc"[..].into()),
133             Err(would_block()),
134             Ok(b"defghi"[..].into()),
135             Err(would_block()),
136         },
137         LengthDelimitedCodec::new(),
138     );
139 
140     assert_eq!(io.poll().unwrap(), NotReady);
141     assert_eq!(io.poll().unwrap(), NotReady);
142     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
143     assert_eq!(io.poll().unwrap(), NotReady);
144     assert_eq!(io.poll().unwrap(), Ready(None));
145 }
146 
147 #[test]
read_multi_frame_multi_packet_wait()148 fn read_multi_frame_multi_packet_wait() {
149     let mut io = FramedRead::new(
150         mock! {
151             Ok(b"\x00\x00"[..].into()),
152             Err(would_block()),
153             Ok(b"\x00\x09abc"[..].into()),
154             Err(would_block()),
155             Ok(b"defghi"[..].into()),
156             Err(would_block()),
157             Ok(b"\x00\x00\x00\x0312"[..].into()),
158             Err(would_block()),
159             Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
160             Err(would_block()),
161         },
162         LengthDelimitedCodec::new(),
163     );
164 
165     assert_eq!(io.poll().unwrap(), NotReady);
166     assert_eq!(io.poll().unwrap(), NotReady);
167     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
168     assert_eq!(io.poll().unwrap(), NotReady);
169     assert_eq!(io.poll().unwrap(), NotReady);
170     assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
171     assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
172     assert_eq!(io.poll().unwrap(), NotReady);
173     assert_eq!(io.poll().unwrap(), Ready(None));
174 }
175 
176 #[test]
read_incomplete_head()177 fn read_incomplete_head() {
178     let mut io = FramedRead::new(
179         mock! {
180             Ok(b"\x00\x00"[..].into()),
181         },
182         LengthDelimitedCodec::new(),
183     );
184 
185     assert!(io.poll().is_err());
186 }
187 
188 #[test]
read_incomplete_head_multi()189 fn read_incomplete_head_multi() {
190     let mut io = FramedRead::new(
191         mock! {
192             Err(would_block()),
193             Ok(b"\x00"[..].into()),
194             Err(would_block()),
195         },
196         LengthDelimitedCodec::new(),
197     );
198 
199     assert_eq!(io.poll().unwrap(), NotReady);
200     assert_eq!(io.poll().unwrap(), NotReady);
201     assert!(io.poll().is_err());
202 }
203 
204 #[test]
read_incomplete_payload()205 fn read_incomplete_payload() {
206     let mut io = FramedRead::new(
207         mock! {
208             Ok(b"\x00\x00\x00\x09ab"[..].into()),
209             Err(would_block()),
210             Ok(b"cd"[..].into()),
211             Err(would_block()),
212         },
213         LengthDelimitedCodec::new(),
214     );
215 
216     assert_eq!(io.poll().unwrap(), NotReady);
217     assert_eq!(io.poll().unwrap(), NotReady);
218     assert!(io.poll().is_err());
219 }
220 
221 #[test]
read_max_frame_len()222 fn read_max_frame_len() {
223     let mut io = length_delimited::Builder::new()
224         .max_frame_length(5)
225         .new_read(mock! {
226             Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
227         });
228 
229     assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
230 }
231 
232 #[test]
read_update_max_frame_len_at_rest()233 fn read_update_max_frame_len_at_rest() {
234     let mut io = length_delimited::Builder::new().new_read(mock! {
235         Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
236         Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
237     });
238 
239     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
240     io.decoder_mut().set_max_frame_length(5);
241     assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
242 }
243 
244 #[test]
read_update_max_frame_len_in_flight()245 fn read_update_max_frame_len_in_flight() {
246     let mut io = length_delimited::Builder::new().new_read(mock! {
247         Ok(b"\x00\x00\x00\x09abcd"[..].into()),
248         Err(would_block()),
249         Ok(b"efghi"[..].into()),
250         Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
251     });
252 
253     assert_eq!(io.poll().unwrap(), NotReady);
254     io.decoder_mut().set_max_frame_length(5);
255     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
256     assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
257 }
258 
259 #[test]
read_one_byte_length_field()260 fn read_one_byte_length_field() {
261     let mut io = length_delimited::Builder::new()
262         .length_field_length(1)
263         .new_read(mock! {
264             Ok(b"\x09abcdefghi"[..].into()),
265         });
266 
267     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
268     assert_eq!(io.poll().unwrap(), Ready(None));
269 }
270 
271 #[test]
read_header_offset()272 fn read_header_offset() {
273     let mut io = length_delimited::Builder::new()
274         .length_field_length(2)
275         .length_field_offset(4)
276         .new_read(mock! {
277             Ok(b"zzzz\x00\x09abcdefghi"[..].into()),
278         });
279 
280     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
281     assert_eq!(io.poll().unwrap(), Ready(None));
282 }
283 
284 #[test]
read_single_multi_frame_one_packet_skip_none_adjusted()285 fn read_single_multi_frame_one_packet_skip_none_adjusted() {
286     let mut data: Vec<u8> = vec![];
287     data.extend_from_slice(b"xx\x00\x09abcdefghi");
288     data.extend_from_slice(b"yy\x00\x03123");
289     data.extend_from_slice(b"zz\x00\x0bhello world");
290 
291     let mut io = length_delimited::Builder::new()
292         .length_field_length(2)
293         .length_field_offset(2)
294         .num_skip(0)
295         .length_adjustment(4)
296         .new_read(mock! {
297             Ok(data.into()),
298         });
299 
300     assert_eq!(
301         io.poll().unwrap(),
302         Ready(Some(b"xx\x00\x09abcdefghi"[..].into()))
303     );
304     assert_eq!(io.poll().unwrap(), Ready(Some(b"yy\x00\x03123"[..].into())));
305     assert_eq!(
306         io.poll().unwrap(),
307         Ready(Some(b"zz\x00\x0bhello world"[..].into()))
308     );
309     assert_eq!(io.poll().unwrap(), Ready(None));
310 }
311 
312 #[test]
read_single_multi_frame_one_packet_length_includes_head()313 fn read_single_multi_frame_one_packet_length_includes_head() {
314     let mut data: Vec<u8> = vec![];
315     data.extend_from_slice(b"\x00\x0babcdefghi");
316     data.extend_from_slice(b"\x00\x05123");
317     data.extend_from_slice(b"\x00\x0dhello world");
318 
319     let mut io = length_delimited::Builder::new()
320         .length_field_length(2)
321         .length_adjustment(-2)
322         .new_read(mock! {
323             Ok(data.into()),
324         });
325 
326     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
327     assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
328     assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
329     assert_eq!(io.poll().unwrap(), Ready(None));
330 }
331 
332 #[test]
write_single_frame_length_adjusted()333 fn write_single_frame_length_adjusted() {
334     let mut io = length_delimited::Builder::new()
335         .length_adjustment(-2)
336         .new_write(mock! {
337             Ok(b"\x00\x00\x00\x0b"[..].into()),
338             Ok(b"abcdefghi"[..].into()),
339             Ok(Flush),
340         });
341     assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
342     assert!(io.poll_complete().unwrap().is_ready());
343     assert!(io.get_ref().calls.is_empty());
344 }
345 
346 #[test]
write_nothing_yields_nothing()347 fn write_nothing_yields_nothing() {
348     let mut io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
349     assert!(io.poll_complete().unwrap().is_ready());
350 }
351 
352 #[test]
write_single_frame_one_packet()353 fn write_single_frame_one_packet() {
354     let mut io = FramedWrite::new(
355         mock! {
356             Ok(b"\x00\x00\x00\x09"[..].into()),
357             Ok(b"abcdefghi"[..].into()),
358             Ok(Flush),
359         },
360         LengthDelimitedCodec::new(),
361     );
362 
363     assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
364     assert!(io.poll_complete().unwrap().is_ready());
365     assert!(io.get_ref().calls.is_empty());
366 }
367 
368 #[test]
write_single_multi_frame_one_packet()369 fn write_single_multi_frame_one_packet() {
370     let mut io = FramedWrite::new(
371         mock! {
372             Ok(b"\x00\x00\x00\x09"[..].into()),
373             Ok(b"abcdefghi"[..].into()),
374             Ok(b"\x00\x00\x00\x03"[..].into()),
375             Ok(b"123"[..].into()),
376             Ok(b"\x00\x00\x00\x0b"[..].into()),
377             Ok(b"hello world"[..].into()),
378             Ok(Flush),
379         },
380         LengthDelimitedCodec::new(),
381     );
382 
383     assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
384     assert!(io.start_send(Bytes::from("123")).unwrap().is_ready());
385     assert!(io
386         .start_send(Bytes::from("hello world"))
387         .unwrap()
388         .is_ready());
389     assert!(io.poll_complete().unwrap().is_ready());
390     assert!(io.get_ref().calls.is_empty());
391 }
392 
393 #[test]
write_single_multi_frame_multi_packet()394 fn write_single_multi_frame_multi_packet() {
395     let mut io = FramedWrite::new(
396         mock! {
397             Ok(b"\x00\x00\x00\x09"[..].into()),
398             Ok(b"abcdefghi"[..].into()),
399             Ok(Flush),
400             Ok(b"\x00\x00\x00\x03"[..].into()),
401             Ok(b"123"[..].into()),
402             Ok(Flush),
403             Ok(b"\x00\x00\x00\x0b"[..].into()),
404             Ok(b"hello world"[..].into()),
405             Ok(Flush),
406         },
407         LengthDelimitedCodec::new(),
408     );
409 
410     assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
411     assert!(io.poll_complete().unwrap().is_ready());
412     assert!(io.start_send(Bytes::from("123")).unwrap().is_ready());
413     assert!(io.poll_complete().unwrap().is_ready());
414     assert!(io
415         .start_send(Bytes::from("hello world"))
416         .unwrap()
417         .is_ready());
418     assert!(io.poll_complete().unwrap().is_ready());
419     assert!(io.get_ref().calls.is_empty());
420 }
421 
422 #[test]
write_single_frame_would_block()423 fn write_single_frame_would_block() {
424     let mut io = FramedWrite::new(
425         mock! {
426             Err(would_block()),
427             Ok(b"\x00\x00"[..].into()),
428             Err(would_block()),
429             Ok(b"\x00\x09"[..].into()),
430             Ok(b"abcdefghi"[..].into()),
431             Ok(Flush),
432         },
433         LengthDelimitedCodec::new(),
434     );
435 
436     assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
437     assert!(!io.poll_complete().unwrap().is_ready());
438     assert!(!io.poll_complete().unwrap().is_ready());
439     assert!(io.poll_complete().unwrap().is_ready());
440 
441     assert!(io.get_ref().calls.is_empty());
442 }
443 
444 #[test]
write_single_frame_little_endian()445 fn write_single_frame_little_endian() {
446     let mut io = length_delimited::Builder::new()
447         .little_endian()
448         .new_write(mock! {
449             Ok(b"\x09\x00\x00\x00"[..].into()),
450             Ok(b"abcdefghi"[..].into()),
451             Ok(Flush),
452         });
453 
454     assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
455     assert!(io.poll_complete().unwrap().is_ready());
456     assert!(io.get_ref().calls.is_empty());
457 }
458 
459 #[test]
write_single_frame_with_short_length_field()460 fn write_single_frame_with_short_length_field() {
461     let mut io = length_delimited::Builder::new()
462         .length_field_length(1)
463         .new_write(mock! {
464             Ok(b"\x09"[..].into()),
465             Ok(b"abcdefghi"[..].into()),
466             Ok(Flush),
467         });
468 
469     assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
470     assert!(io.poll_complete().unwrap().is_ready());
471     assert!(io.get_ref().calls.is_empty());
472 }
473 
474 #[test]
write_max_frame_len()475 fn write_max_frame_len() {
476     let mut io = length_delimited::Builder::new()
477         .max_frame_length(5)
478         .new_write(mock! {});
479 
480     assert_eq!(
481         io.start_send(Bytes::from("abcdef")).unwrap_err().kind(),
482         io::ErrorKind::InvalidInput
483     );
484     assert!(io.get_ref().calls.is_empty());
485 }
486 
487 #[test]
write_update_max_frame_len_at_rest()488 fn write_update_max_frame_len_at_rest() {
489     let mut io = length_delimited::Builder::new().new_write(mock! {
490         Ok(b"\x00\x00\x00\x06"[..].into()),
491         Ok(b"abcdef"[..].into()),
492         Ok(Flush),
493     });
494 
495     assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready());
496     assert!(io.poll_complete().unwrap().is_ready());
497     io.encoder_mut().set_max_frame_length(5);
498     assert_eq!(
499         io.start_send(Bytes::from("abcdef")).unwrap_err().kind(),
500         io::ErrorKind::InvalidInput
501     );
502     assert!(io.get_ref().calls.is_empty());
503 }
504 
505 #[test]
write_update_max_frame_len_in_flight()506 fn write_update_max_frame_len_in_flight() {
507     let mut io = length_delimited::Builder::new().new_write(mock! {
508         Ok(b"\x00\x00\x00\x06"[..].into()),
509         Ok(b"ab"[..].into()),
510         Err(would_block()),
511         Ok(b"cdef"[..].into()),
512         Ok(Flush),
513     });
514 
515     assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready());
516     assert!(!io.poll_complete().unwrap().is_ready());
517     io.encoder_mut().set_max_frame_length(5);
518     assert!(io.poll_complete().unwrap().is_ready());
519     assert_eq!(
520         io.start_send(Bytes::from("abcdef")).unwrap_err().kind(),
521         io::ErrorKind::InvalidInput
522     );
523     assert!(io.get_ref().calls.is_empty());
524 }
525 
526 #[test]
write_zero()527 fn write_zero() {
528     let mut io = length_delimited::Builder::new().new_write(mock! {});
529 
530     assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready());
531     assert_eq!(
532         io.poll_complete().unwrap_err().kind(),
533         io::ErrorKind::WriteZero
534     );
535     assert!(io.get_ref().calls.is_empty());
536 }
537 
538 #[test]
encode_overflow()539 fn encode_overflow() {
540     // Test reproducing tokio-rs/tokio#681.
541     let mut codec = length_delimited::Builder::new().new_codec();
542     let mut buf = BytesMut::with_capacity(1024);
543 
544     // Put some data into the buffer without resizing it to hold more.
545     let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>();
546     buf.put_slice(&some_as[..]);
547 
548     // Trying to encode the length header should resize the buffer if it won't fit.
549     codec.encode(Bytes::from("hello"), &mut buf).unwrap();
550 }
551 
552 // ===== Test utils =====
553 
would_block() -> io::Error554 fn would_block() -> io::Error {
555     io::Error::new(io::ErrorKind::WouldBlock, "would block")
556 }
557 
558 struct Mock {
559     calls: VecDeque<io::Result<Op>>,
560 }
561 
562 enum Op {
563     Data(Vec<u8>),
564     Flush,
565 }
566 
567 use self::Op::*;
568 
569 impl io::Read for Mock {
read(&mut self, dst: &mut [u8]) -> io::Result<usize>570     fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
571         match self.calls.pop_front() {
572             Some(Ok(Op::Data(data))) => {
573                 debug_assert!(dst.len() >= data.len());
574                 dst[..data.len()].copy_from_slice(&data[..]);
575                 Ok(data.len())
576             }
577             Some(Ok(_)) => panic!(),
578             Some(Err(e)) => Err(e),
579             None => Ok(0),
580         }
581     }
582 }
583 
584 impl AsyncRead for Mock {}
585 
586 impl io::Write for Mock {
write(&mut self, src: &[u8]) -> io::Result<usize>587     fn write(&mut self, src: &[u8]) -> io::Result<usize> {
588         match self.calls.pop_front() {
589             Some(Ok(Op::Data(data))) => {
590                 let len = data.len();
591                 assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
592                 assert_eq!(&data[..], &src[..len]);
593                 Ok(len)
594             }
595             Some(Ok(_)) => panic!(),
596             Some(Err(e)) => Err(e),
597             None => Ok(0),
598         }
599     }
600 
flush(&mut self) -> io::Result<()>601     fn flush(&mut self) -> io::Result<()> {
602         match self.calls.pop_front() {
603             Some(Ok(Op::Flush)) => Ok(()),
604             Some(Ok(_)) => panic!(),
605             Some(Err(e)) => Err(e),
606             None => Ok(()),
607         }
608     }
609 }
610 
611 impl AsyncWrite for Mock {
shutdown(&mut self) -> Poll<(), io::Error>612     fn shutdown(&mut self) -> Poll<(), io::Error> {
613         Ok(Ready(()))
614     }
615 }
616 
617 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op618     fn from(src: &'a [u8]) -> Op {
619         Op::Data(src.into())
620     }
621 }
622 
623 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op624     fn from(src: Vec<u8>) -> Op {
625         Op::Data(src)
626     }
627 }
628