1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/surface/call.h"
22
23 #include <assert.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28
29 #include <string>
30
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/str_format.h"
33
34 #include <grpc/compression.h>
35 #include <grpc/grpc.h>
36 #include <grpc/slice.h>
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 #include <grpc/support/string_util.h>
40
41 #include "src/core/lib/channel/channel_stack.h"
42 #include "src/core/lib/compression/algorithm_metadata.h"
43 #include "src/core/lib/debug/stats.h"
44 #include "src/core/lib/gpr/alloc.h"
45 #include "src/core/lib/gpr/string.h"
46 #include "src/core/lib/gpr/time_precise.h"
47 #include "src/core/lib/gpr/useful.h"
48 #include "src/core/lib/gprpp/arena.h"
49 #include "src/core/lib/gprpp/manual_constructor.h"
50 #include "src/core/lib/gprpp/ref_counted.h"
51 #include "src/core/lib/iomgr/timer.h"
52 #include "src/core/lib/profiling/timers.h"
53 #include "src/core/lib/slice/slice_split.h"
54 #include "src/core/lib/slice/slice_string_helpers.h"
55 #include "src/core/lib/slice/slice_utils.h"
56 #include "src/core/lib/surface/api_trace.h"
57 #include "src/core/lib/surface/call_test_only.h"
58 #include "src/core/lib/surface/channel.h"
59 #include "src/core/lib/surface/completion_queue.h"
60 #include "src/core/lib/surface/server.h"
61 #include "src/core/lib/surface/validate_metadata.h"
62 #include "src/core/lib/transport/error_utils.h"
63 #include "src/core/lib/transport/metadata.h"
64 #include "src/core/lib/transport/static_metadata.h"
65 #include "src/core/lib/transport/status_metadata.h"
66 #include "src/core/lib/transport/transport.h"
67
/** The maximum number of concurrent batches possible.
    Based upon the maximum number of individually queueable ops in the batch
    api:
      - initial metadata send
      - message send
      - status/close send (depending on client/server)
      - initial metadata recv
      - message recv
      - status/close recv (depending on client/server)
    This is also the size of grpc_call::active_batches. */
#define MAX_CONCURRENT_BATCHES 6

/** Capacity of the grpc_call::send_extra_metadata array. */
#define MAX_SEND_EXTRA_METADATA_COUNT 3

// Used to create arena for the first call: the number of grpc_linked_mdelem
// entries budgeted for by grpc_call_get_initial_size_estimate().
#define ESTIMATED_MDELEM_COUNT 16
83
84 struct batch_control {
85 batch_control() = default;
86
87 grpc_call* call = nullptr;
88 grpc_transport_stream_op_batch op;
89 /* Share memory for cq_completion and notify_tag as they are never needed
90 simultaneously. Each byte used in this data structure count as six bytes
91 per call, so any savings we can make are worthwhile,
92
93 We use notify_tag to determine whether or not to send notification to the
94 completion queue. Once we've made that determination, we can reuse the
95 memory for cq_completion. */
96 union {
97 grpc_cq_completion cq_completion;
98 struct {
99 /* Any given op indicates completion by either (a) calling a closure or
100 (b) sending a notification on the call's completion queue. If
101 \a is_closure is true, \a tag indicates a closure to be invoked;
102 otherwise, \a tag indicates the tag to be used in the notification to
103 be sent to the completion queue. */
104 void* tag;
105 bool is_closure;
106 } notify_tag;
107 } completion_data;
108 grpc_closure start_batch;
109 grpc_closure finish_batch;
110 std::atomic<intptr_t> steps_to_complete{0};
111 AtomicError batch_error;
set_num_steps_to_completebatch_control112 void set_num_steps_to_complete(uintptr_t steps) {
113 steps_to_complete.store(steps, std::memory_order_release);
114 }
completed_batch_stepbatch_control115 bool completed_batch_step() {
116 return steps_to_complete.fetch_sub(1, std::memory_order_acq_rel) == 1;
117 }
118 };
119
120 struct parent_call {
parent_callparent_call121 parent_call() { gpr_mu_init(&child_list_mu); }
~parent_callparent_call122 ~parent_call() { gpr_mu_destroy(&child_list_mu); }
123
124 gpr_mu child_list_mu;
125 grpc_call* first_child = nullptr;
126 };
127
128 struct child_call {
child_callchild_call129 explicit child_call(grpc_call* parent) : parent(parent) {}
130 grpc_call* parent;
131 /** siblings: children of the same parent form a list, and this list is
132 protected under
133 parent->mu */
134 grpc_call* sibling_next = nullptr;
135 grpc_call* sibling_prev = nullptr;
136 };
137
// Sentinel values for grpc_call::recv_state. Per the comment on that member,
// any other value stored there is a batch_control pointer.
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
140
141 struct grpc_call {
grpc_callgrpc_call142 grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
143 : arena(arena),
144 cq(args.cq),
145 channel(args.channel),
146 is_client(args.server_transport_data == nullptr),
147 stream_op_payload(context) {}
148
~grpc_callgrpc_call149 ~grpc_call() {
150 for (int i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
151 if (context[i].destroy) {
152 context[i].destroy(context[i].value);
153 }
154 }
155 gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
156 }
157
158 grpc_core::RefCount ext_ref;
159 grpc_core::Arena* arena;
160 grpc_core::CallCombiner call_combiner;
161 grpc_completion_queue* cq;
162 grpc_polling_entity pollent;
163 grpc_channel* channel;
164 gpr_cycle_counter start_time = gpr_get_cycle_counter();
165 /* parent_call* */ gpr_atm parent_call_atm = 0;
166 child_call* child = nullptr;
167
168 /* client or server call */
169 bool is_client;
170 /** has grpc_call_unref been called */
171 bool destroy_called = false;
172 /** flag indicating that cancellation is inherited */
173 bool cancellation_is_inherited = false;
174 // Trailers-only response status
175 bool is_trailers_only = false;
176 /** which ops are in-flight */
177 bool sent_initial_metadata = false;
178 bool sending_message = false;
179 bool sent_final_op = false;
180 bool received_initial_metadata = false;
181 bool receiving_message = false;
182 bool requested_final_op = false;
183 gpr_atm any_ops_sent_atm = 0;
184 gpr_atm received_final_op_atm = 0;
185
186 batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
187 grpc_transport_stream_op_batch_payload stream_op_payload;
188
189 /* first idx: is_receiving, second idx: is_trailing */
190 grpc_metadata_batch send_initial_metadata{arena};
191 grpc_metadata_batch send_trailing_metadata{arena};
192 grpc_metadata_batch recv_initial_metadata{arena};
193 grpc_metadata_batch recv_trailing_metadata{arena};
194
195 /* Buffered read metadata waiting to be returned to the application.
196 Element 0 is initial metadata, element 1 is trailing metadata. */
197 grpc_metadata_array* buffered_metadata[2] = {};
198
199 grpc_metadata compression_md;
200
201 // A char* indicating the peer name.
202 gpr_atm peer_string = 0;
203
204 /* Call data useful used for reporting. Only valid after the call has
205 * completed */
206 grpc_call_final_info final_info;
207
208 /* Compression algorithm for *incoming* data */
209 grpc_message_compression_algorithm incoming_message_compression_algorithm =
210 GRPC_MESSAGE_COMPRESS_NONE;
211 /* Stream compression algorithm for *incoming* data */
212 grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
213 GRPC_STREAM_COMPRESS_NONE;
214 /* Supported encodings (compression algorithms), a bitset.
215 * Always support no compression. */
216 uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
217 /* Supported stream encodings (stream compression algorithms), a bitset */
218 uint32_t stream_encodings_accepted_by_peer = 0;
219
220 /* Contexts for various subsystems (security, tracing, ...). */
221 grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};
222
223 /* for the client, extra metadata is initial metadata; for the
224 server, it's trailing metadata */
225 grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
226 int send_extra_metadata_count;
227 grpc_millis send_deadline;
228
229 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
230
231 grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
232 bool call_failed_before_recv_message = false;
233 grpc_byte_buffer** receiving_buffer = nullptr;
234 grpc_slice receiving_slice = grpc_empty_slice();
235 grpc_closure receiving_slice_ready;
236 grpc_closure receiving_stream_ready;
237 grpc_closure receiving_initial_metadata_ready;
238 grpc_closure receiving_trailing_metadata_ready;
239 uint32_t test_only_last_message_flags = 0;
240 // Status about operation of call
241 bool sent_server_trailing_metadata = false;
242 gpr_atm cancelled_with_error = 0;
243
244 grpc_closure release_call;
245
246 union {
247 struct {
248 grpc_status_code* status;
249 grpc_slice* status_details;
250 const char** error_string;
251 } client;
252 struct {
253 int* cancelled;
254 // backpointer to owning server if this is a server side call.
255 grpc_core::Server* core_server;
256 } server;
257 } final_op;
258 AtomicError status_error;
259
260 /* recv_state can contain one of the following values:
261 RECV_NONE : : no initial metadata and messages received
262 RECV_INITIAL_METADATA_FIRST : received initial metadata first
263 a batch_control* : received messages first
264
265 +------1------RECV_NONE------3-----+
266 | |
267 | |
268 v v
269 RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp
270 | ^ | ^
271 | | | |
272 +-----2-----+ +-----4-----+
273
274 For 1, 4: See receiving_initial_metadata_ready() function
275 For 2, 3: See receiving_stream_ready() function */
276 gpr_atm recv_state = 0;
277 };
278
// Trace flag registered under the name "call_error"; when enabled,
// set_final_status() logs the final error of each call.
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
// Trace flag registered under the name "compression".
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
281
// A grpc_call and its grpc_call_stack live in one allocation: the stack starts
// at the first aligned offset past the grpc_call (grpc_call_create() sizes the
// allocation the same way). These macros convert between the two by pointer
// arithmetic.
//
// The expansions are wrapped in an outer pair of parentheses so they compose
// correctly with postfix operators: without them,
// `CALL_STACK_FROM_CALL(c)->x` would apply `->x` before the cast.
#define CALL_STACK_FROM_CALL(call)    \
  ((grpc_call_stack*)((char*)(call) + \
                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call))))
