1/*
2 *
3 * Copyright 2021 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package clusterresolver
20
21import (
22	"sync"
23
24	"google.golang.org/grpc/xds/internal/xdsclient"
25)
26
27// resourceUpdate is a combined update from all the resources, in the order of
28// priority. For example, it can be {EDS, EDS, DNS}.
29type resourceUpdate struct {
30	priorities []priorityConfig
31	err        error
32}
33
34type discoveryMechanism interface {
35	lastUpdate() (interface{}, bool)
36	resolveNow()
37	stop()
38}
39
40// discoveryMechanismKey is {type+resource_name}, it's used as the map key, so
41// that the same resource resolver can be reused (e.g. when there are two
42// mechanisms, both for the same EDS resource, but has different circuit
43// breaking config.
44type discoveryMechanismKey struct {
45	typ  DiscoveryMechanismType
46	name string
47}
48
49// resolverMechanismTuple is needed to keep the resolver and the discovery
50// mechanism together, because resolvers can be shared. And we need the
51// mechanism for fields like circuit breaking, LRS etc when generating the
52// balancer config.
53type resolverMechanismTuple struct {
54	dm    DiscoveryMechanism
55	dmKey discoveryMechanismKey
56	r     discoveryMechanism
57}
58
59type resourceResolver struct {
60	parent        *clusterResolverBalancer
61	updateChannel chan *resourceUpdate
62
63	// mu protects the slice and map, and content of the resolvers in the slice.
64	mu          sync.Mutex
65	mechanisms  []DiscoveryMechanism
66	children    []resolverMechanismTuple
67	childrenMap map[discoveryMechanismKey]discoveryMechanism
68}
69
70func newResourceResolver(parent *clusterResolverBalancer) *resourceResolver {
71	return &resourceResolver{
72		parent:        parent,
73		updateChannel: make(chan *resourceUpdate, 1),
74		childrenMap:   make(map[discoveryMechanismKey]discoveryMechanism),
75	}
76}
77
78func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool {
79	if len(a) != len(b) {
80		return false
81	}
82	for i, aa := range a {
83		bb := b[i]
84		if !aa.Equal(bb) {
85			return false
86		}
87	}
88	return true
89}
90
91func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
92	rr.mu.Lock()
93	defer rr.mu.Unlock()
94	if equalDiscoveryMechanisms(rr.mechanisms, mechanisms) {
95		return
96	}
97	rr.mechanisms = mechanisms
98	rr.children = make([]resolverMechanismTuple, len(mechanisms))
99	newDMs := make(map[discoveryMechanismKey]bool)
100
101	// Start one watch for each new discover mechanism {type+resource_name}.
102	for i, dm := range mechanisms {
103		switch dm.Type {
104		case DiscoveryMechanismTypeEDS:
105			// If EDSServiceName is not set, use the cluster name as EDS service
106			// name to watch.
107			nameToWatch := dm.EDSServiceName
108			if nameToWatch == "" {
109				nameToWatch = dm.Cluster
110			}
111			dmKey := discoveryMechanismKey{typ: dm.Type, name: nameToWatch}
112			newDMs[dmKey] = true
113
114			r := rr.childrenMap[dmKey]
115			if r == nil {
116				r = newEDSResolver(nameToWatch, rr.parent.xdsClient, rr)
117				rr.childrenMap[dmKey] = r
118			}
119			rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r}
120		case DiscoveryMechanismTypeLogicalDNS:
121			// Name to resolve in DNS is the hostname, not the ClientConn
122			// target.
123			dmKey := discoveryMechanismKey{typ: dm.Type, name: dm.DNSHostname}
124			newDMs[dmKey] = true
125
126			r := rr.childrenMap[dmKey]
127			if r == nil {
128				r = newDNSResolver(dm.DNSHostname, rr)
129				rr.childrenMap[dmKey] = r
130			}
131			rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r}
132		}
133	}
134	// Stop the resources that were removed.
135	for dm, r := range rr.childrenMap {
136		if !newDMs[dm] {
137			delete(rr.childrenMap, dm)
138			r.stop()
139		}
140	}
141	// Regenerate even if there's no change in discovery mechanism, in case
142	// priority order changed.
143	rr.generate()
144}
145
146// resolveNow is typically called to trigger re-resolve of DNS. The EDS
147// resolveNow() is a noop.
148func (rr *resourceResolver) resolveNow() {
149	rr.mu.Lock()
150	defer rr.mu.Unlock()
151	for _, r := range rr.childrenMap {
152		r.resolveNow()
153	}
154}
155
156func (rr *resourceResolver) stop() {
157	rr.mu.Lock()
158	defer rr.mu.Unlock()
159	for dm, r := range rr.childrenMap {
160		delete(rr.childrenMap, dm)
161		r.stop()
162	}
163	rr.mechanisms = nil
164	rr.children = nil
165}
166
167// generate collects all the updates from all the resolvers, and push the
168// combined result into the update channel. It only pushes the update when all
169// the child resolvers have received at least one update, otherwise it will
170// wait.
171//
172// caller must hold rr.mu.
173func (rr *resourceResolver) generate() {
174	var ret []priorityConfig
175	for _, rDM := range rr.children {
176		r, ok := rr.childrenMap[rDM.dmKey]
177		if !ok {
178			rr.parent.logger.Infof("resolver for %+v not found, should never happen", rDM.dmKey)
179			continue
180		}
181
182		u, ok := r.lastUpdate()
183		if !ok {
184			// Don't send updates to parent until all resolvers have update to
185			// send.
186			return
187		}
188		switch uu := u.(type) {
189		case xdsclient.EndpointsUpdate:
190			ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu})
191		case []string:
192			ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu})
193		}
194	}
195	select {
196	case <-rr.updateChannel:
197	default:
198	}
199	rr.updateChannel <- &resourceUpdate{priorities: ret}
200}
201
202type edsDiscoveryMechanism struct {
203	cancel func()
204
205	update         xdsclient.EndpointsUpdate
206	updateReceived bool
207}
208
209func (er *edsDiscoveryMechanism) lastUpdate() (interface{}, bool) {
210	if !er.updateReceived {
211		return nil, false
212	}
213	return er.update, true
214}
215
216func (er *edsDiscoveryMechanism) resolveNow() {
217}
218
219func (er *edsDiscoveryMechanism) stop() {
220	er.cancel()
221}
222
223// newEDSResolver starts the EDS watch on the given xds client.
224func newEDSResolver(nameToWatch string, xdsc xdsclient.XDSClient, topLevelResolver *resourceResolver) *edsDiscoveryMechanism {
225	ret := &edsDiscoveryMechanism{}
226	topLevelResolver.parent.logger.Infof("EDS watch started on %v", nameToWatch)
227	cancel := xdsc.WatchEndpoints(nameToWatch, func(update xdsclient.EndpointsUpdate, err error) {
228		topLevelResolver.mu.Lock()
229		defer topLevelResolver.mu.Unlock()
230		if err != nil {
231			select {
232			case <-topLevelResolver.updateChannel:
233			default:
234			}
235			topLevelResolver.updateChannel <- &resourceUpdate{err: err}
236			return
237		}
238		ret.update = update
239		ret.updateReceived = true
240		topLevelResolver.generate()
241	})
242	ret.cancel = func() {
243		topLevelResolver.parent.logger.Infof("EDS watch canceled on %v", nameToWatch)
244		cancel()
245	}
246	return ret
247}
248