1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "absl/strings/string_view.h"
20 
21 #include <grpc/grpc.h>
22 
23 #include "src/core/ext/filters/client_channel/lb_policy.h"
24 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
25 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
26 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
27 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/ext/xds/xds_client_stats.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/gpr/env.h"
32 #include "src/core/lib/gpr/string.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/gprpp/sync.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
37 
38 namespace grpc_core {
39 
// Trace flag named "xds_cluster_impl_lb".  Every verbose gpr_log() call in
// this file is gated on GRPC_TRACE_FLAG_ENABLED() of this flag; the `false`
// argument presumably makes it default to off (verify against TraceFlag).
TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
41 
42 namespace {
43 
44 //
45 // global circuit breaker atomic map
46 //
47 
// Registry of in-flight call counters keyed by (cluster, EDS service name).
// Every xds_cluster_impl policy instance with the same key obtains the same
// CallCounter through GetOrCreate(), so the circuit-breaking limit is applied
// to the combined load of all of them.  The map stores non-owning pointers;
// each CallCounter removes its own entry when it is destroyed.
class CircuitBreakerCallCounterMap {
 public:
  // Map key: (cluster name, EDS service name).
  using Key =
      std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;

  // Ref-counted atomic counter of calls currently in flight for one key.
  class CallCounter : public RefCounted<CallCounter> {
   public:
    explicit CallCounter(Key key) : key_(std::move(key)) {}
    // Erases this counter's entry from g_call_counter_map, but only if the
    // entry still points at this instance.
    ~CallCounter() override;

    // Adds one in-flight call.  Presumably returns the value from *before*
    // the increment (fetch-add semantics); Picker::Pick() compares the
    // result against the limit with `>=`.
    uint32_t Increment() { return concurrent_requests_.FetchAdd(1); }
    // Removes one in-flight call.
    void Decrement() { concurrent_requests_.FetchSub(1); }

   private:
    Key key_;
    Atomic<uint32_t> concurrent_requests_{0};
  };

  // Returns a ref to the live counter for (cluster, eds_service_name).  A new
  // counter is created when none exists or when the existing one can no
  // longer be ref'd (RefIfNonZero() fails).
  RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
                                         const std::string& eds_service_name);

 private:
  // Guards map_.
  Mutex mu_;
  // Non-owning pointers; entries are erased by ~CallCounter().
  std::map<Key, CallCounter*> map_;
};
73 
// Process-wide counter map.  XdsClusterImplLb::UpdateLocked() and
// ~CallCounter() dereference it without a null check, so it must be created
// before any policy receives its first update.  Its creation is not visible
// in this section (presumably done at plugin init -- verify).
CircuitBreakerCallCounterMap* g_call_counter_map = nullptr;
75 
76 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
GetOrCreate(const std::string & cluster,const std::string & eds_service_name)77 CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
78                                           const std::string& eds_service_name) {
79   Key key(cluster, eds_service_name);
80   RefCountedPtr<CallCounter> result;
81   MutexLock lock(&mu_);
82   auto it = map_.find(key);
83   if (it == map_.end()) {
84     it = map_.insert({key, nullptr}).first;
85   } else {
86     result = it->second->RefIfNonZero();
87   }
88   if (result == nullptr) {
89     result = MakeRefCounted<CallCounter>(std::move(key));
90     it->second = result.get();
91   }
92   return result;
93 }
94 
~CallCounter()95 CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
96   MutexLock lock(&g_call_counter_map->mu_);
97   auto it = g_call_counter_map->map_.find(key_);
98   if (it != g_call_counter_map->map_.end() && it->second == this) {
99     g_call_counter_map->map_.erase(it);
100   }
101 }
102 
103 //
104 // LB policy
105 //
106 
// Policy name, returned by the name() methods of the config, the policy, and
// the factory.
constexpr char kXdsClusterImpl[] = "xds_cluster_impl_experimental";
108 
// TODO(donnadionne): Checks whether circuit breaking is enabled.  This check
// will be removed once the circuit breaking feature is fully integrated and
// enabled by default.
XdsCircuitBreakingEnabled()112 bool XdsCircuitBreakingEnabled() {
113   char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
114   bool parsed_value;
115   bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
116   gpr_free(value);
117   return parse_succeeded && parsed_value;
118 }
119 
120 // Config for xDS Cluster Impl LB policy.
121 class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
122  public:
XdsClusterImplLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,std::string cluster_name,std::string eds_service_name,absl::optional<std::string> lrs_load_reporting_server_name,uint32_t max_concurrent_requests,RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config)123   XdsClusterImplLbConfig(
124       RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
125       std::string cluster_name, std::string eds_service_name,
126       absl::optional<std::string> lrs_load_reporting_server_name,
127       uint32_t max_concurrent_requests,
128       RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config)
129       : child_policy_(std::move(child_policy)),
130         cluster_name_(std::move(cluster_name)),
131         eds_service_name_(std::move(eds_service_name)),
132         lrs_load_reporting_server_name_(
133             std::move(lrs_load_reporting_server_name)),
134         max_concurrent_requests_(max_concurrent_requests),
135         drop_config_(std::move(drop_config)) {}
136 
name() const137   const char* name() const override { return kXdsClusterImpl; }
138 
child_policy() const139   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
140     return child_policy_;
141   }
cluster_name() const142   const std::string& cluster_name() const { return cluster_name_; }
eds_service_name() const143   const std::string& eds_service_name() const { return eds_service_name_; }
lrs_load_reporting_server_name() const144   const absl::optional<std::string>& lrs_load_reporting_server_name() const {
145     return lrs_load_reporting_server_name_;
146   };
max_concurrent_requests() const147   const uint32_t max_concurrent_requests() const {
148     return max_concurrent_requests_;
149   }
drop_config() const150   RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config() const {
151     return drop_config_;
152   }
153 
154  private:
155   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
156   std::string cluster_name_;
157   std::string eds_service_name_;
158   absl::optional<std::string> lrs_load_reporting_server_name_;
159   uint32_t max_concurrent_requests_;
160   RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
161 };
162 
163 // xDS Cluster Impl LB policy.
class XdsClusterImplLb : public LoadBalancingPolicy {
 public:
  // Holds a ref to the XdsClient, which is used to create drop and
  // per-locality stats objects for load reporting.
  XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client, Args args);

  const char* name() const override { return kXdsClusterImpl; }

  void UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Wraps a subchannel created while load reporting is enabled so that it
  // carries its locality's stats object.  Picker::Pick() unwraps it before
  // returning the pick to the channel.
  class StatsSubchannelWrapper : public DelegatingSubchannel {
   public:
    StatsSubchannelWrapper(
        RefCountedPtr<SubchannelInterface> wrapped_subchannel,
        RefCountedPtr<XdsClusterLocalityStats> locality_stats)
        : DelegatingSubchannel(std::move(wrapped_subchannel)),
          locality_stats_(std::move(locality_stats)) {}

    // Non-owning accessor; the wrapper keeps its own ref.
    XdsClusterLocalityStats* locality_stats() const {
      return locality_stats_.get();
    }

   private:
    RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
  };

  // A simple wrapper for ref-counting a picker from the child policy.
  // This lets several Picker instances share one child picker.
  class RefCountedPicker : public RefCounted<RefCountedPicker> {
   public:
    explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
        : picker_(std::move(picker)) {}
    PickResult Pick(PickArgs args) { return picker_->Pick(args); }

   private:
    std::unique_ptr<SubchannelPicker> picker_;
  };

  // A picker that wraps the picker from the child to perform drops.
  // It also enforces the circuit-breaking limit.
  class Picker : public SubchannelPicker {
   public:
    // Snapshots the policy's current config, stats, and call counter.
    Picker(XdsClusterImplLb* xds_cluster_impl_lb,
           RefCountedPtr<RefCountedPicker> picker);

    PickResult Pick(PickArgs args) override;

   private:
    // Values copied from the policy at construction time; Pick() reads only
    // these members, never the policy itself.
    RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
    bool xds_circuit_breaking_enabled_;
    uint32_t max_concurrent_requests_;
    RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
    // Null when load reporting is disabled.
    RefCountedPtr<XdsClusterDropStats> drop_stats_;
    // Child picker; may be null (e.g. when dropping all calls before the
    // child has reported).
    RefCountedPtr<RefCountedPicker> picker_;
  };

  // Helper handed to the child policy.  It forwards to this policy's own
  // helper, wrapping subchannels and pickers as needed, and becomes a no-op
  // once this policy is shutting down.
  class Helper : public ChannelControlHelper {
   public:
    explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
        : xds_cluster_impl_policy_(std::move(xds_cluster_impl_policy)) {}

    ~Helper() override {
      xds_cluster_impl_policy_.reset(DEBUG_LOCATION, "Helper");
    }

    RefCountedPtr<SubchannelInterface> CreateSubchannel(
        ServerAddress address, const grpc_channel_args& args) override;
    void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                     std::unique_ptr<SubchannelPicker> picker) override;
    void RequestReresolution() override;
    void AddTraceEvent(TraceSeverity severity,
                       absl::string_view message) override;

   private:
    RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy_;
  };

  ~XdsClusterImplLb() override;

  void ShutdownLocked() override;

  OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
      const grpc_channel_args* args);
  void UpdateChildPolicyLocked(ServerAddressList addresses,
                               const grpc_channel_args* args);

  // Sends a fresh wrapping Picker up to the parent when there is something
  // to report (see the definition for the exact conditions).
  void MaybeUpdatePickerLocked();

  // Current config, from the most recent UpdateLocked().
  RefCountedPtr<XdsClusterImplLbConfig> config_;

  // Current concurrent number of requests.  Shared with other policies for
  // the same (cluster, EDS service name) via g_call_counter_map.
  RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;

  // Internal state.
  bool shutting_down_ = false;

  // The xds client.
  RefCountedPtr<XdsClient> xds_client_;

  // The stats for client-side load reporting.  Null unless an LRS server
  // name is configured.
  RefCountedPtr<XdsClusterDropStats> drop_stats_;

  OrphanablePtr<LoadBalancingPolicy> child_policy_;

  // Latest state and picker reported by the child policy.
  grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
  absl::Status status_;
  RefCountedPtr<RefCountedPicker> picker_;
};
273 
274 //
275 // XdsClusterImplLb::Picker
276 //
277 
Picker(XdsClusterImplLb * xds_cluster_impl_lb,RefCountedPtr<RefCountedPicker> picker)278 XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
279                                  RefCountedPtr<RefCountedPicker> picker)
280     : call_counter_(xds_cluster_impl_lb->call_counter_),
281       xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()),
282       max_concurrent_requests_(
283           xds_cluster_impl_lb->config_->max_concurrent_requests()),
284       drop_config_(xds_cluster_impl_lb->config_->drop_config()),
285       drop_stats_(xds_cluster_impl_lb->drop_stats_),
286       picker_(std::move(picker)) {
287   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
288     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
289             xds_cluster_impl_lb, this);
290   }
291 }
292 
Pick(LoadBalancingPolicy::PickArgs args)293 LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
294     LoadBalancingPolicy::PickArgs args) {
295   // Handle EDS drops.
296   const std::string* drop_category;
297   if (drop_config_->ShouldDrop(&drop_category)) {
298     if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
299     PickResult result;
300     result.type = PickResult::PICK_COMPLETE;
301     return result;
302   }
303   // Handle circuit breaking.
304   uint32_t current = call_counter_->Increment();
305   if (xds_circuit_breaking_enabled_) {
306     // Check and see if we exceeded the max concurrent requests count.
307     if (current >= max_concurrent_requests_) {
308       call_counter_->Decrement();
309       if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
310       PickResult result;
311       result.type = PickResult::PICK_COMPLETE;
312       return result;
313     }
314   }
315   // If we're not dropping the call, we should always have a child picker.
316   if (picker_ == nullptr) {  // Should never happen.
317     PickResult result;
318     result.type = PickResult::PICK_FAILED;
319     result.error = grpc_error_set_int(
320         GRPC_ERROR_CREATE_FROM_STATIC_STRING(
321             "xds_cluster_impl picker not given any child picker"),
322         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
323     call_counter_->Decrement();
324     return result;
325   }
326   // Not dropping, so delegate to child picker.
327   PickResult result = picker_->Pick(args);
328   if (result.type == result.PICK_COMPLETE && result.subchannel != nullptr) {
329     XdsClusterLocalityStats* locality_stats = nullptr;
330     if (drop_stats_ != nullptr) {  // If load reporting is enabled.
331       auto* subchannel_wrapper =
332           static_cast<StatsSubchannelWrapper*>(result.subchannel.get());
333       // Handle load reporting.
334       locality_stats = subchannel_wrapper->locality_stats()->Ref().release();
335       // Record a call started.
336       locality_stats->AddCallStarted();
337       // Unwrap subchannel to pass back up the stack.
338       result.subchannel = subchannel_wrapper->wrapped_subchannel();
339     }
340     // Intercept the recv_trailing_metadata op to record call completion.
341     auto* call_counter = call_counter_->Ref(DEBUG_LOCATION, "call").release();
342     auto original_recv_trailing_metadata_ready =
343         result.recv_trailing_metadata_ready;
344     result.recv_trailing_metadata_ready =
345         // Note: This callback does not run in either the control plane
346         // work serializer or in the data plane mutex.
347         [locality_stats, original_recv_trailing_metadata_ready, call_counter](
348             grpc_error* error, MetadataInterface* metadata,
349             CallState* call_state) {
350           // Record call completion for load reporting.
351           if (locality_stats != nullptr) {
352             const bool call_failed = error != GRPC_ERROR_NONE;
353             locality_stats->AddCallFinished(call_failed);
354             locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
355           }
356           // Decrement number of calls in flight.
357           call_counter->Decrement();
358           call_counter->Unref(DEBUG_LOCATION, "call");
359           // Invoke the original recv_trailing_metadata_ready callback, if any.
360           if (original_recv_trailing_metadata_ready != nullptr) {
361             original_recv_trailing_metadata_ready(error, metadata, call_state);
362           }
363         };
364   } else {
365     // TODO(roth): We should ideally also record call failures here in the case
366     // where a pick fails.  This is challenging, because we don't know which
367     // picks are for wait_for_ready RPCs or how many times we'll return a
368     // failure for the same wait_for_ready RPC.
369     call_counter_->Decrement();
370   }
371   return result;
372 }
373 
374 //
375 // XdsClusterImplLb
376 //
377 
XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,Args args)378 XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,
379                                    Args args)
380     : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
381   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
382     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] created -- using xds client %p",
383             this, xds_client_.get());
384   }
385 }
386 
~XdsClusterImplLb()387 XdsClusterImplLb::~XdsClusterImplLb() {
388   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
389     gpr_log(GPR_INFO,
390             "[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy",
391             this);
392   }
393 }
394 
ShutdownLocked()395 void XdsClusterImplLb::ShutdownLocked() {
396   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
397     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this);
398   }
399   shutting_down_ = true;
400   // Remove the child policy's interested_parties pollset_set from the
401   // xDS policy.
402   if (child_policy_ != nullptr) {
403     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
404                                      interested_parties());
405     child_policy_.reset();
406   }
407   // Drop our ref to the child's picker, in case it's holding a ref to
408   // the child.
409   picker_.reset();
410   drop_stats_.reset();
411   xds_client_.reset();
412 }
413 
ExitIdleLocked()414 void XdsClusterImplLb::ExitIdleLocked() {
415   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
416 }
417 
ResetBackoffLocked()418 void XdsClusterImplLb::ResetBackoffLocked() {
419   // The XdsClient will have its backoff reset by the xds resolver, so we
420   // don't need to do it here.
421   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
422 }
423 
UpdateLocked(UpdateArgs args)424 void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
425   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
426     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
427   }
428   // Update config.
429   const bool is_initial_update = config_ == nullptr;
430   auto old_config = std::move(config_);
431   config_ = std::move(args.config);
432   // On initial update, create drop stats.
433   if (is_initial_update) {
434     if (config_->lrs_load_reporting_server_name().has_value()) {
435       drop_stats_ = xds_client_->AddClusterDropStats(
436           config_->lrs_load_reporting_server_name().value(),
437           config_->cluster_name(), config_->eds_service_name());
438     }
439     call_counter_ = g_call_counter_map->GetOrCreate(
440         config_->cluster_name(), config_->eds_service_name());
441   } else {
442     // Cluster name, EDS service name, and LRS server name should never
443     // change, because the EDS policy above us should be swapped out if
444     // that happens.
445     GPR_ASSERT(config_->cluster_name() == old_config->cluster_name());
446     GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name());
447     GPR_ASSERT(config_->lrs_load_reporting_server_name() ==
448                old_config->lrs_load_reporting_server_name());
449   }
450   // Update picker if max_concurrent_requests has changed.
451   if (is_initial_update || config_->max_concurrent_requests() !=
452                                old_config->max_concurrent_requests()) {
453     MaybeUpdatePickerLocked();
454   }
455   // Update child policy.
456   UpdateChildPolicyLocked(std::move(args.addresses), args.args);
457   args.args = nullptr;
458 }
459 
MaybeUpdatePickerLocked()460 void XdsClusterImplLb::MaybeUpdatePickerLocked() {
461   // If we're dropping all calls, report READY, regardless of what (or
462   // whether) the child has reported.
463   if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
464     auto drop_picker = absl::make_unique<Picker>(this, picker_);
465     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
466       gpr_log(GPR_INFO,
467               "[xds_cluster_impl_lb %p] updating connectivity (drop all): "
468               "state=READY "
469               "picker=%p",
470               this, drop_picker.get());
471     }
472     channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
473                                           std::move(drop_picker));
474     return;
475   }
476   // Otherwise, update only if we have a child picker.
477   if (picker_ != nullptr) {
478     auto drop_picker = absl::make_unique<Picker>(this, picker_);
479     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
480       gpr_log(GPR_INFO,
481               "[xds_cluster_impl_lb %p] updating connectivity: state=%s "
482               "status=(%s) "
483               "picker=%p",
484               this, ConnectivityStateName(state_), status_.ToString().c_str(),
485               drop_picker.get());
486     }
487     channel_control_helper()->UpdateState(state_, status_,
488                                           std::move(drop_picker));
489   }
490 }
491 
CreateChildPolicyLocked(const grpc_channel_args * args)492 OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
493     const grpc_channel_args* args) {
494   LoadBalancingPolicy::Args lb_policy_args;
495   lb_policy_args.work_serializer = work_serializer();
496   lb_policy_args.args = args;
497   lb_policy_args.channel_control_helper =
498       absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
499   OrphanablePtr<LoadBalancingPolicy> lb_policy =
500       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
501                                          &grpc_xds_cluster_impl_lb_trace);
502   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
503     gpr_log(GPR_INFO,
504             "[xds_cluster_impl_lb %p] Created new child policy handler %p",
505             this, lb_policy.get());
506   }
507   // Add our interested_parties pollset_set to that of the newly created
508   // child policy. This will make the child policy progress upon activity on
509   // this policy, which in turn is tied to the application's call.
510   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
511                                    interested_parties());
512   return lb_policy;
513 }
514 
UpdateChildPolicyLocked(ServerAddressList addresses,const grpc_channel_args * args)515 void XdsClusterImplLb::UpdateChildPolicyLocked(ServerAddressList addresses,
516                                                const grpc_channel_args* args) {
517   // Create policy if needed.
518   if (child_policy_ == nullptr) {
519     child_policy_ = CreateChildPolicyLocked(args);
520   }
521   // Construct update args.
522   UpdateArgs update_args;
523   update_args.addresses = std::move(addresses);
524   update_args.config = config_->child_policy();
525   update_args.args = args;
526   // Update the policy.
527   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
528     gpr_log(GPR_INFO,
529             "[xds_cluster_impl_lb %p] Updating child policy handler %p", this,
530             child_policy_.get());
531   }
532   child_policy_->UpdateLocked(std::move(update_args));
533 }
534 
535 //
536 // XdsClusterImplLb::Helper
537 //
538 
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)539 RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
540     ServerAddress address, const grpc_channel_args& args) {
541   if (xds_cluster_impl_policy_->shutting_down_) return nullptr;
542   // If load reporting is enabled, wrap the subchannel such that it
543   // includes the locality stats object, which will be used by the EdsPicker.
544   if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server_name()
545           .has_value()) {
546     RefCountedPtr<XdsLocalityName> locality_name;
547     auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey);
548     if (attribute != nullptr) {
549       const auto* locality_attr =
550           static_cast<const XdsLocalityAttribute*>(attribute);
551       locality_name = locality_attr->locality_name();
552     }
553     RefCountedPtr<XdsClusterLocalityStats> locality_stats =
554         xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats(
555             *xds_cluster_impl_policy_->config_
556                  ->lrs_load_reporting_server_name(),
557             xds_cluster_impl_policy_->config_->cluster_name(),
558             xds_cluster_impl_policy_->config_->eds_service_name(),
559             std::move(locality_name));
560     return MakeRefCounted<StatsSubchannelWrapper>(
561         xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
562             std::move(address), args),
563         std::move(locality_stats));
564   }
565   // Load reporting not enabled, so don't wrap the subchannel.
566   return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
567       std::move(address), args);
568 }
569 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)570 void XdsClusterImplLb::Helper::UpdateState(
571     grpc_connectivity_state state, const absl::Status& status,
572     std::unique_ptr<SubchannelPicker> picker) {
573   if (xds_cluster_impl_policy_->shutting_down_) return;
574   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
575     gpr_log(GPR_INFO,
576             "[xds_cluster_impl_lb %p] child connectivity state update: "
577             "state=%s (%s) "
578             "picker=%p",
579             xds_cluster_impl_policy_.get(), ConnectivityStateName(state),
580             status.ToString().c_str(), picker.get());
581   }
582   // Save the state and picker.
583   xds_cluster_impl_policy_->state_ = state;
584   xds_cluster_impl_policy_->status_ = status;
585   xds_cluster_impl_policy_->picker_ =
586       MakeRefCounted<RefCountedPicker>(std::move(picker));
587   // Wrap the picker and return it to the channel.
588   xds_cluster_impl_policy_->MaybeUpdatePickerLocked();
589 }
590 
RequestReresolution()591 void XdsClusterImplLb::Helper::RequestReresolution() {
592   if (xds_cluster_impl_policy_->shutting_down_) return;
593   xds_cluster_impl_policy_->channel_control_helper()->RequestReresolution();
594 }
595 
AddTraceEvent(TraceSeverity severity,absl::string_view message)596 void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity,
597                                              absl::string_view message) {
598   if (xds_cluster_impl_policy_->shutting_down_) return;
599   xds_cluster_impl_policy_->channel_control_helper()->AddTraceEvent(severity,
600                                                                     message);
601 }
602 
603 //
604 // factory
605 //
606 
607 class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
608  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const609   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
610       LoadBalancingPolicy::Args args) const override {
611     grpc_error* error = GRPC_ERROR_NONE;
612     RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
613     if (error != GRPC_ERROR_NONE) {
614       gpr_log(
615           GPR_ERROR,
616           "cannot get XdsClient to instantiate xds_cluster_impl LB policy: %s",
617           grpc_error_string(error));
618       GRPC_ERROR_UNREF(error);
619       return nullptr;
620     }
621     return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
622                                             std::move(args));
623   }
624 
name() const625   const char* name() const override { return kXdsClusterImpl; }
626 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const627   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
628       const Json& json, grpc_error** error) const override {
629     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
630     if (json.type() == Json::Type::JSON_NULL) {
631       // This policy was configured in the deprecated loadBalancingPolicy
632       // field or in the client API.
633       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
634           "field:loadBalancingPolicy error:xds_cluster_impl policy requires "
635           "configuration. Please use loadBalancingConfig field of service "
636           "config instead.");
637       return nullptr;
638     }
639     std::vector<grpc_error*> error_list;
640     // Child policy.
641     RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
642     auto it = json.object_value().find("childPolicy");
643     if (it == json.object_value().end()) {
644       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
645           "field:childPolicy error:required field missing"));
646     } else {
647       grpc_error* parse_error = GRPC_ERROR_NONE;
648       child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
649           it->second, &parse_error);
650       if (child_policy == nullptr) {
651         GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
652         std::vector<grpc_error*> child_errors;
653         child_errors.push_back(parse_error);
654         error_list.push_back(
655             GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
656       }
657     }
658     // Cluster name.
659     std::string cluster_name;
660     it = json.object_value().find("clusterName");
661     if (it == json.object_value().end()) {
662       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
663           "field:clusterName error:required field missing"));
664     } else if (it->second.type() != Json::Type::STRING) {
665       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
666           "field:clusterName error:type should be string"));
667     } else {
668       cluster_name = it->second.string_value();
669     }
670     // EDS service name.
671     std::string eds_service_name;
672     it = json.object_value().find("edsServiceName");
673     if (it != json.object_value().end()) {
674       if (it->second.type() != Json::Type::STRING) {
675         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
676             "field:edsServiceName error:type should be string"));
677       } else {
678         eds_service_name = it->second.string_value();
679       }
680     }
681     // LRS load reporting server name.
682     absl::optional<std::string> lrs_load_reporting_server_name;
683     it = json.object_value().find("lrsLoadReportingServerName");
684     if (it != json.object_value().end()) {
685       if (it->second.type() != Json::Type::STRING) {
686         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
687             "field:lrsLoadReportingServerName error:type should be string"));
688       } else {
689         lrs_load_reporting_server_name = it->second.string_value();
690       }
691     }
692     // Max concurrent requests.
693     uint32_t max_concurrent_requests = 1024;
694     it = json.object_value().find("maxConcurrentRequests");
695     if (it != json.object_value().end()) {
696       if (it->second.type() != Json::Type::NUMBER) {
697         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
698             "field:max_concurrent_requests error:must be of type number"));
699       } else {
700         max_concurrent_requests =
701             gpr_parse_nonnegative_int(it->second.string_value().c_str());
702       }
703     }
704     // Drop config.
705     auto drop_config = MakeRefCounted<XdsApi::EdsUpdate::DropConfig>();
706     it = json.object_value().find("dropCategories");
707     if (it == json.object_value().end()) {
708       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
709           "field:dropCategories error:required field missing"));
710     } else {
711       std::vector<grpc_error*> child_errors =
712           ParseDropCategories(it->second, drop_config.get());
713       if (!child_errors.empty()) {
714         error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
715             "field:dropCategories", &child_errors));
716       }
717     }
718     if (!error_list.empty()) {
719       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
720           "xds_cluster_impl_experimental LB policy config", &error_list);
721       return nullptr;
722     }
723     return MakeRefCounted<XdsClusterImplLbConfig>(
724         std::move(child_policy), std::move(cluster_name),
725         std::move(eds_service_name), std::move(lrs_load_reporting_server_name),
726         max_concurrent_requests, std::move(drop_config));
727   }
728 
729  private:
ParseDropCategories(const Json & json,XdsApi::EdsUpdate::DropConfig * drop_config)730   static std::vector<grpc_error*> ParseDropCategories(
731       const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) {
732     std::vector<grpc_error*> error_list;
733     if (json.type() != Json::Type::ARRAY) {
734       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
735           "dropCategories field is not an array"));
736       return error_list;
737     }
738     for (size_t i = 0; i < json.array_value().size(); ++i) {
739       const Json& entry = json.array_value()[i];
740       std::vector<grpc_error*> child_errors =
741           ParseDropCategory(entry, drop_config);
742       if (!child_errors.empty()) {
743         grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
744             absl::StrCat("errors parsing index ", i).c_str());
745         for (size_t i = 0; i < child_errors.size(); ++i) {
746           error = grpc_error_add_child(error, child_errors[i]);
747         }
748         error_list.push_back(error);
749       }
750     }
751     return error_list;
752   }
753 
ParseDropCategory(const Json & json,XdsApi::EdsUpdate::DropConfig * drop_config)754   static std::vector<grpc_error*> ParseDropCategory(
755       const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) {
756     std::vector<grpc_error*> error_list;
757     if (json.type() != Json::Type::OBJECT) {
758       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
759           "dropCategories entry is not an object"));
760       return error_list;
761     }
762     std::string category;
763     auto it = json.object_value().find("category");
764     if (it == json.object_value().end()) {
765       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
766           "\"category\" field not present"));
767     } else if (it->second.type() != Json::Type::STRING) {
768       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
769           "\"category\" field is not a string"));
770     } else {
771       category = it->second.string_value();
772     }
773     uint32_t requests_per_million = 0;
774     it = json.object_value().find("requests_per_million");
775     if (it == json.object_value().end()) {
776       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
777           "\"requests_per_million\" field is not present"));
778     } else if (it->second.type() != Json::Type::NUMBER) {
779       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
780           "\"requests_per_million\" field is not a number"));
781     } else {
782       requests_per_million =
783           gpr_parse_nonnegative_int(it->second.string_value().c_str());
784     }
785     if (error_list.empty()) {
786       drop_config->AddCategory(std::move(category), requests_per_million);
787     }
788     return error_list;
789   }
790 };
791 
792 }  // namespace
793 
794 }  // namespace grpc_core
795 
796 //
797 // Plugin registration
798 //
799 
grpc_lb_policy_xds_cluster_impl_init()800 void grpc_lb_policy_xds_cluster_impl_init() {
801   grpc_core::g_call_counter_map = new grpc_core::CircuitBreakerCallCounterMap();
802   grpc_core::LoadBalancingPolicyRegistry::Builder::
803       RegisterLoadBalancingPolicyFactory(
804           absl::make_unique<grpc_core::XdsClusterImplLbFactory>());
805 }
806 
grpc_lb_policy_xds_cluster_impl_shutdown()807 void grpc_lb_policy_xds_cluster_impl_shutdown() {
808   delete grpc_core::g_call_counter_map;
809 }
810