1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <string>
24
25 #include "absl/strings/str_cat.h"
26 #include "absl/strings/str_format.h"
27
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31
32 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
33 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
34 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
35 #include "src/core/ext/transport/cronet/transport/cronet_status.h"
36 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
37 #include "src/core/lib/debug/trace.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gprpp/manual_constructor.h"
40 #include "src/core/lib/iomgr/endpoint.h"
41 #include "src/core/lib/iomgr/exec_ctx.h"
42 #include "src/core/lib/slice/slice_internal.h"
43 #include "src/core/lib/slice/slice_string_helpers.h"
44 #include "src/core/lib/surface/channel.h"
45 #include "src/core/lib/surface/validate_metadata.h"
46 #include "src/core/lib/transport/metadata_batch.h"
47 #include "src/core/lib/transport/static_metadata.h"
48 #include "src/core/lib/transport/timeout_encoding.h"
49 #include "src/core/lib/transport/transport_impl.h"
50
51 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
52
53 #define GRPC_HEADER_SIZE_IN_BYTES 5
54 #define GRPC_FLUSH_READ_SIZE 4096
55
/* Trace flag registered under the name "cronet"; it is constructed with
   `false` (presumably "disabled by default" -- confirm against TraceFlag). */
grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
/* Forwards its arguments to gpr_log() only while grpc_cronet_trace is enabled.
   The do/while(0) wrapper makes the macro usable as a single statement. */
#define CRONET_LOG(...)                                    \
  do {                                                     \
    if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
  } while (0)
61
/* Outcome of one execute_stream_op() call, as consumed by
   execute_from_storage(). */
enum e_op_result {
  /* An action was started; execute_from_storage() stops and waits for the
     Cronet callback. */
  ACTION_TAKEN_WITH_CALLBACK,
  /* An action was taken and no callback is pending; the same op is processed
     again. */
  ACTION_TAKEN_NO_CALLBACK,
  /* Nothing could be done for this op; processing moves to the next op. */
  NO_ACTION_POSSIBLE
};
67
/* Identifiers used to index op_state::state_op_done[] and
   op_state::state_callback_received[]. Values start at 0 and OP_NUM_OPS is the
   array size, so new entries must be added before OP_NUM_OPS. */
enum e_op_id {
  OP_SEND_INITIAL_METADATA = 0,
  OP_SEND_MESSAGE,
  OP_SEND_TRAILING_METADATA,
  OP_RECV_MESSAGE,
  OP_RECV_INITIAL_METADATA,
  OP_RECV_TRAILING_METADATA,
  OP_CANCEL_ERROR,
  OP_ON_COMPLETE,
  /* Terminal Cronet callbacks (on_failed / on_succeeded / on_canceled). */
  OP_FAILED,
  OP_SUCCEEDED,
  OP_CANCELED,
  OP_RECV_MESSAGE_AND_ON_COMPLETE,
  /* Marked done by on_read_completed() when it issues a follow-up read. */
  OP_READ_REQ_MADE,
  /* Number of ids above; not an op itself. */
  OP_NUM_OPS
};
84
85 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
86
/* Forward declarations of the callbacks defined later in this file. */
static void on_stream_ready(bidirectional_stream*);
static void on_response_headers_received(
    bidirectional_stream*, const bidirectional_stream_header_array*,
    const char*);
static void on_write_completed(bidirectional_stream*, const char*);
static void on_read_completed(bidirectional_stream*, char*, int);
static void on_response_trailers_received(
    bidirectional_stream*, const bidirectional_stream_header_array*);
static void on_succeeded(bidirectional_stream*);
static void on_failed(bidirectional_stream*, int);
static void on_canceled(bidirectional_stream*);
/* Callback table handed to Cronet. The initializers are positional, so their
   order must match the member order of bidirectional_stream_callback (note
   that on_read_completed precedes on_write_completed here). */
static bidirectional_stream_callback cronet_callbacks = {
    on_stream_ready,
    on_response_headers_received,
    on_read_completed,
    on_write_completed,
    on_response_trailers_received,
    on_succeeded,
    on_failed,
    on_canceled};
107
108 /* Cronet transport object */
struct grpc_cronet_transport {
  grpc_transport base; /* must be first element in this structure */
  /* Cronet engine handle (presumably the engine streams are created on --
     its use is outside this part of the file). */
  stream_engine* engine;
  /* Host name (presumably the one used to build request URLs -- confirm at
     the call site of convert_metadata_to_cronet_headers). */
  char* host;
  /* When true, writes are explicitly flushed with bidirectional_stream_flush()
     (see on_stream_ready and on_response_trailers_received). */
  bool use_packet_coalescing;
};
typedef struct grpc_cronet_transport grpc_cronet_transport;
116
117 /* TODO (makdharma): reorder structure for memory efficiency per
118 http://www.catb.org/esr/structure-packing/#_structure_reordering: */
119 struct read_state {
read_stateread_state120 read_state(grpc_core::Arena* arena)
121 : trailing_metadata(arena), initial_metadata(arena) {
122 grpc_slice_buffer_init(&read_slice_buffer);
123 }
124
125 /* vars to store data coming from server */
126 char* read_buffer = nullptr;
127 bool length_field_received = false;
128 int received_bytes = 0;
129 int remaining_bytes = 0;
130 int length_field = 0;
131 bool compressed = false;
132 char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
133 char* payload_field = nullptr;
134 bool read_stream_closed = false;
135
136 /* vars for holding data destined for the application */
137 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
138 grpc_slice_buffer read_slice_buffer;
139
140 /* vars for trailing metadata */
141 grpc_chttp2_incoming_metadata_buffer trailing_metadata;
142 bool trailing_metadata_valid = false;
143
144 /* vars for initial metadata */
145 grpc_chttp2_incoming_metadata_buffer initial_metadata;
146 };
147
/* State for data going to the server. */
struct write_state {
  /* Frame currently handed to Cronet. It is freed (and reset to nullptr) by
     on_write_completed(), on_failed() and on_canceled(). */
  char* write_buffer = nullptr;
};
151
152 /* track state of one stream op */
153 struct op_state {
op_stateop_state154 op_state(grpc_core::Arena* arena) : rs(arena) {}
155
156 bool state_op_done[OP_NUM_OPS] = {};
157 bool state_callback_received[OP_NUM_OPS] = {};
158 /* A non-zero gRPC status code has been seen */
159 bool fail_state = false;
160 /* Transport is discarding all buffered messages */
161 bool flush_read = false;
162 bool flush_cronet_when_ready = false;
163 bool pending_write_for_trailer = false;
164 bool pending_send_message = false;
165 /* User requested RECV_TRAILING_METADATA */
166 bool pending_recv_trailing_metadata = false;
167 cronet_net_error_code net_error = OK;
168 grpc_error* cancel_error = GRPC_ERROR_NONE;
169 /* data structure for storing data coming from server */
170 struct read_state rs;
171 /* data structure for storing data going to the server */
172 struct write_state ws;
173 };
174
175 struct stream_obj;
176
/* One queued stream op batch together with its per-op state; a node of the
   singly linked list rooted at op_storage::head. */
