1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <string>
28
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/compression.h>
33 #include <grpc/grpc.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/compression/algorithm_metadata.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/alloc.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gpr/time_precise.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/arena.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/iomgr/timer.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/slice/slice_utils.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/call_test_only.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/server.h"
59 #include "src/core/lib/surface/validate_metadata.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/static_metadata.h"
63 #include "src/core/lib/transport/status_metadata.h"
64 #include "src/core/lib/transport/transport.h"
65
66 /** The maximum number of concurrent batches possible.
67 Based upon the maximum number of individually queueable ops in the batch
68 api:
69 - initial metadata send
70 - message send
71 - status/close send (depending on client/server)
72 - initial metadata recv
73 - message recv
74 - status/close recv (depending on client/server) */
75 #define MAX_CONCURRENT_BATCHES 6
76
77 #define MAX_SEND_EXTRA_METADATA_COUNT 3
78
79 // Used to create arena for the first call.
80 #define ESTIMATED_MDELEM_COUNT 16
81
// Tracks one in-flight grpc_call_start_batch (or internal) batch: the op
// being executed, how its completion is signalled, and how many sub-steps
// remain before the batch as a whole is finished.
struct batch_control {
  batch_control() = default;

  grpc_call* call = nullptr;
  grpc_transport_stream_op_batch op;
  /* Share memory for cq_completion and notify_tag as they are never needed
     simultaneously. Each byte used in this data structure count as six bytes
     per call, so any savings we can make are worthwhile,

     We use notify_tag to determine whether or not to send notification to the
     completion queue. Once we've made that determination, we can reuse the
     memory for cq_completion. */
  union {
    grpc_cq_completion cq_completion;
    struct {
      /* Any given op indicates completion by either (a) calling a closure or
         (b) sending a notification on the call's completion queue. If
         \a is_closure is true, \a tag indicates a closure to be invoked;
         otherwise, \a tag indicates the tag to be used in the notification to
         be sent to the completion queue. */
      void* tag;
      bool is_closure;
    } notify_tag;
  } completion_data;
  grpc_closure start_batch;
  grpc_closure finish_batch;
  // Count of outstanding sub-operations; the batch completes when it drains
  // to zero.
  grpc_core::Atomic<intptr_t> steps_to_complete;
  // Error reported for the batch, stored atomically as a grpc_error*.
  gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
  // Sets how many sub-operations must finish before the batch is complete.
  void set_num_steps_to_complete(uintptr_t steps) {
    steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
  }
  // Marks one sub-operation finished; returns true iff this was the last
  // outstanding step (i.e. the whole batch is now done).
  bool completed_batch_step() {
    return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
  }
};
117
// Per-call bookkeeping created lazily (see get_or_create_parent_call) on the
// first call that registers a child; tracks the list of child calls.
struct parent_call {
  parent_call() { gpr_mu_init(&child_list_mu); }
  ~parent_call() { gpr_mu_destroy(&child_list_mu); }

  // Guards first_child and the sibling links of every child call.
  gpr_mu child_list_mu;
  grpc_call* first_child = nullptr;
};
125
126 struct child_call {
child_callchild_call127 child_call(grpc_call* parent) : parent(parent) {}
128 grpc_call* parent;
129 /** siblings: children of the same parent form a list, and this list is
130 protected under
131 parent->mu */
132 grpc_call* sibling_next = nullptr;
133 grpc_call* sibling_prev = nullptr;
134 };
135
136 #define RECV_NONE ((gpr_atm)0)
137 #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
138
// The core call object. Allocated at the head of a single arena-backed
// allocation that also holds the filter call stack (see
// CALL_STACK_FROM_CALL) and, for child calls, a child_call record.
struct grpc_call {
  grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
      : arena(arena),
        cq(args.cq),
        channel(args.channel),
        is_client(args.server_transport_data == nullptr),
        stream_op_payload(context) {
    // No deadline until one is explicitly set.
    for (int i = 0; i < 2; i++) {
      for (int j = 0; j < 2; j++) {
        metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
      }
    }
  }

  ~grpc_call() {
    // error_string is heap-allocated by grpc_error_get_status (destroy_call).
    gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
  }

  // Application-facing refcount (grpc_call_ref / grpc_call_unref).
  grpc_core::RefCount ext_ref;
  // Arena holding this object, its call stack, and per-call allocations.
  grpc_core::Arena* arena;
  grpc_core::CallCombiner call_combiner;
  grpc_completion_queue* cq;
  grpc_polling_entity pollent;
  grpc_channel* channel;
  // Creation timestamp, used for latency reporting in destroy_call().
  gpr_cycle_counter start_time = gpr_get_cycle_counter();
  /* parent_call* */ gpr_atm parent_call_atm = 0;
  child_call* child = nullptr;

  /* client or server call */
  bool is_client;
  /** has grpc_call_unref been called */
  bool destroy_called = false;
  /** flag indicating that cancellation is inherited */
  bool cancellation_is_inherited = false;
  /** which ops are in-flight */
  bool sent_initial_metadata = false;
  bool sending_message = false;
  bool sent_final_op = false;
  bool received_initial_metadata = false;
  bool receiving_message = false;
  bool requested_final_op = false;
  gpr_atm any_ops_sent_atm = 0;
  gpr_atm received_final_op_atm = 0;

  batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
  grpc_transport_stream_op_batch_payload stream_op_payload;

  /* first idx: is_receiving, second idx: is_trailing */
  grpc_metadata_batch metadata_batch[2][2] = {};

  /* Buffered read metadata waiting to be returned to the application.
     Element 0 is initial metadata, element 1 is trailing metadata. */
  grpc_metadata_array* buffered_metadata[2] = {};

  grpc_metadata compression_md;

  // A char* indicating the peer name.
  gpr_atm peer_string = 0;

  /* Call data useful used for reporting. Only valid after the call has
   * completed */
  grpc_call_final_info final_info;

  /* Compression algorithm for *incoming* data */
  grpc_message_compression_algorithm incoming_message_compression_algorithm =
      GRPC_MESSAGE_COMPRESS_NONE;
  /* Stream compression algorithm for *incoming* data */
  grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
      GRPC_STREAM_COMPRESS_NONE;
  /* Supported encodings (compression algorithms), a bitset.
   * Always support no compression. */
  uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
  /* Supported stream encodings (stream compression algorithms), a bitset */
  uint32_t stream_encodings_accepted_by_peer = 0;

  /* Contexts for various subsystems (security, tracing, ...). */
  grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};

  /* for the client, extra metadata is initial metadata; for the
     server, it's trailing metadata */
  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
  int send_extra_metadata_count;
  grpc_millis send_deadline;

  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

  grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
  grpc_byte_buffer** receiving_buffer = nullptr;
  grpc_slice receiving_slice = grpc_empty_slice();
  grpc_closure receiving_slice_ready;
  grpc_closure receiving_stream_ready;
  grpc_closure receiving_initial_metadata_ready;
  grpc_closure receiving_trailing_metadata_ready;
  uint32_t test_only_last_message_flags = 0;
  // Status about operation of call
  bool sent_server_trailing_metadata = false;
  // One-shot flag guarding cancel_with_error (0 -> 1 via CAS).
  gpr_atm cancelled_with_error = 0;

  // Closure used by destroy_call() to run release_call() after the call
  // stack has been destroyed.
  grpc_closure release_call;

  // Out-parameters captured when the final-status batch is started; written
  // by set_final_status() when the call finishes.
  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      grpc_core::Server* core_server;
    } server;
  } final_op;
  // Final grpc_error*, stored atomically; unreffed in destroy_call().
  gpr_atm status_error = 0;

  /* recv_state can contain one of the following values:
     RECV_NONE :                 : no initial metadata and messages received
     RECV_INITIAL_METADATA_FIRST : received initial metadata first
     a batch_control*            : received messages first

                 +------1------RECV_NONE------3-----+
                 |                                  |
                 |                                  |
                 v                                  v
     RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
           |           ^                      |           ^
           |           |                      |           |
           +-----2-----+                      +-----4-----+

    For 1, 4: See receiving_initial_metadata_ready() function
    For 2, 3: See receiving_stream_ready() function */
  gpr_atm recv_state = 0;
};
271
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");

/* A call and its filter stack live back-to-back in one allocation:
   [grpc_call][padding to alignment][call stack]. These macros convert
   between the two views by pointer arithmetic, avoiding an extra stored
   pointer per call. */
#define CALL_STACK_FROM_CALL(call)   \
  (grpc_call_stack*)((char*)(call) + \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_FROM_CALL_STACK(call_stack) \
  (grpc_call*)(((char*)(call_stack)) -   \
               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))

