1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package resolver 20 21import ( 22 "context" 23 "encoding/json" 24 "fmt" 25 "sync/atomic" 26 "time" 27 28 "google.golang.org/grpc/codes" 29 iresolver "google.golang.org/grpc/internal/resolver" 30 "google.golang.org/grpc/internal/wrr" 31 "google.golang.org/grpc/internal/xds/env" 32 "google.golang.org/grpc/status" 33 "google.golang.org/grpc/xds/internal/balancer/clustermanager" 34 xdsclient "google.golang.org/grpc/xds/internal/client" 35 "google.golang.org/grpc/xds/internal/httpfilter" 36 "google.golang.org/grpc/xds/internal/httpfilter/router" 37) 38 39const ( 40 cdsName = "cds_experimental" 41 xdsClusterManagerName = "xds_cluster_manager_experimental" 42) 43 44type serviceConfig struct { 45 LoadBalancingConfig balancerConfig `json:"loadBalancingConfig"` 46} 47 48type balancerConfig []map[string]interface{} 49 50func newBalancerConfig(name string, config interface{}) balancerConfig { 51 return []map[string]interface{}{{name: config}} 52} 53 54type cdsBalancerConfig struct { 55 Cluster string `json:"cluster"` 56} 57 58type xdsChildConfig struct { 59 ChildPolicy balancerConfig `json:"childPolicy"` 60} 61 62type xdsClusterManagerConfig struct { 63 Children map[string]xdsChildConfig `json:"children"` 64} 65 66// pruneActiveClusters deletes entries in r.activeClusters with zero 67// references. 68func (r *xdsResolver) pruneActiveClusters() { 69 for cluster, ci := range r.activeClusters { 70 if atomic.LoadInt32(&ci.refCount) == 0 { 71 delete(r.activeClusters, cluster) 72 } 73 } 74} 75 76// serviceConfigJSON produces a service config in JSON format representing all 77// the clusters referenced in activeClusters. This includes clusters with zero 78// references, so they must be pruned first. 79func serviceConfigJSON(activeClusters map[string]*clusterInfo) (string, error) { 80 // Generate children (all entries in activeClusters). 81 children := make(map[string]xdsChildConfig) 82 for cluster := range activeClusters { 83 children[cluster] = xdsChildConfig{ 84 ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}), 85 } 86 } 87 88 sc := serviceConfig{ 89 LoadBalancingConfig: newBalancerConfig( 90 xdsClusterManagerName, xdsClusterManagerConfig{Children: children}, 91 ), 92 } 93 94 bs, err := json.Marshal(sc) 95 if err != nil { 96 return "", fmt.Errorf("failed to marshal json: %v", err) 97 } 98 return string(bs), nil 99} 100 101type virtualHost struct { 102 // map from filter name to its config 103 httpFilterConfigOverride map[string]httpfilter.FilterConfig 104} 105 106// routeCluster holds information about a cluster as referenced by a route. 107type routeCluster struct { 108 name string 109 // map from filter name to its config 110 httpFilterConfigOverride map[string]httpfilter.FilterConfig 111} 112 113type route struct { 114 m *compositeMatcher // converted from route matchers 115 clusters wrr.WRR // holds *routeCluster entries 116 maxStreamDuration time.Duration 117 // map from filter name to its config 118 httpFilterConfigOverride map[string]httpfilter.FilterConfig 119} 120 121func (r route) String() string { 122 return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration) 123} 124 125type configSelector struct { 126 r *xdsResolver 127 virtualHost virtualHost 128 routes []route 129 clusters map[string]*clusterInfo 130 httpFilterConfig []xdsclient.HTTPFilter 131} 132 133var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") 134 135func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) { 136 if cs == nil { 137 return nil, status.Errorf(codes.Unavailable, "no valid clusters") 138 } 139 var rt *route 140 // Loop through routes in order and select first match. 141 for _, r := range cs.routes { 142 if r.m.match(rpcInfo) { 143 rt = &r 144 break 145 } 146 } 147 if rt == nil || rt.clusters == nil { 148 return nil, errNoMatchedRouteFound 149 } 150 cluster, ok := rt.clusters.Next().(*routeCluster) 151 if !ok { 152 return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) 153 } 154 // Add a ref to the selected cluster, as this RPC needs this cluster until 155 // it is committed. 156 ref := &cs.clusters[cluster.name].refCount 157 atomic.AddInt32(ref, 1) 158 159 interceptor, err := cs.newInterceptor(rt, cluster) 160 if err != nil { 161 return nil, err 162 } 163 164 config := &iresolver.RPCConfig{ 165 // Communicate to the LB policy the chosen cluster. 166 Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name), 167 OnCommitted: func() { 168 // When the RPC is committed, the cluster is no longer required. 169 // Decrease its ref. 170 if v := atomic.AddInt32(ref, -1); v == 0 { 171 // This entry will be removed from activeClusters when 172 // producing the service config for the empty update. 173 select { 174 case cs.r.updateCh <- suWithError{emptyUpdate: true}: 175 default: 176 } 177 } 178 }, 179 Interceptor: interceptor, 180 } 181 182 if env.TimeoutSupport && rt.maxStreamDuration != 0 { 183 config.MethodConfig.Timeout = &rt.maxStreamDuration 184 } 185 186 return config, nil 187} 188 189func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) { 190 if len(cs.httpFilterConfig) == 0 { 191 return nil, nil 192 } 193 interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig)) 194 for _, filter := range cs.httpFilterConfig { 195 if router.IsRouterFilter(filter.Filter) { 196 // Ignore any filters after the router filter. The router itself 197 // is currently a nop. 198 return &interceptorList{interceptors: interceptors}, nil 199 } 200 override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority 201 if override == nil { 202 override = rt.httpFilterConfigOverride[filter.Name] // route is second priority 203 } 204 if override == nil { 205 override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority 206 } 207 ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) 208 if !ok { 209 // Should not happen if it passed xdsClient validation. 210 return nil, fmt.Errorf("filter does not support use in client") 211 } 212 i, err := ib.BuildClientInterceptor(filter.Config, override) 213 if err != nil { 214 return nil, fmt.Errorf("error constructing filter: %v", err) 215 } 216 if i != nil { 217 interceptors = append(interceptors, i) 218 } 219 } 220 return nil, fmt.Errorf("error in xds config: no router filter present") 221} 222 223// stop decrements refs of all clusters referenced by this config selector. 224func (cs *configSelector) stop() { 225 // The resolver's old configSelector may be nil. Handle that here. 226 if cs == nil { 227 return 228 } 229 // If any refs drop to zero, we'll need a service config update to delete 230 // the cluster. 231 needUpdate := false 232 // Loops over cs.clusters, but these are pointers to entries in 233 // activeClusters. 234 for _, ci := range cs.clusters { 235 if v := atomic.AddInt32(&ci.refCount, -1); v == 0 { 236 needUpdate = true 237 } 238 } 239 // We stop the old config selector immediately after sending a new config 240 // selector; we need another update to delete clusters from the config (if 241 // we don't have another update pending already). 242 if needUpdate { 243 select { 244 case cs.r.updateCh <- suWithError{emptyUpdate: true}: 245 default: 246 } 247 } 248} 249 250// A global for testing. 251var newWRR = wrr.NewRandom 252 253// newConfigSelector creates the config selector for su; may add entries to 254// r.activeClusters for previously-unseen clusters. 255func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) { 256 cs := &configSelector{ 257 r: r, 258 virtualHost: virtualHost{httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride}, 259 routes: make([]route, len(su.virtualHost.Routes)), 260 clusters: make(map[string]*clusterInfo), 261 httpFilterConfig: su.ldsConfig.httpFilterConfig, 262 } 263 264 for i, rt := range su.virtualHost.Routes { 265 clusters := newWRR() 266 for cluster, wc := range rt.WeightedClusters { 267 clusters.Add(&routeCluster{ 268 name: cluster, 269 httpFilterConfigOverride: wc.HTTPFilterConfigOverride, 270 }, int64(wc.Weight)) 271 272 // Initialize entries in cs.clusters map, creating entries in 273 // r.activeClusters as necessary. Set to zero as they will be 274 // incremented by incRefs. 275 ci := r.activeClusters[cluster] 276 if ci == nil { 277 ci = &clusterInfo{refCount: 0} 278 r.activeClusters[cluster] = ci 279 } 280 cs.clusters[cluster] = ci 281 } 282 cs.routes[i].clusters = clusters 283 284 var err error 285 cs.routes[i].m, err = routeToMatcher(rt) 286 if err != nil { 287 return nil, err 288 } 289 if rt.MaxStreamDuration == nil { 290 cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration 291 } else { 292 cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration 293 } 294 295 cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride 296 } 297 298 // Account for this config selector's clusters. Do this after no further 299 // errors may occur. Note: cs.clusters are pointers to entries in 300 // activeClusters. 301 for _, ci := range cs.clusters { 302 atomic.AddInt32(&ci.refCount, 1) 303 } 304 305 return cs, nil 306} 307 308type clusterInfo struct { 309 // number of references to this cluster; accessed atomically 310 refCount int32 311} 312 313type interceptorList struct { 314 interceptors []iresolver.ClientInterceptor 315} 316 317func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { 318 for i := len(il.interceptors) - 1; i >= 0; i-- { 319 ns := newStream 320 interceptor := il.interceptors[i] 321 newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { 322 return interceptor.NewStream(ctx, ri, done, ns) 323 } 324 } 325 return newStream(ctx, func() {}) 326} 327