1/*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"fmt"
23	"math"
24	"strconv"
25	"sync"
26	"testing"
27	"time"
28
29	"golang.org/x/net/context"
30	"google.golang.org/grpc/codes"
31	"google.golang.org/grpc/naming"
32)
33
34type testWatcher struct {
35	// the channel to receives name resolution updates
36	update chan *naming.Update
37	// the side channel to get to know how many updates in a batch
38	side chan int
39	// the channel to notifiy update injector that the update reading is done
40	readDone chan int
41}
42
43func (w *testWatcher) Next() (updates []*naming.Update, err error) {
44	n := <-w.side
45	if n == 0 {
46		return nil, fmt.Errorf("w.side is closed")
47	}
48	for i := 0; i < n; i++ {
49		u := <-w.update
50		if u != nil {
51			updates = append(updates, u)
52		}
53	}
54	w.readDone <- 0
55	return
56}
57
58func (w *testWatcher) Close() {
59}
60
61// Inject naming resolution updates to the testWatcher.
62func (w *testWatcher) inject(updates []*naming.Update) {
63	w.side <- len(updates)
64	for _, u := range updates {
65		w.update <- u
66	}
67	<-w.readDone
68}
69
70type testNameResolver struct {
71	w    *testWatcher
72	addr string
73}
74
75func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
76	r.w = &testWatcher{
77		update:   make(chan *naming.Update, 1),
78		side:     make(chan int, 1),
79		readDone: make(chan int),
80	}
81	r.w.side <- 1
82	r.w.update <- &naming.Update{
83		Op:   naming.Add,
84		Addr: r.addr,
85	}
86	go func() {
87		<-r.w.readDone
88	}()
89	return r.w, nil
90}
91
92func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver) {
93	var servers []*server
94	for i := 0; i < numServers; i++ {
95		s := newTestServer()
96		servers = append(servers, s)
97		go s.start(t, 0, maxStreams)
98		s.wait(t, 2*time.Second)
99	}
100	// Point to server[0]
101	addr := "localhost:" + servers[0].port
102	return servers, &testNameResolver{
103		addr: addr,
104	}
105}
106
107func TestNameDiscovery(t *testing.T) {
108	// Start 2 servers on 2 ports.
109	numServers := 2
110	servers, r := startServers(t, numServers, math.MaxUint32)
111	cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
112	if err != nil {
113		t.Fatalf("Failed to create ClientConn: %v", err)
114	}
115	req := "port"
116	var reply string
117	if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
118		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
119	}
120	// Inject the name resolution change to remove servers[0] and add servers[1].
121	var updates []*naming.Update
122	updates = append(updates, &naming.Update{
123		Op:   naming.Delete,
124		Addr: "localhost:" + servers[0].port,
125	})
126	updates = append(updates, &naming.Update{
127		Op:   naming.Add,
128		Addr: "localhost:" + servers[1].port,
129	})
130	r.w.inject(updates)
131	// Loop until the rpcs in flight talks to servers[1].
132	for {
133		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
134			break
135		}
136		time.Sleep(10 * time.Millisecond)
137	}
138	cc.Close()
139	for i := 0; i < numServers; i++ {
140		servers[i].stop()
141	}
142}
143
144func TestEmptyAddrs(t *testing.T) {
145	servers, r := startServers(t, 1, math.MaxUint32)
146	cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
147	if err != nil {
148		t.Fatalf("Failed to create ClientConn: %v", err)
149	}
150	var reply string
151	if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
152		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
153	}
154	// Inject name resolution change to remove the server so that there is no address
155	// available after that.
156	u := &naming.Update{
157		Op:   naming.Delete,
158		Addr: "localhost:" + servers[0].port,
159	}
160	r.w.inject([]*naming.Update{u})
161	// Loop until the above updates apply.
162	for {
163		time.Sleep(10 * time.Millisecond)
164		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
165		if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
166			cancel()
167			break
168		}
169		cancel()
170	}
171	cc.Close()
172	servers[0].stop()
173}
174
175func TestRoundRobin(t *testing.T) {
176	// Start 3 servers on 3 ports.
177	numServers := 3
178	servers, r := startServers(t, numServers, math.MaxUint32)
179	cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
180	if err != nil {
181		t.Fatalf("Failed to create ClientConn: %v", err)
182	}
183	// Add servers[1] to the service discovery.
184	u := &naming.Update{
185		Op:   naming.Add,
186		Addr: "localhost:" + servers[1].port,
187	}
188	r.w.inject([]*naming.Update{u})
189	req := "port"
190	var reply string
191	// Loop until servers[1] is up
192	for {
193		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
194			break
195		}
196		time.Sleep(10 * time.Millisecond)
197	}
198	// Add server2[2] to the service discovery.
199	u = &naming.Update{
200		Op:   naming.Add,
201		Addr: "localhost:" + servers[2].port,
202	}
203	r.w.inject([]*naming.Update{u})
204	// Loop until both servers[2] are up.
205	for {
206		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
207			break
208		}
209		time.Sleep(10 * time.Millisecond)
210	}
211	// Check the incoming RPCs served in a round-robin manner.
212	for i := 0; i < 10; i++ {
213		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[i%numServers].port {
214			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
215		}
216	}
217	cc.Close()
218	for i := 0; i < numServers; i++ {
219		servers[i].stop()
220	}
221}
222
223func TestCloseWithPendingRPC(t *testing.T) {
224	servers, r := startServers(t, 1, math.MaxUint32)
225	cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
226	if err != nil {
227		t.Fatalf("Failed to create ClientConn: %v", err)
228	}
229	var reply string
230	if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
231		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
232	}
233	// Remove the server.
234	updates := []*naming.Update{{
235		Op:   naming.Delete,
236		Addr: "localhost:" + servers[0].port,
237	}}
238	r.w.inject(updates)
239	// Loop until the above update applies.
240	for {
241		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
242		if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
243			cancel()
244			break
245		}
246		time.Sleep(10 * time.Millisecond)
247		cancel()
248	}
249	// Issue 2 RPCs which should be completed with error status once cc is closed.
250	var wg sync.WaitGroup
251	wg.Add(2)
252	go func() {
253		defer wg.Done()
254		var reply string
255		if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
256			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
257		}
258	}()
259	go func() {
260		defer wg.Done()
261		var reply string
262		time.Sleep(5 * time.Millisecond)
263		if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
264			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
265		}
266	}()
267	time.Sleep(5 * time.Millisecond)
268	cc.Close()
269	wg.Wait()
270	servers[0].stop()
271}
272
273func TestGetOnWaitChannel(t *testing.T) {
274	servers, r := startServers(t, 1, math.MaxUint32)
275	cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
276	if err != nil {
277		t.Fatalf("Failed to create ClientConn: %v", err)
278	}
279	// Remove all servers so that all upcoming RPCs will block on waitCh.
280	updates := []*naming.Update{{
281		Op:   naming.Delete,
282		Addr: "localhost:" + servers[0].port,
283	}}
284	r.w.inject(updates)
285	for {
286		var reply string
287		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
288		if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
289			cancel()
290			break
291		}
292		cancel()
293		time.Sleep(10 * time.Millisecond)
294	}
295	var wg sync.WaitGroup
296	wg.Add(1)
297	go func() {
298		defer wg.Done()
299		var reply string
300		if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
301			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
302		}
303	}()
304	// Add a connected server to get the above RPC through.
305	updates = []*naming.Update{{
306		Op:   naming.Add,
307		Addr: "localhost:" + servers[0].port,
308	}}
309	r.w.inject(updates)
310	// Wait until the above RPC succeeds.
311	wg.Wait()
312	cc.Close()
313	servers[0].stop()
314}
315
316func TestOneServerDown(t *testing.T) {
317	// Start 2 servers.
318	numServers := 2
319	servers, r := startServers(t, numServers, math.MaxUint32)
320	cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
321	if err != nil {
322		t.Fatalf("Failed to create ClientConn: %v", err)
323	}
324	// Add servers[1] to the service discovery.
325	var updates []*naming.Update
326	updates = append(updates, &naming.Update{
327		Op:   naming.Add,
328		Addr: "localhost:" + servers[1].port,
329	})
330	r.w.inject(updates)
331	req := "port"
332	var reply string
333	// Loop until servers[1] is up
334	for {
335		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
336			break
337		}
338		time.Sleep(10 * time.Millisecond)
339	}
340
341	var wg sync.WaitGroup
342	numRPC := 100
343	sleepDuration := 10 * time.Millisecond
344	wg.Add(1)
345	go func() {
346		time.Sleep(sleepDuration)
347		// After sleepDuration, kill server[0].
348		servers[0].stop()
349		wg.Done()
350	}()
351
352	// All non-failfast RPCs should not block because there's at least one connection available.
353	for i := 0; i < numRPC; i++ {
354		wg.Add(1)
355		go func() {
356			time.Sleep(sleepDuration)
357			// After sleepDuration, invoke RPC.
358			// server[0] is killed around the same time to make it racy between balancer and gRPC internals.
359			Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
360			wg.Done()
361		}()
362	}
363	wg.Wait()
364	cc.Close()
365	for i := 0; i < numServers; i++ {
366		servers[i].stop()
367	}
368}
369
370func TestOneAddressRemoval(t *testing.T) {
371	// Start 2 servers.
372	numServers := 2
373	servers, r := startServers(t, numServers, math.MaxUint32)
374	cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
375	if err != nil {
376		t.Fatalf("Failed to create ClientConn: %v", err)
377	}
378	// Add servers[1] to the service discovery.
379	var updates []*naming.Update
380	updates = append(updates, &naming.Update{
381		Op:   naming.Add,
382		Addr: "localhost:" + servers[1].port,
383	})
384	r.w.inject(updates)
385	req := "port"
386	var reply string
387	// Loop until servers[1] is up
388	for {
389		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
390			break
391		}
392		time.Sleep(10 * time.Millisecond)
393	}
394
395	var wg sync.WaitGroup
396	numRPC := 100
397	sleepDuration := 10 * time.Millisecond
398	wg.Add(1)
399	go func() {
400		time.Sleep(sleepDuration)
401		// After sleepDuration, delete server[0].
402		var updates []*naming.Update
403		updates = append(updates, &naming.Update{
404			Op:   naming.Delete,
405			Addr: "localhost:" + servers[0].port,
406		})
407		r.w.inject(updates)
408		wg.Done()
409	}()
410
411	// All non-failfast RPCs should not fail because there's at least one connection available.
412	for i := 0; i < numRPC; i++ {
413		wg.Add(1)
414		go func() {
415			var reply string
416			time.Sleep(sleepDuration)
417			// After sleepDuration, invoke RPC.
418			// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
419			if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
420				t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
421			}
422			wg.Done()
423		}()
424	}
425	wg.Wait()
426	cc.Close()
427	for i := 0; i < numServers; i++ {
428		servers[i].stop()
429	}
430}
431
432func checkServerUp(t *testing.T, currentServer *server) {
433	req := "port"
434	port := currentServer.port
435	cc, err := Dial("localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
436	if err != nil {
437		t.Fatalf("Failed to create ClientConn: %v", err)
438	}
439	var reply string
440	for {
441		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == port {
442			break
443		}
444		time.Sleep(10 * time.Millisecond)
445	}
446	cc.Close()
447}
448
449func TestPickFirstEmptyAddrs(t *testing.T) {
450	servers, r := startServers(t, 1, math.MaxUint32)
451	defer servers[0].stop()
452	cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
453	if err != nil {
454		t.Fatalf("Failed to create ClientConn: %v", err)
455	}
456	defer cc.Close()
457	var reply string
458	if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
459		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
460	}
461	// Inject name resolution change to remove the server so that there is no address
462	// available after that.
463	u := &naming.Update{
464		Op:   naming.Delete,
465		Addr: "localhost:" + servers[0].port,
466	}
467	r.w.inject([]*naming.Update{u})
468	// Loop until the above updates apply.
469	for {
470		time.Sleep(10 * time.Millisecond)
471		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
472		if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
473			cancel()
474			break
475		}
476		cancel()
477	}
478}
479
480func TestPickFirstCloseWithPendingRPC(t *testing.T) {
481	servers, r := startServers(t, 1, math.MaxUint32)
482	defer servers[0].stop()
483	cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
484	if err != nil {
485		t.Fatalf("Failed to create ClientConn: %v", err)
486	}
487	var reply string
488	if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
489		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
490	}
491	// Remove the server.
492	updates := []*naming.Update{{
493		Op:   naming.Delete,
494		Addr: "localhost:" + servers[0].port,
495	}}
496	r.w.inject(updates)
497	// Loop until the above update applies.
498	for {
499		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
500		if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
501			cancel()
502			break
503		}
504		time.Sleep(10 * time.Millisecond)
505		cancel()
506	}
507	// Issue 2 RPCs which should be completed with error status once cc is closed.
508	var wg sync.WaitGroup
509	wg.Add(2)
510	go func() {
511		defer wg.Done()
512		var reply string
513		if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
514			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
515		}
516	}()
517	go func() {
518		defer wg.Done()
519		var reply string
520		time.Sleep(5 * time.Millisecond)
521		if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
522			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
523		}
524	}()
525	time.Sleep(5 * time.Millisecond)
526	cc.Close()
527	wg.Wait()
528}
529
530func TestPickFirstOrderAllServerUp(t *testing.T) {
531	// Start 3 servers on 3 ports.
532	numServers := 3
533	servers, r := startServers(t, numServers, math.MaxUint32)
534	for i := 0; i < numServers; i++ {
535		defer servers[i].stop()
536	}
537	cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
538	if err != nil {
539		t.Fatalf("Failed to create ClientConn: %v", err)
540	}
541	defer cc.Close()
542	// Add servers[1] and [2] to the service discovery.
543	u := &naming.Update{
544		Op:   naming.Add,
545		Addr: "localhost:" + servers[1].port,
546	}
547	r.w.inject([]*naming.Update{u})
548
549	u = &naming.Update{
550		Op:   naming.Add,
551		Addr: "localhost:" + servers[2].port,
552	}
553	r.w.inject([]*naming.Update{u})
554
555	// Loop until all 3 servers are up
556	checkServerUp(t, servers[0])
557	checkServerUp(t, servers[1])
558	checkServerUp(t, servers[2])
559
560	// Check the incoming RPCs served in server[0]
561	req := "port"
562	var reply string
563	for i := 0; i < 20; i++ {
564		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
565			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
566		}
567		time.Sleep(10 * time.Millisecond)
568	}
569
570	// Delete server[0] in the balancer, the incoming RPCs served in server[1]
571	// For test addrconn, close server[0] instead
572	u = &naming.Update{
573		Op:   naming.Delete,
574		Addr: "localhost:" + servers[0].port,
575	}
576	r.w.inject([]*naming.Update{u})
577	// Loop until it changes to server[1]
578	for {
579		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
580			break
581		}
582		time.Sleep(10 * time.Millisecond)
583	}
584	for i := 0; i < 20; i++ {
585		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
586			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
587		}
588		time.Sleep(10 * time.Millisecond)
589	}
590
591	// Add server[0] back to the balancer, the incoming RPCs served in server[1]
592	// Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
593	u = &naming.Update{
594		Op:   naming.Add,
595		Addr: "localhost:" + servers[0].port,
596	}
597	r.w.inject([]*naming.Update{u})
598	for i := 0; i < 20; i++ {
599		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
600			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
601		}
602		time.Sleep(10 * time.Millisecond)
603	}
604
605	// Delete server[1] in the balancer, the incoming RPCs served in server[2]
606	u = &naming.Update{
607		Op:   naming.Delete,
608		Addr: "localhost:" + servers[1].port,
609	}
610	r.w.inject([]*naming.Update{u})
611	for {
612		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
613			break
614		}
615		time.Sleep(1 * time.Second)
616	}
617	for i := 0; i < 20; i++ {
618		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[2].port {
619			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
620		}
621		time.Sleep(10 * time.Millisecond)
622	}
623
624	// Delete server[2] in the balancer, the incoming RPCs served in server[0]
625	u = &naming.Update{
626		Op:   naming.Delete,
627		Addr: "localhost:" + servers[2].port,
628	}
629	r.w.inject([]*naming.Update{u})
630	for {
631		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
632			break
633		}
634		time.Sleep(1 * time.Second)
635	}
636	for i := 0; i < 20; i++ {
637		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
638			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
639		}
640		time.Sleep(10 * time.Millisecond)
641	}
642}
643
644func TestPickFirstOrderOneServerDown(t *testing.T) {
645	// Start 3 servers on 3 ports.
646	numServers := 3
647	servers, r := startServers(t, numServers, math.MaxUint32)
648	for i := 0; i < numServers; i++ {
649		defer servers[i].stop()
650	}
651	cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
652	if err != nil {
653		t.Fatalf("Failed to create ClientConn: %v", err)
654	}
655	defer cc.Close()
656	// Add servers[1] and [2] to the service discovery.
657	u := &naming.Update{
658		Op:   naming.Add,
659		Addr: "localhost:" + servers[1].port,
660	}
661	r.w.inject([]*naming.Update{u})
662
663	u = &naming.Update{
664		Op:   naming.Add,
665		Addr: "localhost:" + servers[2].port,
666	}
667	r.w.inject([]*naming.Update{u})
668
669	// Loop until all 3 servers are up
670	checkServerUp(t, servers[0])
671	checkServerUp(t, servers[1])
672	checkServerUp(t, servers[2])
673
674	// Check the incoming RPCs served in server[0]
675	req := "port"
676	var reply string
677	for i := 0; i < 20; i++ {
678		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
679			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
680		}
681		time.Sleep(10 * time.Millisecond)
682	}
683
684	// server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
685	// {server[0] server[1] server[2]}
686	servers[0].stop()
687	// Loop until it changes to server[1]
688	for {
689		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
690			break
691		}
692		time.Sleep(10 * time.Millisecond)
693	}
694	for i := 0; i < 20; i++ {
695		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
696			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
697		}
698		time.Sleep(10 * time.Millisecond)
699	}
700
701	// up the server[0] back, the incoming RPCs served in server[1]
702	p, _ := strconv.Atoi(servers[0].port)
703	servers[0] = newTestServer()
704	go servers[0].start(t, p, math.MaxUint32)
705	servers[0].wait(t, 2*time.Second)
706	checkServerUp(t, servers[0])
707
708	for i := 0; i < 20; i++ {
709		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
710			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
711		}
712		time.Sleep(10 * time.Millisecond)
713	}
714
715	// Delete server[1] in the balancer, the incoming RPCs served in server[0]
716	u = &naming.Update{
717		Op:   naming.Delete,
718		Addr: "localhost:" + servers[1].port,
719	}
720	r.w.inject([]*naming.Update{u})
721	for {
722		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
723			break
724		}
725		time.Sleep(1 * time.Second)
726	}
727	for i := 0; i < 20; i++ {
728		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
729			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
730		}
731		time.Sleep(10 * time.Millisecond)
732	}
733}
734
735func TestPickFirstOneAddressRemoval(t *testing.T) {
736	// Start 2 servers.
737	numServers := 2
738	servers, r := startServers(t, numServers, math.MaxUint32)
739	for i := 0; i < numServers; i++ {
740		defer servers[i].stop()
741	}
742	cc, err := Dial("localhost:"+servers[0].port, WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
743	if err != nil {
744		t.Fatalf("Failed to create ClientConn: %v", err)
745	}
746	defer cc.Close()
747	// Add servers[1] to the service discovery.
748	var updates []*naming.Update
749	updates = append(updates, &naming.Update{
750		Op:   naming.Add,
751		Addr: "localhost:" + servers[1].port,
752	})
753	r.w.inject(updates)
754
755	// Create a new cc to Loop until servers[1] is up
756	checkServerUp(t, servers[0])
757	checkServerUp(t, servers[1])
758
759	var wg sync.WaitGroup
760	numRPC := 100
761	sleepDuration := 10 * time.Millisecond
762	wg.Add(1)
763	go func() {
764		time.Sleep(sleepDuration)
765		// After sleepDuration, delete server[0].
766		var updates []*naming.Update
767		updates = append(updates, &naming.Update{
768			Op:   naming.Delete,
769			Addr: "localhost:" + servers[0].port,
770		})
771		r.w.inject(updates)
772		wg.Done()
773	}()
774
775	// All non-failfast RPCs should not fail because there's at least one connection available.
776	for i := 0; i < numRPC; i++ {
777		wg.Add(1)
778		go func() {
779			var reply string
780			time.Sleep(sleepDuration)
781			// After sleepDuration, invoke RPC.
782			// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
783			if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
784				t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
785			}
786			wg.Done()
787		}()
788	}
789	wg.Wait()
790}
791