#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
286
/* Forward declarations for translation-unit-local helpers defined later in
   this file. */
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
                          grpc_closure* start_batch_closure);

static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
298
add_init_error(grpc_error ** composite,grpc_error * new_err)299 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
300 if (new_err == GRPC_ERROR_NONE) return;
301 if (*composite == GRPC_ERROR_NONE) {
302 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
303 }
304 *composite = grpc_error_add_child(*composite, new_err);
305 }
306
grpc_call_arena_alloc(grpc_call * call,size_t size)307 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
308 return call->arena->Alloc(size);
309 }
310
// Lazily creates the parent_call tracker for \a call. Thread-safe: if two
// threads race, the CAS loser destroys its freshly built instance (the
// memory itself stays in the arena, which is fine) and adopts the winner's.
static parent_call* get_or_create_parent_call(grpc_call* call) {
  parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
  if (p == nullptr) {
    p = call->arena->New<parent_call>();
    if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
                         (gpr_atm)p)) {
      // Lost the race: run only the destructor (arena-allocated, no free)
      // and re-load the instance the winning thread published.
      p->~parent_call();
      p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
    }
  }
  return p;
}
323
get_parent_call(grpc_call * call)324 static parent_call* get_parent_call(grpc_call* call) {
325 return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
326 }
327
grpc_call_get_initial_size_estimate()328 size_t grpc_call_get_initial_size_estimate() {
329 return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
330 sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
331 }
332
// Creates a new call (client or server side, depending on
// args->server_transport_data) inside a fresh arena, initializes its filter
// stack, and links it to its parent call if one was given. Returns any
// aggregated initialization error; the call is created (and *out_call set)
// even on error, but is cancelled with that error before returning.
grpc_error* grpc_call_create(const grpc_call_create_args* args,
                             grpc_call** out_call) {
  GPR_TIMER_SCOPE("grpc_call_create", 0);

  // Matched by GRPC_CHANNEL_INTERNAL_UNREF in release_call().
  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");

  grpc_core::Arena* arena;
  grpc_call* call;
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_channel_stack* channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
  GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
  // One contiguous allocation holds: the grpc_call, padding to alignment,
  // the call stack, and (for child calls) a trailing child_call record.
  size_t call_and_stack_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
      channel_stack->call_stack_size;
  size_t call_alloc_size =
      call_and_stack_size + (args->parent ? sizeof(child_call) : 0);

  std::pair<grpc_core::Arena*, void*> arena_with_call =
      grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
  arena = arena_with_call.first;
  call = new (arena_with_call.second) grpc_call(arena, *args);
  *out_call = call;
  grpc_slice path = grpc_empty_slice();
  if (call->is_client) {
    call->final_op.client.status_details = nullptr;
    call->final_op.client.status = nullptr;
    call->final_op.client.error_string = nullptr;
    GRPC_STATS_INC_CLIENT_CALLS_CREATED();
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      // Remember the :path value so it can be passed to the call stack below.
      if (grpc_slice_eq_static_interned(
              GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
        path = grpc_slice_ref_internal(
            GRPC_MDVALUE(args->add_initial_metadata[i]));
      }
    }
    call->send_extra_metadata_count =
        static_cast<int>(args->add_initial_metadata_count);
  } else {
    GRPC_STATS_INC_SERVER_CALLS_CREATED();
    call->final_op.server.cancelled = nullptr;
    call->final_op.server.core_server = args->server;
    // Servers never provide initial metadata at creation time.
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }

  grpc_millis send_deadline = args->send_deadline;
  bool immediately_cancel = false;

  if (args->parent != nullptr) {
    // The child_call record lives directly after the call stack in the
    // allocation sized above.
    call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
                       call_and_stack_size) child_call(args->parent);

    // Dropped in grpc_call_unref() when the child is released.
    GRPC_CALL_INTERNAL_REF(args->parent, "child");
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent->is_client);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
        add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Census tracing propagation requested "
                                   "without Census context propagation"));
      }
      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
                            args->parent->context[GRPC_CONTEXT_TRACING].value,
                            nullptr);
    } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
      add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                 "Census context propagation requested "
                                 "without Census tracing propagation"));
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = true;
      // If the parent already finished, the child must start out cancelled.
      if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
        immediately_cancel = true;
      }
    }
  }
  call->send_deadline = send_deadline;
  /* initial refcount dropped by grpc_call_unref */
  grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
                                      args->server_transport_data,
                                      call->context,
                                      path,
                                      call->start_time,
                                      send_deadline,
                                      call->arena,
                                      &call->call_combiner};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (args->parent != nullptr) {
    child_call* cc = call->child;
    parent_call* pc = get_or_create_parent_call(args->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (pc->first_child == nullptr) {
      // First child: the circular sibling list is just this call.
      pc->first_child = call;
      cc->sibling_next = cc->sibling_prev = call;
    } else {
      // Insert at the tail of the circular doubly-linked sibling list.
      cc->sibling_next = pc->first_child;
      cc->sibling_prev = pc->first_child->child->sibling_prev;
      cc->sibling_next->child->sibling_prev =
          cc->sibling_prev->child->sibling_next = call;
    }
    gpr_mu_unlock(&pc->child_list_mu);
  }

  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  if (immediately_cancel) {
    cancel_with_error(call, GRPC_ERROR_CANCELLED);
  }
  if (args->cq != nullptr) {
    GPR_ASSERT(args->pollset_set_alternative == nullptr &&
               "Only one of 'cq' and 'pollset_set_alternative' should be "
               "non-nullptr.");
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                               &call->pollent);
  }

  // channelz call-started accounting for whichever side we are.
  if (call->is_client) {
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else if (call->final_op.server.core_server != nullptr) {
    grpc_core::channelz::ServerNode* channelz_node =
        call->final_op.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      channelz_node->RecordCallStarted();
    }
  }

  grpc_slice_unref_internal(path);

  return error;
}
492
// Late-binds a completion queue to a call created without one. Aborts if a
// pollset_set alternative was already registered, since the two polling
// mechanisms are mutually exclusive.
void grpc_call_set_completion_queue(grpc_call* call,
                                    grpc_completion_queue* cq) {
  GPR_ASSERT(cq);

  if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
    gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
    abort();
  }
  call->cq = cq;
  // Dropped in destroy_call() when the call is torn down.
  GRPC_CQ_INTERNAL_REF(cq, "bind");
  call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
  grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                             &call->pollent);
}
507
/* In debug builds a human-readable reason is threaded through to the call
   stack refcount for ref tracing; release builds compile it away. */
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
// Internal refs are delegated to the contiguously-allocated call stack's
// refcount (see CALL_STACK_FROM_CALL).
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
void grpc_call_internal_unref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
521
// Final teardown step, run after the call stack has been destroyed: invokes
// the grpc_call destructor, destroys the arena (feeding its observed size
// back into the channel's per-call size estimator), and drops the channel
// ref taken in grpc_call_create().
static void release_call(void* call, grpc_error* /*error*/) {
  grpc_call* c = static_cast<grpc_call*>(call);
  // Copy out everything needed after ~grpc_call(): `c` itself lives inside
  // the arena we are about to destroy.
  grpc_channel* channel = c->channel;
  grpc_core::Arena* arena = c->arena;
  c->~grpc_call();
  grpc_channel_update_call_size_estimate(channel, arena->Destroy());
  GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
530
// Destructor callback registered with grpc_call_stack_init(): releases
// per-call state, derives the final reported status, and kicks off call
// stack destruction. Memory is freed afterwards in release_call().
static void destroy_call(void* call, grpc_error* /*error*/) {
  GPR_TIMER_SCOPE("destroy_call", 0);
  size_t i;
  int ii;
  grpc_call* c = static_cast<grpc_call*>(call);
  // NOTE(review): only the receiving-side metadata batches are destroyed
  // here; the sending-side batches are presumably torn down as their ops
  // complete -- confirm against the batch completion path.
  for (i = 0; i < 2; i++) {
    grpc_metadata_batch_destroy(
        &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
  }
  c->receiving_stream.reset();
  parent_call* pc = get_parent_call(c);
  if (pc != nullptr) {
    // Arena-allocated: destructor only, no free.
    pc->~parent_call();
  }
  for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
    GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
  }
  for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (c->context[i].destroy) {
      c->context[i].destroy(c->context[i].value);
    }
  }
  if (c->cq) {
    GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
  }

  // Convert the stored status error into the final reported status string
  // (the allocated error_string is freed in ~grpc_call()).
  grpc_error* status_error =
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
  grpc_error_get_status(status_error, c->send_deadline,
                        &c->final_info.final_status, nullptr, nullptr,
                        &(c->final_info.error_string));
  GRPC_ERROR_UNREF(status_error);
  c->final_info.stats.latency =
      gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
  // release_call() runs once the stack destruction finishes.
  grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
                          GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
                                            grpc_schedule_on_exec_ctx));
}
569
grpc_call_ref(grpc_call * c)570 void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
571
// Public API: drops an application-level ref. The last unref detaches the
// call from its parent, cancels it if it is still in flight, and releases
// the initial internal ref taken at creation.
void grpc_call_unref(grpc_call* c) {
  // Fast path: not the last external ref.
  if (GPR_LIKELY(!c->ext_ref.Unref())) return;

  GPR_TIMER_SCOPE("grpc_call_unref", 0);

  child_call* cc = c->child;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));

  if (cc) {
    // Unlink this call from its parent's circular sibling list.
    parent_call* pc = get_parent_call(cc->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (c == pc->first_child) {
      pc->first_child = cc->sibling_next;
      if (c == pc->first_child) {
        // This was the only child.
        pc->first_child = nullptr;
      }
    }
    cc->sibling_prev->child->sibling_next = cc->sibling_next;
    cc->sibling_next->child->sibling_prev = cc->sibling_prev;
    gpr_mu_unlock(&pc->child_list_mu);
    // Drops the "child" ref taken on the parent in grpc_call_create().
    GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
  }

  GPR_ASSERT(!c->destroy_called);
  c->destroy_called = true;
  // If ops were started but the final op never completed, the call is still
  // live on the wire and must be cancelled before we let go.
  bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
                gpr_atm_acq_load(&c->received_final_op_atm) == 0;
  if (cancel) {
    cancel_with_error(c, GRPC_ERROR_CANCELLED);
  } else {
    // Unset the call combiner cancellation closure. This has the
    // effect of scheduling the previously set cancellation closure, if
    // any, so that it can release any internal references it may be
    // holding to the call stack. Also flush the closures on exec_ctx so that
    // filters that schedule cancel notification closures on exec_ctx do not
    // need to take a ref of the call stack to guarantee closure liveness.
    c->call_combiner.SetNotifyOnCancel(nullptr);
    grpc_core::ExecCtx::Get()->Flush();
  }
  // Drops the initial internal ref from grpc_call_create().
  GRPC_CALL_INTERNAL_UNREF(c, "destroy");
}
616
/* Public API: requests cancellation of a call. Always returns GRPC_CALL_OK;
   the cancellation itself proceeds asynchronously via the filter stack. */
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
  GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
  GPR_ASSERT(!reserved);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
  return GRPC_CALL_OK;
}
625
626 // This is called via the call combiner to start sending a batch down
627 // the filter stack.
execute_batch_in_call_combiner(void * arg,grpc_error *)628 static void execute_batch_in_call_combiner(void* arg, grpc_error* /*ignored*/) {
629 GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
630 grpc_transport_stream_op_batch* batch =
631 static_cast<grpc_transport_stream_op_batch*>(arg);
632 grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
633 grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
634 GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
635 elem->filter->start_transport_stream_op_batch(elem, batch);
636 }
637
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure) {
  // Stash the call so execute_batch_in_call_combiner() can recover it.
  batch->handler_private.extra_arg = call;
  GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  // The batch actually starts once the call combiner grants exclusivity.
  GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
                           GRPC_ERROR_NONE, "executing batch");
}
649
grpc_call_get_peer(grpc_call * call)650 char* grpc_call_get_peer(grpc_call* call) {
651 char* peer_string = (char*)gpr_atm_acq_load(&call->peer_string);
652 if (peer_string != nullptr) return gpr_strdup(peer_string);
653 peer_string = grpc_channel_get_target(call->channel);
654 if (peer_string != nullptr) return peer_string;
655 return gpr_strdup("unknown");
656 }
657
grpc_call_from_top_element(grpc_call_element * elem)658 grpc_call* grpc_call_from_top_element(grpc_call_element* elem) {
659 return CALL_FROM_TOP_ELEM(elem);
660 }
661
662 /*******************************************************************************
663 * CANCELLATION
664 */
665
/* Public API: cancels a call with an application-supplied status code and
   description. The description is copied, so it may be short-lived. Always
   returns GRPC_CALL_OK; cancellation proceeds asynchronously. */
grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
                                             grpc_status_code status,
                                             const char* description,
                                             void* reserved) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE(
      "grpc_call_cancel_with_status("
      "c=%p, status=%d, description=%s, reserved=%p)",
      4, (c, (int)status, description, reserved));
  GPR_ASSERT(reserved == nullptr);
  cancel_with_status(c, status, description);
  return GRPC_CALL_OK;
}
680
// Heap-allocated state threaded through an asynchronous cancel_stream
// batch; freed in done_termination() once the batch completes.
struct cancel_state {
  grpc_call* call;
  grpc_closure start_batch;   // used to enter the call combiner
  grpc_closure finish_batch;  // runs done_termination on completion
};
// The on_complete callback used when sending a cancel_stream batch down
// the filter stack. Yields the call combiner when the batch is done.
static void done_termination(void* arg, grpc_error* /*error*/) {
  cancel_state* state = static_cast<cancel_state*>(arg);
  GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
                          "on_complete for cancel_stream op");
  // Drops the "termination" ref taken in cancel_with_error().
  GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
  gpr_free(state);
}
695
// Cancels the call by sending a cancel_stream batch down the stack. Takes
// ownership of \a error. Idempotent: only the first invocation acts;
// later calls just drop their error.
static void cancel_with_error(grpc_call* c, grpc_error* error) {
  if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Keeps the call alive until done_termination() runs.
  GRPC_CALL_INTERNAL_REF(c, "termination");
  // Inform the call combiner of the cancellation, so that it can cancel
  // any in-flight asynchronous actions that may be holding the call
  // combiner. This ensures that the cancel_stream batch can be sent
  // down the filter stack in a timely manner.
  c->call_combiner.Cancel(GRPC_ERROR_REF(error));
  cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
  state->call = c;
  GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_stream_op_batch* op =
      grpc_make_transport_stream_op(&state->finish_batch);
  op->cancel_stream = true;
  // The batch takes over the remaining ref on \a error.
  op->payload->cancel_stream.cancel_error = error;
  execute_batch(c, op, &state->start_batch);
}
717
grpc_call_cancel_internal(grpc_call * call)718 void grpc_call_cancel_internal(grpc_call* call) {
719 cancel_with_error(call, GRPC_ERROR_CANCELLED);
720 }
721
error_from_status(grpc_status_code status,const char * description)722 static grpc_error* error_from_status(grpc_status_code status,
723 const char* description) {
724 // copying 'description' is needed to ensure the grpc_call_cancel_with_status
725 // guarantee that can be short-lived.
726 return grpc_error_set_int(
727 grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
728 GRPC_ERROR_STR_GRPC_MESSAGE,
729 grpc_slice_from_copied_string(description)),
730 GRPC_ERROR_INT_GRPC_STATUS, status);
731 }
732
cancel_with_status(grpc_call * c,grpc_status_code status,const char * description)733 static void cancel_with_status(grpc_call* c, grpc_status_code status,
734 const char* description) {
735 cancel_with_error(c, error_from_status(status, description));
736 }
737
// Publishes the final call status into the application-owned out-params
// captured in final_op when the final batch was started, and records the
// outcome with channelz. Takes ownership of \a error: on the client it is
// stored in status_error (unreffed later in destroy_call); on the server it
// is unreffed here.
static void set_final_status(grpc_call* call, grpc_error* error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
    gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
    gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
  }
  if (call->is_client) {
    grpc_error_get_status(error, call->send_deadline,
                          call->final_op.client.status,
                          call->final_op.client.status_details, nullptr,
                          call->final_op.client.error_string);
    // explicitly take a ref
    grpc_slice_ref_internal(*call->final_op.client.status_details);
    // Ownership of `error` transfers to status_error here.
    gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      if (*call->final_op.client.status != GRPC_STATUS_OK) {
        channelz_channel->RecordCallFailed();
      } else {
        channelz_channel->RecordCallSucceeded();
      }
    }
  } else {
    // Server side: the call counts as cancelled unless it finished cleanly
    // after sending its trailing metadata.
    *call->final_op.server.cancelled =
        error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
    grpc_core::channelz::ServerNode* channelz_node =
        call->final_op.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      if (*call->final_op.server.cancelled ||
          reinterpret_cast<grpc_error*>(
              gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
        channelz_node->RecordCallFailed();
      } else {
        channelz_node->RecordCallSucceeded();
      }
    }
    GRPC_ERROR_UNREF(error);
  }
}
777
778 /*******************************************************************************
779 * COMPRESSION
780 */
781
// Records the message-compression algorithm the peer announced for incoming
// data; asserts the value is within the known-algorithm range.
static void set_incoming_message_compression_algorithm(
    grpc_call* call, grpc_message_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_message_compression_algorithm = algo;
}
787
// Records the stream-compression algorithm the peer announced for incoming
// data; asserts the value is within the known-algorithm range.
static void set_incoming_stream_compression_algorithm(
    grpc_call* call, grpc_stream_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_stream_compression_algorithm = algo;
}
793
// Test-only accessor: folds the incoming message- and stream-compression
// algorithms back into a single public grpc_compression_algorithm value.
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
    grpc_call* call) {
  grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
  grpc_compression_algorithm_from_message_stream_compression_algorithm(
      &algorithm, call->incoming_message_compression_algorithm,
      call->incoming_stream_compression_algorithm);
  return algorithm;
}
802
// Picks the compression algorithm for the requested level among those the
// peer advertised support for. NOTE(review): the "_locked" suffix suggests
// the caller must hold the call exclusively (e.g. via the call combiner) so
// encodings_accepted_by_peer is stable -- confirm at call sites.
static grpc_compression_algorithm compression_algorithm_for_level_locked(
    grpc_call* call, grpc_compression_level level) {
  return grpc_compression_algorithm_for_level(level,
                                              call->encodings_accepted_by_peer);
}
808
grpc_call_test_only_get_message_flags(grpc_call * call)809 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
810 uint32_t flags;
811 flags = call->test_only_last_message_flags;
812 return flags;
813 }
814
/* Destructor for the user-data cached on the accept-encoding mdelem by
   set_encodings_accepted_by_peer. The "pointer" is really an integer encoded
   in the pointer value, so there is nothing to free. */