#define CALL_FROM_CALL_STACK(call_stack) \
  ((grpc_call*)(((char*)(call_stack)) -  \
                GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call))))

// Element `idx` of the call stack that belongs to `call`.
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
// The grpc_call that owns the call stack whose top element is `top_elem`.
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
293
// Forward declarations of file-local (static) helpers.

// Sends `batch` down the filter stack via the call combiner, using the
// caller-provided `start_batch_closure` to enter the combiner.
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure);

// Cancellation helpers; cancel_with_status() builds an error from the status
// and description and forwards to cancel_with_error().
static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error_handle error);
// Destroy callback handed to grpc_call_stack_init() by grpc_call_create().
static void destroy_call(void* call_stack, grpc_error_handle error);
static void receiving_slice_ready(void* bctlp, grpc_error_handle error);
static void set_final_status(grpc_call* call, grpc_error_handle error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
306
add_init_error(grpc_error_handle * composite,grpc_error_handle new_err)307 static void add_init_error(grpc_error_handle* composite,
308 grpc_error_handle new_err) {
309 if (new_err == GRPC_ERROR_NONE) return;
310 if (*composite == GRPC_ERROR_NONE) {
311 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
312 }
313 *composite = grpc_error_add_child(*composite, new_err);
314 }
315
grpc_call_arena_alloc(grpc_call * call,size_t size)316 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
317 return call->arena->Alloc(size);
318 }
319
get_or_create_parent_call(grpc_call * call)320 static parent_call* get_or_create_parent_call(grpc_call* call) {
321 parent_call* p =
322 reinterpret_cast<parent_call*>(gpr_atm_acq_load(&call->parent_call_atm));
323 if (p == nullptr) {
324 p = call->arena->New<parent_call>();
325 if (!gpr_atm_rel_cas(&call->parent_call_atm,
326 reinterpret_cast<gpr_atm>(nullptr),
327 reinterpret_cast<gpr_atm>(p))) {
328 p->~parent_call();
329 p = reinterpret_cast<parent_call*>(
330 gpr_atm_acq_load(&call->parent_call_atm));
331 }
332 }
333 return p;
334 }
335
get_parent_call(grpc_call * call)336 static parent_call* get_parent_call(grpc_call* call) {
337 return reinterpret_cast<parent_call*>(
338 gpr_atm_acq_load(&call->parent_call_atm));
339 }
340
grpc_call_get_initial_size_estimate()341 size_t grpc_call_get_initial_size_estimate() {
342 return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
343 sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
344 }
345
/** Creates a new call on args->channel and stores it in *out_call.

    The grpc_call, its call stack and (when args->parent is set) a child_call
    record are carved out of a single allocation made together with the call's
    arena. Problems found during setup are accumulated with add_init_error();
    if any occurred the call is cancelled with that error and the error is
    returned. *out_call is assigned in every case.

    \param args     creation parameters (channel, optional parent, cq or
                    pollset_set alternative, extra initial metadata, deadline)
    \param out_call receives the newly constructed call
    \return GRPC_ERROR_NONE on success, otherwise the accumulated error */
