1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19//go:generate protoc --go_out=plugins=grpc:. codec_perf/perf.proto 20//go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto 21 22package test 23 24import ( 25 "bytes" 26 "context" 27 "crypto/tls" 28 "errors" 29 "flag" 30 "fmt" 31 "io" 32 "math" 33 "net" 34 "net/http" 35 "os" 36 "reflect" 37 "runtime" 38 "strings" 39 "sync" 40 "sync/atomic" 41 "syscall" 42 "testing" 43 "time" 44 45 "github.com/golang/protobuf/proto" 46 anypb "github.com/golang/protobuf/ptypes/any" 47 "golang.org/x/net/http2" 48 spb "google.golang.org/genproto/googleapis/rpc/status" 49 "google.golang.org/grpc" 50 "google.golang.org/grpc/balancer/roundrobin" 51 "google.golang.org/grpc/codes" 52 "google.golang.org/grpc/connectivity" 53 "google.golang.org/grpc/credentials" 54 _ "google.golang.org/grpc/encoding/gzip" 55 _ "google.golang.org/grpc/grpclog/glogger" 56 "google.golang.org/grpc/health" 57 healthgrpc "google.golang.org/grpc/health/grpc_health_v1" 58 healthpb "google.golang.org/grpc/health/grpc_health_v1" 59 "google.golang.org/grpc/internal/channelz" 60 "google.golang.org/grpc/internal/grpcsync" 61 "google.golang.org/grpc/internal/leakcheck" 62 "google.golang.org/grpc/internal/testutils" 63 "google.golang.org/grpc/keepalive" 64 "google.golang.org/grpc/metadata" 65 "google.golang.org/grpc/peer" 66 "google.golang.org/grpc/resolver" 67 
"google.golang.org/grpc/resolver/manual" 68 _ "google.golang.org/grpc/resolver/passthrough" 69 "google.golang.org/grpc/stats" 70 "google.golang.org/grpc/status" 71 "google.golang.org/grpc/tap" 72 testpb "google.golang.org/grpc/test/grpc_testing" 73 "google.golang.org/grpc/testdata" 74) 75 76func init() { 77 channelz.TurnOn() 78} 79 80var ( 81 // For headers: 82 testMetadata = metadata.MD{ 83 "key1": []string{"value1"}, 84 "key2": []string{"value2"}, 85 "key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})}, 86 } 87 testMetadata2 = metadata.MD{ 88 "key1": []string{"value12"}, 89 "key2": []string{"value22"}, 90 } 91 // For trailers: 92 testTrailerMetadata = metadata.MD{ 93 "tkey1": []string{"trailerValue1"}, 94 "tkey2": []string{"trailerValue2"}, 95 "tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})}, 96 } 97 testTrailerMetadata2 = metadata.MD{ 98 "tkey1": []string{"trailerValue12"}, 99 "tkey2": []string{"trailerValue22"}, 100 } 101 // capital "Key" is illegal in HTTP/2. 102 malformedHTTP2Metadata = metadata.MD{ 103 "Key": []string{"foo"}, 104 } 105 testAppUA = "myApp1/1.0 myApp2/0.9" 106 failAppUA = "fail-this-RPC" 107 detailedError = status.ErrorProto(&spb.Status{ 108 Code: int32(codes.DataLoss), 109 Message: "error for testing: " + failAppUA, 110 Details: []*anypb.Any{{ 111 TypeUrl: "url", 112 Value: []byte{6, 0, 0, 6, 1, 3}, 113 }}, 114 }) 115) 116 117var raceMode bool // set by race.go in race mode 118 119type testServer struct { 120 security string // indicate the authentication protocol used by this server. 121 earlyFail bool // whether to error out the execution of a service handler prematurely. 122 setAndSendHeader bool // whether to call setHeader and sendHeader. 123 setHeaderOnly bool // whether to only call setHeader, not sendHeader. 124 multipleSetTrailer bool // whether to call setTrailer multiple times. 
125 unaryCallSleepTime time.Duration 126} 127 128func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 129 if md, ok := metadata.FromIncomingContext(ctx); ok { 130 // For testing purpose, returns an error if user-agent is failAppUA. 131 // To test that client gets the correct error. 132 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) { 133 return nil, detailedError 134 } 135 var str []string 136 for _, entry := range md["user-agent"] { 137 str = append(str, "ua", entry) 138 } 139 grpc.SendHeader(ctx, metadata.Pairs(str...)) 140 } 141 return new(testpb.Empty), nil 142} 143 144func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) { 145 if size < 0 { 146 return nil, fmt.Errorf("Requested a response with invalid length %d", size) 147 } 148 body := make([]byte, size) 149 switch t { 150 case testpb.PayloadType_COMPRESSABLE: 151 case testpb.PayloadType_UNCOMPRESSABLE: 152 return nil, fmt.Errorf("PayloadType UNCOMPRESSABLE is not supported") 153 default: 154 return nil, fmt.Errorf("Unsupported payload type: %d", t) 155 } 156 return &testpb.Payload{ 157 Type: t, 158 Body: body, 159 }, nil 160} 161 162func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 163 md, ok := metadata.FromIncomingContext(ctx) 164 if ok { 165 if _, exists := md[":authority"]; !exists { 166 return nil, status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md) 167 } 168 if s.setAndSendHeader { 169 if err := grpc.SetHeader(ctx, md); err != nil { 170 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err) 171 } 172 if err := grpc.SendHeader(ctx, testMetadata2); err != nil { 173 return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err) 174 } 175 } else if s.setHeaderOnly { 176 if err := grpc.SetHeader(ctx, md); err != nil { 177 return nil, 
status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err) 178 } 179 if err := grpc.SetHeader(ctx, testMetadata2); err != nil { 180 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err) 181 } 182 } else { 183 if err := grpc.SendHeader(ctx, md); err != nil { 184 return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err) 185 } 186 } 187 if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil { 188 return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err) 189 } 190 if s.multipleSetTrailer { 191 if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil { 192 return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err) 193 } 194 } 195 } 196 pr, ok := peer.FromContext(ctx) 197 if !ok { 198 return nil, status.Error(codes.DataLoss, "failed to get peer from ctx") 199 } 200 if pr.Addr == net.Addr(nil) { 201 return nil, status.Error(codes.DataLoss, "failed to get peer address") 202 } 203 if s.security != "" { 204 // Check Auth info 205 var authType, serverName string 206 switch info := pr.AuthInfo.(type) { 207 case credentials.TLSInfo: 208 authType = info.AuthType() 209 serverName = info.State.ServerName 210 default: 211 return nil, status.Error(codes.Unauthenticated, "Unknown AuthInfo type") 212 } 213 if authType != s.security { 214 return nil, status.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security) 215 } 216 if serverName != "x.test.youtube.com" { 217 return nil, status.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName) 218 } 219 } 220 // Simulate some service delay. 
221 time.Sleep(s.unaryCallSleepTime) 222 223 payload, err := newPayload(in.GetResponseType(), in.GetResponseSize()) 224 if err != nil { 225 return nil, err 226 } 227 228 return &testpb.SimpleResponse{ 229 Payload: payload, 230 }, nil 231} 232 233func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { 234 if md, ok := metadata.FromIncomingContext(stream.Context()); ok { 235 if _, exists := md[":authority"]; !exists { 236 return status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md) 237 } 238 // For testing purpose, returns an error if user-agent is failAppUA. 239 // To test that client gets the correct error. 240 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) { 241 return status.Error(codes.DataLoss, "error for testing: "+failAppUA) 242 } 243 } 244 cs := args.GetResponseParameters() 245 for _, c := range cs { 246 if us := c.GetIntervalUs(); us > 0 { 247 time.Sleep(time.Duration(us) * time.Microsecond) 248 } 249 250 payload, err := newPayload(args.GetResponseType(), c.GetSize()) 251 if err != nil { 252 return err 253 } 254 255 if err := stream.Send(&testpb.StreamingOutputCallResponse{ 256 Payload: payload, 257 }); err != nil { 258 return err 259 } 260 } 261 return nil 262} 263 264func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error { 265 var sum int 266 for { 267 in, err := stream.Recv() 268 if err == io.EOF { 269 return stream.SendAndClose(&testpb.StreamingInputCallResponse{ 270 AggregatedPayloadSize: int32(sum), 271 }) 272 } 273 if err != nil { 274 return err 275 } 276 p := in.GetPayload().GetBody() 277 sum += len(p) 278 if s.earlyFail { 279 return status.Error(codes.NotFound, "not found") 280 } 281 } 282} 283 284func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 285 md, ok := metadata.FromIncomingContext(stream.Context()) 286 if ok { 287 if 
s.setAndSendHeader { 288 if err := stream.SetHeader(md); err != nil { 289 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err) 290 } 291 if err := stream.SendHeader(testMetadata2); err != nil { 292 return status.Errorf(status.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err) 293 } 294 } else if s.setHeaderOnly { 295 if err := stream.SetHeader(md); err != nil { 296 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err) 297 } 298 if err := stream.SetHeader(testMetadata2); err != nil { 299 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err) 300 } 301 } else { 302 if err := stream.SendHeader(md); err != nil { 303 return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) 304 } 305 } 306 stream.SetTrailer(testTrailerMetadata) 307 if s.multipleSetTrailer { 308 stream.SetTrailer(testTrailerMetadata2) 309 } 310 } 311 for { 312 in, err := stream.Recv() 313 if err == io.EOF { 314 // read done. 315 return nil 316 } 317 if err != nil { 318 // to facilitate testSvrWriteStatusEarlyWrite 319 if status.Code(err) == codes.ResourceExhausted { 320 return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error()) 321 } 322 return err 323 } 324 cs := in.GetResponseParameters() 325 for _, c := range cs { 326 if us := c.GetIntervalUs(); us > 0 { 327 time.Sleep(time.Duration(us) * time.Microsecond) 328 } 329 330 payload, err := newPayload(in.GetResponseType(), c.GetSize()) 331 if err != nil { 332 return err 333 } 334 335 if err := stream.Send(&testpb.StreamingOutputCallResponse{ 336 Payload: payload, 337 }); err != nil { 338 // to facilitate testSvrWriteStatusEarlyWrite 339 if status.Code(err) == codes.ResourceExhausted { 340 return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. 
true error: %s", err.Error()) 341 } 342 return err 343 } 344 } 345 } 346} 347 348func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error { 349 var msgBuf []*testpb.StreamingOutputCallRequest 350 for { 351 in, err := stream.Recv() 352 if err == io.EOF { 353 // read done. 354 break 355 } 356 if err != nil { 357 return err 358 } 359 msgBuf = append(msgBuf, in) 360 } 361 for _, m := range msgBuf { 362 cs := m.GetResponseParameters() 363 for _, c := range cs { 364 if us := c.GetIntervalUs(); us > 0 { 365 time.Sleep(time.Duration(us) * time.Microsecond) 366 } 367 368 payload, err := newPayload(m.GetResponseType(), c.GetSize()) 369 if err != nil { 370 return err 371 } 372 373 if err := stream.Send(&testpb.StreamingOutputCallResponse{ 374 Payload: payload, 375 }); err != nil { 376 return err 377 } 378 } 379 } 380 return nil 381} 382 383type env struct { 384 name string 385 network string // The type of network such as tcp, unix, etc. 386 security string // The security protocol such as TLS, SSH, etc. 387 httpHandler bool // whether to use the http.Handler ServerTransport; requires TLS 388 balancer string // One of "round_robin", "pick_first", "v1", or "". 
389 customDialer func(string, string, time.Duration) (net.Conn, error) 390} 391 392func (e env) runnable() bool { 393 if runtime.GOOS == "windows" && e.network == "unix" { 394 return false 395 } 396 return true 397} 398 399func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) { 400 if e.customDialer != nil { 401 return e.customDialer(e.network, addr, timeout) 402 } 403 return net.DialTimeout(e.network, addr, timeout) 404} 405 406var ( 407 tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp", balancer: "v1"} 408 tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls", balancer: "v1"} 409 tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"} 410 tcpTLSRREnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"} 411 handlerEnv = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"} 412 noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"} 413 allEnv = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv} 414) 415 416var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.") 417 418func listTestEnv() (envs []env) { 419 if *onlyEnv != "" { 420 for _, e := range allEnv { 421 if e.name == *onlyEnv { 422 if !e.runnable() { 423 panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS)) 424 } 425 return []env{e} 426 } 427 } 428 panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv)) 429 } 430 for _, e := range allEnv { 431 if e.runnable() { 432 envs = append(envs, e) 433 } 434 } 435 return envs 436} 437 438// test is an end-to-end test. It should be created with the newTest 439// func, modified as needed, and then started with its startServer method. 440// It should be cleaned up with the tearDown method. 
type test struct {
	t *testing.T
	e env

	ctx    context.Context // valid for life of test, before tearDown
	cancel context.CancelFunc

	// Configurable knobs, after newTest returns:
	testServer              testpb.TestServiceServer // nil means none
	healthServer            *health.Server           // nil means disabled
	maxStream               uint32
	tapHandle               tap.ServerInHandle
	maxMsgSize              *int
	maxClientReceiveMsgSize *int
	maxClientSendMsgSize    *int
	maxServerReceiveMsgSize *int
	maxServerSendMsgSize    *int
	maxClientHeaderListSize *uint32
	maxServerHeaderListSize *uint32
	userAgent               string
	// clientCompression and serverCompression are set to test the deprecated API
	// WithCompressor and WithDecompressor.
	clientCompression bool
	serverCompression bool
	// clientUseCompression is set to test the new compressor registration API UseCompressor.
	clientUseCompression bool
	// clientNopCompression is set to create a compressor whose type is not supported.
	clientNopCompression        bool
	unaryClientInt              grpc.UnaryClientInterceptor
	streamClientInt             grpc.StreamClientInterceptor
	unaryServerInt              grpc.UnaryServerInterceptor
	streamServerInt             grpc.StreamServerInterceptor
	unknownHandler              grpc.StreamHandler
	sc                          <-chan grpc.ServiceConfig
	customCodec                 grpc.Codec
	serverInitialWindowSize     int32
	serverInitialConnWindowSize int32
	clientInitialWindowSize     int32
	clientInitialConnWindowSize int32
	perRPCCreds                 credentials.PerRPCCredentials
	customDialOptions           []grpc.DialOption
	customServerOptions         []grpc.ServerOption
	resolverScheme              string
	cliKeepAlive                *keepalive.ClientParameters
	svrKeepAlive                *keepalive.ServerParameters

	// All test dialing is blocking by default. Set this to true if dial
	// should be non-blocking.
	nonBlockingDial bool

	// srv and srvAddr are set once startServer is called.
	srv     stopper
	srvAddr string

	// srvs and srvAddrs are set once startServers is called.
	srvs     []*grpc.Server
	srvAddrs []string

	cc          *grpc.ClientConn // nil until requested via clientConn
	restoreLogs func()           // nil unless declareLogNoise is used
}

// stopper is the shutdown surface the test needs from a server. Both
// *grpc.Server and *wrapHS are stored in test.srv through it.
type stopper interface {
	Stop()
	GracefulStop()
}

// tearDown releases what the test acquired: it cancels te.ctx, closes the
// client connection, restores log output if declareLogNoise was used, and
// hard-stops te.srv and every server in te.srvs. cancel, cc and restoreLogs
// are reset to nil afterwards, so a second call skips those steps.
func (te *test) tearDown() {
	if te.cancel != nil {
		te.cancel()
		te.cancel = nil
	}

	if te.cc != nil {
		te.cc.Close()
		te.cc = nil
	}

	if te.restoreLogs != nil {
		te.restoreLogs()
		te.restoreLogs = nil
	}

	if te.srv != nil {
		te.srv.Stop()
	}
	if len(te.srvs) != 0 {
		for _, s := range te.srvs {
			s.Stop()
		}
	}
}

// newTest returns a new test using the provided testing.T and
// environment. It is returned with default values. Tests should
// modify it before calling its startServer and clientConn methods.
func newTest(t *testing.T, e env) *test {
	te := &test{
		t:         t,
		e:         e,
		maxStream: math.MaxUint32,
	}
	te.ctx, te.cancel = context.WithCancel(context.Background())
	return te
}

// listenAndServe builds a gRPC server from the knobs set on te, opens a
// listener with the supplied listen func, registers ts (and the health server
// when configured) and starts serving in a background goroutine.
//
// It records ts in te.testServer, the running server in te.srv and the
// address clients should dial in te.srvAddr, and returns the listener that
// listen produced. Any setup failure aborts the test through te.t.Fatal/Fatalf.
func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener {
	te.testServer = ts
	te.t.Logf("Running test in %s environment...", te.e.name)
	// Translate the configured knobs into server options. Options are
	// appended in a fixed order and te.customServerOptions go last.
	sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
	if te.maxMsgSize != nil {
		sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize))
	}
	if te.maxServerReceiveMsgSize != nil {
		sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
	}
	if te.maxServerSendMsgSize != nil {
		sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize))
	}
	if te.maxServerHeaderListSize != nil {
		sopts = append(sopts, grpc.MaxHeaderListSize(*te.maxServerHeaderListSize))
	}
	if te.tapHandle != nil {
		sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
	}
	if te.serverCompression {
		sopts = append(sopts,
			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
		)
	}
	if te.unaryServerInt != nil {
		sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt))
	}
	if te.streamServerInt != nil {
		sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt))
	}
	if te.unknownHandler != nil {
		sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
	}
	if te.serverInitialWindowSize > 0 {
		sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
	}
	if te.serverInitialConnWindowSize > 0 {
		sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
	}
	// Listen on an ephemeral localhost port by default; for unix sockets use
	// a timestamp-suffixed path under /tmp and unlink any file already there.
	la := "localhost:0"
	switch te.e.network {
	case "unix":
		la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
		syscall.Unlink(la)
	}
	lis, err := listen(te.e.network, la)
	if err != nil {
		te.t.Fatalf("Failed to listen: %v", err)
	}
	// Transport credentials depend on the environment's security setting;
	// any other value adds no credentials option.
	switch te.e.security {
	case "tls":
		creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key"))
		if err != nil {
			te.t.Fatalf("Failed to generate credentials %v", err)
		}
		sopts = append(sopts, grpc.Creds(creds))
	case "clientTimeoutCreds":
		sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{}))
	}
	if te.customCodec != nil {
		sopts = append(sopts, grpc.CustomCodec(te.customCodec))
	}
	if te.svrKeepAlive != nil {
		sopts = append(sopts, grpc.KeepaliveParams(*te.svrKeepAlive))
	}
	sopts = append(sopts, te.customServerOptions...)
	s := grpc.NewServer(sopts...)
	te.srv = s
	if te.healthServer != nil {
		healthgrpc.RegisterHealthServer(s, te.healthServer)
	}
	if te.testServer != nil {
		testpb.RegisterTestServiceServer(s, te.testServer)
	}
	// For unix sockets clients dial the socket path itself; otherwise they
	// dial "localhost:" plus the port the listener was actually bound to.
	addr := la
	switch te.e.network {
	case "unix":
	default:
		_, port, err := net.SplitHostPort(lis.Addr().String())
		if err != nil {
			te.t.Fatalf("Failed to parse listener address: %v", err)
		}
		addr = "localhost:" + port
	}

	te.srvAddr = addr

	if te.e.httpHandler {
		// Handler mode: the grpc.Server is used as the http.Handler of an
		// HTTP/2-configured http.Server served over a TLS listener, and
		// te.srv is replaced by the wrapHS wrapper around that listener.
		if te.e.security != "tls" {
			te.t.Fatalf("unsupported environment settings")
		}
		cert, err := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key"))
		if err != nil {
			te.t.Fatal("Error creating TLS certificate: ", err)
		}
		hs := &http.Server{
			Handler: s,
		}
		err = http2.ConfigureServer(hs, &http2.Server{
			MaxConcurrentStreams: te.maxStream,
		})
		if err != nil {
			te.t.Fatal("error starting http2 server: ", err)
		}
		hs.TLSConfig.Certificates = []tls.Certificate{cert}
		tlsListener := tls.NewListener(lis, hs.TLSConfig)
		whs := &wrapHS{Listener: tlsListener, s: hs, conns: make(map[net.Conn]bool)}
		te.srv = whs
		go hs.Serve(whs)

		// The raw listener is returned, not the TLS wrapper.
		return lis
	}

	go s.Serve(lis)
	return lis
}

