1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include <assert.h>
25 #include <stdbool.h>
26
27 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
28 #include "src/core/ext/transport/chttp2/transport/frame.h"
29 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
30 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
31 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
32 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
33 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
34 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
35 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
36 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
37 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
38 #include "src/core/lib/channel/channelz.h"
39 #include "src/core/lib/compression/stream_compression.h"
40 #include "src/core/lib/gprpp/manual_constructor.h"
41 #include "src/core/lib/iomgr/combiner.h"
42 #include "src/core/lib/iomgr/endpoint.h"
43 #include "src/core/lib/iomgr/timer.h"
44 #include "src/core/lib/transport/connectivity_state.h"
45 #include "src/core/lib/transport/metadata_batch.h"
46 #include "src/core/lib/transport/transport_impl.h"
47
/* Forward declaration only: this header refers to grpc_core::ContextList
   solely through a pointer (grpc_chttp2_transport::cl), so the complete type
   is not required here. */
48 namespace grpc_core {
49 class ContextList;
50 }
51
52 /* streams are kept in various linked lists depending on what things need to
53 happen to them... this enum labels each list */
/* The enumerators index grpc_chttp2_transport::lists and the per-stream
   links[] / included[] arrays, all of which are sized by STREAM_LIST_COUNT. */
54 typedef enum {
55 /* If a stream is in the following two lists, an explicit ref is associated
56 with the stream */
57 GRPC_CHTTP2_LIST_WRITABLE,
58 GRPC_CHTTP2_LIST_WRITING,
59 /* No additional ref is taken for the following lists. Make sure to remove the
60 stream from these lists when the stream is removed. */
61 GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
62 GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
63 /** streams that are waiting to start because there are too many concurrent
64 streams on the connection */
65 GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
66 STREAM_LIST_COUNT /* must be last: used as the array length for the lists */
67 } grpc_chttp2_stream_list_id;
68
/* Write execution state of a transport; stored in
   grpc_chttp2_transport::write_state, which defaults to IDLE.
   NOTE(review): WRITING_WITH_MORE presumably means a write is in flight and
   another one is already known to be needed -- confirm against the writer. */
69 typedef enum {
70 GRPC_CHTTP2_WRITE_STATE_IDLE,
71 GRPC_CHTTP2_WRITE_STATE_WRITING,
72 GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
73 } grpc_chttp2_write_state;
74
/* Selects whether the transport should favour latency or throughput.
   Nothing else in this header refers to this type; its consumers live
   elsewhere. */
75 typedef enum {
76 GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
77 GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
78 } grpc_chttp2_optimization_target;
79
/* Labels the closure lists held by grpc_chttp2_ping_queue::lists, which is
   sized by GRPC_CHTTP2_PCL_COUNT. */
80 typedef enum {
81 GRPC_CHTTP2_PCL_INITIATE = 0,
82 GRPC_CHTTP2_PCL_NEXT,
83 GRPC_CHTTP2_PCL_INFLIGHT,
84 GRPC_CHTTP2_PCL_COUNT /* must be last */
85 } grpc_chttp2_ping_closure_list;
86
/* Reason codes passed to grpc_chttp2_initiate_write() to record why a write
   was requested. grpc_chttp2_initiate_write_reason_string() maps a value to a
   string. */
87 typedef enum {
88 GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
89 GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
90 GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
91 GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
92 GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
93 GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
94 GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
95 GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
96 GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
97 GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
98 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
99 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
100 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
101 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
102 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
103 GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
104 GRPC_CHTTP2_INITIATE_WRITE_BDP_PING,
105 GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
106 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
107 GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
108 GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
109 } grpc_chttp2_initiate_write_reason;
110
/* Returns a C string for `reason`.
   NOTE(review): presumably a static, human-readable name that the caller must
   not free -- ownership is not visible from this header; confirm. */
111 const char* grpc_chttp2_initiate_write_reason_string(
112 grpc_chttp2_initiate_write_reason reason);
113
/* Ping bookkeeping: one closure list per grpc_chttp2_ping_closure_list entry
   plus the id of the in-flight ping. Both members are value-initialized. */
114 struct grpc_chttp2_ping_queue {
115 grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {};
116 uint64_t inflight_id = 0;
117 };
/* Limits governing repeated pings. The fields have no default initializers,
   so whoever creates an instance must set them.
   NOTE(review): field meanings are inferred from their names only; confirm
   the exact semantics in the transport implementation. */
118 struct grpc_chttp2_repeated_ping_policy {
119 int max_pings_without_data;
120 int max_ping_strikes;
121 grpc_millis min_recv_ping_interval_without_data;
122 };
/* Mutable state for sending pings (see grpc_chttp2_reset_ping_clock, which
   documents that it resets pings_before_data_required for a ping sender).
   The fields have no default initializers. */
123 struct grpc_chttp2_repeated_ping_state {
124 grpc_millis last_ping_sent_time;
125 int pings_before_data_required;
126 grpc_timer delayed_ping_timer;
127 bool is_delayed_ping_timer_set;
128 };
/* Server-side record of received pings: time of the last ping and the number
   of strikes accumulated (see grpc_chttp2_add_ping_strike and
   grpc_chttp2_reset_ping_clock). The fields have no default initializers. */
