1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
#include <grpc/support/port_platform.h>

#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"

#include <grpc/grpc.h>

#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/transport/error_utils.h"
41
// How long (in milliseconds) a child that has been dropped from the config is
// retained before it is removed: 15 minutes.
#define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
43
44 namespace grpc_core {
45
46 TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
47
48 namespace {
49
// Name reported by the policy, its config and its factory.
constexpr char kXdsClusterManager[] = "xds_cluster_manager_experimental";
51
52 // Config for xds_cluster_manager LB policy.
53 class XdsClusterManagerLbConfig : public LoadBalancingPolicy::Config {
54 public:
55 using ClusterMap =
56 std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>;
57
XdsClusterManagerLbConfig(ClusterMap cluster_map)58 explicit XdsClusterManagerLbConfig(ClusterMap cluster_map)
59 : cluster_map_(std::move(cluster_map)) {}
60
name() const61 const char* name() const override { return kXdsClusterManager; }
62
cluster_map() const63 const ClusterMap& cluster_map() const { return cluster_map_; }
64
65 private:
66 ClusterMap cluster_map_;
67 };
68
69 // xds_cluster_manager LB policy.
70 class XdsClusterManagerLb : public LoadBalancingPolicy {
71 public:
72 explicit XdsClusterManagerLb(Args args);
73
name() const74 const char* name() const override { return kXdsClusterManager; }
75
76 void UpdateLocked(UpdateArgs args) override;
77 void ExitIdleLocked() override;
78 void ResetBackoffLocked() override;
79
80 private:
81 // A simple wrapper for ref-counting a picker from the child policy.
82 class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
83 public:
ChildPickerWrapper(std::string name,std::unique_ptr<SubchannelPicker> picker)84 ChildPickerWrapper(std::string name,
85 std::unique_ptr<SubchannelPicker> picker)
86 : name_(std::move(name)), picker_(std::move(picker)) {}
Pick(PickArgs args)87 PickResult Pick(PickArgs args) { return picker_->Pick(args); }
88
name() const89 const std::string& name() const { return name_; }
90
91 private:
92 std::string name_;
93 std::unique_ptr<SubchannelPicker> picker_;
94 };
95
96 // Picks a child using prefix or path matching and then delegates to that
97 // child's picker.
98 class ClusterPicker : public SubchannelPicker {
99 public:
100 // Maintains a map of cluster names to pickers.
101 using ClusterMap = std::map<absl::string_view /*cluster_name*/,
102 RefCountedPtr<ChildPickerWrapper>>;
103
104 // It is required that the keys of cluster_map have to live at least as long
105 // as the ClusterPicker instance.
ClusterPicker(ClusterMap cluster_map)106 explicit ClusterPicker(ClusterMap cluster_map)
107 : cluster_map_(std::move(cluster_map)) {}
108
109 PickResult Pick(PickArgs args) override;
110
111 private:
112 ClusterMap cluster_map_;
113 };
114
115 // Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
116 class ClusterChild : public InternallyRefCounted<ClusterChild> {
117 public:
118 ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
119 const std::string& name);
120 ~ClusterChild() override;
121
122 void Orphan() override;
123
124 void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
125 const ServerAddressList& addresses,
126 const grpc_channel_args* args);
127 void ExitIdleLocked();
128 void ResetBackoffLocked();
129 void DeactivateLocked();
130
connectivity_state() const131 grpc_connectivity_state connectivity_state() const {
132 return connectivity_state_;
133 }
picker_wrapper() const134 RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
135 return picker_wrapper_;
136 }
137
138 private:
139 class Helper : public ChannelControlHelper {
140 public:
Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)141 explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
142 : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}
143
~Helper()144 ~Helper() override {
145 xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper");
146 }
147
148 RefCountedPtr<SubchannelInterface> CreateSubchannel(
149 ServerAddress address, const grpc_channel_args& args) override;
150 void UpdateState(grpc_connectivity_state state,
151 const absl::Status& status,
152 std::unique_ptr<SubchannelPicker> picker) override;
153 void RequestReresolution() override;
154 absl::string_view GetAuthority() override;
155 void AddTraceEvent(TraceSeverity severity,
156 absl::string_view message) override;
157
158 private:
159 RefCountedPtr<ClusterChild> xds_cluster_manager_child_;
160 };
161
162 // Methods for dealing with the child policy.
163 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
164 const grpc_channel_args* args);
165
166 static void OnDelayedRemovalTimer(void* arg, grpc_error_handle error);
167 void OnDelayedRemovalTimerLocked(grpc_error_handle error);
168
169 // The owning LB policy.
170 RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;
171
172 // Points to the corresponding key in children map.
173 const std::string name_;
174
175 OrphanablePtr<LoadBalancingPolicy> child_policy_;
176
177 RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
178 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
179 bool seen_failure_since_ready_ = false;
180
181 // States for delayed removal.
182 grpc_timer delayed_removal_timer_;
183 grpc_closure on_delayed_removal_timer_;
184 bool delayed_removal_timer_callback_pending_ = false;
185 bool shutdown_ = false;
186 };
187
188 ~XdsClusterManagerLb() override;
189
190 void ShutdownLocked() override;
191
192 void UpdateStateLocked();
193
194 // Current config from the resolver.
195 RefCountedPtr<XdsClusterManagerLbConfig> config_;
196
197 // Internal state.
198 bool shutting_down_ = false;
199
200 // Children.
201 std::map<std::string, OrphanablePtr<ClusterChild>> children_;
202 };
203
204 //
205 // XdsClusterManagerLb::ClusterPicker
206 //
207
Pick(PickArgs args)208 XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
209 PickArgs args) {
210 auto cluster_name =
211 args.call_state->ExperimentalGetCallAttribute(kXdsClusterAttribute);
212 auto it = cluster_map_.find(cluster_name);
213 if (it != cluster_map_.end()) {
214 return it->second->Pick(args);
215 }
216 return PickResult::Fail(absl::InternalError(absl::StrCat(
217 "xds cluster manager picker: unknown cluster \"", cluster_name, "\"")));
218 }
219
220 //
221 // XdsClusterManagerLb
222 //
223
XdsClusterManagerLb(Args args)224 XdsClusterManagerLb::XdsClusterManagerLb(Args args)
225 : LoadBalancingPolicy(std::move(args)) {}
226
~XdsClusterManagerLb()227 XdsClusterManagerLb::~XdsClusterManagerLb() {
228 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
229 gpr_log(
230 GPR_INFO,
231 "[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy",
232 this);
233 }
234 }
235
ShutdownLocked()236 void XdsClusterManagerLb::ShutdownLocked() {
237 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
238 gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] shutting down", this);
239 }
240 shutting_down_ = true;
241 children_.clear();
242 }
243
ExitIdleLocked()244 void XdsClusterManagerLb::ExitIdleLocked() {
245 for (auto& p : children_) p.second->ExitIdleLocked();
246 }
247
ResetBackoffLocked()248 void XdsClusterManagerLb::ResetBackoffLocked() {
249 for (auto& p : children_) p.second->ResetBackoffLocked();
250 }
251
UpdateLocked(UpdateArgs args)252 void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
253 if (shutting_down_) return;
254 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
255 gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
256 }
257 // Update config.
258 config_ = std::move(args.config);
259 // Deactivate the children not in the new config.
260 for (const auto& p : children_) {
261 const std::string& name = p.first;
262 ClusterChild* child = p.second.get();
263 if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
264 child->DeactivateLocked();
265 }
266 }
267 // Add or update the children in the new config.
268 for (const auto& p : config_->cluster_map()) {
269 const std::string& name = p.first;
270 const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
271 auto it = children_.find(name);
272 if (it == children_.end()) {
273 it = children_
274 .emplace(name, MakeOrphanable<ClusterChild>(
275 Ref(DEBUG_LOCATION, "ClusterChild"), name))
276 .first;
277 }
278 it->second->UpdateLocked(config, args.addresses, args.args);
279 }
280 UpdateStateLocked();
281 }
282
UpdateStateLocked()283 void XdsClusterManagerLb::UpdateStateLocked() {
284 // Also count the number of children in each state, to determine the
285 // overall state.
286 size_t num_ready = 0;
287 size_t num_connecting = 0;
288 size_t num_idle = 0;
289 size_t num_transient_failures = 0;
290 for (const auto& p : children_) {
291 const auto& child_name = p.first;
292 const ClusterChild* child = p.second.get();
293 // Skip the children that are not in the latest update.
294 if (config_->cluster_map().find(child_name) ==
295 config_->cluster_map().end()) {
296 continue;
297 }
298 switch (child->connectivity_state()) {
299 case GRPC_CHANNEL_READY: {
300 ++num_ready;
301 break;
302 }
303 case GRPC_CHANNEL_CONNECTING: {
304 ++num_connecting;
305 break;
306 }
307 case GRPC_CHANNEL_IDLE: {
308 ++num_idle;
309 break;
310 }
311 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
312 ++num_transient_failures;
313 break;
314 }
315 default:
316 GPR_UNREACHABLE_CODE(return );
317 }
318 }
319 // Determine aggregated connectivity state.
320 grpc_connectivity_state connectivity_state;
321 if (num_ready > 0) {
322 connectivity_state = GRPC_CHANNEL_READY;
323 } else if (num_connecting > 0) {
324 connectivity_state = GRPC_CHANNEL_CONNECTING;
325 } else if (num_idle > 0) {
326 connectivity_state = GRPC_CHANNEL_IDLE;
327 } else {
328 connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
329 }
330 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
331 gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] connectivity changed to %s",
332 this, ConnectivityStateName(connectivity_state));
333 }
334 ClusterPicker::ClusterMap cluster_map;
335 for (const auto& p : config_->cluster_map()) {
336 const std::string& cluster_name = p.first;
337 RefCountedPtr<ChildPickerWrapper>& child_picker = cluster_map[cluster_name];
338 child_picker = children_[cluster_name]->picker_wrapper();
339 if (child_picker == nullptr) {
340 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
341 gpr_log(GPR_INFO,
342 "[xds_cluster_manager_lb %p] child %s has not yet returned a "
343 "picker; creating a QueuePicker.",
344 this, cluster_name.c_str());
345 }
346 child_picker = MakeRefCounted<ChildPickerWrapper>(
347 cluster_name,
348 absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
349 }
350 }
351 std::unique_ptr<SubchannelPicker> picker =
352 absl::make_unique<ClusterPicker>(std::move(cluster_map));
353 absl::Status status;
354 if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
355 status = absl::Status(absl::StatusCode::kUnavailable,
356 "TRANSIENT_FAILURE from XdsClusterManagerLb");
357 }
358 channel_control_helper()->UpdateState(connectivity_state, status,
359 std::move(picker));
360 }
361
362 //
363 // XdsClusterManagerLb::ClusterChild
364 //
365
ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,const std::string & name)366 XdsClusterManagerLb::ClusterChild::ClusterChild(
367 RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
368 const std::string& name)
369 : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
370 name_(name) {
371 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
372 gpr_log(GPR_INFO,
373 "[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
374 xds_cluster_manager_policy_.get(), this, name_.c_str());
375 }
376 GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
377 grpc_schedule_on_exec_ctx);
378 }
379
~ClusterChild()380 XdsClusterManagerLb::ClusterChild::~ClusterChild() {
381 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
382 gpr_log(GPR_INFO,
383 "[xds_cluster_manager_lb %p] ClusterChild %p: destroying "
384 "child",
385 xds_cluster_manager_policy_.get(), this);
386 }
387 xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
388 }
389
Orphan()390 void XdsClusterManagerLb::ClusterChild::Orphan() {
391 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
392 gpr_log(GPR_INFO,
393 "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
394 "shutting down child",
395 xds_cluster_manager_policy_.get(), this, name_.c_str());
396 }
397 // Remove the child policy's interested_parties pollset_set from the
398 // xDS policy.
399 grpc_pollset_set_del_pollset_set(
400 child_policy_->interested_parties(),
401 xds_cluster_manager_policy_->interested_parties());
402 child_policy_.reset();
403 // Drop our ref to the child's picker, in case it's holding a ref to
404 // the child.
405 picker_wrapper_.reset();
406 if (delayed_removal_timer_callback_pending_) {
407 grpc_timer_cancel(&delayed_removal_timer_);
408 }
409 shutdown_ = true;
410 Unref();
411 }
412
413 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)414 XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
415 const grpc_channel_args* args) {
416 LoadBalancingPolicy::Args lb_policy_args;
417 lb_policy_args.work_serializer =
418 xds_cluster_manager_policy_->work_serializer();
419 lb_policy_args.args = args;
420 lb_policy_args.channel_control_helper =
421 absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
422 OrphanablePtr<LoadBalancingPolicy> lb_policy =
423 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
424 &grpc_xds_cluster_manager_lb_trace);
425 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
426 gpr_log(GPR_INFO,
427 "[xds_cluster_manager_lb %p] ClusterChild %p %s: Created "
428 "new child "
429 "policy handler %p",
430 xds_cluster_manager_policy_.get(), this, name_.c_str(),
431 lb_policy.get());
432 }
433 // Add the xDS's interested_parties pollset_set to that of the newly created
434 // child policy. This will make the child policy progress upon activity on
435 // xDS LB, which in turn is tied to the application's call.
436 grpc_pollset_set_add_pollset_set(
437 lb_policy->interested_parties(),
438 xds_cluster_manager_policy_->interested_parties());
439 return lb_policy;
440 }
441
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,const ServerAddressList & addresses,const grpc_channel_args * args)442 void XdsClusterManagerLb::ClusterChild::UpdateLocked(
443 RefCountedPtr<LoadBalancingPolicy::Config> config,
444 const ServerAddressList& addresses, const grpc_channel_args* args) {
445 if (xds_cluster_manager_policy_->shutting_down_) return;
446 // Update child weight.
447 // Reactivate if needed.
448 if (delayed_removal_timer_callback_pending_) {
449 delayed_removal_timer_callback_pending_ = false;
450 grpc_timer_cancel(&delayed_removal_timer_);
451 }
452 // Create child policy if needed.
453 if (child_policy_ == nullptr) {
454 child_policy_ = CreateChildPolicyLocked(args);
455 }
456 // Construct update args.
457 UpdateArgs update_args;
458 update_args.config = std::move(config);
459 update_args.addresses = addresses;
460 update_args.args = grpc_channel_args_copy(args);
461 // Update the policy.
462 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
463 gpr_log(GPR_INFO,
464 "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
465 "Updating child "
466 "policy handler %p",
467 xds_cluster_manager_policy_.get(), this, name_.c_str(),
468 child_policy_.get());
469 }
470 child_policy_->UpdateLocked(std::move(update_args));
471 }
472
ExitIdleLocked()473 void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
474 child_policy_->ExitIdleLocked();
475 }
476
ResetBackoffLocked()477 void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
478 child_policy_->ResetBackoffLocked();
479 }
480
DeactivateLocked()481 void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
482 // If already deactivated, don't do that again.
483 if (delayed_removal_timer_callback_pending_) return;
484 // Set the child weight to 0 so that future picker won't contain this child.
485 // Start a timer to delete the child.
486 Ref(DEBUG_LOCATION, "ClusterChild+timer").release();
487 grpc_timer_init(&delayed_removal_timer_,
488 ExecCtx::Get()->Now() +
489 GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS,
490 &on_delayed_removal_timer_);
491 delayed_removal_timer_callback_pending_ = true;
492 }
493
OnDelayedRemovalTimer(void * arg,grpc_error_handle error)494 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer(
495 void* arg, grpc_error_handle error) {
496 ClusterChild* self = static_cast<ClusterChild*>(arg);
497 (void)GRPC_ERROR_REF(error); // Ref owned by the lambda
498 self->xds_cluster_manager_policy_->work_serializer()->Run(
499 [self, error]() { self->OnDelayedRemovalTimerLocked(error); },
500 DEBUG_LOCATION);
501 }
502
OnDelayedRemovalTimerLocked(grpc_error_handle error)503 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked(
504 grpc_error_handle error) {
505 delayed_removal_timer_callback_pending_ = false;
506 if (error == GRPC_ERROR_NONE && !shutdown_) {
507 xds_cluster_manager_policy_->children_.erase(name_);
508 }
509 Unref(DEBUG_LOCATION, "ClusterChild+timer");
510 GRPC_ERROR_UNREF(error);
511 }
512
513 //
514 // XdsClusterManagerLb::ClusterChild::Helper
515 //
516
517 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)518 XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel(
519 ServerAddress address, const grpc_channel_args& args) {
520 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
521 return nullptr;
522 }
523 return xds_cluster_manager_child_->xds_cluster_manager_policy_
524 ->channel_control_helper()
525 ->CreateSubchannel(std::move(address), args);
526 }
527
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)528 void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
529 grpc_connectivity_state state, const absl::Status& status,
530 std::unique_ptr<SubchannelPicker> picker) {
531 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
532 gpr_log(
533 GPR_INFO,
534 "[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) "
535 "picker=%p",
536 xds_cluster_manager_child_->xds_cluster_manager_policy_.get(),
537 xds_cluster_manager_child_->name_.c_str(), ConnectivityStateName(state),
538 status.ToString().c_str(), picker.get());
539 }
540 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
541 return;
542 }
543 // Cache the picker in the ClusterChild.
544 xds_cluster_manager_child_->picker_wrapper_ =
545 MakeRefCounted<ChildPickerWrapper>(xds_cluster_manager_child_->name_,
546 std::move(picker));
547 // Decide what state to report for aggregation purposes.
548 // If we haven't seen a failure since the last time we were in state
549 // READY, then we report the state change as-is. However, once we do see
550 // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
551 // changes until we go back into state READY.
552 if (!xds_cluster_manager_child_->seen_failure_since_ready_) {
553 if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
554 xds_cluster_manager_child_->seen_failure_since_ready_ = true;
555 }
556 } else {
557 if (state != GRPC_CHANNEL_READY) return;
558 xds_cluster_manager_child_->seen_failure_since_ready_ = false;
559 }
560 xds_cluster_manager_child_->connectivity_state_ = state;
561 // Notify the LB policy.
562 xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
563 }
564
RequestReresolution()565 void XdsClusterManagerLb::ClusterChild::Helper::RequestReresolution() {
566 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
567 return;
568 }
569 xds_cluster_manager_child_->xds_cluster_manager_policy_
570 ->channel_control_helper()
571 ->RequestReresolution();
572 }
573
GetAuthority()574 absl::string_view XdsClusterManagerLb::ClusterChild::Helper::GetAuthority() {
575 return xds_cluster_manager_child_->xds_cluster_manager_policy_
576 ->channel_control_helper()
577 ->GetAuthority();
578 }
579
AddTraceEvent(TraceSeverity severity,absl::string_view message)580 void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent(
581 TraceSeverity severity, absl::string_view message) {
582 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
583 return;
584 }
585 xds_cluster_manager_child_->xds_cluster_manager_policy_
586 ->channel_control_helper()
587 ->AddTraceEvent(severity, message);
588 }
589
590 //
591 // factory
592 //
593
594 class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
595 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const596 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
597 LoadBalancingPolicy::Args args) const override {
598 return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
599 }
600
name() const601 const char* name() const override { return kXdsClusterManager; }
602
ParseLoadBalancingConfig(const Json & json,grpc_error_handle * error) const603 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
604 const Json& json, grpc_error_handle* error) const override {
605 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
606 if (json.type() == Json::Type::JSON_NULL) {
607 // xds_cluster_manager was mentioned as a policy in the deprecated
608 // loadBalancingPolicy field or in the client API.
609 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
610 "field:loadBalancingPolicy error:xds_cluster_manager policy requires "
611 "configuration. Please use loadBalancingConfig field of service "
612 "config instead.");
613 return nullptr;
614 }
615 std::vector<grpc_error_handle> error_list;
616 XdsClusterManagerLbConfig::ClusterMap cluster_map;
617 std::set<std::string /*cluster_name*/> clusters_to_be_used;
618 auto it = json.object_value().find("children");
619 if (it == json.object_value().end()) {
620 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
621 "field:children error:required field not present"));
622 } else if (it->second.type() != Json::Type::OBJECT) {
623 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
624 "field:children error:type should be object"));
625 } else {
626 for (const auto& p : it->second.object_value()) {
627 const std::string& child_name = p.first;
628 if (child_name.empty()) {
629 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
630 "field:children element error: name cannot be empty"));
631 continue;
632 }
633 RefCountedPtr<LoadBalancingPolicy::Config> child_config;
634 std::vector<grpc_error_handle> child_errors =
635 ParseChildConfig(p.second, &child_config);
636 if (!child_errors.empty()) {
637 error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
638 absl::StrCat("field:children name:", child_name), &child_errors));
639 } else {
640 cluster_map[child_name] = std::move(child_config);
641 clusters_to_be_used.insert(child_name);
642 }
643 }
644 }
645 if (cluster_map.empty()) {
646 error_list.push_back(
647 GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid children configured"));
648 }
649 if (!error_list.empty()) {
650 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
651 "xds_cluster_manager_experimental LB policy config", &error_list);
652 return nullptr;
653 }
654 return MakeRefCounted<XdsClusterManagerLbConfig>(std::move(cluster_map));
655 }
656
657 private:
ParseChildConfig(const Json & json,RefCountedPtr<LoadBalancingPolicy::Config> * child_config)658 static std::vector<grpc_error_handle> ParseChildConfig(
659 const Json& json,
660 RefCountedPtr<LoadBalancingPolicy::Config>* child_config) {
661 std::vector<grpc_error_handle> error_list;
662 if (json.type() != Json::Type::OBJECT) {
663 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
664 "value should be of type object"));
665 return error_list;
666 }
667 auto it = json.object_value().find("childPolicy");
668 if (it == json.object_value().end()) {
669 error_list.push_back(
670 GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
671 } else {
672 grpc_error_handle parse_error = GRPC_ERROR_NONE;
673 *child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
674 it->second, &parse_error);
675 if (*child_config == nullptr) {
676 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
677 std::vector<grpc_error_handle> child_errors;
678 child_errors.push_back(parse_error);
679 error_list.push_back(
680 GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
681 }
682 }
683 return error_list;
684 }
685 };
686
687 } // namespace
688
689 } // namespace grpc_core
690
691 //
692 // Plugin registration
693 //
694
grpc_lb_policy_xds_cluster_manager_init()695 void grpc_lb_policy_xds_cluster_manager_init() {
696 grpc_core::LoadBalancingPolicyRegistry::Builder::
697 RegisterLoadBalancingPolicyFactory(
698 absl::make_unique<grpc_core::XdsClusterManagerLbFactory>());
699 }
700
// Plugin shutdown hook.
void grpc_lb_policy_xds_cluster_manager_shutdown() {
  // Intentionally empty: this file does nothing at plugin shutdown.
}
702