1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <inttypes.h>
20 #include <limits.h>
21 
22 #include "absl/strings/str_cat.h"
23 #include "absl/types/optional.h"
24 
25 #include <grpc/grpc.h>
26 
27 #include "src/core/ext/filters/client_channel/client_channel.h"
28 #include "src/core/ext/filters/client_channel/lb_policy.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
32 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
33 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
34 #include "src/core/ext/filters/client_channel/server_address.h"
35 #include "src/core/ext/xds/xds_channel_args.h"
36 #include "src/core/ext/xds/xds_client.h"
37 #include "src/core/ext/xds/xds_client_stats.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/gpr/string.h"
40 #include "src/core/lib/gprpp/orphanable.h"
41 #include "src/core/lib/gprpp/ref_counted_ptr.h"
42 #include "src/core/lib/iomgr/work_serializer.h"
43 #include "src/core/lib/transport/error_utils.h"
44 #include "src/core/lib/uri/uri_parser.h"
45 
46 #define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000
47 
48 namespace grpc_core {
49 
// Tracer named "eds_lb" (disabled by default).  Gates every
// GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace) log statement in this file.
TraceFlag grpc_lb_eds_trace(false, "eds_lb");

// Attribute key under which each address handed to the child policy carries
// its xDS locality (attached in EdsLb::CreateChildPolicyAddressesLocked()).
const char* kXdsLocalityNameAttributeKey = "xds_locality_name";
53 
54 namespace {
55 
// Policy name returned by name() of the EDS config, the EDS policy and its
// factory.
constexpr char kEds[] = "eds_experimental";
57 
58 // Config for EDS LB policy.
59 class EdsLbConfig : public LoadBalancingPolicy::Config {
60  public:
EdsLbConfig(std::string cluster_name,std::string eds_service_name,absl::optional<std::string> lrs_load_reporting_server_name,Json locality_picking_policy,Json endpoint_picking_policy,uint32_t max_concurrent_requests)61   EdsLbConfig(std::string cluster_name, std::string eds_service_name,
62               absl::optional<std::string> lrs_load_reporting_server_name,
63               Json locality_picking_policy, Json endpoint_picking_policy,
64               uint32_t max_concurrent_requests)
65       : cluster_name_(std::move(cluster_name)),
66         eds_service_name_(std::move(eds_service_name)),
67         lrs_load_reporting_server_name_(
68             std::move(lrs_load_reporting_server_name)),
69         locality_picking_policy_(std::move(locality_picking_policy)),
70         endpoint_picking_policy_(std::move(endpoint_picking_policy)),
71         max_concurrent_requests_(max_concurrent_requests) {}
72 
name() const73   const char* name() const override { return kEds; }
74 
cluster_name() const75   const std::string& cluster_name() const { return cluster_name_; }
eds_service_name() const76   const std::string& eds_service_name() const { return eds_service_name_; }
lrs_load_reporting_server_name() const77   const absl::optional<std::string>& lrs_load_reporting_server_name() const {
78     return lrs_load_reporting_server_name_;
79   };
locality_picking_policy() const80   const Json& locality_picking_policy() const {
81     return locality_picking_policy_;
82   }
endpoint_picking_policy() const83   const Json& endpoint_picking_policy() const {
84     return endpoint_picking_policy_;
85   }
max_concurrent_requests() const86   const uint32_t max_concurrent_requests() const {
87     return max_concurrent_requests_;
88   }
89 
90  private:
91   std::string cluster_name_;
92   std::string eds_service_name_;
93   absl::optional<std::string> lrs_load_reporting_server_name_;
94   Json locality_picking_policy_;
95   Json endpoint_picking_policy_;
96   uint32_t max_concurrent_requests_;
97 };
98 
99 // EDS LB policy.
100 class EdsLb : public LoadBalancingPolicy {
101  public:
102   EdsLb(RefCountedPtr<XdsClient> xds_client, Args args);
103 
name() const104   const char* name() const override { return kEds; }
105 
106   void UpdateLocked(UpdateArgs args) override;
107   void ResetBackoffLocked() override;
108 
109  private:
110   class EndpointWatcher : public XdsClient::EndpointWatcherInterface {
111    public:
EndpointWatcher(RefCountedPtr<EdsLb> parent)112     explicit EndpointWatcher(RefCountedPtr<EdsLb> parent)
113         : parent_(std::move(parent)) {}
OnEndpointChanged(XdsApi::EdsUpdate update)114     void OnEndpointChanged(XdsApi::EdsUpdate update) override {
115       new Notifier(parent_, std::move(update));
116     }
OnError(grpc_error * error)117     void OnError(grpc_error* error) override { new Notifier(parent_, error); }
OnResourceDoesNotExist()118     void OnResourceDoesNotExist() override { new Notifier(parent_); }
119 
120    private:
121     class Notifier {
122      public:
123       Notifier(RefCountedPtr<EdsLb> parent, XdsApi::EdsUpdate update);
124       Notifier(RefCountedPtr<EdsLb> parent, grpc_error* error);
125       explicit Notifier(RefCountedPtr<EdsLb> parent);
126 
127      private:
128       enum Type { kUpdate, kError, kDoesNotExist };
129 
130       static void RunInExecCtx(void* arg, grpc_error* error);
131       void RunInWorkSerializer(grpc_error* error);
132 
133       RefCountedPtr<EdsLb> parent_;
134       grpc_closure closure_;
135       XdsApi::EdsUpdate update_;
136       Type type_;
137     };
138 
139     RefCountedPtr<EdsLb> parent_;
140   };
141 
142   class Helper : public ChannelControlHelper {
143    public:
Helper(RefCountedPtr<EdsLb> eds_policy)144     explicit Helper(RefCountedPtr<EdsLb> eds_policy)
145         : eds_policy_(std::move(eds_policy)) {}
146 
~Helper()147     ~Helper() override { eds_policy_.reset(DEBUG_LOCATION, "Helper"); }
148 
149     RefCountedPtr<SubchannelInterface> CreateSubchannel(
150         ServerAddress address, const grpc_channel_args& args) override;
151     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
152                      std::unique_ptr<SubchannelPicker> picker) override;
153     // This is a no-op, because we get the addresses from the xds
154     // client, which is a watch-based API.
RequestReresolution()155     void RequestReresolution() override {}
156     void AddTraceEvent(TraceSeverity severity,
157                        absl::string_view message) override;
158 
159    private:
160     RefCountedPtr<EdsLb> eds_policy_;
161   };
162 
163   ~EdsLb() override;
164 
165   void ShutdownLocked() override;
166 
167   void OnEndpointChanged(XdsApi::EdsUpdate update);
168   void OnError(grpc_error* error);
169   void OnResourceDoesNotExist();
170 
171   void MaybeDestroyChildPolicyLocked();
172 
173   void UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list);
174   void UpdateChildPolicyLocked();
175   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
176       const grpc_channel_args* args);
177   ServerAddressList CreateChildPolicyAddressesLocked();
178   RefCountedPtr<Config> CreateChildPolicyConfigLocked();
179   grpc_channel_args* CreateChildPolicyArgsLocked(
180       const grpc_channel_args* args_in);
181 
182   // Caller must ensure that config_ is set before calling.
GetEdsResourceName() const183   const absl::string_view GetEdsResourceName() const {
184     if (!is_xds_uri_) return server_name_;
185     if (!config_->eds_service_name().empty()) {
186       return config_->eds_service_name();
187     }
188     return config_->cluster_name();
189   }
190 
191   // Returns a pair containing the cluster and eds_service_name to use
192   // for LRS load reporting.
193   // Caller must ensure that config_ is set before calling.
GetLrsClusterKey() const194   std::pair<absl::string_view, absl::string_view> GetLrsClusterKey() const {
195     if (!is_xds_uri_) return {server_name_, nullptr};
196     return {config_->cluster_name(), config_->eds_service_name()};
197   }
198 
199   // Server name from target URI.
200   std::string server_name_;
201   bool is_xds_uri_;
202 
203   // Current channel args and config from the resolver.
204   const grpc_channel_args* args_ = nullptr;
205   RefCountedPtr<EdsLbConfig> config_;
206 
207   // Internal state.
208   bool shutting_down_ = false;
209 
210   // The xds client and endpoint watcher.
211   RefCountedPtr<XdsClient> xds_client_;
212   // A pointer to the endpoint watcher, to be used when cancelling the watch.
213   // Note that this is not owned, so this pointer must never be derefernced.
214   EndpointWatcher* endpoint_watcher_ = nullptr;
215   // The latest data from the endpoint watcher.
216   XdsApi::EdsUpdate::PriorityList priority_list_;
217   // State used to retain child policy names for priority policy.
218   std::vector<size_t /*child_number*/> priority_child_numbers_;
219 
220   RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
221 
222   OrphanablePtr<LoadBalancingPolicy> child_policy_;
223 };
224 
225 //
226 // EdsLb::Helper
227 //
228 
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)229 RefCountedPtr<SubchannelInterface> EdsLb::Helper::CreateSubchannel(
230     ServerAddress address, const grpc_channel_args& args) {
231   if (eds_policy_->shutting_down_) return nullptr;
232   return eds_policy_->channel_control_helper()->CreateSubchannel(
233       std::move(address), args);
234 }
235 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)236 void EdsLb::Helper::UpdateState(grpc_connectivity_state state,
237                                 const absl::Status& status,
238                                 std::unique_ptr<SubchannelPicker> picker) {
239   if (eds_policy_->shutting_down_ || eds_policy_->child_policy_ == nullptr) {
240     return;
241   }
242   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
243     gpr_log(GPR_INFO, "[edslb %p] child policy updated state=%s (%s) picker=%p",
244             eds_policy_.get(), ConnectivityStateName(state),
245             status.ToString().c_str(), picker.get());
246   }
247   eds_policy_->channel_control_helper()->UpdateState(state, status,
248                                                      std::move(picker));
249 }
250 
AddTraceEvent(TraceSeverity severity,absl::string_view message)251 void EdsLb::Helper::AddTraceEvent(TraceSeverity severity,
252                                   absl::string_view message) {
253   if (eds_policy_->shutting_down_) return;
254   eds_policy_->channel_control_helper()->AddTraceEvent(severity, message);
255 }
256 
257 //
258 // EdsLb::EndpointWatcher::Notifier
259 //
260 
Notifier(RefCountedPtr<EdsLb> parent,XdsApi::EdsUpdate update)261 EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr<EdsLb> parent,
262                                            XdsApi::EdsUpdate update)
263     : parent_(std::move(parent)), update_(std::move(update)), type_(kUpdate) {
264   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
265   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
266 }
267 
Notifier(RefCountedPtr<EdsLb> parent,grpc_error * error)268 EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr<EdsLb> parent,
269                                            grpc_error* error)
270     : parent_(std::move(parent)), type_(kError) {
271   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
272   ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
273 }
274 
Notifier(RefCountedPtr<EdsLb> parent)275 EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr<EdsLb> parent)
276     : parent_(std::move(parent)), type_(kDoesNotExist) {
277   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
278   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
279 }
280 
RunInExecCtx(void * arg,grpc_error * error)281 void EdsLb::EndpointWatcher::Notifier::RunInExecCtx(void* arg,
282                                                     grpc_error* error) {
283   Notifier* self = static_cast<Notifier*>(arg);
284   GRPC_ERROR_REF(error);
285   self->parent_->work_serializer()->Run(
286       [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
287 }
288 
RunInWorkSerializer(grpc_error * error)289 void EdsLb::EndpointWatcher::Notifier::RunInWorkSerializer(grpc_error* error) {
290   switch (type_) {
291     case kUpdate:
292       parent_->OnEndpointChanged(std::move(update_));
293       break;
294     case kError:
295       parent_->OnError(error);
296       break;
297     case kDoesNotExist:
298       parent_->OnResourceDoesNotExist();
299       break;
300   };
301   delete this;
302 }
303 
304 //
305 // EdsLb public methods
306 //
307 
EdsLb(RefCountedPtr<XdsClient> xds_client,Args args)308 EdsLb::EdsLb(RefCountedPtr<XdsClient> xds_client, Args args)
309     : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
310   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
311     gpr_log(GPR_INFO, "[edslb %p] created -- using xds client %p", this,
312             xds_client_.get());
313   }
314   // Record server name.
315   const char* server_uri =
316       grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
317   GPR_ASSERT(server_uri != nullptr);
318   grpc_uri* uri = grpc_uri_parse(server_uri, true);
319   GPR_ASSERT(uri->path[0] != '\0');
320   server_name_ = uri->path[0] == '/' ? uri->path + 1 : uri->path;
321   is_xds_uri_ = strcmp(uri->scheme, "xds") == 0;
322   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
323     gpr_log(GPR_INFO, "[edslb %p] server name from channel (is_xds_uri=%d): %s",
324             this, is_xds_uri_, server_name_.c_str());
325   }
326   grpc_uri_destroy(uri);
327   // EDS-only flow.
328   if (!is_xds_uri_) {
329     // Setup channelz linkage.
330     channelz::ChannelNode* parent_channelz_node =
331         grpc_channel_args_find_pointer<channelz::ChannelNode>(
332             args.args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
333     if (parent_channelz_node != nullptr) {
334       xds_client_->AddChannelzLinkage(parent_channelz_node);
335     }
336     // Couple polling.
337     grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
338                                      interested_parties());
339   }
340 }
341 
~EdsLb()342 EdsLb::~EdsLb() {
343   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
344     gpr_log(GPR_INFO, "[edslb %p] destroying eds LB policy", this);
345   }
346 }
347 
ShutdownLocked()348 void EdsLb::ShutdownLocked() {
349   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
350     gpr_log(GPR_INFO, "[edslb %p] shutting down", this);
351   }
352   shutting_down_ = true;
353   MaybeDestroyChildPolicyLocked();
354   // Cancel watcher.
355   if (endpoint_watcher_ != nullptr) {
356     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
357       gpr_log(GPR_INFO, "[edslb %p] cancelling xds watch for %s", this,
358               std::string(GetEdsResourceName()).c_str());
359     }
360     xds_client_->CancelEndpointDataWatch(GetEdsResourceName(),
361                                          endpoint_watcher_);
362   }
363   if (!is_xds_uri_) {
364     // Remove channelz linkage.
365     channelz::ChannelNode* parent_channelz_node =
366         grpc_channel_args_find_pointer<channelz::ChannelNode>(
367             args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
368     if (parent_channelz_node != nullptr) {
369       xds_client_->RemoveChannelzLinkage(parent_channelz_node);
370     }
371     // Decouple polling.
372     grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
373                                      interested_parties());
374   }
375   xds_client_.reset(DEBUG_LOCATION, "EdsLb");
376   // Destroy channel args.
377   grpc_channel_args_destroy(args_);
378   args_ = nullptr;
379 }
380 
MaybeDestroyChildPolicyLocked()381 void EdsLb::MaybeDestroyChildPolicyLocked() {
382   if (child_policy_ != nullptr) {
383     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
384                                      interested_parties());
385     child_policy_.reset();
386   }
387 }
388 
UpdateLocked(UpdateArgs args)389 void EdsLb::UpdateLocked(UpdateArgs args) {
390   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
391     gpr_log(GPR_INFO, "[edslb %p] Received update", this);
392   }
393   const bool is_initial_update = args_ == nullptr;
394   // Update config.
395   auto old_config = std::move(config_);
396   config_ = std::move(args.config);
397   // Update args.
398   grpc_channel_args_destroy(args_);
399   args_ = args.args;
400   args.args = nullptr;
401   // Update child policy if needed.
402   if (child_policy_ != nullptr) UpdateChildPolicyLocked();
403   // Create endpoint watcher if needed.
404   if (is_initial_update) {
405     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
406       gpr_log(GPR_INFO, "[edslb %p] starting xds watch for %s", this,
407               std::string(GetEdsResourceName()).c_str());
408     }
409     auto watcher = absl::make_unique<EndpointWatcher>(
410         Ref(DEBUG_LOCATION, "EndpointWatcher"));
411     endpoint_watcher_ = watcher.get();
412     xds_client_->WatchEndpointData(GetEdsResourceName(), std::move(watcher));
413   }
414 }
415 
ResetBackoffLocked()416 void EdsLb::ResetBackoffLocked() {
417   // When the XdsClient is instantiated in the resolver instead of in this
418   // LB policy, this is done via the resolver, so we don't need to do it here.
419   if (!is_xds_uri_ && xds_client_ != nullptr) xds_client_->ResetBackoff();
420   if (child_policy_ != nullptr) {
421     child_policy_->ResetBackoffLocked();
422   }
423 }
424 
OnEndpointChanged(XdsApi::EdsUpdate update)425 void EdsLb::OnEndpointChanged(XdsApi::EdsUpdate update) {
426   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
427     gpr_log(GPR_INFO, "[edslb %p] Received EDS update from xds client", this);
428   }
429   // Update the drop config.
430   drop_config_ = std::move(update.drop_config);
431   // If priority list is empty, add a single priority, just so that we
432   // have a child in which to create the xds_cluster_impl policy.
433   if (update.priorities.empty()) update.priorities.emplace_back();
434   // Update child policy.
435   UpdatePriorityList(std::move(update.priorities));
436 }
437 
OnError(grpc_error * error)438 void EdsLb::OnError(grpc_error* error) {
439   gpr_log(GPR_ERROR, "[edslb %p] xds watcher reported error: %s", this,
440           grpc_error_string(error));
441   // Go into TRANSIENT_FAILURE if we have not yet created the child
442   // policy (i.e., we have not yet received data from xds).  Otherwise,
443   // we keep running with the data we had previously.
444   if (child_policy_ == nullptr) {
445     channel_control_helper()->UpdateState(
446         GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
447         absl::make_unique<TransientFailurePicker>(error));
448   } else {
449     GRPC_ERROR_UNREF(error);
450   }
451 }
452 
OnResourceDoesNotExist()453 void EdsLb::OnResourceDoesNotExist() {
454   gpr_log(
455       GPR_ERROR,
456       "[edslb %p] EDS resource does not exist -- reporting TRANSIENT_FAILURE",
457       this);
458   grpc_error* error = grpc_error_set_int(
459       GRPC_ERROR_CREATE_FROM_STATIC_STRING("EDS resource does not exist"),
460       GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
461   channel_control_helper()->UpdateState(
462       GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
463       absl::make_unique<TransientFailurePicker>(error));
464   MaybeDestroyChildPolicyLocked();
465 }
466 
467 //
468 // child policy-related methods
469 //
470 
UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list)471 void EdsLb::UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list) {
472   // Build some maps from locality to child number and the reverse from
473   // the old data in priority_list_ and priority_child_numbers_.
474   std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
475       locality_child_map;
476   std::map<size_t, std::set<XdsLocalityName*>> child_locality_map;
477   for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
478     size_t child_number = priority_child_numbers_[priority];
479     const auto& localities = priority_list_[priority].localities;
480     for (const auto& p : localities) {
481       XdsLocalityName* locality_name = p.first;
482       locality_child_map[locality_name] = child_number;
483       child_locality_map[child_number].insert(locality_name);
484     }
485   }
486   // Construct new list of children.
487   std::vector<size_t> priority_child_numbers;
488   for (size_t priority = 0; priority < priority_list.size(); ++priority) {
489     const auto& localities = priority_list[priority].localities;
490     absl::optional<size_t> child_number;
491     // If one of the localities in this priority already existed, reuse its
492     // child number.
493     for (const auto& p : localities) {
494       XdsLocalityName* locality_name = p.first;
495       if (!child_number.has_value()) {
496         auto it = locality_child_map.find(locality_name);
497         if (it != locality_child_map.end()) {
498           child_number = it->second;
499           locality_child_map.erase(it);
500           // Remove localities that *used* to be in this child number, so
501           // that we don't incorrectly reuse this child number for a
502           // subsequent priority.
503           for (XdsLocalityName* old_locality :
504                child_locality_map[*child_number]) {
505             locality_child_map.erase(old_locality);
506           }
507         }
508       } else {
509         // Remove all localities that are now in this child number, so
510         // that we don't accidentally reuse this child number for a
511         // subsequent priority.
512         locality_child_map.erase(locality_name);
513       }
514     }
515     // If we didn't find an existing child number, assign a new one.
516     if (!child_number.has_value()) {
517       for (child_number = 0;
518            child_locality_map.find(*child_number) != child_locality_map.end();
519            ++(*child_number)) {
520       }
521       // Add entry so we know that the child number is in use.
522       // (Don't need to add the list of localities, since we won't use them.)
523       child_locality_map[*child_number];
524     }
525     priority_child_numbers.push_back(*child_number);
526   }
527   // Save update.
528   priority_list_ = std::move(priority_list);
529   priority_child_numbers_ = std::move(priority_child_numbers);
530   // Update child policy.
531   UpdateChildPolicyLocked();
532 }
533 
CreateChildPolicyAddressesLocked()534 ServerAddressList EdsLb::CreateChildPolicyAddressesLocked() {
535   ServerAddressList addresses;
536   for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
537     const auto& localities = priority_list_[priority].localities;
538     std::string priority_child_name =
539         absl::StrCat("child", priority_child_numbers_[priority]);
540     for (const auto& p : localities) {
541       const auto& locality_name = p.first;
542       const auto& locality = p.second;
543       std::vector<std::string> hierarchical_path = {
544           priority_child_name, locality_name->AsHumanReadableString()};
545       for (const auto& endpoint : locality.endpoints) {
546         addresses.emplace_back(
547             endpoint
548                 .WithAttribute(kHierarchicalPathAttributeKey,
549                                MakeHierarchicalPathAttribute(hierarchical_path))
550                 .WithAttribute(kXdsLocalityNameAttributeKey,
551                                absl::make_unique<XdsLocalityAttribute>(
552                                    locality_name->Ref())));
553       }
554     }
555   }
556   return addresses;
557 }
558 
// Generates and parses the config for the "priority_experimental" child:
//   {"priority_experimental": {
//      "children": {"child<N>": {
//          "config": [{"xds_cluster_impl_experimental": {
//              clusterName, childPolicy (locality-picking policy with
//              "targets"), dropCategories, maxConcurrentRequests,
//              [edsServiceName], [lrsLoadReportingServerName]}}],
//          "ignore_reresolution_requests": true}, ...},
//      "priorities": ["child<N>", ...]}}
// If the generated JSON fails to parse, it puts the channel in
// TRANSIENT_FAILURE and returns nullptr.
// NOTE(review): dereferences drop_config_ without a null check; assumes the
// xds client always supplies one -- TODO confirm.
RefCountedPtr<LoadBalancingPolicy::Config>
EdsLb::CreateChildPolicyConfigLocked() {
  const auto lrs_key = GetLrsClusterKey();
  Json::Object priority_children;
  Json::Array priority_priorities;
  for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
    const auto& localities = priority_list_[priority].localities;
    Json::Object weighted_targets;
    for (const auto& p : localities) {
      XdsLocalityName* locality_name = p.first;
      const auto& locality = p.second;
      // Construct JSON object containing locality name.
      Json::Object locality_name_json;
      if (!locality_name->region().empty()) {
        locality_name_json["region"] = locality_name->region();
      }
      if (!locality_name->zone().empty()) {
        locality_name_json["zone"] = locality_name->zone();
      }
      if (!locality_name->sub_zone().empty()) {
        locality_name_json["subzone"] = locality_name->sub_zone();
      }
      // Add weighted target entry.
      weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{
          {"weight", locality.lb_weight},
          {"childPolicy", config_->endpoint_picking_policy()},
      };
    }
    // Construct locality-picking policy.
    // Start with field from our config and add the "targets" field.
    // Assumes the configured policy is a non-empty array whose first element
    // is an object with at least one entry (the latter is asserted).
    Json locality_picking_config = config_->locality_picking_policy();
    Json::Object& config =
        *(*locality_picking_config.mutable_array())[0].mutable_object();
    auto it = config.begin();
    GPR_ASSERT(it != config.end());
    (*it->second.mutable_object())["targets"] = std::move(weighted_targets);
    // Wrap it in the drop policy.
    Json::Array drop_categories;
    for (const auto& category : drop_config_->drop_category_list()) {
      drop_categories.push_back(Json::Object{
          {"category", category.name},
          {"requests_per_million", category.parts_per_million},
      });
    }
    Json::Object xds_cluster_impl_config = {
        {"clusterName", std::string(lrs_key.first)},
        {"childPolicy", std::move(locality_picking_config)},
        {"dropCategories", std::move(drop_categories)},
        {"maxConcurrentRequests", config_->max_concurrent_requests()},
    };
    if (!lrs_key.second.empty()) {
      xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second);
    }
    if (config_->lrs_load_reporting_server_name().has_value()) {
      xds_cluster_impl_config["lrsLoadReportingServerName"] =
          config_->lrs_load_reporting_server_name().value();
    }
    Json locality_picking_policy = Json::Array{Json::Object{
        {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)},
    }};
    // Add priority entry.
    const size_t child_number = priority_child_numbers_[priority];
    std::string child_name = absl::StrCat("child", child_number);
    priority_priorities.emplace_back(child_name);
    priority_children[child_name] = Json::Object{
        {"config", std::move(locality_picking_policy)},
        {"ignore_reresolution_requests", true},
    };
  }
  Json json = Json::Array{Json::Object{
      {"priority_experimental",
       Json::Object{
           {"children", std::move(priority_children)},
           {"priorities", std::move(priority_priorities)},
       }},
  }};
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
    std::string json_str = json.Dump(/*indent=*/1);
    gpr_log(GPR_INFO, "[edslb %p] generated config for child policy: %s", this,
            json_str.c_str());
  }
  grpc_error* error = GRPC_ERROR_NONE;
  RefCountedPtr<LoadBalancingPolicy::Config> config =
      LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
  if (error != GRPC_ERROR_NONE) {
    // This should never happen, but if it does, we basically have no
    // way to fix it, so we put the channel in TRANSIENT_FAILURE.
    gpr_log(GPR_ERROR,
            "[edslb %p] error parsing generated child policy config -- "
            "will put channel in TRANSIENT_FAILURE: %s",
            this, grpc_error_string(error));
    error = grpc_error_set_int(
        grpc_error_add_child(
            GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                "eds LB policy: error parsing generated child policy config"),
            error),
        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
    channel_control_helper()->UpdateState(
        GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
        absl::make_unique<TransientFailurePicker>(error));
    return nullptr;
  }
  return config;
}
663 
UpdateChildPolicyLocked()664 void EdsLb::UpdateChildPolicyLocked() {
665   if (shutting_down_) return;
666   UpdateArgs update_args;
667   update_args.config = CreateChildPolicyConfigLocked();
668   if (update_args.config == nullptr) return;
669   update_args.addresses = CreateChildPolicyAddressesLocked();
670   update_args.args = CreateChildPolicyArgsLocked(args_);
671   if (child_policy_ == nullptr) {
672     child_policy_ = CreateChildPolicyLocked(update_args.args);
673   }
674   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
675     gpr_log(GPR_INFO, "[edslb %p] Updating child policy %p", this,
676             child_policy_.get());
677   }
678   child_policy_->UpdateLocked(std::move(update_args));
679 }
680 
CreateChildPolicyArgsLocked(const grpc_channel_args * args)681 grpc_channel_args* EdsLb::CreateChildPolicyArgsLocked(
682     const grpc_channel_args* args) {
683   grpc_arg args_to_add[] = {
684       // A channel arg indicating if the target is a backend inferred from an
685       // xds load balancer.
686       // TODO(roth): This isn't needed with the new fallback design.
687       // Remove as part of implementing the new fallback functionality.
688       grpc_channel_arg_integer_create(
689           const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER),
690           1),
691       // Inhibit client-side health checking, since the balancer does
692       // this for us.
693       grpc_channel_arg_integer_create(
694           const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1),
695   };
696   return grpc_channel_args_copy_and_add(args, args_to_add,
697                                         GPR_ARRAY_SIZE(args_to_add));
698 }
699 
CreateChildPolicyLocked(const grpc_channel_args * args)700 OrphanablePtr<LoadBalancingPolicy> EdsLb::CreateChildPolicyLocked(
701     const grpc_channel_args* args) {
702   LoadBalancingPolicy::Args lb_policy_args;
703   lb_policy_args.work_serializer = work_serializer();
704   lb_policy_args.args = args;
705   lb_policy_args.channel_control_helper =
706       absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
707   OrphanablePtr<LoadBalancingPolicy> lb_policy =
708       LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
709           "priority_experimental", std::move(lb_policy_args));
710   if (GPR_UNLIKELY(lb_policy == nullptr)) {
711     gpr_log(GPR_ERROR, "[edslb %p] failure creating child policy", this);
712     return nullptr;
713   }
714   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
715     gpr_log(GPR_INFO, "[edslb %p]: Created new child policy %p", this,
716             lb_policy.get());
717   }
718   // Add our interested_parties pollset_set to that of the newly created
719   // child policy. This will make the child policy progress upon activity on
720   // this policy, which in turn is tied to the application's call.
721   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
722                                    interested_parties());
723   return lb_policy;
724 }
725 
726 //
727 // factory
728 //
729 
730 class EdsLbFactory : public LoadBalancingPolicyFactory {
731  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const732   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
733       LoadBalancingPolicy::Args args) const override {
734     grpc_error* error = GRPC_ERROR_NONE;
735     RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
736     if (error != GRPC_ERROR_NONE) {
737       gpr_log(GPR_ERROR,
738               "cannot get XdsClient to instantiate eds LB policy: %s",
739               grpc_error_string(error));
740       GRPC_ERROR_UNREF(error);
741       return nullptr;
742     }
743     return MakeOrphanable<EdsChildHandler>(std::move(xds_client),
744                                            std::move(args));
745   }
746 
name() const747   const char* name() const override { return kEds; }
748 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const749   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
750       const Json& json, grpc_error** error) const override {
751     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
752     if (json.type() == Json::Type::JSON_NULL) {
753       // eds was mentioned as a policy in the deprecated loadBalancingPolicy
754       // field or in the client API.
755       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
756           "field:loadBalancingPolicy error:eds policy requires configuration. "
757           "Please use loadBalancingConfig field of service config instead.");
758       return nullptr;
759     }
760     std::vector<grpc_error*> error_list;
761     // EDS service name.
762     std::string eds_service_name;
763     auto it = json.object_value().find("edsServiceName");
764     if (it != json.object_value().end()) {
765       if (it->second.type() != Json::Type::STRING) {
766         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
767             "field:edsServiceName error:type should be string"));
768       } else {
769         eds_service_name = it->second.string_value();
770       }
771     }
772     // Cluster name.
773     std::string cluster_name;
774     it = json.object_value().find("clusterName");
775     if (it == json.object_value().end()) {
776       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
777           "field:clusterName error:required field missing"));
778     } else if (it->second.type() != Json::Type::STRING) {
779       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
780           "field:clusterName error:type should be string"));
781     } else {
782       cluster_name = it->second.string_value();
783     }
784     // LRS load reporting server name.
785     absl::optional<std::string> lrs_load_reporting_server_name;
786     it = json.object_value().find("lrsLoadReportingServerName");
787     if (it != json.object_value().end()) {
788       if (it->second.type() != Json::Type::STRING) {
789         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
790             "field:lrsLoadReportingServerName error:type should be string"));
791       } else {
792         lrs_load_reporting_server_name.emplace(it->second.string_value());
793       }
794     }
795     // Locality-picking policy.
796     Json locality_picking_policy;
797     it = json.object_value().find("localityPickingPolicy");
798     if (it == json.object_value().end()) {
799       locality_picking_policy = Json::Array{
800           Json::Object{
801               {"weighted_target_experimental",
802                Json::Object{
803                    {"targets", Json::Object()},
804                }},
805           },
806       };
807     } else {
808       locality_picking_policy = it->second;
809     }
810     grpc_error* parse_error = GRPC_ERROR_NONE;
811     if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
812             locality_picking_policy, &parse_error) == nullptr) {
813       GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
814       error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
815           "localityPickingPolicy", &parse_error, 1));
816       GRPC_ERROR_UNREF(parse_error);
817     }
818     // Endpoint-picking policy.  Called "childPolicy" for xds policy.
819     Json endpoint_picking_policy;
820     it = json.object_value().find("endpointPickingPolicy");
821     if (it == json.object_value().end()) {
822       endpoint_picking_policy = Json::Array{
823           Json::Object{
824               {"round_robin", Json::Object()},
825           },
826       };
827     } else {
828       endpoint_picking_policy = it->second;
829     }
830     parse_error = GRPC_ERROR_NONE;
831     if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
832             endpoint_picking_policy, &parse_error) == nullptr) {
833       GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
834       error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
835           "endpointPickingPolicy", &parse_error, 1));
836       GRPC_ERROR_UNREF(parse_error);
837     }
838     // Max concurrent requests.
839     uint32_t max_concurrent_requests = 1024;
840     it = json.object_value().find("max_concurrent_requests");
841     if (it != json.object_value().end()) {
842       if (it->second.type() != Json::Type::NUMBER) {
843         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
844             "field:max_concurrent_requests error:must be of type number"));
845       } else {
846         max_concurrent_requests =
847             gpr_parse_nonnegative_int(it->second.string_value().c_str());
848       }
849     }
850     // Construct config.
851     if (error_list.empty()) {
852       return MakeRefCounted<EdsLbConfig>(
853           std::move(cluster_name), std::move(eds_service_name),
854           std::move(lrs_load_reporting_server_name),
855           std::move(locality_picking_policy),
856           std::move(endpoint_picking_policy), max_concurrent_requests);
857     } else {
858       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
859           "eds_experimental LB policy config", &error_list);
860       return nullptr;
861     }
862   }
863 
864  private:
865   class EdsChildHandler : public ChildPolicyHandler {
866    public:
EdsChildHandler(RefCountedPtr<XdsClient> xds_client,Args args)867     EdsChildHandler(RefCountedPtr<XdsClient> xds_client, Args args)
868         : ChildPolicyHandler(std::move(args), &grpc_lb_eds_trace),
869           xds_client_(std::move(xds_client)) {}
870 
ConfigChangeRequiresNewPolicyInstance(LoadBalancingPolicy::Config * old_config,LoadBalancingPolicy::Config * new_config) const871     bool ConfigChangeRequiresNewPolicyInstance(
872         LoadBalancingPolicy::Config* old_config,
873         LoadBalancingPolicy::Config* new_config) const override {
874       GPR_ASSERT(old_config->name() == kEds);
875       GPR_ASSERT(new_config->name() == kEds);
876       EdsLbConfig* old_eds_config = static_cast<EdsLbConfig*>(old_config);
877       EdsLbConfig* new_eds_config = static_cast<EdsLbConfig*>(new_config);
878       return old_eds_config->cluster_name() != new_eds_config->cluster_name() ||
879              old_eds_config->eds_service_name() !=
880                  new_eds_config->eds_service_name() ||
881              old_eds_config->lrs_load_reporting_server_name() !=
882                  new_eds_config->lrs_load_reporting_server_name();
883     }
884 
CreateLoadBalancingPolicy(const char * name,LoadBalancingPolicy::Args args) const885     OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
886         const char* name, LoadBalancingPolicy::Args args) const override {
887       return MakeOrphanable<EdsLb>(xds_client_, std::move(args));
888     }
889 
890    private:
891     RefCountedPtr<XdsClient> xds_client_;
892   };
893 };
894 
895 }  // namespace
896 
897 }  // namespace grpc_core
898 
899 //
900 // Plugin registration
901 //
902 
grpc_lb_policy_eds_init()903 void grpc_lb_policy_eds_init() {
904   grpc_core::LoadBalancingPolicyRegistry::Builder::
905       RegisterLoadBalancingPolicyFactory(
906           absl::make_unique<grpc_core::EdsLbFactory>());
907 }
908 
grpc_lb_policy_eds_shutdown()909 void grpc_lb_policy_eds_shutdown() {}
910