1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33 
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
36 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
37 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
38 #include "src/core/ext/filters/client_channel/resolver_registry.h"
39 #include "src/core/ext/filters/client_channel/retry_throttle.h"
40 #include "src/core/ext/filters/client_channel/server_address.h"
41 #include "src/core/ext/filters/client_channel/service_config.h"
42 #include "src/core/ext/filters/client_channel/subchannel.h"
43 #include "src/core/ext/filters/deadline/deadline_filter.h"
44 #include "src/core/lib/backoff/backoff.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/connected_channel.h"
47 #include "src/core/lib/channel/status_util.h"
48 #include "src/core/lib/gpr/string.h"
49 #include "src/core/lib/gprpp/inlined_vector.h"
50 #include "src/core/lib/gprpp/manual_constructor.h"
51 #include "src/core/lib/gprpp/sync.h"
52 #include "src/core/lib/iomgr/combiner.h"
53 #include "src/core/lib/iomgr/iomgr.h"
54 #include "src/core/lib/iomgr/polling_entity.h"
55 #include "src/core/lib/profiling/timers.h"
56 #include "src/core/lib/slice/slice_internal.h"
57 #include "src/core/lib/slice/slice_string_helpers.h"
58 #include "src/core/lib/surface/channel.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/metadata_batch.h"
63 #include "src/core/lib/transport/static_metadata.h"
64 #include "src/core/lib/transport/status_metadata.h"
65 
66 namespace grpc_core {
67 
68 //
69 // ResolvingLoadBalancingPolicy::ResolverResultHandler
70 //
71 
72 class ResolvingLoadBalancingPolicy::ResolverResultHandler
73     : public Resolver::ResultHandler {
74  public:
ResolverResultHandler(RefCountedPtr<ResolvingLoadBalancingPolicy> parent)75   explicit ResolverResultHandler(
76       RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
77       : parent_(std::move(parent)) {}
78 
~ResolverResultHandler()79   ~ResolverResultHandler() {
80     if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
81       gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
82               parent_.get());
83     }
84   }
85 
ReturnResult(Resolver::Result result)86   void ReturnResult(Resolver::Result result) override {
87     parent_->OnResolverResultChangedLocked(std::move(result));
88   }
89 
ReturnError(grpc_error * error)90   void ReturnError(grpc_error* error) override {
91     parent_->OnResolverError(error);
92   }
93 
94  private:
95   RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
96 };
97 
98 //
99 // ResolvingLoadBalancingPolicy::ResolvingControlHelper
100 //
101 
102 class ResolvingLoadBalancingPolicy::ResolvingControlHelper
103     : public LoadBalancingPolicy::ChannelControlHelper {
104  public:
ResolvingControlHelper(RefCountedPtr<ResolvingLoadBalancingPolicy> parent)105   explicit ResolvingControlHelper(
106       RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
107       : parent_(std::move(parent)) {}
108 
CreateSubchannel(const grpc_channel_args & args)109   RefCountedPtr<SubchannelInterface> CreateSubchannel(
110       const grpc_channel_args& args) override {
111     if (parent_->resolver_ == nullptr) return nullptr;  // Shutting down.
112     if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
113     return parent_->channel_control_helper()->CreateSubchannel(args);
114   }
115 
CreateChannel(const char * target,const grpc_channel_args & args)116   grpc_channel* CreateChannel(const char* target,
117                               const grpc_channel_args& args) override {
118     if (parent_->resolver_ == nullptr) return nullptr;  // Shutting down.
119     if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
120     return parent_->channel_control_helper()->CreateChannel(target, args);
121   }
122 
UpdateState(grpc_connectivity_state state,UniquePtr<SubchannelPicker> picker)123   void UpdateState(grpc_connectivity_state state,
124                    UniquePtr<SubchannelPicker> picker) override {
125     if (parent_->resolver_ == nullptr) return;  // Shutting down.
126     // If this request is from the pending child policy, ignore it until
127     // it reports READY, at which point we swap it into place.
128     if (CalledByPendingChild()) {
129       if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
130         gpr_log(GPR_INFO,
131                 "resolving_lb=%p helper=%p: pending child policy %p reports "
132                 "state=%s",
133                 parent_.get(), this, child_,
134                 grpc_connectivity_state_name(state));
135       }
136       if (state != GRPC_CHANNEL_READY) return;
137       grpc_pollset_set_del_pollset_set(
138           parent_->lb_policy_->interested_parties(),
139           parent_->interested_parties());
140       parent_->lb_policy_ = std::move(parent_->pending_lb_policy_);
141     } else if (!CalledByCurrentChild()) {
142       // This request is from an outdated child, so ignore it.
143       return;
144     }
145     parent_->channel_control_helper()->UpdateState(state, std::move(picker));
146   }
147 
RequestReresolution()148   void RequestReresolution() override {
149     // If there is a pending child policy, ignore re-resolution requests
150     // from the current child policy (or any outdated child).
151     if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) {
152       return;
153     }
154     if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
155       gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
156               parent_.get());
157     }
158     if (parent_->resolver_ != nullptr) {
159       parent_->resolver_->RequestReresolutionLocked();
160     }
161   }
162 
set_child(LoadBalancingPolicy * child)163   void set_child(LoadBalancingPolicy* child) { child_ = child; }
164 
165  private:
CalledByPendingChild() const166   bool CalledByPendingChild() const {
167     GPR_ASSERT(child_ != nullptr);
168     return child_ == parent_->pending_lb_policy_.get();
169   }
170 
CalledByCurrentChild() const171   bool CalledByCurrentChild() const {
172     GPR_ASSERT(child_ != nullptr);
173     return child_ == parent_->lb_policy_.get();
174   };
175 
176   RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
177   LoadBalancingPolicy* child_ = nullptr;
178 };
179 
180 //
181 // ResolvingLoadBalancingPolicy
182 //
183 
ResolvingLoadBalancingPolicy(Args args,TraceFlag * tracer,UniquePtr<char> target_uri,ProcessResolverResultCallback process_resolver_result,void * process_resolver_result_user_data)184 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
185     Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
186     ProcessResolverResultCallback process_resolver_result,
187     void* process_resolver_result_user_data)
188     : LoadBalancingPolicy(std::move(args)),
189       tracer_(tracer),
190       target_uri_(std::move(target_uri)),
191       process_resolver_result_(process_resolver_result),
192       process_resolver_result_user_data_(process_resolver_result_user_data) {
193   GPR_ASSERT(process_resolver_result != nullptr);
194   resolver_ = ResolverRegistry::CreateResolver(
195       target_uri_.get(), args.args, interested_parties(), combiner(),
196       UniquePtr<Resolver::ResultHandler>(New<ResolverResultHandler>(Ref())));
197   // Since the validity of args has been checked when create the channel,
198   // CreateResolver() must return a non-null result.
199   GPR_ASSERT(resolver_ != nullptr);
200   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
201     gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
202   }
203   channel_control_helper()->UpdateState(
204       GRPC_CHANNEL_CONNECTING,
205       UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
206   resolver_->StartLocked();
207 }
208 
~ResolvingLoadBalancingPolicy()209 ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
210   GPR_ASSERT(resolver_ == nullptr);
211   GPR_ASSERT(lb_policy_ == nullptr);
212 }
213 
ShutdownLocked()214 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
215   if (resolver_ != nullptr) {
216     resolver_.reset();
217     if (lb_policy_ != nullptr) {
218       if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
219         gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
220                 lb_policy_.get());
221       }
222       grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
223                                        interested_parties());
224       lb_policy_.reset();
225     }
226     if (pending_lb_policy_ != nullptr) {
227       if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
228         gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p",
229                 this, pending_lb_policy_.get());
230       }
231       grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(),
232                                        interested_parties());
233       pending_lb_policy_.reset();
234     }
235   }
236 }
237 
ExitIdleLocked()238 void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
239   if (lb_policy_ != nullptr) {
240     lb_policy_->ExitIdleLocked();
241     if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked();
242   }
243 }
244 
ResetBackoffLocked()245 void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
246   if (resolver_ != nullptr) {
247     resolver_->ResetBackoffLocked();
248     resolver_->RequestReresolutionLocked();
249   }
250   if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
251   if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked();
252 }
253 
OnResolverError(grpc_error * error)254 void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
255   if (resolver_ == nullptr) {
256     GRPC_ERROR_UNREF(error);
257     return;
258   }
259   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
260     gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
261             grpc_error_string(error));
262   }
263   // If we already have an LB policy from a previous resolution
264   // result, then we continue to let it set the connectivity state.
265   // Otherwise, we go into TRANSIENT_FAILURE.
266   if (lb_policy_ == nullptr) {
267     grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
268         "Resolver transient failure", &error, 1);
269     channel_control_helper()->UpdateState(
270         GRPC_CHANNEL_TRANSIENT_FAILURE,
271         UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(state_error)));
272   }
273   GRPC_ERROR_UNREF(error);
274 }
275 
CreateOrUpdateLbPolicyLocked(const char * lb_policy_name,RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,Resolver::Result result,TraceStringVector * trace_strings)276 void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
277     const char* lb_policy_name,
278     RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
279     Resolver::Result result, TraceStringVector* trace_strings) {
280   // If the child policy name changes, we need to create a new child
281   // policy.  When this happens, we leave child_policy_ as-is and store
282   // the new child policy in pending_child_policy_.  Once the new child
283   // policy transitions into state READY, we swap it into child_policy_,
284   // replacing the original child policy.  So pending_child_policy_ is
285   // non-null only between when we apply an update that changes the child
286   // policy name and when the new child reports state READY.
287   //
288   // Updates can arrive at any point during this transition.  We always
289   // apply updates relative to the most recently created child policy,
290   // even if the most recent one is still in pending_child_policy_.  This
291   // is true both when applying the updates to an existing child policy
292   // and when determining whether we need to create a new policy.
293   //
294   // As a result of this, there are several cases to consider here:
295   //
296   // 1. We have no existing child policy (i.e., we have started up but
297   //    have not yet received a serverlist from the balancer or gone
298   //    into fallback mode; in this case, both child_policy_ and
299   //    pending_child_policy_ are null).  In this case, we create a
300   //    new child policy and store it in child_policy_.
301   //
302   // 2. We have an existing child policy and have no pending child policy
303   //    from a previous update (i.e., either there has not been a
304   //    previous update that changed the policy name, or we have already
305   //    finished swapping in the new policy; in this case, child_policy_
306   //    is non-null but pending_child_policy_ is null).  In this case:
307   //    a. If child_policy_->name() equals child_policy_name, then we
308   //       update the existing child policy.
309   //    b. If child_policy_->name() does not equal child_policy_name,
310   //       we create a new policy.  The policy will be stored in
311   //       pending_child_policy_ and will later be swapped into
312   //       child_policy_ by the helper when the new child transitions
313   //       into state READY.
314   //
315   // 3. We have an existing child policy and have a pending child policy
316   //    from a previous update (i.e., a previous update set
317   //    pending_child_policy_ as per case 2b above and that policy has
318   //    not yet transitioned into state READY and been swapped into
319   //    child_policy_; in this case, both child_policy_ and
320   //    pending_child_policy_ are non-null).  In this case:
321   //    a. If pending_child_policy_->name() equals child_policy_name,
322   //       then we update the existing pending child policy.
323   //    b. If pending_child_policy->name() does not equal
324   //       child_policy_name, then we create a new policy.  The new
325   //       policy is stored in pending_child_policy_ (replacing the one
326   //       that was there before, which will be immediately shut down)
327   //       and will later be swapped into child_policy_ by the helper
328   //       when the new child transitions into state READY.
329   const bool create_policy =
330       // case 1
331       lb_policy_ == nullptr ||
332       // case 2b
333       (pending_lb_policy_ == nullptr &&
334        strcmp(lb_policy_->name(), lb_policy_name) != 0) ||
335       // case 3b
336       (pending_lb_policy_ != nullptr &&
337        strcmp(pending_lb_policy_->name(), lb_policy_name) != 0);
338   LoadBalancingPolicy* policy_to_update = nullptr;
339   if (create_policy) {
340     // Cases 1, 2b, and 3b: create a new child policy.
341     // If lb_policy_ is null, we set it (case 1), else we set
342     // pending_lb_policy_ (cases 2b and 3b).
343     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
344       gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this,
345               lb_policy_ == nullptr ? "" : "pending ", lb_policy_name);
346     }
347     auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_;
348     lb_policy =
349         CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings);
350     policy_to_update = lb_policy.get();
351   } else {
352     // Cases 2a and 3a: update an existing policy.
353     // If we have a pending child policy, send the update to the pending
354     // policy (case 3a), else send it to the current policy (case 2a).
355     policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get()
356                                                      : lb_policy_.get();
357   }
358   GPR_ASSERT(policy_to_update != nullptr);
359   // Update the policy.
360   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
361     gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this,
362             policy_to_update == pending_lb_policy_.get() ? "pending " : "",
363             policy_to_update);
364   }
365   UpdateArgs update_args;
366   update_args.addresses = std::move(result.addresses);
367   update_args.config = std::move(lb_policy_config);
368   // TODO(roth): Once channel args is converted to C++, use std::move() here.
369   update_args.args = result.args;
370   result.args = nullptr;
371   policy_to_update->UpdateLocked(std::move(update_args));
372 }
373 
374 // Creates a new LB policy.
375 // Updates trace_strings to indicate what was done.
376 OrphanablePtr<LoadBalancingPolicy>
CreateLbPolicyLocked(const char * lb_policy_name,const grpc_channel_args & args,TraceStringVector * trace_strings)377 ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
378     const char* lb_policy_name, const grpc_channel_args& args,
379     TraceStringVector* trace_strings) {
380   ResolvingControlHelper* helper = New<ResolvingControlHelper>(Ref());
381   LoadBalancingPolicy::Args lb_policy_args;
382   lb_policy_args.combiner = combiner();
383   lb_policy_args.channel_control_helper =
384       UniquePtr<ChannelControlHelper>(helper);
385   lb_policy_args.args = &args;
386   OrphanablePtr<LoadBalancingPolicy> lb_policy =
387       LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
388           lb_policy_name, std::move(lb_policy_args));
389   if (GPR_UNLIKELY(lb_policy == nullptr)) {
390     gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
391     char* str;
392     gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name);
393     trace_strings->push_back(str);
394     return nullptr;
395   }
396   helper->set_child(lb_policy.get());
397   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
398     gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)",
399             this, lb_policy_name, lb_policy.get());
400   }
401   char* str;
402   gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
403   trace_strings->push_back(str);
404   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
405                                    interested_parties());
406   return lb_policy;
407 }
408 
MaybeAddTraceMessagesForAddressChangesLocked(bool resolution_contains_addresses,TraceStringVector * trace_strings)409 void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
410     bool resolution_contains_addresses, TraceStringVector* trace_strings) {
411   if (!resolution_contains_addresses &&
412       previous_resolution_contained_addresses_) {
413     trace_strings->push_back(gpr_strdup("Address list became empty"));
414   } else if (resolution_contains_addresses &&
415              !previous_resolution_contained_addresses_) {
416     trace_strings->push_back(gpr_strdup("Address list became non-empty"));
417   }
418   previous_resolution_contained_addresses_ = resolution_contains_addresses;
419 }
420 
ConcatenateAndAddChannelTraceLocked(TraceStringVector * trace_strings) const421 void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
422     TraceStringVector* trace_strings) const {
423   if (!trace_strings->empty()) {
424     gpr_strvec v;
425     gpr_strvec_init(&v);
426     gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
427     bool is_first = 1;
428     for (size_t i = 0; i < trace_strings->size(); ++i) {
429       if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
430       is_first = false;
431       gpr_strvec_add(&v, (*trace_strings)[i]);
432     }
433     size_t len = 0;
434     UniquePtr<char> message(gpr_strvec_flatten(&v, &len));
435     channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
436                                             message.get());
437     gpr_strvec_destroy(&v);
438   }
439 }
440 
OnResolverResultChangedLocked(Resolver::Result result)441 void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
442     Resolver::Result result) {
443   // Handle race conditions.
444   if (resolver_ == nullptr) return;
445   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
446     gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
447   }
448   // We only want to trace the address resolution in the follow cases:
449   // (a) Address resolution resulted in service config change.
450   // (b) Address resolution that causes number of backends to go from
451   //     zero to non-zero.
452   // (c) Address resolution that causes number of backends to go from
453   //     non-zero to zero.
454   // (d) Address resolution that causes a new LB policy to be created.
455   //
456   // We track a list of strings to eventually be concatenated and traced.
457   TraceStringVector trace_strings;
458   const bool resolution_contains_addresses = result.addresses.size() > 0;
459   // Process the resolver result.
460   const char* lb_policy_name = nullptr;
461   RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
462   bool service_config_changed = false;
463   char* service_config_error_string = nullptr;
464   if (process_resolver_result_ != nullptr) {
465     grpc_error* service_config_error = GRPC_ERROR_NONE;
466     service_config_changed = process_resolver_result_(
467         process_resolver_result_user_data_, result, &lb_policy_name,
468         &lb_policy_config, &service_config_error);
469     if (service_config_error != GRPC_ERROR_NONE) {
470       service_config_error_string =
471           gpr_strdup(grpc_error_string(service_config_error));
472       if (lb_policy_name == nullptr) {
473         // Use an empty lb_policy_name as an indicator that we received an
474         // invalid service config and we don't have a fallback service config.
475         OnResolverError(service_config_error);
476       } else {
477         GRPC_ERROR_UNREF(service_config_error);
478       }
479     }
480   } else {
481     lb_policy_name = child_policy_name_.get();
482     lb_policy_config = child_lb_config_;
483   }
484   if (lb_policy_name != nullptr) {
485     // Create or update LB policy, as needed.
486     CreateOrUpdateLbPolicyLocked(lb_policy_name, lb_policy_config,
487                                  std::move(result), &trace_strings);
488   }
489   // Add channel trace event.
490   if (service_config_changed) {
491     // TODO(ncteisen): might be worth somehow including a snippet of the
492     // config in the trace, at the risk of bloating the trace logs.
493     trace_strings.push_back(gpr_strdup("Service config changed"));
494   }
495   if (service_config_error_string != nullptr) {
496     trace_strings.push_back(service_config_error_string);
497     service_config_error_string = nullptr;
498   }
499   MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
500                                                &trace_strings);
501   ConcatenateAndAddChannelTraceLocked(&trace_strings);
502 }
503 
504 }  // namespace grpc_core
505