1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package test
20
21import (
22	"context"
23	"reflect"
24	"testing"
25	"time"
26
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/balancer"
29	"google.golang.org/grpc/connectivity"
30	"google.golang.org/grpc/credentials"
31	"google.golang.org/grpc/grpclog"
32	"google.golang.org/grpc/internal/balancerload"
33	"google.golang.org/grpc/internal/testutils"
34	"google.golang.org/grpc/metadata"
35	"google.golang.org/grpc/resolver"
36	"google.golang.org/grpc/resolver/manual"
37	testpb "google.golang.org/grpc/test/grpc_testing"
38	"google.golang.org/grpc/testdata"
39)
40
41const testBalancerName = "testbalancer"
42
43// testBalancer creates one subconn with the first address from resolved
44// addresses.
45//
46// It's used to test options for NewSubConn are applies correctly.
47type testBalancer struct {
48	cc balancer.ClientConn
49	sc balancer.SubConn
50
51	newSubConnOptions balancer.NewSubConnOptions
52	pickOptions       []balancer.PickOptions
53	doneInfo          []balancer.DoneInfo
54}
55
56func (b *testBalancer) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
57	b.cc = cc
58	return b
59}
60
61func (*testBalancer) Name() string {
62	return testBalancerName
63}
64
65func (b *testBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
66	// Only create a subconn at the first time.
67	if err == nil && b.sc == nil {
68		b.sc, err = b.cc.NewSubConn(addrs, b.newSubConnOptions)
69		if err != nil {
70			grpclog.Errorf("testBalancer: failed to NewSubConn: %v", err)
71			return
72		}
73		b.cc.UpdateBalancerState(connectivity.Connecting, &picker{sc: b.sc, bal: b})
74		b.sc.Connect()
75	}
76}
77
78func (b *testBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
79	grpclog.Infof("testBalancer: HandleSubConnStateChange: %p, %v", sc, s)
80	if b.sc != sc {
81		grpclog.Infof("testBalancer: ignored state change because sc is not recognized")
82		return
83	}
84	if s == connectivity.Shutdown {
85		b.sc = nil
86		return
87	}
88
89	switch s {
90	case connectivity.Ready, connectivity.Idle:
91		b.cc.UpdateBalancerState(s, &picker{sc: sc, bal: b})
92	case connectivity.Connecting:
93		b.cc.UpdateBalancerState(s, &picker{err: balancer.ErrNoSubConnAvailable, bal: b})
94	case connectivity.TransientFailure:
95		b.cc.UpdateBalancerState(s, &picker{err: balancer.ErrTransientFailure, bal: b})
96	}
97}
98
99func (b *testBalancer) Close() {
100}
101
102type picker struct {
103	err error
104	sc  balancer.SubConn
105	bal *testBalancer
106}
107
108func (p *picker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
109	if p.err != nil {
110		return nil, nil, p.err
111	}
112	p.bal.pickOptions = append(p.bal.pickOptions, opts)
113	return p.sc, func(d balancer.DoneInfo) { p.bal.doneInfo = append(p.bal.doneInfo, d) }, nil
114}
115
116func (s) TestCredsBundleFromBalancer(t *testing.T) {
117	balancer.Register(&testBalancer{
118		newSubConnOptions: balancer.NewSubConnOptions{
119			CredsBundle: &testCredsBundle{},
120		},
121	})
122	te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: ""})
123	te.tapHandle = authHandle
124	te.customDialOptions = []grpc.DialOption{
125		grpc.WithBalancerName(testBalancerName),
126	}
127	creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key"))
128	if err != nil {
129		t.Fatalf("Failed to generate credentials %v", err)
130	}
131	te.customServerOptions = []grpc.ServerOption{
132		grpc.Creds(creds),
133	}
134	te.startServer(&testServer{})
135	defer te.tearDown()
136
137	cc := te.clientConn()
138	tc := testpb.NewTestServiceClient(cc)
139	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
140		t.Fatalf("Test failed. Reason: %v", err)
141	}
142}
143
144func (s) TestDoneInfo(t *testing.T) {
145	for _, e := range listTestEnv() {
146		testDoneInfo(t, e)
147	}
148}
149
150func testDoneInfo(t *testing.T, e env) {
151	te := newTest(t, e)
152	b := &testBalancer{}
153	balancer.Register(b)
154	te.customDialOptions = []grpc.DialOption{
155		grpc.WithBalancerName(testBalancerName),
156	}
157	te.userAgent = failAppUA
158	te.startServer(&testServer{security: e.security})
159	defer te.tearDown()
160
161	cc := te.clientConn()
162	tc := testpb.NewTestServiceClient(cc)
163
164	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
165	defer cancel()
166	wantErr := detailedError
167	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !testutils.StatusErrEqual(err, wantErr) {
168		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
169	}
170	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
171		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
172	}
173
174	if len(b.doneInfo) < 1 || !testutils.StatusErrEqual(b.doneInfo[0].Err, wantErr) {
175		t.Fatalf("b.doneInfo = %v; want b.doneInfo[0].Err = %v", b.doneInfo, wantErr)
176	}
177	if len(b.doneInfo) < 2 || !reflect.DeepEqual(b.doneInfo[1].Trailer, testTrailerMetadata) {
178		t.Fatalf("b.doneInfo = %v; want b.doneInfo[1].Trailer = %v", b.doneInfo, testTrailerMetadata)
179	}
180	if len(b.pickOptions) != len(b.doneInfo) {
181		t.Fatalf("Got %d picks, but %d doneInfo, want equal amount", len(b.pickOptions), len(b.doneInfo))
182	}
183	// To test done() is always called, even if it's returned with a non-Ready
184	// SubConn.
185	//
186	// Stop server and at the same time send RPCs. There are chances that picker
187	// is not updated in time, causing a non-Ready SubConn to be returned.
188	finished := make(chan struct{})
189	go func() {
190		for i := 0; i < 20; i++ {
191			tc.UnaryCall(ctx, &testpb.SimpleRequest{})
192		}
193		close(finished)
194	}()
195	te.srv.Stop()
196	<-finished
197	if len(b.pickOptions) != len(b.doneInfo) {
198		t.Fatalf("Got %d picks, %d doneInfo, want equal amount", len(b.pickOptions), len(b.doneInfo))
199	}
200}
201
202const loadMDKey = "X-Endpoint-Load-Metrics-Bin"
203
204type testLoadParser struct{}
205
206func (*testLoadParser) Parse(md metadata.MD) interface{} {
207	vs := md.Get(loadMDKey)
208	if len(vs) == 0 {
209		return nil
210	}
211	return vs[0]
212}
213
214func init() {
215	balancerload.SetParser(&testLoadParser{})
216}
217
218func (s) TestDoneLoads(t *testing.T) {
219	for _, e := range listTestEnv() {
220		testDoneLoads(t, e)
221	}
222}
223
224func testDoneLoads(t *testing.T, e env) {
225	b := &testBalancer{}
226	balancer.Register(b)
227
228	const testLoad = "test-load-,-should-be-orca"
229
230	ss := &stubServer{
231		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
232			grpc.SetTrailer(ctx, metadata.Pairs(loadMDKey, testLoad))
233			return &testpb.Empty{}, nil
234		},
235	}
236	if err := ss.Start(nil, grpc.WithBalancerName(testBalancerName)); err != nil {
237		t.Fatalf("error starting testing server: %v", err)
238	}
239	defer ss.Stop()
240
241	tc := testpb.NewTestServiceClient(ss.cc)
242
243	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
244	defer cancel()
245	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
246		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, nil)
247	}
248
249	poWant := []balancer.PickOptions{
250		{FullMethodName: "/grpc.testing.TestService/EmptyCall"},
251	}
252	if !reflect.DeepEqual(b.pickOptions, poWant) {
253		t.Fatalf("b.pickOptions = %v; want %v", b.pickOptions, poWant)
254	}
255
256	if len(b.doneInfo) < 1 {
257		t.Fatalf("b.doneInfo = %v, want length 1", b.doneInfo)
258	}
259	gotLoad, _ := b.doneInfo[0].ServerLoad.(string)
260	if gotLoad != testLoad {
261		t.Fatalf("b.doneInfo[0].ServerLoad = %v; want = %v", b.doneInfo[0].ServerLoad, testLoad)
262	}
263}
264
265const testBalancerKeepAddressesName = "testbalancer-keepingaddresses"
266
267// testBalancerKeepAddresses keeps the addresses in the builder instead of
268// creating SubConns.
269//
270// It's used to test the addresses balancer gets are correct.
271type testBalancerKeepAddresses struct {
272	addrsChan chan []resolver.Address
273}
274
275func newTestBalancerKeepAddresses() *testBalancerKeepAddresses {
276	return &testBalancerKeepAddresses{
277		addrsChan: make(chan []resolver.Address, 10),
278	}
279}
280
281func (b *testBalancerKeepAddresses) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
282	return b
283}
284
285func (*testBalancerKeepAddresses) Name() string {
286	return testBalancerKeepAddressesName
287}
288
289func (b *testBalancerKeepAddresses) HandleResolvedAddrs(addrs []resolver.Address, err error) {
290	b.addrsChan <- addrs
291}
292
293func (testBalancerKeepAddresses) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
294	panic("not used")
295}
296
297func (testBalancerKeepAddresses) Close() {
298}
299
300// Make sure that non-grpclb balancers don't get grpclb addresses even if name
301// resolver sends them
302func (s) TestNonGRPCLBBalancerGetsNoGRPCLBAddress(t *testing.T) {
303	r, rcleanup := manual.GenerateAndRegisterManualResolver()
304	defer rcleanup()
305
306	b := newTestBalancerKeepAddresses()
307	balancer.Register(b)
308
309	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(),
310		grpc.WithBalancerName(b.Name()))
311	if err != nil {
312		t.Fatalf("failed to dial: %v", err)
313	}
314	defer cc.Close()
315
316	grpclbAddresses := []resolver.Address{{
317		Addr:       "grpc.lb.com",
318		Type:       resolver.GRPCLB,
319		ServerName: "grpc.lb.com",
320	}}
321
322	nonGRPCLBAddresses := []resolver.Address{{
323		Addr: "localhost",
324		Type: resolver.Backend,
325	}}
326
327	r.UpdateState(resolver.State{
328		Addresses: nonGRPCLBAddresses,
329	})
330	if got := <-b.addrsChan; !reflect.DeepEqual(got, nonGRPCLBAddresses) {
331		t.Fatalf("With only backend addresses, balancer got addresses %v, want %v", got, nonGRPCLBAddresses)
332	}
333
334	r.UpdateState(resolver.State{
335		Addresses: grpclbAddresses,
336	})
337	if got := <-b.addrsChan; len(got) != 0 {
338		t.Fatalf("With only grpclb addresses, balancer got addresses %v, want empty", got)
339	}
340
341	r.UpdateState(resolver.State{
342		Addresses: append(grpclbAddresses, nonGRPCLBAddresses...),
343	})
344	if got := <-b.addrsChan; !reflect.DeepEqual(got, nonGRPCLBAddresses) {
345		t.Fatalf("With both backend and grpclb addresses, balancer got addresses %v, want %v", got, nonGRPCLBAddresses)
346	}
347}
348