1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/client_channel.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
36 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
37 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
38 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
39 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
40 #include "src/core/ext/filters/client_channel/resolver_registry.h"
41 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
42 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
43 #include "src/core/ext/filters/client_channel/retry_throttle.h"
44 #include "src/core/ext/filters/client_channel/service_config.h"
45 #include "src/core/ext/filters/client_channel/subchannel.h"
46 #include "src/core/ext/filters/deadline/deadline_filter.h"
47 #include "src/core/lib/backoff/backoff.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/channel/connected_channel.h"
50 #include "src/core/lib/channel/status_util.h"
51 #include "src/core/lib/gpr/string.h"
52 #include "src/core/lib/gprpp/inlined_vector.h"
53 #include "src/core/lib/gprpp/manual_constructor.h"
54 #include "src/core/lib/gprpp/map.h"
55 #include "src/core/lib/gprpp/sync.h"
56 #include "src/core/lib/iomgr/combiner.h"
57 #include "src/core/lib/iomgr/iomgr.h"
58 #include "src/core/lib/iomgr/polling_entity.h"
59 #include "src/core/lib/profiling/timers.h"
60 #include "src/core/lib/slice/slice_internal.h"
61 #include "src/core/lib/slice/slice_string_helpers.h"
62 #include "src/core/lib/surface/channel.h"
63 #include "src/core/lib/transport/connectivity_state.h"
64 #include "src/core/lib/transport/error_utils.h"
65 #include "src/core/lib/transport/metadata.h"
66 #include "src/core/lib/transport/metadata_batch.h"
67 #include "src/core/lib/transport/static_metadata.h"
68 #include "src/core/lib/transport/status_metadata.h"
69
using grpc_core::internal::ClientChannelMethodParsedConfig;
using grpc_core::internal::ServerRetryThrottleData;

//
// Client channel filter
//

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// Jitter factor applied to the retry back-off.
// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

namespace grpc_core {

// Trace flags for call-level and routing-level debug logging in this filter.
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
99
100 namespace {
101
102 //
103 // ChannelData definition
104 //
105
// Channel-level data for the client_channel filter; one instance per
// channel stack. State is partitioned between two combiners (see the
// field sections below): data-plane state (picker, queued picks, data
// parsed from the service config) guarded by data_plane_combiner_, and
// control-plane state (resolver/LB machinery, connectivity tracking)
// guarded by combiner_.
class ChannelData {
 public:
  // A call waiting for a subchannel pick; lives on an intrusive
  // singly-linked list headed at queued_picks_.
  struct QueuedPick {
    grpc_call_element* elem;
    QueuedPick* next = nullptr;
  };

  // Channel filter vtable entry points.
  static grpc_error* Init(grpc_channel_element* elem,
                          grpc_channel_element_args* args);
  static void Destroy(grpc_channel_element* elem);
  static void StartTransportOp(grpc_channel_element* elem,
                               grpc_transport_op* op);
  static void GetChannelInfo(grpc_channel_element* elem,
                             const grpc_channel_info* info);

  // Accessors for channel configuration fixed at construction time.
  bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
  bool enable_retries() const { return enable_retries_; }
  size_t per_rpc_retry_buffer_size() const {
    return per_rpc_retry_buffer_size_;
  }

  // Note: Does NOT return a new ref.
  grpc_error* disconnect_error() const {
    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
  }

  grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; }

  // Returns the current LB picker.  Does not transfer ownership.
  LoadBalancingPolicy::SubchannelPicker* picker() const {
    return picker_.get();
  }
  // Manage the list of calls queued waiting for a pick.  These touch
  // queued_picks_, which is guarded by the data plane combiner.
  void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
  void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);

  // Service-config-derived data made available to calls once the
  // resolver has returned a result.
  bool received_service_config_data() const {
    return received_service_config_data_;
  }
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
    return retry_throttle_data_;
  }
  RefCountedPtr<ServiceConfig> service_config() const {
    return service_config_;
  }

  grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
  void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
                                      grpc_connectivity_state* state,
                                      grpc_closure* on_complete,
                                      grpc_closure* watcher_timer_init) {
    // Will delete itself.
    New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
                                     watcher_timer_init);
  }
  int NumExternalConnectivityWatchers() const {
    return external_connectivity_watcher_list_.size();
  }

 private:
  // Helper classes defined later in this file.
  class ConnectivityStateAndPickerSetter;
  class ServiceConfigSetter;
  class GrpcSubchannel;
  class ClientChannelControlHelper;

  // Tracks a single external (API-level) connectivity watch request.
  // Instances are heap-allocated and delete themselves when the watch
  // completes (see AddExternalConnectivityWatcher() above).
  class ExternalConnectivityWatcher {
   public:
    // Intrusive list of active watchers, owned by ChannelData.
    class WatcherList {
     public:
      WatcherList() { gpr_mu_init(&mu_); }
      ~WatcherList() { gpr_mu_destroy(&mu_); }

      int size() const;
      ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
      void Add(ExternalConnectivityWatcher* watcher);
      void Remove(const ExternalConnectivityWatcher* watcher);

     private:
      // head_ is guarded by a mutex, since the size() method needs to
      // iterate over the list, and it's called from the C-core API
      // function grpc_channel_num_external_connectivity_watchers(), which
      // is synchronous and therefore cannot run in the combiner.
      mutable gpr_mu mu_;
      ExternalConnectivityWatcher* head_ = nullptr;
    };

    ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
                                grpc_connectivity_state* state,
                                grpc_closure* on_complete,
                                grpc_closure* watcher_timer_init);

    ~ExternalConnectivityWatcher();

   private:
    static void OnWatchCompleteLocked(void* arg, grpc_error* error);
    static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);

    ChannelData* chand_;
    grpc_polling_entity pollent_;
    grpc_connectivity_state* state_;
    grpc_closure* on_complete_;
    grpc_closure* watcher_timer_init_;
    grpc_closure my_closure_;
    ExternalConnectivityWatcher* next_ = nullptr;
  };

  ChannelData(grpc_channel_element_args* args, grpc_error** error);
  ~ChannelData();

  void CreateResolvingLoadBalancingPolicyLocked();

  void DestroyResolvingLoadBalancingPolicyLocked();

  // Callback invoked by the resolving LB policy when the resolver
  // returns a result; extracts the LB policy name/config and any
  // service config error.
  static bool ProcessResolverResultLocked(
      void* arg, const Resolver::Result& result, const char** lb_policy_name,
      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
      grpc_error** service_config_error);

  grpc_error* DoPingLocked(grpc_transport_op* op);

  static void StartTransportOpLocked(void* arg, grpc_error* ignored);

  static void TryToConnectLocked(void* arg, grpc_error* error_ignored);

  // Determines the LB policy name and config to use, based on the
  // resolver result and the parsed service config.
  void ProcessLbPolicy(
      const Resolver::Result& resolver_result,
      const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
      UniquePtr<char>* lb_policy_name,
      RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);

  //
  // Fields set at construction and never modified.
  //
  const bool deadline_checking_enabled_;
  const bool enable_retries_;
  const size_t per_rpc_retry_buffer_size_;
  grpc_channel_stack* owning_stack_;
  ClientChannelFactory* client_channel_factory_;
  const grpc_channel_args* channel_args_;
  RefCountedPtr<ServiceConfig> default_service_config_;
  UniquePtr<char> server_name_;
  UniquePtr<char> target_uri_;
  channelz::ChannelNode* channelz_node_;

  //
  // Fields used in the data plane. Guarded by data_plane_combiner.
  //
  grpc_combiner* data_plane_combiner_;
  UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
  QueuedPick* queued_picks_ = nullptr;  // Linked list of queued picks.
  // Data from service config.
  bool received_service_config_data_ = false;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  RefCountedPtr<ServiceConfig> service_config_;

  //
  // Fields used in the control plane. Guarded by combiner.
  //
  grpc_combiner* combiner_;
  grpc_pollset_set* interested_parties_;
  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
  OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
  grpc_connectivity_state_tracker state_tracker_;
  ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
  UniquePtr<char> health_check_service_name_;
  RefCountedPtr<ServiceConfig> saved_service_config_;
  bool received_first_resolver_result_ = false;
  Map<Subchannel*, int> subchannel_refcount_map_;

  //
  // Fields accessed from both data plane and control plane combiners.
  //
  Atomic<grpc_error*> disconnect_error_;

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via get_channel_info().
  //
  gpr_mu info_mu_;
  UniquePtr<char> info_lb_policy_name_;
  UniquePtr<char> info_service_config_json_;
};
286
287 //
288 // CallData definition
289 //
290
// Per-call data for the client_channel filter.  Responsible for:
//  - queuing the call until an LB pick completes (the QueuedPick and
//    QueuedPickCanceller machinery),
//  - applying the service config to the call once resolver results are
//    available,
//  - creating the subchannel call and forwarding batches down to it, and
//  - retrying failed attempts when retries are enabled (the
//    SubchannelCallRetryState / SubchannelCallBatchData machinery and
//    the cached send-op data below).
class CallData {
 public:
  // Call filter vtable entry points.
  static grpc_error* Init(grpc_call_element* elem,
                          const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

  RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }

