1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package test 20 21import ( 22 "bufio" 23 "bytes" 24 "compress/gzip" 25 "context" 26 "crypto/tls" 27 "errors" 28 "flag" 29 "fmt" 30 "io" 31 "math" 32 "net" 33 "net/http" 34 "os" 35 "reflect" 36 "runtime" 37 "strings" 38 "sync" 39 "sync/atomic" 40 "syscall" 41 "testing" 42 "time" 43 44 "github.com/golang/protobuf/proto" 45 anypb "github.com/golang/protobuf/ptypes/any" 46 "golang.org/x/net/http2" 47 "golang.org/x/net/http2/hpack" 48 spb "google.golang.org/genproto/googleapis/rpc/status" 49 "google.golang.org/grpc" 50 "google.golang.org/grpc/codes" 51 "google.golang.org/grpc/connectivity" 52 "google.golang.org/grpc/credentials" 53 "google.golang.org/grpc/encoding" 54 _ "google.golang.org/grpc/encoding/gzip" 55 "google.golang.org/grpc/health" 56 healthgrpc "google.golang.org/grpc/health/grpc_health_v1" 57 healthpb "google.golang.org/grpc/health/grpc_health_v1" 58 "google.golang.org/grpc/internal/channelz" 59 "google.golang.org/grpc/internal/grpcsync" 60 "google.golang.org/grpc/internal/grpctest" 61 "google.golang.org/grpc/internal/testutils" 62 "google.golang.org/grpc/internal/transport" 63 "google.golang.org/grpc/metadata" 64 "google.golang.org/grpc/peer" 65 "google.golang.org/grpc/resolver" 66 "google.golang.org/grpc/resolver/manual" 67 "google.golang.org/grpc/serviceconfig" 68 "google.golang.org/grpc/stats" 69 "google.golang.org/grpc/status" 70 
"google.golang.org/grpc/tap" 71 testpb "google.golang.org/grpc/test/grpc_testing" 72 "google.golang.org/grpc/testdata" 73) 74 75const defaultHealthService = "grpc.health.v1.Health" 76 77func init() { 78 channelz.TurnOn() 79} 80 81type s struct { 82 grpctest.Tester 83} 84 85func Test(t *testing.T) { 86 grpctest.RunSubTests(t, s{}) 87} 88 89var ( 90 // For headers: 91 testMetadata = metadata.MD{ 92 "key1": []string{"value1"}, 93 "key2": []string{"value2"}, 94 "key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})}, 95 } 96 testMetadata2 = metadata.MD{ 97 "key1": []string{"value12"}, 98 "key2": []string{"value22"}, 99 } 100 // For trailers: 101 testTrailerMetadata = metadata.MD{ 102 "tkey1": []string{"trailerValue1"}, 103 "tkey2": []string{"trailerValue2"}, 104 "tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})}, 105 } 106 testTrailerMetadata2 = metadata.MD{ 107 "tkey1": []string{"trailerValue12"}, 108 "tkey2": []string{"trailerValue22"}, 109 } 110 // capital "Key" is illegal in HTTP/2. 111 malformedHTTP2Metadata = metadata.MD{ 112 "Key": []string{"foo"}, 113 } 114 testAppUA = "myApp1/1.0 myApp2/0.9" 115 failAppUA = "fail-this-RPC" 116 detailedError = status.ErrorProto(&spb.Status{ 117 Code: int32(codes.DataLoss), 118 Message: "error for testing: " + failAppUA, 119 Details: []*anypb.Any{{ 120 TypeUrl: "url", 121 Value: []byte{6, 0, 0, 6, 1, 3}, 122 }}, 123 }) 124) 125 126var raceMode bool // set by race.go in race mode 127 128type testServer struct { 129 testpb.UnimplementedTestServiceServer 130 131 security string // indicate the authentication protocol used by this server. 132 earlyFail bool // whether to error out the execution of a service handler prematurely. 133 setAndSendHeader bool // whether to call setHeader and sendHeader. 134 setHeaderOnly bool // whether to only call setHeader, not sendHeader. 135 multipleSetTrailer bool // whether to call setTrailer multiple times. 
136 unaryCallSleepTime time.Duration 137} 138 139func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 140 if md, ok := metadata.FromIncomingContext(ctx); ok { 141 // For testing purpose, returns an error if user-agent is failAppUA. 142 // To test that client gets the correct error. 143 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) { 144 return nil, detailedError 145 } 146 var str []string 147 for _, entry := range md["user-agent"] { 148 str = append(str, "ua", entry) 149 } 150 grpc.SendHeader(ctx, metadata.Pairs(str...)) 151 } 152 return new(testpb.Empty), nil 153} 154 155func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) { 156 if size < 0 { 157 return nil, fmt.Errorf("requested a response with invalid length %d", size) 158 } 159 body := make([]byte, size) 160 switch t { 161 case testpb.PayloadType_COMPRESSABLE: 162 case testpb.PayloadType_UNCOMPRESSABLE: 163 return nil, fmt.Errorf("PayloadType UNCOMPRESSABLE is not supported") 164 default: 165 return nil, fmt.Errorf("unsupported payload type: %d", t) 166 } 167 return &testpb.Payload{ 168 Type: t, 169 Body: body, 170 }, nil 171} 172 173func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 174 md, ok := metadata.FromIncomingContext(ctx) 175 if ok { 176 if _, exists := md[":authority"]; !exists { 177 return nil, status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md) 178 } 179 if s.setAndSendHeader { 180 if err := grpc.SetHeader(ctx, md); err != nil { 181 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err) 182 } 183 if err := grpc.SendHeader(ctx, testMetadata2); err != nil { 184 return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err) 185 } 186 } else if s.setHeaderOnly { 187 if err := grpc.SetHeader(ctx, md); err != nil { 188 return nil, 
status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err) 189 } 190 if err := grpc.SetHeader(ctx, testMetadata2); err != nil { 191 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err) 192 } 193 } else { 194 if err := grpc.SendHeader(ctx, md); err != nil { 195 return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err) 196 } 197 } 198 if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil { 199 return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err) 200 } 201 if s.multipleSetTrailer { 202 if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil { 203 return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err) 204 } 205 } 206 } 207 pr, ok := peer.FromContext(ctx) 208 if !ok { 209 return nil, status.Error(codes.DataLoss, "failed to get peer from ctx") 210 } 211 if pr.Addr == net.Addr(nil) { 212 return nil, status.Error(codes.DataLoss, "failed to get peer address") 213 } 214 if s.security != "" { 215 // Check Auth info 216 var authType, serverName string 217 switch info := pr.AuthInfo.(type) { 218 case credentials.TLSInfo: 219 authType = info.AuthType() 220 serverName = info.State.ServerName 221 default: 222 return nil, status.Error(codes.Unauthenticated, "Unknown AuthInfo type") 223 } 224 if authType != s.security { 225 return nil, status.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security) 226 } 227 if serverName != "x.test.youtube.com" { 228 return nil, status.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName) 229 } 230 } 231 // Simulate some service delay. 
232 time.Sleep(s.unaryCallSleepTime) 233 234 payload, err := newPayload(in.GetResponseType(), in.GetResponseSize()) 235 if err != nil { 236 return nil, err 237 } 238 239 return &testpb.SimpleResponse{ 240 Payload: payload, 241 }, nil 242} 243 244func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { 245 if md, ok := metadata.FromIncomingContext(stream.Context()); ok { 246 if _, exists := md[":authority"]; !exists { 247 return status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md) 248 } 249 // For testing purpose, returns an error if user-agent is failAppUA. 250 // To test that client gets the correct error. 251 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) { 252 return status.Error(codes.DataLoss, "error for testing: "+failAppUA) 253 } 254 } 255 cs := args.GetResponseParameters() 256 for _, c := range cs { 257 if us := c.GetIntervalUs(); us > 0 { 258 time.Sleep(time.Duration(us) * time.Microsecond) 259 } 260 261 payload, err := newPayload(args.GetResponseType(), c.GetSize()) 262 if err != nil { 263 return err 264 } 265 266 if err := stream.Send(&testpb.StreamingOutputCallResponse{ 267 Payload: payload, 268 }); err != nil { 269 return err 270 } 271 } 272 return nil 273} 274 275func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error { 276 var sum int 277 for { 278 in, err := stream.Recv() 279 if err == io.EOF { 280 return stream.SendAndClose(&testpb.StreamingInputCallResponse{ 281 AggregatedPayloadSize: int32(sum), 282 }) 283 } 284 if err != nil { 285 return err 286 } 287 p := in.GetPayload().GetBody() 288 sum += len(p) 289 if s.earlyFail { 290 return status.Error(codes.NotFound, "not found") 291 } 292 } 293} 294 295func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 296 md, ok := metadata.FromIncomingContext(stream.Context()) 297 if ok { 298 if 
s.setAndSendHeader { 299 if err := stream.SetHeader(md); err != nil { 300 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err) 301 } 302 if err := stream.SendHeader(testMetadata2); err != nil { 303 return status.Errorf(status.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err) 304 } 305 } else if s.setHeaderOnly { 306 if err := stream.SetHeader(md); err != nil { 307 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err) 308 } 309 if err := stream.SetHeader(testMetadata2); err != nil { 310 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err) 311 } 312 } else { 313 if err := stream.SendHeader(md); err != nil { 314 return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) 315 } 316 } 317 stream.SetTrailer(testTrailerMetadata) 318 if s.multipleSetTrailer { 319 stream.SetTrailer(testTrailerMetadata2) 320 } 321 } 322 for { 323 in, err := stream.Recv() 324 if err == io.EOF { 325 // read done. 326 return nil 327 } 328 if err != nil { 329 // to facilitate testSvrWriteStatusEarlyWrite 330 if status.Code(err) == codes.ResourceExhausted { 331 return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error()) 332 } 333 return err 334 } 335 cs := in.GetResponseParameters() 336 for _, c := range cs { 337 if us := c.GetIntervalUs(); us > 0 { 338 time.Sleep(time.Duration(us) * time.Microsecond) 339 } 340 341 payload, err := newPayload(in.GetResponseType(), c.GetSize()) 342 if err != nil { 343 return err 344 } 345 346 if err := stream.Send(&testpb.StreamingOutputCallResponse{ 347 Payload: payload, 348 }); err != nil { 349 // to facilitate testSvrWriteStatusEarlyWrite 350 if status.Code(err) == codes.ResourceExhausted { 351 return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. 
true error: %s", err.Error()) 352 } 353 return err 354 } 355 } 356 } 357} 358 359func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error { 360 var msgBuf []*testpb.StreamingOutputCallRequest 361 for { 362 in, err := stream.Recv() 363 if err == io.EOF { 364 // read done. 365 break 366 } 367 if err != nil { 368 return err 369 } 370 msgBuf = append(msgBuf, in) 371 } 372 for _, m := range msgBuf { 373 cs := m.GetResponseParameters() 374 for _, c := range cs { 375 if us := c.GetIntervalUs(); us > 0 { 376 time.Sleep(time.Duration(us) * time.Microsecond) 377 } 378 379 payload, err := newPayload(m.GetResponseType(), c.GetSize()) 380 if err != nil { 381 return err 382 } 383 384 if err := stream.Send(&testpb.StreamingOutputCallResponse{ 385 Payload: payload, 386 }); err != nil { 387 return err 388 } 389 } 390 } 391 return nil 392} 393 394type env struct { 395 name string 396 network string // The type of network such as tcp, unix, etc. 397 security string // The security protocol such as TLS, SSH, etc. 398 httpHandler bool // whether to use the http.Handler ServerTransport; requires TLS 399 balancer string // One of "round_robin", "pick_first", or "". 
400 customDialer func(string, string, time.Duration) (net.Conn, error) 401} 402 403func (e env) runnable() bool { 404 if runtime.GOOS == "windows" && e.network == "unix" { 405 return false 406 } 407 return true 408} 409 410func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) { 411 if e.customDialer != nil { 412 return e.customDialer(e.network, addr, timeout) 413 } 414 return net.DialTimeout(e.network, addr, timeout) 415} 416 417var ( 418 tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp"} 419 tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls"} 420 tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"} 421 tcpTLSRREnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"} 422 handlerEnv = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"} 423 noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"} 424 allEnv = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv} 425) 426 427var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.") 428 429func listTestEnv() (envs []env) { 430 if *onlyEnv != "" { 431 for _, e := range allEnv { 432 if e.name == *onlyEnv { 433 if !e.runnable() { 434 panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS)) 435 } 436 return []env{e} 437 } 438 } 439 panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv)) 440 } 441 for _, e := range allEnv { 442 if e.runnable() { 443 envs = append(envs, e) 444 } 445 } 446 return envs 447} 448 449// test is an end-to-end test. It should be created with the newTest 450// func, modified as needed, and then started with its startServer method. 451// It should be cleaned up with the tearDown method. 
452type test struct { 453 // The following are setup in newTest(). 454 t *testing.T 455 e env 456 ctx context.Context // valid for life of test, before tearDown 457 cancel context.CancelFunc 458 459 // The following knobs are for the server-side, and should be set after 460 // calling newTest() and before calling startServer(). 461 462 // whether or not to expose the server's health via the default health 463 // service implementation. 464 enableHealthServer bool 465 // In almost all cases, one should set the 'enableHealthServer' flag above to 466 // expose the server's health using the default health service 467 // implementation. This should only be used when a non-default health service 468 // implementation is required. 469 healthServer healthpb.HealthServer 470 maxStream uint32 471 tapHandle tap.ServerInHandle 472 maxServerMsgSize *int 473 maxServerReceiveMsgSize *int 474 maxServerSendMsgSize *int 475 maxServerHeaderListSize *uint32 476 // Used to test the deprecated API WithCompressor and WithDecompressor. 477 serverCompression bool 478 unknownHandler grpc.StreamHandler 479 unaryServerInt grpc.UnaryServerInterceptor 480 streamServerInt grpc.StreamServerInterceptor 481 serverInitialWindowSize int32 482 serverInitialConnWindowSize int32 483 customServerOptions []grpc.ServerOption 484 485 // The following knobs are for the client-side, and should be set after 486 // calling newTest() and before calling clientConn(). 487 maxClientMsgSize *int 488 maxClientReceiveMsgSize *int 489 maxClientSendMsgSize *int 490 maxClientHeaderListSize *uint32 491 userAgent string 492 // Used to test the deprecated API WithCompressor and WithDecompressor. 493 clientCompression bool 494 // Used to test the new compressor registration API UseCompressor. 495 clientUseCompression bool 496 // clientNopCompression is set to create a compressor whose type is not supported. 
497 clientNopCompression bool 498 unaryClientInt grpc.UnaryClientInterceptor 499 streamClientInt grpc.StreamClientInterceptor 500 sc <-chan grpc.ServiceConfig 501 customCodec encoding.Codec 502 clientInitialWindowSize int32 503 clientInitialConnWindowSize int32 504 perRPCCreds credentials.PerRPCCredentials 505 customDialOptions []grpc.DialOption 506 resolverScheme string 507 508 // All test dialing is blocking by default. Set this to true if dial 509 // should be non-blocking. 510 nonBlockingDial bool 511 512 // These are are set once startServer is called. The common case is to have 513 // only one testServer. 514 srv stopper 515 hSrv healthpb.HealthServer 516 srvAddr string 517 518 // These are are set once startServers is called. 519 srvs []stopper 520 hSrvs []healthpb.HealthServer 521 srvAddrs []string 522 523 cc *grpc.ClientConn // nil until requested via clientConn 524 restoreLogs func() // nil unless declareLogNoise is used 525} 526 527type stopper interface { 528 Stop() 529 GracefulStop() 530} 531 532func (te *test) tearDown() { 533 if te.cancel != nil { 534 te.cancel() 535 te.cancel = nil 536 } 537 538 if te.cc != nil { 539 te.cc.Close() 540 te.cc = nil 541 } 542 543 if te.restoreLogs != nil { 544 te.restoreLogs() 545 te.restoreLogs = nil 546 } 547 548 if te.srv != nil { 549 te.srv.Stop() 550 } 551 for _, s := range te.srvs { 552 s.Stop() 553 } 554} 555 556// newTest returns a new test using the provided testing.T and 557// environment. It is returned with default values. Tests should 558// modify it before calling its startServer and clientConn methods. 
559func newTest(t *testing.T, e env) *test { 560 te := &test{ 561 t: t, 562 e: e, 563 maxStream: math.MaxUint32, 564 } 565 te.ctx, te.cancel = context.WithCancel(context.Background()) 566 return te 567} 568 569func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener { 570 te.t.Helper() 571 te.t.Logf("Running test in %s environment...", te.e.name) 572 sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} 573 if te.maxServerMsgSize != nil { 574 sopts = append(sopts, grpc.MaxMsgSize(*te.maxServerMsgSize)) 575 } 576 if te.maxServerReceiveMsgSize != nil { 577 sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize)) 578 } 579 if te.maxServerSendMsgSize != nil { 580 sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize)) 581 } 582 if te.maxServerHeaderListSize != nil { 583 sopts = append(sopts, grpc.MaxHeaderListSize(*te.maxServerHeaderListSize)) 584 } 585 if te.tapHandle != nil { 586 sopts = append(sopts, grpc.InTapHandle(te.tapHandle)) 587 } 588 if te.serverCompression { 589 sopts = append(sopts, 590 grpc.RPCCompressor(grpc.NewGZIPCompressor()), 591 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), 592 ) 593 } 594 if te.unaryServerInt != nil { 595 sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt)) 596 } 597 if te.streamServerInt != nil { 598 sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt)) 599 } 600 if te.unknownHandler != nil { 601 sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler)) 602 } 603 if te.serverInitialWindowSize > 0 { 604 sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize)) 605 } 606 if te.serverInitialConnWindowSize > 0 { 607 sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize)) 608 } 609 la := "localhost:0" 610 switch te.e.network { 611 case "unix": 612 la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano()) 613 syscall.Unlink(la) 614 } 
615 lis, err := listen(te.e.network, la) 616 if err != nil { 617 te.t.Fatalf("Failed to listen: %v", err) 618 } 619 if te.e.security == "tls" { 620 creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key")) 621 if err != nil { 622 te.t.Fatalf("Failed to generate credentials %v", err) 623 } 624 sopts = append(sopts, grpc.Creds(creds)) 625 } 626 sopts = append(sopts, te.customServerOptions...) 627 s := grpc.NewServer(sopts...) 628 if ts != nil { 629 testpb.RegisterTestServiceServer(s, ts) 630 } 631 632 // Create a new default health server if enableHealthServer is set, or use 633 // the provided one. 634 hs := te.healthServer 635 if te.enableHealthServer { 636 hs = health.NewServer() 637 } 638 if hs != nil { 639 healthgrpc.RegisterHealthServer(s, hs) 640 } 641 642 addr := la 643 switch te.e.network { 644 case "unix": 645 default: 646 _, port, err := net.SplitHostPort(lis.Addr().String()) 647 if err != nil { 648 te.t.Fatalf("Failed to parse listener address: %v", err) 649 } 650 addr = "localhost:" + port 651 } 652 653 te.srv = s 654 te.hSrv = hs 655 te.srvAddr = addr 656 657 if te.e.httpHandler { 658 if te.e.security != "tls" { 659 te.t.Fatalf("unsupported environment settings") 660 } 661 cert, err := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key")) 662 if err != nil { 663 te.t.Fatal("tls.LoadX509KeyPair(server1.pem, server1.key) failed: ", err) 664 } 665 hs := &http.Server{ 666 Handler: s, 667 TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}}, 668 } 669 if err := http2.ConfigureServer(hs, &http2.Server{MaxConcurrentStreams: te.maxStream}); err != nil { 670 te.t.Fatal("http2.ConfigureServer(_, _) failed: ", err) 671 } 672 te.srv = wrapHS{hs} 673 tlsListener := tls.NewListener(lis, hs.TLSConfig) 674 go hs.Serve(tlsListener) 675 return lis 676 } 677 678 go s.Serve(lis) 679 return lis 680} 681 682type wrapHS struct { 683 s *http.Server 684} 685 686func (w wrapHS) GracefulStop() { 
687 w.s.Shutdown(context.Background()) 688} 689 690func (w wrapHS) Stop() { 691 w.s.Close() 692} 693 694func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listenerWrapper { 695 l := te.listenAndServe(ts, listenWithConnControl) 696 return l.(*listenerWrapper) 697} 698 699// startServer starts a gRPC server exposing the provided TestService 700// implementation. Callers should defer a call to te.tearDown to clean up 701func (te *test) startServer(ts testpb.TestServiceServer) { 702 te.t.Helper() 703 te.listenAndServe(ts, net.Listen) 704} 705 706// startServers starts 'num' gRPC servers exposing the provided TestService. 707func (te *test) startServers(ts testpb.TestServiceServer, num int) { 708 for i := 0; i < num; i++ { 709 te.startServer(ts) 710 te.srvs = append(te.srvs, te.srv.(*grpc.Server)) 711 te.hSrvs = append(te.hSrvs, te.hSrv) 712 te.srvAddrs = append(te.srvAddrs, te.srvAddr) 713 te.srv = nil 714 te.hSrv = nil 715 te.srvAddr = "" 716 } 717} 718 719// setHealthServingStatus is a helper function to set the health status. 720func (te *test) setHealthServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) { 721 hs, ok := te.hSrv.(*health.Server) 722 if !ok { 723 panic(fmt.Sprintf("SetServingStatus(%v, %v) called for health server of type %T", service, status, hs)) 724 } 725 hs.SetServingStatus(service, status) 726} 727 728type nopCompressor struct { 729 grpc.Compressor 730} 731 732// NewNopCompressor creates a compressor to test the case that type is not supported. 733func NewNopCompressor() grpc.Compressor { 734 return &nopCompressor{grpc.NewGZIPCompressor()} 735} 736 737func (c *nopCompressor) Type() string { 738 return "nop" 739} 740 741type nopDecompressor struct { 742 grpc.Decompressor 743} 744 745// NewNopDecompressor creates a decompressor to test the case that type is not supported. 
746func NewNopDecompressor() grpc.Decompressor { 747 return &nopDecompressor{grpc.NewGZIPDecompressor()} 748} 749 750func (d *nopDecompressor) Type() string { 751 return "nop" 752} 753 754func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) { 755 opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent)) 756 757 if te.sc != nil { 758 opts = append(opts, grpc.WithServiceConfig(te.sc)) 759 } 760 761 if te.clientCompression { 762 opts = append(opts, 763 grpc.WithCompressor(grpc.NewGZIPCompressor()), 764 grpc.WithDecompressor(grpc.NewGZIPDecompressor()), 765 ) 766 } 767 if te.clientUseCompression { 768 opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) 769 } 770 if te.clientNopCompression { 771 opts = append(opts, 772 grpc.WithCompressor(NewNopCompressor()), 773 grpc.WithDecompressor(NewNopDecompressor()), 774 ) 775 } 776 if te.unaryClientInt != nil { 777 opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt)) 778 } 779 if te.streamClientInt != nil { 780 opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt)) 781 } 782 if te.maxClientMsgSize != nil { 783 opts = append(opts, grpc.WithMaxMsgSize(*te.maxClientMsgSize)) 784 } 785 if te.maxClientReceiveMsgSize != nil { 786 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize))) 787 } 788 if te.maxClientSendMsgSize != nil { 789 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize))) 790 } 791 if te.maxClientHeaderListSize != nil { 792 opts = append(opts, grpc.WithMaxHeaderListSize(*te.maxClientHeaderListSize)) 793 } 794 switch te.e.security { 795 case "tls": 796 creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com") 797 if err != nil { 798 te.t.Fatalf("Failed to load credentials: %v", err) 799 } 800 opts = append(opts, grpc.WithTransportCredentials(creds)) 801 case "empty": 802 // Don't add any 
transport creds option. 803 default: 804 opts = append(opts, grpc.WithInsecure()) 805 } 806 // TODO(bar) switch balancer case "pick_first". 807 var scheme string 808 if te.resolverScheme == "" { 809 scheme = "passthrough:///" 810 } else { 811 scheme = te.resolverScheme + ":///" 812 } 813 if te.e.balancer != "" { 814 opts = append(opts, grpc.WithBalancerName(te.e.balancer)) 815 } 816 if te.clientInitialWindowSize > 0 { 817 opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) 818 } 819 if te.clientInitialConnWindowSize > 0 { 820 opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize)) 821 } 822 if te.perRPCCreds != nil { 823 opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds)) 824 } 825 if te.customCodec != nil { 826 opts = append(opts, grpc.WithDefaultCallOptions(grpc.ForceCodec(te.customCodec))) 827 } 828 if !te.nonBlockingDial && te.srvAddr != "" { 829 // Only do a blocking dial if server is up. 830 opts = append(opts, grpc.WithBlock()) 831 } 832 if te.srvAddr == "" { 833 te.srvAddr = "client.side.only.test" 834 } 835 opts = append(opts, te.customDialOptions...) 836 return opts, scheme 837} 838 839func (te *test) clientConnWithConnControl() (*grpc.ClientConn, *dialerWrapper) { 840 if te.cc != nil { 841 return te.cc, nil 842 } 843 opts, scheme := te.configDial() 844 dw := &dialerWrapper{} 845 // overwrite the dialer before 846 opts = append(opts, grpc.WithDialer(dw.dialer)) 847 var err error 848 te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...) 849 if err != nil { 850 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err) 851 } 852 return te.cc, dw 853} 854 855func (te *test) clientConn(opts ...grpc.DialOption) *grpc.ClientConn { 856 if te.cc != nil { 857 return te.cc 858 } 859 var scheme string 860 opts, scheme = te.configDial(opts...) 861 var err error 862 te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...) 
863 if err != nil { 864 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err) 865 } 866 return te.cc 867} 868 869func (te *test) declareLogNoise(phrases ...string) { 870 te.restoreLogs = declareLogNoise(te.t, phrases...) 871} 872 873func (te *test) withServerTester(fn func(st *serverTester)) { 874 c, err := te.e.dialer(te.srvAddr, 10*time.Second) 875 if err != nil { 876 te.t.Fatal(err) 877 } 878 defer c.Close() 879 if te.e.security == "tls" { 880 c = tls.Client(c, &tls.Config{ 881 InsecureSkipVerify: true, 882 NextProtos: []string{http2.NextProtoTLS}, 883 }) 884 } 885 st := newServerTesterFromConn(te.t, c) 886 st.greet() 887 fn(st) 888} 889 890type lazyConn struct { 891 net.Conn 892 beLazy int32 893} 894 895func (l *lazyConn) Write(b []byte) (int, error) { 896 if atomic.LoadInt32(&(l.beLazy)) == 1 { 897 time.Sleep(time.Second) 898 } 899 return l.Conn.Write(b) 900} 901 902func (s) TestContextDeadlineNotIgnored(t *testing.T) { 903 e := noBalancerEnv 904 var lc *lazyConn 905 e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) { 906 conn, err := net.DialTimeout(network, addr, timeout) 907 if err != nil { 908 return nil, err 909 } 910 lc = &lazyConn{Conn: conn} 911 return lc, nil 912 } 913 914 te := newTest(t, e) 915 te.startServer(&testServer{security: e.security}) 916 defer te.tearDown() 917 918 cc := te.clientConn() 919 tc := testpb.NewTestServiceClient(cc) 920 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 921 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 922 } 923 atomic.StoreInt32(&(lc.beLazy), 1) 924 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 925 defer cancel() 926 t1 := time.Now() 927 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 928 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err) 929 } 930 if time.Since(t1) > 2*time.Second { 931 
t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline")
	}
}

