1 /** @file
2 *
3 * A brief file description
4 *
5 * @section license License
6 *
7 * Licensed to the Apache Software Foundation (ASF) under one
8 * or more contributor license agreements. See the NOTICE file
9 * distributed with this work for additional information
10 * regarding copyright ownership. The ASF licenses this file
11 * to you under the Apache License, Version 2.0 (the
12 * "License"); you may not use this file except in compliance
13 * with the License. You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24 #include "QUICStreamManager.h"
25
26 #include "QUICApplication.h"
27 #include "QUICTransportParameters.h"
28
29 static constexpr char tag[] = "quic_stream_manager";
30 static constexpr QUICStreamId QUIC_STREAM_TYPES = 4;
31
QUICStreamManager(QUICContext * context,QUICApplicationMap * app_map)32 QUICStreamManager::QUICStreamManager(QUICContext *context, QUICApplicationMap *app_map)
33 : _stream_factory(context->rtt_provider(), context->connection_info()), _context(context), _app_map(app_map)
34 {
35 if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
36 this->_next_stream_id_bidi = static_cast<uint32_t>(QUICStreamType::CLIENT_BIDI);
37 this->_next_stream_id_uni = static_cast<uint32_t>(QUICStreamType::CLIENT_UNI);
38 } else {
39 this->_next_stream_id_bidi = static_cast<uint32_t>(QUICStreamType::SERVER_BIDI);
40 this->_next_stream_id_uni = static_cast<uint32_t>(QUICStreamType::SERVER_UNI);
41 }
42 }
43
44 std::vector<QUICFrameType>
interests()45 QUICStreamManager::interests()
46 {
47 return {
48 QUICFrameType::STREAM, QUICFrameType::RESET_STREAM, QUICFrameType::STOP_SENDING,
49 QUICFrameType::MAX_STREAM_DATA, QUICFrameType::MAX_STREAMS,
50 };
51 }
52
53 void
init_flow_control_params(const std::shared_ptr<const QUICTransportParameters> & local_tp,const std::shared_ptr<const QUICTransportParameters> & remote_tp)54 QUICStreamManager::init_flow_control_params(const std::shared_ptr<const QUICTransportParameters> &local_tp,
55 const std::shared_ptr<const QUICTransportParameters> &remote_tp)
56 {
57 this->_local_tp = local_tp;
58 this->_remote_tp = remote_tp;
59
60 if (this->_local_tp) {
61 this->_local_max_streams_bidi = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_BIDI);
62 this->_local_max_streams_uni = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_UNI);
63 }
64 if (this->_remote_tp) {
65 this->_remote_max_streams_bidi = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_BIDI);
66 this->_remote_max_streams_uni = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAMS_UNI);
67 }
68 }
69
70 void
set_max_streams_bidi(uint64_t max_streams)71 QUICStreamManager::set_max_streams_bidi(uint64_t max_streams)
72 {
73 if (this->_local_max_streams_bidi <= max_streams) {
74 this->_local_max_streams_bidi = max_streams;
75 }
76 }
77
78 void
set_max_streams_uni(uint64_t max_streams)79 QUICStreamManager::set_max_streams_uni(uint64_t max_streams)
80 {
81 if (this->_local_max_streams_uni <= max_streams) {
82 this->_local_max_streams_uni = max_streams;
83 }
84 }
85
86 QUICConnectionErrorUPtr
create_stream(QUICStreamId stream_id)87 QUICStreamManager::create_stream(QUICStreamId stream_id)
88 {
89 // TODO: check stream_id
90 QUICConnectionErrorUPtr error = nullptr;
91 QUICStreamVConnection *stream_vc = this->_find_or_create_stream_vc(stream_id);
92 if (!stream_vc) {
93 return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
94 }
95
96 QUICApplication *application = this->_app_map->get(stream_id);
97
98 if (!application->is_stream_set(stream_vc)) {
99 application->set_stream(stream_vc);
100 }
101
102 return error;
103 }
104
105 QUICConnectionErrorUPtr
create_uni_stream(QUICStreamId & new_stream_id)106 QUICStreamManager::create_uni_stream(QUICStreamId &new_stream_id)
107 {
108 QUICConnectionErrorUPtr error = this->create_stream(this->_next_stream_id_uni);
109 if (error == nullptr) {
110 new_stream_id = this->_next_stream_id_uni;
111 this->_next_stream_id_uni += QUIC_STREAM_TYPES;
112 }
113
114 return error;
115 }
116
117 QUICConnectionErrorUPtr
create_bidi_stream(QUICStreamId & new_stream_id)118 QUICStreamManager::create_bidi_stream(QUICStreamId &new_stream_id)
119 {
120 QUICConnectionErrorUPtr error = this->create_stream(this->_next_stream_id_bidi);
121 if (error == nullptr) {
122 new_stream_id = this->_next_stream_id_bidi;
123 this->_next_stream_id_bidi += QUIC_STREAM_TYPES;
124 }
125
126 return error;
127 }
128
129 void
reset_stream(QUICStreamId stream_id,QUICStreamErrorUPtr error)130 QUICStreamManager::reset_stream(QUICStreamId stream_id, QUICStreamErrorUPtr error)
131 {
132 auto stream = this->_find_stream_vc(stream_id);
133 stream->reset(std::move(error));
134 }
135
136 QUICConnectionErrorUPtr
handle_frame(QUICEncryptionLevel level,const QUICFrame & frame)137 QUICStreamManager::handle_frame(QUICEncryptionLevel level, const QUICFrame &frame)
138 {
139 QUICConnectionErrorUPtr error = nullptr;
140
141 switch (frame.type()) {
142 case QUICFrameType::MAX_STREAM_DATA:
143 error = this->_handle_frame(static_cast<const QUICMaxStreamDataFrame &>(frame));
144 break;
145 case QUICFrameType::STREAM_DATA_BLOCKED:
146 // STREAM_DATA_BLOCKED frame is for debugging. Just propagate to streams
147 error = this->_handle_frame(static_cast<const QUICStreamDataBlockedFrame &>(frame));
148 break;
149 case QUICFrameType::STREAM:
150 error = this->_handle_frame(static_cast<const QUICStreamFrame &>(frame));
151 break;
152 case QUICFrameType::STOP_SENDING:
153 error = this->_handle_frame(static_cast<const QUICStopSendingFrame &>(frame));
154 break;
155 case QUICFrameType::RESET_STREAM:
156 error = this->_handle_frame(static_cast<const QUICRstStreamFrame &>(frame));
157 break;
158 case QUICFrameType::MAX_STREAMS:
159 error = this->_handle_frame(static_cast<const QUICMaxStreamsFrame &>(frame));
160 break;
161 default:
162 Debug(tag, "Unexpected frame type: %02x", static_cast<unsigned int>(frame.type()));
163 ink_assert(false);
164 break;
165 }
166
167 return error;
168 }
169
170 QUICConnectionErrorUPtr
_handle_frame(const QUICMaxStreamDataFrame & frame)171 QUICStreamManager::_handle_frame(const QUICMaxStreamDataFrame &frame)
172 {
173 QUICStreamVConnection *stream = this->_find_or_create_stream_vc(frame.stream_id());
174 if (stream) {
175 return stream->recv(frame);
176 } else {
177 return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
178 }
179 }
180
181 QUICConnectionErrorUPtr
_handle_frame(const QUICStreamDataBlockedFrame & frame)182 QUICStreamManager::_handle_frame(const QUICStreamDataBlockedFrame &frame)
183 {
184 QUICStreamVConnection *stream = this->_find_or_create_stream_vc(frame.stream_id());
185 if (stream) {
186 return stream->recv(frame);
187 } else {
188 return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
189 }
190 }
191
192 QUICConnectionErrorUPtr
_handle_frame(const QUICStreamFrame & frame)193 QUICStreamManager::_handle_frame(const QUICStreamFrame &frame)
194 {
195 QUICStreamVConnection *stream = this->_find_or_create_stream_vc(frame.stream_id());
196 if (!stream) {
197 return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
198 }
199
200 QUICApplication *application = this->_app_map->get(frame.stream_id());
201
202 if (application && !application->is_stream_set(stream)) {
203 application->set_stream(stream);
204 }
205
206 return stream->recv(frame);
207 }
208
209 QUICConnectionErrorUPtr
_handle_frame(const QUICRstStreamFrame & frame)210 QUICStreamManager::_handle_frame(const QUICRstStreamFrame &frame)
211 {
212 QUICStream *stream = this->_find_or_create_stream_vc(frame.stream_id());
213 if (stream) {
214 return stream->recv(frame);
215 } else {
216 return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
217 }
218 }
219
220 QUICConnectionErrorUPtr
_handle_frame(const QUICStopSendingFrame & frame)221 QUICStreamManager::_handle_frame(const QUICStopSendingFrame &frame)
222 {
223 QUICStream *stream = this->_find_or_create_stream_vc(frame.stream_id());
224 if (stream) {
225 return stream->recv(frame);
226 } else {
227 return std::make_unique<QUICConnectionError>(QUICTransErrorCode::STREAM_LIMIT_ERROR);
228 }
229 }
230
231 QUICConnectionErrorUPtr
_handle_frame(const QUICMaxStreamsFrame & frame)232 QUICStreamManager::_handle_frame(const QUICMaxStreamsFrame &frame)
233 {
234 QUICStreamType type = QUICTypeUtil::detect_stream_type(frame.maximum_streams());
235 if (type == QUICStreamType::SERVER_BIDI || type == QUICStreamType::CLIENT_BIDI) {
236 this->_remote_max_streams_bidi = frame.maximum_streams();
237 } else {
238 this->_remote_max_streams_uni = frame.maximum_streams();
239 }
240 return nullptr;
241 }
242
243 QUICStreamVConnection *
_find_stream_vc(QUICStreamId id)244 QUICStreamManager::_find_stream_vc(QUICStreamId id)
245 {
246 for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
247 if (s->id() == id) {
248 return s;
249 }
250 }
251 return nullptr;
252 }
253
254 QUICStreamVConnection *
_find_or_create_stream_vc(QUICStreamId stream_id)255 QUICStreamManager::_find_or_create_stream_vc(QUICStreamId stream_id)
256 {
257 QUICStreamVConnection *stream = this->_find_stream_vc(stream_id);
258 if (!stream) {
259 if (!this->_local_tp) {
260 return nullptr;
261 }
262
263 ink_assert(this->_local_tp);
264 ink_assert(this->_remote_tp);
265
266 uint64_t local_max_stream_data = 0;
267 uint64_t remote_max_stream_data = 0;
268 uint64_t nth_stream = this->_stream_id_to_nth_stream(stream_id);
269
270 switch (QUICTypeUtil::detect_stream_type(stream_id)) {
271 case QUICStreamType::CLIENT_BIDI:
272 if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
273 // client
274 if (this->_remote_max_streams_bidi == 0 || nth_stream > this->_remote_max_streams_bidi) {
275 return nullptr;
276 }
277
278 local_max_stream_data = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
279 remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
280 } else {
281 // server
282 if (this->_local_max_streams_bidi == 0 || nth_stream > this->_local_max_streams_bidi) {
283 return nullptr;
284 }
285
286 local_max_stream_data = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
287 remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
288 }
289
290 break;
291 case QUICStreamType::CLIENT_UNI:
292 if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
293 // client
294 if (this->_remote_max_streams_uni == 0 || nth_stream > this->_remote_max_streams_uni) {
295 return nullptr;
296 }
297 } else {
298 // server
299 if (this->_local_max_streams_uni == 0 || nth_stream > this->_local_max_streams_uni) {
300 return nullptr;
301 }
302 }
303
304 local_max_stream_data = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
305 remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
306
307 break;
308 case QUICStreamType::SERVER_BIDI:
309 if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
310 // client
311 if (this->_local_max_streams_bidi == 0 || nth_stream > this->_local_max_streams_bidi) {
312 return nullptr;
313 }
314
315 local_max_stream_data = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
316 remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
317 } else {
318 // server
319 if (this->_remote_max_streams_bidi == 0 || nth_stream > this->_remote_max_streams_bidi) {
320 return nullptr;
321 }
322
323 local_max_stream_data = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
324 remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE);
325 }
326 break;
327 case QUICStreamType::SERVER_UNI:
328 if (this->_context->connection_info()->direction() == NET_VCONNECTION_OUT) {
329 if (this->_local_max_streams_uni == 0 || nth_stream > this->_local_max_streams_uni) {
330 return nullptr;
331 }
332 } else {
333 if (this->_remote_max_streams_uni == 0 || nth_stream > this->_remote_max_streams_uni) {
334 return nullptr;
335 }
336 }
337
338 local_max_stream_data = this->_local_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
339 remote_max_stream_data = this->_remote_tp->getAsUInt(QUICTransportParameterId::INITIAL_MAX_STREAM_DATA_UNI);
340
341 break;
342 default:
343 ink_release_assert(false);
344 break;
345 }
346
347 stream = this->_stream_factory.create(stream_id, local_max_stream_data, remote_max_stream_data);
348 ink_assert(stream != nullptr);
349 this->stream_list.push(stream);
350 }
351
352 return stream;
353 }
354
355 uint64_t
total_reordered_bytes() const356 QUICStreamManager::total_reordered_bytes() const
357 {
358 uint64_t total_bytes = 0;
359
360 // FIXME Iterating all (open + closed) streams is expensive
361 for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
362 total_bytes += s->reordered_bytes();
363 }
364 return total_bytes;
365 }
366
367 uint64_t
total_offset_received() const368 QUICStreamManager::total_offset_received() const
369 {
370 uint64_t total_offset_received = 0;
371
372 // FIXME Iterating all (open + closed) streams is expensive
373 for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
374 total_offset_received += s->largest_offset_received();
375 }
376 return total_offset_received;
377 }
378
379 uint64_t
total_offset_sent() const380 QUICStreamManager::total_offset_sent() const
381 {
382 return this->_total_offset_sent;
383 }
384
385 void
_add_total_offset_sent(uint32_t sent_byte)386 QUICStreamManager::_add_total_offset_sent(uint32_t sent_byte)
387 {
388 // FIXME: use atomic increment
389 this->_total_offset_sent += sent_byte;
390 }
391
392 uint32_t
stream_count() const393 QUICStreamManager::stream_count() const
394 {
395 uint32_t count = 0;
396 for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
397 ++count;
398 }
399 return count;
400 }
401
402 void
set_default_application(QUICApplication * app)403 QUICStreamManager::set_default_application(QUICApplication *app)
404 {
405 this->_app_map->set_default(app);
406 }
407
408 bool
will_generate_frame(QUICEncryptionLevel level,size_t current_packet_size,bool ack_eliciting,uint32_t seq_num)409 QUICStreamManager::will_generate_frame(QUICEncryptionLevel level, size_t current_packet_size, bool ack_eliciting, uint32_t seq_num)
410 {
411 if (!this->_is_level_matched(level)) {
412 return false;
413 }
414
415 // workaround fix until support 0-RTT on client
416 if (level == QUICEncryptionLevel::ZERO_RTT) {
417 return false;
418 }
419
420 // Don't send DATA frames if current path is not validated
421 if (!this->_context->path_manager()->get_verified_path().remote_ep().isValid()) {
422 return false;
423 }
424
425 for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
426 if (s->will_generate_frame(level, current_packet_size, ack_eliciting, seq_num)) {
427 return true;
428 }
429 }
430
431 return false;
432 }
433
434 QUICFrame *
generate_frame(uint8_t * buf,QUICEncryptionLevel level,uint64_t connection_credit,uint16_t maximum_frame_size,size_t current_packet_size,uint32_t seq_num)435 QUICStreamManager::generate_frame(uint8_t *buf, QUICEncryptionLevel level, uint64_t connection_credit, uint16_t maximum_frame_size,
436 size_t current_packet_size, uint32_t seq_num)
437 {
438 QUICFrame *frame = nullptr;
439
440 if (!this->_is_level_matched(level)) {
441 return frame;
442 }
443
444 // workaround fix until support 0-RTT on client
445 if (level == QUICEncryptionLevel::ZERO_RTT) {
446 return frame;
447 }
448
449 // FIXME We should pick a stream based on priority
450 for (QUICStreamVConnection *s = this->stream_list.head; s; s = s->link.next) {
451 frame = s->generate_frame(buf, level, connection_credit, maximum_frame_size, current_packet_size, seq_num);
452 if (frame) {
453 break;
454 }
455 }
456
457 if (frame != nullptr && frame->type() == QUICFrameType::STREAM) {
458 this->_add_total_offset_sent(static_cast<QUICStreamFrame *>(frame)->data_length());
459 }
460
461 return frame;
462 }
463
464 bool
_is_level_matched(QUICEncryptionLevel level)465 QUICStreamManager::_is_level_matched(QUICEncryptionLevel level)
466 {
467 for (auto l : this->_encryption_level_filter) {
468 if (l == level) {
469 return true;
470 }
471 }
472
473 return false;
474 }
475
476 uint64_t
_stream_id_to_nth_stream(QUICStreamId stream_id)477 QUICStreamManager::_stream_id_to_nth_stream(QUICStreamId stream_id)
478 {
479 return (stream_id / 4) + 1;
480 }
481