  // Invoked by channel for queued picks once resolver results are available.
  void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);

  // Invoked by channel for queued picks when the picker is updated.
  static void StartPickLocked(void* arg, grpc_error* error);

 private:
  class QueuedPickCanceller;

  // CallState implementation handed to the LB policy; allocations are
  // backed by the call's arena.
  class LbCallState : public LoadBalancingPolicy::CallState {
   public:
    explicit LbCallState(CallData* calld) : calld_(calld) {}

    void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); }

   private:
    CallData* calld_;
  };

  // State used for starting a retryable batch on a subchannel call.
  // This provides its own grpc_transport_stream_op_batch and other data
  // structures needed to populate the ops in the batch.
  // We allocate one struct on the arena for each attempt at starting a
  // batch on a given subchannel call.
  struct SubchannelCallBatchData {
    // Creates a SubchannelCallBatchData object on the call's arena with the
    // specified refcount. If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    static SubchannelCallBatchData* Create(grpc_call_element* elem,
                                           int refcount, bool set_on_complete);

    void Unref() {
      if (gpr_unref(&refs)) Destroy();
    }

    SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
                            int refcount, bool set_on_complete);
    // All dtor code must be added in `Destroy()`. This is because we may
    // call closures in `SubchannelCallBatchData` after they are unrefed by
    // `Unref()`, and msan would complain about accessing this class
    // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
    // TODO(soheil): We should try to call the dtor in `Unref()`.
    ~SubchannelCallBatchData() { Destroy(); }
    void Destroy();

    gpr_refcount refs;
    grpc_call_element* elem;
    RefCountedPtr<SubchannelCall> subchannel_call;
    // The batch to use in the subchannel call.
    // Its payload field points to SubchannelCallRetryState::batch_payload.
    grpc_transport_stream_op_batch batch;
    // For intercepting on_complete.
    grpc_closure on_complete;
  };

  // Retry state associated with a subchannel call.
  // Stored in the parent_data of the subchannel call object.
  struct SubchannelCallRetryState {
    explicit SubchannelCallRetryState(grpc_call_context_element* context)
        : batch_payload(context),
          started_send_initial_metadata(false),
          completed_send_initial_metadata(false),
          started_send_trailing_metadata(false),
          completed_send_trailing_metadata(false),
          started_recv_initial_metadata(false),
          completed_recv_initial_metadata(false),
          started_recv_trailing_metadata(false),
          completed_recv_trailing_metadata(false),
          retry_dispatched(false) {}

    // SubchannelCallBatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload;
    // For send_initial_metadata.
    // Note that we need to make a copy of the initial metadata for each
    // subchannel call instead of just referring to the copy in call_data,
    // because filters in the subchannel stack will probably add entries,
    // so we need to start in a pristine state for each attempt of the call.
    grpc_linked_mdelem* send_initial_metadata_storage;
    grpc_metadata_batch send_initial_metadata;
    // For send_message.
    // TODO(roth): Restructure this to eliminate use of ManualConstructor.
    ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
    // For send_trailing_metadata.
    grpc_linked_mdelem* send_trailing_metadata_storage;
    grpc_metadata_batch send_trailing_metadata;
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata;
    grpc_closure recv_initial_metadata_ready;
    bool trailing_metadata_available = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready;
    OrphanablePtr<ByteStream> recv_message;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata;
    grpc_transport_stream_stats collect_stats;
    grpc_closure recv_trailing_metadata_ready;
    // These fields indicate which ops have been started and completed on
    // this subchannel call.
    size_t started_send_message_count = 0;
    size_t completed_send_message_count = 0;
    size_t started_recv_message_count = 0;
    size_t completed_recv_message_count = 0;
    bool started_send_initial_metadata : 1;
    bool completed_send_initial_metadata : 1;
    bool started_send_trailing_metadata : 1;
    bool completed_send_trailing_metadata : 1;
    bool started_recv_initial_metadata : 1;
    bool completed_recv_initial_metadata : 1;
    bool started_recv_trailing_metadata : 1;
    bool completed_recv_trailing_metadata : 1;
    // State for callback processing.
    SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
        nullptr;
    grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
    grpc_error* recv_message_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
    // NOTE: Do not move this next to the metadata bitfields above. That would
    //       save space but will also result in a data race because compiler
    //       will generate a 2 byte store which overwrites the meta-data
    //       fields upon setting this field.
    bool retry_dispatched : 1;
  };

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch. If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached;
  };

  CallData(grpc_call_element* elem, const ChannelData& chand,
           const grpc_call_element_args& args);
  ~CallData();

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata(ChannelData* chand);
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(ChannelData* chand, size_t idx);
  void FreeCachedSendTrailingMetadata(ChannelData* chand);
  // Frees cached send ops that have already been completed after
  // committing the call.
  void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
                                       SubchannelCallRetryState* retry_state);
  // Frees cached send ops that were completed by the completed batch in
  // batch_data. Used when batches are completed after the call is committed.
  void FreeCachedSendOpDataForCompletedBatch(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state);

  // Interception of recv_trailing_metadata_ready on behalf of the LB
  // policy (see lb_recv_trailing_metadata_ready_ below).
  static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
      void* arg, grpc_error* error);
  void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
      grpc_transport_stream_op_batch* batch);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_call_element* elem,
                         grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_call_element* elem, grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on subchannel_call_.
  void PendingBatchesResume(grpc_call_element* elem);
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(grpc_call_element* elem,
                                 const char* log_message, Predicate predicate);

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(grpc_call_element* elem,
                   SubchannelCallRetryState* retry_state);
  // Starts a retry after appropriate back-off.
  void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
               grpc_millis server_pushback_ms);
  // Returns true if the call is being retried.
  bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
                  grpc_status_code status, grpc_mdelem* server_pushback_md);

  // Invokes recv_initial_metadata_ready for a subchannel batch.
  static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
  // Intercepts recv_initial_metadata_ready callback for retries.
  // Commits the call and returns the initial metadata up the stack.
  static void RecvInitialMetadataReady(void* arg, grpc_error* error);

  // Invokes recv_message_ready for a subchannel batch.
  static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
  // Intercepts recv_message_ready callback for retries.
  // Commits the call and returns the message up the stack.
  static void RecvMessageReady(void* arg, grpc_error* error);

  // Sets *status and *server_pushback_md based on md_batch and error.
  // Only sets *server_pushback_md if server_pushback_md != nullptr.
  void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
                     grpc_error* error, grpc_status_code* status,
                     grpc_mdelem** server_pushback_md);
  // Adds recv_trailing_metadata_ready closure to closures.
  void AddClosureForRecvTrailingMetadataReady(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      grpc_error* error, CallCombinerClosureList* closures);
  // Adds any necessary closures for deferred recv_initial_metadata and
  // recv_message callbacks to closures.
  static void AddClosuresForDeferredRecvCallbacks(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
  // Returns true if any op in the batch was not yet started.
  // Only looks at send ops, since recv ops are always started immediately.
  bool PendingBatchIsUnstarted(PendingBatch* pending,
                               SubchannelCallRetryState* retry_state);
  // For any pending batch containing an op that has not yet been started,
  // adds the pending batch's completion closures to closures.
  void AddClosuresToFailUnstartedPendingBatches(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
      grpc_error* error, CallCombinerClosureList* closures);
  // Runs necessary closures upon completion of a call attempt.
  void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
                                   grpc_error* error);
  // Intercepts recv_trailing_metadata_ready callback for retries.
  // Commits the call and returns the trailing metadata up the stack.
  static void RecvTrailingMetadataReady(void* arg, grpc_error* error);

  // Adds the on_complete closure for the pending batch completed in
  // batch_data to closures.
  void AddClosuresForCompletedPendingBatch(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, grpc_error* error,
      CallCombinerClosureList* closures);

  // If there are any cached ops to replay or pending ops to start on the
  // subchannel call, adds a closure to closures to invoke
  // StartRetriableSubchannelBatches().
  void AddClosuresForReplayOrPendingSendOps(
      grpc_call_element* elem, SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);

  // Callback used to intercept on_complete from subchannel calls.
  // Called only when retries are enabled.
  static void OnComplete(void* arg, grpc_error* error);

  static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Adds a closure to closures that will execute batch in the call combiner.
  void AddClosureForSubchannelBatch(grpc_call_element* elem,
                                    grpc_transport_stream_op_batch* batch,
                                    CallCombinerClosureList* closures);
  // Adds retriable send_initial_metadata op to batch_data.
  void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable send_message op to batch_data.
  void AddRetriableSendMessageOp(grpc_call_element* elem,
                                 SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable send_trailing_metadata op to batch_data.
  void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Adds retriable recv_initial_metadata op to batch_data.
  void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable recv_message op to batch_data.
  void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable recv_trailing_metadata op to batch_data.
  void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Helper function used to start a recv_trailing_metadata batch. This
  // is used in the case where a recv_initial_metadata or recv_message
  // op fails in a way that we know the call is over but when the application
  // has not yet started its own recv_trailing_metadata op.
  void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
  // If there are any cached send ops that need to be replayed on the
  // current subchannel call, creates and returns a new subchannel batch
  // to replay those ops. Otherwise, returns nullptr.
  SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state);
  // Adds subchannel batches for pending batches to closures.
  void AddSubchannelBatchesForPendingBatches(
      grpc_call_element* elem, SubchannelCallRetryState* retry_state,
      CallCombinerClosureList* closures);
  // Constructs and starts whatever subchannel batches are needed on the
  // subchannel call.
  static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);

  void CreateSubchannelCall(grpc_call_element* elem);
  // Invoked when a pick is completed, on both success or failure.
  static void PickDone(void* arg, grpc_error* error);
  // Removes the call from the channel's list of queued picks.
  void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
  // Adds the call to the channel's list of queued picks.
  void AddCallToQueuedPicksLocked(grpc_call_element* elem);
  // Applies service config to the call. Must be invoked once we know
  // that the resolver has returned results to the channel.
  void ApplyServiceConfigToCallLocked(grpc_call_element* elem);

  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner. If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state_;

  grpc_slice path_;  // Request path.
  gpr_timespec call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  // Service-config-derived state applied to this call.
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  ServiceConfig::CallData service_config_call_data_;
  const ClientChannelMethodParsedConfig* method_params_ = nullptr;

  RefCountedPtr<SubchannelCall> subchannel_call_;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;

  // Pick state: this call's entry in the channel's queued-pick list,
  // plus the LB call state and the resulting connected subchannel.
  ChannelData::QueuedPick pick_;
  bool pick_queued_ = false;
  bool service_config_applied_ = false;
  QueuedPickCanceller* pick_canceller_ = nullptr;
  LbCallState lb_call_state_;
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  void (*lb_recv_trailing_metadata_ready_)(
      void* user_data, grpc_metadata_batch* recv_trailing_metadata,
      LoadBalancingPolicy::CallState* call_state) = nullptr;
  void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
  grpc_closure pick_closure_;

  // For intercepting recv_trailing_metadata_ready for the LB policy.
  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;
  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;

  grpc_polling_entity* pollent_ = nullptr;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Retry state.
  bool enable_retries_ : 1;
  bool retry_committed_ : 1;
  bool last_attempt_got_server_pushback_ : 1;
  int num_attempts_completed_ = 0;
  size_t bytes_buffered_for_retry_ = 0;
  // TODO(roth): Restructure this to eliminate use of ManualConstructor.
  ManualConstructor<BackOff> retry_backoff_;
  grpc_timer retry_timer_;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches_ = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
  grpc_metadata_batch send_initial_metadata_;
  uint32_t send_initial_metadata_flags_;
  gpr_atm* peer_string_;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  InlinedVector<ByteStreamCache*, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
  grpc_metadata_batch send_trailing_metadata_;
};
714
715 //
716 // ChannelData::ConnectivityStateAndPickerSetter
717 //
718
719 // A fire-and-forget class that sets the channel's connectivity state
720 // and then hops into the data plane combiner to update the picker.
721 // Must be instantiated while holding the control plane combiner.
722 // Deletes itself when done.
723 class ChannelData::ConnectivityStateAndPickerSetter {
724 public:
ConnectivityStateAndPickerSetter(ChannelData * chand,grpc_connectivity_state state,const char * reason,UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)725 ConnectivityStateAndPickerSetter(
726 ChannelData* chand, grpc_connectivity_state state, const char* reason,
727 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
728 : chand_(chand), picker_(std::move(picker)) {
729 // Update connectivity state here, while holding control plane combiner.
730 grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
731 if (chand->channelz_node_ != nullptr) {
732 chand->channelz_node_->SetConnectivityState(state);
733 chand->channelz_node_->AddTraceEvent(
734 channelz::ChannelTrace::Severity::Info,
735 grpc_slice_from_static_string(
736 GetChannelConnectivityStateChangeString(state)));
737 }
738 // Bounce into the data plane combiner to reset the picker.
739 GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
740 "ConnectivityStateAndPickerSetter");
741 GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
742 grpc_combiner_scheduler(chand->data_plane_combiner_));
743 GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
744 }
745
746 private:
GetChannelConnectivityStateChangeString(grpc_connectivity_state state)747 static const char* GetChannelConnectivityStateChangeString(
748 grpc_connectivity_state state) {
749 switch (state) {
750 case GRPC_CHANNEL_IDLE:
751 return "Channel state change to IDLE";
752 case GRPC_CHANNEL_CONNECTING:
753 return "Channel state change to CONNECTING";
754 case GRPC_CHANNEL_READY:
755 return "Channel state change to READY";
756 case GRPC_CHANNEL_TRANSIENT_FAILURE:
757 return "Channel state change to TRANSIENT_FAILURE";
758 case GRPC_CHANNEL_SHUTDOWN:
759 return "Channel state change to SHUTDOWN";
760 }
761 GPR_UNREACHABLE_CODE(return "UNKNOWN");
762 }
763
SetPicker(void * arg,grpc_error * ignored)764 static void SetPicker(void* arg, grpc_error* ignored) {
765 auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
766 // Update picker.
767 self->chand_->picker_ = std::move(self->picker_);
768 // Re-process queued picks.
769 for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
770 pick = pick->next) {
771 CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
772 }
773 // Clean up.
774 GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
775 "ConnectivityStateAndPickerSetter");
776 Delete(self);
777 }
778
779 ChannelData* chand_;
780 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
781 grpc_closure closure_;
782 };
783
784 //
785 // ChannelData::ServiceConfigSetter
786 //
787
788 // A fire-and-forget class that sets the channel's service config data
789 // in the data plane combiner. Deletes itself when done.
790 class ChannelData::ServiceConfigSetter {
791 public:
ServiceConfigSetter(ChannelData * chand,Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling> retry_throttle_data,RefCountedPtr<ServiceConfig> service_config)792 ServiceConfigSetter(
793 ChannelData* chand,
794 Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
795 retry_throttle_data,
796 RefCountedPtr<ServiceConfig> service_config)
797 : chand_(chand),
798 retry_throttle_data_(retry_throttle_data),
799 service_config_(std::move(service_config)) {
800 GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter");
801 GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
802 grpc_combiner_scheduler(chand->data_plane_combiner_));
803 GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
804 }
805
806 private:
SetServiceConfigData(void * arg,grpc_error * ignored)807 static void SetServiceConfigData(void* arg, grpc_error* ignored) {
808 ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
809 ChannelData* chand = self->chand_;
810 // Update channel state.
811 chand->received_service_config_data_ = true;
812 if (self->retry_throttle_data_.has_value()) {
813 chand->retry_throttle_data_ =
814 internal::ServerRetryThrottleMap::GetDataForServer(
815 chand->server_name_.get(),
816 self->retry_throttle_data_.value().max_milli_tokens,
817 self->retry_throttle_data_.value().milli_token_ratio);
818 }
819 chand->service_config_ = std::move(self->service_config_);
820 // Apply service config to queued picks.
821 for (QueuedPick* pick = chand->queued_picks_; pick != nullptr;
822 pick = pick->next) {
823 CallData* calld = static_cast<CallData*>(pick->elem->call_data);
824 calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
825 }
826 // Clean up.
827 GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
828 "ServiceConfigSetter");
829 Delete(self);
830 }
831
832 ChannelData* chand_;
833 Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
834 retry_throttle_data_;
835 RefCountedPtr<ServiceConfig> service_config_;
836 grpc_closure closure_;
837 };
838
839 //
840 // ChannelData::ExternalConnectivityWatcher::WatcherList
841 //
842
size() const843 int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
844 MutexLock lock(&mu_);
845 int count = 0;
846 for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
847 ++count;
848 }
849 return count;
850 }
851
852 ChannelData::ExternalConnectivityWatcher*
Lookup(grpc_closure * on_complete) const853 ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
854 grpc_closure* on_complete) const {
855 MutexLock lock(&mu_);
856 ExternalConnectivityWatcher* w = head_;
857 while (w != nullptr && w->on_complete_ != on_complete) {
858 w = w->next_;
859 }
860 return w;
861 }
862
Add(ExternalConnectivityWatcher * watcher)863 void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
864 ExternalConnectivityWatcher* watcher) {
865 GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
866 MutexLock lock(&mu_);
867 GPR_ASSERT(watcher->next_ == nullptr);
868 watcher->next_ = head_;
869 head_ = watcher;
870 }
871
Remove(const ExternalConnectivityWatcher * watcher)872 void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
873 const ExternalConnectivityWatcher* watcher) {
874 MutexLock lock(&mu_);
875 if (watcher == head_) {
876 head_ = watcher->next_;
877 return;
878 }
879 for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
880 if (w->next_ == watcher) {
881 w->next_ = w->next_->next_;
882 return;
883 }
884 }
885 GPR_UNREACHABLE_CODE(return );
886 }
887
888 //
889 // ChannelData::ExternalConnectivityWatcher
890 //
891
ExternalConnectivityWatcher(ChannelData * chand,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)892 ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
893 ChannelData* chand, grpc_polling_entity pollent,
894 grpc_connectivity_state* state, grpc_closure* on_complete,
895 grpc_closure* watcher_timer_init)
896 : chand_(chand),
897 pollent_(pollent),
898 state_(state),
899 on_complete_(on_complete),
900 watcher_timer_init_(watcher_timer_init) {
901 grpc_polling_entity_add_to_pollset_set(&pollent_,
902 chand_->interested_parties_);
903 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
904 GRPC_CLOSURE_SCHED(
905 GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
906 grpc_combiner_scheduler(chand_->combiner_)),
907 GRPC_ERROR_NONE);
908 }
909
~ExternalConnectivityWatcher()910 ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
911 grpc_polling_entity_del_from_pollset_set(&pollent_,
912 chand_->interested_parties_);
913 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
914 "ExternalConnectivityWatcher");
915 }
916
OnWatchCompleteLocked(void * arg,grpc_error * error)917 void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
918 void* arg, grpc_error* error) {
919 ExternalConnectivityWatcher* self =
920 static_cast<ExternalConnectivityWatcher*>(arg);
921 grpc_closure* on_complete = self->on_complete_;
922 self->chand_->external_connectivity_watcher_list_.Remove(self);
923 Delete(self);
924 GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
925 }
926
WatchConnectivityStateLocked(void * arg,grpc_error * ignored)927 void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
928 void* arg, grpc_error* ignored) {
929 ExternalConnectivityWatcher* self =
930 static_cast<ExternalConnectivityWatcher*>(arg);
931 if (self->state_ == nullptr) {
932 // Handle cancellation.
933 GPR_ASSERT(self->watcher_timer_init_ == nullptr);
934 ExternalConnectivityWatcher* found =
935 self->chand_->external_connectivity_watcher_list_.Lookup(
936 self->on_complete_);
937 if (found != nullptr) {
938 grpc_connectivity_state_notify_on_state_change(
939 &found->chand_->state_tracker_, nullptr, &found->my_closure_);
940 }
941 Delete(self);
942 return;
943 }
944 // New watcher.
945 self->chand_->external_connectivity_watcher_list_.Add(self);
946 // This assumes that the closure is scheduled on the ExecCtx scheduler
947 // and that GRPC_CLOSURE_RUN would run the closure immediately.
948 GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
949 GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
950 grpc_combiner_scheduler(self->chand_->combiner_));
951 grpc_connectivity_state_notify_on_state_change(
952 &self->chand_->state_tracker_, self->state_, &self->my_closure_);
953 }
954
955 //
956 // ChannelData::GrpcSubchannel
957 //
958
959 // This class is a wrapper for Subchannel that hides details of the
960 // channel's implementation (such as the health check service name) from
961 // the LB policy API.
962 //
963 // Note that no synchronization is needed here, because even if the
964 // underlying subchannel is shared between channels, this wrapper will only
965 // be used within one channel, so it will always be synchronized by the
966 // control plane combiner.
967 class ChannelData::GrpcSubchannel : public SubchannelInterface {
968 public:
GrpcSubchannel(ChannelData * chand,Subchannel * subchannel,UniquePtr<char> health_check_service_name)969 GrpcSubchannel(ChannelData* chand, Subchannel* subchannel,
970 UniquePtr<char> health_check_service_name)
971 : chand_(chand),
972 subchannel_(subchannel),
973 health_check_service_name_(std::move(health_check_service_name)) {
974 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel");
975 auto* subchannel_node = subchannel_->channelz_node();
976 if (subchannel_node != nullptr) {
977 intptr_t subchannel_uuid = subchannel_node->uuid();
978 auto it = chand_->subchannel_refcount_map_.find(subchannel_);
979 if (it == chand_->subchannel_refcount_map_.end()) {
980 chand_->channelz_node_->AddChildSubchannel(subchannel_uuid);
981 it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
982 }
983 ++it->second;
984 }
985 }
986
~GrpcSubchannel()987 ~GrpcSubchannel() {
988 auto* subchannel_node = subchannel_->channelz_node();
989 if (subchannel_node != nullptr) {
990 intptr_t subchannel_uuid = subchannel_node->uuid();
991 auto it = chand_->subchannel_refcount_map_.find(subchannel_);
992 GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
993 --it->second;
994 if (it->second == 0) {
995 chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid);
996 chand_->subchannel_refcount_map_.erase(it);
997 }
998 }
999 GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
1000 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel");
1001 }
1002
CheckConnectivityState(RefCountedPtr<ConnectedSubchannelInterface> * connected_subchannel)1003 grpc_connectivity_state CheckConnectivityState(
1004 RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
1005 override {
1006 RefCountedPtr<ConnectedSubchannel> tmp;
1007 auto retval = subchannel_->CheckConnectivityState(
1008 health_check_service_name_.get(), &tmp);
1009 *connected_subchannel = std::move(tmp);
1010 return retval;
1011 }
1012
WatchConnectivityState(grpc_connectivity_state initial_state,UniquePtr<ConnectivityStateWatcher> watcher)1013 void WatchConnectivityState(
1014 grpc_connectivity_state initial_state,
1015 UniquePtr<ConnectivityStateWatcher> watcher) override {
1016 subchannel_->WatchConnectivityState(
1017 initial_state,
1018 UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
1019 std::move(watcher));
1020 }
1021
CancelConnectivityStateWatch(ConnectivityStateWatcher * watcher)1022 void CancelConnectivityStateWatch(
1023 ConnectivityStateWatcher* watcher) override {
1024 subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
1025 watcher);
1026 }
1027
AttemptToConnect()1028 void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
1029
ResetBackoff()1030 void ResetBackoff() override { subchannel_->ResetBackoff(); }
1031
1032 private:
1033 ChannelData* chand_;
1034 Subchannel* subchannel_;
1035 UniquePtr<char> health_check_service_name_;
1036 };
1037
1038 //
1039 // ChannelData::ClientChannelControlHelper
1040 //
1041
1042 class ChannelData::ClientChannelControlHelper
1043 : public LoadBalancingPolicy::ChannelControlHelper {
1044 public:
ClientChannelControlHelper(ChannelData * chand)1045 explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
1046 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
1047 }
1048
~ClientChannelControlHelper()1049 ~ClientChannelControlHelper() override {
1050 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1051 "ClientChannelControlHelper");
1052 }
1053
CreateSubchannel(const grpc_channel_args & args)1054 RefCountedPtr<SubchannelInterface> CreateSubchannel(
1055 const grpc_channel_args& args) override {
1056 bool inhibit_health_checking = grpc_channel_arg_get_bool(
1057 grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
1058 UniquePtr<char> health_check_service_name;
1059 if (!inhibit_health_checking) {
1060 health_check_service_name.reset(
1061 gpr_strdup(chand_->health_check_service_name_.get()));
1062 }
1063 static const char* args_to_remove[] = {
1064 GRPC_ARG_INHIBIT_HEALTH_CHECKING,
1065 GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1066 };
1067 grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
1068 chand_->subchannel_pool_.get());
1069 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1070 &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
1071 Subchannel* subchannel =
1072 chand_->client_channel_factory_->CreateSubchannel(new_args);
1073 grpc_channel_args_destroy(new_args);
1074 if (subchannel == nullptr) return nullptr;
1075 return MakeRefCounted<GrpcSubchannel>(chand_, subchannel,
1076 std::move(health_check_service_name));
1077 }
1078
CreateChannel(const char * target,const grpc_channel_args & args)1079 grpc_channel* CreateChannel(const char* target,
1080 const grpc_channel_args& args) override {
1081 return chand_->client_channel_factory_->CreateChannel(target, &args);
1082 }
1083
UpdateState(grpc_connectivity_state state,UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)1084 void UpdateState(
1085 grpc_connectivity_state state,
1086 UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
1087 grpc_error* disconnect_error =
1088 chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE);
1089 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1090 const char* extra = disconnect_error == GRPC_ERROR_NONE
1091 ? ""
1092 : " (ignoring -- channel shutting down)";
1093 gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
1094 grpc_connectivity_state_name(state), picker.get(), extra);
1095 }
1096 // Do update only if not shutting down.
1097 if (disconnect_error == GRPC_ERROR_NONE) {
1098 // Will delete itself.
1099 New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
1100 std::move(picker));
1101 }
1102 }
1103
1104 // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
RequestReresolution()1105 void RequestReresolution() override {}
1106
AddTraceEvent(TraceSeverity severity,const char * message)1107 void AddTraceEvent(TraceSeverity severity, const char* message) override {
1108 if (chand_->channelz_node_ != nullptr) {
1109 chand_->channelz_node_->AddTraceEvent(
1110 ConvertSeverityEnum(severity),
1111 grpc_slice_from_copied_string(message));
1112 }
1113 }
1114
1115 private:
ConvertSeverityEnum(TraceSeverity severity)1116 static channelz::ChannelTrace::Severity ConvertSeverityEnum(
1117 TraceSeverity severity) {
1118 if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
1119 if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
1120 return channelz::ChannelTrace::Error;
1121 }
1122
1123 ChannelData* chand_;
1124 };
1125
1126 //
1127 // ChannelData implementation
1128 //
1129
Init(grpc_channel_element * elem,grpc_channel_element_args * args)1130 grpc_error* ChannelData::Init(grpc_channel_element* elem,
1131 grpc_channel_element_args* args) {
1132 GPR_ASSERT(args->is_last);
1133 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
1134 grpc_error* error = GRPC_ERROR_NONE;
1135 new (elem->channel_data) ChannelData(args, &error);
1136 return error;
1137 }
1138
Destroy(grpc_channel_element * elem)1139 void ChannelData::Destroy(grpc_channel_element* elem) {
1140 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1141 chand->~ChannelData();
1142 }
1143
GetEnableRetries(const grpc_channel_args * args)1144 bool GetEnableRetries(const grpc_channel_args* args) {
1145 return grpc_channel_arg_get_bool(
1146 grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
1147 }
1148
GetMaxPerRpcRetryBufferSize(const grpc_channel_args * args)1149 size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
1150 return static_cast<size_t>(grpc_channel_arg_get_integer(
1151 grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
1152 {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
1153 }
1154
GetSubchannelPool(const grpc_channel_args * args)1155 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1156 const grpc_channel_args* args) {
1157 const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
1158 grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
1159 if (use_local_subchannel_pool) {
1160 return MakeRefCounted<LocalSubchannelPool>();
1161 }
1162 return GlobalSubchannelPool::instance();
1163 }
1164
GetChannelzNode(const grpc_channel_args * args)1165 channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
1166 const grpc_arg* arg =
1167 grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1168 if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
1169 return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1170 }
1171 return nullptr;
1172 }
1173
ChannelData(grpc_channel_element_args * args,grpc_error ** error)1174 ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
1175 : deadline_checking_enabled_(
1176 grpc_deadline_checking_enabled(args->channel_args)),
1177 enable_retries_(GetEnableRetries(args->channel_args)),
1178 per_rpc_retry_buffer_size_(
1179 GetMaxPerRpcRetryBufferSize(args->channel_args)),
1180 owning_stack_(args->channel_stack),
1181 client_channel_factory_(
1182 ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
1183 channelz_node_(GetChannelzNode(args->channel_args)),
1184 data_plane_combiner_(grpc_combiner_create()),
1185 combiner_(grpc_combiner_create()),
1186 interested_parties_(grpc_pollset_set_create()),
1187 subchannel_pool_(GetSubchannelPool(args->channel_args)),
1188 disconnect_error_(GRPC_ERROR_NONE) {
1189 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1190 gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
1191 this, owning_stack_);
1192 }
1193 // Initialize data members.
1194 grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
1195 "client_channel");
1196 gpr_mu_init(&info_mu_);
1197 // Start backup polling.
1198 grpc_client_channel_start_backup_polling(interested_parties_);
1199 // Check client channel factory.
1200 if (client_channel_factory_ == nullptr) {
1201 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1202 "Missing client channel factory in args for client channel filter");
1203 return;
1204 }
1205 // Get server name to resolve, using proxy mapper if needed.
1206 const char* server_uri = grpc_channel_arg_get_string(
1207 grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
1208 if (server_uri == nullptr) {
1209 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1210 "server URI channel arg missing or wrong type in client channel "
1211 "filter");
1212 return;
1213 }
1214 // Get default service config
1215 const char* service_config_json = grpc_channel_arg_get_string(
1216 grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
1217 if (service_config_json != nullptr) {
1218 *error = GRPC_ERROR_NONE;
1219 default_service_config_ = ServiceConfig::Create(service_config_json, error);
1220 if (*error != GRPC_ERROR_NONE) {
1221 default_service_config_.reset();
1222 return;
1223 }
1224 }
1225 grpc_uri* uri = grpc_uri_parse(server_uri, true);
1226 if (uri != nullptr && uri->path[0] != '\0') {
1227 server_name_.reset(
1228 gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
1229 }
1230 grpc_uri_destroy(uri);
1231 char* proxy_name = nullptr;
1232 grpc_channel_args* new_args = nullptr;
1233 grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
1234 &new_args);
1235 target_uri_.reset(proxy_name != nullptr ? proxy_name
1236 : gpr_strdup(server_uri));
1237 channel_args_ = new_args != nullptr
1238 ? new_args
1239 : grpc_channel_args_copy(args->channel_args);
1240 if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
1241 *error =
1242 GRPC_ERROR_CREATE_FROM_STATIC_STRING("the target uri is not valid.");
1243 return;
1244 }
1245 *error = GRPC_ERROR_NONE;
1246 }
1247
~ChannelData()1248 ChannelData::~ChannelData() {
1249 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1250 gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
1251 }
1252 DestroyResolvingLoadBalancingPolicyLocked();
1253 grpc_channel_args_destroy(channel_args_);
1254 // Stop backup polling.
1255 grpc_client_channel_stop_backup_polling(interested_parties_);
1256 grpc_pollset_set_destroy(interested_parties_);
1257 GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
1258 GRPC_COMBINER_UNREF(combiner_, "client_channel");
1259 GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
1260 grpc_connectivity_state_destroy(&state_tracker_);
1261 gpr_mu_destroy(&info_mu_);
1262 }
1263
CreateResolvingLoadBalancingPolicyLocked()1264 void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
1265 // Instantiate resolving LB policy.
1266 LoadBalancingPolicy::Args lb_args;
1267 lb_args.combiner = combiner_;
1268 lb_args.channel_control_helper =
1269 UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
1270 New<ClientChannelControlHelper>(this));
1271 lb_args.args = channel_args_;
1272 UniquePtr<char> target_uri(strdup(target_uri_.get()));
1273 resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
1274 std::move(lb_args), &grpc_client_channel_routing_trace,
1275 std::move(target_uri), ProcessResolverResultLocked, this));
1276 grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
1277 interested_parties_);
1278 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1279 gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
1280 resolving_lb_policy_.get());
1281 }
1282 }
1283
DestroyResolvingLoadBalancingPolicyLocked()1284 void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
1285 if (resolving_lb_policy_ != nullptr) {
1286 grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
1287 interested_parties_);
1288 resolving_lb_policy_.reset();
1289 }
1290 }
1291
ProcessLbPolicy(const Resolver::Result & resolver_result,const internal::ClientChannelGlobalParsedConfig * parsed_service_config,UniquePtr<char> * lb_policy_name,RefCountedPtr<LoadBalancingPolicy::Config> * lb_policy_config)1292 void ChannelData::ProcessLbPolicy(
1293 const Resolver::Result& resolver_result,
1294 const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
1295 UniquePtr<char>* lb_policy_name,
1296 RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
1297 // Prefer the LB policy name found in the service config.
1298 if (parsed_service_config != nullptr &&
1299 parsed_service_config->parsed_lb_config() != nullptr) {
1300 lb_policy_name->reset(
1301 gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
1302 *lb_policy_config = parsed_service_config->parsed_lb_config();
1303 return;
1304 }
1305 const char* local_policy_name = nullptr;
1306 if (parsed_service_config != nullptr &&
1307 parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
1308 local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
1309 } else {
1310 const grpc_arg* channel_arg =
1311 grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
1312 local_policy_name = grpc_channel_arg_get_string(channel_arg);
1313 }
1314 // Special case: If at least one balancer address is present, we use
1315 // the grpclb policy, regardless of what the resolver has returned.
1316 bool found_balancer_address = false;
1317 for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
1318 const ServerAddress& address = resolver_result.addresses[i];
1319 if (address.IsBalancer()) {
1320 found_balancer_address = true;
1321 break;
1322 }
1323 }
1324 if (found_balancer_address) {
1325 if (local_policy_name != nullptr &&
1326 strcmp(local_policy_name, "grpclb") != 0) {
1327 gpr_log(GPR_INFO,
1328 "resolver requested LB policy %s but provided at least one "
1329 "balancer address -- forcing use of grpclb LB policy",
1330 local_policy_name);
1331 }
1332 local_policy_name = "grpclb";
1333 }
1334 // Use pick_first if nothing was specified and we didn't select grpclb
1335 // above.
1336 lb_policy_name->reset(gpr_strdup(
1337 local_policy_name == nullptr ? "pick_first" : local_policy_name));
1338 }
1339
1340 // Synchronous callback from ResolvingLoadBalancingPolicy to process a
1341 // resolver result update.
ProcessResolverResultLocked(void * arg,const Resolver::Result & result,const char ** lb_policy_name,RefCountedPtr<LoadBalancingPolicy::Config> * lb_policy_config,grpc_error ** service_config_error)1342 bool ChannelData::ProcessResolverResultLocked(
1343 void* arg, const Resolver::Result& result, const char** lb_policy_name,
1344 RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
1345 grpc_error** service_config_error) {
1346 ChannelData* chand = static_cast<ChannelData*>(arg);
1347 RefCountedPtr<ServiceConfig> service_config;
1348 // If resolver did not return a service config or returned an invalid service
1349 // config, we need a fallback service config.
1350 if (result.service_config_error != GRPC_ERROR_NONE) {
1351 // If the service config was invalid, then fallback to the saved service
1352 // config. If there is no saved config either, use the default service
1353 // config.
1354 if (chand->saved_service_config_ != nullptr) {
1355 service_config = chand->saved_service_config_;
1356 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1357 gpr_log(GPR_INFO,
1358 "chand=%p: resolver returned invalid service config. "
1359 "Continuing to use previous service config.",
1360 chand);
1361 }
1362 } else if (chand->default_service_config_ != nullptr) {
1363 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1364 gpr_log(GPR_INFO,
1365 "chand=%p: resolver returned invalid service config. Using "
1366 "default service config provided by client API.",
1367 chand);
1368 }
1369 service_config = chand->default_service_config_;
1370 }
1371 } else if (result.service_config == nullptr) {
1372 if (chand->default_service_config_ != nullptr) {
1373 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1374 gpr_log(GPR_INFO,
1375 "chand=%p: resolver returned no service config. Using default "
1376 "service config provided by client API.",
1377 chand);
1378 }
1379 service_config = chand->default_service_config_;
1380 }
1381 } else {
1382 service_config = result.service_config;
1383 }
1384 *service_config_error = GRPC_ERROR_REF(result.service_config_error);
1385 if (service_config == nullptr &&
1386 result.service_config_error != GRPC_ERROR_NONE) {
1387 return false;
1388 }
1389 // Process service config.
1390 UniquePtr<char> service_config_json;
1391 const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
1392 nullptr;
1393 if (service_config != nullptr) {
1394 parsed_service_config =
1395 static_cast<const internal::ClientChannelGlobalParsedConfig*>(
1396 service_config->GetGlobalParsedConfig(
1397 internal::ClientChannelServiceConfigParser::ParserIndex()));
1398 }
1399 // Check if the config has changed.
1400 const bool service_config_changed =
1401 ((service_config == nullptr) !=
1402 (chand->saved_service_config_ == nullptr)) ||
1403 (service_config != nullptr &&
1404 strcmp(service_config->service_config_json(),
1405 chand->saved_service_config_->service_config_json()) != 0);
1406 if (service_config_changed) {
1407 service_config_json.reset(gpr_strdup(
1408 service_config != nullptr ? service_config->service_config_json()
1409 : ""));
1410 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1411 gpr_log(GPR_INFO,
1412 "chand=%p: resolver returned updated service config: \"%s\"",
1413 chand, service_config_json.get());
1414 }
1415 // Save health check service name.
1416 if (service_config != nullptr) {
1417 chand->health_check_service_name_.reset(
1418 gpr_strdup(parsed_service_config->health_check_service_name()));
1419 } else {
1420 chand->health_check_service_name_.reset();
1421 }
1422 // Save service config.
1423 chand->saved_service_config_ = std::move(service_config);
1424 }
1425 // We want to set the service config at least once. This should not really be
1426 // needed, but we are doing it as a defensive approach. This can be removed,
1427 // if we feel it is unnecessary.
1428 if (service_config_changed || !chand->received_first_resolver_result_) {
1429 chand->received_first_resolver_result_ = true;
1430 Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
1431 retry_throttle_data;
1432 if (parsed_service_config != nullptr) {
1433 retry_throttle_data = parsed_service_config->retry_throttling();
1434 }
1435 // Create service config setter to update channel state in the data
1436 // plane combiner. Destroys itself when done.
1437 New<ServiceConfigSetter>(chand, retry_throttle_data,
1438 chand->saved_service_config_);
1439 }
1440 UniquePtr<char> processed_lb_policy_name;
1441 chand->ProcessLbPolicy(result, parsed_service_config,
1442 &processed_lb_policy_name, lb_policy_config);
1443 // Swap out the data used by GetChannelInfo().
1444 {
1445 MutexLock lock(&chand->info_mu_);
1446 chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
1447 if (service_config_json != nullptr) {
1448 chand->info_service_config_json_ = std::move(service_config_json);
1449 }
1450 }
1451 // Return results.
1452 *lb_policy_name = chand->info_lb_policy_name_.get();
1453 return service_config_changed;
1454 }
1455
// Performs a ping on the channel, using the current picker to select a
// connected subchannel.  Returns GRPC_ERROR_NONE if the ping was
// initiated; otherwise returns an error (ownership passed to caller).
// Must be called in the control-plane combiner.
grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
  // Pings are only meaningful on a READY channel.
  if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
  }
  // Ask the LB picker for a subchannel, using default (empty) pick args.
  LoadBalancingPolicy::PickResult result =
      picker_->Pick(LoadBalancingPolicy::PickArgs());
  if (result.connected_subchannel != nullptr) {
    ConnectedSubchannel* connected_subchannel =
        static_cast<ConnectedSubchannel*>(result.connected_subchannel.get());
    connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
  } else {
    // No subchannel available.  If the picker did not supply an error
    // (e.g., the pick was a drop), synthesize one so the caller always
    // gets a non-OK result for a failed ping.
    if (result.error == GRPC_ERROR_NONE) {
      result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "LB policy dropped call on ping");
    }
  }
  return result.error;
}
1474
// Handles a transport op in the control-plane combiner.  Consumes the
// "start_transport_op" channel-stack ref taken in StartTransportOp().
void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Connectivity watch.
  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker_, op->connectivity_state,
        op->on_connectivity_state_change);
    // Clear the fields so they are not processed again downstream.
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error* error = chand->DoPingLocked(op);
    if (error != GRPC_ERROR_NONE) {
      // On failure, schedule both callbacks with the error: a ref for
      // on_initiate, and the original error (ownership) for on_ack.
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (chand->resolving_lb_policy_ != nullptr) {
      chand->resolving_lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect.
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    // Record the disconnect error.  The CAS asserts that disconnect
    // happens at most once on a channel.
    grpc_error* error = GRPC_ERROR_NONE;
    GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
        &error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
        MemoryOrder::ACQUIRE));
    chand->DestroyResolvingLoadBalancingPolicyLocked();
    // Switch the channel to SHUTDOWN with a picker that fails all picks
    // with the disconnect error.  Will delete itself.
    New<ConnectivityStateAndPickerSetter>(
        chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
        UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
            New<LoadBalancingPolicy::TransientFailurePicker>(
                GRPC_ERROR_REF(op->disconnect_with_error))));
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
1522
// Entry point for transport ops on the client channel.  Handles
// bind_pollset inline, then hops into the control-plane combiner for
// everything else (taking a channel-stack ref that the locked handler
// releases).
void ChannelData::StartTransportOp(grpc_channel_element* elem,
                                   grpc_transport_op* op) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Client channels never accept incoming streams.
  GPR_ASSERT(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Pop into control plane combiner for remaining ops.
  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure,
                        ChannelData::StartTransportOpLocked, op,
                        grpc_combiner_scheduler(chand->combiner_)),
      GRPC_ERROR_NONE);
}
1540
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1541 void ChannelData::GetChannelInfo(grpc_channel_element* elem,
1542 const grpc_channel_info* info) {
1543 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1544 MutexLock lock(&chand->info_mu_);
1545 if (info->lb_policy_name != nullptr) {
1546 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
1547 }
1548 if (info->service_config_json != nullptr) {
1549 *info->service_config_json =
1550 gpr_strdup(chand->info_service_config_json_.get());
1551 }
1552 }
1553
// Links a pick onto the channel's queued-picks list (LIFO push) and
// registers the call's pollent with the channel's interested_parties.
void ChannelData::AddQueuedPick(QueuedPick* pick,
                                grpc_polling_entity* pollent) {
  // Add call to queued picks list.
  pick->next = queued_picks_;
  queued_picks_ = pick;
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}
1563
RemoveQueuedPick(QueuedPick * to_remove,grpc_polling_entity * pollent)1564 void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
1565 grpc_polling_entity* pollent) {
1566 // Remove call's pollent from channel's interested_parties.
1567 grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1568 // Remove from queued picks list.
1569 for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
1570 pick = &(*pick)->next) {
1571 if (*pick == to_remove) {
1572 *pick = to_remove->next;
1573 return;
1574 }
1575 }
1576 }
1577
TryToConnectLocked(void * arg,grpc_error * error_ignored)1578 void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
1579 auto* chand = static_cast<ChannelData*>(arg);
1580 if (chand->resolving_lb_policy_ != nullptr) {
1581 chand->resolving_lb_policy_->ExitIdleLocked();
1582 } else {
1583 chand->CreateResolvingLoadBalancingPolicyLocked();
1584 }
1585 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
1586 }
1587
CheckConnectivityState(bool try_to_connect)1588 grpc_connectivity_state ChannelData::CheckConnectivityState(
1589 bool try_to_connect) {
1590 grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
1591 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
1592 GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
1593 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
1594 grpc_combiner_scheduler(combiner_)),
1595 GRPC_ERROR_NONE);
1596 }
1597 return out;
1598 }
1599
1600 //
1601 // CallData implementation
1602 //
1603
1604 // Retry support:
1605 //
1606 // In order to support retries, we act as a proxy for stream op batches.
1607 // When we get a batch from the surface, we add it to our list of pending
1608 // batches, and we then use those batches to construct separate "child"
1609 // batches to be started on the subchannel call. When the child batches
1610 // return, we then decide which pending batches have been completed and
1611 // schedule their callbacks accordingly. If a subchannel call fails and
1612 // we want to retry it, we do a new pick and start again, constructing
1613 // new "child" batches for the new subchannel call.
1614 //
1615 // Note that retries are committed when receiving data from the server
1616 // (except for Trailers-Only responses). However, there may be many
1617 // send ops started before receiving any data, so we may have already
1618 // completed some number of send ops (and returned the completions up to
1619 // the surface) by the time we realize that we need to retry. To deal
1620 // with this, we cache data for send ops, so that we can replay them on a
1621 // different subchannel call even after we have completed the original
1622 // batches.
1623 //
1624 // There are two sets of data to maintain:
1625 // - In call_data (in the parent channel), we maintain a list of pending
1626 // ops and cached data for send ops.
1627 // - In the subchannel call, we maintain state to indicate what ops have
1628 // already been sent down to that call.
1629 //
1630 // When constructing the "child" batches, we compare those two sets of
1631 // data to see which batches need to be sent to the subchannel call.
1632
1633 // TODO(roth): In subsequent PRs:
1634 // - add support for transparent retries (including initial metadata)
1635 // - figure out how to record stats in census for retries
1636 // (census filter is on top of this one)
1637 // - add census stats for retries
1638
// Constructs per-call state.  Fields are snapshotted from the channel
// (chand) and the call args at call creation time.
CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
                   const grpc_call_element_args& args)
    : deadline_state_(elem, args.call_stack, args.call_combiner,
                      // If deadline checking is disabled on the channel,
                      // give the deadline filter an infinite deadline.
                      GPR_LIKELY(chand.deadline_checking_enabled())
                          ? args.deadline
                          : GRPC_MILLIS_INF_FUTURE),
      path_(grpc_slice_ref_internal(args.path)),  // Owns a ref to the path.
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      lb_call_state_(this),
      pending_send_initial_metadata_(false),
      pending_send_message_(false),
      pending_send_trailing_metadata_(false),
      // May later be flipped to false if the retry buffer overflows.
      enable_retries_(chand.enable_retries()),
      retry_committed_(false),
      last_attempt_got_server_pushback_(false) {}
