1/* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package test 20 21import ( 22 "context" 23 "reflect" 24 "testing" 25 "time" 26 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/balancer" 29 "google.golang.org/grpc/connectivity" 30 "google.golang.org/grpc/credentials" 31 "google.golang.org/grpc/grpclog" 32 "google.golang.org/grpc/internal/balancerload" 33 "google.golang.org/grpc/internal/testutils" 34 "google.golang.org/grpc/metadata" 35 "google.golang.org/grpc/resolver" 36 "google.golang.org/grpc/resolver/manual" 37 testpb "google.golang.org/grpc/test/grpc_testing" 38 "google.golang.org/grpc/testdata" 39) 40 41const testBalancerName = "testbalancer" 42 43// testBalancer creates one subconn with the first address from resolved 44// addresses. 45// 46// It's used to test options for NewSubConn are applies correctly. 47type testBalancer struct { 48 cc balancer.ClientConn 49 sc balancer.SubConn 50 51 newSubConnOptions balancer.NewSubConnOptions 52 pickInfos []balancer.PickInfo 53 doneInfo []balancer.DoneInfo 54} 55 56func (b *testBalancer) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { 57 b.cc = cc 58 return b 59} 60 61func (*testBalancer) Name() string { 62 return testBalancerName 63} 64 65func (b *testBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { 66 // Only create a subconn at the first time. 67 if err == nil && b.sc == nil { 68 b.sc, err = b.cc.NewSubConn(addrs, b.newSubConnOptions) 69 if err != nil { 70 grpclog.Errorf("testBalancer: failed to NewSubConn: %v", err) 71 return 72 } 73 b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: &picker{sc: b.sc, bal: b}}) 74 b.sc.Connect() 75 } 76} 77 78func (b *testBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 79 grpclog.Infof("testBalancer: HandleSubConnStateChange: %p, %v", sc, s) 80 if b.sc != sc { 81 grpclog.Infof("testBalancer: ignored state change because sc is not recognized") 82 return 83 } 84 if s == connectivity.Shutdown { 85 b.sc = nil 86 return 87 } 88 89 switch s { 90 case connectivity.Ready, connectivity.Idle: 91 b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{sc: sc, bal: b}}) 92 case connectivity.Connecting: 93 b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrNoSubConnAvailable, bal: b}}) 94 case connectivity.TransientFailure: 95 b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrTransientFailure, bal: b}}) 96 } 97} 98 99func (b *testBalancer) Close() { 100} 101 102type picker struct { 103 err error 104 sc balancer.SubConn 105 bal *testBalancer 106} 107 108func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { 109 if p.err != nil { 110 return balancer.PickResult{}, p.err 111 } 112 info.Ctx = nil // Do not validate context. 113 p.bal.pickInfos = append(p.bal.pickInfos, info) 114 return balancer.PickResult{SubConn: p.sc, Done: func(d balancer.DoneInfo) { p.bal.doneInfo = append(p.bal.doneInfo, d) }}, nil 115} 116 117func (s) TestCredsBundleFromBalancer(t *testing.T) { 118 balancer.Register(&testBalancer{ 119 newSubConnOptions: balancer.NewSubConnOptions{ 120 CredsBundle: &testCredsBundle{}, 121 }, 122 }) 123 te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: ""}) 124 te.tapHandle = authHandle 125 te.customDialOptions = []grpc.DialOption{ 126 grpc.WithBalancerName(testBalancerName), 127 } 128 creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key")) 129 if err != nil { 130 t.Fatalf("Failed to generate credentials %v", err) 131 } 132 te.customServerOptions = []grpc.ServerOption{ 133 grpc.Creds(creds), 134 } 135 te.startServer(&testServer{}) 136 defer te.tearDown() 137 138 cc := te.clientConn() 139 tc := testpb.NewTestServiceClient(cc) 140 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 141 t.Fatalf("Test failed. Reason: %v", err) 142 } 143} 144 145func (s) TestDoneInfo(t *testing.T) { 146 for _, e := range listTestEnv() { 147 testDoneInfo(t, e) 148 } 149} 150 151func testDoneInfo(t *testing.T, e env) { 152 te := newTest(t, e) 153 b := &testBalancer{} 154 balancer.Register(b) 155 te.customDialOptions = []grpc.DialOption{ 156 grpc.WithBalancerName(testBalancerName), 157 } 158 te.userAgent = failAppUA 159 te.startServer(&testServer{security: e.security}) 160 defer te.tearDown() 161 162 cc := te.clientConn() 163 tc := testpb.NewTestServiceClient(cc) 164 165 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 166 defer cancel() 167 wantErr := detailedError 168 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !testutils.StatusErrEqual(err, wantErr) { 169 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr) 170 } 171 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { 172 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 173 } 174 175 if len(b.doneInfo) < 1 || !testutils.StatusErrEqual(b.doneInfo[0].Err, wantErr) { 176 t.Fatalf("b.doneInfo = %v; want b.doneInfo[0].Err = %v", b.doneInfo, wantErr) 177 } 178 if len(b.doneInfo) < 2 || !reflect.DeepEqual(b.doneInfo[1].Trailer, testTrailerMetadata) { 179 t.Fatalf("b.doneInfo = %v; want b.doneInfo[1].Trailer = %v", b.doneInfo, testTrailerMetadata) 180 } 181 if len(b.pickInfos) != len(b.doneInfo) { 182 t.Fatalf("Got %d picks, but %d doneInfo, want equal amount", len(b.pickInfos), len(b.doneInfo)) 183 } 184 // To test done() is always called, even if it's returned with a non-Ready 185 // SubConn. 186 // 187 // Stop server and at the same time send RPCs. There are chances that picker 188 // is not updated in time, causing a non-Ready SubConn to be returned. 189 finished := make(chan struct{}) 190 go func() { 191 for i := 0; i < 20; i++ { 192 tc.UnaryCall(ctx, &testpb.SimpleRequest{}) 193 } 194 close(finished) 195 }() 196 te.srv.Stop() 197 <-finished 198 if len(b.pickInfos) != len(b.doneInfo) { 199 t.Fatalf("Got %d picks, %d doneInfo, want equal amount", len(b.pickInfos), len(b.doneInfo)) 200 } 201} 202 203const loadMDKey = "X-Endpoint-Load-Metrics-Bin" 204 205type testLoadParser struct{} 206 207func (*testLoadParser) Parse(md metadata.MD) interface{} { 208 vs := md.Get(loadMDKey) 209 if len(vs) == 0 { 210 return nil 211 } 212 return vs[0] 213} 214 215func init() { 216 balancerload.SetParser(&testLoadParser{}) 217} 218 219func (s) TestDoneLoads(t *testing.T) { 220 for _, e := range listTestEnv() { 221 testDoneLoads(t, e) 222 } 223} 224 225func testDoneLoads(t *testing.T, e env) { 226 b := &testBalancer{} 227 balancer.Register(b) 228 229 const testLoad = "test-load-,-should-be-orca" 230 231 ss := &stubServer{ 232 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 233 grpc.SetTrailer(ctx, metadata.Pairs(loadMDKey, testLoad)) 234 return &testpb.Empty{}, nil 235 }, 236 } 237 if err := ss.Start(nil, grpc.WithBalancerName(testBalancerName)); err != nil { 238 t.Fatalf("error starting testing server: %v", err) 239 } 240 defer ss.Stop() 241 242 tc := testpb.NewTestServiceClient(ss.cc) 243 244 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 245 defer cancel() 246 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { 247 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, nil) 248 } 249 250 piWant := []balancer.PickInfo{ 251 {FullMethodName: "/grpc.testing.TestService/EmptyCall"}, 252 } 253 if !reflect.DeepEqual(b.pickInfos, piWant) { 254 t.Fatalf("b.pickInfos = %v; want %v", b.pickInfos, piWant) 255 } 256 257 if len(b.doneInfo) < 1 { 258 t.Fatalf("b.doneInfo = %v, want length 1", b.doneInfo) 259 } 260 gotLoad, _ := b.doneInfo[0].ServerLoad.(string) 261 if gotLoad != testLoad { 262 t.Fatalf("b.doneInfo[0].ServerLoad = %v; want = %v", b.doneInfo[0].ServerLoad, testLoad) 263 } 264} 265 266const testBalancerKeepAddressesName = "testbalancer-keepingaddresses" 267 268// testBalancerKeepAddresses keeps the addresses in the builder instead of 269// creating SubConns. 270// 271// It's used to test the addresses balancer gets are correct. 272type testBalancerKeepAddresses struct { 273 addrsChan chan []resolver.Address 274} 275 276func newTestBalancerKeepAddresses() *testBalancerKeepAddresses { 277 return &testBalancerKeepAddresses{ 278 addrsChan: make(chan []resolver.Address, 10), 279 } 280} 281 282func (b *testBalancerKeepAddresses) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { 283 return b 284} 285 286func (*testBalancerKeepAddresses) Name() string { 287 return testBalancerKeepAddressesName 288} 289 290func (b *testBalancerKeepAddresses) HandleResolvedAddrs(addrs []resolver.Address, err error) { 291 b.addrsChan <- addrs 292} 293 294func (testBalancerKeepAddresses) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 295 panic("not used") 296} 297 298func (testBalancerKeepAddresses) Close() { 299} 300 301// Make sure that non-grpclb balancers don't get grpclb addresses even if name 302// resolver sends them 303func (s) TestNonGRPCLBBalancerGetsNoGRPCLBAddress(t *testing.T) { 304 r, rcleanup := manual.GenerateAndRegisterManualResolver() 305 defer rcleanup() 306 307 b := newTestBalancerKeepAddresses() 308 balancer.Register(b) 309 310 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), 311 grpc.WithBalancerName(b.Name())) 312 if err != nil { 313 t.Fatalf("failed to dial: %v", err) 314 } 315 defer cc.Close() 316 317 grpclbAddresses := []resolver.Address{{ 318 Addr: "grpc.lb.com", 319 Type: resolver.GRPCLB, 320 ServerName: "grpc.lb.com", 321 }} 322 323 nonGRPCLBAddresses := []resolver.Address{{ 324 Addr: "localhost", 325 Type: resolver.Backend, 326 }} 327 328 r.UpdateState(resolver.State{ 329 Addresses: nonGRPCLBAddresses, 330 }) 331 if got := <-b.addrsChan; !reflect.DeepEqual(got, nonGRPCLBAddresses) { 332 t.Fatalf("With only backend addresses, balancer got addresses %v, want %v", got, nonGRPCLBAddresses) 333 } 334 335 r.UpdateState(resolver.State{ 336 Addresses: grpclbAddresses, 337 }) 338 if got := <-b.addrsChan; len(got) != 0 { 339 t.Fatalf("With only grpclb addresses, balancer got addresses %v, want empty", got) 340 } 341 342 r.UpdateState(resolver.State{ 343 Addresses: append(grpclbAddresses, nonGRPCLBAddresses...), 344 }) 345 if got := <-b.addrsChan; !reflect.DeepEqual(got, nonGRPCLBAddresses) { 346 t.Fatalf("With both backend and grpclb addresses, balancer got addresses %v, want %v", got, nonGRPCLBAddresses) 347 } 348} 349