struct op_and_state {
  /* Copies |op| and ties the node to stream |s| (defined below). */
  op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);

  /* Copy of the batch that was submitted. */
  grpc_transport_stream_op_batch op;
  struct op_state state;
  /* Set once the batch is finished; execute_from_storage() then removes and
     deletes the node. */
  bool done = false;
  struct stream_obj* s; /* Pointer back to the stream object */
  /* next op_and_state in the linked list */
  struct op_and_state* next = nullptr;
};
187
/* Pending-op list of a stream. New ops are pushed at the head by
   add_to_storage() and unlinked by remove_from_storage(). */
struct op_storage {
  /* Number of nodes currently linked from |head|. */
  int num_pending_ops = 0;
  struct op_and_state* head = nullptr;
};
192
/* Per-stream object. Cronet callbacks recover it from stream->annotation. */
struct stream_obj {
  stream_obj(grpc_transport* gt, grpc_stream* gs,
             grpc_stream_refcount* refcount, grpc_core::Arena* arena);
  ~stream_obj();

  /* Arena passed to the op_state of every queued op. */
  grpc_core::Arena* arena;
  struct op_and_state* oas = nullptr;
  grpc_transport_stream_op_batch* curr_op = nullptr;
  /* Transport this stream belongs to. */
  grpc_cronet_transport* curr_ct;
  grpc_stream* curr_gs;
  /* Underlying Cronet stream; reset to nullptr after it is destroyed in the
     terminal callbacks (on_failed / on_canceled / on_succeeded). */
  bidirectional_stream* cbs = nullptr;
  /* Request headers handed to Cronet; the headers array is freed in
     on_stream_ready(), on_failed() or on_canceled(). */
  bidirectional_stream_header_array header_array =
      bidirectional_stream_header_array();  // Zero-initialize the structure.

  /* Stream level state. Some state will be tracked both at stream and stream_op
   * level */
  struct op_state state;

  /* OP storage */
  struct op_storage storage;

  /* Mutex to protect storage */
  /* The Cronet callbacks also hold it while they update |state|. */
  gpr_mu mu;

  /* Refcount object of the stream */
  grpc_stream_refcount* refcount;
};
220
/* Stream ref/unref helpers. When NDEBUG is not defined the |reason| string is
   passed through to grpc_stream_ref()/grpc_stream_unref(); otherwise the
   macros drop it and the one-argument overloads are used. */
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
  grpc_cronet_stream_ref((stream), (reason))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream), (reason))
void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
  grpc_stream_ref(s->refcount, reason);
}
void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
  grpc_stream_unref(s->refcount, reason);
}
#else
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream))
void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
#endif
239
240 static enum e_op_result execute_stream_op(struct op_and_state* oas);
241
242 /*
243 Utility function to translate enum into string for printing
244 */
op_result_string(enum e_op_result i)245 static const char* op_result_string(enum e_op_result i) {
246 switch (i) {
247 case ACTION_TAKEN_WITH_CALLBACK:
248 return "ACTION_TAKEN_WITH_CALLBACK";
249 case ACTION_TAKEN_NO_CALLBACK:
250 return "ACTION_TAKEN_NO_CALLBACK";
251 case NO_ACTION_POSSIBLE:
252 return "NO_ACTION_POSSIBLE";
253 }
254 GPR_UNREACHABLE_CODE(return "UNKNOWN");
255 }
256
op_id_string(enum e_op_id i)257 static const char* op_id_string(enum e_op_id i) {
258 switch (i) {
259 case OP_SEND_INITIAL_METADATA:
260 return "OP_SEND_INITIAL_METADATA";
261 case OP_SEND_MESSAGE:
262 return "OP_SEND_MESSAGE";
263 case OP_SEND_TRAILING_METADATA:
264 return "OP_SEND_TRAILING_METADATA";
265 case OP_RECV_MESSAGE:
266 return "OP_RECV_MESSAGE";
267 case OP_RECV_INITIAL_METADATA:
268 return "OP_RECV_INITIAL_METADATA";
269 case OP_RECV_TRAILING_METADATA:
270 return "OP_RECV_TRAILING_METADATA";
271 case OP_CANCEL_ERROR:
272 return "OP_CANCEL_ERROR";
273 case OP_ON_COMPLETE:
274 return "OP_ON_COMPLETE";
275 case OP_FAILED:
276 return "OP_FAILED";
277 case OP_SUCCEEDED:
278 return "OP_SUCCEEDED";
279 case OP_CANCELED:
280 return "OP_CANCELED";
281 case OP_RECV_MESSAGE_AND_ON_COMPLETE:
282 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
283 case OP_READ_REQ_MADE:
284 return "OP_READ_REQ_MADE";
285 case OP_NUM_OPS:
286 return "OP_NUM_OPS";
287 }
288 return "UNKNOWN";
289 }
290
null_and_maybe_free_read_buffer(stream_obj * s)291 static void null_and_maybe_free_read_buffer(stream_obj* s) {
292 if (s->state.rs.read_buffer &&
293 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
294 gpr_free(s->state.rs.read_buffer);
295 }
296 s->state.rs.read_buffer = nullptr;
297 }
298
read_grpc_header(stream_obj * s)299 static void read_grpc_header(stream_obj* s) {
300 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
301 s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
302 s->state.rs.received_bytes = 0;
303 s->state.rs.compressed = false;
304 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
305 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
306 s->state.rs.remaining_bytes);
307 }
308
make_error_with_desc(int error_code,int cronet_internal_error_code,const char * desc)309 static grpc_error* make_error_with_desc(int error_code,
310 int cronet_internal_error_code,
311 const char* desc) {
312 std::string error_message =
313 absl::StrFormat("Cronet error code:%d, Cronet error detail:%s",
314 cronet_internal_error_code, desc);
315 grpc_error* error =
316 GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_message.c_str());
317 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
318 return error;
319 }
320
/* Copies |op| into the node, builds the per-op state on the stream's arena
   and stores the back pointer to |s|. */
inline op_and_state::op_and_state(stream_obj* s,
                                  const grpc_transport_stream_op_batch& op)
    : op(op), state(s->arena), s(s) {}