129 struct grpc_chttp2_server_ping_recv_state {
130 grpc_millis last_ping_recv_time;
131 int ping_strikes;
132 };
133 /* deframer state for the overall http2 stream of bytes */
/* There are 24 prefix states, one per byte of
   GRPC_CHTTP2_CLIENT_CONNECT_STRING (whose length without the terminator is
   24), followed by 9 frame-header states and one in-frame state. */
134 typedef enum {
135 /* prefix: one entry per http2 connection prefix byte */
136 GRPC_DTS_CLIENT_PREFIX_0 = 0,
137 GRPC_DTS_CLIENT_PREFIX_1,
138 GRPC_DTS_CLIENT_PREFIX_2,
139 GRPC_DTS_CLIENT_PREFIX_3,
140 GRPC_DTS_CLIENT_PREFIX_4,
141 GRPC_DTS_CLIENT_PREFIX_5,
142 GRPC_DTS_CLIENT_PREFIX_6,
143 GRPC_DTS_CLIENT_PREFIX_7,
144 GRPC_DTS_CLIENT_PREFIX_8,
145 GRPC_DTS_CLIENT_PREFIX_9,
146 GRPC_DTS_CLIENT_PREFIX_10,
147 GRPC_DTS_CLIENT_PREFIX_11,
148 GRPC_DTS_CLIENT_PREFIX_12,
149 GRPC_DTS_CLIENT_PREFIX_13,
150 GRPC_DTS_CLIENT_PREFIX_14,
151 GRPC_DTS_CLIENT_PREFIX_15,
152 GRPC_DTS_CLIENT_PREFIX_16,
153 GRPC_DTS_CLIENT_PREFIX_17,
154 GRPC_DTS_CLIENT_PREFIX_18,
155 GRPC_DTS_CLIENT_PREFIX_19,
156 GRPC_DTS_CLIENT_PREFIX_20,
157 GRPC_DTS_CLIENT_PREFIX_21,
158 GRPC_DTS_CLIENT_PREFIX_22,
159 GRPC_DTS_CLIENT_PREFIX_23,
160 /* frame header byte 0... */
161 /* must follow from the prefix states */
162 GRPC_DTS_FH_0,
163 GRPC_DTS_FH_1,
164 GRPC_DTS_FH_2,
165 GRPC_DTS_FH_3,
166 GRPC_DTS_FH_4,
167 GRPC_DTS_FH_5,
168 GRPC_DTS_FH_6,
169 GRPC_DTS_FH_7,
170 /* ... frame header byte 8 */
171 GRPC_DTS_FH_8,
172 /* inside a http2 frame */
173 GRPC_DTS_FRAME
174 } grpc_chttp2_deframe_transport_state;
175
/* Head and tail pointers of one stream list; the transport holds one of these
   per grpc_chttp2_stream_list_id (grpc_chttp2_transport::lists). */
176 struct grpc_chttp2_stream_list {
177 grpc_chttp2_stream* head;
178 grpc_chttp2_stream* tail;
179 };
/* Per-stream next/prev pointers for one stream list; each stream holds one of
   these per grpc_chttp2_stream_list_id (grpc_chttp2_stream::links). */
180 struct grpc_chttp2_stream_link {
181 grpc_chttp2_stream* next;
182 grpc_chttp2_stream* prev;
183 };
184 /* We keep several sets of connection wide parameters */
/* The enumerators are the first index into grpc_chttp2_transport::settings,
   whose first dimension is GRPC_NUM_SETTING_SETS. */
185 typedef enum {
186 /* The settings our peer has asked for (and we have acked) */
187 GRPC_PEER_SETTINGS = 0,
188 /* The settings we'd like to have */
189 GRPC_LOCAL_SETTINGS,
190 /* The settings we've published to our peer */
191 GRPC_SENT_SETTINGS,
192 /* The settings the peer has acked */
193 GRPC_ACKED_SETTINGS,
194 GRPC_NUM_SETTING_SETS
195 } grpc_chttp2_setting_set;
196
/* Progress of sending a GOAWAY on a transport; stored in
   grpc_chttp2_transport::sent_goaway_state, which defaults to
   GRPC_CHTTP2_NO_GOAWAY_SEND. */
197 typedef enum {
198 GRPC_CHTTP2_NO_GOAWAY_SEND,
199 GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED,
200 GRPC_CHTTP2_GOAWAY_SENT,
201 } grpc_chttp2_sent_goaway_state;
202
/* Node of a singly linked list of write callbacks: a closure together with a
   byte offset. Streams keep lists of these (on_flow_controlled_cbs,
   on_write_finished_cbs, finish_after_write) and the transport keeps
   write_cb_pool.
   NOTE(review): presumably `closure` is run once the write position reaches
   `call_at_byte` -- confirm in the writing code. */