static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
816
/* Parse the peer's accept-encoding metadata value (a comma-separated list of
   algorithm names) into a bitset of accepted algorithms.

   stream_encoding selects the parser: stream-compression names vs.
   message-compression names. Unknown entries are logged at DEBUG level and
   skipped; the "none" algorithm is always marked accepted.

   The result is cached on the mdelem as user data (stored as value + 1 so a
   cached bitset of 0 is distinguishable from "no user data yet"), so repeated
   arrivals of the same interned metadata value parse only once. */
static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
                                           grpc_mdelem mdel,
                                           uint32_t* encodings_accepted_by_peer,
                                           bool stream_encoding) {
  size_t i;
  uint32_t algorithm;
  grpc_slice_buffer accept_encoding_parts;
  grpc_slice accept_encoding_slice;
  void* accepted_user_data;

  accepted_user_data =
      grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
  if (accepted_user_data != nullptr) {
    /* Cache hit: undo the +1 bias applied when the value was stored. */
    *encodings_accepted_by_peer =
        static_cast<uint32_t>(((uintptr_t)accepted_user_data) - 1);
    return;
  }

  *encodings_accepted_by_peer = 0;

  accept_encoding_slice = GRPC_MDVALUE(mdel);
  grpc_slice_buffer_init(&accept_encoding_parts);
  grpc_slice_split_without_space(accept_encoding_slice, ",",
                                 &accept_encoding_parts);

  /* "identity" (no compression) is always supported. */
  GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
  for (i = 0; i < accept_encoding_parts.count; i++) {
    int r;
    grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
    if (!stream_encoding) {
      r = grpc_message_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
    } else {
      r = grpc_stream_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
    }
    if (r) {
      GPR_BITSET(encodings_accepted_by_peer, algorithm);
    } else {
      /* Unrecognized algorithm name: ignore it rather than fail the call. */
      char* accept_encoding_entry_str =
          grpc_slice_to_c_string(accept_encoding_entry_slice);
      gpr_log(GPR_DEBUG,
              "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
              accept_encoding_entry_str);
      gpr_free(accept_encoding_entry_str);
    }
  }

  grpc_slice_buffer_destroy_internal(&accept_encoding_parts);

  /* Cache the bitset on the mdelem, biased by +1 (see above). */
  grpc_mdelem_set_user_data(
      mdel, destroy_encodings_accepted_by_peer,
      (void*)((static_cast<uintptr_t>(*encodings_accepted_by_peer)) + 1));
}
873
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)874 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
875 uint32_t encodings_accepted_by_peer;
876 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
877 return encodings_accepted_by_peer;
878 }
879
/* Test-only accessor: the stream compression algorithm parsed from the
   incoming content-encoding metadata. */
grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
  return call->incoming_stream_compression_algorithm;
}
884
linked_from_md(const grpc_metadata * md)885 static grpc_linked_mdelem* linked_from_md(const grpc_metadata* md) {
886 return (grpc_linked_mdelem*)&md->internal_data;
887 }
888
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)889 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
890 grpc_metadata* additional_metadata, int i,
891 int count) {
892 grpc_metadata* res =
893 i < count ? &metadata[i] : &additional_metadata[i - count];
894 GPR_ASSERT(res);
895 return res;
896 }
897
/* Validate user-supplied metadata (plus optional additional_metadata) and
   link it into the call's outgoing metadata batch.

   Returns 1 on success, 0 if any element fails validation (illegal key,
   illegal non-binary value, or value >= UINT32_MAX, which hpack cannot
   encode). On failure, mdelems already created for earlier elements are
   unref'd before returning.

   If prepend_extra_metadata is set, call->send_extra_metadata entries are
   linked into the batch ahead of the user metadata; the count is reset to 0
   at the end so they are sent only once. */
static int prepare_application_metadata(grpc_call* call, int count,
                                        grpc_metadata* metadata,
                                        int is_trailing,
                                        int prepend_extra_metadata,
                                        grpc_metadata* additional_metadata,
                                        int additional_metadata_count) {
  int total_count = count + additional_metadata_count;
  int i;
  grpc_metadata_batch* batch =
      &call->metadata_batch[0 /* is_receiving */][is_trailing];
  /* Pass 1: validate every element, creating an mdelem per valid element. */
  for (i = 0; i < total_count; i++) {
    const grpc_metadata* md =
        get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
    if (!GRPC_LOG_IF_ERROR("validate_metadata",
                           grpc_validate_header_key_is_legal(md->key))) {
      break;
    } else if (!grpc_is_binary_header_internal(md->key) &&
               !GRPC_LOG_IF_ERROR(
                   "validate_metadata",
                   grpc_validate_header_nonbin_value_is_legal(md->value))) {
      break;
    } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
      // HTTP2 hpack encoding has a maximum limit.
      break;
    }
    l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
  }
  if (i != total_count) {
    /* Validation failed at element i: roll back mdelems created so far. */
    for (int j = 0; j < i; j++) {
      const grpc_metadata* md =
          get_md_elem(metadata, additional_metadata, j, count);
      grpc_linked_mdelem* l = linked_from_md(md);
      GRPC_MDELEM_UNREF(l->md);
    }
    return 0;
  }
  if (prepend_extra_metadata) {
    if (call->send_extra_metadata_count == 0) {
      prepend_extra_metadata = 0;
    } else {
      /* Link the call's extra metadata ahead of the user metadata. */
      for (i = 0; i < call->send_extra_metadata_count; i++) {
        GRPC_LOG_IF_ERROR("prepare_application_metadata",
                          grpc_metadata_batch_link_tail(
                              batch, &call->send_extra_metadata[i]));
      }
    }
  }
  /* Pass 2: link all validated elements into the batch; on a per-element
     link failure, drop that element's ref and continue. */
  for (i = 0; i < total_count; i++) {
    grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
    if (error != GRPC_ERROR_NONE) {
      GRPC_MDELEM_UNREF(l->md);
    }
    GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
  }
  call->send_extra_metadata_count = 0;

  return 1;
}
960
decode_message_compression(grpc_mdelem md)961 static grpc_message_compression_algorithm decode_message_compression(
962 grpc_mdelem md) {
963 grpc_message_compression_algorithm algorithm =
964 grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
965 if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
966 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
967 gpr_log(GPR_ERROR,
968 "Invalid incoming message compression algorithm: '%s'. "
969 "Interpreting incoming data as uncompressed.",
970 md_c_str);
971 gpr_free(md_c_str);
972 return GRPC_MESSAGE_COMPRESS_NONE;
973 }
974 return algorithm;
975 }
976
/* Map the content-encoding metadata value to a stream compression algorithm.
   Unrecognized values are logged at ERROR severity and treated as
   uncompressed rather than failing the call. */
