1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/third_party/quiche/src/quic/core/quic_stream.h"
6
7 #include <limits>
8 #include <string>
9
10 #include "absl/strings/string_view.h"
11 #include "absl/types/optional.h"
12 #include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
13 #include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
14 #include "net/third_party/quiche/src/quic/core/quic_session.h"
15 #include "net/third_party/quiche/src/quic/core/quic_types.h"
16 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
17 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
18 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
19 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
20 #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
21 #include "net/third_party/quiche/src/common/platform/api/quiche_str_cat.h"
22
23 using spdy::SpdyPriority;
24
25 namespace quic {
26
// Log prefix naming the endpoint that emitted a message. The expansion reads
// a variable called `perspective_`, so the macro is only usable where such a
// name is in scope.
#define ENDPOINT \
  (perspective_ != Perspective::IS_SERVER ? "Client: " : "Server: ")
29
30 namespace {
31
DefaultFlowControlWindow(ParsedQuicVersion version)32 QuicByteCount DefaultFlowControlWindow(ParsedQuicVersion version) {
33 if (!version.AllowsLowFlowControlLimits()) {
34 return kDefaultFlowControlSendWindow;
35 }
36 return 0;
37 }
38
GetInitialStreamFlowControlWindowToSend(QuicSession * session,QuicStreamId stream_id)39 QuicByteCount GetInitialStreamFlowControlWindowToSend(QuicSession* session,
40 QuicStreamId stream_id) {
41 ParsedQuicVersion version = session->connection()->version();
42 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
43 return session->config()->GetInitialStreamFlowControlWindowToSend();
44 }
45
46 // Unidirectional streams (v99 only).
47 if (VersionHasIetfQuicFrames(version.transport_version) &&
48 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
49 return session->config()
50 ->GetInitialMaxStreamDataBytesUnidirectionalToSend();
51 }
52
53 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
54 session->perspective())) {
55 return session->config()
56 ->GetInitialMaxStreamDataBytesOutgoingBidirectionalToSend();
57 }
58
59 return session->config()
60 ->GetInitialMaxStreamDataBytesIncomingBidirectionalToSend();
61 }
62
GetReceivedFlowControlWindow(QuicSession * session,QuicStreamId stream_id)63 QuicByteCount GetReceivedFlowControlWindow(QuicSession* session,
64 QuicStreamId stream_id) {
65 ParsedQuicVersion version = session->connection()->version();
66 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
67 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
68 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
69 }
70
71 return DefaultFlowControlWindow(version);
72 }
73
74 // Unidirectional streams (v99 only).
75 if (VersionHasIetfQuicFrames(version.transport_version) &&
76 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
77 if (session->config()
78 ->HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
79 return session->config()
80 ->ReceivedInitialMaxStreamDataBytesUnidirectional();
81 }
82
83 return DefaultFlowControlWindow(version);
84 }
85
86 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
87 session->perspective())) {
88 if (session->config()
89 ->HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
90 return session->config()
91 ->ReceivedInitialMaxStreamDataBytesOutgoingBidirectional();
92 }
93
94 return DefaultFlowControlWindow(version);
95 }
96
97 if (session->config()
98 ->HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
99 return session->config()
100 ->ReceivedInitialMaxStreamDataBytesIncomingBidirectional();
101 }
102
103 return DefaultFlowControlWindow(version);
104 }
105
106 } // namespace
107
108 // static
109 const SpdyPriority QuicStream::kDefaultPriority;
110
111 // static
112 const int QuicStream::kDefaultUrgency;
113
PendingStream(QuicStreamId id,QuicSession * session)114 PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
115 : id_(id),
116 stream_delegate_(session),
117 stream_bytes_read_(0),
118 fin_received_(false),
119 connection_flow_controller_(session->flow_controller()),
120 flow_controller_(session,
121 id,
122 /*is_connection_flow_controller*/ false,
123 GetReceivedFlowControlWindow(session, id),
124 GetInitialStreamFlowControlWindowToSend(session, id),
125 kStreamReceiveWindowLimit,
126 session->flow_controller()->auto_tune_receive_window(),
127 session->flow_controller()),
128 sequencer_(this) {}
129
OnDataAvailable()130 void PendingStream::OnDataAvailable() {
131 // Data should be kept in the sequencer so that
132 // QuicSession::ProcessPendingStream() can read it.
133 }
134
OnFinRead()135 void PendingStream::OnFinRead() {
136 DCHECK(sequencer_.IsClosed());
137 }
138
AddBytesConsumed(QuicByteCount bytes)139 void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
140 // It will be called when the metadata of the stream is consumed.
141 flow_controller_.AddBytesConsumed(bytes);
142 connection_flow_controller_->AddBytesConsumed(bytes);
143 }
144
Reset(QuicRstStreamErrorCode)145 void PendingStream::Reset(QuicRstStreamErrorCode /*error*/) {
146 // Currently PendingStream is only read-unidirectional. It shouldn't send
147 // Reset.
148 QUIC_NOTREACHED();
149 }
150
OnUnrecoverableError(QuicErrorCode error,const std::string & details)151 void PendingStream::OnUnrecoverableError(QuicErrorCode error,
152 const std::string& details) {
153 stream_delegate_->OnStreamError(error, details);
154 }
155
id() const156 QuicStreamId PendingStream::id() const {
157 return id_;
158 }
159
OnStreamFrame(const QuicStreamFrame & frame)160 void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
161 DCHECK_EQ(frame.stream_id, id_);
162
163 bool is_stream_too_long =
164 (frame.offset > kMaxStreamLength) ||
165 (kMaxStreamLength - frame.offset < frame.data_length);
166 if (is_stream_too_long) {
167 // Close connection if stream becomes too long.
168 QUIC_PEER_BUG
169 << "Receive stream frame reaches max stream length. frame offset "
170 << frame.offset << " length " << frame.data_length;
171 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
172 "Peer sends more data than allowed on this stream.");
173 return;
174 }
175
176 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
177 OnUnrecoverableError(
178 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
179 quiche::QuicheStrCat(
180 "Stream ", id_,
181 " received data with offset: ", frame.offset + frame.data_length,
182 ", which is beyond close offset: ", sequencer()->close_offset()));
183 return;
184 }
185
186 if (frame.fin) {
187 fin_received_ = true;
188 }
189
190 // This count includes duplicate data received.
191 QuicByteCount frame_payload_size = frame.data_length;
192 stream_bytes_read_ += frame_payload_size;
193
194 // Flow control is interested in tracking highest received offset.
195 // Only interested in received frames that carry data.
196 if (frame_payload_size > 0 &&
197 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
198 // As the highest received offset has changed, check to see if this is a
199 // violation of flow control.
200 if (flow_controller_.FlowControlViolation() ||
201 connection_flow_controller_->FlowControlViolation()) {
202 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
203 "Flow control violation after increasing offset");
204 return;
205 }
206 }
207
208 sequencer_.OnStreamFrame(frame);
209 }
210
OnRstStreamFrame(const QuicRstStreamFrame & frame)211 void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
212 DCHECK_EQ(frame.stream_id, id_);
213
214 if (frame.byte_offset > kMaxStreamLength) {
215 // Peer are not suppose to write bytes more than maxium allowed.
216 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
217 "Reset frame stream offset overflow.");
218 return;
219 }
220
221 const QuicStreamOffset kMaxOffset =
222 std::numeric_limits<QuicStreamOffset>::max();
223 if (sequencer()->close_offset() != kMaxOffset &&
224 frame.byte_offset != sequencer()->close_offset()) {
225 OnUnrecoverableError(
226 QUIC_STREAM_MULTIPLE_OFFSET,
227 quiche::QuicheStrCat("Stream ", id_,
228 " received new final offset: ", frame.byte_offset,
229 ", which is different from close offset: ",
230 sequencer()->close_offset()));
231 return;
232 }
233
234 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
235 if (flow_controller_.FlowControlViolation() ||
236 connection_flow_controller_->FlowControlViolation()) {
237 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
238 "Flow control violation after increasing offset");
239 return;
240 }
241 }
242
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)243 bool PendingStream::MaybeIncreaseHighestReceivedOffset(
244 QuicStreamOffset new_offset) {
245 uint64_t increment =
246 new_offset - flow_controller_.highest_received_byte_offset();
247 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
248 return false;
249 }
250
251 // If |new_offset| increased the stream flow controller's highest received
252 // offset, increase the connection flow controller's value by the incremental
253 // difference.
254 connection_flow_controller_->UpdateHighestReceivedOffset(
255 connection_flow_controller_->highest_received_byte_offset() + increment);
256 return true;
257 }
258
MarkConsumed(QuicByteCount num_bytes)259 void PendingStream::MarkConsumed(QuicByteCount num_bytes) {
260 sequencer_.MarkConsumed(num_bytes);
261 }
262
StopReading()263 void PendingStream::StopReading() {
264 QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
265 sequencer_.StopReading();
266 }
267
QuicStream(PendingStream * pending,QuicSession * session,StreamType type,bool is_static)268 QuicStream::QuicStream(PendingStream* pending,
269 QuicSession* session,
270 StreamType type,
271 bool is_static)
272 : QuicStream(pending->id_,
273 session,
274 std::move(pending->sequencer_),
275 is_static,
276 type,
277 pending->stream_bytes_read_,
278 pending->fin_received_,
279 std::move(pending->flow_controller_),
280 pending->connection_flow_controller_) {
281 sequencer_.set_stream(this);
282 }
283
284 namespace {
285
FlowController(QuicStreamId id,QuicSession * session,StreamType type)286 absl::optional<QuicFlowController> FlowController(QuicStreamId id,
287 QuicSession* session,
288 StreamType type) {
289 if (type == CRYPTO) {
290 // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
291 // it is using crypto frames instead of stream frames. The QuicCryptoStream
292 // doesn't have any flow control in that case, so we don't create a
293 // QuicFlowController for it.
294 return absl::nullopt;
295 }
296 return QuicFlowController(
297 session, id,
298 /*is_connection_flow_controller*/ false,
299 GetReceivedFlowControlWindow(session, id),
300 GetInitialStreamFlowControlWindowToSend(session, id),
301 kStreamReceiveWindowLimit,
302 session->flow_controller()->auto_tune_receive_window(),
303 session->flow_controller());
304 }
305
306 } // namespace
307
QuicStream(QuicStreamId id,QuicSession * session,bool is_static,StreamType type)308 QuicStream::QuicStream(QuicStreamId id,
309 QuicSession* session,
310 bool is_static,
311 StreamType type)
312 : QuicStream(id,
313 session,
314 QuicStreamSequencer(this),
315 is_static,
316 type,
317 0,
318 false,
319 FlowController(id, session, type),
320 session->flow_controller()) {}
321
QuicStream(QuicStreamId id,QuicSession * session,QuicStreamSequencer sequencer,bool is_static,StreamType type,uint64_t stream_bytes_read,bool fin_received,absl::optional<QuicFlowController> flow_controller,QuicFlowController * connection_flow_controller)322 QuicStream::QuicStream(QuicStreamId id,
323 QuicSession* session,
324 QuicStreamSequencer sequencer,
325 bool is_static,
326 StreamType type,
327 uint64_t stream_bytes_read,
328 bool fin_received,
329 absl::optional<QuicFlowController> flow_controller,
330 QuicFlowController* connection_flow_controller)
331 : sequencer_(std::move(sequencer)),
332 id_(id),
333 session_(session),
334 stream_delegate_(session),
335 precedence_(CalculateDefaultPriority(session)),
336 stream_bytes_read_(stream_bytes_read),
337 stream_error_(QUIC_STREAM_NO_ERROR),
338 connection_error_(QUIC_NO_ERROR),
339 read_side_closed_(false),
340 write_side_closed_(false),
341 fin_buffered_(false),
342 fin_sent_(false),
343 fin_outstanding_(false),
344 fin_lost_(false),
345 fin_received_(fin_received),
346 rst_sent_(false),
347 rst_received_(false),
348 stop_sending_sent_(false),
349 flow_controller_(std::move(flow_controller)),
350 connection_flow_controller_(connection_flow_controller),
351 stream_contributes_to_connection_flow_control_(true),
352 busy_counter_(0),
353 add_random_padding_after_fin_(false),
354 send_buffer_(
355 session->connection()->helper()->GetStreamSendBufferAllocator()),
356 buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)),
357 is_static_(is_static),
358 deadline_(QuicTime::Zero()),
359 was_draining_(false),
360 type_(VersionHasIetfQuicFrames(session->transport_version()) &&
361 type != CRYPTO
362 ? QuicUtils::GetStreamType(id_,
363 session->perspective(),
364 session->IsIncomingStream(id_),
365 session->version())
366 : type),
367 creation_time_(session->connection()->clock()->ApproximateNow()),
368 perspective_(session->perspective()) {
369 if (type_ == WRITE_UNIDIRECTIONAL) {
370 fin_received_ = true;
371 CloseReadSide();
372 } else if (type_ == READ_UNIDIRECTIONAL) {
373 fin_sent_ = true;
374 CloseWriteSide();
375 }
376 if (type_ != CRYPTO) {
377 stream_delegate_->RegisterStreamPriority(id, is_static_, precedence_);
378 }
379 }
380
~QuicStream()381 QuicStream::~QuicStream() {
382 if (session_ != nullptr && IsWaitingForAcks()) {
383 QUIC_DVLOG(1)
384 << ENDPOINT << "Stream " << id_
385 << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
386 << send_buffer_.stream_bytes_outstanding()
387 << ", fin_outstanding: " << fin_outstanding_;
388 }
389 if (stream_delegate_ != nullptr && type_ != CRYPTO) {
390 stream_delegate_->UnregisterStreamPriority(id(), is_static_);
391 }
392 }
393
OnStreamFrame(const QuicStreamFrame & frame)394 void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
395 DCHECK_EQ(frame.stream_id, id_);
396
397 DCHECK(!(read_side_closed_ && write_side_closed_));
398
399 if (frame.fin && is_static_) {
400 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
401 "Attempt to close a static stream");
402 return;
403 }
404
405 if (type_ == WRITE_UNIDIRECTIONAL) {
406 OnUnrecoverableError(QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
407 "Data received on write unidirectional stream");
408 return;
409 }
410
411 bool is_stream_too_long =
412 (frame.offset > kMaxStreamLength) ||
413 (kMaxStreamLength - frame.offset < frame.data_length);
414 if (is_stream_too_long) {
415 // Close connection if stream becomes too long.
416 QUIC_PEER_BUG << "Receive stream frame on stream " << id_
417 << " reaches max stream length. frame offset " << frame.offset
418 << " length " << frame.data_length << ". "
419 << sequencer_.DebugString();
420 OnUnrecoverableError(
421 QUIC_STREAM_LENGTH_OVERFLOW,
422 quiche::QuicheStrCat("Peer sends more data than allowed on stream ",
423 id_, ". frame: offset = ", frame.offset,
424 ", length = ", frame.data_length, ". ",
425 sequencer_.DebugString()));
426 return;
427 }
428
429 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
430 OnUnrecoverableError(
431 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
432 quiche::QuicheStrCat(
433 "Stream ", id_,
434 " received data with offset: ", frame.offset + frame.data_length,
435 ", which is beyond close offset: ", sequencer_.close_offset()));
436 return;
437 }
438
439 if (frame.fin && !fin_received_) {
440 fin_received_ = true;
441 if (fin_sent_) {
442 DCHECK(!was_draining_);
443 session_->StreamDraining(id_,
444 /*unidirectional=*/type_ != BIDIRECTIONAL);
445 was_draining_ = true;
446 }
447 }
448
449 if (read_side_closed_) {
450 QUIC_DLOG(INFO)
451 << ENDPOINT << "Stream " << frame.stream_id
452 << " is closed for reading. Ignoring newly received stream data.";
453 // The subclass does not want to read data: blackhole the data.
454 return;
455 }
456
457 // This count includes duplicate data received.
458 QuicByteCount frame_payload_size = frame.data_length;
459 stream_bytes_read_ += frame_payload_size;
460
461 // Flow control is interested in tracking highest received offset.
462 // Only interested in received frames that carry data.
463 if (frame_payload_size > 0 &&
464 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
465 // As the highest received offset has changed, check to see if this is a
466 // violation of flow control.
467 QUIC_BUG_IF(!flow_controller_.has_value())
468 << ENDPOINT << "OnStreamFrame called on stream without flow control";
469 if ((flow_controller_.has_value() &&
470 flow_controller_->FlowControlViolation()) ||
471 connection_flow_controller_->FlowControlViolation()) {
472 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
473 "Flow control violation after increasing offset");
474 return;
475 }
476 }
477
478 sequencer_.OnStreamFrame(frame);
479 }
480
OnStopSending(QuicRstStreamErrorCode code)481 bool QuicStream::OnStopSending(QuicRstStreamErrorCode code) {
482 // Do not reset the stream if all data has been sent and acknowledged.
483 if (write_side_closed() && !IsWaitingForAcks()) {
484 QUIC_DVLOG(1) << ENDPOINT
485 << "Ignoring STOP_SENDING for a write closed stream, id: "
486 << id_;
487 return false;
488 }
489
490 if (is_static_) {
491 QUIC_DVLOG(1) << ENDPOINT
492 << "Received STOP_SENDING for a static stream, id: " << id_
493 << " Closing connection";
494 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
495 "Received STOP_SENDING for a static stream");
496 return false;
497 }
498
499 stream_error_ = code;
500
501 if (session()->split_up_send_rst()) {
502 QUIC_RELOADABLE_FLAG_COUNT_N(quic_split_up_send_rst_2, 1, 3);
503 MaybeSendRstStream(code);
504 } else {
505 session()->SendRstStream(id(), code, stream_bytes_written(),
506 /*send_rst_only = */ true);
507 rst_sent_ = true;
508 CloseWriteSide();
509 }
510 return true;
511 }
512
num_frames_received() const513 int QuicStream::num_frames_received() const {
514 return sequencer_.num_frames_received();
515 }
516
num_duplicate_frames_received() const517 int QuicStream::num_duplicate_frames_received() const {
518 return sequencer_.num_duplicate_frames_received();
519 }
520
OnStreamReset(const QuicRstStreamFrame & frame)521 void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
522 rst_received_ = true;
523 if (frame.byte_offset > kMaxStreamLength) {
524 // Peer are not suppose to write bytes more than maxium allowed.
525 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
526 "Reset frame stream offset overflow.");
527 return;
528 }
529
530 const QuicStreamOffset kMaxOffset =
531 std::numeric_limits<QuicStreamOffset>::max();
532 if (sequencer()->close_offset() != kMaxOffset &&
533 frame.byte_offset != sequencer()->close_offset()) {
534 OnUnrecoverableError(
535 QUIC_STREAM_MULTIPLE_OFFSET,
536 quiche::QuicheStrCat("Stream ", id_,
537 " received new final offset: ", frame.byte_offset,
538 ", which is different from close offset: ",
539 sequencer_.close_offset()));
540 return;
541 }
542
543 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
544 QUIC_BUG_IF(!flow_controller_.has_value())
545 << ENDPOINT << "OnStreamReset called on stream without flow control";
546 if ((flow_controller_.has_value() &&
547 flow_controller_->FlowControlViolation()) ||
548 connection_flow_controller_->FlowControlViolation()) {
549 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
550 "Flow control violation after increasing offset");
551 return;
552 }
553
554 stream_error_ = frame.error_code;
555 // Google QUIC closes both sides of the stream in response to a
556 // RESET_STREAM, IETF QUIC closes only the read side.
557 if (!VersionHasIetfQuicFrames(transport_version())) {
558 CloseWriteSide();
559 }
560 CloseReadSide();
561 }
562
OnConnectionClosed(QuicErrorCode error,ConnectionCloseSource)563 void QuicStream::OnConnectionClosed(QuicErrorCode error,
564 ConnectionCloseSource /*source*/) {
565 if (read_side_closed_ && write_side_closed_) {
566 return;
567 }
568 if (error != QUIC_NO_ERROR) {
569 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
570 connection_error_ = error;
571 }
572
573 CloseWriteSide();
574 CloseReadSide();
575 }
576
OnFinRead()577 void QuicStream::OnFinRead() {
578 DCHECK(sequencer_.IsClosed());
579 // OnFinRead can be called due to a FIN flag in a headers block, so there may
580 // have been no OnStreamFrame call with a FIN in the frame.
581 fin_received_ = true;
582 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
583 // stream will be destroyed by CloseReadSide, so don't need to call
584 // StreamDraining.
585 CloseReadSide();
586 }
587
SetFinSent()588 void QuicStream::SetFinSent() {
589 DCHECK(!VersionUsesHttp3(transport_version()));
590 fin_sent_ = true;
591 }
592
Reset(QuicRstStreamErrorCode error)593 void QuicStream::Reset(QuicRstStreamErrorCode error) {
594 stream_error_ = error;
595 if (session()->split_up_send_rst()) {
596 QUIC_RELOADABLE_FLAG_COUNT_N(quic_split_up_send_rst_2, 2, 3);
597 QuicConnection::ScopedPacketFlusher flusher(session()->connection());
598 MaybeSendStopSending(error);
599 MaybeSendRstStream(error);
600 } else {
601 session()->SendRstStream(id(), error, stream_bytes_written(),
602 /*send_rst_only = */ false);
603 rst_sent_ = true;
604 }
605 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
606 session()->MaybeCloseZombieStream(id_);
607 return;
608 }
609 if (!session()->split_up_send_rst()) {
610 CloseReadSide();
611 CloseWriteSide();
612 }
613 }
614
OnUnrecoverableError(QuicErrorCode error,const std::string & details)615 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
616 const std::string& details) {
617 stream_delegate_->OnStreamError(error, details);
618 }
619
precedence() const620 const spdy::SpdyStreamPrecedence& QuicStream::precedence() const {
621 return precedence_;
622 }
623
SetPriority(const spdy::SpdyStreamPrecedence & precedence)624 void QuicStream::SetPriority(const spdy::SpdyStreamPrecedence& precedence) {
625 precedence_ = precedence;
626
627 MaybeSendPriorityUpdateFrame();
628
629 stream_delegate_->UpdateStreamPriority(id(), precedence);
630 }
631
WriteOrBufferData(absl::string_view data,bool fin,QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener)632 void QuicStream::WriteOrBufferData(
633 absl::string_view data,
634 bool fin,
635 QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
636 if (session()->use_write_or_buffer_data_at_level()) {
637 QUIC_BUG_IF(QuicUtils::IsCryptoStreamId(transport_version(), id_))
638 << ENDPOINT
639 << "WriteOrBufferData is used to send application data, use "
640 "WriteOrBufferDataAtLevel to send crypto data.";
641 return WriteOrBufferDataAtLevel(
642 data, fin, session()->GetEncryptionLevelToSendApplicationData(),
643 ack_listener);
644 }
645 return WriteOrBufferDataInner(data, fin, absl::nullopt, ack_listener);
646 }
647
WriteOrBufferDataInner(absl::string_view data,bool fin,absl::optional<EncryptionLevel> level,QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener)648 void QuicStream::WriteOrBufferDataInner(
649 absl::string_view data,
650 bool fin,
651 absl::optional<EncryptionLevel> level,
652 QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
653 if (data.empty() && !fin) {
654 QUIC_BUG << "data.empty() && !fin";
655 return;
656 }
657
658 if (fin_buffered_) {
659 QUIC_BUG << "Fin already buffered";
660 return;
661 }
662 if (write_side_closed_) {
663 QUIC_DLOG(ERROR) << ENDPOINT
664 << "Attempt to write when the write side is closed";
665 if (type_ == READ_UNIDIRECTIONAL) {
666 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
667 "Try to send data on read unidirectional stream");
668 }
669 return;
670 }
671
672 fin_buffered_ = fin;
673
674 bool had_buffered_data = HasBufferedData();
675 // Do not respect buffered data upper limit as WriteOrBufferData guarantees
676 // all data to be consumed.
677 if (data.length() > 0) {
678 struct iovec iov(QuicUtils::MakeIovec(data));
679 QuicStreamOffset offset = send_buffer_.stream_offset();
680 if (kMaxStreamLength - offset < data.length()) {
681 QUIC_BUG << "Write too many data via stream " << id_;
682 OnUnrecoverableError(
683 QUIC_STREAM_LENGTH_OVERFLOW,
684 quiche::QuicheStrCat("Write too many data via stream ", id_));
685 return;
686 }
687 send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
688 OnDataBuffered(offset, data.length(), ack_listener);
689 }
690 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
691 // Write data if there is no buffered data before.
692 WriteBufferedData(level);
693 }
694 }
695
WriteOrBufferDataAtLevel(absl::string_view data,bool fin,EncryptionLevel level,QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener)696 void QuicStream::WriteOrBufferDataAtLevel(
697 absl::string_view data,
698 bool fin,
699 EncryptionLevel level,
700 QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
701 DCHECK(session()->use_write_or_buffer_data_at_level());
702 QUIC_RELOADABLE_FLAG_COUNT(quic_use_write_or_buffer_data_at_level);
703 return WriteOrBufferDataInner(data, fin, level, ack_listener);
704 }
705
OnCanWrite()706 void QuicStream::OnCanWrite() {
707 if (HasDeadlinePassed()) {
708 OnDeadlinePassed();
709 return;
710 }
711 if (HasPendingRetransmission()) {
712 WritePendingRetransmission();
713 // Exit early to allow other streams to write pending retransmissions if
714 // any.
715 return;
716 }
717
718 if (write_side_closed_) {
719 QUIC_DLOG(ERROR)
720 << ENDPOINT << "Stream " << id()
721 << " attempting to write new data when the write side is closed";
722 return;
723 }
724 if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
725 absl::optional<EncryptionLevel> send_level = absl::nullopt;
726 if (session()->use_write_or_buffer_data_at_level()) {
727 send_level = session()->GetEncryptionLevelToSendApplicationData();
728 }
729 WriteBufferedData(send_level);
730 }
731 if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
732 // Notify upper layer to write new data when buffered data size is below
733 // low water mark.
734 OnCanWriteNewData();
735 }
736 }
737
MaybeSendBlocked()738 void QuicStream::MaybeSendBlocked() {
739 if (!flow_controller_.has_value()) {
740 QUIC_BUG << ENDPOINT
741 << "MaybeSendBlocked called on stream without flow control";
742 return;
743 }
744 if (flow_controller_->ShouldSendBlocked()) {
745 session_->SendBlocked(id_);
746 }
747 if (!stream_contributes_to_connection_flow_control_) {
748 return;
749 }
750 if (connection_flow_controller_->ShouldSendBlocked()) {
751 session_->SendBlocked(QuicUtils::GetInvalidStreamId(transport_version()));
752 }
753 // If the stream is blocked by connection-level flow control but not by
754 // stream-level flow control, add the stream to the write blocked list so that
755 // the stream will be given a chance to write when a connection-level
756 // WINDOW_UPDATE arrives.
757 if (connection_flow_controller_->IsBlocked() &&
758 !flow_controller_->IsBlocked()) {
759 session_->MarkConnectionLevelWriteBlocked(id());
760 }
761 }
762
WriteMemSlices(QuicMemSliceSpan span,bool fin)763 QuicConsumedData QuicStream::WriteMemSlices(QuicMemSliceSpan span, bool fin) {
764 QuicConsumedData consumed_data(0, false);
765 if (span.empty() && !fin) {
766 QUIC_BUG << "span.empty() && !fin";
767 return consumed_data;
768 }
769
770 if (fin_buffered_) {
771 QUIC_BUG << "Fin already buffered";
772 return consumed_data;
773 }
774
775 if (write_side_closed_) {
776 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
777 << " attempting to write when the write side is closed";
778 if (type_ == READ_UNIDIRECTIONAL) {
779 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
780 "Try to send data on read unidirectional stream");
781 }
782 return consumed_data;
783 }
784
785 bool had_buffered_data = HasBufferedData();
786 if (CanWriteNewData() || span.empty()) {
787 consumed_data.fin_consumed = fin;
788 if (!span.empty()) {
789 // Buffer all data if buffered data size is below limit.
790 QuicStreamOffset offset = send_buffer_.stream_offset();
791 consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
792 if (offset > send_buffer_.stream_offset() ||
793 kMaxStreamLength < send_buffer_.stream_offset()) {
794 QUIC_BUG << "Write too many data via stream " << id_;
795 OnUnrecoverableError(
796 QUIC_STREAM_LENGTH_OVERFLOW,
797 quiche::QuicheStrCat("Write too many data via stream ", id_));
798 return consumed_data;
799 }
800 OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
801 }
802 }
803 fin_buffered_ = consumed_data.fin_consumed;
804
805 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
806 // Write data if there is no buffered data before.
807 absl::optional<EncryptionLevel> send_level = absl::nullopt;
808 if (session()->use_write_or_buffer_data_at_level()) {
809 send_level = session()->GetEncryptionLevelToSendApplicationData();
810 }
811 WriteBufferedData(send_level);
812 }
813
814 return consumed_data;
815 }
816
HasPendingRetransmission() const817 bool QuicStream::HasPendingRetransmission() const {
818 return send_buffer_.HasPendingRetransmission() || fin_lost_;
819 }
820
IsStreamFrameOutstanding(QuicStreamOffset offset,QuicByteCount data_length,bool fin) const821 bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
822 QuicByteCount data_length,
823 bool fin) const {
824 return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
825 (fin && fin_outstanding_);
826 }
827
CloseReadSide()828 void QuicStream::CloseReadSide() {
829 if (read_side_closed_) {
830 return;
831 }
832 QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
833
834 read_side_closed_ = true;
835 sequencer_.ReleaseBuffer();
836
837 if (write_side_closed_) {
838 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
839 session_->OnStreamClosed(id());
840 OnClose();
841 }
842 }
843
CloseWriteSide()844 void QuicStream::CloseWriteSide() {
845 if (write_side_closed_) {
846 return;
847 }
848 QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
849
850 write_side_closed_ = true;
851 if (read_side_closed_) {
852 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
853 session_->OnStreamClosed(id());
854 OnClose();
855 }
856 }
857
MaybeSendStopSending(QuicRstStreamErrorCode error)858 void QuicStream::MaybeSendStopSending(QuicRstStreamErrorCode error) {
859 DCHECK(session()->split_up_send_rst());
860 if (stop_sending_sent_) {
861 return;
862 }
863
864 if (!session()->version().UsesHttp3() && error != QUIC_STREAM_NO_ERROR) {
865 // In gQUIC, RST with error closes both read and write side.
866 return;
867 }
868
869 if (session()->version().UsesHttp3()) {
870 session()->MaybeSendStopSendingFrame(id(), error);
871 } else {
872 DCHECK_EQ(QUIC_STREAM_NO_ERROR, error);
873 session()->MaybeSendRstStreamFrame(id(), QUIC_STREAM_NO_ERROR,
874 stream_bytes_written());
875 }
876 stop_sending_sent_ = true;
877 CloseReadSide();
878 }
879
MaybeSendRstStream(QuicRstStreamErrorCode error)880 void QuicStream::MaybeSendRstStream(QuicRstStreamErrorCode error) {
881 DCHECK(session()->split_up_send_rst());
882 if (rst_sent_) {
883 return;
884 }
885
886 if (!session()->version().UsesHttp3()) {
887 QUIC_BUG_IF(error == QUIC_STREAM_NO_ERROR);
888 stop_sending_sent_ = true;
889 CloseReadSide();
890 }
891 session()->MaybeSendRstStreamFrame(id(), error, stream_bytes_written());
892 rst_sent_ = true;
893 CloseWriteSide();
894 }
895
HasBufferedData() const896 bool QuicStream::HasBufferedData() const {
897 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
898 return send_buffer_.stream_offset() > stream_bytes_written();
899 }
900
transport_version() const901 QuicTransportVersion QuicStream::transport_version() const {
902 return session_->transport_version();
903 }
904
handshake_protocol() const905 HandshakeProtocol QuicStream::handshake_protocol() const {
906 return session_->connection()->version().handshake_protocol;
907 }
908
StopReading()909 void QuicStream::StopReading() {
910 QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
911 sequencer_.StopReading();
912 }
913
OnClose()914 void QuicStream::OnClose() {
915 DCHECK(read_side_closed_ && write_side_closed_);
916
917 if (!fin_sent_ && !rst_sent_) {
918 if (!session()->split_up_send_rst()) {
919 // For flow control accounting, tell the peer how many bytes have been
920 // written on this stream before termination. Done here if needed, using a
921 // RST_STREAM frame.
922 QUIC_DLOG(INFO) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
923 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
924 stream_bytes_written(),
925 /*send_rst_only = */ false);
926 session_->MaybeCloseZombieStream(id_);
927 rst_sent_ = true;
928 } else {
929 QUIC_RELOADABLE_FLAG_COUNT_N(quic_split_up_send_rst_2, 3, 3);
930 QUIC_BUG_IF(session()->connection()->connected() &&
931 session()->version().UsesHttp3())
932 << "The stream should've already sent RST in response to "
933 "STOP_SENDING";
934 MaybeSendRstStream(QUIC_RST_ACKNOWLEDGEMENT);
935 session_->MaybeCloseZombieStream(id_);
936 }
937 }
938
939 if (!flow_controller_.has_value() ||
940 flow_controller_->FlowControlViolation() ||
941 connection_flow_controller_->FlowControlViolation()) {
942 return;
943 }
944 // The stream is being closed and will not process any further incoming bytes.
945 // As there may be more bytes in flight, to ensure that both endpoints have
946 // the same connection level flow control state, mark all unreceived or
947 // buffered bytes as consumed.
948 QuicByteCount bytes_to_consume =
949 flow_controller_->highest_received_byte_offset() -
950 flow_controller_->bytes_consumed();
951 AddBytesConsumed(bytes_to_consume);
952 }
953
OnWindowUpdateFrame(const QuicWindowUpdateFrame & frame)954 void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
955 if (type_ == READ_UNIDIRECTIONAL) {
956 OnUnrecoverableError(
957 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
958 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
959 return;
960 }
961
962 if (!flow_controller_.has_value()) {
963 QUIC_BUG << ENDPOINT
964 << "OnWindowUpdateFrame called on stream without flow control";
965 return;
966 }
967
968 if (flow_controller_->UpdateSendWindowOffset(frame.max_data)) {
969 // Let session unblock this stream.
970 session_->MarkConnectionLevelWriteBlocked(id_);
971 }
972 }
973
MaybeIncreaseHighestReceivedOffset(QuicStreamOffset new_offset)974 bool QuicStream::MaybeIncreaseHighestReceivedOffset(
975 QuicStreamOffset new_offset) {
976 if (!flow_controller_.has_value()) {
977 QUIC_BUG << ENDPOINT
978 << "MaybeIncreaseHighestReceivedOffset called on stream without "
979 "flow control";
980 return false;
981 }
982 uint64_t increment =
983 new_offset - flow_controller_->highest_received_byte_offset();
984 if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
985 return false;
986 }
987
988 // If |new_offset| increased the stream flow controller's highest received
989 // offset, increase the connection flow controller's value by the incremental
990 // difference.
991 if (stream_contributes_to_connection_flow_control_) {
992 connection_flow_controller_->UpdateHighestReceivedOffset(
993 connection_flow_controller_->highest_received_byte_offset() +
994 increment);
995 }
996 return true;
997 }
998
AddBytesSent(QuicByteCount bytes)999 void QuicStream::AddBytesSent(QuicByteCount bytes) {
1000 if (!flow_controller_.has_value()) {
1001 QUIC_BUG << ENDPOINT
1002 << "AddBytesSent called on stream without flow control";
1003 return;
1004 }
1005 flow_controller_->AddBytesSent(bytes);
1006 if (stream_contributes_to_connection_flow_control_) {
1007 connection_flow_controller_->AddBytesSent(bytes);
1008 }
1009 }
1010
AddBytesConsumed(QuicByteCount bytes)1011 void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
1012 if (type_ == CRYPTO) {
1013 // A stream with type CRYPTO has no flow control, so there's nothing this
1014 // function needs to do. This function still gets called by the
1015 // QuicStreamSequencers used by QuicCryptoStream.
1016 return;
1017 }
1018 if (!flow_controller_.has_value()) {
1019 QUIC_BUG
1020 << ENDPOINT
1021 << "AddBytesConsumed called on non-crypto stream without flow control";
1022 return;
1023 }
1024 // Only adjust stream level flow controller if still reading.
1025 if (!read_side_closed_) {
1026 flow_controller_->AddBytesConsumed(bytes);
1027 }
1028
1029 if (stream_contributes_to_connection_flow_control_) {
1030 connection_flow_controller_->AddBytesConsumed(bytes);
1031 }
1032 }
1033
MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,bool was_zero_rtt_rejected)1034 bool QuicStream::MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,
1035 bool was_zero_rtt_rejected) {
1036 if (!flow_controller_.has_value()) {
1037 QUIC_BUG << ENDPOINT
1038 << "ConfigSendWindowOffset called on stream without flow control";
1039 return false;
1040 }
1041
1042 // The validation code below is for QUIC with TLS only.
1043 if (new_offset < flow_controller_->send_window_offset()) {
1044 DCHECK(session()->version().UsesTls());
1045 if (was_zero_rtt_rejected && new_offset < flow_controller_->bytes_sent()) {
1046 // The client is given flow control window lower than what's written in
1047 // 0-RTT. This QUIC implementation is unable to retransmit them.
1048 QUIC_BUG_IF(perspective_ == Perspective::IS_SERVER)
1049 << "Server streams' flow control should never be configured twice.";
1050 OnUnrecoverableError(
1051 QUIC_ZERO_RTT_UNRETRANSMITTABLE,
1052 quiche::QuicheStrCat(
1053 "Server rejected 0-RTT, aborting because new stream max data ",
1054 new_offset, " for stream ", id_, " is less than currently used: ",
1055 flow_controller_->bytes_sent()));
1056 return false;
1057 } else if (session()->version().AllowsLowFlowControlLimits()) {
1058 // In IETF QUIC, if the client receives flow control limit lower than what
1059 // was resumed from 0-RTT, depending on 0-RTT status, it's either the
1060 // peer's fault or our implementation's fault.
1061 QUIC_BUG_IF(perspective_ == Perspective::IS_SERVER)
1062 << "Server streams' flow control should never be configured twice.";
1063 OnUnrecoverableError(
1064 was_zero_rtt_rejected ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
1065 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
1066 quiche::QuicheStrCat(
1067 was_zero_rtt_rejected ? "Server rejected 0-RTT, aborting because "
1068 : "",
1069 "new stream max data ", new_offset, " decreases current limit: ",
1070 flow_controller_->send_window_offset()));
1071 return false;
1072 }
1073 }
1074
1075 if (flow_controller_->UpdateSendWindowOffset(new_offset)) {
1076 // Let session unblock this stream.
1077 session_->MarkConnectionLevelWriteBlocked(id_);
1078 }
1079 return true;
1080 }
1081
AddRandomPaddingAfterFin()1082 void QuicStream::AddRandomPaddingAfterFin() {
1083 add_random_padding_after_fin_ = true;
1084 }
1085
OnStreamFrameAcked(QuicStreamOffset offset,QuicByteCount data_length,bool fin_acked,QuicTime::Delta,QuicTime,QuicByteCount * newly_acked_length)1086 bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
1087 QuicByteCount data_length,
1088 bool fin_acked,
1089 QuicTime::Delta /*ack_delay_time*/,
1090 QuicTime /*receive_timestamp*/,
1091 QuicByteCount* newly_acked_length) {
1092 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
1093 << "[" << offset << ", " << offset + data_length << "]"
1094 << " fin = " << fin_acked;
1095 *newly_acked_length = 0;
1096 if (!send_buffer_.OnStreamDataAcked(offset, data_length,
1097 newly_acked_length)) {
1098 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
1099 return false;
1100 }
1101 if (!fin_sent_ && fin_acked) {
1102 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent fin.");
1103 return false;
1104 }
1105 // Indicates whether ack listener's OnPacketAcked should be called.
1106 const bool new_data_acked =
1107 *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
1108 if (fin_acked) {
1109 fin_outstanding_ = false;
1110 fin_lost_ = false;
1111 }
1112 if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
1113 session_->MaybeCloseZombieStream(id_);
1114 }
1115 return new_data_acked;
1116 }
1117
OnStreamFrameRetransmitted(QuicStreamOffset offset,QuicByteCount data_length,bool fin_retransmitted)1118 void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
1119 QuicByteCount data_length,
1120 bool fin_retransmitted) {
1121 send_buffer_.OnStreamDataRetransmitted(offset, data_length);
1122 if (fin_retransmitted) {
1123 fin_lost_ = false;
1124 }
1125 }
1126
OnStreamFrameLost(QuicStreamOffset offset,QuicByteCount data_length,bool fin_lost)1127 void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
1128 QuicByteCount data_length,
1129 bool fin_lost) {
1130 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
1131 << "[" << offset << ", " << offset + data_length << "]"
1132 << " fin = " << fin_lost;
1133 if (data_length > 0) {
1134 send_buffer_.OnStreamDataLost(offset, data_length);
1135 }
1136 if (fin_lost && fin_outstanding_) {
1137 fin_lost_ = true;
1138 }
1139 }
1140
RetransmitStreamData(QuicStreamOffset offset,QuicByteCount data_length,bool fin,TransmissionType type)1141 bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
1142 QuicByteCount data_length,
1143 bool fin,
1144 TransmissionType type) {
1145 DCHECK(type == PTO_RETRANSMISSION || type == RTO_RETRANSMISSION ||
1146 type == TLP_RETRANSMISSION || type == PROBING_RETRANSMISSION);
1147 if (HasDeadlinePassed()) {
1148 OnDeadlinePassed();
1149 return true;
1150 }
1151 QuicIntervalSet<QuicStreamOffset> retransmission(offset,
1152 offset + data_length);
1153 retransmission.Difference(bytes_acked());
1154 bool retransmit_fin = fin && fin_outstanding_;
1155 if (retransmission.Empty() && !retransmit_fin) {
1156 return true;
1157 }
1158 absl::optional<EncryptionLevel> send_level = absl::nullopt;
1159 if (session()->use_write_or_buffer_data_at_level()) {
1160 send_level = session()->GetEncryptionLevelToSendApplicationData();
1161 }
1162 QuicConsumedData consumed(0, false);
1163 for (const auto& interval : retransmission) {
1164 QuicStreamOffset retransmission_offset = interval.min();
1165 QuicByteCount retransmission_length = interval.max() - interval.min();
1166 const bool can_bundle_fin =
1167 retransmit_fin && (retransmission_offset + retransmission_length ==
1168 stream_bytes_written());
1169 consumed = stream_delegate_->WritevData(
1170 id_, retransmission_length, retransmission_offset,
1171 can_bundle_fin ? FIN : NO_FIN, type, send_level);
1172 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1173 << " is forced to retransmit stream data ["
1174 << retransmission_offset << ", "
1175 << retransmission_offset + retransmission_length
1176 << ") and fin: " << can_bundle_fin
1177 << ", consumed: " << consumed;
1178 OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
1179 consumed.fin_consumed);
1180 if (can_bundle_fin) {
1181 retransmit_fin = !consumed.fin_consumed;
1182 }
1183 if (consumed.bytes_consumed < retransmission_length ||
1184 (can_bundle_fin && !consumed.fin_consumed)) {
1185 // Connection is write blocked.
1186 return false;
1187 }
1188 }
1189 if (retransmit_fin) {
1190 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1191 << " retransmits fin only frame.";
1192 consumed = stream_delegate_->WritevData(id_, 0, stream_bytes_written(), FIN,
1193 type, send_level);
1194 if (!consumed.fin_consumed) {
1195 return false;
1196 }
1197 }
1198 return true;
1199 }
1200
IsWaitingForAcks() const1201 bool QuicStream::IsWaitingForAcks() const {
1202 return (!rst_sent_ || stream_error_ == QUIC_STREAM_NO_ERROR) &&
1203 (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
1204 }
1205
ReadableBytes() const1206 QuicByteCount QuicStream::ReadableBytes() const {
1207 return sequencer_.ReadableBytes();
1208 }
1209
WriteStreamData(QuicStreamOffset offset,QuicByteCount data_length,QuicDataWriter * writer)1210 bool QuicStream::WriteStreamData(QuicStreamOffset offset,
1211 QuicByteCount data_length,
1212 QuicDataWriter* writer) {
1213 DCHECK_LT(0u, data_length);
1214 QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
1215 << offset << " length " << data_length;
1216 return send_buffer_.WriteStreamData(offset, data_length, writer);
1217 }
1218
WriteBufferedData(absl::optional<EncryptionLevel> level)1219 void QuicStream::WriteBufferedData(absl::optional<EncryptionLevel> level) {
1220 DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
1221
1222 if (session_->ShouldYield(id())) {
1223 session_->MarkConnectionLevelWriteBlocked(id());
1224 return;
1225 }
1226
1227 // Size of buffered data.
1228 QuicByteCount write_length = BufferedDataBytes();
1229
1230 // A FIN with zero data payload should not be flow control blocked.
1231 bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
1232
1233 bool fin = fin_buffered_;
1234
1235 // How much data flow control permits to be written.
1236 QuicByteCount send_window;
1237 if (flow_controller_.has_value()) {
1238 send_window = flow_controller_->SendWindowSize();
1239 } else {
1240 send_window = std::numeric_limits<QuicByteCount>::max();
1241 QUIC_BUG << ENDPOINT
1242 << "WriteBufferedData called on stream without flow control";
1243 }
1244 if (stream_contributes_to_connection_flow_control_) {
1245 send_window =
1246 std::min(send_window, connection_flow_controller_->SendWindowSize());
1247 }
1248
1249 if (send_window == 0 && !fin_with_zero_data) {
1250 // Quick return if nothing can be sent.
1251 MaybeSendBlocked();
1252 return;
1253 }
1254
1255 if (write_length > send_window) {
1256 // Don't send the FIN unless all the data will be sent.
1257 fin = false;
1258
1259 // Writing more data would be a violation of flow control.
1260 write_length = send_window;
1261 QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
1262 << write_length << " due to flow control";
1263 }
1264
1265 StreamSendingState state = fin ? FIN : NO_FIN;
1266 if (fin && add_random_padding_after_fin_) {
1267 state = FIN_AND_PADDING;
1268 }
1269 QuicConsumedData consumed_data =
1270 stream_delegate_->WritevData(id(), write_length, stream_bytes_written(),
1271 state, NOT_RETRANSMISSION, level);
1272
1273 OnStreamDataConsumed(consumed_data.bytes_consumed);
1274
1275 AddBytesSent(consumed_data.bytes_consumed);
1276 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
1277 << stream_bytes_written() << " bytes "
1278 << " and has buffered data " << BufferedDataBytes() << " bytes."
1279 << " fin is sent: " << consumed_data.fin_consumed
1280 << " fin is buffered: " << fin_buffered_;
1281
1282 // The write may have generated a write error causing this stream to be
1283 // closed. If so, simply return without marking the stream write blocked.
1284 if (write_side_closed_) {
1285 return;
1286 }
1287
1288 if (consumed_data.bytes_consumed == write_length) {
1289 if (!fin_with_zero_data) {
1290 MaybeSendBlocked();
1291 }
1292 if (fin && consumed_data.fin_consumed) {
1293 DCHECK(!fin_sent_);
1294 fin_sent_ = true;
1295 fin_outstanding_ = true;
1296 if (fin_received_) {
1297 DCHECK(!was_draining_);
1298 session_->StreamDraining(id_,
1299 /*unidirectional=*/type_ != BIDIRECTIONAL);
1300 was_draining_ = true;
1301 }
1302 CloseWriteSide();
1303 } else if (fin && !consumed_data.fin_consumed) {
1304 session_->MarkConnectionLevelWriteBlocked(id());
1305 }
1306 } else {
1307 session_->MarkConnectionLevelWriteBlocked(id());
1308 }
1309 if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
1310 busy_counter_ = 0;
1311 }
1312 }
1313
BufferedDataBytes() const1314 uint64_t QuicStream::BufferedDataBytes() const {
1315 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1316 return send_buffer_.stream_offset() - stream_bytes_written();
1317 }
1318
CanWriteNewData() const1319 bool QuicStream::CanWriteNewData() const {
1320 return BufferedDataBytes() < buffered_data_threshold_;
1321 }
1322
CanWriteNewDataAfterData(QuicByteCount length) const1323 bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1324 return (BufferedDataBytes() + length) < buffered_data_threshold_;
1325 }
1326
stream_bytes_written() const1327 uint64_t QuicStream::stream_bytes_written() const {
1328 return send_buffer_.stream_bytes_written();
1329 }
1330
bytes_acked() const1331 const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1332 return send_buffer_.bytes_acked();
1333 }
1334
OnStreamDataConsumed(QuicByteCount bytes_consumed)1335 void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) {
1336 send_buffer_.OnStreamDataConsumed(bytes_consumed);
1337 }
1338
WritePendingRetransmission()1339 void QuicStream::WritePendingRetransmission() {
1340 while (HasPendingRetransmission()) {
1341 QuicConsumedData consumed(0, false);
1342 absl::optional<EncryptionLevel> send_level = absl::nullopt;
1343 if (session()->use_write_or_buffer_data_at_level()) {
1344 send_level = session()->GetEncryptionLevelToSendApplicationData();
1345 }
1346 if (!send_buffer_.HasPendingRetransmission()) {
1347 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1348 << " retransmits fin only frame.";
1349 consumed = stream_delegate_->WritevData(
1350 id_, 0, stream_bytes_written(), FIN, LOSS_RETRANSMISSION, send_level);
1351 fin_lost_ = !consumed.fin_consumed;
1352 if (fin_lost_) {
1353 // Connection is write blocked.
1354 return;
1355 }
1356 } else {
1357 StreamPendingRetransmission pending =
1358 send_buffer_.NextPendingRetransmission();
1359 // Determine whether the lost fin can be bundled with the data.
1360 const bool can_bundle_fin =
1361 fin_lost_ &&
1362 (pending.offset + pending.length == stream_bytes_written());
1363 consumed = stream_delegate_->WritevData(
1364 id_, pending.length, pending.offset, can_bundle_fin ? FIN : NO_FIN,
1365 LOSS_RETRANSMISSION, send_level);
1366 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1367 << " tries to retransmit stream data [" << pending.offset
1368 << ", " << pending.offset + pending.length
1369 << ") and fin: " << can_bundle_fin
1370 << ", consumed: " << consumed;
1371 OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
1372 consumed.fin_consumed);
1373 if (consumed.bytes_consumed < pending.length ||
1374 (can_bundle_fin && !consumed.fin_consumed)) {
1375 // Connection is write blocked.
1376 return;
1377 }
1378 }
1379 }
1380 }
1381
MaybeSetTtl(QuicTime::Delta ttl)1382 bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1383 if (is_static_) {
1384 QUIC_BUG << "Cannot set TTL of a static stream.";
1385 return false;
1386 }
1387 if (deadline_.IsInitialized()) {
1388 QUIC_DLOG(WARNING) << "Deadline has already been set.";
1389 return false;
1390 }
1391 QuicTime now = session()->connection()->clock()->ApproximateNow();
1392 deadline_ = now + ttl;
1393 return true;
1394 }
1395
HasDeadlinePassed() const1396 bool QuicStream::HasDeadlinePassed() const {
1397 if (!deadline_.IsInitialized()) {
1398 // No deadline has been set.
1399 return false;
1400 }
1401 QuicTime now = session()->connection()->clock()->ApproximateNow();
1402 if (now < deadline_) {
1403 return false;
1404 }
1405 // TTL expired.
1406 QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1407 return true;
1408 }
1409
OnDeadlinePassed()1410 void QuicStream::OnDeadlinePassed() {
1411 Reset(QUIC_STREAM_TTL_EXPIRED);
1412 }
1413
IsFlowControlBlocked() const1414 bool QuicStream::IsFlowControlBlocked() const {
1415 if (!flow_controller_.has_value()) {
1416 QUIC_BUG << "Trying to access non-existent flow controller.";
1417 return false;
1418 }
1419 return flow_controller_->IsBlocked();
1420 }
1421
highest_received_byte_offset() const1422 QuicStreamOffset QuicStream::highest_received_byte_offset() const {
1423 if (!flow_controller_.has_value()) {
1424 QUIC_BUG << "Trying to access non-existent flow controller.";
1425 return 0;
1426 }
1427 return flow_controller_->highest_received_byte_offset();
1428 }
1429
UpdateReceiveWindowSize(QuicStreamOffset size)1430 void QuicStream::UpdateReceiveWindowSize(QuicStreamOffset size) {
1431 if (!flow_controller_.has_value()) {
1432 QUIC_BUG << "Trying to access non-existent flow controller.";
1433 return;
1434 }
1435 flow_controller_->UpdateReceiveWindowSize(size);
1436 }
1437
1438 // static
CalculateDefaultPriority(const QuicSession * session)1439 spdy::SpdyStreamPrecedence QuicStream::CalculateDefaultPriority(
1440 const QuicSession* session) {
1441 if (VersionUsesHttp3(session->transport_version())) {
1442 return spdy::SpdyStreamPrecedence(kDefaultUrgency);
1443 }
1444
1445 if (session->use_http2_priority_write_scheduler()) {
1446 return spdy::SpdyStreamPrecedence(0, spdy::kHttp2DefaultStreamWeight,
1447 false);
1448 }
1449
1450 return spdy::SpdyStreamPrecedence(QuicStream::kDefaultPriority);
1451 }
1452
1453 } // namespace quic
1454