203 typedef struct grpc_chttp2_write_cb {
204 int64_t call_at_byte;
205 grpc_closure* closure;
206 struct grpc_chttp2_write_cb* next;
207 } grpc_chttp2_write_cb;
208
209 namespace grpc_core {
210
// ByteStream implementation declared by the chttp2 transport. It keeps raw
// pointers to its transport and stream and carries its own reference count
// (Ref()/Unref()); dropping the last reference deletes the object.
// Everything except the inline accessors is defined out of line.
211 class Chttp2IncomingByteStream : public ByteStream {
212 public:
213 Chttp2IncomingByteStream(grpc_chttp2_transport* transport,
214 grpc_chttp2_stream* stream, uint32_t frame_size,
215 uint32_t flags);
216
217 void Orphan() override;
218
219 bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
220 grpc_error_handle Pull(grpc_slice* slice) override;
221 void Shutdown(grpc_error_handle error) override;
222
223 // TODO(roth): When I converted this class to C++, I wanted to make it
224 // inherit from RefCounted or InternallyRefCounted instead of continuing
225 // to use its own custom ref-counting code. However, that would require
226 // using multiple inheritance, which sucks in general. And to make matters
227 // worse, it causes problems with our New<> and Delete<> wrappers.
228 // Specifically, unless RefCounted is first in the list of parent classes,
229 // it will see a different value of the address of the object than the one
230 // we actually allocated, in which case gpr_free() will be called on a
231 // different address than the one we got from gpr_malloc(), thus causing a
232 // crash. Given the fragility of depending on that, as well as a desire to
233 // avoid multiple inheritance in general, I've decided to leave this
234 // alone for now. We can revisit this once we're able to link against
235 // libc++, at which point we can eliminate New<> and Delete<> and
236 // switch to std::shared_ptr<>.
// Takes one additional reference on this byte stream.
Ref()237 void Ref() { refs_.Ref(); }
// Drops one reference; when refs_.Unref() reports true the object deletes
// itself, so callers must not touch it after their last Unref().
Unref()238 void Unref() {
239 if (GPR_UNLIKELY(refs_.Unref())) {
240 delete this;
241 }
242 }
243
244 void PublishError(grpc_error_handle error);
245
246 grpc_error_handle Push(const grpc_slice& slice, grpc_slice* slice_out);
247
248 grpc_error_handle Finished(grpc_error_handle error, bool reset_on_error);
249
// Read-only accessor for remaining_bytes_.
remaining_bytes()250 uint32_t remaining_bytes() const { return remaining_bytes_; }
251
252 private:
// Static callbacks with the (void* arg, grpc_error_handle) closure signature.
253 static void NextLocked(void* arg, grpc_error_handle error_ignored);
254 static void OrphanLocked(void* arg, grpc_error_handle error_ignored);
255
256 void MaybeCreateStreamDecompressionCtx();
257
258 grpc_chttp2_transport* transport_; // Immutable.
259 grpc_chttp2_stream* stream_; // Immutable.
260
261 grpc_core::RefCount refs_;
262
263 /* Accessed only by transport thread when stream->pending_byte_stream == false
264 * Accessed only by application thread when stream->pending_byte_stream ==
265 * true */
266 uint32_t remaining_bytes_;
267
268 /* Accessed only by transport thread when stream->pending_byte_stream == false
269 * Accessed only by application thread when stream->pending_byte_stream ==
270 * true */
271 struct {
272 grpc_closure closure;
273 size_t max_size_hint;
274 grpc_closure* on_complete;
275 } next_action_;
276 grpc_closure destroy_action_;
277 };
278
279 } // namespace grpc_core
280
/* States of the keep-alive state machine; stored in
   grpc_chttp2_transport::keepalive_state. */
281 typedef enum {
282 GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
283 GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
284 GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
285 GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
286 } grpc_chttp2_keepalive_state;
287
/* Per-connection state of the chttp2 transport. The constructor and
   destructor are defined out of line. Members that carry no default
   initializer below are left for the constructor to set up.
   NOTE(review): the constructor is not visible from this header -- verify
   that it initializes every such member before relying on its value.
   Member order matters: `base` must stay first. */
