1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package resolver 20 21import ( 22 "context" 23 "encoding/json" 24 "fmt" 25 "math/bits" 26 "strings" 27 "sync/atomic" 28 "time" 29 30 xxhash "github.com/cespare/xxhash/v2" 31 "google.golang.org/grpc/codes" 32 "google.golang.org/grpc/internal/grpcrand" 33 iresolver "google.golang.org/grpc/internal/resolver" 34 "google.golang.org/grpc/internal/serviceconfig" 35 "google.golang.org/grpc/internal/wrr" 36 "google.golang.org/grpc/internal/xds/env" 37 "google.golang.org/grpc/metadata" 38 "google.golang.org/grpc/status" 39 "google.golang.org/grpc/xds/internal/balancer/clustermanager" 40 "google.golang.org/grpc/xds/internal/balancer/ringhash" 41 "google.golang.org/grpc/xds/internal/httpfilter" 42 "google.golang.org/grpc/xds/internal/httpfilter/router" 43 "google.golang.org/grpc/xds/internal/xdsclient" 44) 45 46const ( 47 cdsName = "cds_experimental" 48 xdsClusterManagerName = "xds_cluster_manager_experimental" 49) 50 51type serviceConfig struct { 52 LoadBalancingConfig balancerConfig `json:"loadBalancingConfig"` 53} 54 55type balancerConfig []map[string]interface{} 56 57func newBalancerConfig(name string, config interface{}) balancerConfig { 58 return []map[string]interface{}{{name: config}} 59} 60 61type cdsBalancerConfig struct { 62 Cluster string `json:"cluster"` 63} 64 65type xdsChildConfig struct { 66 ChildPolicy balancerConfig `json:"childPolicy"` 67} 68 69type xdsClusterManagerConfig struct { 70 Children map[string]xdsChildConfig `json:"children"` 71} 72 73// pruneActiveClusters deletes entries in r.activeClusters with zero 74// references. 75func (r *xdsResolver) pruneActiveClusters() { 76 for cluster, ci := range r.activeClusters { 77 if atomic.LoadInt32(&ci.refCount) == 0 { 78 delete(r.activeClusters, cluster) 79 } 80 } 81} 82 83// serviceConfigJSON produces a service config in JSON format representing all 84// the clusters referenced in activeClusters. This includes clusters with zero 85// references, so they must be pruned first. 86func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) { 87 // Generate children (all entries in activeClusters). 88 children := make(map[string]xdsChildConfig) 89 for cluster := range activeClusters { 90 children[cluster] = xdsChildConfig{ 91 ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}), 92 } 93 } 94 95 sc := serviceConfig{ 96 LoadBalancingConfig: newBalancerConfig( 97 xdsClusterManagerName, xdsClusterManagerConfig{Children: children}, 98 ), 99 } 100 101 bs, err := json.Marshal(sc) 102 if err != nil { 103 return nil, fmt.Errorf("failed to marshal json: %v", err) 104 } 105 return bs, nil 106} 107 108type virtualHost struct { 109 // map from filter name to its config 110 httpFilterConfigOverride map[string]httpfilter.FilterConfig 111 // retry policy present in virtual host 112 retryConfig *xdsclient.RetryConfig 113} 114 115// routeCluster holds information about a cluster as referenced by a route. 116type routeCluster struct { 117 name string 118 // map from filter name to its config 119 httpFilterConfigOverride map[string]httpfilter.FilterConfig 120} 121 122type route struct { 123 m *xdsclient.CompositeMatcher // converted from route matchers 124 clusters wrr.WRR // holds *routeCluster entries 125 maxStreamDuration time.Duration 126 // map from filter name to its config 127 httpFilterConfigOverride map[string]httpfilter.FilterConfig 128 retryConfig *xdsclient.RetryConfig 129 hashPolicies []*xdsclient.HashPolicy 130} 131 132func (r route) String() string { 133 return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration) 134} 135 136type configSelector struct { 137 r *xdsResolver 138 virtualHost virtualHost 139 routes []route 140 clusters map[string]*clusterInfo 141 httpFilterConfig []xdsclient.HTTPFilter 142} 143 144var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") 145 146func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) { 147 if cs == nil { 148 return nil, status.Errorf(codes.Unavailable, "no valid clusters") 149 } 150 var rt *route 151 // Loop through routes in order and select first match. 152 for _, r := range cs.routes { 153 if r.m.Match(rpcInfo) { 154 rt = &r 155 break 156 } 157 } 158 if rt == nil || rt.clusters == nil { 159 return nil, errNoMatchedRouteFound 160 } 161 cluster, ok := rt.clusters.Next().(*routeCluster) 162 if !ok { 163 return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) 164 } 165 // Add a ref to the selected cluster, as this RPC needs this cluster until 166 // it is committed. 167 ref := &cs.clusters[cluster.name].refCount 168 atomic.AddInt32(ref, 1) 169 170 interceptor, err := cs.newInterceptor(rt, cluster) 171 if err != nil { 172 return nil, err 173 } 174 175 lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name) 176 // Request Hashes are only applicable for a Ring Hash LB. 177 if env.RingHashSupport { 178 lbCtx = ringhash.SetRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies)) 179 } 180 181 config := &iresolver.RPCConfig{ 182 // Communicate to the LB policy the chosen cluster and request hash, if Ring Hash LB policy. 183 Context: lbCtx, 184 OnCommitted: func() { 185 // When the RPC is committed, the cluster is no longer required. 186 // Decrease its ref. 187 if v := atomic.AddInt32(ref, -1); v == 0 { 188 // This entry will be removed from activeClusters when 189 // producing the service config for the empty update. 190 select { 191 case cs.r.updateCh <- suWithError{emptyUpdate: true}: 192 default: 193 } 194 } 195 }, 196 Interceptor: interceptor, 197 } 198 199 if rt.maxStreamDuration != 0 { 200 config.MethodConfig.Timeout = &rt.maxStreamDuration 201 } 202 if rt.retryConfig != nil { 203 config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig) 204 } else if cs.virtualHost.retryConfig != nil { 205 config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig) 206 } 207 208 return config, nil 209} 210 211func retryConfigToPolicy(config *xdsclient.RetryConfig) *serviceconfig.RetryPolicy { 212 return &serviceconfig.RetryPolicy{ 213 MaxAttempts: int(config.NumRetries) + 1, 214 InitialBackoff: config.RetryBackoff.BaseInterval, 215 MaxBackoff: config.RetryBackoff.MaxInterval, 216 BackoffMultiplier: 2, 217 RetryableStatusCodes: config.RetryOn, 218 } 219} 220 221func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*xdsclient.HashPolicy) uint64 { 222 var hash uint64 223 var generatedHash bool 224 for _, policy := range hashPolicies { 225 var policyHash uint64 226 var generatedPolicyHash bool 227 switch policy.HashPolicyType { 228 case xdsclient.HashPolicyTypeHeader: 229 md, ok := metadata.FromOutgoingContext(rpcInfo.Context) 230 if !ok { 231 continue 232 } 233 values := md.Get(policy.HeaderName) 234 // If the header isn't present, no-op. 235 if len(values) == 0 { 236 continue 237 } 238 joinedValues := strings.Join(values, ",") 239 if policy.Regex != nil { 240 joinedValues = policy.Regex.ReplaceAllString(joinedValues, policy.RegexSubstitution) 241 } 242 policyHash = xxhash.Sum64String(joinedValues) 243 generatedHash = true 244 generatedPolicyHash = true 245 case xdsclient.HashPolicyTypeChannelID: 246 // Hash the ClientConn pointer which logically uniquely 247 // identifies the client. 248 policyHash = xxhash.Sum64String(fmt.Sprintf("%p", &cs.r.cc)) 249 generatedHash = true 250 generatedPolicyHash = true 251 } 252 253 // Deterministically combine the hash policies. Rotating prevents 254 // duplicate hash policies from cancelling each other out and preserves 255 // the 64 bits of entropy. 256 if generatedPolicyHash { 257 hash = bits.RotateLeft64(hash, 1) 258 hash = hash ^ policyHash 259 } 260 261 // If terminal policy and a hash has already been generated, ignore the 262 // rest of the policies and use that hash already generated. 263 if policy.Terminal && generatedHash { 264 break 265 } 266 } 267 268 if generatedHash { 269 return hash 270 } 271 // If no generated hash return a random long. In the grand scheme of things 272 // this logically will map to choosing a random backend to route request to. 273 return grpcrand.Uint64() 274} 275 276func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) { 277 if len(cs.httpFilterConfig) == 0 { 278 return nil, nil 279 } 280 interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig)) 281 for _, filter := range cs.httpFilterConfig { 282 if router.IsRouterFilter(filter.Filter) { 283 // Ignore any filters after the router filter. The router itself 284 // is currently a nop. 285 return &interceptorList{interceptors: interceptors}, nil 286 } 287 override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority 288 if override == nil { 289 override = rt.httpFilterConfigOverride[filter.Name] // route is second priority 290 } 291 if override == nil { 292 override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority 293 } 294 ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) 295 if !ok { 296 // Should not happen if it passed xdsClient validation. 297 return nil, fmt.Errorf("filter does not support use in client") 298 } 299 i, err := ib.BuildClientInterceptor(filter.Config, override) 300 if err != nil { 301 return nil, fmt.Errorf("error constructing filter: %v", err) 302 } 303 if i != nil { 304 interceptors = append(interceptors, i) 305 } 306 } 307 return nil, fmt.Errorf("error in xds config: no router filter present") 308} 309 310// stop decrements refs of all clusters referenced by this config selector. 311func (cs *configSelector) stop() { 312 // The resolver's old configSelector may be nil. Handle that here. 313 if cs == nil { 314 return 315 } 316 // If any refs drop to zero, we'll need a service config update to delete 317 // the cluster. 318 needUpdate := false 319 // Loops over cs.clusters, but these are pointers to entries in 320 // activeClusters. 321 for _, ci := range cs.clusters { 322 if v := atomic.AddInt32(&ci.refCount, -1); v == 0 { 323 needUpdate = true 324 } 325 } 326 // We stop the old config selector immediately after sending a new config 327 // selector; we need another update to delete clusters from the config (if 328 // we don't have another update pending already). 329 if needUpdate { 330 select { 331 case cs.r.updateCh <- suWithError{emptyUpdate: true}: 332 default: 333 } 334 } 335} 336 337// A global for testing. 338var newWRR = wrr.NewRandom 339 340// newConfigSelector creates the config selector for su; may add entries to 341// r.activeClusters for previously-unseen clusters. 342func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) { 343 cs := &configSelector{ 344 r: r, 345 virtualHost: virtualHost{ 346 httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride, 347 retryConfig: su.virtualHost.RetryConfig, 348 }, 349 routes: make([]route, len(su.virtualHost.Routes)), 350 clusters: make(map[string]*clusterInfo), 351 httpFilterConfig: su.ldsConfig.httpFilterConfig, 352 } 353 354 for i, rt := range su.virtualHost.Routes { 355 clusters := newWRR() 356 for cluster, wc := range rt.WeightedClusters { 357 clusters.Add(&routeCluster{ 358 name: cluster, 359 httpFilterConfigOverride: wc.HTTPFilterConfigOverride, 360 }, int64(wc.Weight)) 361 362 // Initialize entries in cs.clusters map, creating entries in 363 // r.activeClusters as necessary. Set to zero as they will be 364 // incremented by incRefs. 365 ci := r.activeClusters[cluster] 366 if ci == nil { 367 ci = &clusterInfo{refCount: 0} 368 r.activeClusters[cluster] = ci 369 } 370 cs.clusters[cluster] = ci 371 } 372 cs.routes[i].clusters = clusters 373 374 var err error 375 cs.routes[i].m, err = xdsclient.RouteToMatcher(rt) 376 if err != nil { 377 return nil, err 378 } 379 if rt.MaxStreamDuration == nil { 380 cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration 381 } else { 382 cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration 383 } 384 385 cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride 386 cs.routes[i].retryConfig = rt.RetryConfig 387 cs.routes[i].hashPolicies = rt.HashPolicies 388 } 389 390 // Account for this config selector's clusters. Do this after no further 391 // errors may occur. Note: cs.clusters are pointers to entries in 392 // activeClusters. 393 for _, ci := range cs.clusters { 394 atomic.AddInt32(&ci.refCount, 1) 395 } 396 397 return cs, nil 398} 399 400type clusterInfo struct { 401 // number of references to this cluster; accessed atomically 402 refCount int32 403} 404 405type interceptorList struct { 406 interceptors []iresolver.ClientInterceptor 407} 408 409func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { 410 for i := len(il.interceptors) - 1; i >= 0; i-- { 411 ns := newStream 412 interceptor := il.interceptors[i] 413 newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { 414 return interceptor.NewStream(ctx, ri, done, ns) 415 } 416 } 417 return newStream(ctx, func() {}) 418} 419