1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <string.h>
22 
23 #include <string>
24 
25 #include "absl/strings/str_cat.h"
26 #include "absl/strings/str_format.h"
27 
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 
32 #include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
33 #include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
34 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
35 #include "src/core/ext/transport/cronet/transport/cronet_status.h"
36 #include "src/core/ext/transport/cronet/transport/cronet_transport.h"
37 #include "src/core/lib/debug/trace.h"
38 #include "src/core/lib/gpr/string.h"
39 #include "src/core/lib/gprpp/manual_constructor.h"
40 #include "src/core/lib/iomgr/endpoint.h"
41 #include "src/core/lib/iomgr/exec_ctx.h"
42 #include "src/core/lib/slice/slice_internal.h"
43 #include "src/core/lib/slice/slice_string_helpers.h"
44 #include "src/core/lib/surface/channel.h"
45 #include "src/core/lib/surface/validate_metadata.h"
46 #include "src/core/lib/transport/metadata_batch.h"
47 #include "src/core/lib/transport/static_metadata.h"
48 #include "src/core/lib/transport/timeout_encoding.h"
49 #include "src/core/lib/transport/transport_impl.h"
50 
51 #include "third_party/objective_c/Cronet/bidirectional_stream_c.h"
52 
/* Size in bytes of the gRPC message framing header: one compressed-flag byte
   followed by a four byte big-endian message length (see create_grpc_frame
   and parse_grpc_header). */
#define GRPC_HEADER_SIZE_IN_BYTES 5
/* Number of bytes requested per read while a stream is in flush_read mode
   (see on_read_completed). */
#define GRPC_FLUSH_READ_SIZE 4096

/* Trace flag registered under the name "cronet"; constructed disabled. */
grpc_core::TraceFlag grpc_cronet_trace(false, "cronet");
/* Forwards its arguments to gpr_log, but only while grpc_cronet_trace is
   enabled. Wrapped in do/while(0) so it behaves as a single statement. */
#define CRONET_LOG(...)                                    \
  do {                                                     \
    if (grpc_cronet_trace.enabled()) gpr_log(__VA_ARGS__); \
  } while (0)
61 
/* Outcome of one execute_stream_op() attempt, as consumed by
   execute_from_storage(). */
enum e_op_result {
  /* An action was started; execute_from_storage stops and waits for the
     callback. */
  ACTION_TAKEN_WITH_CALLBACK,
  /* An action was taken without a callback; execute_from_storage retries the
     same op immediately. */
  ACTION_TAKEN_NO_CALLBACK,
  /* Nothing could be done for this op; execute_from_storage moves on to the
     next op in the list. */
  NO_ACTION_POSSIBLE
};
67 
/* Identifiers of the individual actions/events tracked per op and per stream.
   The values index op_state::state_op_done and
   op_state::state_callback_received, so they must stay dense and start at 0;
   OP_NUM_OPS is the array size, not a real op. */
enum e_op_id {
  OP_SEND_INITIAL_METADATA = 0,
  OP_SEND_MESSAGE,
  OP_SEND_TRAILING_METADATA,
  OP_RECV_MESSAGE,
  OP_RECV_INITIAL_METADATA,
  OP_RECV_TRAILING_METADATA,
  OP_CANCEL_ERROR,
  OP_ON_COMPLETE,
  OP_FAILED,
  OP_SUCCEEDED,
  OP_CANCELED,
  OP_RECV_MESSAGE_AND_ON_COMPLETE,
  OP_READ_REQ_MADE,
  OP_NUM_OPS
};
84 
/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
/* NOTE(review): this file includes bidirectional_stream_c.h rather than
   cronet_c_for_grpc.h, so the header name above may be stale - confirm. */

static void on_stream_ready(bidirectional_stream*);
static void on_response_headers_received(
    bidirectional_stream*, const bidirectional_stream_header_array*,
    const char*);
static void on_write_completed(bidirectional_stream*, const char*);
static void on_read_completed(bidirectional_stream*, char*, int);
static void on_response_trailers_received(
    bidirectional_stream*, const bidirectional_stream_header_array*);
static void on_succeeded(bidirectional_stream*);
static void on_failed(bidirectional_stream*, int);
static void on_canceled(bidirectional_stream*);
/* Table of the callbacks declared above. NOTE(review): the initializer order
   presumably has to match the member order of bidirectional_stream_callback,
   which is declared outside this file - confirm before reordering. */
static bidirectional_stream_callback cronet_callbacks = {
    on_stream_ready,
    on_response_headers_received,
    on_read_completed,
    on_write_completed,
    on_response_trailers_received,
    on_succeeded,
    on_failed,
    on_canceled};
107 
/* Cronet transport object */
struct grpc_cronet_transport {
  grpc_transport base; /* must be first element in this structure */
  /* NOTE(review): presumably the Cronet engine that streams are created on -
     confirm against the code that fills this in. */
  stream_engine* engine;
  /* NOTE(review): presumably the target host name - confirm. */
  char* host;
  /* When true, the callbacks in this file call bidirectional_stream_flush()
     explicitly after queuing data (see on_stream_ready and
     on_response_trailers_received). */
  bool use_packet_coalescing;
};
typedef struct grpc_cronet_transport grpc_cronet_transport;
116 
117 /* TODO (makdharma): reorder structure for memory efficiency per
118    http://www.catb.org/esr/structure-packing/#_structure_reordering: */
119 struct read_state {
read_stateread_state120   read_state(grpc_core::Arena* arena)
121       : trailing_metadata(arena), initial_metadata(arena) {
122     grpc_slice_buffer_init(&read_slice_buffer);
123   }
124 
125   /* vars to store data coming from server */
126   char* read_buffer = nullptr;
127   bool length_field_received = false;
128   int received_bytes = 0;
129   int remaining_bytes = 0;
130   int length_field = 0;
131   bool compressed = false;
132   char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES] = {};
133   char* payload_field = nullptr;
134   bool read_stream_closed = false;
135 
136   /* vars for holding data destined for the application */
137   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
138   grpc_slice_buffer read_slice_buffer;
139 
140   /* vars for trailing metadata */
141   grpc_chttp2_incoming_metadata_buffer trailing_metadata;
142   bool trailing_metadata_valid = false;
143 
144   /* vars for initial metadata */
145   grpc_chttp2_incoming_metadata_buffer initial_metadata;
146 };
147 
/* State for data travelling from the application to the server. */
struct write_state {
  /* Frame currently handed to Cronet for writing, or nullptr. Released with
     gpr_free in on_write_completed(), on_failed() and on_canceled(). */
  char* write_buffer = nullptr;
};
151 
152 /* track state of one stream op */
153 struct op_state {
op_stateop_state154   op_state(grpc_core::Arena* arena) : rs(arena) {}
155 
156   bool state_op_done[OP_NUM_OPS] = {};
157   bool state_callback_received[OP_NUM_OPS] = {};
158   /* A non-zero gRPC status code has been seen */
159   bool fail_state = false;
160   /* Transport is discarding all buffered messages */
161   bool flush_read = false;
162   bool flush_cronet_when_ready = false;
163   bool pending_write_for_trailer = false;
164   bool pending_send_message = false;
165   /* User requested RECV_TRAILING_METADATA */
166   bool pending_recv_trailing_metadata = false;
167   cronet_net_error_code net_error = OK;
168   grpc_error* cancel_error = GRPC_ERROR_NONE;
169   /* data structure for storing data coming from server */
170   struct read_state rs;
171   /* data structure for storing data going to the server */
172   struct write_state ws;
173 };
174 
175 struct stream_obj;
176 
/* One queued stream op batch together with its private execution state.
   Instances are nodes of the singly linked list rooted at op_storage::head;
   they are created in add_to_storage() and deleted in
   remove_from_storage(). */
