1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
24 #include "re2/re2.h"
25 #define XXH_INLINE_ALL
26 #include "xxhash.h"
27 
28 #include "src/core/ext/filters/client_channel/config_selector.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
30 #include "src/core/ext/filters/client_channel/resolver_registry.h"
31 #include "src/core/ext/xds/xds_channel_args.h"
32 #include "src/core/ext/xds/xds_client.h"
33 #include "src/core/ext/xds/xds_http_filters.h"
34 #include "src/core/ext/xds/xds_routing.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/iomgr/closure.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/transport/timeout_encoding.h"
39 
40 namespace grpc_core {
41 
42 TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
43 
44 const char* kXdsClusterAttribute = "xds_cluster_name";
45 
46 namespace {
47 
48 //
49 // XdsResolver
50 //
51 
52 class XdsResolver : public Resolver {
53  public:
XdsResolver(ResolverArgs args)54   explicit XdsResolver(ResolverArgs args)
55       : work_serializer_(std::move(args.work_serializer)),
56         result_handler_(std::move(args.result_handler)),
57         server_name_(absl::StripPrefix(args.uri.path(), "/")),
58         args_(grpc_channel_args_copy(args.args)),
59         interested_parties_(args.pollset_set) {
60     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
61       gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
62               server_name_.c_str());
63     }
64   }
65 
~XdsResolver()66   ~XdsResolver() override {
67     grpc_channel_args_destroy(args_);
68     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
69       gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
70     }
71   }
72 
73   void StartLocked() override;
74 
75   void ShutdownLocked() override;
76 
ResetBackoffLocked()77   void ResetBackoffLocked() override {
78     if (xds_client_ != nullptr) xds_client_->ResetBackoff();
79   }
80 
81  private:
82   class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
83    public:
ListenerWatcher(RefCountedPtr<XdsResolver> resolver)84     explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
85         : resolver_(std::move(resolver)) {}
OnListenerChanged(XdsApi::LdsUpdate listener)86     void OnListenerChanged(XdsApi::LdsUpdate listener) override {
87       Ref().release();  // ref held by lambda
88       resolver_->work_serializer_->Run(
89           // TODO(yashykt): When we move to C++14, capture listener with
90           // std::move
91           [this, listener]() mutable {
92             resolver_->OnListenerUpdate(std::move(listener));
93             Unref();
94           },
95           DEBUG_LOCATION);
96     }
OnError(grpc_error_handle error)97     void OnError(grpc_error_handle error) override {
98       Ref().release();  // ref held by lambda
99       resolver_->work_serializer_->Run(
100           [this, error]() {
101             resolver_->OnError(error);
102             Unref();
103           },
104           DEBUG_LOCATION);
105     }
OnResourceDoesNotExist()106     void OnResourceDoesNotExist() override {
107       Ref().release();  // ref held by lambda
108       resolver_->work_serializer_->Run(
109           [this]() {
110             resolver_->OnResourceDoesNotExist();
111             Unref();
112           },
113           DEBUG_LOCATION);
114     }
115 
116    private:
117     RefCountedPtr<XdsResolver> resolver_;
118   };
119 
120   class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
121    public:
RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)122     explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
123         : resolver_(std::move(resolver)) {}
OnRouteConfigChanged(XdsApi::RdsUpdate route_config)124     void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
125       Ref().release();  // ref held by lambda
126       resolver_->work_serializer_->Run(
127           // TODO(yashykt): When we move to C++14, capture route_config with
128           // std::move
129           [this, route_config]() mutable {
130             resolver_->OnRouteConfigUpdate(std::move(route_config));
131             Unref();
132           },
133           DEBUG_LOCATION);
134     }
OnError(grpc_error_handle error)135     void OnError(grpc_error_handle error) override {
136       Ref().release();  // ref held by lambda
137       resolver_->work_serializer_->Run(
138           [this, error]() {
139             resolver_->OnError(error);
140             Unref();
141           },
142           DEBUG_LOCATION);
143     }
OnResourceDoesNotExist()144     void OnResourceDoesNotExist() override {
145       Ref().release();  // ref held by lambda
146       resolver_->work_serializer_->Run(
147           [this]() {
148             resolver_->OnResourceDoesNotExist();
149             Unref();
150           },
151           DEBUG_LOCATION);
152     }
153 
154    private:
155     RefCountedPtr<XdsResolver> resolver_;
156   };
157 
158   // An entry in the map of clusters that need to be present in the LB
159   // policy config.  The map holds a weak ref.  One strong ref is held by
160   // the ConfigSelector, and another is held by each call assigned to
161   // the cluster by the ConfigSelector.  The ref for each call is held
162   // until the call is committed.  When the strong refs go away, we hop
163   // back into the WorkSerializer to remove the entry from the map.
164   class ClusterState : public DualRefCounted<ClusterState> {
165    public:
166     using ClusterStateMap =
167         std::map<std::string, WeakRefCountedPtr<ClusterState>>;
168 
ClusterState(RefCountedPtr<XdsResolver> resolver,const std::string & cluster_name)169     ClusterState(RefCountedPtr<XdsResolver> resolver,
170                  const std::string& cluster_name)
171         : resolver_(std::move(resolver)),
172           it_(resolver_->cluster_state_map_.emplace(cluster_name, WeakRef())
173                   .first) {}
174 
Orphan()175     void Orphan() override {
176       auto* resolver = resolver_.release();
177       resolver->work_serializer_->Run(
178           [resolver]() {
179             resolver->MaybeRemoveUnusedClusters();
180             resolver->Unref();
181           },
182           DEBUG_LOCATION);
183     }
184 
cluster() const185     const std::string& cluster() const { return it_->first; }
186 
187    private:
188     RefCountedPtr<XdsResolver> resolver_;
189     ClusterStateMap::iterator it_;
190   };
191 
192   // Call dispatch controller, created for each call handled by the
193   // ConfigSelector.  Holds a ref to the ClusterState object until the
194   // call is committed.
195   class XdsCallDispatchController
196       : public ConfigSelector::CallDispatchController {
197    public:
XdsCallDispatchController(RefCountedPtr<ClusterState> cluster_state)198     explicit XdsCallDispatchController(
199         RefCountedPtr<ClusterState> cluster_state)
200         : cluster_state_(std::move(cluster_state)) {}
201 
ShouldRetry()202     bool ShouldRetry() override {
203       // TODO(donnadionne): Implement the retry circuit breaker here.
204       return true;
205     }
206 
Commit()207     void Commit() override {
208       // TODO(donnadionne): If ShouldRetry() was called previously,
209       // decrement the retry circuit breaker counter.
210       cluster_state_.reset();
211     }
212 
213    private:
214     // Note: The XdsCallDispatchController object is never actually destroyed,
215     // so do not add any data members that require destruction unless you have
216     // some other way to clean them up.
217     RefCountedPtr<ClusterState> cluster_state_;
218   };
219 
220   class XdsConfigSelector : public ConfigSelector {
221    public:
222     XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
223                       grpc_error_handle* error);
224     ~XdsConfigSelector() override;
225 
name() const226     const char* name() const override { return "XdsConfigSelector"; }
227 
Equals(const ConfigSelector * other) const228     bool Equals(const ConfigSelector* other) const override {
229       const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
230       // Don't need to compare resolver_, since that will always be the same.
231       return route_table_ == other_xds->route_table_ &&
232              clusters_ == other_xds->clusters_;
233     }
234 
235     CallConfig GetCallConfig(GetCallConfigArgs args) override;
236 
GetFilters()237     std::vector<const grpc_channel_filter*> GetFilters() override {
238       return filters_;
239     }
240 
241     grpc_channel_args* ModifyChannelArgs(grpc_channel_args* args) override;
242 
243    private:
244     struct Route {
245       struct ClusterWeightState {
246         uint32_t range_end;
247         absl::string_view cluster;
248         RefCountedPtr<ServiceConfig> method_config;
249 
250         bool operator==(const ClusterWeightState& other) const;
251       };
252 
253       XdsApi::Route route;
254       RefCountedPtr<ServiceConfig> method_config;
255       absl::InlinedVector<ClusterWeightState, 2> weighted_cluster_state;
256 
257       bool operator==(const Route& other) const;
258     };
259     using RouteTable = std::vector<Route>;
260 
261     class RouteListIterator;
262 
263     void MaybeAddCluster(const std::string& name);
264     grpc_error_handle CreateMethodConfig(
265         const XdsApi::Route& route,
266         const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight,
267         RefCountedPtr<ServiceConfig>* method_config);
268 
269     RefCountedPtr<XdsResolver> resolver_;
270     RouteTable route_table_;
271     std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
272     std::vector<const grpc_channel_filter*> filters_;
273   };
274 
275   void OnListenerUpdate(XdsApi::LdsUpdate listener);
276   void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
277   void OnError(grpc_error_handle error);
278   void OnResourceDoesNotExist();
279 
280   grpc_error_handle CreateServiceConfig(
281       RefCountedPtr<ServiceConfig>* service_config);
282   void GenerateResult();
283   void MaybeRemoveUnusedClusters();
284 
285   std::shared_ptr<WorkSerializer> work_serializer_;
286   std::unique_ptr<ResultHandler> result_handler_;
287   std::string server_name_;
288   const grpc_channel_args* args_;
289   grpc_pollset_set* interested_parties_;
290 
291   RefCountedPtr<XdsClient> xds_client_;
292 
293   XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
294   // This will not contain the RouteConfiguration, even if it comes with the
295   // LDS response; instead, the relevant VirtualHost from the
296   // RouteConfiguration will be saved in current_virtual_host_.
297   XdsApi::LdsUpdate current_listener_;
298 
299   std::string route_config_name_;
300   XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
301   XdsApi::RdsUpdate::VirtualHost current_virtual_host_;
302 
303   ClusterState::ClusterStateMap cluster_state_map_;
304 };
305 
306 //
307 // XdsResolver::XdsConfigSelector::Route
308 //
309 
MethodConfigsEqual(const ServiceConfig * sc1,const ServiceConfig * sc2)310 bool MethodConfigsEqual(const ServiceConfig* sc1, const ServiceConfig* sc2) {
311   if (sc1 == nullptr) return sc2 == nullptr;
312   if (sc2 == nullptr) return false;
313   return sc1->json_string() == sc2->json_string();
314 }
315 
operator ==(const ClusterWeightState & other) const316 bool XdsResolver::XdsConfigSelector::Route::ClusterWeightState::operator==(
317     const ClusterWeightState& other) const {
318   return range_end == other.range_end && cluster == other.cluster &&
319          MethodConfigsEqual(method_config.get(), other.method_config.get());
320 }
321 
operator ==(const Route & other) const322 bool XdsResolver::XdsConfigSelector::Route::operator==(
323     const Route& other) const {
324   return route == other.route &&
325          weighted_cluster_state == other.weighted_cluster_state &&
326          MethodConfigsEqual(method_config.get(), other.method_config.get());
327 }
328 
329 // Implementation of XdsRouting::RouteListIterator for getting the matching
330 // route for a request.
331 class XdsResolver::XdsConfigSelector::RouteListIterator
332     : public XdsRouting::RouteListIterator {
333  public:
RouteListIterator(const RouteTable * route_table)334   explicit RouteListIterator(const RouteTable* route_table)
335       : route_table_(route_table) {}
336 
Size() const337   size_t Size() const override { return route_table_->size(); }
338 
GetMatchersForRoute(size_t index) const339   const XdsApi::Route::Matchers& GetMatchersForRoute(
340       size_t index) const override {
341     return (*route_table_)[index].route.matchers;
342   }
343 
344  private:
345   const RouteTable* route_table_;
346 };
347 
348 //
349 // XdsResolver::XdsConfigSelector
350 //
351 
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,grpc_error_handle * error)352 XdsResolver::XdsConfigSelector::XdsConfigSelector(
353     RefCountedPtr<XdsResolver> resolver, grpc_error_handle* error)
354     : resolver_(std::move(resolver)) {
355   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
356     gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
357             resolver_.get(), this);
358   }
359   // 1. Construct the route table
360   // 2  Update resolver's cluster state map
361   // 3. Construct cluster list to hold on to entries in the cluster state
362   // map.
363   // Reserve the necessary entries up-front to avoid reallocation as we add
364   // elements. This is necessary because the string_view in the entry's
365   // weighted_cluster_state field points to the memory in the route field, so
366   // moving the entry in a reallocation will cause the string_view to point to
367   // invalid data.
368   route_table_.reserve(resolver_->current_virtual_host_.routes.size());
369   for (auto& route : resolver_->current_virtual_host_.routes) {
370     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
371       gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
372               resolver_.get(), this, route.ToString().c_str());
373     }
374     route_table_.emplace_back();
375     auto& route_entry = route_table_.back();
376     route_entry.route = route;
377     auto* route_action =
378         absl::get_if<XdsApi::Route::RouteAction>(&route_entry.route.action);
379     if (route_action != nullptr) {
380       // If the route doesn't specify a timeout, set its timeout to the global
381       // one.
382       if (!route_action->max_stream_duration.has_value()) {
383         route_action->max_stream_duration =
384             resolver_->current_listener_.http_connection_manager
385                 .http_max_stream_duration;
386       }
387       if (route_action->weighted_clusters.empty()) {
388         *error = CreateMethodConfig(route_entry.route, nullptr,
389                                     &route_entry.method_config);
390         MaybeAddCluster(route_action->cluster_name);
391       } else {
392         uint32_t end = 0;
393         for (const auto& weighted_cluster : route_action->weighted_clusters) {
394           Route::ClusterWeightState cluster_weight_state;
395           *error = CreateMethodConfig(route_entry.route, &weighted_cluster,
396                                       &cluster_weight_state.method_config);
397           if (*error != GRPC_ERROR_NONE) return;
398           end += weighted_cluster.weight;
399           cluster_weight_state.range_end = end;
400           cluster_weight_state.cluster = weighted_cluster.name;
401           route_entry.weighted_cluster_state.push_back(
402               std::move(cluster_weight_state));
403           MaybeAddCluster(weighted_cluster.name);
404         }
405       }
406     }
407   }
408   // Populate filter list.
409   for (const auto& http_filter :
410        resolver_->current_listener_.http_connection_manager.http_filters) {
411     // Find filter.  This is guaranteed to succeed, because it's checked
412     // at config validation time in the XdsApi code.
413     const XdsHttpFilterImpl* filter_impl =
414         XdsHttpFilterRegistry::GetFilterForType(
415             http_filter.config.config_proto_type_name);
416     GPR_ASSERT(filter_impl != nullptr);
417     // Add C-core filter to list.
418     if (filter_impl->channel_filter() != nullptr) {
419       filters_.push_back(filter_impl->channel_filter());
420     }
421   }
422 }
423 
~XdsConfigSelector()424 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
425   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
426     gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
427             resolver_.get(), this);
428   }
429   clusters_.clear();
430   resolver_->MaybeRemoveUnusedClusters();
431 }
432 
CreateMethodConfig(const XdsApi::Route & route,const XdsApi::Route::RouteAction::ClusterWeight * cluster_weight,RefCountedPtr<ServiceConfig> * method_config)433 grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
434     const XdsApi::Route& route,
435     const XdsApi::Route::RouteAction::ClusterWeight* cluster_weight,
436     RefCountedPtr<ServiceConfig>* method_config) {
437   std::vector<std::string> fields;
438   const auto& route_action =
439       absl::get<XdsApi::Route::RouteAction>(route.action);
440   // Set retry policy if any.
441   if (route_action.retry_policy.has_value() &&
442       !route_action.retry_policy->retry_on.Empty()) {
443     std::vector<std::string> retry_parts;
444     retry_parts.push_back(absl::StrFormat(
445         "\"retryPolicy\": {\n"
446         "      \"maxAttempts\": %d,\n"
447         "      \"initialBackoff\": \"%d.%09ds\",\n"
448         "      \"maxBackoff\": \"%d.%09ds\",\n"
449         "      \"backoffMultiplier\": 2,\n",
450         route_action.retry_policy->num_retries + 1,
451         route_action.retry_policy->retry_back_off.base_interval.seconds,
452         route_action.retry_policy->retry_back_off.base_interval.nanos,
453         route_action.retry_policy->retry_back_off.max_interval.seconds,
454         route_action.retry_policy->retry_back_off.max_interval.nanos));
455     std::vector<std::string> code_parts;
456     if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_CANCELLED)) {
457       code_parts.push_back("        \"CANCELLED\"");
458     }
459     if (route_action.retry_policy->retry_on.Contains(
460             GRPC_STATUS_DEADLINE_EXCEEDED)) {
461       code_parts.push_back("        \"DEADLINE_EXCEEDED\"");
462     }
463     if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_INTERNAL)) {
464       code_parts.push_back("        \"INTERNAL\"");
465     }
466     if (route_action.retry_policy->retry_on.Contains(
467             GRPC_STATUS_RESOURCE_EXHAUSTED)) {
468       code_parts.push_back("        \"RESOURCE_EXHAUSTED\"");
469     }
470     if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_UNAVAILABLE)) {
471       code_parts.push_back("        \"UNAVAILABLE\"");
472     }
473     retry_parts.push_back(
474         absl::StrFormat("      \"retryableStatusCodes\": [\n %s ]\n",
475                         absl::StrJoin(code_parts, ",\n")));
476     retry_parts.push_back(absl::StrFormat("    }"));
477     fields.emplace_back(absl::StrJoin(retry_parts, ""));
478   }
479   // Set timeout.
480   if (route_action.max_stream_duration.has_value() &&
481       (route_action.max_stream_duration->seconds != 0 ||
482        route_action.max_stream_duration->nanos != 0)) {
483     fields.emplace_back(
484         absl::StrFormat("    \"timeout\": \"%d.%09ds\"",
485                         route_action.max_stream_duration->seconds,
486                         route_action.max_stream_duration->nanos));
487   }
488   // Handle xDS HTTP filters.
489   XdsRouting::GeneratePerHttpFilterConfigsResult result =
490       XdsRouting::GeneratePerHTTPFilterConfigs(
491           resolver_->current_listener_.http_connection_manager.http_filters,
492           resolver_->current_virtual_host_, route, cluster_weight,
493           grpc_channel_args_copy(resolver_->args_));
494   if (result.error != GRPC_ERROR_NONE) {
495     return result.error;
496   }
497   for (const auto& p : result.per_filter_configs) {
498     fields.emplace_back(absl::StrCat("    \"", p.first, "\": [\n",
499                                      absl::StrJoin(p.second, ",\n"),
500                                      "\n    ]"));
501   }
502   // Construct service config.
503   grpc_error_handle error = GRPC_ERROR_NONE;
504   if (!fields.empty()) {
505     std::string json = absl::StrCat(
506         "{\n"
507         "  \"methodConfig\": [ {\n"
508         "    \"name\": [\n"
509         "      {}\n"
510         "    ],\n"
511         "    ",
512         absl::StrJoin(fields, ",\n"),
513         "\n  } ]\n"
514         "}");
515     *method_config = ServiceConfig::Create(result.args, json.c_str(), &error);
516   }
517   grpc_channel_args_destroy(result.args);
518   return error;
519 }
520 
ModifyChannelArgs(grpc_channel_args * args)521 grpc_channel_args* XdsResolver::XdsConfigSelector::ModifyChannelArgs(
522     grpc_channel_args* args) {
523   return args;
524 }
525 
MaybeAddCluster(const std::string & name)526 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
527   if (clusters_.find(name) == clusters_.end()) {
528     auto it = resolver_->cluster_state_map_.find(name);
529     if (it == resolver_->cluster_state_map_.end()) {
530       auto new_cluster_state = MakeRefCounted<ClusterState>(resolver_, name);
531       clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
532     } else {
533       clusters_[it->second->cluster()] = it->second->Ref();
534     }
535   }
536 }
537 
HeaderHashHelper(const XdsApi::Route::RouteAction::HashPolicy & policy,grpc_metadata_batch * initial_metadata)538 absl::optional<uint64_t> HeaderHashHelper(
539     const XdsApi::Route::RouteAction::HashPolicy& policy,
540     grpc_metadata_batch* initial_metadata) {
541   GPR_ASSERT(policy.type == XdsApi::Route::RouteAction::HashPolicy::HEADER);
542   std::string value_buffer;
543   absl::optional<absl::string_view> header_value = XdsRouting::GetHeaderValue(
544       initial_metadata, policy.header_name, &value_buffer);
545   if (!header_value.has_value()) {
546     return absl::nullopt;
547   }
548   if (policy.regex != nullptr) {
549     // If GetHeaderValue() did not already store the value in
550     // value_buffer, copy it there now, so we can modify it.
551     if (header_value->data() != value_buffer.data()) {
552       value_buffer = std::string(*header_value);
553     }
554     RE2::GlobalReplace(&value_buffer, *policy.regex, policy.regex_substitution);
555     header_value = value_buffer;
556   }
557   return XXH64(header_value->data(), header_value->size(), 0);
558 }
559 
GetCallConfig(GetCallConfigArgs args)560 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
561     GetCallConfigArgs args) {
562   auto route_index = XdsRouting::GetRouteForRequest(
563       RouteListIterator(&route_table_), StringViewFromSlice(*args.path),
564       args.initial_metadata);
565   if (!route_index.has_value()) {
566     return CallConfig();
567   }
568   auto& entry = route_table_[*route_index];
569   // Found a route match
570   const auto* route_action =
571       absl::get_if<XdsApi::Route::RouteAction>(&entry.route.action);
572   if (route_action == nullptr) {
573     CallConfig call_config;
574     call_config.error =
575         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
576                                "Matching route has inappropriate action"),
577                            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
578     return call_config;
579   }
580   absl::string_view cluster_name;
581   RefCountedPtr<ServiceConfig> method_config;
582   if (route_action->weighted_clusters.empty()) {
583     cluster_name = route_action->cluster_name;
584     method_config = entry.method_config;
585   } else {
586     const uint32_t key =
587         rand() %
588         entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
589             .range_end;
590     // Find the index in weighted clusters corresponding to key.
591     size_t mid = 0;
592     size_t start_index = 0;
593     size_t end_index = entry.weighted_cluster_state.size() - 1;
594     size_t index = 0;
595     while (end_index > start_index) {
596       mid = (start_index + end_index) / 2;
597       if (entry.weighted_cluster_state[mid].range_end > key) {
598         end_index = mid;
599       } else if (entry.weighted_cluster_state[mid].range_end < key) {
600         start_index = mid + 1;
601       } else {
602         index = mid + 1;
603         break;
604       }
605     }
606     if (index == 0) index = start_index;
607     GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key);
608     cluster_name = entry.weighted_cluster_state[index].cluster;
609     method_config = entry.weighted_cluster_state[index].method_config;
610   }
611   auto it = clusters_.find(cluster_name);
612   GPR_ASSERT(it != clusters_.end());
613   // Generate a hash.
614   absl::optional<uint64_t> hash;
615   for (const auto& hash_policy : route_action->hash_policies) {
616     absl::optional<uint64_t> new_hash;
617     switch (hash_policy.type) {
618       case XdsApi::Route::RouteAction::HashPolicy::HEADER:
619         new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
620         break;
621       case XdsApi::Route::RouteAction::HashPolicy::CHANNEL_ID:
622         new_hash =
623             static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver_.get()));
624         break;
625       default:
626         GPR_ASSERT(0);
627     }
628     if (new_hash.has_value()) {
629       // Rotating the old value prevents duplicate hash rules from cancelling
630       // each other out and preserves all of the entropy
631       const uint64_t old_value =
632           hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0;
633       hash = old_value ^ new_hash.value();
634     }
635     // If the policy is a terminal policy and a hash has been generated,
636     // ignore the rest of the hash policies.
637     if (hash_policy.terminal && hash.has_value()) {
638       break;
639     }
640   }
641   if (!hash.has_value()) {
642     // If there is no hash, we just choose a random value as a default.
643     // We cannot directly use the result of rand() as the hash value,
644     // since it is a 32-bit number and not a 64-bit number and will
645     // therefore not be evenly distributed.
646     uint32_t upper = rand();
647     uint32_t lower = rand();
648     hash = (static_cast<uint64_t>(upper) << 32) | lower;
649   }
650   CallConfig call_config;
651   if (method_config != nullptr) {
652     call_config.method_configs =
653         method_config->GetMethodParsedConfigVector(grpc_empty_slice());
654     call_config.service_config = std::move(method_config);
655   }
656   call_config.call_attributes[kXdsClusterAttribute] = it->first;
657   std::string hash_string = absl::StrCat(hash.value());
658   char* hash_value =
659       static_cast<char*>(args.arena->Alloc(hash_string.size() + 1));
660   memcpy(hash_value, hash_string.c_str(), hash_string.size());
661   hash_value[hash_string.size()] = '\0';
662   call_config.call_attributes[kRequestRingHashAttribute] = hash_value;
663   call_config.call_dispatch_controller =
664       args.arena->New<XdsCallDispatchController>(it->second->Ref());
665   return call_config;
666 }
667 
668 //
669 // XdsResolver
670 //
671 
StartLocked()672 void XdsResolver::StartLocked() {
673   grpc_error_handle error = GRPC_ERROR_NONE;
674   xds_client_ = XdsClient::GetOrCreate(args_, &error);
675   if (error != GRPC_ERROR_NONE) {
676     gpr_log(GPR_ERROR,
677             "Failed to create xds client -- channel will remain in "
678             "TRANSIENT_FAILURE: %s",
679             grpc_error_std_string(error).c_str());
680     result_handler_->ReturnError(error);
681     return;
682   }
683   grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
684                                    interested_parties_);
685   auto watcher = MakeRefCounted<ListenerWatcher>(Ref());
686   listener_watcher_ = watcher.get();
687   xds_client_->WatchListenerData(server_name_, std::move(watcher));
688 }
689 
ShutdownLocked()690 void XdsResolver::ShutdownLocked() {
691   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
692     gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
693   }
694   if (xds_client_ != nullptr) {
695     if (listener_watcher_ != nullptr) {
696       xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
697                                            /*delay_unsubscription=*/false);
698     }
699     if (route_config_watcher_ != nullptr) {
700       xds_client_->CancelRouteConfigDataWatch(
701           server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
702     }
703     grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
704                                      interested_parties_);
705     xds_client_.reset();
706   }
707 }
708 
OnListenerUpdate(XdsApi::LdsUpdate listener)709 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
710   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
711     gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
712   }
713   if (xds_client_ == nullptr) {
714     return;
715   }
716   if (listener.http_connection_manager.route_config_name !=
717       route_config_name_) {
718     if (route_config_watcher_ != nullptr) {
719       xds_client_->CancelRouteConfigDataWatch(
720           route_config_name_, route_config_watcher_,
721           /*delay_unsubscription=*/
722           !listener.http_connection_manager.route_config_name.empty());
723       route_config_watcher_ = nullptr;
724     }
725     route_config_name_ =
726         std::move(listener.http_connection_manager.route_config_name);
727     if (!route_config_name_.empty()) {
728       current_virtual_host_.routes.clear();
729       auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref());
730       route_config_watcher_ = watcher.get();
731       xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
732     }
733   }
734   current_listener_ = std::move(listener);
735   if (route_config_name_.empty()) {
736     GPR_ASSERT(
737         current_listener_.http_connection_manager.rds_update.has_value());
738     OnRouteConfigUpdate(
739         std::move(*current_listener_.http_connection_manager.rds_update));
740   } else {
741     // HCM may contain newer filter config. We need to propagate the update as
742     // config selector to the channel
743     GenerateResult();
744   }
745 }
746 
747 namespace {
748 class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator {
749  public:
VirtualHostListIterator(const std::vector<XdsApi::RdsUpdate::VirtualHost> * virtual_hosts)750   explicit VirtualHostListIterator(
751       const std::vector<XdsApi::RdsUpdate::VirtualHost>* virtual_hosts)
752       : virtual_hosts_(virtual_hosts) {}
753 
Size() const754   size_t Size() const override { return virtual_hosts_->size(); }
755 
GetDomainsForVirtualHost(size_t index) const756   const std::vector<std::string>& GetDomainsForVirtualHost(
757       size_t index) const override {
758     return (*virtual_hosts_)[index].domains;
759   }
760 
761  private:
762   const std::vector<XdsApi::RdsUpdate::VirtualHost>* virtual_hosts_;
763 };
764 }  // namespace
765 
OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update)766 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
767   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
768     gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
769   }
770   if (xds_client_ == nullptr) {
771     return;
772   }
773   // Find the relevant VirtualHost from the RouteConfiguration.
774   auto vhost_index = XdsRouting::FindVirtualHostForDomain(
775       VirtualHostListIterator(&rds_update.virtual_hosts), server_name_);
776   if (!vhost_index.has_value()) {
777     OnError(GRPC_ERROR_CREATE_FROM_CPP_STRING(
778         absl::StrCat("could not find VirtualHost for ", server_name_,
779                      " in RouteConfiguration")));
780     return;
781   }
782   // Save the virtual host in the resolver.
783   current_virtual_host_ = std::move(rds_update.virtual_hosts[*vhost_index]);
784   // Send a new result to the channel.
785   GenerateResult();
786 }
787 
OnError(grpc_error_handle error)788 void XdsResolver::OnError(grpc_error_handle error) {
789   gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
790           this, grpc_error_std_string(error).c_str());
791   if (xds_client_ == nullptr) {
792     GRPC_ERROR_UNREF(error);
793     return;
794   }
795   Result result;
796   grpc_arg new_arg = xds_client_->MakeChannelArg();
797   result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
798   result.service_config_error = error;
799   result_handler_->ReturnResult(std::move(result));
800 }
801 
OnResourceDoesNotExist()802 void XdsResolver::OnResourceDoesNotExist() {
803   gpr_log(GPR_ERROR,
804           "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
805           "update and returning empty service config",
806           this);
807   if (xds_client_ == nullptr) {
808     return;
809   }
810   current_virtual_host_.routes.clear();
811   Result result;
812   result.service_config =
813       ServiceConfig::Create(args_, "{}", &result.service_config_error);
814   GPR_ASSERT(result.service_config != nullptr);
815   result.args = grpc_channel_args_copy(args_);
816   result_handler_->ReturnResult(std::move(result));
817 }
818 
CreateServiceConfig(RefCountedPtr<ServiceConfig> * service_config)819 grpc_error_handle XdsResolver::CreateServiceConfig(
820     RefCountedPtr<ServiceConfig>* service_config) {
821   std::vector<std::string> clusters;
822   for (const auto& cluster : cluster_state_map_) {
823     clusters.push_back(
824         absl::StrFormat("      \"%s\":{\n"
825                         "        \"childPolicy\":[ {\n"
826                         "          \"cds_experimental\":{\n"
827                         "            \"cluster\": \"%s\"\n"
828                         "          }\n"
829                         "        } ]\n"
830                         "       }",
831                         cluster.first, cluster.first));
832   }
833   std::vector<std::string> config_parts;
834   config_parts.push_back(
835       "{\n"
836       "  \"loadBalancingConfig\":[\n"
837       "    { \"xds_cluster_manager_experimental\":{\n"
838       "      \"children\":{\n");
839   config_parts.push_back(absl::StrJoin(clusters, ",\n"));
840   config_parts.push_back(
841       "    }\n"
842       "    } }\n"
843       "  ]\n"
844       "}");
845   std::string json = absl::StrJoin(config_parts, "");
846   grpc_error_handle error = GRPC_ERROR_NONE;
847   *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
848   return error;
849 }
850 
GenerateResult()851 void XdsResolver::GenerateResult() {
852   if (current_virtual_host_.routes.empty()) return;
853   // First create XdsConfigSelector, which may add new entries to the cluster
854   // state map, and then CreateServiceConfig for LB policies.
855   grpc_error_handle error = GRPC_ERROR_NONE;
856   auto config_selector = MakeRefCounted<XdsConfigSelector>(Ref(), &error);
857   if (error != GRPC_ERROR_NONE) {
858     OnError(grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
859                                GRPC_STATUS_UNAVAILABLE));
860     return;
861   }
862   Result result;
863   error = CreateServiceConfig(&result.service_config);
864   if (error != GRPC_ERROR_NONE) {
865     OnError(grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
866                                GRPC_STATUS_UNAVAILABLE));
867     return;
868   }
869   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
870     gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
871             result.service_config->json_string().c_str());
872   }
873   grpc_arg new_args[] = {
874       xds_client_->MakeChannelArg(),
875       config_selector->MakeChannelArg(),
876   };
877   result.args =
878       grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args));
879   result_handler_->ReturnResult(std::move(result));
880 }
881 
MaybeRemoveUnusedClusters()882 void XdsResolver::MaybeRemoveUnusedClusters() {
883   bool update_needed = false;
884   for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
885     RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
886     if (cluster_state != nullptr) {
887       ++it;
888     } else {
889       update_needed = true;
890       it = cluster_state_map_.erase(it);
891     }
892   }
893   if (update_needed && xds_client_ != nullptr) {
894     // Send a new result to the channel.
895     GenerateResult();
896   }
897 }
898 
899 //
900 // Factory
901 //
902 
903 class XdsResolverFactory : public ResolverFactory {
904  public:
IsValidUri(const URI & uri) const905   bool IsValidUri(const URI& uri) const override {
906     if (GPR_UNLIKELY(!uri.authority().empty())) {
907       gpr_log(GPR_ERROR, "URI authority not supported");
908       return false;
909     }
910     return true;
911   }
912 
CreateResolver(ResolverArgs args) const913   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
914     if (!IsValidUri(args.uri)) return nullptr;
915     return MakeOrphanable<XdsResolver>(std::move(args));
916   }
917 
scheme() const918   const char* scheme() const override { return "xds"; }
919 };
920 
921 }  // namespace
922 
923 }  // namespace grpc_core
924 
grpc_resolver_xds_init()925 void grpc_resolver_xds_init() {
926   grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
927       absl::make_unique<grpc_core::XdsResolverFactory>());
928 }
929 
grpc_resolver_xds_shutdown()930 void grpc_resolver_xds_shutdown() {}
931