1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <string.h>
22 
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
27 
28 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
29 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
30 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
31 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/gprpp/manual_constructor.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 #include "src/core/lib/surface/channel.h"
40 #include "src/core/lib/surface/validate_metadata.h"
41 #include "src/core/lib/transport/metadata_batch.h"
42 #include "src/core/lib/transport/static_metadata.h"
43 #include "src/core/lib/transport/timeout_encoding.h"
44 #include "src/core/lib/transport/transport_impl.h"
45 
46 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
47 
48 #define GRPC_HEADER_SIZE_IN_BYTES 5
49 #define GRPC_FLUSH_READ_SIZE 4096
50 
51 grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
52 #define CRONET_LOG(...)                                    \
53   do {                                                     \
54     if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
55   } while (0)
56 
57 enum e_op_result {
58   ACTION_TAKEN_WITH_CALLBACK,
59   ACTION_TAKEN_NO_CALLBACK,
60   NO_ACTION_POSSIBLE
61 };
62 
63 enum e_op_id {
64   OP_SEND_INITIAL_METADATA = 0,
65   OP_SEND_MESSAGE,
66   OP_SEND_TRAILING_METADATA,
67   OP_RECV_MESSAGE,
68   OP_RECV_INITIAL_METADATA,
69   OP_RECV_TRAILING_METADATA,
70   OP_CANCEL_ERROR,
71   OP_ON_COMPLETE,
72   OP_FAILED,
73   OP_SUCCEEDED,
74   OP_CANCELED,
75   OP_RECV_MESSAGE_AND_ON_COMPLETE,
76   OP_READ_REQ_MADE,
77   OP_NUM_OPS
78 };
79 
80 /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
81 
82 static void on_stream_ready(bidirectional_stream*);
83 static void on_response_headers_received(
84     bidirectional_stream*, const bidirectional_stream_header_array*,
85     const char*);
86 static void on_write_completed(bidirectional_stream*, const char*);
87 static void on_read_completed(bidirectional_stream*, char*, int);
88 static void on_response_trailers_received(
89     bidirectional_stream*, const bidirectional_stream_header_array*);
90 static void on_succeeded(bidirectional_stream*);
91 static void on_failed(bidirectional_stream*, int);
92 static void on_canceled(bidirectional_stream*);
93 static bidirectional_stream_callback cronet_callbacks = {
94     on_stream_ready,
95     on_response_headers_received,
96     on_read_completed,
97     on_write_completed,
98     on_response_trailers_received,
99     on_succeeded,
100     on_failed,
101     on_canceled};
102 
103 /* Cronet transport object */
104 struct grpc_cronet_transport {
105   grpc_transport base; /* must be first element in this structure */
106   stream_engine* engine;
107   char* host;
108   bool use_packet_coalescing;
109 };
110 typedef struct grpc_cronet_transport grpc_cronet_transport;
111 
112 /* TODO (makdharma): reorder structure for memory efficiency per
113    http://www.catb.org/esr/structure-packing/#_structure_reordering: */
114 struct read_state {
read_stateread_state115   read_state(grpc_core::Arena* arena)
116       : trailing_metadata(arena), initial_metadata(arena) {
117     grpc_slice_buffer_init(&read_slice_buffer);
118   }
119 
120   /* vars to store data coming from server */
121   char* read_buffer = nullptr;
122   bool length_field_received = false;
123   int received_bytes = 0;
124   int remaining_bytes = 0;
125   int length_field = 0;
126   bool compressed = 0;
127   char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
128   char* payload_field = nullptr;
129   bool read_stream_closed = 0;
130 
131   /* vars for holding data destined for the application */
132   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
133   grpc_slice_buffer read_slice_buffer;
134 
135   /* vars for trailing metadata */
136   grpc_chttp2_incoming_metadata_buffer trailing_metadata;
137   bool trailing_metadata_valid = false;
138 
139   /* vars for initial metadata */
140   grpc_chttp2_incoming_metadata_buffer initial_metadata;
141 };
142 
143 struct write_state {
144   char* write_buffer = nullptr;
145 };
146 
147 /* track state of one stream op */
148 struct op_state {
op_stateop_state149   op_state(grpc_core::Arena* arena) : rs(arena) {}
150 
151   bool state_op_done[OP_NUM_OPS] = {};
152   bool state_callback_received[OP_NUM_OPS] = {};
153   /* A non-zero gRPC status code has been seen */
154   bool fail_state = false;
155   /* Transport is discarding all buffered messages */
156   bool flush_read = false;
157   bool flush_cronet_when_ready = false;
158   bool pending_write_for_trailer = false;
159   bool pending_send_message = false;
160   /* User requested RECV_TRAILING_METADATA */
161   bool pending_recv_trailing_metadata = false;
162   /* Cronet has not issued a callback of a bidirectional read */
163   bool pending_read_from_cronet = false;
164   grpc_error* cancel_error = GRPC_ERROR_NONE;
165   /* data structure for storing data coming from server */
166   struct read_state rs;
167   /* data structure for storing data going to the server */
168   struct write_state ws;
169 };
170 
171 struct stream_obj;
172 
173 struct op_and_state {
174   op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);
175 
176   grpc_transport_stream_op_batch op;
177   struct op_state state;
178   bool done = false;
179   struct stream_obj* s; /* Pointer back to the stream object */
180   /* next op_and_state in the linked list */
181   struct op_and_state* next = nullptr;
182 };
183 
184 struct op_storage {
185   int num_pending_ops = 0;
186   struct op_and_state* head = nullptr;
187 };
188 
189 struct stream_obj {
190   stream_obj(grpc_transport* gt, grpc_stream* gs,
191              grpc_stream_refcount* refcount, grpc_core::Arena* arena);
192   ~stream_obj();
193 
194   grpc_core::Arena* arena;
195   struct op_and_state* oas = nullptr;
196   grpc_transport_stream_op_batch* curr_op = nullptr;
197   grpc_cronet_transport* curr_ct;
198   grpc_stream* curr_gs;
199   bidirectional_stream* cbs = nullptr;
200   bidirectional_stream_header_array header_array =
201       bidirectional_stream_header_array();  // Zero-initialize the structure.
202 
203   /* Stream level state. Some state will be tracked both at stream and stream_op
204    * level */
205   struct op_state state;
206 
207   /* OP storage */
208   struct op_storage storage;
209 
210   /* Mutex to protect storage */
211   gpr_mu mu;
212 
213   /* Refcount object of the stream */
214   grpc_stream_refcount* refcount;
215 };
216 
217 #ifndef NDEBUG
218 #define GRPC_CRONET_STREAM_REF(stream, reason) \
219   grpc_cronet_stream_ref((stream), (reason))
220 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
221   grpc_cronet_stream_unref((stream), (reason))
grpc_cronet_stream_ref(stream_obj * s,const char * reason)222 void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
223   grpc_stream_ref(s->refcount, reason);
224 }
grpc_cronet_stream_unref(stream_obj * s,const char * reason)225 void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
226   grpc_stream_unref(s->refcount, reason);
227 }
228 #else
229 #define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
230 #define GRPC_CRONET_STREAM_UNREF(stream, reason) \
231   grpc_cronet_stream_unref((stream))
grpc_cronet_stream_ref(stream_obj * s)232 void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
grpc_cronet_stream_unref(stream_obj * s)233 void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
234 #endif
235 
236 static enum e_op_result execute_stream_op(struct op_and_state* oas);
237 
238 /*
239   Utility function to translate enum into string for printing
240 */
op_result_string(enum e_op_result i)241 static const char* op_result_string(enum e_op_result i) {
242   switch (i) {
243     case ACTION_TAKEN_WITH_CALLBACK:
244       return "ACTION_TAKEN_WITH_CALLBACK";
245     case ACTION_TAKEN_NO_CALLBACK:
246       return "ACTION_TAKEN_NO_CALLBACK";
247     case NO_ACTION_POSSIBLE:
248       return "NO_ACTION_POSSIBLE";
249   }
250   GPR_UNREACHABLE_CODE(return "UNKNOWN");
251 }
252 
op_id_string(enum e_op_id i)253 static const char* op_id_string(enum e_op_id i) {
254   switch (i) {
255     case OP_SEND_INITIAL_METADATA:
256       return "OP_SEND_INITIAL_METADATA";
257     case OP_SEND_MESSAGE:
258       return "OP_SEND_MESSAGE";
259     case OP_SEND_TRAILING_METADATA:
260       return "OP_SEND_TRAILING_METADATA";
261     case OP_RECV_MESSAGE:
262       return "OP_RECV_MESSAGE";
263     case OP_RECV_INITIAL_METADATA:
264       return "OP_RECV_INITIAL_METADATA";
265     case OP_RECV_TRAILING_METADATA:
266       return "OP_RECV_TRAILING_METADATA";
267     case OP_CANCEL_ERROR:
268       return "OP_CANCEL_ERROR";
269     case OP_ON_COMPLETE:
270       return "OP_ON_COMPLETE";
271     case OP_FAILED:
272       return "OP_FAILED";
273     case OP_SUCCEEDED:
274       return "OP_SUCCEEDED";
275     case OP_CANCELED:
276       return "OP_CANCELED";
277     case OP_RECV_MESSAGE_AND_ON_COMPLETE:
278       return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
279     case OP_READ_REQ_MADE:
280       return "OP_READ_REQ_MADE";
281     case OP_NUM_OPS:
282       return "OP_NUM_OPS";
283   }
284   return "UNKNOWN";
285 }
286 
null_and_maybe_free_read_buffer(stream_obj * s)287 static void null_and_maybe_free_read_buffer(stream_obj* s) {
288   if (s->state.rs.read_buffer &&
289       s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
290     gpr_free(s->state.rs.read_buffer);
291   }
292   s->state.rs.read_buffer = nullptr;
293 }
294 
maybe_flush_read(stream_obj * s)295 static void maybe_flush_read(stream_obj* s) {
296   /* To enter flush read state (discarding all the buffered messages in
297    * transport layer), two conditions must be satisfied: 1) non-zero grpc status
298    * has been received, and 2) an op requesting the status code
299    * (RECV_TRAILING_METADATA) is issued by the user. (See
300    * doc/status_ordering.md) */
301   /* Whenever the evaluation of any of the two condition is changed, we check
302    * whether we should enter the flush read state. */
303   if (s->state.pending_recv_trailing_metadata && s->state.fail_state) {
304     if (!s->state.flush_read && !s->state.rs.read_stream_closed) {
305       CRONET_LOG(GPR_DEBUG, "%p: Flush read", s);
306       s->state.flush_read = true;
307       null_and_maybe_free_read_buffer(s);
308       s->state.rs.read_buffer =
309           static_cast<char*>(gpr_malloc(GRPC_FLUSH_READ_SIZE));
310       if (!s->state.pending_read_from_cronet) {
311         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
312         bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
313                                   GRPC_FLUSH_READ_SIZE);
314         s->state.pending_read_from_cronet = true;
315       }
316     }
317   }
318 }
319 
make_error_with_desc(int error_code,const char * desc)320 static grpc_error* make_error_with_desc(int error_code, const char* desc) {
321   grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
322   error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
323   return error;
324 }
325 
op_and_state(stream_obj * s,const grpc_transport_stream_op_batch & op)326 inline op_and_state::op_and_state(stream_obj* s,
327                                   const grpc_transport_stream_op_batch& op)
328     : op(op), state(s->arena), s(s) {}
329 
330 /*
331   Add a new stream op to op storage.
332 */
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)333 static void add_to_storage(struct stream_obj* s,
334                            grpc_transport_stream_op_batch* op) {
335   struct op_storage* storage = &s->storage;
336   /* add new op at the beginning of the linked list. The memory is freed
337   in remove_from_storage */
338   op_and_state* new_op = new op_and_state(s, *op);
339   gpr_mu_lock(&s->mu);
340   new_op->next = storage->head;
341   storage->head = new_op;
342   storage->num_pending_ops++;
343   if (op->send_message) {
344     s->state.pending_send_message = true;
345   }
346   if (op->recv_trailing_metadata) {
347     s->state.pending_recv_trailing_metadata = true;
348     maybe_flush_read(s);
349   }
350   CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
351              storage->num_pending_ops);
352   gpr_mu_unlock(&s->mu);
353 }
354 
355 /*
356   Traverse the linked list and delete op and free memory
357 */
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)358 static void remove_from_storage(struct stream_obj* s,
359                                 struct op_and_state* oas) {
360   struct op_and_state* curr;
361   if (s->storage.head == nullptr || oas == nullptr) {
362     return;
363   }
364   if (s->storage.head == oas) {
365     s->storage.head = oas->next;
366     delete oas;
367     s->storage.num_pending_ops--;
368     CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
369                s->storage.num_pending_ops);
370   } else {
371     for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
372       if (curr->next == oas) {
373         curr->next = oas->next;
374         s->storage.num_pending_ops--;
375         CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
376                    s->storage.num_pending_ops);
377         delete oas;
378         break;
379       } else if (GPR_UNLIKELY(curr->next == nullptr)) {
380         CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
381       }
382     }
383   }
384 }
385 
386 /*
387   Cycle through ops and try to take next action. Break when either
388   an action with callback is taken, or no action is possible.
389   This can get executed from the Cronet network thread via cronet callback
390   or on the application supplied thread via the perform_stream_op function.
391 */
execute_from_storage(stream_obj * s)392 static void execute_from_storage(stream_obj* s) {
393   gpr_mu_lock(&s->mu);
394   for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
395     CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
396     GPR_ASSERT(!curr->done);
397     enum e_op_result result = execute_stream_op(curr);
398     CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
399                op_result_string(result));
400     /* if this op is done, then remove it and free memory */
401     if (curr->done) {
402       struct op_and_state* next = curr->next;
403       remove_from_storage(s, curr);
404       curr = next;
405     } else if (result == NO_ACTION_POSSIBLE) {
406       curr = curr->next;
407     } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
408       /* wait for the callback */
409       break;
410     } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
411   }
412   gpr_mu_unlock(&s->mu);
413 }
414 
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_chttp2_incoming_metadata_buffer * mds)415 static void convert_cronet_array_to_metadata(
416     const bidirectional_stream_header_array* header_array,
417     grpc_chttp2_incoming_metadata_buffer* mds) {
418   for (size_t i = 0; i < header_array->count; i++) {
419     CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
420                header_array->headers[i].key, header_array->headers[i].value);
421     grpc_slice key = grpc_slice_intern(
422         grpc_slice_from_static_string(header_array->headers[i].key));
423     grpc_slice value;
424     if (grpc_is_refcounted_slice_binary_header(key)) {
425       value = grpc_slice_from_static_string(header_array->headers[i].value);
426       value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
427           value, grpc_chttp2_base64_infer_length_after_decode(value)));
428     } else {
429       value = grpc_slice_intern(
430           grpc_slice_from_static_string(header_array->headers[i].value));
431     }
432     GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
433                       grpc_chttp2_incoming_metadata_buffer_add(
434                           mds, grpc_mdelem_from_slices(key, value)));
435   }
436 }
437 
438 /*
439   Cronet callback
440 */
on_failed(bidirectional_stream * stream,int net_error)441 static void on_failed(bidirectional_stream* stream, int net_error) {
442   gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
443   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
444   grpc_core::ExecCtx exec_ctx;
445 
446   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
447   gpr_mu_lock(&s->mu);
448   bidirectional_stream_destroy(s->cbs);
449   s->state.state_callback_received[OP_FAILED] = true;
450   s->cbs = nullptr;
451   if (s->header_array.headers) {
452     gpr_free(s->header_array.headers);
453     s->header_array.headers = nullptr;
454   }
455   if (s->state.ws.write_buffer) {
456     gpr_free(s->state.ws.write_buffer);
457     s->state.ws.write_buffer = nullptr;
458   }
459   null_and_maybe_free_read_buffer(s);
460   gpr_mu_unlock(&s->mu);
461   execute_from_storage(s);
462   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
463 }
464 
465 /*
466   Cronet callback
467 */
on_canceled(bidirectional_stream * stream)468 static void on_canceled(bidirectional_stream* stream) {
469   CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
470   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
471   grpc_core::ExecCtx exec_ctx;
472 
473   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
474   gpr_mu_lock(&s->mu);
475   bidirectional_stream_destroy(s->cbs);
476   s->state.state_callback_received[OP_CANCELED] = true;
477   s->cbs = nullptr;
478   if (s->header_array.headers) {
479     gpr_free(s->header_array.headers);
480     s->header_array.headers = nullptr;
481   }
482   if (s->state.ws.write_buffer) {
483     gpr_free(s->state.ws.write_buffer);
484     s->state.ws.write_buffer = nullptr;
485   }
486   null_and_maybe_free_read_buffer(s);
487   gpr_mu_unlock(&s->mu);
488   execute_from_storage(s);
489   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
490 }
491 
492 /*
493   Cronet callback
494 */
on_succeeded(bidirectional_stream * stream)495 static void on_succeeded(bidirectional_stream* stream) {
496   CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
497   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
498   grpc_core::ExecCtx exec_ctx;
499 
500   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
501   gpr_mu_lock(&s->mu);
502   bidirectional_stream_destroy(s->cbs);
503   s->state.state_callback_received[OP_SUCCEEDED] = true;
504   s->cbs = nullptr;
505   null_and_maybe_free_read_buffer(s);
506   gpr_mu_unlock(&s->mu);
507   execute_from_storage(s);
508   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
509 }
510 
511 /*
512   Cronet callback
513 */
on_stream_ready(bidirectional_stream * stream)514 static void on_stream_ready(bidirectional_stream* stream) {
515   CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
516   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
517   grpc_core::ExecCtx exec_ctx;
518   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
519   grpc_cronet_transport* t = s->curr_ct;
520   gpr_mu_lock(&s->mu);
521   s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
522   s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
523   /* Free the memory allocated for headers */
524   if (s->header_array.headers) {
525     gpr_free(s->header_array.headers);
526     s->header_array.headers = nullptr;
527   }
528   /* Send the initial metadata on wire if there is no SEND_MESSAGE or
529    * SEND_TRAILING_METADATA ops pending */
530   if (t->use_packet_coalescing) {
531     if (s->state.flush_cronet_when_ready) {
532       CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
533       bidirectional_stream_flush(stream);
534     }
535   }
536   gpr_mu_unlock(&s->mu);
537   execute_from_storage(s);
538 }
539 
540 /*
541   Cronet callback
542 */
on_response_headers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * headers,const char * negotiated_protocol)543 static void on_response_headers_received(
544     bidirectional_stream* stream,
545     const bidirectional_stream_header_array* headers,
546     const char* negotiated_protocol) {
547   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
548   grpc_core::ExecCtx exec_ctx;
549   CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
550              headers, negotiated_protocol);
551   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
552 
553   /* Identify if this is a header or a trailer (in a trailer-only response case)
554    */
555   for (size_t i = 0; i < headers->count; i++) {
556     if (0 == strcmp("grpc-status", headers->headers[i].key)) {
557       on_response_trailers_received(stream, headers);
558       return;
559     }
560   }
561 
562   gpr_mu_lock(&s->mu);
563   convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
564   s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
565   if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
566         s->state.state_callback_received[OP_FAILED])) {
567     /* Do an extra read to trigger on_succeeded() callback in case connection
568      is closed */
569     GPR_ASSERT(s->state.rs.length_field_received == false);
570     s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
571     s->state.rs.compressed = false;
572     s->state.rs.received_bytes = 0;
573     s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
574     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
575     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
576                               s->state.rs.remaining_bytes);
577     s->state.pending_read_from_cronet = true;
578   }
579   gpr_mu_unlock(&s->mu);
580   execute_from_storage(s);
581 }
582 
583 /*
584   Cronet callback
585 */
on_write_completed(bidirectional_stream * stream,const char * data)586 static void on_write_completed(bidirectional_stream* stream, const char* data) {
587   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
588   grpc_core::ExecCtx exec_ctx;
589   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
590   CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
591   gpr_mu_lock(&s->mu);
592   if (s->state.ws.write_buffer) {
593     gpr_free(s->state.ws.write_buffer);
594     s->state.ws.write_buffer = nullptr;
595   }
596   s->state.state_callback_received[OP_SEND_MESSAGE] = true;
597   gpr_mu_unlock(&s->mu);
598   execute_from_storage(s);
599 }
600 
601 /*
602   Cronet callback
603 */
on_read_completed(bidirectional_stream * stream,char * data,int count)604 static void on_read_completed(bidirectional_stream* stream, char* data,
605                               int count) {
606   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
607   grpc_core::ExecCtx exec_ctx;
608   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
609   CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
610              count);
611   gpr_mu_lock(&s->mu);
612   s->state.pending_read_from_cronet = false;
613   s->state.state_callback_received[OP_RECV_MESSAGE] = true;
614   if (count > 0 && s->state.flush_read) {
615     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
616     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
617                               GRPC_FLUSH_READ_SIZE);
618     s->state.pending_read_from_cronet = true;
619     gpr_mu_unlock(&s->mu);
620   } else if (count > 0) {
621     s->state.rs.received_bytes += count;
622     s->state.rs.remaining_bytes -= count;
623     if (s->state.rs.remaining_bytes > 0) {
624       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
625       s->state.state_op_done[OP_READ_REQ_MADE] = true;
626       bidirectional_stream_read(
627           s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
628           s->state.rs.remaining_bytes);
629       s->state.pending_read_from_cronet = true;
630       gpr_mu_unlock(&s->mu);
631     } else {
632       gpr_mu_unlock(&s->mu);
633       execute_from_storage(s);
634     }
635   } else {
636     null_and_maybe_free_read_buffer(s);
637     s->state.rs.read_stream_closed = true;
638     gpr_mu_unlock(&s->mu);
639     execute_from_storage(s);
640   }
641 }
642 
643 /*
644   Cronet callback
645 */
on_response_trailers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * trailers)646 static void on_response_trailers_received(
647     bidirectional_stream* stream,
648     const bidirectional_stream_header_array* trailers) {
649   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
650   grpc_core::ExecCtx exec_ctx;
651   CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
652              trailers);
653   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
654   grpc_cronet_transport* t = s->curr_ct;
655   gpr_mu_lock(&s->mu);
656   s->state.rs.trailing_metadata_valid = false;
657   convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
658   if (trailers->count > 0) {
659     s->state.rs.trailing_metadata_valid = true;
660   }
661   for (size_t i = 0; i < trailers->count; i++) {
662     if (0 == strcmp(trailers->headers[i].key, "grpc-status") &&
663         0 != strcmp(trailers->headers[i].value, "0")) {
664       s->state.fail_state = true;
665       maybe_flush_read(s);
666     }
667   }
668   s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
669   /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
670    * trigger on_succeeded */
671   if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
672       !(s->state.state_op_done[OP_CANCEL_ERROR] ||
673         s->state.state_callback_received[OP_FAILED])) {
674     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
675     s->state.state_callback_received[OP_SEND_MESSAGE] = false;
676     bidirectional_stream_write(s->cbs, "", 0, true);
677     if (t->use_packet_coalescing) {
678       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
679       bidirectional_stream_flush(s->cbs);
680     }
681     s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
682 
683     gpr_mu_unlock(&s->mu);
684   } else {
685     gpr_mu_unlock(&s->mu);
686     execute_from_storage(s);
687   }
688 }
689 
690 /*
691  Utility function that takes the data from s->write_slice_buffer and assembles
692  into a contiguous byte stream with 5 byte gRPC header prepended.
693 */
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)694 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
695                               char** pp_write_buffer,
696                               size_t* p_write_buffer_size, uint32_t flags) {
697   grpc_slice slice = grpc_slice_buffer_take_first(write_slice_buffer);
698   size_t length = GRPC_SLICE_LENGTH(slice);
699   *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
700   /* This is freed in the on_write_completed callback */
701   char* write_buffer =
702       static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
703   *pp_write_buffer = write_buffer;
704   uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
705   /* Append 5 byte header */
706   /* Compressed flag */
707   *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
708   /* Message length */
709   *p++ = static_cast<uint8_t>(length >> 24);
710   *p++ = static_cast<uint8_t>(length >> 16);
711   *p++ = static_cast<uint8_t>(length >> 8);
712   *p++ = static_cast<uint8_t>(length);
713   /* append actual data */
714   memcpy(p, GRPC_SLICE_START_PTR(slice), length);
715   grpc_slice_unref_internal(slice);
716 }
717 
718 /*
719  Convert metadata in a format that Cronet can consume
720 */
convert_metadata_to_cronet_headers(grpc_metadata_batch * metadata,const char * host,char ** pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)721 static void convert_metadata_to_cronet_headers(
722     grpc_metadata_batch* metadata, const char* host, char** pp_url,
723     bidirectional_stream_header** pp_headers, size_t* p_num_headers,
724     const char** method) {
725   grpc_linked_mdelem* curr = metadata->list.head;
726   /* Walk the linked list and get number of header fields */
727   size_t num_headers_available = 0;
728   while (curr != nullptr) {
729     curr = curr->next;
730     num_headers_available++;
731   }
732   grpc_millis deadline = metadata->deadline;
733   if (deadline != GRPC_MILLIS_INF_FUTURE) {
734     num_headers_available++;
735   }
736   /* Allocate enough memory. It is freed in the on_stream_ready callback
737    */
738   bidirectional_stream_header* headers =
739       static_cast<bidirectional_stream_header*>(gpr_malloc(
740           sizeof(bidirectional_stream_header) * num_headers_available));
741   *pp_headers = headers;
742 
743   /* Walk the linked list again, this time copying the header fields.
744     s->num_headers can be less than num_headers_available, as some headers
745     are not used for cronet.
746     TODO (makdharma): Eliminate need to traverse the LL second time for perf.
747    */
748   curr = metadata->list.head;
749   size_t num_headers = 0;
750   while (num_headers < num_headers_available) {
751     grpc_mdelem mdelem = curr->md;
752     curr = curr->next;
753     char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
754     char* value;
755     if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
756       grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
757       value = grpc_slice_to_c_string(wire_value);
758       grpc_slice_unref_internal(wire_value);
759     } else {
760       value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
761     }
762     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
763         grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
764                                       GRPC_MDSTR_AUTHORITY)) {
765       /* Cronet populates these fields on its own */
766       gpr_free(key);
767       gpr_free(value);
768       continue;
769     }
770     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
771       if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
772         *method = "PUT";
773       } else {
774         /* POST method in default*/
775         *method = "POST";
776       }
777       gpr_free(key);
778       gpr_free(value);
779       continue;
780     }
781     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
782       /* Create URL by appending :path value to the hostname */
783       gpr_asprintf(pp_url, "https://%s%s", host, value);
784       gpr_free(key);
785       gpr_free(value);
786       continue;
787     }
788     CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
789     headers[num_headers].key = key;
790     headers[num_headers].value = value;
791     num_headers++;
792     if (curr == nullptr) {
793       break;
794     }
795   }
796   if (deadline != GRPC_MILLIS_INF_FUTURE) {
797     char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
798     char* value =
799         static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
800     grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
801                               value);
802     headers[num_headers].key = key;
803     headers[num_headers].value = value;
804 
805     num_headers++;
806   }
807 
808   *p_num_headers = num_headers;
809 }
810 
parse_grpc_header(const uint8_t * data,int * length,bool * compressed)811 static void parse_grpc_header(const uint8_t* data, int* length,
812                               bool* compressed) {
813   const uint8_t c = *data;
814   const uint8_t* p = data + 1;
815   *compressed = ((c & 0x01) == 0x01);
816   *length = 0;
817   *length |= (*p++) << 24;
818   *length |= (*p++) << 16;
819   *length |= (*p++) << 8;
820   *length |= (*p++);
821 }
822 
header_has_authority(grpc_linked_mdelem * head)823 static bool header_has_authority(grpc_linked_mdelem* head) {
824   while (head != nullptr) {
825     if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
826                                       GRPC_MDSTR_AUTHORITY)) {
827       return true;
828     }
829     head = head->next;
830   }
831   return false;
832 }
833 
834 /*
835   Op Execution: Decide if one of the actions contained in the stream op can be
836   executed. This is the heart of the state machine.
837 */
op_can_be_run(grpc_transport_stream_op_batch * curr_op,struct stream_obj * s,struct op_state * op_state,enum e_op_id op_id)838 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
839                           struct stream_obj* s, struct op_state* op_state,
840                           enum e_op_id op_id) {
841   struct op_state* stream_state = &s->state;
842   grpc_cronet_transport* t = s->curr_ct;
843   bool result = true;
844   /* When call is canceled, every op can be run, except under following
845   conditions
846   */
847   bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
848                                stream_state->state_callback_received[OP_FAILED];
849   if (is_canceled_or_failed) {
850     if (op_id == OP_SEND_INITIAL_METADATA) {
851       CRONET_LOG(GPR_DEBUG, "Because");
852       result = false;
853     }
854     if (op_id == OP_SEND_MESSAGE) {
855       CRONET_LOG(GPR_DEBUG, "Because");
856       result = false;
857     }
858     if (op_id == OP_SEND_TRAILING_METADATA) {
859       CRONET_LOG(GPR_DEBUG, "Because");
860       result = false;
861     }
862     if (op_id == OP_CANCEL_ERROR) {
863       CRONET_LOG(GPR_DEBUG, "Because");
864       result = false;
865     }
866     /* already executed */
867     if (op_id == OP_RECV_INITIAL_METADATA &&
868         stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
869       CRONET_LOG(GPR_DEBUG, "Because");
870       result = false;
871     }
872     if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
873       CRONET_LOG(GPR_DEBUG, "Because");
874       result = false;
875     }
876     if (op_id == OP_RECV_TRAILING_METADATA &&
877         stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
878       CRONET_LOG(GPR_DEBUG, "Because");
879       result = false;
880     }
881     /* ON_COMPLETE can be processed if one of the following conditions is met:
882      * 1. the stream failed
883      * 2. the stream is cancelled, and the callback is received
884      * 3. the stream succeeded before cancel is effective
885      * 4. the stream is cancelled, and the stream is never started */
886     if (op_id == OP_ON_COMPLETE &&
887         !(stream_state->state_callback_received[OP_FAILED] ||
888           stream_state->state_callback_received[OP_CANCELED] ||
889           stream_state->state_callback_received[OP_SUCCEEDED] ||
890           !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
891       CRONET_LOG(GPR_DEBUG, "Because");
892       result = false;
893     }
894   } else if (op_id == OP_SEND_INITIAL_METADATA) {
895     /* already executed */
896     if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
897   } else if (op_id == OP_RECV_INITIAL_METADATA) {
898     /* already executed */
899     if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
900     /* we haven't sent headers yet. */
901     else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
902       result = false;
903     /* we haven't received headers yet. */
904     else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
905              !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
906       result = false;
907   } else if (op_id == OP_SEND_MESSAGE) {
908     /* already executed (note we're checking op specific state, not stream
909      state) */
910     if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
911     /* we haven't sent headers yet. */
912     else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
913       result = false;
914   } else if (op_id == OP_RECV_MESSAGE) {
915     /* already executed */
916     if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
917     /* we haven't received headers yet. */
918     else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
919              !stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
920       result = false;
921   } else if (op_id == OP_RECV_TRAILING_METADATA) {
922     /* already executed */
923     if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
924     /* we have asked for but haven't received message yet. */
925     else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
926              !stream_state->state_op_done[OP_RECV_MESSAGE])
927       result = false;
928     /* we haven't received trailers  yet. */
929     else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
930       result = false;
931     /* we haven't received on_succeeded  yet. */
932     else if (!stream_state->state_callback_received[OP_SUCCEEDED])
933       result = false;
934   } else if (op_id == OP_SEND_TRAILING_METADATA) {
935     /* already executed */
936     if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
937     /* we haven't sent initial metadata yet */
938     else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
939       result = false;
940     /* we haven't sent message yet */
941     else if (stream_state->pending_send_message &&
942              !stream_state->state_op_done[OP_SEND_MESSAGE])
943       result = false;
944     /* we haven't got on_write_completed for the send yet */
945     else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
946              !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
947              !(t->use_packet_coalescing &&
948                stream_state->pending_write_for_trailer))
949       result = false;
950   } else if (op_id == OP_CANCEL_ERROR) {
951     /* already executed */
952     if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
953   } else if (op_id == OP_ON_COMPLETE) {
954     /* already executed (note we're checking op specific state, not stream
955     state) */
956     if (op_state->state_op_done[OP_ON_COMPLETE]) {
957       CRONET_LOG(GPR_DEBUG, "Because");
958       result = false;
959     }
960     /* Check if every op that was asked for is done. */
961     /* TODO(muxi): We should not consider the recv ops here, since they
962      * have their own callbacks.  We should invoke a batch's on_complete
963      * as soon as all of the batch's send ops are complete, even if
964      * there are still recv ops pending. */
965     else if (curr_op->send_initial_metadata &&
966              !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
967       CRONET_LOG(GPR_DEBUG, "Because");
968       result = false;
969     } else if (curr_op->send_message &&
970                !op_state->state_op_done[OP_SEND_MESSAGE]) {
971       CRONET_LOG(GPR_DEBUG, "Because");
972       result = false;
973     } else if (curr_op->send_message &&
974                !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
975       CRONET_LOG(GPR_DEBUG, "Because");
976       result = false;
977     } else if (curr_op->send_trailing_metadata &&
978                !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
979       CRONET_LOG(GPR_DEBUG, "Because");
980       result = false;
981     } else if (curr_op->recv_initial_metadata &&
982                !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
983       CRONET_LOG(GPR_DEBUG, "Because");
984       result = false;
985     } else if (curr_op->recv_message &&
986                !op_state->state_op_done[OP_RECV_MESSAGE]) {
987       CRONET_LOG(GPR_DEBUG, "Because");
988       result = false;
989     } else if (curr_op->cancel_stream &&
990                !stream_state->state_callback_received[OP_CANCELED]) {
991       CRONET_LOG(GPR_DEBUG, "Because");
992       result = false;
993     } else if (curr_op->recv_trailing_metadata) {
994       /* We aren't done with trailing metadata yet */
995       if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
996         CRONET_LOG(GPR_DEBUG, "Because");
997         result = false;
998       }
999       /* We've asked for actual message in an earlier op, and it hasn't been
1000         delivered yet. */
1001       else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1002         /* If this op is not the one asking for read, (which means some earlier
1003           op has asked), and the read hasn't been delivered. */
1004         if (!curr_op->recv_message &&
1005             !stream_state->state_callback_received[OP_SUCCEEDED]) {
1006           CRONET_LOG(GPR_DEBUG, "Because");
1007           result = false;
1008         }
1009       }
1010     }
1011     /* We should see at least one on_write_completed for the trailers that we
1012       sent */
1013     else if (curr_op->send_trailing_metadata &&
1014              !stream_state->state_callback_received[OP_SEND_MESSAGE])
1015       result = false;
1016   }
1017   CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1018              result ? "YES" : "NO");
1019   return result;
1020 }
1021 
1022 /*
1023   TODO (makdharma): Break down this function in smaller chunks for readability.
1024 */
execute_stream_op(struct op_and_state * oas)1025 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1026   grpc_transport_stream_op_batch* stream_op = &oas->op;
1027   struct stream_obj* s = oas->s;
1028   grpc_cronet_transport* t = s->curr_ct;
1029   struct op_state* stream_state = &s->state;
1030   enum e_op_result result = NO_ACTION_POSSIBLE;
1031   if (stream_op->send_initial_metadata &&
1032       op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1033     CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1034     /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1035      * on_failed */
1036     GPR_ASSERT(s->cbs == nullptr);
1037     GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1038     s->cbs =
1039         bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1040     CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1041     if (t->use_packet_coalescing) {
1042       bidirectional_stream_disable_auto_flush(s->cbs, true);
1043       bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1044     }
1045     char* url = nullptr;
1046     const char* method = "POST";
1047     s->header_array.headers = nullptr;
1048     convert_metadata_to_cronet_headers(
1049         stream_op->payload->send_initial_metadata.send_initial_metadata,
1050         t->host, &url, &s->header_array.headers, &s->header_array.count,
1051         &method);
1052     s->header_array.capacity = s->header_array.count;
1053     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
1054     bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
1055     if (url) {
1056       gpr_free(url);
1057     }
1058     unsigned int header_index;
1059     for (header_index = 0; header_index < s->header_array.count;
1060          header_index++) {
1061       gpr_free((void*)s->header_array.headers[header_index].key);
1062       gpr_free((void*)s->header_array.headers[header_index].value);
1063     }
1064     stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1065     if (t->use_packet_coalescing) {
1066       if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1067         s->state.flush_cronet_when_ready = true;
1068       }
1069     }
1070     result = ACTION_TAKEN_WITH_CALLBACK;
1071   } else if (stream_op->send_message &&
1072              op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1073     CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
1074     stream_state->pending_send_message = false;
1075     if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1076         stream_state->state_callback_received[OP_FAILED] ||
1077         stream_state->state_callback_received[OP_SUCCEEDED]) {
1078       result = NO_ACTION_POSSIBLE;
1079       CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1080     } else {
1081       grpc_slice_buffer write_slice_buffer;
1082       grpc_slice slice;
1083       grpc_slice_buffer_init(&write_slice_buffer);
1084       if (1 != stream_op->payload->send_message.send_message->Next(
1085                    stream_op->payload->send_message.send_message->length(),
1086                    nullptr)) {
1087         /* Should never reach here */
1088         GPR_ASSERT(false);
1089       }
1090       if (GRPC_ERROR_NONE !=
1091           stream_op->payload->send_message.send_message->Pull(&slice)) {
1092         /* Should never reach here */
1093         GPR_ASSERT(false);
1094       }
1095       grpc_slice_buffer_add(&write_slice_buffer, slice);
1096       if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
1097         /* Empty request not handled yet */
1098         gpr_log(GPR_ERROR, "Empty request is not supported");
1099         GPR_ASSERT(write_slice_buffer.count == 1);
1100       }
1101       if (write_slice_buffer.count > 0) {
1102         size_t write_buffer_size;
1103         create_grpc_frame(
1104             &write_slice_buffer, &stream_state->ws.write_buffer,
1105             &write_buffer_size,
1106             stream_op->payload->send_message.send_message->flags());
1107         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1108                    stream_state->ws.write_buffer);
1109         stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1110         bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1111                                    static_cast<int>(write_buffer_size), false);
1112         grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1113         if (t->use_packet_coalescing) {
1114           if (!stream_op->send_trailing_metadata) {
1115             CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1116             bidirectional_stream_flush(s->cbs);
1117             result = ACTION_TAKEN_WITH_CALLBACK;
1118           } else {
1119             stream_state->pending_write_for_trailer = true;
1120             result = ACTION_TAKEN_NO_CALLBACK;
1121           }
1122         } else {
1123           result = ACTION_TAKEN_WITH_CALLBACK;
1124         }
1125       } else {
1126         result = NO_ACTION_POSSIBLE;
1127       }
1128     }
1129     stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1130     oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1131     stream_op->payload->send_message.send_message.reset();
1132   } else if (stream_op->send_trailing_metadata &&
1133              op_can_be_run(stream_op, s, &oas->state,
1134                            OP_SEND_TRAILING_METADATA)) {
1135     CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
1136     if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1137         stream_state->state_callback_received[OP_FAILED] ||
1138         stream_state->state_callback_received[OP_SUCCEEDED]) {
1139       result = NO_ACTION_POSSIBLE;
1140       CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1141     } else {
1142       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1143       stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1144       bidirectional_stream_write(s->cbs, "", 0, true);
1145       if (t->use_packet_coalescing) {
1146         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1147         bidirectional_stream_flush(s->cbs);
1148       }
1149       result = ACTION_TAKEN_WITH_CALLBACK;
1150     }
1151     stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1152   } else if (stream_op->recv_initial_metadata &&
1153              op_can_be_run(stream_op, s, &oas->state,
1154                            OP_RECV_INITIAL_METADATA)) {
1155     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
1156     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1157       grpc_core::ExecCtx::Run(
1158           DEBUG_LOCATION,
1159           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1160           GRPC_ERROR_NONE);
1161     } else if (stream_state->state_callback_received[OP_FAILED]) {
1162       grpc_core::ExecCtx::Run(
1163           DEBUG_LOCATION,
1164           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1165           GRPC_ERROR_NONE);
1166     } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1167       grpc_core::ExecCtx::Run(
1168           DEBUG_LOCATION,
1169           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1170           GRPC_ERROR_NONE);
1171     } else {
1172       grpc_chttp2_incoming_metadata_buffer_publish(
1173           &oas->s->state.rs.initial_metadata,
1174           stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1175       grpc_core::ExecCtx::Run(
1176           DEBUG_LOCATION,
1177           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1178           GRPC_ERROR_NONE);
1179     }
1180     stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1181     result = ACTION_TAKEN_NO_CALLBACK;
1182   } else if (stream_op->recv_message &&
1183              op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1184     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
1185     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1186       CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1187       grpc_core::ExecCtx::Run(
1188           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1189           GRPC_ERROR_NONE);
1190       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1191       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1192       result = ACTION_TAKEN_NO_CALLBACK;
1193     } else if (stream_state->state_callback_received[OP_FAILED]) {
1194       CRONET_LOG(GPR_DEBUG, "Stream failed.");
1195       grpc_core::ExecCtx::Run(
1196           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1197           GRPC_ERROR_NONE);
1198       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1199       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1200       result = ACTION_TAKEN_NO_CALLBACK;
1201     } else if (stream_state->rs.read_stream_closed == true) {
1202       /* No more data will be received */
1203       CRONET_LOG(GPR_DEBUG, "read stream closed");
1204       grpc_core::ExecCtx::Run(
1205           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1206           GRPC_ERROR_NONE);
1207       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1208       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1209       result = ACTION_TAKEN_NO_CALLBACK;
1210     } else if (stream_state->flush_read) {
1211       CRONET_LOG(GPR_DEBUG, "flush read");
1212       grpc_core::ExecCtx::Run(
1213           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1214           GRPC_ERROR_NONE);
1215       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1216       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1217       result = ACTION_TAKEN_NO_CALLBACK;
1218     } else if (stream_state->rs.length_field_received == false) {
1219       if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1220           stream_state->rs.remaining_bytes == 0) {
1221         /* Start a read operation for data */
1222         stream_state->rs.length_field_received = true;
1223         parse_grpc_header(
1224             reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1225             &stream_state->rs.length_field, &stream_state->rs.compressed);
1226         CRONET_LOG(GPR_DEBUG, "length field = %d",
1227                    stream_state->rs.length_field);
1228         if (stream_state->rs.length_field > 0) {
1229           stream_state->rs.read_buffer = static_cast<char*>(
1230               gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1231           GPR_ASSERT(stream_state->rs.read_buffer);
1232           stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1233           stream_state->rs.received_bytes = 0;
1234           CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1235           stream_state->state_op_done[OP_READ_REQ_MADE] =
1236               true; /* Indicates that at least one read request has been made */
1237           bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1238                                     stream_state->rs.remaining_bytes);
1239           stream_state->pending_read_from_cronet = true;
1240           result = ACTION_TAKEN_WITH_CALLBACK;
1241         } else {
1242           stream_state->rs.remaining_bytes = 0;
1243           CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1244           /* Clean up read_slice_buffer in case there is unread data. */
1245           grpc_slice_buffer_destroy_internal(
1246               &stream_state->rs.read_slice_buffer);
1247           grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1248           uint32_t flags = 0;
1249           if (stream_state->rs.compressed) {
1250             flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1251           }
1252           stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1253           stream_op->payload->recv_message.recv_message->reset(
1254               stream_state->rs.sbs.get());
1255           grpc_core::ExecCtx::Run(
1256               DEBUG_LOCATION,
1257               stream_op->payload->recv_message.recv_message_ready,
1258               GRPC_ERROR_NONE);
1259           stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1260           oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1261 
1262           /* Extra read to trigger on_succeed */
1263           stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1264           stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1265           stream_state->rs.received_bytes = 0;
1266           stream_state->rs.compressed = false;
1267           stream_state->rs.length_field_received = false;
1268           CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1269           stream_state->state_op_done[OP_READ_REQ_MADE] =
1270               true; /* Indicates that at least one read request has been made */
1271           bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1272                                     stream_state->rs.remaining_bytes);
1273           stream_state->pending_read_from_cronet = true;
1274           result = ACTION_TAKEN_NO_CALLBACK;
1275         }
1276       } else if (stream_state->rs.remaining_bytes == 0) {
1277         /* Start a read operation for first 5 bytes (GRPC header) */
1278         stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1279         stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1280         stream_state->rs.received_bytes = 0;
1281         stream_state->rs.compressed = false;
1282         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1283         stream_state->state_op_done[OP_READ_REQ_MADE] =
1284             true; /* Indicates that at least one read request has been made */
1285         bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1286                                   stream_state->rs.remaining_bytes);
1287         stream_state->pending_read_from_cronet = true;
1288         result = ACTION_TAKEN_WITH_CALLBACK;
1289       } else {
1290         result = NO_ACTION_POSSIBLE;
1291       }
1292     } else if (stream_state->rs.remaining_bytes == 0) {
1293       CRONET_LOG(GPR_DEBUG, "read operation complete");
1294       grpc_slice read_data_slice =
1295           GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1296       uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1297       memcpy(dst_p, stream_state->rs.read_buffer,
1298              static_cast<size_t>(stream_state->rs.length_field));
1299       null_and_maybe_free_read_buffer(s);
1300       /* Clean up read_slice_buffer in case there is unread data. */
1301       grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1302       grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1303       grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1304                             read_data_slice);
1305       uint32_t flags = 0;
1306       if (stream_state->rs.compressed) {
1307         flags = GRPC_WRITE_INTERNAL_COMPRESS;
1308       }
1309       stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1310       stream_op->payload->recv_message.recv_message->reset(
1311           stream_state->rs.sbs.get());
1312       grpc_core::ExecCtx::Run(
1313           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1314           GRPC_ERROR_NONE);
1315       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1316       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1317       /* Do an extra read to trigger on_succeeded() callback in case connection
1318          is closed */
1319       stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1320       stream_state->rs.compressed = false;
1321       stream_state->rs.received_bytes = 0;
1322       stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1323       stream_state->rs.length_field_received = false;
1324       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1325       bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1326                                 stream_state->rs.remaining_bytes);
1327       stream_state->pending_read_from_cronet = true;
1328       result = ACTION_TAKEN_NO_CALLBACK;
1329     }
1330   } else if (stream_op->recv_trailing_metadata &&
1331              op_can_be_run(stream_op, s, &oas->state,
1332                            OP_RECV_TRAILING_METADATA)) {
1333     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
1334     grpc_error* error = GRPC_ERROR_NONE;
1335     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1336       error = GRPC_ERROR_REF(stream_state->cancel_error);
1337     } else if (stream_state->state_callback_received[OP_FAILED]) {
1338       error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
1339     } else if (oas->s->state.rs.trailing_metadata_valid) {
1340       grpc_chttp2_incoming_metadata_buffer_publish(
1341           &oas->s->state.rs.trailing_metadata,
1342           stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1343       stream_state->rs.trailing_metadata_valid = false;
1344     }
1345     grpc_core::ExecCtx::Run(
1346         DEBUG_LOCATION,
1347         stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1348         error);
1349     stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1350     result = ACTION_TAKEN_NO_CALLBACK;
1351   } else if (stream_op->cancel_stream &&
1352              op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1353     CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
1354     if (s->cbs) {
1355       CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1356       bidirectional_stream_cancel(s->cbs);
1357       result = ACTION_TAKEN_WITH_CALLBACK;
1358     } else {
1359       result = ACTION_TAKEN_NO_CALLBACK;
1360     }
1361     stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1362     if (!stream_state->cancel_error) {
1363       stream_state->cancel_error =
1364           GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1365     }
1366   } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1367     CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
1368     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1369       if (stream_op->on_complete) {
1370         grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1371                                 GRPC_ERROR_REF(stream_state->cancel_error));
1372       }
1373     } else if (stream_state->state_callback_received[OP_FAILED]) {
1374       if (stream_op->on_complete) {
1375         grpc_core::ExecCtx::Run(
1376             DEBUG_LOCATION, stream_op->on_complete,
1377             make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
1378       }
1379     } else {
1380       /* All actions in this stream_op are complete. Call the on_complete
1381        * callback
1382        */
1383       if (stream_op->on_complete) {
1384         grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1385                                 GRPC_ERROR_NONE);
1386       }
1387     }
1388     oas->state.state_op_done[OP_ON_COMPLETE] = true;
1389     oas->done = true;
1390     /* reset any send message state, only if this ON_COMPLETE is about a send.
1391      */
1392     if (stream_op->send_message) {
1393       stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1394       stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1395     }
1396     result = ACTION_TAKEN_NO_CALLBACK;
1397     /* If this is the on_complete callback being called for a received message -
1398       make a note */
1399     if (stream_op->recv_message)
1400       stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1401   } else {
1402     result = NO_ACTION_POSSIBLE;
1403   }
1404   return result;
1405 }
1406 
1407 /*
1408   Functions used by upper layers to access transport functionality.
1409 */
1410 
stream_obj(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,grpc_core::Arena * arena)1411 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1412                               grpc_stream_refcount* refcount,
1413                               grpc_core::Arena* arena)
1414     : arena(arena),
1415       curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1416       curr_gs(gs),
1417       state(arena),
1418       refcount(refcount) {
1419   GRPC_CRONET_STREAM_REF(this, "cronet transport");
1420   gpr_mu_init(&mu);
1421 }
1422 
~stream_obj()1423 inline stream_obj::~stream_obj() {
1424   null_and_maybe_free_read_buffer(this);
1425   /* Clean up read_slice_buffer in case there is unread data. */
1426   grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1427   GRPC_ERROR_UNREF(state.cancel_error);
1428 }
1429 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)1430 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1431                        grpc_stream_refcount* refcount, const void* server_data,
1432                        grpc_core::Arena* arena) {
1433   new (gs) stream_obj(gt, gs, refcount, arena);
1434   return 0;
1435 }
1436 
set_pollset_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)1437 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1438                                    grpc_pollset* pollset) {}
1439 
set_pollset_set_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)1440 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1441                                        grpc_pollset_set* pollset_set) {}
1442 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1443 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1444                               grpc_transport_stream_op_batch* op) {
1445   CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1446   if (op->send_initial_metadata &&
1447       header_has_authority(op->payload->send_initial_metadata
1448                                .send_initial_metadata->list.head)) {
1449     /* Cronet does not support :authority header field. We cancel the call when
1450      this field is present in metadata */
1451     if (op->recv_initial_metadata) {
1452       grpc_core::ExecCtx::Run(
1453           DEBUG_LOCATION,
1454           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1455           GRPC_ERROR_CANCELLED);
1456     }
1457     if (op->recv_message) {
1458       grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1459                               op->payload->recv_message.recv_message_ready,
1460                               GRPC_ERROR_CANCELLED);
1461     }
1462     if (op->recv_trailing_metadata) {
1463       grpc_core::ExecCtx::Run(
1464           DEBUG_LOCATION,
1465           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1466           GRPC_ERROR_CANCELLED);
1467     }
1468     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1469                             GRPC_ERROR_CANCELLED);
1470     return;
1471   }
1472   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1473   add_to_storage(s, op);
1474   execute_from_storage(s);
1475 }
1476 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1477 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1478                            grpc_closure* then_schedule_closure) {
1479   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1480   s->~stream_obj();
1481   grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1482                           GRPC_ERROR_NONE);
1483 }
1484 
destroy_transport(grpc_transport * gt)1485 static void destroy_transport(grpc_transport* gt) {}
1486 
get_endpoint(grpc_transport * gt)1487 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1488 
perform_op(grpc_transport * gt,grpc_transport_op * op)1489 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1490 
1491 static const grpc_transport_vtable grpc_cronet_vtable = {
1492     sizeof(stream_obj),
1493     "cronet_http",
1494     init_stream,
1495     set_pollset_do_nothing,
1496     set_pollset_set_do_nothing,
1497     perform_stream_op,
1498     perform_op,
1499     destroy_stream,
1500     destroy_transport,
1501     get_endpoint};
1502 
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void * reserved)1503 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1504                                              const grpc_channel_args* args,
1505                                              void* reserved) {
1506   grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1507       gpr_malloc(sizeof(grpc_cronet_transport)));
1508   if (!ct) {
1509     goto error;
1510   }
1511   ct->base.vtable = &grpc_cronet_vtable;
1512   ct->engine = static_cast<stream_engine*>(engine);
1513   ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1514   if (!ct->host) {
1515     goto error;
1516   }
1517   strcpy(ct->host, target);
1518 
1519   ct->use_packet_coalescing = true;
1520   if (args) {
1521     for (size_t i = 0; i < args->num_args; i++) {
1522       if (0 ==
1523           strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1524         if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1525           gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1526                   GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1527         } else {
1528           ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1529         }
1530       }
1531     }
1532   }
1533 
1534   return &ct->base;
1535 
1536 error:
1537   if (ct) {
1538     if (ct->host) {
1539       gpr_free(ct->host);
1540     }
1541     gpr_free(ct);
1542   }
1543 
1544   return nullptr;
1545 }
1546