1 extern crate tokio;
2 extern crate futures;
3 extern crate bytes;
4 
5 use tokio::io::{AsyncRead, AsyncWrite};
6 use tokio::codec::*;
7 
8 use bytes::Bytes;
9 use futures::{Stream, Sink, Poll};
10 use futures::Async::*;
11 
12 use std::io;
13 use std::collections::VecDeque;
14 
/// Builds a `Mock` whose `calls` queue holds the given results, in order.
///
/// Every entry must be followed by a comma (the matcher requires the trailing
/// one too). Invoking the macro with no entries yields an empty queue.
macro_rules! mock {
    ($($op:expr,)*) => {{
        let script = vec![$($op),*];
        Mock { calls: script.into_iter().collect() }
    }};
}
22 
23 
/// A reader over an empty script yields `Ready(None)` on its first poll.
#[test]
fn read_empty_io_yields_nothing() {
    let transport = mock!();
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    let polled = reader.poll().unwrap();
    assert_eq!(polled, Ready(None));
}
30 
/// One frame (4-byte header + 9-byte payload) delivered by a single read is
/// decoded to its payload; the following poll yields `Ready(None)`.
#[test]
fn read_single_frame_one_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    let frame = reader.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = reader.poll().unwrap();
    assert_eq!(end, Ready(None));
}
40 
/// Same as the single-frame read, but the builder is switched to
/// `little_endian()` and the scripted header is `09 00 00 00`.
#[test]
fn read_single_frame_one_packet_little_endian() {
    let transport = mock! {
        Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()),
    };
    let mut reader = length_delimited::Builder::new()
        .little_endian()
        .new_read(transport);

    let frame = reader.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = reader.poll().unwrap();
    assert_eq!(end, Ready(None));
}
52 
/// Reads one frame with `native_endian()`; the scripted header bytes are
/// chosen to match the byte order of the compilation target.
#[test]
fn read_single_frame_one_packet_native_endian() {
    let big_endian_target = cfg!(target_endian = "big");
    let wire = if big_endian_target {
        b"\x00\x00\x00\x09abcdefghi"
    } else {
        b"\x09\x00\x00\x00abcdefghi"
    };

    let transport = mock! {
        Ok(wire[..].into()),
    };
    let mut reader = length_delimited::Builder::new()
        .native_endian()
        .new_read(transport);

    let frame = reader.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = reader.poll().unwrap();
    assert_eq!(end, Ready(None));
}
69 
/// Three frames concatenated into a single scripted read are yielded one per
/// poll, in order, followed by `Ready(None)`.
#[test]
fn read_single_multi_frame_one_packet() {
    let mut wire: Vec<u8> = Vec::new();
    for chunk in &[
        &b"\x00\x00\x00\x09abcdefghi"[..],
        &b"\x00\x00\x00\x03123"[..],
        &b"\x00\x00\x00\x0bhello world"[..],
    ] {
        wire.extend_from_slice(chunk);
    }

    let transport = mock! {
        Ok(wire.into()),
    };
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    for expected in &[&b"abcdefghi"[..], &b"123"[..], &b"hello world"[..]] {
        assert_eq!(reader.poll().unwrap(), Ready(Some((*expected).into())));
    }
    assert_eq!(reader.poll().unwrap(), Ready(None));
}
86 
/// A single frame whose header and payload are split across three scripted
/// reads is still produced by one poll.
#[test]
fn read_single_frame_multi_packet() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Ok(b"\x00\x09abc"[..].into()),
        Ok(b"defghi"[..].into()),
    };
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    let frame = reader.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = reader.poll().unwrap();
    assert_eq!(end, Ready(None));
}
98 
/// Three frames whose bytes straddle five scripted reads are yielded in
/// order, followed by `Ready(None)`.
#[test]
fn read_multi_frame_multi_packet() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Ok(b"\x00\x09abc"[..].into()),
        Ok(b"defghi"[..].into()),
        Ok(b"\x00\x00\x00\x0312"[..].into()),
        Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
    };
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    for expected in &[&b"abcdefghi"[..], &b"123"[..], &b"hello world"[..]] {
        assert_eq!(reader.poll().unwrap(), Ready(Some((*expected).into())));
    }
    assert_eq!(reader.poll().unwrap(), Ready(None));
}
114 
/// A single frame split across three reads, each followed by a scripted
/// would-block error: the reader reports `NotReady` twice, then the frame,
/// then `NotReady` once more before `Ready(None)`.
#[test]
fn read_single_frame_multi_packet_wait() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x09abc"[..].into()),
        Err(would_block()),
        Ok(b"defghi"[..].into()),
        Err(would_block()),
    };
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    // Two polls come back empty-handed before the frame is complete.
    for _ in 0..2 {
        assert_eq!(reader.poll().unwrap(), NotReady);
    }

    let frame = reader.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    assert_eq!(reader.poll().unwrap(), NotReady);
    assert_eq!(reader.poll().unwrap(), Ready(None));
}
132 
/// Three frames spread over five reads, each read followed by a scripted
/// would-block error. The expected poll sequence is: `NotReady` x2, first
/// frame, `NotReady` x2, second frame, third frame, `NotReady`, `Ready(None)`.
#[test]
fn read_multi_frame_multi_packet_wait() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x09abc"[..].into()),
        Err(would_block()),
        Ok(b"defghi"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x00\x00\x0312"[..].into()),
        Err(would_block()),
        Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
        Err(would_block()),
    };
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    // First frame: two empty-handed polls, then the payload.
    for _ in 0..2 {
        assert_eq!(reader.poll().unwrap(), NotReady);
    }
    assert_eq!(reader.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));

    // Second frame: again two empty-handed polls, then the payload.
    for _ in 0..2 {
        assert_eq!(reader.poll().unwrap(), NotReady);
    }
    assert_eq!(reader.poll().unwrap(), Ready(Some(b"123"[..].into())));

    // Third frame is yielded by the very next poll.
    assert_eq!(reader.poll().unwrap(), Ready(Some(b"hello world"[..].into())));

    assert_eq!(reader.poll().unwrap(), NotReady);
    assert_eq!(reader.poll().unwrap(), Ready(None));
}
159 
/// The script ends after only two bytes of the header; polling must return
/// an error.
#[test]
fn read_incomplete_head() {
    let transport = mock! {
        Ok(b"\x00\x00"[..].into()),
    };
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    let outcome = reader.poll();
    assert!(outcome.is_err());
}
168 
/// A single header byte surrounded by would-block errors: two polls report
/// `NotReady`, and the poll after the script is exhausted returns an error.
#[test]
fn read_incomplete_head_multi() {
    let transport = mock! {
        Err(would_block()),
        Ok(b"\x00"[..].into()),
        Err(would_block()),
    };
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    for _ in 0..2 {
        assert_eq!(reader.poll().unwrap(), NotReady);
    }

    let outcome = reader.poll();
    assert!(outcome.is_err());
}
181 
/// The header announces nine payload bytes but the script supplies only
/// four: two polls report `NotReady`, then the third returns an error.
#[test]
fn read_incomplete_payload() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09ab"[..].into()),
        Err(would_block()),
        Ok(b"cd"[..].into()),
        Err(would_block()),
    };
    let mut reader = FramedRead::new(transport, LengthDelimitedCodec::new());

    for _ in 0..2 {
        assert_eq!(reader.poll().unwrap(), NotReady);
    }

    let outcome = reader.poll();
    assert!(outcome.is_err());
}
195 
/// With `max_frame_length(5)`, a frame whose header announces nine bytes is
/// rejected with `ErrorKind::InvalidData`.
#[test]
fn read_max_frame_len() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut reader = length_delimited::Builder::new()
        .max_frame_length(5)
        .new_read(transport);

    let failure = reader.poll().unwrap_err();
    assert_eq!(failure.kind(), io::ErrorKind::InvalidData);
}
206 
/// Lowering the decoder's maximum frame length between frames: the first
/// nine-byte frame is read normally, the second fails with `InvalidData`.
#[test]
fn read_update_max_frame_len_at_rest() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut reader = length_delimited::Builder::new().new_read(transport);

    let first = reader.poll().unwrap();
    assert_eq!(first, Ready(Some(b"abcdefghi"[..].into())));

    reader.decoder_mut().set_max_frame_length(5);

    let failure = reader.poll().unwrap_err();
    assert_eq!(failure.kind(), io::ErrorKind::InvalidData);
}
219 
/// Lowering the decoder's maximum frame length while a frame is partially
/// read: the in-progress nine-byte frame is still delivered, and the next
/// nine-byte frame fails with `InvalidData`.
#[test]
fn read_update_max_frame_len_in_flight() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09abcd"[..].into()),
        Err(would_block()),
        Ok(b"efghi"[..].into()),
        Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
    };
    let mut reader = length_delimited::Builder::new().new_read(transport);

    let pending = reader.poll().unwrap();
    assert_eq!(pending, NotReady);

    reader.decoder_mut().set_max_frame_length(5);

    let frame = reader.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let failure = reader.poll().unwrap_err();
    assert_eq!(failure.kind(), io::ErrorKind::InvalidData);
}
235 
/// With `length_field_length(1)` the header is the single byte `0x09`.
#[test]
fn read_one_byte_length_field() {
    let transport = mock! {
        Ok(b"\x09abcdefghi"[..].into()),
    };
    let mut reader = length_delimited::Builder::new()
        .length_field_length(1)
        .new_read(transport);

    let frame = reader.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = reader.poll().unwrap();
    assert_eq!(end, Ready(None));
}
247 
/// A two-byte length field placed after four leading bytes (`zzzz`): the
/// decoded frame contains only the nine payload bytes.
#[test]
fn read_header_offset() {
    let transport = mock! {
        Ok(b"zzzz\x00\x09abcdefghi"[..].into()),
    };
    let mut reader = length_delimited::Builder::new()
        .length_field_length(2)
        .length_field_offset(4)
        .new_read(transport);

    let frame = reader.poll().unwrap();
    assert_eq!(frame, Ready(Some(b"abcdefghi"[..].into())));

    let end = reader.poll().unwrap();
    assert_eq!(end, Ready(None));
}
260 
/// With `num_skip(0)` and `length_adjustment(4)` every yielded frame is the
/// complete wire chunk: two prefix bytes, the two-byte length field and the
/// payload.
#[test]
fn read_single_multi_frame_one_packet_skip_none_adjusted() {
    // Each chunk is both what goes on the wire and what the decoder must yield.
    let chunks = [
        &b"xx\x00\x09abcdefghi"[..],
        &b"yy\x00\x03123"[..],
        &b"zz\x00\x0bhello world"[..],
    ];

    let mut wire: Vec<u8> = Vec::new();
    for chunk in &chunks {
        wire.extend_from_slice(chunk);
    }

    let transport = mock! {
        Ok(wire.into()),
    };
    let mut reader = length_delimited::Builder::new()
        .length_field_length(2)
        .length_field_offset(2)
        .num_skip(0)
        .length_adjustment(4)
        .new_read(transport);

    for chunk in &chunks {
        assert_eq!(reader.poll().unwrap(), Ready(Some((*chunk).into())));
    }
    assert_eq!(reader.poll().unwrap(), Ready(None));
}
282 
/// The scripted length values count the two header bytes as well
/// (`0x0b` for nine payload bytes, and so on); `length_adjustment(-2)`
/// makes the decoder yield just the payloads.
#[test]
fn read_single_multi_frame_one_packet_length_includes_head() {
    let mut wire: Vec<u8> = Vec::new();
    for chunk in &[
        &b"\x00\x0babcdefghi"[..],
        &b"\x00\x05123"[..],
        &b"\x00\x0dhello world"[..],
    ] {
        wire.extend_from_slice(chunk);
    }

    let transport = mock! {
        Ok(wire.into()),
    };
    let mut reader = length_delimited::Builder::new()
        .length_field_length(2)
        .length_adjustment(-2)
        .new_read(transport);

    for expected in &[&b"abcdefghi"[..], &b"123"[..], &b"hello world"[..]] {
        assert_eq!(reader.poll().unwrap(), Ready(Some((*expected).into())));
    }
    assert_eq!(reader.poll().unwrap(), Ready(None));
}
302 
/// With `length_adjustment(-2)` a nine-byte payload is expected to be written
/// with a header value of `0x0b`, followed by the payload and a flush.
#[test]
fn write_single_frame_length_adjusted() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x0b"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = length_delimited::Builder::new()
        .length_adjustment(-2)
        .new_write(transport);

    let sent = writer.start_send(Bytes::from("abcdefghi")).unwrap();
    assert!(sent.is_ready());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
