1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"math"
24	"sync"
25	"testing"
26	"time"
27
28	"google.golang.org/grpc/codes"
29	"google.golang.org/grpc/resolver"
30	"google.golang.org/grpc/resolver/manual"
31	"google.golang.org/grpc/status"
32)
33
34func errorDesc(err error) string {
35	if s, ok := status.FromError(err); ok {
36		return s.Message()
37	}
38	return err.Error()
39}
40
41func (s) TestOneBackendPickfirst(t *testing.T) {
42	r := manual.NewBuilderWithScheme("whatever")
43
44	numServers := 1
45	servers, scleanup := startServers(t, numServers, math.MaxInt32)
46	defer scleanup()
47
48	cc, err := Dial(r.Scheme()+":///test.server",
49		WithInsecure(),
50		WithResolvers(r),
51		WithCodec(testCodec{}))
52	if err != nil {
53		t.Fatalf("failed to dial: %v", err)
54	}
55	defer cc.Close()
56	// The first RPC should fail because there's no address.
57	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
58	defer cancel()
59	req := "port"
60	var reply string
61	if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
62		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
63	}
64
65	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
66	// The second RPC should succeed.
67	for i := 0; i < 1000; i++ {
68		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
69			return
70		}
71		time.Sleep(time.Millisecond)
72	}
73	t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
74}
75
76func (s) TestBackendsPickfirst(t *testing.T) {
77	r := manual.NewBuilderWithScheme("whatever")
78
79	numServers := 2
80	servers, scleanup := startServers(t, numServers, math.MaxInt32)
81	defer scleanup()
82
83	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
84	if err != nil {
85		t.Fatalf("failed to dial: %v", err)
86	}
87	defer cc.Close()
88	// The first RPC should fail because there's no address.
89	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
90	defer cancel()
91	req := "port"
92	var reply string
93	if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
94		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
95	}
96
97	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
98	// The second RPC should succeed with the first server.
99	for i := 0; i < 1000; i++ {
100		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
101			return
102		}
103		time.Sleep(time.Millisecond)
104	}
105	t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
106}
107
108func (s) TestNewAddressWhileBlockingPickfirst(t *testing.T) {
109	r := manual.NewBuilderWithScheme("whatever")
110
111	numServers := 1
112	servers, scleanup := startServers(t, numServers, math.MaxInt32)
113	defer scleanup()
114
115	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
116	if err != nil {
117		t.Fatalf("failed to dial: %v", err)
118	}
119	defer cc.Close()
120	// The first RPC should fail because there's no address.
121	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
122	defer cancel()
123	req := "port"
124	var reply string
125	if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
126		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
127	}
128
129	var wg sync.WaitGroup
130	for i := 0; i < 3; i++ {
131		wg.Add(1)
132		go func() {
133			defer wg.Done()
134			// This RPC blocks until NewAddress is called.
135			cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
136		}()
137	}
138	time.Sleep(50 * time.Millisecond)
139	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
140	wg.Wait()
141}
142
143func (s) TestCloseWithPendingRPCPickfirst(t *testing.T) {
144	r := manual.NewBuilderWithScheme("whatever")
145
146	numServers := 1
147	_, scleanup := startServers(t, numServers, math.MaxInt32)
148	defer scleanup()
149
150	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
151	if err != nil {
152		t.Fatalf("failed to dial: %v", err)
153	}
154	defer cc.Close()
155	// The first RPC should fail because there's no address.
156	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
157	defer cancel()
158	req := "port"
159	var reply string
160	if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
161		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
162	}
163
164	var wg sync.WaitGroup
165	for i := 0; i < 3; i++ {
166		wg.Add(1)
167		go func() {
168			defer wg.Done()
169			// This RPC blocks until NewAddress is called.
170			cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
171		}()
172	}
173	time.Sleep(50 * time.Millisecond)
174	cc.Close()
175	wg.Wait()
176}
177
178func (s) TestOneServerDownPickfirst(t *testing.T) {
179	r := manual.NewBuilderWithScheme("whatever")
180
181	numServers := 2
182	servers, scleanup := startServers(t, numServers, math.MaxInt32)
183	defer scleanup()
184
185	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
186	if err != nil {
187		t.Fatalf("failed to dial: %v", err)
188	}
189	defer cc.Close()
190	// The first RPC should fail because there's no address.
191	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
192	defer cancel()
193	req := "port"
194	var reply string
195	if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
196		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
197	}
198
199	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
200	// The second RPC should succeed with the first server.
201	for i := 0; i < 1000; i++ {
202		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
203			break
204		}
205		time.Sleep(time.Millisecond)
206	}
207
208	servers[0].stop()
209	for i := 0; i < 1000; i++ {
210		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
211			return
212		}
213		time.Sleep(time.Millisecond)
214	}
215	t.Fatalf("EmptyCall() = _, %v, want _, %v", err, servers[0].port)
216}
217
218func (s) TestAllServersDownPickfirst(t *testing.T) {
219	r := manual.NewBuilderWithScheme("whatever")
220
221	numServers := 2
222	servers, scleanup := startServers(t, numServers, math.MaxInt32)
223	defer scleanup()
224
225	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
226	if err != nil {
227		t.Fatalf("failed to dial: %v", err)
228	}
229	defer cc.Close()
230	// The first RPC should fail because there's no address.
231	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
232	defer cancel()
233	req := "port"
234	var reply string
235	if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
236		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
237	}
238
239	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
240	// The second RPC should succeed with the first server.
241	for i := 0; i < 1000; i++ {
242		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
243			break
244		}
245		time.Sleep(time.Millisecond)
246	}
247
248	for i := 0; i < numServers; i++ {
249		servers[i].stop()
250	}
251	for i := 0; i < 1000; i++ {
252		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); status.Code(err) == codes.Unavailable {
253			return
254		}
255		time.Sleep(time.Millisecond)
256	}
257	t.Fatalf("EmptyCall() = _, %v, want _, error with code unavailable", err)
258}
259
260func (s) TestAddressesRemovedPickfirst(t *testing.T) {
261	r := manual.NewBuilderWithScheme("whatever")
262
263	numServers := 3
264	servers, scleanup := startServers(t, numServers, math.MaxInt32)
265	defer scleanup()
266
267	cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
268	if err != nil {
269		t.Fatalf("failed to dial: %v", err)
270	}
271	defer cc.Close()
272	// The first RPC should fail because there's no address.
273	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
274	defer cancel()
275	req := "port"
276	var reply string
277	if err := cc.Invoke(ctx, "/foo/bar", &req, &reply); err == nil || status.Code(err) != codes.DeadlineExceeded {
278		t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
279	}
280
281	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}, {Addr: servers[2].addr}}})
282	for i := 0; i < 1000; i++ {
283		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
284			break
285		}
286		time.Sleep(time.Millisecond)
287	}
288	for i := 0; i < 20; i++ {
289		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
290			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
291		}
292		time.Sleep(10 * time.Millisecond)
293	}
294
295	// Remove server[0].
296	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
297	for i := 0; i < 1000; i++ {
298		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
299			break
300		}
301		time.Sleep(time.Millisecond)
302	}
303	for i := 0; i < 20; i++ {
304		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
305			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
306		}
307		time.Sleep(10 * time.Millisecond)
308	}
309
310	// Append server[0], nothing should change.
311	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}, {Addr: servers[0].addr}}})
312	for i := 0; i < 20; i++ {
313		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
314			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
315		}
316		time.Sleep(10 * time.Millisecond)
317	}
318
319	// Remove server[1].
320	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[2].addr}, {Addr: servers[0].addr}}})
321	for i := 0; i < 1000; i++ {
322		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
323			break
324		}
325		time.Sleep(time.Millisecond)
326	}
327	for i := 0; i < 20; i++ {
328		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
329			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
330		}
331		time.Sleep(10 * time.Millisecond)
332	}
333
334	// Remove server[2].
335	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
336	for i := 0; i < 1000; i++ {
337		if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
338			break
339		}
340		time.Sleep(time.Millisecond)
341	}
342	for i := 0; i < 20; i++ {
343		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
344			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
345		}
346		time.Sleep(10 * time.Millisecond)
347	}
348}
349