// TODO: delete wrapHS and wrapConn when Go1.6 and Go1.7 support are gone and
// call s.Close and s.Shutdown instead.
667type wrapHS struct { 668 sync.Mutex 669 net.Listener 670 s *http.Server 671 conns map[net.Conn]bool 672} 673 674func (w *wrapHS) Accept() (net.Conn, error) { 675 c, err := w.Listener.Accept() 676 if err != nil { 677 return nil, err 678 } 679 w.Lock() 680 if w.conns == nil { 681 w.Unlock() 682 c.Close() 683 return nil, errors.New("connection after listener closed") 684 } 685 w.conns[&wrapConn{Conn: c, hs: w}] = true 686 w.Unlock() 687 return c, nil 688} 689 690func (w *wrapHS) Stop() { 691 w.Listener.Close() 692 w.Lock() 693 conns := w.conns 694 w.conns = nil 695 w.Unlock() 696 for c := range conns { 697 c.Close() 698 } 699} 700 701// Poll for now.. 702func (w *wrapHS) GracefulStop() { 703 w.Listener.Close() 704 for { 705 w.Lock() 706 l := len(w.conns) 707 w.Unlock() 708 if l == 0 { 709 return 710 } 711 time.Sleep(50 * time.Millisecond) 712 } 713} 714 715type wrapConn struct { 716 net.Conn 717 hs *wrapHS 718} 719 720func (w *wrapConn) Close() error { 721 w.hs.Lock() 722 delete(w.hs.conns, w.Conn) 723 w.hs.Unlock() 724 return w.Conn.Close() 725} 726 727func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listenerWrapper { 728 l := te.listenAndServe(ts, listenWithConnControl) 729 return l.(*listenerWrapper) 730} 731 732// startServer starts a gRPC server listening. Callers should defer a 733// call to te.tearDown to clean up. 734func (te *test) startServer(ts testpb.TestServiceServer) { 735 te.listenAndServe(ts, net.Listen) 736} 737 738type nopCompressor struct { 739 grpc.Compressor 740} 741 742// NewNopCompressor creates a compressor to test the case that type is not supported. 743func NewNopCompressor() grpc.Compressor { 744 return &nopCompressor{grpc.NewGZIPCompressor()} 745} 746 747func (c *nopCompressor) Type() string { 748 return "nop" 749} 750 751type nopDecompressor struct { 752 grpc.Decompressor 753} 754 755// NewNopDecompressor creates a decompressor to test the case that type is not supported. 
756func NewNopDecompressor() grpc.Decompressor { 757 return &nopDecompressor{grpc.NewGZIPDecompressor()} 758} 759 760func (d *nopDecompressor) Type() string { 761 return "nop" 762} 763 764func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) { 765 opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent)) 766 767 if te.sc != nil { 768 opts = append(opts, grpc.WithServiceConfig(te.sc)) 769 } 770 771 if te.clientCompression { 772 opts = append(opts, 773 grpc.WithCompressor(grpc.NewGZIPCompressor()), 774 grpc.WithDecompressor(grpc.NewGZIPDecompressor()), 775 ) 776 } 777 if te.clientUseCompression { 778 opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) 779 } 780 if te.clientNopCompression { 781 opts = append(opts, 782 grpc.WithCompressor(NewNopCompressor()), 783 grpc.WithDecompressor(NewNopDecompressor()), 784 ) 785 } 786 if te.unaryClientInt != nil { 787 opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt)) 788 } 789 if te.streamClientInt != nil { 790 opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt)) 791 } 792 if te.maxMsgSize != nil { 793 opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize)) 794 } 795 if te.maxClientReceiveMsgSize != nil { 796 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize))) 797 } 798 if te.maxClientSendMsgSize != nil { 799 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize))) 800 } 801 if te.maxClientHeaderListSize != nil { 802 opts = append(opts, grpc.WithMaxHeaderListSize(*te.maxClientHeaderListSize)) 803 } 804 switch te.e.security { 805 case "tls": 806 creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com") 807 if err != nil { 808 te.t.Fatalf("Failed to load credentials: %v", err) 809 } 810 opts = append(opts, grpc.WithTransportCredentials(creds)) 811 case "clientTimeoutCreds": 812 opts = append(opts, 
grpc.WithTransportCredentials(&clientTimeoutCreds{})) 813 case "empty": 814 // Don't add any transport creds option. 815 default: 816 opts = append(opts, grpc.WithInsecure()) 817 } 818 // TODO(bar) switch balancer case "pick_first". 819 var scheme string 820 if te.resolverScheme == "" { 821 scheme = "passthrough:///" 822 } else { 823 scheme = te.resolverScheme + ":///" 824 } 825 switch te.e.balancer { 826 case "v1": 827 opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil))) 828 case "round_robin": 829 opts = append(opts, grpc.WithBalancerName(roundrobin.Name)) 830 } 831 if te.clientInitialWindowSize > 0 { 832 opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) 833 } 834 if te.clientInitialConnWindowSize > 0 { 835 opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize)) 836 } 837 if te.perRPCCreds != nil { 838 opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds)) 839 } 840 if te.customCodec != nil { 841 opts = append(opts, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(te.customCodec))) 842 } 843 if !te.nonBlockingDial && te.srvAddr != "" { 844 // Only do a blocking dial if server is up. 845 opts = append(opts, grpc.WithBlock()) 846 } 847 if te.srvAddr == "" { 848 te.srvAddr = "client.side.only.test" 849 } 850 if te.cliKeepAlive != nil { 851 opts = append(opts, grpc.WithKeepaliveParams(*te.cliKeepAlive)) 852 } 853 opts = append(opts, te.customDialOptions...) 854 return opts, scheme 855} 856 857func (te *test) clientConnWithConnControl() (*grpc.ClientConn, *dialerWrapper) { 858 if te.cc != nil { 859 return te.cc, nil 860 } 861 opts, scheme := te.configDial() 862 dw := &dialerWrapper{} 863 // overwrite the dialer before 864 opts = append(opts, grpc.WithDialer(dw.dialer)) 865 var err error 866 te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...) 
867 if err != nil { 868 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err) 869 } 870 return te.cc, dw 871} 872 873func (te *test) clientConn(opts ...grpc.DialOption) *grpc.ClientConn { 874 if te.cc != nil { 875 return te.cc 876 } 877 var scheme string 878 opts, scheme = te.configDial(opts...) 879 var err error 880 te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...) 881 if err != nil { 882 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err) 883 } 884 return te.cc 885} 886 887func (te *test) declareLogNoise(phrases ...string) { 888 te.restoreLogs = declareLogNoise(te.t, phrases...) 889} 890 891func (te *test) withServerTester(fn func(st *serverTester)) { 892 c, err := te.e.dialer(te.srvAddr, 10*time.Second) 893 if err != nil { 894 te.t.Fatal(err) 895 } 896 defer c.Close() 897 if te.e.security == "tls" { 898 c = tls.Client(c, &tls.Config{ 899 InsecureSkipVerify: true, 900 NextProtos: []string{http2.NextProtoTLS}, 901 }) 902 } 903 st := newServerTesterFromConn(te.t, c) 904 st.greet() 905 fn(st) 906} 907 908type lazyConn struct { 909 net.Conn 910 beLazy int32 911} 912 913func (l *lazyConn) Write(b []byte) (int, error) { 914 if atomic.LoadInt32(&(l.beLazy)) == 1 { 915 time.Sleep(time.Second) 916 } 917 return l.Conn.Write(b) 918} 919 920func TestContextDeadlineNotIgnored(t *testing.T) { 921 defer leakcheck.Check(t) 922 e := noBalancerEnv 923 var lc *lazyConn 924 e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) { 925 conn, err := net.DialTimeout(network, addr, timeout) 926 if err != nil { 927 return nil, err 928 } 929 lc = &lazyConn{Conn: conn} 930 return lc, nil 931 } 932 933 te := newTest(t, e) 934 te.startServer(&testServer{security: e.security}) 935 defer te.tearDown() 936 937 cc := te.clientConn() 938 tc := testpb.NewTestServiceClient(cc) 939 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 940 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 941 } 942 
atomic.StoreInt32(&(lc.beLazy), 1) 943 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 944 defer cancel() 945 t1 := time.Now() 946 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 947 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err) 948 } 949 if time.Since(t1) > 2*time.Second { 950 t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline") 951 } 952} 953 954func TestTimeoutOnDeadServer(t *testing.T) { 955 defer leakcheck.Check(t) 956 for _, e := range listTestEnv() { 957 testTimeoutOnDeadServer(t, e) 958 } 959} 960 961func testTimeoutOnDeadServer(t *testing.T, e env) { 962 te := newTest(t, e) 963 te.customDialOptions = []grpc.DialOption{grpc.WithWaitForHandshake()} 964 te.userAgent = testAppUA 965 te.declareLogNoise( 966 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 967 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 968 "grpc: addrConn.resetTransport failed to create client transport: connection error", 969 ) 970 te.startServer(&testServer{security: e.security}) 971 defer te.tearDown() 972 973 cc := te.clientConn() 974 tc := testpb.NewTestServiceClient(cc) 975 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { 976 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 977 } 978 te.srv.Stop() 979 980 // Wait for the client to notice the connection is gone. 
981 ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) 982 state := cc.GetState() 983 for ; state == connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { 984 } 985 cancel() 986 if state == connectivity.Ready { 987 t.Fatalf("Timed out waiting for non-ready state") 988 } 989 ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond) 990 _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)) 991 cancel() 992 if e.balancer != "" && status.Code(err) != codes.DeadlineExceeded { 993 // If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error, 994 // the error will be an internal error. 995 t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded) 996 } 997 awaitNewConnLogOutput() 998} 999 1000func TestServerGracefulStopIdempotent(t *testing.T) { 1001 defer leakcheck.Check(t) 1002 for _, e := range listTestEnv() { 1003 if e.name == "handler-tls" { 1004 continue 1005 } 1006 testServerGracefulStopIdempotent(t, e) 1007 } 1008} 1009 1010func testServerGracefulStopIdempotent(t *testing.T, e env) { 1011 te := newTest(t, e) 1012 te.userAgent = testAppUA 1013 te.startServer(&testServer{security: e.security}) 1014 defer te.tearDown() 1015 1016 for i := 0; i < 3; i++ { 1017 te.srv.GracefulStop() 1018 } 1019} 1020 1021func TestServerGoAway(t *testing.T) { 1022 defer leakcheck.Check(t) 1023 for _, e := range listTestEnv() { 1024 if e.name == "handler-tls" { 1025 continue 1026 } 1027 testServerGoAway(t, e) 1028 } 1029} 1030 1031func testServerGoAway(t *testing.T, e env) { 1032 te := newTest(t, e) 1033 te.userAgent = testAppUA 1034 te.startServer(&testServer{security: e.security}) 1035 defer te.tearDown() 1036 1037 cc := te.clientConn() 1038 tc := testpb.NewTestServiceClient(cc) 1039 // Finish an RPC to make sure the connection is good. 
1040 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { 1041 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 1042 } 1043 ch := make(chan struct{}) 1044 go func() { 1045 te.srv.GracefulStop() 1046 close(ch) 1047 }() 1048 // Loop until the server side GoAway signal is propagated to the client. 1049 for { 1050 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 1051 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded { 1052 cancel() 1053 break 1054 } 1055 cancel() 1056 } 1057 // A new RPC should fail. 1058 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal { 1059 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal) 1060 } 1061 <-ch 1062 awaitNewConnLogOutput() 1063} 1064 1065func TestServerGoAwayPendingRPC(t *testing.T) { 1066 defer leakcheck.Check(t) 1067 for _, e := range listTestEnv() { 1068 if e.name == "handler-tls" { 1069 continue 1070 } 1071 testServerGoAwayPendingRPC(t, e) 1072 } 1073} 1074 1075func testServerGoAwayPendingRPC(t *testing.T, e env) { 1076 te := newTest(t, e) 1077 te.userAgent = testAppUA 1078 te.declareLogNoise( 1079 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1080 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1081 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1082 ) 1083 te.startServer(&testServer{security: e.security}) 1084 defer te.tearDown() 1085 1086 cc := te.clientConn() 1087 tc := testpb.NewTestServiceClient(cc) 1088 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 1089 stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)) 1090 if err != nil { 1091 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", 
tc, err) 1092 } 1093 // Finish an RPC to make sure the connection is good. 1094 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { 1095 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) 1096 } 1097 ch := make(chan struct{}) 1098 go func() { 1099 te.srv.GracefulStop() 1100 close(ch) 1101 }() 1102 // Loop until the server side GoAway signal is propagated to the client. 1103 start := time.Now() 1104 errored := false 1105 for time.Since(start) < time.Second { 1106 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 1107 _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)) 1108 cancel() 1109 if err != nil { 1110 errored = true 1111 break 1112 } 1113 } 1114 if !errored { 1115 t.Fatalf("GoAway never received by client") 1116 } 1117 respParam := []*testpb.ResponseParameters{{Size: 1}} 1118 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) 1119 if err != nil { 1120 t.Fatal(err) 1121 } 1122 req := &testpb.StreamingOutputCallRequest{ 1123 ResponseType: testpb.PayloadType_COMPRESSABLE, 1124 ResponseParameters: respParam, 1125 Payload: payload, 1126 } 1127 // The existing RPC should be still good to proceed. 1128 if err := stream.Send(req); err != nil { 1129 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err) 1130 } 1131 if _, err := stream.Recv(); err != nil { 1132 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err) 1133 } 1134 // The RPC will run until canceled. 
1135 cancel() 1136 <-ch 1137 awaitNewConnLogOutput() 1138} 1139 1140func TestServerMultipleGoAwayPendingRPC(t *testing.T) { 1141 defer leakcheck.Check(t) 1142 for _, e := range listTestEnv() { 1143 if e.name == "handler-tls" { 1144 continue 1145 } 1146 testServerMultipleGoAwayPendingRPC(t, e) 1147 } 1148} 1149 1150func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) { 1151 te := newTest(t, e) 1152 te.userAgent = testAppUA 1153 te.declareLogNoise( 1154 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1155 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1156 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1157 ) 1158 te.startServer(&testServer{security: e.security}) 1159 defer te.tearDown() 1160 1161 cc := te.clientConn() 1162 tc := testpb.NewTestServiceClient(cc) 1163 ctx, cancel := context.WithCancel(context.Background()) 1164 stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)) 1165 if err != nil { 1166 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1167 } 1168 // Finish an RPC to make sure the connection is good. 1169 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { 1170 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) 1171 } 1172 ch1 := make(chan struct{}) 1173 go func() { 1174 te.srv.GracefulStop() 1175 close(ch1) 1176 }() 1177 ch2 := make(chan struct{}) 1178 go func() { 1179 te.srv.GracefulStop() 1180 close(ch2) 1181 }() 1182 // Loop until the server side GoAway signal is propagated to the client. 
1183 for { 1184 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 1185 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil { 1186 cancel() 1187 break 1188 } 1189 cancel() 1190 } 1191 select { 1192 case <-ch1: 1193 t.Fatal("GracefulStop() terminated early") 1194 case <-ch2: 1195 t.Fatal("GracefulStop() terminated early") 1196 default: 1197 } 1198 respParam := []*testpb.ResponseParameters{ 1199 { 1200 Size: 1, 1201 }, 1202 } 1203 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) 1204 if err != nil { 1205 t.Fatal(err) 1206 } 1207 req := &testpb.StreamingOutputCallRequest{ 1208 ResponseType: testpb.PayloadType_COMPRESSABLE, 1209 ResponseParameters: respParam, 1210 Payload: payload, 1211 } 1212 // The existing RPC should be still good to proceed. 1213 if err := stream.Send(req); err != nil { 1214 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 1215 } 1216 if _, err := stream.Recv(); err != nil { 1217 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err) 1218 } 1219 if err := stream.CloseSend(); err != nil { 1220 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err) 1221 } 1222 <-ch1 1223 <-ch2 1224 cancel() 1225 awaitNewConnLogOutput() 1226} 1227 1228func TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) { 1229 defer leakcheck.Check(t) 1230 for _, e := range listTestEnv() { 1231 if e.name == "handler-tls" { 1232 continue 1233 } 1234 testConcurrentClientConnCloseAndServerGoAway(t, e) 1235 } 1236} 1237 1238func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) { 1239 te := newTest(t, e) 1240 te.userAgent = testAppUA 1241 te.declareLogNoise( 1242 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1243 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1244 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1245 ) 1246 
te.startServer(&testServer{security: e.security}) 1247 defer te.tearDown() 1248 1249 cc := te.clientConn() 1250 tc := testpb.NewTestServiceClient(cc) 1251 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { 1252 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) 1253 } 1254 ch := make(chan struct{}) 1255 // Close ClientConn and Server concurrently. 1256 go func() { 1257 te.srv.GracefulStop() 1258 close(ch) 1259 }() 1260 go func() { 1261 cc.Close() 1262 }() 1263 <-ch 1264} 1265 1266func TestConcurrentServerStopAndGoAway(t *testing.T) { 1267 defer leakcheck.Check(t) 1268 for _, e := range listTestEnv() { 1269 if e.name == "handler-tls" { 1270 continue 1271 } 1272 testConcurrentServerStopAndGoAway(t, e) 1273 } 1274} 1275 1276func testConcurrentServerStopAndGoAway(t *testing.T, e env) { 1277 te := newTest(t, e) 1278 te.userAgent = testAppUA 1279 te.declareLogNoise( 1280 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1281 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1282 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1283 ) 1284 te.startServer(&testServer{security: e.security}) 1285 defer te.tearDown() 1286 1287 cc := te.clientConn() 1288 tc := testpb.NewTestServiceClient(cc) 1289 stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)) 1290 if err != nil { 1291 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1292 } 1293 // Finish an RPC to make sure the connection is good. 1294 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { 1295 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) 1296 } 1297 ch := make(chan struct{}) 1298 go func() { 1299 te.srv.GracefulStop() 1300 close(ch) 1301 }() 1302 // Loop until the server side GoAway signal is propagated to the client. 
1303 for { 1304 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 1305 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil { 1306 cancel() 1307 break 1308 } 1309 cancel() 1310 } 1311 // Stop the server and close all the connections. 1312 te.srv.Stop() 1313 respParam := []*testpb.ResponseParameters{ 1314 { 1315 Size: 1, 1316 }, 1317 } 1318 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) 1319 if err != nil { 1320 t.Fatal(err) 1321 } 1322 req := &testpb.StreamingOutputCallRequest{ 1323 ResponseType: testpb.PayloadType_COMPRESSABLE, 1324 ResponseParameters: respParam, 1325 Payload: payload, 1326 } 1327 sendStart := time.Now() 1328 for { 1329 if err := stream.Send(req); err == io.EOF { 1330 // stream.Send should eventually send io.EOF 1331 break 1332 } else if err != nil { 1333 // Send should never return a transport-level error. 1334 t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err) 1335 } 1336 if time.Since(sendStart) > 2*time.Second { 1337 t.Fatalf("stream.Send(_) did not return io.EOF after 2s") 1338 } 1339 time.Sleep(time.Millisecond) 1340 } 1341 if _, err := stream.Recv(); err == nil || err == io.EOF { 1342 t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err) 1343 } 1344 <-ch 1345 awaitNewConnLogOutput() 1346} 1347 1348func TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) { 1349 defer leakcheck.Check(t) 1350 for _, e := range listTestEnv() { 1351 if e.name == "handler-tls" { 1352 continue 1353 } 1354 testClientConnCloseAfterGoAwayWithActiveStream(t, e) 1355 } 1356} 1357 1358func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) { 1359 te := newTest(t, e) 1360 te.startServer(&testServer{security: e.security}) 1361 defer te.tearDown() 1362 cc := te.clientConn() 1363 tc := testpb.NewTestServiceClient(cc) 1364 1365 ctx, cancel := context.WithCancel(context.Background()) 1366 defer cancel() 1367 if _, err := 
tc.FullDuplexCall(ctx); err != nil { 1368 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err) 1369 } 1370 done := make(chan struct{}) 1371 go func() { 1372 te.srv.GracefulStop() 1373 close(done) 1374 }() 1375 time.Sleep(50 * time.Millisecond) 1376 cc.Close() 1377 timeout := time.NewTimer(time.Second) 1378 select { 1379 case <-done: 1380 case <-timeout.C: 1381 t.Fatalf("Test timed-out.") 1382 } 1383} 1384 1385func TestFailFast(t *testing.T) { 1386 defer leakcheck.Check(t) 1387 for _, e := range listTestEnv() { 1388 testFailFast(t, e) 1389 } 1390} 1391 1392func testFailFast(t *testing.T, e env) { 1393 te := newTest(t, e) 1394 te.userAgent = testAppUA 1395 te.declareLogNoise( 1396 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1397 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1398 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1399 ) 1400 te.startServer(&testServer{security: e.security}) 1401 defer te.tearDown() 1402 1403 cc := te.clientConn() 1404 tc := testpb.NewTestServiceClient(cc) 1405 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 1406 defer cancel() 1407 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { 1408 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 1409 } 1410 // Stop the server and tear down all the existing connections. 1411 te.srv.Stop() 1412 // Loop until the server teardown is propagated to the client. 1413 for { 1414 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 1415 _, err := tc.EmptyCall(ctx, &testpb.Empty{}) 1416 cancel() 1417 if status.Code(err) == codes.Unavailable { 1418 break 1419 } 1420 t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err) 1421 time.Sleep(10 * time.Millisecond) 1422 } 1423 // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable. 
// newBool returns a pointer to a fresh copy of b.
func newBool(b bool) *bool {
	return &b
}

// newInt returns a pointer to a fresh copy of b.
func newInt(b int) *int {
	return &b
}

// newDuration returns a pointer to a fresh copy of b.
func newDuration(b time.Duration) *time.Duration {
	return &b
}
service config has been processed by grpc. 1495 for { 1496 if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { 1497 break 1498 } 1499 time.Sleep(time.Millisecond) 1500 } 1501 1502 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 1503 var err error 1504 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 1505 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1506 } 1507 1508 r.NewServiceConfig(`{ 1509 "methodConfig": [ 1510 { 1511 "name": [ 1512 { 1513 "service": "grpc.testing.TestService", 1514 "method": "UnaryCall" 1515 } 1516 ], 1517 "waitForReady": true, 1518 "timeout": ".001s" 1519 }, 1520 { 1521 "name": [ 1522 { 1523 "service": "grpc.testing.TestService" 1524 } 1525 ], 1526 "waitForReady": false 1527 } 1528 ] 1529}`) 1530 1531 // Make sure service config has been processed by grpc. 1532 for { 1533 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady { 1534 break 1535 } 1536 time.Sleep(time.Millisecond) 1537 } 1538 // The following RPCs are expected to become fail-fast. 1539 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { 1540 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) 1541 } 1542} 1543 1544func TestServiceConfigWaitForReady(t *testing.T) { 1545 te := testServiceConfigSetup(t, tcpClearRREnv) 1546 defer te.tearDown() 1547 r, rcleanup := manual.GenerateAndRegisterManualResolver() 1548 defer rcleanup() 1549 1550 // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds. 
1551 te.resolverScheme = r.Scheme() 1552 cc := te.clientConn() 1553 r.NewAddress([]resolver.Address{{Addr: te.srvAddr}}) 1554 r.NewServiceConfig(`{ 1555 "methodConfig": [ 1556 { 1557 "name": [ 1558 { 1559 "service": "grpc.testing.TestService", 1560 "method": "EmptyCall" 1561 }, 1562 { 1563 "service": "grpc.testing.TestService", 1564 "method": "FullDuplexCall" 1565 } 1566 ], 1567 "waitForReady": false, 1568 "timeout": ".001s" 1569 } 1570 ] 1571}`) 1572 1573 tc := testpb.NewTestServiceClient(cc) 1574 1575 // Make sure service config has been processed by grpc. 1576 for { 1577 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil { 1578 break 1579 } 1580 time.Sleep(time.Millisecond) 1581 } 1582 1583 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 1584 var err error 1585 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded { 1586 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1587 } 1588 if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded { 1589 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 1590 } 1591 1592 // Generate a service config update. 1593 // Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. 1594 r.NewServiceConfig(`{ 1595 "methodConfig": [ 1596 { 1597 "name": [ 1598 { 1599 "service": "grpc.testing.TestService", 1600 "method": "EmptyCall" 1601 }, 1602 { 1603 "service": "grpc.testing.TestService", 1604 "method": "FullDuplexCall" 1605 } 1606 ], 1607 "waitForReady": true, 1608 "timeout": ".001s" 1609 } 1610 ] 1611}`) 1612 1613 // Wait for the new service config to take effect. 
1614 for { 1615 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady { 1616 break 1617 } 1618 time.Sleep(time.Millisecond) 1619 } 1620 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 1621 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 1622 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1623 } 1624 if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded { 1625 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 1626 } 1627} 1628 1629func TestServiceConfigTimeout(t *testing.T) { 1630 te := testServiceConfigSetup(t, tcpClearRREnv) 1631 defer te.tearDown() 1632 r, rcleanup := manual.GenerateAndRegisterManualResolver() 1633 defer rcleanup() 1634 1635 // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. 1636 te.resolverScheme = r.Scheme() 1637 cc := te.clientConn() 1638 r.NewAddress([]resolver.Address{{Addr: te.srvAddr}}) 1639 r.NewServiceConfig(`{ 1640 "methodConfig": [ 1641 { 1642 "name": [ 1643 { 1644 "service": "grpc.testing.TestService", 1645 "method": "EmptyCall" 1646 }, 1647 { 1648 "service": "grpc.testing.TestService", 1649 "method": "FullDuplexCall" 1650 } 1651 ], 1652 "waitForReady": true, 1653 "timeout": "3600s" 1654 } 1655 ] 1656}`) 1657 1658 tc := testpb.NewTestServiceClient(cc) 1659 1660 // Make sure service config has been processed by grpc. 1661 for { 1662 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil { 1663 break 1664 } 1665 time.Sleep(time.Millisecond) 1666 } 1667 1668 // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. 
1669 var err error 1670 ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) 1671 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded { 1672 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1673 } 1674 cancel() 1675 1676 ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) 1677 if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded { 1678 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 1679 } 1680 cancel() 1681 1682 // Generate a service config update. 1683 // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. 1684 r.NewServiceConfig(`{ 1685 "methodConfig": [ 1686 { 1687 "name": [ 1688 { 1689 "service": "grpc.testing.TestService", 1690 "method": "EmptyCall" 1691 }, 1692 { 1693 "service": "grpc.testing.TestService", 1694 "method": "FullDuplexCall" 1695 } 1696 ], 1697 "waitForReady": true, 1698 "timeout": ".000000001s" 1699 } 1700 ] 1701}`) 1702 1703 // Wait for the new service config to take effect. 
1704 for { 1705 if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond { 1706 break 1707 } 1708 time.Sleep(time.Millisecond) 1709 } 1710 1711 ctx, cancel = context.WithTimeout(context.Background(), time.Hour) 1712 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded { 1713 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1714 } 1715 cancel() 1716 1717 ctx, cancel = context.WithTimeout(context.Background(), time.Hour) 1718 if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded { 1719 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 1720 } 1721 cancel() 1722} 1723 1724func TestServiceConfigMaxMsgSize(t *testing.T) { 1725 e := tcpClearRREnv 1726 r, rcleanup := manual.GenerateAndRegisterManualResolver() 1727 defer rcleanup() 1728 1729 // Setting up values and objects shared across all test cases. 1730 const smallSize = 1 1731 const largeSize = 1024 1732 const extraLargeSize = 2048 1733 1734 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 1735 if err != nil { 1736 t.Fatal(err) 1737 } 1738 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 1739 if err != nil { 1740 t.Fatal(err) 1741 } 1742 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) 1743 if err != nil { 1744 t.Fatal(err) 1745 } 1746 1747 scjs := `{ 1748 "methodConfig": [ 1749 { 1750 "name": [ 1751 { 1752 "service": "grpc.testing.TestService", 1753 "method": "UnaryCall" 1754 }, 1755 { 1756 "service": "grpc.testing.TestService", 1757 "method": "FullDuplexCall" 1758 } 1759 ], 1760 "maxRequestMessageBytes": 2048, 1761 "maxResponseMessageBytes": 2048 1762 } 1763 ] 1764}` 1765 1766 // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 
1767 te1 := testServiceConfigSetup(t, e) 1768 defer te1.tearDown() 1769 1770 te1.resolverScheme = r.Scheme() 1771 te1.nonBlockingDial = true 1772 te1.startServer(&testServer{security: e.security}) 1773 cc1 := te1.clientConn() 1774 1775 r.NewAddress([]resolver.Address{{Addr: te1.srvAddr}}) 1776 r.NewServiceConfig(scjs) 1777 tc := testpb.NewTestServiceClient(cc1) 1778 1779 req := &testpb.SimpleRequest{ 1780 ResponseType: testpb.PayloadType_COMPRESSABLE, 1781 ResponseSize: int32(extraLargeSize), 1782 Payload: smallPayload, 1783 } 1784 1785 for { 1786 if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { 1787 break 1788 } 1789 time.Sleep(time.Millisecond) 1790 } 1791 1792 // Test for unary RPC recv. 1793 if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || status.Code(err) != codes.ResourceExhausted { 1794 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1795 } 1796 1797 // Test for unary RPC send. 1798 req.Payload = extraLargePayload 1799 req.ResponseSize = int32(smallSize) 1800 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 1801 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1802 } 1803 1804 // Test for streaming RPC recv. 
1805 respParam := []*testpb.ResponseParameters{ 1806 { 1807 Size: int32(extraLargeSize), 1808 }, 1809 } 1810 sreq := &testpb.StreamingOutputCallRequest{ 1811 ResponseType: testpb.PayloadType_COMPRESSABLE, 1812 ResponseParameters: respParam, 1813 Payload: smallPayload, 1814 } 1815 stream, err := tc.FullDuplexCall(te1.ctx) 1816 if err != nil { 1817 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1818 } 1819 if err = stream.Send(sreq); err != nil { 1820 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1821 } 1822 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 1823 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 1824 } 1825 1826 // Test for streaming RPC send. 1827 respParam[0].Size = int32(smallSize) 1828 sreq.Payload = extraLargePayload 1829 stream, err = tc.FullDuplexCall(te1.ctx) 1830 if err != nil { 1831 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1832 } 1833 if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 1834 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 1835 } 1836 1837 // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 
1838 te2 := testServiceConfigSetup(t, e) 1839 te2.resolverScheme = r.Scheme() 1840 te2.nonBlockingDial = true 1841 te2.maxClientReceiveMsgSize = newInt(1024) 1842 te2.maxClientSendMsgSize = newInt(1024) 1843 1844 te2.startServer(&testServer{security: e.security}) 1845 defer te2.tearDown() 1846 cc2 := te2.clientConn() 1847 r.NewAddress([]resolver.Address{{Addr: te2.srvAddr}}) 1848 r.NewServiceConfig(scjs) 1849 tc = testpb.NewTestServiceClient(cc2) 1850 1851 for { 1852 if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { 1853 break 1854 } 1855 time.Sleep(time.Millisecond) 1856 } 1857 1858 // Test for unary RPC recv. 1859 req.Payload = smallPayload 1860 req.ResponseSize = int32(largeSize) 1861 1862 if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || status.Code(err) != codes.ResourceExhausted { 1863 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1864 } 1865 1866 // Test for unary RPC send. 1867 req.Payload = largePayload 1868 req.ResponseSize = int32(smallSize) 1869 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 1870 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1871 } 1872 1873 // Test for streaming RPC recv. 1874 stream, err = tc.FullDuplexCall(te2.ctx) 1875 respParam[0].Size = int32(largeSize) 1876 sreq.Payload = smallPayload 1877 if err != nil { 1878 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1879 } 1880 if err = stream.Send(sreq); err != nil { 1881 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1882 } 1883 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 1884 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 1885 } 1886 1887 // Test for streaming RPC send. 
1888 respParam[0].Size = int32(smallSize) 1889 sreq.Payload = largePayload 1890 stream, err = tc.FullDuplexCall(te2.ctx) 1891 if err != nil { 1892 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1893 } 1894 if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 1895 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 1896 } 1897 1898 // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 1899 te3 := testServiceConfigSetup(t, e) 1900 te3.resolverScheme = r.Scheme() 1901 te3.nonBlockingDial = true 1902 te3.maxClientReceiveMsgSize = newInt(4096) 1903 te3.maxClientSendMsgSize = newInt(4096) 1904 1905 te3.startServer(&testServer{security: e.security}) 1906 defer te3.tearDown() 1907 1908 cc3 := te3.clientConn() 1909 r.NewAddress([]resolver.Address{{Addr: te3.srvAddr}}) 1910 r.NewServiceConfig(scjs) 1911 tc = testpb.NewTestServiceClient(cc3) 1912 1913 for { 1914 if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { 1915 break 1916 } 1917 time.Sleep(time.Millisecond) 1918 } 1919 1920 // Test for unary RPC recv. 1921 req.Payload = smallPayload 1922 req.ResponseSize = int32(largeSize) 1923 1924 if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err != nil { 1925 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) 1926 } 1927 1928 req.ResponseSize = int32(extraLargeSize) 1929 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 1930 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1931 } 1932 1933 // Test for unary RPC send. 
1934 req.Payload = largePayload 1935 req.ResponseSize = int32(smallSize) 1936 if _, err := tc.UnaryCall(context.Background(), req); err != nil { 1937 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) 1938 } 1939 1940 req.Payload = extraLargePayload 1941 if _, err = tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 1942 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1943 } 1944 1945 // Test for streaming RPC recv. 1946 stream, err = tc.FullDuplexCall(te3.ctx) 1947 if err != nil { 1948 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1949 } 1950 respParam[0].Size = int32(largeSize) 1951 sreq.Payload = smallPayload 1952 1953 if err = stream.Send(sreq); err != nil { 1954 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1955 } 1956 if _, err = stream.Recv(); err != nil { 1957 t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err) 1958 } 1959 1960 respParam[0].Size = int32(extraLargeSize) 1961 1962 if err = stream.Send(sreq); err != nil { 1963 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1964 } 1965 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 1966 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 1967 } 1968 1969 // Test for streaming RPC send. 
1970 respParam[0].Size = int32(smallSize) 1971 sreq.Payload = largePayload 1972 stream, err = tc.FullDuplexCall(te3.ctx) 1973 if err != nil { 1974 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1975 } 1976 if err := stream.Send(sreq); err != nil { 1977 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1978 } 1979 sreq.Payload = extraLargePayload 1980 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 1981 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 1982 } 1983} 1984 1985// Reading from a streaming RPC may fail with context canceled if timeout was 1986// set by service config (https://github.com/grpc/grpc-go/issues/1818). This 1987// test makes sure read from streaming RPC doesn't fail in this case. 1988func TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) { 1989 te := testServiceConfigSetup(t, tcpClearRREnv) 1990 te.startServer(&testServer{security: tcpClearRREnv.security}) 1991 defer te.tearDown() 1992 r, rcleanup := manual.GenerateAndRegisterManualResolver() 1993 defer rcleanup() 1994 1995 te.resolverScheme = r.Scheme() 1996 te.nonBlockingDial = true 1997 cc := te.clientConn() 1998 tc := testpb.NewTestServiceClient(cc) 1999 2000 r.NewAddress([]resolver.Address{{Addr: te.srvAddr}}) 2001 r.NewServiceConfig(`{ 2002 "methodConfig": [ 2003 { 2004 "name": [ 2005 { 2006 "service": "grpc.testing.TestService", 2007 "method": "FullDuplexCall" 2008 } 2009 ], 2010 "waitForReady": true, 2011 "timeout": "10s" 2012 } 2013 ] 2014 }`) 2015 // Make sure service config has been processed by grpc. 
2016 for { 2017 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil { 2018 break 2019 } 2020 time.Sleep(time.Millisecond) 2021 } 2022 2023 ctx, cancel := context.WithCancel(context.Background()) 2024 defer cancel() 2025 stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)) 2026 if err != nil { 2027 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 2028 } 2029 2030 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0) 2031 if err != nil { 2032 t.Fatalf("failed to newPayload: %v", err) 2033 } 2034 req := &testpb.StreamingOutputCallRequest{ 2035 ResponseType: testpb.PayloadType_COMPRESSABLE, 2036 ResponseParameters: []*testpb.ResponseParameters{{Size: 0}}, 2037 Payload: payload, 2038 } 2039 if err := stream.Send(req); err != nil { 2040 t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err) 2041 } 2042 stream.CloseSend() 2043 time.Sleep(time.Second) 2044 // Sleep 1 second before recv to make sure the final status is received 2045 // before the recv. 2046 if _, err := stream.Recv(); err != nil { 2047 t.Fatalf("stream.Recv = _, %v, want _, <nil>", err) 2048 } 2049 // Keep reading to drain the stream. 
2050 for { 2051 if _, err := stream.Recv(); err != nil { 2052 break 2053 } 2054 } 2055} 2056 2057func TestMaxMsgSizeClientDefault(t *testing.T) { 2058 defer leakcheck.Check(t) 2059 for _, e := range listTestEnv() { 2060 testMaxMsgSizeClientDefault(t, e) 2061 } 2062} 2063 2064func testMaxMsgSizeClientDefault(t *testing.T, e env) { 2065 te := newTest(t, e) 2066 te.userAgent = testAppUA 2067 te.declareLogNoise( 2068 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2069 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2070 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2071 "Failed to dial : context canceled; please retry.", 2072 ) 2073 te.startServer(&testServer{security: e.security}) 2074 2075 defer te.tearDown() 2076 tc := testpb.NewTestServiceClient(te.clientConn()) 2077 2078 const smallSize = 1 2079 const largeSize = 4 * 1024 * 1024 2080 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2081 if err != nil { 2082 t.Fatal(err) 2083 } 2084 req := &testpb.SimpleRequest{ 2085 ResponseType: testpb.PayloadType_COMPRESSABLE, 2086 ResponseSize: int32(largeSize), 2087 Payload: smallPayload, 2088 } 2089 // Test for unary RPC recv. 2090 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2091 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2092 } 2093 2094 respParam := []*testpb.ResponseParameters{ 2095 { 2096 Size: int32(largeSize), 2097 }, 2098 } 2099 sreq := &testpb.StreamingOutputCallRequest{ 2100 ResponseType: testpb.PayloadType_COMPRESSABLE, 2101 ResponseParameters: respParam, 2102 Payload: smallPayload, 2103 } 2104 2105 // Test for streaming RPC recv. 
2106 stream, err := tc.FullDuplexCall(te.ctx) 2107 if err != nil { 2108 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2109 } 2110 if err := stream.Send(sreq); err != nil { 2111 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2112 } 2113 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2114 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2115 } 2116} 2117 2118func TestMaxMsgSizeClientAPI(t *testing.T) { 2119 defer leakcheck.Check(t) 2120 for _, e := range listTestEnv() { 2121 testMaxMsgSizeClientAPI(t, e) 2122 } 2123} 2124 2125func testMaxMsgSizeClientAPI(t *testing.T, e env) { 2126 te := newTest(t, e) 2127 te.userAgent = testAppUA 2128 // To avoid error on server side. 2129 te.maxServerSendMsgSize = newInt(5 * 1024 * 1024) 2130 te.maxClientReceiveMsgSize = newInt(1024) 2131 te.maxClientSendMsgSize = newInt(1024) 2132 te.declareLogNoise( 2133 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2134 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2135 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2136 "Failed to dial : context canceled; please retry.", 2137 ) 2138 te.startServer(&testServer{security: e.security}) 2139 2140 defer te.tearDown() 2141 tc := testpb.NewTestServiceClient(te.clientConn()) 2142 2143 const smallSize = 1 2144 const largeSize = 1024 2145 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2146 if err != nil { 2147 t.Fatal(err) 2148 } 2149 2150 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 2151 if err != nil { 2152 t.Fatal(err) 2153 } 2154 req := &testpb.SimpleRequest{ 2155 ResponseType: testpb.PayloadType_COMPRESSABLE, 2156 ResponseSize: int32(largeSize), 2157 Payload: smallPayload, 2158 } 2159 // Test for unary RPC recv. 
2160 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2161 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2162 } 2163 2164 // Test for unary RPC send. 2165 req.Payload = largePayload 2166 req.ResponseSize = int32(smallSize) 2167 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2168 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2169 } 2170 2171 respParam := []*testpb.ResponseParameters{ 2172 { 2173 Size: int32(largeSize), 2174 }, 2175 } 2176 sreq := &testpb.StreamingOutputCallRequest{ 2177 ResponseType: testpb.PayloadType_COMPRESSABLE, 2178 ResponseParameters: respParam, 2179 Payload: smallPayload, 2180 } 2181 2182 // Test for streaming RPC recv. 2183 stream, err := tc.FullDuplexCall(te.ctx) 2184 if err != nil { 2185 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2186 } 2187 if err := stream.Send(sreq); err != nil { 2188 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2189 } 2190 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2191 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2192 } 2193 2194 // Test for streaming RPC send. 
2195 respParam[0].Size = int32(smallSize) 2196 sreq.Payload = largePayload 2197 stream, err = tc.FullDuplexCall(te.ctx) 2198 if err != nil { 2199 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2200 } 2201 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 2202 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 2203 } 2204} 2205 2206func TestMaxMsgSizeServerAPI(t *testing.T) { 2207 defer leakcheck.Check(t) 2208 for _, e := range listTestEnv() { 2209 testMaxMsgSizeServerAPI(t, e) 2210 } 2211} 2212 2213func testMaxMsgSizeServerAPI(t *testing.T, e env) { 2214 te := newTest(t, e) 2215 te.userAgent = testAppUA 2216 te.maxServerReceiveMsgSize = newInt(1024) 2217 te.maxServerSendMsgSize = newInt(1024) 2218 te.declareLogNoise( 2219 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2220 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2221 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2222 "Failed to dial : context canceled; please retry.", 2223 ) 2224 te.startServer(&testServer{security: e.security}) 2225 2226 defer te.tearDown() 2227 tc := testpb.NewTestServiceClient(te.clientConn()) 2228 2229 const smallSize = 1 2230 const largeSize = 1024 2231 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2232 if err != nil { 2233 t.Fatal(err) 2234 } 2235 2236 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 2237 if err != nil { 2238 t.Fatal(err) 2239 } 2240 req := &testpb.SimpleRequest{ 2241 ResponseType: testpb.PayloadType_COMPRESSABLE, 2242 ResponseSize: int32(largeSize), 2243 Payload: smallPayload, 2244 } 2245 // Test for unary RPC send. 
2246 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2247 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2248 } 2249 2250 // Test for unary RPC recv. 2251 req.Payload = largePayload 2252 req.ResponseSize = int32(smallSize) 2253 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2254 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2255 } 2256 2257 respParam := []*testpb.ResponseParameters{ 2258 { 2259 Size: int32(largeSize), 2260 }, 2261 } 2262 sreq := &testpb.StreamingOutputCallRequest{ 2263 ResponseType: testpb.PayloadType_COMPRESSABLE, 2264 ResponseParameters: respParam, 2265 Payload: smallPayload, 2266 } 2267 2268 // Test for streaming RPC send. 2269 stream, err := tc.FullDuplexCall(te.ctx) 2270 if err != nil { 2271 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2272 } 2273 if err := stream.Send(sreq); err != nil { 2274 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2275 } 2276 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2277 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2278 } 2279 2280 // Test for streaming RPC recv. 
2281 respParam[0].Size = int32(smallSize) 2282 sreq.Payload = largePayload 2283 stream, err = tc.FullDuplexCall(te.ctx) 2284 if err != nil { 2285 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2286 } 2287 if err := stream.Send(sreq); err != nil { 2288 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2289 } 2290 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2291 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2292 } 2293} 2294 2295func TestTap(t *testing.T) { 2296 defer leakcheck.Check(t) 2297 for _, e := range listTestEnv() { 2298 if e.name == "handler-tls" { 2299 continue 2300 } 2301 testTap(t, e) 2302 } 2303} 2304 2305type myTap struct { 2306 cnt int 2307} 2308 2309func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) { 2310 if info != nil { 2311 if info.FullMethodName == "/grpc.testing.TestService/EmptyCall" { 2312 t.cnt++ 2313 } else if info.FullMethodName == "/grpc.testing.TestService/UnaryCall" { 2314 return nil, fmt.Errorf("tap error") 2315 } 2316 } 2317 return ctx, nil 2318} 2319 2320func testTap(t *testing.T, e env) { 2321 te := newTest(t, e) 2322 te.userAgent = testAppUA 2323 ttap := &myTap{} 2324 te.tapHandle = ttap.handle 2325 te.declareLogNoise( 2326 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2327 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2328 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2329 ) 2330 te.startServer(&testServer{security: e.security}) 2331 defer te.tearDown() 2332 2333 cc := te.clientConn() 2334 tc := testpb.NewTestServiceClient(cc) 2335 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 2336 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 2337 } 2338 if ttap.cnt != 1 { 2339 t.Fatalf("Get the count in ttap %d, want 1", 
ttap.cnt) 2340 } 2341 2342 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31) 2343 if err != nil { 2344 t.Fatal(err) 2345 } 2346 2347 req := &testpb.SimpleRequest{ 2348 ResponseType: testpb.PayloadType_COMPRESSABLE, 2349 ResponseSize: 45, 2350 Payload: payload, 2351 } 2352 if _, err := tc.UnaryCall(context.Background(), req); status.Code(err) != codes.Unavailable { 2353 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) 2354 } 2355} 2356 2357func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { 2358 ctx, cancel := context.WithTimeout(context.Background(), d) 2359 defer cancel() 2360 hc := healthgrpc.NewHealthClient(cc) 2361 req := &healthpb.HealthCheckRequest{ 2362 Service: serviceName, 2363 } 2364 return hc.Check(ctx, req) 2365} 2366 2367func TestHealthCheckOnSuccess(t *testing.T) { 2368 defer leakcheck.Check(t) 2369 for _, e := range listTestEnv() { 2370 testHealthCheckOnSuccess(t, e) 2371 } 2372} 2373 2374func testHealthCheckOnSuccess(t *testing.T, e env) { 2375 te := newTest(t, e) 2376 hs := health.NewServer() 2377 hs.SetServingStatus("grpc.health.v1.Health", 1) 2378 te.healthServer = hs 2379 te.startServer(&testServer{security: e.security}) 2380 defer te.tearDown() 2381 2382 cc := te.clientConn() 2383 if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); err != nil { 2384 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) 2385 } 2386} 2387 2388func TestHealthCheckOnFailure(t *testing.T) { 2389 defer leakcheck.Check(t) 2390 for _, e := range listTestEnv() { 2391 testHealthCheckOnFailure(t, e) 2392 } 2393} 2394 2395func testHealthCheckOnFailure(t *testing.T, e env) { 2396 defer leakcheck.Check(t) 2397 te := newTest(t, e) 2398 te.declareLogNoise( 2399 "Failed to dial ", 2400 "grpc: the client connection is closing; please retry", 2401 ) 2402 hs := health.NewServer() 2403 hs.SetServingStatus("grpc.health.v1.HealthCheck", 1) 2404 
te.healthServer = hs 2405 te.startServer(&testServer{security: e.security}) 2406 defer te.tearDown() 2407 2408 cc := te.clientConn() 2409 wantErr := status.Error(codes.DeadlineExceeded, "context deadline exceeded") 2410 if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) { 2411 t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.DeadlineExceeded) 2412 } 2413 awaitNewConnLogOutput() 2414} 2415 2416func TestHealthCheckOff(t *testing.T) { 2417 defer leakcheck.Check(t) 2418 for _, e := range listTestEnv() { 2419 // TODO(bradfitz): Temporarily skip this env due to #619. 2420 if e.name == "handler-tls" { 2421 continue 2422 } 2423 testHealthCheckOff(t, e) 2424 } 2425} 2426 2427func testHealthCheckOff(t *testing.T, e env) { 2428 te := newTest(t, e) 2429 te.startServer(&testServer{security: e.security}) 2430 defer te.tearDown() 2431 want := status.Error(codes.Unimplemented, "unknown service grpc.health.v1.Health") 2432 if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) { 2433 t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want) 2434 } 2435} 2436 2437func TestHealthWatchMultipleClients(t *testing.T) { 2438 defer leakcheck.Check(t) 2439 for _, e := range listTestEnv() { 2440 testHealthWatchMultipleClients(t, e) 2441 } 2442} 2443 2444func testHealthWatchMultipleClients(t *testing.T, e env) { 2445 const service = "grpc.health.v1.Health1" 2446 2447 hs := health.NewServer() 2448 2449 te := newTest(t, e) 2450 te.healthServer = hs 2451 te.startServer(&testServer{security: e.security}) 2452 defer te.tearDown() 2453 2454 cc := te.clientConn() 2455 hc := healthgrpc.NewHealthClient(cc) 2456 2457 ctx, cancel := context.WithCancel(context.Background()) 2458 defer cancel() 2459 2460 req := &healthpb.HealthCheckRequest{ 2461 Service: service, 2462 } 2463 2464 stream1, err := hc.Watch(ctx, req) 2465 if err != nil { 2466 t.Fatalf("error: %v", err) 2467 } 2468 2469 
healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2470 2471 stream2, err := hc.Watch(ctx, req) 2472 if err != nil { 2473 t.Fatalf("error: %v", err) 2474 } 2475 2476 healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2477 2478 hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING) 2479 2480 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING) 2481 healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING) 2482} 2483 2484func TestHealthWatchSameStatus(t *testing.T) { 2485 defer leakcheck.Check(t) 2486 for _, e := range listTestEnv() { 2487 testHealthWatchSameStatus(t, e) 2488 } 2489} 2490 2491func testHealthWatchSameStatus(t *testing.T, e env) { 2492 const service = "grpc.health.v1.Health1" 2493 2494 hs := health.NewServer() 2495 2496 te := newTest(t, e) 2497 te.healthServer = hs 2498 te.startServer(&testServer{security: e.security}) 2499 defer te.tearDown() 2500 2501 cc := te.clientConn() 2502 hc := healthgrpc.NewHealthClient(cc) 2503 2504 ctx, cancel := context.WithCancel(context.Background()) 2505 defer cancel() 2506 2507 req := &healthpb.HealthCheckRequest{ 2508 Service: service, 2509 } 2510 2511 stream1, err := hc.Watch(ctx, req) 2512 if err != nil { 2513 t.Fatalf("error: %v", err) 2514 } 2515 2516 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2517 2518 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2519 2520 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVING) 2521 2522 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2523 hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING) 2524 2525 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING) 2526} 2527 2528func TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) { 2529 defer leakcheck.Check(t) 2530 for _, e := range listTestEnv() { 2531 
testHealthWatchSetServiceStatusBeforeStartingServer(t, e) 2532 } 2533} 2534 2535func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) { 2536 const service = "grpc.health.v1.Health1" 2537 2538 hs := health.NewServer() 2539 2540 te := newTest(t, e) 2541 te.healthServer = hs 2542 2543 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2544 2545 te.startServer(&testServer{security: e.security}) 2546 defer te.tearDown() 2547 2548 cc := te.clientConn() 2549 hc := healthgrpc.NewHealthClient(cc) 2550 2551 ctx, cancel := context.WithCancel(context.Background()) 2552 defer cancel() 2553 2554 req := &healthpb.HealthCheckRequest{ 2555 Service: service, 2556 } 2557 2558 stream, err := hc.Watch(ctx, req) 2559 if err != nil { 2560 t.Fatalf("error: %v", err) 2561 } 2562 2563 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2564} 2565 2566func TestHealthWatchDefaultStatusChange(t *testing.T) { 2567 defer leakcheck.Check(t) 2568 for _, e := range listTestEnv() { 2569 testHealthWatchDefaultStatusChange(t, e) 2570 } 2571} 2572 2573func testHealthWatchDefaultStatusChange(t *testing.T, e env) { 2574 const service = "grpc.health.v1.Health1" 2575 2576 hs := health.NewServer() 2577 2578 te := newTest(t, e) 2579 te.healthServer = hs 2580 te.startServer(&testServer{security: e.security}) 2581 defer te.tearDown() 2582 2583 cc := te.clientConn() 2584 hc := healthgrpc.NewHealthClient(cc) 2585 2586 ctx, cancel := context.WithCancel(context.Background()) 2587 defer cancel() 2588 2589 req := &healthpb.HealthCheckRequest{ 2590 Service: service, 2591 } 2592 2593 stream, err := hc.Watch(ctx, req) 2594 if err != nil { 2595 t.Fatalf("error: %v", err) 2596 } 2597 2598 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2599 2600 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2601 2602 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2603} 2604 2605func 
TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) { 2606 defer leakcheck.Check(t) 2607 for _, e := range listTestEnv() { 2608 testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e) 2609 } 2610} 2611 2612func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) { 2613 const service = "grpc.health.v1.Health1" 2614 2615 hs := health.NewServer() 2616 2617 te := newTest(t, e) 2618 te.healthServer = hs 2619 te.startServer(&testServer{security: e.security}) 2620 defer te.tearDown() 2621 2622 cc := te.clientConn() 2623 hc := healthgrpc.NewHealthClient(cc) 2624 2625 ctx, cancel := context.WithCancel(context.Background()) 2626 defer cancel() 2627 2628 req := &healthpb.HealthCheckRequest{ 2629 Service: service, 2630 } 2631 2632 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2633 2634 stream, err := hc.Watch(ctx, req) 2635 if err != nil { 2636 t.Fatalf("error: %v", err) 2637 } 2638 2639 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2640} 2641 2642func TestHealthWatchOverallServerHealthChange(t *testing.T) { 2643 defer leakcheck.Check(t) 2644 for _, e := range listTestEnv() { 2645 testHealthWatchOverallServerHealthChange(t, e) 2646 } 2647} 2648 2649func testHealthWatchOverallServerHealthChange(t *testing.T, e env) { 2650 const service = "" 2651 2652 hs := health.NewServer() 2653 2654 te := newTest(t, e) 2655 te.healthServer = hs 2656 te.startServer(&testServer{security: e.security}) 2657 defer te.tearDown() 2658 2659 cc := te.clientConn() 2660 hc := healthgrpc.NewHealthClient(cc) 2661 2662 ctx, cancel := context.WithCancel(context.Background()) 2663 defer cancel() 2664 2665 req := &healthpb.HealthCheckRequest{ 2666 Service: service, 2667 } 2668 2669 stream, err := hc.Watch(ctx, req) 2670 if err != nil { 2671 t.Fatalf("error: %v", err) 2672 } 2673 2674 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2675 2676 hs.SetServingStatus(service, 
healthpb.HealthCheckResponse_NOT_SERVING) 2677 2678 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING) 2679} 2680 2681func healthWatchChecker(t *testing.T, stream healthgrpc.Health_WatchClient, expectedServingStatus healthpb.HealthCheckResponse_ServingStatus) { 2682 response, err := stream.Recv() 2683 if err != nil { 2684 t.Fatalf("error on %v.Recv(): %v", stream, err) 2685 } 2686 if response.Status != expectedServingStatus { 2687 t.Fatalf("response.Status is %v (%v expected)", response.Status, expectedServingStatus) 2688 } 2689} 2690 2691func TestUnknownHandler(t *testing.T) { 2692 defer leakcheck.Check(t) 2693 // An example unknownHandler that returns a different code and a different method, making sure that we do not 2694 // expose what methods are implemented to a client that is not authenticated. 2695 unknownHandler := func(srv interface{}, stream grpc.ServerStream) error { 2696 return status.Error(codes.Unauthenticated, "user unauthenticated") 2697 } 2698 for _, e := range listTestEnv() { 2699 // TODO(bradfitz): Temporarily skip this env due to #619. 
2700 if e.name == "handler-tls" { 2701 continue 2702 } 2703 testUnknownHandler(t, e, unknownHandler) 2704 } 2705} 2706 2707func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) { 2708 te := newTest(t, e) 2709 te.unknownHandler = unknownHandler 2710 te.startServer(&testServer{security: e.security}) 2711 defer te.tearDown() 2712 want := status.Error(codes.Unauthenticated, "user unauthenticated") 2713 if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) { 2714 t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want) 2715 } 2716} 2717 2718func TestHealthCheckServingStatus(t *testing.T) { 2719 defer leakcheck.Check(t) 2720 for _, e := range listTestEnv() { 2721 testHealthCheckServingStatus(t, e) 2722 } 2723} 2724 2725func testHealthCheckServingStatus(t *testing.T, e env) { 2726 te := newTest(t, e) 2727 hs := health.NewServer() 2728 te.healthServer = hs 2729 te.startServer(&testServer{security: e.security}) 2730 defer te.tearDown() 2731 2732 cc := te.clientConn() 2733 out, err := healthCheck(1*time.Second, cc, "") 2734 if err != nil { 2735 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) 2736 } 2737 if out.Status != healthpb.HealthCheckResponse_SERVING { 2738 t.Fatalf("Got the serving status %v, want SERVING", out.Status) 2739 } 2740 wantErr := status.Error(codes.NotFound, "unknown service") 2741 if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) { 2742 t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.NotFound) 2743 } 2744 hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_SERVING) 2745 out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health") 2746 if err != nil { 2747 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) 2748 } 2749 if out.Status != healthpb.HealthCheckResponse_SERVING { 2750 t.Fatalf("Got the serving status %v, want SERVING", out.Status) 2751 } 2752 
hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_NOT_SERVING) 2753 out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health") 2754 if err != nil { 2755 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) 2756 } 2757 if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { 2758 t.Fatalf("Got the serving status %v, want NOT_SERVING", out.Status) 2759 } 2760 2761} 2762 2763func TestEmptyUnaryWithUserAgent(t *testing.T) { 2764 defer leakcheck.Check(t) 2765 for _, e := range listTestEnv() { 2766 testEmptyUnaryWithUserAgent(t, e) 2767 } 2768} 2769 2770func testEmptyUnaryWithUserAgent(t *testing.T, e env) { 2771 te := newTest(t, e) 2772 te.userAgent = testAppUA 2773 te.startServer(&testServer{security: e.security}) 2774 defer te.tearDown() 2775 2776 cc := te.clientConn() 2777 tc := testpb.NewTestServiceClient(cc) 2778 var header metadata.MD 2779 reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header)) 2780 if err != nil || !proto.Equal(&testpb.Empty{}, reply) { 2781 t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{}) 2782 } 2783 if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) { 2784 t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA) 2785 } 2786 2787 te.srv.Stop() 2788} 2789 2790func TestFailedEmptyUnary(t *testing.T) { 2791 defer leakcheck.Check(t) 2792 for _, e := range listTestEnv() { 2793 if e.name == "handler-tls" { 2794 // This test covers status details, but 2795 // Grpc-Status-Details-Bin is not support in handler_server. 
2796 continue 2797 } 2798 testFailedEmptyUnary(t, e) 2799 } 2800} 2801 2802func testFailedEmptyUnary(t *testing.T, e env) { 2803 te := newTest(t, e) 2804 te.userAgent = failAppUA 2805 te.startServer(&testServer{security: e.security}) 2806 defer te.tearDown() 2807 tc := testpb.NewTestServiceClient(te.clientConn()) 2808 2809 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 2810 wantErr := detailedError 2811 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !reflect.DeepEqual(err, wantErr) { 2812 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr) 2813 } 2814} 2815 2816func TestLargeUnary(t *testing.T) { 2817 defer leakcheck.Check(t) 2818 for _, e := range listTestEnv() { 2819 testLargeUnary(t, e) 2820 } 2821} 2822 2823func testLargeUnary(t *testing.T, e env) { 2824 te := newTest(t, e) 2825 te.startServer(&testServer{security: e.security}) 2826 defer te.tearDown() 2827 tc := testpb.NewTestServiceClient(te.clientConn()) 2828 2829 const argSize = 271828 2830 const respSize = 314159 2831 2832 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 2833 if err != nil { 2834 t.Fatal(err) 2835 } 2836 2837 req := &testpb.SimpleRequest{ 2838 ResponseType: testpb.PayloadType_COMPRESSABLE, 2839 ResponseSize: respSize, 2840 Payload: payload, 2841 } 2842 reply, err := tc.UnaryCall(context.Background(), req) 2843 if err != nil { 2844 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 2845 } 2846 pt := reply.GetPayload().GetType() 2847 ps := len(reply.GetPayload().GetBody()) 2848 if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize { 2849 t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize) 2850 } 2851} 2852 2853// Test backward-compatibility API for setting msg size limit. 
2854func TestExceedMsgLimit(t *testing.T) { 2855 defer leakcheck.Check(t) 2856 for _, e := range listTestEnv() { 2857 testExceedMsgLimit(t, e) 2858 } 2859} 2860 2861func testExceedMsgLimit(t *testing.T, e env) { 2862 te := newTest(t, e) 2863 te.maxMsgSize = newInt(1024) 2864 te.startServer(&testServer{security: e.security}) 2865 defer te.tearDown() 2866 tc := testpb.NewTestServiceClient(te.clientConn()) 2867 2868 argSize := int32(*te.maxMsgSize + 1) 2869 const smallSize = 1 2870 2871 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 2872 if err != nil { 2873 t.Fatal(err) 2874 } 2875 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2876 if err != nil { 2877 t.Fatal(err) 2878 } 2879 2880 // Test on server side for unary RPC. 2881 req := &testpb.SimpleRequest{ 2882 ResponseType: testpb.PayloadType_COMPRESSABLE, 2883 ResponseSize: smallSize, 2884 Payload: payload, 2885 } 2886 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2887 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2888 } 2889 // Test on client side for unary RPC. 2890 req.ResponseSize = int32(*te.maxMsgSize) + 1 2891 req.Payload = smallPayload 2892 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2893 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2894 } 2895 2896 // Test on server side for streaming RPC. 
2897 stream, err := tc.FullDuplexCall(te.ctx) 2898 if err != nil { 2899 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2900 } 2901 respParam := []*testpb.ResponseParameters{ 2902 { 2903 Size: 1, 2904 }, 2905 } 2906 2907 spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1)) 2908 if err != nil { 2909 t.Fatal(err) 2910 } 2911 2912 sreq := &testpb.StreamingOutputCallRequest{ 2913 ResponseType: testpb.PayloadType_COMPRESSABLE, 2914 ResponseParameters: respParam, 2915 Payload: spayload, 2916 } 2917 if err := stream.Send(sreq); err != nil { 2918 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2919 } 2920 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2921 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2922 } 2923 2924 // Test on client side for streaming RPC. 2925 stream, err = tc.FullDuplexCall(te.ctx) 2926 if err != nil { 2927 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2928 } 2929 respParam[0].Size = int32(*te.maxMsgSize) + 1 2930 sreq.Payload = smallPayload 2931 if err := stream.Send(sreq); err != nil { 2932 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2933 } 2934 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2935 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2936 } 2937 2938} 2939 2940func TestPeerClientSide(t *testing.T) { 2941 defer leakcheck.Check(t) 2942 for _, e := range listTestEnv() { 2943 testPeerClientSide(t, e) 2944 } 2945} 2946 2947func testPeerClientSide(t *testing.T, e env) { 2948 te := newTest(t, e) 2949 te.userAgent = testAppUA 2950 te.startServer(&testServer{security: e.security}) 2951 defer te.tearDown() 2952 tc := testpb.NewTestServiceClient(te.clientConn()) 2953 peer := new(peer.Peer) 2954 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), 
grpc.FailFast(false)); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	pa := peer.Addr.String()
	if e.network == "unix" {
		if pa != te.srvAddr {
			t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
		}
		return
	}
	_, pp, err := net.SplitHostPort(pa)
	if err != nil {
		t.Fatalf("Failed to parse address from peer.")
	}
	_, sp, err := net.SplitHostPort(te.srvAddr)
	if err != nil {
		t.Fatalf("Failed to parse address of test server.")
	}
	if pp != sp {
		t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
	}
}

