1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package roundrobin_test
20
21import (
22	"context"
23	"fmt"
24	"net"
25	"sync"
26	"testing"
27	"time"
28
29	"google.golang.org/grpc"
30	"google.golang.org/grpc/balancer/roundrobin"
31	"google.golang.org/grpc/codes"
32	_ "google.golang.org/grpc/grpclog/glogger"
33	"google.golang.org/grpc/internal/leakcheck"
34	"google.golang.org/grpc/peer"
35	"google.golang.org/grpc/resolver"
36	"google.golang.org/grpc/resolver/manual"
37	"google.golang.org/grpc/status"
38	testpb "google.golang.org/grpc/test/grpc_testing"
39)
40
41type testServer struct {
42	testpb.TestServiceServer
43}
44
45func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
46	return &testpb.Empty{}, nil
47}
48
49func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
50	return nil
51}
52
53type test struct {
54	servers   []*grpc.Server
55	addresses []string
56}
57
58func (t *test) cleanup() {
59	for _, s := range t.servers {
60		s.Stop()
61	}
62}
63
64func startTestServers(count int) (_ *test, err error) {
65	t := &test{}
66
67	defer func() {
68		if err != nil {
69			for _, s := range t.servers {
70				s.Stop()
71			}
72		}
73	}()
74	for i := 0; i < count; i++ {
75		lis, err := net.Listen("tcp", "localhost:0")
76		if err != nil {
77			return nil, fmt.Errorf("Failed to listen %v", err)
78		}
79
80		s := grpc.NewServer()
81		testpb.RegisterTestServiceServer(s, &testServer{})
82		t.servers = append(t.servers, s)
83		t.addresses = append(t.addresses, lis.Addr().String())
84
85		go func(s *grpc.Server, l net.Listener) {
86			s.Serve(l)
87		}(s, lis)
88	}
89
90	return t, nil
91}
92
93func TestOneBackend(t *testing.T) {
94	defer leakcheck.Check(t)
95	r, cleanup := manual.GenerateAndRegisterManualResolver()
96	defer cleanup()
97
98	test, err := startTestServers(1)
99	if err != nil {
100		t.Fatalf("failed to start servers: %v", err)
101	}
102	defer test.cleanup()
103
104	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
105	if err != nil {
106		t.Fatalf("failed to dial: %v", err)
107	}
108	defer cc.Close()
109	testc := testpb.NewTestServiceClient(cc)
110	// The first RPC should fail because there's no address.
111	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
112	defer cancel()
113	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
114		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
115	}
116
117	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
118	// The second RPC should succeed.
119	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
120		t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
121	}
122}
123
124func TestBackendsRoundRobin(t *testing.T) {
125	defer leakcheck.Check(t)
126	r, cleanup := manual.GenerateAndRegisterManualResolver()
127	defer cleanup()
128
129	backendCount := 5
130	test, err := startTestServers(backendCount)
131	if err != nil {
132		t.Fatalf("failed to start servers: %v", err)
133	}
134	defer test.cleanup()
135
136	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
137	if err != nil {
138		t.Fatalf("failed to dial: %v", err)
139	}
140	defer cc.Close()
141	testc := testpb.NewTestServiceClient(cc)
142	// The first RPC should fail because there's no address.
143	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
144	defer cancel()
145	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
146		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
147	}
148
149	var resolvedAddrs []resolver.Address
150	for i := 0; i < backendCount; i++ {
151		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
152	}
153
154	r.NewAddress(resolvedAddrs)
155	var p peer.Peer
156	// Make sure connections to all servers are up.
157	for si := 0; si < backendCount; si++ {
158		var connected bool
159		for i := 0; i < 1000; i++ {
160			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
161				t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
162			}
163			if p.Addr.String() == test.addresses[si] {
164				connected = true
165				break
166			}
167			time.Sleep(time.Millisecond)
168		}
169		if !connected {
170			t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
171		}
172	}
173
174	for i := 0; i < 3*backendCount; i++ {
175		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
176			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
177		}
178		if p.Addr.String() != test.addresses[i%backendCount] {
179			t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
180		}
181	}
182}
183
184func TestAddressesRemoved(t *testing.T) {
185	defer leakcheck.Check(t)
186	r, cleanup := manual.GenerateAndRegisterManualResolver()
187	defer cleanup()
188
189	test, err := startTestServers(1)
190	if err != nil {
191		t.Fatalf("failed to start servers: %v", err)
192	}
193	defer test.cleanup()
194
195	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
196	if err != nil {
197		t.Fatalf("failed to dial: %v", err)
198	}
199	defer cc.Close()
200	testc := testpb.NewTestServiceClient(cc)
201	// The first RPC should fail because there's no address.
202	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
203	defer cancel()
204	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
205		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
206	}
207
208	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
209	// The second RPC should succeed.
210	if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
211		t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
212	}
213
214	r.NewAddress([]resolver.Address{})
215	for i := 0; i < 1000; i++ {
216		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
217		defer cancel()
218		if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
219			return
220		}
221		time.Sleep(time.Millisecond)
222	}
223	t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
224}
225
226func TestCloseWithPendingRPC(t *testing.T) {
227	defer leakcheck.Check(t)
228	r, cleanup := manual.GenerateAndRegisterManualResolver()
229	defer cleanup()
230
231	test, err := startTestServers(1)
232	if err != nil {
233		t.Fatalf("failed to start servers: %v", err)
234	}
235	defer test.cleanup()
236
237	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
238	if err != nil {
239		t.Fatalf("failed to dial: %v", err)
240	}
241	testc := testpb.NewTestServiceClient(cc)
242
243	var wg sync.WaitGroup
244	for i := 0; i < 3; i++ {
245		wg.Add(1)
246		go func() {
247			defer wg.Done()
248			// This RPC blocks until cc is closed.
249			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
250			if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
251				t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
252			}
253			cancel()
254		}()
255	}
256	cc.Close()
257	wg.Wait()
258}
259
260func TestNewAddressWhileBlocking(t *testing.T) {
261	defer leakcheck.Check(t)
262	r, cleanup := manual.GenerateAndRegisterManualResolver()
263	defer cleanup()
264
265	test, err := startTestServers(1)
266	if err != nil {
267		t.Fatalf("failed to start servers: %v", err)
268	}
269	defer test.cleanup()
270
271	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
272	if err != nil {
273		t.Fatalf("failed to dial: %v", err)
274	}
275	defer cc.Close()
276	testc := testpb.NewTestServiceClient(cc)
277	// The first RPC should fail because there's no address.
278	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
279	defer cancel()
280	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
281		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
282	}
283
284	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
285	// The second RPC should succeed.
286	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
287	defer cancel()
288	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
289		t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
290	}
291
292	r.NewAddress([]resolver.Address{})
293
294	var wg sync.WaitGroup
295	for i := 0; i < 3; i++ {
296		wg.Add(1)
297		go func() {
298			defer wg.Done()
299			// This RPC blocks until NewAddress is called.
300			testc.EmptyCall(context.Background(), &testpb.Empty{})
301		}()
302	}
303	time.Sleep(50 * time.Millisecond)
304	r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
305	wg.Wait()
306}
307
// TestOneServerDown checks round-robin over three backends. It then stops the
// last backend and checks that RPCs round-robin, in resolver order, over the
// two that remain.
func TestOneServerDown(t *testing.T) {
	defer leakcheck.Check(t)
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	backendCount := 3
	test, err := startTestServers(backendCount)
	if err != nil {
		t.Fatalf("failed to start servers: %v", err)
	}
	defer test.cleanup()

	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
	testc := testpb.NewTestServiceClient(cc)
	// The first RPC should fail because there's no address.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
	}

	var resolvedAddrs []resolver.Address
	for i := 0; i < backendCount; i++ {
		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
	}

	r.NewAddress(resolvedAddrs)
	var p peer.Peer
	// Make sure connections to all servers are up.
	// A backend counts as up once it has served an RPC. Each backend gets up
	// to 1000 attempts, with a 1ms sleep between them.
	for si := 0; si < backendCount; si++ {
		var connected bool
		for i := 0; i < 1000; i++ {
			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
				t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
			}
			if p.Addr.String() == test.addresses[si] {
				connected = true
				break
			}
			time.Sleep(time.Millisecond)
		}
		if !connected {
			t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
		}
	}

	// With every backend up, each RPC must go to the next backend in
	// resolver order, wrapping around.
	for i := 0; i < 3*backendCount; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
		}
		if p.Addr.String() != test.addresses[i%backendCount] {
			t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
		}
	}

	// Stop one server, RPCs should roundrobin among the remaining servers.
	// After the decrement, server[backendCount] is the stopped server and
	// server[backendCount-1] is the last one still running.
	backendCount--
	test.servers[backendCount].Stop()
	// Loop until see server[backendCount-1] twice without seeing server[backendCount].
	// Any pick of the stopped server, or any failed RPC, resets the count.
	var targetSeen int
	for i := 0; i < 1000; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			targetSeen = 0
			t.Logf("EmptyCall() = _, %v, want _, <nil>", err)
			// Due to a race, this RPC could possibly get the connection that
			// was closing, and this RPC may fail. Keep trying when this
			// happens.
			continue
		}
		switch p.Addr.String() {
		case test.addresses[backendCount-1]:
			targetSeen++
		case test.addresses[backendCount]:
			// Reset targetSeen if peer is server[backendCount].
			targetSeen = 0
		}
		// Break so that the last picked address is server[backendCount-1].
		// The following loop expects server[0] first, so this keeps it from
		// being flaky.
		if targetSeen >= 2 {
			break
		}
	}
	if targetSeen != 2 {
		t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
	}
	// Uses Errorf rather than Fatalf, so every mismatched index is reported
	// instead of stopping at the first one.
	for i := 0; i < 3*backendCount; i++ {
		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
		}
		if p.Addr.String() != test.addresses[i%backendCount] {
			t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
		}
	}
}
405
406func TestAllServersDown(t *testing.T) {
407	defer leakcheck.Check(t)
408	r, cleanup := manual.GenerateAndRegisterManualResolver()
409	defer cleanup()
410
411	backendCount := 3
412	test, err := startTestServers(backendCount)
413	if err != nil {
414		t.Fatalf("failed to start servers: %v", err)
415	}
416	defer test.cleanup()
417
418	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
419	if err != nil {
420		t.Fatalf("failed to dial: %v", err)
421	}
422	defer cc.Close()
423	testc := testpb.NewTestServiceClient(cc)
424	// The first RPC should fail because there's no address.
425	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
426	defer cancel()
427	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
428		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
429	}
430
431	var resolvedAddrs []resolver.Address
432	for i := 0; i < backendCount; i++ {
433		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
434	}
435
436	r.NewAddress(resolvedAddrs)
437	var p peer.Peer
438	// Make sure connections to all servers are up.
439	for si := 0; si < backendCount; si++ {
440		var connected bool
441		for i := 0; i < 1000; i++ {
442			if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
443				t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
444			}
445			if p.Addr.String() == test.addresses[si] {
446				connected = true
447				break
448			}
449			time.Sleep(time.Millisecond)
450		}
451		if !connected {
452			t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
453		}
454	}
455
456	for i := 0; i < 3*backendCount; i++ {
457		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
458			t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
459		}
460		if p.Addr.String() != test.addresses[i%backendCount] {
461			t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
462		}
463	}
464
465	// All servers are stopped, failfast RPC should fail with unavailable.
466	for i := 0; i < backendCount; i++ {
467		test.servers[i].Stop()
468	}
469	time.Sleep(100 * time.Millisecond)
470	for i := 0; i < 1000; i++ {
471		if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable {
472			return
473		}
474		time.Sleep(time.Millisecond)
475	}
476	t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
477}
478