// TestTimeoutOnDeadServer runs testTimeoutOnDeadServer in every environment
// returned by listTestEnv.
func (s) TestTimeoutOnDeadServer(t *testing.T) {
	for _, e := range listTestEnv() {
		testTimeoutOnDeadServer(t, e)
	}
}

// testTimeoutOnDeadServer completes one RPC, stops the server, waits (up to
// 500ms) for the ClientConn to leave the Ready state, and then issues a
// wait-for-ready RPC with a 1ms deadline. When e.balancer is set, that RPC
// must fail with codes.DeadlineExceeded.
func testTimeoutOnDeadServer(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	te.srv.Stop()

	// Wait for the client to notice the connection is gone.
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	state := cc.GetState()
	for ; state == connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
	}
	cancel()
	if state == connectivity.Ready {
		t.Fatalf("Timed out waiting for non-ready state")
	}
	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
	_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
	cancel()
	if e.balancer != "" && status.Code(err) != codes.DeadlineExceeded {
		// If e.balancer == "", the ac will stop reconnecting because the dialer returns non-temp error,
		// the error will be an internal error.
		t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded)
	}
	awaitNewConnLogOutput()
}

// TestServerGracefulStopIdempotent runs testServerGracefulStopIdempotent in
// every environment from listTestEnv except "handler-tls".
func (s) TestServerGracefulStopIdempotent(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerGracefulStopIdempotent(t, e)
	}
}

// testServerGracefulStopIdempotent calls GracefulStop on the same server
// three times in a row.
func testServerGracefulStopIdempotent(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	for i := 0; i < 3; i++ {
		te.srv.GracefulStop()
	}
}

// TestServerGoAway runs testServerGoAway in every environment from
// listTestEnv except "handler-tls".
func (s) TestServerGoAway(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerGoAway(t, e)
	}
}

// testServerGoAway starts a graceful stop on the server, polls with short
// RPCs until one fails with something other than DeadlineExceeded, and then
// requires a fresh RPC to fail with codes.Unavailable or codes.Internal.
func testServerGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Finish an RPC to make sure the connection is good.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		_, err := tc.EmptyCall(ctx, &testpb.Empty{})
		cancel()
		// A DeadlineExceeded only means this probe timed out; keep polling.
		if err != nil && status.Code(err) != codes.DeadlineExceeded {
			break
		}
	}
	// A new RPC should fail.
	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
	}
	<-ch
	awaitNewConnLogOutput()
}

// TestServerGoAwayPendingRPC runs testServerGoAwayPendingRPC in every
// environment from listTestEnv except "handler-tls".
func (s) TestServerGoAwayPendingRPC(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerGoAwayPendingRPC(t, e)
	}
}

// testServerGoAwayPendingRPC opens a FullDuplexCall stream, starts a graceful
// stop on the server, polls for up to one second until a new RPC fails, and
// then checks that the already-open stream can still send and receive. The
// stream's context is canceled before waiting for GracefulStop to return.
func testServerGoAwayPendingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	// Release the stream context even when the test bails out via t.Fatalf.
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	start := time.Now()
	errored := false
	for time.Since(start) < time.Second {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
		cancel()
		if err != nil {
			errored = true
			break
		}
	}
	if !errored {
		t.Fatalf("GoAway never received by client")
	}
	respParam := []*testpb.ResponseParameters{{Size: 1}}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// The existing RPC should be still good to proceed.
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
	}
	// The RPC will run until canceled.
	cancel()
	<-ch
	awaitNewConnLogOutput()
}

// TestServerMultipleGoAwayPendingRPC runs testServerMultipleGoAwayPendingRPC
// in every environment from listTestEnv except "handler-tls".
func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerMultipleGoAwayPendingRPC(t, e)
	}
}

// testServerMultipleGoAwayPendingRPC issues two concurrent GracefulStop calls
// while a FullDuplexCall stream is open. It requires that neither call has
// returned once new RPCs start failing, that the open stream can still send,
// receive and half-close, and then waits for both GracefulStop calls.
func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithCancel(context.Background())
	// Release the stream context even when the test bails out via t.Fatalf.
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch1 := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch1)
	}()
	ch2 := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch2)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
		cancel()
		if err != nil {
			break
		}
	}
	// Non-blocking check: neither GracefulStop may have returned yet.
	select {
	case <-ch1:
		t.Fatal("GracefulStop() terminated early")
	case <-ch2:
		t.Fatal("GracefulStop() terminated early")
	default:
	}
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// The existing RPC should be still good to proceed.
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
	}
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
	}
	<-ch1
	<-ch2
	cancel()
	awaitNewConnLogOutput()
}

// TestConcurrentClientConnCloseAndServerGoAway runs
// testConcurrentClientConnCloseAndServerGoAway in every environment from
// listTestEnv except "handler-tls".
func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testConcurrentClientConnCloseAndServerGoAway(t, e)
	}
}

// testConcurrentClientConnCloseAndServerGoAway completes one RPC and then
// runs the server's GracefulStop and the ClientConn's Close in separate
// goroutines, waiting only for GracefulStop to return.
func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch := make(chan struct{})
	// Close ClientConn and Server concurrently.
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	go func() {
		cc.Close()
	}()
	<-ch
}

// TestConcurrentServerStopAndGoAway runs testConcurrentServerStopAndGoAway in
// every environment from listTestEnv except "handler-tls".
func (s) TestConcurrentServerStopAndGoAway(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testConcurrentServerStopAndGoAway(t, e)
	}
}

// testConcurrentServerStopAndGoAway starts a graceful stop while a
// FullDuplexCall stream is open, then calls Stop once new RPCs start failing.
// It requires Send on the open stream to return io.EOF within two seconds and
// the following Recv to return a non-nil, non-EOF error.
func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	stream, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
		cancel()
		if err != nil {
			break
		}
	}
	// Stop the server and close all the connections.
	te.srv.Stop()
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	sendStart := time.Now()
	for {
		if err := stream.Send(req); err == io.EOF {
			// stream.Send should eventually send io.EOF
			break
		} else if err != nil {
			// Send should never return a transport-level error.
			t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err)
		}
		if time.Since(sendStart) > 2*time.Second {
			t.Fatalf("stream.Send(_) did not return io.EOF after 2s")
		}
		time.Sleep(time.Millisecond)
	}
	if _, err := stream.Recv(); err == nil || err == io.EOF {
		t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err)
	}
	<-ch
	awaitNewConnLogOutput()
}

// TestClientConnCloseAfterGoAwayWithActiveStream runs
// testClientConnCloseAfterGoAwayWithActiveStream in every environment from
// listTestEnv except "handler-tls".
func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testClientConnCloseAfterGoAwayWithActiveStream(t, e)
	}
}

// testClientConnCloseAfterGoAwayWithActiveStream opens a FullDuplexCall
// stream, starts a graceful stop on the server, closes the ClientConn 50ms
// later, and requires GracefulStop to return within one second of the close.
func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if _, err := tc.FullDuplexCall(ctx); err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
	}
	done := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(done)
	}()
	time.Sleep(50 * time.Millisecond)
	cc.Close()
	timeout := time.NewTimer(time.Second)
	// Release the timer as soon as the test finishes instead of letting it
	// run out on its own.
	defer timeout.Stop()
	select {
	case <-done:
	case <-timeout.C:
		t.Fatalf("Test timed-out.")
	}
}

// TestFailFast runs testFailFast in every environment returned by
// listTestEnv.
func (s) TestFailFast(t *testing.T) {
	for _, e := range listTestEnv() {
		testFailFast(t, e)
	}
}

// testFailFast completes one RPC, stops the server, polls until an RPC fails
// with codes.Unavailable, and then requires both a unary and a streaming RPC
// issued without WaitForReady to fail with codes.Unavailable.
func testFailFast(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	// Stop the server and tear down all the existing connections.
	te.srv.Stop()
	// Loop until the server teardown is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		_, err := tc.EmptyCall(ctx, &testpb.Empty{})
		cancel()
		if status.Code(err) == codes.Unavailable {
			break
		}
		t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err)
		time.Sleep(10 * time.Millisecond)
	}
	// The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable)
	}
	if _, err := tc.StreamingInputCall(context.Background()); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable)
	}

	awaitNewConnLogOutput()
}

// testServiceConfigSetup returns a new test for environment e with the test
// user agent set and the log lines expected from the service config tests
// declared as noise. It does not start a server.
func testServiceConfigSetup(t *testing.T, e env) *test {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
		"Failed to dial : context canceled; please retry.",
	)
	return te
}

// newBool returns a pointer to a copy of b.
func newBool(b bool) *bool {
	return &b
}

// newInt returns a pointer to a copy of b.
func newInt(b int) *int {
	return &b
}

// newDuration returns a pointer to a copy of b.
func newDuration(b time.Duration) *time.Duration {
	return &b
}

// TestGetMethodConfig pushes two service configs through a manual resolver
// and checks how EmptyCall behaves under each. With a method-level entry
// (waitForReady true, 1ms timeout) the RPC must fail with DeadlineExceeded;
// once only the service-level entry (waitForReady false) matches EmptyCall,
// the RPC must fail with Unavailable.
func (s) TestGetMethodConfig(t *testing.T) {
	te := testServiceConfigSetup(t, tcpClearRREnv)
	defer te.tearDown()
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	te.resolverScheme = r.Scheme()
	cc := te.clientConn()
	addrs := []resolver.Address{{Addr: te.srvAddr}}
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                }
            ],
            "waitForReady": true,
            "timeout": ".001s"
        },
        {
            "name": [
                {
                    "service": "grpc.testing.TestService"
                }
            ],
            "waitForReady": false
        }
    ]
}`)})

	tc := testpb.NewTestServiceClient(cc)

	// Make sure service config has been processed by grpc.
	for {
		if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	var err error
	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}

	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "UnaryCall"
                }
            ],
            "waitForReady": true,
            "timeout": ".001s"
        },
        {
            "name": [
                {
                    "service": "grpc.testing.TestService"
                }
            ],
            "waitForReady": false
        }
    ]
}`)})

	// Make sure service config has been processed by grpc.
	for {
		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady {
			break
		}
		time.Sleep(time.Millisecond)
	}
	// The following RPCs are expected to become fail-fast.
	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
	}
}

// TestServiceConfigWaitForReady checks how the per-call WaitForReady option
// and the service config's waitForReady field combine. In both cases below
// the RPCs carry a 1ms timeout from the service config and are required to
// fail with DeadlineExceeded.
func (s) TestServiceConfigWaitForReady(t *testing.T) {
	te := testServiceConfigSetup(t, tcpClearRREnv)
	defer te.tearDown()
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
	te.resolverScheme = r.Scheme()
	cc := te.clientConn()
	addrs := []resolver.Address{{Addr: te.srvAddr}}
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                },
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "waitForReady": false,
            "timeout": ".001s"
        }
    ]
}`)})

	tc := testpb.NewTestServiceClient(cc)

	// Make sure service config has been processed by grpc.
	for {
		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	var err error
	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}

	// Generate a service config update.
	// Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                },
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "waitForReady": true,
            "timeout": ".001s"
        }
    ]
}`)})

	// Wait for the new service config to take effect.
	for {
		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady {
			break
		}
		time.Sleep(time.Millisecond)
	}
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
}

// TestServiceConfigTimeout checks that the effective RPC deadline is the
// smaller of the context timeout and the service config timeout: each
// combination of (1ns, 1hr) below is required to fail with DeadlineExceeded.
func (s) TestServiceConfigTimeout(t *testing.T) {
	te := testServiceConfigSetup(t, tcpClearRREnv)
	defer te.tearDown()
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	te.resolverScheme = r.Scheme()
	cc := te.clientConn()
	addrs := []resolver.Address{{Addr: te.srvAddr}}
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                },
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "waitForReady": true,
            "timeout": "3600s"
        }
    ]
}`)})

	tc := testpb.NewTestServiceClient(cc)

	// Make sure service config has been processed by grpc.
	for {
		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
	var err error
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()

	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()

	// Generate a service config update.
	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                },
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "waitForReady": true,
            "timeout": ".000000001s"
        }
    ]
}`)})

	// Wait for the new service config to take effect.
	for {
		if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond {
			break
		}
		time.Sleep(time.Millisecond)
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()

	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()
}

