1package resolver
2
3import (
4	"fmt"
5	"math/rand"
6	"strings"
7	"sync"
8	"time"
9
10	"google.golang.org/grpc/resolver"
11
12	"github.com/hashicorp/consul/agent/metadata"
13)
14
15// ServerResolverBuilder tracks the current server list and keeps any
16// ServerResolvers updated when changes occur.
17type ServerResolverBuilder struct {
18	cfg Config
19	// servers is an index of Servers by Server.ID. The map contains server IDs
20	// for all datacenters.
21	servers map[string]*metadata.Server
22	// resolvers is an index of connections to the serverResolver which manages
23	// addresses of servers for that connection.
24	resolvers map[resolver.ClientConn]*serverResolver
25	// lock for servers and resolvers.
26	lock sync.RWMutex
27}
28
// Config carries the settings used to construct a ServerResolverBuilder.
type Config struct {
	// Authority is the authority used to query the server; it is the value
	// reported by ServerResolverBuilder.Authority and defaults to "". It
	// exists to support parallel testing, because gRPC registers resolvers
	// globally.
	Authority string
}
34
35func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder {
36	return &ServerResolverBuilder{
37		cfg:       cfg,
38		servers:   make(map[string]*metadata.Server),
39		resolvers: make(map[resolver.ClientConn]*serverResolver),
40	}
41}
42
43// NewRebalancer returns a function which shuffles the server list for resolvers
44// in all datacenters.
45func (s *ServerResolverBuilder) NewRebalancer(dc string) func() {
46	shuffler := rand.New(rand.NewSource(time.Now().UnixNano()))
47	return func() {
48		s.lock.RLock()
49		defer s.lock.RUnlock()
50
51		for _, resolver := range s.resolvers {
52			if resolver.datacenter != dc {
53				continue
54			}
55			// Shuffle the list of addresses using the last list given to the resolver.
56			resolver.addrLock.Lock()
57			addrs := resolver.addrs
58			shuffler.Shuffle(len(addrs), func(i, j int) {
59				addrs[i], addrs[j] = addrs[j], addrs[i]
60			})
61			// Pass the shuffled list to the resolver.
62			resolver.updateAddrsLocked(addrs)
63			resolver.addrLock.Unlock()
64		}
65	}
66}
67
68// ServerForGlobalAddr returns server metadata for a server with the specified globally unique address.
69func (s *ServerResolverBuilder) ServerForGlobalAddr(globalAddr string) (*metadata.Server, error) {
70	s.lock.RLock()
71	defer s.lock.RUnlock()
72
73	for _, server := range s.servers {
74		if DCPrefix(server.Datacenter, server.Addr.String()) == globalAddr {
75			return server, nil
76		}
77	}
78	return nil, fmt.Errorf("failed to find Consul server for global address %q", globalAddr)
79}
80
81// Build returns a new serverResolver for the given ClientConn. The resolver
82// will keep the ClientConn's state updated based on updates from Serf.
83func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
84	s.lock.Lock()
85	defer s.lock.Unlock()
86
87	// If there's already a resolver for this connection, return it.
88	// TODO(streaming): how would this happen since we already cache connections in ClientConnPool?
89	if resolver, ok := s.resolvers[cc]; ok {
90		return resolver, nil
91	}
92
93	// Make a new resolver for the dc and add it to the list of active ones.
94	datacenter := strings.TrimPrefix(target.Endpoint, "server.")
95	resolver := &serverResolver{
96		datacenter: datacenter,
97		clientConn: cc,
98		close: func() {
99			s.lock.Lock()
100			defer s.lock.Unlock()
101			delete(s.resolvers, cc)
102		},
103	}
104	resolver.updateAddrs(s.getDCAddrs(datacenter))
105
106	s.resolvers[cc] = resolver
107	return resolver, nil
108}
109
110func (s *ServerResolverBuilder) Authority() string {
111	return s.cfg.Authority
112}
113
114// AddServer updates the resolvers' states to include the new server's address.
115func (s *ServerResolverBuilder) AddServer(server *metadata.Server) {
116	s.lock.Lock()
117	defer s.lock.Unlock()
118
119	s.servers[uniqueID(server)] = server
120
121	addrs := s.getDCAddrs(server.Datacenter)
122	for _, resolver := range s.resolvers {
123		if resolver.datacenter == server.Datacenter {
124			resolver.updateAddrs(addrs)
125		}
126	}
127}
128
129// uniqueID returns a unique identifier for the server which includes the
130// Datacenter and the ID.
131//
132// In practice it is expected that the server.ID is already a globally unique
133// UUID. This function is an extra safeguard in case that ever changes.
134func uniqueID(server *metadata.Server) string {
135	return server.Datacenter + "-" + server.ID
136}
137
// DCPrefix returns suffix prefixed with datacenter and a "-" separator, so
// that otherwise identical values from different datacenters can be told
// apart.
func DCPrefix(datacenter, suffix string) string {
	const separator = "-"
	return datacenter + separator + suffix
}
143
144// RemoveServer updates the resolvers' states with the given server removed.
145func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) {
146	s.lock.Lock()
147	defer s.lock.Unlock()
148
149	delete(s.servers, uniqueID(server))
150
151	addrs := s.getDCAddrs(server.Datacenter)
152	for _, resolver := range s.resolvers {
153		if resolver.datacenter == server.Datacenter {
154			resolver.updateAddrs(addrs)
155		}
156	}
157}
158
159// getDCAddrs returns a list of the server addresses for the given datacenter.
160// This method requires that lock is held for reads.
161func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address {
162	var addrs []resolver.Address
163	for _, server := range s.servers {
164		if server.Datacenter != dc {
165			continue
166		}
167
168		addrs = append(addrs, resolver.Address{
169			// NOTE: the address persisted here is only dialable using our custom dialer
170			Addr:       DCPrefix(server.Datacenter, server.Addr.String()),
171			Type:       resolver.Backend,
172			ServerName: server.Name,
173		})
174	}
175	return addrs
176}
177
178// serverResolver is a grpc Resolver that will keep a grpc.ClientConn up to date
179// on the list of server addresses to use.
180type serverResolver struct {
181	// datacenter that can be reached by the clientConn. Used by ServerResolverBuilder
182	// to filter resolvers for those in a specific datacenter.
183	datacenter string
184
185	// clientConn that this resolver is providing addresses for.
186	clientConn resolver.ClientConn
187
188	// close is used by ServerResolverBuilder to remove this resolver from the
189	// index of resolvers. It is called by grpc when the connection is closed.
190	close func()
191
192	// addrs stores the list of addresses passed to updateAddrs, so that they
193	// can be rebalanced periodically by ServerResolverBuilder.
194	addrs    []resolver.Address
195	addrLock sync.Mutex
196}
197
198var _ resolver.Resolver = (*serverResolver)(nil)
199
200// updateAddrs updates this serverResolver's ClientConn to use the given set of
201// addrs.
202func (r *serverResolver) updateAddrs(addrs []resolver.Address) {
203	r.addrLock.Lock()
204	defer r.addrLock.Unlock()
205	r.updateAddrsLocked(addrs)
206}
207
208// updateAddrsLocked updates this serverResolver's ClientConn to use the given
209// set of addrs. addrLock must be held by caller.
210func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) {
211	// Only pass the first address initially, which will cause the
212	// balancer to spin down the connection for its previous first address
213	// if it is different. If we don't do this, it will keep using the old
214	// first address as long as it is still in the list, making it impossible to
215	// rebalance until that address is removed.
216	var firstAddr []resolver.Address
217	if len(addrs) > 0 {
218		firstAddr = []resolver.Address{addrs[0]}
219	}
220	r.clientConn.UpdateState(resolver.State{Addresses: firstAddr})
221
222	// Call UpdateState again with the entire list of addrs in case we need them
223	// for failover.
224	r.clientConn.UpdateState(resolver.State{Addresses: addrs})
225
226	r.addrs = addrs
227}
228
229func (r *serverResolver) Close() {
230	r.close()
231}
232
233// ResolveNow is not used
234func (*serverResolver) ResolveNow(_ resolver.ResolveNowOption) {}
235