288 struct grpc_chttp2_transport {
289 grpc_chttp2_transport(const grpc_channel_args* channel_args,
290 grpc_endpoint* ep, bool is_client,
291 grpc_resource_user* resource_user);
292 ~grpc_chttp2_transport();
293
294 grpc_transport base; /* must be first */
295 grpc_core::RefCount refs;
296 grpc_endpoint* ep;
297 std::string peer_string;
298
299 grpc_resource_user* resource_user;
300
301 grpc_core::Combiner* combiner;
302
303 grpc_closure* notify_on_receive_settings = nullptr;
304 grpc_closure* notify_on_close = nullptr;
305
306 /** write execution state of the transport */
307 grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
308
309 /** is the transport destroying itself? */
310 uint8_t destroying = false;
311 /** has the upper layer closed the transport? */
312 grpc_error_handle closed_with_error = GRPC_ERROR_NONE;
313
314 /** is there a read request to the endpoint outstanding? */
315 uint8_t endpoint_reading = 1;
316
317 /** various lists of streams */
318 grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
319
320 /** maps stream id to grpc_chttp2_stream objects */
321 grpc_chttp2_stream_map stream_map;
322
323 grpc_closure write_action_begin_locked;
324 grpc_closure write_action;
325 grpc_closure write_action_end_locked;
326
327 grpc_closure read_action_locked;
328
329 /** incoming read bytes */
330 grpc_slice_buffer read_buffer;
331
332 /** address to place a newly accepted stream - set and unset by
333 grpc_chttp2_parsing_accept_stream; used by init_stream to
334 publish the accepted server stream */
335 grpc_chttp2_stream** accepting_stream = nullptr;
336
337 /* accept stream callback */
338 void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
339 const void* server_data);
340 void* accept_stream_cb_user_data;
341
342 /** connectivity tracking */
343 grpc_core::ConnectivityStateTracker state_tracker;
344
345 /** data to write now */
346 grpc_slice_buffer outbuf;
347 /** hpack encoding */
348 grpc_core::HPackCompressor hpack_compressor;
349 /** is this a client? */
350 bool is_client;
351
352 /** data to write next write */
353 grpc_slice_buffer qbuf;
354
355 /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
356 */
357 uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
358
359 /** Set to a grpc_error object if a goaway frame is received. By default, set
360 * to GRPC_ERROR_NONE */
361 grpc_error_handle goaway_error = GRPC_ERROR_NONE;
362
363 grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
364
365 /** are the local settings dirty and need to be sent? */
366 bool dirtied_local_settings = true;
367 /** have local settings been sent? */
368 bool sent_local_settings = false;
369 /** bitmask of setting indexes to send out
370 Hack: it's common for implementations to assume 65536 bytes initial send
371 window -- this should by rights be 0 */
372 uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
373 /** settings values */
/* indexed as settings[grpc_chttp2_setting_set][setting id] */
374 uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
375
376 /** what is the next stream id to be allocated by this peer?
377 copied to next_stream_id in parsing when parsing commences */
378 uint32_t next_stream_id = 0;
379
380 /** last new stream id */
381 uint32_t last_new_stream_id = 0;
382
383 /** ping queues for various ping insertion points */
384 grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue();
385 grpc_chttp2_repeated_ping_policy ping_policy;
386 grpc_chttp2_repeated_ping_state ping_state;
387 uint64_t ping_ctr = 0; /* unique id for pings */
388 grpc_closure retry_initiate_ping_locked;
389
390 /** ping acks */
391 size_t ping_ack_count = 0;
392 size_t ping_ack_capacity = 0;
393 uint64_t* ping_acks = nullptr;
394 grpc_chttp2_server_ping_recv_state ping_recv_state;
395
396 /** parser for headers */
397 grpc_core::HPackParser hpack_parser;
398 /** simple one shot parsers */
399 union {
400 grpc_chttp2_window_update_parser window_update;
401 grpc_chttp2_settings_parser settings;
402 grpc_chttp2_ping_parser ping;
403 grpc_chttp2_rst_stream_parser rst_stream;
404 } simple;
405 /** parser for goaway frames */
406 grpc_chttp2_goaway_parser goaway_parser;
407
/* Transport-level flow control object; the storage can hold either
   TransportFlowControl or TransportFlowControlDisabled and is accessed
   through the TransportFlowControlBase interface. */