// TestPeerNegative tests that if a call fails, setting the peer
// doesn't cause a segmentation fault.
// issue#1141 https://github.com/grpc/grpc-go/issues/1141
func TestPeerNegative(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testPeerNegative(t, e)
	}
}

// testPeerNegative issues an EmptyCall with an already-canceled context and a
// grpc.Peer call option. The call's result is deliberately ignored: the test
// only requires that the call returns without crashing.
func testPeerNegative(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	peer := new(peer.Peer)
	ctx, cancel := context.WithCancel(context.Background())
	// Cancel before the RPC is issued so the call is made on a dead context.
	cancel()
	tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
}

// TestPeerFailedRPC runs testPeerFailedRPC against every environment returned
// by listTestEnv.
func TestPeerFailedRPC(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testPeerFailedRPC(t, e)
	}
}

// testPeerFailedRPC checks that the grpc.Peer call option is populated even
// when the RPC itself fails with codes.ResourceExhausted.
func testPeerFailedRPC(t *testing.T, e env) {
	te := newTest(t, e)
	// NOTE(review): presumably a 1 KiB server-side receive limit; the request
	// below carries a payload built with size 5*1024 and is expected to be rejected.
	te.maxServerReceiveMsgSize = newInt(1 * 1024)
	te.startServer(&testServer{security: e.security})

	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	// First make a successful request to the server.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}

	// Make a second request that will be rejected by the server.
	const largeSize = 5 * 1024
	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		Payload:      largePayload,
	}

	peer := new(peer.Peer)
	if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	} else {
		// The RPC failed as expected; the peer must still have been recorded.
		pa := peer.Addr.String()
		if e.network == "unix" {
			// For unix sockets the whole address is compared.
			if pa != te.srvAddr {
				t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
			}
			return
		}
		// Otherwise only the port parts of the two addresses are compared.
		_, pp, err := net.SplitHostPort(pa)
		if err != nil {
			t.Fatalf("Failed to parse address from peer.")
		}
		_, sp, err := net.SplitHostPort(te.srvAddr)
		if err != nil {
			t.Fatalf("Failed to parse address of test server.")
		}
		if pp != sp {
			t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
		}
	}
}

