1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/surface/call.h"
22 
23 #include <assert.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 
29 #include <string>
30 
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/str_format.h"
33 
34 #include <grpc/compression.h>
35 #include <grpc/grpc.h>
36 #include <grpc/slice.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 #include <grpc/support/string_util.h>
40 
41 #include "src/core/lib/channel/channel_stack.h"
42 #include "src/core/lib/compression/algorithm_metadata.h"
43 #include "src/core/lib/debug/stats.h"
44 #include "src/core/lib/gpr/alloc.h"
45 #include "src/core/lib/gpr/string.h"
46 #include "src/core/lib/gpr/time_precise.h"
47 #include "src/core/lib/gpr/useful.h"
48 #include "src/core/lib/gprpp/arena.h"
49 #include "src/core/lib/gprpp/manual_constructor.h"
50 #include "src/core/lib/gprpp/ref_counted.h"
51 #include "src/core/lib/iomgr/timer.h"
52 #include "src/core/lib/profiling/timers.h"
53 #include "src/core/lib/slice/slice_split.h"
54 #include "src/core/lib/slice/slice_string_helpers.h"
55 #include "src/core/lib/slice/slice_utils.h"
56 #include "src/core/lib/surface/api_trace.h"
57 #include "src/core/lib/surface/call_test_only.h"
58 #include "src/core/lib/surface/channel.h"
59 #include "src/core/lib/surface/completion_queue.h"
60 #include "src/core/lib/surface/server.h"
61 #include "src/core/lib/surface/validate_metadata.h"
62 #include "src/core/lib/transport/error_utils.h"
63 #include "src/core/lib/transport/metadata.h"
64 #include "src/core/lib/transport/static_metadata.h"
65 #include "src/core/lib/transport/status_metadata.h"
66 #include "src/core/lib/transport/transport.h"
67 
68 /** The maximum number of concurrent batches possible.
69     Based upon the maximum number of individually queueable ops in the batch
70     api:
71       - initial metadata send
72       - message send
73       - status/close send (depending on client/server)
74       - initial metadata recv
75       - message recv
76       - status/close recv (depending on client/server) */
77 #define MAX_CONCURRENT_BATCHES 6
78 
79 #define MAX_SEND_EXTRA_METADATA_COUNT 3
80 
81 // Used to create arena for the first call.
82 #define ESTIMATED_MDELEM_COUNT 16
83 
84 struct batch_control {
85   batch_control() = default;
86 
87   grpc_call* call = nullptr;
88   grpc_transport_stream_op_batch op;
  /* Share memory for cq_completion and notify_tag as they are never needed
     simultaneously. Each byte used in this data structure counts as six bytes
     per call, so any savings we can make are worthwhile.

     We use notify_tag to determine whether or not to send notification to the
     completion queue. Once we've made that determination, we can reuse the
     memory for cq_completion. */
96   union {
97     grpc_cq_completion cq_completion;
98     struct {
99       /* Any given op indicates completion by either (a) calling a closure or
100          (b) sending a notification on the call's completion queue.  If
101          \a is_closure is true, \a tag indicates a closure to be invoked;
102          otherwise, \a tag indicates the tag to be used in the notification to
103          be sent to the completion queue. */
104       void* tag;
105       bool is_closure;
106     } notify_tag;
107   } completion_data;
108   grpc_closure start_batch;
109   grpc_closure finish_batch;
110   std::atomic<intptr_t> steps_to_complete{0};
111   AtomicError batch_error;
  void set_num_steps_to_complete(uintptr_t steps) {
113     steps_to_complete.store(steps, std::memory_order_release);
114   }
  bool completed_batch_step() {
116     return steps_to_complete.fetch_sub(1, std::memory_order_acq_rel) == 1;
117   }
118 };
119 
120 struct parent_call {
  parent_call() { gpr_mu_init(&child_list_mu); }
  ~parent_call() { gpr_mu_destroy(&child_list_mu); }
123 
124   gpr_mu child_list_mu;
125   grpc_call* first_child = nullptr;
126 };
127 
128 struct child_call {
  explicit child_call(grpc_call* parent) : parent(parent) {}
130   grpc_call* parent;
  /** siblings: children of the same parent form a list, and this list is
      protected under parent->mu */
134   grpc_call* sibling_next = nullptr;
135   grpc_call* sibling_prev = nullptr;
136 };
137 
138 #define RECV_NONE ((gpr_atm)0)
139 #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
140 
141 struct grpc_call {
  grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
143       : arena(arena),
144         cq(args.cq),
145         channel(args.channel),
146         is_client(args.server_transport_data == nullptr),
147         stream_op_payload(context) {}
148 
  ~grpc_call() {
150     for (int i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
151       if (context[i].destroy) {
152         context[i].destroy(context[i].value);
153       }
154     }
155     gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
156   }
157 
158   grpc_core::RefCount ext_ref;
159   grpc_core::Arena* arena;
160   grpc_core::CallCombiner call_combiner;
161   grpc_completion_queue* cq;
162   grpc_polling_entity pollent;
163   grpc_channel* channel;
164   gpr_cycle_counter start_time = gpr_get_cycle_counter();
165   /* parent_call* */ gpr_atm parent_call_atm = 0;
166   child_call* child = nullptr;
167 
168   /* client or server call */
169   bool is_client;
170   /** has grpc_call_unref been called */
171   bool destroy_called = false;
172   /** flag indicating that cancellation is inherited */
173   bool cancellation_is_inherited = false;
174   // Trailers-only response status
175   bool is_trailers_only = false;
176   /** which ops are in-flight */
177   bool sent_initial_metadata = false;
178   bool sending_message = false;
179   bool sent_final_op = false;
180   bool received_initial_metadata = false;
181   bool receiving_message = false;
182   bool requested_final_op = false;
183   gpr_atm any_ops_sent_atm = 0;
184   gpr_atm received_final_op_atm = 0;
185 
186   batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
187   grpc_transport_stream_op_batch_payload stream_op_payload;
188 
  /* send and receive metadata batches, for initial and trailing metadata */
190   grpc_metadata_batch send_initial_metadata{arena};
191   grpc_metadata_batch send_trailing_metadata{arena};
192   grpc_metadata_batch recv_initial_metadata{arena};
193   grpc_metadata_batch recv_trailing_metadata{arena};
194 
195   /* Buffered read metadata waiting to be returned to the application.
196      Element 0 is initial metadata, element 1 is trailing metadata. */
197   grpc_metadata_array* buffered_metadata[2] = {};
198 
199   grpc_metadata compression_md;
200 
201   // A char* indicating the peer name.
202   gpr_atm peer_string = 0;
203 
  /* Call data used for reporting. Only valid after the call has
   * completed */
206   grpc_call_final_info final_info;
207 
208   /* Compression algorithm for *incoming* data */
209   grpc_message_compression_algorithm incoming_message_compression_algorithm =
210       GRPC_MESSAGE_COMPRESS_NONE;
211   /* Stream compression algorithm for *incoming* data */
212   grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
213       GRPC_STREAM_COMPRESS_NONE;
214   /* Supported encodings (compression algorithms), a bitset.
215    * Always support no compression. */
216   uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
217   /* Supported stream encodings (stream compression algorithms), a bitset */
218   uint32_t stream_encodings_accepted_by_peer = 0;
219 
220   /* Contexts for various subsystems (security, tracing, ...). */
221   grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};
222 
223   /* for the client, extra metadata is initial metadata; for the
224      server, it's trailing metadata */
225   grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
226   int send_extra_metadata_count;
227   grpc_millis send_deadline;
228 
229   grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
230 
231   grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
232   bool call_failed_before_recv_message = false;
233   grpc_byte_buffer** receiving_buffer = nullptr;
234   grpc_slice receiving_slice = grpc_empty_slice();
235   grpc_closure receiving_slice_ready;
236   grpc_closure receiving_stream_ready;
237   grpc_closure receiving_initial_metadata_ready;
238   grpc_closure receiving_trailing_metadata_ready;
239   uint32_t test_only_last_message_flags = 0;
240   // Status about operation of call
241   bool sent_server_trailing_metadata = false;
242   gpr_atm cancelled_with_error = 0;
243 
244   grpc_closure release_call;
245 
246   union {
247     struct {
248       grpc_status_code* status;
249       grpc_slice* status_details;
250       const char** error_string;
251     } client;
252     struct {
253       int* cancelled;
254       // backpointer to owning server if this is a server side call.
255       grpc_core::Server* core_server;
256     } server;
257   } final_op;
258   AtomicError status_error;
259 
260   /* recv_state can contain one of the following values:
261      RECV_NONE :                 :  no initial metadata and messages received
262      RECV_INITIAL_METADATA_FIRST :  received initial metadata first
263      a batch_control*            :  received messages first
264 
265                  +------1------RECV_NONE------3-----+
266                  |                                  |
267                  |                                  |
268                  v                                  v
269      RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
270            |           ^                      |           ^
271            |           |                      |           |
272            +-----2-----+                      +-----4-----+
273 
274     For 1, 4: See receiving_initial_metadata_ready() function
275     For 2, 3: See receiving_stream_ready() function */
276   gpr_atm recv_state = 0;
277 };
278 
279 grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
280 grpc_core::TraceFlag grpc_compression_trace(false, "compression");
281 
282 #define CALL_STACK_FROM_CALL(call)   \
283   (grpc_call_stack*)((char*)(call) + \
284                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
285 #define CALL_FROM_CALL_STACK(call_stack) \
286   (grpc_call*)(((char*)(call_stack)) -   \
287                GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
288 
289 #define CALL_ELEM_FROM_CALL(call, idx) \
290   grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
291 #define CALL_FROM_TOP_ELEM(top_elem) \
292   CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
293 
294 static void execute_batch(grpc_call* call,
295                           grpc_transport_stream_op_batch* batch,
296                           grpc_closure* start_batch_closure);
297 
298 static void cancel_with_status(grpc_call* c, grpc_status_code status,
299                                const char* description);
300 static void cancel_with_error(grpc_call* c, grpc_error_handle error);
301 static void destroy_call(void* call_stack, grpc_error_handle error);
302 static void receiving_slice_ready(void* bctlp, grpc_error_handle error);
303 static void set_final_status(grpc_call* call, grpc_error_handle error);
304 static void process_data_after_md(batch_control* bctl);
305 static void post_batch_completion(batch_control* bctl);
306 
static void add_init_error(grpc_error_handle* composite,
308                            grpc_error_handle new_err) {
309   if (new_err == GRPC_ERROR_NONE) return;
310   if (*composite == GRPC_ERROR_NONE) {
311     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
312   }
313   *composite = grpc_error_add_child(*composite, new_err);
314 }
315 
void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
317   return call->arena->Alloc(size);
318 }
319 
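/* Lazily creates the parent_call tracking structure on the call's arena.
   Uses a release CAS so concurrent callers agree on a single instance; the
   loser destroys its copy and reloads the winner's pointer. */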
static parent_call* get_or_create_parent_call(grpc_call* call) {
321   parent_call* p =
322       reinterpret_cast<parent_call*>(gpr_atm_acq_load(&call->parent_call_atm));
323   if (p == nullptr) {
324     p = call->arena->New<parent_call>();
325     if (!gpr_atm_rel_cas(&call->parent_call_atm,
326                          reinterpret_cast<gpr_atm>(nullptr),
327                          reinterpret_cast<gpr_atm>(p))) {
328       p->~parent_call();
329       p = reinterpret_cast<parent_call*>(
330           gpr_atm_acq_load(&call->parent_call_atm));
331     }
332   }
333   return p;
334 }
335 
static parent_call* get_parent_call(grpc_call* call) {
337   return reinterpret_cast<parent_call*>(
338       gpr_atm_acq_load(&call->parent_call_atm));
339 }
340 
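/* Rough arena size estimate for a new call: the call object itself plus space
   for the maximum number of concurrent batches and a typical number of
   metadata elements. */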
size_t grpc_call_get_initial_size_estimate() {
342   return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
343          sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
344 }
345 
grpc_error_handle grpc_call_create(const grpc_call_create_args* args,
347                                    grpc_call** out_call) {
348   GPR_TIMER_SCOPE("grpc_call_create", 0);
349 
350   GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
351 
352   grpc_core::Arena* arena;
353   grpc_call* call;
354   grpc_error_handle error = GRPC_ERROR_NONE;
355   grpc_channel_stack* channel_stack =
356       grpc_channel_get_channel_stack(args->channel);
357   size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
358   GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
359   size_t call_and_stack_size =
360       GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
361       channel_stack->call_stack_size;
362   size_t call_alloc_size =
363       call_and_stack_size + (args->parent ? sizeof(child_call) : 0);
364 
365   std::pair<grpc_core::Arena*, void*> arena_with_call =
366       grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
367   arena = arena_with_call.first;
368   call = new (arena_with_call.second) grpc_call(arena, *args);
369   *out_call = call;
370   grpc_slice path = grpc_empty_slice();
371   if (call->is_client) {
372     call->final_op.client.status_details = nullptr;
373     call->final_op.client.status = nullptr;
374     call->final_op.client.error_string = nullptr;
375     GRPC_STATS_INC_CLIENT_CALLS_CREATED();
376     GPR_ASSERT(args->add_initial_metadata_count <
377                MAX_SEND_EXTRA_METADATA_COUNT);
378     for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
379       call->send_extra_metadata[i].md = args->add_initial_metadata[i];
380       if (grpc_slice_eq_static_interned(
381               GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
382         path = grpc_slice_ref_internal(
383             GRPC_MDVALUE(args->add_initial_metadata[i]));
384       }
385     }
386     call->send_extra_metadata_count =
387         static_cast<int>(args->add_initial_metadata_count);
388   } else {
389     GRPC_STATS_INC_SERVER_CALLS_CREATED();
390     call->final_op.server.cancelled = nullptr;
391     call->final_op.server.core_server = args->server;
392     GPR_ASSERT(args->add_initial_metadata_count == 0);
393     call->send_extra_metadata_count = 0;
394   }
395 
396   grpc_millis send_deadline = args->send_deadline;
397   bool immediately_cancel = false;
398 
399   if (args->parent != nullptr) {
400     call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
401                        call_and_stack_size) child_call(args->parent);
402 
403     GRPC_CALL_INTERNAL_REF(args->parent, "child");
404     GPR_ASSERT(call->is_client);
405     GPR_ASSERT(!args->parent->is_client);
406 
407     if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
408       send_deadline = std::min(send_deadline, args->parent->send_deadline);
409     }
410     /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
411      * GRPC_PROPAGATE_STATS_CONTEXT */
412     /* TODO(ctiller): This should change to use the appropriate census start_op
413      * call. */
414     if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
415       if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
416         add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
417                                    "Census tracing propagation requested "
418                                    "without Census context propagation"));
419       }
420       grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
421                             args->parent->context[GRPC_CONTEXT_TRACING].value,
422                             nullptr);
423     } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
424       add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
425                                  "Census context propagation requested "
426                                  "without Census tracing propagation"));
427     }
428     if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
429       call->cancellation_is_inherited = true;
430       if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
431         immediately_cancel = true;
432       }
433     }
434   }
435   call->send_deadline = send_deadline;
436   /* initial refcount dropped by grpc_call_unref */
437   grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
438                                       args->server_transport_data,
439                                       call->context,
440                                       path,
441                                       call->start_time,
442                                       send_deadline,
443                                       call->arena,
444                                       &call->call_combiner};
445   add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
446                                               call, &call_args));
447   // Publish this call to parent only after the call stack has been initialized.
448   if (args->parent != nullptr) {
449     child_call* cc = call->child;
450     parent_call* pc = get_or_create_parent_call(args->parent);
451     gpr_mu_lock(&pc->child_list_mu);
452     if (pc->first_child == nullptr) {
453       pc->first_child = call;
454       cc->sibling_next = cc->sibling_prev = call;
455     } else {
456       cc->sibling_next = pc->first_child;
457       cc->sibling_prev = pc->first_child->child->sibling_prev;
458       cc->sibling_next->child->sibling_prev =
459           cc->sibling_prev->child->sibling_next = call;
460     }
461     gpr_mu_unlock(&pc->child_list_mu);
462   }
463 
464   if (error != GRPC_ERROR_NONE) {
465     cancel_with_error(call, GRPC_ERROR_REF(error));
466   }
467   if (immediately_cancel) {
468     cancel_with_error(call, GRPC_ERROR_CANCELLED);
469   }
470   if (args->cq != nullptr) {
471     GPR_ASSERT(args->pollset_set_alternative == nullptr &&
472                "Only one of 'cq' and 'pollset_set_alternative' should be "
473                "non-nullptr.");
474     GRPC_CQ_INTERNAL_REF(args->cq, "bind");
475     call->pollent =
476         grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
477   }
478   if (args->pollset_set_alternative != nullptr) {
479     call->pollent = grpc_polling_entity_create_from_pollset_set(
480         args->pollset_set_alternative);
481   }
482   if (!grpc_polling_entity_is_empty(&call->pollent)) {
483     grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
484                                                &call->pollent);
485   }
486 
487   if (call->is_client) {
488     grpc_core::channelz::ChannelNode* channelz_channel =
489         grpc_channel_get_channelz_node(call->channel);
490     if (channelz_channel != nullptr) {
491       channelz_channel->RecordCallStarted();
492     }
493   } else if (call->final_op.server.core_server != nullptr) {
494     grpc_core::channelz::ServerNode* channelz_node =
495         call->final_op.server.core_server->channelz_node();
496     if (channelz_node != nullptr) {
497       channelz_node->RecordCallStarted();
498     }
499   }
500 
501   grpc_slice_unref_internal(path);
502 
503   return error;
504 }
505 
void grpc_call_set_completion_queue(grpc_call* call,
507                                     grpc_completion_queue* cq) {
508   GPR_ASSERT(cq);
509 
510   if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
511     gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
512     abort();
513   }
514   call->cq = cq;
515   GRPC_CQ_INTERNAL_REF(cq, "bind");
516   call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
517   grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
518                                              &call->pollent);
519 }
520 
521 #ifndef NDEBUG
522 #define REF_REASON reason
523 #define REF_ARG , const char* reason
524 #else
525 #define REF_REASON ""
526 #define REF_ARG
527 #endif
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
529   GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
530 }
void grpc_call_internal_unref(grpc_call* c REF_ARG) {
532   GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
533 }
534 
static void release_call(void* call, grpc_error_handle /*error*/) {
536   grpc_call* c = static_cast<grpc_call*>(call);
537   grpc_channel* channel = c->channel;
538   grpc_core::Arena* arena = c->arena;
539   c->~grpc_call();
540   grpc_channel_update_call_size_estimate(channel, arena->Destroy());
541   GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
542 }
543 
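/* Final call-stack destruction callback: releases metadata and the receiving
   stream, derives the final status and stats, and destroys the call stack;
   release_call then frees the arena and drops the channel ref. */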
static void destroy_call(void* call, grpc_error_handle /*error*/) {
545   GPR_TIMER_SCOPE("destroy_call", 0);
546   grpc_call* c = static_cast<grpc_call*>(call);
547   c->recv_initial_metadata.Clear();
548   c->recv_trailing_metadata.Clear();
549   c->receiving_stream.reset();
550   parent_call* pc = get_parent_call(c);
551   if (pc != nullptr) {
552     pc->~parent_call();
553   }
554   for (int i = 0; i < c->send_extra_metadata_count; i++) {
555     GRPC_MDELEM_UNREF(c->send_extra_metadata[i].md);
556   }
557   if (c->cq) {
558     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
559   }
560 
561   grpc_error_handle status_error = c->status_error.get();
562   grpc_error_get_status(status_error, c->send_deadline,
563                         &c->final_info.final_status, nullptr, nullptr,
564                         &(c->final_info.error_string));
565   c->status_error.set(GRPC_ERROR_NONE);
566   c->final_info.stats.latency =
567       gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
568   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
569                           GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
570                                             grpc_schedule_on_exec_ctx));
571 }
572 
void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
574 
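/* Drops the application's reference. On the last unref, unlinks the call from
   its parent (if any) and, if ops were sent but the final op was never
   received, cancels the call before dropping the internal "destroy" ref. */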
void grpc_call_unref(grpc_call* c) {
576   if (GPR_LIKELY(!c->ext_ref.Unref())) return;
577 
578   GPR_TIMER_SCOPE("grpc_call_unref", 0);
579 
580   child_call* cc = c->child;
581   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
582   grpc_core::ExecCtx exec_ctx;
583 
584   GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
585 
586   if (cc) {
587     parent_call* pc = get_parent_call(cc->parent);
588     gpr_mu_lock(&pc->child_list_mu);
589     if (c == pc->first_child) {
590       pc->first_child = cc->sibling_next;
591       if (c == pc->first_child) {
592         pc->first_child = nullptr;
593       }
594     }
595     cc->sibling_prev->child->sibling_next = cc->sibling_next;
596     cc->sibling_next->child->sibling_prev = cc->sibling_prev;
597     gpr_mu_unlock(&pc->child_list_mu);
598     GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
599   }
600 
601   GPR_ASSERT(!c->destroy_called);
602   c->destroy_called = true;
603   bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
604                 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
605   if (cancel) {
606     cancel_with_error(c, GRPC_ERROR_CANCELLED);
607   } else {
608     // Unset the call combiner cancellation closure.  This has the
609     // effect of scheduling the previously set cancellation closure, if
610     // any, so that it can release any internal references it may be
611     // holding to the call stack.
612     c->call_combiner.SetNotifyOnCancel(nullptr);
613   }
614   GRPC_CALL_INTERNAL_UNREF(c, "destroy");
615 }
616 
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
618   GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
619   GPR_ASSERT(!reserved);
620   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
621   grpc_core::ExecCtx exec_ctx;
622   cancel_with_error(call, GRPC_ERROR_CANCELLED);
623   return GRPC_CALL_OK;
624 }
625 
626 // This is called via the call combiner to start sending a batch down
627 // the filter stack.
static void execute_batch_in_call_combiner(void* arg,
629                                            grpc_error_handle /*ignored*/) {
630   GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
631   grpc_transport_stream_op_batch* batch =
632       static_cast<grpc_transport_stream_op_batch*>(arg);
633   grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
634   grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
635   GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
636   elem->filter->start_transport_stream_op_batch(elem, batch);
637 }
638 
639 // start_batch_closure points to a caller-allocated closure to be used
640 // for entering the call combiner.
static void execute_batch(grpc_call* call,
642                           grpc_transport_stream_op_batch* batch,
643                           grpc_closure* start_batch_closure) {
644   batch->handler_private.extra_arg = call;
645   GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
646                     grpc_schedule_on_exec_ctx);
647   GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
648                            GRPC_ERROR_NONE, "executing batch");
649 }
650 
char* grpc_call_get_peer(grpc_call* call) {
652   char* peer_string =
653       reinterpret_cast<char*>(gpr_atm_acq_load(&call->peer_string));
654   if (peer_string != nullptr) return gpr_strdup(peer_string);
655   peer_string = grpc_channel_get_target(call->channel);
656   if (peer_string != nullptr) return peer_string;
657   return gpr_strdup("unknown");
658 }
659 
grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
661   return CALL_FROM_TOP_ELEM(surface_element);
662 }
663 
664 /*******************************************************************************
665  * CANCELLATION
666  */
667 
grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
669                                              grpc_status_code status,
670                                              const char* description,
671                                              void* reserved) {
672   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
673   grpc_core::ExecCtx exec_ctx;
674   GRPC_API_TRACE(
675       "grpc_call_cancel_with_status("
676       "c=%p, status=%d, description=%s, reserved=%p)",
677       4, (c, (int)status, description, reserved));
678   GPR_ASSERT(reserved == nullptr);
679   cancel_with_status(c, status, description);
680   return GRPC_CALL_OK;
681 }
682 
683 struct cancel_state {
684   grpc_call* call;
685   grpc_closure start_batch;
686   grpc_closure finish_batch;
687 };
688 // The on_complete callback used when sending a cancel_stream batch down
689 // the filter stack.  Yields the call combiner when the batch is done.
static void done_termination(void* arg, grpc_error_handle /*error*/) {
691   cancel_state* state = static_cast<cancel_state*>(arg);
692   GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
693                           "on_complete for cancel_stream op");
694   GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
695   gpr_free(state);
696 }
697 
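/* Cancels the call with the given error (at most once per call): informs the
   call combiner so in-flight actions are interrupted, then sends a
   cancel_stream batch down the filter stack. Takes ownership of 'error'. */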
static void cancel_with_error(grpc_call* c, grpc_error_handle error) {
699   if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
700     GRPC_ERROR_UNREF(error);
701     return;
702   }
703   GRPC_CALL_INTERNAL_REF(c, "termination");
704   // Inform the call combiner of the cancellation, so that it can cancel
705   // any in-flight asynchronous actions that may be holding the call
706   // combiner.  This ensures that the cancel_stream batch can be sent
707   // down the filter stack in a timely manner.
708   c->call_combiner.Cancel(GRPC_ERROR_REF(error));
709   cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
710   state->call = c;
711   GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
712                     grpc_schedule_on_exec_ctx);
713   grpc_transport_stream_op_batch* op =
714       grpc_make_transport_stream_op(&state->finish_batch);
715   op->cancel_stream = true;
716   op->payload->cancel_stream.cancel_error = error;
717   execute_batch(c, op, &state->start_batch);
718 }
719 
void grpc_call_cancel_internal(grpc_call* call) {
721   cancel_with_error(call, GRPC_ERROR_CANCELLED);
722 }
723 
static grpc_error_handle error_from_status(grpc_status_code status,
725                                            const char* description) {
  // 'description' is copied here to honor the grpc_call_cancel_with_status
  // guarantee that 'description' may be short-lived.
728   return grpc_error_set_int(
729       grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
730                          GRPC_ERROR_STR_GRPC_MESSAGE, description),
731       GRPC_ERROR_INT_GRPC_STATUS, status);
732 }
733 
static void cancel_with_status(grpc_call* c, grpc_status_code status,
735                                const char* description) {
736   cancel_with_error(c, error_from_status(status, description));
737 }
738 
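/* Records the final status of the call: for clients, fills in the
   application-visible status, details and error string; for servers, sets the
   'cancelled' flag. Also updates channelz call success/failure counters. */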
static void set_final_status(grpc_call* call, grpc_error_handle error) {
740   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
741     gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
742     gpr_log(GPR_DEBUG, "%s", grpc_error_std_string(error).c_str());
743   }
744   if (call->is_client) {
745     std::string status_details;
746     grpc_error_get_status(error, call->send_deadline,
747                           call->final_op.client.status, &status_details,
748                           nullptr, call->final_op.client.error_string);
749     *call->final_op.client.status_details =
750         grpc_slice_from_cpp_string(std::move(status_details));
751     call->status_error.set(error);
752     GRPC_ERROR_UNREF(error);
753     grpc_core::channelz::ChannelNode* channelz_channel =
754         grpc_channel_get_channelz_node(call->channel);
755     if (channelz_channel != nullptr) {
756       if (*call->final_op.client.status != GRPC_STATUS_OK) {
757         channelz_channel->RecordCallFailed();
758       } else {
759         channelz_channel->RecordCallSucceeded();
760       }
761     }
762   } else {
763     *call->final_op.server.cancelled =
764         error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
765     grpc_core::channelz::ServerNode* channelz_node =
766         call->final_op.server.core_server->channelz_node();
767     if (channelz_node != nullptr) {
768       if (*call->final_op.server.cancelled || !call->status_error.ok()) {
769         channelz_node->RecordCallFailed();
770       } else {
771         channelz_node->RecordCallSucceeded();
772       }
773     }
774     GRPC_ERROR_UNREF(error);
775   }
776 }
777 
778 /*******************************************************************************
779  * COMPRESSION
780  */
781 
static void set_incoming_message_compression_algorithm(
783     grpc_call* call, grpc_message_compression_algorithm algo) {
784   GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
785   call->incoming_message_compression_algorithm = algo;
786 }
787 
static void set_incoming_stream_compression_algorithm(
789     grpc_call* call, grpc_stream_compression_algorithm algo) {
790   GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
791   call->incoming_stream_compression_algorithm = algo;
792 }
793 
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
795     grpc_call* call) {
796   grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
797   grpc_compression_algorithm_from_message_stream_compression_algorithm(
798       &algorithm, call->incoming_message_compression_algorithm,
799       call->incoming_stream_compression_algorithm);
800   return algorithm;
801 }
802 
static grpc_compression_algorithm compression_algorithm_for_level_locked(
804     grpc_call* call, grpc_compression_level level) {
805   return grpc_compression_algorithm_for_level(level,
806                                               call->encodings_accepted_by_peer);
807 }
808 
uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
810   uint32_t flags;
811   flags = call->test_only_last_message_flags;
812   return flags;
813 }
814 
static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
816 
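/* Parses the peer's accept-encoding metadata into a bitset of supported
   compression algorithms. The parsed bitset is cached on the mdelem as user
   data (stored offset by 1 so that 0 still means "not yet parsed"). */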
static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
818                                            grpc_mdelem mdel,
819                                            uint32_t* encodings_accepted_by_peer,
820                                            bool stream_encoding) {
821   size_t i;
822   uint32_t algorithm;
823   grpc_slice_buffer accept_encoding_parts;
824   grpc_slice accept_encoding_slice;
825   void* accepted_user_data;
826 
827   accepted_user_data =
828       grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
829   if (accepted_user_data != nullptr) {
830     *encodings_accepted_by_peer = static_cast<uint32_t>(
831         reinterpret_cast<uintptr_t>(accepted_user_data) - 1);
832     return;
833   }
834 
835   *encodings_accepted_by_peer = 0;
836 
837   accept_encoding_slice = GRPC_MDVALUE(mdel);
838   grpc_slice_buffer_init(&accept_encoding_parts);
839   grpc_slice_split_without_space(accept_encoding_slice, ",",
840                                  &accept_encoding_parts);
841 
842   grpc_core::SetBit(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
843   for (i = 0; i < accept_encoding_parts.count; i++) {
844     int r;
845     grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
846     if (!stream_encoding) {
847       r = grpc_message_compression_algorithm_parse(
848           accept_encoding_entry_slice,
849           reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
850     } else {
851       r = grpc_stream_compression_algorithm_parse(
852           accept_encoding_entry_slice,
853           reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
854     }
855     if (r) {
856       grpc_core::SetBit(encodings_accepted_by_peer, algorithm);
857     } else {
858       char* accept_encoding_entry_str =
859           grpc_slice_to_c_string(accept_encoding_entry_slice);
860       gpr_log(GPR_DEBUG,
861               "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
862               accept_encoding_entry_str);
863       gpr_free(accept_encoding_entry_str);
864     }
865   }
866 
867   grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
868 
869   grpc_mdelem_set_user_data(
870       mdel, destroy_encodings_accepted_by_peer,
871       reinterpret_cast<void*>(
872           static_cast<uintptr_t>(*encodings_accepted_by_peer) + 1));
873 }
874 
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
876   uint32_t encodings_accepted_by_peer;
877   encodings_accepted_by_peer = call->encodings_accepted_by_peer;
878   return encodings_accepted_by_peer;
879 }
880 
881 grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
883   return call->incoming_stream_compression_algorithm;
884 }
885 
static grpc_linked_mdelem* linked_from_md(grpc_metadata* md) {
887   return reinterpret_cast<grpc_linked_mdelem*>(&md->internal_data);
888 }
889 
static grpc_metadata* get_md_elem(grpc_metadata* metadata,
891                                   grpc_metadata* additional_metadata, int i,
892                                   int count) {
893   grpc_metadata* res =
894       i < count ? &metadata[i] : &additional_metadata[i - count];
895   GPR_ASSERT(res);
896   return res;
897 }
898 
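/* Validates application-supplied metadata and links it (optionally preceded
   by the call's send_extra_metadata) into the send_initial or send_trailing
   metadata batch. Returns 1 on success, 0 if validation fails, in which case
   any already-created mdelems are unreffed. */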
static int prepare_application_metadata(grpc_call* call, int count,
900                                         grpc_metadata* metadata,
901                                         int is_trailing,
902                                         int prepend_extra_metadata,
903                                         grpc_metadata* additional_metadata,
904                                         int additional_metadata_count) {
905   int total_count = count + additional_metadata_count;
906   int i;
907   grpc_metadata_batch* batch = is_trailing ? &call->send_trailing_metadata
908                                            : &call->send_initial_metadata;
909   for (i = 0; i < total_count; i++) {
910     grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
911     grpc_linked_mdelem* l = linked_from_md(md);
912     GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
913     if (!GRPC_LOG_IF_ERROR("validate_metadata",
914                            grpc_validate_header_key_is_legal(md->key))) {
915       break;
916     } else if (!grpc_is_binary_header_internal(md->key) &&
917                !GRPC_LOG_IF_ERROR(
918                    "validate_metadata",
919                    grpc_validate_header_nonbin_value_is_legal(md->value))) {
920       break;
921     } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
922       // HTTP2 hpack encoding has a maximum limit.
923       break;
924     }
925     l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
926   }
927   if (i != total_count) {
928     for (int j = 0; j < i; j++) {
929       grpc_metadata* md = get_md_elem(metadata, additional_metadata, j, count);
930       grpc_linked_mdelem* l = linked_from_md(md);
931       GRPC_MDELEM_UNREF(l->md);
932     }
933     return 0;
934   }
935   if (prepend_extra_metadata) {
936     if (call->send_extra_metadata_count == 0) {
937       prepend_extra_metadata = 0;
938     } else {
939       for (i = 0; i < call->send_extra_metadata_count; i++) {
940         GRPC_LOG_IF_ERROR("prepare_application_metadata",
941                           batch->LinkTail(&call->send_extra_metadata[i]));
942       }
943     }
944   }
945   for (i = 0; i < total_count; i++) {
946     grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
947     grpc_linked_mdelem* l = linked_from_md(md);
948     grpc_error_handle error = batch->LinkTail(l);
949     if (error != GRPC_ERROR_NONE) {
950       GRPC_MDELEM_UNREF(l->md);
951     }
952     GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
953   }
954   call->send_extra_metadata_count = 0;
955 
956   return 1;
957 }
958 
static grpc_message_compression_algorithm decode_message_compression(
960     grpc_mdelem md) {
961   grpc_message_compression_algorithm algorithm =
962       grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
963   if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
964     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
965     gpr_log(GPR_ERROR,
966             "Invalid incoming message compression algorithm: '%s'. "
967             "Interpreting incoming data as uncompressed.",
968             md_c_str);
969     gpr_free(md_c_str);
970     return GRPC_MESSAGE_COMPRESS_NONE;
971   }
972   return algorithm;
973 }
974 
static grpc_stream_compression_algorithm decode_stream_compression(
976     grpc_mdelem md) {
977   grpc_stream_compression_algorithm algorithm =
978       grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
979   if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
980     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
981     gpr_log(GPR_ERROR,
982             "Invalid incoming stream compression algorithm: '%s'. Interpreting "
983             "incoming data as uncompressed.",
984             md_c_str);
985     gpr_free(md_c_str);
986     return GRPC_STREAM_COMPRESS_NONE;
987   }
988   return algorithm;
989 }
990 
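/* Copies received metadata into the application-provided grpc_metadata_array,
   growing it as needed. The key/value slices are borrowed and remain valid
   only for the lifetime of the call. */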
static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
992                                  int is_trailing) {
993   if (b->non_deadline_count() == 0) return;
994   if (!call->is_client && is_trailing) return;
995   if (is_trailing && call->buffered_metadata[1] == nullptr) return;
996   GPR_TIMER_SCOPE("publish_app_metadata", 0);
997   grpc_metadata_array* dest;
998   grpc_metadata* mdusr;
999   dest = call->buffered_metadata[is_trailing];
1000   if (dest->count + b->non_deadline_count() > dest->capacity) {
1001     dest->capacity = std::max(dest->capacity + b->non_deadline_count(),
1002                               dest->capacity * 3 / 2);
1003     dest->metadata = static_cast<grpc_metadata*>(
1004         gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
1005   }
1006   b->ForEach([&](grpc_mdelem md) {
1007     mdusr = &dest->metadata[dest->count++];
1008     /* we pass back borrowed slices that are valid whilst the call is valid */
1009     mdusr->key = GRPC_MDKEY(md);
1010     mdusr->value = GRPC_MDVALUE(md);
1011   });
1012 }
1013 
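/* Filters received initial metadata: extracts (and removes) the compression
   related headers to configure incoming decompression and the set of
   encodings accepted by the peer, then publishes the rest to the app. */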
static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
1015   if (b->legacy_index()->named.content_encoding != nullptr) {
1016     GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
1017     set_incoming_stream_compression_algorithm(
1018         call, decode_stream_compression(
1019                   b->legacy_index()->named.content_encoding->md));
1020     b->Remove(GRPC_BATCH_CONTENT_ENCODING);
1021   }
1022   if (b->legacy_index()->named.grpc_encoding != nullptr) {
1023     GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
1024     set_incoming_message_compression_algorithm(
1025         call,
1026         decode_message_compression(b->legacy_index()->named.grpc_encoding->md));
1027     b->Remove(GRPC_BATCH_GRPC_ENCODING);
1028   }
1029   uint32_t message_encodings_accepted_by_peer = 1u;
1030   uint32_t stream_encodings_accepted_by_peer = 1u;
1031   if (b->legacy_index()->named.grpc_accept_encoding != nullptr) {
1032     GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
1033     set_encodings_accepted_by_peer(
1034         call, b->legacy_index()->named.grpc_accept_encoding->md,
1035         &message_encodings_accepted_by_peer, false);
1036     b->Remove(GRPC_BATCH_GRPC_ACCEPT_ENCODING);
1037   }
1038   if (b->legacy_index()->named.accept_encoding != nullptr) {
1039     GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
1040     set_encodings_accepted_by_peer(call,
1041                                    b->legacy_index()->named.accept_encoding->md,
1042                                    &stream_encodings_accepted_by_peer, true);
1043     b->Remove(GRPC_BATCH_ACCEPT_ENCODING);
1044   }
1045   call->encodings_accepted_by_peer =
1046       grpc_compression_bitset_from_message_stream_compression_bitset(
1047           message_encodings_accepted_by_peer,
1048           stream_encodings_accepted_by_peer);
1049   publish_app_metadata(call, b, false);
1050 }
1051 
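/* Filters received trailing metadata: derives the call's final status from
   the batch error or from the grpc-status/grpc-message headers (removing them
   from the batch), then publishes the remaining trailing metadata. */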
static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
1053                                  grpc_error_handle batch_error) {
1054   grpc_call* call = static_cast<grpc_call*>(args);
1055   if (batch_error != GRPC_ERROR_NONE) {
1056     set_final_status(call, batch_error);
1057   } else if (b->legacy_index()->named.grpc_status != nullptr) {
1058     grpc_status_code status_code = grpc_get_status_code_from_metadata(
1059         b->legacy_index()->named.grpc_status->md);
1060     grpc_error_handle error = GRPC_ERROR_NONE;
1061     if (status_code != GRPC_STATUS_OK) {
1062       char* peer = grpc_call_get_peer(call);
1063       error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
1064                                      "Error received from peer ", peer)),
1065                                  GRPC_ERROR_INT_GRPC_STATUS,
1066                                  static_cast<intptr_t>(status_code));
1067       gpr_free(peer);
1068     }
1069     if (b->legacy_index()->named.grpc_message != nullptr) {
1070       error = grpc_error_set_str(
1071           error, GRPC_ERROR_STR_GRPC_MESSAGE,
1072           grpc_core::StringViewFromSlice(
1073               GRPC_MDVALUE(b->legacy_index()->named.grpc_message->md)));
1074       b->Remove(GRPC_BATCH_GRPC_MESSAGE);
1075     } else if (error != GRPC_ERROR_NONE) {
1076       error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, "");
1077     }
1078     set_final_status(call, GRPC_ERROR_REF(error));
1079     b->Remove(GRPC_BATCH_GRPC_STATUS);
1080     GRPC_ERROR_UNREF(error);
1081   } else if (!call->is_client) {
1082     set_final_status(call, GRPC_ERROR_NONE);
1083   } else {
1084     gpr_log(GPR_DEBUG,
1085             "Received trailing metadata with no error and no status");
1086     set_final_status(
1087         call, grpc_error_set_int(
1088                   GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
1089                   GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
1090   }
1091   publish_app_metadata(call, b, true);
1092 }
1093 
grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1095 
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
1097   return CALL_STACK_FROM_CALL(call);
1098 }
1099 
1100 /*******************************************************************************
1101  * BATCH API IMPLEMENTATION
1102  */
1103 
static bool are_write_flags_valid(uint32_t flags) {
1105   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1106   const uint32_t allowed_write_positions =
1107       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1108   const uint32_t invalid_positions = ~allowed_write_positions;
1109   return !(flags & invalid_positions);
1110 }
1111 
static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
  /* check that only bits in GRPC_INITIAL_METADATA_USED_MASK are set */
1114   uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1115   if (!is_client) {
1116     invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1117   }
1118   return !(flags & invalid_positions);
1119 }
1120 
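/* Maps an op type to one of the MAX_CONCURRENT_BATCHES batch_control slots;
   ops that cannot be in flight simultaneously on one call (client vs. server
   variants) share a slot. */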
static size_t batch_slot_for_op(grpc_op_type type) {
1122   switch (type) {
1123     case GRPC_OP_SEND_INITIAL_METADATA:
1124       return 0;
1125     case GRPC_OP_SEND_MESSAGE:
1126       return 1;
1127     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1128     case GRPC_OP_SEND_STATUS_FROM_SERVER:
1129       return 2;
1130     case GRPC_OP_RECV_INITIAL_METADATA:
1131       return 3;
1132     case GRPC_OP_RECV_MESSAGE:
1133       return 4;
1134     case GRPC_OP_RECV_CLOSE_ON_SERVER:
1135     case GRPC_OP_RECV_STATUS_ON_CLIENT:
1136       return 5;
1137   }
1138   GPR_UNREACHABLE_CODE(return 123456789);
1139 }
1140 
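/* Returns the batch_control slot for this batch, reusing a previously
   allocated (and now idle) one when possible. Returns nullptr if a batch
   using the same slot is still in flight. */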
static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
1142                                                       const grpc_op* ops) {
1143   size_t slot_idx = batch_slot_for_op(ops[0].op);
1144   batch_control** pslot = &call->active_batches[slot_idx];
1145   batch_control* bctl;
1146   if (*pslot != nullptr) {
1147     bctl = *pslot;
1148     if (bctl->call != nullptr) {
1149       return nullptr;
1150     }
1151     bctl->~batch_control();
1152     bctl->op = {};
1153     new (&bctl->batch_error) AtomicError();
1154   } else {
1155     bctl = call->arena->New<batch_control>();
1156     *pslot = bctl;
1157   }
1158   bctl->call = call;
1159   bctl->op.payload = &call->stream_op_payload;
1160   return bctl;
1161 }
1162 
static void finish_batch_completion(void* user_data,
1164                                     grpc_cq_completion* /*storage*/) {
1165   batch_control* bctl = static_cast<batch_control*>(user_data);
1166   grpc_call* call = bctl->call;
1167   bctl->call = nullptr;
1168   GRPC_CALL_INTERNAL_UNREF(call, "completion");
1169 }
1170 
static void reset_batch_errors(batch_control* bctl) {
1172   bctl->batch_error.set(GRPC_ERROR_NONE);
1173 }
1174 
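/* Runs once all steps of a batch have finished: cleans up per-op state,
   propagates cancellation to any children that inherit it once the final op
   completes, and signals completion either by running the notify closure or
   by posting to the completion queue. */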
static void post_batch_completion(batch_control* bctl) {
1176   grpc_call* next_child_call;
1177   grpc_call* call = bctl->call;
1178   grpc_error_handle error = GRPC_ERROR_REF(bctl->batch_error.get());
1179 
1180   if (bctl->op.send_initial_metadata) {
1181     call->send_initial_metadata.Clear();
1182   }
1183   if (bctl->op.send_message) {
1184     if (bctl->op.payload->send_message.stream_write_closed &&
1185         error == GRPC_ERROR_NONE) {
1186       error = grpc_error_add_child(
1187           error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1188                      "Attempt to send message after stream was closed."));
1189     }
1190     call->sending_message = false;
1191   }
1192   if (bctl->op.send_trailing_metadata) {
1193     call->send_trailing_metadata.Clear();
1194   }
1195   if (bctl->op.recv_trailing_metadata) {
1196     /* propagate cancellation to any interested children */
1197     gpr_atm_rel_store(&call->received_final_op_atm, 1);
1198     parent_call* pc = get_parent_call(call);
1199     if (pc != nullptr) {
1200       grpc_call* child;
1201       gpr_mu_lock(&pc->child_list_mu);
1202       child = pc->first_child;
1203       if (child != nullptr) {
1204         do {
1205           next_child_call = child->child->sibling_next;
1206           if (child->cancellation_is_inherited) {
1207             GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1208             cancel_with_error(child, GRPC_ERROR_CANCELLED);
1209             GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
1210           }
1211           child = next_child_call;
1212         } while (child != pc->first_child);
1213       }
1214       gpr_mu_unlock(&pc->child_list_mu);
1215     }
1216     GRPC_ERROR_UNREF(error);
1217     error = GRPC_ERROR_NONE;
1218   }
1219   if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
1220       *call->receiving_buffer != nullptr) {
1221     grpc_byte_buffer_destroy(*call->receiving_buffer);
1222     *call->receiving_buffer = nullptr;
1223   }
1224   reset_batch_errors(bctl);
1225 
1226   if (bctl->completion_data.notify_tag.is_closure) {
1227     /* unrefs error */
1228     bctl->call = nullptr;
1229     grpc_core::Closure::Run(
1230         DEBUG_LOCATION,
1231         static_cast<grpc_closure*>(bctl->completion_data.notify_tag.tag),
1232         error);
1233     GRPC_CALL_INTERNAL_UNREF(call, "completion");
1234   } else {
1235     /* unrefs error */
1236     grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1237                    finish_batch_completion, bctl,
1238                    &bctl->completion_data.cq_completion);
1239   }
1240 }
1241 
static void finish_batch_step(batch_control* bctl) {
1243   if (GPR_UNLIKELY(bctl->completed_batch_step())) {
1244     post_batch_completion(bctl);
1245   }
1246 }
1247 
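/* Pulls slices from the incoming byte stream into the receiving buffer until
   the message is complete or the stream needs to go asynchronous (in which
   case receiving_slice_ready resumes the loop). */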
static void continue_receiving_slices(batch_control* bctl) {
1249   grpc_error_handle error;
1250   grpc_call* call = bctl->call;
1251   for (;;) {
1252     size_t remaining = call->receiving_stream->length() -
1253                        (*call->receiving_buffer)->data.raw.slice_buffer.length;
1254     if (remaining == 0) {
1255       call->receiving_message = false;
1256       call->receiving_stream.reset();
1257       finish_batch_step(bctl);
1258       return;
1259     }
1260     if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
1261       error = call->receiving_stream->Pull(&call->receiving_slice);
1262       if (error == GRPC_ERROR_NONE) {
1263         grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1264                               call->receiving_slice);
1265       } else {
1266         call->receiving_stream.reset();
1267         grpc_byte_buffer_destroy(*call->receiving_buffer);
1268         *call->receiving_buffer = nullptr;
1269         call->receiving_message = false;
1270         finish_batch_step(bctl);
1271         GRPC_ERROR_UNREF(error);
1272         return;
1273       }
1274     } else {
1275       return;
1276     }
1277   }
1278 }
1279 
receiving_slice_ready(void * bctlp,grpc_error_handle error)1280 static void receiving_slice_ready(void* bctlp, grpc_error_handle error) {
1281   batch_control* bctl = static_cast<batch_control*>(bctlp);
1282   grpc_call* call = bctl->call;
1283   bool release_error = false;
1284 
1285   if (error == GRPC_ERROR_NONE) {
1286     grpc_slice slice;
1287     error = call->receiving_stream->Pull(&slice);
1288     if (error == GRPC_ERROR_NONE) {
1289       grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1290                             slice);
1291       continue_receiving_slices(bctl);
1292     } else {
1293       /* Error returned by ByteStream::Pull() needs to be released manually */
1294       release_error = true;
1295     }
1296   }
1297 
1298   if (error != GRPC_ERROR_NONE) {
1299     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
1300       GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1301     }
1302     call->receiving_stream.reset();
1303     grpc_byte_buffer_destroy(*call->receiving_buffer);
1304     *call->receiving_buffer = nullptr;
1305     call->receiving_message = false;
1306     finish_batch_step(bctl);
1307     if (release_error) {
1308       GRPC_ERROR_UNREF(error);
1309     }
1310   }
1311 }
1312 
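// Invoked once initial metadata has been processed for a recv_message op.
// If there is no incoming stream, the message slot is cleared; otherwise the
// destination byte buffer is created (pre-tagged with the message compression
// algorithm when GRPC_WRITE_INTERNAL_COMPRESS is set) and slice reception
// begins.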
static void process_data_after_md(batch_control* bctl) {
  grpc_call* call = bctl->call;
  if (call->receiving_stream == nullptr) {
    *call->receiving_buffer = nullptr;
    call->receiving_message = false;
    finish_batch_step(bctl);
  } else {
    call->test_only_last_message_flags = call->receiving_stream->flags();
    if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
        (call->incoming_message_compression_algorithm >
         GRPC_MESSAGE_COMPRESS_NONE)) {
      grpc_compression_algorithm algo;
      GPR_ASSERT(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &algo, call->incoming_message_compression_algorithm,
              (grpc_stream_compression_algorithm)0));
      *call->receiving_buffer =
          grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
    } else {
      *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
    }
    GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
                      grpc_schedule_on_exec_ctx);
    continue_receiving_slices(bctl);
  }
}

static void receiving_stream_ready(void* bctlp, grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  if (error != GRPC_ERROR_NONE) {
    call->receiving_stream.reset();
    if (bctl->batch_error.ok()) {
      bctl->batch_error.set(error);
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  /* If recv_state is RECV_NONE, we will save the batch_control
   * object with rel_cas, and will not use it after the cas. Its corresponding
   * acq_load is in receiving_initial_metadata_ready() */
  if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
      !gpr_atm_rel_cas(&call->recv_state, RECV_NONE,
                       reinterpret_cast<gpr_atm>(bctlp))) {
    process_data_after_md(bctl);
  }
}

// The recv_message_ready callback used when sending a batch containing
// a recv_message op down the filter stack.  Yields the call combiner
// before processing the received message.
static void receiving_stream_ready_in_call_combiner(void* bctlp,
                                                    grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
  receiving_stream_ready(bctlp, error);
}

static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call* call) {
  std::string error_msg = absl::StrFormat(
      "Incoming stream has both stream compression (%d) and message "
      "compression (%d).",
      call->incoming_stream_compression_algorithm,
      call->incoming_message_compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
}

static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call* call) {
  std::string error_msg = absl::StrFormat(
      "Error in incoming message compression (%d) or stream "
      "compression (%d).",
      call->incoming_stream_compression_algorithm,
      call->incoming_message_compression_algorithm);
  cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
}

static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  std::string error_msg = absl::StrFormat(
      "Invalid compression algorithm value '%d'.", compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
}

static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  std::string error_msg =
      absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
}

static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  gpr_log(GPR_ERROR,
          "Compression algorithm ('%s') not present in the bitset of "
          "accepted encodings ('0x%x')",
          algo_name, call->encodings_accepted_by_peer);
}

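// Validates the compression configuration carried by the received initial
// metadata: a stream may not set both stream- and message-level compression,
// the pair must map to a known grpc_compression_algorithm, and that algorithm
// must be enabled by the channel's compression options. An algorithm missing
// from the accepted-encodings bitset is only logged (when compression tracing
// is enabled), not treated as an error.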
static void validate_filtered_metadata(batch_control* bctl) {
  grpc_compression_algorithm compression_algorithm;
  grpc_call* call = bctl->call;
  if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
                       GRPC_STREAM_COMPRESS_NONE &&
                   call->incoming_message_compression_algorithm !=
                       GRPC_MESSAGE_COMPRESS_NONE)) {
    handle_both_stream_and_msg_compression_set(call);
  } else if (
      GPR_UNLIKELY(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &compression_algorithm,
              call->incoming_message_compression_algorithm,
              call->incoming_stream_compression_algorithm) == 0)) {
    handle_error_parsing_compression_algorithm(call);
  } else {
    const grpc_compression_options compression_options =
        grpc_channel_compression_options(call->channel);
    if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
      handle_invalid_compression(call, compression_algorithm);
    } else if (GPR_UNLIKELY(
                   grpc_compression_options_is_algorithm_enabled_internal(
                       &compression_options, compression_algorithm) == 0)) {
      /* check if algorithm is supported by current channel config */
      handle_compression_algorithm_disabled(call, compression_algorithm);
    }
    /* GRPC_COMPRESS_NONE is always set. */
    GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
    if (GPR_UNLIKELY(!grpc_core::GetBit(call->encodings_accepted_by_peer,
                                        compression_algorithm))) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
        handle_compression_algorithm_not_accepted(call, compression_algorithm);
      }
    }
  }
}

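// Completion of the recv_initial_metadata op. call->recv_state starts at
// RECV_NONE; if receiving_stream_ready() ran first it stored its
// batch_control pointer there with a release CAS. This callback either marks
// initial metadata as received first (RECV_INITIAL_METADATA_FIRST) or picks
// up that saved pointer and re-runs receiving_stream_ready(), so the message
// is processed only after the initial metadata filters have run.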
static void receiving_initial_metadata_ready(void* bctlp,
                                             grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;

  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");

  if (error == GRPC_ERROR_NONE) {
    grpc_metadata_batch* md = &call->recv_initial_metadata;
    recv_initial_filter(call, md);

    /* TODO(ctiller): this could be moved into recv_initial_filter now */
    GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
    validate_filtered_metadata(bctl);

    absl::optional<grpc_millis> deadline =
        md->get(grpc_core::GrpcTimeoutMetadata());
    if (deadline.has_value() && !call->is_client) {
      call->send_deadline = *deadline;
    }
  } else {
    if (bctl->batch_error.ok()) {
      bctl->batch_error.set(error);
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }

  grpc_closure* saved_rsr_closure = nullptr;
  while (true) {
    gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
    /* Should only receive initial metadata once */
    GPR_ASSERT(rsr_bctlp != 1);
    if (rsr_bctlp == 0) {
      /* Neither initial metadata nor a message has been seen yet, so initial
       * metadata is being received first. no_barrier_cas suffices because this
       * function will not access the batch_control object saved by
       * receiving_stream_ready() when initial metadata arrives first. */
      if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
                                 RECV_INITIAL_METADATA_FIRST)) {
        break;
      }
    } else {
      /* Already received messages */
      saved_rsr_closure =
          GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
                              grpc_schedule_on_exec_ctx);
      /* No need to modify recv_state */
      break;
    }
  }
  if (saved_rsr_closure != nullptr) {
    grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
                            GRPC_ERROR_REF(error));
  }

  finish_batch_step(bctl);
}

static void receiving_trailing_metadata_ready(void* bctlp,
                                              grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
  grpc_metadata_batch* md = &call->recv_trailing_metadata;
  recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
  finish_batch_step(bctl);
}

static void finish_batch(void* bctlp, grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
  if (bctl->batch_error.ok()) {
    bctl->batch_error.set(error);
  }
  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  finish_batch_step(bctl);
}

static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
  gpr_free(completion);
}

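// Core of the batch API: validates the op array, translates it into a single
// grpc_transport_stream_op_batch, records how many completion steps to
// expect, and hands the batch to the filter stack via execute_batch(). On
// validation failure, any state already mutated for earlier ops in the batch
// is rolled back (see done_with_error).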
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
                                        size_t nops, void* notify_tag,
                                        int is_notify_tag_closure) {
  GPR_TIMER_SCOPE("call_start_batch", 0);

  size_t i;
  const grpc_op* op;
  batch_control* bctl;
  bool has_send_ops = false;
  int num_recv_ops = 0;
  grpc_call_error error = GRPC_CALL_OK;
  grpc_transport_stream_op_batch* stream_op;
  grpc_transport_stream_op_batch_payload* stream_op_payload;

  GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);

  if (nops == 0) {
    if (!is_notify_tag_closure) {
      GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
      grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
                     free_no_op_completion, nullptr,
                     static_cast<grpc_cq_completion*>(
                         gpr_malloc(sizeof(grpc_cq_completion))));
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION,
                              static_cast<grpc_closure*>(notify_tag),
                              GRPC_ERROR_NONE);
    }
    error = GRPC_CALL_OK;
    goto done;
  }

  bctl = reuse_or_allocate_batch_control(call, ops);
  if (bctl == nullptr) {
    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
  }
  bctl->completion_data.notify_tag.tag = notify_tag;
  bctl->completion_data.notify_tag.is_closure =
      static_cast<uint8_t>(is_notify_tag_closure != 0);

  stream_op = &bctl->op;
  stream_op_payload = &call->stream_op_payload;

  /* rewrite batch ops into a transport op */
  for (i = 0; i < nops; i++) {
    op = &ops[i];
    if (op->reserved != nullptr) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    switch (op->op) {
      case GRPC_OP_SEND_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->sent_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        // TODO(juanlishen): If the user has already specified a compression
        // algorithm by setting the initial metadata with key of
        // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
        // with the compression algorithm mapped from compression level.
        /* process compression level */
        grpc_metadata& compression_md = call->compression_md;
        compression_md.key = grpc_empty_slice();
        compression_md.value = grpc_empty_slice();
        size_t additional_metadata_count = 0;
        grpc_compression_level effective_compression_level =
            GRPC_COMPRESS_LEVEL_NONE;
        bool level_set = false;
        if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
          effective_compression_level =
              op->data.send_initial_metadata.maybe_compression_level.level;
          level_set = true;
        } else {
          const grpc_compression_options copts =
              grpc_channel_compression_options(call->channel);
          if (copts.default_level.is_set) {
            level_set = true;
            effective_compression_level = copts.default_level.level;
          }
        }
        // Currently, only server side supports compression level setting.
        if (level_set && !call->is_client) {
          const grpc_compression_algorithm calgo =
              compression_algorithm_for_level_locked(
                  call, effective_compression_level);
          // The following metadata will be checked and removed by the message
          // compression filter. It will be used as the call's compression
          // algorithm.
          compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
          compression_md.value = grpc_compression_algorithm_slice(calgo);
          additional_metadata_count++;
        }
        if (op->data.send_initial_metadata.count + additional_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_initial_metadata = true;
        call->sent_initial_metadata = true;
        if (!prepare_application_metadata(
                call, static_cast<int>(op->data.send_initial_metadata.count),
                op->data.send_initial_metadata.metadata, 0, call->is_client,
                &compression_md, static_cast<int>(additional_metadata_count))) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        /* TODO(ctiller): just make these the same variable? */
        if (call->is_client && call->send_deadline != GRPC_MILLIS_INF_FUTURE) {
          call->send_initial_metadata.Set(grpc_core::GrpcTimeoutMetadata(),
                                          call->send_deadline);
        }
        stream_op_payload->send_initial_metadata.send_initial_metadata =
            &call->send_initial_metadata;
        stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
            op->flags;
        if (call->is_client) {
          stream_op_payload->send_initial_metadata.peer_string =
              &call->peer_string;
        }
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_MESSAGE: {
        if (!are_write_flags_valid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message.send_message == nullptr) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        if (call->sending_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        uint32_t flags = op->flags;
        /* If the outgoing buffer is already compressed, record that in the
           flags so the compression filter skips further (wasteful) attempts at
           compression. */
        if (op->data.send_message.send_message->data.raw.compression >
            GRPC_COMPRESS_NONE) {
          flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = true;
        call->sending_message = true;
        call->sending_stream.Init(
            &op->data.send_message.send_message->data.raw.slice_buffer, flags);
        stream_op_payload->send_message.send_message.reset(
            call->sending_stream.get());
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->send_trailing_metadata;
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_STATUS_FROM_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        if (op->data.send_status_from_server.trailing_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        GPR_ASSERT(call->send_extra_metadata_count == 0);
        call->send_extra_metadata_count = 1;
        call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
            op->data.send_status_from_server.status);
        grpc_error_handle status_error =
            op->data.send_status_from_server.status == GRPC_STATUS_OK
                ? GRPC_ERROR_NONE
                : grpc_error_set_int(
                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                          "Server returned error"),
                      GRPC_ERROR_INT_GRPC_STATUS,
                      static_cast<intptr_t>(
                          op->data.send_status_from_server.status));
        if (op->data.send_status_from_server.status_details != nullptr) {
          call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
              GRPC_MDSTR_GRPC_MESSAGE,
              grpc_slice_copy(
                  *op->data.send_status_from_server.status_details));
          call->send_extra_metadata_count++;
          if (status_error != GRPC_ERROR_NONE) {
            char* msg = grpc_slice_to_c_string(
                GRPC_MDVALUE(call->send_extra_metadata[1].md));
            status_error = grpc_error_set_str(status_error,
                                              GRPC_ERROR_STR_GRPC_MESSAGE, msg);
            gpr_free(msg);
          }
        }

        call->status_error.set(status_error);
        GRPC_ERROR_UNREF(status_error);

        if (!prepare_application_metadata(
                call,
                static_cast<int>(
                    op->data.send_status_from_server.trailing_metadata_count),
                op->data.send_status_from_server.trailing_metadata, 1, 1,
                nullptr, 0)) {
          for (int n = 0; n < call->send_extra_metadata_count; n++) {
            GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
          }
          call->send_extra_metadata_count = 0;
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->send_trailing_metadata;
        stream_op_payload->send_trailing_metadata.sent =
            &call->sent_server_trailing_metadata;
        has_send_ops = true;
        break;
      }
      case GRPC_OP_RECV_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->received_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->received_initial_metadata = true;
        call->buffered_metadata[0] =
            op->data.recv_initial_metadata.recv_initial_metadata;
        GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
                          receiving_initial_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op->recv_initial_metadata = true;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata =
            &call->recv_initial_metadata;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
            &call->receiving_initial_metadata_ready;
        if (call->is_client) {
          stream_op_payload->recv_initial_metadata.trailing_metadata_available =
              &call->is_trailers_only;
        } else {
          stream_op_payload->recv_initial_metadata.peer_string =
              &call->peer_string;
        }
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_MESSAGE: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->receiving_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->receiving_message = true;
        stream_op->recv_message = true;
        call->receiving_buffer = op->data.recv_message.recv_message;
        stream_op_payload->recv_message.recv_message = &call->receiving_stream;
        stream_op_payload->recv_message.call_failed_before_recv_message =
            &call->call_failed_before_recv_message;
        GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
                          receiving_stream_ready_in_call_combiner, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_message.recv_message_ready =
            &call->receiving_stream_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_STATUS_ON_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->buffered_metadata[1] =
            op->data.recv_status_on_client.trailing_metadata;
        call->final_op.client.status = op->data.recv_status_on_client.status;
        call->final_op.client.status_details =
            op->data.recv_status_on_client.status_details;
        call->final_op.client.error_string =
            op->data.recv_status_on_client.error_string;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->recv_trailing_metadata;
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_CLOSE_ON_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->final_op.server.cancelled =
            op->data.recv_close_on_server.cancelled;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->recv_trailing_metadata;
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
    }
  }

  GRPC_CALL_INTERNAL_REF(call, "completion");
  if (!is_notify_tag_closure) {
    GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
  }
  bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);

  if (has_send_ops) {
    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
                      grpc_schedule_on_exec_ctx);
    stream_op->on_complete = &bctl->finish_batch;
  }

  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
  execute_batch(call, stream_op, &bctl->start_batch);

done:
  return error;

done_with_error:
  /* reverse any mutations that occurred */
  if (stream_op->send_initial_metadata) {
    call->sent_initial_metadata = false;
    call->send_initial_metadata.Clear();
  }
  if (stream_op->send_message) {
    call->sending_message = false;
    call->sending_stream->Orphan();
    stream_op_payload->send_message.send_message.reset();
  }
  if (stream_op->send_trailing_metadata) {
    call->sent_final_op = false;
    call->send_trailing_metadata.Clear();
  }
  if (stream_op->recv_initial_metadata) {
    call->received_initial_metadata = false;
  }
  if (stream_op->recv_message) {
    call->receiving_message = false;
  }
  if (stream_op->recv_trailing_metadata) {
    call->requested_final_op = false;
  }
  goto done;
}

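// Illustrative client-side usage of the batch API (a sketch for orientation,
// not code from this file; error handling and metadata population omitted):
//
//   grpc_op ops[6];
//   memset(ops, 0, sizeof(ops));
//   ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
//   ops[1].op = GRPC_OP_SEND_MESSAGE;
//   ops[1].data.send_message.send_message = request_payload;
//   ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
//   ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
//   ops[3].data.recv_initial_metadata.recv_initial_metadata = &initial_md;
//   ops[4].op = GRPC_OP_RECV_MESSAGE;
//   ops[4].data.recv_message.recv_message = &response_payload;
//   ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
//   ops[5].data.recv_status_on_client.trailing_metadata = &trailing_md;
//   ops[5].data.recv_status_on_client.status = &status;
//   ops[5].data.recv_status_on_client.status_details = &details;
//   GPR_ASSERT(grpc_call_start_batch(call, ops, 6, tag, nullptr) ==
//              GRPC_CALL_OK);
//   /* The whole batch surfaces as a single event carrying `tag` on the
//      call's completion queue once all six ops have finished. */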
grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
                                      size_t nops, void* tag, void* reserved) {
  grpc_call_error err;

  GRPC_API_TRACE(
      "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
      "reserved=%p)",
      5, (call, ops, (unsigned long)nops, tag, reserved));

  if (reserved != nullptr) {
    err = GRPC_CALL_ERROR;
  } else {
    grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
    grpc_core::ExecCtx exec_ctx;
    err = call_start_batch(call, ops, nops, tag, 0);
  }

  return err;
}

grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
                                                  const grpc_op* ops,
                                                  size_t nops,
                                                  grpc_closure* closure) {
  return call_start_batch(call, ops, nops, closure, 1);
}

void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
                           void* value, void (*destroy)(void* value)) {
  if (call->context[elem].destroy) {
    call->context[elem].destroy(call->context[elem].value);
  }
  call->context[elem].value = value;
  call->context[elem].destroy = destroy;
}

void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
  return call->context[elem].value;
}

uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }

grpc_compression_algorithm grpc_call_compression_for_level(
    grpc_call* call, grpc_compression_level level) {
  grpc_compression_algorithm algo =
      compression_algorithm_for_level_locked(call, level);
  return algo;
}

bool grpc_call_is_trailers_only(const grpc_call* call) {
  bool result = call->is_trailers_only;
  GPR_DEBUG_ASSERT(!result || call->recv_initial_metadata.empty());
  return result;
}

int grpc_call_failed_before_recv_message(const grpc_call* c) {
  return c->call_failed_before_recv_message;
}

const char* grpc_call_error_to_string(grpc_call_error error) {
  switch (error) {
    case GRPC_CALL_ERROR:
      return "GRPC_CALL_ERROR";
    case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
      return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
    case GRPC_CALL_ERROR_ALREADY_FINISHED:
      return "GRPC_CALL_ERROR_ALREADY_FINISHED";
    case GRPC_CALL_ERROR_ALREADY_INVOKED:
      return "GRPC_CALL_ERROR_ALREADY_INVOKED";
    case GRPC_CALL_ERROR_BATCH_TOO_BIG:
      return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
    case GRPC_CALL_ERROR_INVALID_FLAGS:
      return "GRPC_CALL_ERROR_INVALID_FLAGS";
    case GRPC_CALL_ERROR_INVALID_MESSAGE:
      return "GRPC_CALL_ERROR_INVALID_MESSAGE";
    case GRPC_CALL_ERROR_INVALID_METADATA:
      return "GRPC_CALL_ERROR_INVALID_METADATA";
    case GRPC_CALL_ERROR_NOT_INVOKED:
      return "GRPC_CALL_ERROR_NOT_INVOKED";
    case GRPC_CALL_ERROR_NOT_ON_CLIENT:
      return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
    case GRPC_CALL_ERROR_NOT_ON_SERVER:
      return "GRPC_CALL_ERROR_NOT_ON_SERVER";
    case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
      return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
    case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
      return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
    case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
      return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
    case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
      return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
    case GRPC_CALL_OK:
      return "GRPC_CALL_OK";
  }
  GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOWN");
}