1/*
2 *
3 * Copyright 2021 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package clusterresolver
20
21import (
22	"sync"
23
24	"google.golang.org/grpc/xds/internal/xdsclient"
25	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
26)
27
28// resourceUpdate is a combined update from all the resources, in the order of
29// priority. For example, it can be {EDS, EDS, DNS}.
30type resourceUpdate struct {
31	priorities []priorityConfig
32	err        error
33}
34
35type discoveryMechanism interface {
36	lastUpdate() (interface{}, bool)
37	resolveNow()
38	stop()
39}
40
41// discoveryMechanismKey is {type+resource_name}, it's used as the map key, so
42// that the same resource resolver can be reused (e.g. when there are two
43// mechanisms, both for the same EDS resource, but has different circuit
44// breaking config.
45type discoveryMechanismKey struct {
46	typ  DiscoveryMechanismType
47	name string
48}
49
50// resolverMechanismTuple is needed to keep the resolver and the discovery
51// mechanism together, because resolvers can be shared. And we need the
52// mechanism for fields like circuit breaking, LRS etc when generating the
53// balancer config.
54type resolverMechanismTuple struct {
55	dm    DiscoveryMechanism
56	dmKey discoveryMechanismKey
57	r     discoveryMechanism
58}
59
60type resourceResolver struct {
61	parent        *clusterResolverBalancer
62	updateChannel chan *resourceUpdate
63
64	// mu protects the slice and map, and content of the resolvers in the slice.
65	mu          sync.Mutex
66	mechanisms  []DiscoveryMechanism
67	children    []resolverMechanismTuple
68	childrenMap map[discoveryMechanismKey]discoveryMechanism
69}
70
71func newResourceResolver(parent *clusterResolverBalancer) *resourceResolver {
72	return &resourceResolver{
73		parent:        parent,
74		updateChannel: make(chan *resourceUpdate, 1),
75		childrenMap:   make(map[discoveryMechanismKey]discoveryMechanism),
76	}
77}
78
79func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool {
80	if len(a) != len(b) {
81		return false
82	}
83	for i, aa := range a {
84		bb := b[i]
85		if !aa.Equal(bb) {
86			return false
87		}
88	}
89	return true
90}
91
92func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
93	rr.mu.Lock()
94	defer rr.mu.Unlock()
95	if equalDiscoveryMechanisms(rr.mechanisms, mechanisms) {
96		return
97	}
98	rr.mechanisms = mechanisms
99	rr.children = make([]resolverMechanismTuple, len(mechanisms))
100	newDMs := make(map[discoveryMechanismKey]bool)
101
102	// Start one watch for each new discover mechanism {type+resource_name}.
103	for i, dm := range mechanisms {
104		switch dm.Type {
105		case DiscoveryMechanismTypeEDS:
106			// If EDSServiceName is not set, use the cluster name as EDS service
107			// name to watch.
108			nameToWatch := dm.EDSServiceName
109			if nameToWatch == "" {
110				nameToWatch = dm.Cluster
111			}
112			dmKey := discoveryMechanismKey{typ: dm.Type, name: nameToWatch}
113			newDMs[dmKey] = true
114
115			r := rr.childrenMap[dmKey]
116			if r == nil {
117				r = newEDSResolver(nameToWatch, rr.parent.xdsClient, rr)
118				rr.childrenMap[dmKey] = r
119			}
120			rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r}
121		case DiscoveryMechanismTypeLogicalDNS:
122			// Name to resolve in DNS is the hostname, not the ClientConn
123			// target.
124			dmKey := discoveryMechanismKey{typ: dm.Type, name: dm.DNSHostname}
125			newDMs[dmKey] = true
126
127			r := rr.childrenMap[dmKey]
128			if r == nil {
129				r = newDNSResolver(dm.DNSHostname, rr)
130				rr.childrenMap[dmKey] = r
131			}
132			rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r}
133		}
134	}
135	// Stop the resources that were removed.
136	for dm, r := range rr.childrenMap {
137		if !newDMs[dm] {
138			delete(rr.childrenMap, dm)
139			r.stop()
140		}
141	}
142	// Regenerate even if there's no change in discovery mechanism, in case
143	// priority order changed.
144	rr.generate()
145}
146
147// resolveNow is typically called to trigger re-resolve of DNS. The EDS
148// resolveNow() is a noop.
149func (rr *resourceResolver) resolveNow() {
150	rr.mu.Lock()
151	defer rr.mu.Unlock()
152	for _, r := range rr.childrenMap {
153		r.resolveNow()
154	}
155}
156
157func (rr *resourceResolver) stop() {
158	rr.mu.Lock()
159	defer rr.mu.Unlock()
160	for dm, r := range rr.childrenMap {
161		delete(rr.childrenMap, dm)
162		r.stop()
163	}
164	rr.mechanisms = nil
165	rr.children = nil
166}
167
168// generate collects all the updates from all the resolvers, and push the
169// combined result into the update channel. It only pushes the update when all
170// the child resolvers have received at least one update, otherwise it will
171// wait.
172//
173// caller must hold rr.mu.
174func (rr *resourceResolver) generate() {
175	var ret []priorityConfig
176	for _, rDM := range rr.children {
177		r, ok := rr.childrenMap[rDM.dmKey]
178		if !ok {
179			rr.parent.logger.Infof("resolver for %+v not found, should never happen", rDM.dmKey)
180			continue
181		}
182
183		u, ok := r.lastUpdate()
184		if !ok {
185			// Don't send updates to parent until all resolvers have update to
186			// send.
187			return
188		}
189		switch uu := u.(type) {
190		case xdsresource.EndpointsUpdate:
191			ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu})
192		case []string:
193			ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu})
194		}
195	}
196	select {
197	case <-rr.updateChannel:
198	default:
199	}
200	rr.updateChannel <- &resourceUpdate{priorities: ret}
201}
202
203type edsDiscoveryMechanism struct {
204	cancel func()
205
206	update         xdsresource.EndpointsUpdate
207	updateReceived bool
208}
209
210func (er *edsDiscoveryMechanism) lastUpdate() (interface{}, bool) {
211	if !er.updateReceived {
212		return nil, false
213	}
214	return er.update, true
215}
216
217func (er *edsDiscoveryMechanism) resolveNow() {
218}
219
220func (er *edsDiscoveryMechanism) stop() {
221	er.cancel()
222}
223
224// newEDSResolver starts the EDS watch on the given xds client.
225func newEDSResolver(nameToWatch string, xdsc xdsclient.XDSClient, topLevelResolver *resourceResolver) *edsDiscoveryMechanism {
226	ret := &edsDiscoveryMechanism{}
227	topLevelResolver.parent.logger.Infof("EDS watch started on %v", nameToWatch)
228	cancel := xdsc.WatchEndpoints(nameToWatch, func(update xdsresource.EndpointsUpdate, err error) {
229		topLevelResolver.mu.Lock()
230		defer topLevelResolver.mu.Unlock()
231		if err != nil {
232			select {
233			case <-topLevelResolver.updateChannel:
234			default:
235			}
236			topLevelResolver.updateChannel <- &resourceUpdate{err: err}
237			return
238		}
239		ret.update = update
240		ret.updateReceived = true
241		topLevelResolver.generate()
242	})
243	ret.cancel = func() {
244		topLevelResolver.parent.logger.Infof("EDS watch canceled on %v", nameToWatch)
245		cancel()
246	}
247	return ret
248}
249