1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"fmt"
24	"math"
25	"testing"
26	"time"
27
28	"google.golang.org/grpc/balancer"
29	"google.golang.org/grpc/balancer/roundrobin"
30	"google.golang.org/grpc/internal"
31	"google.golang.org/grpc/internal/balancer/stub"
32	"google.golang.org/grpc/resolver"
33	"google.golang.org/grpc/resolver/manual"
34	"google.golang.org/grpc/serviceconfig"
35)
36
37var _ balancer.Builder = &magicalLB{}
38var _ balancer.Balancer = &magicalLB{}
39
40// magicalLB is a ringer for grpclb.  It is used to avoid circular dependencies on the grpclb package
41type magicalLB struct{}
42
43func (b *magicalLB) Name() string {
44	return "grpclb"
45}
46
47func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
48	return b
49}
50
51func (b *magicalLB) ResolverError(error) {}
52
53func (b *magicalLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {}
54
55func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error {
56	return nil
57}
58
59func (b *magicalLB) Close() {}
60
61func init() {
62	balancer.Register(&magicalLB{})
63}
64
65func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, func()) {
66	var servers []*server
67	for i := 0; i < numServers; i++ {
68		s := newTestServer()
69		servers = append(servers, s)
70		go s.start(t, 0, maxStreams)
71		s.wait(t, 2*time.Second)
72	}
73	return servers, func() {
74		for i := 0; i < numServers; i++ {
75			servers[i].stop()
76		}
77	}
78}
79
80func checkPickFirst(cc *ClientConn, servers []*server) error {
81	var (
82		req   = "port"
83		reply string
84		err   error
85	)
86	connected := false
87	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
88	defer cancel()
89	for i := 0; i < 5000; i++ {
90		if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
91			if connected {
92				// connected is set to false if peer is not server[0]. So if
93				// connected is true here, this is the second time we saw
94				// server[0] in a row. Break because pickfirst is in effect.
95				break
96			}
97			connected = true
98		} else {
99			connected = false
100		}
101		time.Sleep(time.Millisecond)
102	}
103	if !connected {
104		return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
105	}
106
107	// The following RPCs should all succeed with the first server.
108	for i := 0; i < 3; i++ {
109		err = cc.Invoke(ctx, "/foo/bar", &req, &reply)
110		if errorDesc(err) != servers[0].port {
111			return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err)
112		}
113	}
114	return nil
115}
116
117func checkRoundRobin(cc *ClientConn, servers []*server) error {
118	var (
119		req   = "port"
120		reply string
121		err   error
122	)
123
124	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
125	defer cancel()
126	// Make sure connections to all servers are up.
127	for i := 0; i < 2; i++ {
128		// Do this check twice, otherwise the first RPC's transport may still be
129		// picked by the closing pickfirst balancer, and the test becomes flaky.
130		for _, s := range servers {
131			var up bool
132			for i := 0; i < 5000; i++ {
133				if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == s.port {
134					up = true
135					break
136				}
137				time.Sleep(time.Millisecond)
138			}
139			if !up {
140				return fmt.Errorf("server %v is not up within 5 second", s.port)
141			}
142		}
143	}
144
145	serverCount := len(servers)
146	for i := 0; i < 3*serverCount; i++ {
147		err = cc.Invoke(ctx, "/foo/bar", &req, &reply)
148		if errorDesc(err) != servers[i%serverCount].port {
149			return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
150		}
151	}
152	return nil
153}
154
155func (s) TestSwitchBalancer(t *testing.T) {
156	r := manual.NewBuilderWithScheme("whatever")
157
158	const numServers = 2
159	servers, scleanup := startServers(t, numServers, math.MaxInt32)
160	defer scleanup()
161
162	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
163	if err != nil {
164		t.Fatalf("failed to dial: %v", err)
165	}
166	defer cc.Close()
167	addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
168	r.UpdateState(resolver.State{Addresses: addrs})
169	// The default balancer is pickfirst.
170	if err := checkPickFirst(cc, servers); err != nil {
171		t.Fatalf("check pickfirst returned non-nil error: %v", err)
172	}
173	// Switch to roundrobin.
174	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
175	if err := checkRoundRobin(cc, servers); err != nil {
176		t.Fatalf("check roundrobin returned non-nil error: %v", err)
177	}
178	// Switch to pickfirst.
179	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
180	if err := checkPickFirst(cc, servers); err != nil {
181		t.Fatalf("check pickfirst returned non-nil error: %v", err)
182	}
183}
184
// Test that the balancer specified by a dial option is not overridden by a
// later service config.
186func (s) TestBalancerDialOption(t *testing.T) {
187	r := manual.NewBuilderWithScheme("whatever")
188
189	const numServers = 2
190	servers, scleanup := startServers(t, numServers, math.MaxInt32)
191	defer scleanup()
192
193	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
194	if err != nil {
195		t.Fatalf("failed to dial: %v", err)
196	}
197	defer cc.Close()
198	addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
199	r.UpdateState(resolver.State{Addresses: addrs})
200	// The init balancer is roundrobin.
201	if err := checkRoundRobin(cc, servers); err != nil {
202		t.Fatalf("check roundrobin returned non-nil error: %v", err)
203	}
204	// Switch to pickfirst.
205	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
206	// Balancer is still roundrobin.
207	if err := checkRoundRobin(cc, servers); err != nil {
208		t.Fatalf("check roundrobin returned non-nil error: %v", err)
209	}
210}
211
212// First addr update contains grpclb.
213func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
214	r := manual.NewBuilderWithScheme("whatever")
215
216	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
217	if err != nil {
218		t.Fatalf("failed to dial: %v", err)
219	}
220	defer cc.Close()
221
222	// ClientConn will switch balancer to grpclb when receives an address of
223	// type GRPCLB.
224	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
225	var isGRPCLB bool
226	for i := 0; i < 5000; i++ {
227		cc.mu.Lock()
228		isGRPCLB = cc.curBalancerName == "grpclb"
229		cc.mu.Unlock()
230		if isGRPCLB {
231			break
232		}
233		time.Sleep(time.Millisecond)
234	}
235	if !isGRPCLB {
236		t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
237	}
238
239	// New update containing new backend and new grpclb. Should not switch
240	// balancer.
241	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
242	for i := 0; i < 200; i++ {
243		cc.mu.Lock()
244		isGRPCLB = cc.curBalancerName == "grpclb"
245		cc.mu.Unlock()
246		if !isGRPCLB {
247			break
248		}
249		time.Sleep(time.Millisecond)
250	}
251	if !isGRPCLB {
252		t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
253	}
254
255	var isPickFirst bool
256	// Switch balancer to pickfirst.
257	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
258	for i := 0; i < 5000; i++ {
259		cc.mu.Lock()
260		isPickFirst = cc.curBalancerName == PickFirstBalancerName
261		cc.mu.Unlock()
262		if isPickFirst {
263			break
264		}
265		time.Sleep(time.Millisecond)
266	}
267	if !isPickFirst {
268		t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
269	}
270}
271
272// First addr update does not contain grpclb.
273func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
274	r := manual.NewBuilderWithScheme("whatever")
275
276	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
277	if err != nil {
278		t.Fatalf("failed to dial: %v", err)
279	}
280	defer cc.Close()
281
282	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
283	var isPickFirst bool
284	for i := 0; i < 5000; i++ {
285		cc.mu.Lock()
286		isPickFirst = cc.curBalancerName == PickFirstBalancerName
287		cc.mu.Unlock()
288		if isPickFirst {
289			break
290		}
291		time.Sleep(time.Millisecond)
292	}
293	if !isPickFirst {
294		t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
295	}
296
297	// ClientConn will switch balancer to grpclb when receives an address of
298	// type GRPCLB.
299	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
300	var isGRPCLB bool
301	for i := 0; i < 5000; i++ {
302		cc.mu.Lock()
303		isGRPCLB = cc.curBalancerName == "grpclb"
304		cc.mu.Unlock()
305		if isGRPCLB {
306			break
307		}
308		time.Sleep(time.Millisecond)
309	}
310	if !isGRPCLB {
311		t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
312	}
313
314	// New update containing new backend and new grpclb. Should not switch
315	// balancer.
316	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
317	for i := 0; i < 200; i++ {
318		cc.mu.Lock()
319		isGRPCLB = cc.curBalancerName == "grpclb"
320		cc.mu.Unlock()
321		if !isGRPCLB {
322			break
323		}
324		time.Sleep(time.Millisecond)
325	}
326	if !isGRPCLB {
327		t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
328	}
329
330	// Switch balancer back.
331	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
332	for i := 0; i < 5000; i++ {
333		cc.mu.Lock()
334		isPickFirst = cc.curBalancerName == PickFirstBalancerName
335		cc.mu.Unlock()
336		if isPickFirst {
337			break
338		}
339		time.Sleep(time.Millisecond)
340	}
341	if !isPickFirst {
342		t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
343	}
344}
345
346// Test that if the current balancer is roundrobin, after switching to grpclb,
347// when the resolved address doesn't contain grpclb addresses, balancer will be
348// switched back to roundrobin.
349func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
350	r := manual.NewBuilderWithScheme("whatever")
351
352	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
353	if err != nil {
354		t.Fatalf("failed to dial: %v", err)
355	}
356	defer cc.Close()
357
358	sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
359
360	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
361	var isRoundRobin bool
362	for i := 0; i < 5000; i++ {
363		cc.mu.Lock()
364		isRoundRobin = cc.curBalancerName == "round_robin"
365		cc.mu.Unlock()
366		if isRoundRobin {
367			break
368		}
369		time.Sleep(time.Millisecond)
370	}
371	if !isRoundRobin {
372		t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
373	}
374
375	// ClientConn will switch balancer to grpclb when receives an address of
376	// type GRPCLB.
377	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}, ServiceConfig: sc})
378	var isGRPCLB bool
379	for i := 0; i < 5000; i++ {
380		cc.mu.Lock()
381		isGRPCLB = cc.curBalancerName == "grpclb"
382		cc.mu.Unlock()
383		if isGRPCLB {
384			break
385		}
386		time.Sleep(time.Millisecond)
387	}
388	if !isGRPCLB {
389		t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
390	}
391
392	// Switch balancer back.
393	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
394	for i := 0; i < 5000; i++ {
395		cc.mu.Lock()
396		isRoundRobin = cc.curBalancerName == "round_robin"
397		cc.mu.Unlock()
398		if isRoundRobin {
399			break
400		}
401		time.Sleep(time.Millisecond)
402	}
403	if !isRoundRobin {
404		t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
405	}
406}
407
408// Test that if resolved address list contains grpclb, the balancer option in
409// service config won't take effect. But when there's no grpclb address in a new
410// resolved address list, balancer will be switched to the new one.
411func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
412	r := manual.NewBuilderWithScheme("whatever")
413
414	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
415	if err != nil {
416		t.Fatalf("failed to dial: %v", err)
417	}
418	defer cc.Close()
419
420	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
421	var isPickFirst bool
422	for i := 0; i < 5000; i++ {
423		cc.mu.Lock()
424		isPickFirst = cc.curBalancerName == PickFirstBalancerName
425		cc.mu.Unlock()
426		if isPickFirst {
427			break
428		}
429		time.Sleep(time.Millisecond)
430	}
431	if !isPickFirst {
432		t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
433	}
434
435	// ClientConn will switch balancer to grpclb when receives an address of
436	// type GRPCLB.
437	addrs := []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}
438	r.UpdateState(resolver.State{Addresses: addrs})
439	var isGRPCLB bool
440	for i := 0; i < 5000; i++ {
441		cc.mu.Lock()
442		isGRPCLB = cc.curBalancerName == "grpclb"
443		cc.mu.Unlock()
444		if isGRPCLB {
445			break
446		}
447		time.Sleep(time.Millisecond)
448	}
449	if !isGRPCLB {
450		t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
451	}
452
453	sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
454	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
455	var isRoundRobin bool
456	for i := 0; i < 200; i++ {
457		cc.mu.Lock()
458		isRoundRobin = cc.curBalancerName == "round_robin"
459		cc.mu.Unlock()
460		if isRoundRobin {
461			break
462		}
463		time.Sleep(time.Millisecond)
464	}
465	// Balancer should NOT switch to round_robin because resolved list contains
466	// grpclb.
467	if isRoundRobin {
468		t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb")
469	}
470
471	// Switch balancer back.
472	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
473	for i := 0; i < 5000; i++ {
474		cc.mu.Lock()
475		isRoundRobin = cc.curBalancerName == "round_robin"
476		cc.mu.Unlock()
477		if isRoundRobin {
478			break
479		}
480		time.Sleep(time.Millisecond)
481	}
482	if !isRoundRobin {
483		t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
484	}
485}
486
487// Test that when switching to grpclb fails because grpclb is not registered,
488// the fallback balancer will only get backend addresses, not the grpclb server
489// address.
490//
// The test sends 3 server addresses (all backends) as resolved addresses, but
// claims the first one is a grpclb server. All RPCs should be sent to the
// other addresses, not the first one.
494func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
495	internal.BalancerUnregister("grpclb")
496	defer balancer.Register(&magicalLB{})
497
498	r := manual.NewBuilderWithScheme("whatever")
499
500	const numServers = 3
501	servers, scleanup := startServers(t, numServers, math.MaxInt32)
502	defer scleanup()
503
504	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
505	if err != nil {
506		t.Fatalf("failed to dial: %v", err)
507	}
508	defer cc.Close()
509	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
510	// The default balancer is pickfirst.
511	if err := checkPickFirst(cc, servers[1:]); err != nil {
512		t.Fatalf("check pickfirst returned non-nil error: %v", err)
513	}
514	// Try switching to grpclb by sending servers[0] as grpclb address. It's
515	// expected that servers[0] will be filtered out, so it will not be used by
516	// the balancer.
517	//
518	// If the filtering failed, servers[0] will be used for RPCs and the RPCs
519	// will succeed. The following checks will catch this and fail.
520	addrs := []resolver.Address{
521		{Addr: servers[0].addr, Type: resolver.GRPCLB},
522		{Addr: servers[1].addr}, {Addr: servers[2].addr}}
523	r.UpdateState(resolver.State{Addresses: addrs})
524	// Still check for pickfirst, but only with server[1] and server[2].
525	if err := checkPickFirst(cc, servers[1:]); err != nil {
526		t.Fatalf("check pickfirst returned non-nil error: %v", err)
527	}
528	// Switch to roundrobin, and check against server[1] and server[2].
529	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
530	if err := checkRoundRobin(cc, servers[1:]); err != nil {
531		t.Fatalf("check roundrobin returned non-nil error: %v", err)
532	}
533}
534
535const inlineRemoveSubConnBalancerName = "test-inline-remove-subconn-balancer"
536
537func init() {
538	stub.Register(inlineRemoveSubConnBalancerName, stub.BalancerFuncs{
539		Close: func(data *stub.BalancerData) {
540			data.ClientConn.RemoveSubConn(&acBalancerWrapper{})
541		},
542	})
543}
544
// Test that when switching balancers, the old balancer calls RemoveSubConn
// in Close.
547//
548// This test is to make sure this close doesn't cause a deadlock.
549func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) {
550	r := manual.NewBuilderWithScheme("whatever")
551	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
552	if err != nil {
553		t.Fatalf("failed to dial: %v", err)
554	}
555	defer cc.Close()
556	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, inlineRemoveSubConnBalancerName))}, nil)
557	// This service config update will switch balancer from
558	// "test-inline-remove-subconn-balancer" to "pick_first". The test balancer
559	// will be closed, which will call cc.RemoveSubConn() inline (this
560	// RemoveSubConn is not required by the API, but some balancers might do
561	// it).
562	//
563	// This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a
564	// deadlock (e.g. trying to grab a mutex while it's already locked).
565	//
566	// Do it in a goroutine so this test will fail with a helpful message
567	// (though the goroutine will still leak).
568	done := make(chan struct{})
569	go func() {
570		cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`)}, nil)
571		close(done)
572	}()
573	select {
574	case <-time.After(defaultTestTimeout):
575		t.Fatalf("timeout waiting for updateResolverState to finish")
576	case <-done:
577	}
578}
579
580func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
581	scpr := r.CC.ParseServiceConfig(s)
582	if scpr.Err != nil {
583		panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
584	}
585	return scpr
586}
587