408 grpc_core::PolymorphicManualConstructor<
409 grpc_core::chttp2::TransportFlowControlBase,
410 grpc_core::chttp2::TransportFlowControl,
411 grpc_core::chttp2::TransportFlowControlDisabled>
412 flow_control;
413 /** initial window change. This is tracked as we parse settings frames from
414 * the remote peer. If there is a positive delta, then we will make all
415 * streams readable since they may have become unstalled */
416 int64_t initial_window_update = 0;
417
418 /* deframing */
419 grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0;
420 uint8_t incoming_frame_type = 0;
421 uint8_t incoming_frame_flags = 0;
422 uint8_t header_eof = 0;
423 bool is_first_frame = true;
424 uint32_t expect_continuation_stream_id = 0;
425 uint32_t incoming_frame_size = 0;
426 uint32_t incoming_stream_id = 0;
427
428 /* active parser */
429 void* parser_data = nullptr;
430 grpc_chttp2_stream* incoming_stream = nullptr;
431 grpc_error_handle (*parser)(void* parser_user_data, grpc_chttp2_transport* t,
432 grpc_chttp2_stream* s, const grpc_slice& slice,
433 int is_last);
434
435 grpc_chttp2_write_cb* write_cb_pool = nullptr;
436
437 /* bdp estimator */
438 bool bdp_ping_blocked =
439 false; /* Is the BDP blocked due to not receiving any data? */
440 grpc_closure next_bdp_ping_timer_expired_locked;
441 grpc_closure start_bdp_ping_locked;
442 grpc_closure finish_bdp_ping_locked;
443
444 /* if non-NULL, close the transport with this error when writes are finished
445 */
446 grpc_error_handle close_transport_on_writes_finished = GRPC_ERROR_NONE;
447
448 /* a list of closures to run after writes are finished */
449 grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;
450
451 /* buffer pool state */
452 /** have we scheduled a benign cleanup? */
453 bool benign_reclaimer_registered = false;
454 /** have we scheduled a destructive cleanup? */
455 bool destructive_reclaimer_registered = false;
456 /** benign cleanup closure */
457 grpc_closure benign_reclaimer_locked;
458 /** destructive cleanup closure */
459 grpc_closure destructive_reclaimer_locked;
460
461 /* next bdp ping timer */
462 bool have_next_bdp_ping_timer = false;
463 /** If start_bdp_ping_locked has been called */
464 bool bdp_ping_started = false;
465 grpc_timer next_bdp_ping_timer;
466
467 /* keep-alive ping support */
468 /** Closure to initialize a keepalive ping */
469 grpc_closure init_keepalive_ping_locked;
470 /** Closure to run when the keepalive ping is sent */
471 grpc_closure start_keepalive_ping_locked;
472 /** Closure to run when the keepalive ping ack is received */
473 grpc_closure finish_keepalive_ping_locked;
474 /** Closure to run when the keepalive ping times out */
475 grpc_closure keepalive_watchdog_fired_locked;
476 /** timer to initiate ping events */
477 grpc_timer keepalive_ping_timer;
478 /** watchdog to kill the transport when waiting for the keepalive ping */
479 grpc_timer keepalive_watchdog_timer;
480 /** time duration in between pings */
481 grpc_millis keepalive_time;
482 /** grace period for a ping to complete before watchdog kicks in */
483 grpc_millis keepalive_timeout;
484 /** if keepalive pings are allowed when there's no outstanding streams */
485 bool keepalive_permit_without_calls = false;
486 /** If start_keepalive_ping_locked has been called */
487 bool keepalive_ping_started = false;
488 /** keep-alive state machine state */
489 grpc_chttp2_keepalive_state keepalive_state;
490 grpc_core::ContextList* cl = nullptr;
491 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
492 uint32_t num_messages_in_next_write = 0;
493 /** The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
494 * RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond
495 * DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would
496 * only continue reading when we are able to write to the socket again,
497 * thereby reducing the number of induced frames. */
498 uint32_t num_pending_induced_frames = 0;
499 bool reading_paused_on_pending_induced_frames = false;
500 };
501
/* Records how a stream's metadata was published; each stream keeps two of
   these in grpc_chttp2_stream::published_metadata, value-initialized to
   GRPC_METADATA_NOT_PUBLISHED (the zero enumerator).
   NOTE(review): the two slots presumably correspond to initial and trailing
   metadata -- confirm against the users of published_metadata. */
502 typedef enum {
503 GRPC_METADATA_NOT_PUBLISHED,
504 GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
505 GRPC_METADATA_PUBLISHED_FROM_WIRE,
506 GRPC_METADATA_PUBLISHED_AT_CLOSE
507 } grpc_published_metadata_method;
508
/* Per-stream state of the chttp2 transport. The constructor and destructor
   are defined out of line. Members that carry no default initializer below
   (for example context, next_message_end_offset, uncompressed_data_size and
   the two stream (de)compression context pointers) are left for the
   constructor to set up.
   NOTE(review): the constructor is not visible from this header -- verify
   that it initializes every such member before relying on its value.
   Member order matters: `reffer` must directly follow `t` and `refcount`. */
509 struct grpc_chttp2_stream {
510 grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
511 const void* server_data, grpc_core::Arena* arena);
512 ~grpc_chttp2_stream();
513
514 void* context;
515 grpc_chttp2_transport* t;
516 grpc_stream_refcount* refcount;
517 // Reffer is a 0-len structure, simply reffing `t` and `refcount` in its ctor
518 // before initializing the rest of the stream, to avoid cache misses. This
519 // field MUST be right after `t` and `refcount`.
520 struct Reffer {
521 explicit Reffer(grpc_chttp2_stream* s);
522 } reffer;
523
524 grpc_closure destroy_stream;
525 grpc_closure* destroy_stream_arg;
526
/* Per-list linkage and membership flags, indexed by
   grpc_chttp2_stream_list_id. */
