1/*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"fmt"
24	"math"
25	"strconv"
26	"sync"
27	"testing"
28	"time"
29
30	"google.golang.org/grpc/codes"
31	"google.golang.org/grpc/naming"
32	"google.golang.org/grpc/status"
33)
34
35func pickFirstBalancerV1(r naming.Resolver) Balancer {
36	return &pickFirst{&roundRobin{r: r}}
37}
38
39type testWatcher struct {
40	// the channel to receives name resolution updates
41	update chan *naming.Update
42	// the side channel to get to know how many updates in a batch
43	side chan int
44	// the channel to notify update injector that the update reading is done
45	readDone chan int
46}
47
48func (w *testWatcher) Next() (updates []*naming.Update, err error) {
49	n := <-w.side
50	if n == 0 {
51		return nil, fmt.Errorf("w.side is closed")
52	}
53	for i := 0; i < n; i++ {
54		u := <-w.update
55		if u != nil {
56			updates = append(updates, u)
57		}
58	}
59	w.readDone <- 0
60	return
61}
62
63func (w *testWatcher) Close() {
64	close(w.side)
65}
66
67// Inject naming resolution updates to the testWatcher.
68func (w *testWatcher) inject(updates []*naming.Update) {
69	w.side <- len(updates)
70	for _, u := range updates {
71		w.update <- u
72	}
73	<-w.readDone
74}
75
76type testNameResolver struct {
77	w    *testWatcher
78	addr string
79}
80
81func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
82	r.w = &testWatcher{
83		update:   make(chan *naming.Update, 1),
84		side:     make(chan int, 1),
85		readDone: make(chan int),
86	}
87	r.w.side <- 1
88	r.w.update <- &naming.Update{
89		Op:   naming.Add,
90		Addr: r.addr,
91	}
92	go func() {
93		<-r.w.readDone
94	}()
95	return r.w, nil
96}
97
98func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
99	var servers []*server
100	for i := 0; i < numServers; i++ {
101		s := newTestServer()
102		servers = append(servers, s)
103		go s.start(t, 0, maxStreams)
104		s.wait(t, 2*time.Second)
105	}
106	// Point to server[0]
107	addr := "localhost:" + servers[0].port
108	return servers, &testNameResolver{
109			addr: addr,
110		}, func() {
111			for i := 0; i < numServers; i++ {
112				servers[i].stop()
113			}
114		}
115}
116
117func (s) TestNameDiscovery(t *testing.T) {
118	// Start 2 servers on 2 ports.
119	numServers := 2
120	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
121	defer cleanup()
122	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
123	if err != nil {
124		t.Fatalf("Failed to create ClientConn: %v", err)
125	}
126	defer cc.Close()
127	req := "port"
128	var reply string
129	if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
130		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
131	}
132	// Inject the name resolution change to remove servers[0] and add servers[1].
133	var updates []*naming.Update
134	updates = append(updates, &naming.Update{
135		Op:   naming.Delete,
136		Addr: "localhost:" + servers[0].port,
137	})
138	updates = append(updates, &naming.Update{
139		Op:   naming.Add,
140		Addr: "localhost:" + servers[1].port,
141	})
142	r.w.inject(updates)
143	// Loop until the rpcs in flight talks to servers[1].
144	for {
145		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
146			break
147		}
148		time.Sleep(10 * time.Millisecond)
149	}
150}
151
152func (s) TestEmptyAddrs(t *testing.T) {
153	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
154	defer cleanup()
155	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
156	if err != nil {
157		t.Fatalf("Failed to create ClientConn: %v", err)
158	}
159	defer cc.Close()
160	var reply string
161	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
162		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
163	}
164	// Inject name resolution change to remove the server so that there is no address
165	// available after that.
166	u := &naming.Update{
167		Op:   naming.Delete,
168		Addr: "localhost:" + servers[0].port,
169	}
170	r.w.inject([]*naming.Update{u})
171	// Loop until the above updates apply.
172	for {
173		time.Sleep(10 * time.Millisecond)
174		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
175		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
176			cancel()
177			break
178		}
179		cancel()
180	}
181}
182
183func (s) TestRoundRobin(t *testing.T) {
184	// Start 3 servers on 3 ports.
185	numServers := 3
186	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
187	defer cleanup()
188	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
189	if err != nil {
190		t.Fatalf("Failed to create ClientConn: %v", err)
191	}
192	defer cc.Close()
193	// Add servers[1] to the service discovery.
194	u := &naming.Update{
195		Op:   naming.Add,
196		Addr: "localhost:" + servers[1].port,
197	}
198	r.w.inject([]*naming.Update{u})
199	req := "port"
200	var reply string
201	// Loop until servers[1] is up
202	for {
203		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
204			break
205		}
206		time.Sleep(10 * time.Millisecond)
207	}
208	// Add server2[2] to the service discovery.
209	u = &naming.Update{
210		Op:   naming.Add,
211		Addr: "localhost:" + servers[2].port,
212	}
213	r.w.inject([]*naming.Update{u})
214	// Loop until both servers[2] are up.
215	for {
216		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
217			break
218		}
219		time.Sleep(10 * time.Millisecond)
220	}
221	// Check the incoming RPCs served in a round-robin manner.
222	for i := 0; i < 10; i++ {
223		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port {
224			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
225		}
226	}
227}
228
229func (s) TestCloseWithPendingRPC(t *testing.T) {
230	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
231	defer cleanup()
232	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
233	if err != nil {
234		t.Fatalf("Failed to create ClientConn: %v", err)
235	}
236	defer cc.Close()
237	var reply string
238	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
239		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
240	}
241	// Remove the server.
242	updates := []*naming.Update{{
243		Op:   naming.Delete,
244		Addr: "localhost:" + servers[0].port,
245	}}
246	r.w.inject(updates)
247	// Loop until the above update applies.
248	for {
249		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
250		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
251			cancel()
252			break
253		}
254		time.Sleep(10 * time.Millisecond)
255		cancel()
256	}
257	// Issue 2 RPCs which should be completed with error status once cc is closed.
258	var wg sync.WaitGroup
259	wg.Add(2)
260	go func() {
261		defer wg.Done()
262		var reply string
263		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
264			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
265		}
266	}()
267	go func() {
268		defer wg.Done()
269		var reply string
270		time.Sleep(5 * time.Millisecond)
271		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
272			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
273		}
274	}()
275	time.Sleep(5 * time.Millisecond)
276	cc.Close()
277	wg.Wait()
278}
279
280func (s) TestGetOnWaitChannel(t *testing.T) {
281	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
282	defer cleanup()
283	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
284	if err != nil {
285		t.Fatalf("Failed to create ClientConn: %v", err)
286	}
287	defer cc.Close()
288	// Remove all servers so that all upcoming RPCs will block on waitCh.
289	updates := []*naming.Update{{
290		Op:   naming.Delete,
291		Addr: "localhost:" + servers[0].port,
292	}}
293	r.w.inject(updates)
294	for {
295		var reply string
296		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
297		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
298			cancel()
299			break
300		}
301		cancel()
302		time.Sleep(10 * time.Millisecond)
303	}
304	var wg sync.WaitGroup
305	wg.Add(1)
306	go func() {
307		defer wg.Done()
308		var reply string
309		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
310			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
311		}
312	}()
313	// Add a connected server to get the above RPC through.
314	updates = []*naming.Update{{
315		Op:   naming.Add,
316		Addr: "localhost:" + servers[0].port,
317	}}
318	r.w.inject(updates)
319	// Wait until the above RPC succeeds.
320	wg.Wait()
321}
322
323func (s) TestOneServerDown(t *testing.T) {
324	// Start 2 servers.
325	numServers := 2
326	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
327	defer cleanup()
328	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
329	if err != nil {
330		t.Fatalf("Failed to create ClientConn: %v", err)
331	}
332	defer cc.Close()
333	// Add servers[1] to the service discovery.
334	var updates []*naming.Update
335	updates = append(updates, &naming.Update{
336		Op:   naming.Add,
337		Addr: "localhost:" + servers[1].port,
338	})
339	r.w.inject(updates)
340	req := "port"
341	var reply string
342	// Loop until servers[1] is up
343	for {
344		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
345			break
346		}
347		time.Sleep(10 * time.Millisecond)
348	}
349
350	var wg sync.WaitGroup
351	numRPC := 100
352	sleepDuration := 10 * time.Millisecond
353	wg.Add(1)
354	go func() {
355		time.Sleep(sleepDuration)
356		// After sleepDuration, kill server[0].
357		servers[0].stop()
358		wg.Done()
359	}()
360
361	// All non-failfast RPCs should not block because there's at least one connection available.
362	for i := 0; i < numRPC; i++ {
363		wg.Add(1)
364		go func() {
365			time.Sleep(sleepDuration)
366			// After sleepDuration, invoke RPC.
367			// server[0] is killed around the same time to make it racy between balancer and gRPC internals.
368			cc.Invoke(context.Background(), "/foo/bar", &req, &reply, WaitForReady(true))
369			wg.Done()
370		}()
371	}
372	wg.Wait()
373}
374
375func (s) TestOneAddressRemoval(t *testing.T) {
376	// Start 2 servers.
377	numServers := 2
378	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
379	defer cleanup()
380	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
381	if err != nil {
382		t.Fatalf("Failed to create ClientConn: %v", err)
383	}
384	defer cc.Close()
385	// Add servers[1] to the service discovery.
386	var updates []*naming.Update
387	updates = append(updates, &naming.Update{
388		Op:   naming.Add,
389		Addr: "localhost:" + servers[1].port,
390	})
391	r.w.inject(updates)
392	req := "port"
393	var reply string
394	// Loop until servers[1] is up
395	for {
396		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
397			break
398		}
399		time.Sleep(10 * time.Millisecond)
400	}
401
402	var wg sync.WaitGroup
403	numRPC := 100
404	sleepDuration := 10 * time.Millisecond
405	wg.Add(1)
406	go func() {
407		time.Sleep(sleepDuration)
408		// After sleepDuration, delete server[0].
409		var updates []*naming.Update
410		updates = append(updates, &naming.Update{
411			Op:   naming.Delete,
412			Addr: "localhost:" + servers[0].port,
413		})
414		r.w.inject(updates)
415		wg.Done()
416	}()
417
418	// All non-failfast RPCs should not fail because there's at least one connection available.
419	for i := 0; i < numRPC; i++ {
420		wg.Add(1)
421		go func() {
422			var reply string
423			time.Sleep(sleepDuration)
424			// After sleepDuration, invoke RPC.
425			// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
426			if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
427				t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
428			}
429			wg.Done()
430		}()
431	}
432	wg.Wait()
433}
434
435func checkServerUp(t *testing.T, currentServer *server) {
436	req := "port"
437	port := currentServer.port
438	cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
439	if err != nil {
440		t.Fatalf("Failed to create ClientConn: %v", err)
441	}
442	defer cc.Close()
443	var reply string
444	for {
445		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port {
446			break
447		}
448		time.Sleep(10 * time.Millisecond)
449	}
450}
451
452func (s) TestPickFirstEmptyAddrs(t *testing.T) {
453	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
454	defer cleanup()
455	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
456	if err != nil {
457		t.Fatalf("Failed to create ClientConn: %v", err)
458	}
459	defer cc.Close()
460	var reply string
461	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
462		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
463	}
464	// Inject name resolution change to remove the server so that there is no address
465	// available after that.
466	u := &naming.Update{
467		Op:   naming.Delete,
468		Addr: "localhost:" + servers[0].port,
469	}
470	r.w.inject([]*naming.Update{u})
471	// Loop until the above updates apply.
472	for {
473		time.Sleep(10 * time.Millisecond)
474		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
475		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
476			cancel()
477			break
478		}
479		cancel()
480	}
481}
482
483func (s) TestPickFirstCloseWithPendingRPC(t *testing.T) {
484	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
485	defer cleanup()
486	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
487	if err != nil {
488		t.Fatalf("Failed to create ClientConn: %v", err)
489	}
490	defer cc.Close()
491	var reply string
492	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
493		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
494	}
495	// Remove the server.
496	updates := []*naming.Update{{
497		Op:   naming.Delete,
498		Addr: "localhost:" + servers[0].port,
499	}}
500	r.w.inject(updates)
501	// Loop until the above update applies.
502	for {
503		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
504		if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
505			cancel()
506			break
507		}
508		time.Sleep(10 * time.Millisecond)
509		cancel()
510	}
511	// Issue 2 RPCs which should be completed with error status once cc is closed.
512	var wg sync.WaitGroup
513	wg.Add(2)
514	go func() {
515		defer wg.Done()
516		var reply string
517		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
518			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
519		}
520	}()
521	go func() {
522		defer wg.Done()
523		var reply string
524		time.Sleep(5 * time.Millisecond)
525		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
526			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
527		}
528	}()
529	time.Sleep(5 * time.Millisecond)
530	cc.Close()
531	wg.Wait()
532}
533
534func (s) TestPickFirstOrderAllServerUp(t *testing.T) {
535	// Start 3 servers on 3 ports.
536	numServers := 3
537	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
538	defer cleanup()
539	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
540	if err != nil {
541		t.Fatalf("Failed to create ClientConn: %v", err)
542	}
543	defer cc.Close()
544	// Add servers[1] and [2] to the service discovery.
545	u := &naming.Update{
546		Op:   naming.Add,
547		Addr: "localhost:" + servers[1].port,
548	}
549	r.w.inject([]*naming.Update{u})
550
551	u = &naming.Update{
552		Op:   naming.Add,
553		Addr: "localhost:" + servers[2].port,
554	}
555	r.w.inject([]*naming.Update{u})
556
557	// Loop until all 3 servers are up
558	checkServerUp(t, servers[0])
559	checkServerUp(t, servers[1])
560	checkServerUp(t, servers[2])
561
562	// Check the incoming RPCs served in server[0]
563	req := "port"
564	var reply string
565	for i := 0; i < 20; i++ {
566		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
567			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
568		}
569		time.Sleep(10 * time.Millisecond)
570	}
571
572	// Delete server[0] in the balancer, the incoming RPCs served in server[1]
573	// For test addrconn, close server[0] instead
574	u = &naming.Update{
575		Op:   naming.Delete,
576		Addr: "localhost:" + servers[0].port,
577	}
578	r.w.inject([]*naming.Update{u})
579	// Loop until it changes to server[1]
580	for {
581		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
582			break
583		}
584		time.Sleep(10 * time.Millisecond)
585	}
586	for i := 0; i < 20; i++ {
587		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
588			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
589		}
590		time.Sleep(10 * time.Millisecond)
591	}
592
593	// Add server[0] back to the balancer, the incoming RPCs served in server[1]
594	// Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
595	u = &naming.Update{
596		Op:   naming.Add,
597		Addr: "localhost:" + servers[0].port,
598	}
599	r.w.inject([]*naming.Update{u})
600	for i := 0; i < 20; i++ {
601		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
602			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
603		}
604		time.Sleep(10 * time.Millisecond)
605	}
606
607	// Delete server[1] in the balancer, the incoming RPCs served in server[2]
608	u = &naming.Update{
609		Op:   naming.Delete,
610		Addr: "localhost:" + servers[1].port,
611	}
612	r.w.inject([]*naming.Update{u})
613	for {
614		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
615			break
616		}
617		time.Sleep(1 * time.Second)
618	}
619	for i := 0; i < 20; i++ {
620		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
621			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
622		}
623		time.Sleep(10 * time.Millisecond)
624	}
625
626	// Delete server[2] in the balancer, the incoming RPCs served in server[0]
627	u = &naming.Update{
628		Op:   naming.Delete,
629		Addr: "localhost:" + servers[2].port,
630	}
631	r.w.inject([]*naming.Update{u})
632	for {
633		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
634			break
635		}
636		time.Sleep(1 * time.Second)
637	}
638	for i := 0; i < 20; i++ {
639		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
640			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
641		}
642		time.Sleep(10 * time.Millisecond)
643	}
644}
645
646func (s) TestPickFirstOrderOneServerDown(t *testing.T) {
647	// Start 3 servers on 3 ports.
648	numServers := 3
649	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
650	defer cleanup()
651	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
652	if err != nil {
653		t.Fatalf("Failed to create ClientConn: %v", err)
654	}
655	defer cc.Close()
656	// Add servers[1] and [2] to the service discovery.
657	u := &naming.Update{
658		Op:   naming.Add,
659		Addr: "localhost:" + servers[1].port,
660	}
661	r.w.inject([]*naming.Update{u})
662
663	u = &naming.Update{
664		Op:   naming.Add,
665		Addr: "localhost:" + servers[2].port,
666	}
667	r.w.inject([]*naming.Update{u})
668
669	// Loop until all 3 servers are up
670	checkServerUp(t, servers[0])
671	checkServerUp(t, servers[1])
672	checkServerUp(t, servers[2])
673
674	// Check the incoming RPCs served in server[0]
675	req := "port"
676	var reply string
677	for i := 0; i < 20; i++ {
678		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
679			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
680		}
681		time.Sleep(10 * time.Millisecond)
682	}
683
684	// server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
685	// {server[0] server[1] server[2]}
686	servers[0].stop()
687	// Loop until it changes to server[1]
688	for {
689		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
690			break
691		}
692		time.Sleep(10 * time.Millisecond)
693	}
694	for i := 0; i < 20; i++ {
695		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
696			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
697		}
698		time.Sleep(10 * time.Millisecond)
699	}
700
701	// up the server[0] back, the incoming RPCs served in server[1]
702	p, _ := strconv.Atoi(servers[0].port)
703	servers[0] = newTestServer()
704	go servers[0].start(t, p, math.MaxUint32)
705	defer servers[0].stop()
706	servers[0].wait(t, 2*time.Second)
707	checkServerUp(t, servers[0])
708
709	for i := 0; i < 20; i++ {
710		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
711			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
712		}
713		time.Sleep(10 * time.Millisecond)
714	}
715
716	// Delete server[1] in the balancer, the incoming RPCs served in server[0]
717	u = &naming.Update{
718		Op:   naming.Delete,
719		Addr: "localhost:" + servers[1].port,
720	}
721	r.w.inject([]*naming.Update{u})
722	for {
723		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
724			break
725		}
726		time.Sleep(1 * time.Second)
727	}
728	for i := 0; i < 20; i++ {
729		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
730			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
731		}
732		time.Sleep(10 * time.Millisecond)
733	}
734}
735
736func (s) TestPickFirstOneAddressRemoval(t *testing.T) {
737	// Start 2 servers.
738	numServers := 2
739	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
740	defer cleanup()
741	cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
742	if err != nil {
743		t.Fatalf("Failed to create ClientConn: %v", err)
744	}
745	defer cc.Close()
746	// Add servers[1] to the service discovery.
747	var updates []*naming.Update
748	updates = append(updates, &naming.Update{
749		Op:   naming.Add,
750		Addr: "localhost:" + servers[1].port,
751	})
752	r.w.inject(updates)
753
754	// Create a new cc to Loop until servers[1] is up
755	checkServerUp(t, servers[0])
756	checkServerUp(t, servers[1])
757
758	var wg sync.WaitGroup
759	numRPC := 100
760	sleepDuration := 10 * time.Millisecond
761	wg.Add(1)
762	go func() {
763		time.Sleep(sleepDuration)
764		// After sleepDuration, delete server[0].
765		var updates []*naming.Update
766		updates = append(updates, &naming.Update{
767			Op:   naming.Delete,
768			Addr: "localhost:" + servers[0].port,
769		})
770		r.w.inject(updates)
771		wg.Done()
772	}()
773
774	// All non-failfast RPCs should not fail because there's at least one connection available.
775	for i := 0; i < numRPC; i++ {
776		wg.Add(1)
777		go func() {
778			var reply string
779			time.Sleep(sleepDuration)
780			// After sleepDuration, invoke RPC.
781			// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
782			if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
783				t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
784			}
785			wg.Done()
786		}()
787	}
788	wg.Wait()
789}
790