1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19//go:generate protoc --go_out=plugins=grpc:. codec_perf/perf.proto 20//go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto 21 22package test 23 24import ( 25 "bufio" 26 "bytes" 27 "context" 28 "crypto/tls" 29 "errors" 30 "flag" 31 "fmt" 32 "io" 33 "math" 34 "net" 35 "net/http" 36 "os" 37 "reflect" 38 "runtime" 39 "strings" 40 "sync" 41 "sync/atomic" 42 "syscall" 43 "testing" 44 "time" 45 46 "github.com/golang/protobuf/proto" 47 anypb "github.com/golang/protobuf/ptypes/any" 48 "golang.org/x/net/http2" 49 "golang.org/x/net/http2/hpack" 50 spb "google.golang.org/genproto/googleapis/rpc/status" 51 "google.golang.org/grpc" 52 "google.golang.org/grpc/balancer/roundrobin" 53 "google.golang.org/grpc/codes" 54 "google.golang.org/grpc/connectivity" 55 "google.golang.org/grpc/credentials" 56 "google.golang.org/grpc/encoding" 57 _ "google.golang.org/grpc/encoding/gzip" 58 _ "google.golang.org/grpc/grpclog/glogger" 59 "google.golang.org/grpc/health" 60 healthgrpc "google.golang.org/grpc/health/grpc_health_v1" 61 healthpb "google.golang.org/grpc/health/grpc_health_v1" 62 "google.golang.org/grpc/internal/channelz" 63 "google.golang.org/grpc/internal/grpcsync" 64 "google.golang.org/grpc/internal/grpctest" 65 "google.golang.org/grpc/internal/leakcheck" 66 "google.golang.org/grpc/internal/testutils" 67 "google.golang.org/grpc/internal/transport" 68 "google.golang.org/grpc/metadata" 69 "google.golang.org/grpc/peer" 70 "google.golang.org/grpc/resolver" 71 "google.golang.org/grpc/resolver/manual" 72 _ "google.golang.org/grpc/resolver/passthrough" 73 "google.golang.org/grpc/stats" 74 "google.golang.org/grpc/status" 75 "google.golang.org/grpc/tap" 76 testpb "google.golang.org/grpc/test/grpc_testing" 77 "google.golang.org/grpc/testdata" 78) 79 80func init() { 81 channelz.TurnOn() 82} 83 84type s struct{} 85 86var lcFailed uint32 87 88type errorer struct { 89 t *testing.T 90} 91 92func (e errorer) Errorf(format string, args ...interface{}) { 93 atomic.StoreUint32(&lcFailed, 1) 94 e.t.Errorf(format, args...) 95} 96 97func (s) Teardown(t *testing.T) { 98 if atomic.LoadUint32(&lcFailed) == 1 { 99 return 100 } 101 leakcheck.Check(errorer{t: t}) 102 if atomic.LoadUint32(&lcFailed) == 1 { 103 t.Log("Leak check disabled for future tests") 104 } 105} 106 107func Test(t *testing.T) { 108 grpctest.RunSubTests(t, s{}) 109} 110 111var ( 112 // For headers: 113 testMetadata = metadata.MD{ 114 "key1": []string{"value1"}, 115 "key2": []string{"value2"}, 116 "key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})}, 117 } 118 testMetadata2 = metadata.MD{ 119 "key1": []string{"value12"}, 120 "key2": []string{"value22"}, 121 } 122 // For trailers: 123 testTrailerMetadata = metadata.MD{ 124 "tkey1": []string{"trailerValue1"}, 125 "tkey2": []string{"trailerValue2"}, 126 "tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})}, 127 } 128 testTrailerMetadata2 = metadata.MD{ 129 "tkey1": []string{"trailerValue12"}, 130 "tkey2": []string{"trailerValue22"}, 131 } 132 // capital "Key" is illegal in HTTP/2. 133 malformedHTTP2Metadata = metadata.MD{ 134 "Key": []string{"foo"}, 135 } 136 testAppUA = "myApp1/1.0 myApp2/0.9" 137 failAppUA = "fail-this-RPC" 138 detailedError = status.ErrorProto(&spb.Status{ 139 Code: int32(codes.DataLoss), 140 Message: "error for testing: " + failAppUA, 141 Details: []*anypb.Any{{ 142 TypeUrl: "url", 143 Value: []byte{6, 0, 0, 6, 1, 3}, 144 }}, 145 }) 146) 147 148var raceMode bool // set by race.go in race mode 149 150type testServer struct { 151 security string // indicate the authentication protocol used by this server. 152 earlyFail bool // whether to error out the execution of a service handler prematurely. 153 setAndSendHeader bool // whether to call setHeader and sendHeader. 154 setHeaderOnly bool // whether to only call setHeader, not sendHeader. 155 multipleSetTrailer bool // whether to call setTrailer multiple times. 156 unaryCallSleepTime time.Duration 157} 158 159func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 160 if md, ok := metadata.FromIncomingContext(ctx); ok { 161 // For testing purpose, returns an error if user-agent is failAppUA. 162 // To test that client gets the correct error. 163 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) { 164 return nil, detailedError 165 } 166 var str []string 167 for _, entry := range md["user-agent"] { 168 str = append(str, "ua", entry) 169 } 170 grpc.SendHeader(ctx, metadata.Pairs(str...)) 171 } 172 return new(testpb.Empty), nil 173} 174 175func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) { 176 if size < 0 { 177 return nil, fmt.Errorf("requested a response with invalid length %d", size) 178 } 179 body := make([]byte, size) 180 switch t { 181 case testpb.PayloadType_COMPRESSABLE: 182 case testpb.PayloadType_UNCOMPRESSABLE: 183 return nil, fmt.Errorf("PayloadType UNCOMPRESSABLE is not supported") 184 default: 185 return nil, fmt.Errorf("unsupported payload type: %d", t) 186 } 187 return &testpb.Payload{ 188 Type: t, 189 Body: body, 190 }, nil 191} 192 193func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 194 md, ok := metadata.FromIncomingContext(ctx) 195 if ok { 196 if _, exists := md[":authority"]; !exists { 197 return nil, status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md) 198 } 199 if s.setAndSendHeader { 200 if err := grpc.SetHeader(ctx, md); err != nil { 201 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err) 202 } 203 if err := grpc.SendHeader(ctx, testMetadata2); err != nil { 204 return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err) 205 } 206 } else if s.setHeaderOnly { 207 if err := grpc.SetHeader(ctx, md); err != nil { 208 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err) 209 } 210 if err := grpc.SetHeader(ctx, testMetadata2); err != nil { 211 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err) 212 } 213 } else { 214 if err := grpc.SendHeader(ctx, md); err != nil { 215 return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err) 216 } 217 } 218 if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil { 219 return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err) 220 } 221 if s.multipleSetTrailer { 222 if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil { 223 return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err) 224 } 225 } 226 } 227 pr, ok := peer.FromContext(ctx) 228 if !ok { 229 return nil, status.Error(codes.DataLoss, "failed to get peer from ctx") 230 } 231 if pr.Addr == net.Addr(nil) { 232 return nil, status.Error(codes.DataLoss, "failed to get peer address") 233 } 234 if s.security != "" { 235 // Check Auth info 236 var authType, serverName string 237 switch info := pr.AuthInfo.(type) { 238 case credentials.TLSInfo: 239 authType = info.AuthType() 240 serverName = info.State.ServerName 241 default: 242 return nil, status.Error(codes.Unauthenticated, "Unknown AuthInfo type") 243 } 244 if authType != s.security { 245 return nil, status.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security) 246 } 247 if serverName != "x.test.youtube.com" { 248 return nil, status.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName) 249 } 250 } 251 // Simulate some service delay. 252 time.Sleep(s.unaryCallSleepTime) 253 254 payload, err := newPayload(in.GetResponseType(), in.GetResponseSize()) 255 if err != nil { 256 return nil, err 257 } 258 259 return &testpb.SimpleResponse{ 260 Payload: payload, 261 }, nil 262} 263 264func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { 265 if md, ok := metadata.FromIncomingContext(stream.Context()); ok { 266 if _, exists := md[":authority"]; !exists { 267 return status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md) 268 } 269 // For testing purpose, returns an error if user-agent is failAppUA. 270 // To test that client gets the correct error. 271 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) { 272 return status.Error(codes.DataLoss, "error for testing: "+failAppUA) 273 } 274 } 275 cs := args.GetResponseParameters() 276 for _, c := range cs { 277 if us := c.GetIntervalUs(); us > 0 { 278 time.Sleep(time.Duration(us) * time.Microsecond) 279 } 280 281 payload, err := newPayload(args.GetResponseType(), c.GetSize()) 282 if err != nil { 283 return err 284 } 285 286 if err := stream.Send(&testpb.StreamingOutputCallResponse{ 287 Payload: payload, 288 }); err != nil { 289 return err 290 } 291 } 292 return nil 293} 294 295func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error { 296 var sum int 297 for { 298 in, err := stream.Recv() 299 if err == io.EOF { 300 return stream.SendAndClose(&testpb.StreamingInputCallResponse{ 301 AggregatedPayloadSize: int32(sum), 302 }) 303 } 304 if err != nil { 305 return err 306 } 307 p := in.GetPayload().GetBody() 308 sum += len(p) 309 if s.earlyFail { 310 return status.Error(codes.NotFound, "not found") 311 } 312 } 313} 314 315func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 316 md, ok := metadata.FromIncomingContext(stream.Context()) 317 if ok { 318 if s.setAndSendHeader { 319 if err := stream.SetHeader(md); err != nil { 320 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err) 321 } 322 if err := stream.SendHeader(testMetadata2); err != nil { 323 return status.Errorf(status.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err) 324 } 325 } else if s.setHeaderOnly { 326 if err := stream.SetHeader(md); err != nil { 327 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err) 328 } 329 if err := stream.SetHeader(testMetadata2); err != nil { 330 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err) 331 } 332 } else { 333 if err := stream.SendHeader(md); err != nil { 334 return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) 335 } 336 } 337 stream.SetTrailer(testTrailerMetadata) 338 if s.multipleSetTrailer { 339 stream.SetTrailer(testTrailerMetadata2) 340 } 341 } 342 for { 343 in, err := stream.Recv() 344 if err == io.EOF { 345 // read done. 346 return nil 347 } 348 if err != nil { 349 // to facilitate testSvrWriteStatusEarlyWrite 350 if status.Code(err) == codes.ResourceExhausted { 351 return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error()) 352 } 353 return err 354 } 355 cs := in.GetResponseParameters() 356 for _, c := range cs { 357 if us := c.GetIntervalUs(); us > 0 { 358 time.Sleep(time.Duration(us) * time.Microsecond) 359 } 360 361 payload, err := newPayload(in.GetResponseType(), c.GetSize()) 362 if err != nil { 363 return err 364 } 365 366 if err := stream.Send(&testpb.StreamingOutputCallResponse{ 367 Payload: payload, 368 }); err != nil { 369 // to facilitate testSvrWriteStatusEarlyWrite 370 if status.Code(err) == codes.ResourceExhausted { 371 return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error()) 372 } 373 return err 374 } 375 } 376 } 377} 378 379func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error { 380 var msgBuf []*testpb.StreamingOutputCallRequest 381 for { 382 in, err := stream.Recv() 383 if err == io.EOF { 384 // read done. 385 break 386 } 387 if err != nil { 388 return err 389 } 390 msgBuf = append(msgBuf, in) 391 } 392 for _, m := range msgBuf { 393 cs := m.GetResponseParameters() 394 for _, c := range cs { 395 if us := c.GetIntervalUs(); us > 0 { 396 time.Sleep(time.Duration(us) * time.Microsecond) 397 } 398 399 payload, err := newPayload(m.GetResponseType(), c.GetSize()) 400 if err != nil { 401 return err 402 } 403 404 if err := stream.Send(&testpb.StreamingOutputCallResponse{ 405 Payload: payload, 406 }); err != nil { 407 return err 408 } 409 } 410 } 411 return nil 412} 413 414type env struct { 415 name string 416 network string // The type of network such as tcp, unix, etc. 417 security string // The security protocol such as TLS, SSH, etc. 418 httpHandler bool // whether to use the http.Handler ServerTransport; requires TLS 419 balancer string // One of "round_robin", "pick_first", "v1", or "". 420 customDialer func(string, string, time.Duration) (net.Conn, error) 421} 422 423func (e env) runnable() bool { 424 if runtime.GOOS == "windows" && e.network == "unix" { 425 return false 426 } 427 return true 428} 429 430func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) { 431 if e.customDialer != nil { 432 return e.customDialer(e.network, addr, timeout) 433 } 434 return net.DialTimeout(e.network, addr, timeout) 435} 436 437var ( 438 tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp", balancer: "v1"} 439 tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls", balancer: "v1"} 440 tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"} 441 tcpTLSRREnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"} 442 handlerEnv = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"} 443 noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"} 444 allEnv = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv} 445) 446 447var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.") 448 449func listTestEnv() (envs []env) { 450 if *onlyEnv != "" { 451 for _, e := range allEnv { 452 if e.name == *onlyEnv { 453 if !e.runnable() { 454 panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS)) 455 } 456 return []env{e} 457 } 458 } 459 panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv)) 460 } 461 for _, e := range allEnv { 462 if e.runnable() { 463 envs = append(envs, e) 464 } 465 } 466 return envs 467} 468 469// test is an end-to-end test. It should be created with the newTest 470// func, modified as needed, and then started with its startServer method. 471// It should be cleaned up with the tearDown method. 472type test struct { 473 t *testing.T 474 e env 475 476 ctx context.Context // valid for life of test, before tearDown 477 cancel context.CancelFunc 478 479 // Configurable knobs, after newTest returns: 480 testServer testpb.TestServiceServer // nil means none 481 healthServer *health.Server // nil means disabled 482 maxStream uint32 483 tapHandle tap.ServerInHandle 484 maxMsgSize *int 485 maxClientReceiveMsgSize *int 486 maxClientSendMsgSize *int 487 maxServerReceiveMsgSize *int 488 maxServerSendMsgSize *int 489 maxClientHeaderListSize *uint32 490 maxServerHeaderListSize *uint32 491 userAgent string 492 // clientCompression and serverCompression are set to test the deprecated API 493 // WithCompressor and WithDecompressor. 494 clientCompression bool 495 serverCompression bool 496 // clientUseCompression is set to test the new compressor registration API UseCompressor. 497 clientUseCompression bool 498 // clientNopCompression is set to create a compressor whose type is not supported. 499 clientNopCompression bool 500 unaryClientInt grpc.UnaryClientInterceptor 501 streamClientInt grpc.StreamClientInterceptor 502 unaryServerInt grpc.UnaryServerInterceptor 503 streamServerInt grpc.StreamServerInterceptor 504 unknownHandler grpc.StreamHandler 505 sc <-chan grpc.ServiceConfig 506 customCodec encoding.Codec 507 serverInitialWindowSize int32 508 serverInitialConnWindowSize int32 509 clientInitialWindowSize int32 510 clientInitialConnWindowSize int32 511 perRPCCreds credentials.PerRPCCredentials 512 customDialOptions []grpc.DialOption 513 customServerOptions []grpc.ServerOption 514 resolverScheme string 515 516 // All test dialing is blocking by default. Set this to true if dial 517 // should be non-blocking. 518 nonBlockingDial bool 519 520 // srv and srvAddr are set once startServer is called. 521 srv stopper 522 srvAddr string 523 524 // srvs and srvAddrs are set once startServers is called. 525 srvs []*grpc.Server 526 srvAddrs []string 527 528 cc *grpc.ClientConn // nil until requested via clientConn 529 restoreLogs func() // nil unless declareLogNoise is used 530} 531 532type stopper interface { 533 Stop() 534 GracefulStop() 535} 536 537func (te *test) tearDown() { 538 if te.cancel != nil { 539 te.cancel() 540 te.cancel = nil 541 } 542 543 if te.cc != nil { 544 te.cc.Close() 545 te.cc = nil 546 } 547 548 if te.restoreLogs != nil { 549 te.restoreLogs() 550 te.restoreLogs = nil 551 } 552 553 if te.srv != nil { 554 te.srv.Stop() 555 } 556 if len(te.srvs) != 0 { 557 for _, s := range te.srvs { 558 s.Stop() 559 } 560 } 561} 562 563// newTest returns a new test using the provided testing.T and 564// environment. It is returned with default values. Tests should 565// modify it before calling its startServer and clientConn methods. 566func newTest(t *testing.T, e env) *test { 567 te := &test{ 568 t: t, 569 e: e, 570 maxStream: math.MaxUint32, 571 } 572 te.ctx, te.cancel = context.WithCancel(context.Background()) 573 return te 574} 575 576func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener { 577 te.testServer = ts 578 te.t.Logf("Running test in %s environment...", te.e.name) 579 sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} 580 if te.maxMsgSize != nil { 581 sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize)) 582 } 583 if te.maxServerReceiveMsgSize != nil { 584 sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize)) 585 } 586 if te.maxServerSendMsgSize != nil { 587 sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize)) 588 } 589 if te.maxServerHeaderListSize != nil { 590 sopts = append(sopts, grpc.MaxHeaderListSize(*te.maxServerHeaderListSize)) 591 } 592 if te.tapHandle != nil { 593 sopts = append(sopts, grpc.InTapHandle(te.tapHandle)) 594 } 595 if te.serverCompression { 596 sopts = append(sopts, 597 grpc.RPCCompressor(grpc.NewGZIPCompressor()), 598 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), 599 ) 600 } 601 if te.unaryServerInt != nil { 602 sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt)) 603 } 604 if te.streamServerInt != nil { 605 sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt)) 606 } 607 if te.unknownHandler != nil { 608 sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler)) 609 } 610 if te.serverInitialWindowSize > 0 { 611 sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize)) 612 } 613 if te.serverInitialConnWindowSize > 0 { 614 sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize)) 615 } 616 la := "localhost:0" 617 switch te.e.network { 618 case "unix": 619 la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano()) 620 syscall.Unlink(la) 621 } 622 lis, err := listen(te.e.network, la) 623 if err != nil { 624 te.t.Fatalf("Failed to listen: %v", err) 625 } 626 switch te.e.security { 627 case "tls": 628 creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key")) 629 if err != nil { 630 te.t.Fatalf("Failed to generate credentials %v", err) 631 } 632 sopts = append(sopts, grpc.Creds(creds)) 633 case "clientTimeoutCreds": 634 sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{})) 635 } 636 sopts = append(sopts, te.customServerOptions...) 637 s := grpc.NewServer(sopts...) 638 te.srv = s 639 if te.healthServer != nil { 640 healthgrpc.RegisterHealthServer(s, te.healthServer) 641 } 642 if te.testServer != nil { 643 testpb.RegisterTestServiceServer(s, te.testServer) 644 } 645 addr := la 646 switch te.e.network { 647 case "unix": 648 default: 649 _, port, err := net.SplitHostPort(lis.Addr().String()) 650 if err != nil { 651 te.t.Fatalf("Failed to parse listener address: %v", err) 652 } 653 addr = "localhost:" + port 654 } 655 656 te.srvAddr = addr 657 658 if te.e.httpHandler { 659 if te.e.security != "tls" { 660 te.t.Fatalf("unsupported environment settings") 661 } 662 cert, err := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key")) 663 if err != nil { 664 te.t.Fatal("Error creating TLS certificate: ", err) 665 } 666 hs := &http.Server{ 667 Handler: s, 668 } 669 err = http2.ConfigureServer(hs, &http2.Server{ 670 MaxConcurrentStreams: te.maxStream, 671 }) 672 if err != nil { 673 te.t.Fatal("error starting http2 server: ", err) 674 } 675 hs.TLSConfig.Certificates = []tls.Certificate{cert} 676 tlsListener := tls.NewListener(lis, hs.TLSConfig) 677 whs := &wrapHS{Listener: tlsListener, s: hs, conns: make(map[net.Conn]bool)} 678 te.srv = whs 679 go hs.Serve(whs) 680 681 return lis 682 } 683 684 go s.Serve(lis) 685 return lis 686} 687 688// TODO: delete wrapHS and wrapConn when Go1.6 and Go1.7 support are gone and 689// call s.Close and s.Shutdown instead. 690type wrapHS struct { 691 sync.Mutex 692 net.Listener 693 s *http.Server 694 conns map[net.Conn]bool 695} 696 697func (w *wrapHS) Accept() (net.Conn, error) { 698 c, err := w.Listener.Accept() 699 if err != nil { 700 return nil, err 701 } 702 w.Lock() 703 if w.conns == nil { 704 w.Unlock() 705 c.Close() 706 return nil, errors.New("connection after listener closed") 707 } 708 w.conns[&wrapConn{Conn: c, hs: w}] = true 709 w.Unlock() 710 return c, nil 711} 712 713func (w *wrapHS) Stop() { 714 w.Listener.Close() 715 w.Lock() 716 conns := w.conns 717 w.conns = nil 718 w.Unlock() 719 for c := range conns { 720 c.Close() 721 } 722} 723 724// Poll for now.. 725func (w *wrapHS) GracefulStop() { 726 w.Listener.Close() 727 for { 728 w.Lock() 729 l := len(w.conns) 730 w.Unlock() 731 if l == 0 { 732 return 733 } 734 time.Sleep(50 * time.Millisecond) 735 } 736} 737 738type wrapConn struct { 739 net.Conn 740 hs *wrapHS 741} 742 743func (w *wrapConn) Close() error { 744 w.hs.Lock() 745 delete(w.hs.conns, w.Conn) 746 w.hs.Unlock() 747 return w.Conn.Close() 748} 749 750func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listenerWrapper { 751 l := te.listenAndServe(ts, listenWithConnControl) 752 return l.(*listenerWrapper) 753} 754 755// startServer starts a gRPC server listening. Callers should defer a 756// call to te.tearDown to clean up. 757func (te *test) startServer(ts testpb.TestServiceServer) { 758 te.listenAndServe(ts, net.Listen) 759} 760 761type nopCompressor struct { 762 grpc.Compressor 763} 764 765// NewNopCompressor creates a compressor to test the case that type is not supported. 766func NewNopCompressor() grpc.Compressor { 767 return &nopCompressor{grpc.NewGZIPCompressor()} 768} 769 770func (c *nopCompressor) Type() string { 771 return "nop" 772} 773 774type nopDecompressor struct { 775 grpc.Decompressor 776} 777 778// NewNopDecompressor creates a decompressor to test the case that type is not supported. 779func NewNopDecompressor() grpc.Decompressor { 780 return &nopDecompressor{grpc.NewGZIPDecompressor()} 781} 782 783func (d *nopDecompressor) Type() string { 784 return "nop" 785} 786 787func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) { 788 opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent)) 789 790 if te.sc != nil { 791 opts = append(opts, grpc.WithServiceConfig(te.sc)) 792 } 793 794 if te.clientCompression { 795 opts = append(opts, 796 grpc.WithCompressor(grpc.NewGZIPCompressor()), 797 grpc.WithDecompressor(grpc.NewGZIPDecompressor()), 798 ) 799 } 800 if te.clientUseCompression { 801 opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) 802 } 803 if te.clientNopCompression { 804 opts = append(opts, 805 grpc.WithCompressor(NewNopCompressor()), 806 grpc.WithDecompressor(NewNopDecompressor()), 807 ) 808 } 809 if te.unaryClientInt != nil { 810 opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt)) 811 } 812 if te.streamClientInt != nil { 813 opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt)) 814 } 815 if te.maxMsgSize != nil { 816 opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize)) 817 } 818 if te.maxClientReceiveMsgSize != nil { 819 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize))) 820 } 821 if te.maxClientSendMsgSize != nil { 822 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize))) 823 } 824 if te.maxClientHeaderListSize != nil { 825 opts = append(opts, grpc.WithMaxHeaderListSize(*te.maxClientHeaderListSize)) 826 } 827 switch te.e.security { 828 case "tls": 829 creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com") 830 if err != nil { 831 te.t.Fatalf("Failed to load credentials: %v", err) 832 } 833 opts = append(opts, grpc.WithTransportCredentials(creds)) 834 case "clientTimeoutCreds": 835 opts = append(opts, grpc.WithTransportCredentials(&clientTimeoutCreds{})) 836 case "empty": 837 // Don't add any transport creds option. 838 default: 839 opts = append(opts, grpc.WithInsecure()) 840 } 841 // TODO(bar) switch balancer case "pick_first". 842 var scheme string 843 if te.resolverScheme == "" { 844 scheme = "passthrough:///" 845 } else { 846 scheme = te.resolverScheme + ":///" 847 } 848 switch te.e.balancer { 849 case "v1": 850 opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil))) 851 case "round_robin": 852 opts = append(opts, grpc.WithBalancerName(roundrobin.Name)) 853 } 854 if te.clientInitialWindowSize > 0 { 855 opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) 856 } 857 if te.clientInitialConnWindowSize > 0 { 858 opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize)) 859 } 860 if te.perRPCCreds != nil { 861 opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds)) 862 } 863 if te.customCodec != nil { 864 opts = append(opts, grpc.WithDefaultCallOptions(grpc.ForceCodec(te.customCodec))) 865 } 866 if !te.nonBlockingDial && te.srvAddr != "" { 867 // Only do a blocking dial if server is up. 868 opts = append(opts, grpc.WithBlock()) 869 } 870 if te.srvAddr == "" { 871 te.srvAddr = "client.side.only.test" 872 } 873 opts = append(opts, te.customDialOptions...) 874 return opts, scheme 875} 876 877func (te *test) clientConnWithConnControl() (*grpc.ClientConn, *dialerWrapper) { 878 if te.cc != nil { 879 return te.cc, nil 880 } 881 opts, scheme := te.configDial() 882 dw := &dialerWrapper{} 883 // overwrite the dialer before 884 opts = append(opts, grpc.WithDialer(dw.dialer)) 885 var err error 886 te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...) 887 if err != nil { 888 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err) 889 } 890 return te.cc, dw 891} 892 893func (te *test) clientConn(opts ...grpc.DialOption) *grpc.ClientConn { 894 if te.cc != nil { 895 return te.cc 896 } 897 var scheme string 898 opts, scheme = te.configDial(opts...) 899 var err error 900 te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...) 901 if err != nil { 902 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err) 903 } 904 return te.cc 905} 906 907func (te *test) declareLogNoise(phrases ...string) { 908 te.restoreLogs = declareLogNoise(te.t, phrases...) 909} 910 911func (te *test) withServerTester(fn func(st *serverTester)) { 912 c, err := te.e.dialer(te.srvAddr, 10*time.Second) 913 if err != nil { 914 te.t.Fatal(err) 915 } 916 defer c.Close() 917 if te.e.security == "tls" { 918 c = tls.Client(c, &tls.Config{ 919 InsecureSkipVerify: true, 920 NextProtos: []string{http2.NextProtoTLS}, 921 }) 922 } 923 st := newServerTesterFromConn(te.t, c) 924 st.greet() 925 fn(st) 926} 927 928type lazyConn struct { 929 net.Conn 930 beLazy int32 931} 932 933func (l *lazyConn) Write(b []byte) (int, error) { 934 if atomic.LoadInt32(&(l.beLazy)) == 1 { 935 time.Sleep(time.Second) 936 } 937 return l.Conn.Write(b) 938} 939 940func (s) TestContextDeadlineNotIgnored(t *testing.T) { 941 e := noBalancerEnv 942 var lc *lazyConn 943 e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) { 944 conn, err := net.DialTimeout(network, addr, timeout) 945 if err != nil { 946 return nil, err 947 } 948 lc = &lazyConn{Conn: conn} 949 return lc, nil 950 } 951 952 te := newTest(t, e) 953 te.startServer(&testServer{security: e.security}) 954 defer te.tearDown() 955 956 cc := te.clientConn() 957 tc := testpb.NewTestServiceClient(cc) 958 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 959 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 960 } 961 atomic.StoreInt32(&(lc.beLazy), 1) 962 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 963 defer cancel() 964 t1 := time.Now() 965 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 966 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err) 967 } 968 if time.Since(t1) > 2*time.Second { 969 t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline") 970 } 971} 972 973func (s) TestTimeoutOnDeadServer(t *testing.T) { 974 for _, e := range listTestEnv() { 975 testTimeoutOnDeadServer(t, e) 976 } 977} 978 979func testTimeoutOnDeadServer(t *testing.T, e env) { 980 te := newTest(t, e) 981 te.userAgent = testAppUA 982 te.declareLogNoise( 983 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 984 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 985 "grpc: addrConn.resetTransport failed to create client transport: connection error", 986 ) 987 te.startServer(&testServer{security: e.security}) 988 defer te.tearDown() 989 990 cc := te.clientConn() 991 tc := testpb.NewTestServiceClient(cc) 992 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 993 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 994 } 995 te.srv.Stop() 996 997 // Wait for the client to notice the connection is gone. 998 ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) 999 state := cc.GetState() 1000 for ; state == connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { 1001 } 1002 cancel() 1003 if state == connectivity.Ready { 1004 t.Fatalf("Timed out waiting for non-ready state") 1005 } 1006 ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond) 1007 _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)) 1008 cancel() 1009 if e.balancer != "" && status.Code(err) != codes.DeadlineExceeded { 1010 // If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error, 1011 // the error will be an internal error. 1012 t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded) 1013 } 1014 awaitNewConnLogOutput() 1015} 1016 1017func (s) TestServerGracefulStopIdempotent(t *testing.T) { 1018 for _, e := range listTestEnv() { 1019 if e.name == "handler-tls" { 1020 continue 1021 } 1022 testServerGracefulStopIdempotent(t, e) 1023 } 1024} 1025 1026func testServerGracefulStopIdempotent(t *testing.T, e env) { 1027 te := newTest(t, e) 1028 te.userAgent = testAppUA 1029 te.startServer(&testServer{security: e.security}) 1030 defer te.tearDown() 1031 1032 for i := 0; i < 3; i++ { 1033 te.srv.GracefulStop() 1034 } 1035} 1036 1037func (s) TestServerGoAway(t *testing.T) { 1038 for _, e := range listTestEnv() { 1039 if e.name == "handler-tls" { 1040 continue 1041 } 1042 testServerGoAway(t, e) 1043 } 1044} 1045 1046func testServerGoAway(t *testing.T, e env) { 1047 te := newTest(t, e) 1048 te.userAgent = testAppUA 1049 te.startServer(&testServer{security: e.security}) 1050 defer te.tearDown() 1051 1052 cc := te.clientConn() 1053 tc := testpb.NewTestServiceClient(cc) 1054 // Finish an RPC to make sure the connection is good. 1055 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 1056 defer cancel() 1057 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 1058 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 1059 } 1060 ch := make(chan struct{}) 1061 go func() { 1062 te.srv.GracefulStop() 1063 close(ch) 1064 }() 1065 // Loop until the server side GoAway signal is propagated to the client. 1066 for { 1067 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 1068 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded { 1069 cancel() 1070 break 1071 } 1072 cancel() 1073 } 1074 // A new RPC should fail. 1075 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 1076 defer cancel() 1077 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal { 1078 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal) 1079 } 1080 <-ch 1081 awaitNewConnLogOutput() 1082} 1083 1084func (s) TestServerGoAwayPendingRPC(t *testing.T) { 1085 for _, e := range listTestEnv() { 1086 if e.name == "handler-tls" { 1087 continue 1088 } 1089 testServerGoAwayPendingRPC(t, e) 1090 } 1091} 1092 1093func testServerGoAwayPendingRPC(t *testing.T, e env) { 1094 te := newTest(t, e) 1095 te.userAgent = testAppUA 1096 te.declareLogNoise( 1097 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1098 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1099 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1100 ) 1101 te.startServer(&testServer{security: e.security}) 1102 defer te.tearDown() 1103 1104 cc := te.clientConn() 1105 tc := testpb.NewTestServiceClient(cc) 1106 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 1107 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) 1108 if err != nil { 1109 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1110 } 1111 // Finish an RPC to make sure the connection is good. 1112 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 1113 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) 1114 } 1115 ch := make(chan struct{}) 1116 go func() { 1117 te.srv.GracefulStop() 1118 close(ch) 1119 }() 1120 // Loop until the server side GoAway signal is propagated to the client. 1121 start := time.Now() 1122 errored := false 1123 for time.Since(start) < time.Second { 1124 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 1125 _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)) 1126 cancel() 1127 if err != nil { 1128 errored = true 1129 break 1130 } 1131 } 1132 if !errored { 1133 t.Fatalf("GoAway never received by client") 1134 } 1135 respParam := []*testpb.ResponseParameters{{Size: 1}} 1136 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) 1137 if err != nil { 1138 t.Fatal(err) 1139 } 1140 req := &testpb.StreamingOutputCallRequest{ 1141 ResponseType: testpb.PayloadType_COMPRESSABLE, 1142 ResponseParameters: respParam, 1143 Payload: payload, 1144 } 1145 // The existing RPC should be still good to proceed. 1146 if err := stream.Send(req); err != nil { 1147 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err) 1148 } 1149 if _, err := stream.Recv(); err != nil { 1150 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err) 1151 } 1152 // The RPC will run until canceled. 1153 cancel() 1154 <-ch 1155 awaitNewConnLogOutput() 1156} 1157 1158func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) { 1159 for _, e := range listTestEnv() { 1160 if e.name == "handler-tls" { 1161 continue 1162 } 1163 testServerMultipleGoAwayPendingRPC(t, e) 1164 } 1165} 1166 1167func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) { 1168 te := newTest(t, e) 1169 te.userAgent = testAppUA 1170 te.declareLogNoise( 1171 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1172 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1173 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1174 ) 1175 te.startServer(&testServer{security: e.security}) 1176 defer te.tearDown() 1177 1178 cc := te.clientConn() 1179 tc := testpb.NewTestServiceClient(cc) 1180 ctx, cancel := context.WithCancel(context.Background()) 1181 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) 1182 if err != nil { 1183 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1184 } 1185 // Finish an RPC to make sure the connection is good. 1186 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 1187 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) 1188 } 1189 ch1 := make(chan struct{}) 1190 go func() { 1191 te.srv.GracefulStop() 1192 close(ch1) 1193 }() 1194 ch2 := make(chan struct{}) 1195 go func() { 1196 te.srv.GracefulStop() 1197 close(ch2) 1198 }() 1199 // Loop until the server side GoAway signal is propagated to the client. 1200 for { 1201 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 1202 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 1203 cancel() 1204 break 1205 } 1206 cancel() 1207 } 1208 select { 1209 case <-ch1: 1210 t.Fatal("GracefulStop() terminated early") 1211 case <-ch2: 1212 t.Fatal("GracefulStop() terminated early") 1213 default: 1214 } 1215 respParam := []*testpb.ResponseParameters{ 1216 { 1217 Size: 1, 1218 }, 1219 } 1220 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) 1221 if err != nil { 1222 t.Fatal(err) 1223 } 1224 req := &testpb.StreamingOutputCallRequest{ 1225 ResponseType: testpb.PayloadType_COMPRESSABLE, 1226 ResponseParameters: respParam, 1227 Payload: payload, 1228 } 1229 // The existing RPC should be still good to proceed. 1230 if err := stream.Send(req); err != nil { 1231 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 1232 } 1233 if _, err := stream.Recv(); err != nil { 1234 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err) 1235 } 1236 if err := stream.CloseSend(); err != nil { 1237 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err) 1238 } 1239 <-ch1 1240 <-ch2 1241 cancel() 1242 awaitNewConnLogOutput() 1243} 1244 1245func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) { 1246 for _, e := range listTestEnv() { 1247 if e.name == "handler-tls" { 1248 continue 1249 } 1250 testConcurrentClientConnCloseAndServerGoAway(t, e) 1251 } 1252} 1253 1254func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) { 1255 te := newTest(t, e) 1256 te.userAgent = testAppUA 1257 te.declareLogNoise( 1258 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1259 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1260 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1261 ) 1262 te.startServer(&testServer{security: e.security}) 1263 defer te.tearDown() 1264 1265 cc := te.clientConn() 1266 tc := testpb.NewTestServiceClient(cc) 1267 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 1268 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) 1269 } 1270 ch := make(chan struct{}) 1271 // Close ClientConn and Server concurrently. 1272 go func() { 1273 te.srv.GracefulStop() 1274 close(ch) 1275 }() 1276 go func() { 1277 cc.Close() 1278 }() 1279 <-ch 1280} 1281 1282func (s) TestConcurrentServerStopAndGoAway(t *testing.T) { 1283 for _, e := range listTestEnv() { 1284 if e.name == "handler-tls" { 1285 continue 1286 } 1287 testConcurrentServerStopAndGoAway(t, e) 1288 } 1289} 1290 1291func testConcurrentServerStopAndGoAway(t *testing.T, e env) { 1292 te := newTest(t, e) 1293 te.userAgent = testAppUA 1294 te.declareLogNoise( 1295 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1296 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1297 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1298 ) 1299 te.startServer(&testServer{security: e.security}) 1300 defer te.tearDown() 1301 1302 cc := te.clientConn() 1303 tc := testpb.NewTestServiceClient(cc) 1304 stream, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)) 1305 if err != nil { 1306 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1307 } 1308 // Finish an RPC to make sure the connection is good. 1309 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 1310 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err) 1311 } 1312 ch := make(chan struct{}) 1313 go func() { 1314 te.srv.GracefulStop() 1315 close(ch) 1316 }() 1317 // Loop until the server side GoAway signal is propagated to the client. 1318 for { 1319 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 1320 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 1321 cancel() 1322 break 1323 } 1324 cancel() 1325 } 1326 // Stop the server and close all the connections. 1327 te.srv.Stop() 1328 respParam := []*testpb.ResponseParameters{ 1329 { 1330 Size: 1, 1331 }, 1332 } 1333 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) 1334 if err != nil { 1335 t.Fatal(err) 1336 } 1337 req := &testpb.StreamingOutputCallRequest{ 1338 ResponseType: testpb.PayloadType_COMPRESSABLE, 1339 ResponseParameters: respParam, 1340 Payload: payload, 1341 } 1342 sendStart := time.Now() 1343 for { 1344 if err := stream.Send(req); err == io.EOF { 1345 // stream.Send should eventually send io.EOF 1346 break 1347 } else if err != nil { 1348 // Send should never return a transport-level error. 1349 t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err) 1350 } 1351 if time.Since(sendStart) > 2*time.Second { 1352 t.Fatalf("stream.Send(_) did not return io.EOF after 2s") 1353 } 1354 time.Sleep(time.Millisecond) 1355 } 1356 if _, err := stream.Recv(); err == nil || err == io.EOF { 1357 t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err) 1358 } 1359 <-ch 1360 awaitNewConnLogOutput() 1361} 1362 1363func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) { 1364 for _, e := range listTestEnv() { 1365 if e.name == "handler-tls" { 1366 continue 1367 } 1368 testClientConnCloseAfterGoAwayWithActiveStream(t, e) 1369 } 1370} 1371 1372func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) { 1373 te := newTest(t, e) 1374 te.startServer(&testServer{security: e.security}) 1375 defer te.tearDown() 1376 cc := te.clientConn() 1377 tc := testpb.NewTestServiceClient(cc) 1378 1379 ctx, cancel := context.WithCancel(context.Background()) 1380 defer cancel() 1381 if _, err := tc.FullDuplexCall(ctx); err != nil { 1382 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err) 1383 } 1384 done := make(chan struct{}) 1385 go func() { 1386 te.srv.GracefulStop() 1387 close(done) 1388 }() 1389 time.Sleep(50 * time.Millisecond) 1390 cc.Close() 1391 timeout := time.NewTimer(time.Second) 1392 select { 1393 case <-done: 1394 case <-timeout.C: 1395 t.Fatalf("Test timed-out.") 1396 } 1397} 1398 1399func (s) TestFailFast(t *testing.T) { 1400 for _, e := range listTestEnv() { 1401 testFailFast(t, e) 1402 } 1403} 1404 1405func testFailFast(t *testing.T, e env) { 1406 te := newTest(t, e) 1407 te.userAgent = testAppUA 1408 te.declareLogNoise( 1409 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1410 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1411 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1412 ) 1413 te.startServer(&testServer{security: e.security}) 1414 defer te.tearDown() 1415 1416 cc := te.clientConn() 1417 tc := testpb.NewTestServiceClient(cc) 1418 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 1419 defer cancel() 1420 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { 1421 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 1422 } 1423 // Stop the server and tear down all the existing connections. 1424 te.srv.Stop() 1425 // Loop until the server teardown is propagated to the client. 1426 for { 1427 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 1428 _, err := tc.EmptyCall(ctx, &testpb.Empty{}) 1429 cancel() 1430 if status.Code(err) == codes.Unavailable { 1431 break 1432 } 1433 t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err) 1434 time.Sleep(10 * time.Millisecond) 1435 } 1436 // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable. 1437 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { 1438 t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable) 1439 } 1440 if _, err := tc.StreamingInputCall(context.Background()); status.Code(err) != codes.Unavailable { 1441 t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable) 1442 } 1443 1444 awaitNewConnLogOutput() 1445} 1446 1447func testServiceConfigSetup(t *testing.T, e env) *test { 1448 te := newTest(t, e) 1449 te.userAgent = testAppUA 1450 te.declareLogNoise( 1451 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 1452 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 1453 "grpc: addrConn.resetTransport failed to create client transport: connection error", 1454 "Failed to dial : context canceled; please retry.", 1455 ) 1456 return te 1457} 1458 1459func newBool(b bool) (a *bool) { 1460 return &b 1461} 1462 1463func newInt(b int) (a *int) { 1464 return &b 1465} 1466 1467func newDuration(b time.Duration) (a *time.Duration) { 1468 a = new(time.Duration) 1469 *a = b 1470 return 1471} 1472 1473func (s) TestGetMethodConfig(t *testing.T) { 1474 te := testServiceConfigSetup(t, tcpClearRREnv) 1475 defer te.tearDown() 1476 r, rcleanup := manual.GenerateAndRegisterManualResolver() 1477 defer rcleanup() 1478 1479 te.resolverScheme = r.Scheme() 1480 cc := te.clientConn() 1481 addrs := []resolver.Address{{Addr: te.srvAddr}} 1482 r.UpdateState(resolver.State{ 1483 Addresses: addrs, 1484 ServiceConfig: `{ 1485 "methodConfig": [ 1486 { 1487 "name": [ 1488 { 1489 "service": "grpc.testing.TestService", 1490 "method": "EmptyCall" 1491 } 1492 ], 1493 "waitForReady": true, 1494 "timeout": ".001s" 1495 }, 1496 { 1497 "name": [ 1498 { 1499 "service": "grpc.testing.TestService" 1500 } 1501 ], 1502 "waitForReady": false 1503 } 1504 ] 1505}`}) 1506 1507 tc := testpb.NewTestServiceClient(cc) 1508 1509 // Make sure service config has been processed by grpc. 1510 for { 1511 if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { 1512 break 1513 } 1514 time.Sleep(time.Millisecond) 1515 } 1516 1517 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 1518 var err error 1519 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 1520 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1521 } 1522 1523 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: `{ 1524 "methodConfig": [ 1525 { 1526 "name": [ 1527 { 1528 "service": "grpc.testing.TestService", 1529 "method": "UnaryCall" 1530 } 1531 ], 1532 "waitForReady": true, 1533 "timeout": ".001s" 1534 }, 1535 { 1536 "name": [ 1537 { 1538 "service": "grpc.testing.TestService" 1539 } 1540 ], 1541 "waitForReady": false 1542 } 1543 ] 1544}`}) 1545 1546 // Make sure service config has been processed by grpc. 1547 for { 1548 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady { 1549 break 1550 } 1551 time.Sleep(time.Millisecond) 1552 } 1553 // The following RPCs are expected to become fail-fast. 1554 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { 1555 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) 1556 } 1557} 1558 1559func (s) TestServiceConfigWaitForReady(t *testing.T) { 1560 te := testServiceConfigSetup(t, tcpClearRREnv) 1561 defer te.tearDown() 1562 r, rcleanup := manual.GenerateAndRegisterManualResolver() 1563 defer rcleanup() 1564 1565 // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds. 1566 te.resolverScheme = r.Scheme() 1567 cc := te.clientConn() 1568 addrs := []resolver.Address{{Addr: te.srvAddr}} 1569 r.UpdateState(resolver.State{ 1570 Addresses: addrs, 1571 ServiceConfig: `{ 1572 "methodConfig": [ 1573 { 1574 "name": [ 1575 { 1576 "service": "grpc.testing.TestService", 1577 "method": "EmptyCall" 1578 }, 1579 { 1580 "service": "grpc.testing.TestService", 1581 "method": "FullDuplexCall" 1582 } 1583 ], 1584 "waitForReady": false, 1585 "timeout": ".001s" 1586 } 1587 ] 1588}`}) 1589 1590 tc := testpb.NewTestServiceClient(cc) 1591 1592 // Make sure service config has been processed by grpc. 1593 for { 1594 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil { 1595 break 1596 } 1597 time.Sleep(time.Millisecond) 1598 } 1599 1600 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 1601 var err error 1602 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 1603 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1604 } 1605 if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 1606 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 1607 } 1608 1609 // Generate a service config update. 1610 // Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. 1611 r.UpdateState(resolver.State{ 1612 Addresses: addrs, 1613 ServiceConfig: `{ 1614 "methodConfig": [ 1615 { 1616 "name": [ 1617 { 1618 "service": "grpc.testing.TestService", 1619 "method": "EmptyCall" 1620 }, 1621 { 1622 "service": "grpc.testing.TestService", 1623 "method": "FullDuplexCall" 1624 } 1625 ], 1626 "waitForReady": true, 1627 "timeout": ".001s" 1628 } 1629 ] 1630}`}) 1631 1632 // Wait for the new service config to take effect. 1633 for { 1634 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady { 1635 break 1636 } 1637 time.Sleep(time.Millisecond) 1638 } 1639 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 1640 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 1641 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1642 } 1643 if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded { 1644 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 1645 } 1646} 1647 1648func (s) TestServiceConfigTimeout(t *testing.T) { 1649 te := testServiceConfigSetup(t, tcpClearRREnv) 1650 defer te.tearDown() 1651 r, rcleanup := manual.GenerateAndRegisterManualResolver() 1652 defer rcleanup() 1653 1654 // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. 1655 te.resolverScheme = r.Scheme() 1656 cc := te.clientConn() 1657 addrs := []resolver.Address{{Addr: te.srvAddr}} 1658 r.UpdateState(resolver.State{ 1659 Addresses: addrs, 1660 ServiceConfig: `{ 1661 "methodConfig": [ 1662 { 1663 "name": [ 1664 { 1665 "service": "grpc.testing.TestService", 1666 "method": "EmptyCall" 1667 }, 1668 { 1669 "service": "grpc.testing.TestService", 1670 "method": "FullDuplexCall" 1671 } 1672 ], 1673 "waitForReady": true, 1674 "timeout": "3600s" 1675 } 1676 ] 1677}`}) 1678 1679 tc := testpb.NewTestServiceClient(cc) 1680 1681 // Make sure service config has been processed by grpc. 1682 for { 1683 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil { 1684 break 1685 } 1686 time.Sleep(time.Millisecond) 1687 } 1688 1689 // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. 1690 var err error 1691 ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) 1692 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 1693 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1694 } 1695 cancel() 1696 1697 ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) 1698 if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 1699 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 1700 } 1701 cancel() 1702 1703 // Generate a service config update. 1704 // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. 1705 r.UpdateState(resolver.State{ 1706 Addresses: addrs, 1707 ServiceConfig: `{ 1708 "methodConfig": [ 1709 { 1710 "name": [ 1711 { 1712 "service": "grpc.testing.TestService", 1713 "method": "EmptyCall" 1714 }, 1715 { 1716 "service": "grpc.testing.TestService", 1717 "method": "FullDuplexCall" 1718 } 1719 ], 1720 "waitForReady": true, 1721 "timeout": ".000000001s" 1722 } 1723 ] 1724}`}) 1725 1726 // Wait for the new service config to take effect. 1727 for { 1728 if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond { 1729 break 1730 } 1731 time.Sleep(time.Millisecond) 1732 } 1733 1734 ctx, cancel = context.WithTimeout(context.Background(), time.Hour) 1735 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 1736 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 1737 } 1738 cancel() 1739 1740 ctx, cancel = context.WithTimeout(context.Background(), time.Hour) 1741 if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 1742 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 1743 } 1744 cancel() 1745} 1746 1747func (s) TestServiceConfigMaxMsgSize(t *testing.T) { 1748 e := tcpClearRREnv 1749 r, rcleanup := manual.GenerateAndRegisterManualResolver() 1750 defer rcleanup() 1751 1752 // Setting up values and objects shared across all test cases. 1753 const smallSize = 1 1754 const largeSize = 1024 1755 const extraLargeSize = 2048 1756 1757 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 1758 if err != nil { 1759 t.Fatal(err) 1760 } 1761 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 1762 if err != nil { 1763 t.Fatal(err) 1764 } 1765 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) 1766 if err != nil { 1767 t.Fatal(err) 1768 } 1769 1770 scjs := `{ 1771 "methodConfig": [ 1772 { 1773 "name": [ 1774 { 1775 "service": "grpc.testing.TestService", 1776 "method": "UnaryCall" 1777 }, 1778 { 1779 "service": "grpc.testing.TestService", 1780 "method": "FullDuplexCall" 1781 } 1782 ], 1783 "maxRequestMessageBytes": 2048, 1784 "maxResponseMessageBytes": 2048 1785 } 1786 ] 1787}` 1788 1789 // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 1790 te1 := testServiceConfigSetup(t, e) 1791 defer te1.tearDown() 1792 1793 te1.resolverScheme = r.Scheme() 1794 te1.nonBlockingDial = true 1795 te1.startServer(&testServer{security: e.security}) 1796 cc1 := te1.clientConn() 1797 1798 addrs := []resolver.Address{{Addr: te1.srvAddr}} 1799 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: scjs}) 1800 tc := testpb.NewTestServiceClient(cc1) 1801 1802 req := &testpb.SimpleRequest{ 1803 ResponseType: testpb.PayloadType_COMPRESSABLE, 1804 ResponseSize: int32(extraLargeSize), 1805 Payload: smallPayload, 1806 } 1807 1808 for { 1809 if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { 1810 break 1811 } 1812 time.Sleep(time.Millisecond) 1813 } 1814 1815 // Test for unary RPC recv. 1816 if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted { 1817 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1818 } 1819 1820 // Test for unary RPC send. 1821 req.Payload = extraLargePayload 1822 req.ResponseSize = int32(smallSize) 1823 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 1824 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1825 } 1826 1827 // Test for streaming RPC recv. 1828 respParam := []*testpb.ResponseParameters{ 1829 { 1830 Size: int32(extraLargeSize), 1831 }, 1832 } 1833 sreq := &testpb.StreamingOutputCallRequest{ 1834 ResponseType: testpb.PayloadType_COMPRESSABLE, 1835 ResponseParameters: respParam, 1836 Payload: smallPayload, 1837 } 1838 stream, err := tc.FullDuplexCall(te1.ctx) 1839 if err != nil { 1840 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1841 } 1842 if err = stream.Send(sreq); err != nil { 1843 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1844 } 1845 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 1846 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 1847 } 1848 1849 // Test for streaming RPC send. 1850 respParam[0].Size = int32(smallSize) 1851 sreq.Payload = extraLargePayload 1852 stream, err = tc.FullDuplexCall(te1.ctx) 1853 if err != nil { 1854 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1855 } 1856 if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 1857 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 1858 } 1859 1860 // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 1861 te2 := testServiceConfigSetup(t, e) 1862 te2.resolverScheme = r.Scheme() 1863 te2.nonBlockingDial = true 1864 te2.maxClientReceiveMsgSize = newInt(1024) 1865 te2.maxClientSendMsgSize = newInt(1024) 1866 1867 te2.startServer(&testServer{security: e.security}) 1868 defer te2.tearDown() 1869 cc2 := te2.clientConn() 1870 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: scjs}) 1871 tc = testpb.NewTestServiceClient(cc2) 1872 1873 for { 1874 if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { 1875 break 1876 } 1877 time.Sleep(time.Millisecond) 1878 } 1879 1880 // Test for unary RPC recv. 1881 req.Payload = smallPayload 1882 req.ResponseSize = int32(largeSize) 1883 1884 if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted { 1885 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1886 } 1887 1888 // Test for unary RPC send. 1889 req.Payload = largePayload 1890 req.ResponseSize = int32(smallSize) 1891 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 1892 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1893 } 1894 1895 // Test for streaming RPC recv. 1896 stream, err = tc.FullDuplexCall(te2.ctx) 1897 respParam[0].Size = int32(largeSize) 1898 sreq.Payload = smallPayload 1899 if err != nil { 1900 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1901 } 1902 if err = stream.Send(sreq); err != nil { 1903 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1904 } 1905 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 1906 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 1907 } 1908 1909 // Test for streaming RPC send. 1910 respParam[0].Size = int32(smallSize) 1911 sreq.Payload = largePayload 1912 stream, err = tc.FullDuplexCall(te2.ctx) 1913 if err != nil { 1914 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1915 } 1916 if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 1917 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 1918 } 1919 1920 // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 1921 te3 := testServiceConfigSetup(t, e) 1922 te3.resolverScheme = r.Scheme() 1923 te3.nonBlockingDial = true 1924 te3.maxClientReceiveMsgSize = newInt(4096) 1925 te3.maxClientSendMsgSize = newInt(4096) 1926 1927 te3.startServer(&testServer{security: e.security}) 1928 defer te3.tearDown() 1929 1930 cc3 := te3.clientConn() 1931 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: scjs}) 1932 tc = testpb.NewTestServiceClient(cc3) 1933 1934 for { 1935 if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { 1936 break 1937 } 1938 time.Sleep(time.Millisecond) 1939 } 1940 1941 // Test for unary RPC recv. 1942 req.Payload = smallPayload 1943 req.ResponseSize = int32(largeSize) 1944 1945 if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err != nil { 1946 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) 1947 } 1948 1949 req.ResponseSize = int32(extraLargeSize) 1950 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 1951 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1952 } 1953 1954 // Test for unary RPC send. 1955 req.Payload = largePayload 1956 req.ResponseSize = int32(smallSize) 1957 if _, err := tc.UnaryCall(context.Background(), req); err != nil { 1958 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) 1959 } 1960 1961 req.Payload = extraLargePayload 1962 if _, err = tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 1963 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 1964 } 1965 1966 // Test for streaming RPC recv. 1967 stream, err = tc.FullDuplexCall(te3.ctx) 1968 if err != nil { 1969 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1970 } 1971 respParam[0].Size = int32(largeSize) 1972 sreq.Payload = smallPayload 1973 1974 if err = stream.Send(sreq); err != nil { 1975 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1976 } 1977 if _, err = stream.Recv(); err != nil { 1978 t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err) 1979 } 1980 1981 respParam[0].Size = int32(extraLargeSize) 1982 1983 if err = stream.Send(sreq); err != nil { 1984 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1985 } 1986 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 1987 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 1988 } 1989 1990 // Test for streaming RPC send. 1991 respParam[0].Size = int32(smallSize) 1992 sreq.Payload = largePayload 1993 stream, err = tc.FullDuplexCall(te3.ctx) 1994 if err != nil { 1995 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 1996 } 1997 if err := stream.Send(sreq); err != nil { 1998 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 1999 } 2000 sreq.Payload = extraLargePayload 2001 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 2002 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 2003 } 2004} 2005 2006// Reading from a streaming RPC may fail with context canceled if timeout was 2007// set by service config (https://github.com/grpc/grpc-go/issues/1818). This 2008// test makes sure read from streaming RPC doesn't fail in this case. 2009func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) { 2010 te := testServiceConfigSetup(t, tcpClearRREnv) 2011 te.startServer(&testServer{security: tcpClearRREnv.security}) 2012 defer te.tearDown() 2013 r, rcleanup := manual.GenerateAndRegisterManualResolver() 2014 defer rcleanup() 2015 2016 te.resolverScheme = r.Scheme() 2017 te.nonBlockingDial = true 2018 cc := te.clientConn() 2019 tc := testpb.NewTestServiceClient(cc) 2020 2021 r.UpdateState(resolver.State{ 2022 Addresses: []resolver.Address{{Addr: te.srvAddr}}, 2023 ServiceConfig: `{ 2024 "methodConfig": [ 2025 { 2026 "name": [ 2027 { 2028 "service": "grpc.testing.TestService", 2029 "method": "FullDuplexCall" 2030 } 2031 ], 2032 "waitForReady": true, 2033 "timeout": "10s" 2034 } 2035 ] 2036 }`}) 2037 // Make sure service config has been processed by grpc. 2038 for { 2039 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil { 2040 break 2041 } 2042 time.Sleep(time.Millisecond) 2043 } 2044 2045 ctx, cancel := context.WithCancel(context.Background()) 2046 defer cancel() 2047 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) 2048 if err != nil { 2049 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err) 2050 } 2051 2052 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0) 2053 if err != nil { 2054 t.Fatalf("failed to newPayload: %v", err) 2055 } 2056 req := &testpb.StreamingOutputCallRequest{ 2057 ResponseType: testpb.PayloadType_COMPRESSABLE, 2058 ResponseParameters: []*testpb.ResponseParameters{{Size: 0}}, 2059 Payload: payload, 2060 } 2061 if err := stream.Send(req); err != nil { 2062 t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err) 2063 } 2064 stream.CloseSend() 2065 time.Sleep(time.Second) 2066 // Sleep 1 second before recv to make sure the final status is received 2067 // before the recv. 2068 if _, err := stream.Recv(); err != nil { 2069 t.Fatalf("stream.Recv = _, %v, want _, <nil>", err) 2070 } 2071 // Keep reading to drain the stream. 2072 for { 2073 if _, err := stream.Recv(); err != nil { 2074 break 2075 } 2076 } 2077} 2078 2079func (s) TestMaxMsgSizeClientDefault(t *testing.T) { 2080 for _, e := range listTestEnv() { 2081 testMaxMsgSizeClientDefault(t, e) 2082 } 2083} 2084 2085func testMaxMsgSizeClientDefault(t *testing.T, e env) { 2086 te := newTest(t, e) 2087 te.userAgent = testAppUA 2088 te.declareLogNoise( 2089 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2090 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2091 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2092 "Failed to dial : context canceled; please retry.", 2093 ) 2094 te.startServer(&testServer{security: e.security}) 2095 2096 defer te.tearDown() 2097 tc := testpb.NewTestServiceClient(te.clientConn()) 2098 2099 const smallSize = 1 2100 const largeSize = 4 * 1024 * 1024 2101 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2102 if err != nil { 2103 t.Fatal(err) 2104 } 2105 req := &testpb.SimpleRequest{ 2106 ResponseType: testpb.PayloadType_COMPRESSABLE, 2107 ResponseSize: int32(largeSize), 2108 Payload: smallPayload, 2109 } 2110 // Test for unary RPC recv. 2111 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2112 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2113 } 2114 2115 respParam := []*testpb.ResponseParameters{ 2116 { 2117 Size: int32(largeSize), 2118 }, 2119 } 2120 sreq := &testpb.StreamingOutputCallRequest{ 2121 ResponseType: testpb.PayloadType_COMPRESSABLE, 2122 ResponseParameters: respParam, 2123 Payload: smallPayload, 2124 } 2125 2126 // Test for streaming RPC recv. 2127 stream, err := tc.FullDuplexCall(te.ctx) 2128 if err != nil { 2129 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2130 } 2131 if err := stream.Send(sreq); err != nil { 2132 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2133 } 2134 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2135 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2136 } 2137} 2138 2139func (s) TestMaxMsgSizeClientAPI(t *testing.T) { 2140 for _, e := range listTestEnv() { 2141 testMaxMsgSizeClientAPI(t, e) 2142 } 2143} 2144 2145func testMaxMsgSizeClientAPI(t *testing.T, e env) { 2146 te := newTest(t, e) 2147 te.userAgent = testAppUA 2148 // To avoid error on server side. 2149 te.maxServerSendMsgSize = newInt(5 * 1024 * 1024) 2150 te.maxClientReceiveMsgSize = newInt(1024) 2151 te.maxClientSendMsgSize = newInt(1024) 2152 te.declareLogNoise( 2153 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2154 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2155 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2156 "Failed to dial : context canceled; please retry.", 2157 ) 2158 te.startServer(&testServer{security: e.security}) 2159 2160 defer te.tearDown() 2161 tc := testpb.NewTestServiceClient(te.clientConn()) 2162 2163 const smallSize = 1 2164 const largeSize = 1024 2165 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2166 if err != nil { 2167 t.Fatal(err) 2168 } 2169 2170 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 2171 if err != nil { 2172 t.Fatal(err) 2173 } 2174 req := &testpb.SimpleRequest{ 2175 ResponseType: testpb.PayloadType_COMPRESSABLE, 2176 ResponseSize: int32(largeSize), 2177 Payload: smallPayload, 2178 } 2179 // Test for unary RPC recv. 2180 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2181 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2182 } 2183 2184 // Test for unary RPC send. 2185 req.Payload = largePayload 2186 req.ResponseSize = int32(smallSize) 2187 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2188 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2189 } 2190 2191 respParam := []*testpb.ResponseParameters{ 2192 { 2193 Size: int32(largeSize), 2194 }, 2195 } 2196 sreq := &testpb.StreamingOutputCallRequest{ 2197 ResponseType: testpb.PayloadType_COMPRESSABLE, 2198 ResponseParameters: respParam, 2199 Payload: smallPayload, 2200 } 2201 2202 // Test for streaming RPC recv. 2203 stream, err := tc.FullDuplexCall(te.ctx) 2204 if err != nil { 2205 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2206 } 2207 if err := stream.Send(sreq); err != nil { 2208 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2209 } 2210 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2211 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2212 } 2213 2214 // Test for streaming RPC send. 2215 respParam[0].Size = int32(smallSize) 2216 sreq.Payload = largePayload 2217 stream, err = tc.FullDuplexCall(te.ctx) 2218 if err != nil { 2219 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2220 } 2221 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 2222 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 2223 } 2224} 2225 2226func (s) TestMaxMsgSizeServerAPI(t *testing.T) { 2227 for _, e := range listTestEnv() { 2228 testMaxMsgSizeServerAPI(t, e) 2229 } 2230} 2231 2232func testMaxMsgSizeServerAPI(t *testing.T, e env) { 2233 te := newTest(t, e) 2234 te.userAgent = testAppUA 2235 te.maxServerReceiveMsgSize = newInt(1024) 2236 te.maxServerSendMsgSize = newInt(1024) 2237 te.declareLogNoise( 2238 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2239 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2240 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2241 "Failed to dial : context canceled; please retry.", 2242 ) 2243 te.startServer(&testServer{security: e.security}) 2244 2245 defer te.tearDown() 2246 tc := testpb.NewTestServiceClient(te.clientConn()) 2247 2248 const smallSize = 1 2249 const largeSize = 1024 2250 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2251 if err != nil { 2252 t.Fatal(err) 2253 } 2254 2255 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 2256 if err != nil { 2257 t.Fatal(err) 2258 } 2259 req := &testpb.SimpleRequest{ 2260 ResponseType: testpb.PayloadType_COMPRESSABLE, 2261 ResponseSize: int32(largeSize), 2262 Payload: smallPayload, 2263 } 2264 // Test for unary RPC send. 2265 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2266 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2267 } 2268 2269 // Test for unary RPC recv. 2270 req.Payload = largePayload 2271 req.ResponseSize = int32(smallSize) 2272 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2273 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2274 } 2275 2276 respParam := []*testpb.ResponseParameters{ 2277 { 2278 Size: int32(largeSize), 2279 }, 2280 } 2281 sreq := &testpb.StreamingOutputCallRequest{ 2282 ResponseType: testpb.PayloadType_COMPRESSABLE, 2283 ResponseParameters: respParam, 2284 Payload: smallPayload, 2285 } 2286 2287 // Test for streaming RPC send. 2288 stream, err := tc.FullDuplexCall(te.ctx) 2289 if err != nil { 2290 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2291 } 2292 if err := stream.Send(sreq); err != nil { 2293 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2294 } 2295 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2296 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2297 } 2298 2299 // Test for streaming RPC recv. 2300 respParam[0].Size = int32(smallSize) 2301 sreq.Payload = largePayload 2302 stream, err = tc.FullDuplexCall(te.ctx) 2303 if err != nil { 2304 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2305 } 2306 if err := stream.Send(sreq); err != nil { 2307 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2308 } 2309 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2310 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2311 } 2312} 2313 2314func (s) TestTap(t *testing.T) { 2315 for _, e := range listTestEnv() { 2316 if e.name == "handler-tls" { 2317 continue 2318 } 2319 testTap(t, e) 2320 } 2321} 2322 2323type myTap struct { 2324 cnt int 2325} 2326 2327func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) { 2328 if info != nil { 2329 if info.FullMethodName == "/grpc.testing.TestService/EmptyCall" { 2330 t.cnt++ 2331 } else if info.FullMethodName == "/grpc.testing.TestService/UnaryCall" { 2332 return nil, fmt.Errorf("tap error") 2333 } 2334 } 2335 return ctx, nil 2336} 2337 2338func testTap(t *testing.T, e env) { 2339 te := newTest(t, e) 2340 te.userAgent = testAppUA 2341 ttap := &myTap{} 2342 te.tapHandle = ttap.handle 2343 te.declareLogNoise( 2344 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 2345 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 2346 "grpc: addrConn.resetTransport failed to create client transport: connection error", 2347 ) 2348 te.startServer(&testServer{security: e.security}) 2349 defer te.tearDown() 2350 2351 cc := te.clientConn() 2352 tc := testpb.NewTestServiceClient(cc) 2353 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 2354 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 2355 } 2356 if ttap.cnt != 1 { 2357 t.Fatalf("Get the count in ttap %d, want 1", ttap.cnt) 2358 } 2359 2360 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31) 2361 if err != nil { 2362 t.Fatal(err) 2363 } 2364 2365 req := &testpb.SimpleRequest{ 2366 ResponseType: testpb.PayloadType_COMPRESSABLE, 2367 ResponseSize: 45, 2368 Payload: payload, 2369 } 2370 if _, err := tc.UnaryCall(context.Background(), req); status.Code(err) != codes.Unavailable { 2371 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) 2372 } 2373} 2374 2375func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { 2376 ctx, cancel := context.WithTimeout(context.Background(), d) 2377 defer cancel() 2378 hc := healthgrpc.NewHealthClient(cc) 2379 req := &healthpb.HealthCheckRequest{ 2380 Service: serviceName, 2381 } 2382 return hc.Check(ctx, req) 2383} 2384 2385func (s) TestHealthCheckOnSuccess(t *testing.T) { 2386 for _, e := range listTestEnv() { 2387 testHealthCheckOnSuccess(t, e) 2388 } 2389} 2390 2391func testHealthCheckOnSuccess(t *testing.T, e env) { 2392 te := newTest(t, e) 2393 hs := health.NewServer() 2394 hs.SetServingStatus("grpc.health.v1.Health", 1) 2395 te.healthServer = hs 2396 te.startServer(&testServer{security: e.security}) 2397 defer te.tearDown() 2398 2399 cc := te.clientConn() 2400 if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); err != nil { 2401 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) 2402 } 2403} 2404 2405func (s) TestHealthCheckOnFailure(t *testing.T) { 2406 for _, e := range listTestEnv() { 2407 testHealthCheckOnFailure(t, e) 2408 } 2409} 2410 2411func testHealthCheckOnFailure(t *testing.T, e env) { 2412 te := newTest(t, e) 2413 te.declareLogNoise( 2414 "Failed to dial ", 2415 "grpc: the client connection is closing; please retry", 2416 ) 2417 hs := health.NewServer() 2418 hs.SetServingStatus("grpc.health.v1.HealthCheck", 1) 2419 te.healthServer = hs 2420 te.startServer(&testServer{security: e.security}) 2421 defer te.tearDown() 2422 2423 cc := te.clientConn() 2424 wantErr := status.Error(codes.DeadlineExceeded, "context deadline exceeded") 2425 if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) { 2426 t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.DeadlineExceeded) 2427 } 2428 awaitNewConnLogOutput() 2429} 2430 2431func (s) TestHealthCheckOff(t *testing.T) { 2432 for _, e := range listTestEnv() { 2433 // TODO(bradfitz): Temporarily skip this env due to #619. 2434 if e.name == "handler-tls" { 2435 continue 2436 } 2437 testHealthCheckOff(t, e) 2438 } 2439} 2440 2441func testHealthCheckOff(t *testing.T, e env) { 2442 te := newTest(t, e) 2443 te.startServer(&testServer{security: e.security}) 2444 defer te.tearDown() 2445 want := status.Error(codes.Unimplemented, "unknown service grpc.health.v1.Health") 2446 if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) { 2447 t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want) 2448 } 2449} 2450 2451func (s) TestHealthWatchMultipleClients(t *testing.T) { 2452 for _, e := range listTestEnv() { 2453 testHealthWatchMultipleClients(t, e) 2454 } 2455} 2456 2457func testHealthWatchMultipleClients(t *testing.T, e env) { 2458 const service = "grpc.health.v1.Health1" 2459 2460 hs := health.NewServer() 2461 2462 te := newTest(t, e) 2463 te.healthServer = hs 2464 te.startServer(&testServer{security: e.security}) 2465 defer te.tearDown() 2466 2467 cc := te.clientConn() 2468 hc := healthgrpc.NewHealthClient(cc) 2469 2470 ctx, cancel := context.WithCancel(context.Background()) 2471 defer cancel() 2472 2473 req := &healthpb.HealthCheckRequest{ 2474 Service: service, 2475 } 2476 2477 stream1, err := hc.Watch(ctx, req) 2478 if err != nil { 2479 t.Fatalf("error: %v", err) 2480 } 2481 2482 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2483 2484 stream2, err := hc.Watch(ctx, req) 2485 if err != nil { 2486 t.Fatalf("error: %v", err) 2487 } 2488 2489 healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2490 2491 hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING) 2492 2493 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING) 2494 healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING) 2495} 2496 2497func (s) TestHealthWatchSameStatus(t *testing.T) { 2498 for _, e := range listTestEnv() { 2499 testHealthWatchSameStatus(t, e) 2500 } 2501} 2502 2503func testHealthWatchSameStatus(t *testing.T, e env) { 2504 const service = "grpc.health.v1.Health1" 2505 2506 hs := health.NewServer() 2507 2508 te := newTest(t, e) 2509 te.healthServer = hs 2510 te.startServer(&testServer{security: e.security}) 2511 defer te.tearDown() 2512 2513 cc := te.clientConn() 2514 hc := healthgrpc.NewHealthClient(cc) 2515 2516 ctx, cancel := context.WithCancel(context.Background()) 2517 defer cancel() 2518 2519 req := &healthpb.HealthCheckRequest{ 2520 Service: service, 2521 } 2522 2523 stream1, err := hc.Watch(ctx, req) 2524 if err != nil { 2525 t.Fatalf("error: %v", err) 2526 } 2527 2528 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2529 2530 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2531 2532 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVING) 2533 2534 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2535 hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING) 2536 2537 healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING) 2538} 2539 2540func (s) TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) { 2541 for _, e := range listTestEnv() { 2542 testHealthWatchSetServiceStatusBeforeStartingServer(t, e) 2543 } 2544} 2545 2546func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) { 2547 const service = "grpc.health.v1.Health1" 2548 2549 hs := health.NewServer() 2550 2551 te := newTest(t, e) 2552 te.healthServer = hs 2553 2554 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2555 2556 te.startServer(&testServer{security: e.security}) 2557 defer te.tearDown() 2558 2559 cc := te.clientConn() 2560 hc := healthgrpc.NewHealthClient(cc) 2561 2562 ctx, cancel := context.WithCancel(context.Background()) 2563 defer cancel() 2564 2565 req := &healthpb.HealthCheckRequest{ 2566 Service: service, 2567 } 2568 2569 stream, err := hc.Watch(ctx, req) 2570 if err != nil { 2571 t.Fatalf("error: %v", err) 2572 } 2573 2574 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2575} 2576 2577func (s) TestHealthWatchDefaultStatusChange(t *testing.T) { 2578 for _, e := range listTestEnv() { 2579 testHealthWatchDefaultStatusChange(t, e) 2580 } 2581} 2582 2583func testHealthWatchDefaultStatusChange(t *testing.T, e env) { 2584 const service = "grpc.health.v1.Health1" 2585 2586 hs := health.NewServer() 2587 2588 te := newTest(t, e) 2589 te.healthServer = hs 2590 te.startServer(&testServer{security: e.security}) 2591 defer te.tearDown() 2592 2593 cc := te.clientConn() 2594 hc := healthgrpc.NewHealthClient(cc) 2595 2596 ctx, cancel := context.WithCancel(context.Background()) 2597 defer cancel() 2598 2599 req := &healthpb.HealthCheckRequest{ 2600 Service: service, 2601 } 2602 2603 stream, err := hc.Watch(ctx, req) 2604 if err != nil { 2605 t.Fatalf("error: %v", err) 2606 } 2607 2608 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN) 2609 2610 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2611 2612 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2613} 2614 2615func (s) TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) { 2616 for _, e := range listTestEnv() { 2617 testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e) 2618 } 2619} 2620 2621func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) { 2622 const service = "grpc.health.v1.Health1" 2623 2624 hs := health.NewServer() 2625 2626 te := newTest(t, e) 2627 te.healthServer = hs 2628 te.startServer(&testServer{security: e.security}) 2629 defer te.tearDown() 2630 2631 cc := te.clientConn() 2632 hc := healthgrpc.NewHealthClient(cc) 2633 2634 ctx, cancel := context.WithCancel(context.Background()) 2635 defer cancel() 2636 2637 req := &healthpb.HealthCheckRequest{ 2638 Service: service, 2639 } 2640 2641 hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING) 2642 2643 stream, err := hc.Watch(ctx, req) 2644 if err != nil { 2645 t.Fatalf("error: %v", err) 2646 } 2647 2648 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2649} 2650 2651func (s) TestHealthWatchOverallServerHealthChange(t *testing.T) { 2652 for _, e := range listTestEnv() { 2653 testHealthWatchOverallServerHealthChange(t, e) 2654 } 2655} 2656 2657func testHealthWatchOverallServerHealthChange(t *testing.T, e env) { 2658 const service = "" 2659 2660 hs := health.NewServer() 2661 2662 te := newTest(t, e) 2663 te.healthServer = hs 2664 te.startServer(&testServer{security: e.security}) 2665 defer te.tearDown() 2666 2667 cc := te.clientConn() 2668 hc := healthgrpc.NewHealthClient(cc) 2669 2670 ctx, cancel := context.WithCancel(context.Background()) 2671 defer cancel() 2672 2673 req := &healthpb.HealthCheckRequest{ 2674 Service: service, 2675 } 2676 2677 stream, err := hc.Watch(ctx, req) 2678 if err != nil { 2679 t.Fatalf("error: %v", err) 2680 } 2681 2682 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING) 2683 2684 hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING) 2685 2686 healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING) 2687} 2688 2689func healthWatchChecker(t *testing.T, stream healthgrpc.Health_WatchClient, expectedServingStatus healthpb.HealthCheckResponse_ServingStatus) { 2690 response, err := stream.Recv() 2691 if err != nil { 2692 t.Fatalf("error on %v.Recv(): %v", stream, err) 2693 } 2694 if response.Status != expectedServingStatus { 2695 t.Fatalf("response.Status is %v (%v expected)", response.Status, expectedServingStatus) 2696 } 2697} 2698 2699func (s) TestUnknownHandler(t *testing.T) { 2700 // An example unknownHandler that returns a different code and a different method, making sure that we do not 2701 // expose what methods are implemented to a client that is not authenticated. 2702 unknownHandler := func(srv interface{}, stream grpc.ServerStream) error { 2703 return status.Error(codes.Unauthenticated, "user unauthenticated") 2704 } 2705 for _, e := range listTestEnv() { 2706 // TODO(bradfitz): Temporarily skip this env due to #619. 2707 if e.name == "handler-tls" { 2708 continue 2709 } 2710 testUnknownHandler(t, e, unknownHandler) 2711 } 2712} 2713 2714func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) { 2715 te := newTest(t, e) 2716 te.unknownHandler = unknownHandler 2717 te.startServer(&testServer{security: e.security}) 2718 defer te.tearDown() 2719 want := status.Error(codes.Unauthenticated, "user unauthenticated") 2720 if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) { 2721 t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want) 2722 } 2723} 2724 2725func (s) TestHealthCheckServingStatus(t *testing.T) { 2726 for _, e := range listTestEnv() { 2727 testHealthCheckServingStatus(t, e) 2728 } 2729} 2730 2731func testHealthCheckServingStatus(t *testing.T, e env) { 2732 te := newTest(t, e) 2733 hs := health.NewServer() 2734 te.healthServer = hs 2735 te.startServer(&testServer{security: e.security}) 2736 defer te.tearDown() 2737 2738 cc := te.clientConn() 2739 out, err := healthCheck(1*time.Second, cc, "") 2740 if err != nil { 2741 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) 2742 } 2743 if out.Status != healthpb.HealthCheckResponse_SERVING { 2744 t.Fatalf("Got the serving status %v, want SERVING", out.Status) 2745 } 2746 wantErr := status.Error(codes.NotFound, "unknown service") 2747 if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) { 2748 t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.NotFound) 2749 } 2750 hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_SERVING) 2751 out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health") 2752 if err != nil { 2753 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) 2754 } 2755 if out.Status != healthpb.HealthCheckResponse_SERVING { 2756 t.Fatalf("Got the serving status %v, want SERVING", out.Status) 2757 } 2758 hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_NOT_SERVING) 2759 out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health") 2760 if err != nil { 2761 t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err) 2762 } 2763 if out.Status != healthpb.HealthCheckResponse_NOT_SERVING { 2764 t.Fatalf("Got the serving status %v, want NOT_SERVING", out.Status) 2765 } 2766 2767} 2768 2769func (s) TestEmptyUnaryWithUserAgent(t *testing.T) { 2770 for _, e := range listTestEnv() { 2771 testEmptyUnaryWithUserAgent(t, e) 2772 } 2773} 2774 2775func testEmptyUnaryWithUserAgent(t *testing.T, e env) { 2776 te := newTest(t, e) 2777 te.userAgent = testAppUA 2778 te.startServer(&testServer{security: e.security}) 2779 defer te.tearDown() 2780 2781 cc := te.clientConn() 2782 tc := testpb.NewTestServiceClient(cc) 2783 var header metadata.MD 2784 reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header)) 2785 if err != nil || !proto.Equal(&testpb.Empty{}, reply) { 2786 t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{}) 2787 } 2788 if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) { 2789 t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA) 2790 } 2791 2792 te.srv.Stop() 2793} 2794 2795func (s) TestFailedEmptyUnary(t *testing.T) { 2796 for _, e := range listTestEnv() { 2797 if e.name == "handler-tls" { 2798 // This test covers status details, but 2799 // Grpc-Status-Details-Bin is not support in handler_server. 2800 continue 2801 } 2802 testFailedEmptyUnary(t, e) 2803 } 2804} 2805 2806func testFailedEmptyUnary(t *testing.T, e env) { 2807 te := newTest(t, e) 2808 te.userAgent = failAppUA 2809 te.startServer(&testServer{security: e.security}) 2810 defer te.tearDown() 2811 tc := testpb.NewTestServiceClient(te.clientConn()) 2812 2813 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 2814 wantErr := detailedError 2815 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !reflect.DeepEqual(err, wantErr) { 2816 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr) 2817 } 2818} 2819 2820func (s) TestLargeUnary(t *testing.T) { 2821 for _, e := range listTestEnv() { 2822 testLargeUnary(t, e) 2823 } 2824} 2825 2826func testLargeUnary(t *testing.T, e env) { 2827 te := newTest(t, e) 2828 te.startServer(&testServer{security: e.security}) 2829 defer te.tearDown() 2830 tc := testpb.NewTestServiceClient(te.clientConn()) 2831 2832 const argSize = 271828 2833 const respSize = 314159 2834 2835 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 2836 if err != nil { 2837 t.Fatal(err) 2838 } 2839 2840 req := &testpb.SimpleRequest{ 2841 ResponseType: testpb.PayloadType_COMPRESSABLE, 2842 ResponseSize: respSize, 2843 Payload: payload, 2844 } 2845 reply, err := tc.UnaryCall(context.Background(), req) 2846 if err != nil { 2847 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 2848 } 2849 pt := reply.GetPayload().GetType() 2850 ps := len(reply.GetPayload().GetBody()) 2851 if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize { 2852 t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize) 2853 } 2854} 2855 2856// Test backward-compatibility API for setting msg size limit. 2857func (s) TestExceedMsgLimit(t *testing.T) { 2858 for _, e := range listTestEnv() { 2859 testExceedMsgLimit(t, e) 2860 } 2861} 2862 2863func testExceedMsgLimit(t *testing.T, e env) { 2864 te := newTest(t, e) 2865 te.maxMsgSize = newInt(1024) 2866 te.startServer(&testServer{security: e.security}) 2867 defer te.tearDown() 2868 tc := testpb.NewTestServiceClient(te.clientConn()) 2869 2870 argSize := int32(*te.maxMsgSize + 1) 2871 const smallSize = 1 2872 2873 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 2874 if err != nil { 2875 t.Fatal(err) 2876 } 2877 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 2878 if err != nil { 2879 t.Fatal(err) 2880 } 2881 2882 // Test on server side for unary RPC. 2883 req := &testpb.SimpleRequest{ 2884 ResponseType: testpb.PayloadType_COMPRESSABLE, 2885 ResponseSize: smallSize, 2886 Payload: payload, 2887 } 2888 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2889 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2890 } 2891 // Test on client side for unary RPC. 2892 req.ResponseSize = int32(*te.maxMsgSize) + 1 2893 req.Payload = smallPayload 2894 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 2895 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 2896 } 2897 2898 // Test on server side for streaming RPC. 2899 stream, err := tc.FullDuplexCall(te.ctx) 2900 if err != nil { 2901 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2902 } 2903 respParam := []*testpb.ResponseParameters{ 2904 { 2905 Size: 1, 2906 }, 2907 } 2908 2909 spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1)) 2910 if err != nil { 2911 t.Fatal(err) 2912 } 2913 2914 sreq := &testpb.StreamingOutputCallRequest{ 2915 ResponseType: testpb.PayloadType_COMPRESSABLE, 2916 ResponseParameters: respParam, 2917 Payload: spayload, 2918 } 2919 if err := stream.Send(sreq); err != nil { 2920 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2921 } 2922 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2923 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2924 } 2925 2926 // Test on client side for streaming RPC. 2927 stream, err = tc.FullDuplexCall(te.ctx) 2928 if err != nil { 2929 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 2930 } 2931 respParam[0].Size = int32(*te.maxMsgSize) + 1 2932 sreq.Payload = smallPayload 2933 if err := stream.Send(sreq); err != nil { 2934 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 2935 } 2936 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 2937 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 2938 } 2939 2940} 2941 2942func (s) TestPeerClientSide(t *testing.T) { 2943 for _, e := range listTestEnv() { 2944 testPeerClientSide(t, e) 2945 } 2946} 2947 2948func testPeerClientSide(t *testing.T, e env) { 2949 te := newTest(t, e) 2950 te.userAgent = testAppUA 2951 te.startServer(&testServer{security: e.security}) 2952 defer te.tearDown() 2953 tc := testpb.NewTestServiceClient(te.clientConn()) 2954 peer := new(peer.Peer) 2955 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { 2956 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 2957 } 2958 pa := peer.Addr.String() 2959 if e.network == "unix" { 2960 if pa != te.srvAddr { 2961 t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr) 2962 } 2963 return 2964 } 2965 _, pp, err := net.SplitHostPort(pa) 2966 if err != nil { 2967 t.Fatalf("Failed to parse address from peer.") 2968 } 2969 _, sp, err := net.SplitHostPort(te.srvAddr) 2970 if err != nil { 2971 t.Fatalf("Failed to parse address of test server.") 2972 } 2973 if pp != sp { 2974 t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp) 2975 } 2976} 2977 2978// TestPeerNegative tests that if call fails setting peer 2979// doesn't cause a segmentation fault. 2980// issue#1141 https://github.com/grpc/grpc-go/issues/1141 2981func (s) TestPeerNegative(t *testing.T) { 2982 for _, e := range listTestEnv() { 2983 testPeerNegative(t, e) 2984 } 2985} 2986 2987func testPeerNegative(t *testing.T, e env) { 2988 te := newTest(t, e) 2989 te.startServer(&testServer{security: e.security}) 2990 defer te.tearDown() 2991 2992 cc := te.clientConn() 2993 tc := testpb.NewTestServiceClient(cc) 2994 peer := new(peer.Peer) 2995 ctx, cancel := context.WithCancel(context.Background()) 2996 cancel() 2997 tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)) 2998} 2999 3000func (s) TestPeerFailedRPC(t *testing.T) { 3001 for _, e := range listTestEnv() { 3002 testPeerFailedRPC(t, e) 3003 } 3004} 3005 3006func testPeerFailedRPC(t *testing.T, e env) { 3007 te := newTest(t, e) 3008 te.maxServerReceiveMsgSize = newInt(1 * 1024) 3009 te.startServer(&testServer{security: e.security}) 3010 3011 defer te.tearDown() 3012 tc := testpb.NewTestServiceClient(te.clientConn()) 3013 3014 // first make a successful request to the server 3015 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 3016 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) 3017 } 3018 3019 // make a second request that will be rejected by the server 3020 const largeSize = 5 * 1024 3021 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 3022 if err != nil { 3023 t.Fatal(err) 3024 } 3025 req := &testpb.SimpleRequest{ 3026 ResponseType: testpb.PayloadType_COMPRESSABLE, 3027 Payload: largePayload, 3028 } 3029 3030 peer := new(peer.Peer) 3031 if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted { 3032 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 3033 } else { 3034 pa := peer.Addr.String() 3035 if e.network == "unix" { 3036 if pa != te.srvAddr { 3037 t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr) 3038 } 3039 return 3040 } 3041 _, pp, err := net.SplitHostPort(pa) 3042 if err != nil { 3043 t.Fatalf("Failed to parse address from peer.") 3044 } 3045 _, sp, err := net.SplitHostPort(te.srvAddr) 3046 if err != nil { 3047 t.Fatalf("Failed to parse address of test server.") 3048 } 3049 if pp != sp { 3050 t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp) 3051 } 3052 } 3053} 3054 3055func (s) TestMetadataUnaryRPC(t *testing.T) { 3056 for _, e := range listTestEnv() { 3057 testMetadataUnaryRPC(t, e) 3058 } 3059} 3060 3061func testMetadataUnaryRPC(t *testing.T, e env) { 3062 te := newTest(t, e) 3063 te.startServer(&testServer{security: e.security}) 3064 defer te.tearDown() 3065 tc := testpb.NewTestServiceClient(te.clientConn()) 3066 3067 const argSize = 2718 3068 const respSize = 314 3069 3070 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3071 if err != nil { 3072 t.Fatal(err) 3073 } 3074 3075 req := &testpb.SimpleRequest{ 3076 ResponseType: testpb.PayloadType_COMPRESSABLE, 3077 ResponseSize: respSize, 3078 Payload: payload, 3079 } 3080 var header, trailer metadata.MD 3081 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3082 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil { 3083 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3084 } 3085 // Ignore optional response headers that Servers may set: 3086 if header != nil { 3087 delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers 3088 delete(header, "date") // the Date header is also optional 3089 delete(header, "user-agent") 3090 delete(header, "content-type") 3091 } 3092 if !reflect.DeepEqual(header, testMetadata) { 3093 t.Fatalf("Received header metadata %v, want %v", header, testMetadata) 3094 } 3095 if !reflect.DeepEqual(trailer, testTrailerMetadata) { 3096 t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata) 3097 } 3098} 3099 3100func (s) TestMultipleSetTrailerUnaryRPC(t *testing.T) { 3101 for _, e := range listTestEnv() { 3102 testMultipleSetTrailerUnaryRPC(t, e) 3103 } 3104} 3105 3106func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) { 3107 te := newTest(t, e) 3108 te.startServer(&testServer{security: e.security, multipleSetTrailer: true}) 3109 defer te.tearDown() 3110 tc := testpb.NewTestServiceClient(te.clientConn()) 3111 3112 const ( 3113 argSize = 1 3114 respSize = 1 3115 ) 3116 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3117 if err != nil { 3118 t.Fatal(err) 3119 } 3120 3121 req := &testpb.SimpleRequest{ 3122 ResponseType: testpb.PayloadType_COMPRESSABLE, 3123 ResponseSize: respSize, 3124 Payload: payload, 3125 } 3126 var trailer metadata.MD 3127 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3128 if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.WaitForReady(true)); err != nil { 3129 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3130 } 3131 expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2) 3132 if !reflect.DeepEqual(trailer, expectedTrailer) { 3133 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer) 3134 } 3135} 3136 3137func (s) TestMultipleSetTrailerStreamingRPC(t *testing.T) { 3138 for _, e := range listTestEnv() { 3139 testMultipleSetTrailerStreamingRPC(t, e) 3140 } 3141} 3142 3143func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) { 3144 te := newTest(t, e) 3145 te.startServer(&testServer{security: e.security, multipleSetTrailer: true}) 3146 defer te.tearDown() 3147 tc := testpb.NewTestServiceClient(te.clientConn()) 3148 3149 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3150 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)) 3151 if err != nil { 3152 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3153 } 3154 if err := stream.CloseSend(); err != nil { 3155 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3156 } 3157 if _, err := stream.Recv(); err != io.EOF { 3158 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err) 3159 } 3160 3161 trailer := stream.Trailer() 3162 expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2) 3163 if !reflect.DeepEqual(trailer, expectedTrailer) { 3164 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer) 3165 } 3166} 3167 3168func (s) TestSetAndSendHeaderUnaryRPC(t *testing.T) { 3169 for _, e := range listTestEnv() { 3170 if e.name == "handler-tls" { 3171 continue 3172 } 3173 testSetAndSendHeaderUnaryRPC(t, e) 3174 } 3175} 3176 3177// To test header metadata is sent on SendHeader(). 3178func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) { 3179 te := newTest(t, e) 3180 te.startServer(&testServer{security: e.security, setAndSendHeader: true}) 3181 defer te.tearDown() 3182 tc := testpb.NewTestServiceClient(te.clientConn()) 3183 3184 const ( 3185 argSize = 1 3186 respSize = 1 3187 ) 3188 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3189 if err != nil { 3190 t.Fatal(err) 3191 } 3192 3193 req := &testpb.SimpleRequest{ 3194 ResponseType: testpb.PayloadType_COMPRESSABLE, 3195 ResponseSize: respSize, 3196 Payload: payload, 3197 } 3198 var header metadata.MD 3199 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3200 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil { 3201 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3202 } 3203 delete(header, "user-agent") 3204 delete(header, "content-type") 3205 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3206 if !reflect.DeepEqual(header, expectedHeader) { 3207 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3208 } 3209} 3210 3211func (s) TestMultipleSetHeaderUnaryRPC(t *testing.T) { 3212 for _, e := range listTestEnv() { 3213 if e.name == "handler-tls" { 3214 continue 3215 } 3216 testMultipleSetHeaderUnaryRPC(t, e) 3217 } 3218} 3219 3220// To test header metadata is sent when sending response. 3221func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) { 3222 te := newTest(t, e) 3223 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3224 defer te.tearDown() 3225 tc := testpb.NewTestServiceClient(te.clientConn()) 3226 3227 const ( 3228 argSize = 1 3229 respSize = 1 3230 ) 3231 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3232 if err != nil { 3233 t.Fatal(err) 3234 } 3235 3236 req := &testpb.SimpleRequest{ 3237 ResponseType: testpb.PayloadType_COMPRESSABLE, 3238 ResponseSize: respSize, 3239 Payload: payload, 3240 } 3241 3242 var header metadata.MD 3243 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3244 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil { 3245 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err) 3246 } 3247 delete(header, "user-agent") 3248 delete(header, "content-type") 3249 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3250 if !reflect.DeepEqual(header, expectedHeader) { 3251 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3252 } 3253} 3254 3255func (s) TestMultipleSetHeaderUnaryRPCError(t *testing.T) { 3256 for _, e := range listTestEnv() { 3257 if e.name == "handler-tls" { 3258 continue 3259 } 3260 testMultipleSetHeaderUnaryRPCError(t, e) 3261 } 3262} 3263 3264// To test header metadata is sent when sending status. 3265func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) { 3266 te := newTest(t, e) 3267 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3268 defer te.tearDown() 3269 tc := testpb.NewTestServiceClient(te.clientConn()) 3270 3271 const ( 3272 argSize = 1 3273 respSize = -1 // Invalid respSize to make RPC fail. 3274 ) 3275 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3276 if err != nil { 3277 t.Fatal(err) 3278 } 3279 3280 req := &testpb.SimpleRequest{ 3281 ResponseType: testpb.PayloadType_COMPRESSABLE, 3282 ResponseSize: respSize, 3283 Payload: payload, 3284 } 3285 var header metadata.MD 3286 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3287 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err == nil { 3288 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err) 3289 } 3290 delete(header, "user-agent") 3291 delete(header, "content-type") 3292 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3293 if !reflect.DeepEqual(header, expectedHeader) { 3294 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3295 } 3296} 3297 3298func (s) TestSetAndSendHeaderStreamingRPC(t *testing.T) { 3299 for _, e := range listTestEnv() { 3300 if e.name == "handler-tls" { 3301 continue 3302 } 3303 testSetAndSendHeaderStreamingRPC(t, e) 3304 } 3305} 3306 3307// To test header metadata is sent on SendHeader(). 3308func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) { 3309 te := newTest(t, e) 3310 te.startServer(&testServer{security: e.security, setAndSendHeader: true}) 3311 defer te.tearDown() 3312 tc := testpb.NewTestServiceClient(te.clientConn()) 3313 3314 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3315 stream, err := tc.FullDuplexCall(ctx) 3316 if err != nil { 3317 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3318 } 3319 if err := stream.CloseSend(); err != nil { 3320 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3321 } 3322 if _, err := stream.Recv(); err != io.EOF { 3323 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err) 3324 } 3325 3326 header, err := stream.Header() 3327 if err != nil { 3328 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) 3329 } 3330 delete(header, "user-agent") 3331 delete(header, "content-type") 3332 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3333 if !reflect.DeepEqual(header, expectedHeader) { 3334 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3335 } 3336} 3337 3338func (s) TestMultipleSetHeaderStreamingRPC(t *testing.T) { 3339 for _, e := range listTestEnv() { 3340 if e.name == "handler-tls" { 3341 continue 3342 } 3343 testMultipleSetHeaderStreamingRPC(t, e) 3344 } 3345} 3346 3347// To test header metadata is sent when sending response. 3348func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) { 3349 te := newTest(t, e) 3350 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3351 defer te.tearDown() 3352 tc := testpb.NewTestServiceClient(te.clientConn()) 3353 3354 const ( 3355 argSize = 1 3356 respSize = 1 3357 ) 3358 ctx := metadata.NewOutgoingContext(context.Background(), testMetadata) 3359 stream, err := tc.FullDuplexCall(ctx) 3360 if err != nil { 3361 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3362 } 3363 3364 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3365 if err != nil { 3366 t.Fatal(err) 3367 } 3368 3369 req := &testpb.StreamingOutputCallRequest{ 3370 ResponseType: testpb.PayloadType_COMPRESSABLE, 3371 ResponseParameters: []*testpb.ResponseParameters{ 3372 {Size: respSize}, 3373 }, 3374 Payload: payload, 3375 } 3376 if err := stream.Send(req); err != nil { 3377 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3378 } 3379 if _, err := stream.Recv(); err != nil { 3380 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 3381 } 3382 if err := stream.CloseSend(); err != nil { 3383 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3384 } 3385 if _, err := stream.Recv(); err != io.EOF { 3386 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err) 3387 } 3388 3389 header, err := stream.Header() 3390 if err != nil { 3391 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) 3392 } 3393 delete(header, "user-agent") 3394 delete(header, "content-type") 3395 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3396 if !reflect.DeepEqual(header, expectedHeader) { 3397 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3398 } 3399 3400} 3401 3402func (s) TestMultipleSetHeaderStreamingRPCError(t *testing.T) { 3403 for _, e := range listTestEnv() { 3404 if e.name == "handler-tls" { 3405 continue 3406 } 3407 testMultipleSetHeaderStreamingRPCError(t, e) 3408 } 3409} 3410 3411// To test header metadata is sent when sending status. 3412func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) { 3413 te := newTest(t, e) 3414 te.startServer(&testServer{security: e.security, setHeaderOnly: true}) 3415 defer te.tearDown() 3416 tc := testpb.NewTestServiceClient(te.clientConn()) 3417 3418 const ( 3419 argSize = 1 3420 respSize = -1 3421 ) 3422 ctx, cancel := context.WithCancel(context.Background()) 3423 defer cancel() 3424 ctx = metadata.NewOutgoingContext(ctx, testMetadata) 3425 stream, err := tc.FullDuplexCall(ctx) 3426 if err != nil { 3427 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3428 } 3429 3430 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3431 if err != nil { 3432 t.Fatal(err) 3433 } 3434 3435 req := &testpb.StreamingOutputCallRequest{ 3436 ResponseType: testpb.PayloadType_COMPRESSABLE, 3437 ResponseParameters: []*testpb.ResponseParameters{ 3438 {Size: respSize}, 3439 }, 3440 Payload: payload, 3441 } 3442 if err := stream.Send(req); err != nil { 3443 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3444 } 3445 if _, err := stream.Recv(); err == nil { 3446 t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err) 3447 } 3448 3449 header, err := stream.Header() 3450 if err != nil { 3451 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err) 3452 } 3453 delete(header, "user-agent") 3454 delete(header, "content-type") 3455 expectedHeader := metadata.Join(testMetadata, testMetadata2) 3456 if !reflect.DeepEqual(header, expectedHeader) { 3457 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) 3458 } 3459 if err := stream.CloseSend(); err != nil { 3460 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3461 } 3462} 3463 3464// TestMalformedHTTP2Metedata verfies the returned error when the client 3465// sends an illegal metadata. 3466func (s) TestMalformedHTTP2Metadata(t *testing.T) { 3467 for _, e := range listTestEnv() { 3468 if e.name == "handler-tls" { 3469 // Failed with "server stops accepting new RPCs". 3470 // Server stops accepting new RPCs when the client sends an illegal http2 header. 3471 continue 3472 } 3473 testMalformedHTTP2Metadata(t, e) 3474 } 3475} 3476 3477func testMalformedHTTP2Metadata(t *testing.T, e env) { 3478 te := newTest(t, e) 3479 te.startServer(&testServer{security: e.security}) 3480 defer te.tearDown() 3481 tc := testpb.NewTestServiceClient(te.clientConn()) 3482 3483 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718) 3484 if err != nil { 3485 t.Fatal(err) 3486 } 3487 3488 req := &testpb.SimpleRequest{ 3489 ResponseType: testpb.PayloadType_COMPRESSABLE, 3490 ResponseSize: 314, 3491 Payload: payload, 3492 } 3493 ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata) 3494 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Internal { 3495 t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal) 3496 } 3497} 3498 3499func (s) TestTransparentRetry(t *testing.T) { 3500 for _, e := range listTestEnv() { 3501 if e.name == "handler-tls" { 3502 // Fails with RST_STREAM / FLOW_CONTROL_ERROR 3503 continue 3504 } 3505 testTransparentRetry(t, e) 3506 } 3507} 3508 3509// This test makes sure RPCs are retried times when they receive a RST_STREAM 3510// with the REFUSED_STREAM error code, which the InTapHandle provokes. 3511func testTransparentRetry(t *testing.T, e env) { 3512 te := newTest(t, e) 3513 attempts := 0 3514 successAttempt := 2 3515 te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) { 3516 attempts++ 3517 if attempts < successAttempt { 3518 return nil, errors.New("not now") 3519 } 3520 return ctx, nil 3521 } 3522 te.startServer(&testServer{security: e.security}) 3523 defer te.tearDown() 3524 3525 cc := te.clientConn() 3526 tsc := testpb.NewTestServiceClient(cc) 3527 testCases := []struct { 3528 successAttempt int 3529 failFast bool 3530 errCode codes.Code 3531 }{{ 3532 successAttempt: 1, 3533 }, { 3534 successAttempt: 2, 3535 }, { 3536 successAttempt: 3, 3537 errCode: codes.Unavailable, 3538 }, { 3539 successAttempt: 1, 3540 failFast: true, 3541 }, { 3542 successAttempt: 2, 3543 failFast: true, 3544 errCode: codes.Unavailable, // We won't retry on fail fast. 3545 }} 3546 for _, tc := range testCases { 3547 attempts = 0 3548 successAttempt = tc.successAttempt 3549 3550 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 3551 _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!tc.failFast)) 3552 cancel() 3553 if status.Code(err) != tc.errCode { 3554 t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode) 3555 } 3556 } 3557} 3558 3559func (s) TestCancel(t *testing.T) { 3560 for _, e := range listTestEnv() { 3561 testCancel(t, e) 3562 } 3563} 3564 3565func testCancel(t *testing.T, e env) { 3566 te := newTest(t, e) 3567 te.declareLogNoise("grpc: the client connection is closing; please retry") 3568 te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second}) 3569 defer te.tearDown() 3570 3571 cc := te.clientConn() 3572 tc := testpb.NewTestServiceClient(cc) 3573 3574 const argSize = 2718 3575 const respSize = 314 3576 3577 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 3578 if err != nil { 3579 t.Fatal(err) 3580 } 3581 3582 req := &testpb.SimpleRequest{ 3583 ResponseType: testpb.PayloadType_COMPRESSABLE, 3584 ResponseSize: respSize, 3585 Payload: payload, 3586 } 3587 ctx, cancel := context.WithCancel(context.Background()) 3588 time.AfterFunc(1*time.Millisecond, cancel) 3589 if r, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Canceled { 3590 t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled) 3591 } 3592 awaitNewConnLogOutput() 3593} 3594 3595func (s) TestCancelNoIO(t *testing.T) { 3596 for _, e := range listTestEnv() { 3597 testCancelNoIO(t, e) 3598 } 3599} 3600 3601func testCancelNoIO(t *testing.T, e env) { 3602 te := newTest(t, e) 3603 te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken") 3604 te.maxStream = 1 // Only allows 1 live stream per server transport. 3605 te.startServer(&testServer{security: e.security}) 3606 defer te.tearDown() 3607 3608 cc := te.clientConn() 3609 tc := testpb.NewTestServiceClient(cc) 3610 3611 // Start one blocked RPC for which we'll never send streaming 3612 // input. This will consume the 1 maximum concurrent streams, 3613 // causing future RPCs to hang. 3614 ctx, cancelFirst := context.WithCancel(context.Background()) 3615 _, err := tc.StreamingInputCall(ctx) 3616 if err != nil { 3617 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) 3618 } 3619 3620 // Loop until the ClientConn receives the initial settings 3621 // frame from the server, notifying it about the maximum 3622 // concurrent streams. We know when it's received it because 3623 // an RPC will fail with codes.DeadlineExceeded instead of 3624 // succeeding. 3625 // TODO(bradfitz): add internal test hook for this (Issue 534) 3626 for { 3627 ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond) 3628 _, err := tc.StreamingInputCall(ctx) 3629 cancelSecond() 3630 if err == nil { 3631 continue 3632 } 3633 if status.Code(err) == codes.DeadlineExceeded { 3634 break 3635 } 3636 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) 3637 } 3638 // If there are any RPCs in flight before the client receives 3639 // the max streams setting, let them be expired. 3640 // TODO(bradfitz): add internal test hook for this (Issue 534) 3641 time.Sleep(50 * time.Millisecond) 3642 3643 go func() { 3644 time.Sleep(50 * time.Millisecond) 3645 cancelFirst() 3646 }() 3647 3648 // This should be blocked until the 1st is canceled, then succeed. 3649 ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond) 3650 if _, err := tc.StreamingInputCall(ctx); err != nil { 3651 t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) 3652 } 3653 cancelThird() 3654} 3655 3656// The following tests the gRPC streaming RPC implementations. 3657// TODO(zhaoq): Have better coverage on error cases. 3658var ( 3659 reqSizes = []int{27182, 8, 1828, 45904} 3660 respSizes = []int{31415, 9, 2653, 58979} 3661) 3662 3663func (s) TestNoService(t *testing.T) { 3664 for _, e := range listTestEnv() { 3665 testNoService(t, e) 3666 } 3667} 3668 3669func testNoService(t *testing.T, e env) { 3670 te := newTest(t, e) 3671 te.startServer(nil) 3672 defer te.tearDown() 3673 3674 cc := te.clientConn() 3675 tc := testpb.NewTestServiceClient(cc) 3676 3677 stream, err := tc.FullDuplexCall(te.ctx, grpc.WaitForReady(true)) 3678 if err != nil { 3679 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3680 } 3681 if _, err := stream.Recv(); status.Code(err) != codes.Unimplemented { 3682 t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented) 3683 } 3684} 3685 3686func (s) TestPingPong(t *testing.T) { 3687 for _, e := range listTestEnv() { 3688 testPingPong(t, e) 3689 } 3690} 3691 3692func testPingPong(t *testing.T, e env) { 3693 te := newTest(t, e) 3694 te.startServer(&testServer{security: e.security}) 3695 defer te.tearDown() 3696 tc := testpb.NewTestServiceClient(te.clientConn()) 3697 3698 stream, err := tc.FullDuplexCall(te.ctx) 3699 if err != nil { 3700 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3701 } 3702 var index int 3703 for index < len(reqSizes) { 3704 respParam := []*testpb.ResponseParameters{ 3705 { 3706 Size: int32(respSizes[index]), 3707 }, 3708 } 3709 3710 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index])) 3711 if err != nil { 3712 t.Fatal(err) 3713 } 3714 3715 req := &testpb.StreamingOutputCallRequest{ 3716 ResponseType: testpb.PayloadType_COMPRESSABLE, 3717 ResponseParameters: respParam, 3718 Payload: payload, 3719 } 3720 if err := stream.Send(req); err != nil { 3721 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3722 } 3723 reply, err := stream.Recv() 3724 if err != nil { 3725 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 3726 } 3727 pt := reply.GetPayload().GetType() 3728 if pt != testpb.PayloadType_COMPRESSABLE { 3729 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) 3730 } 3731 size := len(reply.GetPayload().GetBody()) 3732 if size != int(respSizes[index]) { 3733 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) 3734 } 3735 index++ 3736 } 3737 if err := stream.CloseSend(); err != nil { 3738 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil) 3739 } 3740 if _, err := stream.Recv(); err != io.EOF { 3741 t.Fatalf("%v failed to complele the ping pong test: %v", stream, err) 3742 } 3743} 3744 3745func (s) TestMetadataStreamingRPC(t *testing.T) { 3746 for _, e := range listTestEnv() { 3747 testMetadataStreamingRPC(t, e) 3748 } 3749} 3750 3751func testMetadataStreamingRPC(t *testing.T, e env) { 3752 te := newTest(t, e) 3753 te.startServer(&testServer{security: e.security}) 3754 defer te.tearDown() 3755 tc := testpb.NewTestServiceClient(te.clientConn()) 3756 3757 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata) 3758 stream, err := tc.FullDuplexCall(ctx) 3759 if err != nil { 3760 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 3761 } 3762 go func() { 3763 headerMD, err := stream.Header() 3764 if e.security == "tls" { 3765 delete(headerMD, "transport_security_type") 3766 } 3767 delete(headerMD, "trailer") // ignore if present 3768 delete(headerMD, "user-agent") 3769 delete(headerMD, "content-type") 3770 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { 3771 t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata) 3772 } 3773 // test the cached value. 3774 headerMD, err = stream.Header() 3775 delete(headerMD, "trailer") // ignore if present 3776 delete(headerMD, "user-agent") 3777 delete(headerMD, "content-type") 3778 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { 3779 t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata) 3780 } 3781 err = func() error { 3782 for index := 0; index < len(reqSizes); index++ { 3783 respParam := []*testpb.ResponseParameters{ 3784 { 3785 Size: int32(respSizes[index]), 3786 }, 3787 } 3788 3789 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index])) 3790 if err != nil { 3791 return err 3792 } 3793 3794 req := &testpb.StreamingOutputCallRequest{ 3795 ResponseType: testpb.PayloadType_COMPRESSABLE, 3796 ResponseParameters: respParam, 3797 Payload: payload, 3798 } 3799 if err := stream.Send(req); err != nil { 3800 return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err) 3801 } 3802 } 3803 return nil 3804 }() 3805 // Tell the server we're done sending args. 3806 stream.CloseSend() 3807 if err != nil { 3808 t.Error(err) 3809 } 3810 }() 3811 for { 3812 if _, err := stream.Recv(); err != nil { 3813 break 3814 } 3815 } 3816 trailerMD := stream.Trailer() 3817 if !reflect.DeepEqual(testTrailerMetadata, trailerMD) { 3818 t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata) 3819 } 3820} 3821 3822func (s) TestServerStreaming(t *testing.T) { 3823 for _, e := range listTestEnv() { 3824 testServerStreaming(t, e) 3825 } 3826} 3827 3828func testServerStreaming(t *testing.T, e env) { 3829 te := newTest(t, e) 3830 te.startServer(&testServer{security: e.security}) 3831 defer te.tearDown() 3832 tc := testpb.NewTestServiceClient(te.clientConn()) 3833 3834 respParam := make([]*testpb.ResponseParameters, len(respSizes)) 3835 for i, s := range respSizes { 3836 respParam[i] = &testpb.ResponseParameters{ 3837 Size: int32(s), 3838 } 3839 } 3840 req := &testpb.StreamingOutputCallRequest{ 3841 ResponseType: testpb.PayloadType_COMPRESSABLE, 3842 ResponseParameters: respParam, 3843 } 3844 stream, err := tc.StreamingOutputCall(context.Background(), req) 3845 if err != nil { 3846 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) 3847 } 3848 var rpcStatus error 3849 var respCnt int 3850 var index int 3851 for { 3852 reply, err := stream.Recv() 3853 if err != nil { 3854 rpcStatus = err 3855 break 3856 } 3857 pt := reply.GetPayload().GetType() 3858 if pt != testpb.PayloadType_COMPRESSABLE { 3859 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE) 3860 } 3861 size := len(reply.GetPayload().GetBody()) 3862 if size != int(respSizes[index]) { 3863 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index]) 3864 } 3865 index++ 3866 respCnt++ 3867 } 3868 if rpcStatus != io.EOF { 3869 t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus) 3870 } 3871 if respCnt != len(respSizes) { 3872 t.Fatalf("Got %d reply, want %d", len(respSizes), respCnt) 3873 } 3874} 3875 3876func (s) TestFailedServerStreaming(t *testing.T) { 3877 for _, e := range listTestEnv() { 3878 testFailedServerStreaming(t, e) 3879 } 3880} 3881 3882func testFailedServerStreaming(t *testing.T, e env) { 3883 te := newTest(t, e) 3884 te.userAgent = failAppUA 3885 te.startServer(&testServer{security: e.security}) 3886 defer te.tearDown() 3887 tc := testpb.NewTestServiceClient(te.clientConn()) 3888 3889 respParam := make([]*testpb.ResponseParameters, len(respSizes)) 3890 for i, s := range respSizes { 3891 respParam[i] = &testpb.ResponseParameters{ 3892 Size: int32(s), 3893 } 3894 } 3895 req := &testpb.StreamingOutputCallRequest{ 3896 ResponseType: testpb.PayloadType_COMPRESSABLE, 3897 ResponseParameters: respParam, 3898 } 3899 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata) 3900 stream, err := tc.StreamingOutputCall(ctx, req) 3901 if err != nil { 3902 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) 3903 } 3904 wantErr := status.Error(codes.DataLoss, "error for testing: "+failAppUA) 3905 if _, err := stream.Recv(); !reflect.DeepEqual(err, wantErr) { 3906 t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr) 3907 } 3908} 3909 3910// concurrentSendServer is a TestServiceServer whose 3911// StreamingOutputCall makes ten serial Send calls, sending payloads 3912// "0".."9", inclusive. TestServerStreamingConcurrent verifies they 3913// were received in the correct order, and that there were no races. 3914// 3915// All other TestServiceServer methods crash if called. 3916type concurrentSendServer struct { 3917 testpb.TestServiceServer 3918} 3919 3920func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { 3921 for i := 0; i < 10; i++ { 3922 stream.Send(&testpb.StreamingOutputCallResponse{ 3923 Payload: &testpb.Payload{ 3924 Body: []byte{'0' + uint8(i)}, 3925 }, 3926 }) 3927 } 3928 return nil 3929} 3930 3931// Tests doing a bunch of concurrent streaming output calls. 3932func (s) TestServerStreamingConcurrent(t *testing.T) { 3933 for _, e := range listTestEnv() { 3934 testServerStreamingConcurrent(t, e) 3935 } 3936} 3937 3938func testServerStreamingConcurrent(t *testing.T, e env) { 3939 te := newTest(t, e) 3940 te.startServer(concurrentSendServer{}) 3941 defer te.tearDown() 3942 3943 cc := te.clientConn() 3944 tc := testpb.NewTestServiceClient(cc) 3945 3946 doStreamingCall := func() { 3947 req := &testpb.StreamingOutputCallRequest{} 3948 stream, err := tc.StreamingOutputCall(context.Background(), req) 3949 if err != nil { 3950 t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) 3951 return 3952 } 3953 var ngot int 3954 var buf bytes.Buffer 3955 for { 3956 reply, err := stream.Recv() 3957 if err == io.EOF { 3958 break 3959 } 3960 if err != nil { 3961 t.Fatal(err) 3962 } 3963 ngot++ 3964 if buf.Len() > 0 { 3965 buf.WriteByte(',') 3966 } 3967 buf.Write(reply.GetPayload().GetBody()) 3968 } 3969 if want := 10; ngot != want { 3970 t.Errorf("Got %d replies, want %d", ngot, want) 3971 } 3972 if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want { 3973 t.Errorf("Got replies %q; want %q", got, want) 3974 } 3975 } 3976 3977 var wg sync.WaitGroup 3978 for i := 0; i < 20; i++ { 3979 wg.Add(1) 3980 go func() { 3981 defer wg.Done() 3982 doStreamingCall() 3983 }() 3984 } 3985 wg.Wait() 3986 3987} 3988 3989func generatePayloadSizes() [][]int { 3990 reqSizes := [][]int{ 3991 {27182, 8, 1828, 45904}, 3992 } 3993 3994 num8KPayloads := 1024 3995 eightKPayloads := []int{} 3996 for i := 0; i < num8KPayloads; i++ { 3997 eightKPayloads = append(eightKPayloads, (1 << 13)) 3998 } 3999 reqSizes = append(reqSizes, eightKPayloads) 4000 4001 num2MPayloads := 8 4002 twoMPayloads := []int{} 4003 for i := 0; i < num2MPayloads; i++ { 4004 twoMPayloads = append(twoMPayloads, (1 << 21)) 4005 } 4006 reqSizes = append(reqSizes, twoMPayloads) 4007 4008 return reqSizes 4009} 4010 4011func (s) TestClientStreaming(t *testing.T) { 4012 for _, s := range generatePayloadSizes() { 4013 for _, e := range listTestEnv() { 4014 testClientStreaming(t, e, s) 4015 } 4016 } 4017} 4018 4019func testClientStreaming(t *testing.T, e env, sizes []int) { 4020 te := newTest(t, e) 4021 te.startServer(&testServer{security: e.security}) 4022 defer te.tearDown() 4023 tc := testpb.NewTestServiceClient(te.clientConn()) 4024 4025 ctx, cancel := context.WithTimeout(te.ctx, time.Second*30) 4026 defer cancel() 4027 stream, err := tc.StreamingInputCall(ctx) 4028 if err != nil { 4029 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err) 4030 } 4031 4032 var sum int 4033 for _, s := range sizes { 4034 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s)) 4035 if err != nil { 4036 t.Fatal(err) 4037 } 4038 4039 req := &testpb.StreamingInputCallRequest{ 4040 Payload: payload, 4041 } 4042 if err := stream.Send(req); err != nil { 4043 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err) 4044 } 4045 sum += s 4046 } 4047 reply, err := stream.CloseAndRecv() 4048 if err != nil { 4049 t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) 4050 } 4051 if reply.GetAggregatedPayloadSize() != int32(sum) { 4052 t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum) 4053 } 4054} 4055 4056func (s) TestClientStreamingError(t *testing.T) { 4057 for _, e := range listTestEnv() { 4058 if e.name == "handler-tls" { 4059 continue 4060 } 4061 testClientStreamingError(t, e) 4062 } 4063} 4064 4065func testClientStreamingError(t *testing.T, e env) { 4066 te := newTest(t, e) 4067 te.startServer(&testServer{security: e.security, earlyFail: true}) 4068 defer te.tearDown() 4069 tc := testpb.NewTestServiceClient(te.clientConn()) 4070 4071 stream, err := tc.StreamingInputCall(te.ctx) 4072 if err != nil { 4073 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err) 4074 } 4075 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1) 4076 if err != nil { 4077 t.Fatal(err) 4078 } 4079 4080 req := &testpb.StreamingInputCallRequest{ 4081 Payload: payload, 4082 } 4083 // The 1st request should go through. 4084 if err := stream.Send(req); err != nil { 4085 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 4086 } 4087 for { 4088 if err := stream.Send(req); err != io.EOF { 4089 continue 4090 } 4091 if _, err := stream.CloseAndRecv(); status.Code(err) != codes.NotFound { 4092 t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound) 4093 } 4094 break 4095 } 4096} 4097 4098func (s) TestExceedMaxStreamsLimit(t *testing.T) { 4099 for _, e := range listTestEnv() { 4100 testExceedMaxStreamsLimit(t, e) 4101 } 4102} 4103 4104func testExceedMaxStreamsLimit(t *testing.T, e env) { 4105 te := newTest(t, e) 4106 te.declareLogNoise( 4107 "http2Client.notifyError got notified that the client transport was broken", 4108 "Conn.resetTransport failed to create client transport", 4109 "grpc: the connection is closing", 4110 ) 4111 te.maxStream = 1 // Only allows 1 live stream per server transport. 4112 te.startServer(&testServer{security: e.security}) 4113 defer te.tearDown() 4114 4115 cc := te.clientConn() 4116 tc := testpb.NewTestServiceClient(cc) 4117 4118 _, err := tc.StreamingInputCall(te.ctx) 4119 if err != nil { 4120 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) 4121 } 4122 // Loop until receiving the new max stream setting from the server. 4123 for { 4124 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 4125 defer cancel() 4126 _, err := tc.StreamingInputCall(ctx) 4127 if err == nil { 4128 time.Sleep(50 * time.Millisecond) 4129 continue 4130 } 4131 if status.Code(err) == codes.DeadlineExceeded { 4132 break 4133 } 4134 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) 4135 } 4136} 4137 4138func (s) TestStreamsQuotaRecovery(t *testing.T) { 4139 for _, e := range listTestEnv() { 4140 testStreamsQuotaRecovery(t, e) 4141 } 4142} 4143 4144func testStreamsQuotaRecovery(t *testing.T, e env) { 4145 te := newTest(t, e) 4146 te.declareLogNoise( 4147 "http2Client.notifyError got notified that the client transport was broken", 4148 "Conn.resetTransport failed to create client transport", 4149 "grpc: the connection is closing", 4150 ) 4151 te.maxStream = 1 // Allows 1 live stream. 4152 te.startServer(&testServer{security: e.security}) 4153 defer te.tearDown() 4154 4155 cc := te.clientConn() 4156 tc := testpb.NewTestServiceClient(cc) 4157 ctx, cancel := context.WithCancel(context.Background()) 4158 defer cancel() 4159 if _, err := tc.StreamingInputCall(ctx); err != nil { 4160 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, <nil>", err) 4161 } 4162 // Loop until the new max stream setting is effective. 4163 for { 4164 ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) 4165 _, err := tc.StreamingInputCall(ctx) 4166 cancel() 4167 if err == nil { 4168 time.Sleep(5 * time.Millisecond) 4169 continue 4170 } 4171 if status.Code(err) == codes.DeadlineExceeded { 4172 break 4173 } 4174 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %s", err, codes.DeadlineExceeded) 4175 } 4176 4177 var wg sync.WaitGroup 4178 for i := 0; i < 10; i++ { 4179 wg.Add(1) 4180 go func() { 4181 defer wg.Done() 4182 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314) 4183 if err != nil { 4184 t.Error(err) 4185 return 4186 } 4187 req := &testpb.SimpleRequest{ 4188 ResponseType: testpb.PayloadType_COMPRESSABLE, 4189 ResponseSize: 1592, 4190 Payload: payload, 4191 } 4192 // No rpc should go through due to the max streams limit. 4193 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) 4194 defer cancel() 4195 if _, err := tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 4196 t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 4197 } 4198 }() 4199 } 4200 wg.Wait() 4201 4202 cancel() 4203 // A new stream should be allowed after canceling the first one. 4204 ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) 4205 defer cancel() 4206 if _, err := tc.StreamingInputCall(ctx); err != nil { 4207 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %v", err, nil) 4208 } 4209} 4210 4211func (s) TestCompressServerHasNoSupport(t *testing.T) { 4212 for _, e := range listTestEnv() { 4213 testCompressServerHasNoSupport(t, e) 4214 } 4215} 4216 4217func testCompressServerHasNoSupport(t *testing.T, e env) { 4218 te := newTest(t, e) 4219 te.serverCompression = false 4220 te.clientCompression = false 4221 te.clientNopCompression = true 4222 te.startServer(&testServer{security: e.security}) 4223 defer te.tearDown() 4224 tc := testpb.NewTestServiceClient(te.clientConn()) 4225 4226 const argSize = 271828 4227 const respSize = 314159 4228 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 4229 if err != nil { 4230 t.Fatal(err) 4231 } 4232 req := &testpb.SimpleRequest{ 4233 ResponseType: testpb.PayloadType_COMPRESSABLE, 4234 ResponseSize: respSize, 4235 Payload: payload, 4236 } 4237 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.Unimplemented { 4238 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented) 4239 } 4240 // Streaming RPC 4241 stream, err := tc.FullDuplexCall(context.Background()) 4242 if err != nil { 4243 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4244 } 4245 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Unimplemented { 4246 t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented) 4247 } 4248} 4249 4250func (s) TestCompressOK(t *testing.T) { 4251 for _, e := range listTestEnv() { 4252 testCompressOK(t, e) 4253 } 4254} 4255 4256func testCompressOK(t *testing.T, e env) { 4257 te := newTest(t, e) 4258 te.serverCompression = true 4259 te.clientCompression = true 4260 te.startServer(&testServer{security: e.security}) 4261 defer te.tearDown() 4262 tc := testpb.NewTestServiceClient(te.clientConn()) 4263 4264 // Unary call 4265 const argSize = 271828 4266 const respSize = 314159 4267 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 4268 if err != nil { 4269 t.Fatal(err) 4270 } 4271 req := &testpb.SimpleRequest{ 4272 ResponseType: testpb.PayloadType_COMPRESSABLE, 4273 ResponseSize: respSize, 4274 Payload: payload, 4275 } 4276 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something")) 4277 if _, err := tc.UnaryCall(ctx, req); err != nil { 4278 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 4279 } 4280 // Streaming RPC 4281 ctx, cancel := context.WithCancel(context.Background()) 4282 defer cancel() 4283 stream, err := tc.FullDuplexCall(ctx) 4284 if err != nil { 4285 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4286 } 4287 respParam := []*testpb.ResponseParameters{ 4288 { 4289 Size: 31415, 4290 }, 4291 } 4292 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415)) 4293 if err != nil { 4294 t.Fatal(err) 4295 } 4296 sreq := &testpb.StreamingOutputCallRequest{ 4297 ResponseType: testpb.PayloadType_COMPRESSABLE, 4298 ResponseParameters: respParam, 4299 Payload: payload, 4300 } 4301 if err := stream.Send(sreq); err != nil { 4302 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 4303 } 4304 stream.CloseSend() 4305 if _, err := stream.Recv(); err != nil { 4306 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 4307 } 4308 if _, err := stream.Recv(); err != io.EOF { 4309 t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err) 4310 } 4311} 4312 4313func (s) TestIdentityEncoding(t *testing.T) { 4314 for _, e := range listTestEnv() { 4315 testIdentityEncoding(t, e) 4316 } 4317} 4318 4319func testIdentityEncoding(t *testing.T, e env) { 4320 te := newTest(t, e) 4321 te.startServer(&testServer{security: e.security}) 4322 defer te.tearDown() 4323 tc := testpb.NewTestServiceClient(te.clientConn()) 4324 4325 // Unary call 4326 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 5) 4327 if err != nil { 4328 t.Fatal(err) 4329 } 4330 req := &testpb.SimpleRequest{ 4331 ResponseType: testpb.PayloadType_COMPRESSABLE, 4332 ResponseSize: 10, 4333 Payload: payload, 4334 } 4335 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something")) 4336 if _, err := tc.UnaryCall(ctx, req); err != nil { 4337 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 4338 } 4339 // Streaming RPC 4340 ctx, cancel := context.WithCancel(context.Background()) 4341 defer cancel() 4342 stream, err := tc.FullDuplexCall(ctx, grpc.UseCompressor("identity")) 4343 if err != nil { 4344 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4345 } 4346 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415)) 4347 if err != nil { 4348 t.Fatal(err) 4349 } 4350 sreq := &testpb.StreamingOutputCallRequest{ 4351 ResponseType: testpb.PayloadType_COMPRESSABLE, 4352 ResponseParameters: []*testpb.ResponseParameters{{Size: 10}}, 4353 Payload: payload, 4354 } 4355 if err := stream.Send(sreq); err != nil { 4356 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 4357 } 4358 stream.CloseSend() 4359 if _, err := stream.Recv(); err != nil { 4360 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 4361 } 4362 if _, err := stream.Recv(); err != io.EOF { 4363 t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err) 4364 } 4365} 4366 4367func (s) TestUnaryClientInterceptor(t *testing.T) { 4368 for _, e := range listTestEnv() { 4369 testUnaryClientInterceptor(t, e) 4370 } 4371} 4372 4373func failOkayRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 4374 err := invoker(ctx, method, req, reply, cc, opts...) 4375 if err == nil { 4376 return status.Error(codes.NotFound, "") 4377 } 4378 return err 4379} 4380 4381func testUnaryClientInterceptor(t *testing.T, e env) { 4382 te := newTest(t, e) 4383 te.userAgent = testAppUA 4384 te.unaryClientInt = failOkayRPC 4385 te.startServer(&testServer{security: e.security}) 4386 defer te.tearDown() 4387 4388 tc := testpb.NewTestServiceClient(te.clientConn()) 4389 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.NotFound { 4390 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound) 4391 } 4392} 4393 4394func (s) TestStreamClientInterceptor(t *testing.T) { 4395 for _, e := range listTestEnv() { 4396 testStreamClientInterceptor(t, e) 4397 } 4398} 4399 4400func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 4401 s, err := streamer(ctx, desc, cc, method, opts...) 4402 if err == nil { 4403 return nil, status.Error(codes.NotFound, "") 4404 } 4405 return s, nil 4406} 4407 4408func testStreamClientInterceptor(t *testing.T, e env) { 4409 te := newTest(t, e) 4410 te.streamClientInt = failOkayStream 4411 te.startServer(&testServer{security: e.security}) 4412 defer te.tearDown() 4413 4414 tc := testpb.NewTestServiceClient(te.clientConn()) 4415 respParam := []*testpb.ResponseParameters{ 4416 { 4417 Size: int32(1), 4418 }, 4419 } 4420 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1)) 4421 if err != nil { 4422 t.Fatal(err) 4423 } 4424 req := &testpb.StreamingOutputCallRequest{ 4425 ResponseType: testpb.PayloadType_COMPRESSABLE, 4426 ResponseParameters: respParam, 4427 Payload: payload, 4428 } 4429 if _, err := tc.StreamingOutputCall(context.Background(), req); status.Code(err) != codes.NotFound { 4430 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound) 4431 } 4432} 4433 4434func (s) TestUnaryServerInterceptor(t *testing.T) { 4435 for _, e := range listTestEnv() { 4436 testUnaryServerInterceptor(t, e) 4437 } 4438} 4439 4440func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { 4441 return nil, status.Error(codes.PermissionDenied, "") 4442} 4443 4444func testUnaryServerInterceptor(t *testing.T, e env) { 4445 te := newTest(t, e) 4446 te.unaryServerInt = errInjector 4447 te.startServer(&testServer{security: e.security}) 4448 defer te.tearDown() 4449 4450 tc := testpb.NewTestServiceClient(te.clientConn()) 4451 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.PermissionDenied { 4452 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) 4453 } 4454} 4455 4456func (s) TestStreamServerInterceptor(t *testing.T) { 4457 for _, e := range listTestEnv() { 4458 // TODO(bradfitz): Temporarily skip this env due to #619. 4459 if e.name == "handler-tls" { 4460 continue 4461 } 4462 testStreamServerInterceptor(t, e) 4463 } 4464} 4465 4466func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 4467 if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" { 4468 return handler(srv, ss) 4469 } 4470 // Reject the other methods. 4471 return status.Error(codes.PermissionDenied, "") 4472} 4473 4474func testStreamServerInterceptor(t *testing.T, e env) { 4475 te := newTest(t, e) 4476 te.streamServerInt = fullDuplexOnly 4477 te.startServer(&testServer{security: e.security}) 4478 defer te.tearDown() 4479 4480 tc := testpb.NewTestServiceClient(te.clientConn()) 4481 respParam := []*testpb.ResponseParameters{ 4482 { 4483 Size: int32(1), 4484 }, 4485 } 4486 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1)) 4487 if err != nil { 4488 t.Fatal(err) 4489 } 4490 req := &testpb.StreamingOutputCallRequest{ 4491 ResponseType: testpb.PayloadType_COMPRESSABLE, 4492 ResponseParameters: respParam, 4493 Payload: payload, 4494 } 4495 s1, err := tc.StreamingOutputCall(context.Background(), req) 4496 if err != nil { 4497 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err) 4498 } 4499 if _, err := s1.Recv(); status.Code(err) != codes.PermissionDenied { 4500 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) 4501 } 4502 s2, err := tc.FullDuplexCall(context.Background()) 4503 if err != nil { 4504 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4505 } 4506 if err := s2.Send(req); err != nil { 4507 t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err) 4508 } 4509 if _, err := s2.Recv(); err != nil { 4510 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err) 4511 } 4512} 4513 4514// funcServer implements methods of TestServiceServer using funcs, 4515// similar to an http.HandlerFunc. 4516// Any unimplemented method will crash. Tests implement the method(s) 4517// they need. 4518type funcServer struct { 4519 testpb.TestServiceServer 4520 unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) 4521 streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error 4522 fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error 4523} 4524 4525func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4526 return s.unaryCall(ctx, in) 4527} 4528 4529func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error { 4530 return s.streamingInputCall(stream) 4531} 4532 4533func (s *funcServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 4534 return s.fullDuplexCall(stream) 4535} 4536 4537func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) { 4538 for _, e := range listTestEnv() { 4539 testClientRequestBodyErrorUnexpectedEOF(t, e) 4540 } 4541} 4542 4543func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) { 4544 te := newTest(t, e) 4545 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4546 errUnexpectedCall := errors.New("unexpected call func server method") 4547 t.Error(errUnexpectedCall) 4548 return nil, errUnexpectedCall 4549 }} 4550 te.startServer(ts) 4551 defer te.tearDown() 4552 te.withServerTester(func(st *serverTester) { 4553 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") 4554 // Say we have 5 bytes coming, but set END_STREAM flag: 4555 st.writeData(1, true, []byte{0, 0, 0, 0, 5}) 4556 st.wantAnyFrame() // wait for server to crash (it used to crash) 4557 }) 4558} 4559 4560func (s) TestClientRequestBodyErrorCloseAfterLength(t *testing.T) { 4561 for _, e := range listTestEnv() { 4562 testClientRequestBodyErrorCloseAfterLength(t, e) 4563 } 4564} 4565 4566func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) { 4567 te := newTest(t, e) 4568 te.declareLogNoise("Server.processUnaryRPC failed to write status") 4569 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4570 errUnexpectedCall := errors.New("unexpected call func server method") 4571 t.Error(errUnexpectedCall) 4572 return nil, errUnexpectedCall 4573 }} 4574 te.startServer(ts) 4575 defer te.tearDown() 4576 te.withServerTester(func(st *serverTester) { 4577 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") 4578 // say we're sending 5 bytes, but then close the connection instead. 4579 st.writeData(1, false, []byte{0, 0, 0, 0, 5}) 4580 st.cc.Close() 4581 }) 4582} 4583 4584func (s) TestClientRequestBodyErrorCancel(t *testing.T) { 4585 for _, e := range listTestEnv() { 4586 testClientRequestBodyErrorCancel(t, e) 4587 } 4588} 4589 4590func testClientRequestBodyErrorCancel(t *testing.T, e env) { 4591 te := newTest(t, e) 4592 gotCall := make(chan bool, 1) 4593 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 4594 gotCall <- true 4595 return new(testpb.SimpleResponse), nil 4596 }} 4597 te.startServer(ts) 4598 defer te.tearDown() 4599 te.withServerTester(func(st *serverTester) { 4600 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall") 4601 // Say we have 5 bytes coming, but cancel it instead. 4602 st.writeRSTStream(1, http2.ErrCodeCancel) 4603 st.writeData(1, false, []byte{0, 0, 0, 0, 5}) 4604 4605 // Verify we didn't a call yet. 4606 select { 4607 case <-gotCall: 4608 t.Fatal("unexpected call") 4609 default: 4610 } 4611 4612 // And now send an uncanceled (but still invalid), just to get a response. 4613 st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall") 4614 st.writeData(3, true, []byte{0, 0, 0, 0, 0}) 4615 <-gotCall 4616 st.wantAnyFrame() 4617 }) 4618} 4619 4620func (s) TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) { 4621 for _, e := range listTestEnv() { 4622 testClientRequestBodyErrorCancelStreamingInput(t, e) 4623 } 4624} 4625 4626func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) { 4627 te := newTest(t, e) 4628 recvErr := make(chan error, 1) 4629 ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error { 4630 _, err := stream.Recv() 4631 recvErr <- err 4632 return nil 4633 }} 4634 te.startServer(ts) 4635 defer te.tearDown() 4636 te.withServerTester(func(st *serverTester) { 4637 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall") 4638 // Say we have 5 bytes coming, but cancel it instead. 4639 st.writeData(1, false, []byte{0, 0, 0, 0, 5}) 4640 st.writeRSTStream(1, http2.ErrCodeCancel) 4641 4642 var got error 4643 select { 4644 case got = <-recvErr: 4645 case <-time.After(3 * time.Second): 4646 t.Fatal("timeout waiting for error") 4647 } 4648 if grpc.Code(got) != codes.Canceled { 4649 t.Errorf("error = %#v; want error code %s", got, codes.Canceled) 4650 } 4651 }) 4652} 4653 4654func (s) TestClientResourceExhaustedCancelFullDuplex(t *testing.T) { 4655 for _, e := range listTestEnv() { 4656 if e.httpHandler { 4657 // httpHandler write won't be blocked on flow control window. 4658 continue 4659 } 4660 testClientResourceExhaustedCancelFullDuplex(t, e) 4661 } 4662} 4663 4664func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) { 4665 te := newTest(t, e) 4666 recvErr := make(chan error, 1) 4667 ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 4668 defer close(recvErr) 4669 _, err := stream.Recv() 4670 if err != nil { 4671 return status.Errorf(codes.Internal, "stream.Recv() got error: %v, want <nil>", err) 4672 } 4673 // create a payload that's larger than the default flow control window. 4674 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10) 4675 if err != nil { 4676 return err 4677 } 4678 resp := &testpb.StreamingOutputCallResponse{ 4679 Payload: payload, 4680 } 4681 ce := make(chan error) 4682 go func() { 4683 var err error 4684 for { 4685 if err = stream.Send(resp); err != nil { 4686 break 4687 } 4688 } 4689 ce <- err 4690 }() 4691 select { 4692 case err = <-ce: 4693 case <-time.After(10 * time.Second): 4694 err = errors.New("10s timeout reached") 4695 } 4696 recvErr <- err 4697 return err 4698 }} 4699 te.startServer(ts) 4700 defer te.tearDown() 4701 // set a low limit on receive message size to error with Resource Exhausted on 4702 // client side when server send a large message. 4703 te.maxClientReceiveMsgSize = newInt(10) 4704 cc := te.clientConn() 4705 tc := testpb.NewTestServiceClient(cc) 4706 stream, err := tc.FullDuplexCall(context.Background()) 4707 if err != nil { 4708 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 4709 } 4710 req := &testpb.StreamingOutputCallRequest{} 4711 if err := stream.Send(req); err != nil { 4712 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 4713 } 4714 if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted { 4715 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 4716 } 4717 err = <-recvErr 4718 if status.Code(err) != codes.Canceled { 4719 t.Fatalf("server got error %v, want error code: %s", err, codes.Canceled) 4720 } 4721} 4722 4723type clientTimeoutCreds struct { 4724 timeoutReturned bool 4725} 4726 4727func (c *clientTimeoutCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4728 if !c.timeoutReturned { 4729 c.timeoutReturned = true 4730 return nil, nil, context.DeadlineExceeded 4731 } 4732 return rawConn, nil, nil 4733} 4734func (c *clientTimeoutCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4735 return rawConn, nil, nil 4736} 4737func (c *clientTimeoutCreds) Info() credentials.ProtocolInfo { 4738 return credentials.ProtocolInfo{} 4739} 4740func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials { 4741 return nil 4742} 4743func (c *clientTimeoutCreds) OverrideServerName(s string) error { 4744 return nil 4745} 4746 4747func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) { 4748 te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds", balancer: "v1"}) 4749 te.userAgent = testAppUA 4750 te.startServer(&testServer{security: te.e.security}) 4751 defer te.tearDown() 4752 4753 cc := te.clientConn() 4754 tc := testpb.NewTestServiceClient(cc) 4755 // This unary call should succeed, because ClientHandshake will succeed for the second time. 4756 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { 4757 te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err) 4758 } 4759} 4760 4761type serverDispatchCred struct { 4762 rawConnCh chan net.Conn 4763} 4764 4765func newServerDispatchCred() *serverDispatchCred { 4766 return &serverDispatchCred{ 4767 rawConnCh: make(chan net.Conn, 1), 4768 } 4769} 4770func (c *serverDispatchCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4771 return rawConn, nil, nil 4772} 4773func (c *serverDispatchCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4774 select { 4775 case c.rawConnCh <- rawConn: 4776 default: 4777 } 4778 return nil, nil, credentials.ErrConnDispatched 4779} 4780func (c *serverDispatchCred) Info() credentials.ProtocolInfo { 4781 return credentials.ProtocolInfo{} 4782} 4783func (c *serverDispatchCred) Clone() credentials.TransportCredentials { 4784 return nil 4785} 4786func (c *serverDispatchCred) OverrideServerName(s string) error { 4787 return nil 4788} 4789func (c *serverDispatchCred) getRawConn() net.Conn { 4790 return <-c.rawConnCh 4791} 4792 4793func (s) TestServerCredsDispatch(t *testing.T) { 4794 lis, err := net.Listen("tcp", "localhost:0") 4795 if err != nil { 4796 t.Fatalf("Failed to listen: %v", err) 4797 } 4798 cred := newServerDispatchCred() 4799 s := grpc.NewServer(grpc.Creds(cred)) 4800 go s.Serve(lis) 4801 defer s.Stop() 4802 4803 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(cred)) 4804 if err != nil { 4805 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) 4806 } 4807 defer cc.Close() 4808 4809 rawConn := cred.getRawConn() 4810 // Give grpc a chance to see the error and potentially close the connection. 4811 // And check that connection is not closed after that. 4812 time.Sleep(100 * time.Millisecond) 4813 // Check rawConn is not closed. 4814 if n, err := rawConn.Write([]byte{0}); n <= 0 || err != nil { 4815 t.Errorf("Read() = %v, %v; want n>0, <nil>", n, err) 4816 } 4817} 4818 4819type authorityCheckCreds struct { 4820 got string 4821} 4822 4823func (c *authorityCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4824 return rawConn, nil, nil 4825} 4826func (c *authorityCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4827 c.got = authority 4828 return rawConn, nil, nil 4829} 4830func (c *authorityCheckCreds) Info() credentials.ProtocolInfo { 4831 return credentials.ProtocolInfo{} 4832} 4833func (c *authorityCheckCreds) Clone() credentials.TransportCredentials { 4834 return c 4835} 4836func (c *authorityCheckCreds) OverrideServerName(s string) error { 4837 return nil 4838} 4839 4840// This test makes sure that the authority client handshake gets is the endpoint 4841// in dial target, not the resolved ip address. 4842func (s) TestCredsHandshakeAuthority(t *testing.T) { 4843 const testAuthority = "test.auth.ori.ty" 4844 4845 lis, err := net.Listen("tcp", "localhost:0") 4846 if err != nil { 4847 t.Fatalf("Failed to listen: %v", err) 4848 } 4849 cred := &authorityCheckCreds{} 4850 s := grpc.NewServer() 4851 go s.Serve(lis) 4852 defer s.Stop() 4853 4854 r, rcleanup := manual.GenerateAndRegisterManualResolver() 4855 defer rcleanup() 4856 4857 cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred)) 4858 if err != nil { 4859 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) 4860 } 4861 defer cc.Close() 4862 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) 4863 4864 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) 4865 defer cancel() 4866 for { 4867 s := cc.GetState() 4868 if s == connectivity.Ready { 4869 break 4870 } 4871 if !cc.WaitForStateChange(ctx, s) { 4872 // ctx got timeout or canceled. 4873 t.Fatalf("ClientConn is not ready after 100 ms") 4874 } 4875 } 4876 4877 if cred.got != testAuthority { 4878 t.Fatalf("client creds got authority: %q, want: %q", cred.got, testAuthority) 4879 } 4880} 4881 4882type clientFailCreds struct{} 4883 4884func (c *clientFailCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4885 return rawConn, nil, nil 4886} 4887func (c *clientFailCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 4888 return nil, nil, fmt.Errorf("client handshake fails with fatal error") 4889} 4890func (c *clientFailCreds) Info() credentials.ProtocolInfo { 4891 return credentials.ProtocolInfo{} 4892} 4893func (c *clientFailCreds) Clone() credentials.TransportCredentials { 4894 return c 4895} 4896func (c *clientFailCreds) OverrideServerName(s string) error { 4897 return nil 4898} 4899 4900// This test makes sure that failfast RPCs fail if client handshake fails with 4901// fatal errors. 4902func (s) TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) { 4903 lis, err := net.Listen("tcp", "localhost:0") 4904 if err != nil { 4905 t.Fatalf("Failed to listen: %v", err) 4906 } 4907 defer lis.Close() 4908 4909 cc, err := grpc.Dial("passthrough:///"+lis.Addr().String(), grpc.WithTransportCredentials(&clientFailCreds{})) 4910 if err != nil { 4911 t.Fatalf("grpc.Dial(_) = %v", err) 4912 } 4913 defer cc.Close() 4914 4915 tc := testpb.NewTestServiceClient(cc) 4916 // This unary call should fail, but not timeout. 4917 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 4918 defer cancel() 4919 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(false)); status.Code(err) != codes.Unavailable { 4920 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err) 4921 } 4922} 4923 4924func (s) TestFlowControlLogicalRace(t *testing.T) { 4925 // Test for a regression of https://github.com/grpc/grpc-go/issues/632, 4926 // and other flow control bugs. 4927 4928 const ( 4929 itemCount = 100 4930 itemSize = 1 << 10 4931 recvCount = 2 4932 maxFailures = 3 4933 4934 requestTimeout = time.Second * 5 4935 ) 4936 4937 requestCount := 10000 4938 if raceMode { 4939 requestCount = 1000 4940 } 4941 4942 lis, err := net.Listen("tcp", "localhost:0") 4943 if err != nil { 4944 t.Fatalf("Failed to listen: %v", err) 4945 } 4946 defer lis.Close() 4947 4948 s := grpc.NewServer() 4949 testpb.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{ 4950 itemCount: itemCount, 4951 itemSize: itemSize, 4952 }) 4953 defer s.Stop() 4954 4955 go s.Serve(lis) 4956 4957 ctx := context.Background() 4958 4959 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock()) 4960 if err != nil { 4961 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) 4962 } 4963 defer cc.Close() 4964 cl := testpb.NewTestServiceClient(cc) 4965 4966 failures := 0 4967 for i := 0; i < requestCount; i++ { 4968 ctx, cancel := context.WithTimeout(ctx, requestTimeout) 4969 output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{}) 4970 if err != nil { 4971 t.Fatalf("StreamingOutputCall; err = %q", err) 4972 } 4973 4974 j := 0 4975 loop: 4976 for ; j < recvCount; j++ { 4977 _, err := output.Recv() 4978 if err != nil { 4979 if err == io.EOF { 4980 break loop 4981 } 4982 switch status.Code(err) { 4983 case codes.DeadlineExceeded: 4984 break loop 4985 default: 4986 t.Fatalf("Recv; err = %q", err) 4987 } 4988 } 4989 } 4990 cancel() 4991 <-ctx.Done() 4992 4993 if j < recvCount { 4994 t.Errorf("got %d responses to request %d", j, i) 4995 failures++ 4996 if failures >= maxFailures { 4997 // Continue past the first failure to see if the connection is 4998 // entirely broken, or if only a single RPC was affected 4999 break 5000 } 5001 } 5002 } 5003} 5004 5005type flowControlLogicalRaceServer struct { 5006 testpb.TestServiceServer 5007 5008 itemSize int 5009 itemCount int 5010} 5011 5012func (s *flowControlLogicalRaceServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testpb.TestService_StreamingOutputCallServer) error { 5013 for i := 0; i < s.itemCount; i++ { 5014 err := srv.Send(&testpb.StreamingOutputCallResponse{ 5015 Payload: &testpb.Payload{ 5016 // Sending a large stream of data which the client reject 5017 // helps to trigger some types of flow control bugs. 5018 // 5019 // Reallocating memory here is inefficient, but the stress it 5020 // puts on the GC leads to more frequent flow control 5021 // failures. The GC likely causes more variety in the 5022 // goroutine scheduling orders. 5023 Body: bytes.Repeat([]byte("a"), s.itemSize), 5024 }, 5025 }) 5026 if err != nil { 5027 return err 5028 } 5029 } 5030 return nil 5031} 5032 5033type lockingWriter struct { 5034 mu sync.Mutex 5035 w io.Writer 5036} 5037 5038func (lw *lockingWriter) Write(p []byte) (n int, err error) { 5039 lw.mu.Lock() 5040 defer lw.mu.Unlock() 5041 return lw.w.Write(p) 5042} 5043 5044func (lw *lockingWriter) setWriter(w io.Writer) { 5045 lw.mu.Lock() 5046 defer lw.mu.Unlock() 5047 lw.w = w 5048} 5049 5050var testLogOutput = &lockingWriter{w: os.Stderr} 5051 5052// awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to 5053// terminate, if they're still running. It spams logs with this 5054// message. We wait for it so our log filter is still 5055// active. Otherwise the "defer restore()" at the top of various test 5056// functions restores our log filter and then the goroutine spams. 5057func awaitNewConnLogOutput() { 5058 awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry") 5059} 5060 5061func awaitLogOutput(maxWait time.Duration, phrase string) { 5062 pb := []byte(phrase) 5063 5064 timer := time.NewTimer(maxWait) 5065 defer timer.Stop() 5066 wakeup := make(chan bool, 1) 5067 for { 5068 if logOutputHasContents(pb, wakeup) { 5069 return 5070 } 5071 select { 5072 case <-timer.C: 5073 // Too slow. Oh well. 5074 return 5075 case <-wakeup: 5076 } 5077 } 5078} 5079 5080func logOutputHasContents(v []byte, wakeup chan<- bool) bool { 5081 testLogOutput.mu.Lock() 5082 defer testLogOutput.mu.Unlock() 5083 fw, ok := testLogOutput.w.(*filterWriter) 5084 if !ok { 5085 return false 5086 } 5087 fw.mu.Lock() 5088 defer fw.mu.Unlock() 5089 if bytes.Contains(fw.buf.Bytes(), v) { 5090 return true 5091 } 5092 fw.wakeup = wakeup 5093 return false 5094} 5095 5096var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering") 5097 5098func noop() {} 5099 5100// declareLogNoise declares that t is expected to emit the following noisy phrases, 5101// even on success. Those phrases will be filtered from grpclog output 5102// and only be shown if *verbose_logs or t ends up failing. 5103// The returned restore function should be called with defer to be run 5104// before the test ends. 5105func declareLogNoise(t *testing.T, phrases ...string) (restore func()) { 5106 if *verboseLogs { 5107 return noop 5108 } 5109 fw := &filterWriter{dst: os.Stderr, filter: phrases} 5110 testLogOutput.setWriter(fw) 5111 return func() { 5112 if t.Failed() { 5113 fw.mu.Lock() 5114 defer fw.mu.Unlock() 5115 if fw.buf.Len() > 0 { 5116 t.Logf("Complete log output:\n%s", fw.buf.Bytes()) 5117 } 5118 } 5119 testLogOutput.setWriter(os.Stderr) 5120 } 5121} 5122 5123type filterWriter struct { 5124 dst io.Writer 5125 filter []string 5126 5127 mu sync.Mutex 5128 buf bytes.Buffer 5129 wakeup chan<- bool // if non-nil, gets true on write 5130} 5131 5132func (fw *filterWriter) Write(p []byte) (n int, err error) { 5133 fw.mu.Lock() 5134 fw.buf.Write(p) 5135 if fw.wakeup != nil { 5136 select { 5137 case fw.wakeup <- true: 5138 default: 5139 } 5140 } 5141 fw.mu.Unlock() 5142 5143 ps := string(p) 5144 for _, f := range fw.filter { 5145 if strings.Contains(ps, f) { 5146 return len(p), nil 5147 } 5148 } 5149 return fw.dst.Write(p) 5150} 5151 5152// stubServer is a server that is easy to customize within individual test 5153// cases. 5154type stubServer struct { 5155 // Guarantees we satisfy this interface; panics if unimplemented methods are called. 5156 testpb.TestServiceServer 5157 5158 // Customizable implementations of server handlers. 5159 emptyCall func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) 5160 unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) 5161 fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error 5162 5163 // A client connected to this service the test may use. Created in Start(). 5164 client testpb.TestServiceClient 5165 cc *grpc.ClientConn 5166 5167 addr string // address of listener 5168 5169 cleanups []func() // Lambdas executed in Stop(); populated by Start(). 5170 5171 r *manual.Resolver 5172} 5173 5174func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5175 return ss.emptyCall(ctx, in) 5176} 5177 5178func (ss *stubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 5179 return ss.unaryCall(ctx, in) 5180} 5181 5182func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { 5183 return ss.fullDuplexCall(stream) 5184} 5185 5186// Start starts the server and creates a client connected to it. 5187func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { 5188 r, cleanup := manual.GenerateAndRegisterManualResolver() 5189 ss.r = r 5190 ss.cleanups = append(ss.cleanups, cleanup) 5191 5192 lis, err := net.Listen("tcp", "localhost:0") 5193 if err != nil { 5194 return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err) 5195 } 5196 ss.addr = lis.Addr().String() 5197 ss.cleanups = append(ss.cleanups, func() { lis.Close() }) 5198 5199 s := grpc.NewServer(sopts...) 5200 testpb.RegisterTestServiceServer(s, ss) 5201 go s.Serve(lis) 5202 ss.cleanups = append(ss.cleanups, s.Stop) 5203 5204 target := ss.r.Scheme() + ":///" + ss.addr 5205 5206 opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...) 5207 cc, err := grpc.Dial(target, opts...) 5208 if err != nil { 5209 return fmt.Errorf("grpc.Dial(%q) = %v", target, err) 5210 } 5211 ss.cc = cc 5212 ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}}) 5213 if err := ss.waitForReady(cc); err != nil { 5214 return err 5215 } 5216 5217 ss.cleanups = append(ss.cleanups, func() { cc.Close() }) 5218 5219 ss.client = testpb.NewTestServiceClient(cc) 5220 return nil 5221} 5222 5223func (ss *stubServer) newServiceConfig(sc string) { 5224 ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}, ServiceConfig: sc}) 5225} 5226 5227func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error { 5228 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 5229 defer cancel() 5230 for { 5231 s := cc.GetState() 5232 if s == connectivity.Ready { 5233 return nil 5234 } 5235 if !cc.WaitForStateChange(ctx, s) { 5236 // ctx got timeout or canceled. 5237 return ctx.Err() 5238 } 5239 } 5240} 5241 5242func (ss *stubServer) Stop() { 5243 for i := len(ss.cleanups) - 1; i >= 0; i-- { 5244 ss.cleanups[i]() 5245 } 5246} 5247 5248func (s) TestGRPCMethod(t *testing.T) { 5249 var method string 5250 var ok bool 5251 5252 ss := &stubServer{ 5253 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5254 method, ok = grpc.Method(ctx) 5255 return &testpb.Empty{}, nil 5256 }, 5257 } 5258 if err := ss.Start(nil); err != nil { 5259 t.Fatalf("Error starting endpoint server: %v", err) 5260 } 5261 defer ss.Stop() 5262 5263 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5264 defer cancel() 5265 5266 if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { 5267 t.Fatalf("ss.client.EmptyCall(_, _) = _, %v; want _, nil", err) 5268 } 5269 5270 if want := "/grpc.testing.TestService/EmptyCall"; !ok || method != want { 5271 t.Fatalf("grpc.Method(_) = %q, %v; want %q, true", method, ok, want) 5272 } 5273} 5274 5275func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) { 5276 const mdkey = "somedata" 5277 5278 // endpoint ensures mdkey is NOT in metadata and returns an error if it is. 5279 endpoint := &stubServer{ 5280 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5281 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil { 5282 return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) 5283 } 5284 return &testpb.Empty{}, nil 5285 }, 5286 } 5287 if err := endpoint.Start(nil); err != nil { 5288 t.Fatalf("Error starting endpoint server: %v", err) 5289 } 5290 defer endpoint.Stop() 5291 5292 // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint 5293 // without explicitly copying the metadata. 5294 proxy := &stubServer{ 5295 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5296 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil { 5297 return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey) 5298 } 5299 return endpoint.client.EmptyCall(ctx, in) 5300 }, 5301 } 5302 if err := proxy.Start(nil); err != nil { 5303 t.Fatalf("Error starting proxy server: %v", err) 5304 } 5305 defer proxy.Stop() 5306 5307 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5308 defer cancel() 5309 md := metadata.Pairs(mdkey, "val") 5310 ctx = metadata.NewOutgoingContext(ctx, md) 5311 5312 // Sanity check that endpoint properly errors when it sees mdkey. 5313 _, err := endpoint.client.EmptyCall(ctx, &testpb.Empty{}) 5314 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal { 5315 t.Fatalf("endpoint.client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err) 5316 } 5317 5318 if _, err := proxy.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { 5319 t.Fatal(err.Error()) 5320 } 5321} 5322 5323func (s) TestStreamingProxyDoesNotForwardMetadata(t *testing.T) { 5324 const mdkey = "somedata" 5325 5326 // doFDC performs a FullDuplexCall with client and returns the error from the 5327 // first stream.Recv call, or nil if that error is io.EOF. Calls t.Fatal if 5328 // the stream cannot be established. 5329 doFDC := func(ctx context.Context, client testpb.TestServiceClient) error { 5330 stream, err := client.FullDuplexCall(ctx) 5331 if err != nil { 5332 t.Fatalf("Unwanted error: %v", err) 5333 } 5334 if _, err := stream.Recv(); err != io.EOF { 5335 return err 5336 } 5337 return nil 5338 } 5339 5340 // endpoint ensures mdkey is NOT in metadata and returns an error if it is. 5341 endpoint := &stubServer{ 5342 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 5343 ctx := stream.Context() 5344 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil { 5345 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) 5346 } 5347 return nil 5348 }, 5349 } 5350 if err := endpoint.Start(nil); err != nil { 5351 t.Fatalf("Error starting endpoint server: %v", err) 5352 } 5353 defer endpoint.Stop() 5354 5355 // proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint 5356 // without explicitly copying the metadata. 5357 proxy := &stubServer{ 5358 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 5359 ctx := stream.Context() 5360 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil { 5361 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey) 5362 } 5363 return doFDC(ctx, endpoint.client) 5364 }, 5365 } 5366 if err := proxy.Start(nil); err != nil { 5367 t.Fatalf("Error starting proxy server: %v", err) 5368 } 5369 defer proxy.Stop() 5370 5371 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5372 defer cancel() 5373 md := metadata.Pairs(mdkey, "val") 5374 ctx = metadata.NewOutgoingContext(ctx, md) 5375 5376 // Sanity check that endpoint properly errors when it sees mdkey in ctx. 5377 err := doFDC(ctx, endpoint.client) 5378 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal { 5379 t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err) 5380 } 5381 5382 if err := doFDC(ctx, proxy.client); err != nil { 5383 t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err) 5384 } 5385} 5386 5387func (s) TestStatsTagsAndTrace(t *testing.T) { 5388 // Data added to context by client (typically in a stats handler). 5389 tags := []byte{1, 5, 2, 4, 3} 5390 trace := []byte{5, 2, 1, 3, 4} 5391 5392 // endpoint ensures Tags() and Trace() in context match those that were added 5393 // by the client and returns an error if not. 5394 endpoint := &stubServer{ 5395 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5396 md, _ := metadata.FromIncomingContext(ctx) 5397 if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) { 5398 return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags) 5399 } 5400 if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) { 5401 return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags) 5402 } 5403 if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) { 5404 return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace) 5405 } 5406 if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) { 5407 return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace) 5408 } 5409 return &testpb.Empty{}, nil 5410 }, 5411 } 5412 if err := endpoint.Start(nil); err != nil { 5413 t.Fatalf("Error starting endpoint server: %v", err) 5414 } 5415 defer endpoint.Stop() 5416 5417 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5418 defer cancel() 5419 5420 testCases := []struct { 5421 ctx context.Context 5422 want codes.Code 5423 }{ 5424 {ctx: ctx, want: codes.Internal}, 5425 {ctx: stats.SetTags(ctx, tags), want: codes.Internal}, 5426 {ctx: stats.SetTrace(ctx, trace), want: codes.Internal}, 5427 {ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal}, 5428 {ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK}, 5429 } 5430 5431 for _, tc := range testCases { 5432 _, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{}) 5433 if tc.want == codes.OK && err != nil { 5434 t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err) 5435 } 5436 if s, ok := status.FromError(err); !ok || s.Code() != tc.want { 5437 t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want) 5438 } 5439 } 5440} 5441 5442func (s) TestTapTimeout(t *testing.T) { 5443 sopts := []grpc.ServerOption{ 5444 grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) { 5445 c, cancel := context.WithCancel(ctx) 5446 // Call cancel instead of setting a deadline so we can detect which error 5447 // occurred -- this cancellation (desired) or the client's deadline 5448 // expired (indicating this cancellation did not affect the RPC). 5449 time.AfterFunc(10*time.Millisecond, cancel) 5450 return c, nil 5451 }), 5452 } 5453 5454 ss := &stubServer{ 5455 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 5456 <-ctx.Done() 5457 return nil, status.Errorf(codes.Canceled, ctx.Err().Error()) 5458 }, 5459 } 5460 if err := ss.Start(sopts); err != nil { 5461 t.Fatalf("Error starting endpoint server: %v", err) 5462 } 5463 defer ss.Stop() 5464 5465 // This was known to be flaky; test several times. 5466 for i := 0; i < 10; i++ { 5467 // Set our own deadline in case the server hangs. 5468 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 5469 res, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) 5470 cancel() 5471 if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled { 5472 t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, <status with Code()=Canceled>", res, err) 5473 } 5474 } 5475 5476} 5477 5478func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) { 5479 ss := &stubServer{ 5480 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 5481 return status.Errorf(codes.Internal, "") 5482 }, 5483 } 5484 sopts := []grpc.ServerOption{} 5485 if err := ss.Start(sopts); err != nil { 5486 t.Fatalf("Error starting endpoint server: %v", err) 5487 } 5488 defer ss.Stop() 5489 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 5490 defer cancel() 5491 stream, err := ss.client.FullDuplexCall(ctx) 5492 if err != nil { 5493 t.Fatalf("Error while creating stream: %v", err) 5494 } 5495 for { 5496 if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err == nil { 5497 time.Sleep(5 * time.Millisecond) 5498 } else if err == io.EOF { 5499 break // Success. 5500 } else { 5501 t.Fatalf("stream.Send(_) = %v, want io.EOF", err) 5502 } 5503 } 5504} 5505 5506type windowSizeConfig struct { 5507 serverStream int32 5508 serverConn int32 5509 clientStream int32 5510 clientConn int32 5511} 5512 5513func max(a, b int32) int32 { 5514 if a > b { 5515 return a 5516 } 5517 return b 5518} 5519 5520func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) { 5521 wc := windowSizeConfig{ 5522 serverStream: 8 * 1024 * 1024, 5523 serverConn: 12 * 1024 * 1024, 5524 clientStream: 6 * 1024 * 1024, 5525 clientConn: 8 * 1024 * 1024, 5526 } 5527 for _, e := range listTestEnv() { 5528 testConfigurableWindowSize(t, e, wc) 5529 } 5530} 5531 5532func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) { 5533 wc := windowSizeConfig{ 5534 serverStream: 1, 5535 serverConn: 1, 5536 clientStream: 1, 5537 clientConn: 1, 5538 } 5539 for _, e := range listTestEnv() { 5540 testConfigurableWindowSize(t, e, wc) 5541 } 5542} 5543 5544func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) { 5545 te := newTest(t, e) 5546 te.serverInitialWindowSize = wc.serverStream 5547 te.serverInitialConnWindowSize = wc.serverConn 5548 te.clientInitialWindowSize = wc.clientStream 5549 te.clientInitialConnWindowSize = wc.clientConn 5550 5551 te.startServer(&testServer{security: e.security}) 5552 defer te.tearDown() 5553 5554 cc := te.clientConn() 5555 tc := testpb.NewTestServiceClient(cc) 5556 stream, err := tc.FullDuplexCall(context.Background()) 5557 if err != nil { 5558 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5559 } 5560 numOfIter := 11 5561 // Set message size to exhaust largest of window sizes. 5562 messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1) 5563 messageSize = max(messageSize, 64*1024) 5564 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize) 5565 if err != nil { 5566 t.Fatal(err) 5567 } 5568 respParams := []*testpb.ResponseParameters{ 5569 { 5570 Size: messageSize, 5571 }, 5572 } 5573 req := &testpb.StreamingOutputCallRequest{ 5574 ResponseType: testpb.PayloadType_COMPRESSABLE, 5575 ResponseParameters: respParams, 5576 Payload: payload, 5577 } 5578 for i := 0; i < numOfIter; i++ { 5579 if err := stream.Send(req); err != nil { 5580 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) 5581 } 5582 if _, err := stream.Recv(); err != nil { 5583 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err) 5584 } 5585 } 5586 if err := stream.CloseSend(); err != nil { 5587 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err) 5588 } 5589} 5590 5591var ( 5592 // test authdata 5593 authdata = map[string]string{ 5594 "test-key": "test-value", 5595 "test-key2-bin": string([]byte{1, 2, 3}), 5596 } 5597) 5598 5599type testPerRPCCredentials struct{} 5600 5601func (cr testPerRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { 5602 return authdata, nil 5603} 5604 5605func (cr testPerRPCCredentials) RequireTransportSecurity() bool { 5606 return false 5607} 5608 5609func authHandle(ctx context.Context, info *tap.Info) (context.Context, error) { 5610 md, ok := metadata.FromIncomingContext(ctx) 5611 if !ok { 5612 return ctx, fmt.Errorf("didn't find metadata in context") 5613 } 5614 for k, vwant := range authdata { 5615 vgot, ok := md[k] 5616 if !ok { 5617 return ctx, fmt.Errorf("didn't find authdata key %v in context", k) 5618 } 5619 if vgot[0] != vwant { 5620 return ctx, fmt.Errorf("for key %v, got value %v, want %v", k, vgot, vwant) 5621 } 5622 } 5623 return ctx, nil 5624} 5625 5626func (s) TestPerRPCCredentialsViaDialOptions(t *testing.T) { 5627 for _, e := range listTestEnv() { 5628 testPerRPCCredentialsViaDialOptions(t, e) 5629 } 5630} 5631 5632func testPerRPCCredentialsViaDialOptions(t *testing.T, e env) { 5633 te := newTest(t, e) 5634 te.tapHandle = authHandle 5635 te.perRPCCreds = testPerRPCCredentials{} 5636 te.startServer(&testServer{security: e.security}) 5637 defer te.tearDown() 5638 5639 cc := te.clientConn() 5640 tc := testpb.NewTestServiceClient(cc) 5641 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 5642 t.Fatalf("Test failed. Reason: %v", err) 5643 } 5644} 5645 5646func (s) TestPerRPCCredentialsViaCallOptions(t *testing.T) { 5647 for _, e := range listTestEnv() { 5648 testPerRPCCredentialsViaCallOptions(t, e) 5649 } 5650} 5651 5652func testPerRPCCredentialsViaCallOptions(t *testing.T, e env) { 5653 te := newTest(t, e) 5654 te.tapHandle = authHandle 5655 te.startServer(&testServer{security: e.security}) 5656 defer te.tearDown() 5657 5658 cc := te.clientConn() 5659 tc := testpb.NewTestServiceClient(cc) 5660 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil { 5661 t.Fatalf("Test failed. Reason: %v", err) 5662 } 5663} 5664 5665func (s) TestPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T) { 5666 for _, e := range listTestEnv() { 5667 testPerRPCCredentialsViaDialOptionsAndCallOptions(t, e) 5668 } 5669} 5670 5671func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) { 5672 te := newTest(t, e) 5673 te.perRPCCreds = testPerRPCCredentials{} 5674 // When credentials are provided via both dial options and call options, 5675 // we apply both sets. 5676 te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) { 5677 md, ok := metadata.FromIncomingContext(ctx) 5678 if !ok { 5679 return ctx, fmt.Errorf("couldn't find metadata in context") 5680 } 5681 for k, vwant := range authdata { 5682 vgot, ok := md[k] 5683 if !ok { 5684 return ctx, fmt.Errorf("couldn't find metadata for key %v", k) 5685 } 5686 if len(vgot) != 2 { 5687 return ctx, fmt.Errorf("len of value for key %v was %v, want 2", k, len(vgot)) 5688 } 5689 if vgot[0] != vwant || vgot[1] != vwant { 5690 return ctx, fmt.Errorf("value for %v was %v, want [%v, %v]", k, vgot, vwant, vwant) 5691 } 5692 } 5693 return ctx, nil 5694 } 5695 te.startServer(&testServer{security: e.security}) 5696 defer te.tearDown() 5697 5698 cc := te.clientConn() 5699 tc := testpb.NewTestServiceClient(cc) 5700 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil { 5701 t.Fatalf("Test failed. Reason: %v", err) 5702 } 5703} 5704 5705func (s) TestWaitForReadyConnection(t *testing.T) { 5706 for _, e := range listTestEnv() { 5707 testWaitForReadyConnection(t, e) 5708 } 5709 5710} 5711 5712func testWaitForReadyConnection(t *testing.T, e env) { 5713 te := newTest(t, e) 5714 te.userAgent = testAppUA 5715 te.startServer(&testServer{security: e.security}) 5716 defer te.tearDown() 5717 5718 cc := te.clientConn() // Non-blocking dial. 5719 tc := testpb.NewTestServiceClient(cc) 5720 ctx, cancel := context.WithTimeout(context.Background(), time.Second) 5721 defer cancel() 5722 state := cc.GetState() 5723 // Wait for connection to be Ready. 5724 for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { 5725 } 5726 if state != connectivity.Ready { 5727 t.Fatalf("Want connection state to be Ready, got %v", state) 5728 } 5729 ctx, cancel = context.WithTimeout(context.Background(), time.Second) 5730 defer cancel() 5731 // Make a fail-fast RPC. 5732 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { 5733 t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err) 5734 } 5735} 5736 5737type errCodec struct { 5738 noError bool 5739} 5740 5741func (c *errCodec) Marshal(v interface{}) ([]byte, error) { 5742 if c.noError { 5743 return []byte{}, nil 5744 } 5745 return nil, fmt.Errorf("3987^12 + 4365^12 = 4472^12") 5746} 5747 5748func (c *errCodec) Unmarshal(data []byte, v interface{}) error { 5749 return nil 5750} 5751 5752func (c *errCodec) Name() string { 5753 return "Fermat's near-miss." 5754} 5755 5756func (s) TestEncodeDoesntPanic(t *testing.T) { 5757 for _, e := range listTestEnv() { 5758 testEncodeDoesntPanic(t, e) 5759 } 5760} 5761 5762func testEncodeDoesntPanic(t *testing.T, e env) { 5763 te := newTest(t, e) 5764 erc := &errCodec{} 5765 te.customCodec = erc 5766 te.startServer(&testServer{security: e.security}) 5767 defer te.tearDown() 5768 te.customCodec = nil 5769 tc := testpb.NewTestServiceClient(te.clientConn()) 5770 // Failure case, should not panic. 5771 tc.EmptyCall(context.Background(), &testpb.Empty{}) 5772 erc.noError = true 5773 // Passing case. 5774 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { 5775 t.Fatalf("EmptyCall(_, _) = _, %v, want _, <nil>", err) 5776 } 5777} 5778 5779func (s) TestSvrWriteStatusEarlyWrite(t *testing.T) { 5780 for _, e := range listTestEnv() { 5781 testSvrWriteStatusEarlyWrite(t, e) 5782 } 5783} 5784 5785func testSvrWriteStatusEarlyWrite(t *testing.T, e env) { 5786 te := newTest(t, e) 5787 const smallSize = 1024 5788 const largeSize = 2048 5789 const extraLargeSize = 4096 5790 te.maxServerReceiveMsgSize = newInt(largeSize) 5791 te.maxServerSendMsgSize = newInt(largeSize) 5792 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 5793 if err != nil { 5794 t.Fatal(err) 5795 } 5796 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) 5797 if err != nil { 5798 t.Fatal(err) 5799 } 5800 te.startServer(&testServer{security: e.security}) 5801 defer te.tearDown() 5802 tc := testpb.NewTestServiceClient(te.clientConn()) 5803 respParam := []*testpb.ResponseParameters{ 5804 { 5805 Size: int32(smallSize), 5806 }, 5807 } 5808 sreq := &testpb.StreamingOutputCallRequest{ 5809 ResponseType: testpb.PayloadType_COMPRESSABLE, 5810 ResponseParameters: respParam, 5811 Payload: extraLargePayload, 5812 } 5813 // Test recv case: server receives a message larger than maxServerReceiveMsgSize. 5814 stream, err := tc.FullDuplexCall(te.ctx) 5815 if err != nil { 5816 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5817 } 5818 if err = stream.Send(sreq); err != nil { 5819 t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err) 5820 } 5821 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 5822 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 5823 } 5824 // Test send case: server sends a message larger than maxServerSendMsgSize. 5825 sreq.Payload = smallPayload 5826 respParam[0].Size = int32(extraLargeSize) 5827 5828 stream, err = tc.FullDuplexCall(te.ctx) 5829 if err != nil { 5830 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 5831 } 5832 if err = stream.Send(sreq); err != nil { 5833 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 5834 } 5835 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 5836 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 5837 } 5838} 5839 5840// The following functions with function name ending with TD indicates that they 5841// should be deleted after old service config API is deprecated and deleted. 5842func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) { 5843 te := newTest(t, e) 5844 // We write before read. 5845 ch := make(chan grpc.ServiceConfig, 1) 5846 te.sc = ch 5847 te.userAgent = testAppUA 5848 te.declareLogNoise( 5849 "transport: http2Client.notifyError got notified that the client transport was broken EOF", 5850 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", 5851 "grpc: addrConn.resetTransport failed to create client transport: connection error", 5852 "Failed to dial : context canceled; please retry.", 5853 ) 5854 return te, ch 5855} 5856 5857func (s) TestServiceConfigGetMethodConfigTD(t *testing.T) { 5858 for _, e := range listTestEnv() { 5859 testGetMethodConfigTD(t, e) 5860 } 5861} 5862 5863func testGetMethodConfigTD(t *testing.T, e env) { 5864 te, ch := testServiceConfigSetupTD(t, e) 5865 defer te.tearDown() 5866 5867 mc1 := grpc.MethodConfig{ 5868 WaitForReady: newBool(true), 5869 Timeout: newDuration(time.Millisecond), 5870 } 5871 mc2 := grpc.MethodConfig{WaitForReady: newBool(false)} 5872 m := make(map[string]grpc.MethodConfig) 5873 m["/grpc.testing.TestService/EmptyCall"] = mc1 5874 m["/grpc.testing.TestService/"] = mc2 5875 sc := grpc.ServiceConfig{ 5876 Methods: m, 5877 } 5878 ch <- sc 5879 5880 cc := te.clientConn() 5881 tc := testpb.NewTestServiceClient(cc) 5882 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 5883 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 5884 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 5885 } 5886 5887 m = make(map[string]grpc.MethodConfig) 5888 m["/grpc.testing.TestService/UnaryCall"] = mc1 5889 m["/grpc.testing.TestService/"] = mc2 5890 sc = grpc.ServiceConfig{ 5891 Methods: m, 5892 } 5893 ch <- sc 5894 // Wait for the new service config to propagate. 5895 for { 5896 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded { 5897 continue 5898 } 5899 break 5900 } 5901 // The following RPCs are expected to become fail-fast. 5902 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable { 5903 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) 5904 } 5905} 5906 5907func (s) TestServiceConfigWaitForReadyTD(t *testing.T) { 5908 for _, e := range listTestEnv() { 5909 testServiceConfigWaitForReadyTD(t, e) 5910 } 5911} 5912 5913func testServiceConfigWaitForReadyTD(t *testing.T, e env) { 5914 te, ch := testServiceConfigSetupTD(t, e) 5915 defer te.tearDown() 5916 5917 // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds. 5918 mc := grpc.MethodConfig{ 5919 WaitForReady: newBool(false), 5920 Timeout: newDuration(time.Millisecond), 5921 } 5922 m := make(map[string]grpc.MethodConfig) 5923 m["/grpc.testing.TestService/EmptyCall"] = mc 5924 m["/grpc.testing.TestService/FullDuplexCall"] = mc 5925 sc := grpc.ServiceConfig{ 5926 Methods: m, 5927 } 5928 ch <- sc 5929 5930 cc := te.clientConn() 5931 tc := testpb.NewTestServiceClient(cc) 5932 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 5933 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 5934 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 5935 } 5936 if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 5937 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 5938 } 5939 5940 // Generate a service config update. 5941 // Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. 5942 mc.WaitForReady = newBool(true) 5943 m = make(map[string]grpc.MethodConfig) 5944 m["/grpc.testing.TestService/EmptyCall"] = mc 5945 m["/grpc.testing.TestService/FullDuplexCall"] = mc 5946 sc = grpc.ServiceConfig{ 5947 Methods: m, 5948 } 5949 ch <- sc 5950 5951 // Wait for the new service config to take effect. 5952 mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") 5953 for { 5954 if !*mc.WaitForReady { 5955 time.Sleep(100 * time.Millisecond) 5956 mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") 5957 continue 5958 } 5959 break 5960 } 5961 // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 5962 if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 5963 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 5964 } 5965 if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded { 5966 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 5967 } 5968} 5969 5970func (s) TestServiceConfigTimeoutTD(t *testing.T) { 5971 for _, e := range listTestEnv() { 5972 testServiceConfigTimeoutTD(t, e) 5973 } 5974} 5975 5976func testServiceConfigTimeoutTD(t *testing.T, e env) { 5977 te, ch := testServiceConfigSetupTD(t, e) 5978 defer te.tearDown() 5979 5980 // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. 5981 mc := grpc.MethodConfig{ 5982 Timeout: newDuration(time.Hour), 5983 } 5984 m := make(map[string]grpc.MethodConfig) 5985 m["/grpc.testing.TestService/EmptyCall"] = mc 5986 m["/grpc.testing.TestService/FullDuplexCall"] = mc 5987 sc := grpc.ServiceConfig{ 5988 Methods: m, 5989 } 5990 ch <- sc 5991 5992 cc := te.clientConn() 5993 tc := testpb.NewTestServiceClient(cc) 5994 // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. 5995 ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) 5996 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 5997 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 5998 } 5999 cancel() 6000 ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) 6001 if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 6002 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 6003 } 6004 cancel() 6005 6006 // Generate a service config update. 6007 // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. 6008 mc.Timeout = newDuration(time.Nanosecond) 6009 m = make(map[string]grpc.MethodConfig) 6010 m["/grpc.testing.TestService/EmptyCall"] = mc 6011 m["/grpc.testing.TestService/FullDuplexCall"] = mc 6012 sc = grpc.ServiceConfig{ 6013 Methods: m, 6014 } 6015 ch <- sc 6016 6017 // Wait for the new service config to take effect. 6018 mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") 6019 for { 6020 if *mc.Timeout != time.Nanosecond { 6021 time.Sleep(100 * time.Millisecond) 6022 mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") 6023 continue 6024 } 6025 break 6026 } 6027 6028 ctx, cancel = context.WithTimeout(context.Background(), time.Hour) 6029 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 6030 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 6031 } 6032 cancel() 6033 6034 ctx, cancel = context.WithTimeout(context.Background(), time.Hour) 6035 if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { 6036 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) 6037 } 6038 cancel() 6039} 6040 6041func (s) TestServiceConfigMaxMsgSizeTD(t *testing.T) { 6042 for _, e := range listTestEnv() { 6043 testServiceConfigMaxMsgSizeTD(t, e) 6044 } 6045} 6046 6047func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { 6048 // Setting up values and objects shared across all test cases. 6049 const smallSize = 1 6050 const largeSize = 1024 6051 const extraLargeSize = 2048 6052 6053 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) 6054 if err != nil { 6055 t.Fatal(err) 6056 } 6057 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) 6058 if err != nil { 6059 t.Fatal(err) 6060 } 6061 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) 6062 if err != nil { 6063 t.Fatal(err) 6064 } 6065 6066 mc := grpc.MethodConfig{ 6067 MaxReqSize: newInt(extraLargeSize), 6068 MaxRespSize: newInt(extraLargeSize), 6069 } 6070 6071 m := make(map[string]grpc.MethodConfig) 6072 m["/grpc.testing.TestService/UnaryCall"] = mc 6073 m["/grpc.testing.TestService/FullDuplexCall"] = mc 6074 sc := grpc.ServiceConfig{ 6075 Methods: m, 6076 } 6077 // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 6078 te1, ch1 := testServiceConfigSetupTD(t, e) 6079 te1.startServer(&testServer{security: e.security}) 6080 defer te1.tearDown() 6081 6082 ch1 <- sc 6083 tc := testpb.NewTestServiceClient(te1.clientConn()) 6084 6085 req := &testpb.SimpleRequest{ 6086 ResponseType: testpb.PayloadType_COMPRESSABLE, 6087 ResponseSize: int32(extraLargeSize), 6088 Payload: smallPayload, 6089 } 6090 // Test for unary RPC recv. 6091 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 6092 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 6093 } 6094 6095 // Test for unary RPC send. 6096 req.Payload = extraLargePayload 6097 req.ResponseSize = int32(smallSize) 6098 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 6099 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 6100 } 6101 6102 // Test for streaming RPC recv. 6103 respParam := []*testpb.ResponseParameters{ 6104 { 6105 Size: int32(extraLargeSize), 6106 }, 6107 } 6108 sreq := &testpb.StreamingOutputCallRequest{ 6109 ResponseType: testpb.PayloadType_COMPRESSABLE, 6110 ResponseParameters: respParam, 6111 Payload: smallPayload, 6112 } 6113 stream, err := tc.FullDuplexCall(te1.ctx) 6114 if err != nil { 6115 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 6116 } 6117 if err := stream.Send(sreq); err != nil { 6118 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 6119 } 6120 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 6121 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 6122 } 6123 6124 // Test for streaming RPC send. 6125 respParam[0].Size = int32(smallSize) 6126 sreq.Payload = extraLargePayload 6127 stream, err = tc.FullDuplexCall(te1.ctx) 6128 if err != nil { 6129 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 6130 } 6131 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 6132 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 6133 } 6134 6135 // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 6136 te2, ch2 := testServiceConfigSetupTD(t, e) 6137 te2.maxClientReceiveMsgSize = newInt(1024) 6138 te2.maxClientSendMsgSize = newInt(1024) 6139 te2.startServer(&testServer{security: e.security}) 6140 defer te2.tearDown() 6141 ch2 <- sc 6142 tc = testpb.NewTestServiceClient(te2.clientConn()) 6143 6144 // Test for unary RPC recv. 6145 req.Payload = smallPayload 6146 req.ResponseSize = int32(largeSize) 6147 6148 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 6149 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 6150 } 6151 6152 // Test for unary RPC send. 6153 req.Payload = largePayload 6154 req.ResponseSize = int32(smallSize) 6155 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 6156 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 6157 } 6158 6159 // Test for streaming RPC recv. 6160 stream, err = tc.FullDuplexCall(te2.ctx) 6161 respParam[0].Size = int32(largeSize) 6162 sreq.Payload = smallPayload 6163 if err != nil { 6164 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 6165 } 6166 if err := stream.Send(sreq); err != nil { 6167 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 6168 } 6169 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 6170 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 6171 } 6172 6173 // Test for streaming RPC send. 6174 respParam[0].Size = int32(smallSize) 6175 sreq.Payload = largePayload 6176 stream, err = tc.FullDuplexCall(te2.ctx) 6177 if err != nil { 6178 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 6179 } 6180 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 6181 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 6182 } 6183 6184 // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 6185 te3, ch3 := testServiceConfigSetupTD(t, e) 6186 te3.maxClientReceiveMsgSize = newInt(4096) 6187 te3.maxClientSendMsgSize = newInt(4096) 6188 te3.startServer(&testServer{security: e.security}) 6189 defer te3.tearDown() 6190 ch3 <- sc 6191 tc = testpb.NewTestServiceClient(te3.clientConn()) 6192 6193 // Test for unary RPC recv. 6194 req.Payload = smallPayload 6195 req.ResponseSize = int32(largeSize) 6196 6197 if _, err := tc.UnaryCall(context.Background(), req); err != nil { 6198 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) 6199 } 6200 6201 req.ResponseSize = int32(extraLargeSize) 6202 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 6203 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 6204 } 6205 6206 // Test for unary RPC send. 6207 req.Payload = largePayload 6208 req.ResponseSize = int32(smallSize) 6209 if _, err := tc.UnaryCall(context.Background(), req); err != nil { 6210 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err) 6211 } 6212 6213 req.Payload = extraLargePayload 6214 if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted { 6215 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) 6216 } 6217 6218 // Test for streaming RPC recv. 6219 stream, err = tc.FullDuplexCall(te3.ctx) 6220 if err != nil { 6221 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 6222 } 6223 respParam[0].Size = int32(largeSize) 6224 sreq.Payload = smallPayload 6225 6226 if err := stream.Send(sreq); err != nil { 6227 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 6228 } 6229 if _, err := stream.Recv(); err != nil { 6230 t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err) 6231 } 6232 6233 respParam[0].Size = int32(extraLargeSize) 6234 6235 if err := stream.Send(sreq); err != nil { 6236 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 6237 } 6238 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { 6239 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) 6240 } 6241 6242 // Test for streaming RPC send. 6243 respParam[0].Size = int32(smallSize) 6244 sreq.Payload = largePayload 6245 stream, err = tc.FullDuplexCall(te3.ctx) 6246 if err != nil { 6247 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 6248 } 6249 if err := stream.Send(sreq); err != nil { 6250 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 6251 } 6252 sreq.Payload = extraLargePayload 6253 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { 6254 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) 6255 } 6256} 6257 6258func (s) TestMethodFromServerStream(t *testing.T) { 6259 const testMethod = "/package.service/method" 6260 e := tcpClearRREnv 6261 te := newTest(t, e) 6262 var method string 6263 var ok bool 6264 te.unknownHandler = func(srv interface{}, stream grpc.ServerStream) error { 6265 method, ok = grpc.MethodFromServerStream(stream) 6266 return nil 6267 } 6268 6269 te.startServer(nil) 6270 defer te.tearDown() 6271 _ = te.clientConn().Invoke(context.Background(), testMethod, nil, nil) 6272 if !ok || method != testMethod { 6273 t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod) 6274 } 6275} 6276 6277func (s) TestInterceptorCanAccessCallOptions(t *testing.T) { 6278 e := tcpClearRREnv 6279 te := newTest(t, e) 6280 te.startServer(&testServer{security: e.security}) 6281 defer te.tearDown() 6282 6283 type observedOptions struct { 6284 headers []*metadata.MD 6285 trailers []*metadata.MD 6286 peer []*peer.Peer 6287 creds []credentials.PerRPCCredentials 6288 failFast []bool 6289 maxRecvSize []int 6290 maxSendSize []int 6291 compressor []string 6292 subtype []string 6293 } 6294 var observedOpts observedOptions 6295 populateOpts := func(opts []grpc.CallOption) { 6296 for _, o := range opts { 6297 switch o := o.(type) { 6298 case grpc.HeaderCallOption: 6299 observedOpts.headers = append(observedOpts.headers, o.HeaderAddr) 6300 case grpc.TrailerCallOption: 6301 observedOpts.trailers = append(observedOpts.trailers, o.TrailerAddr) 6302 case grpc.PeerCallOption: 6303 observedOpts.peer = append(observedOpts.peer, o.PeerAddr) 6304 case grpc.PerRPCCredsCallOption: 6305 observedOpts.creds = append(observedOpts.creds, o.Creds) 6306 case grpc.FailFastCallOption: 6307 observedOpts.failFast = append(observedOpts.failFast, o.FailFast) 6308 case grpc.MaxRecvMsgSizeCallOption: 6309 observedOpts.maxRecvSize = append(observedOpts.maxRecvSize, o.MaxRecvMsgSize) 6310 case grpc.MaxSendMsgSizeCallOption: 6311 observedOpts.maxSendSize = append(observedOpts.maxSendSize, o.MaxSendMsgSize) 6312 case grpc.CompressorCallOption: 6313 observedOpts.compressor = append(observedOpts.compressor, o.CompressorType) 6314 case grpc.ContentSubtypeCallOption: 6315 observedOpts.subtype = append(observedOpts.subtype, o.ContentSubtype) 6316 } 6317 } 6318 } 6319 6320 te.unaryClientInt = func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { 6321 populateOpts(opts) 6322 return nil 6323 } 6324 te.streamClientInt = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { 6325 populateOpts(opts) 6326 return nil, nil 6327 } 6328 6329 defaults := []grpc.CallOption{ 6330 grpc.WaitForReady(true), 6331 grpc.MaxCallRecvMsgSize(1010), 6332 } 6333 tc := testpb.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...))) 6334 6335 var headers metadata.MD 6336 var trailers metadata.MD 6337 var pr peer.Peer 6338 tc.UnaryCall(context.Background(), &testpb.SimpleRequest{}, 6339 grpc.MaxCallRecvMsgSize(100), 6340 grpc.MaxCallSendMsgSize(200), 6341 grpc.PerRPCCredentials(testPerRPCCredentials{}), 6342 grpc.Header(&headers), 6343 grpc.Trailer(&trailers), 6344 grpc.Peer(&pr)) 6345 expected := observedOptions{ 6346 failFast: []bool{false}, 6347 maxRecvSize: []int{1010, 100}, 6348 maxSendSize: []int{200}, 6349 creds: []credentials.PerRPCCredentials{testPerRPCCredentials{}}, 6350 headers: []*metadata.MD{&headers}, 6351 trailers: []*metadata.MD{&trailers}, 6352 peer: []*peer.Peer{&pr}, 6353 } 6354 6355 if !reflect.DeepEqual(expected, observedOpts) { 6356 t.Errorf("unary call did not observe expected options: expected %#v, got %#v", expected, observedOpts) 6357 } 6358 6359 observedOpts = observedOptions{} // reset 6360 6361 tc.StreamingInputCall(context.Background(), 6362 grpc.WaitForReady(false), 6363 grpc.MaxCallSendMsgSize(2020), 6364 grpc.UseCompressor("comp-type"), 6365 grpc.CallContentSubtype("json")) 6366 expected = observedOptions{ 6367 failFast: []bool{false, true}, 6368 maxRecvSize: []int{1010}, 6369 maxSendSize: []int{2020}, 6370 compressor: []string{"comp-type"}, 6371 subtype: []string{"json"}, 6372 } 6373 6374 if !reflect.DeepEqual(expected, observedOpts) { 6375 t.Errorf("streaming call did not observe expected options: expected %#v, got %#v", expected, observedOpts) 6376 } 6377} 6378 6379func (s) TestCompressorRegister(t *testing.T) { 6380 for _, e := range listTestEnv() { 6381 testCompressorRegister(t, e) 6382 } 6383} 6384 6385func testCompressorRegister(t *testing.T, e env) { 6386 te := newTest(t, e) 6387 te.clientCompression = false 6388 te.serverCompression = false 6389 te.clientUseCompression = true 6390 6391 te.startServer(&testServer{security: e.security}) 6392 defer te.tearDown() 6393 tc := testpb.NewTestServiceClient(te.clientConn()) 6394 6395 // Unary call 6396 const argSize = 271828 6397 const respSize = 314159 6398 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 6399 if err != nil { 6400 t.Fatal(err) 6401 } 6402 req := &testpb.SimpleRequest{ 6403 ResponseType: testpb.PayloadType_COMPRESSABLE, 6404 ResponseSize: respSize, 6405 Payload: payload, 6406 } 6407 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something")) 6408 if _, err := tc.UnaryCall(ctx, req); err != nil { 6409 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 6410 } 6411 // Streaming RPC 6412 ctx, cancel := context.WithCancel(context.Background()) 6413 defer cancel() 6414 stream, err := tc.FullDuplexCall(ctx) 6415 if err != nil { 6416 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) 6417 } 6418 respParam := []*testpb.ResponseParameters{ 6419 { 6420 Size: 31415, 6421 }, 6422 } 6423 payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415)) 6424 if err != nil { 6425 t.Fatal(err) 6426 } 6427 sreq := &testpb.StreamingOutputCallRequest{ 6428 ResponseType: testpb.PayloadType_COMPRESSABLE, 6429 ResponseParameters: respParam, 6430 Payload: payload, 6431 } 6432 if err := stream.Send(sreq); err != nil { 6433 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err) 6434 } 6435 if _, err := stream.Recv(); err != nil { 6436 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err) 6437 } 6438} 6439 6440func (s) TestServeExitsWhenListenerClosed(t *testing.T) { 6441 6442 ss := &stubServer{ 6443 emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { 6444 return &testpb.Empty{}, nil 6445 }, 6446 } 6447 6448 s := grpc.NewServer() 6449 testpb.RegisterTestServiceServer(s, ss) 6450 6451 lis, err := net.Listen("tcp", "localhost:0") 6452 if err != nil { 6453 t.Fatalf("Failed to create listener: %v", err) 6454 } 6455 6456 done := make(chan struct{}) 6457 go func() { 6458 s.Serve(lis) 6459 close(done) 6460 }() 6461 6462 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock()) 6463 if err != nil { 6464 t.Fatalf("Failed to dial server: %v", err) 6465 } 6466 defer cc.Close() 6467 c := testpb.NewTestServiceClient(cc) 6468 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6469 defer cancel() 6470 if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil { 6471 t.Fatalf("Failed to send test RPC to server: %v", err) 6472 } 6473 6474 if err := lis.Close(); err != nil { 6475 t.Fatalf("Failed to close listener: %v", err) 6476 } 6477 const timeout = 5 * time.Second 6478 timer := time.NewTimer(timeout) 6479 select { 6480 case <-done: 6481 return 6482 case <-timer.C: 6483 t.Fatalf("Serve did not return after %v", timeout) 6484 } 6485} 6486 6487// Service handler returns status with invalid utf8 message. 6488func (s) TestStatusInvalidUTF8Message(t *testing.T) { 6489 6490 var ( 6491 origMsg = string([]byte{0xff, 0xfe, 0xfd}) 6492 wantMsg = "���" 6493 ) 6494 6495 ss := &stubServer{ 6496 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 6497 return nil, status.Errorf(codes.Internal, origMsg) 6498 }, 6499 } 6500 if err := ss.Start(nil); err != nil { 6501 t.Fatalf("Error starting endpoint server: %v", err) 6502 } 6503 defer ss.Stop() 6504 6505 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 6506 defer cancel() 6507 6508 if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg { 6509 t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg) 6510 } 6511} 6512 6513// Service handler returns status with details and invalid utf8 message. Proto 6514// will fail to marshal the status because of the invalid utf8 message. Details 6515// will be dropped when sending. 6516func (s) TestStatusInvalidUTF8Details(t *testing.T) { 6517 6518 var ( 6519 origMsg = string([]byte{0xff, 0xfe, 0xfd}) 6520 wantMsg = "���" 6521 ) 6522 6523 ss := &stubServer{ 6524 emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { 6525 st := status.New(codes.Internal, origMsg) 6526 st, err := st.WithDetails(&testpb.Empty{}) 6527 if err != nil { 6528 return nil, err 6529 } 6530 return nil, st.Err() 6531 }, 6532 } 6533 if err := ss.Start(nil); err != nil { 6534 t.Fatalf("Error starting endpoint server: %v", err) 6535 } 6536 defer ss.Stop() 6537 6538 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 6539 defer cancel() 6540 6541 _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}) 6542 st := status.Convert(err) 6543 if st.Message() != wantMsg { 6544 t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg) 6545 } 6546 if len(st.Details()) != 0 { 6547 // Details should be dropped on the server side. 6548 t.Fatalf("RPC status contain details: %v, want no details", st.Details()) 6549 } 6550} 6551 6552func (s) TestClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T) { 6553 for _, e := range listTestEnv() { 6554 if e.httpHandler { 6555 continue 6556 } 6557 testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t, e) 6558 } 6559} 6560 6561func testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T, e env) { 6562 te := newTest(t, e) 6563 te.userAgent = testAppUA 6564 smallSize := 1024 6565 te.maxServerReceiveMsgSize = &smallSize 6566 te.startServer(&testServer{security: e.security}) 6567 defer te.tearDown() 6568 tc := testpb.NewTestServiceClient(te.clientConn()) 6569 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1048576) 6570 if err != nil { 6571 t.Fatal(err) 6572 } 6573 req := &testpb.SimpleRequest{ 6574 ResponseType: testpb.PayloadType_COMPRESSABLE, 6575 Payload: payload, 6576 } 6577 var wg sync.WaitGroup 6578 for i := 0; i < 10; i++ { 6579 wg.Add(1) 6580 go func() { 6581 defer wg.Done() 6582 for j := 0; j < 100; j++ { 6583 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) 6584 defer cancel() 6585 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.ResourceExhausted { 6586 t.Errorf("TestService/UnaryCall(_,_) = _. %v, want code: %s", err, codes.ResourceExhausted) 6587 return 6588 } 6589 } 6590 }() 6591 } 6592 wg.Wait() 6593} 6594 6595const clientAlwaysFailCredErrorMsg = "clientAlwaysFailCred always fails" 6596 6597var errClientAlwaysFailCred = errors.New(clientAlwaysFailCredErrorMsg) 6598 6599type clientAlwaysFailCred struct{} 6600 6601func (c clientAlwaysFailCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 6602 return nil, nil, errClientAlwaysFailCred 6603} 6604func (c clientAlwaysFailCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 6605 return rawConn, nil, nil 6606} 6607func (c clientAlwaysFailCred) Info() credentials.ProtocolInfo { 6608 return credentials.ProtocolInfo{} 6609} 6610func (c clientAlwaysFailCred) Clone() credentials.TransportCredentials { 6611 return nil 6612} 6613func (c clientAlwaysFailCred) OverrideServerName(s string) error { 6614 return nil 6615} 6616 6617func (s) TestFailFastRPCErrorOnBadCertificates(t *testing.T) { 6618 te := newTest(t, env{name: "bad-cred", network: "tcp", security: "clientAlwaysFailCred", balancer: "round_robin"}) 6619 te.startServer(&testServer{security: te.e.security}) 6620 defer te.tearDown() 6621 6622 opts := []grpc.DialOption{grpc.WithTransportCredentials(clientAlwaysFailCred{})} 6623 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 6624 defer cancel() 6625 cc, err := grpc.DialContext(ctx, te.srvAddr, opts...) 6626 if err != nil { 6627 t.Fatalf("Dial(_) = %v, want %v", err, nil) 6628 } 6629 defer cc.Close() 6630 6631 tc := testpb.NewTestServiceClient(cc) 6632 for i := 0; i < 1000; i++ { 6633 // This loop runs for at most 1 second. The first several RPCs will fail 6634 // with Unavailable because the connection hasn't started. When the 6635 // first connection failed with creds error, the next RPC should also 6636 // fail with the expected error. 6637 if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) { 6638 return 6639 } 6640 time.Sleep(time.Millisecond) 6641 } 6642 te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg) 6643} 6644 6645func (s) TestRPCTimeout(t *testing.T) { 6646 for _, e := range listTestEnv() { 6647 testRPCTimeout(t, e) 6648 } 6649} 6650 6651func testRPCTimeout(t *testing.T, e env) { 6652 te := newTest(t, e) 6653 te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond}) 6654 defer te.tearDown() 6655 6656 cc := te.clientConn() 6657 tc := testpb.NewTestServiceClient(cc) 6658 6659 const argSize = 2718 6660 const respSize = 314 6661 6662 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize) 6663 if err != nil { 6664 t.Fatal(err) 6665 } 6666 6667 req := &testpb.SimpleRequest{ 6668 ResponseType: testpb.PayloadType_COMPRESSABLE, 6669 ResponseSize: respSize, 6670 Payload: payload, 6671 } 6672 for i := -1; i <= 10; i++ { 6673 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond) 6674 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded { 6675 t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded) 6676 } 6677 cancel() 6678 } 6679} 6680 6681func (s) TestDisabledIOBuffers(t *testing.T) { 6682 6683 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000)) 6684 if err != nil { 6685 t.Fatalf("Failed to create payload: %v", err) 6686 } 6687 req := &testpb.StreamingOutputCallRequest{ 6688 Payload: payload, 6689 } 6690 resp := &testpb.StreamingOutputCallResponse{ 6691 Payload: payload, 6692 } 6693 6694 ss := &stubServer{ 6695 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 6696 for { 6697 in, err := stream.Recv() 6698 if err == io.EOF { 6699 return nil 6700 } 6701 if err != nil { 6702 t.Errorf("stream.Recv() = _, %v, want _, <nil>", err) 6703 return err 6704 } 6705 if !reflect.DeepEqual(in.Payload.Body, payload.Body) { 6706 t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body)) 6707 return err 6708 } 6709 if err := stream.Send(resp); err != nil { 6710 t.Errorf("stream.Send(_)= %v, want <nil>", err) 6711 return err 6712 } 6713 6714 } 6715 }, 6716 } 6717 6718 s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0)) 6719 testpb.RegisterTestServiceServer(s, ss) 6720 6721 lis, err := net.Listen("tcp", "localhost:0") 6722 if err != nil { 6723 t.Fatalf("Failed to create listener: %v", err) 6724 } 6725 6726 done := make(chan struct{}) 6727 go func() { 6728 s.Serve(lis) 6729 close(done) 6730 }() 6731 defer s.Stop() 6732 dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second) 6733 defer dcancel() 6734 cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0)) 6735 if err != nil { 6736 t.Fatalf("Failed to dial server") 6737 } 6738 defer cc.Close() 6739 c := testpb.NewTestServiceClient(cc) 6740 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6741 defer cancel() 6742 stream, err := c.FullDuplexCall(ctx, grpc.WaitForReady(true)) 6743 if err != nil { 6744 t.Fatalf("Failed to send test RPC to server") 6745 } 6746 for i := 0; i < 10; i++ { 6747 if err := stream.Send(req); err != nil { 6748 t.Fatalf("stream.Send(_) = %v, want <nil>", err) 6749 } 6750 in, err := stream.Recv() 6751 if err != nil { 6752 t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err) 6753 } 6754 if !reflect.DeepEqual(in.Payload.Body, payload.Body) { 6755 t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body)) 6756 } 6757 } 6758 stream.CloseSend() 6759 if _, err := stream.Recv(); err != io.EOF { 6760 t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err) 6761 } 6762} 6763 6764func (s) TestServerMaxHeaderListSizeClientUserViolation(t *testing.T) { 6765 for _, e := range listTestEnv() { 6766 if e.httpHandler { 6767 continue 6768 } 6769 testServerMaxHeaderListSizeClientUserViolation(t, e) 6770 } 6771} 6772 6773func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) { 6774 te := newTest(t, e) 6775 te.maxServerHeaderListSize = new(uint32) 6776 *te.maxServerHeaderListSize = 216 6777 te.startServer(&testServer{security: e.security}) 6778 defer te.tearDown() 6779 6780 cc := te.clientConn() 6781 tc := testpb.NewTestServiceClient(cc) 6782 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6783 defer cancel() 6784 metadata.AppendToOutgoingContext(ctx, "oversize", string(make([]byte, 216))) 6785 var err error 6786 if err = verifyResultWithDelay(func() (bool, error) { 6787 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal { 6788 return true, nil 6789 } 6790 return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal) 6791 }); err != nil { 6792 t.Fatal(err) 6793 } 6794} 6795 6796func (s) TestClientMaxHeaderListSizeServerUserViolation(t *testing.T) { 6797 for _, e := range listTestEnv() { 6798 if e.httpHandler { 6799 continue 6800 } 6801 testClientMaxHeaderListSizeServerUserViolation(t, e) 6802 } 6803} 6804 6805func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) { 6806 te := newTest(t, e) 6807 te.maxClientHeaderListSize = new(uint32) 6808 *te.maxClientHeaderListSize = 1 // any header server sends will violate 6809 te.startServer(&testServer{security: e.security}) 6810 defer te.tearDown() 6811 6812 cc := te.clientConn() 6813 tc := testpb.NewTestServiceClient(cc) 6814 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6815 defer cancel() 6816 var err error 6817 if err = verifyResultWithDelay(func() (bool, error) { 6818 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal { 6819 return true, nil 6820 } 6821 return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal) 6822 }); err != nil { 6823 t.Fatal(err) 6824 } 6825} 6826 6827func (s) TestServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T) { 6828 for _, e := range listTestEnv() { 6829 if e.httpHandler || e.security == "tls" { 6830 continue 6831 } 6832 testServerMaxHeaderListSizeClientIntentionalViolation(t, e) 6833 } 6834} 6835 6836func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env) { 6837 te := newTest(t, e) 6838 te.maxServerHeaderListSize = new(uint32) 6839 *te.maxServerHeaderListSize = 512 6840 te.startServer(&testServer{security: e.security}) 6841 defer te.tearDown() 6842 6843 cc, dw := te.clientConnWithConnControl() 6844 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 6845 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6846 defer cancel() 6847 stream, err := tc.FullDuplexCall(ctx) 6848 if err != nil { 6849 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err) 6850 } 6851 rcw := dw.getRawConnWrapper() 6852 val := make([]string, 512) 6853 for i := range val { 6854 val[i] = "a" 6855 } 6856 // allow for client to send the initial header 6857 time.Sleep(100 * time.Millisecond) 6858 rcw.writeHeaders(http2.HeadersFrameParam{ 6859 StreamID: tc.getCurrentStreamID(), 6860 BlockFragment: rcw.encodeHeader("oversize", strings.Join(val, "")), 6861 EndStream: false, 6862 EndHeaders: true, 6863 }) 6864 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal { 6865 t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal) 6866 } 6867} 6868 6869func (s) TestClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T) { 6870 for _, e := range listTestEnv() { 6871 if e.httpHandler || e.security == "tls" { 6872 continue 6873 } 6874 testClientMaxHeaderListSizeServerIntentionalViolation(t, e) 6875 } 6876} 6877 6878func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) { 6879 te := newTest(t, e) 6880 te.maxClientHeaderListSize = new(uint32) 6881 *te.maxClientHeaderListSize = 200 6882 lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true}) 6883 defer te.tearDown() 6884 cc, _ := te.clientConnWithConnControl() 6885 tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)} 6886 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6887 defer cancel() 6888 stream, err := tc.FullDuplexCall(ctx) 6889 if err != nil { 6890 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err) 6891 } 6892 var i int 6893 var rcw *rawConnWrapper 6894 for i = 0; i < 100; i++ { 6895 rcw = lw.getLastConn() 6896 if rcw != nil { 6897 break 6898 } 6899 time.Sleep(10 * time.Millisecond) 6900 continue 6901 } 6902 if i == 100 { 6903 t.Fatalf("failed to create server transport after 1s") 6904 } 6905 6906 val := make([]string, 200) 6907 for i := range val { 6908 val[i] = "a" 6909 } 6910 // allow for client to send the initial header. 6911 time.Sleep(100 * time.Millisecond) 6912 rcw.writeHeaders(http2.HeadersFrameParam{ 6913 StreamID: tc.getCurrentStreamID(), 6914 BlockFragment: rcw.encodeRawHeader("oversize", strings.Join(val, "")), 6915 EndStream: false, 6916 EndHeaders: true, 6917 }) 6918 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal { 6919 t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal) 6920 } 6921} 6922 6923func (s) TestNetPipeConn(t *testing.T) { 6924 // This test will block indefinitely if grpc writes both client and server 6925 // prefaces without either reading from the Conn. 6926 pl := testutils.NewPipeListener() 6927 s := grpc.NewServer() 6928 defer s.Stop() 6929 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 6930 return &testpb.SimpleResponse{}, nil 6931 }} 6932 testpb.RegisterTestServiceServer(s, ts) 6933 go s.Serve(pl) 6934 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 6935 defer cancel() 6936 cc, err := grpc.DialContext(ctx, "", grpc.WithInsecure(), grpc.WithDialer(pl.Dialer())) 6937 if err != nil { 6938 t.Fatalf("Error creating client: %v", err) 6939 } 6940 defer cc.Close() 6941 client := testpb.NewTestServiceClient(cc) 6942 if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { 6943 t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) 6944 } 6945} 6946 6947func (s) TestLargeTimeout(t *testing.T) { 6948 for _, e := range listTestEnv() { 6949 testLargeTimeout(t, e) 6950 } 6951} 6952 6953func testLargeTimeout(t *testing.T, e env) { 6954 te := newTest(t, e) 6955 te.declareLogNoise("Server.processUnaryRPC failed to write status") 6956 6957 ts := &funcServer{} 6958 te.startServer(ts) 6959 defer te.tearDown() 6960 tc := testpb.NewTestServiceClient(te.clientConn()) 6961 6962 timeouts := []time.Duration{ 6963 time.Duration(math.MaxInt64), // will be (correctly) converted to 6964 // 2562048 hours, which overflows upon converting back to an int64 6965 2562047 * time.Hour, // the largest timeout that does not overflow 6966 } 6967 6968 for i, maxTimeout := range timeouts { 6969 ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 6970 deadline, ok := ctx.Deadline() 6971 timeout := time.Until(deadline) 6972 minTimeout := maxTimeout - 5*time.Second 6973 if !ok || timeout < minTimeout || timeout > maxTimeout { 6974 t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout) 6975 return nil, status.Error(codes.OutOfRange, "deadline error") 6976 } 6977 return &testpb.SimpleResponse{}, nil 6978 } 6979 6980 ctx, cancel := context.WithTimeout(context.Background(), maxTimeout) 6981 defer cancel() 6982 6983 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { 6984 t.Errorf("case %v: UnaryCall(_) = _, %v; want _, nil", i, err) 6985 } 6986 } 6987} 6988 6989// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This 6990// test ensures that the connection is re-created after GO_AWAY and not affected by the 6991// subsequent (old) connection closure. 6992func (s) TestGoAwayThenClose(t *testing.T) { 6993 6994 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) 6995 defer cancel() 6996 6997 lis1, err := net.Listen("tcp", "localhost:0") 6998 if err != nil { 6999 t.Fatalf("Error while listening. Err: %v", err) 7000 } 7001 s1 := grpc.NewServer() 7002 defer s1.Stop() 7003 ts1 := &funcServer{ 7004 unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 7005 return &testpb.SimpleResponse{}, nil 7006 }, 7007 fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error { 7008 // Wait forever. 7009 _, err := stream.Recv() 7010 if err == nil { 7011 t.Error("expected to never receive any message") 7012 } 7013 return err 7014 }, 7015 } 7016 testpb.RegisterTestServiceServer(s1, ts1) 7017 go s1.Serve(lis1) 7018 7019 conn2Established := grpcsync.NewEvent() 7020 lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established) 7021 if err != nil { 7022 t.Fatalf("Error while listening. Err: %v", err) 7023 } 7024 s2 := grpc.NewServer() 7025 defer s2.Stop() 7026 conn2Ready := grpcsync.NewEvent() 7027 ts2 := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 7028 conn2Ready.Fire() 7029 return &testpb.SimpleResponse{}, nil 7030 }} 7031 testpb.RegisterTestServiceServer(s2, ts2) 7032 go s2.Serve(lis2) 7033 7034 r, rcleanup := manual.GenerateAndRegisterManualResolver() 7035 defer rcleanup() 7036 r.InitialState(resolver.State{Addresses: []resolver.Address{ 7037 {Addr: lis1.Addr().String()}, 7038 {Addr: lis2.Addr().String()}, 7039 }}) 7040 cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure()) 7041 if err != nil { 7042 t.Fatalf("Error creating client: %v", err) 7043 } 7044 defer cc.Close() 7045 7046 client := testpb.NewTestServiceClient(cc) 7047 7048 // Should go on connection 1. We use a long-lived RPC because it will cause GracefulStop to send GO_AWAY, but the 7049 // connection doesn't get closed until the server stops and the client receives. 7050 stream, err := client.FullDuplexCall(ctx) 7051 if err != nil { 7052 t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err) 7053 } 7054 7055 // Send GO_AWAY to connection 1. 7056 go s1.GracefulStop() 7057 7058 // Wait for connection 2 to be established. 7059 <-conn2Established.Done() 7060 7061 // Close connection 1. 7062 s1.Stop() 7063 7064 // Wait for client to close. 7065 _, err = stream.Recv() 7066 if err == nil { 7067 t.Fatal("expected the stream to die, but got a successful Recv") 7068 } 7069 7070 // Do a bunch of RPCs, make sure it stays stable. These should go to connection 2. 7071 for i := 0; i < 10; i++ { 7072 if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { 7073 t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) 7074 } 7075 } 7076} 7077 7078func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) { 7079 lis, err := net.Listen(network, address) 7080 if err != nil { 7081 return nil, err 7082 } 7083 return notifyingListener{connEstablished: event, Listener: lis}, nil 7084} 7085 7086type notifyingListener struct { 7087 connEstablished *grpcsync.Event 7088 net.Listener 7089} 7090 7091func (lis notifyingListener) Accept() (net.Conn, error) { 7092 defer lis.connEstablished.Fire() 7093 return lis.Listener.Accept() 7094} 7095 7096func (s) TestRPCWaitsForResolver(t *testing.T) { 7097 te := testServiceConfigSetup(t, tcpClearRREnv) 7098 te.startServer(&testServer{security: tcpClearRREnv.security}) 7099 defer te.tearDown() 7100 r, rcleanup := manual.GenerateAndRegisterManualResolver() 7101 defer rcleanup() 7102 7103 te.resolverScheme = r.Scheme() 7104 te.nonBlockingDial = true 7105 cc := te.clientConn() 7106 tc := testpb.NewTestServiceClient(cc) 7107 7108 ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) 7109 defer cancel() 7110 // With no resolved addresses yet, this will timeout. 7111 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { 7112 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) 7113 } 7114 7115 ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) 7116 defer cancel() 7117 go func() { 7118 time.Sleep(time.Second) 7119 r.UpdateState(resolver.State{ 7120 Addresses: []resolver.Address{{Addr: te.srvAddr}}, 7121 ServiceConfig: `{ 7122 "methodConfig": [ 7123 { 7124 "name": [ 7125 { 7126 "service": "grpc.testing.TestService", 7127 "method": "UnaryCall" 7128 } 7129 ], 7130 "maxRequestMessageBytes": 0 7131 } 7132 ] 7133 }`}) 7134 }() 7135 // We wait a second before providing a service config and resolving 7136 // addresses. So this will wait for that and then honor the 7137 // maxRequestMessageBytes it contains. 7138 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{ResponseType: testpb.PayloadType_UNCOMPRESSABLE}); status.Code(err) != codes.ResourceExhausted { 7139 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err) 7140 } 7141 if got := ctx.Err(); got != nil { 7142 t.Fatalf("ctx.Err() = %v; want nil (deadline should be set short by service config)", got) 7143 } 7144 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { 7145 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err) 7146 } 7147} 7148 7149func (s) TestHTTPHeaderFrameErrorHandlingHTTPMode(t *testing.T) { 7150 // Non-gRPC content-type fallback path. 7151 for httpCode := range transport.HTTPStatusConvTab { 7152 doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ 7153 ":status", fmt.Sprintf("%d", httpCode), 7154 "content-type", "text/html", // non-gRPC content type to switch to HTTP mode. 7155 "grpc-status", "1", // Make up a gRPC status error 7156 "grpc-status-details-bin", "???", // Make up a gRPC field parsing error 7157 }) 7158 } 7159 7160 // Missing content-type fallback path. 7161 for httpCode := range transport.HTTPStatusConvTab { 7162 doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{ 7163 ":status", fmt.Sprintf("%d", httpCode), 7164 // Omitting content type to switch to HTTP mode. 7165 "grpc-status", "1", // Make up a gRPC status error 7166 "grpc-status-details-bin", "???", // Make up a gRPC field parsing error 7167 }) 7168 } 7169 7170 // Malformed HTTP status when fallback. 7171 doHTTPHeaderTest(t, codes.Internal, []string{ 7172 ":status", "abc", 7173 // Omitting content type to switch to HTTP mode. 7174 "grpc-status", "1", // Make up a gRPC status error 7175 "grpc-status-details-bin", "???", // Make up a gRPC field parsing error 7176 }) 7177} 7178 7179// Testing erroneous ResponseHeader or Trailers-only (delivered in the first HEADERS frame). 7180func (s) TestHTTPHeaderFrameErrorHandlingInitialHeader(t *testing.T) { 7181 for _, test := range []struct { 7182 header []string 7183 errCode codes.Code 7184 }{ 7185 { 7186 // missing gRPC status. 7187 header: []string{ 7188 ":status", "403", 7189 "content-type", "application/grpc", 7190 }, 7191 errCode: codes.Unknown, 7192 }, 7193 { 7194 // malformed grpc-status. 7195 header: []string{ 7196 ":status", "502", 7197 "content-type", "application/grpc", 7198 "grpc-status", "abc", 7199 }, 7200 errCode: codes.Internal, 7201 }, 7202 { 7203 // Malformed grpc-tags-bin field. 7204 header: []string{ 7205 ":status", "502", 7206 "content-type", "application/grpc", 7207 "grpc-status", "0", 7208 "grpc-tags-bin", "???", 7209 }, 7210 errCode: codes.Internal, 7211 }, 7212 { 7213 // gRPC status error. 7214 header: []string{ 7215 ":status", "502", 7216 "content-type", "application/grpc", 7217 "grpc-status", "3", 7218 }, 7219 errCode: codes.InvalidArgument, 7220 }, 7221 } { 7222 doHTTPHeaderTest(t, test.errCode, test.header) 7223 } 7224} 7225 7226// Testing non-Trailers-only Trailers (delievered in second HEADERS frame) 7227func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { 7228 for _, test := range []struct { 7229 responseHeader []string 7230 trailer []string 7231 errCode codes.Code 7232 }{ 7233 { 7234 responseHeader: []string{ 7235 ":status", "200", 7236 "content-type", "application/grpc", 7237 }, 7238 trailer: []string{ 7239 // trailer missing grpc-status 7240 ":status", "502", 7241 }, 7242 errCode: codes.Unknown, 7243 }, 7244 { 7245 responseHeader: []string{ 7246 ":status", "404", 7247 "content-type", "application/grpc", 7248 }, 7249 trailer: []string{ 7250 // malformed grpc-status-details-bin field 7251 "grpc-status", "0", 7252 "grpc-status-details-bin", "????", 7253 }, 7254 errCode: codes.Internal, 7255 }, 7256 } { 7257 doHTTPHeaderTest(t, test.errCode, test.responseHeader, test.trailer) 7258 } 7259} 7260 7261func (s) TestHTTPHeaderFrameErrorHandlingMoreThanTwoHeaders(t *testing.T) { 7262 header := []string{ 7263 ":status", "200", 7264 "content-type", "application/grpc", 7265 } 7266 doHTTPHeaderTest(t, codes.Internal, header, header, header) 7267} 7268 7269type httpServer struct { 7270 headerFields [][]string 7271} 7272 7273func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string, endStream bool) error { 7274 if len(headerFields)%2 == 1 { 7275 panic("odd number of kv args") 7276 } 7277 7278 var buf bytes.Buffer 7279 henc := hpack.NewEncoder(&buf) 7280 for len(headerFields) > 0 { 7281 k, v := headerFields[0], headerFields[1] 7282 headerFields = headerFields[2:] 7283 henc.WriteField(hpack.HeaderField{Name: k, Value: v}) 7284 } 7285 7286 return framer.WriteHeaders(http2.HeadersFrameParam{ 7287 StreamID: sid, 7288 BlockFragment: buf.Bytes(), 7289 EndStream: endStream, 7290 EndHeaders: true, 7291 }) 7292} 7293 7294func (s *httpServer) start(t *testing.T, lis net.Listener) { 7295 // Launch an HTTP server to send back header. 7296 go func() { 7297 conn, err := lis.Accept() 7298 if err != nil { 7299 t.Errorf("Error accepting connection: %v", err) 7300 return 7301 } 7302 defer conn.Close() 7303 // Read preface sent by client. 7304 if _, err = io.ReadFull(conn, make([]byte, len(http2.ClientPreface))); err != nil { 7305 t.Errorf("Error at server-side while reading preface from client. Err: %v", err) 7306 return 7307 } 7308 reader := bufio.NewReader(conn) 7309 writer := bufio.NewWriter(conn) 7310 framer := http2.NewFramer(writer, reader) 7311 if err = framer.WriteSettingsAck(); err != nil { 7312 t.Errorf("Error at server-side while sending Settings ack. Err: %v", err) 7313 return 7314 } 7315 writer.Flush() // necessary since client is expecting preface before declaring connection fully setup. 7316 7317 var sid uint32 7318 // Read frames until a header is received. 7319 for { 7320 frame, err := framer.ReadFrame() 7321 if err != nil { 7322 t.Errorf("Error at server-side while reading frame. Err: %v", err) 7323 return 7324 } 7325 if hframe, ok := frame.(*http2.HeadersFrame); ok { 7326 sid = hframe.Header().StreamID 7327 break 7328 } 7329 } 7330 for i, headers := range s.headerFields { 7331 if err = s.writeHeader(framer, sid, headers, i == len(s.headerFields)-1); err != nil { 7332 t.Errorf("Error at server-side while writing headers. Err: %v", err) 7333 return 7334 } 7335 writer.Flush() 7336 } 7337 }() 7338} 7339 7340func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string) { 7341 lis, err := net.Listen("tcp", "localhost:0") 7342 if err != nil { 7343 t.Fatalf("Failed to listen. Err: %v", err) 7344 } 7345 defer lis.Close() 7346 server := &httpServer{ 7347 headerFields: headerFields, 7348 } 7349 server.start(t, lis) 7350 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) 7351 defer cancel() 7352 cc, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure()) 7353 if err != nil { 7354 t.Fatalf("failed to dial due to err: %v", err) 7355 } 7356 defer cc.Close() 7357 client := testpb.NewTestServiceClient(cc) 7358 stream, err := client.FullDuplexCall(ctx) 7359 if err != nil { 7360 t.Fatalf("error creating stream due to err: %v", err) 7361 } 7362 if _, err := stream.Recv(); err == nil || status.Code(err) != errCode { 7363 t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode) 7364 } 7365} 7366