1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"fmt"
24	"io"
25	"sync"
26
27	"google.golang.org/grpc/balancer"
28	"google.golang.org/grpc/codes"
29	"google.golang.org/grpc/grpclog"
30	"google.golang.org/grpc/internal/channelz"
31	"google.golang.org/grpc/internal/transport"
32	"google.golang.org/grpc/status"
33)
34
// v2PickerWrapper wraps a balancer.Picker while providing the
// balancer.V2Picker API.  It carries the *connErr of the pickerWrapper that
// created it so that transient-failure errors can include the latest
// connection error.  To be deleted when balancer.Picker is updated to the
// balancer.V2Picker API.
//
// Fields:
//   - picker is the wrapped picker that Pick delegates to.
//   - connErr supplies the latest connection error that Pick appends to
//     balancer.ErrTransientFailure results.
type v2PickerWrapper struct {
	picker  balancer.Picker
	connErr *connErr
}
43
44func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
45	sc, done, err := v.picker.Pick(info.Ctx, info)
46	if err != nil {
47		if err == balancer.ErrTransientFailure {
48			return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError()))
49		}
50		return balancer.PickResult{}, err
51	}
52	return balancer.PickResult{SubConn: sc, Done: done}, nil
53}
54
// pickerWrapper is a wrapper of balancer.V2Picker. It blocks on certain pick
// actions and unblocks when there's a picker update.
//
// Fields:
//   - mu guards done, blockingCh and picker.
//   - done is set by close; once set, picker updates are ignored and pick
//     returns ErrClientConnClosing.
//   - blockingCh is closed (and replaced) on every picker update, and closed
//     on close, to wake goroutines waiting in pick.
//   - picker is the current picker; nil until the first update.
type pickerWrapper struct {
	mu         sync.Mutex
	done       bool
	blockingCh chan struct{}
	picker     balancer.V2Picker

	// The latest connection error.  TODO: remove when V1 picker is deprecated;
	// balancer should be responsible for providing the error.
	*connErr
}
67
// connErr holds a single error value behind a mutex so that it can be stored
// and read from different goroutines.
type connErr struct {
	mu  sync.Mutex // guards err
	err error      // most recently stored error; nil if none
}

// updateConnectionError replaces the stored error with err. Passing nil
// clears it.
func (c *connErr) updateConnectionError(err error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.err = err
}

