1// +build go1.13 2// +build !386 3 4/* 5 * 6 * Copyright 2021 gRPC authors. 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 * 20 */ 21 22// Package xds_test contains e2e tests for xDS use. 23package xds_test 24 25import ( 26 "context" 27 "fmt" 28 "net" 29 "sync" 30 "testing" 31 32 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" 33 34 "google.golang.org/grpc" 35 "google.golang.org/grpc/connectivity" 36 "google.golang.org/grpc/credentials/insecure" 37 xdscreds "google.golang.org/grpc/credentials/xds" 38 "google.golang.org/grpc/internal/testutils" 39 testpb "google.golang.org/grpc/test/grpc_testing" 40 "google.golang.org/grpc/xds" 41 xdstestutils "google.golang.org/grpc/xds/internal/testutils" 42 "google.golang.org/grpc/xds/internal/testutils/e2e" 43) 44 45// A convenience typed used to keep track of mode changes on multiple listeners. 46type modeTracker struct { 47 mu sync.Mutex 48 modes map[string]xds.ServingMode 49 updateCh *testutils.Channel 50} 51 52func newModeTracker() *modeTracker { 53 return &modeTracker{ 54 modes: make(map[string]xds.ServingMode), 55 updateCh: testutils.NewChannel(), 56 } 57} 58 59func (mt *modeTracker) updateMode(ctx context.Context, addr net.Addr, mode xds.ServingMode) { 60 mt.mu.Lock() 61 defer mt.mu.Unlock() 62 63 mt.modes[addr.String()] = mode 64 // Sometimes we could get state updates which are not expected by the test. 65 // Using `Send()` here would block in that case and cause the whole test to 66 // hang and will eventually only timeout when the `-timeout` passed to `go 67 // test` elapses. Using `SendContext()` here instead fails the test within a 68 // reasonable timeout. 69 mt.updateCh.SendContext(ctx, nil) 70} 71 72func (mt *modeTracker) getMode(addr net.Addr) xds.ServingMode { 73 mt.mu.Lock() 74 defer mt.mu.Unlock() 75 return mt.modes[addr.String()] 76} 77 78func (mt *modeTracker) waitForUpdate(ctx context.Context) error { 79 _, err := mt.updateCh.Receive(ctx) 80 if err != nil { 81 return fmt.Errorf("error when waiting for a mode change update: %v", err) 82 } 83 return nil 84} 85 86// TestServerSideXDS_ServingModeChanges tests the serving mode functionality in 87// xDS enabled gRPC servers. It verifies that appropriate mode changes happen in 88// the server, and also verifies behavior of clientConns under these modes. 89func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { 90 // Configure xDS credentials to be used on the server-side. 91 creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{ 92 FallbackCreds: insecure.NewCredentials(), 93 }) 94 if err != nil { 95 t.Fatal(err) 96 } 97 98 // Create a server option to get notified about serving mode changes. 99 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) 100 defer cancel() 101 modeTracker := newModeTracker() 102 modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { 103 t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) 104 modeTracker.updateMode(ctx, addr, args.Mode) 105 }) 106 107 // Initialize an xDS-enabled gRPC server and register the stubServer on it. 108 server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) 109 defer server.Stop() 110 testpb.RegisterTestServiceServer(server, &testService{}) 111 112 // Create two local listeners and pass it to Serve(). 113 lis1, err := xdstestutils.LocalTCPListener() 114 if err != nil { 115 t.Fatalf("testutils.LocalTCPListener() failed: %v", err) 116 } 117 lis2, err := xdstestutils.LocalTCPListener() 118 if err != nil { 119 t.Fatalf("testutils.LocalTCPListener() failed: %v", err) 120 } 121 122 // Setup the management server to respond with server-side Listener 123 // resources for both listeners. 124 host1, port1, err := hostPortFromListener(lis1) 125 if err != nil { 126 t.Fatalf("failed to retrieve host and port of server: %v", err) 127 } 128 listener1 := e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone) 129 host2, port2, err := hostPortFromListener(lis2) 130 if err != nil { 131 t.Fatalf("failed to retrieve host and port of server: %v", err) 132 } 133 listener2 := e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone) 134 resources := e2e.UpdateOptions{ 135 NodeID: xdsClientNodeID, 136 Listeners: []*v3listenerpb.Listener{listener1, listener2}, 137 } 138 if err := managementServer.Update(resources); err != nil { 139 t.Fatal(err) 140 } 141 142 go func() { 143 if err := server.Serve(lis1); err != nil { 144 t.Errorf("Serve() failed: %v", err) 145 } 146 }() 147 go func() { 148 if err := server.Serve(lis2); err != nil { 149 t.Errorf("Serve() failed: %v", err) 150 } 151 }() 152 153 // Wait for both listeners to move to "serving" mode. 154 if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil { 155 t.Fatal(err) 156 } 157 if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil { 158 t.Fatal(err) 159 } 160 161 // Create a ClientConn to the first listener and make a successful RPCs. 162 cc1, err := grpc.DialContext(ctx, lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) 163 if err != nil { 164 t.Fatalf("failed to dial local test server: %v", err) 165 } 166 defer cc1.Close() 167 168 client1 := testpb.NewTestServiceClient(cc1) 169 if _, err := client1.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 170 t.Fatalf("rpc EmptyCall() failed: %v", err) 171 } 172 173 // Create a ClientConn to the second listener and make a successful RPCs. 174 cc2, err := grpc.DialContext(ctx, lis2.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) 175 if err != nil { 176 t.Fatalf("failed to dial local test server: %v", err) 177 } 178 defer cc2.Close() 179 180 client2 := testpb.NewTestServiceClient(cc2) 181 if _, err := client2.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 182 t.Fatalf("rpc EmptyCall() failed: %v", err) 183 } 184 185 // Update the management server to remove the second listener resource. This 186 // should push only the second listener into "not-serving" mode. 187 if err := managementServer.Update(e2e.UpdateOptions{ 188 NodeID: xdsClientNodeID, 189 Listeners: []*v3listenerpb.Listener{listener1}, 190 }); err != nil { 191 t.Error(err) 192 } 193 if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeNotServing); err != nil { 194 t.Fatal(err) 195 } 196 197 // Make sure cc1 is still in READY state, while cc2 has moved out of READY. 198 if s := cc1.GetState(); s != connectivity.Ready { 199 t.Fatalf("clientConn1 state is %s, want %s", s, connectivity.Ready) 200 } 201 if !cc2.WaitForStateChange(ctx, connectivity.Ready) { 202 t.Fatal("clientConn2 failed to move out of READY") 203 } 204 205 // Make sure RPCs succeed on cc1 and fail on cc2. 206 if _, err := client1.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 207 t.Fatalf("rpc EmptyCall() failed: %v", err) 208 } 209 if _, err := client2.EmptyCall(ctx, &testpb.Empty{}); err == nil { 210 t.Fatal("rpc EmptyCall() succeeded when expected to fail") 211 } 212 213 // Update the management server to remove the first listener resource as 214 // well. This should push the first listener into "not-serving" mode. Second 215 // listener is already in "not-serving" mode. 216 if err := managementServer.Update(e2e.UpdateOptions{ 217 NodeID: xdsClientNodeID, 218 Listeners: []*v3listenerpb.Listener{}, 219 }); err != nil { 220 t.Error(err) 221 } 222 if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeNotServing); err != nil { 223 t.Fatal(err) 224 } 225 226 // Make sure cc1 has moved out of READY. 227 if !cc1.WaitForStateChange(ctx, connectivity.Ready) { 228 t.Fatal("clientConn1 failed to move out of READY") 229 } 230 231 // Make sure RPCs fail on both. 232 if _, err := client1.EmptyCall(ctx, &testpb.Empty{}); err == nil { 233 t.Fatal("rpc EmptyCall() succeeded when expected to fail") 234 } 235 if _, err := client2.EmptyCall(ctx, &testpb.Empty{}); err == nil { 236 t.Fatal("rpc EmptyCall() succeeded when expected to fail") 237 } 238 239 // Make sure new connection attempts to "not-serving" servers fail. We use a 240 // short timeout since we expect this to fail. 241 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) 242 defer sCancel() 243 if _, err := grpc.DialContext(sCtx, lis1.Addr().String(), grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())); err == nil { 244 t.Fatal("successfully created clientConn to a server in \"not-serving\" state") 245 } 246 247 // Update the management server with both listener resources. 248 if err := managementServer.Update(e2e.UpdateOptions{ 249 NodeID: xdsClientNodeID, 250 Listeners: []*v3listenerpb.Listener{listener1, listener2}, 251 }); err != nil { 252 t.Error(err) 253 } 254 255 // Wait for both listeners to move to "serving" mode. 256 if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil { 257 t.Fatal(err) 258 } 259 if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil { 260 t.Fatal(err) 261 } 262 263 // The clientConns created earlier should be able to make RPCs now. 264 if _, err := client1.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 265 t.Fatalf("rpc EmptyCall() failed: %v", err) 266 } 267 if _, err := client2.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 268 t.Fatalf("rpc EmptyCall() failed: %v", err) 269 } 270} 271 272func waitForModeChange(ctx context.Context, modeTracker *modeTracker, addr net.Addr, wantMode xds.ServingMode) error { 273 for { 274 if gotMode := modeTracker.getMode(addr); gotMode == wantMode { 275 return nil 276 } 277 if err := modeTracker.waitForUpdate(ctx); err != nil { 278 return err 279 } 280 } 281} 282