1659
~CallData()1660 CallData::~CallData() {
1661 grpc_slice_unref_internal(path_);
1662 GRPC_ERROR_UNREF(cancel_error_);
1663 // Make sure there are no remaining pending batches.
1664 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1665 GPR_ASSERT(pending_batches_[i].batch == nullptr);
1666 }
1667 }
1668
Init(grpc_call_element * elem,const grpc_call_element_args * args)1669 grpc_error* CallData::Init(grpc_call_element* elem,
1670 const grpc_call_element_args* args) {
1671 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1672 new (elem->call_data) CallData(elem, *chand, *args);
1673 return GRPC_ERROR_NONE;
1674 }
1675
// Filter vtable destroy hook.  If a subchannel call exists, the
// then_schedule_closure is handed off to it (invoked after its call
// stack is destroyed); otherwise we schedule it ourselves.
void CallData::Destroy(grpc_call_element* elem,
                       const grpc_call_final_info* final_info,
                       grpc_closure* then_schedule_closure) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
    calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
    then_schedule_closure = nullptr;  // Ownership transferred above.
  }
  calld->~CallData();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
1687
// Entry point for stream op batches on the client channel.  Routes each
// batch based on call state: fail immediately if already cancelled,
// handle cancellation specially, start directly on an existing
// subchannel call, or queue the batch and (for send_initial_metadata)
// enter the data-plane combiner to start a pick.
void CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled())) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error_);
    calld->cancel_error_ =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error_));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches. Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call_ == nullptr) {
      // TODO(roth): If there is a pending retry callback, do we need to
      // cancel it here?
      calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
                                NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    } else {
      // Note: This will release the call combiner.
      calld->subchannel_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call_.get());
    }
    calld->PendingBatchesResume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(
            &batch->handler_private.closure, StartPickLocked, elem,
            grpc_combiner_scheduler(chand->data_plane_combiner())),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}
