1 #![warn(rust_2018_idioms)]
2
3 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
4 use tokio_test::task;
5 use tokio_test::{
6 assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
7 };
8 use tokio_util::codec::*;
9
10 use bytes::{BufMut, Bytes, BytesMut};
11 use futures::{pin_mut, Sink, Stream};
12 use std::collections::VecDeque;
13 use std::io;
14 use std::pin::Pin;
15 use std::task::Poll::*;
16 use std::task::{Context, Poll};
17
18 macro_rules! mock {
19 ($($x:expr,)*) => {{
20 let mut v = VecDeque::new();
21 v.extend(vec![$($x),*]);
22 Mock { calls: v }
23 }};
24 }
25
26 macro_rules! assert_next_eq {
27 ($io:ident, $expect:expr) => {{
28 task::spawn(()).enter(|cx, _| {
29 let res = assert_ready!($io.as_mut().poll_next(cx));
30 match res {
31 Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
32 Some(Err(e)) => panic!("error = {:?}", e),
33 None => panic!("none"),
34 }
35 });
36 }};
37 }
38
39 macro_rules! assert_next_pending {
40 ($io:ident) => {{
41 task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
42 Ready(Some(Ok(v))) => panic!("value = {:?}", v),
43 Ready(Some(Err(e))) => panic!("error = {:?}", e),
44 Ready(None) => panic!("done"),
45 Pending => {}
46 });
47 }};
48 }
49
50 macro_rules! assert_next_err {
51 ($io:ident) => {{
52 task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
53 Ready(Some(Ok(v))) => panic!("value = {:?}", v),
54 Ready(Some(Err(_))) => {}
55 Ready(None) => panic!("done"),
56 Pending => panic!("pending"),
57 });
58 }};
59 }
60
61 macro_rules! assert_done {
62 ($io:ident) => {{
63 task::spawn(()).enter(|cx, _| {
64 let res = assert_ready!($io.as_mut().poll_next(cx));
65 match res {
66 Some(Ok(v)) => panic!("value = {:?}", v),
67 Some(Err(e)) => panic!("error = {:?}", e),
68 None => {}
69 }
70 });
71 }};
72 }
73
74 #[test]
read_empty_io_yields_nothing()75 fn read_empty_io_yields_nothing() {
76 let io = Box::pin(FramedRead::new(mock!(), LengthDelimitedCodec::new()));
77 pin_mut!(io);
78
79 assert_done!(io);
80 }
81
82 #[test]
read_single_frame_one_packet()83 fn read_single_frame_one_packet() {
84 let io = FramedRead::new(
85 mock! {
86 data(b"\x00\x00\x00\x09abcdefghi"),
87 },
88 LengthDelimitedCodec::new(),
89 );
90 pin_mut!(io);
91
92 assert_next_eq!(io, b"abcdefghi");
93 assert_done!(io);
94 }
95
96 #[test]
read_single_frame_one_packet_little_endian()97 fn read_single_frame_one_packet_little_endian() {
98 let io = length_delimited::Builder::new()
99 .little_endian()
100 .new_read(mock! {
101 data(b"\x09\x00\x00\x00abcdefghi"),
102 });
103 pin_mut!(io);
104
105 assert_next_eq!(io, b"abcdefghi");
106 assert_done!(io);
107 }
108
109 #[test]
read_single_frame_one_packet_native_endian()110 fn read_single_frame_one_packet_native_endian() {
111 let d = if cfg!(target_endian = "big") {
112 b"\x00\x00\x00\x09abcdefghi"
113 } else {
114 b"\x09\x00\x00\x00abcdefghi"
115 };
116 let io = length_delimited::Builder::new()
117 .native_endian()
118 .new_read(mock! {
119 data(d),
120 });
121 pin_mut!(io);
122
123 assert_next_eq!(io, b"abcdefghi");
124 assert_done!(io);
125 }
126
127 #[test]
read_single_multi_frame_one_packet()128 fn read_single_multi_frame_one_packet() {
129 let mut d: Vec<u8> = vec![];
130 d.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
131 d.extend_from_slice(b"\x00\x00\x00\x03123");
132 d.extend_from_slice(b"\x00\x00\x00\x0bhello world");
133
134 let io = FramedRead::new(
135 mock! {
136 data(&d),
137 },
138 LengthDelimitedCodec::new(),
139 );
140 pin_mut!(io);
141
142 assert_next_eq!(io, b"abcdefghi");
143 assert_next_eq!(io, b"123");
144 assert_next_eq!(io, b"hello world");
145 assert_done!(io);
146 }
147
148 #[test]
read_single_frame_multi_packet()149 fn read_single_frame_multi_packet() {
150 let io = FramedRead::new(
151 mock! {
152 data(b"\x00\x00"),
153 data(b"\x00\x09abc"),
154 data(b"defghi"),
155 },
156 LengthDelimitedCodec::new(),
157 );
158 pin_mut!(io);
159
160 assert_next_eq!(io, b"abcdefghi");
161 assert_done!(io);
162 }
163
164 #[test]
read_multi_frame_multi_packet()165 fn read_multi_frame_multi_packet() {
166 let io = FramedRead::new(
167 mock! {
168 data(b"\x00\x00"),
169 data(b"\x00\x09abc"),
170 data(b"defghi"),
171 data(b"\x00\x00\x00\x0312"),
172 data(b"3\x00\x00\x00\x0bhello world"),
173 },
174 LengthDelimitedCodec::new(),
175 );
176 pin_mut!(io);
177
178 assert_next_eq!(io, b"abcdefghi");
179 assert_next_eq!(io, b"123");
180 assert_next_eq!(io, b"hello world");
181 assert_done!(io);
182 }
183
184 #[test]
read_single_frame_multi_packet_wait()185 fn read_single_frame_multi_packet_wait() {
186 let io = FramedRead::new(
187 mock! {
188 data(b"\x00\x00"),
189 Pending,
190 data(b"\x00\x09abc"),
191 Pending,
192 data(b"defghi"),
193 Pending,
194 },
195 LengthDelimitedCodec::new(),
196 );
197 pin_mut!(io);
198
199 assert_next_pending!(io);
200 assert_next_pending!(io);
201 assert_next_eq!(io, b"abcdefghi");
202 assert_next_pending!(io);
203 assert_done!(io);
204 }
205
206 #[test]
read_multi_frame_multi_packet_wait()207 fn read_multi_frame_multi_packet_wait() {
208 let io = FramedRead::new(
209 mock! {
210 data(b"\x00\x00"),
211 Pending,
212 data(b"\x00\x09abc"),
213 Pending,
214 data(b"defghi"),
215 Pending,
216 data(b"\x00\x00\x00\x0312"),
217 Pending,
218 data(b"3\x00\x00\x00\x0bhello world"),
219 Pending,
220 },
221 LengthDelimitedCodec::new(),
222 );
223 pin_mut!(io);
224
225 assert_next_pending!(io);
226 assert_next_pending!(io);
227 assert_next_eq!(io, b"abcdefghi");
228 assert_next_pending!(io);
229 assert_next_pending!(io);
230 assert_next_eq!(io, b"123");
231 assert_next_eq!(io, b"hello world");
232 assert_next_pending!(io);
233 assert_done!(io);
234 }
235
236 #[test]
read_incomplete_head()237 fn read_incomplete_head() {
238 let io = FramedRead::new(
239 mock! {
240 data(b"\x00\x00"),
241 },
242 LengthDelimitedCodec::new(),
243 );
244 pin_mut!(io);
245
246 assert_next_err!(io);
247 }
248
249 #[test]
read_incomplete_head_multi()250 fn read_incomplete_head_multi() {
251 let io = FramedRead::new(
252 mock! {
253 Pending,
254 data(b"\x00"),
255 Pending,
256 },
257 LengthDelimitedCodec::new(),
258 );
259 pin_mut!(io);
260
261 assert_next_pending!(io);
262 assert_next_pending!(io);
263 assert_next_err!(io);
264 }
265
266 #[test]
read_incomplete_payload()267 fn read_incomplete_payload() {
268 let io = FramedRead::new(
269 mock! {
270 data(b"\x00\x00\x00\x09ab"),
271 Pending,
272 data(b"cd"),
273 Pending,
274 },
275 LengthDelimitedCodec::new(),
276 );
277 pin_mut!(io);
278
279 assert_next_pending!(io);
280 assert_next_pending!(io);
281 assert_next_err!(io);
282 }
283
284 #[test]
read_max_frame_len()285 fn read_max_frame_len() {
286 let io = length_delimited::Builder::new()
287 .max_frame_length(5)
288 .new_read(mock! {
289 data(b"\x00\x00\x00\x09abcdefghi"),
290 });
291 pin_mut!(io);
292
293 assert_next_err!(io);
294 }
295
296 #[test]
read_update_max_frame_len_at_rest()297 fn read_update_max_frame_len_at_rest() {
298 let io = length_delimited::Builder::new().new_read(mock! {
299 data(b"\x00\x00\x00\x09abcdefghi"),
300 data(b"\x00\x00\x00\x09abcdefghi"),
301 });
302 pin_mut!(io);
303
304 assert_next_eq!(io, b"abcdefghi");
305 io.decoder_mut().set_max_frame_length(5);
306 assert_next_err!(io);
307 }
308
309 #[test]
read_update_max_frame_len_in_flight()310 fn read_update_max_frame_len_in_flight() {
311 let io = length_delimited::Builder::new().new_read(mock! {
312 data(b"\x00\x00\x00\x09abcd"),
313 Pending,
314 data(b"efghi"),
315 data(b"\x00\x00\x00\x09abcdefghi"),
316 });
317 pin_mut!(io);
318
319 assert_next_pending!(io);
320 io.decoder_mut().set_max_frame_length(5);
321 assert_next_eq!(io, b"abcdefghi");
322 assert_next_err!(io);
323 }
324
325 #[test]
read_one_byte_length_field()326 fn read_one_byte_length_field() {
327 let io = length_delimited::Builder::new()
328 .length_field_length(1)
329 .new_read(mock! {
330 data(b"\x09abcdefghi"),
331 });
332 pin_mut!(io);
333
334 assert_next_eq!(io, b"abcdefghi");
335 assert_done!(io);
336 }
337
338 #[test]
read_header_offset()339 fn read_header_offset() {
340 let io = length_delimited::Builder::new()
341 .length_field_length(2)
342 .length_field_offset(4)
343 .new_read(mock! {
344 data(b"zzzz\x00\x09abcdefghi"),
345 });
346 pin_mut!(io);
347
348 assert_next_eq!(io, b"abcdefghi");
349 assert_done!(io);
350 }
351
352 #[test]
read_single_multi_frame_one_packet_skip_none_adjusted()353 fn read_single_multi_frame_one_packet_skip_none_adjusted() {
354 let mut d: Vec<u8> = vec![];
355 d.extend_from_slice(b"xx\x00\x09abcdefghi");
356 d.extend_from_slice(b"yy\x00\x03123");
357 d.extend_from_slice(b"zz\x00\x0bhello world");
358
359 let io = length_delimited::Builder::new()
360 .length_field_length(2)
361 .length_field_offset(2)
362 .num_skip(0)
363 .length_adjustment(4)
364 .new_read(mock! {
365 data(&d),
366 });
367 pin_mut!(io);
368
369 assert_next_eq!(io, b"xx\x00\x09abcdefghi");
370 assert_next_eq!(io, b"yy\x00\x03123");
371 assert_next_eq!(io, b"zz\x00\x0bhello world");
372 assert_done!(io);
373 }
374
375 #[test]
read_single_frame_length_adjusted()376 fn read_single_frame_length_adjusted() {
377 let mut d: Vec<u8> = vec![];
378 d.extend_from_slice(b"\x00\x00\x0b\x0cHello world");
379
380 let io = length_delimited::Builder::new()
381 .length_field_offset(0)
382 .length_field_length(3)
383 .length_adjustment(0)
384 .num_skip(4)
385 .new_read(mock! {
386 data(&d),
387 });
388 pin_mut!(io);
389
390 assert_next_eq!(io, b"Hello world");
391 assert_done!(io);
392 }
393
394 #[test]
read_single_multi_frame_one_packet_length_includes_head()395 fn read_single_multi_frame_one_packet_length_includes_head() {
396 let mut d: Vec<u8> = vec![];
397 d.extend_from_slice(b"\x00\x0babcdefghi");
398 d.extend_from_slice(b"\x00\x05123");
399 d.extend_from_slice(b"\x00\x0dhello world");
400
401 let io = length_delimited::Builder::new()
402 .length_field_length(2)
403 .length_adjustment(-2)
404 .new_read(mock! {
405 data(&d),
406 });
407 pin_mut!(io);
408
409 assert_next_eq!(io, b"abcdefghi");
410 assert_next_eq!(io, b"123");
411 assert_next_eq!(io, b"hello world");
412 assert_done!(io);
413 }
414
415 #[test]
write_single_frame_length_adjusted()416 fn write_single_frame_length_adjusted() {
417 let io = length_delimited::Builder::new()
418 .length_adjustment(-2)
419 .new_write(mock! {
420 data(b"\x00\x00\x00\x0b"),
421 data(b"abcdefghi"),
422 flush(),
423 });
424 pin_mut!(io);
425
426 task::spawn(()).enter(|cx, _| {
427 assert_ready_ok!(io.as_mut().poll_ready(cx));
428 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
429 assert_ready_ok!(io.as_mut().poll_flush(cx));
430 assert!(io.get_ref().calls.is_empty());
431 });
432 }
433
434 #[test]
write_nothing_yields_nothing()435 fn write_nothing_yields_nothing() {
436 let io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
437 pin_mut!(io);
438
439 task::spawn(()).enter(|cx, _| {
440 assert_ready_ok!(io.poll_flush(cx));
441 });
442 }
443
444 #[test]
write_single_frame_one_packet()445 fn write_single_frame_one_packet() {
446 let io = FramedWrite::new(
447 mock! {
448 data(b"\x00\x00\x00\x09"),
449 data(b"abcdefghi"),
450 flush(),
451 },
452 LengthDelimitedCodec::new(),
453 );
454 pin_mut!(io);
455
456 task::spawn(()).enter(|cx, _| {
457 assert_ready_ok!(io.as_mut().poll_ready(cx));
458 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
459 assert_ready_ok!(io.as_mut().poll_flush(cx));
460 assert!(io.get_ref().calls.is_empty());
461 });
462 }
463
464 #[test]
write_single_multi_frame_one_packet()465 fn write_single_multi_frame_one_packet() {
466 let io = FramedWrite::new(
467 mock! {
468 data(b"\x00\x00\x00\x09"),
469 data(b"abcdefghi"),
470 data(b"\x00\x00\x00\x03"),
471 data(b"123"),
472 data(b"\x00\x00\x00\x0b"),
473 data(b"hello world"),
474 flush(),
475 },
476 LengthDelimitedCodec::new(),
477 );
478 pin_mut!(io);
479
480 task::spawn(()).enter(|cx, _| {
481 assert_ready_ok!(io.as_mut().poll_ready(cx));
482 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
483
484 assert_ready_ok!(io.as_mut().poll_ready(cx));
485 assert_ok!(io.as_mut().start_send(Bytes::from("123")));
486
487 assert_ready_ok!(io.as_mut().poll_ready(cx));
488 assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
489
490 assert_ready_ok!(io.as_mut().poll_flush(cx));
491 assert!(io.get_ref().calls.is_empty());
492 });
493 }
494
495 #[test]
write_single_multi_frame_multi_packet()496 fn write_single_multi_frame_multi_packet() {
497 let io = FramedWrite::new(
498 mock! {
499 data(b"\x00\x00\x00\x09"),
500 data(b"abcdefghi"),
501 flush(),
502 data(b"\x00\x00\x00\x03"),
503 data(b"123"),
504 flush(),
505 data(b"\x00\x00\x00\x0b"),
506 data(b"hello world"),
507 flush(),
508 },
509 LengthDelimitedCodec::new(),
510 );
511 pin_mut!(io);
512
513 task::spawn(()).enter(|cx, _| {
514 assert_ready_ok!(io.as_mut().poll_ready(cx));
515 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
516
517 assert_ready_ok!(io.as_mut().poll_flush(cx));
518
519 assert_ready_ok!(io.as_mut().poll_ready(cx));
520 assert_ok!(io.as_mut().start_send(Bytes::from("123")));
521
522 assert_ready_ok!(io.as_mut().poll_flush(cx));
523
524 assert_ready_ok!(io.as_mut().poll_ready(cx));
525 assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
526
527 assert_ready_ok!(io.as_mut().poll_flush(cx));
528 assert!(io.get_ref().calls.is_empty());
529 });
530 }
531
532 #[test]
write_single_frame_would_block()533 fn write_single_frame_would_block() {
534 let io = FramedWrite::new(
535 mock! {
536 Pending,
537 data(b"\x00\x00"),
538 Pending,
539 data(b"\x00\x09"),
540 data(b"abcdefghi"),
541 flush(),
542 },
543 LengthDelimitedCodec::new(),
544 );
545 pin_mut!(io);
546
547 task::spawn(()).enter(|cx, _| {
548 assert_ready_ok!(io.as_mut().poll_ready(cx));
549 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
550
551 assert_pending!(io.as_mut().poll_flush(cx));
552 assert_pending!(io.as_mut().poll_flush(cx));
553 assert_ready_ok!(io.as_mut().poll_flush(cx));
554
555 assert!(io.get_ref().calls.is_empty());
556 });
557 }
558
559 #[test]
write_single_frame_little_endian()560 fn write_single_frame_little_endian() {
561 let io = length_delimited::Builder::new()
562 .little_endian()
563 .new_write(mock! {
564 data(b"\x09\x00\x00\x00"),
565 data(b"abcdefghi"),
566 flush(),
567 });
568 pin_mut!(io);
569
570 task::spawn(()).enter(|cx, _| {
571 assert_ready_ok!(io.as_mut().poll_ready(cx));
572 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
573
574 assert_ready_ok!(io.as_mut().poll_flush(cx));
575 assert!(io.get_ref().calls.is_empty());
576 });
577 }
578
579 #[test]
write_single_frame_with_short_length_field()580 fn write_single_frame_with_short_length_field() {
581 let io = length_delimited::Builder::new()
582 .length_field_length(1)
583 .new_write(mock! {
584 data(b"\x09"),
585 data(b"abcdefghi"),
586 flush(),
587 });
588 pin_mut!(io);
589
590 task::spawn(()).enter(|cx, _| {
591 assert_ready_ok!(io.as_mut().poll_ready(cx));
592 assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
593
594 assert_ready_ok!(io.as_mut().poll_flush(cx));
595
596 assert!(io.get_ref().calls.is_empty());
597 });
598 }
599
600 #[test]
write_max_frame_len()601 fn write_max_frame_len() {
602 let io = length_delimited::Builder::new()
603 .max_frame_length(5)
604 .new_write(mock! {});
605 pin_mut!(io);
606
607 task::spawn(()).enter(|cx, _| {
608 assert_ready_ok!(io.as_mut().poll_ready(cx));
609 assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
610
611 assert!(io.get_ref().calls.is_empty());
612 });
613 }
614
615 #[test]
write_update_max_frame_len_at_rest()616 fn write_update_max_frame_len_at_rest() {
617 let io = length_delimited::Builder::new().new_write(mock! {
618 data(b"\x00\x00\x00\x06"),
619 data(b"abcdef"),
620 flush(),
621 });
622 pin_mut!(io);
623
624 task::spawn(()).enter(|cx, _| {
625 assert_ready_ok!(io.as_mut().poll_ready(cx));
626 assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
627
628 assert_ready_ok!(io.as_mut().poll_flush(cx));
629
630 io.encoder_mut().set_max_frame_length(5);
631
632 assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
633
634 assert!(io.get_ref().calls.is_empty());
635 });
636 }
637
638 #[test]
write_update_max_frame_len_in_flight()639 fn write_update_max_frame_len_in_flight() {
640 let io = length_delimited::Builder::new().new_write(mock! {
641 data(b"\x00\x00\x00\x06"),
642 data(b"ab"),
643 Pending,
644 data(b"cdef"),
645 flush(),
646 });
647 pin_mut!(io);
648
649 task::spawn(()).enter(|cx, _| {
650 assert_ready_ok!(io.as_mut().poll_ready(cx));
651 assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
652
653 assert_pending!(io.as_mut().poll_flush(cx));
654
655 io.encoder_mut().set_max_frame_length(5);
656
657 assert_ready_ok!(io.as_mut().poll_flush(cx));
658
659 assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
660 assert!(io.get_ref().calls.is_empty());
661 });
662 }
663
664 #[test]
write_zero()665 fn write_zero() {
666 let io = length_delimited::Builder::new().new_write(mock! {});
667 pin_mut!(io);
668
669 task::spawn(()).enter(|cx, _| {
670 assert_ready_ok!(io.as_mut().poll_ready(cx));
671 assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
672
673 assert_ready_err!(io.as_mut().poll_flush(cx));
674
675 assert!(io.get_ref().calls.is_empty());
676 });
677 }
678
679 #[test]
encode_overflow()680 fn encode_overflow() {
681 // Test reproducing tokio-rs/tokio#681.
682 let mut codec = length_delimited::Builder::new().new_codec();
683 let mut buf = BytesMut::with_capacity(1024);
684
685 // Put some data into the buffer without resizing it to hold more.
686 let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>();
687 buf.put_slice(&some_as[..]);
688
689 // Trying to encode the length header should resize the buffer if it won't fit.
690 codec.encode(Bytes::from("hello"), &mut buf).unwrap();
691 }
692
693 // ===== Test utils =====
694
695 struct Mock {
696 calls: VecDeque<Poll<io::Result<Op>>>,
697 }
698
699 enum Op {
700 Data(Vec<u8>),
701 Flush,
702 }
703
704 use self::Op::*;
705
706 impl AsyncRead for Mock {
poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, dst: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>707 fn poll_read(
708 mut self: Pin<&mut Self>,
709 _cx: &mut Context<'_>,
710 dst: &mut ReadBuf<'_>,
711 ) -> Poll<io::Result<()>> {
712 match self.calls.pop_front() {
713 Some(Ready(Ok(Op::Data(data)))) => {
714 debug_assert!(dst.remaining() >= data.len());
715 dst.put_slice(&data);
716 Ready(Ok(()))
717 }
718 Some(Ready(Ok(_))) => panic!(),
719 Some(Ready(Err(e))) => Ready(Err(e)),
720 Some(Pending) => Pending,
721 None => Ready(Ok(())),
722 }
723 }
724 }
725
726 impl AsyncWrite for Mock {
poll_write( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, src: &[u8], ) -> Poll<Result<usize, io::Error>>727 fn poll_write(
728 mut self: Pin<&mut Self>,
729 _cx: &mut Context<'_>,
730 src: &[u8],
731 ) -> Poll<Result<usize, io::Error>> {
732 match self.calls.pop_front() {
733 Some(Ready(Ok(Op::Data(data)))) => {
734 let len = data.len();
735 assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
736 assert_eq!(&data[..], &src[..len]);
737 Ready(Ok(len))
738 }
739 Some(Ready(Ok(_))) => panic!(),
740 Some(Ready(Err(e))) => Ready(Err(e)),
741 Some(Pending) => Pending,
742 None => Ready(Ok(0)),
743 }
744 }
745
poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>746 fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
747 match self.calls.pop_front() {
748 Some(Ready(Ok(Op::Flush))) => Ready(Ok(())),
749 Some(Ready(Ok(_))) => panic!(),
750 Some(Ready(Err(e))) => Ready(Err(e)),
751 Some(Pending) => Pending,
752 None => Ready(Ok(())),
753 }
754 }
755
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>756 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
757 Ready(Ok(()))
758 }
759 }
760
761 impl<'a> From<&'a [u8]> for Op {
from(src: &'a [u8]) -> Op762 fn from(src: &'a [u8]) -> Op {
763 Op::Data(src.into())
764 }
765 }
766
767 impl From<Vec<u8>> for Op {
from(src: Vec<u8>) -> Op768 fn from(src: Vec<u8>) -> Op {
769 Op::Data(src)
770 }
771 }
772
data(bytes: &[u8]) -> Poll<io::Result<Op>>773 fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
774 Ready(Ok(bytes.into()))
775 }
776
flush() -> Poll<io::Result<Op>>777 fn flush() -> Poll<io::Result<Op>> {
778 Ready(Ok(Flush))
779 }
780