1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <string>
28 
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 
32 #include <grpc/compression.h>
33 #include <grpc/grpc.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38 
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/compression/algorithm_metadata.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/alloc.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gpr/time_precise.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/arena.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/iomgr/timer.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/slice/slice_utils.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/call_test_only.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/server.h"
59 #include "src/core/lib/surface/validate_metadata.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/static_metadata.h"
63 #include "src/core/lib/transport/status_metadata.h"
64 #include "src/core/lib/transport/transport.h"
65 
/** The maximum number of concurrent batches possible.
    Based upon the maximum number of individually queueable ops in the batch
    api:
      - initial metadata send
      - message send
      - status/close send (depending on client/server)
      - initial metadata recv
      - message recv
      - status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6

/** Capacity of grpc_call::send_extra_metadata, i.e. the number of extra
    metadata elements a single call can hold. */
#define MAX_SEND_EXTRA_METADATA_COUNT 3

// Estimated number of grpc_linked_mdelem entries per call. Used by
// grpc_call_get_initial_size_estimate() to size the arena for the first call.
#define ESTIMATED_MDELEM_COUNT 16
81 
/** Per-batch bookkeeping: ties one transport stream op batch to the call it
    belongs to and records how its completion is to be reported. A call holds
    up to MAX_CONCURRENT_BATCHES of these (see grpc_call::active_batches). */
struct batch_control {
  batch_control() = default;

  grpc_call* call = nullptr;
  grpc_transport_stream_op_batch op;
  /* Share memory for cq_completion and notify_tag as they are never needed
     simultaneously. Each byte used in this data structure counts as six bytes
     per call, so any savings we can make are worthwhile.

     We use notify_tag to determine whether or not to send notification to the
     completion queue. Once we've made that determination, we can reuse the
     memory for cq_completion. */
  union {
    grpc_cq_completion cq_completion;
    struct {
      /* Any given op indicates completion by either (a) calling a closure or
         (b) sending a notification on the call's completion queue.  If
         \a is_closure is true, \a tag indicates a closure to be invoked;
         otherwise, \a tag indicates the tag to be used in the notification to
         be sent to the completion queue. */
      void* tag;
      bool is_closure;
    } notify_tag;
  } completion_data;
  grpc_closure start_batch;
  grpc_closure finish_batch;
  /* Number of steps still outstanding before the batch counts as complete. */
  grpc_core::Atomic<intptr_t> steps_to_complete;
  /* A grpc_error* stored as a gpr_atm; starts out as GRPC_ERROR_NONE. */
  gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
  /* Publishes the number of outstanding steps with release ordering. */
  void set_num_steps_to_complete(uintptr_t steps) {
    steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
  }
  /* Decrements the outstanding-step counter by one and reports whether
     FetchSub yielded 1 -- presumably the pre-decrement value (as with
     std::atomic::fetch_sub), i.e. whether this was the final step. */
  bool completed_batch_step() {
    return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
  }
};
117 
/** State a call carries when it acts as the parent of other calls: the head
    of its list of children and the mutex guarding that list. Created lazily
    by get_or_create_parent_call(). */
struct parent_call {
  parent_call() { gpr_mu_init(&child_list_mu); }
  ~parent_call() { gpr_mu_destroy(&child_list_mu); }

  /* Held while the child list is read or modified. */
  gpr_mu child_list_mu;
  /* Head of the list of children; nullptr when there are none. */
  grpc_call* first_child = nullptr;
};
125 
126 struct child_call {
child_callchild_call127   child_call(grpc_call* parent) : parent(parent) {}
128   grpc_call* parent;
129   /** siblings: children of the same parent form a list, and this list is
130      protected under
131       parent->mu */
132   grpc_call* sibling_next = nullptr;
133   grpc_call* sibling_prev = nullptr;
134 };
135 
// Sentinel values for grpc_call::recv_state. Per the comment on that field,
// any other value stored there is a batch_control pointer.
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
138 
/** The core call object. It is placement-constructed inside an arena by
    grpc_call_create(), and the call stack lives in the same allocation
    directly after it (see CALL_STACK_FROM_CALL). */
struct grpc_call {
  /* Binds the call to its arena, completion queue and channel. The call is
     treated as a client call when no server transport data is supplied.
     All four metadata batches start with an infinite-future deadline.
     Note: `context` is declared after `stream_op_payload`, so the payload
     is handed the array before the array's own initializer has run. */
  grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
      : arena(arena),
        cq(args.cq),
        channel(args.channel),
        is_client(args.server_transport_data == nullptr),
        stream_op_payload(context) {
    for (int i = 0; i < 2; i++) {
      for (int j = 0; j < 2; j++) {
        metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
      }
    }
  }

  /* Frees the error string recorded in final_info. */
  ~grpc_call() {
    gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
  }

  /* External reference count, manipulated by grpc_call_ref/grpc_call_unref. */
  grpc_core::RefCount ext_ref;
  grpc_core::Arena* arena;
  grpc_core::CallCombiner call_combiner;
  grpc_completion_queue* cq;
  grpc_polling_entity pollent;
  grpc_channel* channel;
  gpr_cycle_counter start_time = gpr_get_cycle_counter();
  /* parent_call* */ gpr_atm parent_call_atm = 0;
  /* Non-null only when this call was created with a parent. */
  child_call* child = nullptr;

  /* client or server call */
  bool is_client;
  /** has grpc_call_unref been called */
  bool destroy_called = false;
  /** flag indicating that cancellation is inherited */
  bool cancellation_is_inherited = false;
  /** which ops are in-flight */
  bool sent_initial_metadata = false;
  bool sending_message = false;
  bool sent_final_op = false;
  bool received_initial_metadata = false;
  bool receiving_message = false;
  bool requested_final_op = false;
  gpr_atm any_ops_sent_atm = 0;
  gpr_atm received_final_op_atm = 0;

  batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
  grpc_transport_stream_op_batch_payload stream_op_payload;

  /* first idx: is_receiving, second idx: is_trailing */
  grpc_metadata_batch metadata_batch[2][2] = {};

  /* Buffered read metadata waiting to be returned to the application.
     Element 0 is initial metadata, element 1 is trailing metadata. */
  grpc_metadata_array* buffered_metadata[2] = {};

  grpc_metadata compression_md;

  // A char* indicating the peer name.
  gpr_atm peer_string = 0;

  /* Call data used for reporting. Only valid after the call has
   * completed */
  grpc_call_final_info final_info;

  /* Compression algorithm for *incoming* data */
  grpc_message_compression_algorithm incoming_message_compression_algorithm =
      GRPC_MESSAGE_COMPRESS_NONE;
  /* Stream compression algorithm for *incoming* data */
  grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
      GRPC_STREAM_COMPRESS_NONE;
  /* Supported encodings (compression algorithms), a bitset.
   * Always support no compression. */
  uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
  /* Supported stream encodings (stream compression algorithms), a bitset */
  uint32_t stream_encodings_accepted_by_peer = 0;

  /* Contexts for various subsystems (security, tracing, ...). */
  grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};

  /* for the client, extra metadata is initial metadata; for the
     server, it's trailing metadata */
  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
  /* Not initialized here; grpc_call_create() assigns both of these. */
  int send_extra_metadata_count;
  grpc_millis send_deadline;

  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

  grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
  grpc_byte_buffer** receiving_buffer = nullptr;
  grpc_slice receiving_slice = grpc_empty_slice();
  grpc_closure receiving_slice_ready;
  grpc_closure receiving_stream_ready;
  grpc_closure receiving_initial_metadata_ready;
  grpc_closure receiving_trailing_metadata_ready;
  uint32_t test_only_last_message_flags = 0;
  // Status about operation of call
  bool sent_server_trailing_metadata = false;
  /* Set (0 -> 1) by the first cancel_with_error() on this call. */
  gpr_atm cancelled_with_error = 0;

  /* Closure run once the call stack is destroyed; see destroy_call(). */
  grpc_closure release_call;

  /* Destinations for the final status; which arm is active depends on
     is_client. */
  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      grpc_core::Server* core_server;
    } server;
  } final_op;
  /* A grpc_error* stored as a gpr_atm. */
  gpr_atm status_error = 0;

  /* recv_state can contain one of the following values:
     RECV_NONE :                 :  no initial metadata and messages received
     RECV_INITIAL_METADATA_FIRST :  received initial metadata first
     a batch_control*            :  received messages first

                 +------1------RECV_NONE------3-----+
                 |                                  |
                 |                                  |
                 v                                  v
     RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
           |           ^                      |           ^
           |           |                      |           |
           +-----2-----+                      +-----4-----+

    For 1, 4: See receiving_initial_metadata_ready() function
    For 2, 3: See receiving_stream_ready() function */
  gpr_atm recv_state = 0;
};
271 
// Trace flags named "call_error" and "compression". Both are constructed
// with `false` (presumably meaning disabled by default -- confirm against
// grpc_core::TraceFlag).
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");

// The call stack sits in the same allocation as the grpc_call, starting at
// the alignment-rounded size of grpc_call. These two macros convert between
// a call pointer and its call stack pointer by that fixed offset.
#define CALL_STACK_FROM_CALL(call)   \
  (grpc_call_stack*)((char*)(call) + \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_FROM_CALL_STACK(call_stack) \
  (grpc_call*)(((char*)(call_stack)) -   \
               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))