1777
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)1778 void CallData::SetPollent(grpc_call_element* elem,
1779 grpc_polling_entity* pollent) {
1780 CallData* calld = static_cast<CallData*>(elem->call_data);
1781 calld->pollent_ = pollent;
1782 }
1783
1784 //
1785 // send op data caching
1786 //
1787
// Caches the data for any send ops in the batch, so they can be
// replayed on a new subchannel call if we retry.  Idempotent per batch
// (guarded by send_ops_cached).  All cached storage comes from the
// call's arena, so it lives as long as the call.
void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    seen_send_initial_metadata_ = true;
    GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    // Arena-allocate storage for the copied metadata elements.
    send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
    grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
                             send_initial_metadata_storage_);
    send_initial_metadata_flags_ =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    peer_string_ = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    // The cache takes ownership of the byte stream for replay.
    ByteStreamCache* cache = arena_->New<ByteStreamCache>(
        std::move(batch->payload->send_message.send_message));
    send_messages_.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    seen_send_trailing_metadata_ = true;
    GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
        sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
    grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
                             send_trailing_metadata_storage_);
  }
}
1824
FreeCachedSendInitialMetadata(ChannelData * chand)1825 void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
1826 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1827 gpr_log(GPR_INFO,
1828 "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
1829 this);
1830 }
1831 grpc_metadata_batch_destroy(&send_initial_metadata_);
1832 }
1833
FreeCachedSendMessage(ChannelData * chand,size_t idx)1834 void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
1835 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1836 gpr_log(GPR_INFO,
1837 "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
1838 chand, this, idx);
1839 }
1840 send_messages_[idx]->Destroy();
1841 }
1842
FreeCachedSendTrailingMetadata(ChannelData * chand)1843 void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
1844 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1845 gpr_log(GPR_INFO,
1846 "chand=%p calld=%p: destroying calld->send_trailing_metadata",
1847 chand, this);
1848 }
1849 grpc_metadata_batch_destroy(&send_trailing_metadata_);
1850 }
1851
FreeCachedSendOpDataAfterCommit(grpc_call_element * elem,SubchannelCallRetryState * retry_state)1852 void CallData::FreeCachedSendOpDataAfterCommit(
1853 grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
1854 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1855 if (retry_state->completed_send_initial_metadata) {
1856 FreeCachedSendInitialMetadata(chand);
1857 }
1858 for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
1859 FreeCachedSendMessage(chand, i);
1860 }
1861 if (retry_state->completed_send_trailing_metadata) {
1862 FreeCachedSendTrailingMetadata(chand);
1863 }
1864 }
1865
FreeCachedSendOpDataForCompletedBatch(grpc_call_element * elem,SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state)1866 void CallData::FreeCachedSendOpDataForCompletedBatch(
1867 grpc_call_element* elem, SubchannelCallBatchData* batch_data,
1868 SubchannelCallRetryState* retry_state) {
1869 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1870 if (batch_data->batch.send_initial_metadata) {
1871 FreeCachedSendInitialMetadata(chand);
1872 }
1873 if (batch_data->batch.send_message) {
1874 FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
1875 }
1876 if (batch_data->batch.send_trailing_metadata) {
1877 FreeCachedSendTrailingMetadata(chand);
1878 }
1879 }
1880
1881 //
1882 // LB recv_trailing_metadata_ready handling
1883 //
1884
RecvTrailingMetadataReadyForLoadBalancingPolicy(void * arg,grpc_error * error)1885 void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
1886 void* arg, grpc_error* error) {
1887 CallData* calld = static_cast<CallData*>(arg);
1888 // Invoke callback to LB policy.
1889 calld->lb_recv_trailing_metadata_ready_(
1890 calld->lb_recv_trailing_metadata_ready_user_data_,
1891 calld->recv_trailing_metadata_, &calld->lb_call_state_);
1892 // Chain to original callback.
1893 GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
1894 GRPC_ERROR_REF(error));
1895 }
1896
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(grpc_transport_stream_op_batch * batch)1897 void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
1898 grpc_transport_stream_op_batch* batch) {
1899 if (lb_recv_trailing_metadata_ready_ != nullptr) {
1900 recv_trailing_metadata_ =
1901 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
1902 original_recv_trailing_metadata_ready_ =
1903 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1904 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
1905 RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
1906 grpc_schedule_on_exec_ctx);
1907 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1908 &recv_trailing_metadata_ready_;
1909 }
1910 }
1911
1912 //
1913 // pending_batches management
1914 //
1915
GetBatchIndex(grpc_transport_stream_op_batch * batch)1916 size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
1917 // Note: It is important the send_initial_metadata be the first entry
1918 // here, since the code in pick_subchannel_locked() assumes it will be.
1919 if (batch->send_initial_metadata) return 0;
1920 if (batch->send_message) return 1;
1921 if (batch->send_trailing_metadata) return 2;
1922 if (batch->recv_initial_metadata) return 3;
1923 if (batch->recv_message) return 4;
1924 if (batch->recv_trailing_metadata) return 5;
1925 GPR_UNREACHABLE_CODE(return (size_t)-1);
1926 }
1927
1928 // This is called via the call combiner, so access to calld is synchronized.
// Stores a batch in its pending_batches_ slot and, when retries are
// enabled, updates retry bookkeeping.  If the batch pushes the buffered
// send-op data past the per-RPC retry buffer limit, retries are
// committed (and disabled entirely if no attempt has started yet).
// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesAdd(grpc_call_element* elem,
                                 grpc_transport_stream_op_batch* batch) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            this, idx);
  }
  PendingBatch* pending = &pending_batches_[idx];
  // Each slot holds at most one batch at a time.
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (enable_retries_) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      pending_send_initial_metadata_ = true;
      bytes_buffered_for_retry_ += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      pending_send_message_ = true;
      bytes_buffered_for_retry_ +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      pending_send_trailing_metadata_ = true;
    }
    if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
                     chand->per_rpc_retry_buffer_size())) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, this);
      }
      // Retry state lives in the subchannel call's parent data, if a
      // subchannel call exists yet.
      SubchannelCallRetryState* retry_state =
          subchannel_call_ == nullptr ? nullptr
                                      : static_cast<SubchannelCallRetryState*>(
                                            subchannel_call_->GetParentData());
      RetryCommit(elem, retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (num_attempts_completed_ == 0) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: disabling retries before first attempt",
                  chand, this);
        }
        enable_retries_ = false;
      }
    }
  }
}
1985
PendingBatchClear(PendingBatch * pending)1986 void CallData::PendingBatchClear(PendingBatch* pending) {
1987 if (enable_retries_) {
1988 if (pending->batch->send_initial_metadata) {
1989 pending_send_initial_metadata_ = false;
1990 }
1991 if (pending->batch->send_message) {
1992 pending_send_message_ = false;
1993 }
1994 if (pending->batch->send_trailing_metadata) {
1995 pending_send_trailing_metadata_ = false;
1996 }
1997 }
1998 pending->batch = nullptr;
1999 }
2000
MaybeClearPendingBatch(grpc_call_element * elem,PendingBatch * pending)2001 void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
2002 PendingBatch* pending) {
2003 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2004 grpc_transport_stream_op_batch* batch = pending->batch;
2005 // We clear the pending batch if all of its callbacks have been
2006 // scheduled and reset to nullptr.
2007 if (batch->on_complete == nullptr &&
2008 (!batch->recv_initial_metadata ||
2009 batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
2010 nullptr) &&
2011 (!batch->recv_message ||
2012 batch->payload->recv_message.recv_message_ready == nullptr) &&
2013 (!batch->recv_trailing_metadata ||
2014 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
2015 nullptr)) {
2016 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2017 gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
2018 this);
2019 }
2020 PendingBatchClear(pending);
2021 }
2022 }
2023
2024 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error * error)2025 void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
2026 grpc_transport_stream_op_batch* batch =
2027 static_cast<grpc_transport_stream_op_batch*>(arg);
2028 CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
2029 // Note: This will release the call combiner.
2030 grpc_transport_stream_op_batch_finish_with_failure(
2031 batch, GRPC_ERROR_REF(error), calld->call_combiner_);
2032 }
2033
2034 // This is called via the call combiner, so access to calld is synchronized.
// Fails every pending batch with (a ref to) the given error, then
// consumes the error.  The predicate decides whether the closure list
// yields the call combiner when run.
// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesFail(
    grpc_call_element* elem, grpc_error* error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, this, num_batches, grpc_error_string(error));
  }
  // Build up the list of failure closures, clearing each slot as we go.
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      if (batch->recv_trailing_metadata) {
        // Let the LB policy see the trailing metadata even on failure.
        MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
      }
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      PendingBatchClear(pending);
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
  // Each closure took its own ref; drop the caller's.
  GRPC_ERROR_UNREF(error);
}
2072
2073 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error * ignored)2074 void CallData::ResumePendingBatchInCallCombiner(void* arg,
2075 grpc_error* ignored) {
2076 grpc_transport_stream_op_batch* batch =
2077 static_cast<grpc_transport_stream_op_batch*>(arg);
2078 SubchannelCall* subchannel_call =
2079 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2080 // Note: This will release the call combiner.
2081 subchannel_call->StartTransportStreamOpBatch(batch);
2082 }
2083
2084 // This is called via the call combiner, so access to calld is synchronized.
// Starts all pending batches on the subchannel call.  With retries
// enabled, delegates to the retriable-batch machinery; otherwise sends
// the batches down as-is.
// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesResume(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (enable_retries_) {
    StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand, this, num_batches, subchannel_call_.get());
  }
  // Build up one closure per pending batch, clearing each slot as we go.
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      if (batch->recv_trailing_metadata) {
        // Let the LB policy see the trailing metadata.
        MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
      }
      batch->handler_private.extra_arg = subchannel_call_.get();
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "PendingBatchesResume");
      PendingBatchClear(pending);
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}
2122
2123 template <typename Predicate>
PendingBatchFind(grpc_call_element * elem,const char * log_message,Predicate predicate)2124 CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
2125 const char* log_message,
2126 Predicate predicate) {
2127 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2128 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2129 PendingBatch* pending = &pending_batches_[i];
2130 grpc_transport_stream_op_batch* batch = pending->batch;
2131 if (batch != nullptr && predicate(batch)) {
2132 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2133 gpr_log(GPR_INFO,
2134 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
2135 this, log_message, i);
2136 }
2137 return pending;
2138 }
2139 }
2140 return nullptr;
2141 }
2142
2143 //
2144 // retry code
2145 //
2146
RetryCommit(grpc_call_element * elem,SubchannelCallRetryState * retry_state)2147 void CallData::RetryCommit(grpc_call_element* elem,
2148 SubchannelCallRetryState* retry_state) {
2149 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2150 if (retry_committed_) return;
2151 retry_committed_ = true;
2152 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2153 gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
2154 }
2155 if (retry_state != nullptr) {
2156 FreeCachedSendOpDataAfterCommit(elem, retry_state);
2157 }
2158 }
2159
// Schedules a retry of the call.  Computes the delay from server
// pushback if provided (server_pushback_ms >= 0), otherwise from the
// exponential backoff state, then arms a timer that re-enters
// StartPickLocked in the data-plane combiner.
void CallData::DoRetry(grpc_call_element* elem,
                       SubchannelCallRetryState* retry_state,
                       grpc_millis server_pushback_ms) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(method_params_ != nullptr);
  const auto* retry_policy = method_params_->retry_policy();
  GPR_ASSERT(retry_policy != nullptr);
  // Reset subchannel call.
  subchannel_call_.reset();
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    // Server told us exactly how long to wait.
    next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
    last_attempt_got_server_pushback_ = true;
  } else {
    // (Re)initialize the backoff state on the first retry, or after a
    // server-pushback retry (which bypasses the backoff sequence).
    if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
      retry_backoff_.Init(
          BackOff::Options()
              .set_initial_backoff(retry_policy->initial_backoff)
              .set_multiplier(retry_policy->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(retry_policy->max_backoff));
      last_attempt_got_server_pushback_ = false;
    }
    next_attempt_time = retry_backoff_->NextAttemptTime();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
            this, next_attempt_time - ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
                    grpc_combiner_scheduler(chand->data_plane_combiner()));
  grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
  // Update bookkeeping.
  if (retry_state != nullptr) retry_state->retry_dispatched = true;
}
2198
// Decides whether the call should be retried given |status|, and if so
// dispatches the retry via DoRetry().  Returns true if a retry was (or
// had already been) dispatched, false if the call should proceed to
// completion.  |batch_data| may be null, in which case no per-attempt
// retry state is consulted.  |server_pushback_md|, if non-null, points
// to the server's grpc-retry-pushback-ms metadata element.
bool CallData::MaybeRetry(grpc_call_element* elem,
                          SubchannelCallBatchData* batch_data,
                          grpc_status_code status,
                          grpc_mdelem* server_pushback_md) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Get retry policy.
  if (method_params_ == nullptr) return false;
  const auto* retry_policy = method_params_->retry_policy();
  if (retry_policy == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  SubchannelCallRetryState* retry_state = nullptr;
  if (batch_data != nullptr) {
    retry_state = static_cast<SubchannelCallRetryState*>(
        batch_data->subchannel_call->GetParentData());
    if (retry_state->retry_dispatched) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
                this);
      }
      return true;
    }
  }
  // Check status.
  if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
    // Success also feeds the retry throttle so future failures are
    // weighed against recent successes.
    if (retry_throttle_data_ != nullptr) {
      retry_throttle_data_->RecordSuccess();
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
    }
    return false;
  }
  // Status is not OK.  Check whether the status is retryable.
  if (!retry_policy->retryable_status_codes.Contains(status)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: status %s not configured as retryable", chand,
              this, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (retry_throttle_data_ != nullptr &&
      !retry_throttle_data_->RecordFailure()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
    }
    return false;
  }
  // Check whether the call is committed.
  if (retry_committed_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
              this);
    }
    return false;
  }
  // Check whether we have retries remaining.
  // Note: the attempt counter is incremented here even if we end up not
  // retrying for one of the reasons below.
  ++num_attempts_completed_;
  if (num_attempts_completed_ >= retry_policy->max_attempts) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
              this, retry_policy->max_attempts);
    }
    return false;
  }
  // If the call was cancelled from the surface, don't retry.
  if (cancel_error_ != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: call cancelled from surface, not retrying",
              chand, this);
    }
    return false;
  }
  // Check server push-back.
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: not retrying due to server push-back",
                chand, this);
      }
      return false;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
                chand, this, ms);
      }
      server_pushback_ms = (grpc_millis)ms;
    }
  }
  // All checks passed: schedule the retry.
  DoRetry(elem, retry_state, server_pushback_ms);
  return true;
}
2305
2306 //
2307 // CallData::SubchannelCallBatchData
2308 //
2309
Create(grpc_call_element * elem,int refcount,bool set_on_complete)2310 CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
2311 grpc_call_element* elem, int refcount, bool set_on_complete) {
2312 CallData* calld = static_cast<CallData*>(elem->call_data);
2313 return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
2314 set_on_complete);
2315 }
2316
// Constructs batch data bound to the call's current subchannel call.
// The batch payload points into the attempt's retry state, so the batch
// shares per-attempt storage.  If |set_on_complete| is true, the batch's
// on_complete callback is wired to CallData::OnComplete.  Takes a ref on
// the owning call stack, released in Destroy().
CallData::SubchannelCallBatchData::SubchannelCallBatchData(
    grpc_call_element* elem, CallData* calld, int refcount,
    bool set_on_complete)
    : elem(elem), subchannel_call(calld->subchannel_call_) {
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          calld->subchannel_call_->GetParentData());
  batch.payload = &retry_state->batch_payload;
  gpr_ref_init(&refs, refcount);
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
                      grpc_schedule_on_exec_ctx);
    batch.on_complete = &on_complete;
  }
  GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
}
2333
// Tears down the batch data: destroys any metadata batches owned by the
// attempt's retry state for the ops this batch carried, drops the ref on
// the subchannel call, and releases the call-stack ref taken in the
// constructor.
void CallData::SubchannelCallBatchData::Destroy() {
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
  // Only destroy the metadata batches corresponding to ops present in
  // this batch; other attempts'/batches' data is left intact.
  if (batch.send_initial_metadata) {
    grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
  }
  if (batch.send_trailing_metadata) {
    grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
  }
  if (batch.recv_initial_metadata) {
    grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
  }
  if (batch.recv_trailing_metadata) {
    grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
  }
  // Drop our ref on the subchannel call before unreffing the call stack.
  subchannel_call.reset();
  CallData* calld = static_cast<CallData*>(elem->call_data);
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
}
2353
2354 //
2355 // recv_initial_metadata callback handling
2356 //
2357
// Delivers the received initial metadata to the surface: moves the
// metadata from the attempt's retry state into the pending batch's
// payload and runs the pending recv_initial_metadata_ready callback.
// Does not take ownership of |error|.
void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
  // Find pending batch.
  PendingBatch* pending = calld->PendingBatchFind(
      batch_data->elem, "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return metadata.
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  grpc_metadata_batch_move(
      &retry_state->recv_initial_metadata,
      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  calld->MaybeClearPendingBatch(batch_data->elem, pending);
  batch_data->Unref();
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
}
2391
// Intercepts recv_initial_metadata_ready from the subchannel call.
// Either defers propagation to the surface (Trailers-Only or error case,
// so that the retry decision can be made when trailing metadata
// arrives), or commits the call and passes the result through.
void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  grpc_call_element* elem = batch_data->elem;
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  retry_state->completed_recv_initial_metadata = true;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner_,
        "recv_initial_metadata_ready after retry dispatched");
    return;
  }
  // If we got an error or a Trailers-Only response and have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface.  We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
                    error != GRPC_ERROR_NONE) &&
                   !retry_state->completed_recv_trailing_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_initial_metadata_ready "
              "(Trailers-Only)",
              chand, calld);
    }
    // Stash the batch and error; AddClosuresForDeferredRecvCallbacks()
    // resumes them once trailing metadata has been processed.
    retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
    retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      calld->StartInternalRecvTrailingMetadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(
          calld->call_combiner_,
          "recv_initial_metadata_ready trailers-only or error");
    }
    return;
  }
  // Received valid initial metadata, so commit the call.
  calld->RetryCommit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  calld->InvokeRecvInitialMetadataCallback(batch_data, error);
}
2447
2448 //
2449 // recv_message callback handling
2450 //
2451
// Delivers the received message to the surface: moves the message from
// the attempt's retry state into the pending batch's payload and runs
// the pending recv_message_ready callback.  Does not take ownership of
// |error|.
void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
  // Find pending op.
  PendingBatch* pending = calld->PendingBatchFind(
      batch_data->elem, "invoking recv_message_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_message &&
               batch->payload->recv_message.recv_message_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return payload.
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  *pending->batch->payload->recv_message.recv_message =
      std::move(retry_state->recv_message);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  calld->MaybeClearPendingBatch(batch_data->elem, pending);
  batch_data->Unref();
  // Invoke callback.
  GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
}
2481
// Intercepts recv_message_ready from the subchannel call.  Either defers
// propagation to the surface (null message or error before trailing
// metadata, so that the retry decision can be made once status is
// known), or commits the call and passes the message through.
void CallData::RecvMessageReady(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  grpc_call_element* elem = batch_data->elem;
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  ++retry_state->completed_recv_message_count;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_message op, so do nothing.
  if (retry_state->retry_dispatched) {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "recv_message_ready after retry dispatched");
    return;
  }
  // If we got an error or the payload was nullptr and we have not yet gotten
  // the recv_trailing_metadata_ready callback, then defer propagating this
  // callback back to the surface.  We can evaluate whether to retry when
  // recv_trailing_metadata comes back.
  if (GPR_UNLIKELY(
          (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
          !retry_state->completed_recv_trailing_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring recv_message_ready (nullptr "
              "message and recv_trailing_metadata pending)",
              chand, calld);
    }
    // Stash the batch and error; AddClosuresForDeferredRecvCallbacks()
    // resumes them once trailing metadata has been processed.
    retry_state->recv_message_ready_deferred_batch = batch_data;
    retry_state->recv_message_error = GRPC_ERROR_REF(error);
    if (!retry_state->started_recv_trailing_metadata) {
      // recv_trailing_metadata not yet started by application; start it
      // ourselves to get status.
      calld->StartInternalRecvTrailingMetadata(elem);
    } else {
      GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
    }
    return;
  }
  // Received a valid message, so commit the call.
  calld->RetryCommit(elem, retry_state);
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  calld->InvokeRecvMessageCallback(batch_data, error);
}
2533
2534 //
2535 // recv_trailing_metadata handling
2536 //
2537
GetCallStatus(grpc_call_element * elem,grpc_metadata_batch * md_batch,grpc_error * error,grpc_status_code * status,grpc_mdelem ** server_pushback_md)2538 void CallData::GetCallStatus(grpc_call_element* elem,
2539 grpc_metadata_batch* md_batch, grpc_error* error,
2540 grpc_status_code* status,
2541 grpc_mdelem** server_pushback_md) {
2542 if (error != GRPC_ERROR_NONE) {
2543 grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
2544 } else {
2545 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
2546 *status =
2547 grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
2548 if (server_pushback_md != nullptr &&
2549 md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
2550 *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
2551 }
2552 }
2553 GRPC_ERROR_UNREF(error);
2554 }
2555
// Adds the pending batch's recv_trailing_metadata_ready callback to
// |closures| (transferring ownership of |error| to that closure) after
// moving the received trailing metadata into the pending batch's
// payload.  If the recv_trailing_metadata op was generated internally,
// there is no pending batch and |error| is simply unreffed.
void CallData::AddClosureForRecvTrailingMetadataReady(
    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
    grpc_error* error, CallCombinerClosureList* closures) {
  // Find pending batch.
  PendingBatch* pending = PendingBatchFind(
      elem, "invoking recv_trailing_metadata for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_trailing_metadata &&
               batch->payload->recv_trailing_metadata
                       .recv_trailing_metadata_ready != nullptr;
      });
  // If we generated the recv_trailing_metadata op internally via
  // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Return metadata.
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  grpc_metadata_batch_move(
      &retry_state->recv_trailing_metadata,
      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
  // Add closure.  Ownership of |error| passes to the closure list.
  closures->Add(pending->batch->payload->recv_trailing_metadata
                    .recv_trailing_metadata_ready,
                error, "recv_trailing_metadata_ready for pending batch");
  // Update bookkeeping.
  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      nullptr;
  MaybeClearPendingBatch(elem, pending);
}
2589
// If recv_initial_metadata_ready and/or recv_message_ready were deferred
// while waiting for trailing metadata, adds closures to |closures| to
// resume them now.  Only relevant when this batch carried
// recv_trailing_metadata.  The stored deferred errors are handed off to
// the closures.
void CallData::AddClosuresForDeferredRecvCallbacks(
    SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
    CallCombinerClosureList* closures) {
  if (batch_data->batch.recv_trailing_metadata) {
    // Add closure for deferred recv_initial_metadata_ready.
    if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
                        InvokeRecvInitialMetadataCallback,
                        retry_state->recv_initial_metadata_ready_deferred_batch,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&retry_state->recv_initial_metadata_ready,
                    retry_state->recv_initial_metadata_error,
                    "resuming recv_initial_metadata_ready");
      retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
    }
    // Add closure for deferred recv_message_ready.
    if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
                        InvokeRecvMessageCallback,
                        retry_state->recv_message_ready_deferred_batch,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&retry_state->recv_message_ready,
                    retry_state->recv_message_error,
                    "resuming recv_message_ready");
      retry_state->recv_message_ready_deferred_batch = nullptr;
    }
  }
}
2620
PendingBatchIsUnstarted(PendingBatch * pending,SubchannelCallRetryState * retry_state)2621 bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
2622 SubchannelCallRetryState* retry_state) {
2623 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
2624 return false;
2625 }
2626 if (pending->batch->send_initial_metadata &&
2627 !retry_state->started_send_initial_metadata) {
2628 return true;
2629 }
2630 if (pending->batch->send_message &&
2631 retry_state->started_send_message_count < send_messages_.size()) {
2632 return true;
2633 }
2634 if (pending->batch->send_trailing_metadata &&
2635 !retry_state->started_send_trailing_metadata) {
2636 return true;
2637 }
2638 return false;
2639 }
2640
// Adds closures to |closures| to fail (with a ref to |error|) every
// pending batch whose send ops were never started on the attempt.
// Takes ownership of |error| and unrefs it after distributing refs to
// the closures.
void CallData::AddClosuresToFailUnstartedPendingBatches(
    grpc_call_element* elem, SubchannelCallRetryState* retry_state,
    grpc_error* error, CallCombinerClosureList* closures) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    if (PendingBatchIsUnstarted(pending, retry_state)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: failing unstarted pending batch at index "
                "%" PRIuPTR,
                chand, this, i);
      }
      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
                    "failing on_complete for pending batch");
      // Clear on_complete so the batch is not completed a second time.
      pending->batch->on_complete = nullptr;
      MaybeClearPendingBatch(elem, pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}