// TestMetadataUnaryRPC runs testMetadataUnaryRPC against every environment
// returned by listTestEnv.
func TestMetadataUnaryRPC(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testMetadataUnaryRPC(t, e)
	}
}

// testMetadataUnaryRPC sends testMetadata as outgoing metadata on a unary call
// and expects the received header to equal testMetadata and the received
// trailer to equal testTrailerMetadata.
func testMetadataUnaryRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	const argSize = 2718
	const respSize = 314

	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
	if err != nil {
		t.Fatal(err)
	}

	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: respSize,
		Payload:      payload,
	}
	var header, trailer metadata.MD
	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil {
		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
	}
	// Ignore optional response headers that Servers may set:
	if header != nil {
		delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
		delete(header, "date")    // the Date header is also optional
		delete(header, "user-agent")
		delete(header, "content-type")
	}
	if !reflect.DeepEqual(header, testMetadata) {
		t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
	}
	if !reflect.DeepEqual(trailer, testTrailerMetadata) {
		t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata)
	}
}

// TestMultipleSetTrailerUnaryRPC runs testMultipleSetTrailerUnaryRPC against
// every environment returned by listTestEnv.
func TestMultipleSetTrailerUnaryRPC(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testMultipleSetTrailerUnaryRPC(t, e)
	}
}

// testMultipleSetTrailerUnaryRPC starts the server with multipleSetTrailer
// enabled and expects the unary call's trailer to equal the join of
// testTrailerMetadata and testTrailerMetadata2.
func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	const (
		argSize  = 1
		respSize = 1
	)
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
	if err != nil {
		t.Fatal(err)
	}

	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: respSize,
		Payload:      payload,
	}
	var trailer metadata.MD
	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
	if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.FailFast(false)); err != nil {
		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
	}
	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
	if !reflect.DeepEqual(trailer, expectedTrailer) {
		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
	}
}

// TestMultipleSetTrailerStreamingRPC runs testMultipleSetTrailerStreamingRPC
// against every environment returned by listTestEnv.
func TestMultipleSetTrailerStreamingRPC(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testMultipleSetTrailerStreamingRPC(t, e)
	}
}

// testMultipleSetTrailerStreamingRPC is the streaming counterpart of
// testMultipleSetTrailerUnaryRPC: it opens a FullDuplexCall, closes the send
// side without sending, waits for io.EOF, and then compares stream.Trailer()
// against the join of testTrailerMetadata and testTrailerMetadata2.
func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
	}
	if _, err := stream.Recv(); err != io.EOF {
		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
	}

	// The trailer is read only after Recv has returned io.EOF.
	trailer := stream.Trailer()
	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
	if !reflect.DeepEqual(trailer, expectedTrailer) {
		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
	}
}

// TestSetAndSendHeaderUnaryRPC runs testSetAndSendHeaderUnaryRPC against every
// environment returned by listTestEnv except the one named "handler-tls".
func TestSetAndSendHeaderUnaryRPC(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testSetAndSendHeaderUnaryRPC(t, e)
	}
}

// To test header metadata is sent on SendHeader().
3183func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) { 3184 te := newTest(t, e) 3185 te.startServer(&testServer{security: e.security, setAndSendHeader: true}) 3186 defer te.tearDown() 3187 tc := testpb.NewTestServiceClient(te.clientConn()) 3188 3189 const ( 3190 argSize = 1 3191 respSize = 1 3192 ) 3193 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3194 if err != nil { 3195 t.Fatal(err) 3196 } 3197 3198 req := &testpb.SimpleRequest{ 3199 ResponseType: testpb.PayloadType_COMPRESSABLE, 3200 ResponseSize: respSize, 3201 Payload: payload, 3202 } 3203 var header metadata.MD 3204 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3205 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil { 3206 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3207 } 3208 delete(header, "user-agent") 3209 delete(header, "content-type") 3210 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3211 if !reflect.DeepEqual(header, expectedHeader) { 3212 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3213 } 3214} 3215 3216func TestMultipleSetHeaderUnaryRPC(t *testing.T) { 3217 defer leakcheck.Check(t) 3218 for _, e := range listTestEnv() { 3219 if e.name == "handler-tls" { 3220 continue 3221 } 3222 testMultipleSetHeaderUnaryRPC(t, e) 3223 } 3224} 3225 3226// To test header metadata is sent when sending response. 
3227func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) { 3228 te := newTest(t, e) 3229 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3230 defer te.tearDown() 3231 tc := testpb.NewTestServiceClient(te.clientConn()) 3232 3233 const ( 3234 argSize = 1 3235 respSize = 1 3236 ) 3237 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3238 if err != nil { 3239 t.Fatal(err) 3240 } 3241 3242 req := &testpb.SimpleRequest{ 3243 ResponseType: testpb.PayloadType_COMPRESSABLE, 3244 ResponseSize: respSize, 3245 Payload: payload, 3246 } 3247 3248 var header metadata.MD 3249 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3250 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil { 3251 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3252 } 3253 delete(header, "user-agent") 3254 delete(header, "content-type") 3255 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3256 if !reflect.DeepEqual(header, expectedHeader) { 3257 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3258 } 3259} 3260 3261func TestMultipleSetHeaderUnaryRPCError(t *testing.T) { 3262 defer leakcheck.Check(t) 3263 for _, e := range listTestEnv() { 3264 if e.name == "handler-tls" { 3265 continue 3266 } 3267 testMultipleSetHeaderUnaryRPCError(t, e) 3268 } 3269} 3270 3271// To test header metadata is sent when sending status. 3272func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) { 3273 te := newTest(t, e) 3274 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3275 defer te.tearDown() 3276 tc := testpb.NewTestServiceClient(te.clientConn()) 3277 3278 const ( 3279 argSize = 1 3280 respSize = -1 // Invalid respSize to make RPC fail. 
3281 ) 3282 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3283 if err != nil { 3284 t.Fatal(err) 3285 } 3286 3287 req := &testpb.SimpleRequest{ 3288 ResponseType: testpb.PayloadType_COMPRESSABLE, 3289 ResponseSize: respSize, 3290 Payload: payload, 3291 } 3292 var header metadata.MD 3293 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3294 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err == nil { 3295 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err) 3296 } 3297 delete(header, "user-agent") 3298 delete(header, "content-type") 3299 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3300 if !reflect.DeepEqual(header, expectedHeader) { 3301 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3302 } 3303} 3304 3305func TestSetAndSendHeaderStreamingRPC(t *testing.T) { 3306 defer leakcheck.Check(t) 3307 for _, e := range listTestEnv() { 3308 if e.name == "handler-tls" { 3309 continue 3310 } 3311 testSetAndSendHeaderStreamingRPC(t, e) 3312 } 3313} 3314 3315// To test header metadata is sent on SendHeader(). 
3316func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) { 3317 te := newTest(t, e) 3318 te.startServer(&testServer{security: e.security, setAndSendHeader: true}) 3319 defer te.tearDown() 3320 tc := testpb.NewTestServiceClient(te.clientConn()) 3321 3322 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3323 stream, err := tc.FullDuplexCall(ctx) 3324 if err != nil { 3325 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3326 } 3327 if err := stream.CloseSend(); err != nil { 3328 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3329 } 3330 if _, err := stream.Recv(); err != io.EOF { 3331 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err) 3332 } 3333 3334 header, err := stream.Header() 3335 if err != nil { 3336 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) 3337 } 3338 delete(header, "user-agent") 3339 delete(header, "content-type") 3340 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3341 if !reflect.DeepEqual(header, expectedHeader) { 3342 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3343 } 3344} 3345 3346func TestMultipleSetHeaderStreamingRPC(t *testing.T) { 3347 defer leakcheck.Check(t) 3348 for _, e := range listTestEnv() { 3349 if e.name == "handler-tls" { 3350 continue 3351 } 3352 testMultipleSetHeaderStreamingRPC(t, e) 3353 } 3354} 3355 3356// To test header metadata is sent when sending response. 
3357func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) { 3358 te := newTest(t, e) 3359 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3360 defer te.tearDown() 3361 tc := testpb.NewTestServiceClient(te.clientConn()) 3362 3363 const ( 3364 argSize = 1 3365 respSize = 1 3366 ) 3367 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3368 stream, err := tc.FullDuplexCall(ctx) 3369 if err != nil { 3370 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3371 } 3372 3373 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3374 if err != nil { 3375 t.Fatal(err) 3376 } 3377 3378 req := &testpb.StreamingOutputCallRequest{ 3379 ResponseType: testpb.PayloadType_COMPRESSABLE, 3380 ResponseParameters: []*testpb.ResponseParameters{ 3381 {Size: respSize}, 3382 }, 3383 Payload: payload, 3384 } 3385 if err := stream.Send(req); err != nil { 3386 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3387 } 3388 if _, err := stream.Recv(); err != nil { 3389 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 3390 } 3391 if err := stream.CloseSend(); err != nil { 3392 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3393 } 3394 if _, err := stream.Recv(); err != io.EOF { 3395 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err) 3396 } 3397 3398 header, err := stream.Header() 3399 if err != nil { 3400 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) 3401 } 3402 delete(header, "user-agent") 3403 delete(header, "content-type") 3404 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3405 if !reflect.DeepEqual(header, expectedHeader) { 3406 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3407 } 3408 3409} 3410 3411func TestMultipleSetHeaderStreamingRPCError(t *testing.T) { 3412 defer leakcheck.Check(t) 3413 for _, e := range listTestEnv() { 3414 if e.name == "handler-tls" { 3415 continue 3416 } 3417 
testMultipleSetHeaderStreamingRPCError(t, e) 3418 } 3419} 3420 3421// To test header metadata is sent when sending status. 3422func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) { 3423 te := newTest(t, e) 3424 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3425 defer te.tearDown() 3426 tc := testpb.NewTestServiceClient(te.clientConn()) 3427 3428 const ( 3429 argSize = 1 3430 respSize = -1 3431 ) 3432 ctx, cancel := context.WithCancel(context.Background()) 3433 defer cancel() 3434 ctx = metadata.NewOutgoingContext(ctx, testMetadata) 3435 stream, err := tc.FullDuplexCall(ctx) 3436 if err != nil { 3437 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3438 } 3439 3440 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3441 if err != nil { 3442 t.Fatal(err) 3443 } 3444 3445 req := &testpb.StreamingOutputCallRequest{ 3446 ResponseType: testpb.PayloadType_COMPRESSABLE, 3447 ResponseParameters: []*testpb.ResponseParameters{ 3448 {Size: respSize}, 3449 }, 3450 Payload: payload, 3451 } 3452 if err := stream.Send(req); err != nil { 3453 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3454 } 3455 if _, err := stream.Recv(); err == nil { 3456 t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err) 3457 } 3458 3459 header, err := stream.Header() 3460 if err != nil { 3461 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) 3462 } 3463 delete(header, "user-agent") 3464 delete(header, "content-type") 3465 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3466 if !reflect.DeepEqual(header, expectedHeader) { 3467 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3468 } 3469 if err := stream.CloseSend(); err != nil { 3470 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3471 } 3472} 3473 3474// TestMalformedHTTP2Metedata verfies the returned error when the client 3475// sends an illegal metadata. 
3476func TestMalformedHTTP2Metadata(t *testing.T) { 3477 defer leakcheck.Check(t) 3478 for _, e := range listTestEnv() { 3479 if e.name == "handler-tls" { 3480 // Failed with "server stops accepting new RPCs". 3481 // Server stops accepting new RPCs when the client sends an illegal http2 header. 3482 continue 3483 } 3484 testMalformedHTTP2Metadata(t, e) 3485 } 3486} 3487 3488func testMalformedHTTP2Metadata(t *testing.T, e env) { 3489 te := newTest(t, e) 3490 te.startServer(&testServer{security: e.security}) 3491 defer te.tearDown() 3492 tc := testpb.NewTestServiceClient(te.clientConn()) 3493 3494 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718) 3495 if err != nil { 3496 t.Fatal(err) 3497 } 3498 3499 req := &testpb.SimpleRequest{ 3500 ResponseType: testpb.PayloadType_COMPRESSABLE, 3501 ResponseSize: 314, 3502 Payload: payload, 3503 } 3504 ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata) 3505 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Internal { 3506 t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal) 3507 } 3508} 3509 3510func TestTransparentRetry(t *testing.T) { 3511 defer leakcheck.Check(t) 3512 for _, e := range listTestEnv() { 3513 if e.name == "handler-tls" { 3514 // Fails with RST_STREAM / FLOW_CONTROL_ERROR 3515 continue 3516 } 3517 testTransparentRetry(t, e) 3518 } 3519} 3520 3521// This test makes sure RPCs are retried times when they receive a RST_STREAM 3522// with the REFUSED_STREAM error code, which the InTapHandle provokes. 
func testTransparentRetry(t *testing.T, e env) {
	te := newTest(t, e)
	// attempts counts tap-handle invocations for the current test case;
	// successAttempt is the first invocation number that is allowed through.
	// Both are captured by the tap handle and reset by the loop below.
	attempts := 0
	successAttempt := 2
	te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) {
		attempts++
		if attempts < successAttempt {
			// Reject every invocation before the configured one.
			return nil, errors.New("not now")
		}
		return ctx, nil
	}
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tsc := testpb.NewTestServiceClient(cc)
	// Each case sets which tap-handle invocation succeeds, whether the call is
	// fail-fast, and the status code expected from the call. Cases that leave
	// errCode unset compare against its zero value (presumably codes.OK).
	testCases := []struct {
		successAttempt int
		failFast       bool
		errCode        codes.Code
	}{{
		successAttempt: 1,
	}, {
		successAttempt: 2,
	}, {
		successAttempt: 3,
		errCode:        codes.Unavailable,
	}, {
		successAttempt: 1,
		failFast:       true,
	}, {
		successAttempt: 2,
		failFast:       true,
		errCode:        codes.Unavailable, // We won't retry on fail fast.
	}}
	for _, tc := range testCases {
		// Reset the shared counters before issuing this case's RPC.
		attempts = 0
		successAttempt = tc.successAttempt

		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(tc.failFast))
		cancel()
		if status.Code(err) != tc.errCode {
			t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode)
		}
	}
}

// TestCancel runs testCancel against every environment returned by
// listTestEnv.
func TestCancel(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testCancel(t, e)
	}
}

// testCancel issues a unary call whose context is canceled 1ms after it is
// created, against a server started with unaryCallSleepTime set to one second,
// and expects the call to fail with codes.Canceled.
func testCancel(t *testing.T, e env) {
	te := newTest(t, e)
	te.declareLogNoise("grpc: the client connection is closing; please retry")
	te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)

	const argSize = 2718
	const respSize = 314

	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
	if err != nil {
		t.Fatal(err)
	}

	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: respSize,
		Payload:      payload,
	}
	ctx, cancel := context.WithCancel(context.Background())
	// Schedule the cancellation so that it fires while the call is in flight.
	time.AfterFunc(1*time.Millisecond, cancel)
	if r, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Canceled {
		t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled)
	}
	awaitNewConnLogOutput()
}

// TestCancelNoIO runs testCancelNoIO against every environment returned by
// listTestEnv.
func TestCancelNoIO(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testCancelNoIO(t, e)
	}
}

// testCancelNoIO checks that canceling an RPC that has performed no I/O frees
// its stream: with te.maxStream set to 1, a later RPC is expected to succeed
// only once the first, idle RPC has been canceled.
func testCancelNoIO(t *testing.T, e env) {
	te := newTest(t, e)
	te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
	te.maxStream = 1 // Only allows 1 live stream per server transport.
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)

	// Start one blocked RPC for which we'll never send streaming
	// input. This will consume the 1 maximum concurrent streams,
	// causing future RPCs to hang.
	ctx, cancelFirst := context.WithCancel(context.Background())
	_, err := tc.StreamingInputCall(ctx)
	if err != nil {
		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
	}

	// Loop until the ClientConn receives the initial settings
	// frame from the server, notifying it about the maximum
	// concurrent streams. We know when it's received it because
	// an RPC will fail with codes.DeadlineExceeded instead of
	// succeeding.
	// TODO(bradfitz): add internal test hook for this (Issue 534)
	for {
		ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond)
		_, err := tc.StreamingInputCall(ctx)
		cancelSecond()
		if err == nil {
			// The limit has not taken effect yet; try again.
			continue
		}
		if status.Code(err) == codes.DeadlineExceeded {
			break
		}
		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
	}
	// If there are any RPCs in flight before the client receives
	// the max streams setting, let them be expired.
	// TODO(bradfitz): add internal test hook for this (Issue 534)
	time.Sleep(50 * time.Millisecond)

	// Cancel the first RPC 50ms from now, while the third call below is waiting.
	go func() {
		time.Sleep(50 * time.Millisecond)
		cancelFirst()
	}()

	// This should be blocked until the 1st is canceled, then succeed.
	ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond)
	if _, err := tc.StreamingInputCall(ctx); err != nil {
		t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
	}
	cancelThird()
}