// Element `idx` of the call's stack, and the call owning a top stack element.
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))

// Forward declarations for file-local helpers.
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
                          grpc_closure* start_batch_closure);

static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
298 
add_init_error(grpc_error ** composite,grpc_error * new_err)299 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
300   if (new_err == GRPC_ERROR_NONE) return;
301   if (*composite == GRPC_ERROR_NONE) {
302     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
303   }
304   *composite = grpc_error_add_child(*composite, new_err);
305 }
306 
grpc_call_arena_alloc(grpc_call * call,size_t size)307 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
308   return call->arena->Alloc(size);
309 }
310 
get_or_create_parent_call(grpc_call * call)311 static parent_call* get_or_create_parent_call(grpc_call* call) {
312   parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
313   if (p == nullptr) {
314     p = call->arena->New<parent_call>();
315     if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
316                          (gpr_atm)p)) {
317       p->~parent_call();
318       p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
319     }
320   }
321   return p;
322 }
323 
get_parent_call(grpc_call * call)324 static parent_call* get_parent_call(grpc_call* call) {
325   return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
326 }
327 
grpc_call_get_initial_size_estimate()328 size_t grpc_call_get_initial_size_estimate() {
329   return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
330          sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
331 }
332 
// Creates a call on args->channel and stores it in *out_call.
//
// The call object, its call stack and (when args->parent is set) its
// child_call record are carved out of a single allocation made together with
// a new arena. *out_call is always set, even when an error is returned; in
// that case the call has already been cancelled with that error.
//
// Returns GRPC_ERROR_NONE, or a "Call creation failed" composite error. The
// cancellation receives its own ref, so the returned error is a separate ref
// handed to the caller.
grpc_error* grpc_call_create(const grpc_call_create_args* args,
                             grpc_call** out_call) {
  GPR_TIMER_SCOPE("grpc_call_create", 0);

  // Released again in release_call().
  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");

  grpc_core::Arena* arena;
  grpc_call* call;
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_channel_stack* channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
  GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
  // Layout of the allocation: [grpc_call (rounded up)][call stack][child_call].
  size_t call_and_stack_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
      channel_stack->call_stack_size;
  size_t call_alloc_size =
      call_and_stack_size + (args->parent ? sizeof(child_call) : 0);

  std::pair<grpc_core::Arena*, void*> arena_with_call =
      grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
  arena = arena_with_call.first;
  call = new (arena_with_call.second) grpc_call(arena, *args);
  *out_call = call;
  grpc_slice path = grpc_empty_slice();
  if (call->is_client) {
    call->final_op.client.status_details = nullptr;
    call->final_op.client.status = nullptr;
    call->final_op.client.error_string = nullptr;
    GRPC_STATS_INC_CLIENT_CALLS_CREATED();
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    // Stash the caller-supplied initial metadata; remember the :path value
    // (with a ref that is dropped at the end of this function) so it can be
    // passed to the call stack initializer.
    for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      if (grpc_slice_eq_static_interned(
              GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
        path = grpc_slice_ref_internal(
            GRPC_MDVALUE(args->add_initial_metadata[i]));
      }
    }
    call->send_extra_metadata_count =
        static_cast<int>(args->add_initial_metadata_count);
  } else {
    GRPC_STATS_INC_SERVER_CALLS_CREATED();
    call->final_op.server.cancelled = nullptr;
    call->final_op.server.core_server = args->server;
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }

  grpc_millis send_deadline = args->send_deadline;
  bool immediately_cancel = false;

  if (args->parent != nullptr) {
    // The child_call record lives right after the call stack.
    call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
                       call_and_stack_size) child_call(args->parent);

    // Dropped in grpc_call_unref() when the child is unlinked.
    GRPC_CALL_INTERNAL_REF(args->parent, "child");
    // Only a client call may be the child, and only of a server call.
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent->is_client);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
        add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Census tracing propagation requested "
                                   "without Census context propagation"));
      }
      // The tracing context value is shared with the parent; no destroy
      // function is registered for it on the child.
      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
                            args->parent->context[GRPC_CONTEXT_TRACING].value,
                            nullptr);
    } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
      add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                 "Census context propagation requested "
                                 "without Census tracing propagation"));
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = true;
      // The parent already received its final op: cancel the child right
      // away (done below, once the call stack exists).
      if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
        immediately_cancel = true;
      }
    }
  }
  call->send_deadline = send_deadline;
  /* initial refcount dropped by grpc_call_unref */
  grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
                                      args->server_transport_data,
                                      call->context,
                                      path,
                                      call->start_time,
                                      send_deadline,
                                      call->arena,
                                      &call->call_combiner};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (args->parent != nullptr) {
    child_call* cc = call->child;
    parent_call* pc = get_or_create_parent_call(args->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (pc->first_child == nullptr) {
      // First child: a one-element circular list pointing at itself.
      pc->first_child = call;
      cc->sibling_next = cc->sibling_prev = call;
    } else {
      // Insert just before the head, i.e. at the tail of the circular list.
      cc->sibling_next = pc->first_child;
      cc->sibling_prev = pc->first_child->child->sibling_prev;
      cc->sibling_next->child->sibling_prev =
          cc->sibling_prev->child->sibling_next = call;
    }
    gpr_mu_unlock(&pc->child_list_mu);
  }

  if (error != GRPC_ERROR_NONE) {
    // Extra ref: `error` itself is still returned to the caller.
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  if (immediately_cancel) {
    cancel_with_error(call, GRPC_ERROR_CANCELLED);
  }
  if (args->cq != nullptr) {
    GPR_ASSERT(args->pollset_set_alternative == nullptr &&
               "Only one of 'cq' and 'pollset_set_alternative' should be "
               "non-nullptr.");
    // Dropped in destroy_call().
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                               &call->pollent);
  }

  // Record the call start with channelz, when a node is available.
  if (call->is_client) {
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else if (call->final_op.server.core_server != nullptr) {
    grpc_core::channelz::ServerNode* channelz_node =
        call->final_op.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      channelz_node->RecordCallStarted();
    }
  }

  grpc_slice_unref_internal(path);

  return error;
}
492 
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)493 void grpc_call_set_completion_queue(grpc_call* call,
494                                     grpc_completion_queue* cq) {
495   GPR_ASSERT(cq);
496 
497   if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
498     gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
499     abort();
500   }
501   call->cq = cq;
502   GRPC_CQ_INTERNAL_REF(cq, "bind");
503   call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
504   grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
505                                              &call->pollent);
506 }
507 
508 #ifndef NDEBUG
509 #define REF_REASON reason
510 #define REF_ARG , const char* reason
511 #else
512 #define REF_REASON ""
513 #define REF_ARG
514 #endif
grpc_call_internal_ref(grpc_call * c REF_ARG)515 void grpc_call_internal_ref(grpc_call* c REF_ARG) {
516   GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
517 }
grpc_call_internal_unref(grpc_call * c REF_ARG)518 void grpc_call_internal_unref(grpc_call* c REF_ARG) {
519   GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
520 }
521 
release_call(void * call,grpc_error *)522 static void release_call(void* call, grpc_error* /*error*/) {
523   grpc_call* c = static_cast<grpc_call*>(call);
524   grpc_channel* channel = c->channel;
525   grpc_core::Arena* arena = c->arena;
526   c->~grpc_call();
527   grpc_channel_update_call_size_estimate(channel, arena->Destroy());
528   GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
529 }
530 
destroy_call(void * call,grpc_error *)531 static void destroy_call(void* call, grpc_error* /*error*/) {
532   GPR_TIMER_SCOPE("destroy_call", 0);
533   size_t i;
534   int ii;
535   grpc_call* c = static_cast<grpc_call*>(call);
536   for (i = 0; i < 2; i++) {
537     grpc_metadata_batch_destroy(
538         &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
539   }
540   c->receiving_stream.reset();
541   parent_call* pc = get_parent_call(c);
542   if (pc != nullptr) {
543     pc->~parent_call();
544   }
545   for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
546     GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
547   }
548   for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
549     if (c->context[i].destroy) {
550       c->context[i].destroy(c->context[i].value);
551     }
552   }
553   if (c->cq) {
554     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
555   }
556 
557   grpc_error* status_error =
558       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
559   grpc_error_get_status(status_error, c->send_deadline,
560                         &c->final_info.final_status, nullptr, nullptr,
561                         &(c->final_info.error_string));
562   GRPC_ERROR_UNREF(status_error);
563   c->final_info.stats.latency =
564       gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
565   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
566                           GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
567                                             grpc_schedule_on_exec_ctx));
568 }
569 
grpc_call_ref(grpc_call * c)570 void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
571 
grpc_call_unref(grpc_call * c)572 void grpc_call_unref(grpc_call* c) {
573   if (GPR_LIKELY(!c->ext_ref.Unref())) return;
574 
575   GPR_TIMER_SCOPE("grpc_call_unref", 0);
576 
577   child_call* cc = c->child;
578   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
579   grpc_core::ExecCtx exec_ctx;
580 
581   GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
582 
583   if (cc) {
584     parent_call* pc = get_parent_call(cc->parent);
585     gpr_mu_lock(&pc->child_list_mu);
586     if (c == pc->first_child) {
587       pc->first_child = cc->sibling_next;
588       if (c == pc->first_child) {
589         pc->first_child = nullptr;
590       }
591     }
592     cc->sibling_prev->child->sibling_next = cc->sibling_next;
593     cc->sibling_next->child->sibling_prev = cc->sibling_prev;
594     gpr_mu_unlock(&pc->child_list_mu);
595     GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
596   }
597 
598   GPR_ASSERT(!c->destroy_called);
599   c->destroy_called = true;
600   bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
601                 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
602   if (cancel) {
603     cancel_with_error(c, GRPC_ERROR_CANCELLED);
604   } else {
605     // Unset the call combiner cancellation closure.  This has the
606     // effect of scheduling the previously set cancellation closure, if
607     // any, so that it can release any internal references it may be
608     // holding to the call stack. Also flush the closures on exec_ctx so that
609     // filters that schedule cancel notification closures on exec_ctx do not
610     // need to take a ref of the call stack to guarantee closure liveness.
611     c->call_combiner.SetNotifyOnCancel(nullptr);
612     grpc_core::ExecCtx::Get()->Flush();
613   }
614   GRPC_CALL_INTERNAL_UNREF(c, "destroy");
615 }
616 
grpc_call_cancel(grpc_call * call,void * reserved)617 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
618   GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
619   GPR_ASSERT(!reserved);
620   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
621   grpc_core::ExecCtx exec_ctx;
622   cancel_with_error(call, GRPC_ERROR_CANCELLED);
623   return GRPC_CALL_OK;
624 }
625 
626 // This is called via the call combiner to start sending a batch down
627 // the filter stack.
execute_batch_in_call_combiner(void * arg,grpc_error *)628 static void execute_batch_in_call_combiner(void* arg, grpc_error* /*ignored*/) {
629   GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
630   grpc_transport_stream_op_batch* batch =
631       static_cast<grpc_transport_stream_op_batch*>(arg);
632   grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
633   grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
634   GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
635   elem->filter->start_transport_stream_op_batch(elem, batch);
636 }
637 
638 // start_batch_closure points to a caller-allocated closure to be used
639 // for entering the call combiner.
execute_batch(grpc_call * call,grpc_transport_stream_op_batch * batch,grpc_closure * start_batch_closure)640 static void execute_batch(grpc_call* call,
641                           grpc_transport_stream_op_batch* batch,
642                           grpc_closure* start_batch_closure) {
643   batch->handler_private.extra_arg = call;
644   GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
645                     grpc_schedule_on_exec_ctx);
646   GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
647                            GRPC_ERROR_NONE, "executing batch");
648 }
649 
grpc_call_get_peer(grpc_call * call)650 char* grpc_call_get_peer(grpc_call* call) {
651   char* peer_string = (char*)gpr_atm_acq_load(&call->peer_string);
652   if (peer_string != nullptr) return gpr_strdup(peer_string);
653   peer_string = grpc_channel_get_target(call->channel);
654   if (peer_string != nullptr) return peer_string;
655   return gpr_strdup("unknown");
656 }
657 
grpc_call_from_top_element(grpc_call_element * elem)658 grpc_call* grpc_call_from_top_element(grpc_call_element* elem) {
659   return CALL_FROM_TOP_ELEM(elem);
660 }
661 
662 /*******************************************************************************
663  * CANCELLATION
664  */
665 
grpc_call_cancel_with_status(grpc_call * c,grpc_status_code status,const char * description,void * reserved)666 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
667                                              grpc_status_code status,
668                                              const char* description,
669                                              void* reserved) {
670   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
671   grpc_core::ExecCtx exec_ctx;
672   GRPC_API_TRACE(
673       "grpc_call_cancel_with_status("
674       "c=%p, status=%d, description=%s, reserved=%p)",
675       4, (c, (int)status, description, reserved));
676   GPR_ASSERT(reserved == nullptr);
677   cancel_with_status(c, status, description);
678   return GRPC_CALL_OK;
679 }
680 
// Heap-allocated state for one cancel_stream batch: allocated in
// cancel_with_error() and freed in done_termination().
struct cancel_state {
  // The call being cancelled; a "termination" ref is held on it meanwhile.
  grpc_call* call;
  // Closure used to enter the call combiner and start the batch.
  grpc_closure start_batch;
  // on_complete closure of the batch; runs done_termination().
  grpc_closure finish_batch;
};
686 // The on_complete callback used when sending a cancel_stream batch down
687 // the filter stack.  Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error *)688 static void done_termination(void* arg, grpc_error* /*error*/) {
689   cancel_state* state = static_cast<cancel_state*>(arg);
690   GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
691                           "on_complete for cancel_stream op");
692   GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
693   gpr_free(state);
694 }
695 
cancel_with_error(grpc_call * c,grpc_error * error)696 static void cancel_with_error(grpc_call* c, grpc_error* error) {
697   if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
698     GRPC_ERROR_UNREF(error);
699     return;
700   }
701   GRPC_CALL_INTERNAL_REF(c, "termination");
702   // Inform the call combiner of the cancellation, so that it can cancel
703   // any in-flight asynchronous actions that may be holding the call
704   // combiner.  This ensures that the cancel_stream batch can be sent
705   // down the filter stack in a timely manner.
706   c->call_combiner.Cancel(GRPC_ERROR_REF(error));
707   cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
708   state->call = c;
709   GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
710                     grpc_schedule_on_exec_ctx);
711   grpc_transport_stream_op_batch* op =
712       grpc_make_transport_stream_op(&state->finish_batch);
713   op->cancel_stream = true;
714   op->payload->cancel_stream.cancel_error = error;
715   execute_batch(c, op, &state->start_batch);
716 }
717 
grpc_call_cancel_internal(grpc_call * call)718 void grpc_call_cancel_internal(grpc_call* call) {
719   cancel_with_error(call, GRPC_ERROR_CANCELLED);
720 }
721 
error_from_status(grpc_status_code status,const char * description)722 static grpc_error* error_from_status(grpc_status_code status,
723                                      const char* description) {
724   // copying 'description' is needed to ensure the grpc_call_cancel_with_status
725   // guarantee that can be short-lived.
726   return grpc_error_set_int(
727       grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
728                          GRPC_ERROR_STR_GRPC_MESSAGE,
729                          grpc_slice_from_copied_string(description)),
730       GRPC_ERROR_INT_GRPC_STATUS, status);
731 }
732 
cancel_with_status(grpc_call * c,grpc_status_code status,const char * description)733 static void cancel_with_status(grpc_call* c, grpc_status_code status,
734                                const char* description) {
735   cancel_with_error(c, error_from_status(status, description));
736 }
737 
set_final_status(grpc_call * call,grpc_error * error)738 static void set_final_status(grpc_call* call, grpc_error* error) {
739   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
740     gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
741     gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
742   }
743   if (call->is_client) {
744     grpc_error_get_status(error, call->send_deadline,
745                           call->final_op.client.status,
746                           call->final_op.client.status_details, nullptr,
747                           call->final_op.client.error_string);
748     // explicitly take a ref
749     grpc_slice_ref_internal(*call->final_op.client.status_details);
750     gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
751     grpc_core::channelz::ChannelNode* channelz_channel =
752         grpc_channel_get_channelz_node(call->channel);
753     if (channelz_channel != nullptr) {
754       if (*call->final_op.client.status != GRPC_STATUS_OK) {
755         channelz_channel->RecordCallFailed();
756       } else {
757         channelz_channel->RecordCallSucceeded();
758       }
759     }
760   } else {
761     *call->final_op.server.cancelled =
762         error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
763     grpc_core::channelz::ServerNode* channelz_node =
764         call->final_op.server.core_server->channelz_node();
765     if (channelz_node != nullptr) {
766       if (*call->final_op.server.cancelled ||
767           reinterpret_cast<grpc_error*>(
768               gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
769         channelz_node->RecordCallFailed();
770       } else {
771         channelz_node->RecordCallSucceeded();
772       }
773     }
774     GRPC_ERROR_UNREF(error);
775   }
776 }
777 
778 /*******************************************************************************
779  * COMPRESSION
780  */
781 
set_incoming_message_compression_algorithm(grpc_call * call,grpc_message_compression_algorithm algo)782 static void set_incoming_message_compression_algorithm(
783     grpc_call* call, grpc_message_compression_algorithm algo) {
784   GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
785   call->incoming_message_compression_algorithm = algo;
786 }
787 
set_incoming_stream_compression_algorithm(grpc_call * call,grpc_stream_compression_algorithm algo)788 static void set_incoming_stream_compression_algorithm(
789     grpc_call* call, grpc_stream_compression_algorithm algo) {
790   GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
791   call->incoming_stream_compression_algorithm = algo;
792 }
793 
grpc_call_test_only_get_compression_algorithm(grpc_call * call)794 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
795     grpc_call* call) {
796   grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
797   grpc_compression_algorithm_from_message_stream_compression_algorithm(
798       &algorithm, call->incoming_message_compression_algorithm,
799       call->incoming_stream_compression_algorithm);
800   return algorithm;
801 }
802 
compression_algorithm_for_level_locked(grpc_call * call,grpc_compression_level level)803 static grpc_compression_algorithm compression_algorithm_for_level_locked(
804     grpc_call* call, grpc_compression_level level) {
805   return grpc_compression_algorithm_for_level(level,
806                                               call->encodings_accepted_by_peer);
807 }
808 
grpc_call_test_only_get_message_flags(grpc_call * call)809 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
810   uint32_t flags;
811   flags = call->test_only_last_message_flags;
812   return flags;
813 }
814 
// Destructor callback registered for the accepted-encodings user data stored
// on an mdelem. The stored value is an integer packed into the pointer, so
// there is nothing to free.
static void destroy_encodings_accepted_by_peer(void* /*unused*/) {
}
816 
set_encodings_accepted_by_peer(grpc_call *,grpc_mdelem mdel,uint32_t * encodings_accepted_by_peer,bool stream_encoding)817 static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
818                                            grpc_mdelem mdel,
819                                            uint32_t* encodings_accepted_by_peer,
820                                            bool stream_encoding) {
821   size_t i;
822   uint32_t algorithm;
823   grpc_slice_buffer accept_encoding_parts;
824   grpc_slice accept_encoding_slice;
825   void* accepted_user_data;
826 
827   accepted_user_data =
828       grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
829   if (accepted_user_data != nullptr) {
830     *encodings_accepted_by_peer =
831         static_cast<uint32_t>(((uintptr_t)accepted_user_data) - 1);
832     return;
833   }
834 
835   *encodings_accepted_by_peer = 0;
836 
837   accept_encoding_slice = GRPC_MDVALUE(mdel);
838   grpc_slice_buffer_init(&accept_encoding_parts);
839   grpc_slice_split_without_space(accept_encoding_slice, ",",
840                                  &accept_encoding_parts);
841 
842   GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
843   for (i = 0; i < accept_encoding_parts.count; i++) {
844     int r;
845     grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
846     if (!stream_encoding) {
847       r = grpc_message_compression_algorithm_parse(
848           accept_encoding_entry_slice,
849           reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
850     } else {
851       r = grpc_stream_compression_algorithm_parse(
852           accept_encoding_entry_slice,
853           reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
854     }
855     if (r) {
856       GPR_BITSET(encodings_accepted_by_peer, algorithm);
857     } else {
858       char* accept_encoding_entry_str =
859           grpc_slice_to_c_string(accept_encoding_entry_slice);
860       gpr_log(GPR_DEBUG,
861               "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
862               accept_encoding_entry_str);
863       gpr_free(accept_encoding_entry_str);
864     }
865   }
866 
867   grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
868 
869   grpc_mdelem_set_user_data(
870       mdel, destroy_encodings_accepted_by_peer,
871       (void*)((static_cast<uintptr_t>(*encodings_accepted_by_peer)) + 1));
872 }
873 
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)874 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
875   uint32_t encodings_accepted_by_peer;
876   encodings_accepted_by_peer = call->encodings_accepted_by_peer;
877   return encodings_accepted_by_peer;
878 }
879 
880 grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call * call)881 grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
882   return call->incoming_stream_compression_algorithm;
883 }
884 
linked_from_md(const grpc_metadata * md)885 static grpc_linked_mdelem* linked_from_md(const grpc_metadata* md) {
886   return (grpc_linked_mdelem*)&md->internal_data;
887 }
888 
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)889 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
890                                   grpc_metadata* additional_metadata, int i,
891                                   int count) {
892   grpc_metadata* res =
893       i < count ? &metadata[i] : &additional_metadata[i - count];
894   GPR_ASSERT(res);
895   return res;
896 }
897 
prepare_application_metadata(grpc_call * call,int count,grpc_metadata * metadata,int is_trailing,int prepend_extra_metadata,grpc_metadata * additional_metadata,int additional_metadata_count)898 static int prepare_application_metadata(grpc_call* call, int count,
899                                         grpc_metadata* metadata,
900                                         int is_trailing,
901                                         int prepend_extra_metadata,
902                                         grpc_metadata* additional_metadata,
903                                         int additional_metadata_count) {
904   int total_count = count + additional_metadata_count;
905   int i;
906   grpc_metadata_batch* batch =
907       &call->metadata_batch[0 /* is_receiving */][is_trailing];
908   for (i = 0; i < total_count; i++) {
909     const grpc_metadata* md =
910         get_md_elem(metadata, additional_metadata, i, count);
911     grpc_linked_mdelem* l = linked_from_md(md);
912     GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
913     if (!GRPC_LOG_IF_ERROR("validate_metadata",
914                            grpc_validate_header_key_is_legal(md->key))) {
915       break;
916     } else if (!grpc_is_binary_header_internal(md->key) &&
917                !GRPC_LOG_IF_ERROR(
918                    "validate_metadata",
919                    grpc_validate_header_nonbin_value_is_legal(md->value))) {
920       break;
921     } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
922       // HTTP2 hpack encoding has a maximum limit.
923       break;
924     }
925     l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
926   }
927   if (i != total_count) {
928     for (int j = 0; j < i; j++) {
929       const grpc_metadata* md =
930           get_md_elem(metadata, additional_metadata, j, count);
931       grpc_linked_mdelem* l = linked_from_md(md);
932       GRPC_MDELEM_UNREF(l->md);
933     }
934     return 0;
935   }
936   if (prepend_extra_metadata) {
937     if (call->send_extra_metadata_count == 0) {
938       prepend_extra_metadata = 0;
939     } else {
940       for (i = 0; i < call->send_extra_metadata_count; i++) {
941         GRPC_LOG_IF_ERROR("prepare_application_metadata",
942                           grpc_metadata_batch_link_tail(
943                               batch, &call->send_extra_metadata[i]));
944       }
945     }
946   }
947   for (i = 0; i < total_count; i++) {
948     grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
949     grpc_linked_mdelem* l = linked_from_md(md);
950     grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
951     if (error != GRPC_ERROR_NONE) {
952       GRPC_MDELEM_UNREF(l->md);
953     }
954     GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
955   }
956   call->send_extra_metadata_count = 0;
957 
958   return 1;
959 }
960 
decode_message_compression(grpc_mdelem md)961 static grpc_message_compression_algorithm decode_message_compression(
962     grpc_mdelem md) {
963   grpc_message_compression_algorithm algorithm =
964       grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
965   if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
966     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
967     gpr_log(GPR_ERROR,
968             "Invalid incoming message compression algorithm: '%s'. "
969             "Interpreting incoming data as uncompressed.",
970             md_c_str);
971     gpr_free(md_c_str);
972     return GRPC_MESSAGE_COMPRESS_NONE;
973   }
974   return algorithm;
975 }
976 
decode_stream_compression(grpc_mdelem md)977 static grpc_stream_compression_algorithm decode_stream_compression(
978     grpc_mdelem md) {
979   grpc_stream_compression_algorithm algorithm =
980       grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
981   if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
982     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
983     gpr_log(GPR_ERROR,
984             "Invalid incoming stream compression algorithm: '%s'. Interpreting "
985             "incoming data as uncompressed.",
986             md_c_str);
987     gpr_free(md_c_str);
988     return GRPC_STREAM_COMPRESS_NONE;
989   }
990   return algorithm;
991 }
992 
publish_app_metadata(grpc_call * call,grpc_metadata_batch * b,int is_trailing)993 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
994                                  int is_trailing) {
995   if (b->list.count == 0) return;
996   if (!call->is_client && is_trailing) return;
997   if (is_trailing && call->buffered_metadata[1] == nullptr) return;
998   GPR_TIMER_SCOPE("publish_app_metadata", 0);
999   grpc_metadata_array* dest;
1000   grpc_metadata* mdusr;
1001   dest = call->buffered_metadata[is_trailing];
1002   if (dest->count + b->list.count > dest->capacity) {
1003     dest->capacity =
1004         GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
1005     dest->metadata = static_cast<grpc_metadata*>(
1006         gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
1007   }
1008   for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
1009     mdusr = &dest->metadata[dest->count++];
1010     /* we pass back borrowed slices that are valid whilst the call is valid */
1011     mdusr->key = GRPC_MDKEY(l->md);
1012     mdusr->value = GRPC_MDVALUE(l->md);
1013   }
1014 }
1015 
recv_initial_filter(grpc_call * call,grpc_metadata_batch * b)1016 static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
1017   if (b->idx.named.content_encoding != nullptr) {
1018     GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
1019     set_incoming_stream_compression_algorithm(
1020         call, decode_stream_compression(b->idx.named.content_encoding->md));
1021     grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
1022   }
1023   if (b->idx.named.grpc_encoding != nullptr) {
1024     GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
1025     set_incoming_message_compression_algorithm(
1026         call, decode_message_compression(b->idx.named.grpc_encoding->md));
1027     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
1028   }
1029   uint32_t message_encodings_accepted_by_peer = 1u;
1030   uint32_t stream_encodings_accepted_by_peer = 1u;
1031   if (b->idx.named.grpc_accept_encoding != nullptr) {
1032     GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
1033     set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
1034                                    &message_encodings_accepted_by_peer, false);
1035     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
1036   }
1037   if (b->idx.named.accept_encoding != nullptr) {
1038     GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
1039     set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
1040                                    &stream_encodings_accepted_by_peer, true);
1041     grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
1042   }
1043   call->encodings_accepted_by_peer =
1044       grpc_compression_bitset_from_message_stream_compression_bitset(
1045           message_encodings_accepted_by_peer,
1046           stream_encodings_accepted_by_peer);
1047   publish_app_metadata(call, b, false);
1048 }
1049 
recv_trailing_filter(void * args,grpc_metadata_batch * b,grpc_error * batch_error)1050 static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
1051                                  grpc_error* batch_error) {
1052   grpc_call* call = static_cast<grpc_call*>(args);
1053   if (batch_error != GRPC_ERROR_NONE) {
1054     set_final_status(call, batch_error);
1055   } else if (b->idx.named.grpc_status != nullptr) {
1056     grpc_status_code status_code =
1057         grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
1058     grpc_error* error = GRPC_ERROR_NONE;
1059     if (status_code != GRPC_STATUS_OK) {
1060       char* peer = grpc_call_get_peer(call);
1061       error = grpc_error_set_int(
1062           GRPC_ERROR_CREATE_FROM_COPIED_STRING(
1063               absl::StrCat("Error received from peer ", peer).c_str()),
1064           GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
1065       gpr_free(peer);
1066     }
1067     if (b->idx.named.grpc_message != nullptr) {
1068       error = grpc_error_set_str(
1069           error, GRPC_ERROR_STR_GRPC_MESSAGE,
1070           grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
1071       grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
1072     } else if (error != GRPC_ERROR_NONE) {
1073       error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
1074                                  grpc_empty_slice());
1075     }
1076     set_final_status(call, GRPC_ERROR_REF(error));
1077     grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
1078     GRPC_ERROR_UNREF(error);
1079   } else if (!call->is_client) {
1080     set_final_status(call, GRPC_ERROR_NONE);
1081   } else {
1082     gpr_log(GPR_DEBUG,
1083             "Received trailing metadata with no error and no status");
1084     set_final_status(
1085         call, grpc_error_set_int(
1086                   GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
1087                   GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
1088   }
1089   publish_app_metadata(call, b, true);
1090 }
1091 
grpc_call_get_arena(grpc_call * call)1092 grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1093 
grpc_call_get_call_stack(grpc_call * call)1094 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
1095   return CALL_STACK_FROM_CALL(call);
1096 }
1097 
1098 /*******************************************************************************
1099  * BATCH API IMPLEMENTATION
1100  */
1101 
are_write_flags_valid(uint32_t flags)1102 static bool are_write_flags_valid(uint32_t flags) {
1103   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1104   const uint32_t allowed_write_positions =
1105       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1106   const uint32_t invalid_positions = ~allowed_write_positions;
1107   return !(flags & invalid_positions);
1108 }
1109 
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1110 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1111   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1112   uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1113   if (!is_client) {
1114     invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1115   }
1116   return !(flags & invalid_positions);
1117 }
1118 
batch_slot_for_op(grpc_op_type type)1119 static size_t batch_slot_for_op(grpc_op_type type) {
1120   switch (type) {
1121     case GRPC_OP_SEND_INITIAL_METADATA:
1122       return 0;
1123     case GRPC_OP_SEND_MESSAGE:
1124       return 1;
1125     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1126     case GRPC_OP_SEND_STATUS_FROM_SERVER:
1127       return 2;
1128     case GRPC_OP_RECV_INITIAL_METADATA:
1129       return 3;
1130     case GRPC_OP_RECV_MESSAGE:
1131       return 4;
1132     case GRPC_OP_RECV_CLOSE_ON_SERVER:
1133     case GRPC_OP_RECV_STATUS_ON_CLIENT:
1134       return 5;
1135   }
1136   GPR_UNREACHABLE_CODE(return 123456789);
1137 }
1138 
reuse_or_allocate_batch_control(grpc_call * call,const grpc_op * ops)1139 static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
1140                                                       const grpc_op* ops) {
1141   size_t slot_idx = batch_slot_for_op(ops[0].op);
1142   batch_control** pslot = &call->active_batches[slot_idx];
1143   batch_control* bctl;
1144   if (*pslot != nullptr) {
1145     bctl = *pslot;
1146     if (bctl->call != nullptr) {
1147       return nullptr;
1148     }
1149     bctl->~batch_control();
1150     bctl->op = {};
1151   } else {
1152     bctl = call->arena->New<batch_control>();
1153     *pslot = bctl;
1154   }
1155   bctl->call = call;
1156   bctl->op.payload = &call->stream_op_payload;
1157   return bctl;
1158 }
1159 
finish_batch_completion(void * user_data,grpc_cq_completion *)1160 static void finish_batch_completion(void* user_data,
1161                                     grpc_cq_completion* /*storage*/) {
1162   batch_control* bctl = static_cast<batch_control*>(user_data);
1163   grpc_call* call = bctl->call;
1164   bctl->call = nullptr;
1165   GRPC_CALL_INTERNAL_UNREF(call, "completion");
1166 }
1167 
reset_batch_errors(batch_control * bctl)1168 static void reset_batch_errors(batch_control* bctl) {
1169   GRPC_ERROR_UNREF(
1170       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1171   gpr_atm_rel_store(&bctl->batch_error,
1172                     reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
1173 }
1174 
post_batch_completion(batch_control * bctl)1175 static void post_batch_completion(batch_control* bctl) {
1176   grpc_call* next_child_call;
1177   grpc_call* call = bctl->call;
1178   grpc_error* error = GRPC_ERROR_REF(
1179       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1180 
1181   if (bctl->op.send_initial_metadata) {
1182     grpc_metadata_batch_destroy(
1183         &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1184   }
1185   if (bctl->op.send_message) {
1186     if (bctl->op.payload->send_message.stream_write_closed &&
1187         error == GRPC_ERROR_NONE) {
1188       error = grpc_error_add_child(
1189           error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1190                      "Attempt to send message after stream was closed."));
1191     }
1192     call->sending_message = false;
1193   }
1194   if (bctl->op.send_trailing_metadata) {
1195     grpc_metadata_batch_destroy(
1196         &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1197   }
1198   if (bctl->op.recv_trailing_metadata) {
1199     /* propagate cancellation to any interested children */
1200     gpr_atm_rel_store(&call->received_final_op_atm, 1);
1201     parent_call* pc = get_parent_call(call);
1202     if (pc != nullptr) {
1203       grpc_call* child;
1204       gpr_mu_lock(&pc->child_list_mu);
1205       child = pc->first_child;
1206       if (child != nullptr) {
1207         do {
1208           next_child_call = child->child->sibling_next;
1209           if (child->cancellation_is_inherited) {
1210             GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1211             cancel_with_error(child, GRPC_ERROR_CANCELLED);
1212             GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
1213           }
1214           child = next_child_call;
1215         } while (child != pc->first_child);
1216       }
1217       gpr_mu_unlock(&pc->child_list_mu);
1218     }
1219     GRPC_ERROR_UNREF(error);
1220     error = GRPC_ERROR_NONE;
1221   }
1222   if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
1223       *call->receiving_buffer != nullptr) {
1224     grpc_byte_buffer_destroy(*call->receiving_buffer);
1225     *call->receiving_buffer = nullptr;
1226   }
1227   reset_batch_errors(bctl);
1228 
1229   if (bctl->completion_data.notify_tag.is_closure) {
1230     /* unrefs error */
1231     bctl->call = nullptr;
1232     grpc_core::Closure::Run(DEBUG_LOCATION,
1233                             (grpc_closure*)bctl->completion_data.notify_tag.tag,
1234                             error);
1235     GRPC_CALL_INTERNAL_UNREF(call, "completion");
1236   } else {
1237     /* unrefs error */
1238     grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1239                    finish_batch_completion, bctl,
1240                    &bctl->completion_data.cq_completion);
1241   }
1242 }
1243 
finish_batch_step(batch_control * bctl)1244 static void finish_batch_step(batch_control* bctl) {
1245   if (GPR_UNLIKELY(bctl->completed_batch_step())) {
1246     post_batch_completion(bctl);
1247   }
1248 }
1249 
continue_receiving_slices(batch_control * bctl)1250 static void continue_receiving_slices(batch_control* bctl) {
1251   grpc_error* error;
1252   grpc_call* call = bctl->call;
1253   for (;;) {
1254     size_t remaining = call->receiving_stream->length() -
1255                        (*call->receiving_buffer)->data.raw.slice_buffer.length;
1256     if (remaining == 0) {
1257       call->receiving_message = false;
1258       call->receiving_stream.reset();
1259       finish_batch_step(bctl);
1260       return;
1261     }
1262     if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
1263       error = call->receiving_stream->Pull(&call->receiving_slice);
1264       if (error == GRPC_ERROR_NONE) {
1265         grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1266                               call->receiving_slice);
1267       } else {
1268         call->receiving_stream.reset();
1269         grpc_byte_buffer_destroy(*call->receiving_buffer);
1270         *call->receiving_buffer = nullptr;
1271         call->receiving_message = false;
1272         finish_batch_step(bctl);
1273         GRPC_ERROR_UNREF(error);
1274         return;
1275       }
1276     } else {
1277       return;
1278     }
1279   }
1280 }
1281 
receiving_slice_ready(void * bctlp,grpc_error * error)1282 static void receiving_slice_ready(void* bctlp, grpc_error* error) {
1283   batch_control* bctl = static_cast<batch_control*>(bctlp);
1284   grpc_call* call = bctl->call;
1285   bool release_error = false;
1286 
1287   if (error == GRPC_ERROR_NONE) {
1288     grpc_slice slice;
1289     error = call->receiving_stream->Pull(&slice);
1290     if (error == GRPC_ERROR_NONE) {
1291       grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1292                             slice);
1293       continue_receiving_slices(bctl);
1294     } else {
1295       /* Error returned by ByteStream::Pull() needs to be released manually */
1296       release_error = true;
1297     }
1298   }
1299 
1300   if (error != GRPC_ERROR_NONE) {
1301     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
1302       GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1303     }
1304     call->receiving_stream.reset();
1305     grpc_byte_buffer_destroy(*call->receiving_buffer);
1306     *call->receiving_buffer = nullptr;
1307     call->receiving_message = false;
1308     finish_batch_step(bctl);
1309     if (release_error) {
1310       GRPC_ERROR_UNREF(error);
1311     }
1312   }
1313 }
1314 
process_data_after_md(batch_control * bctl)1315 static void process_data_after_md(batch_control* bctl) {
1316   grpc_call* call = bctl->call;
1317   if (call->receiving_stream == nullptr) {
1318     *call->receiving_buffer = nullptr;
1319     call->receiving_message = false;
1320     finish_batch_step(bctl);
1321   } else {
1322     call->test_only_last_message_flags = call->receiving_stream->flags();
1323     if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
1324         (call->incoming_message_compression_algorithm >
1325          GRPC_MESSAGE_COMPRESS_NONE)) {
1326       grpc_compression_algorithm algo;
1327       GPR_ASSERT(
1328           grpc_compression_algorithm_from_message_stream_compression_algorithm(
1329               &algo, call->incoming_message_compression_algorithm,
1330               (grpc_stream_compression_algorithm)0));
1331       *call->receiving_buffer =
1332           grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
1333     } else {
1334       *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
1335     }
1336     GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1337                       grpc_schedule_on_exec_ctx);
1338     continue_receiving_slices(bctl);
1339   }
1340 }
1341 
receiving_stream_ready(void * bctlp,grpc_error * error)1342 static void receiving_stream_ready(void* bctlp, grpc_error* error) {
1343   batch_control* bctl = static_cast<batch_control*>(bctlp);
1344   grpc_call* call = bctl->call;
1345   if (error != GRPC_ERROR_NONE) {
1346     call->receiving_stream.reset();
1347     if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1348         GRPC_ERROR_NONE) {
1349       gpr_atm_rel_store(&bctl->batch_error,
1350                         reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1351     }
1352     cancel_with_error(call, GRPC_ERROR_REF(error));
1353   }
1354   /* If recv_state is RECV_NONE, we will save the batch_control
1355    * object with rel_cas, and will not use it after the cas. Its corresponding
1356    * acq_load is in receiving_initial_metadata_ready() */
1357   if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
1358       !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
1359     process_data_after_md(bctl);
1360   }
1361 }
1362 
1363 // The recv_message_ready callback used when sending a batch containing
1364 // a recv_message op down the filter stack.  Yields the call combiner
1365 // before processing the received message.
receiving_stream_ready_in_call_combiner(void * bctlp,grpc_error * error)1366 static void receiving_stream_ready_in_call_combiner(void* bctlp,
1367                                                     grpc_error* error) {
1368   batch_control* bctl = static_cast<batch_control*>(bctlp);
1369   grpc_call* call = bctl->call;
1370   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
1371   receiving_stream_ready(bctlp, error);
1372 }
1373 
1374 static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call * call)1375 handle_both_stream_and_msg_compression_set(grpc_call* call) {
1376   std::string error_msg = absl::StrFormat(
1377       "Incoming stream has both stream compression (%d) and message "
1378       "compression (%d).",
1379       call->incoming_stream_compression_algorithm,
1380       call->incoming_message_compression_algorithm);
1381   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1382   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1383 }
1384 
1385 static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call * call)1386 handle_error_parsing_compression_algorithm(grpc_call* call) {
1387   std::string error_msg = absl::StrFormat(
1388       "Error in incoming message compression (%d) or stream "
1389       "compression (%d).",
1390       call->incoming_stream_compression_algorithm,
1391       call->incoming_message_compression_algorithm);
1392   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1393 }
1394 
handle_invalid_compression(grpc_call * call,grpc_compression_algorithm compression_algorithm)1395 static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
1396     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1397   std::string error_msg = absl::StrFormat(
1398       "Invalid compression algorithm value '%d'.", compression_algorithm);
1399   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1400   cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1401 }
1402 
handle_compression_algorithm_disabled(grpc_call * call,grpc_compression_algorithm compression_algorithm)1403 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
1404     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1405   const char* algo_name = nullptr;
1406   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1407   std::string error_msg =
1408       absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
1409   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1410   cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1411 }
1412 
handle_compression_algorithm_not_accepted(grpc_call * call,grpc_compression_algorithm compression_algorithm)1413 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
1414     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1415   const char* algo_name = nullptr;
1416   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1417   gpr_log(GPR_ERROR,
1418           "Compression algorithm ('%s') not present in the bitset of "
1419           "accepted encodings ('0x%x')",
1420           algo_name, call->encodings_accepted_by_peer);
1421 }
1422 
validate_filtered_metadata(batch_control * bctl)1423 static void validate_filtered_metadata(batch_control* bctl) {
1424   grpc_compression_algorithm compression_algorithm;
1425   grpc_call* call = bctl->call;
1426   if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
1427                        GRPC_STREAM_COMPRESS_NONE &&
1428                    call->incoming_message_compression_algorithm !=
1429                        GRPC_MESSAGE_COMPRESS_NONE)) {
1430     handle_both_stream_and_msg_compression_set(call);
1431   } else if (
1432       GPR_UNLIKELY(
1433           grpc_compression_algorithm_from_message_stream_compression_algorithm(
1434               &compression_algorithm,
1435               call->incoming_message_compression_algorithm,
1436               call->incoming_stream_compression_algorithm) == 0)) {
1437     handle_error_parsing_compression_algorithm(call);
1438   } else {
1439     const grpc_compression_options compression_options =
1440         grpc_channel_compression_options(call->channel);
1441     if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
1442       handle_invalid_compression(call, compression_algorithm);
1443     } else if (GPR_UNLIKELY(
1444                    grpc_compression_options_is_algorithm_enabled_internal(
1445                        &compression_options, compression_algorithm) == 0)) {
1446       /* check if algorithm is supported by current channel config */
1447       handle_compression_algorithm_disabled(call, compression_algorithm);
1448     }
1449     /* GRPC_COMPRESS_NONE is always set. */
1450     GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
1451     if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
1452                                  compression_algorithm))) {
1453       if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
1454         handle_compression_algorithm_not_accepted(call, compression_algorithm);
1455       }
1456     }
1457   }
1458 }
1459 
/* Closure callback run when the recv_initial_metadata part of a batch
 * completes.
 *
 * bctlp: the batch_control the closure was initialised with.
 * error: result of the receive; GRPC_ERROR_NONE on success.
 *
 * On success the received initial metadata is filtered and its compression
 * settings validated; on a server call a finite deadline from the metadata
 * becomes the call's send_deadline. On failure the error is recorded as the
 * batch error (if none is recorded yet) and the call is cancelled.
 * In both cases call->recv_state is then updated/inspected so that a message
 * that arrived before the initial metadata gets its deferred
 * receiving_stream_ready run, and one batch step is marked finished. */
