1 // Copyright (C) 2019, Cloudflare, Inc. 2 // All rights reserved. 3 // 4 // Redistribution and use in source and binary forms, with or without 5 // modification, are permitted provided that the following conditions are 6 // met: 7 // 8 // * Redistributions of source code must retain the above copyright notice, 9 // this list of conditions and the following disclaimer. 10 // 11 // * Redistributions in binary form must reproduce the above copyright 12 // notice, this list of conditions and the following disclaimer in the 13 // documentation and/or other materials provided with the distribution. 14 // 15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 27 use super::Error; 28 use super::Result; 29 30 use crate::octets; 31 32 use super::frame; 33 34 pub const HTTP3_CONTROL_STREAM_TYPE_ID: u64 = 0x0; 35 pub const HTTP3_PUSH_STREAM_TYPE_ID: u64 = 0x1; 36 pub const QPACK_ENCODER_STREAM_TYPE_ID: u64 = 0x2; 37 pub const QPACK_DECODER_STREAM_TYPE_ID: u64 = 0x3; 38 39 const MAX_STATE_BUF_SIZE: usize = (1 << 24) - 1; 40 41 #[derive(Clone, Copy, Debug, PartialEq)] 42 pub enum Type { 43 Control, 44 Request, 45 Push, 46 QpackEncoder, 47 QpackDecoder, 48 Unknown, 49 } 50 51 #[derive(Clone, Copy, Debug, PartialEq)] 52 pub enum State { 53 /// Reading the stream's type. 54 StreamType, 55 56 /// Reading the stream's current frame's type. 57 FrameType, 58 59 /// Reading the stream's current frame's payload length. 60 FramePayloadLen, 61 62 /// Reading the stream's current frame's payload. 63 FramePayload, 64 65 /// Reading DATA payload. 66 Data, 67 68 /// Reading the push ID. 69 PushId, 70 71 /// Reading a QPACK instruction. 72 QpackInstruction, 73 74 /// Reading and discarding data. 75 Drain, 76 77 /// All data has been read. 78 Finished, 79 } 80 81 impl Type { deserialize(v: u64) -> Result<Type>82 pub fn deserialize(v: u64) -> Result<Type> { 83 match v { 84 HTTP3_CONTROL_STREAM_TYPE_ID => Ok(Type::Control), 85 HTTP3_PUSH_STREAM_TYPE_ID => Ok(Type::Push), 86 QPACK_ENCODER_STREAM_TYPE_ID => Ok(Type::QpackEncoder), 87 QPACK_DECODER_STREAM_TYPE_ID => Ok(Type::QpackDecoder), 88 89 _ => Ok(Type::Unknown), 90 } 91 } 92 } 93 94 /// An HTTP/3 stream. 95 /// 96 /// This maintains the HTTP/3 state for streams of any type (control, request, 97 /// QPACK, ...). 98 /// 99 /// A number of bytes, depending on the current stream's state, is read from the 100 /// transport stream into the HTTP/3 stream's "state buffer". This intermediate 101 /// buffering is required due to the fact that data read from the transport 102 /// might not be complete (e.g. a varint might be split across multiple QUIC 103 /// packets). 104 /// 105 /// When enough data to complete the current state has been buffered, it is 106 /// consumed from the state buffer and the stream is transitioned to the next 107 /// state (see `State` for a list of possible states). 108 #[derive(Debug)] 109 pub struct Stream { 110 /// The corresponding transport stream's ID. 111 id: u64, 112 113 /// The stream's type (if known). 114 ty: Option<Type>, 115 116 /// The current stream state. 117 state: State, 118 119 /// The buffer holding partial data for the current state. 120 state_buf: Vec<u8>, 121 122 /// The expected amount of bytes required to complete the state. 123 state_len: usize, 124 125 /// The write offset in the state buffer, that is, how many bytes have 126 /// already been read from the transport for the current state. When 127 /// it reaches `stream_len` the state can be completed. 128 state_off: usize, 129 130 /// The type of the frame currently being parsed. 131 frame_type: Option<u64>, 132 133 /// Whether the stream was created locally, or by the peer. 134 is_local: bool, 135 136 /// Whether the stream has been remotely initialized. 137 remote_initialized: bool, 138 139 /// Whether the stream has been locally initialized. 140 local_initialized: bool, 141 142 /// Whether a `Data` event has been triggered for this stream. 143 data_event_triggered: bool, 144 } 145 146 impl Stream { 147 /// Creates a new HTTP/3 stream. 148 /// 149 /// The `is_local` parameter indicates whether the stream was created by the 150 /// local endpoint, or by the peer. new(id: u64, is_local: bool) -> Stream151 pub fn new(id: u64, is_local: bool) -> Stream { 152 let (ty, state) = if crate::stream::is_bidi(id) { 153 // All bidirectional streams are "request" streams, so we don't 154 // need to read the stream type. 155 (Some(Type::Request), State::FrameType) 156 } else { 157 // The stream's type is yet to be determined. 158 (None, State::StreamType) 159 }; 160 161 Stream { 162 id, 163 ty, 164 165 state, 166 167 // Pre-allocate a buffer to avoid multiple tiny early allocations. 168 state_buf: vec![0; 16], 169 170 // Expect one byte for the initial state, to parse the initial 171 // varint length. 172 state_len: 1, 173 state_off: 0, 174 175 frame_type: None, 176 177 is_local, 178 remote_initialized: false, 179 local_initialized: false, 180 181 data_event_triggered: false, 182 } 183 } 184 ty(&self) -> Option<Type>185 pub fn ty(&self) -> Option<Type> { 186 self.ty 187 } 188 state(&self) -> State189 pub fn state(&self) -> State { 190 self.state 191 } 192 193 /// Sets the stream's type and transitions to the next state. set_ty(&mut self, ty: Type) -> Result<()>194 pub fn set_ty(&mut self, ty: Type) -> Result<()> { 195 assert_eq!(self.state, State::StreamType); 196 197 self.ty = Some(ty); 198 199 let state = match ty { 200 Type::Control | Type::Request => State::FrameType, 201 202 Type::Push => State::PushId, 203 204 Type::QpackEncoder | Type::QpackDecoder => { 205 self.remote_initialized = true; 206 207 State::QpackInstruction 208 }, 209 210 Type::Unknown => State::Drain, 211 }; 212 213 self.state_transition(state, 1, true)?; 214 215 Ok(()) 216 } 217 218 /// Sets the push ID and transitions to the next state. set_push_id(&mut self, _id: u64) -> Result<()>219 pub fn set_push_id(&mut self, _id: u64) -> Result<()> { 220 assert_eq!(self.state, State::PushId); 221 222 // TODO: implement push ID. 223 224 self.state_transition(State::FrameType, 1, true)?; 225 226 Ok(()) 227 } 228 229 /// Sets the frame type and transitions to the next state. set_frame_type(&mut self, ty: u64) -> Result<()>230 pub fn set_frame_type(&mut self, ty: u64) -> Result<()> { 231 assert_eq!(self.state, State::FrameType); 232 233 // Only expect frames on Control, Request and Push streams. 234 match self.ty { 235 Some(Type::Control) => { 236 // Control stream starts uninitialized and only SETTINGS is 237 // accepted in that state. Other frames cause an error. Once 238 // initialized, no more SETTINGS are permitted. 239 match (ty, self.remote_initialized) { 240 // Initialize control stream. 241 (frame::SETTINGS_FRAME_TYPE_ID, false) => 242 self.remote_initialized = true, 243 244 // Non-SETTINGS frames not allowed on control stream 245 // before initialization. 246 (_, false) => return Err(Error::MissingSettings), 247 248 // Additional SETTINGS frame. 249 (frame::SETTINGS_FRAME_TYPE_ID, true) => 250 return Err(Error::FrameUnexpected), 251 252 // Frames that can't be received on control stream 253 // after initialization. 254 (frame::DATA_FRAME_TYPE_ID, true) => 255 return Err(Error::FrameUnexpected), 256 257 (frame::HEADERS_FRAME_TYPE_ID, true) => 258 return Err(Error::FrameUnexpected), 259 260 (frame::PUSH_PROMISE_FRAME_TYPE_ID, true) => 261 return Err(Error::FrameUnexpected), 262 263 // All other frames are ignored after initialization. 264 (_, true) => (), 265 } 266 }, 267 268 Some(Type::Request) => { 269 // Request stream starts uninitialized and only HEADERS 270 // is accepted. Other frames cause an error. 271 if !self.is_local { 272 match (ty, self.remote_initialized) { 273 (frame::HEADERS_FRAME_TYPE_ID, false) => 274 self.remote_initialized = true, 275 276 (frame::DATA_FRAME_TYPE_ID, false) => 277 return Err(Error::FrameUnexpected), 278 279 (frame::CANCEL_PUSH_FRAME_TYPE_ID, _) => 280 return Err(Error::FrameUnexpected), 281 282 (frame::SETTINGS_FRAME_TYPE_ID, _) => 283 return Err(Error::FrameUnexpected), 284 285 (frame::GOAWAY_FRAME_TYPE_ID, _) => 286 return Err(Error::FrameUnexpected), 287 288 (frame::MAX_PUSH_FRAME_TYPE_ID, _) => 289 return Err(Error::FrameUnexpected), 290 291 // All other frames can be ignored regardless of stream 292 // state. 293 _ => (), 294 } 295 } 296 }, 297 298 Some(Type::Push) => { 299 match ty { 300 // Frames that can never be received on request streams. 301 frame::CANCEL_PUSH_FRAME_TYPE_ID => 302 return Err(Error::FrameUnexpected), 303 304 frame::SETTINGS_FRAME_TYPE_ID => 305 return Err(Error::FrameUnexpected), 306 307 frame::PUSH_PROMISE_FRAME_TYPE_ID => 308 return Err(Error::FrameUnexpected), 309 310 frame::GOAWAY_FRAME_TYPE_ID => 311 return Err(Error::FrameUnexpected), 312 313 frame::MAX_PUSH_FRAME_TYPE_ID => 314 return Err(Error::FrameUnexpected), 315 316 _ => (), 317 } 318 }, 319 320 _ => return Err(Error::FrameUnexpected), 321 } 322 323 self.frame_type = Some(ty); 324 325 self.state_transition(State::FramePayloadLen, 1, true)?; 326 327 Ok(()) 328 } 329 330 /// Sets the frame's payload length and transitions to the next state. set_frame_payload_len(&mut self, len: u64) -> Result<()>331 pub fn set_frame_payload_len(&mut self, len: u64) -> Result<()> { 332 assert_eq!(self.state, State::FramePayloadLen); 333 334 // Only expect frames on Control, Request and Push streams. 335 if self.ty == Some(Type::Control) || 336 self.ty == Some(Type::Request) || 337 self.ty == Some(Type::Push) 338 { 339 let (state, resize) = match self.frame_type { 340 Some(frame::DATA_FRAME_TYPE_ID) => (State::Data, false), 341 342 _ => (State::FramePayload, true), 343 }; 344 345 self.state_transition(state, len as usize, resize)?; 346 347 return Ok(()); 348 } 349 350 Err(Error::InternalError) 351 } 352 353 /// Tries to fill the state buffer by reading data from the corresponding 354 /// transport stream. 355 /// 356 /// When not enough data can be read to complete the state, this returns 357 /// `Error::Done`. try_fill_buffer( &mut self, conn: &mut crate::Connection, ) -> Result<()>358 pub fn try_fill_buffer( 359 &mut self, conn: &mut crate::Connection, 360 ) -> Result<()> { 361 let buf = &mut self.state_buf[self.state_off..self.state_len]; 362 363 let read = match conn.stream_recv(self.id, buf) { 364 Ok((len, _)) => len, 365 366 Err(e) => { 367 // The stream is not readable anymore, so re-arm the Data event. 368 if e == crate::Error::Done { 369 self.reset_data_event(); 370 } 371 372 return Err(e.into()); 373 }, 374 }; 375 376 trace!( 377 "{} read {} bytes on stream {}", 378 conn.trace_id(), 379 read, 380 self.id, 381 ); 382 383 self.state_off += read; 384 385 if !self.state_buffer_complete() { 386 self.reset_data_event(); 387 388 return Err(Error::Done); 389 } 390 391 Ok(()) 392 } 393 394 /// Initialize the local part of the stream. initialize_local(&mut self)395 pub fn initialize_local(&mut self) { 396 self.local_initialized = true 397 } 398 399 /// Whether the stream has been locally initialized. local_initialized(&self) -> bool400 pub fn local_initialized(&self) -> bool { 401 self.local_initialized 402 } 403 404 /// Tries to fill the state buffer by reading data from the given cursor. 405 /// 406 /// This is intended to replace `try_fill_buffer()` in tests, in order to 407 /// avoid having to setup a transport connection. 408 #[cfg(test)] try_fill_buffer_for_tests( &mut self, stream: &mut std::io::Cursor<Vec<u8>>, ) -> Result<()>409 fn try_fill_buffer_for_tests( 410 &mut self, stream: &mut std::io::Cursor<Vec<u8>>, 411 ) -> Result<()> { 412 let buf = &mut self.state_buf[self.state_off..self.state_len]; 413 414 let read = std::io::Read::read(stream, buf).unwrap(); 415 416 self.state_off += read; 417 418 if !self.state_buffer_complete() { 419 return Err(Error::Done); 420 } 421 422 Ok(()) 423 } 424 425 /// Tries to parse a varint (including length) from the state buffer. try_consume_varint(&mut self) -> Result<u64>426 pub fn try_consume_varint(&mut self) -> Result<u64> { 427 if self.state_off == 1 { 428 self.state_len = octets::varint_parse_len(self.state_buf[0]); 429 self.state_buf.resize(self.state_len, 0); 430 } 431 432 // Return early if we don't have enough data in the state buffer to 433 // parse the whole varint. 434 if !self.state_buffer_complete() { 435 return Err(Error::Done); 436 } 437 438 let varint = octets::Octets::with_slice(&self.state_buf).get_varint()?; 439 440 Ok(varint) 441 } 442 443 /// Tries to parse a frame from the state buffer. try_consume_frame(&mut self) -> Result<frame::Frame>444 pub fn try_consume_frame(&mut self) -> Result<frame::Frame> { 445 // Processing a frame other than DATA, so re-arm the Data event. 446 self.reset_data_event(); 447 448 // TODO: properly propagate frame parsing errors. 449 let frame = frame::Frame::from_bytes( 450 self.frame_type.unwrap(), 451 self.state_len as u64, 452 &self.state_buf, 453 )?; 454 455 self.state_transition(State::FrameType, 1, true)?; 456 457 Ok(frame) 458 } 459 460 /// Tries to read DATA payload from the transport stream. try_consume_data( &mut self, conn: &mut crate::Connection, out: &mut [u8], ) -> Result<(usize, bool)>461 pub fn try_consume_data( 462 &mut self, conn: &mut crate::Connection, out: &mut [u8], 463 ) -> Result<(usize, bool)> { 464 let left = std::cmp::min(out.len(), self.state_len - self.state_off); 465 466 let (len, fin) = match conn.stream_recv(self.id, &mut out[..left]) { 467 Ok(v) => v, 468 469 Err(e) => { 470 // The stream is not readable anymore, so re-arm the Data event. 471 if e == crate::Error::Done { 472 self.reset_data_event(); 473 } 474 475 return Err(e.into()); 476 }, 477 }; 478 479 self.state_off += len; 480 481 // The stream is not readable anymore, so re-arm the Data event. 482 if !conn.stream_readable(self.id) { 483 self.reset_data_event(); 484 } 485 486 if self.state_buffer_complete() { 487 self.state_transition(State::FrameType, 1, true)?; 488 } 489 490 Ok((len, fin)) 491 } 492 493 /// Marks the stream as finished. finished(&mut self)494 pub fn finished(&mut self) { 495 let _ = self.state_transition(State::Finished, 0, false); 496 } 497 498 /// Tries to read DATA payload from the given cursor. 499 /// 500 /// This is intended to replace `try_consume_data()` in tests, in order to 501 /// avoid having to setup a transport connection. 502 #[cfg(test)] try_consume_data_for_tests( &mut self, stream: &mut std::io::Cursor<Vec<u8>>, out: &mut [u8], ) -> Result<usize>503 fn try_consume_data_for_tests( 504 &mut self, stream: &mut std::io::Cursor<Vec<u8>>, out: &mut [u8], 505 ) -> Result<usize> { 506 let left = std::cmp::min(out.len(), self.state_len - self.state_off); 507 508 let len = std::io::Read::read(stream, &mut out[..left]).unwrap(); 509 510 self.state_off += len; 511 512 if self.state_buffer_complete() { 513 self.state_transition(State::FrameType, 1, true)?; 514 } 515 516 Ok(len) 517 } 518 519 /// Tries to update the data triggered state for the stream. 520 /// 521 /// This returns `true` if a Data event was not already triggered before 522 /// the last reset, and updates the state. Returns `false` otherwise. try_trigger_data_event(&mut self) -> bool523 pub fn try_trigger_data_event(&mut self) -> bool { 524 if self.data_event_triggered { 525 return false; 526 } 527 528 self.data_event_triggered = true; 529 530 true 531 } 532 533 /// Resets the data triggered state. reset_data_event(&mut self)534 fn reset_data_event(&mut self) { 535 self.data_event_triggered = false; 536 } 537 538 /// Returns true if the state buffer has enough data to complete the state. state_buffer_complete(&self) -> bool539 fn state_buffer_complete(&self) -> bool { 540 self.state_off == self.state_len 541 } 542 543 /// Transitions the stream to a new state, and optionally resets the state 544 /// buffer. state_transition( &mut self, new_state: State, expected_len: usize, resize: bool, ) -> Result<()>545 fn state_transition( 546 &mut self, new_state: State, expected_len: usize, resize: bool, 547 ) -> Result<()> { 548 self.state = new_state; 549 self.state_off = 0; 550 self.state_len = expected_len; 551 552 // Some states don't need the state buffer, so don't resize it if not 553 // necessary. 554 if resize { 555 // A peer can influence the size of the state buffer (e.g. with the 556 // payload size of a GREASE frame), so we need to limit the maximum 557 // size to avoid DoS. 558 if self.state_len > MAX_STATE_BUF_SIZE { 559 return Err(Error::InternalError); 560 } 561 562 self.state_buf.resize(self.state_len, 0); 563 } 564 565 Ok(()) 566 } 567 } 568 569 #[cfg(test)] 570 mod tests { 571 use super::*; 572 573 #[test] 574 /// Process incoming SETTINGS frame on control stream. control_good()575 fn control_good() { 576 let mut stream = Stream::new(3, false); 577 assert_eq!(stream.state, State::StreamType); 578 579 let mut d = vec![42; 40]; 580 let mut b = octets::OctetsMut::with_slice(&mut d); 581 582 let frame = frame::Frame::Settings { 583 max_header_list_size: Some(0), 584 qpack_max_table_capacity: Some(0), 585 qpack_blocked_streams: Some(0), 586 h3_datagram: None, 587 grease: None, 588 }; 589 590 b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(); 591 frame.to_bytes(&mut b).unwrap(); 592 593 let mut cursor = std::io::Cursor::new(d); 594 595 // Parse stream type. 596 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 597 598 let stream_ty = stream.try_consume_varint().unwrap(); 599 assert_eq!(stream_ty, HTTP3_CONTROL_STREAM_TYPE_ID); 600 stream 601 .set_ty(Type::deserialize(stream_ty).unwrap()) 602 .unwrap(); 603 assert_eq!(stream.state, State::FrameType); 604 605 // Parse the SETTINGS frame type. 606 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 607 608 let frame_ty = stream.try_consume_varint().unwrap(); 609 assert_eq!(frame_ty, frame::SETTINGS_FRAME_TYPE_ID); 610 611 stream.set_frame_type(frame_ty).unwrap(); 612 assert_eq!(stream.state, State::FramePayloadLen); 613 614 // Parse the SETTINGS frame payload length. 615 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 616 617 let frame_payload_len = stream.try_consume_varint().unwrap(); 618 assert_eq!(frame_payload_len, 6); 619 stream.set_frame_payload_len(frame_payload_len).unwrap(); 620 assert_eq!(stream.state, State::FramePayload); 621 622 // Parse the SETTINGS frame payload. 623 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 624 625 assert_eq!(stream.try_consume_frame(), Ok(frame)); 626 assert_eq!(stream.state, State::FrameType); 627 } 628 629 #[test] 630 /// Process duplicate SETTINGS frame on control stream. control_bad_multiple_settings()631 fn control_bad_multiple_settings() { 632 let mut stream = Stream::new(3, false); 633 assert_eq!(stream.state, State::StreamType); 634 635 let mut d = vec![42; 40]; 636 let mut b = octets::OctetsMut::with_slice(&mut d); 637 638 let frame = frame::Frame::Settings { 639 max_header_list_size: Some(0), 640 qpack_max_table_capacity: Some(0), 641 qpack_blocked_streams: Some(0), 642 h3_datagram: None, 643 grease: None, 644 }; 645 646 b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(); 647 frame.to_bytes(&mut b).unwrap(); 648 frame.to_bytes(&mut b).unwrap(); 649 650 let mut cursor = std::io::Cursor::new(d); 651 652 // Parse stream type. 653 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 654 655 let stream_ty = stream.try_consume_varint().unwrap(); 656 assert_eq!(stream_ty, HTTP3_CONTROL_STREAM_TYPE_ID); 657 stream 658 .set_ty(Type::deserialize(stream_ty).unwrap()) 659 .unwrap(); 660 assert_eq!(stream.state, State::FrameType); 661 662 // Parse the SETTINGS frame type. 663 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 664 665 let frame_ty = stream.try_consume_varint().unwrap(); 666 assert_eq!(frame_ty, frame::SETTINGS_FRAME_TYPE_ID); 667 668 stream.set_frame_type(frame_ty).unwrap(); 669 assert_eq!(stream.state, State::FramePayloadLen); 670 671 // Parse the SETTINGS frame payload length. 672 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 673 674 let frame_payload_len = stream.try_consume_varint().unwrap(); 675 assert_eq!(frame_payload_len, 6); 676 stream.set_frame_payload_len(frame_payload_len).unwrap(); 677 assert_eq!(stream.state, State::FramePayload); 678 679 // Parse the SETTINGS frame payload. 680 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 681 682 assert_eq!(stream.try_consume_frame(), Ok(frame)); 683 assert_eq!(stream.state, State::FrameType); 684 685 // Parse the second SETTINGS frame type. 686 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 687 688 let frame_ty = stream.try_consume_varint().unwrap(); 689 assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected)); 690 } 691 692 #[test] 693 /// Process other frame before SETTINGS frame on control stream. control_bad_late_settings()694 fn control_bad_late_settings() { 695 let mut stream = Stream::new(3, false); 696 assert_eq!(stream.state, State::StreamType); 697 698 let mut d = vec![42; 40]; 699 let mut b = octets::OctetsMut::with_slice(&mut d); 700 701 let goaway = frame::Frame::GoAway { id: 0 }; 702 703 let settings = frame::Frame::Settings { 704 max_header_list_size: Some(0), 705 qpack_max_table_capacity: Some(0), 706 qpack_blocked_streams: Some(0), 707 h3_datagram: None, 708 grease: None, 709 }; 710 711 b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(); 712 goaway.to_bytes(&mut b).unwrap(); 713 settings.to_bytes(&mut b).unwrap(); 714 715 let mut cursor = std::io::Cursor::new(d); 716 717 // Parse stream type. 718 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 719 720 let stream_ty = stream.try_consume_varint().unwrap(); 721 assert_eq!(stream_ty, HTTP3_CONTROL_STREAM_TYPE_ID); 722 stream 723 .set_ty(Type::deserialize(stream_ty).unwrap()) 724 .unwrap(); 725 assert_eq!(stream.state, State::FrameType); 726 727 // Parse GOAWAY. 728 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 729 730 let frame_ty = stream.try_consume_varint().unwrap(); 731 assert_eq!(stream.set_frame_type(frame_ty), Err(Error::MissingSettings)); 732 } 733 734 #[test] 735 /// Process not-allowed frame on control stream. control_bad_frame()736 fn control_bad_frame() { 737 let mut stream = Stream::new(3, false); 738 assert_eq!(stream.state, State::StreamType); 739 740 let mut d = vec![42; 40]; 741 let mut b = octets::OctetsMut::with_slice(&mut d); 742 743 let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; 744 let hdrs = frame::Frame::Headers { header_block }; 745 746 let settings = frame::Frame::Settings { 747 max_header_list_size: Some(0), 748 qpack_max_table_capacity: Some(0), 749 qpack_blocked_streams: Some(0), 750 h3_datagram: None, 751 grease: None, 752 }; 753 754 b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(); 755 settings.to_bytes(&mut b).unwrap(); 756 hdrs.to_bytes(&mut b).unwrap(); 757 758 let mut cursor = std::io::Cursor::new(d); 759 760 // Parse stream type. 761 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 762 763 let stream_ty = stream.try_consume_varint().unwrap(); 764 stream 765 .set_ty(Type::deserialize(stream_ty).unwrap()) 766 .unwrap(); 767 768 // Parse first SETTINGS frame. 769 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 770 771 let frame_ty = stream.try_consume_varint().unwrap(); 772 stream.set_frame_type(frame_ty).unwrap(); 773 774 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 775 776 let frame_payload_len = stream.try_consume_varint().unwrap(); 777 stream.set_frame_payload_len(frame_payload_len).unwrap(); 778 779 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 780 781 assert!(stream.try_consume_frame().is_ok()); 782 783 // Parse HEADERS. 784 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 785 786 let frame_ty = stream.try_consume_varint().unwrap(); 787 assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected)); 788 } 789 790 #[test] request_no_data()791 fn request_no_data() { 792 let mut stream = Stream::new(0, false); 793 794 assert_eq!(stream.ty, Some(Type::Request)); 795 assert_eq!(stream.state, State::FrameType); 796 797 assert_eq!(stream.try_consume_varint(), Err(Error::Done)); 798 } 799 800 #[test] request_good()801 fn request_good() { 802 let mut stream = Stream::new(0, false); 803 804 let mut d = vec![42; 128]; 805 let mut b = octets::OctetsMut::with_slice(&mut d); 806 807 let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; 808 let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; 809 let hdrs = frame::Frame::Headers { header_block }; 810 let data = frame::Frame::Data { 811 payload: payload.clone(), 812 }; 813 814 hdrs.to_bytes(&mut b).unwrap(); 815 data.to_bytes(&mut b).unwrap(); 816 817 let mut cursor = std::io::Cursor::new(d); 818 819 // Parse the HEADERS frame type. 820 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 821 822 let frame_ty = stream.try_consume_varint().unwrap(); 823 assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID); 824 825 stream.set_frame_type(frame_ty).unwrap(); 826 assert_eq!(stream.state, State::FramePayloadLen); 827 828 // Parse the HEADERS frame payload length. 829 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 830 831 let frame_payload_len = stream.try_consume_varint().unwrap(); 832 assert_eq!(frame_payload_len, 12); 833 834 stream.set_frame_payload_len(frame_payload_len).unwrap(); 835 assert_eq!(stream.state, State::FramePayload); 836 837 // Parse the HEADERS frame. 838 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 839 840 assert_eq!(stream.try_consume_frame(), Ok(hdrs)); 841 assert_eq!(stream.state, State::FrameType); 842 843 // Parse the DATA frame type. 844 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 845 846 let frame_ty = stream.try_consume_varint().unwrap(); 847 assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID); 848 849 stream.set_frame_type(frame_ty).unwrap(); 850 assert_eq!(stream.state, State::FramePayloadLen); 851 852 // Parse the DATA frame payload length. 853 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 854 855 let frame_payload_len = stream.try_consume_varint().unwrap(); 856 assert_eq!(frame_payload_len, 12); 857 858 stream.set_frame_payload_len(frame_payload_len).unwrap(); 859 assert_eq!(stream.state, State::Data); 860 861 // Parse the DATA payload. 862 let mut recv_buf = vec![0; payload.len()]; 863 assert_eq!( 864 stream.try_consume_data_for_tests(&mut cursor, &mut recv_buf), 865 Ok(payload.len()) 866 ); 867 assert_eq!(payload, recv_buf); 868 869 assert_eq!(stream.state, State::FrameType); 870 } 871 872 #[test] push_good()873 fn push_good() { 874 let mut stream = Stream::new(2, false); 875 876 let mut d = vec![42; 128]; 877 let mut b = octets::OctetsMut::with_slice(&mut d); 878 879 let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; 880 let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; 881 let hdrs = frame::Frame::Headers { header_block }; 882 let data = frame::Frame::Data { 883 payload: payload.clone(), 884 }; 885 886 b.put_varint(HTTP3_PUSH_STREAM_TYPE_ID).unwrap(); 887 b.put_varint(1).unwrap(); 888 hdrs.to_bytes(&mut b).unwrap(); 889 data.to_bytes(&mut b).unwrap(); 890 891 let mut cursor = std::io::Cursor::new(d); 892 893 // Parse stream type. 894 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 895 896 let stream_ty = stream.try_consume_varint().unwrap(); 897 assert_eq!(stream_ty, HTTP3_PUSH_STREAM_TYPE_ID); 898 stream 899 .set_ty(Type::deserialize(stream_ty).unwrap()) 900 .unwrap(); 901 assert_eq!(stream.state, State::PushId); 902 903 // Parse push ID. 904 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 905 906 let push_id = stream.try_consume_varint().unwrap(); 907 assert_eq!(push_id, 1); 908 909 stream.set_push_id(push_id).unwrap(); 910 assert_eq!(stream.state, State::FrameType); 911 912 // Parse the HEADERS frame type. 913 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 914 915 let frame_ty = stream.try_consume_varint().unwrap(); 916 assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID); 917 918 stream.set_frame_type(frame_ty).unwrap(); 919 assert_eq!(stream.state, State::FramePayloadLen); 920 921 // Parse the HEADERS frame payload length. 922 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 923 924 let frame_payload_len = stream.try_consume_varint().unwrap(); 925 assert_eq!(frame_payload_len, 12); 926 927 stream.set_frame_payload_len(frame_payload_len).unwrap(); 928 assert_eq!(stream.state, State::FramePayload); 929 930 // Parse the HEADERS frame. 931 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 932 933 assert_eq!(stream.try_consume_frame(), Ok(hdrs)); 934 assert_eq!(stream.state, State::FrameType); 935 936 // Parse the DATA frame type. 937 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 938 939 let frame_ty = stream.try_consume_varint().unwrap(); 940 assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID); 941 942 stream.set_frame_type(frame_ty).unwrap(); 943 assert_eq!(stream.state, State::FramePayloadLen); 944 945 // Parse the DATA frame payload length. 946 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 947 948 let frame_payload_len = stream.try_consume_varint().unwrap(); 949 assert_eq!(frame_payload_len, 12); 950 951 stream.set_frame_payload_len(frame_payload_len).unwrap(); 952 assert_eq!(stream.state, State::Data); 953 954 // Parse the DATA payload. 955 let mut recv_buf = vec![0; payload.len()]; 956 assert_eq!( 957 stream.try_consume_data_for_tests(&mut cursor, &mut recv_buf), 958 Ok(payload.len()) 959 ); 960 assert_eq!(payload, recv_buf); 961 962 assert_eq!(stream.state, State::FrameType); 963 } 964 965 #[test] grease()966 fn grease() { 967 let mut stream = Stream::new(2, false); 968 969 let mut d = vec![42; 20]; 970 let mut b = octets::OctetsMut::with_slice(&mut d); 971 972 b.put_varint(33).unwrap(); 973 974 let mut cursor = std::io::Cursor::new(d); 975 976 // Parse stream type. 977 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 978 979 let stream_ty = stream.try_consume_varint().unwrap(); 980 assert_eq!(stream_ty, 33); 981 stream 982 .set_ty(Type::deserialize(stream_ty).unwrap()) 983 .unwrap(); 984 assert_eq!(stream.state, State::Drain); 985 } 986 987 #[test] data_before_headers()988 fn data_before_headers() { 989 let mut stream = Stream::new(0, false); 990 991 let mut d = vec![42; 128]; 992 let mut b = octets::OctetsMut::with_slice(&mut d); 993 994 let data = frame::Frame::Data { 995 payload: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], 996 }; 997 998 data.to_bytes(&mut b).unwrap(); 999 1000 let mut cursor = std::io::Cursor::new(d); 1001 1002 // Parse the DATA frame type. 1003 stream.try_fill_buffer_for_tests(&mut cursor).unwrap(); 1004 1005 let frame_ty = stream.try_consume_varint().unwrap(); 1006 assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID); 1007 1008 assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected)); 1009 } 1010 } 1011