527 grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
528 uint8_t included[STREAM_LIST_COUNT] = {};
529
530 /** HTTP2 stream id for this stream, or zero if one has not been assigned */
531 uint32_t id = 0;
532
533 /** things the upper layers would like to send */
534 grpc_metadata_batch* send_initial_metadata = nullptr;
535 grpc_closure* send_initial_metadata_finished = nullptr;
536 grpc_metadata_batch* send_trailing_metadata = nullptr;
537 // TODO(yashykt): Find a better name for the below field and others in this
538 // struct to better distinguish inputs, return values, and
539 // internal state.
540 // sent_trailing_metadata_op allows the transport to fill in to the upper
541 // layer whether this stream was able to send its trailing metadata (used for
542 // detecting cancellation on the server-side).
543 bool* sent_trailing_metadata_op = nullptr;
544 grpc_closure* send_trailing_metadata_finished = nullptr;
545
546 grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message;
547 uint32_t fetched_send_message_length = 0;
548 grpc_slice fetching_slice = grpc_empty_slice();
549 int64_t next_message_end_offset;
550 int64_t flow_controlled_bytes_written = 0;
551 int64_t flow_controlled_bytes_flowed = 0;
552 grpc_closure complete_fetch_locked;
553 grpc_closure* fetching_send_message_finished = nullptr;
554
555 grpc_metadata_batch* recv_initial_metadata;
556 grpc_closure* recv_initial_metadata_ready = nullptr;
557 bool* trailing_metadata_available = nullptr;
558 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message = nullptr;
559 bool* call_failed_before_recv_message = nullptr;
560 grpc_closure* recv_message_ready = nullptr;
561 grpc_metadata_batch* recv_trailing_metadata;
562 grpc_closure* recv_trailing_metadata_finished = nullptr;
563
564 grpc_transport_stream_stats* collecting_stats = nullptr;
565 grpc_transport_stream_stats stats = grpc_transport_stream_stats();
566
567 /** Is this stream closed for writing. */
568 bool write_closed = false;
569 /** Is this stream reading half-closed. */
570 bool read_closed = false;
571 /** Are all published incoming byte streams closed. */
572 bool all_incoming_byte_streams_finished = false;
573 /** Has this stream seen an error.
574 If true, then pending incoming frames can be thrown away. */
575 bool seen_error = false;
576 /** Are we buffering writes on this stream? If yes, we won't become writable
577 until there's enough queued up in the flow_controlled_buffer */
578 bool write_buffering = false;
579
580 /* have we sent or received the EOS bit? */
581 bool eos_received = false;
582 bool eos_sent = false;
583
584 /** the error that resulted in this stream being read-closed */
585 grpc_error_handle read_closed_error = GRPC_ERROR_NONE;
586 /** the error that resulted in this stream being write-closed */
587 grpc_error_handle write_closed_error = GRPC_ERROR_NONE;
588
589 grpc_published_metadata_method published_metadata[2] = {};
590 bool final_metadata_requested = false;
591
592 grpc_metadata_batch initial_metadata_buffer;
593 grpc_metadata_batch trailing_metadata_buffer;
594
595 grpc_slice_buffer frame_storage; /* protected by t combiner */
596
597 grpc_closure* on_next = nullptr; /* protected by t combiner */
598 bool pending_byte_stream = false; /* protected by t combiner */
599 // cached length of buffer to be used by the transport thread in cases where
600 // stream->pending_byte_stream == true. The value is saved before
601 // application threads are allowed to modify
602 // unprocessed_incoming_frames_buffer
603 size_t unprocessed_incoming_frames_buffer_cached_length = 0;
604 /* Accessed only by transport thread when stream->pending_byte_stream == false
605 * Accessed only by application thread when stream->pending_byte_stream ==
606 * true */
607 grpc_slice_buffer unprocessed_incoming_frames_buffer;
608 grpc_closure reset_byte_stream;
609 grpc_error_handle byte_stream_error =
610 GRPC_ERROR_NONE; /* protected by t combiner */
611 bool received_last_frame = false; /* protected by t combiner */
612
613 grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
614
615 /** saw some stream level error */
616 grpc_error_handle forced_close_error = GRPC_ERROR_NONE;
617 /** how many header frames have we received? */
618 uint8_t header_frames_received = 0;
619 /** parsing state for data frames */
620 /* Accessed only by transport thread when stream->pending_byte_stream == false
621 * Accessed only by application thread when stream->pending_byte_stream ==
622 * true */
623 grpc_chttp2_data_parser data_parser;
624 /** number of bytes received - reset at end of parse thread execution */
625 int64_t received_bytes = 0;
626
627 bool sent_initial_metadata = false;
628 bool sent_trailing_metadata = false;
629
/* Stream-level flow control object; the storage can hold either
   StreamFlowControl or StreamFlowControlDisabled and is accessed through the
   StreamFlowControlBase interface. */
630 grpc_core::PolymorphicManualConstructor<
631 grpc_core::chttp2::StreamFlowControlBase,
632 grpc_core::chttp2::StreamFlowControl,
633 grpc_core::chttp2::StreamFlowControlDisabled>
634 flow_control;
635
636 grpc_slice_buffer flow_controlled_buffer;
637
638 grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr;
639 grpc_chttp2_write_cb* on_write_finished_cbs = nullptr;
640 grpc_chttp2_write_cb* finish_after_write = nullptr;
641 size_t sending_bytes = 0;
642
643 /* Stream compression method to be used. */
644 grpc_stream_compression_method stream_compression_method =
645 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
646 /* Stream decompression method to be used. */
647 grpc_stream_compression_method stream_decompression_method =
648 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS;
649
650 /** Whether bytes stored in unprocessed_incoming_frames_buffer is decompressed
651 */
652 bool unprocessed_incoming_frames_decompressed = false;
653 /** Whether the bytes needs to be traced using Fathom */
654 bool traced = false;
655 /** gRPC header bytes that are already decompressed */
656 size_t decompressed_header_bytes = 0;
657 /** Byte counter for number of bytes written */
658 size_t byte_counter = 0;
659
660 /** Amount of uncompressed bytes sent out when compressed_data_buffer is
661 * emptied */
662 size_t uncompressed_data_size;
663 /** Stream compression compress context */
664 grpc_stream_compression_context* stream_compression_ctx;
665 /** Buffer storing data that is compressed but not sent */
666 grpc_slice_buffer compressed_data_buffer;
667
668 /** Stream compression decompress context */
669 grpc_stream_compression_context* stream_decompression_ctx;
670 /** Temporary buffer storing decompressed data.
671 * Initialized, used, and destroyed only when stream uses (non-identity)
672 * compression.
673 */
674 grpc_slice_buffer decompressed_data_buffer;
675 };
676
677 /** Transport writing call flow:
678 grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
679 go out on the wire.
680 If no other write has been started, a task is enqueued onto our workqueue.
681 When that task executes, it obtains the global lock, and gathers the data
682 to write.
683 The global lock is dropped and we do the syscall to write.
684 After writing, a follow-up check is made to see if another round of writing
685 should be performed.
686
687 The actual call chain is documented in the implementation of this function.
688 */
/* `t` is the transport to write on; `reason` records why the write was
   requested (see grpc_chttp2_initiate_write_reason). */
