1/* 2 * 3 * Copyright 2021 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package clusterresolver 20 21import ( 22 "sync" 23 24 "google.golang.org/grpc/xds/internal/xdsclient" 25) 26 27// resourceUpdate is a combined update from all the resources, in the order of 28// priority. For example, it can be {EDS, EDS, DNS}. 29type resourceUpdate struct { 30 priorities []priorityConfig 31 err error 32} 33 34type discoveryMechanism interface { 35 lastUpdate() (interface{}, bool) 36 resolveNow() 37 stop() 38} 39 40// discoveryMechanismKey is {type+resource_name}, it's used as the map key, so 41// that the same resource resolver can be reused (e.g. when there are two 42// mechanisms, both for the same EDS resource, but has different circuit 43// breaking config. 44type discoveryMechanismKey struct { 45 typ DiscoveryMechanismType 46 name string 47} 48 49// resolverMechanismTuple is needed to keep the resolver and the discovery 50// mechanism together, because resolvers can be shared. And we need the 51// mechanism for fields like circuit breaking, LRS etc when generating the 52// balancer config. 53type resolverMechanismTuple struct { 54 dm DiscoveryMechanism 55 dmKey discoveryMechanismKey 56 r discoveryMechanism 57} 58 59type resourceResolver struct { 60 parent *clusterResolverBalancer 61 updateChannel chan *resourceUpdate 62 63 // mu protects the slice and map, and content of the resolvers in the slice. 64 mu sync.Mutex 65 mechanisms []DiscoveryMechanism 66 children []resolverMechanismTuple 67 childrenMap map[discoveryMechanismKey]discoveryMechanism 68} 69 70func newResourceResolver(parent *clusterResolverBalancer) *resourceResolver { 71 return &resourceResolver{ 72 parent: parent, 73 updateChannel: make(chan *resourceUpdate, 1), 74 childrenMap: make(map[discoveryMechanismKey]discoveryMechanism), 75 } 76} 77 78func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool { 79 if len(a) != len(b) { 80 return false 81 } 82 for i, aa := range a { 83 bb := b[i] 84 if !aa.Equal(bb) { 85 return false 86 } 87 } 88 return true 89} 90 91func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) { 92 rr.mu.Lock() 93 defer rr.mu.Unlock() 94 if equalDiscoveryMechanisms(rr.mechanisms, mechanisms) { 95 return 96 } 97 rr.mechanisms = mechanisms 98 rr.children = make([]resolverMechanismTuple, len(mechanisms)) 99 newDMs := make(map[discoveryMechanismKey]bool) 100 101 // Start one watch for each new discover mechanism {type+resource_name}. 102 for i, dm := range mechanisms { 103 switch dm.Type { 104 case DiscoveryMechanismTypeEDS: 105 // If EDSServiceName is not set, use the cluster name as EDS service 106 // name to watch. 107 nameToWatch := dm.EDSServiceName 108 if nameToWatch == "" { 109 nameToWatch = dm.Cluster 110 } 111 dmKey := discoveryMechanismKey{typ: dm.Type, name: nameToWatch} 112 newDMs[dmKey] = true 113 114 r := rr.childrenMap[dmKey] 115 if r == nil { 116 r = newEDSResolver(nameToWatch, rr.parent.xdsClient, rr) 117 rr.childrenMap[dmKey] = r 118 } 119 rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r} 120 case DiscoveryMechanismTypeLogicalDNS: 121 // Name to resolve in DNS is the hostname, not the ClientConn 122 // target. 123 dmKey := discoveryMechanismKey{typ: dm.Type, name: dm.DNSHostname} 124 newDMs[dmKey] = true 125 126 r := rr.childrenMap[dmKey] 127 if r == nil { 128 r = newDNSResolver(dm.DNSHostname, rr) 129 rr.childrenMap[dmKey] = r 130 } 131 rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r} 132 } 133 } 134 // Stop the resources that were removed. 135 for dm, r := range rr.childrenMap { 136 if !newDMs[dm] { 137 delete(rr.childrenMap, dm) 138 r.stop() 139 } 140 } 141 // Regenerate even if there's no change in discovery mechanism, in case 142 // priority order changed. 143 rr.generate() 144} 145 146// resolveNow is typically called to trigger re-resolve of DNS. The EDS 147// resolveNow() is a noop. 148func (rr *resourceResolver) resolveNow() { 149 rr.mu.Lock() 150 defer rr.mu.Unlock() 151 for _, r := range rr.childrenMap { 152 r.resolveNow() 153 } 154} 155 156func (rr *resourceResolver) stop() { 157 rr.mu.Lock() 158 defer rr.mu.Unlock() 159 for dm, r := range rr.childrenMap { 160 delete(rr.childrenMap, dm) 161 r.stop() 162 } 163 rr.mechanisms = nil 164 rr.children = nil 165} 166 167// generate collects all the updates from all the resolvers, and push the 168// combined result into the update channel. It only pushes the update when all 169// the child resolvers have received at least one update, otherwise it will 170// wait. 171// 172// caller must hold rr.mu. 173func (rr *resourceResolver) generate() { 174 var ret []priorityConfig 175 for _, rDM := range rr.children { 176 r, ok := rr.childrenMap[rDM.dmKey] 177 if !ok { 178 rr.parent.logger.Infof("resolver for %+v not found, should never happen", rDM.dmKey) 179 continue 180 } 181 182 u, ok := r.lastUpdate() 183 if !ok { 184 // Don't send updates to parent until all resolvers have update to 185 // send. 186 return 187 } 188 switch uu := u.(type) { 189 case xdsclient.EndpointsUpdate: 190 ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu}) 191 case []string: 192 ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu}) 193 } 194 } 195 select { 196 case <-rr.updateChannel: 197 default: 198 } 199 rr.updateChannel <- &resourceUpdate{priorities: ret} 200} 201 202type edsDiscoveryMechanism struct { 203 cancel func() 204 205 update xdsclient.EndpointsUpdate 206 updateReceived bool 207} 208 209func (er *edsDiscoveryMechanism) lastUpdate() (interface{}, bool) { 210 if !er.updateReceived { 211 return nil, false 212 } 213 return er.update, true 214} 215 216func (er *edsDiscoveryMechanism) resolveNow() { 217} 218 219func (er *edsDiscoveryMechanism) stop() { 220 er.cancel() 221} 222 223// newEDSResolver starts the EDS watch on the given xds client. 224func newEDSResolver(nameToWatch string, xdsc xdsclient.XDSClient, topLevelResolver *resourceResolver) *edsDiscoveryMechanism { 225 ret := &edsDiscoveryMechanism{} 226 topLevelResolver.parent.logger.Infof("EDS watch started on %v", nameToWatch) 227 cancel := xdsc.WatchEndpoints(nameToWatch, func(update xdsclient.EndpointsUpdate, err error) { 228 topLevelResolver.mu.Lock() 229 defer topLevelResolver.mu.Unlock() 230 if err != nil { 231 select { 232 case <-topLevelResolver.updateChannel: 233 default: 234 } 235 topLevelResolver.updateChannel <- &resourceUpdate{err: err} 236 return 237 } 238 ret.update = update 239 ret.updateReceived = true 240 topLevelResolver.generate() 241 }) 242 ret.cancel = func() { 243 topLevelResolver.parent.logger.Infof("EDS watch canceled on %v", nameToWatch) 244 cancel() 245 } 246 return ret 247} 248