1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 // Stream management for a connection. 8 9 use crate::fc::{LocalStreamLimits, ReceiverFlowControl, RemoteStreamLimits, SenderFlowControl}; 10 use crate::frame::Frame; 11 use crate::packet::PacketBuilder; 12 use crate::recovery::{RecoveryToken, StreamRecoveryToken}; 13 use crate::recv_stream::{RecvStream, RecvStreams}; 14 use crate::send_stream::{SendStream, SendStreams, TransmissionPriority}; 15 use crate::stats::FrameStats; 16 use crate::stream_id::{StreamId, StreamType}; 17 use crate::tparams::{self, TransportParametersHandler}; 18 use crate::ConnectionEvents; 19 use crate::{Error, Res}; 20 use neqo_common::{qtrace, qwarn, Role}; 21 use std::cell::RefCell; 22 use std::rc::Rc; 23 24 pub struct Streams { 25 role: Role, 26 tps: Rc<RefCell<TransportParametersHandler>>, 27 events: ConnectionEvents, 28 sender_fc: Rc<RefCell<SenderFlowControl<()>>>, 29 receiver_fc: Rc<RefCell<ReceiverFlowControl<()>>>, 30 remote_stream_limits: RemoteStreamLimits, 31 local_stream_limits: LocalStreamLimits, 32 pub(crate) send: SendStreams, 33 pub(crate) recv: RecvStreams, 34 } 35 36 impl Streams { new( tps: Rc<RefCell<TransportParametersHandler>>, role: Role, events: ConnectionEvents, ) -> Self37 pub fn new( 38 tps: Rc<RefCell<TransportParametersHandler>>, 39 role: Role, 40 events: ConnectionEvents, 41 ) -> Self { 42 let limit_bidi = tps 43 .borrow() 44 .local 45 .get_integer(tparams::INITIAL_MAX_STREAMS_BIDI); 46 let limit_uni = tps 47 .borrow() 48 .local 49 .get_integer(tparams::INITIAL_MAX_STREAMS_UNI); 50 let max_data = tps.borrow().local.get_integer(tparams::INITIAL_MAX_DATA); 51 Self { 52 role, 53 tps, 54 events, 55 sender_fc: Rc::new(RefCell::new(SenderFlowControl::new((), 0))), 56 receiver_fc: Rc::new(RefCell::new(ReceiverFlowControl::new((), max_data))), 57 remote_stream_limits: RemoteStreamLimits::new(limit_bidi, limit_uni, role), 58 local_stream_limits: LocalStreamLimits::new(role), 59 send: SendStreams::default(), 60 recv: RecvStreams::default(), 61 } 62 } 63 is_stream_id_allowed(&self, stream_id: StreamId) -> bool64 pub fn is_stream_id_allowed(&self, stream_id: StreamId) -> bool { 65 self.remote_stream_limits[stream_id.stream_type()].is_allowed(stream_id) 66 } 67 zero_rtt_rejected(&mut self)68 pub fn zero_rtt_rejected(&mut self) { 69 self.send.clear(); 70 self.recv.clear(); 71 debug_assert_eq!( 72 self.remote_stream_limits[StreamType::BiDi].max_active(), 73 self.tps 74 .borrow() 75 .local 76 .get_integer(tparams::INITIAL_MAX_STREAMS_BIDI) 77 ); 78 debug_assert_eq!( 79 self.remote_stream_limits[StreamType::UniDi].max_active(), 80 self.tps 81 .borrow() 82 .local 83 .get_integer(tparams::INITIAL_MAX_STREAMS_UNI) 84 ); 85 self.local_stream_limits = LocalStreamLimits::new(self.role); 86 } 87 input_frame(&mut self, frame: Frame, stats: &mut FrameStats) -> Res<()>88 pub fn input_frame(&mut self, frame: Frame, stats: &mut FrameStats) -> Res<()> { 89 match frame { 90 Frame::ResetStream { 91 stream_id, 92 application_error_code, 93 final_size, 94 } => { 95 stats.reset_stream += 1; 96 if let (_, Some(rs)) = self.obtain_stream(stream_id)? { 97 rs.reset(application_error_code, final_size)?; 98 } 99 } 100 Frame::StopSending { 101 stream_id, 102 application_error_code, 103 } => { 104 stats.stop_sending += 1; 105 self.events 106 .send_stream_stop_sending(stream_id, application_error_code); 107 if let (Some(ss), _) = self.obtain_stream(stream_id)? { 108 ss.reset(application_error_code); 109 } 110 } 111 Frame::Stream { 112 fin, 113 stream_id, 114 offset, 115 data, 116 .. 117 } => { 118 stats.stream += 1; 119 if let (_, Some(rs)) = self.obtain_stream(stream_id)? { 120 rs.inbound_stream_frame(fin, offset, data)?; 121 } 122 } 123 Frame::MaxData { maximum_data } => { 124 stats.max_data += 1; 125 self.handle_max_data(maximum_data); 126 } 127 Frame::MaxStreamData { 128 stream_id, 129 maximum_stream_data, 130 } => { 131 qtrace!( 132 "Stream {} Received MaxStreamData {}", 133 stream_id, 134 maximum_stream_data 135 ); 136 stats.max_stream_data += 1; 137 if let (Some(ss), _) = self.obtain_stream(stream_id)? { 138 ss.set_max_stream_data(maximum_stream_data); 139 } 140 } 141 Frame::MaxStreams { 142 stream_type, 143 maximum_streams, 144 } => { 145 stats.max_streams += 1; 146 self.handle_max_streams(stream_type, maximum_streams); 147 } 148 Frame::DataBlocked { data_limit } => { 149 // Should never happen since we set data limit to max 150 qwarn!("Received DataBlocked with data limit {}", data_limit); 151 stats.data_blocked += 1; 152 self.handle_data_blocked(); 153 } 154 Frame::StreamDataBlocked { stream_id, .. } => { 155 qtrace!("Received StreamDataBlocked"); 156 stats.stream_data_blocked += 1; 157 // Terminate connection with STREAM_STATE_ERROR if send-only 158 // stream (-transport 19.13) 159 if stream_id.is_send_only(self.role) { 160 return Err(Error::StreamStateError); 161 } 162 163 if let (_, Some(rs)) = self.obtain_stream(stream_id)? { 164 rs.send_flowc_update(); 165 } 166 } 167 Frame::StreamsBlocked { .. } => { 168 stats.streams_blocked += 1; 169 // We send an update evry time we retire a stream. There is no need to 170 // trigger flow updates here. 171 } 172 _ => unreachable!("This is not a stream Frame"), 173 } 174 Ok(()) 175 } 176 write_maintenance_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )177 fn write_maintenance_frames( 178 &mut self, 179 builder: &mut PacketBuilder, 180 tokens: &mut Vec<RecoveryToken>, 181 stats: &mut FrameStats, 182 ) { 183 // Send `DATA_BLOCKED` as necessary. 184 self.sender_fc 185 .borrow_mut() 186 .write_frames(builder, tokens, stats); 187 if builder.is_full() { 188 return; 189 } 190 191 // Send `MAX_DATA` as necessary. 192 self.receiver_fc 193 .borrow_mut() 194 .write_frames(builder, tokens, stats); 195 if builder.is_full() { 196 return; 197 } 198 199 self.recv.write_frames(builder, tokens, stats); 200 201 self.remote_stream_limits[StreamType::BiDi].write_frames(builder, tokens, stats); 202 if builder.is_full() { 203 return; 204 } 205 self.remote_stream_limits[StreamType::UniDi].write_frames(builder, tokens, stats); 206 if builder.is_full() { 207 return; 208 } 209 210 self.local_stream_limits[StreamType::BiDi].write_frames(builder, tokens, stats); 211 if builder.is_full() { 212 return; 213 } 214 215 self.local_stream_limits[StreamType::UniDi].write_frames(builder, tokens, stats); 216 } 217 write_frames( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )218 pub fn write_frames( 219 &mut self, 220 priority: TransmissionPriority, 221 builder: &mut PacketBuilder, 222 tokens: &mut Vec<RecoveryToken>, 223 stats: &mut FrameStats, 224 ) { 225 if priority == TransmissionPriority::Important { 226 self.write_maintenance_frames(builder, tokens, stats); 227 if builder.is_full() { 228 return; 229 } 230 } 231 232 self.send.write_frames(priority, builder, tokens, stats); 233 } 234 lost(&mut self, token: &StreamRecoveryToken)235 pub fn lost(&mut self, token: &StreamRecoveryToken) { 236 match token { 237 StreamRecoveryToken::Stream(st) => self.send.lost(st), 238 StreamRecoveryToken::ResetStream { stream_id } => self.send.reset_lost(*stream_id), 239 StreamRecoveryToken::StreamDataBlocked { stream_id, limit } => { 240 self.send.blocked_lost(*stream_id, *limit) 241 } 242 StreamRecoveryToken::MaxStreamData { 243 stream_id, 244 max_data, 245 } => { 246 if let Ok((_, Some(rs))) = self.obtain_stream(*stream_id) { 247 rs.max_stream_data_lost(*max_data); 248 } 249 } 250 StreamRecoveryToken::StopSending { stream_id } => { 251 if let Ok((_, Some(rs))) = self.obtain_stream(*stream_id) { 252 rs.stop_sending_lost(); 253 } 254 } 255 StreamRecoveryToken::StreamsBlocked { stream_type, limit } => { 256 self.local_stream_limits[*stream_type].frame_lost(*limit); 257 } 258 StreamRecoveryToken::MaxStreams { 259 stream_type, 260 max_streams, 261 } => { 262 self.remote_stream_limits[*stream_type].frame_lost(*max_streams); 263 } 264 StreamRecoveryToken::DataBlocked(limit) => { 265 self.sender_fc.borrow_mut().frame_lost(*limit) 266 } 267 StreamRecoveryToken::MaxData(maximum_data) => { 268 self.receiver_fc.borrow_mut().frame_lost(*maximum_data) 269 } 270 } 271 } 272 acked(&mut self, token: &StreamRecoveryToken)273 pub fn acked(&mut self, token: &StreamRecoveryToken) { 274 match token { 275 StreamRecoveryToken::Stream(st) => self.send.acked(st), 276 StreamRecoveryToken::ResetStream { stream_id } => self.send.reset_acked(*stream_id), 277 StreamRecoveryToken::StopSending { stream_id } => { 278 if let Ok((_, Some(rs))) = self.obtain_stream(*stream_id) { 279 rs.stop_sending_acked(); 280 } 281 } 282 // We only worry when these are lost 283 StreamRecoveryToken::DataBlocked(_) 284 | StreamRecoveryToken::StreamDataBlocked { .. } 285 | StreamRecoveryToken::MaxStreamData { .. } 286 | StreamRecoveryToken::StreamsBlocked { .. } 287 | StreamRecoveryToken::MaxStreams { .. } 288 | StreamRecoveryToken::MaxData(_) => (), 289 } 290 } 291 clear_streams(&mut self)292 pub fn clear_streams(&mut self) { 293 self.send.clear(); 294 self.recv.clear(); 295 } 296 cleanup_closed_streams(&mut self)297 pub fn cleanup_closed_streams(&mut self) { 298 self.send.clear_terminal(); 299 let send = &self.send; 300 let (removed_bidi, removed_uni) = self.recv.clear_terminal(send, self.role); 301 302 // Send max_streams updates if we removed remote-initiated recv streams. 303 // The updates will be send if any steams has been removed. 304 self.remote_stream_limits[StreamType::BiDi].add_retired(removed_bidi); 305 self.remote_stream_limits[StreamType::UniDi].add_retired(removed_uni); 306 } 307 ensure_created_if_remote(&mut self, stream_id: StreamId) -> Res<()>308 fn ensure_created_if_remote(&mut self, stream_id: StreamId) -> Res<()> { 309 if !stream_id.is_remote_initiated(self.role) 310 || !self.remote_stream_limits[stream_id.stream_type()].is_new_stream(stream_id)? 311 { 312 // If it is not a remote stream and stream already exist. 313 return Ok(()); 314 } 315 316 let tp = match stream_id.stream_type() { 317 // From the local perspective, this is a remote- originated BiDi stream. From 318 // the remote perspective, this is a local-originated BiDi stream. Therefore, 319 // look at the local transport parameters for the 320 // INITIAL_MAX_STREAM_DATA_BIDI_REMOTE value to decide how much this endpoint 321 // will allow its peer to send. 322 StreamType::BiDi => tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE, 323 StreamType::UniDi => tparams::INITIAL_MAX_STREAM_DATA_UNI, 324 }; 325 let recv_initial_max_stream_data = self.tps.borrow().local.get_integer(tp); 326 327 while self.remote_stream_limits[stream_id.stream_type()].is_new_stream(stream_id)? { 328 let next_stream_id = 329 self.remote_stream_limits[stream_id.stream_type()].take_stream_id(); 330 self.events.new_stream(next_stream_id); 331 332 self.recv.insert( 333 next_stream_id, 334 RecvStream::new( 335 next_stream_id, 336 recv_initial_max_stream_data, 337 Rc::clone(&self.receiver_fc), 338 self.events.clone(), 339 ), 340 ); 341 342 if next_stream_id.is_bidi() { 343 // From the local perspective, this is a remote- originated BiDi stream. 344 // From the remote perspective, this is a local-originated BiDi stream. 345 // Therefore, look at the remote's transport parameters for the 346 // INITIAL_MAX_STREAM_DATA_BIDI_LOCAL value to decide how much this endpoint 347 // is allowed to send its peer. 348 let send_initial_max_stream_data = self 349 .tps 350 .borrow() 351 .remote() 352 .get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL); 353 self.send.insert( 354 next_stream_id, 355 SendStream::new( 356 next_stream_id, 357 send_initial_max_stream_data, 358 Rc::clone(&self.sender_fc), 359 self.events.clone(), 360 ), 361 ); 362 } 363 } 364 Ok(()) 365 } 366 367 /// Get or make a stream, and implicitly open additional streams as 368 /// indicated by its stream id. obtain_stream( &mut self, stream_id: StreamId, ) -> Res<(Option<&mut SendStream>, Option<&mut RecvStream>)>369 pub fn obtain_stream( 370 &mut self, 371 stream_id: StreamId, 372 ) -> Res<(Option<&mut SendStream>, Option<&mut RecvStream>)> { 373 self.ensure_created_if_remote(stream_id)?; 374 Ok(( 375 self.send.get_mut(stream_id).ok(), 376 self.recv.get_mut(stream_id).ok(), 377 )) 378 } 379 stream_create(&mut self, st: StreamType) -> Res<StreamId>380 pub fn stream_create(&mut self, st: StreamType) -> Res<StreamId> { 381 match self.local_stream_limits.take_stream_id(st) { 382 None => Err(Error::StreamLimitError), 383 Some(new_id) => { 384 let send_limit_tp = match st { 385 StreamType::UniDi => tparams::INITIAL_MAX_STREAM_DATA_UNI, 386 StreamType::BiDi => tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE, 387 }; 388 let send_limit = self.tps.borrow().remote().get_integer(send_limit_tp); 389 self.send.insert( 390 new_id, 391 SendStream::new( 392 new_id, 393 send_limit, 394 Rc::clone(&self.sender_fc), 395 self.events.clone(), 396 ), 397 ); 398 if st == StreamType::BiDi { 399 // From the local perspective, this is a local- originated BiDi stream. From the 400 // remote perspective, this is a remote-originated BiDi stream. Therefore, look at 401 // the local transport parameters for the INITIAL_MAX_STREAM_DATA_BIDI_LOCAL value 402 // to decide how much this endpoint will allow its peer to send. 403 let recv_initial_max_stream_data = self 404 .tps 405 .borrow() 406 .local 407 .get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL); 408 409 self.recv.insert( 410 new_id, 411 RecvStream::new( 412 new_id, 413 recv_initial_max_stream_data, 414 Rc::clone(&self.receiver_fc), 415 self.events.clone(), 416 ), 417 ); 418 } 419 Ok(new_id) 420 } 421 } 422 } 423 handle_max_data(&mut self, maximum_data: u64)424 pub fn handle_max_data(&mut self, maximum_data: u64) { 425 let conn_was_blocked = self.sender_fc.borrow().available() == 0; 426 let conn_credit_increased = self.sender_fc.borrow_mut().update(maximum_data); 427 428 if conn_was_blocked && conn_credit_increased { 429 for (id, ss) in &mut self.send { 430 if ss.avail() > 0 { 431 // These may not actually all be writable if one 432 // uses up all the conn credit. Not our fault. 433 self.events.send_stream_writable(*id); 434 } 435 } 436 } 437 } 438 handle_data_blocked(&mut self)439 pub fn handle_data_blocked(&mut self) { 440 self.receiver_fc.borrow_mut().send_flowc_update(); 441 } 442 set_initial_limits(&mut self)443 pub fn set_initial_limits(&mut self) { 444 let _ = self.local_stream_limits[StreamType::BiDi].update( 445 self.tps 446 .borrow() 447 .remote() 448 .get_integer(tparams::INITIAL_MAX_STREAMS_BIDI), 449 ); 450 let _ = self.local_stream_limits[StreamType::UniDi].update( 451 self.tps 452 .borrow() 453 .remote() 454 .get_integer(tparams::INITIAL_MAX_STREAMS_UNI), 455 ); 456 457 // As a client, there are two sets of initial limits for sending stream data. 458 // If the second limit is higher and streams have been created, then 459 // ensure that streams are not blocked on the lower limit. 460 if self.role == Role::Client { 461 self.send.update_initial_limit(self.tps.borrow().remote()); 462 } 463 464 self.sender_fc.borrow_mut().update( 465 self.tps 466 .borrow() 467 .remote() 468 .get_integer(tparams::INITIAL_MAX_DATA), 469 ); 470 471 if self.local_stream_limits[StreamType::BiDi].available() > 0 { 472 self.events.send_stream_creatable(StreamType::BiDi); 473 } 474 if self.local_stream_limits[StreamType::UniDi].available() > 0 { 475 self.events.send_stream_creatable(StreamType::UniDi); 476 } 477 } 478 handle_max_streams(&mut self, stream_type: StreamType, maximum_streams: u64)479 pub fn handle_max_streams(&mut self, stream_type: StreamType, maximum_streams: u64) { 480 if self.local_stream_limits[stream_type].update(maximum_streams) { 481 self.events.send_stream_creatable(stream_type); 482 } 483 } 484 get_send_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut SendStream>485 pub fn get_send_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut SendStream> { 486 self.send.get_mut(stream_id) 487 } 488 get_send_stream(&self, stream_id: StreamId) -> Res<&SendStream>489 pub fn get_send_stream(&self, stream_id: StreamId) -> Res<&SendStream> { 490 self.send.get(stream_id) 491 } 492 get_recv_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut RecvStream>493 pub fn get_recv_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut RecvStream> { 494 self.recv.get_mut(stream_id) 495 } 496 keep_alive(&mut self, stream_id: StreamId, keep: bool) -> Res<()>497 pub fn keep_alive(&mut self, stream_id: StreamId, keep: bool) -> Res<()> { 498 self.recv.keep_alive(stream_id, keep) 499 } 500 need_keep_alive(&mut self) -> bool501 pub fn need_keep_alive(&mut self) -> bool { 502 self.recv.need_keep_alive() 503 } 504 } 505