// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![allow(clippy::module_name_repetitions)]

use crate::control_stream_local::ControlStreamLocal;
use crate::control_stream_remote::ControlStreamRemote;
use crate::features::extended_connect::{
    webtransport_session::WebTransportSession,
    webtransport_streams::{WebTransportRecvStream, WebTransportSendStream},
    ExtendedConnectEvents, ExtendedConnectFeature, ExtendedConnectType,
};
use crate::frames::HFrame;
use crate::push_controller::PushController;
use crate::qpack_decoder_receiver::DecoderRecvStream;
use crate::qpack_encoder_receiver::EncoderRecvStream;
use crate::recv_message::{RecvMessage, RecvMessageInfo};
use crate::request_target::{AsRequestTarget, RequestTarget};
use crate::send_message::SendMessage;
use crate::settings::{HSettingType, HSettings, HttpZeroRttChecker};
use crate::stream_type_reader::NewStreamHeadReader;
use crate::{
    client_events::Http3ClientEvents, CloseType, Http3Parameters, Http3StreamType,
    HttpRecvStreamEvents, NewStreamType, Priority, PriorityHandler, ReceiveOutput, RecvStream,
    RecvStreamEvents, SendStream, SendStreamEvents,
};
use neqo_common::{qdebug, qerror, qinfo, qtrace, qwarn, Header, MessageType, Role};
use neqo_qpack::decoder::QPackDecoder;
use neqo_qpack::encoder::QPackEncoder;
use neqo_transport::{
    AppError, Connection, ConnectionError, State, StreamId, StreamType, ZeroRttState,
};
use std::cell::RefCell;
use std::collections::{BTreeSet, HashMap};
use std::fmt::Debug;
use std::mem;
use std::rc::Rc;

use crate::{Error, Res};

/// The pieces of an outgoing request that are turned into its header block.
pub struct RequestDescription<'b, 't, T>
where
    T: AsRequestTarget<'t> + ?Sized + Debug,
{
    /// Value sent in the `:method` pseudo-header.
    pub method: &'b str,
    /// When set, a `:protocol` pseudo-header is added for this extended CONNECT type.
    pub connect_type: Option<ExtendedConnectType>,
    /// Source of the `:scheme`, `:authority` and `:path` pseudo-headers.
    pub target: &'t T,
    /// Extra headers appended after the pseudo-headers.
    pub headers: &'b [Header],
    /// Priority of the request; its header (if any) is added to the header block.
    pub priority: Priority,
}

/// What is currently known about the peer's HTTP/3 settings.
#[derive(Debug)]
enum Http3RemoteSettingsState {
    /// Nothing is known yet; this is the state of a fresh connection.
    NotReceived,
    /// Settings carried by this variant are the only ones `get_settings` returns.
    Received(HSettings),
    /// Settings installed through `set_0rtt_settings`.
    ZeroRtt(HSettings),
}

/// State of the HTTP/3 layer of a connection.
///
/// The variant order is significant because the type derives `PartialOrd`/`Ord`.
#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Clone)]
pub enum Http3State {
    Initializing,
    ZeroRtt,
    Connected,
    GoingAway(StreamId),
    Closing(ConnectionError),
    Closed(ConnectionError),
}

impl Http3State {
    /// Returns `true` for `ZeroRtt`, `Connected` and `GoingAway`, `false` for every other state.
    #[must_use]
    pub fn active(&self) -> bool {
        match self {
            Self::ZeroRtt | Self::Connected | Self::GoingAway(_) => true,
            Self::Initializing | Self::Closing(_) | Self::Closed(_) => false,
        }
    }
}

/// HTTP/3 state that sits on top of a transport `Connection`.
#[derive(Debug)]
pub(crate) struct Http3Connection {
    /// Role this endpoint was created with.
    role: Role,
    /// Current state of the HTTP/3 layer.
    pub state: Http3State,
    /// Local parameters; the SETTINGS frame and the 0-RTT token data are derived from them.
    local_params: Http3Parameters,
    /// The control stream this endpoint sends on.
    control_stream_local: ControlStreamLocal,
    /// QPACK encoder, shared (via `Rc`) with the streams that use it.
    pub qpack_encoder: Rc<RefCell<QPackEncoder>>,
    /// QPACK decoder, shared (via `Rc`) with the streams that use it.
    pub qpack_decoder: Rc<RefCell<QPackDecoder>>,
    /// What is known about the peer's settings.
    settings_state: Http3RemoteSettingsState,
    /// Ids of streams on which `send` should be called by `process_sending`.
    streams_with_pending_data: BTreeSet<StreamId>,
    /// Sending halves of the streams, keyed by stream id.
    pub send_streams: HashMap<StreamId, Box<dyn SendStream>>,
    /// Receiving halves of the streams, keyed by stream id.
    pub recv_streams: HashMap<StreamId, Box<dyn RecvStream>>,
    /// WebTransport (extended CONNECT) feature state.
    webtransport: ExtendedConnectFeature,
}

impl ::std::fmt::Display for Http3Connection {
    /// Writes the fixed label used to tag log lines of this connection.
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        f.write_str("Http3 connection")
    }
}

