1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package resolver
20
21import (
22	"context"
23	"encoding/json"
24	"fmt"
25	"sync/atomic"
26	"time"
27
28	"google.golang.org/grpc/codes"
29	iresolver "google.golang.org/grpc/internal/resolver"
30	"google.golang.org/grpc/internal/wrr"
31	"google.golang.org/grpc/internal/xds/env"
32	"google.golang.org/grpc/status"
33	"google.golang.org/grpc/xds/internal/balancer/clustermanager"
34	xdsclient "google.golang.org/grpc/xds/internal/client"
35	"google.golang.org/grpc/xds/internal/httpfilter"
36	"google.golang.org/grpc/xds/internal/httpfilter/router"
37)
38
// Names used as keys in the service config generated by serviceConfigJSON:
// cdsName keys each child's policy config, and xdsClusterManagerName keys the
// top-level load balancing config.
const (
	cdsName               = "cds_experimental"
	xdsClusterManagerName = "xds_cluster_manager_experimental"
)
43
// serviceConfig is the top-level object that serviceConfigJSON marshals to
// JSON.  It carries only the load balancing config.
type serviceConfig struct {
	LoadBalancingConfig balancerConfig `json:"loadBalancingConfig"`
}
47
// balancerConfig is a list of maps from a balancer name to that balancer's
// config.  See newBalancerConfig for how a single-entry list is built.
type balancerConfig []map[string]interface{}
49
50func newBalancerConfig(name string, config interface{}) balancerConfig {
51	return []map[string]interface{}{{name: config}}
52}
53
// cdsBalancerConfig is the config value stored under cdsName in a child
// policy; it names the cluster the child refers to.
type cdsBalancerConfig struct {
	Cluster string `json:"cluster"`
}
57
// xdsChildConfig is the per-child entry of xdsClusterManagerConfig; it holds
// the child's balancer config.
type xdsChildConfig struct {
	ChildPolicy balancerConfig `json:"childPolicy"`
}
61
// xdsClusterManagerConfig is the config value stored under
// xdsClusterManagerName in the generated service config.  Children is keyed
// by cluster name (see serviceConfigJSON).
type xdsClusterManagerConfig struct {
	Children map[string]xdsChildConfig `json:"children"`
}
65
66// pruneActiveClusters deletes entries in r.activeClusters with zero
67// references.
68func (r *xdsResolver) pruneActiveClusters() {
69	for cluster, ci := range r.activeClusters {
70		if atomic.LoadInt32(&ci.refCount) == 0 {
71			delete(r.activeClusters, cluster)
72		}
73	}
74}
75
76// serviceConfigJSON produces a service config in JSON format representing all
77// the clusters referenced in activeClusters.  This includes clusters with zero
78// references, so they must be pruned first.
79func serviceConfigJSON(activeClusters map[string]*clusterInfo) (string, error) {
80	// Generate children (all entries in activeClusters).
81	children := make(map[string]xdsChildConfig)
82	for cluster := range activeClusters {
83		children[cluster] = xdsChildConfig{
84			ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}),
85		}
86	}
87
88	sc := serviceConfig{
89		LoadBalancingConfig: newBalancerConfig(
90			xdsClusterManagerName, xdsClusterManagerConfig{Children: children},
91		),
92	}
93
94	bs, err := json.Marshal(sc)
95	if err != nil {
96		return "", fmt.Errorf("failed to marshal json: %v", err)
97	}
98	return string(bs), nil
99}
100
// virtualHost holds the virtual-host-level state kept by a configSelector.
// Its filter overrides are the lowest-priority source consulted by
// newInterceptor, after the cluster and the route.
type virtualHost struct {
	// map from filter name to its config
	httpFilterConfigOverride map[string]httpfilter.FilterConfig
}
105
// routeCluster holds information about a cluster as referenced by a route.
// Values of this type are the entries stored in a route's weighted cluster
// set.  Its filter overrides are the highest-priority source consulted by
// newInterceptor.
type routeCluster struct {
	name string
	// map from filter name to its config
	httpFilterConfigOverride map[string]httpfilter.FilterConfig
}
112
// route is the resolver's converted form of one route from a service update,
// as built by newConfigSelector.
//
// maxStreamDuration is the route's own value or, when the route does not set
// one, the LDS-level value.  SelectConfig applies it as the RPC timeout only
// when it is non-zero and env.TimeoutSupport is set.
type route struct {
	m                 *compositeMatcher // converted from route matchers
	clusters          wrr.WRR           // holds *routeCluster entries
	maxStreamDuration time.Duration
	// map from filter name to its config
	httpFilterConfigOverride map[string]httpfilter.FilterConfig
}
120
121func (r route) String() string {
122	return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration)
123}
124
// configSelector picks a route and cluster for each RPC (see SelectConfig).
//
// r is the resolver that created the selector; empty updates are offered to
// its updateCh when a cluster's reference count drops to zero.  routes are
// tried in order.  clusters maps cluster name to the same *clusterInfo values
// stored in r.activeClusters.  httpFilterConfig is the ordered filter list
// taken from the LDS config, and virtualHost carries the virtual-host-level
// filter overrides.
type configSelector struct {
	r                *xdsResolver
	virtualHost      virtualHost
	routes           []route
	clusters         map[string]*clusterInfo
	httpFilterConfig []xdsclient.HTTPFilter
}
132
// errNoMatchedRouteFound is returned by SelectConfig when no route matches
// the RPC, or when the matched route has no cluster set.
var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")
134
135func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
136	if cs == nil {
137		return nil, status.Errorf(codes.Unavailable, "no valid clusters")
138	}
139	var rt *route
140	// Loop through routes in order and select first match.
141	for _, r := range cs.routes {
142		if r.m.match(rpcInfo) {
143			rt = &r
144			break
145		}
146	}
147	if rt == nil || rt.clusters == nil {
148		return nil, errNoMatchedRouteFound
149	}
150	cluster, ok := rt.clusters.Next().(*routeCluster)
151	if !ok {
152		return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
153	}
154	// Add a ref to the selected cluster, as this RPC needs this cluster until
155	// it is committed.
156	ref := &cs.clusters[cluster.name].refCount
157	atomic.AddInt32(ref, 1)
158
159	interceptor, err := cs.newInterceptor(rt, cluster)
160	if err != nil {
161		return nil, err
162	}
163
164	config := &iresolver.RPCConfig{
165		// Communicate to the LB policy the chosen cluster.
166		Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name),
167		OnCommitted: func() {
168			// When the RPC is committed, the cluster is no longer required.
169			// Decrease its ref.
170			if v := atomic.AddInt32(ref, -1); v == 0 {
171				// This entry will be removed from activeClusters when
172				// producing the service config for the empty update.
173				select {
174				case cs.r.updateCh <- suWithError{emptyUpdate: true}:
175				default:
176				}
177			}
178		},
179		Interceptor: interceptor,
180	}
181
182	if env.TimeoutSupport && rt.maxStreamDuration != 0 {
183		config.MethodConfig.Timeout = &rt.maxStreamDuration
184	}
185
186	return config, nil
187}
188
189func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
190	if len(cs.httpFilterConfig) == 0 {
191		return nil, nil
192	}
193	interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig))
194	for _, filter := range cs.httpFilterConfig {
195		if router.IsRouterFilter(filter.Filter) {
196			// Ignore any filters after the router filter.  The router itself
197			// is currently a nop.
198			return &interceptorList{interceptors: interceptors}, nil
199		}
200		override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority
201		if override == nil {
202			override = rt.httpFilterConfigOverride[filter.Name] // route is second priority
203		}
204		if override == nil {
205			override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority
206		}
207		ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
208		if !ok {
209			// Should not happen if it passed xdsClient validation.
210			return nil, fmt.Errorf("filter does not support use in client")
211		}
212		i, err := ib.BuildClientInterceptor(filter.Config, override)
213		if err != nil {
214			return nil, fmt.Errorf("error constructing filter: %v", err)
215		}
216		if i != nil {
217			interceptors = append(interceptors, i)
218		}
219	}
220	return nil, fmt.Errorf("error in xds config: no router filter present")
221}
222
223// stop decrements refs of all clusters referenced by this config selector.
224func (cs *configSelector) stop() {
225	// The resolver's old configSelector may be nil.  Handle that here.
226	if cs == nil {
227		return
228	}
229	// If any refs drop to zero, we'll need a service config update to delete
230	// the cluster.
231	needUpdate := false
232	// Loops over cs.clusters, but these are pointers to entries in
233	// activeClusters.
234	for _, ci := range cs.clusters {
235		if v := atomic.AddInt32(&ci.refCount, -1); v == 0 {
236			needUpdate = true
237		}
238	}
239	// We stop the old config selector immediately after sending a new config
240	// selector; we need another update to delete clusters from the config (if
241	// we don't have another update pending already).
242	if needUpdate {
243		select {
244		case cs.r.updateCh <- suWithError{emptyUpdate: true}:
245		default:
246		}
247	}
248}
249
// newWRR creates the weighted picker that newConfigSelector uses for each
// route's clusters.  It is a package-level variable so that tests can replace
// it.
var newWRR = wrr.NewRandom
252
253// newConfigSelector creates the config selector for su; may add entries to
254// r.activeClusters for previously-unseen clusters.
255func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
256	cs := &configSelector{
257		r:                r,
258		virtualHost:      virtualHost{httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride},
259		routes:           make([]route, len(su.virtualHost.Routes)),
260		clusters:         make(map[string]*clusterInfo),
261		httpFilterConfig: su.ldsConfig.httpFilterConfig,
262	}
263
264	for i, rt := range su.virtualHost.Routes {
265		clusters := newWRR()
266		for cluster, wc := range rt.WeightedClusters {
267			clusters.Add(&routeCluster{
268				name:                     cluster,
269				httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
270			}, int64(wc.Weight))
271
272			// Initialize entries in cs.clusters map, creating entries in
273			// r.activeClusters as necessary.  Set to zero as they will be
274			// incremented by incRefs.
275			ci := r.activeClusters[cluster]
276			if ci == nil {
277				ci = &clusterInfo{refCount: 0}
278				r.activeClusters[cluster] = ci
279			}
280			cs.clusters[cluster] = ci
281		}
282		cs.routes[i].clusters = clusters
283
284		var err error
285		cs.routes[i].m, err = routeToMatcher(rt)
286		if err != nil {
287			return nil, err
288		}
289		if rt.MaxStreamDuration == nil {
290			cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
291		} else {
292			cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
293		}
294
295		cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
296	}
297
298	// Account for this config selector's clusters.  Do this after no further
299	// errors may occur.  Note: cs.clusters are pointers to entries in
300	// activeClusters.
301	for _, ci := range cs.clusters {
302		atomic.AddInt32(&ci.refCount, 1)
303	}
304
305	return cs, nil
306}
307
// clusterInfo is the per-cluster state stored in the resolver's
// activeClusters map and shared, by pointer, with config selectors.
type clusterInfo struct {
	// number of references to this cluster; accessed atomically
	refCount int32
}
312
// interceptorList is a client interceptor composed of an ordered list of
// interceptors; its NewStream method chains them so that the first entry is
// the outermost.
type interceptorList struct {
	interceptors []iresolver.ClientInterceptor
}
316
317func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
318	for i := len(il.interceptors) - 1; i >= 0; i-- {
319		ns := newStream
320		interceptor := il.interceptors[i]
321		newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
322			return interceptor.NewStream(ctx, ri, done, ns)
323		}
324	}
325	return newStream(ctx, func() {})
326}
327