1// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package balancer
16
17import (
18	"fmt"
19	"strconv"
20	"sync"
21	"time"
22
23	"go.etcd.io/etcd/clientv3/balancer/picker"
24
25	"go.uber.org/zap"
26	"google.golang.org/grpc/balancer"
27	"google.golang.org/grpc/connectivity"
28	"google.golang.org/grpc/resolver"
29	_ "google.golang.org/grpc/resolver/dns"         // register DNS resolver
30	_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
31)
32
33// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
34// must be invoked at initialization time.
35func RegisterBuilder(cfg Config) {
36	bb := &builder{cfg}
37	balancer.Register(bb)
38
39	bb.cfg.Logger.Debug(
40		"registered balancer",
41		zap.String("policy", bb.cfg.Policy.String()),
42		zap.String("name", bb.cfg.Name),
43	)
44}
45
46type builder struct {
47	cfg Config
48}
49
50// Build is called initially when creating "ccBalancerWrapper".
51// "grpc.Dial" is called to this client connection.
52// Then, resolved addresses will be handled via "HandleResolvedAddrs".
53func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
54	bb := &baseBalancer{
55		id:     strconv.FormatInt(time.Now().UnixNano(), 36),
56		policy: b.cfg.Policy,
57		name:   b.cfg.Name,
58		lg:     b.cfg.Logger,
59
60		addrToSc: make(map[resolver.Address]balancer.SubConn),
61		scToAddr: make(map[balancer.SubConn]resolver.Address),
62		scToSt:   make(map[balancer.SubConn]connectivity.State),
63
64		currentConn: nil,
65		csEvltr:     &connectivityStateEvaluator{},
66
67		// initialize picker always returns "ErrNoSubConnAvailable"
68		Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
69	}
70	if bb.lg == nil {
71		bb.lg = zap.NewNop()
72	}
73
74	// TODO: support multiple connections
75	bb.mu.Lock()
76	bb.currentConn = cc
77	bb.mu.Unlock()
78
79	bb.lg.Info(
80		"built balancer",
81		zap.String("balancer-id", bb.id),
82		zap.String("policy", bb.policy.String()),
83		zap.String("resolver-target", cc.Target()),
84	)
85	return bb
86}
87
88// Name implements "grpc/balancer.Builder" interface.
89func (b *builder) Name() string { return b.cfg.Name }
90
91// Balancer defines client balancer interface.
92type Balancer interface {
93	// Balancer is called on specified client connection. Client initiates gRPC
94	// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
95	// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
96	// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
97	// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
98	// changes, thus requires failover logic in this method.
99	balancer.Balancer
100
101	// Picker calls "Pick" for every client request.
102	picker.Picker
103}
104
105type baseBalancer struct {
106	id     string
107	policy picker.Policy
108	name   string
109	lg     *zap.Logger
110
111	mu sync.RWMutex
112
113	addrToSc map[resolver.Address]balancer.SubConn
114	scToAddr map[balancer.SubConn]resolver.Address
115	scToSt   map[balancer.SubConn]connectivity.State
116
117	currentConn  balancer.ClientConn
118	currentState connectivity.State
119	csEvltr      *connectivityStateEvaluator
120
121	picker.Picker
122}
123
124// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
125// gRPC sends initial or updated resolved addresses from "Build".
126func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
127	if err != nil {
128		bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
129		return
130	}
131	bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
132
133	bb.mu.Lock()
134	defer bb.mu.Unlock()
135
136	resolved := make(map[resolver.Address]struct{})
137	for _, addr := range addrs {
138		resolved[addr] = struct{}{}
139		if _, ok := bb.addrToSc[addr]; !ok {
140			sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
141			if err != nil {
142				bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
143				continue
144			}
145			bb.addrToSc[addr] = sc
146			bb.scToAddr[sc] = addr
147			bb.scToSt[sc] = connectivity.Idle
148			sc.Connect()
149		}
150	}
151
152	for addr, sc := range bb.addrToSc {
153		if _, ok := resolved[addr]; !ok {
154			// was removed by resolver or failed to create subconn
155			bb.currentConn.RemoveSubConn(sc)
156			delete(bb.addrToSc, addr)
157
158			bb.lg.Info(
159				"removed subconn",
160				zap.String("balancer-id", bb.id),
161				zap.String("address", addr.Addr),
162				zap.String("subconn", scToString(sc)),
163			)
164
165			// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
166			// The entry will be deleted in HandleSubConnStateChange.
167			// (DO NOT) delete(bb.scToAddr, sc)
168			// (DO NOT) delete(bb.scToSt, sc)
169		}
170	}
171}
172
173// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
174func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
175	bb.mu.Lock()
176	defer bb.mu.Unlock()
177
178	old, ok := bb.scToSt[sc]
179	if !ok {
180		bb.lg.Warn(
181			"state change for an unknown subconn",
182			zap.String("balancer-id", bb.id),
183			zap.String("subconn", scToString(sc)),
184			zap.String("state", s.String()),
185		)
186		return
187	}
188
189	bb.lg.Info(
190		"state changed",
191		zap.String("balancer-id", bb.id),
192		zap.Bool("connected", s == connectivity.Ready),
193		zap.String("subconn", scToString(sc)),
194		zap.String("address", bb.scToAddr[sc].Addr),
195		zap.String("old-state", old.String()),
196		zap.String("new-state", s.String()),
197	)
198
199	bb.scToSt[sc] = s
200	switch s {
201	case connectivity.Idle:
202		sc.Connect()
203	case connectivity.Shutdown:
204		// When an address was removed by resolver, b called RemoveSubConn but
205		// kept the sc's state in scToSt. Remove state for this sc here.
206		delete(bb.scToAddr, sc)
207		delete(bb.scToSt, sc)
208	}
209
210	oldAggrState := bb.currentState
211	bb.currentState = bb.csEvltr.recordTransition(old, s)
212
213	// Regenerate picker when one of the following happens:
214	//  - this sc became ready from not-ready
215	//  - this sc became not-ready from ready
216	//  - the aggregated state of balancer became TransientFailure from non-TransientFailure
217	//  - the aggregated state of balancer became non-TransientFailure from TransientFailure
218	if (s == connectivity.Ready) != (old == connectivity.Ready) ||
219		(bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
220		bb.regeneratePicker()
221	}
222
223	bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
224	return
225}
226
227func (bb *baseBalancer) regeneratePicker() {
228	if bb.currentState == connectivity.TransientFailure {
229		bb.lg.Info(
230			"generated transient error picker",
231			zap.String("balancer-id", bb.id),
232			zap.String("policy", bb.policy.String()),
233		)
234		bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
235		return
236	}
237
238	// only pass ready subconns to picker
239	scs := make([]balancer.SubConn, 0)
240	addrToSc := make(map[resolver.Address]balancer.SubConn)
241	scToAddr := make(map[balancer.SubConn]resolver.Address)
242	for addr, sc := range bb.addrToSc {
243		if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
244			scs = append(scs, sc)
245			addrToSc[addr] = sc
246			scToAddr[sc] = addr
247		}
248	}
249
250	switch bb.policy {
251	case picker.RoundrobinBalanced:
252		bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
253
254	default:
255		panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
256	}
257
258	bb.lg.Info(
259		"generated picker",
260		zap.String("balancer-id", bb.id),
261		zap.String("policy", bb.policy.String()),
262		zap.Strings("subconn-ready", scsToStrings(addrToSc)),
263		zap.Int("subconn-size", len(addrToSc)),
264	)
265}
266
267// Close implements "grpc/balancer.Balancer" interface.
268// Close is a nop because base balancer doesn't have internal state to clean up,
269// and it doesn't need to call RemoveSubConn for the SubConns.
270func (bb *baseBalancer) Close() {
271	// TODO
272}
273