// The following tests the gRPC streaming RPC implementations.
// TODO(zhaoq): Have better coverage on error cases.
3672var ( 3673 reqSizes = []int{27182, 8, 1828, 45904} 3674 respSizes = []int{31415, 9, 2653, 58979} 3675) 3676 3677func TestNoService(t *testing.T) { 3678 defer leakcheck.Check(t) 3679 for _, e := range listTestEnv() { 3680 testNoService(t, e) 3681 } 3682} 3683 3684func testNoService(t *testing.T, e env) { 3685 te := newTest(t, e) 3686 te.startServer(nil) 3687 defer te.tearDown() 3688 3689 cc := te.clientConn() 3690 tc := testpb.NewTestServiceClient(cc) 3691 3692 stream, err := tc.FullDuplexCall(te.ctx, grpc.FailFast(false)) 3693 if err != nil { 3694 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3695 } 3696 if _, err := stream.Recv(); status.Code(err) != codes.Unimplemented { 3697 t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented) 3698 } 3699} 3700 3701func TestPingPong(t *testing.T) { 3702 defer leakcheck.Check(t) 3703 for _, e := range listTestEnv() { 3704 testPingPong(t, e) 3705 } 3706} 3707 3708func testPingPong(t *testing.T, e env) { 3709 te := newTest(t, e) 3710 te.startServer(&testServer{security: e.security}) 3711 defer te.tearDown() 3712 tc := testpb.NewTestServiceClient(te.clientConn()) 3713 3714 stream, err := tc.FullDuplexCall(te.ctx) 3715 if err != nil { 3716 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3717 } 3718 var index int 3719 for index < len(reqSizes) { 3720 respParam := []*testpb.ResponseParameters{ 3721 { 3722 Size: int32(respSizes[index]), 3723 }, 3724 } 3725 3726 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index])) 3727 if err != nil { 3728 t.Fatal(err) 3729 } 3730 3731 req := &testpb.StreamingOutputCallRequest{ 3732 ResponseType: testpb.PayloadType_COMPRESSABLE, 3733 ResponseParameters: respParam, 3734 Payload: payload, 3735 } 3736 if err := stream.Send(req); err != nil { 3737 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3738 } 3739 reply, err := stream.Recv() 3740 if err != nil { 3741 t.Fatalf("%v.Recv() = %v, want <nil>", 
stream, err) 3742 } 3743 pt := reply.GetPayload().GetType() 3744 if pt != testpb.PayloadType_COMPRESSABLE { 3745 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) 3746 } 3747 size := len(reply.GetPayload().GetBody()) 3748 if size != int(respSizes[index]) { 3749 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) 3750 } 3751 index++ 3752 } 3753 if err := stream.CloseSend(); err != nil { 3754 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3755 } 3756 if _, err := stream.Recv(); err != io.EOF { 3757 t.Fatalf("%v failed to complele the ping pong test: %v", stream, err) 3758 } 3759} 3760 3761func TestMetadataStreamingRPC(t *testing.T) { 3762 defer leakcheck.Check(t) 3763 for _, e := range listTestEnv() { 3764 testMetadataStreamingRPC(t, e) 3765 } 3766} 3767 3768func testMetadataStreamingRPC(t *testing.T, e env) { 3769 te := newTest(t, e) 3770 te.startServer(&testServer{security: e.security}) 3771 defer te.tearDown() 3772 tc := testpb.NewTestServiceClient(te.clientConn()) 3773 3774 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata) 3775 stream, err := tc.FullDuplexCall(ctx) 3776 if err != nil { 3777 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3778 } 3779 go func() { 3780 headerMD, err := stream.Header() 3781 if e.security == "tls" { 3782 delete(headerMD, "transport_security_type") 3783 } 3784 delete(headerMD, "trailer") // ignore if present 3785 delete(headerMD, "user-agent") 3786 delete(headerMD, "content-type") 3787 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { 3788 t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata) 3789 } 3790 // test the cached value. 
3791 headerMD, err = stream.Header() 3792 delete(headerMD, "trailer") // ignore if present 3793 delete(headerMD, "user-agent") 3794 delete(headerMD, "content-type") 3795 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { 3796 t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata) 3797 } 3798 err = func() error { 3799 for index := 0; index < len(reqSizes); index++ { 3800 respParam := []*testpb.ResponseParameters{ 3801 { 3802 Size: int32(respSizes[index]), 3803 }, 3804 } 3805 3806 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index])) 3807 if err != nil { 3808 return err 3809 } 3810 3811 req := &testpb.StreamingOutputCallRequest{ 3812 ResponseType: testpb.PayloadType_COMPRESSABLE, 3813 ResponseParameters: respParam, 3814 Payload: payload, 3815 } 3816 if err := stream.Send(req); err != nil { 3817 return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3818 } 3819 } 3820 return nil 3821 }() 3822 // Tell the server we're done sending args. 
3823 stream.CloseSend() 3824 if err != nil { 3825 t.Error(err) 3826 } 3827 }() 3828 for { 3829 if _, err := stream.Recv(); err != nil { 3830 break 3831 } 3832 } 3833 trailerMD := stream.Trailer() 3834 if !reflect.DeepEqual(testTrailerMetadata, trailerMD) { 3835 t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata) 3836 } 3837} 3838 3839func TestServerStreaming(t *testing.T) { 3840 defer leakcheck.Check(t) 3841 for _, e := range listTestEnv() { 3842 testServerStreaming(t, e) 3843 } 3844} 3845 3846func testServerStreaming(t *testing.T, e env) { 3847 te := newTest(t, e) 3848 te.startServer(&testServer{security: e.security}) 3849 defer te.tearDown() 3850 tc := testpb.NewTestServiceClient(te.clientConn()) 3851 3852 respParam := make([]*testpb.ResponseParameters, len(respSizes)) 3853 for i, s := range respSizes { 3854 respParam[i] = &testpb.ResponseParameters{ 3855 Size: int32(s), 3856 } 3857 } 3858 req := &testpb.StreamingOutputCallRequest{ 3859 ResponseType: testpb.PayloadType_COMPRESSABLE, 3860 ResponseParameters: respParam, 3861 } 3862 stream, err := tc.StreamingOutputCall(context.Background(), req) 3863 if err != nil { 3864 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) 3865 } 3866 var rpcStatus error 3867 var respCnt int 3868 var index int 3869 for { 3870 reply, err := stream.Recv() 3871 if err != nil { 3872 rpcStatus = err 3873 break 3874 } 3875 pt := reply.GetPayload().GetType() 3876 if pt != testpb.PayloadType_COMPRESSABLE { 3877 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) 3878 } 3879 size := len(reply.GetPayload().GetBody()) 3880 if size != int(respSizes[index]) { 3881 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) 3882 } 3883 index++ 3884 respCnt++ 3885 } 3886 if rpcStatus != io.EOF { 3887 t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus) 3888 } 3889 if respCnt != len(respSizes) { 3890 t.Fatalf("Got %d reply, want %d", 
len(respSizes), respCnt) 3891 } 3892} 3893 3894func TestFailedServerStreaming(t *testing.T) { 3895 defer leakcheck.Check(t) 3896 for _, e := range listTestEnv() { 3897 testFailedServerStreaming(t, e) 3898 } 3899} 3900 3901func testFailedServerStreaming(t *testing.T, e env) { 3902 te := newTest(t, e) 3903 te.userAgent = failAppUA 3904 te.startServer(&testServer{security: e.security}) 3905 defer te.tearDown() 3906 tc := testpb.NewTestServiceClient(te.clientConn()) 3907 3908 respParam := make([]*testpb.ResponseParameters, len(respSizes)) 3909 for i, s := range respSizes { 3910 respParam[i] = &testpb.ResponseParameters{ 3911 Size: int32(s), 3912 } 3913 } 3914 req := &testpb.StreamingOutputCallRequest{ 3915 ResponseType: testpb.PayloadType_COMPRESSABLE, 3916 ResponseParameters: respParam, 3917 } 3918 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata) 3919 stream, err := tc.StreamingOutputCall(ctx, req) 3920 if err != nil { 3921 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) 3922 } 3923 wantErr := status.Error(codes.DataLoss, "error for testing: "+failAppUA) 3924 if _, err := stream.Recv(); !reflect.DeepEqual(err, wantErr) { 3925 t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr) 3926 } 3927} 3928 3929// concurrentSendServer is a TestServiceServer whose 3930// StreamingOutputCall makes ten serial Send calls, sending payloads 3931// "0".."9", inclusive. TestServerStreamingConcurrent verifies they 3932// were received in the correct order, and that there were no races. 3933// 3934// All other TestServiceServer methods crash if called. 
3935type concurrentSendServer struct { 3936 testpb.TestServiceServer 3937} 3938 3939func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { 3940 for i := 0; i < 10; i++ { 3941 stream.Send(&testpb.StreamingOutputCallResponse{ 3942 Payload: &testpb.Payload{ 3943 Body: []byte{'0' + uint8(i)}, 3944 }, 3945 }) 3946 } 3947 return nil 3948} 3949 3950// Tests doing a bunch of concurrent streaming output calls. 3951func TestServerStreamingConcurrent(t *testing.T) { 3952 defer leakcheck.Check(t) 3953 for _, e := range listTestEnv() { 3954 testServerStreamingConcurrent(t, e) 3955 } 3956} 3957 3958func testServerStreamingConcurrent(t *testing.T, e env) { 3959 te := newTest(t, e) 3960 te.startServer(concurrentSendServer{}) 3961 defer te.tearDown() 3962 3963 cc := te.clientConn() 3964 tc := testpb.NewTestServiceClient(cc) 3965 3966 doStreamingCall := func() { 3967 req := &testpb.StreamingOutputCallRequest{} 3968 stream, err := tc.StreamingOutputCall(context.Background(), req) 3969 if err != nil { 3970 t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) 3971 return 3972 } 3973 var ngot int 3974 var buf bytes.Buffer 3975 for { 3976 reply, err := stream.Recv() 3977 if err == io.EOF { 3978 break 3979 } 3980 if err != nil { 3981 t.Fatal(err) 3982 } 3983 ngot++ 3984 if buf.Len() > 0 { 3985 buf.WriteByte(',') 3986 } 3987 buf.Write(reply.GetPayload().GetBody()) 3988 } 3989 if want := 10; ngot != want { 3990 t.Errorf("Got %d replies, want %d", ngot, want) 3991 } 3992 if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want { 3993 t.Errorf("Got replies %q; want %q", got, want) 3994 } 3995 } 3996 3997 var wg sync.WaitGroup 3998 for i := 0; i < 20; i++ { 3999 wg.Add(1) 4000 go func() { 4001 defer wg.Done() 4002 doStreamingCall() 4003 }() 4004 } 4005 wg.Wait() 4006 4007} 4008 4009func generatePayloadSizes() [][]int { 4010 reqSizes := [][]int{ 4011 {27182, 8, 1828, 45904}, 
4012 } 4013 4014 num8KPayloads := 1024 4015 eightKPayloads := []int{} 4016 for i := 0; i < num8KPayloads; i++ { 4017 eightKPayloads = append(eightKPayloads, (1 << 13)) 4018 } 4019 reqSizes = append(reqSizes, eightKPayloads) 4020 4021 num2MPayloads := 8 4022 twoMPayloads := []int{} 4023 for i := 0; i < num2MPayloads; i++ { 4024 twoMPayloads = append(twoMPayloads, (1 << 21)) 4025 } 4026 reqSizes = append(reqSizes, twoMPayloads) 4027 4028 return reqSizes 4029} 4030 4031func TestClientStreaming(t *testing.T) { 4032 defer leakcheck.Check(t) 4033 for _, s := range generatePayloadSizes() { 4034 for _, e := range listTestEnv() { 4035 testClientStreaming(t, e, s) 4036 } 4037 } 4038} 4039 4040func testClientStreaming(t *testing.T, e env, sizes []int) { 4041 te := newTest(t, e) 4042 te.startServer(&testServer{security: e.security}) 4043 defer te.tearDown() 4044 tc := testpb.NewTestServiceClient(te.clientConn()) 4045 4046 ctx, cancel := context.WithTimeout(te.ctx, time.Second*30) 4047 defer cancel() 4048 stream, err := tc.StreamingInputCall(ctx) 4049 if err != nil { 4050 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err) 4051 } 4052 4053 var sum int 4054 for _, s := range sizes { 4055 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s)) 4056 if err != nil { 4057 t.Fatal(err) 4058 } 4059 4060 req := &testpb.StreamingInputCallRequest{ 4061 Payload: payload, 4062 } 4063 if err := stream.Send(req); err != nil { 4064 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err) 4065 } 4066 sum += s 4067 } 4068 reply, err := stream.CloseAndRecv() 4069 if err != nil { 4070 t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) 4071 } 4072 if reply.GetAggregatedPayloadSize() != int32(sum) { 4073 t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum) 4074 } 4075} 4076 4077func TestClientStreamingError(t *testing.T) { 4078 defer leakcheck.Check(t) 4079 for _, e := range listTestEnv() { 
4080 if e.name == "handler-tls" { 4081 continue 4082 } 4083 testClientStreamingError(t, e) 4084 } 4085} 4086 4087func testClientStreamingError(t *testing.T, e env) { 4088 te := newTest(t, e) 4089 te.startServer(&testServer{security: e.security, earlyFail: true}) 4090 defer te.tearDown() 4091 tc := testpb.NewTestServiceClient(te.clientConn()) 4092 4093 stream, err := tc.StreamingInputCall(te.ctx) 4094 if err != nil { 4095 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err) 4096 } 4097 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1) 4098 if err != nil { 4099 t.Fatal(err) 4100 } 4101 4102 req := &testpb.StreamingInputCallRequest{ 4103 Payload: payload, 4104 } 4105 // The 1st request should go through. 4106 if err := stream.Send(req); err != nil { 4107 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 4108 } 4109 for { 4110 if err := stream.Send(req); err != io.EOF { 4111 continue 4112 } 4113 if _, err := stream.CloseAndRecv(); status.Code(err) != codes.NotFound { 4114 t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound) 4115 } 4116 break 4117 } 4118} 4119 4120func TestExceedMaxStreamsLimit(t *testing.T) { 4121 defer leakcheck.Check(t) 4122 for _, e := range listTestEnv() { 4123 testExceedMaxStreamsLimit(t, e) 4124 } 4125} 4126 4127func testExceedMaxStreamsLimit(t *testing.T, e env) { 4128 te := newTest(t, e) 4129 te.declareLogNoise( 4130 "http2Client.notifyError got notified that the client transport was broken", 4131 "Conn.resetTransport failed to create client transport", 4132 "grpc: the connection is closing", 4133 ) 4134 te.maxStream = 1 // Only allows 1 live stream per server transport. 
4135 te.startServer(&testServer{security: e.security}) 4136 defer te.tearDown() 4137 4138 cc := te.clientConn() 4139 tc := testpb.NewTestServiceClient(cc) 4140 4141 _, err := tc.StreamingInputCall(te.ctx) 4142 if err != nil { 4143 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) 4144 } 4145 // Loop until receiving the new max stream setting from the server. 4146 for { 4147 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 4148 defer cancel() 4149 _, err := tc.StreamingInputCall(ctx) 4150 if err == nil { 4151 time.Sleep(50 * time.Millisecond) 4152 continue 4153 } 4154 if status.Code(err) == codes.DeadlineExceeded { 4155 break 4156 } 4157 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) 4158 } 4159} 4160 4161func TestStreamsQuotaRecovery(t *testing.T) { 4162 defer leakcheck.Check(t) 4163 for _, e := range listTestEnv() { 4164 testStreamsQuotaRecovery(t, e) 4165 } 4166} 4167 4168func testStreamsQuotaRecovery(t *testing.T, e env) { 4169 te := newTest(t, e) 4170 te.declareLogNoise( 4171 "http2Client.notifyError got notified that the client transport was broken", 4172 "Conn.resetTransport failed to create client transport", 4173 "grpc: the connection is closing", 4174 ) 4175 te.maxStream = 1 // Allows 1 live stream. 4176 te.startServer(&testServer{security: e.security}) 4177 defer te.tearDown() 4178 4179 cc := te.clientConn() 4180 tc := testpb.NewTestServiceClient(cc) 4181 ctx, cancel := context.WithCancel(context.Background()) 4182 defer cancel() 4183 if _, err := tc.StreamingInputCall(ctx); err != nil { 4184 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, <nil>", err) 4185 } 4186 // Loop until the new max stream setting is effective. 
4187 for { 4188 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 4189 _, err := tc.StreamingInputCall(ctx) 4190 cancel() 4191 if err == nil { 4192 time.Sleep(5 * time.Millisecond) 4193 continue 4194 } 4195 if status.Code(err) == codes.DeadlineExceeded { 4196 break 4197 } 4198 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %s", err, codes.DeadlineExceeded) 4199 } 4200 4201 var wg sync.WaitGroup 4202 for i := 0; i < 10; i++ { 4203 wg.Add(1) 4204 go func() { 4205 defer wg.Done() 4206 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314) 4207 if err != nil { 4208 t.Error(err) 4209 return 4210 } 4211 req := &testpb.SimpleRequest{ 4212 ResponseType: testpb.PayloadType_COMPRESSABLE, 4213 ResponseSize: 1592, 4214 Payload: payload, 4215 } 4216 // No rpc should go through due to the max streams limit. 4217 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 4218 defer cancel() 4219 if _, err := tc.UnaryCall(ctx, req, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded { 4220 t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 4221 } 4222 }() 4223 } 4224 wg.Wait() 4225 4226 cancel() 4227 // A new stream should be allowed after canceling the first one. 
4228 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 4229 defer cancel() 4230 if _, err := tc.StreamingInputCall(ctx); err != nil { 4231 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %v", err, nil) 4232 } 4233} 4234 4235func TestCompressServerHasNoSupport(t *testing.T) { 4236 defer leakcheck.Check(t) 4237 for _, e := range listTestEnv() { 4238 testCompressServerHasNoSupport(t, e) 4239 } 4240} 4241 4242func testCompressServerHasNoSupport(t *testing.T, e env) { 4243 te := newTest(t, e) 4244 te.serverCompression = false 4245 te.clientCompression = false 4246 te.clientNopCompression = true 4247 te.startServer(&testServer{security: e.security}) 4248 defer te.tearDown() 4249 tc := testpb.NewTestServiceClient(te.clientConn()) 4250 4251 const argSize = 271828 4252 const respSize = 314159 4253 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 4254 if err != nil { 4255 t.Fatal(err) 4256 } 4257 req := &testpb.SimpleRequest{ 4258 ResponseType: testpb.PayloadType_COMPRESSABLE, 4259 ResponseSize: respSize, 4260 Payload: payload, 4261 } 4262 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.Unimplemented { 4263 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented) 4264 } 4265 // Streaming RPC 4266 stream, err := tc.FullDuplexCall(context.Background()) 4267 if err != nil { 4268 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4269 } 4270 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Unimplemented { 4271 t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented) 4272 } 4273} 4274 4275func TestCompressOK(t *testing.T) { 4276 defer leakcheck.Check(t) 4277 for _, e := range listTestEnv() { 4278 testCompressOK(t, e) 4279 } 4280} 4281 4282func testCompressOK(t *testing.T, e env) { 4283 te := newTest(t, e) 4284 te.serverCompression = true 4285 te.clientCompression = true 4286 
te.startServer(&testServer{security: e.security}) 4287 defer te.tearDown() 4288 tc := testpb.NewTestServiceClient(te.clientConn()) 4289 4290 // Unary call 4291 const argSize = 271828 4292 const respSize = 314159 4293 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 4294 if err != nil { 4295 t.Fatal(err) 4296 } 4297 req := &testpb.SimpleRequest{ 4298 ResponseType: testpb.PayloadType_COMPRESSABLE, 4299 ResponseSize: respSize, 4300 Payload: payload, 4301 } 4302 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something")) 4303 if _, err := tc.UnaryCall(ctx, req); err != nil { 4304 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 4305 } 4306 // Streaming RPC 4307 ctx, cancel := context.WithCancel(context.Background()) 4308 defer cancel() 4309 stream, err := tc.FullDuplexCall(ctx) 4310 if err != nil { 4311 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4312 } 4313 respParam := []*testpb.ResponseParameters{ 4314 { 4315 Size: 31415, 4316 }, 4317 } 4318 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415)) 4319 if err != nil { 4320 t.Fatal(err) 4321 } 4322 sreq := &testpb.StreamingOutputCallRequest{ 4323 ResponseType: testpb.PayloadType_COMPRESSABLE, 4324 ResponseParameters: respParam, 4325 Payload: payload, 4326 } 4327 if err := stream.Send(sreq); err != nil { 4328 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 4329 } 4330 stream.CloseSend() 4331 if _, err := stream.Recv(); err != nil { 4332 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 4333 } 4334 if _, err := stream.Recv(); err != io.EOF { 4335 t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err) 4336 } 4337} 4338 4339func TestIdentityEncoding(t *testing.T) { 4340 defer leakcheck.Check(t) 4341 for _, e := range listTestEnv() { 4342 testIdentityEncoding(t, e) 4343 } 4344} 4345 4346func testIdentityEncoding(t *testing.T, e env) { 4347 te := newTest(t, e) 4348 te.startServer(&testServer{security: 
e.security}) 4349 defer te.tearDown() 4350 tc := testpb.NewTestServiceClient(te.clientConn()) 4351 4352 // Unary call 4353 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 5) 4354 if err != nil { 4355 t.Fatal(err) 4356 } 4357 req := &testpb.SimpleRequest{ 4358 ResponseType: testpb.PayloadType_COMPRESSABLE, 4359 ResponseSize: 10, 4360 Payload: payload, 4361 } 4362 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something")) 4363 if _, err := tc.UnaryCall(ctx, req); err != nil { 4364 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 4365 } 4366 // Streaming RPC 4367 ctx, cancel := context.WithCancel(context.Background()) 4368 defer cancel() 4369 stream, err := tc.FullDuplexCall(ctx, grpc.UseCompressor("identity")) 4370 if err != nil { 4371 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4372 } 4373 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415)) 4374 if err != nil { 4375 t.Fatal(err) 4376 } 4377 sreq := &testpb.StreamingOutputCallRequest{ 4378 ResponseType: testpb.PayloadType_COMPRESSABLE, 4379 ResponseParameters: []*testpb.ResponseParameters{{Size: 10}}, 4380 Payload: payload, 4381 } 4382 if err := stream.Send(sreq); err != nil { 4383 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 4384 } 4385 stream.CloseSend() 4386 if _, err := stream.Recv(); err != nil { 4387 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 4388 } 4389 if _, err := stream.Recv(); err != io.EOF { 4390 t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err) 4391 } 4392} 4393 4394func TestUnaryClientInterceptor(t *testing.T) { 4395 defer leakcheck.Check(t) 4396 for _, e := range listTestEnv() { 4397 testUnaryClientInterceptor(t, e) 4398 } 4399} 4400 4401func failOkayRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 4402 err := invoker(ctx, method, req, reply, cc, opts...) 
4403 if err == nil { 4404 return status.Error(codes.NotFound, "") 4405 } 4406 return err 4407} 4408 4409func testUnaryClientInterceptor(t *testing.T, e env) { 4410 te := newTest(t, e) 4411 te.userAgent = testAppUA 4412 te.unaryClientInt = failOkayRPC 4413 te.startServer(&testServer{security: e.security}) 4414 defer te.tearDown() 4415 4416 tc := testpb.NewTestServiceClient(te.clientConn()) 4417 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.NotFound { 4418 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound) 4419 } 4420} 4421 4422func TestStreamClientInterceptor(t *testing.T) { 4423 defer leakcheck.Check(t) 4424 for _, e := range listTestEnv() { 4425 testStreamClientInterceptor(t, e) 4426 } 4427} 4428 4429func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 4430 s, err := streamer(ctx, desc, cc, method, opts...) 
4431 if err == nil { 4432 return nil, status.Error(codes.NotFound, "") 4433 } 4434 return s, nil 4435} 4436 4437func testStreamClientInterceptor(t *testing.T, e env) { 4438 te := newTest(t, e) 4439 te.streamClientInt = failOkayStream 4440 te.startServer(&testServer{security: e.security}) 4441 defer te.tearDown() 4442 4443 tc := testpb.NewTestServiceClient(te.clientConn()) 4444 respParam := []*testpb.ResponseParameters{ 4445 { 4446 Size: int32(1), 4447 }, 4448 } 4449 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1)) 4450 if err != nil { 4451 t.Fatal(err) 4452 } 4453 req := &testpb.StreamingOutputCallRequest{ 4454 ResponseType: testpb.PayloadType_COMPRESSABLE, 4455 ResponseParameters: respParam, 4456 Payload: payload, 4457 } 4458 if _, err := tc.StreamingOutputCall(context.Background(), req); status.Code(err) != codes.NotFound { 4459 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound) 4460 } 4461} 4462 4463func TestUnaryServerInterceptor(t *testing.T) { 4464 defer leakcheck.Check(t) 4465 for _, e := range listTestEnv() { 4466 testUnaryServerInterceptor(t, e) 4467 } 4468} 4469 4470func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { 4471 return nil, status.Error(codes.PermissionDenied, "") 4472} 4473 4474func testUnaryServerInterceptor(t *testing.T, e env) { 4475 te := newTest(t, e) 4476 te.unaryServerInt = errInjector 4477 te.startServer(&testServer{security: e.security}) 4478 defer te.tearDown() 4479 4480 tc := testpb.NewTestServiceClient(te.clientConn()) 4481 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.PermissionDenied { 4482 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) 4483 } 4484} 4485 4486func TestStreamServerInterceptor(t *testing.T) { 4487 defer leakcheck.Check(t) 4488 for _, e := range listTestEnv() { 4489 // 
TODO(bradfitz): Temporarily skip this env due to #619. 4490 if e.name == "handler-tls" { 4491 continue 4492 } 4493 testStreamServerInterceptor(t, e) 4494 } 4495} 4496 4497func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 4498 if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" { 4499 return handler(srv, ss) 4500 } 4501 // Reject the other methods. 4502 return status.Error(codes.PermissionDenied, "") 4503} 4504 4505func testStreamServerInterceptor(t *testing.T, e env) { 4506 te := newTest(t, e) 4507 te.streamServerInt = fullDuplexOnly 4508 te.startServer(&testServer{security: e.security}) 4509 defer te.tearDown() 4510 4511 tc := testpb.NewTestServiceClient(te.clientConn()) 4512 respParam := []*testpb.ResponseParameters{ 4513 { 4514 Size: int32(1), 4515 }, 4516 } 4517 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1)) 4518 if err != nil { 4519 t.Fatal(err) 4520 } 4521 req := &testpb.StreamingOutputCallRequest{ 4522 ResponseType: testpb.PayloadType_COMPRESSABLE, 4523 ResponseParameters: respParam, 4524 Payload: payload, 4525 } 4526 s1, err := tc.StreamingOutputCall(context.Background(), req) 4527 if err != nil { 4528 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err) 4529 } 4530 if _, err := s1.Recv(); status.Code(err) != codes.PermissionDenied { 4531 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) 4532 } 4533 s2, err := tc.FullDuplexCall(context.Background()) 4534 if err != nil { 4535 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4536 } 4537 if err := s2.Send(req); err != nil { 4538 t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err) 4539 } 4540 if _, err := s2.Recv(); err != nil { 4541 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err) 4542 } 4543} 4544 4545// funcServer implements methods of TestServiceServer using funcs, 4546// similar to an http.HandlerFunc. 
4547// Any unimplemented method will crash. Tests implement the method(s) 4548// they need. 4549type funcServer struct { 4550 testpb.TestServiceServer 4551 unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) 4552 streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error 4553 fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error 4554} 4555 4556func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4557 return s.unaryCall(ctx, in) 4558} 4559 4560func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error { 4561 return s.streamingInputCall(stream) 4562} 4563 4564func (s *funcServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 4565 return s.fullDuplexCall(stream) 4566} 4567 4568func TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) { 4569 defer leakcheck.Check(t) 4570 for _, e := range listTestEnv() { 4571 testClientRequestBodyErrorUnexpectedEOF(t, e) 4572 } 4573} 4574 4575func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) { 4576 te := newTest(t, e) 4577 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4578 errUnexpectedCall := errors.New("unexpected call func server method") 4579 t.Error(errUnexpectedCall) 4580 return nil, errUnexpectedCall 4581 }} 4582 te.startServer(ts) 4583 defer te.tearDown() 4584 te.withServerTester(func(st *serverTester) { 4585 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") 4586 // Say we have 5 bytes coming, but set END_STREAM flag: 4587 st.writeData(1, true, []byte{0, 0, 0, 0, 5}) 4588 st.wantAnyFrame() // wait for server to crash (it used to crash) 4589 }) 4590} 4591 4592func TestClientRequestBodyErrorCloseAfterLength(t *testing.T) { 4593 defer leakcheck.Check(t) 4594 for _, e := range listTestEnv() { 4595 
testClientRequestBodyErrorCloseAfterLength(t, e) 4596 } 4597} 4598 4599func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) { 4600 te := newTest(t, e) 4601 te.declareLogNoise("Server.processUnaryRPC failed to write status") 4602 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4603 errUnexpectedCall := errors.New("unexpected call func server method") 4604 t.Error(errUnexpectedCall) 4605 return nil, errUnexpectedCall 4606 }} 4607 te.startServer(ts) 4608 defer te.tearDown() 4609 te.withServerTester(func(st *serverTester) { 4610 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") 4611 // say we're sending 5 bytes, but then close the connection instead. 4612 st.writeData(1, false, []byte{0, 0, 0, 0, 5}) 4613 st.cc.Close() 4614 }) 4615} 4616 4617func TestClientRequestBodyErrorCancel(t *testing.T) { 4618 defer leakcheck.Check(t) 4619 for _, e := range listTestEnv() { 4620 testClientRequestBodyErrorCancel(t, e) 4621 } 4622} 4623 4624func testClientRequestBodyErrorCancel(t *testing.T, e env) { 4625 te := newTest(t, e) 4626 gotCall := make(chan bool, 1) 4627 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4628 gotCall <- true 4629 return new(testpb.SimpleResponse), nil 4630 }} 4631 te.startServer(ts) 4632 defer te.tearDown() 4633 te.withServerTester(func(st *serverTester) { 4634 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") 4635 // Say we have 5 bytes coming, but cancel it instead. 4636 st.writeRSTStream(1, http2.ErrCodeCancel) 4637 st.writeData(1, false, []byte{0, 0, 0, 0, 5}) 4638 4639 // Verify we didn't a call yet. 4640 select { 4641 case <-gotCall: 4642 t.Fatal("unexpected call") 4643 default: 4644 } 4645 4646 // And now send an uncanceled (but still invalid), just to get a response. 
4647 st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall") 4648 st.writeData(3, true, []byte{0, 0, 0, 0, 0}) 4649 <-gotCall 4650 st.wantAnyFrame() 4651 }) 4652} 4653 4654func TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) { 4655 defer leakcheck.Check(t) 4656 for _, e := range listTestEnv() { 4657 testClientRequestBodyErrorCancelStreamingInput(t, e) 4658 } 4659} 4660 4661func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) { 4662 te := newTest(t, e) 4663 recvErr := make(chan error, 1) 4664 ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { 4665 _, err := stream.Recv() 4666 recvErr <- err 4667 return nil 4668 }} 4669 te.startServer(ts) 4670 defer te.tearDown() 4671 te.withServerTester(func(st *serverTester) { 4672 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall") 4673 // Say we have 5 bytes coming, but cancel it instead. 4674 st.writeData(1, false, []byte{0, 0, 0, 0, 5}) 4675 st.writeRSTStream(1, http2.ErrCodeCancel) 4676 4677 var got error 4678 select { 4679 case got = <-recvErr: 4680 case <-time.After(3 * time.Second): 4681 t.Fatal("timeout waiting for error") 4682 } 4683 if grpc.Code(got) != codes.Canceled { 4684 t.Errorf("error = %#v; want error code %s", got, codes.Canceled) 4685 } 4686 }) 4687} 4688 4689func TestClientResourceExhaustedCancelFullDuplex(t *testing.T) { 4690 defer leakcheck.Check(t) 4691 for _, e := range listTestEnv() { 4692 if e.httpHandler { 4693 // httpHandler write won't be blocked on flow control window. 
4694 continue 4695 } 4696 testClientResourceExhaustedCancelFullDuplex(t, e) 4697 } 4698} 4699 4700func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) { 4701 te := newTest(t, e) 4702 recvErr := make(chan error, 1) 4703 ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 4704 defer close(recvErr) 4705 _, err := stream.Recv() 4706 if err != nil { 4707 return status.Errorf(codes.Internal, "stream.Recv() got error: %v, want <nil>", err) 4708 } 4709 // create a payload that's larger than the default flow control window. 4710 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10) 4711 if err != nil { 4712 return err 4713 } 4714 resp := &testpb.StreamingOutputCallResponse{ 4715 Payload: payload, 4716 } 4717 ce := make(chan error) 4718 go func() { 4719 var err error 4720 for { 4721 if err = stream.Send(resp); err != nil { 4722 break 4723 } 4724 } 4725 ce <- err 4726 }() 4727 select { 4728 case err = <-ce: 4729 case <-time.After(10 * time.Second): 4730 err = errors.New("10s timeout reached") 4731 } 4732 recvErr <- err 4733 return err 4734 }} 4735 te.startServer(ts) 4736 defer te.tearDown() 4737 // set a low limit on receive message size to error with Resource Exhausted on 4738 // client side when server send a large message. 
4739 te.maxClientReceiveMsgSize = newInt(10) 4740 cc := te.clientConn() 4741 tc := testpb.NewTestServiceClient(cc) 4742 stream, err := tc.FullDuplexCall(context.Background()) 4743 if err != nil { 4744 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4745 } 4746 req := &testpb.StreamingOutputCallRequest{} 4747 if err := stream.Send(req); err != nil { 4748 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 4749 } 4750 if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted { 4751 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 4752 } 4753 err = <-recvErr 4754 if status.Code(err) != codes.Canceled { 4755 t.Fatalf("server got error %v, want error code: %s", err, codes.Canceled) 4756 } 4757} 4758 4759type clientTimeoutCreds struct { 4760 timeoutReturned bool 4761} 4762 4763func (c *clientTimeoutCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4764 if !c.timeoutReturned { 4765 c.timeoutReturned = true 4766 return nil, nil, context.DeadlineExceeded 4767 } 4768 return rawConn, nil, nil 4769} 4770func (c *clientTimeoutCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4771 return rawConn, nil, nil 4772} 4773func (c *clientTimeoutCreds) Info() credentials.ProtocolInfo { 4774 return credentials.ProtocolInfo{} 4775} 4776func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials { 4777 return nil 4778} 4779func (c *clientTimeoutCreds) OverrideServerName(s string) error { 4780 return nil 4781} 4782 4783func TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) { 4784 te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds", balancer: "v1"}) 4785 te.userAgent = testAppUA 4786 te.startServer(&testServer{security: te.e.security}) 4787 defer te.tearDown() 4788 4789 cc := te.clientConn() 4790 tc := testpb.NewTestServiceClient(cc) 4791 // This unary call should 
succeed, because ClientHandshake will succeed for the second time. 4792 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { 4793 te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err) 4794 } 4795} 4796 4797type serverDispatchCred struct { 4798 rawConnCh chan net.Conn 4799} 4800 4801func newServerDispatchCred() *serverDispatchCred { 4802 return &serverDispatchCred{ 4803 rawConnCh: make(chan net.Conn, 1), 4804 } 4805} 4806func (c *serverDispatchCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4807 return rawConn, nil, nil 4808} 4809func (c *serverDispatchCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4810 select { 4811 case c.rawConnCh <- rawConn: 4812 default: 4813 } 4814 return nil, nil, credentials.ErrConnDispatched 4815} 4816func (c *serverDispatchCred) Info() credentials.ProtocolInfo { 4817 return credentials.ProtocolInfo{} 4818} 4819func (c *serverDispatchCred) Clone() credentials.TransportCredentials { 4820 return nil 4821} 4822func (c *serverDispatchCred) OverrideServerName(s string) error { 4823 return nil 4824} 4825func (c *serverDispatchCred) getRawConn() net.Conn { 4826 return <-c.rawConnCh 4827} 4828 4829func TestServerCredsDispatch(t *testing.T) { 4830 lis, err := net.Listen("tcp", "localhost:0") 4831 if err != nil { 4832 t.Fatalf("Failed to listen: %v", err) 4833 } 4834 cred := newServerDispatchCred() 4835 s := grpc.NewServer(grpc.Creds(cred)) 4836 go s.Serve(lis) 4837 defer s.Stop() 4838 4839 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(cred)) 4840 if err != nil { 4841 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) 4842 } 4843 defer cc.Close() 4844 4845 rawConn := cred.getRawConn() 4846 // Give grpc a chance to see the error and potentially close the connection. 4847 // And check that connection is not closed after that. 
4848 time.Sleep(100 * time.Millisecond) 4849 // Check rawConn is not closed. 4850 if n, err := rawConn.Write([]byte{0}); n <= 0 || err != nil { 4851 t.Errorf("Read() = %v, %v; want n>0, <nil>", n, err) 4852 } 4853} 4854 4855type authorityCheckCreds struct { 4856 got string 4857} 4858 4859func (c *authorityCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4860 return rawConn, nil, nil 4861} 4862func (c *authorityCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4863 c.got = authority 4864 return rawConn, nil, nil 4865} 4866func (c *authorityCheckCreds) Info() credentials.ProtocolInfo { 4867 return credentials.ProtocolInfo{} 4868} 4869func (c *authorityCheckCreds) Clone() credentials.TransportCredentials { 4870 return c 4871} 4872func (c *authorityCheckCreds) OverrideServerName(s string) error { 4873 return nil 4874} 4875 4876// This test makes sure that the authority client handshake gets is the endpoint 4877// in dial target, not the resolved ip address. 
4878func TestCredsHandshakeAuthority(t *testing.T) { 4879 const testAuthority = "test.auth.ori.ty" 4880 4881 lis, err := net.Listen("tcp", "localhost:0") 4882 if err != nil { 4883 t.Fatalf("Failed to listen: %v", err) 4884 } 4885 cred := &authorityCheckCreds{} 4886 s := grpc.NewServer() 4887 go s.Serve(lis) 4888 defer s.Stop() 4889 4890 r, rcleanup := manual.GenerateAndRegisterManualResolver() 4891 defer rcleanup() 4892 4893 cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred)) 4894 if err != nil { 4895 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) 4896 } 4897 defer cc.Close() 4898 r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}}) 4899 4900 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) 4901 defer cancel() 4902 for { 4903 s := cc.GetState() 4904 if s == connectivity.Ready { 4905 break 4906 } 4907 if !cc.WaitForStateChange(ctx, s) { 4908 // ctx got timeout or canceled. 4909 t.Fatalf("ClientConn is not ready after 100 ms") 4910 } 4911 } 4912 4913 if cred.got != testAuthority { 4914 t.Fatalf("client creds got authority: %q, want: %q", cred.got, testAuthority) 4915 } 4916} 4917 4918type clientFailCreds struct{} 4919 4920func (c *clientFailCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4921 return rawConn, nil, nil 4922} 4923func (c *clientFailCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4924 return nil, nil, fmt.Errorf("client handshake fails with fatal error") 4925} 4926func (c *clientFailCreds) Info() credentials.ProtocolInfo { 4927 return credentials.ProtocolInfo{} 4928} 4929func (c *clientFailCreds) Clone() credentials.TransportCredentials { 4930 return c 4931} 4932func (c *clientFailCreds) OverrideServerName(s string) error { 4933 return nil 4934} 4935 4936// This test makes sure that failfast RPCs fail if client handshake fails with 4937// fatal errors. 
4938func TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) { 4939 lis, err := net.Listen("tcp", "localhost:0") 4940 if err != nil { 4941 t.Fatalf("Failed to listen: %v", err) 4942 } 4943 defer lis.Close() 4944 4945 cc, err := grpc.Dial("passthrough:///"+lis.Addr().String(), grpc.WithTransportCredentials(&clientFailCreds{})) 4946 if err != nil { 4947 t.Fatalf("grpc.Dial(_) = %v", err) 4948 } 4949 defer cc.Close() 4950 4951 tc := testpb.NewTestServiceClient(cc) 4952 // This unary call should fail, but not timeout. 4953 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 4954 defer cancel() 4955 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(true)); status.Code(err) != codes.Unavailable { 4956 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err) 4957 } 4958} 4959 4960func TestFlowControlLogicalRace(t *testing.T) { 4961 // Test for a regression of https://github.com/grpc/grpc-go/issues/632, 4962 // and other flow control bugs. 4963 4964 defer leakcheck.Check(t) 4965 4966 const ( 4967 itemCount = 100 4968 itemSize = 1 << 10 4969 recvCount = 2 4970 maxFailures = 3 4971 4972 requestTimeout = time.Second * 5 4973 ) 4974 4975 requestCount := 10000 4976 if raceMode { 4977 requestCount = 1000 4978 } 4979 4980 lis, err := net.Listen("tcp", "localhost:0") 4981 if err != nil { 4982 t.Fatalf("Failed to listen: %v", err) 4983 } 4984 defer lis.Close() 4985 4986 s := grpc.NewServer() 4987 testpb.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{ 4988 itemCount: itemCount, 4989 itemSize: itemSize, 4990 }) 4991 defer s.Stop() 4992 4993 go s.Serve(lis) 4994 4995 ctx := context.Background() 4996 4997 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock()) 4998 if err != nil { 4999 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) 5000 } 5001 defer cc.Close() 5002 cl := testpb.NewTestServiceClient(cc) 5003 5004 failures := 0 5005 for i := 0; i < requestCount; i++ { 5006 ctx, cancel := 
context.WithTimeout(ctx, requestTimeout) 5007 output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{}) 5008 if err != nil { 5009 t.Fatalf("StreamingOutputCall; err = %q", err) 5010 } 5011 5012 j := 0 5013 loop: 5014 for ; j < recvCount; j++ { 5015 _, err := output.Recv() 5016 if err != nil { 5017 if err == io.EOF { 5018 break loop 5019 } 5020 switch status.Code(err) { 5021 case codes.DeadlineExceeded: 5022 break loop 5023 default: 5024 t.Fatalf("Recv; err = %q", err) 5025 } 5026 } 5027 } 5028 cancel() 5029 <-ctx.Done() 5030 5031 if j < recvCount { 5032 t.Errorf("got %d responses to request %d", j, i) 5033 failures++ 5034 if failures >= maxFailures { 5035 // Continue past the first failure to see if the connection is 5036 // entirely broken, or if only a single RPC was affected 5037 break 5038 } 5039 } 5040 } 5041} 5042 5043type flowControlLogicalRaceServer struct { 5044 testpb.TestServiceServer 5045 5046 itemSize int 5047 itemCount int 5048} 5049 5050func (s *flowControlLogicalRaceServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testpb.TestService_StreamingOutputCallServer) error { 5051 for i := 0; i < s.itemCount; i++ { 5052 err := srv.Send(&testpb.StreamingOutputCallResponse{ 5053 Payload: &testpb.Payload{ 5054 // Sending a large stream of data which the client reject 5055 // helps to trigger some types of flow control bugs. 5056 // 5057 // Reallocating memory here is inefficient, but the stress it 5058 // puts on the GC leads to more frequent flow control 5059 // failures. The GC likely causes more variety in the 5060 // goroutine scheduling orders. 
5061 Body: bytes.Repeat([]byte("a"), s.itemSize), 5062 }, 5063 }) 5064 if err != nil { 5065 return err 5066 } 5067 } 5068 return nil 5069} 5070 5071type lockingWriter struct { 5072 mu sync.Mutex 5073 w io.Writer 5074} 5075 5076func (lw *lockingWriter) Write(p []byte) (n int, err error) { 5077 lw.mu.Lock() 5078 defer lw.mu.Unlock() 5079 return lw.w.Write(p) 5080} 5081 5082func (lw *lockingWriter) setWriter(w io.Writer) { 5083 lw.mu.Lock() 5084 defer lw.mu.Unlock() 5085 lw.w = w 5086} 5087 5088var testLogOutput = &lockingWriter{w: os.Stderr} 5089 5090// awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to 5091// terminate, if they're still running. It spams logs with this 5092// message. We wait for it so our log filter is still 5093// active. Otherwise the "defer restore()" at the top of various test 5094// functions restores our log filter and then the goroutine spams. 5095func awaitNewConnLogOutput() { 5096 awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry") 5097} 5098 5099func awaitLogOutput(maxWait time.Duration, phrase string) { 5100 pb := []byte(phrase) 5101 5102 timer := time.NewTimer(maxWait) 5103 defer timer.Stop() 5104 wakeup := make(chan bool, 1) 5105 for { 5106 if logOutputHasContents(pb, wakeup) { 5107 return 5108 } 5109 select { 5110 case <-timer.C: 5111 // Too slow. Oh well. 
5112 return 5113 case <-wakeup: 5114 } 5115 } 5116} 5117 5118func logOutputHasContents(v []byte, wakeup chan<- bool) bool { 5119 testLogOutput.mu.Lock() 5120 defer testLogOutput.mu.Unlock() 5121 fw, ok := testLogOutput.w.(*filterWriter) 5122 if !ok { 5123 return false 5124 } 5125 fw.mu.Lock() 5126 defer fw.mu.Unlock() 5127 if bytes.Contains(fw.buf.Bytes(), v) { 5128 return true 5129 } 5130 fw.wakeup = wakeup 5131 return false 5132} 5133 5134var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering") 5135 5136func noop() {} 5137 5138// declareLogNoise declares that t is expected to emit the following noisy phrases, 5139// even on success. Those phrases will be filtered from grpclog output 5140// and only be shown if *verbose_logs or t ends up failing. 5141// The returned restore function should be called with defer to be run 5142// before the test ends. 5143func declareLogNoise(t *testing.T, phrases ...string) (restore func()) { 5144 if *verboseLogs { 5145 return noop 5146 } 5147 fw := &filterWriter{dst: os.Stderr, filter: phrases} 5148 testLogOutput.setWriter(fw) 5149 return func() { 5150 if t.Failed() { 5151 fw.mu.Lock() 5152 defer fw.mu.Unlock() 5153 if fw.buf.Len() > 0 { 5154 t.Logf("Complete log output:\n%s", fw.buf.Bytes()) 5155 } 5156 } 5157 testLogOutput.setWriter(os.Stderr) 5158 } 5159} 5160 5161type filterWriter struct { 5162 dst io.Writer 5163 filter []string 5164 5165 mu sync.Mutex 5166 buf bytes.Buffer 5167 wakeup chan<- bool // if non-nil, gets true on write 5168} 5169 5170func (fw *filterWriter) Write(p []byte) (n int, err error) { 5171 fw.mu.Lock() 5172 fw.buf.Write(p) 5173 if fw.wakeup != nil { 5174 select { 5175 case fw.wakeup <- true: 5176 default: 5177 } 5178 } 5179 fw.mu.Unlock() 5180 5181 ps := string(p) 5182 for _, f := range fw.filter { 5183 if strings.Contains(ps, f) { 5184 return len(p), nil 5185 } 5186 } 5187 return fw.dst.Write(p) 5188} 5189 5190// stubServer is a server that is easy to customize 
within individual test 5191// cases. 5192type stubServer struct { 5193 // Guarantees we satisfy this interface; panics if unimplemented methods are called. 5194 testpb.TestServiceServer 5195 5196 // Customizable implementations of server handlers. 5197 emptyCall func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) 5198 fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error 5199 5200 // A client connected to this service the test may use. Created in Start(). 5201 client testpb.TestServiceClient 5202 cc *grpc.ClientConn 5203 5204 cleanups []func() // Lambdas executed in Stop(); populated by Start(). 5205 5206 r *manual.Resolver 5207} 5208 5209func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5210 return ss.emptyCall(ctx, in) 5211} 5212 5213func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 5214 return ss.fullDuplexCall(stream) 5215} 5216 5217// Start starts the server and creates a client connected to it. 5218func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { 5219 r, cleanup := manual.GenerateAndRegisterManualResolver() 5220 ss.r = r 5221 ss.cleanups = append(ss.cleanups, cleanup) 5222 5223 lis, err := net.Listen("tcp", "localhost:0") 5224 if err != nil { 5225 return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err) 5226 } 5227 ss.cleanups = append(ss.cleanups, func() { lis.Close() }) 5228 5229 s := grpc.NewServer(sopts...) 5230 testpb.RegisterTestServiceServer(s, ss) 5231 go s.Serve(lis) 5232 ss.cleanups = append(ss.cleanups, s.Stop) 5233 5234 target := ss.r.Scheme() + ":///" + lis.Addr().String() 5235 5236 opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...) 5237 cc, err := grpc.Dial(target, opts...) 
5238 if err != nil { 5239 return fmt.Errorf("grpc.Dial(%q) = %v", target, err) 5240 } 5241 ss.cc = cc 5242 ss.r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}}) 5243 if err := ss.waitForReady(cc); err != nil { 5244 return err 5245 } 5246 5247 ss.cleanups = append(ss.cleanups, func() { cc.Close() }) 5248 5249 ss.client = testpb.NewTestServiceClient(cc) 5250 return nil 5251} 5252 5253func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error { 5254 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 5255 defer cancel() 5256 for { 5257 s := cc.GetState() 5258 if s == connectivity.Ready { 5259 return nil 5260 } 5261 if !cc.WaitForStateChange(ctx, s) { 5262 // ctx got timeout or canceled. 5263 return ctx.Err() 5264 } 5265 } 5266} 5267 5268func (ss *stubServer) Stop() { 5269 for i := len(ss.cleanups) - 1; i >= 0; i-- { 5270 ss.cleanups[i]() 5271 } 5272} 5273 5274func TestGRPCMethod(t *testing.T) { 5275 defer leakcheck.Check(t) 5276 var method string 5277 var ok bool 5278 5279 ss := &stubServer{ 5280 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5281 method, ok = grpc.Method(ctx) 5282 return &testpb.Empty{}, nil 5283 }, 5284 } 5285 if err := ss.Start(nil); err != nil { 5286 t.Fatalf("Error starting endpoint server: %v", err) 5287 } 5288 defer ss.Stop() 5289 5290 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5291 defer cancel() 5292 5293 if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { 5294 t.Fatalf("ss.client.EmptyCall(_, _) = _, %v; want _, nil", err) 5295 } 5296 5297 if want := "/grpc.testing.TestService/EmptyCall"; !ok || method != want { 5298 t.Fatalf("grpc.Method(_) = %q, %v; want %q, true", method, ok, want) 5299 } 5300} 5301 5302func TestUnaryProxyDoesNotForwardMetadata(t *testing.T) { 5303 const mdkey = "somedata" 5304 5305 // endpoint ensures mdkey is NOT in metadata and returns an error if it is. 
5306 endpoint := &stubServer{ 5307 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5308 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil { 5309 return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) 5310 } 5311 return &testpb.Empty{}, nil 5312 }, 5313 } 5314 if err := endpoint.Start(nil); err != nil { 5315 t.Fatalf("Error starting endpoint server: %v", err) 5316 } 5317 defer endpoint.Stop() 5318 5319 // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint 5320 // without explicitly copying the metadata. 5321 proxy := &stubServer{ 5322 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5323 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil { 5324 return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey) 5325 } 5326 return endpoint.client.EmptyCall(ctx, in) 5327 }, 5328 } 5329 if err := proxy.Start(nil); err != nil { 5330 t.Fatalf("Error starting proxy server: %v", err) 5331 } 5332 defer proxy.Stop() 5333 5334 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5335 defer cancel() 5336 md := metadata.Pairs(mdkey, "val") 5337 ctx = metadata.NewOutgoingContext(ctx, md) 5338 5339 // Sanity check that endpoint properly errors when it sees mdkey. 5340 _, err := endpoint.client.EmptyCall(ctx, &testpb.Empty{}) 5341 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal { 5342 t.Fatalf("endpoint.client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err) 5343 } 5344 5345 if _, err := proxy.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { 5346 t.Fatal(err.Error()) 5347 } 5348} 5349 5350func TestStreamingProxyDoesNotForwardMetadata(t *testing.T) { 5351 const mdkey = "somedata" 5352 5353 // doFDC performs a FullDuplexCall with client and returns the error from the 5354 // first stream.Recv call, or nil if that error is io.EOF. 
Calls t.Fatal if 5355 // the stream cannot be established. 5356 doFDC := func(ctx context.Context, client testpb.TestServiceClient) error { 5357 stream, err := client.FullDuplexCall(ctx) 5358 if err != nil { 5359 t.Fatalf("Unwanted error: %v", err) 5360 } 5361 if _, err := stream.Recv(); err != io.EOF { 5362 return err 5363 } 5364 return nil 5365 } 5366 5367 // endpoint ensures mdkey is NOT in metadata and returns an error if it is. 5368 endpoint := &stubServer{ 5369 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 5370 ctx := stream.Context() 5371 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil { 5372 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) 5373 } 5374 return nil 5375 }, 5376 } 5377 if err := endpoint.Start(nil); err != nil { 5378 t.Fatalf("Error starting endpoint server: %v", err) 5379 } 5380 defer endpoint.Stop() 5381 5382 // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint 5383 // without explicitly copying the metadata. 5384 proxy := &stubServer{ 5385 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 5386 ctx := stream.Context() 5387 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil { 5388 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) 5389 } 5390 return doFDC(ctx, endpoint.client) 5391 }, 5392 } 5393 if err := proxy.Start(nil); err != nil { 5394 t.Fatalf("Error starting proxy server: %v", err) 5395 } 5396 defer proxy.Stop() 5397 5398 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5399 defer cancel() 5400 md := metadata.Pairs(mdkey, "val") 5401 ctx = metadata.NewOutgoingContext(ctx, md) 5402 5403 // Sanity check that endpoint properly errors when it sees mdkey in ctx. 
5404 err := doFDC(ctx, endpoint.client) 5405 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal { 5406 t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err) 5407 } 5408 5409 if err := doFDC(ctx, proxy.client); err != nil { 5410 t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err) 5411 } 5412} 5413 5414func TestStatsTagsAndTrace(t *testing.T) { 5415 // Data added to context by client (typically in a stats handler). 5416 tags := []byte{1, 5, 2, 4, 3} 5417 trace := []byte{5, 2, 1, 3, 4} 5418 5419 // endpoint ensures Tags() and Trace() in context match those that were added 5420 // by the client and returns an error if not. 5421 endpoint := &stubServer{ 5422 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5423 md, _ := metadata.FromIncomingContext(ctx) 5424 if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) { 5425 return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags) 5426 } 5427 if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) { 5428 return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags) 5429 } 5430 if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) { 5431 return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace) 5432 } 5433 if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) { 5434 return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace) 5435 } 5436 return &testpb.Empty{}, nil 5437 }, 5438 } 5439 if err := endpoint.Start(nil); err != nil { 5440 t.Fatalf("Error starting endpoint server: %v", err) 5441 } 5442 defer endpoint.Stop() 5443 5444 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5445 defer cancel() 5446 5447 testCases := []struct { 5448 ctx context.Context 5449 want codes.Code 5450 }{ 5451 {ctx: ctx, want: codes.Internal}, 5452 {ctx: stats.SetTags(ctx, tags), 
want: codes.Internal}, 5453 {ctx: stats.SetTrace(ctx, trace), want: codes.Internal}, 5454 {ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal}, 5455 {ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK}, 5456 } 5457 5458 for _, tc := range testCases { 5459 _, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{}) 5460 if tc.want == codes.OK && err != nil { 5461 t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err) 5462 } 5463 if s, ok := status.FromError(err); !ok || s.Code() != tc.want { 5464 t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want) 5465 } 5466 } 5467} 5468 5469func TestTapTimeout(t *testing.T) { 5470 sopts := []grpc.ServerOption{ 5471 grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) { 5472 c, cancel := context.WithCancel(ctx) 5473 // Call cancel instead of setting a deadline so we can detect which error 5474 // occurred -- this cancellation (desired) or the client's deadline 5475 // expired (indicating this cancellation did not affect the RPC). 5476 time.AfterFunc(10*time.Millisecond, cancel) 5477 return c, nil 5478 }), 5479 } 5480 5481 ss := &stubServer{ 5482 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5483 <-ctx.Done() 5484 return nil, status.Errorf(codes.Canceled, ctx.Err().Error()) 5485 }, 5486 } 5487 if err := ss.Start(sopts); err != nil { 5488 t.Fatalf("Error starting endpoint server: %v", err) 5489 } 5490 defer ss.Stop() 5491 5492 // This was known to be flaky; test several times. 5493 for i := 0; i < 10; i++ { 5494 // Set our own deadline in case the server hangs. 
5495 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5496 res, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) 5497 cancel() 5498 if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled { 5499 t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, <status with Code()=Canceled>", res, err) 5500 } 5501 } 5502 5503} 5504 5505func TestClientWriteFailsAfterServerClosesStream(t *testing.T) { 5506 ss := &stubServer{ 5507 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 5508 return status.Errorf(codes.Internal, "") 5509 }, 5510 } 5511 sopts := []grpc.ServerOption{} 5512 if err := ss.Start(sopts); err != nil { 5513 t.Fatalf("Error starting endpoint server: %v", err) 5514 } 5515 defer ss.Stop() 5516 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 5517 defer cancel() 5518 stream, err := ss.client.FullDuplexCall(ctx) 5519 if err != nil { 5520 t.Fatalf("Error while creating stream: %v", err) 5521 } 5522 for { 5523 if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err == nil { 5524 time.Sleep(5 * time.Millisecond) 5525 } else if err == io.EOF { 5526 break // Success. 
5527 } else { 5528 t.Fatalf("stream.Send(_) = %v, want io.EOF", err) 5529 } 5530 } 5531} 5532 5533type windowSizeConfig struct { 5534 serverStream int32 5535 serverConn int32 5536 clientStream int32 5537 clientConn int32 5538} 5539 5540func max(a, b int32) int32 { 5541 if a > b { 5542 return a 5543 } 5544 return b 5545} 5546 5547func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) { 5548 defer leakcheck.Check(t) 5549 wc := windowSizeConfig{ 5550 serverStream: 8 * 1024 * 1024, 5551 serverConn: 12 * 1024 * 1024, 5552 clientStream: 6 * 1024 * 1024, 5553 clientConn: 8 * 1024 * 1024, 5554 } 5555 for _, e := range listTestEnv() { 5556 testConfigurableWindowSize(t, e, wc) 5557 } 5558} 5559 5560func TestConfigurableWindowSizeWithSmallWindow(t *testing.T) { 5561 defer leakcheck.Check(t) 5562 wc := windowSizeConfig{ 5563 serverStream: 1, 5564 serverConn: 1, 5565 clientStream: 1, 5566 clientConn: 1, 5567 } 5568 for _, e := range listTestEnv() { 5569 testConfigurableWindowSize(t, e, wc) 5570 } 5571} 5572 5573func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) { 5574 te := newTest(t, e) 5575 te.serverInitialWindowSize = wc.serverStream 5576 te.serverInitialConnWindowSize = wc.serverConn 5577 te.clientInitialWindowSize = wc.clientStream 5578 te.clientInitialConnWindowSize = wc.clientConn 5579 5580 te.startServer(&testServer{security: e.security}) 5581 defer te.tearDown() 5582 5583 cc := te.clientConn() 5584 tc := testpb.NewTestServiceClient(cc) 5585 stream, err := tc.FullDuplexCall(context.Background()) 5586 if err != nil { 5587 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5588 } 5589 numOfIter := 11 5590 // Set message size to exhaust largest of window sizes. 
5591 messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1) 5592 messageSize = max(messageSize, 64*1024) 5593 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize) 5594 if err != nil { 5595 t.Fatal(err) 5596 } 5597 respParams := []*testpb.ResponseParameters{ 5598 { 5599 Size: messageSize, 5600 }, 5601 } 5602 req := &testpb.StreamingOutputCallRequest{ 5603 ResponseType: testpb.PayloadType_COMPRESSABLE, 5604 ResponseParameters: respParams, 5605 Payload: payload, 5606 } 5607 for i := 0; i < numOfIter; i++ { 5608 if err := stream.Send(req); err != nil { 5609 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 5610 } 5611 if _, err := stream.Recv(); err != nil { 5612 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err) 5613 } 5614 } 5615 if err := stream.CloseSend(); err != nil { 5616 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err) 5617 } 5618} 5619 5620var ( 5621 // test authdata 5622 authdata = map[string]string{ 5623 "test-key": "test-value", 5624 "test-key2-bin": string([]byte{1, 2, 3}), 5625 } 5626) 5627 5628type testPerRPCCredentials struct{} 5629 5630func (cr testPerRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { 5631 return authdata, nil 5632} 5633 5634func (cr testPerRPCCredentials) RequireTransportSecurity() bool { 5635 return false 5636} 5637 5638func authHandle(ctx context.Context, info *tap.Info) (context.Context, error) { 5639 md, ok := metadata.FromIncomingContext(ctx) 5640 if !ok { 5641 return ctx, fmt.Errorf("didn't find metadata in context") 5642 } 5643 for k, vwant := range authdata { 5644 vgot, ok := md[k] 5645 if !ok { 5646 return ctx, fmt.Errorf("didn't find authdata key %v in context", k) 5647 } 5648 if vgot[0] != vwant { 5649 return ctx, fmt.Errorf("for key %v, got value %v, want %v", k, vgot, vwant) 5650 } 5651 } 5652 return ctx, nil 5653} 5654 5655func TestPerRPCCredentialsViaDialOptions(t 
*testing.T) { 5656 defer leakcheck.Check(t) 5657 for _, e := range listTestEnv() { 5658 testPerRPCCredentialsViaDialOptions(t, e) 5659 } 5660} 5661 5662func testPerRPCCredentialsViaDialOptions(t *testing.T, e env) { 5663 te := newTest(t, e) 5664 te.tapHandle = authHandle 5665 te.perRPCCreds = testPerRPCCredentials{} 5666 te.startServer(&testServer{security: e.security}) 5667 defer te.tearDown() 5668 5669 cc := te.clientConn() 5670 tc := testpb.NewTestServiceClient(cc) 5671 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 5672 t.Fatalf("Test failed. Reason: %v", err) 5673 } 5674} 5675 5676func TestPerRPCCredentialsViaCallOptions(t *testing.T) { 5677 defer leakcheck.Check(t) 5678 for _, e := range listTestEnv() { 5679 testPerRPCCredentialsViaCallOptions(t, e) 5680 } 5681} 5682 5683func testPerRPCCredentialsViaCallOptions(t *testing.T, e env) { 5684 te := newTest(t, e) 5685 te.tapHandle = authHandle 5686 te.startServer(&testServer{security: e.security}) 5687 defer te.tearDown() 5688 5689 cc := te.clientConn() 5690 tc := testpb.NewTestServiceClient(cc) 5691 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil { 5692 t.Fatalf("Test failed. Reason: %v", err) 5693 } 5694} 5695 5696func TestPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T) { 5697 defer leakcheck.Check(t) 5698 for _, e := range listTestEnv() { 5699 testPerRPCCredentialsViaDialOptionsAndCallOptions(t, e) 5700 } 5701} 5702 5703func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) { 5704 te := newTest(t, e) 5705 te.perRPCCreds = testPerRPCCredentials{} 5706 // When credentials are provided via both dial options and call options, 5707 // we apply both sets. 
5708 te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) { 5709 md, ok := metadata.FromIncomingContext(ctx) 5710 if !ok { 5711 return ctx, fmt.Errorf("couldn't find metadata in context") 5712 } 5713 for k, vwant := range authdata { 5714 vgot, ok := md[k] 5715 if !ok { 5716 return ctx, fmt.Errorf("couldn't find metadata for key %v", k) 5717 } 5718 if len(vgot) != 2 { 5719 return ctx, fmt.Errorf("len of value for key %v was %v, want 2", k, len(vgot)) 5720 } 5721 if vgot[0] != vwant || vgot[1] != vwant { 5722 return ctx, fmt.Errorf("value for %v was %v, want [%v, %v]", k, vgot, vwant, vwant) 5723 } 5724 } 5725 return ctx, nil 5726 } 5727 te.startServer(&testServer{security: e.security}) 5728 defer te.tearDown() 5729 5730 cc := te.clientConn() 5731 tc := testpb.NewTestServiceClient(cc) 5732 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil { 5733 t.Fatalf("Test failed. Reason: %v", err) 5734 } 5735} 5736 5737func TestWaitForReadyConnection(t *testing.T) { 5738 defer leakcheck.Check(t) 5739 for _, e := range listTestEnv() { 5740 testWaitForReadyConnection(t, e) 5741 } 5742 5743} 5744 5745func testWaitForReadyConnection(t *testing.T, e env) { 5746 te := newTest(t, e) 5747 te.userAgent = testAppUA 5748 te.startServer(&testServer{security: e.security}) 5749 defer te.tearDown() 5750 5751 cc := te.clientConn() // Non-blocking dial. 5752 tc := testpb.NewTestServiceClient(cc) 5753 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 5754 defer cancel() 5755 state := cc.GetState() 5756 // Wait for connection to be Ready. 5757 for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { 5758 } 5759 if state != connectivity.Ready { 5760 t.Fatalf("Want connection state to be Ready, got %v", state) 5761 } 5762 ctx, cancel = context.WithTimeout(context.Background(), time.Second) 5763 defer cancel() 5764 // Make a fail-fast RPC. 
5765 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { 5766 t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err) 5767 } 5768} 5769 5770type errCodec struct { 5771 noError bool 5772} 5773 5774func (c *errCodec) Marshal(v interface{}) ([]byte, error) { 5775 if c.noError { 5776 return []byte{}, nil 5777 } 5778 return nil, fmt.Errorf("3987^12 + 4365^12 = 4472^12") 5779} 5780 5781func (c *errCodec) Unmarshal(data []byte, v interface{}) error { 5782 return nil 5783} 5784 5785func (c *errCodec) String() string { 5786 return "Fermat's near-miss." 5787} 5788 5789func TestEncodeDoesntPanic(t *testing.T) { 5790 defer leakcheck.Check(t) 5791 for _, e := range listTestEnv() { 5792 testEncodeDoesntPanic(t, e) 5793 } 5794} 5795 5796func testEncodeDoesntPanic(t *testing.T, e env) { 5797 te := newTest(t, e) 5798 erc := &errCodec{} 5799 te.customCodec = erc 5800 te.startServer(&testServer{security: e.security}) 5801 defer te.tearDown() 5802 te.customCodec = nil 5803 tc := testpb.NewTestServiceClient(te.clientConn()) 5804 // Failure case, should not panic. 5805 tc.EmptyCall(context.Background(), &testpb.Empty{}) 5806 erc.noError = true 5807 // Passing case. 
5808 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 5809 t.Fatalf("EmptyCall(_, _) = _, %v, want _, <nil>", err) 5810 } 5811} 5812 5813func TestSvrWriteStatusEarlyWrite(t *testing.T) { 5814 defer leakcheck.Check(t) 5815 for _, e := range listTestEnv() { 5816 testSvrWriteStatusEarlyWrite(t, e) 5817 } 5818} 5819 5820func testSvrWriteStatusEarlyWrite(t *testing.T, e env) { 5821 te := newTest(t, e) 5822 const smallSize = 1024 5823 const largeSize = 2048 5824 const extraLargeSize = 4096 5825 te.maxServerReceiveMsgSize = newInt(largeSize) 5826 te.maxServerSendMsgSize = newInt(largeSize) 5827 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 5828 if err != nil { 5829 t.Fatal(err) 5830 } 5831 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) 5832 if err != nil { 5833 t.Fatal(err) 5834 } 5835 te.startServer(&testServer{security: e.security}) 5836 defer te.tearDown() 5837 tc := testpb.NewTestServiceClient(te.clientConn()) 5838 respParam := []*testpb.ResponseParameters{ 5839 { 5840 Size: int32(smallSize), 5841 }, 5842 } 5843 sreq := &testpb.StreamingOutputCallRequest{ 5844 ResponseType: testpb.PayloadType_COMPRESSABLE, 5845 ResponseParameters: respParam, 5846 Payload: extraLargePayload, 5847 } 5848 // Test recv case: server receives a message larger than maxServerReceiveMsgSize. 5849 stream, err := tc.FullDuplexCall(te.ctx) 5850 if err != nil { 5851 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5852 } 5853 if err = stream.Send(sreq); err != nil { 5854 t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err) 5855 } 5856 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 5857 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 5858 } 5859 // Test send case: server sends a message larger than maxServerSendMsgSize. 
5860 sreq.Payload = smallPayload 5861 respParam[0].Size = int32(extraLargeSize) 5862 5863 stream, err = tc.FullDuplexCall(te.ctx) 5864 if err != nil { 5865 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5866 } 5867 if err = stream.Send(sreq); err != nil { 5868 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 5869 } 5870 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 5871 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 5872 } 5873} 5874 5875// The following functions with function name ending with TD indicates that they 5876// should be deleted after old service config API is deprecated and deleted. 5877func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) { 5878 te := newTest(t, e) 5879 // We write before read. 5880 ch := make(chan grpc.ServiceConfig, 1) 5881 te.sc = ch 5882 te.userAgent = testAppUA 5883 te.declareLogNoise( 5884 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 5885 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 5886 "grpc: addrConn.resetTransport failed to create client transport: connection error", 5887 "Failed to dial : context canceled; please retry.", 5888 ) 5889 return te, ch 5890} 5891 5892func TestServiceConfigGetMethodConfigTD(t *testing.T) { 5893 defer leakcheck.Check(t) 5894 for _, e := range listTestEnv() { 5895 testGetMethodConfigTD(t, e) 5896 } 5897} 5898 5899func testGetMethodConfigTD(t *testing.T, e env) { 5900 te, ch := testServiceConfigSetupTD(t, e) 5901 defer te.tearDown() 5902 5903 mc1 := grpc.MethodConfig{ 5904 WaitForReady: newBool(true), 5905 Timeout: newDuration(time.Millisecond), 5906 } 5907 mc2 := grpc.MethodConfig{WaitForReady: newBool(false)} 5908 m := make(map[string]grpc.MethodConfig) 5909 m["/grpc.testing.TestService/EmptyCall"] = mc1 5910 m["/grpc.testing.TestService/"] = mc2 5911 sc := 
grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}

	m = make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/UnaryCall"] = mc1
	m["/grpc.testing.TestService/"] = mc2
	sc = grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc
	// Wait for the new service config to propagate.
	for {
		if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
			continue
		}
		break
	}
	// The following RPCs are expected to become fail-fast.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
	}
}

