1 extern crate tokio_io;
2 extern crate futures;
3 
4 use tokio_io::{AsyncRead, AsyncWrite};
5 use tokio_io::codec::length_delimited::*;
6 
7 use futures::{Stream, Sink, Poll};
8 use futures::Async::*;
9 
10 use std::io;
11 use std::collections::VecDeque;
12 
13 macro_rules! mock {
14     ($($x:expr,)*) => {{
15         let mut v = VecDeque::new();
16         v.extend(vec![$($x),*]);
17         Mock { calls: v }
18     }};
19 }
20 
21 
22 #[test]
read_empty_io_yields_nothing()23 fn read_empty_io_yields_nothing() {
24     let mut io = FramedRead::new(mock!());
25 
26     assert_eq!(io.poll().unwrap(), Ready(None));
27 }
28 
29 #[test]
read_single_frame_one_packet()30 fn read_single_frame_one_packet() {
31     let mut io = FramedRead::new(mock! {
32         Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
33     });
34 
35     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
36     assert_eq!(io.poll().unwrap(), Ready(None));
37 }
38 
39 #[test]
read_single_frame_one_packet_little_endian()40 fn read_single_frame_one_packet_little_endian() {
41     let mut io = Builder::new()
42         .little_endian()
43         .new_read(mock! {
44             Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()),
45         });
46 
47     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
48     assert_eq!(io.poll().unwrap(), Ready(None));
49 }
50 
51 #[test]
read_single_frame_one_packet_native_endian()52 fn read_single_frame_one_packet_native_endian() {
53     let data = if cfg!(target_endian = "big") {
54         b"\x00\x00\x00\x09abcdefghi"
55     } else {
56         b"\x09\x00\x00\x00abcdefghi"
57     };
58     let mut io = Builder::new()
59         .native_endian()
60         .new_read(mock! {
61             Ok(data[..].into()),
62         });
63 
64     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
65     assert_eq!(io.poll().unwrap(), Ready(None));
66 }
67 
68 #[test]
read_single_multi_frame_one_packet()69 fn read_single_multi_frame_one_packet() {
70     let mut data: Vec<u8> = vec![];
71     data.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
72     data.extend_from_slice(b"\x00\x00\x00\x03123");
73     data.extend_from_slice(b"\x00\x00\x00\x0bhello world");
74 
75     let mut io = FramedRead::new(mock! {
76         Ok(data.into()),
77     });
78 
79     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
80     assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
81     assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
82     assert_eq!(io.poll().unwrap(), Ready(None));
83 }
84 
85 #[test]
read_single_frame_multi_packet()86 fn read_single_frame_multi_packet() {
87     let mut io = FramedRead::new(mock! {
88         Ok(b"\x00\x00"[..].into()),
89         Ok(b"\x00\x09abc"[..].into()),
90         Ok(b"defghi"[..].into()),
91     });
92 
93     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
94     assert_eq!(io.poll().unwrap(), Ready(None));
95 }
96 
97 #[test]
read_multi_frame_multi_packet()98 fn read_multi_frame_multi_packet() {
99     let mut io = FramedRead::new(mock! {
100         Ok(b"\x00\x00"[..].into()),
101         Ok(b"\x00\x09abc"[..].into()),
102         Ok(b"defghi"[..].into()),
103         Ok(b"\x00\x00\x00\x0312"[..].into()),
104         Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
105     });
106 
107     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
108     assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
109     assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
110     assert_eq!(io.poll().unwrap(), Ready(None));
111 }
112 
113 #[test]
read_single_frame_multi_packet_wait()114 fn read_single_frame_multi_packet_wait() {
115     let mut io = FramedRead::new(mock! {
116         Ok(b"\x00\x00"[..].into()),
117         Err(would_block()),
118         Ok(b"\x00\x09abc"[..].into()),
119         Err(would_block()),
120         Ok(b"defghi"[..].into()),
121         Err(would_block()),
122     });
123 
124     assert_eq!(io.poll().unwrap(), NotReady);
125     assert_eq!(io.poll().unwrap(), NotReady);
126     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
127     assert_eq!(io.poll().unwrap(), NotReady);
128     assert_eq!(io.poll().unwrap(), Ready(None));
129 }
130 
131 #[test]
read_multi_frame_multi_packet_wait()132 fn read_multi_frame_multi_packet_wait() {
133     let mut io = FramedRead::new(mock! {
134         Ok(b"\x00\x00"[..].into()),
135         Err(would_block()),
136         Ok(b"\x00\x09abc"[..].into()),
137         Err(would_block()),
138         Ok(b"defghi"[..].into()),
139         Err(would_block()),
140         Ok(b"\x00\x00\x00\x0312"[..].into()),
141         Err(would_block()),
142         Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
143         Err(would_block()),
144     });
145 
146 
147     assert_eq!(io.poll().unwrap(), NotReady);
148     assert_eq!(io.poll().unwrap(), NotReady);
149     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
150     assert_eq!(io.poll().unwrap(), NotReady);
151     assert_eq!(io.poll().unwrap(), NotReady);
152     assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
153     assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
154     assert_eq!(io.poll().unwrap(), NotReady);
155     assert_eq!(io.poll().unwrap(), Ready(None));
156 }
157 
158 #[test]
read_incomplete_head()159 fn read_incomplete_head() {
160     let mut io = FramedRead::new(mock! {
161         Ok(b"\x00\x00"[..].into()),
162     });
163 
164     assert!(io.poll().is_err());
165 }
166 
167 #[test]
read_incomplete_head_multi()168 fn read_incomplete_head_multi() {
169     let mut io = FramedRead::new(mock! {
170         Err(would_block()),
171         Ok(b"\x00"[..].into()),
172         Err(would_block()),
173     });
174 
175     assert_eq!(io.poll().unwrap(), NotReady);
176     assert_eq!(io.poll().unwrap(), NotReady);
177     assert!(io.poll().is_err());
178 }
179 
180 #[test]
read_incomplete_payload()181 fn read_incomplete_payload() {
182     let mut io = FramedRead::new(mock! {
183         Ok(b"\x00\x00\x00\x09ab"[..].into()),
184         Err(would_block()),
185         Ok(b"cd"[..].into()),
186         Err(would_block()),
187     });
188 
189     assert_eq!(io.poll().unwrap(), NotReady);
190     assert_eq!(io.poll().unwrap(), NotReady);
191     assert!(io.poll().is_err());
192 }
193 
194 #[test]
read_max_frame_len()195 fn read_max_frame_len() {
196     let mut io = Builder::new()
197         .max_frame_length(5)
198         .new_read(mock! {
199             Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
200         });
201 
202     assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
203 }
204 
205 #[test]
read_update_max_frame_len_at_rest()206 fn read_update_max_frame_len_at_rest() {
207     let mut io = Builder::new()
208         .new_read(mock! {
209             Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
210             Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
211         });
212 
213     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
214     io.set_max_frame_length(5);
215     assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
216 }
217 
218 #[test]
read_update_max_frame_len_in_flight()219 fn read_update_max_frame_len_in_flight() {
220     let mut io = Builder::new()
221         .new_read(mock! {
222             Ok(b"\x00\x00\x00\x09abcd"[..].into()),
223             Err(would_block()),
224             Ok(b"efghi"[..].into()),
225             Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
226         });
227 
228     assert_eq!(io.poll().unwrap(), NotReady);
229     io.set_max_frame_length(5);
230     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
231     assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
232 }
233 
234 #[test]
read_one_byte_length_field()235 fn read_one_byte_length_field() {
236     let mut io = Builder::new()
237         .length_field_length(1)
238         .new_read(mock! {
239             Ok(b"\x09abcdefghi"[..].into()),
240         });
241 
242     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
243     assert_eq!(io.poll().unwrap(), Ready(None));
244 }
245 
246 #[test]
read_header_offset()247 fn read_header_offset() {
248     let mut io = Builder::new()
249         .length_field_length(2)
250         .length_field_offset(4)
251         .new_read(mock! {
252             Ok(b"zzzz\x00\x09abcdefghi"[..].into()),
253         });
254 
255     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
256     assert_eq!(io.poll().unwrap(), Ready(None));
257 }
258 
259 #[test]
read_single_multi_frame_one_packet_skip_none_adjusted()260 fn read_single_multi_frame_one_packet_skip_none_adjusted() {
261     let mut data: Vec<u8> = vec![];
262     data.extend_from_slice(b"xx\x00\x09abcdefghi");
263     data.extend_from_slice(b"yy\x00\x03123");
264     data.extend_from_slice(b"zz\x00\x0bhello world");
265 
266     let mut io = Builder::new()
267         .length_field_length(2)
268         .length_field_offset(2)
269         .num_skip(0)
270         .length_adjustment(4)
271         .new_read(mock! {
272             Ok(data.into()),
273         });
274 
275     assert_eq!(io.poll().unwrap(), Ready(Some(b"xx\x00\x09abcdefghi"[..].into())));
276     assert_eq!(io.poll().unwrap(), Ready(Some(b"yy\x00\x03123"[..].into())));
277     assert_eq!(io.poll().unwrap(), Ready(Some(b"zz\x00\x0bhello world"[..].into())));
278     assert_eq!(io.poll().unwrap(), Ready(None));
279 }
280 
281 #[test]
read_single_multi_frame_one_packet_length_includes_head()282 fn read_single_multi_frame_one_packet_length_includes_head() {
283     let mut data: Vec<u8> = vec![];
284     data.extend_from_slice(b"\x00\x0babcdefghi");
285     data.extend_from_slice(b"\x00\x05123");
286     data.extend_from_slice(b"\x00\x0dhello world");
287 
288     let mut io = Builder::new()
289         .length_field_length(2)
290         .length_adjustment(-2)
291         .new_read(mock! {
292             Ok(data.into()),
293         });
294 
295     assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
296     assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
297     assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
298     assert_eq!(io.poll().unwrap(), Ready(None));
299 }
300 
301 #[test]
write_single_frame_length_adjusted()302 fn write_single_frame_length_adjusted() {
303     let mut io = Builder::new()
304         .length_adjustment(-2)
305         .new_write(mock! {
306             Ok(b"\x00\x00\x00\x0b"[..].into()),
307             Ok(b"abcdefghi"[..].into()),
308             Ok(Flush),
309         });
310     assert!(io.start_send("abcdefghi").unwrap().is_ready());
311     assert!(io.poll_complete().unwrap().is_ready());
312     assert!(io.get_ref().calls.is_empty());
313 }
314 
315 #[test]
write_nothing_yields_nothing()316 fn write_nothing_yields_nothing() {
317     let mut io: FramedWrite<_, &'static [u8]> = FramedWrite::new(mock!());
318     assert!(io.poll_complete().unwrap().is_ready());
319 }
320 
321 #[test]
write_single_frame_one_packet()322 fn write_single_frame_one_packet() {
323     let mut io = FramedWrite::new(mock! {
324         Ok(b"\x00\x00\x00\x09"[..].into()),
325         Ok(b"abcdefghi"[..].into()),
326         Ok(Flush),
327     });
328 
329     assert!(io.start_send("abcdefghi").unwrap().is_ready());
330     assert!(io.poll_complete().unwrap().is_ready());
331     assert!(io.get_ref().calls.is_empty());
332 }
333 
334 #[test]
write_single_multi_frame_one_packet()335 fn write_single_multi_frame_one_packet() {
336     let mut io = FramedWrite::new(mock! {
337         Ok(b"\x00\x00\x00\x09"[..].into()),
338         Ok(b"abcdefghi"[..].into()),
339         Ok(b"\x00\x00\x00\x03"[..].into()),
340         Ok(b"123"[..].into()),
341         Ok(b"\x00\x00\x00\x0b"[..].into()),
342         Ok(b"hello world"[..].into()),
343         Ok(Flush),
344     });
345 
346     assert!(io.start_send("abcdefghi").unwrap().is_ready());
347     assert!(io.start_send("123").unwrap().is_ready());
348     assert!(io.start_send("hello world").unwrap().is_ready());
349     assert!(io.poll_complete().unwrap().is_ready());
350     assert!(io.get_ref().calls.is_empty());
351 }
352 
353 #[test]
write_single_multi_frame_multi_packet()354 fn write_single_multi_frame_multi_packet() {
355     let mut io = FramedWrite::new(mock! {
356         Ok(b"\x00\x00\x00\x09"[..].into()),
357         Ok(b"abcdefghi"[..].into()),
358         Ok(Flush),
359         Ok(b"\x00\x00\x00\x03"[..].into()),
360         Ok(b"123"[..].into()),
361         Ok(Flush),
362         Ok(b"\x00\x00\x00\x0b"[..].into()),
363         Ok(b"hello world"[..].into()),
364         Ok(Flush),
365     });
366 
367     assert!(io.start_send("abcdefghi").unwrap().is_ready());
368     assert!(io.poll_complete().unwrap().is_ready());
369     assert!(io.start_send("123").unwrap().is_ready());
370     assert!(io.poll_complete().unwrap().is_ready());
371     assert!(io.start_send("hello world").unwrap().is_ready());
372     assert!(io.poll_complete().unwrap().is_ready());
373     assert!(io.get_ref().calls.is_empty());
374 }
375 
376 #[test]
write_single_frame_would_block()377 fn write_single_frame_would_block() {
378     let mut io = FramedWrite::new(mock! {
379         Err(would_block()),
380         Ok(b"\x00\x00"[..].into()),
381         Err(would_block()),
382         Ok(b"\x00\x09"[..].into()),
383         Ok(b"abcdefghi"[..].into()),
384         Ok(Flush),
385     });
386 
387     assert!(io.start_send("abcdefghi").unwrap().is_ready());
388     assert!(!io.poll_complete().unwrap().is_ready());
389     assert!(!io.poll_complete().unwrap().is_ready());
390     assert!(io.poll_complete().unwrap().is_ready());
391 
392     assert!(io.get_ref().calls.is_empty());
393 }
394 
395 #[test]
write_single_frame_little_endian()396 fn write_single_frame_little_endian() {
397     let mut io = Builder::new()
398         .little_endian()
399         .new_write(mock! {
400             Ok(b"\x09\x00\x00\x00"[..].into()),
401             Ok(b"abcdefghi"[..].into()),
402             Ok(Flush),
403         });
404 
405     assert!(io.start_send("abcdefghi").unwrap().is_ready());
406     assert!(io.poll_complete().unwrap().is_ready());
407     assert!(io.get_ref().calls.is_empty());
408 }
409 
410 
411 #[test]
write_single_frame_with_short_length_field()412 fn write_single_frame_with_short_length_field() {
413     let mut io = Builder::new()
414         .length_field_length(1)
415         .new_write(mock! {
416             Ok(b"\x09"[..].into()),
417             Ok(b"abcdefghi"[..].into()),
418             Ok(Flush),
419         });
420 
421     assert!(io.start_send("abcdefghi").unwrap().is_ready());
422     assert!(io.poll_complete().unwrap().is_ready());
423     assert!(io.get_ref().calls.is_empty());
424 }
425 
426 #[test]
write_max_frame_len()427 fn write_max_frame_len() {
428     let mut io = Builder::new()
429         .max_frame_length(5)
430         .new_write(mock! { });
431 
432     assert_eq!(io.start_send("abcdef").unwrap_err().kind(), io::ErrorKind::InvalidInput);
433     assert!(io.get_ref().calls.is_empty());
434 }
435 
436 #[test]
write_update_max_frame_len_at_rest()437 fn write_update_max_frame_len_at_rest() {
438     let mut io = Builder::new()
439         .new_write(mock! {
440             Ok(b"\x00\x00\x00\x06"[..].into()),
441             Ok(b"abcdef"[..].into()),
442             Ok(Flush),
443         });
444 
445     assert!(io.start_send("abcdef").unwrap().is_ready());
446     assert!(io.poll_complete().unwrap().is_ready());
447     io.set_max_frame_length(5);
448     assert_eq!(io.start_send("abcdef").unwrap_err().kind(), io::ErrorKind::InvalidInput);
449     assert!(io.get_ref().calls.is_empty());
450 }
451 
452 #[test]
write_update_max_frame_len_in_flight()453 fn write_update_max_frame_len_in_flight() {
454     let mut io = Builder::new()
455         .new_write(mock! {
456             Ok(b"\x00\x00\x00\x06"[..].into()),
457             Ok(b"ab"[..].into()),
458             Err(would_block()),
459             Ok(b"cdef"[..].into()),
460             Ok(Flush),
461         });
462 
463     assert!(io.start_send("abcdef").unwrap().is_ready());
464     assert!(!io.poll_complete().unwrap().is_ready());
465     io.set_max_frame_length(5);
466     assert!(io.poll_complete().unwrap().is_ready());
467     assert_eq!(io.start_send("abcdef").unwrap_err().kind(), io::ErrorKind::InvalidInput);
468     assert!(io.get_ref().calls.is_empty());
469 }
470 
471 // ===== Test utils =====
472 
would_block() -> io::Error473 fn would_block() -> io::Error {
474     io::Error::new(io::ErrorKind::WouldBlock, "would block")
475 }
476 
477 struct Mock {
478     calls: VecDeque<io::Result<Op>>,
479 }
480 
481 enum Op {
482     Data(Vec<u8>),
483     Flush,
484 }
485 
486 use self::Op::*;
487 
488 impl io::Read for Mock {
read(&mut self, dst: &mut [u8]) -> io::Result<usize>489     fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
490         match self.calls.pop_front() {
491             Some(Ok(Op::Data(data))) => {
492                 debug_assert!(dst.len() >= data.len());
493                 dst[..data.len()].copy_from_slice(&data[..]);
494                 Ok(data.len())
495             }
496             Some(Ok(_)) => panic!(),
497             Some(Err(e)) => Err(e),
498             None => Ok(0),
499         }
500     }
501 }
502 
503 impl AsyncRead for Mock {
504 }
505 
506 impl io::Write for Mock {
write(&mut self, src: &[u8]) -> io::Result<usize>507     fn write(&mut self, src: &[u8]) -> io::Result<usize> {
508         match self.calls.pop_front() {
509             Some(Ok(Op::Data(data))) => {
510                 let len = data.len();
511                 assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
512                 assert_eq!(&data[..], &src[..len]);
513                 Ok(len)
514             }
515             Some(Ok(_)) => panic!(),
516             Some(Err(e)) => Err(e),
517             None => Ok(0),
518         }
519     }
520 
flush(&mut self) -> io::Result<()>521     fn flush(&mut self) -> io::Result<()> {
522         match self.calls.pop_front() {
523             Some(Ok(Op::Flush)) => {
524                 Ok(())
525             }
526             Some(Ok(_)) => panic!(),
527             Some(Err(e)) => Err(e),
528             None => Ok(()),
529         }
530     }
531 }
532 
533 impl AsyncWrite for Mock {
shutdown(&mut self) -> Poll<(), io::Error>534     fn shutdown(&mut self) -> Poll<(), io::Error> {
535         Ok(Ready(()))
536     }
537 }
538 
539 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op540     fn from(src: &'a [u8]) -> Op {
541         Op::Data(src.into())
542     }
543 }
544 
545 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op546     fn from(src: Vec<u8>) -> Op {
547         Op::Data(src)
548     }
549 }
550