1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/third_party/quiche/src/quic/core/quic_session.h"
6
7 #include <cstdint>
8 #include <string>
9 #include <utility>
10
11 #include "net/third_party/quiche/src/quic/core/quic_connection.h"
12 #include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
13 #include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
14 #include "net/third_party/quiche/src/quic/core/quic_types.h"
15 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
16 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
17 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
18 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
19 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
20 #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
21 #include "net/third_party/quiche/src/quic/platform/api/quic_map_util.h"
22 #include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
23 #include "net/third_party/quiche/src/quic/platform/api/quic_server_stats.h"
24 #include "net/third_party/quiche/src/quic/platform/api/quic_stack_trace.h"
25 #include "net/third_party/quiche/src/common/platform/api/quiche_str_cat.h"
26 #include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
27
28 using spdy::SpdyPriority;
29
30 namespace quic {
31
32 namespace {
33
34 class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
35 public:
ClosedStreamsCleanUpDelegate(QuicSession * session)36 explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
37 : session_(session) {}
38 ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
39 ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
40 delete;
41
OnAlarm()42 void OnAlarm() override { session_->CleanUpClosedStreams(); }
43
44 private:
45 QuicSession* session_;
46 };
47
48 // TODO(renjietang): remove this function once
49 // gfe2_reloadable_flag_quic_write_with_transmission is deprecated.
CountTransmissionTypeFlag(TransmissionType type)50 void CountTransmissionTypeFlag(TransmissionType type) {
51 switch (type) {
52 case NOT_RETRANSMISSION:
53 QUIC_RELOADABLE_FLAG_COUNT_N(quic_write_with_transmission, 1, 4);
54 break;
55 case HANDSHAKE_RETRANSMISSION:
56 QUIC_RELOADABLE_FLAG_COUNT_N(quic_write_with_transmission, 2, 4);
57 break;
58 case LOSS_RETRANSMISSION:
59 QUIC_RELOADABLE_FLAG_COUNT_N(quic_write_with_transmission, 3, 4);
60 break;
61 default:
62 QUIC_RELOADABLE_FLAG_COUNT_N(quic_write_with_transmission, 4, 4);
63 }
64 }
65
66 } // namespace
67
68 #define ENDPOINT \
69 (perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
70
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams)71 QuicSession::QuicSession(
72 QuicConnection* connection,
73 Visitor* owner,
74 const QuicConfig& config,
75 const ParsedQuicVersionVector& supported_versions,
76 QuicStreamCount num_expected_unidirectional_static_streams)
77 : connection_(connection),
78 perspective_(connection->perspective()),
79 visitor_(owner),
80 write_blocked_streams_(connection->transport_version()),
81 config_(config),
82 stream_id_manager_(perspective(),
83 connection->transport_version(),
84 kDefaultMaxStreamsPerConnection,
85 config_.GetMaxBidirectionalStreamsToSend()),
86 v99_streamid_manager_(perspective(),
87 connection->version(),
88 this,
89 0,
90 num_expected_unidirectional_static_streams,
91 config_.GetMaxBidirectionalStreamsToSend(),
92 config_.GetMaxUnidirectionalStreamsToSend() +
93 num_expected_unidirectional_static_streams),
94 num_dynamic_incoming_streams_(0),
95 num_draining_incoming_streams_(0),
96 num_outgoing_static_streams_(0),
97 num_incoming_static_streams_(0),
98 num_locally_closed_incoming_streams_highest_offset_(0),
99 flow_controller_(
100 this,
101 QuicUtils::GetInvalidStreamId(connection->transport_version()),
102 /*is_connection_flow_controller*/ true,
103 connection->version().AllowsLowFlowControlLimits()
104 ? 0
105 : kMinimumFlowControlSendWindow,
106 config_.GetInitialSessionFlowControlWindowToSend(),
107 kSessionReceiveWindowLimit,
108 perspective() == Perspective::IS_SERVER,
109 nullptr),
110 currently_writing_stream_id_(0),
111 goaway_sent_(false),
112 goaway_received_(false),
113 control_frame_manager_(this),
114 last_message_id_(0),
115 datagram_queue_(this),
116 closed_streams_clean_up_alarm_(nullptr),
117 supported_versions_(supported_versions),
118 use_http2_priority_write_scheduler_(false),
119 is_configured_(false),
120 num_expected_unidirectional_static_streams_(
121 num_expected_unidirectional_static_streams),
122 enable_round_robin_scheduling_(false),
123 write_with_transmission_(
124 GetQuicReloadableFlag(quic_write_with_transmission)) {
125 closed_streams_clean_up_alarm_ =
126 QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
127 new ClosedStreamsCleanUpDelegate(this)));
128 if (perspective() == Perspective::IS_SERVER &&
129 connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
130 config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
131 }
132 }
133
Initialize()134 void QuicSession::Initialize() {
135 connection_->set_visitor(this);
136 connection_->SetSessionNotifier(this);
137 connection_->SetDataProducer(this);
138 connection_->SetFromConfig(config_);
139
140 // On the server side, version negotiation has been done by the dispatcher,
141 // and the server session is created with the right version.
142 if (perspective() == Perspective::IS_SERVER) {
143 connection_->OnSuccessfulVersionNegotiation();
144 }
145
146 if (QuicVersionUsesCryptoFrames(transport_version())) {
147 return;
148 }
149
150 DCHECK_EQ(QuicUtils::GetCryptoStreamId(transport_version()),
151 GetMutableCryptoStream()->id());
152 }
153
~QuicSession()154 QuicSession::~QuicSession() {
155 QUIC_LOG_IF(WARNING, !zombie_streams_.empty()) << "Still have zombie streams";
156 QUIC_LOG_IF(WARNING, num_locally_closed_incoming_streams_highest_offset() >
157 stream_id_manager_.max_open_incoming_streams())
158 << "Surprisingly high number of locally closed peer initiated streams"
159 "still waiting for final byte offset: "
160 << num_locally_closed_incoming_streams_highest_offset();
161 QUIC_LOG_IF(WARNING, GetNumLocallyClosedOutgoingStreamsHighestOffset() >
162 stream_id_manager_.max_open_outgoing_streams())
163 << "Surprisingly high number of locally closed self initiated streams"
164 "still waiting for final byte offset: "
165 << GetNumLocallyClosedOutgoingStreamsHighestOffset();
166 }
167
PendingStreamOnStreamFrame(const QuicStreamFrame & frame)168 void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
169 DCHECK(VersionUsesHttp3(transport_version()));
170 QuicStreamId stream_id = frame.stream_id;
171
172 PendingStream* pending = GetOrCreatePendingStream(stream_id);
173
174 if (!pending) {
175 if (frame.fin) {
176 QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
177 OnFinalByteOffsetReceived(stream_id, final_byte_offset);
178 }
179 return;
180 }
181
182 pending->OnStreamFrame(frame);
183 if (!connection()->connected()) {
184 return;
185 }
186 if (ProcessPendingStream(pending)) {
187 // The pending stream should now be in the scope of normal streams.
188 DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
189 << "Stream " << stream_id << " not created";
190 pending_stream_map_.erase(stream_id);
191 return;
192 }
193 if (pending->sequencer()->IsClosed()) {
194 ClosePendingStream(stream_id);
195 }
196 }
197
OnStreamFrame(const QuicStreamFrame & frame)198 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
199 QuicStreamId stream_id = frame.stream_id;
200 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
201 connection()->CloseConnection(
202 QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
203 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
204 return;
205 }
206
207 if (UsesPendingStreams() &&
208 QuicUtils::GetStreamType(stream_id, perspective(),
209 IsIncomingStream(stream_id)) ==
210 READ_UNIDIRECTIONAL &&
211 stream_map_.find(stream_id) == stream_map_.end()) {
212 PendingStreamOnStreamFrame(frame);
213 return;
214 }
215
216 QuicStream* stream = GetOrCreateStream(stream_id);
217
218 if (!stream) {
219 // The stream no longer exists, but we may still be interested in the
220 // final stream byte offset sent by the peer. A frame with a FIN can give
221 // us this offset.
222 if (frame.fin) {
223 QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
224 OnFinalByteOffsetReceived(stream_id, final_byte_offset);
225 }
226 return;
227 }
228 stream->OnStreamFrame(frame);
229 }
230
OnCryptoFrame(const QuicCryptoFrame & frame)231 void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
232 GetMutableCryptoStream()->OnCryptoFrame(frame);
233 }
234
OnStopSendingFrame(const QuicStopSendingFrame & frame)235 void QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
236 // STOP_SENDING is in IETF QUIC only.
237 DCHECK(VersionHasIetfQuicFrames(transport_version()));
238 DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
239
240 QuicStreamId stream_id = frame.stream_id;
241 // If Stream ID is invalid then close the connection.
242 // TODO(ianswett): This check is redundant to checks for IsClosedStream,
243 // but removing it requires removing multiple DCHECKs.
244 // TODO(ianswett): Multiple QUIC_DVLOGs could be QUIC_PEER_BUGs.
245 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
246 QUIC_DVLOG(1) << ENDPOINT
247 << "Received STOP_SENDING with invalid stream_id: "
248 << stream_id << " Closing connection";
249 connection()->CloseConnection(
250 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
251 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
252 return;
253 }
254
255 // If stream_id is READ_UNIDIRECTIONAL, close the connection.
256 if (QuicUtils::GetStreamType(stream_id, perspective(),
257 IsIncomingStream(stream_id)) ==
258 READ_UNIDIRECTIONAL) {
259 QUIC_DVLOG(1) << ENDPOINT
260 << "Received STOP_SENDING for a read-only stream_id: "
261 << stream_id << ".";
262 connection()->CloseConnection(
263 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a read-only stream",
264 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
265 return;
266 }
267
268 if (visitor_) {
269 visitor_->OnStopSendingReceived(frame);
270 }
271
272 QuicStream* stream = GetOrCreateStream(stream_id);
273 if (!stream) {
274 // Errors are handled by GetOrCreateStream.
275 return;
276 }
277
278 if (!stream->OnStopSending(frame.application_error_code)) {
279 return;
280 }
281
282 // TODO(renjietang): Consider moving those code into the stream.
283 if (connection()->connected()) {
284 MaybeSendRstStreamFrame(
285 stream->id(),
286 static_cast<quic::QuicRstStreamErrorCode>(frame.application_error_code),
287 stream->stream_bytes_written());
288 connection_->OnStreamReset(stream->id(),
289 static_cast<quic::QuicRstStreamErrorCode>(
290 frame.application_error_code));
291 }
292 stream->set_rst_sent(true);
293 stream->CloseWriteSide();
294 }
295
OnPacketDecrypted(EncryptionLevel level)296 void QuicSession::OnPacketDecrypted(EncryptionLevel level) {
297 GetMutableCryptoStream()->OnPacketDecrypted(level);
298 }
299
OnOneRttPacketAcknowledged()300 void QuicSession::OnOneRttPacketAcknowledged() {
301 GetMutableCryptoStream()->OnOneRttPacketAcknowledged();
302 }
303
PendingStreamOnRstStream(const QuicRstStreamFrame & frame)304 void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
305 DCHECK(VersionUsesHttp3(transport_version()));
306 QuicStreamId stream_id = frame.stream_id;
307
308 PendingStream* pending = GetOrCreatePendingStream(stream_id);
309
310 if (!pending) {
311 HandleRstOnValidNonexistentStream(frame);
312 return;
313 }
314
315 pending->OnRstStreamFrame(frame);
316 // Pending stream is currently read only. We can safely close the stream.
317 DCHECK_EQ(READ_UNIDIRECTIONAL,
318 QuicUtils::GetStreamType(pending->id(), perspective(),
319 /*peer_initiated = */ true));
320 ClosePendingStream(stream_id);
321 }
322
OnRstStream(const QuicRstStreamFrame & frame)323 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
324 QuicStreamId stream_id = frame.stream_id;
325 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
326 connection()->CloseConnection(
327 QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
328 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
329 return;
330 }
331
332 if (VersionHasIetfQuicFrames(transport_version()) &&
333 QuicUtils::GetStreamType(stream_id, perspective(),
334 IsIncomingStream(stream_id)) ==
335 WRITE_UNIDIRECTIONAL) {
336 connection()->CloseConnection(
337 QUIC_INVALID_STREAM_ID, "Received RESET_STREAM for a write-only stream",
338 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
339 return;
340 }
341
342 if (visitor_) {
343 visitor_->OnRstStreamReceived(frame);
344 }
345
346 if (UsesPendingStreams() &&
347 QuicUtils::GetStreamType(stream_id, perspective(),
348 IsIncomingStream(stream_id)) ==
349 READ_UNIDIRECTIONAL &&
350 stream_map_.find(stream_id) == stream_map_.end()) {
351 PendingStreamOnRstStream(frame);
352 return;
353 }
354
355 QuicStream* stream = GetOrCreateStream(stream_id);
356
357 if (!stream) {
358 HandleRstOnValidNonexistentStream(frame);
359 return; // Errors are handled by GetOrCreateStream.
360 }
361 stream->OnStreamReset(frame);
362 }
363
OnGoAway(const QuicGoAwayFrame &)364 void QuicSession::OnGoAway(const QuicGoAwayFrame& /*frame*/) {
365 goaway_received_ = true;
366 }
367
OnMessageReceived(quiche::QuicheStringPiece message)368 void QuicSession::OnMessageReceived(quiche::QuicheStringPiece message) {
369 QUIC_DVLOG(1) << ENDPOINT << "Received message, length: " << message.length()
370 << ", " << message;
371 }
372
OnHandshakeDoneReceived()373 void QuicSession::OnHandshakeDoneReceived() {
374 QUIC_DVLOG(1) << ENDPOINT << "OnHandshakeDoneReceived";
375 GetMutableCryptoStream()->OnHandshakeDoneReceived();
376 }
377
378 // static
RecordConnectionCloseAtServer(QuicErrorCode error,ConnectionCloseSource source)379 void QuicSession::RecordConnectionCloseAtServer(QuicErrorCode error,
380 ConnectionCloseSource source) {
381 if (error != QUIC_NO_ERROR) {
382 if (source == ConnectionCloseSource::FROM_SELF) {
383 QUIC_SERVER_HISTOGRAM_ENUM(
384 "quic_server_connection_close_errors", error, QUIC_LAST_ERROR,
385 "QuicErrorCode for server-closed connections.");
386 } else {
387 QUIC_SERVER_HISTOGRAM_ENUM(
388 "quic_client_connection_close_errors", error, QUIC_LAST_ERROR,
389 "QuicErrorCode for client-closed connections.");
390 }
391 }
392 }
393
OnConnectionClosed(const QuicConnectionCloseFrame & frame,ConnectionCloseSource source)394 void QuicSession::OnConnectionClosed(const QuicConnectionCloseFrame& frame,
395 ConnectionCloseSource source) {
396 DCHECK(!connection_->connected());
397 if (perspective() == Perspective::IS_SERVER) {
398 RecordConnectionCloseAtServer(frame.quic_error_code, source);
399 }
400
401 if (on_closed_frame_.extracted_error_code == QUIC_NO_ERROR) {
402 // Save all of the connection close information
403 on_closed_frame_ = frame;
404 }
405
406 // Copy all non static streams in a new map for the ease of deleting.
407 QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
408 for (const auto& it : stream_map_) {
409 if (!it.second->is_static()) {
410 non_static_streams[it.first] = it.second.get();
411 }
412 }
413 for (const auto& it : non_static_streams) {
414 QuicStreamId id = it.first;
415 it.second->OnConnectionClosed(frame.quic_error_code, source);
416 if (stream_map_.find(id) != stream_map_.end()) {
417 QUIC_BUG << ENDPOINT << "Stream " << id
418 << " failed to close under OnConnectionClosed";
419 CloseStream(id);
420 }
421 }
422
423 // Cleanup zombie stream map on connection close.
424 while (!zombie_streams_.empty()) {
425 ZombieStreamMap::iterator it = zombie_streams_.begin();
426 closed_streams_.push_back(std::move(it->second));
427 zombie_streams_.erase(it);
428 }
429
430 closed_streams_clean_up_alarm_->Cancel();
431
432 if (visitor_) {
433 visitor_->OnConnectionClosed(connection_->connection_id(),
434 frame.extracted_error_code,
435 frame.error_details, source);
436 }
437 }
438
OnWriteBlocked()439 void QuicSession::OnWriteBlocked() {
440 if (!connection_->connected()) {
441 return;
442 }
443 if (visitor_) {
444 visitor_->OnWriteBlocked(connection_);
445 }
446 }
447
OnSuccessfulVersionNegotiation(const ParsedQuicVersion &)448 void QuicSession::OnSuccessfulVersionNegotiation(
449 const ParsedQuicVersion& /*version*/) {}
450
OnPacketReceived(const QuicSocketAddress &,const QuicSocketAddress & peer_address,bool is_connectivity_probe)451 void QuicSession::OnPacketReceived(const QuicSocketAddress& /*self_address*/,
452 const QuicSocketAddress& peer_address,
453 bool is_connectivity_probe) {
454 if (is_connectivity_probe && perspective() == Perspective::IS_SERVER) {
455 // Server only sends back a connectivity probe after received a
456 // connectivity probe from a new peer address.
457 connection_->SendConnectivityProbingResponsePacket(peer_address);
458 }
459 }
460
OnPathDegrading()461 void QuicSession::OnPathDegrading() {}
462
AllowSelfAddressChange() const463 bool QuicSession::AllowSelfAddressChange() const {
464 return false;
465 }
466
OnForwardProgressConfirmed()467 void QuicSession::OnForwardProgressConfirmed() {}
468
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)469 void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
470 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
471 // assume that it still exists.
472 QuicStreamId stream_id = frame.stream_id;
473 if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
474 // This is a window update that applies to the connection, rather than an
475 // individual stream.
476 QUIC_DVLOG(1) << ENDPOINT
477 << "Received connection level flow control window "
478 "update with max data: "
479 << frame.max_data;
480 flow_controller_.UpdateSendWindowOffset(frame.max_data);
481 return;
482 }
483
484 if (VersionHasIetfQuicFrames(transport_version()) &&
485 QuicUtils::GetStreamType(stream_id, perspective(),
486 IsIncomingStream(stream_id)) ==
487 READ_UNIDIRECTIONAL) {
488 connection()->CloseConnection(
489 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
490 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.",
491 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
492 return;
493 }
494
495 QuicStream* stream = GetOrCreateStream(stream_id);
496 if (stream != nullptr) {
497 stream->OnWindowUpdateFrame(frame);
498 }
499 }
500
OnBlockedFrame(const QuicBlockedFrame & frame)501 void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
502 // TODO(rjshade): Compare our flow control receive windows for specified
503 // streams: if we have a large window then maybe something
504 // had gone wrong with the flow control accounting.
505 QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
506 << frame.stream_id;
507 }
508
CheckStreamNotBusyLooping(QuicStream * stream,uint64_t previous_bytes_written,bool previous_fin_sent)509 bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
510 uint64_t previous_bytes_written,
511 bool previous_fin_sent) {
512 if ( // Stream should not be closed.
513 !stream->write_side_closed() &&
514 // Not connection flow control blocked.
515 !flow_controller_.IsBlocked() &&
516 // Detect lack of forward progress.
517 previous_bytes_written == stream->stream_bytes_written() &&
518 previous_fin_sent == stream->fin_sent()) {
519 stream->set_busy_counter(stream->busy_counter() + 1);
520 QUIC_DVLOG(1) << ENDPOINT << "Suspected busy loop on stream id "
521 << stream->id() << " stream_bytes_written "
522 << stream->stream_bytes_written() << " fin "
523 << stream->fin_sent() << " count " << stream->busy_counter();
524 // Wait a few iterations before firing, the exact count is
525 // arbitrary, more than a few to cover a few test-only false
526 // positives.
527 if (stream->busy_counter() > 20) {
528 QUIC_LOG(ERROR) << ENDPOINT << "Detected busy loop on stream id "
529 << stream->id() << " stream_bytes_written "
530 << stream->stream_bytes_written() << " fin "
531 << stream->fin_sent();
532 return false;
533 }
534 } else {
535 stream->set_busy_counter(0);
536 }
537 return true;
538 }
539
CheckStreamWriteBlocked(QuicStream * stream) const540 bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
541 if (!stream->write_side_closed() && stream->HasBufferedData() &&
542 !stream->flow_controller()->IsBlocked() &&
543 !write_blocked_streams_.IsStreamBlocked(stream->id())) {
544 QUIC_DLOG(ERROR) << ENDPOINT << "stream " << stream->id()
545 << " has buffered " << stream->BufferedDataBytes()
546 << " bytes, and is not flow control blocked, "
547 "but it is not in the write block list.";
548 return false;
549 }
550 return true;
551 }
552
OnCanWrite()553 void QuicSession::OnCanWrite() {
554 if (!RetransmitLostData()) {
555 // Cannot finish retransmitting lost data, connection is write blocked.
556 QUIC_DVLOG(1) << ENDPOINT
557 << "Cannot finish retransmitting lost data, connection is "
558 "write blocked.";
559 return;
560 }
561 if (!write_with_transmission_) {
562 SetTransmissionType(NOT_RETRANSMISSION);
563 }
564 // We limit the number of writes to the number of pending streams. If more
565 // streams become pending, WillingAndAbleToWrite will be true, which will
566 // cause the connection to request resumption before yielding to other
567 // connections.
568 // If we are connection level flow control blocked, then only allow the
569 // crypto and headers streams to try writing as all other streams will be
570 // blocked.
571 size_t num_writes = flow_controller_.IsBlocked()
572 ? write_blocked_streams_.NumBlockedSpecialStreams()
573 : write_blocked_streams_.NumBlockedStreams();
574 if (num_writes == 0 && !control_frame_manager_.WillingToWrite() &&
575 datagram_queue_.empty() &&
576 (!QuicVersionUsesCryptoFrames(transport_version()) ||
577 !GetCryptoStream()->HasBufferedCryptoFrames())) {
578 return;
579 }
580
581 QuicConnection::ScopedPacketFlusher flusher(connection_);
582 if (QuicVersionUsesCryptoFrames(transport_version())) {
583 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
584 if (crypto_stream->HasBufferedCryptoFrames()) {
585 crypto_stream->WriteBufferedCryptoFrames();
586 }
587 if (crypto_stream->HasBufferedCryptoFrames()) {
588 // Cannot finish writing buffered crypto frames, connection is write
589 // blocked.
590 return;
591 }
592 }
593 if (control_frame_manager_.WillingToWrite()) {
594 control_frame_manager_.OnCanWrite();
595 }
596 // TODO(b/147146815): this makes all datagrams go before stream data. We
597 // should have a better priority scheme for this.
598 if (!datagram_queue_.empty()) {
599 size_t written = datagram_queue_.SendDatagrams();
600 QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams";
601 if (!datagram_queue_.empty()) {
602 return;
603 }
604 }
605 std::vector<QuicStreamId> last_writing_stream_ids;
606 for (size_t i = 0; i < num_writes; ++i) {
607 if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() ||
608 write_blocked_streams_.HasWriteBlockedDataStreams())) {
609 // Writing one stream removed another!? Something's broken.
610 QUIC_BUG << "WriteBlockedStream is missing, num_writes: " << num_writes
611 << ", finished_writes: " << i
612 << ", connected: " << connection_->connected()
613 << ", connection level flow control blocked: "
614 << flow_controller_.IsBlocked() << ", scheduler type: "
615 << spdy::WriteSchedulerTypeToString(
616 write_blocked_streams_.scheduler_type());
617 for (QuicStreamId id : last_writing_stream_ids) {
618 QUIC_LOG(WARNING) << "last_writing_stream_id: " << id;
619 }
620 connection_->CloseConnection(QUIC_INTERNAL_ERROR,
621 "WriteBlockedStream is missing",
622 ConnectionCloseBehavior::SILENT_CLOSE);
623 return;
624 }
625 if (!connection_->CanWriteStreamData()) {
626 return;
627 }
628 currently_writing_stream_id_ = write_blocked_streams_.PopFront();
629 last_writing_stream_ids.push_back(currently_writing_stream_id_);
630 QUIC_DVLOG(1) << ENDPOINT << "Removing stream "
631 << currently_writing_stream_id_ << " from write-blocked list";
632 QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
633 if (stream != nullptr && !stream->flow_controller()->IsBlocked()) {
634 // If the stream can't write all bytes it'll re-add itself to the blocked
635 // list.
636 uint64_t previous_bytes_written = stream->stream_bytes_written();
637 bool previous_fin_sent = stream->fin_sent();
638 QUIC_DVLOG(1) << ENDPOINT << "stream " << stream->id()
639 << " bytes_written " << previous_bytes_written << " fin "
640 << previous_fin_sent;
641 stream->OnCanWrite();
642 DCHECK(CheckStreamWriteBlocked(stream));
643 DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
644 previous_fin_sent));
645 }
646 currently_writing_stream_id_ = 0;
647 }
648 }
649
SendProbingData()650 bool QuicSession::SendProbingData() {
651 if (connection()->sent_packet_manager().MaybeRetransmitOldestPacket(
652 PROBING_RETRANSMISSION)) {
653 return true;
654 }
655 return false;
656 }
657
WillingAndAbleToWrite() const658 bool QuicSession::WillingAndAbleToWrite() const {
659 // Schedule a write when:
660 // 1) control frame manager has pending or new control frames, or
661 // 2) any stream has pending retransmissions, or
662 // 3) If the crypto or headers streams are blocked, or
663 // 4) connection is not flow control blocked and there are write blocked
664 // streams.
665 if (QuicVersionUsesCryptoFrames(transport_version()) &&
666 HasPendingHandshake()) {
667 return true;
668 }
669 return control_frame_manager_.WillingToWrite() ||
670 !streams_with_pending_retransmission_.empty() ||
671 write_blocked_streams_.HasWriteBlockedSpecialStream() ||
672 (!flow_controller_.IsBlocked() &&
673 write_blocked_streams_.HasWriteBlockedDataStreams());
674 }
675
HasPendingHandshake() const676 bool QuicSession::HasPendingHandshake() const {
677 if (QuicVersionUsesCryptoFrames(transport_version())) {
678 return GetCryptoStream()->HasPendingCryptoRetransmission() ||
679 GetCryptoStream()->HasBufferedCryptoFrames();
680 }
681 return QuicContainsKey(streams_with_pending_retransmission_,
682 QuicUtils::GetCryptoStreamId(transport_version())) ||
683 write_blocked_streams_.IsStreamBlocked(
684 QuicUtils::GetCryptoStreamId(transport_version()));
685 }
686
GetNumOpenDynamicStreams() const687 uint64_t QuicSession::GetNumOpenDynamicStreams() const {
688 return stream_map_.size() - draining_streams_.size() +
689 locally_closed_streams_highest_offset_.size() -
690 num_incoming_static_streams_ - num_outgoing_static_streams_;
691 }
692
ProcessUdpPacket(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,const QuicReceivedPacket & packet)693 void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
694 const QuicSocketAddress& peer_address,
695 const QuicReceivedPacket& packet) {
696 connection_->ProcessUdpPacket(self_address, peer_address, packet);
697 }
698
WritevData(QuicStreamId id,size_t write_length,QuicStreamOffset offset,StreamSendingState state,TransmissionType type,quiche::QuicheOptional<EncryptionLevel> level)699 QuicConsumedData QuicSession::WritevData(
700 QuicStreamId id,
701 size_t write_length,
702 QuicStreamOffset offset,
703 StreamSendingState state,
704 TransmissionType type,
705 quiche::QuicheOptional<EncryptionLevel> level) {
706 DCHECK(connection_->connected())
707 << ENDPOINT << "Try to write stream data when connection is closed.";
708 if (!IsEncryptionEstablished() &&
709 !QuicUtils::IsCryptoStreamId(transport_version(), id)) {
710 // Do not let streams write without encryption. The calling stream will end
711 // up write blocked until OnCanWrite is next called.
712 QUIC_BUG << ENDPOINT << "Try to send data of stream " << id
713 << " before encryption is established.";
714 return QuicConsumedData(0, false);
715 }
716
717 if (write_with_transmission_) {
718 SetTransmissionType(type);
719 CountTransmissionTypeFlag(type);
720 }
721 const auto current_level = connection()->encryption_level();
722 if (level.has_value()) {
723 connection()->SetDefaultEncryptionLevel(level.value());
724 }
725
726 QuicConsumedData data =
727 connection_->SendStreamData(id, write_length, offset, state);
728 if (type == NOT_RETRANSMISSION) {
729 // This is new stream data.
730 write_blocked_streams_.UpdateBytesForStream(id, data.bytes_consumed);
731 }
732
733 // Restore the encryption level.
734 if (level.has_value()) {
735 connection()->SetDefaultEncryptionLevel(current_level);
736 }
737
738 return data;
739 }
740
SendCryptoData(EncryptionLevel level,size_t write_length,QuicStreamOffset offset,TransmissionType type)741 size_t QuicSession::SendCryptoData(EncryptionLevel level,
742 size_t write_length,
743 QuicStreamOffset offset,
744 TransmissionType type) {
745 DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
746 if (write_with_transmission_) {
747 SetTransmissionType(type);
748 CountTransmissionTypeFlag(type);
749 }
750 const auto current_level = connection()->encryption_level();
751 connection_->SetDefaultEncryptionLevel(level);
752 const auto bytes_consumed =
753 connection_->SendCryptoData(level, write_length, offset);
754 // Restores encryption level.
755 connection_->SetDefaultEncryptionLevel(current_level);
756 return bytes_consumed;
757 }
758
WriteControlFrame(const QuicFrame & frame,TransmissionType type)759 bool QuicSession::WriteControlFrame(const QuicFrame& frame,
760 TransmissionType type) {
761 if (write_with_transmission_) {
762 SetTransmissionType(type);
763 CountTransmissionTypeFlag(type);
764 }
765 return connection_->SendControlFrame(frame);
766 }
767
SendRstStream(QuicStreamId id,QuicRstStreamErrorCode error,QuicStreamOffset bytes_written)768 void QuicSession::SendRstStream(QuicStreamId id,
769 QuicRstStreamErrorCode error,
770 QuicStreamOffset bytes_written) {
771 if (connection()->connected()) {
772 QuicConnection::ScopedPacketFlusher flusher(connection());
773 MaybeSendRstStreamFrame(id, error, bytes_written);
774 MaybeSendStopSendingFrame(id, error);
775
776 connection_->OnStreamReset(id, error);
777 }
778
779 if (error != QUIC_STREAM_NO_ERROR && QuicContainsKey(zombie_streams_, id)) {
780 OnStreamDoneWaitingForAcks(id);
781 return;
782 }
783 CloseStreamInner(id, true);
784 }
785
MaybeSendRstStreamFrame(QuicStreamId id,QuicRstStreamErrorCode error,QuicStreamOffset bytes_written)786 void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
787 QuicRstStreamErrorCode error,
788 QuicStreamOffset bytes_written) {
789 DCHECK(connection()->connected());
790 if (!VersionHasIetfQuicFrames(transport_version()) ||
791 QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id)) !=
792 READ_UNIDIRECTIONAL) {
793 control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
794 }
795 }
796
MaybeSendStopSendingFrame(QuicStreamId id,QuicRstStreamErrorCode error)797 void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id,
798 QuicRstStreamErrorCode error) {
799 DCHECK(connection()->connected());
800 if (VersionHasIetfQuicFrames(transport_version()) &&
801 QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id)) !=
802 WRITE_UNIDIRECTIONAL) {
803 control_frame_manager_.WriteOrBufferStopSending(error, id);
804 }
805 }
806
SendGoAway(QuicErrorCode error_code,const std::string & reason)807 void QuicSession::SendGoAway(QuicErrorCode error_code,
808 const std::string& reason) {
809 // GOAWAY frame is not supported in v99.
810 DCHECK(!VersionHasIetfQuicFrames(transport_version()));
811 if (goaway_sent_) {
812 return;
813 }
814 goaway_sent_ = true;
815 control_frame_manager_.WriteOrBufferGoAway(
816 error_code, stream_id_manager_.largest_peer_created_stream_id(), reason);
817 }
818
SendBlocked(QuicStreamId id)819 void QuicSession::SendBlocked(QuicStreamId id) {
820 control_frame_manager_.WriteOrBufferBlocked(id);
821 }
822
SendWindowUpdate(QuicStreamId id,QuicStreamOffset byte_offset)823 void QuicSession::SendWindowUpdate(QuicStreamId id,
824 QuicStreamOffset byte_offset) {
825 control_frame_manager_.WriteOrBufferWindowUpdate(id, byte_offset);
826 }
827
OnStreamError(QuicErrorCode error_code,std::string error_details)828 void QuicSession::OnStreamError(QuicErrorCode error_code,
829 std::string error_details) {
830 connection_->CloseConnection(
831 error_code, error_details,
832 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
833 }
834
SendMaxStreams(QuicStreamCount stream_count,bool unidirectional)835 void QuicSession::SendMaxStreams(QuicStreamCount stream_count,
836 bool unidirectional) {
837 if (!is_configured_) {
838 QUIC_BUG << "Try to send max streams before config negotiated.";
839 return;
840 }
841 control_frame_manager_.WriteOrBufferMaxStreams(stream_count, unidirectional);
842 }
843
CloseStream(QuicStreamId stream_id)844 void QuicSession::CloseStream(QuicStreamId stream_id) {
845 CloseStreamInner(stream_id, false);
846 }
847
InsertLocallyClosedStreamsHighestOffset(const QuicStreamId id,QuicStreamOffset offset)848 void QuicSession::InsertLocallyClosedStreamsHighestOffset(
849 const QuicStreamId id,
850 QuicStreamOffset offset) {
851 locally_closed_streams_highest_offset_[id] = offset;
852 if (IsIncomingStream(id)) {
853 ++num_locally_closed_incoming_streams_highest_offset_;
854 }
855 }
856
CloseStreamInner(QuicStreamId stream_id,bool rst_sent)857 void QuicSession::CloseStreamInner(QuicStreamId stream_id, bool rst_sent) {
858 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
859
860 StreamMap::iterator it = stream_map_.find(stream_id);
861 if (it == stream_map_.end()) {
862 // When CloseStreamInner has been called recursively (via
863 // QuicStream::OnClose), the stream will already have been deleted
864 // from stream_map_, so return immediately.
865 QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
866 return;
867 }
868 QuicStream* stream = it->second.get();
869 if (stream->is_static()) {
870 QUIC_DVLOG(1) << ENDPOINT
871 << "Try to close a static stream, id: " << stream_id
872 << " Closing connection";
873 connection()->CloseConnection(
874 QUIC_INVALID_STREAM_ID, "Try to close a static stream",
875 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
876 return;
877 }
878 StreamType type = stream->type();
879
880 // Tell the stream that a RST has been sent.
881 if (rst_sent) {
882 stream->set_rst_sent(true);
883 }
884
885 if (stream->IsWaitingForAcks()) {
886 zombie_streams_[stream->id()] = std::move(it->second);
887 } else {
888 // Clean up the stream since it is no longer waiting for acks.
889 streams_waiting_for_acks_.erase(stream->id());
890 closed_streams_.push_back(std::move(it->second));
891 // Do not retransmit data of a closed stream.
892 streams_with_pending_retransmission_.erase(stream_id);
893 if (!closed_streams_clean_up_alarm_->IsSet()) {
894 closed_streams_clean_up_alarm_->Set(
895 connection_->clock()->ApproximateNow());
896 }
897 }
898
899 // If we haven't received a FIN or RST for this stream, we need to keep track
900 // of the how many bytes the stream's flow controller believes it has
901 // received, for accurate connection level flow control accounting.
902 const bool had_fin_or_rst = stream->HasReceivedFinalOffset();
903 if (!had_fin_or_rst) {
904 InsertLocallyClosedStreamsHighestOffset(
905 stream_id, stream->flow_controller()->highest_received_byte_offset());
906 }
907 stream_map_.erase(it);
908 if (IsIncomingStream(stream_id)) {
909 --num_dynamic_incoming_streams_;
910 }
911
912 const bool stream_was_draining =
913 draining_streams_.find(stream_id) != draining_streams_.end();
914 if (stream_was_draining) {
915 if (IsIncomingStream(stream_id)) {
916 --num_draining_incoming_streams_;
917 }
918 draining_streams_.erase(stream_id);
919 } else if (VersionHasIetfQuicFrames(transport_version())) {
920 // Stream was not draining, but we did have a fin or rst, so we can now
921 // free the stream ID if version 99.
922 if (had_fin_or_rst && connection_->connected()) {
923 // Do not bother informing stream ID manager if connection is closed.
924 v99_streamid_manager_.OnStreamClosed(stream_id);
925 }
926 }
927
928 stream->OnClose();
929
930 if (!stream_was_draining && !IsIncomingStream(stream_id) && had_fin_or_rst &&
931 !VersionHasIetfQuicFrames(transport_version())) {
932 // Streams that first became draining already called OnCanCreate...
933 // This covers the case where the stream went directly to being closed.
934 OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
935 }
936 }
937
ClosePendingStream(QuicStreamId stream_id)938 void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
939 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
940
941 pending_stream_map_.erase(stream_id);
942 if (VersionHasIetfQuicFrames(transport_version()) &&
943 connection_->connected()) {
944 v99_streamid_manager_.OnStreamClosed(stream_id);
945 }
946 }
947
OnFinalByteOffsetReceived(QuicStreamId stream_id,QuicStreamOffset final_byte_offset)948 void QuicSession::OnFinalByteOffsetReceived(
949 QuicStreamId stream_id,
950 QuicStreamOffset final_byte_offset) {
951 auto it = locally_closed_streams_highest_offset_.find(stream_id);
952 if (it == locally_closed_streams_highest_offset_.end()) {
953 return;
954 }
955
956 QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
957 << final_byte_offset << " for stream " << stream_id;
958 QuicByteCount offset_diff = final_byte_offset - it->second;
959 if (flow_controller_.UpdateHighestReceivedOffset(
960 flow_controller_.highest_received_byte_offset() + offset_diff)) {
961 // If the final offset violates flow control, close the connection now.
962 if (flow_controller_.FlowControlViolation()) {
963 connection_->CloseConnection(
964 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
965 "Connection level flow control violation",
966 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
967 return;
968 }
969 }
970
971 flow_controller_.AddBytesConsumed(offset_diff);
972 locally_closed_streams_highest_offset_.erase(it);
973 if (IsIncomingStream(stream_id)) {
974 --num_locally_closed_incoming_streams_highest_offset_;
975 if (VersionHasIetfQuicFrames(transport_version())) {
976 v99_streamid_manager_.OnStreamClosed(stream_id);
977 }
978 } else if (!VersionHasIetfQuicFrames(transport_version())) {
979 OnCanCreateNewOutgoingStream(false);
980 }
981 }
982
IsEncryptionEstablished() const983 bool QuicSession::IsEncryptionEstablished() const {
984 if (GetCryptoStream() == nullptr) {
985 return false;
986 }
987 return GetCryptoStream()->encryption_established();
988 }
989
OneRttKeysAvailable() const990 bool QuicSession::OneRttKeysAvailable() const {
991 if (GetCryptoStream() == nullptr) {
992 return false;
993 }
994 return GetCryptoStream()->one_rtt_keys_available();
995 }
996
OnConfigNegotiated()997 void QuicSession::OnConfigNegotiated() {
998 QUIC_DVLOG(1) << ENDPOINT << "OnConfigNegotiated";
999 connection_->SetFromConfig(config_);
1000
1001 if (VersionHasIetfQuicFrames(transport_version())) {
1002 uint32_t max_streams = 0;
1003 if (config_.HasReceivedMaxBidirectionalStreams()) {
1004 max_streams = config_.ReceivedMaxBidirectionalStreams();
1005 }
1006 QUIC_DVLOG(1) << ENDPOINT
1007 << "Setting Bidirectional outgoing_max_streams_ to "
1008 << max_streams;
1009 if (v99_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
1010 max_streams)) {
1011 OnCanCreateNewOutgoingStream(/*unidirectional = */ false);
1012 }
1013
1014 max_streams = 0;
1015 if (config_.HasReceivedMaxUnidirectionalStreams()) {
1016 max_streams = config_.ReceivedMaxUnidirectionalStreams();
1017 }
1018 if (max_streams < num_expected_unidirectional_static_streams_) {
1019 // TODO(ianswett): Change this to an application error for HTTP/3.
1020 QUIC_DLOG(ERROR) << "Received unidirectional stream limit of "
1021 << max_streams << " < "
1022 << num_expected_unidirectional_static_streams_;
1023 connection_->CloseConnection(
1024 QUIC_MAX_STREAMS_ERROR, "New unidirectional stream limit is too low.",
1025 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1026 }
1027 QUIC_DVLOG(1) << ENDPOINT
1028 << "Setting Unidirectional outgoing_max_streams_ to "
1029 << max_streams;
1030 if (v99_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
1031 max_streams)) {
1032 OnCanCreateNewOutgoingStream(/*unidirectional = */ true);
1033 }
1034 } else {
1035 uint32_t max_streams = 0;
1036 if (config_.HasReceivedMaxBidirectionalStreams()) {
1037 max_streams = config_.ReceivedMaxBidirectionalStreams();
1038 }
1039 QUIC_DVLOG(1) << ENDPOINT << "Setting max_open_outgoing_streams_ to "
1040 << max_streams;
1041 stream_id_manager_.set_max_open_outgoing_streams(max_streams);
1042 }
1043
1044 if (perspective() == Perspective::IS_SERVER) {
1045 if (config_.HasReceivedConnectionOptions()) {
1046 // The following variations change the initial receive flow control
1047 // window sizes.
1048 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
1049 AdjustInitialFlowControlWindows(64 * 1024);
1050 }
1051 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
1052 AdjustInitialFlowControlWindows(128 * 1024);
1053 }
1054 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
1055 AdjustInitialFlowControlWindows(256 * 1024);
1056 }
1057 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
1058 AdjustInitialFlowControlWindows(512 * 1024);
1059 }
1060 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
1061 AdjustInitialFlowControlWindows(1024 * 1024);
1062 }
1063 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kH2PR) &&
1064 !VersionHasIetfQuicFrames(transport_version())) {
1065 // Enable HTTP2 (tree-style) priority write scheduler.
1066 use_http2_priority_write_scheduler_ =
1067 write_blocked_streams_.SwitchWriteScheduler(
1068 spdy::WriteSchedulerType::HTTP2, transport_version());
1069 } else if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kFIFO)) {
1070 // Enable FIFO write scheduler.
1071 write_blocked_streams_.SwitchWriteScheduler(
1072 spdy::WriteSchedulerType::FIFO, transport_version());
1073 } else if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kLIFO)) {
1074 // Enable LIFO write scheduler.
1075 write_blocked_streams_.SwitchWriteScheduler(
1076 spdy::WriteSchedulerType::LIFO, transport_version());
1077 } else if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kRRWS) &&
1078 write_blocked_streams_.scheduler_type() ==
1079 spdy::WriteSchedulerType::SPDY) {
1080 enable_round_robin_scheduling_ = true;
1081 }
1082 }
1083
1084 config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
1085 }
1086
1087 if (VersionHasIetfQuicFrames(transport_version())) {
1088 v99_streamid_manager_.SetMaxOpenIncomingBidirectionalStreams(
1089 config_.GetMaxBidirectionalStreamsToSend());
1090 v99_streamid_manager_.SetMaxOpenIncomingUnidirectionalStreams(
1091 config_.GetMaxUnidirectionalStreamsToSend());
1092 } else {
1093 // A small number of additional incoming streams beyond the limit should be
1094 // allowed. This helps avoid early connection termination when FIN/RSTs for
1095 // old streams are lost or arrive out of order.
1096 // Use a minimum number of additional streams, or a percentage increase,
1097 // whichever is larger.
1098 uint32_t max_incoming_streams_to_send =
1099 config_.GetMaxBidirectionalStreamsToSend();
1100 uint32_t max_incoming_streams =
1101 std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
1102 static_cast<uint32_t>(max_incoming_streams_to_send *
1103 kMaxStreamsMultiplier));
1104 stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
1105 }
1106
1107 if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
1108 // When using IETF-style TLS transport parameters, inform existing streams
1109 // of new flow-control limits.
1110 if (config_.HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
1111 OnNewStreamOutgoingBidirectionalFlowControlWindow(
1112 config_.ReceivedInitialMaxStreamDataBytesOutgoingBidirectional());
1113 }
1114 if (config_.HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
1115 OnNewStreamIncomingBidirectionalFlowControlWindow(
1116 config_.ReceivedInitialMaxStreamDataBytesIncomingBidirectional());
1117 }
1118 if (config_.HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
1119 OnNewStreamUnidirectionalFlowControlWindow(
1120 config_.ReceivedInitialMaxStreamDataBytesUnidirectional());
1121 }
1122 } else { // The version uses Google QUIC Crypto.
1123 if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
1124 // Streams which were created before the SHLO was received (0-RTT
1125 // requests) are now informed of the peer's initial flow control window.
1126 OnNewStreamFlowControlWindow(
1127 config_.ReceivedInitialStreamFlowControlWindowBytes());
1128 }
1129 }
1130
1131 if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
1132 OnNewSessionFlowControlWindow(
1133 config_.ReceivedInitialSessionFlowControlWindowBytes());
1134 }
1135 is_configured_ = true;
1136 connection()->OnConfigNegotiated();
1137
1138 // Ask flow controllers to try again since the config could have unblocked us.
1139 if (connection_->version().AllowsLowFlowControlLimits()) {
1140 OnCanWrite();
1141 }
1142 }
1143
AdjustInitialFlowControlWindows(size_t stream_window)1144 void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
1145 const float session_window_multiplier =
1146 config_.GetInitialStreamFlowControlWindowToSend()
1147 ? static_cast<float>(
1148 config_.GetInitialSessionFlowControlWindowToSend()) /
1149 config_.GetInitialStreamFlowControlWindowToSend()
1150 : 1.5;
1151
1152 QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
1153 config_.SetInitialStreamFlowControlWindowToSend(stream_window);
1154
1155 size_t session_window = session_window_multiplier * stream_window;
1156 QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
1157 << session_window;
1158 config_.SetInitialSessionFlowControlWindowToSend(session_window);
1159 flow_controller_.UpdateReceiveWindowSize(session_window);
1160 // Inform all existing streams about the new window.
1161 for (auto const& kv : stream_map_) {
1162 kv.second->flow_controller()->UpdateReceiveWindowSize(stream_window);
1163 }
1164 if (!QuicVersionUsesCryptoFrames(transport_version())) {
1165 GetMutableCryptoStream()->flow_controller()->UpdateReceiveWindowSize(
1166 stream_window);
1167 }
1168 }
1169
HandleFrameOnNonexistentOutgoingStream(QuicStreamId stream_id)1170 void QuicSession::HandleFrameOnNonexistentOutgoingStream(
1171 QuicStreamId stream_id) {
1172 DCHECK(!IsClosedStream(stream_id));
1173 // Received a frame for a locally-created stream that is not currently
1174 // active. This is an error.
1175 if (VersionHasIetfQuicFrames(transport_version())) {
1176 connection()->CloseConnection(
1177 QUIC_HTTP_STREAM_WRONG_DIRECTION, "Data for nonexistent stream",
1178 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1179 return;
1180 }
1181 connection()->CloseConnection(
1182 QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
1183 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1184 }
1185
HandleRstOnValidNonexistentStream(const QuicRstStreamFrame & frame)1186 void QuicSession::HandleRstOnValidNonexistentStream(
1187 const QuicRstStreamFrame& frame) {
1188 // If the stream is neither originally in active streams nor created in
1189 // GetOrCreateStream(), it could be a closed stream in which case its
1190 // final received byte offset need to be updated.
1191 if (IsClosedStream(frame.stream_id)) {
1192 // The RST frame contains the final byte offset for the stream: we can now
1193 // update the connection level flow controller if needed.
1194 OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
1195 }
1196 }
1197
OnNewStreamFlowControlWindow(QuicStreamOffset new_window)1198 void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
1199 QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamFlowControlWindow " << new_window;
1200 if (new_window < kMinimumFlowControlSendWindow &&
1201 !connection_->version().AllowsLowFlowControlLimits()) {
1202 QUIC_LOG_FIRST_N(ERROR, 1)
1203 << "Peer sent us an invalid stream flow control send window: "
1204 << new_window << ", below minimum: " << kMinimumFlowControlSendWindow;
1205 connection_->CloseConnection(
1206 QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
1207 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1208 return;
1209 }
1210
1211 // Inform all existing streams about the new window.
1212 for (auto const& kv : stream_map_) {
1213 QUIC_DVLOG(1) << ENDPOINT << "Informing stream " << kv.first
1214 << " of new stream flow control window " << new_window;
1215 kv.second->UpdateSendWindowOffset(new_window);
1216 }
1217 if (!QuicVersionUsesCryptoFrames(transport_version())) {
1218 QUIC_DVLOG(1)
1219 << ENDPOINT
1220 << "Informing crypto stream of new stream flow control window "
1221 << new_window;
1222 GetMutableCryptoStream()->UpdateSendWindowOffset(new_window);
1223 }
1224 }
1225
OnNewStreamUnidirectionalFlowControlWindow(QuicStreamOffset new_window)1226 void QuicSession::OnNewStreamUnidirectionalFlowControlWindow(
1227 QuicStreamOffset new_window) {
1228 QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamUnidirectionalFlowControlWindow "
1229 << new_window;
1230 // Inform all existing outgoing unidirectional streams about the new window.
1231 for (auto const& kv : stream_map_) {
1232 const QuicStreamId id = kv.first;
1233 if (QuicUtils::IsBidirectionalStreamId(id)) {
1234 continue;
1235 }
1236 if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1237 perspective())) {
1238 continue;
1239 }
1240 QUIC_DVLOG(1) << ENDPOINT << "Informing unidirectional stream " << id
1241 << " of new stream flow control window " << new_window;
1242 kv.second->UpdateSendWindowOffset(new_window);
1243 }
1244 }
1245
OnNewStreamOutgoingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1246 void QuicSession::OnNewStreamOutgoingBidirectionalFlowControlWindow(
1247 QuicStreamOffset new_window) {
1248 QUIC_DVLOG(1) << ENDPOINT
1249 << "OnNewStreamOutgoingBidirectionalFlowControlWindow "
1250 << new_window;
1251 // Inform all existing outgoing bidirectional streams about the new window.
1252 for (auto const& kv : stream_map_) {
1253 const QuicStreamId id = kv.first;
1254 if (!QuicUtils::IsBidirectionalStreamId(id)) {
1255 continue;
1256 }
1257 if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1258 perspective())) {
1259 continue;
1260 }
1261 QUIC_DVLOG(1) << ENDPOINT << "Informing outgoing bidirectional stream "
1262 << id << " of new stream flow control window " << new_window;
1263 kv.second->UpdateSendWindowOffset(new_window);
1264 }
1265 }
1266
OnNewStreamIncomingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1267 void QuicSession::OnNewStreamIncomingBidirectionalFlowControlWindow(
1268 QuicStreamOffset new_window) {
1269 QUIC_DVLOG(1) << ENDPOINT
1270 << "OnNewStreamIncomingBidirectionalFlowControlWindow "
1271 << new_window;
1272 // Inform all existing incoming bidirectional streams about the new window.
1273 for (auto const& kv : stream_map_) {
1274 const QuicStreamId id = kv.first;
1275 if (!QuicUtils::IsBidirectionalStreamId(id)) {
1276 continue;
1277 }
1278 if (QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1279 perspective())) {
1280 continue;
1281 }
1282 QUIC_DVLOG(1) << ENDPOINT << "Informing incoming bidirectional stream "
1283 << id << " of new stream flow control window " << new_window;
1284 kv.second->UpdateSendWindowOffset(new_window);
1285 }
1286 }
1287
OnNewSessionFlowControlWindow(QuicStreamOffset new_window)1288 void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
1289 QUIC_DVLOG(1) << ENDPOINT << "OnNewSessionFlowControlWindow " << new_window;
1290 if (new_window < kMinimumFlowControlSendWindow &&
1291 !connection_->version().AllowsLowFlowControlLimits()) {
1292 QUIC_LOG_FIRST_N(ERROR, 1)
1293 << "Peer sent us an invalid session flow control send window: "
1294 << new_window << ", below default: " << kMinimumFlowControlSendWindow;
1295 connection_->CloseConnection(
1296 QUIC_FLOW_CONTROL_INVALID_WINDOW, "New connection window too low",
1297 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1298 return;
1299 }
1300
1301 flow_controller_.UpdateSendWindowOffset(new_window);
1302 }
1303
OnNewDecryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicDecrypter> decrypter,bool set_alternative_decrypter,bool latch_once_used)1304 bool QuicSession::OnNewDecryptionKeyAvailable(
1305 EncryptionLevel level,
1306 std::unique_ptr<QuicDecrypter> decrypter,
1307 bool set_alternative_decrypter,
1308 bool latch_once_used) {
1309 if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3 &&
1310 !connection()->framer().HasEncrypterOfEncryptionLevel(
1311 QuicUtils::GetEncryptionLevel(
1312 QuicUtils::GetPacketNumberSpace(level)))) {
1313 // This should never happen because connection should never decrypt a packet
1314 // while an ACK for it cannot be encrypted.
1315 return false;
1316 }
1317 if (connection()->version().KnowsWhichDecrypterToUse()) {
1318 connection()->InstallDecrypter(level, std::move(decrypter));
1319 return true;
1320 }
1321 if (set_alternative_decrypter) {
1322 connection()->SetAlternativeDecrypter(level, std::move(decrypter),
1323 latch_once_used);
1324 return true;
1325 }
1326 connection()->SetDecrypter(level, std::move(decrypter));
1327 return true;
1328 }
1329
OnNewEncryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicEncrypter> encrypter)1330 void QuicSession::OnNewEncryptionKeyAvailable(
1331 EncryptionLevel level,
1332 std::unique_ptr<QuicEncrypter> encrypter) {
1333 connection()->SetEncrypter(level, std::move(encrypter));
1334
1335 if (GetQuicRestartFlag(quic_send_settings_on_write_key_available) &&
1336 connection_->version().handshake_protocol == PROTOCOL_TLS1_3 &&
1337 level == ENCRYPTION_FORWARD_SECURE) {
1338 // Set connection's default encryption level once 1-RTT write key is
1339 // available.
1340 QUIC_RESTART_FLAG_COUNT_N(quic_send_settings_on_write_key_available, 1, 2);
1341 QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to "
1342 << EncryptionLevelToString(level);
1343 connection()->SetDefaultEncryptionLevel(level);
1344 }
1345 }
1346
SetDefaultEncryptionLevel(EncryptionLevel level)1347 void QuicSession::SetDefaultEncryptionLevel(EncryptionLevel level) {
1348 DCHECK_EQ(PROTOCOL_QUIC_CRYPTO, connection_->version().handshake_protocol);
1349 QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to "
1350 << EncryptionLevelToString(level);
1351 connection()->SetDefaultEncryptionLevel(level);
1352
1353 switch (level) {
1354 case ENCRYPTION_INITIAL:
1355 break;
1356 case ENCRYPTION_ZERO_RTT:
1357 if (perspective() == Perspective::IS_CLIENT) {
1358 // Retransmit old 0-RTT data (if any) with the new 0-RTT keys, since
1359 // they can't be decrypted by the server.
1360 connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION);
1361 // Given any streams blocked by encryption a chance to write.
1362 OnCanWrite();
1363 }
1364 break;
1365 case ENCRYPTION_HANDSHAKE:
1366 break;
1367 case ENCRYPTION_FORWARD_SECURE:
1368 QUIC_BUG_IF(!config_.negotiated())
1369 << ENDPOINT << "Handshake confirmed without parameter negotiation.";
1370 if (!GetQuicReloadableFlag(quic_bw_sampler_app_limited_starting_value)) {
1371 connection_->ResetHasNonAppLimitedSampleAfterHandshakeCompletion();
1372 }
1373 break;
1374 default:
1375 QUIC_BUG << "Unknown encryption level: "
1376 << EncryptionLevelToString(level);
1377 }
1378 }
1379
OnOneRttKeysAvailable()1380 void QuicSession::OnOneRttKeysAvailable() {
1381 DCHECK_EQ(PROTOCOL_TLS1_3, connection_->version().handshake_protocol);
1382 if (!GetQuicRestartFlag(quic_send_settings_on_write_key_available)) {
1383 QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to "
1384 << EncryptionLevelToString(ENCRYPTION_FORWARD_SECURE);
1385 connection()->SetDefaultEncryptionLevel(ENCRYPTION_FORWARD_SECURE);
1386 }
1387
1388 QUIC_BUG_IF(!GetCryptoStream()->crypto_negotiated_params().cipher_suite)
1389 << ENDPOINT << "Handshake completes without cipher suite negotiation.";
1390 QUIC_BUG_IF(!config_.negotiated())
1391 << ENDPOINT << "Handshake completes without parameter negotiation.";
1392 if (connection()->version().HasHandshakeDone() &&
1393 perspective_ == Perspective::IS_SERVER) {
1394 // Server sends HANDSHAKE_DONE to signal confirmation of the handshake
1395 // to the client.
1396 control_frame_manager_.WriteOrBufferHandshakeDone();
1397 }
1398 if (!GetQuicReloadableFlag(quic_bw_sampler_app_limited_starting_value)) {
1399 connection_->ResetHasNonAppLimitedSampleAfterHandshakeCompletion();
1400 }
1401 }
1402
DiscardOldDecryptionKey(EncryptionLevel level)1403 void QuicSession::DiscardOldDecryptionKey(EncryptionLevel level) {
1404 if (!connection()->version().KnowsWhichDecrypterToUse()) {
1405 return;
1406 }
1407 connection()->RemoveDecrypter(level);
1408 }
1409
DiscardOldEncryptionKey(EncryptionLevel level)1410 void QuicSession::DiscardOldEncryptionKey(EncryptionLevel level) {
1411 QUIC_DVLOG(1) << ENDPOINT << "Discard keys of "
1412 << EncryptionLevelToString(level);
1413 if (connection()->version().handshake_protocol == PROTOCOL_TLS1_3) {
1414 connection()->RemoveEncrypter(level);
1415 }
1416 switch (level) {
1417 case ENCRYPTION_INITIAL:
1418 NeuterUnencryptedData();
1419 break;
1420 case ENCRYPTION_HANDSHAKE:
1421 NeuterHandshakeData();
1422 break;
1423 case ENCRYPTION_ZERO_RTT:
1424 break;
1425 case ENCRYPTION_FORWARD_SECURE:
1426 QUIC_BUG << "Tries to drop 1-RTT keys";
1427 break;
1428 default:
1429 QUIC_BUG << "Unknown encryption level: "
1430 << EncryptionLevelToString(level);
1431 }
1432 }
1433
NeuterHandshakeData()1434 void QuicSession::NeuterHandshakeData() {
1435 connection()->OnHandshakeComplete();
1436 }
1437
OnCryptoHandshakeMessageSent(const CryptoHandshakeMessage &)1438 void QuicSession::OnCryptoHandshakeMessageSent(
1439 const CryptoHandshakeMessage& /*message*/) {}
1440
OnCryptoHandshakeMessageReceived(const CryptoHandshakeMessage &)1441 void QuicSession::OnCryptoHandshakeMessageReceived(
1442 const CryptoHandshakeMessage& /*message*/) {}
1443
RegisterStreamPriority(QuicStreamId id,bool is_static,const spdy::SpdyStreamPrecedence & precedence)1444 void QuicSession::RegisterStreamPriority(
1445 QuicStreamId id,
1446 bool is_static,
1447 const spdy::SpdyStreamPrecedence& precedence) {
1448 if (enable_round_robin_scheduling_) {
1449 // Ignore provided precedence, instead, put all streams at the same priority
1450 // bucket.
1451 write_blocked_streams()->RegisterStream(
1452 id, is_static, spdy::SpdyStreamPrecedence(spdy::kV3LowestPriority));
1453 return;
1454 }
1455 write_blocked_streams()->RegisterStream(id, is_static, precedence);
1456 }
1457
UnregisterStreamPriority(QuicStreamId id,bool is_static)1458 void QuicSession::UnregisterStreamPriority(QuicStreamId id, bool is_static) {
1459 write_blocked_streams()->UnregisterStream(id, is_static);
1460 }
1461
UpdateStreamPriority(QuicStreamId id,const spdy::SpdyStreamPrecedence & new_precedence)1462 void QuicSession::UpdateStreamPriority(
1463 QuicStreamId id,
1464 const spdy::SpdyStreamPrecedence& new_precedence) {
1465 if (enable_round_robin_scheduling_) {
1466 // Ignore updated precedence.
1467 return;
1468 }
1469 write_blocked_streams()->UpdateStreamPriority(id, new_precedence);
1470 }
1471
config()1472 QuicConfig* QuicSession::config() {
1473 return &config_;
1474 }
1475
ActivateStream(std::unique_ptr<QuicStream> stream)1476 void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
1477 QuicStreamId stream_id = stream->id();
1478 bool is_static = stream->is_static();
1479 QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
1480 << ". activating " << stream_id;
1481 DCHECK(!QuicContainsKey(stream_map_, stream_id));
1482 stream_map_[stream_id] = std::move(stream);
1483 if (IsIncomingStream(stream_id)) {
1484 is_static ? ++num_incoming_static_streams_
1485 : ++num_dynamic_incoming_streams_;
1486 } else if (is_static) {
1487 ++num_outgoing_static_streams_;
1488 }
1489
1490 if (VersionHasIetfQuicFrames(transport_version()) &&
1491 !QuicUtils::IsBidirectionalStreamId(stream_id) && is_static) {
1492 DCHECK_LE(num_incoming_static_streams_,
1493 num_expected_unidirectional_static_streams_);
1494 DCHECK_LE(num_outgoing_static_streams_,
1495 num_expected_unidirectional_static_streams_);
1496 }
1497 }
1498
GetNextOutgoingBidirectionalStreamId()1499 QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
1500 if (VersionHasIetfQuicFrames(transport_version())) {
1501 return v99_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
1502 }
1503 return stream_id_manager_.GetNextOutgoingStreamId();
1504 }
1505
GetNextOutgoingUnidirectionalStreamId()1506 QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
1507 if (VersionHasIetfQuicFrames(transport_version())) {
1508 return v99_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
1509 }
1510 return stream_id_manager_.GetNextOutgoingStreamId();
1511 }
1512
CanOpenNextOutgoingBidirectionalStream()1513 bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
1514 if (!VersionHasIetfQuicFrames(transport_version())) {
1515 return stream_id_manager_.CanOpenNextOutgoingStream(
1516 GetNumOpenOutgoingStreams());
1517 }
1518 if (v99_streamid_manager_.CanOpenNextOutgoingBidirectionalStream()) {
1519 return true;
1520 }
1521 if (is_configured_) {
1522 // Send STREAM_BLOCKED after config negotiated.
1523 control_frame_manager_.WriteOrBufferStreamsBlocked(
1524 v99_streamid_manager_.max_outgoing_bidirectional_streams(),
1525 /*unidirectional=*/false);
1526 }
1527 return false;
1528 }
1529
CanOpenNextOutgoingUnidirectionalStream()1530 bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
1531 if (!VersionHasIetfQuicFrames(transport_version())) {
1532 return stream_id_manager_.CanOpenNextOutgoingStream(
1533 GetNumOpenOutgoingStreams());
1534 }
1535 if (v99_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream()) {
1536 return true;
1537 }
1538 if (is_configured_) {
1539 // Send STREAM_BLOCKED after config negotiated.
1540 control_frame_manager_.WriteOrBufferStreamsBlocked(
1541 v99_streamid_manager_.max_outgoing_unidirectional_streams(),
1542 /*unidirectional=*/true);
1543 }
1544 return false;
1545 }
1546
GetAdvertisedMaxIncomingBidirectionalStreams() const1547 QuicStreamCount QuicSession::GetAdvertisedMaxIncomingBidirectionalStreams()
1548 const {
1549 DCHECK(VersionHasIetfQuicFrames(transport_version()));
1550 return v99_streamid_manager_.advertised_max_incoming_bidirectional_streams();
1551 }
1552
GetOrCreateStream(const QuicStreamId stream_id)1553 QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
1554 DCHECK(!QuicContainsKey(pending_stream_map_, stream_id));
1555 if (QuicUtils::IsCryptoStreamId(transport_version(), stream_id)) {
1556 return GetMutableCryptoStream();
1557 }
1558
1559 StreamMap::iterator it = stream_map_.find(stream_id);
1560 if (it != stream_map_.end()) {
1561 return it->second.get();
1562 }
1563
1564 if (IsClosedStream(stream_id)) {
1565 return nullptr;
1566 }
1567
1568 if (!IsIncomingStream(stream_id)) {
1569 HandleFrameOnNonexistentOutgoingStream(stream_id);
1570 return nullptr;
1571 }
1572
1573 // TODO(fkastenholz): If we are creating a new stream and we have
1574 // sent a goaway, we should ignore the stream creation. Need to
1575 // add code to A) test if goaway was sent ("if (goaway_sent_)") and
1576 // B) reject stream creation ("return nullptr")
1577
1578 if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
1579 return nullptr;
1580 }
1581
1582 if (!VersionHasIetfQuicFrames(transport_version())) {
1583 // TODO(fayang): Let LegacyQuicStreamIdManager count open streams and make
1584 // CanOpenIncomingStream interface consistent with that of v99.
1585 if (!stream_id_manager_.CanOpenIncomingStream(
1586 GetNumOpenIncomingStreams())) {
1587 // Refuse to open the stream.
1588 SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
1589 return nullptr;
1590 }
1591 }
1592
1593 return CreateIncomingStream(stream_id);
1594 }
1595
StreamDraining(QuicStreamId stream_id)1596 void QuicSession::StreamDraining(QuicStreamId stream_id) {
1597 DCHECK(QuicContainsKey(stream_map_, stream_id));
1598 if (!QuicContainsKey(draining_streams_, stream_id)) {
1599 draining_streams_.insert(stream_id);
1600 if (IsIncomingStream(stream_id)) {
1601 ++num_draining_incoming_streams_;
1602 }
1603 if (VersionHasIetfQuicFrames(transport_version())) {
1604 v99_streamid_manager_.OnStreamClosed(stream_id);
1605 }
1606 }
1607 if (!IsIncomingStream(stream_id)) {
1608 // Inform application that a stream is available.
1609 if (VersionHasIetfQuicFrames(transport_version())) {
1610 OnCanCreateNewOutgoingStream(
1611 !QuicUtils::IsBidirectionalStreamId(stream_id));
1612 } else {
1613 QuicStream* stream = GetStream(stream_id);
1614 if (!stream) {
1615 QUIC_BUG << "Stream doesn't exist when draining.";
1616 return;
1617 }
1618 OnCanCreateNewOutgoingStream(stream->type() != BIDIRECTIONAL);
1619 }
1620 }
1621 }
1622
MaybeIncreaseLargestPeerStreamId(const QuicStreamId stream_id)1623 bool QuicSession::MaybeIncreaseLargestPeerStreamId(
1624 const QuicStreamId stream_id) {
1625 if (VersionHasIetfQuicFrames(transport_version())) {
1626 std::string error_details;
1627 if (v99_streamid_manager_.MaybeIncreaseLargestPeerStreamId(
1628 stream_id, &error_details)) {
1629 return true;
1630 }
1631 connection()->CloseConnection(
1632 QUIC_INVALID_STREAM_ID, error_details,
1633 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1634 return false;
1635 }
1636 if (!stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id)) {
1637 connection()->CloseConnection(
1638 QUIC_TOO_MANY_AVAILABLE_STREAMS,
1639 quiche::QuicheStrCat(stream_id, " exceeds available streams ",
1640 stream_id_manager_.MaxAvailableStreams()),
1641 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1642 return false;
1643 }
1644 return true;
1645 }
1646
ShouldYield(QuicStreamId stream_id)1647 bool QuicSession::ShouldYield(QuicStreamId stream_id) {
1648 if (stream_id == currently_writing_stream_id_) {
1649 return false;
1650 }
1651 return write_blocked_streams()->ShouldYield(stream_id);
1652 }
1653
GetOrCreatePendingStream(QuicStreamId stream_id)1654 PendingStream* QuicSession::GetOrCreatePendingStream(QuicStreamId stream_id) {
1655 auto it = pending_stream_map_.find(stream_id);
1656 if (it != pending_stream_map_.end()) {
1657 return it->second.get();
1658 }
1659
1660 if (IsClosedStream(stream_id) ||
1661 !MaybeIncreaseLargestPeerStreamId(stream_id)) {
1662 return nullptr;
1663 }
1664
1665 auto pending = std::make_unique<PendingStream>(stream_id, this);
1666 PendingStream* unowned_pending = pending.get();
1667 pending_stream_map_[stream_id] = std::move(pending);
1668 return unowned_pending;
1669 }
1670
set_largest_peer_created_stream_id(QuicStreamId largest_peer_created_stream_id)1671 void QuicSession::set_largest_peer_created_stream_id(
1672 QuicStreamId largest_peer_created_stream_id) {
1673 DCHECK(!VersionHasIetfQuicFrames(transport_version()));
1674 stream_id_manager_.set_largest_peer_created_stream_id(
1675 largest_peer_created_stream_id);
1676 }
1677
GetLargestPeerCreatedStreamId(bool unidirectional) const1678 QuicStreamId QuicSession::GetLargestPeerCreatedStreamId(
1679 bool unidirectional) const {
1680 // This method is only used in IETF QUIC.
1681 DCHECK(VersionHasIetfQuicFrames(transport_version()));
1682 return v99_streamid_manager_.GetLargestPeerCreatedStreamId(unidirectional);
1683 }
1684
DeleteConnection()1685 void QuicSession::DeleteConnection() {
1686 if (connection_) {
1687 delete connection_;
1688 connection_ = nullptr;
1689 }
1690 }
1691
MaybeSetStreamPriority(QuicStreamId stream_id,const spdy::SpdyStreamPrecedence & precedence)1692 bool QuicSession::MaybeSetStreamPriority(
1693 QuicStreamId stream_id,
1694 const spdy::SpdyStreamPrecedence& precedence) {
1695 auto active_stream = stream_map_.find(stream_id);
1696 if (active_stream != stream_map_.end()) {
1697 active_stream->second->SetPriority(precedence);
1698 return true;
1699 }
1700
1701 return false;
1702 }
1703
IsClosedStream(QuicStreamId id)1704 bool QuicSession::IsClosedStream(QuicStreamId id) {
1705 DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
1706 if (IsOpenStream(id)) {
1707 // Stream is active
1708 return false;
1709 }
1710
1711 if (VersionHasIetfQuicFrames(transport_version())) {
1712 return !v99_streamid_manager_.IsAvailableStream(id);
1713 }
1714
1715 return !stream_id_manager_.IsAvailableStream(id);
1716 }
1717
IsOpenStream(QuicStreamId id)1718 bool QuicSession::IsOpenStream(QuicStreamId id) {
1719 DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
1720 if (QuicContainsKey(stream_map_, id) ||
1721 QuicContainsKey(pending_stream_map_, id) ||
1722 QuicUtils::IsCryptoStreamId(transport_version(), id)) {
1723 // Stream is active
1724 return true;
1725 }
1726 return false;
1727 }
1728
IsStaticStream(QuicStreamId id) const1729 bool QuicSession::IsStaticStream(QuicStreamId id) const {
1730 auto it = stream_map_.find(id);
1731 if (it == stream_map_.end()) {
1732 return false;
1733 }
1734 return it->second->is_static();
1735 }
1736
GetNumOpenIncomingStreams() const1737 size_t QuicSession::GetNumOpenIncomingStreams() const {
1738 return num_dynamic_incoming_streams_ - num_draining_incoming_streams_ +
1739 num_locally_closed_incoming_streams_highest_offset_;
1740 }
1741
GetNumOpenOutgoingStreams() const1742 size_t QuicSession::GetNumOpenOutgoingStreams() const {
1743 DCHECK_GE(GetNumDynamicOutgoingStreams() +
1744 GetNumLocallyClosedOutgoingStreamsHighestOffset(),
1745 GetNumDrainingOutgoingStreams());
1746 return GetNumDynamicOutgoingStreams() +
1747 GetNumLocallyClosedOutgoingStreamsHighestOffset() -
1748 GetNumDrainingOutgoingStreams();
1749 }
1750
GetNumActiveStreams() const1751 size_t QuicSession::GetNumActiveStreams() const {
1752 return stream_map_.size() - draining_streams_.size() -
1753 num_incoming_static_streams_ - num_outgoing_static_streams_;
1754 }
1755
GetNumDrainingStreams() const1756 size_t QuicSession::GetNumDrainingStreams() const {
1757 return draining_streams_.size();
1758 }
1759
MarkConnectionLevelWriteBlocked(QuicStreamId id)1760 void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
1761 if (GetOrCreateStream(id) == nullptr) {
1762 QUIC_BUG << "Marking unknown stream " << id << " blocked.";
1763 QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
1764 }
1765
1766 QUIC_DVLOG(1) << ENDPOINT << "Adding stream " << id
1767 << " to write-blocked list";
1768
1769 write_blocked_streams_.AddStream(id);
1770 }
1771
HasDataToWrite() const1772 bool QuicSession::HasDataToWrite() const {
1773 return write_blocked_streams_.HasWriteBlockedSpecialStream() ||
1774 write_blocked_streams_.HasWriteBlockedDataStreams() ||
1775 connection_->HasQueuedData() ||
1776 !streams_with_pending_retransmission_.empty() ||
1777 control_frame_manager_.WillingToWrite();
1778 }
1779
OnAckNeedsRetransmittableFrame()1780 void QuicSession::OnAckNeedsRetransmittableFrame() {
1781 flow_controller_.SendWindowUpdate();
1782 }
1783
SendPing()1784 void QuicSession::SendPing() {
1785 control_frame_manager_.WritePing();
1786 }
1787
GetNumDynamicOutgoingStreams() const1788 size_t QuicSession::GetNumDynamicOutgoingStreams() const {
1789 DCHECK_GE(
1790 static_cast<size_t>(stream_map_.size() + pending_stream_map_.size()),
1791 num_dynamic_incoming_streams_ + num_outgoing_static_streams_ +
1792 num_incoming_static_streams_);
1793 return stream_map_.size() + pending_stream_map_.size() -
1794 num_dynamic_incoming_streams_ - num_outgoing_static_streams_ -
1795 num_incoming_static_streams_;
1796 }
1797
GetNumDrainingOutgoingStreams() const1798 size_t QuicSession::GetNumDrainingOutgoingStreams() const {
1799 DCHECK_GE(draining_streams_.size(), num_draining_incoming_streams_);
1800 return draining_streams_.size() - num_draining_incoming_streams_;
1801 }
1802
GetNumLocallyClosedOutgoingStreamsHighestOffset() const1803 size_t QuicSession::GetNumLocallyClosedOutgoingStreamsHighestOffset() const {
1804 DCHECK_GE(locally_closed_streams_highest_offset_.size(),
1805 num_locally_closed_incoming_streams_highest_offset_);
1806 return locally_closed_streams_highest_offset_.size() -
1807 num_locally_closed_incoming_streams_highest_offset_;
1808 }
1809
IsConnectionFlowControlBlocked() const1810 bool QuicSession::IsConnectionFlowControlBlocked() const {
1811 return flow_controller_.IsBlocked();
1812 }
1813
IsStreamFlowControlBlocked()1814 bool QuicSession::IsStreamFlowControlBlocked() {
1815 for (auto const& kv : stream_map_) {
1816 if (kv.second->flow_controller()->IsBlocked()) {
1817 return true;
1818 }
1819 }
1820 if (!QuicVersionUsesCryptoFrames(transport_version()) &&
1821 GetMutableCryptoStream()->flow_controller()->IsBlocked()) {
1822 return true;
1823 }
1824 return false;
1825 }
1826
MaxAvailableBidirectionalStreams() const1827 size_t QuicSession::MaxAvailableBidirectionalStreams() const {
1828 if (VersionHasIetfQuicFrames(transport_version())) {
1829 return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
1830 }
1831 return stream_id_manager_.MaxAvailableStreams();
1832 }
1833
MaxAvailableUnidirectionalStreams() const1834 size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
1835 if (VersionHasIetfQuicFrames(transport_version())) {
1836 return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
1837 }
1838 return stream_id_manager_.MaxAvailableStreams();
1839 }
1840
IsIncomingStream(QuicStreamId id) const1841 bool QuicSession::IsIncomingStream(QuicStreamId id) const {
1842 if (VersionHasIetfQuicFrames(transport_version())) {
1843 return v99_streamid_manager_.IsIncomingStream(id);
1844 }
1845 return stream_id_manager_.IsIncomingStream(id);
1846 }
1847
OnStreamDoneWaitingForAcks(QuicStreamId id)1848 void QuicSession::OnStreamDoneWaitingForAcks(QuicStreamId id) {
1849 streams_waiting_for_acks_.erase(id);
1850
1851 auto it = zombie_streams_.find(id);
1852 if (it == zombie_streams_.end()) {
1853 return;
1854 }
1855
1856 closed_streams_.push_back(std::move(it->second));
1857 if (!closed_streams_clean_up_alarm_->IsSet()) {
1858 closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
1859 }
1860 zombie_streams_.erase(it);
1861 // Do not retransmit data of a closed stream.
1862 streams_with_pending_retransmission_.erase(id);
1863 }
1864
OnStreamWaitingForAcks(QuicStreamId id)1865 void QuicSession::OnStreamWaitingForAcks(QuicStreamId id) {
1866 // Exclude crypto stream's status since it is counted in HasUnackedCryptoData.
1867 if (GetCryptoStream() != nullptr && id == GetCryptoStream()->id()) {
1868 return;
1869 }
1870
1871 streams_waiting_for_acks_.insert(id);
1872
1873 // The number of the streams waiting for acks should not be larger than the
1874 // number of streams.
1875 if (static_cast<size_t>(stream_map_.size() + zombie_streams_.size()) <
1876 streams_waiting_for_acks_.size()) {
1877 QUIC_BUG << "More streams are waiting for acks than the number of streams. "
1878 << "Sizes: streams: " << stream_map_.size()
1879 << ", zombie streams: " << zombie_streams_.size()
1880 << ", vs streams waiting for acks: "
1881 << streams_waiting_for_acks_.size();
1882 }
1883 }
1884
GetStream(QuicStreamId id) const1885 QuicStream* QuicSession::GetStream(QuicStreamId id) const {
1886 auto active_stream = stream_map_.find(id);
1887 if (active_stream != stream_map_.end()) {
1888 return active_stream->second.get();
1889 }
1890 auto zombie_stream = zombie_streams_.find(id);
1891 if (zombie_stream != zombie_streams_.end()) {
1892 return zombie_stream->second.get();
1893 }
1894
1895 if (QuicUtils::IsCryptoStreamId(transport_version(), id)) {
1896 return const_cast<QuicCryptoStream*>(GetCryptoStream());
1897 }
1898
1899 return nullptr;
1900 }
1901
OnFrameAcked(const QuicFrame & frame,QuicTime::Delta ack_delay_time,QuicTime receive_timestamp)1902 bool QuicSession::OnFrameAcked(const QuicFrame& frame,
1903 QuicTime::Delta ack_delay_time,
1904 QuicTime receive_timestamp) {
1905 if (frame.type == MESSAGE_FRAME) {
1906 OnMessageAcked(frame.message_frame->message_id, receive_timestamp);
1907 return true;
1908 }
1909 if (frame.type == CRYPTO_FRAME) {
1910 return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
1911 ack_delay_time);
1912 }
1913 if (frame.type != STREAM_FRAME) {
1914 return control_frame_manager_.OnControlFrameAcked(frame);
1915 }
1916 bool new_stream_data_acked = false;
1917 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1918 // Stream can already be reset when sent frame gets acked.
1919 if (stream != nullptr) {
1920 QuicByteCount newly_acked_length = 0;
1921 new_stream_data_acked = stream->OnStreamFrameAcked(
1922 frame.stream_frame.offset, frame.stream_frame.data_length,
1923 frame.stream_frame.fin, ack_delay_time, receive_timestamp,
1924 &newly_acked_length);
1925 if (!stream->HasPendingRetransmission()) {
1926 streams_with_pending_retransmission_.erase(stream->id());
1927 }
1928 }
1929 return new_stream_data_acked;
1930 }
1931
OnStreamFrameRetransmitted(const QuicStreamFrame & frame)1932 void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
1933 QuicStream* stream = GetStream(frame.stream_id);
1934 if (stream == nullptr) {
1935 QUIC_BUG << "Stream: " << frame.stream_id << " is closed when " << frame
1936 << " is retransmitted.";
1937 connection()->CloseConnection(
1938 QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
1939 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1940 return;
1941 }
1942 stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
1943 frame.fin);
1944 }
1945
OnFrameLost(const QuicFrame & frame)1946 void QuicSession::OnFrameLost(const QuicFrame& frame) {
1947 if (frame.type == MESSAGE_FRAME) {
1948 OnMessageLost(frame.message_frame->message_id);
1949 return;
1950 }
1951 if (frame.type == CRYPTO_FRAME) {
1952 GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
1953 return;
1954 }
1955 if (frame.type != STREAM_FRAME) {
1956 control_frame_manager_.OnControlFrameLost(frame);
1957 return;
1958 }
1959 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1960 if (stream == nullptr) {
1961 return;
1962 }
1963 stream->OnStreamFrameLost(frame.stream_frame.offset,
1964 frame.stream_frame.data_length,
1965 frame.stream_frame.fin);
1966 if (stream->HasPendingRetransmission() &&
1967 !QuicContainsKey(streams_with_pending_retransmission_,
1968 frame.stream_frame.stream_id)) {
1969 streams_with_pending_retransmission_.insert(
1970 std::make_pair(frame.stream_frame.stream_id, true));
1971 }
1972 }
1973
RetransmitFrames(const QuicFrames & frames,TransmissionType type)1974 void QuicSession::RetransmitFrames(const QuicFrames& frames,
1975 TransmissionType type) {
1976 QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
1977 if (!write_with_transmission_) {
1978 SetTransmissionType(type);
1979 }
1980 for (const QuicFrame& frame : frames) {
1981 if (frame.type == MESSAGE_FRAME) {
1982 // Do not retransmit MESSAGE frames.
1983 continue;
1984 }
1985 if (frame.type == CRYPTO_FRAME) {
1986 GetMutableCryptoStream()->RetransmitData(frame.crypto_frame, type);
1987 continue;
1988 }
1989 if (frame.type != STREAM_FRAME) {
1990 if (!control_frame_manager_.RetransmitControlFrame(frame, type)) {
1991 break;
1992 }
1993 continue;
1994 }
1995 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1996 if (stream != nullptr &&
1997 !stream->RetransmitStreamData(frame.stream_frame.offset,
1998 frame.stream_frame.data_length,
1999 frame.stream_frame.fin, type)) {
2000 break;
2001 }
2002 }
2003 }
2004
IsFrameOutstanding(const QuicFrame & frame) const2005 bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
2006 if (frame.type == MESSAGE_FRAME) {
2007 return false;
2008 }
2009 if (frame.type == CRYPTO_FRAME) {
2010 return GetCryptoStream()->IsFrameOutstanding(
2011 frame.crypto_frame->level, frame.crypto_frame->offset,
2012 frame.crypto_frame->data_length);
2013 }
2014 if (frame.type != STREAM_FRAME) {
2015 return control_frame_manager_.IsControlFrameOutstanding(frame);
2016 }
2017 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2018 return stream != nullptr &&
2019 stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
2020 frame.stream_frame.data_length,
2021 frame.stream_frame.fin);
2022 }
2023
HasUnackedCryptoData() const2024 bool QuicSession::HasUnackedCryptoData() const {
2025 const QuicCryptoStream* crypto_stream = GetCryptoStream();
2026 return crypto_stream->IsWaitingForAcks() || crypto_stream->HasBufferedData();
2027 }
2028
HasUnackedStreamData() const2029 bool QuicSession::HasUnackedStreamData() const {
2030 return !streams_waiting_for_acks_.empty();
2031 }
2032
GetHandshakeState() const2033 HandshakeState QuicSession::GetHandshakeState() const {
2034 return GetCryptoStream()->GetHandshakeState();
2035 }
2036
WriteStreamData(QuicStreamId id,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2037 WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
2038 QuicStreamOffset offset,
2039 QuicByteCount data_length,
2040 QuicDataWriter* writer) {
2041 QuicStream* stream = GetStream(id);
2042 if (stream == nullptr) {
2043 // This causes the connection to be closed because of failed to serialize
2044 // packet.
2045 QUIC_BUG << "Stream " << id << " does not exist when trying to write data."
2046 << " version:" << transport_version();
2047 return STREAM_MISSING;
2048 }
2049 if (stream->WriteStreamData(offset, data_length, writer)) {
2050 return WRITE_SUCCESS;
2051 }
2052 return WRITE_FAILED;
2053 }
2054
WriteCryptoData(EncryptionLevel level,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2055 bool QuicSession::WriteCryptoData(EncryptionLevel level,
2056 QuicStreamOffset offset,
2057 QuicByteCount data_length,
2058 QuicDataWriter* writer) {
2059 return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
2060 writer);
2061 }
2062
GetStatelessResetToken() const2063 QuicUint128 QuicSession::GetStatelessResetToken() const {
2064 return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
2065 }
2066
RetransmitLostData()2067 bool QuicSession::RetransmitLostData() {
2068 QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2069 // Retransmit crypto data first.
2070 bool uses_crypto_frames = QuicVersionUsesCryptoFrames(transport_version());
2071 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2072 if (uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
2073 if (!write_with_transmission_) {
2074 SetTransmissionType(HANDSHAKE_RETRANSMISSION);
2075 }
2076 crypto_stream->WritePendingCryptoRetransmission();
2077 }
2078 // Retransmit crypto data in stream 1 frames (version < 47).
2079 if (!uses_crypto_frames &&
2080 QuicContainsKey(streams_with_pending_retransmission_,
2081 QuicUtils::GetCryptoStreamId(transport_version()))) {
2082 if (!write_with_transmission_) {
2083 SetTransmissionType(HANDSHAKE_RETRANSMISSION);
2084 }
2085 // Retransmit crypto data first.
2086 QuicStream* crypto_stream =
2087 GetStream(QuicUtils::GetCryptoStreamId(transport_version()));
2088 crypto_stream->OnCanWrite();
2089 DCHECK(CheckStreamWriteBlocked(crypto_stream));
2090 if (crypto_stream->HasPendingRetransmission()) {
2091 // Connection is write blocked.
2092 return false;
2093 } else {
2094 streams_with_pending_retransmission_.erase(
2095 QuicUtils::GetCryptoStreamId(transport_version()));
2096 }
2097 }
2098 if (control_frame_manager_.HasPendingRetransmission()) {
2099 if (!write_with_transmission_) {
2100 SetTransmissionType(LOSS_RETRANSMISSION);
2101 }
2102 control_frame_manager_.OnCanWrite();
2103 if (control_frame_manager_.HasPendingRetransmission()) {
2104 return false;
2105 }
2106 }
2107 while (!streams_with_pending_retransmission_.empty()) {
2108 if (!connection_->CanWriteStreamData()) {
2109 break;
2110 }
2111 // Retransmit lost data on headers and data streams.
2112 const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
2113 QuicStream* stream = GetStream(id);
2114 if (stream != nullptr) {
2115 if (!write_with_transmission_) {
2116 SetTransmissionType(LOSS_RETRANSMISSION);
2117 }
2118 stream->OnCanWrite();
2119 DCHECK(CheckStreamWriteBlocked(stream));
2120 if (stream->HasPendingRetransmission()) {
2121 // Connection is write blocked.
2122 break;
2123 } else if (!streams_with_pending_retransmission_.empty() &&
2124 streams_with_pending_retransmission_.begin()->first == id) {
2125 // Retransmit lost data may cause connection close. If this stream
2126 // has not yet sent fin, a RST_STREAM will be sent and it will be
2127 // removed from streams_with_pending_retransmission_.
2128 streams_with_pending_retransmission_.pop_front();
2129 }
2130 } else {
2131 QUIC_BUG << "Try to retransmit data of a closed stream";
2132 streams_with_pending_retransmission_.pop_front();
2133 }
2134 }
2135
2136 return streams_with_pending_retransmission_.empty();
2137 }
2138
NeuterUnencryptedData()2139 void QuicSession::NeuterUnencryptedData() {
2140 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2141 crypto_stream->NeuterUnencryptedStreamData();
2142 if (!crypto_stream->HasPendingRetransmission() &&
2143 !QuicVersionUsesCryptoFrames(transport_version())) {
2144 streams_with_pending_retransmission_.erase(
2145 QuicUtils::GetCryptoStreamId(transport_version()));
2146 }
2147 connection_->NeuterUnencryptedPackets();
2148 }
2149
SetTransmissionType(TransmissionType type)2150 void QuicSession::SetTransmissionType(TransmissionType type) {
2151 connection_->SetTransmissionType(type);
2152 }
2153
SendMessage(QuicMemSliceSpan message)2154 MessageResult QuicSession::SendMessage(QuicMemSliceSpan message) {
2155 return SendMessage(message, /*flush=*/false);
2156 }
2157
SendMessage(QuicMemSliceSpan message,bool flush)2158 MessageResult QuicSession::SendMessage(QuicMemSliceSpan message, bool flush) {
2159 DCHECK(connection_->connected())
2160 << ENDPOINT << "Try to write messages when connection is closed.";
2161 if (!IsEncryptionEstablished()) {
2162 return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
2163 }
2164 MessageStatus result =
2165 connection_->SendMessage(last_message_id_ + 1, message, flush);
2166 if (result == MESSAGE_STATUS_SUCCESS) {
2167 return {result, ++last_message_id_};
2168 }
2169 return {result, 0};
2170 }
2171
OnMessageAcked(QuicMessageId message_id,QuicTime)2172 void QuicSession::OnMessageAcked(QuicMessageId message_id,
2173 QuicTime /*receive_timestamp*/) {
2174 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
2175 }
2176
OnMessageLost(QuicMessageId message_id)2177 void QuicSession::OnMessageLost(QuicMessageId message_id) {
2178 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
2179 << " is considered lost";
2180 }
2181
CleanUpClosedStreams()2182 void QuicSession::CleanUpClosedStreams() {
2183 closed_streams_.clear();
2184 }
2185
GetCurrentLargestMessagePayload() const2186 QuicPacketLength QuicSession::GetCurrentLargestMessagePayload() const {
2187 return connection_->GetCurrentLargestMessagePayload();
2188 }
2189
GetGuaranteedLargestMessagePayload() const2190 QuicPacketLength QuicSession::GetGuaranteedLargestMessagePayload() const {
2191 return connection_->GetGuaranteedLargestMessagePayload();
2192 }
2193
SendStopSending(uint16_t code,QuicStreamId stream_id)2194 void QuicSession::SendStopSending(uint16_t code, QuicStreamId stream_id) {
2195 control_frame_manager_.WriteOrBufferStopSending(code, stream_id);
2196 }
2197
next_outgoing_bidirectional_stream_id() const2198 QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
2199 if (VersionHasIetfQuicFrames(transport_version())) {
2200 return v99_streamid_manager_.next_outgoing_bidirectional_stream_id();
2201 }
2202 return stream_id_manager_.next_outgoing_stream_id();
2203 }
2204
next_outgoing_unidirectional_stream_id() const2205 QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
2206 if (VersionHasIetfQuicFrames(transport_version())) {
2207 return v99_streamid_manager_.next_outgoing_unidirectional_stream_id();
2208 }
2209 return stream_id_manager_.next_outgoing_stream_id();
2210 }
2211
OnMaxStreamsFrame(const QuicMaxStreamsFrame & frame)2212 bool QuicSession::OnMaxStreamsFrame(const QuicMaxStreamsFrame& frame) {
2213 const bool allow_new_streams =
2214 frame.unidirectional
2215 ? v99_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
2216 frame.stream_count)
2217 : v99_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
2218 frame.stream_count);
2219 if (allow_new_streams) {
2220 OnCanCreateNewOutgoingStream(frame.unidirectional);
2221 }
2222
2223 return true;
2224 }
2225
OnStreamsBlockedFrame(const QuicStreamsBlockedFrame & frame)2226 bool QuicSession::OnStreamsBlockedFrame(const QuicStreamsBlockedFrame& frame) {
2227 std::string error_details;
2228 if (v99_streamid_manager_.OnStreamsBlockedFrame(frame, &error_details)) {
2229 return true;
2230 }
2231 connection_->CloseConnection(
2232 QUIC_STREAMS_BLOCKED_ERROR, error_details,
2233 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2234 return false;
2235 }
2236
max_open_incoming_bidirectional_streams() const2237 size_t QuicSession::max_open_incoming_bidirectional_streams() const {
2238 if (VersionHasIetfQuicFrames(transport_version())) {
2239 return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2240 }
2241 return stream_id_manager_.max_open_incoming_streams();
2242 }
2243
max_open_incoming_unidirectional_streams() const2244 size_t QuicSession::max_open_incoming_unidirectional_streams() const {
2245 if (VersionHasIetfQuicFrames(transport_version())) {
2246 return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2247 }
2248 return stream_id_manager_.max_open_incoming_streams();
2249 }
2250
SelectAlpn(const std::vector<quiche::QuicheStringPiece> & alpns) const2251 std::vector<quiche::QuicheStringPiece>::const_iterator QuicSession::SelectAlpn(
2252 const std::vector<quiche::QuicheStringPiece>& alpns) const {
2253 const std::string alpn = AlpnForVersion(connection()->version());
2254 return std::find(alpns.cbegin(), alpns.cend(), alpn);
2255 }
2256
OnAlpnSelected(quiche::QuicheStringPiece alpn)2257 void QuicSession::OnAlpnSelected(quiche::QuicheStringPiece alpn) {
2258 QUIC_DLOG(INFO) << (perspective() == Perspective::IS_SERVER ? "Server: "
2259 : "Client: ")
2260 << "ALPN selected: " << alpn;
2261 }
2262
2263 #undef ENDPOINT // undef for jumbo builds
2264 } // namespace quic
2265