1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20 #include <limits.h>
21
22 #include "absl/strings/str_cat.h"
23 #include "absl/types/optional.h"
24
25 #include <grpc/grpc.h>
26
27 #include "src/core/ext/filters/client_channel/client_channel.h"
28 #include "src/core/ext/filters/client_channel/lb_policy.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
32 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
33 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
34 #include "src/core/ext/filters/client_channel/server_address.h"
35 #include "src/core/ext/xds/xds_channel_args.h"
36 #include "src/core/ext/xds/xds_client.h"
37 #include "src/core/ext/xds/xds_client_stats.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/gpr/string.h"
40 #include "src/core/lib/gprpp/orphanable.h"
41 #include "src/core/lib/gprpp/ref_counted_ptr.h"
42 #include "src/core/lib/iomgr/work_serializer.h"
43 #include "src/core/lib/transport/error_utils.h"
44 #include "src/core/lib/uri/uri_parser.h"
45
46 #define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000
47
48 namespace grpc_core {
49
// Trace flag named "eds_lb"; gates every GRPC_TRACE_FLAG_ENABLED(
// grpc_lb_eds_trace) log statement in this file.
TraceFlag grpc_lb_eds_trace(false, "eds_lb");

// Attribute key under which CreateChildPolicyAddressesLocked() attaches an
// XdsLocalityAttribute to every address handed to the child policy.
// NOTE(review): defined outside the anonymous namespace, so it is presumably
// declared extern in a header (xds.h?) -- keep the declaration in sync.
const char* kXdsLocalityNameAttributeKey = "xds_locality_name";
53
54 namespace {
55
// Policy name returned by EdsLbConfig::name(), EdsLb::name() and the factory.
constexpr char kEds[] = "eds_experimental";
57
58 // Config for EDS LB policy.
59 class EdsLbConfig : public LoadBalancingPolicy::Config {
60 public:
EdsLbConfig(std::string cluster_name,std::string eds_service_name,absl::optional<std::string> lrs_load_reporting_server_name,Json locality_picking_policy,Json endpoint_picking_policy,uint32_t max_concurrent_requests)61 EdsLbConfig(std::string cluster_name, std::string eds_service_name,
62 absl::optional<std::string> lrs_load_reporting_server_name,
63 Json locality_picking_policy, Json endpoint_picking_policy,
64 uint32_t max_concurrent_requests)
65 : cluster_name_(std::move(cluster_name)),
66 eds_service_name_(std::move(eds_service_name)),
67 lrs_load_reporting_server_name_(
68 std::move(lrs_load_reporting_server_name)),
69 locality_picking_policy_(std::move(locality_picking_policy)),
70 endpoint_picking_policy_(std::move(endpoint_picking_policy)),
71 max_concurrent_requests_(max_concurrent_requests) {}
72
name() const73 const char* name() const override { return kEds; }
74
cluster_name() const75 const std::string& cluster_name() const { return cluster_name_; }
eds_service_name() const76 const std::string& eds_service_name() const { return eds_service_name_; }
lrs_load_reporting_server_name() const77 const absl::optional<std::string>& lrs_load_reporting_server_name() const {
78 return lrs_load_reporting_server_name_;
79 };
locality_picking_policy() const80 const Json& locality_picking_policy() const {
81 return locality_picking_policy_;
82 }
endpoint_picking_policy() const83 const Json& endpoint_picking_policy() const {
84 return endpoint_picking_policy_;
85 }
max_concurrent_requests() const86 const uint32_t max_concurrent_requests() const {
87 return max_concurrent_requests_;
88 }
89
90 private:
91 std::string cluster_name_;
92 std::string eds_service_name_;
93 absl::optional<std::string> lrs_load_reporting_server_name_;
94 Json locality_picking_policy_;
95 Json endpoint_picking_policy_;
96 uint32_t max_concurrent_requests_;
97 };
98
99 // EDS LB policy.
100 class EdsLb : public LoadBalancingPolicy {
101 public:
102 EdsLb(RefCountedPtr<XdsClient> xds_client, Args args);
103
name() const104 const char* name() const override { return kEds; }
105
106 void UpdateLocked(UpdateArgs args) override;
107 void ResetBackoffLocked() override;
108
109 private:
110 class EndpointWatcher : public XdsClient::EndpointWatcherInterface {
111 public:
EndpointWatcher(RefCountedPtr<EdsLb> parent)112 explicit EndpointWatcher(RefCountedPtr<EdsLb> parent)
113 : parent_(std::move(parent)) {}
OnEndpointChanged(XdsApi::EdsUpdate update)114 void OnEndpointChanged(XdsApi::EdsUpdate update) override {
115 new Notifier(parent_, std::move(update));
116 }
OnError(grpc_error * error)117 void OnError(grpc_error* error) override { new Notifier(parent_, error); }
OnResourceDoesNotExist()118 void OnResourceDoesNotExist() override { new Notifier(parent_); }
119
120 private:
121 class Notifier {
122 public:
123 Notifier(RefCountedPtr<EdsLb> parent, XdsApi::EdsUpdate update);
124 Notifier(RefCountedPtr<EdsLb> parent, grpc_error* error);
125 explicit Notifier(RefCountedPtr<EdsLb> parent);
126
127 private:
128 enum Type { kUpdate, kError, kDoesNotExist };
129
130 static void RunInExecCtx(void* arg, grpc_error* error);
131 void RunInWorkSerializer(grpc_error* error);
132
133 RefCountedPtr<EdsLb> parent_;
134 grpc_closure closure_;
135 XdsApi::EdsUpdate update_;
136 Type type_;
137 };
138
139 RefCountedPtr<EdsLb> parent_;
140 };
141
142 class Helper : public ChannelControlHelper {
143 public:
Helper(RefCountedPtr<EdsLb> eds_policy)144 explicit Helper(RefCountedPtr<EdsLb> eds_policy)
145 : eds_policy_(std::move(eds_policy)) {}
146
~Helper()147 ~Helper() override { eds_policy_.reset(DEBUG_LOCATION, "Helper"); }
148
149 RefCountedPtr<SubchannelInterface> CreateSubchannel(
150 ServerAddress address, const grpc_channel_args& args) override;
151 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
152 std::unique_ptr<SubchannelPicker> picker) override;
153 // This is a no-op, because we get the addresses from the xds
154 // client, which is a watch-based API.
RequestReresolution()155 void RequestReresolution() override {}
156 void AddTraceEvent(TraceSeverity severity,
157 absl::string_view message) override;
158
159 private:
160 RefCountedPtr<EdsLb> eds_policy_;
161 };
162
163 ~EdsLb() override;
164
165 void ShutdownLocked() override;
166
167 void OnEndpointChanged(XdsApi::EdsUpdate update);
168 void OnError(grpc_error* error);
169 void OnResourceDoesNotExist();
170
171 void MaybeDestroyChildPolicyLocked();
172
173 void UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list);
174 void UpdateChildPolicyLocked();
175 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
176 const grpc_channel_args* args);
177 ServerAddressList CreateChildPolicyAddressesLocked();
178 RefCountedPtr<Config> CreateChildPolicyConfigLocked();
179 grpc_channel_args* CreateChildPolicyArgsLocked(
180 const grpc_channel_args* args_in);
181
182 // Caller must ensure that config_ is set before calling.
GetEdsResourceName() const183 const absl::string_view GetEdsResourceName() const {
184 if (!is_xds_uri_) return server_name_;
185 if (!config_->eds_service_name().empty()) {
186 return config_->eds_service_name();
187 }
188 return config_->cluster_name();
189 }
190
191 // Returns a pair containing the cluster and eds_service_name to use
192 // for LRS load reporting.
193 // Caller must ensure that config_ is set before calling.
GetLrsClusterKey() const194 std::pair<absl::string_view, absl::string_view> GetLrsClusterKey() const {
195 if (!is_xds_uri_) return {server_name_, nullptr};
196 return {config_->cluster_name(), config_->eds_service_name()};
197 }
198
199 // Server name from target URI.
200 std::string server_name_;
201 bool is_xds_uri_;
202
203 // Current channel args and config from the resolver.
204 const grpc_channel_args* args_ = nullptr;
205 RefCountedPtr<EdsLbConfig> config_;
206
207 // Internal state.
208 bool shutting_down_ = false;
209
210 // The xds client and endpoint watcher.
211 RefCountedPtr<XdsClient> xds_client_;
212 // A pointer to the endpoint watcher, to be used when cancelling the watch.
213 // Note that this is not owned, so this pointer must never be derefernced.
214 EndpointWatcher* endpoint_watcher_ = nullptr;
215 // The latest data from the endpoint watcher.
216 XdsApi::EdsUpdate::PriorityList priority_list_;
217 // State used to retain child policy names for priority policy.
218 std::vector<size_t /*child_number*/> priority_child_numbers_;
219
220 RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
221
222 OrphanablePtr<LoadBalancingPolicy> child_policy_;
223 };
224
225 //
226 // EdsLb::Helper
227 //
228
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)229 RefCountedPtr<SubchannelInterface> EdsLb::Helper::CreateSubchannel(
230 ServerAddress address, const grpc_channel_args& args) {
231 if (eds_policy_->shutting_down_) return nullptr;
232 return eds_policy_->channel_control_helper()->CreateSubchannel(
233 std::move(address), args);
234 }
235
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)236 void EdsLb::Helper::UpdateState(grpc_connectivity_state state,
237 const absl::Status& status,
238 std::unique_ptr<SubchannelPicker> picker) {
239 if (eds_policy_->shutting_down_ || eds_policy_->child_policy_ == nullptr) {
240 return;
241 }
242 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
243 gpr_log(GPR_INFO, "[edslb %p] child policy updated state=%s (%s) picker=%p",
244 eds_policy_.get(), ConnectivityStateName(state),
245 status.ToString().c_str(), picker.get());
246 }
247 eds_policy_->channel_control_helper()->UpdateState(state, status,
248 std::move(picker));
249 }
250
AddTraceEvent(TraceSeverity severity,absl::string_view message)251 void EdsLb::Helper::AddTraceEvent(TraceSeverity severity,
252 absl::string_view message) {
253 if (eds_policy_->shutting_down_) return;
254 eds_policy_->channel_control_helper()->AddTraceEvent(severity, message);
255 }
256
257 //
258 // EdsLb::EndpointWatcher::Notifier
259 //
260
Notifier(RefCountedPtr<EdsLb> parent,XdsApi::EdsUpdate update)261 EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr<EdsLb> parent,
262 XdsApi::EdsUpdate update)
263 : parent_(std::move(parent)), update_(std::move(update)), type_(kUpdate) {
264 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
265 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
266 }
267
Notifier(RefCountedPtr<EdsLb> parent,grpc_error * error)268 EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr<EdsLb> parent,
269 grpc_error* error)
270 : parent_(std::move(parent)), type_(kError) {
271 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
272 ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
273 }
274
Notifier(RefCountedPtr<EdsLb> parent)275 EdsLb::EndpointWatcher::Notifier::Notifier(RefCountedPtr<EdsLb> parent)
276 : parent_(std::move(parent)), type_(kDoesNotExist) {
277 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
278 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
279 }
280
RunInExecCtx(void * arg,grpc_error * error)281 void EdsLb::EndpointWatcher::Notifier::RunInExecCtx(void* arg,
282 grpc_error* error) {
283 Notifier* self = static_cast<Notifier*>(arg);
284 GRPC_ERROR_REF(error);
285 self->parent_->work_serializer()->Run(
286 [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
287 }
288
RunInWorkSerializer(grpc_error * error)289 void EdsLb::EndpointWatcher::Notifier::RunInWorkSerializer(grpc_error* error) {
290 switch (type_) {
291 case kUpdate:
292 parent_->OnEndpointChanged(std::move(update_));
293 break;
294 case kError:
295 parent_->OnError(error);
296 break;
297 case kDoesNotExist:
298 parent_->OnResourceDoesNotExist();
299 break;
300 };
301 delete this;
302 }
303
304 //
305 // EdsLb public methods
306 //
307
EdsLb(RefCountedPtr<XdsClient> xds_client,Args args)308 EdsLb::EdsLb(RefCountedPtr<XdsClient> xds_client, Args args)
309 : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
310 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
311 gpr_log(GPR_INFO, "[edslb %p] created -- using xds client %p", this,
312 xds_client_.get());
313 }
314 // Record server name.
315 const char* server_uri =
316 grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
317 GPR_ASSERT(server_uri != nullptr);
318 grpc_uri* uri = grpc_uri_parse(server_uri, true);
319 GPR_ASSERT(uri->path[0] != '\0');
320 server_name_ = uri->path[0] == '/' ? uri->path + 1 : uri->path;
321 is_xds_uri_ = strcmp(uri->scheme, "xds") == 0;
322 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
323 gpr_log(GPR_INFO, "[edslb %p] server name from channel (is_xds_uri=%d): %s",
324 this, is_xds_uri_, server_name_.c_str());
325 }
326 grpc_uri_destroy(uri);
327 // EDS-only flow.
328 if (!is_xds_uri_) {
329 // Setup channelz linkage.
330 channelz::ChannelNode* parent_channelz_node =
331 grpc_channel_args_find_pointer<channelz::ChannelNode>(
332 args.args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
333 if (parent_channelz_node != nullptr) {
334 xds_client_->AddChannelzLinkage(parent_channelz_node);
335 }
336 // Couple polling.
337 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
338 interested_parties());
339 }
340 }
341
~EdsLb()342 EdsLb::~EdsLb() {
343 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
344 gpr_log(GPR_INFO, "[edslb %p] destroying eds LB policy", this);
345 }
346 }
347
// Tears down this policy. In order, it:
//   1. destroys the child policy,
//   2. cancels the EDS watch,
//   3. for non-xds URIs, undoes the channelz linkage and pollset coupling
//      set up in the constructor,
//   4. drops the XdsClient ref,
//   5. frees the stored channel args.
// Order matters: steps 3 and 4 still need xds_client_ and args_.
void EdsLb::ShutdownLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
    gpr_log(GPR_INFO, "[edslb %p] shutting down", this);
  }
  // From here on, Helper methods and UpdateChildPolicyLocked() become no-ops.
  shutting_down_ = true;
  MaybeDestroyChildPolicyLocked();
  // Cancel watcher.
  // endpoint_watcher_ is only set after UpdateLocked() has assigned config_,
  // so GetEdsResourceName() is safe to call inside this branch.
  if (endpoint_watcher_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
      gpr_log(GPR_INFO, "[edslb %p] cancelling xds watch for %s", this,
              std::string(GetEdsResourceName()).c_str());
    }
    xds_client_->CancelEndpointDataWatch(GetEdsResourceName(),
                                         endpoint_watcher_);
  }
  if (!is_xds_uri_) {
    // Remove channelz linkage.
    // Looked up from args_, which must still be alive at this point.
    channelz::ChannelNode* parent_channelz_node =
        grpc_channel_args_find_pointer<channelz::ChannelNode>(
            args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
    if (parent_channelz_node != nullptr) {
      xds_client_->RemoveChannelzLinkage(parent_channelz_node);
    }
    // Decouple polling.
    grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
                                     interested_parties());
  }
  xds_client_.reset(DEBUG_LOCATION, "EdsLb");
  // Destroy channel args.
  grpc_channel_args_destroy(args_);
  args_ = nullptr;
}
380
MaybeDestroyChildPolicyLocked()381 void EdsLb::MaybeDestroyChildPolicyLocked() {
382 if (child_policy_ != nullptr) {
383 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
384 interested_parties());
385 child_policy_.reset();
386 }
387 }
388
UpdateLocked(UpdateArgs args)389 void EdsLb::UpdateLocked(UpdateArgs args) {
390 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
391 gpr_log(GPR_INFO, "[edslb %p] Received update", this);
392 }
393 const bool is_initial_update = args_ == nullptr;
394 // Update config.
395 auto old_config = std::move(config_);
396 config_ = std::move(args.config);
397 // Update args.
398 grpc_channel_args_destroy(args_);
399 args_ = args.args;
400 args.args = nullptr;
401 // Update child policy if needed.
402 if (child_policy_ != nullptr) UpdateChildPolicyLocked();
403 // Create endpoint watcher if needed.
404 if (is_initial_update) {
405 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
406 gpr_log(GPR_INFO, "[edslb %p] starting xds watch for %s", this,
407 std::string(GetEdsResourceName()).c_str());
408 }
409 auto watcher = absl::make_unique<EndpointWatcher>(
410 Ref(DEBUG_LOCATION, "EndpointWatcher"));
411 endpoint_watcher_ = watcher.get();
412 xds_client_->WatchEndpointData(GetEdsResourceName(), std::move(watcher));
413 }
414 }
415
ResetBackoffLocked()416 void EdsLb::ResetBackoffLocked() {
417 // When the XdsClient is instantiated in the resolver instead of in this
418 // LB policy, this is done via the resolver, so we don't need to do it here.
419 if (!is_xds_uri_ && xds_client_ != nullptr) xds_client_->ResetBackoff();
420 if (child_policy_ != nullptr) {
421 child_policy_->ResetBackoffLocked();
422 }
423 }
424
OnEndpointChanged(XdsApi::EdsUpdate update)425 void EdsLb::OnEndpointChanged(XdsApi::EdsUpdate update) {
426 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
427 gpr_log(GPR_INFO, "[edslb %p] Received EDS update from xds client", this);
428 }
429 // Update the drop config.
430 drop_config_ = std::move(update.drop_config);
431 // If priority list is empty, add a single priority, just so that we
432 // have a child in which to create the xds_cluster_impl policy.
433 if (update.priorities.empty()) update.priorities.emplace_back();
434 // Update child policy.
435 UpdatePriorityList(std::move(update.priorities));
436 }
437
OnError(grpc_error * error)438 void EdsLb::OnError(grpc_error* error) {
439 gpr_log(GPR_ERROR, "[edslb %p] xds watcher reported error: %s", this,
440 grpc_error_string(error));
441 // Go into TRANSIENT_FAILURE if we have not yet created the child
442 // policy (i.e., we have not yet received data from xds). Otherwise,
443 // we keep running with the data we had previously.
444 if (child_policy_ == nullptr) {
445 channel_control_helper()->UpdateState(
446 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
447 absl::make_unique<TransientFailurePicker>(error));
448 } else {
449 GRPC_ERROR_UNREF(error);
450 }
451 }
452
OnResourceDoesNotExist()453 void EdsLb::OnResourceDoesNotExist() {
454 gpr_log(
455 GPR_ERROR,
456 "[edslb %p] EDS resource does not exist -- reporting TRANSIENT_FAILURE",
457 this);
458 grpc_error* error = grpc_error_set_int(
459 GRPC_ERROR_CREATE_FROM_STATIC_STRING("EDS resource does not exist"),
460 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
461 channel_control_helper()->UpdateState(
462 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
463 absl::make_unique<TransientFailurePicker>(error));
464 MaybeDestroyChildPolicyLocked();
465 }
466
467 //
468 // child policy-related methods
469 //
470
// Stores a new EDS priority list and assigns each priority a child number.
// The number names the priority's child ("child<N>") in the generated
// config. Then pushes the result to the child policy.
//
// To keep child names stable across updates, a priority reuses the child
// number of the first of its localities that appeared in the previous
// priority_list_. Once a number is taken, it is made unavailable to later
// priorities. To do that, all localities that used to belong to that number,
// and the remaining localities of the current priority, are removed from
// the lookup map. A priority with no previously-seen locality gets the
// smallest number not used by the previous list or already assigned in this
// update.
void EdsLb::UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list) {
  // Build some maps from locality to child number and the reverse from
  // the old data in priority_list_ and priority_child_numbers_.
  std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
      locality_child_map;
  std::map<size_t, std::set<XdsLocalityName*>> child_locality_map;
  for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
    size_t child_number = priority_child_numbers_[priority];
    const auto& localities = priority_list_[priority].localities;
    for (const auto& p : localities) {
      XdsLocalityName* locality_name = p.first;
      locality_child_map[locality_name] = child_number;
      child_locality_map[child_number].insert(locality_name);
    }
  }
  // Construct new list of children.
  std::vector<size_t> priority_child_numbers;
  for (size_t priority = 0; priority < priority_list.size(); ++priority) {
    const auto& localities = priority_list[priority].localities;
    absl::optional<size_t> child_number;
    // If one of the localities in this priority already existed, reuse its
    // child number.
    for (const auto& p : localities) {
      XdsLocalityName* locality_name = p.first;
      if (!child_number.has_value()) {
        auto it = locality_child_map.find(locality_name);
        if (it != locality_child_map.end()) {
          child_number = it->second;
          locality_child_map.erase(it);
          // Remove localities that *used* to be in this child number, so
          // that we don't incorrectly reuse this child number for a
          // subsequent priority.
          for (XdsLocalityName* old_locality :
               child_locality_map[*child_number]) {
            locality_child_map.erase(old_locality);
          }
        }
      } else {
        // Remove all localities that are now in this child number, so
        // that we don't accidentally reuse this child number for a
        // subsequent priority.
        locality_child_map.erase(locality_name);
      }
    }
    // If we didn't find an existing child number, assign a new one.
    // Scan upward from 0 for the first number absent from
    // child_locality_map (previously used or assigned earlier in this loop).
    if (!child_number.has_value()) {
      for (child_number = 0;
           child_locality_map.find(*child_number) != child_locality_map.end();
           ++(*child_number)) {
      }
      // Add entry so we know that the child number is in use.
      // (Don't need to add the list of localities, since we won't use them.)
      // operator[] default-inserts an empty set for the key.
      child_locality_map[*child_number];
    }
    priority_child_numbers.push_back(*child_number);
  }
  // Save update.
  priority_list_ = std::move(priority_list);
  priority_child_numbers_ = std::move(priority_child_numbers);
  // Update child policy.
  UpdateChildPolicyLocked();
}
533
CreateChildPolicyAddressesLocked()534 ServerAddressList EdsLb::CreateChildPolicyAddressesLocked() {
535 ServerAddressList addresses;
536 for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
537 const auto& localities = priority_list_[priority].localities;
538 std::string priority_child_name =
539 absl::StrCat("child", priority_child_numbers_[priority]);
540 for (const auto& p : localities) {
541 const auto& locality_name = p.first;
542 const auto& locality = p.second;
543 std::vector<std::string> hierarchical_path = {
544 priority_child_name, locality_name->AsHumanReadableString()};
545 for (const auto& endpoint : locality.endpoints) {
546 addresses.emplace_back(
547 endpoint
548 .WithAttribute(kHierarchicalPathAttributeKey,
549 MakeHierarchicalPathAttribute(hierarchical_path))
550 .WithAttribute(kXdsLocalityNameAttributeKey,
551 absl::make_unique<XdsLocalityAttribute>(
552 locality_name->Ref())));
553 }
554 }
555 }
556 return addresses;
557 }
558
559 RefCountedPtr<LoadBalancingPolicy::Config>
CreateChildPolicyConfigLocked()560 EdsLb::CreateChildPolicyConfigLocked() {
561 const auto lrs_key = GetLrsClusterKey();
562 Json::Object priority_children;
563 Json::Array priority_priorities;
564 for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
565 const auto& localities = priority_list_[priority].localities;
566 Json::Object weighted_targets;
567 for (const auto& p : localities) {
568 XdsLocalityName* locality_name = p.first;
569 const auto& locality = p.second;
570 // Construct JSON object containing locality name.
571 Json::Object locality_name_json;
572 if (!locality_name->region().empty()) {
573 locality_name_json["region"] = locality_name->region();
574 }
575 if (!locality_name->zone().empty()) {
576 locality_name_json["zone"] = locality_name->zone();
577 }
578 if (!locality_name->sub_zone().empty()) {
579 locality_name_json["subzone"] = locality_name->sub_zone();
580 }
581 // Add weighted target entry.
582 weighted_targets[locality_name->AsHumanReadableString()] = Json::Object{
583 {"weight", locality.lb_weight},
584 {"childPolicy", config_->endpoint_picking_policy()},
585 };
586 }
587 // Construct locality-picking policy.
588 // Start with field from our config and add the "targets" field.
589 Json locality_picking_config = config_->locality_picking_policy();
590 Json::Object& config =
591 *(*locality_picking_config.mutable_array())[0].mutable_object();
592 auto it = config.begin();
593 GPR_ASSERT(it != config.end());
594 (*it->second.mutable_object())["targets"] = std::move(weighted_targets);
595 // Wrap it in the drop policy.
596 Json::Array drop_categories;
597 for (const auto& category : drop_config_->drop_category_list()) {
598 drop_categories.push_back(Json::Object{
599 {"category", category.name},
600 {"requests_per_million", category.parts_per_million},
601 });
602 }
603 Json::Object xds_cluster_impl_config = {
604 {"clusterName", std::string(lrs_key.first)},
605 {"childPolicy", std::move(locality_picking_config)},
606 {"dropCategories", std::move(drop_categories)},
607 {"maxConcurrentRequests", config_->max_concurrent_requests()},
608 };
609 if (!lrs_key.second.empty()) {
610 xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second);
611 }
612 if (config_->lrs_load_reporting_server_name().has_value()) {
613 xds_cluster_impl_config["lrsLoadReportingServerName"] =
614 config_->lrs_load_reporting_server_name().value();
615 }
616 Json locality_picking_policy = Json::Array{Json::Object{
617 {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)},
618 }};
619 // Add priority entry.
620 const size_t child_number = priority_child_numbers_[priority];
621 std::string child_name = absl::StrCat("child", child_number);
622 priority_priorities.emplace_back(child_name);
623 priority_children[child_name] = Json::Object{
624 {"config", std::move(locality_picking_policy)},
625 {"ignore_reresolution_requests", true},
626 };
627 }
628 Json json = Json::Array{Json::Object{
629 {"priority_experimental",
630 Json::Object{
631 {"children", std::move(priority_children)},
632 {"priorities", std::move(priority_priorities)},
633 }},
634 }};
635 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
636 std::string json_str = json.Dump(/*indent=*/1);
637 gpr_log(GPR_INFO, "[edslb %p] generated config for child policy: %s", this,
638 json_str.c_str());
639 }
640 grpc_error* error = GRPC_ERROR_NONE;
641 RefCountedPtr<LoadBalancingPolicy::Config> config =
642 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
643 if (error != GRPC_ERROR_NONE) {
644 // This should never happen, but if it does, we basically have no
645 // way to fix it, so we put the channel in TRANSIENT_FAILURE.
646 gpr_log(GPR_ERROR,
647 "[edslb %p] error parsing generated child policy config -- "
648 "will put channel in TRANSIENT_FAILURE: %s",
649 this, grpc_error_string(error));
650 error = grpc_error_set_int(
651 grpc_error_add_child(
652 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
653 "eds LB policy: error parsing generated child policy config"),
654 error),
655 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
656 channel_control_helper()->UpdateState(
657 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
658 absl::make_unique<TransientFailurePicker>(error));
659 return nullptr;
660 }
661 return config;
662 }
663
UpdateChildPolicyLocked()664 void EdsLb::UpdateChildPolicyLocked() {
665 if (shutting_down_) return;
666 UpdateArgs update_args;
667 update_args.config = CreateChildPolicyConfigLocked();
668 if (update_args.config == nullptr) return;
669 update_args.addresses = CreateChildPolicyAddressesLocked();
670 update_args.args = CreateChildPolicyArgsLocked(args_);
671 if (child_policy_ == nullptr) {
672 child_policy_ = CreateChildPolicyLocked(update_args.args);
673 }
674 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
675 gpr_log(GPR_INFO, "[edslb %p] Updating child policy %p", this,
676 child_policy_.get());
677 }
678 child_policy_->UpdateLocked(std::move(update_args));
679 }
680
// Returns a new copy of `args` with two integer args added, both set to 1:
// GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER and
// GRPC_ARG_INHIBIT_HEALTH_CHECKING. The returned args are a fresh copy
// owned by the caller.
grpc_channel_args* EdsLb::CreateChildPolicyArgsLocked(
    const grpc_channel_args* args) {
  grpc_arg extra_args[2];
  // A channel arg indicating if the target is a backend inferred from an
  // xds load balancer.
  // TODO(roth): This isn't needed with the new fallback design.
  // Remove as part of implementing the new fallback functionality.
  extra_args[0] = grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER), 1);
  // Inhibit client-side health checking, since the balancer does this for us.
  extra_args[1] = grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
  return grpc_channel_args_copy_and_add(args, extra_args,
                                        GPR_ARRAY_SIZE(extra_args));
}
699
CreateChildPolicyLocked(const grpc_channel_args * args)700 OrphanablePtr<LoadBalancingPolicy> EdsLb::CreateChildPolicyLocked(
701 const grpc_channel_args* args) {
702 LoadBalancingPolicy::Args lb_policy_args;
703 lb_policy_args.work_serializer = work_serializer();
704 lb_policy_args.args = args;
705 lb_policy_args.channel_control_helper =
706 absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
707 OrphanablePtr<LoadBalancingPolicy> lb_policy =
708 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
709 "priority_experimental", std::move(lb_policy_args));
710 if (GPR_UNLIKELY(lb_policy == nullptr)) {
711 gpr_log(GPR_ERROR, "[edslb %p] failure creating child policy", this);
712 return nullptr;
713 }
714 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_eds_trace)) {
715 gpr_log(GPR_INFO, "[edslb %p]: Created new child policy %p", this,
716 lb_policy.get());
717 }
718 // Add our interested_parties pollset_set to that of the newly created
719 // child policy. This will make the child policy progress upon activity on
720 // this policy, which in turn is tied to the application's call.
721 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
722 interested_parties());
723 return lb_policy;
724 }
725
726 //
727 // factory
728 //
729
730 class EdsLbFactory : public LoadBalancingPolicyFactory {
731 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const732 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
733 LoadBalancingPolicy::Args args) const override {
734 grpc_error* error = GRPC_ERROR_NONE;
735 RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
736 if (error != GRPC_ERROR_NONE) {
737 gpr_log(GPR_ERROR,
738 "cannot get XdsClient to instantiate eds LB policy: %s",
739 grpc_error_string(error));
740 GRPC_ERROR_UNREF(error);
741 return nullptr;
742 }
743 return MakeOrphanable<EdsChildHandler>(std::move(xds_client),
744 std::move(args));
745 }
746
name() const747 const char* name() const override { return kEds; }
748
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const749 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
750 const Json& json, grpc_error** error) const override {
751 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
752 if (json.type() == Json::Type::JSON_NULL) {
753 // eds was mentioned as a policy in the deprecated loadBalancingPolicy
754 // field or in the client API.
755 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
756 "field:loadBalancingPolicy error:eds policy requires configuration. "
757 "Please use loadBalancingConfig field of service config instead.");
758 return nullptr;
759 }
760 std::vector<grpc_error*> error_list;
761 // EDS service name.
762 std::string eds_service_name;
763 auto it = json.object_value().find("edsServiceName");
764 if (it != json.object_value().end()) {
765 if (it->second.type() != Json::Type::STRING) {
766 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
767 "field:edsServiceName error:type should be string"));
768 } else {
769 eds_service_name = it->second.string_value();
770 }
771 }
772 // Cluster name.
773 std::string cluster_name;
774 it = json.object_value().find("clusterName");
775 if (it == json.object_value().end()) {
776 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
777 "field:clusterName error:required field missing"));
778 } else if (it->second.type() != Json::Type::STRING) {
779 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
780 "field:clusterName error:type should be string"));
781 } else {
782 cluster_name = it->second.string_value();
783 }
784 // LRS load reporting server name.
785 absl::optional<std::string> lrs_load_reporting_server_name;
786 it = json.object_value().find("lrsLoadReportingServerName");
787 if (it != json.object_value().end()) {
788 if (it->second.type() != Json::Type::STRING) {
789 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
790 "field:lrsLoadReportingServerName error:type should be string"));
791 } else {
792 lrs_load_reporting_server_name.emplace(it->second.string_value());
793 }
794 }
795 // Locality-picking policy.
796 Json locality_picking_policy;
797 it = json.object_value().find("localityPickingPolicy");
798 if (it == json.object_value().end()) {
799 locality_picking_policy = Json::Array{
800 Json::Object{
801 {"weighted_target_experimental",
802 Json::Object{
803 {"targets", Json::Object()},
804 }},
805 },
806 };
807 } else {
808 locality_picking_policy = it->second;
809 }
810 grpc_error* parse_error = GRPC_ERROR_NONE;
811 if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
812 locality_picking_policy, &parse_error) == nullptr) {
813 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
814 error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
815 "localityPickingPolicy", &parse_error, 1));
816 GRPC_ERROR_UNREF(parse_error);
817 }
818 // Endpoint-picking policy. Called "childPolicy" for xds policy.
819 Json endpoint_picking_policy;
820 it = json.object_value().find("endpointPickingPolicy");
821 if (it == json.object_value().end()) {
822 endpoint_picking_policy = Json::Array{
823 Json::Object{
824 {"round_robin", Json::Object()},
825 },
826 };
827 } else {
828 endpoint_picking_policy = it->second;
829 }
830 parse_error = GRPC_ERROR_NONE;
831 if (LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
832 endpoint_picking_policy, &parse_error) == nullptr) {
833 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
834 error_list.push_back(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
835 "endpointPickingPolicy", &parse_error, 1));
836 GRPC_ERROR_UNREF(parse_error);
837 }
838 // Max concurrent requests.
839 uint32_t max_concurrent_requests = 1024;
840 it = json.object_value().find("max_concurrent_requests");
841 if (it != json.object_value().end()) {
842 if (it->second.type() != Json::Type::NUMBER) {
843 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
844 "field:max_concurrent_requests error:must be of type number"));
845 } else {
846 max_concurrent_requests =
847 gpr_parse_nonnegative_int(it->second.string_value().c_str());
848 }
849 }
850 // Construct config.
851 if (error_list.empty()) {
852 return MakeRefCounted<EdsLbConfig>(
853 std::move(cluster_name), std::move(eds_service_name),
854 std::move(lrs_load_reporting_server_name),
855 std::move(locality_picking_policy),
856 std::move(endpoint_picking_policy), max_concurrent_requests);
857 } else {
858 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
859 "eds_experimental LB policy config", &error_list);
860 return nullptr;
861 }
862 }
863
864 private:
865 class EdsChildHandler : public ChildPolicyHandler {
866 public:
EdsChildHandler(RefCountedPtr<XdsClient> xds_client,Args args)867 EdsChildHandler(RefCountedPtr<XdsClient> xds_client, Args args)
868 : ChildPolicyHandler(std::move(args), &grpc_lb_eds_trace),
869 xds_client_(std::move(xds_client)) {}
870
ConfigChangeRequiresNewPolicyInstance(LoadBalancingPolicy::Config * old_config,LoadBalancingPolicy::Config * new_config) const871 bool ConfigChangeRequiresNewPolicyInstance(
872 LoadBalancingPolicy::Config* old_config,
873 LoadBalancingPolicy::Config* new_config) const override {
874 GPR_ASSERT(old_config->name() == kEds);
875 GPR_ASSERT(new_config->name() == kEds);
876 EdsLbConfig* old_eds_config = static_cast<EdsLbConfig*>(old_config);
877 EdsLbConfig* new_eds_config = static_cast<EdsLbConfig*>(new_config);
878 return old_eds_config->cluster_name() != new_eds_config->cluster_name() ||
879 old_eds_config->eds_service_name() !=
880 new_eds_config->eds_service_name() ||
881 old_eds_config->lrs_load_reporting_server_name() !=
882 new_eds_config->lrs_load_reporting_server_name();
883 }
884
CreateLoadBalancingPolicy(const char * name,LoadBalancingPolicy::Args args) const885 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
886 const char* name, LoadBalancingPolicy::Args args) const override {
887 return MakeOrphanable<EdsLb>(xds_client_, std::move(args));
888 }
889
890 private:
891 RefCountedPtr<XdsClient> xds_client_;
892 };
893 };
894
895 } // namespace
896
897 } // namespace grpc_core
898
899 //
900 // Plugin registration
901 //
902
grpc_lb_policy_eds_init()903 void grpc_lb_policy_eds_init() {
904 grpc_core::LoadBalancingPolicyRegistry::Builder::
905 RegisterLoadBalancingPolicyFactory(
906 absl::make_unique<grpc_core::EdsLbFactory>());
907 }
908
// Plugin shutdown hook paired with grpc_lb_policy_eds_init().
void grpc_lb_policy_eds_shutdown() {
  // Intentionally empty: this plugin performs no teardown here.
}
910