// TestServiceConfigWaitForReadyTD runs testServiceConfigWaitForReadyTD once
// for every environment returned by listTestEnv.
func TestServiceConfigWaitForReadyTD(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testServiceConfigWaitForReadyTD(t, e)
	}
}

// testServiceConfigWaitForReadyTD pushes service configs through the channel
// returned by testServiceConfigSetupTD and checks how the config's
// WaitForReady setting combines with the per-call grpc.FailFast option. Every
// RPC issued here is expected to end with codes.DeadlineExceeded.
func testServiceConfigWaitForReadyTD(t *testing.T, e env) {
	te, ch := testServiceConfigSetupTD(t, e)
	defer te.tearDown()

	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
	mc := grpc.MethodConfig{
		WaitForReady: newBool(false),
		Timeout:      newDuration(time.Millisecond),
	}
	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}

	// Generate a service config update.
	// Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
	mc.WaitForReady = newBool(true)
	m = make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc = grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	// Wait for the new service config to take effect: poll the ClientConn
	// until the method config it reports has WaitForReady set to true.
	mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
	for !*mc.WaitForReady {
		time.Sleep(100 * time.Millisecond)
		mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
	}
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
}

// TestServiceConfigTimeoutTD runs testServiceConfigTimeoutTD once for every
// environment returned by listTestEnv.
func TestServiceConfigTimeoutTD(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testServiceConfigTimeoutTD(t, e)
	}
}

