1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <limits.h>
22 
23 #include <grpc/support/log.h>
24 
25 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
26 #include "src/core/ext/transport/chttp2/transport/context_list.h"
27 #include "src/core/ext/transport/chttp2/transport/internal.h"
28 #include "src/core/lib/compression/stream_compression.h"
29 #include "src/core/lib/debug/stats.h"
30 #include "src/core/lib/profiling/timers.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/transport/http2_errors.h"
33 
add_to_write_list(grpc_chttp2_write_cb ** list,grpc_chttp2_write_cb * cb)34 static void add_to_write_list(grpc_chttp2_write_cb** list,
35                               grpc_chttp2_write_cb* cb) {
36   cb->next = *list;
37   *list = cb;
38 }
39 
finish_write_cb(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb * cb,grpc_error_handle error)40 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
41                             grpc_chttp2_write_cb* cb, grpc_error_handle error) {
42   grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
43                                     "finish_write_cb");
44   cb->next = t->write_cb_pool;
45   t->write_cb_pool = cb;
46 }
47 
maybe_initiate_ping(grpc_chttp2_transport * t)48 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
49   grpc_chttp2_ping_queue* pq = &t->ping_queue;
50   if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
51     /* no ping needed: wait */
52     return;
53   }
54   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
55     /* ping already in-flight: wait */
56     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
57         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
58         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
59       gpr_log(GPR_INFO, "%s: Ping delayed [%s]: already pinging",
60               t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str());
61     }
62     return;
63   }
64   if (t->ping_state.pings_before_data_required == 0 &&
65       t->ping_policy.max_pings_without_data != 0) {
66     /* need to receive something of substance before sending a ping again */
67     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
68         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
69         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
70       gpr_log(GPR_INFO, "%s: Ping delayed [%s]: too many recent pings: %d/%d",
71               t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
72               t->ping_state.pings_before_data_required,
73               t->ping_policy.max_pings_without_data);
74     }
75     return;
76   }
77   // InvalidateNow to avoid getting stuck re-initializing the ping timer
78   // in a loop while draining the currently-held combiner. Also see
79   // https://github.com/grpc/grpc/issues/26079.
80   grpc_core::ExecCtx::Get()->InvalidateNow();
81   grpc_millis now = grpc_core::ExecCtx::Get()->Now();
82 
83   grpc_millis next_allowed_ping_interval =
84       (t->keepalive_permit_without_calls == 0 &&
85        grpc_chttp2_stream_map_size(&t->stream_map) == 0)
86           ? 7200 * GPR_MS_PER_SEC
87           : (GPR_MS_PER_SEC); /* A second is added to deal with network delays
88                                  and timing imprecision */
89   grpc_millis next_allowed_ping =
90       t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
91 
92   if (next_allowed_ping > now) {
93     /* not enough elapsed time between successive pings */
94     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
95         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
96         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
97       gpr_log(GPR_INFO,
98               "%s: Ping delayed [%s]: not enough time elapsed since last ping. "
99               " Last ping %f: Next ping %f: Now %f",
100               t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
101               static_cast<double>(t->ping_state.last_ping_sent_time),
102               static_cast<double>(next_allowed_ping), static_cast<double>(now));
103     }
104     if (!t->ping_state.is_delayed_ping_timer_set) {
105       t->ping_state.is_delayed_ping_timer_set = true;
106       GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
107       GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
108                         grpc_chttp2_retry_initiate_ping, t,
109                         grpc_schedule_on_exec_ctx);
110       grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
111                       &t->retry_initiate_ping_locked);
112     }
113     return;
114   }
115 
116   pq->inflight_id = t->ping_ctr;
117   t->ping_ctr++;
118   grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
119                               &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
120   grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
121                          &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
122   grpc_slice_buffer_add(&t->outbuf,
123                         grpc_chttp2_ping_create(false, pq->inflight_id));
124   GRPC_STATS_INC_HTTP2_PINGS_SENT();
125   t->ping_state.last_ping_sent_time = now;
126   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
127       GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
128       GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
129     gpr_log(GPR_INFO, "%s: Ping sent [%s]: %d/%d",
130             t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
131             t->ping_state.pings_before_data_required,
132             t->ping_policy.max_pings_without_data);
133   }
134   t->ping_state.pings_before_data_required -=
135       (t->ping_state.pings_before_data_required != 0);
136 }
137 
update_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int64_t send_bytes,grpc_chttp2_write_cb ** list,int64_t * ctr,grpc_error_handle error)138 static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
139                         int64_t send_bytes, grpc_chttp2_write_cb** list,
140                         int64_t* ctr, grpc_error_handle error) {
141   bool sched_any = false;
142   grpc_chttp2_write_cb* cb = *list;
143   *list = nullptr;
144   *ctr += send_bytes;
145   while (cb) {
146     grpc_chttp2_write_cb* next = cb->next;
147     if (cb->call_at_byte <= *ctr) {
148       sched_any = true;
149       finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
150     } else {
151       add_to_write_list(list, cb);
152     }
153     cb = next;
154   }
155   GRPC_ERROR_UNREF(error);
156   return sched_any;
157 }
158 
report_stall(grpc_chttp2_transport * t,grpc_chttp2_stream * s,const char * staller)159 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
160                          const char* staller) {
161   if (GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace)) {
162     gpr_log(
163         GPR_DEBUG,
164         "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
165         "to happen in a healthy program that is not seeing flow control stalls."
166         " However, if you know that there are unwanted stalls, here is some "
167         "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
168         ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
169         ":s_win=%d:s_delta=%" PRId64 "]",
170         t->peer_string.c_str(), t, s->id, staller,
171         s->flow_controlled_buffer.length,
172         s->stream_compression_method ==
173                 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
174             ? 0
175             : s->compressed_data_buffer.length,
176         s->flow_controlled_bytes_flowed,
177         t->settings[GRPC_ACKED_SETTINGS]
178                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
179         t->flow_control->remote_window(),
180         static_cast<uint32_t>(std::max(
181             int64_t(0),
182             s->flow_control->remote_window_delta() +
183                 static_cast<int64_t>(
184                     t->settings[GRPC_PEER_SETTINGS]
185                                [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]))),
186         s->flow_control->remote_window_delta());
187   }
188 }
189 
190 /* How many bytes would we like to put on the wire during a single syscall */
target_write_size(grpc_chttp2_transport *)191 static uint32_t target_write_size(grpc_chttp2_transport* /*t*/) {
192   return 1024 * 1024;
193 }
194 
195 // Returns true if initial_metadata contains only default headers.
is_default_initial_metadata(grpc_metadata_batch * initial_metadata)196 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
197   return initial_metadata->default_count() ==
198          initial_metadata->non_deadline_count();
199 }
200 
201 namespace {
202 class StreamWriteContext;
203 
204 class WriteContext {
205  public:
WriteContext(grpc_chttp2_transport * t)206   explicit WriteContext(grpc_chttp2_transport* t) : t_(t) {
207     GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
208     GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0);
209   }
210 
211   // TODO(ctiller): make this the destructor
FlushStats()212   void FlushStats() {
213     GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
214         initial_metadata_writes_);
215     GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
216     GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
217         trailing_metadata_writes_);
218     GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
219   }
220 
FlushSettings()221   void FlushSettings() {
222     if (t_->dirtied_local_settings && !t_->sent_local_settings) {
223       grpc_slice_buffer_add(
224           &t_->outbuf, grpc_chttp2_settings_create(
225                            t_->settings[GRPC_SENT_SETTINGS],
226                            t_->settings[GRPC_LOCAL_SETTINGS],
227                            t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
228       t_->force_send_settings = false;
229       t_->dirtied_local_settings = false;
230       t_->sent_local_settings = true;
231       GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
232     }
233   }
234 
FlushQueuedBuffers()235   void FlushQueuedBuffers() {
236     /* simple writes are queued to qbuf, and flushed here */
237     grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
238     t_->num_pending_induced_frames = 0;
239     GPR_ASSERT(t_->qbuf.count == 0);
240   }
241 
FlushWindowUpdates()242   void FlushWindowUpdates() {
243     uint32_t transport_announce =
244         t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
245     if (transport_announce) {
246       grpc_transport_one_way_stats throwaway_stats;
247       grpc_slice_buffer_add(
248           &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
249                                                         &throwaway_stats));
250       grpc_chttp2_reset_ping_clock(t_);
251     }
252   }
253 
FlushPingAcks()254   void FlushPingAcks() {
255     for (size_t i = 0; i < t_->ping_ack_count; i++) {
256       grpc_slice_buffer_add(&t_->outbuf,
257                             grpc_chttp2_ping_create(true, t_->ping_acks[i]));
258     }
259     t_->ping_ack_count = 0;
260   }
261 
EnactHpackSettings()262   void EnactHpackSettings() {
263     t_->hpack_compressor.SetMaxTableSize(
264         t_->settings[GRPC_PEER_SETTINGS]
265                     [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
266   }
267 
UpdateStreamsNoLongerStalled()268   void UpdateStreamsNoLongerStalled() {
269     grpc_chttp2_stream* s;
270     while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
271       if (t_->closed_with_error == GRPC_ERROR_NONE &&
272           grpc_chttp2_list_add_writable_stream(t_, s)) {
273         if (!s->refcount->refs.RefIfNonZero()) {
274           grpc_chttp2_list_remove_writable_stream(t_, s);
275         }
276       }
277     }
278   }
279 
NextStream()280   grpc_chttp2_stream* NextStream() {
281     if (t_->outbuf.length > target_write_size(t_)) {
282       result_.partial = true;
283       return nullptr;
284     }
285 
286     grpc_chttp2_stream* s;
287     if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
288       return nullptr;
289     }
290 
291     return s;
292   }
293 
IncInitialMetadataWrites()294   void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
IncWindowUpdateWrites()295   void IncWindowUpdateWrites() { ++flow_control_writes_; }
IncMessageWrites()296   void IncMessageWrites() { ++message_writes_; }
IncTrailingMetadataWrites()297   void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
298 
NoteScheduledResults()299   void NoteScheduledResults() { result_.early_results_scheduled = true; }
300 
transport() const301   grpc_chttp2_transport* transport() const { return t_; }
302 
Result()303   grpc_chttp2_begin_write_result Result() {
304     result_.writing = t_->outbuf.count > 0;
305     return result_;
306   }
307 
308  private:
309   grpc_chttp2_transport* const t_;
310 
311   /* stats histogram counters: we increment these throughout this function,
312      and at the end publish to the central stats histograms */
313   int flow_control_writes_ = 0;
314   int initial_metadata_writes_ = 0;
315   int trailing_metadata_writes_ = 0;
316   int message_writes_ = 0;
317   grpc_chttp2_begin_write_result result_ = {false, false, false};
318 };
319 
320 class DataSendContext {
321  public:
DataSendContext(WriteContext * write_context,grpc_chttp2_transport * t,grpc_chttp2_stream * s)322   DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
323                   grpc_chttp2_stream* s)
324       : write_context_(write_context),
325         t_(t),
326         s_(s),
327         sending_bytes_before_(s_->sending_bytes) {}
328 
stream_remote_window() const329   uint32_t stream_remote_window() const {
330     return static_cast<uint32_t>(std::max(
331         int64_t(0),
332         s_->flow_control->remote_window_delta() +
333             static_cast<int64_t>(
334                 t_->settings[GRPC_PEER_SETTINGS]
335                             [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE])));
336   }
337 
max_outgoing() const338   uint32_t max_outgoing() const {
339     return static_cast<uint32_t>(std::min(
340         t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
341         static_cast<uint32_t>(std::min(int64_t(stream_remote_window()),
342                                        t_->flow_control->remote_window()))));
343   }
344 
AnyOutgoing() const345   bool AnyOutgoing() const { return max_outgoing() > 0; }
346 
FlushUncompressedBytes()347   void FlushUncompressedBytes() {
348     uint32_t send_bytes = static_cast<uint32_t>(
349         std::min(size_t(max_outgoing()), s_->flow_controlled_buffer.length));
350     is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
351                      s_->fetching_send_message == nullptr &&
352                      s_->send_trailing_metadata != nullptr &&
353                      s_->send_trailing_metadata->empty();
354     grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
355                             is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
356     s_->flow_control->SentData(send_bytes);
357     s_->sending_bytes += send_bytes;
358   }
359 
FlushCompressedBytes()360   void FlushCompressedBytes() {
361     GPR_DEBUG_ASSERT(s_->stream_compression_method !=
362                      GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
363 
364     uint32_t send_bytes = static_cast<uint32_t>(
365         std::min(size_t(max_outgoing()), s_->compressed_data_buffer.length));
366     bool is_last_data_frame =
367         (send_bytes == s_->compressed_data_buffer.length &&
368          s_->flow_controlled_buffer.length == 0 &&
369          s_->fetching_send_message == nullptr);
370     if (is_last_data_frame && s_->send_trailing_metadata != nullptr &&
371         s_->stream_compression_ctx != nullptr) {
372       if (GPR_UNLIKELY(!grpc_stream_compress(
373               s_->stream_compression_ctx, &s_->flow_controlled_buffer,
374               &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
375               GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) {
376         gpr_log(GPR_ERROR, "Stream compression failed.");
377       }
378       grpc_stream_compression_context_destroy(s_->stream_compression_ctx);
379       s_->stream_compression_ctx = nullptr;
380       /* After finish, bytes in s->compressed_data_buffer may be
381        * more than max_outgoing. Start another round of the current
382        * while loop so that send_bytes and is_last_data_frame are
383        * recalculated. */
384       return;
385     }
386     is_last_frame_ = is_last_data_frame &&
387                      s_->send_trailing_metadata != nullptr &&
388                      s_->send_trailing_metadata->empty();
389     grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
390                             is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
391     s_->flow_control->SentData(send_bytes);
392     if (s_->compressed_data_buffer.length == 0) {
393       s_->sending_bytes += s_->uncompressed_data_size;
394     }
395   }
396 
CompressMoreBytes()397   void CompressMoreBytes() {
398     GPR_DEBUG_ASSERT(s_->stream_compression_method !=
399                      GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
400 
401     if (s_->stream_compression_ctx == nullptr) {
402       s_->stream_compression_ctx =
403           grpc_stream_compression_context_create(s_->stream_compression_method);
404     }
405     s_->uncompressed_data_size = s_->flow_controlled_buffer.length;
406     if (GPR_UNLIKELY(!grpc_stream_compress(
407             s_->stream_compression_ctx, &s_->flow_controlled_buffer,
408             &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
409             GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) {
410       gpr_log(GPR_ERROR, "Stream compression failed.");
411     }
412   }
413 
is_last_frame() const414   bool is_last_frame() const { return is_last_frame_; }
415 
CallCallbacks()416   void CallCallbacks() {
417     if (update_list(
418             t_, s_,
419             static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
420             &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
421             GRPC_ERROR_NONE)) {
422       write_context_->NoteScheduledResults();
423     }
424   }
425 
426  private:
427   WriteContext* write_context_;
428   grpc_chttp2_transport* t_;
429   grpc_chttp2_stream* s_;
430   const size_t sending_bytes_before_;
431   bool is_last_frame_ = false;
432 };
433 
434 class StreamWriteContext {
435  public:
StreamWriteContext(WriteContext * write_context,grpc_chttp2_stream * s)436   StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
437       : write_context_(write_context), t_(write_context->transport()), s_(s) {
438     GRPC_CHTTP2_IF_TRACING(
439         gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
440                 t_->is_client ? "CLIENT" : "SERVER", s->id,
441                 s->sent_initial_metadata, s->send_initial_metadata != nullptr,
442                 (int)(s->flow_control->local_window_delta() -
443                       s->flow_control->announced_window_delta())));
444   }
445 
FlushInitialMetadata()446   void FlushInitialMetadata() {
447     /* send initial metadata if it's available */
448     if (s_->sent_initial_metadata) return;
449     if (s_->send_initial_metadata == nullptr) return;
450 
451     // We skip this on the server side if there is no custom initial
452     // metadata, there are no messages to send, and we are also sending
453     // trailing metadata.  This results in a Trailers-Only response,
454     // which is required for retries, as per:
455     // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
456     if (!t_->is_client && s_->fetching_send_message == nullptr &&
457         s_->flow_controlled_buffer.length == 0 &&
458         compressed_data_buffer_len() == 0 &&
459         s_->send_trailing_metadata != nullptr &&
460         is_default_initial_metadata(s_->send_initial_metadata)) {
461       ConvertInitialMetadataToTrailingMetadata();
462     } else {
463       t_->hpack_compressor.EncodeHeaders(
464           grpc_core::HPackCompressor::EncodeHeaderOptions{
465               s_->id,  // stream_id
466               false,   // is_eof
467               t_->settings
468                       [GRPC_PEER_SETTINGS]
469                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
470                   0,  // use_true_binary_metadata
471               t_->settings
472                   [GRPC_PEER_SETTINGS]
473                   [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],  // max_frame_size
474               &s_->stats.outgoing                         // stats
475           },
476           *s_->send_initial_metadata, &t_->outbuf);
477       grpc_chttp2_reset_ping_clock(t_);
478       write_context_->IncInitialMetadataWrites();
479     }
480 
481     s_->send_initial_metadata = nullptr;
482     s_->sent_initial_metadata = true;
483     write_context_->NoteScheduledResults();
484     grpc_chttp2_complete_closure_step(
485         t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
486         "send_initial_metadata_finished");
487   }
488 
compressed_data_buffer_len()489   size_t compressed_data_buffer_len() {
490     return s_->stream_compression_method ==
491                    GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
492                ? 0
493                : s_->compressed_data_buffer.length;
494   }
495 
FlushWindowUpdates()496   void FlushWindowUpdates() {
497     /* send any window updates */
498     const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
499     if (stream_announce == 0) return;
500 
501     grpc_slice_buffer_add(
502         &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
503                                                       &s_->stats.outgoing));
504     grpc_chttp2_reset_ping_clock(t_);
505     write_context_->IncWindowUpdateWrites();
506   }
507 
FlushData()508   void FlushData() {
509     if (!s_->sent_initial_metadata) return;
510 
511     if (s_->flow_controlled_buffer.length == 0 &&
512         compressed_data_buffer_len() == 0) {
513       return;  // early out: nothing to do
514     }
515 
516     DataSendContext data_send_context(write_context_, t_, s_);
517 
518     if (!data_send_context.AnyOutgoing()) {
519       if (t_->flow_control->remote_window() <= 0) {
520         report_stall(t_, s_, "transport");
521         grpc_chttp2_list_add_stalled_by_transport(t_, s_);
522       } else if (data_send_context.stream_remote_window() <= 0) {
523         report_stall(t_, s_, "stream");
524         grpc_chttp2_list_add_stalled_by_stream(t_, s_);
525       }
526       return;  // early out: nothing to do
527     }
528 
529     if (s_->stream_compression_method ==
530         GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
531       while (s_->flow_controlled_buffer.length > 0 &&
532              data_send_context.max_outgoing() > 0) {
533         data_send_context.FlushUncompressedBytes();
534       }
535     } else {
536       while ((s_->flow_controlled_buffer.length > 0 ||
537               s_->compressed_data_buffer.length > 0) &&
538              data_send_context.max_outgoing() > 0) {
539         if (s_->compressed_data_buffer.length > 0) {
540           data_send_context.FlushCompressedBytes();
541         } else {
542           data_send_context.CompressMoreBytes();
543         }
544       }
545     }
546     grpc_chttp2_reset_ping_clock(t_);
547     if (data_send_context.is_last_frame()) {
548       SentLastFrame();
549     }
550     data_send_context.CallCallbacks();
551     stream_became_writable_ = true;
552     if (s_->flow_controlled_buffer.length > 0 ||
553         compressed_data_buffer_len() > 0) {
554       GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
555       grpc_chttp2_list_add_writable_stream(t_, s_);
556     }
557     write_context_->IncMessageWrites();
558   }
559 
FlushTrailingMetadata()560   void FlushTrailingMetadata() {
561     if (!s_->sent_initial_metadata) return;
562 
563     if (s_->send_trailing_metadata == nullptr) return;
564     if (s_->fetching_send_message != nullptr) return;
565     if (s_->flow_controlled_buffer.length != 0) return;
566     if (compressed_data_buffer_len() != 0) return;
567 
568     GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
569     if (s_->send_trailing_metadata->empty()) {
570       grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
571                               &s_->stats.outgoing, &t_->outbuf);
572     } else {
573       t_->hpack_compressor.EncodeHeaders(
574           grpc_core::HPackCompressor::EncodeHeaderOptions{
575               s_->id, true,
576               t_->settings
577                       [GRPC_PEER_SETTINGS]
578                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
579                   0,
580               t_->settings[GRPC_PEER_SETTINGS]
581                           [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
582               &s_->stats.outgoing},
583           grpc_core::ConcatMetadata(
584               grpc_core::MetadataArray(
585                   extra_headers_for_trailing_metadata_,
586                   num_extra_headers_for_trailing_metadata_),
587               *s_->send_trailing_metadata),
588           &t_->outbuf);
589     }
590     write_context_->IncTrailingMetadataWrites();
591     grpc_chttp2_reset_ping_clock(t_);
592     SentLastFrame();
593 
594     write_context_->NoteScheduledResults();
595     grpc_chttp2_complete_closure_step(
596         t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
597         "send_trailing_metadata_finished");
598   }
599 
stream_became_writable()600   bool stream_became_writable() { return stream_became_writable_; }
601 
602  private:
ConvertInitialMetadataToTrailingMetadata()603   void ConvertInitialMetadataToTrailingMetadata() {
604     GRPC_CHTTP2_IF_TRACING(
605         gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
606     // When sending Trailers-Only, we need to move the :status and
607     // content-type headers to the trailers.
608     if (s_->send_initial_metadata->legacy_index()->named.status != nullptr) {
609       extra_headers_for_trailing_metadata_
610           [num_extra_headers_for_trailing_metadata_++] =
611               &s_->send_initial_metadata->legacy_index()->named.status->md;
612     }
613     if (s_->send_initial_metadata->legacy_index()->named.content_type !=
614         nullptr) {
615       extra_headers_for_trailing_metadata_
616           [num_extra_headers_for_trailing_metadata_++] =
617               &s_->send_initial_metadata->legacy_index()
618                    ->named.content_type->md;
619     }
620   }
621 
SentLastFrame()622   void SentLastFrame() {
623     s_->send_trailing_metadata = nullptr;
624     if (s_->sent_trailing_metadata_op) {
625       *s_->sent_trailing_metadata_op = true;
626       s_->sent_trailing_metadata_op = nullptr;
627     }
628     s_->sent_trailing_metadata = true;
629     s_->eos_sent = true;
630 
631     if (!t_->is_client && !s_->read_closed) {
632       grpc_slice_buffer_add(
633           &t_->outbuf, grpc_chttp2_rst_stream_create(
634                            s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
635     }
636     grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
637                                    GRPC_ERROR_NONE);
638   }
639 
640   WriteContext* const write_context_;
641   grpc_chttp2_transport* const t_;
642   grpc_chttp2_stream* const s_;
643   bool stream_became_writable_ = false;
644   grpc_mdelem* extra_headers_for_trailing_metadata_[2];
645   size_t num_extra_headers_for_trailing_metadata_ = 0;
646 };
647 }  // namespace
648 
grpc_chttp2_begin_write(grpc_chttp2_transport * t)649 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
650     grpc_chttp2_transport* t) {
651   WriteContext ctx(t);
652   ctx.FlushSettings();
653   ctx.FlushPingAcks();
654   ctx.FlushQueuedBuffers();
655   ctx.EnactHpackSettings();
656 
657   if (t->flow_control->remote_window() > 0) {
658     ctx.UpdateStreamsNoLongerStalled();
659   }
660 
661   /* for each grpc_chttp2_stream that's become writable, frame it's data
662      (according to available window sizes) and add to the output buffer */
663   while (grpc_chttp2_stream* s = ctx.NextStream()) {
664     StreamWriteContext stream_ctx(&ctx, s);
665     size_t orig_len = t->outbuf.length;
666     stream_ctx.FlushInitialMetadata();
667     stream_ctx.FlushWindowUpdates();
668     stream_ctx.FlushData();
669     stream_ctx.FlushTrailingMetadata();
670     if (t->outbuf.length > orig_len) {
671       /* Add this stream to the list of the contexts to be traced at TCP */
672       s->byte_counter += t->outbuf.length - orig_len;
673       if (s->traced && grpc_endpoint_can_track_err(t->ep)) {
674         grpc_core::ContextList::Append(&t->cl, s);
675       }
676     }
677     if (stream_ctx.stream_became_writable()) {
678       if (!grpc_chttp2_list_add_writing_stream(t, s)) {
679         /* already in writing list: drop ref */
680         GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
681       } else {
682         /* ref will be dropped at end of write */
683       }
684     } else {
685       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
686     }
687   }
688 
689   ctx.FlushWindowUpdates();
690 
691   maybe_initiate_ping(t);
692 
693   return ctx.Result();
694 }
695 
grpc_chttp2_end_write(grpc_chttp2_transport * t,grpc_error_handle error)696 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
697   GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
698   grpc_chttp2_stream* s;
699 
700   if (t->channelz_socket != nullptr) {
701     t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
702   }
703   t->num_messages_in_next_write = 0;
704 
705   while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
706     if (s->sending_bytes != 0) {
707       update_list(t, s, static_cast<int64_t>(s->sending_bytes),
708                   &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
709                   GRPC_ERROR_REF(error));
710       s->sending_bytes = 0;
711     }
712     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
713   }
714   grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
715   GRPC_ERROR_UNREF(error);
716 }
717