316 
/// Completing a writer that was never sent anything is immediately ready.
#[test]
fn write_nothing_yields_nothing() {
    let transport = mock!();
    let mut writer = FramedWrite::new(transport, LengthDelimitedCodec::new());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());
}
325 
/// Sending one nine-byte payload is expected to produce a header write, a
/// payload write and a flush, leaving the script empty.
#[test]
fn write_single_frame_one_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = FramedWrite::new(transport, LengthDelimitedCodec::new());

    let sent = writer.start_send(Bytes::from("abcdefghi")).unwrap();
    assert!(sent.is_ready());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
338 
/// Three payloads are queued back to back and completed once; the script
/// expects header/payload writes for each, then a single flush.
#[test]
fn write_single_multi_frame_one_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(b"\x00\x00\x00\x03"[..].into()),
        Ok(b"123"[..].into()),
        Ok(b"\x00\x00\x00\x0b"[..].into()),
        Ok(b"hello world"[..].into()),
        Ok(Flush),
    };
    let mut writer = FramedWrite::new(transport, LengthDelimitedCodec::new());

    for payload in &["abcdefghi", "123", "hello world"] {
        let sent = writer.start_send(Bytes::from(*payload)).unwrap();
        assert!(sent.is_ready());
    }

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
357 
/// Three payloads, each completed before the next is sent; the script
/// expects a header write, payload write and flush per payload.
#[test]
fn write_single_multi_frame_multi_packet() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
        Ok(b"\x00\x00\x00\x03"[..].into()),
        Ok(b"123"[..].into()),
        Ok(Flush),
        Ok(b"\x00\x00\x00\x0b"[..].into()),
        Ok(b"hello world"[..].into()),
        Ok(Flush),
    };
    let mut writer = FramedWrite::new(transport, LengthDelimitedCodec::new());

    for payload in &["abcdefghi", "123", "hello world"] {
        let sent = writer.start_send(Bytes::from(*payload)).unwrap();
        assert!(sent.is_ready());

        let flushed = writer.poll_complete().unwrap();
        assert!(flushed.is_ready());
    }

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
380 
/// The transport answers with would-block errors before and in the middle of
/// the header: completion is not ready twice, then ready on the third call.
#[test]
fn write_single_frame_would_block() {
    let transport = mock! {
        Err(would_block()),
        Ok(b"\x00\x00"[..].into()),
        Err(would_block()),
        Ok(b"\x00\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = FramedWrite::new(transport, LengthDelimitedCodec::new());

    let sent = writer.start_send(Bytes::from("abcdefghi")).unwrap();
    assert!(sent.is_ready());

    for _ in 0..2 {
        let blocked = writer.poll_complete().unwrap();
        assert!(!blocked.is_ready());
    }

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
399 
/// With `little_endian()` the header for a nine-byte payload is expected to
/// be written as `09 00 00 00`.
#[test]
fn write_single_frame_little_endian() {
    let transport = mock! {
        Ok(b"\x09\x00\x00\x00"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = length_delimited::Builder::new()
        .little_endian()
        .new_write(transport);

    let sent = writer.start_send(Bytes::from("abcdefghi")).unwrap();
    assert!(sent.is_ready());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
414 
415 
/// With `length_field_length(1)` the header for a nine-byte payload is
/// expected to be the single byte `0x09`.
#[test]
fn write_single_frame_with_short_length_field() {
    let transport = mock! {
        Ok(b"\x09"[..].into()),
        Ok(b"abcdefghi"[..].into()),
        Ok(Flush),
    };
    let mut writer = length_delimited::Builder::new()
        .length_field_length(1)
        .new_write(transport);

    let sent = writer.start_send(Bytes::from("abcdefghi")).unwrap();
    assert!(sent.is_ready());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
430 
/// With `max_frame_length(5)`, sending a six-byte payload fails with
/// `ErrorKind::InvalidInput`.
#[test]
fn write_max_frame_len() {
    let transport = mock! { };
    let mut writer = length_delimited::Builder::new()
        .max_frame_length(5)
        .new_write(transport);

    let failure = writer.start_send(Bytes::from("abcdef")).unwrap_err();
    assert_eq!(failure.kind(), io::ErrorKind::InvalidInput);

    assert!(writer.get_ref().calls.is_empty());
}
440 
/// Lowering the encoder's maximum frame length after a completed send: the
/// first six-byte payload goes through, the second is rejected with
/// `InvalidInput`.
#[test]
fn write_update_max_frame_len_at_rest() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x06"[..].into()),
        Ok(b"abcdef"[..].into()),
        Ok(Flush),
    };
    let mut writer = length_delimited::Builder::new().new_write(transport);

    let sent = writer.start_send(Bytes::from("abcdef")).unwrap();
    assert!(sent.is_ready());

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    writer.encoder_mut().set_max_frame_length(5);

    let failure = writer.start_send(Bytes::from("abcdef")).unwrap_err();
    assert_eq!(failure.kind(), io::ErrorKind::InvalidInput);

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
456 
/// Lowering the encoder's maximum frame length while a send is only
/// partially written: the in-progress six-byte payload still completes, and
/// the next six-byte payload is rejected with `InvalidInput`.
#[test]
fn write_update_max_frame_len_in_flight() {
    let transport = mock! {
        Ok(b"\x00\x00\x00\x06"[..].into()),
        Ok(b"ab"[..].into()),
        Err(would_block()),
        Ok(b"cdef"[..].into()),
        Ok(Flush),
    };
    let mut writer = length_delimited::Builder::new().new_write(transport);

    let sent = writer.start_send(Bytes::from("abcdef")).unwrap();
    assert!(sent.is_ready());

    let blocked = writer.poll_complete().unwrap();
    assert!(!blocked.is_ready());

    writer.encoder_mut().set_max_frame_length(5);

    let flushed = writer.poll_complete().unwrap();
    assert!(flushed.is_ready());

    let failure = writer.start_send(Bytes::from("abcdef")).unwrap_err();
    assert_eq!(failure.kind(), io::ErrorKind::InvalidInput);

    // Every scripted call must have been consumed.
    assert!(writer.get_ref().calls.is_empty());
}
475 
/// With an empty script the mock's `write` returns `Ok(0)`; completing the
/// send must then fail with `ErrorKind::WriteZero`.
#[test]
fn write_zero() {
    let transport = mock! { };
    let mut writer = length_delimited::Builder::new().new_write(transport);

    let sent = writer.start_send(Bytes::from("abcdef")).unwrap();
    assert!(sent.is_ready());

    let failure = writer.poll_complete().unwrap_err();
    assert_eq!(failure.kind(), io::ErrorKind::WriteZero);

    assert!(writer.get_ref().calls.is_empty());
}
485 
486 // ===== Test utils =====
487 
/// Creates an `io::Error` of kind `WouldBlock` with the message "would block".
fn would_block() -> io::Error {
    let kind = io::ErrorKind::WouldBlock;
    io::Error::new(kind, "would block")
}
491 
/// Scripted I/O object used as the transport in the tests of this file.
///
/// Its `read`, `write` and `flush` implementations each pop the next entry
/// from the front of `calls` and act on it.
struct Mock {
    /// Remaining scripted results, consumed front to back.
    calls: VecDeque<io::Result<Op>>,
}
495 
/// A successful scripted operation stored in a `Mock`'s call queue.
enum Op {
    /// A chunk of bytes: copied out by `Mock::read`, or compared against the
    /// bytes handed to `Mock::write`.
    Data(Vec<u8>),
    /// An expected call to `Mock::flush`.
    Flush,
}
500 
501 use self::Op::*;
502 
503 impl io::Read for Mock {
read(&mut self, dst: &mut [u8]) -> io::Result<usize>504     fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
505         match self.calls.pop_front() {
506             Some(Ok(Op::Data(data))) => {
507                 debug_assert!(dst.len() >= data.len());
508                 dst[..data.len()].copy_from_slice(&data[..]);
509                 Ok(data.len())
510             }
511             Some(Ok(_)) => panic!(),
512             Some(Err(e)) => Err(e),
513             None => Ok(0),
514         }
515     }
516 }
517 
// No methods are overridden: `Mock` uses the trait's provided methods on top
// of its `io::Read` implementation.
impl AsyncRead for Mock {
}
520 
521 impl io::Write for Mock {
write(&mut self, src: &[u8]) -> io::Result<usize>522     fn write(&mut self, src: &[u8]) -> io::Result<usize> {
523         match self.calls.pop_front() {
524             Some(Ok(Op::Data(data))) => {
525                 let len = data.len();
526                 assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
527                 assert_eq!(&data[..], &src[..len]);
528                 Ok(len)
529             }
530             Some(Ok(_)) => panic!(),
531             Some(Err(e)) => Err(e),
532             None => Ok(0),
533         }
534     }
535 
flush(&mut self) -> io::Result<()>536     fn flush(&mut self) -> io::Result<()> {
537         match self.calls.pop_front() {
538             Some(Ok(Op::Flush)) => {
539                 Ok(())
540             }
541             Some(Ok(_)) => panic!(),
542             Some(Err(e)) => Err(e),
543             None => Ok(()),
544         }
545     }
546 }
547 
548 impl AsyncWrite for Mock {
shutdown(&mut self) -> Poll<(), io::Error>549     fn shutdown(&mut self) -> Poll<(), io::Error> {
550         Ok(Ready(()))
551     }
552 }
553 
554 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op555     fn from(src: &'a [u8]) -> Op {
556         Op::Data(src.into())
557     }
558 }
559 
560 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op561     fn from(src: Vec<u8>) -> Op {
562         Op::Data(src)
563     }
564 }
565