1 // Copyright (C) 2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 use super::Error;
28 use super::Result;
29 
30 use crate::octets;
31 
32 use super::frame;
33 
34 pub const HTTP3_CONTROL_STREAM_TYPE_ID: u64 = 0x0;
35 pub const HTTP3_PUSH_STREAM_TYPE_ID: u64 = 0x1;
36 pub const QPACK_ENCODER_STREAM_TYPE_ID: u64 = 0x2;
37 pub const QPACK_DECODER_STREAM_TYPE_ID: u64 = 0x3;
38 
39 const MAX_STATE_BUF_SIZE: usize = (1 << 24) - 1;
40 
41 #[derive(Clone, Copy, Debug, PartialEq)]
42 pub enum Type {
43     Control,
44     Request,
45     Push,
46     QpackEncoder,
47     QpackDecoder,
48     Unknown,
49 }
50 
51 #[derive(Clone, Copy, Debug, PartialEq)]
52 pub enum State {
53     /// Reading the stream's type.
54     StreamType,
55 
56     /// Reading the stream's current frame's type.
57     FrameType,
58 
59     /// Reading the stream's current frame's payload length.
60     FramePayloadLen,
61 
62     /// Reading the stream's current frame's payload.
63     FramePayload,
64 
65     /// Reading DATA payload.
66     Data,
67 
68     /// Reading the push ID.
69     PushId,
70 
71     /// Reading a QPACK instruction.
72     QpackInstruction,
73 
74     /// Reading and discarding data.
75     Drain,
76 
77     /// All data has been read.
78     Finished,
79 }
80 
81 impl Type {
deserialize(v: u64) -> Result<Type>82     pub fn deserialize(v: u64) -> Result<Type> {
83         match v {
84             HTTP3_CONTROL_STREAM_TYPE_ID => Ok(Type::Control),
85             HTTP3_PUSH_STREAM_TYPE_ID => Ok(Type::Push),
86             QPACK_ENCODER_STREAM_TYPE_ID => Ok(Type::QpackEncoder),
87             QPACK_DECODER_STREAM_TYPE_ID => Ok(Type::QpackDecoder),
88 
89             _ => Ok(Type::Unknown),
90         }
91     }
92 }
93 
94 /// An HTTP/3 stream.
95 ///
96 /// This maintains the HTTP/3 state for streams of any type (control, request,
97 /// QPACK, ...).
98 ///
99 /// A number of bytes, depending on the current stream's state, is read from the
100 /// transport stream into the HTTP/3 stream's "state buffer". This intermediate
101 /// buffering is required due to the fact that data read from the transport
102 /// might not be complete (e.g. a varint might be split across multiple QUIC
103 /// packets).
104 ///
105 /// When enough data to complete the current state has been buffered, it is
106 /// consumed from the state buffer and the stream is transitioned to the next
107 /// state (see `State` for a list of possible states).
108 #[derive(Debug)]
109 pub struct Stream {
110     /// The corresponding transport stream's ID.
111     id: u64,
112 
113     /// The stream's type (if known).
114     ty: Option<Type>,
115 
116     /// The current stream state.
117     state: State,
118 
119     /// The buffer holding partial data for the current state.
120     state_buf: Vec<u8>,
121 
122     /// The expected amount of bytes required to complete the state.
123     state_len: usize,
124 
125     /// The write offset in the state buffer, that is, how many bytes have
126     /// already been read from the transport for the current state. When
127     /// it reaches `stream_len` the state can be completed.
128     state_off: usize,
129 
130     /// The type of the frame currently being parsed.
131     frame_type: Option<u64>,
132 
133     /// Whether the stream was created locally, or by the peer.
134     is_local: bool,
135 
136     /// Whether the stream has been remotely initialized.
137     remote_initialized: bool,
138 
139     /// Whether the stream has been locally initialized.
140     local_initialized: bool,
141 
142     /// Whether a `Data` event has been triggered for this stream.
143     data_event_triggered: bool,
144 }
145 
146 impl Stream {
147     /// Creates a new HTTP/3 stream.
148     ///
149     /// The `is_local` parameter indicates whether the stream was created by the
150     /// local endpoint, or by the peer.
new(id: u64, is_local: bool) -> Stream151     pub fn new(id: u64, is_local: bool) -> Stream {
152         let (ty, state) = if crate::stream::is_bidi(id) {
153             // All bidirectional streams are "request" streams, so we don't
154             // need to read the stream type.
155             (Some(Type::Request), State::FrameType)
156         } else {
157             // The stream's type is yet to be determined.
158             (None, State::StreamType)
159         };
160 
161         Stream {
162             id,
163             ty,
164 
165             state,
166 
167             // Pre-allocate a buffer to avoid multiple tiny early allocations.
168             state_buf: vec![0; 16],
169 
170             // Expect one byte for the initial state, to parse the initial
171             // varint length.
172             state_len: 1,
173             state_off: 0,
174 
175             frame_type: None,
176 
177             is_local,
178             remote_initialized: false,
179             local_initialized: false,
180 
181             data_event_triggered: false,
182         }
183     }
184 
ty(&self) -> Option<Type>185     pub fn ty(&self) -> Option<Type> {
186         self.ty
187     }
188 
state(&self) -> State189     pub fn state(&self) -> State {
190         self.state
191     }
192 
193     /// Sets the stream's type and transitions to the next state.
set_ty(&mut self, ty: Type) -> Result<()>194     pub fn set_ty(&mut self, ty: Type) -> Result<()> {
195         assert_eq!(self.state, State::StreamType);
196 
197         self.ty = Some(ty);
198 
199         let state = match ty {
200             Type::Control | Type::Request => State::FrameType,
201 
202             Type::Push => State::PushId,
203 
204             Type::QpackEncoder | Type::QpackDecoder => {
205                 self.remote_initialized = true;
206 
207                 State::QpackInstruction
208             },
209 
210             Type::Unknown => State::Drain,
211         };
212 
213         self.state_transition(state, 1, true)?;
214 
215         Ok(())
216     }
217 
218     /// Sets the push ID and transitions to the next state.
set_push_id(&mut self, _id: u64) -> Result<()>219     pub fn set_push_id(&mut self, _id: u64) -> Result<()> {
220         assert_eq!(self.state, State::PushId);
221 
222         // TODO: implement push ID.
223 
224         self.state_transition(State::FrameType, 1, true)?;
225 
226         Ok(())
227     }
228 
229     /// Sets the frame type and transitions to the next state.
set_frame_type(&mut self, ty: u64) -> Result<()>230     pub fn set_frame_type(&mut self, ty: u64) -> Result<()> {
231         assert_eq!(self.state, State::FrameType);
232 
233         // Only expect frames on Control, Request and Push streams.
234         match self.ty {
235             Some(Type::Control) => {
236                 // Control stream starts uninitialized and only SETTINGS is
237                 // accepted in that state. Other frames cause an error. Once
238                 // initialized, no more SETTINGS are permitted.
239                 match (ty, self.remote_initialized) {
240                     // Initialize control stream.
241                     (frame::SETTINGS_FRAME_TYPE_ID, false) =>
242                         self.remote_initialized = true,
243 
244                     // Non-SETTINGS frames not allowed on control stream
245                     // before initialization.
246                     (_, false) => return Err(Error::MissingSettings),
247 
248                     // Additional SETTINGS frame.
249                     (frame::SETTINGS_FRAME_TYPE_ID, true) =>
250                         return Err(Error::FrameUnexpected),
251 
252                     // Frames that can't be received on control stream
253                     // after initialization.
254                     (frame::DATA_FRAME_TYPE_ID, true) =>
255                         return Err(Error::FrameUnexpected),
256 
257                     (frame::HEADERS_FRAME_TYPE_ID, true) =>
258                         return Err(Error::FrameUnexpected),
259 
260                     (frame::PUSH_PROMISE_FRAME_TYPE_ID, true) =>
261                         return Err(Error::FrameUnexpected),
262 
263                     // All other frames are ignored after initialization.
264                     (_, true) => (),
265                 }
266             },
267 
268             Some(Type::Request) => {
269                 // Request stream starts uninitialized and only HEADERS
270                 // is accepted. Other frames cause an error.
271                 if !self.is_local {
272                     match (ty, self.remote_initialized) {
273                         (frame::HEADERS_FRAME_TYPE_ID, false) =>
274                             self.remote_initialized = true,
275 
276                         (frame::DATA_FRAME_TYPE_ID, false) =>
277                             return Err(Error::FrameUnexpected),
278 
279                         (frame::CANCEL_PUSH_FRAME_TYPE_ID, _) =>
280                             return Err(Error::FrameUnexpected),
281 
282                         (frame::SETTINGS_FRAME_TYPE_ID, _) =>
283                             return Err(Error::FrameUnexpected),
284 
285                         (frame::GOAWAY_FRAME_TYPE_ID, _) =>
286                             return Err(Error::FrameUnexpected),
287 
288                         (frame::MAX_PUSH_FRAME_TYPE_ID, _) =>
289                             return Err(Error::FrameUnexpected),
290 
291                         // All other frames can be ignored regardless of stream
292                         // state.
293                         _ => (),
294                     }
295                 }
296             },
297 
298             Some(Type::Push) => {
299                 match ty {
300                     // Frames that can never be received on request streams.
301                     frame::CANCEL_PUSH_FRAME_TYPE_ID =>
302                         return Err(Error::FrameUnexpected),
303 
304                     frame::SETTINGS_FRAME_TYPE_ID =>
305                         return Err(Error::FrameUnexpected),
306 
307                     frame::PUSH_PROMISE_FRAME_TYPE_ID =>
308                         return Err(Error::FrameUnexpected),
309 
310                     frame::GOAWAY_FRAME_TYPE_ID =>
311                         return Err(Error::FrameUnexpected),
312 
313                     frame::MAX_PUSH_FRAME_TYPE_ID =>
314                         return Err(Error::FrameUnexpected),
315 
316                     _ => (),
317                 }
318             },
319 
320             _ => return Err(Error::FrameUnexpected),
321         }
322 
323         self.frame_type = Some(ty);
324 
325         self.state_transition(State::FramePayloadLen, 1, true)?;
326 
327         Ok(())
328     }
329 
330     /// Sets the frame's payload length and transitions to the next state.
set_frame_payload_len(&mut self, len: u64) -> Result<()>331     pub fn set_frame_payload_len(&mut self, len: u64) -> Result<()> {
332         assert_eq!(self.state, State::FramePayloadLen);
333 
334         // Only expect frames on Control, Request and Push streams.
335         if self.ty == Some(Type::Control) ||
336             self.ty == Some(Type::Request) ||
337             self.ty == Some(Type::Push)
338         {
339             let (state, resize) = match self.frame_type {
340                 Some(frame::DATA_FRAME_TYPE_ID) => (State::Data, false),
341 
342                 _ => (State::FramePayload, true),
343             };
344 
345             self.state_transition(state, len as usize, resize)?;
346 
347             return Ok(());
348         }
349 
350         Err(Error::InternalError)
351     }
352 
353     /// Tries to fill the state buffer by reading data from the corresponding
354     /// transport stream.
355     ///
356     /// When not enough data can be read to complete the state, this returns
357     /// `Error::Done`.
try_fill_buffer( &mut self, conn: &mut crate::Connection, ) -> Result<()>358     pub fn try_fill_buffer(
359         &mut self, conn: &mut crate::Connection,
360     ) -> Result<()> {
361         let buf = &mut self.state_buf[self.state_off..self.state_len];
362 
363         let read = match conn.stream_recv(self.id, buf) {
364             Ok((len, _)) => len,
365 
366             Err(e) => {
367                 // The stream is not readable anymore, so re-arm the Data event.
368                 if e == crate::Error::Done {
369                     self.reset_data_event();
370                 }
371 
372                 return Err(e.into());
373             },
374         };
375 
376         trace!(
377             "{} read {} bytes on stream {}",
378             conn.trace_id(),
379             read,
380             self.id,
381         );
382 
383         self.state_off += read;
384 
385         if !self.state_buffer_complete() {
386             self.reset_data_event();
387 
388             return Err(Error::Done);
389         }
390 
391         Ok(())
392     }
393 
394     /// Initialize the local part of the stream.
initialize_local(&mut self)395     pub fn initialize_local(&mut self) {
396         self.local_initialized = true
397     }
398 
399     /// Whether the stream has been locally initialized.
local_initialized(&self) -> bool400     pub fn local_initialized(&self) -> bool {
401         self.local_initialized
402     }
403 
404     /// Tries to fill the state buffer by reading data from the given cursor.
405     ///
406     /// This is intended to replace `try_fill_buffer()` in tests, in order to
407     /// avoid having to setup a transport connection.
408     #[cfg(test)]
try_fill_buffer_for_tests( &mut self, stream: &mut std::io::Cursor<Vec<u8>>, ) -> Result<()>409     fn try_fill_buffer_for_tests(
410         &mut self, stream: &mut std::io::Cursor<Vec<u8>>,
411     ) -> Result<()> {
412         let buf = &mut self.state_buf[self.state_off..self.state_len];
413 
414         let read = std::io::Read::read(stream, buf).unwrap();
415 
416         self.state_off += read;
417 
418         if !self.state_buffer_complete() {
419             return Err(Error::Done);
420         }
421 
422         Ok(())
423     }
424 
425     /// Tries to parse a varint (including length) from the state buffer.
try_consume_varint(&mut self) -> Result<u64>426     pub fn try_consume_varint(&mut self) -> Result<u64> {
427         if self.state_off == 1 {
428             self.state_len = octets::varint_parse_len(self.state_buf[0]);
429             self.state_buf.resize(self.state_len, 0);
430         }
431 
432         // Return early if we don't have enough data in the state buffer to
433         // parse the whole varint.
434         if !self.state_buffer_complete() {
435             return Err(Error::Done);
436         }
437 
438         let varint = octets::Octets::with_slice(&self.state_buf).get_varint()?;
439 
440         Ok(varint)
441     }
442 
443     /// Tries to parse a frame from the state buffer.
try_consume_frame(&mut self) -> Result<frame::Frame>444     pub fn try_consume_frame(&mut self) -> Result<frame::Frame> {
445         // Processing a frame other than DATA, so re-arm the Data event.
446         self.reset_data_event();
447 
448         // TODO: properly propagate frame parsing errors.
449         let frame = frame::Frame::from_bytes(
450             self.frame_type.unwrap(),
451             self.state_len as u64,
452             &self.state_buf,
453         )?;
454 
455         self.state_transition(State::FrameType, 1, true)?;
456 
457         Ok(frame)
458     }
459 
460     /// Tries to read DATA payload from the transport stream.
try_consume_data( &mut self, conn: &mut crate::Connection, out: &mut [u8], ) -> Result<(usize, bool)>461     pub fn try_consume_data(
462         &mut self, conn: &mut crate::Connection, out: &mut [u8],
463     ) -> Result<(usize, bool)> {
464         let left = std::cmp::min(out.len(), self.state_len - self.state_off);
465 
466         let (len, fin) = match conn.stream_recv(self.id, &mut out[..left]) {
467             Ok(v) => v,
468 
469             Err(e) => {
470                 // The stream is not readable anymore, so re-arm the Data event.
471                 if e == crate::Error::Done {
472                     self.reset_data_event();
473                 }
474 
475                 return Err(e.into());
476             },
477         };
478 
479         self.state_off += len;
480 
481         // The stream is not readable anymore, so re-arm the Data event.
482         if !conn.stream_readable(self.id) {
483             self.reset_data_event();
484         }
485 
486         if self.state_buffer_complete() {
487             self.state_transition(State::FrameType, 1, true)?;
488         }
489 
490         Ok((len, fin))
491     }
492 
493     /// Marks the stream as finished.
finished(&mut self)494     pub fn finished(&mut self) {
495         let _ = self.state_transition(State::Finished, 0, false);
496     }
497 
498     /// Tries to read DATA payload from the given cursor.
499     ///
500     /// This is intended to replace `try_consume_data()` in tests, in order to
501     /// avoid having to setup a transport connection.
502     #[cfg(test)]
try_consume_data_for_tests( &mut self, stream: &mut std::io::Cursor<Vec<u8>>, out: &mut [u8], ) -> Result<usize>503     fn try_consume_data_for_tests(
504         &mut self, stream: &mut std::io::Cursor<Vec<u8>>, out: &mut [u8],
505     ) -> Result<usize> {
506         let left = std::cmp::min(out.len(), self.state_len - self.state_off);
507 
508         let len = std::io::Read::read(stream, &mut out[..left]).unwrap();
509 
510         self.state_off += len;
511 
512         if self.state_buffer_complete() {
513             self.state_transition(State::FrameType, 1, true)?;
514         }
515 
516         Ok(len)
517     }
518 
519     /// Tries to update the data triggered state for the stream.
520     ///
521     /// This returns `true` if a Data event was not already triggered before
522     /// the last reset, and updates the state. Returns `false` otherwise.
try_trigger_data_event(&mut self) -> bool523     pub fn try_trigger_data_event(&mut self) -> bool {
524         if self.data_event_triggered {
525             return false;
526         }
527 
528         self.data_event_triggered = true;
529 
530         true
531     }
532 
533     /// Resets the data triggered state.
reset_data_event(&mut self)534     fn reset_data_event(&mut self) {
535         self.data_event_triggered = false;
536     }
537 
538     /// Returns true if the state buffer has enough data to complete the state.
state_buffer_complete(&self) -> bool539     fn state_buffer_complete(&self) -> bool {
540         self.state_off == self.state_len
541     }
542 
543     /// Transitions the stream to a new state, and optionally resets the state
544     /// buffer.
state_transition( &mut self, new_state: State, expected_len: usize, resize: bool, ) -> Result<()>545     fn state_transition(
546         &mut self, new_state: State, expected_len: usize, resize: bool,
547     ) -> Result<()> {
548         self.state = new_state;
549         self.state_off = 0;
550         self.state_len = expected_len;
551 
552         // Some states don't need the state buffer, so don't resize it if not
553         // necessary.
554         if resize {
555             // A peer can influence the size of the state buffer (e.g. with the
556             // payload size of a GREASE frame), so we need to limit the maximum
557             // size to avoid DoS.
558             if self.state_len > MAX_STATE_BUF_SIZE {
559                 return Err(Error::InternalError);
560             }
561 
562             self.state_buf.resize(self.state_len, 0);
563         }
564 
565         Ok(())
566     }
567 }
568 
569 #[cfg(test)]
570 mod tests {
571     use super::*;
572 
573     #[test]
574     /// Process incoming SETTINGS frame on control stream.
control_good()575     fn control_good() {
576         let mut stream = Stream::new(3, false);
577         assert_eq!(stream.state, State::StreamType);
578 
579         let mut d = vec![42; 40];
580         let mut b = octets::OctetsMut::with_slice(&mut d);
581 
582         let frame = frame::Frame::Settings {
583             max_header_list_size: Some(0),
584             qpack_max_table_capacity: Some(0),
585             qpack_blocked_streams: Some(0),
586             h3_datagram: None,
587             grease: None,
588         };
589 
590         b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
591         frame.to_bytes(&mut b).unwrap();
592 
593         let mut cursor = std::io::Cursor::new(d);
594 
595         // Parse stream type.
596         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
597 
598         let stream_ty = stream.try_consume_varint().unwrap();
599         assert_eq!(stream_ty, HTTP3_CONTROL_STREAM_TYPE_ID);
600         stream
601             .set_ty(Type::deserialize(stream_ty).unwrap())
602             .unwrap();
603         assert_eq!(stream.state, State::FrameType);
604 
605         // Parse the SETTINGS frame type.
606         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
607 
608         let frame_ty = stream.try_consume_varint().unwrap();
609         assert_eq!(frame_ty, frame::SETTINGS_FRAME_TYPE_ID);
610 
611         stream.set_frame_type(frame_ty).unwrap();
612         assert_eq!(stream.state, State::FramePayloadLen);
613 
614         // Parse the SETTINGS frame payload length.
615         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
616 
617         let frame_payload_len = stream.try_consume_varint().unwrap();
618         assert_eq!(frame_payload_len, 6);
619         stream.set_frame_payload_len(frame_payload_len).unwrap();
620         assert_eq!(stream.state, State::FramePayload);
621 
622         // Parse the SETTINGS frame payload.
623         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
624 
625         assert_eq!(stream.try_consume_frame(), Ok(frame));
626         assert_eq!(stream.state, State::FrameType);
627     }
628 
629     #[test]
630     /// Process duplicate SETTINGS frame on control stream.
control_bad_multiple_settings()631     fn control_bad_multiple_settings() {
632         let mut stream = Stream::new(3, false);
633         assert_eq!(stream.state, State::StreamType);
634 
635         let mut d = vec![42; 40];
636         let mut b = octets::OctetsMut::with_slice(&mut d);
637 
638         let frame = frame::Frame::Settings {
639             max_header_list_size: Some(0),
640             qpack_max_table_capacity: Some(0),
641             qpack_blocked_streams: Some(0),
642             h3_datagram: None,
643             grease: None,
644         };
645 
646         b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
647         frame.to_bytes(&mut b).unwrap();
648         frame.to_bytes(&mut b).unwrap();
649 
650         let mut cursor = std::io::Cursor::new(d);
651 
652         // Parse stream type.
653         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
654 
655         let stream_ty = stream.try_consume_varint().unwrap();
656         assert_eq!(stream_ty, HTTP3_CONTROL_STREAM_TYPE_ID);
657         stream
658             .set_ty(Type::deserialize(stream_ty).unwrap())
659             .unwrap();
660         assert_eq!(stream.state, State::FrameType);
661 
662         // Parse the SETTINGS frame type.
663         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
664 
665         let frame_ty = stream.try_consume_varint().unwrap();
666         assert_eq!(frame_ty, frame::SETTINGS_FRAME_TYPE_ID);
667 
668         stream.set_frame_type(frame_ty).unwrap();
669         assert_eq!(stream.state, State::FramePayloadLen);
670 
671         // Parse the SETTINGS frame payload length.
672         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
673 
674         let frame_payload_len = stream.try_consume_varint().unwrap();
675         assert_eq!(frame_payload_len, 6);
676         stream.set_frame_payload_len(frame_payload_len).unwrap();
677         assert_eq!(stream.state, State::FramePayload);
678 
679         // Parse the SETTINGS frame payload.
680         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
681 
682         assert_eq!(stream.try_consume_frame(), Ok(frame));
683         assert_eq!(stream.state, State::FrameType);
684 
685         // Parse the second SETTINGS frame type.
686         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
687 
688         let frame_ty = stream.try_consume_varint().unwrap();
689         assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected));
690     }
691 
692     #[test]
693     /// Process other frame before SETTINGS frame on control stream.
control_bad_late_settings()694     fn control_bad_late_settings() {
695         let mut stream = Stream::new(3, false);
696         assert_eq!(stream.state, State::StreamType);
697 
698         let mut d = vec![42; 40];
699         let mut b = octets::OctetsMut::with_slice(&mut d);
700 
701         let goaway = frame::Frame::GoAway { id: 0 };
702 
703         let settings = frame::Frame::Settings {
704             max_header_list_size: Some(0),
705             qpack_max_table_capacity: Some(0),
706             qpack_blocked_streams: Some(0),
707             h3_datagram: None,
708             grease: None,
709         };
710 
711         b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
712         goaway.to_bytes(&mut b).unwrap();
713         settings.to_bytes(&mut b).unwrap();
714 
715         let mut cursor = std::io::Cursor::new(d);
716 
717         // Parse stream type.
718         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
719 
720         let stream_ty = stream.try_consume_varint().unwrap();
721         assert_eq!(stream_ty, HTTP3_CONTROL_STREAM_TYPE_ID);
722         stream
723             .set_ty(Type::deserialize(stream_ty).unwrap())
724             .unwrap();
725         assert_eq!(stream.state, State::FrameType);
726 
727         // Parse GOAWAY.
728         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
729 
730         let frame_ty = stream.try_consume_varint().unwrap();
731         assert_eq!(stream.set_frame_type(frame_ty), Err(Error::MissingSettings));
732     }
733 
734     #[test]
735     /// Process not-allowed frame on control stream.
control_bad_frame()736     fn control_bad_frame() {
737         let mut stream = Stream::new(3, false);
738         assert_eq!(stream.state, State::StreamType);
739 
740         let mut d = vec![42; 40];
741         let mut b = octets::OctetsMut::with_slice(&mut d);
742 
743         let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
744         let hdrs = frame::Frame::Headers { header_block };
745 
746         let settings = frame::Frame::Settings {
747             max_header_list_size: Some(0),
748             qpack_max_table_capacity: Some(0),
749             qpack_blocked_streams: Some(0),
750             h3_datagram: None,
751             grease: None,
752         };
753 
754         b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
755         settings.to_bytes(&mut b).unwrap();
756         hdrs.to_bytes(&mut b).unwrap();
757 
758         let mut cursor = std::io::Cursor::new(d);
759 
760         // Parse stream type.
761         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
762 
763         let stream_ty = stream.try_consume_varint().unwrap();
764         stream
765             .set_ty(Type::deserialize(stream_ty).unwrap())
766             .unwrap();
767 
768         // Parse first SETTINGS frame.
769         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
770 
771         let frame_ty = stream.try_consume_varint().unwrap();
772         stream.set_frame_type(frame_ty).unwrap();
773 
774         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
775 
776         let frame_payload_len = stream.try_consume_varint().unwrap();
777         stream.set_frame_payload_len(frame_payload_len).unwrap();
778 
779         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
780 
781         assert!(stream.try_consume_frame().is_ok());
782 
783         // Parse HEADERS.
784         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
785 
786         let frame_ty = stream.try_consume_varint().unwrap();
787         assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected));
788     }
789 
790     #[test]
request_no_data()791     fn request_no_data() {
792         let mut stream = Stream::new(0, false);
793 
794         assert_eq!(stream.ty, Some(Type::Request));
795         assert_eq!(stream.state, State::FrameType);
796 
797         assert_eq!(stream.try_consume_varint(), Err(Error::Done));
798     }
799 
800     #[test]
request_good()801     fn request_good() {
802         let mut stream = Stream::new(0, false);
803 
804         let mut d = vec![42; 128];
805         let mut b = octets::OctetsMut::with_slice(&mut d);
806 
807         let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
808         let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
809         let hdrs = frame::Frame::Headers { header_block };
810         let data = frame::Frame::Data {
811             payload: payload.clone(),
812         };
813 
814         hdrs.to_bytes(&mut b).unwrap();
815         data.to_bytes(&mut b).unwrap();
816 
817         let mut cursor = std::io::Cursor::new(d);
818 
819         // Parse the HEADERS frame type.
820         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
821 
822         let frame_ty = stream.try_consume_varint().unwrap();
823         assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID);
824 
825         stream.set_frame_type(frame_ty).unwrap();
826         assert_eq!(stream.state, State::FramePayloadLen);
827 
828         // Parse the HEADERS frame payload length.
829         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
830 
831         let frame_payload_len = stream.try_consume_varint().unwrap();
832         assert_eq!(frame_payload_len, 12);
833 
834         stream.set_frame_payload_len(frame_payload_len).unwrap();
835         assert_eq!(stream.state, State::FramePayload);
836 
837         // Parse the HEADERS frame.
838         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
839 
840         assert_eq!(stream.try_consume_frame(), Ok(hdrs));
841         assert_eq!(stream.state, State::FrameType);
842 
843         // Parse the DATA frame type.
844         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
845 
846         let frame_ty = stream.try_consume_varint().unwrap();
847         assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID);
848 
849         stream.set_frame_type(frame_ty).unwrap();
850         assert_eq!(stream.state, State::FramePayloadLen);
851 
852         // Parse the DATA frame payload length.
853         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
854 
855         let frame_payload_len = stream.try_consume_varint().unwrap();
856         assert_eq!(frame_payload_len, 12);
857 
858         stream.set_frame_payload_len(frame_payload_len).unwrap();
859         assert_eq!(stream.state, State::Data);
860 
861         // Parse the DATA payload.
862         let mut recv_buf = vec![0; payload.len()];
863         assert_eq!(
864             stream.try_consume_data_for_tests(&mut cursor, &mut recv_buf),
865             Ok(payload.len())
866         );
867         assert_eq!(payload, recv_buf);
868 
869         assert_eq!(stream.state, State::FrameType);
870     }
871 
872     #[test]
push_good()873     fn push_good() {
874         let mut stream = Stream::new(2, false);
875 
876         let mut d = vec![42; 128];
877         let mut b = octets::OctetsMut::with_slice(&mut d);
878 
879         let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
880         let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
881         let hdrs = frame::Frame::Headers { header_block };
882         let data = frame::Frame::Data {
883             payload: payload.clone(),
884         };
885 
886         b.put_varint(HTTP3_PUSH_STREAM_TYPE_ID).unwrap();
887         b.put_varint(1).unwrap();
888         hdrs.to_bytes(&mut b).unwrap();
889         data.to_bytes(&mut b).unwrap();
890 
891         let mut cursor = std::io::Cursor::new(d);
892 
893         // Parse stream type.
894         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
895 
896         let stream_ty = stream.try_consume_varint().unwrap();
897         assert_eq!(stream_ty, HTTP3_PUSH_STREAM_TYPE_ID);
898         stream
899             .set_ty(Type::deserialize(stream_ty).unwrap())
900             .unwrap();
901         assert_eq!(stream.state, State::PushId);
902 
903         // Parse push ID.
904         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
905 
906         let push_id = stream.try_consume_varint().unwrap();
907         assert_eq!(push_id, 1);
908 
909         stream.set_push_id(push_id).unwrap();
910         assert_eq!(stream.state, State::FrameType);
911 
912         // Parse the HEADERS frame type.
913         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
914 
915         let frame_ty = stream.try_consume_varint().unwrap();
916         assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID);
917 
918         stream.set_frame_type(frame_ty).unwrap();
919         assert_eq!(stream.state, State::FramePayloadLen);
920 
921         // Parse the HEADERS frame payload length.
922         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
923 
924         let frame_payload_len = stream.try_consume_varint().unwrap();
925         assert_eq!(frame_payload_len, 12);
926 
927         stream.set_frame_payload_len(frame_payload_len).unwrap();
928         assert_eq!(stream.state, State::FramePayload);
929 
930         // Parse the HEADERS frame.
931         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
932 
933         assert_eq!(stream.try_consume_frame(), Ok(hdrs));
934         assert_eq!(stream.state, State::FrameType);
935 
936         // Parse the DATA frame type.
937         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
938 
939         let frame_ty = stream.try_consume_varint().unwrap();
940         assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID);
941 
942         stream.set_frame_type(frame_ty).unwrap();
943         assert_eq!(stream.state, State::FramePayloadLen);
944 
945         // Parse the DATA frame payload length.
946         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
947 
948         let frame_payload_len = stream.try_consume_varint().unwrap();
949         assert_eq!(frame_payload_len, 12);
950 
951         stream.set_frame_payload_len(frame_payload_len).unwrap();
952         assert_eq!(stream.state, State::Data);
953 
954         // Parse the DATA payload.
955         let mut recv_buf = vec![0; payload.len()];
956         assert_eq!(
957             stream.try_consume_data_for_tests(&mut cursor, &mut recv_buf),
958             Ok(payload.len())
959         );
960         assert_eq!(payload, recv_buf);
961 
962         assert_eq!(stream.state, State::FrameType);
963     }
964 
965     #[test]
grease()966     fn grease() {
967         let mut stream = Stream::new(2, false);
968 
969         let mut d = vec![42; 20];
970         let mut b = octets::OctetsMut::with_slice(&mut d);
971 
972         b.put_varint(33).unwrap();
973 
974         let mut cursor = std::io::Cursor::new(d);
975 
976         // Parse stream type.
977         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
978 
979         let stream_ty = stream.try_consume_varint().unwrap();
980         assert_eq!(stream_ty, 33);
981         stream
982             .set_ty(Type::deserialize(stream_ty).unwrap())
983             .unwrap();
984         assert_eq!(stream.state, State::Drain);
985     }
986 
987     #[test]
data_before_headers()988     fn data_before_headers() {
989         let mut stream = Stream::new(0, false);
990 
991         let mut d = vec![42; 128];
992         let mut b = octets::OctetsMut::with_slice(&mut d);
993 
994         let data = frame::Frame::Data {
995             payload: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
996         };
997 
998         data.to_bytes(&mut b).unwrap();
999 
1000         let mut cursor = std::io::Cursor::new(d);
1001 
1002         // Parse the DATA frame type.
1003         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
1004 
1005         let frame_ty = stream.try_consume_varint().unwrap();
1006         assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID);
1007 
1008         assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected));
1009     }
1010 }
1011