689 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
690 grpc_chttp2_initiate_write_reason reason);
691
/* Result returned by value from grpc_chttp2_begin_write(). */
692 struct grpc_chttp2_begin_write_result {
693 /** are we writing? */
694 bool writing;
695 /** if writing: was it a complete flush (false) or a partial flush (true) */
696 bool partial;
697 /** did we queue any completions as part of beginning the write */
698 bool early_results_scheduled;
699 };
/* Begins a write on `t` and reports what was started.
   NOTE(review): presumably paired with grpc_chttp2_end_write(), which takes
   the error result of the write -- confirm in the implementation. */
700 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
701 grpc_chttp2_transport* t);
702 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error);
703
704 /** Process one slice of incoming data and return a grpc_error_handle.
705 NOTE(review): this comment used to describe a 1/0 "connection still viable" result, which does not match the declared return type; presumably GRPC_ERROR_NONE means the connection is still viable and any other value means it should be torn down -- confirm against the implementation. */
706 grpc_error_handle grpc_chttp2_perform_read(grpc_chttp2_transport* t,
707 const grpc_slice& slice);
708
/* Stream-list operations, one family per list. add_* inserts stream `s`,
   pop_* hands a stream back through `*s`, remove_* takes a specific stream
   out of the list.
   NOTE(review): the meaning of the bool results of the add_ and remove_
   functions is not stated in this header -- presumably "the list membership
   changed"; confirm in the implementation. */
709 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
710 grpc_chttp2_stream* s);
711 /** Get a writable stream
712 returns true if there was a stream available */
713 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
714 grpc_chttp2_stream** s);
715 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
716 grpc_chttp2_stream* s);
717
718 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
719 grpc_chttp2_stream* s);
720 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t);
721 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
722 grpc_chttp2_stream** s);
723
/* NOTE(review): grpc_chttp2_stream_list_id, as declared in this header, has
   no "written" list, so these two declarations may be stale -- confirm that
   they are still defined and used. */
724 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t,
725 grpc_chttp2_stream* s);
726 bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t,
727 grpc_chttp2_stream** s);
728
729 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
730 grpc_chttp2_stream* s);
731 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
732 grpc_chttp2_stream** s);
733 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
734 grpc_chttp2_stream* s);
735
736 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
737 grpc_chttp2_stream* s);
738 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
739 grpc_chttp2_stream** s);
740 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
741 grpc_chttp2_stream* s);
742
743 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
744 grpc_chttp2_stream* s);
745 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
746 grpc_chttp2_stream** s);
747 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
748 grpc_chttp2_stream* s);
749
750 /********* Flow Control ***************/
751
752 // Takes in a flow control action and performs all the needed operations.
// `action` is the flow control decision to apply; `t` and `s` are the
// transport and stream it applies to.
// NOTE(review): whether `s` may be null for transport-only actions is not
// visible from this header -- confirm before passing nullptr.
753 void grpc_chttp2_act_on_flowctl_action(
754 const grpc_core::chttp2::FlowControlAction& action,
755 grpc_chttp2_transport* t, grpc_chttp2_stream* s);
756
757 /********* End of Flow Control ***************/
758
grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport * t,uint32_t id)759 inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(
760 grpc_chttp2_transport* t, uint32_t id) {
761 return static_cast<grpc_chttp2_stream*>(
762 grpc_chttp2_stream_map_find(&t->stream_map, id));
763 }
/* Accepts a new incoming stream with HTTP/2 id `id` on transport `t` and
   returns it. The transport's accepting_stream member is documented as being
   set and unset by this function.
   NOTE(review): failure behaviour (e.g. a null return) is not visible from
   this header -- confirm. */
764 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
765 uint32_t id);
766
/* Records a GOAWAY received from the peer: its error code, the last stream id
   it names and its debug text. */
767 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
768 uint32_t goaway_error,
769 uint32_t last_stream_id,
770 absl::string_view goaway_text);
771
/* NOTE(review): presumably switches the transport's active parser to one that
   discards the remainder of the current frame -- confirm. */
772 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);
773
/* Completes one step of the closure referenced by `*pclosure` on stream `s`
   with result `error`; `desc` is a descriptive label.
   NOTE(review): `pclosure` is passed by address, presumably so the callee can
   clear it once the closure is finished -- confirm. */
