1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package edsbalancer
18
19import (
20	"context"
21	"reflect"
22	"testing"
23
24	"google.golang.org/grpc/balancer"
25	"google.golang.org/grpc/balancer/roundrobin"
26	"google.golang.org/grpc/balancer/xds/internal"
27	orcapb "google.golang.org/grpc/balancer/xds/internal/proto/udpa/data/orca/v1/orca_load_report"
28	"google.golang.org/grpc/connectivity"
29	"google.golang.org/grpc/resolver"
30)
31
32var (
33	rrBuilder        = balancer.Get(roundrobin.Name)
34	testBalancerIDs  = []internal.Locality{{Region: "b1"}, {Region: "b2"}, {Region: "b3"}}
35	testBackendAddrs = []resolver.Address{{Addr: "1.1.1.1:1"}, {Addr: "2.2.2.2:2"}, {Addr: "3.3.3.3:3"}, {Addr: "4.4.4.4:4"}}
36)
37
38// 1 balancer, 1 backend -> 2 backends -> 1 backend.
39func TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
40	cc := newTestClientConn(t)
41	bg := newBalancerGroup(cc, nil)
42
43	// Add one balancer to group.
44	bg.add(testBalancerIDs[0], 1, rrBuilder)
45	// Send one resolved address.
46	bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
47
48	// Send subconn state change.
49	sc1 := <-cc.newSubConnCh
50	bg.handleSubConnStateChange(sc1, connectivity.Connecting)
51	bg.handleSubConnStateChange(sc1, connectivity.Ready)
52
53	// Test pick with one backend.
54	p1 := <-cc.newPickerCh
55	for i := 0; i < 5; i++ {
56		gotSC, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
57		if !reflect.DeepEqual(gotSC, sc1) {
58			t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc1)
59		}
60	}
61
62	// Send two addresses.
63	bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
64	// Expect one new subconn, send state update.
65	sc2 := <-cc.newSubConnCh
66	bg.handleSubConnStateChange(sc2, connectivity.Connecting)
67	bg.handleSubConnStateChange(sc2, connectivity.Ready)
68
69	// Test roundrobin pick.
70	p2 := <-cc.newPickerCh
71	want := []balancer.SubConn{sc1, sc2}
72	if err := isRoundRobin(want, func() balancer.SubConn {
73		sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{})
74		return sc
75	}); err != nil {
76		t.Fatalf("want %v, got %v", want, err)
77	}
78
79	// Remove the first address.
80	bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[1:2])
81	scToRemove := <-cc.removeSubConnCh
82	if !reflect.DeepEqual(scToRemove, sc1) {
83		t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
84	}
85	bg.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
86
87	// Test pick with only the second subconn.
88	p3 := <-cc.newPickerCh
89	for i := 0; i < 5; i++ {
90		gotSC, _, _ := p3.Pick(context.Background(), balancer.PickOptions{})
91		if !reflect.DeepEqual(gotSC, sc2) {
92			t.Fatalf("picker.Pick, got %v, want %v", gotSC, sc2)
93		}
94	}
95}
96
97// 2 balancers, each with 1 backend.
98func TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
99	cc := newTestClientConn(t)
100	bg := newBalancerGroup(cc, nil)
101
102	// Add two balancers to group and send one resolved address to both
103	// balancers.
104	bg.add(testBalancerIDs[0], 1, rrBuilder)
105	bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
106	sc1 := <-cc.newSubConnCh
107
108	bg.add(testBalancerIDs[1], 1, rrBuilder)
109	bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[0:1])
110	sc2 := <-cc.newSubConnCh
111
112	// Send state changes for both subconns.
113	bg.handleSubConnStateChange(sc1, connectivity.Connecting)
114	bg.handleSubConnStateChange(sc1, connectivity.Ready)
115	bg.handleSubConnStateChange(sc2, connectivity.Connecting)
116	bg.handleSubConnStateChange(sc2, connectivity.Ready)
117
118	// Test roundrobin on the last picker.
119	p1 := <-cc.newPickerCh
120	want := []balancer.SubConn{sc1, sc2}
121	if err := isRoundRobin(want, func() balancer.SubConn {
122		sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
123		return sc
124	}); err != nil {
125		t.Fatalf("want %v, got %v", want, err)
126	}
127}
128
129// 2 balancers, each with more than 1 backends.
130func TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
131	cc := newTestClientConn(t)
132	bg := newBalancerGroup(cc, nil)
133
134	// Add two balancers to group and send one resolved address to both
135	// balancers.
136	bg.add(testBalancerIDs[0], 1, rrBuilder)
137	bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
138	sc1 := <-cc.newSubConnCh
139	sc2 := <-cc.newSubConnCh
140
141	bg.add(testBalancerIDs[1], 1, rrBuilder)
142	bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
143	sc3 := <-cc.newSubConnCh
144	sc4 := <-cc.newSubConnCh
145
146	// Send state changes for both subconns.
147	bg.handleSubConnStateChange(sc1, connectivity.Connecting)
148	bg.handleSubConnStateChange(sc1, connectivity.Ready)
149	bg.handleSubConnStateChange(sc2, connectivity.Connecting)
150	bg.handleSubConnStateChange(sc2, connectivity.Ready)
151	bg.handleSubConnStateChange(sc3, connectivity.Connecting)
152	bg.handleSubConnStateChange(sc3, connectivity.Ready)
153	bg.handleSubConnStateChange(sc4, connectivity.Connecting)
154	bg.handleSubConnStateChange(sc4, connectivity.Ready)
155
156	// Test roundrobin on the last picker.
157	p1 := <-cc.newPickerCh
158	want := []balancer.SubConn{sc1, sc2, sc3, sc4}
159	if err := isRoundRobin(want, func() balancer.SubConn {
160		sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
161		return sc
162	}); err != nil {
163		t.Fatalf("want %v, got %v", want, err)
164	}
165
166	// Turn sc2's connection down, should be RR between balancers.
167	bg.handleSubConnStateChange(sc2, connectivity.TransientFailure)
168	p2 := <-cc.newPickerCh
169	// Expect two sc1's in the result, because balancer1 will be picked twice,
170	// but there's only one sc in it.
171	want = []balancer.SubConn{sc1, sc1, sc3, sc4}
172	if err := isRoundRobin(want, func() balancer.SubConn {
173		sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{})
174		return sc
175	}); err != nil {
176		t.Fatalf("want %v, got %v", want, err)
177	}
178
179	// Remove sc3's addresses.
180	bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[3:4])
181	scToRemove := <-cc.removeSubConnCh
182	if !reflect.DeepEqual(scToRemove, sc3) {
183		t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scToRemove)
184	}
185	bg.handleSubConnStateChange(scToRemove, connectivity.Shutdown)
186	p3 := <-cc.newPickerCh
187	want = []balancer.SubConn{sc1, sc4}
188	if err := isRoundRobin(want, func() balancer.SubConn {
189		sc, _, _ := p3.Pick(context.Background(), balancer.PickOptions{})
190		return sc
191	}); err != nil {
192		t.Fatalf("want %v, got %v", want, err)
193	}
194
195	// Turn sc1's connection down.
196	bg.handleSubConnStateChange(sc1, connectivity.TransientFailure)
197	p4 := <-cc.newPickerCh
198	want = []balancer.SubConn{sc4}
199	if err := isRoundRobin(want, func() balancer.SubConn {
200		sc, _, _ := p4.Pick(context.Background(), balancer.PickOptions{})
201		return sc
202	}); err != nil {
203		t.Fatalf("want %v, got %v", want, err)
204	}
205
206	// Turn last connection to connecting.
207	bg.handleSubConnStateChange(sc4, connectivity.Connecting)
208	p5 := <-cc.newPickerCh
209	for i := 0; i < 5; i++ {
210		if _, _, err := p5.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrNoSubConnAvailable {
211			t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err)
212		}
213	}
214
215	// Turn all connections down.
216	bg.handleSubConnStateChange(sc4, connectivity.TransientFailure)
217	p6 := <-cc.newPickerCh
218	for i := 0; i < 5; i++ {
219		if _, _, err := p6.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrTransientFailure {
220			t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
221		}
222	}
223}
224
225// 2 balancers with different weights.
226func TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
227	cc := newTestClientConn(t)
228	bg := newBalancerGroup(cc, nil)
229
230	// Add two balancers to group and send two resolved addresses to both
231	// balancers.
232	bg.add(testBalancerIDs[0], 2, rrBuilder)
233	bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
234	sc1 := <-cc.newSubConnCh
235	sc2 := <-cc.newSubConnCh
236
237	bg.add(testBalancerIDs[1], 1, rrBuilder)
238	bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
239	sc3 := <-cc.newSubConnCh
240	sc4 := <-cc.newSubConnCh
241
242	// Send state changes for both subconns.
243	bg.handleSubConnStateChange(sc1, connectivity.Connecting)
244	bg.handleSubConnStateChange(sc1, connectivity.Ready)
245	bg.handleSubConnStateChange(sc2, connectivity.Connecting)
246	bg.handleSubConnStateChange(sc2, connectivity.Ready)
247	bg.handleSubConnStateChange(sc3, connectivity.Connecting)
248	bg.handleSubConnStateChange(sc3, connectivity.Ready)
249	bg.handleSubConnStateChange(sc4, connectivity.Connecting)
250	bg.handleSubConnStateChange(sc4, connectivity.Ready)
251
252	// Test roundrobin on the last picker.
253	p1 := <-cc.newPickerCh
254	want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
255	if err := isRoundRobin(want, func() balancer.SubConn {
256		sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
257		return sc
258	}); err != nil {
259		t.Fatalf("want %v, got %v", want, err)
260	}
261}
262
263// totally 3 balancers, add/remove balancer.
264func TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
265	cc := newTestClientConn(t)
266	bg := newBalancerGroup(cc, nil)
267
268	// Add three balancers to group and send one resolved address to both
269	// balancers.
270	bg.add(testBalancerIDs[0], 1, rrBuilder)
271	bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
272	sc1 := <-cc.newSubConnCh
273
274	bg.add(testBalancerIDs[1], 1, rrBuilder)
275	bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[1:2])
276	sc2 := <-cc.newSubConnCh
277
278	bg.add(testBalancerIDs[2], 1, rrBuilder)
279	bg.handleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:2])
280	sc3 := <-cc.newSubConnCh
281
282	// Send state changes for both subconns.
283	bg.handleSubConnStateChange(sc1, connectivity.Connecting)
284	bg.handleSubConnStateChange(sc1, connectivity.Ready)
285	bg.handleSubConnStateChange(sc2, connectivity.Connecting)
286	bg.handleSubConnStateChange(sc2, connectivity.Ready)
287	bg.handleSubConnStateChange(sc3, connectivity.Connecting)
288	bg.handleSubConnStateChange(sc3, connectivity.Ready)
289
290	p1 := <-cc.newPickerCh
291	want := []balancer.SubConn{sc1, sc2, sc3}
292	if err := isRoundRobin(want, func() balancer.SubConn {
293		sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
294		return sc
295	}); err != nil {
296		t.Fatalf("want %v, got %v", want, err)
297	}
298
299	// Remove the second balancer, while the others two are ready.
300	bg.remove(testBalancerIDs[1])
301	scToRemove := <-cc.removeSubConnCh
302	if !reflect.DeepEqual(scToRemove, sc2) {
303		t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
304	}
305	p2 := <-cc.newPickerCh
306	want = []balancer.SubConn{sc1, sc3}
307	if err := isRoundRobin(want, func() balancer.SubConn {
308		sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{})
309		return sc
310	}); err != nil {
311		t.Fatalf("want %v, got %v", want, err)
312	}
313
314	// move balancer 3 into transient failure.
315	bg.handleSubConnStateChange(sc3, connectivity.TransientFailure)
316	// Remove the first balancer, while the third is transient failure.
317	bg.remove(testBalancerIDs[0])
318	scToRemove = <-cc.removeSubConnCh
319	if !reflect.DeepEqual(scToRemove, sc1) {
320		t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
321	}
322	p3 := <-cc.newPickerCh
323	for i := 0; i < 5; i++ {
324		if _, _, err := p3.Pick(context.Background(), balancer.PickOptions{}); err != balancer.ErrTransientFailure {
325			t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
326		}
327	}
328}
329
330// 2 balancers, change balancer weight.
331func TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
332	cc := newTestClientConn(t)
333	bg := newBalancerGroup(cc, nil)
334
335	// Add two balancers to group and send two resolved addresses to both
336	// balancers.
337	bg.add(testBalancerIDs[0], 2, rrBuilder)
338	bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
339	sc1 := <-cc.newSubConnCh
340	sc2 := <-cc.newSubConnCh
341
342	bg.add(testBalancerIDs[1], 1, rrBuilder)
343	bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
344	sc3 := <-cc.newSubConnCh
345	sc4 := <-cc.newSubConnCh
346
347	// Send state changes for both subconns.
348	bg.handleSubConnStateChange(sc1, connectivity.Connecting)
349	bg.handleSubConnStateChange(sc1, connectivity.Ready)
350	bg.handleSubConnStateChange(sc2, connectivity.Connecting)
351	bg.handleSubConnStateChange(sc2, connectivity.Ready)
352	bg.handleSubConnStateChange(sc3, connectivity.Connecting)
353	bg.handleSubConnStateChange(sc3, connectivity.Ready)
354	bg.handleSubConnStateChange(sc4, connectivity.Connecting)
355	bg.handleSubConnStateChange(sc4, connectivity.Ready)
356
357	// Test roundrobin on the last picker.
358	p1 := <-cc.newPickerCh
359	want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
360	if err := isRoundRobin(want, func() balancer.SubConn {
361		sc, _, _ := p1.Pick(context.Background(), balancer.PickOptions{})
362		return sc
363	}); err != nil {
364		t.Fatalf("want %v, got %v", want, err)
365	}
366
367	bg.changeWeight(testBalancerIDs[0], 3)
368
369	// Test roundrobin with new weight.
370	p2 := <-cc.newPickerCh
371	want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4}
372	if err := isRoundRobin(want, func() balancer.SubConn {
373		sc, _, _ := p2.Pick(context.Background(), balancer.PickOptions{})
374		return sc
375	}); err != nil {
376		t.Fatalf("want %v, got %v", want, err)
377	}
378}
379
380func TestBalancerGroup_LoadReport(t *testing.T) {
381	testLoadStore := newTestLoadStore()
382
383	cc := newTestClientConn(t)
384	bg := newBalancerGroup(cc, testLoadStore)
385
386	backendToBalancerID := make(map[balancer.SubConn]internal.Locality)
387
388	// Add two balancers to group and send two resolved addresses to both
389	// balancers.
390	bg.add(testBalancerIDs[0], 2, rrBuilder)
391	bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
392	sc1 := <-cc.newSubConnCh
393	sc2 := <-cc.newSubConnCh
394	backendToBalancerID[sc1] = testBalancerIDs[0]
395	backendToBalancerID[sc2] = testBalancerIDs[0]
396
397	bg.add(testBalancerIDs[1], 1, rrBuilder)
398	bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
399	sc3 := <-cc.newSubConnCh
400	sc4 := <-cc.newSubConnCh
401	backendToBalancerID[sc3] = testBalancerIDs[1]
402	backendToBalancerID[sc4] = testBalancerIDs[1]
403
404	// Send state changes for both subconns.
405	bg.handleSubConnStateChange(sc1, connectivity.Connecting)
406	bg.handleSubConnStateChange(sc1, connectivity.Ready)
407	bg.handleSubConnStateChange(sc2, connectivity.Connecting)
408	bg.handleSubConnStateChange(sc2, connectivity.Ready)
409	bg.handleSubConnStateChange(sc3, connectivity.Connecting)
410	bg.handleSubConnStateChange(sc3, connectivity.Ready)
411	bg.handleSubConnStateChange(sc4, connectivity.Connecting)
412	bg.handleSubConnStateChange(sc4, connectivity.Ready)
413
414	// Test roundrobin on the last picker.
415	p1 := <-cc.newPickerCh
416	var (
417		wantStart []internal.Locality
418		wantEnd   []internal.Locality
419		wantCost  []testServerLoad
420	)
421	for i := 0; i < 10; i++ {
422		sc, done, _ := p1.Pick(context.Background(), balancer.PickOptions{})
423		locality := backendToBalancerID[sc]
424		wantStart = append(wantStart, locality)
425		if done != nil && sc != sc1 {
426			done(balancer.DoneInfo{
427				ServerLoad: &orcapb.OrcaLoadReport{
428					CpuUtilization:           10,
429					MemUtilization:           5,
430					RequestCostOrUtilization: map[string]float64{"pi": 3.14},
431				},
432			})
433			wantEnd = append(wantEnd, locality)
434			wantCost = append(wantCost,
435				testServerLoad{name: serverLoadCPUName, d: 10},
436				testServerLoad{name: serverLoadMemoryName, d: 5},
437				testServerLoad{name: "pi", d: 3.14})
438		}
439	}
440
441	if !reflect.DeepEqual(testLoadStore.callsStarted, wantStart) {
442		t.Fatalf("want started: %v, got: %v", testLoadStore.callsStarted, wantStart)
443	}
444	if !reflect.DeepEqual(testLoadStore.callsEnded, wantEnd) {
445		t.Fatalf("want ended: %v, got: %v", testLoadStore.callsEnded, wantEnd)
446	}
447	if !reflect.DeepEqual(testLoadStore.callsCost, wantCost) {
448		t.Fatalf("want cost: %v, got: %v", testLoadStore.callsCost, wantCost)
449	}
450}
451