1package resolver 2 3import ( 4 "fmt" 5 "math/rand" 6 "strings" 7 "sync" 8 "time" 9 10 "google.golang.org/grpc/resolver" 11 12 "github.com/hashicorp/consul/agent/metadata" 13) 14 15// ServerResolverBuilder tracks the current server list and keeps any 16// ServerResolvers updated when changes occur. 17type ServerResolverBuilder struct { 18 cfg Config 19 // servers is an index of Servers by Server.ID. The map contains server IDs 20 // for all datacenters. 21 servers map[string]*metadata.Server 22 // resolvers is an index of connections to the serverResolver which manages 23 // addresses of servers for that connection. 24 resolvers map[resolver.ClientConn]*serverResolver 25 // lock for servers and resolvers. 26 lock sync.RWMutex 27} 28 29type Config struct { 30 // Authority used to query the server. Defaults to "". Used to support 31 // parallel testing because gRPC registers resolvers globally. 32 Authority string 33} 34 35func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder { 36 return &ServerResolverBuilder{ 37 cfg: cfg, 38 servers: make(map[string]*metadata.Server), 39 resolvers: make(map[resolver.ClientConn]*serverResolver), 40 } 41} 42 43// NewRebalancer returns a function which shuffles the server list for resolvers 44// in all datacenters. 45func (s *ServerResolverBuilder) NewRebalancer(dc string) func() { 46 shuffler := rand.New(rand.NewSource(time.Now().UnixNano())) 47 return func() { 48 s.lock.RLock() 49 defer s.lock.RUnlock() 50 51 for _, resolver := range s.resolvers { 52 if resolver.datacenter != dc { 53 continue 54 } 55 // Shuffle the list of addresses using the last list given to the resolver. 56 resolver.addrLock.Lock() 57 addrs := resolver.addrs 58 shuffler.Shuffle(len(addrs), func(i, j int) { 59 addrs[i], addrs[j] = addrs[j], addrs[i] 60 }) 61 // Pass the shuffled list to the resolver. 62 resolver.updateAddrsLocked(addrs) 63 resolver.addrLock.Unlock() 64 } 65 } 66} 67 68// ServerForGlobalAddr returns server metadata for a server with the specified globally unique address. 69func (s *ServerResolverBuilder) ServerForGlobalAddr(globalAddr string) (*metadata.Server, error) { 70 s.lock.RLock() 71 defer s.lock.RUnlock() 72 73 for _, server := range s.servers { 74 if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr { 75 return server, nil 76 } 77 } 78 return nil, fmt.Errorf("failed to find Consul server for global address %q", globalAddr) 79} 80 81// Build returns a new serverResolver for the given ClientConn. The resolver 82// will keep the ClientConn's state updated based on updates from Serf. 83func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) { 84 s.lock.Lock() 85 defer s.lock.Unlock() 86 87 // If there's already a resolver for this connection, return it. 88 // TODO(streaming): how would this happen since we already cache connections in ClientConnPool? 89 if resolver, ok := s.resolvers[cc]; ok { 90 return resolver, nil 91 } 92 93 // Make a new resolver for the dc and add it to the list of active ones. 94 datacenter := strings.TrimPrefix(target.Endpoint, "server.") 95 resolver := &serverResolver{ 96 datacenter: datacenter, 97 clientConn: cc, 98 close: func() { 99 s.lock.Lock() 100 defer s.lock.Unlock() 101 delete(s.resolvers, cc) 102 }, 103 } 104 resolver.updateAddrs(s.getDCAddrs(datacenter)) 105 106 s.resolvers[cc] = resolver 107 return resolver, nil 108} 109 110func (s *ServerResolverBuilder) Authority() string { 111 return s.cfg.Authority 112} 113 114// AddServer updates the resolvers' states to include the new server's address. 115func (s *ServerResolverBuilder) AddServer(server *metadata.Server) { 116 s.lock.Lock() 117 defer s.lock.Unlock() 118 119 s.servers[uniqueID(server)] = server 120 121 addrs := s.getDCAddrs(server.Datacenter) 122 for _, resolver := range s.resolvers { 123 if resolver.datacenter == server.Datacenter { 124 resolver.updateAddrs(addrs) 125 } 126 } 127} 128 129// uniqueID returns a unique identifier for the server which includes the 130// Datacenter and the ID. 131// 132// In practice it is expected that the server.ID is already a globally unique 133// UUID. This function is an extra safeguard in case that ever changes. 134func uniqueID(server *metadata.Server) string { 135 return server.Datacenter + "-" + server.ID 136} 137 138// DCPrefix prefixes the given string with a datacenter for use in 139// disambiguation. 140func DCPrefix(datacenter, suffix string) string { 141 return datacenter + "-" + suffix 142} 143 144// RemoveServer updates the resolvers' states with the given server removed. 145func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) { 146 s.lock.Lock() 147 defer s.lock.Unlock() 148 149 delete(s.servers, uniqueID(server)) 150 151 addrs := s.getDCAddrs(server.Datacenter) 152 for _, resolver := range s.resolvers { 153 if resolver.datacenter == server.Datacenter { 154 resolver.updateAddrs(addrs) 155 } 156 } 157} 158 159// getDCAddrs returns a list of the server addresses for the given datacenter. 160// This method requires that lock is held for reads. 161func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address { 162 var addrs []resolver.Address 163 for _, server := range s.servers { 164 if server.Datacenter != dc { 165 continue 166 } 167 168 addrs = append(addrs, resolver.Address{ 169 // NOTE: the address persisted here is only dialable using our custom dialer 170 Addr: DCPrefix(server.Datacenter, server.Addr.String()), 171 Type: resolver.Backend, 172 ServerName: server.Name, 173 }) 174 } 175 return addrs 176} 177 178// serverResolver is a grpc Resolver that will keep a grpc.ClientConn up to date 179// on the list of server addresses to use. 180type serverResolver struct { 181 // datacenter that can be reached by the clientConn. Used by ServerResolverBuilder 182 // to filter resolvers for those in a specific datacenter. 183 datacenter string 184 185 // clientConn that this resolver is providing addresses for. 186 clientConn resolver.ClientConn 187 188 // close is used by ServerResolverBuilder to remove this resolver from the 189 // index of resolvers. It is called by grpc when the connection is closed. 190 close func() 191 192 // addrs stores the list of addresses passed to updateAddrs, so that they 193 // can be rebalanced periodically by ServerResolverBuilder. 194 addrs []resolver.Address 195 addrLock sync.Mutex 196} 197 198var _ resolver.Resolver = (*serverResolver)(nil) 199 200// updateAddrs updates this serverResolver's ClientConn to use the given set of 201// addrs. 202func (r *serverResolver) updateAddrs(addrs []resolver.Address) { 203 r.addrLock.Lock() 204 defer r.addrLock.Unlock() 205 r.updateAddrsLocked(addrs) 206} 207 208// updateAddrsLocked updates this serverResolver's ClientConn to use the given 209// set of addrs. addrLock must be held by caller. 210func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) { 211 // Only pass the first address initially, which will cause the 212 // balancer to spin down the connection for its previous first address 213 // if it is different. If we don't do this, it will keep using the old 214 // first address as long as it is still in the list, making it impossible to 215 // rebalance until that address is removed. 216 var firstAddr []resolver.Address 217 if len(addrs) > 0 { 218 firstAddr = []resolver.Address{addrs[0]} 219 } 220 r.clientConn.UpdateState(resolver.State{Addresses: firstAddr}) 221 222 // Call UpdateState again with the entire list of addrs in case we need them 223 // for failover. 224 r.clientConn.UpdateState(resolver.State{Addresses: addrs}) 225 226 r.addrs = addrs 227} 228 229func (r *serverResolver) Close() { 230 r.close() 231} 232 233// ResolveNow is not used 234func (*serverResolver) ResolveNow(_ resolver.ResolveNowOption) {} 235