1 #![warn(rust_2018_idioms)]
2 
3 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
4 use tokio_test::task;
5 use tokio_test::{
6     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
7 };
8 use tokio_util::codec::*;
9 
10 use bytes::{BufMut, Bytes, BytesMut};
11 use futures::{pin_mut, Sink, Stream};
12 use std::collections::VecDeque;
13 use std::io;
14 use std::pin::Pin;
15 use std::task::Poll::*;
16 use std::task::{Context, Poll};
17 
// Builds a `Mock` whose scripted call queue holds the given entries in the
// order written. Every entry must be followed by a comma.
macro_rules! mock {
    ($($x:expr,)*) => {{
        Mock {
            calls: VecDeque::from(vec![$($x),*]),
        }
    }};
}
25 
// Polls `$io` once and requires it to be ready with a successful frame equal
// to `$expect`. An error item or the end of the stream fails the test.
macro_rules! assert_next_eq {
    ($io:ident, $expect:expr) => {{
        task::spawn(()).enter(|cx, _| {
            let polled = assert_ready!($io.as_mut().poll_next(cx));
            match polled {
                None => panic!("none"),
                Some(Err(err)) => panic!("error = {:?}", err),
                Some(Ok(frame)) => assert_eq!(frame, $expect.as_ref()),
            }
        });
    }};
}
38 
// Polls `$io` once and requires the result to be `Pending`. A frame, an error
// or the end of the stream fails the test.
macro_rules! assert_next_pending {
    ($io:ident) => {{
        task::spawn(()).enter(|cx, _| {
            let polled = $io.as_mut().poll_next(cx);
            match polled {
                Pending => {}
                Ready(None) => panic!("done"),
                Ready(Some(Err(err))) => panic!("error = {:?}", err),
                Ready(Some(Ok(frame))) => panic!("value = {:?}", frame),
            }
        });
    }};
}
49 
// Polls `$io` once and requires it to be ready with an error item. A frame,
// the end of the stream or `Pending` fails the test.
macro_rules! assert_next_err {
    ($io:ident) => {{
        task::spawn(()).enter(|cx, _| {
            let polled = $io.as_mut().poll_next(cx);
            match polled {
                Ready(Some(Err(_))) => {}
                Ready(Some(Ok(frame))) => panic!("value = {:?}", frame),
                Ready(None) => panic!("done"),
                Pending => panic!("pending"),
            }
        });
    }};
}
60 
// Polls `$io` once and requires it to be ready with the end of the stream.
// Any further item, successful or not, fails the test.
macro_rules! assert_done {
    ($io:ident) => {{
        task::spawn(()).enter(|cx, _| {
            let polled = assert_ready!($io.as_mut().poll_next(cx));
            if let Some(item) = polled {
                match item {
                    Ok(frame) => panic!("value = {:?}", frame),
                    Err(err) => panic!("error = {:?}", err),
                }
            }
        });
    }};
}
73 
/// A mock with no scripted reads reports end-of-file on the first poll, so the
/// framed stream finishes without yielding any frame.
#[test]
fn read_empty_io_yields_nothing() {
    // `pin_mut!` already pins the stream on the stack. Boxing it first was a
    // redundant allocation that no other test in this file performs.
    let io = FramedRead::new(mock!(), LengthDelimitedCodec::new());
    pin_mut!(io);

    assert_done!(io);
}
81 
/// A complete frame (4-byte big-endian length header plus payload) delivered
/// in one read decodes to exactly one item, after which the stream ends.
#[test]
fn read_single_frame_one_packet() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09abcdefghi"),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
95 
/// With the builder switched to little-endian, a header of `09 00 00 00`
/// announces a nine-byte payload.
#[test]
fn read_single_frame_one_packet_little_endian() {
    let transport = mock! {
        data(b"\x09\x00\x00\x00abcdefghi"),
    };
    let framed = length_delimited::Builder::new()
        .little_endian()
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
108 
/// With the builder set to native-endian, the header is read in the byte order
/// of the compilation target, so the input bytes are chosen to match it.
#[test]
fn read_single_frame_one_packet_native_endian() {
    // Same nine-byte payload; only the header byte order differs per target.
    let wire = if cfg!(target_endian = "big") {
        b"\x00\x00\x00\x09abcdefghi"
    } else {
        b"\x09\x00\x00\x00abcdefghi"
    };
    let transport = mock! {
        data(wire),
    };
    let framed = length_delimited::Builder::new()
        .native_endian()
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
126 
/// Three frames delivered back-to-back in a single read are decoded one after
/// another, in order.
#[test]
fn read_single_multi_frame_one_packet() {
    let frames: [&[u8]; 3] = [
        b"\x00\x00\x00\x09abcdefghi",
        b"\x00\x00\x00\x03123",
        b"\x00\x00\x00\x0bhello world",
    ];
    let wire: Vec<u8> = frames.concat();

    let transport = mock! {
        data(&wire),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_next_eq!(framed, b"123");
    assert_next_eq!(framed, b"hello world");
    assert_done!(framed);
}
147 
/// A single frame whose header and payload are split across three reads is
/// reassembled into one item.
#[test]
fn read_single_frame_multi_packet() {
    let transport = mock! {
        data(b"\x00\x00"),
        data(b"\x00\x09abc"),
        data(b"defghi"),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
163 
/// Three frames arriving in five reads, with read boundaries that do not line
/// up with frame boundaries, are still decoded as three items in order.
#[test]
fn read_multi_frame_multi_packet() {
    let transport = mock! {
        data(b"\x00\x00"),
        data(b"\x00\x09abc"),
        data(b"defghi"),
        data(b"\x00\x00\x00\x0312"),
        data(b"3\x00\x00\x00\x0bhello world"),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_next_eq!(framed, b"123");
    assert_next_eq!(framed, b"hello world");
    assert_done!(framed);
}
183 
/// A single frame split across three reads, each followed by a `Pending`
/// result: the stream reports pending until the frame is complete, yields it,
/// reports pending once more and then ends.
#[test]
fn read_single_frame_multi_packet_wait() {
    let transport = mock! {
        data(b"\x00\x00"),
        Pending,
        data(b"\x00\x09abc"),
        Pending,
        data(b"defghi"),
        Pending,
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_eq!(framed, b"abcdefghi");
    assert_next_pending!(framed);
    assert_done!(framed);
}
205 
/// Three frames spread over five reads with a `Pending` result after every
/// read: frames come out in order, interleaved with the expected pending polls.
#[test]
fn read_multi_frame_multi_packet_wait() {
    let transport = mock! {
        data(b"\x00\x00"),
        Pending,
        data(b"\x00\x09abc"),
        Pending,
        data(b"defghi"),
        Pending,
        data(b"\x00\x00\x00\x0312"),
        Pending,
        data(b"3\x00\x00\x00\x0bhello world"),
        Pending,
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_eq!(framed, b"abcdefghi");
    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_eq!(framed, b"123");
    assert_next_eq!(framed, b"hello world");
    assert_next_pending!(framed);
    assert_done!(framed);
}
235 
/// The input ends after only two bytes of the length header, so the stream
/// yields an error instead of finishing cleanly.
#[test]
fn read_incomplete_head() {
    let transport = mock! {
        data(b"\x00\x00"),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_err!(framed);
}
248 
/// Like `read_incomplete_head`, but the single header byte is surrounded by
/// `Pending` results: two pending polls, then an error once the input ends.
#[test]
fn read_incomplete_head_multi() {
    let transport = mock! {
        Pending,
        data(b"\x00"),
        Pending,
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_err!(framed);
}
265 
/// The header announces nine payload bytes but only four arrive before the
/// input ends, so the stream yields an error after the pending polls.
#[test]
fn read_incomplete_payload() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09ab"),
        Pending,
        data(b"cd"),
        Pending,
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_err!(framed);
}
283 
/// A nine-byte frame read with a maximum frame length of five is rejected
/// with an error.
#[test]
fn read_max_frame_len() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new()
        .max_frame_length(5)
        .new_read(transport);
    pin_mut!(framed);

    assert_next_err!(framed);
}
295 
/// Lowering the maximum frame length between two frames makes the second,
/// otherwise identical, nine-byte frame fail.
#[test]
fn read_update_max_frame_len_at_rest() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09abcdefghi"),
        data(b"\x00\x00\x00\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new().new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    framed.decoder_mut().set_max_frame_length(5);
    assert_next_err!(framed);
}
308 
/// Lowering the maximum frame length while a frame is partially read: the
/// frame already in progress is still delivered, and the next one is rejected.
#[test]
fn read_update_max_frame_len_in_flight() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09abcd"),
        Pending,
        data(b"efghi"),
        data(b"\x00\x00\x00\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new().new_read(transport);
    pin_mut!(framed);

    assert_next_pending!(framed);
    framed.decoder_mut().set_max_frame_length(5);
    assert_next_eq!(framed, b"abcdefghi");
    assert_next_err!(framed);
}
324 
/// With a one-byte length field, the single header byte `09` announces the
/// nine-byte payload.
#[test]
fn read_one_byte_length_field() {
    let transport = mock! {
        data(b"\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new()
        .length_field_length(1)
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
337 
/// A two-byte length field located four bytes into the head: the leading
/// `zzzz` bytes are not part of the decoded frame.
#[test]
fn read_header_offset() {
    let transport = mock! {
        data(b"zzzz\x00\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new()
        .length_field_length(2)
        .length_field_offset(4)
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
351 
/// With `num_skip(0)` and a length adjustment of four (two prefix bytes plus
/// the two-byte length field), each decoded frame keeps its full head.
#[test]
fn read_single_multi_frame_one_packet_skip_none_adjusted() {
    let frames: [&[u8]; 3] = [
        b"xx\x00\x09abcdefghi",
        b"yy\x00\x03123",
        b"zz\x00\x0bhello world",
    ];
    let wire: Vec<u8> = frames.concat();

    let transport = mock! {
        data(&wire),
    };
    let framed = length_delimited::Builder::new()
        .length_field_length(2)
        .length_field_offset(2)
        .num_skip(0)
        .length_adjustment(4)
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"xx\x00\x09abcdefghi");
    assert_next_eq!(framed, b"yy\x00\x03123");
    assert_next_eq!(framed, b"zz\x00\x0bhello world");
    assert_done!(framed);
}
374 
/// A three-byte length field (value 11) combined with `num_skip(4)`: the
/// fourth byte of the input is skipped too, leaving just `Hello world`.
#[test]
fn read_single_frame_length_adjusted() {
    let wire: Vec<u8> = b"\x00\x00\x0b\x0cHello world".to_vec();

    let transport = mock! {
        data(&wire),
    };
    let framed = length_delimited::Builder::new()
        .length_field_offset(0)
        .length_field_length(3)
        .length_adjustment(0)
        .num_skip(4)
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"Hello world");
    assert_done!(framed);
}
393 
/// The length on the wire counts the two-byte header itself; a length
/// adjustment of -2 makes the decoder yield only the payload.
#[test]
fn read_single_multi_frame_one_packet_length_includes_head() {
    let frames: [&[u8]; 3] = [
        b"\x00\x0babcdefghi",
        b"\x00\x05123",
        b"\x00\x0dhello world",
    ];
    let wire: Vec<u8> = frames.concat();

    let transport = mock! {
        data(&wire),
    };
    let framed = length_delimited::Builder::new()
        .length_field_length(2)
        .length_adjustment(-2)
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_next_eq!(framed, b"123");
    assert_next_eq!(framed, b"hello world");
    assert_done!(framed);
}
414 
/// With a length adjustment of -2, the header written for a nine-byte payload
/// carries the value 11.
#[test]
fn write_single_frame_length_adjusted() {
    let transport = mock! {
        data(b"\x00\x00\x00\x0b"),
        data(b"abcdefghi"),
        flush(),
    };
    let framed = length_delimited::Builder::new()
        .length_adjustment(-2)
        .new_write(transport);
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_ready(cx));
        assert_ok!(framed.as_mut().start_send(Bytes::from("abcdefghi")));
        assert_ready_ok!(framed.as_mut().poll_flush(cx));

        // Every scripted write and flush must have been consumed.
        assert!(framed.get_ref().calls.is_empty());
    });
}
433 
/// Flushing a writer that was never given a frame completes successfully.
#[test]
fn write_nothing_yields_nothing() {
    let framed = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_flush(cx));
    });
}
443 
/// Sending one nine-byte frame produces a 4-byte big-endian header write, a
/// payload write and a flush, in that order.
#[test]
fn write_single_frame_one_packet() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09"),
        data(b"abcdefghi"),
        flush(),
    };
    let framed = FramedWrite::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_ready(cx));
        assert_ok!(framed.as_mut().start_send(Bytes::from("abcdefghi")));
        assert_ready_ok!(framed.as_mut().poll_flush(cx));

        // Every scripted write and flush must have been consumed.
        assert!(framed.get_ref().calls.is_empty());
    });
}
463 
/// Three frames are queued before a single flush; the mock expects each
/// frame's header and payload in order, followed by one flush.
#[test]
fn write_single_multi_frame_one_packet() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09"),
        data(b"abcdefghi"),
        data(b"\x00\x00\x00\x03"),
        data(b"123"),
        data(b"\x00\x00\x00\x0b"),
        data(b"hello world"),
        flush(),
    };
    let framed = FramedWrite::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        for payload in &["abcdefghi", "123", "hello world"] {
            assert_ready_ok!(framed.as_mut().poll_ready(cx));
            assert_ok!(framed.as_mut().start_send(Bytes::from(*payload)));
        }

        assert_ready_ok!(framed.as_mut().poll_flush(cx));
        assert!(framed.get_ref().calls.is_empty());
    });
}
494 
/// Three frames are sent with a flush after each one; the mock expects a
/// header write, a payload write and a flush per frame.
#[test]
fn write_single_multi_frame_multi_packet() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09"),
        data(b"abcdefghi"),
        flush(),
        data(b"\x00\x00\x00\x03"),
        data(b"123"),
        flush(),
        data(b"\x00\x00\x00\x0b"),
        data(b"hello world"),
        flush(),
    };
    let framed = FramedWrite::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        for payload in &["abcdefghi", "123", "hello world"] {
            assert_ready_ok!(framed.as_mut().poll_ready(cx));
            assert_ok!(framed.as_mut().start_send(Bytes::from(*payload)));

            assert_ready_ok!(framed.as_mut().poll_flush(cx));
        }

        assert!(framed.get_ref().calls.is_empty());
    });
}
531 
/// The mock returns `Pending` twice and accepts the header in two partial
/// writes: flushing reports pending twice before it finally completes.
#[test]
fn write_single_frame_would_block() {
    let transport = mock! {
        Pending,
        data(b"\x00\x00"),
        Pending,
        data(b"\x00\x09"),
        data(b"abcdefghi"),
        flush(),
    };
    let framed = FramedWrite::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_ready(cx));
        assert_ok!(framed.as_mut().start_send(Bytes::from("abcdefghi")));

        assert_pending!(framed.as_mut().poll_flush(cx));
        assert_pending!(framed.as_mut().poll_flush(cx));
        assert_ready_ok!(framed.as_mut().poll_flush(cx));

        assert!(framed.get_ref().calls.is_empty());
    });
}
558 
/// With the builder switched to little-endian, the header written for a
/// nine-byte payload is `09 00 00 00`.
#[test]
fn write_single_frame_little_endian() {
    let transport = mock! {
        data(b"\x09\x00\x00\x00"),
        data(b"abcdefghi"),
        flush(),
    };
    let framed = length_delimited::Builder::new()
        .little_endian()
        .new_write(transport);
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_ready(cx));
        assert_ok!(framed.as_mut().start_send(Bytes::from("abcdefghi")));

        assert_ready_ok!(framed.as_mut().poll_flush(cx));
        assert!(framed.get_ref().calls.is_empty());
    });
}
578 
/// With a one-byte length field, the header written for a nine-byte payload
/// is the single byte `09`.
#[test]
fn write_single_frame_with_short_length_field() {
    let transport = mock! {
        data(b"\x09"),
        data(b"abcdefghi"),
        flush(),
    };
    let framed = length_delimited::Builder::new()
        .length_field_length(1)
        .new_write(transport);
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_ready(cx));
        assert_ok!(framed.as_mut().start_send(Bytes::from("abcdefghi")));

        assert_ready_ok!(framed.as_mut().poll_flush(cx));

        assert!(framed.get_ref().calls.is_empty());
    });
}
599 
/// Sending a six-byte frame with a maximum frame length of five fails in
/// `start_send`; nothing is scripted on the mock.
#[test]
fn write_max_frame_len() {
    let transport = mock! {};
    let framed = length_delimited::Builder::new()
        .max_frame_length(5)
        .new_write(transport);
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_ready(cx));
        assert_err!(framed.as_mut().start_send(Bytes::from("abcdef")));

        assert!(framed.get_ref().calls.is_empty());
    });
}
614 
/// After one six-byte frame has been fully flushed, lowering the maximum
/// frame length to five makes sending the same payload again fail.
#[test]
fn write_update_max_frame_len_at_rest() {
    let transport = mock! {
        data(b"\x00\x00\x00\x06"),
        data(b"abcdef"),
        flush(),
    };
    let framed = length_delimited::Builder::new().new_write(transport);
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_ready(cx));
        assert_ok!(framed.as_mut().start_send(Bytes::from("abcdef")));

        assert_ready_ok!(framed.as_mut().poll_flush(cx));

        framed.encoder_mut().set_max_frame_length(5);

        assert_err!(framed.as_mut().start_send(Bytes::from("abcdef")));

        assert!(framed.get_ref().calls.is_empty());
    });
}
637 
/// Lowering the maximum frame length while a frame is only partially written:
/// the frame in progress still flushes completely, and the next send fails.
#[test]
fn write_update_max_frame_len_in_flight() {
    let transport = mock! {
        data(b"\x00\x00\x00\x06"),
        data(b"ab"),
        Pending,
        data(b"cdef"),
        flush(),
    };
    let framed = length_delimited::Builder::new().new_write(transport);
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_ready(cx));
        assert_ok!(framed.as_mut().start_send(Bytes::from("abcdef")));

        assert_pending!(framed.as_mut().poll_flush(cx));

        framed.encoder_mut().set_max_frame_length(5);

        assert_ready_ok!(framed.as_mut().poll_flush(cx));

        assert_err!(framed.as_mut().start_send(Bytes::from("abcdef")));
        assert!(framed.get_ref().calls.is_empty());
    });
}
663 
/// With nothing scripted, the mock's `poll_write` reports zero bytes written;
/// flushing a queued frame must then fail instead of spinning.
#[test]
fn write_zero() {
    let transport = mock! {};
    let framed = length_delimited::Builder::new().new_write(transport);
    pin_mut!(framed);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(framed.as_mut().poll_ready(cx));
        assert_ok!(framed.as_mut().start_send(Bytes::from("abcdef")));

        assert_ready_err!(framed.as_mut().poll_flush(cx));

        assert!(framed.get_ref().calls.is_empty());
    });
}
678 
/// Encoding into a buffer that already holds as many bytes as were requested
/// at construction must grow the buffer rather than panic, and the header and
/// payload must land after the existing contents.
#[test]
fn encode_overflow() {
    // Test reproducing tokio-rs/tokio#681.
    const PREFILL: usize = 1024;

    let mut codec = length_delimited::Builder::new().new_codec();
    let mut buf = BytesMut::with_capacity(PREFILL);

    // Put some data into the buffer without resizing it to hold more.
    buf.put_slice(&[b'a'; PREFILL]);

    // Trying to encode the length header should resize the buffer if it won't fit.
    codec.encode(Bytes::from("hello"), &mut buf).unwrap();

    // Beyond "did not panic": the default 4-byte big-endian header and the
    // payload are appended right after the pre-filled bytes.
    assert_eq!(buf.len(), PREFILL + 4 + 5);
    assert_eq!(&buf[PREFILL..], &b"\x00\x00\x00\x05hello"[..]);
}
692 
693 // ===== Test utils =====
694 
/// Scripted I/O double used by the tests in this file.
///
/// Each call to `poll_read`, `poll_write` or `poll_flush` pops the front entry
/// of `calls` and replays it. An empty queue is reported as end-of-file for
/// reads, a zero-length write for writes and success for flushes.
struct Mock {
    /// Remaining scripted results, consumed front to back.
    calls: VecDeque<Poll<io::Result<Op>>>,
}
698 
/// One scripted operation in a `Mock` call queue.
///
/// `Debug`, `PartialEq` and `Eq` are derived so that scripted operations can
/// be printed in failure messages and compared in assertions.
#[derive(Debug, PartialEq, Eq)]
enum Op {
    /// Bytes handed to the reader by `poll_read`, or bytes `poll_write`
    /// expects to find at the start of the data it is given.
    Data(Vec<u8>),
    /// An expected call to `poll_flush`.
    Flush,
}
703 
704 use self::Op::*;
705 
706 impl AsyncRead for Mock {
poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, dst: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>707     fn poll_read(
708         mut self: Pin<&mut Self>,
709         _cx: &mut Context<'_>,
710         dst: &mut ReadBuf<'_>,
711     ) -> Poll<io::Result<()>> {
712         match self.calls.pop_front() {
713             Some(Ready(Ok(Op::Data(data)))) => {
714                 debug_assert!(dst.remaining() >= data.len());
715                 dst.put_slice(&data);
716                 Ready(Ok(()))
717             }
718             Some(Ready(Ok(_))) => panic!(),
719             Some(Ready(Err(e))) => Ready(Err(e)),
720             Some(Pending) => Pending,
721             None => Ready(Ok(())),
722         }
723     }
724 }
725 
726 impl AsyncWrite for Mock {
poll_write( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, src: &[u8], ) -> Poll<Result<usize, io::Error>>727     fn poll_write(
728         mut self: Pin<&mut Self>,
729         _cx: &mut Context<'_>,
730         src: &[u8],
731     ) -> Poll<Result<usize, io::Error>> {
732         match self.calls.pop_front() {
733             Some(Ready(Ok(Op::Data(data)))) => {
734                 let len = data.len();
735                 assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
736                 assert_eq!(&data[..], &src[..len]);
737                 Ready(Ok(len))
738             }
739             Some(Ready(Ok(_))) => panic!(),
740             Some(Ready(Err(e))) => Ready(Err(e)),
741             Some(Pending) => Pending,
742             None => Ready(Ok(0)),
743         }
744     }
745 
poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>746     fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
747         match self.calls.pop_front() {
748             Some(Ready(Ok(Op::Flush))) => Ready(Ok(())),
749             Some(Ready(Ok(_))) => panic!(),
750             Some(Ready(Err(e))) => Ready(Err(e)),
751             Some(Pending) => Pending,
752             None => Ready(Ok(())),
753         }
754     }
755 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>756     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
757         Ready(Ok(()))
758     }
759 }
760 
761 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op762     fn from(src: &'a [u8]) -> Op {
763         Op::Data(src.into())
764     }
765 }
766 
767 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op768     fn from(src: Vec<u8>) -> Op {
769         Op::Data(src)
770     }
771 }
772 
data(bytes: &[u8]) -> Poll<io::Result<Op>>773 fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
774     Ready(Ok(bytes.into()))
775 }
776 
flush() -> Poll<io::Result<Op>>777 fn flush() -> Poll<io::Result<Op>> {
778     Ready(Ok(Flush))
779 }
780