1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"net"
24	"sync"
25	"testing"
26	"time"
27
28	"golang.org/x/net/http2"
29	"google.golang.org/grpc/balancer"
30	"google.golang.org/grpc/connectivity"
31	"google.golang.org/grpc/internal/testutils"
32	"google.golang.org/grpc/resolver"
33	"google.golang.org/grpc/resolver/manual"
34)
35
36const stateRecordingBalancerName = "state_recoding_balancer"
37
38var testBalancerBuilder = newStateRecordingBalancerBuilder()
39
40func init() {
41	balancer.Register(testBalancerBuilder)
42}
43
44// These tests use a pipeListener. This listener is similar to net.Listener
45// except that it is unbuffered, so each read and write will wait for the other
46// side's corresponding write or read.
47func (s) TestStateTransitions_SingleAddress(t *testing.T) {
48	for _, test := range []struct {
49		desc   string
50		want   []connectivity.State
51		server func(net.Listener) net.Conn
52	}{
53		{
54			desc: "When the server returns server preface, the client enters READY.",
55			want: []connectivity.State{
56				connectivity.Connecting,
57				connectivity.Ready,
58			},
59			server: func(lis net.Listener) net.Conn {
60				conn, err := lis.Accept()
61				if err != nil {
62					t.Error(err)
63					return nil
64				}
65
66				go keepReading(conn)
67
68				framer := http2.NewFramer(conn, conn)
69				if err := framer.WriteSettings(http2.Setting{}); err != nil {
70					t.Errorf("Error while writing settings frame. %v", err)
71					return nil
72				}
73
74				return conn
75			},
76		},
77		{
78			desc: "When the connection is closed, the client enters TRANSIENT FAILURE.",
79			want: []connectivity.State{
80				connectivity.Connecting,
81				connectivity.TransientFailure,
82			},
83			server: func(lis net.Listener) net.Conn {
84				conn, err := lis.Accept()
85				if err != nil {
86					t.Error(err)
87					return nil
88				}
89
90				conn.Close()
91				return nil
92			},
93		},
94		{
95			desc: `When the server sends its connection preface, but the connection dies before the client can write its
96connection preface, the client enters TRANSIENT FAILURE.`,
97			want: []connectivity.State{
98				connectivity.Connecting,
99				connectivity.TransientFailure,
100			},
101			server: func(lis net.Listener) net.Conn {
102				conn, err := lis.Accept()
103				if err != nil {
104					t.Error(err)
105					return nil
106				}
107
108				framer := http2.NewFramer(conn, conn)
109				if err := framer.WriteSettings(http2.Setting{}); err != nil {
110					t.Errorf("Error while writing settings frame. %v", err)
111					return nil
112				}
113
114				conn.Close()
115				return nil
116			},
117		},
118		{
119			desc: `When the server reads the client connection preface but does not send its connection preface, the
120client enters TRANSIENT FAILURE.`,
121			want: []connectivity.State{
122				connectivity.Connecting,
123				connectivity.TransientFailure,
124			},
125			server: func(lis net.Listener) net.Conn {
126				conn, err := lis.Accept()
127				if err != nil {
128					t.Error(err)
129					return nil
130				}
131
132				go keepReading(conn)
133
134				return conn
135			},
136		},
137	} {
138		t.Log(test.desc)
139		testStateTransitionSingleAddress(t, test.want, test.server)
140	}
141}
142
143func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
144	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
145	defer cancel()
146
147	pl := testutils.NewPipeListener()
148	defer pl.Close()
149
150	// Launch the server.
151	var conn net.Conn
152	var connMu sync.Mutex
153	go func() {
154		connMu.Lock()
155		conn = server(pl)
156		connMu.Unlock()
157	}()
158
159	client, err := DialContext(ctx,
160		"",
161		WithInsecure(),
162		WithBalancerName(stateRecordingBalancerName),
163		WithDialer(pl.Dialer()),
164		withBackoff(noBackoff{}),
165		withMinConnectDeadline(func() time.Duration { return time.Millisecond * 100 }))
166	if err != nil {
167		t.Fatal(err)
168	}
169	defer client.Close()
170
171	stateNotifications := testBalancerBuilder.nextStateNotifier()
172
173	timeout := time.After(5 * time.Second)
174
175	for i := 0; i < len(want); i++ {
176		select {
177		case <-timeout:
178			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
179		case seen := <-stateNotifications:
180			if seen != want[i] {
181				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
182			}
183		}
184	}
185
186	connMu.Lock()
187	defer connMu.Unlock()
188	if conn != nil {
189		err = conn.Close()
190		if err != nil {
191			t.Fatal(err)
192		}
193	}
194}
195
196// When a READY connection is closed, the client enters CONNECTING.
197func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
198	want := []connectivity.State{
199		connectivity.Connecting,
200		connectivity.Ready,
201		connectivity.Connecting,
202	}
203
204	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
205	defer cancel()
206
207	lis, err := net.Listen("tcp", "localhost:0")
208	if err != nil {
209		t.Fatalf("Error while listening. Err: %v", err)
210	}
211	defer lis.Close()
212
213	sawReady := make(chan struct{})
214
215	// Launch the server.
216	go func() {
217		conn, err := lis.Accept()
218		if err != nil {
219			t.Error(err)
220			return
221		}
222
223		go keepReading(conn)
224
225		framer := http2.NewFramer(conn, conn)
226		if err := framer.WriteSettings(http2.Setting{}); err != nil {
227			t.Errorf("Error while writing settings frame. %v", err)
228			return
229		}
230
231		// Prevents race between onPrefaceReceipt and onClose.
232		<-sawReady
233
234		conn.Close()
235	}()
236
237	client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
238	if err != nil {
239		t.Fatal(err)
240	}
241	defer client.Close()
242
243	stateNotifications := testBalancerBuilder.nextStateNotifier()
244
245	timeout := time.After(5 * time.Second)
246
247	for i := 0; i < len(want); i++ {
248		select {
249		case <-timeout:
250			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
251		case seen := <-stateNotifications:
252			if seen == connectivity.Ready {
253				close(sawReady)
254			}
255			if seen != want[i] {
256				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
257			}
258		}
259	}
260}
261
262// When the first connection is closed, the client stays in CONNECTING until it
263// tries the second address (which succeeds, and then it enters READY).
264func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
265	want := []connectivity.State{
266		connectivity.Connecting,
267		connectivity.Ready,
268	}
269
270	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
271	defer cancel()
272
273	lis1, err := net.Listen("tcp", "localhost:0")
274	if err != nil {
275		t.Fatalf("Error while listening. Err: %v", err)
276	}
277	defer lis1.Close()
278
279	lis2, err := net.Listen("tcp", "localhost:0")
280	if err != nil {
281		t.Fatalf("Error while listening. Err: %v", err)
282	}
283	defer lis2.Close()
284
285	server1Done := make(chan struct{})
286	server2Done := make(chan struct{})
287
288	// Launch server 1.
289	go func() {
290		conn, err := lis1.Accept()
291		if err != nil {
292			t.Error(err)
293			return
294		}
295
296		conn.Close()
297		close(server1Done)
298	}()
299	// Launch server 2.
300	go func() {
301		conn, err := lis2.Accept()
302		if err != nil {
303			t.Error(err)
304			return
305		}
306
307		go keepReading(conn)
308
309		framer := http2.NewFramer(conn, conn)
310		if err := framer.WriteSettings(http2.Setting{}); err != nil {
311			t.Errorf("Error while writing settings frame. %v", err)
312			return
313		}
314
315		close(server2Done)
316	}()
317
318	rb := manual.NewBuilderWithScheme("whatever")
319	rb.InitialState(resolver.State{Addresses: []resolver.Address{
320		{Addr: lis1.Addr().String()},
321		{Addr: lis2.Addr().String()},
322	}})
323	client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
324	if err != nil {
325		t.Fatal(err)
326	}
327	defer client.Close()
328
329	stateNotifications := testBalancerBuilder.nextStateNotifier()
330
331	timeout := time.After(5 * time.Second)
332
333	for i := 0; i < len(want); i++ {
334		select {
335		case <-timeout:
336			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
337		case seen := <-stateNotifications:
338			if seen != want[i] {
339				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
340			}
341		}
342	}
343	select {
344	case <-timeout:
345		t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
346	case <-server1Done:
347	}
348	select {
349	case <-timeout:
350		t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
351	case <-server2Done:
352	}
353}
354
355// When there are multiple addresses, and we enter READY on one of them, a
356// later closure should cause the client to enter CONNECTING
357func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
358	want := []connectivity.State{
359		connectivity.Connecting,
360		connectivity.Ready,
361		connectivity.Connecting,
362	}
363
364	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
365	defer cancel()
366
367	lis1, err := net.Listen("tcp", "localhost:0")
368	if err != nil {
369		t.Fatalf("Error while listening. Err: %v", err)
370	}
371	defer lis1.Close()
372
373	// Never actually gets used; we just want it to be alive so that the resolver has two addresses to target.
374	lis2, err := net.Listen("tcp", "localhost:0")
375	if err != nil {
376		t.Fatalf("Error while listening. Err: %v", err)
377	}
378	defer lis2.Close()
379
380	server1Done := make(chan struct{})
381	sawReady := make(chan struct{})
382
383	// Launch server 1.
384	go func() {
385		conn, err := lis1.Accept()
386		if err != nil {
387			t.Error(err)
388			return
389		}
390
391		go keepReading(conn)
392
393		framer := http2.NewFramer(conn, conn)
394		if err := framer.WriteSettings(http2.Setting{}); err != nil {
395			t.Errorf("Error while writing settings frame. %v", err)
396			return
397		}
398
399		<-sawReady
400
401		conn.Close()
402
403		_, err = lis1.Accept()
404		if err != nil {
405			t.Error(err)
406			return
407		}
408
409		close(server1Done)
410	}()
411
412	rb := manual.NewBuilderWithScheme("whatever")
413	rb.InitialState(resolver.State{Addresses: []resolver.Address{
414		{Addr: lis1.Addr().String()},
415		{Addr: lis2.Addr().String()},
416	}})
417	client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
418	if err != nil {
419		t.Fatal(err)
420	}
421	defer client.Close()
422
423	stateNotifications := testBalancerBuilder.nextStateNotifier()
424
425	timeout := time.After(2 * time.Second)
426
427	for i := 0; i < len(want); i++ {
428		select {
429		case <-timeout:
430			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
431		case seen := <-stateNotifications:
432			if seen == connectivity.Ready {
433				close(sawReady)
434			}
435			if seen != want[i] {
436				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
437			}
438		}
439	}
440	select {
441	case <-timeout:
442		t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
443	case <-server1Done:
444	}
445}
446
447type stateRecordingBalancer struct {
448	notifier chan<- connectivity.State
449	balancer.Balancer
450}
451
452func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
453	b.notifier <- s.ConnectivityState
454	b.Balancer.UpdateSubConnState(sc, s)
455}
456
457func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
458	b.notifier = r
459}
460
461func (b *stateRecordingBalancer) Close() {
462	b.Balancer.Close()
463}
464
465type stateRecordingBalancerBuilder struct {
466	mu       sync.Mutex
467	notifier chan connectivity.State // The notifier used in the last Balancer.
468}
469
470func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
471	return &stateRecordingBalancerBuilder{}
472}
473
474func (b *stateRecordingBalancerBuilder) Name() string {
475	return stateRecordingBalancerName
476}
477
478func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
479	stateNotifications := make(chan connectivity.State, 10)
480	b.mu.Lock()
481	b.notifier = stateNotifications
482	b.mu.Unlock()
483	return &stateRecordingBalancer{
484		notifier: stateNotifications,
485		Balancer: balancer.Get(PickFirstBalancerName).Build(cc, opts),
486	}
487}
488
489func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
490	b.mu.Lock()
491	defer b.mu.Unlock()
492	ret := b.notifier
493	b.notifier = nil
494	return ret
495}
496
497type noBackoff struct{}
498
499func (b noBackoff) Backoff(int) time.Duration { return time.Duration(0) }
500
501// Keep reading until something causes the connection to die (EOF, server
502// closed, etc). Useful as a tool for mindlessly keeping the connection
503// healthy, since the client will error if things like client prefaces are not
504// accepted in a timely fashion.
505func keepReading(conn net.Conn) {
506	buf := make([]byte, 1024)
507	for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) {
508	}
509}
510