1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
27
28 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
29 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
30 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
31 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/gprpp/manual_constructor.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 #include "src/core/lib/surface/channel.h"
40 #include "src/core/lib/surface/validate_metadata.h"
41 #include "src/core/lib/transport/metadata_batch.h"
42 #include "src/core/lib/transport/static_metadata.h"
43 #include "src/core/lib/transport/timeout_encoding.h"
44 #include "src/core/lib/transport/transport_impl.h"
45
46 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
47
48 #define GRPC_HEADER_SIZE_IN_BYTES 5
49 #define GRPC_FLUSH_READ_SIZE 4096
50
51 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
52 #define CRONET_LOG(...) \
53 do { \
54 if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
55 } while (0)
56
57 enum e_op_result {
58 ACTION_TAKEN_WITH_CALLBACK,
59 ACTION_TAKEN_NO_CALLBACK,
60 NO_ACTION_POSSIBLE
61 };
62
63 enum e_op_id {
64 OP_SEND_INITIAL_METADATA = 0,
65 OP_SEND_MESSAGE,
66 OP_SEND_TRAILING_METADATA,
67 OP_RECV_MESSAGE,
68 OP_RECV_INITIAL_METADATA,
69 OP_RECV_TRAILING_METADATA,
70 OP_CANCEL_ERROR,
71 OP_ON_COMPLETE,
72 OP_FAILED,
73 OP_SUCCEEDED,
74 OP_CANCELED,
75 OP_RECV_MESSAGE_AND_ON_COMPLETE,
76 OP_READ_REQ_MADE,
77 OP_NUM_OPS
78 };
79
80 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
81
82 static void on_stream_ready(bidirectional_stream*);
83 static void on_response_headers_received(
84 bidirectional_stream*, const bidirectional_stream_header_array*,
85 const char*);
86 static void on_write_completed(bidirectional_stream*, const char*);
87 static void on_read_completed(bidirectional_stream*, char*, int);
88 static void on_response_trailers_received(
89 bidirectional_stream*, const bidirectional_stream_header_array*);
90 static void on_succeeded(bidirectional_stream*);
91 static void on_failed(bidirectional_stream*, int);
92 static void on_canceled(bidirectional_stream*);
93 static bidirectional_stream_callback cronet_callbacks = {
94 on_stream_ready,
95 on_response_headers_received,
96 on_read_completed,
97 on_write_completed,
98 on_response_trailers_received,
99 on_succeeded,
100 on_failed,
101 on_canceled};
102
103 /* Cronet transport object */
104 struct grpc_cronet_transport {
105 grpc_transport base; /* must be first element in this structure */
106 stream_engine* engine;
107 char* host;
108 bool use_packet_coalescing;
109 };
110 typedef struct grpc_cronet_transport grpc_cronet_transport;
111
112 /* TODO (makdharma): reorder structure for memory efficiency per
113 http://www.catb.org/esr/structure-packing/#_structure_reordering: */
114 struct read_state {
read_stateread_state115 read_state(grpc_core::Arena* arena)
116 : trailing_metadata(arena), initial_metadata(arena) {
117 grpc_slice_buffer_init(&read_slice_buffer);
118 }
119
120 /* vars to store data coming from server */
121 char* read_buffer = nullptr;
122 bool length_field_received = false;
123 int received_bytes = 0;
124 int remaining_bytes = 0;
125 int length_field = 0;
126 bool compressed = 0;
127 char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
128 char* payload_field = nullptr;
129 bool read_stream_closed = 0;
130
131 /* vars for holding data destined for the application */
132 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
133 grpc_slice_buffer read_slice_buffer;
134
135 /* vars for trailing metadata */
136 grpc_chttp2_incoming_metadata_buffer trailing_metadata;
137 bool trailing_metadata_valid = false;
138
139 /* vars for initial metadata */
140 grpc_chttp2_incoming_metadata_buffer initial_metadata;
141 };
142
143 struct write_state {
144 char* write_buffer = nullptr;
145 };
146
147 /* track state of one stream op */
148 struct op_state {
op_stateop_state149 op_state(grpc_core::Arena* arena) : rs(arena) {}
150
151 bool state_op_done[OP_NUM_OPS] = {};
152 bool state_callback_received[OP_NUM_OPS] = {};
153 /* A non-zero gRPC status code has been seen */
154 bool fail_state = false;
155 /* Transport is discarding all buffered messages */
156 bool flush_read = false;
157 bool flush_cronet_when_ready = false;
158 bool pending_write_for_trailer = false;
159 bool pending_send_message = false;
160 /* User requested RECV_TRAILING_METADATA */
161 bool pending_recv_trailing_metadata = false;
162 /* Cronet has not issued a callback of a bidirectional read */
163 bool pending_read_from_cronet = false;
164 grpc_error* cancel_error = GRPC_ERROR_NONE;
165 /* data structure for storing data coming from server */
166 struct read_state rs;
167 /* data structure for storing data going to the server */
168 struct write_state ws;
169 };
170
171 struct stream_obj;
172
173 struct op_and_state {
174 op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
175
176 grpc_transport_stream_op_batch op;
177 struct op_state state;
178 bool done = false;
179 struct stream_obj* s; /* Pointer back to the stream object */
180 /* next op_and_state in the linked list */
181 struct op_and_state* next = nullptr;
182 };
183
184 struct op_storage {
185 int num_pending_ops = 0;
186 struct op_and_state* head = nullptr;
187 };
188
189 struct stream_obj {
190 stream_obj(grpc_transport* gt, grpc_stream* gs,
191 grpc_stream_refcount* refcount, grpc_core::Arena* arena);
192 ~stream_obj();
193
194 grpc_core::Arena* arena;
195 struct op_and_state* oas = nullptr;
196 grpc_transport_stream_op_batch* curr_op = nullptr;
197 grpc_cronet_transport* curr_ct;
198 grpc_stream* curr_gs;
199 bidirectional_stream* cbs = nullptr;
200 bidirectional_stream_header_array header_array =
201 bidirectional_stream_header_array(); // Zero-initialize the structure.
202
203 /* Stream level state. Some state will be tracked both at stream and stream_op
204 * level */
205 struct op_state state;
206
207 /* OP storage */
208 struct op_storage storage;
209
210 /* Mutex to protect storage */
211 gpr_mu mu;
212
213 /* Refcount object of the stream */
214 grpc_stream_refcount* refcount;
215 };
216
217 #ifndef NDEBUG
218 #define GRPC_CRONET_STREAM_REF(stream, reason) \
219 grpc_cronet_stream_ref((stream), (reason))
220 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
221 grpc_cronet_stream_unref((stream), (reason))
grpc_cronet_stream_ref(stream_obj * s,const char * reason)222 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
223 grpc_stream_ref(s->refcount, reason);
224 }
grpc_cronet_stream_unref(stream_obj * s,const char * reason)225 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
226 grpc_stream_unref(s->refcount, reason);
227 }
228 #else
229 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
230 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
231 grpc_cronet_stream_unref((stream))
grpc_cronet_stream_ref(stream_obj * s)232 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
grpc_cronet_stream_unref(stream_obj * s)233 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
234 #endif
235
236 static enum e_op_result execute_stream_op(struct op_and_state* oas);
237
238 /*
239 Utility function to translate enum into string for printing
240 */
op_result_string(enum e_op_result i)241 static const char* op_result_string(enum e_op_result i) {
242 switch (i) {
243 case ACTION_TAKEN_WITH_CALLBACK:
244 return "ACTION_TAKEN_WITH_CALLBACK";
245 case ACTION_TAKEN_NO_CALLBACK:
246 return "ACTION_TAKEN_NO_CALLBACK";
247 case NO_ACTION_POSSIBLE:
248 return "NO_ACTION_POSSIBLE";
249 }
250 GPR_UNREACHABLE_CODE(return "UNKNOWN");
251 }
252
op_id_string(enum e_op_id i)253 static const char* op_id_string(enum e_op_id i) {
254 switch (i) {
255 case OP_SEND_INITIAL_METADATA:
256 return "OP_SEND_INITIAL_METADATA";
257 case OP_SEND_MESSAGE:
258 return "OP_SEND_MESSAGE";
259 case OP_SEND_TRAILING_METADATA:
260 return "OP_SEND_TRAILING_METADATA";
261 case OP_RECV_MESSAGE:
262 return "OP_RECV_MESSAGE";
263 case OP_RECV_INITIAL_METADATA:
264 return "OP_RECV_INITIAL_METADATA";
265 case OP_RECV_TRAILING_METADATA:
266 return "OP_RECV_TRAILING_METADATA";
267 case OP_CANCEL_ERROR:
268 return "OP_CANCEL_ERROR";
269 case OP_ON_COMPLETE:
270 return "OP_ON_COMPLETE";
271 case OP_FAILED:
272 return "OP_FAILED";
273 case OP_SUCCEEDED:
274 return "OP_SUCCEEDED";
275 case OP_CANCELED:
276 return "OP_CANCELED";
277 case OP_RECV_MESSAGE_AND_ON_COMPLETE:
278 return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
279 case OP_READ_REQ_MADE:
280 return "OP_READ_REQ_MADE";
281 case OP_NUM_OPS:
282 return "OP_NUM_OPS";
283 }
284 return "UNKNOWN";
285 }
286
null_and_maybe_free_read_buffer(stream_obj * s)287 static void null_and_maybe_free_read_buffer(stream_obj* s) {
288 if (s->state.rs.read_buffer &&
289 s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
290 gpr_free(s->state.rs.read_buffer);
291 }
292 s->state.rs.read_buffer = nullptr;
293 }
294
maybe_flush_read(stream_obj * s)295 static void maybe_flush_read(stream_obj* s) {
296 /* To enter flush read state (discarding all the buffered messages in
297 * transport layer), two conditions must be satisfied: 1) non-zero grpc status
298 * has been received, and 2) an op requesting the status code
299 * (RECV_TRAILING_METADATA) is issued by the user. (See
300 * doc/status_ordering.md) */
301 /* Whenever the evaluation of any of the two condition is changed, we check
302 * whether we should enter the flush read state. */
303 if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
304 if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
305 CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
306 s->state.flush_read = true;
307 null_and_maybe_free_read_buffer(s);
308 s->state.rs.read_buffer =
309 static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
310 if (!s->state.pending_read_from_cronet) {
311 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
312 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
313 GRPC_FLUSH_READ_SIZE);
314 s->state.pending_read_from_cronet = true;
315 }
316 }
317 }
318 }
319
make_error_with_desc(int error_code,const char * desc)320 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
321 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
322 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
323 return error;
324 }
325
op_and_state(stream_obj * s,const grpc_transport_stream_op_batch & op)326 inline op_and_state::op_and_state(stream_obj* s,
327 const grpc_transport_stream_op_batch& op)
328 : op(op), state(s->arena), s(s) {}
329
330 /*
331 Add a new stream op to op storage.
332 */
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)333 static void add_to_storage(struct stream_obj* s,
334 grpc_transport_stream_op_batch* op) {
335 struct op_storage* storage = &s->storage;
336 /* add new op at the beginning of the linked list. The memory is freed
337 in remove_from_storage */
338 op_and_state* new_op = new op_and_state(s, *op);
339 gpr_mu_lock(&s->mu);
340 new_op->next = storage->head;
341 storage->head = new_op;
342 storage->num_pending_ops++;
343 if (op->send_message) {
344 s->state.pending_send_message = true;
345 }
346 if (op->recv_trailing_metadata) {
347 s->state.pending_recv_trailing_metadata = true;
348 maybe_flush_read(s);
349 }
350 CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
351 storage->num_pending_ops);
352 gpr_mu_unlock(&s->mu);
353 }
354
355 /*
356 Traverse the linked list and delete op and free memory
357 */
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)358 static void remove_from_storage(struct stream_obj* s,
359 struct op_and_state* oas) {
360 struct op_and_state* curr;
361 if (s->storage.head == nullptr || oas == nullptr) {
362 return;
363 }
364 if (s->storage.head == oas) {
365 s->storage.head = oas->next;
366 delete oas;
367 s->storage.num_pending_ops--;
368 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
369 s->storage.num_pending_ops);
370 } else {
371 for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
372 if (curr->next == oas) {
373 curr->next = oas->next;
374 s->storage.num_pending_ops--;
375 CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
376 s->storage.num_pending_ops);
377 delete oas;
378 break;
379 } else if (GPR_UNLIKELY(curr->next == nullptr)) {
380 CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
381 }
382 }
383 }
384 }
385
386 /*
387 Cycle through ops and try to take next action. Break when either
388 an action with callback is taken, or no action is possible.
389 This can get executed from the Cronet network thread via cronet callback
390 or on the application supplied thread via the perform_stream_op function.
391 */
execute_from_storage(stream_obj * s)392 static void execute_from_storage(stream_obj* s) {
393 gpr_mu_lock(&s->mu);
394 for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
395 CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
396 GPR_ASSERT(!curr->done);
397 enum e_op_result result = execute_stream_op(curr);
398 CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
399 op_result_string(result));
400 /* if this op is done, then remove it and free memory */
401 if (curr->done) {
402 struct op_and_state* next = curr->next;
403 remove_from_storage(s, curr);
404 curr = next;
405 } else if (result == NO_ACTION_POSSIBLE) {
406 curr = curr->next;
407 } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
408 /* wait for the callback */
409 break;
410 } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
411 }
412 gpr_mu_unlock(&s->mu);
413 }
414
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_chttp2_incoming_metadata_buffer * mds)415 static void convert_cronet_array_to_metadata(
416 const bidirectional_stream_header_array* header_array,
417 grpc_chttp2_incoming_metadata_buffer* mds) {
418 for (size_t i = 0; i < header_array->count; i++) {
419 CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
420 header_array->headers[i].key, header_array->headers[i].value);
421 grpc_slice key = grpc_slice_intern(
422 grpc_slice_from_static_string(header_array->headers[i].key));
423 grpc_slice value;
424 if (grpc_is_refcounted_slice_binary_header(key)) {
425 value = grpc_slice_from_static_string(header_array->headers[i].value);
426 value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
427 value, grpc_chttp2_base64_infer_length_after_decode(value)));
428 } else {
429 value = grpc_slice_intern(
430 grpc_slice_from_static_string(header_array->headers[i].value));
431 }
432 GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
433 grpc_chttp2_incoming_metadata_buffer_add(
434 mds, grpc_mdelem_from_slices(key, value)));
435 }
436 }
437
438 /*
439 Cronet callback
440 */
on_failed(bidirectional_stream * stream,int net_error)441 static void on_failed(bidirectional_stream* stream, int net_error) {
442 gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
443 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
444 grpc_core::ExecCtx exec_ctx;
445
446 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
447 gpr_mu_lock(&s->mu);
448 bidirectional_stream_destroy(s->cbs);
449 s->state.state_callback_received[OP_FAILED] = true;
450 s->cbs = nullptr;
451 if (s->header_array.headers) {
452 gpr_free(s->header_array.headers);
453 s->header_array.headers = nullptr;
454 }
455 if (s->state.ws.write_buffer) {
456 gpr_free(s->state.ws.write_buffer);
457 s->state.ws.write_buffer = nullptr;
458 }
459 null_and_maybe_free_read_buffer(s);
460 gpr_mu_unlock(&s->mu);
461 execute_from_storage(s);
462 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
463 }
464
465 /*
466 Cronet callback
467 */
on_canceled(bidirectional_stream * stream)468 static void on_canceled(bidirectional_stream* stream) {
469 CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
470 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
471 grpc_core::ExecCtx exec_ctx;
472
473 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
474 gpr_mu_lock(&s->mu);
475 bidirectional_stream_destroy(s->cbs);
476 s->state.state_callback_received[OP_CANCELED] = true;
477 s->cbs = nullptr;
478 if (s->header_array.headers) {
479 gpr_free(s->header_array.headers);
480 s->header_array.headers = nullptr;
481 }
482 if (s->state.ws.write_buffer) {
483 gpr_free(s->state.ws.write_buffer);
484 s->state.ws.write_buffer = nullptr;
485 }
486 null_and_maybe_free_read_buffer(s);
487 gpr_mu_unlock(&s->mu);
488 execute_from_storage(s);
489 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
490 }
491
492 /*
493 Cronet callback
494 */
on_succeeded(bidirectional_stream * stream)495 static void on_succeeded(bidirectional_stream* stream) {
496 CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
497 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
498 grpc_core::ExecCtx exec_ctx;
499
500 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
501 gpr_mu_lock(&s->mu);
502 bidirectional_stream_destroy(s->cbs);
503 s->state.state_callback_received[OP_SUCCEEDED] = true;
504 s->cbs = nullptr;
505 null_and_maybe_free_read_buffer(s);
506 gpr_mu_unlock(&s->mu);
507 execute_from_storage(s);
508 GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
509 }
510
511 /*
512 Cronet callback
513 */
on_stream_ready(bidirectional_stream * stream)514 static void on_stream_ready(bidirectional_stream* stream) {
515 CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
516 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
517 grpc_core::ExecCtx exec_ctx;
518 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
519 grpc_cronet_transport* t = s->curr_ct;
520 gpr_mu_lock(&s->mu);
521 s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
522 s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
523 /* Free the memory allocated for headers */
524 if (s->header_array.headers) {
525 gpr_free(s->header_array.headers);
526 s->header_array.headers = nullptr;
527 }
528 /* Send the initial metadata on wire if there is no SEND_MESSAGE or
529 * SEND_TRAILING_METADATA ops pending */
530 if (t->use_packet_coalescing) {
531 if (s->state.flush_cronet_when_ready) {
532 CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
533 bidirectional_stream_flush(stream);
534 }
535 }
536 gpr_mu_unlock(&s->mu);
537 execute_from_storage(s);
538 }
539
540 /*
541 Cronet callback
542 */
on_response_headers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * headers,const char * negotiated_protocol)543 static void on_response_headers_received(
544 bidirectional_stream* stream,
545 const bidirectional_stream_header_array* headers,
546 const char* negotiated_protocol) {
547 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
548 grpc_core::ExecCtx exec_ctx;
549 CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
550 headers, negotiated_protocol);
551 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
552
553 /* Identify if this is a header or a trailer (in a trailer-only response case)
554 */
555 for (size_t i = 0; i < headers->count; i++) {
556 if (0 == strcmp("grpc-status", headers->headers[i].key)) {
557 on_response_trailers_received(stream, headers);
558 return;
559 }
560 }
561
562 gpr_mu_lock(&s->mu);
563 convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
564 s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
565 if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
566 s->state.state_callback_received[OP_FAILED])) {
567 /* Do an extra read to trigger on_succeeded() callback in case connection
568 is closed */
569 GPR_ASSERT(s->state.rs.length_field_received == false);
570 s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
571 s->state.rs.compressed = false;
572 s->state.rs.received_bytes = 0;
573 s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
574 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
575 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
576 s->state.rs.remaining_bytes);
577 s->state.pending_read_from_cronet = true;
578 }
579 gpr_mu_unlock(&s->mu);
580 execute_from_storage(s);
581 }
582
583 /*
584 Cronet callback
585 */
on_write_completed(bidirectional_stream * stream,const char * data)586 static void on_write_completed(bidirectional_stream* stream, const char* data) {
587 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
588 grpc_core::ExecCtx exec_ctx;
589 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
590 CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
591 gpr_mu_lock(&s->mu);
592 if (s->state.ws.write_buffer) {
593 gpr_free(s->state.ws.write_buffer);
594 s->state.ws.write_buffer = nullptr;
595 }
596 s->state.state_callback_received[OP_SEND_MESSAGE] = true;
597 gpr_mu_unlock(&s->mu);
598 execute_from_storage(s);
599 }
600
601 /*
602 Cronet callback
603 */
on_read_completed(bidirectional_stream * stream,char * data,int count)604 static void on_read_completed(bidirectional_stream* stream, char* data,
605 int count) {
606 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
607 grpc_core::ExecCtx exec_ctx;
608 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
609 CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
610 count);
611 gpr_mu_lock(&s->mu);
612 s->state.pending_read_from_cronet = false;
613 s->state.state_callback_received[OP_RECV_MESSAGE] = true;
614 if (count > 0 && s->state.flush_read) {
615 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
616 bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
617 GRPC_FLUSH_READ_SIZE);
618 s->state.pending_read_from_cronet = true;
619 gpr_mu_unlock(&s->mu);
620 } else if (count > 0) {
621 s->state.rs.received_bytes += count;
622 s->state.rs.remaining_bytes -= count;
623 if (s->state.rs.remaining_bytes > 0) {
624 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
625 s->state.state_op_done[OP_READ_REQ_MADE] = true;
626 bidirectional_stream_read(
627 s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
628 s->state.rs.remaining_bytes);
629 s->state.pending_read_from_cronet = true;
630 gpr_mu_unlock(&s->mu);
631 } else {
632 gpr_mu_unlock(&s->mu);
633 execute_from_storage(s);
634 }
635 } else {
636 null_and_maybe_free_read_buffer(s);
637 s->state.rs.read_stream_closed = true;
638 gpr_mu_unlock(&s->mu);
639 execute_from_storage(s);
640 }
641 }
642
643 /*
644 Cronet callback
645 */
on_response_trailers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * trailers)646 static void on_response_trailers_received(
647 bidirectional_stream* stream,
648 const bidirectional_stream_header_array* trailers) {
649 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
650 grpc_core::ExecCtx exec_ctx;
651 CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
652 trailers);
653 stream_obj* s = static_cast<stream_obj*>(stream->annotation);
654 grpc_cronet_transport* t = s->curr_ct;
655 gpr_mu_lock(&s->mu);
656 s->state.rs.trailing_metadata_valid = false;
657 convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
658 if (trailers->count > 0) {
659 s->state.rs.trailing_metadata_valid = true;
660 }
661 for (size_t i = 0; i < trailers->count; i++) {
662 if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
663 0 != strcmp(trailers->headers[i].value, "0")) {
664 s->state.fail_state = true;
665 maybe_flush_read(s);
666 }
667 }
668 s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
669 /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
670 * trigger on_succeeded */
671 if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
672 !(s->state.state_op_done[OP_CANCEL_ERROR] ||
673 s->state.state_callback_received[OP_FAILED])) {
674 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
675 s->state.state_callback_received[OP_SEND_MESSAGE] = false;
676 bidirectional_stream_write(s->cbs, "", 0, true);
677 if (t->use_packet_coalescing) {
678 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
679 bidirectional_stream_flush(s->cbs);
680 }
681 s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
682
683 gpr_mu_unlock(&s->mu);
684 } else {
685 gpr_mu_unlock(&s->mu);
686 execute_from_storage(s);
687 }
688 }
689
690 /*
691 Utility function that takes the data from s->write_slice_buffer and assembles
692 into a contiguous byte stream with 5 byte gRPC header prepended.
693 */
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)694 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
695 char** pp_write_buffer,
696 size_t* p_write_buffer_size, uint32_t flags) {
697 grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
698 size_t length = GRPC_SLICE_LENGTH(slice);
699 *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
700 /* This is freed in the on_write_completed callback */
701 char* write_buffer =
702 static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
703 *pp_write_buffer = write_buffer;
704 uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
705 /* Append 5 byte header */
706 /* Compressed flag */
707 *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
708 /* Message length */
709 *p++ = static_cast<uint8_t>(length >> 24);
710 *p++ = static_cast<uint8_t>(length >> 16);
711 *p++ = static_cast<uint8_t>(length >> 8);
712 *p++ = static_cast<uint8_t>(length);
713 /* append actual data */
714 memcpy(p, GRPC_SLICE_START_PTR(slice), length);
715 grpc_slice_unref_internal(slice);
716 }
717
718 /*
719 Convert metadata in a format that Cronet can consume
720 */
convert_metadata_to_cronet_headers(grpc_metadata_batch * metadata,const char * host,char ** pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)721 static void convert_metadata_to_cronet_headers(
722 grpc_metadata_batch* metadata, const char* host, char** pp_url,
723 bidirectional_stream_header** pp_headers, size_t* p_num_headers,
724 const char** method) {
725 grpc_linked_mdelem* curr = metadata->list.head;
726 /* Walk the linked list and get number of header fields */
727 size_t num_headers_available = 0;
728 while (curr != nullptr) {
729 curr = curr->next;
730 num_headers_available++;
731 }
732 grpc_millis deadline = metadata->deadline;
733 if (deadline != GRPC_MILLIS_INF_FUTURE) {
734 num_headers_available++;
735 }
736 /* Allocate enough memory. It is freed in the on_stream_ready callback
737 */
738 bidirectional_stream_header* headers =
739 static_cast<bidirectional_stream_header*>(gpr_malloc(
740 sizeof(bidirectional_stream_header) * num_headers_available));
741 *pp_headers = headers;
742
743 /* Walk the linked list again, this time copying the header fields.
744 s->num_headers can be less than num_headers_available, as some headers
745 are not used for cronet.
746 TODO (makdharma): Eliminate need to traverse the LL second time for perf.
747 */
748 curr = metadata->list.head;
749 size_t num_headers = 0;
750 while (num_headers < num_headers_available) {
751 grpc_mdelem mdelem = curr->md;
752 curr = curr->next;
753 char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
754 char* value;
755 if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
756 grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
757 value = grpc_slice_to_c_string(wire_value);
758 grpc_slice_unref_internal(wire_value);
759 } else {
760 value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
761 }
762 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
763 grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
764 GRPC_MDSTR_AUTHORITY)) {
765 /* Cronet populates these fields on its own */
766 gpr_free(key);
767 gpr_free(value);
768 continue;
769 }
770 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
771 if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
772 *method = "PUT";
773 } else {
774 /* POST method in default*/
775 *method = "POST";
776 }
777 gpr_free(key);
778 gpr_free(value);
779 continue;
780 }
781 if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
782 /* Create URL by appending :path value to the hostname */
783 gpr_asprintf(pp_url, "https://%s%s", host, value);
784 gpr_free(key);
785 gpr_free(value);
786 continue;
787 }
788 CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
789 headers[num_headers].key = key;
790 headers[num_headers].value = value;
791 num_headers++;
792 if (curr == nullptr) {
793 break;
794 }
795 }
796 if (deadline != GRPC_MILLIS_INF_FUTURE) {
797 char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
798 char* value =
799 static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
800 grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
801 value);
802 headers[num_headers].key = key;
803 headers[num_headers].value = value;
804
805 num_headers++;
806 }
807
808 *p_num_headers = num_headers;
809 }
810
parse_grpc_header(const uint8_t * data,int * length,bool * compressed)811 static void parse_grpc_header(const uint8_t* data, int* length,
812 bool* compressed) {
813 const uint8_t c = *data;
814 const uint8_t* p = data + 1;
815 *compressed = ((c & 0x01) == 0x01);
816 *length = 0;
817 *length |= (*p++) << 24;
818 *length |= (*p++) << 16;
819 *length |= (*p++) << 8;
820 *length |= (*p++);
821 }
822
header_has_authority(grpc_linked_mdelem * head)823 static bool header_has_authority(grpc_linked_mdelem* head) {
824 while (head != nullptr) {
825 if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
826 GRPC_MDSTR_AUTHORITY)) {
827 return true;
828 }
829 head = head->next;
830 }
831 return false;
832 }
833
834 /*
835 Op Execution: Decide if one of the actions contained in the stream op can be
836 executed. This is the heart of the state machine.
837 */
op_can_be_run(grpc_transport_stream_op_batch * curr_op,struct stream_obj * s,struct op_state * op_state,enum e_op_id op_id)838 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
839 struct stream_obj* s, struct op_state* op_state,
840 enum e_op_id op_id) {
841 struct op_state* stream_state = &s->state;
842 grpc_cronet_transport* t = s->curr_ct;
843 bool result = true;
844 /* When call is canceled, every op can be run, except under following
845 conditions
846 */
847 bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
848 stream_state->state_callback_received[OP_FAILED];
849 if (is_canceled_or_failed) {
850 if (op_id == OP_SEND_INITIAL_METADATA) {
851 CRONET_LOG(GPR_DEBUG, "Because");
852 result = false;
853 }
854 if (op_id == OP_SEND_MESSAGE) {
855 CRONET_LOG(GPR_DEBUG, "Because");
856 result = false;
857 }
858 if (op_id == OP_SEND_TRAILING_METADATA) {
859 CRONET_LOG(GPR_DEBUG, "Because");
860 result = false;
861 }
862 if (op_id == OP_CANCEL_ERROR) {
863 CRONET_LOG(GPR_DEBUG, "Because");
864 result = false;
865 }
866 /* already executed */
867 if (op_id == OP_RECV_INITIAL_METADATA &&
868 stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
869 CRONET_LOG(GPR_DEBUG, "Because");
870 result = false;
871 }
872 if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
873 CRONET_LOG(GPR_DEBUG, "Because");
874 result = false;
875 }
876 if (op_id == OP_RECV_TRAILING_METADATA &&
877 stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
878 CRONET_LOG(GPR_DEBUG, "Because");
879 result = false;
880 }
881 /* ON_COMPLETE can be processed if one of the following conditions is met:
882 * 1. the stream failed
883 * 2. the stream is cancelled, and the callback is received
884 * 3. the stream succeeded before cancel is effective
885 * 4. the stream is cancelled, and the stream is never started */
886 if (op_id == OP_ON_COMPLETE &&
887 !(stream_state->state_callback_received[OP_FAILED] ||
888 stream_state->state_callback_received[OP_CANCELED] ||
889 stream_state->state_callback_received[OP_SUCCEEDED] ||
890 !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
891 CRONET_LOG(GPR_DEBUG, "Because");
892 result = false;
893 }
894 } else if (op_id == OP_SEND_INITIAL_METADATA) {
895 /* already executed */
896 if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
897 } else if (op_id == OP_RECV_INITIAL_METADATA) {
898 /* already executed */
899 if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
900 /* we haven't sent headers yet. */
901 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
902 result = false;
903 /* we haven't received headers yet. */
904 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
905 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
906 result = false;
907 } else if (op_id == OP_SEND_MESSAGE) {
908 /* already executed (note we're checking op specific state, not stream
909 state) */
910 if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
911 /* we haven't sent headers yet. */
912 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
913 result = false;
914 } else if (op_id == OP_RECV_MESSAGE) {
915 /* already executed */
916 if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
917 /* we haven't received headers yet. */
918 else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
919 !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
920 result = false;
921 } else if (op_id == OP_RECV_TRAILING_METADATA) {
922 /* already executed */
923 if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
924 /* we have asked for but haven't received message yet. */
925 else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
926 !stream_state->state_op_done[OP_RECV_MESSAGE])
927 result = false;
928 /* we haven't received trailers yet. */
929 else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
930 result = false;
931 /* we haven't received on_succeeded yet. */
932 else if (!stream_state->state_callback_received[OP_SUCCEEDED])
933 result = false;
934 } else if (op_id == OP_SEND_TRAILING_METADATA) {
935 /* already executed */
936 if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
937 /* we haven't sent initial metadata yet */
938 else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
939 result = false;
940 /* we haven't sent message yet */
941 else if (stream_state->pending_send_message &&
942 !stream_state->state_op_done[OP_SEND_MESSAGE])
943 result = false;
944 /* we haven't got on_write_completed for the send yet */
945 else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
946 !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
947 !(t->use_packet_coalescing &&
948 stream_state->pending_write_for_trailer))
949 result = false;
950 } else if (op_id == OP_CANCEL_ERROR) {
951 /* already executed */
952 if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
953 } else if (op_id == OP_ON_COMPLETE) {
954 /* already executed (note we're checking op specific state, not stream
955 state) */
956 if (op_state->state_op_done[OP_ON_COMPLETE]) {
957 CRONET_LOG(GPR_DEBUG, "Because");
958 result = false;
959 }
960 /* Check if every op that was asked for is done. */
961 /* TODO(muxi): We should not consider the recv ops here, since they
962 * have their own callbacks. We should invoke a batch's on_complete
963 * as soon as all of the batch's send ops are complete, even if
964 * there are still recv ops pending. */
965 else if (curr_op->send_initial_metadata &&
966 !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
967 CRONET_LOG(GPR_DEBUG, "Because");
968 result = false;
969 } else if (curr_op->send_message &&
970 !op_state->state_op_done[OP_SEND_MESSAGE]) {
971 CRONET_LOG(GPR_DEBUG, "Because");
972 result = false;
973 } else if (curr_op->send_message &&
974 !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
975 CRONET_LOG(GPR_DEBUG, "Because");
976 result = false;
977 } else if (curr_op->send_trailing_metadata &&
978 !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
979 CRONET_LOG(GPR_DEBUG, "Because");
980 result = false;
981 } else if (curr_op->recv_initial_metadata &&
982 !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
983 CRONET_LOG(GPR_DEBUG, "Because");
984 result = false;
985 } else if (curr_op->recv_message &&
986 !op_state->state_op_done[OP_RECV_MESSAGE]) {
987 CRONET_LOG(GPR_DEBUG, "Because");
988 result = false;
989 } else if (curr_op->cancel_stream &&
990 !stream_state->state_callback_received[OP_CANCELED]) {
991 CRONET_LOG(GPR_DEBUG, "Because");
992 result = false;
993 } else if (curr_op->recv_trailing_metadata) {
994 /* We aren't done with trailing metadata yet */
995 if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
996 CRONET_LOG(GPR_DEBUG, "Because");
997 result = false;
998 }
999 /* We've asked for actual message in an earlier op, and it hasn't been
1000 delivered yet. */
1001 else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1002 /* If this op is not the one asking for read, (which means some earlier
1003 op has asked), and the read hasn't been delivered. */
1004 if (!curr_op->recv_message &&
1005 !stream_state->state_callback_received[OP_SUCCEEDED]) {
1006 CRONET_LOG(GPR_DEBUG, "Because");
1007 result = false;
1008 }
1009 }
1010 }
1011 /* We should see at least one on_write_completed for the trailers that we
1012 sent */
1013 else if (curr_op->send_trailing_metadata &&
1014 !stream_state->state_callback_received[OP_SEND_MESSAGE])
1015 result = false;
1016 }
1017 CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1018 result ? "YES" : "NO");
1019 return result;
1020 }
1021
1022 /*
1023 TODO (makdharma): Break down this function in smaller chunks for readability.
1024 */
execute_stream_op(struct op_and_state * oas)1025 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1026 grpc_transport_stream_op_batch* stream_op = &oas->op;
1027 struct stream_obj* s = oas->s;
1028 grpc_cronet_transport* t = s->curr_ct;
1029 struct op_state* stream_state = &s->state;
1030 enum e_op_result result = NO_ACTION_POSSIBLE;
1031 if (stream_op->send_initial_metadata &&
1032 op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1033 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1034 /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1035 * on_failed */
1036 GPR_ASSERT(s->cbs == nullptr);
1037 GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1038 s->cbs =
1039 bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1040 CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1041 if (t->use_packet_coalescing) {
1042 bidirectional_stream_disable_auto_flush(s->cbs, true);
1043 bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1044 }
1045 char* url = nullptr;
1046 const char* method = "POST";
1047 s->header_array.headers = nullptr;
1048 convert_metadata_to_cronet_headers(
1049 stream_op->payload->send_initial_metadata.send_initial_metadata,
1050 t->host, &url, &s->header_array.headers, &s->header_array.count,
1051 &method);
1052 s->header_array.capacity = s->header_array.count;
1053 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
1054 bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
1055 if (url) {
1056 gpr_free(url);
1057 }
1058 unsigned int header_index;
1059 for (header_index = 0; header_index < s->header_array.count;
1060 header_index++) {
1061 gpr_free((void*)s->header_array.headers[header_index].key);
1062 gpr_free((void*)s->header_array.headers[header_index].value);
1063 }
1064 stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1065 if (t->use_packet_coalescing) {
1066 if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1067 s->state.flush_cronet_when_ready = true;
1068 }
1069 }
1070 result = ACTION_TAKEN_WITH_CALLBACK;
1071 } else if (stream_op->send_message &&
1072 op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1073 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
1074 stream_state->pending_send_message = false;
1075 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1076 stream_state->state_callback_received[OP_FAILED] ||
1077 stream_state->state_callback_received[OP_SUCCEEDED]) {
1078 result = NO_ACTION_POSSIBLE;
1079 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1080 } else {
1081 grpc_slice_buffer write_slice_buffer;
1082 grpc_slice slice;
1083 grpc_slice_buffer_init(&write_slice_buffer);
1084 if (1 != stream_op->payload->send_message.send_message->Next(
1085 stream_op->payload->send_message.send_message->length(),
1086 nullptr)) {
1087 /* Should never reach here */
1088 GPR_ASSERT(false);
1089 }
1090 if (GRPC_ERROR_NONE !=
1091 stream_op->payload->send_message.send_message->Pull(&slice)) {
1092 /* Should never reach here */
1093 GPR_ASSERT(false);
1094 }
1095 grpc_slice_buffer_add(&write_slice_buffer, slice);
1096 if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
1097 /* Empty request not handled yet */
1098 gpr_log(GPR_ERROR, "Empty request is not supported");
1099 GPR_ASSERT(write_slice_buffer.count == 1);
1100 }
1101 if (write_slice_buffer.count > 0) {
1102 size_t write_buffer_size;
1103 create_grpc_frame(
1104 &write_slice_buffer, &stream_state->ws.write_buffer,
1105 &write_buffer_size,
1106 stream_op->payload->send_message.send_message->flags());
1107 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1108 stream_state->ws.write_buffer);
1109 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1110 bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1111 static_cast<int>(write_buffer_size), false);
1112 grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1113 if (t->use_packet_coalescing) {
1114 if (!stream_op->send_trailing_metadata) {
1115 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1116 bidirectional_stream_flush(s->cbs);
1117 result = ACTION_TAKEN_WITH_CALLBACK;
1118 } else {
1119 stream_state->pending_write_for_trailer = true;
1120 result = ACTION_TAKEN_NO_CALLBACK;
1121 }
1122 } else {
1123 result = ACTION_TAKEN_WITH_CALLBACK;
1124 }
1125 } else {
1126 result = NO_ACTION_POSSIBLE;
1127 }
1128 }
1129 stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1130 oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1131 stream_op->payload->send_message.send_message.reset();
1132 } else if (stream_op->send_trailing_metadata &&
1133 op_can_be_run(stream_op, s, &oas->state,
1134 OP_SEND_TRAILING_METADATA)) {
1135 CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
1136 if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1137 stream_state->state_callback_received[OP_FAILED] ||
1138 stream_state->state_callback_received[OP_SUCCEEDED]) {
1139 result = NO_ACTION_POSSIBLE;
1140 CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1141 } else {
1142 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1143 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1144 bidirectional_stream_write(s->cbs, "", 0, true);
1145 if (t->use_packet_coalescing) {
1146 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1147 bidirectional_stream_flush(s->cbs);
1148 }
1149 result = ACTION_TAKEN_WITH_CALLBACK;
1150 }
1151 stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1152 } else if (stream_op->recv_initial_metadata &&
1153 op_can_be_run(stream_op, s, &oas->state,
1154 OP_RECV_INITIAL_METADATA)) {
1155 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
1156 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1157 grpc_core::ExecCtx::Run(
1158 DEBUG_LOCATION,
1159 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1160 GRPC_ERROR_NONE);
1161 } else if (stream_state->state_callback_received[OP_FAILED]) {
1162 grpc_core::ExecCtx::Run(
1163 DEBUG_LOCATION,
1164 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1165 GRPC_ERROR_NONE);
1166 } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1167 grpc_core::ExecCtx::Run(
1168 DEBUG_LOCATION,
1169 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1170 GRPC_ERROR_NONE);
1171 } else {
1172 grpc_chttp2_incoming_metadata_buffer_publish(
1173 &oas->s->state.rs.initial_metadata,
1174 stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1175 grpc_core::ExecCtx::Run(
1176 DEBUG_LOCATION,
1177 stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1178 GRPC_ERROR_NONE);
1179 }
1180 stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1181 result = ACTION_TAKEN_NO_CALLBACK;
1182 } else if (stream_op->recv_message &&
1183 op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1184 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
1185 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1186 CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1187 grpc_core::ExecCtx::Run(
1188 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1189 GRPC_ERROR_NONE);
1190 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1191 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1192 result = ACTION_TAKEN_NO_CALLBACK;
1193 } else if (stream_state->state_callback_received[OP_FAILED]) {
1194 CRONET_LOG(GPR_DEBUG, "Stream failed.");
1195 grpc_core::ExecCtx::Run(
1196 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1197 GRPC_ERROR_NONE);
1198 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1199 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1200 result = ACTION_TAKEN_NO_CALLBACK;
1201 } else if (stream_state->rs.read_stream_closed == true) {
1202 /* No more data will be received */
1203 CRONET_LOG(GPR_DEBUG, "read stream closed");
1204 grpc_core::ExecCtx::Run(
1205 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1206 GRPC_ERROR_NONE);
1207 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1208 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1209 result = ACTION_TAKEN_NO_CALLBACK;
1210 } else if (stream_state->flush_read) {
1211 CRONET_LOG(GPR_DEBUG, "flush read");
1212 grpc_core::ExecCtx::Run(
1213 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1214 GRPC_ERROR_NONE);
1215 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1216 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1217 result = ACTION_TAKEN_NO_CALLBACK;
1218 } else if (stream_state->rs.length_field_received == false) {
1219 if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1220 stream_state->rs.remaining_bytes == 0) {
1221 /* Start a read operation for data */
1222 stream_state->rs.length_field_received = true;
1223 parse_grpc_header(
1224 reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1225 &stream_state->rs.length_field, &stream_state->rs.compressed);
1226 CRONET_LOG(GPR_DEBUG, "length field = %d",
1227 stream_state->rs.length_field);
1228 if (stream_state->rs.length_field > 0) {
1229 stream_state->rs.read_buffer = static_cast<char*>(
1230 gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1231 GPR_ASSERT(stream_state->rs.read_buffer);
1232 stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1233 stream_state->rs.received_bytes = 0;
1234 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1235 stream_state->state_op_done[OP_READ_REQ_MADE] =
1236 true; /* Indicates that at least one read request has been made */
1237 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1238 stream_state->rs.remaining_bytes);
1239 stream_state->pending_read_from_cronet = true;
1240 result = ACTION_TAKEN_WITH_CALLBACK;
1241 } else {
1242 stream_state->rs.remaining_bytes = 0;
1243 CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1244 /* Clean up read_slice_buffer in case there is unread data. */
1245 grpc_slice_buffer_destroy_internal(
1246 &stream_state->rs.read_slice_buffer);
1247 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1248 uint32_t flags = 0;
1249 if (stream_state->rs.compressed) {
1250 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1251 }
1252 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1253 stream_op->payload->recv_message.recv_message->reset(
1254 stream_state->rs.sbs.get());
1255 grpc_core::ExecCtx::Run(
1256 DEBUG_LOCATION,
1257 stream_op->payload->recv_message.recv_message_ready,
1258 GRPC_ERROR_NONE);
1259 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1260 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1261
1262 /* Extra read to trigger on_succeed */
1263 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1264 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1265 stream_state->rs.received_bytes = 0;
1266 stream_state->rs.compressed = false;
1267 stream_state->rs.length_field_received = false;
1268 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1269 stream_state->state_op_done[OP_READ_REQ_MADE] =
1270 true; /* Indicates that at least one read request has been made */
1271 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1272 stream_state->rs.remaining_bytes);
1273 stream_state->pending_read_from_cronet = true;
1274 result = ACTION_TAKEN_NO_CALLBACK;
1275 }
1276 } else if (stream_state->rs.remaining_bytes == 0) {
1277 /* Start a read operation for first 5 bytes (GRPC header) */
1278 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1279 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1280 stream_state->rs.received_bytes = 0;
1281 stream_state->rs.compressed = false;
1282 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1283 stream_state->state_op_done[OP_READ_REQ_MADE] =
1284 true; /* Indicates that at least one read request has been made */
1285 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1286 stream_state->rs.remaining_bytes);
1287 stream_state->pending_read_from_cronet = true;
1288 result = ACTION_TAKEN_WITH_CALLBACK;
1289 } else {
1290 result = NO_ACTION_POSSIBLE;
1291 }
1292 } else if (stream_state->rs.remaining_bytes == 0) {
1293 CRONET_LOG(GPR_DEBUG, "read operation complete");
1294 grpc_slice read_data_slice =
1295 GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1296 uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1297 memcpy(dst_p, stream_state->rs.read_buffer,
1298 static_cast<size_t>(stream_state->rs.length_field));
1299 null_and_maybe_free_read_buffer(s);
1300 /* Clean up read_slice_buffer in case there is unread data. */
1301 grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1302 grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1303 grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1304 read_data_slice);
1305 uint32_t flags = 0;
1306 if (stream_state->rs.compressed) {
1307 flags = GRPC_WRITE_INTERNAL_COMPRESS;
1308 }
1309 stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1310 stream_op->payload->recv_message.recv_message->reset(
1311 stream_state->rs.sbs.get());
1312 grpc_core::ExecCtx::Run(
1313 DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1314 GRPC_ERROR_NONE);
1315 stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1316 oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1317 /* Do an extra read to trigger on_succeeded() callback in case connection
1318 is closed */
1319 stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1320 stream_state->rs.compressed = false;
1321 stream_state->rs.received_bytes = 0;
1322 stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1323 stream_state->rs.length_field_received = false;
1324 CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1325 bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1326 stream_state->rs.remaining_bytes);
1327 stream_state->pending_read_from_cronet = true;
1328 result = ACTION_TAKEN_NO_CALLBACK;
1329 }
1330 } else if (stream_op->recv_trailing_metadata &&
1331 op_can_be_run(stream_op, s, &oas->state,
1332 OP_RECV_TRAILING_METADATA)) {
1333 CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
1334 grpc_error* error = GRPC_ERROR_NONE;
1335 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1336 error = GRPC_ERROR_REF(stream_state->cancel_error);
1337 } else if (stream_state->state_callback_received[OP_FAILED]) {
1338 error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
1339 } else if (oas->s->state.rs.trailing_metadata_valid) {
1340 grpc_chttp2_incoming_metadata_buffer_publish(
1341 &oas->s->state.rs.trailing_metadata,
1342 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1343 stream_state->rs.trailing_metadata_valid = false;
1344 }
1345 grpc_core::ExecCtx::Run(
1346 DEBUG_LOCATION,
1347 stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1348 error);
1349 stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1350 result = ACTION_TAKEN_NO_CALLBACK;
1351 } else if (stream_op->cancel_stream &&
1352 op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1353 CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
1354 if (s->cbs) {
1355 CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1356 bidirectional_stream_cancel(s->cbs);
1357 result = ACTION_TAKEN_WITH_CALLBACK;
1358 } else {
1359 result = ACTION_TAKEN_NO_CALLBACK;
1360 }
1361 stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1362 if (!stream_state->cancel_error) {
1363 stream_state->cancel_error =
1364 GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1365 }
1366 } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1367 CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
1368 if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1369 if (stream_op->on_complete) {
1370 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1371 GRPC_ERROR_REF(stream_state->cancel_error));
1372 }
1373 } else if (stream_state->state_callback_received[OP_FAILED]) {
1374 if (stream_op->on_complete) {
1375 grpc_core::ExecCtx::Run(
1376 DEBUG_LOCATION, stream_op->on_complete,
1377 make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
1378 }
1379 } else {
1380 /* All actions in this stream_op are complete. Call the on_complete
1381 * callback
1382 */
1383 if (stream_op->on_complete) {
1384 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1385 GRPC_ERROR_NONE);
1386 }
1387 }
1388 oas->state.state_op_done[OP_ON_COMPLETE] = true;
1389 oas->done = true;
1390 /* reset any send message state, only if this ON_COMPLETE is about a send.
1391 */
1392 if (stream_op->send_message) {
1393 stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1394 stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1395 }
1396 result = ACTION_TAKEN_NO_CALLBACK;
1397 /* If this is the on_complete callback being called for a received message -
1398 make a note */
1399 if (stream_op->recv_message)
1400 stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1401 } else {
1402 result = NO_ACTION_POSSIBLE;
1403 }
1404 return result;
1405 }
1406
1407 /*
1408 Functions used by upper layers to access transport functionality.
1409 */
1410
stream_obj(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,grpc_core::Arena * arena)1411 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1412 grpc_stream_refcount* refcount,
1413 grpc_core::Arena* arena)
1414 : arena(arena),
1415 curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1416 curr_gs(gs),
1417 state(arena),
1418 refcount(refcount) {
1419 GRPC_CRONET_STREAM_REF(this, "cronet transport");
1420 gpr_mu_init(&mu);
1421 }
1422
~stream_obj()1423 inline stream_obj::~stream_obj() {
1424 null_and_maybe_free_read_buffer(this);
1425 /* Clean up read_slice_buffer in case there is unread data. */
1426 grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1427 GRPC_ERROR_UNREF(state.cancel_error);
1428 }
1429
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)1430 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1431 grpc_stream_refcount* refcount, const void* server_data,
1432 grpc_core::Arena* arena) {
1433 new (gs) stream_obj(gt, gs, refcount, arena);
1434 return 0;
1435 }
1436
set_pollset_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)1437 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1438 grpc_pollset* pollset) {}
1439
set_pollset_set_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)1440 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1441 grpc_pollset_set* pollset_set) {}
1442
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1443 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1444 grpc_transport_stream_op_batch* op) {
1445 CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1446 if (op->send_initial_metadata &&
1447 header_has_authority(op->payload->send_initial_metadata
1448 .send_initial_metadata->list.head)) {
1449 /* Cronet does not support :authority header field. We cancel the call when
1450 this field is present in metadata */
1451 if (op->recv_initial_metadata) {
1452 grpc_core::ExecCtx::Run(
1453 DEBUG_LOCATION,
1454 op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1455 GRPC_ERROR_CANCELLED);
1456 }
1457 if (op->recv_message) {
1458 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1459 op->payload->recv_message.recv_message_ready,
1460 GRPC_ERROR_CANCELLED);
1461 }
1462 if (op->recv_trailing_metadata) {
1463 grpc_core::ExecCtx::Run(
1464 DEBUG_LOCATION,
1465 op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1466 GRPC_ERROR_CANCELLED);
1467 }
1468 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1469 GRPC_ERROR_CANCELLED);
1470 return;
1471 }
1472 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1473 add_to_storage(s, op);
1474 execute_from_storage(s);
1475 }
1476
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1477 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1478 grpc_closure* then_schedule_closure) {
1479 stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1480 s->~stream_obj();
1481 grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1482 GRPC_ERROR_NONE);
1483 }
1484
destroy_transport(grpc_transport * gt)1485 static void destroy_transport(grpc_transport* gt) {}
1486
get_endpoint(grpc_transport * gt)1487 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1488
perform_op(grpc_transport * gt,grpc_transport_op * op)1489 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1490
1491 static const grpc_transport_vtable grpc_cronet_vtable = {
1492 sizeof(stream_obj),
1493 "cronet_http",
1494 init_stream,
1495 set_pollset_do_nothing,
1496 set_pollset_set_do_nothing,
1497 perform_stream_op,
1498 perform_op,
1499 destroy_stream,
1500 destroy_transport,
1501 get_endpoint};
1502
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void * reserved)1503 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1504 const grpc_channel_args* args,
1505 void* reserved) {
1506 grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1507 gpr_malloc(sizeof(grpc_cronet_transport)));
1508 if (!ct) {
1509 goto error;
1510 }
1511 ct->base.vtable = &grpc_cronet_vtable;
1512 ct->engine = static_cast<stream_engine*>(engine);
1513 ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1514 if (!ct->host) {
1515 goto error;
1516 }
1517 strcpy(ct->host, target);
1518
1519 ct->use_packet_coalescing = true;
1520 if (args) {
1521 for (size_t i = 0; i < args->num_args; i++) {
1522 if (0 ==
1523 strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1524 if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1525 gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1526 GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1527 } else {
1528 ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1529 }
1530 }
1531 }
1532 }
1533
1534 return &ct->base;
1535
1536 error:
1537 if (ct) {
1538 if (ct->host) {
1539 gpr_free(ct->host);
1540 }
1541 gpr_free(ct);
1542 }
1543
1544 return nullptr;
1545 }
1546