grpc_error_handle grpc_call_create(const grpc_call_create_args* args,
                                   grpc_call** out_call) {
  GPR_TIMER_SCOPE("grpc_call_create", 0);

  // Matched by the "call" unref in release_call().
  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");

  grpc_core::Arena* arena;
  grpc_call* call;
  grpc_error_handle error = GRPC_ERROR_NONE;
  grpc_channel_stack* channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
  GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
  // Layout of the allocation: [grpc_call (aligned)][call stack][child_call?].
  size_t call_and_stack_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
      channel_stack->call_stack_size;
  size_t call_alloc_size =
      call_and_stack_size + (args->parent ? sizeof(child_call) : 0);

  std::pair<grpc_core::Arena*, void*> arena_with_call =
      grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
  arena = arena_with_call.first;
  call = new (arena_with_call.second) grpc_call(arena, *args);
  *out_call = call;
  // Holds its own ref on the value of the path metadata element, if one is
  // found below; released at the end of this function.
  grpc_slice path = grpc_empty_slice();
  if (call->is_client) {
    call->final_op.client.status_details = nullptr;
    call->final_op.client.status = nullptr;
    call->final_op.client.error_string = nullptr;
    GRPC_STATS_INC_CLIENT_CALLS_CREATED();
    // NOTE(review): the strict '<' leaves at least one slot of
    // send_extra_metadata unused here -- confirm that is intentional.
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      if (grpc_slice_eq_static_interned(
              GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
        path = grpc_slice_ref_internal(
            GRPC_MDVALUE(args->add_initial_metadata[i]));
      }
    }
    call->send_extra_metadata_count =
        static_cast<int>(args->add_initial_metadata_count);
  } else {
    GRPC_STATS_INC_SERVER_CALLS_CREATED();
    call->final_op.server.cancelled = nullptr;
    call->final_op.server.core_server = args->server;
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }

  grpc_millis send_deadline = args->send_deadline;
  bool immediately_cancel = false;

  if (args->parent != nullptr) {
    // The child_call record lives in the tail of the call's allocation.
    call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
                       call_and_stack_size) child_call(args->parent);

    // Matched by the "child" unref in grpc_call_unref().
    GRPC_CALL_INTERNAL_REF(args->parent, "child");
    // Only a client call may be the child, and only of a server call.
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent->is_client);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      send_deadline = std::min(send_deadline, args->parent->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
        add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Census tracing propagation requested "
                                   "without Census context propagation"));
      }
      // The parent's tracing context value is shared with the child; no
      // destroy hook is registered for it on the child.
      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
                            args->parent->context[GRPC_CONTEXT_TRACING].value,
                            nullptr);
    } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
      add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                 "Census context propagation requested "
                                 "without Census tracing propagation"));
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = true;
      // If the parent has already received its final op, cancel this call
      // right away (done further below, once the call stack exists).
      if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
        immediately_cancel = true;
      }
    }
  }
  call->send_deadline = send_deadline;
  /* initial refcount dropped by grpc_call_unref */
  grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
                                      args->server_transport_data,
                                      call->context,
                                      path,
                                      call->start_time,
                                      send_deadline,
                                      call->arena,
                                      &call->call_combiner};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (args->parent != nullptr) {
    child_call* cc = call->child;
    parent_call* pc = get_or_create_parent_call(args->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (pc->first_child == nullptr) {
      // First child: it forms a one-element circular list.
      pc->first_child = call;
      cc->sibling_next = cc->sibling_prev = call;
    } else {
      // Splice the new call in just before the head, i.e. at the tail of the
      // circular list.
      cc->sibling_next = pc->first_child;
      cc->sibling_prev = pc->first_child->child->sibling_prev;
      cc->sibling_next->child->sibling_prev =
          cc->sibling_prev->child->sibling_next = call;
    }
    gpr_mu_unlock(&pc->child_list_mu);
  }

  if (error != GRPC_ERROR_NONE) {
    // cancel_with_error() consumes a ref, so take one; `error` itself is
    // returned to the caller.
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  if (immediately_cancel) {
    cancel_with_error(call, GRPC_ERROR_CANCELLED);
  }
  if (args->cq != nullptr) {
    GPR_ASSERT(args->pollset_set_alternative == nullptr &&
               "Only one of 'cq' and 'pollset_set_alternative' should be "
               "non-nullptr.");
    // Matched by the "bind" unref in destroy_call().
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                               &call->pollent);
  }

  // Record the start of the call with channelz: on the channel node for
  // client calls, on the server node for server calls (when available).
  if (call->is_client) {
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else if (call->final_op.server.core_server != nullptr) {
    grpc_core::channelz::ServerNode* channelz_node =
        call->final_op.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      channelz_node->RecordCallStarted();
    }
  }

  // Drop the ref taken on the path slice above.
  grpc_slice_unref_internal(path);

  return error;
}
505
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)506 void grpc_call_set_completion_queue(grpc_call* call,
507 grpc_completion_queue* cq) {
508 GPR_ASSERT(cq);
509
510 if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
511 gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
512 abort();
513 }
514 call->cq = cq;
515 GRPC_CQ_INTERNAL_REF(cq, "bind");
516 call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
517 grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
518 &call->pollent);
519 }
520
// When NDEBUG is not defined, grpc_call_internal_ref/unref take an extra
// `reason` string (REF_ARG) and pass it on as REF_REASON; with NDEBUG defined
// the parameter is omitted and an empty string is passed instead.
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
grpc_call_internal_ref(grpc_call * c REF_ARG)528 void grpc_call_internal_ref(grpc_call* c REF_ARG) {
529 GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
530 }
grpc_call_internal_unref(grpc_call * c REF_ARG)531 void grpc_call_internal_unref(grpc_call* c REF_ARG) {
532 GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
533 }
534
release_call(void * call,grpc_error_handle)535 static void release_call(void* call, grpc_error_handle /*error*/) {
536 grpc_call* c = static_cast<grpc_call*>(call);
537 grpc_channel* channel = c->channel;
538 grpc_core::Arena* arena = c->arena;
539 c->~grpc_call();
540 grpc_channel_update_call_size_estimate(channel, arena->Destroy());
541 GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
542 }
543
destroy_call(void * call,grpc_error_handle)544 static void destroy_call(void* call, grpc_error_handle /*error*/) {
545 GPR_TIMER_SCOPE("destroy_call", 0);
546 grpc_call* c = static_cast<grpc_call*>(call);
547 c->recv_initial_metadata.Clear();
548 c->recv_trailing_metadata.Clear();
549 c->receiving_stream.reset();
550 parent_call* pc = get_parent_call(c);
551 if (pc != nullptr) {
552 pc->~parent_call();
553 }
554 for (int i = 0; i < c->send_extra_metadata_count; i++) {
555 GRPC_MDELEM_UNREF(c->send_extra_metadata[i].md);
556 }
557 if (c->cq) {
558 GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
559 }
560
561 grpc_error_handle status_error = c->status_error.get();
562 grpc_error_get_status(status_error, c->send_deadline,
563 &c->final_info.final_status, nullptr, nullptr,
564 &(c->final_info.error_string));
565 c->status_error.set(GRPC_ERROR_NONE);
566 c->final_info.stats.latency =
567 gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
568 grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
569 GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
570 grpc_schedule_on_exec_ctx));
571 }
572
grpc_call_ref(grpc_call * c)573 void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
574
grpc_call_unref(grpc_call * c)575 void grpc_call_unref(grpc_call* c) {
576 if (GPR_LIKELY(!c->ext_ref.Unref())) return;
577
578 GPR_TIMER_SCOPE("grpc_call_unref", 0);
579
580 child_call* cc = c->child;
581 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
582 grpc_core::ExecCtx exec_ctx;
583
584 GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
585
586 if (cc) {
587 parent_call* pc = get_parent_call(cc->parent);
588 gpr_mu_lock(&pc->child_list_mu);
589 if (c == pc->first_child) {
590 pc->first_child = cc->sibling_next;
591 if (c == pc->first_child) {
592 pc->first_child = nullptr;
593 }
594 }
595 cc->sibling_prev->child->sibling_next = cc->sibling_next;
596 cc->sibling_next->child->sibling_prev = cc->sibling_prev;
597 gpr_mu_unlock(&pc->child_list_mu);
598 GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
599 }
600
601 GPR_ASSERT(!c->destroy_called);
602 c->destroy_called = true;
603 bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
604 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
605 if (cancel) {
606 cancel_with_error(c, GRPC_ERROR_CANCELLED);
607 } else {
608 // Unset the call combiner cancellation closure. This has the
609 // effect of scheduling the previously set cancellation closure, if
610 // any, so that it can release any internal references it may be
611 // holding to the call stack.
612 c->call_combiner.SetNotifyOnCancel(nullptr);
613 }
614 GRPC_CALL_INTERNAL_UNREF(c, "destroy");
615 }
616
grpc_call_cancel(grpc_call * call,void * reserved)617 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
618 GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
619 GPR_ASSERT(!reserved);
620 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
621 grpc_core::ExecCtx exec_ctx;
622 cancel_with_error(call, GRPC_ERROR_CANCELLED);
623 return GRPC_CALL_OK;
624 }
625
626 // This is called via the call combiner to start sending a batch down
627 // the filter stack.
execute_batch_in_call_combiner(void * arg,grpc_error_handle)628 static void execute_batch_in_call_combiner(void* arg,
629 grpc_error_handle /*ignored*/) {
630 GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
631 grpc_transport_stream_op_batch* batch =
632 static_cast<grpc_transport_stream_op_batch*>(arg);
633 grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
634 grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
635 GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
636 elem->filter->start_transport_stream_op_batch(elem, batch);
637 }
638
639 // start_batch_closure points to a caller-allocated closure to be used
640 // for entering the call combiner.
execute_batch(grpc_call * call,grpc_transport_stream_op_batch * batch,grpc_closure * start_batch_closure)641 static void execute_batch(grpc_call* call,
642 grpc_transport_stream_op_batch* batch,
643 grpc_closure* start_batch_closure) {
644 batch->handler_private.extra_arg = call;
645 GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
646 grpc_schedule_on_exec_ctx);
647 GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
648 GRPC_ERROR_NONE, "executing batch");
649 }
650
grpc_call_get_peer(grpc_call * call)651 char* grpc_call_get_peer(grpc_call* call) {
652 char* peer_string =
653 reinterpret_cast<char*>(gpr_atm_acq_load(&call->peer_string));
654 if (peer_string != nullptr) return gpr_strdup(peer_string);
655 peer_string = grpc_channel_get_target(call->channel);
656 if (peer_string != nullptr) return peer_string;
657 return gpr_strdup("unknown");
658 }
659
grpc_call_from_top_element(grpc_call_element * surface_element)660 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
661 return CALL_FROM_TOP_ELEM(surface_element);
662 }
663
664 /*******************************************************************************
665 * CANCELLATION
666 */
667
grpc_call_cancel_with_status(grpc_call * c,grpc_status_code status,const char * description,void * reserved)668 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
669 grpc_status_code status,
670 const char* description,
671 void* reserved) {
672 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
673 grpc_core::ExecCtx exec_ctx;
674 GRPC_API_TRACE(
675 "grpc_call_cancel_with_status("
676 "c=%p, status=%d, description=%s, reserved=%p)",
677 4, (c, (int)status, description, reserved));
678 GPR_ASSERT(reserved == nullptr);
679 cancel_with_status(c, status, description);
680 return GRPC_CALL_OK;
681 }
682
// State for one cancel_stream batch. Allocated with gpr_malloc() in
// cancel_with_error() and freed by done_termination().
struct cancel_state {
  // The call being cancelled.
  grpc_call* call;
  // Closure passed to execute_batch() for entering the call combiner.
  grpc_closure start_batch;
  // on_complete closure of the cancel batch; it runs done_termination().
  grpc_closure finish_batch;
};
688 // The on_complete callback used when sending a cancel_stream batch down
689 // the filter stack. Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error_handle)690 static void done_termination(void* arg, grpc_error_handle /*error*/) {
691 cancel_state* state = static_cast<cancel_state*>(arg);
692 GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
693 "on_complete for cancel_stream op");
694 GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
695 gpr_free(state);
696 }
697
cancel_with_error(grpc_call * c,grpc_error_handle error)698 static void cancel_with_error(grpc_call* c, grpc_error_handle error) {
699 if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
700 GRPC_ERROR_UNREF(error);
701 return;
702 }
703 GRPC_CALL_INTERNAL_REF(c, "termination");
704 // Inform the call combiner of the cancellation, so that it can cancel
705 // any in-flight asynchronous actions that may be holding the call
706 // combiner. This ensures that the cancel_stream batch can be sent
707 // down the filter stack in a timely manner.
708 c->call_combiner.Cancel(GRPC_ERROR_REF(error));
709 cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
710 state->call = c;
711 GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
712 grpc_schedule_on_exec_ctx);
713 grpc_transport_stream_op_batch* op =
714 grpc_make_transport_stream_op(&state->finish_batch);
715 op->cancel_stream = true;
716 op->payload->cancel_stream.cancel_error = error;
717 execute_batch(c, op, &state->start_batch);
718 }
719
grpc_call_cancel_internal(grpc_call * call)720 void grpc_call_cancel_internal(grpc_call* call) {
721 cancel_with_error(call, GRPC_ERROR_CANCELLED);
722 }
723
error_from_status(grpc_status_code status,const char * description)724 static grpc_error_handle error_from_status(grpc_status_code status,
725 const char* description) {
726 // copying 'description' is needed to ensure the grpc_call_cancel_with_status
727 // guarantee that can be short-lived.
728 return grpc_error_set_int(
729 grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
730 GRPC_ERROR_STR_GRPC_MESSAGE, description),
731 GRPC_ERROR_INT_GRPC_STATUS, status);
732 }
733
cancel_with_status(grpc_call * c,grpc_status_code status,const char * description)734 static void cancel_with_status(grpc_call* c, grpc_status_code status,
735 const char* description) {
736 cancel_with_error(c, error_from_status(status, description));
737 }
738
set_final_status(grpc_call * call,grpc_error_handle error)739 static void set_final_status(grpc_call* call, grpc_error_handle error) {
740 if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
741 gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
742 gpr_log(GPR_DEBUG, "%s", grpc_error_std_string(error).c_str());
743 }
744 if (call->is_client) {
745 std::string status_details;
746 grpc_error_get_status(error, call->send_deadline,
747 call->final_op.client.status, &status_details,
748 nullptr, call->final_op.client.error_string);
749 *call->final_op.client.status_details =
750 grpc_slice_from_cpp_string(std::move(status_details));
751 call->status_error.set(error);
752 GRPC_ERROR_UNREF(error);
753 grpc_core::channelz::ChannelNode* channelz_channel =
754 grpc_channel_get_channelz_node(call->channel);
755 if (channelz_channel != nullptr) {
756 if (*call->final_op.client.status != GRPC_STATUS_OK) {
757 channelz_channel->RecordCallFailed();
758 } else {
759 channelz_channel->RecordCallSucceeded();
760 }
761 }
762 } else {
763 *call->final_op.server.cancelled =
764 error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
765 grpc_core::channelz::ServerNode* channelz_node =
766 call->final_op.server.core_server->channelz_node();
767 if (channelz_node != nullptr) {
768 if (*call->final_op.server.cancelled || !call->status_error.ok()) {
769 channelz_node->RecordCallFailed();
770 } else {
771 channelz_node->RecordCallSucceeded();
772 }
773 }
774 GRPC_ERROR_UNREF(error);
775 }
776 }
777
778 /*******************************************************************************
779 * COMPRESSION
780 */
781
set_incoming_message_compression_algorithm(grpc_call * call,grpc_message_compression_algorithm algo)782 static void set_incoming_message_compression_algorithm(
783 grpc_call* call, grpc_message_compression_algorithm algo) {
784 GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
785 call->incoming_message_compression_algorithm = algo;
786 }
787
set_incoming_stream_compression_algorithm(grpc_call * call,grpc_stream_compression_algorithm algo)788 static void set_incoming_stream_compression_algorithm(
789 grpc_call* call, grpc_stream_compression_algorithm algo) {
790 GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
791 call->incoming_stream_compression_algorithm = algo;
792 }
793
grpc_call_test_only_get_compression_algorithm(grpc_call * call)794 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
795 grpc_call* call) {
796 grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
797 grpc_compression_algorithm_from_message_stream_compression_algorithm(
798 &algorithm, call->incoming_message_compression_algorithm,
799 call->incoming_stream_compression_algorithm);
800 return algorithm;
801 }
802
compression_algorithm_for_level_locked(grpc_call * call,grpc_compression_level level)803 static grpc_compression_algorithm compression_algorithm_for_level_locked(
804 grpc_call* call, grpc_compression_level level) {
805 return grpc_compression_algorithm_for_level(level,
806 call->encodings_accepted_by_peer);
807 }
808
grpc_call_test_only_get_message_flags(grpc_call * call)809 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
810 uint32_t flags;
811 flags = call->test_only_last_message_flags;
812 return flags;
813 }
814
/* No-op destructor for the mdelem user data written by
 * set_encodings_accepted_by_peer(). That user data is an integer packed into
 * the pointer value, so there is nothing to release. */