// testServiceConfigTimeoutTD checks that an RPC ends with
// codes.DeadlineExceeded both when the context timeout is 1ns and the service
// config timeout is 1h, and when the two values are swapped.
func testServiceConfigTimeoutTD(t *testing.T, e env) {
	te, ch := testServiceConfigSetupTD(t, e)
	defer te.tearDown()

	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	mc := grpc.MethodConfig{
		Timeout: newDuration(time.Hour),
	}
	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()
	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()

	// Generate a service config update.
	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	mc.Timeout = newDuration(time.Nanosecond)
	m = make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc = grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	// Wait for the new service config to take effect: poll the ClientConn
	// until the method config it reports carries the 1ns timeout.
	mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
	for *mc.Timeout != time.Nanosecond {
		time.Sleep(100 * time.Millisecond)
		mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()

	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()
}

// TestServiceConfigMaxMsgSizeTD runs testServiceConfigMaxMsgSizeTD once for
// every environment returned by listTestEnv.
func TestServiceConfigMaxMsgSizeTD(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testServiceConfigMaxMsgSizeTD(t, e)
	}
}

// testServiceConfigMaxMsgSizeTD checks message-size enforcement for unary and
// streaming RPCs in three setups: service config limits only, service config
// plus smaller client limits, and service config plus larger client limits.
// Oversized sends and receives are expected to fail with
// codes.ResourceExhausted.
func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) {
	// Setting up values and objects shared across all test cases.
	const smallSize = 1
	const largeSize = 1024
	const extraLargeSize = 2048

	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}
	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
	if err != nil {
		t.Fatal(err)
	}
	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
	if err != nil {
		t.Fatal(err)
	}

	mc := grpc.MethodConfig{
		MaxReqSize:  newInt(extraLargeSize),
		MaxRespSize: newInt(extraLargeSize),
	}

	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/UnaryCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te1, ch1 := testServiceConfigSetupTD(t, e)
	te1.startServer(&testServer{security: e.security})
	defer te1.tearDown()

	ch1 <- sc
	tc := testpb.NewTestServiceClient(te1.clientConn())

	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(extraLargeSize),
		Payload:      smallPayload,
	}
	// Test for unary RPC recv.
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = extraLargePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	respParam := []*testpb.ResponseParameters{
		{
			Size: int32(extraLargeSize),
		},
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            smallPayload,
	}
	stream, err := tc.FullDuplexCall(te1.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = extraLargePayload
	stream, err = tc.FullDuplexCall(te1.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}

	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te2, ch2 := testServiceConfigSetupTD(t, e)
	te2.maxClientReceiveMsgSize = newInt(1024)
	te2.maxClientSendMsgSize = newInt(1024)
	te2.startServer(&testServer{security: e.security})
	defer te2.tearDown()
	ch2 <- sc
	tc = testpb.NewTestServiceClient(te2.clientConn())

	// Test for unary RPC recv.
	req.Payload = smallPayload
	req.ResponseSize = int32(largeSize)

	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	stream, err = tc.FullDuplexCall(te2.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	respParam[0].Size = int32(largeSize)
	sreq.Payload = smallPayload
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te2.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}

	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te3, ch3 := testServiceConfigSetupTD(t, e)
	te3.maxClientReceiveMsgSize = newInt(4096)
	te3.maxClientSendMsgSize = newInt(4096)
	te3.startServer(&testServer{security: e.security})
	defer te3.tearDown()
	ch3 <- sc
	tc = testpb.NewTestServiceClient(te3.clientConn())

	// Test for unary RPC recv.
	req.Payload = smallPayload
	req.ResponseSize = int32(largeSize)

	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
	}

	req.ResponseSize = int32(extraLargeSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
	}

	req.Payload = extraLargePayload
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	stream, err = tc.FullDuplexCall(te3.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	respParam[0].Size = int32(largeSize)
	sreq.Payload = smallPayload

	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
	}

	respParam[0].Size = int32(extraLargeSize)

	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te3.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	sreq.Payload = extraLargePayload
	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}
}

// TestMethodFromServerStream installs an unknown-service handler that records
// the result of grpc.MethodFromServerStream, invokes an unregistered method,
// and checks that the handler observed that method name.
func TestMethodFromServerStream(t *testing.T) {
	defer leakcheck.Check(t)
	const testMethod = "/package.service/method"
	e := tcpClearRREnv
	te := newTest(t, e)
	var method string
	var ok bool
	te.unknownHandler = func(srv interface{}, stream grpc.ServerStream) error {
		method, ok = grpc.MethodFromServerStream(stream)
		return nil
	}

	te.startServer(nil)
	defer te.tearDown()
	// The RPC result is irrelevant; only what the handler recorded matters.
	_ = te.clientConn().Invoke(context.Background(), testMethod, nil, nil)
	if !ok || method != testMethod {
		t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod)
	}
}