static grpc_stream_compression_algorithm decode_stream_compression(
    grpc_mdelem md) {
  grpc_stream_compression_algorithm algorithm =
      grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
  if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
    char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
    gpr_log(GPR_ERROR,
            "Invalid incoming stream compression algorithm: '%s'. Interpreting "
            "incoming data as uncompressed.",
            md_c_str);
    gpr_free(md_c_str);
    return GRPC_STREAM_COMPRESS_NONE;
  }
  return algorithm;
}
992
/* Copy received metadata into the user-visible grpc_metadata_array.
   Publishes nothing when the batch is empty, when a server is handling
   trailing metadata, or when no destination array was registered for
   trailing metadata. Key/value slices are borrowed, not ref'd: they are
   valid only while the call is alive. */
static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
                                 int is_trailing) {
  if (b->list.count == 0) return;
  if (!call->is_client && is_trailing) return;
  if (is_trailing && call->buffered_metadata[1] == nullptr) return;
  GPR_TIMER_SCOPE("publish_app_metadata", 0);
  grpc_metadata_array* dest;
  grpc_metadata* mdusr;
  dest = call->buffered_metadata[is_trailing];
  if (dest->count + b->list.count > dest->capacity) {
    /* Grow geometrically (at least 1.5x) while guaranteeing room for the
       incoming batch. */
    dest->capacity =
        GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
    dest->metadata = static_cast<grpc_metadata*>(
        gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
  }
  for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
    mdusr = &dest->metadata[dest->count++];
    /* we pass back borrowed slices that are valid whilst the call is valid */
    mdusr->key = GRPC_MDKEY(l->md);
    mdusr->value = GRPC_MDVALUE(l->md);
  }
}
1015
/* Filter incoming initial metadata: strip the compression-related headers,
   recording their values on the call, then publish the remainder to the
   application. */
static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
  /* content-encoding => incoming stream compression algorithm. */
  if (b->idx.named.content_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
    set_incoming_stream_compression_algorithm(
        call, decode_stream_compression(b->idx.named.content_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
  }
  /* grpc-encoding => incoming message compression algorithm. */
  if (b->idx.named.grpc_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
    set_incoming_message_compression_algorithm(
        call, decode_message_compression(b->idx.named.grpc_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
  }
  /* Defaults: only bit 0 ("none") accepted until the peer says otherwise. */
  uint32_t message_encodings_accepted_by_peer = 1u;
  uint32_t stream_encodings_accepted_by_peer = 1u;
  if (b->idx.named.grpc_accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
                                   &message_encodings_accepted_by_peer, false);
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
  }
  if (b->idx.named.accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
                                   &stream_encodings_accepted_by_peer, true);
    grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
  }
  /* Fold the two bitsets into one user-visible compression bitset. */
  call->encodings_accepted_by_peer =
      grpc_compression_bitset_from_message_stream_compression_bitset(
          message_encodings_accepted_by_peer,
          stream_encodings_accepted_by_peer);
  publish_app_metadata(call, b, false);
}
1049
/* Filter incoming trailing metadata: derive the call's final status either
   from a transport-level batch_error or from the grpc-status/grpc-message
   headers, strip those headers, and publish the rest to the application.
   Takes ownership of batch_error (set_final_status consumes the ref). */
static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
                                 grpc_error* batch_error) {
  grpc_call* call = static_cast<grpc_call*>(args);
  if (batch_error != GRPC_ERROR_NONE) {
    /* Transport reported a failure: that wins over any grpc-status header. */
    set_final_status(call, batch_error);
  } else if (b->idx.named.grpc_status != nullptr) {
    grpc_status_code status_code =
        grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
    grpc_error* error = GRPC_ERROR_NONE;
    if (status_code != GRPC_STATUS_OK) {
      char* peer = grpc_call_get_peer(call);
      error = grpc_error_set_int(
          GRPC_ERROR_CREATE_FROM_COPIED_STRING(
              absl::StrCat("Error received from peer ", peer).c_str()),
          GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
      gpr_free(peer);
    }
    /* Attach grpc-message (or an empty message) so callers always see one. */
    if (b->idx.named.grpc_message != nullptr) {
      error = grpc_error_set_str(
          error, GRPC_ERROR_STR_GRPC_MESSAGE,
          grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
      grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
    } else if (error != GRPC_ERROR_NONE) {
      error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                 grpc_empty_slice());
    }
    /* set_final_status takes its own ref; drop ours below. */
    set_final_status(call, GRPC_ERROR_REF(error));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
    GRPC_ERROR_UNREF(error);
  } else if (!call->is_client) {
    /* Servers don't receive a status from the peer. */
    set_final_status(call, GRPC_ERROR_NONE);
  } else {
    /* Client got trailing metadata without grpc-status: report UNKNOWN. */
    gpr_log(GPR_DEBUG,
            "Received trailing metadata with no error and no status");
    set_final_status(
        call, grpc_error_set_int(
                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
                  GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
  }
  publish_app_metadata(call, b, true);
}
1091
/* Expose the arena backing per-call allocations. */
grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1093
/* Return the call stack co-located with the call object (derived from the
   call pointer via CALL_STACK_FROM_CALL). */
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
  return CALL_STACK_FROM_CALL(call);
}
1097
1098 /*******************************************************************************
1099 * BATCH API IMPLEMENTATION
1100 */
1101
are_write_flags_valid(uint32_t flags)1102 static bool are_write_flags_valid(uint32_t flags) {
1103 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1104 const uint32_t allowed_write_positions =
1105 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1106 const uint32_t invalid_positions = ~allowed_write_positions;
1107 return !(flags & invalid_positions);
1108 }
1109
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1110 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1111 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1112 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1113 if (!is_client) {
1114 invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1115 }
1116 return !(flags & invalid_positions);
1117 }
1118
/* Map an op type to its fixed slot index in call->active_batches.
   Op types that cannot both appear on the same side of a call (e.g.
   SEND_CLOSE_FROM_CLIENT vs. SEND_STATUS_FROM_SERVER) share a slot. */
static size_t batch_slot_for_op(grpc_op_type type) {
  switch (type) {
    case GRPC_OP_SEND_INITIAL_METADATA:
      return 0;
    case GRPC_OP_SEND_MESSAGE:
      return 1;
    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
    case GRPC_OP_SEND_STATUS_FROM_SERVER:
      return 2;
    case GRPC_OP_RECV_INITIAL_METADATA:
      return 3;
    case GRPC_OP_RECV_MESSAGE:
      return 4;
    case GRPC_OP_RECV_CLOSE_ON_SERVER:
    case GRPC_OP_RECV_STATUS_ON_CLIENT:
      return 5;
  }
  GPR_UNREACHABLE_CODE(return 123456789);
}
1138
/* Obtain a batch_control for the batch starting with ops[0]. Each op slot
   (see batch_slot_for_op) owns at most one batch_control, allocated from the
   call's arena and reused across batches. Returns nullptr if a batch in the
   same slot is still in flight (bctl->call not yet cleared); the caller maps
   that to GRPC_CALL_ERROR_TOO_MANY_OPERATIONS. */
static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
                                                      const grpc_op* ops) {
  size_t slot_idx = batch_slot_for_op(ops[0].op);
  batch_control** pslot = &call->active_batches[slot_idx];
  batch_control* bctl;
  if (*pslot != nullptr) {
    bctl = *pslot;
    if (bctl->call != nullptr) {
      /* Previous batch in this slot hasn't completed. */
      return nullptr;
    }
    /* Destroy in place and reset; the arena storage is reused as-is. */
    bctl->~batch_control();
    bctl->op = {};
  } else {
    bctl = call->arena->New<batch_control>();
    *pslot = bctl;
  }
  bctl->call = call;
  bctl->op.payload = &call->stream_op_payload;
  return bctl;
}
1159
/* Completion-queue done callback: mark the batch_control as free for reuse
   (clear its call pointer) and drop the batch's internal call ref. */
