1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/third_party/quiche/src/quic/core/quic_session.h"
6 
7 #include <cstdint>
8 #include <string>
9 #include <utility>
10 
11 #include "net/third_party/quiche/src/quic/core/quic_connection.h"
12 #include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
13 #include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
14 #include "net/third_party/quiche/src/quic/core/quic_types.h"
15 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
16 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
17 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
18 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
19 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
20 #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
21 #include "net/third_party/quiche/src/quic/platform/api/quic_map_util.h"
22 #include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
23 #include "net/third_party/quiche/src/quic/platform/api/quic_server_stats.h"
24 #include "net/third_party/quiche/src/quic/platform/api/quic_stack_trace.h"
25 #include "net/third_party/quiche/src/common/platform/api/quiche_str_cat.h"
26 #include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
27 
28 using spdy::SpdyPriority;
29 
30 namespace quic {
31 
32 namespace {
33 
34 class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
35  public:
ClosedStreamsCleanUpDelegate(QuicSession * session)36   explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
37       : session_(session) {}
38   ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
39   ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
40       delete;
41 
OnAlarm()42   void OnAlarm() override { session_->CleanUpClosedStreams(); }
43 
44  private:
45   QuicSession* session_;
46 };
47 
48 // TODO(renjietang): remove this function once
49 // gfe2_reloadable_flag_quic_write_with_transmission is deprecated.
CountTransmissionTypeFlag(TransmissionType type)50 void CountTransmissionTypeFlag(TransmissionType type) {
51   switch (type) {
52     case NOT_RETRANSMISSION:
53       QUIC_RELOADABLE_FLAG_COUNT_N(quic_write_with_transmission, 1, 4);
54       break;
55     case HANDSHAKE_RETRANSMISSION:
56       QUIC_RELOADABLE_FLAG_COUNT_N(quic_write_with_transmission, 2, 4);
57       break;
58     case LOSS_RETRANSMISSION:
59       QUIC_RELOADABLE_FLAG_COUNT_N(quic_write_with_transmission, 3, 4);
60       break;
61     default:
62       QUIC_RELOADABLE_FLAG_COUNT_N(quic_write_with_transmission, 4, 4);
63   }
64 }
65 
66 }  // namespace
67 
68 #define ENDPOINT \
69   (perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
70 
QuicSession(QuicConnection * connection,Visitor * owner,const QuicConfig & config,const ParsedQuicVersionVector & supported_versions,QuicStreamCount num_expected_unidirectional_static_streams)71 QuicSession::QuicSession(
72     QuicConnection* connection,
73     Visitor* owner,
74     const QuicConfig& config,
75     const ParsedQuicVersionVector& supported_versions,
76     QuicStreamCount num_expected_unidirectional_static_streams)
77     : connection_(connection),
78       perspective_(connection->perspective()),
79       visitor_(owner),
80       write_blocked_streams_(connection->transport_version()),
81       config_(config),
82       stream_id_manager_(perspective(),
83                          connection->transport_version(),
84                          kDefaultMaxStreamsPerConnection,
85                          config_.GetMaxBidirectionalStreamsToSend()),
86       v99_streamid_manager_(perspective(),
87                             connection->version(),
88                             this,
89                             0,
90                             num_expected_unidirectional_static_streams,
91                             config_.GetMaxBidirectionalStreamsToSend(),
92                             config_.GetMaxUnidirectionalStreamsToSend() +
93                                 num_expected_unidirectional_static_streams),
94       num_dynamic_incoming_streams_(0),
95       num_draining_incoming_streams_(0),
96       num_outgoing_static_streams_(0),
97       num_incoming_static_streams_(0),
98       num_locally_closed_incoming_streams_highest_offset_(0),
99       flow_controller_(
100           this,
101           QuicUtils::GetInvalidStreamId(connection->transport_version()),
102           /*is_connection_flow_controller*/ true,
103           connection->version().AllowsLowFlowControlLimits()
104               ? 0
105               : kMinimumFlowControlSendWindow,
106           config_.GetInitialSessionFlowControlWindowToSend(),
107           kSessionReceiveWindowLimit,
108           perspective() == Perspective::IS_SERVER,
109           nullptr),
110       currently_writing_stream_id_(0),
111       goaway_sent_(false),
112       goaway_received_(false),
113       control_frame_manager_(this),
114       last_message_id_(0),
115       datagram_queue_(this),
116       closed_streams_clean_up_alarm_(nullptr),
117       supported_versions_(supported_versions),
118       use_http2_priority_write_scheduler_(false),
119       is_configured_(false),
120       num_expected_unidirectional_static_streams_(
121           num_expected_unidirectional_static_streams),
122       enable_round_robin_scheduling_(false),
123       write_with_transmission_(
124           GetQuicReloadableFlag(quic_write_with_transmission)) {
125   closed_streams_clean_up_alarm_ =
126       QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
127           new ClosedStreamsCleanUpDelegate(this)));
128   if (perspective() == Perspective::IS_SERVER &&
129       connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
130     config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
131   }
132 }
133 
Initialize()134 void QuicSession::Initialize() {
135   connection_->set_visitor(this);
136   connection_->SetSessionNotifier(this);
137   connection_->SetDataProducer(this);
138   connection_->SetFromConfig(config_);
139 
140   // On the server side, version negotiation has been done by the dispatcher,
141   // and the server session is created with the right version.
142   if (perspective() == Perspective::IS_SERVER) {
143     connection_->OnSuccessfulVersionNegotiation();
144   }
145 
146   if (QuicVersionUsesCryptoFrames(transport_version())) {
147     return;
148   }
149 
150   DCHECK_EQ(QuicUtils::GetCryptoStreamId(transport_version()),
151             GetMutableCryptoStream()->id());
152 }
153 
~QuicSession()154 QuicSession::~QuicSession() {
155   QUIC_LOG_IF(WARNING, !zombie_streams_.empty()) << "Still have zombie streams";
156   QUIC_LOG_IF(WARNING, num_locally_closed_incoming_streams_highest_offset() >
157                            stream_id_manager_.max_open_incoming_streams())
158       << "Surprisingly high number of locally closed peer initiated streams"
159          "still waiting for final byte offset: "
160       << num_locally_closed_incoming_streams_highest_offset();
161   QUIC_LOG_IF(WARNING, GetNumLocallyClosedOutgoingStreamsHighestOffset() >
162                            stream_id_manager_.max_open_outgoing_streams())
163       << "Surprisingly high number of locally closed self initiated streams"
164          "still waiting for final byte offset: "
165       << GetNumLocallyClosedOutgoingStreamsHighestOffset();
166 }
167 
PendingStreamOnStreamFrame(const QuicStreamFrame & frame)168 void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
169   DCHECK(VersionUsesHttp3(transport_version()));
170   QuicStreamId stream_id = frame.stream_id;
171 
172   PendingStream* pending = GetOrCreatePendingStream(stream_id);
173 
174   if (!pending) {
175     if (frame.fin) {
176       QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
177       OnFinalByteOffsetReceived(stream_id, final_byte_offset);
178     }
179     return;
180   }
181 
182   pending->OnStreamFrame(frame);
183   if (!connection()->connected()) {
184     return;
185   }
186   if (ProcessPendingStream(pending)) {
187     // The pending stream should now be in the scope of normal streams.
188     DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
189         << "Stream " << stream_id << " not created";
190     pending_stream_map_.erase(stream_id);
191     return;
192   }
193   if (pending->sequencer()->IsClosed()) {
194     ClosePendingStream(stream_id);
195   }
196 }
197 
OnStreamFrame(const QuicStreamFrame & frame)198 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
199   QuicStreamId stream_id = frame.stream_id;
200   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
201     connection()->CloseConnection(
202         QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
203         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
204     return;
205   }
206 
207   if (UsesPendingStreams() &&
208       QuicUtils::GetStreamType(stream_id, perspective(),
209                                IsIncomingStream(stream_id)) ==
210           READ_UNIDIRECTIONAL &&
211       stream_map_.find(stream_id) == stream_map_.end()) {
212     PendingStreamOnStreamFrame(frame);
213     return;
214   }
215 
216   QuicStream* stream = GetOrCreateStream(stream_id);
217 
218   if (!stream) {
219     // The stream no longer exists, but we may still be interested in the
220     // final stream byte offset sent by the peer. A frame with a FIN can give
221     // us this offset.
222     if (frame.fin) {
223       QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
224       OnFinalByteOffsetReceived(stream_id, final_byte_offset);
225     }
226     return;
227   }
228   stream->OnStreamFrame(frame);
229 }
230 
OnCryptoFrame(const QuicCryptoFrame & frame)231 void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
232   GetMutableCryptoStream()->OnCryptoFrame(frame);
233 }
234 
OnStopSendingFrame(const QuicStopSendingFrame & frame)235 void QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
236   // STOP_SENDING is in IETF QUIC only.
237   DCHECK(VersionHasIetfQuicFrames(transport_version()));
238   DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
239 
240   QuicStreamId stream_id = frame.stream_id;
241   // If Stream ID is invalid then close the connection.
242   // TODO(ianswett): This check is redundant to checks for IsClosedStream,
243   // but removing it requires removing multiple DCHECKs.
244   // TODO(ianswett): Multiple QUIC_DVLOGs could be QUIC_PEER_BUGs.
245   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
246     QUIC_DVLOG(1) << ENDPOINT
247                   << "Received STOP_SENDING with invalid stream_id: "
248                   << stream_id << " Closing connection";
249     connection()->CloseConnection(
250         QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
251         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
252     return;
253   }
254 
255   // If stream_id is READ_UNIDIRECTIONAL, close the connection.
256   if (QuicUtils::GetStreamType(stream_id, perspective(),
257                                IsIncomingStream(stream_id)) ==
258       READ_UNIDIRECTIONAL) {
259     QUIC_DVLOG(1) << ENDPOINT
260                   << "Received STOP_SENDING for a read-only stream_id: "
261                   << stream_id << ".";
262     connection()->CloseConnection(
263         QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a read-only stream",
264         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
265     return;
266   }
267 
268   if (visitor_) {
269     visitor_->OnStopSendingReceived(frame);
270   }
271 
272   QuicStream* stream = GetOrCreateStream(stream_id);
273   if (!stream) {
274     // Errors are handled by GetOrCreateStream.
275     return;
276   }
277 
278   if (!stream->OnStopSending(frame.application_error_code)) {
279     return;
280   }
281 
282   // TODO(renjietang): Consider moving those code into the stream.
283   if (connection()->connected()) {
284     MaybeSendRstStreamFrame(
285         stream->id(),
286         static_cast<quic::QuicRstStreamErrorCode>(frame.application_error_code),
287         stream->stream_bytes_written());
288     connection_->OnStreamReset(stream->id(),
289                                static_cast<quic::QuicRstStreamErrorCode>(
290                                    frame.application_error_code));
291   }
292   stream->set_rst_sent(true);
293   stream->CloseWriteSide();
294 }
295 
OnPacketDecrypted(EncryptionLevel level)296 void QuicSession::OnPacketDecrypted(EncryptionLevel level) {
297   GetMutableCryptoStream()->OnPacketDecrypted(level);
298 }
299 
OnOneRttPacketAcknowledged()300 void QuicSession::OnOneRttPacketAcknowledged() {
301   GetMutableCryptoStream()->OnOneRttPacketAcknowledged();
302 }
303 
PendingStreamOnRstStream(const QuicRstStreamFrame & frame)304 void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
305   DCHECK(VersionUsesHttp3(transport_version()));
306   QuicStreamId stream_id = frame.stream_id;
307 
308   PendingStream* pending = GetOrCreatePendingStream(stream_id);
309 
310   if (!pending) {
311     HandleRstOnValidNonexistentStream(frame);
312     return;
313   }
314 
315   pending->OnRstStreamFrame(frame);
316   // Pending stream is currently read only. We can safely close the stream.
317   DCHECK_EQ(READ_UNIDIRECTIONAL,
318             QuicUtils::GetStreamType(pending->id(), perspective(),
319                                      /*peer_initiated = */ true));
320   ClosePendingStream(stream_id);
321 }
322 
OnRstStream(const QuicRstStreamFrame & frame)323 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
324   QuicStreamId stream_id = frame.stream_id;
325   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
326     connection()->CloseConnection(
327         QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
328         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
329     return;
330   }
331 
332   if (VersionHasIetfQuicFrames(transport_version()) &&
333       QuicUtils::GetStreamType(stream_id, perspective(),
334                                IsIncomingStream(stream_id)) ==
335           WRITE_UNIDIRECTIONAL) {
336     connection()->CloseConnection(
337         QUIC_INVALID_STREAM_ID, "Received RESET_STREAM for a write-only stream",
338         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
339     return;
340   }
341 
342   if (visitor_) {
343     visitor_->OnRstStreamReceived(frame);
344   }
345 
346   if (UsesPendingStreams() &&
347       QuicUtils::GetStreamType(stream_id, perspective(),
348                                IsIncomingStream(stream_id)) ==
349           READ_UNIDIRECTIONAL &&
350       stream_map_.find(stream_id) == stream_map_.end()) {
351     PendingStreamOnRstStream(frame);
352     return;
353   }
354 
355   QuicStream* stream = GetOrCreateStream(stream_id);
356 
357   if (!stream) {
358     HandleRstOnValidNonexistentStream(frame);
359     return;  // Errors are handled by GetOrCreateStream.
360   }
361   stream->OnStreamReset(frame);
362 }
363 
OnGoAway(const QuicGoAwayFrame &)364 void QuicSession::OnGoAway(const QuicGoAwayFrame& /*frame*/) {
365   goaway_received_ = true;
366 }
367 
OnMessageReceived(quiche::QuicheStringPiece message)368 void QuicSession::OnMessageReceived(quiche::QuicheStringPiece message) {
369   QUIC_DVLOG(1) << ENDPOINT << "Received message, length: " << message.length()
370                 << ", " << message;
371 }
372 
OnHandshakeDoneReceived()373 void QuicSession::OnHandshakeDoneReceived() {
374   QUIC_DVLOG(1) << ENDPOINT << "OnHandshakeDoneReceived";
375   GetMutableCryptoStream()->OnHandshakeDoneReceived();
376 }
377 
378 // static
RecordConnectionCloseAtServer(QuicErrorCode error,ConnectionCloseSource source)379 void QuicSession::RecordConnectionCloseAtServer(QuicErrorCode error,
380                                                 ConnectionCloseSource source) {
381   if (error != QUIC_NO_ERROR) {
382     if (source == ConnectionCloseSource::FROM_SELF) {
383       QUIC_SERVER_HISTOGRAM_ENUM(
384           "quic_server_connection_close_errors", error, QUIC_LAST_ERROR,
385           "QuicErrorCode for server-closed connections.");
386     } else {
387       QUIC_SERVER_HISTOGRAM_ENUM(
388           "quic_client_connection_close_errors", error, QUIC_LAST_ERROR,
389           "QuicErrorCode for client-closed connections.");
390     }
391   }
392 }
393 
OnConnectionClosed(const QuicConnectionCloseFrame & frame,ConnectionCloseSource source)394 void QuicSession::OnConnectionClosed(const QuicConnectionCloseFrame& frame,
395                                      ConnectionCloseSource source) {
396   DCHECK(!connection_->connected());
397   if (perspective() == Perspective::IS_SERVER) {
398     RecordConnectionCloseAtServer(frame.quic_error_code, source);
399   }
400 
401   if (on_closed_frame_.extracted_error_code == QUIC_NO_ERROR) {
402     // Save all of the connection close information
403     on_closed_frame_ = frame;
404   }
405 
406   // Copy all non static streams in a new map for the ease of deleting.
407   QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
408   for (const auto& it : stream_map_) {
409     if (!it.second->is_static()) {
410       non_static_streams[it.first] = it.second.get();
411     }
412   }
413   for (const auto& it : non_static_streams) {
414     QuicStreamId id = it.first;
415     it.second->OnConnectionClosed(frame.quic_error_code, source);
416     if (stream_map_.find(id) != stream_map_.end()) {
417       QUIC_BUG << ENDPOINT << "Stream " << id
418                << " failed to close under OnConnectionClosed";
419       CloseStream(id);
420     }
421   }
422 
423   // Cleanup zombie stream map on connection close.
424   while (!zombie_streams_.empty()) {
425     ZombieStreamMap::iterator it = zombie_streams_.begin();
426     closed_streams_.push_back(std::move(it->second));
427     zombie_streams_.erase(it);
428   }
429 
430   closed_streams_clean_up_alarm_->Cancel();
431 
432   if (visitor_) {
433     visitor_->OnConnectionClosed(connection_->connection_id(),
434                                  frame.extracted_error_code,
435                                  frame.error_details, source);
436   }
437 }
438 
OnWriteBlocked()439 void QuicSession::OnWriteBlocked() {
440   if (!connection_->connected()) {
441     return;
442   }
443   if (visitor_) {
444     visitor_->OnWriteBlocked(connection_);
445   }
446 }
447 
OnSuccessfulVersionNegotiation(const ParsedQuicVersion &)448 void QuicSession::OnSuccessfulVersionNegotiation(
449     const ParsedQuicVersion& /*version*/) {}
450 
OnPacketReceived(const QuicSocketAddress &,const QuicSocketAddress & peer_address,bool is_connectivity_probe)451 void QuicSession::OnPacketReceived(const QuicSocketAddress& /*self_address*/,
452                                    const QuicSocketAddress& peer_address,
453                                    bool is_connectivity_probe) {
454   if (is_connectivity_probe && perspective() == Perspective::IS_SERVER) {
455     // Server only sends back a connectivity probe after received a
456     // connectivity probe from a new peer address.
457     connection_->SendConnectivityProbingResponsePacket(peer_address);
458   }
459 }
460 
OnPathDegrading()461 void QuicSession::OnPathDegrading() {}
462 
AllowSelfAddressChange() const463 bool QuicSession::AllowSelfAddressChange() const {
464   return false;
465 }
466 
OnForwardProgressConfirmed()467 void QuicSession::OnForwardProgressConfirmed() {}
468 
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)469 void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
470   // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
471   // assume that it still exists.
472   QuicStreamId stream_id = frame.stream_id;
473   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
474     // This is a window update that applies to the connection, rather than an
475     // individual stream.
476     QUIC_DVLOG(1) << ENDPOINT
477                   << "Received connection level flow control window "
478                      "update with max data: "
479                   << frame.max_data;
480     flow_controller_.UpdateSendWindowOffset(frame.max_data);
481     return;
482   }
483 
484   if (VersionHasIetfQuicFrames(transport_version()) &&
485       QuicUtils::GetStreamType(stream_id, perspective(),
486                                IsIncomingStream(stream_id)) ==
487           READ_UNIDIRECTIONAL) {
488     connection()->CloseConnection(
489         QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
490         "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.",
491         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
492     return;
493   }
494 
495   QuicStream* stream = GetOrCreateStream(stream_id);
496   if (stream != nullptr) {
497     stream->OnWindowUpdateFrame(frame);
498   }
499 }
500 
OnBlockedFrame(const QuicBlockedFrame & frame)501 void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
502   // TODO(rjshade): Compare our flow control receive windows for specified
503   //                streams: if we have a large window then maybe something
504   //                had gone wrong with the flow control accounting.
505   QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
506                   << frame.stream_id;
507 }
508 
CheckStreamNotBusyLooping(QuicStream * stream,uint64_t previous_bytes_written,bool previous_fin_sent)509 bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
510                                             uint64_t previous_bytes_written,
511                                             bool previous_fin_sent) {
512   if (  // Stream should not be closed.
513       !stream->write_side_closed() &&
514       // Not connection flow control blocked.
515       !flow_controller_.IsBlocked() &&
516       // Detect lack of forward progress.
517       previous_bytes_written == stream->stream_bytes_written() &&
518       previous_fin_sent == stream->fin_sent()) {
519     stream->set_busy_counter(stream->busy_counter() + 1);
520     QUIC_DVLOG(1) << ENDPOINT << "Suspected busy loop on stream id "
521                   << stream->id() << " stream_bytes_written "
522                   << stream->stream_bytes_written() << " fin "
523                   << stream->fin_sent() << " count " << stream->busy_counter();
524     // Wait a few iterations before firing, the exact count is
525     // arbitrary, more than a few to cover a few test-only false
526     // positives.
527     if (stream->busy_counter() > 20) {
528       QUIC_LOG(ERROR) << ENDPOINT << "Detected busy loop on stream id "
529                       << stream->id() << " stream_bytes_written "
530                       << stream->stream_bytes_written() << " fin "
531                       << stream->fin_sent();
532       return false;
533     }
534   } else {
535     stream->set_busy_counter(0);
536   }
537   return true;
538 }
539 
CheckStreamWriteBlocked(QuicStream * stream) const540 bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
541   if (!stream->write_side_closed() && stream->HasBufferedData() &&
542       !stream->flow_controller()->IsBlocked() &&
543       !write_blocked_streams_.IsStreamBlocked(stream->id())) {
544     QUIC_DLOG(ERROR) << ENDPOINT << "stream " << stream->id()
545                      << " has buffered " << stream->BufferedDataBytes()
546                      << " bytes, and is not flow control blocked, "
547                         "but it is not in the write block list.";
548     return false;
549   }
550   return true;
551 }
552 
OnCanWrite()553 void QuicSession::OnCanWrite() {
554   if (!RetransmitLostData()) {
555     // Cannot finish retransmitting lost data, connection is write blocked.
556     QUIC_DVLOG(1) << ENDPOINT
557                   << "Cannot finish retransmitting lost data, connection is "
558                      "write blocked.";
559     return;
560   }
561   if (!write_with_transmission_) {
562     SetTransmissionType(NOT_RETRANSMISSION);
563   }
564   // We limit the number of writes to the number of pending streams. If more
565   // streams become pending, WillingAndAbleToWrite will be true, which will
566   // cause the connection to request resumption before yielding to other
567   // connections.
568   // If we are connection level flow control blocked, then only allow the
569   // crypto and headers streams to try writing as all other streams will be
570   // blocked.
571   size_t num_writes = flow_controller_.IsBlocked()
572                           ? write_blocked_streams_.NumBlockedSpecialStreams()
573                           : write_blocked_streams_.NumBlockedStreams();
574   if (num_writes == 0 && !control_frame_manager_.WillingToWrite() &&
575       datagram_queue_.empty() &&
576       (!QuicVersionUsesCryptoFrames(transport_version()) ||
577        !GetCryptoStream()->HasBufferedCryptoFrames())) {
578     return;
579   }
580 
581   QuicConnection::ScopedPacketFlusher flusher(connection_);
582   if (QuicVersionUsesCryptoFrames(transport_version())) {
583     QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
584     if (crypto_stream->HasBufferedCryptoFrames()) {
585       crypto_stream->WriteBufferedCryptoFrames();
586     }
587     if (crypto_stream->HasBufferedCryptoFrames()) {
588       // Cannot finish writing buffered crypto frames, connection is write
589       // blocked.
590       return;
591     }
592   }
593   if (control_frame_manager_.WillingToWrite()) {
594     control_frame_manager_.OnCanWrite();
595   }
596   // TODO(b/147146815): this makes all datagrams go before stream data.  We
597   // should have a better priority scheme for this.
598   if (!datagram_queue_.empty()) {
599     size_t written = datagram_queue_.SendDatagrams();
600     QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams";
601     if (!datagram_queue_.empty()) {
602       return;
603     }
604   }
605   std::vector<QuicStreamId> last_writing_stream_ids;
606   for (size_t i = 0; i < num_writes; ++i) {
607     if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() ||
608           write_blocked_streams_.HasWriteBlockedDataStreams())) {
609       // Writing one stream removed another!? Something's broken.
610       QUIC_BUG << "WriteBlockedStream is missing, num_writes: " << num_writes
611                << ", finished_writes: " << i
612                << ", connected: " << connection_->connected()
613                << ", connection level flow control blocked: "
614                << flow_controller_.IsBlocked() << ", scheduler type: "
615                << spdy::WriteSchedulerTypeToString(
616                       write_blocked_streams_.scheduler_type());
617       for (QuicStreamId id : last_writing_stream_ids) {
618         QUIC_LOG(WARNING) << "last_writing_stream_id: " << id;
619       }
620       connection_->CloseConnection(QUIC_INTERNAL_ERROR,
621                                    "WriteBlockedStream is missing",
622                                    ConnectionCloseBehavior::SILENT_CLOSE);
623       return;
624     }
625     if (!connection_->CanWriteStreamData()) {
626       return;
627     }
628     currently_writing_stream_id_ = write_blocked_streams_.PopFront();
629     last_writing_stream_ids.push_back(currently_writing_stream_id_);
630     QUIC_DVLOG(1) << ENDPOINT << "Removing stream "
631                   << currently_writing_stream_id_ << " from write-blocked list";
632     QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
633     if (stream != nullptr && !stream->flow_controller()->IsBlocked()) {
634       // If the stream can't write all bytes it'll re-add itself to the blocked
635       // list.
636       uint64_t previous_bytes_written = stream->stream_bytes_written();
637       bool previous_fin_sent = stream->fin_sent();
638       QUIC_DVLOG(1) << ENDPOINT << "stream " << stream->id()
639                     << " bytes_written " << previous_bytes_written << " fin "
640                     << previous_fin_sent;
641       stream->OnCanWrite();
642       DCHECK(CheckStreamWriteBlocked(stream));
643       DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
644                                        previous_fin_sent));
645     }
646     currently_writing_stream_id_ = 0;
647   }
648 }
649 
SendProbingData()650 bool QuicSession::SendProbingData() {
651   if (connection()->sent_packet_manager().MaybeRetransmitOldestPacket(
652           PROBING_RETRANSMISSION)) {
653     return true;
654   }
655   return false;
656 }
657 
WillingAndAbleToWrite() const658 bool QuicSession::WillingAndAbleToWrite() const {
659   // Schedule a write when:
660   // 1) control frame manager has pending or new control frames, or
661   // 2) any stream has pending retransmissions, or
662   // 3) If the crypto or headers streams are blocked, or
663   // 4) connection is not flow control blocked and there are write blocked
664   // streams.
665   if (QuicVersionUsesCryptoFrames(transport_version()) &&
666       HasPendingHandshake()) {
667     return true;
668   }
669   return control_frame_manager_.WillingToWrite() ||
670          !streams_with_pending_retransmission_.empty() ||
671          write_blocked_streams_.HasWriteBlockedSpecialStream() ||
672          (!flow_controller_.IsBlocked() &&
673           write_blocked_streams_.HasWriteBlockedDataStreams());
674 }
675 
HasPendingHandshake() const676 bool QuicSession::HasPendingHandshake() const {
677   if (QuicVersionUsesCryptoFrames(transport_version())) {
678     return GetCryptoStream()->HasPendingCryptoRetransmission() ||
679            GetCryptoStream()->HasBufferedCryptoFrames();
680   }
681   return QuicContainsKey(streams_with_pending_retransmission_,
682                          QuicUtils::GetCryptoStreamId(transport_version())) ||
683          write_blocked_streams_.IsStreamBlocked(
684              QuicUtils::GetCryptoStreamId(transport_version()));
685 }
686 
GetNumOpenDynamicStreams() const687 uint64_t QuicSession::GetNumOpenDynamicStreams() const {
688   return stream_map_.size() - draining_streams_.size() +
689          locally_closed_streams_highest_offset_.size() -
690          num_incoming_static_streams_ - num_outgoing_static_streams_;
691 }
692 
ProcessUdpPacket(const QuicSocketAddress & self_address,const QuicSocketAddress & peer_address,const QuicReceivedPacket & packet)693 void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
694                                    const QuicSocketAddress& peer_address,
695                                    const QuicReceivedPacket& packet) {
696   connection_->ProcessUdpPacket(self_address, peer_address, packet);
697 }
698 
WritevData(QuicStreamId id,size_t write_length,QuicStreamOffset offset,StreamSendingState state,TransmissionType type,quiche::QuicheOptional<EncryptionLevel> level)699 QuicConsumedData QuicSession::WritevData(
700     QuicStreamId id,
701     size_t write_length,
702     QuicStreamOffset offset,
703     StreamSendingState state,
704     TransmissionType type,
705     quiche::QuicheOptional<EncryptionLevel> level) {
706   DCHECK(connection_->connected())
707       << ENDPOINT << "Try to write stream data when connection is closed.";
708   if (!IsEncryptionEstablished() &&
709       !QuicUtils::IsCryptoStreamId(transport_version(), id)) {
710     // Do not let streams write without encryption. The calling stream will end
711     // up write blocked until OnCanWrite is next called.
712     QUIC_BUG << ENDPOINT << "Try to send data of stream " << id
713              << " before encryption is established.";
714     return QuicConsumedData(0, false);
715   }
716 
717   if (write_with_transmission_) {
718     SetTransmissionType(type);
719     CountTransmissionTypeFlag(type);
720   }
721   const auto current_level = connection()->encryption_level();
722   if (level.has_value()) {
723     connection()->SetDefaultEncryptionLevel(level.value());
724   }
725 
726   QuicConsumedData data =
727       connection_->SendStreamData(id, write_length, offset, state);
728   if (type == NOT_RETRANSMISSION) {
729     // This is new stream data.
730     write_blocked_streams_.UpdateBytesForStream(id, data.bytes_consumed);
731   }
732 
733   // Restore the encryption level.
734   if (level.has_value()) {
735     connection()->SetDefaultEncryptionLevel(current_level);
736   }
737 
738   return data;
739 }
740 
SendCryptoData(EncryptionLevel level,size_t write_length,QuicStreamOffset offset,TransmissionType type)741 size_t QuicSession::SendCryptoData(EncryptionLevel level,
742                                    size_t write_length,
743                                    QuicStreamOffset offset,
744                                    TransmissionType type) {
745   DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
746   if (write_with_transmission_) {
747     SetTransmissionType(type);
748     CountTransmissionTypeFlag(type);
749   }
750   const auto current_level = connection()->encryption_level();
751   connection_->SetDefaultEncryptionLevel(level);
752   const auto bytes_consumed =
753       connection_->SendCryptoData(level, write_length, offset);
754   // Restores encryption level.
755   connection_->SetDefaultEncryptionLevel(current_level);
756   return bytes_consumed;
757 }
758 
WriteControlFrame(const QuicFrame & frame,TransmissionType type)759 bool QuicSession::WriteControlFrame(const QuicFrame& frame,
760                                     TransmissionType type) {
761   if (write_with_transmission_) {
762     SetTransmissionType(type);
763     CountTransmissionTypeFlag(type);
764   }
765   return connection_->SendControlFrame(frame);
766 }
767 
SendRstStream(QuicStreamId id,QuicRstStreamErrorCode error,QuicStreamOffset bytes_written)768 void QuicSession::SendRstStream(QuicStreamId id,
769                                 QuicRstStreamErrorCode error,
770                                 QuicStreamOffset bytes_written) {
771   if (connection()->connected()) {
772     QuicConnection::ScopedPacketFlusher flusher(connection());
773     MaybeSendRstStreamFrame(id, error, bytes_written);
774     MaybeSendStopSendingFrame(id, error);
775 
776     connection_->OnStreamReset(id, error);
777   }
778 
779   if (error != QUIC_STREAM_NO_ERROR && QuicContainsKey(zombie_streams_, id)) {
780     OnStreamDoneWaitingForAcks(id);
781     return;
782   }
783   CloseStreamInner(id, true);
784 }
785 
MaybeSendRstStreamFrame(QuicStreamId id,QuicRstStreamErrorCode error,QuicStreamOffset bytes_written)786 void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
787                                           QuicRstStreamErrorCode error,
788                                           QuicStreamOffset bytes_written) {
789   DCHECK(connection()->connected());
790   if (!VersionHasIetfQuicFrames(transport_version()) ||
791       QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id)) !=
792           READ_UNIDIRECTIONAL) {
793     control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
794   }
795 }
796 
MaybeSendStopSendingFrame(QuicStreamId id,QuicRstStreamErrorCode error)797 void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id,
798                                             QuicRstStreamErrorCode error) {
799   DCHECK(connection()->connected());
800   if (VersionHasIetfQuicFrames(transport_version()) &&
801       QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id)) !=
802           WRITE_UNIDIRECTIONAL) {
803     control_frame_manager_.WriteOrBufferStopSending(error, id);
804   }
805 }
806 
SendGoAway(QuicErrorCode error_code,const std::string & reason)807 void QuicSession::SendGoAway(QuicErrorCode error_code,
808                              const std::string& reason) {
809   // GOAWAY frame is not supported in v99.
810   DCHECK(!VersionHasIetfQuicFrames(transport_version()));
811   if (goaway_sent_) {
812     return;
813   }
814   goaway_sent_ = true;
815   control_frame_manager_.WriteOrBufferGoAway(
816       error_code, stream_id_manager_.largest_peer_created_stream_id(), reason);
817 }
818 
SendBlocked(QuicStreamId id)819 void QuicSession::SendBlocked(QuicStreamId id) {
820   control_frame_manager_.WriteOrBufferBlocked(id);
821 }
822 
SendWindowUpdate(QuicStreamId id,QuicStreamOffset byte_offset)823 void QuicSession::SendWindowUpdate(QuicStreamId id,
824                                    QuicStreamOffset byte_offset) {
825   control_frame_manager_.WriteOrBufferWindowUpdate(id, byte_offset);
826 }
827 
OnStreamError(QuicErrorCode error_code,std::string error_details)828 void QuicSession::OnStreamError(QuicErrorCode error_code,
829                                 std::string error_details) {
830   connection_->CloseConnection(
831       error_code, error_details,
832       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
833 }
834 
SendMaxStreams(QuicStreamCount stream_count,bool unidirectional)835 void QuicSession::SendMaxStreams(QuicStreamCount stream_count,
836                                  bool unidirectional) {
837   if (!is_configured_) {
838     QUIC_BUG << "Try to send max streams before config negotiated.";
839     return;
840   }
841   control_frame_manager_.WriteOrBufferMaxStreams(stream_count, unidirectional);
842 }
843 
CloseStream(QuicStreamId stream_id)844 void QuicSession::CloseStream(QuicStreamId stream_id) {
845   CloseStreamInner(stream_id, false);
846 }
847 
InsertLocallyClosedStreamsHighestOffset(const QuicStreamId id,QuicStreamOffset offset)848 void QuicSession::InsertLocallyClosedStreamsHighestOffset(
849     const QuicStreamId id,
850     QuicStreamOffset offset) {
851   locally_closed_streams_highest_offset_[id] = offset;
852   if (IsIncomingStream(id)) {
853     ++num_locally_closed_incoming_streams_highest_offset_;
854   }
855 }
856 
CloseStreamInner(QuicStreamId stream_id,bool rst_sent)857 void QuicSession::CloseStreamInner(QuicStreamId stream_id, bool rst_sent) {
858   QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
859 
860   StreamMap::iterator it = stream_map_.find(stream_id);
861   if (it == stream_map_.end()) {
862     // When CloseStreamInner has been called recursively (via
863     // QuicStream::OnClose), the stream will already have been deleted
864     // from stream_map_, so return immediately.
865     QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
866     return;
867   }
868   QuicStream* stream = it->second.get();
869   if (stream->is_static()) {
870     QUIC_DVLOG(1) << ENDPOINT
871                   << "Try to close a static stream, id: " << stream_id
872                   << " Closing connection";
873     connection()->CloseConnection(
874         QUIC_INVALID_STREAM_ID, "Try to close a static stream",
875         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
876     return;
877   }
878   StreamType type = stream->type();
879 
880   // Tell the stream that a RST has been sent.
881   if (rst_sent) {
882     stream->set_rst_sent(true);
883   }
884 
885   if (stream->IsWaitingForAcks()) {
886     zombie_streams_[stream->id()] = std::move(it->second);
887   } else {
888     // Clean up the stream since it is no longer waiting for acks.
889     streams_waiting_for_acks_.erase(stream->id());
890     closed_streams_.push_back(std::move(it->second));
891     // Do not retransmit data of a closed stream.
892     streams_with_pending_retransmission_.erase(stream_id);
893     if (!closed_streams_clean_up_alarm_->IsSet()) {
894       closed_streams_clean_up_alarm_->Set(
895           connection_->clock()->ApproximateNow());
896     }
897   }
898 
899   // If we haven't received a FIN or RST for this stream, we need to keep track
900   // of the how many bytes the stream's flow controller believes it has
901   // received, for accurate connection level flow control accounting.
902   const bool had_fin_or_rst = stream->HasReceivedFinalOffset();
903   if (!had_fin_or_rst) {
904     InsertLocallyClosedStreamsHighestOffset(
905         stream_id, stream->flow_controller()->highest_received_byte_offset());
906   }
907   stream_map_.erase(it);
908   if (IsIncomingStream(stream_id)) {
909     --num_dynamic_incoming_streams_;
910   }
911 
912   const bool stream_was_draining =
913       draining_streams_.find(stream_id) != draining_streams_.end();
914   if (stream_was_draining) {
915     if (IsIncomingStream(stream_id)) {
916       --num_draining_incoming_streams_;
917     }
918     draining_streams_.erase(stream_id);
919   } else if (VersionHasIetfQuicFrames(transport_version())) {
920     // Stream was not draining, but we did have a fin or rst, so we can now
921     // free the stream ID if version 99.
922     if (had_fin_or_rst && connection_->connected()) {
923       // Do not bother informing stream ID manager if connection is closed.
924       v99_streamid_manager_.OnStreamClosed(stream_id);
925     }
926   }
927 
928   stream->OnClose();
929 
930   if (!stream_was_draining && !IsIncomingStream(stream_id) && had_fin_or_rst &&
931       !VersionHasIetfQuicFrames(transport_version())) {
932     // Streams that first became draining already called OnCanCreate...
933     // This covers the case where the stream went directly to being closed.
934     OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
935   }
936 }
937 
ClosePendingStream(QuicStreamId stream_id)938 void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
939   QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
940 
941   pending_stream_map_.erase(stream_id);
942   if (VersionHasIetfQuicFrames(transport_version()) &&
943       connection_->connected()) {
944     v99_streamid_manager_.OnStreamClosed(stream_id);
945   }
946 }
947 
OnFinalByteOffsetReceived(QuicStreamId stream_id,QuicStreamOffset final_byte_offset)948 void QuicSession::OnFinalByteOffsetReceived(
949     QuicStreamId stream_id,
950     QuicStreamOffset final_byte_offset) {
951   auto it = locally_closed_streams_highest_offset_.find(stream_id);
952   if (it == locally_closed_streams_highest_offset_.end()) {
953     return;
954   }
955 
956   QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
957                 << final_byte_offset << " for stream " << stream_id;
958   QuicByteCount offset_diff = final_byte_offset - it->second;
959   if (flow_controller_.UpdateHighestReceivedOffset(
960           flow_controller_.highest_received_byte_offset() + offset_diff)) {
961     // If the final offset violates flow control, close the connection now.
962     if (flow_controller_.FlowControlViolation()) {
963       connection_->CloseConnection(
964           QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
965           "Connection level flow control violation",
966           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
967       return;
968     }
969   }
970 
971   flow_controller_.AddBytesConsumed(offset_diff);
972   locally_closed_streams_highest_offset_.erase(it);
973   if (IsIncomingStream(stream_id)) {
974     --num_locally_closed_incoming_streams_highest_offset_;
975     if (VersionHasIetfQuicFrames(transport_version())) {
976       v99_streamid_manager_.OnStreamClosed(stream_id);
977     }
978   } else if (!VersionHasIetfQuicFrames(transport_version())) {
979     OnCanCreateNewOutgoingStream(false);
980   }
981 }
982 
IsEncryptionEstablished() const983 bool QuicSession::IsEncryptionEstablished() const {
984   if (GetCryptoStream() == nullptr) {
985     return false;
986   }
987   return GetCryptoStream()->encryption_established();
988 }
989 
OneRttKeysAvailable() const990 bool QuicSession::OneRttKeysAvailable() const {
991   if (GetCryptoStream() == nullptr) {
992     return false;
993   }
994   return GetCryptoStream()->one_rtt_keys_available();
995 }
996 
OnConfigNegotiated()997 void QuicSession::OnConfigNegotiated() {
998   QUIC_DVLOG(1) << ENDPOINT << "OnConfigNegotiated";
999   connection_->SetFromConfig(config_);
1000 
1001   if (VersionHasIetfQuicFrames(transport_version())) {
1002     uint32_t max_streams = 0;
1003     if (config_.HasReceivedMaxBidirectionalStreams()) {
1004       max_streams = config_.ReceivedMaxBidirectionalStreams();
1005     }
1006     QUIC_DVLOG(1) << ENDPOINT
1007                   << "Setting Bidirectional outgoing_max_streams_ to "
1008                   << max_streams;
1009     if (v99_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
1010             max_streams)) {
1011       OnCanCreateNewOutgoingStream(/*unidirectional = */ false);
1012     }
1013 
1014     max_streams = 0;
1015     if (config_.HasReceivedMaxUnidirectionalStreams()) {
1016       max_streams = config_.ReceivedMaxUnidirectionalStreams();
1017     }
1018     if (max_streams < num_expected_unidirectional_static_streams_) {
1019       // TODO(ianswett): Change this to an application error for HTTP/3.
1020       QUIC_DLOG(ERROR) << "Received unidirectional stream limit of "
1021                        << max_streams << " < "
1022                        << num_expected_unidirectional_static_streams_;
1023       connection_->CloseConnection(
1024           QUIC_MAX_STREAMS_ERROR, "New unidirectional stream limit is too low.",
1025           ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1026     }
1027     QUIC_DVLOG(1) << ENDPOINT
1028                   << "Setting Unidirectional outgoing_max_streams_ to "
1029                   << max_streams;
1030     if (v99_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
1031             max_streams)) {
1032       OnCanCreateNewOutgoingStream(/*unidirectional = */ true);
1033     }
1034   } else {
1035     uint32_t max_streams = 0;
1036     if (config_.HasReceivedMaxBidirectionalStreams()) {
1037       max_streams = config_.ReceivedMaxBidirectionalStreams();
1038     }
1039     QUIC_DVLOG(1) << ENDPOINT << "Setting max_open_outgoing_streams_ to "
1040                   << max_streams;
1041     stream_id_manager_.set_max_open_outgoing_streams(max_streams);
1042   }
1043 
1044   if (perspective() == Perspective::IS_SERVER) {
1045     if (config_.HasReceivedConnectionOptions()) {
1046       // The following variations change the initial receive flow control
1047       // window sizes.
1048       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
1049         AdjustInitialFlowControlWindows(64 * 1024);
1050       }
1051       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
1052         AdjustInitialFlowControlWindows(128 * 1024);
1053       }
1054       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
1055         AdjustInitialFlowControlWindows(256 * 1024);
1056       }
1057       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
1058         AdjustInitialFlowControlWindows(512 * 1024);
1059       }
1060       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
1061         AdjustInitialFlowControlWindows(1024 * 1024);
1062       }
1063       if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kH2PR) &&
1064           !VersionHasIetfQuicFrames(transport_version())) {
1065         // Enable HTTP2 (tree-style) priority write scheduler.
1066         use_http2_priority_write_scheduler_ =
1067             write_blocked_streams_.SwitchWriteScheduler(
1068                 spdy::WriteSchedulerType::HTTP2, transport_version());
1069       } else if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kFIFO)) {
1070         // Enable FIFO write scheduler.
1071         write_blocked_streams_.SwitchWriteScheduler(
1072             spdy::WriteSchedulerType::FIFO, transport_version());
1073       } else if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kLIFO)) {
1074         // Enable LIFO write scheduler.
1075         write_blocked_streams_.SwitchWriteScheduler(
1076             spdy::WriteSchedulerType::LIFO, transport_version());
1077       } else if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kRRWS) &&
1078                  write_blocked_streams_.scheduler_type() ==
1079                      spdy::WriteSchedulerType::SPDY) {
1080         enable_round_robin_scheduling_ = true;
1081       }
1082     }
1083 
1084     config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
1085   }
1086 
1087   if (VersionHasIetfQuicFrames(transport_version())) {
1088     v99_streamid_manager_.SetMaxOpenIncomingBidirectionalStreams(
1089         config_.GetMaxBidirectionalStreamsToSend());
1090     v99_streamid_manager_.SetMaxOpenIncomingUnidirectionalStreams(
1091         config_.GetMaxUnidirectionalStreamsToSend());
1092   } else {
1093     // A small number of additional incoming streams beyond the limit should be
1094     // allowed. This helps avoid early connection termination when FIN/RSTs for
1095     // old streams are lost or arrive out of order.
1096     // Use a minimum number of additional streams, or a percentage increase,
1097     // whichever is larger.
1098     uint32_t max_incoming_streams_to_send =
1099         config_.GetMaxBidirectionalStreamsToSend();
1100     uint32_t max_incoming_streams =
1101         std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
1102                  static_cast<uint32_t>(max_incoming_streams_to_send *
1103                                        kMaxStreamsMultiplier));
1104     stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
1105   }
1106 
1107   if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
1108     // When using IETF-style TLS transport parameters, inform existing streams
1109     // of new flow-control limits.
1110     if (config_.HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
1111       OnNewStreamOutgoingBidirectionalFlowControlWindow(
1112           config_.ReceivedInitialMaxStreamDataBytesOutgoingBidirectional());
1113     }
1114     if (config_.HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
1115       OnNewStreamIncomingBidirectionalFlowControlWindow(
1116           config_.ReceivedInitialMaxStreamDataBytesIncomingBidirectional());
1117     }
1118     if (config_.HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
1119       OnNewStreamUnidirectionalFlowControlWindow(
1120           config_.ReceivedInitialMaxStreamDataBytesUnidirectional());
1121     }
1122   } else {  // The version uses Google QUIC Crypto.
1123     if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
1124       // Streams which were created before the SHLO was received (0-RTT
1125       // requests) are now informed of the peer's initial flow control window.
1126       OnNewStreamFlowControlWindow(
1127           config_.ReceivedInitialStreamFlowControlWindowBytes());
1128     }
1129   }
1130 
1131   if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
1132     OnNewSessionFlowControlWindow(
1133         config_.ReceivedInitialSessionFlowControlWindowBytes());
1134   }
1135   is_configured_ = true;
1136   connection()->OnConfigNegotiated();
1137 
1138   // Ask flow controllers to try again since the config could have unblocked us.
1139   if (connection_->version().AllowsLowFlowControlLimits()) {
1140     OnCanWrite();
1141   }
1142 }
1143 
AdjustInitialFlowControlWindows(size_t stream_window)1144 void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
1145   const float session_window_multiplier =
1146       config_.GetInitialStreamFlowControlWindowToSend()
1147           ? static_cast<float>(
1148                 config_.GetInitialSessionFlowControlWindowToSend()) /
1149                 config_.GetInitialStreamFlowControlWindowToSend()
1150           : 1.5;
1151 
1152   QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
1153   config_.SetInitialStreamFlowControlWindowToSend(stream_window);
1154 
1155   size_t session_window = session_window_multiplier * stream_window;
1156   QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
1157                 << session_window;
1158   config_.SetInitialSessionFlowControlWindowToSend(session_window);
1159   flow_controller_.UpdateReceiveWindowSize(session_window);
1160   // Inform all existing streams about the new window.
1161   for (auto const& kv : stream_map_) {
1162     kv.second->flow_controller()->UpdateReceiveWindowSize(stream_window);
1163   }
1164   if (!QuicVersionUsesCryptoFrames(transport_version())) {
1165     GetMutableCryptoStream()->flow_controller()->UpdateReceiveWindowSize(
1166         stream_window);
1167   }
1168 }
1169 
HandleFrameOnNonexistentOutgoingStream(QuicStreamId stream_id)1170 void QuicSession::HandleFrameOnNonexistentOutgoingStream(
1171     QuicStreamId stream_id) {
1172   DCHECK(!IsClosedStream(stream_id));
1173   // Received a frame for a locally-created stream that is not currently
1174   // active. This is an error.
1175   if (VersionHasIetfQuicFrames(transport_version())) {
1176     connection()->CloseConnection(
1177         QUIC_HTTP_STREAM_WRONG_DIRECTION, "Data for nonexistent stream",
1178         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1179     return;
1180   }
1181   connection()->CloseConnection(
1182       QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
1183       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1184 }
1185 
HandleRstOnValidNonexistentStream(const QuicRstStreamFrame & frame)1186 void QuicSession::HandleRstOnValidNonexistentStream(
1187     const QuicRstStreamFrame& frame) {
1188   // If the stream is neither originally in active streams nor created in
1189   // GetOrCreateStream(), it could be a closed stream in which case its
1190   // final received byte offset need to be updated.
1191   if (IsClosedStream(frame.stream_id)) {
1192     // The RST frame contains the final byte offset for the stream: we can now
1193     // update the connection level flow controller if needed.
1194     OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
1195   }
1196 }
1197 
OnNewStreamFlowControlWindow(QuicStreamOffset new_window)1198 void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
1199   QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamFlowControlWindow " << new_window;
1200   if (new_window < kMinimumFlowControlSendWindow &&
1201       !connection_->version().AllowsLowFlowControlLimits()) {
1202     QUIC_LOG_FIRST_N(ERROR, 1)
1203         << "Peer sent us an invalid stream flow control send window: "
1204         << new_window << ", below minimum: " << kMinimumFlowControlSendWindow;
1205     connection_->CloseConnection(
1206         QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
1207         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1208     return;
1209   }
1210 
1211   // Inform all existing streams about the new window.
1212   for (auto const& kv : stream_map_) {
1213     QUIC_DVLOG(1) << ENDPOINT << "Informing stream " << kv.first
1214                   << " of new stream flow control window " << new_window;
1215     kv.second->UpdateSendWindowOffset(new_window);
1216   }
1217   if (!QuicVersionUsesCryptoFrames(transport_version())) {
1218     QUIC_DVLOG(1)
1219         << ENDPOINT
1220         << "Informing crypto stream of new stream flow control window "
1221         << new_window;
1222     GetMutableCryptoStream()->UpdateSendWindowOffset(new_window);
1223   }
1224 }
1225 
OnNewStreamUnidirectionalFlowControlWindow(QuicStreamOffset new_window)1226 void QuicSession::OnNewStreamUnidirectionalFlowControlWindow(
1227     QuicStreamOffset new_window) {
1228   QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamUnidirectionalFlowControlWindow "
1229                 << new_window;
1230   // Inform all existing outgoing unidirectional streams about the new window.
1231   for (auto const& kv : stream_map_) {
1232     const QuicStreamId id = kv.first;
1233     if (QuicUtils::IsBidirectionalStreamId(id)) {
1234       continue;
1235     }
1236     if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1237                                        perspective())) {
1238       continue;
1239     }
1240     QUIC_DVLOG(1) << ENDPOINT << "Informing unidirectional stream " << id
1241                   << " of new stream flow control window " << new_window;
1242     kv.second->UpdateSendWindowOffset(new_window);
1243   }
1244 }
1245 
OnNewStreamOutgoingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1246 void QuicSession::OnNewStreamOutgoingBidirectionalFlowControlWindow(
1247     QuicStreamOffset new_window) {
1248   QUIC_DVLOG(1) << ENDPOINT
1249                 << "OnNewStreamOutgoingBidirectionalFlowControlWindow "
1250                 << new_window;
1251   // Inform all existing outgoing bidirectional streams about the new window.
1252   for (auto const& kv : stream_map_) {
1253     const QuicStreamId id = kv.first;
1254     if (!QuicUtils::IsBidirectionalStreamId(id)) {
1255       continue;
1256     }
1257     if (!QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1258                                        perspective())) {
1259       continue;
1260     }
1261     QUIC_DVLOG(1) << ENDPOINT << "Informing outgoing bidirectional stream "
1262                   << id << " of new stream flow control window " << new_window;
1263     kv.second->UpdateSendWindowOffset(new_window);
1264   }
1265 }
1266 
OnNewStreamIncomingBidirectionalFlowControlWindow(QuicStreamOffset new_window)1267 void QuicSession::OnNewStreamIncomingBidirectionalFlowControlWindow(
1268     QuicStreamOffset new_window) {
1269   QUIC_DVLOG(1) << ENDPOINT
1270                 << "OnNewStreamIncomingBidirectionalFlowControlWindow "
1271                 << new_window;
1272   // Inform all existing incoming bidirectional streams about the new window.
1273   for (auto const& kv : stream_map_) {
1274     const QuicStreamId id = kv.first;
1275     if (!QuicUtils::IsBidirectionalStreamId(id)) {
1276       continue;
1277     }
1278     if (QuicUtils::IsOutgoingStreamId(connection_->version(), id,
1279                                       perspective())) {
1280       continue;
1281     }
1282     QUIC_DVLOG(1) << ENDPOINT << "Informing incoming bidirectional stream "
1283                   << id << " of new stream flow control window " << new_window;
1284     kv.second->UpdateSendWindowOffset(new_window);
1285   }
1286 }
1287 
OnNewSessionFlowControlWindow(QuicStreamOffset new_window)1288 void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
1289   QUIC_DVLOG(1) << ENDPOINT << "OnNewSessionFlowControlWindow " << new_window;
1290   if (new_window < kMinimumFlowControlSendWindow &&
1291       !connection_->version().AllowsLowFlowControlLimits()) {
1292     QUIC_LOG_FIRST_N(ERROR, 1)
1293         << "Peer sent us an invalid session flow control send window: "
1294         << new_window << ", below default: " << kMinimumFlowControlSendWindow;
1295     connection_->CloseConnection(
1296         QUIC_FLOW_CONTROL_INVALID_WINDOW, "New connection window too low",
1297         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1298     return;
1299   }
1300 
1301   flow_controller_.UpdateSendWindowOffset(new_window);
1302 }
1303 
OnNewDecryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicDecrypter> decrypter,bool set_alternative_decrypter,bool latch_once_used)1304 bool QuicSession::OnNewDecryptionKeyAvailable(
1305     EncryptionLevel level,
1306     std::unique_ptr<QuicDecrypter> decrypter,
1307     bool set_alternative_decrypter,
1308     bool latch_once_used) {
1309   if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3 &&
1310       !connection()->framer().HasEncrypterOfEncryptionLevel(
1311           QuicUtils::GetEncryptionLevel(
1312               QuicUtils::GetPacketNumberSpace(level)))) {
1313     // This should never happen because connection should never decrypt a packet
1314     // while an ACK for it cannot be encrypted.
1315     return false;
1316   }
1317   if (connection()->version().KnowsWhichDecrypterToUse()) {
1318     connection()->InstallDecrypter(level, std::move(decrypter));
1319     return true;
1320   }
1321   if (set_alternative_decrypter) {
1322     connection()->SetAlternativeDecrypter(level, std::move(decrypter),
1323                                           latch_once_used);
1324     return true;
1325   }
1326   connection()->SetDecrypter(level, std::move(decrypter));
1327   return true;
1328 }
1329 
OnNewEncryptionKeyAvailable(EncryptionLevel level,std::unique_ptr<QuicEncrypter> encrypter)1330 void QuicSession::OnNewEncryptionKeyAvailable(
1331     EncryptionLevel level,
1332     std::unique_ptr<QuicEncrypter> encrypter) {
1333   connection()->SetEncrypter(level, std::move(encrypter));
1334 
1335   if (GetQuicRestartFlag(quic_send_settings_on_write_key_available) &&
1336       connection_->version().handshake_protocol == PROTOCOL_TLS1_3 &&
1337       level == ENCRYPTION_FORWARD_SECURE) {
1338     // Set connection's default encryption level once 1-RTT write key is
1339     // available.
1340     QUIC_RESTART_FLAG_COUNT_N(quic_send_settings_on_write_key_available, 1, 2);
1341     QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to "
1342                   << EncryptionLevelToString(level);
1343     connection()->SetDefaultEncryptionLevel(level);
1344   }
1345 }
1346 
SetDefaultEncryptionLevel(EncryptionLevel level)1347 void QuicSession::SetDefaultEncryptionLevel(EncryptionLevel level) {
1348   DCHECK_EQ(PROTOCOL_QUIC_CRYPTO, connection_->version().handshake_protocol);
1349   QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to "
1350                 << EncryptionLevelToString(level);
1351   connection()->SetDefaultEncryptionLevel(level);
1352 
1353   switch (level) {
1354     case ENCRYPTION_INITIAL:
1355       break;
1356     case ENCRYPTION_ZERO_RTT:
1357       if (perspective() == Perspective::IS_CLIENT) {
1358         // Retransmit old 0-RTT data (if any) with the new 0-RTT keys, since
1359         // they can't be decrypted by the server.
1360         connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION);
1361         // Given any streams blocked by encryption a chance to write.
1362         OnCanWrite();
1363       }
1364       break;
1365     case ENCRYPTION_HANDSHAKE:
1366       break;
1367     case ENCRYPTION_FORWARD_SECURE:
1368       QUIC_BUG_IF(!config_.negotiated())
1369           << ENDPOINT << "Handshake confirmed without parameter negotiation.";
1370       if (!GetQuicReloadableFlag(quic_bw_sampler_app_limited_starting_value)) {
1371         connection_->ResetHasNonAppLimitedSampleAfterHandshakeCompletion();
1372       }
1373       break;
1374     default:
1375       QUIC_BUG << "Unknown encryption level: "
1376                << EncryptionLevelToString(level);
1377   }
1378 }
1379 
OnOneRttKeysAvailable()1380 void QuicSession::OnOneRttKeysAvailable() {
1381   DCHECK_EQ(PROTOCOL_TLS1_3, connection_->version().handshake_protocol);
1382   if (!GetQuicRestartFlag(quic_send_settings_on_write_key_available)) {
1383     QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to "
1384                   << EncryptionLevelToString(ENCRYPTION_FORWARD_SECURE);
1385     connection()->SetDefaultEncryptionLevel(ENCRYPTION_FORWARD_SECURE);
1386   }
1387 
1388   QUIC_BUG_IF(!GetCryptoStream()->crypto_negotiated_params().cipher_suite)
1389       << ENDPOINT << "Handshake completes without cipher suite negotiation.";
1390   QUIC_BUG_IF(!config_.negotiated())
1391       << ENDPOINT << "Handshake completes without parameter negotiation.";
1392   if (connection()->version().HasHandshakeDone() &&
1393       perspective_ == Perspective::IS_SERVER) {
1394     // Server sends HANDSHAKE_DONE to signal confirmation of the handshake
1395     // to the client.
1396     control_frame_manager_.WriteOrBufferHandshakeDone();
1397   }
1398   if (!GetQuicReloadableFlag(quic_bw_sampler_app_limited_starting_value)) {
1399     connection_->ResetHasNonAppLimitedSampleAfterHandshakeCompletion();
1400   }
1401 }
1402 
DiscardOldDecryptionKey(EncryptionLevel level)1403 void QuicSession::DiscardOldDecryptionKey(EncryptionLevel level) {
1404   if (!connection()->version().KnowsWhichDecrypterToUse()) {
1405     return;
1406   }
1407   connection()->RemoveDecrypter(level);
1408 }
1409 
DiscardOldEncryptionKey(EncryptionLevel level)1410 void QuicSession::DiscardOldEncryptionKey(EncryptionLevel level) {
1411   QUIC_DVLOG(1) << ENDPOINT << "Discard keys of "
1412                 << EncryptionLevelToString(level);
1413   if (connection()->version().handshake_protocol == PROTOCOL_TLS1_3) {
1414     connection()->RemoveEncrypter(level);
1415   }
1416   switch (level) {
1417     case ENCRYPTION_INITIAL:
1418       NeuterUnencryptedData();
1419       break;
1420     case ENCRYPTION_HANDSHAKE:
1421       NeuterHandshakeData();
1422       break;
1423     case ENCRYPTION_ZERO_RTT:
1424       break;
1425     case ENCRYPTION_FORWARD_SECURE:
1426       QUIC_BUG << "Tries to drop 1-RTT keys";
1427       break;
1428     default:
1429       QUIC_BUG << "Unknown encryption level: "
1430                << EncryptionLevelToString(level);
1431   }
1432 }
1433 
NeuterHandshakeData()1434 void QuicSession::NeuterHandshakeData() {
1435   connection()->OnHandshakeComplete();
1436 }
1437 
OnCryptoHandshakeMessageSent(const CryptoHandshakeMessage &)1438 void QuicSession::OnCryptoHandshakeMessageSent(
1439     const CryptoHandshakeMessage& /*message*/) {}
1440 
OnCryptoHandshakeMessageReceived(const CryptoHandshakeMessage &)1441 void QuicSession::OnCryptoHandshakeMessageReceived(
1442     const CryptoHandshakeMessage& /*message*/) {}
1443 
RegisterStreamPriority(QuicStreamId id,bool is_static,const spdy::SpdyStreamPrecedence & precedence)1444 void QuicSession::RegisterStreamPriority(
1445     QuicStreamId id,
1446     bool is_static,
1447     const spdy::SpdyStreamPrecedence& precedence) {
1448   if (enable_round_robin_scheduling_) {
1449     // Ignore provided precedence, instead, put all streams at the same priority
1450     // bucket.
1451     write_blocked_streams()->RegisterStream(
1452         id, is_static, spdy::SpdyStreamPrecedence(spdy::kV3LowestPriority));
1453     return;
1454   }
1455   write_blocked_streams()->RegisterStream(id, is_static, precedence);
1456 }
1457 
UnregisterStreamPriority(QuicStreamId id,bool is_static)1458 void QuicSession::UnregisterStreamPriority(QuicStreamId id, bool is_static) {
1459   write_blocked_streams()->UnregisterStream(id, is_static);
1460 }
1461 
UpdateStreamPriority(QuicStreamId id,const spdy::SpdyStreamPrecedence & new_precedence)1462 void QuicSession::UpdateStreamPriority(
1463     QuicStreamId id,
1464     const spdy::SpdyStreamPrecedence& new_precedence) {
1465   if (enable_round_robin_scheduling_) {
1466     // Ignore updated precedence.
1467     return;
1468   }
1469   write_blocked_streams()->UpdateStreamPriority(id, new_precedence);
1470 }
1471 
config()1472 QuicConfig* QuicSession::config() {
1473   return &config_;
1474 }
1475 
ActivateStream(std::unique_ptr<QuicStream> stream)1476 void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
1477   QuicStreamId stream_id = stream->id();
1478   bool is_static = stream->is_static();
1479   QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
1480                 << ". activating " << stream_id;
1481   DCHECK(!QuicContainsKey(stream_map_, stream_id));
1482   stream_map_[stream_id] = std::move(stream);
1483   if (IsIncomingStream(stream_id)) {
1484     is_static ? ++num_incoming_static_streams_
1485               : ++num_dynamic_incoming_streams_;
1486   } else if (is_static) {
1487     ++num_outgoing_static_streams_;
1488   }
1489 
1490   if (VersionHasIetfQuicFrames(transport_version()) &&
1491       !QuicUtils::IsBidirectionalStreamId(stream_id) && is_static) {
1492     DCHECK_LE(num_incoming_static_streams_,
1493               num_expected_unidirectional_static_streams_);
1494     DCHECK_LE(num_outgoing_static_streams_,
1495               num_expected_unidirectional_static_streams_);
1496   }
1497 }
1498 
GetNextOutgoingBidirectionalStreamId()1499 QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
1500   if (VersionHasIetfQuicFrames(transport_version())) {
1501     return v99_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
1502   }
1503   return stream_id_manager_.GetNextOutgoingStreamId();
1504 }
1505 
GetNextOutgoingUnidirectionalStreamId()1506 QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
1507   if (VersionHasIetfQuicFrames(transport_version())) {
1508     return v99_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
1509   }
1510   return stream_id_manager_.GetNextOutgoingStreamId();
1511 }
1512 
CanOpenNextOutgoingBidirectionalStream()1513 bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
1514   if (!VersionHasIetfQuicFrames(transport_version())) {
1515     return stream_id_manager_.CanOpenNextOutgoingStream(
1516         GetNumOpenOutgoingStreams());
1517   }
1518   if (v99_streamid_manager_.CanOpenNextOutgoingBidirectionalStream()) {
1519     return true;
1520   }
1521   if (is_configured_) {
1522     // Send STREAM_BLOCKED after config negotiated.
1523     control_frame_manager_.WriteOrBufferStreamsBlocked(
1524         v99_streamid_manager_.max_outgoing_bidirectional_streams(),
1525         /*unidirectional=*/false);
1526   }
1527   return false;
1528 }
1529 
CanOpenNextOutgoingUnidirectionalStream()1530 bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
1531   if (!VersionHasIetfQuicFrames(transport_version())) {
1532     return stream_id_manager_.CanOpenNextOutgoingStream(
1533         GetNumOpenOutgoingStreams());
1534   }
1535   if (v99_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream()) {
1536     return true;
1537   }
1538   if (is_configured_) {
1539     // Send STREAM_BLOCKED after config negotiated.
1540     control_frame_manager_.WriteOrBufferStreamsBlocked(
1541         v99_streamid_manager_.max_outgoing_unidirectional_streams(),
1542         /*unidirectional=*/true);
1543   }
1544   return false;
1545 }
1546 
GetAdvertisedMaxIncomingBidirectionalStreams() const1547 QuicStreamCount QuicSession::GetAdvertisedMaxIncomingBidirectionalStreams()
1548     const {
1549   DCHECK(VersionHasIetfQuicFrames(transport_version()));
1550   return v99_streamid_manager_.advertised_max_incoming_bidirectional_streams();
1551 }
1552 
GetOrCreateStream(const QuicStreamId stream_id)1553 QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
1554   DCHECK(!QuicContainsKey(pending_stream_map_, stream_id));
1555   if (QuicUtils::IsCryptoStreamId(transport_version(), stream_id)) {
1556     return GetMutableCryptoStream();
1557   }
1558 
1559   StreamMap::iterator it = stream_map_.find(stream_id);
1560   if (it != stream_map_.end()) {
1561     return it->second.get();
1562   }
1563 
1564   if (IsClosedStream(stream_id)) {
1565     return nullptr;
1566   }
1567 
1568   if (!IsIncomingStream(stream_id)) {
1569     HandleFrameOnNonexistentOutgoingStream(stream_id);
1570     return nullptr;
1571   }
1572 
1573   // TODO(fkastenholz): If we are creating a new stream and we have
1574   // sent a goaway, we should ignore the stream creation. Need to
1575   // add code to A) test if goaway was sent ("if (goaway_sent_)") and
1576   // B) reject stream creation ("return nullptr")
1577 
1578   if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
1579     return nullptr;
1580   }
1581 
1582   if (!VersionHasIetfQuicFrames(transport_version())) {
1583     // TODO(fayang): Let LegacyQuicStreamIdManager count open streams and make
1584     // CanOpenIncomingStream interface consistent with that of v99.
1585     if (!stream_id_manager_.CanOpenIncomingStream(
1586             GetNumOpenIncomingStreams())) {
1587       // Refuse to open the stream.
1588       SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
1589       return nullptr;
1590     }
1591   }
1592 
1593   return CreateIncomingStream(stream_id);
1594 }
1595 
StreamDraining(QuicStreamId stream_id)1596 void QuicSession::StreamDraining(QuicStreamId stream_id) {
1597   DCHECK(QuicContainsKey(stream_map_, stream_id));
1598   if (!QuicContainsKey(draining_streams_, stream_id)) {
1599     draining_streams_.insert(stream_id);
1600     if (IsIncomingStream(stream_id)) {
1601       ++num_draining_incoming_streams_;
1602     }
1603     if (VersionHasIetfQuicFrames(transport_version())) {
1604       v99_streamid_manager_.OnStreamClosed(stream_id);
1605     }
1606   }
1607   if (!IsIncomingStream(stream_id)) {
1608     // Inform application that a stream is available.
1609     if (VersionHasIetfQuicFrames(transport_version())) {
1610       OnCanCreateNewOutgoingStream(
1611           !QuicUtils::IsBidirectionalStreamId(stream_id));
1612     } else {
1613       QuicStream* stream = GetStream(stream_id);
1614       if (!stream) {
1615         QUIC_BUG << "Stream doesn't exist when draining.";
1616         return;
1617       }
1618       OnCanCreateNewOutgoingStream(stream->type() != BIDIRECTIONAL);
1619     }
1620   }
1621 }
1622 
MaybeIncreaseLargestPeerStreamId(const QuicStreamId stream_id)1623 bool QuicSession::MaybeIncreaseLargestPeerStreamId(
1624     const QuicStreamId stream_id) {
1625   if (VersionHasIetfQuicFrames(transport_version())) {
1626     std::string error_details;
1627     if (v99_streamid_manager_.MaybeIncreaseLargestPeerStreamId(
1628             stream_id, &error_details)) {
1629       return true;
1630     }
1631     connection()->CloseConnection(
1632         QUIC_INVALID_STREAM_ID, error_details,
1633         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1634     return false;
1635   }
1636   if (!stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id)) {
1637     connection()->CloseConnection(
1638         QUIC_TOO_MANY_AVAILABLE_STREAMS,
1639         quiche::QuicheStrCat(stream_id, " exceeds available streams ",
1640                              stream_id_manager_.MaxAvailableStreams()),
1641         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1642     return false;
1643   }
1644   return true;
1645 }
1646 
ShouldYield(QuicStreamId stream_id)1647 bool QuicSession::ShouldYield(QuicStreamId stream_id) {
1648   if (stream_id == currently_writing_stream_id_) {
1649     return false;
1650   }
1651   return write_blocked_streams()->ShouldYield(stream_id);
1652 }
1653 
GetOrCreatePendingStream(QuicStreamId stream_id)1654 PendingStream* QuicSession::GetOrCreatePendingStream(QuicStreamId stream_id) {
1655   auto it = pending_stream_map_.find(stream_id);
1656   if (it != pending_stream_map_.end()) {
1657     return it->second.get();
1658   }
1659 
1660   if (IsClosedStream(stream_id) ||
1661       !MaybeIncreaseLargestPeerStreamId(stream_id)) {
1662     return nullptr;
1663   }
1664 
1665   auto pending = std::make_unique<PendingStream>(stream_id, this);
1666   PendingStream* unowned_pending = pending.get();
1667   pending_stream_map_[stream_id] = std::move(pending);
1668   return unowned_pending;
1669 }
1670 
set_largest_peer_created_stream_id(QuicStreamId largest_peer_created_stream_id)1671 void QuicSession::set_largest_peer_created_stream_id(
1672     QuicStreamId largest_peer_created_stream_id) {
1673   DCHECK(!VersionHasIetfQuicFrames(transport_version()));
1674   stream_id_manager_.set_largest_peer_created_stream_id(
1675       largest_peer_created_stream_id);
1676 }
1677 
GetLargestPeerCreatedStreamId(bool unidirectional) const1678 QuicStreamId QuicSession::GetLargestPeerCreatedStreamId(
1679     bool unidirectional) const {
1680   // This method is only used in IETF QUIC.
1681   DCHECK(VersionHasIetfQuicFrames(transport_version()));
1682   return v99_streamid_manager_.GetLargestPeerCreatedStreamId(unidirectional);
1683 }
1684 
DeleteConnection()1685 void QuicSession::DeleteConnection() {
1686   if (connection_) {
1687     delete connection_;
1688     connection_ = nullptr;
1689   }
1690 }
1691 
MaybeSetStreamPriority(QuicStreamId stream_id,const spdy::SpdyStreamPrecedence & precedence)1692 bool QuicSession::MaybeSetStreamPriority(
1693     QuicStreamId stream_id,
1694     const spdy::SpdyStreamPrecedence& precedence) {
1695   auto active_stream = stream_map_.find(stream_id);
1696   if (active_stream != stream_map_.end()) {
1697     active_stream->second->SetPriority(precedence);
1698     return true;
1699   }
1700 
1701   return false;
1702 }
1703 
IsClosedStream(QuicStreamId id)1704 bool QuicSession::IsClosedStream(QuicStreamId id) {
1705   DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
1706   if (IsOpenStream(id)) {
1707     // Stream is active
1708     return false;
1709   }
1710 
1711   if (VersionHasIetfQuicFrames(transport_version())) {
1712     return !v99_streamid_manager_.IsAvailableStream(id);
1713   }
1714 
1715   return !stream_id_manager_.IsAvailableStream(id);
1716 }
1717 
IsOpenStream(QuicStreamId id)1718 bool QuicSession::IsOpenStream(QuicStreamId id) {
1719   DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
1720   if (QuicContainsKey(stream_map_, id) ||
1721       QuicContainsKey(pending_stream_map_, id) ||
1722       QuicUtils::IsCryptoStreamId(transport_version(), id)) {
1723     // Stream is active
1724     return true;
1725   }
1726   return false;
1727 }
1728 
IsStaticStream(QuicStreamId id) const1729 bool QuicSession::IsStaticStream(QuicStreamId id) const {
1730   auto it = stream_map_.find(id);
1731   if (it == stream_map_.end()) {
1732     return false;
1733   }
1734   return it->second->is_static();
1735 }
1736 
GetNumOpenIncomingStreams() const1737 size_t QuicSession::GetNumOpenIncomingStreams() const {
1738   return num_dynamic_incoming_streams_ - num_draining_incoming_streams_ +
1739          num_locally_closed_incoming_streams_highest_offset_;
1740 }
1741 
GetNumOpenOutgoingStreams() const1742 size_t QuicSession::GetNumOpenOutgoingStreams() const {
1743   DCHECK_GE(GetNumDynamicOutgoingStreams() +
1744                 GetNumLocallyClosedOutgoingStreamsHighestOffset(),
1745             GetNumDrainingOutgoingStreams());
1746   return GetNumDynamicOutgoingStreams() +
1747          GetNumLocallyClosedOutgoingStreamsHighestOffset() -
1748          GetNumDrainingOutgoingStreams();
1749 }
1750 
GetNumActiveStreams() const1751 size_t QuicSession::GetNumActiveStreams() const {
1752   return stream_map_.size() - draining_streams_.size() -
1753          num_incoming_static_streams_ - num_outgoing_static_streams_;
1754 }
1755 
GetNumDrainingStreams() const1756 size_t QuicSession::GetNumDrainingStreams() const {
1757   return draining_streams_.size();
1758 }
1759 
MarkConnectionLevelWriteBlocked(QuicStreamId id)1760 void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
1761   if (GetOrCreateStream(id) == nullptr) {
1762     QUIC_BUG << "Marking unknown stream " << id << " blocked.";
1763     QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
1764   }
1765 
1766   QUIC_DVLOG(1) << ENDPOINT << "Adding stream " << id
1767                 << " to write-blocked list";
1768 
1769   write_blocked_streams_.AddStream(id);
1770 }
1771 
HasDataToWrite() const1772 bool QuicSession::HasDataToWrite() const {
1773   return write_blocked_streams_.HasWriteBlockedSpecialStream() ||
1774          write_blocked_streams_.HasWriteBlockedDataStreams() ||
1775          connection_->HasQueuedData() ||
1776          !streams_with_pending_retransmission_.empty() ||
1777          control_frame_manager_.WillingToWrite();
1778 }
1779 
OnAckNeedsRetransmittableFrame()1780 void QuicSession::OnAckNeedsRetransmittableFrame() {
1781   flow_controller_.SendWindowUpdate();
1782 }
1783 
SendPing()1784 void QuicSession::SendPing() {
1785   control_frame_manager_.WritePing();
1786 }
1787 
GetNumDynamicOutgoingStreams() const1788 size_t QuicSession::GetNumDynamicOutgoingStreams() const {
1789   DCHECK_GE(
1790       static_cast<size_t>(stream_map_.size() + pending_stream_map_.size()),
1791       num_dynamic_incoming_streams_ + num_outgoing_static_streams_ +
1792           num_incoming_static_streams_);
1793   return stream_map_.size() + pending_stream_map_.size() -
1794          num_dynamic_incoming_streams_ - num_outgoing_static_streams_ -
1795          num_incoming_static_streams_;
1796 }
1797 
GetNumDrainingOutgoingStreams() const1798 size_t QuicSession::GetNumDrainingOutgoingStreams() const {
1799   DCHECK_GE(draining_streams_.size(), num_draining_incoming_streams_);
1800   return draining_streams_.size() - num_draining_incoming_streams_;
1801 }
1802 
GetNumLocallyClosedOutgoingStreamsHighestOffset() const1803 size_t QuicSession::GetNumLocallyClosedOutgoingStreamsHighestOffset() const {
1804   DCHECK_GE(locally_closed_streams_highest_offset_.size(),
1805             num_locally_closed_incoming_streams_highest_offset_);
1806   return locally_closed_streams_highest_offset_.size() -
1807          num_locally_closed_incoming_streams_highest_offset_;
1808 }
1809 
IsConnectionFlowControlBlocked() const1810 bool QuicSession::IsConnectionFlowControlBlocked() const {
1811   return flow_controller_.IsBlocked();
1812 }
1813 
IsStreamFlowControlBlocked()1814 bool QuicSession::IsStreamFlowControlBlocked() {
1815   for (auto const& kv : stream_map_) {
1816     if (kv.second->flow_controller()->IsBlocked()) {
1817       return true;
1818     }
1819   }
1820   if (!QuicVersionUsesCryptoFrames(transport_version()) &&
1821       GetMutableCryptoStream()->flow_controller()->IsBlocked()) {
1822     return true;
1823   }
1824   return false;
1825 }
1826 
MaxAvailableBidirectionalStreams() const1827 size_t QuicSession::MaxAvailableBidirectionalStreams() const {
1828   if (VersionHasIetfQuicFrames(transport_version())) {
1829     return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
1830   }
1831   return stream_id_manager_.MaxAvailableStreams();
1832 }
1833 
MaxAvailableUnidirectionalStreams() const1834 size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
1835   if (VersionHasIetfQuicFrames(transport_version())) {
1836     return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
1837   }
1838   return stream_id_manager_.MaxAvailableStreams();
1839 }
1840 
IsIncomingStream(QuicStreamId id) const1841 bool QuicSession::IsIncomingStream(QuicStreamId id) const {
1842   if (VersionHasIetfQuicFrames(transport_version())) {
1843     return v99_streamid_manager_.IsIncomingStream(id);
1844   }
1845   return stream_id_manager_.IsIncomingStream(id);
1846 }
1847 
OnStreamDoneWaitingForAcks(QuicStreamId id)1848 void QuicSession::OnStreamDoneWaitingForAcks(QuicStreamId id) {
1849   streams_waiting_for_acks_.erase(id);
1850 
1851   auto it = zombie_streams_.find(id);
1852   if (it == zombie_streams_.end()) {
1853     return;
1854   }
1855 
1856   closed_streams_.push_back(std::move(it->second));
1857   if (!closed_streams_clean_up_alarm_->IsSet()) {
1858     closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
1859   }
1860   zombie_streams_.erase(it);
1861   // Do not retransmit data of a closed stream.
1862   streams_with_pending_retransmission_.erase(id);
1863 }
1864 
OnStreamWaitingForAcks(QuicStreamId id)1865 void QuicSession::OnStreamWaitingForAcks(QuicStreamId id) {
1866   // Exclude crypto stream's status since it is counted in HasUnackedCryptoData.
1867   if (GetCryptoStream() != nullptr && id == GetCryptoStream()->id()) {
1868     return;
1869   }
1870 
1871   streams_waiting_for_acks_.insert(id);
1872 
1873   // The number of the streams waiting for acks should not be larger than the
1874   // number of streams.
1875   if (static_cast<size_t>(stream_map_.size() + zombie_streams_.size()) <
1876       streams_waiting_for_acks_.size()) {
1877     QUIC_BUG << "More streams are waiting for acks than the number of streams. "
1878              << "Sizes: streams: " << stream_map_.size()
1879              << ", zombie streams: " << zombie_streams_.size()
1880              << ", vs streams waiting for acks: "
1881              << streams_waiting_for_acks_.size();
1882   }
1883 }
1884 
GetStream(QuicStreamId id) const1885 QuicStream* QuicSession::GetStream(QuicStreamId id) const {
1886   auto active_stream = stream_map_.find(id);
1887   if (active_stream != stream_map_.end()) {
1888     return active_stream->second.get();
1889   }
1890   auto zombie_stream = zombie_streams_.find(id);
1891   if (zombie_stream != zombie_streams_.end()) {
1892     return zombie_stream->second.get();
1893   }
1894 
1895   if (QuicUtils::IsCryptoStreamId(transport_version(), id)) {
1896     return const_cast<QuicCryptoStream*>(GetCryptoStream());
1897   }
1898 
1899   return nullptr;
1900 }
1901 
OnFrameAcked(const QuicFrame & frame,QuicTime::Delta ack_delay_time,QuicTime receive_timestamp)1902 bool QuicSession::OnFrameAcked(const QuicFrame& frame,
1903                                QuicTime::Delta ack_delay_time,
1904                                QuicTime receive_timestamp) {
1905   if (frame.type == MESSAGE_FRAME) {
1906     OnMessageAcked(frame.message_frame->message_id, receive_timestamp);
1907     return true;
1908   }
1909   if (frame.type == CRYPTO_FRAME) {
1910     return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
1911                                                         ack_delay_time);
1912   }
1913   if (frame.type != STREAM_FRAME) {
1914     return control_frame_manager_.OnControlFrameAcked(frame);
1915   }
1916   bool new_stream_data_acked = false;
1917   QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1918   // Stream can already be reset when sent frame gets acked.
1919   if (stream != nullptr) {
1920     QuicByteCount newly_acked_length = 0;
1921     new_stream_data_acked = stream->OnStreamFrameAcked(
1922         frame.stream_frame.offset, frame.stream_frame.data_length,
1923         frame.stream_frame.fin, ack_delay_time, receive_timestamp,
1924         &newly_acked_length);
1925     if (!stream->HasPendingRetransmission()) {
1926       streams_with_pending_retransmission_.erase(stream->id());
1927     }
1928   }
1929   return new_stream_data_acked;
1930 }
1931 
OnStreamFrameRetransmitted(const QuicStreamFrame & frame)1932 void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
1933   QuicStream* stream = GetStream(frame.stream_id);
1934   if (stream == nullptr) {
1935     QUIC_BUG << "Stream: " << frame.stream_id << " is closed when " << frame
1936              << " is retransmitted.";
1937     connection()->CloseConnection(
1938         QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
1939         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1940     return;
1941   }
1942   stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
1943                                      frame.fin);
1944 }
1945 
OnFrameLost(const QuicFrame & frame)1946 void QuicSession::OnFrameLost(const QuicFrame& frame) {
1947   if (frame.type == MESSAGE_FRAME) {
1948     OnMessageLost(frame.message_frame->message_id);
1949     return;
1950   }
1951   if (frame.type == CRYPTO_FRAME) {
1952     GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
1953     return;
1954   }
1955   if (frame.type != STREAM_FRAME) {
1956     control_frame_manager_.OnControlFrameLost(frame);
1957     return;
1958   }
1959   QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1960   if (stream == nullptr) {
1961     return;
1962   }
1963   stream->OnStreamFrameLost(frame.stream_frame.offset,
1964                             frame.stream_frame.data_length,
1965                             frame.stream_frame.fin);
1966   if (stream->HasPendingRetransmission() &&
1967       !QuicContainsKey(streams_with_pending_retransmission_,
1968                        frame.stream_frame.stream_id)) {
1969     streams_with_pending_retransmission_.insert(
1970         std::make_pair(frame.stream_frame.stream_id, true));
1971   }
1972 }
1973 
RetransmitFrames(const QuicFrames & frames,TransmissionType type)1974 void QuicSession::RetransmitFrames(const QuicFrames& frames,
1975                                    TransmissionType type) {
1976   QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
1977   if (!write_with_transmission_) {
1978     SetTransmissionType(type);
1979   }
1980   for (const QuicFrame& frame : frames) {
1981     if (frame.type == MESSAGE_FRAME) {
1982       // Do not retransmit MESSAGE frames.
1983       continue;
1984     }
1985     if (frame.type == CRYPTO_FRAME) {
1986       GetMutableCryptoStream()->RetransmitData(frame.crypto_frame, type);
1987       continue;
1988     }
1989     if (frame.type != STREAM_FRAME) {
1990       if (!control_frame_manager_.RetransmitControlFrame(frame, type)) {
1991         break;
1992       }
1993       continue;
1994     }
1995     QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1996     if (stream != nullptr &&
1997         !stream->RetransmitStreamData(frame.stream_frame.offset,
1998                                       frame.stream_frame.data_length,
1999                                       frame.stream_frame.fin, type)) {
2000       break;
2001     }
2002   }
2003 }
2004 
IsFrameOutstanding(const QuicFrame & frame) const2005 bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
2006   if (frame.type == MESSAGE_FRAME) {
2007     return false;
2008   }
2009   if (frame.type == CRYPTO_FRAME) {
2010     return GetCryptoStream()->IsFrameOutstanding(
2011         frame.crypto_frame->level, frame.crypto_frame->offset,
2012         frame.crypto_frame->data_length);
2013   }
2014   if (frame.type != STREAM_FRAME) {
2015     return control_frame_manager_.IsControlFrameOutstanding(frame);
2016   }
2017   QuicStream* stream = GetStream(frame.stream_frame.stream_id);
2018   return stream != nullptr &&
2019          stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
2020                                           frame.stream_frame.data_length,
2021                                           frame.stream_frame.fin);
2022 }
2023 
HasUnackedCryptoData() const2024 bool QuicSession::HasUnackedCryptoData() const {
2025   const QuicCryptoStream* crypto_stream = GetCryptoStream();
2026   return crypto_stream->IsWaitingForAcks() || crypto_stream->HasBufferedData();
2027 }
2028 
HasUnackedStreamData() const2029 bool QuicSession::HasUnackedStreamData() const {
2030   return !streams_waiting_for_acks_.empty();
2031 }
2032 
GetHandshakeState() const2033 HandshakeState QuicSession::GetHandshakeState() const {
2034   return GetCryptoStream()->GetHandshakeState();
2035 }
2036 
WriteStreamData(QuicStreamId id,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2037 WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
2038                                                    QuicStreamOffset offset,
2039                                                    QuicByteCount data_length,
2040                                                    QuicDataWriter* writer) {
2041   QuicStream* stream = GetStream(id);
2042   if (stream == nullptr) {
2043     // This causes the connection to be closed because of failed to serialize
2044     // packet.
2045     QUIC_BUG << "Stream " << id << " does not exist when trying to write data."
2046              << " version:" << transport_version();
2047     return STREAM_MISSING;
2048   }
2049   if (stream->WriteStreamData(offset, data_length, writer)) {
2050     return WRITE_SUCCESS;
2051   }
2052   return WRITE_FAILED;
2053 }
2054 
WriteCryptoData(EncryptionLevel level,QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)2055 bool QuicSession::WriteCryptoData(EncryptionLevel level,
2056                                   QuicStreamOffset offset,
2057                                   QuicByteCount data_length,
2058                                   QuicDataWriter* writer) {
2059   return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
2060                                                     writer);
2061 }
2062 
GetStatelessResetToken() const2063 QuicUint128 QuicSession::GetStatelessResetToken() const {
2064   return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
2065 }
2066 
RetransmitLostData()2067 bool QuicSession::RetransmitLostData() {
2068   QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
2069   // Retransmit crypto data first.
2070   bool uses_crypto_frames = QuicVersionUsesCryptoFrames(transport_version());
2071   QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2072   if (uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
2073     if (!write_with_transmission_) {
2074       SetTransmissionType(HANDSHAKE_RETRANSMISSION);
2075     }
2076     crypto_stream->WritePendingCryptoRetransmission();
2077   }
2078   // Retransmit crypto data in stream 1 frames (version < 47).
2079   if (!uses_crypto_frames &&
2080       QuicContainsKey(streams_with_pending_retransmission_,
2081                       QuicUtils::GetCryptoStreamId(transport_version()))) {
2082     if (!write_with_transmission_) {
2083       SetTransmissionType(HANDSHAKE_RETRANSMISSION);
2084     }
2085     // Retransmit crypto data first.
2086     QuicStream* crypto_stream =
2087         GetStream(QuicUtils::GetCryptoStreamId(transport_version()));
2088     crypto_stream->OnCanWrite();
2089     DCHECK(CheckStreamWriteBlocked(crypto_stream));
2090     if (crypto_stream->HasPendingRetransmission()) {
2091       // Connection is write blocked.
2092       return false;
2093     } else {
2094       streams_with_pending_retransmission_.erase(
2095           QuicUtils::GetCryptoStreamId(transport_version()));
2096     }
2097   }
2098   if (control_frame_manager_.HasPendingRetransmission()) {
2099     if (!write_with_transmission_) {
2100       SetTransmissionType(LOSS_RETRANSMISSION);
2101     }
2102     control_frame_manager_.OnCanWrite();
2103     if (control_frame_manager_.HasPendingRetransmission()) {
2104       return false;
2105     }
2106   }
2107   while (!streams_with_pending_retransmission_.empty()) {
2108     if (!connection_->CanWriteStreamData()) {
2109       break;
2110     }
2111     // Retransmit lost data on headers and data streams.
2112     const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
2113     QuicStream* stream = GetStream(id);
2114     if (stream != nullptr) {
2115       if (!write_with_transmission_) {
2116         SetTransmissionType(LOSS_RETRANSMISSION);
2117       }
2118       stream->OnCanWrite();
2119       DCHECK(CheckStreamWriteBlocked(stream));
2120       if (stream->HasPendingRetransmission()) {
2121         // Connection is write blocked.
2122         break;
2123       } else if (!streams_with_pending_retransmission_.empty() &&
2124                  streams_with_pending_retransmission_.begin()->first == id) {
2125         // Retransmit lost data may cause connection close. If this stream
2126         // has not yet sent fin, a RST_STREAM will be sent and it will be
2127         // removed from streams_with_pending_retransmission_.
2128         streams_with_pending_retransmission_.pop_front();
2129       }
2130     } else {
2131       QUIC_BUG << "Try to retransmit data of a closed stream";
2132       streams_with_pending_retransmission_.pop_front();
2133     }
2134   }
2135 
2136   return streams_with_pending_retransmission_.empty();
2137 }
2138 
NeuterUnencryptedData()2139 void QuicSession::NeuterUnencryptedData() {
2140   QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
2141   crypto_stream->NeuterUnencryptedStreamData();
2142   if (!crypto_stream->HasPendingRetransmission() &&
2143       !QuicVersionUsesCryptoFrames(transport_version())) {
2144     streams_with_pending_retransmission_.erase(
2145         QuicUtils::GetCryptoStreamId(transport_version()));
2146   }
2147   connection_->NeuterUnencryptedPackets();
2148 }
2149 
SetTransmissionType(TransmissionType type)2150 void QuicSession::SetTransmissionType(TransmissionType type) {
2151   connection_->SetTransmissionType(type);
2152 }
2153 
SendMessage(QuicMemSliceSpan message)2154 MessageResult QuicSession::SendMessage(QuicMemSliceSpan message) {
2155   return SendMessage(message, /*flush=*/false);
2156 }
2157 
SendMessage(QuicMemSliceSpan message,bool flush)2158 MessageResult QuicSession::SendMessage(QuicMemSliceSpan message, bool flush) {
2159   DCHECK(connection_->connected())
2160       << ENDPOINT << "Try to write messages when connection is closed.";
2161   if (!IsEncryptionEstablished()) {
2162     return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
2163   }
2164   MessageStatus result =
2165       connection_->SendMessage(last_message_id_ + 1, message, flush);
2166   if (result == MESSAGE_STATUS_SUCCESS) {
2167     return {result, ++last_message_id_};
2168   }
2169   return {result, 0};
2170 }
2171 
OnMessageAcked(QuicMessageId message_id,QuicTime)2172 void QuicSession::OnMessageAcked(QuicMessageId message_id,
2173                                  QuicTime /*receive_timestamp*/) {
2174   QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
2175 }
2176 
OnMessageLost(QuicMessageId message_id)2177 void QuicSession::OnMessageLost(QuicMessageId message_id) {
2178   QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
2179                 << " is considered lost";
2180 }
2181 
CleanUpClosedStreams()2182 void QuicSession::CleanUpClosedStreams() {
2183   closed_streams_.clear();
2184 }
2185 
GetCurrentLargestMessagePayload() const2186 QuicPacketLength QuicSession::GetCurrentLargestMessagePayload() const {
2187   return connection_->GetCurrentLargestMessagePayload();
2188 }
2189 
GetGuaranteedLargestMessagePayload() const2190 QuicPacketLength QuicSession::GetGuaranteedLargestMessagePayload() const {
2191   return connection_->GetGuaranteedLargestMessagePayload();
2192 }
2193 
SendStopSending(uint16_t code,QuicStreamId stream_id)2194 void QuicSession::SendStopSending(uint16_t code, QuicStreamId stream_id) {
2195   control_frame_manager_.WriteOrBufferStopSending(code, stream_id);
2196 }
2197 
next_outgoing_bidirectional_stream_id() const2198 QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
2199   if (VersionHasIetfQuicFrames(transport_version())) {
2200     return v99_streamid_manager_.next_outgoing_bidirectional_stream_id();
2201   }
2202   return stream_id_manager_.next_outgoing_stream_id();
2203 }
2204 
next_outgoing_unidirectional_stream_id() const2205 QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
2206   if (VersionHasIetfQuicFrames(transport_version())) {
2207     return v99_streamid_manager_.next_outgoing_unidirectional_stream_id();
2208   }
2209   return stream_id_manager_.next_outgoing_stream_id();
2210 }
2211 
OnMaxStreamsFrame(const QuicMaxStreamsFrame & frame)2212 bool QuicSession::OnMaxStreamsFrame(const QuicMaxStreamsFrame& frame) {
2213   const bool allow_new_streams =
2214       frame.unidirectional
2215           ? v99_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
2216                 frame.stream_count)
2217           : v99_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
2218                 frame.stream_count);
2219   if (allow_new_streams) {
2220     OnCanCreateNewOutgoingStream(frame.unidirectional);
2221   }
2222 
2223   return true;
2224 }
2225 
OnStreamsBlockedFrame(const QuicStreamsBlockedFrame & frame)2226 bool QuicSession::OnStreamsBlockedFrame(const QuicStreamsBlockedFrame& frame) {
2227   std::string error_details;
2228   if (v99_streamid_manager_.OnStreamsBlockedFrame(frame, &error_details)) {
2229     return true;
2230   }
2231   connection_->CloseConnection(
2232       QUIC_STREAMS_BLOCKED_ERROR, error_details,
2233       ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
2234   return false;
2235 }
2236 
max_open_incoming_bidirectional_streams() const2237 size_t QuicSession::max_open_incoming_bidirectional_streams() const {
2238   if (VersionHasIetfQuicFrames(transport_version())) {
2239     return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
2240   }
2241   return stream_id_manager_.max_open_incoming_streams();
2242 }
2243 
max_open_incoming_unidirectional_streams() const2244 size_t QuicSession::max_open_incoming_unidirectional_streams() const {
2245   if (VersionHasIetfQuicFrames(transport_version())) {
2246     return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
2247   }
2248   return stream_id_manager_.max_open_incoming_streams();
2249 }
2250 
SelectAlpn(const std::vector<quiche::QuicheStringPiece> & alpns) const2251 std::vector<quiche::QuicheStringPiece>::const_iterator QuicSession::SelectAlpn(
2252     const std::vector<quiche::QuicheStringPiece>& alpns) const {
2253   const std::string alpn = AlpnForVersion(connection()->version());
2254   return std::find(alpns.cbegin(), alpns.cend(), alpn);
2255 }
2256 
OnAlpnSelected(quiche::QuicheStringPiece alpn)2257 void QuicSession::OnAlpnSelected(quiche::QuicheStringPiece alpn) {
2258   QUIC_DLOG(INFO) << (perspective() == Perspective::IS_SERVER ? "Server: "
2259                                                               : "Client: ")
2260                   << "ALPN selected: " << alpn;
2261 }
2262 
2263 #undef ENDPOINT  // undef for jumbo builds
2264 }  // namespace quic
2265