1/*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"fmt"
24	"math"
25	"strconv"
26	"sync"
27	"testing"
28	"time"
29
30	"google.golang.org/grpc/codes"
31	_ "google.golang.org/grpc/grpclog/glogger"
32	"google.golang.org/grpc/internal/leakcheck"
33	"google.golang.org/grpc/naming"
34	"google.golang.org/grpc/status"
35
36	// V1 balancer tests use passthrough resolver instead of dns.
	// TODO(bar) remove this when removing v1 balancer entirely.
38
39	_ "google.golang.org/grpc/resolver/passthrough"
40)
41
42func pickFirstBalancerV1(r naming.Resolver) Balancer {
43	return &pickFirst{&roundRobin{r: r}}
44}
45
// testWatcher is the watcher used by these tests. Its Next, Close and inject
// methods coordinate over the three channels below.
type testWatcher struct {
	// the channel that receives name resolution updates
	update chan *naming.Update
	// the side channel announcing how many updates are in a batch
	side chan int
	// the channel that notifies the update injector that reading the batch is done
	readDone chan int
}
54
55func (w *testWatcher) Next() (updates []*naming.Update, err error) {
56	n := <-w.side
57	if n == 0 {
58		return nil, fmt.Errorf("w.side is closed")
59	}
60	for i := 0; i < n; i++ {
61		u := <-w.update
62		if u != nil {
63			updates = append(updates, u)
64		}
65	}
66	w.readDone <- 0
67	return
68}
69
// Close closes w.side, which makes Next return an error once the channel has
// been drained.
func (w *testWatcher) Close() {
	close(w.side)
}
73
74// Inject naming resolution updates to the testWatcher.
75func (w *testWatcher) inject(updates []*naming.Update) {
76	w.side <- len(updates)
77	for _, u := range updates {
78		w.update <- u
79	}
80	<-w.readDone
81}
82
// testNameResolver is the resolver used by these tests. Resolve stores the
// watcher it creates in w and announces addr as the initial address.
type testNameResolver struct {
	w    *testWatcher
	addr string
}
87
88func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
89	r.w = &testWatcher{
90		update:   make(chan *naming.Update, 1),
91		side:     make(chan int, 1),
92		readDone: make(chan int),
93	}
94	r.w.side <- 1
95	r.w.update <- &naming.Update{
96		Op:   naming.Add,
97		Addr: r.addr,
98	}
99	go func() {
100		<-r.w.readDone
101	}()
102	return r.w, nil
103}
104
105func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
106	var servers []*server
107	for i := 0; i < numServers; i++ {
108		s := newTestServer()
109		servers = append(servers, s)
110		go s.start(t, 0, maxStreams)
111		s.wait(t, 2*time.Second)
112	}
113	// Point to server[0]
114	addr := "localhost:" + servers[0].port
115	return servers, &testNameResolver{
116			addr: addr,
117		}, func() {
118			for i := 0; i < numServers; i++ {
119				servers[i].stop()
120			}
121		}
122}
123
124func TestNameDiscovery(t *testing.T) {
125	defer leakcheck.Check(t)
126	// Start 2 servers on 2 ports.
127	numServers := 2
128	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
129	defer cleanup()
130	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
131	if err != nil {
132		t.Fatalf("Failed to create ClientConn: %v", err)
133	}
134	defer cc.Close()
135	req := "port"
136	var reply string
137	if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
138		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
139	}
140	// Inject the name resolution change to remove servers[0] and add servers[1].
141	var updates []*naming.Update
142	updates = append(updates, &naming.Update{
143		Op:   naming.Delete,
144		Addr: "localhost:" + servers[0].port,
145	})
146	updates = append(updates, &naming.Update{
147		Op:   naming.Add,
148		Addr: "localhost:" + servers[1].port,
149	})
150	r.w.inject(updates)
151	// Loop until the rpcs in flight talks to servers[1].
152	for {
153		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
154			break
155		}
156		time.Sleep(10 * time.Millisecond)
157	}
158}
159
160func TestEmptyAddrs(t *testing.T) {
161	defer leakcheck.Check(t)
162	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
163	defer cleanup()
164	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
165	if err != nil {
166		t.Fatalf("Failed to create ClientConn: %v", err)
167	}
168	defer cc.Close()
169	var reply string
170	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
171		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
172	}
173	// Inject name resolution change to remove the server so that there is no address
174	// available after that.
175	u := &naming.Update{
176		Op:   naming.Delete,
177		Addr: "localhost:" + servers[0].port,
178	}
179	r.w.inject([]*naming.Update{u})
180	// Loop until the above updates apply.
181	for {
182		time.Sleep(10 * time.Millisecond)
183		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
184		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
185			cancel()
186			break
187		}
188		cancel()
189	}
190}
191
192func TestRoundRobin(t *testing.T) {
193	defer leakcheck.Check(t)
194	// Start 3 servers on 3 ports.
195	numServers := 3
196	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
197	defer cleanup()
198	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
199	if err != nil {
200		t.Fatalf("Failed to create ClientConn: %v", err)
201	}
202	defer cc.Close()
203	// Add servers[1] to the service discovery.
204	u := &naming.Update{
205		Op:   naming.Add,
206		Addr: "localhost:" + servers[1].port,
207	}
208	r.w.inject([]*naming.Update{u})
209	req := "port"
210	var reply string
211	// Loop until servers[1] is up
212	for {
213		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
214			break
215		}
216		time.Sleep(10 * time.Millisecond)
217	}
218	// Add server2[2] to the service discovery.
219	u = &naming.Update{
220		Op:   naming.Add,
221		Addr: "localhost:" + servers[2].port,
222	}
223	r.w.inject([]*naming.Update{u})
224	// Loop until both servers[2] are up.
225	for {
226		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
227			break
228		}
229		time.Sleep(10 * time.Millisecond)
230	}
231	// Check the incoming RPCs served in a round-robin manner.
232	for i := 0; i < 10; i++ {
233		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port {
234			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
235		}
236	}
237}
238
239func TestCloseWithPendingRPC(t *testing.T) {
240	defer leakcheck.Check(t)
241	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
242	defer cleanup()
243	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
244	if err != nil {
245		t.Fatalf("Failed to create ClientConn: %v", err)
246	}
247	defer cc.Close()
248	var reply string
249	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
250		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
251	}
252	// Remove the server.
253	updates := []*naming.Update{{
254		Op:   naming.Delete,
255		Addr: "localhost:" + servers[0].port,
256	}}
257	r.w.inject(updates)
258	// Loop until the above update applies.
259	for {
260		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
261		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
262			cancel()
263			break
264		}
265		time.Sleep(10 * time.Millisecond)
266		cancel()
267	}
268	// Issue 2 RPCs which should be completed with error status once cc is closed.
269	var wg sync.WaitGroup
270	wg.Add(2)
271	go func() {
272		defer wg.Done()
273		var reply string
274		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
275			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
276		}
277	}()
278	go func() {
279		defer wg.Done()
280		var reply string
281		time.Sleep(5 * time.Millisecond)
282		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
283			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
284		}
285	}()
286	time.Sleep(5 * time.Millisecond)
287	cc.Close()
288	wg.Wait()
289}
290
291func TestGetOnWaitChannel(t *testing.T) {
292	defer leakcheck.Check(t)
293	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
294	defer cleanup()
295	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
296	if err != nil {
297		t.Fatalf("Failed to create ClientConn: %v", err)
298	}
299	defer cc.Close()
300	// Remove all servers so that all upcoming RPCs will block on waitCh.
301	updates := []*naming.Update{{
302		Op:   naming.Delete,
303		Addr: "localhost:" + servers[0].port,
304	}}
305	r.w.inject(updates)
306	for {
307		var reply string
308		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
309		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
310			cancel()
311			break
312		}
313		cancel()
314		time.Sleep(10 * time.Millisecond)
315	}
316	var wg sync.WaitGroup
317	wg.Add(1)
318	go func() {
319		defer wg.Done()
320		var reply string
321		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
322			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
323		}
324	}()
325	// Add a connected server to get the above RPC through.
326	updates = []*naming.Update{{
327		Op:   naming.Add,
328		Addr: "localhost:" + servers[0].port,
329	}}
330	r.w.inject(updates)
331	// Wait until the above RPC succeeds.
332	wg.Wait()
333}
334
335func TestOneServerDown(t *testing.T) {
336	defer leakcheck.Check(t)
337	// Start 2 servers.
338	numServers := 2
339	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
340	defer cleanup()
341	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
342	if err != nil {
343		t.Fatalf("Failed to create ClientConn: %v", err)
344	}
345	defer cc.Close()
346	// Add servers[1] to the service discovery.
347	var updates []*naming.Update
348	updates = append(updates, &naming.Update{
349		Op:   naming.Add,
350		Addr: "localhost:" + servers[1].port,
351	})
352	r.w.inject(updates)
353	req := "port"
354	var reply string
355	// Loop until servers[1] is up
356	for {
357		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
358			break
359		}
360		time.Sleep(10 * time.Millisecond)
361	}
362
363	var wg sync.WaitGroup
364	numRPC := 100
365	sleepDuration := 10 * time.Millisecond
366	wg.Add(1)
367	go func() {
368		time.Sleep(sleepDuration)
369		// After sleepDuration, kill server[0].
370		servers[0].stop()
371		wg.Done()
372	}()
373
374	// All non-failfast RPCs should not block because there's at least one connection available.
375	for i := 0; i < numRPC; i++ {
376		wg.Add(1)
377		go func() {
378			time.Sleep(sleepDuration)
379			// After sleepDuration, invoke RPC.
380			// server[0] is killed around the same time to make it racy between balancer and gRPC internals.
381			cc.Invoke(context.Background(), "/foo/bar", &req, &reply, FailFast(false))
382			wg.Done()
383		}()
384	}
385	wg.Wait()
386}
387
388func TestOneAddressRemoval(t *testing.T) {
389	defer leakcheck.Check(t)
390	// Start 2 servers.
391	numServers := 2
392	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
393	defer cleanup()
394	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
395	if err != nil {
396		t.Fatalf("Failed to create ClientConn: %v", err)
397	}
398	defer cc.Close()
399	// Add servers[1] to the service discovery.
400	var updates []*naming.Update
401	updates = append(updates, &naming.Update{
402		Op:   naming.Add,
403		Addr: "localhost:" + servers[1].port,
404	})
405	r.w.inject(updates)
406	req := "port"
407	var reply string
408	// Loop until servers[1] is up
409	for {
410		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
411			break
412		}
413		time.Sleep(10 * time.Millisecond)
414	}
415
416	var wg sync.WaitGroup
417	numRPC := 100
418	sleepDuration := 10 * time.Millisecond
419	wg.Add(1)
420	go func() {
421		time.Sleep(sleepDuration)
422		// After sleepDuration, delete server[0].
423		var updates []*naming.Update
424		updates = append(updates, &naming.Update{
425			Op:   naming.Delete,
426			Addr: "localhost:" + servers[0].port,
427		})
428		r.w.inject(updates)
429		wg.Done()
430	}()
431
432	// All non-failfast RPCs should not fail because there's at least one connection available.
433	for i := 0; i < numRPC; i++ {
434		wg.Add(1)
435		go func() {
436			var reply string
437			time.Sleep(sleepDuration)
438			// After sleepDuration, invoke RPC.
439			// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
440			if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
441				t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
442			}
443			wg.Done()
444		}()
445	}
446	wg.Wait()
447}
448
449func checkServerUp(t *testing.T, currentServer *server) {
450	req := "port"
451	port := currentServer.port
452	cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
453	if err != nil {
454		t.Fatalf("Failed to create ClientConn: %v", err)
455	}
456	defer cc.Close()
457	var reply string
458	for {
459		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port {
460			break
461		}
462		time.Sleep(10 * time.Millisecond)
463	}
464}
465
466func TestPickFirstEmptyAddrs(t *testing.T) {
467	defer leakcheck.Check(t)
468	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
469	defer cleanup()
470	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
471	if err != nil {
472		t.Fatalf("Failed to create ClientConn: %v", err)
473	}
474	defer cc.Close()
475	var reply string
476	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
477		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
478	}
479	// Inject name resolution change to remove the server so that there is no address
480	// available after that.
481	u := &naming.Update{
482		Op:   naming.Delete,
483		Addr: "localhost:" + servers[0].port,
484	}
485	r.w.inject([]*naming.Update{u})
486	// Loop until the above updates apply.
487	for {
488		time.Sleep(10 * time.Millisecond)
489		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
490		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
491			cancel()
492			break
493		}
494		cancel()
495	}
496}
497
498func TestPickFirstCloseWithPendingRPC(t *testing.T) {
499	defer leakcheck.Check(t)
500	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
501	defer cleanup()
502	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
503	if err != nil {
504		t.Fatalf("Failed to create ClientConn: %v", err)
505	}
506	defer cc.Close()
507	var reply string
508	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
509		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
510	}
511	// Remove the server.
512	updates := []*naming.Update{{
513		Op:   naming.Delete,
514		Addr: "localhost:" + servers[0].port,
515	}}
516	r.w.inject(updates)
517	// Loop until the above update applies.
518	for {
519		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
520		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, FailFast(false)); status.Code(err) == codes.DeadlineExceeded {
521			cancel()
522			break
523		}
524		time.Sleep(10 * time.Millisecond)
525		cancel()
526	}
527	// Issue 2 RPCs which should be completed with error status once cc is closed.
528	var wg sync.WaitGroup
529	wg.Add(2)
530	go func() {
531		defer wg.Done()
532		var reply string
533		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
534			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
535		}
536	}()
537	go func() {
538		defer wg.Done()
539		var reply string
540		time.Sleep(5 * time.Millisecond)
541		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err == nil {
542			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
543		}
544	}()
545	time.Sleep(5 * time.Millisecond)
546	cc.Close()
547	wg.Wait()
548}
549
550func TestPickFirstOrderAllServerUp(t *testing.T) {
551	defer leakcheck.Check(t)
552	// Start 3 servers on 3 ports.
553	numServers := 3
554	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
555	defer cleanup()
556	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
557	if err != nil {
558		t.Fatalf("Failed to create ClientConn: %v", err)
559	}
560	defer cc.Close()
561	// Add servers[1] and [2] to the service discovery.
562	u := &naming.Update{
563		Op:   naming.Add,
564		Addr: "localhost:" + servers[1].port,
565	}
566	r.w.inject([]*naming.Update{u})
567
568	u = &naming.Update{
569		Op:   naming.Add,
570		Addr: "localhost:" + servers[2].port,
571	}
572	r.w.inject([]*naming.Update{u})
573
574	// Loop until all 3 servers are up
575	checkServerUp(t, servers[0])
576	checkServerUp(t, servers[1])
577	checkServerUp(t, servers[2])
578
579	// Check the incoming RPCs served in server[0]
580	req := "port"
581	var reply string
582	for i := 0; i < 20; i++ {
583		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
584			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
585		}
586		time.Sleep(10 * time.Millisecond)
587	}
588
589	// Delete server[0] in the balancer, the incoming RPCs served in server[1]
590	// For test addrconn, close server[0] instead
591	u = &naming.Update{
592		Op:   naming.Delete,
593		Addr: "localhost:" + servers[0].port,
594	}
595	r.w.inject([]*naming.Update{u})
596	// Loop until it changes to server[1]
597	for {
598		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
599			break
600		}
601		time.Sleep(10 * time.Millisecond)
602	}
603	for i := 0; i < 20; i++ {
604		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
605			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
606		}
607		time.Sleep(10 * time.Millisecond)
608	}
609
610	// Add server[0] back to the balancer, the incoming RPCs served in server[1]
611	// Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
612	u = &naming.Update{
613		Op:   naming.Add,
614		Addr: "localhost:" + servers[0].port,
615	}
616	r.w.inject([]*naming.Update{u})
617	for i := 0; i < 20; i++ {
618		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
619			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
620		}
621		time.Sleep(10 * time.Millisecond)
622	}
623
624	// Delete server[1] in the balancer, the incoming RPCs served in server[2]
625	u = &naming.Update{
626		Op:   naming.Delete,
627		Addr: "localhost:" + servers[1].port,
628	}
629	r.w.inject([]*naming.Update{u})
630	for {
631		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
632			break
633		}
634		time.Sleep(1 * time.Second)
635	}
636	for i := 0; i < 20; i++ {
637		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
638			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
639		}
640		time.Sleep(10 * time.Millisecond)
641	}
642
643	// Delete server[2] in the balancer, the incoming RPCs served in server[0]
644	u = &naming.Update{
645		Op:   naming.Delete,
646		Addr: "localhost:" + servers[2].port,
647	}
648	r.w.inject([]*naming.Update{u})
649	for {
650		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
651			break
652		}
653		time.Sleep(1 * time.Second)
654	}
655	for i := 0; i < 20; i++ {
656		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
657			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
658		}
659		time.Sleep(10 * time.Millisecond)
660	}
661}
662
663func TestPickFirstOrderOneServerDown(t *testing.T) {
664	defer leakcheck.Check(t)
665	// Start 3 servers on 3 ports.
666	numServers := 3
667	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
668	defer cleanup()
669	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
670	if err != nil {
671		t.Fatalf("Failed to create ClientConn: %v", err)
672	}
673	defer cc.Close()
674	// Add servers[1] and [2] to the service discovery.
675	u := &naming.Update{
676		Op:   naming.Add,
677		Addr: "localhost:" + servers[1].port,
678	}
679	r.w.inject([]*naming.Update{u})
680
681	u = &naming.Update{
682		Op:   naming.Add,
683		Addr: "localhost:" + servers[2].port,
684	}
685	r.w.inject([]*naming.Update{u})
686
687	// Loop until all 3 servers are up
688	checkServerUp(t, servers[0])
689	checkServerUp(t, servers[1])
690	checkServerUp(t, servers[2])
691
692	// Check the incoming RPCs served in server[0]
693	req := "port"
694	var reply string
695	for i := 0; i < 20; i++ {
696		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
697			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
698		}
699		time.Sleep(10 * time.Millisecond)
700	}
701
702	// server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
703	// {server[0] server[1] server[2]}
704	servers[0].stop()
705	// Loop until it changes to server[1]
706	for {
707		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
708			break
709		}
710		time.Sleep(10 * time.Millisecond)
711	}
712	for i := 0; i < 20; i++ {
713		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
714			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
715		}
716		time.Sleep(10 * time.Millisecond)
717	}
718
719	// up the server[0] back, the incoming RPCs served in server[1]
720	p, _ := strconv.Atoi(servers[0].port)
721	servers[0] = newTestServer()
722	go servers[0].start(t, p, math.MaxUint32)
723	defer servers[0].stop()
724	servers[0].wait(t, 2*time.Second)
725	checkServerUp(t, servers[0])
726
727	for i := 0; i < 20; i++ {
728		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
729			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
730		}
731		time.Sleep(10 * time.Millisecond)
732	}
733
734	// Delete server[1] in the balancer, the incoming RPCs served in server[0]
735	u = &naming.Update{
736		Op:   naming.Delete,
737		Addr: "localhost:" + servers[1].port,
738	}
739	r.w.inject([]*naming.Update{u})
740	for {
741		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
742			break
743		}
744		time.Sleep(1 * time.Second)
745	}
746	for i := 0; i < 20; i++ {
747		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
748			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
749		}
750		time.Sleep(10 * time.Millisecond)
751	}
752}
753
754func TestPickFirstOneAddressRemoval(t *testing.T) {
755	defer leakcheck.Check(t)
756	// Start 2 servers.
757	numServers := 2
758	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
759	defer cleanup()
760	cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
761	if err != nil {
762		t.Fatalf("Failed to create ClientConn: %v", err)
763	}
764	defer cc.Close()
765	// Add servers[1] to the service discovery.
766	var updates []*naming.Update
767	updates = append(updates, &naming.Update{
768		Op:   naming.Add,
769		Addr: "localhost:" + servers[1].port,
770	})
771	r.w.inject(updates)
772
773	// Create a new cc to Loop until servers[1] is up
774	checkServerUp(t, servers[0])
775	checkServerUp(t, servers[1])
776
777	var wg sync.WaitGroup
778	numRPC := 100
779	sleepDuration := 10 * time.Millisecond
780	wg.Add(1)
781	go func() {
782		time.Sleep(sleepDuration)
783		// After sleepDuration, delete server[0].
784		var updates []*naming.Update
785		updates = append(updates, &naming.Update{
786			Op:   naming.Delete,
787			Addr: "localhost:" + servers[0].port,
788		})
789		r.w.inject(updates)
790		wg.Done()
791	}()
792
793	// All non-failfast RPCs should not fail because there's at least one connection available.
794	for i := 0; i < numRPC; i++ {
795		wg.Add(1)
796		go func() {
797			var reply string
798			time.Sleep(sleepDuration)
799			// After sleepDuration, invoke RPC.
800			// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
801			if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, FailFast(false)); err != nil {
802				t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
803			}
804			wg.Done()
805		}()
806	}
807	wg.Wait()
808}
809