1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 // Stream management for a connection.
8 
9 use crate::fc::{LocalStreamLimits, ReceiverFlowControl, RemoteStreamLimits, SenderFlowControl};
10 use crate::frame::Frame;
11 use crate::packet::PacketBuilder;
12 use crate::recovery::{RecoveryToken, StreamRecoveryToken};
13 use crate::recv_stream::{RecvStream, RecvStreams};
14 use crate::send_stream::{SendStream, SendStreams, TransmissionPriority};
15 use crate::stats::FrameStats;
16 use crate::stream_id::{StreamId, StreamType};
17 use crate::tparams::{self, TransportParametersHandler};
18 use crate::ConnectionEvents;
19 use crate::{Error, Res};
20 use neqo_common::{qtrace, qwarn, Role};
21 use std::cell::RefCell;
22 use std::rc::Rc;
23 
24 pub struct Streams {
25     role: Role,
26     tps: Rc<RefCell<TransportParametersHandler>>,
27     events: ConnectionEvents,
28     sender_fc: Rc<RefCell<SenderFlowControl<()>>>,
29     receiver_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
30     remote_stream_limits: RemoteStreamLimits,
31     local_stream_limits: LocalStreamLimits,
32     pub(crate) send: SendStreams,
33     pub(crate) recv: RecvStreams,
34 }
35 
36 impl Streams {
new( tps: Rc<RefCell<TransportParametersHandler>>, role: Role, events: ConnectionEvents, ) -> Self37     pub fn new(
38         tps: Rc<RefCell<TransportParametersHandler>>,
39         role: Role,
40         events: ConnectionEvents,
41     ) -> Self {
42         let limit_bidi = tps
43             .borrow()
44             .local
45             .get_integer(tparams::INITIAL_MAX_STREAMS_BIDI);
46         let limit_uni = tps
47             .borrow()
48             .local
49             .get_integer(tparams::INITIAL_MAX_STREAMS_UNI);
50         let max_data = tps.borrow().local.get_integer(tparams::INITIAL_MAX_DATA);
51         Self {
52             role,
53             tps,
54             events,
55             sender_fc: Rc::new(RefCell::new(SenderFlowControl::new((), 0))),
56             receiver_fc: Rc::new(RefCell::new(ReceiverFlowControl::new((), max_data))),
57             remote_stream_limits: RemoteStreamLimits::new(limit_bidi, limit_uni, role),
58             local_stream_limits: LocalStreamLimits::new(role),
59             send: SendStreams::default(),
60             recv: RecvStreams::default(),
61         }
62     }
63 
is_stream_id_allowed(&self, stream_id: StreamId) -> bool64     pub fn is_stream_id_allowed(&self, stream_id: StreamId) -> bool {
65         self.remote_stream_limits[stream_id.stream_type()].is_allowed(stream_id)
66     }
67 
zero_rtt_rejected(&mut self)68     pub fn zero_rtt_rejected(&mut self) {
69         self.send.clear();
70         self.recv.clear();
71         debug_assert_eq!(
72             self.remote_stream_limits[StreamType::BiDi].max_active(),
73             self.tps
74                 .borrow()
75                 .local
76                 .get_integer(tparams::INITIAL_MAX_STREAMS_BIDI)
77         );
78         debug_assert_eq!(
79             self.remote_stream_limits[StreamType::UniDi].max_active(),
80             self.tps
81                 .borrow()
82                 .local
83                 .get_integer(tparams::INITIAL_MAX_STREAMS_UNI)
84         );
85         self.local_stream_limits = LocalStreamLimits::new(self.role);
86     }
87 
input_frame(&mut self, frame: Frame, stats: &mut FrameStats) -> Res<()>88     pub fn input_frame(&mut self, frame: Frame, stats: &mut FrameStats) -> Res<()> {
89         match frame {
90             Frame::ResetStream {
91                 stream_id,
92                 application_error_code,
93                 final_size,
94             } => {
95                 stats.reset_stream += 1;
96                 if let (_, Some(rs)) = self.obtain_stream(stream_id)? {
97                     rs.reset(application_error_code, final_size)?;
98                 }
99             }
100             Frame::StopSending {
101                 stream_id,
102                 application_error_code,
103             } => {
104                 stats.stop_sending += 1;
105                 self.events
106                     .send_stream_stop_sending(stream_id, application_error_code);
107                 if let (Some(ss), _) = self.obtain_stream(stream_id)? {
108                     ss.reset(application_error_code);
109                 }
110             }
111             Frame::Stream {
112                 fin,
113                 stream_id,
114                 offset,
115                 data,
116                 ..
117             } => {
118                 stats.stream += 1;
119                 if let (_, Some(rs)) = self.obtain_stream(stream_id)? {
120                     rs.inbound_stream_frame(fin, offset, data)?;
121                 }
122             }
123             Frame::MaxData { maximum_data } => {
124                 stats.max_data += 1;
125                 self.handle_max_data(maximum_data);
126             }
127             Frame::MaxStreamData {
128                 stream_id,
129                 maximum_stream_data,
130             } => {
131                 qtrace!(
132                     "Stream {} Received MaxStreamData {}",
133                     stream_id,
134                     maximum_stream_data
135                 );
136                 stats.max_stream_data += 1;
137                 if let (Some(ss), _) = self.obtain_stream(stream_id)? {
138                     ss.set_max_stream_data(maximum_stream_data);
139                 }
140             }
141             Frame::MaxStreams {
142                 stream_type,
143                 maximum_streams,
144             } => {
145                 stats.max_streams += 1;
146                 self.handle_max_streams(stream_type, maximum_streams);
147             }
148             Frame::DataBlocked { data_limit } => {
149                 // Should never happen since we set data limit to max
150                 qwarn!("Received DataBlocked with data limit {}", data_limit);
151                 stats.data_blocked += 1;
152                 self.handle_data_blocked();
153             }
154             Frame::StreamDataBlocked { stream_id, .. } => {
155                 qtrace!("Received StreamDataBlocked");
156                 stats.stream_data_blocked += 1;
157                 // Terminate connection with STREAM_STATE_ERROR if send-only
158                 // stream (-transport 19.13)
159                 if stream_id.is_send_only(self.role) {
160                     return Err(Error::StreamStateError);
161                 }
162 
163                 if let (_, Some(rs)) = self.obtain_stream(stream_id)? {
164                     rs.send_flowc_update();
165                 }
166             }
167             Frame::StreamsBlocked { .. } => {
168                 stats.streams_blocked += 1;
169                 // We send an update evry time we retire a stream. There is no need to
170                 // trigger flow updates here.
171             }
172             _ => unreachable!("This is not a stream Frame"),
173         }
174         Ok(())
175     }
176 
write_maintenance_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )177     fn write_maintenance_frames(
178         &mut self,
179         builder: &mut PacketBuilder,
180         tokens: &mut Vec<RecoveryToken>,
181         stats: &mut FrameStats,
182     ) {
183         // Send `DATA_BLOCKED` as necessary.
184         self.sender_fc
185             .borrow_mut()
186             .write_frames(builder, tokens, stats);
187         if builder.is_full() {
188             return;
189         }
190 
191         // Send `MAX_DATA` as necessary.
192         self.receiver_fc
193             .borrow_mut()
194             .write_frames(builder, tokens, stats);
195         if builder.is_full() {
196             return;
197         }
198 
199         self.recv.write_frames(builder, tokens, stats);
200 
201         self.remote_stream_limits[StreamType::BiDi].write_frames(builder, tokens, stats);
202         if builder.is_full() {
203             return;
204         }
205         self.remote_stream_limits[StreamType::UniDi].write_frames(builder, tokens, stats);
206         if builder.is_full() {
207             return;
208         }
209 
210         self.local_stream_limits[StreamType::BiDi].write_frames(builder, tokens, stats);
211         if builder.is_full() {
212             return;
213         }
214 
215         self.local_stream_limits[StreamType::UniDi].write_frames(builder, tokens, stats);
216     }
217 
write_frames( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )218     pub fn write_frames(
219         &mut self,
220         priority: TransmissionPriority,
221         builder: &mut PacketBuilder,
222         tokens: &mut Vec<RecoveryToken>,
223         stats: &mut FrameStats,
224     ) {
225         if priority == TransmissionPriority::Important {
226             self.write_maintenance_frames(builder, tokens, stats);
227             if builder.is_full() {
228                 return;
229             }
230         }
231 
232         self.send.write_frames(priority, builder, tokens, stats);
233     }
234 
lost(&mut self, token: &StreamRecoveryToken)235     pub fn lost(&mut self, token: &StreamRecoveryToken) {
236         match token {
237             StreamRecoveryToken::Stream(st) => self.send.lost(st),
238             StreamRecoveryToken::ResetStream { stream_id } => self.send.reset_lost(*stream_id),
239             StreamRecoveryToken::StreamDataBlocked { stream_id, limit } => {
240                 self.send.blocked_lost(*stream_id, *limit)
241             }
242             StreamRecoveryToken::MaxStreamData {
243                 stream_id,
244                 max_data,
245             } => {
246                 if let Ok((_, Some(rs))) = self.obtain_stream(*stream_id) {
247                     rs.max_stream_data_lost(*max_data);
248                 }
249             }
250             StreamRecoveryToken::StopSending { stream_id } => {
251                 if let Ok((_, Some(rs))) = self.obtain_stream(*stream_id) {
252                     rs.stop_sending_lost();
253                 }
254             }
255             StreamRecoveryToken::StreamsBlocked { stream_type, limit } => {
256                 self.local_stream_limits[*stream_type].frame_lost(*limit);
257             }
258             StreamRecoveryToken::MaxStreams {
259                 stream_type,
260                 max_streams,
261             } => {
262                 self.remote_stream_limits[*stream_type].frame_lost(*max_streams);
263             }
264             StreamRecoveryToken::DataBlocked(limit) => {
265                 self.sender_fc.borrow_mut().frame_lost(*limit)
266             }
267             StreamRecoveryToken::MaxData(maximum_data) => {
268                 self.receiver_fc.borrow_mut().frame_lost(*maximum_data)
269             }
270         }
271     }
272 
acked(&mut self, token: &StreamRecoveryToken)273     pub fn acked(&mut self, token: &StreamRecoveryToken) {
274         match token {
275             StreamRecoveryToken::Stream(st) => self.send.acked(st),
276             StreamRecoveryToken::ResetStream { stream_id } => self.send.reset_acked(*stream_id),
277             StreamRecoveryToken::StopSending { stream_id } => {
278                 if let Ok((_, Some(rs))) = self.obtain_stream(*stream_id) {
279                     rs.stop_sending_acked();
280                 }
281             }
282             // We only worry when these are lost
283             StreamRecoveryToken::DataBlocked(_)
284             | StreamRecoveryToken::StreamDataBlocked { .. }
285             | StreamRecoveryToken::MaxStreamData { .. }
286             | StreamRecoveryToken::StreamsBlocked { .. }
287             | StreamRecoveryToken::MaxStreams { .. }
288             | StreamRecoveryToken::MaxData(_) => (),
289         }
290     }
291 
clear_streams(&mut self)292     pub fn clear_streams(&mut self) {
293         self.send.clear();
294         self.recv.clear();
295     }
296 
cleanup_closed_streams(&mut self)297     pub fn cleanup_closed_streams(&mut self) {
298         self.send.clear_terminal();
299         let send = &self.send;
300         let (removed_bidi, removed_uni) = self.recv.clear_terminal(send, self.role);
301 
302         // Send max_streams updates if we removed remote-initiated recv streams.
303         // The updates will be send if any steams has been removed.
304         self.remote_stream_limits[StreamType::BiDi].add_retired(removed_bidi);
305         self.remote_stream_limits[StreamType::UniDi].add_retired(removed_uni);
306     }
307 
ensure_created_if_remote(&mut self, stream_id: StreamId) -> Res<()>308     fn ensure_created_if_remote(&mut self, stream_id: StreamId) -> Res<()> {
309         if !stream_id.is_remote_initiated(self.role)
310             || !self.remote_stream_limits[stream_id.stream_type()].is_new_stream(stream_id)?
311         {
312             // If it is not a remote stream and stream already exist.
313             return Ok(());
314         }
315 
316         let tp = match stream_id.stream_type() {
317             // From the local perspective, this is a remote- originated BiDi stream. From
318             // the remote perspective, this is a local-originated BiDi stream. Therefore,
319             // look at the local transport parameters for the
320             // INITIAL_MAX_STREAM_DATA_BIDI_REMOTE value to decide how much this endpoint
321             // will allow its peer to send.
322             StreamType::BiDi => tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE,
323             StreamType::UniDi => tparams::INITIAL_MAX_STREAM_DATA_UNI,
324         };
325         let recv_initial_max_stream_data = self.tps.borrow().local.get_integer(tp);
326 
327         while self.remote_stream_limits[stream_id.stream_type()].is_new_stream(stream_id)? {
328             let next_stream_id =
329                 self.remote_stream_limits[stream_id.stream_type()].take_stream_id();
330             self.events.new_stream(next_stream_id);
331 
332             self.recv.insert(
333                 next_stream_id,
334                 RecvStream::new(
335                     next_stream_id,
336                     recv_initial_max_stream_data,
337                     Rc::clone(&self.receiver_fc),
338                     self.events.clone(),
339                 ),
340             );
341 
342             if next_stream_id.is_bidi() {
343                 // From the local perspective, this is a remote- originated BiDi stream.
344                 // From the remote perspective, this is a local-originated BiDi stream.
345                 // Therefore, look at the remote's transport parameters for the
346                 // INITIAL_MAX_STREAM_DATA_BIDI_LOCAL value to decide how much this endpoint
347                 // is allowed to send its peer.
348                 let send_initial_max_stream_data = self
349                     .tps
350                     .borrow()
351                     .remote()
352                     .get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
353                 self.send.insert(
354                     next_stream_id,
355                     SendStream::new(
356                         next_stream_id,
357                         send_initial_max_stream_data,
358                         Rc::clone(&self.sender_fc),
359                         self.events.clone(),
360                     ),
361                 );
362             }
363         }
364         Ok(())
365     }
366 
367     /// Get or make a stream, and implicitly open additional streams as
368     /// indicated by its stream id.
obtain_stream( &mut self, stream_id: StreamId, ) -> Res<(Option<&mut SendStream>, Option<&mut RecvStream>)>369     pub fn obtain_stream(
370         &mut self,
371         stream_id: StreamId,
372     ) -> Res<(Option<&mut SendStream>, Option<&mut RecvStream>)> {
373         self.ensure_created_if_remote(stream_id)?;
374         Ok((
375             self.send.get_mut(stream_id).ok(),
376             self.recv.get_mut(stream_id).ok(),
377         ))
378     }
379 
stream_create(&mut self, st: StreamType) -> Res<StreamId>380     pub fn stream_create(&mut self, st: StreamType) -> Res<StreamId> {
381         match self.local_stream_limits.take_stream_id(st) {
382             None => Err(Error::StreamLimitError),
383             Some(new_id) => {
384                 let send_limit_tp = match st {
385                     StreamType::UniDi => tparams::INITIAL_MAX_STREAM_DATA_UNI,
386                     StreamType::BiDi => tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE,
387                 };
388                 let send_limit = self.tps.borrow().remote().get_integer(send_limit_tp);
389                 self.send.insert(
390                     new_id,
391                     SendStream::new(
392                         new_id,
393                         send_limit,
394                         Rc::clone(&self.sender_fc),
395                         self.events.clone(),
396                     ),
397                 );
398                 if st == StreamType::BiDi {
399                     // From the local perspective, this is a local- originated BiDi stream. From the
400                     // remote perspective, this is a remote-originated BiDi stream. Therefore, look at
401                     // the local transport parameters for the INITIAL_MAX_STREAM_DATA_BIDI_LOCAL value
402                     // to decide how much this endpoint will allow its peer to send.
403                     let recv_initial_max_stream_data = self
404                         .tps
405                         .borrow()
406                         .local
407                         .get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
408 
409                     self.recv.insert(
410                         new_id,
411                         RecvStream::new(
412                             new_id,
413                             recv_initial_max_stream_data,
414                             Rc::clone(&self.receiver_fc),
415                             self.events.clone(),
416                         ),
417                     );
418                 }
419                 Ok(new_id)
420             }
421         }
422     }
423 
handle_max_data(&mut self, maximum_data: u64)424     pub fn handle_max_data(&mut self, maximum_data: u64) {
425         let conn_was_blocked = self.sender_fc.borrow().available() == 0;
426         let conn_credit_increased = self.sender_fc.borrow_mut().update(maximum_data);
427 
428         if conn_was_blocked && conn_credit_increased {
429             for (id, ss) in &mut self.send {
430                 if ss.avail() > 0 {
431                     // These may not actually all be writable if one
432                     // uses up all the conn credit. Not our fault.
433                     self.events.send_stream_writable(*id);
434                 }
435             }
436         }
437     }
438 
handle_data_blocked(&mut self)439     pub fn handle_data_blocked(&mut self) {
440         self.receiver_fc.borrow_mut().send_flowc_update();
441     }
442 
set_initial_limits(&mut self)443     pub fn set_initial_limits(&mut self) {
444         let _ = self.local_stream_limits[StreamType::BiDi].update(
445             self.tps
446                 .borrow()
447                 .remote()
448                 .get_integer(tparams::INITIAL_MAX_STREAMS_BIDI),
449         );
450         let _ = self.local_stream_limits[StreamType::UniDi].update(
451             self.tps
452                 .borrow()
453                 .remote()
454                 .get_integer(tparams::INITIAL_MAX_STREAMS_UNI),
455         );
456 
457         // As a client, there are two sets of initial limits for sending stream data.
458         // If the second limit is higher and streams have been created, then
459         // ensure that streams are not blocked on the lower limit.
460         if self.role == Role::Client {
461             self.send.update_initial_limit(self.tps.borrow().remote());
462         }
463 
464         self.sender_fc.borrow_mut().update(
465             self.tps
466                 .borrow()
467                 .remote()
468                 .get_integer(tparams::INITIAL_MAX_DATA),
469         );
470 
471         if self.local_stream_limits[StreamType::BiDi].available() > 0 {
472             self.events.send_stream_creatable(StreamType::BiDi);
473         }
474         if self.local_stream_limits[StreamType::UniDi].available() > 0 {
475             self.events.send_stream_creatable(StreamType::UniDi);
476         }
477     }
478 
handle_max_streams(&mut self, stream_type: StreamType, maximum_streams: u64)479     pub fn handle_max_streams(&mut self, stream_type: StreamType, maximum_streams: u64) {
480         if self.local_stream_limits[stream_type].update(maximum_streams) {
481             self.events.send_stream_creatable(stream_type);
482         }
483     }
484 
get_send_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut SendStream>485     pub fn get_send_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut SendStream> {
486         self.send.get_mut(stream_id)
487     }
488 
get_send_stream(&self, stream_id: StreamId) -> Res<&SendStream>489     pub fn get_send_stream(&self, stream_id: StreamId) -> Res<&SendStream> {
490         self.send.get(stream_id)
491     }
492 
get_recv_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut RecvStream>493     pub fn get_recv_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut RecvStream> {
494         self.recv.get_mut(stream_id)
495     }
496 
keep_alive(&mut self, stream_id: StreamId, keep: bool) -> Res<()>497     pub fn keep_alive(&mut self, stream_id: StreamId, keep: bool) -> Res<()> {
498         self.recv.keep_alive(stream_id, keep)
499     }
500 
need_keep_alive(&mut self) -> bool501     pub fn need_keep_alive(&mut self) -> bool {
502         self.recv.need_keep_alive()
503     }
504 }
505