struct op_and_state {
  /* Copies |op| and builds |state| on the arena of stream |s|. */
  op_and_state(stream_obj* s, const grpc_transport_stream_op_batch& op);

  /* Copy of the batch passed to add_to_storage(). */
  grpc_transport_stream_op_batch op;
  struct op_state state;
  /* Once true, execute_from_storage() removes and frees this node. */
  bool done = false;
  struct stream_obj* s; /* Pointer back to the stream object */
  /* next op_and_state in the linked list */
  struct op_and_state* next = nullptr;
};
187 
/* Singly linked list of the ops queued on a stream. New ops are pushed at the
   head (see add_to_storage). */
struct op_storage {
  /* Number of nodes currently reachable from |head|. */
  int num_pending_ops = 0;
  struct op_and_state* head = nullptr;
};
192 
/* Per-stream object. The Cronet callbacks recover it from
   bidirectional_stream::annotation. */
struct stream_obj {
  stream_obj(grpc_transport* gt, grpc_stream* gs,
             grpc_stream_refcount* refcount, grpc_core::Arena* arena);
  ~stream_obj();

  /* Arena handed to the op_state of every queued op_and_state. */
  grpc_core::Arena* arena;
  struct op_and_state* oas = nullptr;
  grpc_transport_stream_op_batch* curr_op = nullptr;
  /* Owning transport; the callbacks read use_packet_coalescing from it. */
  grpc_cronet_transport* curr_ct;
  grpc_stream* curr_gs;
  /* Underlying Cronet stream. Reset to nullptr once it has been destroyed in
     on_failed(), on_canceled() or on_succeeded(). */
  bidirectional_stream* cbs = nullptr;
  /* Request headers handed to Cronet; the |headers| array is released in
     on_stream_ready() (or in on_failed()/on_canceled()). */
  bidirectional_stream_header_array header_array =
      bidirectional_stream_header_array();  // Zero-initialize the structure.

  /* Stream level state. Some state will be tracked both at stream and stream_op
   * level */
  struct op_state state;

  /* OP storage */
  struct op_storage storage;

  /* Mutex to protect storage */
  /* The Cronet callbacks in this file also hold it while they update
     |state|. */
  gpr_mu mu;

  /* Refcount object of the stream */
  grpc_stream_refcount* refcount;
};
220 
/* Stream ref/unref helpers. Call sites always pass a reason string to the
   GRPC_CRONET_STREAM_REF / GRPC_CRONET_STREAM_UNREF macros; builds without
   NDEBUG forward it to grpc_stream_ref / grpc_stream_unref, NDEBUG builds
   drop it. */
#ifndef NDEBUG
#define GRPC_CRONET_STREAM_REF(stream, reason) \
  grpc_cronet_stream_ref((stream), (reason))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream), (reason))
/* Takes a ref on the stream's refcount, tagged with |reason|. */
void grpc_cronet_stream_ref(stream_obj* s, const char* reason) {
  grpc_stream_ref(s->refcount, reason);
}
/* Drops a ref on the stream's refcount, tagged with |reason|. */
void grpc_cronet_stream_unref(stream_obj* s, const char* reason) {
  grpc_stream_unref(s->refcount, reason);
}
#else
#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
#define GRPC_CRONET_STREAM_UNREF(stream, reason) \
  grpc_cronet_stream_unref((stream))
/* NDEBUG variants: same operations without the reason string. */
void grpc_cronet_stream_ref(stream_obj* s) { grpc_stream_ref(s->refcount); }
void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); }
#endif
239 
240 static enum e_op_result execute_stream_op(struct op_and_state* oas);
241 
242 /*
243   Utility function to translate enum into string for printing
244 */
op_result_string(enum e_op_result i)245 static const char* op_result_string(enum e_op_result i) {
246   switch (i) {
247     case ACTION_TAKEN_WITH_CALLBACK:
248       return "ACTION_TAKEN_WITH_CALLBACK";
249     case ACTION_TAKEN_NO_CALLBACK:
250       return "ACTION_TAKEN_NO_CALLBACK";
251     case NO_ACTION_POSSIBLE:
252       return "NO_ACTION_POSSIBLE";
253   }
254   GPR_UNREACHABLE_CODE(return "UNKNOWN");
255 }
256 
op_id_string(enum e_op_id i)257 static const char* op_id_string(enum e_op_id i) {
258   switch (i) {
259     case OP_SEND_INITIAL_METADATA:
260       return "OP_SEND_INITIAL_METADATA";
261     case OP_SEND_MESSAGE:
262       return "OP_SEND_MESSAGE";
263     case OP_SEND_TRAILING_METADATA:
264       return "OP_SEND_TRAILING_METADATA";
265     case OP_RECV_MESSAGE:
266       return "OP_RECV_MESSAGE";
267     case OP_RECV_INITIAL_METADATA:
268       return "OP_RECV_INITIAL_METADATA";
269     case OP_RECV_TRAILING_METADATA:
270       return "OP_RECV_TRAILING_METADATA";
271     case OP_CANCEL_ERROR:
272       return "OP_CANCEL_ERROR";
273     case OP_ON_COMPLETE:
274       return "OP_ON_COMPLETE";
275     case OP_FAILED:
276       return "OP_FAILED";
277     case OP_SUCCEEDED:
278       return "OP_SUCCEEDED";
279     case OP_CANCELED:
280       return "OP_CANCELED";
281     case OP_RECV_MESSAGE_AND_ON_COMPLETE:
282       return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
283     case OP_READ_REQ_MADE:
284       return "OP_READ_REQ_MADE";
285     case OP_NUM_OPS:
286       return "OP_NUM_OPS";
287   }
288   return "UNKNOWN";
289 }
290 
null_and_maybe_free_read_buffer(stream_obj * s)291 static void null_and_maybe_free_read_buffer(stream_obj* s) {
292   if (s->state.rs.read_buffer &&
293       s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
294     gpr_free(s->state.rs.read_buffer);
295   }
296   s->state.rs.read_buffer = nullptr;
297 }
298 
read_grpc_header(stream_obj * s)299 static void read_grpc_header(stream_obj* s) {
300   s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
301   s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
302   s->state.rs.received_bytes = 0;
303   s->state.rs.compressed = false;
304   CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
305   bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
306                             s->state.rs.remaining_bytes);
307 }
308 
make_error_with_desc(int error_code,int cronet_internal_error_code,const char * desc)309 static grpc_error* make_error_with_desc(int error_code,
310                                         int cronet_internal_error_code,
311                                         const char* desc) {
312   std::string error_message =
313       absl::StrFormat("Cronet error code:%d, Cronet error detail:%s",
314                       cronet_internal_error_code, desc);
315   grpc_error* error =
316       GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_message.c_str());
317   error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, error_code);
318   return error;
319 }
320 
/* Stores a copy of |op|, builds the per-op state on the stream's arena and
   remembers the owning stream. */
inline op_and_state::op_and_state(stream_obj* s,
                                  const grpc_transport_stream_op_batch& op)
    : op(op), state(s->arena), s(s) {}
