1 /** @file
2  *
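 *  Implementation of QUICStreamManager: creates and looks up the streams of a QUIC connection,
 *  dispatches stream-related frames to them, and collects the frames they want to send.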
4  *
5  *  @section license License
6  *
7  *  Licensed to the Apache Software Foundation (ASF) under one
8  *  or more contributor license agreements.  See the NOTICE file
9  *  distributed with this work for additional information
10  *  regarding copyright ownership.  The ASF licenses this file
11  *  to you under the Apache License, Version 2.0 (the
12  *  "License"); you may not use this file except in compliance
13  *  with the License.  You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  *  Unless required by applicable law or agreed to in writing, software
18  *  distributed under the License is distributed on an "AS IS" BASIS,
19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  *  See the License for the specific language governing permissions and
21  *  limitations under the License.
22  */
23 
24 #include "QUICStreamManager.h"
25 
26 #include "QUICApplication.h"
27 #include "QUICTransportParameters.h"
28 
29 static constexpr char tag[]                     = "quic_stream_manager";
30 static constexpr QUICStreamId QUIC_STREAM_TYPES = 4;
31 
QUICStreamManager(QUICContext * context,QUICApplicationMap * app_map)32 QUICStreamManager::QUICStreamManager(QUICContext *context, QUICApplicationMap *app_map)
33   : _stream_factory(context->rtt_provider(), context->connection_info()), _context(context), _app_map(app_map)
34 {
35   if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
36     this->_next_stream_id_bidi = static_cast<uint32_t>(QUICStreamType::CLIENT_BIDI);
37     this->_next_stream_id_uni  = static_cast<uint32_t>(QUICStreamType::CLIENT_UNI);
38   } else {
39     this->_next_stream_id_bidi = static_cast<uint32_t>(QUICStreamType::SERVER_BIDI);
40     this->_next_stream_id_uni  = static_cast<uint32_t>(QUICStreamType::SERVER_UNI);
41   }
42 }
43 
44 std::vector<QUICFrameType>
interests()45 QUICStreamManager::interests()
46 {
47   return {
48     QUICFrameType::STREAM,          QUICFrameType::RESET_STREAM, QUICFrameType::STOP_SENDING,
49     QUICFrameType::MAX_STREAM_DATA, QUICFrameType::MAX_STREAMS,
50   };
51 }
52 
53 void
init_flow_control_params(const std::shared_ptr<const QUICTransportParameters> & local_tp,const std::shared_ptr<const QUICTransportParameters> & remote_tp)54 QUICStreamManager::init_flow_control_params(const std::shared_ptr<const QUICTransportParameters> &local_tp,
55                                             const std::shared_ptr<const QUICTransportParameters> &remote_tp)
56 {
57   this->_local_tp  = local_tp;
58   this->_remote_tp = remote_tp;
59 
60   if (this->_local_tp) {
61     this->_local_max_streams_bidi = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_BIDI);
62     this->_local_max_streams_uni  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_UNI);
63   }
64   if (this->_remote_tp) {
65     this->_remote_max_streams_bidi = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_BIDI);
66     this->_remote_max_streams_uni  = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_UNI);
67   }
68 }
69 
70 void
set_max_streams_bidi(uint64_t max_streams)71 QUICStreamManager::set_max_streams_bidi(uint64_t max_streams)
72 {
73   if (this->_local_max_streams_bidi <= max_streams) {
74     this->_local_max_streams_bidi = max_streams;
75   }
76 }
77 
78 void
set_max_streams_uni(uint64_t max_streams)79 QUICStreamManager::set_max_streams_uni(uint64_t max_streams)
80 {
81   if (this->_local_max_streams_uni <= max_streams) {
82     this->_local_max_streams_uni = max_streams;
83   }
84 }
85 
86 QUICConnectionErrorUPtr
create_stream(QUICStreamId stream_id)87 QUICStreamManager::create_stream(QUICStreamId stream_id)
88 {
89   // TODO: check stream_id
90   QUICConnectionErrorUPtr error    = nullptr;
91   QUICStreamVConnection *stream_vc = this->_find_or_create_stream_vc(stream_id);
92   if (!stream_vc) {
93     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
94   }
95 
96   QUICApplication *application = this->_app_map->get(stream_id);
97 
98   if (!application->is_stream_set(stream_vc)) {
99     application->set_stream(stream_vc);
100   }
101 
102   return error;
103 }
104 
105 QUICConnectionErrorUPtr
create_uni_stream(QUICStreamId & new_stream_id)106 QUICStreamManager::create_uni_stream(QUICStreamId &new_stream_id)
107 {
108   QUICConnectionErrorUPtr error = this->create_stream(this->_next_stream_id_uni);
109   if (error == nullptr) {
110     new_stream_id = this->_next_stream_id_uni;
111     this->_next_stream_id_uni += QUIC_STREAM_TYPES;
112   }
113 
114   return error;
115 }
116 
117 QUICConnectionErrorUPtr
create_bidi_stream(QUICStreamId & new_stream_id)118 QUICStreamManager::create_bidi_stream(QUICStreamId &new_stream_id)
119 {
120   QUICConnectionErrorUPtr error = this->create_stream(this->_next_stream_id_bidi);
121   if (error == nullptr) {
122     new_stream_id = this->_next_stream_id_bidi;
123     this->_next_stream_id_bidi += QUIC_STREAM_TYPES;
124   }
125 
126   return error;
127 }
128 
129 void
reset_stream(QUICStreamId stream_id,QUICStreamErrorUPtr error)130 QUICStreamManager::reset_stream(QUICStreamId stream_id, QUICStreamErrorUPtr error)
131 {
132   auto stream = this->_find_stream_vc(stream_id);
133   stream->reset(std::move(error));
134 }
135 
136 QUICConnectionErrorUPtr
handle_frame(QUICEncryptionLevel level,const QUICFrame & frame)137 QUICStreamManager::handle_frame(QUICEncryptionLevel level, const QUICFrame &frame)
138 {
139   QUICConnectionErrorUPtr error = nullptr;
140 
141   switch (frame.type()) {
142   case QUICFrameType::MAX_STREAM_DATA:
143     error = this->_handle_frame(static_cast<const QUICMaxStreamDataFrame &>(frame));
144     break;
145   case QUICFrameType::STREAM_DATA_BLOCKED:
146     // STREAM_DATA_BLOCKED frame is for debugging. Just propagate to streams
147     error = this->_handle_frame(static_cast<const QUICStreamDataBlockedFrame &>(frame));
148     break;
149   case QUICFrameType::STREAM:
150     error = this->_handle_frame(static_cast<const QUICStreamFrame &>(frame));
151     break;
152   case QUICFrameType::STOP_SENDING:
153     error = this->_handle_frame(static_cast<const QUICStopSendingFrame &>(frame));
154     break;
155   case QUICFrameType::RESET_STREAM:
156     error = this->_handle_frame(static_cast<const QUICRstStreamFrame &>(frame));
157     break;
158   case QUICFrameType::MAX_STREAMS:
159     error = this->_handle_frame(static_cast<const QUICMaxStreamsFrame &>(frame));
160     break;
161   default:
162     Debug(tag, "Unexpected frame type: %02x", static_cast<unsigned int>(frame.type()));
163     ink_assert(false);
164     break;
165   }
166 
167   return error;
168 }
169 
170 QUICConnectionErrorUPtr
_handle_frame(const QUICMaxStreamDataFrame & frame)171 QUICStreamManager::_handle_frame(const QUICMaxStreamDataFrame &frame)
172 {
173   QUICStreamVConnection *stream = this->_find_or_create_stream_vc(frame.stream_id());
174   if (stream) {
175     return stream->recv(frame);
176   } else {
177     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
178   }
179 }
180 
181 QUICConnectionErrorUPtr
_handle_frame(const QUICStreamDataBlockedFrame & frame)182 QUICStreamManager::_handle_frame(const QUICStreamDataBlockedFrame &frame)
183 {
184   QUICStreamVConnection *stream = this->_find_or_create_stream_vc(frame.stream_id());
185   if (stream) {
186     return stream->recv(frame);
187   } else {
188     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
189   }
190 }
191 
192 QUICConnectionErrorUPtr
_handle_frame(const QUICStreamFrame & frame)193 QUICStreamManager::_handle_frame(const QUICStreamFrame &frame)
194 {
195   QUICStreamVConnection *stream = this->_find_or_create_stream_vc(frame.stream_id());
196   if (!stream) {
197     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
198   }
199 
200   QUICApplication *application = this->_app_map->get(frame.stream_id());
201 
202   if (application && !application->is_stream_set(stream)) {
203     application->set_stream(stream);
204   }
205 
206   return stream->recv(frame);
207 }
208 
209 QUICConnectionErrorUPtr
_handle_frame(const QUICRstStreamFrame & frame)210 QUICStreamManager::_handle_frame(const QUICRstStreamFrame &frame)
211 {
212   QUICStream *stream = this->_find_or_create_stream_vc(frame.stream_id());
213   if (stream) {
214     return stream->recv(frame);
215   } else {
216     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
217   }
218 }
219 
220 QUICConnectionErrorUPtr
_handle_frame(const QUICStopSendingFrame & frame)221 QUICStreamManager::_handle_frame(const QUICStopSendingFrame &frame)
222 {
223   QUICStream *stream = this->_find_or_create_stream_vc(frame.stream_id());
224   if (stream) {
225     return stream->recv(frame);
226   } else {
227     return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
228   }
229 }
230 
231 QUICConnectionErrorUPtr
_handle_frame(const QUICMaxStreamsFrame & frame)232 QUICStreamManager::_handle_frame(const QUICMaxStreamsFrame &frame)
233 {
234   QUICStreamType type = QUICTypeUtil::detect_stream_type(frame.maximum_streams());
235   if (type == QUICStreamType::SERVER_BIDI || type == QUICStreamType::CLIENT_BIDI) {
236     this->_remote_max_streams_bidi = frame.maximum_streams();
237   } else {
238     this->_remote_max_streams_uni = frame.maximum_streams();
239   }
240   return nullptr;
241 }
242 
243 QUICStreamVConnection *
_find_stream_vc(QUICStreamId id)244 QUICStreamManager::_find_stream_vc(QUICStreamId id)
245 {
246   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
247     if (s->id() == id) {
248       return s;
249     }
250   }
251   return nullptr;
252 }
253 
254 QUICStreamVConnection *
_find_or_create_stream_vc(QUICStreamId stream_id)255 QUICStreamManager::_find_or_create_stream_vc(QUICStreamId stream_id)
256 {
257   QUICStreamVConnection *stream = this->_find_stream_vc(stream_id);
258   if (!stream) {
259     if (!this->_local_tp) {
260       return nullptr;
261     }
262 
263     ink_assert(this->_local_tp);
264     ink_assert(this->_remote_tp);
265 
266     uint64_t local_max_stream_data  = 0;
267     uint64_t remote_max_stream_data = 0;
268     uint64_t nth_stream             = this->_stream_id_to_nth_stream(stream_id);
269 
270     switch (QUICTypeUtil::detect_stream_type(stream_id)) {
271     case QUICStreamType::CLIENT_BIDI:
272       if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
273         // client
274         if (this->_remote_max_streams_bidi == 0 || nth_stream > this->_remote_max_streams_bidi) {
275           return nullptr;
276         }
277 
278         local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
279         remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
280       } else {
281         // server
282         if (this->_local_max_streams_bidi == 0 || nth_stream > this->_local_max_streams_bidi) {
283           return nullptr;
284         }
285 
286         local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
287         remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
288       }
289 
290       break;
291     case QUICStreamType::CLIENT_UNI:
292       if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
293         // client
294         if (this->_remote_max_streams_uni == 0 || nth_stream > this->_remote_max_streams_uni) {
295           return nullptr;
296         }
297       } else {
298         // server
299         if (this->_local_max_streams_uni == 0 || nth_stream > this->_local_max_streams_uni) {
300           return nullptr;
301         }
302       }
303 
304       local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
305       remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
306 
307       break;
308     case QUICStreamType::SERVER_BIDI:
309       if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
310         // client
311         if (this->_local_max_streams_bidi == 0 || nth_stream > this->_local_max_streams_bidi) {
312           return nullptr;
313         }
314 
315         local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
316         remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
317       } else {
318         // server
319         if (this->_remote_max_streams_bidi == 0 || nth_stream > this->_remote_max_streams_bidi) {
320           return nullptr;
321         }
322 
323         local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
324         remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
325       }
326       break;
327     case QUICStreamType::SERVER_UNI:
328       if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
329         if (this->_local_max_streams_uni == 0 || nth_stream > this->_local_max_streams_uni) {
330           return nullptr;
331         }
332       } else {
333         if (this->_remote_max_streams_uni == 0 || nth_stream > this->_remote_max_streams_uni) {
334           return nullptr;
335         }
336       }
337 
338       local_max_stream_data  = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
339       remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
340 
341       break;
342     default:
343       ink_release_assert(false);
344       break;
345     }
346 
347     stream = this->_stream_factory.create(stream_id, local_max_stream_data, remote_max_stream_data);
348     ink_assert(stream != nullptr);
349     this->stream_list.push(stream);
350   }
351 
352   return stream;
353 }
354 
355 uint64_t
total_reordered_bytes() const356 QUICStreamManager::total_reordered_bytes() const
357 {
358   uint64_t total_bytes = 0;
359 
360   // FIXME Iterating all (open + closed) streams is expensive
361   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
362     total_bytes += s->reordered_bytes();
363   }
364   return total_bytes;
365 }
366 
367 uint64_t
total_offset_received() const368 QUICStreamManager::total_offset_received() const
369 {
370   uint64_t total_offset_received = 0;
371 
372   // FIXME Iterating all (open + closed) streams is expensive
373   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
374     total_offset_received += s->largest_offset_received();
375   }
376   return total_offset_received;
377 }
378 
379 uint64_t
total_offset_sent() const380 QUICStreamManager::total_offset_sent() const
381 {
382   return this->_total_offset_sent;
383 }
384 
385 void
_add_total_offset_sent(uint32_t sent_byte)386 QUICStreamManager::_add_total_offset_sent(uint32_t sent_byte)
387 {
388   // FIXME: use atomic increment
389   this->_total_offset_sent += sent_byte;
390 }
391 
392 uint32_t
stream_count() const393 QUICStreamManager::stream_count() const
394 {
395   uint32_t count = 0;
396   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
397     ++count;
398   }
399   return count;
400 }
401 
402 void
set_default_application(QUICApplication * app)403 QUICStreamManager::set_default_application(QUICApplication *app)
404 {
405   this->_app_map->set_default(app);
406 }
407 
408 bool
will_generate_frame(QUICEncryptionLevel level,size_t current_packet_size,bool ack_eliciting,uint32_t seq_num)409 QUICStreamManager::will_generate_frame(QUICEncryptionLevel level, size_t current_packet_size, bool ack_eliciting, uint32_t seq_num)
410 {
411   if (!this->_is_level_matched(level)) {
412     return false;
413   }
414 
415   // workaround fix until support 0-RTT on client
416   if (level == QUICEncryptionLevel::ZERO_RTT) {
417     return false;
418   }
419 
420   // Don't send DATA frames if current path is not validated
421   if (!this->_context->path_manager()->get_verified_path().remote_ep().isValid()) {
422     return false;
423   }
424 
425   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
426     if (s->will_generate_frame(level, current_packet_size, ack_eliciting, seq_num)) {
427       return true;
428     }
429   }
430 
431   return false;
432 }
433 
434 QUICFrame *
generate_frame(uint8_t * buf,QUICEncryptionLevel level,uint64_t connection_credit,uint16_t maximum_frame_size,size_t current_packet_size,uint32_t seq_num)435 QUICStreamManager::generate_frame(uint8_t *buf, QUICEncryptionLevel level, uint64_t connection_credit, uint16_t maximum_frame_size,
436                                   size_t current_packet_size, uint32_t seq_num)
437 {
438   QUICFrame *frame = nullptr;
439 
440   if (!this->_is_level_matched(level)) {
441     return frame;
442   }
443 
444   // workaround fix until support 0-RTT on client
445   if (level == QUICEncryptionLevel::ZERO_RTT) {
446     return frame;
447   }
448 
449   // FIXME We should pick a stream based on priority
450   for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
451     frame = s->generate_frame(buf, level, connection_credit, maximum_frame_size, current_packet_size, seq_num);
452     if (frame) {
453       break;
454     }
455   }
456 
457   if (frame != nullptr && frame->type() == QUICFrameType::STREAM) {
458     this->_add_total_offset_sent(static_cast<QUICStreamFrame *>(frame)->data_length());
459   }
460 
461   return frame;
462 }
463 
464 bool
_is_level_matched(QUICEncryptionLevel level)465 QUICStreamManager::_is_level_matched(QUICEncryptionLevel level)
466 {
467   for (auto l : this->_encryption_level_filter) {
468     if (l == level) {
469       return true;
470     }
471   }
472 
473   return false;
474 }
475 
476 uint64_t
_stream_id_to_nth_stream(QUICStreamId stream_id)477 QUICStreamManager::_stream_id_to_nth_stream(QUICStreamId stream_id)
478 {
479   return (stream_id / 4) + 1;
480 }
481