1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"io"
23	"sync"
24	"sync/atomic"
25
26	"golang.org/x/net/context"
27	"google.golang.org/grpc/balancer"
28	"google.golang.org/grpc/channelz"
29	"google.golang.org/grpc/codes"
30	"google.golang.org/grpc/grpclog"
31	"google.golang.org/grpc/metadata"
32	"google.golang.org/grpc/resolver"
33	"google.golang.org/grpc/status"
34	"google.golang.org/grpc/transport"
35)
36
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblocks when there's a picker update.
//
// done, blockingCh and picker are read and written under mu. blockingCh is
// closed (and, unless the wrapper is being closed, replaced) whenever blocked
// picks should wake up and re-check state.
type pickerWrapper struct {
	mu         sync.Mutex
	done       bool
	blockingCh chan struct{}
	picker     balancer.Picker

	// connErr is the most recently recorded connection error; it is guarded
	// by connErrMu and reported to failfast RPCs on transient failure.
	connErrMu sync.Mutex
	connErr   error

	// stickinessMDKey holds the current stickiness metadata key as a string;
	// stickiness maps sticky keys to previously picked connections.
	stickinessMDKey atomic.Value
	stickiness      *stickyStore
}
52
53func newPickerWrapper() *pickerWrapper {
54	bp := &pickerWrapper{
55		blockingCh: make(chan struct{}),
56		stickiness: newStickyStore(),
57	}
58	return bp
59}
60
61func (bp *pickerWrapper) updateConnectionError(err error) {
62	bp.connErrMu.Lock()
63	bp.connErr = err
64	bp.connErrMu.Unlock()
65}
66
67func (bp *pickerWrapper) connectionError() error {
68	bp.connErrMu.Lock()
69	err := bp.connErr
70	bp.connErrMu.Unlock()
71	return err
72}
73
74func (bp *pickerWrapper) updateStickinessMDKey(newKey string) {
75	// No need to check ok because mdKey == "" if ok == false.
76	if oldKey, _ := bp.stickinessMDKey.Load().(string); oldKey != newKey {
77		bp.stickinessMDKey.Store(newKey)
78		bp.stickiness.reset(newKey)
79	}
80}
81
82func (bp *pickerWrapper) getStickinessMDKey() string {
83	// No need to check ok because mdKey == "" if ok == false.
84	mdKey, _ := bp.stickinessMDKey.Load().(string)
85	return mdKey
86}
87
88func (bp *pickerWrapper) clearStickinessState() {
89	if oldKey := bp.getStickinessMDKey(); oldKey != "" {
90		// There's no need to reset store if mdKey was "".
91		bp.stickiness.reset(oldKey)
92	}
93}
94
95// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
96func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
97	bp.mu.Lock()
98	if bp.done {
99		bp.mu.Unlock()
100		return
101	}
102	bp.picker = p
103	// bp.blockingCh should never be nil.
104	close(bp.blockingCh)
105	bp.blockingCh = make(chan struct{})
106	bp.mu.Unlock()
107}
108
109func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
110	acw.mu.Lock()
111	ac := acw.ac
112	acw.mu.Unlock()
113	ac.incrCallsStarted()
114	return func(b balancer.DoneInfo) {
115		if b.Err != nil && b.Err != io.EOF {
116			ac.incrCallsFailed()
117		} else {
118			ac.incrCallsSucceeded()
119		}
120		if done != nil {
121			done(b)
122		}
123	}
124}
125
126// pick returns the transport that will be used for the RPC.
127// It may block in the following cases:
128// - there's no picker
129// - the current picker returns ErrNoSubConnAvailable
130// - the current picker returns other errors and failfast is false.
131// - the subConn returned by the current picker is not READY
132// When one of these situations happens, pick blocks until the picker gets updated.
133func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
134
135	mdKey := bp.getStickinessMDKey()
136	stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey)
137
138	// Potential race here: if stickinessMDKey is updated after the above two
139	// lines, and this pick is a sticky pick, the following put could add an
140	// entry to sticky store with an outdated sticky key.
141	//
142	// The solution: keep the current md key in sticky store, and at the
143	// beginning of each get/put, check the mdkey against store.curMDKey.
144	//  - Cons: one more string comparing for each get/put.
145	//  - Pros: the string matching happens inside get/put, so the overhead for
146	//  non-sticky RPCs will be minimal.
147
148	if isSticky {
149		if t, ok := bp.stickiness.get(mdKey, stickyKey); ok {
150			// Done function returned is always nil.
151			return t, nil, nil
152		}
153	}
154
155	var (
156		p  balancer.Picker
157		ch chan struct{}
158	)
159
160	for {
161		bp.mu.Lock()
162		if bp.done {
163			bp.mu.Unlock()
164			return nil, nil, ErrClientConnClosing
165		}
166
167		if bp.picker == nil {
168			ch = bp.blockingCh
169		}
170		if ch == bp.blockingCh {
171			// This could happen when either:
172			// - bp.picker is nil (the previous if condition), or
173			// - has called pick on the current picker.
174			bp.mu.Unlock()
175			select {
176			case <-ctx.Done():
177				return nil, nil, ctx.Err()
178			case <-ch:
179			}
180			continue
181		}
182
183		ch = bp.blockingCh
184		p = bp.picker
185		bp.mu.Unlock()
186
187		subConn, done, err := p.Pick(ctx, opts)
188
189		if err != nil {
190			switch err {
191			case balancer.ErrNoSubConnAvailable:
192				continue
193			case balancer.ErrTransientFailure:
194				if !failfast {
195					continue
196				}
197				return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
198			default:
199				// err is some other error.
200				return nil, nil, toRPCErr(err)
201			}
202		}
203
204		acw, ok := subConn.(*acBalancerWrapper)
205		if !ok {
206			grpclog.Infof("subconn returned from pick is not *acBalancerWrapper")
207			continue
208		}
209		if t, ok := acw.getAddrConn().getReadyTransport(); ok {
210			if isSticky {
211				bp.stickiness.put(mdKey, stickyKey, acw)
212			}
213			if channelz.IsOn() {
214				return t, doneChannelzWrapper(acw, done), nil
215			}
216			return t, done, nil
217		}
218		grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
219		// If ok == false, ac.state is not READY.
220		// A valid picker always returns READY subConn. This means the state of ac
221		// just changed, and picker will be updated shortly.
222		// continue back to the beginning of the for loop to repick.
223	}
224}
225
226func (bp *pickerWrapper) close() {
227	bp.mu.Lock()
228	defer bp.mu.Unlock()
229	if bp.done {
230		return
231	}
232	bp.done = true
233	close(bp.blockingCh)
234}
235
// stickyStoreEntry is the value stored in stickyStore for one sticky key: the
// picked acBalancerWrapper together with the address its addrConn reported
// when the entry was stored. stickyStore.get drops the entry if the current
// address no longer matches addr.
type stickyStoreEntry struct {
	acw  *acBalancerWrapper
	addr resolver.Address
}
240
// stickyStore maps sticky keys to the connection previously picked for them.
// mu guards curMDKey and store.
type stickyStore struct {
	mu sync.Mutex
	// curMDKey is checked before every get/put to avoid races. The operation
	// will abort immediately when the given mdKey is different from curMDKey.
	curMDKey string
	store    map[string]*stickyStoreEntry
}
248
249func newStickyStore() *stickyStore {
250	return &stickyStore{
251		store: make(map[string]*stickyStoreEntry),
252	}
253}
254
255// reset clears the map in stickyStore, and set the currentMDKey to newMDKey.
256func (ss *stickyStore) reset(newMDKey string) {
257	ss.mu.Lock()
258	ss.curMDKey = newMDKey
259	ss.store = make(map[string]*stickyStoreEntry)
260	ss.mu.Unlock()
261}
262
263// stickyKey is the key to look up in store. mdKey will be checked against
264// curMDKey to avoid races.
265func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) {
266	ss.mu.Lock()
267	defer ss.mu.Unlock()
268	if mdKey != ss.curMDKey {
269		return
270	}
271	// TODO(stickiness): limit the total number of entries.
272	ss.store[stickyKey] = &stickyStoreEntry{
273		acw:  acw,
274		addr: acw.getAddrConn().getCurAddr(),
275	}
276}
277
278// stickyKey is the key to look up in store. mdKey will be checked against
279// curMDKey to avoid races.
280func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) {
281	ss.mu.Lock()
282	defer ss.mu.Unlock()
283	if mdKey != ss.curMDKey {
284		return nil, false
285	}
286	entry, ok := ss.store[stickyKey]
287	if !ok {
288		return nil, false
289	}
290	ac := entry.acw.getAddrConn()
291	if ac.getCurAddr() != entry.addr {
292		delete(ss.store, stickyKey)
293		return nil, false
294	}
295	t, ok := ac.getReadyTransport()
296	if !ok {
297		delete(ss.store, stickyKey)
298		return nil, false
299	}
300	return t, true
301}
302
303// Get one value from metadata in ctx with key stickinessMDKey.
304//
305// It returns "", false if stickinessMDKey is an empty string.
306func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) {
307	if stickinessMDKey == "" {
308		return "", false
309	}
310
311	md, added, ok := metadata.FromOutgoingContextRaw(ctx)
312	if !ok {
313		return "", false
314	}
315
316	if vv, ok := md[stickinessMDKey]; ok {
317		if len(vv) > 0 {
318			return vv[0], true
319		}
320	}
321
322	for _, ss := range added {
323		for i := 0; i < len(ss)-1; i += 2 {
324			if ss[i] == stickinessMDKey {
325				return ss[i+1], true
326			}
327		}
328	}
329
330	return "", false
331}
332