1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
24 #include "re2/re2.h"
25 #define XXH_INLINE_ALL
26 #include "xxhash.h"
27
28 #include "src/core/ext/filters/client_channel/config_selector.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
30 #include "src/core/ext/filters/client_channel/resolver_registry.h"
31 #include "src/core/ext/xds/xds_channel_args.h"
32 #include "src/core/ext/xds/xds_client.h"
33 #include "src/core/ext/xds/xds_http_filters.h"
34 #include "src/core/ext/xds/xds_routing.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/iomgr/closure.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/transport/timeout_encoding.h"
39
40 namespace grpc_core {
41
42 TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
43
44 const char* kXdsClusterAttribute = "xds_cluster_name";
45
46 namespace {
47
48 //
49 // XdsResolver
50 //
51
52 class XdsResolver : public Resolver {
53 public:
XdsResolver(ResolverArgs args)54 explicit XdsResolver(ResolverArgs args)
55 : work_serializer_(std::move(args.work_serializer)),
56 result_handler_(std::move(args.result_handler)),
57 server_name_(absl::StripPrefix(args.uri.path(), "/")),
58 args_(grpc_channel_args_copy(args.args)),
59 interested_parties_(args.pollset_set) {
60 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
61 gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
62 server_name_.c_str());
63 }
64 }
65
~XdsResolver()66 ~XdsResolver() override {
67 grpc_channel_args_destroy(args_);
68 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
69 gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
70 }
71 }
72
73 void StartLocked() override;
74
75 void ShutdownLocked() override;
76
ResetBackoffLocked()77 void ResetBackoffLocked() override {
78 if (xds_client_ != nullptr) xds_client_->ResetBackoff();
79 }
80
81 private:
82 class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
83 public:
ListenerWatcher(RefCountedPtr<XdsResolver> resolver)84 explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
85 : resolver_(std::move(resolver)) {}
OnListenerChanged(XdsApi::LdsUpdate listener)86 void OnListenerChanged(XdsApi::LdsUpdate listener) override {
87 Ref().release(); // ref held by lambda
88 resolver_->work_serializer_->Run(
89 // TODO(yashykt): When we move to C++14, capture listener with
90 // std::move
91 [this, listener]() mutable {
92 resolver_->OnListenerUpdate(std::move(listener));
93 Unref();
94 },
95 DEBUG_LOCATION);
96 }
OnError(grpc_error_handle error)97 void OnError(grpc_error_handle error) override {
98 Ref().release(); // ref held by lambda
99 resolver_->work_serializer_->Run(
100 [this, error]() {
101 resolver_->OnError(error);
102 Unref();
103 },
104 DEBUG_LOCATION);
105 }
OnResourceDoesNotExist()106 void OnResourceDoesNotExist() override {
107 Ref().release(); // ref held by lambda
108 resolver_->work_serializer_->Run(
109 [this]() {
110 resolver_->OnResourceDoesNotExist();
111 Unref();
112 },
113 DEBUG_LOCATION);
114 }
115
116 private:
117 RefCountedPtr<XdsResolver> resolver_;
118 };
119
120 class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
121 public:
RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)122 explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
123 : resolver_(std::move(resolver)) {}
OnRouteConfigChanged(XdsApi::RdsUpdate route_config)124 void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
125 Ref().release(); // ref held by lambda
126 resolver_->work_serializer_->Run(
127 // TODO(yashykt): When we move to C++14, capture route_config with
128 // std::move
129 [this, route_config]() mutable {
130 resolver_->OnRouteConfigUpdate(std::move(route_config));
131 Unref();
132 },
133 DEBUG_LOCATION);
134 }
OnError(grpc_error_handle error)135 void OnError(grpc_error_handle error) override {
136 Ref().release(); // ref held by lambda
137 resolver_->work_serializer_->Run(
138 [this, error]() {
139 resolver_->OnError(error);
140 Unref();
141 },
142 DEBUG_LOCATION);
143 }
OnResourceDoesNotExist()144 void OnResourceDoesNotExist() override {
145 Ref().release(); // ref held by lambda
146 resolver_->work_serializer_->Run(
147 [this]() {
148 resolver_->OnResourceDoesNotExist();
149 Unref();
150 },
151 DEBUG_LOCATION);
152 }
153
154 private:
155 RefCountedPtr<XdsResolver> resolver_;
156 };
157
158 // An entry in the map of clusters that need to be present in the LB
159 // policy config. The map holds a weak ref. One strong ref is held by
160 // the ConfigSelector, and another is held by each call assigned to
161 // the cluster by the ConfigSelector. The ref for each call is held
162 // until the call is committed. When the strong refs go away, we hop
163 // back into the WorkSerializer to remove the entry from the map.
164 class ClusterState : public DualRefCounted<ClusterState> {
165 public:
166 using ClusterStateMap =
167 std::map<std::string, WeakRefCountedPtr<ClusterState>>;
168
ClusterState(RefCountedPtr<XdsResolver> resolver,const std::string & cluster_name)169 ClusterState(RefCountedPtr<XdsResolver> resolver,
170 const std::string& cluster_name)
171 : resolver_(std::move(resolver)),
172 it_(resolver_->cluster_state_map_.emplace(cluster_name, WeakRef())
173 .first) {}
174
Orphan()175 void Orphan() override {
176 auto* resolver = resolver_.release();
177 resolver->work_serializer_->Run(
178 [resolver]() {
179 resolver->MaybeRemoveUnusedClusters();
180 resolver->Unref();
181 },
182 DEBUG_LOCATION);
183 }
184
cluster() const185 const std::string& cluster() const { return it_->first; }
186
187 private:
188 RefCountedPtr<XdsResolver> resolver_;
189 ClusterStateMap::iterator it_;
190 };
191
192 // Call dispatch controller, created for each call handled by the
193 // ConfigSelector. Holds a ref to the ClusterState object until the
194 // call is committed.
195 class XdsCallDispatchController
196 : public ConfigSelector::CallDispatchController {
197 public:
XdsCallDispatchController(RefCountedPtr<ClusterState> cluster_state)198 explicit XdsCallDispatchController(
199 RefCountedPtr<ClusterState> cluster_state)
200 : cluster_state_(std::move(cluster_state)) {}
201
ShouldRetry()202 bool ShouldRetry() override {
203 // TODO(donnadionne): Implement the retry circuit breaker here.
204 return true;
205 }
206
Commit()207 void Commit() override {
208 // TODO(donnadionne): If ShouldRetry() was called previously,
209 // decrement the retry circuit breaker counter.
210 cluster_state_.reset();
211 }
212
213 private:
214 // Note: The XdsCallDispatchController object is never actually destroyed,
215 // so do not add any data members that require destruction unless you have
216 // some other way to clean them up.
217 RefCountedPtr<ClusterState> cluster_state_;
218 };
219
220 class XdsConfigSelector : public ConfigSelector {
221 public:
222 XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
223 grpc_error_handle* error);
224 ~XdsConfigSelector() override;
225
name() const226 const char* name() const override { return "XdsConfigSelector"; }
227
Equals(const ConfigSelector * other) const228 bool Equals(const ConfigSelector* other) const override {
229 const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
230 // Don't need to compare resolver_, since that will always be the same.
231 return route_table_ == other_xds->route_table_ &&
232 clusters_ == other_xds->clusters_;
233 }
234
235 CallConfig GetCallConfig(GetCallConfigArgs args) override;
236
GetFilters()237 std::vector<const grpc_channel_filter*> GetFilters() override {
238 return filters_;
239 }
240
241 grpc_channel_args* ModifyChannelArgs(grpc_channel_args* args) override;
242
243 private:
244 struct Route {
245 struct ClusterWeightState {
246 uint32_t range_end;
247 absl::string_view cluster;
248 RefCountedPtr<ServiceConfig> method_config;
249
250 bool operator==(const ClusterWeightState& other) const;
251 };
252
253 XdsApi::Route route;
254 RefCountedPtr<ServiceConfig> method_config;
255 absl::InlinedVector<ClusterWeightState, 2> weighted_cluster_state;
256
257 bool operator==(const Route& other) const;
258 };
259 using RouteTable = std::vector<Route>;
260
261 class RouteListIterator;
262
263 void MaybeAddCluster(const std::string& name);
264 grpc_error_handle CreateMethodConfig(
265 const XdsApi::Route& route,
266 const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight,
267 RefCountedPtr<ServiceConfig>* method_config);
268
269 RefCountedPtr<XdsResolver> resolver_;
270 RouteTable route_table_;
271 std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
272 std::vector<const grpc_channel_filter*> filters_;
273 };
274
275 void OnListenerUpdate(XdsApi::LdsUpdate listener);
276 void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
277 void OnError(grpc_error_handle error);
278 void OnResourceDoesNotExist();
279
280 grpc_error_handle CreateServiceConfig(
281 RefCountedPtr<ServiceConfig>* service_config);
282 void GenerateResult();
283 void MaybeRemoveUnusedClusters();
284
285 std::shared_ptr<WorkSerializer> work_serializer_;
286 std::unique_ptr<ResultHandler> result_handler_;
287 std::string server_name_;
288 const grpc_channel_args* args_;
289 grpc_pollset_set* interested_parties_;
290
291 RefCountedPtr<XdsClient> xds_client_;
292
293 XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
294 // This will not contain the RouteConfiguration, even if it comes with the
295 // LDS response; instead, the relevant VirtualHost from the
296 // RouteConfiguration will be saved in current_virtual_host_.
297 XdsApi::LdsUpdate current_listener_;
298
299 std::string route_config_name_;
300 XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
301 XdsApi::RdsUpdate::VirtualHost current_virtual_host_;
302
303 ClusterState::ClusterStateMap cluster_state_map_;
304 };
305
306 //
307 // XdsResolver::XdsConfigSelector::Route
308 //
309
MethodConfigsEqual(const ServiceConfig * sc1,const ServiceConfig * sc2)310 bool MethodConfigsEqual(const ServiceConfig* sc1, const ServiceConfig* sc2) {
311 if (sc1 == nullptr) return sc2 == nullptr;
312 if (sc2 == nullptr) return false;
313 return sc1->json_string() == sc2->json_string();
314 }
315
operator ==(const ClusterWeightState & other) const316 bool XdsResolver::XdsConfigSelector::Route::ClusterWeightState::operator==(
317 const ClusterWeightState& other) const {
318 return range_end == other.range_end && cluster == other.cluster &&
319 MethodConfigsEqual(method_config.get(), other.method_config.get());
320 }
321
operator ==(const Route & other) const322 bool XdsResolver::XdsConfigSelector::Route::operator==(
323 const Route& other) const {
324 return route == other.route &&
325 weighted_cluster_state == other.weighted_cluster_state &&
326 MethodConfigsEqual(method_config.get(), other.method_config.get());
327 }
328
329 // Implementation of XdsRouting::RouteListIterator for getting the matching
330 // route for a request.
331 class XdsResolver::XdsConfigSelector::RouteListIterator
332 : public XdsRouting::RouteListIterator {
333 public:
RouteListIterator(const RouteTable * route_table)334 explicit RouteListIterator(const RouteTable* route_table)
335 : route_table_(route_table) {}
336
Size() const337 size_t Size() const override { return route_table_->size(); }
338
GetMatchersForRoute(size_t index) const339 const XdsApi::Route::Matchers& GetMatchersForRoute(
340 size_t index) const override {
341 return (*route_table_)[index].route.matchers;
342 }
343
344 private:
345 const RouteTable* route_table_;
346 };
347
348 //
349 // XdsResolver::XdsConfigSelector
350 //
351
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,grpc_error_handle * error)352 XdsResolver::XdsConfigSelector::XdsConfigSelector(
353 RefCountedPtr<XdsResolver> resolver, grpc_error_handle* error)
354 : resolver_(std::move(resolver)) {
355 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
356 gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
357 resolver_.get(), this);
358 }
359 // 1. Construct the route table
360 // 2 Update resolver's cluster state map
361 // 3. Construct cluster list to hold on to entries in the cluster state
362 // map.
363 // Reserve the necessary entries up-front to avoid reallocation as we add
364 // elements. This is necessary because the string_view in the entry's
365 // weighted_cluster_state field points to the memory in the route field, so
366 // moving the entry in a reallocation will cause the string_view to point to
367 // invalid data.
368 route_table_.reserve(resolver_->current_virtual_host_.routes.size());
369 for (auto& route : resolver_->current_virtual_host_.routes) {
370 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
371 gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
372 resolver_.get(), this, route.ToString().c_str());
373 }
374 route_table_.emplace_back();
375 auto& route_entry = route_table_.back();
376 route_entry.route = route;
377 auto* route_action =
378 absl::get_if<XdsApi::Route::RouteAction>(&route_entry.route.action);
379 if (route_action != nullptr) {
380 // If the route doesn't specify a timeout, set its timeout to the global
381 // one.
382 if (!route_action->max_stream_duration.has_value()) {
383 route_action->max_stream_duration =
384 resolver_->current_listener_.http_connection_manager
385 .http_max_stream_duration;
386 }
387 if (route_action->weighted_clusters.empty()) {
388 *error = CreateMethodConfig(route_entry.route, nullptr,
389 &route_entry.method_config);
390 MaybeAddCluster(route_action->cluster_name);
391 } else {
392 uint32_t end = 0;
393 for (const auto& weighted_cluster : route_action->weighted_clusters) {
394 Route::ClusterWeightState cluster_weight_state;
395 *error = CreateMethodConfig(route_entry.route, &weighted_cluster,
396 &cluster_weight_state.method_config);
397 if (*error != GRPC_ERROR_NONE) return;
398 end += weighted_cluster.weight;
399 cluster_weight_state.range_end = end;
400 cluster_weight_state.cluster = weighted_cluster.name;
401 route_entry.weighted_cluster_state.push_back(
402 std::move(cluster_weight_state));
403 MaybeAddCluster(weighted_cluster.name);
404 }
405 }
406 }
407 }
408 // Populate filter list.
409 for (const auto& http_filter :
410 resolver_->current_listener_.http_connection_manager.http_filters) {
411 // Find filter. This is guaranteed to succeed, because it's checked
412 // at config validation time in the XdsApi code.
413 const XdsHttpFilterImpl* filter_impl =
414 XdsHttpFilterRegistry::GetFilterForType(
415 http_filter.config.config_proto_type_name);
416 GPR_ASSERT(filter_impl != nullptr);
417 // Add C-core filter to list.
418 if (filter_impl->channel_filter() != nullptr) {
419 filters_.push_back(filter_impl->channel_filter());
420 }
421 }
422 }
423
~XdsConfigSelector()424 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
425 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
426 gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
427 resolver_.get(), this);
428 }
429 clusters_.clear();
430 resolver_->MaybeRemoveUnusedClusters();
431 }
432
CreateMethodConfig(const XdsApi::Route & route,const XdsApi::Route::RouteAction::ClusterWeight * cluster_weight,RefCountedPtr<ServiceConfig> * method_config)433 grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
434 const XdsApi::Route& route,
435 const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight,
436 RefCountedPtr<ServiceConfig>* method_config) {
437 std::vector<std::string> fields;
438 const auto& route_action =
439 absl::get<XdsApi::Route::RouteAction>(route.action);
440 // Set retry policy if any.
441 if (route_action.retry_policy.has_value() &&
442 !route_action.retry_policy->retry_on.Empty()) {
443 std::vector<std::string> retry_parts;
444 retry_parts.push_back(absl::StrFormat(
445 "\"retryPolicy\": {\n"
446 " \"maxAttempts\": %d,\n"
447 " \"initialBackoff\": \"%d.%09ds\",\n"
448 " \"maxBackoff\": \"%d.%09ds\",\n"
449 " \"backoffMultiplier\": 2,\n",
450 route_action.retry_policy->num_retries + 1,
451 route_action.retry_policy->retry_back_off.base_interval.seconds,
452 route_action.retry_policy->retry_back_off.base_interval.nanos,
453 route_action.retry_policy->retry_back_off.max_interval.seconds,
454 route_action.retry_policy->retry_back_off.max_interval.nanos));
455 std::vector<std::string> code_parts;
456 if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_CANCELLED)) {
457 code_parts.push_back(" \"CANCELLED\"");
458 }
459 if (route_action.retry_policy->retry_on.Contains(
460 GRPC_STATUS_DEADLINE_EXCEEDED)) {
461 code_parts.push_back(" \"DEADLINE_EXCEEDED\"");
462 }
463 if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_INTERNAL)) {
464 code_parts.push_back(" \"INTERNAL\"");
465 }
466 if (route_action.retry_policy->retry_on.Contains(
467 GRPC_STATUS_RESOURCE_EXHAUSTED)) {
468 code_parts.push_back(" \"RESOURCE_EXHAUSTED\"");
469 }
470 if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_UNAVAILABLE)) {
471 code_parts.push_back(" \"UNAVAILABLE\"");
472 }
473 retry_parts.push_back(
474 absl::StrFormat(" \"retryableStatusCodes\": [\n %s ]\n",
475 absl::StrJoin(code_parts, ",\n")));
476 retry_parts.push_back(absl::StrFormat(" }"));
477 fields.emplace_back(absl::StrJoin(retry_parts, ""));
478 }
479 // Set timeout.
480 if (route_action.max_stream_duration.has_value() &&
481 (route_action.max_stream_duration->seconds != 0 ||
482 route_action.max_stream_duration->nanos != 0)) {
483 fields.emplace_back(
484 absl::StrFormat(" \"timeout\": \"%d.%09ds\"",
485 route_action.max_stream_duration->seconds,
486 route_action.max_stream_duration->nanos));
487 }
488 // Handle xDS HTTP filters.
489 XdsRouting::GeneratePerHttpFilterConfigsResult result =
490 XdsRouting::GeneratePerHTTPFilterConfigs(
491 resolver_->current_listener_.http_connection_manager.http_filters,
492 resolver_->current_virtual_host_, route, cluster_weight,
493 grpc_channel_args_copy(resolver_->args_));
494 if (result.error != GRPC_ERROR_NONE) {
495 return result.error;
496 }
497 for (const auto& p : result.per_filter_configs) {
498 fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n",
499 absl::StrJoin(p.second, ",\n"),
500 "\n ]"));
501 }
502 // Construct service config.
503 grpc_error_handle error = GRPC_ERROR_NONE;
504 if (!fields.empty()) {
505 std::string json = absl::StrCat(
506 "{\n"
507 " \"methodConfig\": [ {\n"
508 " \"name\": [\n"
509 " {}\n"
510 " ],\n"
511 " ",
512 absl::StrJoin(fields, ",\n"),
513 "\n } ]\n"
514 "}");
515 *method_config = ServiceConfig::Create(result.args, json.c_str(), &error);
516 }
517 grpc_channel_args_destroy(result.args);
518 return error;
519 }
520
ModifyChannelArgs(grpc_channel_args * args)521 grpc_channel_args* XdsResolver::XdsConfigSelector::ModifyChannelArgs(
522 grpc_channel_args* args) {
523 return args;
524 }
525
MaybeAddCluster(const std::string & name)526 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
527 if (clusters_.find(name) == clusters_.end()) {
528 auto it = resolver_->cluster_state_map_.find(name);
529 if (it == resolver_->cluster_state_map_.end()) {
530 auto new_cluster_state = MakeRefCounted<ClusterState>(resolver_, name);
531 clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
532 } else {
533 clusters_[it->second->cluster()] = it->second->Ref();
534 }
535 }
536 }
537
HeaderHashHelper(const XdsApi::Route::RouteAction::HashPolicy & policy,grpc_metadata_batch * initial_metadata)538 absl::optional<uint64_t> HeaderHashHelper(
539 const XdsApi::Route::RouteAction::HashPolicy& policy,
540 grpc_metadata_batch* initial_metadata) {
541 GPR_ASSERT(policy.type == XdsApi::Route::RouteAction::HashPolicy::HEADER);
542 std::string value_buffer;
543 absl::optional<absl::string_view> header_value = XdsRouting::GetHeaderValue(
544 initial_metadata, policy.header_name, &value_buffer);
545 if (!header_value.has_value()) {
546 return absl::nullopt;
547 }
548 if (policy.regex != nullptr) {
549 // If GetHeaderValue() did not already store the value in
550 // value_buffer, copy it there now, so we can modify it.
551 if (header_value->data() != value_buffer.data()) {
552 value_buffer = std::string(*header_value);
553 }
554 RE2::GlobalReplace(&value_buffer, *policy.regex, policy.regex_substitution);
555 header_value = value_buffer;
556 }
557 return XXH64(header_value->data(), header_value->size(), 0);
558 }
559
GetCallConfig(GetCallConfigArgs args)560 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
561 GetCallConfigArgs args) {
562 auto route_index = XdsRouting::GetRouteForRequest(
563 RouteListIterator(&route_table_), StringViewFromSlice(*args.path),
564 args.initial_metadata);
565 if (!route_index.has_value()) {
566 return CallConfig();
567 }
568 auto& entry = route_table_[*route_index];
569 // Found a route match
570 const auto* route_action =
571 absl::get_if<XdsApi::Route::RouteAction>(&entry.route.action);
572 if (route_action == nullptr) {
573 CallConfig call_config;
574 call_config.error =
575 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
576 "Matching route has inappropriate action"),
577 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
578 return call_config;
579 }
580 absl::string_view cluster_name;
581 RefCountedPtr<ServiceConfig> method_config;
582 if (route_action->weighted_clusters.empty()) {
583 cluster_name = route_action->cluster_name;
584 method_config = entry.method_config;
585 } else {
586 const uint32_t key =
587 rand() %
588 entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
589 .range_end;
590 // Find the index in weighted clusters corresponding to key.
591 size_t mid = 0;
592 size_t start_index = 0;
593 size_t end_index = entry.weighted_cluster_state.size() - 1;
594 size_t index = 0;
595 while (end_index > start_index) {
596 mid = (start_index + end_index) / 2;
597 if (entry.weighted_cluster_state[mid].range_end > key) {
598 end_index = mid;
599 } else if (entry.weighted_cluster_state[mid].range_end < key) {
600 start_index = mid + 1;
601 } else {
602 index = mid + 1;
603 break;
604 }
605 }
606 if (index == 0) index = start_index;
607 GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key);
608 cluster_name = entry.weighted_cluster_state[index].cluster;
609 method_config = entry.weighted_cluster_state[index].method_config;
610 }
611 auto it = clusters_.find(cluster_name);
612 GPR_ASSERT(it != clusters_.end());
613 // Generate a hash.
614 absl::optional<uint64_t> hash;
615 for (const auto& hash_policy : route_action->hash_policies) {
616 absl::optional<uint64_t> new_hash;
617 switch (hash_policy.type) {
618 case XdsApi::Route::RouteAction::HashPolicy::HEADER:
619 new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
620 break;
621 case XdsApi::Route::RouteAction::HashPolicy::CHANNEL_ID:
622 new_hash =
623 static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver_.get()));
624 break;
625 default:
626 GPR_ASSERT(0);
627 }
628 if (new_hash.has_value()) {
629 // Rotating the old value prevents duplicate hash rules from cancelling
630 // each other out and preserves all of the entropy
631 const uint64_t old_value =
632 hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0;
633 hash = old_value ^ new_hash.value();
634 }
635 // If the policy is a terminal policy and a hash has been generated,
636 // ignore the rest of the hash policies.
637 if (hash_policy.terminal && hash.has_value()) {
638 break;
639 }
640 }
641 if (!hash.has_value()) {
642 // If there is no hash, we just choose a random value as a default.
643 // We cannot directly use the result of rand() as the hash value,
644 // since it is a 32-bit number and not a 64-bit number and will
645 // therefore not be evenly distributed.
646 uint32_t upper = rand();
647 uint32_t lower = rand();
648 hash = (static_cast<uint64_t>(upper) << 32) | lower;
649 }
650 CallConfig call_config;
651 if (method_config != nullptr) {
652 call_config.method_configs =
653 method_config->GetMethodParsedConfigVector(grpc_empty_slice());
654 call_config.service_config = std::move(method_config);
655 }
656 call_config.call_attributes[kXdsClusterAttribute] = it->first;
657 std::string hash_string = absl::StrCat(hash.value());
658 char* hash_value =
659 static_cast<char*>(args.arena->Alloc(hash_string.size() + 1));
660 memcpy(hash_value, hash_string.c_str(), hash_string.size());
661 hash_value[hash_string.size()] = '\0';
662 call_config.call_attributes[kRequestRingHashAttribute] = hash_value;
663 call_config.call_dispatch_controller =
664 args.arena->New<XdsCallDispatchController>(it->second->Ref());
665 return call_config;
666 }
667
668 //
669 // XdsResolver
670 //
671
StartLocked()672 void XdsResolver::StartLocked() {
673 grpc_error_handle error = GRPC_ERROR_NONE;
674 xds_client_ = XdsClient::GetOrCreate(args_, &error);
675 if (error != GRPC_ERROR_NONE) {
676 gpr_log(GPR_ERROR,
677 "Failed to create xds client -- channel will remain in "
678 "TRANSIENT_FAILURE: %s",
679 grpc_error_std_string(error).c_str());
680 result_handler_->ReturnError(error);
681 return;
682 }
683 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
684 interested_parties_);
685 auto watcher = MakeRefCounted<ListenerWatcher>(Ref());
686 listener_watcher_ = watcher.get();
687 xds_client_->WatchListenerData(server_name_, std::move(watcher));
688 }
689
ShutdownLocked()690 void XdsResolver::ShutdownLocked() {
691 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
692 gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
693 }
694 if (xds_client_ != nullptr) {
695 if (listener_watcher_ != nullptr) {
696 xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
697 /*delay_unsubscription=*/false);
698 }
699 if (route_config_watcher_ != nullptr) {
700 xds_client_->CancelRouteConfigDataWatch(
701 server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
702 }
703 grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
704 interested_parties_);
705 xds_client_.reset();
706 }
707 }
708
OnListenerUpdate(XdsApi::LdsUpdate listener)709 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
710 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
711 gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
712 }
713 if (xds_client_ == nullptr) {
714 return;
715 }
716 if (listener.http_connection_manager.route_config_name !=
717 route_config_name_) {
718 if (route_config_watcher_ != nullptr) {
719 xds_client_->CancelRouteConfigDataWatch(
720 route_config_name_, route_config_watcher_,
721 /*delay_unsubscription=*/
722 !listener.http_connection_manager.route_config_name.empty());
723 route_config_watcher_ = nullptr;
724 }
725 route_config_name_ =
726 std::move(listener.http_connection_manager.route_config_name);
727 if (!route_config_name_.empty()) {
728 current_virtual_host_.routes.clear();
729 auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref());
730 route_config_watcher_ = watcher.get();
731 xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
732 }
733 }
734 current_listener_ = std::move(listener);
735 if (route_config_name_.empty()) {
736 GPR_ASSERT(
737 current_listener_.http_connection_manager.rds_update.has_value());
738 OnRouteConfigUpdate(
739 std::move(*current_listener_.http_connection_manager.rds_update));
740 } else {
741 // HCM may contain newer filter config. We need to propagate the update as
742 // config selector to the channel
743 GenerateResult();
744 }
745 }
746
747 namespace {
748 class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator {
749 public:
VirtualHostListIterator(const std::vector<XdsApi::RdsUpdate::VirtualHost> * virtual_hosts)750 explicit VirtualHostListIterator(
751 const std::vector<XdsApi::RdsUpdate::VirtualHost>* virtual_hosts)
752 : virtual_hosts_(virtual_hosts) {}
753
Size() const754 size_t Size() const override { return virtual_hosts_->size(); }
755
GetDomainsForVirtualHost(size_t index) const756 const std::vector<std::string>& GetDomainsForVirtualHost(
757 size_t index) const override {
758 return (*virtual_hosts_)[index].domains;
759 }
760
761 private:
762 const std::vector<XdsApi::RdsUpdate::VirtualHost>* virtual_hosts_;
763 };
764 } // namespace
765
OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update)766 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
767 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
768 gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
769 }
770 if (xds_client_ == nullptr) {
771 return;
772 }
773 // Find the relevant VirtualHost from the RouteConfiguration.
774 auto vhost_index = XdsRouting::FindVirtualHostForDomain(
775 VirtualHostListIterator(&rds_update.virtual_hosts), server_name_);
776 if (!vhost_index.has_value()) {
777 OnError(GRPC_ERROR_CREATE_FROM_CPP_STRING(
778 absl::StrCat("could not find VirtualHost for ", server_name_,
779 " in RouteConfiguration")));
780 return;
781 }
782 // Save the virtual host in the resolver.
783 current_virtual_host_ = std::move(rds_update.virtual_hosts[*vhost_index]);
784 // Send a new result to the channel.
785 GenerateResult();
786 }
787
OnError(grpc_error_handle error)788 void XdsResolver::OnError(grpc_error_handle error) {
789 gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
790 this, grpc_error_std_string(error).c_str());
791 if (xds_client_ == nullptr) {
792 GRPC_ERROR_UNREF(error);
793 return;
794 }
795 Result result;
796 grpc_arg new_arg = xds_client_->MakeChannelArg();
797 result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
798 result.service_config_error = error;
799 result_handler_->ReturnResult(std::move(result));
800 }
801
OnResourceDoesNotExist()802 void XdsResolver::OnResourceDoesNotExist() {
803 gpr_log(GPR_ERROR,
804 "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
805 "update and returning empty service config",
806 this);
807 if (xds_client_ == nullptr) {
808 return;
809 }
810 current_virtual_host_.routes.clear();
811 Result result;
812 result.service_config =
813 ServiceConfig::Create(args_, "{}", &result.service_config_error);
814 GPR_ASSERT(result.service_config != nullptr);
815 result.args = grpc_channel_args_copy(args_);
816 result_handler_->ReturnResult(std::move(result));
817 }
818
CreateServiceConfig(RefCountedPtr<ServiceConfig> * service_config)819 grpc_error_handle XdsResolver::CreateServiceConfig(
820 RefCountedPtr<ServiceConfig>* service_config) {
821 std::vector<std::string> clusters;
822 for (const auto& cluster : cluster_state_map_) {
823 clusters.push_back(
824 absl::StrFormat(" \"%s\":{\n"
825 " \"childPolicy\":[ {\n"
826 " \"cds_experimental\":{\n"
827 " \"cluster\": \"%s\"\n"
828 " }\n"
829 " } ]\n"
830 " }",
831 cluster.first, cluster.first));
832 }
833 std::vector<std::string> config_parts;
834 config_parts.push_back(
835 "{\n"
836 " \"loadBalancingConfig\":[\n"
837 " { \"xds_cluster_manager_experimental\":{\n"
838 " \"children\":{\n");
839 config_parts.push_back(absl::StrJoin(clusters, ",\n"));
840 config_parts.push_back(
841 " }\n"
842 " } }\n"
843 " ]\n"
844 "}");
845 std::string json = absl::StrJoin(config_parts, "");
846 grpc_error_handle error = GRPC_ERROR_NONE;
847 *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
848 return error;
849 }
850
GenerateResult()851 void XdsResolver::GenerateResult() {
852 if (current_virtual_host_.routes.empty()) return;
853 // First create XdsConfigSelector, which may add new entries to the cluster
854 // state map, and then CreateServiceConfig for LB policies.
855 grpc_error_handle error = GRPC_ERROR_NONE;
856 auto config_selector = MakeRefCounted<XdsConfigSelector>(Ref(), &error);
857 if (error != GRPC_ERROR_NONE) {
858 OnError(grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
859 GRPC_STATUS_UNAVAILABLE));
860 return;
861 }
862 Result result;
863 error = CreateServiceConfig(&result.service_config);
864 if (error != GRPC_ERROR_NONE) {
865 OnError(grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
866 GRPC_STATUS_UNAVAILABLE));
867 return;
868 }
869 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
870 gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
871 result.service_config->json_string().c_str());
872 }
873 grpc_arg new_args[] = {
874 xds_client_->MakeChannelArg(),
875 config_selector->MakeChannelArg(),
876 };
877 result.args =
878 grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args));
879 result_handler_->ReturnResult(std::move(result));
880 }
881
MaybeRemoveUnusedClusters()882 void XdsResolver::MaybeRemoveUnusedClusters() {
883 bool update_needed = false;
884 for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
885 RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
886 if (cluster_state != nullptr) {
887 ++it;
888 } else {
889 update_needed = true;
890 it = cluster_state_map_.erase(it);
891 }
892 }
893 if (update_needed && xds_client_ != nullptr) {
894 // Send a new result to the channel.
895 GenerateResult();
896 }
897 }
898
899 //
900 // Factory
901 //
902
903 class XdsResolverFactory : public ResolverFactory {
904 public:
IsValidUri(const URI & uri) const905 bool IsValidUri(const URI& uri) const override {
906 if (GPR_UNLIKELY(!uri.authority().empty())) {
907 gpr_log(GPR_ERROR, "URI authority not supported");
908 return false;
909 }
910 return true;
911 }
912
CreateResolver(ResolverArgs args) const913 OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
914 if (!IsValidUri(args.uri)) return nullptr;
915 return MakeOrphanable<XdsResolver>(std::move(args));
916 }
917
scheme() const918 const char* scheme() const override { return "xds"; }
919 };
920
921 } // namespace
922
923 } // namespace grpc_core
924
grpc_resolver_xds_init()925 void grpc_resolver_xds_init() {
926 grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
927 absl::make_unique<grpc_core::XdsResolverFactory>());
928 }
929
// Counterpart of grpc_resolver_xds_init(); currently has nothing to do.
void grpc_resolver_xds_shutdown() {
}
931