1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/client_channel/client_channel.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33 
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
36 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
37 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
38 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
39 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
40 #include "src/core/ext/filters/client_channel/resolver_registry.h"
41 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
42 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
43 #include "src/core/ext/filters/client_channel/retry_throttle.h"
44 #include "src/core/ext/filters/client_channel/service_config.h"
45 #include "src/core/ext/filters/client_channel/subchannel.h"
46 #include "src/core/ext/filters/deadline/deadline_filter.h"
47 #include "src/core/lib/backoff/backoff.h"
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/channel/connected_channel.h"
50 #include "src/core/lib/channel/status_util.h"
51 #include "src/core/lib/gpr/string.h"
52 #include "src/core/lib/gprpp/inlined_vector.h"
53 #include "src/core/lib/gprpp/manual_constructor.h"
54 #include "src/core/lib/gprpp/map.h"
55 #include "src/core/lib/gprpp/sync.h"
56 #include "src/core/lib/iomgr/combiner.h"
57 #include "src/core/lib/iomgr/iomgr.h"
58 #include "src/core/lib/iomgr/polling_entity.h"
59 #include "src/core/lib/profiling/timers.h"
60 #include "src/core/lib/slice/slice_internal.h"
61 #include "src/core/lib/slice/slice_string_helpers.h"
62 #include "src/core/lib/surface/channel.h"
63 #include "src/core/lib/transport/connectivity_state.h"
64 #include "src/core/lib/transport/error_utils.h"
65 #include "src/core/lib/transport/metadata.h"
66 #include "src/core/lib/transport/metadata_batch.h"
67 #include "src/core/lib/transport/static_metadata.h"
68 #include "src/core/lib/transport/status_metadata.h"
69 
70 using grpc_core::internal::ClientChannelMethodParsedConfig;
71 using grpc_core::internal::ServerRetryThrottleData;
72 
73 //
74 // Client channel filter
75 //
76 
77 // By default, we buffer 256 KiB per RPC for retries.
78 // TODO(roth): Do we have any data to suggest a better value?
79 #define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
80 
81 // This value was picked arbitrarily.  It can be changed if there is
82 // any even moderately compelling reason to do so.
83 #define RETRY_BACKOFF_JITTER 0.2
84 
85 // Max number of batches that can be pending on a call at any given
86 // time.  This includes one batch for each of the following ops:
87 //   recv_initial_metadata
88 //   send_initial_metadata
89 //   recv_message
90 //   send_message
91 //   recv_trailing_metadata
92 //   send_trailing_metadata
93 #define MAX_PENDING_BATCHES 6
94 
95 namespace grpc_core {
96 
97 TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
98 TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
99 
100 namespace {
101 
102 //
103 // ChannelData definition
104 //
105 
106 class ChannelData {
107  public:
108   struct QueuedPick {
109     grpc_call_element* elem;
110     QueuedPick* next = nullptr;
111   };
112 
113   static grpc_error* Init(grpc_channel_element* elem,
114                           grpc_channel_element_args* args);
115   static void Destroy(grpc_channel_element* elem);
116   static void StartTransportOp(grpc_channel_element* elem,
117                                grpc_transport_op* op);
118   static void GetChannelInfo(grpc_channel_element* elem,
119                              const grpc_channel_info* info);
120 
deadline_checking_enabled() const121   bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
enable_retries() const122   bool enable_retries() const { return enable_retries_; }
per_rpc_retry_buffer_size() const123   size_t per_rpc_retry_buffer_size() const {
124     return per_rpc_retry_buffer_size_;
125   }
126 
127   // Note: Does NOT return a new ref.
disconnect_error() const128   grpc_error* disconnect_error() const {
129     return disconnect_error_.Load(MemoryOrder::ACQUIRE);
130   }
131 
data_plane_combiner() const132   grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; }
133 
picker() const134   LoadBalancingPolicy::SubchannelPicker* picker() const {
135     return picker_.get();
136   }
137   void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
138   void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);
139 
received_service_config_data() const140   bool received_service_config_data() const {
141     return received_service_config_data_;
142   }
retry_throttle_data() const143   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
144     return retry_throttle_data_;
145   }
service_config() const146   RefCountedPtr<ServiceConfig> service_config() const {
147     return service_config_;
148   }
149 
150   grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
AddExternalConnectivityWatcher(grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)151   void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
152                                       grpc_connectivity_state* state,
153                                       grpc_closure* on_complete,
154                                       grpc_closure* watcher_timer_init) {
155     // Will delete itself.
156     New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
157                                      watcher_timer_init);
158   }
NumExternalConnectivityWatchers() const159   int NumExternalConnectivityWatchers() const {
160     return external_connectivity_watcher_list_.size();
161   }
162 
163  private:
164   class ConnectivityStateAndPickerSetter;
165   class ServiceConfigSetter;
166   class GrpcSubchannel;
167   class ClientChannelControlHelper;
168 
169   class ExternalConnectivityWatcher {
170    public:
171     class WatcherList {
172      public:
WatcherList()173       WatcherList() { gpr_mu_init(&mu_); }
~WatcherList()174       ~WatcherList() { gpr_mu_destroy(&mu_); }
175 
176       int size() const;
177       ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
178       void Add(ExternalConnectivityWatcher* watcher);
179       void Remove(const ExternalConnectivityWatcher* watcher);
180 
181      private:
182       // head_ is guarded by a mutex, since the size() method needs to
183       // iterate over the list, and it's called from the C-core API
184       // function grpc_channel_num_external_connectivity_watchers(), which
185       // is synchronous and therefore cannot run in the combiner.
186       mutable gpr_mu mu_;
187       ExternalConnectivityWatcher* head_ = nullptr;
188     };
189 
190     ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
191                                 grpc_connectivity_state* state,
192                                 grpc_closure* on_complete,
193                                 grpc_closure* watcher_timer_init);
194 
195     ~ExternalConnectivityWatcher();
196 
197    private:
198     static void OnWatchCompleteLocked(void* arg, grpc_error* error);
199     static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
200 
201     ChannelData* chand_;
202     grpc_polling_entity pollent_;
203     grpc_connectivity_state* state_;
204     grpc_closure* on_complete_;
205     grpc_closure* watcher_timer_init_;
206     grpc_closure my_closure_;
207     ExternalConnectivityWatcher* next_ = nullptr;
208   };
209 
210   ChannelData(grpc_channel_element_args* args, grpc_error** error);
211   ~ChannelData();
212 
213   void CreateResolvingLoadBalancingPolicyLocked();
214 
215   void DestroyResolvingLoadBalancingPolicyLocked();
216 
217   static bool ProcessResolverResultLocked(
218       void* arg, const Resolver::Result& result, const char** lb_policy_name,
219       RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
220       grpc_error** service_config_error);
221 
222   grpc_error* DoPingLocked(grpc_transport_op* op);
223 
224   static void StartTransportOpLocked(void* arg, grpc_error* ignored);
225 
226   static void TryToConnectLocked(void* arg, grpc_error* error_ignored);
227 
228   void ProcessLbPolicy(
229       const Resolver::Result& resolver_result,
230       const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
231       UniquePtr<char>* lb_policy_name,
232       RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config);
233 
234   //
235   // Fields set at construction and never modified.
236   //
237   const bool deadline_checking_enabled_;
238   const bool enable_retries_;
239   const size_t per_rpc_retry_buffer_size_;
240   grpc_channel_stack* owning_stack_;
241   ClientChannelFactory* client_channel_factory_;
242   const grpc_channel_args* channel_args_;
243   RefCountedPtr<ServiceConfig> default_service_config_;
244   UniquePtr<char> server_name_;
245   UniquePtr<char> target_uri_;
246   channelz::ChannelNode* channelz_node_;
247 
248   //
249   // Fields used in the data plane.  Guarded by data_plane_combiner.
250   //
251   grpc_combiner* data_plane_combiner_;
252   UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
253   QueuedPick* queued_picks_ = nullptr;  // Linked list of queued picks.
254   // Data from service config.
255   bool received_service_config_data_ = false;
256   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
257   RefCountedPtr<ServiceConfig> service_config_;
258 
259   //
260   // Fields used in the control plane.  Guarded by combiner.
261   //
262   grpc_combiner* combiner_;
263   grpc_pollset_set* interested_parties_;
264   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
265   OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
266   grpc_connectivity_state_tracker state_tracker_;
267   ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
268   UniquePtr<char> health_check_service_name_;
269   RefCountedPtr<ServiceConfig> saved_service_config_;
270   bool received_first_resolver_result_ = false;
271   Map<Subchannel*, int> subchannel_refcount_map_;
272 
273   //
274   // Fields accessed from both data plane and control plane combiners.
275   //
276   Atomic<grpc_error*> disconnect_error_;
277 
278   //
279   // Fields guarded by a mutex, since they need to be accessed
280   // synchronously via get_channel_info().
281   //
282   gpr_mu info_mu_;
283   UniquePtr<char> info_lb_policy_name_;
284   UniquePtr<char> info_service_config_json_;
285 };
286 
287 //
288 // CallData definition
289 //
290 
291 class CallData {
292  public:
293   static grpc_error* Init(grpc_call_element* elem,
294                           const grpc_call_element_args* args);
295   static void Destroy(grpc_call_element* elem,
296                       const grpc_call_final_info* final_info,
297                       grpc_closure* then_schedule_closure);
298   static void StartTransportStreamOpBatch(
299       grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
300   static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
301 
subchannel_call()302   RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }
303 
304   // Invoked by channel for queued picks once resolver results are available.
305   void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
306 
307   // Invoked by channel for queued picks when the picker is updated.
308   static void StartPickLocked(void* arg, grpc_error* error);
309 
310  private:
311   class QueuedPickCanceller;
312 
313   class LbCallState : public LoadBalancingPolicy::CallState {
314    public:
LbCallState(CallData * calld)315     explicit LbCallState(CallData* calld) : calld_(calld) {}
316 
Alloc(size_t size)317     void* Alloc(size_t size) override { return calld_->arena_->Alloc(size); }
318 
319    private:
320     CallData* calld_;
321   };
322 
323   // State used for starting a retryable batch on a subchannel call.
324   // This provides its own grpc_transport_stream_op_batch and other data
325   // structures needed to populate the ops in the batch.
326   // We allocate one struct on the arena for each attempt at starting a
327   // batch on a given subchannel call.
328   struct SubchannelCallBatchData {
329     // Creates a SubchannelCallBatchData object on the call's arena with the
330     // specified refcount.  If set_on_complete is true, the batch's
331     // on_complete callback will be set to point to on_complete();
332     // otherwise, the batch's on_complete callback will be null.
333     static SubchannelCallBatchData* Create(grpc_call_element* elem,
334                                            int refcount, bool set_on_complete);
335 
Unrefgrpc_core::__anone9b9ae780111::CallData::SubchannelCallBatchData336     void Unref() {
337       if (gpr_unref(&refs)) Destroy();
338     }
339 
340     SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
341                             int refcount, bool set_on_complete);
342     // All dtor code must be added in `Destroy()`. This is because we may
343     // call closures in `SubchannelCallBatchData` after they are unrefed by
344     // `Unref()`, and msan would complain about accessing this class
345     // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
346     // TODO(soheil): We should try to call the dtor in `Unref()`.
~SubchannelCallBatchDatagrpc_core::__anone9b9ae780111::CallData::SubchannelCallBatchData347     ~SubchannelCallBatchData() { Destroy(); }
348     void Destroy();
349 
350     gpr_refcount refs;
351     grpc_call_element* elem;
352     RefCountedPtr<SubchannelCall> subchannel_call;
353     // The batch to use in the subchannel call.
354     // Its payload field points to SubchannelCallRetryState::batch_payload.
355     grpc_transport_stream_op_batch batch;
356     // For intercepting on_complete.
357     grpc_closure on_complete;
358   };
359 
360   // Retry state associated with a subchannel call.
361   // Stored in the parent_data of the subchannel call object.
362   struct SubchannelCallRetryState {
SubchannelCallRetryStategrpc_core::__anone9b9ae780111::CallData::SubchannelCallRetryState363     explicit SubchannelCallRetryState(grpc_call_context_element* context)
364         : batch_payload(context),
365           started_send_initial_metadata(false),
366           completed_send_initial_metadata(false),
367           started_send_trailing_metadata(false),
368           completed_send_trailing_metadata(false),
369           started_recv_initial_metadata(false),
370           completed_recv_initial_metadata(false),
371           started_recv_trailing_metadata(false),
372           completed_recv_trailing_metadata(false),
373           retry_dispatched(false) {}
374 
375     // SubchannelCallBatchData.batch.payload points to this.
376     grpc_transport_stream_op_batch_payload batch_payload;
377     // For send_initial_metadata.
378     // Note that we need to make a copy of the initial metadata for each
379     // subchannel call instead of just referring to the copy in call_data,
380     // because filters in the subchannel stack will probably add entries,
381     // so we need to start in a pristine state for each attempt of the call.
382     grpc_linked_mdelem* send_initial_metadata_storage;
383     grpc_metadata_batch send_initial_metadata;
384     // For send_message.
385     // TODO(roth): Restructure this to eliminate use of ManualConstructor.
386     ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
387     // For send_trailing_metadata.
388     grpc_linked_mdelem* send_trailing_metadata_storage;
389     grpc_metadata_batch send_trailing_metadata;
390     // For intercepting recv_initial_metadata.
391     grpc_metadata_batch recv_initial_metadata;
392     grpc_closure recv_initial_metadata_ready;
393     bool trailing_metadata_available = false;
394     // For intercepting recv_message.
395     grpc_closure recv_message_ready;
396     OrphanablePtr<ByteStream> recv_message;
397     // For intercepting recv_trailing_metadata.
398     grpc_metadata_batch recv_trailing_metadata;
399     grpc_transport_stream_stats collect_stats;
400     grpc_closure recv_trailing_metadata_ready;
401     // These fields indicate which ops have been started and completed on
402     // this subchannel call.
403     size_t started_send_message_count = 0;
404     size_t completed_send_message_count = 0;
405     size_t started_recv_message_count = 0;
406     size_t completed_recv_message_count = 0;
407     bool started_send_initial_metadata : 1;
408     bool completed_send_initial_metadata : 1;
409     bool started_send_trailing_metadata : 1;
410     bool completed_send_trailing_metadata : 1;
411     bool started_recv_initial_metadata : 1;
412     bool completed_recv_initial_metadata : 1;
413     bool started_recv_trailing_metadata : 1;
414     bool completed_recv_trailing_metadata : 1;
415     // State for callback processing.
416     SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
417         nullptr;
418     grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
419     SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
420     grpc_error* recv_message_error = GRPC_ERROR_NONE;
421     SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
422     // NOTE: Do not move this next to the metadata bitfields above. That would
423     //       save space but will also result in a data race because compiler
424     //       will generate a 2 byte store which overwrites the meta-data
425     //       fields upon setting this field.
426     bool retry_dispatched : 1;
427   };
428 
429   // Pending batches stored in call data.
430   struct PendingBatch {
431     // The pending batch.  If nullptr, this slot is empty.
432     grpc_transport_stream_op_batch* batch;
433     // Indicates whether payload for send ops has been cached in CallData.
434     bool send_ops_cached;
435   };
436 
437   CallData(grpc_call_element* elem, const ChannelData& chand,
438            const grpc_call_element_args& args);
439   ~CallData();
440 
441   // Caches data for send ops so that it can be retried later, if not
442   // already cached.
443   void MaybeCacheSendOpsForBatch(PendingBatch* pending);
444   void FreeCachedSendInitialMetadata(ChannelData* chand);
445   // Frees cached send_message at index idx.
446   void FreeCachedSendMessage(ChannelData* chand, size_t idx);
447   void FreeCachedSendTrailingMetadata(ChannelData* chand);
448   // Frees cached send ops that have already been completed after
449   // committing the call.
450   void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
451                                        SubchannelCallRetryState* retry_state);
452   // Frees cached send ops that were completed by the completed batch in
453   // batch_data.  Used when batches are completed after the call is committed.
454   void FreeCachedSendOpDataForCompletedBatch(
455       grpc_call_element* elem, SubchannelCallBatchData* batch_data,
456       SubchannelCallRetryState* retry_state);
457 
458   static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
459       void* arg, grpc_error* error);
460   void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
461       grpc_transport_stream_op_batch* batch);
462 
463   // Returns the index into pending_batches_ to be used for batch.
464   static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
465   void PendingBatchesAdd(grpc_call_element* elem,
466                          grpc_transport_stream_op_batch* batch);
467   void PendingBatchClear(PendingBatch* pending);
468   void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
469   static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
470   // A predicate type and some useful implementations for PendingBatchesFail().
471   typedef bool (*YieldCallCombinerPredicate)(
472       const CallCombinerClosureList& closures);
YieldCallCombiner(const CallCombinerClosureList & closures)473   static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
474     return true;
475   }
NoYieldCallCombiner(const CallCombinerClosureList & closures)476   static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
477     return false;
478   }
YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)479   static bool YieldCallCombinerIfPendingBatchesFound(
480       const CallCombinerClosureList& closures) {
481     return closures.size() > 0;
482   }
483   // Fails all pending batches.
484   // If yield_call_combiner_predicate returns true, assumes responsibility for
485   // yielding the call combiner.
486   void PendingBatchesFail(
487       grpc_call_element* elem, grpc_error* error,
488       YieldCallCombinerPredicate yield_call_combiner_predicate);
489   static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
490   // Resumes all pending batches on subchannel_call_.
491   void PendingBatchesResume(grpc_call_element* elem);
492   // Returns a pointer to the first pending batch for which predicate(batch)
493   // returns true, or null if not found.
494   template <typename Predicate>
495   PendingBatch* PendingBatchFind(grpc_call_element* elem,
496                                  const char* log_message, Predicate predicate);
497 
498   // Commits the call so that no further retry attempts will be performed.
499   void RetryCommit(grpc_call_element* elem,
500                    SubchannelCallRetryState* retry_state);
501   // Starts a retry after appropriate back-off.
502   void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
503                grpc_millis server_pushback_ms);
504   // Returns true if the call is being retried.
505   bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
506                   grpc_status_code status, grpc_mdelem* server_pushback_md);
507 
508   // Invokes recv_initial_metadata_ready for a subchannel batch.
509   static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
510   // Intercepts recv_initial_metadata_ready callback for retries.
511   // Commits the call and returns the initial metadata up the stack.
512   static void RecvInitialMetadataReady(void* arg, grpc_error* error);
513 
514   // Invokes recv_message_ready for a subchannel batch.
515   static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
516   // Intercepts recv_message_ready callback for retries.
517   // Commits the call and returns the message up the stack.
518   static void RecvMessageReady(void* arg, grpc_error* error);
519 
520   // Sets *status and *server_pushback_md based on md_batch and error.
521   // Only sets *server_pushback_md if server_pushback_md != nullptr.
522   void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
523                      grpc_error* error, grpc_status_code* status,
524                      grpc_mdelem** server_pushback_md);
525   // Adds recv_trailing_metadata_ready closure to closures.
526   void AddClosureForRecvTrailingMetadataReady(
527       grpc_call_element* elem, SubchannelCallBatchData* batch_data,
528       grpc_error* error, CallCombinerClosureList* closures);
529   // Adds any necessary closures for deferred recv_initial_metadata and
530   // recv_message callbacks to closures.
531   static void AddClosuresForDeferredRecvCallbacks(
532       SubchannelCallBatchData* batch_data,
533       SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
534   // Returns true if any op in the batch was not yet started.
535   // Only looks at send ops, since recv ops are always started immediately.
536   bool PendingBatchIsUnstarted(PendingBatch* pending,
537                                SubchannelCallRetryState* retry_state);
538   // For any pending batch containing an op that has not yet been started,
539   // adds the pending batch's completion closures to closures.
540   void AddClosuresToFailUnstartedPendingBatches(
541       grpc_call_element* elem, SubchannelCallRetryState* retry_state,
542       grpc_error* error, CallCombinerClosureList* closures);
543   // Runs necessary closures upon completion of a call attempt.
544   void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
545                                    grpc_error* error);
546   // Intercepts recv_trailing_metadata_ready callback for retries.
547   // Commits the call and returns the trailing metadata up the stack.
548   static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
549 
550   // Adds the on_complete closure for the pending batch completed in
551   // batch_data to closures.
552   void AddClosuresForCompletedPendingBatch(
553       grpc_call_element* elem, SubchannelCallBatchData* batch_data,
554       SubchannelCallRetryState* retry_state, grpc_error* error,
555       CallCombinerClosureList* closures);
556 
557   // If there are any cached ops to replay or pending ops to start on the
558   // subchannel call, adds a closure to closures to invoke
559   // StartRetriableSubchannelBatches().
560   void AddClosuresForReplayOrPendingSendOps(
561       grpc_call_element* elem, SubchannelCallBatchData* batch_data,
562       SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
563 
564   // Callback used to intercept on_complete from subchannel calls.
565   // Called only when retries are enabled.
566   static void OnComplete(void* arg, grpc_error* error);
567 
568   static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
569   // Adds a closure to closures that will execute batch in the call combiner.
570   void AddClosureForSubchannelBatch(grpc_call_element* elem,
571                                     grpc_transport_stream_op_batch* batch,
572                                     CallCombinerClosureList* closures);
573   // Adds retriable send_initial_metadata op to batch_data.
574   void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
575                                          SubchannelCallBatchData* batch_data);
576   // Adds retriable send_message op to batch_data.
577   void AddRetriableSendMessageOp(grpc_call_element* elem,
578                                  SubchannelCallRetryState* retry_state,
579                                  SubchannelCallBatchData* batch_data);
580   // Adds retriable send_trailing_metadata op to batch_data.
581   void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
582                                           SubchannelCallBatchData* batch_data);
583   // Adds retriable recv_initial_metadata op to batch_data.
584   void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
585                                          SubchannelCallBatchData* batch_data);
586   // Adds retriable recv_message op to batch_data.
587   void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
588                                  SubchannelCallBatchData* batch_data);
589   // Adds retriable recv_trailing_metadata op to batch_data.
590   void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
591                                           SubchannelCallBatchData* batch_data);
592   // Helper function used to start a recv_trailing_metadata batch.  This
593   // is used in the case where a recv_initial_metadata or recv_message
594   // op fails in a way that we know the call is over but when the application
595   // has not yet started its own recv_trailing_metadata op.
596   void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
597   // If there are any cached send ops that need to be replayed on the
598   // current subchannel call, creates and returns a new subchannel batch
599   // to replay those ops.  Otherwise, returns nullptr.
600   SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
601       grpc_call_element* elem, SubchannelCallRetryState* retry_state);
602   // Adds subchannel batches for pending batches to closures.
603   void AddSubchannelBatchesForPendingBatches(
604       grpc_call_element* elem, SubchannelCallRetryState* retry_state,
605       CallCombinerClosureList* closures);
606   // Constructs and starts whatever subchannel batches are needed on the
607   // subchannel call.
608   static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);
609 
610   void CreateSubchannelCall(grpc_call_element* elem);
611   // Invoked when a pick is completed, on both success or failure.
612   static void PickDone(void* arg, grpc_error* error);
613   // Removes the call from the channel's list of queued picks.
614   void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
615   // Adds the call to the channel's list of queued picks.
616   void AddCallToQueuedPicksLocked(grpc_call_element* elem);
617   // Applies service config to the call.  Must be invoked once we know
618   // that the resolver has returned results to the channel.
619   void ApplyServiceConfigToCallLocked(grpc_call_element* elem);
620 
621   // State for handling deadlines.
622   // The code in deadline_filter.c requires this to be the first field.
623   // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
624   // and this struct both independently store pointers to the call stack
625   // and call combiner.  If/when we have time, find a way to avoid this
626   // without breaking the grpc_deadline_state abstraction.
627   grpc_deadline_state deadline_state_;
628 
629   grpc_slice path_;  // Request path.
630   gpr_timespec call_start_time_;
631   grpc_millis deadline_;
632   Arena* arena_;
633   grpc_call_stack* owning_call_;
634   CallCombiner* call_combiner_;
635   grpc_call_context_element* call_context_;
636 
637   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
638   ServiceConfig::CallData service_config_call_data_;
639   const ClientChannelMethodParsedConfig* method_params_ = nullptr;
640 
641   RefCountedPtr<SubchannelCall> subchannel_call_;
642 
643   // Set when we get a cancel_stream op.
644   grpc_error* cancel_error_ = GRPC_ERROR_NONE;
645 
646   ChannelData::QueuedPick pick_;
647   bool pick_queued_ = false;
648   bool service_config_applied_ = false;
649   QueuedPickCanceller* pick_canceller_ = nullptr;
650   LbCallState lb_call_state_;
651   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
652   void (*lb_recv_trailing_metadata_ready_)(
653       void* user_data, grpc_metadata_batch* recv_trailing_metadata,
654       LoadBalancingPolicy::CallState* call_state) = nullptr;
655   void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
656   grpc_closure pick_closure_;
657 
658   // For intercepting recv_trailing_metadata_ready for the LB policy.
659   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
660   grpc_closure recv_trailing_metadata_ready_;
661   grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
662 
663   grpc_polling_entity* pollent_ = nullptr;
664 
665   // Batches are added to this list when received from above.
666   // They are removed when we are done handling the batch (i.e., when
667   // either we have invoked all of the batch's callbacks or we have
668   // passed the batch down to the subchannel call and are not
669   // intercepting any of its callbacks).
670   PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
671   bool pending_send_initial_metadata_ : 1;
672   bool pending_send_message_ : 1;
673   bool pending_send_trailing_metadata_ : 1;
674 
675   // Retry state.
676   bool enable_retries_ : 1;
677   bool retry_committed_ : 1;
678   bool last_attempt_got_server_pushback_ : 1;
679   int num_attempts_completed_ = 0;
680   size_t bytes_buffered_for_retry_ = 0;
681   // TODO(roth): Restructure this to eliminate use of ManualConstructor.
682   ManualConstructor<BackOff> retry_backoff_;
683   grpc_timer retry_timer_;
684 
685   // The number of pending retriable subchannel batches containing send ops.
686   // We hold a ref to the call stack while this is non-zero, since replay
687   // batches may not complete until after all callbacks have been returned
688   // to the surface, and we need to make sure that the call is not destroyed
689   // until all of these batches have completed.
690   // Note that we actually only need to track replay batches, but it's
691   // easier to track all batches with send ops.
692   int num_pending_retriable_subchannel_send_batches_ = 0;
693 
694   // Cached data for retrying send ops.
695   // send_initial_metadata
696   bool seen_send_initial_metadata_ = false;
697   grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
698   grpc_metadata_batch send_initial_metadata_;
699   uint32_t send_initial_metadata_flags_;
700   gpr_atm* peer_string_;
701   // send_message
702   // When we get a send_message op, we replace the original byte stream
703   // with a CachingByteStream that caches the slices to a local buffer for
704   // use in retries.
705   // Note: We inline the cache for the first 3 send_message ops and use
706   // dynamic allocation after that.  This number was essentially picked
707   // at random; it could be changed in the future to tune performance.
708   InlinedVector<ByteStreamCache*, 3> send_messages_;
709   // send_trailing_metadata
710   bool seen_send_trailing_metadata_ = false;
711   grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
712   grpc_metadata_batch send_trailing_metadata_;
713 };
714 
715 //
716 // ChannelData::ConnectivityStateAndPickerSetter
717 //
718 
719 // A fire-and-forget class that sets the channel's connectivity state
720 // and then hops into the data plane combiner to update the picker.
721 // Must be instantiated while holding the control plane combiner.
722 // Deletes itself when done.
723 class ChannelData::ConnectivityStateAndPickerSetter {
724  public:
ConnectivityStateAndPickerSetter(ChannelData * chand,grpc_connectivity_state state,const char * reason,UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)725   ConnectivityStateAndPickerSetter(
726       ChannelData* chand, grpc_connectivity_state state, const char* reason,
727       UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
728       : chand_(chand), picker_(std::move(picker)) {
729     // Update connectivity state here, while holding control plane combiner.
730     grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
731     if (chand->channelz_node_ != nullptr) {
732       chand->channelz_node_->SetConnectivityState(state);
733       chand->channelz_node_->AddTraceEvent(
734           channelz::ChannelTrace::Severity::Info,
735           grpc_slice_from_static_string(
736               GetChannelConnectivityStateChangeString(state)));
737     }
738     // Bounce into the data plane combiner to reset the picker.
739     GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
740                            "ConnectivityStateAndPickerSetter");
741     GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
742                       grpc_combiner_scheduler(chand->data_plane_combiner_));
743     GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
744   }
745 
746  private:
GetChannelConnectivityStateChangeString(grpc_connectivity_state state)747   static const char* GetChannelConnectivityStateChangeString(
748       grpc_connectivity_state state) {
749     switch (state) {
750       case GRPC_CHANNEL_IDLE:
751         return "Channel state change to IDLE";
752       case GRPC_CHANNEL_CONNECTING:
753         return "Channel state change to CONNECTING";
754       case GRPC_CHANNEL_READY:
755         return "Channel state change to READY";
756       case GRPC_CHANNEL_TRANSIENT_FAILURE:
757         return "Channel state change to TRANSIENT_FAILURE";
758       case GRPC_CHANNEL_SHUTDOWN:
759         return "Channel state change to SHUTDOWN";
760     }
761     GPR_UNREACHABLE_CODE(return "UNKNOWN");
762   }
763 
SetPicker(void * arg,grpc_error * ignored)764   static void SetPicker(void* arg, grpc_error* ignored) {
765     auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
766     // Update picker.
767     self->chand_->picker_ = std::move(self->picker_);
768     // Re-process queued picks.
769     for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
770          pick = pick->next) {
771       CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
772     }
773     // Clean up.
774     GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
775                              "ConnectivityStateAndPickerSetter");
776     Delete(self);
777   }
778 
779   ChannelData* chand_;
780   UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
781   grpc_closure closure_;
782 };
783 
784 //
785 // ChannelData::ServiceConfigSetter
786 //
787 
788 // A fire-and-forget class that sets the channel's service config data
789 // in the data plane combiner.  Deletes itself when done.
790 class ChannelData::ServiceConfigSetter {
791  public:
ServiceConfigSetter(ChannelData * chand,Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling> retry_throttle_data,RefCountedPtr<ServiceConfig> service_config)792   ServiceConfigSetter(
793       ChannelData* chand,
794       Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
795           retry_throttle_data,
796       RefCountedPtr<ServiceConfig> service_config)
797       : chand_(chand),
798         retry_throttle_data_(retry_throttle_data),
799         service_config_(std::move(service_config)) {
800     GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter");
801     GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
802                       grpc_combiner_scheduler(chand->data_plane_combiner_));
803     GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
804   }
805 
806  private:
SetServiceConfigData(void * arg,grpc_error * ignored)807   static void SetServiceConfigData(void* arg, grpc_error* ignored) {
808     ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
809     ChannelData* chand = self->chand_;
810     // Update channel state.
811     chand->received_service_config_data_ = true;
812     if (self->retry_throttle_data_.has_value()) {
813       chand->retry_throttle_data_ =
814           internal::ServerRetryThrottleMap::GetDataForServer(
815               chand->server_name_.get(),
816               self->retry_throttle_data_.value().max_milli_tokens,
817               self->retry_throttle_data_.value().milli_token_ratio);
818     }
819     chand->service_config_ = std::move(self->service_config_);
820     // Apply service config to queued picks.
821     for (QueuedPick* pick = chand->queued_picks_; pick != nullptr;
822          pick = pick->next) {
823       CallData* calld = static_cast<CallData*>(pick->elem->call_data);
824       calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
825     }
826     // Clean up.
827     GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
828                              "ServiceConfigSetter");
829     Delete(self);
830   }
831 
832   ChannelData* chand_;
833   Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
834       retry_throttle_data_;
835   RefCountedPtr<ServiceConfig> service_config_;
836   grpc_closure closure_;
837 };
838 
839 //
840 // ChannelData::ExternalConnectivityWatcher::WatcherList
841 //
842 
size() const843 int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
844   MutexLock lock(&mu_);
845   int count = 0;
846   for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
847     ++count;
848   }
849   return count;
850 }
851 
852 ChannelData::ExternalConnectivityWatcher*
Lookup(grpc_closure * on_complete) const853 ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
854     grpc_closure* on_complete) const {
855   MutexLock lock(&mu_);
856   ExternalConnectivityWatcher* w = head_;
857   while (w != nullptr && w->on_complete_ != on_complete) {
858     w = w->next_;
859   }
860   return w;
861 }
862 
Add(ExternalConnectivityWatcher * watcher)863 void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
864     ExternalConnectivityWatcher* watcher) {
865   GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
866   MutexLock lock(&mu_);
867   GPR_ASSERT(watcher->next_ == nullptr);
868   watcher->next_ = head_;
869   head_ = watcher;
870 }
871 
Remove(const ExternalConnectivityWatcher * watcher)872 void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
873     const ExternalConnectivityWatcher* watcher) {
874   MutexLock lock(&mu_);
875   if (watcher == head_) {
876     head_ = watcher->next_;
877     return;
878   }
879   for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
880     if (w->next_ == watcher) {
881       w->next_ = w->next_->next_;
882       return;
883     }
884   }
885   GPR_UNREACHABLE_CODE(return );
886 }
887 
888 //
889 // ChannelData::ExternalConnectivityWatcher
890 //
891 
ExternalConnectivityWatcher(ChannelData * chand,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)892 ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
893     ChannelData* chand, grpc_polling_entity pollent,
894     grpc_connectivity_state* state, grpc_closure* on_complete,
895     grpc_closure* watcher_timer_init)
896     : chand_(chand),
897       pollent_(pollent),
898       state_(state),
899       on_complete_(on_complete),
900       watcher_timer_init_(watcher_timer_init) {
901   grpc_polling_entity_add_to_pollset_set(&pollent_,
902                                          chand_->interested_parties_);
903   GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
904   GRPC_CLOSURE_SCHED(
905       GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
906                         grpc_combiner_scheduler(chand_->combiner_)),
907       GRPC_ERROR_NONE);
908 }
909 
~ExternalConnectivityWatcher()910 ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
911   grpc_polling_entity_del_from_pollset_set(&pollent_,
912                                            chand_->interested_parties_);
913   GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
914                            "ExternalConnectivityWatcher");
915 }
916 
OnWatchCompleteLocked(void * arg,grpc_error * error)917 void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
918     void* arg, grpc_error* error) {
919   ExternalConnectivityWatcher* self =
920       static_cast<ExternalConnectivityWatcher*>(arg);
921   grpc_closure* on_complete = self->on_complete_;
922   self->chand_->external_connectivity_watcher_list_.Remove(self);
923   Delete(self);
924   GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
925 }
926 
WatchConnectivityStateLocked(void * arg,grpc_error * ignored)927 void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
928     void* arg, grpc_error* ignored) {
929   ExternalConnectivityWatcher* self =
930       static_cast<ExternalConnectivityWatcher*>(arg);
931   if (self->state_ == nullptr) {
932     // Handle cancellation.
933     GPR_ASSERT(self->watcher_timer_init_ == nullptr);
934     ExternalConnectivityWatcher* found =
935         self->chand_->external_connectivity_watcher_list_.Lookup(
936             self->on_complete_);
937     if (found != nullptr) {
938       grpc_connectivity_state_notify_on_state_change(
939           &found->chand_->state_tracker_, nullptr, &found->my_closure_);
940     }
941     Delete(self);
942     return;
943   }
944   // New watcher.
945   self->chand_->external_connectivity_watcher_list_.Add(self);
946   // This assumes that the closure is scheduled on the ExecCtx scheduler
947   // and that GRPC_CLOSURE_RUN would run the closure immediately.
948   GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
949   GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
950                     grpc_combiner_scheduler(self->chand_->combiner_));
951   grpc_connectivity_state_notify_on_state_change(
952       &self->chand_->state_tracker_, self->state_, &self->my_closure_);
953 }
954 
955 //
956 // ChannelData::GrpcSubchannel
957 //
958 
959 // This class is a wrapper for Subchannel that hides details of the
960 // channel's implementation (such as the health check service name) from
961 // the LB policy API.
962 //
963 // Note that no synchronization is needed here, because even if the
964 // underlying subchannel is shared between channels, this wrapper will only
965 // be used within one channel, so it will always be synchronized by the
966 // control plane combiner.
967 class ChannelData::GrpcSubchannel : public SubchannelInterface {
968  public:
GrpcSubchannel(ChannelData * chand,Subchannel * subchannel,UniquePtr<char> health_check_service_name)969   GrpcSubchannel(ChannelData* chand, Subchannel* subchannel,
970                  UniquePtr<char> health_check_service_name)
971       : chand_(chand),
972         subchannel_(subchannel),
973         health_check_service_name_(std::move(health_check_service_name)) {
974     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "GrpcSubchannel");
975     auto* subchannel_node = subchannel_->channelz_node();
976     if (subchannel_node != nullptr) {
977       intptr_t subchannel_uuid = subchannel_node->uuid();
978       auto it = chand_->subchannel_refcount_map_.find(subchannel_);
979       if (it == chand_->subchannel_refcount_map_.end()) {
980         chand_->channelz_node_->AddChildSubchannel(subchannel_uuid);
981         it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
982       }
983       ++it->second;
984     }
985   }
986 
~GrpcSubchannel()987   ~GrpcSubchannel() {
988     auto* subchannel_node = subchannel_->channelz_node();
989     if (subchannel_node != nullptr) {
990       intptr_t subchannel_uuid = subchannel_node->uuid();
991       auto it = chand_->subchannel_refcount_map_.find(subchannel_);
992       GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
993       --it->second;
994       if (it->second == 0) {
995         chand_->channelz_node_->RemoveChildSubchannel(subchannel_uuid);
996         chand_->subchannel_refcount_map_.erase(it);
997       }
998     }
999     GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
1000     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "GrpcSubchannel");
1001   }
1002 
CheckConnectivityState(RefCountedPtr<ConnectedSubchannelInterface> * connected_subchannel)1003   grpc_connectivity_state CheckConnectivityState(
1004       RefCountedPtr<ConnectedSubchannelInterface>* connected_subchannel)
1005       override {
1006     RefCountedPtr<ConnectedSubchannel> tmp;
1007     auto retval = subchannel_->CheckConnectivityState(
1008         health_check_service_name_.get(), &tmp);
1009     *connected_subchannel = std::move(tmp);
1010     return retval;
1011   }
1012 
WatchConnectivityState(grpc_connectivity_state initial_state,UniquePtr<ConnectivityStateWatcher> watcher)1013   void WatchConnectivityState(
1014       grpc_connectivity_state initial_state,
1015       UniquePtr<ConnectivityStateWatcher> watcher) override {
1016     subchannel_->WatchConnectivityState(
1017         initial_state,
1018         UniquePtr<char>(gpr_strdup(health_check_service_name_.get())),
1019         std::move(watcher));
1020   }
1021 
CancelConnectivityStateWatch(ConnectivityStateWatcher * watcher)1022   void CancelConnectivityStateWatch(
1023       ConnectivityStateWatcher* watcher) override {
1024     subchannel_->CancelConnectivityStateWatch(health_check_service_name_.get(),
1025                                               watcher);
1026   }
1027 
AttemptToConnect()1028   void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
1029 
ResetBackoff()1030   void ResetBackoff() override { subchannel_->ResetBackoff(); }
1031 
1032  private:
1033   ChannelData* chand_;
1034   Subchannel* subchannel_;
1035   UniquePtr<char> health_check_service_name_;
1036 };
1037 
1038 //
1039 // ChannelData::ClientChannelControlHelper
1040 //
1041 
1042 class ChannelData::ClientChannelControlHelper
1043     : public LoadBalancingPolicy::ChannelControlHelper {
1044  public:
ClientChannelControlHelper(ChannelData * chand)1045   explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
1046     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
1047   }
1048 
~ClientChannelControlHelper()1049   ~ClientChannelControlHelper() override {
1050     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1051                              "ClientChannelControlHelper");
1052   }
1053 
CreateSubchannel(const grpc_channel_args & args)1054   RefCountedPtr<SubchannelInterface> CreateSubchannel(
1055       const grpc_channel_args& args) override {
1056     bool inhibit_health_checking = grpc_channel_arg_get_bool(
1057         grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
1058     UniquePtr<char> health_check_service_name;
1059     if (!inhibit_health_checking) {
1060       health_check_service_name.reset(
1061           gpr_strdup(chand_->health_check_service_name_.get()));
1062     }
1063     static const char* args_to_remove[] = {
1064         GRPC_ARG_INHIBIT_HEALTH_CHECKING,
1065         GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1066     };
1067     grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
1068         chand_->subchannel_pool_.get());
1069     grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1070         &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1);
1071     Subchannel* subchannel =
1072         chand_->client_channel_factory_->CreateSubchannel(new_args);
1073     grpc_channel_args_destroy(new_args);
1074     if (subchannel == nullptr) return nullptr;
1075     return MakeRefCounted<GrpcSubchannel>(chand_, subchannel,
1076                                           std::move(health_check_service_name));
1077   }
1078 
CreateChannel(const char * target,const grpc_channel_args & args)1079   grpc_channel* CreateChannel(const char* target,
1080                               const grpc_channel_args& args) override {
1081     return chand_->client_channel_factory_->CreateChannel(target, &args);
1082   }
1083 
UpdateState(grpc_connectivity_state state,UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)1084   void UpdateState(
1085       grpc_connectivity_state state,
1086       UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
1087     grpc_error* disconnect_error =
1088         chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE);
1089     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1090       const char* extra = disconnect_error == GRPC_ERROR_NONE
1091                               ? ""
1092                               : " (ignoring -- channel shutting down)";
1093       gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
1094               grpc_connectivity_state_name(state), picker.get(), extra);
1095     }
1096     // Do update only if not shutting down.
1097     if (disconnect_error == GRPC_ERROR_NONE) {
1098       // Will delete itself.
1099       New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
1100                                             std::move(picker));
1101     }
1102   }
1103 
1104   // No-op -- we should never get this from ResolvingLoadBalancingPolicy.
RequestReresolution()1105   void RequestReresolution() override {}
1106 
AddTraceEvent(TraceSeverity severity,const char * message)1107   void AddTraceEvent(TraceSeverity severity, const char* message) override {
1108     if (chand_->channelz_node_ != nullptr) {
1109       chand_->channelz_node_->AddTraceEvent(
1110           ConvertSeverityEnum(severity),
1111           grpc_slice_from_copied_string(message));
1112     }
1113   }
1114 
1115  private:
ConvertSeverityEnum(TraceSeverity severity)1116   static channelz::ChannelTrace::Severity ConvertSeverityEnum(
1117       TraceSeverity severity) {
1118     if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
1119     if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
1120     return channelz::ChannelTrace::Error;
1121   }
1122 
1123   ChannelData* chand_;
1124 };
1125 
1126 //
1127 // ChannelData implementation
1128 //
1129 
Init(grpc_channel_element * elem,grpc_channel_element_args * args)1130 grpc_error* ChannelData::Init(grpc_channel_element* elem,
1131                               grpc_channel_element_args* args) {
1132   GPR_ASSERT(args->is_last);
1133   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
1134   grpc_error* error = GRPC_ERROR_NONE;
1135   new (elem->channel_data) ChannelData(args, &error);
1136   return error;
1137 }
1138 
Destroy(grpc_channel_element * elem)1139 void ChannelData::Destroy(grpc_channel_element* elem) {
1140   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1141   chand->~ChannelData();
1142 }
1143 
GetEnableRetries(const grpc_channel_args * args)1144 bool GetEnableRetries(const grpc_channel_args* args) {
1145   return grpc_channel_arg_get_bool(
1146       grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
1147 }
1148 
GetMaxPerRpcRetryBufferSize(const grpc_channel_args * args)1149 size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
1150   return static_cast<size_t>(grpc_channel_arg_get_integer(
1151       grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
1152       {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
1153 }
1154 
GetSubchannelPool(const grpc_channel_args * args)1155 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1156     const grpc_channel_args* args) {
1157   const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
1158       grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
1159   if (use_local_subchannel_pool) {
1160     return MakeRefCounted<LocalSubchannelPool>();
1161   }
1162   return GlobalSubchannelPool::instance();
1163 }
1164 
GetChannelzNode(const grpc_channel_args * args)1165 channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
1166   const grpc_arg* arg =
1167       grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1168   if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
1169     return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1170   }
1171   return nullptr;
1172 }
1173 
ChannelData(grpc_channel_element_args * args,grpc_error ** error)1174 ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
1175     : deadline_checking_enabled_(
1176           grpc_deadline_checking_enabled(args->channel_args)),
1177       enable_retries_(GetEnableRetries(args->channel_args)),
1178       per_rpc_retry_buffer_size_(
1179           GetMaxPerRpcRetryBufferSize(args->channel_args)),
1180       owning_stack_(args->channel_stack),
1181       client_channel_factory_(
1182           ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
1183       channelz_node_(GetChannelzNode(args->channel_args)),
1184       data_plane_combiner_(grpc_combiner_create()),
1185       combiner_(grpc_combiner_create()),
1186       interested_parties_(grpc_pollset_set_create()),
1187       subchannel_pool_(GetSubchannelPool(args->channel_args)),
1188       disconnect_error_(GRPC_ERROR_NONE) {
1189   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1190     gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
1191             this, owning_stack_);
1192   }
1193   // Initialize data members.
1194   grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
1195                                "client_channel");
1196   gpr_mu_init(&info_mu_);
1197   // Start backup polling.
1198   grpc_client_channel_start_backup_polling(interested_parties_);
1199   // Check client channel factory.
1200   if (client_channel_factory_ == nullptr) {
1201     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1202         "Missing client channel factory in args for client channel filter");
1203     return;
1204   }
1205   // Get server name to resolve, using proxy mapper if needed.
1206   const char* server_uri = grpc_channel_arg_get_string(
1207       grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
1208   if (server_uri == nullptr) {
1209     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1210         "server URI channel arg missing or wrong type in client channel "
1211         "filter");
1212     return;
1213   }
1214   // Get default service config
1215   const char* service_config_json = grpc_channel_arg_get_string(
1216       grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
1217   if (service_config_json != nullptr) {
1218     *error = GRPC_ERROR_NONE;
1219     default_service_config_ = ServiceConfig::Create(service_config_json, error);
1220     if (*error != GRPC_ERROR_NONE) {
1221       default_service_config_.reset();
1222       return;
1223     }
1224   }
1225   grpc_uri* uri = grpc_uri_parse(server_uri, true);
1226   if (uri != nullptr && uri->path[0] != '\0') {
1227     server_name_.reset(
1228         gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
1229   }
1230   grpc_uri_destroy(uri);
1231   char* proxy_name = nullptr;
1232   grpc_channel_args* new_args = nullptr;
1233   grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
1234                               &new_args);
1235   target_uri_.reset(proxy_name != nullptr ? proxy_name
1236                                           : gpr_strdup(server_uri));
1237   channel_args_ = new_args != nullptr
1238                       ? new_args
1239                       : grpc_channel_args_copy(args->channel_args);
1240   if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
1241     *error =
1242         GRPC_ERROR_CREATE_FROM_STATIC_STRING("the target uri is not valid.");
1243     return;
1244   }
1245   *error = GRPC_ERROR_NONE;
1246 }
1247 
~ChannelData()1248 ChannelData::~ChannelData() {
1249   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1250     gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
1251   }
1252   DestroyResolvingLoadBalancingPolicyLocked();
1253   grpc_channel_args_destroy(channel_args_);
1254   // Stop backup polling.
1255   grpc_client_channel_stop_backup_polling(interested_parties_);
1256   grpc_pollset_set_destroy(interested_parties_);
1257   GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
1258   GRPC_COMBINER_UNREF(combiner_, "client_channel");
1259   GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
1260   grpc_connectivity_state_destroy(&state_tracker_);
1261   gpr_mu_destroy(&info_mu_);
1262 }
1263 
CreateResolvingLoadBalancingPolicyLocked()1264 void ChannelData::CreateResolvingLoadBalancingPolicyLocked() {
1265   // Instantiate resolving LB policy.
1266   LoadBalancingPolicy::Args lb_args;
1267   lb_args.combiner = combiner_;
1268   lb_args.channel_control_helper =
1269       UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
1270           New<ClientChannelControlHelper>(this));
1271   lb_args.args = channel_args_;
1272   UniquePtr<char> target_uri(strdup(target_uri_.get()));
1273   resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
1274       std::move(lb_args), &grpc_client_channel_routing_trace,
1275       std::move(target_uri), ProcessResolverResultLocked, this));
1276   grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
1277                                    interested_parties_);
1278   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1279     gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
1280             resolving_lb_policy_.get());
1281   }
1282 }
1283 
DestroyResolvingLoadBalancingPolicyLocked()1284 void ChannelData::DestroyResolvingLoadBalancingPolicyLocked() {
1285   if (resolving_lb_policy_ != nullptr) {
1286     grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
1287                                      interested_parties_);
1288     resolving_lb_policy_.reset();
1289   }
1290 }
1291 
ProcessLbPolicy(const Resolver::Result & resolver_result,const internal::ClientChannelGlobalParsedConfig * parsed_service_config,UniquePtr<char> * lb_policy_name,RefCountedPtr<LoadBalancingPolicy::Config> * lb_policy_config)1292 void ChannelData::ProcessLbPolicy(
1293     const Resolver::Result& resolver_result,
1294     const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
1295     UniquePtr<char>* lb_policy_name,
1296     RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
1297   // Prefer the LB policy name found in the service config.
1298   if (parsed_service_config != nullptr &&
1299       parsed_service_config->parsed_lb_config() != nullptr) {
1300     lb_policy_name->reset(
1301         gpr_strdup(parsed_service_config->parsed_lb_config()->name()));
1302     *lb_policy_config = parsed_service_config->parsed_lb_config();
1303     return;
1304   }
1305   const char* local_policy_name = nullptr;
1306   if (parsed_service_config != nullptr &&
1307       parsed_service_config->parsed_deprecated_lb_policy() != nullptr) {
1308     local_policy_name = parsed_service_config->parsed_deprecated_lb_policy();
1309   } else {
1310     const grpc_arg* channel_arg =
1311         grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
1312     local_policy_name = grpc_channel_arg_get_string(channel_arg);
1313   }
1314   // Special case: If at least one balancer address is present, we use
1315   // the grpclb policy, regardless of what the resolver has returned.
1316   bool found_balancer_address = false;
1317   for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
1318     const ServerAddress& address = resolver_result.addresses[i];
1319     if (address.IsBalancer()) {
1320       found_balancer_address = true;
1321       break;
1322     }
1323   }
1324   if (found_balancer_address) {
1325     if (local_policy_name != nullptr &&
1326         strcmp(local_policy_name, "grpclb") != 0) {
1327       gpr_log(GPR_INFO,
1328               "resolver requested LB policy %s but provided at least one "
1329               "balancer address -- forcing use of grpclb LB policy",
1330               local_policy_name);
1331     }
1332     local_policy_name = "grpclb";
1333   }
1334   // Use pick_first if nothing was specified and we didn't select grpclb
1335   // above.
1336   lb_policy_name->reset(gpr_strdup(
1337       local_policy_name == nullptr ? "pick_first" : local_policy_name));
1338 }
1339 
1340 // Synchronous callback from ResolvingLoadBalancingPolicy to process a
1341 // resolver result update.
ProcessResolverResultLocked(void * arg,const Resolver::Result & result,const char ** lb_policy_name,RefCountedPtr<LoadBalancingPolicy::Config> * lb_policy_config,grpc_error ** service_config_error)1342 bool ChannelData::ProcessResolverResultLocked(
1343     void* arg, const Resolver::Result& result, const char** lb_policy_name,
1344     RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config,
1345     grpc_error** service_config_error) {
1346   ChannelData* chand = static_cast<ChannelData*>(arg);
1347   RefCountedPtr<ServiceConfig> service_config;
1348   // If resolver did not return a service config or returned an invalid service
1349   // config, we need a fallback service config.
1350   if (result.service_config_error != GRPC_ERROR_NONE) {
1351     // If the service config was invalid, then fallback to the saved service
1352     // config. If there is no saved config either, use the default service
1353     // config.
1354     if (chand->saved_service_config_ != nullptr) {
1355       service_config = chand->saved_service_config_;
1356       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1357         gpr_log(GPR_INFO,
1358                 "chand=%p: resolver returned invalid service config. "
1359                 "Continuing to use previous service config.",
1360                 chand);
1361       }
1362     } else if (chand->default_service_config_ != nullptr) {
1363       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1364         gpr_log(GPR_INFO,
1365                 "chand=%p: resolver returned invalid service config. Using "
1366                 "default service config provided by client API.",
1367                 chand);
1368       }
1369       service_config = chand->default_service_config_;
1370     }
1371   } else if (result.service_config == nullptr) {
1372     if (chand->default_service_config_ != nullptr) {
1373       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1374         gpr_log(GPR_INFO,
1375                 "chand=%p: resolver returned no service config. Using default "
1376                 "service config provided by client API.",
1377                 chand);
1378       }
1379       service_config = chand->default_service_config_;
1380     }
1381   } else {
1382     service_config = result.service_config;
1383   }
1384   *service_config_error = GRPC_ERROR_REF(result.service_config_error);
1385   if (service_config == nullptr &&
1386       result.service_config_error != GRPC_ERROR_NONE) {
1387     return false;
1388   }
1389   // Process service config.
1390   UniquePtr<char> service_config_json;
1391   const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
1392       nullptr;
1393   if (service_config != nullptr) {
1394     parsed_service_config =
1395         static_cast<const internal::ClientChannelGlobalParsedConfig*>(
1396             service_config->GetGlobalParsedConfig(
1397                 internal::ClientChannelServiceConfigParser::ParserIndex()));
1398   }
1399   // Check if the config has changed.
1400   const bool service_config_changed =
1401       ((service_config == nullptr) !=
1402        (chand->saved_service_config_ == nullptr)) ||
1403       (service_config != nullptr &&
1404        strcmp(service_config->service_config_json(),
1405               chand->saved_service_config_->service_config_json()) != 0);
1406   if (service_config_changed) {
1407     service_config_json.reset(gpr_strdup(
1408         service_config != nullptr ? service_config->service_config_json()
1409                                   : ""));
1410     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1411       gpr_log(GPR_INFO,
1412               "chand=%p: resolver returned updated service config: \"%s\"",
1413               chand, service_config_json.get());
1414     }
1415     // Save health check service name.
1416     if (service_config != nullptr) {
1417       chand->health_check_service_name_.reset(
1418           gpr_strdup(parsed_service_config->health_check_service_name()));
1419     } else {
1420       chand->health_check_service_name_.reset();
1421     }
1422     // Save service config.
1423     chand->saved_service_config_ = std::move(service_config);
1424   }
1425   // We want to set the service config at least once. This should not really be
1426   // needed, but we are doing it as a defensive approach. This can be removed,
1427   // if we feel it is unnecessary.
1428   if (service_config_changed || !chand->received_first_resolver_result_) {
1429     chand->received_first_resolver_result_ = true;
1430     Optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
1431         retry_throttle_data;
1432     if (parsed_service_config != nullptr) {
1433       retry_throttle_data = parsed_service_config->retry_throttling();
1434     }
1435     // Create service config setter to update channel state in the data
1436     // plane combiner.  Destroys itself when done.
1437     New<ServiceConfigSetter>(chand, retry_throttle_data,
1438                              chand->saved_service_config_);
1439   }
1440   UniquePtr<char> processed_lb_policy_name;
1441   chand->ProcessLbPolicy(result, parsed_service_config,
1442                          &processed_lb_policy_name, lb_policy_config);
1443   // Swap out the data used by GetChannelInfo().
1444   {
1445     MutexLock lock(&chand->info_mu_);
1446     chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
1447     if (service_config_json != nullptr) {
1448       chand->info_service_config_json_ = std::move(service_config_json);
1449     }
1450   }
1451   // Return results.
1452   *lb_policy_name = chand->info_lb_policy_name_.get();
1453   return service_config_changed;
1454 }
1455 
DoPingLocked(grpc_transport_op * op)1456 grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
1457   if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
1458     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
1459   }
1460   LoadBalancingPolicy::PickResult result =
1461       picker_->Pick(LoadBalancingPolicy::PickArgs());
1462   if (result.connected_subchannel != nullptr) {
1463     ConnectedSubchannel* connected_subchannel =
1464         static_cast<ConnectedSubchannel*>(result.connected_subchannel.get());
1465     connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
1466   } else {
1467     if (result.error == GRPC_ERROR_NONE) {
1468       result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1469           "LB policy dropped call on ping");
1470     }
1471   }
1472   return result.error;
1473 }
1474 
StartTransportOpLocked(void * arg,grpc_error * ignored)1475 void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
1476   grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
1477   grpc_channel_element* elem =
1478       static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
1479   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1480   // Connectivity watch.
1481   if (op->on_connectivity_state_change != nullptr) {
1482     grpc_connectivity_state_notify_on_state_change(
1483         &chand->state_tracker_, op->connectivity_state,
1484         op->on_connectivity_state_change);
1485     op->on_connectivity_state_change = nullptr;
1486     op->connectivity_state = nullptr;
1487   }
1488   // Ping.
1489   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1490     grpc_error* error = chand->DoPingLocked(op);
1491     if (error != GRPC_ERROR_NONE) {
1492       GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
1493       GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
1494     }
1495     op->bind_pollset = nullptr;
1496     op->send_ping.on_initiate = nullptr;
1497     op->send_ping.on_ack = nullptr;
1498   }
1499   // Reset backoff.
1500   if (op->reset_connect_backoff) {
1501     if (chand->resolving_lb_policy_ != nullptr) {
1502       chand->resolving_lb_policy_->ResetBackoffLocked();
1503     }
1504   }
1505   // Disconnect.
1506   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1507     grpc_error* error = GRPC_ERROR_NONE;
1508     GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
1509         &error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
1510         MemoryOrder::ACQUIRE));
1511     chand->DestroyResolvingLoadBalancingPolicyLocked();
1512     // Will delete itself.
1513     New<ConnectivityStateAndPickerSetter>(
1514         chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
1515         UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
1516             New<LoadBalancingPolicy::TransientFailurePicker>(
1517                 GRPC_ERROR_REF(op->disconnect_with_error))));
1518   }
1519   GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
1520   GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
1521 }
1522 
StartTransportOp(grpc_channel_element * elem,grpc_transport_op * op)1523 void ChannelData::StartTransportOp(grpc_channel_element* elem,
1524                                    grpc_transport_op* op) {
1525   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1526   GPR_ASSERT(op->set_accept_stream == false);
1527   // Handle bind_pollset.
1528   if (op->bind_pollset != nullptr) {
1529     grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
1530   }
1531   // Pop into control plane combiner for remaining ops.
1532   op->handler_private.extra_arg = elem;
1533   GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
1534   GRPC_CLOSURE_SCHED(
1535       GRPC_CLOSURE_INIT(&op->handler_private.closure,
1536                         ChannelData::StartTransportOpLocked, op,
1537                         grpc_combiner_scheduler(chand->combiner_)),
1538       GRPC_ERROR_NONE);
1539 }
1540 
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1541 void ChannelData::GetChannelInfo(grpc_channel_element* elem,
1542                                  const grpc_channel_info* info) {
1543   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1544   MutexLock lock(&chand->info_mu_);
1545   if (info->lb_policy_name != nullptr) {
1546     *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
1547   }
1548   if (info->service_config_json != nullptr) {
1549     *info->service_config_json =
1550         gpr_strdup(chand->info_service_config_json_.get());
1551   }
1552 }
1553 
AddQueuedPick(QueuedPick * pick,grpc_polling_entity * pollent)1554 void ChannelData::AddQueuedPick(QueuedPick* pick,
1555                                 grpc_polling_entity* pollent) {
1556   // Add call to queued picks list.
1557   pick->next = queued_picks_;
1558   queued_picks_ = pick;
1559   // Add call's pollent to channel's interested_parties, so that I/O
1560   // can be done under the call's CQ.
1561   grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
1562 }
1563 
RemoveQueuedPick(QueuedPick * to_remove,grpc_polling_entity * pollent)1564 void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
1565                                    grpc_polling_entity* pollent) {
1566   // Remove call's pollent from channel's interested_parties.
1567   grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1568   // Remove from queued picks list.
1569   for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
1570        pick = &(*pick)->next) {
1571     if (*pick == to_remove) {
1572       *pick = to_remove->next;
1573       return;
1574     }
1575   }
1576 }
1577 
TryToConnectLocked(void * arg,grpc_error * error_ignored)1578 void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
1579   auto* chand = static_cast<ChannelData*>(arg);
1580   if (chand->resolving_lb_policy_ != nullptr) {
1581     chand->resolving_lb_policy_->ExitIdleLocked();
1582   } else {
1583     chand->CreateResolvingLoadBalancingPolicyLocked();
1584   }
1585   GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
1586 }
1587 
CheckConnectivityState(bool try_to_connect)1588 grpc_connectivity_state ChannelData::CheckConnectivityState(
1589     bool try_to_connect) {
1590   grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
1591   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
1592     GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
1593     GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
1594                                            grpc_combiner_scheduler(combiner_)),
1595                        GRPC_ERROR_NONE);
1596   }
1597   return out;
1598 }
1599 
1600 //
1601 // CallData implementation
1602 //
1603 
1604 // Retry support:
1605 //
1606 // In order to support retries, we act as a proxy for stream op batches.
1607 // When we get a batch from the surface, we add it to our list of pending
1608 // batches, and we then use those batches to construct separate "child"
1609 // batches to be started on the subchannel call.  When the child batches
1610 // return, we then decide which pending batches have been completed and
1611 // schedule their callbacks accordingly.  If a subchannel call fails and
1612 // we want to retry it, we do a new pick and start again, constructing
1613 // new "child" batches for the new subchannel call.
1614 //
1615 // Note that retries are committed when receiving data from the server
1616 // (except for Trailers-Only responses).  However, there may be many
1617 // send ops started before receiving any data, so we may have already
1618 // completed some number of send ops (and returned the completions up to
1619 // the surface) by the time we realize that we need to retry.  To deal
1620 // with this, we cache data for send ops, so that we can replay them on a
1621 // different subchannel call even after we have completed the original
1622 // batches.
1623 //
1624 // There are two sets of data to maintain:
1625 // - In call_data (in the parent channel), we maintain a list of pending
1626 //   ops and cached data for send ops.
1627 // - In the subchannel call, we maintain state to indicate what ops have
1628 //   already been sent down to that call.
1629 //
1630 // When constructing the "child" batches, we compare those two sets of
1631 // data to see which batches need to be sent to the subchannel call.
1632 
1633 // TODO(roth): In subsequent PRs:
1634 // - add support for transparent retries (including initial metadata)
1635 // - figure out how to record stats in census for retries
1636 //   (census filter is on top of this one)
1637 // - add census stats for retries
1638 
CallData(grpc_call_element * elem,const ChannelData & chand,const grpc_call_element_args & args)1639 CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
1640                    const grpc_call_element_args& args)
1641     : deadline_state_(elem, args.call_stack, args.call_combiner,
1642                       GPR_LIKELY(chand.deadline_checking_enabled())
1643                           ? args.deadline
1644                           : GRPC_MILLIS_INF_FUTURE),
1645       path_(grpc_slice_ref_internal(args.path)),
1646       call_start_time_(args.start_time),
1647       deadline_(args.deadline),
1648       arena_(args.arena),
1649       owning_call_(args.call_stack),
1650       call_combiner_(args.call_combiner),
1651       call_context_(args.context),
1652       lb_call_state_(this),
1653       pending_send_initial_metadata_(false),
1654       pending_send_message_(false),
1655       pending_send_trailing_metadata_(false),
1656       enable_retries_(chand.enable_retries()),
1657       retry_committed_(false),
1658       last_attempt_got_server_pushback_(false) {}
1659 
~CallData()1660 CallData::~CallData() {
1661   grpc_slice_unref_internal(path_);
1662   GRPC_ERROR_UNREF(cancel_error_);
1663   // Make sure there are no remaining pending batches.
1664   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1665     GPR_ASSERT(pending_batches_[i].batch == nullptr);
1666   }
1667 }
1668 
Init(grpc_call_element * elem,const grpc_call_element_args * args)1669 grpc_error* CallData::Init(grpc_call_element* elem,
1670                            const grpc_call_element_args* args) {
1671   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1672   new (elem->call_data) CallData(elem, *chand, *args);
1673   return GRPC_ERROR_NONE;
1674 }
1675 
Destroy(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_schedule_closure)1676 void CallData::Destroy(grpc_call_element* elem,
1677                        const grpc_call_final_info* final_info,
1678                        grpc_closure* then_schedule_closure) {
1679   CallData* calld = static_cast<CallData*>(elem->call_data);
1680   if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
1681     calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
1682     then_schedule_closure = nullptr;
1683   }
1684   calld->~CallData();
1685   GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
1686 }
1687 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)1688 void CallData::StartTransportStreamOpBatch(
1689     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1690   GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
1691   CallData* calld = static_cast<CallData*>(elem->call_data);
1692   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1693   if (GPR_LIKELY(chand->deadline_checking_enabled())) {
1694     grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
1695   }
1696   // If we've previously been cancelled, immediately fail any new batches.
1697   if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
1698     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1699       gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
1700               chand, calld, grpc_error_string(calld->cancel_error_));
1701     }
1702     // Note: This will release the call combiner.
1703     grpc_transport_stream_op_batch_finish_with_failure(
1704         batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1705     return;
1706   }
1707   // Handle cancellation.
1708   if (GPR_UNLIKELY(batch->cancel_stream)) {
1709     // Stash a copy of cancel_error in our call data, so that we can use
1710     // it for subsequent operations.  This ensures that if the call is
1711     // cancelled before any batches are passed down (e.g., if the deadline
1712     // is in the past when the call starts), we can return the right
1713     // error to the caller when the first batch does get passed down.
1714     GRPC_ERROR_UNREF(calld->cancel_error_);
1715     calld->cancel_error_ =
1716         GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
1717     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1718       gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
1719               calld, grpc_error_string(calld->cancel_error_));
1720     }
1721     // If we do not have a subchannel call (i.e., a pick has not yet
1722     // been started), fail all pending batches.  Otherwise, send the
1723     // cancellation down to the subchannel call.
1724     if (calld->subchannel_call_ == nullptr) {
1725       // TODO(roth): If there is a pending retry callback, do we need to
1726       // cancel it here?
1727       calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
1728                                 NoYieldCallCombiner);
1729       // Note: This will release the call combiner.
1730       grpc_transport_stream_op_batch_finish_with_failure(
1731           batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1732     } else {
1733       // Note: This will release the call combiner.
1734       calld->subchannel_call_->StartTransportStreamOpBatch(batch);
1735     }
1736     return;
1737   }
1738   // Add the batch to the pending list.
1739   calld->PendingBatchesAdd(elem, batch);
1740   // Check if we've already gotten a subchannel call.
1741   // Note that once we have completed the pick, we do not need to enter
1742   // the channel combiner, which is more efficient (especially for
1743   // streaming calls).
1744   if (calld->subchannel_call_ != nullptr) {
1745     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1746       gpr_log(GPR_INFO,
1747               "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
1748               calld, calld->subchannel_call_.get());
1749     }
1750     calld->PendingBatchesResume(elem);
1751     return;
1752   }
1753   // We do not yet have a subchannel call.
1754   // For batches containing a send_initial_metadata op, enter the channel
1755   // combiner to start a pick.
1756   if (GPR_LIKELY(batch->send_initial_metadata)) {
1757     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1758       gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
1759               chand, calld);
1760     }
1761     GRPC_CLOSURE_SCHED(
1762         GRPC_CLOSURE_INIT(
1763             &batch->handler_private.closure, StartPickLocked, elem,
1764             grpc_combiner_scheduler(chand->data_plane_combiner())),
1765         GRPC_ERROR_NONE);
1766   } else {
1767     // For all other batches, release the call combiner.
1768     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1769       gpr_log(GPR_INFO,
1770               "chand=%p calld=%p: saved batch, yielding call combiner", chand,
1771               calld);
1772     }
1773     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
1774                             "batch does not include send_initial_metadata");
1775   }
1776 }
1777 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)1778 void CallData::SetPollent(grpc_call_element* elem,
1779                           grpc_polling_entity* pollent) {
1780   CallData* calld = static_cast<CallData*>(elem->call_data);
1781   calld->pollent_ = pollent;
1782 }
1783 
1784 //
1785 // send op data caching
1786 //
1787 
MaybeCacheSendOpsForBatch(PendingBatch * pending)1788 void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
1789   if (pending->send_ops_cached) return;
1790   pending->send_ops_cached = true;
1791   grpc_transport_stream_op_batch* batch = pending->batch;
1792   // Save a copy of metadata for send_initial_metadata ops.
1793   if (batch->send_initial_metadata) {
1794     seen_send_initial_metadata_ = true;
1795     GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
1796     grpc_metadata_batch* send_initial_metadata =
1797         batch->payload->send_initial_metadata.send_initial_metadata;
1798     send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
1799         sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
1800     grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
1801                              send_initial_metadata_storage_);
1802     send_initial_metadata_flags_ =
1803         batch->payload->send_initial_metadata.send_initial_metadata_flags;
1804     peer_string_ = batch->payload->send_initial_metadata.peer_string;
1805   }
1806   // Set up cache for send_message ops.
1807   if (batch->send_message) {
1808     ByteStreamCache* cache = arena_->New<ByteStreamCache>(
1809         std::move(batch->payload->send_message.send_message));
1810     send_messages_.push_back(cache);
1811   }
1812   // Save metadata batch for send_trailing_metadata ops.
1813   if (batch->send_trailing_metadata) {
1814     seen_send_trailing_metadata_ = true;
1815     GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
1816     grpc_metadata_batch* send_trailing_metadata =
1817         batch->payload->send_trailing_metadata.send_trailing_metadata;
1818     send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
1819         sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
1820     grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
1821                              send_trailing_metadata_storage_);
1822   }
1823 }
1824 
FreeCachedSendInitialMetadata(ChannelData * chand)1825 void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
1826   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1827     gpr_log(GPR_INFO,
1828             "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
1829             this);
1830   }
1831   grpc_metadata_batch_destroy(&send_initial_metadata_);
1832 }
1833 
FreeCachedSendMessage(ChannelData * chand,size_t idx)1834 void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
1835   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1836     gpr_log(GPR_INFO,
1837             "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
1838             chand, this, idx);
1839   }
1840   send_messages_[idx]->Destroy();
1841 }
1842 
FreeCachedSendTrailingMetadata(ChannelData * chand)1843 void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
1844   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1845     gpr_log(GPR_INFO,
1846             "chand=%p calld=%p: destroying calld->send_trailing_metadata",
1847             chand, this);
1848   }
1849   grpc_metadata_batch_destroy(&send_trailing_metadata_);
1850 }
1851 
FreeCachedSendOpDataAfterCommit(grpc_call_element * elem,SubchannelCallRetryState * retry_state)1852 void CallData::FreeCachedSendOpDataAfterCommit(
1853     grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
1854   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1855   if (retry_state->completed_send_initial_metadata) {
1856     FreeCachedSendInitialMetadata(chand);
1857   }
1858   for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
1859     FreeCachedSendMessage(chand, i);
1860   }
1861   if (retry_state->completed_send_trailing_metadata) {
1862     FreeCachedSendTrailingMetadata(chand);
1863   }
1864 }
1865 
FreeCachedSendOpDataForCompletedBatch(grpc_call_element * elem,SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state)1866 void CallData::FreeCachedSendOpDataForCompletedBatch(
1867     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
1868     SubchannelCallRetryState* retry_state) {
1869   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1870   if (batch_data->batch.send_initial_metadata) {
1871     FreeCachedSendInitialMetadata(chand);
1872   }
1873   if (batch_data->batch.send_message) {
1874     FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
1875   }
1876   if (batch_data->batch.send_trailing_metadata) {
1877     FreeCachedSendTrailingMetadata(chand);
1878   }
1879 }
1880 
1881 //
1882 // LB recv_trailing_metadata_ready handling
1883 //
1884 
RecvTrailingMetadataReadyForLoadBalancingPolicy(void * arg,grpc_error * error)1885 void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
1886     void* arg, grpc_error* error) {
1887   CallData* calld = static_cast<CallData*>(arg);
1888   // Invoke callback to LB policy.
1889   calld->lb_recv_trailing_metadata_ready_(
1890       calld->lb_recv_trailing_metadata_ready_user_data_,
1891       calld->recv_trailing_metadata_, &calld->lb_call_state_);
1892   // Chain to original callback.
1893   GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
1894                    GRPC_ERROR_REF(error));
1895 }
1896 
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(grpc_transport_stream_op_batch * batch)1897 void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
1898     grpc_transport_stream_op_batch* batch) {
1899   if (lb_recv_trailing_metadata_ready_ != nullptr) {
1900     recv_trailing_metadata_ =
1901         batch->payload->recv_trailing_metadata.recv_trailing_metadata;
1902     original_recv_trailing_metadata_ready_ =
1903         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1904     GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
1905                       RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
1906                       grpc_schedule_on_exec_ctx);
1907     batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1908         &recv_trailing_metadata_ready_;
1909   }
1910 }
1911 
1912 //
1913 // pending_batches management
1914 //
1915 
GetBatchIndex(grpc_transport_stream_op_batch * batch)1916 size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
1917   // Note: It is important the send_initial_metadata be the first entry
1918   // here, since the code in pick_subchannel_locked() assumes it will be.
1919   if (batch->send_initial_metadata) return 0;
1920   if (batch->send_message) return 1;
1921   if (batch->send_trailing_metadata) return 2;
1922   if (batch->recv_initial_metadata) return 3;
1923   if (batch->recv_message) return 4;
1924   if (batch->recv_trailing_metadata) return 5;
1925   GPR_UNREACHABLE_CODE(return (size_t)-1);
1926 }
1927 
1928 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)1929 void CallData::PendingBatchesAdd(grpc_call_element* elem,
1930                                  grpc_transport_stream_op_batch* batch) {
1931   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1932   const size_t idx = GetBatchIndex(batch);
1933   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1934     gpr_log(GPR_INFO,
1935             "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
1936             this, idx);
1937   }
1938   PendingBatch* pending = &pending_batches_[idx];
1939   GPR_ASSERT(pending->batch == nullptr);
1940   pending->batch = batch;
1941   pending->send_ops_cached = false;
1942   if (enable_retries_) {
1943     // Update state in calld about pending batches.
1944     // Also check if the batch takes us over the retry buffer limit.
1945     // Note: We don't check the size of trailing metadata here, because
1946     // gRPC clients do not send trailing metadata.
1947     if (batch->send_initial_metadata) {
1948       pending_send_initial_metadata_ = true;
1949       bytes_buffered_for_retry_ += grpc_metadata_batch_size(
1950           batch->payload->send_initial_metadata.send_initial_metadata);
1951     }
1952     if (batch->send_message) {
1953       pending_send_message_ = true;
1954       bytes_buffered_for_retry_ +=
1955           batch->payload->send_message.send_message->length();
1956     }
1957     if (batch->send_trailing_metadata) {
1958       pending_send_trailing_metadata_ = true;
1959     }
1960     if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
1961                      chand->per_rpc_retry_buffer_size())) {
1962       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1963         gpr_log(GPR_INFO,
1964                 "chand=%p calld=%p: exceeded retry buffer size, committing",
1965                 chand, this);
1966       }
1967       SubchannelCallRetryState* retry_state =
1968           subchannel_call_ == nullptr ? nullptr
1969                                       : static_cast<SubchannelCallRetryState*>(
1970                                             subchannel_call_->GetParentData());
1971       RetryCommit(elem, retry_state);
1972       // If we are not going to retry and have not yet started, pretend
1973       // retries are disabled so that we don't bother with retry overhead.
1974       if (num_attempts_completed_ == 0) {
1975         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1976           gpr_log(GPR_INFO,
1977                   "chand=%p calld=%p: disabling retries before first attempt",
1978                   chand, this);
1979         }
1980         enable_retries_ = false;
1981       }
1982     }
1983   }
1984 }
1985 
PendingBatchClear(PendingBatch * pending)1986 void CallData::PendingBatchClear(PendingBatch* pending) {
1987   if (enable_retries_) {
1988     if (pending->batch->send_initial_metadata) {
1989       pending_send_initial_metadata_ = false;
1990     }
1991     if (pending->batch->send_message) {
1992       pending_send_message_ = false;
1993     }
1994     if (pending->batch->send_trailing_metadata) {
1995       pending_send_trailing_metadata_ = false;
1996     }
1997   }
1998   pending->batch = nullptr;
1999 }
2000 
MaybeClearPendingBatch(grpc_call_element * elem,PendingBatch * pending)2001 void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
2002                                       PendingBatch* pending) {
2003   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2004   grpc_transport_stream_op_batch* batch = pending->batch;
2005   // We clear the pending batch if all of its callbacks have been
2006   // scheduled and reset to nullptr.
2007   if (batch->on_complete == nullptr &&
2008       (!batch->recv_initial_metadata ||
2009        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
2010            nullptr) &&
2011       (!batch->recv_message ||
2012        batch->payload->recv_message.recv_message_ready == nullptr) &&
2013       (!batch->recv_trailing_metadata ||
2014        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
2015            nullptr)) {
2016     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2017       gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
2018               this);
2019     }
2020     PendingBatchClear(pending);
2021   }
2022 }
2023 
2024 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error * error)2025 void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
2026   grpc_transport_stream_op_batch* batch =
2027       static_cast<grpc_transport_stream_op_batch*>(arg);
2028   CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
2029   // Note: This will release the call combiner.
2030   grpc_transport_stream_op_batch_finish_with_failure(
2031       batch, GRPC_ERROR_REF(error), calld->call_combiner_);
2032 }
2033 
2034 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_call_element * elem,grpc_error * error,YieldCallCombinerPredicate yield_call_combiner_predicate)2035 void CallData::PendingBatchesFail(
2036     grpc_call_element* elem, grpc_error* error,
2037     YieldCallCombinerPredicate yield_call_combiner_predicate) {
2038   GPR_ASSERT(error != GRPC_ERROR_NONE);
2039   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2040     size_t num_batches = 0;
2041     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2042       if (pending_batches_[i].batch != nullptr) ++num_batches;
2043     }
2044     gpr_log(GPR_INFO,
2045             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
2046             elem->channel_data, this, num_batches, grpc_error_string(error));
2047   }
2048   CallCombinerClosureList closures;
2049   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2050     PendingBatch* pending = &pending_batches_[i];
2051     grpc_transport_stream_op_batch* batch = pending->batch;
2052     if (batch != nullptr) {
2053       if (batch->recv_trailing_metadata) {
2054         MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
2055       }
2056       batch->handler_private.extra_arg = this;
2057       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2058                         FailPendingBatchInCallCombiner, batch,
2059                         grpc_schedule_on_exec_ctx);
2060       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
2061                    "PendingBatchesFail");
2062       PendingBatchClear(pending);
2063     }
2064   }
2065   if (yield_call_combiner_predicate(closures)) {
2066     closures.RunClosures(call_combiner_);
2067   } else {
2068     closures.RunClosuresWithoutYielding(call_combiner_);
2069   }
2070   GRPC_ERROR_UNREF(error);
2071 }
2072 
2073 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error * ignored)2074 void CallData::ResumePendingBatchInCallCombiner(void* arg,
2075                                                 grpc_error* ignored) {
2076   grpc_transport_stream_op_batch* batch =
2077       static_cast<grpc_transport_stream_op_batch*>(arg);
2078   SubchannelCall* subchannel_call =
2079       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2080   // Note: This will release the call combiner.
2081   subchannel_call->StartTransportStreamOpBatch(batch);
2082 }
2083 
2084 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume(grpc_call_element * elem)2085 void CallData::PendingBatchesResume(grpc_call_element* elem) {
2086   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2087   if (enable_retries_) {
2088     StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
2089     return;
2090   }
2091   // Retries not enabled; send down batches as-is.
2092   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2093     size_t num_batches = 0;
2094     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2095       if (pending_batches_[i].batch != nullptr) ++num_batches;
2096     }
2097     gpr_log(GPR_INFO,
2098             "chand=%p calld=%p: starting %" PRIuPTR
2099             " pending batches on subchannel_call=%p",
2100             chand, this, num_batches, subchannel_call_.get());
2101   }
2102   CallCombinerClosureList closures;
2103   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2104     PendingBatch* pending = &pending_batches_[i];
2105     grpc_transport_stream_op_batch* batch = pending->batch;
2106     if (batch != nullptr) {
2107       if (batch->recv_trailing_metadata) {
2108         MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
2109       }
2110       batch->handler_private.extra_arg = subchannel_call_.get();
2111       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2112                         ResumePendingBatchInCallCombiner, batch,
2113                         grpc_schedule_on_exec_ctx);
2114       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2115                    "PendingBatchesResume");
2116       PendingBatchClear(pending);
2117     }
2118   }
2119   // Note: This will release the call combiner.
2120   closures.RunClosures(call_combiner_);
2121 }
2122 
2123 template <typename Predicate>
PendingBatchFind(grpc_call_element * elem,const char * log_message,Predicate predicate)2124 CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
2125                                                    const char* log_message,
2126                                                    Predicate predicate) {
2127   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2128   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2129     PendingBatch* pending = &pending_batches_[i];
2130     grpc_transport_stream_op_batch* batch = pending->batch;
2131     if (batch != nullptr && predicate(batch)) {
2132       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2133         gpr_log(GPR_INFO,
2134                 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
2135                 this, log_message, i);
2136       }
2137       return pending;
2138     }
2139   }
2140   return nullptr;
2141 }
2142 
2143 //
2144 // retry code
2145 //
2146 
RetryCommit(grpc_call_element * elem,SubchannelCallRetryState * retry_state)2147 void CallData::RetryCommit(grpc_call_element* elem,
2148                            SubchannelCallRetryState* retry_state) {
2149   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2150   if (retry_committed_) return;
2151   retry_committed_ = true;
2152   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2153     gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
2154   }
2155   if (retry_state != nullptr) {
2156     FreeCachedSendOpDataAfterCommit(elem, retry_state);
2157   }
2158 }
2159 
DoRetry(grpc_call_element * elem,SubchannelCallRetryState * retry_state,grpc_millis server_pushback_ms)2160 void CallData::DoRetry(grpc_call_element* elem,
2161                        SubchannelCallRetryState* retry_state,
2162                        grpc_millis server_pushback_ms) {
2163   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2164   GPR_ASSERT(method_params_ != nullptr);
2165   const auto* retry_policy = method_params_->retry_policy();
2166   GPR_ASSERT(retry_policy != nullptr);
2167   // Reset subchannel call.
2168   subchannel_call_.reset();
2169   // Compute backoff delay.
2170   grpc_millis next_attempt_time;
2171   if (server_pushback_ms >= 0) {
2172     next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
2173     last_attempt_got_server_pushback_ = true;
2174   } else {
2175     if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
2176       retry_backoff_.Init(
2177           BackOff::Options()
2178               .set_initial_backoff(retry_policy->initial_backoff)
2179               .set_multiplier(retry_policy->backoff_multiplier)
2180               .set_jitter(RETRY_BACKOFF_JITTER)
2181               .set_max_backoff(retry_policy->max_backoff));
2182       last_attempt_got_server_pushback_ = false;
2183     }
2184     next_attempt_time = retry_backoff_->NextAttemptTime();
2185   }
2186   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2187     gpr_log(GPR_INFO,
2188             "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
2189             this, next_attempt_time - ExecCtx::Get()->Now());
2190   }
2191   // Schedule retry after computed delay.
2192   GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
2193                     grpc_combiner_scheduler(chand->data_plane_combiner()));
2194   grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
2195   // Update bookkeeping.
2196   if (retry_state != nullptr) retry_state->retry_dispatched = true;
2197 }
2198 
MaybeRetry(grpc_call_element * elem,SubchannelCallBatchData * batch_data,grpc_status_code status,grpc_mdelem * server_pushback_md)2199 bool CallData::MaybeRetry(grpc_call_element* elem,
2200                           SubchannelCallBatchData* batch_data,
2201                           grpc_status_code status,
2202                           grpc_mdelem* server_pushback_md) {
2203   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2204   // Get retry policy.
2205   if (method_params_ == nullptr) return false;
2206   const auto* retry_policy = method_params_->retry_policy();
2207   if (retry_policy == nullptr) return false;
2208   // If we've already dispatched a retry from this call, return true.
2209   // This catches the case where the batch has multiple callbacks
2210   // (i.e., it includes either recv_message or recv_initial_metadata).
2211   SubchannelCallRetryState* retry_state = nullptr;
2212   if (batch_data != nullptr) {
2213     retry_state = static_cast<SubchannelCallRetryState*>(
2214         batch_data->subchannel_call->GetParentData());
2215     if (retry_state->retry_dispatched) {
2216       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2217         gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
2218                 this);
2219       }
2220       return true;
2221     }
2222   }
2223   // Check status.
2224   if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
2225     if (retry_throttle_data_ != nullptr) {
2226       retry_throttle_data_->RecordSuccess();
2227     }
2228     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2229       gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
2230     }
2231     return false;
2232   }
2233   // Status is not OK.  Check whether the status is retryable.
2234   if (!retry_policy->retryable_status_codes.Contains(status)) {
2235     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2236       gpr_log(GPR_INFO,
2237               "chand=%p calld=%p: status %s not configured as retryable", chand,
2238               this, grpc_status_code_to_string(status));
2239     }
2240     return false;
2241   }
2242   // Record the failure and check whether retries are throttled.
2243   // Note that it's important for this check to come after the status
2244   // code check above, since we should only record failures whose statuses
2245   // match the configured retryable status codes, so that we don't count
2246   // things like failures due to malformed requests (INVALID_ARGUMENT).
2247   // Conversely, it's important for this to come before the remaining
2248   // checks, so that we don't fail to record failures due to other factors.
2249   if (retry_throttle_data_ != nullptr &&
2250       !retry_throttle_data_->RecordFailure()) {
2251     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2252       gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
2253     }
2254     return false;
2255   }
2256   // Check whether the call is committed.
2257   if (retry_committed_) {
2258     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2259       gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
2260               this);
2261     }
2262     return false;
2263   }
2264   // Check whether we have retries remaining.
2265   ++num_attempts_completed_;
2266   if (num_attempts_completed_ >= retry_policy->max_attempts) {
2267     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2268       gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
2269               this, retry_policy->max_attempts);
2270     }
2271     return false;
2272   }
2273   // If the call was cancelled from the surface, don't retry.
2274   if (cancel_error_ != GRPC_ERROR_NONE) {
2275     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2276       gpr_log(GPR_INFO,
2277               "chand=%p calld=%p: call cancelled from surface, not retrying",
2278               chand, this);
2279     }
2280     return false;
2281   }
2282   // Check server push-back.
2283   grpc_millis server_pushback_ms = -1;
2284   if (server_pushback_md != nullptr) {
2285     // If the value is "-1" or any other unparseable string, we do not retry.
2286     uint32_t ms;
2287     if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
2288       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2289         gpr_log(GPR_INFO,
2290                 "chand=%p calld=%p: not retrying due to server push-back",
2291                 chand, this);
2292       }
2293       return false;
2294     } else {
2295       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2296         gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
2297                 chand, this, ms);
2298       }
2299       server_pushback_ms = (grpc_millis)ms;
2300     }
2301   }
2302   DoRetry(elem, retry_state, server_pushback_ms);
2303   return true;
2304 }
2305 
2306 //
2307 // CallData::SubchannelCallBatchData
2308 //
2309 
Create(grpc_call_element * elem,int refcount,bool set_on_complete)2310 CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
2311     grpc_call_element* elem, int refcount, bool set_on_complete) {
2312   CallData* calld = static_cast<CallData*>(elem->call_data);
2313   return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
2314                                                      set_on_complete);
2315 }
2316 
SubchannelCallBatchData(grpc_call_element * elem,CallData * calld,int refcount,bool set_on_complete)2317 CallData::SubchannelCallBatchData::SubchannelCallBatchData(
2318     grpc_call_element* elem, CallData* calld, int refcount,
2319     bool set_on_complete)
2320     : elem(elem), subchannel_call(calld->subchannel_call_) {
2321   SubchannelCallRetryState* retry_state =
2322       static_cast<SubchannelCallRetryState*>(
2323           calld->subchannel_call_->GetParentData());
2324   batch.payload = &retry_state->batch_payload;
2325   gpr_ref_init(&refs, refcount);
2326   if (set_on_complete) {
2327     GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
2328                       grpc_schedule_on_exec_ctx);
2329     batch.on_complete = &on_complete;
2330   }
2331   GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
2332 }
2333 
Destroy()2334 void CallData::SubchannelCallBatchData::Destroy() {
2335   SubchannelCallRetryState* retry_state =
2336       static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
2337   if (batch.send_initial_metadata) {
2338     grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
2339   }
2340   if (batch.send_trailing_metadata) {
2341     grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
2342   }
2343   if (batch.recv_initial_metadata) {
2344     grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
2345   }
2346   if (batch.recv_trailing_metadata) {
2347     grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
2348   }
2349   subchannel_call.reset();
2350   CallData* calld = static_cast<CallData*>(elem->call_data);
2351   GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
2352 }
2353 
2354 //
2355 // recv_initial_metadata callback handling
2356 //
2357 
InvokeRecvInitialMetadataCallback(void * arg,grpc_error * error)2358 void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
2359   SubchannelCallBatchData* batch_data =
2360       static_cast<SubchannelCallBatchData*>(arg);
2361   CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2362   // Find pending batch.
2363   PendingBatch* pending = calld->PendingBatchFind(
2364       batch_data->elem, "invoking recv_initial_metadata_ready for",
2365       [](grpc_transport_stream_op_batch* batch) {
2366         return batch->recv_initial_metadata &&
2367                batch->payload->recv_initial_metadata
2368                        .recv_initial_metadata_ready != nullptr;
2369       });
2370   GPR_ASSERT(pending != nullptr);
2371   // Return metadata.
2372   SubchannelCallRetryState* retry_state =
2373       static_cast<SubchannelCallRetryState*>(
2374           batch_data->subchannel_call->GetParentData());
2375   grpc_metadata_batch_move(
2376       &retry_state->recv_initial_metadata,
2377       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
2378   // Update bookkeeping.
2379   // Note: Need to do this before invoking the callback, since invoking
2380   // the callback will result in yielding the call combiner.
2381   grpc_closure* recv_initial_metadata_ready =
2382       pending->batch->payload->recv_initial_metadata
2383           .recv_initial_metadata_ready;
2384   pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2385       nullptr;
2386   calld->MaybeClearPendingBatch(batch_data->elem, pending);
2387   batch_data->Unref();
2388   // Invoke callback.
2389   GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
2390 }
2391 
RecvInitialMetadataReady(void * arg,grpc_error * error)2392 void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
2393   SubchannelCallBatchData* batch_data =
2394       static_cast<SubchannelCallBatchData*>(arg);
2395   grpc_call_element* elem = batch_data->elem;
2396   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2397   CallData* calld = static_cast<CallData*>(elem->call_data);
2398   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2399     gpr_log(GPR_INFO,
2400             "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
2401             chand, calld, grpc_error_string(error));
2402   }
2403   SubchannelCallRetryState* retry_state =
2404       static_cast<SubchannelCallRetryState*>(
2405           batch_data->subchannel_call->GetParentData());
2406   retry_state->completed_recv_initial_metadata = true;
2407   // If a retry was already dispatched, then we're not going to use the
2408   // result of this recv_initial_metadata op, so do nothing.
2409   if (retry_state->retry_dispatched) {
2410     GRPC_CALL_COMBINER_STOP(
2411         calld->call_combiner_,
2412         "recv_initial_metadata_ready after retry dispatched");
2413     return;
2414   }
2415   // If we got an error or a Trailers-Only response and have not yet gotten
2416   // the recv_trailing_metadata_ready callback, then defer propagating this
2417   // callback back to the surface.  We can evaluate whether to retry when
2418   // recv_trailing_metadata comes back.
2419   if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
2420                     error != GRPC_ERROR_NONE) &&
2421                    !retry_state->completed_recv_trailing_metadata)) {
2422     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2423       gpr_log(GPR_INFO,
2424               "chand=%p calld=%p: deferring recv_initial_metadata_ready "
2425               "(Trailers-Only)",
2426               chand, calld);
2427     }
2428     retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
2429     retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
2430     if (!retry_state->started_recv_trailing_metadata) {
2431       // recv_trailing_metadata not yet started by application; start it
2432       // ourselves to get status.
2433       calld->StartInternalRecvTrailingMetadata(elem);
2434     } else {
2435       GRPC_CALL_COMBINER_STOP(
2436           calld->call_combiner_,
2437           "recv_initial_metadata_ready trailers-only or error");
2438     }
2439     return;
2440   }
2441   // Received valid initial metadata, so commit the call.
2442   calld->RetryCommit(elem, retry_state);
2443   // Invoke the callback to return the result to the surface.
2444   // Manually invoking a callback function; it does not take ownership of error.
2445   calld->InvokeRecvInitialMetadataCallback(batch_data, error);
2446 }
2447 
2448 //
2449 // recv_message callback handling
2450 //
2451 
InvokeRecvMessageCallback(void * arg,grpc_error * error)2452 void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
2453   SubchannelCallBatchData* batch_data =
2454       static_cast<SubchannelCallBatchData*>(arg);
2455   CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
2456   // Find pending op.
2457   PendingBatch* pending = calld->PendingBatchFind(
2458       batch_data->elem, "invoking recv_message_ready for",
2459       [](grpc_transport_stream_op_batch* batch) {
2460         return batch->recv_message &&
2461                batch->payload->recv_message.recv_message_ready != nullptr;
2462       });
2463   GPR_ASSERT(pending != nullptr);
2464   // Return payload.
2465   SubchannelCallRetryState* retry_state =
2466       static_cast<SubchannelCallRetryState*>(
2467           batch_data->subchannel_call->GetParentData());
2468   *pending->batch->payload->recv_message.recv_message =
2469       std::move(retry_state->recv_message);
2470   // Update bookkeeping.
2471   // Note: Need to do this before invoking the callback, since invoking
2472   // the callback will result in yielding the call combiner.
2473   grpc_closure* recv_message_ready =
2474       pending->batch->payload->recv_message.recv_message_ready;
2475   pending->batch->payload->recv_message.recv_message_ready = nullptr;
2476   calld->MaybeClearPendingBatch(batch_data->elem, pending);
2477   batch_data->Unref();
2478   // Invoke callback.
2479   GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
2480 }
2481 
RecvMessageReady(void * arg,grpc_error * error)2482 void CallData::RecvMessageReady(void* arg, grpc_error* error) {
2483   SubchannelCallBatchData* batch_data =
2484       static_cast<SubchannelCallBatchData*>(arg);
2485   grpc_call_element* elem = batch_data->elem;
2486   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2487   CallData* calld = static_cast<CallData*>(elem->call_data);
2488   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2489     gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
2490             chand, calld, grpc_error_string(error));
2491   }
2492   SubchannelCallRetryState* retry_state =
2493       static_cast<SubchannelCallRetryState*>(
2494           batch_data->subchannel_call->GetParentData());
2495   ++retry_state->completed_recv_message_count;
2496   // If a retry was already dispatched, then we're not going to use the
2497   // result of this recv_message op, so do nothing.
2498   if (retry_state->retry_dispatched) {
2499     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2500                             "recv_message_ready after retry dispatched");
2501     return;
2502   }
2503   // If we got an error or the payload was nullptr and we have not yet gotten
2504   // the recv_trailing_metadata_ready callback, then defer propagating this
2505   // callback back to the surface.  We can evaluate whether to retry when
2506   // recv_trailing_metadata comes back.
2507   if (GPR_UNLIKELY(
2508           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
2509           !retry_state->completed_recv_trailing_metadata)) {
2510     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2511       gpr_log(GPR_INFO,
2512               "chand=%p calld=%p: deferring recv_message_ready (nullptr "
2513               "message and recv_trailing_metadata pending)",
2514               chand, calld);
2515     }
2516     retry_state->recv_message_ready_deferred_batch = batch_data;
2517     retry_state->recv_message_error = GRPC_ERROR_REF(error);
2518     if (!retry_state->started_recv_trailing_metadata) {
2519       // recv_trailing_metadata not yet started by application; start it
2520       // ourselves to get status.
2521       calld->StartInternalRecvTrailingMetadata(elem);
2522     } else {
2523       GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
2524     }
2525     return;
2526   }
2527   // Received a valid message, so commit the call.
2528   calld->RetryCommit(elem, retry_state);
2529   // Invoke the callback to return the result to the surface.
2530   // Manually invoking a callback function; it does not take ownership of error.
2531   calld->InvokeRecvMessageCallback(batch_data, error);
2532 }
2533 
2534 //
2535 // recv_trailing_metadata handling
2536 //
2537 
GetCallStatus(grpc_call_element * elem,grpc_metadata_batch * md_batch,grpc_error * error,grpc_status_code * status,grpc_mdelem ** server_pushback_md)2538 void CallData::GetCallStatus(grpc_call_element* elem,
2539                              grpc_metadata_batch* md_batch, grpc_error* error,
2540                              grpc_status_code* status,
2541                              grpc_mdelem** server_pushback_md) {
2542   if (error != GRPC_ERROR_NONE) {
2543     grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
2544   } else {
2545     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
2546     *status =
2547         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
2548     if (server_pushback_md != nullptr &&
2549         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
2550       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
2551     }
2552   }
2553   GRPC_ERROR_UNREF(error);
2554 }
2555 
AddClosureForRecvTrailingMetadataReady(grpc_call_element * elem,SubchannelCallBatchData * batch_data,grpc_error * error,CallCombinerClosureList * closures)2556 void CallData::AddClosureForRecvTrailingMetadataReady(
2557     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2558     grpc_error* error, CallCombinerClosureList* closures) {
2559   // Find pending batch.
2560   PendingBatch* pending = PendingBatchFind(
2561       elem, "invoking recv_trailing_metadata for",
2562       [](grpc_transport_stream_op_batch* batch) {
2563         return batch->recv_trailing_metadata &&
2564                batch->payload->recv_trailing_metadata
2565                        .recv_trailing_metadata_ready != nullptr;
2566       });
2567   // If we generated the recv_trailing_metadata op internally via
2568   // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
2569   if (pending == nullptr) {
2570     GRPC_ERROR_UNREF(error);
2571     return;
2572   }
2573   // Return metadata.
2574   SubchannelCallRetryState* retry_state =
2575       static_cast<SubchannelCallRetryState*>(
2576           batch_data->subchannel_call->GetParentData());
2577   grpc_metadata_batch_move(
2578       &retry_state->recv_trailing_metadata,
2579       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
2580   // Add closure.
2581   closures->Add(pending->batch->payload->recv_trailing_metadata
2582                     .recv_trailing_metadata_ready,
2583                 error, "recv_trailing_metadata_ready for pending batch");
2584   // Update bookkeeping.
2585   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2586       nullptr;
2587   MaybeClearPendingBatch(elem, pending);
2588 }
2589 
AddClosuresForDeferredRecvCallbacks(SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state,CallCombinerClosureList * closures)2590 void CallData::AddClosuresForDeferredRecvCallbacks(
2591     SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
2592     CallCombinerClosureList* closures) {
2593   if (batch_data->batch.recv_trailing_metadata) {
2594     // Add closure for deferred recv_initial_metadata_ready.
2595     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
2596                      nullptr)) {
2597       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2598                         InvokeRecvInitialMetadataCallback,
2599                         retry_state->recv_initial_metadata_ready_deferred_batch,
2600                         grpc_schedule_on_exec_ctx);
2601       closures->Add(&retry_state->recv_initial_metadata_ready,
2602                     retry_state->recv_initial_metadata_error,
2603                     "resuming recv_initial_metadata_ready");
2604       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
2605     }
2606     // Add closure for deferred recv_message_ready.
2607     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
2608                      nullptr)) {
2609       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
2610                         InvokeRecvMessageCallback,
2611                         retry_state->recv_message_ready_deferred_batch,
2612                         grpc_schedule_on_exec_ctx);
2613       closures->Add(&retry_state->recv_message_ready,
2614                     retry_state->recv_message_error,
2615                     "resuming recv_message_ready");
2616       retry_state->recv_message_ready_deferred_batch = nullptr;
2617     }
2618   }
2619 }
2620 
PendingBatchIsUnstarted(PendingBatch * pending,SubchannelCallRetryState * retry_state)2621 bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
2622                                        SubchannelCallRetryState* retry_state) {
2623   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
2624     return false;
2625   }
2626   if (pending->batch->send_initial_metadata &&
2627       !retry_state->started_send_initial_metadata) {
2628     return true;
2629   }
2630   if (pending->batch->send_message &&
2631       retry_state->started_send_message_count < send_messages_.size()) {
2632     return true;
2633   }
2634   if (pending->batch->send_trailing_metadata &&
2635       !retry_state->started_send_trailing_metadata) {
2636     return true;
2637   }
2638   return false;
2639 }
2640 
AddClosuresToFailUnstartedPendingBatches(grpc_call_element * elem,SubchannelCallRetryState * retry_state,grpc_error * error,CallCombinerClosureList * closures)2641 void CallData::AddClosuresToFailUnstartedPendingBatches(
2642     grpc_call_element* elem, SubchannelCallRetryState* retry_state,
2643     grpc_error* error, CallCombinerClosureList* closures) {
2644   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2645   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2646     PendingBatch* pending = &pending_batches_[i];
2647     if (PendingBatchIsUnstarted(pending, retry_state)) {
2648       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2649         gpr_log(GPR_INFO,
2650                 "chand=%p calld=%p: failing unstarted pending batch at index "
2651                 "%" PRIuPTR,
2652                 chand, this, i);
2653       }
2654       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
2655                     "failing on_complete for pending batch");
2656       pending->batch->on_complete = nullptr;
2657       MaybeClearPendingBatch(elem, pending);
2658     }
2659   }
2660   GRPC_ERROR_UNREF(error);
2661 }
2662 
RunClosuresForCompletedCall(SubchannelCallBatchData * batch_data,grpc_error * error)2663 void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
2664                                            grpc_error* error) {
2665   grpc_call_element* elem = batch_data->elem;
2666   SubchannelCallRetryState* retry_state =
2667       static_cast<SubchannelCallRetryState*>(
2668           batch_data->subchannel_call->GetParentData());
2669   // Construct list of closures to execute.
2670   CallCombinerClosureList closures;
2671   // First, add closure for recv_trailing_metadata_ready.
2672   AddClosureForRecvTrailingMetadataReady(elem, batch_data,
2673                                          GRPC_ERROR_REF(error), &closures);
2674   // If there are deferred recv_initial_metadata_ready or recv_message_ready
2675   // callbacks, add them to closures.
2676   AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
2677   // Add closures to fail any pending batches that have not yet been started.
2678   AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
2679                                            GRPC_ERROR_REF(error), &closures);
2680   // Don't need batch_data anymore.
2681   batch_data->Unref();
2682   // Schedule all of the closures identified above.
2683   // Note: This will release the call combiner.
2684   closures.RunClosures(call_combiner_);
2685   GRPC_ERROR_UNREF(error);
2686 }
2687 
RecvTrailingMetadataReady(void * arg,grpc_error * error)2688 void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
2689   SubchannelCallBatchData* batch_data =
2690       static_cast<SubchannelCallBatchData*>(arg);
2691   grpc_call_element* elem = batch_data->elem;
2692   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2693   CallData* calld = static_cast<CallData*>(elem->call_data);
2694   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2695     gpr_log(GPR_INFO,
2696             "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
2697             chand, calld, grpc_error_string(error));
2698   }
2699   SubchannelCallRetryState* retry_state =
2700       static_cast<SubchannelCallRetryState*>(
2701           batch_data->subchannel_call->GetParentData());
2702   retry_state->completed_recv_trailing_metadata = true;
2703   // Get the call's status and check for server pushback metadata.
2704   grpc_status_code status = GRPC_STATUS_OK;
2705   grpc_mdelem* server_pushback_md = nullptr;
2706   grpc_metadata_batch* md_batch =
2707       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
2708   calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
2709                        &server_pushback_md);
2710   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2711     gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
2712             calld, grpc_status_code_to_string(status));
2713   }
2714   // Check if we should retry.
2715   if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
2716     // Unref batch_data for deferred recv_initial_metadata_ready or
2717     // recv_message_ready callbacks, if any.
2718     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
2719       batch_data->Unref();
2720       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
2721     }
2722     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
2723       batch_data->Unref();
2724       GRPC_ERROR_UNREF(retry_state->recv_message_error);
2725     }
2726     batch_data->Unref();
2727     return;
2728   }
2729   // Not retrying, so commit the call.
2730   calld->RetryCommit(elem, retry_state);
2731   // Run any necessary closures.
2732   calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
2733 }
2734 
2735 //
2736 // on_complete callback handling
2737 //
2738 
AddClosuresForCompletedPendingBatch(grpc_call_element * elem,SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state,grpc_error * error,CallCombinerClosureList * closures)2739 void CallData::AddClosuresForCompletedPendingBatch(
2740     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2741     SubchannelCallRetryState* retry_state, grpc_error* error,
2742     CallCombinerClosureList* closures) {
2743   PendingBatch* pending = PendingBatchFind(
2744       elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
2745         // Match the pending batch with the same set of send ops as the
2746         // subchannel batch we've just completed.
2747         return batch->on_complete != nullptr &&
2748                batch_data->batch.send_initial_metadata ==
2749                    batch->send_initial_metadata &&
2750                batch_data->batch.send_message == batch->send_message &&
2751                batch_data->batch.send_trailing_metadata ==
2752                    batch->send_trailing_metadata;
2753       });
2754   // If batch_data is a replay batch, then there will be no pending
2755   // batch to complete.
2756   if (pending == nullptr) {
2757     GRPC_ERROR_UNREF(error);
2758     return;
2759   }
2760   // Add closure.
2761   closures->Add(pending->batch->on_complete, error,
2762                 "on_complete for pending batch");
2763   pending->batch->on_complete = nullptr;
2764   MaybeClearPendingBatch(elem, pending);
2765 }
2766 
AddClosuresForReplayOrPendingSendOps(grpc_call_element * elem,SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state,CallCombinerClosureList * closures)2767 void CallData::AddClosuresForReplayOrPendingSendOps(
2768     grpc_call_element* elem, SubchannelCallBatchData* batch_data,
2769     SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
2770   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2771   bool have_pending_send_message_ops =
2772       retry_state->started_send_message_count < send_messages_.size();
2773   bool have_pending_send_trailing_metadata_op =
2774       seen_send_trailing_metadata_ &&
2775       !retry_state->started_send_trailing_metadata;
2776   if (!have_pending_send_message_ops &&
2777       !have_pending_send_trailing_metadata_op) {
2778     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2779       PendingBatch* pending = &pending_batches_[i];
2780       grpc_transport_stream_op_batch* batch = pending->batch;
2781       if (batch == nullptr || pending->send_ops_cached) continue;
2782       if (batch->send_message) have_pending_send_message_ops = true;
2783       if (batch->send_trailing_metadata) {
2784         have_pending_send_trailing_metadata_op = true;
2785       }
2786     }
2787   }
2788   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
2789     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2790       gpr_log(GPR_INFO,
2791               "chand=%p calld=%p: starting next batch for pending send op(s)",
2792               chand, this);
2793     }
2794     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
2795                       StartRetriableSubchannelBatches, elem,
2796                       grpc_schedule_on_exec_ctx);
2797     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
2798                   "starting next batch for send_* op(s)");
2799   }
2800 }
2801 
OnComplete(void * arg,grpc_error * error)2802 void CallData::OnComplete(void* arg, grpc_error* error) {
2803   SubchannelCallBatchData* batch_data =
2804       static_cast<SubchannelCallBatchData*>(arg);
2805   grpc_call_element* elem = batch_data->elem;
2806   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2807   CallData* calld = static_cast<CallData*>(elem->call_data);
2808   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2809     char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
2810     gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
2811             chand, calld, grpc_error_string(error), batch_str);
2812     gpr_free(batch_str);
2813   }
2814   SubchannelCallRetryState* retry_state =
2815       static_cast<SubchannelCallRetryState*>(
2816           batch_data->subchannel_call->GetParentData());
2817   // Update bookkeeping in retry_state.
2818   if (batch_data->batch.send_initial_metadata) {
2819     retry_state->completed_send_initial_metadata = true;
2820   }
2821   if (batch_data->batch.send_message) {
2822     ++retry_state->completed_send_message_count;
2823   }
2824   if (batch_data->batch.send_trailing_metadata) {
2825     retry_state->completed_send_trailing_metadata = true;
2826   }
2827   // If the call is committed, free cached data for send ops that we've just
2828   // completed.
2829   if (calld->retry_committed_) {
2830     calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
2831   }
2832   // Construct list of closures to execute.
2833   CallCombinerClosureList closures;
2834   // If a retry was already dispatched, that means we saw
2835   // recv_trailing_metadata before this, so we do nothing here.
2836   // Otherwise, invoke the callback to return the result to the surface.
2837   if (!retry_state->retry_dispatched) {
2838     // Add closure for the completed pending batch, if any.
2839     calld->AddClosuresForCompletedPendingBatch(
2840         elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
2841     // If needed, add a callback to start any replay or pending send ops on
2842     // the subchannel call.
2843     if (!retry_state->completed_recv_trailing_metadata) {
2844       calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
2845                                                   &closures);
2846     }
2847   }
2848   // Track number of pending subchannel send batches and determine if this
2849   // was the last one.
2850   --calld->num_pending_retriable_subchannel_send_batches_;
2851   const bool last_send_batch_complete =
2852       calld->num_pending_retriable_subchannel_send_batches_ == 0;
2853   // Don't need batch_data anymore.
2854   batch_data->Unref();
2855   // Schedule all of the closures identified above.
2856   // Note: This yeilds the call combiner.
2857   closures.RunClosures(calld->call_combiner_);
2858   // If this was the last subchannel send batch, unref the call stack.
2859   if (last_send_batch_complete) {
2860     GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
2861   }
2862 }
2863 
2864 //
2865 // subchannel batch construction
2866 //
2867 
StartBatchInCallCombiner(void * arg,grpc_error * ignored)2868 void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
2869   grpc_transport_stream_op_batch* batch =
2870       static_cast<grpc_transport_stream_op_batch*>(arg);
2871   SubchannelCall* subchannel_call =
2872       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2873   // Note: This will release the call combiner.
2874   subchannel_call->StartTransportStreamOpBatch(batch);
2875 }
2876 
AddClosureForSubchannelBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch,CallCombinerClosureList * closures)2877 void CallData::AddClosureForSubchannelBatch(
2878     grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
2879     CallCombinerClosureList* closures) {
2880   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2881   batch->handler_private.extra_arg = subchannel_call_.get();
2882   GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
2883                     batch, grpc_schedule_on_exec_ctx);
2884   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2885     char* batch_str = grpc_transport_stream_op_batch_string(batch);
2886     gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2887             this, batch_str);
2888     gpr_free(batch_str);
2889   }
2890   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2891                 "start_subchannel_batch");
2892 }
2893 
AddRetriableSendInitialMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)2894 void CallData::AddRetriableSendInitialMetadataOp(
2895     SubchannelCallRetryState* retry_state,
2896     SubchannelCallBatchData* batch_data) {
2897   // Maps the number of retries to the corresponding metadata value slice.
2898   static const grpc_slice* retry_count_strings[] = {
2899       &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
2900   // We need to make a copy of the metadata batch for each attempt, since
2901   // the filters in the subchannel stack may modify this batch, and we don't
2902   // want those modifications to be passed forward to subsequent attempts.
2903   //
2904   // If we've already completed one or more attempts, add the
2905   // grpc-retry-attempts header.
2906   retry_state->send_initial_metadata_storage =
2907       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2908           sizeof(grpc_linked_mdelem) *
2909           (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
2910   grpc_metadata_batch_copy(&send_initial_metadata_,
2911                            &retry_state->send_initial_metadata,
2912                            retry_state->send_initial_metadata_storage);
2913   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
2914                        .grpc_previous_rpc_attempts != nullptr)) {
2915     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2916                                retry_state->send_initial_metadata.idx.named
2917                                    .grpc_previous_rpc_attempts);
2918   }
2919   if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
2920     grpc_mdelem retry_md = grpc_mdelem_create(
2921         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2922         *retry_count_strings[num_attempts_completed_ - 1], nullptr);
2923     grpc_error* error = grpc_metadata_batch_add_tail(
2924         &retry_state->send_initial_metadata,
2925         &retry_state
2926              ->send_initial_metadata_storage[send_initial_metadata_.list.count],
2927         retry_md);
2928     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2929       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2930               grpc_error_string(error));
2931       GPR_ASSERT(false);
2932     }
2933   }
2934   retry_state->started_send_initial_metadata = true;
2935   batch_data->batch.send_initial_metadata = true;
2936   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2937       &retry_state->send_initial_metadata;
2938   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2939       send_initial_metadata_flags_;
2940   batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
2941 }
2942 
AddRetriableSendMessageOp(grpc_call_element * elem,SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)2943 void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
2944                                          SubchannelCallRetryState* retry_state,
2945                                          SubchannelCallBatchData* batch_data) {
2946   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2947   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2948     gpr_log(GPR_INFO,
2949             "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2950             chand, this, retry_state->started_send_message_count);
2951   }
2952   ByteStreamCache* cache =
2953       send_messages_[retry_state->started_send_message_count];
2954   ++retry_state->started_send_message_count;
2955   retry_state->send_message.Init(cache);
2956   batch_data->batch.send_message = true;
2957   batch_data->batch.payload->send_message.send_message.reset(
2958       retry_state->send_message.get());
2959 }
2960 
AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)2961 void CallData::AddRetriableSendTrailingMetadataOp(
2962     SubchannelCallRetryState* retry_state,
2963     SubchannelCallBatchData* batch_data) {
2964   // We need to make a copy of the metadata batch for each attempt, since
2965   // the filters in the subchannel stack may modify this batch, and we don't
2966   // want those modifications to be passed forward to subsequent attempts.
2967   retry_state->send_trailing_metadata_storage =
2968       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
2969           sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
2970   grpc_metadata_batch_copy(&send_trailing_metadata_,
2971                            &retry_state->send_trailing_metadata,
2972                            retry_state->send_trailing_metadata_storage);
2973   retry_state->started_send_trailing_metadata = true;
2974   batch_data->batch.send_trailing_metadata = true;
2975   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2976       &retry_state->send_trailing_metadata;
2977 }
2978 
AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)2979 void CallData::AddRetriableRecvInitialMetadataOp(
2980     SubchannelCallRetryState* retry_state,
2981     SubchannelCallBatchData* batch_data) {
2982   retry_state->started_recv_initial_metadata = true;
2983   batch_data->batch.recv_initial_metadata = true;
2984   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
2985   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2986       &retry_state->recv_initial_metadata;
2987   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2988       &retry_state->trailing_metadata_available;
2989   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2990                     RecvInitialMetadataReady, batch_data,
2991                     grpc_schedule_on_exec_ctx);
2992   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2993       &retry_state->recv_initial_metadata_ready;
2994 }
2995 
AddRetriableRecvMessageOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)2996 void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
2997                                          SubchannelCallBatchData* batch_data) {
2998   ++retry_state->started_recv_message_count;
2999   batch_data->batch.recv_message = true;
3000   batch_data->batch.payload->recv_message.recv_message =
3001       &retry_state->recv_message;
3002   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
3003                     batch_data, grpc_schedule_on_exec_ctx);
3004   batch_data->batch.payload->recv_message.recv_message_ready =
3005       &retry_state->recv_message_ready;
3006 }
3007 
AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)3008 void CallData::AddRetriableRecvTrailingMetadataOp(
3009     SubchannelCallRetryState* retry_state,
3010     SubchannelCallBatchData* batch_data) {
3011   retry_state->started_recv_trailing_metadata = true;
3012   batch_data->batch.recv_trailing_metadata = true;
3013   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
3014   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
3015       &retry_state->recv_trailing_metadata;
3016   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
3017       &retry_state->collect_stats;
3018   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
3019                     RecvTrailingMetadataReady, batch_data,
3020                     grpc_schedule_on_exec_ctx);
3021   batch_data->batch.payload->recv_trailing_metadata
3022       .recv_trailing_metadata_ready =
3023       &retry_state->recv_trailing_metadata_ready;
3024   MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
3025       &batch_data->batch);
3026 }
3027 
StartInternalRecvTrailingMetadata(grpc_call_element * elem)3028 void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
3029   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3030   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3031     gpr_log(GPR_INFO,
3032             "chand=%p calld=%p: call failed but recv_trailing_metadata not "
3033             "started; starting it internally",
3034             chand, this);
3035   }
3036   SubchannelCallRetryState* retry_state =
3037       static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
3038   // Create batch_data with 2 refs, since this batch will be unreffed twice:
3039   // once for the recv_trailing_metadata_ready callback when the subchannel
3040   // batch returns, and again when we actually get a recv_trailing_metadata
3041   // op from the surface.
3042   SubchannelCallBatchData* batch_data =
3043       SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
3044   AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3045   retry_state->recv_trailing_metadata_internal_batch = batch_data;
3046   // Note: This will release the call combiner.
3047   subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
3048 }
3049 
3050 // If there are any cached send ops that need to be replayed on the
3051 // current subchannel call, creates and returns a new subchannel batch
3052 // to replay those ops.  Otherwise, returns nullptr.
3053 CallData::SubchannelCallBatchData*
MaybeCreateSubchannelBatchForReplay(grpc_call_element * elem,SubchannelCallRetryState * retry_state)3054 CallData::MaybeCreateSubchannelBatchForReplay(
3055     grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
3056   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3057   SubchannelCallBatchData* replay_batch_data = nullptr;
3058   // send_initial_metadata.
3059   if (seen_send_initial_metadata_ &&
3060       !retry_state->started_send_initial_metadata &&
3061       !pending_send_initial_metadata_) {
3062     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3063       gpr_log(GPR_INFO,
3064               "chand=%p calld=%p: replaying previously completed "
3065               "send_initial_metadata op",
3066               chand, this);
3067     }
3068     replay_batch_data =
3069         SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3070     AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
3071   }
3072   // send_message.
3073   // Note that we can only have one send_message op in flight at a time.
3074   if (retry_state->started_send_message_count < send_messages_.size() &&
3075       retry_state->started_send_message_count ==
3076           retry_state->completed_send_message_count &&
3077       !pending_send_message_) {
3078     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3079       gpr_log(GPR_INFO,
3080               "chand=%p calld=%p: replaying previously completed "
3081               "send_message op",
3082               chand, this);
3083     }
3084     if (replay_batch_data == nullptr) {
3085       replay_batch_data =
3086           SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3087     }
3088     AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
3089   }
3090   // send_trailing_metadata.
3091   // Note that we only add this op if we have no more send_message ops
3092   // to start, since we can't send down any more send_message ops after
3093   // send_trailing_metadata.
3094   if (seen_send_trailing_metadata_ &&
3095       retry_state->started_send_message_count == send_messages_.size() &&
3096       !retry_state->started_send_trailing_metadata &&
3097       !pending_send_trailing_metadata_) {
3098     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3099       gpr_log(GPR_INFO,
3100               "chand=%p calld=%p: replaying previously completed "
3101               "send_trailing_metadata op",
3102               chand, this);
3103     }
3104     if (replay_batch_data == nullptr) {
3105       replay_batch_data =
3106           SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
3107     }
3108     AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
3109   }
3110   return replay_batch_data;
3111 }
3112 
AddSubchannelBatchesForPendingBatches(grpc_call_element * elem,SubchannelCallRetryState * retry_state,CallCombinerClosureList * closures)3113 void CallData::AddSubchannelBatchesForPendingBatches(
3114     grpc_call_element* elem, SubchannelCallRetryState* retry_state,
3115     CallCombinerClosureList* closures) {
3116   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3117     PendingBatch* pending = &pending_batches_[i];
3118     grpc_transport_stream_op_batch* batch = pending->batch;
3119     if (batch == nullptr) continue;
3120     // Skip any batch that either (a) has already been started on this
3121     // subchannel call or (b) we can't start yet because we're still
3122     // replaying send ops that need to be completed first.
3123     // TODO(roth): Note that if any one op in the batch can't be sent
3124     // yet due to ops that we're replaying, we don't start any of the ops
3125     // in the batch.  This is probably okay, but it could conceivably
3126     // lead to increased latency in some cases -- e.g., we could delay
3127     // starting a recv op due to it being in the same batch with a send
3128     // op.  If/when we revamp the callback protocol in
3129     // transport_stream_op_batch, we may be able to fix this.
3130     if (batch->send_initial_metadata &&
3131         retry_state->started_send_initial_metadata) {
3132       continue;
3133     }
3134     if (batch->send_message && retry_state->completed_send_message_count <
3135                                    retry_state->started_send_message_count) {
3136       continue;
3137     }
3138     // Note that we only start send_trailing_metadata if we have no more
3139     // send_message ops to start, since we can't send down any more
3140     // send_message ops after send_trailing_metadata.
3141     if (batch->send_trailing_metadata &&
3142         (retry_state->started_send_message_count + batch->send_message <
3143              send_messages_.size() ||
3144          retry_state->started_send_trailing_metadata)) {
3145       continue;
3146     }
3147     if (batch->recv_initial_metadata &&
3148         retry_state->started_recv_initial_metadata) {
3149       continue;
3150     }
3151     if (batch->recv_message && retry_state->completed_recv_message_count <
3152                                    retry_state->started_recv_message_count) {
3153       continue;
3154     }
3155     if (batch->recv_trailing_metadata &&
3156         retry_state->started_recv_trailing_metadata) {
3157       // If we previously completed a recv_trailing_metadata op
3158       // initiated by StartInternalRecvTrailingMetadata(), use the
3159       // result of that instead of trying to re-start this op.
3160       if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
3161                         nullptr))) {
3162         // If the batch completed, then trigger the completion callback
3163         // directly, so that we return the previously returned results to
3164         // the application.  Otherwise, just unref the internally
3165         // started subchannel batch, since we'll propagate the
3166         // completion when it completes.
3167         if (retry_state->completed_recv_trailing_metadata) {
3168           // Batches containing recv_trailing_metadata always succeed.
3169           closures->Add(
3170               &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
3171               "re-executing recv_trailing_metadata_ready to propagate "
3172               "internally triggered result");
3173         } else {
3174           retry_state->recv_trailing_metadata_internal_batch->Unref();
3175         }
3176         retry_state->recv_trailing_metadata_internal_batch = nullptr;
3177       }
3178       continue;
3179     }
3180     // If we're not retrying, just send the batch as-is.
3181     if (method_params_ == nullptr ||
3182         method_params_->retry_policy() == nullptr || retry_committed_) {
3183       // TODO(roth) : We should probably call
3184       // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
3185       AddClosureForSubchannelBatch(elem, batch, closures);
3186       PendingBatchClear(pending);
3187       continue;
3188     }
3189     // Create batch with the right number of callbacks.
3190     const bool has_send_ops = batch->send_initial_metadata ||
3191                               batch->send_message ||
3192                               batch->send_trailing_metadata;
3193     const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
3194                               batch->recv_message +
3195                               batch->recv_trailing_metadata;
3196     SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
3197         elem, num_callbacks, has_send_ops /* set_on_complete */);
3198     // Cache send ops if needed.
3199     MaybeCacheSendOpsForBatch(pending);
3200     // send_initial_metadata.
3201     if (batch->send_initial_metadata) {
3202       AddRetriableSendInitialMetadataOp(retry_state, batch_data);
3203     }
3204     // send_message.
3205     if (batch->send_message) {
3206       AddRetriableSendMessageOp(elem, retry_state, batch_data);
3207     }
3208     // send_trailing_metadata.
3209     if (batch->send_trailing_metadata) {
3210       AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
3211     }
3212     // recv_initial_metadata.
3213     if (batch->recv_initial_metadata) {
3214       // recv_flags is only used on the server side.
3215       GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
3216       AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
3217     }
3218     // recv_message.
3219     if (batch->recv_message) {
3220       AddRetriableRecvMessageOp(retry_state, batch_data);
3221     }
3222     // recv_trailing_metadata.
3223     if (batch->recv_trailing_metadata) {
3224       AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
3225     }
3226     AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
3227     // Track number of pending subchannel send batches.
3228     // If this is the first one, take a ref to the call stack.
3229     if (batch->send_initial_metadata || batch->send_message ||
3230         batch->send_trailing_metadata) {
3231       if (num_pending_retriable_subchannel_send_batches_ == 0) {
3232         GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
3233       }
3234       ++num_pending_retriable_subchannel_send_batches_;
3235     }
3236   }
3237 }
3238 
StartRetriableSubchannelBatches(void * arg,grpc_error * ignored)3239 void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
3240   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3241   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3242   CallData* calld = static_cast<CallData*>(elem->call_data);
3243   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3244     gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
3245             chand, calld);
3246   }
3247   SubchannelCallRetryState* retry_state =
3248       static_cast<SubchannelCallRetryState*>(
3249           calld->subchannel_call_->GetParentData());
3250   // Construct list of closures to execute, one for each pending batch.
3251   CallCombinerClosureList closures;
3252   // Replay previously-returned send_* ops if needed.
3253   SubchannelCallBatchData* replay_batch_data =
3254       calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
3255   if (replay_batch_data != nullptr) {
3256     calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
3257                                         &closures);
3258     // Track number of pending subchannel send batches.
3259     // If this is the first one, take a ref to the call stack.
3260     if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
3261       GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
3262     }
3263     ++calld->num_pending_retriable_subchannel_send_batches_;
3264   }
3265   // Now add pending batches.
3266   calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
3267   // Start batches on subchannel call.
3268   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3269     gpr_log(GPR_INFO,
3270             "chand=%p calld=%p: starting %" PRIuPTR
3271             " retriable batches on subchannel_call=%p",
3272             chand, calld, closures.size(), calld->subchannel_call_.get());
3273   }
3274   // Note: This will yield the call combiner.
3275   closures.RunClosures(calld->call_combiner_);
3276 }
3277 
3278 //
3279 // LB pick
3280 //
3281 
CreateSubchannelCall(grpc_call_element * elem)3282 void CallData::CreateSubchannelCall(grpc_call_element* elem) {
3283   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3284   const size_t parent_data_size =
3285       enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
3286   SubchannelCall::Args call_args = {
3287       std::move(connected_subchannel_), pollent_, path_, call_start_time_,
3288       deadline_, arena_,
3289       // TODO(roth): When we implement hedging support, we will probably
3290       // need to use a separate call context for each subchannel call.
3291       call_context_, call_combiner_, parent_data_size};
3292   grpc_error* error = GRPC_ERROR_NONE;
3293   subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
3294   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3295     gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
3296             chand, this, subchannel_call_.get(), grpc_error_string(error));
3297   }
3298   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
3299     PendingBatchesFail(elem, error, YieldCallCombiner);
3300   } else {
3301     if (parent_data_size > 0) {
3302       new (subchannel_call_->GetParentData())
3303           SubchannelCallRetryState(call_context_);
3304     }
3305     PendingBatchesResume(elem);
3306   }
3307 }
3308 
PickDone(void * arg,grpc_error * error)3309 void CallData::PickDone(void* arg, grpc_error* error) {
3310   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3311   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3312   CallData* calld = static_cast<CallData*>(elem->call_data);
3313   if (error != GRPC_ERROR_NONE) {
3314     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3315       gpr_log(GPR_INFO,
3316               "chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
3317               calld, grpc_error_string(error));
3318     }
3319     calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
3320     return;
3321   }
3322   calld->CreateSubchannelCall(elem);
3323 }
3324 
3325 // A class to handle the call combiner cancellation callback for a
3326 // queued pick.
3327 class CallData::QueuedPickCanceller {
3328  public:
QueuedPickCanceller(grpc_call_element * elem)3329   explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
3330     auto* calld = static_cast<CallData*>(elem->call_data);
3331     auto* chand = static_cast<ChannelData*>(elem->channel_data);
3332     GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
3333     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
3334                       grpc_combiner_scheduler(chand->data_plane_combiner()));
3335     calld->call_combiner_->SetNotifyOnCancel(&closure_);
3336   }
3337 
3338  private:
CancelLocked(void * arg,grpc_error * error)3339   static void CancelLocked(void* arg, grpc_error* error) {
3340     auto* self = static_cast<QueuedPickCanceller*>(arg);
3341     auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
3342     auto* calld = static_cast<CallData*>(self->elem_->call_data);
3343     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3344       gpr_log(GPR_INFO,
3345               "chand=%p calld=%p: cancelling queued pick: "
3346               "error=%s self=%p calld->pick_canceller=%p",
3347               chand, calld, grpc_error_string(error), self,
3348               calld->pick_canceller_);
3349     }
3350     if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
3351       // Remove pick from list of queued picks.
3352       calld->RemoveCallFromQueuedPicksLocked(self->elem_);
3353       // Fail pending batches on the call.
3354       calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
3355                                 YieldCallCombinerIfPendingBatchesFound);
3356     }
3357     GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
3358     Delete(self);
3359   }
3360 
3361   grpc_call_element* elem_;
3362   grpc_closure closure_;
3363 };
3364 
RemoveCallFromQueuedPicksLocked(grpc_call_element * elem)3365 void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
3366   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3367   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3368     gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
3369             chand, this);
3370   }
3371   chand->RemoveQueuedPick(&pick_, pollent_);
3372   pick_queued_ = false;
3373   // Lame the call combiner canceller.
3374   pick_canceller_ = nullptr;
3375 }
3376 
AddCallToQueuedPicksLocked(grpc_call_element * elem)3377 void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
3378   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3379   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3380     gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
3381             this);
3382   }
3383   pick_queued_ = true;
3384   pick_.elem = elem;
3385   chand->AddQueuedPick(&pick_, pollent_);
3386   // Register call combiner cancellation callback.
3387   pick_canceller_ = New<QueuedPickCanceller>(elem);
3388 }
3389 
ApplyServiceConfigToCallLocked(grpc_call_element * elem)3390 void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3391   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3392   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3393     gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
3394             chand, this);
3395   }
3396   // Store a ref to the service_config in service_config_call_data_. Also, save
3397   // a pointer to this in the call_context so that all future filters can access
3398   // it.
3399   service_config_call_data_ =
3400       ServiceConfig::CallData(chand->service_config(), path_);
3401   if (service_config_call_data_.service_config() != nullptr) {
3402     call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value =
3403         &service_config_call_data_;
3404     method_params_ = static_cast<ClientChannelMethodParsedConfig*>(
3405         service_config_call_data_.GetMethodParsedConfig(
3406             internal::ClientChannelServiceConfigParser::ParserIndex()));
3407   }
3408   retry_throttle_data_ = chand->retry_throttle_data();
3409   if (method_params_ != nullptr) {
3410     // If the deadline from the service config is shorter than the one
3411     // from the client API, reset the deadline timer.
3412     if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
3413       const grpc_millis per_method_deadline =
3414           grpc_timespec_to_millis_round_up(call_start_time_) +
3415           method_params_->timeout();
3416       if (per_method_deadline < deadline_) {
3417         deadline_ = per_method_deadline;
3418         grpc_deadline_state_reset(elem, deadline_);
3419       }
3420     }
3421     // If the service config set wait_for_ready and the application
3422     // did not explicitly set it, use the value from the service config.
3423     uint32_t* send_initial_metadata_flags =
3424         &pending_batches_[0]
3425              .batch->payload->send_initial_metadata.send_initial_metadata_flags;
3426     if (method_params_->wait_for_ready().has_value() &&
3427         !(*send_initial_metadata_flags &
3428           GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
3429       if (method_params_->wait_for_ready().value()) {
3430         *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3431       } else {
3432         *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3433       }
3434     }
3435   }
3436   // If no retry policy, disable retries.
3437   // TODO(roth): Remove this when adding support for transparent retries.
3438   if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
3439     enable_retries_ = false;
3440   }
3441 }
3442 
MaybeApplyServiceConfigToCallLocked(grpc_call_element * elem)3443 void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
3444   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3445   // Apply service config data to the call only once, and only if the
3446   // channel has the data available.
3447   if (GPR_LIKELY(chand->received_service_config_data() &&
3448                  !service_config_applied_)) {
3449     service_config_applied_ = true;
3450     ApplyServiceConfigToCallLocked(elem);
3451   }
3452 }
3453 
PickResultTypeName(LoadBalancingPolicy::PickResult::ResultType type)3454 const char* PickResultTypeName(
3455     LoadBalancingPolicy::PickResult::ResultType type) {
3456   switch (type) {
3457     case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
3458       return "COMPLETE";
3459     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
3460       return "QUEUE";
3461     case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE:
3462       return "TRANSIENT_FAILURE";
3463   }
3464   GPR_UNREACHABLE_CODE(return "UNKNOWN");
3465 }
3466 
StartPickLocked(void * arg,grpc_error * error)3467 void CallData::StartPickLocked(void* arg, grpc_error* error) {
3468   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3469   CallData* calld = static_cast<CallData*>(elem->call_data);
3470   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3471   GPR_ASSERT(calld->connected_subchannel_ == nullptr);
3472   GPR_ASSERT(calld->subchannel_call_ == nullptr);
3473   // picker's being null means the channel is currently in IDLE state. The
3474   // incoming call will make the channel exit IDLE and queue itself.
3475   if (chand->picker() == nullptr) {
3476     // We are currently in the data plane.
3477     // Bounce into the control plane to exit IDLE.
3478     chand->CheckConnectivityState(true);
3479     calld->AddCallToQueuedPicksLocked(elem);
3480     return;
3481   }
3482   // Apply service config to call if needed.
3483   calld->MaybeApplyServiceConfigToCallLocked(elem);
3484   // If this is a retry, use the send_initial_metadata payload that
3485   // we've cached; otherwise, use the pending batch.  The
3486   // send_initial_metadata batch will be the first pending batch in the
3487   // list, as set by GetBatchIndex() above.
3488   // TODO(roth): What if the LB policy needs to add something to the
3489   // call's initial metadata, and then there's a retry?  We don't want
3490   // the new metadata to be added twice.  We might need to somehow
3491   // allocate the subchannel batch earlier so that we can give the
3492   // subchannel's copy of the metadata batch (which is copied for each
3493   // attempt) to the LB policy instead the one from the parent channel.
3494   LoadBalancingPolicy::PickArgs pick_args;
3495   pick_args.call_state = &calld->lb_call_state_;
3496   pick_args.initial_metadata =
3497       calld->seen_send_initial_metadata_
3498           ? &calld->send_initial_metadata_
3499           : calld->pending_batches_[0]
3500                 .batch->payload->send_initial_metadata.send_initial_metadata;
3501   // Grab initial metadata flags so that we can check later if the call has
3502   // wait_for_ready enabled.
3503   const uint32_t send_initial_metadata_flags =
3504       calld->seen_send_initial_metadata_
3505           ? calld->send_initial_metadata_flags_
3506           : calld->pending_batches_[0]
3507                 .batch->payload->send_initial_metadata
3508                 .send_initial_metadata_flags;
3509   // When done, we schedule this closure to leave the data plane combiner.
3510   GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
3511                     grpc_schedule_on_exec_ctx);
3512   // Attempt pick.
3513   auto result = chand->picker()->Pick(pick_args);
3514   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3515     gpr_log(GPR_INFO,
3516             "chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
3517             "error=%s)",
3518             chand, calld, PickResultTypeName(result.type),
3519             result.connected_subchannel.get(), grpc_error_string(result.error));
3520   }
3521   switch (result.type) {
3522     case LoadBalancingPolicy::PickResult::PICK_TRANSIENT_FAILURE: {
3523       // If we're shutting down, fail all RPCs.
3524       grpc_error* disconnect_error = chand->disconnect_error();
3525       if (disconnect_error != GRPC_ERROR_NONE) {
3526         GRPC_ERROR_UNREF(result.error);
3527         GRPC_CLOSURE_SCHED(&calld->pick_closure_,
3528                            GRPC_ERROR_REF(disconnect_error));
3529         break;
3530       }
3531       // If wait_for_ready is false, then the error indicates the RPC
3532       // attempt's final status.
3533       if ((send_initial_metadata_flags &
3534            GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
3535         // Retry if appropriate; otherwise, fail.
3536         grpc_status_code status = GRPC_STATUS_OK;
3537         grpc_error_get_status(result.error, calld->deadline_, &status, nullptr,
3538                               nullptr, nullptr);
3539         if (!calld->enable_retries_ ||
3540             !calld->MaybeRetry(elem, nullptr /* batch_data */, status,
3541                                nullptr /* server_pushback_md */)) {
3542           grpc_error* new_error =
3543               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3544                   "Failed to pick subchannel", &result.error, 1);
3545           GRPC_ERROR_UNREF(result.error);
3546           GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
3547         }
3548         if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3549         break;
3550       }
3551       // If wait_for_ready is true, then queue to retry when we get a new
3552       // picker.
3553       GRPC_ERROR_UNREF(result.error);
3554     }
3555     // Fallthrough
3556     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
3557       if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
3558       break;
3559     default:  // PICK_COMPLETE
3560       // Handle drops.
3561       if (GPR_UNLIKELY(result.connected_subchannel == nullptr)) {
3562         result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
3563             "Call dropped by load balancing policy");
3564       }
3565       calld->connected_subchannel_ = std::move(result.connected_subchannel);
3566       calld->lb_recv_trailing_metadata_ready_ =
3567           result.recv_trailing_metadata_ready;
3568       calld->lb_recv_trailing_metadata_ready_user_data_ =
3569           result.recv_trailing_metadata_ready_user_data;
3570       GRPC_CLOSURE_SCHED(&calld->pick_closure_, result.error);
3571       if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
3572   }
3573 }
3574 
3575 }  // namespace
3576 }  // namespace grpc_core
3577 
3578 /*************************************************************************
3579  * EXPORTED SYMBOLS
3580  */
3581 
3582 using grpc_core::CallData;
3583 using grpc_core::ChannelData;
3584 
3585 const grpc_channel_filter grpc_client_channel_filter = {
3586     CallData::StartTransportStreamOpBatch,
3587     ChannelData::StartTransportOp,
3588     sizeof(CallData),
3589     CallData::Init,
3590     CallData::SetPollent,
3591     CallData::Destroy,
3592     sizeof(ChannelData),
3593     ChannelData::Init,
3594     ChannelData::Destroy,
3595     ChannelData::GetChannelInfo,
3596     "client-channel",
3597 };
3598 
grpc_client_channel_check_connectivity_state(grpc_channel_element * elem,int try_to_connect)3599 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
3600     grpc_channel_element* elem, int try_to_connect) {
3601   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3602   return chand->CheckConnectivityState(try_to_connect);
3603 }
3604 
grpc_client_channel_num_external_connectivity_watchers(grpc_channel_element * elem)3605 int grpc_client_channel_num_external_connectivity_watchers(
3606     grpc_channel_element* elem) {
3607   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3608   return chand->NumExternalConnectivityWatchers();
3609 }
3610 
grpc_client_channel_watch_connectivity_state(grpc_channel_element * elem,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * closure,grpc_closure * watcher_timer_init)3611 void grpc_client_channel_watch_connectivity_state(
3612     grpc_channel_element* elem, grpc_polling_entity pollent,
3613     grpc_connectivity_state* state, grpc_closure* closure,
3614     grpc_closure* watcher_timer_init) {
3615   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3616   return chand->AddExternalConnectivityWatcher(pollent, state, closure,
3617                                                watcher_timer_init);
3618 }
3619 
3620 grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element * elem)3621 grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
3622   auto* calld = static_cast<CallData*>(elem->call_data);
3623   return calld->subchannel_call();
3624 }
3625