1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <limits.h>
22
23 #include <grpc/support/log.h>
24
25 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
26 #include "src/core/ext/transport/chttp2/transport/context_list.h"
27 #include "src/core/ext/transport/chttp2/transport/internal.h"
28 #include "src/core/lib/compression/stream_compression.h"
29 #include "src/core/lib/debug/stats.h"
30 #include "src/core/lib/profiling/timers.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/transport/http2_errors.h"
33
add_to_write_list(grpc_chttp2_write_cb ** list,grpc_chttp2_write_cb * cb)34 static void add_to_write_list(grpc_chttp2_write_cb** list,
35 grpc_chttp2_write_cb* cb) {
36 cb->next = *list;
37 *list = cb;
38 }
39
finish_write_cb(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb * cb,grpc_error_handle error)40 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
41 grpc_chttp2_write_cb* cb, grpc_error_handle error) {
42 grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
43 "finish_write_cb");
44 cb->next = t->write_cb_pool;
45 t->write_cb_pool = cb;
46 }
47
maybe_initiate_ping(grpc_chttp2_transport * t)48 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
49 grpc_chttp2_ping_queue* pq = &t->ping_queue;
50 if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
51 /* no ping needed: wait */
52 return;
53 }
54 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
55 /* ping already in-flight: wait */
56 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
57 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
58 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
59 gpr_log(GPR_INFO, "%s: Ping delayed [%s]: already pinging",
60 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str());
61 }
62 return;
63 }
64 if (t->ping_state.pings_before_data_required == 0 &&
65 t->ping_policy.max_pings_without_data != 0) {
66 /* need to receive something of substance before sending a ping again */
67 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
68 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
69 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
70 gpr_log(GPR_INFO, "%s: Ping delayed [%s]: too many recent pings: %d/%d",
71 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
72 t->ping_state.pings_before_data_required,
73 t->ping_policy.max_pings_without_data);
74 }
75 return;
76 }
77 // InvalidateNow to avoid getting stuck re-initializing the ping timer
78 // in a loop while draining the currently-held combiner. Also see
79 // https://github.com/grpc/grpc/issues/26079.
80 grpc_core::ExecCtx::Get()->InvalidateNow();
81 grpc_millis now = grpc_core::ExecCtx::Get()->Now();
82
83 grpc_millis next_allowed_ping_interval =
84 (t->keepalive_permit_without_calls == 0 &&
85 grpc_chttp2_stream_map_size(&t->stream_map) == 0)
86 ? 7200 * GPR_MS_PER_SEC
87 : (GPR_MS_PER_SEC); /* A second is added to deal with network delays
88 and timing imprecision */
89 grpc_millis next_allowed_ping =
90 t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
91
92 if (next_allowed_ping > now) {
93 /* not enough elapsed time between successive pings */
94 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
95 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
96 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
97 gpr_log(GPR_INFO,
98 "%s: Ping delayed [%s]: not enough time elapsed since last ping. "
99 " Last ping %f: Next ping %f: Now %f",
100 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
101 static_cast<double>(t->ping_state.last_ping_sent_time),
102 static_cast<double>(next_allowed_ping), static_cast<double>(now));
103 }
104 if (!t->ping_state.is_delayed_ping_timer_set) {
105 t->ping_state.is_delayed_ping_timer_set = true;
106 GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
107 GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
108 grpc_chttp2_retry_initiate_ping, t,
109 grpc_schedule_on_exec_ctx);
110 grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
111 &t->retry_initiate_ping_locked);
112 }
113 return;
114 }
115
116 pq->inflight_id = t->ping_ctr;
117 t->ping_ctr++;
118 grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
119 &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
120 grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
121 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
122 grpc_slice_buffer_add(&t->outbuf,
123 grpc_chttp2_ping_create(false, pq->inflight_id));
124 GRPC_STATS_INC_HTTP2_PINGS_SENT();
125 t->ping_state.last_ping_sent_time = now;
126 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
127 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
128 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
129 gpr_log(GPR_INFO, "%s: Ping sent [%s]: %d/%d",
130 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
131 t->ping_state.pings_before_data_required,
132 t->ping_policy.max_pings_without_data);
133 }
134 t->ping_state.pings_before_data_required -=
135 (t->ping_state.pings_before_data_required != 0);
136 }
137
update_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int64_t send_bytes,grpc_chttp2_write_cb ** list,int64_t * ctr,grpc_error_handle error)138 static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
139 int64_t send_bytes, grpc_chttp2_write_cb** list,
140 int64_t* ctr, grpc_error_handle error) {
141 bool sched_any = false;
142 grpc_chttp2_write_cb* cb = *list;
143 *list = nullptr;
144 *ctr += send_bytes;
145 while (cb) {
146 grpc_chttp2_write_cb* next = cb->next;
147 if (cb->call_at_byte <= *ctr) {
148 sched_any = true;
149 finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
150 } else {
151 add_to_write_list(list, cb);
152 }
153 cb = next;
154 }
155 GRPC_ERROR_UNREF(error);
156 return sched_any;
157 }
158
report_stall(grpc_chttp2_transport * t,grpc_chttp2_stream * s,const char * staller)159 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
160 const char* staller) {
161 if (GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace)) {
162 gpr_log(
163 GPR_DEBUG,
164 "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
165 "to happen in a healthy program that is not seeing flow control stalls."
166 " However, if you know that there are unwanted stalls, here is some "
167 "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
168 ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
169 ":s_win=%d:s_delta=%" PRId64 "]",
170 t->peer_string.c_str(), t, s->id, staller,
171 s->flow_controlled_buffer.length,
172 s->stream_compression_method ==
173 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
174 ? 0
175 : s->compressed_data_buffer.length,
176 s->flow_controlled_bytes_flowed,
177 t->settings[GRPC_ACKED_SETTINGS]
178 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
179 t->flow_control->remote_window(),
180 static_cast<uint32_t>(std::max(
181 int64_t(0),
182 s->flow_control->remote_window_delta() +
183 static_cast<int64_t>(
184 t->settings[GRPC_PEER_SETTINGS]
185 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]))),
186 s->flow_control->remote_window_delta());
187 }
188 }
189
190 /* How many bytes would we like to put on the wire during a single syscall */
target_write_size(grpc_chttp2_transport *)191 static uint32_t target_write_size(grpc_chttp2_transport* /*t*/) {
192 return 1024 * 1024;
193 }
194
195 // Returns true if initial_metadata contains only default headers.
is_default_initial_metadata(grpc_metadata_batch * initial_metadata)196 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
197 return initial_metadata->default_count() ==
198 initial_metadata->non_deadline_count();
199 }
200
201 namespace {
202 class StreamWriteContext;
203
204 class WriteContext {
205 public:
WriteContext(grpc_chttp2_transport * t)206 explicit WriteContext(grpc_chttp2_transport* t) : t_(t) {
207 GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
208 GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0);
209 }
210
211 // TODO(ctiller): make this the destructor
FlushStats()212 void FlushStats() {
213 GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
214 initial_metadata_writes_);
215 GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
216 GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
217 trailing_metadata_writes_);
218 GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
219 }
220
FlushSettings()221 void FlushSettings() {
222 if (t_->dirtied_local_settings && !t_->sent_local_settings) {
223 grpc_slice_buffer_add(
224 &t_->outbuf, grpc_chttp2_settings_create(
225 t_->settings[GRPC_SENT_SETTINGS],
226 t_->settings[GRPC_LOCAL_SETTINGS],
227 t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
228 t_->force_send_settings = false;
229 t_->dirtied_local_settings = false;
230 t_->sent_local_settings = true;
231 GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
232 }
233 }
234
FlushQueuedBuffers()235 void FlushQueuedBuffers() {
236 /* simple writes are queued to qbuf, and flushed here */
237 grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
238 t_->num_pending_induced_frames = 0;
239 GPR_ASSERT(t_->qbuf.count == 0);
240 }
241
FlushWindowUpdates()242 void FlushWindowUpdates() {
243 uint32_t transport_announce =
244 t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
245 if (transport_announce) {
246 grpc_transport_one_way_stats throwaway_stats;
247 grpc_slice_buffer_add(
248 &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
249 &throwaway_stats));
250 grpc_chttp2_reset_ping_clock(t_);
251 }
252 }
253
FlushPingAcks()254 void FlushPingAcks() {
255 for (size_t i = 0; i < t_->ping_ack_count; i++) {
256 grpc_slice_buffer_add(&t_->outbuf,
257 grpc_chttp2_ping_create(true, t_->ping_acks[i]));
258 }
259 t_->ping_ack_count = 0;
260 }
261
EnactHpackSettings()262 void EnactHpackSettings() {
263 t_->hpack_compressor.SetMaxTableSize(
264 t_->settings[GRPC_PEER_SETTINGS]
265 [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
266 }
267
UpdateStreamsNoLongerStalled()268 void UpdateStreamsNoLongerStalled() {
269 grpc_chttp2_stream* s;
270 while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
271 if (t_->closed_with_error == GRPC_ERROR_NONE &&
272 grpc_chttp2_list_add_writable_stream(t_, s)) {
273 if (!s->refcount->refs.RefIfNonZero()) {
274 grpc_chttp2_list_remove_writable_stream(t_, s);
275 }
276 }
277 }
278 }
279
NextStream()280 grpc_chttp2_stream* NextStream() {
281 if (t_->outbuf.length > target_write_size(t_)) {
282 result_.partial = true;
283 return nullptr;
284 }
285
286 grpc_chttp2_stream* s;
287 if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
288 return nullptr;
289 }
290
291 return s;
292 }
293
IncInitialMetadataWrites()294 void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
IncWindowUpdateWrites()295 void IncWindowUpdateWrites() { ++flow_control_writes_; }
IncMessageWrites()296 void IncMessageWrites() { ++message_writes_; }
IncTrailingMetadataWrites()297 void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
298
NoteScheduledResults()299 void NoteScheduledResults() { result_.early_results_scheduled = true; }
300
transport() const301 grpc_chttp2_transport* transport() const { return t_; }
302
Result()303 grpc_chttp2_begin_write_result Result() {
304 result_.writing = t_->outbuf.count > 0;
305 return result_;
306 }
307
308 private:
309 grpc_chttp2_transport* const t_;
310
311 /* stats histogram counters: we increment these throughout this function,
312 and at the end publish to the central stats histograms */
313 int flow_control_writes_ = 0;
314 int initial_metadata_writes_ = 0;
315 int trailing_metadata_writes_ = 0;
316 int message_writes_ = 0;
317 grpc_chttp2_begin_write_result result_ = {false, false, false};
318 };
319
320 class DataSendContext {
321 public:
DataSendContext(WriteContext * write_context,grpc_chttp2_transport * t,grpc_chttp2_stream * s)322 DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
323 grpc_chttp2_stream* s)
324 : write_context_(write_context),
325 t_(t),
326 s_(s),
327 sending_bytes_before_(s_->sending_bytes) {}
328
stream_remote_window() const329 uint32_t stream_remote_window() const {
330 return static_cast<uint32_t>(std::max(
331 int64_t(0),
332 s_->flow_control->remote_window_delta() +
333 static_cast<int64_t>(
334 t_->settings[GRPC_PEER_SETTINGS]
335 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE])));
336 }
337
max_outgoing() const338 uint32_t max_outgoing() const {
339 return static_cast<uint32_t>(std::min(
340 t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
341 static_cast<uint32_t>(std::min(int64_t(stream_remote_window()),
342 t_->flow_control->remote_window()))));
343 }
344
AnyOutgoing() const345 bool AnyOutgoing() const { return max_outgoing() > 0; }
346
FlushUncompressedBytes()347 void FlushUncompressedBytes() {
348 uint32_t send_bytes = static_cast<uint32_t>(
349 std::min(size_t(max_outgoing()), s_->flow_controlled_buffer.length));
350 is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
351 s_->fetching_send_message == nullptr &&
352 s_->send_trailing_metadata != nullptr &&
353 s_->send_trailing_metadata->empty();
354 grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
355 is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
356 s_->flow_control->SentData(send_bytes);
357 s_->sending_bytes += send_bytes;
358 }
359
FlushCompressedBytes()360 void FlushCompressedBytes() {
361 GPR_DEBUG_ASSERT(s_->stream_compression_method !=
362 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
363
364 uint32_t send_bytes = static_cast<uint32_t>(
365 std::min(size_t(max_outgoing()), s_->compressed_data_buffer.length));
366 bool is_last_data_frame =
367 (send_bytes == s_->compressed_data_buffer.length &&
368 s_->flow_controlled_buffer.length == 0 &&
369 s_->fetching_send_message == nullptr);
370 if (is_last_data_frame && s_->send_trailing_metadata != nullptr &&
371 s_->stream_compression_ctx != nullptr) {
372 if (GPR_UNLIKELY(!grpc_stream_compress(
373 s_->stream_compression_ctx, &s_->flow_controlled_buffer,
374 &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
375 GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) {
376 gpr_log(GPR_ERROR, "Stream compression failed.");
377 }
378 grpc_stream_compression_context_destroy(s_->stream_compression_ctx);
379 s_->stream_compression_ctx = nullptr;
380 /* After finish, bytes in s->compressed_data_buffer may be
381 * more than max_outgoing. Start another round of the current
382 * while loop so that send_bytes and is_last_data_frame are
383 * recalculated. */
384 return;
385 }
386 is_last_frame_ = is_last_data_frame &&
387 s_->send_trailing_metadata != nullptr &&
388 s_->send_trailing_metadata->empty();
389 grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
390 is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
391 s_->flow_control->SentData(send_bytes);
392 if (s_->compressed_data_buffer.length == 0) {
393 s_->sending_bytes += s_->uncompressed_data_size;
394 }
395 }
396
CompressMoreBytes()397 void CompressMoreBytes() {
398 GPR_DEBUG_ASSERT(s_->stream_compression_method !=
399 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
400
401 if (s_->stream_compression_ctx == nullptr) {
402 s_->stream_compression_ctx =
403 grpc_stream_compression_context_create(s_->stream_compression_method);
404 }
405 s_->uncompressed_data_size = s_->flow_controlled_buffer.length;
406 if (GPR_UNLIKELY(!grpc_stream_compress(
407 s_->stream_compression_ctx, &s_->flow_controlled_buffer,
408 &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
409 GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) {
410 gpr_log(GPR_ERROR, "Stream compression failed.");
411 }
412 }
413
is_last_frame() const414 bool is_last_frame() const { return is_last_frame_; }
415
CallCallbacks()416 void CallCallbacks() {
417 if (update_list(
418 t_, s_,
419 static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
420 &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
421 GRPC_ERROR_NONE)) {
422 write_context_->NoteScheduledResults();
423 }
424 }
425
426 private:
427 WriteContext* write_context_;
428 grpc_chttp2_transport* t_;
429 grpc_chttp2_stream* s_;
430 const size_t sending_bytes_before_;
431 bool is_last_frame_ = false;
432 };
433
434 class StreamWriteContext {
435 public:
StreamWriteContext(WriteContext * write_context,grpc_chttp2_stream * s)436 StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
437 : write_context_(write_context), t_(write_context->transport()), s_(s) {
438 GRPC_CHTTP2_IF_TRACING(
439 gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
440 t_->is_client ? "CLIENT" : "SERVER", s->id,
441 s->sent_initial_metadata, s->send_initial_metadata != nullptr,
442 (int)(s->flow_control->local_window_delta() -
443 s->flow_control->announced_window_delta())));
444 }
445
FlushInitialMetadata()446 void FlushInitialMetadata() {
447 /* send initial metadata if it's available */
448 if (s_->sent_initial_metadata) return;
449 if (s_->send_initial_metadata == nullptr) return;
450
451 // We skip this on the server side if there is no custom initial
452 // metadata, there are no messages to send, and we are also sending
453 // trailing metadata. This results in a Trailers-Only response,
454 // which is required for retries, as per:
455 // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
456 if (!t_->is_client && s_->fetching_send_message == nullptr &&
457 s_->flow_controlled_buffer.length == 0 &&
458 compressed_data_buffer_len() == 0 &&
459 s_->send_trailing_metadata != nullptr &&
460 is_default_initial_metadata(s_->send_initial_metadata)) {
461 ConvertInitialMetadataToTrailingMetadata();
462 } else {
463 t_->hpack_compressor.EncodeHeaders(
464 grpc_core::HPackCompressor::EncodeHeaderOptions{
465 s_->id, // stream_id
466 false, // is_eof
467 t_->settings
468 [GRPC_PEER_SETTINGS]
469 [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
470 0, // use_true_binary_metadata
471 t_->settings
472 [GRPC_PEER_SETTINGS]
473 [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size
474 &s_->stats.outgoing // stats
475 },
476 *s_->send_initial_metadata, &t_->outbuf);
477 grpc_chttp2_reset_ping_clock(t_);
478 write_context_->IncInitialMetadataWrites();
479 }
480
481 s_->send_initial_metadata = nullptr;
482 s_->sent_initial_metadata = true;
483 write_context_->NoteScheduledResults();
484 grpc_chttp2_complete_closure_step(
485 t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
486 "send_initial_metadata_finished");
487 }
488
compressed_data_buffer_len()489 size_t compressed_data_buffer_len() {
490 return s_->stream_compression_method ==
491 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
492 ? 0
493 : s_->compressed_data_buffer.length;
494 }
495
FlushWindowUpdates()496 void FlushWindowUpdates() {
497 /* send any window updates */
498 const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
499 if (stream_announce == 0) return;
500
501 grpc_slice_buffer_add(
502 &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
503 &s_->stats.outgoing));
504 grpc_chttp2_reset_ping_clock(t_);
505 write_context_->IncWindowUpdateWrites();
506 }
507
FlushData()508 void FlushData() {
509 if (!s_->sent_initial_metadata) return;
510
511 if (s_->flow_controlled_buffer.length == 0 &&
512 compressed_data_buffer_len() == 0) {
513 return; // early out: nothing to do
514 }
515
516 DataSendContext data_send_context(write_context_, t_, s_);
517
518 if (!data_send_context.AnyOutgoing()) {
519 if (t_->flow_control->remote_window() <= 0) {
520 report_stall(t_, s_, "transport");
521 grpc_chttp2_list_add_stalled_by_transport(t_, s_);
522 } else if (data_send_context.stream_remote_window() <= 0) {
523 report_stall(t_, s_, "stream");
524 grpc_chttp2_list_add_stalled_by_stream(t_, s_);
525 }
526 return; // early out: nothing to do
527 }
528
529 if (s_->stream_compression_method ==
530 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
531 while (s_->flow_controlled_buffer.length > 0 &&
532 data_send_context.max_outgoing() > 0) {
533 data_send_context.FlushUncompressedBytes();
534 }
535 } else {
536 while ((s_->flow_controlled_buffer.length > 0 ||
537 s_->compressed_data_buffer.length > 0) &&
538 data_send_context.max_outgoing() > 0) {
539 if (s_->compressed_data_buffer.length > 0) {
540 data_send_context.FlushCompressedBytes();
541 } else {
542 data_send_context.CompressMoreBytes();
543 }
544 }
545 }
546 grpc_chttp2_reset_ping_clock(t_);
547 if (data_send_context.is_last_frame()) {
548 SentLastFrame();
549 }
550 data_send_context.CallCallbacks();
551 stream_became_writable_ = true;
552 if (s_->flow_controlled_buffer.length > 0 ||
553 compressed_data_buffer_len() > 0) {
554 GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
555 grpc_chttp2_list_add_writable_stream(t_, s_);
556 }
557 write_context_->IncMessageWrites();
558 }
559
FlushTrailingMetadata()560 void FlushTrailingMetadata() {
561 if (!s_->sent_initial_metadata) return;
562
563 if (s_->send_trailing_metadata == nullptr) return;
564 if (s_->fetching_send_message != nullptr) return;
565 if (s_->flow_controlled_buffer.length != 0) return;
566 if (compressed_data_buffer_len() != 0) return;
567
568 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
569 if (s_->send_trailing_metadata->empty()) {
570 grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
571 &s_->stats.outgoing, &t_->outbuf);
572 } else {
573 t_->hpack_compressor.EncodeHeaders(
574 grpc_core::HPackCompressor::EncodeHeaderOptions{
575 s_->id, true,
576 t_->settings
577 [GRPC_PEER_SETTINGS]
578 [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
579 0,
580 t_->settings[GRPC_PEER_SETTINGS]
581 [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
582 &s_->stats.outgoing},
583 grpc_core::ConcatMetadata(
584 grpc_core::MetadataArray(
585 extra_headers_for_trailing_metadata_,
586 num_extra_headers_for_trailing_metadata_),
587 *s_->send_trailing_metadata),
588 &t_->outbuf);
589 }
590 write_context_->IncTrailingMetadataWrites();
591 grpc_chttp2_reset_ping_clock(t_);
592 SentLastFrame();
593
594 write_context_->NoteScheduledResults();
595 grpc_chttp2_complete_closure_step(
596 t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
597 "send_trailing_metadata_finished");
598 }
599
stream_became_writable()600 bool stream_became_writable() { return stream_became_writable_; }
601
602 private:
ConvertInitialMetadataToTrailingMetadata()603 void ConvertInitialMetadataToTrailingMetadata() {
604 GRPC_CHTTP2_IF_TRACING(
605 gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
606 // When sending Trailers-Only, we need to move the :status and
607 // content-type headers to the trailers.
608 if (s_->send_initial_metadata->legacy_index()->named.status != nullptr) {
609 extra_headers_for_trailing_metadata_
610 [num_extra_headers_for_trailing_metadata_++] =
611 &s_->send_initial_metadata->legacy_index()->named.status->md;
612 }
613 if (s_->send_initial_metadata->legacy_index()->named.content_type !=
614 nullptr) {
615 extra_headers_for_trailing_metadata_
616 [num_extra_headers_for_trailing_metadata_++] =
617 &s_->send_initial_metadata->legacy_index()
618 ->named.content_type->md;
619 }
620 }
621
SentLastFrame()622 void SentLastFrame() {
623 s_->send_trailing_metadata = nullptr;
624 if (s_->sent_trailing_metadata_op) {
625 *s_->sent_trailing_metadata_op = true;
626 s_->sent_trailing_metadata_op = nullptr;
627 }
628 s_->sent_trailing_metadata = true;
629 s_->eos_sent = true;
630
631 if (!t_->is_client && !s_->read_closed) {
632 grpc_slice_buffer_add(
633 &t_->outbuf, grpc_chttp2_rst_stream_create(
634 s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
635 }
636 grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
637 GRPC_ERROR_NONE);
638 }
639
640 WriteContext* const write_context_;
641 grpc_chttp2_transport* const t_;
642 grpc_chttp2_stream* const s_;
643 bool stream_became_writable_ = false;
644 grpc_mdelem* extra_headers_for_trailing_metadata_[2];
645 size_t num_extra_headers_for_trailing_metadata_ = 0;
646 };
647 } // namespace
648
grpc_chttp2_begin_write(grpc_chttp2_transport * t)649 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
650 grpc_chttp2_transport* t) {
651 WriteContext ctx(t);
652 ctx.FlushSettings();
653 ctx.FlushPingAcks();
654 ctx.FlushQueuedBuffers();
655 ctx.EnactHpackSettings();
656
657 if (t->flow_control->remote_window() > 0) {
658 ctx.UpdateStreamsNoLongerStalled();
659 }
660
661 /* for each grpc_chttp2_stream that's become writable, frame it's data
662 (according to available window sizes) and add to the output buffer */
663 while (grpc_chttp2_stream* s = ctx.NextStream()) {
664 StreamWriteContext stream_ctx(&ctx, s);
665 size_t orig_len = t->outbuf.length;
666 stream_ctx.FlushInitialMetadata();
667 stream_ctx.FlushWindowUpdates();
668 stream_ctx.FlushData();
669 stream_ctx.FlushTrailingMetadata();
670 if (t->outbuf.length > orig_len) {
671 /* Add this stream to the list of the contexts to be traced at TCP */
672 s->byte_counter += t->outbuf.length - orig_len;
673 if (s->traced && grpc_endpoint_can_track_err(t->ep)) {
674 grpc_core::ContextList::Append(&t->cl, s);
675 }
676 }
677 if (stream_ctx.stream_became_writable()) {
678 if (!grpc_chttp2_list_add_writing_stream(t, s)) {
679 /* already in writing list: drop ref */
680 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
681 } else {
682 /* ref will be dropped at end of write */
683 }
684 } else {
685 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
686 }
687 }
688
689 ctx.FlushWindowUpdates();
690
691 maybe_initiate_ping(t);
692
693 return ctx.Result();
694 }
695
grpc_chttp2_end_write(grpc_chttp2_transport * t,grpc_error_handle error)696 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
697 GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
698 grpc_chttp2_stream* s;
699
700 if (t->channelz_socket != nullptr) {
701 t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
702 }
703 t->num_messages_in_next_write = 0;
704
705 while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
706 if (s->sending_bytes != 0) {
707 update_list(t, s, static_cast<int64_t>(s->sending_bytes),
708 &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
709 GRPC_ERROR_REF(error));
710 s->sending_bytes = 0;
711 }
712 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
713 }
714 grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
715 GRPC_ERROR_UNREF(error);
716 }
717