1 extern crate tokio;
2 extern crate futures;
3 extern crate bytes;
4
5 use tokio::io::{AsyncRead, AsyncWrite};
6 use tokio::codec::*;
7
8 use bytes::Bytes;
9 use futures::{Stream, Sink, Poll};
10 use futures::Async::*;
11
12 use std::io;
13 use std::collections::VecDeque;
14
15 macro_rules! mock {
16 ($($x:expr,)*) => {{
17 let mut v = VecDeque::new();
18 v.extend(vec![$($x),*]);
19 Mock { calls: v }
20 }};
21 }
22
23
24 #[test]
read_empty_io_yields_nothing()25 fn read_empty_io_yields_nothing() {
26 let mut io = FramedRead::new(mock!(), LengthDelimitedCodec::new());
27
28 assert_eq!(io.poll().unwrap(), Ready(None));
29 }
30
31 #[test]
read_single_frame_one_packet()32 fn read_single_frame_one_packet() {
33 let mut io = FramedRead::new(mock! {
34 Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
35 }, LengthDelimitedCodec::new());
36
37 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
38 assert_eq!(io.poll().unwrap(), Ready(None));
39 }
40
41 #[test]
read_single_frame_one_packet_little_endian()42 fn read_single_frame_one_packet_little_endian() {
43 let mut io = length_delimited::Builder::new()
44 .little_endian()
45 .new_read(mock! {
46 Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()),
47 });
48
49 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
50 assert_eq!(io.poll().unwrap(), Ready(None));
51 }
52
53 #[test]
read_single_frame_one_packet_native_endian()54 fn read_single_frame_one_packet_native_endian() {
55 let data = if cfg!(target_endian = "big") {
56 b"\x00\x00\x00\x09abcdefghi"
57 } else {
58 b"\x09\x00\x00\x00abcdefghi"
59 };
60 let mut io = length_delimited::Builder::new()
61 .native_endian()
62 .new_read(mock! {
63 Ok(data[..].into()),
64 });
65
66 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
67 assert_eq!(io.poll().unwrap(), Ready(None));
68 }
69
70 #[test]
read_single_multi_frame_one_packet()71 fn read_single_multi_frame_one_packet() {
72 let mut data: Vec<u8> = vec![];
73 data.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
74 data.extend_from_slice(b"\x00\x00\x00\x03123");
75 data.extend_from_slice(b"\x00\x00\x00\x0bhello world");
76
77 let mut io = FramedRead::new(mock! {
78 Ok(data.into()),
79 }, LengthDelimitedCodec::new());
80
81 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
82 assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
83 assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
84 assert_eq!(io.poll().unwrap(), Ready(None));
85 }
86
87 #[test]
read_single_frame_multi_packet()88 fn read_single_frame_multi_packet() {
89 let mut io = FramedRead::new(mock! {
90 Ok(b"\x00\x00"[..].into()),
91 Ok(b"\x00\x09abc"[..].into()),
92 Ok(b"defghi"[..].into()),
93 }, LengthDelimitedCodec::new());
94
95 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
96 assert_eq!(io.poll().unwrap(), Ready(None));
97 }
98
99 #[test]
read_multi_frame_multi_packet()100 fn read_multi_frame_multi_packet() {
101 let mut io = FramedRead::new(mock! {
102 Ok(b"\x00\x00"[..].into()),
103 Ok(b"\x00\x09abc"[..].into()),
104 Ok(b"defghi"[..].into()),
105 Ok(b"\x00\x00\x00\x0312"[..].into()),
106 Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
107 }, LengthDelimitedCodec::new());
108
109 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
110 assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
111 assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
112 assert_eq!(io.poll().unwrap(), Ready(None));
113 }
114
115 #[test]
read_single_frame_multi_packet_wait()116 fn read_single_frame_multi_packet_wait() {
117 let mut io = FramedRead::new(mock! {
118 Ok(b"\x00\x00"[..].into()),
119 Err(would_block()),
120 Ok(b"\x00\x09abc"[..].into()),
121 Err(would_block()),
122 Ok(b"defghi"[..].into()),
123 Err(would_block()),
124 }, LengthDelimitedCodec::new());
125
126 assert_eq!(io.poll().unwrap(), NotReady);
127 assert_eq!(io.poll().unwrap(), NotReady);
128 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
129 assert_eq!(io.poll().unwrap(), NotReady);
130 assert_eq!(io.poll().unwrap(), Ready(None));
131 }
132
133 #[test]
read_multi_frame_multi_packet_wait()134 fn read_multi_frame_multi_packet_wait() {
135 let mut io = FramedRead::new(mock! {
136 Ok(b"\x00\x00"[..].into()),
137 Err(would_block()),
138 Ok(b"\x00\x09abc"[..].into()),
139 Err(would_block()),
140 Ok(b"defghi"[..].into()),
141 Err(would_block()),
142 Ok(b"\x00\x00\x00\x0312"[..].into()),
143 Err(would_block()),
144 Ok(b"3\x00\x00\x00\x0bhello world"[..].into()),
145 Err(would_block()),
146 }, LengthDelimitedCodec::new());
147
148
149 assert_eq!(io.poll().unwrap(), NotReady);
150 assert_eq!(io.poll().unwrap(), NotReady);
151 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
152 assert_eq!(io.poll().unwrap(), NotReady);
153 assert_eq!(io.poll().unwrap(), NotReady);
154 assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
155 assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
156 assert_eq!(io.poll().unwrap(), NotReady);
157 assert_eq!(io.poll().unwrap(), Ready(None));
158 }
159
160 #[test]
read_incomplete_head()161 fn read_incomplete_head() {
162 let mut io = FramedRead::new(mock! {
163 Ok(b"\x00\x00"[..].into()),
164 }, LengthDelimitedCodec::new());
165
166 assert!(io.poll().is_err());
167 }
168
169 #[test]
read_incomplete_head_multi()170 fn read_incomplete_head_multi() {
171 let mut io = FramedRead::new(mock! {
172 Err(would_block()),
173 Ok(b"\x00"[..].into()),
174 Err(would_block()),
175 }, LengthDelimitedCodec::new());
176
177 assert_eq!(io.poll().unwrap(), NotReady);
178 assert_eq!(io.poll().unwrap(), NotReady);
179 assert!(io.poll().is_err());
180 }
181
182 #[test]
read_incomplete_payload()183 fn read_incomplete_payload() {
184 let mut io = FramedRead::new(mock! {
185 Ok(b"\x00\x00\x00\x09ab"[..].into()),
186 Err(would_block()),
187 Ok(b"cd"[..].into()),
188 Err(would_block()),
189 }, LengthDelimitedCodec::new());
190
191 assert_eq!(io.poll().unwrap(), NotReady);
192 assert_eq!(io.poll().unwrap(), NotReady);
193 assert!(io.poll().is_err());
194 }
195
196 #[test]
read_max_frame_len()197 fn read_max_frame_len() {
198 let mut io = length_delimited::Builder::new()
199 .max_frame_length(5)
200 .new_read(mock! {
201 Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
202 });
203
204 assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
205 }
206
207 #[test]
read_update_max_frame_len_at_rest()208 fn read_update_max_frame_len_at_rest() {
209 let mut io = length_delimited::Builder::new()
210 .new_read(mock! {
211 Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
212 Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
213 });
214
215 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
216 io.decoder_mut().set_max_frame_length(5);
217 assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
218 }
219
220 #[test]
read_update_max_frame_len_in_flight()221 fn read_update_max_frame_len_in_flight() {
222 let mut io = length_delimited::Builder::new()
223 .new_read(mock! {
224 Ok(b"\x00\x00\x00\x09abcd"[..].into()),
225 Err(would_block()),
226 Ok(b"efghi"[..].into()),
227 Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()),
228 });
229
230 assert_eq!(io.poll().unwrap(), NotReady);
231 io.decoder_mut().set_max_frame_length(5);
232 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
233 assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData);
234 }
235
236 #[test]
read_one_byte_length_field()237 fn read_one_byte_length_field() {
238 let mut io = length_delimited::Builder::new()
239 .length_field_length(1)
240 .new_read(mock! {
241 Ok(b"\x09abcdefghi"[..].into()),
242 });
243
244 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
245 assert_eq!(io.poll().unwrap(), Ready(None));
246 }
247
248 #[test]
read_header_offset()249 fn read_header_offset() {
250 let mut io = length_delimited::Builder::new()
251 .length_field_length(2)
252 .length_field_offset(4)
253 .new_read(mock! {
254 Ok(b"zzzz\x00\x09abcdefghi"[..].into()),
255 });
256
257 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
258 assert_eq!(io.poll().unwrap(), Ready(None));
259 }
260
261 #[test]
read_single_multi_frame_one_packet_skip_none_adjusted()262 fn read_single_multi_frame_one_packet_skip_none_adjusted() {
263 let mut data: Vec<u8> = vec![];
264 data.extend_from_slice(b"xx\x00\x09abcdefghi");
265 data.extend_from_slice(b"yy\x00\x03123");
266 data.extend_from_slice(b"zz\x00\x0bhello world");
267
268 let mut io = length_delimited::Builder::new()
269 .length_field_length(2)
270 .length_field_offset(2)
271 .num_skip(0)
272 .length_adjustment(4)
273 .new_read(mock! {
274 Ok(data.into()),
275 });
276
277 assert_eq!(io.poll().unwrap(), Ready(Some(b"xx\x00\x09abcdefghi"[..].into())));
278 assert_eq!(io.poll().unwrap(), Ready(Some(b"yy\x00\x03123"[..].into())));
279 assert_eq!(io.poll().unwrap(), Ready(Some(b"zz\x00\x0bhello world"[..].into())));
280 assert_eq!(io.poll().unwrap(), Ready(None));
281 }
282
283 #[test]
read_single_multi_frame_one_packet_length_includes_head()284 fn read_single_multi_frame_one_packet_length_includes_head() {
285 let mut data: Vec<u8> = vec![];
286 data.extend_from_slice(b"\x00\x0babcdefghi");
287 data.extend_from_slice(b"\x00\x05123");
288 data.extend_from_slice(b"\x00\x0dhello world");
289
290 let mut io = length_delimited::Builder::new()
291 .length_field_length(2)
292 .length_adjustment(-2)
293 .new_read(mock! {
294 Ok(data.into()),
295 });
296
297 assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into())));
298 assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into())));
299 assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into())));
300 assert_eq!(io.poll().unwrap(), Ready(None));
301 }
302
303 #[test]
write_single_frame_length_adjusted()304 fn write_single_frame_length_adjusted() {
305 let mut io = length_delimited::Builder::new()
306 .length_adjustment(-2)
307 .new_write(mock! {
308 Ok(b"\x00\x00\x00\x0b"[..].into()),
309 Ok(b"abcdefghi"[..].into()),
310 Ok(Flush),
311 });
312 assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
313 assert!(io.poll_complete().unwrap().is_ready());
314 assert!(io.get_ref().calls.is_empty());
315 }
316
317 #[test]
write_nothing_yields_nothing()318 fn write_nothing_yields_nothing() {
319 let mut io = FramedWrite::new(
320 mock!(),
321 LengthDelimitedCodec::new()
322 );
323 assert!(io.poll_complete().unwrap().is_ready());
324 }
325
326 #[test]
write_single_frame_one_packet()327 fn write_single_frame_one_packet() {
328 let mut io = FramedWrite::new(mock! {
329 Ok(b"\x00\x00\x00\x09"[..].into()),
330 Ok(b"abcdefghi"[..].into()),
331 Ok(Flush),
332 }, LengthDelimitedCodec::new());
333
334 assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
335 assert!(io.poll_complete().unwrap().is_ready());
336 assert!(io.get_ref().calls.is_empty());
337 }
338
339 #[test]
write_single_multi_frame_one_packet()340 fn write_single_multi_frame_one_packet() {
341 let mut io = FramedWrite::new(mock! {
342 Ok(b"\x00\x00\x00\x09"[..].into()),
343 Ok(b"abcdefghi"[..].into()),
344 Ok(b"\x00\x00\x00\x03"[..].into()),
345 Ok(b"123"[..].into()),
346 Ok(b"\x00\x00\x00\x0b"[..].into()),
347 Ok(b"hello world"[..].into()),
348 Ok(Flush),
349 }, LengthDelimitedCodec::new());
350
351 assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
352 assert!(io.start_send(Bytes::from("123")).unwrap().is_ready());
353 assert!(io.start_send(Bytes::from("hello world")).unwrap().is_ready());
354 assert!(io.poll_complete().unwrap().is_ready());
355 assert!(io.get_ref().calls.is_empty());
356 }
357
358 #[test]
write_single_multi_frame_multi_packet()359 fn write_single_multi_frame_multi_packet() {
360 let mut io = FramedWrite::new(mock! {
361 Ok(b"\x00\x00\x00\x09"[..].into()),
362 Ok(b"abcdefghi"[..].into()),
363 Ok(Flush),
364 Ok(b"\x00\x00\x00\x03"[..].into()),
365 Ok(b"123"[..].into()),
366 Ok(Flush),
367 Ok(b"\x00\x00\x00\x0b"[..].into()),
368 Ok(b"hello world"[..].into()),
369 Ok(Flush),
370 }, LengthDelimitedCodec::new());
371
372 assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
373 assert!(io.poll_complete().unwrap().is_ready());
374 assert!(io.start_send(Bytes::from("123")).unwrap().is_ready());
375 assert!(io.poll_complete().unwrap().is_ready());
376 assert!(io.start_send(Bytes::from("hello world")).unwrap().is_ready());
377 assert!(io.poll_complete().unwrap().is_ready());
378 assert!(io.get_ref().calls.is_empty());
379 }
380
381 #[test]
write_single_frame_would_block()382 fn write_single_frame_would_block() {
383 let mut io = FramedWrite::new(mock! {
384 Err(would_block()),
385 Ok(b"\x00\x00"[..].into()),
386 Err(would_block()),
387 Ok(b"\x00\x09"[..].into()),
388 Ok(b"abcdefghi"[..].into()),
389 Ok(Flush),
390 }, LengthDelimitedCodec::new());
391
392 assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
393 assert!(!io.poll_complete().unwrap().is_ready());
394 assert!(!io.poll_complete().unwrap().is_ready());
395 assert!(io.poll_complete().unwrap().is_ready());
396
397 assert!(io.get_ref().calls.is_empty());
398 }
399
400 #[test]
write_single_frame_little_endian()401 fn write_single_frame_little_endian() {
402 let mut io = length_delimited::Builder::new()
403 .little_endian()
404 .new_write(mock! {
405 Ok(b"\x09\x00\x00\x00"[..].into()),
406 Ok(b"abcdefghi"[..].into()),
407 Ok(Flush),
408 });
409
410 assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
411 assert!(io.poll_complete().unwrap().is_ready());
412 assert!(io.get_ref().calls.is_empty());
413 }
414
415
416 #[test]
write_single_frame_with_short_length_field()417 fn write_single_frame_with_short_length_field() {
418 let mut io = length_delimited::Builder::new()
419 .length_field_length(1)
420 .new_write(mock! {
421 Ok(b"\x09"[..].into()),
422 Ok(b"abcdefghi"[..].into()),
423 Ok(Flush),
424 });
425
426 assert!(io.start_send(Bytes::from("abcdefghi")).unwrap().is_ready());
427 assert!(io.poll_complete().unwrap().is_ready());
428 assert!(io.get_ref().calls.is_empty());
429 }
430
431 #[test]
write_max_frame_len()432 fn write_max_frame_len() {
433 let mut io = length_delimited::Builder::new()
434 .max_frame_length(5)
435 .new_write(mock! { });
436
437 assert_eq!(io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), io::ErrorKind::InvalidInput);
438 assert!(io.get_ref().calls.is_empty());
439 }
440
441 #[test]
write_update_max_frame_len_at_rest()442 fn write_update_max_frame_len_at_rest() {
443 let mut io = length_delimited::Builder::new()
444 .new_write(mock! {
445 Ok(b"\x00\x00\x00\x06"[..].into()),
446 Ok(b"abcdef"[..].into()),
447 Ok(Flush),
448 });
449
450 assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready());
451 assert!(io.poll_complete().unwrap().is_ready());
452 io.encoder_mut().set_max_frame_length(5);
453 assert_eq!(io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), io::ErrorKind::InvalidInput);
454 assert!(io.get_ref().calls.is_empty());
455 }
456
457 #[test]
write_update_max_frame_len_in_flight()458 fn write_update_max_frame_len_in_flight() {
459 let mut io = length_delimited::Builder::new()
460 .new_write(mock! {
461 Ok(b"\x00\x00\x00\x06"[..].into()),
462 Ok(b"ab"[..].into()),
463 Err(would_block()),
464 Ok(b"cdef"[..].into()),
465 Ok(Flush),
466 });
467
468 assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready());
469 assert!(!io.poll_complete().unwrap().is_ready());
470 io.encoder_mut().set_max_frame_length(5);
471 assert!(io.poll_complete().unwrap().is_ready());
472 assert_eq!(io.start_send(Bytes::from("abcdef")).unwrap_err().kind(), io::ErrorKind::InvalidInput);
473 assert!(io.get_ref().calls.is_empty());
474 }
475
476 #[test]
write_zero()477 fn write_zero() {
478 let mut io = length_delimited::Builder::new()
479 .new_write(mock! { });
480
481 assert!(io.start_send(Bytes::from("abcdef")).unwrap().is_ready());
482 assert_eq!(io.poll_complete().unwrap_err().kind(), io::ErrorKind::WriteZero);
483 assert!(io.get_ref().calls.is_empty());
484 }
485
486 // ===== Test utils =====
487
would_block() -> io::Error488 fn would_block() -> io::Error {
489 io::Error::new(io::ErrorKind::WouldBlock, "would block")
490 }
491
492 struct Mock {
493 calls: VecDeque<io::Result<Op>>,
494 }
495
496 enum Op {
497 Data(Vec<u8>),
498 Flush,
499 }
500
501 use self::Op::*;
502
503 impl io::Read for Mock {
read(&mut self, dst: &mut [u8]) -> io::Result<usize>504 fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
505 match self.calls.pop_front() {
506 Some(Ok(Op::Data(data))) => {
507 debug_assert!(dst.len() >= data.len());
508 dst[..data.len()].copy_from_slice(&data[..]);
509 Ok(data.len())
510 }
511 Some(Ok(_)) => panic!(),
512 Some(Err(e)) => Err(e),
513 None => Ok(0),
514 }
515 }
516 }
517
518 impl AsyncRead for Mock {
519 }
520
521 impl io::Write for Mock {
write(&mut self, src: &[u8]) -> io::Result<usize>522 fn write(&mut self, src: &[u8]) -> io::Result<usize> {
523 match self.calls.pop_front() {
524 Some(Ok(Op::Data(data))) => {
525 let len = data.len();
526 assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
527 assert_eq!(&data[..], &src[..len]);
528 Ok(len)
529 }
530 Some(Ok(_)) => panic!(),
531 Some(Err(e)) => Err(e),
532 None => Ok(0),
533 }
534 }
535
flush(&mut self) -> io::Result<()>536 fn flush(&mut self) -> io::Result<()> {
537 match self.calls.pop_front() {
538 Some(Ok(Op::Flush)) => {
539 Ok(())
540 }
541 Some(Ok(_)) => panic!(),
542 Some(Err(e)) => Err(e),
543 None => Ok(()),
544 }
545 }
546 }
547
548 impl AsyncWrite for Mock {
shutdown(&mut self) -> Poll<(), io::Error>549 fn shutdown(&mut self) -> Poll<(), io::Error> {
550 Ok(Ready(()))
551 }
552 }
553
554 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op555 fn from(src: &'a [u8]) -> Op {
556 Op::Data(src.into())
557 }
558 }
559
560 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op561 fn from(src: Vec<u8>) -> Op {
562 Op::Data(src)
563 }
564 }
565