1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <set>
20 #include <string>
21 #include <vector>
22 
23 #include "absl/status/status.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/strings/string_view.h"
26 
27 #include <grpc/grpc.h>
28 
29 #include "src/core/ext/filters/client_channel/lb_policy.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
32 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
33 #include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/orphanable.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/iomgr/timer.h"
39 #include "src/core/lib/iomgr/work_serializer.h"
40 #include "src/core/lib/transport/error_utils.h"
41 
// Delay, in milliseconds (15 minutes), added to the current time when arming
// the delayed-removal timer for a deactivated child.
#define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
43 
44 namespace grpc_core {
45 
// Trace flag checked via GRPC_TRACE_FLAG_ENABLED() before each log statement
// in this file; also handed to the ChildPolicyHandler created for each child.
TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
47 
48 namespace {
49 
// Policy name reported by name() on the config, the policy and the factory.
constexpr char kXdsClusterManager[] = "xds_cluster_manager_experimental";
51 
52 // Config for xds_cluster_manager LB policy.
53 class XdsClusterManagerLbConfig : public LoadBalancingPolicy::Config {
54  public:
55   using ClusterMap =
56       std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>;
57 
XdsClusterManagerLbConfig(ClusterMap cluster_map)58   explicit XdsClusterManagerLbConfig(ClusterMap cluster_map)
59       : cluster_map_(std::move(cluster_map)) {}
60 
name() const61   const char* name() const override { return kXdsClusterManager; }
62 
cluster_map() const63   const ClusterMap& cluster_map() const { return cluster_map_; }
64 
65  private:
66   ClusterMap cluster_map_;
67 };
68 
69 // xds_cluster_manager LB policy.
// Owns one ClusterChild per cluster named in the config and publishes a
// ClusterPicker that routes each pick to the child selected by the call's
// kXdsClusterAttribute attribute.
class XdsClusterManagerLb : public LoadBalancingPolicy {
 public:
  explicit XdsClusterManagerLb(Args args);

  const char* name() const override { return kXdsClusterManager; }

  // Applies a new config: deactivates children no longer listed, creates or
  // updates the listed ones, then republishes state and picker.
  void UpdateLocked(UpdateArgs args) override;
  // Forwards to every child.
  void ExitIdleLocked() override;
  // Forwards to every child.
  void ResetBackoffLocked() override;

 private:
  // A simple wrapper for ref-counting a picker from the child policy.
  // It also stores a copy of the name it was created with.
  class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
   public:
    ChildPickerWrapper(std::string name,
                       std::unique_ptr<SubchannelPicker> picker)
        : name_(std::move(name)), picker_(std::move(picker)) {}
    // Delegates the pick to the wrapped picker.
    PickResult Pick(PickArgs args) { return picker_->Pick(args); }

    const std::string& name() const { return name_; }

   private:
    std::string name_;
    std::unique_ptr<SubchannelPicker> picker_;
  };

  // Picks a child by looking up the cluster name carried in the call's
  // kXdsClusterAttribute attribute and then delegates to that child's
  // picker.  Picks for a name that is not in the map fail.
  class ClusterPicker : public SubchannelPicker {
   public:
    // Maintains a map of cluster names to pickers.
    using ClusterMap = std::map<absl::string_view /*cluster_name*/,
                                RefCountedPtr<ChildPickerWrapper>>;

    // It is required that the keys of cluster_map have to live at least as long
    // as the ClusterPicker instance.
    explicit ClusterPicker(ClusterMap cluster_map)
        : cluster_map_(std::move(cluster_map)) {}

    PickResult Pick(PickArgs args) override;

   private:
    ClusterMap cluster_map_;
  };

  // Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
  class ClusterChild : public InternallyRefCounted<ClusterChild> {
   public:
    ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
                 const std::string& name);
    ~ClusterChild() override;

    void Orphan() override;

    // Cancels any pending delayed removal, creates the child policy if
    // needed, and passes the update on to it.
    void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
                      const ServerAddressList& addresses,
                      const grpc_channel_args* args);
    void ExitIdleLocked();
    void ResetBackoffLocked();
    // Starts the delayed-removal timer unless one is already pending.
    void DeactivateLocked();

    // Last connectivity state recorded for this child by its Helper.
    grpc_connectivity_state connectivity_state() const {
      return connectivity_state_;
    }
    // Last picker cached for this child by its Helper; null until the child
    // policy has reported a state for the first time.
    RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
      return picker_wrapper_;
    }

   private:
    // ChannelControlHelper given to the child policy.  It holds a ref to the
    // ClusterChild and forwards most calls to the parent policy's helper.
    class Helper : public ChannelControlHelper {
     public:
      explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
          : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}

      ~Helper() override {
        xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper");
      }

      RefCountedPtr<SubchannelInterface> CreateSubchannel(
          ServerAddress address, const grpc_channel_args& args) override;
      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       std::unique_ptr<SubchannelPicker> picker) override;
      void RequestReresolution() override;
      absl::string_view GetAuthority() override;
      void AddTraceEvent(TraceSeverity severity,
                         absl::string_view message) override;

     private:
      RefCountedPtr<ClusterChild> xds_cluster_manager_child_;
    };

    // Methods for dealing with the child policy.
    OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
        const grpc_channel_args* args);

    // Timer callback; hops into the work serializer and then runs
    // OnDelayedRemovalTimerLocked().
    static void OnDelayedRemovalTimer(void* arg, grpc_error_handle error);
    void OnDelayedRemovalTimerLocked(grpc_error_handle error);

    // The owning LB policy.
    RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;

    // This child's name: a copy of the corresponding key in the children map.
    const std::string name_;

    OrphanablePtr<LoadBalancingPolicy> child_policy_;

    RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
    grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
    // Set when the child reports TRANSIENT_FAILURE; cleared when it next
    // reports READY.
    bool seen_failure_since_ready_ = false;

    // States for delayed removal.
    grpc_timer delayed_removal_timer_;
    grpc_closure on_delayed_removal_timer_;
    bool delayed_removal_timer_callback_pending_ = false;
    bool shutdown_ = false;
  };

  ~XdsClusterManagerLb() override;

  void ShutdownLocked() override;

  // Recomputes the aggregated connectivity state and a new ClusterPicker and
  // reports both through the channel control helper.
  void UpdateStateLocked();

  // Current config from the resolver.
  RefCountedPtr<XdsClusterManagerLbConfig> config_;

  // Internal state.
  bool shutting_down_ = false;

  // Children.
  std::map<std::string, OrphanablePtr<ClusterChild>> children_;
};
203 
204 //
205 // XdsClusterManagerLb::ClusterPicker
206 //
207 
Pick(PickArgs args)208 XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
209     PickArgs args) {
210   auto cluster_name =
211       args.call_state->ExperimentalGetCallAttribute(kXdsClusterAttribute);
212   auto it = cluster_map_.find(cluster_name);
213   if (it != cluster_map_.end()) {
214     return it->second->Pick(args);
215   }
216   return PickResult::Fail(absl::InternalError(absl::StrCat(
217       "xds cluster manager picker: unknown cluster \"", cluster_name, "\"")));
218 }
219 
220 //
221 // XdsClusterManagerLb
222 //
223 
// Hands the construction arguments to the LoadBalancingPolicy base class;
// children are created later, in UpdateLocked().
XdsClusterManagerLb::XdsClusterManagerLb(Args args)
    : LoadBalancingPolicy(std::move(args)) {}
