1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package test
20
21import (
22	"context"
23	"reflect"
24	"testing"
25	"time"
26
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/balancer"
29	"google.golang.org/grpc/connectivity"
30	"google.golang.org/grpc/credentials"
31	"google.golang.org/grpc/grpclog"
32	"google.golang.org/grpc/internal/balancerload"
33	"google.golang.org/grpc/internal/testutils"
34	"google.golang.org/grpc/metadata"
35	"google.golang.org/grpc/resolver"
36	"google.golang.org/grpc/resolver/manual"
37	testpb "google.golang.org/grpc/test/grpc_testing"
38	"google.golang.org/grpc/testdata"
39)
40
// testBalancerName is the name reported by testBalancer.Name and the name the
// tests in this file pass to grpc.WithBalancerName to select that balancer.
const (
	testBalancerName = "testbalancer"
)
42
43// testBalancer creates one subconn with the first address from resolved
44// addresses.
45//
46// It's used to test options for NewSubConn are applies correctly.
47type testBalancer struct {
48	cc balancer.ClientConn
49	sc balancer.SubConn
50
51	newSubConnOptions balancer.NewSubConnOptions
52	pickInfos         []balancer.PickInfo
53	doneInfo          []balancer.DoneInfo
54}
55
56func (b *testBalancer) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
57	b.cc = cc
58	return b
59}
60
61func (*testBalancer) Name() string {
62	return testBalancerName
63}
64
65func (b *testBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
66	// Only create a subconn at the first time.
67	if err == nil && b.sc == nil {
68		b.sc, err = b.cc.NewSubConn(addrs, b.newSubConnOptions)
69		if err != nil {
70			grpclog.Errorf("testBalancer: failed to NewSubConn: %v", err)
71			return
72		}
73		b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: &picker{sc: b.sc, bal: b}})
74		b.sc.Connect()
75	}
76}
77
78func (b *testBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
79	grpclog.Infof("testBalancer: HandleSubConnStateChange: %p, %v", sc, s)
80	if b.sc != sc {
81		grpclog.Infof("testBalancer: ignored state change because sc is not recognized")
82		return
83	}
84	if s == connectivity.Shutdown {
85		b.sc = nil
86		return
87	}
88
89	switch s {
90	case connectivity.Ready, connectivity.Idle:
91		b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{sc: sc, bal: b}})
92	case connectivity.Connecting:
93		b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrNoSubConnAvailable, bal: b}})
94	case connectivity.TransientFailure:
95		b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrTransientFailure, bal: b}})
96	}
97}
98
99func (b *testBalancer) Close() {
100}
101
102type picker struct {
103	err error
104	sc  balancer.SubConn
105	bal *testBalancer
106}
107
108func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
109	if p.err != nil {
110		return balancer.PickResult{}, p.err
111	}
112	info.Ctx = nil // Do not validate context.
113	p.bal.pickInfos = append(p.bal.pickInfos, info)
114	return balancer.PickResult{SubConn: p.sc, Done: func(d balancer.DoneInfo) { p.bal.doneInfo = append(p.bal.doneInfo, d) }}, nil
115}
116
117func (s) TestCredsBundleFromBalancer(t *testing.T) {
118	balancer.Register(&testBalancer{
119		newSubConnOptions: balancer.NewSubConnOptions{
120			CredsBundle: &testCredsBundle{},
121		},
122	})
123	te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: ""})
124	te.tapHandle = authHandle
125	te.customDialOptions = []grpc.DialOption{
126		grpc.WithBalancerName(testBalancerName),
127	}
128	creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key"))
129	if err != nil {
130		t.Fatalf("Failed to generate credentials %v", err)
131	}
132	te.customServerOptions = []grpc.ServerOption{
133		grpc.Creds(creds),
134	}
135	te.startServer(&testServer{})
136	defer te.tearDown()
137
138	cc := te.clientConn()
139	tc := testpb.NewTestServiceClient(cc)
140	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
141		t.Fatalf("Test failed. Reason: %v", err)
142	}
143}
144
145func (s) TestDoneInfo(t *testing.T) {
146	for _, e := range listTestEnv() {
147		testDoneInfo(t, e)
148	}
149}
150
151func testDoneInfo(t *testing.T, e env) {
152	te := newTest(t, e)
153	b := &testBalancer{}
154	balancer.Register(b)
155	te.customDialOptions = []grpc.DialOption{
156		grpc.WithBalancerName(testBalancerName),
157	}
158	te.userAgent = failAppUA
159	te.startServer(&testServer{security: e.security})
160	defer te.tearDown()
161
162	cc := te.clientConn()
163	tc := testpb.NewTestServiceClient(cc)
164
165	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
166	defer cancel()
167	wantErr := detailedError
168	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !testutils.StatusErrEqual(err, wantErr) {
169		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
170	}
171	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
172		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
173	}
174
175	if len(b.doneInfo) < 1 || !testutils.StatusErrEqual(b.doneInfo[0].Err, wantErr) {
176		t.Fatalf("b.doneInfo = %v; want b.doneInfo[0].Err = %v", b.doneInfo, wantErr)
177	}
178	if len(b.doneInfo) < 2 || !reflect.DeepEqual(b.doneInfo[1].Trailer, testTrailerMetadata) {
179		t.Fatalf("b.doneInfo = %v; want b.doneInfo[1].Trailer = %v", b.doneInfo, testTrailerMetadata)
180	}
181	if len(b.pickInfos) != len(b.doneInfo) {
182		t.Fatalf("Got %d picks, but %d doneInfo, want equal amount", len(b.pickInfos), len(b.doneInfo))
183	}
184	// To test done() is always called, even if it's returned with a non-Ready
185	// SubConn.
186	//
187	// Stop server and at the same time send RPCs. There are chances that picker
188	// is not updated in time, causing a non-Ready SubConn to be returned.
189	finished := make(chan struct{})
190	go func() {
191		for i := 0; i < 20; i++ {
192			tc.UnaryCall(ctx, &testpb.SimpleRequest{})
193		}
194		close(finished)
195	}()
196	te.srv.Stop()
197	<-finished
198	if len(b.pickInfos) != len(b.doneInfo) {
199		t.Fatalf("Got %d picks, %d doneInfo, want equal amount", len(b.pickInfos), len(b.doneInfo))
200	}
201}
202
// loadMDKey is the metadata key that testLoadParser.Parse looks up and under
// which the stub server in testDoneLoads sends its load value as a trailer.
const (
	loadMDKey = "X-Endpoint-Load-Metrics-Bin"
)
204
205type testLoadParser struct{}
206
207func (*testLoadParser) Parse(md metadata.MD) interface{} {
208	vs := md.Get(loadMDKey)
209	if len(vs) == 0 {
210		return nil
211	}
212	return vs[0]
213}
214
215func init() {
216	balancerload.SetParser(&testLoadParser{})
217}
218
219func (s) TestDoneLoads(t *testing.T) {
220	for _, e := range listTestEnv() {
221		testDoneLoads(t, e)
222	}
223}
224
225func testDoneLoads(t *testing.T, e env) {
226	b := &testBalancer{}
227	balancer.Register(b)
228
229	const testLoad = "test-load-,-should-be-orca"
230
231	ss := &stubServer{
232		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
233			grpc.SetTrailer(ctx, metadata.Pairs(loadMDKey, testLoad))
234			return &testpb.Empty{}, nil
235		},
236	}
237	if err := ss.Start(nil, grpc.WithBalancerName(testBalancerName)); err != nil {
238		t.Fatalf("error starting testing server: %v", err)
239	}
240	defer ss.Stop()
241
242	tc := testpb.NewTestServiceClient(ss.cc)
243
244	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
245	defer cancel()
246	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
247		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, nil)
248	}
249
250	piWant := []balancer.PickInfo{
251		{FullMethodName: "/grpc.testing.TestService/EmptyCall"},
252	}
253	if !reflect.DeepEqual(b.pickInfos, piWant) {
254		t.Fatalf("b.pickInfos = %v; want %v", b.pickInfos, piWant)
255	}
256
257	if len(b.doneInfo) < 1 {
258		t.Fatalf("b.doneInfo = %v, want length 1", b.doneInfo)
259	}
260	gotLoad, _ := b.doneInfo[0].ServerLoad.(string)
261	if gotLoad != testLoad {
262		t.Fatalf("b.doneInfo[0].ServerLoad = %v; want = %v", b.doneInfo[0].ServerLoad, testLoad)
263	}
264}
265
// testBalancerKeepAddressesName is the name reported by
// testBalancerKeepAddresses.Name.
const (
	testBalancerKeepAddressesName = "testbalancer-keepingaddresses"
)
267
268// testBalancerKeepAddresses keeps the addresses in the builder instead of
269// creating SubConns.
270//
271// It's used to test the addresses balancer gets are correct.
272type testBalancerKeepAddresses struct {
273	addrsChan chan []resolver.Address
274}
275
276func newTestBalancerKeepAddresses() *testBalancerKeepAddresses {
277	return &testBalancerKeepAddresses{
278		addrsChan: make(chan []resolver.Address, 10),
279	}
280}
281
282func (b *testBalancerKeepAddresses) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
283	return b
284}
285
286func (*testBalancerKeepAddresses) Name() string {
287	return testBalancerKeepAddressesName
288}
289
290func (b *testBalancerKeepAddresses) HandleResolvedAddrs(addrs []resolver.Address, err error) {
291	b.addrsChan <- addrs
292}
293
294func (testBalancerKeepAddresses) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
295	panic("not used")
296}
297
298func (testBalancerKeepAddresses) Close() {
299}
300
301// Make sure that non-grpclb balancers don't get grpclb addresses even if name
302// resolver sends them
303func (s) TestNonGRPCLBBalancerGetsNoGRPCLBAddress(t *testing.T) {
304	r, rcleanup := manual.GenerateAndRegisterManualResolver()
305	defer rcleanup()
306
307	b := newTestBalancerKeepAddresses()
308	balancer.Register(b)
309
310	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(),
311		grpc.WithBalancerName(b.Name()))
312	if err != nil {
313		t.Fatalf("failed to dial: %v", err)
314	}
315	defer cc.Close()
316
317	grpclbAddresses := []resolver.Address{{
318		Addr:       "grpc.lb.com",
319		Type:       resolver.GRPCLB,
320		ServerName: "grpc.lb.com",
321	}}
322
323	nonGRPCLBAddresses := []resolver.Address{{
324		Addr: "localhost",
325		Type: resolver.Backend,
326	}}
327
328	r.UpdateState(resolver.State{
329		Addresses: nonGRPCLBAddresses,
330	})
331	if got := <-b.addrsChan; !reflect.DeepEqual(got, nonGRPCLBAddresses) {
332		t.Fatalf("With only backend addresses, balancer got addresses %v, want %v", got, nonGRPCLBAddresses)
333	}
334
335	r.UpdateState(resolver.State{
336		Addresses: grpclbAddresses,
337	})
338	if got := <-b.addrsChan; len(got) != 0 {
339		t.Fatalf("With only grpclb addresses, balancer got addresses %v, want empty", got)
340	}
341
342	r.UpdateState(resolver.State{
343		Addresses: append(grpclbAddresses, nonGRPCLBAddresses...),
344	})
345	if got := <-b.addrsChan; !reflect.DeepEqual(got, nonGRPCLBAddresses) {
346		t.Fatalf("With both backend and grpclb addresses, balancer got addresses %v, want %v", got, nonGRPCLBAddresses)
347	}
348}
349