impl Http3Connection {
    /// Create a new connection.
new(conn_params: Http3Parameters, role: Role) -> Self105 pub fn new(conn_params: Http3Parameters, role: Role) -> Self { 106 Self { 107 state: Http3State::Initializing, 108 control_stream_local: ControlStreamLocal::new(), 109 qpack_encoder: Rc::new(RefCell::new(QPackEncoder::new( 110 conn_params.get_qpack_settings(), 111 true, 112 ))), 113 qpack_decoder: Rc::new(RefCell::new(QPackDecoder::new( 114 conn_params.get_qpack_settings(), 115 ))), 116 webtransport: ExtendedConnectFeature::new( 117 ExtendedConnectType::WebTransport, 118 conn_params.get_webtransport(), 119 ), 120 local_params: conn_params, 121 settings_state: Http3RemoteSettingsState::NotReceived, 122 streams_with_pending_data: BTreeSet::new(), 123 send_streams: HashMap::new(), 124 recv_streams: HashMap::new(), 125 role, 126 } 127 } 128 set_features_listener(&mut self, feature_listener: Http3ClientEvents)129 pub fn set_features_listener(&mut self, feature_listener: Http3ClientEvents) { 130 self.webtransport.set_listener(feature_listener); 131 } 132 initialize_http3_connection(&mut self, conn: &mut Connection) -> Res<()>133 fn initialize_http3_connection(&mut self, conn: &mut Connection) -> Res<()> { 134 qinfo!([self], "Initialize the http3 connection."); 135 self.control_stream_local.create(conn)?; 136 137 self.send_settings(); 138 self.create_qpack_streams(conn)?; 139 Ok(()) 140 } 141 send_settings(&mut self)142 fn send_settings(&mut self) { 143 qdebug!([self], "Send settings."); 144 self.control_stream_local.queue_frame(&HFrame::Settings { 145 settings: HSettings::from(&self.local_params), 146 }); 147 self.control_stream_local.queue_frame(&HFrame::Grease); 148 } 149 150 /// Save settings for adding to the session ticket. 
save_settings(&self) -> Vec<u8>151 pub(crate) fn save_settings(&self) -> Vec<u8> { 152 HttpZeroRttChecker::save(&self.local_params) 153 } 154 create_qpack_streams(&mut self, conn: &mut Connection) -> Res<()>155 fn create_qpack_streams(&mut self, conn: &mut Connection) -> Res<()> { 156 qdebug!([self], "create_qpack_streams."); 157 self.qpack_encoder 158 .borrow_mut() 159 .add_send_stream(conn.stream_create(StreamType::UniDi)?); 160 self.qpack_decoder 161 .borrow_mut() 162 .add_send_stream(conn.stream_create(StreamType::UniDi)?); 163 Ok(()) 164 } 165 166 /// Inform a `HttpConnection` that a stream has data to send and that `send` should be called for the stream. stream_has_pending_data(&mut self, stream_id: StreamId)167 pub fn stream_has_pending_data(&mut self, stream_id: StreamId) { 168 self.streams_with_pending_data.insert(stream_id); 169 } 170 171 /// Return true if there is a stream that needs to send data. has_data_to_send(&self) -> bool172 pub fn has_data_to_send(&self) -> bool { 173 !self.streams_with_pending_data.is_empty() 174 } 175 send_non_control_streams(&mut self, conn: &mut Connection) -> Res<()>176 fn send_non_control_streams(&mut self, conn: &mut Connection) -> Res<()> { 177 let to_send = mem::take(&mut self.streams_with_pending_data); 178 for stream_id in to_send { 179 let done = if let Some(s) = &mut self.send_streams.get_mut(&stream_id) { 180 s.send(conn)?; 181 if s.has_data_to_send() { 182 self.streams_with_pending_data.insert(stream_id); 183 } 184 s.done() 185 } else { 186 false 187 }; 188 if done { 189 self.remove_send_stream(stream_id, conn); 190 } 191 } 192 Ok(()) 193 } 194 195 /// Call `send` for all streams that need to send data. process_sending(&mut self, conn: &mut Connection) -> Res<()>196 pub fn process_sending(&mut self, conn: &mut Connection) -> Res<()> { 197 // check if control stream has data to send. 
198 self.control_stream_local 199 .send(conn, &mut self.recv_streams)?; 200 201 self.send_non_control_streams(conn)?; 202 203 self.qpack_decoder.borrow_mut().send(conn)?; 204 match self.qpack_encoder.borrow_mut().send_encoder_updates(conn) { 205 Ok(()) 206 | Err(neqo_qpack::Error::EncoderStreamBlocked) 207 | Err(neqo_qpack::Error::DynamicTableFull) => {} 208 Err(e) => return Err(Error::QpackError(e)), 209 } 210 Ok(()) 211 } 212 213 /// We have a resumption token which remembers previous settings. Update the setting. set_0rtt_settings(&mut self, conn: &mut Connection, settings: HSettings) -> Res<()>214 pub fn set_0rtt_settings(&mut self, conn: &mut Connection, settings: HSettings) -> Res<()> { 215 self.initialize_http3_connection(conn)?; 216 self.set_qpack_settings(&settings)?; 217 self.settings_state = Http3RemoteSettingsState::ZeroRtt(settings); 218 self.state = Http3State::ZeroRtt; 219 Ok(()) 220 } 221 222 /// Returns the settings for a connection. This is used for creating a resumption token. get_settings(&self) -> Option<HSettings>223 pub fn get_settings(&self) -> Option<HSettings> { 224 if let Http3RemoteSettingsState::Received(settings) = &self.settings_state { 225 Some(settings.clone()) 226 } else { 227 None 228 } 229 } 230 add_new_stream(&mut self, stream_id: StreamId)231 pub fn add_new_stream(&mut self, stream_id: StreamId) { 232 qtrace!([self], "A new stream: {}.", stream_id); 233 self.recv_streams.insert( 234 stream_id, 235 Box::new(NewStreamHeadReader::new(stream_id, self.role)), 236 ); 237 } 238 239 #[allow(clippy::option_if_let_else)] // False positive as borrow scope isn't lexical here. 
stream_receive(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<ReceiveOutput>240 fn stream_receive(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<ReceiveOutput> { 241 qtrace!([self], "Readable stream {}.", stream_id); 242 243 if let Some(recv_stream) = self.recv_streams.get_mut(&stream_id) { 244 let res = recv_stream.receive(conn); 245 return self 246 .handle_stream_manipulation_output(res, stream_id, conn) 247 .map(|(output, _)| output); 248 } 249 Ok(ReceiveOutput::NoOutput) 250 } 251 handle_unblocked_streams( &mut self, unblocked_streams: Vec<StreamId>, conn: &mut Connection, ) -> Res<()>252 fn handle_unblocked_streams( 253 &mut self, 254 unblocked_streams: Vec<StreamId>, 255 conn: &mut Connection, 256 ) -> Res<()> { 257 for stream_id in unblocked_streams { 258 qdebug!([self], "Stream {} is unblocked", stream_id); 259 if let Some(r) = self.recv_streams.get_mut(&stream_id) { 260 let res = r 261 .http_stream() 262 .ok_or(Error::HttpInternal(10))? 263 .header_unblocked(conn); 264 let res = self.handle_stream_manipulation_output(res, stream_id, conn)?; 265 debug_assert!(matches!(res, (ReceiveOutput::NoOutput, _))); 266 } 267 } 268 Ok(()) 269 } 270 271 /// This function handles reading from all streams, i.e. control, qpack, request/response 272 /// stream and unidi stream that are still do not have a type. 273 /// The function cannot handle: 274 /// 1) a Push stream (if an unknown unidi stream is decoded to be a push stream) 275 /// 2) frames `MaxPushId` or `Goaway` must be handled by `Http3Client`/`Server`. 276 /// The function returns `ReceiveOutput`. 
handle_stream_readable( &mut self, conn: &mut Connection, stream_id: StreamId, ) -> Res<ReceiveOutput>277 pub fn handle_stream_readable( 278 &mut self, 279 conn: &mut Connection, 280 stream_id: StreamId, 281 ) -> Res<ReceiveOutput> { 282 let mut output = self.stream_receive(conn, stream_id)?; 283 284 if let ReceiveOutput::NewStream(stream_type) = output { 285 output = self.handle_new_stream(conn, stream_type, stream_id)?; 286 } 287 288 #[allow(clippy::match_same_arms)] // clippy is being stupid here 289 match output { 290 ReceiveOutput::UnblockedStreams(unblocked_streams) => { 291 self.handle_unblocked_streams(unblocked_streams, conn)?; 292 Ok(ReceiveOutput::NoOutput) 293 } 294 ReceiveOutput::ControlFrames(mut control_frames) => { 295 let mut rest = Vec::new(); 296 for cf in control_frames.drain(..) { 297 if let Some(not_handled) = self.handle_control_frame(cf)? { 298 rest.push(not_handled); 299 } 300 } 301 Ok(ReceiveOutput::ControlFrames(rest)) 302 } 303 ReceiveOutput::NewStream(NewStreamType::Push(_)) 304 | ReceiveOutput::NewStream(NewStreamType::Http) 305 | ReceiveOutput::NewStream(NewStreamType::WebTransportStream(_)) => Ok(output), 306 ReceiveOutput::NewStream(_) => { 307 unreachable!("NewStream should have been handled already") 308 } 309 _ => Ok(output), 310 } 311 } 312 313 /// This is called when a RESET frame has been received. 
handle_stream_reset( &mut self, stream_id: StreamId, app_error: AppError, conn: &mut Connection, ) -> Res<()>314 pub fn handle_stream_reset( 315 &mut self, 316 stream_id: StreamId, 317 app_error: AppError, 318 conn: &mut Connection, 319 ) -> Res<()> { 320 qinfo!( 321 [self], 322 "Handle a stream reset stream_id={} app_err={}", 323 stream_id, 324 app_error 325 ); 326 327 self.close_recv(stream_id, CloseType::ResetRemote(app_error), conn) 328 } 329 handle_stream_stop_sending( &mut self, stream_id: StreamId, app_error: AppError, conn: &mut Connection, ) -> Res<()>330 pub fn handle_stream_stop_sending( 331 &mut self, 332 stream_id: StreamId, 333 app_error: AppError, 334 conn: &mut Connection, 335 ) -> Res<()> { 336 qinfo!( 337 [self], 338 "Handle stream_stop_sending stream_id={} app_err={}", 339 stream_id, 340 app_error 341 ); 342 343 if self.send_stream_is_critical(stream_id) { 344 return Err(Error::HttpClosedCriticalStream); 345 } 346 347 self.close_send(stream_id, CloseType::ResetRemote(app_error), conn); 348 Ok(()) 349 } 350 351 /// This is called when `neqo_transport::Connection` state has been change to take proper actions in 352 /// the HTTP3 layer. 
handle_state_change(&mut self, conn: &mut Connection, state: &State) -> Res<bool>353 pub fn handle_state_change(&mut self, conn: &mut Connection, state: &State) -> Res<bool> { 354 qdebug!([self], "Handle state change {:?}", state); 355 match state { 356 State::Handshaking => { 357 if self.role == Role::Server 358 && conn.zero_rtt_state() == &ZeroRttState::AcceptedServer 359 { 360 self.state = Http3State::ZeroRtt; 361 self.initialize_http3_connection(conn)?; 362 Ok(true) 363 } else { 364 Ok(false) 365 } 366 } 367 State::Connected => { 368 debug_assert!(matches!( 369 self.state, 370 Http3State::Initializing | Http3State::ZeroRtt 371 )); 372 if self.state == Http3State::Initializing { 373 self.initialize_http3_connection(conn)?; 374 } 375 self.state = Http3State::Connected; 376 Ok(true) 377 } 378 State::Closing { error, .. } | State::Draining { error, .. } => { 379 if matches!(self.state, Http3State::Closing(_) | Http3State::Closed(_)) { 380 Ok(false) 381 } else { 382 self.state = Http3State::Closing(error.clone()); 383 Ok(true) 384 } 385 } 386 State::Closed(error) => { 387 if matches!(self.state, Http3State::Closed(_)) { 388 Ok(false) 389 } else { 390 self.state = Http3State::Closed(error.clone()); 391 Ok(true) 392 } 393 } 394 _ => Ok(false), 395 } 396 } 397 398 /// This is called when 0RTT has been reseted to clear `send_streams`, `recv_streams` and settings. 
handle_zero_rtt_rejected(&mut self) -> Res<()>399 pub fn handle_zero_rtt_rejected(&mut self) -> Res<()> { 400 if self.state == Http3State::ZeroRtt { 401 self.state = Http3State::Initializing; 402 self.control_stream_local = ControlStreamLocal::new(); 403 self.qpack_encoder = Rc::new(RefCell::new(QPackEncoder::new( 404 self.local_params.get_qpack_settings(), 405 true, 406 ))); 407 self.qpack_decoder = Rc::new(RefCell::new(QPackDecoder::new( 408 self.local_params.get_qpack_settings(), 409 ))); 410 self.settings_state = Http3RemoteSettingsState::NotReceived; 411 self.streams_with_pending_data.clear(); 412 // TODO: investigate whether this code can automatically retry failed transactions. 413 self.send_streams.clear(); 414 self.recv_streams.clear(); 415 Ok(()) 416 } else { 417 debug_assert!(false, "Zero rtt rejected in the wrong state."); 418 Err(Error::HttpInternal(3)) 419 } 420 } 421 check_stream_exists(&self, stream_type: Http3StreamType) -> Res<()>422 fn check_stream_exists(&self, stream_type: Http3StreamType) -> Res<()> { 423 if self 424 .recv_streams 425 .values() 426 .any(|c| c.stream_type() == stream_type) 427 { 428 Err(Error::HttpStreamCreation) 429 } else { 430 Ok(()) 431 } 432 } 433 434 /// If the new stream is a control stream, this function creates a proper handler 435 /// and perform a read. 436 /// if the new stream is a push stream, the function returns `ReceiveOutput::PushStream` 437 /// and the caller will handle it. 438 /// If the stream is of a unknown type the stream will be closed. 
handle_new_stream( &mut self, conn: &mut Connection, stream_type: NewStreamType, stream_id: StreamId, ) -> Res<ReceiveOutput>439 fn handle_new_stream( 440 &mut self, 441 conn: &mut Connection, 442 stream_type: NewStreamType, 443 stream_id: StreamId, 444 ) -> Res<ReceiveOutput> { 445 match stream_type { 446 NewStreamType::Control => { 447 self.check_stream_exists(Http3StreamType::Control)?; 448 self.recv_streams 449 .insert(stream_id, Box::new(ControlStreamRemote::new(stream_id))); 450 } 451 452 NewStreamType::Push(push_id) => { 453 qinfo!( 454 [self], 455 "A new push stream {} push_id:{}.", 456 stream_id, 457 push_id 458 ); 459 } 460 NewStreamType::Decoder => { 461 qinfo!([self], "A new remote qpack encoder stream {}", stream_id); 462 self.check_stream_exists(Http3StreamType::Decoder)?; 463 self.recv_streams.insert( 464 stream_id, 465 Box::new(DecoderRecvStream::new( 466 stream_id, 467 Rc::clone(&self.qpack_decoder), 468 )), 469 ); 470 } 471 NewStreamType::Encoder => { 472 qinfo!([self], "A new remote qpack decoder stream {}", stream_id); 473 self.check_stream_exists(Http3StreamType::Encoder)?; 474 self.recv_streams.insert( 475 stream_id, 476 Box::new(EncoderRecvStream::new( 477 stream_id, 478 Rc::clone(&self.qpack_encoder), 479 )), 480 ); 481 } 482 NewStreamType::Http => { 483 qinfo!([self], "A new http stream {}.", stream_id); 484 } 485 NewStreamType::WebTransportStream(session_id) => { 486 let session_exists = self 487 .send_streams 488 .get(&StreamId::from(session_id)) 489 .map_or(false, |s| { 490 s.stream_type() == Http3StreamType::ExtendedConnect 491 }); 492 if !session_exists { 493 conn.stream_stop_sending(stream_id, Error::HttpStreamCreation.code())?; 494 return Ok(ReceiveOutput::NoOutput); 495 } 496 } 497 NewStreamType::Unknown => { 498 conn.stream_stop_sending(stream_id, Error::HttpStreamCreation.code())?; 499 } 500 }; 501 502 match stream_type { 503 NewStreamType::Control | NewStreamType::Decoder | NewStreamType::Encoder => { 504 
self.stream_receive(conn, stream_id) 505 } 506 NewStreamType::Push(_) | NewStreamType::Http | NewStreamType::WebTransportStream(_) => { 507 Ok(ReceiveOutput::NewStream(stream_type)) 508 } 509 NewStreamType::Unknown => Ok(ReceiveOutput::NoOutput), 510 } 511 } 512 513 /// This is called when an application closes the connection. close(&mut self, error: AppError)514 pub fn close(&mut self, error: AppError) { 515 qinfo!([self], "Close connection error {:?}.", error); 516 self.state = Http3State::Closing(ConnectionError::Application(error)); 517 if (!self.send_streams.is_empty() || !self.recv_streams.is_empty()) && (error == 0) { 518 qwarn!("close(0) called when streams still active"); 519 } 520 self.send_streams.clear(); 521 self.recv_streams.clear(); 522 } 523 524 /// This function will not handle the output of the function completely, but only 525 /// handle the indication that a stream is closed. There are 2 cases: 526 /// - an error occurred or 527 /// - the stream is done, i.e. the second value in `output` tuple is true if 528 /// the stream is done and can be removed from the `recv_streams` 529 /// How it is handling `output`: 530 /// - if the stream is done, it removes the stream from `recv_streams` 531 /// - if the stream is not done and there is no error, return `output` and the caller will 532 /// handle it. 533 /// - in case of an error: 534 /// - if it is only a stream error and the stream is not critical, send `STOP_SENDING` 535 /// frame, remove the stream from `recv_streams` and inform the listener that the stream 536 /// has been reset. 537 /// - otherwise this is a connection error. In this case, propagate the error to the caller 538 /// that will handle it properly. 
handle_stream_manipulation_output<U>( &mut self, output: Res<(U, bool)>, stream_id: StreamId, conn: &mut Connection, ) -> Res<(U, bool)> where U: Default,539 fn handle_stream_manipulation_output<U>( 540 &mut self, 541 output: Res<(U, bool)>, 542 stream_id: StreamId, 543 conn: &mut Connection, 544 ) -> Res<(U, bool)> 545 where 546 U: Default, 547 { 548 match &output { 549 Ok((_, true)) => { 550 self.remove_recv_stream(stream_id, conn); 551 } 552 Ok((_, false)) => {} 553 Err(e) => { 554 if e.stream_reset_error() && !self.recv_stream_is_critical(stream_id) { 555 mem::drop(conn.stream_stop_sending(stream_id, e.code())); 556 self.close_recv(stream_id, CloseType::LocalError(e.code()), conn)?; 557 return Ok((U::default(), false)); 558 } 559 } 560 } 561 output 562 } 563 create_fetch_headers<'b, 't, T>(request: &RequestDescription<'b, 't, T>) -> Res<Vec<Header>> where T: AsRequestTarget<'t> + ?Sized + Debug,564 fn create_fetch_headers<'b, 't, T>(request: &RequestDescription<'b, 't, T>) -> Res<Vec<Header>> 565 where 566 T: AsRequestTarget<'t> + ?Sized + Debug, 567 { 568 let target = request 569 .target 570 .as_request_target() 571 .map_err(|_| Error::InvalidRequestTarget)?; 572 573 // Transform pseudo-header fields 574 let mut final_headers = vec![ 575 Header::new(":method", request.method), 576 Header::new(":scheme", target.scheme()), 577 Header::new(":authority", target.authority()), 578 Header::new(":path", target.path()), 579 ]; 580 if let Some(conn_type) = request.connect_type { 581 final_headers.push(Header::new(":protocol", conn_type.string())); 582 } 583 584 if let Some(priority_header) = request.priority.header() { 585 final_headers.push(priority_header); 586 } 587 final_headers.extend_from_slice(request.headers); 588 Ok(final_headers) 589 } 590 fetch<'b, 't, T>( &mut self, conn: &mut Connection, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn HttpRecvStreamEvents>, push_handler: Option<Rc<RefCell<PushController>>>, request: &RequestDescription<'b, 't, 
T>, ) -> Res<StreamId> where T: AsRequestTarget<'t> + ?Sized + Debug,591 pub fn fetch<'b, 't, T>( 592 &mut self, 593 conn: &mut Connection, 594 send_events: Box<dyn SendStreamEvents>, 595 recv_events: Box<dyn HttpRecvStreamEvents>, 596 push_handler: Option<Rc<RefCell<PushController>>>, 597 request: &RequestDescription<'b, 't, T>, 598 ) -> Res<StreamId> 599 where 600 T: AsRequestTarget<'t> + ?Sized + Debug, 601 { 602 qinfo!( 603 [self], 604 "Fetch method={} target: {:?}", 605 request.method, 606 request.target, 607 ); 608 let id = self.create_bidi_transport_stream(conn)?; 609 self.fetch_with_stream(id, conn, send_events, recv_events, push_handler, request)?; 610 Ok(id) 611 } 612 create_bidi_transport_stream(&self, conn: &mut Connection) -> Res<StreamId>613 fn create_bidi_transport_stream(&self, conn: &mut Connection) -> Res<StreamId> { 614 // Requests cannot be created when a connection is in states: Initializing, GoingAway, Closing and Closed. 615 match self.state() { 616 Http3State::GoingAway(..) | Http3State::Closing(..) | Http3State::Closed(..) 
=> { 617 return Err(Error::AlreadyClosed) 618 } 619 Http3State::Initializing => return Err(Error::Unavailable), 620 _ => {} 621 } 622 623 let id = conn 624 .stream_create(StreamType::BiDi) 625 .map_err(|e| Error::map_stream_create_errors(&e))?; 626 conn.stream_keep_alive(id, true)?; 627 Ok(id) 628 } 629 fetch_with_stream<'b, 't, T>( &mut self, stream_id: StreamId, conn: &mut Connection, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn HttpRecvStreamEvents>, push_handler: Option<Rc<RefCell<PushController>>>, request: &RequestDescription<'b, 't, T>, ) -> Res<()> where T: AsRequestTarget<'t> + ?Sized + Debug,630 fn fetch_with_stream<'b, 't, T>( 631 &mut self, 632 stream_id: StreamId, 633 conn: &mut Connection, 634 send_events: Box<dyn SendStreamEvents>, 635 recv_events: Box<dyn HttpRecvStreamEvents>, 636 push_handler: Option<Rc<RefCell<PushController>>>, 637 request: &RequestDescription<'b, 't, T>, 638 ) -> Res<()> 639 where 640 T: AsRequestTarget<'t> + ?Sized + Debug, 641 { 642 let final_headers = Http3Connection::create_fetch_headers(request)?; 643 644 let stream_type = if request.connect_type.is_some() { 645 Http3StreamType::ExtendedConnect 646 } else { 647 Http3StreamType::Http 648 }; 649 650 let mut send_message = SendMessage::new( 651 MessageType::Request, 652 stream_type, 653 stream_id, 654 self.qpack_encoder.clone(), 655 send_events, 656 ); 657 658 send_message 659 .http_stream() 660 .unwrap() 661 .send_headers(&final_headers, conn)?; 662 663 self.add_streams( 664 stream_id, 665 Box::new(send_message), 666 Box::new(RecvMessage::new( 667 &RecvMessageInfo { 668 message_type: MessageType::Response, 669 stream_type, 670 stream_id, 671 header_frame_type_read: false, 672 }, 673 Rc::clone(&self.qpack_decoder), 674 recv_events, 675 push_handler, 676 PriorityHandler::new(false, request.priority), 677 )), 678 ); 679 680 // Call immediately send so that at least headers get sent. 
This will make Firefox faster, since 681 // it can send request body immediatly in most cases and does not need to do a complete process loop. 682 self.send_streams 683 .get_mut(&stream_id) 684 .ok_or(Error::InvalidStreamId)? 685 .send(conn)?; 686 Ok(()) 687 } 688 689 /// Stream data are read directly into a buffer supplied as a parameter of this function to avoid copying 690 /// data. 691 /// # Errors 692 /// It returns an error if a stream does not exist or an error happens while reading a stream, e.g. 693 /// early close, protocol error, etc. read_data( &mut self, conn: &mut Connection, stream_id: StreamId, buf: &mut [u8], ) -> Res<(usize, bool)>694 pub fn read_data( 695 &mut self, 696 conn: &mut Connection, 697 stream_id: StreamId, 698 buf: &mut [u8], 699 ) -> Res<(usize, bool)> { 700 qinfo!([self], "read_data from stream {}.", stream_id); 701 let res = self 702 .recv_streams 703 .get_mut(&stream_id) 704 .ok_or(Error::InvalidStreamId)? 705 .read_data(conn, buf); 706 self.handle_stream_manipulation_output(res, stream_id, conn) 707 } 708 709 /// This is called when an application resets a stream. 710 /// The application reset will close both sides. 
stream_reset_send( &mut self, conn: &mut Connection, stream_id: StreamId, error: AppError, ) -> Res<()>711 pub fn stream_reset_send( 712 &mut self, 713 conn: &mut Connection, 714 stream_id: StreamId, 715 error: AppError, 716 ) -> Res<()> { 717 qinfo!( 718 [self], 719 "Reset sending side of stream {} error={}.", 720 stream_id, 721 error 722 ); 723 724 if self.send_stream_is_critical(stream_id) { 725 return Err(Error::InvalidStreamId); 726 } 727 728 self.close_send(stream_id, CloseType::ResetApp(error), conn); 729 conn.stream_reset_send(stream_id, error)?; 730 Ok(()) 731 } 732 stream_stop_sending( &mut self, conn: &mut Connection, stream_id: StreamId, error: AppError, ) -> Res<()>733 pub fn stream_stop_sending( 734 &mut self, 735 conn: &mut Connection, 736 stream_id: StreamId, 737 error: AppError, 738 ) -> Res<()> { 739 qinfo!( 740 [self], 741 "Send stop sending for stream {} error={}.", 742 stream_id, 743 error 744 ); 745 if self.recv_stream_is_critical(stream_id) { 746 return Err(Error::InvalidStreamId); 747 } 748 749 self.close_recv(stream_id, CloseType::ResetApp(error), conn)?; 750 751 // Stream may be already be closed and we may get an error here, but we do not care. 
752 conn.stream_stop_sending(stream_id, error)?; 753 Ok(()) 754 } 755 cancel_fetch( &mut self, stream_id: StreamId, error: AppError, conn: &mut Connection, ) -> Res<()>756 pub fn cancel_fetch( 757 &mut self, 758 stream_id: StreamId, 759 error: AppError, 760 conn: &mut Connection, 761 ) -> Res<()> { 762 qinfo!([self], "cancel_fetch {} error={}.", stream_id, error); 763 let send_stream = self.send_streams.get(&stream_id); 764 let recv_stream = self.recv_streams.get(&stream_id); 765 match (send_stream, recv_stream) { 766 (None, None) => return Err(Error::InvalidStreamId), 767 (Some(s), None) => { 768 if !matches!( 769 s.stream_type(), 770 Http3StreamType::Http | Http3StreamType::ExtendedConnect 771 ) { 772 return Err(Error::InvalidStreamId); 773 } 774 // Stream may be already be closed and we may get an error here, but we do not care. 775 mem::drop(self.stream_reset_send(conn, stream_id, error)); 776 } 777 (None, Some(s)) => { 778 if !matches!( 779 s.stream_type(), 780 Http3StreamType::Http 781 | Http3StreamType::Push 782 | Http3StreamType::ExtendedConnect 783 ) { 784 return Err(Error::InvalidStreamId); 785 } 786 787 // Stream may be already be closed and we may get an error here, but we do not care. 788 mem::drop(self.stream_stop_sending(conn, stream_id, error)); 789 } 790 (Some(s), Some(r)) => { 791 debug_assert_eq!(s.stream_type(), r.stream_type()); 792 if !matches!( 793 s.stream_type(), 794 Http3StreamType::Http | Http3StreamType::ExtendedConnect 795 ) { 796 return Err(Error::InvalidStreamId); 797 } 798 // Stream may be already be closed and we may get an error here, but we do not care. 799 mem::drop(self.stream_reset_send(conn, stream_id, error)); 800 // Stream may be already be closed and we may get an error here, but we do not care. 801 mem::drop(self.stream_stop_sending(conn, stream_id, error)); 802 } 803 } 804 Ok(()) 805 } 806 807 /// This is called when an application wants to close the sending side of a stream. 
stream_close_send(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()>808 pub fn stream_close_send(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> { 809 qinfo!([self], "Close the sending side for stream {}.", stream_id); 810 debug_assert!(self.state.active()); 811 let send_stream = self 812 .send_streams 813 .get_mut(&stream_id) 814 .ok_or(Error::InvalidStreamId)?; 815 // The following function may return InvalidStreamId from the transport layer if the stream has been closed 816 // already. It is ok to ignore it here. 817 mem::drop(send_stream.close(conn)); 818 if send_stream.done() { 819 self.remove_send_stream(stream_id, conn); 820 } else if send_stream.has_data_to_send() { 821 self.streams_with_pending_data.insert(stream_id); 822 } 823 Ok(()) 824 } 825 webtransport_create_session<'x, 't: 'x, T>( &mut self, conn: &mut Connection, events: Box<dyn ExtendedConnectEvents>, target: &'t T, headers: &'t [Header], ) -> Res<StreamId> where T: AsRequestTarget<'x> + ?Sized + Debug,826 pub fn webtransport_create_session<'x, 't: 'x, T>( 827 &mut self, 828 conn: &mut Connection, 829 events: Box<dyn ExtendedConnectEvents>, 830 target: &'t T, 831 headers: &'t [Header], 832 ) -> Res<StreamId> 833 where 834 T: AsRequestTarget<'x> + ?Sized + Debug, 835 { 836 qinfo!([self], "Create WebTransport"); 837 if !self.webtransport_enabled() { 838 return Err(Error::Unavailable); 839 } 840 841 let id = self.create_bidi_transport_stream(conn)?; 842 843 let extended_conn = Rc::new(RefCell::new(WebTransportSession::new( 844 id, 845 events, 846 self.role, 847 Rc::clone(&self.qpack_encoder), 848 Rc::clone(&self.qpack_decoder), 849 ))); 850 self.add_streams( 851 id, 852 Box::new(extended_conn.clone()), 853 Box::new(extended_conn.clone()), 854 ); 855 856 let final_headers = Http3Connection::create_fetch_headers(&RequestDescription { 857 method: "CONNECT", 858 target, 859 headers, 860 connect_type: Some(ExtendedConnectType::WebTransport), 861 priority: 
Priority::default(), 862 })?; 863 extended_conn 864 .borrow_mut() 865 .send_request(&final_headers, conn)?; 866 self.streams_with_pending_data.insert(id); 867 Ok(id) 868 } 869 webtransport_session_accept( &mut self, conn: &mut Connection, stream_id: StreamId, events: Box<dyn ExtendedConnectEvents>, accept: bool, ) -> Res<()>870 pub(crate) fn webtransport_session_accept( 871 &mut self, 872 conn: &mut Connection, 873 stream_id: StreamId, 874 events: Box<dyn ExtendedConnectEvents>, 875 accept: bool, 876 ) -> Res<()> { 877 qtrace!("Respond to WebTransport session with accept={}.", accept); 878 if !self.webtransport_enabled() { 879 return Err(Error::Unavailable); 880 } 881 let mut recv_stream = self.recv_streams.get_mut(&stream_id); 882 if let Some(r) = &mut recv_stream { 883 if !r 884 .http_stream() 885 .ok_or(Error::InvalidStreamId)? 886 .extended_connect_wait_for_response() 887 { 888 return Err(Error::InvalidStreamId); 889 } 890 } 891 892 let send_stream = self.send_streams.get_mut(&stream_id); 893 894 match (send_stream, recv_stream, accept) { 895 (None, None, _) => Err(Error::InvalidStreamId), 896 (None, Some(_), _) | (Some(_), None, _) => { 897 // TODO this needs a better error 898 self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?; 899 Err(Error::InvalidStreamId) 900 } 901 (Some(s), Some(_r), false) => { 902 if s.http_stream() 903 .ok_or(Error::InvalidStreamId)? 904 .send_headers(&[Header::new(":status", "404")], conn) 905 .is_ok() 906 { 907 mem::drop(self.stream_close_send(conn, stream_id)); 908 // TODO issue 1294: add a timer to clean up the recv_stream if the peer does not do that in a short time. 909 self.streams_with_pending_data.insert(stream_id); 910 } else { 911 self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?; 912 } 913 Ok(()) 914 } 915 (Some(s), Some(_r), true) => { 916 if s.http_stream() 917 .ok_or(Error::InvalidStreamId)? 
918 .send_headers(&[Header::new(":status", "200")], conn) 919 .is_ok() 920 { 921 let extended_conn = 922 Rc::new(RefCell::new(WebTransportSession::new_with_http_streams( 923 stream_id, 924 events, 925 self.role, 926 self.recv_streams.remove(&stream_id).unwrap(), 927 self.send_streams.remove(&stream_id).unwrap(), 928 ))); 929 self.add_streams( 930 stream_id, 931 Box::new(extended_conn.clone()), 932 Box::new(extended_conn), 933 ); 934 self.streams_with_pending_data.insert(stream_id); 935 } else { 936 self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?; 937 return Err(Error::InvalidStreamId); 938 } 939 Ok(()) 940 } 941 } 942 } 943 webtransport_close_session( &mut self, conn: &mut Connection, session_id: StreamId, error: u32, message: &str, ) -> Res<()>944 pub(crate) fn webtransport_close_session( 945 &mut self, 946 conn: &mut Connection, 947 session_id: StreamId, 948 error: u32, 949 message: &str, 950 ) -> Res<()> { 951 qtrace!("Clos WebTransport session {:?}", session_id); 952 let send_stream = self 953 .send_streams 954 .get_mut(&session_id) 955 .ok_or(Error::InvalidStreamId)?; 956 if send_stream.stream_type() != Http3StreamType::ExtendedConnect { 957 return Err(Error::InvalidStreamId); 958 } 959 960 send_stream.close_with_message(conn, error, message)?; 961 if send_stream.done() { 962 self.remove_send_stream(session_id, conn); 963 } else if send_stream.has_data_to_send() { 964 self.streams_with_pending_data.insert(session_id); 965 } 966 Ok(()) 967 } 968 webtransport_create_stream_local( &mut self, conn: &mut Connection, session_id: StreamId, stream_type: StreamType, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn RecvStreamEvents>, ) -> Res<StreamId>969 pub fn webtransport_create_stream_local( 970 &mut self, 971 conn: &mut Connection, 972 session_id: StreamId, 973 stream_type: StreamType, 974 send_events: Box<dyn SendStreamEvents>, 975 recv_events: Box<dyn RecvStreamEvents>, 976 ) -> Res<StreamId> { 977 qtrace!( 978 "Create new 
WebTransport stream session={} type={:?}", 979 session_id, 980 stream_type 981 ); 982 983 let wt = self 984 .recv_streams 985 .get(&session_id) 986 .ok_or(Error::InvalidStreamId)? 987 .webtransport() 988 .ok_or(Error::InvalidStreamId)?; 989 if !wt.borrow().is_active() { 990 return Err(Error::InvalidStreamId); 991 } 992 993 let stream_id = conn 994 .stream_create(stream_type) 995 .map_err(|e| Error::map_stream_create_errors(&e))?; 996 997 self.webtransport_create_stream_internal( 998 wt, 999 stream_id, 1000 session_id, 1001 send_events, 1002 recv_events, 1003 true, 1004 ); 1005 Ok(stream_id) 1006 } 1007 webtransport_create_stream_remote( &mut self, session_id: StreamId, stream_id: StreamId, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn RecvStreamEvents>, ) -> Res<()>1008 pub fn webtransport_create_stream_remote( 1009 &mut self, 1010 session_id: StreamId, 1011 stream_id: StreamId, 1012 send_events: Box<dyn SendStreamEvents>, 1013 recv_events: Box<dyn RecvStreamEvents>, 1014 ) -> Res<()> { 1015 qtrace!( 1016 "Create new WebTransport stream session={} stream_id={}", 1017 session_id, 1018 stream_id 1019 ); 1020 1021 let wt = self 1022 .recv_streams 1023 .get(&session_id) 1024 .ok_or(Error::InvalidStreamId)? 
1025 .webtransport() 1026 .ok_or(Error::InvalidStreamId)?; 1027 1028 self.webtransport_create_stream_internal( 1029 wt, 1030 stream_id, 1031 session_id, 1032 send_events, 1033 recv_events, 1034 false, 1035 ); 1036 Ok(()) 1037 } 1038 webtransport_create_stream_internal( &mut self, webtransport_session: Rc<RefCell<WebTransportSession>>, stream_id: StreamId, session_id: StreamId, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn RecvStreamEvents>, local: bool, )1039 fn webtransport_create_stream_internal( 1040 &mut self, 1041 webtransport_session: Rc<RefCell<WebTransportSession>>, 1042 stream_id: StreamId, 1043 session_id: StreamId, 1044 send_events: Box<dyn SendStreamEvents>, 1045 recv_events: Box<dyn RecvStreamEvents>, 1046 local: bool, 1047 ) { 1048 // TODO conn.stream_keep_alive(stream_id, true)?; 1049 webtransport_session.borrow_mut().add_stream(stream_id); 1050 if stream_id.stream_type() == StreamType::UniDi { 1051 if local { 1052 self.send_streams.insert( 1053 stream_id, 1054 Box::new(WebTransportSendStream::new( 1055 stream_id, 1056 session_id, 1057 send_events, 1058 webtransport_session, 1059 true, 1060 )), 1061 ); 1062 } else { 1063 self.recv_streams.insert( 1064 stream_id, 1065 Box::new(WebTransportRecvStream::new( 1066 stream_id, 1067 session_id, 1068 recv_events, 1069 webtransport_session, 1070 )), 1071 ); 1072 } 1073 } else { 1074 self.add_streams( 1075 stream_id, 1076 Box::new(WebTransportSendStream::new( 1077 stream_id, 1078 session_id, 1079 send_events, 1080 webtransport_session.clone(), 1081 local, 1082 )), 1083 Box::new(WebTransportRecvStream::new( 1084 stream_id, 1085 session_id, 1086 recv_events, 1087 webtransport_session, 1088 )), 1089 ); 1090 } 1091 } 1092 1093 // If the control stream has received frames MaxPushId or Goaway which handling is specific to 1094 // the client and server, we must give them to the specific client/server handler. 
handle_control_frame(&mut self, f: HFrame) -> Res<Option<HFrame>>1095 fn handle_control_frame(&mut self, f: HFrame) -> Res<Option<HFrame>> { 1096 qinfo!([self], "Handle a control frame {:?}", f); 1097 if !matches!(f, HFrame::Settings { .. }) 1098 && !matches!( 1099 self.settings_state, 1100 Http3RemoteSettingsState::Received { .. } 1101 ) 1102 { 1103 return Err(Error::HttpMissingSettings); 1104 } 1105 match f { 1106 HFrame::Settings { settings } => { 1107 self.handle_settings(settings)?; 1108 Ok(None) 1109 } 1110 HFrame::Goaway { .. } 1111 | HFrame::MaxPushId { .. } 1112 | HFrame::CancelPush { .. } 1113 | HFrame::PriorityUpdateRequest { .. } 1114 | HFrame::PriorityUpdatePush { .. } => Ok(Some(f)), 1115 _ => Err(Error::HttpFrameUnexpected), 1116 } 1117 } 1118 set_qpack_settings(&mut self, settings: &HSettings) -> Res<()>1119 fn set_qpack_settings(&mut self, settings: &HSettings) -> Res<()> { 1120 let mut qpe = self.qpack_encoder.borrow_mut(); 1121 qpe.set_max_capacity(settings.get(HSettingType::MaxTableCapacity))?; 1122 qpe.set_max_blocked_streams(settings.get(HSettingType::BlockedStreams))?; 1123 Ok(()) 1124 } 1125 handle_settings(&mut self, new_settings: HSettings) -> Res<()>1126 fn handle_settings(&mut self, new_settings: HSettings) -> Res<()> { 1127 qinfo!([self], "Handle SETTINGS frame."); 1128 match &self.settings_state { 1129 Http3RemoteSettingsState::NotReceived => { 1130 self.set_qpack_settings(&new_settings)?; 1131 self.webtransport.handle_settings(&new_settings); 1132 self.settings_state = Http3RemoteSettingsState::Received(new_settings); 1133 Ok(()) 1134 } 1135 Http3RemoteSettingsState::ZeroRtt(settings) => { 1136 self.webtransport.handle_settings(&new_settings); 1137 let mut qpack_changed = false; 1138 for st in &[ 1139 HSettingType::MaxHeaderListSize, 1140 HSettingType::MaxTableCapacity, 1141 HSettingType::BlockedStreams, 1142 ] { 1143 let zero_rtt_value = settings.get(*st); 1144 let new_value = new_settings.get(*st); 1145 if zero_rtt_value == 
new_value { 1146 continue; 1147 } 1148 if zero_rtt_value > new_value { 1149 qerror!( 1150 [self], 1151 "The new({}) and the old value({}) of setting {:?} do not match", 1152 new_value, 1153 zero_rtt_value, 1154 st 1155 ); 1156 return Err(Error::HttpSettings); 1157 } 1158 1159 match st { 1160 HSettingType::MaxTableCapacity => { 1161 if zero_rtt_value != 0 { 1162 return Err(Error::QpackError(neqo_qpack::Error::DecoderStream)); 1163 } 1164 qpack_changed = true; 1165 } 1166 HSettingType::BlockedStreams => qpack_changed = true, 1167 HSettingType::MaxHeaderListSize | HSettingType::EnableWebTransport => (), 1168 } 1169 } 1170 if qpack_changed { 1171 qdebug!([self], "Settings after zero rtt differ."); 1172 self.set_qpack_settings(&(new_settings))?; 1173 } 1174 self.settings_state = Http3RemoteSettingsState::Received(new_settings); 1175 Ok(()) 1176 } 1177 Http3RemoteSettingsState::Received { .. } => Err(Error::HttpFrameUnexpected), 1178 } 1179 } 1180 1181 /// Return the current state on `Http3Connection`. state(&self) -> Http3State1182 pub fn state(&self) -> Http3State { 1183 self.state.clone() 1184 } 1185 1186 /// Adds a new send and receive stream. add_streams( &mut self, stream_id: StreamId, send_stream: Box<dyn SendStream>, recv_stream: Box<dyn RecvStream>, )1187 pub fn add_streams( 1188 &mut self, 1189 stream_id: StreamId, 1190 send_stream: Box<dyn SendStream>, 1191 recv_stream: Box<dyn RecvStream>, 1192 ) { 1193 if send_stream.has_data_to_send() { 1194 self.streams_with_pending_data.insert(stream_id); 1195 } 1196 self.send_streams.insert(stream_id, send_stream); 1197 self.recv_streams.insert(stream_id, recv_stream); 1198 } 1199 1200 /// Add a new recv stream. This is used for push streams. 
add_recv_stream(&mut self, stream_id: StreamId, recv_stream: Box<dyn RecvStream>)1201 pub fn add_recv_stream(&mut self, stream_id: StreamId, recv_stream: Box<dyn RecvStream>) { 1202 self.recv_streams.insert(stream_id, recv_stream); 1203 } 1204 queue_control_frame(&mut self, frame: &HFrame)1205 pub fn queue_control_frame(&mut self, frame: &HFrame) { 1206 self.control_stream_local.queue_frame(frame); 1207 } 1208 queue_update_priority(&mut self, stream_id: StreamId, priority: Priority) -> Res<bool>1209 pub fn queue_update_priority(&mut self, stream_id: StreamId, priority: Priority) -> Res<bool> { 1210 let stream = self 1211 .recv_streams 1212 .get_mut(&stream_id) 1213 .ok_or(Error::InvalidStreamId)? 1214 .http_stream() 1215 .ok_or(Error::InvalidStreamId)?; 1216 1217 if stream.maybe_update_priority(priority) { 1218 self.control_stream_local.queue_update_priority(stream_id); 1219 Ok(true) 1220 } else { 1221 Ok(false) 1222 } 1223 } 1224 recv_stream_is_critical(&self, stream_id: StreamId) -> bool1225 fn recv_stream_is_critical(&self, stream_id: StreamId) -> bool { 1226 if let Some(r) = self.recv_streams.get(&stream_id) { 1227 matches!( 1228 r.stream_type(), 1229 Http3StreamType::Control | Http3StreamType::Encoder | Http3StreamType::Decoder 1230 ) 1231 } else { 1232 false 1233 } 1234 } 1235 send_stream_is_critical(&self, stream_id: StreamId) -> bool1236 fn send_stream_is_critical(&self, stream_id: StreamId) -> bool { 1237 self.qpack_encoder 1238 .borrow() 1239 .local_stream_id() 1240 .iter() 1241 .chain(self.qpack_decoder.borrow().local_stream_id().iter()) 1242 .chain(self.control_stream_local.stream_id().iter()) 1243 .any(|id| stream_id == *id) 1244 } 1245 close_send(&mut self, stream_id: StreamId, close_type: CloseType, conn: &mut Connection)1246 fn close_send(&mut self, stream_id: StreamId, close_type: CloseType, conn: &mut Connection) { 1247 if let Some(mut s) = self.remove_send_stream(stream_id, conn) { 1248 s.handle_stop_sending(close_type); 1249 } 1250 } 1251 
/// Removes the receive stream `stream_id`, if there is one, and resets it with `close_type`.
/// An error returned by the reset is propagated; an unknown stream id is not an error.
fn close_recv(
    &mut self,
    stream_id: StreamId,
    close_type: CloseType,
    conn: &mut Connection,
) -> Res<()> {
    match self.remove_recv_stream(stream_id, conn) {
        None => Ok(()),
        Some(mut removed) => {
            removed.reset(close_type)?;
            Ok(())
        }
    }
}

/// Tears down every sub-stream that `wt` hands out through `take_sub_streams`.
///
/// Receive sub-streams are removed, reset, and the transport is asked to stop sending on them.
/// Send sub-streams are removed, notified, and reset on the transport. All results of these
/// calls are deliberately discarded. Nothing happens if the session hands out no sub-streams.
fn remove_extended_connect(
    &mut self,
    wt: &Rc<RefCell<WebTransportSession>>,
    conn: &mut Connection,
) {
    let sub_streams = wt.borrow_mut().take_sub_streams();
    let (recv, send) = match sub_streams {
        Some(pair) => pair,
        None => return,
    };
    let cancelled = Error::HttpRequestCancelled.code();

    for id in recv {
        qtrace!("Remove the extended connect sub receiver stream {}", id);
        // Use CloseType::ResetRemote so that an event will be sent. CloseType::LocalError would
        // have the same effect.
        if let Some(mut s) = self.recv_streams.remove(&id) {
            mem::drop(s.reset(CloseType::ResetRemote(cancelled)));
        }
        mem::drop(conn.stream_stop_sending(id, cancelled));
    }
    for id in send {
        qtrace!("Remove the extended connect sub send stream {}", id);
        if let Some(mut s) = self.send_streams.remove(&id) {
            s.handle_stop_sending(CloseType::ResetRemote(cancelled));
        }
        mem::drop(conn.stream_reset_send(id, cancelled));
    }
}

/// Removes and returns the receive stream `stream_id`.
///
/// For an extended-connect stream the send stream with the same id is removed as well (it is
/// expected to exist; this panics otherwise), and the sub-streams of its WebTransport session,
/// if it has one, are torn down.
fn remove_recv_stream(
    &mut self,
    stream_id: StreamId,
    conn: &mut Connection,
) -> Option<Box<dyn RecvStream>> {
    let removed = self.recv_streams.remove(&stream_id)?;
    if removed.stream_type() == Http3StreamType::ExtendedConnect {
        self.send_streams.remove(&stream_id).unwrap();
        if let Some(wt) = removed.webtransport() {
            self.remove_extended_connect(&wt, conn);
        }
    }
    Some(removed)
}

/// Removes and returns the send stream `stream_id`.
///
/// For an extended-connect stream the receive stream with the same id is removed as well (it is
/// expected to exist; this panics otherwise), and the sub-streams of its WebTransport session,
/// if it has one, are torn down.
fn remove_send_stream(
    &mut self,
    stream_id: StreamId,
    conn: &mut Connection,
) -> Option<Box<dyn SendStream>> {
    let removed = self.send_streams.remove(&stream_id)?;
    if removed.stream_type() == Http3StreamType::ExtendedConnect {
        // Kept alive until the sub-streams have been cleaned up.
        let paired_recv = self.recv_streams.remove(&stream_id).unwrap();
        if let Some(wt) = paired_recv.webtransport() {
            self.remove_extended_connect(&wt, conn);
        }
    }
    Some(removed)
}

/// Reports whether WebTransport is enabled, as answered by the WebTransport feature object.
pub fn webtransport_enabled(&self) -> bool {
    self.webtransport.enabled()
}
// Closes the enclosing `impl` block, whose header lies before this excerpt.
}