1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/third_party/quiche/src/quic/core/quic_stream.h"
6 
7 #include <limits>
8 #include <string>
9 
10 #include "absl/strings/string_view.h"
11 #include "absl/types/optional.h"
12 #include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
13 #include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
14 #include "net/third_party/quiche/src/quic/core/quic_session.h"
15 #include "net/third_party/quiche/src/quic/core/quic_types.h"
16 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
17 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
18 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
19 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
20 #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
21 #include "net/third_party/quiche/src/common/platform/api/quiche_str_cat.h"
22 
23 using spdy::SpdyPriority;
24 
25 namespace quic {
26 
27 #define ENDPOINT \
28   (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
29 
30 namespace {
31 
DefaultFlowControlWindow(ParsedQuicVersion version)32 QuicByteCount DefaultFlowControlWindow(ParsedQuicVersion version) {
33   if (!version.AllowsLowFlowControlLimits()) {
34     return kDefaultFlowControlSendWindow;
35   }
36   return 0;
37 }
38 
GetInitialStreamFlowControlWindowToSend(QuicSession * session,QuicStreamId stream_id)39 QuicByteCount GetInitialStreamFlowControlWindowToSend(QuicSession* session,
40                                                       QuicStreamId stream_id) {
41   ParsedQuicVersion version = session->connection()->version();
42   if (version.handshake_protocol != PROTOCOL_TLS1_3) {
43     return session->config()->GetInitialStreamFlowControlWindowToSend();
44   }
45 
46   // Unidirectional streams (v99 only).
47   if (VersionHasIetfQuicFrames(version.transport_version) &&
48       !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
49     return session->config()
50         ->GetInitialMaxStreamDataBytesUnidirectionalToSend();
51   }
52 
53   if (QuicUtils::IsOutgoingStreamId(version, stream_id,
54                                     session->perspective())) {
55     return session->config()
56         ->GetInitialMaxStreamDataBytesOutgoingBidirectionalToSend();
57   }
58 
59   return session->config()
60       ->GetInitialMaxStreamDataBytesIncomingBidirectionalToSend();
61 }
62 
GetReceivedFlowControlWindow(QuicSession * session,QuicStreamId stream_id)63 QuicByteCount GetReceivedFlowControlWindow(QuicSession* session,
64                                            QuicStreamId stream_id) {
65   ParsedQuicVersion version = session->connection()->version();
66   if (version.handshake_protocol != PROTOCOL_TLS1_3) {
67     if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
68       return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
69     }
70 
71     return DefaultFlowControlWindow(version);
72   }
73 
74   // Unidirectional streams (v99 only).
75   if (VersionHasIetfQuicFrames(version.transport_version) &&
76       !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
77     if (session->config()
78             ->HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
79       return session->config()
80           ->ReceivedInitialMaxStreamDataBytesUnidirectional();
81     }
82 
83     return DefaultFlowControlWindow(version);
84   }
85 
86   if (QuicUtils::IsOutgoingStreamId(version, stream_id,
87                                     session->perspective())) {
88     if (session->config()
89             ->HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
90       return session->config()
91           ->ReceivedInitialMaxStreamDataBytesOutgoingBidirectional();
92     }
93 
94     return DefaultFlowControlWindow(version);
95   }
96 
97   if (session->config()
98           ->HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
99     return session->config()
100         ->ReceivedInitialMaxStreamDataBytesIncomingBidirectional();
101   }
102 
103   return DefaultFlowControlWindow(version);
104 }
105 
106 }  // namespace
107 
108 // static
109 const SpdyPriority QuicStream::kDefaultPriority;
110 
111 // static
112 const int QuicStream::kDefaultUrgency;
113 
PendingStream(QuicStreamId id,QuicSession * session)114 PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
115     : id_(id),
116       stream_delegate_(session),
117       stream_bytes_read_(0),
118       fin_received_(false),
119       connection_flow_controller_(session->flow_controller()),
120       flow_controller_(session,
121                        id,
122                        /*is_connection_flow_controller*/ false,
123                        GetReceivedFlowControlWindow(session, id),
124                        GetInitialStreamFlowControlWindowToSend(session, id),
125                        kStreamReceiveWindowLimit,
126                        session->flow_controller()->auto_tune_receive_window(),
127                        session->flow_controller()),
128       sequencer_(this) {}
129 
OnDataAvailable()130 void PendingStream::OnDataAvailable() {
131   // Data should be kept in the sequencer so that
132   // QuicSession::ProcessPendingStream() can read it.
133 }
134 
OnFinRead()135 void PendingStream::OnFinRead() {
136   DCHECK(sequencer_.IsClosed());
137 }
138 
AddBytesConsumed(QuicByteCount bytes)139 void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
140   // It will be called when the metadata of the stream is consumed.
141   flow_controller_.AddBytesConsumed(bytes);
142   connection_flow_controller_->AddBytesConsumed(bytes);
143 }
144 
Reset(QuicRstStreamErrorCode)145 void PendingStream::Reset(QuicRstStreamErrorCode /*error*/) {
146   // Currently PendingStream is only read-unidirectional. It shouldn't send
147   // Reset.
148   QUIC_NOTREACHED();
149 }
150 
OnUnrecoverableError(QuicErrorCode error,const std::string & details)151 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
152                                          const std::string& details) {
153   stream_delegate_->OnStreamError(error, details);
154 }
155 
id() const156 QuicStreamId PendingStream::id() const {
157   return id_;
158 }
159 
OnStreamFrame(const QuicStreamFrame & frame)160 void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
161   DCHECK_EQ(frame.stream_id, id_);
162 
163   bool is_stream_too_long =
164       (frame.offset > kMaxStreamLength) ||
165       (kMaxStreamLength - frame.offset < frame.data_length);
166   if (is_stream_too_long) {
167     // Close connection if stream becomes too long.
168     QUIC_PEER_BUG
169         << "Receive stream frame reaches max stream length. frame offset "
170         << frame.offset << " length " << frame.data_length;
171     OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
172                          "Peer sends more data than allowed on this stream.");
173     return;
174   }
175 
176   if (frame.offset + frame.data_length > sequencer_.close_offset()) {
177     OnUnrecoverableError(
178         QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
179         quiche::QuicheStrCat(
180             "Stream ", id_,
181             " received data with offset: ", frame.offset + frame.data_length,
182             ", which is beyond close offset: ", sequencer()->close_offset()));
183     return;
184   }
185 
186   if (frame.fin) {
187     fin_received_ = true;
188   }
189 
190   // This count includes duplicate data received.
191   QuicByteCount frame_payload_size = frame.data_length;
192   stream_bytes_read_ += frame_payload_size;
193 
194   // Flow control is interested in tracking highest received offset.
195   // Only interested in received frames that carry data.
196   if (frame_payload_size > 0 &&
197       MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
198     // As the highest received offset has changed, check to see if this is a
199     // violation of flow control.
200     if (flow_controller_.FlowControlViolation() ||
201         connection_flow_controller_->FlowControlViolation()) {
202       OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
203                            "Flow control violation after increasing offset");
204       return;
205     }
206   }
207 
208   sequencer_.OnStreamFrame(frame);
209 }
210 
OnRstStreamFrame(const QuicRstStreamFrame & frame)211 void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
212   DCHECK_EQ(frame.stream_id, id_);
213 
214   if (frame.byte_offset > kMaxStreamLength) {
215     // Peer are not suppose to write bytes more than maxium allowed.
216     OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
217                          "Reset frame stream offset overflow.");
218     return;
219   }
220 
221   const QuicStreamOffset kMaxOffset =
222       std::numeric_limits<QuicStreamOffset>::max();
223   if (sequencer()->close_offset() != kMaxOffset &&
224       frame.byte_offset != sequencer()->close_offset()) {
225     OnUnrecoverableError(
226         QUIC_STREAM_MULTIPLE_OFFSET,
227         quiche::QuicheStrCat("Stream ", id_,
228                              " received new final offset: ", frame.byte_offset,
229                              ", which is different from close offset: ",
230                              sequencer()->close_offset()));
231     return;
232   }
233 
234   MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
235   if (flow_controller_.FlowControlViolation() ||
236       connection_flow_controller_->FlowControlViolation()) {
237     OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
238                          "Flow control violation after increasing offset");
239     return;
240   }
241 }
242 
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)243 bool PendingStream::MaybeIncreaseHighestReceivedOffset(
244     QuicStreamOffset new_offset) {
245   uint64_t increment =
246       new_offset - flow_controller_.highest_received_byte_offset();
247   if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
248     return false;
249   }
250 
251   // If |new_offset| increased the stream flow controller's highest received
252   // offset, increase the connection flow controller's value by the incremental
253   // difference.
254   connection_flow_controller_->UpdateHighestReceivedOffset(
255       connection_flow_controller_->highest_received_byte_offset() + increment);
256   return true;
257 }
258 
MarkConsumed(QuicByteCount num_bytes)259 void PendingStream::MarkConsumed(QuicByteCount num_bytes) {
260   sequencer_.MarkConsumed(num_bytes);
261 }
262 
StopReading()263 void PendingStream::StopReading() {
264   QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
265   sequencer_.StopReading();
266 }
267 
QuicStream(PendingStream * pending,QuicSession * session,StreamType type,bool is_static)268 QuicStream::QuicStream(PendingStream* pending,
269                        QuicSession* session,
270                        StreamType type,
271                        bool is_static)
272     : QuicStream(pending->id_,
273                  session,
274                  std::move(pending->sequencer_),
275                  is_static,
276                  type,
277                  pending->stream_bytes_read_,
278                  pending->fin_received_,
279                  std::move(pending->flow_controller_),
280                  pending->connection_flow_controller_) {
281   sequencer_.set_stream(this);
282 }
283 
284 namespace {
285 
FlowController(QuicStreamId id,QuicSession * session,StreamType type)286 absl::optional<QuicFlowController> FlowController(QuicStreamId id,
287                                                   QuicSession* session,
288                                                   StreamType type) {
289   if (type == CRYPTO) {
290     // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
291     // it is using crypto frames instead of stream frames. The QuicCryptoStream
292     // doesn't have any flow control in that case, so we don't create a
293     // QuicFlowController for it.
294     return absl::nullopt;
295   }
296   return QuicFlowController(
297       session, id,
298       /*is_connection_flow_controller*/ false,
299       GetReceivedFlowControlWindow(session, id),
300       GetInitialStreamFlowControlWindowToSend(session, id),
301       kStreamReceiveWindowLimit,
302       session->flow_controller()->auto_tune_receive_window(),
303       session->flow_controller());
304 }
305 
306 }  // namespace
307 
QuicStream(QuicStreamId id,QuicSession * session,bool is_static,StreamType type)308 QuicStream::QuicStream(QuicStreamId id,
309                        QuicSession* session,
310                        bool is_static,
311                        StreamType type)
312     : QuicStream(id,
313                  session,
314                  QuicStreamSequencer(this),
315                  is_static,
316                  type,
317                  0,
318                  false,
319                  FlowController(id, session, type),
320                  session->flow_controller()) {}
321 
QuicStream(QuicStreamId id,QuicSession * session,QuicStreamSequencer sequencer,bool is_static,StreamType type,uint64_t stream_bytes_read,bool fin_received,absl::optional<QuicFlowController> flow_controller,QuicFlowController * connection_flow_controller)322 QuicStream::QuicStream(QuicStreamId id,
323                        QuicSession* session,
324                        QuicStreamSequencer sequencer,
325                        bool is_static,
326                        StreamType type,
327                        uint64_t stream_bytes_read,
328                        bool fin_received,
329                        absl::optional<QuicFlowController> flow_controller,
330                        QuicFlowController* connection_flow_controller)
331     : sequencer_(std::move(sequencer)),
332       id_(id),
333       session_(session),
334       stream_delegate_(session),
335       precedence_(CalculateDefaultPriority(session)),
336       stream_bytes_read_(stream_bytes_read),
337       stream_error_(QUIC_STREAM_NO_ERROR),
338       connection_error_(QUIC_NO_ERROR),
339       read_side_closed_(false),
340       write_side_closed_(false),
341       fin_buffered_(false),
342       fin_sent_(false),
343       fin_outstanding_(false),
344       fin_lost_(false),
345       fin_received_(fin_received),
346       rst_sent_(false),
347       rst_received_(false),
348       stop_sending_sent_(false),
349       flow_controller_(std::move(flow_controller)),
350       connection_flow_controller_(connection_flow_controller),
351       stream_contributes_to_connection_flow_control_(true),
352       busy_counter_(0),
353       add_random_padding_after_fin_(false),
354       send_buffer_(
355           session->connection()->helper()->GetStreamSendBufferAllocator()),
356       buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)),
357       is_static_(is_static),
358       deadline_(QuicTime::Zero()),
359       was_draining_(false),
360       type_(VersionHasIetfQuicFrames(session->transport_version()) &&
361                     type != CRYPTO
362                 ? QuicUtils::GetStreamType(id_,
363                                            session->perspective(),
364                                            session->IsIncomingStream(id_),
365                                            session->version())
366                 : type),
367       creation_time_(session->connection()->clock()->ApproximateNow()),
368       perspective_(session->perspective()) {
369   if (type_ == WRITE_UNIDIRECTIONAL) {
370     fin_received_ = true;
371     CloseReadSide();
372   } else if (type_ == READ_UNIDIRECTIONAL) {
373     fin_sent_ = true;
374     CloseWriteSide();
375   }
376   if (type_ != CRYPTO) {
377     stream_delegate_->RegisterStreamPriority(id, is_static_, precedence_);
378   }
379 }
380 
~QuicStream()381 QuicStream::~QuicStream() {
382   if (session_ != nullptr && IsWaitingForAcks()) {
383     QUIC_DVLOG(1)
384         << ENDPOINT << "Stream " << id_
385         << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
386         << send_buffer_.stream_bytes_outstanding()
387         << ", fin_outstanding: " << fin_outstanding_;
388   }
389   if (stream_delegate_ != nullptr && type_ != CRYPTO) {
390     stream_delegate_->UnregisterStreamPriority(id(), is_static_);
391   }
392 }
393 
OnStreamFrame(const QuicStreamFrame & frame)394 void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
395   DCHECK_EQ(frame.stream_id, id_);
396 
397   DCHECK(!(read_side_closed_ && write_side_closed_));
398 
399   if (frame.fin && is_static_) {
400     OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
401                          "Attempt to close a static stream");
402     return;
403   }
404 
405   if (type_ == WRITE_UNIDIRECTIONAL) {
406     OnUnrecoverableError(QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
407                          "Data received on write unidirectional stream");
408     return;
409   }
410 
411   bool is_stream_too_long =
412       (frame.offset > kMaxStreamLength) ||
413       (kMaxStreamLength - frame.offset < frame.data_length);
414   if (is_stream_too_long) {
415     // Close connection if stream becomes too long.
416     QUIC_PEER_BUG << "Receive stream frame on stream " << id_
417                   << " reaches max stream length. frame offset " << frame.offset
418                   << " length " << frame.data_length << ". "
419                   << sequencer_.DebugString();
420     OnUnrecoverableError(
421         QUIC_STREAM_LENGTH_OVERFLOW,
422         quiche::QuicheStrCat("Peer sends more data than allowed on stream ",
423                              id_, ". frame: offset = ", frame.offset,
424                              ", length = ", frame.data_length, ". ",
425                              sequencer_.DebugString()));
426     return;
427   }
428 
429   if (frame.offset + frame.data_length > sequencer_.close_offset()) {
430     OnUnrecoverableError(
431         QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
432         quiche::QuicheStrCat(
433             "Stream ", id_,
434             " received data with offset: ", frame.offset + frame.data_length,
435             ", which is beyond close offset: ", sequencer_.close_offset()));
436     return;
437   }
438 
439   if (frame.fin && !fin_received_) {
440     fin_received_ = true;
441     if (fin_sent_) {
442       DCHECK(!was_draining_);
443       session_->StreamDraining(id_,
444                                /*unidirectional=*/type_ != BIDIRECTIONAL);
445       was_draining_ = true;
446     }
447   }
448 
449   if (read_side_closed_) {
450     QUIC_DLOG(INFO)
451         << ENDPOINT << "Stream " << frame.stream_id
452         << " is closed for reading. Ignoring newly received stream data.";
453     // The subclass does not want to read data:  blackhole the data.
454     return;
455   }
456 
457   // This count includes duplicate data received.
458   QuicByteCount frame_payload_size = frame.data_length;
459   stream_bytes_read_ += frame_payload_size;
460 
461   // Flow control is interested in tracking highest received offset.
462   // Only interested in received frames that carry data.
463   if (frame_payload_size > 0 &&
464       MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
465     // As the highest received offset has changed, check to see if this is a
466     // violation of flow control.
467     QUIC_BUG_IF(!flow_controller_.has_value())
468         << ENDPOINT << "OnStreamFrame called on stream without flow control";
469     if ((flow_controller_.has_value() &&
470          flow_controller_->FlowControlViolation()) ||
471         connection_flow_controller_->FlowControlViolation()) {
472       OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
473                            "Flow control violation after increasing offset");
474       return;
475     }
476   }
477 
478   sequencer_.OnStreamFrame(frame);
479 }
480 
OnStopSending(QuicRstStreamErrorCode code)481 bool QuicStream::OnStopSending(QuicRstStreamErrorCode code) {
482   // Do not reset the stream if all data has been sent and acknowledged.
483   if (write_side_closed() && !IsWaitingForAcks()) {
484     QUIC_DVLOG(1) << ENDPOINT
485                   << "Ignoring STOP_SENDING for a write closed stream, id: "
486                   << id_;
487     return false;
488   }
489 
490   if (is_static_) {
491     QUIC_DVLOG(1) << ENDPOINT
492                   << "Received STOP_SENDING for a static stream, id: " << id_
493                   << " Closing connection";
494     OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
495                          "Received STOP_SENDING for a static stream");
496     return false;
497   }
498 
499   stream_error_ = code;
500 
501   if (session()->split_up_send_rst()) {
502     QUIC_RELOADABLE_FLAG_COUNT_N(quic_split_up_send_rst_2, 1, 3);
503     MaybeSendRstStream(code);
504   } else {
505     session()->SendRstStream(id(), code, stream_bytes_written(),
506                              /*send_rst_only = */ true);
507     rst_sent_ = true;
508     CloseWriteSide();
509   }
510   return true;
511 }
512 
num_frames_received() const513 int QuicStream::num_frames_received() const {
514   return sequencer_.num_frames_received();
515 }
516 
num_duplicate_frames_received() const517 int QuicStream::num_duplicate_frames_received() const {
518   return sequencer_.num_duplicate_frames_received();
519 }
520 
OnStreamReset(const QuicRstStreamFrame & frame)521 void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
522   rst_received_ = true;
523   if (frame.byte_offset > kMaxStreamLength) {
524     // Peer are not suppose to write bytes more than maxium allowed.
525     OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
526                          "Reset frame stream offset overflow.");
527     return;
528   }
529 
530   const QuicStreamOffset kMaxOffset =
531       std::numeric_limits<QuicStreamOffset>::max();
532   if (sequencer()->close_offset() != kMaxOffset &&
533       frame.byte_offset != sequencer()->close_offset()) {
534     OnUnrecoverableError(
535         QUIC_STREAM_MULTIPLE_OFFSET,
536         quiche::QuicheStrCat("Stream ", id_,
537                              " received new final offset: ", frame.byte_offset,
538                              ", which is different from close offset: ",
539                              sequencer_.close_offset()));
540     return;
541   }
542 
543   MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
544   QUIC_BUG_IF(!flow_controller_.has_value())
545       << ENDPOINT << "OnStreamReset called on stream without flow control";
546   if ((flow_controller_.has_value() &&
547        flow_controller_->FlowControlViolation()) ||
548       connection_flow_controller_->FlowControlViolation()) {
549     OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
550                          "Flow control violation after increasing offset");
551     return;
552   }
553 
554   stream_error_ = frame.error_code;
555   // Google QUIC closes both sides of the stream in response to a
556   // RESET_STREAM, IETF QUIC closes only the read side.
557   if (!VersionHasIetfQuicFrames(transport_version())) {
558     CloseWriteSide();
559   }
560   CloseReadSide();
561 }
562 
OnConnectionClosed(QuicErrorCode error,ConnectionCloseSource)563 void QuicStream::OnConnectionClosed(QuicErrorCode error,
564                                     ConnectionCloseSource /*source*/) {
565   if (read_side_closed_ && write_side_closed_) {
566     return;
567   }
568   if (error != QUIC_NO_ERROR) {
569     stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
570     connection_error_ = error;
571   }
572 
573   CloseWriteSide();
574   CloseReadSide();
575 }
576 
OnFinRead()577 void QuicStream::OnFinRead() {
578   DCHECK(sequencer_.IsClosed());
579   // OnFinRead can be called due to a FIN flag in a headers block, so there may
580   // have been no OnStreamFrame call with a FIN in the frame.
581   fin_received_ = true;
582   // If fin_sent_ is true, then CloseWriteSide has already been called, and the
583   // stream will be destroyed by CloseReadSide, so don't need to call
584   // StreamDraining.
585   CloseReadSide();
586 }
587 
SetFinSent()588 void QuicStream::SetFinSent() {
589   DCHECK(!VersionUsesHttp3(transport_version()));
590   fin_sent_ = true;
591 }
592 
Reset(QuicRstStreamErrorCode error)593 void QuicStream::Reset(QuicRstStreamErrorCode error) {
594   stream_error_ = error;
595   if (session()->split_up_send_rst()) {
596     QUIC_RELOADABLE_FLAG_COUNT_N(quic_split_up_send_rst_2, 2, 3);
597     QuicConnection::ScopedPacketFlusher flusher(session()->connection());
598     MaybeSendStopSending(error);
599     MaybeSendRstStream(error);
600   } else {
601     session()->SendRstStream(id(), error, stream_bytes_written(),
602                              /*send_rst_only = */ false);
603     rst_sent_ = true;
604   }
605   if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
606     session()->MaybeCloseZombieStream(id_);
607     return;
608   }
609   if (!session()->split_up_send_rst()) {
610     CloseReadSide();
611     CloseWriteSide();
612   }
613 }
614 
OnUnrecoverableError(QuicErrorCode error,const std::string & details)615 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
616                                       const std::string& details) {
617   stream_delegate_->OnStreamError(error, details);
618 }
619 
precedence() const620 const spdy::SpdyStreamPrecedence& QuicStream::precedence() const {
621   return precedence_;
622 }
623 
SetPriority(const spdy::SpdyStreamPrecedence & precedence)624 void QuicStream::SetPriority(const spdy::SpdyStreamPrecedence& precedence) {
625   precedence_ = precedence;
626 
627   MaybeSendPriorityUpdateFrame();
628 
629   stream_delegate_->UpdateStreamPriority(id(), precedence);
630 }
631 
WriteOrBufferData(absl::string_view data,bool fin,QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener)632 void QuicStream::WriteOrBufferData(
633     absl::string_view data,
634     bool fin,
635     QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
636   if (session()->use_write_or_buffer_data_at_level()) {
637     QUIC_BUG_IF(QuicUtils::IsCryptoStreamId(transport_version(), id_))
638         << ENDPOINT
639         << "WriteOrBufferData is used to send application data, use "
640            "WriteOrBufferDataAtLevel to send crypto data.";
641     return WriteOrBufferDataAtLevel(
642         data, fin, session()->GetEncryptionLevelToSendApplicationData(),
643         ack_listener);
644   }
645   return WriteOrBufferDataInner(data, fin, absl::nullopt, ack_listener);
646 }
647 
WriteOrBufferDataInner(absl::string_view data,bool fin,absl::optional<EncryptionLevel> level,QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener)648 void QuicStream::WriteOrBufferDataInner(
649     absl::string_view data,
650     bool fin,
651     absl::optional<EncryptionLevel> level,
652     QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
653   if (data.empty() && !fin) {
654     QUIC_BUG << "data.empty() && !fin";
655     return;
656   }
657 
658   if (fin_buffered_) {
659     QUIC_BUG << "Fin already buffered";
660     return;
661   }
662   if (write_side_closed_) {
663     QUIC_DLOG(ERROR) << ENDPOINT
664                      << "Attempt to write when the write side is closed";
665     if (type_ == READ_UNIDIRECTIONAL) {
666       OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
667                            "Try to send data on read unidirectional stream");
668     }
669     return;
670   }
671 
672   fin_buffered_ = fin;
673 
674   bool had_buffered_data = HasBufferedData();
675   // Do not respect buffered data upper limit as WriteOrBufferData guarantees
676   // all data to be consumed.
677   if (data.length() > 0) {
678     struct iovec iov(QuicUtils::MakeIovec(data));
679     QuicStreamOffset offset = send_buffer_.stream_offset();
680     if (kMaxStreamLength - offset < data.length()) {
681       QUIC_BUG << "Write too many data via stream " << id_;
682       OnUnrecoverableError(
683           QUIC_STREAM_LENGTH_OVERFLOW,
684           quiche::QuicheStrCat("Write too many data via stream ", id_));
685       return;
686     }
687     send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
688     OnDataBuffered(offset, data.length(), ack_listener);
689   }
690   if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
691     // Write data if there is no buffered data before.
692     WriteBufferedData(level);
693   }
694 }
695 
WriteOrBufferDataAtLevel(absl::string_view data,bool fin,EncryptionLevel level,QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener)696 void QuicStream::WriteOrBufferDataAtLevel(
697     absl::string_view data,
698     bool fin,
699     EncryptionLevel level,
700     QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
701   DCHECK(session()->use_write_or_buffer_data_at_level());
702   QUIC_RELOADABLE_FLAG_COUNT(quic_use_write_or_buffer_data_at_level);
703   return WriteOrBufferDataInner(data, fin, level, ack_listener);
704 }
705 
OnCanWrite()706 void QuicStream::OnCanWrite() {
707   if (HasDeadlinePassed()) {
708     OnDeadlinePassed();
709     return;
710   }
711   if (HasPendingRetransmission()) {
712     WritePendingRetransmission();
713     // Exit early to allow other streams to write pending retransmissions if
714     // any.
715     return;
716   }
717 
718   if (write_side_closed_) {
719     QUIC_DLOG(ERROR)
720         << ENDPOINT << "Stream " << id()
721         << " attempting to write new data when the write side is closed";
722     return;
723   }
724   if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
725     absl::optional<EncryptionLevel> send_level = absl::nullopt;
726     if (session()->use_write_or_buffer_data_at_level()) {
727       send_level = session()->GetEncryptionLevelToSendApplicationData();
728     }
729     WriteBufferedData(send_level);
730   }
731   if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
732     // Notify upper layer to write new data when buffered data size is below
733     // low water mark.
734     OnCanWriteNewData();
735   }
736 }
737 
MaybeSendBlocked()738 void QuicStream::MaybeSendBlocked() {
739   if (!flow_controller_.has_value()) {
740     QUIC_BUG << ENDPOINT
741              << "MaybeSendBlocked called on stream without flow control";
742     return;
743   }
744   if (flow_controller_->ShouldSendBlocked()) {
745     session_->SendBlocked(id_);
746   }
747   if (!stream_contributes_to_connection_flow_control_) {
748     return;
749   }
750   if (connection_flow_controller_->ShouldSendBlocked()) {
751     session_->SendBlocked(QuicUtils::GetInvalidStreamId(transport_version()));
752   }
753   // If the stream is blocked by connection-level flow control but not by
754   // stream-level flow control, add the stream to the write blocked list so that
755   // the stream will be given a chance to write when a connection-level
756   // WINDOW_UPDATE arrives.
757   if (connection_flow_controller_->IsBlocked() &&
758       !flow_controller_->IsBlocked()) {
759     session_->MarkConnectionLevelWriteBlocked(id());
760   }
761 }
762 
WriteMemSlices(QuicMemSliceSpan span,bool fin)763 QuicConsumedData QuicStream::WriteMemSlices(QuicMemSliceSpan span, bool fin) {
764   QuicConsumedData consumed_data(0, false);
765   if (span.empty() && !fin) {
766     QUIC_BUG << "span.empty() && !fin";
767     return consumed_data;
768   }
769 
770   if (fin_buffered_) {
771     QUIC_BUG << "Fin already buffered";
772     return consumed_data;
773   }
774 
775   if (write_side_closed_) {
776     QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
777                      << " attempting to write when the write side is closed";
778     if (type_ == READ_UNIDIRECTIONAL) {
779       OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
780                            "Try to send data on read unidirectional stream");
781     }
782     return consumed_data;
783   }
784 
785   bool had_buffered_data = HasBufferedData();
786   if (CanWriteNewData() || span.empty()) {
787     consumed_data.fin_consumed = fin;
788     if (!span.empty()) {
789       // Buffer all data if buffered data size is below limit.
790       QuicStreamOffset offset = send_buffer_.stream_offset();
791       consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
792       if (offset > send_buffer_.stream_offset() ||
793           kMaxStreamLength < send_buffer_.stream_offset()) {
794         QUIC_BUG << "Write too many data via stream " << id_;
795         OnUnrecoverableError(
796             QUIC_STREAM_LENGTH_OVERFLOW,
797             quiche::QuicheStrCat("Write too many data via stream ", id_));
798         return consumed_data;
799       }
800       OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
801     }
802   }
803   fin_buffered_ = consumed_data.fin_consumed;
804 
805   if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
806     // Write data if there is no buffered data before.
807     absl::optional<EncryptionLevel> send_level = absl::nullopt;
808     if (session()->use_write_or_buffer_data_at_level()) {
809       send_level = session()->GetEncryptionLevelToSendApplicationData();
810     }
811     WriteBufferedData(send_level);
812   }
813 
814   return consumed_data;
815 }
816 
HasPendingRetransmission() const817 bool QuicStream::HasPendingRetransmission() const {
818   return send_buffer_.HasPendingRetransmission() || fin_lost_;
819 }
820 
IsStreamFrameOutstanding(QuicStreamOffset offset,QuicByteCount data_length,bool fin) const821 bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
822                                           QuicByteCount data_length,
823                                           bool fin) const {
824   return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
825          (fin && fin_outstanding_);
826 }
827 
CloseReadSide()828 void QuicStream::CloseReadSide() {
829   if (read_side_closed_) {
830     return;
831   }
832   QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
833 
834   read_side_closed_ = true;
835   sequencer_.ReleaseBuffer();
836 
837   if (write_side_closed_) {
838     QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
839     session_->OnStreamClosed(id());
840     OnClose();
841   }
842 }
843 
CloseWriteSide()844 void QuicStream::CloseWriteSide() {
845   if (write_side_closed_) {
846     return;
847   }
848   QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
849 
850   write_side_closed_ = true;
851   if (read_side_closed_) {
852     QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
853     session_->OnStreamClosed(id());
854     OnClose();
855   }
856 }
857 
MaybeSendStopSending(QuicRstStreamErrorCode error)858 void QuicStream::MaybeSendStopSending(QuicRstStreamErrorCode error) {
859   DCHECK(session()->split_up_send_rst());
860   if (stop_sending_sent_) {
861     return;
862   }
863 
864   if (!session()->version().UsesHttp3() && error != QUIC_STREAM_NO_ERROR) {
865     // In gQUIC, RST with error closes both read and write side.
866     return;
867   }
868 
869   if (session()->version().UsesHttp3()) {
870     session()->MaybeSendStopSendingFrame(id(), error);
871   } else {
872     DCHECK_EQ(QUIC_STREAM_NO_ERROR, error);
873     session()->MaybeSendRstStreamFrame(id(), QUIC_STREAM_NO_ERROR,
874                                        stream_bytes_written());
875   }
876   stop_sending_sent_ = true;
877   CloseReadSide();
878 }
879 
MaybeSendRstStream(QuicRstStreamErrorCode error)880 void QuicStream::MaybeSendRstStream(QuicRstStreamErrorCode error) {
881   DCHECK(session()->split_up_send_rst());
882   if (rst_sent_) {
883     return;
884   }
885 
886   if (!session()->version().UsesHttp3()) {
887     QUIC_BUG_IF(error == QUIC_STREAM_NO_ERROR);
888     stop_sending_sent_ = true;
889     CloseReadSide();
890   }
891   session()->MaybeSendRstStreamFrame(id(), error, stream_bytes_written());
892   rst_sent_ = true;
893   CloseWriteSide();
894 }
895 
HasBufferedData() const896 bool QuicStream::HasBufferedData() const {
897   DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
898   return send_buffer_.stream_offset() > stream_bytes_written();
899 }
900 
transport_version() const901 QuicTransportVersion QuicStream::transport_version() const {
902   return session_->transport_version();
903 }
904 
handshake_protocol() const905 HandshakeProtocol QuicStream::handshake_protocol() const {
906   return session_->connection()->version().handshake_protocol;
907 }
908 
StopReading()909 void QuicStream::StopReading() {
910   QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
911   sequencer_.StopReading();
912 }
913 
OnClose()914 void QuicStream::OnClose() {
915   DCHECK(read_side_closed_ && write_side_closed_);
916 
917   if (!fin_sent_ && !rst_sent_) {
918     if (!session()->split_up_send_rst()) {
919       // For flow control accounting, tell the peer how many bytes have been
920       // written on this stream before termination. Done here if needed, using a
921       // RST_STREAM frame.
922       QUIC_DLOG(INFO) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
923       session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
924                               stream_bytes_written(),
925                               /*send_rst_only = */ false);
926       session_->MaybeCloseZombieStream(id_);
927       rst_sent_ = true;
928     } else {
929       QUIC_RELOADABLE_FLAG_COUNT_N(quic_split_up_send_rst_2, 3, 3);
930       QUIC_BUG_IF(session()->connection()->connected() &&
931                   session()->version().UsesHttp3())
932           << "The stream should've already sent RST in response to "
933              "STOP_SENDING";
934       MaybeSendRstStream(QUIC_RST_ACKNOWLEDGEMENT);
935       session_->MaybeCloseZombieStream(id_);
936     }
937   }
938 
939   if (!flow_controller_.has_value() ||
940       flow_controller_->FlowControlViolation() ||
941       connection_flow_controller_->FlowControlViolation()) {
942     return;
943   }
944   // The stream is being closed and will not process any further incoming bytes.
945   // As there may be more bytes in flight, to ensure that both endpoints have
946   // the same connection level flow control state, mark all unreceived or
947   // buffered bytes as consumed.
948   QuicByteCount bytes_to_consume =
949       flow_controller_->highest_received_byte_offset() -
950       flow_controller_->bytes_consumed();
951   AddBytesConsumed(bytes_to_consume);
952 }
953 
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)954 void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
955   if (type_ == READ_UNIDIRECTIONAL) {
956     OnUnrecoverableError(
957         QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
958         "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
959     return;
960   }
961 
962   if (!flow_controller_.has_value()) {
963     QUIC_BUG << ENDPOINT
964              << "OnWindowUpdateFrame called on stream without flow control";
965     return;
966   }
967 
968   if (flow_controller_->UpdateSendWindowOffset(frame.max_data)) {
969     // Let session unblock this stream.
970     session_->MarkConnectionLevelWriteBlocked(id_);
971   }
972 }
973 
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)974 bool QuicStream::MaybeIncreaseHighestReceivedOffset(
975     QuicStreamOffset new_offset) {
976   if (!flow_controller_.has_value()) {
977     QUIC_BUG << ENDPOINT
978              << "MaybeIncreaseHighestReceivedOffset called on stream without "
979                 "flow control";
980     return false;
981   }
982   uint64_t increment =
983       new_offset - flow_controller_->highest_received_byte_offset();
984   if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
985     return false;
986   }
987 
988   // If |new_offset| increased the stream flow controller's highest received
989   // offset, increase the connection flow controller's value by the incremental
990   // difference.
991   if (stream_contributes_to_connection_flow_control_) {
992     connection_flow_controller_->UpdateHighestReceivedOffset(
993         connection_flow_controller_->highest_received_byte_offset() +
994         increment);
995   }
996   return true;
997 }
998 
AddBytesSent(QuicByteCount bytes)999 void QuicStream::AddBytesSent(QuicByteCount bytes) {
1000   if (!flow_controller_.has_value()) {
1001     QUIC_BUG << ENDPOINT
1002              << "AddBytesSent called on stream without flow control";
1003     return;
1004   }
1005   flow_controller_->AddBytesSent(bytes);
1006   if (stream_contributes_to_connection_flow_control_) {
1007     connection_flow_controller_->AddBytesSent(bytes);
1008   }
1009 }
1010 
AddBytesConsumed(QuicByteCount bytes)1011 void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
1012   if (type_ == CRYPTO) {
1013     // A stream with type CRYPTO has no flow control, so there's nothing this
1014     // function needs to do. This function still gets called by the
1015     // QuicStreamSequencers used by QuicCryptoStream.
1016     return;
1017   }
1018   if (!flow_controller_.has_value()) {
1019     QUIC_BUG
1020         << ENDPOINT
1021         << "AddBytesConsumed called on non-crypto stream without flow control";
1022     return;
1023   }
1024   // Only adjust stream level flow controller if still reading.
1025   if (!read_side_closed_) {
1026     flow_controller_->AddBytesConsumed(bytes);
1027   }
1028 
1029   if (stream_contributes_to_connection_flow_control_) {
1030     connection_flow_controller_->AddBytesConsumed(bytes);
1031   }
1032 }
1033 
MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,bool was_zero_rtt_rejected)1034 bool QuicStream::MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,
1035                                              bool was_zero_rtt_rejected) {
1036   if (!flow_controller_.has_value()) {
1037     QUIC_BUG << ENDPOINT
1038              << "ConfigSendWindowOffset called on stream without flow control";
1039     return false;
1040   }
1041 
1042   // The validation code below is for QUIC with TLS only.
1043   if (new_offset < flow_controller_->send_window_offset()) {
1044     DCHECK(session()->version().UsesTls());
1045     if (was_zero_rtt_rejected && new_offset < flow_controller_->bytes_sent()) {
1046       // The client is given flow control window lower than what's written in
1047       // 0-RTT. This QUIC implementation is unable to retransmit them.
1048       QUIC_BUG_IF(perspective_ == Perspective::IS_SERVER)
1049           << "Server streams' flow control should never be configured twice.";
1050       OnUnrecoverableError(
1051           QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1052           quiche::QuicheStrCat(
1053               "Server rejected 0-RTT, aborting because new stream max data ",
1054               new_offset, " for stream ", id_, " is less than currently used: ",
1055               flow_controller_->bytes_sent()));
1056       return false;
1057     } else if (session()->version().AllowsLowFlowControlLimits()) {
1058       // In IETF QUIC, if the client receives flow control limit lower than what
1059       // was resumed from 0-RTT, depending on 0-RTT status, it's either the
1060       // peer's fault or our implementation's fault.
1061       QUIC_BUG_IF(perspective_ == Perspective::IS_SERVER)
1062           << "Server streams' flow control should never be configured twice.";
1063       OnUnrecoverableError(
1064           was_zero_rtt_rejected ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1065                                 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1066           quiche::QuicheStrCat(
1067               was_zero_rtt_rejected ? "Server rejected 0-RTT, aborting because "
1068                                     : "",
1069               "new stream max data ", new_offset, " decreases current limit: ",
1070               flow_controller_->send_window_offset()));
1071       return false;
1072     }
1073   }
1074 
1075   if (flow_controller_->UpdateSendWindowOffset(new_offset)) {
1076     // Let session unblock this stream.
1077     session_->MarkConnectionLevelWriteBlocked(id_);
1078   }
1079   return true;
1080 }
1081 
AddRandomPaddingAfterFin()1082 void QuicStream::AddRandomPaddingAfterFin() {
1083   add_random_padding_after_fin_ = true;
1084 }
1085 
OnStreamFrameAcked(QuicStreamOffset offset,QuicByteCount data_length,bool fin_acked,QuicTime::Delta,QuicTime,QuicByteCount * newly_acked_length)1086 bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
1087                                     QuicByteCount data_length,
1088                                     bool fin_acked,
1089                                     QuicTime::Delta /*ack_delay_time*/,
1090                                     QuicTime /*receive_timestamp*/,
1091                                     QuicByteCount* newly_acked_length) {
1092   QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
1093                 << "[" << offset << ", " << offset + data_length << "]"
1094                 << " fin = " << fin_acked;
1095   *newly_acked_length = 0;
1096   if (!send_buffer_.OnStreamDataAcked(offset, data_length,
1097                                       newly_acked_length)) {
1098     OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
1099     return false;
1100   }
1101   if (!fin_sent_ && fin_acked) {
1102     OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent fin.");
1103     return false;
1104   }
1105   // Indicates whether ack listener's OnPacketAcked should be called.
1106   const bool new_data_acked =
1107       *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
1108   if (fin_acked) {
1109     fin_outstanding_ = false;
1110     fin_lost_ = false;
1111   }
1112   if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
1113     session_->MaybeCloseZombieStream(id_);
1114   }
1115   return new_data_acked;
1116 }
1117 
OnStreamFrameRetransmitted(QuicStreamOffset offset,QuicByteCount data_length,bool fin_retransmitted)1118 void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
1119                                             QuicByteCount data_length,
1120                                             bool fin_retransmitted) {
1121   send_buffer_.OnStreamDataRetransmitted(offset, data_length);
1122   if (fin_retransmitted) {
1123     fin_lost_ = false;
1124   }
1125 }
1126 
OnStreamFrameLost(QuicStreamOffset offset,QuicByteCount data_length,bool fin_lost)1127 void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
1128                                    QuicByteCount data_length,
1129                                    bool fin_lost) {
1130   QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
1131                 << "[" << offset << ", " << offset + data_length << "]"
1132                 << " fin = " << fin_lost;
1133   if (data_length > 0) {
1134     send_buffer_.OnStreamDataLost(offset, data_length);
1135   }
1136   if (fin_lost && fin_outstanding_) {
1137     fin_lost_ = true;
1138   }
1139 }
1140 
RetransmitStreamData(QuicStreamOffset offset,QuicByteCount data_length,bool fin,TransmissionType type)1141 bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
1142                                       QuicByteCount data_length,
1143                                       bool fin,
1144                                       TransmissionType type) {
1145   DCHECK(type == PTO_RETRANSMISSION || type == RTO_RETRANSMISSION ||
1146          type == TLP_RETRANSMISSION || type == PROBING_RETRANSMISSION);
1147   if (HasDeadlinePassed()) {
1148     OnDeadlinePassed();
1149     return true;
1150   }
1151   QuicIntervalSet<QuicStreamOffset> retransmission(offset,
1152                                                    offset + data_length);
1153   retransmission.Difference(bytes_acked());
1154   bool retransmit_fin = fin && fin_outstanding_;
1155   if (retransmission.Empty() && !retransmit_fin) {
1156     return true;
1157   }
1158   absl::optional<EncryptionLevel> send_level = absl::nullopt;
1159   if (session()->use_write_or_buffer_data_at_level()) {
1160     send_level = session()->GetEncryptionLevelToSendApplicationData();
1161   }
1162   QuicConsumedData consumed(0, false);
1163   for (const auto& interval : retransmission) {
1164     QuicStreamOffset retransmission_offset = interval.min();
1165     QuicByteCount retransmission_length = interval.max() - interval.min();
1166     const bool can_bundle_fin =
1167         retransmit_fin && (retransmission_offset + retransmission_length ==
1168                            stream_bytes_written());
1169     consumed = stream_delegate_->WritevData(
1170         id_, retransmission_length, retransmission_offset,
1171         can_bundle_fin ? FIN : NO_FIN, type, send_level);
1172     QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1173                   << " is forced to retransmit stream data ["
1174                   << retransmission_offset << ", "
1175                   << retransmission_offset + retransmission_length
1176                   << ") and fin: " << can_bundle_fin
1177                   << ", consumed: " << consumed;
1178     OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
1179                                consumed.fin_consumed);
1180     if (can_bundle_fin) {
1181       retransmit_fin = !consumed.fin_consumed;
1182     }
1183     if (consumed.bytes_consumed < retransmission_length ||
1184         (can_bundle_fin && !consumed.fin_consumed)) {
1185       // Connection is write blocked.
1186       return false;
1187     }
1188   }
1189   if (retransmit_fin) {
1190     QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1191                   << " retransmits fin only frame.";
1192     consumed = stream_delegate_->WritevData(id_, 0, stream_bytes_written(), FIN,
1193                                             type, send_level);
1194     if (!consumed.fin_consumed) {
1195       return false;
1196     }
1197   }
1198   return true;
1199 }
1200 
IsWaitingForAcks() const1201 bool QuicStream::IsWaitingForAcks() const {
1202   return (!rst_sent_ || stream_error_ == QUIC_STREAM_NO_ERROR) &&
1203          (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
1204 }
1205 
ReadableBytes() const1206 QuicByteCount QuicStream::ReadableBytes() const {
1207   return sequencer_.ReadableBytes();
1208 }
1209 
WriteStreamData(QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)1210 bool QuicStream::WriteStreamData(QuicStreamOffset offset,
1211                                  QuicByteCount data_length,
1212                                  QuicDataWriter* writer) {
1213   DCHECK_LT(0u, data_length);
1214   QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
1215                 << offset << " length " << data_length;
1216   return send_buffer_.WriteStreamData(offset, data_length, writer);
1217 }
1218 
WriteBufferedData(absl::optional<EncryptionLevel> level)1219 void QuicStream::WriteBufferedData(absl::optional<EncryptionLevel> level) {
1220   DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
1221 
1222   if (session_->ShouldYield(id())) {
1223     session_->MarkConnectionLevelWriteBlocked(id());
1224     return;
1225   }
1226 
1227   // Size of buffered data.
1228   QuicByteCount write_length = BufferedDataBytes();
1229 
1230   // A FIN with zero data payload should not be flow control blocked.
1231   bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
1232 
1233   bool fin = fin_buffered_;
1234 
1235   // How much data flow control permits to be written.
1236   QuicByteCount send_window;
1237   if (flow_controller_.has_value()) {
1238     send_window = flow_controller_->SendWindowSize();
1239   } else {
1240     send_window = std::numeric_limits<QuicByteCount>::max();
1241     QUIC_BUG << ENDPOINT
1242              << "WriteBufferedData called on stream without flow control";
1243   }
1244   if (stream_contributes_to_connection_flow_control_) {
1245     send_window =
1246         std::min(send_window, connection_flow_controller_->SendWindowSize());
1247   }
1248 
1249   if (send_window == 0 && !fin_with_zero_data) {
1250     // Quick return if nothing can be sent.
1251     MaybeSendBlocked();
1252     return;
1253   }
1254 
1255   if (write_length > send_window) {
1256     // Don't send the FIN unless all the data will be sent.
1257     fin = false;
1258 
1259     // Writing more data would be a violation of flow control.
1260     write_length = send_window;
1261     QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
1262                   << write_length << " due to flow control";
1263   }
1264 
1265   StreamSendingState state = fin ? FIN : NO_FIN;
1266   if (fin && add_random_padding_after_fin_) {
1267     state = FIN_AND_PADDING;
1268   }
1269   QuicConsumedData consumed_data =
1270       stream_delegate_->WritevData(id(), write_length, stream_bytes_written(),
1271                                    state, NOT_RETRANSMISSION, level);
1272 
1273   OnStreamDataConsumed(consumed_data.bytes_consumed);
1274 
1275   AddBytesSent(consumed_data.bytes_consumed);
1276   QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
1277                 << stream_bytes_written() << " bytes "
1278                 << " and has buffered data " << BufferedDataBytes() << " bytes."
1279                 << " fin is sent: " << consumed_data.fin_consumed
1280                 << " fin is buffered: " << fin_buffered_;
1281 
1282   // The write may have generated a write error causing this stream to be
1283   // closed. If so, simply return without marking the stream write blocked.
1284   if (write_side_closed_) {
1285     return;
1286   }
1287 
1288   if (consumed_data.bytes_consumed == write_length) {
1289     if (!fin_with_zero_data) {
1290       MaybeSendBlocked();
1291     }
1292     if (fin && consumed_data.fin_consumed) {
1293       DCHECK(!fin_sent_);
1294       fin_sent_ = true;
1295       fin_outstanding_ = true;
1296       if (fin_received_) {
1297         DCHECK(!was_draining_);
1298         session_->StreamDraining(id_,
1299                                  /*unidirectional=*/type_ != BIDIRECTIONAL);
1300         was_draining_ = true;
1301       }
1302       CloseWriteSide();
1303     } else if (fin && !consumed_data.fin_consumed) {
1304       session_->MarkConnectionLevelWriteBlocked(id());
1305     }
1306   } else {
1307     session_->MarkConnectionLevelWriteBlocked(id());
1308   }
1309   if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
1310     busy_counter_ = 0;
1311   }
1312 }
1313 
BufferedDataBytes() const1314 uint64_t QuicStream::BufferedDataBytes() const {
1315   DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1316   return send_buffer_.stream_offset() - stream_bytes_written();
1317 }
1318 
CanWriteNewData() const1319 bool QuicStream::CanWriteNewData() const {
1320   return BufferedDataBytes() < buffered_data_threshold_;
1321 }
1322 
CanWriteNewDataAfterData(QuicByteCount length) const1323 bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1324   return (BufferedDataBytes() + length) < buffered_data_threshold_;
1325 }
1326 
stream_bytes_written() const1327 uint64_t QuicStream::stream_bytes_written() const {
1328   return send_buffer_.stream_bytes_written();
1329 }
1330 
bytes_acked() const1331 const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1332   return send_buffer_.bytes_acked();
1333 }
1334 
OnStreamDataConsumed(QuicByteCount bytes_consumed)1335 void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) {
1336   send_buffer_.OnStreamDataConsumed(bytes_consumed);
1337 }
1338 
WritePendingRetransmission()1339 void QuicStream::WritePendingRetransmission() {
1340   while (HasPendingRetransmission()) {
1341     QuicConsumedData consumed(0, false);
1342     absl::optional<EncryptionLevel> send_level = absl::nullopt;
1343     if (session()->use_write_or_buffer_data_at_level()) {
1344       send_level = session()->GetEncryptionLevelToSendApplicationData();
1345     }
1346     if (!send_buffer_.HasPendingRetransmission()) {
1347       QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1348                     << " retransmits fin only frame.";
1349       consumed = stream_delegate_->WritevData(
1350           id_, 0, stream_bytes_written(), FIN, LOSS_RETRANSMISSION, send_level);
1351       fin_lost_ = !consumed.fin_consumed;
1352       if (fin_lost_) {
1353         // Connection is write blocked.
1354         return;
1355       }
1356     } else {
1357       StreamPendingRetransmission pending =
1358           send_buffer_.NextPendingRetransmission();
1359       // Determine whether the lost fin can be bundled with the data.
1360       const bool can_bundle_fin =
1361           fin_lost_ &&
1362           (pending.offset + pending.length == stream_bytes_written());
1363       consumed = stream_delegate_->WritevData(
1364           id_, pending.length, pending.offset, can_bundle_fin ? FIN : NO_FIN,
1365           LOSS_RETRANSMISSION, send_level);
1366       QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1367                     << " tries to retransmit stream data [" << pending.offset
1368                     << ", " << pending.offset + pending.length
1369                     << ") and fin: " << can_bundle_fin
1370                     << ", consumed: " << consumed;
1371       OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
1372                                  consumed.fin_consumed);
1373       if (consumed.bytes_consumed < pending.length ||
1374           (can_bundle_fin && !consumed.fin_consumed)) {
1375         // Connection is write blocked.
1376         return;
1377       }
1378     }
1379   }
1380 }
1381 
MaybeSetTtl(QuicTime::Delta ttl)1382 bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1383   if (is_static_) {
1384     QUIC_BUG << "Cannot set TTL of a static stream.";
1385     return false;
1386   }
1387   if (deadline_.IsInitialized()) {
1388     QUIC_DLOG(WARNING) << "Deadline has already been set.";
1389     return false;
1390   }
1391   QuicTime now = session()->connection()->clock()->ApproximateNow();
1392   deadline_ = now + ttl;
1393   return true;
1394 }
1395 
HasDeadlinePassed() const1396 bool QuicStream::HasDeadlinePassed() const {
1397   if (!deadline_.IsInitialized()) {
1398     // No deadline has been set.
1399     return false;
1400   }
1401   QuicTime now = session()->connection()->clock()->ApproximateNow();
1402   if (now < deadline_) {
1403     return false;
1404   }
1405   // TTL expired.
1406   QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1407   return true;
1408 }
1409 
OnDeadlinePassed()1410 void QuicStream::OnDeadlinePassed() {
1411   Reset(QUIC_STREAM_TTL_EXPIRED);
1412 }
1413 
IsFlowControlBlocked() const1414 bool QuicStream::IsFlowControlBlocked() const {
1415   if (!flow_controller_.has_value()) {
1416     QUIC_BUG << "Trying to access non-existent flow controller.";
1417     return false;
1418   }
1419   return flow_controller_->IsBlocked();
1420 }
1421 
highest_received_byte_offset() const1422 QuicStreamOffset QuicStream::highest_received_byte_offset() const {
1423   if (!flow_controller_.has_value()) {
1424     QUIC_BUG << "Trying to access non-existent flow controller.";
1425     return 0;
1426   }
1427   return flow_controller_->highest_received_byte_offset();
1428 }
1429 
UpdateReceiveWindowSize(QuicStreamOffset size)1430 void QuicStream::UpdateReceiveWindowSize(QuicStreamOffset size) {
1431   if (!flow_controller_.has_value()) {
1432     QUIC_BUG << "Trying to access non-existent flow controller.";
1433     return;
1434   }
1435   flow_controller_->UpdateReceiveWindowSize(size);
1436 }
1437 
1438 // static
CalculateDefaultPriority(const QuicSession * session)1439 spdy::SpdyStreamPrecedence QuicStream::CalculateDefaultPriority(
1440     const QuicSession* session) {
1441   if (VersionUsesHttp3(session->transport_version())) {
1442     return spdy::SpdyStreamPrecedence(kDefaultUrgency);
1443   }
1444 
1445   if (session->use_http2_priority_write_scheduler()) {
1446     return spdy::SpdyStreamPrecedence(0, spdy::kHttp2DefaultStreamWeight,
1447                                       false);
1448   }
1449 
1450   return spdy::SpdyStreamPrecedence(QuicStream::kDefaultPriority);
1451 }
1452 
1453 }  // namespace quic
1454