static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;

  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");

  if (error == GRPC_ERROR_NONE) {
    grpc_metadata_batch* md =
        &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
    recv_initial_filter(call, md);

    /* TODO(ctiller): this could be moved into recv_initial_filter now */
    GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
    validate_filtered_metadata(bctl);

    /* Only a server call adopts the deadline carried by the metadata. */
    if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
      call->send_deadline = md->deadline;
    }
  } else {
    /* Record the error for the batch only if no error is recorded yet. */
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }

  grpc_closure* saved_rsr_closure = nullptr;
  /* Loop until either the CAS below succeeds or a saved batch_control is
   * observed in recv_state. */
  while (true) {
    gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
    /* Should only receive initial metadata once */
    /* NOTE(review): the literals 1 and 0 here presumably equal
     * RECV_INITIAL_METADATA_FIRST and RECV_NONE used in the CAS below --
     * confirm against their definitions. */
    GPR_ASSERT(rsr_bctlp != 1);
    if (rsr_bctlp == 0) {
      /* We haven't seen initial metadata and messages before, thus initial
       * metadata is received first.
       * no_barrier_cas is used, as this function won't access the batch_control
       * object saved by receiving_stream_ready() if the initial metadata is
       * received first. */
      if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
                                 RECV_INITIAL_METADATA_FIRST)) {
        break;
      }
    } else {
      /* Already received messages */
      /* recv_state is treated as a batch_control pointer here; a closure is
       * created to run receiving_stream_ready for it. */
      saved_rsr_closure =
          GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
                              grpc_schedule_on_exec_ctx);
      /* No need to modify recv_state */
      break;
    }
  }
  if (saved_rsr_closure != nullptr) {
    /* Run the deferred message-ready closure with this callback's error. */
    grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
                            GRPC_ERROR_REF(error));
  }

  finish_batch_step(bctl);
}
1518 
receiving_trailing_metadata_ready(void * bctlp,grpc_error * error)1519 static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
1520   batch_control* bctl = static_cast<batch_control*>(bctlp);
1521   grpc_call* call = bctl->call;
1522   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1523   grpc_metadata_batch* md =
1524       &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1525   recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1526   finish_batch_step(bctl);
1527 }
1528 
finish_batch(void * bctlp,grpc_error * error)1529 static void finish_batch(void* bctlp, grpc_error* error) {
1530   batch_control* bctl = static_cast<batch_control*>(bctlp);
1531   grpc_call* call = bctl->call;
1532   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
1533   if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1534       GRPC_ERROR_NONE) {
1535     gpr_atm_rel_store(&bctl->batch_error,
1536                       reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1537   }
1538   if (error != GRPC_ERROR_NONE) {
1539     cancel_with_error(call, GRPC_ERROR_REF(error));
1540   }
1541   finish_batch_step(bctl);
1542 }
1543 
free_no_op_completion(void *,grpc_cq_completion * completion)1544 static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
1545   gpr_free(completion);
1546 }
1547 
call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * notify_tag,int is_notify_tag_closure)1548 static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
1549                                         size_t nops, void* notify_tag,
1550                                         int is_notify_tag_closure) {
1551   GPR_TIMER_SCOPE("call_start_batch", 0);
1552 
1553   size_t i;
1554   const grpc_op* op;
1555   batch_control* bctl;
1556   bool has_send_ops = false;
1557   int num_recv_ops = 0;
1558   grpc_call_error error = GRPC_CALL_OK;
1559   grpc_transport_stream_op_batch* stream_op;
1560   grpc_transport_stream_op_batch_payload* stream_op_payload;
1561 
1562   GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);
1563 
1564   if (nops == 0) {
1565     if (!is_notify_tag_closure) {
1566       GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1567       grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
1568                      free_no_op_completion, nullptr,
1569                      static_cast<grpc_cq_completion*>(
1570                          gpr_malloc(sizeof(grpc_cq_completion))));
1571     } else {
1572       grpc_core::Closure::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag,
1573                               GRPC_ERROR_NONE);
1574     }
1575     error = GRPC_CALL_OK;
1576     goto done;
1577   }
1578 
1579   bctl = reuse_or_allocate_batch_control(call, ops);
1580   if (bctl == nullptr) {
1581     return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1582   }
1583   bctl->completion_data.notify_tag.tag = notify_tag;
1584   bctl->completion_data.notify_tag.is_closure =
1585       static_cast<uint8_t>(is_notify_tag_closure != 0);
1586 
1587   stream_op = &bctl->op;
1588   stream_op_payload = &call->stream_op_payload;
1589 
1590   /* rewrite batch ops into a transport op */
1591   for (i = 0; i < nops; i++) {
1592     op = &ops[i];
1593     if (op->reserved != nullptr) {
1594       error = GRPC_CALL_ERROR;
1595       goto done_with_error;
1596     }
1597     switch (op->op) {
1598       case GRPC_OP_SEND_INITIAL_METADATA: {
1599         /* Flag validation: currently allow no flags */
1600         if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
1601           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1602           goto done_with_error;
1603         }
1604         if (call->sent_initial_metadata) {
1605           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1606           goto done_with_error;
1607         }
1608         // TODO(juanlishen): If the user has already specified a compression
1609         // algorithm by setting the initial metadata with key of
1610         // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
1611         // with the compression algorithm mapped from compression level.
1612         /* process compression level */
1613         grpc_metadata& compression_md = call->compression_md;
1614         compression_md.key = grpc_empty_slice();
1615         compression_md.value = grpc_empty_slice();
1616         compression_md.flags = 0;
1617         size_t additional_metadata_count = 0;
1618         grpc_compression_level effective_compression_level =
1619             GRPC_COMPRESS_LEVEL_NONE;
1620         bool level_set = false;
1621         if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
1622           effective_compression_level =
1623               op->data.send_initial_metadata.maybe_compression_level.level;
1624           level_set = true;
1625         } else {
1626           const grpc_compression_options copts =
1627               grpc_channel_compression_options(call->channel);
1628           if (copts.default_level.is_set) {
1629             level_set = true;
1630             effective_compression_level = copts.default_level.level;
1631           }
1632         }
1633         // Currently, only server side supports compression level setting.
1634         if (level_set && !call->is_client) {
1635           const grpc_compression_algorithm calgo =
1636               compression_algorithm_for_level_locked(
1637                   call, effective_compression_level);
1638           // The following metadata will be checked and removed by the message
1639           // compression filter. It will be used as the call's compression
1640           // algorithm.
1641           compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1642           compression_md.value = grpc_compression_algorithm_slice(calgo);
1643           additional_metadata_count++;
1644         }
1645         if (op->data.send_initial_metadata.count + additional_metadata_count >
1646             INT_MAX) {
1647           error = GRPC_CALL_ERROR_INVALID_METADATA;
1648           goto done_with_error;
1649         }
1650         stream_op->send_initial_metadata = true;
1651         call->sent_initial_metadata = true;
1652         if (!prepare_application_metadata(
1653                 call, static_cast<int>(op->data.send_initial_metadata.count),
1654                 op->data.send_initial_metadata.metadata, 0, call->is_client,
1655                 &compression_md, static_cast<int>(additional_metadata_count))) {
1656           error = GRPC_CALL_ERROR_INVALID_METADATA;
1657           goto done_with_error;
1658         }
1659         /* TODO(ctiller): just make these the same variable? */
1660         if (call->is_client) {
1661           call->metadata_batch[0][0].deadline = call->send_deadline;
1662         }
1663         stream_op_payload->send_initial_metadata.send_initial_metadata =
1664             &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
1665         stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
1666             op->flags;
1667         if (call->is_client) {
1668           stream_op_payload->send_initial_metadata.peer_string =
1669               &call->peer_string;
1670         }
1671         has_send_ops = true;
1672         break;
1673       }
1674       case GRPC_OP_SEND_MESSAGE: {
1675         if (!are_write_flags_valid(op->flags)) {
1676           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1677           goto done_with_error;
1678         }
1679         if (op->data.send_message.send_message == nullptr) {
1680           error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1681           goto done_with_error;
1682         }
1683         if (call->sending_message) {
1684           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1685           goto done_with_error;
1686         }
1687         uint32_t flags = op->flags;
1688         /* If the outgoing buffer is already compressed, mark it as so in the
1689            flags. These will be picked up by the compression filter and further
1690            (wasteful) attempts at compression skipped. */
1691         if (op->data.send_message.send_message->data.raw.compression >
1692             GRPC_COMPRESS_NONE) {
1693           flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1694         }
1695         stream_op->send_message = true;
1696         call->sending_message = true;
1697         call->sending_stream.Init(
1698             &op->data.send_message.send_message->data.raw.slice_buffer, flags);
1699         stream_op_payload->send_message.send_message.reset(
1700             call->sending_stream.get());
1701         has_send_ops = true;
1702         break;
1703       }
1704       case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
1705         /* Flag validation: currently allow no flags */
1706         if (op->flags != 0) {
1707           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1708           goto done_with_error;
1709         }
1710         if (!call->is_client) {
1711           error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1712           goto done_with_error;
1713         }
1714         if (call->sent_final_op) {
1715           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1716           goto done_with_error;
1717         }
1718         stream_op->send_trailing_metadata = true;
1719         call->sent_final_op = true;
1720         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1721             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1722         has_send_ops = true;
1723         break;
1724       }
1725       case GRPC_OP_SEND_STATUS_FROM_SERVER: {
1726         /* Flag validation: currently allow no flags */
1727         if (op->flags != 0) {
1728           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1729           goto done_with_error;
1730         }
1731         if (call->is_client) {
1732           error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1733           goto done_with_error;
1734         }
1735         if (call->sent_final_op) {
1736           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1737           goto done_with_error;
1738         }
1739         if (op->data.send_status_from_server.trailing_metadata_count >
1740             INT_MAX) {
1741           error = GRPC_CALL_ERROR_INVALID_METADATA;
1742           goto done_with_error;
1743         }
1744         stream_op->send_trailing_metadata = true;
1745         call->sent_final_op = true;
1746         GPR_ASSERT(call->send_extra_metadata_count == 0);
1747         call->send_extra_metadata_count = 1;
1748         call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
1749             op->data.send_status_from_server.status);
1750         grpc_error* status_error =
1751             op->data.send_status_from_server.status == GRPC_STATUS_OK
1752                 ? GRPC_ERROR_NONE
1753                 : grpc_error_set_int(
1754                       GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1755                           "Server returned error"),
1756                       GRPC_ERROR_INT_GRPC_STATUS,
1757                       static_cast<intptr_t>(
1758                           op->data.send_status_from_server.status));
1759         if (op->data.send_status_from_server.status_details != nullptr) {
1760           call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1761               GRPC_MDSTR_GRPC_MESSAGE,
1762               grpc_slice_ref_internal(
1763                   *op->data.send_status_from_server.status_details));
1764           call->send_extra_metadata_count++;
1765           if (status_error != GRPC_ERROR_NONE) {
1766             char* msg = grpc_slice_to_c_string(
1767                 GRPC_MDVALUE(call->send_extra_metadata[1].md));
1768             status_error =
1769                 grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
1770                                    grpc_slice_from_copied_string(msg));
1771             gpr_free(msg);
1772           }
1773         }
1774 
1775         gpr_atm_rel_store(&call->status_error,
1776                           reinterpret_cast<gpr_atm>(status_error));
1777         if (!prepare_application_metadata(
1778                 call,
1779                 static_cast<int>(
1780                     op->data.send_status_from_server.trailing_metadata_count),
1781                 op->data.send_status_from_server.trailing_metadata, 1, 1,
1782                 nullptr, 0)) {
1783           for (int n = 0; n < call->send_extra_metadata_count; n++) {
1784             GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
1785           }
1786           call->send_extra_metadata_count = 0;
1787           error = GRPC_CALL_ERROR_INVALID_METADATA;
1788           goto done_with_error;
1789         }
1790         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1791             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1792         stream_op_payload->send_trailing_metadata.sent =
1793             &call->sent_server_trailing_metadata;
1794         has_send_ops = true;
1795         break;
1796       }
1797       case GRPC_OP_RECV_INITIAL_METADATA: {
1798         /* Flag validation: currently allow no flags */
1799         if (op->flags != 0) {
1800           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1801           goto done_with_error;
1802         }
1803         if (call->received_initial_metadata) {
1804           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1805           goto done_with_error;
1806         }
1807         call->received_initial_metadata = true;
1808         call->buffered_metadata[0] =
1809             op->data.recv_initial_metadata.recv_initial_metadata;
1810         GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
1811                           receiving_initial_metadata_ready, bctl,
1812                           grpc_schedule_on_exec_ctx);
1813         stream_op->recv_initial_metadata = true;
1814         stream_op_payload->recv_initial_metadata.recv_initial_metadata =
1815             &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1816         stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
1817             &call->receiving_initial_metadata_ready;
1818         if (!call->is_client) {
1819           stream_op_payload->recv_initial_metadata.peer_string =
1820               &call->peer_string;
1821         }
1822         ++num_recv_ops;
1823         break;
1824       }
1825       case GRPC_OP_RECV_MESSAGE: {
1826         /* Flag validation: currently allow no flags */
1827         if (op->flags != 0) {
1828           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1829           goto done_with_error;
1830         }
1831         if (call->receiving_message) {
1832           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1833           goto done_with_error;
1834         }
1835         call->receiving_message = true;
1836         stream_op->recv_message = true;
1837         call->receiving_buffer = op->data.recv_message.recv_message;
1838         stream_op_payload->recv_message.recv_message = &call->receiving_stream;
1839         GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
1840                           receiving_stream_ready_in_call_combiner, bctl,
1841                           grpc_schedule_on_exec_ctx);
1842         stream_op_payload->recv_message.recv_message_ready =
1843             &call->receiving_stream_ready;
1844         ++num_recv_ops;
1845         break;
1846       }
1847       case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1848         /* Flag validation: currently allow no flags */
1849         if (op->flags != 0) {
1850           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1851           goto done_with_error;
1852         }
1853         if (!call->is_client) {
1854           error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1855           goto done_with_error;
1856         }
1857         if (call->requested_final_op) {
1858           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1859           goto done_with_error;
1860         }
1861         call->requested_final_op = true;
1862         call->buffered_metadata[1] =
1863             op->data.recv_status_on_client.trailing_metadata;
1864         call->final_op.client.status = op->data.recv_status_on_client.status;
1865         call->final_op.client.status_details =
1866             op->data.recv_status_on_client.status_details;
1867         call->final_op.client.error_string =
1868             op->data.recv_status_on_client.error_string;
1869         stream_op->recv_trailing_metadata = true;
1870         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1871             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1872         stream_op_payload->recv_trailing_metadata.collect_stats =
1873             &call->final_info.stats.transport_stream_stats;
1874         GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1875                           receiving_trailing_metadata_ready, bctl,
1876                           grpc_schedule_on_exec_ctx);
1877         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1878             &call->receiving_trailing_metadata_ready;
1879         ++num_recv_ops;
1880         break;
1881       }
1882       case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1883         /* Flag validation: currently allow no flags */
1884         if (op->flags != 0) {
1885           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1886           goto done_with_error;
1887         }
1888         if (call->is_client) {
1889           error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1890           goto done_with_error;
1891         }
1892         if (call->requested_final_op) {
1893           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1894           goto done_with_error;
1895         }
1896         call->requested_final_op = true;
1897         call->final_op.server.cancelled =
1898             op->data.recv_close_on_server.cancelled;
1899         stream_op->recv_trailing_metadata = true;
1900         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1901             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1902         stream_op_payload->recv_trailing_metadata.collect_stats =
1903             &call->final_info.stats.transport_stream_stats;
1904         GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1905                           receiving_trailing_metadata_ready, bctl,
1906                           grpc_schedule_on_exec_ctx);
1907         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1908             &call->receiving_trailing_metadata_ready;
1909         ++num_recv_ops;
1910         break;
1911       }
1912     }
1913   }
1914 
1915   GRPC_CALL_INTERNAL_REF(call, "completion");
1916   if (!is_notify_tag_closure) {
1917     GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1918   }
1919   bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);
1920 
1921   if (has_send_ops) {
1922     GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
1923                       grpc_schedule_on_exec_ctx);
1924     stream_op->on_complete = &bctl->finish_batch;
1925   }
1926 
1927   gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
1928   execute_batch(call, stream_op, &bctl->start_batch);
1929 
1930 done:
1931   return error;
1932 
1933 done_with_error:
1934   /* reverse any mutations that occurred */
1935   if (stream_op->send_initial_metadata) {
1936     call->sent_initial_metadata = false;
1937     grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
1938   }
1939   if (stream_op->send_message) {
1940     call->sending_message = false;
1941     call->sending_stream->Orphan();
1942   }
1943   if (stream_op->send_trailing_metadata) {
1944     call->sent_final_op = false;
1945     grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
1946   }
1947   if (stream_op->recv_initial_metadata) {
1948     call->received_initial_metadata = false;
1949   }
1950   if (stream_op->recv_message) {
1951     call->receiving_message = false;
1952   }
1953   if (stream_op->recv_trailing_metadata) {
1954     call->requested_final_op = false;
1955   }
1956   goto done;
1957 }
1958 
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1959 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1960                                       size_t nops, void* tag, void* reserved) {
1961   grpc_call_error err;
1962 
1963   GRPC_API_TRACE(
1964       "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1965       "reserved=%p)",
1966       5, (call, ops, (unsigned long)nops, tag, reserved));
1967 
1968   if (reserved != nullptr) {
1969     err = GRPC_CALL_ERROR;
1970   } else {
1971     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1972     grpc_core::ExecCtx exec_ctx;
1973     err = call_start_batch(call, ops, nops, tag, 0);
1974   }
1975 
1976   return err;
1977 }
1978 
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)1979 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1980                                                   const grpc_op* ops,
1981                                                   size_t nops,
1982                                                   grpc_closure* closure) {
1983   return call_start_batch(call, ops, nops, closure, 1);
1984 }
1985 
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1986 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1987                            void* value, void (*destroy)(void* value)) {
1988   if (call->context[elem].destroy) {
1989     call->context[elem].destroy(call->context[elem].value);
1990   }
1991   call->context[elem].value = value;
1992   call->context[elem].destroy = destroy;
1993 }
1994 
grpc_call_context_get(grpc_call * call,grpc_context_index elem)1995 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
1996   return call->context[elem].value;
1997 }
1998 
grpc_call_is_client(grpc_call * call)1999 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
2000 
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)2001 grpc_compression_algorithm grpc_call_compression_for_level(
2002     grpc_call* call, grpc_compression_level level) {
2003   grpc_compression_algorithm algo =
2004       compression_algorithm_for_level_locked(call, level);
2005   return algo;
2006 }
2007 
grpc_call_error_to_string(grpc_call_error error)2008 const char* grpc_call_error_to_string(grpc_call_error error) {
2009   switch (error) {
2010     case GRPC_CALL_ERROR:
2011       return "GRPC_CALL_ERROR";
2012     case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2013       return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2014     case GRPC_CALL_ERROR_ALREADY_FINISHED:
2015       return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2016     case GRPC_CALL_ERROR_ALREADY_INVOKED:
2017       return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2018     case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2019       return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2020     case GRPC_CALL_ERROR_INVALID_FLAGS:
2021       return "GRPC_CALL_ERROR_INVALID_FLAGS";
2022     case GRPC_CALL_ERROR_INVALID_MESSAGE:
2023       return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2024     case GRPC_CALL_ERROR_INVALID_METADATA:
2025       return "GRPC_CALL_ERROR_INVALID_METADATA";
2026     case GRPC_CALL_ERROR_NOT_INVOKED:
2027       return "GRPC_CALL_ERROR_NOT_INVOKED";
2028     case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2029       return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2030     case GRPC_CALL_ERROR_NOT_ON_SERVER:
2031       return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2032     case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2033       return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2034     case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2035       return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2036     case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2037       return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2038     case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2039       return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2040     case GRPC_CALL_OK:
2041       return "GRPC_CALL_OK";
2042   }
2043   GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2044 }
2045