// connectionError returns the error most recently stored by
// updateConnectionError, or nil if none has been stored.
func (c *connErr) connectionError() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.err
}
85
86func newPickerWrapper() *pickerWrapper {
87	return &pickerWrapper{blockingCh: make(chan struct{}), connErr: &connErr{}}
88}
89
90// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
91func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
92	pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr})
93}
94
95// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
96func (pw *pickerWrapper) updatePickerV2(p balancer.V2Picker) {
97	pw.mu.Lock()
98	if pw.done {
99		pw.mu.Unlock()
100		return
101	}
102	pw.picker = p
103	// pw.blockingCh should never be nil.
104	close(pw.blockingCh)
105	pw.blockingCh = make(chan struct{})
106	pw.mu.Unlock()
107}
108
109func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
110	acw.mu.Lock()
111	ac := acw.ac
112	acw.mu.Unlock()
113	ac.incrCallsStarted()
114	return func(b balancer.DoneInfo) {
115		if b.Err != nil && b.Err != io.EOF {
116			ac.incrCallsFailed()
117		} else {
118			ac.incrCallsSucceeded()
119		}
120		if done != nil {
121			done(b)
122		}
123	}
124}
125
// pick returns the transport that will be used for the RPC, together with the
// picker's done callback (wrapped by doneChannelzWrapper when channelz is on).
// It may block in the following cases:
// - there's no picker
// - the current picker returns ErrNoSubConnAvailable
// - the current picker returns a transient failure error and failfast is false
// - the SubConn returned by the current picker is not an *acBalancerWrapper
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets
// updated, the wrapper is closed, or ctx is done.
//
// It returns ErrClientConnClosing once the wrapper has been closed, a
// DeadlineExceeded or Canceled status when ctx ends while waiting, an
// Unavailable status for a transient failure when failfast is true, the
// picker's error unchanged when status.FromError accepts it, and an Unknown
// status for any other picker error.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	// ch is the blockingCh that was current when the picker was last
	// consulted. It still equals pw.blockingCh only if no picker update has
	// happened since then.
	var ch chan struct{}

	// lastPickErr is the most recent transient-failure error from a picker; it
	// is used to build the error message if ctx ends while waiting.
	var lastPickErr error
	for {
		pw.mu.Lock()
		if pw.done {
			pw.mu.Unlock()
			return nil, nil, ErrClientConnClosing
		}

		if pw.picker == nil {
			ch = pw.blockingCh
		}
		if ch == pw.blockingCh {
			// This could happen when either:
			// - pw.picker is nil (the previous if condition), or
			// - has called pick on the current picker.
			// In both cases wait, without holding pw.mu, for a picker update,
			// for close, or for ctx to end.
			pw.mu.Unlock()
			select {
			case <-ctx.Done():
				// Prefer the most specific explanation: the last balancer
				// error, then the latest connection error, then the bare
				// context error.
				var errStr string
				if lastPickErr != nil {
					errStr = "latest balancer error: " + lastPickErr.Error()
				} else if connectionErr := pw.connectionError(); connectionErr != nil {
					errStr = "latest connection error: " + connectionErr.Error()
				} else {
					errStr = ctx.Err().Error()
				}
				switch ctx.Err() {
				case context.DeadlineExceeded:
					return nil, nil, status.Error(codes.DeadlineExceeded, errStr)
				case context.Canceled:
					return nil, nil, status.Error(codes.Canceled, errStr)
				}
				// NOTE(review): if ctx.Err() is neither value above, control
				// falls through to the continue below and this select is
				// re-entered with ctx already done — confirm that cannot
				// happen for the contexts passed here.
			case <-ch:
				// blockingCh was closed: the picker was updated or the
				// wrapper was closed. Loop to re-check.
			}
			continue
		}

		// Remember which blockingCh this picker belongs to, then call the
		// picker without holding pw.mu.
		ch = pw.blockingCh
		p := pw.picker
		pw.mu.Unlock()

		pickResult, err := p.Pick(info)

		if err != nil {
			if err == balancer.ErrNoSubConnAvailable {
				// Loop back; the next iteration waits unless the picker has
				// already been updated.
				continue
			}
			if tfe, ok := err.(interface{ IsTransientFailure() bool }); ok && tfe.IsTransientFailure() {
				if !failfast {
					// Keep the error for reporting and wait for a new picker.
					lastPickErr = err
					continue
				}
				return nil, nil, status.Error(codes.Unavailable, err.Error())
			}
			if _, ok := status.FromError(err); ok {
				return nil, nil, err
			}
			// err is some other error.
			return nil, nil, status.Error(codes.Unknown, err.Error())
		}

		acw, ok := pickResult.SubConn.(*acBalancerWrapper)
		if !ok {
			grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
			continue
		}
		if t, ok := acw.getAddrConn().getReadyTransport(); ok {
			if channelz.IsOn() {
				return t, doneChannelzWrapper(acw, pickResult.Done), nil
			}
			return t, pickResult.Done, nil
		}
		if pickResult.Done != nil {
			// Calling done with nil error, no bytes sent and no bytes received.
			// DoneInfo with default value works.
			pickResult.Done(balancer.DoneInfo{})
		}
		grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
		// If ok == false, ac.state is not READY.
		// A valid picker always returns READY subConn. This means the state of ac
		// just changed, and picker will be updated shortly.
		// continue back to the beginning of the for loop to repick.
	}
}
220
221func (pw *pickerWrapper) close() {
222	pw.mu.Lock()
223	defer pw.mu.Unlock()
224	if pw.done {
225		return
226	}
227	pw.done = true
228	close(pw.blockingCh)
229}
230