1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package weightedtarget
20
21import (
22	"encoding/json"
23	"fmt"
24	"testing"
25	"time"
26
27	"github.com/google/go-cmp/cmp"
28	"google.golang.org/grpc/attributes"
29	"google.golang.org/grpc/balancer"
30	"google.golang.org/grpc/balancer/roundrobin"
31	"google.golang.org/grpc/connectivity"
32	"google.golang.org/grpc/internal/hierarchy"
33	"google.golang.org/grpc/resolver"
34	"google.golang.org/grpc/serviceconfig"
35	"google.golang.org/grpc/xds/internal/balancer/balancergroup"
36	"google.golang.org/grpc/xds/internal/testutils"
37)
38
39type testConfigBalancerBuilder struct {
40	balancer.Builder
41}
42
43func newTestConfigBalancerBuilder() *testConfigBalancerBuilder {
44	return &testConfigBalancerBuilder{
45		Builder: balancer.Get(roundrobin.Name),
46	}
47}
48
49func (t *testConfigBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
50	rr := t.Builder.Build(cc, opts)
51	return &testConfigBalancer{
52		Balancer: rr,
53	}
54}
55
56const testConfigBalancerName = "test_config_balancer"
57
58func (t *testConfigBalancerBuilder) Name() string {
59	return testConfigBalancerName
60}
61
62type stringBalancerConfig struct {
63	serviceconfig.LoadBalancingConfig
64	s string
65}
66
67func (t *testConfigBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
68	// Return string without quotes.
69	return stringBalancerConfig{s: string(c[1 : len(c)-1])}, nil
70}
71
72// testConfigBalancer is a roundrobin balancer, but it takes the balancer config
73// string and append it to the backend addresses.
74type testConfigBalancer struct {
75	balancer.Balancer
76}
77
78func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
79	c, ok := s.BalancerConfig.(stringBalancerConfig)
80	if !ok {
81		return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig)
82	}
83	oneMoreAddr := resolver.Address{Addr: c.s}
84	s.BalancerConfig = nil
85	s.ResolverState.Addresses = append(s.ResolverState.Addresses, oneMoreAddr)
86	return b.Balancer.UpdateClientConnState(s)
87}
88
89func (b *testConfigBalancer) Close() {
90	b.Balancer.Close()
91}
92
93var (
94	wtbBuilder          balancer.Builder
95	wtbParser           balancer.ConfigParser
96	testBackendAddrStrs []string
97)
98
99const testBackendAddrsCount = 12
100
101func init() {
102	balancer.Register(newTestConfigBalancerBuilder())
103	for i := 0; i < testBackendAddrsCount; i++ {
104		testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
105	}
106	wtbBuilder = balancer.Get(weightedTargetName)
107	wtbParser = wtbBuilder.(balancer.ConfigParser)
108
109	balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
110}
111
112// TestWeightedTarget covers the cases that a sub-balancer is added and a
113// sub-balancer is removed. It verifies that the addresses and balancer configs
114// are forwarded to the right sub-balancer.
115//
116// This test is intended to test the glue code in weighted_target. Most of the
117// functionality tests are covered by the balancer group tests.
118func TestWeightedTarget(t *testing.T) {
119	cc := testutils.NewTestClientConn(t)
120	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
121
122	// Start with "cluster_1: round_robin".
123	config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"round_robin":""}]}}}`))
124	if err != nil {
125		t.Fatalf("failed to parse balancer config: %v", err)
126	}
127
128	// Send the config, and an address with hierarchy path ["cluster_1"].
129	wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil}
130	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
131		ResolverState: resolver.State{Addresses: []resolver.Address{
132			hierarchy.Set(wantAddr1, []string{"cluster_1"}),
133		}},
134		BalancerConfig: config1,
135	}); err != nil {
136		t.Fatalf("failed to update ClientConn state: %v", err)
137	}
138
139	// Verify that a subconn is created with the address, and the hierarchy path
140	// in the address is cleared.
141	addr1 := <-cc.NewSubConnAddrsCh
142	if want := []resolver.Address{
143		hierarchy.Set(wantAddr1, []string{}),
144	}; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) {
145		t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{})))
146	}
147
148	// Send subconn state change.
149	sc1 := <-cc.NewSubConnCh
150	wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
151	wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
152
153	// Test pick with one backend.
154	p1 := <-cc.NewPickerCh
155	for i := 0; i < 5; i++ {
156		gotSCSt, _ := p1.Pick(balancer.PickInfo{})
157		if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
158			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
159		}
160	}
161
162	// Remove cluster_1, and add "cluster_2: test_config_balancer".
163	wantAddr3Str := testBackendAddrStrs[2]
164	config2, err := wtbParser.ParseConfig([]byte(
165		fmt.Sprintf(`{"targets":{"cluster_2":{"weight":1,"childPolicy":[{%q:%q}]}}}`, testConfigBalancerName, wantAddr3Str),
166	))
167	if err != nil {
168		t.Fatalf("failed to parse balancer config: %v", err)
169	}
170
171	// Send the config, and one address with hierarchy path "cluster_2".
172	wantAddr2 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil}
173	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
174		ResolverState: resolver.State{Addresses: []resolver.Address{
175			hierarchy.Set(wantAddr2, []string{"cluster_2"}),
176		}},
177		BalancerConfig: config2,
178	}); err != nil {
179		t.Fatalf("failed to update ClientConn state: %v", err)
180	}
181
182	// Expect the address sent in the address list. The hierarchy path should be
183	// cleared.
184	addr2 := <-cc.NewSubConnAddrsCh
185	if want := []resolver.Address{
186		hierarchy.Set(wantAddr2, []string{}),
187	}; !cmp.Equal(addr2, want, cmp.AllowUnexported(attributes.Attributes{})) {
188		t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr2, want, cmp.AllowUnexported(attributes.Attributes{})))
189	}
190	// Expect the other address sent as balancer config. This address doesn't
191	// have hierarchy path.
192	wantAddr3 := resolver.Address{Addr: wantAddr3Str, Attributes: nil}
193	addr3 := <-cc.NewSubConnAddrsCh
194	if want := []resolver.Address{wantAddr3}; !cmp.Equal(addr3, want, cmp.AllowUnexported(attributes.Attributes{})) {
195		t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr3, want, cmp.AllowUnexported(attributes.Attributes{})))
196	}
197
198	// The subconn for cluster_1 should be removed.
199	scToRemove := <-cc.RemoveSubConnCh
200	if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
201		t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
202	}
203	wtb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
204
205	sc2 := <-cc.NewSubConnCh
206	wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
207	wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
208	sc3 := <-cc.NewSubConnCh
209	wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
210	wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
211
212	// Test roundrobin pick with backends in cluster_2.
213	p2 := <-cc.NewPickerCh
214	want := []balancer.SubConn{sc2, sc3}
215	if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
216		t.Fatalf("want %v, got %v", want, err)
217	}
218}
219
220func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
221	return func() balancer.SubConn {
222		scst, _ := p.Pick(balancer.PickInfo{})
223		return scst.SubConn
224	}
225}
226