324 
325 /*
326   Add a new stream op to op storage.
327 */
add_to_storage(struct stream_obj * s,grpc_transport_stream_op_batch * op)328 static void add_to_storage(struct stream_obj* s,
329                            grpc_transport_stream_op_batch* op) {
330   struct op_storage* storage = &s->storage;
331   /* add new op at the beginning of the linked list. The memory is freed
332   in remove_from_storage */
333   op_and_state* new_op = new op_and_state(s, *op);
334   gpr_mu_lock(&s->mu);
335   new_op->next = storage->head;
336   storage->head = new_op;
337   storage->num_pending_ops++;
338   if (op->send_message) {
339     s->state.pending_send_message = true;
340   }
341   if (op->recv_trailing_metadata) {
342     s->state.pending_recv_trailing_metadata = true;
343   }
344   CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
345              storage->num_pending_ops);
346   gpr_mu_unlock(&s->mu);
347 }
348 
349 /*
350   Traverse the linked list and delete op and free memory
351 */
remove_from_storage(struct stream_obj * s,struct op_and_state * oas)352 static void remove_from_storage(struct stream_obj* s,
353                                 struct op_and_state* oas) {
354   struct op_and_state* curr;
355   if (s->storage.head == nullptr || oas == nullptr) {
356     return;
357   }
358   if (s->storage.head == oas) {
359     s->storage.head = oas->next;
360     delete oas;
361     s->storage.num_pending_ops--;
362     CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
363                s->storage.num_pending_ops);
364   } else {
365     for (curr = s->storage.head; curr != nullptr; curr = curr->next) {
366       if (curr->next == oas) {
367         curr->next = oas->next;
368         s->storage.num_pending_ops--;
369         CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
370                    s->storage.num_pending_ops);
371         delete oas;
372         break;
373       } else if (GPR_UNLIKELY(curr->next == nullptr)) {
374         CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
375       }
376     }
377   }
378 }
379 
380 /*
381   Cycle through ops and try to take next action. Break when either
382   an action with callback is taken, or no action is possible.
383   This can get executed from the Cronet network thread via cronet callback
384   or on the application supplied thread via the perform_stream_op function.
385 */
execute_from_storage(stream_obj * s)386 static void execute_from_storage(stream_obj* s) {
387   gpr_mu_lock(&s->mu);
388   for (struct op_and_state* curr = s->storage.head; curr != nullptr;) {
389     CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
390     GPR_ASSERT(!curr->done);
391     enum e_op_result result = execute_stream_op(curr);
392     CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
393                op_result_string(result));
394     /* if this op is done, then remove it and free memory */
395     if (curr->done) {
396       struct op_and_state* next = curr->next;
397       remove_from_storage(s, curr);
398       curr = next;
399     } else if (result == NO_ACTION_POSSIBLE) {
400       curr = curr->next;
401     } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
402       /* wait for the callback */
403       break;
404     } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
405   }
406   gpr_mu_unlock(&s->mu);
407 }
408 
convert_cronet_array_to_metadata(const bidirectional_stream_header_array * header_array,grpc_chttp2_incoming_metadata_buffer * mds)409 static void convert_cronet_array_to_metadata(
410     const bidirectional_stream_header_array* header_array,
411     grpc_chttp2_incoming_metadata_buffer* mds) {
412   for (size_t i = 0; i < header_array->count; i++) {
413     CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s",
414                header_array->headers[i].key, header_array->headers[i].value);
415     grpc_slice key = grpc_slice_intern(
416         grpc_slice_from_static_string(header_array->headers[i].key));
417     grpc_slice value;
418     if (grpc_is_refcounted_slice_binary_header(key)) {
419       value = grpc_slice_from_static_string(header_array->headers[i].value);
420       value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length(
421           value, grpc_chttp2_base64_infer_length_after_decode(value)));
422     } else {
423       value = grpc_slice_intern(
424           grpc_slice_from_static_string(header_array->headers[i].value));
425     }
426     GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata",
427                       grpc_chttp2_incoming_metadata_buffer_add(
428                           mds, grpc_mdelem_from_slices(key, value)));
429   }
430 }
431 
432 /*
433   Cronet callback
434 */
on_failed(bidirectional_stream * stream,int net_error)435 static void on_failed(bidirectional_stream* stream, int net_error) {
436   gpr_log(GPR_ERROR, "on_failed(%p, %d)", stream, net_error);
437   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
438   grpc_core::ExecCtx exec_ctx;
439 
440   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
441   gpr_mu_lock(&s->mu);
442   bidirectional_stream_destroy(s->cbs);
443   s->state.state_callback_received[OP_FAILED] = true;
444   s->state.net_error = static_cast<cronet_net_error_code>(net_error);
445   s->cbs = nullptr;
446   if (s->header_array.headers) {
447     gpr_free(s->header_array.headers);
448     s->header_array.headers = nullptr;
449   }
450   if (s->state.ws.write_buffer) {
451     gpr_free(s->state.ws.write_buffer);
452     s->state.ws.write_buffer = nullptr;
453   }
454   null_and_maybe_free_read_buffer(s);
455   gpr_mu_unlock(&s->mu);
456   execute_from_storage(s);
457   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
458 }
459 
460 /*
461   Cronet callback
462 */
on_canceled(bidirectional_stream * stream)463 static void on_canceled(bidirectional_stream* stream) {
464   CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
465   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
466   grpc_core::ExecCtx exec_ctx;
467 
468   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
469   gpr_mu_lock(&s->mu);
470   bidirectional_stream_destroy(s->cbs);
471   s->state.state_callback_received[OP_CANCELED] = true;
472   s->cbs = nullptr;
473   if (s->header_array.headers) {
474     gpr_free(s->header_array.headers);
475     s->header_array.headers = nullptr;
476   }
477   if (s->state.ws.write_buffer) {
478     gpr_free(s->state.ws.write_buffer);
479     s->state.ws.write_buffer = nullptr;
480   }
481   null_and_maybe_free_read_buffer(s);
482   gpr_mu_unlock(&s->mu);
483   execute_from_storage(s);
484   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
485 }
486 
487 /*
488   Cronet callback
489 */
on_succeeded(bidirectional_stream * stream)490 static void on_succeeded(bidirectional_stream* stream) {
491   CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
492   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
493   grpc_core::ExecCtx exec_ctx;
494 
495   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
496   gpr_mu_lock(&s->mu);
497   bidirectional_stream_destroy(s->cbs);
498   s->state.state_callback_received[OP_SUCCEEDED] = true;
499   s->cbs = nullptr;
500   null_and_maybe_free_read_buffer(s);
501   gpr_mu_unlock(&s->mu);
502   execute_from_storage(s);
503   GRPC_CRONET_STREAM_UNREF(s, "cronet transport");
504 }
505 
506 /*
507   Cronet callback
508 */
on_stream_ready(bidirectional_stream * stream)509 static void on_stream_ready(bidirectional_stream* stream) {
510   CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
511   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
512   grpc_core::ExecCtx exec_ctx;
513   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
514   grpc_cronet_transport* t = s->curr_ct;
515   gpr_mu_lock(&s->mu);
516   s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
517   s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
518   /* Free the memory allocated for headers */
519   if (s->header_array.headers) {
520     gpr_free(s->header_array.headers);
521     s->header_array.headers = nullptr;
522   }
523   /* Send the initial metadata on wire if there is no SEND_MESSAGE or
524    * SEND_TRAILING_METADATA ops pending */
525   if (t->use_packet_coalescing) {
526     if (s->state.flush_cronet_when_ready) {
527       CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
528       bidirectional_stream_flush(stream);
529     }
530   }
531   gpr_mu_unlock(&s->mu);
532   execute_from_storage(s);
533 }
534 
535 /*
536   Cronet callback
537 */
on_response_headers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * headers,const char * negotiated_protocol)538 static void on_response_headers_received(
539     bidirectional_stream* stream,
540     const bidirectional_stream_header_array* headers,
541     const char* negotiated_protocol) {
542   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
543   grpc_core::ExecCtx exec_ctx;
544   CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
545              headers, negotiated_protocol);
546   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
547 
548   /* Identify if this is a header or a trailer (in a trailer-only response case)
549    */
550   for (size_t i = 0; i < headers->count; i++) {
551     if (0 == strcmp("grpc-status", headers->headers[i].key)) {
552       on_response_trailers_received(stream, headers);
553 
554       /* Do an extra read for a trailer-only stream to trigger on_succeeded()
555        * callback */
556       read_grpc_header(s);
557       return;
558     }
559   }
560 
561   gpr_mu_lock(&s->mu);
562   convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata);
563   s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
564   if (!(s->state.state_op_done[OP_CANCEL_ERROR] ||
565         s->state.state_callback_received[OP_FAILED])) {
566     /* Do an extra read to trigger on_succeeded() callback in case connection
567      is closed */
568     GPR_ASSERT(s->state.rs.length_field_received == false);
569     read_grpc_header(s);
570   }
571   gpr_mu_unlock(&s->mu);
572   execute_from_storage(s);
573 }
574 
575 /*
576   Cronet callback
577 */
on_write_completed(bidirectional_stream * stream,const char * data)578 static void on_write_completed(bidirectional_stream* stream, const char* data) {
579   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
580   grpc_core::ExecCtx exec_ctx;
581   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
582   CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
583   gpr_mu_lock(&s->mu);
584   if (s->state.ws.write_buffer) {
585     gpr_free(s->state.ws.write_buffer);
586     s->state.ws.write_buffer = nullptr;
587   }
588   s->state.state_callback_received[OP_SEND_MESSAGE] = true;
589   gpr_mu_unlock(&s->mu);
590   execute_from_storage(s);
591 }
592 
593 /*
594   Cronet callback
595 */
on_read_completed(bidirectional_stream * stream,char * data,int count)596 static void on_read_completed(bidirectional_stream* stream, char* data,
597                               int count) {
598   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
599   grpc_core::ExecCtx exec_ctx;
600   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
601   CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
602              count);
603   gpr_mu_lock(&s->mu);
604   s->state.state_callback_received[OP_RECV_MESSAGE] = true;
605   if (count > 0 && s->state.flush_read) {
606     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
607     bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
608                               GRPC_FLUSH_READ_SIZE);
609     gpr_mu_unlock(&s->mu);
610   } else if (count > 0) {
611     s->state.rs.received_bytes += count;
612     s->state.rs.remaining_bytes -= count;
613     if (s->state.rs.remaining_bytes > 0) {
614       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
615       s->state.state_op_done[OP_READ_REQ_MADE] = true;
616       bidirectional_stream_read(
617           s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
618           s->state.rs.remaining_bytes);
619       gpr_mu_unlock(&s->mu);
620     } else {
621       gpr_mu_unlock(&s->mu);
622       execute_from_storage(s);
623     }
624   } else {
625     null_and_maybe_free_read_buffer(s);
626     s->state.rs.read_stream_closed = true;
627     gpr_mu_unlock(&s->mu);
628     execute_from_storage(s);
629   }
630 }
631 
632 /*
633   Cronet callback
634 */
on_response_trailers_received(bidirectional_stream * stream,const bidirectional_stream_header_array * trailers)635 static void on_response_trailers_received(
636     bidirectional_stream* stream,
637     const bidirectional_stream_header_array* trailers) {
638   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
639   grpc_core::ExecCtx exec_ctx;
640   CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
641              trailers);
642   stream_obj* s = static_cast<stream_obj*>(stream->annotation);
643   grpc_cronet_transport* t = s->curr_ct;
644   gpr_mu_lock(&s->mu);
645   s->state.rs.trailing_metadata_valid = false;
646   convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata);
647   if (trailers->count > 0) {
648     s->state.rs.trailing_metadata_valid = true;
649   }
650   s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
651   /* Send a EOS when server terminates the stream (testServerFinishesRequest) to
652    * trigger on_succeeded */
653   if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
654       !(s->state.state_op_done[OP_CANCEL_ERROR] ||
655         s->state.state_callback_received[OP_FAILED])) {
656     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
657     s->state.state_callback_received[OP_SEND_MESSAGE] = false;
658     bidirectional_stream_write(s->cbs, "", 0, true);
659     if (t->use_packet_coalescing) {
660       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
661       bidirectional_stream_flush(s->cbs);
662     }
663     s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
664 
665     gpr_mu_unlock(&s->mu);
666   } else {
667     gpr_mu_unlock(&s->mu);
668     execute_from_storage(s);
669   }
670 }
671 
672 /*
673  Utility function that takes the data from s->write_slice_buffer and assembles
674  into a contiguous byte stream with 5 byte gRPC header prepended.
675 */
create_grpc_frame(grpc_slice_buffer * write_slice_buffer,char ** pp_write_buffer,size_t * p_write_buffer_size,uint32_t flags)676 static void create_grpc_frame(grpc_slice_buffer* write_slice_buffer,
677                               char** pp_write_buffer,
678                               size_t* p_write_buffer_size, uint32_t flags) {
679   size_t length = write_slice_buffer->length;
680   *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
681   /* This is freed in the on_write_completed callback */
682   char* write_buffer =
683       static_cast<char*>(gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES));
684   *pp_write_buffer = write_buffer;
685   uint8_t* p = reinterpret_cast<uint8_t*>(write_buffer);
686   /* Append 5 byte header */
687   /* Compressed flag */
688   *p++ = static_cast<uint8_t>((flags & GRPC_WRITE_INTERNAL_COMPRESS) ? 1 : 0);
689   /* Message length */
690   *p++ = static_cast<uint8_t>(length >> 24);
691   *p++ = static_cast<uint8_t>(length >> 16);
692   *p++ = static_cast<uint8_t>(length >> 8);
693   *p++ = static_cast<uint8_t>(length);
694   /* append actual data */
695   size_t offset = 0;
696   for (size_t i = 0; i < write_slice_buffer->count; ++i) {
697     memcpy(p + offset, GRPC_SLICE_START_PTR(write_slice_buffer->slices[i]),
698            GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]));
699     offset += GRPC_SLICE_LENGTH(write_slice_buffer->slices[i]);
700   }
701 }
702 
703 /*
704  Convert metadata in a format that Cronet can consume
705 */
convert_metadata_to_cronet_headers(grpc_metadata_batch * metadata,const char * host,std::string * pp_url,bidirectional_stream_header ** pp_headers,size_t * p_num_headers,const char ** method)706 static void convert_metadata_to_cronet_headers(
707     grpc_metadata_batch* metadata, const char* host, std::string* pp_url,
708     bidirectional_stream_header** pp_headers, size_t* p_num_headers,
709     const char** method) {
710   grpc_linked_mdelem* curr = metadata->list.head;
711   /* Walk the linked list and get number of header fields */
712   size_t num_headers_available = 0;
713   while (curr != nullptr) {
714     curr = curr->next;
715     num_headers_available++;
716   }
717   grpc_millis deadline = metadata->deadline;
718   if (deadline != GRPC_MILLIS_INF_FUTURE) {
719     num_headers_available++;
720   }
721   /* Allocate enough memory. It is freed in the on_stream_ready callback
722    */
723   bidirectional_stream_header* headers =
724       static_cast<bidirectional_stream_header*>(gpr_malloc(
725           sizeof(bidirectional_stream_header) * num_headers_available));
726   *pp_headers = headers;
727 
728   /* Walk the linked list again, this time copying the header fields.
729     s->num_headers can be less than num_headers_available, as some headers
730     are not used for cronet.
731     TODO (makdharma): Eliminate need to traverse the LL second time for perf.
732    */
733   curr = metadata->list.head;
734   size_t num_headers = 0;
735   while (num_headers < num_headers_available) {
736     grpc_mdelem mdelem = curr->md;
737     curr = curr->next;
738     char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem));
739     char* value;
740     if (grpc_is_binary_header_internal(GRPC_MDKEY(mdelem))) {
741       grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
742       value = grpc_slice_to_c_string(wire_value);
743       grpc_slice_unref_internal(wire_value);
744     } else {
745       value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
746     }
747     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) ||
748         grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem),
749                                       GRPC_MDSTR_AUTHORITY)) {
750       /* Cronet populates these fields on its own */
751       gpr_free(key);
752       gpr_free(value);
753       continue;
754     }
755     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_METHOD)) {
756       if (grpc_slice_eq_static_interned(GRPC_MDVALUE(mdelem), GRPC_MDSTR_PUT)) {
757         *method = "PUT";
758       } else {
759         /* POST method in default*/
760         *method = "POST";
761       }
762       gpr_free(key);
763       gpr_free(value);
764       continue;
765     }
766     if (grpc_slice_eq_static_interned(GRPC_MDKEY(mdelem), GRPC_MDSTR_PATH)) {
767       /* Create URL by appending :path value to the hostname */
768       *pp_url = absl::StrCat("https://", host, value);
769       gpr_free(key);
770       gpr_free(value);
771       continue;
772     }
773     CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
774     headers[num_headers].key = key;
775     headers[num_headers].value = value;
776     num_headers++;
777     if (curr == nullptr) {
778       break;
779     }
780   }
781   if (deadline != GRPC_MILLIS_INF_FUTURE) {
782     char* key = grpc_slice_to_c_string(GRPC_MDSTR_GRPC_TIMEOUT);
783     char* value =
784         static_cast<char*>(gpr_malloc(GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE));
785     grpc_http2_encode_timeout(deadline - grpc_core::ExecCtx::Get()->Now(),
786                               value);
787     headers[num_headers].key = key;
788     headers[num_headers].value = value;
789 
790     num_headers++;
791   }
792 
793   *p_num_headers = num_headers;
794 }
795 
/*
  Decodes the 5-byte gRPC message prefix stored at |data|.

  data[0]     flag byte; bit 0 set means the message is compressed.
  data[1..4]  message length, most significant byte first.

  |*compressed| receives the flag and |*length| the decoded length. The length
  is assembled in unsigned arithmetic so that a length byte >= 0x80 in the most
  significant position does not left-shift a signed int into its sign bit; the
  32-bit pattern is then converted to int in one step, so callers still see a
  non-positive value for lengths that do not fit in a positive int.
*/
static void parse_grpc_header(const uint8_t* data, int* length,
                              bool* compressed) {
  *compressed = ((data[0] & 0x01) == 0x01);
  const uint32_t wire_length = (static_cast<uint32_t>(data[1]) << 24) |
                               (static_cast<uint32_t>(data[2]) << 16) |
                               (static_cast<uint32_t>(data[3]) << 8) |
                               static_cast<uint32_t>(data[4]);
  *length = static_cast<int>(wire_length);
}
807 
header_has_authority(grpc_linked_mdelem * head)808 static bool header_has_authority(grpc_linked_mdelem* head) {
809   while (head != nullptr) {
810     if (grpc_slice_eq_static_interned(GRPC_MDKEY(head->md),
811                                       GRPC_MDSTR_AUTHORITY)) {
812       return true;
813     }
814     head = head->next;
815   }
816   return false;
817 }
818 
819 /*
820   Op Execution: Decide if one of the actions contained in the stream op can be
821   executed. This is the heart of the state machine.
822 */
op_can_be_run(grpc_transport_stream_op_batch * curr_op,struct stream_obj * s,struct op_state * op_state,enum e_op_id op_id)823 static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
824                           struct stream_obj* s, struct op_state* op_state,
825                           enum e_op_id op_id) {
826   struct op_state* stream_state = &s->state;
827   grpc_cronet_transport* t = s->curr_ct;
828   bool result = true;
829   /* When call is canceled, every op can be run, except under following
830   conditions
831   */
832   bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
833                                stream_state->state_callback_received[OP_FAILED];
834   if (is_canceled_or_failed) {
835     if (op_id == OP_SEND_INITIAL_METADATA) {
836       CRONET_LOG(GPR_DEBUG, "Because");
837       result = false;
838     }
839     if (op_id == OP_SEND_MESSAGE) {
840       CRONET_LOG(GPR_DEBUG, "Because");
841       result = false;
842     }
843     if (op_id == OP_SEND_TRAILING_METADATA) {
844       CRONET_LOG(GPR_DEBUG, "Because");
845       result = false;
846     }
847     if (op_id == OP_CANCEL_ERROR) {
848       CRONET_LOG(GPR_DEBUG, "Because");
849       result = false;
850     }
851     /* already executed */
852     if (op_id == OP_RECV_INITIAL_METADATA &&
853         stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
854       CRONET_LOG(GPR_DEBUG, "Because");
855       result = false;
856     }
857     if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
858       CRONET_LOG(GPR_DEBUG, "Because");
859       result = false;
860     }
861     if (op_id == OP_RECV_TRAILING_METADATA &&
862         stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
863       CRONET_LOG(GPR_DEBUG, "Because");
864       result = false;
865     }
866     /* ON_COMPLETE can be processed if one of the following conditions is met:
867      * 1. the stream failed
868      * 2. the stream is cancelled, and the callback is received
869      * 3. the stream succeeded before cancel is effective
870      * 4. the stream is cancelled, and the stream is never started */
871     if (op_id == OP_ON_COMPLETE &&
872         !(stream_state->state_callback_received[OP_FAILED] ||
873           stream_state->state_callback_received[OP_CANCELED] ||
874           stream_state->state_callback_received[OP_SUCCEEDED] ||
875           !stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
876       CRONET_LOG(GPR_DEBUG, "Because");
877       result = false;
878     }
879   } else if (op_id == OP_SEND_INITIAL_METADATA) {
880     /* already executed */
881     if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
882   } else if (op_id == OP_RECV_INITIAL_METADATA) {
883     if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
884       /* already executed */
885       result = false;
886     } else if (!stream_state
887                     ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
888       /* we haven't sent headers yet. */
889       result = false;
890     } else if (!stream_state
891                     ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
892                !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
893       /* we haven't received headers yet. */
894       result = false;
895     }
896   } else if (op_id == OP_SEND_MESSAGE) {
897     if (op_state->state_op_done[OP_SEND_MESSAGE]) {
898       /* already executed (note we're checking op specific state, not stream
899          state) */
900       result = false;
901     } else if (!stream_state
902                     ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
903       /* we haven't sent headers yet. */
904       result = false;
905     }
906   } else if (op_id == OP_RECV_MESSAGE) {
907     if (op_state->state_op_done[OP_RECV_MESSAGE]) {
908       /* already executed */
909       result = false;
910     } else if (!stream_state
911                     ->state_callback_received[OP_RECV_INITIAL_METADATA] &&
912                !stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
913       /* we haven't received headers yet. */
914       result = false;
915     }
916   } else if (op_id == OP_RECV_TRAILING_METADATA) {
917     if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
918       /* already executed */
919       result = false;
920     } else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
921                !stream_state->state_op_done[OP_RECV_MESSAGE]) {
922       /* we have asked for but haven't received message yet. */
923       result = false;
924     } else if (!stream_state
925                     ->state_callback_received[OP_RECV_TRAILING_METADATA]) {
926       /* we haven't received trailers  yet. */
927       result = false;
928     } else if (!stream_state->state_callback_received[OP_SUCCEEDED]) {
929       /* we haven't received on_succeeded  yet. */
930       result = false;
931     }
932   } else if (op_id == OP_SEND_TRAILING_METADATA) {
933     if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
934       /* already executed */
935       result = false;
936     } else if (!stream_state
937                     ->state_callback_received[OP_SEND_INITIAL_METADATA]) {
938       /* we haven't sent initial metadata yet */
939       result = false;
940     } else if (stream_state->pending_send_message &&
941                !stream_state->state_op_done[OP_SEND_MESSAGE]) {
942       /* we haven't sent message yet */
943       result = false;
944     } else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
945                !stream_state->state_callback_received[OP_SEND_MESSAGE] &&
946                !(t->use_packet_coalescing &&
947                  stream_state->pending_write_for_trailer)) {
948       /* we haven't got on_write_completed for the send yet */
949       result = false;
950     }
951   } else if (op_id == OP_CANCEL_ERROR) {
952     /* already executed */
953     if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
954   } else if (op_id == OP_ON_COMPLETE) {
955     if (op_state->state_op_done[OP_ON_COMPLETE]) {
956       /* already executed (note we're checking op specific state, not stream
957       state) */
958       CRONET_LOG(GPR_DEBUG, "Because");
959       result = false;
960     }
961     /* Check if every op that was asked for is done. */
962     /* TODO(muxi): We should not consider the recv ops here, since they
963      * have their own callbacks.  We should invoke a batch's on_complete
964      * as soon as all of the batch's send ops are complete, even if
965      * there are still recv ops pending. */
966     else if (curr_op->send_initial_metadata &&
967              !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
968       CRONET_LOG(GPR_DEBUG, "Because");
969       result = false;
970     } else if (curr_op->send_message &&
971                !op_state->state_op_done[OP_SEND_MESSAGE]) {
972       CRONET_LOG(GPR_DEBUG, "Because");
973       result = false;
974     } else if (curr_op->send_message &&
975                !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
976       CRONET_LOG(GPR_DEBUG, "Because");
977       result = false;
978     } else if (curr_op->send_trailing_metadata &&
979                !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
980       CRONET_LOG(GPR_DEBUG, "Because");
981       result = false;
982     } else if (curr_op->recv_initial_metadata &&
983                !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
984       CRONET_LOG(GPR_DEBUG, "Because");
985       result = false;
986     } else if (curr_op->recv_message &&
987                !op_state->state_op_done[OP_RECV_MESSAGE]) {
988       CRONET_LOG(GPR_DEBUG, "Because");
989       result = false;
990     } else if (curr_op->cancel_stream &&
991                !stream_state->state_callback_received[OP_CANCELED]) {
992       CRONET_LOG(GPR_DEBUG, "Because");
993       result = false;
994     } else if (curr_op->recv_trailing_metadata) {
995       /* We aren't done with trailing metadata yet */
996       if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
997         CRONET_LOG(GPR_DEBUG, "Because");
998         result = false;
999       }
1000       /* We've asked for actual message in an earlier op, and it hasn't been
1001         delivered yet. */
1002       else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
1003         /* If this op is not the one asking for read, (which means some earlier
1004           op has asked), and the read hasn't been delivered. */
1005         if (!curr_op->recv_message &&
1006             !stream_state->state_callback_received[OP_SUCCEEDED]) {
1007           CRONET_LOG(GPR_DEBUG, "Because");
1008           result = false;
1009         }
1010       }
1011     }
1012     /* We should see at least one on_write_completed for the trailers that we
1013       sent */
1014     else if (curr_op->send_trailing_metadata &&
1015              !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
1016       result = false;
1017     }
1018   }
1019   CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
1020              result ? "YES" : "NO");
1021   return result;
1022 }
1023 
1024 /*
1025   TODO (makdharma): Break down this function in smaller chunks for readability.
1026 */
execute_stream_op(struct op_and_state * oas)1027 static enum e_op_result execute_stream_op(struct op_and_state* oas) {
1028   grpc_transport_stream_op_batch* stream_op = &oas->op;
1029   struct stream_obj* s = oas->s;
1030   grpc_cronet_transport* t = s->curr_ct;
1031   struct op_state* stream_state = &s->state;
1032   enum e_op_result result = NO_ACTION_POSSIBLE;
1033   if (stream_op->send_initial_metadata &&
1034       op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
1035     CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
1036     /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
1037      * on_failed */
1038     GPR_ASSERT(s->cbs == nullptr);
1039     GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
1040     s->cbs =
1041         bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
1042     CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
1043     if (t->use_packet_coalescing) {
1044       bidirectional_stream_disable_auto_flush(s->cbs, true);
1045       bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
1046     }
1047     std::string url;
1048     const char* method = "POST";
1049     s->header_array.headers = nullptr;
1050     convert_metadata_to_cronet_headers(
1051         stream_op->payload->send_initial_metadata.send_initial_metadata,
1052         t->host, &url, &s->header_array.headers, &s->header_array.count,
1053         &method);
1054     s->header_array.capacity = s->header_array.count;
1055     CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs,
1056                url.c_str());
1057     bidirectional_stream_start(s->cbs, url.c_str(), 0, method, &s->header_array,
1058                                false);
1059     unsigned int header_index;
1060     for (header_index = 0; header_index < s->header_array.count;
1061          header_index++) {
1062       gpr_free((void*)s->header_array.headers[header_index].key);
1063       gpr_free((void*)s->header_array.headers[header_index].value);
1064     }
1065     stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
1066     if (t->use_packet_coalescing) {
1067       if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
1068         s->state.flush_cronet_when_ready = true;
1069       }
1070     }
1071     result = ACTION_TAKEN_WITH_CALLBACK;
1072   } else if (stream_op->send_message &&
1073              op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
1074     CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
1075     stream_state->pending_send_message = false;
1076     if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1077         stream_state->state_callback_received[OP_FAILED] ||
1078         stream_state->state_callback_received[OP_SUCCEEDED]) {
1079       result = NO_ACTION_POSSIBLE;
1080       CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1081     } else {
1082       grpc_slice_buffer write_slice_buffer;
1083       grpc_slice slice;
1084       grpc_slice_buffer_init(&write_slice_buffer);
1085       while (write_slice_buffer.length <
1086              stream_op->payload->send_message.send_message->length()) {
1087         /* TODO(roth): When we add support for incremental sending,this code
1088          * will need to be changed to support asynchronous delivery of the
1089          * send_message payload. */
1090         if (!stream_op->payload->send_message.send_message->Next(
1091                 stream_op->payload->send_message.send_message->length(),
1092                 nullptr)) {
1093           /* Should never reach here */
1094           GPR_ASSERT(false);
1095         }
1096         if (GRPC_ERROR_NONE !=
1097             stream_op->payload->send_message.send_message->Pull(&slice)) {
1098           /* Should never reach here */
1099           GPR_ASSERT(false);
1100         }
1101         grpc_slice_buffer_add(&write_slice_buffer, slice);
1102       }
1103       size_t write_buffer_size;
1104       create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
1105                         &write_buffer_size,
1106                         stream_op->payload->send_message.send_message->flags());
1107       if (write_buffer_size > 0) {
1108         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
1109                    stream_state->ws.write_buffer);
1110         stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1111         bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
1112                                    static_cast<int>(write_buffer_size), false);
1113         grpc_slice_buffer_destroy_internal(&write_slice_buffer);
1114         if (t->use_packet_coalescing) {
1115           if (!stream_op->send_trailing_metadata) {
1116             CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1117             bidirectional_stream_flush(s->cbs);
1118             result = ACTION_TAKEN_WITH_CALLBACK;
1119           } else {
1120             stream_state->pending_write_for_trailer = true;
1121             result = ACTION_TAKEN_NO_CALLBACK;
1122           }
1123         } else {
1124           result = ACTION_TAKEN_WITH_CALLBACK;
1125         }
1126       } else {
1127         /* Should never reach here */
1128         GPR_ASSERT(false);
1129       }
1130     }
1131     stream_state->state_op_done[OP_SEND_MESSAGE] = true;
1132     oas->state.state_op_done[OP_SEND_MESSAGE] = true;
1133     stream_op->payload->send_message.send_message.reset();
1134   } else if (stream_op->send_trailing_metadata &&
1135              op_can_be_run(stream_op, s, &oas->state,
1136                            OP_SEND_TRAILING_METADATA)) {
1137     CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
1138     if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
1139         stream_state->state_callback_received[OP_FAILED] ||
1140         stream_state->state_callback_received[OP_SUCCEEDED]) {
1141       result = NO_ACTION_POSSIBLE;
1142       CRONET_LOG(GPR_DEBUG, "Stream is either cancelled, failed or finished");
1143     } else {
1144       CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
1145       stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1146       bidirectional_stream_write(s->cbs, "", 0, true);
1147       if (t->use_packet_coalescing) {
1148         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
1149         bidirectional_stream_flush(s->cbs);
1150       }
1151       result = ACTION_TAKEN_WITH_CALLBACK;
1152     }
1153     stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
1154   } else if (stream_op->recv_initial_metadata &&
1155              op_can_be_run(stream_op, s, &oas->state,
1156                            OP_RECV_INITIAL_METADATA)) {
1157     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
1158     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1159       grpc_core::ExecCtx::Run(
1160           DEBUG_LOCATION,
1161           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1162           GRPC_ERROR_NONE);
1163     } else if (stream_state->state_callback_received[OP_FAILED]) {
1164       grpc_core::ExecCtx::Run(
1165           DEBUG_LOCATION,
1166           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1167           GRPC_ERROR_NONE);
1168     } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
1169       grpc_core::ExecCtx::Run(
1170           DEBUG_LOCATION,
1171           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1172           GRPC_ERROR_NONE);
1173     } else {
1174       grpc_chttp2_incoming_metadata_buffer_publish(
1175           &oas->s->state.rs.initial_metadata,
1176           stream_op->payload->recv_initial_metadata.recv_initial_metadata);
1177       grpc_core::ExecCtx::Run(
1178           DEBUG_LOCATION,
1179           stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1180           GRPC_ERROR_NONE);
1181     }
1182     stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
1183     result = ACTION_TAKEN_NO_CALLBACK;
1184   } else if (stream_op->recv_message &&
1185              op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
1186     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
1187     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1188       CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
1189       grpc_core::ExecCtx::Run(
1190           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1191           GRPC_ERROR_NONE);
1192       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1193       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1194       result = ACTION_TAKEN_NO_CALLBACK;
1195     } else if (stream_state->state_callback_received[OP_FAILED]) {
1196       CRONET_LOG(GPR_DEBUG, "Stream failed.");
1197       grpc_core::ExecCtx::Run(
1198           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1199           GRPC_ERROR_NONE);
1200       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1201       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1202       result = ACTION_TAKEN_NO_CALLBACK;
1203     } else if (stream_state->rs.read_stream_closed == true) {
1204       /* No more data will be received */
1205       CRONET_LOG(GPR_DEBUG, "read stream closed");
1206       grpc_core::ExecCtx::Run(
1207           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1208           GRPC_ERROR_NONE);
1209       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1210       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1211       result = ACTION_TAKEN_NO_CALLBACK;
1212     } else if (stream_state->flush_read) {
1213       CRONET_LOG(GPR_DEBUG, "flush read");
1214       grpc_core::ExecCtx::Run(
1215           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1216           GRPC_ERROR_NONE);
1217       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1218       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1219       result = ACTION_TAKEN_NO_CALLBACK;
1220     } else if (stream_state->rs.length_field_received == false) {
1221       if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
1222           stream_state->rs.remaining_bytes == 0) {
1223         /* Start a read operation for data */
1224         stream_state->rs.length_field_received = true;
1225         parse_grpc_header(
1226             reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
1227             &stream_state->rs.length_field, &stream_state->rs.compressed);
1228         CRONET_LOG(GPR_DEBUG, "length field = %d",
1229                    stream_state->rs.length_field);
1230         if (stream_state->rs.length_field > 0) {
1231           stream_state->rs.read_buffer = static_cast<char*>(
1232               gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
1233           GPR_ASSERT(stream_state->rs.read_buffer);
1234           stream_state->rs.remaining_bytes = stream_state->rs.length_field;
1235           stream_state->rs.received_bytes = 0;
1236           CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1237           stream_state->state_op_done[OP_READ_REQ_MADE] =
1238               true; /* Indicates that at least one read request has been made */
1239           bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1240                                     stream_state->rs.remaining_bytes);
1241           result = ACTION_TAKEN_WITH_CALLBACK;
1242         } else {
1243           stream_state->rs.remaining_bytes = 0;
1244           CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
1245           /* Clean up read_slice_buffer in case there is unread data. */
1246           grpc_slice_buffer_destroy_internal(
1247               &stream_state->rs.read_slice_buffer);
1248           grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1249           uint32_t flags = 0;
1250           if (stream_state->rs.compressed) {
1251             flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1252           }
1253           stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1254           stream_op->payload->recv_message.recv_message->reset(
1255               stream_state->rs.sbs.get());
1256           grpc_core::ExecCtx::Run(
1257               DEBUG_LOCATION,
1258               stream_op->payload->recv_message.recv_message_ready,
1259               GRPC_ERROR_NONE);
1260           stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1261           oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1262 
1263           /* Extra read to trigger on_succeed */
1264           stream_state->rs.length_field_received = false;
1265           stream_state->state_op_done[OP_READ_REQ_MADE] =
1266               true; /* Indicates that at least one read request has been made */
1267           read_grpc_header(s);
1268           result = ACTION_TAKEN_NO_CALLBACK;
1269         }
1270       } else if (stream_state->rs.remaining_bytes == 0) {
1271         /* Start a read operation for first 5 bytes (GRPC header) */
1272         stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
1273         stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
1274         stream_state->rs.received_bytes = 0;
1275         stream_state->rs.compressed = false;
1276         CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
1277         stream_state->state_op_done[OP_READ_REQ_MADE] =
1278             true; /* Indicates that at least one read request has been made */
1279         bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
1280                                   stream_state->rs.remaining_bytes);
1281         result = ACTION_TAKEN_WITH_CALLBACK;
1282       } else {
1283         result = NO_ACTION_POSSIBLE;
1284       }
1285     } else if (stream_state->rs.remaining_bytes == 0) {
1286       CRONET_LOG(GPR_DEBUG, "read operation complete");
1287       grpc_slice read_data_slice =
1288           GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
1289       uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
1290       memcpy(dst_p, stream_state->rs.read_buffer,
1291              static_cast<size_t>(stream_state->rs.length_field));
1292       null_and_maybe_free_read_buffer(s);
1293       /* Clean up read_slice_buffer in case there is unread data. */
1294       grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
1295       grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
1296       grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
1297                             read_data_slice);
1298       uint32_t flags = 0;
1299       if (stream_state->rs.compressed) {
1300         flags = GRPC_WRITE_INTERNAL_COMPRESS;
1301       }
1302       stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
1303       stream_op->payload->recv_message.recv_message->reset(
1304           stream_state->rs.sbs.get());
1305       grpc_core::ExecCtx::Run(
1306           DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
1307           GRPC_ERROR_NONE);
1308       stream_state->state_op_done[OP_RECV_MESSAGE] = true;
1309       oas->state.state_op_done[OP_RECV_MESSAGE] = true;
1310       /* Do an extra read to trigger on_succeeded() callback in case connection
1311          is closed */
1312       stream_state->rs.length_field_received = false;
1313       read_grpc_header(s);
1314       result = ACTION_TAKEN_NO_CALLBACK;
1315     }
1316   } else if (stream_op->recv_trailing_metadata &&
1317              op_can_be_run(stream_op, s, &oas->state,
1318                            OP_RECV_TRAILING_METADATA)) {
1319     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
1320     grpc_error* error = GRPC_ERROR_NONE;
1321     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1322       error = GRPC_ERROR_REF(stream_state->cancel_error);
1323     } else if (stream_state->state_callback_received[OP_FAILED]) {
1324       const char* desc = cronet_net_error_as_string(stream_state->net_error);
1325       error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE,
1326                                    stream_state->net_error, desc);
1327     } else if (oas->s->state.rs.trailing_metadata_valid) {
1328       grpc_chttp2_incoming_metadata_buffer_publish(
1329           &oas->s->state.rs.trailing_metadata,
1330           stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
1331       stream_state->rs.trailing_metadata_valid = false;
1332     }
1333     grpc_core::ExecCtx::Run(
1334         DEBUG_LOCATION,
1335         stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1336         error);
1337     stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
1338     result = ACTION_TAKEN_NO_CALLBACK;
1339   } else if (stream_op->cancel_stream &&
1340              op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
1341     CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
1342     if (s->cbs) {
1343       CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
1344       bidirectional_stream_cancel(s->cbs);
1345       result = ACTION_TAKEN_WITH_CALLBACK;
1346     } else {
1347       result = ACTION_TAKEN_NO_CALLBACK;
1348     }
1349     stream_state->state_op_done[OP_CANCEL_ERROR] = true;
1350     if (!stream_state->cancel_error) {
1351       stream_state->cancel_error =
1352           GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
1353     }
1354   } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
1355     CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
1356     if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
1357       if (stream_op->on_complete) {
1358         grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1359                                 GRPC_ERROR_REF(stream_state->cancel_error));
1360       }
1361     } else if (stream_state->state_callback_received[OP_FAILED]) {
1362       if (stream_op->on_complete) {
1363         const char* error_message =
1364             cronet_net_error_as_string(stream_state->net_error);
1365         grpc_core::ExecCtx::Run(
1366             DEBUG_LOCATION, stream_op->on_complete,
1367             make_error_with_desc(GRPC_STATUS_UNAVAILABLE,
1368                                  stream_state->net_error, error_message));
1369       }
1370     } else {
1371       /* All actions in this stream_op are complete. Call the on_complete
1372        * callback
1373        */
1374       if (stream_op->on_complete) {
1375         grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
1376                                 GRPC_ERROR_NONE);
1377       }
1378     }
1379     oas->state.state_op_done[OP_ON_COMPLETE] = true;
1380     oas->done = true;
1381     /* reset any send message state, only if this ON_COMPLETE is about a send.
1382      */
1383     if (stream_op->send_message) {
1384       stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
1385       stream_state->state_op_done[OP_SEND_MESSAGE] = false;
1386     }
1387     result = ACTION_TAKEN_NO_CALLBACK;
1388     /* If this is the on_complete callback being called for a received message -
1389       make a note */
1390     if (stream_op->recv_message) {
1391       stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
1392     }
1393   } else {
1394     result = NO_ACTION_POSSIBLE;
1395   }
1396   return result;
1397 }
1398 
1399 /*
1400   Functions used by upper layers to access transport functionality.
1401 */
1402 
stream_obj(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,grpc_core::Arena * arena)1403 inline stream_obj::stream_obj(grpc_transport* gt, grpc_stream* gs,
1404                               grpc_stream_refcount* refcount,
1405                               grpc_core::Arena* arena)
1406     : arena(arena),
1407       curr_ct(reinterpret_cast<grpc_cronet_transport*>(gt)),
1408       curr_gs(gs),
1409       state(arena),
1410       refcount(refcount) {
1411   GRPC_CRONET_STREAM_REF(this, "cronet transport");
1412   gpr_mu_init(&mu);
1413 }
1414 
~stream_obj()1415 inline stream_obj::~stream_obj() {
1416   null_and_maybe_free_read_buffer(this);
1417   /* Clean up read_slice_buffer in case there is unread data. */
1418   grpc_slice_buffer_destroy_internal(&state.rs.read_slice_buffer);
1419   GRPC_ERROR_UNREF(state.cancel_error);
1420 }
1421 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)1422 static int init_stream(grpc_transport* gt, grpc_stream* gs,
1423                        grpc_stream_refcount* refcount, const void* server_data,
1424                        grpc_core::Arena* arena) {
1425   new (gs) stream_obj(gt, gs, refcount, arena);
1426   return 0;
1427 }
1428 
set_pollset_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)1429 static void set_pollset_do_nothing(grpc_transport* gt, grpc_stream* gs,
1430                                    grpc_pollset* pollset) {}
1431 
set_pollset_set_do_nothing(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)1432 static void set_pollset_set_do_nothing(grpc_transport* gt, grpc_stream* gs,
1433                                        grpc_pollset_set* pollset_set) {}
1434 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1435 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1436                               grpc_transport_stream_op_batch* op) {
1437   CRONET_LOG(GPR_DEBUG, "perform_stream_op");
1438   if (op->send_initial_metadata &&
1439       header_has_authority(op->payload->send_initial_metadata
1440                                .send_initial_metadata->list.head)) {
1441     /* Cronet does not support :authority header field. We cancel the call when
1442      this field is present in metadata */
1443     if (op->recv_initial_metadata) {
1444       grpc_core::ExecCtx::Run(
1445           DEBUG_LOCATION,
1446           op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1447           GRPC_ERROR_CANCELLED);
1448     }
1449     if (op->recv_message) {
1450       grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1451                               op->payload->recv_message.recv_message_ready,
1452                               GRPC_ERROR_CANCELLED);
1453     }
1454     if (op->recv_trailing_metadata) {
1455       grpc_core::ExecCtx::Run(
1456           DEBUG_LOCATION,
1457           op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1458           GRPC_ERROR_CANCELLED);
1459     }
1460     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
1461                             GRPC_ERROR_CANCELLED);
1462     return;
1463   }
1464   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1465   add_to_storage(s, op);
1466   execute_from_storage(s);
1467 }
1468 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)1469 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
1470                            grpc_closure* then_schedule_closure) {
1471   stream_obj* s = reinterpret_cast<stream_obj*>(gs);
1472   s->~stream_obj();
1473   grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1474                           GRPC_ERROR_NONE);
1475 }
1476 
destroy_transport(grpc_transport * gt)1477 static void destroy_transport(grpc_transport* gt) {}
1478 
get_endpoint(grpc_transport * gt)1479 static grpc_endpoint* get_endpoint(grpc_transport* gt) { return nullptr; }
1480 
perform_op(grpc_transport * gt,grpc_transport_op * op)1481 static void perform_op(grpc_transport* gt, grpc_transport_op* op) {}
1482 
1483 static const grpc_transport_vtable grpc_cronet_vtable = {
1484     sizeof(stream_obj),
1485     "cronet_http",
1486     init_stream,
1487     set_pollset_do_nothing,
1488     set_pollset_set_do_nothing,
1489     perform_stream_op,
1490     perform_op,
1491     destroy_stream,
1492     destroy_transport,
1493     get_endpoint};
1494 
grpc_create_cronet_transport(void * engine,const char * target,const grpc_channel_args * args,void * reserved)1495 grpc_transport* grpc_create_cronet_transport(void* engine, const char* target,
1496                                              const grpc_channel_args* args,
1497                                              void* reserved) {
1498   grpc_cronet_transport* ct = static_cast<grpc_cronet_transport*>(
1499       gpr_malloc(sizeof(grpc_cronet_transport)));
1500   if (!ct) {
1501     goto error;
1502   }
1503   ct->base.vtable = &grpc_cronet_vtable;
1504   ct->engine = static_cast<stream_engine*>(engine);
1505   ct->host = static_cast<char*>(gpr_malloc(strlen(target) + 1));
1506   if (!ct->host) {
1507     goto error;
1508   }
1509   strcpy(ct->host, target);
1510 
1511   ct->use_packet_coalescing = true;
1512   if (args) {
1513     for (size_t i = 0; i < args->num_args; i++) {
1514       if (0 ==
1515           strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) {
1516         if (GPR_UNLIKELY(args->args[i].type != GRPC_ARG_INTEGER)) {
1517           gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
1518                   GRPC_ARG_USE_CRONET_PACKET_COALESCING);
1519         } else {
1520           ct->use_packet_coalescing = (args->args[i].value.integer != 0);
1521         }
1522       }
1523     }
1524   }
1525 
1526   return &ct->base;
1527 
1528 error:
1529   if (ct) {
1530     if (ct->host) {
1531       gpr_free(ct->host);
1532     }
1533     gpr_free(ct);
1534   }
1535 
1536   return nullptr;
1537 }
1538