1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package roundrobin_test
20
21import (
22	"context"
23	"fmt"
24	"net"
25	"strings"
26	"sync"
27	"testing"
28	"time"
29
30	"google.golang.org/grpc"
31	"google.golang.org/grpc/balancer/roundrobin"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/connectivity"
34	"google.golang.org/grpc/internal/grpctest"
35	"google.golang.org/grpc/peer"
36	"google.golang.org/grpc/resolver"
37	"google.golang.org/grpc/resolver/manual"
38	"google.golang.org/grpc/status"
39	testpb "google.golang.org/grpc/test/grpc_testing"
40)
41
42type s struct {
43	grpctest.Tester
44}
45
46func Test(t *testing.T) {
47	grpctest.RunSubTests(t, s{})
48}
49
50type testServer struct {
51	testpb.UnimplementedTestServiceServer
52}
53
54func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
55	return &testpb.Empty{}, nil
56}
57
58func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
59	return nil
60}
61
62type test struct {
63	servers   []*grpc.Server
64	addresses []string
65}
66
67func (t *test) cleanup() {
68	for _, s := range t.servers {
69		s.Stop()
70	}
71}
72
73func startTestServers(count int) (_ *test, err error) {
74	t := &test{}
75
76	defer func() {
77		if err != nil {
78			t.cleanup()
79		}
80	}()
81	for i := 0; i < count; i++ {
82		lis, err := net.Listen("tcp", "localhost:0")
83		if err != nil {
84			return nil, fmt.Errorf("failed to listen %v", err)
85		}
86
87		s := grpc.NewServer()
88		testpb.RegisterTestServiceServer(s, &testServer{})
89		t.servers = append(t.servers, s)
90		t.addresses = append(t.addresses, lis.Addr().String())
91
92		go func(s *grpc.Server, l net.Listener) {
93			s.Serve(l)
94		}(s, lis)
95	}
96
97	return t, nil
98}
99
100func (s) TestOneBackend(t *testing.T) {
101	r, cleanup := manual.GenerateAndRegisterManualResolver()
102	defer cleanup()
103
104	test, err := startTestServers(1)
105	if err != nil {
106		t.Fatalf("failed to start servers: %v", err)
107	}
108	defer test.cleanup()
109
110	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
111	if err != nil {
112		t.Fatalf("failed to dial: %v", err)
113	}
114	defer cc.Close()
115	testc := testpb.NewTestServiceClient(cc)
116	// The first RPC should fail because there's no address.
117	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
118	defer cancel()
119	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
120		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
121	}
122
123	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
124	// The second RPC should succeed.
125	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
126		t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
127	}
128}
129
130func (s) TestBackendsRoundRobin(t *testing.T) {
131	r, cleanup := manual.GenerateAndRegisterManualResolver()
132	defer cleanup()
133
134	backendCount := 5
135	test, err := startTestServers(backendCount)
136	if err != nil {
137		t.Fatalf("failed to start servers: %v", err)
138	}
139	defer test.cleanup()
140
141	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
142	if err != nil {
143		t.Fatalf("failed to dial: %v", err)
144	}
145	defer cc.Close()
146	testc := testpb.NewTestServiceClient(cc)
147	// The first RPC should fail because there's no address.
148	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
149	defer cancel()
150	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
151		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
152	}
153
154	var resolvedAddrs []resolver.Address
155	for i := 0; i < backendCount; i++ {
156		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
157	}
158
159	r.UpdateState(resolver.State{Addresses: resolvedAddrs})
160	var p peer.Peer
161	// Make sure connections to all servers are up.
162	for si := 0; si < backendCount; si++ {
163		var connected bool
164		for i := 0; i < 1000; i++ {
165			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
166				t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
167			}
168			if p.Addr.String() == test.addresses[si] {
169				connected = true
170				break
171			}
172			time.Sleep(time.Millisecond)
173		}
174		if !connected {
175			t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
176		}
177	}
178
179	for i := 0; i < 3*backendCount; i++ {
180		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
181			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
182		}
183		if p.Addr.String() != test.addresses[i%backendCount] {
184			t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
185		}
186	}
187}
188
189func (s) TestAddressesRemoved(t *testing.T) {
190	r, cleanup := manual.GenerateAndRegisterManualResolver()
191	defer cleanup()
192
193	test, err := startTestServers(1)
194	if err != nil {
195		t.Fatalf("failed to start servers: %v", err)
196	}
197	defer test.cleanup()
198
199	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
200	if err != nil {
201		t.Fatalf("failed to dial: %v", err)
202	}
203	defer cc.Close()
204	testc := testpb.NewTestServiceClient(cc)
205	// The first RPC should fail because there's no address.
206	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
207	defer cancel()
208	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
209		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
210	}
211
212	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
213	// The second RPC should succeed.
214	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
215		t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
216	}
217
218	r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
219	// Removing addresses results in an error reported to the clientconn, but
220	// the existing connections remain.  RPCs should still succeed.
221	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
222	defer cancel()
223	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
224		t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
225	}
226
227	// Stop the server to bring the channel state into transient failure.
228	test.cleanup()
229	// Wait for not-ready.
230	for src := cc.GetState(); src == connectivity.Ready; src = cc.GetState() {
231		if !cc.WaitForStateChange(ctx, src) {
232			t.Fatalf("timed out waiting for state change.  got %v; want !%v", src, connectivity.Ready)
233		}
234	}
235	// Report an empty server list again; because the state is not ready, the
236	// empty address list error should surface to the user.
237	r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
238
239	const msgWant = "produced zero addresses"
240	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) {
241		t.Fatalf("EmptyCall() = _, %v, want _, Contains(Message(), %q)", err, msgWant)
242	}
243
244}
245
246func (s) TestCloseWithPendingRPC(t *testing.T) {
247	r, cleanup := manual.GenerateAndRegisterManualResolver()
248	defer cleanup()
249
250	test, err := startTestServers(1)
251	if err != nil {
252		t.Fatalf("failed to start servers: %v", err)
253	}
254	defer test.cleanup()
255
256	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
257	if err != nil {
258		t.Fatalf("failed to dial: %v", err)
259	}
260	testc := testpb.NewTestServiceClient(cc)
261
262	var wg sync.WaitGroup
263	for i := 0; i < 3; i++ {
264		wg.Add(1)
265		go func() {
266			defer wg.Done()
267			// This RPC blocks until cc is closed.
268			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
269			if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
270				t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
271			}
272			cancel()
273		}()
274	}
275	cc.Close()
276	wg.Wait()
277}
278
279func (s) TestNewAddressWhileBlocking(t *testing.T) {
280	r, cleanup := manual.GenerateAndRegisterManualResolver()
281	defer cleanup()
282
283	test, err := startTestServers(1)
284	if err != nil {
285		t.Fatalf("failed to start servers: %v", err)
286	}
287	defer test.cleanup()
288
289	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
290	if err != nil {
291		t.Fatalf("failed to dial: %v", err)
292	}
293	defer cc.Close()
294	testc := testpb.NewTestServiceClient(cc)
295	// The first RPC should fail because there's no address.
296	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
297	defer cancel()
298	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
299		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
300	}
301
302	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
303	// The second RPC should succeed.
304	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
305	defer cancel()
306	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
307		t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
308	}
309
310	r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
311
312	var wg sync.WaitGroup
313	for i := 0; i < 3; i++ {
314		wg.Add(1)
315		go func() {
316			defer wg.Done()
317			// This RPC blocks until NewAddress is called.
318			testc.EmptyCall(context.Background(), &testpb.Empty{})
319		}()
320	}
321	time.Sleep(50 * time.Millisecond)
322	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
323	wg.Wait()
324}
325
// TestOneServerDown resolves three backends and verifies that RPCs round-robin
// across all of them. It then stops the last backend and verifies that RPCs
// round-robin across the two that remain.
func (s) TestOneServerDown(t *testing.T) {
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	backendCount := 3
	test, err := startTestServers(backendCount)
	if err != nil {
		t.Fatalf("failed to start servers: %v", err)
	}
	defer test.cleanup()

	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	testc := testpb.NewTestServiceClient(cc)
	// The first RPC should fail because there's no address.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
	}

	var resolvedAddrs []resolver.Address
	for i := 0; i < backendCount; i++ {
		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
	}

	r.UpdateState(resolver.State{Addresses: resolvedAddrs})
	var p peer.Peer
	// Make sure connections to all servers are up: for each backend, retry
	// up to 1000 times (1ms apart) until an RPC lands on it.
	for si := 0; si < backendCount; si++ {
		var connected bool
		for i := 0; i < 1000; i++ {
			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
				t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
			}
			if p.Addr.String() == test.addresses[si] {
				connected = true
				break
			}
			time.Sleep(time.Millisecond)
		}
		if !connected {
			t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
		}
	}

	// With all backends up, picks are expected to cycle in address order.
	for i := 0; i < 3*backendCount; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
		}
		if p.Addr.String() != test.addresses[i%backendCount] {
			t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
		}
	}

	// Stop one server, RPCs should roundrobin among the remaining servers.
	// After the decrement, backendCount is both the number of live servers
	// and the index of the stopped one.
	backendCount--
	test.servers[backendCount].Stop()
	// Loop until see server[backendCount-1] twice without seeing server[backendCount].
	// An RPC error or a pick of the stopped server resets the count.
	var targetSeen int
	for i := 0; i < 1000; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			targetSeen = 0
			t.Logf("EmptyCall() = _, %v, want _, <nil>", err)
			// Due to a race, this RPC could possibly get the connection that
			// was closing, and this RPC may fail. Keep trying when this
			// happens.
			continue
		}
		switch p.Addr.String() {
		case test.addresses[backendCount-1]:
			targetSeen++
		case test.addresses[backendCount]:
			// Reset targetSeen if peer is server[backendCount].
			targetSeen = 0
		}
		// Break right after a pick of server[backendCount-1], the last live
		// server, so the strict round-robin check below can expect to start
		// at server[0] and is not flaky.
		if targetSeen >= 2 {
			break
		}
	}
	if targetSeen != 2 {
		t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
	}
	// NOTE(review): this final pass uses t.Errorf while the earlier passes use
	// t.Fatalf; presumably this is meant to report every mismatch — confirm.
	for i := 0; i < 3*backendCount; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
		}
		if p.Addr.String() != test.addresses[i%backendCount] {
			t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
		}
	}
}
422
423func (s) TestAllServersDown(t *testing.T) {
424	r, cleanup := manual.GenerateAndRegisterManualResolver()
425	defer cleanup()
426
427	backendCount := 3
428	test, err := startTestServers(backendCount)
429	if err != nil {
430		t.Fatalf("failed to start servers: %v", err)
431	}
432	defer test.cleanup()
433
434	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
435	if err != nil {
436		t.Fatalf("failed to dial: %v", err)
437	}
438	defer cc.Close()
439	testc := testpb.NewTestServiceClient(cc)
440	// The first RPC should fail because there's no address.
441	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
442	defer cancel()
443	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
444		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
445	}
446
447	var resolvedAddrs []resolver.Address
448	for i := 0; i < backendCount; i++ {
449		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
450	}
451
452	r.UpdateState(resolver.State{Addresses: resolvedAddrs})
453	var p peer.Peer
454	// Make sure connections to all servers are up.
455	for si := 0; si < backendCount; si++ {
456		var connected bool
457		for i := 0; i < 1000; i++ {
458			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
459				t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
460			}
461			if p.Addr.String() == test.addresses[si] {
462				connected = true
463				break
464			}
465			time.Sleep(time.Millisecond)
466		}
467		if !connected {
468			t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
469		}
470	}
471
472	for i := 0; i < 3*backendCount; i++ {
473		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
474			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
475		}
476		if p.Addr.String() != test.addresses[i%backendCount] {
477			t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
478		}
479	}
480
481	// All servers are stopped, failfast RPC should fail with unavailable.
482	for i := 0; i < backendCount; i++ {
483		test.servers[i].Stop()
484	}
485	time.Sleep(100 * time.Millisecond)
486	for i := 0; i < 1000; i++ {
487		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable {
488			return
489		}
490		time.Sleep(time.Millisecond)
491	}
492	t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
493}
494