1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 #include <grpc/support/sync.h>
33
34 #include "src/core/ext/filters/client_channel/backup_poller.h"
35 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
36 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
37 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
38 #include "src/core/ext/filters/client_channel/resolver_registry.h"
39 #include "src/core/ext/filters/client_channel/retry_throttle.h"
40 #include "src/core/ext/filters/client_channel/server_address.h"
41 #include "src/core/ext/filters/client_channel/service_config.h"
42 #include "src/core/ext/filters/client_channel/subchannel.h"
43 #include "src/core/ext/filters/deadline/deadline_filter.h"
44 #include "src/core/lib/backoff/backoff.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/connected_channel.h"
47 #include "src/core/lib/channel/status_util.h"
48 #include "src/core/lib/gpr/string.h"
49 #include "src/core/lib/gprpp/inlined_vector.h"
50 #include "src/core/lib/gprpp/manual_constructor.h"
51 #include "src/core/lib/gprpp/sync.h"
52 #include "src/core/lib/iomgr/combiner.h"
53 #include "src/core/lib/iomgr/iomgr.h"
54 #include "src/core/lib/iomgr/polling_entity.h"
55 #include "src/core/lib/profiling/timers.h"
56 #include "src/core/lib/slice/slice_internal.h"
57 #include "src/core/lib/slice/slice_string_helpers.h"
58 #include "src/core/lib/surface/channel.h"
59 #include "src/core/lib/transport/connectivity_state.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/metadata_batch.h"
63 #include "src/core/lib/transport/static_metadata.h"
64 #include "src/core/lib/transport/status_metadata.h"
65
66 namespace grpc_core {
67
68 //
69 // ResolvingLoadBalancingPolicy::ResolverResultHandler
70 //
71
72 class ResolvingLoadBalancingPolicy::ResolverResultHandler
73 : public Resolver::ResultHandler {
74 public:
ResolverResultHandler(RefCountedPtr<ResolvingLoadBalancingPolicy> parent)75 explicit ResolverResultHandler(
76 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
77 : parent_(std::move(parent)) {}
78
~ResolverResultHandler()79 ~ResolverResultHandler() {
80 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
81 gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
82 parent_.get());
83 }
84 }
85
ReturnResult(Resolver::Result result)86 void ReturnResult(Resolver::Result result) override {
87 parent_->OnResolverResultChangedLocked(std::move(result));
88 }
89
ReturnError(grpc_error * error)90 void ReturnError(grpc_error* error) override {
91 parent_->OnResolverError(error);
92 }
93
94 private:
95 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
96 };
97
98 //
99 // ResolvingLoadBalancingPolicy::ResolvingControlHelper
100 //
101
102 class ResolvingLoadBalancingPolicy::ResolvingControlHelper
103 : public LoadBalancingPolicy::ChannelControlHelper {
104 public:
ResolvingControlHelper(RefCountedPtr<ResolvingLoadBalancingPolicy> parent)105 explicit ResolvingControlHelper(
106 RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
107 : parent_(std::move(parent)) {}
108
CreateSubchannel(const grpc_channel_args & args)109 RefCountedPtr<SubchannelInterface> CreateSubchannel(
110 const grpc_channel_args& args) override {
111 if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
112 if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
113 return parent_->channel_control_helper()->CreateSubchannel(args);
114 }
115
CreateChannel(const char * target,const grpc_channel_args & args)116 grpc_channel* CreateChannel(const char* target,
117 const grpc_channel_args& args) override {
118 if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
119 if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
120 return parent_->channel_control_helper()->CreateChannel(target, args);
121 }
122
UpdateState(grpc_connectivity_state state,UniquePtr<SubchannelPicker> picker)123 void UpdateState(grpc_connectivity_state state,
124 UniquePtr<SubchannelPicker> picker) override {
125 if (parent_->resolver_ == nullptr) return; // Shutting down.
126 // If this request is from the pending child policy, ignore it until
127 // it reports READY, at which point we swap it into place.
128 if (CalledByPendingChild()) {
129 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
130 gpr_log(GPR_INFO,
131 "resolving_lb=%p helper=%p: pending child policy %p reports "
132 "state=%s",
133 parent_.get(), this, child_,
134 grpc_connectivity_state_name(state));
135 }
136 if (state != GRPC_CHANNEL_READY) return;
137 grpc_pollset_set_del_pollset_set(
138 parent_->lb_policy_->interested_parties(),
139 parent_->interested_parties());
140 parent_->lb_policy_ = std::move(parent_->pending_lb_policy_);
141 } else if (!CalledByCurrentChild()) {
142 // This request is from an outdated child, so ignore it.
143 return;
144 }
145 parent_->channel_control_helper()->UpdateState(state, std::move(picker));
146 }
147
RequestReresolution()148 void RequestReresolution() override {
149 // If there is a pending child policy, ignore re-resolution requests
150 // from the current child policy (or any outdated child).
151 if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) {
152 return;
153 }
154 if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
155 gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving",
156 parent_.get());
157 }
158 if (parent_->resolver_ != nullptr) {
159 parent_->resolver_->RequestReresolutionLocked();
160 }
161 }
162
set_child(LoadBalancingPolicy * child)163 void set_child(LoadBalancingPolicy* child) { child_ = child; }
164
165 private:
CalledByPendingChild() const166 bool CalledByPendingChild() const {
167 GPR_ASSERT(child_ != nullptr);
168 return child_ == parent_->pending_lb_policy_.get();
169 }
170
CalledByCurrentChild() const171 bool CalledByCurrentChild() const {
172 GPR_ASSERT(child_ != nullptr);
173 return child_ == parent_->lb_policy_.get();
174 };
175
176 RefCountedPtr<ResolvingLoadBalancingPolicy> parent_;
177 LoadBalancingPolicy* child_ = nullptr;
178 };
179
180 //
181 // ResolvingLoadBalancingPolicy
182 //
183
ResolvingLoadBalancingPolicy(Args args,TraceFlag * tracer,UniquePtr<char> target_uri,ProcessResolverResultCallback process_resolver_result,void * process_resolver_result_user_data)184 ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
185 Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
186 ProcessResolverResultCallback process_resolver_result,
187 void* process_resolver_result_user_data)
188 : LoadBalancingPolicy(std::move(args)),
189 tracer_(tracer),
190 target_uri_(std::move(target_uri)),
191 process_resolver_result_(process_resolver_result),
192 process_resolver_result_user_data_(process_resolver_result_user_data) {
193 GPR_ASSERT(process_resolver_result != nullptr);
194 resolver_ = ResolverRegistry::CreateResolver(
195 target_uri_.get(), args.args, interested_parties(), combiner(),
196 UniquePtr<Resolver::ResultHandler>(New<ResolverResultHandler>(Ref())));
197 // Since the validity of args has been checked when create the channel,
198 // CreateResolver() must return a non-null result.
199 GPR_ASSERT(resolver_ != nullptr);
200 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
201 gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
202 }
203 channel_control_helper()->UpdateState(
204 GRPC_CHANNEL_CONNECTING,
205 UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
206 resolver_->StartLocked();
207 }
208
// Destroys the policy. The resolver and the current child policy must both
// have been released already (ShutdownLocked() resets both) before the last
// reference is dropped; the asserts enforce this.
ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
  GPR_ASSERT(resolver_ == nullptr);
  GPR_ASSERT(lb_policy_ == nullptr);
}
213
ShutdownLocked()214 void ResolvingLoadBalancingPolicy::ShutdownLocked() {
215 if (resolver_ != nullptr) {
216 resolver_.reset();
217 if (lb_policy_ != nullptr) {
218 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
219 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this,
220 lb_policy_.get());
221 }
222 grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
223 interested_parties());
224 lb_policy_.reset();
225 }
226 if (pending_lb_policy_ != nullptr) {
227 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
228 gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p",
229 this, pending_lb_policy_.get());
230 }
231 grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(),
232 interested_parties());
233 pending_lb_policy_.reset();
234 }
235 }
236 }
237
ExitIdleLocked()238 void ResolvingLoadBalancingPolicy::ExitIdleLocked() {
239 if (lb_policy_ != nullptr) {
240 lb_policy_->ExitIdleLocked();
241 if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked();
242 }
243 }
244
ResetBackoffLocked()245 void ResolvingLoadBalancingPolicy::ResetBackoffLocked() {
246 if (resolver_ != nullptr) {
247 resolver_->ResetBackoffLocked();
248 resolver_->RequestReresolutionLocked();
249 }
250 if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked();
251 if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked();
252 }
253
OnResolverError(grpc_error * error)254 void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
255 if (resolver_ == nullptr) {
256 GRPC_ERROR_UNREF(error);
257 return;
258 }
259 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
260 gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this,
261 grpc_error_string(error));
262 }
263 // If we already have an LB policy from a previous resolution
264 // result, then we continue to let it set the connectivity state.
265 // Otherwise, we go into TRANSIENT_FAILURE.
266 if (lb_policy_ == nullptr) {
267 grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
268 "Resolver transient failure", &error, 1);
269 channel_control_helper()->UpdateState(
270 GRPC_CHANNEL_TRANSIENT_FAILURE,
271 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(state_error)));
272 }
273 GRPC_ERROR_UNREF(error);
274 }
275
CreateOrUpdateLbPolicyLocked(const char * lb_policy_name,RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,Resolver::Result result,TraceStringVector * trace_strings)276 void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
277 const char* lb_policy_name,
278 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
279 Resolver::Result result, TraceStringVector* trace_strings) {
280 // If the child policy name changes, we need to create a new child
281 // policy. When this happens, we leave child_policy_ as-is and store
282 // the new child policy in pending_child_policy_. Once the new child
283 // policy transitions into state READY, we swap it into child_policy_,
284 // replacing the original child policy. So pending_child_policy_ is
285 // non-null only between when we apply an update that changes the child
286 // policy name and when the new child reports state READY.
287 //
288 // Updates can arrive at any point during this transition. We always
289 // apply updates relative to the most recently created child policy,
290 // even if the most recent one is still in pending_child_policy_. This
291 // is true both when applying the updates to an existing child policy
292 // and when determining whether we need to create a new policy.
293 //
294 // As a result of this, there are several cases to consider here:
295 //
296 // 1. We have no existing child policy (i.e., we have started up but
297 // have not yet received a serverlist from the balancer or gone
298 // into fallback mode; in this case, both child_policy_ and
299 // pending_child_policy_ are null). In this case, we create a
300 // new child policy and store it in child_policy_.
301 //
302 // 2. We have an existing child policy and have no pending child policy
303 // from a previous update (i.e., either there has not been a
304 // previous update that changed the policy name, or we have already
305 // finished swapping in the new policy; in this case, child_policy_
306 // is non-null but pending_child_policy_ is null). In this case:
307 // a. If child_policy_->name() equals child_policy_name, then we
308 // update the existing child policy.
309 // b. If child_policy_->name() does not equal child_policy_name,
310 // we create a new policy. The policy will be stored in
311 // pending_child_policy_ and will later be swapped into
312 // child_policy_ by the helper when the new child transitions
313 // into state READY.
314 //
315 // 3. We have an existing child policy and have a pending child policy
316 // from a previous update (i.e., a previous update set
317 // pending_child_policy_ as per case 2b above and that policy has
318 // not yet transitioned into state READY and been swapped into
319 // child_policy_; in this case, both child_policy_ and
320 // pending_child_policy_ are non-null). In this case:
321 // a. If pending_child_policy_->name() equals child_policy_name,
322 // then we update the existing pending child policy.
323 // b. If pending_child_policy->name() does not equal
324 // child_policy_name, then we create a new policy. The new
325 // policy is stored in pending_child_policy_ (replacing the one
326 // that was there before, which will be immediately shut down)
327 // and will later be swapped into child_policy_ by the helper
328 // when the new child transitions into state READY.
329 const bool create_policy =
330 // case 1
331 lb_policy_ == nullptr ||
332 // case 2b
333 (pending_lb_policy_ == nullptr &&
334 strcmp(lb_policy_->name(), lb_policy_name) != 0) ||
335 // case 3b
336 (pending_lb_policy_ != nullptr &&
337 strcmp(pending_lb_policy_->name(), lb_policy_name) != 0);
338 LoadBalancingPolicy* policy_to_update = nullptr;
339 if (create_policy) {
340 // Cases 1, 2b, and 3b: create a new child policy.
341 // If lb_policy_ is null, we set it (case 1), else we set
342 // pending_lb_policy_ (cases 2b and 3b).
343 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
344 gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this,
345 lb_policy_ == nullptr ? "" : "pending ", lb_policy_name);
346 }
347 auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_;
348 lb_policy =
349 CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings);
350 policy_to_update = lb_policy.get();
351 } else {
352 // Cases 2a and 3a: update an existing policy.
353 // If we have a pending child policy, send the update to the pending
354 // policy (case 3a), else send it to the current policy (case 2a).
355 policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get()
356 : lb_policy_.get();
357 }
358 GPR_ASSERT(policy_to_update != nullptr);
359 // Update the policy.
360 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
361 gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this,
362 policy_to_update == pending_lb_policy_.get() ? "pending " : "",
363 policy_to_update);
364 }
365 UpdateArgs update_args;
366 update_args.addresses = std::move(result.addresses);
367 update_args.config = std::move(lb_policy_config);
368 // TODO(roth): Once channel args is converted to C++, use std::move() here.
369 update_args.args = result.args;
370 result.args = nullptr;
371 policy_to_update->UpdateLocked(std::move(update_args));
372 }
373
374 // Creates a new LB policy.
375 // Updates trace_strings to indicate what was done.
376 OrphanablePtr<LoadBalancingPolicy>
CreateLbPolicyLocked(const char * lb_policy_name,const grpc_channel_args & args,TraceStringVector * trace_strings)377 ResolvingLoadBalancingPolicy::CreateLbPolicyLocked(
378 const char* lb_policy_name, const grpc_channel_args& args,
379 TraceStringVector* trace_strings) {
380 ResolvingControlHelper* helper = New<ResolvingControlHelper>(Ref());
381 LoadBalancingPolicy::Args lb_policy_args;
382 lb_policy_args.combiner = combiner();
383 lb_policy_args.channel_control_helper =
384 UniquePtr<ChannelControlHelper>(helper);
385 lb_policy_args.args = &args;
386 OrphanablePtr<LoadBalancingPolicy> lb_policy =
387 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
388 lb_policy_name, std::move(lb_policy_args));
389 if (GPR_UNLIKELY(lb_policy == nullptr)) {
390 gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
391 char* str;
392 gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name);
393 trace_strings->push_back(str);
394 return nullptr;
395 }
396 helper->set_child(lb_policy.get());
397 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
398 gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)",
399 this, lb_policy_name, lb_policy.get());
400 }
401 char* str;
402 gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name);
403 trace_strings->push_back(str);
404 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
405 interested_parties());
406 return lb_policy;
407 }
408
MaybeAddTraceMessagesForAddressChangesLocked(bool resolution_contains_addresses,TraceStringVector * trace_strings)409 void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked(
410 bool resolution_contains_addresses, TraceStringVector* trace_strings) {
411 if (!resolution_contains_addresses &&
412 previous_resolution_contained_addresses_) {
413 trace_strings->push_back(gpr_strdup("Address list became empty"));
414 } else if (resolution_contains_addresses &&
415 !previous_resolution_contained_addresses_) {
416 trace_strings->push_back(gpr_strdup("Address list became non-empty"));
417 }
418 previous_resolution_contained_addresses_ = resolution_contains_addresses;
419 }
420
ConcatenateAndAddChannelTraceLocked(TraceStringVector * trace_strings) const421 void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked(
422 TraceStringVector* trace_strings) const {
423 if (!trace_strings->empty()) {
424 gpr_strvec v;
425 gpr_strvec_init(&v);
426 gpr_strvec_add(&v, gpr_strdup("Resolution event: "));
427 bool is_first = 1;
428 for (size_t i = 0; i < trace_strings->size(); ++i) {
429 if (!is_first) gpr_strvec_add(&v, gpr_strdup(", "));
430 is_first = false;
431 gpr_strvec_add(&v, (*trace_strings)[i]);
432 }
433 size_t len = 0;
434 UniquePtr<char> message(gpr_strvec_flatten(&v, &len));
435 channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO,
436 message.get());
437 gpr_strvec_destroy(&v);
438 }
439 }
440
OnResolverResultChangedLocked(Resolver::Result result)441 void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
442 Resolver::Result result) {
443 // Handle race conditions.
444 if (resolver_ == nullptr) return;
445 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
446 gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this);
447 }
448 // We only want to trace the address resolution in the follow cases:
449 // (a) Address resolution resulted in service config change.
450 // (b) Address resolution that causes number of backends to go from
451 // zero to non-zero.
452 // (c) Address resolution that causes number of backends to go from
453 // non-zero to zero.
454 // (d) Address resolution that causes a new LB policy to be created.
455 //
456 // We track a list of strings to eventually be concatenated and traced.
457 TraceStringVector trace_strings;
458 const bool resolution_contains_addresses = result.addresses.size() > 0;
459 // Process the resolver result.
460 const char* lb_policy_name = nullptr;
461 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
462 bool service_config_changed = false;
463 char* service_config_error_string = nullptr;
464 if (process_resolver_result_ != nullptr) {
465 grpc_error* service_config_error = GRPC_ERROR_NONE;
466 service_config_changed = process_resolver_result_(
467 process_resolver_result_user_data_, result, &lb_policy_name,
468 &lb_policy_config, &service_config_error);
469 if (service_config_error != GRPC_ERROR_NONE) {
470 service_config_error_string =
471 gpr_strdup(grpc_error_string(service_config_error));
472 if (lb_policy_name == nullptr) {
473 // Use an empty lb_policy_name as an indicator that we received an
474 // invalid service config and we don't have a fallback service config.
475 OnResolverError(service_config_error);
476 } else {
477 GRPC_ERROR_UNREF(service_config_error);
478 }
479 }
480 } else {
481 lb_policy_name = child_policy_name_.get();
482 lb_policy_config = child_lb_config_;
483 }
484 if (lb_policy_name != nullptr) {
485 // Create or update LB policy, as needed.
486 CreateOrUpdateLbPolicyLocked(lb_policy_name, lb_policy_config,
487 std::move(result), &trace_strings);
488 }
489 // Add channel trace event.
490 if (service_config_changed) {
491 // TODO(ncteisen): might be worth somehow including a snippet of the
492 // config in the trace, at the risk of bloating the trace logs.
493 trace_strings.push_back(gpr_strdup("Service config changed"));
494 }
495 if (service_config_error_string != nullptr) {
496 trace_strings.push_back(service_config_error_string);
497 service_config_error_string = nullptr;
498 }
499 MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
500 &trace_strings);
501 ConcatenateAndAddChannelTraceLocked(&trace_strings);
502 }
503
504 } // namespace grpc_core
505