324
325 /*
326 Add a new stream op to op storage.
327 */
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)328 static void add_to_storage(struct stream_obj* s,
329 grpc_transport_stream_op_batch* op) {
330 struct op_storage* storage = &s->storage;
331 /* add new op at the beginning of the linked list. The memory is freed
332 in remove_from_storage */
333 op_and_state* new_op = new op_and_state(s, *op);
334 gpr_mu_lock(&s->mu);
335 new_op->next = storage->head;
336 storage->head = new_op;
337 storage->num_pending_ops++;
338 if (op->send_message) {
339 s->state.pending_send_message = true;
340 }
341 if (op->recv_trailing_metadata) {
342 s->state.pending_recv_trailing_metadata = true;
343 }
344 CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
345 storage->num_pending_ops);
346 gpr_mu_unlock(&s->mu);
347 }
348
349 /*
350 Traverse the linked list and delete op and free memory
351 */
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)352 static void remove_from_storage(struct stream_obj* s,
353 struct op_and_state* oas) {
354 struct op_and_state* curr;
355 if (s->storage.head == nullptr || oas == nullptr) {
356 return;
357 }
358 if (s->storage.head == oas) {
359 s->storage.head = oas->next;
360 delete oas;
361 s->storage.num_pending_ops--;
362 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
363 s->storage.num_pending_ops);
364 } else {
365 for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
366 if (curr->next == oas) {
367 curr->next = oas->next;
368 s->storage.num_pending_ops--;
369 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
370 s->storage.num_pending_ops);
371 delete oas;
372 break;
373 } else if (GPR_UNLIKELY(curr->next == nullptr)) {
374 CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
375 }
376 }
377 }
378 }
379
380 /*
381 Cycle through ops and try to take next action. Break when either
382 an action with callback is taken, or no action is possible.
383 This can get executed from the Cronet network thread via cronet callback
384 or on the application supplied thread via the perform_stream_op function.
385 */
execute_from_storage(stream_obj * s)386 static void execute_from_storage(stream_obj* s) {
387 gpr_mu_lock(&s->mu);
388 for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
389 CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
390 GPR_ASSERT(!curr->done);
391 enum e_op_result result = execute_stream_op(curr);
392 CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
393 op_result_string(result));
394 /* if this op is done, then remove it and free memory */
395 if (curr->done) {
396 struct op_and_state* next = curr->next;
397 remove_from_storage(s, curr);
398 curr = next;
399 } else if (result == NO_ACTION_POSSIBLE) {
400 curr = curr->next;
401 } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
402 /* wait for the callback */
403 break;
404 } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
405 }
406 gpr_mu_unlock(&s->mu);
407 }
408
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_chttp2_incoming_metadata_buffer * mds)409 static void convert_cronet_array_to_metadata(
410 const bidirectional_stream_header_array* header_array,
411 grpc_chttp2_incoming_metadata_buffer* mds) {
412 for (size_t i = 0; i < header_array->count; i++) {
413 CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
414 header_array->headers[i].key, header_array->headers[i].value);
415 grpc_slice key = grpc_slice_intern(
416 grpc_slice_from_static_string(header_array->headers[i].key));
417 grpc_slice value;
418 if (grpc_is_refcounted_slice_binary_header(key)) {
419 value = grpc_slice_from_static_string(header_array->headers[i].value);
420 value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
421 value, grpc_chttp2_base64_infer_length_after_decode(value)));
422 } else {
423 value = grpc_slice_intern(
424 grpc_slice_from_static_string(header_array->headers[i].value));
425 }
426 GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
427 grpc_chttp2_incoming_metadata_buffer_add(
428 mds, grpc_mdelem_from_slices(key, value)));
429 }
430 }
431
/*
  Cronet callback: the stream failed with |net_error|.
  Destroys the Cronet stream, records the failure on the stream state, frees
  the header array, write buffer and read buffer, lets the pending ops run and
  finally drops the "cronet transport" stream ref.
*/
static void on_failed(bidirectional_stream* stream, int net_error) {
  /* Logged unconditionally, i.e. not gated on the cronet trace flag. */
  gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
  gpr_mu_lock(&s->mu);
  bidirectional_stream_destroy(s->cbs);
  s->state.state_callback_received[OP_FAILED] = true;
  s->state.net_error = static_cast<cronet_net_error_code>(net_error);
  s->cbs = nullptr;
  if (s->header_array.headers) {
    gpr_free(s->header_array.headers);
    s->header_array.headers = nullptr;
  }
  if (s->state.ws.write_buffer) {
    gpr_free(s->state.ws.write_buffer);
    s->state.ws.write_buffer = nullptr;
  }
  null_and_maybe_free_read_buffer(s);
  /* Release the lock first: execute_from_storage() takes s->mu itself. */
  gpr_mu_unlock(&s->mu);
  execute_from_storage(s);
  /* Must stay last: |s| is not touched after the ref is dropped. */
  GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
}
459
/*
  Cronet callback: the stream was canceled.
  Destroys the Cronet stream, marks OP_CANCELED as received, frees the header
  array, write buffer and read buffer, lets the pending ops run and finally
  drops the "cronet transport" stream ref.
*/
static void on_canceled(bidirectional_stream* stream) {
  CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
  gpr_mu_lock(&s->mu);
  bidirectional_stream_destroy(s->cbs);
  s->state.state_callback_received[OP_CANCELED] = true;
  s->cbs = nullptr;
  if (s->header_array.headers) {
    gpr_free(s->header_array.headers);
    s->header_array.headers = nullptr;
  }
  if (s->state.ws.write_buffer) {
    gpr_free(s->state.ws.write_buffer);
    s->state.ws.write_buffer = nullptr;
  }
  null_and_maybe_free_read_buffer(s);
  /* Release the lock first: execute_from_storage() takes s->mu itself. */
  gpr_mu_unlock(&s->mu);
  execute_from_storage(s);
  /* Must stay last: |s| is not touched after the ref is dropped. */
  GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
}
486
/*
  Cronet callback: the stream completed successfully.
  Destroys the Cronet stream, marks OP_SUCCEEDED as received, releases the
  read buffer, lets the pending ops run and finally drops the
  "cronet transport" stream ref.
*/
static void on_succeeded(bidirectional_stream* stream) {
  CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
  gpr_mu_lock(&s->mu);
  bidirectional_stream_destroy(s->cbs);
  s->state.state_callback_received[OP_SUCCEEDED] = true;
  s->cbs = nullptr;
  null_and_maybe_free_read_buffer(s);
  /* Release the lock first: execute_from_storage() takes s->mu itself. */
  gpr_mu_unlock(&s->mu);
  execute_from_storage(s);
  /* Must stay last: |s| is not touched after the ref is dropped. */
  GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
}
505
506 /*
507 Cronet callback
508 */
on_stream_ready(bidirectional_stream * stream)509 static void on_stream_ready(bidirectional_stream* stream) {
510 CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
511 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
512 grpc_core::ExecCtx exec_ctx;
513 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
514 grpc_cronet_transport* t = s->curr_ct;
515 gpr_mu_lock(&s->mu);
516 s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
517 s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
518 /* Free the memory allocated for headers */
519 if (s->header_array.headers) {
520 gpr_free(s->header_array.headers);
521 s->header_array.headers = nullptr;
522 }
523 /* Send the initial metadata on wire if there is no SEND_MESSAGE or
524 * SEND_TRAILING_METADATA ops pending */
525 if (t->use_packet_coalescing) {
526 if (s->state.flush_cronet_when_ready) {
527 CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
528 bidirectional_stream_flush(stream);
529 }
530 }
531 gpr_mu_unlock(&s->mu);
532 execute_from_storage(s);
533 }
534
/*
  Cronet callback: response headers arrived.
  If the headers contain "grpc-status" they are handled as trailers
  (trailers-only response); otherwise they are stored as initial metadata and,
  unless the stream was canceled or has failed, the first message header read
  is started.
*/
static void on_response_headers_received(
    bidirectional_stream* stream,
    const bidirectional_stream_header_array* headers,
    const char* negotiated_protocol) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
             headers, negotiated_protocol);
  stream_obj* s = static_cast<stream_obj*>(stream->annotation);

  /* Identify if this is a header or a trailer (in a trailer-only response case)
   */
  for (size_t i = 0; i < headers->count; i++) {
    if (0 == strcmp("grpc-status", headers->headers[i].key)) {
      /* Direct call into the trailers callback; it takes and releases s->mu
         itself. */
      on_response_trailers_received(stream, headers);

      /* Do an extra read for a trailer-only stream to trigger on_succeeded()
       * callback */
      /* NOTE(review): this read is issued without holding s->mu, unlike the
         one further down -- confirm that is intended. */
      read_grpc_header(s);
      return;
    }
  }

  gpr_mu_lock(&s->mu);
  convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
  s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
  if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
        s->state.state_callback_received[OP_FAILED])) {
    /* Do an extra read to trigger on_succeeded() callback in case connection
     is closed */
    GPR_ASSERT(s->state.rs.length_field_received == false);
    read_grpc_header(s);
  }
  /* Release the lock first: execute_from_storage() takes s->mu itself. */
  gpr_mu_unlock(&s->mu);
  execute_from_storage(s);
}
574
575 /*
576 Cronet callback
577 */
on_write_completed(bidirectional_stream * stream,const char * data)578 static void on_write_completed(bidirectional_stream* stream, const char* data) {
579 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
580 grpc_core::ExecCtx exec_ctx;
581 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
582 CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
583 gpr_mu_lock(&s->mu);
584 if (s->state.ws.write_buffer) {
585 gpr_free(s->state.ws.write_buffer);
586 s->state.ws.write_buffer = nullptr;
587 }
588 s->state.state_callback_received[OP_SEND_MESSAGE] = true;
589 gpr_mu_unlock(&s->mu);
590 execute_from_storage(s);
591 }
592
593 /*
594 Cronet callback
595 */
on_read_completed(bidirectional_stream * stream,char * data,int count)596 static void on_read_completed(bidirectional_stream* stream, char* data,
597 int count) {
598 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
599 grpc_core::ExecCtx exec_ctx;
600 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
601 CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
602 count);
603 gpr_mu_lock(&s->mu);
604 s->state.state_callback_received[OP_RECV_MESSAGE] = true;
605 if (count > 0 && s->state.flush_read) {
606 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
607 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
608 GRPC_FLUSH_READ_SIZE);
609 gpr_mu_unlock(&s->mu);
610 } else if (count > 0) {
611 s->state.rs.received_bytes += count;
612 s->state.rs.remaining_bytes -= count;
613 if (s->state.rs.remaining_bytes > 0) {
614 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
615 s->state.state_op_done[OP_READ_REQ_MADE] = true;
616 bidirectional_stream_read(
617 s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
618 s->state.rs.remaining_bytes);
619 gpr_mu_unlock(&s->mu);
620 } else {
621 gpr_mu_unlock(&s->mu);
622 execute_from_storage(s);
623 }
624 } else {
625 null_and_maybe_free_read_buffer(s);
626 s->state.rs.read_stream_closed = true;
627 gpr_mu_unlock(&s->mu);
628 execute_from_storage(s);
629 }
630 }
631
/*
  Cronet callback: response trailers arrived (also called directly by
  on_response_headers_received for trailers-only responses).
  Stores the trailers as trailing metadata. If this side has not sent its
  trailing metadata yet and the stream is neither canceled nor failed, an
  empty end-of-stream write is issued; otherwise the pending ops are run.
*/
static void on_response_trailers_received(
    bidirectional_stream* stream,
    const bidirectional_stream_header_array* trailers) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
             trailers);
  stream_obj* s = static_cast<stream_obj*>(stream->annotation);
  grpc_cronet_transport* t = s->curr_ct;
  gpr_mu_lock(&s->mu);
  s->state.rs.trailing_metadata_valid = false;
  convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
  /* Only a non-empty trailer set counts as valid trailing metadata. */
  if (trailers->count > 0) {
    s->state.rs.trailing_metadata_valid = true;
  }
  s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
  /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
   * trigger on_succeeded */
  if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
      !(s->state.state_op_done[OP_CANCEL_ERROR] ||
        s->state.state_callback_received[OP_FAILED])) {
    CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
    /* Cleared before the write is issued; on_write_completed() sets it again
       when the write finishes. */
    s->state.state_callback_received[OP_SEND_MESSAGE] = false;
    bidirectional_stream_write(s->cbs, "", 0, true);
    if (t->use_packet_coalescing) {
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
      bidirectional_stream_flush(s->cbs);
    }
    s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;

    /* The pending ops are not run on this path. */
    gpr_mu_unlock(&s->mu);
  } else {
    gpr_mu_unlock(&s->mu);
    execute_from_storage(s);
  }
}
671
672 /*
673 Utility function that takes the data from s->write_slice_buffer and assembles
674 into a contiguous byte stream with 5 byte gRPC header prepended.
675 */
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)676 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
677 char** pp_write_buffer,
678 size_t* p_write_buffer_size, uint32_t flags) {
679 size_t length = write_slice_buffer->length;
680 *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
681 /* This is freed in the on_write_completed callback */
682 char* write_buffer =
683 static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
684 *pp_write_buffer = write_buffer;
685 uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
686 /* Append 5 byte header */
687 /* Compressed flag */
688 *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
689 /* Message length */
690 *p++ = static_cast<uint8_t>(length >> 24);
691 *p++ = static_cast<uint8_t>(length >> 16);
692 *p++ = static_cast<uint8_t>(length >> 8);
693 *p++ = static_cast<uint8_t>(length);
694 /* append actual data */
695 size_t offset = 0;
696 for (size_t i = 0; i < write_slice_buffer->count; ++i) {
697 memcpy(p + offset, GRPC_SLICE_START_PTR(write_slice_buffer->slices[i]),
698 GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]));
699 offset += GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]);
700 }
701 }
702
703 /*
704 Convert metadata in a format that Cronet can consume
705 */
convert_metadata_to_cronet_headers(grpc_metadata_batch * metadata,const char * host,std::string * pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)706 static void convert_metadata_to_cronet_headers(
707 grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
708 bidirectional_stream_header** pp_headers, size_t* p_num_headers,
709 const char** method) {
710 grpc_linked_mdelem* curr = metadata->list.head;
711 /* Walk the linked list and get number of header fields */
712 size_t num_headers_available = 0;
713 while (curr != nullptr) {
714 curr = curr->next;
715 num_headers_available++;
716 }
717 grpc_millis deadline = metadata->deadline;
718 if (deadline != GRPC_MILLIS_INF_FUTURE) {
719 num_headers_available++;
720 }
721 /* Allocate enough memory. It is freed in the on_stream_ready callback
722 */
723 bidirectional_stream_header* headers =
724 static_cast<bidirectional_stream_header*>(gpr_malloc(
725 sizeof(bidirectional_stream_header) * num_headers_available));
726 *pp_headers = headers;
727
728 /* Walk the linked list again, this time copying the header fields.
729 s->num_headers can be less than num_headers_available, as some headers
730 are not used for cronet.
731 TODO (makdharma): Eliminate need to traverse the LL second time for perf.
732 */
733 curr = metadata->list.head;
734 size_t num_headers = 0;
735 while (num_headers < num_headers_available) {
736 grpc_mdelem mdelem = curr->md;
737 curr = curr->next;
738 char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
739 char* value;
740 if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
741 grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
742 value = grpc_slice_to_c_string(wire_value);
743 grpc_slice_unref_internal(wire_value);
744 } else {
745 value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
746 }
747 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
748 grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
749 GRPC_MDSTR_AUTHORITY)) {
750 /* Cronet populates these fields on its own */
751 gpr_free(key);
752 gpr_free(value);
753 continue;
754 }
755 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
756 if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
757 *method = "PUT";
758 } else {
759 /* POST method in default*/
760 *method = "POST";
761 }
762 gpr_free(key);
763 gpr_free(value);
764 continue;
765 }
766 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
767 /* Create URL by appending :path value to the hostname */
768 *pp_url = absl::StrCat("https://", host, value);
769 gpr_free(key);
770 gpr_free(value);
771 continue;
772 }
773 CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
774 headers[num_headers].key = key;
775 headers[num_headers].value = value;
776 num_headers++;
777 if (curr == nullptr) {
778 break;
779 }
780 }
781 if (deadline != GRPC_MILLIS_INF_FUTURE) {
782 char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
783 char* value =
784 static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
785 grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
786 value);
787 headers[num_headers].key = key;
788 headers[num_headers].value = value;
789
790 num_headers++;
791 }
792
793 *p_num_headers = num_headers;
794 }
795
/* Decodes a 5 byte gRPC message header starting at |data|.
   *compressed receives bit 0 of the first byte; *length receives the
   big-endian 32-bit value stored in the following four bytes.
   The length is assembled in an unsigned type so that a top byte >= 0x80
   never shifts into the sign bit of an int. */
