1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"fmt"
24	"math"
25	"testing"
26	"time"
27
28	"google.golang.org/grpc/balancer"
29	"google.golang.org/grpc/balancer/roundrobin"
30	"google.golang.org/grpc/internal"
31	"google.golang.org/grpc/internal/balancer/stub"
32	"google.golang.org/grpc/resolver"
33	"google.golang.org/grpc/resolver/manual"
34	"google.golang.org/grpc/serviceconfig"
35)
36
37var _ balancer.Builder = &magicalLB{}
38var _ balancer.Balancer = &magicalLB{}
39
40// magicalLB is a ringer for grpclb.  It is used to avoid circular dependencies on the grpclb package
41type magicalLB struct{}
42
43func (b *magicalLB) Name() string {
44	return "grpclb"
45}
46
47func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
48	return b
49}
50
51func (b *magicalLB) ResolverError(error) {}
52
53func (b *magicalLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {}
54
55func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error {
56	return nil
57}
58
59func (b *magicalLB) Close() {}
60
61func (b *magicalLB) ExitIdle() {}
62
63func init() {
64	balancer.Register(&magicalLB{})
65}
66
67func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, func()) {
68	var servers []*server
69	for i := 0; i < numServers; i++ {
70		s := newTestServer()
71		servers = append(servers, s)
72		go s.start(t, 0, maxStreams)
73		s.wait(t, 2*time.Second)
74	}
75	return servers, func() {
76		for i := 0; i < numServers; i++ {
77			servers[i].stop()
78		}
79	}
80}
81
82func checkPickFirst(cc *ClientConn, servers []*server) error {
83	var (
84		req   = "port"
85		reply string
86		err   error
87	)
88	connected := false
89	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
90	defer cancel()
91	for i := 0; i < 5000; i++ {
92		if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
93			if connected {
94				// connected is set to false if peer is not server[0]. So if
95				// connected is true here, this is the second time we saw
96				// server[0] in a row. Break because pickfirst is in effect.
97				break
98			}
99			connected = true
100		} else {
101			connected = false
102		}
103		time.Sleep(time.Millisecond)
104	}
105	if !connected {
106		return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
107	}
108
109	// The following RPCs should all succeed with the first server.
110	for i := 0; i < 3; i++ {
111		err = cc.Invoke(ctx, "/foo/bar", &req, &reply)
112		if errorDesc(err) != servers[0].port {
113			return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err)
114		}
115	}
116	return nil
117}
118
119func checkRoundRobin(cc *ClientConn, servers []*server) error {
120	var (
121		req   = "port"
122		reply string
123		err   error
124	)
125
126	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
127	defer cancel()
128	// Make sure connections to all servers are up.
129	for i := 0; i < 2; i++ {
130		// Do this check twice, otherwise the first RPC's transport may still be
131		// picked by the closing pickfirst balancer, and the test becomes flaky.
132		for _, s := range servers {
133			var up bool
134			for i := 0; i < 5000; i++ {
135				if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == s.port {
136					up = true
137					break
138				}
139				time.Sleep(time.Millisecond)
140			}
141			if !up {
142				return fmt.Errorf("server %v is not up within 5 second", s.port)
143			}
144		}
145	}
146
147	serverCount := len(servers)
148	for i := 0; i < 3*serverCount; i++ {
149		err = cc.Invoke(ctx, "/foo/bar", &req, &reply)
150		if errorDesc(err) != servers[i%serverCount].port {
151			return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
152		}
153	}
154	return nil
155}
156
157func (s) TestSwitchBalancer(t *testing.T) {
158	r := manual.NewBuilderWithScheme("whatever")
159
160	const numServers = 2
161	servers, scleanup := startServers(t, numServers, math.MaxInt32)
162	defer scleanup()
163
164	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
165	if err != nil {
166		t.Fatalf("failed to dial: %v", err)
167	}
168	defer cc.Close()
169	addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
170	r.UpdateState(resolver.State{Addresses: addrs})
171	// The default balancer is pickfirst.
172	if err := checkPickFirst(cc, servers); err != nil {
173		t.Fatalf("check pickfirst returned non-nil error: %v", err)
174	}
175	// Switch to roundrobin.
176	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
177	if err := checkRoundRobin(cc, servers); err != nil {
178		t.Fatalf("check roundrobin returned non-nil error: %v", err)
179	}
180	// Switch to pickfirst.
181	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
182	if err := checkPickFirst(cc, servers); err != nil {
183		t.Fatalf("check pickfirst returned non-nil error: %v", err)
184	}
185}
186
187// Test that balancer specified by dial option will not be overridden.
188func (s) TestBalancerDialOption(t *testing.T) {
189	r := manual.NewBuilderWithScheme("whatever")
190
191	const numServers = 2
192	servers, scleanup := startServers(t, numServers, math.MaxInt32)
193	defer scleanup()
194
195	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
196	if err != nil {
197		t.Fatalf("failed to dial: %v", err)
198	}
199	defer cc.Close()
200	addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
201	r.UpdateState(resolver.State{Addresses: addrs})
202	// The init balancer is roundrobin.
203	if err := checkRoundRobin(cc, servers); err != nil {
204		t.Fatalf("check roundrobin returned non-nil error: %v", err)
205	}
206	// Switch to pickfirst.
207	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
208	// Balancer is still roundrobin.
209	if err := checkRoundRobin(cc, servers); err != nil {
210		t.Fatalf("check roundrobin returned non-nil error: %v", err)
211	}
212}
213
214// First addr update contains grpclb.
215func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
216	r := manual.NewBuilderWithScheme("whatever")
217
218	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
219	if err != nil {
220		t.Fatalf("failed to dial: %v", err)
221	}
222	defer cc.Close()
223
224	// ClientConn will switch balancer to grpclb when receives an address of
225	// type GRPCLB.
226	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
227	var isGRPCLB bool
228	for i := 0; i < 5000; i++ {
229		cc.mu.Lock()
230		isGRPCLB = cc.curBalancerName == "grpclb"
231		cc.mu.Unlock()
232		if isGRPCLB {
233			break
234		}
235		time.Sleep(time.Millisecond)
236	}
237	if !isGRPCLB {
238		t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
239	}
240
241	// New update containing new backend and new grpclb. Should not switch
242	// balancer.
243	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
244	for i := 0; i < 200; i++ {
245		cc.mu.Lock()
246		isGRPCLB = cc.curBalancerName == "grpclb"
247		cc.mu.Unlock()
248		if !isGRPCLB {
249			break
250		}
251		time.Sleep(time.Millisecond)
252	}
253	if !isGRPCLB {
254		t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
255	}
256
257	var isPickFirst bool
258	// Switch balancer to pickfirst.
259	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
260	for i := 0; i < 5000; i++ {
261		cc.mu.Lock()
262		isPickFirst = cc.curBalancerName == PickFirstBalancerName
263		cc.mu.Unlock()
264		if isPickFirst {
265			break
266		}
267		time.Sleep(time.Millisecond)
268	}
269	if !isPickFirst {
270		t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
271	}
272}
273
274// First addr update does not contain grpclb.
275func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
276	r := manual.NewBuilderWithScheme("whatever")
277
278	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
279	if err != nil {
280		t.Fatalf("failed to dial: %v", err)
281	}
282	defer cc.Close()
283
284	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
285	var isPickFirst bool
286	for i := 0; i < 5000; i++ {
287		cc.mu.Lock()
288		isPickFirst = cc.curBalancerName == PickFirstBalancerName
289		cc.mu.Unlock()
290		if isPickFirst {
291			break
292		}
293		time.Sleep(time.Millisecond)
294	}
295	if !isPickFirst {
296		t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
297	}
298
299	// ClientConn will switch balancer to grpclb when receives an address of
300	// type GRPCLB.
301	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
302	var isGRPCLB bool
303	for i := 0; i < 5000; i++ {
304		cc.mu.Lock()
305		isGRPCLB = cc.curBalancerName == "grpclb"
306		cc.mu.Unlock()
307		if isGRPCLB {
308			break
309		}
310		time.Sleep(time.Millisecond)
311	}
312	if !isGRPCLB {
313		t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
314	}
315
316	// New update containing new backend and new grpclb. Should not switch
317	// balancer.
318	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
319	for i := 0; i < 200; i++ {
320		cc.mu.Lock()
321		isGRPCLB = cc.curBalancerName == "grpclb"
322		cc.mu.Unlock()
323		if !isGRPCLB {
324			break
325		}
326		time.Sleep(time.Millisecond)
327	}
328	if !isGRPCLB {
329		t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
330	}
331
332	// Switch balancer back.
333	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
334	for i := 0; i < 5000; i++ {
335		cc.mu.Lock()
336		isPickFirst = cc.curBalancerName == PickFirstBalancerName
337		cc.mu.Unlock()
338		if isPickFirst {
339			break
340		}
341		time.Sleep(time.Millisecond)
342	}
343	if !isPickFirst {
344		t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
345	}
346}
347
348// Test that if the current balancer is roundrobin, after switching to grpclb,
349// when the resolved address doesn't contain grpclb addresses, balancer will be
350// switched back to roundrobin.
351func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
352	r := manual.NewBuilderWithScheme("whatever")
353
354	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
355	if err != nil {
356		t.Fatalf("failed to dial: %v", err)
357	}
358	defer cc.Close()
359
360	sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
361
362	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
363	var isRoundRobin bool
364	for i := 0; i < 5000; i++ {
365		cc.mu.Lock()
366		isRoundRobin = cc.curBalancerName == "round_robin"
367		cc.mu.Unlock()
368		if isRoundRobin {
369			break
370		}
371		time.Sleep(time.Millisecond)
372	}
373	if !isRoundRobin {
374		t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
375	}
376
377	// ClientConn will switch balancer to grpclb when receives an address of
378	// type GRPCLB.
379	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}, ServiceConfig: sc})
380	var isGRPCLB bool
381	for i := 0; i < 5000; i++ {
382		cc.mu.Lock()
383		isGRPCLB = cc.curBalancerName == "grpclb"
384		cc.mu.Unlock()
385		if isGRPCLB {
386			break
387		}
388		time.Sleep(time.Millisecond)
389	}
390	if !isGRPCLB {
391		t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
392	}
393
394	// Switch balancer back.
395	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
396	for i := 0; i < 5000; i++ {
397		cc.mu.Lock()
398		isRoundRobin = cc.curBalancerName == "round_robin"
399		cc.mu.Unlock()
400		if isRoundRobin {
401			break
402		}
403		time.Sleep(time.Millisecond)
404	}
405	if !isRoundRobin {
406		t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
407	}
408}
409
410// Test that if resolved address list contains grpclb, the balancer option in
411// service config won't take effect. But when there's no grpclb address in a new
412// resolved address list, balancer will be switched to the new one.
413func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
414	r := manual.NewBuilderWithScheme("whatever")
415
416	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
417	if err != nil {
418		t.Fatalf("failed to dial: %v", err)
419	}
420	defer cc.Close()
421
422	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
423	var isPickFirst bool
424	for i := 0; i < 5000; i++ {
425		cc.mu.Lock()
426		isPickFirst = cc.curBalancerName == PickFirstBalancerName
427		cc.mu.Unlock()
428		if isPickFirst {
429			break
430		}
431		time.Sleep(time.Millisecond)
432	}
433	if !isPickFirst {
434		t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
435	}
436
437	// ClientConn will switch balancer to grpclb when receives an address of
438	// type GRPCLB.
439	addrs := []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}
440	r.UpdateState(resolver.State{Addresses: addrs})
441	var isGRPCLB bool
442	for i := 0; i < 5000; i++ {
443		cc.mu.Lock()
444		isGRPCLB = cc.curBalancerName == "grpclb"
445		cc.mu.Unlock()
446		if isGRPCLB {
447			break
448		}
449		time.Sleep(time.Millisecond)
450	}
451	if !isGRPCLB {
452		t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
453	}
454
455	sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
456	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
457	var isRoundRobin bool
458	for i := 0; i < 200; i++ {
459		cc.mu.Lock()
460		isRoundRobin = cc.curBalancerName == "round_robin"
461		cc.mu.Unlock()
462		if isRoundRobin {
463			break
464		}
465		time.Sleep(time.Millisecond)
466	}
467	// Balancer should NOT switch to round_robin because resolved list contains
468	// grpclb.
469	if isRoundRobin {
470		t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb")
471	}
472
473	// Switch balancer back.
474	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
475	for i := 0; i < 5000; i++ {
476		cc.mu.Lock()
477		isRoundRobin = cc.curBalancerName == "round_robin"
478		cc.mu.Unlock()
479		if isRoundRobin {
480			break
481		}
482		time.Sleep(time.Millisecond)
483	}
484	if !isRoundRobin {
485		t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
486	}
487}
488
489// Test that when switching to grpclb fails because grpclb is not registered,
490// the fallback balancer will only get backend addresses, not the grpclb server
491// address.
492//
493// The tests sends 3 server addresses (all backends) as resolved addresses, but
494// claim the first one is grpclb server. The all RPCs should all be send to the
495// other addresses, not the first one.
496func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
497	internal.BalancerUnregister("grpclb")
498	defer balancer.Register(&magicalLB{})
499
500	r := manual.NewBuilderWithScheme("whatever")
501
502	const numServers = 3
503	servers, scleanup := startServers(t, numServers, math.MaxInt32)
504	defer scleanup()
505
506	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
507	if err != nil {
508		t.Fatalf("failed to dial: %v", err)
509	}
510	defer cc.Close()
511	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
512	// The default balancer is pickfirst.
513	if err := checkPickFirst(cc, servers[1:]); err != nil {
514		t.Fatalf("check pickfirst returned non-nil error: %v", err)
515	}
516	// Try switching to grpclb by sending servers[0] as grpclb address. It's
517	// expected that servers[0] will be filtered out, so it will not be used by
518	// the balancer.
519	//
520	// If the filtering failed, servers[0] will be used for RPCs and the RPCs
521	// will succeed. The following checks will catch this and fail.
522	addrs := []resolver.Address{
523		{Addr: servers[0].addr, Type: resolver.GRPCLB},
524		{Addr: servers[1].addr}, {Addr: servers[2].addr}}
525	r.UpdateState(resolver.State{Addresses: addrs})
526	// Still check for pickfirst, but only with server[1] and server[2].
527	if err := checkPickFirst(cc, servers[1:]); err != nil {
528		t.Fatalf("check pickfirst returned non-nil error: %v", err)
529	}
530	// Switch to roundrobin, and check against server[1] and server[2].
531	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
532	if err := checkRoundRobin(cc, servers[1:]); err != nil {
533		t.Fatalf("check roundrobin returned non-nil error: %v", err)
534	}
535}
536
537const inlineRemoveSubConnBalancerName = "test-inline-remove-subconn-balancer"
538
539func init() {
540	stub.Register(inlineRemoveSubConnBalancerName, stub.BalancerFuncs{
541		Close: func(data *stub.BalancerData) {
542			data.ClientConn.RemoveSubConn(&acBalancerWrapper{})
543		},
544	})
545}
546
547// Test that when switching to balancers, the old balancer calls RemoveSubConn
548// in Close.
549//
550// This test is to make sure this close doesn't cause a deadlock.
551func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) {
552	r := manual.NewBuilderWithScheme("whatever")
553	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
554	if err != nil {
555		t.Fatalf("failed to dial: %v", err)
556	}
557	defer cc.Close()
558	cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, inlineRemoveSubConnBalancerName))}, nil)
559	// This service config update will switch balancer from
560	// "test-inline-remove-subconn-balancer" to "pick_first". The test balancer
561	// will be closed, which will call cc.RemoveSubConn() inline (this
562	// RemoveSubConn is not required by the API, but some balancers might do
563	// it).
564	//
565	// This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a
566	// deadlock (e.g. trying to grab a mutex while it's already locked).
567	//
568	// Do it in a goroutine so this test will fail with a helpful message
569	// (though the goroutine will still leak).
570	done := make(chan struct{})
571	go func() {
572		cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`)}, nil)
573		close(done)
574	}()
575	select {
576	case <-time.After(defaultTestTimeout):
577		t.Fatalf("timeout waiting for updateResolverState to finish")
578	case <-done:
579	}
580}
581
582func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
583	scpr := r.CC.ParseServiceConfig(s)
584	if scpr.Err != nil {
585		panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
586	}
587	return scpr
588}
589