static void finish_batch_completion(void* user_data,
                                    grpc_cq_completion* /*storage*/) {
  batch_control* bctl = static_cast<batch_control*>(user_data);
  grpc_call* call = bctl->call;
  bctl->call = nullptr;
  GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
1167
/* Release the error stored atomically on the batch_control and reset the
   slot to GRPC_ERROR_NONE so the bctl can be reused. */
static void reset_batch_errors(batch_control* bctl) {
  GRPC_ERROR_UNREF(
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
  gpr_atm_rel_store(&bctl->batch_error,
                    reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
}
1174
/* Finalize a completed batch: tear down per-op state, propagate cancellation
   to inheriting child calls on final-op receipt, then signal completion via
   either the closure tag or the completion queue. */
static void post_batch_completion(batch_control* bctl) {
  grpc_call* next_child_call;
  grpc_call* call = bctl->call;
  grpc_error* error = GRPC_ERROR_REF(
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));

  if (bctl->op.send_initial_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
  }
  if (bctl->op.send_message) {
    /* Surface a send-after-close as an error if nothing else failed. */
    if (bctl->op.payload->send_message.stream_write_closed &&
        error == GRPC_ERROR_NONE) {
      error = grpc_error_add_child(
          error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                     "Attempt to send message after stream was closed."));
    }
    call->sending_message = false;
  }
  if (bctl->op.send_trailing_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
  }
  if (bctl->op.recv_trailing_metadata) {
    /* propagate cancellation to any interested children */
    gpr_atm_rel_store(&call->received_final_op_atm, 1);
    parent_call* pc = get_parent_call(call);
    if (pc != nullptr) {
      grpc_call* child;
      gpr_mu_lock(&pc->child_list_mu);
      child = pc->first_child;
      if (child != nullptr) {
        /* Walk the circular sibling list; capture the next pointer before
           cancelling, since cancellation may unlink the child. */
        do {
          next_child_call = child->child->sibling_next;
          if (child->cancellation_is_inherited) {
            GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
            cancel_with_error(child, GRPC_ERROR_CANCELLED);
            GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
          }
          child = next_child_call;
        } while (child != pc->first_child);
      }
      gpr_mu_unlock(&pc->child_list_mu);
    }
    /* recv_trailing_metadata batches always complete successfully: the error
       (if any) has already been folded into the call's final status. */
    GRPC_ERROR_UNREF(error);
    error = GRPC_ERROR_NONE;
  }
  /* Don't hand the application a partially received message on failure. */
  if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
      *call->receiving_buffer != nullptr) {
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
  }
  reset_batch_errors(bctl);

  if (bctl->completion_data.notify_tag.is_closure) {
    /* unrefs error */
    bctl->call = nullptr;
    grpc_core::Closure::Run(DEBUG_LOCATION,
                            (grpc_closure*)bctl->completion_data.notify_tag.tag,
                            error);
    GRPC_CALL_INTERNAL_UNREF(call, "completion");
  } else {
    /* unrefs error */
    grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
                   finish_batch_completion, bctl,
                   &bctl->completion_data.cq_completion);
  }
}
1243
/* Mark one pending step of the batch as done; post the batch's completion
   once the last outstanding step finishes. */
static void finish_batch_step(batch_control* bctl) {
  if (GPR_UNLIKELY(bctl->completed_batch_step())) {
    post_batch_completion(bctl);
  }
}
1249
/* Drain the incoming byte stream into *call->receiving_buffer.
   Loops pulling slices while they are synchronously available; when the
   message is fully assembled, finishes the recv step. If Next() returns
   false, data isn't ready yet and receiving_slice_ready will be invoked
   later to resume. On a Pull() error, discards the partial message and
   finishes the step. */
static void continue_receiving_slices(batch_control* bctl) {
  grpc_error* error;
  grpc_call* call = bctl->call;
  for (;;) {
    size_t remaining = call->receiving_stream->length() -
                       (*call->receiving_buffer)->data.raw.slice_buffer.length;
    if (remaining == 0) {
      /* Whole message received. */
      call->receiving_message = false;
      call->receiving_stream.reset();
      finish_batch_step(bctl);
      return;
    }
    if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
      error = call->receiving_stream->Pull(&call->receiving_slice);
      if (error == GRPC_ERROR_NONE) {
        grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                              call->receiving_slice);
      } else {
        /* Pull failed: drop the partial message and complete the step. */
        call->receiving_stream.reset();
        grpc_byte_buffer_destroy(*call->receiving_buffer);
        *call->receiving_buffer = nullptr;
        call->receiving_message = false;
        finish_batch_step(bctl);
        GRPC_ERROR_UNREF(error);
        return;
      }
    } else {
      /* Not ready: receiving_slice_ready will be scheduled by the stream. */
      return;
    }
  }
}
1281
/* Async continuation for continue_receiving_slices: invoked when the byte
   stream has more data (or failed). On success, appends the pulled slice and
   resumes draining; on any failure, discards the partial message and
   completes the recv step. */
static void receiving_slice_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  /* Errors produced by Pull() (as opposed to the incoming `error` argument)
     are owned by us and must be unref'd. */
  bool release_error = false;

  if (error == GRPC_ERROR_NONE) {
    grpc_slice slice;
    error = call->receiving_stream->Pull(&slice);
    if (error == GRPC_ERROR_NONE) {
      grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                            slice);
      continue_receiving_slices(bctl);
    } else {
      /* Error returned by ByteStream::Pull() needs to be released manually */
      release_error = true;
    }
  }

  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
      GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
    }
    call->receiving_stream.reset();
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
    call->receiving_message = false;
    finish_batch_step(bctl);
    if (release_error) {
      GRPC_ERROR_UNREF(error);
    }
  }
}
1314
/* Start consuming a received message once initial metadata has been
   processed. A null receiving_stream means end-of-stream: publish a null
   buffer. Otherwise allocate a (possibly compressed) byte buffer matching
   the stream's compression flags and begin pulling slices. */
static void process_data_after_md(batch_control* bctl) {
  grpc_call* call = bctl->call;
  if (call->receiving_stream == nullptr) {
    /* No message: stream has ended. */
    *call->receiving_buffer = nullptr;
    call->receiving_message = false;
    finish_batch_step(bctl);
  } else {
    call->test_only_last_message_flags = call->receiving_stream->flags();
    if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
        (call->incoming_message_compression_algorithm >
         GRPC_MESSAGE_COMPRESS_NONE)) {
      /* Message arrived compressed: wrap it in a compressed byte buffer so
         the application can see (and undo) the compression. */
      grpc_compression_algorithm algo;
      GPR_ASSERT(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &algo, call->incoming_message_compression_algorithm,
              (grpc_stream_compression_algorithm)0));
      *call->receiving_buffer =
          grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
    } else {
      *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
    }
    GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
                      grpc_schedule_on_exec_ctx);
    continue_receiving_slices(bctl);
  }
}
1341
/* Called when a message (or stream error) arrives. Stores the first error on
   the bctl and cancels the call on failure. Message processing is deferred
   until initial metadata has been handled: if recv_state is still RECV_NONE,
   the bctl is parked in recv_state for receiving_initial_metadata_ready() to
   pick up; otherwise the message is processed immediately. */
static void receiving_stream_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  if (error != GRPC_ERROR_NONE) {
    call->receiving_stream.reset();
    /* Record only the first error seen for this batch. */
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  /* If recv_state is RECV_NONE, we will save the batch_control
   * object with rel_cas, and will not use it after the cas. Its corresponding
   * acq_load is in receiving_initial_metadata_ready() */
  if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
      !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
    process_data_after_md(bctl);
  }
}
1362
1363 // The recv_message_ready callback used when sending a batch containing
1364 // a recv_message op down the filter stack. Yields the call combiner
1365 // before processing the received message.
/* Call-combiner entry point for recv_message_ready: release the combiner
   first (the message processing below doesn't need it), then delegate. */
static void receiving_stream_ready_in_call_combiner(void* bctlp,
                                                    grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
  receiving_stream_ready(bctlp, error);
}
1373
/* Cold path (kept out of line): the peer set both stream and message
   compression, which is not allowed — log and fail the call with INTERNAL. */
static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call* call) {
  std::string error_msg = absl::StrFormat(
      "Incoming stream has both stream compression (%d) and message "
      "compression (%d).",
      call->incoming_stream_compression_algorithm,
      call->incoming_message_compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
}
1384
1385 static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call * call)1386 handle_error_parsing_compression_algorithm(grpc_call* call) {
1387 std::string error_msg = absl::StrFormat(
1388 "Error in incoming message compression (%d) or stream "
1389 "compression (%d).",
1390 call->incoming_stream_compression_algorithm,
1391 call->incoming_message_compression_algorithm);
1392 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1393 }
1394
/* Cold path (kept out of line): the derived compression algorithm is out of
   range — log and fail the call with UNIMPLEMENTED. */
static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  std::string error_msg = absl::StrFormat(
      "Invalid compression algorithm value '%d'.", compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
}
1402
/* Cold path (kept out of line): the algorithm is valid but disabled by the
   channel's compression options — log and fail with UNIMPLEMENTED. */
static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  std::string error_msg =
      absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
}
1412
/* Cold path (kept out of line): peer used an algorithm it did not advertise
   as accepted. Only logs (under the compression trace flag at the call
   site); does NOT cancel the call. */