static void parse_grpc_header(const uint8_t* data, int* length,
                              bool* compressed) {
  *compressed = ((data[0] & 0x01) == 0x01);
  uint32_t wire_length = 0;
  for (int i = 1; i <= 4; ++i) {
    wire_length = (wire_length << 8) | data[i];
  }
  *length = (int)wire_length;
}
807
header_has_authority(grpc_linked_mdelem * head)808 static bool header_has_authority(grpc_linked_mdelem* head) {
809 while (head != nullptr) {
810 if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
811 GRPC_MDSTR_AUTHORITY)) {
812 return true;
813 }
814 head = head->next;
815 }
816 return false;
817 }
818
819 /*
820 Op Execution: Decide if one of the actions contained in the stream op can be
821 executed. This is the heart of the state machine.
822 */
op_can_be_run(grpc_transport_stream_op_batch * curr_op,struct stream_obj * s,struct op_state * op_state,enum e_op_id op_id)823 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
824 struct stream_obj* s, struct op_state* op_state,
825 enum e_op_id op_id) {
826 struct op_state* stream_state = &s->state;
827 grpc_cronet_transport* t = s->curr_ct;
828 bool result = true;
829 /* When call is canceled, every op can be run, except under following
830 conditions
831 */
832 bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
833 stream_state->state_callback_received[OP_FAILED];
834 if (is_canceled_or_failed) {
835 if (op_id == OP_SEND_INITIAL_METADATA) {
836 CRONET_LOG(GPR_DEBUG, "Because");
837 result = false;
838 }
839 if (op_id == OP_SEND_MESSAGE) {
840 CRONET_LOG(GPR_DEBUG, "Because");
841 result = false;
842 }
843 if (op_id == OP_SEND_TRAILING_METADATA) {
844 CRONET_LOG(GPR_DEBUG, "Because");
845 result = false;
846 }
847 if (op_id == OP_CANCEL_ERROR) {
848 CRONET_LOG(GPR_DEBUG, "Because");
849 result = false;
850 }
851 /* already executed */
852 if (op_id == OP_RECV_INITIAL_METADATA &&
853 stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
854 CRONET_LOG(GPR_DEBUG, "Because");
855 result = false;
856 }
857 if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
858 CRONET_LOG(GPR_DEBUG, "Because");
859 result = false;
860 }
861 if (op_id == OP_RECV_TRAILING_METADATA &&
862 stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
863 CRONET_LOG(GPR_DEBUG, "Because");
864 result = false;
865 }
866 /* ON_COMPLETE can be processed if one of the following conditions is met:
867 * 1. the stream failed
868 * 2. the stream is cancelled, and the callback is received
869 * 3. the stream succeeded before cancel is effective
870 * 4. the stream is cancelled, and the stream is never started */
871 if (op_id == OP_ON_COMPLETE &&
872 !(stream_state->state_callback_received[OP_FAILED] ||
873 stream_state->state_callback_received[OP_CANCELED] ||
874 stream_state->state_callback_received[OP_SUCCEEDED] ||
875 !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
876 CRONET_LOG(GPR_DEBUG, "Because");
877 result = false;
878 }
879 } else if (op_id == OP_SEND_INITIAL_METADATA) {
880 /* already executed */
881 if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
882 } else if (op_id == OP_RECV_INITIAL_METADATA) {
883 if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
884 /* already executed */
885 result = false;
886 } else if (!stream_state
887 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
888 /* we haven't sent headers yet. */
889 result = false;
890 } else if (!stream_state
891 ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
892 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
893 /* we haven't received headers yet. */
894 result = false;
895 }
896 } else if (op_id == OP_SEND_MESSAGE) {
897 if (op_state->state_op_done[OP_SEND_MESSAGE]) {
898 /* already executed (note we're checking op specific state, not stream
899 state) */
900 result = false;
901 } else if (!stream_state
902 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
903 /* we haven't sent headers yet. */
904 result = false;
905 }
906 } else if (op_id == OP_RECV_MESSAGE) {
907 if (op_state->state_op_done[OP_RECV_MESSAGE]) {
908 /* already executed */
909 result = false;
910 } else if (!stream_state
911 ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
912 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
913 /* we haven't received headers yet. */
914 result = false;
915 }
916 } else if (op_id == OP_RECV_TRAILING_METADATA) {
917 if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
918 /* already executed */
919 result = false;
920 } else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
921 !stream_state->state_op_done[OP_RECV_MESSAGE]) {
922 /* we have asked for but haven't received message yet. */
923 result = false;
924 } else if (!stream_state
925 ->state_callback_received[OP_RECV_TRAILING_METADATA]) {
926 /* we haven't received trailers yet. */
927 result = false;
928 } else if (!stream_state->state_callback_received[OP_SUCCEEDED]) {
929 /* we haven't received on_succeeded yet. */
930 result = false;
931 }
932 } else if (op_id == OP_SEND_TRAILING_METADATA) {
933 if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
934 /* already executed */
935 result = false;
936 } else if (!stream_state
937 ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
938 /* we haven't sent initial metadata yet */
939 result = false;
940 } else if (stream_state->pending_send_message &&
941 !stream_state->state_op_done[OP_SEND_MESSAGE]) {
942 /* we haven't sent message yet */
943 result = false;
944 } else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
945 !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
946 !(t->use_packet_coalescing &&
947 stream_state->pending_write_for_trailer)) {
948 /* we haven't got on_write_completed for the send yet */
949 result = false;
950 }
951 } else if (op_id == OP_CANCEL_ERROR) {
952 /* already executed */
953 if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
954 } else if (op_id == OP_ON_COMPLETE) {
955 if (op_state->state_op_done[OP_ON_COMPLETE]) {
956 /* already executed (note we're checking op specific state, not stream
957 state) */
958 CRONET_LOG(GPR_DEBUG, "Because");
959 result = false;
960 }
961 /* Check if every op that was asked for is done. */
962 /* TODO(muxi): We should not consider the recv ops here, since they
963 * have their own callbacks. We should invoke a batch's on_complete
964 * as soon as all of the batch's send ops are complete, even if
965 * there are still recv ops pending. */
966 else if (curr_op->send_initial_metadata &&
967 !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
968 CRONET_LOG(GPR_DEBUG, "Because");
969 result = false;
970 } else if (curr_op->send_message &&
971 !op_state->state_op_done[OP_SEND_MESSAGE]) {
972 CRONET_LOG(GPR_DEBUG, "Because");
973 result = false;
974 } else if (curr_op->send_message &&
975 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
976 CRONET_LOG(GPR_DEBUG, "Because");
977 result = false;
978 } else if (curr_op->send_trailing_metadata &&
979 !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
980 CRONET_LOG(GPR_DEBUG, "Because");
981 result = false;
982 } else if (curr_op->recv_initial_metadata &&
983 !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
984 CRONET_LOG(GPR_DEBUG, "Because");
985 result = false;
986 } else if (curr_op->recv_message &&
987 !op_state->state_op_done[OP_RECV_MESSAGE]) {
988 CRONET_LOG(GPR_DEBUG, "Because");
989 result = false;
990 } else if (curr_op->cancel_stream &&
991 !stream_state->state_callback_received[OP_CANCELED]) {
992 CRONET_LOG(GPR_DEBUG, "Because");
993 result = false;
994 } else if (curr_op->recv_trailing_metadata) {
995 /* We aren't done with trailing metadata yet */
996 if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
997 CRONET_LOG(GPR_DEBUG, "Because");
998 result = false;
999 }
1000 /* We've asked for actual message in an earlier op, and it hasn't been
1001 delivered yet. */
1002 else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1003 /* If this op is not the one asking for read, (which means some earlier
1004 op has asked), and the read hasn't been delivered. */
1005 if (!curr_op->recv_message &&
1006 !stream_state->state_callback_received[OP_SUCCEEDED]) {
1007 CRONET_LOG(GPR_DEBUG, "Because");
1008 result = false;
1009 }
1010 }
1011 }
1012 /* We should see at least one on_write_completed for the trailers that we
1013 sent */
1014 else if (curr_op->send_trailing_metadata &&
1015 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
1016 result = false;
1017 }
1018 }
1019 CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1020 result ? "YES" : "NO");
1021 return result;
1022 }
1023
1024 /*
1025 TODO (makdharma): Break down this function in smaller chunks for readability.
1026 */
execute_stream_op(struct op_and_state * oas)1027 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1028 grpc_transport_stream_op_batch* stream_op = &oas->op;
1029 struct stream_obj* s = oas->s;
1030 grpc_cronet_transport* t = s->curr_ct;
1031 struct op_state* stream_state = &s->state;
1032 enum e_op_result result = NO_ACTION_POSSIBLE;
1033 if (stream_op->send_initial_metadata &&
1034 op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1035 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1036 /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1037 * on_failed */
1038 GPR_ASSERT(s->cbs == nullptr);
1039 GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1040 s->cbs =
1041 bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1042 CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1043 if (t->use_packet_coalescing) {
1044 bidirectional_stream_disable_auto_flush(s->cbs, true);
1045 bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1046 }
1047 std::string url;
1048 const char* method = "POST";
1049 s->header_array.headers = nullptr;
1050 convert_metadata_to_cronet_headers(
1051 stream_op->payload->send_initial_metadata.send_initial_metadata,
1052 t->host, &url, &s->header_array.headers, &s->header_array.count,
1053 &method);
1054 s->header_array.capacity = s->header_array.count;
1055 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
1056 url.c_str());
1057 bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
1058 false);
1059 unsigned int header_index;
1060 for (header_index = 0; header_index < s->header_array.count;
1061 header_index++) {
1062 gpr_free((void*)s->header_array.headers[header_index].key);
1063 gpr_free((void*)s->header_array.headers[header_index].value);
1064 }
1065 stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1066 if (t->use_packet_coalescing) {
1067 if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1068 s->state.flush_cronet_when_ready = true;
1069 }
1070 }
1071 result = ACTION_TAKEN_WITH_CALLBACK;
1072 } else if (stream_op->send_message &&
1073 op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1074 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1075 stream_state->pending_send_message = false;
1076 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1077 stream_state->state_callback_received[OP_FAILED] ||
1078 stream_state->state_callback_received[OP_SUCCEEDED]) {
1079 result = NO_ACTION_POSSIBLE;
1080 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1081 } else {
1082 grpc_slice_buffer write_slice_buffer;
1083 grpc_slice slice;
1084 grpc_slice_buffer_init(&write_slice_buffer);
1085 while (write_slice_buffer.length <
1086 stream_op->payload->send_message.send_message->length()) {
1087 /* TODO(roth): When we add support for incremental sending,this code
1088 * will need to be changed to support asynchronous delivery of the
1089 * send_message payload. */
1090 if (!stream_op->payload->send_message.send_message->Next(
1091 stream_op->payload->send_message.send_message->length(),
1092 nullptr)) {
1093 /* Should never reach here */
1094 GPR_ASSERT(false);
1095 }
1096 if (GRPC_ERROR_NONE !=
1097 stream_op->payload->send_message.send_message->Pull(&slice)) {
1098 /* Should never reach here */
1099 GPR_ASSERT(false);
1100 }
1101 grpc_slice_buffer_add(&write_slice_buffer, slice);
1102 }
1103 size_t write_buffer_size;
1104 create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
1105 &write_buffer_size,
1106 stream_op->payload->send_message.send_message->flags());
1107 if (write_buffer_size > 0) {
1108 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1109 stream_state->ws.write_buffer);
1110 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1111 bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1112 static_cast<int>(write_buffer_size), false);
1113 grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1114 if (t->use_packet_coalescing) {
1115 if (!stream_op->send_trailing_metadata) {
1116 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1117 bidirectional_stream_flush(s->cbs);
1118 result = ACTION_TAKEN_WITH_CALLBACK;
1119 } else {
1120 stream_state->pending_write_for_trailer = true;
1121 result = ACTION_TAKEN_NO_CALLBACK;
1122 }
1123 } else {
1124 result = ACTION_TAKEN_WITH_CALLBACK;
1125 }
1126 } else {
1127 /* Should never reach here */
1128 GPR_ASSERT(false);
1129 }
1130 }
1131 stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1132 oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1133 stream_op->payload->send_message.send_message.reset();
1134 } else if (stream_op->send_trailing_metadata &&
1135 op_can_be_run(stream_op, s, &oas->state,
1136 OP_SEND_TRAILING_METADATA)) {
1137 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1138 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1139 stream_state->state_callback_received[OP_FAILED] ||
1140 stream_state->state_callback_received[OP_SUCCEEDED]) {
1141 result = NO_ACTION_POSSIBLE;
1142 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1143 } else {
1144 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1145 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1146 bidirectional_stream_write(s->cbs, "", 0, true);
1147 if (t->use_packet_coalescing) {
1148 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1149 bidirectional_stream_flush(s->cbs);
1150 }
1151 result = ACTION_TAKEN_WITH_CALLBACK;
1152 }
1153 stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1154 } else if (stream_op->recv_initial_metadata &&
1155 op_can_be_run(stream_op, s, &oas->state,
1156 OP_RECV_INITIAL_METADATA)) {
1157 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1158 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1159 grpc_core::ExecCtx::Run(
1160 DEBUG_LOCATION,
1161 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1162 GRPC_ERROR_NONE);
1163 } else if (stream_state->state_callback_received[OP_FAILED]) {
1164 grpc_core::ExecCtx::Run(
1165 DEBUG_LOCATION,
1166 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1167 GRPC_ERROR_NONE);
1168 } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1169 grpc_core::ExecCtx::Run(
1170 DEBUG_LOCATION,
1171 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1172 GRPC_ERROR_NONE);
1173 } else {
1174 grpc_chttp2_incoming_metadata_buffer_publish(
1175 &oas->s->state.rs.initial_metadata,
1176 stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1177 grpc_core::ExecCtx::Run(
1178 DEBUG_LOCATION,
1179 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1180 GRPC_ERROR_NONE);
1181 }
1182 stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1183 result = ACTION_TAKEN_NO_CALLBACK;
1184 } else if (stream_op->recv_message &&
1185 op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1186 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
1187 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1188 CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1189 grpc_core::ExecCtx::Run(
1190 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1191 GRPC_ERROR_NONE);
1192 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1193 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1194 result = ACTION_TAKEN_NO_CALLBACK;
1195 } else if (stream_state->state_callback_received[OP_FAILED]) {
1196 CRONET_LOG(GPR_DEBUG, "Stream failed.");
1197 grpc_core::ExecCtx::Run(
1198 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1199 GRPC_ERROR_NONE);
1200 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1201 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1202 result = ACTION_TAKEN_NO_CALLBACK;
1203 } else if (stream_state->rs.read_stream_closed == true) {
1204 /* No more data will be received */
1205 CRONET_LOG(GPR_DEBUG, "read stream closed");
1206 grpc_core::ExecCtx::Run(
1207 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1208 GRPC_ERROR_NONE);
1209 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1210 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1211 result = ACTION_TAKEN_NO_CALLBACK;
1212 } else if (stream_state->flush_read) {
1213 CRONET_LOG(GPR_DEBUG, "flush read");
1214 grpc_core::ExecCtx::Run(
1215 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1216 GRPC_ERROR_NONE);
1217 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1218 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1219 result = ACTION_TAKEN_NO_CALLBACK;
1220 } else if (stream_state->rs.length_field_received == false) {
1221 if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1222 stream_state->rs.remaining_bytes == 0) {
1223 /* Start a read operation for data */
1224 stream_state->rs.length_field_received = true;
1225 parse_grpc_header(
1226 reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1227 &stream_state->rs.length_field, &stream_state->rs.compressed);
1228 CRONET_LOG(GPR_DEBUG, "length field = %d",
1229 stream_state->rs.length_field);
1230 if (stream_state->rs.length_field > 0) {
1231 stream_state->rs.read_buffer = static_cast<char*>(
1232 gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1233 GPR_ASSERT(stream_state->rs.read_buffer);
1234 stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1235 stream_state->rs.received_bytes = 0;
1236 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1237 stream_state->state_op_done[OP_READ_REQ_MADE] =
1238 true; /* Indicates that at least one read request has been made */
1239 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1240 stream_state->rs.remaining_bytes);
1241 result = ACTION_TAKEN_WITH_CALLBACK;
1242 } else {
1243 stream_state->rs.remaining_bytes = 0;
1244 CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1245 /* Clean up read_slice_buffer in case there is unread data. */
1246 grpc_slice_buffer_destroy_internal(
1247 &stream_state->rs.read_slice_buffer);
1248 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1249 uint32_t flags = 0;
1250 if (stream_state->rs.compressed) {
1251 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1252 }
1253 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1254 stream_op->payload->recv_message.recv_message->reset(
1255 stream_state->rs.sbs.get());
1256 grpc_core::ExecCtx::Run(
1257 DEBUG_LOCATION,
1258 stream_op->payload->recv_message.recv_message_ready,
1259 GRPC_ERROR_NONE);
1260 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1261 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1262
1263 /* Extra read to trigger on_succeed */
1264 stream_state->rs.length_field_received = false;
1265 stream_state->state_op_done[OP_READ_REQ_MADE] =
1266 true; /* Indicates that at least one read request has been made */
1267 read_grpc_header(s);
1268 result = ACTION_TAKEN_NO_CALLBACK;
1269 }
1270 } else if (stream_state->rs.remaining_bytes == 0) {
1271 /* Start a read operation for first 5 bytes (GRPC header) */
1272 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1273 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1274 stream_state->rs.received_bytes = 0;
1275 stream_state->rs.compressed = false;
1276 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1277 stream_state->state_op_done[OP_READ_REQ_MADE] =
1278 true; /* Indicates that at least one read request has been made */
1279 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1280 stream_state->rs.remaining_bytes);
1281 result = ACTION_TAKEN_WITH_CALLBACK;
1282 } else {
1283 result = NO_ACTION_POSSIBLE;
1284 }
1285 } else if (stream_state->rs.remaining_bytes == 0) {
1286 CRONET_LOG(GPR_DEBUG, "read operation complete");
1287 grpc_slice read_data_slice =
1288 GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1289 uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1290 memcpy(dst_p, stream_state->rs.read_buffer,
1291 static_cast<size_t>(stream_state->rs.length_field));
1292 null_and_maybe_free_read_buffer(s);
1293 /* Clean up read_slice_buffer in case there is unread data. */
1294 grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1295 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1296 grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1297 read_data_slice);
1298 uint32_t flags = 0;
1299 if (stream_state->rs.compressed) {
1300 flags = GRPC_WRITE_INTERNAL_COMPRESS;
1301 }
1302 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1303 stream_op->payload->recv_message.recv_message->reset(
1304 stream_state->rs.sbs.get());
1305 grpc_core::ExecCtx::Run(
1306 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1307 GRPC_ERROR_NONE);
1308 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1309 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1310 /* Do an extra read to trigger on_succeeded() callback in case connection
1311 is closed */
1312 stream_state->rs.length_field_received = false;
1313 read_grpc_header(s);
1314 result = ACTION_TAKEN_NO_CALLBACK;
1315 }
1316 } else if (stream_op->recv_trailing_metadata &&
1317 op_can_be_run(stream_op, s, &oas->state,
1318 OP_RECV_TRAILING_METADATA)) {
1319 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1320 grpc_error* error = GRPC_ERROR_NONE;
1321 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1322 error = GRPC_ERROR_REF(stream_state->cancel_error);
1323 } else if (stream_state->state_callback_received[OP_FAILED]) {
1324 const char* desc = cronet_net_error_as_string(stream_state->net_error);
1325 error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE,
1326 stream_state->net_error, desc);
1327 } else if (oas->s->state.rs.trailing_metadata_valid) {
1328 grpc_chttp2_incoming_metadata_buffer_publish(
1329 &oas->s->state.rs.trailing_metadata,
1330 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1331 stream_state->rs.trailing_metadata_valid = false;
1332 }
1333 grpc_core::ExecCtx::Run(
1334 DEBUG_LOCATION,
1335 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1336 error);
1337 stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1338 result = ACTION_TAKEN_NO_CALLBACK;
1339 } else if (stream_op->cancel_stream &&
1340 op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1341 CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
1342 if (s->cbs) {
1343 CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1344 bidirectional_stream_cancel(s->cbs);
1345 result = ACTION_TAKEN_WITH_CALLBACK;
1346 } else {
1347 result = ACTION_TAKEN_NO_CALLBACK;
1348 }
1349 stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1350 if (!stream_state->cancel_error) {
1351 stream_state->cancel_error =
1352 GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1353 }
1354 } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1355 CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1356 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1357 if (stream_op->on_complete) {
1358 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1359 GRPC_ERROR_REF(stream_state->cancel_error));
1360 }
1361 } else if (stream_state->state_callback_received[OP_FAILED]) {
1362 if (stream_op->on_complete) {
1363 const char* error_message =
1364 cronet_net_error_as_string(stream_state->net_error);
1365 grpc_core::ExecCtx::Run(
1366 DEBUG_LOCATION, stream_op->on_complete,
1367 make_error_with_desc(GRPC_STATUS_UNAVAILABLE,
1368 stream_state->net_error, error_message));
1369 }
1370 } else {
1371 /* All actions in this stream_op are complete. Call the on_complete
1372 * callback
1373 */
1374 if (stream_op->on_complete) {
1375 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1376 GRPC_ERROR_NONE);
1377 }
1378 }
1379 oas->state.state_op_done[OP_ON_COMPLETE] = true;
1380 oas->done = true;
1381 /* reset any send message state, only if this ON_COMPLETE is about a send.
1382 */
1383 if (stream_op->send_message) {
1384 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1385 stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1386 }
1387 result = ACTION_TAKEN_NO_CALLBACK;
1388 /* If this is the on_complete callback being called for a received message -
1389 make a note */
1390 if (stream_op->recv_message) {
1391 stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1392 }
1393 } else {
1394 result = NO_ACTION_POSSIBLE;
1395 }
1396 return result;
1397 }
1398
1399 /*
1400 Functions used by upper layers to access transport functionality.
1401 */
1402
stream_obj(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,grpc_core::Arena * arena)1403 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1404 grpc_stream_refcount* refcount,
1405 grpc_core::Arena* arena)
1406 : arena(arena),
1407 curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1408 curr_gs(gs),
1409 state(arena),
1410 refcount(refcount) {
1411 GRPC_CRONET_STREAM_REF(this, "cronet transport");
1412 gpr_mu_init(&mu);
1413 }
1414
~stream_obj()1415 inline stream_obj::~stream_obj() {
1416 null_and_maybe_free_read_buffer(this);
1417 /* Clean up read_slice_buffer in case there is unread data. */
1418 grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1419 GRPC_ERROR_UNREF(state.cancel_error);
1420 }
1421
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)1422 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1423 grpc_stream_refcount* refcount, const void* server_data,
1424 grpc_core::Arena* arena) {
1425 new (gs) stream_obj(gt, gs, refcount, arena);
1426 return 0;
1427 }
1428
set_pollset_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)1429 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1430 grpc_pollset* pollset) {}
1431
set_pollset_set_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)1432 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1433 grpc_pollset_set* pollset_set) {}
1434
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1435 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1436 grpc_transport_stream_op_batch* op) {
1437 CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1438 if (op->send_initial_metadata &&
1439 header_has_authority(op->payload->send_initial_metadata
1440 .send_initial_metadata->list.head)) {
1441 /* Cronet does not support :authority header field. We cancel the call when
1442 this field is present in metadata */
1443 if (op->recv_initial_metadata) {
1444 grpc_core::ExecCtx::Run(
1445 DEBUG_LOCATION,
1446 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1447 GRPC_ERROR_CANCELLED);
1448 }
1449 if (op->recv_message) {
1450 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1451 op->payload->recv_message.recv_message_ready,
1452 GRPC_ERROR_CANCELLED);
1453 }
1454 if (op->recv_trailing_metadata) {
1455 grpc_core::ExecCtx::Run(
1456 DEBUG_LOCATION,
1457 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1458 GRPC_ERROR_CANCELLED);
1459 }
1460 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1461 GRPC_ERROR_CANCELLED);
1462 return;
1463 }
1464 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1465 add_to_storage(s, op);
1466 execute_from_storage(s);
1467 }
1468
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1469 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1470 grpc_closure* then_schedule_closure) {
1471 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1472 s->~stream_obj();
1473 grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1474 GRPC_ERROR_NONE);
1475 }
1476
destroy_transport(grpc_transport * gt)1477 static void destroy_transport(grpc_transport* gt) {}
1478
get_endpoint(grpc_transport * gt)1479 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1480
perform_op(grpc_transport * gt,grpc_transport_op * op)1481 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1482
1483 static const grpc_transport_vtable grpc_cronet_vtable = {
1484 sizeof(stream_obj),
1485 "cronet_http",
1486 init_stream,
1487 set_pollset_do_nothing,
1488 set_pollset_set_do_nothing,
1489 perform_stream_op,
1490 perform_op,
1491 destroy_stream,
1492 destroy_transport,
1493 get_endpoint};
1494
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void * reserved)1495 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1496 const grpc_channel_args* args,
1497 void* reserved) {
1498 grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1499 gpr_malloc(sizeof(grpc_cronet_transport)));
1500 if (!ct) {
1501 goto error;
1502 }
1503 ct->base.vtable = &grpc_cronet_vtable;
1504 ct->engine = static_cast<stream_engine*>(engine);
1505 ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1506 if (!ct->host) {
1507 goto error;
1508 }
1509 strcpy(ct->host, target);
1510
1511 ct->use_packet_coalescing = true;
1512 if (args) {
1513 for (size_t i = 0; i < args->num_args; i++) {
1514 if (0 ==
1515 strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1516 if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1517 gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1518 GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1519 } else {
1520 ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1521 }
1522 }
1523 }
1524 }
1525
1526 return &ct->base;
1527
1528 error:
1529 if (ct) {
1530 if (ct->host) {
1531 gpr_free(ct->host);
1532 }
1533 gpr_free(ct);
1534 }
1535
1536 return nullptr;
1537 }
1538