226 
~XdsClusterManagerLb()227 XdsClusterManagerLb::~XdsClusterManagerLb() {
228   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
229     gpr_log(
230         GPR_INFO,
231         "[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy",
232         this);
233   }
234 }
235 
ShutdownLocked()236 void XdsClusterManagerLb::ShutdownLocked() {
237   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
238     gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] shutting down", this);
239   }
240   shutting_down_ = true;
241   children_.clear();
242 }
243 
ExitIdleLocked()244 void XdsClusterManagerLb::ExitIdleLocked() {
245   for (auto& p : children_) p.second->ExitIdleLocked();
246 }
247 
ResetBackoffLocked()248 void XdsClusterManagerLb::ResetBackoffLocked() {
249   for (auto& p : children_) p.second->ResetBackoffLocked();
250 }
251 
UpdateLocked(UpdateArgs args)252 void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
253   if (shutting_down_) return;
254   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
255     gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
256   }
257   // Update config.
258   config_ = std::move(args.config);
259   // Deactivate the children not in the new config.
260   for (const auto& p : children_) {
261     const std::string& name = p.first;
262     ClusterChild* child = p.second.get();
263     if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
264       child->DeactivateLocked();
265     }
266   }
267   // Add or update the children in the new config.
268   for (const auto& p : config_->cluster_map()) {
269     const std::string& name = p.first;
270     const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
271     auto it = children_.find(name);
272     if (it == children_.end()) {
273       it = children_
274                .emplace(name, MakeOrphanable<ClusterChild>(
275                                   Ref(DEBUG_LOCATION, "ClusterChild"), name))
276                .first;
277     }
278     it->second->UpdateLocked(config, args.addresses, args.args);
279   }
280   UpdateStateLocked();
281 }
282 
UpdateStateLocked()283 void XdsClusterManagerLb::UpdateStateLocked() {
284   // Also count the number of children in each state, to determine the
285   // overall state.
286   size_t num_ready = 0;
287   size_t num_connecting = 0;
288   size_t num_idle = 0;
289   size_t num_transient_failures = 0;
290   for (const auto& p : children_) {
291     const auto& child_name = p.first;
292     const ClusterChild* child = p.second.get();
293     // Skip the children that are not in the latest update.
294     if (config_->cluster_map().find(child_name) ==
295         config_->cluster_map().end()) {
296       continue;
297     }
298     switch (child->connectivity_state()) {
299       case GRPC_CHANNEL_READY: {
300         ++num_ready;
301         break;
302       }
303       case GRPC_CHANNEL_CONNECTING: {
304         ++num_connecting;
305         break;
306       }
307       case GRPC_CHANNEL_IDLE: {
308         ++num_idle;
309         break;
310       }
311       case GRPC_CHANNEL_TRANSIENT_FAILURE: {
312         ++num_transient_failures;
313         break;
314       }
315       default:
316         GPR_UNREACHABLE_CODE(return );
317     }
318   }
319   // Determine aggregated connectivity state.
320   grpc_connectivity_state connectivity_state;
321   if (num_ready > 0) {
322     connectivity_state = GRPC_CHANNEL_READY;
323   } else if (num_connecting > 0) {
324     connectivity_state = GRPC_CHANNEL_CONNECTING;
325   } else if (num_idle > 0) {
326     connectivity_state = GRPC_CHANNEL_IDLE;
327   } else {
328     connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
329   }
330   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
331     gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] connectivity changed to %s",
332             this, ConnectivityStateName(connectivity_state));
333   }
334   ClusterPicker::ClusterMap cluster_map;
335   for (const auto& p : config_->cluster_map()) {
336     const std::string& cluster_name = p.first;
337     RefCountedPtr<ChildPickerWrapper>& child_picker = cluster_map[cluster_name];
338     child_picker = children_[cluster_name]->picker_wrapper();
339     if (child_picker == nullptr) {
340       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
341         gpr_log(GPR_INFO,
342                 "[xds_cluster_manager_lb %p] child %s has not yet returned a "
343                 "picker; creating a QueuePicker.",
344                 this, cluster_name.c_str());
345       }
346       child_picker = MakeRefCounted<ChildPickerWrapper>(
347           cluster_name,
348           absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
349     }
350   }
351   std::unique_ptr<SubchannelPicker> picker =
352       absl::make_unique<ClusterPicker>(std::move(cluster_map));
353   absl::Status status;
354   if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
355     status = absl::Status(absl::StatusCode::kUnavailable,
356                           "TRANSIENT_FAILURE from XdsClusterManagerLb");
357   }
358   channel_control_helper()->UpdateState(connectivity_state, status,
359                                         std::move(picker));
360 }
361 
362 //
363 // XdsClusterManagerLb::ClusterChild
364 //
365 
ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,const std::string & name)366 XdsClusterManagerLb::ClusterChild::ClusterChild(
367     RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
368     const std::string& name)
369     : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
370       name_(name) {
371   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
372     gpr_log(GPR_INFO,
373             "[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
374             xds_cluster_manager_policy_.get(), this, name_.c_str());
375   }
376   GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
377                     grpc_schedule_on_exec_ctx);
378 }
379 
~ClusterChild()380 XdsClusterManagerLb::ClusterChild::~ClusterChild() {
381   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
382     gpr_log(GPR_INFO,
383             "[xds_cluster_manager_lb %p] ClusterChild %p: destroying "
384             "child",
385             xds_cluster_manager_policy_.get(), this);
386   }
387   xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
388 }
389 
// Shuts this child down: detaches the child policy's pollset_set from the
// parent's, destroys the child policy, drops the cached picker, cancels the
// delayed-removal timer if one is pending, marks the child as shut down and
// releases one ref on this object.
void XdsClusterManagerLb::ClusterChild::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
    gpr_log(GPR_INFO,
            "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
            "shutting down child",
            xds_cluster_manager_policy_.get(), this, name_.c_str());
  }
  // Remove the child policy's interested_parties pollset_set from the
  // xDS policy.
  grpc_pollset_set_del_pollset_set(
      child_policy_->interested_parties(),
      xds_cluster_manager_policy_->interested_parties());
  child_policy_.reset();
  // Drop our ref to the child's picker, in case it's holding a ref to
  // the child.
  picker_wrapper_.reset();
  if (delayed_removal_timer_callback_pending_) {
    grpc_timer_cancel(&delayed_removal_timer_);
  }
  // Checked by OnDelayedRemovalTimerLocked() so that a timer callback that
  // runs after this point does not try to erase the child from the map.
  shutdown_ = true;
  Unref();
}
412 
413 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)414 XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
415     const grpc_channel_args* args) {
416   LoadBalancingPolicy::Args lb_policy_args;
417   lb_policy_args.work_serializer =
418       xds_cluster_manager_policy_->work_serializer();
419   lb_policy_args.args = args;
420   lb_policy_args.channel_control_helper =
421       absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
422   OrphanablePtr<LoadBalancingPolicy> lb_policy =
423       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
424                                          &grpc_xds_cluster_manager_lb_trace);
425   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
426     gpr_log(GPR_INFO,
427             "[xds_cluster_manager_lb %p] ClusterChild %p %s: Created "
428             "new child "
429             "policy handler %p",
430             xds_cluster_manager_policy_.get(), this, name_.c_str(),
431             lb_policy.get());
432   }
433   // Add the xDS's interested_parties pollset_set to that of the newly created
434   // child policy. This will make the child policy progress upon activity on
435   // xDS LB, which in turn is tied to the application's call.
436   grpc_pollset_set_add_pollset_set(
437       lb_policy->interested_parties(),
438       xds_cluster_manager_policy_->interested_parties());
439   return lb_policy;
440 }
441 
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,const ServerAddressList & addresses,const grpc_channel_args * args)442 void XdsClusterManagerLb::ClusterChild::UpdateLocked(
443     RefCountedPtr<LoadBalancingPolicy::Config> config,
444     const ServerAddressList& addresses, const grpc_channel_args* args) {
445   if (xds_cluster_manager_policy_->shutting_down_) return;
446   // Update child weight.
447   // Reactivate if needed.
448   if (delayed_removal_timer_callback_pending_) {
449     delayed_removal_timer_callback_pending_ = false;
450     grpc_timer_cancel(&delayed_removal_timer_);
451   }
452   // Create child policy if needed.
453   if (child_policy_ == nullptr) {
454     child_policy_ = CreateChildPolicyLocked(args);
455   }
456   // Construct update args.
457   UpdateArgs update_args;
458   update_args.config = std::move(config);
459   update_args.addresses = addresses;
460   update_args.args = grpc_channel_args_copy(args);
461   // Update the policy.
462   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
463     gpr_log(GPR_INFO,
464             "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
465             "Updating child "
466             "policy handler %p",
467             xds_cluster_manager_policy_.get(), this, name_.c_str(),
468             child_policy_.get());
469   }
470   child_policy_->UpdateLocked(std::move(update_args));
471 }
472 
// Forwards the exit-idle request to this child's policy.
void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
  child_policy_->ExitIdleLocked();
}
476 
// Forwards the backoff reset to this child's policy.
void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
  child_policy_->ResetBackoffLocked();
}
480 
DeactivateLocked()481 void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
482   // If already deactivated, don't do that again.
483   if (delayed_removal_timer_callback_pending_) return;
484   // Set the child weight to 0 so that future picker won't contain this child.
485   // Start a timer to delete the child.
486   Ref(DEBUG_LOCATION, "ClusterChild+timer").release();
487   grpc_timer_init(&delayed_removal_timer_,
488                   ExecCtx::Get()->Now() +
489                       GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS,
490                   &on_delayed_removal_timer_);
491   delayed_removal_timer_callback_pending_ = true;
492 }
493 
// Closure entry point for the delayed-removal timer.  `arg` is the
// ClusterChild registered in the constructor.  The real work is done by
// OnDelayedRemovalTimerLocked(), which is scheduled on the parent policy's
// work serializer.
void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer(
    void* arg, grpc_error_handle error) {
  ClusterChild* self = static_cast<ClusterChild*>(arg);
  (void)GRPC_ERROR_REF(error);  // Ref owned by the lambda
  self->xds_cluster_manager_policy_->work_serializer()->Run(
      [self, error]() { self->OnDelayedRemovalTimerLocked(error); },
      DEBUG_LOCATION);
}
502 
OnDelayedRemovalTimerLocked(grpc_error_handle error)503 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked(
504     grpc_error_handle error) {
505   delayed_removal_timer_callback_pending_ = false;
506   if (error == GRPC_ERROR_NONE && !shutdown_) {
507     xds_cluster_manager_policy_->children_.erase(name_);
508   }
509   Unref(DEBUG_LOCATION, "ClusterChild+timer");
510   GRPC_ERROR_UNREF(error);
511 }
512 
513 //
514 // XdsClusterManagerLb::ClusterChild::Helper
515 //
516 
517 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)518 XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel(
519     ServerAddress address, const grpc_channel_args& args) {
520   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
521     return nullptr;
522   }
523   return xds_cluster_manager_child_->xds_cluster_manager_policy_
524       ->channel_control_helper()
525       ->CreateSubchannel(std::move(address), args);
526 }
527 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)528 void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
529     grpc_connectivity_state state, const absl::Status& status,
530     std::unique_ptr<SubchannelPicker> picker) {
531   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
532     gpr_log(
533         GPR_INFO,
534         "[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) "
535         "picker=%p",
536         xds_cluster_manager_child_->xds_cluster_manager_policy_.get(),
537         xds_cluster_manager_child_->name_.c_str(), ConnectivityStateName(state),
538         status.ToString().c_str(), picker.get());
539   }
540   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
541     return;
542   }
543   // Cache the picker in the ClusterChild.
544   xds_cluster_manager_child_->picker_wrapper_ =
545       MakeRefCounted<ChildPickerWrapper>(xds_cluster_manager_child_->name_,
546                                          std::move(picker));
547   // Decide what state to report for aggregation purposes.
548   // If we haven't seen a failure since the last time we were in state
549   // READY, then we report the state change as-is.  However, once we do see
550   // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
551   // changes until we go back into state READY.
552   if (!xds_cluster_manager_child_->seen_failure_since_ready_) {
553     if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
554       xds_cluster_manager_child_->seen_failure_since_ready_ = true;
555     }
556   } else {
557     if (state != GRPC_CHANNEL_READY) return;
558     xds_cluster_manager_child_->seen_failure_since_ready_ = false;
559   }
560   xds_cluster_manager_child_->connectivity_state_ = state;
561   // Notify the LB policy.
562   xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
563 }
564 
RequestReresolution()565 void XdsClusterManagerLb::ClusterChild::Helper::RequestReresolution() {
566   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
567     return;
568   }
569   xds_cluster_manager_child_->xds_cluster_manager_policy_
570       ->channel_control_helper()
571       ->RequestReresolution();
572 }
573 
GetAuthority()574 absl::string_view XdsClusterManagerLb::ClusterChild::Helper::GetAuthority() {
575   return xds_cluster_manager_child_->xds_cluster_manager_policy_
576       ->channel_control_helper()
577       ->GetAuthority();
578 }
579 
AddTraceEvent(TraceSeverity severity,absl::string_view message)580 void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent(
581     TraceSeverity severity, absl::string_view message) {
582   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
583     return;
584   }
585   xds_cluster_manager_child_->xds_cluster_manager_policy_
586       ->channel_control_helper()
587       ->AddTraceEvent(severity, message);
588 }
589 
590 //
591 // factory
592 //
593 
594 class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
595  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const596   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
597       LoadBalancingPolicy::Args args) const override {
598     return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
599   }
600 
name() const601   const char* name() const override { return kXdsClusterManager; }
602 
ParseLoadBalancingConfig(const Json & json,grpc_error_handle * error) const603   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
604       const Json& json, grpc_error_handle* error) const override {
605     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
606     if (json.type() == Json::Type::JSON_NULL) {
607       // xds_cluster_manager was mentioned as a policy in the deprecated
608       // loadBalancingPolicy field or in the client API.
609       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
610           "field:loadBalancingPolicy error:xds_cluster_manager policy requires "
611           "configuration.  Please use loadBalancingConfig field of service "
612           "config instead.");
613       return nullptr;
614     }
615     std::vector<grpc_error_handle> error_list;
616     XdsClusterManagerLbConfig::ClusterMap cluster_map;
617     std::set<std::string /*cluster_name*/> clusters_to_be_used;
618     auto it = json.object_value().find("children");
619     if (it == json.object_value().end()) {
620       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
621           "field:children error:required field not present"));
622     } else if (it->second.type() != Json::Type::OBJECT) {
623       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
624           "field:children error:type should be object"));
625     } else {
626       for (const auto& p : it->second.object_value()) {
627         const std::string& child_name = p.first;
628         if (child_name.empty()) {
629           error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
630               "field:children element error: name cannot be empty"));
631           continue;
632         }
633         RefCountedPtr<LoadBalancingPolicy::Config> child_config;
634         std::vector<grpc_error_handle> child_errors =
635             ParseChildConfig(p.second, &child_config);
636         if (!child_errors.empty()) {
637           error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR_AND_CPP_STRING(
638               absl::StrCat("field:children name:", child_name), &child_errors));
639         } else {
640           cluster_map[child_name] = std::move(child_config);
641           clusters_to_be_used.insert(child_name);
642         }
643       }
644     }
645     if (cluster_map.empty()) {
646       error_list.push_back(
647           GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid children configured"));
648     }
649     if (!error_list.empty()) {
650       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
651           "xds_cluster_manager_experimental LB policy config", &error_list);
652       return nullptr;
653     }
654     return MakeRefCounted<XdsClusterManagerLbConfig>(std::move(cluster_map));
655   }
656 
657  private:
ParseChildConfig(const Json & json,RefCountedPtr<LoadBalancingPolicy::Config> * child_config)658   static std::vector<grpc_error_handle> ParseChildConfig(
659       const Json& json,
660       RefCountedPtr<LoadBalancingPolicy::Config>* child_config) {
661     std::vector<grpc_error_handle> error_list;
662     if (json.type() != Json::Type::OBJECT) {
663       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
664           "value should be of type object"));
665       return error_list;
666     }
667     auto it = json.object_value().find("childPolicy");
668     if (it == json.object_value().end()) {
669       error_list.push_back(
670           GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
671     } else {
672       grpc_error_handle parse_error = GRPC_ERROR_NONE;
673       *child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
674           it->second, &parse_error);
675       if (*child_config == nullptr) {
676         GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
677         std::vector<grpc_error_handle> child_errors;
678         child_errors.push_back(parse_error);
679         error_list.push_back(
680             GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
681       }
682     }
683     return error_list;
684   }
685 };
686 
687 }  // namespace
688 
689 }  // namespace grpc_core
690 
691 //
692 // Plugin registration
693 //
694 
// Plugin init hook: registers the xds_cluster_manager LB policy factory with
// the LB policy registry builder.
void grpc_lb_policy_xds_cluster_manager_init() {
  grpc_core::LoadBalancingPolicyRegistry::Builder::
      RegisterLoadBalancingPolicyFactory(
          absl::make_unique<grpc_core::XdsClusterManagerLbFactory>());
}
700 
// Plugin shutdown hook: a no-op, the body is empty.
void grpc_lb_policy_xds_cluster_manager_shutdown() {}
702