static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
816
set_encodings_accepted_by_peer(grpc_call *,grpc_mdelem mdel,uint32_t * encodings_accepted_by_peer,bool stream_encoding)817 static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
818 grpc_mdelem mdel,
819 uint32_t* encodings_accepted_by_peer,
820 bool stream_encoding) {
821 size_t i;
822 uint32_t algorithm;
823 grpc_slice_buffer accept_encoding_parts;
824 grpc_slice accept_encoding_slice;
825 void* accepted_user_data;
826
827 accepted_user_data =
828 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
829 if (accepted_user_data != nullptr) {
830 *encodings_accepted_by_peer = static_cast<uint32_t>(
831 reinterpret_cast<uintptr_t>(accepted_user_data) - 1);
832 return;
833 }
834
835 *encodings_accepted_by_peer = 0;
836
837 accept_encoding_slice = GRPC_MDVALUE(mdel);
838 grpc_slice_buffer_init(&accept_encoding_parts);
839 grpc_slice_split_without_space(accept_encoding_slice, ",",
840 &accept_encoding_parts);
841
842 grpc_core::SetBit(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
843 for (i = 0; i < accept_encoding_parts.count; i++) {
844 int r;
845 grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
846 if (!stream_encoding) {
847 r = grpc_message_compression_algorithm_parse(
848 accept_encoding_entry_slice,
849 reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
850 } else {
851 r = grpc_stream_compression_algorithm_parse(
852 accept_encoding_entry_slice,
853 reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
854 }
855 if (r) {
856 grpc_core::SetBit(encodings_accepted_by_peer, algorithm);
857 } else {
858 char* accept_encoding_entry_str =
859 grpc_slice_to_c_string(accept_encoding_entry_slice);
860 gpr_log(GPR_DEBUG,
861 "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
862 accept_encoding_entry_str);
863 gpr_free(accept_encoding_entry_str);
864 }
865 }
866
867 grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
868
869 grpc_mdelem_set_user_data(
870 mdel, destroy_encodings_accepted_by_peer,
871 reinterpret_cast<void*>(
872 static_cast<uintptr_t>(*encodings_accepted_by_peer) + 1));
873 }
874
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)875 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
876 uint32_t encodings_accepted_by_peer;
877 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
878 return encodings_accepted_by_peer;
879 }
880
881 grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call * call)882 grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
883 return call->incoming_stream_compression_algorithm;
884 }
885
linked_from_md(grpc_metadata * md)886 static grpc_linked_mdelem* linked_from_md(grpc_metadata* md) {
887 return reinterpret_cast<grpc_linked_mdelem*>(&md->internal_data);
888 }
889
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)890 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
891 grpc_metadata* additional_metadata, int i,
892 int count) {
893 grpc_metadata* res =
894 i < count ? &metadata[i] : &additional_metadata[i - count];
895 GPR_ASSERT(res);
896 return res;
897 }
898
prepare_application_metadata(grpc_call * call,int count,grpc_metadata * metadata,int is_trailing,int prepend_extra_metadata,grpc_metadata * additional_metadata,int additional_metadata_count)899 static int prepare_application_metadata(grpc_call* call, int count,
900 grpc_metadata* metadata,
901 int is_trailing,
902 int prepend_extra_metadata,
903 grpc_metadata* additional_metadata,
904 int additional_metadata_count) {
905 int total_count = count + additional_metadata_count;
906 int i;
907 grpc_metadata_batch* batch = is_trailing ? &call->send_trailing_metadata
908 : &call->send_initial_metadata;
909 for (i = 0; i < total_count; i++) {
910 grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
911 grpc_linked_mdelem* l = linked_from_md(md);
912 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
913 if (!GRPC_LOG_IF_ERROR("validate_metadata",
914 grpc_validate_header_key_is_legal(md->key))) {
915 break;
916 } else if (!grpc_is_binary_header_internal(md->key) &&
917 !GRPC_LOG_IF_ERROR(
918 "validate_metadata",
919 grpc_validate_header_nonbin_value_is_legal(md->value))) {
920 break;
921 } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
922 // HTTP2 hpack encoding has a maximum limit.
923 break;
924 }
925 l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
926 }
927 if (i != total_count) {
928 for (int j = 0; j < i; j++) {
929 grpc_metadata* md = get_md_elem(metadata, additional_metadata, j, count);
930 grpc_linked_mdelem* l = linked_from_md(md);
931 GRPC_MDELEM_UNREF(l->md);
932 }
933 return 0;
934 }
935 if (prepend_extra_metadata) {
936 if (call->send_extra_metadata_count == 0) {
937 prepend_extra_metadata = 0;
938 } else {
939 for (i = 0; i < call->send_extra_metadata_count; i++) {
940 GRPC_LOG_IF_ERROR("prepare_application_metadata",
941 batch->LinkTail(&call->send_extra_metadata[i]));
942 }
943 }
944 }
945 for (i = 0; i < total_count; i++) {
946 grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
947 grpc_linked_mdelem* l = linked_from_md(md);
948 grpc_error_handle error = batch->LinkTail(l);
949 if (error != GRPC_ERROR_NONE) {
950 GRPC_MDELEM_UNREF(l->md);
951 }
952 GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
953 }
954 call->send_extra_metadata_count = 0;
955
956 return 1;
957 }
958
decode_message_compression(grpc_mdelem md)959 static grpc_message_compression_algorithm decode_message_compression(
960 grpc_mdelem md) {
961 grpc_message_compression_algorithm algorithm =
962 grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
963 if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
964 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
965 gpr_log(GPR_ERROR,
966 "Invalid incoming message compression algorithm: '%s'. "
967 "Interpreting incoming data as uncompressed.",
968 md_c_str);
969 gpr_free(md_c_str);
970 return GRPC_MESSAGE_COMPRESS_NONE;
971 }
972 return algorithm;
973 }
974
decode_stream_compression(grpc_mdelem md)975 static grpc_stream_compression_algorithm decode_stream_compression(
976 grpc_mdelem md) {
977 grpc_stream_compression_algorithm algorithm =
978 grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
979 if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
980 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
981 gpr_log(GPR_ERROR,
982 "Invalid incoming stream compression algorithm: '%s'. Interpreting "
983 "incoming data as uncompressed.",
984 md_c_str);
985 gpr_free(md_c_str);
986 return GRPC_STREAM_COMPRESS_NONE;
987 }
988 return algorithm;
989 }
990
publish_app_metadata(grpc_call * call,grpc_metadata_batch * b,int is_trailing)991 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
992 int is_trailing) {
993 if (b->non_deadline_count() == 0) return;
994 if (!call->is_client && is_trailing) return;
995 if (is_trailing && call->buffered_metadata[1] == nullptr) return;
996 GPR_TIMER_SCOPE("publish_app_metadata", 0);
997 grpc_metadata_array* dest;
998 grpc_metadata* mdusr;
999 dest = call->buffered_metadata[is_trailing];
1000 if (dest->count + b->non_deadline_count() > dest->capacity) {
1001 dest->capacity = std::max(dest->capacity + b->non_deadline_count(),
1002 dest->capacity * 3 / 2);
1003 dest->metadata = static_cast<grpc_metadata*>(
1004 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
1005 }
1006 b->ForEach([&](grpc_mdelem md) {
1007 mdusr = &dest->metadata[dest->count++];
1008 /* we pass back borrowed slices that are valid whilst the call is valid */
1009 mdusr->key = GRPC_MDKEY(md);
1010 mdusr->value = GRPC_MDVALUE(md);
1011 });
1012 }
1013
recv_initial_filter(grpc_call * call,grpc_metadata_batch * b)1014 static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
1015 if (b->legacy_index()->named.content_encoding != nullptr) {
1016 GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
1017 set_incoming_stream_compression_algorithm(
1018 call, decode_stream_compression(
1019 b->legacy_index()->named.content_encoding->md));
1020 b->Remove(GRPC_BATCH_CONTENT_ENCODING);
1021 }
1022 if (b->legacy_index()->named.grpc_encoding != nullptr) {
1023 GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
1024 set_incoming_message_compression_algorithm(
1025 call,
1026 decode_message_compression(b->legacy_index()->named.grpc_encoding->md));
1027 b->Remove(GRPC_BATCH_GRPC_ENCODING);
1028 }
1029 uint32_t message_encodings_accepted_by_peer = 1u;
1030 uint32_t stream_encodings_accepted_by_peer = 1u;
1031 if (b->legacy_index()->named.grpc_accept_encoding != nullptr) {
1032 GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
1033 set_encodings_accepted_by_peer(
1034 call, b->legacy_index()->named.grpc_accept_encoding->md,
1035 &message_encodings_accepted_by_peer, false);
1036 b->Remove(GRPC_BATCH_GRPC_ACCEPT_ENCODING);
1037 }
1038 if (b->legacy_index()->named.accept_encoding != nullptr) {
1039 GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
1040 set_encodings_accepted_by_peer(call,
1041 b->legacy_index()->named.accept_encoding->md,
1042 &stream_encodings_accepted_by_peer, true);
1043 b->Remove(GRPC_BATCH_ACCEPT_ENCODING);
1044 }
1045 call->encodings_accepted_by_peer =
1046 grpc_compression_bitset_from_message_stream_compression_bitset(
1047 message_encodings_accepted_by_peer,
1048 stream_encodings_accepted_by_peer);
1049 publish_app_metadata(call, b, false);
1050 }
1051
recv_trailing_filter(void * args,grpc_metadata_batch * b,grpc_error_handle batch_error)1052 static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
1053 grpc_error_handle batch_error) {
1054 grpc_call* call = static_cast<grpc_call*>(args);
1055 if (batch_error != GRPC_ERROR_NONE) {
1056 set_final_status(call, batch_error);
1057 } else if (b->legacy_index()->named.grpc_status != nullptr) {
1058 grpc_status_code status_code = grpc_get_status_code_from_metadata(
1059 b->legacy_index()->named.grpc_status->md);
1060 grpc_error_handle error = GRPC_ERROR_NONE;
1061 if (status_code != GRPC_STATUS_OK) {
1062 char* peer = grpc_call_get_peer(call);
1063 error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
1064 "Error received from peer ", peer)),
1065 GRPC_ERROR_INT_GRPC_STATUS,
1066 static_cast<intptr_t>(status_code));
1067 gpr_free(peer);
1068 }
1069 if (b->legacy_index()->named.grpc_message != nullptr) {
1070 error = grpc_error_set_str(
1071 error, GRPC_ERROR_STR_GRPC_MESSAGE,
1072 grpc_core::StringViewFromSlice(
1073 GRPC_MDVALUE(b->legacy_index()->named.grpc_message->md)));
1074 b->Remove(GRPC_BATCH_GRPC_MESSAGE);
1075 } else if (error != GRPC_ERROR_NONE) {
1076 error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, "");
1077 }
1078 set_final_status(call, GRPC_ERROR_REF(error));
1079 b->Remove(GRPC_BATCH_GRPC_STATUS);
1080 GRPC_ERROR_UNREF(error);
1081 } else if (!call->is_client) {
1082 set_final_status(call, GRPC_ERROR_NONE);
1083 } else {
1084 gpr_log(GPR_DEBUG,
1085 "Received trailing metadata with no error and no status");
1086 set_final_status(
1087 call, grpc_error_set_int(
1088 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
1089 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
1090 }
1091 publish_app_metadata(call, b, true);
1092 }
1093
grpc_call_get_arena(grpc_call * call)1094 grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1095
grpc_call_get_call_stack(grpc_call * call)1096 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
1097 return CALL_STACK_FROM_CALL(call);
1098 }
1099
1100 /*******************************************************************************
1101 * BATCH API IMPLEMENTATION
1102 */
1103
are_write_flags_valid(uint32_t flags)1104 static bool are_write_flags_valid(uint32_t flags) {
1105 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1106 const uint32_t allowed_write_positions =
1107 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1108 const uint32_t invalid_positions = ~allowed_write_positions;
1109 return !(flags & invalid_positions);
1110 }
1111
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1112 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1113 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1114 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1115 if (!is_client) {
1116 invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1117 }
1118 return !(flags & invalid_positions);
1119 }
1120
batch_slot_for_op(grpc_op_type type)1121 static size_t batch_slot_for_op(grpc_op_type type) {
1122 switch (type) {
1123 case GRPC_OP_SEND_INITIAL_METADATA:
1124 return 0;
1125 case GRPC_OP_SEND_MESSAGE:
1126 return 1;
1127 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1128 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1129 return 2;
1130 case GRPC_OP_RECV_INITIAL_METADATA:
1131 return 3;
1132 case GRPC_OP_RECV_MESSAGE:
1133 return 4;
1134 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1135 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1136 return 5;
1137 }
1138 GPR_UNREACHABLE_CODE(return 123456789);
1139 }
1140
reuse_or_allocate_batch_control(grpc_call * call,const grpc_op * ops)1141 static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
1142 const grpc_op* ops) {
1143 size_t slot_idx = batch_slot_for_op(ops[0].op);
1144 batch_control** pslot = &call->active_batches[slot_idx];
1145 batch_control* bctl;
1146 if (*pslot != nullptr) {
1147 bctl = *pslot;
1148 if (bctl->call != nullptr) {
1149 return nullptr;
1150 }
1151 bctl->~batch_control();
1152 bctl->op = {};
1153 new (&bctl->batch_error) AtomicError();
1154 } else {
1155 bctl = call->arena->New<batch_control>();
1156 *pslot = bctl;
1157 }
1158 bctl->call = call;
1159 bctl->op.payload = &call->stream_op_payload;
1160 return bctl;
1161 }
1162
finish_batch_completion(void * user_data,grpc_cq_completion *)1163 static void finish_batch_completion(void* user_data,
1164 grpc_cq_completion* /*storage*/) {
1165 batch_control* bctl = static_cast<batch_control*>(user_data);
1166 grpc_call* call = bctl->call;
1167 bctl->call = nullptr;
1168 GRPC_CALL_INTERNAL_UNREF(call, "completion");
1169 }
1170
reset_batch_errors(batch_control * bctl)1171 static void reset_batch_errors(batch_control* bctl) {
1172 bctl->batch_error.set(GRPC_ERROR_NONE);
1173 }
1174
post_batch_completion(batch_control * bctl)1175 static void post_batch_completion(batch_control* bctl) {
1176 grpc_call* next_child_call;
1177 grpc_call* call = bctl->call;
1178 grpc_error_handle error = GRPC_ERROR_REF(bctl->batch_error.get());
1179
1180 if (bctl->op.send_initial_metadata) {
1181 call->send_initial_metadata.Clear();
1182 }
1183 if (bctl->op.send_message) {
1184 if (bctl->op.payload->send_message.stream_write_closed &&
1185 error == GRPC_ERROR_NONE) {
1186 error = grpc_error_add_child(
1187 error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1188 "Attempt to send message after stream was closed."));
1189 }
1190 call->sending_message = false;
1191 }
1192 if (bctl->op.send_trailing_metadata) {
1193 call->send_trailing_metadata.Clear();
1194 }
1195 if (bctl->op.recv_trailing_metadata) {
1196 /* propagate cancellation to any interested children */
1197 gpr_atm_rel_store(&call->received_final_op_atm, 1);
1198 parent_call* pc = get_parent_call(call);
1199 if (pc != nullptr) {
1200 grpc_call* child;
1201 gpr_mu_lock(&pc->child_list_mu);
1202 child = pc->first_child;
1203 if (child != nullptr) {
1204 do {
1205 next_child_call = child->child->sibling_next;
1206 if (child->cancellation_is_inherited) {
1207 GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1208 cancel_with_error(child, GRPC_ERROR_CANCELLED);
1209 GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
1210 }
1211 child = next_child_call;
1212 } while (child != pc->first_child);
1213 }
1214 gpr_mu_unlock(&pc->child_list_mu);
1215 }
1216 GRPC_ERROR_UNREF(error);
1217 error = GRPC_ERROR_NONE;
1218 }
1219 if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
1220 *call->receiving_buffer != nullptr) {
1221 grpc_byte_buffer_destroy(*call->receiving_buffer);
1222 *call->receiving_buffer = nullptr;
1223 }
1224 reset_batch_errors(bctl);
1225
1226 if (bctl->completion_data.notify_tag.is_closure) {
1227 /* unrefs error */
1228 bctl->call = nullptr;
1229 grpc_core::Closure::Run(
1230 DEBUG_LOCATION,
1231 static_cast<grpc_closure*>(bctl->completion_data.notify_tag.tag),
1232 error);
1233 GRPC_CALL_INTERNAL_UNREF(call, "completion");
1234 } else {
1235 /* unrefs error */
1236 grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1237 finish_batch_completion, bctl,
1238 &bctl->completion_data.cq_completion);
1239 }
1240 }
1241
finish_batch_step(batch_control * bctl)1242 static void finish_batch_step(batch_control* bctl) {
1243 if (GPR_UNLIKELY(bctl->completed_batch_step())) {
1244 post_batch_completion(bctl);
1245 }
1246 }
1247
continue_receiving_slices(batch_control * bctl)1248 static void continue_receiving_slices(batch_control* bctl) {
1249 grpc_error_handle error;
1250 grpc_call* call = bctl->call;
1251 for (;;) {
1252 size_t remaining = call->receiving_stream->length() -
1253 (*call->receiving_buffer)->data.raw.slice_buffer.length;
1254 if (remaining == 0) {
1255 call->receiving_message = false;
1256 call->receiving_stream.reset();
1257 finish_batch_step(bctl);
1258 return;
1259 }
1260 if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
1261 error = call->receiving_stream->Pull(&call->receiving_slice);
1262 if (error == GRPC_ERROR_NONE) {
1263 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1264 call->receiving_slice);
1265 } else {
1266 call->receiving_stream.reset();
1267 grpc_byte_buffer_destroy(*call->receiving_buffer);
1268 *call->receiving_buffer = nullptr;
1269 call->receiving_message = false;
1270 finish_batch_step(bctl);
1271 GRPC_ERROR_UNREF(error);
1272 return;
1273 }
1274 } else {
1275 return;
1276 }
1277 }
1278 }
1279
receiving_slice_ready(void * bctlp,grpc_error_handle error)1280 static void receiving_slice_ready(void* bctlp, grpc_error_handle error) {
1281 batch_control* bctl = static_cast<batch_control*>(bctlp);
1282 grpc_call* call = bctl->call;
1283 bool release_error = false;
1284
1285 if (error == GRPC_ERROR_NONE) {
1286 grpc_slice slice;
1287 error = call->receiving_stream->Pull(&slice);
1288 if (error == GRPC_ERROR_NONE) {
1289 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1290 slice);
1291 continue_receiving_slices(bctl);
1292 } else {
1293 /* Error returned by ByteStream::Pull() needs to be released manually */
1294 release_error = true;
1295 }
1296 }
1297
1298 if (error != GRPC_ERROR_NONE) {
1299 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
1300 GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1301 }
1302 call->receiving_stream.reset();
1303 grpc_byte_buffer_destroy(*call->receiving_buffer);
1304 *call->receiving_buffer = nullptr;
1305 call->receiving_message = false;
1306 finish_batch_step(bctl);
1307 if (release_error) {
1308 GRPC_ERROR_UNREF(error);
1309 }
1310 }
1311 }
1312
process_data_after_md(batch_control * bctl)1313 static void process_data_after_md(batch_control* bctl) {
1314 grpc_call* call = bctl->call;
1315 if (call->receiving_stream == nullptr) {
1316 *call->receiving_buffer = nullptr;
1317 call->receiving_message = false;
1318 finish_batch_step(bctl);
1319 } else {
1320 call->test_only_last_message_flags = call->receiving_stream->flags();
1321 if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
1322 (call->incoming_message_compression_algorithm >
1323 GRPC_MESSAGE_COMPRESS_NONE)) {
1324 grpc_compression_algorithm algo;
1325 GPR_ASSERT(
1326 grpc_compression_algorithm_from_message_stream_compression_algorithm(
1327 &algo, call->incoming_message_compression_algorithm,
1328 (grpc_stream_compression_algorithm)0));
1329 *call->receiving_buffer =
1330 grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
1331 } else {
1332 *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
1333 }
1334 GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1335 grpc_schedule_on_exec_ctx);
1336 continue_receiving_slices(bctl);
1337 }
1338 }
1339
receiving_stream_ready(void * bctlp,grpc_error_handle error)1340 static void receiving_stream_ready(void* bctlp, grpc_error_handle error) {
1341 batch_control* bctl = static_cast<batch_control*>(bctlp);
1342 grpc_call* call = bctl->call;
1343 if (error != GRPC_ERROR_NONE) {
1344 call->receiving_stream.reset();
1345 if (bctl->batch_error.ok()) {
1346 bctl->batch_error.set(error);
1347 }
1348 cancel_with_error(call, GRPC_ERROR_REF(error));
1349 }
1350 /* If recv_state is RECV_NONE, we will save the batch_control
1351 * object with rel_cas, and will not use it after the cas. Its corresponding
1352 * acq_load is in receiving_initial_metadata_ready() */
1353 if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
1354 !gpr_atm_rel_cas(&call->recv_state, RECV_NONE,
1355 reinterpret_cast<gpr_atm>(bctlp))) {
1356 process_data_after_md(bctl);
1357 }
1358 }
1359
1360 // The recv_message_ready callback used when sending a batch containing
1361 // a recv_message op down the filter stack. Yields the call combiner
1362 // before processing the received message.
receiving_stream_ready_in_call_combiner(void * bctlp,grpc_error_handle error)1363 static void receiving_stream_ready_in_call_combiner(void* bctlp,
1364 grpc_error_handle error) {
1365 batch_control* bctl = static_cast<batch_control*>(bctlp);
1366 grpc_call* call = bctl->call;
1367 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
1368 receiving_stream_ready(bctlp, error);
1369 }
1370
1371 static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call * call)1372 handle_both_stream_and_msg_compression_set(grpc_call* call) {
1373 std::string error_msg = absl::StrFormat(
1374 "Incoming stream has both stream compression (%d) and message "
1375 "compression (%d).",
1376 call->incoming_stream_compression_algorithm,
1377 call->incoming_message_compression_algorithm);
1378 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1379 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1380 }
1381
1382 static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call * call)1383 handle_error_parsing_compression_algorithm(grpc_call* call) {
1384 std::string error_msg = absl::StrFormat(
1385 "Error in incoming message compression (%d) or stream "
1386 "compression (%d).",
1387 call->incoming_stream_compression_algorithm,
1388 call->incoming_message_compression_algorithm);
1389 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1390 }
1391
handle_invalid_compression(grpc_call * call,grpc_compression_algorithm compression_algorithm)1392 static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
1393 grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1394 std::string error_msg = absl::StrFormat(
1395 "Invalid compression algorithm value '%d'.", compression_algorithm);
1396 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1397 cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1398 }
1399
handle_compression_algorithm_disabled(grpc_call * call,grpc_compression_algorithm compression_algorithm)1400 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
1401 grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1402 const char* algo_name = nullptr;
1403 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1404 std::string error_msg =
1405 absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
1406 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1407 cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1408 }
1409
handle_compression_algorithm_not_accepted(grpc_call * call,grpc_compression_algorithm compression_algorithm)1410 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
1411 grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1412 const char* algo_name = nullptr;
1413 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1414 gpr_log(GPR_ERROR,
1415 "Compression algorithm ('%s') not present in the bitset of "
1416 "accepted encodings ('0x%x')",
1417 algo_name, call->encodings_accepted_by_peer);
1418 }
1419
validate_filtered_metadata(batch_control * bctl)1420 static void validate_filtered_metadata(batch_control* bctl) {
1421 grpc_compression_algorithm compression_algorithm;
1422 grpc_call* call = bctl->call;
1423 if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
1424 GRPC_STREAM_COMPRESS_NONE &&
1425 call->incoming_message_compression_algorithm !=
1426 GRPC_MESSAGE_COMPRESS_NONE)) {
1427 handle_both_stream_and_msg_compression_set(call);
1428 } else if (
1429 GPR_UNLIKELY(
1430 grpc_compression_algorithm_from_message_stream_compression_algorithm(
1431 &compression_algorithm,
1432 call->incoming_message_compression_algorithm,
1433 call->incoming_stream_compression_algorithm) == 0)) {
1434 handle_error_parsing_compression_algorithm(call);
1435 } else {
1436 const grpc_compression_options compression_options =
1437 grpc_channel_compression_options(call->channel);
1438 if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
1439 handle_invalid_compression(call, compression_algorithm);
1440 } else if (GPR_UNLIKELY(
1441 grpc_compression_options_is_algorithm_enabled_internal(
1442 &compression_options, compression_algorithm) == 0)) {
1443 /* check if algorithm is supported by current channel config */
1444 handle_compression_algorithm_disabled(call, compression_algorithm);
1445 }
1446 /* GRPC_COMPRESS_NONE is always set. */
1447 GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
1448 if (GPR_UNLIKELY(!grpc_core::GetBit(call->encodings_accepted_by_peer,
1449 compression_algorithm))) {
1450 if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
1451 handle_compression_algorithm_not_accepted(call, compression_algorithm);
1452 }
1453 }
1454 }
1455 }
1456
receiving_initial_metadata_ready(void * bctlp,grpc_error_handle error)1457 static void receiving_initial_metadata_ready(void* bctlp,
1458 grpc_error_handle error) {
1459 batch_control* bctl = static_cast<batch_control*>(bctlp);
1460 grpc_call* call = bctl->call;
1461
1462 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
1463
1464 if (error == GRPC_ERROR_NONE) {
1465 grpc_metadata_batch* md = &call->recv_initial_metadata;
1466 recv_initial_filter(call, md);
1467
1468 /* TODO(ctiller): this could be moved into recv_initial_filter now */
1469 GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
1470 validate_filtered_metadata(bctl);
1471
1472 absl::optional<grpc_millis> deadline =
1473 md->get(grpc_core::GrpcTimeoutMetadata());
1474 if (deadline.has_value() && !call->is_client) {
1475 call->send_deadline = *deadline;
1476 }
1477 } else {
1478 if (bctl->batch_error.ok()) {
1479 bctl->batch_error.set(error);
1480 }
1481 cancel_with_error(call, GRPC_ERROR_REF(error));
1482 }
1483
1484 grpc_closure* saved_rsr_closure = nullptr;
1485 while (true) {
1486 gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
1487 /* Should only receive initial metadata once */
1488 GPR_ASSERT(rsr_bctlp != 1);
1489 if (rsr_bctlp == 0) {
1490 /* We haven't seen initial metadata and messages before, thus initial
1491 * metadata is received first.
1492 * no_barrier_cas is used, as this function won't access the batch_control
1493 * object saved by receiving_stream_ready() if the initial metadata is
1494 * received first. */
1495 if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
1496 RECV_INITIAL_METADATA_FIRST)) {
1497 break;
1498 }
1499 } else {
1500 /* Already received messages */
1501 saved_rsr_closure =
1502 GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
1503 grpc_schedule_on_exec_ctx);
1504 /* No need to modify recv_state */
1505 break;
1506 }
1507 }
1508 if (saved_rsr_closure != nullptr) {
1509 grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
1510 GRPC_ERROR_REF(error));
1511 }
1512
1513 finish_batch_step(bctl);
1514 }
1515
receiving_trailing_metadata_ready(void * bctlp,grpc_error_handle error)1516 static void receiving_trailing_metadata_ready(void* bctlp,
1517 grpc_error_handle error) {
1518 batch_control* bctl = static_cast<batch_control*>(bctlp);
1519 grpc_call* call = bctl->call;
1520 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1521 grpc_metadata_batch* md = &call->recv_trailing_metadata;
1522 recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1523 finish_batch_step(bctl);
1524 }
1525
finish_batch(void * bctlp,grpc_error_handle error)1526 static void finish_batch(void* bctlp, grpc_error_handle error) {
1527 batch_control* bctl = static_cast<batch_control*>(bctlp);
1528 grpc_call* call = bctl->call;
1529 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
1530 if (bctl->batch_error.ok()) {
1531 bctl->batch_error.set(error);
1532 }
1533 if (error != GRPC_ERROR_NONE) {
1534 cancel_with_error(call, GRPC_ERROR_REF(error));
1535 }
1536 finish_batch_step(bctl);
1537 }
1538
free_no_op_completion(void *,grpc_cq_completion * completion)1539 static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
1540 gpr_free(completion);
1541 }
1542
call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * notify_tag,int is_notify_tag_closure)1543 static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
1544 size_t nops, void* notify_tag,
1545 int is_notify_tag_closure) {
1546 GPR_TIMER_SCOPE("call_start_batch", 0);
1547
1548 size_t i;
1549 const grpc_op* op;
1550 batch_control* bctl;
1551 bool has_send_ops = false;
1552 int num_recv_ops = 0;
1553 grpc_call_error error = GRPC_CALL_OK;
1554 grpc_transport_stream_op_batch* stream_op;
1555 grpc_transport_stream_op_batch_payload* stream_op_payload;
1556
1557 GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);
1558
1559 if (nops == 0) {
1560 if (!is_notify_tag_closure) {
1561 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1562 grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
1563 free_no_op_completion, nullptr,
1564 static_cast<grpc_cq_completion*>(
1565 gpr_malloc(sizeof(grpc_cq_completion))));
1566 } else {
1567 grpc_core::Closure::Run(DEBUG_LOCATION,
1568 static_cast<grpc_closure*>(notify_tag),
1569 GRPC_ERROR_NONE);
1570 }
1571 error = GRPC_CALL_OK;
1572 goto done;
1573 }
1574
1575 bctl = reuse_or_allocate_batch_control(call, ops);
1576 if (bctl == nullptr) {
1577 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1578 }
1579 bctl->completion_data.notify_tag.tag = notify_tag;
1580 bctl->completion_data.notify_tag.is_closure =
1581 static_cast<uint8_t>(is_notify_tag_closure != 0);
1582
1583 stream_op = &bctl->op;
1584 stream_op_payload = &call->stream_op_payload;
1585
1586 /* rewrite batch ops into a transport op */
1587 for (i = 0; i < nops; i++) {
1588 op = &ops[i];
1589 if (op->reserved != nullptr) {
1590 error = GRPC_CALL_ERROR;
1591 goto done_with_error;
1592 }
1593 switch (op->op) {
1594 case GRPC_OP_SEND_INITIAL_METADATA: {
1595 /* Flag validation: currently allow no flags */
1596 if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
1597 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1598 goto done_with_error;
1599 }
1600 if (call->sent_initial_metadata) {
1601 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1602 goto done_with_error;
1603 }
1604 // TODO(juanlishen): If the user has already specified a compression
1605 // algorithm by setting the initial metadata with key of
1606 // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
1607 // with the compression algorithm mapped from compression level.
1608 /* process compression level */
1609 grpc_metadata& compression_md = call->compression_md;
1610 compression_md.key = grpc_empty_slice();
1611 compression_md.value = grpc_empty_slice();
1612 size_t additional_metadata_count = 0;
1613 grpc_compression_level effective_compression_level =
1614 GRPC_COMPRESS_LEVEL_NONE;
1615 bool level_set = false;
1616 if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
1617 effective_compression_level =
1618 op->data.send_initial_metadata.maybe_compression_level.level;
1619 level_set = true;
1620 } else {
1621 const grpc_compression_options copts =
1622 grpc_channel_compression_options(call->channel);
1623 if (copts.default_level.is_set) {
1624 level_set = true;
1625 effective_compression_level = copts.default_level.level;
1626 }
1627 }
1628 // Currently, only server side supports compression level setting.
1629 if (level_set && !call->is_client) {
1630 const grpc_compression_algorithm calgo =
1631 compression_algorithm_for_level_locked(
1632 call, effective_compression_level);
1633 // The following metadata will be checked and removed by the message
1634 // compression filter. It will be used as the call's compression
1635 // algorithm.
1636 compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1637 compression_md.value = grpc_compression_algorithm_slice(calgo);
1638 additional_metadata_count++;
1639 }
1640 if (op->data.send_initial_metadata.count + additional_metadata_count >
1641 INT_MAX) {
1642 error = GRPC_CALL_ERROR_INVALID_METADATA;
1643 goto done_with_error;
1644 }
1645 stream_op->send_initial_metadata = true;
1646 call->sent_initial_metadata = true;
1647 if (!prepare_application_metadata(
1648 call, static_cast<int>(op->data.send_initial_metadata.count),
1649 op->data.send_initial_metadata.metadata, 0, call->is_client,
1650 &compression_md, static_cast<int>(additional_metadata_count))) {
1651 error = GRPC_CALL_ERROR_INVALID_METADATA;
1652 goto done_with_error;
1653 }
1654 /* TODO(ctiller): just make these the same variable? */
1655 if (call->is_client && call->send_deadline != GRPC_MILLIS_INF_FUTURE) {
1656 call->send_initial_metadata.Set(grpc_core::GrpcTimeoutMetadata(),
1657 call->send_deadline);
1658 }
1659 stream_op_payload->send_initial_metadata.send_initial_metadata =
1660 &call->send_initial_metadata;
1661 stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
1662 op->flags;
1663 if (call->is_client) {
1664 stream_op_payload->send_initial_metadata.peer_string =
1665 &call->peer_string;
1666 }
1667 has_send_ops = true;
1668 break;
1669 }
1670 case GRPC_OP_SEND_MESSAGE: {
1671 if (!are_write_flags_valid(op->flags)) {
1672 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1673 goto done_with_error;
1674 }
1675 if (op->data.send_message.send_message == nullptr) {
1676 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1677 goto done_with_error;
1678 }
1679 if (call->sending_message) {
1680 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1681 goto done_with_error;
1682 }
1683 uint32_t flags = op->flags;
1684 /* If the outgoing buffer is already compressed, mark it as so in the
1685 flags. These will be picked up by the compression filter and further
1686 (wasteful) attempts at compression skipped. */
1687 if (op->data.send_message.send_message->data.raw.compression >
1688 GRPC_COMPRESS_NONE) {
1689 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1690 }
1691 stream_op->send_message = true;
1692 call->sending_message = true;
1693 call->sending_stream.Init(
1694 &op->data.send_message.send_message->data.raw.slice_buffer, flags);
1695 stream_op_payload->send_message.send_message.reset(
1696 call->sending_stream.get());
1697 has_send_ops = true;
1698 break;
1699 }
1700 case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
1701 /* Flag validation: currently allow no flags */
1702 if (op->flags != 0) {
1703 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1704 goto done_with_error;
1705 }
1706 if (!call->is_client) {
1707 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1708 goto done_with_error;
1709 }
1710 if (call->sent_final_op) {
1711 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1712 goto done_with_error;
1713 }
1714 stream_op->send_trailing_metadata = true;
1715 call->sent_final_op = true;
1716 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1717 &call->send_trailing_metadata;
1718 has_send_ops = true;
1719 break;
1720 }
1721 case GRPC_OP_SEND_STATUS_FROM_SERVER: {
1722 /* Flag validation: currently allow no flags */
1723 if (op->flags != 0) {
1724 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1725 goto done_with_error;
1726 }
1727 if (call->is_client) {
1728 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1729 goto done_with_error;
1730 }
1731 if (call->sent_final_op) {
1732 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1733 goto done_with_error;
1734 }
1735 if (op->data.send_status_from_server.trailing_metadata_count >
1736 INT_MAX) {
1737 error = GRPC_CALL_ERROR_INVALID_METADATA;
1738 goto done_with_error;
1739 }
1740 stream_op->send_trailing_metadata = true;
1741 call->sent_final_op = true;
1742 GPR_ASSERT(call->send_extra_metadata_count == 0);
1743 call->send_extra_metadata_count = 1;
1744 call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
1745 op->data.send_status_from_server.status);
1746 grpc_error_handle status_error =
1747 op->data.send_status_from_server.status == GRPC_STATUS_OK
1748 ? GRPC_ERROR_NONE
1749 : grpc_error_set_int(
1750 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1751 "Server returned error"),
1752 GRPC_ERROR_INT_GRPC_STATUS,
1753 static_cast<intptr_t>(
1754 op->data.send_status_from_server.status));
1755 if (op->data.send_status_from_server.status_details != nullptr) {
1756 call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1757 GRPC_MDSTR_GRPC_MESSAGE,
1758 grpc_slice_copy(
1759 *op->data.send_status_from_server.status_details));
1760 call->send_extra_metadata_count++;
1761 if (status_error != GRPC_ERROR_NONE) {
1762 char* msg = grpc_slice_to_c_string(
1763 GRPC_MDVALUE(call->send_extra_metadata[1].md));
1764 status_error = grpc_error_set_str(status_error,
1765 GRPC_ERROR_STR_GRPC_MESSAGE, msg);
1766 gpr_free(msg);
1767 }
1768 }
1769
1770 call->status_error.set(status_error);
1771 GRPC_ERROR_UNREF(status_error);
1772
1773 if (!prepare_application_metadata(
1774 call,
1775 static_cast<int>(
1776 op->data.send_status_from_server.trailing_metadata_count),
1777 op->data.send_status_from_server.trailing_metadata, 1, 1,
1778 nullptr, 0)) {
1779 for (int n = 0; n < call->send_extra_metadata_count; n++) {
1780 GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
1781 }
1782 call->send_extra_metadata_count = 0;
1783 error = GRPC_CALL_ERROR_INVALID_METADATA;
1784 goto done_with_error;
1785 }
1786 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1787 &call->send_trailing_metadata;
1788 stream_op_payload->send_trailing_metadata.sent =
1789 &call->sent_server_trailing_metadata;
1790 has_send_ops = true;
1791 break;
1792 }
1793 case GRPC_OP_RECV_INITIAL_METADATA: {
1794 /* Flag validation: currently allow no flags */
1795 if (op->flags != 0) {
1796 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1797 goto done_with_error;
1798 }
1799 if (call->received_initial_metadata) {
1800 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1801 goto done_with_error;
1802 }
1803 call->received_initial_metadata = true;
1804 call->buffered_metadata[0] =
1805 op->data.recv_initial_metadata.recv_initial_metadata;
1806 GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
1807 receiving_initial_metadata_ready, bctl,
1808 grpc_schedule_on_exec_ctx);
1809 stream_op->recv_initial_metadata = true;
1810 stream_op_payload->recv_initial_metadata.recv_initial_metadata =
1811 &call->recv_initial_metadata;
1812 stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
1813 &call->receiving_initial_metadata_ready;
1814 if (call->is_client) {
1815 stream_op_payload->recv_initial_metadata.trailing_metadata_available =
1816 &call->is_trailers_only;
1817 } else {
1818 stream_op_payload->recv_initial_metadata.peer_string =
1819 &call->peer_string;
1820 }
1821 ++num_recv_ops;
1822 break;
1823 }
1824 case GRPC_OP_RECV_MESSAGE: {
1825 /* Flag validation: currently allow no flags */
1826 if (op->flags != 0) {
1827 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1828 goto done_with_error;
1829 }
1830 if (call->receiving_message) {
1831 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1832 goto done_with_error;
1833 }
1834 call->receiving_message = true;
1835 stream_op->recv_message = true;
1836 call->receiving_buffer = op->data.recv_message.recv_message;
1837 stream_op_payload->recv_message.recv_message = &call->receiving_stream;
1838 stream_op_payload->recv_message.call_failed_before_recv_message =
1839 &call->call_failed_before_recv_message;
1840 GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
1841 receiving_stream_ready_in_call_combiner, bctl,
1842 grpc_schedule_on_exec_ctx);
1843 stream_op_payload->recv_message.recv_message_ready =
1844 &call->receiving_stream_ready;
1845 ++num_recv_ops;
1846 break;
1847 }
1848 case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1849 /* Flag validation: currently allow no flags */
1850 if (op->flags != 0) {
1851 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1852 goto done_with_error;
1853 }
1854 if (!call->is_client) {
1855 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1856 goto done_with_error;
1857 }
1858 if (call->requested_final_op) {
1859 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1860 goto done_with_error;
1861 }
1862 call->requested_final_op = true;
1863 call->buffered_metadata[1] =
1864 op->data.recv_status_on_client.trailing_metadata;
1865 call->final_op.client.status = op->data.recv_status_on_client.status;
1866 call->final_op.client.status_details =
1867 op->data.recv_status_on_client.status_details;
1868 call->final_op.client.error_string =
1869 op->data.recv_status_on_client.error_string;
1870 stream_op->recv_trailing_metadata = true;
1871 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1872 &call->recv_trailing_metadata;
1873 stream_op_payload->recv_trailing_metadata.collect_stats =
1874 &call->final_info.stats.transport_stream_stats;
1875 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1876 receiving_trailing_metadata_ready, bctl,
1877 grpc_schedule_on_exec_ctx);
1878 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1879 &call->receiving_trailing_metadata_ready;
1880 ++num_recv_ops;
1881 break;
1882 }
1883 case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1884 /* Flag validation: currently allow no flags */
1885 if (op->flags != 0) {
1886 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1887 goto done_with_error;
1888 }
1889 if (call->is_client) {
1890 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1891 goto done_with_error;
1892 }
1893 if (call->requested_final_op) {
1894 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1895 goto done_with_error;
1896 }
1897 call->requested_final_op = true;
1898 call->final_op.server.cancelled =
1899 op->data.recv_close_on_server.cancelled;
1900 stream_op->recv_trailing_metadata = true;
1901 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1902 &call->recv_trailing_metadata;
1903 stream_op_payload->recv_trailing_metadata.collect_stats =
1904 &call->final_info.stats.transport_stream_stats;
1905 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1906 receiving_trailing_metadata_ready, bctl,
1907 grpc_schedule_on_exec_ctx);
1908 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1909 &call->receiving_trailing_metadata_ready;
1910 ++num_recv_ops;
1911 break;
1912 }
1913 }
1914 }
1915
1916 GRPC_CALL_INTERNAL_REF(call, "completion");
1917 if (!is_notify_tag_closure) {
1918 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1919 }
1920 bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);
1921
1922 if (has_send_ops) {
1923 GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
1924 grpc_schedule_on_exec_ctx);
1925 stream_op->on_complete = &bctl->finish_batch;
1926 }
1927
1928 gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
1929 execute_batch(call, stream_op, &bctl->start_batch);
1930
1931 done:
1932 return error;
1933
1934 done_with_error:
1935 /* reverse any mutations that occurred */
1936 if (stream_op->send_initial_metadata) {
1937 call->sent_initial_metadata = false;
1938 call->send_initial_metadata.Clear();
1939 }
1940 if (stream_op->send_message) {
1941 call->sending_message = false;
1942 call->sending_stream->Orphan();
1943 stream_op_payload->send_message.send_message.reset();
1944 }
1945 if (stream_op->send_trailing_metadata) {
1946 call->sent_final_op = false;
1947 call->send_trailing_metadata.Clear();
1948 }
1949 if (stream_op->recv_initial_metadata) {
1950 call->received_initial_metadata = false;
1951 }
1952 if (stream_op->recv_message) {
1953 call->receiving_message = false;
1954 }
1955 if (stream_op->recv_trailing_metadata) {
1956 call->requested_final_op = false;
1957 }
1958 goto done;
1959 }
1960
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1961 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1962 size_t nops, void* tag, void* reserved) {
1963 grpc_call_error err;
1964
1965 GRPC_API_TRACE(
1966 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1967 "reserved=%p)",
1968 5, (call, ops, (unsigned long)nops, tag, reserved));
1969
1970 if (reserved != nullptr) {
1971 err = GRPC_CALL_ERROR;
1972 } else {
1973 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1974 grpc_core::ExecCtx exec_ctx;
1975 err = call_start_batch(call, ops, nops, tag, 0);
1976 }
1977
1978 return err;
1979 }
1980
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)1981 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1982 const grpc_op* ops,
1983 size_t nops,
1984 grpc_closure* closure) {
1985 return call_start_batch(call, ops, nops, closure, 1);
1986 }
1987
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1988 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1989 void* value, void (*destroy)(void* value)) {
1990 if (call->context[elem].destroy) {
1991 call->context[elem].destroy(call->context[elem].value);
1992 }
1993 call->context[elem].value = value;
1994 call->context[elem].destroy = destroy;
1995 }
1996
grpc_call_context_get(grpc_call * call,grpc_context_index elem)1997 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
1998 return call->context[elem].value;
1999 }
2000
grpc_call_is_client(grpc_call * call)2001 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
2002
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)2003 grpc_compression_algorithm grpc_call_compression_for_level(
2004 grpc_call* call, grpc_compression_level level) {
2005 grpc_compression_algorithm algo =
2006 compression_algorithm_for_level_locked(call, level);
2007 return algo;
2008 }
2009
grpc_call_is_trailers_only(const grpc_call * call)2010 bool grpc_call_is_trailers_only(const grpc_call* call) {
2011 bool result = call->is_trailers_only;
2012 GPR_DEBUG_ASSERT(!result || call->recv_initial_metadata.empty());
2013 return result;
2014 }
2015
grpc_call_failed_before_recv_message(const grpc_call * c)2016 int grpc_call_failed_before_recv_message(const grpc_call* c) {
2017 return c->call_failed_before_recv_message;
2018 }
2019
grpc_call_error_to_string(grpc_call_error error)2020 const char* grpc_call_error_to_string(grpc_call_error error) {
2021 switch (error) {
2022 case GRPC_CALL_ERROR:
2023 return "GRPC_CALL_ERROR";
2024 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2025 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2026 case GRPC_CALL_ERROR_ALREADY_FINISHED:
2027 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2028 case GRPC_CALL_ERROR_ALREADY_INVOKED:
2029 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2030 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2031 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2032 case GRPC_CALL_ERROR_INVALID_FLAGS:
2033 return "GRPC_CALL_ERROR_INVALID_FLAGS";
2034 case GRPC_CALL_ERROR_INVALID_MESSAGE:
2035 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2036 case GRPC_CALL_ERROR_INVALID_METADATA:
2037 return "GRPC_CALL_ERROR_INVALID_METADATA";
2038 case GRPC_CALL_ERROR_NOT_INVOKED:
2039 return "GRPC_CALL_ERROR_NOT_INVOKED";
2040 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2041 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2042 case GRPC_CALL_ERROR_NOT_ON_SERVER:
2043 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2044 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2045 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2046 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2047 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2048 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2049 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2050 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2051 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2052 case GRPC_CALL_OK:
2053 return "GRPC_CALL_OK";
2054 }
2055 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2056 }
2057