1// +build go1.13
2// +build !386
3
4/*
5 *
6 * Copyright 2021 gRPC authors.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 *     http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 *
20 */
21
22// Package xds_test contains e2e tests for xDS use.
23package xds_test
24
25import (
26	"context"
27	"fmt"
28	"net"
29	"sync"
30	"testing"
31
32	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
33
34	"google.golang.org/grpc"
35	"google.golang.org/grpc/connectivity"
36	"google.golang.org/grpc/credentials/insecure"
37	xdscreds "google.golang.org/grpc/credentials/xds"
38	"google.golang.org/grpc/internal/testutils"
39	testpb "google.golang.org/grpc/test/grpc_testing"
40	"google.golang.org/grpc/xds"
41	xdstestutils "google.golang.org/grpc/xds/internal/testutils"
42	"google.golang.org/grpc/xds/internal/testutils/e2e"
43)
44
// modeTracker is a convenience type used to keep track of serving mode changes
// on multiple listeners.
//
// The modes map records the most recent serving mode reported for each
// listener, keyed by the listener's address in string form, and is guarded by
// mu. A send on updateCh is attempted every time a mode is recorded, so that a
// goroutine waiting on the channel can wake up and re-check the map.
type modeTracker struct {
	mu       sync.Mutex
	modes    map[string]xds.ServingMode
	updateCh *testutils.Channel
}
51
52func newModeTracker() *modeTracker {
53	return &modeTracker{
54		modes:    make(map[string]xds.ServingMode),
55		updateCh: testutils.NewChannel(),
56	}
57}
58
59func (mt *modeTracker) updateMode(ctx context.Context, addr net.Addr, mode xds.ServingMode) {
60	mt.mu.Lock()
61	defer mt.mu.Unlock()
62
63	mt.modes[addr.String()] = mode
64	// Sometimes we could get state updates which are not expected by the test.
65	// Using `Send()` here would block in that case and cause the whole test to
66	// hang and will eventually only timeout when the `-timeout` passed to `go
67	// test` elapses. Using `SendContext()` here instead fails the test within a
68	// reasonable timeout.
69	mt.updateCh.SendContext(ctx, nil)
70}
71
72func (mt *modeTracker) getMode(addr net.Addr) xds.ServingMode {
73	mt.mu.Lock()
74	defer mt.mu.Unlock()
75	return mt.modes[addr.String()]
76}
77
78func (mt *modeTracker) waitForUpdate(ctx context.Context) error {
79	_, err := mt.updateCh.Receive(ctx)
80	if err != nil {
81		return fmt.Errorf("error when waiting for a mode change update: %v", err)
82	}
83	return nil
84}
85
86// TestServerSideXDS_ServingModeChanges tests the serving mode functionality in
87// xDS enabled gRPC servers. It verifies that appropriate mode changes happen in
88// the server, and also verifies behavior of clientConns under these modes.
89func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
90	// Configure xDS credentials to be used on the server-side.
91	creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
92		FallbackCreds: insecure.NewCredentials(),
93	})
94	if err != nil {
95		t.Fatal(err)
96	}
97
98	// Create a server option to get notified about serving mode changes.
99	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
100	defer cancel()
101	modeTracker := newModeTracker()
102	modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
103		t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
104		modeTracker.updateMode(ctx, addr, args.Mode)
105	})
106
107	// Initialize an xDS-enabled gRPC server and register the stubServer on it.
108	server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
109	defer server.Stop()
110	testpb.RegisterTestServiceServer(server, &testService{})
111
112	// Create two local listeners and pass it to Serve().
113	lis1, err := xdstestutils.LocalTCPListener()
114	if err != nil {
115		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
116	}
117	lis2, err := xdstestutils.LocalTCPListener()
118	if err != nil {
119		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
120	}
121
122	// Setup the management server to respond with server-side Listener
123	// resources for both listeners.
124	host1, port1, err := hostPortFromListener(lis1)
125	if err != nil {
126		t.Fatalf("failed to retrieve host and port of server: %v", err)
127	}
128	listener1 := e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone)
129	host2, port2, err := hostPortFromListener(lis2)
130	if err != nil {
131		t.Fatalf("failed to retrieve host and port of server: %v", err)
132	}
133	listener2 := e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone)
134	resources := e2e.UpdateOptions{
135		NodeID:    xdsClientNodeID,
136		Listeners: []*v3listenerpb.Listener{listener1, listener2},
137	}
138	if err := managementServer.Update(resources); err != nil {
139		t.Fatal(err)
140	}
141
142	go func() {
143		if err := server.Serve(lis1); err != nil {
144			t.Errorf("Serve() failed: %v", err)
145		}
146	}()
147	go func() {
148		if err := server.Serve(lis2); err != nil {
149			t.Errorf("Serve() failed: %v", err)
150		}
151	}()
152
153	// Wait for both listeners to move to "serving" mode.
154	if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil {
155		t.Fatal(err)
156	}
157	if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil {
158		t.Fatal(err)
159	}
160
161	// Create a ClientConn to the first listener and make a successful RPCs.
162	cc1, err := grpc.DialContext(ctx, lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
163	if err != nil {
164		t.Fatalf("failed to dial local test server: %v", err)
165	}
166	defer cc1.Close()
167
168	client1 := testpb.NewTestServiceClient(cc1)
169	if _, err := client1.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
170		t.Fatalf("rpc EmptyCall() failed: %v", err)
171	}
172
173	// Create a ClientConn to the second listener and make a successful RPCs.
174	cc2, err := grpc.DialContext(ctx, lis2.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
175	if err != nil {
176		t.Fatalf("failed to dial local test server: %v", err)
177	}
178	defer cc2.Close()
179
180	client2 := testpb.NewTestServiceClient(cc2)
181	if _, err := client2.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
182		t.Fatalf("rpc EmptyCall() failed: %v", err)
183	}
184
185	// Update the management server to remove the second listener resource. This
186	// should push only the second listener into "not-serving" mode.
187	if err := managementServer.Update(e2e.UpdateOptions{
188		NodeID:    xdsClientNodeID,
189		Listeners: []*v3listenerpb.Listener{listener1},
190	}); err != nil {
191		t.Error(err)
192	}
193	if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeNotServing); err != nil {
194		t.Fatal(err)
195	}
196
197	// Make sure cc1 is still in READY state, while cc2 has moved out of READY.
198	if s := cc1.GetState(); s != connectivity.Ready {
199		t.Fatalf("clientConn1 state is %s, want %s", s, connectivity.Ready)
200	}
201	if !cc2.WaitForStateChange(ctx, connectivity.Ready) {
202		t.Fatal("clientConn2 failed to move out of READY")
203	}
204
205	// Make sure RPCs succeed on cc1 and fail on cc2.
206	if _, err := client1.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
207		t.Fatalf("rpc EmptyCall() failed: %v", err)
208	}
209	if _, err := client2.EmptyCall(ctx, &testpb.Empty{}); err == nil {
210		t.Fatal("rpc EmptyCall() succeeded when expected to fail")
211	}
212
213	// Update the management server to remove the first listener resource as
214	// well. This should push the first listener into "not-serving" mode. Second
215	// listener is already in "not-serving" mode.
216	if err := managementServer.Update(e2e.UpdateOptions{
217		NodeID:    xdsClientNodeID,
218		Listeners: []*v3listenerpb.Listener{},
219	}); err != nil {
220		t.Error(err)
221	}
222	if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeNotServing); err != nil {
223		t.Fatal(err)
224	}
225
226	// Make sure cc1 has moved out of READY.
227	if !cc1.WaitForStateChange(ctx, connectivity.Ready) {
228		t.Fatal("clientConn1 failed to move out of READY")
229	}
230
231	// Make sure RPCs fail on both.
232	if _, err := client1.EmptyCall(ctx, &testpb.Empty{}); err == nil {
233		t.Fatal("rpc EmptyCall() succeeded when expected to fail")
234	}
235	if _, err := client2.EmptyCall(ctx, &testpb.Empty{}); err == nil {
236		t.Fatal("rpc EmptyCall() succeeded when expected to fail")
237	}
238
239	// Make sure new connection attempts to "not-serving" servers fail. We use a
240	// short timeout since we expect this to fail.
241	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
242	defer sCancel()
243	if _, err := grpc.DialContext(sCtx, lis1.Addr().String(), grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())); err == nil {
244		t.Fatal("successfully created clientConn to a server in \"not-serving\" state")
245	}
246
247	// Update the management server with both listener resources.
248	if err := managementServer.Update(e2e.UpdateOptions{
249		NodeID:    xdsClientNodeID,
250		Listeners: []*v3listenerpb.Listener{listener1, listener2},
251	}); err != nil {
252		t.Error(err)
253	}
254
255	// Wait for both listeners to move to "serving" mode.
256	if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil {
257		t.Fatal(err)
258	}
259	if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil {
260		t.Fatal(err)
261	}
262
263	// The clientConns created earlier should be able to make RPCs now.
264	if _, err := client1.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
265		t.Fatalf("rpc EmptyCall() failed: %v", err)
266	}
267	if _, err := client2.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
268		t.Fatalf("rpc EmptyCall() failed: %v", err)
269	}
270}
271
272func waitForModeChange(ctx context.Context, modeTracker *modeTracker, addr net.Addr, wantMode xds.ServingMode) error {
273	for {
274		if gotMode := modeTracker.getMode(addr); gotMode == wantMode {
275			return nil
276		}
277		if err := modeTracker.waitForUpdate(ctx); err != nil {
278			return err
279		}
280	}
281}
282