774 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
775 grpc_chttp2_stream* s,
776 grpc_closure** pclosure,
777 grpc_error_handle error,
778 const char* desc);
779
/* Size constant for the gRPC message header. */
780 #define GRPC_HEADER_SIZE_IN_BYTES 5
/* All-ones size_t, i.e. the largest value size_t can hold. */
781 #define MAX_SIZE_T (~(size_t)0)
782
/* Client connection preface and its length excluding the terminating NUL
   (24 bytes); the GRPC_DTS_CLIENT_PREFIX_* deframer states have one entry per
   byte of this string. */
783 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
784 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
785 (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
786
/* NOTE(review): the two trace-flag declarations below are commented out in
   this copy of the header, yet GRPC_CHTTP2_IF_TRACING expands to a use of
   grpc_http_trace. Any translation unit using the macro therefore needs a
   declaration of grpc_http_trace from somewhere else -- confirm where it
   comes from. */
787 // extern grpc_core::TraceFlag grpc_http_trace;
788 // extern grpc_core::TraceFlag grpc_flowctl_trace;
789
/* Evaluates `stmt` only when the grpc_http_trace flag is enabled. Wrapped in
   do { } while (0) so it behaves as a single statement. */
790 #define GRPC_CHTTP2_IF_TRACING(stmt) \
791 do { \
792 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { \
793 (stmt); \
794 } \
795 } while (0)
796
/* NOTE(review): presumably synthesizes a status for `stream` from `error`
   when no real status arrived from the peer -- confirm. */
797 void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
798 grpc_chttp2_stream* stream,
799 grpc_error_handle error);
/* Marks stream `s` closed for reading and/or writing; `close_reads` and
   `close_writes` are int flags selecting the directions and `error` is the
   reason for closing. */
800 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
801 grpc_chttp2_stream* s, int close_reads,
802 int close_writes, grpc_error_handle error);
803 void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
804
/* Stream ref/unref entry points. Callers always use the two-argument macros.
   When NDEBUG is not defined the `reason` string is forwarded to the
   functions; when NDEBUG is defined the macros drop `reason` and the
   functions take only the stream. */
805 #ifndef NDEBUG
806 #define GRPC_CHTTP2_STREAM_REF(stream, reason) \
807 grpc_chttp2_stream_ref(stream, reason)
808 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
809 grpc_chttp2_stream_unref(stream, reason)
810 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
811 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
812 #else
813 #define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
814 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
815 grpc_chttp2_stream_unref(stream)
816 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
817 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
818 #endif
819
820 #ifndef NDEBUG
821 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
822 grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
823 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \
824 grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__)
grpc_chttp2_unref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)825 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t,
826 const char* reason, const char* file,
827 int line) {
828 if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
829 delete t;
830 }
831 }
grpc_chttp2_ref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)832 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t,
833 const char* reason, const char* file,
834 int line) {
835 t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
836 }
837 #else
838 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
839 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t)
grpc_chttp2_unref_transport(grpc_chttp2_transport * t)840 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
841 if (t->refs.Unref()) {
842 delete t;
843 }
844 }
grpc_chttp2_ref_transport(grpc_chttp2_transport * t)845 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) {
846 t->refs.Ref();
847 }
848 #endif
849
/* Handles the acknowledgement of the ping identified by `id` on transport
   `t`. */
850 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
851
852 /** Add a new ping strike to ping_recv_state.ping_strikes. If
853 ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
854 with error code ENHANCE_YOUR_CALM and additional debug data resembling
855 "too_many_pings" followed by immediately closing the connection. */
856 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t);
857
858 /** Resets ping clock. Should be called when flushing window updates,
859 * initial/trailing metadata or data frames. For a server, it resets the number
860 * of ping strikes and the last_ping_recv_time. For a ping sender, it resets
861 * pings_before_data_required. */
862 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t);
863
864 /** add a ref to the stream and add it to the writable list;
865 ref will be dropped in writing.c */
866 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
867 grpc_chttp2_stream* s);
868
/* Cancels stream `s` because of `due_to_error`.
   NOTE(review): ownership of `due_to_error` (whether the callee consumes the
   reference) is not visible from this header -- confirm. */
869 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
870 grpc_error_handle due_to_error);
871
/* "maybe_complete" helpers, one per receive operation of stream `s`.
   NOTE(review): presumably each one runs the matching recv_*_ready /
   recv_*_finished closure once its data is available -- confirm. */
872 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
873 grpc_chttp2_stream* s);
874 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
875 grpc_chttp2_stream* s);
876 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
877 grpc_chttp2_stream* s);
878
/* Fails the pending writes of stream `s` with `error`. */
879 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
880 grpc_chttp2_stream* s,
881 grpc_error_handle error);
882
883 /** Set the default keepalive configurations, must only be called at
884 initialization */
885 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
886 bool is_client);
887
/* Callback with the (void* arg, grpc_error_handle) closure signature.
   NOTE(review): `tp` is presumably the grpc_chttp2_transport* -- confirm. */
888 void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error_handle error);
889
/* NOTE(review): unlike every other function declared in this header, this one
   has no grpc_chttp2_ prefix although it has external linkage; consider
   renaming it together with its definition and callers. */
890 void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
891
892 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
893