static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  gpr_log(GPR_ERROR,
          "Compression algorithm ('%s') not present in the bitset of "
          "accepted encodings ('0x%x')",
          algo_name, call->encodings_accepted_by_peer);
}
1422
/* Validate the compression configuration derived from the received initial
   metadata. Failure modes (each handled in an out-of-line cold helper):
   both stream and message compression set; an un-mappable pair; an
   out-of-range algorithm; an algorithm disabled on this channel; or an
   algorithm the peer did not advertise (trace-log only, not fatal). */
static void validate_filtered_metadata(batch_control* bctl) {
  /* Written by grpc_compression_algorithm_from_... in the second branch;
     only read in the else branch where that call succeeded. */
  grpc_compression_algorithm compression_algorithm;
  grpc_call* call = bctl->call;
  if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
                       GRPC_STREAM_COMPRESS_NONE &&
                   call->incoming_message_compression_algorithm !=
                       GRPC_MESSAGE_COMPRESS_NONE)) {
    handle_both_stream_and_msg_compression_set(call);
  } else if (
      GPR_UNLIKELY(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &compression_algorithm,
              call->incoming_message_compression_algorithm,
              call->incoming_stream_compression_algorithm) == 0)) {
    handle_error_parsing_compression_algorithm(call);
  } else {
    const grpc_compression_options compression_options =
        grpc_channel_compression_options(call->channel);
    if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
      handle_invalid_compression(call, compression_algorithm);
    } else if (GPR_UNLIKELY(
                   grpc_compression_options_is_algorithm_enabled_internal(
                       &compression_options, compression_algorithm) == 0)) {
      /* check if algorithm is supported by current channel config */
      handle_compression_algorithm_disabled(call, compression_algorithm);
    }
    /* GRPC_COMPRESS_NONE is always set. */
    GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
    if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
                                 compression_algorithm))) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
        handle_compression_algorithm_not_accepted(call, compression_algorithm);
      }
    }
  }
}
1459
/* Completion for the recv_initial_metadata op. Filters and validates the
   received metadata (or records the error and cancels), then resolves the
   ordering race with receiving_stream_ready() via call->recv_state: if a
   message arrived first, its parked batch_control is retrieved and its
   processing is (re)scheduled; otherwise recv_state is marked so messages
   will be processed inline when they arrive. */
static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;

  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");

  if (error == GRPC_ERROR_NONE) {
    grpc_metadata_batch* md =
        &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
    recv_initial_filter(call, md);

    /* TODO(ctiller): this could be moved into recv_initial_filter now */
    GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
    validate_filtered_metadata(bctl);

    /* Servers adopt the client's deadline from the metadata. */
    if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
      call->send_deadline = md->deadline;
    }
  } else {
    /* Record only the first error seen for this batch, then cancel. */
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }

  grpc_closure* saved_rsr_closure = nullptr;
  while (true) {
    gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
    /* Should only receive initial metadata once */
    GPR_ASSERT(rsr_bctlp != 1);
    if (rsr_bctlp == 0) {
      /* We haven't seen initial metadata and messages before, thus initial
       * metadata is received first.
       * no_barrier_cas is used, as this function won't access the batch_control
       * object saved by receiving_stream_ready() if the initial metadata is
       * received first. */
      if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
                                 RECV_INITIAL_METADATA_FIRST)) {
        break;
      }
    } else {
      /* Already received messages */
      saved_rsr_closure =
          GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
                              grpc_schedule_on_exec_ctx);
      /* No need to modify recv_state */
      break;
    }
  }
  if (saved_rsr_closure != nullptr) {
    grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
                            GRPC_ERROR_REF(error));
  }

  finish_batch_step(bctl);
}
1518
/* Completion for the recv_trailing_metadata op: release the call combiner,
   derive the final status from the trailing metadata (and the given error,
   whose ref is transferred to recv_trailing_filter), then finish the step. */
static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
  grpc_metadata_batch* md =
      &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
  recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
  finish_batch_step(bctl);
}
1528
/* on_complete callback for the batch's send ops: release the call combiner,
   record the first error seen on the bctl, cancel the call on failure, and
   finish this batch step. */
static void finish_batch(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
  if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
      GRPC_ERROR_NONE) {
    gpr_atm_rel_store(&bctl->batch_error,
                      reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
  }
  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  finish_batch_step(bctl);
}
1543
/* CQ done callback for a zero-op batch: just free the heap-allocated
   completion storage (allocated in call_start_batch for nops == 0). */
