1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 #![allow(clippy::module_name_repetitions)]
8 
9 use crate::control_stream_local::ControlStreamLocal;
10 use crate::control_stream_remote::ControlStreamRemote;
11 use crate::features::extended_connect::{
12     webtransport_session::WebTransportSession,
13     webtransport_streams::{WebTransportRecvStream, WebTransportSendStream},
14     ExtendedConnectEvents, ExtendedConnectFeature, ExtendedConnectType,
15 };
16 use crate::frames::HFrame;
17 use crate::push_controller::PushController;
18 use crate::qpack_decoder_receiver::DecoderRecvStream;
19 use crate::qpack_encoder_receiver::EncoderRecvStream;
20 use crate::recv_message::{RecvMessage, RecvMessageInfo};
21 use crate::request_target::{AsRequestTarget, RequestTarget};
22 use crate::send_message::SendMessage;
23 use crate::settings::{HSettingType, HSettings, HttpZeroRttChecker};
24 use crate::stream_type_reader::NewStreamHeadReader;
25 use crate::{
26     client_events::Http3ClientEvents, CloseType, Http3Parameters, Http3StreamType,
27     HttpRecvStreamEvents, NewStreamType, Priority, PriorityHandler, ReceiveOutput, RecvStream,
28     RecvStreamEvents, SendStream, SendStreamEvents,
29 };
30 use neqo_common::{qdebug, qerror, qinfo, qtrace, qwarn, Header, MessageType, Role};
31 use neqo_qpack::decoder::QPackDecoder;
32 use neqo_qpack::encoder::QPackEncoder;
33 use neqo_transport::{
34     AppError, Connection, ConnectionError, State, StreamId, StreamType, ZeroRttState,
35 };
36 use std::cell::RefCell;
37 use std::collections::{BTreeSet, HashMap};
38 use std::fmt::Debug;
39 use std::mem;
40 use std::rc::Rc;
41 
42 use crate::{Error, Res};
43 
/// Borrowed description of a request to be sent: method, target, extra headers and
/// priority, plus an optional extended-CONNECT type.
pub struct RequestDescription<'b, 't, T>
where
    T: AsRequestTarget<'t> + ?Sized + Debug,
{
    /// Value sent in the `:method` pseudo-header.
    pub method: &'b str,
    /// When set, a `:protocol` pseudo-header is added and the request is treated as an
    /// extended CONNECT.
    pub connect_type: Option<ExtendedConnectType>,
    /// Source of the `:scheme`, `:authority` and `:path` pseudo-headers.
    pub target: &'t T,
    /// Additional headers appended after the pseudo-headers (and after the priority header,
    /// if one is produced).
    pub headers: &'b [Header],
    /// Priority of the request; may contribute a header and is handed to the receive side.
    pub priority: Priority,
}
54 
/// Tracks what is known about the peer's HTTP/3 settings.
#[derive(Debug)]
enum Http3RemoteSettingsState {
    /// No settings are known; this is the initial state and the state after 0-RTT is rejected.
    NotReceived,
    /// Settings that `get_settings` hands out for building a resumption token.
    /// NOTE(review): presumably set when the peer's SETTINGS frame arrives; the assignment
    /// is outside this part of the file.
    Received(HSettings),
    /// Settings remembered from a resumption token and used while in 0-RTT.
    ZeroRtt(HSettings),
}
61 
/// State of the HTTP/3 connection.
///
/// `PartialOrd`/`Ord` are derived, so the ordering of states follows the declaration order
/// of the variants below.
#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Clone)]
pub enum Http3State {
    /// Initial state; the local HTTP/3 streams have not been set up yet.
    Initializing,
    /// The HTTP/3 streams were set up for 0-RTT before the transport reported `Connected`.
    ZeroRtt,
    /// The transport is connected and HTTP/3 is initialized.
    Connected,
    /// The connection is going away.
    /// NOTE(review): the meaning of the carried `StreamId` is not visible in this part of
    /// the file; confirm against the GOAWAY handling.
    GoingAway(StreamId),
    /// The connection is closing with the given error.
    Closing(ConnectionError),
    /// The connection is closed with the given error.
    Closed(ConnectionError),
}
71 
72 impl Http3State {
73     #[must_use]
active(&self) -> bool74     pub fn active(&self) -> bool {
75         matches!(
76             self,
77             Http3State::Connected | Http3State::GoingAway(_) | Http3State::ZeroRtt
78         )
79     }
80 }
81 
/// HTTP/3 connection state: the control and QPACK streams plus the per-stream send and
/// receive handlers.
#[derive(Debug)]
pub(crate) struct Http3Connection {
    /// Role of this endpoint; passed to new-stream readers and consulted on state changes.
    role: Role,
    /// Current HTTP/3 state.
    pub state: Http3State,
    /// Local parameters; source of the SETTINGS frame, the QPACK settings and the data
    /// saved for the session ticket.
    local_params: Http3Parameters,
    /// Local control stream, on which SETTINGS is queued.
    control_stream_local: ControlStreamLocal,
    /// QPACK encoder, shared (via `Rc`) with the streams that use it.
    pub qpack_encoder: Rc<RefCell<QPackEncoder>>,
    /// QPACK decoder, shared (via `Rc`) with the streams that use it.
    pub qpack_decoder: Rc<RefCell<QPackDecoder>>,
    /// What is known about the peer's settings.
    settings_state: Http3RemoteSettingsState,
    /// Ids of streams for which `send` should be called on the next `process_sending`.
    streams_with_pending_data: BTreeSet<StreamId>,
    /// Send-side handlers keyed by stream id.
    pub send_streams: HashMap<StreamId, Box<dyn SendStream>>,
    /// Receive-side handlers keyed by stream id.
    pub recv_streams: HashMap<StreamId, Box<dyn RecvStream>>,
    /// Extended-CONNECT feature state for WebTransport.
    webtransport: ExtendedConnectFeature,
}
96 
97 impl ::std::fmt::Display for Http3Connection {
fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result98     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
99         write!(f, "Http3 connection")
100     }
101 }
102 
103 impl Http3Connection {
104     /// Create a new connection.
new(conn_params: Http3Parameters, role: Role) -> Self105     pub fn new(conn_params: Http3Parameters, role: Role) -> Self {
106         Self {
107             state: Http3State::Initializing,
108             control_stream_local: ControlStreamLocal::new(),
109             qpack_encoder: Rc::new(RefCell::new(QPackEncoder::new(
110                 conn_params.get_qpack_settings(),
111                 true,
112             ))),
113             qpack_decoder: Rc::new(RefCell::new(QPackDecoder::new(
114                 conn_params.get_qpack_settings(),
115             ))),
116             webtransport: ExtendedConnectFeature::new(
117                 ExtendedConnectType::WebTransport,
118                 conn_params.get_webtransport(),
119             ),
120             local_params: conn_params,
121             settings_state: Http3RemoteSettingsState::NotReceived,
122             streams_with_pending_data: BTreeSet::new(),
123             send_streams: HashMap::new(),
124             recv_streams: HashMap::new(),
125             role,
126         }
127     }
128 
    /// Registers `feature_listener` as the listener of the WebTransport feature.
    pub fn set_features_listener(&mut self, feature_listener: Http3ClientEvents) {
        self.webtransport.set_listener(feature_listener);
    }
132 
initialize_http3_connection(&mut self, conn: &mut Connection) -> Res<()>133     fn initialize_http3_connection(&mut self, conn: &mut Connection) -> Res<()> {
134         qinfo!([self], "Initialize the http3 connection.");
135         self.control_stream_local.create(conn)?;
136 
137         self.send_settings();
138         self.create_qpack_streams(conn)?;
139         Ok(())
140     }
141 
send_settings(&mut self)142     fn send_settings(&mut self) {
143         qdebug!([self], "Send settings.");
144         self.control_stream_local.queue_frame(&HFrame::Settings {
145             settings: HSettings::from(&self.local_params),
146         });
147         self.control_stream_local.queue_frame(&HFrame::Grease);
148     }
149 
    /// Save settings for adding to the session ticket.
    ///
    /// Returns the bytes produced by `HttpZeroRttChecker::save` for the local parameters.
    pub(crate) fn save_settings(&self) -> Vec<u8> {
        HttpZeroRttChecker::save(&self.local_params)
    }
154 
create_qpack_streams(&mut self, conn: &mut Connection) -> Res<()>155     fn create_qpack_streams(&mut self, conn: &mut Connection) -> Res<()> {
156         qdebug!([self], "create_qpack_streams.");
157         self.qpack_encoder
158             .borrow_mut()
159             .add_send_stream(conn.stream_create(StreamType::UniDi)?);
160         self.qpack_decoder
161             .borrow_mut()
162             .add_send_stream(conn.stream_create(StreamType::UniDi)?);
163         Ok(())
164     }
165 
    /// Inform a `HttpConnection` that a stream has data to send and that `send` should be called for the stream.
    ///
    /// The id is recorded in a set, so marking the same stream repeatedly has no further effect.
    pub fn stream_has_pending_data(&mut self, stream_id: StreamId) {
        self.streams_with_pending_data.insert(stream_id);
    }
170 
    /// Return true if there is a stream that needs to send data.
    ///
    /// Only streams registered through `stream_has_pending_data` (or re-registered while
    /// sending) are considered; the control and QPACK streams are not tracked here.
    pub fn has_data_to_send(&self) -> bool {
        !self.streams_with_pending_data.is_empty()
    }
175 
send_non_control_streams(&mut self, conn: &mut Connection) -> Res<()>176     fn send_non_control_streams(&mut self, conn: &mut Connection) -> Res<()> {
177         let to_send = mem::take(&mut self.streams_with_pending_data);
178         for stream_id in to_send {
179             let done = if let Some(s) = &mut self.send_streams.get_mut(&stream_id) {
180                 s.send(conn)?;
181                 if s.has_data_to_send() {
182                     self.streams_with_pending_data.insert(stream_id);
183                 }
184                 s.done()
185             } else {
186                 false
187             };
188             if done {
189                 self.remove_send_stream(stream_id, conn);
190             }
191         }
192         Ok(())
193     }
194 
195     /// Call `send` for all streams that need to send data.
process_sending(&mut self, conn: &mut Connection) -> Res<()>196     pub fn process_sending(&mut self, conn: &mut Connection) -> Res<()> {
197         // check if control stream has data to send.
198         self.control_stream_local
199             .send(conn, &mut self.recv_streams)?;
200 
201         self.send_non_control_streams(conn)?;
202 
203         self.qpack_decoder.borrow_mut().send(conn)?;
204         match self.qpack_encoder.borrow_mut().send_encoder_updates(conn) {
205             Ok(())
206             | Err(neqo_qpack::Error::EncoderStreamBlocked)
207             | Err(neqo_qpack::Error::DynamicTableFull) => {}
208             Err(e) => return Err(Error::QpackError(e)),
209         }
210         Ok(())
211     }
212 
    /// We have a resumption token which remembers previous settings. Update the setting.
    ///
    /// Initializes the local HTTP/3 streams, applies `settings` through
    /// `set_qpack_settings`, stores them as the 0-RTT settings and moves the state to
    /// `Http3State::ZeroRtt`.
    ///
    /// # Errors
    /// Propagates an error from initialization or from applying the settings; in that case
    /// neither `settings_state` nor `state` is updated.
    pub fn set_0rtt_settings(&mut self, conn: &mut Connection, settings: HSettings) -> Res<()> {
        self.initialize_http3_connection(conn)?;
        self.set_qpack_settings(&settings)?;
        self.settings_state = Http3RemoteSettingsState::ZeroRtt(settings);
        self.state = Http3State::ZeroRtt;
        Ok(())
    }
221 
222     /// Returns the settings for a connection. This is used for creating a resumption token.
get_settings(&self) -> Option<HSettings>223     pub fn get_settings(&self) -> Option<HSettings> {
224         if let Http3RemoteSettingsState::Received(settings) = &self.settings_state {
225             Some(settings.clone())
226         } else {
227             None
228         }
229     }
230 
add_new_stream(&mut self, stream_id: StreamId)231     pub fn add_new_stream(&mut self, stream_id: StreamId) {
232         qtrace!([self], "A new stream: {}.", stream_id);
233         self.recv_streams.insert(
234             stream_id,
235             Box::new(NewStreamHeadReader::new(stream_id, self.role)),
236         );
237     }
238 
239     #[allow(clippy::option_if_let_else)] // False positive as borrow scope isn't lexical here.
stream_receive(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<ReceiveOutput>240     fn stream_receive(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<ReceiveOutput> {
241         qtrace!([self], "Readable stream {}.", stream_id);
242 
243         if let Some(recv_stream) = self.recv_streams.get_mut(&stream_id) {
244             let res = recv_stream.receive(conn);
245             return self
246                 .handle_stream_manipulation_output(res, stream_id, conn)
247                 .map(|(output, _)| output);
248         }
249         Ok(ReceiveOutput::NoOutput)
250     }
251 
handle_unblocked_streams( &mut self, unblocked_streams: Vec<StreamId>, conn: &mut Connection, ) -> Res<()>252     fn handle_unblocked_streams(
253         &mut self,
254         unblocked_streams: Vec<StreamId>,
255         conn: &mut Connection,
256     ) -> Res<()> {
257         for stream_id in unblocked_streams {
258             qdebug!([self], "Stream {} is unblocked", stream_id);
259             if let Some(r) = self.recv_streams.get_mut(&stream_id) {
260                 let res = r
261                     .http_stream()
262                     .ok_or(Error::HttpInternal(10))?
263                     .header_unblocked(conn);
264                 let res = self.handle_stream_manipulation_output(res, stream_id, conn)?;
265                 debug_assert!(matches!(res, (ReceiveOutput::NoOutput, _)));
266             }
267         }
268         Ok(())
269     }
270 
271     /// This function handles reading from all streams, i.e. control, qpack, request/response
272     /// stream and unidi stream that are still do not have a type.
273     /// The function cannot handle:
274     /// 1) a Push stream (if an unknown unidi stream is decoded to be a push stream)
275     /// 2) frames `MaxPushId` or `Goaway` must be handled by `Http3Client`/`Server`.
276     /// The function returns `ReceiveOutput`.
handle_stream_readable( &mut self, conn: &mut Connection, stream_id: StreamId, ) -> Res<ReceiveOutput>277     pub fn handle_stream_readable(
278         &mut self,
279         conn: &mut Connection,
280         stream_id: StreamId,
281     ) -> Res<ReceiveOutput> {
282         let mut output = self.stream_receive(conn, stream_id)?;
283 
284         if let ReceiveOutput::NewStream(stream_type) = output {
285             output = self.handle_new_stream(conn, stream_type, stream_id)?;
286         }
287 
288         #[allow(clippy::match_same_arms)] // clippy is being stupid here
289         match output {
290             ReceiveOutput::UnblockedStreams(unblocked_streams) => {
291                 self.handle_unblocked_streams(unblocked_streams, conn)?;
292                 Ok(ReceiveOutput::NoOutput)
293             }
294             ReceiveOutput::ControlFrames(mut control_frames) => {
295                 let mut rest = Vec::new();
296                 for cf in control_frames.drain(..) {
297                     if let Some(not_handled) = self.handle_control_frame(cf)? {
298                         rest.push(not_handled);
299                     }
300                 }
301                 Ok(ReceiveOutput::ControlFrames(rest))
302             }
303             ReceiveOutput::NewStream(NewStreamType::Push(_))
304             | ReceiveOutput::NewStream(NewStreamType::Http)
305             | ReceiveOutput::NewStream(NewStreamType::WebTransportStream(_)) => Ok(output),
306             ReceiveOutput::NewStream(_) => {
307                 unreachable!("NewStream should have been handled already")
308             }
309             _ => Ok(output),
310         }
311     }
312 
    /// This is called when a RESET frame has been received.
    ///
    /// Closes the receive side of `stream_id` with `CloseType::ResetRemote(app_error)` and
    /// returns whatever `close_recv` returns.
    pub fn handle_stream_reset(
        &mut self,
        stream_id: StreamId,
        app_error: AppError,
        conn: &mut Connection,
    ) -> Res<()> {
        qinfo!(
            [self],
            "Handle a stream reset stream_id={} app_err={}",
            stream_id,
            app_error
        );

        self.close_recv(stream_id, CloseType::ResetRemote(app_error), conn)
    }
329 
handle_stream_stop_sending( &mut self, stream_id: StreamId, app_error: AppError, conn: &mut Connection, ) -> Res<()>330     pub fn handle_stream_stop_sending(
331         &mut self,
332         stream_id: StreamId,
333         app_error: AppError,
334         conn: &mut Connection,
335     ) -> Res<()> {
336         qinfo!(
337             [self],
338             "Handle stream_stop_sending stream_id={} app_err={}",
339             stream_id,
340             app_error
341         );
342 
343         if self.send_stream_is_critical(stream_id) {
344             return Err(Error::HttpClosedCriticalStream);
345         }
346 
347         self.close_send(stream_id, CloseType::ResetRemote(app_error), conn);
348         Ok(())
349     }
350 
351     /// This is called when `neqo_transport::Connection` state has been change to take proper actions in
352     /// the HTTP3 layer.
handle_state_change(&mut self, conn: &mut Connection, state: &State) -> Res<bool>353     pub fn handle_state_change(&mut self, conn: &mut Connection, state: &State) -> Res<bool> {
354         qdebug!([self], "Handle state change {:?}", state);
355         match state {
356             State::Handshaking => {
357                 if self.role == Role::Server
358                     && conn.zero_rtt_state() == &ZeroRttState::AcceptedServer
359                 {
360                     self.state = Http3State::ZeroRtt;
361                     self.initialize_http3_connection(conn)?;
362                     Ok(true)
363                 } else {
364                     Ok(false)
365                 }
366             }
367             State::Connected => {
368                 debug_assert!(matches!(
369                     self.state,
370                     Http3State::Initializing | Http3State::ZeroRtt
371                 ));
372                 if self.state == Http3State::Initializing {
373                     self.initialize_http3_connection(conn)?;
374                 }
375                 self.state = Http3State::Connected;
376                 Ok(true)
377             }
378             State::Closing { error, .. } | State::Draining { error, .. } => {
379                 if matches!(self.state, Http3State::Closing(_) | Http3State::Closed(_)) {
380                     Ok(false)
381                 } else {
382                     self.state = Http3State::Closing(error.clone());
383                     Ok(true)
384                 }
385             }
386             State::Closed(error) => {
387                 if matches!(self.state, Http3State::Closed(_)) {
388                     Ok(false)
389                 } else {
390                     self.state = Http3State::Closed(error.clone());
391                     Ok(true)
392                 }
393             }
394             _ => Ok(false),
395         }
396     }
397 
398     /// This is called when 0RTT has been reseted to clear `send_streams`, `recv_streams` and settings.
handle_zero_rtt_rejected(&mut self) -> Res<()>399     pub fn handle_zero_rtt_rejected(&mut self) -> Res<()> {
400         if self.state == Http3State::ZeroRtt {
401             self.state = Http3State::Initializing;
402             self.control_stream_local = ControlStreamLocal::new();
403             self.qpack_encoder = Rc::new(RefCell::new(QPackEncoder::new(
404                 self.local_params.get_qpack_settings(),
405                 true,
406             )));
407             self.qpack_decoder = Rc::new(RefCell::new(QPackDecoder::new(
408                 self.local_params.get_qpack_settings(),
409             )));
410             self.settings_state = Http3RemoteSettingsState::NotReceived;
411             self.streams_with_pending_data.clear();
412             // TODO: investigate whether this code can automatically retry failed transactions.
413             self.send_streams.clear();
414             self.recv_streams.clear();
415             Ok(())
416         } else {
417             debug_assert!(false, "Zero rtt rejected in the wrong state.");
418             Err(Error::HttpInternal(3))
419         }
420     }
421 
check_stream_exists(&self, stream_type: Http3StreamType) -> Res<()>422     fn check_stream_exists(&self, stream_type: Http3StreamType) -> Res<()> {
423         if self
424             .recv_streams
425             .values()
426             .any(|c| c.stream_type() == stream_type)
427         {
428             Err(Error::HttpStreamCreation)
429         } else {
430             Ok(())
431         }
432     }
433 
434     /// If the new stream is a control stream, this function creates a proper handler
435     /// and perform a read.
436     /// if the new stream is a push stream, the function returns `ReceiveOutput::PushStream`
437     /// and the caller will handle it.
438     /// If the stream is of a unknown type the stream will be closed.
handle_new_stream( &mut self, conn: &mut Connection, stream_type: NewStreamType, stream_id: StreamId, ) -> Res<ReceiveOutput>439     fn handle_new_stream(
440         &mut self,
441         conn: &mut Connection,
442         stream_type: NewStreamType,
443         stream_id: StreamId,
444     ) -> Res<ReceiveOutput> {
445         match stream_type {
446             NewStreamType::Control => {
447                 self.check_stream_exists(Http3StreamType::Control)?;
448                 self.recv_streams
449                     .insert(stream_id, Box::new(ControlStreamRemote::new(stream_id)));
450             }
451 
452             NewStreamType::Push(push_id) => {
453                 qinfo!(
454                     [self],
455                     "A new push stream {} push_id:{}.",
456                     stream_id,
457                     push_id
458                 );
459             }
460             NewStreamType::Decoder => {
461                 qinfo!([self], "A new remote qpack encoder stream {}", stream_id);
462                 self.check_stream_exists(Http3StreamType::Decoder)?;
463                 self.recv_streams.insert(
464                     stream_id,
465                     Box::new(DecoderRecvStream::new(
466                         stream_id,
467                         Rc::clone(&self.qpack_decoder),
468                     )),
469                 );
470             }
471             NewStreamType::Encoder => {
472                 qinfo!([self], "A new remote qpack decoder stream {}", stream_id);
473                 self.check_stream_exists(Http3StreamType::Encoder)?;
474                 self.recv_streams.insert(
475                     stream_id,
476                     Box::new(EncoderRecvStream::new(
477                         stream_id,
478                         Rc::clone(&self.qpack_encoder),
479                     )),
480                 );
481             }
482             NewStreamType::Http => {
483                 qinfo!([self], "A new http stream {}.", stream_id);
484             }
485             NewStreamType::WebTransportStream(session_id) => {
486                 let session_exists = self
487                     .send_streams
488                     .get(&StreamId::from(session_id))
489                     .map_or(false, |s| {
490                         s.stream_type() == Http3StreamType::ExtendedConnect
491                     });
492                 if !session_exists {
493                     conn.stream_stop_sending(stream_id, Error::HttpStreamCreation.code())?;
494                     return Ok(ReceiveOutput::NoOutput);
495                 }
496             }
497             NewStreamType::Unknown => {
498                 conn.stream_stop_sending(stream_id, Error::HttpStreamCreation.code())?;
499             }
500         };
501 
502         match stream_type {
503             NewStreamType::Control | NewStreamType::Decoder | NewStreamType::Encoder => {
504                 self.stream_receive(conn, stream_id)
505             }
506             NewStreamType::Push(_) | NewStreamType::Http | NewStreamType::WebTransportStream(_) => {
507                 Ok(ReceiveOutput::NewStream(stream_type))
508             }
509             NewStreamType::Unknown => Ok(ReceiveOutput::NoOutput),
510         }
511     }
512 
513     /// This is called when an application closes the connection.
close(&mut self, error: AppError)514     pub fn close(&mut self, error: AppError) {
515         qinfo!([self], "Close connection error {:?}.", error);
516         self.state = Http3State::Closing(ConnectionError::Application(error));
517         if (!self.send_streams.is_empty() || !self.recv_streams.is_empty()) && (error == 0) {
518             qwarn!("close(0) called when streams still active");
519         }
520         self.send_streams.clear();
521         self.recv_streams.clear();
522     }
523 
524     /// This function will not handle the output of the function completely, but only
525     /// handle the indication that a stream is closed. There are 2 cases:
526     ///  - an error occurred or
527     ///  - the stream is done, i.e. the second value in `output` tuple is true if
528     ///    the stream is done and can be removed from the `recv_streams`
529     /// How it is handling `output`:
530     ///  - if the stream is done, it removes the stream from `recv_streams`
531     ///  - if the stream is not done and there is no error, return `output` and the caller will
532     ///    handle it.
533     ///  - in case of an error:
534     ///    - if it is only a stream error and the stream is not critical, send `STOP_SENDING`
535     ///      frame, remove the stream from `recv_streams` and inform the listener that the stream
536     ///      has been reset.
537     ///    - otherwise this is a connection error. In this case, propagate the error to the caller
538     ///      that will handle it properly.
handle_stream_manipulation_output<U>( &mut self, output: Res<(U, bool)>, stream_id: StreamId, conn: &mut Connection, ) -> Res<(U, bool)> where U: Default,539     fn handle_stream_manipulation_output<U>(
540         &mut self,
541         output: Res<(U, bool)>,
542         stream_id: StreamId,
543         conn: &mut Connection,
544     ) -> Res<(U, bool)>
545     where
546         U: Default,
547     {
548         match &output {
549             Ok((_, true)) => {
550                 self.remove_recv_stream(stream_id, conn);
551             }
552             Ok((_, false)) => {}
553             Err(e) => {
554                 if e.stream_reset_error() && !self.recv_stream_is_critical(stream_id) {
555                     mem::drop(conn.stream_stop_sending(stream_id, e.code()));
556                     self.close_recv(stream_id, CloseType::LocalError(e.code()), conn)?;
557                     return Ok((U::default(), false));
558                 }
559             }
560         }
561         output
562     }
563 
create_fetch_headers<'b, 't, T>(request: &RequestDescription<'b, 't, T>) -> Res<Vec<Header>> where T: AsRequestTarget<'t> + ?Sized + Debug,564     fn create_fetch_headers<'b, 't, T>(request: &RequestDescription<'b, 't, T>) -> Res<Vec<Header>>
565     where
566         T: AsRequestTarget<'t> + ?Sized + Debug,
567     {
568         let target = request
569             .target
570             .as_request_target()
571             .map_err(|_| Error::InvalidRequestTarget)?;
572 
573         // Transform pseudo-header fields
574         let mut final_headers = vec![
575             Header::new(":method", request.method),
576             Header::new(":scheme", target.scheme()),
577             Header::new(":authority", target.authority()),
578             Header::new(":path", target.path()),
579         ];
580         if let Some(conn_type) = request.connect_type {
581             final_headers.push(Header::new(":protocol", conn_type.string()));
582         }
583 
584         if let Some(priority_header) = request.priority.header() {
585             final_headers.push(priority_header);
586         }
587         final_headers.extend_from_slice(request.headers);
588         Ok(final_headers)
589     }
590 
fetch<'b, 't, T>( &mut self, conn: &mut Connection, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn HttpRecvStreamEvents>, push_handler: Option<Rc<RefCell<PushController>>>, request: &RequestDescription<'b, 't, T>, ) -> Res<StreamId> where T: AsRequestTarget<'t> + ?Sized + Debug,591     pub fn fetch<'b, 't, T>(
592         &mut self,
593         conn: &mut Connection,
594         send_events: Box<dyn SendStreamEvents>,
595         recv_events: Box<dyn HttpRecvStreamEvents>,
596         push_handler: Option<Rc<RefCell<PushController>>>,
597         request: &RequestDescription<'b, 't, T>,
598     ) -> Res<StreamId>
599     where
600         T: AsRequestTarget<'t> + ?Sized + Debug,
601     {
602         qinfo!(
603             [self],
604             "Fetch method={} target: {:?}",
605             request.method,
606             request.target,
607         );
608         let id = self.create_bidi_transport_stream(conn)?;
609         self.fetch_with_stream(id, conn, send_events, recv_events, push_handler, request)?;
610         Ok(id)
611     }
612 
create_bidi_transport_stream(&self, conn: &mut Connection) -> Res<StreamId>613     fn create_bidi_transport_stream(&self, conn: &mut Connection) -> Res<StreamId> {
614         // Requests cannot be created when a connection is in states: Initializing, GoingAway, Closing and Closed.
615         match self.state() {
616             Http3State::GoingAway(..) | Http3State::Closing(..) | Http3State::Closed(..) => {
617                 return Err(Error::AlreadyClosed)
618             }
619             Http3State::Initializing => return Err(Error::Unavailable),
620             _ => {}
621         }
622 
623         let id = conn
624             .stream_create(StreamType::BiDi)
625             .map_err(|e| Error::map_stream_create_errors(&e))?;
626         conn.stream_keep_alive(id, true)?;
627         Ok(id)
628     }
629 
fetch_with_stream<'b, 't, T>( &mut self, stream_id: StreamId, conn: &mut Connection, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn HttpRecvStreamEvents>, push_handler: Option<Rc<RefCell<PushController>>>, request: &RequestDescription<'b, 't, T>, ) -> Res<()> where T: AsRequestTarget<'t> + ?Sized + Debug,630     fn fetch_with_stream<'b, 't, T>(
631         &mut self,
632         stream_id: StreamId,
633         conn: &mut Connection,
634         send_events: Box<dyn SendStreamEvents>,
635         recv_events: Box<dyn HttpRecvStreamEvents>,
636         push_handler: Option<Rc<RefCell<PushController>>>,
637         request: &RequestDescription<'b, 't, T>,
638     ) -> Res<()>
639     where
640         T: AsRequestTarget<'t> + ?Sized + Debug,
641     {
642         let final_headers = Http3Connection::create_fetch_headers(request)?;
643 
644         let stream_type = if request.connect_type.is_some() {
645             Http3StreamType::ExtendedConnect
646         } else {
647             Http3StreamType::Http
648         };
649 
650         let mut send_message = SendMessage::new(
651             MessageType::Request,
652             stream_type,
653             stream_id,
654             self.qpack_encoder.clone(),
655             send_events,
656         );
657 
658         send_message
659             .http_stream()
660             .unwrap()
661             .send_headers(&final_headers, conn)?;
662 
663         self.add_streams(
664             stream_id,
665             Box::new(send_message),
666             Box::new(RecvMessage::new(
667                 &RecvMessageInfo {
668                     message_type: MessageType::Response,
669                     stream_type,
670                     stream_id,
671                     header_frame_type_read: false,
672                 },
673                 Rc::clone(&self.qpack_decoder),
674                 recv_events,
675                 push_handler,
676                 PriorityHandler::new(false, request.priority),
677             )),
678         );
679 
680         // Call immediately send so that at least headers get sent. This will make Firefox faster, since
681         // it can send request body immediatly in most cases and does not need to do a complete process loop.
682         self.send_streams
683             .get_mut(&stream_id)
684             .ok_or(Error::InvalidStreamId)?
685             .send(conn)?;
686         Ok(())
687     }
688 
689     /// Stream data are read directly into a buffer supplied as a parameter of this function to avoid copying
690     /// data.
691     /// # Errors
692     /// It returns an error if a stream does not exist or an error happens while reading a stream, e.g.
693     /// early close, protocol error, etc.
read_data( &mut self, conn: &mut Connection, stream_id: StreamId, buf: &mut [u8], ) -> Res<(usize, bool)>694     pub fn read_data(
695         &mut self,
696         conn: &mut Connection,
697         stream_id: StreamId,
698         buf: &mut [u8],
699     ) -> Res<(usize, bool)> {
700         qinfo!([self], "read_data from stream {}.", stream_id);
701         let res = self
702             .recv_streams
703             .get_mut(&stream_id)
704             .ok_or(Error::InvalidStreamId)?
705             .read_data(conn, buf);
706         self.handle_stream_manipulation_output(res, stream_id, conn)
707     }
708 
709     /// This is called when an application resets a stream.
710     /// The application reset will close both sides.
stream_reset_send( &mut self, conn: &mut Connection, stream_id: StreamId, error: AppError, ) -> Res<()>711     pub fn stream_reset_send(
712         &mut self,
713         conn: &mut Connection,
714         stream_id: StreamId,
715         error: AppError,
716     ) -> Res<()> {
717         qinfo!(
718             [self],
719             "Reset sending side of stream {} error={}.",
720             stream_id,
721             error
722         );
723 
724         if self.send_stream_is_critical(stream_id) {
725             return Err(Error::InvalidStreamId);
726         }
727 
728         self.close_send(stream_id, CloseType::ResetApp(error), conn);
729         conn.stream_reset_send(stream_id, error)?;
730         Ok(())
731     }
732 
stream_stop_sending( &mut self, conn: &mut Connection, stream_id: StreamId, error: AppError, ) -> Res<()>733     pub fn stream_stop_sending(
734         &mut self,
735         conn: &mut Connection,
736         stream_id: StreamId,
737         error: AppError,
738     ) -> Res<()> {
739         qinfo!(
740             [self],
741             "Send stop sending for stream {} error={}.",
742             stream_id,
743             error
744         );
745         if self.recv_stream_is_critical(stream_id) {
746             return Err(Error::InvalidStreamId);
747         }
748 
749         self.close_recv(stream_id, CloseType::ResetApp(error), conn)?;
750 
751         // Stream may be already be closed and we may get an error here, but we do not care.
752         conn.stream_stop_sending(stream_id, error)?;
753         Ok(())
754     }
755 
cancel_fetch( &mut self, stream_id: StreamId, error: AppError, conn: &mut Connection, ) -> Res<()>756     pub fn cancel_fetch(
757         &mut self,
758         stream_id: StreamId,
759         error: AppError,
760         conn: &mut Connection,
761     ) -> Res<()> {
762         qinfo!([self], "cancel_fetch {} error={}.", stream_id, error);
763         let send_stream = self.send_streams.get(&stream_id);
764         let recv_stream = self.recv_streams.get(&stream_id);
765         match (send_stream, recv_stream) {
766             (None, None) => return Err(Error::InvalidStreamId),
767             (Some(s), None) => {
768                 if !matches!(
769                     s.stream_type(),
770                     Http3StreamType::Http | Http3StreamType::ExtendedConnect
771                 ) {
772                     return Err(Error::InvalidStreamId);
773                 }
774                 // Stream may be already be closed and we may get an error here, but we do not care.
775                 mem::drop(self.stream_reset_send(conn, stream_id, error));
776             }
777             (None, Some(s)) => {
778                 if !matches!(
779                     s.stream_type(),
780                     Http3StreamType::Http
781                         | Http3StreamType::Push
782                         | Http3StreamType::ExtendedConnect
783                 ) {
784                     return Err(Error::InvalidStreamId);
785                 }
786 
787                 // Stream may be already be closed and we may get an error here, but we do not care.
788                 mem::drop(self.stream_stop_sending(conn, stream_id, error));
789             }
790             (Some(s), Some(r)) => {
791                 debug_assert_eq!(s.stream_type(), r.stream_type());
792                 if !matches!(
793                     s.stream_type(),
794                     Http3StreamType::Http | Http3StreamType::ExtendedConnect
795                 ) {
796                     return Err(Error::InvalidStreamId);
797                 }
798                 // Stream may be already be closed and we may get an error here, but we do not care.
799                 mem::drop(self.stream_reset_send(conn, stream_id, error));
800                 // Stream may be already be closed and we may get an error here, but we do not care.
801                 mem::drop(self.stream_stop_sending(conn, stream_id, error));
802             }
803         }
804         Ok(())
805     }
806 
807     /// This is called when an application wants to close the sending side of a stream.
stream_close_send(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()>808     pub fn stream_close_send(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> {
809         qinfo!([self], "Close the sending side for stream {}.", stream_id);
810         debug_assert!(self.state.active());
811         let send_stream = self
812             .send_streams
813             .get_mut(&stream_id)
814             .ok_or(Error::InvalidStreamId)?;
815         // The following function may return InvalidStreamId from the transport layer if the stream has been closed
816         // already. It is ok to ignore it here.
817         mem::drop(send_stream.close(conn));
818         if send_stream.done() {
819             self.remove_send_stream(stream_id, conn);
820         } else if send_stream.has_data_to_send() {
821             self.streams_with_pending_data.insert(stream_id);
822         }
823         Ok(())
824     }
825 
webtransport_create_session<'x, 't: 'x, T>( &mut self, conn: &mut Connection, events: Box<dyn ExtendedConnectEvents>, target: &'t T, headers: &'t [Header], ) -> Res<StreamId> where T: AsRequestTarget<'x> + ?Sized + Debug,826     pub fn webtransport_create_session<'x, 't: 'x, T>(
827         &mut self,
828         conn: &mut Connection,
829         events: Box<dyn ExtendedConnectEvents>,
830         target: &'t T,
831         headers: &'t [Header],
832     ) -> Res<StreamId>
833     where
834         T: AsRequestTarget<'x> + ?Sized + Debug,
835     {
836         qinfo!([self], "Create WebTransport");
837         if !self.webtransport_enabled() {
838             return Err(Error::Unavailable);
839         }
840 
841         let id = self.create_bidi_transport_stream(conn)?;
842 
843         let extended_conn = Rc::new(RefCell::new(WebTransportSession::new(
844             id,
845             events,
846             self.role,
847             Rc::clone(&self.qpack_encoder),
848             Rc::clone(&self.qpack_decoder),
849         )));
850         self.add_streams(
851             id,
852             Box::new(extended_conn.clone()),
853             Box::new(extended_conn.clone()),
854         );
855 
856         let final_headers = Http3Connection::create_fetch_headers(&RequestDescription {
857             method: "CONNECT",
858             target,
859             headers,
860             connect_type: Some(ExtendedConnectType::WebTransport),
861             priority: Priority::default(),
862         })?;
863         extended_conn
864             .borrow_mut()
865             .send_request(&final_headers, conn)?;
866         self.streams_with_pending_data.insert(id);
867         Ok(id)
868     }
869 
    /// Answers a pending WebTransport session request on `stream_id`.
    ///
    /// With `accept == false` a `:status 404` response is sent and the sending side is closed.
    /// With `accept == true` a `:status 200` response is sent and the existing HTTP send/recv
    /// streams are wrapped into a `WebTransportSession` that is registered under the same id.
    /// # Errors
    /// Returns `Unavailable` if WebTransport is not enabled. Returns `InvalidStreamId` if no
    /// stream is registered under `stream_id`, if only one side is registered, if a stream is
    /// not an HTTP stream, if `extended_connect_wait_for_response()` is false for the receive
    /// stream, or if sending the `200` response fails. Errors from `cancel_fetch` are passed on.
    pub(crate) fn webtransport_session_accept(
        &mut self,
        conn: &mut Connection,
        stream_id: StreamId,
        events: Box<dyn ExtendedConnectEvents>,
        accept: bool,
    ) -> Res<()> {
        qtrace!("Respond to WebTransport session with accept={}.", accept);
        if !self.webtransport_enabled() {
            return Err(Error::Unavailable);
        }
        // If a receive stream exists it must be an HTTP stream for which
        // `extended_connect_wait_for_response()` holds.
        let mut recv_stream = self.recv_streams.get_mut(&stream_id);
        if let Some(r) = &mut recv_stream {
            if !r
                .http_stream()
                .ok_or(Error::InvalidStreamId)?
                .extended_connect_wait_for_response()
            {
                return Err(Error::InvalidStreamId);
            }
        }

        let send_stream = self.send_streams.get_mut(&stream_id);

        match (send_stream, recv_stream, accept) {
            (None, None, _) => Err(Error::InvalidStreamId),
            // Only one side is registered: cancel what is there and report an error.
            (None, Some(_), _) | (Some(_), None, _) => {
                // TODO this needs a better error
                self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?;
                Err(Error::InvalidStreamId)
            }
            // Reject the session.
            (Some(s), Some(_r), false) => {
                if s.http_stream()
                    .ok_or(Error::InvalidStreamId)?
                    .send_headers(&[Header::new(":status", "404")], conn)
                    .is_ok()
                {
                    // The result of closing the send side is intentionally ignored.
                    mem::drop(self.stream_close_send(conn, stream_id));
                    // TODO issue 1294: add a timer to clean up the recv_stream if the peer does not do that in a short time.
                    self.streams_with_pending_data.insert(stream_id);
                } else {
                    // The response could not be sent, so the request is cancelled instead.
                    self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?;
                }
                Ok(())
            }
            // Accept the session.
            (Some(s), Some(_r), true) => {
                if s.http_stream()
                    .ok_or(Error::InvalidStreamId)?
                    .send_headers(&[Header::new(":status", "200")], conn)
                    .is_ok()
                {
                    // Both streams were found above, so removing them here cannot fail. They
                    // are moved into the session, which is then registered for both directions.
                    let extended_conn =
                        Rc::new(RefCell::new(WebTransportSession::new_with_http_streams(
                            stream_id,
                            events,
                            self.role,
                            self.recv_streams.remove(&stream_id).unwrap(),
                            self.send_streams.remove(&stream_id).unwrap(),
                        )));
                    self.add_streams(
                        stream_id,
                        Box::new(extended_conn.clone()),
                        Box::new(extended_conn),
                    );
                    self.streams_with_pending_data.insert(stream_id);
                } else {
                    // Unlike the reject path, a failed `200` response is reported as an error.
                    self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?;
                    return Err(Error::InvalidStreamId);
                }
                Ok(())
            }
        }
    }
943 
webtransport_close_session( &mut self, conn: &mut Connection, session_id: StreamId, error: u32, message: &str, ) -> Res<()>944     pub(crate) fn webtransport_close_session(
945         &mut self,
946         conn: &mut Connection,
947         session_id: StreamId,
948         error: u32,
949         message: &str,
950     ) -> Res<()> {
951         qtrace!("Clos WebTransport session {:?}", session_id);
952         let send_stream = self
953             .send_streams
954             .get_mut(&session_id)
955             .ok_or(Error::InvalidStreamId)?;
956         if send_stream.stream_type() != Http3StreamType::ExtendedConnect {
957             return Err(Error::InvalidStreamId);
958         }
959 
960         send_stream.close_with_message(conn, error, message)?;
961         if send_stream.done() {
962             self.remove_send_stream(session_id, conn);
963         } else if send_stream.has_data_to_send() {
964             self.streams_with_pending_data.insert(session_id);
965         }
966         Ok(())
967     }
968 
webtransport_create_stream_local( &mut self, conn: &mut Connection, session_id: StreamId, stream_type: StreamType, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn RecvStreamEvents>, ) -> Res<StreamId>969     pub fn webtransport_create_stream_local(
970         &mut self,
971         conn: &mut Connection,
972         session_id: StreamId,
973         stream_type: StreamType,
974         send_events: Box<dyn SendStreamEvents>,
975         recv_events: Box<dyn RecvStreamEvents>,
976     ) -> Res<StreamId> {
977         qtrace!(
978             "Create new WebTransport stream session={} type={:?}",
979             session_id,
980             stream_type
981         );
982 
983         let wt = self
984             .recv_streams
985             .get(&session_id)
986             .ok_or(Error::InvalidStreamId)?
987             .webtransport()
988             .ok_or(Error::InvalidStreamId)?;
989         if !wt.borrow().is_active() {
990             return Err(Error::InvalidStreamId);
991         }
992 
993         let stream_id = conn
994             .stream_create(stream_type)
995             .map_err(|e| Error::map_stream_create_errors(&e))?;
996 
997         self.webtransport_create_stream_internal(
998             wt,
999             stream_id,
1000             session_id,
1001             send_events,
1002             recv_events,
1003             true,
1004         );
1005         Ok(stream_id)
1006     }
1007 
webtransport_create_stream_remote( &mut self, session_id: StreamId, stream_id: StreamId, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn RecvStreamEvents>, ) -> Res<()>1008     pub fn webtransport_create_stream_remote(
1009         &mut self,
1010         session_id: StreamId,
1011         stream_id: StreamId,
1012         send_events: Box<dyn SendStreamEvents>,
1013         recv_events: Box<dyn RecvStreamEvents>,
1014     ) -> Res<()> {
1015         qtrace!(
1016             "Create new WebTransport stream session={} stream_id={}",
1017             session_id,
1018             stream_id
1019         );
1020 
1021         let wt = self
1022             .recv_streams
1023             .get(&session_id)
1024             .ok_or(Error::InvalidStreamId)?
1025             .webtransport()
1026             .ok_or(Error::InvalidStreamId)?;
1027 
1028         self.webtransport_create_stream_internal(
1029             wt,
1030             stream_id,
1031             session_id,
1032             send_events,
1033             recv_events,
1034             false,
1035         );
1036         Ok(())
1037     }
1038 
webtransport_create_stream_internal( &mut self, webtransport_session: Rc<RefCell<WebTransportSession>>, stream_id: StreamId, session_id: StreamId, send_events: Box<dyn SendStreamEvents>, recv_events: Box<dyn RecvStreamEvents>, local: bool, )1039     fn webtransport_create_stream_internal(
1040         &mut self,
1041         webtransport_session: Rc<RefCell<WebTransportSession>>,
1042         stream_id: StreamId,
1043         session_id: StreamId,
1044         send_events: Box<dyn SendStreamEvents>,
1045         recv_events: Box<dyn RecvStreamEvents>,
1046         local: bool,
1047     ) {
1048         // TODO conn.stream_keep_alive(stream_id, true)?;
1049         webtransport_session.borrow_mut().add_stream(stream_id);
1050         if stream_id.stream_type() == StreamType::UniDi {
1051             if local {
1052                 self.send_streams.insert(
1053                     stream_id,
1054                     Box::new(WebTransportSendStream::new(
1055                         stream_id,
1056                         session_id,
1057                         send_events,
1058                         webtransport_session,
1059                         true,
1060                     )),
1061                 );
1062             } else {
1063                 self.recv_streams.insert(
1064                     stream_id,
1065                     Box::new(WebTransportRecvStream::new(
1066                         stream_id,
1067                         session_id,
1068                         recv_events,
1069                         webtransport_session,
1070                     )),
1071                 );
1072             }
1073         } else {
1074             self.add_streams(
1075                 stream_id,
1076                 Box::new(WebTransportSendStream::new(
1077                     stream_id,
1078                     session_id,
1079                     send_events,
1080                     webtransport_session.clone(),
1081                     local,
1082                 )),
1083                 Box::new(WebTransportRecvStream::new(
1084                     stream_id,
1085                     session_id,
1086                     recv_events,
1087                     webtransport_session,
1088                 )),
1089             );
1090         }
1091     }
1092 
    // If the control stream has received a MaxPushId or Goaway frame, whose handling is specific
    // to the client or the server, we must pass it on to the specific client/server handler.
handle_control_frame(&mut self, f: HFrame) -> Res<Option<HFrame>>1095     fn handle_control_frame(&mut self, f: HFrame) -> Res<Option<HFrame>> {
1096         qinfo!([self], "Handle a control frame {:?}", f);
1097         if !matches!(f, HFrame::Settings { .. })
1098             && !matches!(
1099                 self.settings_state,
1100                 Http3RemoteSettingsState::Received { .. }
1101             )
1102         {
1103             return Err(Error::HttpMissingSettings);
1104         }
1105         match f {
1106             HFrame::Settings { settings } => {
1107                 self.handle_settings(settings)?;
1108                 Ok(None)
1109             }
1110             HFrame::Goaway { .. }
1111             | HFrame::MaxPushId { .. }
1112             | HFrame::CancelPush { .. }
1113             | HFrame::PriorityUpdateRequest { .. }
1114             | HFrame::PriorityUpdatePush { .. } => Ok(Some(f)),
1115             _ => Err(Error::HttpFrameUnexpected),
1116         }
1117     }
1118 
set_qpack_settings(&mut self, settings: &HSettings) -> Res<()>1119     fn set_qpack_settings(&mut self, settings: &HSettings) -> Res<()> {
1120         let mut qpe = self.qpack_encoder.borrow_mut();
1121         qpe.set_max_capacity(settings.get(HSettingType::MaxTableCapacity))?;
1122         qpe.set_max_blocked_streams(settings.get(HSettingType::BlockedStreams))?;
1123         Ok(())
1124     }
1125 
    /// Processes a SETTINGS frame according to the current remote settings state.
    ///
    /// * `NotReceived`: the QPACK settings are applied, the WebTransport feature is informed and
    ///   the settings are stored.
    /// * `ZeroRtt`: the new values are compared against the settings held in the `ZeroRtt`
    ///   state before being stored.
    /// * `Received`: a second SETTINGS frame is an error.
    /// # Errors
    /// Returns `HttpSettings` if a checked setting is smaller than its `ZeroRtt` value,
    /// `QpackError(DecoderStream)` if `MaxTableCapacity` changed from a non-zero `ZeroRtt`
    /// value, `HttpFrameUnexpected` if settings were already received, and any error from
    /// applying the QPACK settings.
    fn handle_settings(&mut self, new_settings: HSettings) -> Res<()> {
        qinfo!([self], "Handle SETTINGS frame.");
        match &self.settings_state {
            Http3RemoteSettingsState::NotReceived => {
                self.set_qpack_settings(&new_settings)?;
                self.webtransport.handle_settings(&new_settings);
                self.settings_state = Http3RemoteSettingsState::Received(new_settings);
                Ok(())
            }
            Http3RemoteSettingsState::ZeroRtt(settings) => {
                // Note: the WebTransport feature is informed before the values are validated.
                self.webtransport.handle_settings(&new_settings);
                let mut qpack_changed = false;
                for st in &[
                    HSettingType::MaxHeaderListSize,
                    HSettingType::MaxTableCapacity,
                    HSettingType::BlockedStreams,
                ] {
                    let zero_rtt_value = settings.get(*st);
                    let new_value = new_settings.get(*st);
                    if zero_rtt_value == new_value {
                        continue;
                    }
                    // A value may only grow compared to the one stored in the ZeroRtt state.
                    if zero_rtt_value > new_value {
                        qerror!(
                            [self],
                            "The new({}) and the old value({}) of setting {:?} do not match",
                            new_value,
                            zero_rtt_value,
                            st
                        );
                        return Err(Error::HttpSettings);
                    }

                    match st {
                        HSettingType::MaxTableCapacity => {
                            // The table capacity may only be raised if it was zero before.
                            if zero_rtt_value != 0 {
                                return Err(Error::QpackError(neqo_qpack::Error::DecoderStream));
                            }
                            qpack_changed = true;
                        }
                        HSettingType::BlockedStreams => qpack_changed = true,
                        HSettingType::MaxHeaderListSize | HSettingType::EnableWebTransport => (),
                    }
                }
                // Only touch the QPACK encoder when one of its settings actually changed.
                if qpack_changed {
                    qdebug!([self], "Settings after zero rtt differ.");
                    self.set_qpack_settings(&(new_settings))?;
                }
                self.settings_state = Http3RemoteSettingsState::Received(new_settings);
                Ok(())
            }
            Http3RemoteSettingsState::Received { .. } => Err(Error::HttpFrameUnexpected),
        }
    }
1180 
1181     /// Return the current state on `Http3Connection`.
state(&self) -> Http3State1182     pub fn state(&self) -> Http3State {
1183         self.state.clone()
1184     }
1185 
1186     /// Adds a new send and receive stream.
add_streams( &mut self, stream_id: StreamId, send_stream: Box<dyn SendStream>, recv_stream: Box<dyn RecvStream>, )1187     pub fn add_streams(
1188         &mut self,
1189         stream_id: StreamId,
1190         send_stream: Box<dyn SendStream>,
1191         recv_stream: Box<dyn RecvStream>,
1192     ) {
1193         if send_stream.has_data_to_send() {
1194             self.streams_with_pending_data.insert(stream_id);
1195         }
1196         self.send_streams.insert(stream_id, send_stream);
1197         self.recv_streams.insert(stream_id, recv_stream);
1198     }
1199 
1200     /// Add a new recv stream. This is used for push streams.
add_recv_stream(&mut self, stream_id: StreamId, recv_stream: Box<dyn RecvStream>)1201     pub fn add_recv_stream(&mut self, stream_id: StreamId, recv_stream: Box<dyn RecvStream>) {
1202         self.recv_streams.insert(stream_id, recv_stream);
1203     }
1204 
queue_control_frame(&mut self, frame: &HFrame)1205     pub fn queue_control_frame(&mut self, frame: &HFrame) {
1206         self.control_stream_local.queue_frame(frame);
1207     }
1208 
queue_update_priority(&mut self, stream_id: StreamId, priority: Priority) -> Res<bool>1209     pub fn queue_update_priority(&mut self, stream_id: StreamId, priority: Priority) -> Res<bool> {
1210         let stream = self
1211             .recv_streams
1212             .get_mut(&stream_id)
1213             .ok_or(Error::InvalidStreamId)?
1214             .http_stream()
1215             .ok_or(Error::InvalidStreamId)?;
1216 
1217         if stream.maybe_update_priority(priority) {
1218             self.control_stream_local.queue_update_priority(stream_id);
1219             Ok(true)
1220         } else {
1221             Ok(false)
1222         }
1223     }
1224 
recv_stream_is_critical(&self, stream_id: StreamId) -> bool1225     fn recv_stream_is_critical(&self, stream_id: StreamId) -> bool {
1226         if let Some(r) = self.recv_streams.get(&stream_id) {
1227             matches!(
1228                 r.stream_type(),
1229                 Http3StreamType::Control | Http3StreamType::Encoder | Http3StreamType::Decoder
1230             )
1231         } else {
1232             false
1233         }
1234     }
1235 
send_stream_is_critical(&self, stream_id: StreamId) -> bool1236     fn send_stream_is_critical(&self, stream_id: StreamId) -> bool {
1237         self.qpack_encoder
1238             .borrow()
1239             .local_stream_id()
1240             .iter()
1241             .chain(self.qpack_decoder.borrow().local_stream_id().iter())
1242             .chain(self.control_stream_local.stream_id().iter())
1243             .any(|id| stream_id == *id)
1244     }
1245 
close_send(&mut self, stream_id: StreamId, close_type: CloseType, conn: &mut Connection)1246     fn close_send(&mut self, stream_id: StreamId, close_type: CloseType, conn: &mut Connection) {
1247         if let Some(mut s) = self.remove_send_stream(stream_id, conn) {
1248             s.handle_stop_sending(close_type);
1249         }
1250     }
1251 
close_recv( &mut self, stream_id: StreamId, close_type: CloseType, conn: &mut Connection, ) -> Res<()>1252     fn close_recv(
1253         &mut self,
1254         stream_id: StreamId,
1255         close_type: CloseType,
1256         conn: &mut Connection,
1257     ) -> Res<()> {
1258         if let Some(mut s) = self.remove_recv_stream(stream_id, conn) {
1259             s.reset(close_type)?;
1260         }
1261         Ok(())
1262     }
1263 
remove_extended_connect( &mut self, wt: &Rc<RefCell<WebTransportSession>>, conn: &mut Connection, )1264     fn remove_extended_connect(
1265         &mut self,
1266         wt: &Rc<RefCell<WebTransportSession>>,
1267         conn: &mut Connection,
1268     ) {
1269         let out = wt.borrow_mut().take_sub_streams();
1270         if out.is_none() {
1271             return;
1272         }
1273         let (recv, send) = out.unwrap();
1274 
1275         for id in recv {
1276             qtrace!("Remove the extended connect sub receiver stream {}", id);
1277             // Use CloseType::ResetRemote so that an event will be sent. CloseType::LocalError would have
1278             // the same effect.
1279             if let Some(mut s) = self.recv_streams.remove(&id) {
1280                 mem::drop(s.reset(CloseType::ResetRemote(Error::HttpRequestCancelled.code())));
1281             }
1282             mem::drop(conn.stream_stop_sending(id, Error::HttpRequestCancelled.code()));
1283         }
1284         for id in send {
1285             qtrace!("Remove the extended connect sub send stream {}", id);
1286             if let Some(mut s) = self.send_streams.remove(&id) {
1287                 s.handle_stop_sending(CloseType::ResetRemote(Error::HttpRequestCancelled.code()));
1288             }
1289             mem::drop(conn.stream_reset_send(id, Error::HttpRequestCancelled.code()));
1290         }
1291     }
1292 
remove_recv_stream( &mut self, stream_id: StreamId, conn: &mut Connection, ) -> Option<Box<dyn RecvStream>>1293     fn remove_recv_stream(
1294         &mut self,
1295         stream_id: StreamId,
1296         conn: &mut Connection,
1297     ) -> Option<Box<dyn RecvStream>> {
1298         let stream = self.recv_streams.remove(&stream_id);
1299         if let Some(ref s) = stream {
1300             if s.stream_type() == Http3StreamType::ExtendedConnect {
1301                 self.send_streams.remove(&stream_id).unwrap();
1302                 if let Some(wt) = s.webtransport() {
1303                     self.remove_extended_connect(&wt, conn);
1304                 }
1305             }
1306         }
1307         stream
1308     }
1309 
remove_send_stream( &mut self, stream_id: StreamId, conn: &mut Connection, ) -> Option<Box<dyn SendStream>>1310     fn remove_send_stream(
1311         &mut self,
1312         stream_id: StreamId,
1313         conn: &mut Connection,
1314     ) -> Option<Box<dyn SendStream>> {
1315         let stream = self.send_streams.remove(&stream_id);
1316         if let Some(ref s) = stream {
1317             if s.stream_type() == Http3StreamType::ExtendedConnect {
1318                 if let Some(wt) = self.recv_streams.remove(&stream_id).unwrap().webtransport() {
1319                     self.remove_extended_connect(&wt, conn);
1320                 }
1321             }
1322         }
1323         stream
1324     }
1325 
webtransport_enabled(&self) -> bool1326     pub fn webtransport_enabled(&self) -> bool {
1327         self.webtransport.enabled()
1328     }
1329 }
1330