/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package resolver

import (
	"context"
	"encoding/json"
	"fmt"
	"math/bits"
	"strings"
	"sync/atomic"
	"time"

	xxhash "github.com/cespare/xxhash/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/internal/grpcrand"
	iresolver "google.golang.org/grpc/internal/resolver"
	"google.golang.org/grpc/internal/serviceconfig"
	"google.golang.org/grpc/internal/wrr"
	"google.golang.org/grpc/internal/xds/env"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/xds/internal/balancer/clustermanager"
	"google.golang.org/grpc/xds/internal/balancer/ringhash"
	"google.golang.org/grpc/xds/internal/httpfilter"
	"google.golang.org/grpc/xds/internal/httpfilter/router"
	"google.golang.org/grpc/xds/internal/xdsclient"
)

const (
	cdsName               = "cds_experimental"
	xdsClusterManagerName = "xds_cluster_manager_experimental"
)

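// serviceConfig is the root of the service config JSON generated by
// serviceConfigJSON below.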
type serviceConfig struct {
	LoadBalancingConfig balancerConfig `json:"loadBalancingConfig"`
}

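// balancerConfig is a list of LB policy configs, each a single-entry map from
// policy name to its config, matching the service config JSON encoding.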
type balancerConfig []map[string]interface{}

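// newBalancerConfig returns a balancerConfig containing a single entry for the
// named policy with the given config.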
func newBalancerConfig(name string, config interface{}) balancerConfig {
	return []map[string]interface{}{{name: config}}
}

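// cdsBalancerConfig is the config for the cds_experimental LB policy.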
type cdsBalancerConfig struct {
	Cluster string `json:"cluster"`
}

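// xdsChildConfig is the config for a single child of the xDS cluster manager.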
type xdsChildConfig struct {
	ChildPolicy balancerConfig `json:"childPolicy"`
}

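// xdsClusterManagerConfig is the config for the
// xds_cluster_manager_experimental LB policy.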
type xdsClusterManagerConfig struct {
	Children map[string]xdsChildConfig `json:"children"`
}

// pruneActiveClusters deletes entries in r.activeClusters with zero
// references.
func (r *xdsResolver) pruneActiveClusters() {
	for cluster, ci := range r.activeClusters {
		if atomic.LoadInt32(&ci.refCount) == 0 {
			delete(r.activeClusters, cluster)
		}
	}
}

// serviceConfigJSON produces a service config in JSON format representing all
// the clusters referenced in activeClusters.  This includes clusters with zero
// references, so they must be pruned first.
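//
// For illustration only, with a single active cluster "C" the generated
// config has roughly this shape:
//
//	{"loadBalancingConfig": [
//	  {"xds_cluster_manager_experimental": {
//	    "children": {
//	      "C": {"childPolicy": [{"cds_experimental": {"cluster": "C"}}]}
//	    }
//	  }}
//	]}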
func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
	// Generate children (all entries in activeClusters).
	children := make(map[string]xdsChildConfig)
	for cluster := range activeClusters {
		children[cluster] = xdsChildConfig{
			ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}),
		}
	}

	sc := serviceConfig{
		LoadBalancingConfig: newBalancerConfig(
			xdsClusterManagerName, xdsClusterManagerConfig{Children: children},
		),
	}

	bs, err := json.Marshal(sc)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal json: %v", err)
	}
	return bs, nil
}

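// virtualHost holds the virtual-host-level state needed by the config
// selector: filter config overrides and the retry policy.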
type virtualHost struct {
	// map from filter name to its config
	httpFilterConfigOverride map[string]httpfilter.FilterConfig
	// retry policy present in virtual host
	retryConfig *xdsclient.RetryConfig
}

// routeCluster holds information about a cluster as referenced by a route.
type routeCluster struct {
	name string
	// map from filter name to its config
	httpFilterConfigOverride map[string]httpfilter.FilterConfig
}

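// route pairs a route's matchers with the WRR over its weighted clusters and
// the per-route settings applied to RPCs that match it.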
type route struct {
	m                 *xdsclient.CompositeMatcher // converted from route matchers
	clusters          wrr.WRR                     // holds *routeCluster entries
	maxStreamDuration time.Duration
	// map from filter name to its config
	httpFilterConfigOverride map[string]httpfilter.FilterConfig
	retryConfig              *xdsclient.RetryConfig
	hashPolicies             []*xdsclient.HashPolicy
}

func (r route) String() string {
	return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration)
}

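// configSelector selects the route, cluster, and per-RPC configuration for
// each RPC, based on the most recent service update from the xDS client.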
type configSelector struct {
	r                *xdsResolver
	virtualHost      virtualHost
	routes           []route
	clusters         map[string]*clusterInfo
	httpFilterConfig []xdsclient.HTTPFilter
}

var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")

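// SelectConfig picks the first route matching the RPC, chooses one of its
// clusters via WRR, and returns the resulting per-RPC configuration (context,
// method config, and filter interceptor).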
func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
	if cs == nil {
		return nil, status.Errorf(codes.Unavailable, "no valid clusters")
	}
	var rt *route
	// Loop through routes in order and select first match.
	for _, r := range cs.routes {
		if r.m.Match(rpcInfo) {
			rt = &r
			break
		}
	}
	if rt == nil || rt.clusters == nil {
		return nil, errNoMatchedRouteFound
	}
	cluster, ok := rt.clusters.Next().(*routeCluster)
	if !ok {
		return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
	}
	// Add a ref to the selected cluster, as this RPC needs this cluster until
	// it is committed.
	ref := &cs.clusters[cluster.name].refCount
	atomic.AddInt32(ref, 1)

	interceptor, err := cs.newInterceptor(rt, cluster)
	if err != nil {
		return nil, err
	}

	lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name)
	// Request hashes are only applicable when the ring hash LB policy is in
	// use.
	if env.RingHashSupport {
		lbCtx = ringhash.SetRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies))
	}

	config := &iresolver.RPCConfig{
		// Communicate the chosen cluster (and, when the ring hash LB policy is
		// in use, the request hash) to the LB policy.
		Context: lbCtx,
		OnCommitted: func() {
			// When the RPC is committed, the cluster is no longer required.
			// Decrease its ref.
			if v := atomic.AddInt32(ref, -1); v == 0 {
				// This entry will be removed from activeClusters when
				// producing the service config for the empty update.
				select {
				case cs.r.updateCh <- suWithError{emptyUpdate: true}:
				default:
				}
			}
		},
		Interceptor: interceptor,
	}

	if rt.maxStreamDuration != 0 {
		config.MethodConfig.Timeout = &rt.maxStreamDuration
	}
	if rt.retryConfig != nil {
		config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig)
	} else if cs.virtualHost.retryConfig != nil {
		config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig)
	}

	return config, nil
}

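// retryConfigToPolicy converts an xDS retry config into the equivalent gRPC
// service config retry policy.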
func retryConfigToPolicy(config *xdsclient.RetryConfig) *serviceconfig.RetryPolicy {
	return &serviceconfig.RetryPolicy{
		MaxAttempts:          int(config.NumRetries) + 1,
		InitialBackoff:       config.RetryBackoff.BaseInterval,
		MaxBackoff:           config.RetryBackoff.MaxInterval,
		BackoffMultiplier:    2,
		RetryableStatusCodes: config.RetryOn,
	}
}

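// generateHash applies the route's hash policies, in order, to produce the
// request hash used by the ring hash LB policy.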
func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*xdsclient.HashPolicy) uint64 {
	var hash uint64
	var generatedHash bool
	for _, policy := range hashPolicies {
		var policyHash uint64
		var generatedPolicyHash bool
		switch policy.HashPolicyType {
		case xdsclient.HashPolicyTypeHeader:
			md, ok := metadata.FromOutgoingContext(rpcInfo.Context)
			if !ok {
				continue
			}
			values := md.Get(policy.HeaderName)
			// If the header isn't present, no-op.
			if len(values) == 0 {
				continue
			}
			joinedValues := strings.Join(values, ",")
			if policy.Regex != nil {
				joinedValues = policy.Regex.ReplaceAllString(joinedValues, policy.RegexSubstitution)
			}
			policyHash = xxhash.Sum64String(joinedValues)
			generatedHash = true
			generatedPolicyHash = true
		case xdsclient.HashPolicyTypeChannelID:
			// Hash the ClientConn pointer which logically uniquely
			// identifies the client.
			policyHash = xxhash.Sum64String(fmt.Sprintf("%p", &cs.r.cc))
			generatedHash = true
			generatedPolicyHash = true
		}

		// Deterministically combine the hash policies. Rotating prevents
		// duplicate hash policies from cancelling each other out and preserves
		// the 64 bits of entropy.
		if generatedPolicyHash {
			hash = bits.RotateLeft64(hash, 1)
			hash = hash ^ policyHash
		}

		// If the policy is terminal and a hash has already been generated,
		// ignore the remaining policies and use that hash.
		if policy.Terminal && generatedHash {
			break
		}
	}

	if generatedHash {
		return hash
	}
	// If no hash was generated, return a random uint64. In effect, this routes
	// the request to a random backend.
	return grpcrand.Uint64()
}

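// newInterceptor builds the chain of HTTP filter interceptors for an RPC on
// the given route and cluster, resolving filter config overrides in priority
// order: cluster, then route, then virtual host.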
func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
	if len(cs.httpFilterConfig) == 0 {
		return nil, nil
	}
	interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig))
	for _, filter := range cs.httpFilterConfig {
		if router.IsRouterFilter(filter.Filter) {
			// Ignore any filters after the router filter.  The router itself
			// is currently a nop.
			return &interceptorList{interceptors: interceptors}, nil
		}
		override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority
		if override == nil {
			override = rt.httpFilterConfigOverride[filter.Name] // route is second priority
		}
		if override == nil {
			override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority
		}
		ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
		if !ok {
			// Should not happen if it passed xdsClient validation.
			return nil, fmt.Errorf("filter does not support use in client")
		}
		i, err := ib.BuildClientInterceptor(filter.Config, override)
		if err != nil {
			return nil, fmt.Errorf("error constructing filter: %v", err)
		}
		if i != nil {
			interceptors = append(interceptors, i)
		}
	}
	return nil, fmt.Errorf("error in xds config: no router filter present")
}

// stop decrements refs of all clusters referenced by this config selector.
func (cs *configSelector) stop() {
	// The resolver's old configSelector may be nil.  Handle that here.
	if cs == nil {
		return
	}
	// If any refs drop to zero, we'll need a service config update to delete
	// the cluster.
	needUpdate := false
	// Loops over cs.clusters, but these are pointers to entries in
	// activeClusters.
	for _, ci := range cs.clusters {
		if v := atomic.AddInt32(&ci.refCount, -1); v == 0 {
			needUpdate = true
		}
	}
	// We stop the old config selector immediately after sending a new config
	// selector; we need another update to delete clusters from the config (if
	// we don't have another update pending already).
	if needUpdate {
		select {
		case cs.r.updateCh <- suWithError{emptyUpdate: true}:
		default:
		}
	}
}

// newWRR is a global so that tests can override the WRR implementation.
var newWRR = wrr.NewRandom

// newConfigSelector creates the config selector for su; may add entries to
// r.activeClusters for previously-unseen clusters.
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
	cs := &configSelector{
		r: r,
		virtualHost: virtualHost{
			httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
			retryConfig:              su.virtualHost.RetryConfig,
		},
		routes:           make([]route, len(su.virtualHost.Routes)),
		clusters:         make(map[string]*clusterInfo),
		httpFilterConfig: su.ldsConfig.httpFilterConfig,
	}

	for i, rt := range su.virtualHost.Routes {
		clusters := newWRR()
		for cluster, wc := range rt.WeightedClusters {
			clusters.Add(&routeCluster{
				name:                     cluster,
				httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
			}, int64(wc.Weight))

			// Initialize entries in the cs.clusters map, creating entries in
			// r.activeClusters as necessary.  New entries start at zero; all
			// refs are incremented together after the routes are processed.
			ci := r.activeClusters[cluster]
			if ci == nil {
				ci = &clusterInfo{refCount: 0}
				r.activeClusters[cluster] = ci
			}
			cs.clusters[cluster] = ci
		}
		cs.routes[i].clusters = clusters

		var err error
		cs.routes[i].m, err = xdsclient.RouteToMatcher(rt)
		if err != nil {
			return nil, err
		}
		if rt.MaxStreamDuration == nil {
			cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
		} else {
			cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
		}

		cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
		cs.routes[i].retryConfig = rt.RetryConfig
		cs.routes[i].hashPolicies = rt.HashPolicies
	}

	// Account for this config selector's clusters.  Do this only after no
	// further errors can occur.  Note: cs.clusters are pointers to entries in
	// activeClusters.
	for _, ci := range cs.clusters {
		atomic.AddInt32(&ci.refCount, 1)
	}

	return cs, nil
}

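// clusterInfo counts the references to a cluster from config selectors and
// uncommitted RPCs.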
type clusterInfo struct {
	// number of references to this cluster; accessed atomically
	refCount int32
}

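// interceptorList chains a list of client interceptors; they run in the order
// they appear in the list.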
type interceptorList struct {
	interceptors []iresolver.ClientInterceptor
}

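// NewStream wraps newStream with each interceptor, from last to first, so the
// interceptors run in list order when the stream is created.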
func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
	for i := len(il.interceptors) - 1; i >= 0; i-- {
		ns := newStream
		interceptor := il.interceptors[i]
		newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
			return interceptor.NewStream(ctx, ri, done, ns)
		}
	}
	return newStream(ctx, func() {})
}