static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
  gpr_free(completion);
}
1547
/* Core batch-submission routine shared by grpc_call_start_batch (completion
   queue tag notification) and grpc_call_start_batch_and_execute (closure
   notification).

   Validates each op in |ops|, translates the whole batch into a single
   grpc_transport_stream_op_batch, and hands it down the filter stack via
   execute_batch().  Returns GRPC_CALL_OK on success, or a grpc_call_error
   describing the first validation failure; on failure, all per-call state
   mutated while scanning the ops is rolled back (see done_with_error). */
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
                                        size_t nops, void* notify_tag,
                                        int is_notify_tag_closure) {
  GPR_TIMER_SCOPE("call_start_batch", 0);

  size_t i;
  const grpc_op* op;
  batch_control* bctl;
  bool has_send_ops = false;
  int num_recv_ops = 0;
  grpc_call_error error = GRPC_CALL_OK;
  grpc_transport_stream_op_batch* stream_op;
  grpc_transport_stream_op_batch_payload* stream_op_payload;

  GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);

  /* An empty batch carries no work but must still complete notify_tag,
     either through the completion queue or by running the closure. */
  if (nops == 0) {
    if (!is_notify_tag_closure) {
      GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
      grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
                     free_no_op_completion, nullptr,
                     static_cast<grpc_cq_completion*>(
                         gpr_malloc(sizeof(grpc_cq_completion))));
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag,
                              GRPC_ERROR_NONE);
    }
    error = GRPC_CALL_OK;
    goto done;
  }

  /* A nullptr here means no batch control could be obtained for this set of
     ops; surfaced to the caller as too many concurrent operations. */
  bctl = reuse_or_allocate_batch_control(call, ops);
  if (bctl == nullptr) {
    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
  }
  bctl->completion_data.notify_tag.tag = notify_tag;
  bctl->completion_data.notify_tag.is_closure =
      static_cast<uint8_t>(is_notify_tag_closure != 0);

  stream_op = &bctl->op;
  stream_op_payload = &call->stream_op_payload;

  /* rewrite batch ops into a transport op */
  for (i = 0; i < nops; i++) {
    op = &ops[i];
    /* reserved must be nullptr on every op. */
    if (op->reserved != nullptr) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    switch (op->op) {
      case GRPC_OP_SEND_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        /* Initial metadata may be sent at most once per call. */
        if (call->sent_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        // TODO(juanlishen): If the user has already specified a compression
        // algorithm by setting the initial metadata with key of
        // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
        // with the compression algorithm mapped from compression level.
        /* process compression level */
        grpc_metadata& compression_md = call->compression_md;
        compression_md.key = grpc_empty_slice();
        compression_md.value = grpc_empty_slice();
        compression_md.flags = 0;
        size_t additional_metadata_count = 0;
        grpc_compression_level effective_compression_level =
            GRPC_COMPRESS_LEVEL_NONE;
        bool level_set = false;
        /* Per-op compression level wins; otherwise fall back to the
           channel-wide default level, if one is configured. */
        if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
          effective_compression_level =
              op->data.send_initial_metadata.maybe_compression_level.level;
          level_set = true;
        } else {
          const grpc_compression_options copts =
              grpc_channel_compression_options(call->channel);
          if (copts.default_level.is_set) {
            level_set = true;
            effective_compression_level = copts.default_level.level;
          }
        }
        // Currently, only server side supports compression level setting.
        if (level_set && !call->is_client) {
          const grpc_compression_algorithm calgo =
              compression_algorithm_for_level_locked(
                  call, effective_compression_level);
          // The following metadata will be checked and removed by the message
          // compression filter. It will be used as the call's compression
          // algorithm.
          compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
          compression_md.value = grpc_compression_algorithm_slice(calgo);
          additional_metadata_count++;
        }
        /* Total metadata count must fit in an int (see the static_casts in
           the prepare_application_metadata call below). */
        if (op->data.send_initial_metadata.count + additional_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_initial_metadata = true;
        call->sent_initial_metadata = true;
        if (!prepare_application_metadata(
                call, static_cast<int>(op->data.send_initial_metadata.count),
                op->data.send_initial_metadata.metadata, 0, call->is_client,
                &compression_md, static_cast<int>(additional_metadata_count))) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        /* TODO(ctiller): just make these the same variable? */
        if (call->is_client) {
          call->metadata_batch[0][0].deadline = call->send_deadline;
        }
        stream_op_payload->send_initial_metadata.send_initial_metadata =
            &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
            op->flags;
        if (call->is_client) {
          stream_op_payload->send_initial_metadata.peer_string =
              &call->peer_string;
        }
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_MESSAGE: {
        if (!are_write_flags_valid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message.send_message == nullptr) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        /* Only one message send may be outstanding at a time. */
        if (call->sending_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        uint32_t flags = op->flags;
        /* If the outgoing buffer is already compressed, mark it as so in the
           flags. These will be picked up by the compression filter and further
           (wasteful) attempts at compression skipped. */
        if (op->data.send_message.send_message->data.raw.compression >
            GRPC_COMPRESS_NONE) {
          flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = true;
        call->sending_message = true;
        call->sending_stream.Init(
            &op->data.send_message.send_message->data.raw.slice_buffer, flags);
        stream_op_payload->send_message.send_message.reset(
            call->sending_stream.get());
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        /* The final op (close/status) may be sent at most once. */
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        /* Client half-close is expressed as empty trailing metadata. */
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_STATUS_FROM_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        if (op->data.send_status_from_server.trailing_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        /* Slot 0 carries the grpc-status element; slot 1 (below) optionally
           carries grpc-message. */
        GPR_ASSERT(call->send_extra_metadata_count == 0);
        call->send_extra_metadata_count = 1;
        call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
            op->data.send_status_from_server.status);
        grpc_error* status_error =
            op->data.send_status_from_server.status == GRPC_STATUS_OK
                ? GRPC_ERROR_NONE
                : grpc_error_set_int(
                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                          "Server returned error"),
                      GRPC_ERROR_INT_GRPC_STATUS,
                      static_cast<intptr_t>(
                          op->data.send_status_from_server.status));
        if (op->data.send_status_from_server.status_details != nullptr) {
          call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
              GRPC_MDSTR_GRPC_MESSAGE,
              grpc_slice_ref_internal(
                  *op->data.send_status_from_server.status_details));
          call->send_extra_metadata_count++;
          /* Copy the status details onto the error so the message survives
             independently of the metadata element. */
          if (status_error != GRPC_ERROR_NONE) {
            char* msg = grpc_slice_to_c_string(
                GRPC_MDVALUE(call->send_extra_metadata[1].md));
            status_error =
                grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                   grpc_slice_from_copied_string(msg));
            gpr_free(msg);
          }
        }

        gpr_atm_rel_store(&call->status_error,
                          reinterpret_cast<gpr_atm>(status_error));
        if (!prepare_application_metadata(
                call,
                static_cast<int>(
                    op->data.send_status_from_server.trailing_metadata_count),
                op->data.send_status_from_server.trailing_metadata, 1, 1,
                nullptr, 0)) {
          /* Roll back the extra metadata refs taken above before bailing. */
          for (int n = 0; n < call->send_extra_metadata_count; n++) {
            GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
          }
          call->send_extra_metadata_count = 0;
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->send_trailing_metadata.sent =
            &call->sent_server_trailing_metadata;
        has_send_ops = true;
        break;
      }
      case GRPC_OP_RECV_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->received_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->received_initial_metadata = true;
        call->buffered_metadata[0] =
            op->data.recv_initial_metadata.recv_initial_metadata;
        GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
                          receiving_initial_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op->recv_initial_metadata = true;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata =
            &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
            &call->receiving_initial_metadata_ready;
        if (!call->is_client) {
          stream_op_payload->recv_initial_metadata.peer_string =
              &call->peer_string;
        }
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_MESSAGE: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        /* Only one message receive may be outstanding at a time. */
        if (call->receiving_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->receiving_message = true;
        stream_op->recv_message = true;
        call->receiving_buffer = op->data.recv_message.recv_message;
        stream_op_payload->recv_message.recv_message = &call->receiving_stream;
        GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
                          receiving_stream_ready_in_call_combiner, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_message.recv_message_ready =
            &call->receiving_stream_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_STATUS_ON_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        /* The final op (status/close) may be requested at most once. */
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->buffered_metadata[1] =
            op->data.recv_status_on_client.trailing_metadata;
        call->final_op.client.status = op->data.recv_status_on_client.status;
        call->final_op.client.status_details =
            op->data.recv_status_on_client.status_details;
        call->final_op.client.error_string =
            op->data.recv_status_on_client.error_string;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_CLOSE_ON_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->final_op.server.cancelled =
            op->data.recv_close_on_server.cancelled;
        /* Server-side close is observed via trailing metadata receipt. */
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
    }
  }

  /* Ref held until the batch fully completes (released in batch teardown). */
  GRPC_CALL_INTERNAL_REF(call, "completion");
  if (!is_notify_tag_closure) {
    GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
  }
  /* All send ops complete together as one step; each recv op completes
     independently as its own step. */
  bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);

  if (has_send_ops) {
    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
                      grpc_schedule_on_exec_ctx);
    stream_op->on_complete = &bctl->finish_batch;
  }

  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
  execute_batch(call, stream_op, &bctl->start_batch);

done:
  return error;

done_with_error:
  /* reverse any mutations that occurred */
  if (stream_op->send_initial_metadata) {
    call->sent_initial_metadata = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
  }
  if (stream_op->send_message) {
    call->sending_message = false;
    call->sending_stream->Orphan();
  }
  if (stream_op->send_trailing_metadata) {
    call->sent_final_op = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
  }
  if (stream_op->recv_initial_metadata) {
    call->received_initial_metadata = false;
  }
  if (stream_op->recv_message) {
    call->receiving_message = false;
  }
  if (stream_op->recv_trailing_metadata) {
    call->requested_final_op = false;
  }
  goto done;
}
1958
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1959 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1960 size_t nops, void* tag, void* reserved) {
1961 grpc_call_error err;
1962
1963 GRPC_API_TRACE(
1964 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1965 "reserved=%p)",
1966 5, (call, ops, (unsigned long)nops, tag, reserved));
1967
1968 if (reserved != nullptr) {
1969 err = GRPC_CALL_ERROR;
1970 } else {
1971 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1972 grpc_core::ExecCtx exec_ctx;
1973 err = call_start_batch(call, ops, nops, tag, 0);
1974 }
1975
1976 return err;
1977 }
1978
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)1979 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1980 const grpc_op* ops,
1981 size_t nops,
1982 grpc_closure* closure) {
1983 return call_start_batch(call, ops, nops, closure, 1);
1984 }
1985
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1986 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1987 void* value, void (*destroy)(void* value)) {
1988 if (call->context[elem].destroy) {
1989 call->context[elem].destroy(call->context[elem].value);
1990 }
1991 call->context[elem].value = value;
1992 call->context[elem].destroy = destroy;
1993 }
1994
/* Returns the value currently stored at context slot |elem| of |call|
   (set via grpc_call_context_set). */
void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
  return call->context[elem].value;
}
1998
grpc_call_is_client(grpc_call * call)1999 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
2000
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)2001 grpc_compression_algorithm grpc_call_compression_for_level(
2002 grpc_call* call, grpc_compression_level level) {
2003 grpc_compression_algorithm algo =
2004 compression_algorithm_for_level_locked(call, level);
2005 return algo;
2006 }
2007
grpc_call_error_to_string(grpc_call_error error)2008 const char* grpc_call_error_to_string(grpc_call_error error) {
2009 switch (error) {
2010 case GRPC_CALL_ERROR:
2011 return "GRPC_CALL_ERROR";
2012 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2013 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2014 case GRPC_CALL_ERROR_ALREADY_FINISHED:
2015 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2016 case GRPC_CALL_ERROR_ALREADY_INVOKED:
2017 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2018 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2019 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2020 case GRPC_CALL_ERROR_INVALID_FLAGS:
2021 return "GRPC_CALL_ERROR_INVALID_FLAGS";
2022 case GRPC_CALL_ERROR_INVALID_MESSAGE:
2023 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2024 case GRPC_CALL_ERROR_INVALID_METADATA:
2025 return "GRPC_CALL_ERROR_INVALID_METADATA";
2026 case GRPC_CALL_ERROR_NOT_INVOKED:
2027 return "GRPC_CALL_ERROR_NOT_INVOKED";
2028 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2029 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2030 case GRPC_CALL_ERROR_NOT_ON_SERVER:
2031 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2032 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2033 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2034 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2035 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2036 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2037 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2038 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2039 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2040 case GRPC_CALL_OK:
2041 return "GRPC_CALL_OK";
2042 }
2043 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2044 }
2045