2662
// Once the call has finished (trailing metadata received and no retry
// will be attempted), gathers and runs all remaining closures: the
// surface's recv_trailing_metadata_ready, any deferred recv callbacks,
// and failures for never-started pending batches.  Takes ownership of
// |error|.  Running the closures releases the call combiner.
void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
                                           grpc_error* error) {
  grpc_call_element* elem = batch_data->elem;
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // First, add closure for recv_trailing_metadata_ready.
  AddClosureForRecvTrailingMetadataReady(elem, batch_data,
                                         GRPC_ERROR_REF(error), &closures);
  // If there are deferred recv_initial_metadata_ready or recv_message_ready
  // callbacks, add them to closures.
  AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
  // Add closures to fail any pending batches that have not yet been started.
  AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
                                           GRPC_ERROR_REF(error), &closures);
  // Don't need batch_data anymore.
  batch_data->Unref();
  // Schedule all of the closures identified above.
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
  GRPC_ERROR_UNREF(error);
}
2687
// Intercepts recv_trailing_metadata_ready from the subchannel call.
// Extracts the call status and server pushback, then either dispatches a
// retry (dropping refs held for any deferred recv callbacks, whose
// results are discarded) or commits the call and propagates all
// remaining callbacks to the surface.
void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  grpc_call_element* elem = batch_data->elem;
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
            chand, calld, grpc_error_string(error));
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  retry_state->completed_recv_trailing_metadata = true;
  // Get the call's status and check for server pushback metadata.
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_mdelem* server_pushback_md = nullptr;
  grpc_metadata_batch* md_batch =
      batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
  // GetCallStatus() takes ownership of the error ref passed here.
  calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
                       &server_pushback_md);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
            calld, grpc_status_code_to_string(status));
  }
  // Check if we should retry.
  if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
    // Unref batch_data for deferred recv_initial_metadata_ready or
    // recv_message_ready callbacks, if any.  Those callbacks will never
    // run, so their stored errors are released here as well.
    if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
      batch_data->Unref();
      GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
    }
    if (retry_state->recv_message_ready_deferred_batch != nullptr) {
      batch_data->Unref();
      GRPC_ERROR_UNREF(retry_state->recv_message_error);
    }
    batch_data->Unref();
    return;
  }
  // Not retrying, so commit the call.
  calld->RetryCommit(elem, retry_state);
  // Run any necessary closures.
  calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
}
2734
2735 //
2736 // on_complete callback handling
2737 //
2738
// Adds the on_complete closure for the pending batch that matches the
// just-completed subchannel batch's set of send ops, handing |error| to
// that closure.  If no pending batch matches (the subchannel batch was a
// replay generated internally), |error| is unreffed instead.
void CallData::AddClosuresForCompletedPendingBatch(
    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
    SubchannelCallRetryState* retry_state, grpc_error* error,
    CallCombinerClosureList* closures) {
  PendingBatch* pending = PendingBatchFind(
      elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
        // Match the pending batch with the same set of send ops as the
        // subchannel batch we've just completed.
        return batch->on_complete != nullptr &&
               batch_data->batch.send_initial_metadata ==
                   batch->send_initial_metadata &&
               batch_data->batch.send_message == batch->send_message &&
               batch_data->batch.send_trailing_metadata ==
                   batch->send_trailing_metadata;
      });
  // If batch_data is a replay batch, then there will be no pending
  // batch to complete.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Add closure.  Ownership of |error| passes to the closure list.
  closures->Add(pending->batch->on_complete, error,
                "on_complete for pending batch");
  // Clear on_complete so the batch is not completed a second time.
  pending->batch->on_complete = nullptr;
  MaybeClearPendingBatch(elem, pending);
}
2766
// If there are cached send ops still to be replayed on this attempt, or
// pending batches containing send ops not yet cached, adds a closure to
// |closures| that starts the next subchannel batch for them.
void CallData::AddClosuresForReplayOrPendingSendOps(
    grpc_call_element* elem, SubchannelCallBatchData* batch_data,
    SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // First check the cached (replayable) send ops for this attempt.
  bool have_pending_send_message_ops =
      retry_state->started_send_message_count < send_messages_.size();
  bool have_pending_send_trailing_metadata_op =
      seen_send_trailing_metadata_ &&
      !retry_state->started_send_trailing_metadata;
  // If no cached ops remain, scan pending batches for send ops whose
  // data has not yet been cached.
  if (!have_pending_send_message_ops &&
      !have_pending_send_trailing_metadata_op) {
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      PendingBatch* pending = &pending_batches_[i];
      grpc_transport_stream_op_batch* batch = pending->batch;
      if (batch == nullptr || pending->send_ops_cached) continue;
      if (batch->send_message) have_pending_send_message_ops = true;
      if (batch->send_trailing_metadata) {
        have_pending_send_trailing_metadata_op = true;
      }
    }
  }
  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting next batch for pending send op(s)",
              chand, this);
    }
    GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
                      StartRetriableSubchannelBatches, elem,
                      grpc_schedule_on_exec_ctx);
    closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
                  "starting next batch for send_* op(s)");
  }
}
2801
// on_complete callback for a retriable subchannel batch containing send
// ops.  Updates per-attempt bookkeeping, optionally frees cached send
// data (if the call is committed), completes the matching pending batch,
// and starts any remaining replay/pending send ops.  Unrefs the call
// stack when the last outstanding subchannel send batch completes.
void CallData::OnComplete(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  grpc_call_element* elem = batch_data->elem;
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
    gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
            chand, calld, grpc_error_string(error), batch_str);
    gpr_free(batch_str);
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->subchannel_call->GetParentData());
  // Update bookkeeping in retry_state.
  if (batch_data->batch.send_initial_metadata) {
    retry_state->completed_send_initial_metadata = true;
  }
  if (batch_data->batch.send_message) {
    ++retry_state->completed_send_message_count;
  }
  if (batch_data->batch.send_trailing_metadata) {
    retry_state->completed_send_trailing_metadata = true;
  }
  // If the call is committed, free cached data for send ops that we've just
  // completed.
  if (calld->retry_committed_) {
    calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
  }
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // If a retry was already dispatched, that means we saw
  // recv_trailing_metadata before this, so we do nothing here.
  // Otherwise, invoke the callback to return the result to the surface.
  if (!retry_state->retry_dispatched) {
    // Add closure for the completed pending batch, if any.
    calld->AddClosuresForCompletedPendingBatch(
        elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
    // If needed, add a callback to start any replay or pending send ops on
    // the subchannel call.
    if (!retry_state->completed_recv_trailing_metadata) {
      calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
                                                  &closures);
    }
  }
  // Track number of pending subchannel send batches and determine if this
  // was the last one.
  // Note: Must capture this before Unref()'ing batch_data below.
  --calld->num_pending_retriable_subchannel_send_batches_;
  const bool last_send_batch_complete =
      calld->num_pending_retriable_subchannel_send_batches_ == 0;
  // Don't need batch_data anymore.
  batch_data->Unref();
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner_);
  // If this was the last subchannel send batch, unref the call stack.
  if (last_send_batch_complete) {
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
  }
}
2863
2864 //
2865 // subchannel batch construction
2866 //
2867
StartBatchInCallCombiner(void * arg,grpc_error * ignored)2868 void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
2869 grpc_transport_stream_op_batch* batch =
2870 static_cast<grpc_transport_stream_op_batch*>(arg);
2871 SubchannelCall* subchannel_call =
2872 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2873 // Note: This will release the call combiner.
2874 subchannel_call->StartTransportStreamOpBatch(batch);
2875 }
2876
// Wraps |batch| in a closure that starts it on the current subchannel
// call (via StartBatchInCallCombiner) and adds that closure to
// |closures|.
void CallData::AddClosureForSubchannelBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
    CallCombinerClosureList* closures) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Stash the target subchannel call where StartBatchInCallCombiner can
  // find it.
  batch->handler_private.extra_arg = subchannel_call_.get();
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    char* batch_str = grpc_transport_stream_op_batch_string(batch);
    gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
            this, batch_str);
    gpr_free(batch_str);
  }
  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                "start_subchannel_batch");
}
2893
// Adds a retriable send_initial_metadata op to batch_data, copying the
// cached initial metadata into per-attempt storage in retry_state and,
// on retries, appending the grpc-previous-rpc-attempts header.
void CallData::AddRetriableSendInitialMetadataOp(
    SubchannelCallRetryState* retry_state,
    SubchannelCallBatchData* batch_data) {
  // Maps the number of retries to the corresponding metadata value slice.
  // NOTE(review): only 4 slices are listed, so num_attempts_completed_ is
  // presumably capped at 4 elsewhere -- confirm against the retry policy.
  static const grpc_slice* retry_count_strings[] = {
      &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // Storage comes from the call arena: one slot per cached mdelem, plus
  // one extra slot for the grpc-previous-rpc-attempts header if we've
  // already completed one or more attempts.
  retry_state->send_initial_metadata_storage =
      static_cast<grpc_linked_mdelem*>(arena_->Alloc(
          sizeof(grpc_linked_mdelem) *
          (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
  grpc_metadata_batch_copy(&send_initial_metadata_,
                           &retry_state->send_initial_metadata,
                           retry_state->send_initial_metadata_storage);
  // Strip any grpc-previous-rpc-attempts entry copied from the cache so we
  // don't carry a stale or duplicate value into this attempt.
  if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
                       .grpc_previous_rpc_attempts != nullptr)) {
    grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
                               retry_state->send_initial_metadata.idx.named
                                   .grpc_previous_rpc_attempts);
  }
  // On a retry, append grpc-previous-rpc-attempts with the number of
  // previously completed attempts.
  if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
    grpc_mdelem retry_md = grpc_mdelem_create(
        GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
        *retry_count_strings[num_attempts_completed_ - 1], nullptr);
    grpc_error* error = grpc_metadata_batch_add_tail(
        &retry_state->send_initial_metadata,
        &retry_state
             ->send_initial_metadata_storage[send_initial_metadata_.list.count],
        retry_md);
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      // Adding to a freshly copied batch should never fail; crash if it does.
      gpr_log(GPR_ERROR, "error adding retry metadata: %s",
              grpc_error_string(error));
      GPR_ASSERT(false);
    }
  }
  // Record the op in retry_state and wire the per-attempt copy into the batch.
  retry_state->started_send_initial_metadata = true;
  batch_data->batch.send_initial_metadata = true;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
      &retry_state->send_initial_metadata;
  batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
      send_initial_metadata_flags_;
  batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
}
2942
AddRetriableSendMessageOp(grpc_call_element * elem,SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)2943 void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
2944 SubchannelCallRetryState* retry_state,
2945 SubchannelCallBatchData* batch_data) {
2946 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2947 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2948 gpr_log(GPR_INFO,
2949 "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2950 chand, this, retry_state->started_send_message_count);
2951 }
2952 ByteStreamCache* cache =
2953 send_messages_[retry_state->started_send_message_count];
2954 ++retry_state->started_send_message_count;
2955 retry_state->send_message.Init(cache);
2956 batch_data->batch.send_message = true;
2957 batch_data->batch.payload->send_message.send_message.reset(
2958 retry_state->send_message.get());
2959 }
2960
// Adds a retriable send_trailing_metadata op to batch_data, copying the
// cached trailing metadata into per-attempt storage in retry_state.
void CallData::AddRetriableSendTrailingMetadataOp(
    SubchannelCallRetryState* retry_state,
    SubchannelCallBatchData* batch_data) {
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  // Per-attempt storage is allocated from the call arena.
  retry_state->send_trailing_metadata_storage =
      static_cast<grpc_linked_mdelem*>(arena_->Alloc(
          sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
  grpc_metadata_batch_copy(&send_trailing_metadata_,
                           &retry_state->send_trailing_metadata,
                           retry_state->send_trailing_metadata_storage);
  retry_state->started_send_trailing_metadata = true;
  batch_data->batch.send_trailing_metadata = true;
  batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
      &retry_state->send_trailing_metadata;
}
2978
// Adds a retriable recv_initial_metadata op to batch_data, intercepting
// the ready callback via RecvInitialMetadataReady() (with batch_data as
// its arg) so the retry code sees the result first.
void CallData::AddRetriableRecvInitialMetadataOp(
    SubchannelCallRetryState* retry_state,
    SubchannelCallBatchData* batch_data) {
  retry_state->started_recv_initial_metadata = true;
  batch_data->batch.recv_initial_metadata = true;
  // Received metadata lands in per-attempt storage in retry_state.
  grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
      &retry_state->recv_initial_metadata;
  batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
      &retry_state->trailing_metadata_available;
  GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
                    RecvInitialMetadataReady, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
      &retry_state->recv_initial_metadata_ready;
}
2995
// Adds a retriable recv_message op to batch_data, intercepting the ready
// callback via RecvMessageReady() (with batch_data as its arg).
void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data) {
  ++retry_state->started_recv_message_count;
  batch_data->batch.recv_message = true;
  // The received message lands in per-attempt storage in retry_state.
  batch_data->batch.payload->recv_message.recv_message =
      &retry_state->recv_message;
  GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
                    batch_data, grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_message.recv_message_ready =
      &retry_state->recv_message_ready;
}
3007
// Adds a retriable recv_trailing_metadata op to batch_data, intercepting
// the ready callback via RecvTrailingMetadataReady() and then letting
// MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy() hook in
// the LB policy's interception as well.
void CallData::AddRetriableRecvTrailingMetadataOp(
    SubchannelCallRetryState* retry_state,
    SubchannelCallBatchData* batch_data) {
  retry_state->started_recv_trailing_metadata = true;
  batch_data->batch.recv_trailing_metadata = true;
  // Trailing metadata and stats land in per-attempt storage in retry_state.
  grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
  batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
      &retry_state->recv_trailing_metadata;
  batch_data->batch.payload->recv_trailing_metadata.collect_stats =
      &retry_state->collect_stats;
  GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
                    RecvTrailingMetadataReady, batch_data,
                    grpc_schedule_on_exec_ctx);
  batch_data->batch.payload->recv_trailing_metadata
      .recv_trailing_metadata_ready =
      &retry_state->recv_trailing_metadata_ready;
  MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
      &batch_data->batch);
}
3027
// Starts a recv_trailing_metadata op on the subchannel call internally
// when a call attempt fails before the surface has started its own
// recv_trailing_metadata op, so the retry code can observe the attempt's
// final status.
void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: call failed but recv_trailing_metadata not "
            "started; starting it internally",
            chand, this);
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
  // Create batch_data with 2 refs, since this batch will be unreffed twice:
  // once for the recv_trailing_metadata_ready callback when the subchannel
  // batch returns, and again when we actually get a recv_trailing_metadata
  // op from the surface.
  SubchannelCallBatchData* batch_data =
      SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
  AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
  // Remember the internal batch so that a later surface-initiated
  // recv_trailing_metadata op can be reconciled with it (see
  // AddSubchannelBatchesForPendingBatches()).
  retry_state->recv_trailing_metadata_internal_batch = batch_data;
  // Note: This will release the call combiner.
  subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
}
3049
// If there are any cached send ops that need to be replayed on the
// current subchannel call, creates and returns a new subchannel batch
// to replay those ops. Otherwise, returns nullptr.
// The batch is created lazily, so a single batch carries every
// replayable op (send_initial_metadata, at most one send_message, and
// send_trailing_metadata).
CallData::SubchannelCallBatchData*
CallData::MaybeCreateSubchannelBatchForReplay(
    grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  SubchannelCallBatchData* replay_batch_data = nullptr;
  // send_initial_metadata.
  // Replay only if cached, not already started on this attempt, and not
  // about to be started from a pending surface batch.
  if (seen_send_initial_metadata_ &&
      !retry_state->started_send_initial_metadata &&
      !pending_send_initial_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_initial_metadata op",
              chand, this);
    }
    replay_batch_data =
        SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
    AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (retry_state->started_send_message_count < send_messages_.size() &&
      retry_state->started_send_message_count ==
          retry_state->completed_send_message_count &&
      !pending_send_message_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_message op",
              chand, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data =
          SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
    }
    AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (seen_send_trailing_metadata_ &&
      retry_state->started_send_message_count == send_messages_.size() &&
      !retry_state->started_send_trailing_metadata &&
      !pending_send_trailing_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_trailing_metadata op",
              chand, this);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data =
          SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
    }
    AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
  }
  return replay_batch_data;
}
3112
// For each pending surface batch not yet started on the current
// subchannel call, constructs a subchannel batch and adds a closure for
// it to \a closures.  When retries are disabled or already committed,
// pending batches are sent down as-is; otherwise per-attempt
// SubchannelCallBatchData is created so results can be intercepted by
// the retry code.
void CallData::AddSubchannelBatchesForPendingBatches(
    grpc_call_element* elem, SubchannelCallRetryState* retry_state,
    CallCombinerClosureList* closures) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    // Skip any batch that either (a) has already been started on this
    // subchannel call or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch. This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op. If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata &&
        retry_state->started_send_initial_metadata) {
      continue;
    }
    if (batch->send_message && retry_state->completed_send_message_count <
                                   retry_state->started_send_message_count) {
      continue;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    // (batch->send_message contributes 1 here when this same batch also
    // carries a send_message op that will be started below.)
    if (batch->send_trailing_metadata &&
        (retry_state->started_send_message_count + batch->send_message <
             send_messages_.size() ||
         retry_state->started_send_trailing_metadata)) {
      continue;
    }
    if (batch->recv_initial_metadata &&
        retry_state->started_recv_initial_metadata) {
      continue;
    }
    if (batch->recv_message && retry_state->completed_recv_message_count <
                                   retry_state->started_recv_message_count) {
      continue;
    }
    if (batch->recv_trailing_metadata &&
        retry_state->started_recv_trailing_metadata) {
      // If we previously completed a recv_trailing_metadata op
      // initiated by StartInternalRecvTrailingMetadata(), use the
      // result of that instead of trying to re-start this op.
      if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
                        nullptr))) {
        // If the batch completed, then trigger the completion callback
        // directly, so that we return the previously returned results to
        // the application. Otherwise, just unref the internally
        // started subchannel batch, since we'll propagate the
        // completion when it completes.
        if (retry_state->completed_recv_trailing_metadata) {
          // Batches containing recv_trailing_metadata always succeed.
          closures->Add(
              &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
              "re-executing recv_trailing_metadata_ready to propagate "
              "internally triggered result");
        } else {
          retry_state->recv_trailing_metadata_internal_batch->Unref();
        }
        retry_state->recv_trailing_metadata_internal_batch = nullptr;
      }
      continue;
    }
    // If we're not retrying, just send the batch as-is.
    if (method_params_ == nullptr ||
        method_params_->retry_policy() == nullptr || retry_committed_) {
      // TODO(roth): We should probably call
      // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
      AddClosureForSubchannelBatch(elem, batch, closures);
      PendingBatchClear(pending);
      continue;
    }
    // Create batch with the right number of callbacks: the send ops share
    // a single on_complete callback; each recv op has its own ready
    // callback.
    const bool has_send_ops = batch->send_initial_metadata ||
                              batch->send_message ||
                              batch->send_trailing_metadata;
    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
                              batch->recv_message +
                              batch->recv_trailing_metadata;
    SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
        elem, num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    MaybeCacheSendOpsForBatch(pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      AddRetriableSendInitialMetadataOp(retry_state, batch_data);
    }
    // send_message.
    if (batch->send_message) {
      AddRetriableSendMessageOp(elem, retry_state, batch_data);
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
    }
    // recv_message.
    if (batch->recv_message) {
      AddRetriableRecvMessageOp(retry_state, batch_data);
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
    }
    AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (batch->send_initial_metadata || batch->send_message ||
        batch->send_trailing_metadata) {
      if (num_pending_retriable_subchannel_send_batches_ == 0) {
        GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
      }
      ++num_pending_retriable_subchannel_send_batches_;
    }
  }
}
3238
// Callback that constructs and starts all retriable batches on the
// current subchannel call: first a replay batch for any cached send ops,
// then batches for the surface's pending ops.
void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
            chand, calld);
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          calld->subchannel_call_->GetParentData());
  // Construct list of closures to execute, one for each pending batch.
  CallCombinerClosureList closures;
  // Replay previously-returned send_* ops if needed.
  SubchannelCallBatchData* replay_batch_data =
      calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
  if (replay_batch_data != nullptr) {
    calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
                                        &closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
      GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
    }
    ++calld->num_pending_retriable_subchannel_send_batches_;
  }
  // Now add pending batches.
  calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
  // Start batches on subchannel call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on subchannel_call=%p",
            chand, calld, closures.size(), calld->subchannel_call_.get());
  }
  // Note: This will yield the call combiner.
  closures.RunClosures(calld->call_combiner_);
}
3277
3278 //
3279 // LB pick
3280 //
3281
// Creates the subchannel call on the connected subchannel chosen by the
// LB pick.  On success, resumes the pending batches on the new call; on
// failure, fails them with the creation error.
void CallData::CreateSubchannelCall(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // When retries are enabled, reserve parent-data space after the
  // subchannel call for the per-attempt retry state.
  const size_t parent_data_size =
      enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
  SubchannelCall::Args call_args = {
      std::move(connected_subchannel_), pollent_, path_, call_start_time_,
      deadline_, arena_,
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call context for each subchannel call.
      call_context_, call_combiner_, parent_data_size};
  grpc_error* error = GRPC_ERROR_NONE;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            chand, this, subchannel_call_.get(), grpc_error_string(error));
  }
  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
    PendingBatchesFail(elem, error, YieldCallCombiner);
  } else {
    if (parent_data_size > 0) {
      // Placement-construct the retry state in the reserved parent data.
      new (subchannel_call_->GetParentData())
          SubchannelCallRetryState(call_context_);
    }
    PendingBatchesResume(elem);
  }
}
3308
PickDone(void * arg,grpc_error * error)3309 void CallData::PickDone(void* arg, grpc_error* error) {
3310 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3311 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3312 CallData* calld = static_cast<CallData*>(elem->call_data);
3313 if (error != GRPC_ERROR_NONE) {
3314 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3315 gpr_log(GPR_INFO,
3316 "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
3317 calld, grpc_error_string(error));
3318 }
3319 calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
3320 return;
3321 }
3322 calld->CreateSubchannelCall(elem);
3323 }
3324
// A class to handle the call combiner cancellation callback for a
// queued pick.  Allocated when a pick is queued; deletes itself when
// CancelLocked() eventually runs.
class CallData::QueuedPickCanceller {
 public:
  explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
    auto* calld = static_cast<CallData*>(elem->call_data);
    auto* chand = static_cast<ChannelData*>(elem->channel_data);
    // Hold the call stack alive until CancelLocked() runs.
    GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
    // CancelLocked() runs in the data plane combiner, matching the
    // locking of the queued-picks list.
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
                      grpc_combiner_scheduler(chand->data_plane_combiner()));
    calld->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  // Invoked by the call combiner.  NOTE(review): presumably also invoked
  // with GRPC_ERROR_NONE when the notify-on-cancel callback is replaced,
  // which is why the error check below is needed -- confirm against
  // CallCombiner::SetNotifyOnCancel().
  static void CancelLocked(void* arg, grpc_error* error) {
    auto* self = static_cast<QueuedPickCanceller*>(arg);
    auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
    auto* calld = static_cast<CallData*>(self->elem_->call_data);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: cancelling queued pick: "
              "error=%s self=%p calld->pick_canceller=%p",
              chand, calld, grpc_error_string(error), self,
              calld->pick_canceller_);
    }
    // Act only if this canceller is still the current one and this is a
    // real cancellation; a lamed canceller (pick_canceller_ != self) is
    // a no-op apart from releasing its resources below.
    if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
      // Remove pick from list of queued picks.
      calld->RemoveCallFromQueuedPicksLocked(self->elem_);
      // Fail pending batches on the call.
      calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
                                YieldCallCombinerIfPendingBatchesFound);
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
    Delete(self);
  }

  grpc_call_element* elem_;  // Not owned; kept alive by the call stack ref.
  grpc_closure closure_;     // Wraps CancelLocked().
};
3364
RemoveCallFromQueuedPicksLocked(grpc_call_element * elem)3365 void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
3366 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3367 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3368 gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
3369 chand, this);
3370 }
3371 chand->RemoveQueuedPick(&pick_, pollent_);
3372 pick_queued_ = false;
3373 // Lame the call combiner canceller.
3374 pick_canceller_ = nullptr;
3375 }
3376
AddCallToQueuedPicksLocked(grpc_call_element * elem)3377 void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
3378 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3379 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3380 gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
3381 this);
3382 }
3383 pick_queued_ = true;
3384 pick_.elem = elem;
3385 chand->AddQueuedPick(&pick_, pollent_);
3386 // Register call combiner cancellation callback.
3387 pick_canceller_ = New<QueuedPickCanceller>(elem);
3388 }
3389
// Applies the channel's parsed service config to this call: method
// params, retry throttle data, a possibly shorter per-method deadline,
// and the wait_for_ready default.
void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand, this);
  }
  // Store a ref to the service_config in service_config_call_data_. Also, save
  // a pointer to this in the call_context so that all future filters can access
  // it.
  service_config_call_data_ =
      ServiceConfig::CallData(chand->service_config(), path_);
  if (service_config_call_data_.service_config() != nullptr) {
    call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value =
        &service_config_call_data_;
    method_params_ = static_cast<ClientChannelMethodParsedConfig*>(
        service_config_call_data_.GetMethodParsedConfig(
            internal::ClientChannelServiceConfigParser::ParserIndex()));
  }
  retry_throttle_data_ = chand->retry_throttle_data();
  if (method_params_ != nullptr) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
      const grpc_millis per_method_deadline =
          grpc_timespec_to_millis_round_up(call_start_time_) +
          method_params_->timeout();
      if (per_method_deadline < deadline_) {
        deadline_ = per_method_deadline;
        grpc_deadline_state_reset(elem, deadline_);
      }
    }
    // If the service config set wait_for_ready and the application
    // did not explicitly set it, use the value from the service config.
    // NOTE(review): assumes pending_batches_[0] holds the
    // send_initial_metadata batch (see StartPickLocked()) -- confirm.
    uint32_t* send_initial_metadata_flags =
        &pending_batches_[0]
             .batch->payload->send_initial_metadata.send_initial_metadata_flags;
    if (method_params_->wait_for_ready().has_value() &&
        !(*send_initial_metadata_flags &
          GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
      if (method_params_->wait_for_ready().value()) {
        *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
      } else {
        *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
      }
    }
  }
  // If no retry policy, disable retries.
  // TODO(roth): Remove this when adding support for transparent retries.
  if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
    enable_retries_ = false;
  }
}
3442
MaybeApplyServiceConfigToCallLocked(grpc_call_element * elem)3443 void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3444 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3445 // Apply service config data to the call only once, and only if the
3446 // channel has the data available.
3447 if (GPR_LIKELY(chand->received_service_config_data() &&
3448 !service_config_applied_)) {
3449 service_config_applied_ = true;
3450 ApplyServiceConfigToCallLocked(elem);
3451 }
3452 }
3453
PickResultTypeName(LoadBalancingPolicy::PickResult::ResultType type)3454 const char* PickResultTypeName(
3455 LoadBalancingPolicy::PickResult::ResultType type) {
3456 switch (type) {
3457 case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
3458 return "COMPLETE";
3459 case LoadBalancingPolicy::PickResult::PICK_QUEUE:
3460 return "QUEUE";
3461 case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE:
3462 return "TRANSIENT_FAILURE";
3463 }
3464 GPR_UNREACHABLE_CODE(return "UNKNOWN");
3465 }
3466
// Attempts an LB pick for this call; runs in the data plane combiner.
// Depending on the result, it either schedules pick_closure_ (PickDone)
// to create or fail the call, or queues the pick until a new picker
// becomes available.
void CallData::StartPickLocked(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(calld->connected_subchannel_ == nullptr);
  GPR_ASSERT(calld->subchannel_call_ == nullptr);
  // picker's being null means the channel is currently in IDLE state. The
  // incoming call will make the channel exit IDLE and queue itself.
  if (chand->picker() == nullptr) {
    // We are currently in the data plane.
    // Bounce into the control plane to exit IDLE.
    chand->CheckConnectivityState(true);
    calld->AddCallToQueuedPicksLocked(elem);
    return;
  }
  // Apply service config to call if needed.
  calld->MaybeApplyServiceConfigToCallLocked(elem);
  // If this is a retry, use the send_initial_metadata payload that
  // we've cached; otherwise, use the pending batch. The
  // send_initial_metadata batch will be the first pending batch in the
  // list, as set by GetBatchIndex() above.
  // TODO(roth): What if the LB policy needs to add something to the
  // call's initial metadata, and then there's a retry? We don't want
  // the new metadata to be added twice. We might need to somehow
  // allocate the subchannel batch earlier so that we can give the
  // subchannel's copy of the metadata batch (which is copied for each
  // attempt) to the LB policy instead of the one from the parent channel.
  LoadBalancingPolicy::PickArgs pick_args;
  pick_args.call_state = &calld->lb_call_state_;
  pick_args.initial_metadata =
      calld->seen_send_initial_metadata_
          ? &calld->send_initial_metadata_
          : calld->pending_batches_[0]
                .batch->payload->send_initial_metadata.send_initial_metadata;
  // Grab initial metadata flags so that we can check later if the call has
  // wait_for_ready enabled.
  const uint32_t send_initial_metadata_flags =
      calld->seen_send_initial_metadata_
          ? calld->send_initial_metadata_flags_
          : calld->pending_batches_[0]
                .batch->payload->send_initial_metadata
                .send_initial_metadata_flags;
  // When done, we schedule this closure to leave the data plane combiner.
  GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
                    grpc_schedule_on_exec_ctx);
  // Attempt pick.
  auto result = chand->picker()->Pick(pick_args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
            "error=%s)",
            chand, calld, PickResultTypeName(result.type),
            result.connected_subchannel.get(), grpc_error_string(result.error));
  }
  switch (result.type) {
    case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: {
      // If we're shutting down, fail all RPCs.
      grpc_error* disconnect_error = chand->disconnect_error();
      if (disconnect_error != GRPC_ERROR_NONE) {
        GRPC_ERROR_UNREF(result.error);
        GRPC_CLOSURE_SCHED(&calld->pick_closure_,
                           GRPC_ERROR_REF(disconnect_error));
        break;
      }
      // If wait_for_ready is false, then the error indicates the RPC
      // attempt's final status.
      if ((send_initial_metadata_flags &
           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
        // Retry if appropriate; otherwise, fail.
        grpc_status_code status = GRPC_STATUS_OK;
        grpc_error_get_status(result.error, calld->deadline_, &status, nullptr,
                              nullptr, nullptr);
        if (!calld->enable_retries_ ||
            !calld->MaybeRetry(elem, nullptr /* batch_data */, status,
                               nullptr /* server_pushback_md */)) {
          // Not retrying: surface the pick failure to the call.
          grpc_error* new_error =
              GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                  "Failed to pick subchannel", &result.error, 1);
          GRPC_ERROR_UNREF(result.error);
          GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
        }
        if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
        break;
      }
      // If wait_for_ready is true, then queue to retry when we get a new
      // picker.
      GRPC_ERROR_UNREF(result.error);
    }
    // Fallthrough
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
      break;
    default:  // PICK_COMPLETE
      // Handle drops.
      if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) {
        result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
            "Call dropped by load balancing policy");
      }
      calld->connected_subchannel_ = std::move(result.connected_subchannel);
      // Save the LB policy's recv_trailing_metadata interception info for
      // use when constructing subchannel batches.
      calld->lb_recv_trailing_metadata_ready_ =
          result.recv_trailing_metadata_ready;
      calld->lb_recv_trailing_metadata_ready_user_data_ =
          result.recv_trailing_metadata_ready_user_data;
      GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error);
      if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
  }
}
3574
3575 } // namespace
3576 } // namespace grpc_core
3577
3578 /*************************************************************************
3579 * EXPORTED SYMBOLS
3580 */
3581
3582 using grpc_core::CallData;
3583 using grpc_core::ChannelData;
3584
// vtable for the client-channel filter.  Slot order is dictated by the
// grpc_channel_filter struct (declared in channel_stack.h); do not reorder.
const grpc_channel_filter grpc_client_channel_filter = {
    CallData::StartTransportStreamOpBatch,  // start_transport_stream_op_batch
    ChannelData::StartTransportOp,          // start_transport_op
    sizeof(CallData),                       // sizeof_call_data
    CallData::Init,                         // init_call_elem
    CallData::SetPollent,                   // set_pollset_or_pollset_set
    CallData::Destroy,                      // destroy_call_elem
    sizeof(ChannelData),                    // sizeof_channel_data
    ChannelData::Init,                      // init_channel_elem
    ChannelData::Destroy,                   // destroy_channel_elem
    ChannelData::GetChannelInfo,            // get_channel_info
    "client-channel",                       // name
};
3598
grpc_client_channel_check_connectivity_state(grpc_channel_element * elem,int try_to_connect)3599 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
3600 grpc_channel_element* elem, int try_to_connect) {
3601 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3602 return chand->CheckConnectivityState(try_to_connect);
3603 }
3604
grpc_client_channel_num_external_connectivity_watchers(grpc_channel_element * elem)3605 int grpc_client_channel_num_external_connectivity_watchers(
3606 grpc_channel_element* elem) {
3607 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3608 return chand->NumExternalConnectivityWatchers();
3609 }
3610
grpc_client_channel_watch_connectivity_state(grpc_channel_element * elem,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * closure,grpc_closure * watcher_timer_init)3611 void grpc_client_channel_watch_connectivity_state(
3612 grpc_channel_element* elem, grpc_polling_entity pollent,
3613 grpc_connectivity_state* state, grpc_closure* closure,
3614 grpc_closure* watcher_timer_init) {
3615 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3616 return chand->AddExternalConnectivityWatcher(pollent, state, closure,
3617 watcher_timer_init);
3618 }
3619
3620 grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element * elem)3621 grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
3622 auto* calld = static_cast<CallData*>(elem->call_data);
3623 return calld->subchannel_call();
3624 }
3625