// TestServiceConfigMaxMsgSize checks how the service config's
// maxRequestMessageBytes / maxResponseMessageBytes (2048 in every case)
// combine with the client's own send/receive limits. Three clients are
// exercised: no client limit, a 1024-byte client limit, and a 4096-byte
// client limit. Each is probed with unary and streaming RPCs in both the
// send and the receive direction.
func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
	e := tcpClearRREnv
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	// Setting up values and objects shared across all test cases.
	const smallSize = 1
	const largeSize = 1024
	const extraLargeSize = 2048

	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}
	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
	if err != nil {
		t.Fatal(err)
	}
	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
	if err != nil {
		t.Fatal(err)
	}

	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te1 := testServiceConfigSetup(t, e)
	defer te1.tearDown()

	te1.resolverScheme = r.Scheme()
	te1.nonBlockingDial = true
	te1.startServer(&testServer{security: e.security})
	cc1 := te1.clientConn()

	addrs := []resolver.Address{{Addr: te1.srvAddr}}
	sc := parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "UnaryCall"
                },
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "maxRequestMessageBytes": 2048,
            "maxResponseMessageBytes": 2048
        }
    ]
}`)
	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
	tc := testpb.NewTestServiceClient(cc1)

	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(extraLargeSize),
		Payload:      smallPayload,
	}

	for {
		if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// Test for unary RPC recv.
	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = extraLargePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	respParam := []*testpb.ResponseParameters{
		{
			Size: int32(extraLargeSize),
		},
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            smallPayload,
	}
	stream, err := tc.FullDuplexCall(te1.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err = stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = extraLargePayload
	stream, err = tc.FullDuplexCall(te1.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}

	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te2 := testServiceConfigSetup(t, e)
	te2.resolverScheme = r.Scheme()
	te2.nonBlockingDial = true
	te2.maxClientReceiveMsgSize = newInt(1024)
	te2.maxClientSendMsgSize = newInt(1024)

	te2.startServer(&testServer{security: e.security})
	defer te2.tearDown()
	cc2 := te2.clientConn()
	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: sc})
	tc = testpb.NewTestServiceClient(cc2)

	for {
		if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// Test for unary RPC recv.
	req.Payload = smallPayload
	req.ResponseSize = int32(largeSize)

	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	// Prepare the request before opening the stream so that the error from
	// FullDuplexCall is checked immediately after the call.
	respParam[0].Size = int32(largeSize)
	sreq.Payload = smallPayload
	stream, err = tc.FullDuplexCall(te2.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err = stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te2.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}

	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te3 := testServiceConfigSetup(t, e)
	te3.resolverScheme = r.Scheme()
	te3.nonBlockingDial = true
	te3.maxClientReceiveMsgSize = newInt(4096)
	te3.maxClientSendMsgSize = newInt(4096)

	te3.startServer(&testServer{security: e.security})
	defer te3.tearDown()

	cc3 := te3.clientConn()
	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: sc})
	tc = testpb.NewTestServiceClient(cc3)

	for {
		if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// Test for unary RPC recv.
	req.Payload = smallPayload
	req.ResponseSize = int32(largeSize)

	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
	}

	req.ResponseSize = int32(extraLargeSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
	}

	req.Payload = extraLargePayload
	if _, err = tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	stream, err = tc.FullDuplexCall(te3.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	respParam[0].Size = int32(largeSize)
	sreq.Payload = smallPayload

	if err = stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err = stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
	}

	respParam[0].Size = int32(extraLargeSize)

	if err = stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te3.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	sreq.Payload = extraLargePayload
	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}
}

// Reading from a streaming RPC may fail with context canceled if timeout was
// set by service config (https://github.com/grpc/grpc-go/issues/1818). This
// test makes sure read from streaming RPC doesn't fail in this case.
func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
	te := testServiceConfigSetup(t, tcpClearRREnv)
	te.startServer(&testServer{security: tcpClearRREnv.security})
	defer te.tearDown()
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	te.resolverScheme = r.Scheme()
	te.nonBlockingDial = true
	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)

	r.UpdateState(resolver.State{
		Addresses: []resolver.Address{{Addr: te.srvAddr}},
		ServiceConfig: parseCfg(r, `{
	    "methodConfig": [
	        {
	            "name": [
	                {
	                    "service": "grpc.testing.TestService",
	                    "method": "FullDuplexCall"
	                }
	            ],
	            "waitForReady": true,
	            "timeout": "10s"
	        }
	    ]
	}`)})
	// Make sure service config has been processed by grpc.
	for {
		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
	}

	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0)
	if err != nil {
		t.Fatalf("failed to newPayload: %v", err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: []*testpb.ResponseParameters{{Size: 0}},
		Payload:            payload,
	}
	if err := stream.Send(req); err != nil {
		t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err)
	}
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("stream.CloseSend() = %v, want <nil>", err)
	}
	// Sleep 1 second before recv to make sure the final status is received
	// before the recv.
	time.Sleep(time.Second)
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("stream.Recv = _, %v, want _, <nil>", err)
	}
	// Keep reading to drain the stream.
	for {
		if _, err := stream.Recv(); err != nil {
			break
		}
	}
}

// TestPreloaderClientSend runs testPreloaderClientSend in every environment
// returned by listTestEnv.
func (s) TestPreloaderClientSend(t *testing.T) {
	for _, e := range listTestEnv() {
		testPreloaderClientSend(t, e)
	}
}

func testPreloaderClientSend(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
		"Failed to dial : context canceled; please retry.",
	)
	te.startServer(&testServer{security: e.security})

	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	// Test for streaming RPC recv.
	// Set context for send with proper RPC Information
	stream, err := tc.FullDuplexCall(te.ctx, grpc.UseCompressor("gzip"))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	var index int
	for index < len(reqSizes) {
		respParam := []*testpb.ResponseParameters{
			{
				Size: int32(respSizes[index]),
			},
		}

		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
		if err != nil {
			t.Fatal(err)
		}

		req := &testpb.StreamingOutputCallRequest{
			ResponseType:       testpb.PayloadType_COMPRESSABLE,
			ResponseParameters: respParam,
			Payload:            payload,
		}
		preparedMsg := &grpc.PreparedMsg{}
		err = preparedMsg.Encode(stream, req)
		if err != nil {
			t.Fatalf("PrepareMsg failed for size %d : %v", reqSizes[index], err)
		}
		if err := stream.SendMsg(preparedMsg); err != nil {
			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
		}
		reply, err := stream.Recv()
		if err != nil {
			t.Fatalf("%v.Recv() = %v, want <nil>",
stream, err) 2095 } 2096 pt := reply.GetPayload().GetType() 2097 if pt != testpb.PayloadType_COMPRESSABLE { 2098 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) 2099 } 2100 size := len(reply.GetPayload().GetBody()) 2101 if size != int(respSizes[index]) { 2102 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) 2103 } 2104 index++ 2105 } 2106 if err := stream.CloseSend(); err != nil { 2107 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 2108 } 2109 if _, err := stream.Recv(); err != io.EOF { 2110 t.Fatalf("%v failed to complele the ping pong test: %v", stream, err) 2111 } 2112} 2113 2114func (s) TestMaxMsgSizeClientDefault(t *testing.T) { 2115 for _, e := range listTestEnv() { 2116 testMaxMsgSizeClientDefault(t, e) 2117 } 2118} 2119 2120func testMaxMsgSizeClientDefault(t *testing.T, e env) { 2121 te := newTest(t, e) 2122 te.userAgent = testAppUA 2123 te.declareLogNoise( 2124 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2125 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2126 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2127 "Failed to dial : context canceled; please retry.", 2128 ) 2129 te.startServer(&testServer{security: e.security}) 2130 2131 defer te.tearDown() 2132 tc := testpb.NewTestServiceClient(te.clientConn()) 2133 2134 const smallSize = 1 2135 const largeSize = 4 * 1024 * 1024 2136 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2137 if err != nil { 2138 t.Fatal(err) 2139 } 2140 req := &testpb.SimpleRequest{ 2141 ResponseType: testpb.PayloadType_COMPRESSABLE, 2142 ResponseSize: int32(largeSize), 2143 Payload: smallPayload, 2144 } 2145 // Test for unary RPC recv. 
2146 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2147 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2148 } 2149 2150 respParam := []*testpb.ResponseParameters{ 2151 { 2152 Size: int32(largeSize), 2153 }, 2154 } 2155 sreq := &testpb.StreamingOutputCallRequest{ 2156 ResponseType: testpb.PayloadType_COMPRESSABLE, 2157 ResponseParameters: respParam, 2158 Payload: smallPayload, 2159 } 2160 2161 // Test for streaming RPC recv. 2162 stream, err := tc.FullDuplexCall(te.ctx) 2163 if err != nil { 2164 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2165 } 2166 if err := stream.Send(sreq); err != nil { 2167 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2168 } 2169 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2170 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2171 } 2172} 2173 2174func (s) TestMaxMsgSizeClientAPI(t *testing.T) { 2175 for _, e := range listTestEnv() { 2176 testMaxMsgSizeClientAPI(t, e) 2177 } 2178} 2179 2180func testMaxMsgSizeClientAPI(t *testing.T, e env) { 2181 te := newTest(t, e) 2182 te.userAgent = testAppUA 2183 // To avoid error on server side. 
2184 te.maxServerSendMsgSize = newInt(5 * 1024 * 1024) 2185 te.maxClientReceiveMsgSize = newInt(1024) 2186 te.maxClientSendMsgSize = newInt(1024) 2187 te.declareLogNoise( 2188 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2189 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2190 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2191 "Failed to dial : context canceled; please retry.", 2192 ) 2193 te.startServer(&testServer{security: e.security}) 2194 2195 defer te.tearDown() 2196 tc := testpb.NewTestServiceClient(te.clientConn()) 2197 2198 const smallSize = 1 2199 const largeSize = 1024 2200 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2201 if err != nil { 2202 t.Fatal(err) 2203 } 2204 2205 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 2206 if err != nil { 2207 t.Fatal(err) 2208 } 2209 req := &testpb.SimpleRequest{ 2210 ResponseType: testpb.PayloadType_COMPRESSABLE, 2211 ResponseSize: int32(largeSize), 2212 Payload: smallPayload, 2213 } 2214 // Test for unary RPC recv. 2215 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2216 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2217 } 2218 2219 // Test for unary RPC send. 
2220 req.Payload = largePayload 2221 req.ResponseSize = int32(smallSize) 2222 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2223 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2224 } 2225 2226 respParam := []*testpb.ResponseParameters{ 2227 { 2228 Size: int32(largeSize), 2229 }, 2230 } 2231 sreq := &testpb.StreamingOutputCallRequest{ 2232 ResponseType: testpb.PayloadType_COMPRESSABLE, 2233 ResponseParameters: respParam, 2234 Payload: smallPayload, 2235 } 2236 2237 // Test for streaming RPC recv. 2238 stream, err := tc.FullDuplexCall(te.ctx) 2239 if err != nil { 2240 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2241 } 2242 if err := stream.Send(sreq); err != nil { 2243 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2244 } 2245 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2246 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2247 } 2248 2249 // Test for streaming RPC send. 
2250 respParam[0].Size = int32(smallSize) 2251 sreq.Payload = largePayload 2252 stream, err = tc.FullDuplexCall(te.ctx) 2253 if err != nil { 2254 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2255 } 2256 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 2257 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 2258 } 2259} 2260 2261func (s) TestMaxMsgSizeServerAPI(t *testing.T) { 2262 for _, e := range listTestEnv() { 2263 testMaxMsgSizeServerAPI(t, e) 2264 } 2265} 2266 2267func testMaxMsgSizeServerAPI(t *testing.T, e env) { 2268 te := newTest(t, e) 2269 te.userAgent = testAppUA 2270 te.maxServerReceiveMsgSize = newInt(1024) 2271 te.maxServerSendMsgSize = newInt(1024) 2272 te.declareLogNoise( 2273 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2274 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2275 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2276 "Failed to dial : context canceled; please retry.", 2277 ) 2278 te.startServer(&testServer{security: e.security}) 2279 2280 defer te.tearDown() 2281 tc := testpb.NewTestServiceClient(te.clientConn()) 2282 2283 const smallSize = 1 2284 const largeSize = 1024 2285 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2286 if err != nil { 2287 t.Fatal(err) 2288 } 2289 2290 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 2291 if err != nil { 2292 t.Fatal(err) 2293 } 2294 req := &testpb.SimpleRequest{ 2295 ResponseType: testpb.PayloadType_COMPRESSABLE, 2296 ResponseSize: int32(largeSize), 2297 Payload: smallPayload, 2298 } 2299 // Test for unary RPC send. 
2300 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2301 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2302 } 2303 2304 // Test for unary RPC recv. 2305 req.Payload = largePayload 2306 req.ResponseSize = int32(smallSize) 2307 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2308 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2309 } 2310 2311 respParam := []*testpb.ResponseParameters{ 2312 { 2313 Size: int32(largeSize), 2314 }, 2315 } 2316 sreq := &testpb.StreamingOutputCallRequest{ 2317 ResponseType: testpb.PayloadType_COMPRESSABLE, 2318 ResponseParameters: respParam, 2319 Payload: smallPayload, 2320 } 2321 2322 // Test for streaming RPC send. 2323 stream, err := tc.FullDuplexCall(te.ctx) 2324 if err != nil { 2325 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2326 } 2327 if err := stream.Send(sreq); err != nil { 2328 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2329 } 2330 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2331 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2332 } 2333 2334 // Test for streaming RPC recv. 
2335 respParam[0].Size = int32(smallSize) 2336 sreq.Payload = largePayload 2337 stream, err = tc.FullDuplexCall(te.ctx) 2338 if err != nil { 2339 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2340 } 2341 if err := stream.Send(sreq); err != nil { 2342 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2343 } 2344 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2345 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2346 } 2347} 2348 2349func (s) TestTap(t *testing.T) { 2350 for _, e := range listTestEnv() { 2351 if e.name == "handler-tls" { 2352 continue 2353 } 2354 testTap(t, e) 2355 } 2356} 2357 2358type myTap struct { 2359 cnt int 2360} 2361 2362func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) { 2363 if info != nil { 2364 if info.FullMethodName == "/grpc.testing.TestService/EmptyCall" { 2365 t.cnt++ 2366 } else if info.FullMethodName == "/grpc.testing.TestService/UnaryCall" { 2367 return nil, fmt.Errorf("tap error") 2368 } 2369 } 2370 return ctx, nil 2371} 2372 2373func testTap(t *testing.T, e env) { 2374 te := newTest(t, e) 2375 te.userAgent = testAppUA 2376 ttap := &myTap{} 2377 te.tapHandle = ttap.handle 2378 te.declareLogNoise( 2379 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2380 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2381 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2382 ) 2383 te.startServer(&testServer{security: e.security}) 2384 defer te.tearDown() 2385 2386 cc := te.clientConn() 2387 tc := testpb.NewTestServiceClient(cc) 2388 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 2389 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 2390 } 2391 if ttap.cnt != 1 { 2392 t.Fatalf("Get the count in ttap %d, want 1", ttap.cnt) 2393 } 2394 2395 
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31) 2396 if err != nil { 2397 t.Fatal(err) 2398 } 2399 2400 req := &testpb.SimpleRequest{ 2401 ResponseType: testpb.PayloadType_COMPRESSABLE, 2402 ResponseSize: 45, 2403 Payload: payload, 2404 } 2405 if _, err := tc.UnaryCall(context.Background(), req); status.Code(err) != codes.Unavailable { 2406 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) 2407 } 2408} 2409 2410// healthCheck is a helper function to make a unary health check RPC and return 2411// the response. 2412func healthCheck(d time.Duration, cc *grpc.ClientConn, service string) (*healthpb.HealthCheckResponse, error) { 2413 ctx, cancel := context.WithTimeout(context.Background(), d) 2414 defer cancel() 2415 hc := healthgrpc.NewHealthClient(cc) 2416 return hc.Check(ctx, &healthpb.HealthCheckRequest{Service: service}) 2417} 2418 2419// verifyHealthCheckStatus is a helper function to verify that the current 2420// health status of the service matches the one passed in 'wantStatus'. 2421func verifyHealthCheckStatus(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantStatus healthpb.HealthCheckResponse_ServingStatus) { 2422 t.Helper() 2423 resp, err := healthCheck(d, cc, service) 2424 if err != nil { 2425 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) 2426 } 2427 if resp.Status != wantStatus { 2428 t.Fatalf("Got the serving status %v, want %v", resp.Status, wantStatus) 2429 } 2430} 2431 2432// verifyHealthCheckErrCode is a helper function to verify that a unary health 2433// check RPC returns an error with a code set to 'wantCode'. 
2434func verifyHealthCheckErrCode(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantCode codes.Code) { 2435 t.Helper() 2436 if _, err := healthCheck(d, cc, service); status.Code(err) != wantCode { 2437 t.Fatalf("Health/Check() got errCode %v, want %v", status.Code(err), wantCode) 2438 } 2439} 2440 2441// newHealthCheckStream is a helper function to start a health check streaming 2442// RPC, and returns the stream. 2443func newHealthCheckStream(t *testing.T, cc *grpc.ClientConn, service string) (healthgrpc.Health_WatchClient, context.CancelFunc) { 2444 t.Helper() 2445 ctx, cancel := context.WithCancel(context.Background()) 2446 hc := healthgrpc.NewHealthClient(cc) 2447 stream, err := hc.Watch(ctx, &healthpb.HealthCheckRequest{Service: service}) 2448 if err != nil { 2449 t.Fatalf("hc.Watch(_, %v) failed: %v", service, err) 2450 } 2451 return stream, cancel 2452} 2453 2454// healthWatchChecker is a helper function to verify that the next health 2455// status returned on the given stream matches the one passed in 'wantStatus'. 2456func healthWatchChecker(t *testing.T, stream healthgrpc.Health_WatchClient, wantStatus healthpb.HealthCheckResponse_ServingStatus) { 2457 t.Helper() 2458 response, err := stream.Recv() 2459 if err != nil { 2460 t.Fatalf("stream.Recv() failed: %v", err) 2461 } 2462 if response.Status != wantStatus { 2463 t.Fatalf("got servingStatus %v, want %v", response.Status, wantStatus) 2464 } 2465} 2466 2467// TestHealthCheckSuccess invokes the unary Check() RPC on the health server in 2468// a successful case. 
2469func (s) TestHealthCheckSuccess(t *testing.T) { 2470 for _, e := range listTestEnv() { 2471 testHealthCheckSuccess(t, e) 2472 } 2473} 2474 2475func testHealthCheckSuccess(t *testing.T, e env) { 2476 te := newTest(t, e) 2477 te.enableHealthServer = true 2478 te.startServer(&testServer{security: e.security}) 2479 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) 2480 defer te.tearDown() 2481 2482 verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.OK) 2483} 2484 2485// TestHealthCheckFailure invokes the unary Check() RPC on the health server 2486// with an expired context and expects the RPC to fail. 2487func (s) TestHealthCheckFailure(t *testing.T) { 2488 for _, e := range listTestEnv() { 2489 testHealthCheckFailure(t, e) 2490 } 2491} 2492 2493func testHealthCheckFailure(t *testing.T, e env) { 2494 te := newTest(t, e) 2495 te.declareLogNoise( 2496 "Failed to dial ", 2497 "grpc: the client connection is closing; please retry", 2498 ) 2499 te.enableHealthServer = true 2500 te.startServer(&testServer{security: e.security}) 2501 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) 2502 defer te.tearDown() 2503 2504 verifyHealthCheckErrCode(t, 0*time.Second, te.clientConn(), defaultHealthService, codes.DeadlineExceeded) 2505 awaitNewConnLogOutput() 2506} 2507 2508// TestHealthCheckOff makes a unary Check() RPC on the health server where the 2509// health status of the defaultHealthService is not set, and therefore expects 2510// an error code 'codes.NotFound'. 2511func (s) TestHealthCheckOff(t *testing.T) { 2512 for _, e := range listTestEnv() { 2513 // TODO(bradfitz): Temporarily skip this env due to #619. 
2514 if e.name == "handler-tls" { 2515 continue 2516 } 2517 testHealthCheckOff(t, e) 2518 } 2519} 2520 2521func testHealthCheckOff(t *testing.T, e env) { 2522 te := newTest(t, e) 2523 te.enableHealthServer = true 2524 te.startServer(&testServer{security: e.security}) 2525 defer te.tearDown() 2526 2527 verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.NotFound) 2528} 2529 2530// TestHealthWatchMultipleClients makes a streaming Watch() RPC on the health 2531// server with multiple clients and expects the same status on both streams. 2532func (s) TestHealthWatchMultipleClients(t *testing.T) { 2533 for _, e := range listTestEnv() { 2534 testHealthWatchMultipleClients(t, e) 2535 } 2536} 2537 2538func testHealthWatchMultipleClients(t *testing.T, e env) { 2539 te := newTest(t, e) 2540 te.enableHealthServer = true 2541 te.startServer(&testServer{security: e.security}) 2542 defer te.tearDown() 2543 2544 cc := te.clientConn() 2545 stream1, cf1 := newHealthCheckStream(t, cc, defaultHealthService) 2546 defer cf1() 2547 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2548 2549 stream2, cf2 := newHealthCheckStream(t, cc, defaultHealthService) 2550 defer cf2() 2551 healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2552 2553 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) 2554 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING) 2555 healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING) 2556} 2557 2558// TestHealthWatchSameStatusmakes a streaming Watch() RPC on the health server 2559// and makes sure that the health status of the server is as expected after 2560// multiple calls to SetServingStatus with the same status. 
2561func (s) TestHealthWatchSameStatus(t *testing.T) { 2562 for _, e := range listTestEnv() { 2563 testHealthWatchSameStatus(t, e) 2564 } 2565} 2566 2567func testHealthWatchSameStatus(t *testing.T, e env) { 2568 te := newTest(t, e) 2569 te.enableHealthServer = true 2570 te.startServer(&testServer{security: e.security}) 2571 defer te.tearDown() 2572 2573 stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) 2574 defer cf() 2575 2576 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2577 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) 2578 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2579 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) 2580 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) 2581 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING) 2582} 2583 2584// TestHealthWatchServiceStatusSetBeforeStartingServer starts a health server 2585// on which the health status for the defaultService is set before the gRPC 2586// server is started, and expects the correct health status to be returned. 
2587func (s) TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) { 2588 for _, e := range listTestEnv() { 2589 testHealthWatchSetServiceStatusBeforeStartingServer(t, e) 2590 } 2591} 2592 2593func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) { 2594 hs := health.NewServer() 2595 te := newTest(t, e) 2596 te.healthServer = hs 2597 hs.SetServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) 2598 te.startServer(&testServer{security: e.security}) 2599 defer te.tearDown() 2600 2601 stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) 2602 defer cf() 2603 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2604} 2605 2606// TestHealthWatchDefaultStatusChange verifies the simple case where the 2607// service starts off with a SERVICE_UNKNOWN status (because SetServingStatus 2608// hasn't been called yet) and then moves to SERVING after SetServingStatus is 2609// called. 2610func (s) TestHealthWatchDefaultStatusChange(t *testing.T) { 2611 for _, e := range listTestEnv() { 2612 testHealthWatchDefaultStatusChange(t, e) 2613 } 2614} 2615 2616func testHealthWatchDefaultStatusChange(t *testing.T, e env) { 2617 te := newTest(t, e) 2618 te.enableHealthServer = true 2619 te.startServer(&testServer{security: e.security}) 2620 defer te.tearDown() 2621 2622 stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) 2623 defer cf() 2624 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2625 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) 2626 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2627} 2628 2629// TestHealthWatchSetServiceStatusBeforeClientCallsWatch verifies the case 2630// where the health status is set to SERVING before the client calls Watch(). 
2631func (s) TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) { 2632 for _, e := range listTestEnv() { 2633 testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e) 2634 } 2635} 2636 2637func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) { 2638 te := newTest(t, e) 2639 te.enableHealthServer = true 2640 te.startServer(&testServer{security: e.security}) 2641 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) 2642 defer te.tearDown() 2643 2644 stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService) 2645 defer cf() 2646 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2647} 2648 2649// TestHealthWatchOverallServerHealthChange verifies setting the overall status 2650// of the server by using the empty service name. 2651func (s) TestHealthWatchOverallServerHealthChange(t *testing.T) { 2652 for _, e := range listTestEnv() { 2653 testHealthWatchOverallServerHealthChange(t, e) 2654 } 2655} 2656 2657func testHealthWatchOverallServerHealthChange(t *testing.T, e env) { 2658 te := newTest(t, e) 2659 te.enableHealthServer = true 2660 te.startServer(&testServer{security: e.security}) 2661 defer te.tearDown() 2662 2663 stream, cf := newHealthCheckStream(t, te.clientConn(), "") 2664 defer cf() 2665 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2666 te.setHealthServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) 2667 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING) 2668} 2669 2670// TestUnknownHandler verifies that an expected error is returned (by setting 2671// the unknownHandler on the server) for a service which is not exposed to the 2672// client. 2673func (s) TestUnknownHandler(t *testing.T) { 2674 // An example unknownHandler that returns a different code and a different 2675 // method, making sure that we do not expose what methods are implemented to 2676 // a client that is not authenticated. 
2677 unknownHandler := func(srv interface{}, stream grpc.ServerStream) error { 2678 return status.Error(codes.Unauthenticated, "user unauthenticated") 2679 } 2680 for _, e := range listTestEnv() { 2681 // TODO(bradfitz): Temporarily skip this env due to #619. 2682 if e.name == "handler-tls" { 2683 continue 2684 } 2685 testUnknownHandler(t, e, unknownHandler) 2686 } 2687} 2688 2689func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) { 2690 te := newTest(t, e) 2691 te.unknownHandler = unknownHandler 2692 te.startServer(&testServer{security: e.security}) 2693 defer te.tearDown() 2694 verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), "", codes.Unauthenticated) 2695} 2696 2697// TestHealthCheckServingStatus makes a streaming Watch() RPC on the health 2698// server and verifies a bunch of health status transitions. 2699func (s) TestHealthCheckServingStatus(t *testing.T) { 2700 for _, e := range listTestEnv() { 2701 testHealthCheckServingStatus(t, e) 2702 } 2703} 2704 2705func testHealthCheckServingStatus(t *testing.T, e env) { 2706 te := newTest(t, e) 2707 te.enableHealthServer = true 2708 te.startServer(&testServer{security: e.security}) 2709 defer te.tearDown() 2710 2711 cc := te.clientConn() 2712 verifyHealthCheckStatus(t, 1*time.Second, cc, "", healthpb.HealthCheckResponse_SERVING) 2713 verifyHealthCheckErrCode(t, 1*time.Second, cc, defaultHealthService, codes.NotFound) 2714 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING) 2715 verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_SERVING) 2716 te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) 2717 verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING) 2718} 2719 2720func (s) TestEmptyUnaryWithUserAgent(t *testing.T) { 2721 for _, e := range listTestEnv() { 2722 testEmptyUnaryWithUserAgent(t, e) 2723 } 2724} 
2725 2726func testEmptyUnaryWithUserAgent(t *testing.T, e env) { 2727 te := newTest(t, e) 2728 te.userAgent = testAppUA 2729 te.startServer(&testServer{security: e.security}) 2730 defer te.tearDown() 2731 2732 cc := te.clientConn() 2733 tc := testpb.NewTestServiceClient(cc) 2734 var header metadata.MD 2735 reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header)) 2736 if err != nil || !proto.Equal(&testpb.Empty{}, reply) { 2737 t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{}) 2738 } 2739 if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) { 2740 t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA) 2741 } 2742 2743 te.srv.Stop() 2744} 2745 2746func (s) TestFailedEmptyUnary(t *testing.T) { 2747 for _, e := range listTestEnv() { 2748 if e.name == "handler-tls" { 2749 // This test covers status details, but 2750 // Grpc-Status-Details-Bin is not support in handler_server. 
2751 continue 2752 } 2753 testFailedEmptyUnary(t, e) 2754 } 2755} 2756 2757func testFailedEmptyUnary(t *testing.T, e env) { 2758 te := newTest(t, e) 2759 te.userAgent = failAppUA 2760 te.startServer(&testServer{security: e.security}) 2761 defer te.tearDown() 2762 tc := testpb.NewTestServiceClient(te.clientConn()) 2763 2764 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 2765 wantErr := detailedError 2766 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !testutils.StatusErrEqual(err, wantErr) { 2767 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr) 2768 } 2769} 2770 2771func (s) TestLargeUnary(t *testing.T) { 2772 for _, e := range listTestEnv() { 2773 testLargeUnary(t, e) 2774 } 2775} 2776 2777func testLargeUnary(t *testing.T, e env) { 2778 te := newTest(t, e) 2779 te.startServer(&testServer{security: e.security}) 2780 defer te.tearDown() 2781 tc := testpb.NewTestServiceClient(te.clientConn()) 2782 2783 const argSize = 271828 2784 const respSize = 314159 2785 2786 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 2787 if err != nil { 2788 t.Fatal(err) 2789 } 2790 2791 req := &testpb.SimpleRequest{ 2792 ResponseType: testpb.PayloadType_COMPRESSABLE, 2793 ResponseSize: respSize, 2794 Payload: payload, 2795 } 2796 reply, err := tc.UnaryCall(context.Background(), req) 2797 if err != nil { 2798 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 2799 } 2800 pt := reply.GetPayload().GetType() 2801 ps := len(reply.GetPayload().GetBody()) 2802 if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize { 2803 t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize) 2804 } 2805} 2806 2807// Test backward-compatibility API for setting msg size limit. 
2808func (s) TestExceedMsgLimit(t *testing.T) { 2809 for _, e := range listTestEnv() { 2810 testExceedMsgLimit(t, e) 2811 } 2812} 2813 2814func testExceedMsgLimit(t *testing.T, e env) { 2815 te := newTest(t, e) 2816 maxMsgSize := 1024 2817 te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize) 2818 te.startServer(&testServer{security: e.security}) 2819 defer te.tearDown() 2820 tc := testpb.NewTestServiceClient(te.clientConn()) 2821 2822 largeSize := int32(maxMsgSize + 1) 2823 const smallSize = 1 2824 2825 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 2826 if err != nil { 2827 t.Fatal(err) 2828 } 2829 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2830 if err != nil { 2831 t.Fatal(err) 2832 } 2833 2834 // Make sure the server cannot receive a unary RPC of largeSize. 2835 req := &testpb.SimpleRequest{ 2836 ResponseType: testpb.PayloadType_COMPRESSABLE, 2837 ResponseSize: smallSize, 2838 Payload: largePayload, 2839 } 2840 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2841 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2842 } 2843 // Make sure the client cannot receive a unary RPC of largeSize. 2844 req.ResponseSize = largeSize 2845 req.Payload = smallPayload 2846 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2847 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2848 } 2849 2850 // Make sure the server cannot receive a streaming RPC of largeSize. 
2851 stream, err := tc.FullDuplexCall(te.ctx) 2852 if err != nil { 2853 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2854 } 2855 respParam := []*testpb.ResponseParameters{ 2856 { 2857 Size: 1, 2858 }, 2859 } 2860 2861 sreq := &testpb.StreamingOutputCallRequest{ 2862 ResponseType: testpb.PayloadType_COMPRESSABLE, 2863 ResponseParameters: respParam, 2864 Payload: largePayload, 2865 } 2866 if err := stream.Send(sreq); err != nil { 2867 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2868 } 2869 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2870 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2871 } 2872 2873 // Test on client side for streaming RPC. 2874 stream, err = tc.FullDuplexCall(te.ctx) 2875 if err != nil { 2876 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2877 } 2878 respParam[0].Size = largeSize 2879 sreq.Payload = smallPayload 2880 if err := stream.Send(sreq); err != nil { 2881 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2882 } 2883 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2884 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2885 } 2886} 2887 2888func (s) TestPeerClientSide(t *testing.T) { 2889 for _, e := range listTestEnv() { 2890 testPeerClientSide(t, e) 2891 } 2892} 2893 2894func testPeerClientSide(t *testing.T, e env) { 2895 te := newTest(t, e) 2896 te.userAgent = testAppUA 2897 te.startServer(&testServer{security: e.security}) 2898 defer te.tearDown() 2899 tc := testpb.NewTestServiceClient(te.clientConn()) 2900 peer := new(peer.Peer) 2901 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { 2902 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 2903 } 2904 pa := peer.Addr.String() 2905 if e.network == "unix" { 2906 if pa != 
te.srvAddr { 2907 t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr) 2908 } 2909 return 2910 } 2911 _, pp, err := net.SplitHostPort(pa) 2912 if err != nil { 2913 t.Fatalf("Failed to parse address from peer.") 2914 } 2915 _, sp, err := net.SplitHostPort(te.srvAddr) 2916 if err != nil { 2917 t.Fatalf("Failed to parse address of test server.") 2918 } 2919 if pp != sp { 2920 t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp) 2921 } 2922} 2923 2924// TestPeerNegative tests that if call fails setting peer 2925// doesn't cause a segmentation fault. 2926// issue#1141 https://github.com/grpc/grpc-go/issues/1141 2927func (s) TestPeerNegative(t *testing.T) { 2928 for _, e := range listTestEnv() { 2929 testPeerNegative(t, e) 2930 } 2931} 2932 2933func testPeerNegative(t *testing.T, e env) { 2934 te := newTest(t, e) 2935 te.startServer(&testServer{security: e.security}) 2936 defer te.tearDown() 2937 2938 cc := te.clientConn() 2939 tc := testpb.NewTestServiceClient(cc) 2940 peer := new(peer.Peer) 2941 ctx, cancel := context.WithCancel(context.Background()) 2942 cancel() 2943 tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)) 2944} 2945 2946func (s) TestPeerFailedRPC(t *testing.T) { 2947 for _, e := range listTestEnv() { 2948 testPeerFailedRPC(t, e) 2949 } 2950} 2951 2952func testPeerFailedRPC(t *testing.T, e env) { 2953 te := newTest(t, e) 2954 te.maxServerReceiveMsgSize = newInt(1 * 1024) 2955 te.startServer(&testServer{security: e.security}) 2956 2957 defer te.tearDown() 2958 tc := testpb.NewTestServiceClient(te.clientConn()) 2959 2960 // first make a successful request to the server 2961 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 2962 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 2963 } 2964 2965 // make a second request that will be rejected by the server 2966 const largeSize = 5 * 1024 2967 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 2968 if err != nil { 2969 
t.Fatal(err) 2970 } 2971 req := &testpb.SimpleRequest{ 2972 ResponseType: testpb.PayloadType_COMPRESSABLE, 2973 Payload: largePayload, 2974 } 2975 2976 peer := new(peer.Peer) 2977 if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted { 2978 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2979 } else { 2980 pa := peer.Addr.String() 2981 if e.network == "unix" { 2982 if pa != te.srvAddr { 2983 t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr) 2984 } 2985 return 2986 } 2987 _, pp, err := net.SplitHostPort(pa) 2988 if err != nil { 2989 t.Fatalf("Failed to parse address from peer.") 2990 } 2991 _, sp, err := net.SplitHostPort(te.srvAddr) 2992 if err != nil { 2993 t.Fatalf("Failed to parse address of test server.") 2994 } 2995 if pp != sp { 2996 t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp) 2997 } 2998 } 2999} 3000 3001func (s) TestMetadataUnaryRPC(t *testing.T) { 3002 for _, e := range listTestEnv() { 3003 testMetadataUnaryRPC(t, e) 3004 } 3005} 3006 3007func testMetadataUnaryRPC(t *testing.T, e env) { 3008 te := newTest(t, e) 3009 te.startServer(&testServer{security: e.security}) 3010 defer te.tearDown() 3011 tc := testpb.NewTestServiceClient(te.clientConn()) 3012 3013 const argSize = 2718 3014 const respSize = 314 3015 3016 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3017 if err != nil { 3018 t.Fatal(err) 3019 } 3020 3021 req := &testpb.SimpleRequest{ 3022 ResponseType: testpb.PayloadType_COMPRESSABLE, 3023 ResponseSize: respSize, 3024 Payload: payload, 3025 } 3026 var header, trailer metadata.MD 3027 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3028 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil { 3029 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3030 } 3031 // Ignore optional response 
headers that Servers may set: 3032 if header != nil { 3033 delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers 3034 delete(header, "date") // the Date header is also optional 3035 delete(header, "user-agent") 3036 delete(header, "content-type") 3037 } 3038 if !reflect.DeepEqual(header, testMetadata) { 3039 t.Fatalf("Received header metadata %v, want %v", header, testMetadata) 3040 } 3041 if !reflect.DeepEqual(trailer, testTrailerMetadata) { 3042 t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata) 3043 } 3044} 3045 3046func (s) TestMetadataOrderUnaryRPC(t *testing.T) { 3047 for _, e := range listTestEnv() { 3048 testMetadataOrderUnaryRPC(t, e) 3049 } 3050} 3051 3052func testMetadataOrderUnaryRPC(t *testing.T, e env) { 3053 te := newTest(t, e) 3054 te.startServer(&testServer{security: e.security}) 3055 defer te.tearDown() 3056 tc := testpb.NewTestServiceClient(te.clientConn()) 3057 3058 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3059 ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value2") 3060 ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value3") 3061 3062 // using Join to built expected metadata instead of FromOutgoingContext 3063 newMetadata := metadata.Join(testMetadata, metadata.Pairs("key1", "value2", "key1", "value3")) 3064 3065 var header metadata.MD 3066 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.Header(&header)); err != nil { 3067 t.Fatal(err) 3068 } 3069 3070 // Ignore optional response headers that Servers may set: 3071 if header != nil { 3072 delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers 3073 delete(header, "date") // the Date header is also optional 3074 delete(header, "user-agent") 3075 delete(header, "content-type") 3076 } 3077 3078 if !reflect.DeepEqual(header, newMetadata) { 3079 t.Fatalf("Received header metadata %v, want %v", header, newMetadata) 3080 } 3081} 3082 3083func (s) 
TestMultipleSetTrailerUnaryRPC(t *testing.T) { 3084 for _, e := range listTestEnv() { 3085 testMultipleSetTrailerUnaryRPC(t, e) 3086 } 3087} 3088 3089func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) { 3090 te := newTest(t, e) 3091 te.startServer(&testServer{security: e.security, multipleSetTrailer: true}) 3092 defer te.tearDown() 3093 tc := testpb.NewTestServiceClient(te.clientConn()) 3094 3095 const ( 3096 argSize = 1 3097 respSize = 1 3098 ) 3099 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3100 if err != nil { 3101 t.Fatal(err) 3102 } 3103 3104 req := &testpb.SimpleRequest{ 3105 ResponseType: testpb.PayloadType_COMPRESSABLE, 3106 ResponseSize: respSize, 3107 Payload: payload, 3108 } 3109 var trailer metadata.MD 3110 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3111 if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.WaitForReady(true)); err != nil { 3112 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3113 } 3114 expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2) 3115 if !reflect.DeepEqual(trailer, expectedTrailer) { 3116 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer) 3117 } 3118} 3119 3120func (s) TestMultipleSetTrailerStreamingRPC(t *testing.T) { 3121 for _, e := range listTestEnv() { 3122 testMultipleSetTrailerStreamingRPC(t, e) 3123 } 3124} 3125 3126func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) { 3127 te := newTest(t, e) 3128 te.startServer(&testServer{security: e.security, multipleSetTrailer: true}) 3129 defer te.tearDown() 3130 tc := testpb.NewTestServiceClient(te.clientConn()) 3131 3132 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3133 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) 3134 if err != nil { 3135 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3136 } 3137 if err := stream.CloseSend(); err != nil { 3138 
t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3139 } 3140 if _, err := stream.Recv(); err != io.EOF { 3141 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err) 3142 } 3143 3144 trailer := stream.Trailer() 3145 expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2) 3146 if !reflect.DeepEqual(trailer, expectedTrailer) { 3147 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer) 3148 } 3149} 3150 3151func (s) TestSetAndSendHeaderUnaryRPC(t *testing.T) { 3152 for _, e := range listTestEnv() { 3153 if e.name == "handler-tls" { 3154 continue 3155 } 3156 testSetAndSendHeaderUnaryRPC(t, e) 3157 } 3158} 3159 3160// To test header metadata is sent on SendHeader(). 3161func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) { 3162 te := newTest(t, e) 3163 te.startServer(&testServer{security: e.security, setAndSendHeader: true}) 3164 defer te.tearDown() 3165 tc := testpb.NewTestServiceClient(te.clientConn()) 3166 3167 const ( 3168 argSize = 1 3169 respSize = 1 3170 ) 3171 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3172 if err != nil { 3173 t.Fatal(err) 3174 } 3175 3176 req := &testpb.SimpleRequest{ 3177 ResponseType: testpb.PayloadType_COMPRESSABLE, 3178 ResponseSize: respSize, 3179 Payload: payload, 3180 } 3181 var header metadata.MD 3182 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3183 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil { 3184 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3185 } 3186 delete(header, "user-agent") 3187 delete(header, "content-type") 3188 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3189 if !reflect.DeepEqual(header, expectedHeader) { 3190 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3191 } 3192} 3193 3194func (s) TestMultipleSetHeaderUnaryRPC(t *testing.T) { 3195 for _, e := range listTestEnv() { 
3196 if e.name == "handler-tls" { 3197 continue 3198 } 3199 testMultipleSetHeaderUnaryRPC(t, e) 3200 } 3201} 3202 3203// To test header metadata is sent when sending response. 3204func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) { 3205 te := newTest(t, e) 3206 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3207 defer te.tearDown() 3208 tc := testpb.NewTestServiceClient(te.clientConn()) 3209 3210 const ( 3211 argSize = 1 3212 respSize = 1 3213 ) 3214 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3215 if err != nil { 3216 t.Fatal(err) 3217 } 3218 3219 req := &testpb.SimpleRequest{ 3220 ResponseType: testpb.PayloadType_COMPRESSABLE, 3221 ResponseSize: respSize, 3222 Payload: payload, 3223 } 3224 3225 var header metadata.MD 3226 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3227 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil { 3228 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3229 } 3230 delete(header, "user-agent") 3231 delete(header, "content-type") 3232 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3233 if !reflect.DeepEqual(header, expectedHeader) { 3234 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3235 } 3236} 3237 3238func (s) TestMultipleSetHeaderUnaryRPCError(t *testing.T) { 3239 for _, e := range listTestEnv() { 3240 if e.name == "handler-tls" { 3241 continue 3242 } 3243 testMultipleSetHeaderUnaryRPCError(t, e) 3244 } 3245} 3246 3247// To test header metadata is sent when sending status. 3248func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) { 3249 te := newTest(t, e) 3250 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3251 defer te.tearDown() 3252 tc := testpb.NewTestServiceClient(te.clientConn()) 3253 3254 const ( 3255 argSize = 1 3256 respSize = -1 // Invalid respSize to make RPC fail. 
3257 ) 3258 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3259 if err != nil { 3260 t.Fatal(err) 3261 } 3262 3263 req := &testpb.SimpleRequest{ 3264 ResponseType: testpb.PayloadType_COMPRESSABLE, 3265 ResponseSize: respSize, 3266 Payload: payload, 3267 } 3268 var header metadata.MD 3269 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3270 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err == nil { 3271 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err) 3272 } 3273 delete(header, "user-agent") 3274 delete(header, "content-type") 3275 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3276 if !reflect.DeepEqual(header, expectedHeader) { 3277 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3278 } 3279} 3280 3281func (s) TestSetAndSendHeaderStreamingRPC(t *testing.T) { 3282 for _, e := range listTestEnv() { 3283 if e.name == "handler-tls" { 3284 continue 3285 } 3286 testSetAndSendHeaderStreamingRPC(t, e) 3287 } 3288} 3289 3290// To test header metadata is sent on SendHeader(). 
3291func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) { 3292 te := newTest(t, e) 3293 te.startServer(&testServer{security: e.security, setAndSendHeader: true}) 3294 defer te.tearDown() 3295 tc := testpb.NewTestServiceClient(te.clientConn()) 3296 3297 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3298 stream, err := tc.FullDuplexCall(ctx) 3299 if err != nil { 3300 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3301 } 3302 if err := stream.CloseSend(); err != nil { 3303 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3304 } 3305 if _, err := stream.Recv(); err != io.EOF { 3306 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err) 3307 } 3308 3309 header, err := stream.Header() 3310 if err != nil { 3311 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) 3312 } 3313 delete(header, "user-agent") 3314 delete(header, "content-type") 3315 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3316 if !reflect.DeepEqual(header, expectedHeader) { 3317 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3318 } 3319} 3320 3321func (s) TestMultipleSetHeaderStreamingRPC(t *testing.T) { 3322 for _, e := range listTestEnv() { 3323 if e.name == "handler-tls" { 3324 continue 3325 } 3326 testMultipleSetHeaderStreamingRPC(t, e) 3327 } 3328} 3329 3330// To test header metadata is sent when sending response. 
3331func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) { 3332 te := newTest(t, e) 3333 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3334 defer te.tearDown() 3335 tc := testpb.NewTestServiceClient(te.clientConn()) 3336 3337 const ( 3338 argSize = 1 3339 respSize = 1 3340 ) 3341 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3342 stream, err := tc.FullDuplexCall(ctx) 3343 if err != nil { 3344 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3345 } 3346 3347 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3348 if err != nil { 3349 t.Fatal(err) 3350 } 3351 3352 req := &testpb.StreamingOutputCallRequest{ 3353 ResponseType: testpb.PayloadType_COMPRESSABLE, 3354 ResponseParameters: []*testpb.ResponseParameters{ 3355 {Size: respSize}, 3356 }, 3357 Payload: payload, 3358 } 3359 if err := stream.Send(req); err != nil { 3360 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3361 } 3362 if _, err := stream.Recv(); err != nil { 3363 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 3364 } 3365 if err := stream.CloseSend(); err != nil { 3366 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3367 } 3368 if _, err := stream.Recv(); err != io.EOF { 3369 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err) 3370 } 3371 3372 header, err := stream.Header() 3373 if err != nil { 3374 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) 3375 } 3376 delete(header, "user-agent") 3377 delete(header, "content-type") 3378 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3379 if !reflect.DeepEqual(header, expectedHeader) { 3380 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3381 } 3382 3383} 3384 3385func (s) TestMultipleSetHeaderStreamingRPCError(t *testing.T) { 3386 for _, e := range listTestEnv() { 3387 if e.name == "handler-tls" { 3388 continue 3389 } 3390 testMultipleSetHeaderStreamingRPCError(t, e) 3391 } 3392} 
3393 3394// To test header metadata is sent when sending status. 3395func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) { 3396 te := newTest(t, e) 3397 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3398 defer te.tearDown() 3399 tc := testpb.NewTestServiceClient(te.clientConn()) 3400 3401 const ( 3402 argSize = 1 3403 respSize = -1 3404 ) 3405 ctx, cancel := context.WithCancel(context.Background()) 3406 defer cancel() 3407 ctx = metadata.NewOutgoingContext(ctx, testMetadata) 3408 stream, err := tc.FullDuplexCall(ctx) 3409 if err != nil { 3410 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3411 } 3412 3413 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3414 if err != nil { 3415 t.Fatal(err) 3416 } 3417 3418 req := &testpb.StreamingOutputCallRequest{ 3419 ResponseType: testpb.PayloadType_COMPRESSABLE, 3420 ResponseParameters: []*testpb.ResponseParameters{ 3421 {Size: respSize}, 3422 }, 3423 Payload: payload, 3424 } 3425 if err := stream.Send(req); err != nil { 3426 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3427 } 3428 if _, err := stream.Recv(); err == nil { 3429 t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err) 3430 } 3431 3432 header, err := stream.Header() 3433 if err != nil { 3434 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) 3435 } 3436 delete(header, "user-agent") 3437 delete(header, "content-type") 3438 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3439 if !reflect.DeepEqual(header, expectedHeader) { 3440 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3441 } 3442 if err := stream.CloseSend(); err != nil { 3443 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3444 } 3445} 3446 3447// TestMalformedHTTP2Metadata verfies the returned error when the client 3448// sends an illegal metadata. 
3449func (s) TestMalformedHTTP2Metadata(t *testing.T) { 3450 for _, e := range listTestEnv() { 3451 if e.name == "handler-tls" { 3452 // Failed with "server stops accepting new RPCs". 3453 // Server stops accepting new RPCs when the client sends an illegal http2 header. 3454 continue 3455 } 3456 testMalformedHTTP2Metadata(t, e) 3457 } 3458} 3459 3460func testMalformedHTTP2Metadata(t *testing.T, e env) { 3461 te := newTest(t, e) 3462 te.startServer(&testServer{security: e.security}) 3463 defer te.tearDown() 3464 tc := testpb.NewTestServiceClient(te.clientConn()) 3465 3466 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718) 3467 if err != nil { 3468 t.Fatal(err) 3469 } 3470 3471 req := &testpb.SimpleRequest{ 3472 ResponseType: testpb.PayloadType_COMPRESSABLE, 3473 ResponseSize: 314, 3474 Payload: payload, 3475 } 3476 ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata) 3477 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Internal { 3478 t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal) 3479 } 3480} 3481 3482func (s) TestTransparentRetry(t *testing.T) { 3483 for _, e := range listTestEnv() { 3484 if e.name == "handler-tls" { 3485 // Fails with RST_STREAM / FLOW_CONTROL_ERROR 3486 continue 3487 } 3488 testTransparentRetry(t, e) 3489 } 3490} 3491 3492// This test makes sure RPCs are retried times when they receive a RST_STREAM 3493// with the REFUSED_STREAM error code, which the InTapHandle provokes. 
3494func testTransparentRetry(t *testing.T, e env) { 3495 te := newTest(t, e) 3496 attempts := 0 3497 successAttempt := 2 3498 te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) { 3499 attempts++ 3500 if attempts < successAttempt { 3501 return nil, errors.New("not now") 3502 } 3503 return ctx, nil 3504 } 3505 te.startServer(&testServer{security: e.security}) 3506 defer te.tearDown() 3507 3508 cc := te.clientConn() 3509 tsc := testpb.NewTestServiceClient(cc) 3510 testCases := []struct { 3511 successAttempt int 3512 failFast bool 3513 errCode codes.Code 3514 }{{ 3515 successAttempt: 1, 3516 }, { 3517 successAttempt: 2, 3518 }, { 3519 successAttempt: 3, 3520 errCode: codes.Unavailable, 3521 }, { 3522 successAttempt: 1, 3523 failFast: true, 3524 }, { 3525 successAttempt: 2, 3526 failFast: true, 3527 }, { 3528 successAttempt: 3, 3529 failFast: true, 3530 errCode: codes.Unavailable, 3531 }} 3532 for _, tc := range testCases { 3533 attempts = 0 3534 successAttempt = tc.successAttempt 3535 3536 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 3537 _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!tc.failFast)) 3538 cancel() 3539 if status.Code(err) != tc.errCode { 3540 t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode) 3541 } 3542 } 3543} 3544 3545func (s) TestCancel(t *testing.T) { 3546 for _, e := range listTestEnv() { 3547 testCancel(t, e) 3548 } 3549} 3550 3551func testCancel(t *testing.T, e env) { 3552 te := newTest(t, e) 3553 te.declareLogNoise("grpc: the client connection is closing; please retry") 3554 te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second}) 3555 defer te.tearDown() 3556 3557 cc := te.clientConn() 3558 tc := testpb.NewTestServiceClient(cc) 3559 3560 const argSize = 2718 3561 const respSize = 314 3562 3563 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3564 if err != nil { 3565 t.Fatal(err) 3566 } 3567 3568 
req := &testpb.SimpleRequest{ 3569 ResponseType: testpb.PayloadType_COMPRESSABLE, 3570 ResponseSize: respSize, 3571 Payload: payload, 3572 } 3573 ctx, cancel := context.WithCancel(context.Background()) 3574 time.AfterFunc(1*time.Millisecond, cancel) 3575 if r, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Canceled { 3576 t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled) 3577 } 3578 awaitNewConnLogOutput() 3579} 3580 3581func (s) TestCancelNoIO(t *testing.T) { 3582 for _, e := range listTestEnv() { 3583 testCancelNoIO(t, e) 3584 } 3585} 3586 3587func testCancelNoIO(t *testing.T, e env) { 3588 te := newTest(t, e) 3589 te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken") 3590 te.maxStream = 1 // Only allows 1 live stream per server transport. 3591 te.startServer(&testServer{security: e.security}) 3592 defer te.tearDown() 3593 3594 cc := te.clientConn() 3595 tc := testpb.NewTestServiceClient(cc) 3596 3597 // Start one blocked RPC for which we'll never send streaming 3598 // input. This will consume the 1 maximum concurrent streams, 3599 // causing future RPCs to hang. 3600 ctx, cancelFirst := context.WithCancel(context.Background()) 3601 _, err := tc.StreamingInputCall(ctx) 3602 if err != nil { 3603 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) 3604 } 3605 3606 // Loop until the ClientConn receives the initial settings 3607 // frame from the server, notifying it about the maximum 3608 // concurrent streams. We know when it's received it because 3609 // an RPC will fail with codes.DeadlineExceeded instead of 3610 // succeeding. 
3611 // TODO(bradfitz): add internal test hook for this (Issue 534) 3612 for { 3613 ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond) 3614 _, err := tc.StreamingInputCall(ctx) 3615 cancelSecond() 3616 if err == nil { 3617 continue 3618 } 3619 if status.Code(err) == codes.DeadlineExceeded { 3620 break 3621 } 3622 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) 3623 } 3624 // If there are any RPCs in flight before the client receives 3625 // the max streams setting, let them be expired. 3626 // TODO(bradfitz): add internal test hook for this (Issue 534) 3627 time.Sleep(50 * time.Millisecond) 3628 3629 go func() { 3630 time.Sleep(50 * time.Millisecond) 3631 cancelFirst() 3632 }() 3633 3634 // This should be blocked until the 1st is canceled, then succeed. 3635 ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond) 3636 if _, err := tc.StreamingInputCall(ctx); err != nil { 3637 t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) 3638 } 3639 cancelThird() 3640} 3641 3642// The following tests the gRPC streaming RPC implementations. 3643// TODO(zhaoq): Have better coverage on error cases. 
3644var ( 3645 reqSizes = []int{27182, 8, 1828, 45904} 3646 respSizes = []int{31415, 9, 2653, 58979} 3647) 3648 3649func (s) TestNoService(t *testing.T) { 3650 for _, e := range listTestEnv() { 3651 testNoService(t, e) 3652 } 3653} 3654 3655func testNoService(t *testing.T, e env) { 3656 te := newTest(t, e) 3657 te.startServer(nil) 3658 defer te.tearDown() 3659 3660 cc := te.clientConn() 3661 tc := testpb.NewTestServiceClient(cc) 3662 3663 stream, err := tc.FullDuplexCall(te.ctx, grpc.WaitForReady(true)) 3664 if err != nil { 3665 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3666 } 3667 if _, err := stream.Recv(); status.Code(err) != codes.Unimplemented { 3668 t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented) 3669 } 3670} 3671 3672func (s) TestPingPong(t *testing.T) { 3673 for _, e := range listTestEnv() { 3674 testPingPong(t, e) 3675 } 3676} 3677 3678func testPingPong(t *testing.T, e env) { 3679 te := newTest(t, e) 3680 te.startServer(&testServer{security: e.security}) 3681 defer te.tearDown() 3682 tc := testpb.NewTestServiceClient(te.clientConn()) 3683 3684 stream, err := tc.FullDuplexCall(te.ctx) 3685 if err != nil { 3686 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3687 } 3688 var index int 3689 for index < len(reqSizes) { 3690 respParam := []*testpb.ResponseParameters{ 3691 { 3692 Size: int32(respSizes[index]), 3693 }, 3694 } 3695 3696 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index])) 3697 if err != nil { 3698 t.Fatal(err) 3699 } 3700 3701 req := &testpb.StreamingOutputCallRequest{ 3702 ResponseType: testpb.PayloadType_COMPRESSABLE, 3703 ResponseParameters: respParam, 3704 Payload: payload, 3705 } 3706 if err := stream.Send(req); err != nil { 3707 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3708 } 3709 reply, err := stream.Recv() 3710 if err != nil { 3711 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 3712 } 3713 pt := 
reply.GetPayload().GetType() 3714 if pt != testpb.PayloadType_COMPRESSABLE { 3715 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) 3716 } 3717 size := len(reply.GetPayload().GetBody()) 3718 if size != int(respSizes[index]) { 3719 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) 3720 } 3721 index++ 3722 } 3723 if err := stream.CloseSend(); err != nil { 3724 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3725 } 3726 if _, err := stream.Recv(); err != io.EOF { 3727 t.Fatalf("%v failed to complele the ping pong test: %v", stream, err) 3728 } 3729} 3730 3731func (s) TestMetadataStreamingRPC(t *testing.T) { 3732 for _, e := range listTestEnv() { 3733 testMetadataStreamingRPC(t, e) 3734 } 3735} 3736 3737func testMetadataStreamingRPC(t *testing.T, e env) { 3738 te := newTest(t, e) 3739 te.startServer(&testServer{security: e.security}) 3740 defer te.tearDown() 3741 tc := testpb.NewTestServiceClient(te.clientConn()) 3742 3743 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata) 3744 stream, err := tc.FullDuplexCall(ctx) 3745 if err != nil { 3746 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3747 } 3748 go func() { 3749 headerMD, err := stream.Header() 3750 if e.security == "tls" { 3751 delete(headerMD, "transport_security_type") 3752 } 3753 delete(headerMD, "trailer") // ignore if present 3754 delete(headerMD, "user-agent") 3755 delete(headerMD, "content-type") 3756 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { 3757 t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata) 3758 } 3759 // test the cached value. 
3760 headerMD, err = stream.Header() 3761 delete(headerMD, "trailer") // ignore if present 3762 delete(headerMD, "user-agent") 3763 delete(headerMD, "content-type") 3764 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { 3765 t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata) 3766 } 3767 err = func() error { 3768 for index := 0; index < len(reqSizes); index++ { 3769 respParam := []*testpb.ResponseParameters{ 3770 { 3771 Size: int32(respSizes[index]), 3772 }, 3773 } 3774 3775 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index])) 3776 if err != nil { 3777 return err 3778 } 3779 3780 req := &testpb.StreamingOutputCallRequest{ 3781 ResponseType: testpb.PayloadType_COMPRESSABLE, 3782 ResponseParameters: respParam, 3783 Payload: payload, 3784 } 3785 if err := stream.Send(req); err != nil { 3786 return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3787 } 3788 } 3789 return nil 3790 }() 3791 // Tell the server we're done sending args. 
3792 stream.CloseSend() 3793 if err != nil { 3794 t.Error(err) 3795 } 3796 }() 3797 for { 3798 if _, err := stream.Recv(); err != nil { 3799 break 3800 } 3801 } 3802 trailerMD := stream.Trailer() 3803 if !reflect.DeepEqual(testTrailerMetadata, trailerMD) { 3804 t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata) 3805 } 3806} 3807 3808func (s) TestServerStreaming(t *testing.T) { 3809 for _, e := range listTestEnv() { 3810 testServerStreaming(t, e) 3811 } 3812} 3813 3814func testServerStreaming(t *testing.T, e env) { 3815 te := newTest(t, e) 3816 te.startServer(&testServer{security: e.security}) 3817 defer te.tearDown() 3818 tc := testpb.NewTestServiceClient(te.clientConn()) 3819 3820 respParam := make([]*testpb.ResponseParameters, len(respSizes)) 3821 for i, s := range respSizes { 3822 respParam[i] = &testpb.ResponseParameters{ 3823 Size: int32(s), 3824 } 3825 } 3826 req := &testpb.StreamingOutputCallRequest{ 3827 ResponseType: testpb.PayloadType_COMPRESSABLE, 3828 ResponseParameters: respParam, 3829 } 3830 stream, err := tc.StreamingOutputCall(context.Background(), req) 3831 if err != nil { 3832 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) 3833 } 3834 var rpcStatus error 3835 var respCnt int 3836 var index int 3837 for { 3838 reply, err := stream.Recv() 3839 if err != nil { 3840 rpcStatus = err 3841 break 3842 } 3843 pt := reply.GetPayload().GetType() 3844 if pt != testpb.PayloadType_COMPRESSABLE { 3845 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) 3846 } 3847 size := len(reply.GetPayload().GetBody()) 3848 if size != int(respSizes[index]) { 3849 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) 3850 } 3851 index++ 3852 respCnt++ 3853 } 3854 if rpcStatus != io.EOF { 3855 t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus) 3856 } 3857 if respCnt != len(respSizes) { 3858 t.Fatalf("Got %d reply, want %d", len(respSizes), respCnt) 
3859 } 3860} 3861 3862func (s) TestFailedServerStreaming(t *testing.T) { 3863 for _, e := range listTestEnv() { 3864 testFailedServerStreaming(t, e) 3865 } 3866} 3867 3868func testFailedServerStreaming(t *testing.T, e env) { 3869 te := newTest(t, e) 3870 te.userAgent = failAppUA 3871 te.startServer(&testServer{security: e.security}) 3872 defer te.tearDown() 3873 tc := testpb.NewTestServiceClient(te.clientConn()) 3874 3875 respParam := make([]*testpb.ResponseParameters, len(respSizes)) 3876 for i, s := range respSizes { 3877 respParam[i] = &testpb.ResponseParameters{ 3878 Size: int32(s), 3879 } 3880 } 3881 req := &testpb.StreamingOutputCallRequest{ 3882 ResponseType: testpb.PayloadType_COMPRESSABLE, 3883 ResponseParameters: respParam, 3884 } 3885 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata) 3886 stream, err := tc.StreamingOutputCall(ctx, req) 3887 if err != nil { 3888 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) 3889 } 3890 wantErr := status.Error(codes.DataLoss, "error for testing: "+failAppUA) 3891 if _, err := stream.Recv(); !equalError(err, wantErr) { 3892 t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr) 3893 } 3894} 3895 3896func equalError(x, y error) bool { 3897 return x == y || (x != nil && y != nil && x.Error() == y.Error()) 3898} 3899 3900// concurrentSendServer is a TestServiceServer whose 3901// StreamingOutputCall makes ten serial Send calls, sending payloads 3902// "0".."9", inclusive. TestServerStreamingConcurrent verifies they 3903// were received in the correct order, and that there were no races. 3904// 3905// All other TestServiceServer methods crash if called. 
3906type concurrentSendServer struct { 3907 testpb.TestServiceServer 3908} 3909 3910func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { 3911 for i := 0; i < 10; i++ { 3912 stream.Send(&testpb.StreamingOutputCallResponse{ 3913 Payload: &testpb.Payload{ 3914 Body: []byte{'0' + uint8(i)}, 3915 }, 3916 }) 3917 } 3918 return nil 3919} 3920 3921// Tests doing a bunch of concurrent streaming output calls. 3922func (s) TestServerStreamingConcurrent(t *testing.T) { 3923 for _, e := range listTestEnv() { 3924 testServerStreamingConcurrent(t, e) 3925 } 3926} 3927 3928func testServerStreamingConcurrent(t *testing.T, e env) { 3929 te := newTest(t, e) 3930 te.startServer(concurrentSendServer{}) 3931 defer te.tearDown() 3932 3933 cc := te.clientConn() 3934 tc := testpb.NewTestServiceClient(cc) 3935 3936 doStreamingCall := func() { 3937 req := &testpb.StreamingOutputCallRequest{} 3938 stream, err := tc.StreamingOutputCall(context.Background(), req) 3939 if err != nil { 3940 t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) 3941 return 3942 } 3943 var ngot int 3944 var buf bytes.Buffer 3945 for { 3946 reply, err := stream.Recv() 3947 if err == io.EOF { 3948 break 3949 } 3950 if err != nil { 3951 t.Fatal(err) 3952 } 3953 ngot++ 3954 if buf.Len() > 0 { 3955 buf.WriteByte(',') 3956 } 3957 buf.Write(reply.GetPayload().GetBody()) 3958 } 3959 if want := 10; ngot != want { 3960 t.Errorf("Got %d replies, want %d", ngot, want) 3961 } 3962 if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want { 3963 t.Errorf("Got replies %q; want %q", got, want) 3964 } 3965 } 3966 3967 var wg sync.WaitGroup 3968 for i := 0; i < 20; i++ { 3969 wg.Add(1) 3970 go func() { 3971 defer wg.Done() 3972 doStreamingCall() 3973 }() 3974 } 3975 wg.Wait() 3976 3977} 3978 3979func generatePayloadSizes() [][]int { 3980 reqSizes := [][]int{ 3981 {27182, 8, 1828, 45904}, 3982 } 3983 3984 
num8KPayloads := 1024 3985 eightKPayloads := []int{} 3986 for i := 0; i < num8KPayloads; i++ { 3987 eightKPayloads = append(eightKPayloads, (1 << 13)) 3988 } 3989 reqSizes = append(reqSizes, eightKPayloads) 3990 3991 num2MPayloads := 8 3992 twoMPayloads := []int{} 3993 for i := 0; i < num2MPayloads; i++ { 3994 twoMPayloads = append(twoMPayloads, (1 << 21)) 3995 } 3996 reqSizes = append(reqSizes, twoMPayloads) 3997 3998 return reqSizes 3999} 4000 4001func (s) TestClientStreaming(t *testing.T) { 4002 for _, s := range generatePayloadSizes() { 4003 for _, e := range listTestEnv() { 4004 testClientStreaming(t, e, s) 4005 } 4006 } 4007} 4008 4009func testClientStreaming(t *testing.T, e env, sizes []int) { 4010 te := newTest(t, e) 4011 te.startServer(&testServer{security: e.security}) 4012 defer te.tearDown() 4013 tc := testpb.NewTestServiceClient(te.clientConn()) 4014 4015 ctx, cancel := context.WithTimeout(te.ctx, time.Second*30) 4016 defer cancel() 4017 stream, err := tc.StreamingInputCall(ctx) 4018 if err != nil { 4019 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err) 4020 } 4021 4022 var sum int 4023 for _, s := range sizes { 4024 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s)) 4025 if err != nil { 4026 t.Fatal(err) 4027 } 4028 4029 req := &testpb.StreamingInputCallRequest{ 4030 Payload: payload, 4031 } 4032 if err := stream.Send(req); err != nil { 4033 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err) 4034 } 4035 sum += s 4036 } 4037 reply, err := stream.CloseAndRecv() 4038 if err != nil { 4039 t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) 4040 } 4041 if reply.GetAggregatedPayloadSize() != int32(sum) { 4042 t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum) 4043 } 4044} 4045 4046func (s) TestClientStreamingError(t *testing.T) { 4047 for _, e := range listTestEnv() { 4048 if e.name == "handler-tls" { 4049 continue 4050 } 4051 
testClientStreamingError(t, e) 4052 } 4053} 4054 4055func testClientStreamingError(t *testing.T, e env) { 4056 te := newTest(t, e) 4057 te.startServer(&testServer{security: e.security, earlyFail: true}) 4058 defer te.tearDown() 4059 tc := testpb.NewTestServiceClient(te.clientConn()) 4060 4061 stream, err := tc.StreamingInputCall(te.ctx) 4062 if err != nil { 4063 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err) 4064 } 4065 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1) 4066 if err != nil { 4067 t.Fatal(err) 4068 } 4069 4070 req := &testpb.StreamingInputCallRequest{ 4071 Payload: payload, 4072 } 4073 // The 1st request should go through. 4074 if err := stream.Send(req); err != nil { 4075 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 4076 } 4077 for { 4078 if err := stream.Send(req); err != io.EOF { 4079 continue 4080 } 4081 if _, err := stream.CloseAndRecv(); status.Code(err) != codes.NotFound { 4082 t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound) 4083 } 4084 break 4085 } 4086} 4087 4088func (s) TestExceedMaxStreamsLimit(t *testing.T) { 4089 for _, e := range listTestEnv() { 4090 testExceedMaxStreamsLimit(t, e) 4091 } 4092} 4093 4094func testExceedMaxStreamsLimit(t *testing.T, e env) { 4095 te := newTest(t, e) 4096 te.declareLogNoise( 4097 "http2Client.notifyError got notified that the client transport was broken", 4098 "Conn.resetTransport failed to create client transport", 4099 "grpc: the connection is closing", 4100 ) 4101 te.maxStream = 1 // Only allows 1 live stream per server transport. 4102 te.startServer(&testServer{security: e.security}) 4103 defer te.tearDown() 4104 4105 cc := te.clientConn() 4106 tc := testpb.NewTestServiceClient(cc) 4107 4108 _, err := tc.StreamingInputCall(te.ctx) 4109 if err != nil { 4110 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) 4111 } 4112 // Loop until receiving the new max stream setting from the server. 
4113 for { 4114 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 4115 defer cancel() 4116 _, err := tc.StreamingInputCall(ctx) 4117 if err == nil { 4118 time.Sleep(50 * time.Millisecond) 4119 continue 4120 } 4121 if status.Code(err) == codes.DeadlineExceeded { 4122 break 4123 } 4124 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) 4125 } 4126} 4127 4128func (s) TestStreamsQuotaRecovery(t *testing.T) { 4129 for _, e := range listTestEnv() { 4130 testStreamsQuotaRecovery(t, e) 4131 } 4132} 4133 4134func testStreamsQuotaRecovery(t *testing.T, e env) { 4135 te := newTest(t, e) 4136 te.declareLogNoise( 4137 "http2Client.notifyError got notified that the client transport was broken", 4138 "Conn.resetTransport failed to create client transport", 4139 "grpc: the connection is closing", 4140 ) 4141 te.maxStream = 1 // Allows 1 live stream. 4142 te.startServer(&testServer{security: e.security}) 4143 defer te.tearDown() 4144 4145 cc := te.clientConn() 4146 tc := testpb.NewTestServiceClient(cc) 4147 ctx, cancel := context.WithCancel(context.Background()) 4148 defer cancel() 4149 if _, err := tc.StreamingInputCall(ctx); err != nil { 4150 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, <nil>", err) 4151 } 4152 // Loop until the new max stream setting is effective. 
4153 for { 4154 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 4155 _, err := tc.StreamingInputCall(ctx) 4156 cancel() 4157 if err == nil { 4158 time.Sleep(5 * time.Millisecond) 4159 continue 4160 } 4161 if status.Code(err) == codes.DeadlineExceeded { 4162 break 4163 } 4164 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %s", err, codes.DeadlineExceeded) 4165 } 4166 4167 var wg sync.WaitGroup 4168 for i := 0; i < 10; i++ { 4169 wg.Add(1) 4170 go func() { 4171 defer wg.Done() 4172 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314) 4173 if err != nil { 4174 t.Error(err) 4175 return 4176 } 4177 req := &testpb.SimpleRequest{ 4178 ResponseType: testpb.PayloadType_COMPRESSABLE, 4179 ResponseSize: 1592, 4180 Payload: payload, 4181 } 4182 // No rpc should go through due to the max streams limit. 4183 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 4184 defer cancel() 4185 if _, err := tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 4186 t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 4187 } 4188 }() 4189 } 4190 wg.Wait() 4191 4192 cancel() 4193 // A new stream should be allowed after canceling the first one. 
4194 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 4195 defer cancel() 4196 if _, err := tc.StreamingInputCall(ctx); err != nil { 4197 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %v", err, nil) 4198 } 4199} 4200 4201func (s) TestCompressServerHasNoSupport(t *testing.T) { 4202 for _, e := range listTestEnv() { 4203 testCompressServerHasNoSupport(t, e) 4204 } 4205} 4206 4207func testCompressServerHasNoSupport(t *testing.T, e env) { 4208 te := newTest(t, e) 4209 te.serverCompression = false 4210 te.clientCompression = false 4211 te.clientNopCompression = true 4212 te.startServer(&testServer{security: e.security}) 4213 defer te.tearDown() 4214 tc := testpb.NewTestServiceClient(te.clientConn()) 4215 4216 const argSize = 271828 4217 const respSize = 314159 4218 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 4219 if err != nil { 4220 t.Fatal(err) 4221 } 4222 req := &testpb.SimpleRequest{ 4223 ResponseType: testpb.PayloadType_COMPRESSABLE, 4224 ResponseSize: respSize, 4225 Payload: payload, 4226 } 4227 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.Unimplemented { 4228 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented) 4229 } 4230 // Streaming RPC 4231 stream, err := tc.FullDuplexCall(context.Background()) 4232 if err != nil { 4233 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4234 } 4235 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Unimplemented { 4236 t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented) 4237 } 4238} 4239 4240func (s) TestCompressOK(t *testing.T) { 4241 for _, e := range listTestEnv() { 4242 testCompressOK(t, e) 4243 } 4244} 4245 4246func testCompressOK(t *testing.T, e env) { 4247 te := newTest(t, e) 4248 te.serverCompression = true 4249 te.clientCompression = true 4250 te.startServer(&testServer{security: e.security}) 4251 defer te.tearDown() 
4252 tc := testpb.NewTestServiceClient(te.clientConn()) 4253 4254 // Unary call 4255 const argSize = 271828 4256 const respSize = 314159 4257 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 4258 if err != nil { 4259 t.Fatal(err) 4260 } 4261 req := &testpb.SimpleRequest{ 4262 ResponseType: testpb.PayloadType_COMPRESSABLE, 4263 ResponseSize: respSize, 4264 Payload: payload, 4265 } 4266 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something")) 4267 if _, err := tc.UnaryCall(ctx, req); err != nil { 4268 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 4269 } 4270 // Streaming RPC 4271 ctx, cancel := context.WithCancel(context.Background()) 4272 defer cancel() 4273 stream, err := tc.FullDuplexCall(ctx) 4274 if err != nil { 4275 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4276 } 4277 respParam := []*testpb.ResponseParameters{ 4278 { 4279 Size: 31415, 4280 }, 4281 } 4282 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415)) 4283 if err != nil { 4284 t.Fatal(err) 4285 } 4286 sreq := &testpb.StreamingOutputCallRequest{ 4287 ResponseType: testpb.PayloadType_COMPRESSABLE, 4288 ResponseParameters: respParam, 4289 Payload: payload, 4290 } 4291 if err := stream.Send(sreq); err != nil { 4292 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 4293 } 4294 stream.CloseSend() 4295 if _, err := stream.Recv(); err != nil { 4296 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 4297 } 4298 if _, err := stream.Recv(); err != io.EOF { 4299 t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err) 4300 } 4301} 4302 4303func (s) TestIdentityEncoding(t *testing.T) { 4304 for _, e := range listTestEnv() { 4305 testIdentityEncoding(t, e) 4306 } 4307} 4308 4309func testIdentityEncoding(t *testing.T, e env) { 4310 te := newTest(t, e) 4311 te.startServer(&testServer{security: e.security}) 4312 defer te.tearDown() 4313 tc := testpb.NewTestServiceClient(te.clientConn()) 4314 
4315 // Unary call 4316 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 5) 4317 if err != nil { 4318 t.Fatal(err) 4319 } 4320 req := &testpb.SimpleRequest{ 4321 ResponseType: testpb.PayloadType_COMPRESSABLE, 4322 ResponseSize: 10, 4323 Payload: payload, 4324 } 4325 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something")) 4326 if _, err := tc.UnaryCall(ctx, req); err != nil { 4327 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 4328 } 4329 // Streaming RPC 4330 ctx, cancel := context.WithCancel(context.Background()) 4331 defer cancel() 4332 stream, err := tc.FullDuplexCall(ctx, grpc.UseCompressor("identity")) 4333 if err != nil { 4334 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4335 } 4336 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415)) 4337 if err != nil { 4338 t.Fatal(err) 4339 } 4340 sreq := &testpb.StreamingOutputCallRequest{ 4341 ResponseType: testpb.PayloadType_COMPRESSABLE, 4342 ResponseParameters: []*testpb.ResponseParameters{{Size: 10}}, 4343 Payload: payload, 4344 } 4345 if err := stream.Send(sreq); err != nil { 4346 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 4347 } 4348 stream.CloseSend() 4349 if _, err := stream.Recv(); err != nil { 4350 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 4351 } 4352 if _, err := stream.Recv(); err != io.EOF { 4353 t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err) 4354 } 4355} 4356 4357func (s) TestUnaryClientInterceptor(t *testing.T) { 4358 for _, e := range listTestEnv() { 4359 testUnaryClientInterceptor(t, e) 4360 } 4361} 4362 4363func failOkayRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 4364 err := invoker(ctx, method, req, reply, cc, opts...) 
4365 if err == nil { 4366 return status.Error(codes.NotFound, "") 4367 } 4368 return err 4369} 4370 4371func testUnaryClientInterceptor(t *testing.T, e env) { 4372 te := newTest(t, e) 4373 te.userAgent = testAppUA 4374 te.unaryClientInt = failOkayRPC 4375 te.startServer(&testServer{security: e.security}) 4376 defer te.tearDown() 4377 4378 tc := testpb.NewTestServiceClient(te.clientConn()) 4379 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.NotFound { 4380 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound) 4381 } 4382} 4383 4384func (s) TestStreamClientInterceptor(t *testing.T) { 4385 for _, e := range listTestEnv() { 4386 testStreamClientInterceptor(t, e) 4387 } 4388} 4389 4390func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 4391 s, err := streamer(ctx, desc, cc, method, opts...) 4392 if err == nil { 4393 return nil, status.Error(codes.NotFound, "") 4394 } 4395 return s, nil 4396} 4397 4398func testStreamClientInterceptor(t *testing.T, e env) { 4399 te := newTest(t, e) 4400 te.streamClientInt = failOkayStream 4401 te.startServer(&testServer{security: e.security}) 4402 defer te.tearDown() 4403 4404 tc := testpb.NewTestServiceClient(te.clientConn()) 4405 respParam := []*testpb.ResponseParameters{ 4406 { 4407 Size: int32(1), 4408 }, 4409 } 4410 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1)) 4411 if err != nil { 4412 t.Fatal(err) 4413 } 4414 req := &testpb.StreamingOutputCallRequest{ 4415 ResponseType: testpb.PayloadType_COMPRESSABLE, 4416 ResponseParameters: respParam, 4417 Payload: payload, 4418 } 4419 if _, err := tc.StreamingOutputCall(context.Background(), req); status.Code(err) != codes.NotFound { 4420 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound) 4421 } 4422} 4423 4424func (s) 
TestUnaryServerInterceptor(t *testing.T) { 4425 for _, e := range listTestEnv() { 4426 testUnaryServerInterceptor(t, e) 4427 } 4428} 4429 4430func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { 4431 return nil, status.Error(codes.PermissionDenied, "") 4432} 4433 4434func testUnaryServerInterceptor(t *testing.T, e env) { 4435 te := newTest(t, e) 4436 te.unaryServerInt = errInjector 4437 te.startServer(&testServer{security: e.security}) 4438 defer te.tearDown() 4439 4440 tc := testpb.NewTestServiceClient(te.clientConn()) 4441 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.PermissionDenied { 4442 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) 4443 } 4444} 4445 4446func (s) TestStreamServerInterceptor(t *testing.T) { 4447 for _, e := range listTestEnv() { 4448 // TODO(bradfitz): Temporarily skip this env due to #619. 4449 if e.name == "handler-tls" { 4450 continue 4451 } 4452 testStreamServerInterceptor(t, e) 4453 } 4454} 4455 4456func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 4457 if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" { 4458 return handler(srv, ss) 4459 } 4460 // Reject the other methods. 
4461 return status.Error(codes.PermissionDenied, "") 4462} 4463 4464func testStreamServerInterceptor(t *testing.T, e env) { 4465 te := newTest(t, e) 4466 te.streamServerInt = fullDuplexOnly 4467 te.startServer(&testServer{security: e.security}) 4468 defer te.tearDown() 4469 4470 tc := testpb.NewTestServiceClient(te.clientConn()) 4471 respParam := []*testpb.ResponseParameters{ 4472 { 4473 Size: int32(1), 4474 }, 4475 } 4476 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1)) 4477 if err != nil { 4478 t.Fatal(err) 4479 } 4480 req := &testpb.StreamingOutputCallRequest{ 4481 ResponseType: testpb.PayloadType_COMPRESSABLE, 4482 ResponseParameters: respParam, 4483 Payload: payload, 4484 } 4485 s1, err := tc.StreamingOutputCall(context.Background(), req) 4486 if err != nil { 4487 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err) 4488 } 4489 if _, err := s1.Recv(); status.Code(err) != codes.PermissionDenied { 4490 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) 4491 } 4492 s2, err := tc.FullDuplexCall(context.Background()) 4493 if err != nil { 4494 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4495 } 4496 if err := s2.Send(req); err != nil { 4497 t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err) 4498 } 4499 if _, err := s2.Recv(); err != nil { 4500 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err) 4501 } 4502} 4503 4504// funcServer implements methods of TestServiceServer using funcs, 4505// similar to an http.HandlerFunc. 4506// Any unimplemented method will crash. Tests implement the method(s) 4507// they need. 
4508type funcServer struct { 4509 testpb.TestServiceServer 4510 unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) 4511 streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error 4512 fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error 4513} 4514 4515func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4516 return s.unaryCall(ctx, in) 4517} 4518 4519func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error { 4520 return s.streamingInputCall(stream) 4521} 4522 4523func (s *funcServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 4524 return s.fullDuplexCall(stream) 4525} 4526 4527func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) { 4528 for _, e := range listTestEnv() { 4529 testClientRequestBodyErrorUnexpectedEOF(t, e) 4530 } 4531} 4532 4533func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) { 4534 te := newTest(t, e) 4535 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4536 errUnexpectedCall := errors.New("unexpected call func server method") 4537 t.Error(errUnexpectedCall) 4538 return nil, errUnexpectedCall 4539 }} 4540 te.startServer(ts) 4541 defer te.tearDown() 4542 te.withServerTester(func(st *serverTester) { 4543 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") 4544 // Say we have 5 bytes coming, but set END_STREAM flag: 4545 st.writeData(1, true, []byte{0, 0, 0, 0, 5}) 4546 st.wantAnyFrame() // wait for server to crash (it used to crash) 4547 }) 4548} 4549 4550func (s) TestClientRequestBodyErrorCloseAfterLength(t *testing.T) { 4551 for _, e := range listTestEnv() { 4552 testClientRequestBodyErrorCloseAfterLength(t, e) 4553 } 4554} 4555 4556func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) { 4557 te := newTest(t, e) 4558 
te.declareLogNoise("Server.processUnaryRPC failed to write status") 4559 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4560 errUnexpectedCall := errors.New("unexpected call func server method") 4561 t.Error(errUnexpectedCall) 4562 return nil, errUnexpectedCall 4563 }} 4564 te.startServer(ts) 4565 defer te.tearDown() 4566 te.withServerTester(func(st *serverTester) { 4567 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") 4568 // say we're sending 5 bytes, but then close the connection instead. 4569 st.writeData(1, false, []byte{0, 0, 0, 0, 5}) 4570 st.cc.Close() 4571 }) 4572} 4573 4574func (s) TestClientRequestBodyErrorCancel(t *testing.T) { 4575 for _, e := range listTestEnv() { 4576 testClientRequestBodyErrorCancel(t, e) 4577 } 4578} 4579 4580func testClientRequestBodyErrorCancel(t *testing.T, e env) { 4581 te := newTest(t, e) 4582 gotCall := make(chan bool, 1) 4583 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4584 gotCall <- true 4585 return new(testpb.SimpleResponse), nil 4586 }} 4587 te.startServer(ts) 4588 defer te.tearDown() 4589 te.withServerTester(func(st *serverTester) { 4590 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") 4591 // Say we have 5 bytes coming, but cancel it instead. 4592 st.writeRSTStream(1, http2.ErrCodeCancel) 4593 st.writeData(1, false, []byte{0, 0, 0, 0, 5}) 4594 4595 // Verify we didn't a call yet. 4596 select { 4597 case <-gotCall: 4598 t.Fatal("unexpected call") 4599 default: 4600 } 4601 4602 // And now send an uncanceled (but still invalid), just to get a response. 
4603 st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall") 4604 st.writeData(3, true, []byte{0, 0, 0, 0, 0}) 4605 <-gotCall 4606 st.wantAnyFrame() 4607 }) 4608} 4609 4610func (s) TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) { 4611 for _, e := range listTestEnv() { 4612 testClientRequestBodyErrorCancelStreamingInput(t, e) 4613 } 4614} 4615 4616func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) { 4617 te := newTest(t, e) 4618 recvErr := make(chan error, 1) 4619 ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { 4620 _, err := stream.Recv() 4621 recvErr <- err 4622 return nil 4623 }} 4624 te.startServer(ts) 4625 defer te.tearDown() 4626 te.withServerTester(func(st *serverTester) { 4627 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall") 4628 // Say we have 5 bytes coming, but cancel it instead. 4629 st.writeData(1, false, []byte{0, 0, 0, 0, 5}) 4630 st.writeRSTStream(1, http2.ErrCodeCancel) 4631 4632 var got error 4633 select { 4634 case got = <-recvErr: 4635 case <-time.After(3 * time.Second): 4636 t.Fatal("timeout waiting for error") 4637 } 4638 if grpc.Code(got) != codes.Canceled { 4639 t.Errorf("error = %#v; want error code %s", got, codes.Canceled) 4640 } 4641 }) 4642} 4643 4644func (s) TestClientResourceExhaustedCancelFullDuplex(t *testing.T) { 4645 for _, e := range listTestEnv() { 4646 if e.httpHandler { 4647 // httpHandler write won't be blocked on flow control window. 
4648 continue 4649 } 4650 testClientResourceExhaustedCancelFullDuplex(t, e) 4651 } 4652} 4653 4654func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) { 4655 te := newTest(t, e) 4656 recvErr := make(chan error, 1) 4657 ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 4658 defer close(recvErr) 4659 _, err := stream.Recv() 4660 if err != nil { 4661 return status.Errorf(codes.Internal, "stream.Recv() got error: %v, want <nil>", err) 4662 } 4663 // create a payload that's larger than the default flow control window. 4664 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10) 4665 if err != nil { 4666 return err 4667 } 4668 resp := &testpb.StreamingOutputCallResponse{ 4669 Payload: payload, 4670 } 4671 ce := make(chan error, 1) 4672 go func() { 4673 var err error 4674 for { 4675 if err = stream.Send(resp); err != nil { 4676 break 4677 } 4678 } 4679 ce <- err 4680 }() 4681 select { 4682 case err = <-ce: 4683 case <-time.After(10 * time.Second): 4684 err = errors.New("10s timeout reached") 4685 } 4686 recvErr <- err 4687 return err 4688 }} 4689 te.startServer(ts) 4690 defer te.tearDown() 4691 // set a low limit on receive message size to error with Resource Exhausted on 4692 // client side when server send a large message. 
4693 te.maxClientReceiveMsgSize = newInt(10) 4694 cc := te.clientConn() 4695 tc := testpb.NewTestServiceClient(cc) 4696 stream, err := tc.FullDuplexCall(context.Background()) 4697 if err != nil { 4698 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4699 } 4700 req := &testpb.StreamingOutputCallRequest{} 4701 if err := stream.Send(req); err != nil { 4702 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 4703 } 4704 if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted { 4705 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 4706 } 4707 err = <-recvErr 4708 if status.Code(err) != codes.Canceled { 4709 t.Fatalf("server got error %v, want error code: %s", err, codes.Canceled) 4710 } 4711} 4712 4713type clientFailCreds struct{} 4714 4715func (c *clientFailCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4716 return rawConn, nil, nil 4717} 4718func (c *clientFailCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4719 return nil, nil, fmt.Errorf("client handshake fails with fatal error") 4720} 4721func (c *clientFailCreds) Info() credentials.ProtocolInfo { 4722 return credentials.ProtocolInfo{} 4723} 4724func (c *clientFailCreds) Clone() credentials.TransportCredentials { 4725 return c 4726} 4727func (c *clientFailCreds) OverrideServerName(s string) error { 4728 return nil 4729} 4730 4731// This test makes sure that failfast RPCs fail if client handshake fails with 4732// fatal errors. 
4733func (s) TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) { 4734 lis, err := net.Listen("tcp", "localhost:0") 4735 if err != nil { 4736 t.Fatalf("Failed to listen: %v", err) 4737 } 4738 defer lis.Close() 4739 4740 cc, err := grpc.Dial("passthrough:///"+lis.Addr().String(), grpc.WithTransportCredentials(&clientFailCreds{})) 4741 if err != nil { 4742 t.Fatalf("grpc.Dial(_) = %v", err) 4743 } 4744 defer cc.Close() 4745 4746 tc := testpb.NewTestServiceClient(cc) 4747 // This unary call should fail, but not timeout. 4748 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 4749 defer cancel() 4750 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(false)); status.Code(err) != codes.Unavailable { 4751 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err) 4752 } 4753} 4754 4755func (s) TestFlowControlLogicalRace(t *testing.T) { 4756 // Test for a regression of https://github.com/grpc/grpc-go/issues/632, 4757 // and other flow control bugs. 
4758 4759 const ( 4760 itemCount = 100 4761 itemSize = 1 << 10 4762 recvCount = 2 4763 maxFailures = 3 4764 4765 requestTimeout = time.Second * 5 4766 ) 4767 4768 requestCount := 10000 4769 if raceMode { 4770 requestCount = 1000 4771 } 4772 4773 lis, err := net.Listen("tcp", "localhost:0") 4774 if err != nil { 4775 t.Fatalf("Failed to listen: %v", err) 4776 } 4777 defer lis.Close() 4778 4779 s := grpc.NewServer() 4780 testpb.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{ 4781 itemCount: itemCount, 4782 itemSize: itemSize, 4783 }) 4784 defer s.Stop() 4785 4786 go s.Serve(lis) 4787 4788 ctx := context.Background() 4789 4790 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock()) 4791 if err != nil { 4792 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) 4793 } 4794 defer cc.Close() 4795 cl := testpb.NewTestServiceClient(cc) 4796 4797 failures := 0 4798 for i := 0; i < requestCount; i++ { 4799 ctx, cancel := context.WithTimeout(ctx, requestTimeout) 4800 output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{}) 4801 if err != nil { 4802 t.Fatalf("StreamingOutputCall; err = %q", err) 4803 } 4804 4805 j := 0 4806 loop: 4807 for ; j < recvCount; j++ { 4808 _, err := output.Recv() 4809 if err != nil { 4810 if err == io.EOF { 4811 break loop 4812 } 4813 switch status.Code(err) { 4814 case codes.DeadlineExceeded: 4815 break loop 4816 default: 4817 t.Fatalf("Recv; err = %q", err) 4818 } 4819 } 4820 } 4821 cancel() 4822 <-ctx.Done() 4823 4824 if j < recvCount { 4825 t.Errorf("got %d responses to request %d", j, i) 4826 failures++ 4827 if failures >= maxFailures { 4828 // Continue past the first failure to see if the connection is 4829 // entirely broken, or if only a single RPC was affected 4830 break 4831 } 4832 } 4833 } 4834} 4835 4836type flowControlLogicalRaceServer struct { 4837 testpb.TestServiceServer 4838 4839 itemSize int 4840 itemCount int 4841} 4842 4843func (s *flowControlLogicalRaceServer) 
StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testpb.TestService_StreamingOutputCallServer) error { 4844 for i := 0; i < s.itemCount; i++ { 4845 err := srv.Send(&testpb.StreamingOutputCallResponse{ 4846 Payload: &testpb.Payload{ 4847 // Sending a large stream of data which the client reject 4848 // helps to trigger some types of flow control bugs. 4849 // 4850 // Reallocating memory here is inefficient, but the stress it 4851 // puts on the GC leads to more frequent flow control 4852 // failures. The GC likely causes more variety in the 4853 // goroutine scheduling orders. 4854 Body: bytes.Repeat([]byte("a"), s.itemSize), 4855 }, 4856 }) 4857 if err != nil { 4858 return err 4859 } 4860 } 4861 return nil 4862} 4863 4864type lockingWriter struct { 4865 mu sync.Mutex 4866 w io.Writer 4867} 4868 4869func (lw *lockingWriter) Write(p []byte) (n int, err error) { 4870 lw.mu.Lock() 4871 defer lw.mu.Unlock() 4872 return lw.w.Write(p) 4873} 4874 4875func (lw *lockingWriter) setWriter(w io.Writer) { 4876 lw.mu.Lock() 4877 defer lw.mu.Unlock() 4878 lw.w = w 4879} 4880 4881var testLogOutput = &lockingWriter{w: os.Stderr} 4882 4883// awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to 4884// terminate, if they're still running. It spams logs with this 4885// message. We wait for it so our log filter is still 4886// active. Otherwise the "defer restore()" at the top of various test 4887// functions restores our log filter and then the goroutine spams. 4888func awaitNewConnLogOutput() { 4889 awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry") 4890} 4891 4892func awaitLogOutput(maxWait time.Duration, phrase string) { 4893 pb := []byte(phrase) 4894 4895 timer := time.NewTimer(maxWait) 4896 defer timer.Stop() 4897 wakeup := make(chan bool, 1) 4898 for { 4899 if logOutputHasContents(pb, wakeup) { 4900 return 4901 } 4902 select { 4903 case <-timer.C: 4904 // Too slow. Oh well. 
4905 return 4906 case <-wakeup: 4907 } 4908 } 4909} 4910 4911func logOutputHasContents(v []byte, wakeup chan<- bool) bool { 4912 testLogOutput.mu.Lock() 4913 defer testLogOutput.mu.Unlock() 4914 fw, ok := testLogOutput.w.(*filterWriter) 4915 if !ok { 4916 return false 4917 } 4918 fw.mu.Lock() 4919 defer fw.mu.Unlock() 4920 if bytes.Contains(fw.buf.Bytes(), v) { 4921 return true 4922 } 4923 fw.wakeup = wakeup 4924 return false 4925} 4926 4927var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering") 4928 4929func noop() {} 4930 4931// declareLogNoise declares that t is expected to emit the following noisy phrases, 4932// even on success. Those phrases will be filtered from grpclog output 4933// and only be shown if *verbose_logs or t ends up failing. 4934// The returned restore function should be called with defer to be run 4935// before the test ends. 4936func declareLogNoise(t *testing.T, phrases ...string) (restore func()) { 4937 if *verboseLogs { 4938 return noop 4939 } 4940 fw := &filterWriter{dst: os.Stderr, filter: phrases} 4941 testLogOutput.setWriter(fw) 4942 return func() { 4943 if t.Failed() { 4944 fw.mu.Lock() 4945 defer fw.mu.Unlock() 4946 if fw.buf.Len() > 0 { 4947 t.Logf("Complete log output:\n%s", fw.buf.Bytes()) 4948 } 4949 } 4950 testLogOutput.setWriter(os.Stderr) 4951 } 4952} 4953 4954type filterWriter struct { 4955 dst io.Writer 4956 filter []string 4957 4958 mu sync.Mutex 4959 buf bytes.Buffer 4960 wakeup chan<- bool // if non-nil, gets true on write 4961} 4962 4963func (fw *filterWriter) Write(p []byte) (n int, err error) { 4964 fw.mu.Lock() 4965 fw.buf.Write(p) 4966 if fw.wakeup != nil { 4967 select { 4968 case fw.wakeup <- true: 4969 default: 4970 } 4971 } 4972 fw.mu.Unlock() 4973 4974 ps := string(p) 4975 for _, f := range fw.filter { 4976 if strings.Contains(ps, f) { 4977 return len(p), nil 4978 } 4979 } 4980 return fw.dst.Write(p) 4981} 4982 4983// stubServer is a server that is easy to customize 
within individual test 4984// cases. 4985type stubServer struct { 4986 // Guarantees we satisfy this interface; panics if unimplemented methods are called. 4987 testpb.TestServiceServer 4988 4989 // Customizable implementations of server handlers. 4990 emptyCall func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) 4991 unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) 4992 fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error 4993 4994 // A client connected to this service the test may use. Created in Start(). 4995 client testpb.TestServiceClient 4996 cc *grpc.ClientConn 4997 s *grpc.Server 4998 4999 addr string // address of listener 5000 5001 cleanups []func() // Lambdas executed in Stop(); populated by Start(). 5002 5003 r *manual.Resolver 5004} 5005 5006func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5007 return ss.emptyCall(ctx, in) 5008} 5009 5010func (ss *stubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 5011 return ss.unaryCall(ctx, in) 5012} 5013 5014func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 5015 return ss.fullDuplexCall(stream) 5016} 5017 5018// Start starts the server and creates a client connected to it. 5019func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { 5020 r, cleanup := manual.GenerateAndRegisterManualResolver() 5021 ss.r = r 5022 ss.cleanups = append(ss.cleanups, cleanup) 5023 5024 lis, err := net.Listen("tcp", "localhost:0") 5025 if err != nil { 5026 return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err) 5027 } 5028 ss.addr = lis.Addr().String() 5029 ss.cleanups = append(ss.cleanups, func() { lis.Close() }) 5030 5031 s := grpc.NewServer(sopts...) 
5032 testpb.RegisterTestServiceServer(s, ss) 5033 go s.Serve(lis) 5034 ss.cleanups = append(ss.cleanups, s.Stop) 5035 ss.s = s 5036 5037 target := ss.r.Scheme() + ":///" + ss.addr 5038 5039 opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...) 5040 cc, err := grpc.Dial(target, opts...) 5041 if err != nil { 5042 return fmt.Errorf("grpc.Dial(%q) = %v", target, err) 5043 } 5044 ss.cc = cc 5045 ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}}) 5046 if err := ss.waitForReady(cc); err != nil { 5047 return err 5048 } 5049 5050 ss.cleanups = append(ss.cleanups, func() { cc.Close() }) 5051 5052 ss.client = testpb.NewTestServiceClient(cc) 5053 return nil 5054} 5055 5056func (ss *stubServer) newServiceConfig(sc string) { 5057 ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}, ServiceConfig: parseCfg(ss.r, sc)}) 5058} 5059 5060func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error { 5061 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 5062 defer cancel() 5063 for { 5064 s := cc.GetState() 5065 if s == connectivity.Ready { 5066 return nil 5067 } 5068 if !cc.WaitForStateChange(ctx, s) { 5069 // ctx got timeout or canceled. 
5070 return ctx.Err() 5071 } 5072 } 5073} 5074 5075func (ss *stubServer) Stop() { 5076 for i := len(ss.cleanups) - 1; i >= 0; i-- { 5077 ss.cleanups[i]() 5078 } 5079} 5080 5081func (s) TestGRPCMethod(t *testing.T) { 5082 var method string 5083 var ok bool 5084 5085 ss := &stubServer{ 5086 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5087 method, ok = grpc.Method(ctx) 5088 return &testpb.Empty{}, nil 5089 }, 5090 } 5091 if err := ss.Start(nil); err != nil { 5092 t.Fatalf("Error starting endpoint server: %v", err) 5093 } 5094 defer ss.Stop() 5095 5096 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5097 defer cancel() 5098 5099 if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { 5100 t.Fatalf("ss.client.EmptyCall(_, _) = _, %v; want _, nil", err) 5101 } 5102 5103 if want := "/grpc.testing.TestService/EmptyCall"; !ok || method != want { 5104 t.Fatalf("grpc.Method(_) = %q, %v; want %q, true", method, ok, want) 5105 } 5106} 5107 5108func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) { 5109 const mdkey = "somedata" 5110 5111 // endpoint ensures mdkey is NOT in metadata and returns an error if it is. 5112 endpoint := &stubServer{ 5113 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5114 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil { 5115 return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) 5116 } 5117 return &testpb.Empty{}, nil 5118 }, 5119 } 5120 if err := endpoint.Start(nil); err != nil { 5121 t.Fatalf("Error starting endpoint server: %v", err) 5122 } 5123 defer endpoint.Stop() 5124 5125 // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint 5126 // without explicitly copying the metadata. 
5127 proxy := &stubServer{ 5128 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5129 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil { 5130 return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey) 5131 } 5132 return endpoint.client.EmptyCall(ctx, in) 5133 }, 5134 } 5135 if err := proxy.Start(nil); err != nil { 5136 t.Fatalf("Error starting proxy server: %v", err) 5137 } 5138 defer proxy.Stop() 5139 5140 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5141 defer cancel() 5142 md := metadata.Pairs(mdkey, "val") 5143 ctx = metadata.NewOutgoingContext(ctx, md) 5144 5145 // Sanity check that endpoint properly errors when it sees mdkey. 5146 _, err := endpoint.client.EmptyCall(ctx, &testpb.Empty{}) 5147 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal { 5148 t.Fatalf("endpoint.client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err) 5149 } 5150 5151 if _, err := proxy.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { 5152 t.Fatal(err.Error()) 5153 } 5154} 5155 5156func (s) TestStreamingProxyDoesNotForwardMetadata(t *testing.T) { 5157 const mdkey = "somedata" 5158 5159 // doFDC performs a FullDuplexCall with client and returns the error from the 5160 // first stream.Recv call, or nil if that error is io.EOF. Calls t.Fatal if 5161 // the stream cannot be established. 5162 doFDC := func(ctx context.Context, client testpb.TestServiceClient) error { 5163 stream, err := client.FullDuplexCall(ctx) 5164 if err != nil { 5165 t.Fatalf("Unwanted error: %v", err) 5166 } 5167 if _, err := stream.Recv(); err != io.EOF { 5168 return err 5169 } 5170 return nil 5171 } 5172 5173 // endpoint ensures mdkey is NOT in metadata and returns an error if it is. 
5174 endpoint := &stubServer{ 5175 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 5176 ctx := stream.Context() 5177 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil { 5178 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) 5179 } 5180 return nil 5181 }, 5182 } 5183 if err := endpoint.Start(nil); err != nil { 5184 t.Fatalf("Error starting endpoint server: %v", err) 5185 } 5186 defer endpoint.Stop() 5187 5188 // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint 5189 // without explicitly copying the metadata. 5190 proxy := &stubServer{ 5191 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 5192 ctx := stream.Context() 5193 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil { 5194 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) 5195 } 5196 return doFDC(ctx, endpoint.client) 5197 }, 5198 } 5199 if err := proxy.Start(nil); err != nil { 5200 t.Fatalf("Error starting proxy server: %v", err) 5201 } 5202 defer proxy.Stop() 5203 5204 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5205 defer cancel() 5206 md := metadata.Pairs(mdkey, "val") 5207 ctx = metadata.NewOutgoingContext(ctx, md) 5208 5209 // Sanity check that endpoint properly errors when it sees mdkey in ctx. 5210 err := doFDC(ctx, endpoint.client) 5211 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal { 5212 t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err) 5213 } 5214 5215 if err := doFDC(ctx, proxy.client); err != nil { 5216 t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err) 5217 } 5218} 5219 5220func (s) TestStatsTagsAndTrace(t *testing.T) { 5221 // Data added to context by client (typically in a stats handler). 
5222 tags := []byte{1, 5, 2, 4, 3} 5223 trace := []byte{5, 2, 1, 3, 4} 5224 5225 // endpoint ensures Tags() and Trace() in context match those that were added 5226 // by the client and returns an error if not. 5227 endpoint := &stubServer{ 5228 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5229 md, _ := metadata.FromIncomingContext(ctx) 5230 if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) { 5231 return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags) 5232 } 5233 if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) { 5234 return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags) 5235 } 5236 if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) { 5237 return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace) 5238 } 5239 if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) { 5240 return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace) 5241 } 5242 return &testpb.Empty{}, nil 5243 }, 5244 } 5245 if err := endpoint.Start(nil); err != nil { 5246 t.Fatalf("Error starting endpoint server: %v", err) 5247 } 5248 defer endpoint.Stop() 5249 5250 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5251 defer cancel() 5252 5253 testCases := []struct { 5254 ctx context.Context 5255 want codes.Code 5256 }{ 5257 {ctx: ctx, want: codes.Internal}, 5258 {ctx: stats.SetTags(ctx, tags), want: codes.Internal}, 5259 {ctx: stats.SetTrace(ctx, trace), want: codes.Internal}, 5260 {ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal}, 5261 {ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK}, 5262 } 5263 5264 for _, tc := range testCases { 5265 _, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{}) 5266 if tc.want == codes.OK && err != nil { 5267 t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; 
want _, nil", tc.ctx, err) 5268 } 5269 if s, ok := status.FromError(err); !ok || s.Code() != tc.want { 5270 t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want) 5271 } 5272 } 5273} 5274 5275func (s) TestTapTimeout(t *testing.T) { 5276 sopts := []grpc.ServerOption{ 5277 grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) { 5278 c, cancel := context.WithCancel(ctx) 5279 // Call cancel instead of setting a deadline so we can detect which error 5280 // occurred -- this cancellation (desired) or the client's deadline 5281 // expired (indicating this cancellation did not affect the RPC). 5282 time.AfterFunc(10*time.Millisecond, cancel) 5283 return c, nil 5284 }), 5285 } 5286 5287 ss := &stubServer{ 5288 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5289 <-ctx.Done() 5290 return nil, status.Errorf(codes.Canceled, ctx.Err().Error()) 5291 }, 5292 } 5293 if err := ss.Start(sopts); err != nil { 5294 t.Fatalf("Error starting endpoint server: %v", err) 5295 } 5296 defer ss.Stop() 5297 5298 // This was known to be flaky; test several times. 5299 for i := 0; i < 10; i++ { 5300 // Set our own deadline in case the server hangs. 
5301 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5302 res, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) 5303 cancel() 5304 if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled { 5305 t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, <status with Code()=Canceled>", res, err) 5306 } 5307 } 5308 5309} 5310 5311func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) { 5312 ss := &stubServer{ 5313 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 5314 return status.Errorf(codes.Internal, "") 5315 }, 5316 } 5317 sopts := []grpc.ServerOption{} 5318 if err := ss.Start(sopts); err != nil { 5319 t.Fatalf("Error starting endpoint server: %v", err) 5320 } 5321 defer ss.Stop() 5322 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 5323 defer cancel() 5324 stream, err := ss.client.FullDuplexCall(ctx) 5325 if err != nil { 5326 t.Fatalf("Error while creating stream: %v", err) 5327 } 5328 for { 5329 if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err == nil { 5330 time.Sleep(5 * time.Millisecond) 5331 } else if err == io.EOF { 5332 break // Success. 
5333 } else { 5334 t.Fatalf("stream.Send(_) = %v, want io.EOF", err) 5335 } 5336 } 5337} 5338 5339type windowSizeConfig struct { 5340 serverStream int32 5341 serverConn int32 5342 clientStream int32 5343 clientConn int32 5344} 5345 5346func max(a, b int32) int32 { 5347 if a > b { 5348 return a 5349 } 5350 return b 5351} 5352 5353func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) { 5354 wc := windowSizeConfig{ 5355 serverStream: 8 * 1024 * 1024, 5356 serverConn: 12 * 1024 * 1024, 5357 clientStream: 6 * 1024 * 1024, 5358 clientConn: 8 * 1024 * 1024, 5359 } 5360 for _, e := range listTestEnv() { 5361 testConfigurableWindowSize(t, e, wc) 5362 } 5363} 5364 5365func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) { 5366 wc := windowSizeConfig{ 5367 serverStream: 1, 5368 serverConn: 1, 5369 clientStream: 1, 5370 clientConn: 1, 5371 } 5372 for _, e := range listTestEnv() { 5373 testConfigurableWindowSize(t, e, wc) 5374 } 5375} 5376 5377func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) { 5378 te := newTest(t, e) 5379 te.serverInitialWindowSize = wc.serverStream 5380 te.serverInitialConnWindowSize = wc.serverConn 5381 te.clientInitialWindowSize = wc.clientStream 5382 te.clientInitialConnWindowSize = wc.clientConn 5383 5384 te.startServer(&testServer{security: e.security}) 5385 defer te.tearDown() 5386 5387 cc := te.clientConn() 5388 tc := testpb.NewTestServiceClient(cc) 5389 stream, err := tc.FullDuplexCall(context.Background()) 5390 if err != nil { 5391 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5392 } 5393 numOfIter := 11 5394 // Set message size to exhaust largest of window sizes. 
5395 messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1) 5396 messageSize = max(messageSize, 64*1024) 5397 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize) 5398 if err != nil { 5399 t.Fatal(err) 5400 } 5401 respParams := []*testpb.ResponseParameters{ 5402 { 5403 Size: messageSize, 5404 }, 5405 } 5406 req := &testpb.StreamingOutputCallRequest{ 5407 ResponseType: testpb.PayloadType_COMPRESSABLE, 5408 ResponseParameters: respParams, 5409 Payload: payload, 5410 } 5411 for i := 0; i < numOfIter; i++ { 5412 if err := stream.Send(req); err != nil { 5413 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 5414 } 5415 if _, err := stream.Recv(); err != nil { 5416 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err) 5417 } 5418 } 5419 if err := stream.CloseSend(); err != nil { 5420 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err) 5421 } 5422} 5423 5424func (s) TestWaitForReadyConnection(t *testing.T) { 5425 for _, e := range listTestEnv() { 5426 testWaitForReadyConnection(t, e) 5427 } 5428 5429} 5430 5431func testWaitForReadyConnection(t *testing.T, e env) { 5432 te := newTest(t, e) 5433 te.userAgent = testAppUA 5434 te.startServer(&testServer{security: e.security}) 5435 defer te.tearDown() 5436 5437 cc := te.clientConn() // Non-blocking dial. 5438 tc := testpb.NewTestServiceClient(cc) 5439 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 5440 defer cancel() 5441 state := cc.GetState() 5442 // Wait for connection to be Ready. 5443 for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { 5444 } 5445 if state != connectivity.Ready { 5446 t.Fatalf("Want connection state to be Ready, got %v", state) 5447 } 5448 ctx, cancel = context.WithTimeout(context.Background(), time.Second) 5449 defer cancel() 5450 // Make a fail-fast RPC. 
5451 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { 5452 t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err) 5453 } 5454} 5455 5456type errCodec struct { 5457 noError bool 5458} 5459 5460func (c *errCodec) Marshal(v interface{}) ([]byte, error) { 5461 if c.noError { 5462 return []byte{}, nil 5463 } 5464 return nil, fmt.Errorf("3987^12 + 4365^12 = 4472^12") 5465} 5466 5467func (c *errCodec) Unmarshal(data []byte, v interface{}) error { 5468 return nil 5469} 5470 5471func (c *errCodec) Name() string { 5472 return "Fermat's near-miss." 5473} 5474 5475func (s) TestEncodeDoesntPanic(t *testing.T) { 5476 for _, e := range listTestEnv() { 5477 testEncodeDoesntPanic(t, e) 5478 } 5479} 5480 5481func testEncodeDoesntPanic(t *testing.T, e env) { 5482 te := newTest(t, e) 5483 erc := &errCodec{} 5484 te.customCodec = erc 5485 te.startServer(&testServer{security: e.security}) 5486 defer te.tearDown() 5487 te.customCodec = nil 5488 tc := testpb.NewTestServiceClient(te.clientConn()) 5489 // Failure case, should not panic. 5490 tc.EmptyCall(context.Background(), &testpb.Empty{}) 5491 erc.noError = true 5492 // Passing case. 
5493 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 5494 t.Fatalf("EmptyCall(_, _) = _, %v, want _, <nil>", err) 5495 } 5496} 5497 5498func (s) TestSvrWriteStatusEarlyWrite(t *testing.T) { 5499 for _, e := range listTestEnv() { 5500 testSvrWriteStatusEarlyWrite(t, e) 5501 } 5502} 5503 5504func testSvrWriteStatusEarlyWrite(t *testing.T, e env) { 5505 te := newTest(t, e) 5506 const smallSize = 1024 5507 const largeSize = 2048 5508 const extraLargeSize = 4096 5509 te.maxServerReceiveMsgSize = newInt(largeSize) 5510 te.maxServerSendMsgSize = newInt(largeSize) 5511 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 5512 if err != nil { 5513 t.Fatal(err) 5514 } 5515 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) 5516 if err != nil { 5517 t.Fatal(err) 5518 } 5519 te.startServer(&testServer{security: e.security}) 5520 defer te.tearDown() 5521 tc := testpb.NewTestServiceClient(te.clientConn()) 5522 respParam := []*testpb.ResponseParameters{ 5523 { 5524 Size: int32(smallSize), 5525 }, 5526 } 5527 sreq := &testpb.StreamingOutputCallRequest{ 5528 ResponseType: testpb.PayloadType_COMPRESSABLE, 5529 ResponseParameters: respParam, 5530 Payload: extraLargePayload, 5531 } 5532 // Test recv case: server receives a message larger than maxServerReceiveMsgSize. 5533 stream, err := tc.FullDuplexCall(te.ctx) 5534 if err != nil { 5535 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5536 } 5537 if err = stream.Send(sreq); err != nil { 5538 t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err) 5539 } 5540 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 5541 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 5542 } 5543 // Test send case: server sends a message larger than maxServerSendMsgSize. 
5544 sreq.Payload = smallPayload 5545 respParam[0].Size = int32(extraLargeSize) 5546 5547 stream, err = tc.FullDuplexCall(te.ctx) 5548 if err != nil { 5549 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5550 } 5551 if err = stream.Send(sreq); err != nil { 5552 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 5553 } 5554 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 5555 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 5556 } 5557} 5558 5559// The following functions with function name ending with TD indicates that they 5560// should be deleted after old service config API is deprecated and deleted. 5561func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) { 5562 te := newTest(t, e) 5563 // We write before read. 5564 ch := make(chan grpc.ServiceConfig, 1) 5565 te.sc = ch 5566 te.userAgent = testAppUA 5567 te.declareLogNoise( 5568 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 5569 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 5570 "grpc: addrConn.resetTransport failed to create client transport: connection error", 5571 "Failed to dial : context canceled; please retry.", 5572 ) 5573 return te, ch 5574} 5575 5576func (s) TestServiceConfigGetMethodConfigTD(t *testing.T) { 5577 for _, e := range listTestEnv() { 5578 testGetMethodConfigTD(t, e) 5579 } 5580} 5581 5582func testGetMethodConfigTD(t *testing.T, e env) { 5583 te, ch := testServiceConfigSetupTD(t, e) 5584 defer te.tearDown() 5585 5586 mc1 := grpc.MethodConfig{ 5587 WaitForReady: newBool(true), 5588 Timeout: newDuration(time.Millisecond), 5589 } 5590 mc2 := grpc.MethodConfig{WaitForReady: newBool(false)} 5591 m := make(map[string]grpc.MethodConfig) 5592 m["/grpc.testing.TestService/EmptyCall"] = mc1 5593 m["/grpc.testing.TestService/"] = mc2 5594 sc := grpc.ServiceConfig{ 5595 Methods: m, 5596 } 
5597 ch <- sc 5598 5599 cc := te.clientConn() 5600 tc := testpb.NewTestServiceClient(cc) 5601 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 5602 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 5603 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 5604 } 5605 5606 m = make(map[string]grpc.MethodConfig) 5607 m["/grpc.testing.TestService/UnaryCall"] = mc1 5608 m["/grpc.testing.TestService/"] = mc2 5609 sc = grpc.ServiceConfig{ 5610 Methods: m, 5611 } 5612 ch <- sc 5613 // Wait for the new service config to propagate. 5614 for { 5615 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded { 5616 continue 5617 } 5618 break 5619 } 5620 // The following RPCs are expected to become fail-fast. 5621 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { 5622 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) 5623 } 5624} 5625 5626func (s) TestServiceConfigWaitForReadyTD(t *testing.T) { 5627 for _, e := range listTestEnv() { 5628 testServiceConfigWaitForReadyTD(t, e) 5629 } 5630} 5631 5632func testServiceConfigWaitForReadyTD(t *testing.T, e env) { 5633 te, ch := testServiceConfigSetupTD(t, e) 5634 defer te.tearDown() 5635 5636 // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds. 
5637 mc := grpc.MethodConfig{ 5638 WaitForReady: newBool(false), 5639 Timeout: newDuration(time.Millisecond), 5640 } 5641 m := make(map[string]grpc.MethodConfig) 5642 m["/grpc.testing.TestService/EmptyCall"] = mc 5643 m["/grpc.testing.TestService/FullDuplexCall"] = mc 5644 sc := grpc.ServiceConfig{ 5645 Methods: m, 5646 } 5647 ch <- sc 5648 5649 cc := te.clientConn() 5650 tc := testpb.NewTestServiceClient(cc) 5651 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 5652 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 5653 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 5654 } 5655 if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 5656 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 5657 } 5658 5659 // Generate a service config update. 5660 // Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. 5661 mc.WaitForReady = newBool(true) 5662 m = make(map[string]grpc.MethodConfig) 5663 m["/grpc.testing.TestService/EmptyCall"] = mc 5664 m["/grpc.testing.TestService/FullDuplexCall"] = mc 5665 sc = grpc.ServiceConfig{ 5666 Methods: m, 5667 } 5668 ch <- sc 5669 5670 // Wait for the new service config to take effect. 5671 mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") 5672 for { 5673 if !*mc.WaitForReady { 5674 time.Sleep(100 * time.Millisecond) 5675 mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") 5676 continue 5677 } 5678 break 5679 } 5680 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 
5681 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 5682 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 5683 } 5684 if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded { 5685 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 5686 } 5687} 5688 5689func (s) TestServiceConfigTimeoutTD(t *testing.T) { 5690 for _, e := range listTestEnv() { 5691 testServiceConfigTimeoutTD(t, e) 5692 } 5693} 5694 5695func testServiceConfigTimeoutTD(t *testing.T, e env) { 5696 te, ch := testServiceConfigSetupTD(t, e) 5697 defer te.tearDown() 5698 5699 // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. 5700 mc := grpc.MethodConfig{ 5701 Timeout: newDuration(time.Hour), 5702 } 5703 m := make(map[string]grpc.MethodConfig) 5704 m["/grpc.testing.TestService/EmptyCall"] = mc 5705 m["/grpc.testing.TestService/FullDuplexCall"] = mc 5706 sc := grpc.ServiceConfig{ 5707 Methods: m, 5708 } 5709 ch <- sc 5710 5711 cc := te.clientConn() 5712 tc := testpb.NewTestServiceClient(cc) 5713 // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. 
5714 ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) 5715 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 5716 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 5717 } 5718 cancel() 5719 ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) 5720 if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 5721 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 5722 } 5723 cancel() 5724 5725 // Generate a service config update. 5726 // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. 5727 mc.Timeout = newDuration(time.Nanosecond) 5728 m = make(map[string]grpc.MethodConfig) 5729 m["/grpc.testing.TestService/EmptyCall"] = mc 5730 m["/grpc.testing.TestService/FullDuplexCall"] = mc 5731 sc = grpc.ServiceConfig{ 5732 Methods: m, 5733 } 5734 ch <- sc 5735 5736 // Wait for the new service config to take effect. 
5737 mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") 5738 for { 5739 if *mc.Timeout != time.Nanosecond { 5740 time.Sleep(100 * time.Millisecond) 5741 mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") 5742 continue 5743 } 5744 break 5745 } 5746 5747 ctx, cancel = context.WithTimeout(context.Background(), time.Hour) 5748 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 5749 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 5750 } 5751 cancel() 5752 5753 ctx, cancel = context.WithTimeout(context.Background(), time.Hour) 5754 if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 5755 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 5756 } 5757 cancel() 5758} 5759 5760func (s) TestServiceConfigMaxMsgSizeTD(t *testing.T) { 5761 for _, e := range listTestEnv() { 5762 testServiceConfigMaxMsgSizeTD(t, e) 5763 } 5764} 5765 5766func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { 5767 // Setting up values and objects shared across all test cases. 
5768 const smallSize = 1 5769 const largeSize = 1024 5770 const extraLargeSize = 2048 5771 5772 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 5773 if err != nil { 5774 t.Fatal(err) 5775 } 5776 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 5777 if err != nil { 5778 t.Fatal(err) 5779 } 5780 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) 5781 if err != nil { 5782 t.Fatal(err) 5783 } 5784 5785 mc := grpc.MethodConfig{ 5786 MaxReqSize: newInt(extraLargeSize), 5787 MaxRespSize: newInt(extraLargeSize), 5788 } 5789 5790 m := make(map[string]grpc.MethodConfig) 5791 m["/grpc.testing.TestService/UnaryCall"] = mc 5792 m["/grpc.testing.TestService/FullDuplexCall"] = mc 5793 sc := grpc.ServiceConfig{ 5794 Methods: m, 5795 } 5796 // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 5797 te1, ch1 := testServiceConfigSetupTD(t, e) 5798 te1.startServer(&testServer{security: e.security}) 5799 defer te1.tearDown() 5800 5801 ch1 <- sc 5802 tc := testpb.NewTestServiceClient(te1.clientConn()) 5803 5804 req := &testpb.SimpleRequest{ 5805 ResponseType: testpb.PayloadType_COMPRESSABLE, 5806 ResponseSize: int32(extraLargeSize), 5807 Payload: smallPayload, 5808 } 5809 // Test for unary RPC recv. 5810 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 5811 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 5812 } 5813 5814 // Test for unary RPC send. 5815 req.Payload = extraLargePayload 5816 req.ResponseSize = int32(smallSize) 5817 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 5818 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 5819 } 5820 5821 // Test for streaming RPC recv. 
5822 respParam := []*testpb.ResponseParameters{ 5823 { 5824 Size: int32(extraLargeSize), 5825 }, 5826 } 5827 sreq := &testpb.StreamingOutputCallRequest{ 5828 ResponseType: testpb.PayloadType_COMPRESSABLE, 5829 ResponseParameters: respParam, 5830 Payload: smallPayload, 5831 } 5832 stream, err := tc.FullDuplexCall(te1.ctx) 5833 if err != nil { 5834 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5835 } 5836 if err := stream.Send(sreq); err != nil { 5837 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 5838 } 5839 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 5840 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 5841 } 5842 5843 // Test for streaming RPC send. 5844 respParam[0].Size = int32(smallSize) 5845 sreq.Payload = extraLargePayload 5846 stream, err = tc.FullDuplexCall(te1.ctx) 5847 if err != nil { 5848 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5849 } 5850 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 5851 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 5852 } 5853 5854 // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 5855 te2, ch2 := testServiceConfigSetupTD(t, e) 5856 te2.maxClientReceiveMsgSize = newInt(1024) 5857 te2.maxClientSendMsgSize = newInt(1024) 5858 te2.startServer(&testServer{security: e.security}) 5859 defer te2.tearDown() 5860 ch2 <- sc 5861 tc = testpb.NewTestServiceClient(te2.clientConn()) 5862 5863 // Test for unary RPC recv. 
5864 req.Payload = smallPayload 5865 req.ResponseSize = int32(largeSize) 5866 5867 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 5868 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 5869 } 5870 5871 // Test for unary RPC send. 5872 req.Payload = largePayload 5873 req.ResponseSize = int32(smallSize) 5874 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 5875 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 5876 } 5877 5878 // Test for streaming RPC recv. 5879 stream, err = tc.FullDuplexCall(te2.ctx) 5880 respParam[0].Size = int32(largeSize) 5881 sreq.Payload = smallPayload 5882 if err != nil { 5883 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5884 } 5885 if err := stream.Send(sreq); err != nil { 5886 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 5887 } 5888 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 5889 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 5890 } 5891 5892 // Test for streaming RPC send. 5893 respParam[0].Size = int32(smallSize) 5894 sreq.Payload = largePayload 5895 stream, err = tc.FullDuplexCall(te2.ctx) 5896 if err != nil { 5897 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5898 } 5899 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 5900 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 5901 } 5902 5903 // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 
5904 te3, ch3 := testServiceConfigSetupTD(t, e) 5905 te3.maxClientReceiveMsgSize = newInt(4096) 5906 te3.maxClientSendMsgSize = newInt(4096) 5907 te3.startServer(&testServer{security: e.security}) 5908 defer te3.tearDown() 5909 ch3 <- sc 5910 tc = testpb.NewTestServiceClient(te3.clientConn()) 5911 5912 // Test for unary RPC recv. 5913 req.Payload = smallPayload 5914 req.ResponseSize = int32(largeSize) 5915 5916 if _, err := tc.UnaryCall(context.Background(), req); err != nil { 5917 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) 5918 } 5919 5920 req.ResponseSize = int32(extraLargeSize) 5921 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 5922 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 5923 } 5924 5925 // Test for unary RPC send. 5926 req.Payload = largePayload 5927 req.ResponseSize = int32(smallSize) 5928 if _, err := tc.UnaryCall(context.Background(), req); err != nil { 5929 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) 5930 } 5931 5932 req.Payload = extraLargePayload 5933 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 5934 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 5935 } 5936 5937 // Test for streaming RPC recv. 
5938 stream, err = tc.FullDuplexCall(te3.ctx) 5939 if err != nil { 5940 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5941 } 5942 respParam[0].Size = int32(largeSize) 5943 sreq.Payload = smallPayload 5944 5945 if err := stream.Send(sreq); err != nil { 5946 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 5947 } 5948 if _, err := stream.Recv(); err != nil { 5949 t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err) 5950 } 5951 5952 respParam[0].Size = int32(extraLargeSize) 5953 5954 if err := stream.Send(sreq); err != nil { 5955 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 5956 } 5957 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 5958 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 5959 } 5960 5961 // Test for streaming RPC send. 5962 respParam[0].Size = int32(smallSize) 5963 sreq.Payload = largePayload 5964 stream, err = tc.FullDuplexCall(te3.ctx) 5965 if err != nil { 5966 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5967 } 5968 if err := stream.Send(sreq); err != nil { 5969 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 5970 } 5971 sreq.Payload = extraLargePayload 5972 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 5973 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 5974 } 5975} 5976 5977func (s) TestMethodFromServerStream(t *testing.T) { 5978 const testMethod = "/package.service/method" 5979 e := tcpClearRREnv 5980 te := newTest(t, e) 5981 var method string 5982 var ok bool 5983 te.unknownHandler = func(srv interface{}, stream grpc.ServerStream) error { 5984 method, ok = grpc.MethodFromServerStream(stream) 5985 return nil 5986 } 5987 5988 te.startServer(nil) 5989 defer te.tearDown() 5990 _ = te.clientConn().Invoke(context.Background(), testMethod, nil, nil) 5991 if !ok || method != testMethod { 5992 t.Fatalf("Invoke 
with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod) 5993 } 5994} 5995 5996func (s) TestInterceptorCanAccessCallOptions(t *testing.T) { 5997 e := tcpClearRREnv 5998 te := newTest(t, e) 5999 te.startServer(&testServer{security: e.security}) 6000 defer te.tearDown() 6001 6002 type observedOptions struct { 6003 headers []*metadata.MD 6004 trailers []*metadata.MD 6005 peer []*peer.Peer 6006 creds []credentials.PerRPCCredentials 6007 failFast []bool 6008 maxRecvSize []int 6009 maxSendSize []int 6010 compressor []string 6011 subtype []string 6012 } 6013 var observedOpts observedOptions 6014 populateOpts := func(opts []grpc.CallOption) { 6015 for _, o := range opts { 6016 switch o := o.(type) { 6017 case grpc.HeaderCallOption: 6018 observedOpts.headers = append(observedOpts.headers, o.HeaderAddr) 6019 case grpc.TrailerCallOption: 6020 observedOpts.trailers = append(observedOpts.trailers, o.TrailerAddr) 6021 case grpc.PeerCallOption: 6022 observedOpts.peer = append(observedOpts.peer, o.PeerAddr) 6023 case grpc.PerRPCCredsCallOption: 6024 observedOpts.creds = append(observedOpts.creds, o.Creds) 6025 case grpc.FailFastCallOption: 6026 observedOpts.failFast = append(observedOpts.failFast, o.FailFast) 6027 case grpc.MaxRecvMsgSizeCallOption: 6028 observedOpts.maxRecvSize = append(observedOpts.maxRecvSize, o.MaxRecvMsgSize) 6029 case grpc.MaxSendMsgSizeCallOption: 6030 observedOpts.maxSendSize = append(observedOpts.maxSendSize, o.MaxSendMsgSize) 6031 case grpc.CompressorCallOption: 6032 observedOpts.compressor = append(observedOpts.compressor, o.CompressorType) 6033 case grpc.ContentSubtypeCallOption: 6034 observedOpts.subtype = append(observedOpts.subtype, o.ContentSubtype) 6035 } 6036 } 6037 } 6038 6039 te.unaryClientInt = func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 6040 populateOpts(opts) 6041 return nil 6042 } 6043 te.streamClientInt = func(ctx 
context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 6044 populateOpts(opts) 6045 return nil, nil 6046 } 6047 6048 defaults := []grpc.CallOption{ 6049 grpc.WaitForReady(true), 6050 grpc.MaxCallRecvMsgSize(1010), 6051 } 6052 tc := testpb.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...))) 6053 6054 var headers metadata.MD 6055 var trailers metadata.MD 6056 var pr peer.Peer 6057 tc.UnaryCall(context.Background(), &testpb.SimpleRequest{}, 6058 grpc.MaxCallRecvMsgSize(100), 6059 grpc.MaxCallSendMsgSize(200), 6060 grpc.PerRPCCredentials(testPerRPCCredentials{}), 6061 grpc.Header(&headers), 6062 grpc.Trailer(&trailers), 6063 grpc.Peer(&pr)) 6064 expected := observedOptions{ 6065 failFast: []bool{false}, 6066 maxRecvSize: []int{1010, 100}, 6067 maxSendSize: []int{200}, 6068 creds: []credentials.PerRPCCredentials{testPerRPCCredentials{}}, 6069 headers: []*metadata.MD{&headers}, 6070 trailers: []*metadata.MD{&trailers}, 6071 peer: []*peer.Peer{&pr}, 6072 } 6073 6074 if !reflect.DeepEqual(expected, observedOpts) { 6075 t.Errorf("unary call did not observe expected options: expected %#v, got %#v", expected, observedOpts) 6076 } 6077 6078 observedOpts = observedOptions{} // reset 6079 6080 tc.StreamingInputCall(context.Background(), 6081 grpc.WaitForReady(false), 6082 grpc.MaxCallSendMsgSize(2020), 6083 grpc.UseCompressor("comp-type"), 6084 grpc.CallContentSubtype("json")) 6085 expected = observedOptions{ 6086 failFast: []bool{false, true}, 6087 maxRecvSize: []int{1010}, 6088 maxSendSize: []int{2020}, 6089 compressor: []string{"comp-type"}, 6090 subtype: []string{"json"}, 6091 } 6092 6093 if !reflect.DeepEqual(expected, observedOpts) { 6094 t.Errorf("streaming call did not observe expected options: expected %#v, got %#v", expected, observedOpts) 6095 } 6096} 6097 6098func (s) TestCompressorRegister(t *testing.T) { 6099 for _, e := range listTestEnv() { 
6100 testCompressorRegister(t, e) 6101 } 6102} 6103 6104func testCompressorRegister(t *testing.T, e env) { 6105 te := newTest(t, e) 6106 te.clientCompression = false 6107 te.serverCompression = false 6108 te.clientUseCompression = true 6109 6110 te.startServer(&testServer{security: e.security}) 6111 defer te.tearDown() 6112 tc := testpb.NewTestServiceClient(te.clientConn()) 6113 6114 // Unary call 6115 const argSize = 271828 6116 const respSize = 314159 6117 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 6118 if err != nil { 6119 t.Fatal(err) 6120 } 6121 req := &testpb.SimpleRequest{ 6122 ResponseType: testpb.PayloadType_COMPRESSABLE, 6123 ResponseSize: respSize, 6124 Payload: payload, 6125 } 6126 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something")) 6127 if _, err := tc.UnaryCall(ctx, req); err != nil { 6128 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 6129 } 6130 // Streaming RPC 6131 ctx, cancel := context.WithCancel(context.Background()) 6132 defer cancel() 6133 stream, err := tc.FullDuplexCall(ctx) 6134 if err != nil { 6135 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 6136 } 6137 respParam := []*testpb.ResponseParameters{ 6138 { 6139 Size: 31415, 6140 }, 6141 } 6142 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415)) 6143 if err != nil { 6144 t.Fatal(err) 6145 } 6146 sreq := &testpb.StreamingOutputCallRequest{ 6147 ResponseType: testpb.PayloadType_COMPRESSABLE, 6148 ResponseParameters: respParam, 6149 Payload: payload, 6150 } 6151 if err := stream.Send(sreq); err != nil { 6152 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 6153 } 6154 if _, err := stream.Recv(); err != nil { 6155 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 6156 } 6157} 6158 6159func (s) TestServeExitsWhenListenerClosed(t *testing.T) { 6160 ss := &stubServer{ 6161 emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { 6162 return 
&testpb.Empty{}, nil 6163 }, 6164 } 6165 6166 s := grpc.NewServer() 6167 defer s.Stop() 6168 testpb.RegisterTestServiceServer(s, ss) 6169 6170 lis, err := net.Listen("tcp", "localhost:0") 6171 if err != nil { 6172 t.Fatalf("Failed to create listener: %v", err) 6173 } 6174 6175 done := make(chan struct{}) 6176 go func() { 6177 s.Serve(lis) 6178 close(done) 6179 }() 6180 6181 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock()) 6182 if err != nil { 6183 t.Fatalf("Failed to dial server: %v", err) 6184 } 6185 defer cc.Close() 6186 c := testpb.NewTestServiceClient(cc) 6187 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6188 defer cancel() 6189 if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil { 6190 t.Fatalf("Failed to send test RPC to server: %v", err) 6191 } 6192 6193 if err := lis.Close(); err != nil { 6194 t.Fatalf("Failed to close listener: %v", err) 6195 } 6196 const timeout = 5 * time.Second 6197 timer := time.NewTimer(timeout) 6198 select { 6199 case <-done: 6200 return 6201 case <-timer.C: 6202 t.Fatalf("Serve did not return after %v", timeout) 6203 } 6204} 6205 6206// Service handler returns status with invalid utf8 message. 
6207func (s) TestStatusInvalidUTF8Message(t *testing.T) { 6208 var ( 6209 origMsg = string([]byte{0xff, 0xfe, 0xfd}) 6210 wantMsg = "���" 6211 ) 6212 6213 ss := &stubServer{ 6214 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 6215 return nil, status.Errorf(codes.Internal, origMsg) 6216 }, 6217 } 6218 if err := ss.Start(nil); err != nil { 6219 t.Fatalf("Error starting endpoint server: %v", err) 6220 } 6221 defer ss.Stop() 6222 6223 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 6224 defer cancel() 6225 6226 if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg { 6227 t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg) 6228 } 6229} 6230 6231// Service handler returns status with details and invalid utf8 message. Proto 6232// will fail to marshal the status because of the invalid utf8 message. Details 6233// will be dropped when sending. 
6234func (s) TestStatusInvalidUTF8Details(t *testing.T) { 6235 grpctest.TLogger.ExpectError("transport: failed to marshal rpc status") 6236 6237 var ( 6238 origMsg = string([]byte{0xff, 0xfe, 0xfd}) 6239 wantMsg = "���" 6240 ) 6241 6242 ss := &stubServer{ 6243 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 6244 st := status.New(codes.Internal, origMsg) 6245 st, err := st.WithDetails(&testpb.Empty{}) 6246 if err != nil { 6247 return nil, err 6248 } 6249 return nil, st.Err() 6250 }, 6251 } 6252 if err := ss.Start(nil); err != nil { 6253 t.Fatalf("Error starting endpoint server: %v", err) 6254 } 6255 defer ss.Stop() 6256 6257 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 6258 defer cancel() 6259 6260 _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) 6261 st := status.Convert(err) 6262 if st.Message() != wantMsg { 6263 t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg) 6264 } 6265 if len(st.Details()) != 0 { 6266 // Details should be dropped on the server side. 
6267 t.Fatalf("RPC status contain details: %v, want no details", st.Details()) 6268 } 6269} 6270 6271func (s) TestClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T) { 6272 for _, e := range listTestEnv() { 6273 if e.httpHandler { 6274 continue 6275 } 6276 testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t, e) 6277 } 6278} 6279 6280func testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T, e env) { 6281 te := newTest(t, e) 6282 te.userAgent = testAppUA 6283 smallSize := 1024 6284 te.maxServerReceiveMsgSize = &smallSize 6285 te.startServer(&testServer{security: e.security}) 6286 defer te.tearDown() 6287 tc := testpb.NewTestServiceClient(te.clientConn()) 6288 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1048576) 6289 if err != nil { 6290 t.Fatal(err) 6291 } 6292 req := &testpb.SimpleRequest{ 6293 ResponseType: testpb.PayloadType_COMPRESSABLE, 6294 Payload: payload, 6295 } 6296 var wg sync.WaitGroup 6297 for i := 0; i < 10; i++ { 6298 wg.Add(1) 6299 go func() { 6300 defer wg.Done() 6301 for j := 0; j < 100; j++ { 6302 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) 6303 defer cancel() 6304 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.ResourceExhausted { 6305 t.Errorf("TestService/UnaryCall(_,_) = _. 
%v, want code: %s", err, codes.ResourceExhausted) 6306 return 6307 } 6308 } 6309 }() 6310 } 6311 wg.Wait() 6312} 6313 6314func (s) TestRPCTimeout(t *testing.T) { 6315 for _, e := range listTestEnv() { 6316 testRPCTimeout(t, e) 6317 } 6318} 6319 6320func testRPCTimeout(t *testing.T, e env) { 6321 te := newTest(t, e) 6322 te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond}) 6323 defer te.tearDown() 6324 6325 cc := te.clientConn() 6326 tc := testpb.NewTestServiceClient(cc) 6327 6328 const argSize = 2718 6329 const respSize = 314 6330 6331 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 6332 if err != nil { 6333 t.Fatal(err) 6334 } 6335 6336 req := &testpb.SimpleRequest{ 6337 ResponseType: testpb.PayloadType_COMPRESSABLE, 6338 ResponseSize: respSize, 6339 Payload: payload, 6340 } 6341 for i := -1; i <= 10; i++ { 6342 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond) 6343 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded { 6344 t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded) 6345 } 6346 cancel() 6347 } 6348} 6349 6350func (s) TestDisabledIOBuffers(t *testing.T) { 6351 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000)) 6352 if err != nil { 6353 t.Fatalf("Failed to create payload: %v", err) 6354 } 6355 req := &testpb.StreamingOutputCallRequest{ 6356 Payload: payload, 6357 } 6358 resp := &testpb.StreamingOutputCallResponse{ 6359 Payload: payload, 6360 } 6361 6362 ss := &stubServer{ 6363 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 6364 for { 6365 in, err := stream.Recv() 6366 if err == io.EOF { 6367 return nil 6368 } 6369 if err != nil { 6370 t.Errorf("stream.Recv() = _, %v, want _, <nil>", err) 6371 return err 6372 } 6373 if !reflect.DeepEqual(in.Payload.Body, payload.Body) { 6374 t.Errorf("Received message(len: %v) on server not 
what was expected(len: %v).", len(in.Payload.Body), len(payload.Body)) 6375 return err 6376 } 6377 if err := stream.Send(resp); err != nil { 6378 t.Errorf("stream.Send(_)= %v, want <nil>", err) 6379 return err 6380 } 6381 6382 } 6383 }, 6384 } 6385 6386 s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0)) 6387 testpb.RegisterTestServiceServer(s, ss) 6388 6389 lis, err := net.Listen("tcp", "localhost:0") 6390 if err != nil { 6391 t.Fatalf("Failed to create listener: %v", err) 6392 } 6393 6394 done := make(chan struct{}) 6395 go func() { 6396 s.Serve(lis) 6397 close(done) 6398 }() 6399 defer s.Stop() 6400 dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second) 6401 defer dcancel() 6402 cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0)) 6403 if err != nil { 6404 t.Fatalf("Failed to dial server") 6405 } 6406 defer cc.Close() 6407 c := testpb.NewTestServiceClient(cc) 6408 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6409 defer cancel() 6410 stream, err := c.FullDuplexCall(ctx, grpc.WaitForReady(true)) 6411 if err != nil { 6412 t.Fatalf("Failed to send test RPC to server") 6413 } 6414 for i := 0; i < 10; i++ { 6415 if err := stream.Send(req); err != nil { 6416 t.Fatalf("stream.Send(_) = %v, want <nil>", err) 6417 } 6418 in, err := stream.Recv() 6419 if err != nil { 6420 t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err) 6421 } 6422 if !reflect.DeepEqual(in.Payload.Body, payload.Body) { 6423 t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body)) 6424 } 6425 } 6426 stream.CloseSend() 6427 if _, err := stream.Recv(); err != io.EOF { 6428 t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err) 6429 } 6430} 6431 6432func (s) TestServerMaxHeaderListSizeClientUserViolation(t *testing.T) { 6433 for _, e := range listTestEnv() { 6434 if e.httpHandler { 
6435 continue 6436 } 6437 testServerMaxHeaderListSizeClientUserViolation(t, e) 6438 } 6439} 6440 6441func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) { 6442 te := newTest(t, e) 6443 te.maxServerHeaderListSize = new(uint32) 6444 *te.maxServerHeaderListSize = 216 6445 te.startServer(&testServer{security: e.security}) 6446 defer te.tearDown() 6447 6448 cc := te.clientConn() 6449 tc := testpb.NewTestServiceClient(cc) 6450 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6451 defer cancel() 6452 metadata.AppendToOutgoingContext(ctx, "oversize", string(make([]byte, 216))) 6453 var err error 6454 if err = verifyResultWithDelay(func() (bool, error) { 6455 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal { 6456 return true, nil 6457 } 6458 return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal) 6459 }); err != nil { 6460 t.Fatal(err) 6461 } 6462} 6463 6464func (s) TestClientMaxHeaderListSizeServerUserViolation(t *testing.T) { 6465 for _, e := range listTestEnv() { 6466 if e.httpHandler { 6467 continue 6468 } 6469 testClientMaxHeaderListSizeServerUserViolation(t, e) 6470 } 6471} 6472 6473func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) { 6474 te := newTest(t, e) 6475 te.maxClientHeaderListSize = new(uint32) 6476 *te.maxClientHeaderListSize = 1 // any header server sends will violate 6477 te.startServer(&testServer{security: e.security}) 6478 defer te.tearDown() 6479 6480 cc := te.clientConn() 6481 tc := testpb.NewTestServiceClient(cc) 6482 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6483 defer cancel() 6484 var err error 6485 if err = verifyResultWithDelay(func() (bool, error) { 6486 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal { 6487 return true, nil 6488 } 6489 return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error 
code: %v", err, codes.Internal) 6490 }); err != nil { 6491 t.Fatal(err) 6492 } 6493} 6494 6495func (s) TestServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T) { 6496 for _, e := range listTestEnv() { 6497 if e.httpHandler || e.security == "tls" { 6498 continue 6499 } 6500 testServerMaxHeaderListSizeClientIntentionalViolation(t, e) 6501 } 6502} 6503 6504func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env) { 6505 te := newTest(t, e) 6506 te.maxServerHeaderListSize = new(uint32) 6507 *te.maxServerHeaderListSize = 512 6508 te.startServer(&testServer{security: e.security}) 6509 defer te.tearDown() 6510 6511 cc, dw := te.clientConnWithConnControl() 6512 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 6513 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6514 defer cancel() 6515 stream, err := tc.FullDuplexCall(ctx) 6516 if err != nil { 6517 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err) 6518 } 6519 rcw := dw.getRawConnWrapper() 6520 val := make([]string, 512) 6521 for i := range val { 6522 val[i] = "a" 6523 } 6524 // allow for client to send the initial header 6525 time.Sleep(100 * time.Millisecond) 6526 rcw.writeHeaders(http2.HeadersFrameParam{ 6527 StreamID: tc.getCurrentStreamID(), 6528 BlockFragment: rcw.encodeHeader("oversize", strings.Join(val, "")), 6529 EndStream: false, 6530 EndHeaders: true, 6531 }) 6532 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal { 6533 t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal) 6534 } 6535} 6536 6537func (s) TestClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T) { 6538 for _, e := range listTestEnv() { 6539 if e.httpHandler || e.security == "tls" { 6540 continue 6541 } 6542 testClientMaxHeaderListSizeServerIntentionalViolation(t, e) 6543 } 6544} 6545 6546func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) { 6547 te := 
newTest(t, e) 6548 te.maxClientHeaderListSize = new(uint32) 6549 *te.maxClientHeaderListSize = 200 6550 lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true}) 6551 defer te.tearDown() 6552 cc, _ := te.clientConnWithConnControl() 6553 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 6554 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6555 defer cancel() 6556 stream, err := tc.FullDuplexCall(ctx) 6557 if err != nil { 6558 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err) 6559 } 6560 var i int 6561 var rcw *rawConnWrapper 6562 for i = 0; i < 100; i++ { 6563 rcw = lw.getLastConn() 6564 if rcw != nil { 6565 break 6566 } 6567 time.Sleep(10 * time.Millisecond) 6568 continue 6569 } 6570 if i == 100 { 6571 t.Fatalf("failed to create server transport after 1s") 6572 } 6573 6574 val := make([]string, 200) 6575 for i := range val { 6576 val[i] = "a" 6577 } 6578 // allow for client to send the initial header. 6579 time.Sleep(100 * time.Millisecond) 6580 rcw.writeHeaders(http2.HeadersFrameParam{ 6581 StreamID: tc.getCurrentStreamID(), 6582 BlockFragment: rcw.encodeRawHeader("oversize", strings.Join(val, "")), 6583 EndStream: false, 6584 EndHeaders: true, 6585 }) 6586 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal { 6587 t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal) 6588 } 6589} 6590 6591func (s) TestNetPipeConn(t *testing.T) { 6592 // This test will block indefinitely if grpc writes both client and server 6593 // prefaces without either reading from the Conn. 
6594 pl := testutils.NewPipeListener() 6595 s := grpc.NewServer() 6596 defer s.Stop() 6597 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 6598 return &testpb.SimpleResponse{}, nil 6599 }} 6600 testpb.RegisterTestServiceServer(s, ts) 6601 go s.Serve(pl) 6602 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6603 defer cancel() 6604 cc, err := grpc.DialContext(ctx, "", grpc.WithInsecure(), grpc.WithDialer(pl.Dialer())) 6605 if err != nil { 6606 t.Fatalf("Error creating client: %v", err) 6607 } 6608 defer cc.Close() 6609 client := testpb.NewTestServiceClient(cc) 6610 if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { 6611 t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) 6612 } 6613} 6614 6615func (s) TestLargeTimeout(t *testing.T) { 6616 for _, e := range listTestEnv() { 6617 testLargeTimeout(t, e) 6618 } 6619} 6620 6621func testLargeTimeout(t *testing.T, e env) { 6622 te := newTest(t, e) 6623 te.declareLogNoise("Server.processUnaryRPC failed to write status") 6624 6625 ts := &funcServer{} 6626 te.startServer(ts) 6627 defer te.tearDown() 6628 tc := testpb.NewTestServiceClient(te.clientConn()) 6629 6630 timeouts := []time.Duration{ 6631 time.Duration(math.MaxInt64), // will be (correctly) converted to 6632 // 2562048 hours, which overflows upon converting back to an int64 6633 2562047 * time.Hour, // the largest timeout that does not overflow 6634 } 6635 6636 for i, maxTimeout := range timeouts { 6637 ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 6638 deadline, ok := ctx.Deadline() 6639 timeout := time.Until(deadline) 6640 minTimeout := maxTimeout - 5*time.Second 6641 if !ok || timeout < minTimeout || timeout > maxTimeout { 6642 t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout) 6643 return nil, status.Error(codes.OutOfRange, "deadline error") 6644 } 
6645 return &testpb.SimpleResponse{}, nil 6646 } 6647 6648 ctx, cancel := context.WithTimeout(context.Background(), maxTimeout) 6649 defer cancel() 6650 6651 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { 6652 t.Errorf("case %v: UnaryCall(_) = _, %v; want _, nil", i, err) 6653 } 6654 } 6655} 6656 6657// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This 6658// test ensures that the connection is re-created after GO_AWAY and not affected by the 6659// subsequent (old) connection closure. 6660func (s) TestGoAwayThenClose(t *testing.T) { 6661 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) 6662 defer cancel() 6663 6664 lis1, err := net.Listen("tcp", "localhost:0") 6665 if err != nil { 6666 t.Fatalf("Error while listening. Err: %v", err) 6667 } 6668 s1 := grpc.NewServer() 6669 defer s1.Stop() 6670 ts := &funcServer{ 6671 unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 6672 return &testpb.SimpleResponse{}, nil 6673 }, 6674 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 6675 // Wait forever. 6676 _, err := stream.Recv() 6677 if err == nil { 6678 t.Error("expected to never receive any message") 6679 } 6680 return err 6681 }, 6682 } 6683 testpb.RegisterTestServiceServer(s1, ts) 6684 go s1.Serve(lis1) 6685 6686 conn2Established := grpcsync.NewEvent() 6687 lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established) 6688 if err != nil { 6689 t.Fatalf("Error while listening. 
Err: %v", err) 6690 } 6691 s2 := grpc.NewServer() 6692 defer s2.Stop() 6693 testpb.RegisterTestServiceServer(s2, ts) 6694 go s2.Serve(lis2) 6695 6696 r, rcleanup := manual.GenerateAndRegisterManualResolver() 6697 defer rcleanup() 6698 r.InitialState(resolver.State{Addresses: []resolver.Address{ 6699 {Addr: lis1.Addr().String()}, 6700 }}) 6701 cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure()) 6702 if err != nil { 6703 t.Fatalf("Error creating client: %v", err) 6704 } 6705 defer cc.Close() 6706 6707 client := testpb.NewTestServiceClient(cc) 6708 6709 // Should go on connection 1. We use a long-lived RPC because it will cause GracefulStop to send GO_AWAY, but the 6710 // connection doesn't get closed until the server stops and the client receives. 6711 stream, err := client.FullDuplexCall(ctx) 6712 if err != nil { 6713 t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err) 6714 } 6715 6716 r.UpdateState(resolver.State{Addresses: []resolver.Address{ 6717 {Addr: lis1.Addr().String()}, 6718 {Addr: lis2.Addr().String()}, 6719 }}) 6720 6721 // Send GO_AWAY to connection 1. 6722 go s1.GracefulStop() 6723 6724 // Wait for connection 2 to be established. 6725 <-conn2Established.Done() 6726 6727 // Close connection 1. 6728 s1.Stop() 6729 6730 // Wait for client to close. 6731 _, err = stream.Recv() 6732 if err == nil { 6733 t.Fatal("expected the stream to die, but got a successful Recv") 6734 } 6735 6736 // Do a bunch of RPCs, make sure it stays stable. These should go to connection 2. 
6737 for i := 0; i < 10; i++ { 6738 if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { 6739 t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) 6740 } 6741 } 6742} 6743 6744func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) { 6745 lis, err := net.Listen(network, address) 6746 if err != nil { 6747 return nil, err 6748 } 6749 return notifyingListener{connEstablished: event, Listener: lis}, nil 6750} 6751 6752type notifyingListener struct { 6753 connEstablished *grpcsync.Event 6754 net.Listener 6755} 6756 6757func (lis notifyingListener) Accept() (net.Conn, error) { 6758 defer lis.connEstablished.Fire() 6759 return lis.Listener.Accept() 6760} 6761 6762func (s) TestRPCWaitsForResolver(t *testing.T) { 6763 te := testServiceConfigSetup(t, tcpClearRREnv) 6764 te.startServer(&testServer{security: tcpClearRREnv.security}) 6765 defer te.tearDown() 6766 r, rcleanup := manual.GenerateAndRegisterManualResolver() 6767 defer rcleanup() 6768 6769 te.resolverScheme = r.Scheme() 6770 te.nonBlockingDial = true 6771 cc := te.clientConn() 6772 tc := testpb.NewTestServiceClient(cc) 6773 6774 ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) 6775 defer cancel() 6776 // With no resolved addresses yet, this will timeout. 
6777 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 6778 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 6779 } 6780 6781 ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) 6782 defer cancel() 6783 go func() { 6784 time.Sleep(time.Second) 6785 r.UpdateState(resolver.State{ 6786 Addresses: []resolver.Address{{Addr: te.srvAddr}}, 6787 ServiceConfig: parseCfg(r, `{ 6788 "methodConfig": [ 6789 { 6790 "name": [ 6791 { 6792 "service": "grpc.testing.TestService", 6793 "method": "UnaryCall" 6794 } 6795 ], 6796 "maxRequestMessageBytes": 0 6797 } 6798 ] 6799 }`)}) 6800 }() 6801 // We wait a second before providing a service config and resolving 6802 // addresses. So this will wait for that and then honor the 6803 // maxRequestMessageBytes it contains. 6804 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{ResponseType: testpb.PayloadType_UNCOMPRESSABLE}); status.Code(err) != codes.ResourceExhausted { 6805 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err) 6806 } 6807 if got := ctx.Err(); got != nil { 6808 t.Fatalf("ctx.Err() = %v; want nil (deadline should be set short by service config)", got) 6809 } 6810 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { 6811 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err) 6812 } 6813} 6814 6815func (s) TestHTTPHeaderFrameErrorHandlingHTTPMode(t *testing.T) { 6816 // Non-gRPC content-type fallback path. 6817 for httpCode := range transport.HTTPStatusConvTab { 6818 doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ 6819 ":status", fmt.Sprintf("%d", httpCode), 6820 "content-type", "text/html", // non-gRPC content type to switch to HTTP mode. 6821 "grpc-status", "1", // Make up a gRPC status error 6822 "grpc-status-details-bin", "???", // Make up a gRPC field parsing error 6823 }) 6824 } 6825 6826 // Missing content-type fallback path. 
6827 for httpCode := range transport.HTTPStatusConvTab { 6828 doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ 6829 ":status", fmt.Sprintf("%d", httpCode), 6830 // Omitting content type to switch to HTTP mode. 6831 "grpc-status", "1", // Make up a gRPC status error 6832 "grpc-status-details-bin", "???", // Make up a gRPC field parsing error 6833 }) 6834 } 6835 6836 // Malformed HTTP status when fallback. 6837 doHTTPHeaderTest(t, codes.Internal, []string{ 6838 ":status", "abc", 6839 // Omitting content type to switch to HTTP mode. 6840 "grpc-status", "1", // Make up a gRPC status error 6841 "grpc-status-details-bin", "???", // Make up a gRPC field parsing error 6842 }) 6843} 6844 6845// Testing erroneous ResponseHeader or Trailers-only (delivered in the first HEADERS frame). 6846func (s) TestHTTPHeaderFrameErrorHandlingInitialHeader(t *testing.T) { 6847 for _, test := range []struct { 6848 header []string 6849 errCode codes.Code 6850 }{ 6851 { 6852 // missing gRPC status. 6853 header: []string{ 6854 ":status", "403", 6855 "content-type", "application/grpc", 6856 }, 6857 errCode: codes.Unknown, 6858 }, 6859 { 6860 // malformed grpc-status. 6861 header: []string{ 6862 ":status", "502", 6863 "content-type", "application/grpc", 6864 "grpc-status", "abc", 6865 }, 6866 errCode: codes.Internal, 6867 }, 6868 { 6869 // Malformed grpc-tags-bin field. 6870 header: []string{ 6871 ":status", "502", 6872 "content-type", "application/grpc", 6873 "grpc-status", "0", 6874 "grpc-tags-bin", "???", 6875 }, 6876 errCode: codes.Internal, 6877 }, 6878 { 6879 // gRPC status error. 
6880 header: []string{ 6881 ":status", "502", 6882 "content-type", "application/grpc", 6883 "grpc-status", "3", 6884 }, 6885 errCode: codes.InvalidArgument, 6886 }, 6887 } { 6888 doHTTPHeaderTest(t, test.errCode, test.header) 6889 } 6890} 6891 6892// Testing non-Trailers-only Trailers (delievered in second HEADERS frame) 6893func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { 6894 for _, test := range []struct { 6895 responseHeader []string 6896 trailer []string 6897 errCode codes.Code 6898 }{ 6899 { 6900 responseHeader: []string{ 6901 ":status", "200", 6902 "content-type", "application/grpc", 6903 }, 6904 trailer: []string{ 6905 // trailer missing grpc-status 6906 ":status", "502", 6907 }, 6908 errCode: codes.Unknown, 6909 }, 6910 { 6911 responseHeader: []string{ 6912 ":status", "404", 6913 "content-type", "application/grpc", 6914 }, 6915 trailer: []string{ 6916 // malformed grpc-status-details-bin field 6917 "grpc-status", "0", 6918 "grpc-status-details-bin", "????", 6919 }, 6920 errCode: codes.Internal, 6921 }, 6922 } { 6923 doHTTPHeaderTest(t, test.errCode, test.responseHeader, test.trailer) 6924 } 6925} 6926 6927func (s) TestHTTPHeaderFrameErrorHandlingMoreThanTwoHeaders(t *testing.T) { 6928 header := []string{ 6929 ":status", "200", 6930 "content-type", "application/grpc", 6931 } 6932 doHTTPHeaderTest(t, codes.Internal, header, header, header) 6933} 6934 6935type httpServer struct { 6936 headerFields [][]string 6937} 6938 6939func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string, endStream bool) error { 6940 if len(headerFields)%2 == 1 { 6941 panic("odd number of kv args") 6942 } 6943 6944 var buf bytes.Buffer 6945 henc := hpack.NewEncoder(&buf) 6946 for len(headerFields) > 0 { 6947 k, v := headerFields[0], headerFields[1] 6948 headerFields = headerFields[2:] 6949 henc.WriteField(hpack.HeaderField{Name: k, Value: v}) 6950 } 6951 6952 return framer.WriteHeaders(http2.HeadersFrameParam{ 6953 StreamID: sid, 
6954 BlockFragment: buf.Bytes(), 6955 EndStream: endStream, 6956 EndHeaders: true, 6957 }) 6958} 6959 6960func (s *httpServer) start(t *testing.T, lis net.Listener) { 6961 // Launch an HTTP server to send back header. 6962 go func() { 6963 conn, err := lis.Accept() 6964 if err != nil { 6965 t.Errorf("Error accepting connection: %v", err) 6966 return 6967 } 6968 defer conn.Close() 6969 // Read preface sent by client. 6970 if _, err = io.ReadFull(conn, make([]byte, len(http2.ClientPreface))); err != nil { 6971 t.Errorf("Error at server-side while reading preface from client. Err: %v", err) 6972 return 6973 } 6974 reader := bufio.NewReader(conn) 6975 writer := bufio.NewWriter(conn) 6976 framer := http2.NewFramer(writer, reader) 6977 if err = framer.WriteSettingsAck(); err != nil { 6978 t.Errorf("Error at server-side while sending Settings ack. Err: %v", err) 6979 return 6980 } 6981 writer.Flush() // necessary since client is expecting preface before declaring connection fully setup. 6982 6983 var sid uint32 6984 // Read frames until a header is received. 6985 for { 6986 frame, err := framer.ReadFrame() 6987 if err != nil { 6988 t.Errorf("Error at server-side while reading frame. Err: %v", err) 6989 return 6990 } 6991 if hframe, ok := frame.(*http2.HeadersFrame); ok { 6992 sid = hframe.Header().StreamID 6993 break 6994 } 6995 } 6996 for i, headers := range s.headerFields { 6997 if err = s.writeHeader(framer, sid, headers, i == len(s.headerFields)-1); err != nil { 6998 t.Errorf("Error at server-side while writing headers. Err: %v", err) 6999 return 7000 } 7001 writer.Flush() 7002 } 7003 }() 7004} 7005 7006func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string) { 7007 t.Helper() 7008 lis, err := net.Listen("tcp", "localhost:0") 7009 if err != nil { 7010 t.Fatalf("Failed to listen. 
Err: %v", err) 7011 } 7012 defer lis.Close() 7013 server := &httpServer{ 7014 headerFields: headerFields, 7015 } 7016 server.start(t, lis) 7017 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) 7018 if err != nil { 7019 t.Fatalf("failed to dial due to err: %v", err) 7020 } 7021 defer cc.Close() 7022 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 7023 defer cancel() 7024 client := testpb.NewTestServiceClient(cc) 7025 stream, err := client.FullDuplexCall(ctx) 7026 if err != nil { 7027 t.Fatalf("error creating stream due to err: %v", err) 7028 } 7029 if _, err := stream.Recv(); err == nil || status.Code(err) != errCode { 7030 t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode) 7031 } 7032} 7033 7034func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { 7035 g := r.CC.ParseServiceConfig(s) 7036 if g.Err != nil { 7037 panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err)) 7038 } 7039 return g 7040} 7041 7042func (s) TestClientCancellationPropagatesUnary(t *testing.T) { 7043 wg := &sync.WaitGroup{} 7044 called, done := make(chan struct{}), make(chan struct{}) 7045 ss := &stubServer{ 7046 emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { 7047 close(called) 7048 <-ctx.Done() 7049 err := ctx.Err() 7050 if err != context.Canceled { 7051 t.Errorf("ctx.Err() = %v; want context.Canceled", err) 7052 } 7053 close(done) 7054 return nil, err 7055 }, 7056 } 7057 if err := ss.Start(nil); err != nil { 7058 t.Fatalf("Error starting endpoint server: %v", err) 7059 } 7060 defer ss.Stop() 7061 7062 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 7063 7064 wg.Add(1) 7065 go func() { 7066 if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Canceled { 7067 t.Errorf("ss.client.EmptyCall() = _, %v; want _, Code()=codes.Canceled", err) 7068 } 7069 wg.Done() 7070 }() 7071 7072 select { 7073 case <-called: 7074 case <-time.After(5 * 
time.Second): 7075 t.Fatalf("failed to perform EmptyCall after 10s") 7076 } 7077 cancel() 7078 select { 7079 case <-done: 7080 case <-time.After(5 * time.Second): 7081 t.Fatalf("server failed to close done chan due to cancellation propagation") 7082 } 7083 wg.Wait() 7084} 7085 7086type badGzipCompressor struct{} 7087 7088func (badGzipCompressor) Do(w io.Writer, p []byte) error { 7089 buf := &bytes.Buffer{} 7090 gzw := gzip.NewWriter(buf) 7091 if _, err := gzw.Write(p); err != nil { 7092 return err 7093 } 7094 err := gzw.Close() 7095 bs := buf.Bytes() 7096 if len(bs) >= 6 { 7097 bs[len(bs)-6] ^= 1 // modify checksum at end by 1 byte 7098 } 7099 w.Write(bs) 7100 return err 7101} 7102 7103func (badGzipCompressor) Type() string { 7104 return "gzip" 7105} 7106 7107func (s) TestGzipBadChecksum(t *testing.T) { 7108 ss := &stubServer{ 7109 unaryCall: func(ctx context.Context, _ *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 7110 return &testpb.SimpleResponse{}, nil 7111 }, 7112 } 7113 if err := ss.Start(nil, grpc.WithCompressor(badGzipCompressor{})); err != nil { 7114 t.Fatalf("Error starting endpoint server: %v", err) 7115 } 7116 defer ss.Stop() 7117 7118 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 7119 defer cancel() 7120 7121 p, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1024)) 7122 if err != nil { 7123 t.Fatalf("Unexpected error from newPayload: %v", err) 7124 } 7125 if _, err := ss.client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: p}); err == nil || 7126 status.Code(err) != codes.Internal || 7127 !strings.Contains(status.Convert(err).Message(), gzip.ErrChecksum.Error()) { 7128 t.Errorf("ss.client.UnaryCall(_) = _, %v\n\twant: _, status(codes.Internal, contains %q)", err, gzip.ErrChecksum) 7129 } 7130} 7131 7132// When an RPC is canceled, it's possible that the last Recv() returns before 7133// all call options' after are executed. 
7134func (s) TestCanceledRPCCallOptionRace(t *testing.T) { 7135 ss := &stubServer{ 7136 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 7137 err := stream.Send(&testpb.StreamingOutputCallResponse{}) 7138 if err != nil { 7139 return err 7140 } 7141 <-stream.Context().Done() 7142 return nil 7143 }, 7144 } 7145 if err := ss.Start(nil); err != nil { 7146 t.Fatalf("Error starting endpoint server: %v", err) 7147 } 7148 defer ss.Stop() 7149 7150 const count = 1000 7151 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 7152 defer cancel() 7153 7154 var wg sync.WaitGroup 7155 wg.Add(count) 7156 for i := 0; i < count; i++ { 7157 go func() { 7158 defer wg.Done() 7159 var p peer.Peer 7160 ctx, cancel := context.WithCancel(ctx) 7161 defer cancel() 7162 stream, err := ss.client.FullDuplexCall(ctx, grpc.Peer(&p)) 7163 if err != nil { 7164 t.Errorf("_.FullDuplexCall(_) = _, %v", err) 7165 return 7166 } 7167 if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil { 7168 t.Errorf("_ has error %v while sending", err) 7169 return 7170 } 7171 if _, err := stream.Recv(); err != nil { 7172 t.Errorf("%v.Recv() = %v", stream, err) 7173 return 7174 } 7175 cancel() 7176 if _, err := stream.Recv(); status.Code(err) != codes.Canceled { 7177 t.Errorf("%v compleled with error %v, want %s", stream, err, codes.Canceled) 7178 return 7179 } 7180 // If recv returns before call options are executed, peer.Addr is not set, 7181 // fail the test. 7182 if p.Addr == nil { 7183 t.Errorf("peer.Addr is nil, want non-nil") 7184 return 7185 } 7186 }() 7187 } 7188 wg.Wait() 7189} 7190