// TestInterceptorCanAccessCallOptions installs unary and stream client
// interceptors that record the call options they are handed, then checks that
// both the dial-time default options and the per-call options are observed,
// in that order.
func TestInterceptorCanAccessCallOptions(t *testing.T) {
	defer leakcheck.Check(t)
	e := tcpClearRREnv
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	type observedOptions struct {
		headers     []*metadata.MD
		trailers    []*metadata.MD
		peer        []*peer.Peer
		creds       []credentials.PerRPCCredentials
		failFast    []bool
		maxRecvSize []int
		maxSendSize []int
		compressor  []string
		subtype     []string
	}
	var observedOpts observedOptions
	// populateOpts sorts each recognised call option into observedOpts.
	populateOpts := func(opts []grpc.CallOption) {
		for _, o := range opts {
			switch o := o.(type) {
			case grpc.HeaderCallOption:
				observedOpts.headers = append(observedOpts.headers, o.HeaderAddr)
			case grpc.TrailerCallOption:
				observedOpts.trailers = append(observedOpts.trailers, o.TrailerAddr)
			case grpc.PeerCallOption:
				observedOpts.peer = append(observedOpts.peer, o.PeerAddr)
			case grpc.PerRPCCredsCallOption:
				observedOpts.creds = append(observedOpts.creds, o.Creds)
			case grpc.FailFastCallOption:
				observedOpts.failFast = append(observedOpts.failFast, o.FailFast)
			case grpc.MaxRecvMsgSizeCallOption:
				observedOpts.maxRecvSize = append(observedOpts.maxRecvSize, o.MaxRecvMsgSize)
			case grpc.MaxSendMsgSizeCallOption:
				observedOpts.maxSendSize = append(observedOpts.maxSendSize, o.MaxSendMsgSize)
			case grpc.CompressorCallOption:
				observedOpts.compressor = append(observedOpts.compressor, o.CompressorType)
			case grpc.ContentSubtypeCallOption:
				observedOpts.subtype = append(observedOpts.subtype, o.ContentSubtype)
			}
		}
	}

	// Both interceptors only record the options; they never call the
	// invoker/streamer, so no RPC is actually sent.
	te.unaryClientInt = func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		populateOpts(opts)
		return nil
	}
	te.streamClientInt = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		populateOpts(opts)
		return nil, nil
	}

	defaults := []grpc.CallOption{
		grpc.FailFast(false),
		grpc.MaxCallRecvMsgSize(1010),
	}
	tc := testpb.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...)))

	var headers metadata.MD
	var trailers metadata.MD
	var pr peer.Peer
	tc.UnaryCall(context.Background(), &testpb.SimpleRequest{},
		grpc.MaxCallRecvMsgSize(100),
		grpc.MaxCallSendMsgSize(200),
		grpc.PerRPCCredentials(testPerRPCCredentials{}),
		grpc.Header(&headers),
		grpc.Trailer(&trailers),
		grpc.Peer(&pr))
	expected := observedOptions{
		failFast:    []bool{false},
		maxRecvSize: []int{1010, 100},
		maxSendSize: []int{200},
		creds:       []credentials.PerRPCCredentials{testPerRPCCredentials{}},
		headers:     []*metadata.MD{&headers},
		trailers:    []*metadata.MD{&trailers},
		peer:        []*peer.Peer{&pr},
	}

	if !reflect.DeepEqual(expected, observedOpts) {
		t.Errorf("unary call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
	}

	observedOpts = observedOptions{} // reset

	tc.StreamingInputCall(context.Background(),
		grpc.FailFast(true),
		grpc.MaxCallSendMsgSize(2020),
		grpc.UseCompressor("comp-type"),
		grpc.CallContentSubtype("json"))
	expected = observedOptions{
		failFast:    []bool{false, true},
		maxRecvSize: []int{1010},
		maxSendSize: []int{2020},
		compressor:  []string{"comp-type"},
		subtype:     []string{"json"},
	}

	if !reflect.DeepEqual(expected, observedOpts) {
		t.Errorf("streaming call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
	}
}

// TestCompressorRegister runs testCompressorRegister once for every
// environment returned by listTestEnv.
func TestCompressorRegister(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testCompressorRegister(t, e)
	}
}

// testCompressorRegister issues one unary and one streaming RPC with
// clientCompression and serverCompression switched off and
// clientUseCompression switched on, and expects both to succeed.
func testCompressorRegister(t *testing.T, e env) {
	te := newTest(t, e)
	te.clientCompression = false
	te.serverCompression = false
	te.clientUseCompression = true

	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	// Unary call
	const argSize = 271828
	const respSize = 314159
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: respSize,
		Payload:      payload,
	}
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
	if _, err := tc.UnaryCall(ctx, req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
	}
	// Streaming RPC
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	respParam := []*testpb.ResponseParameters{
		{
			Size: 31415,
		},
	}
	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
	if err != nil {
		t.Fatal(err)
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
	}
}

// TestServeExitsWhenListenerClosed starts a server, performs one RPC to prove
// it is serving, closes the listener, and requires Serve to return within
// five seconds.
func TestServeExitsWhenListenerClosed(t *testing.T) {
	defer leakcheck.Check(t)

	ss := &stubServer{
		emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
			return &testpb.Empty{}, nil
		},
	}

	s := grpc.NewServer()
	// Stop the server on every exit path so that an early t.Fatalf does not
	// leave the Serve goroutine running. On the success path Serve has
	// already returned by the time this runs.
	defer s.Stop()
	testpb.RegisterTestServiceServer(s, ss)

	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Failed to create listener: %v", err)
	}

	done := make(chan struct{})
	go func() {
		s.Serve(lis)
		close(done)
	}()

	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		t.Fatalf("Failed to dial server: %v", err)
	}
	defer cc.Close()
	c := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("Failed to send test RPC to server: %v", err)
	}

	if err := lis.Close(); err != nil {
		t.Fatalf("Failed to close listener: %v", err)
	}
	const timeout = 5 * time.Second
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return
	case <-timer.C:
		t.Fatalf("Serve did not return after %v", timeout)
	}
}

// Service handler returns status with invalid utf8 message.
6531func TestStatusInvalidUTF8Message(t *testing.T) { 6532 defer leakcheck.Check(t) 6533 6534 var ( 6535 origMsg = string([]byte{0xff, 0xfe, 0xfd}) 6536 wantMsg = "���" 6537 ) 6538 6539 ss := &stubServer{ 6540 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 6541 return nil, status.Errorf(codes.Internal, origMsg) 6542 }, 6543 } 6544 if err := ss.Start(nil); err != nil { 6545 t.Fatalf("Error starting endpoint server: %v", err) 6546 } 6547 defer ss.Stop() 6548 6549 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 6550 defer cancel() 6551 6552 if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg { 6553 t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg) 6554 } 6555} 6556 6557// Service handler returns status with details and invalid utf8 message. Proto 6558// will fail to marshal the status because of the invalid utf8 message. Details 6559// will be dropped when sending. 
6560func TestStatusInvalidUTF8Details(t *testing.T) { 6561 defer leakcheck.Check(t) 6562 6563 var ( 6564 origMsg = string([]byte{0xff, 0xfe, 0xfd}) 6565 wantMsg = "���" 6566 ) 6567 6568 ss := &stubServer{ 6569 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 6570 st := status.New(codes.Internal, origMsg) 6571 st, err := st.WithDetails(&testpb.Empty{}) 6572 if err != nil { 6573 return nil, err 6574 } 6575 return nil, st.Err() 6576 }, 6577 } 6578 if err := ss.Start(nil); err != nil { 6579 t.Fatalf("Error starting endpoint server: %v", err) 6580 } 6581 defer ss.Stop() 6582 6583 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 6584 defer cancel() 6585 6586 _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) 6587 st := status.Convert(err) 6588 if st.Message() != wantMsg { 6589 t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg) 6590 } 6591 if len(st.Details()) != 0 { 6592 // Details should be dropped on the server side. 
6593 t.Fatalf("RPC status contain details: %v, want no details", st.Details()) 6594 } 6595} 6596 6597func TestClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T) { 6598 defer leakcheck.Check(t) 6599 for _, e := range listTestEnv() { 6600 if e.httpHandler { 6601 continue 6602 } 6603 testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t, e) 6604 } 6605} 6606 6607func testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T, e env) { 6608 te := newTest(t, e) 6609 te.userAgent = testAppUA 6610 smallSize := 1024 6611 te.maxServerReceiveMsgSize = &smallSize 6612 te.startServer(&testServer{security: e.security}) 6613 defer te.tearDown() 6614 tc := testpb.NewTestServiceClient(te.clientConn()) 6615 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1048576) 6616 if err != nil { 6617 t.Fatal(err) 6618 } 6619 req := &testpb.SimpleRequest{ 6620 ResponseType: testpb.PayloadType_COMPRESSABLE, 6621 Payload: payload, 6622 } 6623 var wg sync.WaitGroup 6624 for i := 0; i < 10; i++ { 6625 wg.Add(1) 6626 go func() { 6627 defer wg.Done() 6628 for j := 0; j < 100; j++ { 6629 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) 6630 defer cancel() 6631 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.ResourceExhausted { 6632 t.Errorf("TestService/UnaryCall(_,_) = _. 
%v, want code: %s", err, codes.ResourceExhausted) 6633 return 6634 } 6635 } 6636 }() 6637 } 6638 wg.Wait() 6639} 6640 6641const clientAlwaysFailCredErrorMsg = "clientAlwaysFailCred always fails" 6642 6643var errClientAlwaysFailCred = errors.New(clientAlwaysFailCredErrorMsg) 6644 6645type clientAlwaysFailCred struct{} 6646 6647func (c clientAlwaysFailCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 6648 return nil, nil, errClientAlwaysFailCred 6649} 6650func (c clientAlwaysFailCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 6651 return rawConn, nil, nil 6652} 6653func (c clientAlwaysFailCred) Info() credentials.ProtocolInfo { 6654 return credentials.ProtocolInfo{} 6655} 6656func (c clientAlwaysFailCred) Clone() credentials.TransportCredentials { 6657 return nil 6658} 6659func (c clientAlwaysFailCred) OverrideServerName(s string) error { 6660 return nil 6661} 6662 6663func TestFailFastRPCErrorOnBadCertificates(t *testing.T) { 6664 te := newTest(t, env{name: "bad-cred", network: "tcp", security: "clientAlwaysFailCred", balancer: "round_robin"}) 6665 te.startServer(&testServer{security: te.e.security}) 6666 defer te.tearDown() 6667 6668 opts := []grpc.DialOption{grpc.WithTransportCredentials(clientAlwaysFailCred{})} 6669 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 6670 defer cancel() 6671 cc, err := grpc.DialContext(ctx, te.srvAddr, opts...) 6672 if err != nil { 6673 t.Fatalf("Dial(_) = %v, want %v", err, nil) 6674 } 6675 defer cc.Close() 6676 6677 tc := testpb.NewTestServiceClient(cc) 6678 for i := 0; i < 1000; i++ { 6679 // This loop runs for at most 1 second. The first several RPCs will fail 6680 // with Unavailable because the connection hasn't started. When the 6681 // first connection failed with creds error, the next RPC should also 6682 // fail with the expected error. 
6683 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) { 6684 return 6685 } 6686 time.Sleep(time.Millisecond) 6687 } 6688 te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg) 6689} 6690 6691func TestRPCTimeout(t *testing.T) { 6692 defer leakcheck.Check(t) 6693 for _, e := range listTestEnv() { 6694 testRPCTimeout(t, e) 6695 } 6696} 6697 6698func testRPCTimeout(t *testing.T, e env) { 6699 te := newTest(t, e) 6700 te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond}) 6701 defer te.tearDown() 6702 6703 cc := te.clientConn() 6704 tc := testpb.NewTestServiceClient(cc) 6705 6706 const argSize = 2718 6707 const respSize = 314 6708 6709 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 6710 if err != nil { 6711 t.Fatal(err) 6712 } 6713 6714 req := &testpb.SimpleRequest{ 6715 ResponseType: testpb.PayloadType_COMPRESSABLE, 6716 ResponseSize: respSize, 6717 Payload: payload, 6718 } 6719 for i := -1; i <= 10; i++ { 6720 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond) 6721 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded { 6722 t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded) 6723 } 6724 cancel() 6725 } 6726} 6727 6728func TestDisabledIOBuffers(t *testing.T) { 6729 defer leakcheck.Check(t) 6730 6731 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000)) 6732 if err != nil { 6733 t.Fatalf("Failed to create payload: %v", err) 6734 } 6735 req := &testpb.StreamingOutputCallRequest{ 6736 Payload: payload, 6737 } 6738 resp := &testpb.StreamingOutputCallResponse{ 6739 Payload: payload, 6740 } 6741 6742 ss := &stubServer{ 6743 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 6744 for { 6745 in, err := stream.Recv() 
6746 if err == io.EOF { 6747 return nil 6748 } 6749 if err != nil { 6750 t.Errorf("stream.Recv() = _, %v, want _, <nil>", err) 6751 return err 6752 } 6753 if !reflect.DeepEqual(in.Payload.Body, payload.Body) { 6754 t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body)) 6755 return err 6756 } 6757 if err := stream.Send(resp); err != nil { 6758 t.Errorf("stream.Send(_)= %v, want <nil>", err) 6759 return err 6760 } 6761 6762 } 6763 }, 6764 } 6765 6766 s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0)) 6767 testpb.RegisterTestServiceServer(s, ss) 6768 6769 lis, err := net.Listen("tcp", "localhost:0") 6770 if err != nil { 6771 t.Fatalf("Failed to create listener: %v", err) 6772 } 6773 6774 done := make(chan struct{}) 6775 go func() { 6776 s.Serve(lis) 6777 close(done) 6778 }() 6779 defer s.Stop() 6780 dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second) 6781 defer dcancel() 6782 cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0)) 6783 if err != nil { 6784 t.Fatalf("Failed to dial server") 6785 } 6786 defer cc.Close() 6787 c := testpb.NewTestServiceClient(cc) 6788 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6789 defer cancel() 6790 stream, err := c.FullDuplexCall(ctx, grpc.FailFast(false)) 6791 if err != nil { 6792 t.Fatalf("Failed to send test RPC to server") 6793 } 6794 for i := 0; i < 10; i++ { 6795 if err := stream.Send(req); err != nil { 6796 t.Fatalf("stream.Send(_) = %v, want <nil>", err) 6797 } 6798 in, err := stream.Recv() 6799 if err != nil { 6800 t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err) 6801 } 6802 if !reflect.DeepEqual(in.Payload.Body, payload.Body) { 6803 t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body)) 6804 } 6805 } 6806 stream.CloseSend() 6807 
if _, err := stream.Recv(); err != io.EOF {
	t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
}
}

// TestServerMaxHeaderListSizeClientUserViolation runs
// testServerMaxHeaderListSizeClientUserViolation against every environment
// returned by listTestEnv, skipping those with httpHandler set.
func TestServerMaxHeaderListSizeClientUserViolation(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		if e.httpHandler {
			continue
		}
		testServerMaxHeaderListSizeClientUserViolation(t, e)
	}
}

// testServerMaxHeaderListSizeClientUserViolation starts a server with
// maxServerHeaderListSize set to 216, attaches a 216-byte "oversize" metadata
// value to the outgoing context, and expects EmptyCall to eventually fail
// with codes.Internal.
func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) {
	te := newTest(t, e)
	te.maxServerHeaderListSize = new(uint32)
	*te.maxServerHeaderListSize = 216
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// AppendToOutgoingContext returns a new context; it must be assigned back
	// or the oversize metadata is never attached to the RPC.
	ctx = metadata.AppendToOutgoingContext(ctx, "oversize", string(make([]byte, 216)))
	var err error
	// The call is retried through verifyResultWithDelay until it reports the
	// expected code.
	if err = verifyResultWithDelay(func() (bool, error) {
		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
			return true, nil
		}
		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
	}); err != nil {
		t.Fatal(err)
	}
}

// TestClientMaxHeaderListSizeServerUserViolation runs
// testClientMaxHeaderListSizeServerUserViolation against every environment
// returned by listTestEnv, skipping those with httpHandler set.
func TestClientMaxHeaderListSizeServerUserViolation(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		if e.httpHandler {
			continue
		}
		testClientMaxHeaderListSizeServerUserViolation(t, e)
	}
}

// testClientMaxHeaderListSizeServerUserViolation sets maxClientHeaderListSize
// to 1 and expects EmptyCall to eventually fail with codes.Internal.
func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) {
	te := newTest(t, e)
	te.maxClientHeaderListSize = new(uint32)
	*te.maxClientHeaderListSize = 1 // any header server sends will violate
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	var err error
	if err = verifyResultWithDelay(func() (bool, error) {
		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
			return true, nil
		}
		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
	}); err != nil {
		t.Fatal(err)
	}
}

// TestServerMaxHeaderListSizeClientIntentionalViolation runs
// testServerMaxHeaderListSizeClientIntentionalViolation against every
// environment returned by listTestEnv, skipping those with httpHandler set or
// with "tls" security.
func TestServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		if e.httpHandler || e.security == "tls" {
			continue
		}
		testServerMaxHeaderListSizeClientIntentionalViolation(t, e)
	}
}

// testServerMaxHeaderListSizeClientIntentionalViolation opens a FullDuplexCall
// stream, then writes an extra HEADERS frame carrying a 512-byte "oversize"
// header through the client's raw connection wrapper. With
// maxServerHeaderListSize set to 512, stream.Recv is expected to fail with
// codes.Internal.
func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env) {
	te := newTest(t, e)
	te.maxServerHeaderListSize = new(uint32)
	*te.maxServerHeaderListSize = 512
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc, dw := te.clientConnWithConnControl()
	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
	}
	rcw := dw.getRawConnWrapper()
	// allow for client to send the initial header
	time.Sleep(100 * time.Millisecond)
	rcw.writeHeaders(http2.HeadersFrameParam{
		StreamID:      tc.getCurrentStreamID(),
		BlockFragment: rcw.encodeHeader("oversize", strings.Repeat("a", 512)),
		EndStream:     false,
		EndHeaders:    true,
	})
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
	}
}

// TestClientMaxHeaderListSizeServerIntentionalViolation runs
// testClientMaxHeaderListSizeServerIntentionalViolation against every
// environment returned by listTestEnv, skipping those with httpHandler set or
// with "tls" security.
func TestClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		if e.httpHandler || e.security == "tls" {
			continue
		}
		testClientMaxHeaderListSizeServerIntentionalViolation(t, e)
	}
}

// testClientMaxHeaderListSizeServerIntentionalViolation opens a FullDuplexCall
// stream, then writes a HEADERS frame carrying a 200-byte "oversize" header
// through the raw wrapper of the last connection seen by the server listener.
// With maxClientHeaderListSize set to 200, stream.Recv is expected to fail
// with codes.Internal.
func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) {
	te := newTest(t, e)
	te.maxClientHeaderListSize = new(uint32)
	*te.maxClientHeaderListSize = 200
	lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true})
	defer te.tearDown()
	cc, _ := te.clientConnWithConnControl()
	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
	}

	// Poll for the server-side connection: up to 100 attempts, 10ms apart.
	var rcw *rawConnWrapper
	for i := 0; i < 100 && rcw == nil; i++ {
		if rcw = lw.getLastConn(); rcw == nil {
			time.Sleep(10 * time.Millisecond)
		}
	}
	if rcw == nil {
		t.Fatal("failed to create server transport after 1s")
	}

	// allow for client to send the initial header.
	time.Sleep(100 * time.Millisecond)
	rcw.writeHeaders(http2.HeadersFrameParam{
		StreamID:      tc.getCurrentStreamID(),
		BlockFragment: rcw.encodeHeader("oversize", strings.Repeat("a", 200)),
		EndStream:     false,
		EndHeaders:    true,
	})
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
	}
}

// TestNetPipeConn performs a single UnaryCall over a testutils pipe listener.
func TestNetPipeConn(t *testing.T) {
	// This test will block indefinitely if grpc writes both client and server
	// prefaces without either reading from the Conn.
	defer leakcheck.Check(t)
	pl := testutils.NewPipeListener()
	s := grpc.NewServer()
	defer s.Stop()
	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
		return &testpb.SimpleResponse{}, nil
	}}
	testpb.RegisterTestServiceServer(s, ts)
	go s.Serve(pl)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	cc, err := grpc.DialContext(ctx, "", grpc.WithInsecure(), grpc.WithDialer(pl.Dialer()))
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer cc.Close()
	client := testpb.NewTestServiceClient(cc)
	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
		t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
	}
}

// TestLargeTimeout runs testLargeTimeout against every environment returned by
// listTestEnv.
func TestLargeTimeout(t *testing.T) {
	defer leakcheck.Check(t)
	for _, e := range listTestEnv() {
		testLargeTimeout(t, e)
	}
}

// testLargeTimeout issues UnaryCall with very large client timeouts and has
// the server handler verify that the deadline it observes lies within five
// seconds below the requested timeout.
func testLargeTimeout(t *testing.T, e env) {
	te := newTest(t, e)
	te.declareLogNoise("Server.processUnaryRPC failed to write status")

	ts := &funcServer{}
	te.startServer(ts)
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	timeouts := []time.Duration{
		time.Duration(math.MaxInt64), // will be (correctly) converted to
		// 2562048 hours, which overflows upon converting back to an int64
		2562047 * time.Hour, // the largest timeout that does not overflow
	}

	for i, maxTimeout := range timeouts {
		ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			deadline, ok := ctx.Deadline()
			timeout := time.Until(deadline)
			minTimeout := maxTimeout - 5*time.Second
			if !ok || timeout < minTimeout || timeout > maxTimeout {
				t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout)
				return nil, status.Error(codes.OutOfRange, "deadline error")
			}
			return &testpb.SimpleResponse{}, nil
		}

		ctx, cancel := context.WithTimeout(context.Background(), maxTimeout)
		_, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{})
		// Release the context as soon as the RPC returns instead of deferring
		// inside the loop, which would hold every context until the function
		// exits.
		cancel()
		if err != nil {
			t.Errorf("case %v: UnaryCall(_) = _, %v; want _, nil", i, err)
		}
	}
}

// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This
// test ensures that the connection is re-created after GO_AWAY and not affected by the
// subsequent (old) connection closure.
func TestGoAwayThenClose(t *testing.T) {
	defer leakcheck.Check(t)

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	lis1, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s1 := grpc.NewServer()
	defer s1.Stop()
	ts1 := &funcServer{
		unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			return &testpb.SimpleResponse{}, nil
		},
		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
			// Wait forever.
			_, err := stream.Recv()
			if err == nil {
				t.Error("expected to never receive any message")
			}
			return err
		},
	}
	testpb.RegisterTestServiceServer(s1, ts1)
	go s1.Serve(lis1)

	// conn2Established fires once the second listener's Accept returns.
	conn2Established := grpcsync.NewEvent()
	lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s2 := grpc.NewServer()
	defer s2.Stop()
	conn2Ready := grpcsync.NewEvent()
	ts2 := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
		conn2Ready.Fire()
		return &testpb.SimpleResponse{}, nil
	}}
	testpb.RegisterTestServiceServer(s2, ts2)
	go s2.Serve(lis2)

	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()
	r.InitialAddrs([]resolver.Address{
		{Addr: lis1.Addr().String()},
		{Addr: lis2.Addr().String()},
	})
	cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure())
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer cc.Close()

	client := testpb.NewTestServiceClient(cc)

	// Should go on connection 1. We use a long-lived RPC because it will cause GracefulStop to send GO_AWAY, but the
	// connection doesn't get closed until the server stops and the client receives.
	stream, err := client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
	}

	// Send GO_AWAY to connection 1.
	go s1.GracefulStop()

	// Wait for connection 2 to be established.
	<-conn2Established.Done()

	// Close connection 1.
	s1.Stop()

	// Wait for client to close.
	_, err = stream.Recv()
	if err == nil {
		t.Fatal("expected the stream to die, but got a successful Recv")
	}

	// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
	for i := 0; i < 10; i++ {
		if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
			t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
		}
	}
}

// listenWithNotifyingListener listens on the given network and address and
// wraps the resulting listener in a notifyingListener bound to event. It
// returns the net.Listen error unchanged on failure.
func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) {
	lis, err := net.Listen(network, address)
	if err != nil {
		return nil, err
	}
	return notifyingListener{connEstablished: event, Listener: lis}, nil
}

// notifyingListener is a net.Listener that fires connEstablished whenever
// Accept returns.
type notifyingListener struct {
	connEstablished *grpcsync.Event
	net.Listener
}

// Accept delegates to the embedded Listener and fires connEstablished when the
// call returns, whether or not it succeeded.
func (lis notifyingListener) Accept() (net.Conn, error) {
	defer lis.connEstablished.Fire()
	return lis.Listener.Accept()
}

// TestRPCWaitsForResolver checks that an RPC on a non-blocking-dial client
// times out while the manual resolver has produced nothing, and that a later
// RPC waits for the resolver's service config and addresses and then honors
// the maxRequestMessageBytes limit from that config.
func TestRPCWaitsForResolver(t *testing.T) {
	te := testServiceConfigSetup(t, tcpClearRREnv)
	te.startServer(&testServer{security: tcpClearRREnv.security})
	defer te.tearDown()
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	te.resolverScheme = r.Scheme()
	te.nonBlockingDial = true
	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	// With no resolved addresses yet, this will time out.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}

	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	go func() {
		time.Sleep(time.Second)
		r.NewServiceConfig(`{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "UnaryCall"
                }
            ],
            "maxRequestMessageBytes": 0
        }
    ]
}`)
		r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
	}()
	// We wait a second before providing a service config and resolving
	// addresses. So this will wait for that and then honor the
	// maxRequestMessageBytes it contains.
	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{ResponseType: testpb.PayloadType_UNCOMPRESSABLE}); status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.ResourceExhausted)
	}
	if got := ctx.Err(); got != nil {
		t.Fatalf("ctx.Err() = %v; want nil (deadline should be set short by service config)", got)
	}
	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
	}
}